jitterbuffer: signal timestamp discont
[platform/upstream/gstreamer.git] / gst / rtpmanager / gstrtpjitterbuffer.c
1 /*
2  * Farsight Voice+Video library
3  *
4  *  Copyright 2007 Collabora Ltd,
5  *  Copyright 2007 Nokia Corporation
6  *   @author: Philippe Kalaf <philippe.kalaf@collabora.co.uk>.
7  *  Copyright 2007 Wim Taymans <wim.taymans@gmail.com>
8  *
9  * This library is free software; you can redistribute it and/or
10  * modify it under the terms of the GNU Library General Public
11  * License as published by the Free Software Foundation; either
12  * version 2 of the License, or (at your option) any later version.
13  *
14  * This library is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17  * Library General Public License for more details.
18  *
19  * You should have received a copy of the GNU Library General Public
20  * License along with this library; if not, write to the
21  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
22  * Boston, MA 02110-1301, USA.
23  *
24  */
25
26 /**
27  * SECTION:element-gstrtpjitterbuffer
28  *
29  * This element reorders and removes duplicate RTP packets as they are received
30  * from a network source. It will also wait for missing packets up to a
31  * configurable time limit using the #GstRtpJitterBuffer:latency property.
32  * Packets arriving too late are considered to be lost packets.
33  *
34  * This element acts as a live element and so adds #GstRtpJitterBuffer:latency
35  * to the pipeline.
36  *
37  * The element needs the clock-rate of the RTP payload in order to estimate the
38  * delay. This information is obtained either from the caps on the sink pad or,
39  * when no caps are present, from the #GstRtpJitterBuffer::request-pt-map signal.
40  * To clear the previous pt-map use the #GstRtpJitterBuffer::clear-pt-map signal.
41  *
42  * This element will automatically be used inside gstrtpbin.
43  *
44  * <refsect2>
45  * <title>Example pipelines</title>
46  * |[
47  * gst-launch-1.0 rtspsrc location=rtsp://192.168.1.133:8554/mpeg1or2AudioVideoTest ! gstrtpjitterbuffer ! rtpmpvdepay ! mpeg2dec ! xvimagesink
48  * ]| Connect to a streaming server and decode the MPEG video. The jitterbuffer is
49  * inserted into the pipeline to smooth out network jitter and to reorder the
50  * out-of-order RTP packets.
51  * </refsect2>
52  *
53  * Last reviewed on 2007-05-28 (0.10.5)
54  */
55
56 #ifdef HAVE_CONFIG_H
57 #include "config.h"
58 #endif
59
60 #include <stdlib.h>
61 #include <string.h>
62 #include <gst/rtp/gstrtpbuffer.h>
63
64 #include "gstrtpbin-marshal.h"
65
66 #include "gstrtpjitterbuffer.h"
67 #include "rtpjitterbuffer.h"
68 #include "rtpstats.h"
69
70 #include <gst/glib-compat-private.h>
71
72 GST_DEBUG_CATEGORY (rtpjitterbuffer_debug);
73 #define GST_CAT_DEFAULT (rtpjitterbuffer_debug)
74
75 /* RTPJitterBuffer signals and args */
76 enum
77 {
78   SIGNAL_REQUEST_PT_MAP,
79   SIGNAL_CLEAR_PT_MAP,
80   SIGNAL_HANDLE_SYNC,
81   SIGNAL_ON_NPT_STOP,
82   SIGNAL_SET_ACTIVE,
83   LAST_SIGNAL
84 };
85
86 #define DEFAULT_LATENCY_MS      200
87 #define DEFAULT_DROP_ON_LATENCY FALSE
88 #define DEFAULT_TS_OFFSET       0
89 #define DEFAULT_DO_LOST         FALSE
90 #define DEFAULT_MODE            RTP_JITTER_BUFFER_MODE_SLAVE
91 #define DEFAULT_PERCENT         0
92
93 enum
94 {
95   PROP_0,
96   PROP_LATENCY,
97   PROP_DROP_ON_LATENCY,
98   PROP_TS_OFFSET,
99   PROP_DO_LOST,
100   PROP_MODE,
101   PROP_PERCENT,
102   PROP_LAST
103 };
104
105 #define JBUF_LOCK(priv)   (g_mutex_lock (&(priv)->jbuf_lock))
106
107 #define JBUF_LOCK_CHECK(priv,label) G_STMT_START {    \
108   JBUF_LOCK (priv);                                   \
109   if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK))    \
110     goto label;                                       \
111 } G_STMT_END
112
113 #define JBUF_UNLOCK(priv) (g_mutex_unlock (&(priv)->jbuf_lock))
114 #define JBUF_WAIT(priv)   (g_cond_wait (&(priv)->jbuf_cond, &(priv)->jbuf_lock))
115
116 #define JBUF_WAIT_CHECK(priv,label) G_STMT_START {    \
117   JBUF_WAIT(priv);                                    \
118   if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK))    \
119     goto label;                                       \
120 } G_STMT_END
121
122 #define JBUF_SIGNAL(priv) (g_cond_signal (&(priv)->jbuf_cond))
123
124 struct _GstRtpJitterBufferPrivate
125 {
126   GstPad *sinkpad, *srcpad;
127   GstPad *rtcpsinkpad;
128
129   RTPJitterBuffer *jbuf;
130   GMutex jbuf_lock;
131   GCond jbuf_cond;
132   gboolean waiting;
133   gboolean discont;
134   gboolean ts_discont;
135   gboolean active;
136   guint64 out_offset;
137
138   /* properties */
139   guint latency_ms;
140   guint64 latency_ns;
141   gboolean drop_on_latency;
142   gint64 ts_offset;
143   gboolean do_lost;
144
145   /* the last seqnum we pushed out */
146   guint32 last_popped_seqnum;
147   /* the next expected seqnum we push */
148   guint32 next_seqnum;
149   /* last output time */
150   GstClockTime last_out_time;
151   /* the next expected seqnum we receive */
152   guint32 next_in_seqnum;
153
154   /* start and stop ranges */
155   GstClockTime npt_start;
156   GstClockTime npt_stop;
157   guint64 ext_timestamp;
158   guint64 last_elapsed;
159   guint64 estimated_eos;
160   GstClockID eos_id;
161   gboolean reached_npt_stop;
162
163   /* state */
164   gboolean eos;
165
166   /* clock rate and rtp timestamp offset */
167   gint last_pt;
168   gint32 clock_rate;
169   gint64 clock_base;
170   gint64 prev_ts_offset;
171
172   /* when we are shutting down */
173   GstFlowReturn srcresult;
174   gboolean blocked;
175
176   /* for sync */
177   GstSegment segment;
178   GstClockID clock_id;
179   gboolean unscheduled;
180   /* the latency of the upstream peer, we have to take this into account when
181    * synchronizing the buffers. */
182   GstClockTime peer_latency;
183   guint64 ext_rtptime;
184   GstBuffer *last_sr;
185
186   /* some accounting */
187   guint64 num_late;
188   guint64 num_duplicates;
189 };
190
191 #define GST_RTP_JITTER_BUFFER_GET_PRIVATE(o) \
192   (G_TYPE_INSTANCE_GET_PRIVATE ((o), GST_TYPE_RTP_JITTER_BUFFER, \
193                                 GstRtpJitterBufferPrivate))
194
195 static GstStaticPadTemplate gst_rtp_jitter_buffer_sink_template =
196 GST_STATIC_PAD_TEMPLATE ("sink",
197     GST_PAD_SINK,
198     GST_PAD_ALWAYS,
199     GST_STATIC_CAPS ("application/x-rtp, "
200         "clock-rate = (int) [ 1, 2147483647 ]"
201         /* "payload = (int) , "
202          * "encoding-name = (string) "
203          */ )
204     );
205
206 static GstStaticPadTemplate gst_rtp_jitter_buffer_sink_rtcp_template =
207 GST_STATIC_PAD_TEMPLATE ("sink_rtcp",
208     GST_PAD_SINK,
209     GST_PAD_REQUEST,
210     GST_STATIC_CAPS ("application/x-rtcp")
211     );
212
213 static GstStaticPadTemplate gst_rtp_jitter_buffer_src_template =
214 GST_STATIC_PAD_TEMPLATE ("src",
215     GST_PAD_SRC,
216     GST_PAD_ALWAYS,
217     GST_STATIC_CAPS ("application/x-rtp"
218         /* "payload = (int) , "
219          * "clock-rate = (int) , "
220          * "encoding-name = (string) "
221          */ )
222     );
223
224 static guint gst_rtp_jitter_buffer_signals[LAST_SIGNAL] = { 0 };
225
226 #define gst_rtp_jitter_buffer_parent_class parent_class
227 G_DEFINE_TYPE (GstRtpJitterBuffer, gst_rtp_jitter_buffer, GST_TYPE_ELEMENT);
228
229 /* object overrides */
230 static void gst_rtp_jitter_buffer_set_property (GObject * object,
231     guint prop_id, const GValue * value, GParamSpec * pspec);
232 static void gst_rtp_jitter_buffer_get_property (GObject * object,
233     guint prop_id, GValue * value, GParamSpec * pspec);
234 static void gst_rtp_jitter_buffer_finalize (GObject * object);
235
236 /* element overrides */
237 static GstStateChangeReturn gst_rtp_jitter_buffer_change_state (GstElement
238     * element, GstStateChange transition);
239 static GstPad *gst_rtp_jitter_buffer_request_new_pad (GstElement * element,
240     GstPadTemplate * templ, const gchar * name, const GstCaps * filter);
241 static void gst_rtp_jitter_buffer_release_pad (GstElement * element,
242     GstPad * pad);
243 static GstClock *gst_rtp_jitter_buffer_provide_clock (GstElement * element);
244
245 /* pad overrides */
246 static GstCaps *gst_rtp_jitter_buffer_getcaps (GstPad * pad, GstCaps * filter);
247 static GstIterator *gst_rtp_jitter_buffer_iterate_internal_links (GstPad * pad,
248     GstObject * parent);
249
250 /* sinkpad overrides */
251 static gboolean gst_rtp_jitter_buffer_sink_event (GstPad * pad,
252     GstObject * parent, GstEvent * event);
253 static GstFlowReturn gst_rtp_jitter_buffer_chain (GstPad * pad,
254     GstObject * parent, GstBuffer * buffer);
255
256 static gboolean gst_rtp_jitter_buffer_sink_rtcp_event (GstPad * pad,
257     GstObject * parent, GstEvent * event);
258 static GstFlowReturn gst_rtp_jitter_buffer_chain_rtcp (GstPad * pad,
259     GstObject * parent, GstBuffer * buffer);
260
261 static gboolean gst_rtp_jitter_buffer_sink_query (GstPad * pad,
262     GstObject * parent, GstQuery * query);
263
264 /* srcpad overrides */
265 static gboolean gst_rtp_jitter_buffer_src_event (GstPad * pad,
266     GstObject * parent, GstEvent * event);
267 static gboolean gst_rtp_jitter_buffer_src_activate_mode (GstPad * pad,
268     GstObject * parent, GstPadMode mode, gboolean active);
269 static void gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer);
270 static gboolean gst_rtp_jitter_buffer_src_query (GstPad * pad,
271     GstObject * parent, GstQuery * query);
272
273 static void
274 gst_rtp_jitter_buffer_clear_pt_map (GstRtpJitterBuffer * jitterbuffer);
275 static GstClockTime
276 gst_rtp_jitter_buffer_set_active (GstRtpJitterBuffer * jitterbuffer,
277     gboolean active, guint64 base_time);
278
279 static void
280 gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass)
281 {
282   GObjectClass *gobject_class;
283   GstElementClass *gstelement_class;
284
285   gobject_class = (GObjectClass *) klass;
286   gstelement_class = (GstElementClass *) klass;
287
288   g_type_class_add_private (klass, sizeof (GstRtpJitterBufferPrivate));
289
290   gobject_class->finalize = gst_rtp_jitter_buffer_finalize;
291
292   gobject_class->set_property = gst_rtp_jitter_buffer_set_property;
293   gobject_class->get_property = gst_rtp_jitter_buffer_get_property;
294
295   /**
296    * GstRtpJitterBuffer::latency:
297    *
298    * The maximum latency of the jitterbuffer. Packets will be kept in the buffer
299    * for at most this time.
300    */
301   g_object_class_install_property (gobject_class, PROP_LATENCY,
302       g_param_spec_uint ("latency", "Buffer latency in ms",
303           "Amount of ms to buffer", 0, G_MAXUINT, DEFAULT_LATENCY_MS,
304           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
305   /**
306    * GstRtpJitterBuffer::drop-on-latency:
307    *
308    * Drop oldest buffers when the queue is completely filled.
309    */
310   g_object_class_install_property (gobject_class, PROP_DROP_ON_LATENCY,
311       g_param_spec_boolean ("drop-on-latency",
312           "Drop buffers when maximum latency is reached",
313           "Tells the jitterbuffer to never exceed the given latency in size",
314           DEFAULT_DROP_ON_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
315   /**
316    * GstRtpJitterBuffer::ts-offset:
317    *
318    * Adjust GStreamer output buffer timestamps in the jitterbuffer with offset.
319    * This is mainly used to ensure interstream synchronisation.
320    */
321   g_object_class_install_property (gobject_class, PROP_TS_OFFSET,
322       g_param_spec_int64 ("ts-offset", "Timestamp Offset",
323           "Adjust buffer timestamps with offset in nanoseconds", G_MININT64,
324           G_MAXINT64, DEFAULT_TS_OFFSET,
325           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
326
327   /**
328    * GstRtpJitterBuffer::do-lost:
329    *
330    * Send out a GstRTPPacketLost event downstream when a packet is considered
331    * lost.
332    */
333   g_object_class_install_property (gobject_class, PROP_DO_LOST,
334       g_param_spec_boolean ("do-lost", "Do Lost",
335           "Send an event downstream when a packet is lost", DEFAULT_DO_LOST,
336           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
337
338   /**
339    * GstRtpJitterBuffer::mode:
340    *
341    * Control the buffering and timestamping mode used by the jitterbuffer.
342    */
343   g_object_class_install_property (gobject_class, PROP_MODE,
344       g_param_spec_enum ("mode", "Mode",
345           "Control the buffering algorithm in use", RTP_TYPE_JITTER_BUFFER_MODE,
346           DEFAULT_MODE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
347   /**
348    * GstRtpJitterBuffer::percent:
349    *
350    * The percent of the jitterbuffer that is filled.
351    *
352    * Since: 0.10.19
353    */
354   g_object_class_install_property (gobject_class, PROP_PERCENT,
355       g_param_spec_int ("percent", "percent",
356           "The buffer filled percent", 0, 100,
357           0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
358   /**
359    * GstRtpJitterBuffer::request-pt-map:
360    * @buffer: the object which received the signal
361    * @pt: the pt
362    *
363    * Request the payload type as #GstCaps for @pt.
364    */
365   gst_rtp_jitter_buffer_signals[SIGNAL_REQUEST_PT_MAP] =
366       g_signal_new ("request-pt-map", G_TYPE_FROM_CLASS (klass),
367       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
368           request_pt_map), NULL, NULL, gst_rtp_bin_marshal_BOXED__UINT,
369       GST_TYPE_CAPS, 1, G_TYPE_UINT);
370   /**
371    * GstRtpJitterBuffer::handle-sync:
372    * @buffer: the object which received the signal
373    * @struct: a GstStructure containing sync values.
374    *
375    * Be notified of new sync values.
376    */
377   gst_rtp_jitter_buffer_signals[SIGNAL_HANDLE_SYNC] =
378       g_signal_new ("handle-sync", G_TYPE_FROM_CLASS (klass),
379       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
380           handle_sync), NULL, NULL, g_cclosure_marshal_VOID__BOXED,
381       G_TYPE_NONE, 1, GST_TYPE_STRUCTURE | G_SIGNAL_TYPE_STATIC_SCOPE);
382
383   /**
384    * GstRtpJitterBuffer::on-npt-stop
385    * @buffer: the object which received the signal
386    *
387    * Signal that the jitterbufer has pushed the RTP packet that corresponds to
388    * the npt-stop position.
389    */
390   gst_rtp_jitter_buffer_signals[SIGNAL_ON_NPT_STOP] =
391       g_signal_new ("on-npt-stop", G_TYPE_FROM_CLASS (klass),
392       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
393           on_npt_stop), NULL, NULL, g_cclosure_marshal_VOID__VOID,
394       G_TYPE_NONE, 0, G_TYPE_NONE);
395
396   /**
397    * GstRtpJitterBuffer::clear-pt-map:
398    * @buffer: the object which received the signal
399    *
400    * Invalidate the clock-rate as obtained with the
401    * #GstRtpJitterBuffer::request-pt-map signal.
402    */
403   gst_rtp_jitter_buffer_signals[SIGNAL_CLEAR_PT_MAP] =
404       g_signal_new ("clear-pt-map", G_TYPE_FROM_CLASS (klass),
405       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
406       G_STRUCT_OFFSET (GstRtpJitterBufferClass, clear_pt_map), NULL, NULL,
407       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
408
409   /**
410    * GstRtpJitterBuffer::set-active:
411    * @buffer: the object which received the signal
412    *
413    * Start pushing out packets with the given base time. This signal is only
414    * useful in buffering mode.
415    *
416    * Returns: the time of the last pushed packet.
417    *
418    * Since: 0.10.19
419    */
420   gst_rtp_jitter_buffer_signals[SIGNAL_SET_ACTIVE] =
421       g_signal_new ("set-active", G_TYPE_FROM_CLASS (klass),
422       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
423       G_STRUCT_OFFSET (GstRtpJitterBufferClass, set_active), NULL, NULL,
424       gst_rtp_bin_marshal_UINT64__BOOL_UINT64, G_TYPE_UINT64, 2, G_TYPE_BOOLEAN,
425       G_TYPE_UINT64);
426
427   gstelement_class->change_state =
428       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_change_state);
429   gstelement_class->request_new_pad =
430       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_request_new_pad);
431   gstelement_class->release_pad =
432       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_release_pad);
433   gstelement_class->provide_clock =
434       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_provide_clock);
435
436   gst_element_class_add_pad_template (gstelement_class,
437       gst_static_pad_template_get (&gst_rtp_jitter_buffer_src_template));
438   gst_element_class_add_pad_template (gstelement_class,
439       gst_static_pad_template_get (&gst_rtp_jitter_buffer_sink_template));
440   gst_element_class_add_pad_template (gstelement_class,
441       gst_static_pad_template_get (&gst_rtp_jitter_buffer_sink_rtcp_template));
442
443   gst_element_class_set_static_metadata (gstelement_class,
444       "RTP packet jitter-buffer", "Filter/Network/RTP",
445       "A buffer that deals with network jitter and other transmission faults",
446       "Philippe Kalaf <philippe.kalaf@collabora.co.uk>, "
447       "Wim Taymans <wim.taymans@gmail.com>");
448
449   klass->clear_pt_map = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_clear_pt_map);
450   klass->set_active = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_set_active);
451
452   GST_DEBUG_CATEGORY_INIT
453       (rtpjitterbuffer_debug, "gstrtpjitterbuffer", 0, "RTP Jitter Buffer");
454 }
455
456 static void
457 gst_rtp_jitter_buffer_init (GstRtpJitterBuffer * jitterbuffer)
458 {
459   GstRtpJitterBufferPrivate *priv;
460
461   priv = GST_RTP_JITTER_BUFFER_GET_PRIVATE (jitterbuffer);
462   jitterbuffer->priv = priv;
463
464   priv->latency_ms = DEFAULT_LATENCY_MS;
465   priv->latency_ns = priv->latency_ms * GST_MSECOND;
466   priv->drop_on_latency = DEFAULT_DROP_ON_LATENCY;
467   priv->do_lost = DEFAULT_DO_LOST;
468
469   priv->jbuf = rtp_jitter_buffer_new ();
470   g_mutex_init (&priv->jbuf_lock);
471   g_cond_init (&priv->jbuf_cond);
472
473   /* reset skew detection initialy */
474   rtp_jitter_buffer_reset_skew (priv->jbuf);
475   rtp_jitter_buffer_set_delay (priv->jbuf, priv->latency_ns);
476   rtp_jitter_buffer_set_buffering (priv->jbuf, FALSE);
477   priv->active = TRUE;
478
479   priv->srcpad =
480       gst_pad_new_from_static_template (&gst_rtp_jitter_buffer_src_template,
481       "src");
482
483   gst_pad_set_activatemode_function (priv->srcpad,
484       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_activate_mode));
485   gst_pad_set_query_function (priv->srcpad,
486       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_query));
487   gst_pad_set_event_function (priv->srcpad,
488       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_event));
489
490   priv->sinkpad =
491       gst_pad_new_from_static_template (&gst_rtp_jitter_buffer_sink_template,
492       "sink");
493
494   gst_pad_set_chain_function (priv->sinkpad,
495       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_chain));
496   gst_pad_set_event_function (priv->sinkpad,
497       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_sink_event));
498   gst_pad_set_query_function (priv->sinkpad,
499       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_sink_query));
500
501   gst_element_add_pad (GST_ELEMENT (jitterbuffer), priv->srcpad);
502   gst_element_add_pad (GST_ELEMENT (jitterbuffer), priv->sinkpad);
503
504   GST_OBJECT_FLAG_SET (jitterbuffer, GST_ELEMENT_FLAG_PROVIDE_CLOCK);
505 }
506
507 static void
508 gst_rtp_jitter_buffer_finalize (GObject * object)
509 {
510   GstRtpJitterBuffer *jitterbuffer;
511
512   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
513
514   g_mutex_clear (&jitterbuffer->priv->jbuf_lock);
515   g_cond_clear (&jitterbuffer->priv->jbuf_cond);
516
517   g_object_unref (jitterbuffer->priv->jbuf);
518
519   G_OBJECT_CLASS (parent_class)->finalize (object);
520 }
521
522 static GstIterator *
523 gst_rtp_jitter_buffer_iterate_internal_links (GstPad * pad, GstObject * parent)
524 {
525   GstRtpJitterBuffer *jitterbuffer;
526   GstPad *otherpad = NULL;
527   GstIterator *it;
528   GValue val = { 0, };
529
530   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
531
532   if (pad == jitterbuffer->priv->sinkpad) {
533     otherpad = jitterbuffer->priv->srcpad;
534   } else if (pad == jitterbuffer->priv->srcpad) {
535     otherpad = jitterbuffer->priv->sinkpad;
536   } else if (pad == jitterbuffer->priv->rtcpsinkpad) {
537     otherpad = NULL;
538   }
539
540   g_value_init (&val, GST_TYPE_PAD);
541   g_value_set_object (&val, otherpad);
542   it = gst_iterator_new_single (GST_TYPE_PAD, &val);
543   g_value_unset (&val);
544
545   return it;
546 }
547
548 static GstPad *
549 create_rtcp_sink (GstRtpJitterBuffer * jitterbuffer)
550 {
551   GstRtpJitterBufferPrivate *priv;
552
553   priv = jitterbuffer->priv;
554
555   GST_DEBUG_OBJECT (jitterbuffer, "creating RTCP sink pad");
556
557   priv->rtcpsinkpad =
558       gst_pad_new_from_static_template
559       (&gst_rtp_jitter_buffer_sink_rtcp_template, "sink_rtcp");
560   gst_pad_set_chain_function (priv->rtcpsinkpad,
561       gst_rtp_jitter_buffer_chain_rtcp);
562   gst_pad_set_event_function (priv->rtcpsinkpad,
563       (GstPadEventFunction) gst_rtp_jitter_buffer_sink_rtcp_event);
564   gst_pad_set_iterate_internal_links_function (priv->rtcpsinkpad,
565       gst_rtp_jitter_buffer_iterate_internal_links);
566   gst_pad_set_active (priv->rtcpsinkpad, TRUE);
567   gst_element_add_pad (GST_ELEMENT_CAST (jitterbuffer), priv->rtcpsinkpad);
568
569   return priv->rtcpsinkpad;
570 }
571
572 static void
573 remove_rtcp_sink (GstRtpJitterBuffer * jitterbuffer)
574 {
575   GstRtpJitterBufferPrivate *priv;
576
577   priv = jitterbuffer->priv;
578
579   GST_DEBUG_OBJECT (jitterbuffer, "removing RTCP sink pad");
580
581   gst_pad_set_active (priv->rtcpsinkpad, FALSE);
582
583   gst_element_remove_pad (GST_ELEMENT_CAST (jitterbuffer), priv->rtcpsinkpad);
584   priv->rtcpsinkpad = NULL;
585 }
586
587 static GstPad *
588 gst_rtp_jitter_buffer_request_new_pad (GstElement * element,
589     GstPadTemplate * templ, const gchar * name, const GstCaps * filter)
590 {
591   GstRtpJitterBuffer *jitterbuffer;
592   GstElementClass *klass;
593   GstPad *result;
594   GstRtpJitterBufferPrivate *priv;
595
596   g_return_val_if_fail (templ != NULL, NULL);
597   g_return_val_if_fail (GST_IS_RTP_JITTER_BUFFER (element), NULL);
598
599   jitterbuffer = GST_RTP_JITTER_BUFFER (element);
600   priv = jitterbuffer->priv;
601   klass = GST_ELEMENT_GET_CLASS (element);
602
603   GST_DEBUG_OBJECT (element, "requesting pad %s", GST_STR_NULL (name));
604
605   /* figure out the template */
606   if (templ == gst_element_class_get_pad_template (klass, "sink_rtcp")) {
607     if (priv->rtcpsinkpad != NULL)
608       goto exists;
609
610     result = create_rtcp_sink (jitterbuffer);
611   } else
612     goto wrong_template;
613
614   return result;
615
616   /* ERRORS */
617 wrong_template:
618   {
619     g_warning ("gstrtpjitterbuffer: this is not our template");
620     return NULL;
621   }
622 exists:
623   {
624     g_warning ("gstrtpjitterbuffer: pad already requested");
625     return NULL;
626   }
627 }
628
629 static void
630 gst_rtp_jitter_buffer_release_pad (GstElement * element, GstPad * pad)
631 {
632   GstRtpJitterBuffer *jitterbuffer;
633   GstRtpJitterBufferPrivate *priv;
634
635   g_return_if_fail (GST_IS_RTP_JITTER_BUFFER (element));
636   g_return_if_fail (GST_IS_PAD (pad));
637
638   jitterbuffer = GST_RTP_JITTER_BUFFER (element);
639   priv = jitterbuffer->priv;
640
641   GST_DEBUG_OBJECT (element, "releasing pad %s:%s", GST_DEBUG_PAD_NAME (pad));
642
643   if (priv->rtcpsinkpad == pad) {
644     remove_rtcp_sink (jitterbuffer);
645   } else
646     goto wrong_pad;
647
648   return;
649
650   /* ERRORS */
651 wrong_pad:
652   {
653     g_warning ("gstjitterbuffer: asked to release an unknown pad");
654     return;
655   }
656 }
657
658 static GstClock *
659 gst_rtp_jitter_buffer_provide_clock (GstElement * element)
660 {
661   return gst_system_clock_obtain ();
662 }
663
664 static void
665 gst_rtp_jitter_buffer_clear_pt_map (GstRtpJitterBuffer * jitterbuffer)
666 {
667   GstRtpJitterBufferPrivate *priv;
668
669   priv = jitterbuffer->priv;
670
671   /* this will trigger a new pt-map request signal, FIXME, do something better. */
672
673   JBUF_LOCK (priv);
674   priv->clock_rate = -1;
675   /* do not clear current content, but refresh state for new arrival */
676   GST_DEBUG_OBJECT (jitterbuffer, "reset jitterbuffer");
677   rtp_jitter_buffer_reset_skew (priv->jbuf);
678   priv->last_popped_seqnum = -1;
679   priv->next_seqnum = -1;
680   JBUF_UNLOCK (priv);
681 }
682
683 static GstClockTime
684 gst_rtp_jitter_buffer_set_active (GstRtpJitterBuffer * jbuf, gboolean active,
685     guint64 offset)
686 {
687   GstRtpJitterBufferPrivate *priv;
688   GstClockTime last_out;
689   GstBuffer *head;
690
691   priv = jbuf->priv;
692
693   JBUF_LOCK (priv);
694   GST_DEBUG_OBJECT (jbuf, "setting active %d with offset %" GST_TIME_FORMAT,
695       active, GST_TIME_ARGS (offset));
696
697   if (active != priv->active) {
698     /* add the amount of time spent in paused to the output offset. All
699      * outgoing buffers will have this offset applied to their timestamps in
700      * order to make them arrive in time in the sink. */
701     priv->out_offset = offset;
702     GST_DEBUG_OBJECT (jbuf, "out offset %" GST_TIME_FORMAT,
703         GST_TIME_ARGS (priv->out_offset));
704     priv->active = active;
705     JBUF_SIGNAL (priv);
706   }
707   if (!active) {
708     rtp_jitter_buffer_set_buffering (priv->jbuf, TRUE);
709   }
710   if ((head = rtp_jitter_buffer_peek (priv->jbuf))) {
711     /* head buffer timestamp and offset gives our output time */
712     last_out = GST_BUFFER_TIMESTAMP (head) + priv->ts_offset;
713   } else {
714     /* use last known time when the buffer is empty */
715     last_out = priv->last_out_time;
716   }
717   JBUF_UNLOCK (priv);
718
719   return last_out;
720 }
721
722 static GstCaps *
723 gst_rtp_jitter_buffer_getcaps (GstPad * pad, GstCaps * filter)
724 {
725   GstRtpJitterBuffer *jitterbuffer;
726   GstRtpJitterBufferPrivate *priv;
727   GstPad *other;
728   GstCaps *caps;
729   GstCaps *templ;
730
731   jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
732   priv = jitterbuffer->priv;
733
734   other = (pad == priv->srcpad ? priv->sinkpad : priv->srcpad);
735
736   caps = gst_pad_peer_query_caps (other, filter);
737
738   templ = gst_pad_get_pad_template_caps (pad);
739   if (caps == NULL) {
740     GST_DEBUG_OBJECT (jitterbuffer, "use template");
741     caps = templ;
742   } else {
743     GstCaps *intersect;
744
745     GST_DEBUG_OBJECT (jitterbuffer, "intersect with template");
746
747     intersect = gst_caps_intersect (caps, templ);
748     gst_caps_unref (caps);
749     gst_caps_unref (templ);
750
751     caps = intersect;
752   }
753   gst_object_unref (jitterbuffer);
754
755   return caps;
756 }
757
758 /*
759  * Must be called with JBUF_LOCK held
760  */
761
762 static gboolean
763 gst_jitter_buffer_sink_parse_caps (GstRtpJitterBuffer * jitterbuffer,
764     GstCaps * caps)
765 {
766   GstRtpJitterBufferPrivate *priv;
767   GstStructure *caps_struct;
768   guint val;
769   GstClockTime tval;
770
771   priv = jitterbuffer->priv;
772
773   /* first parse the caps */
774   caps_struct = gst_caps_get_structure (caps, 0);
775
776   GST_DEBUG_OBJECT (jitterbuffer, "got caps");
777
778   /* we need a clock-rate to convert the rtp timestamps to GStreamer time and to
779    * measure the amount of data in the buffer */
780   if (!gst_structure_get_int (caps_struct, "clock-rate", &priv->clock_rate))
781     goto error;
782
783   if (priv->clock_rate <= 0)
784     goto wrong_rate;
785
786   GST_DEBUG_OBJECT (jitterbuffer, "got clock-rate %d", priv->clock_rate);
787
788   /* The clock base is the RTP timestamp corrsponding to the npt-start value. We
789    * can use this to track the amount of time elapsed on the sender. */
790   if (gst_structure_get_uint (caps_struct, "clock-base", &val))
791     priv->clock_base = val;
792   else
793     priv->clock_base = -1;
794
795   priv->ext_timestamp = priv->clock_base;
796
797   GST_DEBUG_OBJECT (jitterbuffer, "got clock-base %" G_GINT64_FORMAT,
798       priv->clock_base);
799
800   if (gst_structure_get_uint (caps_struct, "seqnum-base", &val)) {
801     /* first expected seqnum, only update when we didn't have a previous base. */
802     if (priv->next_in_seqnum == -1)
803       priv->next_in_seqnum = val;
804     if (priv->next_seqnum == -1)
805       priv->next_seqnum = val;
806   }
807
808   GST_DEBUG_OBJECT (jitterbuffer, "got seqnum-base %d", priv->next_in_seqnum);
809
810   /* the start and stop times. The seqnum-base corresponds to the start time. We
811    * will keep track of the seqnums on the output and when we reach the one
812    * corresponding to npt-stop, we emit the npt-stop-reached signal */
813   if (gst_structure_get_clock_time (caps_struct, "npt-start", &tval))
814     priv->npt_start = tval;
815   else
816     priv->npt_start = 0;
817
818   if (gst_structure_get_clock_time (caps_struct, "npt-stop", &tval))
819     priv->npt_stop = tval;
820   else
821     priv->npt_stop = -1;
822
823   GST_DEBUG_OBJECT (jitterbuffer,
824       "npt start/stop: %" GST_TIME_FORMAT "-%" GST_TIME_FORMAT,
825       GST_TIME_ARGS (priv->npt_start), GST_TIME_ARGS (priv->npt_stop));
826
827   return TRUE;
828
829   /* ERRORS */
830 error:
831   {
832     GST_DEBUG_OBJECT (jitterbuffer, "No clock-rate in caps!");
833     return FALSE;
834   }
835 wrong_rate:
836   {
837     GST_DEBUG_OBJECT (jitterbuffer, "Invalid clock-rate %d", priv->clock_rate);
838     return FALSE;
839   }
840 }
841
842 static void
843 gst_rtp_jitter_buffer_flush_start (GstRtpJitterBuffer * jitterbuffer)
844 {
845   GstRtpJitterBufferPrivate *priv;
846
847   priv = jitterbuffer->priv;
848
849   JBUF_LOCK (priv);
850   /* mark ourselves as flushing */
851   priv->srcresult = GST_FLOW_FLUSHING;
852   GST_DEBUG_OBJECT (jitterbuffer, "Disabling pop on queue");
853   /* this unblocks any waiting pops on the src pad task */
854   JBUF_SIGNAL (priv);
855   /* unlock clock, we just unschedule, the entry will be released by the
856    * locking streaming thread. */
857   if (priv->clock_id) {
858     gst_clock_id_unschedule (priv->clock_id);
859     priv->unscheduled = TRUE;
860   }
861   JBUF_UNLOCK (priv);
862 }
863
864 static void
865 gst_rtp_jitter_buffer_flush_stop (GstRtpJitterBuffer * jitterbuffer)
866 {
867   GstRtpJitterBufferPrivate *priv;
868
869   priv = jitterbuffer->priv;
870
871   JBUF_LOCK (priv);
872   GST_DEBUG_OBJECT (jitterbuffer, "Enabling pop on queue");
873   /* Mark as non flushing */
874   priv->srcresult = GST_FLOW_OK;
875   gst_segment_init (&priv->segment, GST_FORMAT_TIME);
876   priv->last_popped_seqnum = -1;
877   priv->last_out_time = -1;
878   priv->next_seqnum = -1;
879   priv->next_in_seqnum = -1;
880   priv->clock_rate = -1;
881   priv->eos = FALSE;
882   priv->estimated_eos = -1;
883   priv->last_elapsed = 0;
884   priv->reached_npt_stop = FALSE;
885   priv->ext_timestamp = -1;
886   GST_DEBUG_OBJECT (jitterbuffer, "flush and reset jitterbuffer");
887   rtp_jitter_buffer_flush (priv->jbuf);
888   rtp_jitter_buffer_reset_skew (priv->jbuf);
889   JBUF_UNLOCK (priv);
890 }
891
892 static gboolean
893 gst_rtp_jitter_buffer_src_activate_mode (GstPad * pad, GstObject * parent,
894     GstPadMode mode, gboolean active)
895 {
896   gboolean result;
897   GstRtpJitterBuffer *jitterbuffer = NULL;
898
899   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
900
901   switch (mode) {
902     case GST_PAD_MODE_PUSH:
903       if (active) {
904         /* allow data processing */
905         gst_rtp_jitter_buffer_flush_stop (jitterbuffer);
906
907         /* start pushing out buffers */
908         GST_DEBUG_OBJECT (jitterbuffer, "Starting task on srcpad");
909         result = gst_pad_start_task (jitterbuffer->priv->srcpad,
910             (GstTaskFunction) gst_rtp_jitter_buffer_loop, jitterbuffer, NULL);
911       } else {
912         /* make sure all data processing stops ASAP */
913         gst_rtp_jitter_buffer_flush_start (jitterbuffer);
914
915         /* NOTE this will hardlock if the state change is called from the src pad
916          * task thread because we will _join() the thread. */
917         GST_DEBUG_OBJECT (jitterbuffer, "Stopping task on srcpad");
918         result = gst_pad_stop_task (pad);
919       }
920       break;
921     default:
922       result = FALSE;
923       break;
924   }
925   return result;
926 }
927
928 static GstStateChangeReturn
929 gst_rtp_jitter_buffer_change_state (GstElement * element,
930     GstStateChange transition)
931 {
932   GstRtpJitterBuffer *jitterbuffer;
933   GstRtpJitterBufferPrivate *priv;
934   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
935
936   jitterbuffer = GST_RTP_JITTER_BUFFER (element);
937   priv = jitterbuffer->priv;
938
939   switch (transition) {
940     case GST_STATE_CHANGE_NULL_TO_READY:
941       break;
942     case GST_STATE_CHANGE_READY_TO_PAUSED:
943       JBUF_LOCK (priv);
944       /* reset negotiated values */
945       priv->clock_rate = -1;
946       priv->clock_base = -1;
947       priv->peer_latency = 0;
948       priv->last_pt = -1;
949       /* block until we go to PLAYING */
950       priv->blocked = TRUE;
951       JBUF_UNLOCK (priv);
952       break;
953     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
954       JBUF_LOCK (priv);
955       /* unblock to allow streaming in PLAYING */
956       priv->blocked = FALSE;
957       JBUF_SIGNAL (priv);
958       JBUF_UNLOCK (priv);
959       break;
960     default:
961       break;
962   }
963
964   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
965
966   switch (transition) {
967     case GST_STATE_CHANGE_READY_TO_PAUSED:
968       /* we are a live element because we sync to the clock, which we can only
969        * do in the PLAYING state */
970       if (ret != GST_STATE_CHANGE_FAILURE)
971         ret = GST_STATE_CHANGE_NO_PREROLL;
972       break;
973     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
974       JBUF_LOCK (priv);
975       /* block to stop streaming when PAUSED */
976       priv->blocked = TRUE;
977       JBUF_UNLOCK (priv);
978       if (ret != GST_STATE_CHANGE_FAILURE)
979         ret = GST_STATE_CHANGE_NO_PREROLL;
980       break;
981     case GST_STATE_CHANGE_PAUSED_TO_READY:
982       gst_buffer_replace (&priv->last_sr, NULL);
983       break;
984     case GST_STATE_CHANGE_READY_TO_NULL:
985       break;
986     default:
987       break;
988   }
989
990   return ret;
991 }
992
993 static gboolean
994 gst_rtp_jitter_buffer_src_event (GstPad * pad, GstObject * parent,
995     GstEvent * event)
996 {
997   gboolean ret = TRUE;
998   GstRtpJitterBuffer *jitterbuffer;
999   GstRtpJitterBufferPrivate *priv;
1000
1001   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1002   priv = jitterbuffer->priv;
1003
1004   GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
1005
1006   switch (GST_EVENT_TYPE (event)) {
1007     case GST_EVENT_LATENCY:
1008     {
1009       GstClockTime latency;
1010
1011       gst_event_parse_latency (event, &latency);
1012
1013       JBUF_LOCK (priv);
1014       /* adjust the overall buffer delay to the total pipeline latency in
1015        * buffering mode because if downstream consumes too fast (because of
1016        * large latency or queues, we would start rebuffering again. */
1017       if (rtp_jitter_buffer_get_mode (priv->jbuf) ==
1018           RTP_JITTER_BUFFER_MODE_BUFFER) {
1019         rtp_jitter_buffer_set_delay (priv->jbuf, latency);
1020       }
1021       JBUF_UNLOCK (priv);
1022
1023       ret = gst_pad_push_event (priv->sinkpad, event);
1024       break;
1025     }
1026     default:
1027       ret = gst_pad_push_event (priv->sinkpad, event);
1028       break;
1029   }
1030
1031   return ret;
1032 }
1033
1034 static gboolean
1035 gst_rtp_jitter_buffer_sink_event (GstPad * pad, GstObject * parent,
1036     GstEvent * event)
1037 {
1038   gboolean ret = TRUE;
1039   GstRtpJitterBuffer *jitterbuffer;
1040   GstRtpJitterBufferPrivate *priv;
1041
1042   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1043   priv = jitterbuffer->priv;
1044
1045   GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
1046
1047   switch (GST_EVENT_TYPE (event)) {
1048     case GST_EVENT_CAPS:
1049     {
1050       GstCaps *caps;
1051
1052       gst_event_parse_caps (event, &caps);
1053
1054       JBUF_LOCK (priv);
1055       ret = gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps);
1056       JBUF_UNLOCK (priv);
1057
1058       /* set same caps on srcpad on success */
1059       if (ret)
1060         ret = gst_pad_push_event (priv->srcpad, event);
1061       else
1062         gst_event_unref (event);
1063       break;
1064     }
1065     case GST_EVENT_SEGMENT:
1066     {
1067       gst_event_copy_segment (event, &priv->segment);
1068
1069       /* we need time for now */
1070       if (priv->segment.format != GST_FORMAT_TIME)
1071         goto newseg_wrong_format;
1072
1073       GST_DEBUG_OBJECT (jitterbuffer,
1074           "newsegment:  %" GST_SEGMENT_FORMAT, &priv->segment);
1075
1076       /* FIXME, push SEGMENT in the queue. Sorting order might be difficult. */
1077       ret = gst_pad_push_event (priv->srcpad, event);
1078       break;
1079     }
1080     case GST_EVENT_FLUSH_START:
1081       gst_rtp_jitter_buffer_flush_start (jitterbuffer);
1082       ret = gst_pad_push_event (priv->srcpad, event);
1083       break;
1084     case GST_EVENT_FLUSH_STOP:
1085       ret = gst_pad_push_event (priv->srcpad, event);
1086       ret =
1087           gst_rtp_jitter_buffer_src_activate_mode (priv->srcpad, parent,
1088           GST_PAD_MODE_PUSH, TRUE);
1089       break;
1090     case GST_EVENT_EOS:
1091     {
1092       /* push EOS in queue. We always push it at the head */
1093       JBUF_LOCK (priv);
1094       /* check for flushing, we need to discard the event and return FALSE when
1095        * we are flushing */
1096       ret = priv->srcresult == GST_FLOW_OK;
1097       if (ret && !priv->eos) {
1098         GST_INFO_OBJECT (jitterbuffer, "queuing EOS");
1099         priv->eos = TRUE;
1100         JBUF_SIGNAL (priv);
1101       } else if (priv->eos) {
1102         GST_DEBUG_OBJECT (jitterbuffer, "dropping EOS, we are already EOS");
1103       } else {
1104         GST_DEBUG_OBJECT (jitterbuffer, "dropping EOS, reason %s",
1105             gst_flow_get_name (priv->srcresult));
1106       }
1107       JBUF_UNLOCK (priv);
1108       gst_event_unref (event);
1109       break;
1110     }
1111     default:
1112       ret = gst_pad_push_event (priv->srcpad, event);
1113       break;
1114   }
1115
1116 done:
1117
1118   return ret;
1119
1120   /* ERRORS */
1121 newseg_wrong_format:
1122   {
1123     GST_DEBUG_OBJECT (jitterbuffer, "received non TIME newsegment");
1124     ret = FALSE;
1125     gst_event_unref (event);
1126     goto done;
1127   }
1128 }
1129
1130 static gboolean
1131 gst_rtp_jitter_buffer_sink_rtcp_event (GstPad * pad, GstObject * parent,
1132     GstEvent * event)
1133 {
1134   gboolean ret = TRUE;
1135   GstRtpJitterBuffer *jitterbuffer;
1136
1137   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1138
1139   GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
1140
1141   switch (GST_EVENT_TYPE (event)) {
1142     case GST_EVENT_FLUSH_START:
1143       gst_event_unref (event);
1144       break;
1145     case GST_EVENT_FLUSH_STOP:
1146       gst_event_unref (event);
1147       break;
1148     default:
1149       ret = gst_pad_event_default (pad, parent, event);
1150       break;
1151   }
1152
1153   return ret;
1154 }
1155
1156 /*
1157  * Must be called with JBUF_LOCK held, will release the LOCK when emiting the
1158  * signal. The function returns GST_FLOW_ERROR when a parsing error happened and
1159  * GST_FLOW_FLUSHING when the element is shutting down. On success
1160  * GST_FLOW_OK is returned.
1161  */
1162 static GstFlowReturn
1163 gst_rtp_jitter_buffer_get_clock_rate (GstRtpJitterBuffer * jitterbuffer,
1164     guint8 pt)
1165 {
1166   GValue ret = { 0 };
1167   GValue args[2] = { {0}, {0} };
1168   GstCaps *caps;
1169   gboolean res;
1170
1171   g_value_init (&args[0], GST_TYPE_ELEMENT);
1172   g_value_set_object (&args[0], jitterbuffer);
1173   g_value_init (&args[1], G_TYPE_UINT);
1174   g_value_set_uint (&args[1], pt);
1175
1176   g_value_init (&ret, GST_TYPE_CAPS);
1177   g_value_set_boxed (&ret, NULL);
1178
1179   JBUF_UNLOCK (jitterbuffer->priv);
1180   g_signal_emitv (args, gst_rtp_jitter_buffer_signals[SIGNAL_REQUEST_PT_MAP], 0,
1181       &ret);
1182   JBUF_LOCK_CHECK (jitterbuffer->priv, out_flushing);
1183
1184   g_value_unset (&args[0]);
1185   g_value_unset (&args[1]);
1186   caps = (GstCaps *) g_value_dup_boxed (&ret);
1187   g_value_unset (&ret);
1188   if (!caps)
1189     goto no_caps;
1190
1191   res = gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps);
1192   gst_caps_unref (caps);
1193
1194   if (G_UNLIKELY (!res))
1195     goto parse_failed;
1196
1197   return GST_FLOW_OK;
1198
1199   /* ERRORS */
1200 no_caps:
1201   {
1202     GST_DEBUG_OBJECT (jitterbuffer, "could not get caps");
1203     return GST_FLOW_ERROR;
1204   }
1205 out_flushing:
1206   {
1207     GST_DEBUG_OBJECT (jitterbuffer, "we are flushing");
1208     return GST_FLOW_FLUSHING;
1209   }
1210 parse_failed:
1211   {
1212     GST_DEBUG_OBJECT (jitterbuffer, "parse failed");
1213     return GST_FLOW_ERROR;
1214   }
1215 }
1216
1217 /* call with jbuf lock held */
1218 static void
1219 check_buffering_percent (GstRtpJitterBuffer * jitterbuffer, gint * percent)
1220 {
1221   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1222
1223   /* too short a stream, or too close to EOS will never really fill buffer */
1224   if (*percent != -1 && priv->npt_stop != -1 &&
1225       priv->npt_stop - priv->npt_start <=
1226       rtp_jitter_buffer_get_delay (priv->jbuf)) {
1227     GST_DEBUG_OBJECT (jitterbuffer, "short stream; faking full buffer");
1228     rtp_jitter_buffer_set_buffering (priv->jbuf, FALSE);
1229     *percent = 100;
1230   }
1231 }
1232
1233 static void
1234 post_buffering_percent (GstRtpJitterBuffer * jitterbuffer, gint percent)
1235 {
1236   GstMessage *message;
1237
1238   /* Post a buffering message */
1239   message = gst_message_new_buffering (GST_OBJECT_CAST (jitterbuffer), percent);
1240   gst_message_set_buffering_stats (message, GST_BUFFERING_LIVE, -1, -1, -1);
1241
1242   gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer), message);
1243 }
1244
1245 static GstFlowReturn
1246 gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent,
1247     GstBuffer * buffer)
1248 {
1249   GstRtpJitterBuffer *jitterbuffer;
1250   GstRtpJitterBufferPrivate *priv;
1251   guint16 seqnum;
1252   GstFlowReturn ret = GST_FLOW_OK;
1253   GstClockTime timestamp;
1254   guint64 latency_ts;
1255   gboolean tail;
1256   gint percent = -1;
1257   guint8 pt;
1258   GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
1259
1260   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1261
1262   priv = jitterbuffer->priv;
1263
1264   if (G_UNLIKELY (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp)))
1265     goto invalid_buffer;
1266
1267   pt = gst_rtp_buffer_get_payload_type (&rtp);
1268   seqnum = gst_rtp_buffer_get_seq (&rtp);
1269   gst_rtp_buffer_unmap (&rtp);
1270
1271   /* take the timestamp of the buffer. This is the time when the packet was
1272    * received and is used to calculate jitter and clock skew. We will adjust
1273    * this timestamp with the smoothed value after processing it in the
1274    * jitterbuffer. */
1275   timestamp = GST_BUFFER_TIMESTAMP (buffer);
1276   /* bring to running time */
1277   timestamp = gst_segment_to_running_time (&priv->segment, GST_FORMAT_TIME,
1278       timestamp);
1279
1280   GST_DEBUG_OBJECT (jitterbuffer,
1281       "Received packet #%d at time %" GST_TIME_FORMAT, seqnum,
1282       GST_TIME_ARGS (timestamp));
1283
1284   JBUF_LOCK_CHECK (priv, out_flushing);
1285
1286   if (G_UNLIKELY (priv->last_pt != pt)) {
1287     GstCaps *caps;
1288
1289     GST_DEBUG_OBJECT (jitterbuffer, "pt changed from %u to %u", priv->last_pt,
1290         pt);
1291
1292     priv->last_pt = pt;
1293     /* reset clock-rate so that we get a new one */
1294     priv->clock_rate = -1;
1295
1296     /* Try to get the clock-rate from the caps first if we can. If there are no
1297      * caps we must fire the signal to get the clock-rate. */
1298     if ((caps = gst_pad_get_current_caps (pad))) {
1299       gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps);
1300       gst_caps_unref (caps);
1301     }
1302   }
1303
1304   if (G_UNLIKELY (priv->clock_rate == -1)) {
1305     /* no clock rate given on the caps, try to get one with the signal */
1306     if (gst_rtp_jitter_buffer_get_clock_rate (jitterbuffer,
1307             pt) == GST_FLOW_FLUSHING)
1308       goto out_flushing;
1309
1310     if (G_UNLIKELY (priv->clock_rate == -1))
1311       goto no_clock_rate;
1312   }
1313
1314   /* don't accept more data on EOS */
1315   if (G_UNLIKELY (priv->eos))
1316     goto have_eos;
1317
1318   /* now check against our expected seqnum */
1319   if (G_LIKELY (priv->next_in_seqnum != -1)) {
1320     gint gap;
1321     gboolean reset = FALSE;
1322
1323     gap = gst_rtp_buffer_compare_seqnum (priv->next_in_seqnum, seqnum);
1324     if (G_UNLIKELY (gap != 0)) {
1325       GST_DEBUG_OBJECT (jitterbuffer, "expected #%d, got #%d, gap of %d",
1326           priv->next_in_seqnum, seqnum, gap);
1327       /* priv->next_in_seqnum >= seqnum, this packet is too late or the
1328        * sender might have been restarted with different seqnum. */
1329       if (gap < -RTP_MAX_MISORDER) {
1330         GST_DEBUG_OBJECT (jitterbuffer, "reset: buffer too old %d", gap);
1331         reset = TRUE;
1332       }
1333       /* priv->next_in_seqnum < seqnum, this is a new packet */
1334       else if (G_UNLIKELY (gap > RTP_MAX_DROPOUT)) {
1335         GST_DEBUG_OBJECT (jitterbuffer, "reset: too many dropped packets %d",
1336             gap);
1337         reset = TRUE;
1338       } else {
1339         GST_DEBUG_OBJECT (jitterbuffer, "tolerable gap");
1340       }
1341     }
1342     if (G_UNLIKELY (reset)) {
1343       GST_DEBUG_OBJECT (jitterbuffer, "flush and reset jitterbuffer");
1344       rtp_jitter_buffer_flush (priv->jbuf);
1345       rtp_jitter_buffer_reset_skew (priv->jbuf);
1346       priv->last_popped_seqnum = -1;
1347       priv->next_seqnum = seqnum;
1348     }
1349   }
1350   priv->next_in_seqnum = (seqnum + 1) & 0xffff;
1351
1352   /* let's check if this buffer is too late, we can only accept packets with
1353    * bigger seqnum than the one we last pushed. */
1354   if (G_LIKELY (priv->last_popped_seqnum != -1)) {
1355     gint gap;
1356
1357     gap = gst_rtp_buffer_compare_seqnum (priv->last_popped_seqnum, seqnum);
1358
1359     /* priv->last_popped_seqnum >= seqnum, we're too late. */
1360     if (G_UNLIKELY (gap <= 0))
1361       goto too_late;
1362   }
1363
1364   /* let's drop oldest packet if the queue is already full and drop-on-latency
1365    * is set. We can only do this when there actually is a latency. When no
1366    * latency is set, we just pump it in the queue and let the other end push it
1367    * out as fast as possible. */
1368   if (priv->latency_ms && priv->drop_on_latency) {
1369     latency_ts =
1370         gst_util_uint64_scale_int (priv->latency_ms, priv->clock_rate, 1000);
1371
1372     if (G_UNLIKELY (rtp_jitter_buffer_get_ts_diff (priv->jbuf) >= latency_ts)) {
1373       GstBuffer *old_buf;
1374
1375       old_buf = rtp_jitter_buffer_pop (priv->jbuf, &percent);
1376
1377       GST_DEBUG_OBJECT (jitterbuffer, "Queue full, dropping old packet %p",
1378           old_buf);
1379
1380       gst_buffer_unref (old_buf);
1381     }
1382   }
1383
1384   /* we need to make the metadata writable before pushing it in the jitterbuffer
1385    * because the jitterbuffer will update the timestamp */
1386   buffer = gst_buffer_make_writable (buffer);
1387
1388   /* now insert the packet into the queue in sorted order. This function returns
1389    * FALSE if a packet with the same seqnum was already in the queue, meaning we
1390    * have a duplicate. */
1391   if (G_UNLIKELY (!rtp_jitter_buffer_insert (priv->jbuf, buffer, timestamp,
1392               priv->clock_rate, &tail, &percent)))
1393     goto duplicate;
1394
1395   /* signal addition of new buffer when the _loop is waiting. */
1396   if (priv->waiting)
1397     JBUF_SIGNAL (priv);
1398
1399   /* let's unschedule and unblock any waiting buffers. We only want to do this
1400    * when the tail buffer changed */
1401   if (G_UNLIKELY (priv->clock_id && tail)) {
1402     GST_DEBUG_OBJECT (jitterbuffer,
1403         "Unscheduling waiting buffer, new tail buffer");
1404     gst_clock_id_unschedule (priv->clock_id);
1405     priv->unscheduled = TRUE;
1406   }
1407
1408   GST_DEBUG_OBJECT (jitterbuffer, "Pushed packet #%d, now %d packets, tail: %d",
1409       seqnum, rtp_jitter_buffer_num_packets (priv->jbuf), tail);
1410
1411   check_buffering_percent (jitterbuffer, &percent);
1412
1413 finished:
1414   JBUF_UNLOCK (priv);
1415
1416   if (percent != -1)
1417     post_buffering_percent (jitterbuffer, percent);
1418
1419   return ret;
1420
1421   /* ERRORS */
1422 invalid_buffer:
1423   {
1424     /* this is not fatal but should be filtered earlier */
1425     GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
1426         ("Received invalid RTP payload, dropping"));
1427     gst_buffer_unref (buffer);
1428     return GST_FLOW_OK;
1429   }
1430 no_clock_rate:
1431   {
1432     GST_WARNING_OBJECT (jitterbuffer,
1433         "No clock-rate in caps!, dropping buffer");
1434     gst_buffer_unref (buffer);
1435     goto finished;
1436   }
1437 out_flushing:
1438   {
1439     ret = priv->srcresult;
1440     GST_DEBUG_OBJECT (jitterbuffer, "flushing %s", gst_flow_get_name (ret));
1441     gst_buffer_unref (buffer);
1442     goto finished;
1443   }
1444 have_eos:
1445   {
1446     ret = GST_FLOW_EOS;
1447     GST_WARNING_OBJECT (jitterbuffer, "we are EOS, refusing buffer");
1448     gst_buffer_unref (buffer);
1449     goto finished;
1450   }
1451 too_late:
1452   {
1453     GST_WARNING_OBJECT (jitterbuffer, "Packet #%d too late as #%d was already"
1454         " popped, dropping", seqnum, priv->last_popped_seqnum);
1455     priv->num_late++;
1456     gst_buffer_unref (buffer);
1457     goto finished;
1458   }
1459 duplicate:
1460   {
1461     GST_WARNING_OBJECT (jitterbuffer, "Duplicate packet #%d detected, dropping",
1462         seqnum);
1463     priv->num_duplicates++;
1464     gst_buffer_unref (buffer);
1465     goto finished;
1466   }
1467 }
1468
1469 static GstClockTime
1470 apply_offset (GstRtpJitterBuffer * jitterbuffer, GstClockTime timestamp)
1471 {
1472   GstRtpJitterBufferPrivate *priv;
1473
1474   priv = jitterbuffer->priv;
1475
1476   if (timestamp == -1)
1477     return -1;
1478
1479   /* apply the timestamp offset, this is used for inter stream sync */
1480   timestamp += priv->ts_offset;
1481   /* add the offset, this is used when buffering */
1482   timestamp += priv->out_offset;
1483
1484   return timestamp;
1485 }
1486
1487 static GstClockTime
1488 get_sync_time (GstRtpJitterBuffer * jitterbuffer, GstClockTime timestamp)
1489 {
1490   GstClockTime result;
1491   GstRtpJitterBufferPrivate *priv;
1492
1493   priv = jitterbuffer->priv;
1494
1495   result = timestamp + GST_ELEMENT_CAST (jitterbuffer)->base_time;
1496   /* add latency, this includes our own latency and the peer latency. */
1497   result += priv->latency_ns;
1498   result += priv->peer_latency;
1499
1500   return result;
1501 }
1502
1503 static gboolean
1504 eos_reached (GstClock * clock, GstClockTime time, GstClockID id,
1505     GstRtpJitterBuffer * jitterbuffer)
1506 {
1507   GstRtpJitterBufferPrivate *priv;
1508
1509   priv = jitterbuffer->priv;
1510
1511   JBUF_LOCK_CHECK (priv, flushing);
1512   if (priv->waiting) {
1513     GST_INFO_OBJECT (jitterbuffer, "got the NPT timeout");
1514     priv->reached_npt_stop = TRUE;
1515     JBUF_SIGNAL (priv);
1516   }
1517   JBUF_UNLOCK (priv);
1518
1519   return TRUE;
1520
1521   /* ERRORS */
1522 flushing:
1523   {
1524     JBUF_UNLOCK (priv);
1525     return FALSE;
1526   }
1527 }
1528
1529 static GstClockTime
1530 compute_elapsed (GstRtpJitterBuffer * jitterbuffer, GstBuffer * outbuf)
1531 {
1532   guint64 ext_time, elapsed;
1533   guint32 rtp_time;
1534   GstRtpJitterBufferPrivate *priv;
1535   GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
1536
1537   priv = jitterbuffer->priv;
1538   gst_rtp_buffer_map (outbuf, GST_MAP_READ, &rtp);
1539   rtp_time = gst_rtp_buffer_get_timestamp (&rtp);
1540   gst_rtp_buffer_unmap (&rtp);
1541
1542   GST_LOG_OBJECT (jitterbuffer, "rtp %" G_GUINT32_FORMAT ", ext %"
1543       G_GUINT64_FORMAT, rtp_time, priv->ext_timestamp);
1544
1545   if (rtp_time < priv->ext_timestamp) {
1546     ext_time = priv->ext_timestamp;
1547   } else {
1548     ext_time = gst_rtp_buffer_ext_timestamp (&priv->ext_timestamp, rtp_time);
1549   }
1550
1551   if (ext_time > priv->clock_base)
1552     elapsed = ext_time - priv->clock_base;
1553   else
1554     elapsed = 0;
1555
1556   elapsed = gst_util_uint64_scale_int (elapsed, GST_SECOND, priv->clock_rate);
1557   return elapsed;
1558 }
1559
1560 /*
1561  * This funcion will push out buffers on the source pad.
1562  *
1563  * For each pushed buffer, the seqnum is recorded, if the next buffer B has a
1564  * different seqnum (missing packets before B), this function will wait for the
1565  * missing packet to arrive up to the timestamp of buffer B.
1566  */
1567 static void
1568 gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer)
1569 {
1570   GstRtpJitterBufferPrivate *priv;
1571   GstBuffer *outbuf;
1572   GstFlowReturn result;
1573   guint16 seqnum;
1574   guint32 next_seqnum;
1575   GstClockTime timestamp, out_time;
1576   gboolean discont = FALSE;
1577   gint gap;
1578   GstClock *clock;
1579   GstClockID id;
1580   GstClockTime sync_time;
1581   gint percent = -1;
1582   GstRTPBuffer rtp = { NULL, };
1583
1584   priv = jitterbuffer->priv;
1585
1586   JBUF_LOCK_CHECK (priv, flushing);
1587 again:
1588   GST_DEBUG_OBJECT (jitterbuffer, "Peeking item");
1589   while (TRUE) {
1590     id = NULL;
1591     /* always wait if we are blocked */
1592     if (G_LIKELY (!priv->blocked)) {
1593       /* we're buffering but not EOS, wait. */
1594       if (!priv->eos && (!priv->active
1595               || rtp_jitter_buffer_is_buffering (priv->jbuf))) {
1596         GstClockTime elapsed, delay, left;
1597
1598         if (priv->estimated_eos == -1)
1599           goto do_wait;
1600
1601         outbuf = rtp_jitter_buffer_peek (priv->jbuf);
1602         if (outbuf != NULL) {
1603           elapsed = compute_elapsed (jitterbuffer, outbuf);
1604           if (GST_BUFFER_DURATION_IS_VALID (outbuf))
1605             elapsed += GST_BUFFER_DURATION (outbuf);
1606         } else {
1607           GST_INFO_OBJECT (jitterbuffer, "no buffer, using last_elapsed");
1608           elapsed = priv->last_elapsed;
1609         }
1610
1611         delay = rtp_jitter_buffer_get_delay (priv->jbuf);
1612
1613         if (priv->estimated_eos > elapsed)
1614           left = priv->estimated_eos - elapsed;
1615         else
1616           left = 0;
1617
1618         GST_INFO_OBJECT (jitterbuffer, "buffering, elapsed %" GST_TIME_FORMAT
1619             " estimated_eos %" GST_TIME_FORMAT " left %" GST_TIME_FORMAT
1620             " delay %" GST_TIME_FORMAT,
1621             GST_TIME_ARGS (elapsed), GST_TIME_ARGS (priv->estimated_eos),
1622             GST_TIME_ARGS (left), GST_TIME_ARGS (delay));
1623         if (left > delay)
1624           goto do_wait;
1625       }
1626       /* if we have a packet, we can exit the loop and grab it */
1627       if (rtp_jitter_buffer_num_packets (priv->jbuf) > 0)
1628         break;
1629       /* no packets but we are EOS, do eos logic */
1630       if (G_UNLIKELY (priv->eos))
1631         goto do_eos;
1632       /* underrun, wait for packets or flushing now if we are expecting an EOS
1633        * timeout, set the async timer for it too */
1634       if (priv->estimated_eos != -1 && !priv->reached_npt_stop) {
1635         sync_time = get_sync_time (jitterbuffer, priv->estimated_eos);
1636
1637         GST_OBJECT_LOCK (jitterbuffer);
1638         clock = GST_ELEMENT_CLOCK (jitterbuffer);
1639         if (clock) {
1640           GST_INFO_OBJECT (jitterbuffer, "scheduling timeout");
1641           id = gst_clock_new_single_shot_id (clock, sync_time);
1642           gst_clock_id_wait_async (id, (GstClockCallback) eos_reached,
1643               jitterbuffer, NULL);
1644         }
1645         GST_OBJECT_UNLOCK (jitterbuffer);
1646       }
1647     }
1648   do_wait:
1649     /* now we wait */
1650     GST_DEBUG_OBJECT (jitterbuffer, "waiting");
1651     priv->waiting = TRUE;
1652     JBUF_WAIT (priv);
1653     priv->waiting = FALSE;
1654     GST_DEBUG_OBJECT (jitterbuffer, "waiting done");
1655
1656     if (id) {
1657       /* unschedule any pending async notifications we might have */
1658       gst_clock_id_unschedule (id);
1659       gst_clock_id_unref (id);
1660     }
1661     if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK))
1662       goto flushing;
1663
1664     if (id && priv->reached_npt_stop) {
1665       goto do_npt_stop;
1666     }
1667   }
1668
1669   /* peek a buffer, we're just looking at the timestamp and the sequence number.
1670    * If all is fine, we'll pop and push it. If the sequence number is wrong we
1671    * wait on the timestamp. In the chain function we will unlock the wait when a
1672    * new buffer is available. The peeked buffer is valid for as long as we hold
1673    * the jitterbuffer lock. */
1674   outbuf = rtp_jitter_buffer_peek (priv->jbuf);
1675
1676   /* get the seqnum and the next expected seqnum */
1677   gst_rtp_buffer_map (outbuf, GST_MAP_READ, &rtp);
1678   seqnum = gst_rtp_buffer_get_seq (&rtp);
1679   gst_rtp_buffer_unmap (&rtp);
1680   next_seqnum = priv->next_seqnum;
1681
1682   /* get the timestamp, this is already corrected for clock skew by the
1683    * jitterbuffer */
1684   timestamp = GST_BUFFER_TIMESTAMP (outbuf);
1685
1686   GST_DEBUG_OBJECT (jitterbuffer,
1687       "Peeked buffer #%d, expect #%d, timestamp %" GST_TIME_FORMAT
1688       ", now %d left", seqnum, next_seqnum, GST_TIME_ARGS (timestamp),
1689       rtp_jitter_buffer_num_packets (priv->jbuf));
1690
1691   /* apply our timestamp offset to the incomming buffer, this will be our output
1692    * timestamp. */
1693   out_time = apply_offset (jitterbuffer, timestamp);
1694
1695   /* get the gap between this and the previous packet. If we don't know the
1696    * previous packet seqnum assume no gap. */
1697   if (G_LIKELY (next_seqnum != -1)) {
1698     gap = gst_rtp_buffer_compare_seqnum (next_seqnum, seqnum);
1699
1700     /* if we have a packet that we already pushed or considered dropped, pop it
1701      * off and get the next packet */
1702     if (G_UNLIKELY (gap < 0)) {
1703       GST_DEBUG_OBJECT (jitterbuffer, "Old packet #%d, next #%d dropping",
1704           seqnum, next_seqnum);
1705       outbuf = rtp_jitter_buffer_pop (priv->jbuf, &percent);
1706       gst_buffer_unref (outbuf);
1707       goto again;
1708     }
1709   } else {
1710     GST_DEBUG_OBJECT (jitterbuffer, "no next seqnum known, first packet");
1711     gap = -1;
1712   }
1713
1714   /* If we don't know what the next seqnum should be (== -1) we have to wait
1715    * because it might be possible that we are not receiving this buffer in-order,
1716    * a buffer with a lower seqnum could arrive later and we want to push that
1717    * earlier buffer before this buffer then.
1718    * If we know the expected seqnum, we can compare it to the current seqnum to
1719    * determine if we have missing a packet. If we have a missing packet (which
1720    * must be before this packet) we can wait for it until the deadline for this
1721    * packet expires. */
1722   if (G_UNLIKELY (gap != 0 && out_time != -1)) {
1723     GstClockReturn ret;
1724     GstClockTime duration = GST_CLOCK_TIME_NONE;
1725     GstClockTimeDiff clock_jitter;
1726     guint32 lost_packets = 1;
1727     gboolean lost_packets_late = FALSE;
1728
1729     if (gap > 0) {
1730       /* we have a gap */
1731       GST_DEBUG_OBJECT (jitterbuffer,
1732           "Sequence number GAP detected: expected %d instead of %d (%d missing)",
1733           next_seqnum, seqnum, gap);
1734
1735       if (priv->last_out_time != -1) {
1736         GST_DEBUG_OBJECT (jitterbuffer,
1737             "out_time %" GST_TIME_FORMAT ", last %" GST_TIME_FORMAT,
1738             GST_TIME_ARGS (out_time), GST_TIME_ARGS (priv->last_out_time));
1739         /* interpolate between the current time and the last time based on
1740          * number of packets we are missing, this is the estimated duration
1741          * for the missing packet based on equidistant packet spacing. Also make
1742          * sure we never go negative. */
1743         if (out_time >= priv->last_out_time)
1744           duration = (out_time - priv->last_out_time) / (gap + 1);
1745         else
1746           goto lost;
1747
1748         GST_DEBUG_OBJECT (jitterbuffer, "duration %" GST_TIME_FORMAT,
1749             GST_TIME_ARGS (duration));
1750         /* add this duration to the timestamp of the last packet we pushed */
1751         out_time = (priv->last_out_time + duration);
1752       }
1753     } else {
1754       /* we don't know what the next_seqnum should be, wait for the last
1755        * possible moment to push this buffer, maybe we get an earlier seqnum
1756        * while we wait */
1757       GST_DEBUG_OBJECT (jitterbuffer, "First buffer %d, do sync", seqnum);
1758     }
1759
1760     GST_OBJECT_LOCK (jitterbuffer);
1761     clock = GST_ELEMENT_CLOCK (jitterbuffer);
1762     if (!clock) {
1763       GST_OBJECT_UNLOCK (jitterbuffer);
1764       /* let's just push if there is no clock */
1765       GST_DEBUG_OBJECT (jitterbuffer, "No clock, push right away");
1766       goto push_buffer;
1767     }
1768
1769     /* prepare for sync against clock */
1770     sync_time = get_sync_time (jitterbuffer, out_time);
1771
1772     GST_DEBUG_OBJECT (jitterbuffer, "sync to timestamp %" GST_TIME_FORMAT
1773         " with sync time %" GST_TIME_FORMAT,
1774         GST_TIME_ARGS (out_time), GST_TIME_ARGS (sync_time));
1775
1776     /* create an entry for the clock */
1777     id = priv->clock_id = gst_clock_new_single_shot_id (clock, sync_time);
1778     priv->unscheduled = FALSE;
1779     GST_OBJECT_UNLOCK (jitterbuffer);
1780
1781     /* release the lock so that the other end can push stuff or unlock */
1782     JBUF_UNLOCK (priv);
1783
1784     ret = gst_clock_id_wait (id, &clock_jitter);
1785
1786     if (ret == GST_CLOCK_EARLY && gap > 0
1787         && clock_jitter > (priv->latency_ns + priv->peer_latency)) {
1788       GstClockTimeDiff total_duration;
1789       GstClockTime out_time_diff;
1790
1791       out_time_diff = apply_offset (jitterbuffer, timestamp) - out_time;
1792       total_duration = MIN (out_time_diff, clock_jitter);
1793
1794       if (duration > 0)
1795         lost_packets = total_duration / duration;
1796       else
1797         lost_packets = gap;
1798       total_duration = lost_packets * duration;
1799
1800       GST_DEBUG_OBJECT (jitterbuffer,
1801           "Current sync_time has expired a long time ago (+%" GST_TIME_FORMAT
1802           ") Cover up %d lost packets with duration %" GST_TIME_FORMAT,
1803           GST_TIME_ARGS (clock_jitter),
1804           lost_packets, GST_TIME_ARGS (total_duration));
1805
1806       duration = total_duration;
1807       lost_packets_late = TRUE;
1808     }
1809
1810     JBUF_LOCK (priv);
1811     /* and free the entry */
1812     gst_clock_id_unref (id);
1813     priv->clock_id = NULL;
1814
1815     /* at this point, the clock could have been unlocked by a timeout, a new
1816      * tail element was added to the queue or because we are shutting down. Check
1817      * for shutdown first. */
1818     if G_UNLIKELY
1819       ((priv->srcresult != GST_FLOW_OK))
1820           goto flushing;
1821
1822     /* if we got unscheduled and we are not flushing, it's because a new tail
1823      * element became available in the queue or we flushed the queue.
1824      * Grab it and try to push or sync. */
1825     if (ret == GST_CLOCK_UNSCHEDULED || priv->unscheduled) {
1826       GST_DEBUG_OBJECT (jitterbuffer,
1827           "Wait got unscheduled, will retry to push with new buffer");
1828       goto again;
1829     }
1830
1831   lost:
1832     /* we now timed out, this means we lost a packet or finished synchronizing
1833      * on the first buffer. */
1834     if (gap > 0) {
1835       GstEvent *event;
1836
1837       /* we had a gap and thus we lost some packets. Create an event for this.  */
1838       if (lost_packets > 1)
1839         GST_DEBUG_OBJECT (jitterbuffer, "Packets #%d -> #%d lost", next_seqnum,
1840             next_seqnum + lost_packets - 1);
1841       else
1842         GST_DEBUG_OBJECT (jitterbuffer, "Packet #%d lost", next_seqnum);
1843
1844       priv->num_late += lost_packets;
1845       discont = TRUE;
1846
1847       /* update our expected next packet */
1848       priv->last_popped_seqnum = next_seqnum;
1849       priv->last_out_time += duration;
1850       priv->next_seqnum = (next_seqnum + lost_packets) & 0xffff;
1851
1852       if (priv->do_lost) {
1853         /* create paket lost event */
1854         event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM,
1855             gst_structure_new ("GstRTPPacketLost",
1856                 "seqnum", G_TYPE_UINT, (guint) next_seqnum,
1857                 "timestamp", G_TYPE_UINT64, out_time,
1858                 "duration", G_TYPE_UINT64, duration,
1859                 "late", G_TYPE_BOOLEAN, lost_packets_late, NULL));
1860         JBUF_UNLOCK (priv);
1861         gst_pad_push_event (priv->srcpad, event);
1862         JBUF_LOCK_CHECK (priv, flushing);
1863       }
1864       /* look for next packet */
1865       goto again;
1866     }
1867
1868     /* there was no known gap,just the first packet, exit the loop and push */
1869     GST_DEBUG_OBJECT (jitterbuffer, "First packet #%d synced", seqnum);
1870
1871     /* get new timestamp, latency might have changed */
1872     out_time = apply_offset (jitterbuffer, timestamp);
1873   }
1874 push_buffer:
1875
1876   /* when we get here we are ready to pop and push the buffer */
1877   outbuf = rtp_jitter_buffer_pop (priv->jbuf, &percent);
1878
1879   check_buffering_percent (jitterbuffer, &percent);
1880
1881   if (G_UNLIKELY (discont || priv->discont)) {
1882     /* set DISCONT flag when we missed a packet. We pushed the buffer writable
1883      * into the jitterbuffer so we can modify now. */
1884     GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_DISCONT);
1885     priv->discont = FALSE;
1886   }
1887   if (G_UNLIKELY (priv->ts_discont)) {
1888     GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_RESYNC);
1889     priv->ts_discont = FALSE;
1890   }
1891
1892   /* apply timestamp with offset to buffer now */
1893   GST_BUFFER_PTS (outbuf) = out_time;
1894   GST_BUFFER_DTS (outbuf) = out_time;
1895
1896   /* update the elapsed time when we need to check against the npt stop time. */
1897   if (priv->npt_stop != -1 && priv->ext_timestamp != -1
1898       && priv->clock_base != -1 && priv->clock_rate > 0) {
1899     guint64 elapsed, estimated;
1900
1901     elapsed = compute_elapsed (jitterbuffer, outbuf);
1902
1903     if (elapsed > priv->last_elapsed || !priv->last_elapsed) {
1904       guint64 left;
1905
1906       priv->last_elapsed = elapsed;
1907
1908       left = priv->npt_stop - priv->npt_start;
1909       GST_LOG_OBJECT (jitterbuffer, "left %" GST_TIME_FORMAT,
1910           GST_TIME_ARGS (left));
1911
1912       if (elapsed > 0)
1913         estimated = gst_util_uint64_scale (out_time, left, elapsed);
1914       else {
1915         /* if there is almost nothing left,
1916          * we may never advance enough to end up in the above case */
1917         if (left < GST_SECOND)
1918           estimated = GST_SECOND;
1919         else
1920           estimated = -1;
1921       }
1922
1923       GST_LOG_OBJECT (jitterbuffer, "elapsed %" GST_TIME_FORMAT ", estimated %"
1924           GST_TIME_FORMAT, GST_TIME_ARGS (elapsed), GST_TIME_ARGS (estimated));
1925
1926       priv->estimated_eos = estimated;
1927     }
1928   }
1929
1930   /* now we are ready to push the buffer. Save the seqnum and release the lock
1931    * so the other end can push stuff in the queue again. */
1932   priv->last_popped_seqnum = seqnum;
1933   priv->last_out_time = out_time;
1934   priv->next_seqnum = (seqnum + 1) & 0xffff;
1935   JBUF_UNLOCK (priv);
1936
1937   if (percent != -1)
1938     post_buffering_percent (jitterbuffer, percent);
1939
1940   /* push buffer */
1941   GST_DEBUG_OBJECT (jitterbuffer,
1942       "Pushing buffer %d, timestamp %" GST_TIME_FORMAT, seqnum,
1943       GST_TIME_ARGS (out_time));
1944   result = gst_pad_push (priv->srcpad, outbuf);
1945   if (G_UNLIKELY (result != GST_FLOW_OK))
1946     goto pause;
1947
1948   return;
1949
1950   /* ERRORS */
1951 do_eos:
1952   {
1953     /* store result, we are flushing now */
1954     GST_DEBUG_OBJECT (jitterbuffer, "We are EOS, pushing EOS downstream");
1955     priv->srcresult = GST_FLOW_EOS;
1956     gst_pad_pause_task (priv->srcpad);
1957     JBUF_UNLOCK (priv);
1958     gst_pad_push_event (priv->srcpad, gst_event_new_eos ());
1959     return;
1960   }
1961 do_npt_stop:
1962   {
1963     /* store result, we are flushing now */
1964     GST_DEBUG_OBJECT (jitterbuffer, "We reached the NPT stop");
1965     JBUF_UNLOCK (priv);
1966
1967     g_signal_emit (jitterbuffer,
1968         gst_rtp_jitter_buffer_signals[SIGNAL_ON_NPT_STOP], 0, NULL);
1969     return;
1970   }
1971 flushing:
1972   {
1973     GST_DEBUG_OBJECT (jitterbuffer, "we are flushing");
1974     gst_pad_pause_task (priv->srcpad);
1975     JBUF_UNLOCK (priv);
1976     return;
1977   }
1978 pause:
1979   {
1980     GST_DEBUG_OBJECT (jitterbuffer, "pausing task, reason %s",
1981         gst_flow_get_name (result));
1982
1983     JBUF_LOCK (priv);
1984     /* store result */
1985     priv->srcresult = result;
1986     /* we don't post errors or anything because upstream will do that for us
1987      * when we pass the return value upstream. */
1988     gst_pad_pause_task (priv->srcpad);
1989     JBUF_UNLOCK (priv);
1990     return;
1991   }
1992 }
1993
1994 /* collect the info form the lastest RTCP packet and the jittebuffer sync, do
1995  * some sanity checks and then emit the handle-sync signal with the parameters.
1996  * This function must be called with the LOCK */
1997 static void
1998 do_handle_sync (GstRtpJitterBuffer * jitterbuffer)
1999 {
2000   GstRtpJitterBufferPrivate *priv;
2001   guint64 base_rtptime, base_time;
2002   guint32 clock_rate;
2003   guint64 last_rtptime;
2004   guint64 clock_base;
2005   guint64 ext_rtptime, diff;
2006   gboolean drop = FALSE;
2007
2008   priv = jitterbuffer->priv;
2009
2010   if (priv->last_sr == NULL) {
2011     GST_DEBUG_OBJECT (jitterbuffer, "dropping, no SR RTCP");
2012     return;
2013   }
2014
2015   /* get the last values from the jitterbuffer */
2016   rtp_jitter_buffer_get_sync (priv->jbuf, &base_rtptime, &base_time,
2017       &clock_rate, &last_rtptime);
2018
2019   clock_base = priv->clock_base;
2020   ext_rtptime = priv->ext_rtptime;
2021
2022   GST_DEBUG_OBJECT (jitterbuffer, "ext SR %" G_GUINT64_FORMAT ", base %"
2023       G_GUINT64_FORMAT ", clock-rate %" G_GUINT32_FORMAT
2024       ", clock-base %" G_GUINT64_FORMAT ", last-rtptime %" G_GUINT64_FORMAT,
2025       ext_rtptime, base_rtptime, clock_rate, clock_base, last_rtptime);
2026
2027   if (base_rtptime == -1 || clock_rate == -1 || base_time == -1) {
2028     GST_DEBUG_OBJECT (jitterbuffer, "dropping, no RTP values");
2029     drop = TRUE;
2030   } else {
2031     /* we can't accept anything that happened before we did the last resync */
2032     if (base_rtptime > ext_rtptime) {
2033       GST_DEBUG_OBJECT (jitterbuffer, "dropping, older than base time");
2034       drop = TRUE;
2035     } else {
2036       /* the SR RTP timestamp must be something close to what we last observed
2037        * in the jitterbuffer */
2038       if (ext_rtptime > last_rtptime) {
2039         /* check how far ahead it is to our RTP timestamps */
2040         diff = ext_rtptime - last_rtptime;
2041         /* if bigger than 1 second, we drop it */
2042         if (diff > clock_rate) {
2043           GST_DEBUG_OBJECT (jitterbuffer, "too far ahead");
2044           /* should drop this, but some RTSP servers end up with bogus
2045            * way too ahead RTCP packet when repeated PAUSE/PLAY,
2046            * so still trigger rptbin sync but invalidate RTCP data
2047            * (sync might use other methods) */
2048           ext_rtptime = -1;
2049         }
2050         GST_DEBUG_OBJECT (jitterbuffer, "ext last %" G_GUINT64_FORMAT ", diff %"
2051             G_GUINT64_FORMAT, last_rtptime, diff);
2052       }
2053     }
2054   }
2055
2056   if (!drop) {
2057     GstStructure *s;
2058
2059     s = gst_structure_new ("application/x-rtp-sync",
2060         "base-rtptime", G_TYPE_UINT64, base_rtptime,
2061         "base-time", G_TYPE_UINT64, base_time,
2062         "clock-rate", G_TYPE_UINT, clock_rate,
2063         "clock-base", G_TYPE_UINT64, clock_base,
2064         "sr-ext-rtptime", G_TYPE_UINT64, ext_rtptime,
2065         "sr-buffer", GST_TYPE_BUFFER, priv->last_sr, NULL);
2066
2067     GST_DEBUG_OBJECT (jitterbuffer, "signaling sync");
2068     JBUF_UNLOCK (priv);
2069     g_signal_emit (jitterbuffer,
2070         gst_rtp_jitter_buffer_signals[SIGNAL_HANDLE_SYNC], 0, s);
2071     JBUF_LOCK (priv);
2072     gst_structure_free (s);
2073   } else {
2074     GST_DEBUG_OBJECT (jitterbuffer, "dropping RTCP packet");
2075   }
2076 }
2077
2078 static GstFlowReturn
2079 gst_rtp_jitter_buffer_chain_rtcp (GstPad * pad, GstObject * parent,
2080     GstBuffer * buffer)
2081 {
2082   GstRtpJitterBuffer *jitterbuffer;
2083   GstRtpJitterBufferPrivate *priv;
2084   GstFlowReturn ret = GST_FLOW_OK;
2085   guint32 ssrc;
2086   GstRTCPPacket packet;
2087   guint64 ext_rtptime;
2088   guint32 rtptime;
2089   GstRTCPBuffer rtcp = { NULL, };
2090
2091   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
2092
2093   if (G_UNLIKELY (!gst_rtcp_buffer_validate (buffer)))
2094     goto invalid_buffer;
2095
2096   priv = jitterbuffer->priv;
2097
2098   gst_rtcp_buffer_map (buffer, GST_MAP_READ, &rtcp);
2099
2100   if (!gst_rtcp_buffer_get_first_packet (&rtcp, &packet))
2101     goto empty_buffer;
2102
2103   /* first packet must be SR or RR or else the validate would have failed */
2104   switch (gst_rtcp_packet_get_type (&packet)) {
2105     case GST_RTCP_TYPE_SR:
2106       gst_rtcp_packet_sr_get_sender_info (&packet, &ssrc, NULL, &rtptime,
2107           NULL, NULL);
2108       break;
2109     default:
2110       goto ignore_buffer;
2111   }
2112   gst_rtcp_buffer_unmap (&rtcp);
2113
2114   GST_DEBUG_OBJECT (jitterbuffer, "received RTCP of SSRC %08x", ssrc);
2115
2116   JBUF_LOCK (priv);
2117   /* convert the RTP timestamp to our extended timestamp, using the same offset
2118    * we used in the jitterbuffer */
2119   ext_rtptime = priv->jbuf->ext_rtptime;
2120   ext_rtptime = gst_rtp_buffer_ext_timestamp (&ext_rtptime, rtptime);
2121
2122   priv->ext_rtptime = ext_rtptime;
2123   gst_buffer_replace (&priv->last_sr, buffer);
2124
2125   do_handle_sync (jitterbuffer);
2126   JBUF_UNLOCK (priv);
2127
2128 done:
2129   gst_buffer_unref (buffer);
2130
2131   return ret;
2132
2133 invalid_buffer:
2134   {
2135     /* this is not fatal but should be filtered earlier */
2136     GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
2137         ("Received invalid RTCP payload, dropping"));
2138     ret = GST_FLOW_OK;
2139     goto done;
2140   }
2141 empty_buffer:
2142   {
2143     /* this is not fatal but should be filtered earlier */
2144     GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
2145         ("Received empty RTCP payload, dropping"));
2146     gst_rtcp_buffer_unmap (&rtcp);
2147     ret = GST_FLOW_OK;
2148     goto done;
2149   }
2150 ignore_buffer:
2151   {
2152     GST_DEBUG_OBJECT (jitterbuffer, "ignoring RTCP packet");
2153     gst_rtcp_buffer_unmap (&rtcp);
2154     ret = GST_FLOW_OK;
2155     goto done;
2156   }
2157 }
2158
2159 static gboolean
2160 gst_rtp_jitter_buffer_sink_query (GstPad * pad, GstObject * parent,
2161     GstQuery * query)
2162 {
2163   gboolean res = FALSE;
2164
2165   switch (GST_QUERY_TYPE (query)) {
2166     case GST_QUERY_CAPS:
2167     {
2168       GstCaps *filter, *caps;
2169
2170       gst_query_parse_caps (query, &filter);
2171       caps = gst_rtp_jitter_buffer_getcaps (pad, filter);
2172       gst_query_set_caps_result (query, caps);
2173       gst_caps_unref (caps);
2174       res = TRUE;
2175       break;
2176     }
2177     default:
2178       if (GST_QUERY_IS_SERIALIZED (query)) {
2179         GST_WARNING_OBJECT (pad, "unhandled serialized query");
2180         res = FALSE;
2181       } else {
2182         res = gst_pad_query_default (pad, parent, query);
2183       }
2184       break;
2185   }
2186   return res;
2187 }
2188
2189 static gboolean
2190 gst_rtp_jitter_buffer_src_query (GstPad * pad, GstObject * parent,
2191     GstQuery * query)
2192 {
2193   GstRtpJitterBuffer *jitterbuffer;
2194   GstRtpJitterBufferPrivate *priv;
2195   gboolean res = FALSE;
2196
2197   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
2198   priv = jitterbuffer->priv;
2199
2200   switch (GST_QUERY_TYPE (query)) {
2201     case GST_QUERY_LATENCY:
2202     {
2203       /* We need to send the query upstream and add the returned latency to our
2204        * own */
2205       GstClockTime min_latency, max_latency;
2206       gboolean us_live;
2207       GstClockTime our_latency;
2208
2209       if ((res = gst_pad_peer_query (priv->sinkpad, query))) {
2210         gst_query_parse_latency (query, &us_live, &min_latency, &max_latency);
2211
2212         GST_DEBUG_OBJECT (jitterbuffer, "Peer latency: min %"
2213             GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
2214             GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
2215
2216         /* store this so that we can safely sync on the peer buffers. */
2217         JBUF_LOCK (priv);
2218         priv->peer_latency = min_latency;
2219         our_latency = priv->latency_ns;
2220         JBUF_UNLOCK (priv);
2221
2222         GST_DEBUG_OBJECT (jitterbuffer, "Our latency: %" GST_TIME_FORMAT,
2223             GST_TIME_ARGS (our_latency));
2224
2225         /* we add some latency but can buffer an infinite amount of time */
2226         min_latency += our_latency;
2227         max_latency = -1;
2228
2229         GST_DEBUG_OBJECT (jitterbuffer, "Calculated total latency : min %"
2230             GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
2231             GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
2232
2233         gst_query_set_latency (query, TRUE, min_latency, max_latency);
2234       }
2235       break;
2236     }
2237     case GST_QUERY_POSITION:
2238     {
2239       GstClockTime start, last_out;
2240       GstFormat fmt;
2241
2242       gst_query_parse_position (query, &fmt, NULL);
2243       if (fmt != GST_FORMAT_TIME) {
2244         res = gst_pad_query_default (pad, parent, query);
2245         break;
2246       }
2247
2248       JBUF_LOCK (priv);
2249       start = priv->npt_start;
2250       last_out = priv->last_out_time;
2251       JBUF_UNLOCK (priv);
2252
2253       GST_DEBUG_OBJECT (jitterbuffer, "npt start %" GST_TIME_FORMAT
2254           ", last out %" GST_TIME_FORMAT, GST_TIME_ARGS (start),
2255           GST_TIME_ARGS (last_out));
2256
2257       if (GST_CLOCK_TIME_IS_VALID (start) && GST_CLOCK_TIME_IS_VALID (last_out)) {
2258         /* bring 0-based outgoing time to stream time */
2259         gst_query_set_position (query, GST_FORMAT_TIME, start + last_out);
2260         res = TRUE;
2261       } else {
2262         res = gst_pad_query_default (pad, parent, query);
2263       }
2264       break;
2265     }
2266     case GST_QUERY_CAPS:
2267     {
2268       GstCaps *filter, *caps;
2269
2270       gst_query_parse_caps (query, &filter);
2271       caps = gst_rtp_jitter_buffer_getcaps (pad, filter);
2272       gst_query_set_caps_result (query, caps);
2273       gst_caps_unref (caps);
2274       res = TRUE;
2275       break;
2276     }
2277     default:
2278       res = gst_pad_query_default (pad, parent, query);
2279       break;
2280   }
2281
2282   return res;
2283 }
2284
2285 static void
2286 gst_rtp_jitter_buffer_set_property (GObject * object,
2287     guint prop_id, const GValue * value, GParamSpec * pspec)
2288 {
2289   GstRtpJitterBuffer *jitterbuffer;
2290   GstRtpJitterBufferPrivate *priv;
2291
2292   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
2293   priv = jitterbuffer->priv;
2294
2295   switch (prop_id) {
2296     case PROP_LATENCY:
2297     {
2298       guint new_latency, old_latency;
2299
2300       new_latency = g_value_get_uint (value);
2301
2302       JBUF_LOCK (priv);
2303       old_latency = priv->latency_ms;
2304       priv->latency_ms = new_latency;
2305       priv->latency_ns = priv->latency_ms * GST_MSECOND;
2306       rtp_jitter_buffer_set_delay (priv->jbuf, priv->latency_ns);
2307       JBUF_UNLOCK (priv);
2308
2309       /* post message if latency changed, this will inform the parent pipeline
2310        * that a latency reconfiguration is possible/needed. */
2311       if (new_latency != old_latency) {
2312         GST_DEBUG_OBJECT (jitterbuffer, "latency changed to: %" GST_TIME_FORMAT,
2313             GST_TIME_ARGS (new_latency * GST_MSECOND));
2314
2315         gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer),
2316             gst_message_new_latency (GST_OBJECT_CAST (jitterbuffer)));
2317       }
2318       break;
2319     }
2320     case PROP_DROP_ON_LATENCY:
2321       JBUF_LOCK (priv);
2322       priv->drop_on_latency = g_value_get_boolean (value);
2323       JBUF_UNLOCK (priv);
2324       break;
2325     case PROP_TS_OFFSET:
2326       JBUF_LOCK (priv);
2327       priv->ts_offset = g_value_get_int64 (value);
2328       priv->ts_discont = TRUE;
2329       JBUF_UNLOCK (priv);
2330       break;
2331     case PROP_DO_LOST:
2332       JBUF_LOCK (priv);
2333       priv->do_lost = g_value_get_boolean (value);
2334       JBUF_UNLOCK (priv);
2335       break;
2336     case PROP_MODE:
2337       JBUF_LOCK (priv);
2338       rtp_jitter_buffer_set_mode (priv->jbuf, g_value_get_enum (value));
2339       JBUF_UNLOCK (priv);
2340       break;
2341     default:
2342       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2343       break;
2344   }
2345 }
2346
2347 static void
2348 gst_rtp_jitter_buffer_get_property (GObject * object,
2349     guint prop_id, GValue * value, GParamSpec * pspec)
2350 {
2351   GstRtpJitterBuffer *jitterbuffer;
2352   GstRtpJitterBufferPrivate *priv;
2353
2354   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
2355   priv = jitterbuffer->priv;
2356
2357   switch (prop_id) {
2358     case PROP_LATENCY:
2359       JBUF_LOCK (priv);
2360       g_value_set_uint (value, priv->latency_ms);
2361       JBUF_UNLOCK (priv);
2362       break;
2363     case PROP_DROP_ON_LATENCY:
2364       JBUF_LOCK (priv);
2365       g_value_set_boolean (value, priv->drop_on_latency);
2366       JBUF_UNLOCK (priv);
2367       break;
2368     case PROP_TS_OFFSET:
2369       JBUF_LOCK (priv);
2370       g_value_set_int64 (value, priv->ts_offset);
2371       JBUF_UNLOCK (priv);
2372       break;
2373     case PROP_DO_LOST:
2374       JBUF_LOCK (priv);
2375       g_value_set_boolean (value, priv->do_lost);
2376       JBUF_UNLOCK (priv);
2377       break;
2378     case PROP_MODE:
2379       JBUF_LOCK (priv);
2380       g_value_set_enum (value, rtp_jitter_buffer_get_mode (priv->jbuf));
2381       JBUF_UNLOCK (priv);
2382       break;
2383     case PROP_PERCENT:
2384     {
2385       gint percent;
2386
2387       JBUF_LOCK (priv);
2388       if (priv->srcresult != GST_FLOW_OK)
2389         percent = 100;
2390       else
2391         percent = rtp_jitter_buffer_get_percent (priv->jbuf);
2392
2393       g_value_set_int (value, percent);
2394       JBUF_UNLOCK (priv);
2395       break;
2396     }
2397     default:
2398       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2399       break;
2400   }
2401 }