gst/rtpmanager/gstrtpbin.c: Ref caps when inserting into the cache.
[platform/upstream/gst-plugins-good.git] / gst / rtpmanager / gstrtpjitterbuffer.c
1 /*
2  * Farsight Voice+Video library
3  *
4  *  Copyright 2007 Collabora Ltd, 
5  *  Copyright 2007 Nokia Corporation
6  *   @author: Philippe Kalaf <philippe.kalaf@collabora.co.uk>.
7  *  Copyright 2007 Wim Taymans <wim.taymans@gmail.com>
8  *
9  * This library is free software; you can redistribute it and/or
10  * modify it under the terms of the GNU Library General Public
11  * License as published by the Free Software Foundation; either
12  * version 2 of the License, or (at your option) any later version.
13  *
14  * This library is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17  * Library General Public License for more details.
18  *
19  * You should have received a copy of the GNU Library General Public
20  * License along with this library; if not, write to the
21  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
22  * Boston, MA 02111-1307, USA.
23  *
24  */
25
26 /**
27  * SECTION:element-gstrtpjitterbuffer
28  * @short_description: buffer, reorder and remove duplicate RTP packets to
29  * compensate for network oddities.
30  *
31  * <refsect2>
32  * <para>
33  * This element reorders and removes duplicate RTP packets as they are received
34  * from a network source. It will also wait for missing packets up to a
35  * configurable time limit using the ::latency property. Packets arriving too
36  * late are considered to be lost packets.
37  * </para>
38  * <para>
39  * This element acts as a live element and so adds ::latency to the pipeline.
40  * </para>
41  * <para>
42  * The element needs the clock-rate of the RTP payload in order to estimate the
43  * delay. This information is obtained either from the caps on the sink pad or,
44  * when no caps are present, from the ::request-pt-map signal. To clear the
45  * previous pt-map use the ::clear-pt-map signal.
46  * </para>
47  * <para>
48  * This element will automatically be used inside gstrtpbin.
49  * </para>
50  * <title>Example pipelines</title>
51  * <para>
52  * <programlisting>
53  * gst-launch rtspsrc location=rtsp://192.168.1.133:8554/mpeg1or2AudioVideoTest ! gstrtpjitterbuffer ! rtpmpvdepay ! mpeg2dec ! xvimagesink
54  * </programlisting>
55  * Connect to a streaming server and decode the MPEG video. The jitterbuffer is
56  * inserted into the pipeline to smooth out network jitter and to reorder the
57  * out-of-order RTP packets.
58  * </para>
59  * </refsect2>
60  *
61  * Last reviewed on 2007-05-28 (0.10.5)
62  */
63
64 #ifdef HAVE_CONFIG_H
65 #include "config.h"
66 #endif
67
68 #include <stdlib.h>
69 #include <string.h>
70 #include <gst/rtp/gstrtpbuffer.h>
71
72 #include "gstrtpbin-marshal.h"
73
74 #include "gstrtpjitterbuffer.h"
75 #include "rtpjitterbuffer.h"
76
77 GST_DEBUG_CATEGORY (rtpjitterbuffer_debug);
78 #define GST_CAT_DEFAULT (rtpjitterbuffer_debug)
79
80 /* low and high threshold tell the queue when to start and stop buffering */
81 #define LOW_THRESHOLD 0.2
82 #define HIGH_THRESHOLD 0.8
83
84 /* elementfactory information */
85 static const GstElementDetails gst_rtp_jitter_buffer_details =
86 GST_ELEMENT_DETAILS ("RTP packet jitter-buffer",
87     "Filter/Network/RTP",
88     "A buffer that deals with network jitter and other transmission faults",
89     "Philippe Kalaf <philippe.kalaf@collabora.co.uk>, "
90     "Wim Taymans <wim.taymans@gmail.com>");
91
92 /* RTPJitterBuffer signals and args */
93 enum
94 {
95   SIGNAL_REQUEST_PT_MAP,
96   SIGNAL_CLEAR_PT_MAP,
97   LAST_SIGNAL
98 };
99
100 #define DEFAULT_LATENCY_MS      200
101 #define DEFAULT_DROP_ON_LATENCY FALSE
102 #define DEFAULT_TS_OFFSET       0
103
104 enum
105 {
106   PROP_0,
107   PROP_LATENCY,
108   PROP_DROP_ON_LATENCY,
109   PROP_TS_OFFSET
110 };
111
112 #define JBUF_LOCK(priv)   (g_mutex_lock ((priv)->jbuf_lock))
113
114 #define JBUF_LOCK_CHECK(priv,label) G_STMT_START {    \
115   JBUF_LOCK (priv);                                   \
116   if (priv->srcresult != GST_FLOW_OK)                 \
117     goto label;                                       \
118 } G_STMT_END
119
120 #define JBUF_UNLOCK(priv) (g_mutex_unlock ((priv)->jbuf_lock))
121 #define JBUF_WAIT(priv)   (g_cond_wait ((priv)->jbuf_cond, (priv)->jbuf_lock))
122
123 #define JBUF_WAIT_CHECK(priv,label) G_STMT_START {    \
124   JBUF_WAIT(priv);                                    \
125   if (priv->srcresult != GST_FLOW_OK)                 \
126     goto label;                                       \
127 } G_STMT_END
128
129 #define JBUF_SIGNAL(priv) (g_cond_signal ((priv)->jbuf_cond))
130
131 struct _GstRtpJitterBufferPrivate
132 {
133   GstPad *sinkpad, *srcpad;
134
135   RTPJitterBuffer *jbuf;
136   GMutex *jbuf_lock;
137   GCond *jbuf_cond;
138   gboolean waiting;
139
140   /* properties */
141   guint latency_ms;
142   gboolean drop_on_latency;
143   gint64 ts_offset;
144
145   /* the last seqnum we pushed out */
146   guint32 last_popped_seqnum;
147   /* the next expected seqnum */
148   guint32 next_seqnum;
149
150   /* state */
151   gboolean eos;
152
153   /* clock rate and rtp timestamp offset */
154   gint last_pt;
155   gint32 clock_rate;
156   gint64 clock_base;
157   gint64 prev_ts_offset;
158
159   /* when we are shutting down */
160   GstFlowReturn srcresult;
161   gboolean blocked;
162
163   /* for sync */
164   GstSegment segment;
165   GstClockID clock_id;
166   /* the latency of the upstream peer, we have to take this into account when
167    * synchronizing the buffers. */
168   GstClockTime peer_latency;
169
170   /* some accounting */
171   guint64 num_late;
172   guint64 num_duplicates;
173 };
174
175 #define GST_RTP_JITTER_BUFFER_GET_PRIVATE(o) \
176   (G_TYPE_INSTANCE_GET_PRIVATE ((o), GST_TYPE_RTP_JITTER_BUFFER, \
177                                 GstRtpJitterBufferPrivate))
178
179 static GstStaticPadTemplate gst_rtp_jitter_buffer_sink_template =
180 GST_STATIC_PAD_TEMPLATE ("sink",
181     GST_PAD_SINK,
182     GST_PAD_ALWAYS,
183     GST_STATIC_CAPS ("application/x-rtp, "
184         "clock-rate = (int) [ 1, 2147483647 ]"
185         /* "payload = (int) , "
186          * "encoding-name = (string) "
187          */ )
188     );
189
190 static GstStaticPadTemplate gst_rtp_jitter_buffer_src_template =
191 GST_STATIC_PAD_TEMPLATE ("src",
192     GST_PAD_SRC,
193     GST_PAD_ALWAYS,
194     GST_STATIC_CAPS ("application/x-rtp"
195         /* "payload = (int) , "
196          * "clock-rate = (int) , "
197          * "encoding-name = (string) "
198          */ )
199     );
200
201 static guint gst_rtp_jitter_buffer_signals[LAST_SIGNAL] = { 0 };
202
203 GST_BOILERPLATE (GstRtpJitterBuffer, gst_rtp_jitter_buffer, GstElement,
204     GST_TYPE_ELEMENT);
205
206 /* object overrides */
207 static void gst_rtp_jitter_buffer_set_property (GObject * object,
208     guint prop_id, const GValue * value, GParamSpec * pspec);
209 static void gst_rtp_jitter_buffer_get_property (GObject * object,
210     guint prop_id, GValue * value, GParamSpec * pspec);
211 static void gst_rtp_jitter_buffer_finalize (GObject * object);
212
213 /* element overrides */
214 static GstStateChangeReturn gst_rtp_jitter_buffer_change_state (GstElement
215     * element, GstStateChange transition);
216
217 /* pad overrides */
218 static GstCaps *gst_rtp_jitter_buffer_getcaps (GstPad * pad);
219
220 /* sinkpad overrides */
221 static gboolean gst_jitter_buffer_sink_setcaps (GstPad * pad, GstCaps * caps);
222 static gboolean gst_rtp_jitter_buffer_sink_event (GstPad * pad,
223     GstEvent * event);
224 static GstFlowReturn gst_rtp_jitter_buffer_chain (GstPad * pad,
225     GstBuffer * buffer);
226
227 /* srcpad overrides */
228 static gboolean
229 gst_rtp_jitter_buffer_src_activate_push (GstPad * pad, gboolean active);
230 static void gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer);
231 static gboolean gst_rtp_jitter_buffer_query (GstPad * pad, GstQuery * query);
232
233 static void
234 gst_rtp_jitter_buffer_clear_pt_map (GstRtpJitterBuffer * jitterbuffer);
235
236 static void
237 gst_rtp_jitter_buffer_base_init (gpointer klass)
238 {
239   GstElementClass *element_class = GST_ELEMENT_CLASS (klass);
240
241   gst_element_class_add_pad_template (element_class,
242       gst_static_pad_template_get (&gst_rtp_jitter_buffer_src_template));
243   gst_element_class_add_pad_template (element_class,
244       gst_static_pad_template_get (&gst_rtp_jitter_buffer_sink_template));
245   gst_element_class_set_details (element_class, &gst_rtp_jitter_buffer_details);
246 }
247
248 static void
249 gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass)
250 {
251   GObjectClass *gobject_class;
252   GstElementClass *gstelement_class;
253
254   gobject_class = (GObjectClass *) klass;
255   gstelement_class = (GstElementClass *) klass;
256
257   g_type_class_add_private (klass, sizeof (GstRtpJitterBufferPrivate));
258
259   gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_finalize);
260
261   gobject_class->set_property = gst_rtp_jitter_buffer_set_property;
262   gobject_class->get_property = gst_rtp_jitter_buffer_get_property;
263
264   /**
265    * GstRtpJitterBuffer::latency:
266    * 
267    * The maximum latency of the jitterbuffer. Packets will be kept in the buffer
268    * for at most this time.
269    */
270   g_object_class_install_property (gobject_class, PROP_LATENCY,
271       g_param_spec_uint ("latency", "Buffer latency in ms",
272           "Amount of ms to buffer", 0, G_MAXUINT, DEFAULT_LATENCY_MS,
273           G_PARAM_READWRITE));
274   /**
275    * GstRtpJitterBuffer::drop-on-latency:
276    * 
277    * Drop oldest buffers when the queue is completely filled. 
278    */
279   g_object_class_install_property (gobject_class, PROP_DROP_ON_LATENCY,
280       g_param_spec_boolean ("drop-on-latency",
281           "Drop buffers when maximum latency is reached",
282           "Tells the jitterbuffer to never exceed the given latency in size",
283           DEFAULT_DROP_ON_LATENCY, G_PARAM_READWRITE));
284   /**
285    * GstRtpJitterBuffer::ts-offset:
286    * 
287    * Adjust RTP timestamps in the jitterbuffer with offset.
288    */
289   g_object_class_install_property (gobject_class, PROP_TS_OFFSET,
290       g_param_spec_int64 ("ts-offset",
291           "Timestamp Offset",
292           "Adjust buffer RTP timestamps with offset in nanoseconds", G_MININT64,
293           G_MAXINT64, DEFAULT_TS_OFFSET, G_PARAM_READWRITE));
294   /**
295    * GstRtpJitterBuffer::request-pt-map:
296    * @buffer: the object which received the signal
297    * @pt: the pt
298    *
299    * Request the payload type as #GstCaps for @pt.
300    */
301   gst_rtp_jitter_buffer_signals[SIGNAL_REQUEST_PT_MAP] =
302       g_signal_new ("request-pt-map", G_TYPE_FROM_CLASS (klass),
303       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
304           request_pt_map), NULL, NULL, gst_rtp_bin_marshal_BOXED__UINT,
305       GST_TYPE_CAPS, 1, G_TYPE_UINT);
306   /**
307    * GstRtpJitterBuffer::clear-pt-map:
308    * @buffer: the object which received the signal
309    *
310    * Invalidate the clock-rate as obtained with the ::request-pt-map signal.
311    */
312   gst_rtp_jitter_buffer_signals[SIGNAL_CLEAR_PT_MAP] =
313       g_signal_new ("clear-pt-map", G_TYPE_FROM_CLASS (klass),
314       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
315           clear_pt_map), NULL, NULL, g_cclosure_marshal_VOID__VOID,
316       G_TYPE_NONE, 0, G_TYPE_NONE);
317
318   gstelement_class->change_state = gst_rtp_jitter_buffer_change_state;
319
320   klass->clear_pt_map = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_clear_pt_map);
321
322   GST_DEBUG_CATEGORY_INIT
323       (rtpjitterbuffer_debug, "gstrtpjitterbuffer", 0, "RTP Jitter Buffer");
324 }
325
326 static void
327 gst_rtp_jitter_buffer_init (GstRtpJitterBuffer * jitterbuffer,
328     GstRtpJitterBufferClass * klass)
329 {
330   GstRtpJitterBufferPrivate *priv;
331
332   priv = GST_RTP_JITTER_BUFFER_GET_PRIVATE (jitterbuffer);
333   jitterbuffer->priv = priv;
334
335   priv->latency_ms = DEFAULT_LATENCY_MS;
336   priv->drop_on_latency = DEFAULT_DROP_ON_LATENCY;
337
338   priv->jbuf = rtp_jitter_buffer_new ();
339   priv->jbuf_lock = g_mutex_new ();
340   priv->jbuf_cond = g_cond_new ();
341
342   priv->srcpad =
343       gst_pad_new_from_static_template (&gst_rtp_jitter_buffer_src_template,
344       "src");
345
346   gst_pad_set_activatepush_function (priv->srcpad,
347       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_activate_push));
348   gst_pad_set_query_function (priv->srcpad,
349       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_query));
350   gst_pad_set_getcaps_function (priv->srcpad,
351       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_getcaps));
352
353   priv->sinkpad =
354       gst_pad_new_from_static_template (&gst_rtp_jitter_buffer_sink_template,
355       "sink");
356
357   gst_pad_set_chain_function (priv->sinkpad,
358       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_chain));
359   gst_pad_set_event_function (priv->sinkpad,
360       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_sink_event));
361   gst_pad_set_setcaps_function (priv->sinkpad,
362       GST_DEBUG_FUNCPTR (gst_jitter_buffer_sink_setcaps));
363   gst_pad_set_getcaps_function (priv->sinkpad,
364       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_getcaps));
365
366   gst_element_add_pad (GST_ELEMENT (jitterbuffer), priv->srcpad);
367   gst_element_add_pad (GST_ELEMENT (jitterbuffer), priv->sinkpad);
368 }
369
370 static void
371 gst_rtp_jitter_buffer_finalize (GObject * object)
372 {
373   GstRtpJitterBuffer *jitterbuffer;
374
375   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
376
377   g_mutex_free (jitterbuffer->priv->jbuf_lock);
378   g_cond_free (jitterbuffer->priv->jbuf_cond);
379
380   g_object_unref (jitterbuffer->priv->jbuf);
381
382   G_OBJECT_CLASS (parent_class)->finalize (object);
383 }
384
385 static void
386 gst_rtp_jitter_buffer_clear_pt_map (GstRtpJitterBuffer * jitterbuffer)
387 {
388   GstRtpJitterBufferPrivate *priv;
389
390   priv = jitterbuffer->priv;
391
392   /* this will trigger a new pt-map request signal, FIXME, do something better. */
393   priv->clock_rate = -1;
394 }
395
396 static GstCaps *
397 gst_rtp_jitter_buffer_getcaps (GstPad * pad)
398 {
399   GstRtpJitterBuffer *jitterbuffer;
400   GstRtpJitterBufferPrivate *priv;
401   GstPad *other;
402   GstCaps *caps;
403   const GstCaps *templ;
404
405   jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
406   priv = jitterbuffer->priv;
407
408   other = (pad == priv->srcpad ? priv->sinkpad : priv->srcpad);
409
410   caps = gst_pad_peer_get_caps (other);
411
412   templ = gst_pad_get_pad_template_caps (pad);
413   if (caps == NULL) {
414     GST_DEBUG_OBJECT (jitterbuffer, "copy template");
415     caps = gst_caps_copy (templ);
416   } else {
417     GstCaps *intersect;
418
419     GST_DEBUG_OBJECT (jitterbuffer, "intersect with template");
420
421     intersect = gst_caps_intersect (caps, templ);
422     gst_caps_unref (caps);
423
424     caps = intersect;
425   }
426   gst_object_unref (jitterbuffer);
427
428   return caps;
429 }
430
431 static gboolean
432 gst_jitter_buffer_sink_parse_caps (GstRtpJitterBuffer * jitterbuffer,
433     GstCaps * caps)
434 {
435   GstRtpJitterBufferPrivate *priv;
436   GstStructure *caps_struct;
437   guint val;
438
439   priv = jitterbuffer->priv;
440
441   /* first parse the caps */
442   caps_struct = gst_caps_get_structure (caps, 0);
443
444   GST_DEBUG_OBJECT (jitterbuffer, "got caps");
445
446   /* we need a clock-rate to convert the rtp timestamps to GStreamer time and to
447    * measure the amount of data in the buffer */
448   if (!gst_structure_get_int (caps_struct, "clock-rate", &priv->clock_rate))
449     goto error;
450
451   if (priv->clock_rate <= 0)
452     goto wrong_rate;
453
454   GST_DEBUG_OBJECT (jitterbuffer, "got clock-rate %d", priv->clock_rate);
455
456   /* gah, clock-base is uint. If we don't have a base, we will use the first
457    * buffer timestamp as the base time. This will screw up sync but it's better
458    * than nothing. */
459   if (gst_structure_get_uint (caps_struct, "clock-base", &val))
460     priv->clock_base = val;
461   else
462     priv->clock_base = -1;
463
464   GST_DEBUG_OBJECT (jitterbuffer, "got clock-base %" G_GINT64_FORMAT,
465       priv->clock_base);
466
467   /* first expected seqnum */
468   if (gst_structure_get_uint (caps_struct, "seqnum-base", &val))
469     priv->next_seqnum = val;
470   else
471     priv->next_seqnum = -1;
472
473   GST_DEBUG_OBJECT (jitterbuffer, "got seqnum-base %d", priv->next_seqnum);
474
475   return TRUE;
476
477   /* ERRORS */
478 error:
479   {
480     GST_DEBUG_OBJECT (jitterbuffer, "No clock-rate in caps!");
481     return FALSE;
482   }
483 wrong_rate:
484   {
485     GST_DEBUG_OBJECT (jitterbuffer, "Invalid clock-rate %d", priv->clock_rate);
486     return FALSE;
487   }
488 }
489
490 static gboolean
491 gst_jitter_buffer_sink_setcaps (GstPad * pad, GstCaps * caps)
492 {
493   GstRtpJitterBuffer *jitterbuffer;
494   GstRtpJitterBufferPrivate *priv;
495   gboolean res;
496
497   jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
498   priv = jitterbuffer->priv;
499
500   res = gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps);
501
502   /* set same caps on srcpad on success */
503   if (res)
504     gst_pad_set_caps (priv->srcpad, caps);
505
506   gst_object_unref (jitterbuffer);
507
508   return res;
509 }
510
511 static void
512 gst_rtp_jitter_buffer_flush_start (GstRtpJitterBuffer * jitterbuffer)
513 {
514   GstRtpJitterBufferPrivate *priv;
515
516   priv = jitterbuffer->priv;
517
518   JBUF_LOCK (priv);
519   /* mark ourselves as flushing */
520   priv->srcresult = GST_FLOW_WRONG_STATE;
521   GST_DEBUG_OBJECT (jitterbuffer, "Disabling pop on queue");
522   /* this unblocks any waiting pops on the src pad task */
523   JBUF_SIGNAL (priv);
524   /* unlock clock, we just unschedule, the entry will be released by the 
525    * locking streaming thread. */
526   if (priv->clock_id)
527     gst_clock_id_unschedule (priv->clock_id);
528   JBUF_UNLOCK (priv);
529 }
530
531 static void
532 gst_rtp_jitter_buffer_flush_stop (GstRtpJitterBuffer * jitterbuffer)
533 {
534   GstRtpJitterBufferPrivate *priv;
535
536   priv = jitterbuffer->priv;
537
538   JBUF_LOCK (priv);
539   GST_DEBUG_OBJECT (jitterbuffer, "Enabling pop on queue");
540   /* Mark as non flushing */
541   priv->srcresult = GST_FLOW_OK;
542   gst_segment_init (&priv->segment, GST_FORMAT_TIME);
543   priv->last_popped_seqnum = -1;
544   priv->next_seqnum = -1;
545   priv->clock_rate = -1;
546   priv->eos = FALSE;
547   rtp_jitter_buffer_flush (priv->jbuf);
548   rtp_jitter_buffer_reset_skew (priv->jbuf);
549   JBUF_UNLOCK (priv);
550 }
551
552 static gboolean
553 gst_rtp_jitter_buffer_src_activate_push (GstPad * pad, gboolean active)
554 {
555   gboolean result = TRUE;
556   GstRtpJitterBuffer *jitterbuffer = NULL;
557
558   jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
559
560   if (active) {
561     /* allow data processing */
562     gst_rtp_jitter_buffer_flush_stop (jitterbuffer);
563
564     /* start pushing out buffers */
565     GST_DEBUG_OBJECT (jitterbuffer, "Starting task on srcpad");
566     gst_pad_start_task (jitterbuffer->priv->srcpad,
567         (GstTaskFunction) gst_rtp_jitter_buffer_loop, jitterbuffer);
568   } else {
569     /* make sure all data processing stops ASAP */
570     gst_rtp_jitter_buffer_flush_start (jitterbuffer);
571
572     /* NOTE this will hardlock if the state change is called from the src pad
573      * task thread because we will _join() the thread. */
574     GST_DEBUG_OBJECT (jitterbuffer, "Stopping task on srcpad");
575     result = gst_pad_stop_task (pad);
576   }
577
578   gst_object_unref (jitterbuffer);
579
580   return result;
581 }
582
583 static GstStateChangeReturn
584 gst_rtp_jitter_buffer_change_state (GstElement * element,
585     GstStateChange transition)
586 {
587   GstRtpJitterBuffer *jitterbuffer;
588   GstRtpJitterBufferPrivate *priv;
589   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
590
591   jitterbuffer = GST_RTP_JITTER_BUFFER (element);
592   priv = jitterbuffer->priv;
593
594   switch (transition) {
595     case GST_STATE_CHANGE_NULL_TO_READY:
596       break;
597     case GST_STATE_CHANGE_READY_TO_PAUSED:
598       JBUF_LOCK (priv);
599       /* reset negotiated values */
600       priv->clock_rate = -1;
601       priv->clock_base = -1;
602       priv->peer_latency = 0;
603       priv->last_pt = -1;
604       /* block until we go to PLAYING */
605       priv->blocked = TRUE;
606       /* reset skew detection initialy */
607       rtp_jitter_buffer_reset_skew (priv->jbuf);
608       JBUF_UNLOCK (priv);
609       break;
610     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
611       JBUF_LOCK (priv);
612       /* unblock to allow streaming in PLAYING */
613       priv->blocked = FALSE;
614       JBUF_SIGNAL (priv);
615       JBUF_UNLOCK (priv);
616       break;
617     default:
618       break;
619   }
620
621   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
622
623   switch (transition) {
624     case GST_STATE_CHANGE_READY_TO_PAUSED:
625       /* we are a live element because we sync to the clock, which we can only
626        * do in the PLAYING state */
627       if (ret != GST_STATE_CHANGE_FAILURE)
628         ret = GST_STATE_CHANGE_NO_PREROLL;
629       break;
630     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
631       JBUF_LOCK (priv);
632       /* block to stop streaming when PAUSED */
633       priv->blocked = TRUE;
634       JBUF_UNLOCK (priv);
635       if (ret != GST_STATE_CHANGE_FAILURE)
636         ret = GST_STATE_CHANGE_NO_PREROLL;
637       break;
638     case GST_STATE_CHANGE_PAUSED_TO_READY:
639       break;
640     case GST_STATE_CHANGE_READY_TO_NULL:
641       break;
642     default:
643       break;
644   }
645
646   return ret;
647 }
648
649 /**
650  * Performs comparison 'b - a' with check for overflows.
651  */
652 static inline gint
653 priv_compare_rtp_seq_lt (guint16 a, guint16 b)
654 {
655   /* check if diff more than half of the 16bit range */
656   if (abs (b - a) > (1 << 15)) {
657     /* one of a/b has wrapped */
658     return a - b;
659   } else {
660     return b - a;
661   }
662 }
663
664 static gboolean
665 gst_rtp_jitter_buffer_sink_event (GstPad * pad, GstEvent * event)
666 {
667   gboolean ret = TRUE;
668   GstRtpJitterBuffer *jitterbuffer;
669   GstRtpJitterBufferPrivate *priv;
670
671   jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
672   priv = jitterbuffer->priv;
673
674   GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
675
676   switch (GST_EVENT_TYPE (event)) {
677     case GST_EVENT_NEWSEGMENT:
678     {
679       GstFormat format;
680       gdouble rate, arate;
681       gint64 start, stop, time;
682       gboolean update;
683
684       gst_event_parse_new_segment_full (event, &update, &rate, &arate, &format,
685           &start, &stop, &time);
686
687       /* we need time for now */
688       if (format != GST_FORMAT_TIME)
689         goto newseg_wrong_format;
690
691       GST_DEBUG_OBJECT (jitterbuffer,
692           "newsegment: update %d, rate %g, arate %g, start %" GST_TIME_FORMAT
693           ", stop %" GST_TIME_FORMAT ", time %" GST_TIME_FORMAT,
694           update, rate, arate, GST_TIME_ARGS (start), GST_TIME_ARGS (stop),
695           GST_TIME_ARGS (time));
696
697       /* now configure the values, we need these to time the release of the
698        * buffers on the srcpad. */
699       gst_segment_set_newsegment_full (&priv->segment, update,
700           rate, arate, format, start, stop, time);
701
702       /* FIXME, push SEGMENT in the queue. Sorting order might be difficult. */
703       ret = gst_pad_push_event (priv->srcpad, event);
704       break;
705     }
706     case GST_EVENT_FLUSH_START:
707       gst_rtp_jitter_buffer_flush_start (jitterbuffer);
708       ret = gst_pad_push_event (priv->srcpad, event);
709       break;
710     case GST_EVENT_FLUSH_STOP:
711       ret = gst_pad_push_event (priv->srcpad, event);
712       ret = gst_rtp_jitter_buffer_src_activate_push (priv->srcpad, TRUE);
713       break;
714     case GST_EVENT_EOS:
715     {
716       /* push EOS in queue. We always push it at the head */
717       JBUF_LOCK (priv);
718       /* check for flushing, we need to discard the event and return FALSE when
719        * we are flushing */
720       ret = priv->srcresult == GST_FLOW_OK;
721       if (ret && !priv->eos) {
722         GST_DEBUG_OBJECT (jitterbuffer, "queuing EOS");
723         priv->eos = TRUE;
724         JBUF_SIGNAL (priv);
725       } else if (priv->eos) {
726         GST_DEBUG_OBJECT (jitterbuffer, "dropping EOS, we are already EOS");
727       } else {
728         GST_DEBUG_OBJECT (jitterbuffer, "dropping EOS, reason %s",
729             gst_flow_get_name (priv->srcresult));
730       }
731       JBUF_UNLOCK (priv);
732       gst_event_unref (event);
733       break;
734     }
735     default:
736       ret = gst_pad_push_event (priv->srcpad, event);
737       break;
738   }
739
740 done:
741   gst_object_unref (jitterbuffer);
742
743   return ret;
744
745   /* ERRORS */
746 newseg_wrong_format:
747   {
748     GST_DEBUG_OBJECT (jitterbuffer, "received non TIME newsegment");
749     ret = FALSE;
750     goto done;
751   }
752 }
753
754 static gboolean
755 gst_rtp_jitter_buffer_get_clock_rate (GstRtpJitterBuffer * jitterbuffer,
756     guint8 pt)
757 {
758   GValue ret = { 0 };
759   GValue args[2] = { {0}, {0} };
760   GstCaps *caps;
761   gboolean res;
762
763   g_value_init (&args[0], GST_TYPE_ELEMENT);
764   g_value_set_object (&args[0], jitterbuffer);
765   g_value_init (&args[1], G_TYPE_UINT);
766   g_value_set_uint (&args[1], pt);
767
768   g_value_init (&ret, GST_TYPE_CAPS);
769   g_value_set_boxed (&ret, NULL);
770
771   g_signal_emitv (args, gst_rtp_jitter_buffer_signals[SIGNAL_REQUEST_PT_MAP], 0,
772       &ret);
773
774   g_value_unset (&args[0]);
775   g_value_unset (&args[1]);
776   caps = (GstCaps *) g_value_dup_boxed (&ret);
777   g_value_unset (&ret);
778   if (!caps)
779     goto no_caps;
780
781   res = gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps);
782
783   gst_caps_unref (caps);
784
785   return res;
786
787   /* ERRORS */
788 no_caps:
789   {
790     GST_DEBUG_OBJECT (jitterbuffer, "could not get caps");
791     return FALSE;
792   }
793 }
794
795 static GstFlowReturn
796 gst_rtp_jitter_buffer_chain (GstPad * pad, GstBuffer * buffer)
797 {
798   GstRtpJitterBuffer *jitterbuffer;
799   GstRtpJitterBufferPrivate *priv;
800   guint16 seqnum;
801   GstFlowReturn ret = GST_FLOW_OK;
802   GstClockTime timestamp;
803   guint64 latency_ts;
804   gboolean tail;
805
806   jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
807
808   if (!gst_rtp_buffer_validate (buffer))
809     goto invalid_buffer;
810
811   priv = jitterbuffer->priv;
812
813   if (priv->last_pt != gst_rtp_buffer_get_payload_type (buffer)) {
814     GstCaps *caps;
815
816     priv->last_pt = gst_rtp_buffer_get_payload_type (buffer);
817     /* reset clock-rate so that we get a new one */
818     priv->clock_rate = -1;
819     /* Try to get the clock-rate from the caps first if we can. If there are no
820      * caps we must fire the signal to get the clock-rate. */
821     if ((caps = GST_BUFFER_CAPS (buffer))) {
822       gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps);
823     }
824   }
825
826   if (priv->clock_rate == -1) {
827     guint8 pt;
828
829     /* no clock rate given on the caps, try to get one with the signal */
830     pt = gst_rtp_buffer_get_payload_type (buffer);
831
832     gst_rtp_jitter_buffer_get_clock_rate (jitterbuffer, pt);
833     if (priv->clock_rate == -1)
834       goto not_negotiated;
835   }
836
837   /* take the timestamp of the buffer. This is the time when the packet was
838    * received and is used to calculate jitter and clock skew. We will adjust
839    * this timestamp with the smoothed value after processing it in the
840    * jitterbuffer. */
841   timestamp = GST_BUFFER_TIMESTAMP (buffer);
842   /* bring to running time */
843   timestamp = gst_segment_to_running_time (&priv->segment, GST_FORMAT_TIME,
844       timestamp);
845
846   seqnum = gst_rtp_buffer_get_seq (buffer);
847   GST_DEBUG_OBJECT (jitterbuffer,
848       "Received packet #%d at time %" GST_TIME_FORMAT, seqnum,
849       GST_TIME_ARGS (timestamp));
850
851   JBUF_LOCK_CHECK (priv, out_flushing);
852   /* don't accept more data on EOS */
853   if (priv->eos)
854     goto have_eos;
855
856   /* let's check if this buffer is too late, we cannot accept packets with
857    * bigger seqnum than the one we already pushed. */
858   if (priv->last_popped_seqnum != -1) {
859     if (priv_compare_rtp_seq_lt (priv->last_popped_seqnum, seqnum) < 0)
860       goto too_late;
861   }
862
863   /* let's drop oldest packet if the queue is already full and drop-on-latency
864    * is set. We can only do this when there actually is a latency. When no
865    * latency is set, we just pump it in the queue and let the other end push it
866    * out as fast as possible. */
867   if (priv->latency_ms && priv->drop_on_latency) {
868
869     latency_ts =
870         gst_util_uint64_scale_int (priv->latency_ms, priv->clock_rate, 1000);
871
872     if (rtp_jitter_buffer_get_ts_diff (priv->jbuf) >= latency_ts) {
873       GstBuffer *old_buf;
874
875       GST_DEBUG_OBJECT (jitterbuffer, "Queue full, dropping old packet #%d",
876           seqnum);
877
878       old_buf = rtp_jitter_buffer_pop (priv->jbuf);
879       gst_buffer_unref (old_buf);
880     }
881   }
882
883   /* now insert the packet into the queue in sorted order. This function returns
884    * FALSE if a packet with the same seqnum was already in the queue, meaning we
885    * have a duplicate. */
886   if (!rtp_jitter_buffer_insert (priv->jbuf, buffer, timestamp,
887           priv->clock_rate, &tail))
888     goto duplicate;
889
890   /* signal addition of new buffer when the _loop is waiting. */
891   if (priv->waiting)
892     JBUF_SIGNAL (priv);
893
894   /* let's unschedule and unblock any waiting buffers. We only want to do this
895    * when the tail buffer changed */
896   if (priv->clock_id && tail) {
897     GST_DEBUG_OBJECT (jitterbuffer,
898         "Unscheduling waiting buffer, new tail buffer");
899     gst_clock_id_unschedule (priv->clock_id);
900   }
901
902   GST_DEBUG_OBJECT (jitterbuffer, "Pushed packet #%d, now %d packets",
903       seqnum, rtp_jitter_buffer_num_packets (priv->jbuf));
904
905 finished:
906   JBUF_UNLOCK (priv);
907
908   gst_object_unref (jitterbuffer);
909
910   return ret;
911
912   /* ERRORS */
913 invalid_buffer:
914   {
915     /* this is fatal and should be filtered earlier */
916     GST_ELEMENT_ERROR (jitterbuffer, STREAM, DECODE, (NULL),
917         ("Received invalid RTP payload"));
918     gst_buffer_unref (buffer);
919     gst_object_unref (jitterbuffer);
920     return GST_FLOW_ERROR;
921   }
922 not_negotiated:
923   {
924     GST_WARNING_OBJECT (jitterbuffer, "No clock-rate in caps!");
925     gst_buffer_unref (buffer);
926     gst_object_unref (jitterbuffer);
927     return GST_FLOW_OK;
928   }
929 out_flushing:
930   {
931     ret = priv->srcresult;
932     GST_DEBUG_OBJECT (jitterbuffer, "flushing %s", gst_flow_get_name (ret));
933     gst_buffer_unref (buffer);
934     goto finished;
935   }
936 have_eos:
937   {
938     ret = GST_FLOW_UNEXPECTED;
939     GST_WARNING_OBJECT (jitterbuffer, "we are EOS, refusing buffer");
940     gst_buffer_unref (buffer);
941     goto finished;
942   }
943 too_late:
944   {
945     GST_WARNING_OBJECT (jitterbuffer, "Packet #%d too late as #%d was already"
946         " popped, dropping", seqnum, priv->last_popped_seqnum);
947     priv->num_late++;
948     gst_buffer_unref (buffer);
949     goto finished;
950   }
951 duplicate:
952   {
953     GST_WARNING_OBJECT (jitterbuffer, "Duplicate packet #%d detected, dropping",
954         seqnum);
955     priv->num_duplicates++;
956     gst_buffer_unref (buffer);
957     goto finished;
958   }
959 }
960
961 static GstClockTime
962 apply_offset (GstRtpJitterBuffer * jitterbuffer, GstClockTime timestamp)
963 {
964   GstRtpJitterBufferPrivate *priv;
965
966   priv = jitterbuffer->priv;
967
968   if (timestamp == -1)
969     return -1;
970
971   /* apply the timestamp offset */
972   timestamp += priv->ts_offset;
973
974   return timestamp;
975 }
976
977 /**
978  * This funcion will push out buffers on the source pad.
979  *
980  * For each pushed buffer, the seqnum is recorded, if the next buffer B has a
981  * different seqnum (missing packets before B), this function will wait for the
982  * missing packet to arrive up to the timestamp of buffer B.
983  */
984 static void
985 gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer)
986 {
987   GstRtpJitterBufferPrivate *priv;
988   GstBuffer *outbuf;
989   GstFlowReturn result;
990   guint16 seqnum;
991   GstClockTime timestamp, out_time;
992
993   priv = jitterbuffer->priv;
994
995   JBUF_LOCK_CHECK (priv, flushing);
996 again:
997   GST_DEBUG_OBJECT (jitterbuffer, "Peeking item");
998   while (TRUE) {
999
1000     /* always wait if we are blocked */
1001     if (!priv->blocked) {
1002       /* if we have a packet, we can grab it */
1003       if (rtp_jitter_buffer_num_packets (priv->jbuf) > 0)
1004         break;
1005       /* no packets but we are EOS, do eos logic */
1006       if (priv->eos)
1007         goto do_eos;
1008     }
1009     /* wait for packets or flushing now */
1010     priv->waiting = TRUE;
1011     JBUF_WAIT_CHECK (priv, flushing);
1012     priv->waiting = FALSE;
1013   }
1014
1015   /* peek a buffer, we're just looking at the timestamp and the sequence number.
1016    * If all is fine, we'll pop and push it. If the sequence number is wrong we
1017    * wait on the timestamp. In the chain function we will unlock the wait when a
1018    * new buffer is available. The peeked buffer is valid for as long as we hold
1019    * the jitterbuffer lock. */
1020   outbuf = rtp_jitter_buffer_peek (priv->jbuf);
1021   seqnum = gst_rtp_buffer_get_seq (outbuf);
1022
1023   /* get the timestamp, this is already corrected for clock skew by the
1024    * jitterbuffer */
1025   timestamp = GST_BUFFER_TIMESTAMP (outbuf);
1026
1027   GST_DEBUG_OBJECT (jitterbuffer,
1028       "Peeked buffer #%d, timestamp %" GST_TIME_FORMAT ", now %d left",
1029       seqnum, GST_TIME_ARGS (timestamp),
1030       rtp_jitter_buffer_num_packets (priv->jbuf));
1031
1032   /* apply our timestamp offset to the incomming buffer, this will be our output
1033    * timestamp. */
1034   out_time = apply_offset (jitterbuffer, timestamp);
1035
1036   /* If we don't know what the next seqnum should be (== -1) we have to wait
1037    * because it might be possible that we are not receiving this buffer in-order,
1038    * a buffer with a lower seqnum could arrive later and we want to push that
1039    * earlier buffer before this buffer then.
1040    * If we know the expected seqnum, we can compare it to the current seqnum to
1041    * determine if we have missing a packet. If we have a missing packet (which
1042    * must be before this packet) we can wait for it until the deadline for this
1043    * packet expires. */
1044   if ((priv->next_seqnum == -1 || priv->next_seqnum != seqnum)
1045       && out_time != -1) {
1046     GstClockID id;
1047     GstClockTime sync_time;
1048     GstClockReturn ret;
1049     GstClock *clock;
1050
1051     if (priv->next_seqnum != -1) {
1052       /* we expected next_seqnum but received something else, that's a gap */
1053       GST_WARNING_OBJECT (jitterbuffer,
1054           "Sequence number GAP detected: expected %d instead of %d",
1055           priv->next_seqnum, seqnum);
1056     } else {
1057       /* we don't know what the next_seqnum should be, wait for the last
1058        * possible moment to push this buffer, maybe we get an earlier seqnum
1059        * while we wait */
1060       GST_DEBUG_OBJECT (jitterbuffer, "First buffer %d, do sync", seqnum);
1061     }
1062
1063     GST_OBJECT_LOCK (jitterbuffer);
1064     clock = GST_ELEMENT_CLOCK (jitterbuffer);
1065     if (!clock) {
1066       GST_OBJECT_UNLOCK (jitterbuffer);
1067       /* let's just push if there is no clock */
1068       goto push_buffer;
1069     }
1070
1071     GST_DEBUG_OBJECT (jitterbuffer, "sync to timestamp %" GST_TIME_FORMAT,
1072         GST_TIME_ARGS (out_time));
1073
1074     /* prepare for sync against clock */
1075     sync_time = out_time + GST_ELEMENT_CAST (jitterbuffer)->base_time;
1076     /* add latency, this includes our own latency and the peer latency. */
1077     sync_time += (priv->latency_ms * GST_MSECOND);
1078     sync_time += priv->peer_latency;
1079
1080     /* create an entry for the clock */
1081     id = priv->clock_id = gst_clock_new_single_shot_id (clock, sync_time);
1082     GST_OBJECT_UNLOCK (jitterbuffer);
1083
1084     /* release the lock so that the other end can push stuff or unlock */
1085     JBUF_UNLOCK (priv);
1086
1087     ret = gst_clock_id_wait (id, NULL);
1088
1089     JBUF_LOCK (priv);
1090     /* and free the entry */
1091     gst_clock_id_unref (id);
1092     priv->clock_id = NULL;
1093
1094     /* at this point, the clock could have been unlocked by a timeout, a new
1095      * tail element was added to the queue or because we are shutting down. Check
1096      * for shutdown first. */
1097     if (priv->srcresult != GST_FLOW_OK)
1098       goto flushing;
1099
1100     /* if we got unscheduled and we are not flushing, it's because a new tail
1101      * element became available in the queue. Grab it and try to push or sync. */
1102     if (ret == GST_CLOCK_UNSCHEDULED) {
1103       GST_DEBUG_OBJECT (jitterbuffer,
1104           "Wait got unscheduled, will retry to push with new buffer");
1105       goto again;
1106     }
1107     /* Get new timestamp, latency might have changed */
1108     out_time = apply_offset (jitterbuffer, timestamp);
1109   }
1110 push_buffer:
1111   /* when we get here we are ready to pop and push the buffer */
1112   outbuf = rtp_jitter_buffer_pop (priv->jbuf);
1113
1114   /* check if we are pushing something unexpected */
1115   if (priv->next_seqnum != -1 && priv->next_seqnum != seqnum) {
1116     gint dropped;
1117
1118     /* calc number of missing packets, careful for wraparounds */
1119     dropped = priv_compare_rtp_seq_lt (priv->next_seqnum, seqnum);
1120
1121     GST_DEBUG_OBJECT (jitterbuffer,
1122         "Pushing DISCONT after dropping %d (%d to %d)", dropped,
1123         priv->next_seqnum, seqnum);
1124
1125     /* update stats */
1126     priv->num_late += dropped;
1127
1128     /* set DISCONT flag when we missed a packet. */
1129     outbuf = gst_buffer_make_metadata_writable (outbuf);
1130     GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_DISCONT);
1131   }
1132
1133   /* apply timestamp with offset to buffer now */
1134   GST_BUFFER_TIMESTAMP (outbuf) = out_time;
1135
1136   /* now we are ready to push the buffer. Save the seqnum and release the lock
1137    * so the other end can push stuff in the queue again. */
1138   priv->last_popped_seqnum = seqnum;
1139   priv->next_seqnum = (seqnum + 1) & 0xffff;
1140   JBUF_UNLOCK (priv);
1141
1142   /* push buffer */
1143   GST_DEBUG_OBJECT (jitterbuffer,
1144       "Pushing buffer %d, timestamp %" GST_TIME_FORMAT, seqnum,
1145       GST_TIME_ARGS (out_time));
1146   result = gst_pad_push (priv->srcpad, outbuf);
1147   if (result != GST_FLOW_OK)
1148     goto pause;
1149
1150   return;
1151
1152   /* ERRORS */
1153 do_eos:
1154   {
1155     /* store result, we are flushing now */
1156     GST_DEBUG_OBJECT (jitterbuffer, "We are EOS, pushing EOS downstream");
1157     priv->srcresult = GST_FLOW_UNEXPECTED;
1158     gst_pad_pause_task (priv->srcpad);
1159     gst_pad_push_event (priv->srcpad, gst_event_new_eos ());
1160     JBUF_UNLOCK (priv);
1161     return;
1162   }
1163 flushing:
1164   {
1165     GST_DEBUG_OBJECT (jitterbuffer, "we are flushing");
1166     gst_pad_pause_task (priv->srcpad);
1167     JBUF_UNLOCK (priv);
1168     return;
1169   }
1170 pause:
1171   {
1172     const gchar *reason = gst_flow_get_name (result);
1173
1174     GST_DEBUG_OBJECT (jitterbuffer, "pausing task, reason %s", reason);
1175
1176     JBUF_LOCK (priv);
1177     /* store result */
1178     priv->srcresult = result;
1179     /* we don't post errors or anything because upstream will do that for us
1180      * when we pass the return value upstream. */
1181     gst_pad_pause_task (priv->srcpad);
1182     JBUF_UNLOCK (priv);
1183     return;
1184   }
1185 }
1186
1187 static gboolean
1188 gst_rtp_jitter_buffer_query (GstPad * pad, GstQuery * query)
1189 {
1190   GstRtpJitterBuffer *jitterbuffer;
1191   GstRtpJitterBufferPrivate *priv;
1192   gboolean res = FALSE;
1193
1194   jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
1195   priv = jitterbuffer->priv;
1196
1197   switch (GST_QUERY_TYPE (query)) {
1198     case GST_QUERY_LATENCY:
1199     {
1200       /* We need to send the query upstream and add the returned latency to our
1201        * own */
1202       GstClockTime min_latency, max_latency;
1203       gboolean us_live;
1204       GstClockTime our_latency;
1205
1206       if ((res = gst_pad_peer_query (priv->sinkpad, query))) {
1207         gst_query_parse_latency (query, &us_live, &min_latency, &max_latency);
1208
1209         GST_DEBUG_OBJECT (jitterbuffer, "Peer latency: min %"
1210             GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
1211             GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
1212
1213         /* store this so that we can safely sync on the peer buffers. */
1214         JBUF_LOCK (priv);
1215         priv->peer_latency = min_latency;
1216         our_latency = ((guint64) priv->latency_ms) * GST_MSECOND;
1217         JBUF_UNLOCK (priv);
1218
1219         GST_DEBUG_OBJECT (jitterbuffer, "Our latency: %" GST_TIME_FORMAT,
1220             GST_TIME_ARGS (our_latency));
1221
1222         /* we add some latency but can buffer an infinite amount of time */
1223         min_latency += our_latency;
1224         max_latency = -1;
1225
1226         GST_DEBUG_OBJECT (jitterbuffer, "Calculated total latency : min %"
1227             GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
1228             GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
1229
1230         gst_query_set_latency (query, TRUE, min_latency, max_latency);
1231       }
1232       break;
1233     }
1234     default:
1235       res = gst_pad_query_default (pad, query);
1236       break;
1237   }
1238
1239   gst_object_unref (jitterbuffer);
1240
1241   return res;
1242 }
1243
1244 static void
1245 gst_rtp_jitter_buffer_set_property (GObject * object,
1246     guint prop_id, const GValue * value, GParamSpec * pspec)
1247 {
1248   GstRtpJitterBuffer *jitterbuffer;
1249   GstRtpJitterBufferPrivate *priv;
1250
1251   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
1252   priv = jitterbuffer->priv;
1253
1254   switch (prop_id) {
1255     case PROP_LATENCY:
1256     {
1257       guint new_latency, old_latency;
1258
1259       new_latency = g_value_get_uint (value);
1260
1261       JBUF_LOCK (priv);
1262       old_latency = priv->latency_ms;
1263       priv->latency_ms = new_latency;
1264       JBUF_UNLOCK (priv);
1265
1266       /* post message if latency changed, this will inform the parent pipeline
1267        * that a latency reconfiguration is possible/needed. */
1268       if (new_latency != old_latency) {
1269         GST_DEBUG_OBJECT (jitterbuffer, "latency changed to: %" GST_TIME_FORMAT,
1270             GST_TIME_ARGS (new_latency * GST_MSECOND));
1271
1272         gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer),
1273             gst_message_new_latency (GST_OBJECT_CAST (jitterbuffer)));
1274       }
1275       break;
1276     }
1277     case PROP_DROP_ON_LATENCY:
1278       priv->drop_on_latency = g_value_get_boolean (value);
1279       break;
1280     case PROP_TS_OFFSET:
1281       JBUF_LOCK (priv);
1282       priv->ts_offset = g_value_get_int64 (value);
1283       JBUF_UNLOCK (priv);
1284       break;
1285     default:
1286       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1287       break;
1288   }
1289 }
1290
1291 static void
1292 gst_rtp_jitter_buffer_get_property (GObject * object,
1293     guint prop_id, GValue * value, GParamSpec * pspec)
1294 {
1295   GstRtpJitterBuffer *jitterbuffer;
1296   GstRtpJitterBufferPrivate *priv;
1297
1298   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
1299   priv = jitterbuffer->priv;
1300
1301   switch (prop_id) {
1302     case PROP_LATENCY:
1303       JBUF_LOCK (priv);
1304       g_value_set_uint (value, priv->latency_ms);
1305       JBUF_UNLOCK (priv);
1306       break;
1307     case PROP_DROP_ON_LATENCY:
1308       g_value_set_boolean (value, priv->drop_on_latency);
1309       break;
1310     case PROP_TS_OFFSET:
1311       JBUF_LOCK (priv);
1312       g_value_set_int64 (value, priv->ts_offset);
1313       JBUF_UNLOCK (priv);
1314       break;
1315     default:
1316       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1317       break;
1318   }
1319 }