Move stereo plugin from -bad
[platform/upstream/gst-plugins-good.git] / gst / rtpmanager / gstrtpjitterbuffer.c
1 /*
2  * Farsight Voice+Video library
3  *
4  *  Copyright 2007 Collabora Ltd,
5  *  Copyright 2007 Nokia Corporation
6  *   @author: Philippe Kalaf <philippe.kalaf@collabora.co.uk>.
7  *  Copyright 2007 Wim Taymans <wim.taymans@gmail.com>
8  *  Copyright 2015 Kurento (http://kurento.org/)
9  *   @author: Miguel ParĂ­s <mparisdiaz@gmail.com>
10  *  Copyright 2016 Pexip AS
11  *   @author: Havard Graff <havard@pexip.com>
12  *   @author: Stian Selnes <stian@pexip.com>
13  *
14  * This library is free software; you can redistribute it and/or
15  * modify it under the terms of the GNU Library General Public
16  * License as published by the Free Software Foundation; either
17  * version 2 of the License, or (at your option) any later version.
18  *
19  * This library is distributed in the hope that it will be useful,
20  * but WITHOUT ANY WARRANTY; without even the implied warranty of
21  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
22  * Library General Public License for more details.
23  *
24  * You should have received a copy of the GNU Library General Public
25  * License along with this library; if not, write to the
26  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
27  * Boston, MA 02110-1301, USA.
28  *
29  */
30
31 /**
32  * SECTION:element-rtpjitterbuffer
33  *
34  * This element reorders and removes duplicate RTP packets as they are received
35  * from a network source.
36  *
37  * The element needs the clock-rate of the RTP payload in order to estimate the
38  * delay. This information is obtained either from the caps on the sink pad or,
39  * when no caps are present, from the #GstRtpJitterBuffer::request-pt-map signal.
40  * To clear the previous pt-map use the #GstRtpJitterBuffer::clear-pt-map signal.
41  *
42  * The rtpjitterbuffer will wait for missing packets up to a configurable time
43  * limit using the #GstRtpJitterBuffer:latency property. Packets arriving too
44  * late are considered to be lost packets. If the #GstRtpJitterBuffer:do-lost
45  * property is set, lost packets will result in a custom serialized downstream
46  * event of name GstRTPPacketLost. The lost packet events are usually used by a
47  * depayloader or other element to create concealment data or some other logic
48  * to gracefully handle the missing packets.
49  *
50  * The jitterbuffer will use the DTS (or PTS if no DTS is set) of the incomming
51  * buffer and the rtptime inside the RTP packet to create a PTS on the outgoing
52  * buffer.
53  *
54  * The jitterbuffer can also be configured to send early retransmission events
55  * upstream by setting the #GstRtpJitterBuffer:do-retransmission property. In
56  * this mode, the jitterbuffer tries to estimate when a packet should arrive and
57  * sends a custom upstream event named GstRTPRetransmissionRequest when the
58  * packet is considered late. The initial expected packet arrival time is
59  * calculated as follows:
60  *
61  * - If seqnum N arrived at time T, seqnum N+1 is expected to arrive at
62  *     T + packet-spacing + #GstRtpJitterBuffer:rtx-delay. The packet spacing is
63  *     calculated from the DTS (or PTS is no DTS) of two consecutive RTP
64  *     packets with different rtptime.
65  *
66  * - If seqnum N0 arrived at time T0 and seqnum Nm arrived at time Tm,
67  *     seqnum Ni is expected at time Ti = T0 + i*(Tm - T0)/(Nm - N0). Any
68  *     previously scheduled timeout is overwritten.
69  *
70  * - If seqnum N arrived, all seqnum older than
71  *     N - #GstRtpJitterBuffer:rtx-delay-reorder are considered late
72  *     immediately. This is to request fast feedback for abonormally reorder
73  *     packets before any of the previous timeouts is triggered.
74  *
75  * A late packet triggers the GstRTPRetransmissionRequest custom upstream
76  * event. After the initial timeout expires and the retransmission event is
77  * sent, the timeout is scheduled for
78  * T + #GstRtpJitterBuffer:rtx-retry-timeout. If the missing packet did not
79  * arrive after #GstRtpJitterBuffer:rtx-retry-timeout, a new
80  * GstRTPRetransmissionRequest is sent upstream and the timeout is rescheduled
81  * again for T + #GstRtpJitterBuffer:rtx-retry-timeout. This repeats until
82  * #GstRtpJitterBuffer:rtx-retry-period elapsed, at which point no further
83  * retransmission requests are sent and the regular logic is performed to
84  * schedule a lost packet as discussed above.
85  *
86  * This element acts as a live element and so adds #GstRtpJitterBuffer:latency
87  * to the pipeline.
88  *
89  * This element will automatically be used inside rtpbin.
90  *
91  * <refsect2>
92  * <title>Example pipelines</title>
93  * |[
94  * gst-launch-1.0 rtspsrc location=rtsp://192.168.1.133:8554/mpeg1or2AudioVideoTest ! rtpjitterbuffer ! rtpmpvdepay ! mpeg2dec ! xvimagesink
95  * ]| Connect to a streaming server and decode the MPEG video. The jitterbuffer is
96  * inserted into the pipeline to smooth out network jitter and to reorder the
97  * out-of-order RTP packets.
98  * </refsect2>
99  */
100
101 #ifdef HAVE_CONFIG_H
102 #include "config.h"
103 #endif
104
105 #include <stdlib.h>
106 #include <stdio.h>
107 #include <string.h>
108 #include <gst/rtp/gstrtpbuffer.h>
109 #include <gst/net/net.h>
110
111 #include "gstrtpjitterbuffer.h"
112 #include "rtpjitterbuffer.h"
113 #include "rtpstats.h"
114
115 #include <gst/glib-compat-private.h>
116
117 GST_DEBUG_CATEGORY (rtpjitterbuffer_debug);
118 #define GST_CAT_DEFAULT (rtpjitterbuffer_debug)
119
120 /* RTPJitterBuffer signals and args */
121 enum
122 {
123   SIGNAL_REQUEST_PT_MAP,
124   SIGNAL_CLEAR_PT_MAP,
125   SIGNAL_HANDLE_SYNC,
126   SIGNAL_ON_NPT_STOP,
127   SIGNAL_SET_ACTIVE,
128   LAST_SIGNAL
129 };
130
131 #define DEFAULT_LATENCY_MS          200
132 #define DEFAULT_DROP_ON_LATENCY     FALSE
133 #define DEFAULT_TS_OFFSET           0
134 #define DEFAULT_MAX_TS_OFFSET_ADJUSTMENT 0
135 #define DEFAULT_DO_LOST             FALSE
136 #define DEFAULT_MODE                RTP_JITTER_BUFFER_MODE_SLAVE
137 #define DEFAULT_PERCENT             0
138 #define DEFAULT_DO_RETRANSMISSION   FALSE
139 #define DEFAULT_RTX_NEXT_SEQNUM     TRUE
140 #define DEFAULT_RTX_DELAY           -1
141 #define DEFAULT_RTX_MIN_DELAY       0
142 #define DEFAULT_RTX_DELAY_REORDER   3
143 #define DEFAULT_RTX_RETRY_TIMEOUT   -1
144 #define DEFAULT_RTX_MIN_RETRY_TIMEOUT   -1
145 #define DEFAULT_RTX_RETRY_PERIOD    -1
146 #define DEFAULT_RTX_MAX_RETRIES    -1
147 #define DEFAULT_RTX_DEADLINE       -1
148 #define DEFAULT_RTX_STATS_TIMEOUT   1000
149 #define DEFAULT_MAX_RTCP_RTP_TIME_DIFF 1000
150 #define DEFAULT_MAX_DROPOUT_TIME    60000
151 #define DEFAULT_MAX_MISORDER_TIME   2000
152 #define DEFAULT_RFC7273_SYNC        FALSE
153 #define DEFAULT_FASTSTART_MIN_PACKETS 0
154
155 #define DEFAULT_AUTO_RTX_DELAY (20 * GST_MSECOND)
156 #define DEFAULT_AUTO_RTX_TIMEOUT (40 * GST_MSECOND)
157
158 enum
159 {
160   PROP_0,
161   PROP_LATENCY,
162   PROP_DROP_ON_LATENCY,
163   PROP_TS_OFFSET,
164   PROP_MAX_TS_OFFSET_ADJUSTMENT,
165   PROP_DO_LOST,
166   PROP_MODE,
167   PROP_PERCENT,
168   PROP_DO_RETRANSMISSION,
169   PROP_RTX_NEXT_SEQNUM,
170   PROP_RTX_DELAY,
171   PROP_RTX_MIN_DELAY,
172   PROP_RTX_DELAY_REORDER,
173   PROP_RTX_RETRY_TIMEOUT,
174   PROP_RTX_MIN_RETRY_TIMEOUT,
175   PROP_RTX_RETRY_PERIOD,
176   PROP_RTX_MAX_RETRIES,
177   PROP_RTX_DEADLINE,
178   PROP_RTX_STATS_TIMEOUT,
179   PROP_STATS,
180   PROP_MAX_RTCP_RTP_TIME_DIFF,
181   PROP_MAX_DROPOUT_TIME,
182   PROP_MAX_MISORDER_TIME,
183   PROP_RFC7273_SYNC,
184   PROP_FASTSTART_MIN_PACKETS
185 };
186
187 #define JBUF_LOCK(priv)   G_STMT_START {                        \
188     GST_TRACE("Locking from thread %p", g_thread_self());       \
189     (g_mutex_lock (&(priv)->jbuf_lock));                        \
190     GST_TRACE("Locked from thread %p", g_thread_self());        \
191   } G_STMT_END
192
193 #define JBUF_LOCK_CHECK(priv,label) G_STMT_START {    \
194   JBUF_LOCK (priv);                                   \
195   if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK))    \
196     goto label;                                       \
197 } G_STMT_END
198 #define JBUF_UNLOCK(priv) G_STMT_START {                        \
199     GST_TRACE ("Unlocking from thread %p", g_thread_self ());   \
200     (g_mutex_unlock (&(priv)->jbuf_lock));                      \
201 } G_STMT_END
202
203 #define JBUF_WAIT_TIMER(priv)   G_STMT_START {            \
204   GST_DEBUG ("waiting timer");                            \
205   (priv)->waiting_timer++;                                \
206   g_cond_wait (&(priv)->jbuf_timer, &(priv)->jbuf_lock);  \
207   (priv)->waiting_timer--;                                \
208   GST_DEBUG ("waiting timer done");                       \
209 } G_STMT_END
210 #define JBUF_SIGNAL_TIMER(priv) G_STMT_START {            \
211   if (G_UNLIKELY ((priv)->waiting_timer)) {               \
212     GST_DEBUG ("signal timer, %d waiters", (priv)->waiting_timer); \
213     g_cond_signal (&(priv)->jbuf_timer);                  \
214   }                                                       \
215 } G_STMT_END
216
217 #define JBUF_WAIT_EVENT(priv,label) G_STMT_START {       \
218   GST_DEBUG ("waiting event");                           \
219   (priv)->waiting_event = TRUE;                          \
220   g_cond_wait (&(priv)->jbuf_event, &(priv)->jbuf_lock); \
221   (priv)->waiting_event = FALSE;                         \
222   GST_DEBUG ("waiting event done");                      \
223   if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK))       \
224     goto label;                                          \
225 } G_STMT_END
226 #define JBUF_SIGNAL_EVENT(priv) G_STMT_START {           \
227   if (G_UNLIKELY ((priv)->waiting_event)) {              \
228     GST_DEBUG ("signal event");                          \
229     g_cond_signal (&(priv)->jbuf_event);                 \
230   }                                                      \
231 } G_STMT_END
232
233 #define JBUF_WAIT_QUERY(priv,label) G_STMT_START {       \
234   GST_DEBUG ("waiting query");                           \
235   (priv)->waiting_query = TRUE;                          \
236   g_cond_wait (&(priv)->jbuf_query, &(priv)->jbuf_lock); \
237   (priv)->waiting_query = FALSE;                         \
238   GST_DEBUG ("waiting query done");                      \
239   if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK))       \
240     goto label;                                          \
241 } G_STMT_END
242 #define JBUF_SIGNAL_QUERY(priv,res) G_STMT_START {       \
243   (priv)->last_query = res;                              \
244   if (G_UNLIKELY ((priv)->waiting_query)) {              \
245     GST_DEBUG ("signal query");                          \
246     g_cond_signal (&(priv)->jbuf_query);                 \
247   }                                                      \
248 } G_STMT_END
249
250 #define GST_BUFFER_IS_RETRANSMISSION(buffer) \
251   GST_BUFFER_FLAG_IS_SET (buffer, GST_RTP_BUFFER_FLAG_RETRANSMISSION)
252
253 typedef struct TimerQueue
254 {
255   GQueue *timers;
256   GHashTable *hashtable;
257 } TimerQueue;
258
259 struct _GstRtpJitterBufferPrivate
260 {
261   GstPad *sinkpad, *srcpad;
262   GstPad *rtcpsinkpad;
263
264   RTPJitterBuffer *jbuf;
265   GMutex jbuf_lock;
266   gboolean waiting_timer;
267   GCond jbuf_timer;
268   gboolean waiting_event;
269   GCond jbuf_event;
270   gboolean waiting_query;
271   GCond jbuf_query;
272   gboolean last_query;
273   gboolean discont;
274   gboolean ts_discont;
275   gboolean active;
276   guint64 out_offset;
277
278   gboolean timer_running;
279   GThread *timer_thread;
280
281   /* properties */
282   guint latency_ms;
283   guint64 latency_ns;
284   gboolean drop_on_latency;
285   gint64 ts_offset;
286   guint64 max_ts_offset_adjustment;
287   gboolean do_lost;
288   gboolean do_retransmission;
289   gboolean rtx_next_seqnum;
290   gint rtx_delay;
291   guint rtx_min_delay;
292   gint rtx_delay_reorder;
293   gint rtx_retry_timeout;
294   gint rtx_min_retry_timeout;
295   gint rtx_retry_period;
296   gint rtx_max_retries;
297   guint rtx_stats_timeout;
298   gint rtx_deadline_ms;
299   gint max_rtcp_rtp_time_diff;
300   guint32 max_dropout_time;
301   guint32 max_misorder_time;
302   guint faststart_min_packets;
303
304   /* the last seqnum we pushed out */
305   guint32 last_popped_seqnum;
306   /* the next expected seqnum we push */
307   guint32 next_seqnum;
308   /* seqnum-base, if known */
309   guint32 seqnum_base;
310   /* last output time */
311   GstClockTime last_out_time;
312   /* last valid input timestamp and rtptime pair */
313   GstClockTime ips_pts;
314   guint64 ips_rtptime;
315   GstClockTime packet_spacing;
316   gint equidistant;
317
318   GQueue gap_packets;
319
320   /* the next expected seqnum we receive */
321   GstClockTime last_in_pts;
322   guint32 next_in_seqnum;
323
324   GArray *timers;
325   TimerQueue *rtx_stats_timers;
326
327   /* start and stop ranges */
328   GstClockTime npt_start;
329   GstClockTime npt_stop;
330   guint64 ext_timestamp;
331   guint64 last_elapsed;
332   guint64 estimated_eos;
333   GstClockID eos_id;
334
335   /* state */
336   gboolean eos;
337   guint last_percent;
338
339   /* clock rate and rtp timestamp offset */
340   gint last_pt;
341   gint32 clock_rate;
342   gint64 clock_base;
343   gint64 ts_offset_remainder;
344
345   /* when we are shutting down */
346   GstFlowReturn srcresult;
347   gboolean blocked;
348
349   /* for sync */
350   GstSegment segment;
351   GstClockID clock_id;
352   GstClockTime timer_timeout;
353   guint16 timer_seqnum;
354   /* the latency of the upstream peer, we have to take this into account when
355    * synchronizing the buffers. */
356   GstClockTime peer_latency;
357   guint64 ext_rtptime;
358   GstBuffer *last_sr;
359
360   /* some accounting */
361   guint64 num_pushed;
362   guint64 num_lost;
363   guint64 num_late;
364   guint64 num_duplicates;
365   guint64 num_rtx_requests;
366   guint64 num_rtx_success;
367   guint64 num_rtx_failed;
368   gdouble avg_rtx_num;
369   guint64 avg_rtx_rtt;
370   RTPPacketRateCtx packet_rate_ctx;
371
372   /* for the jitter */
373   GstClockTime last_dts;
374   GstClockTime last_pts;
375   guint64 last_rtptime;
376   GstClockTime avg_jitter;
377 };
378
379 typedef enum
380 {
381   TIMER_TYPE_EXPECTED,
382   TIMER_TYPE_LOST,
383   TIMER_TYPE_DEADLINE,
384   TIMER_TYPE_EOS
385 } TimerType;
386
387 typedef struct
388 {
389   guint idx;
390   guint16 seqnum;
391   guint num;
392   TimerType type;
393   GstClockTime timeout;
394   GstClockTime duration;
395   GstClockTime rtx_base;
396   GstClockTime rtx_delay;
397   GstClockTime rtx_retry;
398   GstClockTime rtx_last;
399   guint num_rtx_retry;
400   guint num_rtx_received;
401 } TimerData;
402
403 static GstStaticPadTemplate gst_rtp_jitter_buffer_sink_template =
404 GST_STATIC_PAD_TEMPLATE ("sink",
405     GST_PAD_SINK,
406     GST_PAD_ALWAYS,
407     GST_STATIC_CAPS ("application/x-rtp"
408         /* "clock-rate = (int) [ 1, 2147483647 ], "
409          * "payload = (int) , "
410          * "encoding-name = (string) "
411          */ )
412     );
413
414 static GstStaticPadTemplate gst_rtp_jitter_buffer_sink_rtcp_template =
415 GST_STATIC_PAD_TEMPLATE ("sink_rtcp",
416     GST_PAD_SINK,
417     GST_PAD_REQUEST,
418     GST_STATIC_CAPS ("application/x-rtcp")
419     );
420
421 static GstStaticPadTemplate gst_rtp_jitter_buffer_src_template =
422 GST_STATIC_PAD_TEMPLATE ("src",
423     GST_PAD_SRC,
424     GST_PAD_ALWAYS,
425     GST_STATIC_CAPS ("application/x-rtp"
426         /* "payload = (int) , "
427          * "clock-rate = (int) , "
428          * "encoding-name = (string) "
429          */ )
430     );
431
432 static guint gst_rtp_jitter_buffer_signals[LAST_SIGNAL] = { 0 };
433
434 #define gst_rtp_jitter_buffer_parent_class parent_class
435 G_DEFINE_TYPE_WITH_PRIVATE (GstRtpJitterBuffer, gst_rtp_jitter_buffer,
436     GST_TYPE_ELEMENT);
437
438 /* object overrides */
439 static void gst_rtp_jitter_buffer_set_property (GObject * object,
440     guint prop_id, const GValue * value, GParamSpec * pspec);
441 static void gst_rtp_jitter_buffer_get_property (GObject * object,
442     guint prop_id, GValue * value, GParamSpec * pspec);
443 static void gst_rtp_jitter_buffer_finalize (GObject * object);
444
445 /* element overrides */
446 static GstStateChangeReturn gst_rtp_jitter_buffer_change_state (GstElement
447     * element, GstStateChange transition);
448 static GstPad *gst_rtp_jitter_buffer_request_new_pad (GstElement * element,
449     GstPadTemplate * templ, const gchar * name, const GstCaps * filter);
450 static void gst_rtp_jitter_buffer_release_pad (GstElement * element,
451     GstPad * pad);
452 static GstClock *gst_rtp_jitter_buffer_provide_clock (GstElement * element);
453 static gboolean gst_rtp_jitter_buffer_set_clock (GstElement * element,
454     GstClock * clock);
455
456 /* pad overrides */
457 static GstCaps *gst_rtp_jitter_buffer_getcaps (GstPad * pad, GstCaps * filter);
458 static GstIterator *gst_rtp_jitter_buffer_iterate_internal_links (GstPad * pad,
459     GstObject * parent);
460
461 /* sinkpad overrides */
462 static gboolean gst_rtp_jitter_buffer_sink_event (GstPad * pad,
463     GstObject * parent, GstEvent * event);
464 static GstFlowReturn gst_rtp_jitter_buffer_chain (GstPad * pad,
465     GstObject * parent, GstBuffer * buffer);
466 static GstFlowReturn gst_rtp_jitter_buffer_chain_list (GstPad * pad,
467     GstObject * parent, GstBufferList * buffer_list);
468
469 static gboolean gst_rtp_jitter_buffer_sink_rtcp_event (GstPad * pad,
470     GstObject * parent, GstEvent * event);
471 static GstFlowReturn gst_rtp_jitter_buffer_chain_rtcp (GstPad * pad,
472     GstObject * parent, GstBuffer * buffer);
473
474 static gboolean gst_rtp_jitter_buffer_sink_query (GstPad * pad,
475     GstObject * parent, GstQuery * query);
476
477 /* srcpad overrides */
478 static gboolean gst_rtp_jitter_buffer_src_event (GstPad * pad,
479     GstObject * parent, GstEvent * event);
480 static gboolean gst_rtp_jitter_buffer_src_activate_mode (GstPad * pad,
481     GstObject * parent, GstPadMode mode, gboolean active);
482 static void gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer);
483 static gboolean gst_rtp_jitter_buffer_src_query (GstPad * pad,
484     GstObject * parent, GstQuery * query);
485
486 static void
487 gst_rtp_jitter_buffer_clear_pt_map (GstRtpJitterBuffer * jitterbuffer);
488 static GstClockTime
489 gst_rtp_jitter_buffer_set_active (GstRtpJitterBuffer * jitterbuffer,
490     gboolean active, guint64 base_time);
491 static void do_handle_sync (GstRtpJitterBuffer * jitterbuffer);
492
493 static void unschedule_current_timer (GstRtpJitterBuffer * jitterbuffer);
494 static void remove_all_timers (GstRtpJitterBuffer * jitterbuffer);
495
496 static void wait_next_timeout (GstRtpJitterBuffer * jitterbuffer);
497
498 static GstStructure *gst_rtp_jitter_buffer_create_stats (GstRtpJitterBuffer *
499     jitterbuffer);
500
501 static void update_rtx_stats (GstRtpJitterBuffer * jitterbuffer,
502     TimerData * timer, GstClockTime dts, gboolean success);
503
504 static TimerQueue *timer_queue_new (void);
505 static void timer_queue_free (TimerQueue * queue);
506
507 static void
508 gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass)
509 {
510   GObjectClass *gobject_class;
511   GstElementClass *gstelement_class;
512
513   gobject_class = (GObjectClass *) klass;
514   gstelement_class = (GstElementClass *) klass;
515
516   gobject_class->finalize = gst_rtp_jitter_buffer_finalize;
517
518   gobject_class->set_property = gst_rtp_jitter_buffer_set_property;
519   gobject_class->get_property = gst_rtp_jitter_buffer_get_property;
520
521   /**
522    * GstRtpJitterBuffer:latency:
523    *
524    * The maximum latency of the jitterbuffer. Packets will be kept in the buffer
525    * for at most this time.
526    */
527   g_object_class_install_property (gobject_class, PROP_LATENCY,
528       g_param_spec_uint ("latency", "Buffer latency in ms",
529           "Amount of ms to buffer", 0, G_MAXUINT, DEFAULT_LATENCY_MS,
530           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
531   /**
532    * GstRtpJitterBuffer:drop-on-latency:
533    *
534    * Drop oldest buffers when the queue is completely filled.
535    */
536   g_object_class_install_property (gobject_class, PROP_DROP_ON_LATENCY,
537       g_param_spec_boolean ("drop-on-latency",
538           "Drop buffers when maximum latency is reached",
539           "Tells the jitterbuffer to never exceed the given latency in size",
540           DEFAULT_DROP_ON_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
541   /**
542    * GstRtpJitterBuffer:ts-offset:
543    *
544    * Adjust GStreamer output buffer timestamps in the jitterbuffer with offset.
545    * This is mainly used to ensure interstream synchronisation.
546    */
547   g_object_class_install_property (gobject_class, PROP_TS_OFFSET,
548       g_param_spec_int64 ("ts-offset", "Timestamp Offset",
549           "Adjust buffer timestamps with offset in nanoseconds", G_MININT64,
550           G_MAXINT64, DEFAULT_TS_OFFSET,
551           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
552
553   /**
554    * GstRtpJitterBuffer:max-ts-offset-adjustment:
555    *
556    * The maximum number of nanoseconds per frame that time offset may be
557    * adjusted with. This is used to avoid sudden large changes to time stamps.
558    */
559   g_object_class_install_property (gobject_class, PROP_MAX_TS_OFFSET_ADJUSTMENT,
560       g_param_spec_uint64 ("max-ts-offset-adjustment",
561           "Max Timestamp Offset Adjustment",
562           "The maximum number of nanoseconds per frame that time stamp "
563           "offsets may be adjusted (0 = no limit).", 0, G_MAXUINT64,
564           DEFAULT_MAX_TS_OFFSET_ADJUSTMENT, G_PARAM_READWRITE |
565           G_PARAM_STATIC_STRINGS));
566
567   /**
568    * GstRtpJitterBuffer:do-lost:
569    *
570    * Send out a GstRTPPacketLost event downstream when a packet is considered
571    * lost.
572    */
573   g_object_class_install_property (gobject_class, PROP_DO_LOST,
574       g_param_spec_boolean ("do-lost", "Do Lost",
575           "Send an event downstream when a packet is lost", DEFAULT_DO_LOST,
576           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
577
578   /**
579    * GstRtpJitterBuffer:mode:
580    *
581    * Control the buffering and timestamping mode used by the jitterbuffer.
582    */
583   g_object_class_install_property (gobject_class, PROP_MODE,
584       g_param_spec_enum ("mode", "Mode",
585           "Control the buffering algorithm in use", RTP_TYPE_JITTER_BUFFER_MODE,
586           DEFAULT_MODE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
587   /**
588    * GstRtpJitterBuffer:percent:
589    *
590    * The percent of the jitterbuffer that is filled.
591    */
592   g_object_class_install_property (gobject_class, PROP_PERCENT,
593       g_param_spec_int ("percent", "percent",
594           "The buffer filled percent", 0, 100,
595           0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
596   /**
597    * GstRtpJitterBuffer:do-retransmission:
598    *
599    * Send out a GstRTPRetransmission event upstream when a packet is considered
600    * late and should be retransmitted.
601    *
602    * Since: 1.2
603    */
604   g_object_class_install_property (gobject_class, PROP_DO_RETRANSMISSION,
605       g_param_spec_boolean ("do-retransmission", "Do Retransmission",
606           "Send retransmission events upstream when a packet is late",
607           DEFAULT_DO_RETRANSMISSION,
608           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
609
610   /**
611    * GstRtpJitterBuffer:rtx-next-seqnum
612    *
613    * Estimate when the next packet should arrive and schedule a retransmission
614    * request for it.
615    * This is, when packet N arrives, a GstRTPRetransmission event is schedule
616    * for packet N+1. So it will be requested if it does not arrive at the expected time.
617    * The expected time is calculated using the dts of N and the packet spacing.
618    *
619    * Since: 1.6
620    */
621   g_object_class_install_property (gobject_class, PROP_RTX_NEXT_SEQNUM,
622       g_param_spec_boolean ("rtx-next-seqnum", "RTX next seqnum",
623           "Estimate when the next packet should arrive and schedule a "
624           "retransmission request for it.",
625           DEFAULT_RTX_NEXT_SEQNUM, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
626
627   /**
628    * GstRtpJitterBuffer:rtx-delay:
629    *
630    * When a packet did not arrive at the expected time, wait this extra amount
631    * of time before sending a retransmission event.
632    *
633    * When -1 is used, the max jitter will be used as extra delay.
634    *
635    * Since: 1.2
636    */
637   g_object_class_install_property (gobject_class, PROP_RTX_DELAY,
638       g_param_spec_int ("rtx-delay", "RTX Delay",
639           "Extra time in ms to wait before sending retransmission "
640           "event (-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_DELAY,
641           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
642
643   /**
644    * GstRtpJitterBuffer:rtx-min-delay:
645    *
646    * When a packet did not arrive at the expected time, wait at least this extra amount
647    * of time before sending a retransmission event.
648    *
649    * Since: 1.6
650    */
651   g_object_class_install_property (gobject_class, PROP_RTX_MIN_DELAY,
652       g_param_spec_uint ("rtx-min-delay", "Minimum RTX Delay",
653           "Minimum time in ms to wait before sending retransmission "
654           "event", 0, G_MAXUINT, DEFAULT_RTX_MIN_DELAY,
655           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
656   /**
657    * GstRtpJitterBuffer:rtx-delay-reorder:
658    *
659    * Assume that a retransmission event should be sent when we see
660    * this much packet reordering.
661    *
662    * When -1 is used, the value will be estimated based on observed packet
663    * reordering. When 0 is used packet reordering alone will not cause a
664    * retransmission event (Since 1.10).
665    *
666    * Since: 1.2
667    */
668   g_object_class_install_property (gobject_class, PROP_RTX_DELAY_REORDER,
669       g_param_spec_int ("rtx-delay-reorder", "RTX Delay Reorder",
670           "Sending retransmission event when this much reordering "
671           "(0 disable, -1 automatic)",
672           -1, G_MAXINT, DEFAULT_RTX_DELAY_REORDER,
673           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
674   /**
675    * GstRtpJitterBuffer::rtx-retry-timeout:
676    *
677    * When no packet has been received after sending a retransmission event
678    * for this time, retry sending a retransmission event.
679    *
680    * When -1 is used, the value will be estimated based on observed round
681    * trip time.
682    *
683    * Since: 1.2
684    */
685   g_object_class_install_property (gobject_class, PROP_RTX_RETRY_TIMEOUT,
686       g_param_spec_int ("rtx-retry-timeout", "RTX Retry Timeout",
687           "Retry sending a transmission event after this timeout in "
688           "ms (-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_RETRY_TIMEOUT,
689           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
690   /**
691    * GstRtpJitterBuffer::rtx-min-retry-timeout:
692    *
693    * The minimum amount of time between retry timeouts. When
694    * GstRtpJitterBuffer::rtx-retry-timeout is -1, this value ensures a
695    * minimum interval between retry timeouts.
696    *
697    * When -1 is used, the value will be estimated based on the
698    * packet spacing.
699    *
700    * Since: 1.6
701    */
702   g_object_class_install_property (gobject_class, PROP_RTX_MIN_RETRY_TIMEOUT,
703       g_param_spec_int ("rtx-min-retry-timeout", "RTX Min Retry Timeout",
704           "Minimum timeout between sending a transmission event in "
705           "ms (-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_MIN_RETRY_TIMEOUT,
706           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
707   /**
708    * GstRtpJitterBuffer:rtx-retry-period:
709    *
710    * The amount of time to try to get a retransmission.
711    *
712    * When -1 is used, the value will be estimated based on the jitterbuffer
713    * latency and the observed round trip time.
714    *
715    * Since: 1.2
716    */
717   g_object_class_install_property (gobject_class, PROP_RTX_RETRY_PERIOD,
718       g_param_spec_int ("rtx-retry-period", "RTX Retry Period",
719           "Try to get a retransmission for this many ms "
720           "(-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_RETRY_PERIOD,
721           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
722   /**
723    * GstRtpJitterBuffer:rtx-max-retries:
724    *
725    * The maximum number of retries to request a retransmission.
726    *
727    * This implies that as maximum (rtx-max-retries + 1) retransmissions will be requested.
728    * When -1 is used, the number of retransmission request will not be limited.
729    *
730    * Since: 1.6
731    */
732   g_object_class_install_property (gobject_class, PROP_RTX_MAX_RETRIES,
733       g_param_spec_int ("rtx-max-retries", "RTX Max Retries",
734           "The maximum number of retries to request a retransmission. "
735           "(-1 not limited)", -1, G_MAXINT, DEFAULT_RTX_MAX_RETRIES,
736           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
737   /**
738    * GstRtpJitterBuffer:rtx-deadline:
739    *
740    * The deadline for a valid RTX request in ms.
741    *
742    * How long the RTX RTCP will be valid for.
743    * When -1 is used, the size of the jitterbuffer will be used.
744    *
745    * Since: 1.10
746    */
747   g_object_class_install_property (gobject_class, PROP_RTX_DEADLINE,
748       g_param_spec_int ("rtx-deadline", "RTX Deadline (ms)",
749           "The deadline for a valid RTX request in milliseconds. "
750           "(-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_DEADLINE,
751           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
752 /**
753    * GstRtpJitterBuffer::rtx-stats-timeout:
754    *
755    * The time to wait for a retransmitted packet after it has been
756    * considered lost in order to collect RTX statistics.
757    *
758    * Since: 1.10
759    */
760   g_object_class_install_property (gobject_class, PROP_RTX_STATS_TIMEOUT,
761       g_param_spec_uint ("rtx-stats-timeout", "RTX Statistics Timeout",
762           "The time to wait for a retransmitted packet after it has been "
763           "considered lost in order to collect statistics (ms)",
764           0, G_MAXUINT, DEFAULT_RTX_STATS_TIMEOUT,
765           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
766
767   g_object_class_install_property (gobject_class, PROP_MAX_DROPOUT_TIME,
768       g_param_spec_uint ("max-dropout-time", "Max dropout time",
769           "The maximum time (milliseconds) of missing packets tolerated.",
770           0, G_MAXUINT, DEFAULT_MAX_DROPOUT_TIME,
771           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
772
773   g_object_class_install_property (gobject_class, PROP_MAX_MISORDER_TIME,
774       g_param_spec_uint ("max-misorder-time", "Max misorder time",
775           "The maximum time (milliseconds) of misordered packets tolerated.",
776           0, G_MAXUINT, DEFAULT_MAX_MISORDER_TIME,
777           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
778   /**
779    * GstRtpJitterBuffer:stats:
780    *
781    * Various jitterbuffer statistics. This property returns a GstStructure
782    * with name application/x-rtp-jitterbuffer-stats with the following fields:
783    *
784    * <itemizedlist>
785    * <listitem>
786    *   <para>
787    *   #guint64
788    *   <classname>&quot;num-pushed&quot;</classname>:
789    *   the number of packets pushed out.
790    *   </para>
791    * </listitem>
792    * <listitem>
793    *   <para>
794    *   #guint64
795    *   <classname>&quot;num-lost&quot;</classname>:
796    *   the number of packets considered lost.
797    *   </para>
798    * </listitem>
799    * <listitem>
800    *   <para>
801    *   #guint64
802    *   <classname>&quot;num-late&quot;</classname>:
803    *   the number of packets arriving too late.
804    *   </para>
805    * </listitem>
806    * <listitem>
807    *   <para>
808    *   #guint64
809    *   <classname>&quot;num-duplicates&quot;</classname>:
810    *   the number of duplicate packets.
811    *   </para>
812    * </listitem>
813    * <listitem>
814    *   <para>
815    *   #guint64
816    *   <classname>&quot;rtx-count&quot;</classname>:
817    *   the number of retransmissions requested.
818    *   </para>
819    * </listitem>
820    * <listitem>
821    *   <para>
822    *   #guint64
823    *   <classname>&quot;rtx-success-count&quot;</classname>:
824    *   the number of successful retransmissions.
825    *   </para>
826    * </listitem>
827    * <listitem>
828    *   <para>
829    *   #gdouble
830    *   <classname>&quot;rtx-per-packet&quot;</classname>:
831    *   average number of RTX per packet.
832    *   </para>
833    * </listitem>
834    * <listitem>
835    *   <para>
836    *   #guint64
837    *   <classname>&quot;rtx-rtt&quot;</classname>:
838    *   average round trip time per RTX.
839    *   </para>
840    * </listitem>
841    * </itemizedlist>
842    *
843    * Since: 1.4
844    */
845   g_object_class_install_property (gobject_class, PROP_STATS,
846       g_param_spec_boxed ("stats", "Statistics",
847           "Various statistics", GST_TYPE_STRUCTURE,
848           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
849
850   /**
851    * GstRtpJitterBuffer:max-rtcp-rtp-time-diff
852    *
853    * The maximum amount of time in ms that the RTP time in the RTCP SRs
854    * is allowed to be ahead of the last RTP packet we received. Use
855    * -1 to disable ignoring of RTCP packets.
856    *
857    * Since: 1.8
858    */
859   g_object_class_install_property (gobject_class, PROP_MAX_RTCP_RTP_TIME_DIFF,
860       g_param_spec_int ("max-rtcp-rtp-time-diff", "Max RTCP RTP Time Diff",
861           "Maximum amount of time in ms that the RTP time in RTCP SRs "
862           "is allowed to be ahead (-1 disabled)", -1, G_MAXINT,
863           DEFAULT_MAX_RTCP_RTP_TIME_DIFF,
864           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
865
866   g_object_class_install_property (gobject_class, PROP_RFC7273_SYNC,
867       g_param_spec_boolean ("rfc7273-sync", "Sync on RFC7273 clock",
868           "Synchronize received streams to the RFC7273 clock "
869           "(requires clock and offset to be provided)", DEFAULT_RFC7273_SYNC,
870           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
871
872   /**
873    * GstRtpJitterBuffer:faststart-min-packets
874    *
875    * The number of consecutive packets needed to start (set to 0 to
876    * disable faststart. The jitterbuffer will by default start after the
877    * latency has elapsed)
878    *
879    * Since: 1.14
880    */
881   g_object_class_install_property (gobject_class, PROP_FASTSTART_MIN_PACKETS,
882       g_param_spec_uint ("faststart-min-packets", "Faststart minimum packets",
883           "The number of consecutive packets needed to start (set to 0 to "
884           "disable faststart. The jitterbuffer will by default start after "
885           "the latency has elapsed)",
886           0, G_MAXUINT, DEFAULT_FASTSTART_MIN_PACKETS,
887           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
888
889   /**
890    * GstRtpJitterBuffer::request-pt-map:
891    * @buffer: the object which received the signal
892    * @pt: the pt
893    *
894    * Request the payload type as #GstCaps for @pt.
895    */
896   gst_rtp_jitter_buffer_signals[SIGNAL_REQUEST_PT_MAP] =
897       g_signal_new ("request-pt-map", G_TYPE_FROM_CLASS (klass),
898       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
899           request_pt_map), NULL, NULL, g_cclosure_marshal_generic,
900       GST_TYPE_CAPS, 1, G_TYPE_UINT);
901   /**
902    * GstRtpJitterBuffer::handle-sync:
903    * @buffer: the object which received the signal
904    * @struct: a GstStructure containing sync values.
905    *
906    * Be notified of new sync values.
907    */
908   gst_rtp_jitter_buffer_signals[SIGNAL_HANDLE_SYNC] =
909       g_signal_new ("handle-sync", G_TYPE_FROM_CLASS (klass),
910       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
911           handle_sync), NULL, NULL, g_cclosure_marshal_VOID__BOXED,
912       G_TYPE_NONE, 1, GST_TYPE_STRUCTURE | G_SIGNAL_TYPE_STATIC_SCOPE);
913
914   /**
915    * GstRtpJitterBuffer::on-npt-stop:
916    * @buffer: the object which received the signal
917    *
918    * Signal that the jitterbufer has pushed the RTP packet that corresponds to
919    * the npt-stop position.
920    */
921   gst_rtp_jitter_buffer_signals[SIGNAL_ON_NPT_STOP] =
922       g_signal_new ("on-npt-stop", G_TYPE_FROM_CLASS (klass),
923       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
924           on_npt_stop), NULL, NULL, g_cclosure_marshal_VOID__VOID,
925       G_TYPE_NONE, 0, G_TYPE_NONE);
926
927   /**
928    * GstRtpJitterBuffer::clear-pt-map:
929    * @buffer: the object which received the signal
930    *
931    * Invalidate the clock-rate as obtained with the
932    * #GstRtpJitterBuffer::request-pt-map signal.
933    */
934   gst_rtp_jitter_buffer_signals[SIGNAL_CLEAR_PT_MAP] =
935       g_signal_new ("clear-pt-map", G_TYPE_FROM_CLASS (klass),
936       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
937       G_STRUCT_OFFSET (GstRtpJitterBufferClass, clear_pt_map), NULL, NULL,
938       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
939
940   /**
941    * GstRtpJitterBuffer::set-active:
942    * @buffer: the object which received the signal
943    *
944    * Start pushing out packets with the given base time. This signal is only
945    * useful in buffering mode.
946    *
947    * Returns: the time of the last pushed packet.
948    */
949   gst_rtp_jitter_buffer_signals[SIGNAL_SET_ACTIVE] =
950       g_signal_new ("set-active", G_TYPE_FROM_CLASS (klass),
951       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
952       G_STRUCT_OFFSET (GstRtpJitterBufferClass, set_active), NULL, NULL,
953       g_cclosure_marshal_generic, G_TYPE_UINT64, 2, G_TYPE_BOOLEAN,
954       G_TYPE_UINT64);
955
956   gstelement_class->change_state =
957       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_change_state);
958   gstelement_class->request_new_pad =
959       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_request_new_pad);
960   gstelement_class->release_pad =
961       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_release_pad);
962   gstelement_class->provide_clock =
963       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_provide_clock);
964   gstelement_class->set_clock =
965       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_set_clock);
966
967   gst_element_class_add_static_pad_template (gstelement_class,
968       &gst_rtp_jitter_buffer_src_template);
969   gst_element_class_add_static_pad_template (gstelement_class,
970       &gst_rtp_jitter_buffer_sink_template);
971   gst_element_class_add_static_pad_template (gstelement_class,
972       &gst_rtp_jitter_buffer_sink_rtcp_template);
973
974   gst_element_class_set_static_metadata (gstelement_class,
975       "RTP packet jitter-buffer", "Filter/Network/RTP",
976       "A buffer that deals with network jitter and other transmission faults",
977       "Philippe Kalaf <philippe.kalaf@collabora.co.uk>, "
978       "Wim Taymans <wim.taymans@gmail.com>");
979
980   klass->clear_pt_map = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_clear_pt_map);
981   klass->set_active = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_set_active);
982
983   GST_DEBUG_CATEGORY_INIT
984       (rtpjitterbuffer_debug, "rtpjitterbuffer", 0, "RTP Jitter Buffer");
985 }
986
987 static void
988 gst_rtp_jitter_buffer_init (GstRtpJitterBuffer * jitterbuffer)
989 {
990   GstRtpJitterBufferPrivate *priv;
991
992   priv = gst_rtp_jitter_buffer_get_instance_private (jitterbuffer);
993   jitterbuffer->priv = priv;
994
995   priv->latency_ms = DEFAULT_LATENCY_MS;
996   priv->latency_ns = priv->latency_ms * GST_MSECOND;
997   priv->drop_on_latency = DEFAULT_DROP_ON_LATENCY;
998   priv->ts_offset = DEFAULT_TS_OFFSET;
999   priv->max_ts_offset_adjustment = DEFAULT_MAX_TS_OFFSET_ADJUSTMENT;
1000   priv->do_lost = DEFAULT_DO_LOST;
1001   priv->do_retransmission = DEFAULT_DO_RETRANSMISSION;
1002   priv->rtx_next_seqnum = DEFAULT_RTX_NEXT_SEQNUM;
1003   priv->rtx_delay = DEFAULT_RTX_DELAY;
1004   priv->rtx_min_delay = DEFAULT_RTX_MIN_DELAY;
1005   priv->rtx_delay_reorder = DEFAULT_RTX_DELAY_REORDER;
1006   priv->rtx_retry_timeout = DEFAULT_RTX_RETRY_TIMEOUT;
1007   priv->rtx_min_retry_timeout = DEFAULT_RTX_MIN_RETRY_TIMEOUT;
1008   priv->rtx_retry_period = DEFAULT_RTX_RETRY_PERIOD;
1009   priv->rtx_max_retries = DEFAULT_RTX_MAX_RETRIES;
1010   priv->rtx_deadline_ms = DEFAULT_RTX_DEADLINE;
1011   priv->rtx_stats_timeout = DEFAULT_RTX_STATS_TIMEOUT;
1012   priv->max_rtcp_rtp_time_diff = DEFAULT_MAX_RTCP_RTP_TIME_DIFF;
1013   priv->max_dropout_time = DEFAULT_MAX_DROPOUT_TIME;
1014   priv->max_misorder_time = DEFAULT_MAX_MISORDER_TIME;
1015   priv->faststart_min_packets = DEFAULT_FASTSTART_MIN_PACKETS;
1016
1017   priv->ts_offset_remainder = 0;
1018   priv->last_dts = -1;
1019   priv->last_pts = -1;
1020   priv->last_rtptime = -1;
1021   priv->avg_jitter = 0;
1022   priv->timers = g_array_new (FALSE, TRUE, sizeof (TimerData));
1023   priv->rtx_stats_timers = timer_queue_new ();
1024   priv->jbuf = rtp_jitter_buffer_new ();
1025   g_mutex_init (&priv->jbuf_lock);
1026   g_cond_init (&priv->jbuf_timer);
1027   g_cond_init (&priv->jbuf_event);
1028   g_cond_init (&priv->jbuf_query);
1029   g_queue_init (&priv->gap_packets);
1030   gst_segment_init (&priv->segment, GST_FORMAT_TIME);
1031
1032   /* reset skew detection initialy */
1033   rtp_jitter_buffer_reset_skew (priv->jbuf);
1034   rtp_jitter_buffer_set_delay (priv->jbuf, priv->latency_ns);
1035   rtp_jitter_buffer_set_buffering (priv->jbuf, FALSE);
1036   priv->active = TRUE;
1037
1038   priv->srcpad =
1039       gst_pad_new_from_static_template (&gst_rtp_jitter_buffer_src_template,
1040       "src");
1041
1042   gst_pad_set_activatemode_function (priv->srcpad,
1043       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_activate_mode));
1044   gst_pad_set_query_function (priv->srcpad,
1045       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_query));
1046   gst_pad_set_event_function (priv->srcpad,
1047       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_event));
1048
1049   priv->sinkpad =
1050       gst_pad_new_from_static_template (&gst_rtp_jitter_buffer_sink_template,
1051       "sink");
1052
1053   gst_pad_set_chain_function (priv->sinkpad,
1054       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_chain));
1055   gst_pad_set_chain_list_function (priv->sinkpad,
1056       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_chain_list));
1057   gst_pad_set_event_function (priv->sinkpad,
1058       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_sink_event));
1059   gst_pad_set_query_function (priv->sinkpad,
1060       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_sink_query));
1061
1062   gst_element_add_pad (GST_ELEMENT (jitterbuffer), priv->srcpad);
1063   gst_element_add_pad (GST_ELEMENT (jitterbuffer), priv->sinkpad);
1064
1065   GST_OBJECT_FLAG_SET (jitterbuffer, GST_ELEMENT_FLAG_PROVIDE_CLOCK);
1066 }
1067
1068 #define IS_DROPABLE(it) (((it)->type == ITEM_TYPE_BUFFER) || ((it)->type == ITEM_TYPE_LOST))
1069
1070 #define ITEM_TYPE_BUFFER        0
1071 #define ITEM_TYPE_LOST          1
1072 #define ITEM_TYPE_EVENT         2
1073 #define ITEM_TYPE_QUERY         3
1074
1075 static RTPJitterBufferItem *
1076 alloc_item (gpointer data, guint type, GstClockTime dts, GstClockTime pts,
1077     guint seqnum, guint count, guint rtptime)
1078 {
1079   RTPJitterBufferItem *item;
1080
1081   item = g_slice_new (RTPJitterBufferItem);
1082   item->data = data;
1083   item->next = NULL;
1084   item->prev = NULL;
1085   item->type = type;
1086   item->dts = dts;
1087   item->pts = pts;
1088   item->seqnum = seqnum;
1089   item->count = count;
1090   item->rtptime = rtptime;
1091
1092   return item;
1093 }
1094
1095 static void
1096 free_item (RTPJitterBufferItem * item)
1097 {
1098   g_return_if_fail (item != NULL);
1099
1100   if (item->data && item->type != ITEM_TYPE_QUERY)
1101     gst_mini_object_unref (item->data);
1102   g_slice_free (RTPJitterBufferItem, item);
1103 }
1104
1105 static void
1106 free_item_and_retain_events (RTPJitterBufferItem * item, gpointer user_data)
1107 {
1108   GList **l = user_data;
1109
1110   if (item->data && item->type == ITEM_TYPE_EVENT
1111       && GST_EVENT_IS_STICKY (item->data)) {
1112     *l = g_list_prepend (*l, item->data);
1113   } else if (item->data && item->type != ITEM_TYPE_QUERY) {
1114     gst_mini_object_unref (item->data);
1115   }
1116   g_slice_free (RTPJitterBufferItem, item);
1117 }
1118
1119 static void
1120 gst_rtp_jitter_buffer_finalize (GObject * object)
1121 {
1122   GstRtpJitterBuffer *jitterbuffer;
1123   GstRtpJitterBufferPrivate *priv;
1124
1125   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
1126   priv = jitterbuffer->priv;
1127
1128   g_array_free (priv->timers, TRUE);
1129   timer_queue_free (priv->rtx_stats_timers);
1130   g_mutex_clear (&priv->jbuf_lock);
1131   g_cond_clear (&priv->jbuf_timer);
1132   g_cond_clear (&priv->jbuf_event);
1133   g_cond_clear (&priv->jbuf_query);
1134
1135   rtp_jitter_buffer_flush (priv->jbuf, (GFunc) free_item, NULL);
1136   g_queue_foreach (&priv->gap_packets, (GFunc) gst_buffer_unref, NULL);
1137   g_queue_clear (&priv->gap_packets);
1138   g_object_unref (priv->jbuf);
1139
1140   G_OBJECT_CLASS (parent_class)->finalize (object);
1141 }
1142
1143 static GstIterator *
1144 gst_rtp_jitter_buffer_iterate_internal_links (GstPad * pad, GstObject * parent)
1145 {
1146   GstRtpJitterBuffer *jitterbuffer;
1147   GstPad *otherpad = NULL;
1148   GstIterator *it = NULL;
1149   GValue val = { 0, };
1150
1151   jitterbuffer = GST_RTP_JITTER_BUFFER_CAST (parent);
1152
1153   if (pad == jitterbuffer->priv->sinkpad) {
1154     otherpad = jitterbuffer->priv->srcpad;
1155   } else if (pad == jitterbuffer->priv->srcpad) {
1156     otherpad = jitterbuffer->priv->sinkpad;
1157   } else if (pad == jitterbuffer->priv->rtcpsinkpad) {
1158     it = gst_iterator_new_single (GST_TYPE_PAD, NULL);
1159   }
1160
1161   if (it == NULL) {
1162     g_value_init (&val, GST_TYPE_PAD);
1163     g_value_set_object (&val, otherpad);
1164     it = gst_iterator_new_single (GST_TYPE_PAD, &val);
1165     g_value_unset (&val);
1166   }
1167
1168   return it;
1169 }
1170
1171 static GstPad *
1172 create_rtcp_sink (GstRtpJitterBuffer * jitterbuffer)
1173 {
1174   GstRtpJitterBufferPrivate *priv;
1175
1176   priv = jitterbuffer->priv;
1177
1178   GST_DEBUG_OBJECT (jitterbuffer, "creating RTCP sink pad");
1179
1180   priv->rtcpsinkpad =
1181       gst_pad_new_from_static_template
1182       (&gst_rtp_jitter_buffer_sink_rtcp_template, "sink_rtcp");
1183   gst_pad_set_chain_function (priv->rtcpsinkpad,
1184       gst_rtp_jitter_buffer_chain_rtcp);
1185   gst_pad_set_event_function (priv->rtcpsinkpad,
1186       (GstPadEventFunction) gst_rtp_jitter_buffer_sink_rtcp_event);
1187   gst_pad_set_iterate_internal_links_function (priv->rtcpsinkpad,
1188       gst_rtp_jitter_buffer_iterate_internal_links);
1189   gst_pad_set_active (priv->rtcpsinkpad, TRUE);
1190   gst_element_add_pad (GST_ELEMENT_CAST (jitterbuffer), priv->rtcpsinkpad);
1191
1192   return priv->rtcpsinkpad;
1193 }
1194
1195 static void
1196 remove_rtcp_sink (GstRtpJitterBuffer * jitterbuffer)
1197 {
1198   GstRtpJitterBufferPrivate *priv;
1199
1200   priv = jitterbuffer->priv;
1201
1202   GST_DEBUG_OBJECT (jitterbuffer, "removing RTCP sink pad");
1203
1204   gst_pad_set_active (priv->rtcpsinkpad, FALSE);
1205
1206   gst_element_remove_pad (GST_ELEMENT_CAST (jitterbuffer), priv->rtcpsinkpad);
1207   priv->rtcpsinkpad = NULL;
1208 }
1209
1210 static GstPad *
1211 gst_rtp_jitter_buffer_request_new_pad (GstElement * element,
1212     GstPadTemplate * templ, const gchar * name, const GstCaps * filter)
1213 {
1214   GstRtpJitterBuffer *jitterbuffer;
1215   GstElementClass *klass;
1216   GstPad *result;
1217   GstRtpJitterBufferPrivate *priv;
1218
1219   g_return_val_if_fail (templ != NULL, NULL);
1220   g_return_val_if_fail (GST_IS_RTP_JITTER_BUFFER (element), NULL);
1221
1222   jitterbuffer = GST_RTP_JITTER_BUFFER_CAST (element);
1223   priv = jitterbuffer->priv;
1224   klass = GST_ELEMENT_GET_CLASS (element);
1225
1226   GST_DEBUG_OBJECT (element, "requesting pad %s", GST_STR_NULL (name));
1227
1228   /* figure out the template */
1229   if (templ == gst_element_class_get_pad_template (klass, "sink_rtcp")) {
1230     if (priv->rtcpsinkpad != NULL)
1231       goto exists;
1232
1233     result = create_rtcp_sink (jitterbuffer);
1234   } else
1235     goto wrong_template;
1236
1237   return result;
1238
1239   /* ERRORS */
1240 wrong_template:
1241   {
1242     g_warning ("rtpjitterbuffer: this is not our template");
1243     return NULL;
1244   }
1245 exists:
1246   {
1247     g_warning ("rtpjitterbuffer: pad already requested");
1248     return NULL;
1249   }
1250 }
1251
1252 static void
1253 gst_rtp_jitter_buffer_release_pad (GstElement * element, GstPad * pad)
1254 {
1255   GstRtpJitterBuffer *jitterbuffer;
1256   GstRtpJitterBufferPrivate *priv;
1257
1258   g_return_if_fail (GST_IS_RTP_JITTER_BUFFER (element));
1259   g_return_if_fail (GST_IS_PAD (pad));
1260
1261   jitterbuffer = GST_RTP_JITTER_BUFFER_CAST (element);
1262   priv = jitterbuffer->priv;
1263
1264   GST_DEBUG_OBJECT (element, "releasing pad %s:%s", GST_DEBUG_PAD_NAME (pad));
1265
1266   if (priv->rtcpsinkpad == pad) {
1267     remove_rtcp_sink (jitterbuffer);
1268   } else
1269     goto wrong_pad;
1270
1271   return;
1272
1273   /* ERRORS */
1274 wrong_pad:
1275   {
1276     g_warning ("gstjitterbuffer: asked to release an unknown pad");
1277     return;
1278   }
1279 }
1280
1281 static GstClock *
1282 gst_rtp_jitter_buffer_provide_clock (GstElement * element)
1283 {
1284   return gst_system_clock_obtain ();
1285 }
1286
1287 static gboolean
1288 gst_rtp_jitter_buffer_set_clock (GstElement * element, GstClock * clock)
1289 {
1290   GstRtpJitterBuffer *jitterbuffer = GST_RTP_JITTER_BUFFER (element);
1291
1292   rtp_jitter_buffer_set_pipeline_clock (jitterbuffer->priv->jbuf, clock);
1293
1294   return GST_ELEMENT_CLASS (parent_class)->set_clock (element, clock);
1295 }
1296
1297 static void
1298 gst_rtp_jitter_buffer_clear_pt_map (GstRtpJitterBuffer * jitterbuffer)
1299 {
1300   GstRtpJitterBufferPrivate *priv;
1301
1302   priv = jitterbuffer->priv;
1303
1304   /* this will trigger a new pt-map request signal, FIXME, do something better. */
1305
1306   JBUF_LOCK (priv);
1307   priv->clock_rate = -1;
1308   /* do not clear current content, but refresh state for new arrival */
1309   GST_DEBUG_OBJECT (jitterbuffer, "reset jitterbuffer");
1310   rtp_jitter_buffer_reset_skew (priv->jbuf);
1311   JBUF_UNLOCK (priv);
1312 }
1313
1314 static GstClockTime
1315 gst_rtp_jitter_buffer_set_active (GstRtpJitterBuffer * jbuf, gboolean active,
1316     guint64 offset)
1317 {
1318   GstRtpJitterBufferPrivate *priv;
1319   GstClockTime last_out;
1320   RTPJitterBufferItem *item;
1321
1322   priv = jbuf->priv;
1323
1324   JBUF_LOCK (priv);
1325   GST_DEBUG_OBJECT (jbuf, "setting active %d with offset %" GST_TIME_FORMAT,
1326       active, GST_TIME_ARGS (offset));
1327
1328   if (active != priv->active) {
1329     /* add the amount of time spent in paused to the output offset. All
1330      * outgoing buffers will have this offset applied to their timestamps in
1331      * order to make them arrive in time in the sink. */
1332     priv->out_offset = offset;
1333     GST_DEBUG_OBJECT (jbuf, "out offset %" GST_TIME_FORMAT,
1334         GST_TIME_ARGS (priv->out_offset));
1335     priv->active = active;
1336     JBUF_SIGNAL_EVENT (priv);
1337   }
1338   if (!active) {
1339     rtp_jitter_buffer_set_buffering (priv->jbuf, TRUE);
1340   }
1341   if ((item = rtp_jitter_buffer_peek (priv->jbuf))) {
1342     /* head buffer timestamp and offset gives our output time */
1343     last_out = item->pts + priv->ts_offset;
1344   } else {
1345     /* use last known time when the buffer is empty */
1346     last_out = priv->last_out_time;
1347   }
1348   JBUF_UNLOCK (priv);
1349
1350   return last_out;
1351 }
1352
1353 static GstCaps *
1354 gst_rtp_jitter_buffer_getcaps (GstPad * pad, GstCaps * filter)
1355 {
1356   GstRtpJitterBuffer *jitterbuffer;
1357   GstRtpJitterBufferPrivate *priv;
1358   GstPad *other;
1359   GstCaps *caps;
1360   GstCaps *templ;
1361
1362   jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
1363   priv = jitterbuffer->priv;
1364
1365   other = (pad == priv->srcpad ? priv->sinkpad : priv->srcpad);
1366
1367   caps = gst_pad_peer_query_caps (other, filter);
1368
1369   templ = gst_pad_get_pad_template_caps (pad);
1370   if (caps == NULL) {
1371     GST_DEBUG_OBJECT (jitterbuffer, "use template");
1372     caps = templ;
1373   } else {
1374     GstCaps *intersect;
1375
1376     GST_DEBUG_OBJECT (jitterbuffer, "intersect with template");
1377
1378     intersect = gst_caps_intersect (caps, templ);
1379     gst_caps_unref (caps);
1380     gst_caps_unref (templ);
1381
1382     caps = intersect;
1383   }
1384   gst_object_unref (jitterbuffer);
1385
1386   return caps;
1387 }
1388
1389 /*
1390  * Must be called with JBUF_LOCK held
1391  */
1392
1393 static gboolean
1394 gst_jitter_buffer_sink_parse_caps (GstRtpJitterBuffer * jitterbuffer,
1395     GstCaps * caps, gint pt)
1396 {
1397   GstRtpJitterBufferPrivate *priv;
1398   GstStructure *caps_struct;
1399   guint val;
1400   gint payload = -1;
1401   GstClockTime tval;
1402   const gchar *ts_refclk, *mediaclk;
1403
1404   priv = jitterbuffer->priv;
1405
1406   /* first parse the caps */
1407   caps_struct = gst_caps_get_structure (caps, 0);
1408
1409   GST_DEBUG_OBJECT (jitterbuffer, "got caps %" GST_PTR_FORMAT, caps);
1410
1411   if (gst_structure_get_int (caps_struct, "payload", &payload) && pt != -1
1412       && payload != pt) {
1413     GST_ERROR_OBJECT (jitterbuffer,
1414         "Got caps with wrong payload type (got %d, expected %d)", pt, payload);
1415     return FALSE;
1416   }
1417
1418   if (payload != -1) {
1419     GST_DEBUG_OBJECT (jitterbuffer, "Got payload type %d", payload);
1420     priv->last_pt = payload;
1421   }
1422
1423   /* we need a clock-rate to convert the rtp timestamps to GStreamer time and to
1424    * measure the amount of data in the buffer */
1425   if (!gst_structure_get_int (caps_struct, "clock-rate", &priv->clock_rate))
1426     goto error;
1427
1428   if (priv->clock_rate <= 0)
1429     goto wrong_rate;
1430
1431   GST_DEBUG_OBJECT (jitterbuffer, "got clock-rate %d", priv->clock_rate);
1432
1433   rtp_jitter_buffer_set_clock_rate (priv->jbuf, priv->clock_rate);
1434
1435   gst_rtp_packet_rate_ctx_reset (&priv->packet_rate_ctx, priv->clock_rate);
1436
1437   /* The clock base is the RTP timestamp corrsponding to the npt-start value. We
1438    * can use this to track the amount of time elapsed on the sender. */
1439   if (gst_structure_get_uint (caps_struct, "clock-base", &val))
1440     priv->clock_base = val;
1441   else
1442     priv->clock_base = -1;
1443
1444   priv->ext_timestamp = priv->clock_base;
1445
1446   GST_DEBUG_OBJECT (jitterbuffer, "got clock-base %" G_GINT64_FORMAT,
1447       priv->clock_base);
1448
1449   if (gst_structure_get_uint (caps_struct, "seqnum-base", &val)) {
1450     /* first expected seqnum, only update when we didn't have a previous base. */
1451     if (priv->next_in_seqnum == -1)
1452       priv->next_in_seqnum = val;
1453     if (priv->next_seqnum == -1) {
1454       priv->next_seqnum = val;
1455       JBUF_SIGNAL_EVENT (priv);
1456     }
1457     priv->seqnum_base = val;
1458   } else {
1459     priv->seqnum_base = -1;
1460   }
1461
1462   GST_DEBUG_OBJECT (jitterbuffer, "got seqnum-base %d", priv->next_in_seqnum);
1463
1464   /* the start and stop times. The seqnum-base corresponds to the start time. We
1465    * will keep track of the seqnums on the output and when we reach the one
1466    * corresponding to npt-stop, we emit the npt-stop-reached signal */
1467   if (gst_structure_get_clock_time (caps_struct, "npt-start", &tval))
1468     priv->npt_start = tval;
1469   else
1470     priv->npt_start = 0;
1471
1472   if (gst_structure_get_clock_time (caps_struct, "npt-stop", &tval))
1473     priv->npt_stop = tval;
1474   else
1475     priv->npt_stop = -1;
1476
1477   GST_DEBUG_OBJECT (jitterbuffer,
1478       "npt start/stop: %" GST_TIME_FORMAT "-%" GST_TIME_FORMAT,
1479       GST_TIME_ARGS (priv->npt_start), GST_TIME_ARGS (priv->npt_stop));
1480
1481   if ((ts_refclk = gst_structure_get_string (caps_struct, "a-ts-refclk"))) {
1482     GstClock *clock = NULL;
1483     guint64 clock_offset = -1;
1484
1485     GST_DEBUG_OBJECT (jitterbuffer, "Have timestamp reference clock %s",
1486         ts_refclk);
1487
1488     if (g_str_has_prefix (ts_refclk, "ntp=")) {
1489       if (g_str_has_prefix (ts_refclk, "ntp=/traceable/")) {
1490         GST_FIXME_OBJECT (jitterbuffer, "Can't handle traceable NTP clocks");
1491       } else {
1492         const gchar *host, *portstr;
1493         gchar *hostname;
1494         guint port;
1495
1496         host = ts_refclk + sizeof ("ntp=") - 1;
1497         if (host[0] == '[') {
1498           /* IPv6 */
1499           portstr = strchr (host, ']');
1500           if (portstr && portstr[1] == ':')
1501             portstr = portstr + 1;
1502           else
1503             portstr = NULL;
1504         } else {
1505           portstr = strrchr (host, ':');
1506         }
1507
1508
1509         if (!portstr || sscanf (portstr, ":%u", &port) != 1)
1510           port = 123;
1511
1512         if (portstr)
1513           hostname = g_strndup (host, (portstr - host));
1514         else
1515           hostname = g_strdup (host);
1516
1517         clock = gst_ntp_clock_new (NULL, hostname, port, 0);
1518         g_free (hostname);
1519       }
1520     } else if (g_str_has_prefix (ts_refclk, "ptp=IEEE1588-2008:")) {
1521       const gchar *domainstr =
1522           ts_refclk + sizeof ("ptp=IEEE1588-2008:XX-XX-XX-XX-XX-XX-XX-XX") - 1;
1523       guint domain;
1524
1525       if (domainstr[0] != ':' || sscanf (domainstr, ":%u", &domain) != 1)
1526         domain = 0;
1527
1528       clock = gst_ptp_clock_new (NULL, domain);
1529     } else {
1530       GST_FIXME_OBJECT (jitterbuffer, "Unsupported timestamp reference clock");
1531     }
1532
1533     if ((mediaclk = gst_structure_get_string (caps_struct, "a-mediaclk"))) {
1534       GST_DEBUG_OBJECT (jitterbuffer, "Got media clock %s", mediaclk);
1535
1536       if (!g_str_has_prefix (mediaclk, "direct=")
1537           || sscanf (mediaclk, "direct=%" G_GUINT64_FORMAT, &clock_offset) != 1)
1538         GST_FIXME_OBJECT (jitterbuffer, "Unsupported media clock");
1539       if (strstr (mediaclk, "rate=") != NULL) {
1540         GST_FIXME_OBJECT (jitterbuffer, "Rate property not supported");
1541         clock_offset = -1;
1542       }
1543     }
1544
1545     rtp_jitter_buffer_set_media_clock (priv->jbuf, clock, clock_offset);
1546   } else {
1547     rtp_jitter_buffer_set_media_clock (priv->jbuf, NULL, -1);
1548   }
1549
1550   return TRUE;
1551
1552   /* ERRORS */
1553 error:
1554   {
1555     GST_DEBUG_OBJECT (jitterbuffer, "No clock-rate in caps!");
1556     return FALSE;
1557   }
1558 wrong_rate:
1559   {
1560     GST_DEBUG_OBJECT (jitterbuffer, "Invalid clock-rate %d", priv->clock_rate);
1561     return FALSE;
1562   }
1563 }
1564
1565 static void
1566 gst_rtp_jitter_buffer_flush_start (GstRtpJitterBuffer * jitterbuffer)
1567 {
1568   GstRtpJitterBufferPrivate *priv;
1569
1570   priv = jitterbuffer->priv;
1571
1572   JBUF_LOCK (priv);
1573   /* mark ourselves as flushing */
1574   priv->srcresult = GST_FLOW_FLUSHING;
1575   GST_DEBUG_OBJECT (jitterbuffer, "Disabling pop on queue");
1576   /* this unblocks any waiting pops on the src pad task */
1577   JBUF_SIGNAL_EVENT (priv);
1578   JBUF_SIGNAL_QUERY (priv, FALSE);
1579   JBUF_UNLOCK (priv);
1580 }
1581
1582 static void
1583 gst_rtp_jitter_buffer_flush_stop (GstRtpJitterBuffer * jitterbuffer)
1584 {
1585   GstRtpJitterBufferPrivate *priv;
1586
1587   priv = jitterbuffer->priv;
1588
1589   JBUF_LOCK (priv);
1590   GST_DEBUG_OBJECT (jitterbuffer, "Enabling pop on queue");
1591   /* Mark as non flushing */
1592   priv->srcresult = GST_FLOW_OK;
1593   gst_segment_init (&priv->segment, GST_FORMAT_TIME);
1594   priv->last_popped_seqnum = -1;
1595   priv->last_out_time = GST_CLOCK_TIME_NONE;
1596   priv->next_seqnum = -1;
1597   priv->seqnum_base = -1;
1598   priv->ips_rtptime = -1;
1599   priv->ips_pts = GST_CLOCK_TIME_NONE;
1600   priv->packet_spacing = 0;
1601   priv->next_in_seqnum = -1;
1602   priv->clock_rate = -1;
1603   priv->last_pt = -1;
1604   priv->eos = FALSE;
1605   priv->estimated_eos = -1;
1606   priv->last_elapsed = 0;
1607   priv->ext_timestamp = -1;
1608   priv->avg_jitter = 0;
1609   priv->last_dts = -1;
1610   priv->last_rtptime = -1;
1611   priv->last_in_pts = 0;
1612   priv->equidistant = 0;
1613   GST_DEBUG_OBJECT (jitterbuffer, "flush and reset jitterbuffer");
1614   rtp_jitter_buffer_flush (priv->jbuf, (GFunc) free_item, NULL);
1615   rtp_jitter_buffer_disable_buffering (priv->jbuf, FALSE);
1616   rtp_jitter_buffer_reset_skew (priv->jbuf);
1617   remove_all_timers (jitterbuffer);
1618   g_queue_foreach (&priv->gap_packets, (GFunc) gst_buffer_unref, NULL);
1619   g_queue_clear (&priv->gap_packets);
1620   JBUF_UNLOCK (priv);
1621 }
1622
1623 static gboolean
1624 gst_rtp_jitter_buffer_src_activate_mode (GstPad * pad, GstObject * parent,
1625     GstPadMode mode, gboolean active)
1626 {
1627   gboolean result;
1628   GstRtpJitterBuffer *jitterbuffer = NULL;
1629
1630   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1631
1632   switch (mode) {
1633     case GST_PAD_MODE_PUSH:
1634       if (active) {
1635         /* allow data processing */
1636         gst_rtp_jitter_buffer_flush_stop (jitterbuffer);
1637
1638         /* start pushing out buffers */
1639         GST_DEBUG_OBJECT (jitterbuffer, "Starting task on srcpad");
1640         result = gst_pad_start_task (jitterbuffer->priv->srcpad,
1641             (GstTaskFunction) gst_rtp_jitter_buffer_loop, jitterbuffer, NULL);
1642       } else {
1643         /* make sure all data processing stops ASAP */
1644         gst_rtp_jitter_buffer_flush_start (jitterbuffer);
1645
1646         /* NOTE this will hardlock if the state change is called from the src pad
1647          * task thread because we will _join() the thread. */
1648         GST_DEBUG_OBJECT (jitterbuffer, "Stopping task on srcpad");
1649         result = gst_pad_stop_task (pad);
1650       }
1651       break;
1652     default:
1653       result = FALSE;
1654       break;
1655   }
1656   return result;
1657 }
1658
1659 static GstStateChangeReturn
1660 gst_rtp_jitter_buffer_change_state (GstElement * element,
1661     GstStateChange transition)
1662 {
1663   GstRtpJitterBuffer *jitterbuffer;
1664   GstRtpJitterBufferPrivate *priv;
1665   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
1666
1667   jitterbuffer = GST_RTP_JITTER_BUFFER (element);
1668   priv = jitterbuffer->priv;
1669
1670   switch (transition) {
1671     case GST_STATE_CHANGE_NULL_TO_READY:
1672       break;
1673     case GST_STATE_CHANGE_READY_TO_PAUSED:
1674       JBUF_LOCK (priv);
1675       /* reset negotiated values */
1676       priv->clock_rate = -1;
1677       priv->clock_base = -1;
1678       priv->peer_latency = 0;
1679       priv->last_pt = -1;
1680       /* block until we go to PLAYING */
1681       priv->blocked = TRUE;
1682       priv->timer_running = TRUE;
1683       priv->timer_thread =
1684           g_thread_new ("timer", (GThreadFunc) wait_next_timeout, jitterbuffer);
1685       JBUF_UNLOCK (priv);
1686       break;
1687     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
1688       JBUF_LOCK (priv);
1689       /* unblock to allow streaming in PLAYING */
1690       priv->blocked = FALSE;
1691       JBUF_SIGNAL_EVENT (priv);
1692       JBUF_SIGNAL_TIMER (priv);
1693       JBUF_UNLOCK (priv);
1694       break;
1695     default:
1696       break;
1697   }
1698
1699   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1700
1701   switch (transition) {
1702     case GST_STATE_CHANGE_READY_TO_PAUSED:
1703       /* we are a live element because we sync to the clock, which we can only
1704        * do in the PLAYING state */
1705       if (ret != GST_STATE_CHANGE_FAILURE)
1706         ret = GST_STATE_CHANGE_NO_PREROLL;
1707       break;
1708     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
1709       JBUF_LOCK (priv);
1710       /* block to stop streaming when PAUSED */
1711       priv->blocked = TRUE;
1712       unschedule_current_timer (jitterbuffer);
1713       JBUF_UNLOCK (priv);
1714       if (ret != GST_STATE_CHANGE_FAILURE)
1715         ret = GST_STATE_CHANGE_NO_PREROLL;
1716       break;
1717     case GST_STATE_CHANGE_PAUSED_TO_READY:
1718       JBUF_LOCK (priv);
1719       gst_buffer_replace (&priv->last_sr, NULL);
1720       priv->timer_running = FALSE;
1721       unschedule_current_timer (jitterbuffer);
1722       JBUF_SIGNAL_TIMER (priv);
1723       JBUF_SIGNAL_QUERY (priv, FALSE);
1724       JBUF_UNLOCK (priv);
1725       g_thread_join (priv->timer_thread);
1726       priv->timer_thread = NULL;
1727       break;
1728     case GST_STATE_CHANGE_READY_TO_NULL:
1729       break;
1730     default:
1731       break;
1732   }
1733
1734   return ret;
1735 }
1736
1737 static gboolean
1738 gst_rtp_jitter_buffer_src_event (GstPad * pad, GstObject * parent,
1739     GstEvent * event)
1740 {
1741   gboolean ret = TRUE;
1742   GstRtpJitterBuffer *jitterbuffer;
1743   GstRtpJitterBufferPrivate *priv;
1744
1745   jitterbuffer = GST_RTP_JITTER_BUFFER_CAST (parent);
1746   priv = jitterbuffer->priv;
1747
1748   GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
1749
1750   switch (GST_EVENT_TYPE (event)) {
1751     case GST_EVENT_LATENCY:
1752     {
1753       GstClockTime latency;
1754
1755       gst_event_parse_latency (event, &latency);
1756
1757       GST_DEBUG_OBJECT (jitterbuffer,
1758           "configuring latency of %" GST_TIME_FORMAT, GST_TIME_ARGS (latency));
1759
1760       JBUF_LOCK (priv);
1761       /* adjust the overall buffer delay to the total pipeline latency in
1762        * buffering mode because if downstream consumes too fast (because of
1763        * large latency or queues, we would start rebuffering again. */
1764       if (rtp_jitter_buffer_get_mode (priv->jbuf) ==
1765           RTP_JITTER_BUFFER_MODE_BUFFER) {
1766         rtp_jitter_buffer_set_delay (priv->jbuf, latency);
1767       }
1768       JBUF_UNLOCK (priv);
1769
1770       ret = gst_pad_push_event (priv->sinkpad, event);
1771       break;
1772     }
1773     default:
1774       ret = gst_pad_push_event (priv->sinkpad, event);
1775       break;
1776   }
1777
1778   return ret;
1779 }
1780
1781 /* handles and stores the event in the jitterbuffer, must be called with
1782  * LOCK */
1783 static gboolean
1784 queue_event (GstRtpJitterBuffer * jitterbuffer, GstEvent * event)
1785 {
1786   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1787   RTPJitterBufferItem *item;
1788   gboolean head;
1789
1790   switch (GST_EVENT_TYPE (event)) {
1791     case GST_EVENT_CAPS:
1792     {
1793       GstCaps *caps;
1794
1795       gst_event_parse_caps (event, &caps);
1796       gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps, -1);
1797       break;
1798     }
1799     case GST_EVENT_SEGMENT:
1800     {
1801       GstSegment segment;
1802       gst_event_copy_segment (event, &segment);
1803
1804       /* we need time for now */
1805       if (segment.format != GST_FORMAT_TIME) {
1806         GST_DEBUG_OBJECT (jitterbuffer, "ignoring non-TIME newsegment");
1807         gst_event_unref (event);
1808
1809         gst_segment_init (&segment, GST_FORMAT_TIME);
1810         event = gst_event_new_segment (&segment);
1811       }
1812
1813       priv->segment = segment;
1814       break;
1815     }
1816     case GST_EVENT_EOS:
1817       priv->eos = TRUE;
1818       rtp_jitter_buffer_disable_buffering (priv->jbuf, TRUE);
1819       break;
1820     default:
1821       break;
1822   }
1823
1824
1825   GST_DEBUG_OBJECT (jitterbuffer, "adding event");
1826   item = alloc_item (event, ITEM_TYPE_EVENT, -1, -1, -1, 0, -1);
1827   rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL);
1828   if (head || priv->eos)
1829     JBUF_SIGNAL_EVENT (priv);
1830
1831   return TRUE;
1832 }
1833
1834 static gboolean
1835 gst_rtp_jitter_buffer_sink_event (GstPad * pad, GstObject * parent,
1836     GstEvent * event)
1837 {
1838   gboolean ret = TRUE;
1839   GstRtpJitterBuffer *jitterbuffer;
1840   GstRtpJitterBufferPrivate *priv;
1841
1842   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1843   priv = jitterbuffer->priv;
1844
1845   GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
1846
1847   switch (GST_EVENT_TYPE (event)) {
1848     case GST_EVENT_FLUSH_START:
1849       ret = gst_pad_push_event (priv->srcpad, event);
1850       gst_rtp_jitter_buffer_flush_start (jitterbuffer);
1851       /* wait for the loop to go into PAUSED */
1852       gst_pad_pause_task (priv->srcpad);
1853       break;
1854     case GST_EVENT_FLUSH_STOP:
1855       ret = gst_pad_push_event (priv->srcpad, event);
1856       ret =
1857           gst_rtp_jitter_buffer_src_activate_mode (priv->srcpad, parent,
1858           GST_PAD_MODE_PUSH, TRUE);
1859       break;
1860     default:
1861       if (GST_EVENT_IS_SERIALIZED (event)) {
1862         /* serialized events go in the queue */
1863         JBUF_LOCK (priv);
1864         if (priv->srcresult != GST_FLOW_OK) {
1865           /* Errors in sticky event pushing are no problem and ignored here
1866            * as they will cause more meaningful errors during data flow.
1867            * For EOS events, that are not followed by data flow, we still
1868            * return FALSE here though.
1869            */
1870           if (!GST_EVENT_IS_STICKY (event) ||
1871               GST_EVENT_TYPE (event) == GST_EVENT_EOS)
1872             goto out_flow_error;
1873         }
1874         /* refuse more events on EOS */
1875         if (priv->eos)
1876           goto out_eos;
1877         ret = queue_event (jitterbuffer, event);
1878         JBUF_UNLOCK (priv);
1879       } else {
1880         /* non-serialized events are forwarded downstream immediately */
1881         ret = gst_pad_push_event (priv->srcpad, event);
1882       }
1883       break;
1884   }
1885   return ret;
1886
1887   /* ERRORS */
1888 out_flow_error:
1889   {
1890     GST_DEBUG_OBJECT (jitterbuffer,
1891         "refusing event, we have a downstream flow error: %s",
1892         gst_flow_get_name (priv->srcresult));
1893     JBUF_UNLOCK (priv);
1894     gst_event_unref (event);
1895     return FALSE;
1896   }
1897 out_eos:
1898   {
1899     GST_DEBUG_OBJECT (jitterbuffer, "refusing event, we are EOS");
1900     JBUF_UNLOCK (priv);
1901     gst_event_unref (event);
1902     return FALSE;
1903   }
1904 }
1905
1906 static gboolean
1907 gst_rtp_jitter_buffer_sink_rtcp_event (GstPad * pad, GstObject * parent,
1908     GstEvent * event)
1909 {
1910   gboolean ret = TRUE;
1911   GstRtpJitterBuffer *jitterbuffer;
1912
1913   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1914
1915   GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
1916
1917   switch (GST_EVENT_TYPE (event)) {
1918     case GST_EVENT_FLUSH_START:
1919       gst_event_unref (event);
1920       break;
1921     case GST_EVENT_FLUSH_STOP:
1922       gst_event_unref (event);
1923       break;
1924     default:
1925       ret = gst_pad_event_default (pad, parent, event);
1926       break;
1927   }
1928
1929   return ret;
1930 }
1931
1932 /*
1933  * Must be called with JBUF_LOCK held, will release the LOCK when emiting the
1934  * signal. The function returns GST_FLOW_ERROR when a parsing error happened and
1935  * GST_FLOW_FLUSHING when the element is shutting down. On success
1936  * GST_FLOW_OK is returned.
1937  */
1938 static GstFlowReturn
1939 gst_rtp_jitter_buffer_get_clock_rate (GstRtpJitterBuffer * jitterbuffer,
1940     guint8 pt)
1941 {
1942   GValue ret = { 0 };
1943   GValue args[2] = { {0}, {0} };
1944   GstCaps *caps;
1945   gboolean res;
1946
1947   g_value_init (&args[0], GST_TYPE_ELEMENT);
1948   g_value_set_object (&args[0], jitterbuffer);
1949   g_value_init (&args[1], G_TYPE_UINT);
1950   g_value_set_uint (&args[1], pt);
1951
1952   g_value_init (&ret, GST_TYPE_CAPS);
1953   g_value_set_boxed (&ret, NULL);
1954
1955   JBUF_UNLOCK (jitterbuffer->priv);
1956   g_signal_emitv (args, gst_rtp_jitter_buffer_signals[SIGNAL_REQUEST_PT_MAP], 0,
1957       &ret);
1958   JBUF_LOCK_CHECK (jitterbuffer->priv, out_flushing);
1959
1960   g_value_unset (&args[0]);
1961   g_value_unset (&args[1]);
1962   caps = (GstCaps *) g_value_dup_boxed (&ret);
1963   g_value_unset (&ret);
1964   if (!caps)
1965     goto no_caps;
1966
1967   res = gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps, pt);
1968   gst_caps_unref (caps);
1969
1970   if (G_UNLIKELY (!res))
1971     goto parse_failed;
1972
1973   return GST_FLOW_OK;
1974
1975   /* ERRORS */
1976 no_caps:
1977   {
1978     GST_DEBUG_OBJECT (jitterbuffer, "could not get caps");
1979     return GST_FLOW_ERROR;
1980   }
1981 out_flushing:
1982   {
1983     GST_DEBUG_OBJECT (jitterbuffer, "we are flushing");
1984     return GST_FLOW_FLUSHING;
1985   }
1986 parse_failed:
1987   {
1988     GST_DEBUG_OBJECT (jitterbuffer, "parse failed");
1989     return GST_FLOW_ERROR;
1990   }
1991 }
1992
1993 /* call with jbuf lock held */
1994 static GstMessage *
1995 check_buffering_percent (GstRtpJitterBuffer * jitterbuffer, gint percent)
1996 {
1997   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1998   GstMessage *message = NULL;
1999
2000   if (percent == -1)
2001     return NULL;
2002
2003   /* Post a buffering message */
2004   if (priv->last_percent != percent) {
2005     priv->last_percent = percent;
2006     message =
2007         gst_message_new_buffering (GST_OBJECT_CAST (jitterbuffer), percent);
2008     gst_message_set_buffering_stats (message, GST_BUFFERING_LIVE, -1, -1, -1);
2009   }
2010
2011   return message;
2012 }
2013
2014 static void
2015 update_offset (GstRtpJitterBuffer * jitterbuffer)
2016 {
2017   GstRtpJitterBufferPrivate *priv;
2018
2019   priv = jitterbuffer->priv;
2020
2021   if (priv->ts_offset_remainder != 0) {
2022     GST_DEBUG ("adjustment %" G_GUINT64_FORMAT " remain %" G_GINT64_FORMAT
2023         " off %" G_GINT64_FORMAT, priv->max_ts_offset_adjustment,
2024         priv->ts_offset_remainder, priv->ts_offset);
2025     if (ABS (priv->ts_offset_remainder) > priv->max_ts_offset_adjustment) {
2026       if (priv->ts_offset_remainder > 0) {
2027         priv->ts_offset += priv->max_ts_offset_adjustment;
2028         priv->ts_offset_remainder -= priv->max_ts_offset_adjustment;
2029       } else {
2030         priv->ts_offset -= priv->max_ts_offset_adjustment;
2031         priv->ts_offset_remainder += priv->max_ts_offset_adjustment;
2032       }
2033     } else {
2034       priv->ts_offset += priv->ts_offset_remainder;
2035       priv->ts_offset_remainder = 0;
2036     }
2037   }
2038 }
2039
2040 static GstClockTime
2041 apply_offset (GstRtpJitterBuffer * jitterbuffer, GstClockTime timestamp)
2042 {
2043   GstRtpJitterBufferPrivate *priv;
2044
2045   priv = jitterbuffer->priv;
2046
2047   if (timestamp == -1)
2048     return -1;
2049
2050   /* apply the timestamp offset, this is used for inter stream sync */
2051   timestamp += priv->ts_offset;
2052   /* add the offset, this is used when buffering */
2053   timestamp += priv->out_offset;
2054
2055   return timestamp;
2056 }
2057
2058 static TimerQueue *
2059 timer_queue_new (void)
2060 {
2061   TimerQueue *queue;
2062
2063   queue = g_slice_new (TimerQueue);
2064   queue->timers = g_queue_new ();
2065   queue->hashtable = g_hash_table_new (NULL, NULL);
2066
2067   return queue;
2068 }
2069
2070 static void
2071 timer_queue_free (TimerQueue * queue)
2072 {
2073   if (!queue)
2074     return;
2075
2076   g_hash_table_destroy (queue->hashtable);
2077   g_queue_free_full (queue->timers, g_free);
2078   g_slice_free (TimerQueue, queue);
2079 }
2080
2081 static void
2082 timer_queue_append (TimerQueue * queue, const TimerData * timer,
2083     GstClockTime timeout, gboolean lost)
2084 {
2085   TimerData *copy;
2086
2087   copy = g_memdup (timer, sizeof (*timer));
2088   copy->timeout = timeout;
2089   copy->type = lost ? TIMER_TYPE_LOST : TIMER_TYPE_EXPECTED;
2090   copy->idx = -1;
2091
2092   GST_LOG ("Append rtx-stats timer #%d, %" GST_TIME_FORMAT,
2093       copy->seqnum, GST_TIME_ARGS (copy->timeout));
2094   g_queue_push_tail (queue->timers, copy);
2095   g_hash_table_insert (queue->hashtable, GINT_TO_POINTER (copy->seqnum), copy);
2096 }
2097
2098 static void
2099 timer_queue_clear_until (TimerQueue * queue, GstClockTime timeout)
2100 {
2101   TimerData *test;
2102
2103   test = g_queue_peek_head (queue->timers);
2104   while (test && test->timeout < timeout) {
2105     GST_LOG ("Pop rtx-stats timer #%d, %" GST_TIME_FORMAT " < %"
2106         GST_TIME_FORMAT, test->seqnum, GST_TIME_ARGS (test->timeout),
2107         GST_TIME_ARGS (timeout));
2108     g_hash_table_remove (queue->hashtable, GINT_TO_POINTER (test->seqnum));
2109     g_free (g_queue_pop_head (queue->timers));
2110     test = g_queue_peek_head (queue->timers);
2111   }
2112 }
2113
2114 static TimerData *
2115 timer_queue_find (TimerQueue * queue, guint16 seqnum)
2116 {
2117   return g_hash_table_lookup (queue->hashtable, GINT_TO_POINTER (seqnum));
2118 }
2119
2120 static TimerData *
2121 find_timer (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum)
2122 {
2123   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2124   TimerData *timer = NULL;
2125   gint i, len;
2126
2127   len = priv->timers->len;
2128   for (i = 0; i < len; i++) {
2129     TimerData *test = &g_array_index (priv->timers, TimerData, i);
2130     if (test->seqnum == seqnum) {
2131       timer = test;
2132       break;
2133     }
2134   }
2135   return timer;
2136 }
2137
2138 static void
2139 unschedule_current_timer (GstRtpJitterBuffer * jitterbuffer)
2140 {
2141   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2142
2143   if (priv->clock_id) {
2144     GST_DEBUG_OBJECT (jitterbuffer, "unschedule current timer");
2145     gst_clock_id_unschedule (priv->clock_id);
2146     priv->clock_id = NULL;
2147   }
2148 }
2149
2150 static GstClockTime
2151 get_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
2152 {
2153   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2154   GstClockTime test_timeout;
2155
2156   if ((test_timeout = timer->timeout) == -1)
2157     return -1;
2158
2159   if (timer->type != TIMER_TYPE_EXPECTED) {
2160     /* add our latency and offset to get output times. */
2161     test_timeout = apply_offset (jitterbuffer, test_timeout);
2162     test_timeout += priv->latency_ns;
2163   }
2164   return test_timeout;
2165 }
2166
2167 static void
2168 recalculate_timer (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
2169 {
2170   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2171
2172   if (priv->clock_id) {
2173     GstClockTime timeout = get_timeout (jitterbuffer, timer);
2174
2175     GST_DEBUG ("%" GST_TIME_FORMAT " <> %" GST_TIME_FORMAT,
2176         GST_TIME_ARGS (timeout), GST_TIME_ARGS (priv->timer_timeout));
2177
2178     if (timeout == -1 || timeout < priv->timer_timeout)
2179       unschedule_current_timer (jitterbuffer);
2180   }
2181 }
2182
2183 static TimerData *
2184 add_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type,
2185     guint16 seqnum, guint num, GstClockTime timeout, GstClockTime delay,
2186     GstClockTime duration)
2187 {
2188   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2189   TimerData *timer;
2190   gint len;
2191
2192   GST_DEBUG_OBJECT (jitterbuffer,
2193       "add timer %d for seqnum %d to %" GST_TIME_FORMAT ", delay %"
2194       GST_TIME_FORMAT, type, seqnum, GST_TIME_ARGS (timeout),
2195       GST_TIME_ARGS (delay));
2196
2197   len = priv->timers->len;
2198   g_array_set_size (priv->timers, len + 1);
2199   timer = &g_array_index (priv->timers, TimerData, len);
2200   timer->idx = len;
2201   timer->type = type;
2202   timer->seqnum = seqnum;
2203   timer->num = num;
2204   timer->timeout = timeout + delay;
2205   timer->duration = duration;
2206   if (type == TIMER_TYPE_EXPECTED) {
2207     timer->rtx_base = timeout;
2208     timer->rtx_delay = delay;
2209     timer->rtx_retry = 0;
2210   }
2211   timer->rtx_last = GST_CLOCK_TIME_NONE;
2212   timer->num_rtx_retry = 0;
2213   timer->num_rtx_received = 0;
2214   recalculate_timer (jitterbuffer, timer);
2215   JBUF_SIGNAL_TIMER (priv);
2216
2217   return timer;
2218 }
2219
2220 static void
2221 reschedule_timer (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
2222     guint16 seqnum, GstClockTime timeout, GstClockTime delay, gboolean reset)
2223 {
2224   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2225   gboolean seqchange, timechange;
2226   guint16 oldseq;
2227   GstClockTime new_timeout;
2228
2229   oldseq = timer->seqnum;
2230   new_timeout = timeout + delay;
2231   seqchange = oldseq != seqnum;
2232   timechange = timer->timeout != new_timeout;
2233
2234   if (!seqchange && !timechange) {
2235     GST_DEBUG_OBJECT (jitterbuffer,
2236         "No changes in seqnum (%d) and timeout (%" GST_TIME_FORMAT
2237         "), skipping", oldseq, GST_TIME_ARGS (timer->timeout));
2238     return;
2239   }
2240
2241   GST_DEBUG_OBJECT (jitterbuffer,
2242       "replace timer %d for seqnum %d->%d timeout %" GST_TIME_FORMAT
2243       "->%" GST_TIME_FORMAT, timer->type, oldseq, seqnum,
2244       GST_TIME_ARGS (timer->timeout), GST_TIME_ARGS (new_timeout));
2245
2246   timer->timeout = new_timeout;
2247   timer->seqnum = seqnum;
2248   if (reset) {
2249     GST_DEBUG_OBJECT (jitterbuffer, "reset rtx delay %" GST_TIME_FORMAT
2250         "->%" GST_TIME_FORMAT, GST_TIME_ARGS (timer->rtx_delay),
2251         GST_TIME_ARGS (delay));
2252     timer->rtx_base = timeout;
2253     timer->rtx_delay = delay;
2254     timer->rtx_retry = 0;
2255   }
2256   if (seqchange) {
2257     timer->num_rtx_retry = 0;
2258     timer->num_rtx_received = 0;
2259   }
2260
2261   if (priv->clock_id) {
2262     /* we changed the seqnum and there is a timer currently waiting with this
2263      * seqnum, unschedule it */
2264     if (seqchange && priv->timer_seqnum == oldseq)
2265       unschedule_current_timer (jitterbuffer);
2266     /* we changed the time, check if it is earlier than what we are waiting
2267      * for and unschedule if so */
2268     else if (timechange)
2269       recalculate_timer (jitterbuffer, timer);
2270   }
2271 }
2272
2273 static TimerData *
2274 set_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type,
2275     guint16 seqnum, GstClockTime timeout)
2276 {
2277   TimerData *timer;
2278
2279   /* find the seqnum timer */
2280   timer = find_timer (jitterbuffer, seqnum);
2281   if (timer == NULL) {
2282     timer = add_timer (jitterbuffer, type, seqnum, 0, timeout, 0, -1);
2283   } else {
2284     reschedule_timer (jitterbuffer, timer, seqnum, timeout, 0, FALSE);
2285   }
2286   return timer;
2287 }
2288
2289 static void
2290 remove_timer (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
2291 {
2292   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2293   guint idx;
2294
2295   if (timer->idx == -1)
2296     return;
2297
2298   if (priv->clock_id && priv->timer_seqnum == timer->seqnum)
2299     unschedule_current_timer (jitterbuffer);
2300
2301   idx = timer->idx;
2302   GST_DEBUG_OBJECT (jitterbuffer, "removed index %d", idx);
2303   g_array_remove_index_fast (priv->timers, idx);
2304   timer->idx = idx;
2305
2306   JBUF_SIGNAL_TIMER (priv);
2307 }
2308
2309 static void
2310 remove_all_timers (GstRtpJitterBuffer * jitterbuffer)
2311 {
2312   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2313   GST_DEBUG_OBJECT (jitterbuffer, "removed all timers");
2314   g_array_set_size (priv->timers, 0);
2315   unschedule_current_timer (jitterbuffer);
2316   JBUF_SIGNAL_TIMER (priv);
2317 }
2318
2319 /* get the extra delay to wait before sending RTX */
2320 static GstClockTime
2321 get_rtx_delay (GstRtpJitterBufferPrivate * priv)
2322 {
2323   GstClockTime delay;
2324
2325   if (priv->rtx_delay == -1) {
2326     if (priv->avg_jitter == 0 && priv->packet_spacing == 0) {
2327       delay = DEFAULT_AUTO_RTX_DELAY;
2328     } else {
2329       /* jitter is in nanoseconds, maximum of 2x jitter and half the
2330        * packet spacing is a good margin */
2331       delay = MAX (priv->avg_jitter * 2, priv->packet_spacing / 2);
2332     }
2333   } else {
2334     delay = priv->rtx_delay * GST_MSECOND;
2335   }
2336   if (priv->rtx_min_delay > 0)
2337     delay = MAX (delay, priv->rtx_min_delay * GST_MSECOND);
2338
2339   return delay;
2340 }
2341
2342 /* Check if packet with seqnum is already considered definitely lost by being
2343  * part of a "lost timer" for multiple packets */
2344 static gboolean
2345 already_lost (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum)
2346 {
2347   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2348   gint i, len;
2349
2350   len = priv->timers->len;
2351   for (i = 0; i < len; i++) {
2352     TimerData *test = &g_array_index (priv->timers, TimerData, i);
2353     gint gap = gst_rtp_buffer_compare_seqnum (test->seqnum, seqnum);
2354
2355     if (test->num > 1 && test->type == TIMER_TYPE_LOST && gap >= 0 &&
2356         gap < test->num) {
2357       GST_DEBUG ("seqnum #%d already considered definitely lost (#%d->#%d)",
2358           seqnum, test->seqnum, (test->seqnum + test->num - 1) & 0xffff);
2359       return TRUE;
2360     }
2361   }
2362
2363   return FALSE;
2364 }
2365
2366 /* we just received a packet with seqnum and dts.
2367  *
2368  * First check for old seqnum that we are still expecting. If the gap with the
2369  * current seqnum is too big, unschedule the timeouts.
2370  *
2371  * If we have a valid packet spacing estimate we can set a timer for when we
2372  * should receive the next packet.
2373  * If we don't have a valid estimate, we remove any timer we might have
2374  * had for this packet.
2375  */
2376 static void
2377 update_timers (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum,
2378     GstClockTime dts, GstClockTime pts, gboolean do_next_seqnum,
2379     gboolean is_rtx, TimerData * timer)
2380 {
2381   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2382
2383   /* go through all timers and unschedule the ones with a large gap */
2384   if (priv->do_retransmission && priv->rtx_delay_reorder > 0) {
2385     gint i, len;
2386     len = priv->timers->len;
2387     for (i = 0; i < len; i++) {
2388       TimerData *test = &g_array_index (priv->timers, TimerData, i);
2389       gint gap;
2390
2391       gap = gst_rtp_buffer_compare_seqnum (test->seqnum, seqnum);
2392
2393       GST_DEBUG_OBJECT (jitterbuffer, "%d, #%d<->#%d gap %d",
2394           test->type, test->seqnum, seqnum, gap);
2395
2396       if (gap > priv->rtx_delay_reorder) {
2397         /* max gap, we exceeded the max reorder distance and we don't expect the
2398          * missing packet to be this reordered */
2399         if (test->num_rtx_retry == 0 && test->type == TIMER_TYPE_EXPECTED)
2400           reschedule_timer (jitterbuffer, test, test->seqnum, -1, 0, FALSE);
2401       }
2402     }
2403   }
2404
2405   do_next_seqnum = do_next_seqnum && priv->packet_spacing > 0
2406       && priv->do_retransmission && priv->rtx_next_seqnum;
2407
2408   if (timer && timer->type != TIMER_TYPE_DEADLINE) {
2409     if (timer->num_rtx_retry > 0) {
2410       if (is_rtx) {
2411         update_rtx_stats (jitterbuffer, timer, dts, TRUE);
2412         /* don't try to estimate the next seqnum because this is a retransmitted
2413          * packet and it probably did not arrive with the expected packet
2414          * spacing. */
2415         do_next_seqnum = FALSE;
2416       }
2417
2418       if (!is_rtx || timer->num_rtx_retry > 1) {
2419         /* Store timer in order to record stats when/if the retransmitted
2420          * packet arrives. We should also store timer information if we've
2421          * requested retransmission more than once since we may receive
2422          * several retransmitted packets. For accuracy we should update the
2423          * stats also when the redundant retransmitted packets arrives. */
2424         timer_queue_append (priv->rtx_stats_timers, timer,
2425             pts + priv->rtx_stats_timeout * GST_MSECOND, FALSE);
2426       }
2427     }
2428   }
2429
2430   if (do_next_seqnum && pts != GST_CLOCK_TIME_NONE) {
2431     GstClockTime expected, delay;
2432
2433     /* calculate expected arrival time of the next seqnum */
2434     expected = pts + priv->packet_spacing;
2435
2436     delay = get_rtx_delay (priv);
2437
2438     /* and update/install timer for next seqnum */
2439     GST_DEBUG_OBJECT (jitterbuffer, "Add RTX timer #%d, expected %"
2440         GST_TIME_FORMAT ", delay %" GST_TIME_FORMAT ", packet-spacing %"
2441         GST_TIME_FORMAT ", jitter %" GST_TIME_FORMAT, priv->next_in_seqnum,
2442         GST_TIME_ARGS (expected), GST_TIME_ARGS (delay),
2443         GST_TIME_ARGS (priv->packet_spacing), GST_TIME_ARGS (priv->avg_jitter));
2444
2445     if (timer) {
2446       timer->type = TIMER_TYPE_EXPECTED;
2447       reschedule_timer (jitterbuffer, timer, priv->next_in_seqnum, expected,
2448           delay, TRUE);
2449     } else {
2450       add_timer (jitterbuffer, TIMER_TYPE_EXPECTED, priv->next_in_seqnum, 0,
2451           expected, delay, priv->packet_spacing);
2452     }
2453   } else if (timer && timer->type != TIMER_TYPE_DEADLINE) {
2454     /* if we had a timer, remove it, we don't know when to expect the next
2455      * packet. */
2456     remove_timer (jitterbuffer, timer);
2457   }
2458 }
2459
2460 static void
2461 calculate_packet_spacing (GstRtpJitterBuffer * jitterbuffer, guint32 rtptime,
2462     GstClockTime pts)
2463 {
2464   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2465
2466   /* we need consecutive seqnums with a different
2467    * rtptime to estimate the packet spacing. */
2468   if (priv->ips_rtptime != rtptime) {
2469     /* rtptime changed, check pts diff */
2470     if (priv->ips_pts != -1 && pts != -1 && pts > priv->ips_pts) {
2471       GstClockTime new_packet_spacing = pts - priv->ips_pts;
2472       GstClockTime old_packet_spacing = priv->packet_spacing;
2473
2474       /* Biased towards bigger packet spacings to prevent
2475        * too many unneeded retransmission requests for next
2476        * packets that just arrive a little later than we would
2477        * expect */
2478       if (old_packet_spacing > new_packet_spacing)
2479         priv->packet_spacing =
2480             (new_packet_spacing + 3 * old_packet_spacing) / 4;
2481       else if (old_packet_spacing > 0)
2482         priv->packet_spacing =
2483             (3 * new_packet_spacing + old_packet_spacing) / 4;
2484       else
2485         priv->packet_spacing = new_packet_spacing;
2486
2487       GST_DEBUG_OBJECT (jitterbuffer,
2488           "new packet spacing %" GST_TIME_FORMAT
2489           " old packet spacing %" GST_TIME_FORMAT
2490           " combined to %" GST_TIME_FORMAT,
2491           GST_TIME_ARGS (new_packet_spacing),
2492           GST_TIME_ARGS (old_packet_spacing),
2493           GST_TIME_ARGS (priv->packet_spacing));
2494     }
2495     priv->ips_rtptime = rtptime;
2496     priv->ips_pts = pts;
2497   }
2498 }
2499
2500 static void
2501 calculate_expected (GstRtpJitterBuffer * jitterbuffer, guint32 expected,
2502     guint16 seqnum, GstClockTime pts, gint gap)
2503 {
2504   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2505   GstClockTime duration, expected_pts, delay;
2506   TimerType type;
2507   gboolean equidistant = priv->equidistant > 0;
2508
2509   GST_DEBUG_OBJECT (jitterbuffer,
2510       "pts %" GST_TIME_FORMAT ", last %" GST_TIME_FORMAT,
2511       GST_TIME_ARGS (pts), GST_TIME_ARGS (priv->last_in_pts));
2512
2513   if (pts == GST_CLOCK_TIME_NONE) {
2514     GST_WARNING_OBJECT (jitterbuffer, "Have no PTS");
2515     return;
2516   }
2517
2518   if (equidistant) {
2519     GstClockTime total_duration;
2520     /* the total duration spanned by the missing packets */
2521     if (pts >= priv->last_in_pts)
2522       total_duration = pts - priv->last_in_pts;
2523     else
2524       total_duration = 0;
2525
2526     /* interpolate between the current time and the last time based on
2527      * number of packets we are missing, this is the estimated duration
2528      * for the missing packet based on equidistant packet spacing. */
2529     duration = total_duration / (gap + 1);
2530
2531     GST_DEBUG_OBJECT (jitterbuffer, "duration %" GST_TIME_FORMAT,
2532         GST_TIME_ARGS (duration));
2533
2534     if (total_duration > priv->latency_ns) {
2535       GstClockTime gap_time;
2536       guint lost_packets;
2537
2538       if (duration > 0) {
2539         GstClockTime gap_dur = gap * duration;
2540         if (gap_dur > priv->latency_ns)
2541           gap_time = gap_dur - priv->latency_ns;
2542         else
2543           gap_time = 0;
2544         lost_packets = gap_time / duration;
2545       } else {
2546         gap_time = total_duration - priv->latency_ns;
2547         lost_packets = gap;
2548       }
2549
2550       /* too many lost packets, some of the missing packets are already
2551        * too late and we can generate lost packet events for them. */
2552       GST_INFO_OBJECT (jitterbuffer,
2553           "lost packets (%d, #%d->#%d) duration too large %" GST_TIME_FORMAT
2554           " > %" GST_TIME_FORMAT ", consider %u lost (%" GST_TIME_FORMAT ")",
2555           gap, expected, seqnum - 1, GST_TIME_ARGS (total_duration),
2556           GST_TIME_ARGS (priv->latency_ns), lost_packets,
2557           GST_TIME_ARGS (gap_time));
2558
2559       /* this timer will fire immediately and the lost event will be pushed from
2560        * the timer thread */
2561       if (lost_packets > 0) {
2562         add_timer (jitterbuffer, TIMER_TYPE_LOST, expected, lost_packets,
2563             priv->last_in_pts + duration, 0, gap_time);
2564         expected += lost_packets;
2565         priv->last_in_pts += gap_time;
2566       }
2567     }
2568
2569     expected_pts = priv->last_in_pts + duration;
2570   } else {
2571     /* If we cannot assume equidistant packet spacing, the only thing we now
2572      * for sure is that the missing packets have expected pts not later than
2573      * the last received pts. */
2574     duration = 0;
2575     expected_pts = pts;
2576   }
2577
2578   delay = 0;
2579
2580   if (priv->do_retransmission) {
2581     TimerData *timer = find_timer (jitterbuffer, expected);
2582
2583     type = TIMER_TYPE_EXPECTED;
2584     delay = get_rtx_delay (priv);
2585
2586     /* if we had a timer for the first missing packet, update it. */
2587     if (timer && timer->type == TIMER_TYPE_EXPECTED) {
2588       GstClockTime timeout = timer->timeout;
2589
2590       timer->duration = duration;
2591       if (timeout > (expected_pts + delay) && timer->num_rtx_retry == 0) {
2592         reschedule_timer (jitterbuffer, timer, timer->seqnum, expected_pts,
2593             delay, TRUE);
2594       }
2595       expected++;
2596       expected_pts += duration;
2597     }
2598   } else {
2599     type = TIMER_TYPE_LOST;
2600   }
2601
2602   while (gst_rtp_buffer_compare_seqnum (expected, seqnum) > 0) {
2603     add_timer (jitterbuffer, type, expected, 0, expected_pts, delay, duration);
2604     expected_pts += duration;
2605     expected++;
2606   }
2607 }
2608
2609 static void
2610 calculate_jitter (GstRtpJitterBuffer * jitterbuffer, GstClockTime dts,
2611     guint32 rtptime)
2612 {
2613   gint32 rtpdiff;
2614   GstClockTimeDiff dtsdiff, rtpdiffns, diff;
2615   GstRtpJitterBufferPrivate *priv;
2616
2617   priv = jitterbuffer->priv;
2618
2619   if (G_UNLIKELY (dts == GST_CLOCK_TIME_NONE) || priv->clock_rate <= 0)
2620     goto no_time;
2621
2622   if (priv->last_dts != -1)
2623     dtsdiff = dts - priv->last_dts;
2624   else
2625     dtsdiff = 0;
2626
2627   if (priv->last_rtptime != -1)
2628     rtpdiff = rtptime - (guint32) priv->last_rtptime;
2629   else
2630     rtpdiff = 0;
2631
2632   /* Guess whether stream currently uses equidistant packet spacing. If we
2633    * often see identical timestamps it means the packets are not
2634    * equidistant. */
2635   if (rtptime == priv->last_rtptime)
2636     priv->equidistant -= 2;
2637   else
2638     priv->equidistant += 1;
2639   priv->equidistant = CLAMP (priv->equidistant, -7, 7);
2640
2641   priv->last_dts = dts;
2642   priv->last_rtptime = rtptime;
2643
2644   if (rtpdiff > 0)
2645     rtpdiffns =
2646         gst_util_uint64_scale_int (rtpdiff, GST_SECOND, priv->clock_rate);
2647   else
2648     rtpdiffns =
2649         -gst_util_uint64_scale_int (-rtpdiff, GST_SECOND, priv->clock_rate);
2650
2651   diff = ABS (dtsdiff - rtpdiffns);
2652
2653   /* jitter is stored in nanoseconds */
2654   priv->avg_jitter = (diff + (15 * priv->avg_jitter)) >> 4;
2655
2656   GST_LOG_OBJECT (jitterbuffer,
2657       "dtsdiff %" GST_TIME_FORMAT " rtptime %" GST_TIME_FORMAT
2658       ", clock-rate %d, diff %" GST_TIME_FORMAT ", jitter: %" GST_TIME_FORMAT,
2659       GST_TIME_ARGS (dtsdiff), GST_TIME_ARGS (rtpdiffns), priv->clock_rate,
2660       GST_TIME_ARGS (diff), GST_TIME_ARGS (priv->avg_jitter));
2661
2662   return;
2663
2664   /* ERRORS */
2665 no_time:
2666   {
2667     GST_DEBUG_OBJECT (jitterbuffer,
2668         "no dts or no clock-rate, can't calculate jitter");
2669     return;
2670   }
2671 }
2672
2673 static gint
2674 compare_buffer_seqnum (GstBuffer * a, GstBuffer * b, gpointer user_data)
2675 {
2676   GstRTPBuffer rtp_a = GST_RTP_BUFFER_INIT;
2677   GstRTPBuffer rtp_b = GST_RTP_BUFFER_INIT;
2678   guint seq_a, seq_b;
2679
2680   gst_rtp_buffer_map (a, GST_MAP_READ, &rtp_a);
2681   seq_a = gst_rtp_buffer_get_seq (&rtp_a);
2682   gst_rtp_buffer_unmap (&rtp_a);
2683
2684   gst_rtp_buffer_map (b, GST_MAP_READ, &rtp_b);
2685   seq_b = gst_rtp_buffer_get_seq (&rtp_b);
2686   gst_rtp_buffer_unmap (&rtp_b);
2687
2688   return gst_rtp_buffer_compare_seqnum (seq_b, seq_a);
2689 }
2690
2691 static gboolean
2692 handle_big_gap_buffer (GstRtpJitterBuffer * jitterbuffer, GstBuffer * buffer,
2693     guint8 pt, guint16 seqnum, gint gap, guint max_dropout, guint max_misorder)
2694 {
2695   GstRtpJitterBufferPrivate *priv;
2696   guint gap_packets_length;
2697   gboolean reset = FALSE;
2698   gboolean future = gap > 0;
2699
2700   priv = jitterbuffer->priv;
2701
2702   if ((gap_packets_length = g_queue_get_length (&priv->gap_packets)) > 0) {
2703     GList *l;
2704     guint32 prev_gap_seq = -1;
2705     gboolean all_consecutive = TRUE;
2706
2707     g_queue_insert_sorted (&priv->gap_packets, buffer,
2708         (GCompareDataFunc) compare_buffer_seqnum, NULL);
2709
2710     for (l = priv->gap_packets.head; l; l = l->next) {
2711       GstBuffer *gap_buffer = l->data;
2712       GstRTPBuffer gap_rtp = GST_RTP_BUFFER_INIT;
2713       guint32 gap_seq;
2714
2715       gst_rtp_buffer_map (gap_buffer, GST_MAP_READ, &gap_rtp);
2716
2717       all_consecutive = (gst_rtp_buffer_get_payload_type (&gap_rtp) == pt);
2718
2719       gap_seq = gst_rtp_buffer_get_seq (&gap_rtp);
2720       if (prev_gap_seq == -1)
2721         prev_gap_seq = gap_seq;
2722       else if (gst_rtp_buffer_compare_seqnum (gap_seq, prev_gap_seq) != -1)
2723         all_consecutive = FALSE;
2724       else
2725         prev_gap_seq = gap_seq;
2726
2727       gst_rtp_buffer_unmap (&gap_rtp);
2728       if (!all_consecutive)
2729         break;
2730     }
2731
2732     if (all_consecutive && gap_packets_length > 3) {
2733       GST_DEBUG_OBJECT (jitterbuffer,
2734           "buffer too %s %d < %d, got 5 consecutive ones - reset",
2735           (future ? "new" : "old"), gap,
2736           (future ? max_dropout : -max_misorder));
2737       reset = TRUE;
2738     } else if (!all_consecutive) {
2739       g_queue_foreach (&priv->gap_packets, (GFunc) gst_buffer_unref, NULL);
2740       g_queue_clear (&priv->gap_packets);
2741       GST_DEBUG_OBJECT (jitterbuffer,
2742           "buffer too %s %d < %d, got no 5 consecutive ones - dropping",
2743           (future ? "new" : "old"), gap,
2744           (future ? max_dropout : -max_misorder));
2745       buffer = NULL;
2746     } else {
2747       GST_DEBUG_OBJECT (jitterbuffer,
2748           "buffer too %s %d < %d, got %u consecutive ones - waiting",
2749           (future ? "new" : "old"), gap,
2750           (future ? max_dropout : -max_misorder), gap_packets_length + 1);
2751       buffer = NULL;
2752     }
2753   } else {
2754     GST_DEBUG_OBJECT (jitterbuffer,
2755         "buffer too %s %d < %d, first one - waiting", (future ? "new" : "old"),
2756         gap, -max_misorder);
2757     g_queue_push_tail (&priv->gap_packets, buffer);
2758     buffer = NULL;
2759   }
2760
2761   return reset;
2762 }
2763
2764 static GstClockTime
2765 get_current_running_time (GstRtpJitterBuffer * jitterbuffer)
2766 {
2767   GstClock *clock = gst_element_get_clock (GST_ELEMENT_CAST (jitterbuffer));
2768   GstClockTime running_time = GST_CLOCK_TIME_NONE;
2769
2770   if (clock) {
2771     GstClockTime base_time =
2772         gst_element_get_base_time (GST_ELEMENT_CAST (jitterbuffer));
2773     GstClockTime clock_time = gst_clock_get_time (clock);
2774
2775     if (clock_time > base_time)
2776       running_time = clock_time - base_time;
2777     else
2778       running_time = 0;
2779
2780     gst_object_unref (clock);
2781   }
2782
2783   return running_time;
2784 }
2785
2786 static GstFlowReturn
2787 gst_rtp_jitter_buffer_reset (GstRtpJitterBuffer * jitterbuffer,
2788     GstPad * pad, GstObject * parent, guint16 seqnum)
2789 {
2790   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2791   GstFlowReturn ret = GST_FLOW_OK;
2792   GList *events = NULL, *l;
2793   GList *buffers;
2794   gboolean head;
2795
2796   GST_DEBUG_OBJECT (jitterbuffer, "flush and reset jitterbuffer");
2797   rtp_jitter_buffer_flush (priv->jbuf,
2798       (GFunc) free_item_and_retain_events, &events);
2799   rtp_jitter_buffer_reset_skew (priv->jbuf);
2800   remove_all_timers (jitterbuffer);
2801   priv->discont = TRUE;
2802   priv->last_popped_seqnum = -1;
2803
2804   if (priv->gap_packets.head) {
2805     GstBuffer *gap_buffer = priv->gap_packets.head->data;
2806     GstRTPBuffer gap_rtp = GST_RTP_BUFFER_INIT;
2807
2808     gst_rtp_buffer_map (gap_buffer, GST_MAP_READ, &gap_rtp);
2809     priv->next_seqnum = gst_rtp_buffer_get_seq (&gap_rtp);
2810     gst_rtp_buffer_unmap (&gap_rtp);
2811   } else {
2812     priv->next_seqnum = seqnum;
2813   }
2814
2815   priv->last_in_pts = -1;
2816   priv->next_in_seqnum = -1;
2817
2818   /* Insert all sticky events again in order, otherwise we would
2819    * potentially loose STREAM_START, CAPS or SEGMENT events
2820    */
2821   events = g_list_reverse (events);
2822   for (l = events; l; l = l->next) {
2823     RTPJitterBufferItem *item;
2824
2825     item = alloc_item (l->data, ITEM_TYPE_EVENT, -1, -1, -1, 0, -1);
2826     rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL);
2827   }
2828   g_list_free (events);
2829
2830   JBUF_SIGNAL_EVENT (priv);
2831
2832   /* reset spacing estimation when gap */
2833   priv->ips_rtptime = -1;
2834   priv->ips_pts = GST_CLOCK_TIME_NONE;
2835
2836   buffers = g_list_copy (priv->gap_packets.head);
2837   g_queue_clear (&priv->gap_packets);
2838
2839   priv->ips_rtptime = -1;
2840   priv->ips_pts = GST_CLOCK_TIME_NONE;
2841   JBUF_UNLOCK (jitterbuffer->priv);
2842
2843   for (l = buffers; l; l = l->next) {
2844     ret = gst_rtp_jitter_buffer_chain (pad, parent, l->data);
2845     l->data = NULL;
2846     if (ret != GST_FLOW_OK) {
2847       l = l->next;
2848       break;
2849     }
2850   }
2851   for (; l; l = l->next)
2852     gst_buffer_unref (l->data);
2853   g_list_free (buffers);
2854
2855   return ret;
2856 }
2857
2858 static gboolean
2859 gst_rtp_jitter_buffer_fast_start (GstRtpJitterBuffer * jitterbuffer)
2860 {
2861   GstRtpJitterBufferPrivate *priv;
2862   RTPJitterBufferItem *item;
2863   TimerData *timer;
2864
2865   priv = jitterbuffer->priv;
2866
2867   if (priv->faststart_min_packets == 0)
2868     return FALSE;
2869
2870   item = rtp_jitter_buffer_peek (priv->jbuf);
2871   if (!item)
2872     return FALSE;
2873
2874   timer = find_timer (jitterbuffer, item->seqnum);
2875   if (!timer || timer->type != TIMER_TYPE_DEADLINE)
2876     return FALSE;
2877
2878   if (rtp_jitter_buffer_can_fast_start (priv->jbuf,
2879           priv->faststart_min_packets)) {
2880     GST_INFO_OBJECT (jitterbuffer, "We found %i consecutive packet, start now",
2881         priv->faststart_min_packets);
2882     timer->timeout = -1;
2883     return TRUE;
2884   }
2885
2886   return FALSE;
2887 }
2888
2889 static GstFlowReturn
2890 gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent,
2891     GstBuffer * buffer)
2892 {
2893   GstRtpJitterBuffer *jitterbuffer;
2894   GstRtpJitterBufferPrivate *priv;
2895   guint16 seqnum;
2896   guint32 expected, rtptime;
2897   GstFlowReturn ret = GST_FLOW_OK;
2898   GstClockTime dts, pts;
2899   guint64 latency_ts;
2900   gboolean head;
2901   gint percent = -1;
2902   guint8 pt;
2903   GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
2904   gboolean do_next_seqnum = FALSE;
2905   RTPJitterBufferItem *item;
2906   GstMessage *msg = NULL;
2907   gboolean estimated_dts = FALSE;
2908   gint32 packet_rate, max_dropout, max_misorder;
2909   TimerData *timer = NULL;
2910
2911   jitterbuffer = GST_RTP_JITTER_BUFFER_CAST (parent);
2912
2913   priv = jitterbuffer->priv;
2914
2915   if (G_UNLIKELY (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp)))
2916     goto invalid_buffer;
2917
2918   pt = gst_rtp_buffer_get_payload_type (&rtp);
2919   seqnum = gst_rtp_buffer_get_seq (&rtp);
2920   rtptime = gst_rtp_buffer_get_timestamp (&rtp);
2921   gst_rtp_buffer_unmap (&rtp);
2922
2923   /* make sure we have PTS and DTS set */
2924   pts = GST_BUFFER_PTS (buffer);
2925   dts = GST_BUFFER_DTS (buffer);
2926   if (dts == -1)
2927     dts = pts;
2928   else if (pts == -1)
2929     pts = dts;
2930
2931   if (dts == -1) {
2932     /* If we have no DTS here, i.e. no capture time, get one from the
2933      * clock now to have something to calculate with in the future. */
2934     dts = get_current_running_time (jitterbuffer);
2935     pts = dts;
2936
2937     /* Remember that we estimated the DTS if we are running already
2938      * and this is not our first packet (or first packet after a reset).
2939      * If it's the first packet, we somehow must generate a timestamp for
2940      * everything, otherwise we can't calculate any times
2941      */
2942     estimated_dts = (priv->next_in_seqnum != -1);
2943   } else {
2944     /* take the DTS of the buffer. This is the time when the packet was
2945      * received and is used to calculate jitter and clock skew. We will adjust
2946      * this DTS with the smoothed value after processing it in the
2947      * jitterbuffer and assign it as the PTS. */
2948     /* bring to running time */
2949     dts = gst_segment_to_running_time (&priv->segment, GST_FORMAT_TIME, dts);
2950   }
2951
2952   GST_DEBUG_OBJECT (jitterbuffer,
2953       "Received packet #%d at time %" GST_TIME_FORMAT ", discont %d, rtx %d",
2954       seqnum, GST_TIME_ARGS (dts), GST_BUFFER_IS_DISCONT (buffer),
2955       GST_BUFFER_IS_RETRANSMISSION (buffer));
2956
2957   JBUF_LOCK_CHECK (priv, out_flushing);
2958
2959   if (G_UNLIKELY (priv->last_pt != pt)) {
2960     GstCaps *caps;
2961
2962     GST_DEBUG_OBJECT (jitterbuffer, "pt changed from %u to %u", priv->last_pt,
2963         pt);
2964
2965     priv->last_pt = pt;
2966     /* reset clock-rate so that we get a new one */
2967     priv->clock_rate = -1;
2968
2969     /* Try to get the clock-rate from the caps first if we can. If there are no
2970      * caps we must fire the signal to get the clock-rate. */
2971     if ((caps = gst_pad_get_current_caps (pad))) {
2972       gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps, pt);
2973       gst_caps_unref (caps);
2974     }
2975   }
2976
2977   if (G_UNLIKELY (priv->clock_rate == -1)) {
2978     /* no clock rate given on the caps, try to get one with the signal */
2979     if (gst_rtp_jitter_buffer_get_clock_rate (jitterbuffer,
2980             pt) == GST_FLOW_FLUSHING)
2981       goto out_flushing;
2982
2983     if (G_UNLIKELY (priv->clock_rate == -1))
2984       goto no_clock_rate;
2985
2986     gst_rtp_packet_rate_ctx_reset (&priv->packet_rate_ctx, priv->clock_rate);
2987   }
2988
2989   /* don't accept more data on EOS */
2990   if (G_UNLIKELY (priv->eos))
2991     goto have_eos;
2992
2993   if (!GST_BUFFER_IS_RETRANSMISSION (buffer))
2994     calculate_jitter (jitterbuffer, dts, rtptime);
2995
2996   if (priv->seqnum_base != -1) {
2997     gint gap;
2998
2999     gap = gst_rtp_buffer_compare_seqnum (priv->seqnum_base, seqnum);
3000
3001     if (gap < 0) {
3002       GST_DEBUG_OBJECT (jitterbuffer,
3003           "packet seqnum #%d before seqnum-base #%d", seqnum,
3004           priv->seqnum_base);
3005       gst_buffer_unref (buffer);
3006       goto finished;
3007     } else if (gap > 16384) {
3008       /* From now on don't compare against the seqnum base anymore as
3009        * at some point in the future we will wrap around and also that
3010        * much reordering is very unlikely */
3011       priv->seqnum_base = -1;
3012     }
3013   }
3014
3015   expected = priv->next_in_seqnum;
3016
3017   packet_rate =
3018       gst_rtp_packet_rate_ctx_update (&priv->packet_rate_ctx, seqnum, rtptime);
3019   max_dropout =
3020       gst_rtp_packet_rate_ctx_get_max_dropout (&priv->packet_rate_ctx,
3021       priv->max_dropout_time);
3022   max_misorder =
3023       gst_rtp_packet_rate_ctx_get_max_misorder (&priv->packet_rate_ctx,
3024       priv->max_misorder_time);
3025   GST_TRACE_OBJECT (jitterbuffer,
3026       "packet_rate: %d, max_dropout: %d, max_misorder: %d", packet_rate,
3027       max_dropout, max_misorder);
3028
3029   /* now check against our expected seqnum */
3030   if (G_UNLIKELY (expected == -1)) {
3031     GST_DEBUG_OBJECT (jitterbuffer, "First buffer #%d", seqnum);
3032
3033     /* calculate a pts based on rtptime and arrival time (dts) */
3034     pts =
3035         rtp_jitter_buffer_calculate_pts (priv->jbuf, dts, estimated_dts,
3036         rtptime, gst_element_get_base_time (GST_ELEMENT_CAST (jitterbuffer)));
3037
3038     /* we don't know what the next_in_seqnum should be, wait for the last
3039      * possible moment to push this buffer, maybe we get an earlier seqnum
3040      * while we wait */
3041     set_timer (jitterbuffer, TIMER_TYPE_DEADLINE, seqnum, pts);
3042
3043     do_next_seqnum = TRUE;
3044     /* take rtptime and pts to calculate packet spacing */
3045     priv->ips_rtptime = rtptime;
3046     priv->ips_pts = pts;
3047
3048   } else {
3049     gint gap;
3050     /* now calculate gap */
3051     gap = gst_rtp_buffer_compare_seqnum (expected, seqnum);
3052     GST_DEBUG_OBJECT (jitterbuffer, "expected #%d, got #%d, gap of %d",
3053         expected, seqnum, gap);
3054
3055     if (G_UNLIKELY (gap > 0 && priv->timers->len >= max_dropout)) {
3056       /* If we have timers for more than RTP_MAX_DROPOUT packets
3057        * pending this means that we have a huge gap overall. We can
3058        * reset the jitterbuffer at this point because there's
3059        * just too much data missing to be able to do anything
3060        * sensible with the past data. Just try again from the
3061        * next packet */
3062       GST_WARNING_OBJECT (jitterbuffer, "%d pending timers > %d - resetting",
3063           priv->timers->len, max_dropout);
3064       gst_buffer_unref (buffer);
3065       return gst_rtp_jitter_buffer_reset (jitterbuffer, pad, parent, seqnum);
3066     }
3067
3068     /* Special handling of large gaps */
3069     if ((gap != -1 && gap < -max_misorder) || (gap >= max_dropout)) {
3070       gboolean reset = handle_big_gap_buffer (jitterbuffer, buffer, pt, seqnum,
3071           gap, max_dropout, max_misorder);
3072       if (reset) {
3073         return gst_rtp_jitter_buffer_reset (jitterbuffer, pad, parent, seqnum);
3074       } else {
3075         GST_DEBUG_OBJECT (jitterbuffer,
3076             "Had big gap, waiting for more consecutive packets");
3077         goto finished;
3078       }
3079     }
3080
3081     /* We had no huge gap, let's drop all the gap packets */
3082     GST_DEBUG_OBJECT (jitterbuffer, "Clearing gap packets");
3083     g_queue_foreach (&priv->gap_packets, (GFunc) gst_buffer_unref, NULL);
3084     g_queue_clear (&priv->gap_packets);
3085
3086     /* calculate a pts based on rtptime and arrival time (dts) */
3087     /* If we estimated the DTS, don't consider it in the clock skew calculations */
3088     pts =
3089         rtp_jitter_buffer_calculate_pts (priv->jbuf, dts, estimated_dts,
3090         rtptime, gst_element_get_base_time (GST_ELEMENT_CAST (jitterbuffer)));
3091
3092     if (G_LIKELY (gap == 0)) {
3093       /* packet is expected */
3094       calculate_packet_spacing (jitterbuffer, rtptime, pts);
3095       do_next_seqnum = TRUE;
3096     } else {
3097
3098       /* we have a gap */
3099       if (gap > 0) {
3100         GST_DEBUG_OBJECT (jitterbuffer, "%d missing packets", gap);
3101         /* fill in the gap with EXPECTED timers */
3102         calculate_expected (jitterbuffer, expected, seqnum, pts, gap);
3103         do_next_seqnum = TRUE;
3104       } else {
3105         GST_DEBUG_OBJECT (jitterbuffer, "old packet received");
3106         do_next_seqnum = FALSE;
3107       }
3108
3109       /* reset spacing estimation when gap */
3110       priv->ips_rtptime = -1;
3111       priv->ips_pts = GST_CLOCK_TIME_NONE;
3112     }
3113   }
3114
3115   if (do_next_seqnum) {
3116     priv->last_in_pts = pts;
3117     priv->next_in_seqnum = (seqnum + 1) & 0xffff;
3118   }
3119
3120   timer = find_timer (jitterbuffer, seqnum);
3121   if (GST_BUFFER_IS_RETRANSMISSION (buffer)) {
3122     if (!timer)
3123       timer = timer_queue_find (priv->rtx_stats_timers, seqnum);
3124     if (timer)
3125       timer->num_rtx_received++;
3126   }
3127
3128   /* let's check if this buffer is too late, we can only accept packets with
3129    * bigger seqnum than the one we last pushed. */
3130   if (G_LIKELY (priv->last_popped_seqnum != -1)) {
3131     gint gap;
3132
3133     gap = gst_rtp_buffer_compare_seqnum (priv->last_popped_seqnum, seqnum);
3134
3135     /* priv->last_popped_seqnum >= seqnum, we're too late. */
3136     if (G_UNLIKELY (gap <= 0)) {
3137       if (priv->do_retransmission) {
3138         if (GST_BUFFER_IS_RETRANSMISSION (buffer) && timer) {
3139           update_rtx_stats (jitterbuffer, timer, dts, FALSE);
3140           /* Only count the retranmitted packet too late if it has been
3141            * considered lost. If the original packet arrived before the
3142            * retransmitted we just count it as a duplicate. */
3143           if (timer->type != TIMER_TYPE_LOST)
3144             goto rtx_duplicate;
3145         }
3146       }
3147       goto too_late;
3148     }
3149   }
3150
3151   if (already_lost (jitterbuffer, seqnum))
3152     goto already_lost;
3153
3154   /* let's drop oldest packet if the queue is already full and drop-on-latency
3155    * is set. We can only do this when there actually is a latency. When no
3156    * latency is set, we just pump it in the queue and let the other end push it
3157    * out as fast as possible. */
3158   if (priv->latency_ms && priv->drop_on_latency) {
3159     latency_ts =
3160         gst_util_uint64_scale_int (priv->latency_ms, priv->clock_rate, 1000);
3161
3162     if (G_UNLIKELY (rtp_jitter_buffer_get_ts_diff (priv->jbuf) >= latency_ts)) {
3163       RTPJitterBufferItem *old_item;
3164
3165       old_item = rtp_jitter_buffer_peek (priv->jbuf);
3166
3167       if (IS_DROPABLE (old_item)) {
3168         old_item = rtp_jitter_buffer_pop (priv->jbuf, &percent);
3169         GST_DEBUG_OBJECT (jitterbuffer, "Queue full, dropping old packet %p",
3170             old_item);
3171         priv->next_seqnum = (old_item->seqnum + old_item->count) & 0xffff;
3172         free_item (old_item);
3173       }
3174       /* we might have removed some head buffers, signal the pushing thread to
3175        * see if it can push now */
3176       JBUF_SIGNAL_EVENT (priv);
3177     }
3178   }
3179
3180   /* If we estimated the DTS, don't consider it in the clock skew calculations
3181    * later. The code above always sets dts to pts or the other way around if
3182    * any of those is valid in the buffer, so we know that if we estimated the
3183    * dts that both are unknown */
3184   if (estimated_dts)
3185     item =
3186         alloc_item (buffer, ITEM_TYPE_BUFFER, GST_CLOCK_TIME_NONE,
3187         pts, seqnum, 1, rtptime);
3188   else
3189     item = alloc_item (buffer, ITEM_TYPE_BUFFER, dts, pts, seqnum, 1, rtptime);
3190
3191   /* now insert the packet into the queue in sorted order. This function returns
3192    * FALSE if a packet with the same seqnum was already in the queue, meaning we
3193    * have a duplicate. */
3194   if (G_UNLIKELY (!rtp_jitter_buffer_insert (priv->jbuf, item, &head,
3195               &percent))) {
3196     if (GST_BUFFER_IS_RETRANSMISSION (buffer) && timer)
3197       update_rtx_stats (jitterbuffer, timer, dts, FALSE);
3198     goto duplicate;
3199   }
3200
3201   /* Trigger fast start if needed */
3202   if (gst_rtp_jitter_buffer_fast_start (jitterbuffer))
3203     head = TRUE;
3204
3205   /* update timers */
3206   update_timers (jitterbuffer, seqnum, dts, pts, do_next_seqnum,
3207       GST_BUFFER_IS_RETRANSMISSION (buffer), timer);
3208
3209   /* we had an unhandled SR, handle it now */
3210   if (priv->last_sr)
3211     do_handle_sync (jitterbuffer);
3212
3213   if (G_UNLIKELY (head)) {
3214     /* signal addition of new buffer when the _loop is waiting. */
3215     if (G_LIKELY (priv->active))
3216       JBUF_SIGNAL_EVENT (priv);
3217
3218     /* let's unschedule and unblock any waiting buffers. We only want to do this
3219      * when the head buffer changed */
3220     if (G_UNLIKELY (priv->clock_id)) {
3221       GST_DEBUG_OBJECT (jitterbuffer, "Unscheduling waiting new buffer");
3222       unschedule_current_timer (jitterbuffer);
3223     }
3224   }
3225
3226   GST_DEBUG_OBJECT (jitterbuffer,
3227       "Pushed packet #%d, now %d packets, head: %d, " "percent %d", seqnum,
3228       rtp_jitter_buffer_num_packets (priv->jbuf), head, percent);
3229
3230   msg = check_buffering_percent (jitterbuffer, percent);
3231
3232 finished:
3233   JBUF_UNLOCK (priv);
3234
3235   if (msg)
3236     gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer), msg);
3237
3238   return ret;
3239
3240   /* ERRORS */
3241 invalid_buffer:
3242   {
3243     /* this is not fatal but should be filtered earlier */
3244     GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
3245         ("Received invalid RTP payload, dropping"));
3246     gst_buffer_unref (buffer);
3247     return GST_FLOW_OK;
3248   }
3249 no_clock_rate:
3250   {
3251     GST_WARNING_OBJECT (jitterbuffer,
3252         "No clock-rate in caps!, dropping buffer");
3253     gst_buffer_unref (buffer);
3254     goto finished;
3255   }
3256 out_flushing:
3257   {
3258     ret = priv->srcresult;
3259     GST_DEBUG_OBJECT (jitterbuffer, "flushing %s", gst_flow_get_name (ret));
3260     gst_buffer_unref (buffer);
3261     goto finished;
3262   }
3263 have_eos:
3264   {
3265     ret = GST_FLOW_EOS;
3266     GST_WARNING_OBJECT (jitterbuffer, "we are EOS, refusing buffer");
3267     gst_buffer_unref (buffer);
3268     goto finished;
3269   }
3270 too_late:
3271   {
3272     GST_DEBUG_OBJECT (jitterbuffer, "Packet #%d too late as #%d was already"
3273         " popped, dropping", seqnum, priv->last_popped_seqnum);
3274     priv->num_late++;
3275     gst_buffer_unref (buffer);
3276     goto finished;
3277   }
3278 already_lost:
3279   {
3280     GST_DEBUG_OBJECT (jitterbuffer, "Packet #%d too late as it was already "
3281         "considered lost", seqnum);
3282     priv->num_late++;
3283     gst_buffer_unref (buffer);
3284     goto finished;
3285   }
3286 duplicate:
3287   {
3288     GST_DEBUG_OBJECT (jitterbuffer, "Duplicate packet #%d detected, dropping",
3289         seqnum);
3290     priv->num_duplicates++;
3291     free_item (item);
3292     goto finished;
3293   }
3294 rtx_duplicate:
3295   {
3296     GST_DEBUG_OBJECT (jitterbuffer,
3297         "Duplicate RTX packet #%d detected, dropping", seqnum);
3298     priv->num_duplicates++;
3299     gst_buffer_unref (buffer);
3300     goto finished;
3301   }
3302 }
3303
3304 /* FIXME: hopefully we can do something more efficient here, especially when
3305  * all packets are in order and/or outside of the currently cached range.
3306  * Still worthwhile to have it, avoids taking/releasing object lock and pad
3307  * stream lock for every single buffer in the default chain_list fallback. */
3308 static GstFlowReturn
3309 gst_rtp_jitter_buffer_chain_list (GstPad * pad, GstObject * parent,
3310     GstBufferList * buffer_list)
3311 {
3312   GstFlowReturn flow_ret = GST_FLOW_OK;
3313   guint i, n;
3314
3315   n = gst_buffer_list_length (buffer_list);
3316   for (i = 0; i < n; ++i) {
3317     GstBuffer *buf = gst_buffer_list_get (buffer_list, i);
3318
3319     flow_ret = gst_rtp_jitter_buffer_chain (pad, parent, gst_buffer_ref (buf));
3320
3321     if (flow_ret != GST_FLOW_OK)
3322       break;
3323   }
3324   gst_buffer_list_unref (buffer_list);
3325
3326   return flow_ret;
3327 }
3328
3329 static GstClockTime
3330 compute_elapsed (GstRtpJitterBuffer * jitterbuffer, RTPJitterBufferItem * item)
3331 {
3332   guint64 ext_time, elapsed;
3333   guint32 rtp_time;
3334   GstRtpJitterBufferPrivate *priv;
3335
3336   priv = jitterbuffer->priv;
3337   rtp_time = item->rtptime;
3338
3339   GST_LOG_OBJECT (jitterbuffer, "rtp %" G_GUINT32_FORMAT ", ext %"
3340       G_GUINT64_FORMAT, rtp_time, priv->ext_timestamp);
3341
3342   ext_time = priv->ext_timestamp;
3343   ext_time = gst_rtp_buffer_ext_timestamp (&ext_time, rtp_time);
3344   if (ext_time < priv->ext_timestamp) {
3345     ext_time = priv->ext_timestamp;
3346   } else {
3347     priv->ext_timestamp = ext_time;
3348   }
3349
3350   if (ext_time > priv->clock_base)
3351     elapsed = ext_time - priv->clock_base;
3352   else
3353     elapsed = 0;
3354
3355   elapsed = gst_util_uint64_scale_int (elapsed, GST_SECOND, priv->clock_rate);
3356   return elapsed;
3357 }
3358
3359 static void
3360 update_estimated_eos (GstRtpJitterBuffer * jitterbuffer,
3361     RTPJitterBufferItem * item)
3362 {
3363   guint64 total, elapsed, left, estimated;
3364   GstClockTime out_time;
3365   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
3366
3367   if (priv->npt_stop == -1 || priv->ext_timestamp == -1
3368       || priv->clock_base == -1 || priv->clock_rate <= 0)
3369     return;
3370
3371   /* compute the elapsed time */
3372   elapsed = compute_elapsed (jitterbuffer, item);
3373
3374   /* do nothing if elapsed time doesn't increment */
3375   if (priv->last_elapsed && elapsed <= priv->last_elapsed)
3376     return;
3377
3378   priv->last_elapsed = elapsed;
3379
3380   /* this is the total time we need to play */
3381   total = priv->npt_stop - priv->npt_start;
3382   GST_LOG_OBJECT (jitterbuffer, "total %" GST_TIME_FORMAT,
3383       GST_TIME_ARGS (total));
3384
3385   /* this is how much time there is left */
3386   if (total > elapsed)
3387     left = total - elapsed;
3388   else
3389     left = 0;
3390
3391   /* if we have less time left that the size of the buffer, we will not
3392    * be able to keep it filled, disabled buffering then */
3393   if (left < rtp_jitter_buffer_get_delay (priv->jbuf)) {
3394     GST_DEBUG_OBJECT (jitterbuffer, "left %" GST_TIME_FORMAT
3395         ", disable buffering close to EOS", GST_TIME_ARGS (left));
3396     rtp_jitter_buffer_disable_buffering (priv->jbuf, TRUE);
3397   }
3398
3399   /* this is the current time as running-time */
3400   out_time = item->pts;
3401
3402   if (elapsed > 0)
3403     estimated = gst_util_uint64_scale (out_time, total, elapsed);
3404   else {
3405     /* if there is almost nothing left,
3406      * we may never advance enough to end up in the above case */
3407     if (total < GST_SECOND)
3408       estimated = GST_SECOND;
3409     else
3410       estimated = -1;
3411   }
3412   GST_LOG_OBJECT (jitterbuffer, "elapsed %" GST_TIME_FORMAT ", estimated %"
3413       GST_TIME_FORMAT, GST_TIME_ARGS (elapsed), GST_TIME_ARGS (estimated));
3414
3415   if (estimated != -1 && priv->estimated_eos != estimated) {
3416     set_timer (jitterbuffer, TIMER_TYPE_EOS, -1, estimated);
3417     priv->estimated_eos = estimated;
3418   }
3419 }
3420
3421 /* take a buffer from the queue and push it */
3422 static GstFlowReturn
3423 pop_and_push_next (GstRtpJitterBuffer * jitterbuffer, guint seqnum)
3424 {
3425   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
3426   GstFlowReturn result = GST_FLOW_OK;
3427   RTPJitterBufferItem *item;
3428   GstBuffer *outbuf = NULL;
3429   GstEvent *outevent = NULL;
3430   GstQuery *outquery = NULL;
3431   GstClockTime dts, pts;
3432   gint percent = -1;
3433   gboolean do_push = TRUE;
3434   guint type;
3435   GstMessage *msg;
3436
3437   /* when we get here we are ready to pop and push the buffer */
3438   item = rtp_jitter_buffer_pop (priv->jbuf, &percent);
3439   type = item->type;
3440
3441   switch (type) {
3442     case ITEM_TYPE_BUFFER:
3443
3444       /* we need to make writable to change the flags and timestamps */
3445       outbuf = gst_buffer_make_writable (item->data);
3446
3447       if (G_UNLIKELY (priv->discont)) {
3448         /* set DISCONT flag when we missed a packet. We pushed the buffer writable
3449          * into the jitterbuffer so we can modify now. */
3450         GST_DEBUG_OBJECT (jitterbuffer, "mark output buffer discont");
3451         GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_DISCONT);
3452         priv->discont = FALSE;
3453       }
3454       if (G_UNLIKELY (priv->ts_discont)) {
3455         GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_RESYNC);
3456         priv->ts_discont = FALSE;
3457       }
3458
3459       dts =
3460           gst_segment_position_from_running_time (&priv->segment,
3461           GST_FORMAT_TIME, item->dts);
3462       pts =
3463           gst_segment_position_from_running_time (&priv->segment,
3464           GST_FORMAT_TIME, item->pts);
3465
3466       /* if this is a new frame, check if ts_offset needs to be updated */
3467       if (pts != priv->last_pts) {
3468         update_offset (jitterbuffer);
3469       }
3470
3471       /* apply timestamp with offset to buffer now */
3472       GST_BUFFER_DTS (outbuf) = apply_offset (jitterbuffer, dts);
3473       GST_BUFFER_PTS (outbuf) = apply_offset (jitterbuffer, pts);
3474
3475       /* update the elapsed time when we need to check against the npt stop time. */
3476       update_estimated_eos (jitterbuffer, item);
3477
3478       priv->last_pts = pts;
3479       priv->last_out_time = GST_BUFFER_PTS (outbuf);
3480       break;
3481     case ITEM_TYPE_LOST:
3482       priv->discont = TRUE;
3483       if (!priv->do_lost)
3484         do_push = FALSE;
3485       /* FALLTHROUGH */
3486     case ITEM_TYPE_EVENT:
3487       outevent = item->data;
3488       break;
3489     case ITEM_TYPE_QUERY:
3490       outquery = item->data;
3491       break;
3492   }
3493
3494   /* now we are ready to push the buffer. Save the seqnum and release the lock
3495    * so the other end can push stuff in the queue again. */
3496   if (seqnum != -1) {
3497     priv->last_popped_seqnum = seqnum;
3498     priv->next_seqnum = (seqnum + item->count) & 0xffff;
3499   }
3500   msg = check_buffering_percent (jitterbuffer, percent);
3501
3502   if (type == ITEM_TYPE_EVENT && outevent &&
3503       GST_EVENT_TYPE (outevent) == GST_EVENT_EOS) {
3504     g_assert (priv->eos);
3505     while (priv->timers->len > 0) {
3506       /* Stopping timers */
3507       unschedule_current_timer (jitterbuffer);
3508       JBUF_WAIT_TIMER (priv);
3509     }
3510   }
3511
3512   JBUF_UNLOCK (priv);
3513
3514   item->data = NULL;
3515   free_item (item);
3516
3517   if (msg)
3518     gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer), msg);
3519
3520   switch (type) {
3521     case ITEM_TYPE_BUFFER:
3522       /* push buffer */
3523       GST_DEBUG_OBJECT (jitterbuffer,
3524           "Pushing buffer %d, dts %" GST_TIME_FORMAT ", pts %" GST_TIME_FORMAT,
3525           seqnum, GST_TIME_ARGS (GST_BUFFER_DTS (outbuf)),
3526           GST_TIME_ARGS (GST_BUFFER_PTS (outbuf)));
3527       priv->num_pushed++;
3528       result = gst_pad_push (priv->srcpad, outbuf);
3529
3530       JBUF_LOCK_CHECK (priv, out_flushing);
3531       break;
3532     case ITEM_TYPE_LOST:
3533     case ITEM_TYPE_EVENT:
3534       /* We got not enough consecutive packets with a huge gap, we can
3535        * as well just drop them here now on EOS */
3536       if (outevent && GST_EVENT_TYPE (outevent) == GST_EVENT_EOS) {
3537         GST_DEBUG_OBJECT (jitterbuffer, "Clearing gap packets on EOS");
3538         g_queue_foreach (&priv->gap_packets, (GFunc) gst_buffer_unref, NULL);
3539         g_queue_clear (&priv->gap_packets);
3540       }
3541
3542       GST_DEBUG_OBJECT (jitterbuffer, "%sPushing event %" GST_PTR_FORMAT
3543           ", seqnum %d", do_push ? "" : "NOT ", outevent, seqnum);
3544
3545       if (do_push)
3546         gst_pad_push_event (priv->srcpad, outevent);
3547       else if (outevent)
3548         gst_event_unref (outevent);
3549
3550       result = GST_FLOW_OK;
3551
3552       JBUF_LOCK_CHECK (priv, out_flushing);
3553       break;
3554     case ITEM_TYPE_QUERY:
3555     {
3556       gboolean res;
3557
3558       res = gst_pad_peer_query (priv->srcpad, outquery);
3559
3560       JBUF_LOCK_CHECK (priv, out_flushing);
3561       result = GST_FLOW_OK;
3562       GST_LOG_OBJECT (jitterbuffer, "did query %p, return %d", outquery, res);
3563       JBUF_SIGNAL_QUERY (priv, res);
3564       break;
3565     }
3566   }
3567   return result;
3568
3569   /* ERRORS */
3570 out_flushing:
3571   {
3572     return priv->srcresult;
3573   }
3574 }
3575
3576 #define GST_FLOW_WAIT GST_FLOW_CUSTOM_SUCCESS
3577
3578 /* Peek a buffer and compare the seqnum to the expected seqnum.
3579  * If all is fine, the buffer is pushed.
3580  * If something is wrong, we wait for some event
3581  */
3582 static GstFlowReturn
3583 handle_next_buffer (GstRtpJitterBuffer * jitterbuffer)
3584 {
3585   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
3586   GstFlowReturn result;
3587   RTPJitterBufferItem *item;
3588   guint seqnum;
3589   guint32 next_seqnum;
3590
3591   /* only push buffers when PLAYING and active and not buffering */
3592   if (priv->blocked || !priv->active ||
3593       rtp_jitter_buffer_is_buffering (priv->jbuf)) {
3594     return GST_FLOW_WAIT;
3595   }
3596
3597   /* peek a buffer, we're just looking at the sequence number.
3598    * If all is fine, we'll pop and push it. If the sequence number is wrong we
3599    * wait for a timeout or something to change.
3600    * The peeked buffer is valid for as long as we hold the jitterbuffer lock. */
3601   item = rtp_jitter_buffer_peek (priv->jbuf);
3602   if (item == NULL) {
3603     goto wait;
3604   }
3605
3606   /* get the seqnum and the next expected seqnum */
3607   seqnum = item->seqnum;
3608   if (seqnum == -1) {
3609     return pop_and_push_next (jitterbuffer, seqnum);
3610   }
3611
3612   next_seqnum = priv->next_seqnum;
3613
3614   /* get the gap between this and the previous packet. If we don't know the
3615    * previous packet seqnum assume no gap. */
3616   if (G_UNLIKELY (next_seqnum == -1)) {
3617     GST_DEBUG_OBJECT (jitterbuffer, "First buffer #%d", seqnum);
3618     /* we don't know what the next_seqnum should be, the chain function should
3619      * have scheduled a DEADLINE timer that will increment next_seqnum when it
3620      * fires, so wait for that */
3621     result = GST_FLOW_WAIT;
3622   } else {
3623     gint gap = gst_rtp_buffer_compare_seqnum (next_seqnum, seqnum);
3624
3625     if (G_LIKELY (gap == 0)) {
3626       /* no missing packet, pop and push */
3627       result = pop_and_push_next (jitterbuffer, seqnum);
3628     } else if (G_UNLIKELY (gap < 0)) {
3629       /* if we have a packet that we already pushed or considered dropped, pop it
3630        * off and get the next packet */
3631       GST_DEBUG_OBJECT (jitterbuffer, "Old packet #%d, next #%d dropping",
3632           seqnum, next_seqnum);
3633       item = rtp_jitter_buffer_pop (priv->jbuf, NULL);
3634       free_item (item);
3635       result = GST_FLOW_OK;
3636     } else {
3637       /* the chain function has scheduled timers to request retransmission or
3638        * when to consider the packet lost, wait for that */
3639       GST_DEBUG_OBJECT (jitterbuffer,
3640           "Sequence number GAP detected: expected %d instead of %d (%d missing)",
3641           next_seqnum, seqnum, gap);
3642       /* if we have reached EOS, just keep processing */
3643       if (priv->eos) {
3644         result = pop_and_push_next (jitterbuffer, seqnum);
3645         result = GST_FLOW_OK;
3646       } else {
3647         result = GST_FLOW_WAIT;
3648       }
3649     }
3650   }
3651
3652   return result;
3653
3654 wait:
3655   {
3656     GST_DEBUG_OBJECT (jitterbuffer, "no buffer, going to wait");
3657     if (priv->eos) {
3658       return GST_FLOW_EOS;
3659     } else {
3660       return GST_FLOW_WAIT;
3661     }
3662   }
3663 }
3664
3665 static GstClockTime
3666 get_rtx_retry_timeout (GstRtpJitterBufferPrivate * priv)
3667 {
3668   GstClockTime rtx_retry_timeout;
3669   GstClockTime rtx_min_retry_timeout;
3670
3671   if (priv->rtx_retry_timeout == -1) {
3672     if (priv->avg_rtx_rtt == 0)
3673       rtx_retry_timeout = DEFAULT_AUTO_RTX_TIMEOUT;
3674     else
3675       /* we want to ask for a retransmission after we waited for a
3676        * complete RTT and the additional jitter */
3677       rtx_retry_timeout = priv->avg_rtx_rtt + priv->avg_jitter * 2;
3678   } else {
3679     rtx_retry_timeout = priv->rtx_retry_timeout * GST_MSECOND;
3680   }
3681   /* make sure we don't retry too often. On very low latency networks,
3682    * the RTT and jitter can be very low. */
3683   if (priv->rtx_min_retry_timeout == -1) {
3684     rtx_min_retry_timeout = priv->packet_spacing;
3685   } else {
3686     rtx_min_retry_timeout = priv->rtx_min_retry_timeout * GST_MSECOND;
3687   }
3688   rtx_retry_timeout = MAX (rtx_retry_timeout, rtx_min_retry_timeout);
3689
3690   return rtx_retry_timeout;
3691 }
3692
3693 static GstClockTime
3694 get_rtx_retry_period (GstRtpJitterBufferPrivate * priv,
3695     GstClockTime rtx_retry_timeout)
3696 {
3697   GstClockTime rtx_retry_period;
3698
3699   if (priv->rtx_retry_period == -1) {
3700     /* we retry up to the configured jitterbuffer size but leaving some
3701      * room for the retransmission to arrive in time */
3702     if (rtx_retry_timeout > priv->latency_ns) {
3703       rtx_retry_period = 0;
3704     } else {
3705       rtx_retry_period = priv->latency_ns - rtx_retry_timeout;
3706     }
3707   } else {
3708     rtx_retry_period = priv->rtx_retry_period * GST_MSECOND;
3709   }
3710   return rtx_retry_period;
3711 }
3712
3713 /*
3714   1. For *larger* rtx-rtt, weigh a new measurement as before (1/8th)
3715   2. For *smaller* rtx-rtt, be a bit more conservative and weigh a bit less (1/16th)
3716   3. For very large measurements (> avg * 2), consider them "outliers"
3717      and count them a lot less (1/48th)
3718 */
3719 static void
3720 update_avg_rtx_rtt (GstRtpJitterBufferPrivate * priv, GstClockTime rtt)
3721 {
3722   gint weight;
3723
3724   if (priv->avg_rtx_rtt == 0) {
3725     priv->avg_rtx_rtt = rtt;
3726     return;
3727   }
3728
3729   if (rtt > 2 * priv->avg_rtx_rtt)
3730     weight = 48;
3731   else if (rtt > priv->avg_rtx_rtt)
3732     weight = 8;
3733   else
3734     weight = 16;
3735
3736   priv->avg_rtx_rtt = (rtt + (weight - 1) * priv->avg_rtx_rtt) / weight;
3737 }
3738
3739 static void
3740 update_rtx_stats (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
3741     GstClockTime dts, gboolean success)
3742 {
3743   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
3744   GstClockTime delay;
3745
3746   if (success) {
3747     /* we scheduled a retry for this packet and now we have it */
3748     priv->num_rtx_success++;
3749     /* all the previous retry attempts failed */
3750     priv->num_rtx_failed += timer->num_rtx_retry - 1;
3751   } else {
3752     /* All retries failed or was too late */
3753     priv->num_rtx_failed += timer->num_rtx_retry;
3754   }
3755
3756   /* number of retries before (hopefully) receiving the packet */
3757   if (priv->avg_rtx_num == 0.0)
3758     priv->avg_rtx_num = timer->num_rtx_retry;
3759   else
3760     priv->avg_rtx_num = (timer->num_rtx_retry + 7 * priv->avg_rtx_num) / 8;
3761
3762   /* Calculate the delay between retransmission request and receiving this
3763    * packet. We have a valid delay if and only if this packet is a response to
3764    * our last request. If not we don't know if this is a response to an
3765    * earlier request and delay could be way off. For RTT is more important
3766    * with correct values than to update for every packet. */
3767   if (timer->num_rtx_retry == timer->num_rtx_received &&
3768       dts != GST_CLOCK_TIME_NONE && dts > timer->rtx_last) {
3769     delay = dts - timer->rtx_last;
3770     update_avg_rtx_rtt (priv, delay);
3771   } else {
3772     delay = 0;
3773   }
3774
3775   GST_LOG_OBJECT (jitterbuffer,
3776       "RTX #%d, result %d, success %" G_GUINT64_FORMAT ", failed %"
3777       G_GUINT64_FORMAT ", requests %" G_GUINT64_FORMAT ", dups %"
3778       G_GUINT64_FORMAT ", avg-num %g, delay %" GST_TIME_FORMAT ", avg-rtt %"
3779       GST_TIME_FORMAT, timer->seqnum, success, priv->num_rtx_success,
3780       priv->num_rtx_failed, priv->num_rtx_requests, priv->num_duplicates,
3781       priv->avg_rtx_num, GST_TIME_ARGS (delay),
3782       GST_TIME_ARGS (priv->avg_rtx_rtt));
3783 }
3784
3785 /* the timeout for when we expected a packet expired */
3786 static gboolean
3787 do_expected_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
3788     GstClockTime now)
3789 {
3790   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
3791   GstEvent *event;
3792   guint delay, delay_ms, avg_rtx_rtt_ms;
3793   guint rtx_retry_timeout_ms, rtx_retry_period_ms;
3794   guint rtx_deadline_ms;
3795   GstClockTime rtx_retry_period;
3796   GstClockTime rtx_retry_timeout;
3797   GstClock *clock;
3798
3799   GST_DEBUG_OBJECT (jitterbuffer, "expected %d didn't arrive, now %"
3800       GST_TIME_FORMAT, timer->seqnum, GST_TIME_ARGS (now));
3801
3802   rtx_retry_timeout = get_rtx_retry_timeout (priv);
3803   rtx_retry_period = get_rtx_retry_period (priv, rtx_retry_timeout);
3804
3805   delay = timer->rtx_delay + timer->rtx_retry;
3806
3807   delay_ms = GST_TIME_AS_MSECONDS (delay);
3808   rtx_retry_timeout_ms = GST_TIME_AS_MSECONDS (rtx_retry_timeout);
3809   rtx_retry_period_ms = GST_TIME_AS_MSECONDS (rtx_retry_period);
3810   avg_rtx_rtt_ms = GST_TIME_AS_MSECONDS (priv->avg_rtx_rtt);
3811   rtx_deadline_ms =
3812       priv->rtx_deadline_ms != -1 ? priv->rtx_deadline_ms : priv->latency_ms;
3813
3814   event = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM,
3815       gst_structure_new ("GstRTPRetransmissionRequest",
3816           "seqnum", G_TYPE_UINT, (guint) timer->seqnum,
3817           "running-time", G_TYPE_UINT64, timer->rtx_base,
3818           "delay", G_TYPE_UINT, delay_ms,
3819           "retry", G_TYPE_UINT, timer->num_rtx_retry,
3820           "frequency", G_TYPE_UINT, rtx_retry_timeout_ms,
3821           "period", G_TYPE_UINT, rtx_retry_period_ms,
3822           "deadline", G_TYPE_UINT, rtx_deadline_ms,
3823           "packet-spacing", G_TYPE_UINT64, priv->packet_spacing,
3824           "avg-rtt", G_TYPE_UINT, avg_rtx_rtt_ms, NULL));
3825   GST_DEBUG_OBJECT (jitterbuffer, "Request RTX: %" GST_PTR_FORMAT, event);
3826
3827   priv->num_rtx_requests++;
3828   timer->num_rtx_retry++;
3829
3830   GST_OBJECT_LOCK (jitterbuffer);
3831   if ((clock = GST_ELEMENT_CLOCK (jitterbuffer))) {
3832     timer->rtx_last = gst_clock_get_time (clock);
3833     timer->rtx_last -= GST_ELEMENT_CAST (jitterbuffer)->base_time;
3834   } else {
3835     timer->rtx_last = now;
3836   }
3837   GST_OBJECT_UNLOCK (jitterbuffer);
3838
3839   /* calculate the timeout for the next retransmission attempt */
3840   timer->rtx_retry += rtx_retry_timeout;
3841   GST_DEBUG_OBJECT (jitterbuffer, "base %" GST_TIME_FORMAT ", delay %"
3842       GST_TIME_FORMAT ", retry %" GST_TIME_FORMAT ", num_retry %u",
3843       GST_TIME_ARGS (timer->rtx_base), GST_TIME_ARGS (timer->rtx_delay),
3844       GST_TIME_ARGS (timer->rtx_retry), timer->num_rtx_retry);
3845   if ((priv->rtx_max_retries != -1
3846           && timer->num_rtx_retry >= priv->rtx_max_retries)
3847       || (timer->rtx_retry + timer->rtx_delay > rtx_retry_period)
3848       || (timer->rtx_base + rtx_retry_period < now)) {
3849     GST_DEBUG_OBJECT (jitterbuffer, "reschedule as LOST timer");
3850     /* too many retransmission request, we now convert the timer
3851      * to a lost timer, leave the num_rtx_retry as it is for stats */
3852     timer->type = TIMER_TYPE_LOST;
3853     timer->rtx_delay = 0;
3854     timer->rtx_retry = 0;
3855   }
3856   reschedule_timer (jitterbuffer, timer, timer->seqnum,
3857       timer->rtx_base + timer->rtx_retry, timer->rtx_delay, FALSE);
3858
3859   JBUF_UNLOCK (priv);
3860   gst_pad_push_event (priv->sinkpad, event);
3861   JBUF_LOCK (priv);
3862
3863   return FALSE;
3864 }
3865
3866 /* a packet is lost */
3867 static gboolean
3868 do_lost_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
3869     GstClockTime now)
3870 {
3871   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
3872   guint seqnum, lost_packets, num_rtx_retry, next_in_seqnum;
3873   gboolean head;
3874   GstEvent *event = NULL;
3875   RTPJitterBufferItem *item;
3876
3877   seqnum = timer->seqnum;
3878   lost_packets = MAX (timer->num, 1);
3879   num_rtx_retry = timer->num_rtx_retry;
3880
3881   /* we had a gap and thus we lost some packets. Create an event for this.  */
3882   if (lost_packets > 1)
3883     GST_DEBUG_OBJECT (jitterbuffer, "Packets #%d -> #%d lost", seqnum,
3884         seqnum + lost_packets - 1);
3885   else
3886     GST_DEBUG_OBJECT (jitterbuffer, "Packet #%d lost", seqnum);
3887
3888   priv->num_lost += lost_packets;
3889   priv->num_rtx_failed += num_rtx_retry;
3890
3891   next_in_seqnum = (seqnum + lost_packets) & 0xffff;
3892
3893   /* we now only accept seqnum bigger than this */
3894   if (gst_rtp_buffer_compare_seqnum (priv->next_in_seqnum, next_in_seqnum) > 0) {
3895     priv->next_in_seqnum = next_in_seqnum;
3896     priv->last_in_pts = apply_offset (jitterbuffer, timer->timeout);
3897   }
3898
3899   /* Avoid creating events if we don't need it. Note that we still need to create
3900    * the lost *ITEM* since it will be used to notify the outgoing thread of
3901    * lost items (so that we can set discont flags and such) */
3902   if (priv->do_lost) {
3903     GstClockTime duration, timestamp;
3904     /* create paket lost event */
3905     timestamp = apply_offset (jitterbuffer, timer->timeout);
3906     duration = timer->duration;
3907     if (duration == GST_CLOCK_TIME_NONE && priv->packet_spacing > 0)
3908       duration = priv->packet_spacing;
3909     event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM,
3910         gst_structure_new ("GstRTPPacketLost",
3911             "seqnum", G_TYPE_UINT, (guint) seqnum,
3912             "timestamp", G_TYPE_UINT64, timestamp,
3913             "duration", G_TYPE_UINT64, duration,
3914             "retry", G_TYPE_UINT, num_rtx_retry, NULL));
3915   }
3916   item = alloc_item (event, ITEM_TYPE_LOST, -1, -1, seqnum, lost_packets, -1);
3917   if (!rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL))
3918     /* Duplicate */
3919     free_item (item);
3920
3921   if (GST_CLOCK_TIME_IS_VALID (timer->rtx_last)) {
3922     /* Store info to update stats if the packet arrives too late */
3923     timer_queue_append (priv->rtx_stats_timers, timer,
3924         now + priv->rtx_stats_timeout * GST_MSECOND, TRUE);
3925   }
3926   remove_timer (jitterbuffer, timer);
3927
3928   if (head)
3929     JBUF_SIGNAL_EVENT (priv);
3930
3931   return TRUE;
3932 }
3933
3934 static gboolean
3935 do_eos_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
3936     GstClockTime now)
3937 {
3938   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
3939
3940   GST_INFO_OBJECT (jitterbuffer, "got the NPT timeout");
3941   remove_timer (jitterbuffer, timer);
3942   if (!priv->eos) {
3943     /* there was no EOS in the buffer, put one in there now */
3944     queue_event (jitterbuffer, gst_event_new_eos ());
3945   }
3946   JBUF_SIGNAL_EVENT (priv);
3947
3948   return TRUE;
3949 }
3950
3951 static gboolean
3952 do_deadline_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
3953     GstClockTime now)
3954 {
3955   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
3956
3957   GST_INFO_OBJECT (jitterbuffer, "got deadline timeout");
3958
3959   /* timer seqnum might have been obsoleted by caps seqnum-base,
3960    * only mess with current ongoing seqnum if still unknown */
3961   if (priv->next_seqnum == -1)
3962     priv->next_seqnum = timer->seqnum;
3963   remove_timer (jitterbuffer, timer);
3964   JBUF_SIGNAL_EVENT (priv);
3965
3966   return TRUE;
3967 }
3968
3969 static gboolean
3970 do_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
3971     GstClockTime now)
3972 {
3973   gboolean removed = FALSE;
3974
3975   switch (timer->type) {
3976     case TIMER_TYPE_EXPECTED:
3977       removed = do_expected_timeout (jitterbuffer, timer, now);
3978       break;
3979     case TIMER_TYPE_LOST:
3980       removed = do_lost_timeout (jitterbuffer, timer, now);
3981       break;
3982     case TIMER_TYPE_DEADLINE:
3983       removed = do_deadline_timeout (jitterbuffer, timer, now);
3984       break;
3985     case TIMER_TYPE_EOS:
3986       removed = do_eos_timeout (jitterbuffer, timer, now);
3987       break;
3988   }
3989   return removed;
3990 }
3991
3992 /* called when we need to wait for the next timeout.
3993  *
3994  * We loop over the array of recorded timeouts and wait for the earliest one.
3995  * When it timed out, do the logic associated with the timer.
3996  *
3997  * If there are no timers, we wait on a gcond until something new happens.
3998  */
3999 static void
4000 wait_next_timeout (GstRtpJitterBuffer * jitterbuffer)
4001 {
4002   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
4003   GstClockTime now = 0;
4004
4005   JBUF_LOCK (priv);
4006   while (priv->timer_running) {
4007     TimerData *timer = NULL;
4008     GstClockTime timer_timeout = -1;
4009     gint i, len;
4010
4011     /* If we have a clock, update "now" now with the very
4012      * latest running time we have. If timers are unscheduled below we
4013      * otherwise wouldn't update now (it's only updated when timers
4014      * expire), and also for the very first loop iteration now would
4015      * otherwise always be 0
4016      */
4017     GST_OBJECT_LOCK (jitterbuffer);
4018     if (priv->eos) {
4019       now = GST_CLOCK_TIME_NONE;
4020     } else if (GST_ELEMENT_CLOCK (jitterbuffer)) {
4021       now =
4022           gst_clock_get_time (GST_ELEMENT_CLOCK (jitterbuffer)) -
4023           GST_ELEMENT_CAST (jitterbuffer)->base_time;
4024     }
4025     GST_OBJECT_UNLOCK (jitterbuffer);
4026
4027     GST_DEBUG_OBJECT (jitterbuffer, "now %" GST_TIME_FORMAT,
4028         GST_TIME_ARGS (now));
4029
4030     /* Clear expired rtx-stats timers */
4031     if (priv->do_retransmission)
4032       timer_queue_clear_until (priv->rtx_stats_timers, now);
4033
4034     /* Iterate "normal" timers */
4035     len = priv->timers->len;
4036     for (i = 0; i < len;) {
4037       TimerData *test = &g_array_index (priv->timers, TimerData, i);
4038       GstClockTime test_timeout = get_timeout (jitterbuffer, test);
4039       gboolean save_best = FALSE;
4040
4041       GST_DEBUG_OBJECT (jitterbuffer,
4042           "%d, %d, %d, %" GST_TIME_FORMAT " diff:%" GST_STIME_FORMAT, i,
4043           test->type, test->seqnum, GST_TIME_ARGS (test_timeout),
4044           GST_STIME_ARGS ((gint64) (test_timeout - now)));
4045
4046       /* Weed out anything too late */
4047       if (test->type == TIMER_TYPE_LOST &&
4048           (test_timeout == -1 || test_timeout <= now)) {
4049         GST_DEBUG_OBJECT (jitterbuffer, "Weeding out late entry");
4050         do_lost_timeout (jitterbuffer, test, now);
4051         if (!priv->timer_running)
4052           break;
4053         /* We don't move the iterator forward since we just removed the current entry,
4054          * but we update the termination condition */
4055         len = priv->timers->len;
4056       } else {
4057         /* find the smallest timeout */
4058         if (timer == NULL) {
4059           save_best = TRUE;
4060         } else if (timer_timeout == -1) {
4061           /* we already have an immediate timeout, the new timer must be an
4062            * immediate timer with smaller seqnum to become the best */
4063           if (test_timeout == -1
4064               && (gst_rtp_buffer_compare_seqnum (test->seqnum,
4065                       timer->seqnum) > 0))
4066             save_best = TRUE;
4067         } else if (test_timeout == -1) {
4068           /* first immediate timer */
4069           save_best = TRUE;
4070         } else if (test_timeout < timer_timeout) {
4071           /* earlier timer */
4072           save_best = TRUE;
4073         } else if (test_timeout == timer_timeout
4074             && (gst_rtp_buffer_compare_seqnum (test->seqnum,
4075                     timer->seqnum) > 0)) {
4076           /* same timer, smaller seqnum */
4077           save_best = TRUE;
4078         }
4079
4080         if (save_best) {
4081           GST_DEBUG_OBJECT (jitterbuffer, "new best %d", i);
4082           timer = test;
4083           timer_timeout = test_timeout;
4084         }
4085         i++;
4086       }
4087     }
4088     if (timer && !priv->blocked) {
4089       GstClock *clock;
4090       GstClockTime sync_time;
4091       GstClockID id;
4092       GstClockReturn ret;
4093       GstClockTimeDiff clock_jitter;
4094
4095       if (timer_timeout == -1 || timer_timeout <= now || priv->eos) {
4096         /* We have normally removed all lost timers in the loop above */
4097         g_assert (timer->type != TIMER_TYPE_LOST);
4098
4099         do_timeout (jitterbuffer, timer, now);
4100         /* check here, do_timeout could have released the lock */
4101         if (!priv->timer_running)
4102           break;
4103         continue;
4104       }
4105
4106       GST_OBJECT_LOCK (jitterbuffer);
4107       clock = GST_ELEMENT_CLOCK (jitterbuffer);
4108       if (!clock) {
4109         GST_OBJECT_UNLOCK (jitterbuffer);
4110         /* let's just push if there is no clock */
4111         GST_DEBUG_OBJECT (jitterbuffer, "No clock, timeout right away");
4112         now = timer_timeout;
4113         continue;
4114       }
4115
4116       /* prepare for sync against clock */
4117       sync_time = timer_timeout + GST_ELEMENT_CAST (jitterbuffer)->base_time;
4118       /* add latency of peer to get input time */
4119       sync_time += priv->peer_latency;
4120
4121       GST_DEBUG_OBJECT (jitterbuffer, "sync to timestamp %" GST_TIME_FORMAT
4122           " with sync time %" GST_TIME_FORMAT,
4123           GST_TIME_ARGS (timer_timeout), GST_TIME_ARGS (sync_time));
4124
4125       /* create an entry for the clock */
4126       id = priv->clock_id = gst_clock_new_single_shot_id (clock, sync_time);
4127       priv->timer_timeout = timer_timeout;
4128       priv->timer_seqnum = timer->seqnum;
4129       GST_OBJECT_UNLOCK (jitterbuffer);
4130
4131       /* release the lock so that the other end can push stuff or unlock */
4132       JBUF_UNLOCK (priv);
4133
4134       ret = gst_clock_id_wait (id, &clock_jitter);
4135
4136       JBUF_LOCK (priv);
4137       if (!priv->timer_running) {
4138         gst_clock_id_unref (id);
4139         priv->clock_id = NULL;
4140         break;
4141       }
4142
4143       if (ret != GST_CLOCK_UNSCHEDULED) {
4144         now = timer_timeout + MAX (clock_jitter, 0);
4145         GST_DEBUG_OBJECT (jitterbuffer,
4146             "sync done, %d, #%d, %" GST_STIME_FORMAT, ret, priv->timer_seqnum,
4147             GST_STIME_ARGS (clock_jitter));
4148       } else {
4149         GST_DEBUG_OBJECT (jitterbuffer, "sync unscheduled");
4150       }
4151       /* and free the entry */
4152       gst_clock_id_unref (id);
4153       priv->clock_id = NULL;
4154     } else {
4155       /* no timers, wait for activity */
4156       JBUF_WAIT_TIMER (priv);
4157     }
4158   }
4159   JBUF_UNLOCK (priv);
4160
4161   GST_DEBUG_OBJECT (jitterbuffer, "we are stopping");
4162   return;
4163 }
4164
4165 /*
4166  * This funcion implements the main pushing loop on the source pad.
4167  *
4168  * It first tries to push as many buffers as possible. If there is a seqnum
4169  * mismatch, we wait for the next timeouts.
4170  */
4171 static void
4172 gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer)
4173 {
4174   GstRtpJitterBufferPrivate *priv;
4175   GstFlowReturn result = GST_FLOW_OK;
4176
4177   priv = jitterbuffer->priv;
4178
4179   JBUF_LOCK_CHECK (priv, flushing);
4180   do {
4181     result = handle_next_buffer (jitterbuffer);
4182     if (G_LIKELY (result == GST_FLOW_WAIT)) {
4183       /* now wait for the next event */
4184       JBUF_WAIT_EVENT (priv, flushing);
4185       result = GST_FLOW_OK;
4186     }
4187   } while (result == GST_FLOW_OK);
4188   /* store result for upstream */
4189   priv->srcresult = result;
4190   /* if we get here we need to pause */
4191   goto pause;
4192
4193   /* ERRORS */
4194 flushing:
4195   {
4196     result = priv->srcresult;
4197     goto pause;
4198   }
4199 pause:
4200   {
4201     GstEvent *event;
4202
4203     JBUF_SIGNAL_QUERY (priv, FALSE);
4204     JBUF_UNLOCK (priv);
4205
4206     GST_DEBUG_OBJECT (jitterbuffer, "pausing task, reason %s",
4207         gst_flow_get_name (result));
4208     gst_pad_pause_task (priv->srcpad);
4209     if (result == GST_FLOW_EOS) {
4210       event = gst_event_new_eos ();
4211       gst_pad_push_event (priv->srcpad, event);
4212     }
4213     return;
4214   }
4215 }
4216
4217 /* collect the info from the lastest RTCP packet and the jitterbuffer sync, do
4218  * some sanity checks and then emit the handle-sync signal with the parameters.
4219  * This function must be called with the LOCK */
4220 static void
4221 do_handle_sync (GstRtpJitterBuffer * jitterbuffer)
4222 {
4223   GstRtpJitterBufferPrivate *priv;
4224   guint64 base_rtptime, base_time;
4225   guint32 clock_rate;
4226   guint64 last_rtptime;
4227   guint64 clock_base;
4228   guint64 ext_rtptime, diff;
4229   gboolean valid = TRUE, keep = FALSE;
4230
4231   priv = jitterbuffer->priv;
4232
4233   /* get the last values from the jitterbuffer */
4234   rtp_jitter_buffer_get_sync (priv->jbuf, &base_rtptime, &base_time,
4235       &clock_rate, &last_rtptime);
4236
4237   clock_base = priv->clock_base;
4238   ext_rtptime = priv->ext_rtptime;
4239
4240   GST_DEBUG_OBJECT (jitterbuffer, "ext SR %" G_GUINT64_FORMAT ", base %"
4241       G_GUINT64_FORMAT ", clock-rate %" G_GUINT32_FORMAT
4242       ", clock-base %" G_GUINT64_FORMAT ", last-rtptime %" G_GUINT64_FORMAT,
4243       ext_rtptime, base_rtptime, clock_rate, clock_base, last_rtptime);
4244
4245   if (base_rtptime == -1 || clock_rate == -1 || base_time == -1) {
4246     /* we keep this SR packet for later. When we get a valid RTP packet the
4247      * above values will be set and we can try to use the SR packet */
4248     GST_DEBUG_OBJECT (jitterbuffer, "keeping for later, no RTP values");
4249     keep = TRUE;
4250   } else {
4251     /* we can't accept anything that happened before we did the last resync */
4252     if (base_rtptime > ext_rtptime) {
4253       GST_DEBUG_OBJECT (jitterbuffer, "dropping, older than base time");
4254       valid = FALSE;
4255     } else {
4256       /* the SR RTP timestamp must be something close to what we last observed
4257        * in the jitterbuffer */
4258       if (ext_rtptime > last_rtptime) {
4259         /* check how far ahead it is to our RTP timestamps */
4260         diff = ext_rtptime - last_rtptime;
4261         /* if bigger than 1 second, we drop it */
4262         if (jitterbuffer->priv->max_rtcp_rtp_time_diff != -1 &&
4263             diff >
4264             gst_util_uint64_scale (jitterbuffer->priv->max_rtcp_rtp_time_diff,
4265                 clock_rate, 1000)) {
4266           GST_DEBUG_OBJECT (jitterbuffer, "too far ahead");
4267           /* should drop this, but some RTSP servers end up with bogus
4268            * way too ahead RTCP packet when repeated PAUSE/PLAY,
4269            * so still trigger rptbin sync but invalidate RTCP data
4270            * (sync might use other methods) */
4271           ext_rtptime = -1;
4272         }
4273         GST_DEBUG_OBJECT (jitterbuffer, "ext last %" G_GUINT64_FORMAT ", diff %"
4274             G_GUINT64_FORMAT, last_rtptime, diff);
4275       }
4276     }
4277   }
4278
4279   if (keep) {
4280     GST_DEBUG_OBJECT (jitterbuffer, "keeping RTCP packet for later");
4281   } else if (valid) {
4282     GstStructure *s;
4283
4284     s = gst_structure_new ("application/x-rtp-sync",
4285         "base-rtptime", G_TYPE_UINT64, base_rtptime,
4286         "base-time", G_TYPE_UINT64, base_time,
4287         "clock-rate", G_TYPE_UINT, clock_rate,
4288         "clock-base", G_TYPE_UINT64, clock_base,
4289         "sr-ext-rtptime", G_TYPE_UINT64, ext_rtptime,
4290         "sr-buffer", GST_TYPE_BUFFER, priv->last_sr, NULL);
4291
4292     GST_DEBUG_OBJECT (jitterbuffer, "signaling sync");
4293     gst_buffer_replace (&priv->last_sr, NULL);
4294     JBUF_UNLOCK (priv);
4295     g_signal_emit (jitterbuffer,
4296         gst_rtp_jitter_buffer_signals[SIGNAL_HANDLE_SYNC], 0, s);
4297     JBUF_LOCK (priv);
4298     gst_structure_free (s);
4299   } else {
4300     GST_DEBUG_OBJECT (jitterbuffer, "dropping RTCP packet");
4301     gst_buffer_replace (&priv->last_sr, NULL);
4302   }
4303 }
4304
4305 static GstFlowReturn
4306 gst_rtp_jitter_buffer_chain_rtcp (GstPad * pad, GstObject * parent,
4307     GstBuffer * buffer)
4308 {
4309   GstRtpJitterBuffer *jitterbuffer;
4310   GstRtpJitterBufferPrivate *priv;
4311   GstFlowReturn ret = GST_FLOW_OK;
4312   guint32 ssrc;
4313   GstRTCPPacket packet;
4314   guint64 ext_rtptime;
4315   guint32 rtptime;
4316   GstRTCPBuffer rtcp = { NULL, };
4317
4318   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
4319
4320   if (G_UNLIKELY (!gst_rtcp_buffer_validate_reduced (buffer)))
4321     goto invalid_buffer;
4322
4323   priv = jitterbuffer->priv;
4324
4325   gst_rtcp_buffer_map (buffer, GST_MAP_READ, &rtcp);
4326
4327   if (!gst_rtcp_buffer_get_first_packet (&rtcp, &packet))
4328     goto empty_buffer;
4329
4330   /* first packet must be SR or RR or else the validate would have failed */
4331   switch (gst_rtcp_packet_get_type (&packet)) {
4332     case GST_RTCP_TYPE_SR:
4333       gst_rtcp_packet_sr_get_sender_info (&packet, &ssrc, NULL, &rtptime,
4334           NULL, NULL);
4335       break;
4336     default:
4337       goto ignore_buffer;
4338   }
4339   gst_rtcp_buffer_unmap (&rtcp);
4340
4341   GST_DEBUG_OBJECT (jitterbuffer, "received RTCP of SSRC %08x", ssrc);
4342
4343   JBUF_LOCK (priv);
4344   /* convert the RTP timestamp to our extended timestamp, using the same offset
4345    * we used in the jitterbuffer */
4346   ext_rtptime = priv->jbuf->ext_rtptime;
4347   ext_rtptime = gst_rtp_buffer_ext_timestamp (&ext_rtptime, rtptime);
4348
4349   priv->ext_rtptime = ext_rtptime;
4350   gst_buffer_replace (&priv->last_sr, buffer);
4351
4352   do_handle_sync (jitterbuffer);
4353   JBUF_UNLOCK (priv);
4354
4355 done:
4356   gst_buffer_unref (buffer);
4357
4358   return ret;
4359
4360 invalid_buffer:
4361   {
4362     /* this is not fatal but should be filtered earlier */
4363     GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
4364         ("Received invalid RTCP payload, dropping"));
4365     ret = GST_FLOW_OK;
4366     goto done;
4367   }
4368 empty_buffer:
4369   {
4370     /* this is not fatal but should be filtered earlier */
4371     GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
4372         ("Received empty RTCP payload, dropping"));
4373     gst_rtcp_buffer_unmap (&rtcp);
4374     ret = GST_FLOW_OK;
4375     goto done;
4376   }
4377 ignore_buffer:
4378   {
4379     GST_DEBUG_OBJECT (jitterbuffer, "ignoring RTCP packet");
4380     gst_rtcp_buffer_unmap (&rtcp);
4381     ret = GST_FLOW_OK;
4382     goto done;
4383   }
4384 }
4385
4386 static gboolean
4387 gst_rtp_jitter_buffer_sink_query (GstPad * pad, GstObject * parent,
4388     GstQuery * query)
4389 {
4390   gboolean res = FALSE;
4391   GstRtpJitterBuffer *jitterbuffer;
4392   GstRtpJitterBufferPrivate *priv;
4393
4394   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
4395   priv = jitterbuffer->priv;
4396
4397   switch (GST_QUERY_TYPE (query)) {
4398     case GST_QUERY_CAPS:
4399     {
4400       GstCaps *filter, *caps;
4401
4402       gst_query_parse_caps (query, &filter);
4403       caps = gst_rtp_jitter_buffer_getcaps (pad, filter);
4404       gst_query_set_caps_result (query, caps);
4405       gst_caps_unref (caps);
4406       res = TRUE;
4407       break;
4408     }
4409     default:
4410       if (GST_QUERY_IS_SERIALIZED (query)) {
4411         RTPJitterBufferItem *item;
4412         gboolean head;
4413
4414         JBUF_LOCK_CHECK (priv, out_flushing);
4415         if (rtp_jitter_buffer_get_mode (priv->jbuf) !=
4416             RTP_JITTER_BUFFER_MODE_BUFFER) {
4417           GST_DEBUG_OBJECT (jitterbuffer, "adding serialized query");
4418           item = alloc_item (query, ITEM_TYPE_QUERY, -1, -1, -1, 0, -1);
4419           rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL);
4420           if (head)
4421             JBUF_SIGNAL_EVENT (priv);
4422           JBUF_WAIT_QUERY (priv, out_flushing);
4423           res = priv->last_query;
4424         } else {
4425           GST_DEBUG_OBJECT (jitterbuffer, "refusing query, we are buffering");
4426           res = FALSE;
4427         }
4428         JBUF_UNLOCK (priv);
4429       } else {
4430         res = gst_pad_query_default (pad, parent, query);
4431       }
4432       break;
4433   }
4434   return res;
4435   /* ERRORS */
4436 out_flushing:
4437   {
4438     GST_DEBUG_OBJECT (jitterbuffer, "we are flushing");
4439     JBUF_UNLOCK (priv);
4440     return FALSE;
4441   }
4442
4443 }
4444
4445 static gboolean
4446 gst_rtp_jitter_buffer_src_query (GstPad * pad, GstObject * parent,
4447     GstQuery * query)
4448 {
4449   GstRtpJitterBuffer *jitterbuffer;
4450   GstRtpJitterBufferPrivate *priv;
4451   gboolean res = FALSE;
4452
4453   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
4454   priv = jitterbuffer->priv;
4455
4456   switch (GST_QUERY_TYPE (query)) {
4457     case GST_QUERY_LATENCY:
4458     {
4459       /* We need to send the query upstream and add the returned latency to our
4460        * own */
4461       GstClockTime min_latency, max_latency;
4462       gboolean us_live;
4463       GstClockTime our_latency;
4464
4465       if ((res = gst_pad_peer_query (priv->sinkpad, query))) {
4466         gst_query_parse_latency (query, &us_live, &min_latency, &max_latency);
4467
4468         GST_DEBUG_OBJECT (jitterbuffer, "Peer latency: min %"
4469             GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
4470             GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
4471
4472         /* store this so that we can safely sync on the peer buffers. */
4473         JBUF_LOCK (priv);
4474         priv->peer_latency = min_latency;
4475         our_latency = priv->latency_ns;
4476         JBUF_UNLOCK (priv);
4477
4478         GST_DEBUG_OBJECT (jitterbuffer, "Our latency: %" GST_TIME_FORMAT,
4479             GST_TIME_ARGS (our_latency));
4480
4481         /* we add some latency but can buffer an infinite amount of time */
4482         min_latency += our_latency;
4483         max_latency = -1;
4484
4485         GST_DEBUG_OBJECT (jitterbuffer, "Calculated total latency : min %"
4486             GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
4487             GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
4488
4489         gst_query_set_latency (query, TRUE, min_latency, max_latency);
4490       }
4491       break;
4492     }
4493     case GST_QUERY_POSITION:
4494     {
4495       GstClockTime start, last_out;
4496       GstFormat fmt;
4497
4498       gst_query_parse_position (query, &fmt, NULL);
4499       if (fmt != GST_FORMAT_TIME) {
4500         res = gst_pad_query_default (pad, parent, query);
4501         break;
4502       }
4503
4504       JBUF_LOCK (priv);
4505       start = priv->npt_start;
4506       last_out = priv->last_out_time;
4507       JBUF_UNLOCK (priv);
4508
4509       GST_DEBUG_OBJECT (jitterbuffer, "npt start %" GST_TIME_FORMAT
4510           ", last out %" GST_TIME_FORMAT, GST_TIME_ARGS (start),
4511           GST_TIME_ARGS (last_out));
4512
4513       if (GST_CLOCK_TIME_IS_VALID (start) && GST_CLOCK_TIME_IS_VALID (last_out)) {
4514         /* bring 0-based outgoing time to stream time */
4515         gst_query_set_position (query, GST_FORMAT_TIME, start + last_out);
4516         res = TRUE;
4517       } else {
4518         res = gst_pad_query_default (pad, parent, query);
4519       }
4520       break;
4521     }
4522     case GST_QUERY_CAPS:
4523     {
4524       GstCaps *filter, *caps;
4525
4526       gst_query_parse_caps (query, &filter);
4527       caps = gst_rtp_jitter_buffer_getcaps (pad, filter);
4528       gst_query_set_caps_result (query, caps);
4529       gst_caps_unref (caps);
4530       res = TRUE;
4531       break;
4532     }
4533     default:
4534       res = gst_pad_query_default (pad, parent, query);
4535       break;
4536   }
4537
4538   return res;
4539 }
4540
4541 static void
4542 gst_rtp_jitter_buffer_set_property (GObject * object,
4543     guint prop_id, const GValue * value, GParamSpec * pspec)
4544 {
4545   GstRtpJitterBuffer *jitterbuffer;
4546   GstRtpJitterBufferPrivate *priv;
4547
4548   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
4549   priv = jitterbuffer->priv;
4550
4551   switch (prop_id) {
4552     case PROP_LATENCY:
4553     {
4554       guint new_latency, old_latency;
4555
4556       new_latency = g_value_get_uint (value);
4557
4558       JBUF_LOCK (priv);
4559       old_latency = priv->latency_ms;
4560       priv->latency_ms = new_latency;
4561       priv->latency_ns = priv->latency_ms * GST_MSECOND;
4562       rtp_jitter_buffer_set_delay (priv->jbuf, priv->latency_ns);
4563       JBUF_UNLOCK (priv);
4564
4565       /* post message if latency changed, this will inform the parent pipeline
4566        * that a latency reconfiguration is possible/needed. */
4567       if (new_latency != old_latency) {
4568         GST_DEBUG_OBJECT (jitterbuffer, "latency changed to: %" GST_TIME_FORMAT,
4569             GST_TIME_ARGS (new_latency * GST_MSECOND));
4570
4571         gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer),
4572             gst_message_new_latency (GST_OBJECT_CAST (jitterbuffer)));
4573       }
4574       break;
4575     }
4576     case PROP_DROP_ON_LATENCY:
4577       JBUF_LOCK (priv);
4578       priv->drop_on_latency = g_value_get_boolean (value);
4579       JBUF_UNLOCK (priv);
4580       break;
4581     case PROP_TS_OFFSET:
4582       JBUF_LOCK (priv);
4583       if (priv->max_ts_offset_adjustment != 0) {
4584         gint64 new_offset = g_value_get_int64 (value);
4585
4586         if (new_offset > priv->ts_offset) {
4587           priv->ts_offset_remainder = new_offset - priv->ts_offset;
4588         } else {
4589           priv->ts_offset_remainder = -(priv->ts_offset - new_offset);
4590         }
4591       } else {
4592         priv->ts_offset = g_value_get_int64 (value);
4593         priv->ts_offset_remainder = 0;
4594       }
4595       priv->ts_discont = TRUE;
4596       JBUF_UNLOCK (priv);
4597       break;
4598     case PROP_MAX_TS_OFFSET_ADJUSTMENT:
4599       JBUF_LOCK (priv);
4600       priv->max_ts_offset_adjustment = g_value_get_uint64 (value);
4601       JBUF_UNLOCK (priv);
4602       break;
4603     case PROP_DO_LOST:
4604       JBUF_LOCK (priv);
4605       priv->do_lost = g_value_get_boolean (value);
4606       JBUF_UNLOCK (priv);
4607       break;
4608     case PROP_MODE:
4609       JBUF_LOCK (priv);
4610       rtp_jitter_buffer_set_mode (priv->jbuf, g_value_get_enum (value));
4611       JBUF_UNLOCK (priv);
4612       break;
4613     case PROP_DO_RETRANSMISSION:
4614       JBUF_LOCK (priv);
4615       priv->do_retransmission = g_value_get_boolean (value);
4616       JBUF_UNLOCK (priv);
4617       break;
4618     case PROP_RTX_NEXT_SEQNUM:
4619       JBUF_LOCK (priv);
4620       priv->rtx_next_seqnum = g_value_get_boolean (value);
4621       JBUF_UNLOCK (priv);
4622       break;
4623     case PROP_RTX_DELAY:
4624       JBUF_LOCK (priv);
4625       priv->rtx_delay = g_value_get_int (value);
4626       JBUF_UNLOCK (priv);
4627       break;
4628     case PROP_RTX_MIN_DELAY:
4629       JBUF_LOCK (priv);
4630       priv->rtx_min_delay = g_value_get_uint (value);
4631       JBUF_UNLOCK (priv);
4632       break;
4633     case PROP_RTX_DELAY_REORDER:
4634       JBUF_LOCK (priv);
4635       priv->rtx_delay_reorder = g_value_get_int (value);
4636       JBUF_UNLOCK (priv);
4637       break;
4638     case PROP_RTX_RETRY_TIMEOUT:
4639       JBUF_LOCK (priv);
4640       priv->rtx_retry_timeout = g_value_get_int (value);
4641       JBUF_UNLOCK (priv);
4642       break;
4643     case PROP_RTX_MIN_RETRY_TIMEOUT:
4644       JBUF_LOCK (priv);
4645       priv->rtx_min_retry_timeout = g_value_get_int (value);
4646       JBUF_UNLOCK (priv);
4647       break;
4648     case PROP_RTX_RETRY_PERIOD:
4649       JBUF_LOCK (priv);
4650       priv->rtx_retry_period = g_value_get_int (value);
4651       JBUF_UNLOCK (priv);
4652       break;
4653     case PROP_RTX_MAX_RETRIES:
4654       JBUF_LOCK (priv);
4655       priv->rtx_max_retries = g_value_get_int (value);
4656       JBUF_UNLOCK (priv);
4657       break;
4658     case PROP_RTX_DEADLINE:
4659       JBUF_LOCK (priv);
4660       priv->rtx_deadline_ms = g_value_get_int (value);
4661       JBUF_UNLOCK (priv);
4662       break;
4663     case PROP_RTX_STATS_TIMEOUT:
4664       JBUF_LOCK (priv);
4665       priv->rtx_stats_timeout = g_value_get_uint (value);
4666       JBUF_UNLOCK (priv);
4667       break;
4668     case PROP_MAX_RTCP_RTP_TIME_DIFF:
4669       JBUF_LOCK (priv);
4670       priv->max_rtcp_rtp_time_diff = g_value_get_int (value);
4671       JBUF_UNLOCK (priv);
4672       break;
4673     case PROP_MAX_DROPOUT_TIME:
4674       JBUF_LOCK (priv);
4675       priv->max_dropout_time = g_value_get_uint (value);
4676       JBUF_UNLOCK (priv);
4677       break;
4678     case PROP_MAX_MISORDER_TIME:
4679       JBUF_LOCK (priv);
4680       priv->max_misorder_time = g_value_get_uint (value);
4681       JBUF_UNLOCK (priv);
4682       break;
4683     case PROP_RFC7273_SYNC:
4684       JBUF_LOCK (priv);
4685       rtp_jitter_buffer_set_rfc7273_sync (priv->jbuf,
4686           g_value_get_boolean (value));
4687       JBUF_UNLOCK (priv);
4688       break;
4689     case PROP_FASTSTART_MIN_PACKETS:
4690       JBUF_LOCK (priv);
4691       priv->faststart_min_packets = g_value_get_uint (value);
4692       JBUF_UNLOCK (priv);
4693       break;
4694     default:
4695       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
4696       break;
4697   }
4698 }
4699
4700 static void
4701 gst_rtp_jitter_buffer_get_property (GObject * object,
4702     guint prop_id, GValue * value, GParamSpec * pspec)
4703 {
4704   GstRtpJitterBuffer *jitterbuffer;
4705   GstRtpJitterBufferPrivate *priv;
4706
4707   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
4708   priv = jitterbuffer->priv;
4709
4710   switch (prop_id) {
4711     case PROP_LATENCY:
4712       JBUF_LOCK (priv);
4713       g_value_set_uint (value, priv->latency_ms);
4714       JBUF_UNLOCK (priv);
4715       break;
4716     case PROP_DROP_ON_LATENCY:
4717       JBUF_LOCK (priv);
4718       g_value_set_boolean (value, priv->drop_on_latency);
4719       JBUF_UNLOCK (priv);
4720       break;
4721     case PROP_TS_OFFSET:
4722       JBUF_LOCK (priv);
4723       g_value_set_int64 (value, priv->ts_offset);
4724       JBUF_UNLOCK (priv);
4725       break;
4726     case PROP_MAX_TS_OFFSET_ADJUSTMENT:
4727       JBUF_LOCK (priv);
4728       g_value_set_uint64 (value, priv->max_ts_offset_adjustment);
4729       JBUF_UNLOCK (priv);
4730       break;
4731     case PROP_DO_LOST:
4732       JBUF_LOCK (priv);
4733       g_value_set_boolean (value, priv->do_lost);
4734       JBUF_UNLOCK (priv);
4735       break;
4736     case PROP_MODE:
4737       JBUF_LOCK (priv);
4738       g_value_set_enum (value, rtp_jitter_buffer_get_mode (priv->jbuf));
4739       JBUF_UNLOCK (priv);
4740       break;
4741     case PROP_PERCENT:
4742     {
4743       gint percent;
4744
4745       JBUF_LOCK (priv);
4746       if (priv->srcresult != GST_FLOW_OK)
4747         percent = 100;
4748       else
4749         percent = rtp_jitter_buffer_get_percent (priv->jbuf);
4750
4751       g_value_set_int (value, percent);
4752       JBUF_UNLOCK (priv);
4753       break;
4754     }
4755     case PROP_DO_RETRANSMISSION:
4756       JBUF_LOCK (priv);
4757       g_value_set_boolean (value, priv->do_retransmission);
4758       JBUF_UNLOCK (priv);
4759       break;
4760     case PROP_RTX_NEXT_SEQNUM:
4761       JBUF_LOCK (priv);
4762       g_value_set_boolean (value, priv->rtx_next_seqnum);
4763       JBUF_UNLOCK (priv);
4764       break;
4765     case PROP_RTX_DELAY:
4766       JBUF_LOCK (priv);
4767       g_value_set_int (value, priv->rtx_delay);
4768       JBUF_UNLOCK (priv);
4769       break;
4770     case PROP_RTX_MIN_DELAY:
4771       JBUF_LOCK (priv);
4772       g_value_set_uint (value, priv->rtx_min_delay);
4773       JBUF_UNLOCK (priv);
4774       break;
4775     case PROP_RTX_DELAY_REORDER:
4776       JBUF_LOCK (priv);
4777       g_value_set_int (value, priv->rtx_delay_reorder);
4778       JBUF_UNLOCK (priv);
4779       break;
4780     case PROP_RTX_RETRY_TIMEOUT:
4781       JBUF_LOCK (priv);
4782       g_value_set_int (value, priv->rtx_retry_timeout);
4783       JBUF_UNLOCK (priv);
4784       break;
4785     case PROP_RTX_MIN_RETRY_TIMEOUT:
4786       JBUF_LOCK (priv);
4787       g_value_set_int (value, priv->rtx_min_retry_timeout);
4788       JBUF_UNLOCK (priv);
4789       break;
4790     case PROP_RTX_RETRY_PERIOD:
4791       JBUF_LOCK (priv);
4792       g_value_set_int (value, priv->rtx_retry_period);
4793       JBUF_UNLOCK (priv);
4794       break;
4795     case PROP_RTX_MAX_RETRIES:
4796       JBUF_LOCK (priv);
4797       g_value_set_int (value, priv->rtx_max_retries);
4798       JBUF_UNLOCK (priv);
4799       break;
4800     case PROP_RTX_DEADLINE:
4801       JBUF_LOCK (priv);
4802       g_value_set_int (value, priv->rtx_deadline_ms);
4803       JBUF_UNLOCK (priv);
4804       break;
4805     case PROP_RTX_STATS_TIMEOUT:
4806       JBUF_LOCK (priv);
4807       g_value_set_uint (value, priv->rtx_stats_timeout);
4808       JBUF_UNLOCK (priv);
4809       break;
4810     case PROP_STATS:
4811       g_value_take_boxed (value,
4812           gst_rtp_jitter_buffer_create_stats (jitterbuffer));
4813       break;
4814     case PROP_MAX_RTCP_RTP_TIME_DIFF:
4815       JBUF_LOCK (priv);
4816       g_value_set_int (value, priv->max_rtcp_rtp_time_diff);
4817       JBUF_UNLOCK (priv);
4818       break;
4819     case PROP_MAX_DROPOUT_TIME:
4820       JBUF_LOCK (priv);
4821       g_value_set_uint (value, priv->max_dropout_time);
4822       JBUF_UNLOCK (priv);
4823       break;
4824     case PROP_MAX_MISORDER_TIME:
4825       JBUF_LOCK (priv);
4826       g_value_set_uint (value, priv->max_misorder_time);
4827       JBUF_UNLOCK (priv);
4828       break;
4829     case PROP_RFC7273_SYNC:
4830       JBUF_LOCK (priv);
4831       g_value_set_boolean (value,
4832           rtp_jitter_buffer_get_rfc7273_sync (priv->jbuf));
4833       JBUF_UNLOCK (priv);
4834       break;
4835     case PROP_FASTSTART_MIN_PACKETS:
4836       JBUF_LOCK (priv);
4837       g_value_set_uint (value, priv->faststart_min_packets);
4838       JBUF_UNLOCK (priv);
4839       break;
4840     default:
4841       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
4842       break;
4843   }
4844 }
4845
4846 static GstStructure *
4847 gst_rtp_jitter_buffer_create_stats (GstRtpJitterBuffer * jbuf)
4848 {
4849   GstRtpJitterBufferPrivate *priv = jbuf->priv;
4850   GstStructure *s;
4851
4852   JBUF_LOCK (priv);
4853   s = gst_structure_new ("application/x-rtp-jitterbuffer-stats",
4854       "num-pushed", G_TYPE_UINT64, priv->num_pushed,
4855       "num-lost", G_TYPE_UINT64, priv->num_lost,
4856       "num-late", G_TYPE_UINT64, priv->num_late,
4857       "num-duplicates", G_TYPE_UINT64, priv->num_duplicates,
4858       "avg-jitter", G_TYPE_UINT64, priv->avg_jitter,
4859       "rtx-count", G_TYPE_UINT64, priv->num_rtx_requests,
4860       "rtx-success-count", G_TYPE_UINT64, priv->num_rtx_success,
4861       "rtx-per-packet", G_TYPE_DOUBLE, priv->avg_rtx_num,
4862       "rtx-rtt", G_TYPE_UINT64, priv->avg_rtx_rtt, NULL);
4863   JBUF_UNLOCK (priv);
4864
4865   return s;
4866 }