jitterbuffer: improve sync on first packets
[platform/upstream/gst-plugins-good.git] / gst / rtpmanager / gstrtpjitterbuffer.c
1 /*
2  * Farsight Voice+Video library
3  *
4  *  Copyright 2007 Collabora Ltd,
5  *  Copyright 2007 Nokia Corporation
6  *   @author: Philippe Kalaf <philippe.kalaf@collabora.co.uk>.
7  *  Copyright 2007 Wim Taymans <wim.taymans@gmail.com>
8  *
9  * This library is free software; you can redistribute it and/or
10  * modify it under the terms of the GNU Library General Public
11  * License as published by the Free Software Foundation; either
12  * version 2 of the License, or (at your option) any later version.
13  *
14  * This library is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17  * Library General Public License for more details.
18  *
19  * You should have received a copy of the GNU Library General Public
20  * License along with this library; if not, write to the
21  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
22  * Boston, MA 02110-1301, USA.
23  *
24  */
25
26 /**
27  * SECTION:element-gstrtpjitterbuffer
28  *
29  * This element reorders and removes duplicate RTP packets as they are received
30  * from a network source. It will also wait for missing packets up to a
31  * configurable time limit using the #GstRtpJitterBuffer:latency property.
32  * Packets arriving too late are considered to be lost packets.
33  *
34  * This element acts as a live element and so adds #GstRtpJitterBuffer:latency
35  * to the pipeline.
36  *
37  * The element needs the clock-rate of the RTP payload in order to estimate the
38  * delay. This information is obtained either from the caps on the sink pad or,
39  * when no caps are present, from the #GstRtpJitterBuffer::request-pt-map signal.
40  * To clear the previous pt-map use the #GstRtpJitterBuffer::clear-pt-map signal.
41  *
42  * This element will automatically be used inside gstrtpbin.
43  *
44  * <refsect2>
45  * <title>Example pipelines</title>
46  * |[
47  * gst-launch-1.0 rtspsrc location=rtsp://192.168.1.133:8554/mpeg1or2AudioVideoTest ! gstrtpjitterbuffer ! rtpmpvdepay ! mpeg2dec ! xvimagesink
48  * ]| Connect to a streaming server and decode the MPEG video. The jitterbuffer is
49  * inserted into the pipeline to smooth out network jitter and to reorder the
50  * out-of-order RTP packets.
51  * </refsect2>
52  *
53  * Last reviewed on 2007-05-28 (0.10.5)
54  */
55
56 #ifdef HAVE_CONFIG_H
57 #include "config.h"
58 #endif
59
60 #include <stdlib.h>
61 #include <string.h>
62 #include <gst/rtp/gstrtpbuffer.h>
63
64 #include "gstrtpbin-marshal.h"
65
66 #include "gstrtpjitterbuffer.h"
67 #include "rtpjitterbuffer.h"
68 #include "rtpstats.h"
69
70 #include <gst/glib-compat-private.h>
71
72 GST_DEBUG_CATEGORY (rtpjitterbuffer_debug);
73 #define GST_CAT_DEFAULT (rtpjitterbuffer_debug)
74
75 /* RTPJitterBuffer signals and args */
76 enum
77 {
78   SIGNAL_REQUEST_PT_MAP,
79   SIGNAL_CLEAR_PT_MAP,
80   SIGNAL_HANDLE_SYNC,
81   SIGNAL_ON_NPT_STOP,
82   SIGNAL_SET_ACTIVE,
83   LAST_SIGNAL
84 };
85
86 #define DEFAULT_LATENCY_MS      200
87 #define DEFAULT_DROP_ON_LATENCY FALSE
88 #define DEFAULT_TS_OFFSET       0
89 #define DEFAULT_DO_LOST         FALSE
90 #define DEFAULT_MODE            RTP_JITTER_BUFFER_MODE_SLAVE
91 #define DEFAULT_PERCENT         0
92
93 enum
94 {
95   PROP_0,
96   PROP_LATENCY,
97   PROP_DROP_ON_LATENCY,
98   PROP_TS_OFFSET,
99   PROP_DO_LOST,
100   PROP_MODE,
101   PROP_PERCENT,
102   PROP_LAST
103 };
104
105 #define JBUF_LOCK(priv)   (g_mutex_lock (&(priv)->jbuf_lock))
106
107 #define JBUF_LOCK_CHECK(priv,label) G_STMT_START {    \
108   JBUF_LOCK (priv);                                   \
109   if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK))    \
110     goto label;                                       \
111 } G_STMT_END
112
113 #define JBUF_UNLOCK(priv) (g_mutex_unlock (&(priv)->jbuf_lock))
114 #define JBUF_WAIT(priv)   (g_cond_wait (&(priv)->jbuf_cond, &(priv)->jbuf_lock))
115
116 #define JBUF_WAIT_CHECK(priv,label) G_STMT_START {    \
117   JBUF_WAIT(priv);                                    \
118   if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK))    \
119     goto label;                                       \
120 } G_STMT_END
121
122 #define JBUF_SIGNAL(priv) (g_cond_signal (&(priv)->jbuf_cond))
123
124 struct _GstRtpJitterBufferPrivate
125 {
126   GstPad *sinkpad, *srcpad;
127   GstPad *rtcpsinkpad;
128
129   RTPJitterBuffer *jbuf;
130   GMutex jbuf_lock;
131   GCond jbuf_cond;
132   gboolean waiting;
133   gboolean discont;
134   gboolean ts_discont;
135   gboolean active;
136   guint64 out_offset;
137
138   /* properties */
139   guint latency_ms;
140   guint64 latency_ns;
141   gboolean drop_on_latency;
142   gint64 ts_offset;
143   gboolean do_lost;
144
145   /* the last seqnum we pushed out */
146   guint32 last_popped_seqnum;
147   /* the next expected seqnum we push */
148   guint32 next_seqnum;
149   /* last output time */
150   GstClockTime last_out_time;
151   /* the next expected seqnum we receive */
152   guint32 next_in_seqnum;
153
154   /* start and stop ranges */
155   GstClockTime npt_start;
156   GstClockTime npt_stop;
157   guint64 ext_timestamp;
158   guint64 last_elapsed;
159   guint64 estimated_eos;
160   GstClockID eos_id;
161   gboolean reached_npt_stop;
162
163   /* state */
164   gboolean eos;
165
166   /* clock rate and rtp timestamp offset */
167   gint last_pt;
168   gint32 clock_rate;
169   gint64 clock_base;
170   gint64 prev_ts_offset;
171
172   /* when we are shutting down */
173   GstFlowReturn srcresult;
174   gboolean blocked;
175
176   /* for sync */
177   GstSegment segment;
178   GstClockID clock_id;
179   gboolean unscheduled;
180   /* the latency of the upstream peer, we have to take this into account when
181    * synchronizing the buffers. */
182   GstClockTime peer_latency;
183   guint64 ext_rtptime;
184   GstBuffer *last_sr;
185
186   /* some accounting */
187   guint64 num_late;
188   guint64 num_duplicates;
189 };
190
191 #define GST_RTP_JITTER_BUFFER_GET_PRIVATE(o) \
192   (G_TYPE_INSTANCE_GET_PRIVATE ((o), GST_TYPE_RTP_JITTER_BUFFER, \
193                                 GstRtpJitterBufferPrivate))
194
195 static GstStaticPadTemplate gst_rtp_jitter_buffer_sink_template =
196 GST_STATIC_PAD_TEMPLATE ("sink",
197     GST_PAD_SINK,
198     GST_PAD_ALWAYS,
199     GST_STATIC_CAPS ("application/x-rtp, "
200         "clock-rate = (int) [ 1, 2147483647 ]"
201         /* "payload = (int) , "
202          * "encoding-name = (string) "
203          */ )
204     );
205
206 static GstStaticPadTemplate gst_rtp_jitter_buffer_sink_rtcp_template =
207 GST_STATIC_PAD_TEMPLATE ("sink_rtcp",
208     GST_PAD_SINK,
209     GST_PAD_REQUEST,
210     GST_STATIC_CAPS ("application/x-rtcp")
211     );
212
213 static GstStaticPadTemplate gst_rtp_jitter_buffer_src_template =
214 GST_STATIC_PAD_TEMPLATE ("src",
215     GST_PAD_SRC,
216     GST_PAD_ALWAYS,
217     GST_STATIC_CAPS ("application/x-rtp"
218         /* "payload = (int) , "
219          * "clock-rate = (int) , "
220          * "encoding-name = (string) "
221          */ )
222     );
223
224 static guint gst_rtp_jitter_buffer_signals[LAST_SIGNAL] = { 0 };
225
226 #define gst_rtp_jitter_buffer_parent_class parent_class
227 G_DEFINE_TYPE (GstRtpJitterBuffer, gst_rtp_jitter_buffer, GST_TYPE_ELEMENT);
228
229 /* object overrides */
230 static void gst_rtp_jitter_buffer_set_property (GObject * object,
231     guint prop_id, const GValue * value, GParamSpec * pspec);
232 static void gst_rtp_jitter_buffer_get_property (GObject * object,
233     guint prop_id, GValue * value, GParamSpec * pspec);
234 static void gst_rtp_jitter_buffer_finalize (GObject * object);
235
236 /* element overrides */
237 static GstStateChangeReturn gst_rtp_jitter_buffer_change_state (GstElement
238     * element, GstStateChange transition);
239 static GstPad *gst_rtp_jitter_buffer_request_new_pad (GstElement * element,
240     GstPadTemplate * templ, const gchar * name, const GstCaps * filter);
241 static void gst_rtp_jitter_buffer_release_pad (GstElement * element,
242     GstPad * pad);
243 static GstClock *gst_rtp_jitter_buffer_provide_clock (GstElement * element);
244
245 /* pad overrides */
246 static GstCaps *gst_rtp_jitter_buffer_getcaps (GstPad * pad, GstCaps * filter);
247 static GstIterator *gst_rtp_jitter_buffer_iterate_internal_links (GstPad * pad,
248     GstObject * parent);
249
250 /* sinkpad overrides */
251 static gboolean gst_rtp_jitter_buffer_sink_event (GstPad * pad,
252     GstObject * parent, GstEvent * event);
253 static GstFlowReturn gst_rtp_jitter_buffer_chain (GstPad * pad,
254     GstObject * parent, GstBuffer * buffer);
255
256 static gboolean gst_rtp_jitter_buffer_sink_rtcp_event (GstPad * pad,
257     GstObject * parent, GstEvent * event);
258 static GstFlowReturn gst_rtp_jitter_buffer_chain_rtcp (GstPad * pad,
259     GstObject * parent, GstBuffer * buffer);
260
261 static gboolean gst_rtp_jitter_buffer_sink_query (GstPad * pad,
262     GstObject * parent, GstQuery * query);
263
264 /* srcpad overrides */
265 static gboolean gst_rtp_jitter_buffer_src_event (GstPad * pad,
266     GstObject * parent, GstEvent * event);
267 static gboolean gst_rtp_jitter_buffer_src_activate_mode (GstPad * pad,
268     GstObject * parent, GstPadMode mode, gboolean active);
269 static void gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer);
270 static gboolean gst_rtp_jitter_buffer_src_query (GstPad * pad,
271     GstObject * parent, GstQuery * query);
272
273 static void
274 gst_rtp_jitter_buffer_clear_pt_map (GstRtpJitterBuffer * jitterbuffer);
275 static GstClockTime
276 gst_rtp_jitter_buffer_set_active (GstRtpJitterBuffer * jitterbuffer,
277     gboolean active, guint64 base_time);
278 static void do_handle_sync (GstRtpJitterBuffer * jitterbuffer);
279
280 static void
281 gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass)
282 {
283   GObjectClass *gobject_class;
284   GstElementClass *gstelement_class;
285
286   gobject_class = (GObjectClass *) klass;
287   gstelement_class = (GstElementClass *) klass;
288
289   g_type_class_add_private (klass, sizeof (GstRtpJitterBufferPrivate));
290
291   gobject_class->finalize = gst_rtp_jitter_buffer_finalize;
292
293   gobject_class->set_property = gst_rtp_jitter_buffer_set_property;
294   gobject_class->get_property = gst_rtp_jitter_buffer_get_property;
295
296   /**
297    * GstRtpJitterBuffer::latency:
298    *
299    * The maximum latency of the jitterbuffer. Packets will be kept in the buffer
300    * for at most this time.
301    */
302   g_object_class_install_property (gobject_class, PROP_LATENCY,
303       g_param_spec_uint ("latency", "Buffer latency in ms",
304           "Amount of ms to buffer", 0, G_MAXUINT, DEFAULT_LATENCY_MS,
305           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
306   /**
307    * GstRtpJitterBuffer::drop-on-latency:
308    *
309    * Drop oldest buffers when the queue is completely filled.
310    */
311   g_object_class_install_property (gobject_class, PROP_DROP_ON_LATENCY,
312       g_param_spec_boolean ("drop-on-latency",
313           "Drop buffers when maximum latency is reached",
314           "Tells the jitterbuffer to never exceed the given latency in size",
315           DEFAULT_DROP_ON_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
316   /**
317    * GstRtpJitterBuffer::ts-offset:
318    *
319    * Adjust GStreamer output buffer timestamps in the jitterbuffer with offset.
320    * This is mainly used to ensure interstream synchronisation.
321    */
322   g_object_class_install_property (gobject_class, PROP_TS_OFFSET,
323       g_param_spec_int64 ("ts-offset", "Timestamp Offset",
324           "Adjust buffer timestamps with offset in nanoseconds", G_MININT64,
325           G_MAXINT64, DEFAULT_TS_OFFSET,
326           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
327
328   /**
329    * GstRtpJitterBuffer::do-lost:
330    *
331    * Send out a GstRTPPacketLost event downstream when a packet is considered
332    * lost.
333    */
334   g_object_class_install_property (gobject_class, PROP_DO_LOST,
335       g_param_spec_boolean ("do-lost", "Do Lost",
336           "Send an event downstream when a packet is lost", DEFAULT_DO_LOST,
337           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
338
339   /**
340    * GstRtpJitterBuffer::mode:
341    *
342    * Control the buffering and timestamping mode used by the jitterbuffer.
343    */
344   g_object_class_install_property (gobject_class, PROP_MODE,
345       g_param_spec_enum ("mode", "Mode",
346           "Control the buffering algorithm in use", RTP_TYPE_JITTER_BUFFER_MODE,
347           DEFAULT_MODE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
348   /**
349    * GstRtpJitterBuffer::percent:
350    *
351    * The percent of the jitterbuffer that is filled.
352    *
353    * Since: 0.10.19
354    */
355   g_object_class_install_property (gobject_class, PROP_PERCENT,
356       g_param_spec_int ("percent", "percent",
357           "The buffer filled percent", 0, 100,
358           0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
359   /**
360    * GstRtpJitterBuffer::request-pt-map:
361    * @buffer: the object which received the signal
362    * @pt: the pt
363    *
364    * Request the payload type as #GstCaps for @pt.
365    */
366   gst_rtp_jitter_buffer_signals[SIGNAL_REQUEST_PT_MAP] =
367       g_signal_new ("request-pt-map", G_TYPE_FROM_CLASS (klass),
368       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
369           request_pt_map), NULL, NULL, gst_rtp_bin_marshal_BOXED__UINT,
370       GST_TYPE_CAPS, 1, G_TYPE_UINT);
371   /**
372    * GstRtpJitterBuffer::handle-sync:
373    * @buffer: the object which received the signal
374    * @struct: a GstStructure containing sync values.
375    *
376    * Be notified of new sync values.
377    */
378   gst_rtp_jitter_buffer_signals[SIGNAL_HANDLE_SYNC] =
379       g_signal_new ("handle-sync", G_TYPE_FROM_CLASS (klass),
380       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
381           handle_sync), NULL, NULL, g_cclosure_marshal_VOID__BOXED,
382       G_TYPE_NONE, 1, GST_TYPE_STRUCTURE | G_SIGNAL_TYPE_STATIC_SCOPE);
383
384   /**
385    * GstRtpJitterBuffer::on-npt-stop
386    * @buffer: the object which received the signal
387    *
388    * Signal that the jitterbufer has pushed the RTP packet that corresponds to
389    * the npt-stop position.
390    */
391   gst_rtp_jitter_buffer_signals[SIGNAL_ON_NPT_STOP] =
392       g_signal_new ("on-npt-stop", G_TYPE_FROM_CLASS (klass),
393       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
394           on_npt_stop), NULL, NULL, g_cclosure_marshal_VOID__VOID,
395       G_TYPE_NONE, 0, G_TYPE_NONE);
396
397   /**
398    * GstRtpJitterBuffer::clear-pt-map:
399    * @buffer: the object which received the signal
400    *
401    * Invalidate the clock-rate as obtained with the
402    * #GstRtpJitterBuffer::request-pt-map signal.
403    */
404   gst_rtp_jitter_buffer_signals[SIGNAL_CLEAR_PT_MAP] =
405       g_signal_new ("clear-pt-map", G_TYPE_FROM_CLASS (klass),
406       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
407       G_STRUCT_OFFSET (GstRtpJitterBufferClass, clear_pt_map), NULL, NULL,
408       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
409
410   /**
411    * GstRtpJitterBuffer::set-active:
412    * @buffer: the object which received the signal
413    *
414    * Start pushing out packets with the given base time. This signal is only
415    * useful in buffering mode.
416    *
417    * Returns: the time of the last pushed packet.
418    *
419    * Since: 0.10.19
420    */
421   gst_rtp_jitter_buffer_signals[SIGNAL_SET_ACTIVE] =
422       g_signal_new ("set-active", G_TYPE_FROM_CLASS (klass),
423       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
424       G_STRUCT_OFFSET (GstRtpJitterBufferClass, set_active), NULL, NULL,
425       gst_rtp_bin_marshal_UINT64__BOOL_UINT64, G_TYPE_UINT64, 2, G_TYPE_BOOLEAN,
426       G_TYPE_UINT64);
427
428   gstelement_class->change_state =
429       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_change_state);
430   gstelement_class->request_new_pad =
431       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_request_new_pad);
432   gstelement_class->release_pad =
433       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_release_pad);
434   gstelement_class->provide_clock =
435       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_provide_clock);
436
437   gst_element_class_add_pad_template (gstelement_class,
438       gst_static_pad_template_get (&gst_rtp_jitter_buffer_src_template));
439   gst_element_class_add_pad_template (gstelement_class,
440       gst_static_pad_template_get (&gst_rtp_jitter_buffer_sink_template));
441   gst_element_class_add_pad_template (gstelement_class,
442       gst_static_pad_template_get (&gst_rtp_jitter_buffer_sink_rtcp_template));
443
444   gst_element_class_set_static_metadata (gstelement_class,
445       "RTP packet jitter-buffer", "Filter/Network/RTP",
446       "A buffer that deals with network jitter and other transmission faults",
447       "Philippe Kalaf <philippe.kalaf@collabora.co.uk>, "
448       "Wim Taymans <wim.taymans@gmail.com>");
449
450   klass->clear_pt_map = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_clear_pt_map);
451   klass->set_active = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_set_active);
452
453   GST_DEBUG_CATEGORY_INIT
454       (rtpjitterbuffer_debug, "gstrtpjitterbuffer", 0, "RTP Jitter Buffer");
455 }
456
457 static void
458 gst_rtp_jitter_buffer_init (GstRtpJitterBuffer * jitterbuffer)
459 {
460   GstRtpJitterBufferPrivate *priv;
461
462   priv = GST_RTP_JITTER_BUFFER_GET_PRIVATE (jitterbuffer);
463   jitterbuffer->priv = priv;
464
465   priv->latency_ms = DEFAULT_LATENCY_MS;
466   priv->latency_ns = priv->latency_ms * GST_MSECOND;
467   priv->drop_on_latency = DEFAULT_DROP_ON_LATENCY;
468   priv->do_lost = DEFAULT_DO_LOST;
469
470   priv->jbuf = rtp_jitter_buffer_new ();
471   g_mutex_init (&priv->jbuf_lock);
472   g_cond_init (&priv->jbuf_cond);
473
474   /* reset skew detection initialy */
475   rtp_jitter_buffer_reset_skew (priv->jbuf);
476   rtp_jitter_buffer_set_delay (priv->jbuf, priv->latency_ns);
477   rtp_jitter_buffer_set_buffering (priv->jbuf, FALSE);
478   priv->active = TRUE;
479
480   priv->srcpad =
481       gst_pad_new_from_static_template (&gst_rtp_jitter_buffer_src_template,
482       "src");
483
484   gst_pad_set_activatemode_function (priv->srcpad,
485       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_activate_mode));
486   gst_pad_set_query_function (priv->srcpad,
487       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_query));
488   gst_pad_set_event_function (priv->srcpad,
489       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_event));
490
491   priv->sinkpad =
492       gst_pad_new_from_static_template (&gst_rtp_jitter_buffer_sink_template,
493       "sink");
494
495   gst_pad_set_chain_function (priv->sinkpad,
496       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_chain));
497   gst_pad_set_event_function (priv->sinkpad,
498       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_sink_event));
499   gst_pad_set_query_function (priv->sinkpad,
500       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_sink_query));
501
502   gst_element_add_pad (GST_ELEMENT (jitterbuffer), priv->srcpad);
503   gst_element_add_pad (GST_ELEMENT (jitterbuffer), priv->sinkpad);
504
505   GST_OBJECT_FLAG_SET (jitterbuffer, GST_ELEMENT_FLAG_PROVIDE_CLOCK);
506 }
507
508 static void
509 gst_rtp_jitter_buffer_finalize (GObject * object)
510 {
511   GstRtpJitterBuffer *jitterbuffer;
512
513   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
514
515   g_mutex_clear (&jitterbuffer->priv->jbuf_lock);
516   g_cond_clear (&jitterbuffer->priv->jbuf_cond);
517
518   g_object_unref (jitterbuffer->priv->jbuf);
519
520   G_OBJECT_CLASS (parent_class)->finalize (object);
521 }
522
523 static GstIterator *
524 gst_rtp_jitter_buffer_iterate_internal_links (GstPad * pad, GstObject * parent)
525 {
526   GstRtpJitterBuffer *jitterbuffer;
527   GstPad *otherpad = NULL;
528   GstIterator *it;
529   GValue val = { 0, };
530
531   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
532
533   if (pad == jitterbuffer->priv->sinkpad) {
534     otherpad = jitterbuffer->priv->srcpad;
535   } else if (pad == jitterbuffer->priv->srcpad) {
536     otherpad = jitterbuffer->priv->sinkpad;
537   } else if (pad == jitterbuffer->priv->rtcpsinkpad) {
538     otherpad = NULL;
539   }
540
541   g_value_init (&val, GST_TYPE_PAD);
542   g_value_set_object (&val, otherpad);
543   it = gst_iterator_new_single (GST_TYPE_PAD, &val);
544   g_value_unset (&val);
545
546   return it;
547 }
548
549 static GstPad *
550 create_rtcp_sink (GstRtpJitterBuffer * jitterbuffer)
551 {
552   GstRtpJitterBufferPrivate *priv;
553
554   priv = jitterbuffer->priv;
555
556   GST_DEBUG_OBJECT (jitterbuffer, "creating RTCP sink pad");
557
558   priv->rtcpsinkpad =
559       gst_pad_new_from_static_template
560       (&gst_rtp_jitter_buffer_sink_rtcp_template, "sink_rtcp");
561   gst_pad_set_chain_function (priv->rtcpsinkpad,
562       gst_rtp_jitter_buffer_chain_rtcp);
563   gst_pad_set_event_function (priv->rtcpsinkpad,
564       (GstPadEventFunction) gst_rtp_jitter_buffer_sink_rtcp_event);
565   gst_pad_set_iterate_internal_links_function (priv->rtcpsinkpad,
566       gst_rtp_jitter_buffer_iterate_internal_links);
567   gst_pad_set_active (priv->rtcpsinkpad, TRUE);
568   gst_element_add_pad (GST_ELEMENT_CAST (jitterbuffer), priv->rtcpsinkpad);
569
570   return priv->rtcpsinkpad;
571 }
572
573 static void
574 remove_rtcp_sink (GstRtpJitterBuffer * jitterbuffer)
575 {
576   GstRtpJitterBufferPrivate *priv;
577
578   priv = jitterbuffer->priv;
579
580   GST_DEBUG_OBJECT (jitterbuffer, "removing RTCP sink pad");
581
582   gst_pad_set_active (priv->rtcpsinkpad, FALSE);
583
584   gst_element_remove_pad (GST_ELEMENT_CAST (jitterbuffer), priv->rtcpsinkpad);
585   priv->rtcpsinkpad = NULL;
586 }
587
588 static GstPad *
589 gst_rtp_jitter_buffer_request_new_pad (GstElement * element,
590     GstPadTemplate * templ, const gchar * name, const GstCaps * filter)
591 {
592   GstRtpJitterBuffer *jitterbuffer;
593   GstElementClass *klass;
594   GstPad *result;
595   GstRtpJitterBufferPrivate *priv;
596
597   g_return_val_if_fail (templ != NULL, NULL);
598   g_return_val_if_fail (GST_IS_RTP_JITTER_BUFFER (element), NULL);
599
600   jitterbuffer = GST_RTP_JITTER_BUFFER (element);
601   priv = jitterbuffer->priv;
602   klass = GST_ELEMENT_GET_CLASS (element);
603
604   GST_DEBUG_OBJECT (element, "requesting pad %s", GST_STR_NULL (name));
605
606   /* figure out the template */
607   if (templ == gst_element_class_get_pad_template (klass, "sink_rtcp")) {
608     if (priv->rtcpsinkpad != NULL)
609       goto exists;
610
611     result = create_rtcp_sink (jitterbuffer);
612   } else
613     goto wrong_template;
614
615   return result;
616
617   /* ERRORS */
618 wrong_template:
619   {
620     g_warning ("gstrtpjitterbuffer: this is not our template");
621     return NULL;
622   }
623 exists:
624   {
625     g_warning ("gstrtpjitterbuffer: pad already requested");
626     return NULL;
627   }
628 }
629
630 static void
631 gst_rtp_jitter_buffer_release_pad (GstElement * element, GstPad * pad)
632 {
633   GstRtpJitterBuffer *jitterbuffer;
634   GstRtpJitterBufferPrivate *priv;
635
636   g_return_if_fail (GST_IS_RTP_JITTER_BUFFER (element));
637   g_return_if_fail (GST_IS_PAD (pad));
638
639   jitterbuffer = GST_RTP_JITTER_BUFFER (element);
640   priv = jitterbuffer->priv;
641
642   GST_DEBUG_OBJECT (element, "releasing pad %s:%s", GST_DEBUG_PAD_NAME (pad));
643
644   if (priv->rtcpsinkpad == pad) {
645     remove_rtcp_sink (jitterbuffer);
646   } else
647     goto wrong_pad;
648
649   return;
650
651   /* ERRORS */
652 wrong_pad:
653   {
654     g_warning ("gstjitterbuffer: asked to release an unknown pad");
655     return;
656   }
657 }
658
659 static GstClock *
660 gst_rtp_jitter_buffer_provide_clock (GstElement * element)
661 {
662   return gst_system_clock_obtain ();
663 }
664
665 static void
666 gst_rtp_jitter_buffer_clear_pt_map (GstRtpJitterBuffer * jitterbuffer)
667 {
668   GstRtpJitterBufferPrivate *priv;
669
670   priv = jitterbuffer->priv;
671
672   /* this will trigger a new pt-map request signal, FIXME, do something better. */
673
674   JBUF_LOCK (priv);
675   priv->clock_rate = -1;
676   /* do not clear current content, but refresh state for new arrival */
677   GST_DEBUG_OBJECT (jitterbuffer, "reset jitterbuffer");
678   rtp_jitter_buffer_reset_skew (priv->jbuf);
679   priv->last_popped_seqnum = -1;
680   priv->next_seqnum = -1;
681   JBUF_UNLOCK (priv);
682 }
683
684 static GstClockTime
685 gst_rtp_jitter_buffer_set_active (GstRtpJitterBuffer * jbuf, gboolean active,
686     guint64 offset)
687 {
688   GstRtpJitterBufferPrivate *priv;
689   GstClockTime last_out;
690   GstBuffer *head;
691
692   priv = jbuf->priv;
693
694   JBUF_LOCK (priv);
695   GST_DEBUG_OBJECT (jbuf, "setting active %d with offset %" GST_TIME_FORMAT,
696       active, GST_TIME_ARGS (offset));
697
698   if (active != priv->active) {
699     /* add the amount of time spent in paused to the output offset. All
700      * outgoing buffers will have this offset applied to their timestamps in
701      * order to make them arrive in time in the sink. */
702     priv->out_offset = offset;
703     GST_DEBUG_OBJECT (jbuf, "out offset %" GST_TIME_FORMAT,
704         GST_TIME_ARGS (priv->out_offset));
705     priv->active = active;
706     JBUF_SIGNAL (priv);
707   }
708   if (!active) {
709     rtp_jitter_buffer_set_buffering (priv->jbuf, TRUE);
710   }
711   if ((head = rtp_jitter_buffer_peek (priv->jbuf))) {
712     /* head buffer timestamp and offset gives our output time */
713     last_out = GST_BUFFER_TIMESTAMP (head) + priv->ts_offset;
714   } else {
715     /* use last known time when the buffer is empty */
716     last_out = priv->last_out_time;
717   }
718   JBUF_UNLOCK (priv);
719
720   return last_out;
721 }
722
723 static GstCaps *
724 gst_rtp_jitter_buffer_getcaps (GstPad * pad, GstCaps * filter)
725 {
726   GstRtpJitterBuffer *jitterbuffer;
727   GstRtpJitterBufferPrivate *priv;
728   GstPad *other;
729   GstCaps *caps;
730   GstCaps *templ;
731
732   jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
733   priv = jitterbuffer->priv;
734
735   other = (pad == priv->srcpad ? priv->sinkpad : priv->srcpad);
736
737   caps = gst_pad_peer_query_caps (other, filter);
738
739   templ = gst_pad_get_pad_template_caps (pad);
740   if (caps == NULL) {
741     GST_DEBUG_OBJECT (jitterbuffer, "use template");
742     caps = templ;
743   } else {
744     GstCaps *intersect;
745
746     GST_DEBUG_OBJECT (jitterbuffer, "intersect with template");
747
748     intersect = gst_caps_intersect (caps, templ);
749     gst_caps_unref (caps);
750     gst_caps_unref (templ);
751
752     caps = intersect;
753   }
754   gst_object_unref (jitterbuffer);
755
756   return caps;
757 }
758
759 /*
760  * Must be called with JBUF_LOCK held
761  */
762
763 static gboolean
764 gst_jitter_buffer_sink_parse_caps (GstRtpJitterBuffer * jitterbuffer,
765     GstCaps * caps)
766 {
767   GstRtpJitterBufferPrivate *priv;
768   GstStructure *caps_struct;
769   guint val;
770   GstClockTime tval;
771
772   priv = jitterbuffer->priv;
773
774   /* first parse the caps */
775   caps_struct = gst_caps_get_structure (caps, 0);
776
777   GST_DEBUG_OBJECT (jitterbuffer, "got caps");
778
779   /* we need a clock-rate to convert the rtp timestamps to GStreamer time and to
780    * measure the amount of data in the buffer */
781   if (!gst_structure_get_int (caps_struct, "clock-rate", &priv->clock_rate))
782     goto error;
783
784   if (priv->clock_rate <= 0)
785     goto wrong_rate;
786
787   GST_DEBUG_OBJECT (jitterbuffer, "got clock-rate %d", priv->clock_rate);
788
789   /* The clock base is the RTP timestamp corrsponding to the npt-start value. We
790    * can use this to track the amount of time elapsed on the sender. */
791   if (gst_structure_get_uint (caps_struct, "clock-base", &val))
792     priv->clock_base = val;
793   else
794     priv->clock_base = -1;
795
796   priv->ext_timestamp = priv->clock_base;
797
798   GST_DEBUG_OBJECT (jitterbuffer, "got clock-base %" G_GINT64_FORMAT,
799       priv->clock_base);
800
801   if (gst_structure_get_uint (caps_struct, "seqnum-base", &val)) {
802     /* first expected seqnum, only update when we didn't have a previous base. */
803     if (priv->next_in_seqnum == -1)
804       priv->next_in_seqnum = val;
805     if (priv->next_seqnum == -1)
806       priv->next_seqnum = val;
807   }
808
809   GST_DEBUG_OBJECT (jitterbuffer, "got seqnum-base %d", priv->next_in_seqnum);
810
811   /* the start and stop times. The seqnum-base corresponds to the start time. We
812    * will keep track of the seqnums on the output and when we reach the one
813    * corresponding to npt-stop, we emit the npt-stop-reached signal */
814   if (gst_structure_get_clock_time (caps_struct, "npt-start", &tval))
815     priv->npt_start = tval;
816   else
817     priv->npt_start = 0;
818
819   if (gst_structure_get_clock_time (caps_struct, "npt-stop", &tval))
820     priv->npt_stop = tval;
821   else
822     priv->npt_stop = -1;
823
824   GST_DEBUG_OBJECT (jitterbuffer,
825       "npt start/stop: %" GST_TIME_FORMAT "-%" GST_TIME_FORMAT,
826       GST_TIME_ARGS (priv->npt_start), GST_TIME_ARGS (priv->npt_stop));
827
828   return TRUE;
829
830   /* ERRORS */
831 error:
832   {
833     GST_DEBUG_OBJECT (jitterbuffer, "No clock-rate in caps!");
834     return FALSE;
835   }
836 wrong_rate:
837   {
838     GST_DEBUG_OBJECT (jitterbuffer, "Invalid clock-rate %d", priv->clock_rate);
839     return FALSE;
840   }
841 }
842
843 static void
844 gst_rtp_jitter_buffer_flush_start (GstRtpJitterBuffer * jitterbuffer)
845 {
846   GstRtpJitterBufferPrivate *priv;
847
848   priv = jitterbuffer->priv;
849
850   JBUF_LOCK (priv);
851   /* mark ourselves as flushing */
852   priv->srcresult = GST_FLOW_FLUSHING;
853   GST_DEBUG_OBJECT (jitterbuffer, "Disabling pop on queue");
854   /* this unblocks any waiting pops on the src pad task */
855   JBUF_SIGNAL (priv);
856   /* unlock clock, we just unschedule, the entry will be released by the
857    * locking streaming thread. */
858   if (priv->clock_id) {
859     gst_clock_id_unschedule (priv->clock_id);
860     priv->unscheduled = TRUE;
861   }
862   JBUF_UNLOCK (priv);
863 }
864
865 static void
866 gst_rtp_jitter_buffer_flush_stop (GstRtpJitterBuffer * jitterbuffer)
867 {
868   GstRtpJitterBufferPrivate *priv;
869
870   priv = jitterbuffer->priv;
871
872   JBUF_LOCK (priv);
873   GST_DEBUG_OBJECT (jitterbuffer, "Enabling pop on queue");
874   /* Mark as non flushing */
875   priv->srcresult = GST_FLOW_OK;
876   gst_segment_init (&priv->segment, GST_FORMAT_TIME);
877   priv->last_popped_seqnum = -1;
878   priv->last_out_time = -1;
879   priv->next_seqnum = -1;
880   priv->next_in_seqnum = -1;
881   priv->clock_rate = -1;
882   priv->eos = FALSE;
883   priv->estimated_eos = -1;
884   priv->last_elapsed = 0;
885   priv->reached_npt_stop = FALSE;
886   priv->ext_timestamp = -1;
887   GST_DEBUG_OBJECT (jitterbuffer, "flush and reset jitterbuffer");
888   rtp_jitter_buffer_flush (priv->jbuf);
889   rtp_jitter_buffer_reset_skew (priv->jbuf);
890   JBUF_UNLOCK (priv);
891 }
892
893 static gboolean
894 gst_rtp_jitter_buffer_src_activate_mode (GstPad * pad, GstObject * parent,
895     GstPadMode mode, gboolean active)
896 {
897   gboolean result;
898   GstRtpJitterBuffer *jitterbuffer = NULL;
899
900   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
901
902   switch (mode) {
903     case GST_PAD_MODE_PUSH:
904       if (active) {
905         /* allow data processing */
906         gst_rtp_jitter_buffer_flush_stop (jitterbuffer);
907
908         /* start pushing out buffers */
909         GST_DEBUG_OBJECT (jitterbuffer, "Starting task on srcpad");
910         result = gst_pad_start_task (jitterbuffer->priv->srcpad,
911             (GstTaskFunction) gst_rtp_jitter_buffer_loop, jitterbuffer, NULL);
912       } else {
913         /* make sure all data processing stops ASAP */
914         gst_rtp_jitter_buffer_flush_start (jitterbuffer);
915
916         /* NOTE this will hardlock if the state change is called from the src pad
917          * task thread because we will _join() the thread. */
918         GST_DEBUG_OBJECT (jitterbuffer, "Stopping task on srcpad");
919         result = gst_pad_stop_task (pad);
920       }
921       break;
922     default:
923       result = FALSE;
924       break;
925   }
926   return result;
927 }
928
929 static GstStateChangeReturn
930 gst_rtp_jitter_buffer_change_state (GstElement * element,
931     GstStateChange transition)
932 {
933   GstRtpJitterBuffer *jitterbuffer;
934   GstRtpJitterBufferPrivate *priv;
935   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
936
937   jitterbuffer = GST_RTP_JITTER_BUFFER (element);
938   priv = jitterbuffer->priv;
939
940   switch (transition) {
941     case GST_STATE_CHANGE_NULL_TO_READY:
942       break;
943     case GST_STATE_CHANGE_READY_TO_PAUSED:
944       JBUF_LOCK (priv);
945       /* reset negotiated values */
946       priv->clock_rate = -1;
947       priv->clock_base = -1;
948       priv->peer_latency = 0;
949       priv->last_pt = -1;
950       /* block until we go to PLAYING */
951       priv->blocked = TRUE;
952       JBUF_UNLOCK (priv);
953       break;
954     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
955       JBUF_LOCK (priv);
956       /* unblock to allow streaming in PLAYING */
957       priv->blocked = FALSE;
958       JBUF_SIGNAL (priv);
959       JBUF_UNLOCK (priv);
960       break;
961     default:
962       break;
963   }
964
965   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
966
967   switch (transition) {
968     case GST_STATE_CHANGE_READY_TO_PAUSED:
969       /* we are a live element because we sync to the clock, which we can only
970        * do in the PLAYING state */
971       if (ret != GST_STATE_CHANGE_FAILURE)
972         ret = GST_STATE_CHANGE_NO_PREROLL;
973       break;
974     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
975       JBUF_LOCK (priv);
976       /* block to stop streaming when PAUSED */
977       priv->blocked = TRUE;
978       JBUF_UNLOCK (priv);
979       if (ret != GST_STATE_CHANGE_FAILURE)
980         ret = GST_STATE_CHANGE_NO_PREROLL;
981       break;
982     case GST_STATE_CHANGE_PAUSED_TO_READY:
983       gst_buffer_replace (&priv->last_sr, NULL);
984       break;
985     case GST_STATE_CHANGE_READY_TO_NULL:
986       break;
987     default:
988       break;
989   }
990
991   return ret;
992 }
993
994 static gboolean
995 gst_rtp_jitter_buffer_src_event (GstPad * pad, GstObject * parent,
996     GstEvent * event)
997 {
998   gboolean ret = TRUE;
999   GstRtpJitterBuffer *jitterbuffer;
1000   GstRtpJitterBufferPrivate *priv;
1001
1002   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1003   priv = jitterbuffer->priv;
1004
1005   GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
1006
1007   switch (GST_EVENT_TYPE (event)) {
1008     case GST_EVENT_LATENCY:
1009     {
1010       GstClockTime latency;
1011
1012       gst_event_parse_latency (event, &latency);
1013
1014       JBUF_LOCK (priv);
1015       /* adjust the overall buffer delay to the total pipeline latency in
1016        * buffering mode because if downstream consumes too fast (because of
1017        * large latency or queues, we would start rebuffering again. */
1018       if (rtp_jitter_buffer_get_mode (priv->jbuf) ==
1019           RTP_JITTER_BUFFER_MODE_BUFFER) {
1020         rtp_jitter_buffer_set_delay (priv->jbuf, latency);
1021       }
1022       JBUF_UNLOCK (priv);
1023
1024       ret = gst_pad_push_event (priv->sinkpad, event);
1025       break;
1026     }
1027     default:
1028       ret = gst_pad_push_event (priv->sinkpad, event);
1029       break;
1030   }
1031
1032   return ret;
1033 }
1034
1035 static gboolean
1036 gst_rtp_jitter_buffer_sink_event (GstPad * pad, GstObject * parent,
1037     GstEvent * event)
1038 {
1039   gboolean ret = TRUE;
1040   GstRtpJitterBuffer *jitterbuffer;
1041   GstRtpJitterBufferPrivate *priv;
1042
1043   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1044   priv = jitterbuffer->priv;
1045
1046   GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
1047
1048   switch (GST_EVENT_TYPE (event)) {
1049     case GST_EVENT_CAPS:
1050     {
1051       GstCaps *caps;
1052
1053       gst_event_parse_caps (event, &caps);
1054
1055       JBUF_LOCK (priv);
1056       ret = gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps);
1057       JBUF_UNLOCK (priv);
1058
1059       /* set same caps on srcpad on success */
1060       if (ret)
1061         ret = gst_pad_push_event (priv->srcpad, event);
1062       else
1063         gst_event_unref (event);
1064       break;
1065     }
1066     case GST_EVENT_SEGMENT:
1067     {
1068       gst_event_copy_segment (event, &priv->segment);
1069
1070       /* we need time for now */
1071       if (priv->segment.format != GST_FORMAT_TIME)
1072         goto newseg_wrong_format;
1073
1074       GST_DEBUG_OBJECT (jitterbuffer,
1075           "newsegment:  %" GST_SEGMENT_FORMAT, &priv->segment);
1076
1077       /* FIXME, push SEGMENT in the queue. Sorting order might be difficult. */
1078       ret = gst_pad_push_event (priv->srcpad, event);
1079       break;
1080     }
1081     case GST_EVENT_FLUSH_START:
1082       gst_rtp_jitter_buffer_flush_start (jitterbuffer);
1083       ret = gst_pad_push_event (priv->srcpad, event);
1084       break;
1085     case GST_EVENT_FLUSH_STOP:
1086       ret = gst_pad_push_event (priv->srcpad, event);
1087       ret =
1088           gst_rtp_jitter_buffer_src_activate_mode (priv->srcpad, parent,
1089           GST_PAD_MODE_PUSH, TRUE);
1090       break;
1091     case GST_EVENT_EOS:
1092     {
1093       /* push EOS in queue. We always push it at the head */
1094       JBUF_LOCK (priv);
1095       /* check for flushing, we need to discard the event and return FALSE when
1096        * we are flushing */
1097       ret = priv->srcresult == GST_FLOW_OK;
1098       if (ret && !priv->eos) {
1099         GST_INFO_OBJECT (jitterbuffer, "queuing EOS");
1100         priv->eos = TRUE;
1101         JBUF_SIGNAL (priv);
1102       } else if (priv->eos) {
1103         GST_DEBUG_OBJECT (jitterbuffer, "dropping EOS, we are already EOS");
1104       } else {
1105         GST_DEBUG_OBJECT (jitterbuffer, "dropping EOS, reason %s",
1106             gst_flow_get_name (priv->srcresult));
1107       }
1108       JBUF_UNLOCK (priv);
1109       gst_event_unref (event);
1110       break;
1111     }
1112     default:
1113       ret = gst_pad_push_event (priv->srcpad, event);
1114       break;
1115   }
1116
1117 done:
1118
1119   return ret;
1120
1121   /* ERRORS */
1122 newseg_wrong_format:
1123   {
1124     GST_DEBUG_OBJECT (jitterbuffer, "received non TIME newsegment");
1125     ret = FALSE;
1126     gst_event_unref (event);
1127     goto done;
1128   }
1129 }
1130
1131 static gboolean
1132 gst_rtp_jitter_buffer_sink_rtcp_event (GstPad * pad, GstObject * parent,
1133     GstEvent * event)
1134 {
1135   gboolean ret = TRUE;
1136   GstRtpJitterBuffer *jitterbuffer;
1137
1138   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1139
1140   GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
1141
1142   switch (GST_EVENT_TYPE (event)) {
1143     case GST_EVENT_FLUSH_START:
1144       gst_event_unref (event);
1145       break;
1146     case GST_EVENT_FLUSH_STOP:
1147       gst_event_unref (event);
1148       break;
1149     default:
1150       ret = gst_pad_event_default (pad, parent, event);
1151       break;
1152   }
1153
1154   return ret;
1155 }
1156
1157 /*
1158  * Must be called with JBUF_LOCK held, will release the LOCK when emiting the
1159  * signal. The function returns GST_FLOW_ERROR when a parsing error happened and
1160  * GST_FLOW_FLUSHING when the element is shutting down. On success
1161  * GST_FLOW_OK is returned.
1162  */
1163 static GstFlowReturn
1164 gst_rtp_jitter_buffer_get_clock_rate (GstRtpJitterBuffer * jitterbuffer,
1165     guint8 pt)
1166 {
1167   GValue ret = { 0 };
1168   GValue args[2] = { {0}, {0} };
1169   GstCaps *caps;
1170   gboolean res;
1171
1172   g_value_init (&args[0], GST_TYPE_ELEMENT);
1173   g_value_set_object (&args[0], jitterbuffer);
1174   g_value_init (&args[1], G_TYPE_UINT);
1175   g_value_set_uint (&args[1], pt);
1176
1177   g_value_init (&ret, GST_TYPE_CAPS);
1178   g_value_set_boxed (&ret, NULL);
1179
1180   JBUF_UNLOCK (jitterbuffer->priv);
1181   g_signal_emitv (args, gst_rtp_jitter_buffer_signals[SIGNAL_REQUEST_PT_MAP], 0,
1182       &ret);
1183   JBUF_LOCK_CHECK (jitterbuffer->priv, out_flushing);
1184
1185   g_value_unset (&args[0]);
1186   g_value_unset (&args[1]);
1187   caps = (GstCaps *) g_value_dup_boxed (&ret);
1188   g_value_unset (&ret);
1189   if (!caps)
1190     goto no_caps;
1191
1192   res = gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps);
1193   gst_caps_unref (caps);
1194
1195   if (G_UNLIKELY (!res))
1196     goto parse_failed;
1197
1198   return GST_FLOW_OK;
1199
1200   /* ERRORS */
1201 no_caps:
1202   {
1203     GST_DEBUG_OBJECT (jitterbuffer, "could not get caps");
1204     return GST_FLOW_ERROR;
1205   }
1206 out_flushing:
1207   {
1208     GST_DEBUG_OBJECT (jitterbuffer, "we are flushing");
1209     return GST_FLOW_FLUSHING;
1210   }
1211 parse_failed:
1212   {
1213     GST_DEBUG_OBJECT (jitterbuffer, "parse failed");
1214     return GST_FLOW_ERROR;
1215   }
1216 }
1217
1218 /* call with jbuf lock held */
1219 static void
1220 check_buffering_percent (GstRtpJitterBuffer * jitterbuffer, gint * percent)
1221 {
1222   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1223
1224   /* too short a stream, or too close to EOS will never really fill buffer */
1225   if (*percent != -1 && priv->npt_stop != -1 &&
1226       priv->npt_stop - priv->npt_start <=
1227       rtp_jitter_buffer_get_delay (priv->jbuf)) {
1228     GST_DEBUG_OBJECT (jitterbuffer, "short stream; faking full buffer");
1229     rtp_jitter_buffer_set_buffering (priv->jbuf, FALSE);
1230     *percent = 100;
1231   }
1232 }
1233
1234 static void
1235 post_buffering_percent (GstRtpJitterBuffer * jitterbuffer, gint percent)
1236 {
1237   GstMessage *message;
1238
1239   /* Post a buffering message */
1240   message = gst_message_new_buffering (GST_OBJECT_CAST (jitterbuffer), percent);
1241   gst_message_set_buffering_stats (message, GST_BUFFERING_LIVE, -1, -1, -1);
1242
1243   gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer), message);
1244 }
1245
1246 static GstFlowReturn
1247 gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent,
1248     GstBuffer * buffer)
1249 {
1250   GstRtpJitterBuffer *jitterbuffer;
1251   GstRtpJitterBufferPrivate *priv;
1252   guint16 seqnum;
1253   GstFlowReturn ret = GST_FLOW_OK;
1254   GstClockTime timestamp;
1255   guint64 latency_ts;
1256   gboolean tail;
1257   gint percent = -1;
1258   guint8 pt;
1259   GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
1260
1261   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1262
1263   priv = jitterbuffer->priv;
1264
1265   if (G_UNLIKELY (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp)))
1266     goto invalid_buffer;
1267
1268   pt = gst_rtp_buffer_get_payload_type (&rtp);
1269   seqnum = gst_rtp_buffer_get_seq (&rtp);
1270   gst_rtp_buffer_unmap (&rtp);
1271
1272   /* take the timestamp of the buffer. This is the time when the packet was
1273    * received and is used to calculate jitter and clock skew. We will adjust
1274    * this timestamp with the smoothed value after processing it in the
1275    * jitterbuffer. */
1276   timestamp = GST_BUFFER_TIMESTAMP (buffer);
1277   /* bring to running time */
1278   timestamp = gst_segment_to_running_time (&priv->segment, GST_FORMAT_TIME,
1279       timestamp);
1280
1281   GST_DEBUG_OBJECT (jitterbuffer,
1282       "Received packet #%d at time %" GST_TIME_FORMAT, seqnum,
1283       GST_TIME_ARGS (timestamp));
1284
1285   JBUF_LOCK_CHECK (priv, out_flushing);
1286
1287   if (G_UNLIKELY (priv->last_pt != pt)) {
1288     GstCaps *caps;
1289
1290     GST_DEBUG_OBJECT (jitterbuffer, "pt changed from %u to %u", priv->last_pt,
1291         pt);
1292
1293     priv->last_pt = pt;
1294     /* reset clock-rate so that we get a new one */
1295     priv->clock_rate = -1;
1296
1297     /* Try to get the clock-rate from the caps first if we can. If there are no
1298      * caps we must fire the signal to get the clock-rate. */
1299     if ((caps = gst_pad_get_current_caps (pad))) {
1300       gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps);
1301       gst_caps_unref (caps);
1302     }
1303   }
1304
1305   if (G_UNLIKELY (priv->clock_rate == -1)) {
1306     /* no clock rate given on the caps, try to get one with the signal */
1307     if (gst_rtp_jitter_buffer_get_clock_rate (jitterbuffer,
1308             pt) == GST_FLOW_FLUSHING)
1309       goto out_flushing;
1310
1311     if (G_UNLIKELY (priv->clock_rate == -1))
1312       goto no_clock_rate;
1313   }
1314
1315   /* don't accept more data on EOS */
1316   if (G_UNLIKELY (priv->eos))
1317     goto have_eos;
1318
1319   /* now check against our expected seqnum */
1320   if (G_LIKELY (priv->next_in_seqnum != -1)) {
1321     gint gap;
1322     gboolean reset = FALSE;
1323
1324     gap = gst_rtp_buffer_compare_seqnum (priv->next_in_seqnum, seqnum);
1325     if (G_UNLIKELY (gap != 0)) {
1326       GST_DEBUG_OBJECT (jitterbuffer, "expected #%d, got #%d, gap of %d",
1327           priv->next_in_seqnum, seqnum, gap);
1328       /* priv->next_in_seqnum >= seqnum, this packet is too late or the
1329        * sender might have been restarted with different seqnum. */
1330       if (gap < -RTP_MAX_MISORDER) {
1331         GST_DEBUG_OBJECT (jitterbuffer, "reset: buffer too old %d", gap);
1332         reset = TRUE;
1333       }
1334       /* priv->next_in_seqnum < seqnum, this is a new packet */
1335       else if (G_UNLIKELY (gap > RTP_MAX_DROPOUT)) {
1336         GST_DEBUG_OBJECT (jitterbuffer, "reset: too many dropped packets %d",
1337             gap);
1338         reset = TRUE;
1339       } else {
1340         GST_DEBUG_OBJECT (jitterbuffer, "tolerable gap");
1341       }
1342     }
1343     if (G_UNLIKELY (reset)) {
1344       GST_DEBUG_OBJECT (jitterbuffer, "flush and reset jitterbuffer");
1345       rtp_jitter_buffer_flush (priv->jbuf);
1346       rtp_jitter_buffer_reset_skew (priv->jbuf);
1347       priv->last_popped_seqnum = -1;
1348       priv->next_seqnum = seqnum;
1349     }
1350   }
1351   priv->next_in_seqnum = (seqnum + 1) & 0xffff;
1352
1353   /* let's check if this buffer is too late, we can only accept packets with
1354    * bigger seqnum than the one we last pushed. */
1355   if (G_LIKELY (priv->last_popped_seqnum != -1)) {
1356     gint gap;
1357
1358     gap = gst_rtp_buffer_compare_seqnum (priv->last_popped_seqnum, seqnum);
1359
1360     /* priv->last_popped_seqnum >= seqnum, we're too late. */
1361     if (G_UNLIKELY (gap <= 0))
1362       goto too_late;
1363   }
1364
1365   /* let's drop oldest packet if the queue is already full and drop-on-latency
1366    * is set. We can only do this when there actually is a latency. When no
1367    * latency is set, we just pump it in the queue and let the other end push it
1368    * out as fast as possible. */
1369   if (priv->latency_ms && priv->drop_on_latency) {
1370     latency_ts =
1371         gst_util_uint64_scale_int (priv->latency_ms, priv->clock_rate, 1000);
1372
1373     if (G_UNLIKELY (rtp_jitter_buffer_get_ts_diff (priv->jbuf) >= latency_ts)) {
1374       GstBuffer *old_buf;
1375
1376       old_buf = rtp_jitter_buffer_pop (priv->jbuf, &percent);
1377
1378       GST_DEBUG_OBJECT (jitterbuffer, "Queue full, dropping old packet %p",
1379           old_buf);
1380
1381       gst_buffer_unref (old_buf);
1382     }
1383   }
1384
1385   /* we need to make the metadata writable before pushing it in the jitterbuffer
1386    * because the jitterbuffer will update the timestamp */
1387   buffer = gst_buffer_make_writable (buffer);
1388
1389   /* now insert the packet into the queue in sorted order. This function returns
1390    * FALSE if a packet with the same seqnum was already in the queue, meaning we
1391    * have a duplicate. */
1392   if (G_UNLIKELY (!rtp_jitter_buffer_insert (priv->jbuf, buffer, timestamp,
1393               priv->clock_rate, &tail, &percent)))
1394     goto duplicate;
1395
1396   /* we had an unhandled SR, handle it now */
1397   if (priv->last_sr)
1398     do_handle_sync (jitterbuffer);
1399
1400   /* signal addition of new buffer when the _loop is waiting. */
1401   if (priv->waiting && priv->active)
1402     JBUF_SIGNAL (priv);
1403
1404   /* let's unschedule and unblock any waiting buffers. We only want to do this
1405    * when the tail buffer changed */
1406   if (G_UNLIKELY (priv->clock_id && tail)) {
1407     GST_DEBUG_OBJECT (jitterbuffer,
1408         "Unscheduling waiting buffer, new tail buffer");
1409     gst_clock_id_unschedule (priv->clock_id);
1410     priv->unscheduled = TRUE;
1411   }
1412
1413   GST_DEBUG_OBJECT (jitterbuffer, "Pushed packet #%d, now %d packets, tail: %d",
1414       seqnum, rtp_jitter_buffer_num_packets (priv->jbuf), tail);
1415
1416   check_buffering_percent (jitterbuffer, &percent);
1417
1418 finished:
1419   JBUF_UNLOCK (priv);
1420
1421   if (percent != -1)
1422     post_buffering_percent (jitterbuffer, percent);
1423
1424   return ret;
1425
1426   /* ERRORS */
1427 invalid_buffer:
1428   {
1429     /* this is not fatal but should be filtered earlier */
1430     GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
1431         ("Received invalid RTP payload, dropping"));
1432     gst_buffer_unref (buffer);
1433     return GST_FLOW_OK;
1434   }
1435 no_clock_rate:
1436   {
1437     GST_WARNING_OBJECT (jitterbuffer,
1438         "No clock-rate in caps!, dropping buffer");
1439     gst_buffer_unref (buffer);
1440     goto finished;
1441   }
1442 out_flushing:
1443   {
1444     ret = priv->srcresult;
1445     GST_DEBUG_OBJECT (jitterbuffer, "flushing %s", gst_flow_get_name (ret));
1446     gst_buffer_unref (buffer);
1447     goto finished;
1448   }
1449 have_eos:
1450   {
1451     ret = GST_FLOW_EOS;
1452     GST_WARNING_OBJECT (jitterbuffer, "we are EOS, refusing buffer");
1453     gst_buffer_unref (buffer);
1454     goto finished;
1455   }
1456 too_late:
1457   {
1458     GST_WARNING_OBJECT (jitterbuffer, "Packet #%d too late as #%d was already"
1459         " popped, dropping", seqnum, priv->last_popped_seqnum);
1460     priv->num_late++;
1461     gst_buffer_unref (buffer);
1462     goto finished;
1463   }
1464 duplicate:
1465   {
1466     GST_WARNING_OBJECT (jitterbuffer, "Duplicate packet #%d detected, dropping",
1467         seqnum);
1468     priv->num_duplicates++;
1469     gst_buffer_unref (buffer);
1470     goto finished;
1471   }
1472 }
1473
1474 static GstClockTime
1475 apply_offset (GstRtpJitterBuffer * jitterbuffer, GstClockTime timestamp)
1476 {
1477   GstRtpJitterBufferPrivate *priv;
1478
1479   priv = jitterbuffer->priv;
1480
1481   if (timestamp == -1)
1482     return -1;
1483
1484   /* apply the timestamp offset, this is used for inter stream sync */
1485   timestamp += priv->ts_offset;
1486   /* add the offset, this is used when buffering */
1487   timestamp += priv->out_offset;
1488
1489   return timestamp;
1490 }
1491
1492 static GstClockTime
1493 get_sync_time (GstRtpJitterBuffer * jitterbuffer, GstClockTime timestamp)
1494 {
1495   GstClockTime result;
1496   GstRtpJitterBufferPrivate *priv;
1497
1498   priv = jitterbuffer->priv;
1499
1500   result = timestamp + GST_ELEMENT_CAST (jitterbuffer)->base_time;
1501   /* add latency, this includes our own latency and the peer latency. */
1502   result += priv->latency_ns;
1503   result += priv->peer_latency;
1504
1505   return result;
1506 }
1507
1508 static gboolean
1509 eos_reached (GstClock * clock, GstClockTime time, GstClockID id,
1510     GstRtpJitterBuffer * jitterbuffer)
1511 {
1512   GstRtpJitterBufferPrivate *priv;
1513
1514   priv = jitterbuffer->priv;
1515
1516   JBUF_LOCK_CHECK (priv, flushing);
1517   if (priv->waiting) {
1518     GST_INFO_OBJECT (jitterbuffer, "got the NPT timeout");
1519     priv->reached_npt_stop = TRUE;
1520     JBUF_SIGNAL (priv);
1521   }
1522   JBUF_UNLOCK (priv);
1523
1524   return TRUE;
1525
1526   /* ERRORS */
1527 flushing:
1528   {
1529     JBUF_UNLOCK (priv);
1530     return FALSE;
1531   }
1532 }
1533
1534 static GstClockTime
1535 compute_elapsed (GstRtpJitterBuffer * jitterbuffer, GstBuffer * outbuf)
1536 {
1537   guint64 ext_time, elapsed;
1538   guint32 rtp_time;
1539   GstRtpJitterBufferPrivate *priv;
1540   GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
1541
1542   priv = jitterbuffer->priv;
1543   gst_rtp_buffer_map (outbuf, GST_MAP_READ, &rtp);
1544   rtp_time = gst_rtp_buffer_get_timestamp (&rtp);
1545   gst_rtp_buffer_unmap (&rtp);
1546
1547   GST_LOG_OBJECT (jitterbuffer, "rtp %" G_GUINT32_FORMAT ", ext %"
1548       G_GUINT64_FORMAT, rtp_time, priv->ext_timestamp);
1549
1550   if (rtp_time < priv->ext_timestamp) {
1551     ext_time = priv->ext_timestamp;
1552   } else {
1553     ext_time = gst_rtp_buffer_ext_timestamp (&priv->ext_timestamp, rtp_time);
1554   }
1555
1556   if (ext_time > priv->clock_base)
1557     elapsed = ext_time - priv->clock_base;
1558   else
1559     elapsed = 0;
1560
1561   elapsed = gst_util_uint64_scale_int (elapsed, GST_SECOND, priv->clock_rate);
1562   return elapsed;
1563 }
1564
1565 /*
1566  * This funcion will push out buffers on the source pad.
1567  *
1568  * For each pushed buffer, the seqnum is recorded, if the next buffer B has a
1569  * different seqnum (missing packets before B), this function will wait for the
1570  * missing packet to arrive up to the timestamp of buffer B.
1571  */
1572 static void
1573 gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer)
1574 {
1575   GstRtpJitterBufferPrivate *priv;
1576   GstBuffer *outbuf;
1577   GstFlowReturn result;
1578   guint16 seqnum;
1579   guint32 next_seqnum;
1580   GstClockTime timestamp, out_time;
1581   gboolean discont = FALSE;
1582   gint gap;
1583   GstClock *clock;
1584   GstClockID id;
1585   GstClockTime sync_time;
1586   gint percent = -1;
1587   GstRTPBuffer rtp = { NULL, };
1588
1589   priv = jitterbuffer->priv;
1590
1591   JBUF_LOCK_CHECK (priv, flushing);
1592 again:
1593   GST_DEBUG_OBJECT (jitterbuffer, "Peeking item");
1594   while (TRUE) {
1595     id = NULL;
1596     /* always wait if we are blocked */
1597     if (G_LIKELY (!priv->blocked)) {
1598       /* we're buffering but not EOS, wait. */
1599       if (!priv->eos && (!priv->active
1600               || rtp_jitter_buffer_is_buffering (priv->jbuf))) {
1601         GstClockTime elapsed, delay, left;
1602
1603         if (priv->estimated_eos == -1)
1604           goto do_wait;
1605
1606         outbuf = rtp_jitter_buffer_peek (priv->jbuf);
1607         if (outbuf != NULL) {
1608           elapsed = compute_elapsed (jitterbuffer, outbuf);
1609           if (GST_BUFFER_DURATION_IS_VALID (outbuf))
1610             elapsed += GST_BUFFER_DURATION (outbuf);
1611         } else {
1612           GST_INFO_OBJECT (jitterbuffer, "no buffer, using last_elapsed");
1613           elapsed = priv->last_elapsed;
1614         }
1615
1616         delay = rtp_jitter_buffer_get_delay (priv->jbuf);
1617
1618         if (priv->estimated_eos > elapsed)
1619           left = priv->estimated_eos - elapsed;
1620         else
1621           left = 0;
1622
1623         GST_INFO_OBJECT (jitterbuffer, "buffering, elapsed %" GST_TIME_FORMAT
1624             " estimated_eos %" GST_TIME_FORMAT " left %" GST_TIME_FORMAT
1625             " delay %" GST_TIME_FORMAT,
1626             GST_TIME_ARGS (elapsed), GST_TIME_ARGS (priv->estimated_eos),
1627             GST_TIME_ARGS (left), GST_TIME_ARGS (delay));
1628         if (left > delay)
1629           goto do_wait;
1630       }
1631       /* if we have a packet, we can exit the loop and grab it */
1632       if (rtp_jitter_buffer_num_packets (priv->jbuf) > 0)
1633         break;
1634       /* no packets but we are EOS, do eos logic */
1635       if (G_UNLIKELY (priv->eos))
1636         goto do_eos;
1637       /* underrun, wait for packets or flushing now if we are expecting an EOS
1638        * timeout, set the async timer for it too */
1639       if (priv->estimated_eos != -1 && !priv->reached_npt_stop) {
1640         sync_time = get_sync_time (jitterbuffer, priv->estimated_eos);
1641
1642         GST_OBJECT_LOCK (jitterbuffer);
1643         clock = GST_ELEMENT_CLOCK (jitterbuffer);
1644         if (clock) {
1645           GST_INFO_OBJECT (jitterbuffer, "scheduling timeout");
1646           id = gst_clock_new_single_shot_id (clock, sync_time);
1647           gst_clock_id_wait_async (id, (GstClockCallback) eos_reached,
1648               jitterbuffer, NULL);
1649         }
1650         GST_OBJECT_UNLOCK (jitterbuffer);
1651       }
1652     }
1653   do_wait:
1654     /* now we wait */
1655     GST_DEBUG_OBJECT (jitterbuffer, "waiting");
1656     priv->waiting = TRUE;
1657     JBUF_WAIT (priv);
1658     priv->waiting = FALSE;
1659     GST_DEBUG_OBJECT (jitterbuffer, "waiting done");
1660
1661     if (id) {
1662       /* unschedule any pending async notifications we might have */
1663       gst_clock_id_unschedule (id);
1664       gst_clock_id_unref (id);
1665     }
1666     if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK))
1667       goto flushing;
1668
1669     if (id && priv->reached_npt_stop) {
1670       goto do_npt_stop;
1671     }
1672   }
1673
1674   /* peek a buffer, we're just looking at the timestamp and the sequence number.
1675    * If all is fine, we'll pop and push it. If the sequence number is wrong we
1676    * wait on the timestamp. In the chain function we will unlock the wait when a
1677    * new buffer is available. The peeked buffer is valid for as long as we hold
1678    * the jitterbuffer lock. */
1679   outbuf = rtp_jitter_buffer_peek (priv->jbuf);
1680
1681   /* get the seqnum and the next expected seqnum */
1682   gst_rtp_buffer_map (outbuf, GST_MAP_READ, &rtp);
1683   seqnum = gst_rtp_buffer_get_seq (&rtp);
1684   gst_rtp_buffer_unmap (&rtp);
1685   next_seqnum = priv->next_seqnum;
1686
1687   /* get the timestamp, this is already corrected for clock skew by the
1688    * jitterbuffer */
1689   timestamp = GST_BUFFER_TIMESTAMP (outbuf);
1690
1691   GST_DEBUG_OBJECT (jitterbuffer,
1692       "Peeked buffer #%d, expect #%d, timestamp %" GST_TIME_FORMAT
1693       ", now %d left", seqnum, next_seqnum, GST_TIME_ARGS (timestamp),
1694       rtp_jitter_buffer_num_packets (priv->jbuf));
1695
1696   /* apply our timestamp offset to the incomming buffer, this will be our output
1697    * timestamp. */
1698   out_time = apply_offset (jitterbuffer, timestamp);
1699
1700   /* get the gap between this and the previous packet. If we don't know the
1701    * previous packet seqnum assume no gap. */
1702   if (G_LIKELY (next_seqnum != -1)) {
1703     gap = gst_rtp_buffer_compare_seqnum (next_seqnum, seqnum);
1704
1705     /* if we have a packet that we already pushed or considered dropped, pop it
1706      * off and get the next packet */
1707     if (G_UNLIKELY (gap < 0)) {
1708       GST_DEBUG_OBJECT (jitterbuffer, "Old packet #%d, next #%d dropping",
1709           seqnum, next_seqnum);
1710       outbuf = rtp_jitter_buffer_pop (priv->jbuf, &percent);
1711       gst_buffer_unref (outbuf);
1712       goto again;
1713     }
1714   } else {
1715     GST_DEBUG_OBJECT (jitterbuffer, "no next seqnum known, first packet");
1716     gap = -1;
1717   }
1718
1719   /* If we don't know what the next seqnum should be (== -1) we have to wait
1720    * because it might be possible that we are not receiving this buffer in-order,
1721    * a buffer with a lower seqnum could arrive later and we want to push that
1722    * earlier buffer before this buffer then.
1723    * If we know the expected seqnum, we can compare it to the current seqnum to
1724    * determine if we have missing a packet. If we have a missing packet (which
1725    * must be before this packet) we can wait for it until the deadline for this
1726    * packet expires. */
1727   if (G_UNLIKELY (gap != 0 && out_time != -1)) {
1728     GstClockReturn ret;
1729     GstClockTime duration = GST_CLOCK_TIME_NONE;
1730     GstClockTimeDiff clock_jitter;
1731     guint32 lost_packets = 1;
1732     gboolean lost_packets_late = FALSE;
1733
1734     if (gap > 0) {
1735       /* we have a gap */
1736       GST_DEBUG_OBJECT (jitterbuffer,
1737           "Sequence number GAP detected: expected %d instead of %d (%d missing)",
1738           next_seqnum, seqnum, gap);
1739
1740       if (priv->last_out_time != -1) {
1741         GST_DEBUG_OBJECT (jitterbuffer,
1742             "out_time %" GST_TIME_FORMAT ", last %" GST_TIME_FORMAT,
1743             GST_TIME_ARGS (out_time), GST_TIME_ARGS (priv->last_out_time));
1744         /* interpolate between the current time and the last time based on
1745          * number of packets we are missing, this is the estimated duration
1746          * for the missing packet based on equidistant packet spacing. Also make
1747          * sure we never go negative. */
1748         if (out_time >= priv->last_out_time)
1749           duration = (out_time - priv->last_out_time) / (gap + 1);
1750         else
1751           goto lost;
1752
1753         GST_DEBUG_OBJECT (jitterbuffer, "duration %" GST_TIME_FORMAT,
1754             GST_TIME_ARGS (duration));
1755         /* add this duration to the timestamp of the last packet we pushed */
1756         out_time = (priv->last_out_time + duration);
1757       }
1758     } else {
1759       /* we don't know what the next_seqnum should be, wait for the last
1760        * possible moment to push this buffer, maybe we get an earlier seqnum
1761        * while we wait */
1762       GST_DEBUG_OBJECT (jitterbuffer, "First buffer %d, do sync", seqnum);
1763     }
1764
1765     GST_OBJECT_LOCK (jitterbuffer);
1766     clock = GST_ELEMENT_CLOCK (jitterbuffer);
1767     if (!clock) {
1768       GST_OBJECT_UNLOCK (jitterbuffer);
1769       /* let's just push if there is no clock */
1770       GST_DEBUG_OBJECT (jitterbuffer, "No clock, push right away");
1771       goto push_buffer;
1772     }
1773
1774     /* prepare for sync against clock */
1775     sync_time = get_sync_time (jitterbuffer, out_time);
1776
1777     GST_DEBUG_OBJECT (jitterbuffer, "sync to timestamp %" GST_TIME_FORMAT
1778         " with sync time %" GST_TIME_FORMAT,
1779         GST_TIME_ARGS (out_time), GST_TIME_ARGS (sync_time));
1780
1781     /* create an entry for the clock */
1782     id = priv->clock_id = gst_clock_new_single_shot_id (clock, sync_time);
1783     priv->unscheduled = FALSE;
1784     GST_OBJECT_UNLOCK (jitterbuffer);
1785
1786     /* release the lock so that the other end can push stuff or unlock */
1787     JBUF_UNLOCK (priv);
1788
1789     ret = gst_clock_id_wait (id, &clock_jitter);
1790
1791     if (ret == GST_CLOCK_EARLY && gap > 0
1792         && clock_jitter > (priv->latency_ns + priv->peer_latency)) {
1793       GstClockTimeDiff total_duration;
1794       GstClockTime out_time_diff;
1795
1796       out_time_diff = apply_offset (jitterbuffer, timestamp) - out_time;
1797       total_duration = MIN (out_time_diff, clock_jitter);
1798
1799       if (duration > 0)
1800         lost_packets = total_duration / duration;
1801       else
1802         lost_packets = gap;
1803       total_duration = lost_packets * duration;
1804
1805       GST_DEBUG_OBJECT (jitterbuffer,
1806           "Current sync_time has expired a long time ago (+%" GST_TIME_FORMAT
1807           ") Cover up %d lost packets with duration %" GST_TIME_FORMAT,
1808           GST_TIME_ARGS (clock_jitter),
1809           lost_packets, GST_TIME_ARGS (total_duration));
1810
1811       duration = total_duration;
1812       lost_packets_late = TRUE;
1813     }
1814
1815     JBUF_LOCK (priv);
1816     /* and free the entry */
1817     gst_clock_id_unref (id);
1818     priv->clock_id = NULL;
1819
1820     /* at this point, the clock could have been unlocked by a timeout, a new
1821      * tail element was added to the queue or because we are shutting down. Check
1822      * for shutdown first. */
1823     if G_UNLIKELY
1824       ((priv->srcresult != GST_FLOW_OK))
1825           goto flushing;
1826
1827     /* if we got unscheduled and we are not flushing, it's because a new tail
1828      * element became available in the queue or we flushed the queue.
1829      * Grab it and try to push or sync. */
1830     if (ret == GST_CLOCK_UNSCHEDULED || priv->unscheduled) {
1831       GST_DEBUG_OBJECT (jitterbuffer,
1832           "Wait got unscheduled, will retry to push with new buffer");
1833       goto again;
1834     }
1835
1836   lost:
1837     /* we now timed out, this means we lost a packet or finished synchronizing
1838      * on the first buffer. */
1839     if (gap > 0) {
1840       GstEvent *event;
1841
1842       /* we had a gap and thus we lost some packets. Create an event for this.  */
1843       if (lost_packets > 1)
1844         GST_DEBUG_OBJECT (jitterbuffer, "Packets #%d -> #%d lost", next_seqnum,
1845             next_seqnum + lost_packets - 1);
1846       else
1847         GST_DEBUG_OBJECT (jitterbuffer, "Packet #%d lost", next_seqnum);
1848
1849       priv->num_late += lost_packets;
1850       discont = TRUE;
1851
1852       /* update our expected next packet */
1853       priv->last_popped_seqnum = next_seqnum;
1854       priv->last_out_time += duration;
1855       priv->next_seqnum = (next_seqnum + lost_packets) & 0xffff;
1856
1857       if (priv->do_lost) {
1858         /* create paket lost event */
1859         event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM,
1860             gst_structure_new ("GstRTPPacketLost",
1861                 "seqnum", G_TYPE_UINT, (guint) next_seqnum,
1862                 "timestamp", G_TYPE_UINT64, out_time,
1863                 "duration", G_TYPE_UINT64, duration,
1864                 "late", G_TYPE_BOOLEAN, lost_packets_late, NULL));
1865         JBUF_UNLOCK (priv);
1866         gst_pad_push_event (priv->srcpad, event);
1867         JBUF_LOCK_CHECK (priv, flushing);
1868       }
1869       /* look for next packet */
1870       goto again;
1871     }
1872
1873     /* there was no known gap,just the first packet, exit the loop and push */
1874     GST_DEBUG_OBJECT (jitterbuffer, "First packet #%d synced", seqnum);
1875
1876     /* get new timestamp, latency might have changed */
1877     out_time = apply_offset (jitterbuffer, timestamp);
1878   }
1879 push_buffer:
1880
1881   /* when we get here we are ready to pop and push the buffer */
1882   outbuf = rtp_jitter_buffer_pop (priv->jbuf, &percent);
1883
1884   check_buffering_percent (jitterbuffer, &percent);
1885
1886   if (G_UNLIKELY (discont || priv->discont)) {
1887     /* set DISCONT flag when we missed a packet. We pushed the buffer writable
1888      * into the jitterbuffer so we can modify now. */
1889     GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_DISCONT);
1890     priv->discont = FALSE;
1891   }
1892   if (G_UNLIKELY (priv->ts_discont)) {
1893     GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_RESYNC);
1894     priv->ts_discont = FALSE;
1895   }
1896
1897   /* apply timestamp with offset to buffer now */
1898   GST_BUFFER_PTS (outbuf) = out_time;
1899   GST_BUFFER_DTS (outbuf) = out_time;
1900
1901   /* update the elapsed time when we need to check against the npt stop time. */
1902   if (priv->npt_stop != -1 && priv->ext_timestamp != -1
1903       && priv->clock_base != -1 && priv->clock_rate > 0) {
1904     guint64 elapsed, estimated;
1905
1906     elapsed = compute_elapsed (jitterbuffer, outbuf);
1907
1908     if (elapsed > priv->last_elapsed || !priv->last_elapsed) {
1909       guint64 left;
1910
1911       priv->last_elapsed = elapsed;
1912
1913       left = priv->npt_stop - priv->npt_start;
1914       GST_LOG_OBJECT (jitterbuffer, "left %" GST_TIME_FORMAT,
1915           GST_TIME_ARGS (left));
1916
1917       if (elapsed > 0)
1918         estimated = gst_util_uint64_scale (out_time, left, elapsed);
1919       else {
1920         /* if there is almost nothing left,
1921          * we may never advance enough to end up in the above case */
1922         if (left < GST_SECOND)
1923           estimated = GST_SECOND;
1924         else
1925           estimated = -1;
1926       }
1927
1928       GST_LOG_OBJECT (jitterbuffer, "elapsed %" GST_TIME_FORMAT ", estimated %"
1929           GST_TIME_FORMAT, GST_TIME_ARGS (elapsed), GST_TIME_ARGS (estimated));
1930
1931       priv->estimated_eos = estimated;
1932     }
1933   }
1934
1935   /* now we are ready to push the buffer. Save the seqnum and release the lock
1936    * so the other end can push stuff in the queue again. */
1937   priv->last_popped_seqnum = seqnum;
1938   priv->last_out_time = out_time;
1939   priv->next_seqnum = (seqnum + 1) & 0xffff;
1940   JBUF_UNLOCK (priv);
1941
1942   if (percent != -1)
1943     post_buffering_percent (jitterbuffer, percent);
1944
1945   /* push buffer */
1946   GST_DEBUG_OBJECT (jitterbuffer,
1947       "Pushing buffer %d, timestamp %" GST_TIME_FORMAT, seqnum,
1948       GST_TIME_ARGS (out_time));
1949   result = gst_pad_push (priv->srcpad, outbuf);
1950   if (G_UNLIKELY (result != GST_FLOW_OK))
1951     goto pause;
1952
1953   return;
1954
1955   /* ERRORS */
1956 do_eos:
1957   {
1958     /* store result, we are flushing now */
1959     GST_DEBUG_OBJECT (jitterbuffer, "We are EOS, pushing EOS downstream");
1960     priv->srcresult = GST_FLOW_EOS;
1961     gst_pad_pause_task (priv->srcpad);
1962     JBUF_UNLOCK (priv);
1963     gst_pad_push_event (priv->srcpad, gst_event_new_eos ());
1964     return;
1965   }
1966 do_npt_stop:
1967   {
1968     /* store result, we are flushing now */
1969     GST_DEBUG_OBJECT (jitterbuffer, "We reached the NPT stop");
1970     JBUF_UNLOCK (priv);
1971
1972     g_signal_emit (jitterbuffer,
1973         gst_rtp_jitter_buffer_signals[SIGNAL_ON_NPT_STOP], 0, NULL);
1974     return;
1975   }
1976 flushing:
1977   {
1978     GST_DEBUG_OBJECT (jitterbuffer, "we are flushing");
1979     gst_pad_pause_task (priv->srcpad);
1980     JBUF_UNLOCK (priv);
1981     return;
1982   }
1983 pause:
1984   {
1985     GST_DEBUG_OBJECT (jitterbuffer, "pausing task, reason %s",
1986         gst_flow_get_name (result));
1987
1988     JBUF_LOCK (priv);
1989     /* store result */
1990     priv->srcresult = result;
1991     /* we don't post errors or anything because upstream will do that for us
1992      * when we pass the return value upstream. */
1993     gst_pad_pause_task (priv->srcpad);
1994     JBUF_UNLOCK (priv);
1995     return;
1996   }
1997 }
1998
1999 /* collect the info form the lastest RTCP packet and the jittebuffer sync, do
2000  * some sanity checks and then emit the handle-sync signal with the parameters.
2001  * This function must be called with the LOCK */
2002 static void
2003 do_handle_sync (GstRtpJitterBuffer * jitterbuffer)
2004 {
2005   GstRtpJitterBufferPrivate *priv;
2006   guint64 base_rtptime, base_time;
2007   guint32 clock_rate;
2008   guint64 last_rtptime;
2009   guint64 clock_base;
2010   guint64 ext_rtptime, diff;
2011   gboolean drop = FALSE;
2012
2013   priv = jitterbuffer->priv;
2014
2015   /* get the last values from the jitterbuffer */
2016   rtp_jitter_buffer_get_sync (priv->jbuf, &base_rtptime, &base_time,
2017       &clock_rate, &last_rtptime);
2018
2019   clock_base = priv->clock_base;
2020   ext_rtptime = priv->ext_rtptime;
2021
2022   GST_DEBUG_OBJECT (jitterbuffer, "ext SR %" G_GUINT64_FORMAT ", base %"
2023       G_GUINT64_FORMAT ", clock-rate %" G_GUINT32_FORMAT
2024       ", clock-base %" G_GUINT64_FORMAT ", last-rtptime %" G_GUINT64_FORMAT,
2025       ext_rtptime, base_rtptime, clock_rate, clock_base, last_rtptime);
2026
2027   if (base_rtptime == -1 || clock_rate == -1 || base_time == -1) {
2028     GST_DEBUG_OBJECT (jitterbuffer, "dropping, no RTP values");
2029     drop = TRUE;
2030   } else {
2031     /* we can't accept anything that happened before we did the last resync */
2032     if (base_rtptime > ext_rtptime) {
2033       GST_DEBUG_OBJECT (jitterbuffer, "dropping, older than base time");
2034       drop = TRUE;
2035     } else {
2036       /* the SR RTP timestamp must be something close to what we last observed
2037        * in the jitterbuffer */
2038       if (ext_rtptime > last_rtptime) {
2039         /* check how far ahead it is to our RTP timestamps */
2040         diff = ext_rtptime - last_rtptime;
2041         /* if bigger than 1 second, we drop it */
2042         if (diff > clock_rate) {
2043           GST_DEBUG_OBJECT (jitterbuffer, "too far ahead");
2044           /* should drop this, but some RTSP servers end up with bogus
2045            * way too ahead RTCP packet when repeated PAUSE/PLAY,
2046            * so still trigger rptbin sync but invalidate RTCP data
2047            * (sync might use other methods) */
2048           ext_rtptime = -1;
2049         }
2050         GST_DEBUG_OBJECT (jitterbuffer, "ext last %" G_GUINT64_FORMAT ", diff %"
2051             G_GUINT64_FORMAT, last_rtptime, diff);
2052       }
2053     }
2054   }
2055
2056   if (!drop) {
2057     GstStructure *s;
2058
2059     s = gst_structure_new ("application/x-rtp-sync",
2060         "base-rtptime", G_TYPE_UINT64, base_rtptime,
2061         "base-time", G_TYPE_UINT64, base_time,
2062         "clock-rate", G_TYPE_UINT, clock_rate,
2063         "clock-base", G_TYPE_UINT64, clock_base,
2064         "sr-ext-rtptime", G_TYPE_UINT64, ext_rtptime,
2065         "sr-buffer", GST_TYPE_BUFFER, priv->last_sr, NULL);
2066
2067     GST_DEBUG_OBJECT (jitterbuffer, "signaling sync");
2068     gst_buffer_replace (&priv->last_sr, NULL);
2069     JBUF_UNLOCK (priv);
2070     g_signal_emit (jitterbuffer,
2071         gst_rtp_jitter_buffer_signals[SIGNAL_HANDLE_SYNC], 0, s);
2072     JBUF_LOCK (priv);
2073     gst_structure_free (s);
2074   } else {
2075     GST_DEBUG_OBJECT (jitterbuffer, "dropping RTCP packet");
2076   }
2077 }
2078
2079 static GstFlowReturn
2080 gst_rtp_jitter_buffer_chain_rtcp (GstPad * pad, GstObject * parent,
2081     GstBuffer * buffer)
2082 {
2083   GstRtpJitterBuffer *jitterbuffer;
2084   GstRtpJitterBufferPrivate *priv;
2085   GstFlowReturn ret = GST_FLOW_OK;
2086   guint32 ssrc;
2087   GstRTCPPacket packet;
2088   guint64 ext_rtptime;
2089   guint32 rtptime;
2090   GstRTCPBuffer rtcp = { NULL, };
2091
2092   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
2093
2094   if (G_UNLIKELY (!gst_rtcp_buffer_validate (buffer)))
2095     goto invalid_buffer;
2096
2097   priv = jitterbuffer->priv;
2098
2099   gst_rtcp_buffer_map (buffer, GST_MAP_READ, &rtcp);
2100
2101   if (!gst_rtcp_buffer_get_first_packet (&rtcp, &packet))
2102     goto empty_buffer;
2103
2104   /* first packet must be SR or RR or else the validate would have failed */
2105   switch (gst_rtcp_packet_get_type (&packet)) {
2106     case GST_RTCP_TYPE_SR:
2107       gst_rtcp_packet_sr_get_sender_info (&packet, &ssrc, NULL, &rtptime,
2108           NULL, NULL);
2109       break;
2110     default:
2111       goto ignore_buffer;
2112   }
2113   gst_rtcp_buffer_unmap (&rtcp);
2114
2115   GST_DEBUG_OBJECT (jitterbuffer, "received RTCP of SSRC %08x", ssrc);
2116
2117   JBUF_LOCK (priv);
2118   /* convert the RTP timestamp to our extended timestamp, using the same offset
2119    * we used in the jitterbuffer */
2120   ext_rtptime = priv->jbuf->ext_rtptime;
2121   ext_rtptime = gst_rtp_buffer_ext_timestamp (&ext_rtptime, rtptime);
2122
2123   priv->ext_rtptime = ext_rtptime;
2124   gst_buffer_replace (&priv->last_sr, buffer);
2125
2126   do_handle_sync (jitterbuffer);
2127   JBUF_UNLOCK (priv);
2128
2129 done:
2130   gst_buffer_unref (buffer);
2131
2132   return ret;
2133
2134 invalid_buffer:
2135   {
2136     /* this is not fatal but should be filtered earlier */
2137     GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
2138         ("Received invalid RTCP payload, dropping"));
2139     ret = GST_FLOW_OK;
2140     goto done;
2141   }
2142 empty_buffer:
2143   {
2144     /* this is not fatal but should be filtered earlier */
2145     GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
2146         ("Received empty RTCP payload, dropping"));
2147     gst_rtcp_buffer_unmap (&rtcp);
2148     ret = GST_FLOW_OK;
2149     goto done;
2150   }
2151 ignore_buffer:
2152   {
2153     GST_DEBUG_OBJECT (jitterbuffer, "ignoring RTCP packet");
2154     gst_rtcp_buffer_unmap (&rtcp);
2155     ret = GST_FLOW_OK;
2156     goto done;
2157   }
2158 }
2159
2160 static gboolean
2161 gst_rtp_jitter_buffer_sink_query (GstPad * pad, GstObject * parent,
2162     GstQuery * query)
2163 {
2164   gboolean res = FALSE;
2165
2166   switch (GST_QUERY_TYPE (query)) {
2167     case GST_QUERY_CAPS:
2168     {
2169       GstCaps *filter, *caps;
2170
2171       gst_query_parse_caps (query, &filter);
2172       caps = gst_rtp_jitter_buffer_getcaps (pad, filter);
2173       gst_query_set_caps_result (query, caps);
2174       gst_caps_unref (caps);
2175       res = TRUE;
2176       break;
2177     }
2178     default:
2179       if (GST_QUERY_IS_SERIALIZED (query)) {
2180         GST_WARNING_OBJECT (pad, "unhandled serialized query");
2181         res = FALSE;
2182       } else {
2183         res = gst_pad_query_default (pad, parent, query);
2184       }
2185       break;
2186   }
2187   return res;
2188 }
2189
2190 static gboolean
2191 gst_rtp_jitter_buffer_src_query (GstPad * pad, GstObject * parent,
2192     GstQuery * query)
2193 {
2194   GstRtpJitterBuffer *jitterbuffer;
2195   GstRtpJitterBufferPrivate *priv;
2196   gboolean res = FALSE;
2197
2198   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
2199   priv = jitterbuffer->priv;
2200
2201   switch (GST_QUERY_TYPE (query)) {
2202     case GST_QUERY_LATENCY:
2203     {
2204       /* We need to send the query upstream and add the returned latency to our
2205        * own */
2206       GstClockTime min_latency, max_latency;
2207       gboolean us_live;
2208       GstClockTime our_latency;
2209
2210       if ((res = gst_pad_peer_query (priv->sinkpad, query))) {
2211         gst_query_parse_latency (query, &us_live, &min_latency, &max_latency);
2212
2213         GST_DEBUG_OBJECT (jitterbuffer, "Peer latency: min %"
2214             GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
2215             GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
2216
2217         /* store this so that we can safely sync on the peer buffers. */
2218         JBUF_LOCK (priv);
2219         priv->peer_latency = min_latency;
2220         our_latency = priv->latency_ns;
2221         JBUF_UNLOCK (priv);
2222
2223         GST_DEBUG_OBJECT (jitterbuffer, "Our latency: %" GST_TIME_FORMAT,
2224             GST_TIME_ARGS (our_latency));
2225
2226         /* we add some latency but can buffer an infinite amount of time */
2227         min_latency += our_latency;
2228         max_latency = -1;
2229
2230         GST_DEBUG_OBJECT (jitterbuffer, "Calculated total latency : min %"
2231             GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
2232             GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
2233
2234         gst_query_set_latency (query, TRUE, min_latency, max_latency);
2235       }
2236       break;
2237     }
2238     case GST_QUERY_POSITION:
2239     {
2240       GstClockTime start, last_out;
2241       GstFormat fmt;
2242
2243       gst_query_parse_position (query, &fmt, NULL);
2244       if (fmt != GST_FORMAT_TIME) {
2245         res = gst_pad_query_default (pad, parent, query);
2246         break;
2247       }
2248
2249       JBUF_LOCK (priv);
2250       start = priv->npt_start;
2251       last_out = priv->last_out_time;
2252       JBUF_UNLOCK (priv);
2253
2254       GST_DEBUG_OBJECT (jitterbuffer, "npt start %" GST_TIME_FORMAT
2255           ", last out %" GST_TIME_FORMAT, GST_TIME_ARGS (start),
2256           GST_TIME_ARGS (last_out));
2257
2258       if (GST_CLOCK_TIME_IS_VALID (start) && GST_CLOCK_TIME_IS_VALID (last_out)) {
2259         /* bring 0-based outgoing time to stream time */
2260         gst_query_set_position (query, GST_FORMAT_TIME, start + last_out);
2261         res = TRUE;
2262       } else {
2263         res = gst_pad_query_default (pad, parent, query);
2264       }
2265       break;
2266     }
2267     case GST_QUERY_CAPS:
2268     {
2269       GstCaps *filter, *caps;
2270
2271       gst_query_parse_caps (query, &filter);
2272       caps = gst_rtp_jitter_buffer_getcaps (pad, filter);
2273       gst_query_set_caps_result (query, caps);
2274       gst_caps_unref (caps);
2275       res = TRUE;
2276       break;
2277     }
2278     default:
2279       res = gst_pad_query_default (pad, parent, query);
2280       break;
2281   }
2282
2283   return res;
2284 }
2285
2286 static void
2287 gst_rtp_jitter_buffer_set_property (GObject * object,
2288     guint prop_id, const GValue * value, GParamSpec * pspec)
2289 {
2290   GstRtpJitterBuffer *jitterbuffer;
2291   GstRtpJitterBufferPrivate *priv;
2292
2293   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
2294   priv = jitterbuffer->priv;
2295
2296   switch (prop_id) {
2297     case PROP_LATENCY:
2298     {
2299       guint new_latency, old_latency;
2300
2301       new_latency = g_value_get_uint (value);
2302
2303       JBUF_LOCK (priv);
2304       old_latency = priv->latency_ms;
2305       priv->latency_ms = new_latency;
2306       priv->latency_ns = priv->latency_ms * GST_MSECOND;
2307       rtp_jitter_buffer_set_delay (priv->jbuf, priv->latency_ns);
2308       JBUF_UNLOCK (priv);
2309
2310       /* post message if latency changed, this will inform the parent pipeline
2311        * that a latency reconfiguration is possible/needed. */
2312       if (new_latency != old_latency) {
2313         GST_DEBUG_OBJECT (jitterbuffer, "latency changed to: %" GST_TIME_FORMAT,
2314             GST_TIME_ARGS (new_latency * GST_MSECOND));
2315
2316         gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer),
2317             gst_message_new_latency (GST_OBJECT_CAST (jitterbuffer)));
2318       }
2319       break;
2320     }
2321     case PROP_DROP_ON_LATENCY:
2322       JBUF_LOCK (priv);
2323       priv->drop_on_latency = g_value_get_boolean (value);
2324       JBUF_UNLOCK (priv);
2325       break;
2326     case PROP_TS_OFFSET:
2327       JBUF_LOCK (priv);
2328       priv->ts_offset = g_value_get_int64 (value);
2329       priv->ts_discont = TRUE;
2330       JBUF_UNLOCK (priv);
2331       break;
2332     case PROP_DO_LOST:
2333       JBUF_LOCK (priv);
2334       priv->do_lost = g_value_get_boolean (value);
2335       JBUF_UNLOCK (priv);
2336       break;
2337     case PROP_MODE:
2338       JBUF_LOCK (priv);
2339       rtp_jitter_buffer_set_mode (priv->jbuf, g_value_get_enum (value));
2340       JBUF_UNLOCK (priv);
2341       break;
2342     default:
2343       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2344       break;
2345   }
2346 }
2347
2348 static void
2349 gst_rtp_jitter_buffer_get_property (GObject * object,
2350     guint prop_id, GValue * value, GParamSpec * pspec)
2351 {
2352   GstRtpJitterBuffer *jitterbuffer;
2353   GstRtpJitterBufferPrivate *priv;
2354
2355   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
2356   priv = jitterbuffer->priv;
2357
2358   switch (prop_id) {
2359     case PROP_LATENCY:
2360       JBUF_LOCK (priv);
2361       g_value_set_uint (value, priv->latency_ms);
2362       JBUF_UNLOCK (priv);
2363       break;
2364     case PROP_DROP_ON_LATENCY:
2365       JBUF_LOCK (priv);
2366       g_value_set_boolean (value, priv->drop_on_latency);
2367       JBUF_UNLOCK (priv);
2368       break;
2369     case PROP_TS_OFFSET:
2370       JBUF_LOCK (priv);
2371       g_value_set_int64 (value, priv->ts_offset);
2372       JBUF_UNLOCK (priv);
2373       break;
2374     case PROP_DO_LOST:
2375       JBUF_LOCK (priv);
2376       g_value_set_boolean (value, priv->do_lost);
2377       JBUF_UNLOCK (priv);
2378       break;
2379     case PROP_MODE:
2380       JBUF_LOCK (priv);
2381       g_value_set_enum (value, rtp_jitter_buffer_get_mode (priv->jbuf));
2382       JBUF_UNLOCK (priv);
2383       break;
2384     case PROP_PERCENT:
2385     {
2386       gint percent;
2387
2388       JBUF_LOCK (priv);
2389       if (priv->srcresult != GST_FLOW_OK)
2390         percent = 100;
2391       else
2392         percent = rtp_jitter_buffer_get_percent (priv->jbuf);
2393
2394       g_value_set_int (value, percent);
2395       JBUF_UNLOCK (priv);
2396       break;
2397     }
2398     default:
2399       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2400       break;
2401   }
2402 }