jitterbuffer: rearrange timer update code
[platform/upstream/gst-plugins-good.git] / gst / rtpmanager / gstrtpjitterbuffer.c
1 /*
2  * Farsight Voice+Video library
3  *
4  *  Copyright 2007 Collabora Ltd,
5  *  Copyright 2007 Nokia Corporation
6  *   @author: Philippe Kalaf <philippe.kalaf@collabora.co.uk>.
7  *  Copyright 2007 Wim Taymans <wim.taymans@gmail.com>
8  *
9  * This library is free software; you can redistribute it and/or
10  * modify it under the terms of the GNU Library General Public
11  * License as published by the Free Software Foundation; either
12  * version 2 of the License, or (at your option) any later version.
13  *
14  * This library is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17  * Library General Public License for more details.
18  *
19  * You should have received a copy of the GNU Library General Public
20  * License along with this library; if not, write to the
21  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
22  * Boston, MA 02110-1301, USA.
23  *
24  */
25
26 /**
27  * SECTION:element-gstrtpjitterbuffer
28  *
29  * This element reorders and removes duplicate RTP packets as they are received
30  * from a network source. It will also wait for missing packets up to a
31  * configurable time limit using the #GstRtpJitterBuffer:latency property.
32  * Packets arriving too late are considered to be lost packets.
33  *
34  * This element acts as a live element and so adds #GstRtpJitterBuffer:latency
35  * to the pipeline.
36  *
37  * The element needs the clock-rate of the RTP payload in order to estimate the
38  * delay. This information is obtained either from the caps on the sink pad or,
39  * when no caps are present, from the #GstRtpJitterBuffer::request-pt-map signal.
40  * To clear the previous pt-map use the #GstRtpJitterBuffer::clear-pt-map signal.
41  *
42  * This element will automatically be used inside gstrtpbin.
43  *
44  * <refsect2>
45  * <title>Example pipelines</title>
46  * |[
47  * gst-launch-1.0 rtspsrc location=rtsp://192.168.1.133:8554/mpeg1or2AudioVideoTest ! gstrtpjitterbuffer ! rtpmpvdepay ! mpeg2dec ! xvimagesink
48  * ]| Connect to a streaming server and decode the MPEG video. The jitterbuffer is
49  * inserted into the pipeline to smooth out network jitter and to reorder the
50  * out-of-order RTP packets.
51  * </refsect2>
52  *
53  * Last reviewed on 2007-05-28 (0.10.5)
54  */
55
56 #ifdef HAVE_CONFIG_H
57 #include "config.h"
58 #endif
59
60 #include <stdlib.h>
61 #include <string.h>
62 #include <gst/rtp/gstrtpbuffer.h>
63
64 #include "gstrtpjitterbuffer.h"
65 #include "rtpjitterbuffer.h"
66 #include "rtpstats.h"
67
68 #include <gst/glib-compat-private.h>
69
70 GST_DEBUG_CATEGORY (rtpjitterbuffer_debug);
71 #define GST_CAT_DEFAULT (rtpjitterbuffer_debug)
72
73 /* RTPJitterBuffer signals and args */
74 enum
75 {
76   SIGNAL_REQUEST_PT_MAP,
77   SIGNAL_CLEAR_PT_MAP,
78   SIGNAL_HANDLE_SYNC,
79   SIGNAL_ON_NPT_STOP,
80   SIGNAL_SET_ACTIVE,
81   LAST_SIGNAL
82 };
83
84 #define DEFAULT_LATENCY_MS          200
85 #define DEFAULT_DROP_ON_LATENCY     FALSE
86 #define DEFAULT_TS_OFFSET           0
87 #define DEFAULT_DO_LOST             FALSE
88 #define DEFAULT_MODE                RTP_JITTER_BUFFER_MODE_SLAVE
89 #define DEFAULT_PERCENT             0
90 #define DEFAULT_DO_RETRANSMISSION   FALSE
91 #define DEFAULT_RTX_DELAY           20
92 #define DEFAULT_RTX_DELAY_REORDER   3
93 #define DEFAULT_RTX_RETRY_TIMEOUT   40
94 #define DEFAULT_RTX_RETRY_PERIOD    160
95
96 enum
97 {
98   PROP_0,
99   PROP_LATENCY,
100   PROP_DROP_ON_LATENCY,
101   PROP_TS_OFFSET,
102   PROP_DO_LOST,
103   PROP_MODE,
104   PROP_PERCENT,
105   PROP_DO_RETRANSMISSION,
106   PROP_RTX_DELAY,
107   PROP_RTX_DELAY_REORDER,
108   PROP_RTX_RETRY_TIMEOUT,
109   PROP_RTX_RETRY_PERIOD,
110   PROP_LAST
111 };
112
113 #define JBUF_LOCK(priv)   (g_mutex_lock (&(priv)->jbuf_lock))
114
115 #define JBUF_LOCK_CHECK(priv,label) G_STMT_START {    \
116   JBUF_LOCK (priv);                                   \
117   if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK))    \
118     goto label;                                       \
119 } G_STMT_END
120 #define JBUF_UNLOCK(priv) (g_mutex_unlock (&(priv)->jbuf_lock))
121
122 #define JBUF_WAIT_TIMER(priv)   G_STMT_START {            \
123   (priv)->waiting_timer = TRUE;                           \
124   g_cond_wait (&(priv)->jbuf_timer, &(priv)->jbuf_lock);  \
125   (priv)->waiting_timer = FALSE;                          \
126 } G_STMT_END
127 #define JBUF_SIGNAL_TIMER(priv) G_STMT_START {    \
128   if (G_UNLIKELY ((priv)->waiting_timer))         \
129     g_cond_signal (&(priv)->jbuf_timer);          \
130 } G_STMT_END
131
132 #define JBUF_WAIT_EVENT(priv,label) G_STMT_START {       \
133   (priv)->waiting_event = TRUE;                          \
134   g_cond_wait (&(priv)->jbuf_event, &(priv)->jbuf_lock); \
135   (priv)->waiting_event = FALSE;                         \
136   if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK))       \
137     goto label;                                          \
138 } G_STMT_END
139 #define JBUF_SIGNAL_EVENT(priv) G_STMT_START {    \
140   if (G_UNLIKELY ((priv)->waiting_event))         \
141     g_cond_signal (&(priv)->jbuf_event);          \
142 } G_STMT_END
143
144 struct _GstRtpJitterBufferPrivate
145 {
146   GstPad *sinkpad, *srcpad;
147   GstPad *rtcpsinkpad;
148
149   RTPJitterBuffer *jbuf;
150   GMutex jbuf_lock;
151   gboolean waiting_timer;
152   GCond jbuf_timer;
153   gboolean waiting_event;
154   GCond jbuf_event;
155   gboolean discont;
156   gboolean ts_discont;
157   gboolean active;
158   guint64 out_offset;
159
160   gboolean timer_running;
161   GThread *timer_thread;
162
163   /* properties */
164   guint latency_ms;
165   guint64 latency_ns;
166   gboolean drop_on_latency;
167   gint64 ts_offset;
168   gboolean do_lost;
169   gboolean do_retransmission;
170   gint rtx_delay;
171   gint rtx_delay_reorder;
172   gint rtx_retry_timeout;
173   gint rtx_retry_period;
174
175   /* the last seqnum we pushed out */
176   guint32 last_popped_seqnum;
177   /* the next expected seqnum we push */
178   guint32 next_seqnum;
179   /* last output time */
180   GstClockTime last_out_time;
181   /* last valid input timestamp and rtptime pair */
182   GstClockTime ips_dts;
183   guint64 ips_rtptime;
184   GstClockTime packet_spacing;
185
186   /* the next expected seqnum we receive */
187   GstClockTime last_in_dts;
188   guint32 last_in_seqnum;
189   guint32 next_in_seqnum;
190
191   GArray *timers;
192
193   /* start and stop ranges */
194   GstClockTime npt_start;
195   GstClockTime npt_stop;
196   guint64 ext_timestamp;
197   guint64 last_elapsed;
198   guint64 estimated_eos;
199   GstClockID eos_id;
200
201   /* state */
202   gboolean eos;
203
204   /* clock rate and rtp timestamp offset */
205   gint last_pt;
206   gint32 clock_rate;
207   gint64 clock_base;
208   gint64 prev_ts_offset;
209
210   /* when we are shutting down */
211   GstFlowReturn srcresult;
212   gboolean blocked;
213
214   /* for sync */
215   GstSegment segment;
216   GstClockID clock_id;
217   GstClockTime timer_timeout;
218   guint16 timer_seqnum;
219   gboolean unscheduled;
220   /* the latency of the upstream peer, we have to take this into account when
221    * synchronizing the buffers. */
222   GstClockTime peer_latency;
223   guint64 ext_rtptime;
224   GstBuffer *last_sr;
225
226   /* some accounting */
227   guint64 num_late;
228   guint64 num_duplicates;
229 };
230
231 typedef enum
232 {
233   TIMER_TYPE_EXPECTED,
234   TIMER_TYPE_LOST,
235   TIMER_TYPE_DEADLINE,
236   TIMER_TYPE_EOS
237 } TimerType;
238
239 typedef struct
240 {
241   guint idx;
242   guint16 seqnum;
243   TimerType type;
244   GstClockTime timeout;
245   GstClockTime rtx_base;
246   GstClockTime rtx_retry;
247 } TimerData;
248
249 #define GST_RTP_JITTER_BUFFER_GET_PRIVATE(o) \
250   (G_TYPE_INSTANCE_GET_PRIVATE ((o), GST_TYPE_RTP_JITTER_BUFFER, \
251                                 GstRtpJitterBufferPrivate))
252
253 static GstStaticPadTemplate gst_rtp_jitter_buffer_sink_template =
254 GST_STATIC_PAD_TEMPLATE ("sink",
255     GST_PAD_SINK,
256     GST_PAD_ALWAYS,
257     GST_STATIC_CAPS ("application/x-rtp, "
258         "clock-rate = (int) [ 1, 2147483647 ]"
259         /* "payload = (int) , "
260          * "encoding-name = (string) "
261          */ )
262     );
263
264 static GstStaticPadTemplate gst_rtp_jitter_buffer_sink_rtcp_template =
265 GST_STATIC_PAD_TEMPLATE ("sink_rtcp",
266     GST_PAD_SINK,
267     GST_PAD_REQUEST,
268     GST_STATIC_CAPS ("application/x-rtcp")
269     );
270
271 static GstStaticPadTemplate gst_rtp_jitter_buffer_src_template =
272 GST_STATIC_PAD_TEMPLATE ("src",
273     GST_PAD_SRC,
274     GST_PAD_ALWAYS,
275     GST_STATIC_CAPS ("application/x-rtp"
276         /* "payload = (int) , "
277          * "clock-rate = (int) , "
278          * "encoding-name = (string) "
279          */ )
280     );
281
282 static guint gst_rtp_jitter_buffer_signals[LAST_SIGNAL] = { 0 };
283
284 #define gst_rtp_jitter_buffer_parent_class parent_class
285 G_DEFINE_TYPE (GstRtpJitterBuffer, gst_rtp_jitter_buffer, GST_TYPE_ELEMENT);
286
287 /* object overrides */
288 static void gst_rtp_jitter_buffer_set_property (GObject * object,
289     guint prop_id, const GValue * value, GParamSpec * pspec);
290 static void gst_rtp_jitter_buffer_get_property (GObject * object,
291     guint prop_id, GValue * value, GParamSpec * pspec);
292 static void gst_rtp_jitter_buffer_finalize (GObject * object);
293
294 /* element overrides */
295 static GstStateChangeReturn gst_rtp_jitter_buffer_change_state (GstElement
296     * element, GstStateChange transition);
297 static GstPad *gst_rtp_jitter_buffer_request_new_pad (GstElement * element,
298     GstPadTemplate * templ, const gchar * name, const GstCaps * filter);
299 static void gst_rtp_jitter_buffer_release_pad (GstElement * element,
300     GstPad * pad);
301 static GstClock *gst_rtp_jitter_buffer_provide_clock (GstElement * element);
302
303 /* pad overrides */
304 static GstCaps *gst_rtp_jitter_buffer_getcaps (GstPad * pad, GstCaps * filter);
305 static GstIterator *gst_rtp_jitter_buffer_iterate_internal_links (GstPad * pad,
306     GstObject * parent);
307
308 /* sinkpad overrides */
309 static gboolean gst_rtp_jitter_buffer_sink_event (GstPad * pad,
310     GstObject * parent, GstEvent * event);
311 static GstFlowReturn gst_rtp_jitter_buffer_chain (GstPad * pad,
312     GstObject * parent, GstBuffer * buffer);
313
314 static gboolean gst_rtp_jitter_buffer_sink_rtcp_event (GstPad * pad,
315     GstObject * parent, GstEvent * event);
316 static GstFlowReturn gst_rtp_jitter_buffer_chain_rtcp (GstPad * pad,
317     GstObject * parent, GstBuffer * buffer);
318
319 static gboolean gst_rtp_jitter_buffer_sink_query (GstPad * pad,
320     GstObject * parent, GstQuery * query);
321
322 /* srcpad overrides */
323 static gboolean gst_rtp_jitter_buffer_src_event (GstPad * pad,
324     GstObject * parent, GstEvent * event);
325 static gboolean gst_rtp_jitter_buffer_src_activate_mode (GstPad * pad,
326     GstObject * parent, GstPadMode mode, gboolean active);
327 static void gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer);
328 static gboolean gst_rtp_jitter_buffer_src_query (GstPad * pad,
329     GstObject * parent, GstQuery * query);
330
331 static void
332 gst_rtp_jitter_buffer_clear_pt_map (GstRtpJitterBuffer * jitterbuffer);
333 static GstClockTime
334 gst_rtp_jitter_buffer_set_active (GstRtpJitterBuffer * jitterbuffer,
335     gboolean active, guint64 base_time);
336 static void do_handle_sync (GstRtpJitterBuffer * jitterbuffer);
337
338 static void unschedule_current_timer (GstRtpJitterBuffer * jitterbuffer);
339 static void remove_all_timers (GstRtpJitterBuffer * jitterbuffer);
340
341 static void wait_next_timeout (GstRtpJitterBuffer * jitterbuffer);
342
343 static void
344 gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass)
345 {
346   GObjectClass *gobject_class;
347   GstElementClass *gstelement_class;
348
349   gobject_class = (GObjectClass *) klass;
350   gstelement_class = (GstElementClass *) klass;
351
352   g_type_class_add_private (klass, sizeof (GstRtpJitterBufferPrivate));
353
354   gobject_class->finalize = gst_rtp_jitter_buffer_finalize;
355
356   gobject_class->set_property = gst_rtp_jitter_buffer_set_property;
357   gobject_class->get_property = gst_rtp_jitter_buffer_get_property;
358
359   /**
360    * GstRtpJitterBuffer::latency:
361    *
362    * The maximum latency of the jitterbuffer. Packets will be kept in the buffer
363    * for at most this time.
364    */
365   g_object_class_install_property (gobject_class, PROP_LATENCY,
366       g_param_spec_uint ("latency", "Buffer latency in ms",
367           "Amount of ms to buffer", 0, G_MAXUINT, DEFAULT_LATENCY_MS,
368           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
369   /**
370    * GstRtpJitterBuffer::drop-on-latency:
371    *
372    * Drop oldest buffers when the queue is completely filled.
373    */
374   g_object_class_install_property (gobject_class, PROP_DROP_ON_LATENCY,
375       g_param_spec_boolean ("drop-on-latency",
376           "Drop buffers when maximum latency is reached",
377           "Tells the jitterbuffer to never exceed the given latency in size",
378           DEFAULT_DROP_ON_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
379   /**
380    * GstRtpJitterBuffer::ts-offset:
381    *
382    * Adjust GStreamer output buffer timestamps in the jitterbuffer with offset.
383    * This is mainly used to ensure interstream synchronisation.
384    */
385   g_object_class_install_property (gobject_class, PROP_TS_OFFSET,
386       g_param_spec_int64 ("ts-offset", "Timestamp Offset",
387           "Adjust buffer timestamps with offset in nanoseconds", G_MININT64,
388           G_MAXINT64, DEFAULT_TS_OFFSET,
389           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
390
391   /**
392    * GstRtpJitterBuffer::do-lost:
393    *
394    * Send out a GstRTPPacketLost event downstream when a packet is considered
395    * lost.
396    */
397   g_object_class_install_property (gobject_class, PROP_DO_LOST,
398       g_param_spec_boolean ("do-lost", "Do Lost",
399           "Send an event downstream when a packet is lost", DEFAULT_DO_LOST,
400           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
401
402   /**
403    * GstRtpJitterBuffer::mode:
404    *
405    * Control the buffering and timestamping mode used by the jitterbuffer.
406    */
407   g_object_class_install_property (gobject_class, PROP_MODE,
408       g_param_spec_enum ("mode", "Mode",
409           "Control the buffering algorithm in use", RTP_TYPE_JITTER_BUFFER_MODE,
410           DEFAULT_MODE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
411   /**
412    * GstRtpJitterBuffer::percent:
413    *
414    * The percent of the jitterbuffer that is filled.
415    *
416    * Since: 0.10.19
417    */
418   g_object_class_install_property (gobject_class, PROP_PERCENT,
419       g_param_spec_int ("percent", "percent",
420           "The buffer filled percent", 0, 100,
421           0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
422   /**
423    * GstRtpJitterBuffer::do-retransmission:
424    *
425    * Send out a GstRTPRetransmission event upstream when a packet is considered
426    * late and should be retransmitted.
427    *
428    * Since: 1.2
429    */
430   g_object_class_install_property (gobject_class, PROP_DO_RETRANSMISSION,
431       g_param_spec_boolean ("do-retransmission", "Do Retransmission",
432           "Send retransmission events upstream when a packet is late",
433           DEFAULT_DO_RETRANSMISSION,
434           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
435
436   /**
437    * GstRtpJitterBuffer::rtx-delay:
438    *
439    * When a packet did not arrive at the expected time, wait this extra amount
440    * of time before sending a retransmission event.
441    *
442    * When -1 is used, the max jitter will be used as extra delay.
443    *
444    * Since: 1.2
445    */
446   g_object_class_install_property (gobject_class, PROP_RTX_DELAY,
447       g_param_spec_int ("rtx-delay", "RTX Delay",
448           "Extra time in ms to wait before sending retransmission "
449           "event (-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_DELAY,
450           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
451   /**
452    * GstRtpJitterBuffer::rtx-delay-reorder:
453    *
454    * Assume that a retransmission event should be sent when we see
455    * this much packet reordering.
456    *
457    * When -1 is used, the value will be estimated based on observed packet
458    * reordering.
459    *
460    * Since: 1.2
461    */
462   g_object_class_install_property (gobject_class, PROP_RTX_DELAY_REORDER,
463       g_param_spec_int ("rtx-delay-reorder", "RTX Delay Reorder",
464           "Sending retransmission event when this much reordering (-1 automatic)",
465           -1, G_MAXINT, DEFAULT_RTX_DELAY_REORDER,
466           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
467   /**
468    * GstRtpJitterBuffer::rtx-retry-timeout:
469    *
470    * When no packet has been received after sending a retransmission event
471    * for this time, retry sending a retransmission event.
472    *
473    * When -1 is used, the value will be estimated based on observed round
474    * trip time.
475    *
476    * Since: 1.2
477    */
478   g_object_class_install_property (gobject_class, PROP_RTX_RETRY_TIMEOUT,
479       g_param_spec_int ("rtx-retry-timeout", "RTX Retry Timeout",
480           "Retry sending a transmission event after this timeout in "
481           "ms (-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_RETRY_TIMEOUT,
482           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
483   /**
484    * GstRtpJitterBuffer::rtx-retry-period:
485    *
486    * The amount of time to try to get a retransmission.
487    *
488    * When -1 is used, the value will be estimated based on the jitterbuffer
489    * latency and the observed round trip time.
490    *
491    * Since: 1.2
492    */
493   g_object_class_install_property (gobject_class, PROP_RTX_RETRY_PERIOD,
494       g_param_spec_int ("rtx-retry-period", "RTX Retry Period",
495           "Try to get a retransmission for this many ms "
496           "(-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_RETRY_PERIOD,
497           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
498
499   /**
500    * GstRtpJitterBuffer::request-pt-map:
501    * @buffer: the object which received the signal
502    * @pt: the pt
503    *
504    * Request the payload type as #GstCaps for @pt.
505    */
506   gst_rtp_jitter_buffer_signals[SIGNAL_REQUEST_PT_MAP] =
507       g_signal_new ("request-pt-map", G_TYPE_FROM_CLASS (klass),
508       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
509           request_pt_map), NULL, NULL, g_cclosure_marshal_generic,
510       GST_TYPE_CAPS, 1, G_TYPE_UINT);
511   /**
512    * GstRtpJitterBuffer::handle-sync:
513    * @buffer: the object which received the signal
514    * @struct: a GstStructure containing sync values.
515    *
516    * Be notified of new sync values.
517    */
518   gst_rtp_jitter_buffer_signals[SIGNAL_HANDLE_SYNC] =
519       g_signal_new ("handle-sync", G_TYPE_FROM_CLASS (klass),
520       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
521           handle_sync), NULL, NULL, g_cclosure_marshal_VOID__BOXED,
522       G_TYPE_NONE, 1, GST_TYPE_STRUCTURE | G_SIGNAL_TYPE_STATIC_SCOPE);
523
524   /**
525    * GstRtpJitterBuffer::on-npt-stop
526    * @buffer: the object which received the signal
527    *
528    * Signal that the jitterbufer has pushed the RTP packet that corresponds to
529    * the npt-stop position.
530    */
531   gst_rtp_jitter_buffer_signals[SIGNAL_ON_NPT_STOP] =
532       g_signal_new ("on-npt-stop", G_TYPE_FROM_CLASS (klass),
533       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
534           on_npt_stop), NULL, NULL, g_cclosure_marshal_VOID__VOID,
535       G_TYPE_NONE, 0, G_TYPE_NONE);
536
537   /**
538    * GstRtpJitterBuffer::clear-pt-map:
539    * @buffer: the object which received the signal
540    *
541    * Invalidate the clock-rate as obtained with the
542    * #GstRtpJitterBuffer::request-pt-map signal.
543    */
544   gst_rtp_jitter_buffer_signals[SIGNAL_CLEAR_PT_MAP] =
545       g_signal_new ("clear-pt-map", G_TYPE_FROM_CLASS (klass),
546       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
547       G_STRUCT_OFFSET (GstRtpJitterBufferClass, clear_pt_map), NULL, NULL,
548       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
549
550   /**
551    * GstRtpJitterBuffer::set-active:
552    * @buffer: the object which received the signal
553    *
554    * Start pushing out packets with the given base time. This signal is only
555    * useful in buffering mode.
556    *
557    * Returns: the time of the last pushed packet.
558    *
559    * Since: 0.10.19
560    */
561   gst_rtp_jitter_buffer_signals[SIGNAL_SET_ACTIVE] =
562       g_signal_new ("set-active", G_TYPE_FROM_CLASS (klass),
563       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
564       G_STRUCT_OFFSET (GstRtpJitterBufferClass, set_active), NULL, NULL,
565       g_cclosure_marshal_generic, G_TYPE_UINT64, 2, G_TYPE_BOOLEAN,
566       G_TYPE_UINT64);
567
568   gstelement_class->change_state =
569       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_change_state);
570   gstelement_class->request_new_pad =
571       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_request_new_pad);
572   gstelement_class->release_pad =
573       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_release_pad);
574   gstelement_class->provide_clock =
575       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_provide_clock);
576
577   gst_element_class_add_pad_template (gstelement_class,
578       gst_static_pad_template_get (&gst_rtp_jitter_buffer_src_template));
579   gst_element_class_add_pad_template (gstelement_class,
580       gst_static_pad_template_get (&gst_rtp_jitter_buffer_sink_template));
581   gst_element_class_add_pad_template (gstelement_class,
582       gst_static_pad_template_get (&gst_rtp_jitter_buffer_sink_rtcp_template));
583
584   gst_element_class_set_static_metadata (gstelement_class,
585       "RTP packet jitter-buffer", "Filter/Network/RTP",
586       "A buffer that deals with network jitter and other transmission faults",
587       "Philippe Kalaf <philippe.kalaf@collabora.co.uk>, "
588       "Wim Taymans <wim.taymans@gmail.com>");
589
590   klass->clear_pt_map = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_clear_pt_map);
591   klass->set_active = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_set_active);
592
593   GST_DEBUG_CATEGORY_INIT
594       (rtpjitterbuffer_debug, "gstrtpjitterbuffer", 0, "RTP Jitter Buffer");
595 }
596
597 static void
598 gst_rtp_jitter_buffer_init (GstRtpJitterBuffer * jitterbuffer)
599 {
600   GstRtpJitterBufferPrivate *priv;
601
602   priv = GST_RTP_JITTER_BUFFER_GET_PRIVATE (jitterbuffer);
603   jitterbuffer->priv = priv;
604
605   priv->latency_ms = DEFAULT_LATENCY_MS;
606   priv->latency_ns = priv->latency_ms * GST_MSECOND;
607   priv->drop_on_latency = DEFAULT_DROP_ON_LATENCY;
608   priv->do_lost = DEFAULT_DO_LOST;
609   priv->do_retransmission = DEFAULT_DO_RETRANSMISSION;
610   priv->rtx_delay = DEFAULT_RTX_DELAY;
611   priv->rtx_delay_reorder = DEFAULT_RTX_DELAY_REORDER;
612   priv->rtx_retry_timeout = DEFAULT_RTX_RETRY_TIMEOUT;
613   priv->rtx_retry_period = DEFAULT_RTX_RETRY_PERIOD;
614
615   priv->timers = g_array_new (FALSE, TRUE, sizeof (TimerData));
616   priv->jbuf = rtp_jitter_buffer_new ();
617   g_mutex_init (&priv->jbuf_lock);
618   g_cond_init (&priv->jbuf_timer);
619   g_cond_init (&priv->jbuf_event);
620
621   /* reset skew detection initialy */
622   rtp_jitter_buffer_reset_skew (priv->jbuf);
623   rtp_jitter_buffer_set_delay (priv->jbuf, priv->latency_ns);
624   rtp_jitter_buffer_set_buffering (priv->jbuf, FALSE);
625   priv->active = TRUE;
626
627   priv->srcpad =
628       gst_pad_new_from_static_template (&gst_rtp_jitter_buffer_src_template,
629       "src");
630
631   gst_pad_set_activatemode_function (priv->srcpad,
632       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_activate_mode));
633   gst_pad_set_query_function (priv->srcpad,
634       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_query));
635   gst_pad_set_event_function (priv->srcpad,
636       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_event));
637
638   priv->sinkpad =
639       gst_pad_new_from_static_template (&gst_rtp_jitter_buffer_sink_template,
640       "sink");
641
642   gst_pad_set_chain_function (priv->sinkpad,
643       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_chain));
644   gst_pad_set_event_function (priv->sinkpad,
645       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_sink_event));
646   gst_pad_set_query_function (priv->sinkpad,
647       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_sink_query));
648
649   gst_element_add_pad (GST_ELEMENT (jitterbuffer), priv->srcpad);
650   gst_element_add_pad (GST_ELEMENT (jitterbuffer), priv->sinkpad);
651
652   GST_OBJECT_FLAG_SET (jitterbuffer, GST_ELEMENT_FLAG_PROVIDE_CLOCK);
653 }
654
655 static void
656 gst_rtp_jitter_buffer_finalize (GObject * object)
657 {
658   GstRtpJitterBuffer *jitterbuffer;
659
660   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
661
662   g_array_free (jitterbuffer->priv->timers, TRUE);
663   g_mutex_clear (&jitterbuffer->priv->jbuf_lock);
664   g_cond_clear (&jitterbuffer->priv->jbuf_timer);
665   g_cond_clear (&jitterbuffer->priv->jbuf_event);
666
667   g_object_unref (jitterbuffer->priv->jbuf);
668
669   G_OBJECT_CLASS (parent_class)->finalize (object);
670 }
671
672 static GstIterator *
673 gst_rtp_jitter_buffer_iterate_internal_links (GstPad * pad, GstObject * parent)
674 {
675   GstRtpJitterBuffer *jitterbuffer;
676   GstPad *otherpad = NULL;
677   GstIterator *it;
678   GValue val = { 0, };
679
680   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
681
682   if (pad == jitterbuffer->priv->sinkpad) {
683     otherpad = jitterbuffer->priv->srcpad;
684   } else if (pad == jitterbuffer->priv->srcpad) {
685     otherpad = jitterbuffer->priv->sinkpad;
686   } else if (pad == jitterbuffer->priv->rtcpsinkpad) {
687     otherpad = NULL;
688   }
689
690   g_value_init (&val, GST_TYPE_PAD);
691   g_value_set_object (&val, otherpad);
692   it = gst_iterator_new_single (GST_TYPE_PAD, &val);
693   g_value_unset (&val);
694
695   return it;
696 }
697
698 static GstPad *
699 create_rtcp_sink (GstRtpJitterBuffer * jitterbuffer)
700 {
701   GstRtpJitterBufferPrivate *priv;
702
703   priv = jitterbuffer->priv;
704
705   GST_DEBUG_OBJECT (jitterbuffer, "creating RTCP sink pad");
706
707   priv->rtcpsinkpad =
708       gst_pad_new_from_static_template
709       (&gst_rtp_jitter_buffer_sink_rtcp_template, "sink_rtcp");
710   gst_pad_set_chain_function (priv->rtcpsinkpad,
711       gst_rtp_jitter_buffer_chain_rtcp);
712   gst_pad_set_event_function (priv->rtcpsinkpad,
713       (GstPadEventFunction) gst_rtp_jitter_buffer_sink_rtcp_event);
714   gst_pad_set_iterate_internal_links_function (priv->rtcpsinkpad,
715       gst_rtp_jitter_buffer_iterate_internal_links);
716   gst_pad_set_active (priv->rtcpsinkpad, TRUE);
717   gst_element_add_pad (GST_ELEMENT_CAST (jitterbuffer), priv->rtcpsinkpad);
718
719   return priv->rtcpsinkpad;
720 }
721
722 static void
723 remove_rtcp_sink (GstRtpJitterBuffer * jitterbuffer)
724 {
725   GstRtpJitterBufferPrivate *priv;
726
727   priv = jitterbuffer->priv;
728
729   GST_DEBUG_OBJECT (jitterbuffer, "removing RTCP sink pad");
730
731   gst_pad_set_active (priv->rtcpsinkpad, FALSE);
732
733   gst_element_remove_pad (GST_ELEMENT_CAST (jitterbuffer), priv->rtcpsinkpad);
734   priv->rtcpsinkpad = NULL;
735 }
736
737 static GstPad *
738 gst_rtp_jitter_buffer_request_new_pad (GstElement * element,
739     GstPadTemplate * templ, const gchar * name, const GstCaps * filter)
740 {
741   GstRtpJitterBuffer *jitterbuffer;
742   GstElementClass *klass;
743   GstPad *result;
744   GstRtpJitterBufferPrivate *priv;
745
746   g_return_val_if_fail (templ != NULL, NULL);
747   g_return_val_if_fail (GST_IS_RTP_JITTER_BUFFER (element), NULL);
748
749   jitterbuffer = GST_RTP_JITTER_BUFFER (element);
750   priv = jitterbuffer->priv;
751   klass = GST_ELEMENT_GET_CLASS (element);
752
753   GST_DEBUG_OBJECT (element, "requesting pad %s", GST_STR_NULL (name));
754
755   /* figure out the template */
756   if (templ == gst_element_class_get_pad_template (klass, "sink_rtcp")) {
757     if (priv->rtcpsinkpad != NULL)
758       goto exists;
759
760     result = create_rtcp_sink (jitterbuffer);
761   } else
762     goto wrong_template;
763
764   return result;
765
766   /* ERRORS */
767 wrong_template:
768   {
769     g_warning ("gstrtpjitterbuffer: this is not our template");
770     return NULL;
771   }
772 exists:
773   {
774     g_warning ("gstrtpjitterbuffer: pad already requested");
775     return NULL;
776   }
777 }
778
779 static void
780 gst_rtp_jitter_buffer_release_pad (GstElement * element, GstPad * pad)
781 {
782   GstRtpJitterBuffer *jitterbuffer;
783   GstRtpJitterBufferPrivate *priv;
784
785   g_return_if_fail (GST_IS_RTP_JITTER_BUFFER (element));
786   g_return_if_fail (GST_IS_PAD (pad));
787
788   jitterbuffer = GST_RTP_JITTER_BUFFER (element);
789   priv = jitterbuffer->priv;
790
791   GST_DEBUG_OBJECT (element, "releasing pad %s:%s", GST_DEBUG_PAD_NAME (pad));
792
793   if (priv->rtcpsinkpad == pad) {
794     remove_rtcp_sink (jitterbuffer);
795   } else
796     goto wrong_pad;
797
798   return;
799
800   /* ERRORS */
801 wrong_pad:
802   {
803     g_warning ("gstjitterbuffer: asked to release an unknown pad");
804     return;
805   }
806 }
807
808 static GstClock *
809 gst_rtp_jitter_buffer_provide_clock (GstElement * element)
810 {
811   return gst_system_clock_obtain ();
812 }
813
814 static void
815 gst_rtp_jitter_buffer_clear_pt_map (GstRtpJitterBuffer * jitterbuffer)
816 {
817   GstRtpJitterBufferPrivate *priv;
818
819   priv = jitterbuffer->priv;
820
821   /* this will trigger a new pt-map request signal, FIXME, do something better. */
822
823   JBUF_LOCK (priv);
824   priv->clock_rate = -1;
825   /* do not clear current content, but refresh state for new arrival */
826   GST_DEBUG_OBJECT (jitterbuffer, "reset jitterbuffer");
827   rtp_jitter_buffer_reset_skew (priv->jbuf);
828   priv->last_popped_seqnum = -1;
829   priv->next_seqnum = -1;
830   JBUF_UNLOCK (priv);
831 }
832
833 static GstClockTime
834 gst_rtp_jitter_buffer_set_active (GstRtpJitterBuffer * jbuf, gboolean active,
835     guint64 offset)
836 {
837   GstRtpJitterBufferPrivate *priv;
838   GstClockTime last_out;
839   GstBuffer *head;
840
841   priv = jbuf->priv;
842
843   JBUF_LOCK (priv);
844   GST_DEBUG_OBJECT (jbuf, "setting active %d with offset %" GST_TIME_FORMAT,
845       active, GST_TIME_ARGS (offset));
846
847   if (active != priv->active) {
848     /* add the amount of time spent in paused to the output offset. All
849      * outgoing buffers will have this offset applied to their timestamps in
850      * order to make them arrive in time in the sink. */
851     priv->out_offset = offset;
852     GST_DEBUG_OBJECT (jbuf, "out offset %" GST_TIME_FORMAT,
853         GST_TIME_ARGS (priv->out_offset));
854     priv->active = active;
855     JBUF_SIGNAL_EVENT (priv);
856   }
857   if (!active) {
858     rtp_jitter_buffer_set_buffering (priv->jbuf, TRUE);
859   }
860   if ((head = rtp_jitter_buffer_peek (priv->jbuf))) {
861     /* head buffer timestamp and offset gives our output time */
862     last_out = GST_BUFFER_DTS (head) + priv->ts_offset;
863   } else {
864     /* use last known time when the buffer is empty */
865     last_out = priv->last_out_time;
866   }
867   JBUF_UNLOCK (priv);
868
869   return last_out;
870 }
871
872 static GstCaps *
873 gst_rtp_jitter_buffer_getcaps (GstPad * pad, GstCaps * filter)
874 {
875   GstRtpJitterBuffer *jitterbuffer;
876   GstRtpJitterBufferPrivate *priv;
877   GstPad *other;
878   GstCaps *caps;
879   GstCaps *templ;
880
881   jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
882   priv = jitterbuffer->priv;
883
884   other = (pad == priv->srcpad ? priv->sinkpad : priv->srcpad);
885
886   caps = gst_pad_peer_query_caps (other, filter);
887
888   templ = gst_pad_get_pad_template_caps (pad);
889   if (caps == NULL) {
890     GST_DEBUG_OBJECT (jitterbuffer, "use template");
891     caps = templ;
892   } else {
893     GstCaps *intersect;
894
895     GST_DEBUG_OBJECT (jitterbuffer, "intersect with template");
896
897     intersect = gst_caps_intersect (caps, templ);
898     gst_caps_unref (caps);
899     gst_caps_unref (templ);
900
901     caps = intersect;
902   }
903   gst_object_unref (jitterbuffer);
904
905   return caps;
906 }
907
908 /*
909  * Must be called with JBUF_LOCK held
910  */
911
912 static gboolean
913 gst_jitter_buffer_sink_parse_caps (GstRtpJitterBuffer * jitterbuffer,
914     GstCaps * caps)
915 {
916   GstRtpJitterBufferPrivate *priv;
917   GstStructure *caps_struct;
918   guint val;
919   GstClockTime tval;
920
921   priv = jitterbuffer->priv;
922
923   /* first parse the caps */
924   caps_struct = gst_caps_get_structure (caps, 0);
925
926   GST_DEBUG_OBJECT (jitterbuffer, "got caps");
927
928   /* we need a clock-rate to convert the rtp timestamps to GStreamer time and to
929    * measure the amount of data in the buffer */
930   if (!gst_structure_get_int (caps_struct, "clock-rate", &priv->clock_rate))
931     goto error;
932
933   if (priv->clock_rate <= 0)
934     goto wrong_rate;
935
936   GST_DEBUG_OBJECT (jitterbuffer, "got clock-rate %d", priv->clock_rate);
937
938   /* The clock base is the RTP timestamp corrsponding to the npt-start value. We
939    * can use this to track the amount of time elapsed on the sender. */
940   if (gst_structure_get_uint (caps_struct, "clock-base", &val))
941     priv->clock_base = val;
942   else
943     priv->clock_base = -1;
944
945   priv->ext_timestamp = priv->clock_base;
946
947   GST_DEBUG_OBJECT (jitterbuffer, "got clock-base %" G_GINT64_FORMAT,
948       priv->clock_base);
949
950   if (gst_structure_get_uint (caps_struct, "seqnum-base", &val)) {
951     /* first expected seqnum, only update when we didn't have a previous base. */
952     if (priv->next_in_seqnum == -1)
953       priv->next_in_seqnum = val;
954     if (priv->next_seqnum == -1)
955       priv->next_seqnum = val;
956   }
957
958   GST_DEBUG_OBJECT (jitterbuffer, "got seqnum-base %d", priv->next_in_seqnum);
959
960   /* the start and stop times. The seqnum-base corresponds to the start time. We
961    * will keep track of the seqnums on the output and when we reach the one
962    * corresponding to npt-stop, we emit the npt-stop-reached signal */
963   if (gst_structure_get_clock_time (caps_struct, "npt-start", &tval))
964     priv->npt_start = tval;
965   else
966     priv->npt_start = 0;
967
968   if (gst_structure_get_clock_time (caps_struct, "npt-stop", &tval))
969     priv->npt_stop = tval;
970   else
971     priv->npt_stop = -1;
972
973   GST_DEBUG_OBJECT (jitterbuffer,
974       "npt start/stop: %" GST_TIME_FORMAT "-%" GST_TIME_FORMAT,
975       GST_TIME_ARGS (priv->npt_start), GST_TIME_ARGS (priv->npt_stop));
976
977   return TRUE;
978
979   /* ERRORS */
980 error:
981   {
982     GST_DEBUG_OBJECT (jitterbuffer, "No clock-rate in caps!");
983     return FALSE;
984   }
985 wrong_rate:
986   {
987     GST_DEBUG_OBJECT (jitterbuffer, "Invalid clock-rate %d", priv->clock_rate);
988     return FALSE;
989   }
990 }
991
992 static void
993 gst_rtp_jitter_buffer_flush_start (GstRtpJitterBuffer * jitterbuffer)
994 {
995   GstRtpJitterBufferPrivate *priv;
996
997   priv = jitterbuffer->priv;
998
999   JBUF_LOCK (priv);
1000   /* mark ourselves as flushing */
1001   priv->srcresult = GST_FLOW_FLUSHING;
1002   GST_DEBUG_OBJECT (jitterbuffer, "Disabling pop on queue");
1003   /* this unblocks any waiting pops on the src pad task */
1004   JBUF_SIGNAL_EVENT (priv);
1005   /* unlock clock, we just unschedule, the entry will be released by the
1006    * locking streaming thread. */
1007   unschedule_current_timer (jitterbuffer);
1008   JBUF_UNLOCK (priv);
1009 }
1010
1011 static void
1012 gst_rtp_jitter_buffer_flush_stop (GstRtpJitterBuffer * jitterbuffer)
1013 {
1014   GstRtpJitterBufferPrivate *priv;
1015
1016   priv = jitterbuffer->priv;
1017
1018   JBUF_LOCK (priv);
1019   GST_DEBUG_OBJECT (jitterbuffer, "Enabling pop on queue");
1020   /* Mark as non flushing */
1021   priv->srcresult = GST_FLOW_OK;
1022   gst_segment_init (&priv->segment, GST_FORMAT_TIME);
1023   priv->last_popped_seqnum = -1;
1024   priv->last_out_time = -1;
1025   priv->next_seqnum = -1;
1026   priv->ips_rtptime = -1;
1027   priv->ips_dts = GST_CLOCK_TIME_NONE;
1028   priv->packet_spacing = 0;
1029   priv->next_in_seqnum = -1;
1030   priv->clock_rate = -1;
1031   priv->eos = FALSE;
1032   priv->estimated_eos = -1;
1033   priv->last_elapsed = 0;
1034   priv->ext_timestamp = -1;
1035   GST_DEBUG_OBJECT (jitterbuffer, "flush and reset jitterbuffer");
1036   rtp_jitter_buffer_flush (priv->jbuf);
1037   rtp_jitter_buffer_reset_skew (priv->jbuf);
1038   remove_all_timers (jitterbuffer);
1039   JBUF_UNLOCK (priv);
1040 }
1041
1042 static gboolean
1043 gst_rtp_jitter_buffer_src_activate_mode (GstPad * pad, GstObject * parent,
1044     GstPadMode mode, gboolean active)
1045 {
1046   gboolean result;
1047   GstRtpJitterBuffer *jitterbuffer = NULL;
1048
1049   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1050
1051   switch (mode) {
1052     case GST_PAD_MODE_PUSH:
1053       if (active) {
1054         /* allow data processing */
1055         gst_rtp_jitter_buffer_flush_stop (jitterbuffer);
1056
1057         /* start pushing out buffers */
1058         GST_DEBUG_OBJECT (jitterbuffer, "Starting task on srcpad");
1059         result = gst_pad_start_task (jitterbuffer->priv->srcpad,
1060             (GstTaskFunction) gst_rtp_jitter_buffer_loop, jitterbuffer, NULL);
1061       } else {
1062         /* make sure all data processing stops ASAP */
1063         gst_rtp_jitter_buffer_flush_start (jitterbuffer);
1064
1065         /* NOTE this will hardlock if the state change is called from the src pad
1066          * task thread because we will _join() the thread. */
1067         GST_DEBUG_OBJECT (jitterbuffer, "Stopping task on srcpad");
1068         result = gst_pad_stop_task (pad);
1069       }
1070       break;
1071     default:
1072       result = FALSE;
1073       break;
1074   }
1075   return result;
1076 }
1077
1078 static GstStateChangeReturn
1079 gst_rtp_jitter_buffer_change_state (GstElement * element,
1080     GstStateChange transition)
1081 {
1082   GstRtpJitterBuffer *jitterbuffer;
1083   GstRtpJitterBufferPrivate *priv;
1084   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
1085
1086   jitterbuffer = GST_RTP_JITTER_BUFFER (element);
1087   priv = jitterbuffer->priv;
1088
1089   switch (transition) {
1090     case GST_STATE_CHANGE_NULL_TO_READY:
1091       break;
1092     case GST_STATE_CHANGE_READY_TO_PAUSED:
1093       JBUF_LOCK (priv);
1094       /* reset negotiated values */
1095       priv->clock_rate = -1;
1096       priv->clock_base = -1;
1097       priv->peer_latency = 0;
1098       priv->last_pt = -1;
1099       /* block until we go to PLAYING */
1100       priv->blocked = TRUE;
1101       priv->timer_running = TRUE;
1102       priv->timer_thread =
1103           g_thread_new ("timer", (GThreadFunc) wait_next_timeout, jitterbuffer);
1104       JBUF_UNLOCK (priv);
1105       break;
1106     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
1107       JBUF_LOCK (priv);
1108       /* unblock to allow streaming in PLAYING */
1109       priv->blocked = FALSE;
1110       JBUF_SIGNAL_EVENT (priv);
1111       JBUF_UNLOCK (priv);
1112       break;
1113     default:
1114       break;
1115   }
1116
1117   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1118
1119   switch (transition) {
1120     case GST_STATE_CHANGE_READY_TO_PAUSED:
1121       /* we are a live element because we sync to the clock, which we can only
1122        * do in the PLAYING state */
1123       if (ret != GST_STATE_CHANGE_FAILURE)
1124         ret = GST_STATE_CHANGE_NO_PREROLL;
1125       break;
1126     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
1127       JBUF_LOCK (priv);
1128       /* block to stop streaming when PAUSED */
1129       priv->blocked = TRUE;
1130       JBUF_UNLOCK (priv);
1131       if (ret != GST_STATE_CHANGE_FAILURE)
1132         ret = GST_STATE_CHANGE_NO_PREROLL;
1133       break;
1134     case GST_STATE_CHANGE_PAUSED_TO_READY:
1135       JBUF_LOCK (priv);
1136       gst_buffer_replace (&priv->last_sr, NULL);
1137       priv->timer_running = FALSE;
1138       JBUF_SIGNAL_TIMER (priv);
1139       JBUF_UNLOCK (priv);
1140       g_thread_join (priv->timer_thread);
1141       priv->timer_thread = NULL;
1142       break;
1143     case GST_STATE_CHANGE_READY_TO_NULL:
1144       break;
1145     default:
1146       break;
1147   }
1148
1149   return ret;
1150 }
1151
1152 static gboolean
1153 gst_rtp_jitter_buffer_src_event (GstPad * pad, GstObject * parent,
1154     GstEvent * event)
1155 {
1156   gboolean ret = TRUE;
1157   GstRtpJitterBuffer *jitterbuffer;
1158   GstRtpJitterBufferPrivate *priv;
1159
1160   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1161   priv = jitterbuffer->priv;
1162
1163   GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
1164
1165   switch (GST_EVENT_TYPE (event)) {
1166     case GST_EVENT_LATENCY:
1167     {
1168       GstClockTime latency;
1169
1170       gst_event_parse_latency (event, &latency);
1171
1172       GST_DEBUG_OBJECT (jitterbuffer,
1173           "configuring latency of %" GST_TIME_FORMAT, GST_TIME_ARGS (latency));
1174
1175       JBUF_LOCK (priv);
1176       /* adjust the overall buffer delay to the total pipeline latency in
1177        * buffering mode because if downstream consumes too fast (because of
1178        * large latency or queues, we would start rebuffering again. */
1179       if (rtp_jitter_buffer_get_mode (priv->jbuf) ==
1180           RTP_JITTER_BUFFER_MODE_BUFFER) {
1181         rtp_jitter_buffer_set_delay (priv->jbuf, latency);
1182       }
1183       JBUF_UNLOCK (priv);
1184
1185       ret = gst_pad_push_event (priv->sinkpad, event);
1186       break;
1187     }
1188     default:
1189       ret = gst_pad_push_event (priv->sinkpad, event);
1190       break;
1191   }
1192
1193   return ret;
1194 }
1195
1196 static gboolean
1197 gst_rtp_jitter_buffer_sink_event (GstPad * pad, GstObject * parent,
1198     GstEvent * event)
1199 {
1200   gboolean ret = TRUE;
1201   GstRtpJitterBuffer *jitterbuffer;
1202   GstRtpJitterBufferPrivate *priv;
1203
1204   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1205   priv = jitterbuffer->priv;
1206
1207   GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
1208
1209   switch (GST_EVENT_TYPE (event)) {
1210     case GST_EVENT_CAPS:
1211     {
1212       GstCaps *caps;
1213
1214       gst_event_parse_caps (event, &caps);
1215
1216       JBUF_LOCK (priv);
1217       ret = gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps);
1218       JBUF_UNLOCK (priv);
1219
1220       /* set same caps on srcpad on success */
1221       if (ret)
1222         ret = gst_pad_push_event (priv->srcpad, event);
1223       else
1224         gst_event_unref (event);
1225       break;
1226     }
1227     case GST_EVENT_SEGMENT:
1228     {
1229       gst_event_copy_segment (event, &priv->segment);
1230
1231       /* we need time for now */
1232       if (priv->segment.format != GST_FORMAT_TIME)
1233         goto newseg_wrong_format;
1234
1235       GST_DEBUG_OBJECT (jitterbuffer,
1236           "newsegment:  %" GST_SEGMENT_FORMAT, &priv->segment);
1237
1238       /* FIXME, push SEGMENT in the queue. Sorting order might be difficult. */
1239       ret = gst_pad_push_event (priv->srcpad, event);
1240       break;
1241     }
1242     case GST_EVENT_FLUSH_START:
1243       ret = gst_pad_push_event (priv->srcpad, event);
1244       gst_rtp_jitter_buffer_flush_start (jitterbuffer);
1245       break;
1246     case GST_EVENT_FLUSH_STOP:
1247       ret = gst_pad_push_event (priv->srcpad, event);
1248       ret =
1249           gst_rtp_jitter_buffer_src_activate_mode (priv->srcpad, parent,
1250           GST_PAD_MODE_PUSH, TRUE);
1251       break;
1252     case GST_EVENT_EOS:
1253     {
1254       /* push EOS in queue. We always push it at the head */
1255       JBUF_LOCK (priv);
1256       /* check for flushing, we need to discard the event and return FALSE when
1257        * we are flushing */
1258       ret = priv->srcresult == GST_FLOW_OK;
1259       if (ret && !priv->eos) {
1260         GST_INFO_OBJECT (jitterbuffer, "queuing EOS");
1261         priv->eos = TRUE;
1262         JBUF_SIGNAL_EVENT (priv);
1263       } else if (priv->eos) {
1264         GST_DEBUG_OBJECT (jitterbuffer, "dropping EOS, we are already EOS");
1265       } else {
1266         GST_DEBUG_OBJECT (jitterbuffer, "dropping EOS, reason %s",
1267             gst_flow_get_name (priv->srcresult));
1268       }
1269       JBUF_UNLOCK (priv);
1270       gst_event_unref (event);
1271       break;
1272     }
1273     default:
1274       ret = gst_pad_push_event (priv->srcpad, event);
1275       break;
1276   }
1277
1278 done:
1279
1280   return ret;
1281
1282   /* ERRORS */
1283 newseg_wrong_format:
1284   {
1285     GST_DEBUG_OBJECT (jitterbuffer, "received non TIME newsegment");
1286     ret = FALSE;
1287     gst_event_unref (event);
1288     goto done;
1289   }
1290 }
1291
1292 static gboolean
1293 gst_rtp_jitter_buffer_sink_rtcp_event (GstPad * pad, GstObject * parent,
1294     GstEvent * event)
1295 {
1296   gboolean ret = TRUE;
1297   GstRtpJitterBuffer *jitterbuffer;
1298
1299   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1300
1301   GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
1302
1303   switch (GST_EVENT_TYPE (event)) {
1304     case GST_EVENT_FLUSH_START:
1305       gst_event_unref (event);
1306       break;
1307     case GST_EVENT_FLUSH_STOP:
1308       gst_event_unref (event);
1309       break;
1310     default:
1311       ret = gst_pad_event_default (pad, parent, event);
1312       break;
1313   }
1314
1315   return ret;
1316 }
1317
1318 /*
1319  * Must be called with JBUF_LOCK held, will release the LOCK when emiting the
1320  * signal. The function returns GST_FLOW_ERROR when a parsing error happened and
1321  * GST_FLOW_FLUSHING when the element is shutting down. On success
1322  * GST_FLOW_OK is returned.
1323  */
1324 static GstFlowReturn
1325 gst_rtp_jitter_buffer_get_clock_rate (GstRtpJitterBuffer * jitterbuffer,
1326     guint8 pt)
1327 {
1328   GValue ret = { 0 };
1329   GValue args[2] = { {0}, {0} };
1330   GstCaps *caps;
1331   gboolean res;
1332
1333   g_value_init (&args[0], GST_TYPE_ELEMENT);
1334   g_value_set_object (&args[0], jitterbuffer);
1335   g_value_init (&args[1], G_TYPE_UINT);
1336   g_value_set_uint (&args[1], pt);
1337
1338   g_value_init (&ret, GST_TYPE_CAPS);
1339   g_value_set_boxed (&ret, NULL);
1340
1341   JBUF_UNLOCK (jitterbuffer->priv);
1342   g_signal_emitv (args, gst_rtp_jitter_buffer_signals[SIGNAL_REQUEST_PT_MAP], 0,
1343       &ret);
1344   JBUF_LOCK_CHECK (jitterbuffer->priv, out_flushing);
1345
1346   g_value_unset (&args[0]);
1347   g_value_unset (&args[1]);
1348   caps = (GstCaps *) g_value_dup_boxed (&ret);
1349   g_value_unset (&ret);
1350   if (!caps)
1351     goto no_caps;
1352
1353   res = gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps);
1354   gst_caps_unref (caps);
1355
1356   if (G_UNLIKELY (!res))
1357     goto parse_failed;
1358
1359   return GST_FLOW_OK;
1360
1361   /* ERRORS */
1362 no_caps:
1363   {
1364     GST_DEBUG_OBJECT (jitterbuffer, "could not get caps");
1365     return GST_FLOW_ERROR;
1366   }
1367 out_flushing:
1368   {
1369     GST_DEBUG_OBJECT (jitterbuffer, "we are flushing");
1370     return GST_FLOW_FLUSHING;
1371   }
1372 parse_failed:
1373   {
1374     GST_DEBUG_OBJECT (jitterbuffer, "parse failed");
1375     return GST_FLOW_ERROR;
1376   }
1377 }
1378
1379 /* call with jbuf lock held */
1380 static void
1381 check_buffering_percent (GstRtpJitterBuffer * jitterbuffer, gint * percent)
1382 {
1383   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1384
1385   /* too short a stream, or too close to EOS will never really fill buffer */
1386   if (*percent != -1 && priv->npt_stop != -1 &&
1387       priv->npt_stop - priv->npt_start <=
1388       rtp_jitter_buffer_get_delay (priv->jbuf)) {
1389     GST_DEBUG_OBJECT (jitterbuffer, "short stream; faking full buffer");
1390     rtp_jitter_buffer_set_buffering (priv->jbuf, FALSE);
1391     *percent = 100;
1392   }
1393 }
1394
1395 static void
1396 post_buffering_percent (GstRtpJitterBuffer * jitterbuffer, gint percent)
1397 {
1398   GstMessage *message;
1399
1400   /* Post a buffering message */
1401   message = gst_message_new_buffering (GST_OBJECT_CAST (jitterbuffer), percent);
1402   gst_message_set_buffering_stats (message, GST_BUFFERING_LIVE, -1, -1, -1);
1403
1404   gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer), message);
1405 }
1406
1407 static GstClockTime
1408 apply_offset (GstRtpJitterBuffer * jitterbuffer, GstClockTime timestamp)
1409 {
1410   GstRtpJitterBufferPrivate *priv;
1411
1412   priv = jitterbuffer->priv;
1413
1414   if (timestamp == -1)
1415     return -1;
1416
1417   /* apply the timestamp offset, this is used for inter stream sync */
1418   timestamp += priv->ts_offset;
1419   /* add the offset, this is used when buffering */
1420   timestamp += priv->out_offset;
1421
1422   return timestamp;
1423 }
1424
1425 static TimerData *
1426 find_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type, guint16 seqnum)
1427 {
1428   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1429   TimerData *timer = NULL;
1430   gint i, len;
1431
1432   len = priv->timers->len;
1433   for (i = 0; i < len; i++) {
1434     TimerData *test = &g_array_index (priv->timers, TimerData, i);
1435     if (test->seqnum == seqnum && test->type == type) {
1436       timer = test;
1437       break;
1438     }
1439   }
1440   return timer;
1441 }
1442
1443 static void
1444 unschedule_current_timer (GstRtpJitterBuffer * jitterbuffer)
1445 {
1446   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1447
1448   if (priv->clock_id) {
1449     GST_DEBUG_OBJECT (jitterbuffer, "unschedule current timer");
1450     gst_clock_id_unschedule (priv->clock_id);
1451     priv->unscheduled = TRUE;
1452   }
1453 }
1454
1455 static GstClockTime
1456 get_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
1457 {
1458   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1459   GstClockTime test_timeout;
1460
1461   if ((test_timeout = timer->timeout) == -1)
1462     return -1;
1463
1464   if (timer->type != TIMER_TYPE_EXPECTED) {
1465     /* add our latency and offset to get output times. */
1466     test_timeout = apply_offset (jitterbuffer, test_timeout);
1467     test_timeout += priv->latency_ns;
1468   }
1469   return test_timeout;
1470 }
1471
1472 static void
1473 recalculate_timer (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
1474 {
1475   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1476
1477   if (priv->clock_id) {
1478     GstClockTime timeout = get_timeout (jitterbuffer, timer);
1479
1480     GST_DEBUG ("%" GST_TIME_FORMAT " <> %" GST_TIME_FORMAT,
1481         GST_TIME_ARGS (timeout), GST_TIME_ARGS (priv->timer_timeout));
1482
1483     if (timeout == -1 || timeout < priv->timer_timeout)
1484       unschedule_current_timer (jitterbuffer);
1485   }
1486 }
1487
1488 static TimerData *
1489 add_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type,
1490     guint16 seqnum, GstClockTime timeout)
1491 {
1492   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1493   TimerData *timer;
1494   gint len;
1495
1496   GST_DEBUG_OBJECT (jitterbuffer,
1497       "add timer for seqnum %d to %" GST_TIME_FORMAT,
1498       seqnum, GST_TIME_ARGS (timeout));
1499
1500   len = priv->timers->len;
1501   g_array_set_size (priv->timers, len + 1);
1502   timer = &g_array_index (priv->timers, TimerData, len);
1503   timer->idx = len;
1504   timer->type = type;
1505   timer->seqnum = seqnum;
1506   timer->timeout = timeout;
1507   if (type == TIMER_TYPE_EXPECTED) {
1508     timer->rtx_base = timeout;
1509     timer->rtx_retry = 0;
1510   }
1511   recalculate_timer (jitterbuffer, timer);
1512   JBUF_SIGNAL_TIMER (priv);
1513
1514   return timer;
1515 }
1516
1517 static void
1518 reschedule_timer (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
1519     guint16 seqnum, GstClockTime timeout)
1520 {
1521   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1522   gboolean seqchange, timechange;
1523   guint16 oldseq;
1524
1525   seqchange = timer->seqnum != seqnum;
1526   timechange = timer->timeout != timeout;
1527
1528   if (!seqchange && !timechange)
1529     return;
1530
1531   oldseq = timer->seqnum;
1532
1533   GST_DEBUG_OBJECT (jitterbuffer,
1534       "replace timer for seqnum %d->%d to %" GST_TIME_FORMAT,
1535       oldseq, seqnum, GST_TIME_ARGS (timeout));
1536
1537   timer->timeout = timeout;
1538   timer->seqnum = seqnum;
1539   if (seqchange && timer->type == TIMER_TYPE_EXPECTED) {
1540     timer->rtx_base = timeout;
1541     timer->rtx_retry = 0;
1542   }
1543
1544   if (priv->clock_id) {
1545     /* we changed the seqnum and there is a timer currently waiting with this
1546      * seqnum, unschedule it */
1547     if (seqchange && priv->timer_seqnum == oldseq)
1548       unschedule_current_timer (jitterbuffer);
1549     /* we changed the time, check if it is earlier than what we are waiting
1550      * for and unschedule if so */
1551     else if (timechange)
1552       recalculate_timer (jitterbuffer, timer);
1553   }
1554 }
1555
1556 static TimerData *
1557 set_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type,
1558     guint16 seqnum, GstClockTime timeout)
1559 {
1560   TimerData *timer;
1561
1562   /* find the seqnum timer */
1563   timer = find_timer (jitterbuffer, type, seqnum);
1564   if (timer == NULL) {
1565     timer = add_timer (jitterbuffer, type, seqnum, timeout);
1566   } else {
1567     reschedule_timer (jitterbuffer, timer, seqnum, timeout);
1568   }
1569   return timer;
1570 }
1571
1572 static void
1573 remove_timer (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
1574 {
1575   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1576   guint idx;
1577
1578   if (priv->clock_id && priv->timer_seqnum == timer->seqnum)
1579     unschedule_current_timer (jitterbuffer);
1580
1581   idx = timer->idx;
1582   GST_DEBUG_OBJECT (jitterbuffer, "removed index %d", idx);
1583   g_array_remove_index_fast (priv->timers, idx);
1584   timer->idx = idx;
1585 }
1586
1587 static void
1588 remove_all_timers (GstRtpJitterBuffer * jitterbuffer)
1589 {
1590   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1591   GST_DEBUG_OBJECT (jitterbuffer, "removed all timers");
1592   g_array_set_size (priv->timers, 0);
1593   unschedule_current_timer (jitterbuffer);
1594 }
1595
1596 /* we just received a packet with seqnum and dts.
1597  *
1598  * First check for old seqnum that we are still expecting. If the gap with the
1599  * current timestamp is too big, unschedule the timeouts.
1600  *
1601  * If we have a valid packet spacing estimate we can set a timer for when we
1602  * should receive the next packet.
1603  * If we don't have a valid estimate, we remove any timer we might have
1604  * had for this packet.
1605  */
1606 static void
1607 update_timers (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum,
1608     GstClockTime dts, gboolean do_next_seqnum)
1609 {
1610   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1611   TimerData *timer = NULL;
1612   gint i, len;
1613
1614   /* go through all timers and unschedule the ones with a large gap, also find
1615    * the timer for the seqnum */
1616   len = priv->timers->len;
1617   for (i = 0; i < len; i++) {
1618     TimerData *test = &g_array_index (priv->timers, TimerData, i);
1619     gint gap;
1620
1621     gap = gst_rtp_buffer_compare_seqnum (test->seqnum, seqnum);
1622
1623     GST_DEBUG_OBJECT (jitterbuffer, "%d, #%d<->#%d gap %d", i,
1624         test->seqnum, seqnum, gap);
1625
1626     if (gap == 0) {
1627       GST_DEBUG ("found timer for current seqnum");
1628       /* the timer for the current seqnum */
1629       timer = test;
1630     } else if (gap > priv->rtx_delay_reorder) {
1631       /* max gap, we exceeded the max reorder distance and we don't expect the
1632        * missing packet to be this reordered */
1633       if (test->rtx_retry == 0 && test->type == TIMER_TYPE_EXPECTED)
1634         reschedule_timer (jitterbuffer, test, test->seqnum, -1);
1635     }
1636   }
1637
1638   if (priv->packet_spacing > 0 && do_next_seqnum && priv->do_retransmission) {
1639     GstClockTime expected;
1640
1641     /* calculate expected arrival time of the next seqnum */
1642     expected = dts + priv->packet_spacing + (priv->rtx_delay * GST_MSECOND);
1643     /* and update/install timer for next seqnum */
1644     if (timer)
1645       reschedule_timer (jitterbuffer, timer, priv->next_in_seqnum, expected);
1646     else
1647       add_timer (jitterbuffer, TIMER_TYPE_EXPECTED, priv->next_in_seqnum,
1648           expected);
1649   } else if (timer && timer->type != TIMER_TYPE_DEADLINE) {
1650     /* if we had a timer, remove it, we don't know when to expect the next
1651      * packet. */
1652     remove_timer (jitterbuffer, timer);
1653     JBUF_SIGNAL_EVENT (priv);
1654   }
1655 }
1656
1657 static void
1658 calculate_packet_spacing (GstRtpJitterBuffer * jitterbuffer, guint32 rtptime,
1659     GstClockTime dts)
1660 {
1661   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1662
1663   /* we need consecutive seqnums with a different
1664    * rtptime to estimate the packet spacing. */
1665   if (priv->ips_rtptime != rtptime) {
1666     /* rtptime changed, check dts diff */
1667     if (priv->ips_dts != -1 && dts != -1 && dts > priv->ips_dts) {
1668       priv->packet_spacing = dts - priv->ips_dts;
1669       GST_DEBUG_OBJECT (jitterbuffer,
1670           "new packet spacing %" GST_TIME_FORMAT,
1671           GST_TIME_ARGS (priv->packet_spacing));
1672     }
1673     priv->ips_rtptime = rtptime;
1674     priv->ips_dts = dts;
1675   }
1676 }
1677
1678 static void
1679 calculate_expected (GstRtpJitterBuffer * jitterbuffer, guint32 expected,
1680     guint16 seqnum, GstClockTime dts, gint gap)
1681 {
1682   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1683   GstClockTime duration, expected_dts;
1684   TimerType type;
1685
1686   GST_DEBUG_OBJECT (jitterbuffer,
1687       "dts %" GST_TIME_FORMAT ", last %" GST_TIME_FORMAT,
1688       GST_TIME_ARGS (dts), GST_TIME_ARGS (priv->last_in_dts));
1689
1690   /* interpolate between the current time and the last time based on
1691    * number of packets we are missing, this is the estimated duration
1692    * for the missing packet based on equidistant packet spacing. Also make
1693    * sure we never go negative. */
1694   if (dts >= priv->last_in_dts)
1695     duration = (dts - priv->last_in_dts) / (gap + 1);
1696   else
1697     /* packet already lost, timer will timeout quickly */
1698     duration = 0;
1699
1700   GST_DEBUG_OBJECT (jitterbuffer, "duration %" GST_TIME_FORMAT,
1701       GST_TIME_ARGS (duration));
1702
1703   expected_dts = priv->last_in_dts + duration;
1704
1705   if (priv->do_retransmission) {
1706     type = TIMER_TYPE_EXPECTED;
1707     if (find_timer (jitterbuffer, type, expected))
1708       expected++;
1709   } else {
1710     type = TIMER_TYPE_LOST;
1711   }
1712
1713   while (expected < seqnum) {
1714     add_timer (jitterbuffer, type, expected, expected_dts);
1715     expected_dts += duration;
1716     expected++;
1717   }
1718 }
1719
1720 static GstFlowReturn
1721 gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent,
1722     GstBuffer * buffer)
1723 {
1724   GstRtpJitterBuffer *jitterbuffer;
1725   GstRtpJitterBufferPrivate *priv;
1726   guint16 seqnum;
1727   guint32 expected, rtptime;
1728   GstFlowReturn ret = GST_FLOW_OK;
1729   GstClockTime dts, pts;
1730   guint64 latency_ts;
1731   gboolean tail;
1732   gint percent = -1;
1733   guint8 pt;
1734   GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
1735   gboolean do_next_seqnum = FALSE;
1736
1737   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1738
1739   priv = jitterbuffer->priv;
1740
1741   if (G_UNLIKELY (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp)))
1742     goto invalid_buffer;
1743
1744   pt = gst_rtp_buffer_get_payload_type (&rtp);
1745   seqnum = gst_rtp_buffer_get_seq (&rtp);
1746   rtptime = gst_rtp_buffer_get_timestamp (&rtp);
1747   gst_rtp_buffer_unmap (&rtp);
1748
1749   /* make sure we have PTS and DTS set */
1750   pts = GST_BUFFER_PTS (buffer);
1751   dts = GST_BUFFER_DTS (buffer);
1752   if (dts == -1)
1753     dts = pts;
1754   else if (pts == -1)
1755     pts = dts;
1756
1757   /* take the DTS of the buffer. This is the time when the packet was
1758    * received and is used to calculate jitter and clock skew. We will adjust
1759    * this DTS with the smoothed value after processing it in the
1760    * jitterbuffer and assign it as the PTS. */
1761   /* bring to running time */
1762   dts = gst_segment_to_running_time (&priv->segment, GST_FORMAT_TIME, dts);
1763
1764   GST_DEBUG_OBJECT (jitterbuffer,
1765       "Received packet #%d at time %" GST_TIME_FORMAT ", discont %d", seqnum,
1766       GST_TIME_ARGS (dts), GST_BUFFER_IS_DISCONT (buffer));
1767
1768   JBUF_LOCK_CHECK (priv, out_flushing);
1769
1770   if (G_UNLIKELY (priv->last_pt != pt)) {
1771     GstCaps *caps;
1772
1773     GST_DEBUG_OBJECT (jitterbuffer, "pt changed from %u to %u", priv->last_pt,
1774         pt);
1775
1776     priv->last_pt = pt;
1777     /* reset clock-rate so that we get a new one */
1778     priv->clock_rate = -1;
1779
1780     /* Try to get the clock-rate from the caps first if we can. If there are no
1781      * caps we must fire the signal to get the clock-rate. */
1782     if ((caps = gst_pad_get_current_caps (pad))) {
1783       gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps);
1784       gst_caps_unref (caps);
1785     }
1786   }
1787
1788   if (G_UNLIKELY (priv->clock_rate == -1)) {
1789     /* no clock rate given on the caps, try to get one with the signal */
1790     if (gst_rtp_jitter_buffer_get_clock_rate (jitterbuffer,
1791             pt) == GST_FLOW_FLUSHING)
1792       goto out_flushing;
1793
1794     if (G_UNLIKELY (priv->clock_rate == -1))
1795       goto no_clock_rate;
1796   }
1797
1798   /* don't accept more data on EOS */
1799   if (G_UNLIKELY (priv->eos))
1800     goto have_eos;
1801
1802   expected = priv->next_in_seqnum;
1803
1804   /* now check against our expected seqnum */
1805   if (G_LIKELY (expected != -1)) {
1806     gint gap;
1807
1808     /* now calculate gap */
1809     gap = gst_rtp_buffer_compare_seqnum (expected, seqnum);
1810
1811     GST_DEBUG_OBJECT (jitterbuffer, "expected #%d, got #%d, gap of %d",
1812         expected, seqnum, gap);
1813
1814     if (G_LIKELY (gap == 0)) {
1815       /* packet is expected */
1816       calculate_packet_spacing (jitterbuffer, rtptime, dts);
1817       do_next_seqnum = TRUE;
1818     } else {
1819       gboolean reset = FALSE;
1820
1821       if (gap < 0) {
1822         /* we received an old packet */
1823         if (G_UNLIKELY (gap < -RTP_MAX_MISORDER)) {
1824           /* too old packet, reset */
1825           GST_DEBUG_OBJECT (jitterbuffer, "reset: buffer too old %d < %d", gap,
1826               -RTP_MAX_MISORDER);
1827           reset = TRUE;
1828         } else {
1829           GST_DEBUG_OBJECT (jitterbuffer, "old packet received");
1830         }
1831       } else {
1832         /* new packet, we are missing some packets */
1833         if (G_UNLIKELY (gap > RTP_MAX_DROPOUT)) {
1834           /* packet too far in future, reset */
1835           GST_DEBUG_OBJECT (jitterbuffer, "reset: buffer too new %d > %d", gap,
1836               RTP_MAX_DROPOUT);
1837           reset = TRUE;
1838         } else {
1839           GST_DEBUG_OBJECT (jitterbuffer, "%d missing packets", gap);
1840           /* fill in the gap with EXPECTED timers */
1841           calculate_expected (jitterbuffer, expected, seqnum, dts, gap);
1842           do_next_seqnum = TRUE;
1843         }
1844       }
1845       if (G_UNLIKELY (reset)) {
1846         GST_DEBUG_OBJECT (jitterbuffer, "flush and reset jitterbuffer");
1847         rtp_jitter_buffer_flush (priv->jbuf);
1848         rtp_jitter_buffer_reset_skew (priv->jbuf);
1849         remove_all_timers (jitterbuffer);
1850         priv->last_popped_seqnum = -1;
1851         priv->next_seqnum = seqnum;
1852         do_next_seqnum = TRUE;
1853       }
1854       /* reset spacing estimation when gap */
1855       priv->ips_rtptime = -1;
1856       priv->ips_dts = GST_CLOCK_TIME_NONE;
1857     }
1858   } else {
1859     GST_DEBUG_OBJECT (jitterbuffer, "First buffer #%d", seqnum);
1860     /* we don't know what the next_in_seqnum should be, wait for the last
1861      * possible moment to push this buffer, maybe we get an earlier seqnum
1862      * while we wait */
1863     set_timer (jitterbuffer, TIMER_TYPE_DEADLINE, seqnum, dts);
1864     do_next_seqnum = TRUE;
1865   }
1866   if (do_next_seqnum) {
1867     priv->last_in_seqnum = seqnum;
1868     priv->last_in_dts = dts;
1869     priv->next_in_seqnum = (seqnum + 1) & 0xffff;
1870   }
1871
1872   /* let's check if this buffer is too late, we can only accept packets with
1873    * bigger seqnum than the one we last pushed. */
1874   if (G_LIKELY (priv->last_popped_seqnum != -1)) {
1875     gint gap;
1876
1877     gap = gst_rtp_buffer_compare_seqnum (priv->last_popped_seqnum, seqnum);
1878
1879     /* priv->last_popped_seqnum >= seqnum, we're too late. */
1880     if (G_UNLIKELY (gap <= 0))
1881       goto too_late;
1882   }
1883
1884   /* let's drop oldest packet if the queue is already full and drop-on-latency
1885    * is set. We can only do this when there actually is a latency. When no
1886    * latency is set, we just pump it in the queue and let the other end push it
1887    * out as fast as possible. */
1888   if (priv->latency_ms && priv->drop_on_latency) {
1889     latency_ts =
1890         gst_util_uint64_scale_int (priv->latency_ms, priv->clock_rate, 1000);
1891
1892     if (G_UNLIKELY (rtp_jitter_buffer_get_ts_diff (priv->jbuf) >= latency_ts)) {
1893       GstBuffer *old_buf;
1894
1895       old_buf = rtp_jitter_buffer_pop (priv->jbuf, &percent);
1896
1897       GST_DEBUG_OBJECT (jitterbuffer, "Queue full, dropping old packet %p",
1898           old_buf);
1899
1900       gst_buffer_unref (old_buf);
1901     }
1902   }
1903
1904   /* we need to make the metadata writable before pushing it in the jitterbuffer
1905    * because the jitterbuffer will update the PTS */
1906   buffer = gst_buffer_make_writable (buffer);
1907   GST_BUFFER_DTS (buffer) = dts;
1908   GST_BUFFER_PTS (buffer) = pts;
1909
1910   /* now insert the packet into the queue in sorted order. This function returns
1911    * FALSE if a packet with the same seqnum was already in the queue, meaning we
1912    * have a duplicate. */
1913   if (G_UNLIKELY (!rtp_jitter_buffer_insert (priv->jbuf, buffer, dts,
1914               priv->clock_rate, &tail, &percent)))
1915     goto duplicate;
1916
1917   /* update timers */
1918   update_timers (jitterbuffer, seqnum, dts, do_next_seqnum);
1919
1920   /* we had an unhandled SR, handle it now */
1921   if (priv->last_sr)
1922     do_handle_sync (jitterbuffer);
1923
1924   /* signal addition of new buffer when the _loop is waiting. */
1925   if (priv->active && priv->waiting_timer)
1926     JBUF_SIGNAL_EVENT (priv);
1927
1928   /* let's unschedule and unblock any waiting buffers. We only want to do this
1929    * when the tail buffer changed */
1930   if (G_UNLIKELY (priv->clock_id && tail)) {
1931     GST_DEBUG_OBJECT (jitterbuffer, "Unscheduling waiting new buffer");
1932     unschedule_current_timer (jitterbuffer);
1933   }
1934
1935   GST_DEBUG_OBJECT (jitterbuffer, "Pushed packet #%d, now %d packets, tail: %d",
1936       seqnum, rtp_jitter_buffer_num_packets (priv->jbuf), tail);
1937
1938   check_buffering_percent (jitterbuffer, &percent);
1939
1940 finished:
1941   JBUF_UNLOCK (priv);
1942
1943   if (percent != -1)
1944     post_buffering_percent (jitterbuffer, percent);
1945
1946   return ret;
1947
1948   /* ERRORS */
1949 invalid_buffer:
1950   {
1951     /* this is not fatal but should be filtered earlier */
1952     GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
1953         ("Received invalid RTP payload, dropping"));
1954     gst_buffer_unref (buffer);
1955     return GST_FLOW_OK;
1956   }
1957 no_clock_rate:
1958   {
1959     GST_WARNING_OBJECT (jitterbuffer,
1960         "No clock-rate in caps!, dropping buffer");
1961     gst_buffer_unref (buffer);
1962     goto finished;
1963   }
1964 out_flushing:
1965   {
1966     ret = priv->srcresult;
1967     GST_DEBUG_OBJECT (jitterbuffer, "flushing %s", gst_flow_get_name (ret));
1968     gst_buffer_unref (buffer);
1969     goto finished;
1970   }
1971 have_eos:
1972   {
1973     ret = GST_FLOW_EOS;
1974     GST_WARNING_OBJECT (jitterbuffer, "we are EOS, refusing buffer");
1975     gst_buffer_unref (buffer);
1976     goto finished;
1977   }
1978 too_late:
1979   {
1980     GST_WARNING_OBJECT (jitterbuffer, "Packet #%d too late as #%d was already"
1981         " popped, dropping", seqnum, priv->last_popped_seqnum);
1982     priv->num_late++;
1983     gst_buffer_unref (buffer);
1984     goto finished;
1985   }
1986 duplicate:
1987   {
1988     GST_WARNING_OBJECT (jitterbuffer, "Duplicate packet #%d detected, dropping",
1989         seqnum);
1990     priv->num_duplicates++;
1991     gst_buffer_unref (buffer);
1992     goto finished;
1993   }
1994 }
1995
1996 static GstClockTime
1997 compute_elapsed (GstRtpJitterBuffer * jitterbuffer, GstBuffer * outbuf)
1998 {
1999   guint64 ext_time, elapsed;
2000   guint32 rtp_time;
2001   GstRtpJitterBufferPrivate *priv;
2002   GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
2003
2004   priv = jitterbuffer->priv;
2005   gst_rtp_buffer_map (outbuf, GST_MAP_READ, &rtp);
2006   rtp_time = gst_rtp_buffer_get_timestamp (&rtp);
2007   gst_rtp_buffer_unmap (&rtp);
2008
2009   GST_LOG_OBJECT (jitterbuffer, "rtp %" G_GUINT32_FORMAT ", ext %"
2010       G_GUINT64_FORMAT, rtp_time, priv->ext_timestamp);
2011
2012   if (rtp_time < priv->ext_timestamp) {
2013     ext_time = priv->ext_timestamp;
2014   } else {
2015     ext_time = gst_rtp_buffer_ext_timestamp (&priv->ext_timestamp, rtp_time);
2016   }
2017
2018   if (ext_time > priv->clock_base)
2019     elapsed = ext_time - priv->clock_base;
2020   else
2021     elapsed = 0;
2022
2023   elapsed = gst_util_uint64_scale_int (elapsed, GST_SECOND, priv->clock_rate);
2024   return elapsed;
2025 }
2026
2027 static void
2028 update_estimated_eos (GstRtpJitterBuffer * jitterbuffer, GstBuffer * outbuf)
2029 {
2030   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2031
2032   if (priv->npt_stop != -1 && priv->ext_timestamp != -1
2033       && priv->clock_base != -1 && priv->clock_rate > 0) {
2034     guint64 elapsed, estimated;
2035
2036     elapsed = compute_elapsed (jitterbuffer, outbuf);
2037
2038     if (elapsed > priv->last_elapsed || !priv->last_elapsed) {
2039       guint64 left;
2040       GstClockTime out_time;
2041
2042       priv->last_elapsed = elapsed;
2043
2044       left = priv->npt_stop - priv->npt_start;
2045       GST_LOG_OBJECT (jitterbuffer, "left %" GST_TIME_FORMAT,
2046           GST_TIME_ARGS (left));
2047
2048       out_time = GST_BUFFER_DTS (outbuf);
2049
2050       if (elapsed > 0)
2051         estimated = gst_util_uint64_scale (out_time, left, elapsed);
2052       else {
2053         /* if there is almost nothing left,
2054          * we may never advance enough to end up in the above case */
2055         if (left < GST_SECOND)
2056           estimated = GST_SECOND;
2057         else
2058           estimated = -1;
2059       }
2060
2061       GST_LOG_OBJECT (jitterbuffer, "elapsed %" GST_TIME_FORMAT ", estimated %"
2062           GST_TIME_FORMAT, GST_TIME_ARGS (elapsed), GST_TIME_ARGS (estimated));
2063
2064       if (estimated != -1 && priv->estimated_eos != estimated) {
2065         set_timer (jitterbuffer, TIMER_TYPE_EOS, -1, estimated);
2066         priv->estimated_eos = estimated;
2067       }
2068     }
2069   }
2070 }
2071
2072 /* take a buffer from the queue and push it */
2073 static GstFlowReturn
2074 pop_and_push_next (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum)
2075 {
2076   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2077   GstFlowReturn result;
2078   GstBuffer *outbuf;
2079   GstClockTime dts, pts;
2080   gint percent = -1;
2081
2082   /* when we get here we are ready to pop and push the buffer */
2083   outbuf = rtp_jitter_buffer_pop (priv->jbuf, &percent);
2084
2085   check_buffering_percent (jitterbuffer, &percent);
2086
2087   if (G_UNLIKELY (priv->discont)) {
2088     /* set DISCONT flag when we missed a packet. We pushed the buffer writable
2089      * into the jitterbuffer so we can modify now. */
2090     GST_DEBUG_OBJECT (jitterbuffer, "mark output buffer discont");
2091     GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_DISCONT);
2092     priv->discont = FALSE;
2093   }
2094   if (G_UNLIKELY (priv->ts_discont)) {
2095     GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_RESYNC);
2096     priv->ts_discont = FALSE;
2097   }
2098
2099   dts = GST_BUFFER_DTS (outbuf);
2100   pts = GST_BUFFER_PTS (outbuf);
2101
2102   dts = gst_segment_to_position (&priv->segment, GST_FORMAT_TIME, dts);
2103   pts = gst_segment_to_position (&priv->segment, GST_FORMAT_TIME, pts);
2104
2105   /* apply timestamp with offset to buffer now */
2106   GST_BUFFER_DTS (outbuf) = apply_offset (jitterbuffer, dts);
2107   GST_BUFFER_PTS (outbuf) = apply_offset (jitterbuffer, pts);
2108
2109   /* update the elapsed time when we need to check against the npt stop time. */
2110   update_estimated_eos (jitterbuffer, outbuf);
2111
2112   /* now we are ready to push the buffer. Save the seqnum and release the lock
2113    * so the other end can push stuff in the queue again. */
2114   priv->last_popped_seqnum = seqnum;
2115   priv->last_out_time = GST_BUFFER_PTS (outbuf);
2116   priv->next_seqnum = (seqnum + 1) & 0xffff;
2117   JBUF_UNLOCK (priv);
2118
2119   if (percent != -1)
2120     post_buffering_percent (jitterbuffer, percent);
2121
2122   /* push buffer */
2123   GST_DEBUG_OBJECT (jitterbuffer,
2124       "Pushing buffer %d, dts %" GST_TIME_FORMAT ", pts %" GST_TIME_FORMAT,
2125       seqnum, GST_TIME_ARGS (GST_BUFFER_DTS (outbuf)),
2126       GST_TIME_ARGS (GST_BUFFER_PTS (outbuf)));
2127
2128   result = gst_pad_push (priv->srcpad, outbuf);
2129
2130   JBUF_LOCK_CHECK (priv, out_flushing);
2131
2132   return result;
2133
2134   /* ERRORS */
2135 out_flushing:
2136   {
2137     return priv->srcresult;
2138   }
2139 }
2140
2141 #define GST_FLOW_WAIT GST_FLOW_CUSTOM_SUCCESS
2142
2143 /* Peek a buffer and compare the seqnum to the expected seqnum.
2144  * If all is fine, the buffer is pushed.
2145  * If something is wrong, we wait for some event
2146  */
2147 static GstFlowReturn
2148 handle_next_buffer (GstRtpJitterBuffer * jitterbuffer)
2149 {
2150   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2151   GstFlowReturn result = GST_FLOW_OK;
2152   GstBuffer *outbuf;
2153   guint16 seqnum;
2154   guint32 next_seqnum;
2155   gint gap;
2156   GstRTPBuffer rtp = { NULL, };
2157
2158   /* only push buffers when PLAYING and active and not buffering */
2159   if (priv->blocked || !priv->active ||
2160       rtp_jitter_buffer_is_buffering (priv->jbuf))
2161     return GST_FLOW_WAIT;
2162
2163 again:
2164   /* peek a buffer, we're just looking at the sequence number.
2165    * If all is fine, we'll pop and push it. If the sequence number is wrong we
2166    * wait for a timeout or something to change.
2167    * The peeked buffer is valid for as long as we hold the jitterbuffer lock. */
2168   outbuf = rtp_jitter_buffer_peek (priv->jbuf);
2169   if (outbuf == NULL)
2170     goto wait;
2171
2172   /* get the seqnum and the next expected seqnum */
2173   gst_rtp_buffer_map (outbuf, GST_MAP_READ, &rtp);
2174   seqnum = gst_rtp_buffer_get_seq (&rtp);
2175   gst_rtp_buffer_unmap (&rtp);
2176
2177   next_seqnum = priv->next_seqnum;
2178
2179   /* get the gap between this and the previous packet. If we don't know the
2180    * previous packet seqnum assume no gap. */
2181   if (G_UNLIKELY (next_seqnum == -1)) {
2182     GST_DEBUG_OBJECT (jitterbuffer, "First buffer #%d", seqnum);
2183     /* we don't know what the next_seqnum should be, the chain function should
2184      * have scheduled a DEADLINE timer that will increment next_seqnum when it
2185      * fires, so wait for that */
2186     result = GST_FLOW_WAIT;
2187   } else {
2188     /* else calculate GAP */
2189     gap = gst_rtp_buffer_compare_seqnum (next_seqnum, seqnum);
2190
2191     if (G_LIKELY (gap == 0)) {
2192       /* no missing packet, pop and push */
2193       result = pop_and_push_next (jitterbuffer, seqnum);
2194     } else if (G_UNLIKELY (gap < 0)) {
2195       /* if we have a packet that we already pushed or considered dropped, pop it
2196        * off and get the next packet */
2197       GST_DEBUG_OBJECT (jitterbuffer, "Old packet #%d, next #%d dropping",
2198           seqnum, next_seqnum);
2199       outbuf = rtp_jitter_buffer_pop (priv->jbuf, NULL);
2200       gst_buffer_unref (outbuf);
2201       goto again;
2202     } else {
2203       /* the chain function has scheduled timers to request retransmission or
2204        * when to consider the packet lost, wait for that */
2205       GST_DEBUG_OBJECT (jitterbuffer,
2206           "Sequence number GAP detected: expected %d instead of %d (%d missing)",
2207           next_seqnum, seqnum, gap);
2208       result = GST_FLOW_WAIT;
2209     }
2210   }
2211   return result;
2212
2213 wait:
2214   {
2215     GST_DEBUG_OBJECT (jitterbuffer, "no buffer, going to wait");
2216     if (priv->eos)
2217       result = GST_FLOW_EOS;
2218     else
2219       result = GST_FLOW_WAIT;
2220     return result;
2221   }
2222 }
2223
2224 /* the timeout for when we expected a packet expired */
2225 static void
2226 do_expected_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
2227     GstClockTimeDiff clock_jitter)
2228 {
2229   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2230   GstEvent *event;
2231
2232   GST_DEBUG_OBJECT (jitterbuffer, "expected %d didn't arrive", timer->seqnum);
2233
2234   event = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM,
2235       gst_structure_new ("GstRTPRetransmissionRequest",
2236           "seqnum", G_TYPE_UINT, (guint) timer->seqnum,
2237           "running-time", G_TYPE_UINT64, timer->rtx_base,
2238           "delay", G_TYPE_UINT, GST_TIME_AS_MSECONDS (timer->rtx_retry),
2239           "frequency", G_TYPE_UINT, priv->rtx_retry_timeout,
2240           "period", G_TYPE_UINT, priv->rtx_retry_period,
2241           "deadline", G_TYPE_UINT, priv->latency_ms,
2242           "packet-spacing", G_TYPE_UINT64, priv->packet_spacing, NULL));
2243
2244   JBUF_UNLOCK (priv);
2245   gst_pad_push_event (priv->sinkpad, event);
2246   JBUF_LOCK (priv);
2247
2248   /* calculate the timeout for the next retransmission attempt */
2249   timer->rtx_retry += (priv->rtx_retry_timeout * GST_MSECOND);
2250   if (timer->rtx_retry > (priv->rtx_retry_period * GST_MSECOND)) {
2251     GST_DEBUG_OBJECT (jitterbuffer, "reschedule as LOST timer");
2252     /* too many retransmission request, we now convert the timer
2253      * to a lost timer */
2254     timer->type = TIMER_TYPE_LOST;
2255     timer->rtx_retry = 0;
2256   }
2257   reschedule_timer (jitterbuffer, timer, timer->seqnum,
2258       timer->rtx_base + timer->rtx_retry);
2259 }
2260
2261 /* a packet is lost */
2262 static void
2263 do_lost_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
2264     GstClockTimeDiff clock_jitter)
2265 {
2266   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2267   GstClockTime duration = GST_CLOCK_TIME_NONE;
2268   guint32 lost_packets = 1;
2269   gboolean lost_packets_late = FALSE;
2270
2271 #if 0
2272   if (clock_jitter > 0
2273       && clock_jitter > (priv->latency_ns + priv->peer_latency)) {
2274     GstClockTimeDiff total_duration;
2275     GstClockTime out_time_diff;
2276
2277     out_time_diff =
2278         apply_offset (jitterbuffer, timer->timeout) - timer->timeout;
2279     total_duration = MIN (out_time_diff, clock_jitter);
2280
2281     if (duration > 0)
2282       lost_packets = total_duration / duration;
2283     else
2284       lost_packets = gap;
2285     total_duration = lost_packets * duration;
2286
2287     GST_DEBUG_OBJECT (jitterbuffer,
2288         "Current sync_time has expired a long time ago (+%" GST_TIME_FORMAT
2289         ") Cover up %d lost packets with duration %" GST_TIME_FORMAT,
2290         GST_TIME_ARGS (clock_jitter),
2291         lost_packets, GST_TIME_ARGS (total_duration));
2292
2293     duration = total_duration;
2294     lost_packets_late = TRUE;
2295   }
2296 #endif
2297
2298   /* we had a gap and thus we lost some packets. Create an event for this.  */
2299   if (lost_packets > 1)
2300     GST_DEBUG_OBJECT (jitterbuffer, "Packets #%d -> #%d lost", timer->seqnum,
2301         timer->seqnum + lost_packets - 1);
2302   else
2303     GST_DEBUG_OBJECT (jitterbuffer, "Packet #%d lost", timer->seqnum);
2304
2305   priv->num_late += lost_packets;
2306   priv->discont = TRUE;
2307
2308   /* update our expected next packet */
2309   priv->last_popped_seqnum = timer->seqnum;
2310   priv->last_out_time = apply_offset (jitterbuffer, timer->timeout);
2311   if (timer->seqnum + lost_packets > priv->next_seqnum)
2312     priv->next_seqnum = (timer->seqnum + lost_packets) & 0xffff;
2313   /* remove timer now */
2314   remove_timer (jitterbuffer, timer);
2315   JBUF_SIGNAL_EVENT (priv);
2316
2317   if (priv->do_lost) {
2318     GstEvent *event;
2319
2320     /* create paket lost event */
2321     event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM,
2322         gst_structure_new ("GstRTPPacketLost",
2323             "seqnum", G_TYPE_UINT, (guint) priv->last_popped_seqnum,
2324             "timestamp", G_TYPE_UINT64, priv->last_out_time,
2325             "duration", G_TYPE_UINT64, duration,
2326             "late", G_TYPE_BOOLEAN, lost_packets_late, NULL));
2327     JBUF_UNLOCK (priv);
2328     gst_pad_push_event (priv->srcpad, event);
2329     JBUF_LOCK (priv);
2330   }
2331 }
2332
2333 static void
2334 do_eos_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
2335 {
2336   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2337
2338   GST_INFO_OBJECT (jitterbuffer, "got the NPT timeout");
2339   remove_timer (jitterbuffer, timer);
2340   JBUF_SIGNAL_EVENT (priv);
2341 }
2342
2343 static void
2344 do_deadline_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
2345     GstClockTimeDiff clock_jitter)
2346 {
2347   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2348
2349   GST_INFO_OBJECT (jitterbuffer, "got deadline timeout");
2350
2351   priv->next_seqnum = timer->seqnum;
2352   remove_timer (jitterbuffer, timer);
2353   JBUF_SIGNAL_EVENT (priv);
2354 }
2355
2356 /* called when we need to wait for the next timeout.
2357  *
2358  * We loop over the array of recorded timeouts and wait for the earliest one.
2359  * When it timed out, do the logic associated with the timer.
2360  *
2361  * If there are no timers, we wait on a gcond until something new happens.
2362  */
2363 static void
2364 wait_next_timeout (GstRtpJitterBuffer * jitterbuffer)
2365 {
2366   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2367
2368   JBUF_LOCK (priv);
2369   while (priv->timer_running) {
2370     TimerData *timer = NULL;
2371     GstClockTime timer_timeout = -1;
2372     gint i, len;
2373     gint timer_idx;
2374
2375     len = priv->timers->len;
2376     for (i = 0; i < len; i++) {
2377       TimerData *test = &g_array_index (priv->timers, TimerData, i);
2378       GstClockTime test_timeout = get_timeout (jitterbuffer, test);
2379
2380       GST_DEBUG_OBJECT (jitterbuffer, "%d, %d, %d, %" GST_TIME_FORMAT,
2381           i, test->type, test->seqnum, GST_TIME_ARGS (test_timeout));
2382
2383       /* find the smallest timeout */
2384       if (timer == NULL || test_timeout == -1 || test_timeout < timer_timeout) {
2385         timer = test;
2386         timer_timeout = test_timeout;
2387         if (timer_timeout == -1)
2388           break;
2389       }
2390     }
2391     if (timer) {
2392       GstClock *clock;
2393       GstClockTime sync_time;
2394       GstClockID id;
2395       GstClockReturn ret;
2396       GstClockTimeDiff clock_jitter;
2397
2398       /* no timestamp, timeout immeditately */
2399       if (timer_timeout == -1)
2400         goto do_timeout;
2401
2402       GST_OBJECT_LOCK (jitterbuffer);
2403       clock = GST_ELEMENT_CLOCK (jitterbuffer);
2404       if (!clock) {
2405         GST_OBJECT_UNLOCK (jitterbuffer);
2406         /* let's just push if there is no clock */
2407         GST_DEBUG_OBJECT (jitterbuffer, "No clock, timeout right away");
2408         goto do_timeout;
2409       }
2410
2411       /* prepare for sync against clock */
2412       sync_time = timer_timeout + GST_ELEMENT_CAST (jitterbuffer)->base_time;
2413       /* add latency of peer to get input time */
2414       sync_time += priv->peer_latency;
2415
2416       GST_DEBUG_OBJECT (jitterbuffer, "sync to timestamp %" GST_TIME_FORMAT
2417           " with sync time %" GST_TIME_FORMAT,
2418           GST_TIME_ARGS (timer_timeout), GST_TIME_ARGS (sync_time));
2419
2420       /* create an entry for the clock */
2421       id = priv->clock_id = gst_clock_new_single_shot_id (clock, sync_time);
2422       priv->unscheduled = FALSE;
2423       priv->timer_timeout = timer_timeout;
2424       priv->timer_seqnum = timer->seqnum;
2425       timer_idx = timer->idx;
2426       GST_OBJECT_UNLOCK (jitterbuffer);
2427
2428       /* release the lock so that the other end can push stuff or unlock */
2429       JBUF_UNLOCK (priv);
2430
2431       ret = gst_clock_id_wait (id, &clock_jitter);
2432
2433       JBUF_LOCK (priv);
2434       if (!priv->timer_running)
2435         break;
2436
2437       GST_DEBUG_OBJECT (jitterbuffer, "sync done, %d, #%d, %" G_GINT64_FORMAT,
2438           ret, priv->timer_seqnum, clock_jitter);
2439       /* and free the entry */
2440       gst_clock_id_unref (id);
2441       priv->clock_id = NULL;
2442
2443       if (priv->timers->len <= timer_idx)
2444         continue;
2445
2446       /* we released the lock, the array might have changed */
2447       timer = &g_array_index (priv->timers, TimerData, timer_idx);
2448       /* if changed to timeout immediately, do so */
2449       if (timer->timeout == -1)
2450         goto do_timeout;
2451
2452       /* if we got unscheduled and we are not flushing, it's because a new tail
2453        * element became available in the queue or we flushed the queue.
2454        * Grab it and try to push or sync. */
2455       if (ret == GST_CLOCK_UNSCHEDULED || priv->unscheduled) {
2456         GST_DEBUG_OBJECT (jitterbuffer, "Wait got unscheduled");
2457         continue;
2458       }
2459
2460     do_timeout:
2461       switch (timer->type) {
2462         case TIMER_TYPE_EXPECTED:
2463           do_expected_timeout (jitterbuffer, timer, clock_jitter);
2464           break;
2465         case TIMER_TYPE_LOST:
2466           do_lost_timeout (jitterbuffer, timer, clock_jitter);
2467           break;
2468         case TIMER_TYPE_DEADLINE:
2469           do_deadline_timeout (jitterbuffer, timer, clock_jitter);
2470           break;
2471         case TIMER_TYPE_EOS:
2472           do_eos_timeout (jitterbuffer, timer);
2473           break;
2474       }
2475     } else {
2476       /* no timers, wait for activity */
2477       GST_DEBUG_OBJECT (jitterbuffer, "waiting");
2478       JBUF_WAIT_TIMER (priv);
2479       GST_DEBUG_OBJECT (jitterbuffer, "waiting done");
2480     }
2481   }
2482   JBUF_UNLOCK (priv);
2483
2484   GST_DEBUG_OBJECT (jitterbuffer, "we are stopping");
2485   return;
2486 }
2487
2488 /*
2489  * This funcion implements the main pushing loop on the source pad.
2490  *
2491  * It first tries to push as many buffers as possible. If there is a seqnum
2492  * mismatch, we wait for the next timeouts.
2493  */
2494 static void
2495 gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer)
2496 {
2497   GstRtpJitterBufferPrivate *priv;
2498   GstFlowReturn result;
2499
2500   priv = jitterbuffer->priv;
2501
2502   JBUF_LOCK_CHECK (priv, flushing);
2503   do {
2504     result = handle_next_buffer (jitterbuffer);
2505     if (G_LIKELY (result == GST_FLOW_WAIT)) {
2506       GST_DEBUG_OBJECT (jitterbuffer, "waiting for event");
2507       /* now wait for the next event */
2508       JBUF_WAIT_EVENT (priv, flushing);
2509       GST_DEBUG_OBJECT (jitterbuffer, "waiting for event done");
2510       result = GST_FLOW_OK;
2511     }
2512   }
2513   while (result == GST_FLOW_OK);
2514   JBUF_UNLOCK (priv);
2515
2516   /* if we get here we need to pause */
2517   goto pause;
2518
2519   /* ERRORS */
2520 flushing:
2521   {
2522     result = priv->srcresult;
2523     JBUF_UNLOCK (priv);
2524     goto pause;
2525   }
2526 pause:
2527   {
2528     const gchar *reason = gst_flow_get_name (result);
2529     GstEvent *event;
2530
2531     GST_DEBUG_OBJECT (jitterbuffer, "pausing task, reason %s", reason);
2532     gst_pad_pause_task (priv->srcpad);
2533     if (result == GST_FLOW_EOS) {
2534       event = gst_event_new_eos ();
2535       gst_pad_push_event (priv->srcpad, event);
2536     }
2537     return;
2538   }
2539 }
2540
2541 /* collect the info from the lastest RTCP packet and the jitterbuffer sync, do
2542  * some sanity checks and then emit the handle-sync signal with the parameters.
2543  * This function must be called with the LOCK */
2544 static void
2545 do_handle_sync (GstRtpJitterBuffer * jitterbuffer)
2546 {
2547   GstRtpJitterBufferPrivate *priv;
2548   guint64 base_rtptime, base_time;
2549   guint32 clock_rate;
2550   guint64 last_rtptime;
2551   guint64 clock_base;
2552   guint64 ext_rtptime, diff;
2553   gboolean drop = FALSE;
2554
2555   priv = jitterbuffer->priv;
2556
2557   /* get the last values from the jitterbuffer */
2558   rtp_jitter_buffer_get_sync (priv->jbuf, &base_rtptime, &base_time,
2559       &clock_rate, &last_rtptime);
2560
2561   clock_base = priv->clock_base;
2562   ext_rtptime = priv->ext_rtptime;
2563
2564   GST_DEBUG_OBJECT (jitterbuffer, "ext SR %" G_GUINT64_FORMAT ", base %"
2565       G_GUINT64_FORMAT ", clock-rate %" G_GUINT32_FORMAT
2566       ", clock-base %" G_GUINT64_FORMAT ", last-rtptime %" G_GUINT64_FORMAT,
2567       ext_rtptime, base_rtptime, clock_rate, clock_base, last_rtptime);
2568
2569   if (base_rtptime == -1 || clock_rate == -1 || base_time == -1) {
2570     GST_DEBUG_OBJECT (jitterbuffer, "dropping, no RTP values");
2571     drop = TRUE;
2572   } else {
2573     /* we can't accept anything that happened before we did the last resync */
2574     if (base_rtptime > ext_rtptime) {
2575       GST_DEBUG_OBJECT (jitterbuffer, "dropping, older than base time");
2576       drop = TRUE;
2577     } else {
2578       /* the SR RTP timestamp must be something close to what we last observed
2579        * in the jitterbuffer */
2580       if (ext_rtptime > last_rtptime) {
2581         /* check how far ahead it is to our RTP timestamps */
2582         diff = ext_rtptime - last_rtptime;
2583         /* if bigger than 1 second, we drop it */
2584         if (diff > clock_rate) {
2585           GST_DEBUG_OBJECT (jitterbuffer, "too far ahead");
2586           /* should drop this, but some RTSP servers end up with bogus
2587            * way too ahead RTCP packet when repeated PAUSE/PLAY,
2588            * so still trigger rptbin sync but invalidate RTCP data
2589            * (sync might use other methods) */
2590           ext_rtptime = -1;
2591         }
2592         GST_DEBUG_OBJECT (jitterbuffer, "ext last %" G_GUINT64_FORMAT ", diff %"
2593             G_GUINT64_FORMAT, last_rtptime, diff);
2594       }
2595     }
2596   }
2597
2598   if (!drop) {
2599     GstStructure *s;
2600
2601     s = gst_structure_new ("application/x-rtp-sync",
2602         "base-rtptime", G_TYPE_UINT64, base_rtptime,
2603         "base-time", G_TYPE_UINT64, base_time,
2604         "clock-rate", G_TYPE_UINT, clock_rate,
2605         "clock-base", G_TYPE_UINT64, clock_base,
2606         "sr-ext-rtptime", G_TYPE_UINT64, ext_rtptime,
2607         "sr-buffer", GST_TYPE_BUFFER, priv->last_sr, NULL);
2608
2609     GST_DEBUG_OBJECT (jitterbuffer, "signaling sync");
2610     gst_buffer_replace (&priv->last_sr, NULL);
2611     JBUF_UNLOCK (priv);
2612     g_signal_emit (jitterbuffer,
2613         gst_rtp_jitter_buffer_signals[SIGNAL_HANDLE_SYNC], 0, s);
2614     JBUF_LOCK (priv);
2615     gst_structure_free (s);
2616   } else {
2617     GST_DEBUG_OBJECT (jitterbuffer, "dropping RTCP packet");
2618   }
2619 }
2620
2621 static GstFlowReturn
2622 gst_rtp_jitter_buffer_chain_rtcp (GstPad * pad, GstObject * parent,
2623     GstBuffer * buffer)
2624 {
2625   GstRtpJitterBuffer *jitterbuffer;
2626   GstRtpJitterBufferPrivate *priv;
2627   GstFlowReturn ret = GST_FLOW_OK;
2628   guint32 ssrc;
2629   GstRTCPPacket packet;
2630   guint64 ext_rtptime;
2631   guint32 rtptime;
2632   GstRTCPBuffer rtcp = { NULL, };
2633
2634   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
2635
2636   if (G_UNLIKELY (!gst_rtcp_buffer_validate (buffer)))
2637     goto invalid_buffer;
2638
2639   priv = jitterbuffer->priv;
2640
2641   gst_rtcp_buffer_map (buffer, GST_MAP_READ, &rtcp);
2642
2643   if (!gst_rtcp_buffer_get_first_packet (&rtcp, &packet))
2644     goto empty_buffer;
2645
2646   /* first packet must be SR or RR or else the validate would have failed */
2647   switch (gst_rtcp_packet_get_type (&packet)) {
2648     case GST_RTCP_TYPE_SR:
2649       gst_rtcp_packet_sr_get_sender_info (&packet, &ssrc, NULL, &rtptime,
2650           NULL, NULL);
2651       break;
2652     default:
2653       goto ignore_buffer;
2654   }
2655   gst_rtcp_buffer_unmap (&rtcp);
2656
2657   GST_DEBUG_OBJECT (jitterbuffer, "received RTCP of SSRC %08x", ssrc);
2658
2659   JBUF_LOCK (priv);
2660   /* convert the RTP timestamp to our extended timestamp, using the same offset
2661    * we used in the jitterbuffer */
2662   ext_rtptime = priv->jbuf->ext_rtptime;
2663   ext_rtptime = gst_rtp_buffer_ext_timestamp (&ext_rtptime, rtptime);
2664
2665   priv->ext_rtptime = ext_rtptime;
2666   gst_buffer_replace (&priv->last_sr, buffer);
2667
2668   do_handle_sync (jitterbuffer);
2669   JBUF_UNLOCK (priv);
2670
2671 done:
2672   gst_buffer_unref (buffer);
2673
2674   return ret;
2675
2676 invalid_buffer:
2677   {
2678     /* this is not fatal but should be filtered earlier */
2679     GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
2680         ("Received invalid RTCP payload, dropping"));
2681     ret = GST_FLOW_OK;
2682     goto done;
2683   }
2684 empty_buffer:
2685   {
2686     /* this is not fatal but should be filtered earlier */
2687     GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
2688         ("Received empty RTCP payload, dropping"));
2689     gst_rtcp_buffer_unmap (&rtcp);
2690     ret = GST_FLOW_OK;
2691     goto done;
2692   }
2693 ignore_buffer:
2694   {
2695     GST_DEBUG_OBJECT (jitterbuffer, "ignoring RTCP packet");
2696     gst_rtcp_buffer_unmap (&rtcp);
2697     ret = GST_FLOW_OK;
2698     goto done;
2699   }
2700 }
2701
2702 static gboolean
2703 gst_rtp_jitter_buffer_sink_query (GstPad * pad, GstObject * parent,
2704     GstQuery * query)
2705 {
2706   gboolean res = FALSE;
2707
2708   switch (GST_QUERY_TYPE (query)) {
2709     case GST_QUERY_CAPS:
2710     {
2711       GstCaps *filter, *caps;
2712
2713       gst_query_parse_caps (query, &filter);
2714       caps = gst_rtp_jitter_buffer_getcaps (pad, filter);
2715       gst_query_set_caps_result (query, caps);
2716       gst_caps_unref (caps);
2717       res = TRUE;
2718       break;
2719     }
2720     default:
2721       if (GST_QUERY_IS_SERIALIZED (query)) {
2722         GST_WARNING_OBJECT (pad, "unhandled serialized query");
2723         res = FALSE;
2724       } else {
2725         res = gst_pad_query_default (pad, parent, query);
2726       }
2727       break;
2728   }
2729   return res;
2730 }
2731
2732 static gboolean
2733 gst_rtp_jitter_buffer_src_query (GstPad * pad, GstObject * parent,
2734     GstQuery * query)
2735 {
2736   GstRtpJitterBuffer *jitterbuffer;
2737   GstRtpJitterBufferPrivate *priv;
2738   gboolean res = FALSE;
2739
2740   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
2741   priv = jitterbuffer->priv;
2742
2743   switch (GST_QUERY_TYPE (query)) {
2744     case GST_QUERY_LATENCY:
2745     {
2746       /* We need to send the query upstream and add the returned latency to our
2747        * own */
2748       GstClockTime min_latency, max_latency;
2749       gboolean us_live;
2750       GstClockTime our_latency;
2751
2752       if ((res = gst_pad_peer_query (priv->sinkpad, query))) {
2753         gst_query_parse_latency (query, &us_live, &min_latency, &max_latency);
2754
2755         GST_DEBUG_OBJECT (jitterbuffer, "Peer latency: min %"
2756             GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
2757             GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
2758
2759         /* store this so that we can safely sync on the peer buffers. */
2760         JBUF_LOCK (priv);
2761         priv->peer_latency = min_latency;
2762         our_latency = priv->latency_ns;
2763         JBUF_UNLOCK (priv);
2764
2765         GST_DEBUG_OBJECT (jitterbuffer, "Our latency: %" GST_TIME_FORMAT,
2766             GST_TIME_ARGS (our_latency));
2767
2768         /* we add some latency but can buffer an infinite amount of time */
2769         min_latency += our_latency;
2770         max_latency = -1;
2771
2772         GST_DEBUG_OBJECT (jitterbuffer, "Calculated total latency : min %"
2773             GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
2774             GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
2775
2776         gst_query_set_latency (query, TRUE, min_latency, max_latency);
2777       }
2778       break;
2779     }
2780     case GST_QUERY_POSITION:
2781     {
2782       GstClockTime start, last_out;
2783       GstFormat fmt;
2784
2785       gst_query_parse_position (query, &fmt, NULL);
2786       if (fmt != GST_FORMAT_TIME) {
2787         res = gst_pad_query_default (pad, parent, query);
2788         break;
2789       }
2790
2791       JBUF_LOCK (priv);
2792       start = priv->npt_start;
2793       last_out = priv->last_out_time;
2794       JBUF_UNLOCK (priv);
2795
2796       GST_DEBUG_OBJECT (jitterbuffer, "npt start %" GST_TIME_FORMAT
2797           ", last out %" GST_TIME_FORMAT, GST_TIME_ARGS (start),
2798           GST_TIME_ARGS (last_out));
2799
2800       if (GST_CLOCK_TIME_IS_VALID (start) && GST_CLOCK_TIME_IS_VALID (last_out)) {
2801         /* bring 0-based outgoing time to stream time */
2802         gst_query_set_position (query, GST_FORMAT_TIME, start + last_out);
2803         res = TRUE;
2804       } else {
2805         res = gst_pad_query_default (pad, parent, query);
2806       }
2807       break;
2808     }
2809     case GST_QUERY_CAPS:
2810     {
2811       GstCaps *filter, *caps;
2812
2813       gst_query_parse_caps (query, &filter);
2814       caps = gst_rtp_jitter_buffer_getcaps (pad, filter);
2815       gst_query_set_caps_result (query, caps);
2816       gst_caps_unref (caps);
2817       res = TRUE;
2818       break;
2819     }
2820     default:
2821       res = gst_pad_query_default (pad, parent, query);
2822       break;
2823   }
2824
2825   return res;
2826 }
2827
2828 static void
2829 gst_rtp_jitter_buffer_set_property (GObject * object,
2830     guint prop_id, const GValue * value, GParamSpec * pspec)
2831 {
2832   GstRtpJitterBuffer *jitterbuffer;
2833   GstRtpJitterBufferPrivate *priv;
2834
2835   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
2836   priv = jitterbuffer->priv;
2837
2838   switch (prop_id) {
2839     case PROP_LATENCY:
2840     {
2841       guint new_latency, old_latency;
2842
2843       new_latency = g_value_get_uint (value);
2844
2845       JBUF_LOCK (priv);
2846       old_latency = priv->latency_ms;
2847       priv->latency_ms = new_latency;
2848       priv->latency_ns = priv->latency_ms * GST_MSECOND;
2849       rtp_jitter_buffer_set_delay (priv->jbuf, priv->latency_ns);
2850       JBUF_UNLOCK (priv);
2851
2852       /* post message if latency changed, this will inform the parent pipeline
2853        * that a latency reconfiguration is possible/needed. */
2854       if (new_latency != old_latency) {
2855         GST_DEBUG_OBJECT (jitterbuffer, "latency changed to: %" GST_TIME_FORMAT,
2856             GST_TIME_ARGS (new_latency * GST_MSECOND));
2857
2858         gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer),
2859             gst_message_new_latency (GST_OBJECT_CAST (jitterbuffer)));
2860       }
2861       break;
2862     }
2863     case PROP_DROP_ON_LATENCY:
2864       JBUF_LOCK (priv);
2865       priv->drop_on_latency = g_value_get_boolean (value);
2866       JBUF_UNLOCK (priv);
2867       break;
2868     case PROP_TS_OFFSET:
2869       JBUF_LOCK (priv);
2870       priv->ts_offset = g_value_get_int64 (value);
2871       priv->ts_discont = TRUE;
2872       JBUF_UNLOCK (priv);
2873       break;
2874     case PROP_DO_LOST:
2875       JBUF_LOCK (priv);
2876       priv->do_lost = g_value_get_boolean (value);
2877       JBUF_UNLOCK (priv);
2878       break;
2879     case PROP_MODE:
2880       JBUF_LOCK (priv);
2881       rtp_jitter_buffer_set_mode (priv->jbuf, g_value_get_enum (value));
2882       JBUF_UNLOCK (priv);
2883       break;
2884     case PROP_DO_RETRANSMISSION:
2885       JBUF_LOCK (priv);
2886       priv->do_retransmission = g_value_get_boolean (value);
2887       JBUF_UNLOCK (priv);
2888       break;
2889     case PROP_RTX_DELAY:
2890       JBUF_LOCK (priv);
2891       priv->rtx_delay = g_value_get_int (value);
2892       JBUF_UNLOCK (priv);
2893       break;
2894     case PROP_RTX_DELAY_REORDER:
2895       JBUF_LOCK (priv);
2896       priv->rtx_delay_reorder = g_value_get_int (value);
2897       JBUF_UNLOCK (priv);
2898       break;
2899     case PROP_RTX_RETRY_TIMEOUT:
2900       JBUF_LOCK (priv);
2901       priv->rtx_retry_timeout = g_value_get_int (value);
2902       JBUF_UNLOCK (priv);
2903       break;
2904     case PROP_RTX_RETRY_PERIOD:
2905       JBUF_LOCK (priv);
2906       priv->rtx_retry_period = g_value_get_int (value);
2907       JBUF_UNLOCK (priv);
2908       break;
2909     default:
2910       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2911       break;
2912   }
2913 }
2914
2915 static void
2916 gst_rtp_jitter_buffer_get_property (GObject * object,
2917     guint prop_id, GValue * value, GParamSpec * pspec)
2918 {
2919   GstRtpJitterBuffer *jitterbuffer;
2920   GstRtpJitterBufferPrivate *priv;
2921
2922   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
2923   priv = jitterbuffer->priv;
2924
2925   switch (prop_id) {
2926     case PROP_LATENCY:
2927       JBUF_LOCK (priv);
2928       g_value_set_uint (value, priv->latency_ms);
2929       JBUF_UNLOCK (priv);
2930       break;
2931     case PROP_DROP_ON_LATENCY:
2932       JBUF_LOCK (priv);
2933       g_value_set_boolean (value, priv->drop_on_latency);
2934       JBUF_UNLOCK (priv);
2935       break;
2936     case PROP_TS_OFFSET:
2937       JBUF_LOCK (priv);
2938       g_value_set_int64 (value, priv->ts_offset);
2939       JBUF_UNLOCK (priv);
2940       break;
2941     case PROP_DO_LOST:
2942       JBUF_LOCK (priv);
2943       g_value_set_boolean (value, priv->do_lost);
2944       JBUF_UNLOCK (priv);
2945       break;
2946     case PROP_MODE:
2947       JBUF_LOCK (priv);
2948       g_value_set_enum (value, rtp_jitter_buffer_get_mode (priv->jbuf));
2949       JBUF_UNLOCK (priv);
2950       break;
2951     case PROP_PERCENT:
2952     {
2953       gint percent;
2954
2955       JBUF_LOCK (priv);
2956       if (priv->srcresult != GST_FLOW_OK)
2957         percent = 100;
2958       else
2959         percent = rtp_jitter_buffer_get_percent (priv->jbuf);
2960
2961       g_value_set_int (value, percent);
2962       JBUF_UNLOCK (priv);
2963       break;
2964     }
2965     case PROP_DO_RETRANSMISSION:
2966       JBUF_LOCK (priv);
2967       g_value_set_boolean (value, priv->do_retransmission);
2968       JBUF_UNLOCK (priv);
2969       break;
2970     case PROP_RTX_DELAY:
2971       JBUF_LOCK (priv);
2972       g_value_set_int (value, priv->rtx_delay);
2973       JBUF_UNLOCK (priv);
2974       break;
2975     case PROP_RTX_DELAY_REORDER:
2976       JBUF_LOCK (priv);
2977       g_value_set_int (value, priv->rtx_delay_reorder);
2978       JBUF_UNLOCK (priv);
2979       break;
2980     case PROP_RTX_RETRY_TIMEOUT:
2981       JBUF_LOCK (priv);
2982       g_value_set_int (value, priv->rtx_retry_timeout);
2983       JBUF_UNLOCK (priv);
2984       break;
2985     case PROP_RTX_RETRY_PERIOD:
2986       JBUF_LOCK (priv);
2987       g_value_set_int (value, priv->rtx_retry_period);
2988       JBUF_UNLOCK (priv);
2989       break;
2990     default:
2991       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2992       break;
2993   }
2994 }