3e5cb3b3f3d36f8f10711d6d3c4f61a30ce7371a
[platform/upstream/gst-plugins-good.git] / gst / rtpmanager / gstrtpjitterbuffer.c
1 /*
2  * Farsight Voice+Video library
3  *
4  *  Copyright 2007 Collabora Ltd, 
5  *  Copyright 2007 Nokia Corporation
6  *   @author: Philippe Kalaf <philippe.kalaf@collabora.co.uk>.
7  *  Copyright 2007 Wim Taymans <wim.taymans@gmail.com>
8  *
9  * This library is free software; you can redistribute it and/or
10  * modify it under the terms of the GNU Library General Public
11  * License as published by the Free Software Foundation; either
12  * version 2 of the License, or (at your option) any later version.
13  *
14  * This library is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17  * Library General Public License for more details.
18  *
19  * You should have received a copy of the GNU Library General Public
20  * License along with this library; if not, write to the
21  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
22  * Boston, MA 02111-1307, USA.
23  *
24  */
25
26 /**
27  * SECTION:element-gstrtpjitterbuffer
28  * @short_description: buffer, reorder and remove duplicate RTP packets to
29  * compensate for network oddities.
30  *
31  * <refsect2>
32  * <para>
33  * This element reorders and removes duplicate RTP packets as they are received
34  * from a network source. It will also wait for missing packets up to a
35  * configurable time limit using the ::latency property. Packets arriving too
36  * late are considered to be lost packets.
37  * </para>
38  * <para>
39  * This element acts as a live element and so adds ::latency to the pipeline.
40  * </para>
41  * <para>
42  * The element needs the clock-rate of the RTP payload in order to estimate the
43  * delay. This information is obtained either from the caps on the sink pad or,
44  * when no caps are present, from the ::request-pt-map signal. To clear the
45  * previous pt-map use the ::clear-pt-map signal.
46  * </para>
47  * <para>
48  * This element will automatically be used inside gstrtpbin.
49  * </para>
50  * <title>Example pipelines</title>
51  * <para>
52  * <programlisting>
53  * gst-launch rtspsrc location=rtsp://192.168.1.133:8554/mpeg1or2AudioVideoTest ! gstrtpjitterbuffer ! rtpmpvdepay ! mpeg2dec ! xvimagesink
54  * </programlisting>
55  * Connect to a streaming server and decode the MPEG video. The jitterbuffer is
56  * inserted into the pipeline to smooth out network jitter and to reorder the
57  * out-of-order RTP packets.
58  * </para>
59  * </refsect2>
60  *
61  * Last reviewed on 2007-05-28 (0.10.5)
62  */
63
64 #ifdef HAVE_CONFIG_H
65 #include "config.h"
66 #endif
67
68 #include <stdlib.h>
69 #include <string.h>
70 #include <gst/rtp/gstrtpbuffer.h>
71
72 #include "gstrtpbin-marshal.h"
73
74 #include "gstrtpjitterbuffer.h"
75 #include "rtpjitterbuffer.h"
76
77 GST_DEBUG_CATEGORY (rtpjitterbuffer_debug);
78 #define GST_CAT_DEFAULT (rtpjitterbuffer_debug)
79
80 /* low and high threshold tell the queue when to start and stop buffering */
81 #define LOW_THRESHOLD 0.2
82 #define HIGH_THRESHOLD 0.8
83
84 /* elementfactory information */
85 static const GstElementDetails gst_rtp_jitter_buffer_details =
86 GST_ELEMENT_DETAILS ("RTP packet jitter-buffer",
87     "Filter/Network/RTP",
88     "A buffer that deals with network jitter and other transmission faults",
89     "Philippe Kalaf <philippe.kalaf@collabora.co.uk>, "
90     "Wim Taymans <wim.taymans@gmail.com>");
91
92 /* RTPJitterBuffer signals and args */
93 enum
94 {
95   SIGNAL_REQUEST_PT_MAP,
96   SIGNAL_CLEAR_PT_MAP,
97   LAST_SIGNAL
98 };
99
100 #define DEFAULT_LATENCY_MS      200
101 #define DEFAULT_DROP_ON_LATENCY FALSE
102 #define DEFAULT_TS_OFFSET       0
103
104 enum
105 {
106   PROP_0,
107   PROP_LATENCY,
108   PROP_DROP_ON_LATENCY,
109   PROP_TS_OFFSET
110 };
111
112 #define JBUF_LOCK(priv)   (g_mutex_lock ((priv)->jbuf_lock))
113
114 #define JBUF_LOCK_CHECK(priv,label) G_STMT_START {    \
115   JBUF_LOCK (priv);                                   \
116   if (priv->srcresult != GST_FLOW_OK)                 \
117     goto label;                                       \
118 } G_STMT_END
119
120 #define JBUF_UNLOCK(priv) (g_mutex_unlock ((priv)->jbuf_lock))
121 #define JBUF_WAIT(priv)   (g_cond_wait ((priv)->jbuf_cond, (priv)->jbuf_lock))
122
123 #define JBUF_WAIT_CHECK(priv,label) G_STMT_START {    \
124   JBUF_WAIT(priv);                                    \
125   if (priv->srcresult != GST_FLOW_OK)                 \
126     goto label;                                       \
127 } G_STMT_END
128
129 #define JBUF_SIGNAL(priv) (g_cond_signal ((priv)->jbuf_cond))
130
131 struct _GstRtpJitterBufferPrivate
132 {
133   GstPad *sinkpad, *srcpad;
134
135   RTPJitterBuffer *jbuf;
136   GMutex *jbuf_lock;
137   GCond *jbuf_cond;
138   gboolean waiting;
139
140   /* properties */
141   guint latency_ms;
142   gboolean drop_on_latency;
143   gint64 ts_offset;
144
145   /* the last seqnum we pushed out */
146   guint32 last_popped_seqnum;
147   /* the next expected seqnum */
148   guint32 next_seqnum;
149
150   /* state */
151   gboolean eos;
152
153   /* clock rate and rtp timestamp offset */
154   gint last_pt;
155   gint32 clock_rate;
156   gint64 clock_base;
157   gint64 prev_ts_offset;
158
159   /* when we are shutting down */
160   GstFlowReturn srcresult;
161   gboolean blocked;
162
163   /* for sync */
164   GstSegment segment;
165   GstClockID clock_id;
166   /* the latency of the upstream peer, we have to take this into account when
167    * synchronizing the buffers. */
168   GstClockTime peer_latency;
169
170   /* some accounting */
171   guint64 num_late;
172   guint64 num_duplicates;
173 };
174
175 #define GST_RTP_JITTER_BUFFER_GET_PRIVATE(o) \
176   (G_TYPE_INSTANCE_GET_PRIVATE ((o), GST_TYPE_RTP_JITTER_BUFFER, \
177                                 GstRtpJitterBufferPrivate))
178
179 static GstStaticPadTemplate gst_rtp_jitter_buffer_sink_template =
180 GST_STATIC_PAD_TEMPLATE ("sink",
181     GST_PAD_SINK,
182     GST_PAD_ALWAYS,
183     GST_STATIC_CAPS ("application/x-rtp, "
184         "clock-rate = (int) [ 1, 2147483647 ]"
185         /* "payload = (int) , "
186          * "encoding-name = (string) "
187          */ )
188     );
189
190 static GstStaticPadTemplate gst_rtp_jitter_buffer_src_template =
191 GST_STATIC_PAD_TEMPLATE ("src",
192     GST_PAD_SRC,
193     GST_PAD_ALWAYS,
194     GST_STATIC_CAPS ("application/x-rtp"
195         /* "payload = (int) , "
196          * "clock-rate = (int) , "
197          * "encoding-name = (string) "
198          */ )
199     );
200
201 static guint gst_rtp_jitter_buffer_signals[LAST_SIGNAL] = { 0 };
202
203 GST_BOILERPLATE (GstRtpJitterBuffer, gst_rtp_jitter_buffer, GstElement,
204     GST_TYPE_ELEMENT);
205
206 /* object overrides */
207 static void gst_rtp_jitter_buffer_set_property (GObject * object,
208     guint prop_id, const GValue * value, GParamSpec * pspec);
209 static void gst_rtp_jitter_buffer_get_property (GObject * object,
210     guint prop_id, GValue * value, GParamSpec * pspec);
211 static void gst_rtp_jitter_buffer_finalize (GObject * object);
212
213 /* element overrides */
214 static GstStateChangeReturn gst_rtp_jitter_buffer_change_state (GstElement
215     * element, GstStateChange transition);
216
217 /* pad overrides */
218 static GstCaps *gst_rtp_jitter_buffer_getcaps (GstPad * pad);
219
220 /* sinkpad overrides */
221 static gboolean gst_jitter_buffer_sink_setcaps (GstPad * pad, GstCaps * caps);
222 static gboolean gst_rtp_jitter_buffer_sink_event (GstPad * pad,
223     GstEvent * event);
224 static GstFlowReturn gst_rtp_jitter_buffer_chain (GstPad * pad,
225     GstBuffer * buffer);
226
227 /* srcpad overrides */
228 static gboolean
229 gst_rtp_jitter_buffer_src_activate_push (GstPad * pad, gboolean active);
230 static void gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer);
231 static gboolean gst_rtp_jitter_buffer_query (GstPad * pad, GstQuery * query);
232
233 static void
234 gst_rtp_jitter_buffer_clear_pt_map (GstRtpJitterBuffer * jitterbuffer);
235
236 static void
237 gst_rtp_jitter_buffer_base_init (gpointer klass)
238 {
239   GstElementClass *element_class = GST_ELEMENT_CLASS (klass);
240
241   gst_element_class_add_pad_template (element_class,
242       gst_static_pad_template_get (&gst_rtp_jitter_buffer_src_template));
243   gst_element_class_add_pad_template (element_class,
244       gst_static_pad_template_get (&gst_rtp_jitter_buffer_sink_template));
245   gst_element_class_set_details (element_class, &gst_rtp_jitter_buffer_details);
246 }
247
248 static void
249 gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass)
250 {
251   GObjectClass *gobject_class;
252   GstElementClass *gstelement_class;
253
254   gobject_class = (GObjectClass *) klass;
255   gstelement_class = (GstElementClass *) klass;
256
257   g_type_class_add_private (klass, sizeof (GstRtpJitterBufferPrivate));
258
259   gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_finalize);
260
261   gobject_class->set_property = gst_rtp_jitter_buffer_set_property;
262   gobject_class->get_property = gst_rtp_jitter_buffer_get_property;
263
264   /**
265    * GstRtpJitterBuffer::latency:
266    * 
267    * The maximum latency of the jitterbuffer. Packets will be kept in the buffer
268    * for at most this time.
269    */
270   g_object_class_install_property (gobject_class, PROP_LATENCY,
271       g_param_spec_uint ("latency", "Buffer latency in ms",
272           "Amount of ms to buffer", 0, G_MAXUINT, DEFAULT_LATENCY_MS,
273           G_PARAM_READWRITE));
274   /**
275    * GstRtpJitterBuffer::drop-on-latency:
276    * 
277    * Drop oldest buffers when the queue is completely filled. 
278    */
279   g_object_class_install_property (gobject_class, PROP_DROP_ON_LATENCY,
280       g_param_spec_boolean ("drop-on-latency",
281           "Drop buffers when maximum latency is reached",
282           "Tells the jitterbuffer to never exceed the given latency in size",
283           DEFAULT_DROP_ON_LATENCY, G_PARAM_READWRITE));
284   /**
285    * GstRtpJitterBuffer::ts-offset:
286    * 
287    * Adjust RTP timestamps in the jitterbuffer with offset.
288    */
289   g_object_class_install_property (gobject_class, PROP_TS_OFFSET,
290       g_param_spec_int64 ("ts-offset",
291           "Timestamp Offset",
292           "Adjust buffer RTP timestamps with offset in nanoseconds", G_MININT64,
293           G_MAXINT64, DEFAULT_TS_OFFSET, G_PARAM_READWRITE));
294   /**
295    * GstRtpJitterBuffer::request-pt-map:
296    * @buffer: the object which received the signal
297    * @pt: the pt
298    *
299    * Request the payload type as #GstCaps for @pt.
300    */
301   gst_rtp_jitter_buffer_signals[SIGNAL_REQUEST_PT_MAP] =
302       g_signal_new ("request-pt-map", G_TYPE_FROM_CLASS (klass),
303       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
304           request_pt_map), NULL, NULL, gst_rtp_bin_marshal_BOXED__UINT,
305       GST_TYPE_CAPS, 1, G_TYPE_UINT);
306   /**
307    * GstRtpJitterBuffer::clear-pt-map:
308    * @buffer: the object which received the signal
309    *
310    * Invalidate the clock-rate as obtained with the ::request-pt-map signal.
311    */
312   gst_rtp_jitter_buffer_signals[SIGNAL_CLEAR_PT_MAP] =
313       g_signal_new ("clear-pt-map", G_TYPE_FROM_CLASS (klass),
314       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
315           clear_pt_map), NULL, NULL, g_cclosure_marshal_VOID__VOID,
316       G_TYPE_NONE, 0, G_TYPE_NONE);
317
318   gstelement_class->change_state = gst_rtp_jitter_buffer_change_state;
319
320   klass->clear_pt_map = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_clear_pt_map);
321
322   GST_DEBUG_CATEGORY_INIT
323       (rtpjitterbuffer_debug, "gstrtpjitterbuffer", 0, "RTP Jitter Buffer");
324 }
325
326 static void
327 gst_rtp_jitter_buffer_init (GstRtpJitterBuffer * jitterbuffer,
328     GstRtpJitterBufferClass * klass)
329 {
330   GstRtpJitterBufferPrivate *priv;
331
332   priv = GST_RTP_JITTER_BUFFER_GET_PRIVATE (jitterbuffer);
333   jitterbuffer->priv = priv;
334
335   priv->latency_ms = DEFAULT_LATENCY_MS;
336   priv->drop_on_latency = DEFAULT_DROP_ON_LATENCY;
337
338   priv->jbuf = rtp_jitter_buffer_new ();
339   priv->jbuf_lock = g_mutex_new ();
340   priv->jbuf_cond = g_cond_new ();
341
342   priv->srcpad =
343       gst_pad_new_from_static_template (&gst_rtp_jitter_buffer_src_template,
344       "src");
345
346   gst_pad_set_activatepush_function (priv->srcpad,
347       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_activate_push));
348   gst_pad_set_query_function (priv->srcpad,
349       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_query));
350   gst_pad_set_getcaps_function (priv->srcpad,
351       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_getcaps));
352
353   priv->sinkpad =
354       gst_pad_new_from_static_template (&gst_rtp_jitter_buffer_sink_template,
355       "sink");
356
357   gst_pad_set_chain_function (priv->sinkpad,
358       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_chain));
359   gst_pad_set_event_function (priv->sinkpad,
360       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_sink_event));
361   gst_pad_set_setcaps_function (priv->sinkpad,
362       GST_DEBUG_FUNCPTR (gst_jitter_buffer_sink_setcaps));
363   gst_pad_set_getcaps_function (priv->sinkpad,
364       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_getcaps));
365
366   gst_element_add_pad (GST_ELEMENT (jitterbuffer), priv->srcpad);
367   gst_element_add_pad (GST_ELEMENT (jitterbuffer), priv->sinkpad);
368 }
369
370 static void
371 gst_rtp_jitter_buffer_finalize (GObject * object)
372 {
373   GstRtpJitterBuffer *jitterbuffer;
374
375   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
376
377   g_mutex_free (jitterbuffer->priv->jbuf_lock);
378   g_cond_free (jitterbuffer->priv->jbuf_cond);
379
380   g_object_unref (jitterbuffer->priv->jbuf);
381
382   G_OBJECT_CLASS (parent_class)->finalize (object);
383 }
384
385 static void
386 gst_rtp_jitter_buffer_clear_pt_map (GstRtpJitterBuffer * jitterbuffer)
387 {
388   GstRtpJitterBufferPrivate *priv;
389
390   priv = jitterbuffer->priv;
391
392   /* this will trigger a new pt-map request signal, FIXME, do something better. */
393   priv->clock_rate = -1;
394 }
395
396 static GstCaps *
397 gst_rtp_jitter_buffer_getcaps (GstPad * pad)
398 {
399   GstRtpJitterBuffer *jitterbuffer;
400   GstRtpJitterBufferPrivate *priv;
401   GstPad *other;
402   GstCaps *caps;
403   const GstCaps *templ;
404
405   jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
406   priv = jitterbuffer->priv;
407
408   other = (pad == priv->srcpad ? priv->sinkpad : priv->srcpad);
409
410   caps = gst_pad_peer_get_caps (other);
411
412   templ = gst_pad_get_pad_template_caps (pad);
413   if (caps == NULL) {
414     GST_DEBUG_OBJECT (jitterbuffer, "copy template");
415     caps = gst_caps_copy (templ);
416   } else {
417     GstCaps *intersect;
418
419     GST_DEBUG_OBJECT (jitterbuffer, "intersect with template");
420
421     intersect = gst_caps_intersect (caps, templ);
422     gst_caps_unref (caps);
423
424     caps = intersect;
425   }
426   gst_object_unref (jitterbuffer);
427
428   return caps;
429 }
430
431 static gboolean
432 gst_jitter_buffer_sink_parse_caps (GstRtpJitterBuffer * jitterbuffer,
433     GstCaps * caps)
434 {
435   GstRtpJitterBufferPrivate *priv;
436   GstStructure *caps_struct;
437   guint val;
438
439   priv = jitterbuffer->priv;
440
441   /* first parse the caps */
442   caps_struct = gst_caps_get_structure (caps, 0);
443
444   GST_DEBUG_OBJECT (jitterbuffer, "got caps");
445
446   /* we need a clock-rate to convert the rtp timestamps to GStreamer time and to
447    * measure the amount of data in the buffer */
448   if (!gst_structure_get_int (caps_struct, "clock-rate", &priv->clock_rate))
449     goto error;
450
451   if (priv->clock_rate <= 0)
452     goto wrong_rate;
453
454   GST_DEBUG_OBJECT (jitterbuffer, "got clock-rate %d", priv->clock_rate);
455
456   /* gah, clock-base is uint. If we don't have a base, we will use the first
457    * buffer timestamp as the base time. This will screw up sync but it's better
458    * than nothing. */
459   if (gst_structure_get_uint (caps_struct, "clock-base", &val))
460     priv->clock_base = val;
461   else
462     priv->clock_base = -1;
463
464   GST_DEBUG_OBJECT (jitterbuffer, "got clock-base %" G_GINT64_FORMAT,
465       priv->clock_base);
466
467   /* first expected seqnum */
468   if (gst_structure_get_uint (caps_struct, "seqnum-base", &val))
469     priv->next_seqnum = val;
470   else
471     priv->next_seqnum = -1;
472
473   GST_DEBUG_OBJECT (jitterbuffer, "got seqnum-base %d", priv->next_seqnum);
474
475   return TRUE;
476
477   /* ERRORS */
478 error:
479   {
480     GST_DEBUG_OBJECT (jitterbuffer, "No clock-rate in caps!");
481     return FALSE;
482   }
483 wrong_rate:
484   {
485     GST_DEBUG_OBJECT (jitterbuffer, "Invalid clock-rate %d", priv->clock_rate);
486     return FALSE;
487   }
488 }
489
490 static gboolean
491 gst_jitter_buffer_sink_setcaps (GstPad * pad, GstCaps * caps)
492 {
493   GstRtpJitterBuffer *jitterbuffer;
494   GstRtpJitterBufferPrivate *priv;
495   gboolean res;
496
497   jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
498   priv = jitterbuffer->priv;
499
500   res = gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps);
501
502   /* set same caps on srcpad on success */
503   if (res)
504     gst_pad_set_caps (priv->srcpad, caps);
505
506   gst_object_unref (jitterbuffer);
507
508   return res;
509 }
510
511 static void
512 gst_rtp_jitter_buffer_flush_start (GstRtpJitterBuffer * jitterbuffer)
513 {
514   GstRtpJitterBufferPrivate *priv;
515
516   priv = jitterbuffer->priv;
517
518   JBUF_LOCK (priv);
519   /* mark ourselves as flushing */
520   priv->srcresult = GST_FLOW_WRONG_STATE;
521   GST_DEBUG_OBJECT (jitterbuffer, "Disabling pop on queue");
522   /* this unblocks any waiting pops on the src pad task */
523   JBUF_SIGNAL (priv);
524   /* unlock clock, we just unschedule, the entry will be released by the 
525    * locking streaming thread. */
526   if (priv->clock_id)
527     gst_clock_id_unschedule (priv->clock_id);
528   JBUF_UNLOCK (priv);
529 }
530
531 static void
532 gst_rtp_jitter_buffer_flush_stop (GstRtpJitterBuffer * jitterbuffer)
533 {
534   GstRtpJitterBufferPrivate *priv;
535
536   priv = jitterbuffer->priv;
537
538   JBUF_LOCK (priv);
539   GST_DEBUG_OBJECT (jitterbuffer, "Enabling pop on queue");
540   /* Mark as non flushing */
541   priv->srcresult = GST_FLOW_OK;
542   gst_segment_init (&priv->segment, GST_FORMAT_TIME);
543   priv->last_popped_seqnum = -1;
544   priv->next_seqnum = -1;
545   priv->clock_rate = -1;
546   priv->eos = FALSE;
547   rtp_jitter_buffer_flush (priv->jbuf);
548   rtp_jitter_buffer_reset_skew (priv->jbuf);
549   JBUF_UNLOCK (priv);
550 }
551
552 static gboolean
553 gst_rtp_jitter_buffer_src_activate_push (GstPad * pad, gboolean active)
554 {
555   gboolean result = TRUE;
556   GstRtpJitterBuffer *jitterbuffer = NULL;
557
558   jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
559
560   if (active) {
561     /* allow data processing */
562     gst_rtp_jitter_buffer_flush_stop (jitterbuffer);
563
564     /* start pushing out buffers */
565     GST_DEBUG_OBJECT (jitterbuffer, "Starting task on srcpad");
566     gst_pad_start_task (jitterbuffer->priv->srcpad,
567         (GstTaskFunction) gst_rtp_jitter_buffer_loop, jitterbuffer);
568   } else {
569     /* make sure all data processing stops ASAP */
570     gst_rtp_jitter_buffer_flush_start (jitterbuffer);
571
572     /* NOTE this will hardlock if the state change is called from the src pad
573      * task thread because we will _join() the thread. */
574     GST_DEBUG_OBJECT (jitterbuffer, "Stopping task on srcpad");
575     result = gst_pad_stop_task (pad);
576   }
577
578   gst_object_unref (jitterbuffer);
579
580   return result;
581 }
582
583 static GstStateChangeReturn
584 gst_rtp_jitter_buffer_change_state (GstElement * element,
585     GstStateChange transition)
586 {
587   GstRtpJitterBuffer *jitterbuffer;
588   GstRtpJitterBufferPrivate *priv;
589   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
590
591   jitterbuffer = GST_RTP_JITTER_BUFFER (element);
592   priv = jitterbuffer->priv;
593
594   switch (transition) {
595     case GST_STATE_CHANGE_NULL_TO_READY:
596       break;
597     case GST_STATE_CHANGE_READY_TO_PAUSED:
598       JBUF_LOCK (priv);
599       /* reset negotiated values */
600       priv->clock_rate = -1;
601       priv->clock_base = -1;
602       priv->peer_latency = 0;
603       priv->last_pt = -1;
604       /* block until we go to PLAYING */
605       priv->blocked = TRUE;
606       /* reset skew detection initialy */
607       rtp_jitter_buffer_reset_skew (priv->jbuf);
608       JBUF_UNLOCK (priv);
609       break;
610     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
611       JBUF_LOCK (priv);
612       /* unblock to allow streaming in PLAYING */
613       priv->blocked = FALSE;
614       JBUF_SIGNAL (priv);
615       JBUF_UNLOCK (priv);
616       break;
617     default:
618       break;
619   }
620
621   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
622
623   switch (transition) {
624     case GST_STATE_CHANGE_READY_TO_PAUSED:
625       /* we are a live element because we sync to the clock, which we can only
626        * do in the PLAYING state */
627       if (ret != GST_STATE_CHANGE_FAILURE)
628         ret = GST_STATE_CHANGE_NO_PREROLL;
629       break;
630     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
631       JBUF_LOCK (priv);
632       /* block to stop streaming when PAUSED */
633       priv->blocked = TRUE;
634       JBUF_UNLOCK (priv);
635       if (ret != GST_STATE_CHANGE_FAILURE)
636         ret = GST_STATE_CHANGE_NO_PREROLL;
637       break;
638     case GST_STATE_CHANGE_PAUSED_TO_READY:
639       break;
640     case GST_STATE_CHANGE_READY_TO_NULL:
641       break;
642     default:
643       break;
644   }
645
646   return ret;
647 }
648
649 /**
650  * Performs comparison 'b - a' with check for overflows.
651  */
652 static inline gint
653 priv_compare_rtp_seq_lt (guint16 a, guint16 b)
654 {
655   /* check if diff more than half of the 16bit range */
656   if (abs (b - a) > (1 << 15)) {
657     /* one of a/b has wrapped */
658     return a - b;
659   } else {
660     return b - a;
661   }
662 }
663
664 static gboolean
665 gst_rtp_jitter_buffer_sink_event (GstPad * pad, GstEvent * event)
666 {
667   gboolean ret = TRUE;
668   GstRtpJitterBuffer *jitterbuffer;
669   GstRtpJitterBufferPrivate *priv;
670
671   jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
672   priv = jitterbuffer->priv;
673
674   GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
675
676   switch (GST_EVENT_TYPE (event)) {
677     case GST_EVENT_NEWSEGMENT:
678     {
679       GstFormat format;
680       gdouble rate, arate;
681       gint64 start, stop, time;
682       gboolean update;
683
684       gst_event_parse_new_segment_full (event, &update, &rate, &arate, &format,
685           &start, &stop, &time);
686
687       /* we need time for now */
688       if (format != GST_FORMAT_TIME)
689         goto newseg_wrong_format;
690
691       GST_DEBUG_OBJECT (jitterbuffer,
692           "newsegment: update %d, rate %g, arate %g, start %" GST_TIME_FORMAT
693           ", stop %" GST_TIME_FORMAT ", time %" GST_TIME_FORMAT,
694           update, rate, arate, GST_TIME_ARGS (start), GST_TIME_ARGS (stop),
695           GST_TIME_ARGS (time));
696
697       /* now configure the values, we need these to time the release of the
698        * buffers on the srcpad. */
699       gst_segment_set_newsegment_full (&priv->segment, update,
700           rate, arate, format, start, stop, time);
701
702       /* FIXME, push SEGMENT in the queue. Sorting order might be difficult. */
703       ret = gst_pad_push_event (priv->srcpad, event);
704       break;
705     }
706     case GST_EVENT_FLUSH_START:
707       gst_rtp_jitter_buffer_flush_start (jitterbuffer);
708       ret = gst_pad_push_event (priv->srcpad, event);
709       break;
710     case GST_EVENT_FLUSH_STOP:
711       ret = gst_pad_push_event (priv->srcpad, event);
712       ret = gst_rtp_jitter_buffer_src_activate_push (priv->srcpad, TRUE);
713       break;
714     case GST_EVENT_EOS:
715     {
716       /* push EOS in queue. We always push it at the head */
717       JBUF_LOCK (priv);
718       /* check for flushing, we need to discard the event and return FALSE when
719        * we are flushing */
720       ret = priv->srcresult == GST_FLOW_OK;
721       if (ret && !priv->eos) {
722         GST_DEBUG_OBJECT (jitterbuffer, "queuing EOS");
723         priv->eos = TRUE;
724         JBUF_SIGNAL (priv);
725       } else if (priv->eos) {
726         GST_DEBUG_OBJECT (jitterbuffer, "dropping EOS, we are already EOS");
727       } else {
728         GST_DEBUG_OBJECT (jitterbuffer, "dropping EOS, reason %s",
729             gst_flow_get_name (priv->srcresult));
730       }
731       JBUF_UNLOCK (priv);
732       gst_event_unref (event);
733       break;
734     }
735     default:
736       ret = gst_pad_push_event (priv->srcpad, event);
737       break;
738   }
739
740 done:
741   gst_object_unref (jitterbuffer);
742
743   return ret;
744
745   /* ERRORS */
746 newseg_wrong_format:
747   {
748     GST_DEBUG_OBJECT (jitterbuffer, "received non TIME newsegment");
749     ret = FALSE;
750     goto done;
751   }
752 }
753
754 static gboolean
755 gst_rtp_jitter_buffer_get_clock_rate (GstRtpJitterBuffer * jitterbuffer,
756     guint8 pt)
757 {
758   GValue ret = { 0 };
759   GValue args[2] = { {0}, {0} };
760   GstCaps *caps;
761   gboolean res;
762
763   g_value_init (&args[0], GST_TYPE_ELEMENT);
764   g_value_set_object (&args[0], jitterbuffer);
765   g_value_init (&args[1], G_TYPE_UINT);
766   g_value_set_uint (&args[1], pt);
767
768   g_value_init (&ret, GST_TYPE_CAPS);
769   g_value_set_boxed (&ret, NULL);
770
771   g_signal_emitv (args, gst_rtp_jitter_buffer_signals[SIGNAL_REQUEST_PT_MAP], 0,
772       &ret);
773
774   g_value_unset (&args[0]);
775   g_value_unset (&args[1]);
776   caps = (GstCaps *) g_value_dup_boxed (&ret);
777   g_value_unset (&ret);
778   if (!caps)
779     goto no_caps;
780
781   res = gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps);
782
783   return res;
784
785   /* ERRORS */
786 no_caps:
787   {
788     GST_DEBUG_OBJECT (jitterbuffer, "could not get caps");
789     return FALSE;
790   }
791 }
792
793 static GstFlowReturn
794 gst_rtp_jitter_buffer_chain (GstPad * pad, GstBuffer * buffer)
795 {
796   GstRtpJitterBuffer *jitterbuffer;
797   GstRtpJitterBufferPrivate *priv;
798   guint16 seqnum;
799   GstFlowReturn ret = GST_FLOW_OK;
800   GstClockTime timestamp;
801   guint64 latency_ts;
802   gboolean tail;
803
804   jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
805
806   if (!gst_rtp_buffer_validate (buffer))
807     goto invalid_buffer;
808
809   priv = jitterbuffer->priv;
810
811   if (priv->last_pt != gst_rtp_buffer_get_payload_type (buffer)) {
812     GstCaps *caps;
813
814     priv->last_pt = gst_rtp_buffer_get_payload_type (buffer);
815     /* reset clock-rate so that we get a new one */
816     priv->clock_rate = -1;
817     /* Try to get the clock-rate from the caps first if we can. If there are no
818      * caps we must fire the signal to get the clock-rate. */
819     if ((caps = GST_BUFFER_CAPS (buffer))) {
820       gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps);
821     }
822   }
823
824   if (priv->clock_rate == -1) {
825     guint8 pt;
826
827     /* no clock rate given on the caps, try to get one with the signal */
828     pt = gst_rtp_buffer_get_payload_type (buffer);
829
830     gst_rtp_jitter_buffer_get_clock_rate (jitterbuffer, pt);
831     if (priv->clock_rate == -1)
832       goto not_negotiated;
833   }
834
835   /* take the timestamp of the buffer. This is the time when the packet was
836    * received and is used to calculate jitter and clock skew. We will adjust
837    * this timestamp with the smoothed value after processing it in the
838    * jitterbuffer. */
839   timestamp = GST_BUFFER_TIMESTAMP (buffer);
840   /* bring to running time */
841   timestamp = gst_segment_to_running_time (&priv->segment, GST_FORMAT_TIME,
842       timestamp);
843
844   seqnum = gst_rtp_buffer_get_seq (buffer);
845   GST_DEBUG_OBJECT (jitterbuffer,
846       "Received packet #%d at time %" GST_TIME_FORMAT, seqnum,
847       GST_TIME_ARGS (timestamp));
848
849   JBUF_LOCK_CHECK (priv, out_flushing);
850   /* don't accept more data on EOS */
851   if (priv->eos)
852     goto have_eos;
853
854   /* let's check if this buffer is too late, we cannot accept packets with
855    * bigger seqnum than the one we already pushed. */
856   if (priv->last_popped_seqnum != -1) {
857     if (priv_compare_rtp_seq_lt (priv->last_popped_seqnum, seqnum) < 0)
858       goto too_late;
859   }
860
861   /* let's drop oldest packet if the queue is already full and drop-on-latency
862    * is set. We can only do this when there actually is a latency. When no
863    * latency is set, we just pump it in the queue and let the other end push it
864    * out as fast as possible. */
865   if (priv->latency_ms && priv->drop_on_latency) {
866
867     latency_ts =
868         gst_util_uint64_scale_int (priv->latency_ms, priv->clock_rate, 1000);
869
870     if (rtp_jitter_buffer_get_ts_diff (priv->jbuf) >= latency_ts) {
871       GstBuffer *old_buf;
872
873       GST_DEBUG_OBJECT (jitterbuffer, "Queue full, dropping old packet #%d",
874           seqnum);
875
876       old_buf = rtp_jitter_buffer_pop (priv->jbuf);
877       gst_buffer_unref (old_buf);
878     }
879   }
880
881   /* now insert the packet into the queue in sorted order. This function returns
882    * FALSE if a packet with the same seqnum was already in the queue, meaning we
883    * have a duplicate. */
884   if (!rtp_jitter_buffer_insert (priv->jbuf, buffer, timestamp,
885           priv->clock_rate, &tail))
886     goto duplicate;
887
888   /* signal addition of new buffer when the _loop is waiting. */
889   if (priv->waiting)
890     JBUF_SIGNAL (priv);
891
892   /* let's unschedule and unblock any waiting buffers. We only want to do this
893    * when the tail buffer changed */
894   if (priv->clock_id && tail) {
895     GST_DEBUG_OBJECT (jitterbuffer,
896         "Unscheduling waiting buffer, new tail buffer");
897     gst_clock_id_unschedule (priv->clock_id);
898   }
899
900   GST_DEBUG_OBJECT (jitterbuffer, "Pushed packet #%d, now %d packets",
901       seqnum, rtp_jitter_buffer_num_packets (priv->jbuf));
902
903 finished:
904   JBUF_UNLOCK (priv);
905
906   gst_object_unref (jitterbuffer);
907
908   return ret;
909
910   /* ERRORS */
911 invalid_buffer:
912   {
913     /* this is fatal and should be filtered earlier */
914     GST_ELEMENT_ERROR (jitterbuffer, STREAM, DECODE, (NULL),
915         ("Received invalid RTP payload"));
916     gst_buffer_unref (buffer);
917     gst_object_unref (jitterbuffer);
918     return GST_FLOW_ERROR;
919   }
920 not_negotiated:
921   {
922     GST_WARNING_OBJECT (jitterbuffer, "No clock-rate in caps!");
923     gst_buffer_unref (buffer);
924     gst_object_unref (jitterbuffer);
925     return GST_FLOW_OK;
926   }
927 out_flushing:
928   {
929     ret = priv->srcresult;
930     GST_DEBUG_OBJECT (jitterbuffer, "flushing %s", gst_flow_get_name (ret));
931     gst_buffer_unref (buffer);
932     goto finished;
933   }
934 have_eos:
935   {
936     ret = GST_FLOW_UNEXPECTED;
937     GST_WARNING_OBJECT (jitterbuffer, "we are EOS, refusing buffer");
938     gst_buffer_unref (buffer);
939     goto finished;
940   }
941 too_late:
942   {
943     GST_WARNING_OBJECT (jitterbuffer, "Packet #%d too late as #%d was already"
944         " popped, dropping", seqnum, priv->last_popped_seqnum);
945     priv->num_late++;
946     gst_buffer_unref (buffer);
947     goto finished;
948   }
949 duplicate:
950   {
951     GST_WARNING_OBJECT (jitterbuffer, "Duplicate packet #%d detected, dropping",
952         seqnum);
953     priv->num_duplicates++;
954     gst_buffer_unref (buffer);
955     goto finished;
956   }
957 }
958
959 static GstClockTime
960 apply_offset (GstRtpJitterBuffer * jitterbuffer, GstClockTime timestamp)
961 {
962   GstRtpJitterBufferPrivate *priv;
963
964   priv = jitterbuffer->priv;
965
966   if (timestamp == -1)
967     return -1;
968
969   /* apply the timestamp offset */
970   timestamp += priv->ts_offset;
971
972   return timestamp;
973 }
974
975 /**
976  * This funcion will push out buffers on the source pad.
977  *
978  * For each pushed buffer, the seqnum is recorded, if the next buffer B has a
979  * different seqnum (missing packets before B), this function will wait for the
980  * missing packet to arrive up to the timestamp of buffer B.
981  */
982 static void
983 gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer)
984 {
985   GstRtpJitterBufferPrivate *priv;
986   GstBuffer *outbuf;
987   GstFlowReturn result;
988   guint16 seqnum;
989   GstClockTime timestamp, out_time;
990
991   priv = jitterbuffer->priv;
992
993   JBUF_LOCK_CHECK (priv, flushing);
994 again:
995   GST_DEBUG_OBJECT (jitterbuffer, "Peeking item");
996   while (TRUE) {
997
998     /* always wait if we are blocked */
999     if (!priv->blocked) {
1000       /* if we have a packet, we can grab it */
1001       if (rtp_jitter_buffer_num_packets (priv->jbuf) > 0)
1002         break;
1003       /* no packets but we are EOS, do eos logic */
1004       if (priv->eos)
1005         goto do_eos;
1006     }
1007     /* wait for packets or flushing now */
1008     priv->waiting = TRUE;
1009     JBUF_WAIT_CHECK (priv, flushing);
1010     priv->waiting = FALSE;
1011   }
1012
1013   /* peek a buffer, we're just looking at the timestamp and the sequence number.
1014    * If all is fine, we'll pop and push it. If the sequence number is wrong we
1015    * wait on the timestamp. In the chain function we will unlock the wait when a
1016    * new buffer is available. The peeked buffer is valid for as long as we hold
1017    * the jitterbuffer lock. */
1018   outbuf = rtp_jitter_buffer_peek (priv->jbuf);
1019   seqnum = gst_rtp_buffer_get_seq (outbuf);
1020
1021   /* get the timestamp, this is already corrected for clock skew by the
1022    * jitterbuffer */
1023   timestamp = GST_BUFFER_TIMESTAMP (outbuf);
1024
1025   GST_DEBUG_OBJECT (jitterbuffer,
1026       "Peeked buffer #%d, timestamp %" GST_TIME_FORMAT ", now %d left",
1027       seqnum, GST_TIME_ARGS (timestamp),
1028       rtp_jitter_buffer_num_packets (priv->jbuf));
1029
1030   /* apply our timestamp offset to the incomming buffer, this will be our output
1031    * timestamp. */
1032   out_time = apply_offset (jitterbuffer, timestamp);
1033
1034   /* If we don't know what the next seqnum should be (== -1) we have to wait
1035    * because it might be possible that we are not receiving this buffer in-order,
1036    * a buffer with a lower seqnum could arrive later and we want to push that
1037    * earlier buffer before this buffer then.
1038    * If we know the expected seqnum, we can compare it to the current seqnum to
1039    * determine if we have missing a packet. If we have a missing packet (which
1040    * must be before this packet) we can wait for it until the deadline for this
1041    * packet expires. */
1042   if ((priv->next_seqnum == -1 || priv->next_seqnum != seqnum)
1043       && out_time != -1) {
1044     GstClockID id;
1045     GstClockTime sync_time;
1046     GstClockReturn ret;
1047     GstClock *clock;
1048
1049     if (priv->next_seqnum != -1) {
1050       /* we expected next_seqnum but received something else, that's a gap */
1051       GST_WARNING_OBJECT (jitterbuffer,
1052           "Sequence number GAP detected: expected %d instead of %d",
1053           priv->next_seqnum, seqnum);
1054     } else {
1055       /* we don't know what the next_seqnum should be, wait for the last
1056        * possible moment to push this buffer, maybe we get an earlier seqnum
1057        * while we wait */
1058       GST_DEBUG_OBJECT (jitterbuffer, "First buffer %d, do sync", seqnum);
1059     }
1060
1061     GST_OBJECT_LOCK (jitterbuffer);
1062     clock = GST_ELEMENT_CLOCK (jitterbuffer);
1063     if (!clock) {
1064       GST_OBJECT_UNLOCK (jitterbuffer);
1065       /* let's just push if there is no clock */
1066       goto push_buffer;
1067     }
1068
1069     GST_DEBUG_OBJECT (jitterbuffer, "sync to timestamp %" GST_TIME_FORMAT,
1070         GST_TIME_ARGS (out_time));
1071
1072     /* prepare for sync against clock */
1073     sync_time = out_time + GST_ELEMENT_CAST (jitterbuffer)->base_time;
1074     /* add latency, this includes our own latency and the peer latency. */
1075     sync_time += (priv->latency_ms * GST_MSECOND);
1076     sync_time += priv->peer_latency;
1077
1078     /* create an entry for the clock */
1079     id = priv->clock_id = gst_clock_new_single_shot_id (clock, sync_time);
1080     GST_OBJECT_UNLOCK (jitterbuffer);
1081
1082     /* release the lock so that the other end can push stuff or unlock */
1083     JBUF_UNLOCK (priv);
1084
1085     ret = gst_clock_id_wait (id, NULL);
1086
1087     JBUF_LOCK (priv);
1088     /* and free the entry */
1089     gst_clock_id_unref (id);
1090     priv->clock_id = NULL;
1091
1092     /* at this point, the clock could have been unlocked by a timeout, a new
1093      * tail element was added to the queue or because we are shutting down. Check
1094      * for shutdown first. */
1095     if (priv->srcresult != GST_FLOW_OK)
1096       goto flushing;
1097
1098     /* if we got unscheduled and we are not flushing, it's because a new tail
1099      * element became available in the queue. Grab it and try to push or sync. */
1100     if (ret == GST_CLOCK_UNSCHEDULED) {
1101       GST_DEBUG_OBJECT (jitterbuffer,
1102           "Wait got unscheduled, will retry to push with new buffer");
1103       goto again;
1104     }
1105     /* Get new timestamp, latency might have changed */
1106     out_time = apply_offset (jitterbuffer, timestamp);
1107   }
1108 push_buffer:
1109   /* when we get here we are ready to pop and push the buffer */
1110   outbuf = rtp_jitter_buffer_pop (priv->jbuf);
1111
1112   /* check if we are pushing something unexpected */
1113   if (priv->next_seqnum != -1 && priv->next_seqnum != seqnum) {
1114     gint dropped;
1115
1116     /* calc number of missing packets, careful for wraparounds */
1117     dropped = priv_compare_rtp_seq_lt (priv->next_seqnum, seqnum);
1118
1119     GST_DEBUG_OBJECT (jitterbuffer,
1120         "Pushing DISCONT after dropping %d (%d to %d)", dropped,
1121         priv->next_seqnum, seqnum);
1122
1123     /* update stats */
1124     priv->num_late += dropped;
1125
1126     /* set DISCONT flag when we missed a packet. */
1127     outbuf = gst_buffer_make_metadata_writable (outbuf);
1128     GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_DISCONT);
1129   }
1130
1131   /* apply timestamp with offset to buffer now */
1132   GST_BUFFER_TIMESTAMP (outbuf) = out_time;
1133
1134   /* now we are ready to push the buffer. Save the seqnum and release the lock
1135    * so the other end can push stuff in the queue again. */
1136   priv->last_popped_seqnum = seqnum;
1137   priv->next_seqnum = (seqnum + 1) & 0xffff;
1138   JBUF_UNLOCK (priv);
1139
1140   /* push buffer */
1141   GST_DEBUG_OBJECT (jitterbuffer,
1142       "Pushing buffer %d, timestamp %" GST_TIME_FORMAT, seqnum,
1143       GST_TIME_ARGS (out_time));
1144   result = gst_pad_push (priv->srcpad, outbuf);
1145   if (result != GST_FLOW_OK)
1146     goto pause;
1147
1148   return;
1149
1150   /* ERRORS */
1151 do_eos:
1152   {
1153     /* store result, we are flushing now */
1154     GST_DEBUG_OBJECT (jitterbuffer, "We are EOS, pushing EOS downstream");
1155     priv->srcresult = GST_FLOW_UNEXPECTED;
1156     gst_pad_pause_task (priv->srcpad);
1157     gst_pad_push_event (priv->srcpad, gst_event_new_eos ());
1158     JBUF_UNLOCK (priv);
1159     return;
1160   }
1161 flushing:
1162   {
1163     GST_DEBUG_OBJECT (jitterbuffer, "we are flushing");
1164     gst_pad_pause_task (priv->srcpad);
1165     JBUF_UNLOCK (priv);
1166     return;
1167   }
1168 pause:
1169   {
1170     const gchar *reason = gst_flow_get_name (result);
1171
1172     GST_DEBUG_OBJECT (jitterbuffer, "pausing task, reason %s", reason);
1173
1174     JBUF_LOCK (priv);
1175     /* store result */
1176     priv->srcresult = result;
1177     /* we don't post errors or anything because upstream will do that for us
1178      * when we pass the return value upstream. */
1179     gst_pad_pause_task (priv->srcpad);
1180     JBUF_UNLOCK (priv);
1181     return;
1182   }
1183 }
1184
1185 static gboolean
1186 gst_rtp_jitter_buffer_query (GstPad * pad, GstQuery * query)
1187 {
1188   GstRtpJitterBuffer *jitterbuffer;
1189   GstRtpJitterBufferPrivate *priv;
1190   gboolean res = FALSE;
1191
1192   jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
1193   priv = jitterbuffer->priv;
1194
1195   switch (GST_QUERY_TYPE (query)) {
1196     case GST_QUERY_LATENCY:
1197     {
1198       /* We need to send the query upstream and add the returned latency to our
1199        * own */
1200       GstClockTime min_latency, max_latency;
1201       gboolean us_live;
1202       GstClockTime our_latency;
1203
1204       if ((res = gst_pad_peer_query (priv->sinkpad, query))) {
1205         gst_query_parse_latency (query, &us_live, &min_latency, &max_latency);
1206
1207         GST_DEBUG_OBJECT (jitterbuffer, "Peer latency: min %"
1208             GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
1209             GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
1210
1211         /* store this so that we can safely sync on the peer buffers. */
1212         JBUF_LOCK (priv);
1213         priv->peer_latency = min_latency;
1214         our_latency = ((guint64) priv->latency_ms) * GST_MSECOND;
1215         JBUF_UNLOCK (priv);
1216
1217         GST_DEBUG_OBJECT (jitterbuffer, "Our latency: %" GST_TIME_FORMAT,
1218             GST_TIME_ARGS (our_latency));
1219
1220         /* we add some latency but can buffer an infinite amount of time */
1221         min_latency += our_latency;
1222         max_latency = -1;
1223
1224         GST_DEBUG_OBJECT (jitterbuffer, "Calculated total latency : min %"
1225             GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
1226             GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
1227
1228         gst_query_set_latency (query, TRUE, min_latency, max_latency);
1229       }
1230       break;
1231     }
1232     default:
1233       res = gst_pad_query_default (pad, query);
1234       break;
1235   }
1236   return res;
1237 }
1238
1239 static void
1240 gst_rtp_jitter_buffer_set_property (GObject * object,
1241     guint prop_id, const GValue * value, GParamSpec * pspec)
1242 {
1243   GstRtpJitterBuffer *jitterbuffer;
1244   GstRtpJitterBufferPrivate *priv;
1245
1246   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
1247   priv = jitterbuffer->priv;
1248
1249   switch (prop_id) {
1250     case PROP_LATENCY:
1251     {
1252       guint new_latency, old_latency;
1253
1254       new_latency = g_value_get_uint (value);
1255
1256       JBUF_LOCK (priv);
1257       old_latency = priv->latency_ms;
1258       priv->latency_ms = new_latency;
1259       JBUF_UNLOCK (priv);
1260
1261       /* post message if latency changed, this will inform the parent pipeline
1262        * that a latency reconfiguration is possible/needed. */
1263       if (new_latency != old_latency) {
1264         GST_DEBUG_OBJECT (jitterbuffer, "latency changed to: %" GST_TIME_FORMAT,
1265             GST_TIME_ARGS (new_latency * GST_MSECOND));
1266
1267         gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer),
1268             gst_message_new_latency (GST_OBJECT_CAST (jitterbuffer)));
1269       }
1270       break;
1271     }
1272     case PROP_DROP_ON_LATENCY:
1273       priv->drop_on_latency = g_value_get_boolean (value);
1274       break;
1275     case PROP_TS_OFFSET:
1276       JBUF_LOCK (priv);
1277       priv->ts_offset = g_value_get_int64 (value);
1278       JBUF_UNLOCK (priv);
1279       break;
1280     default:
1281       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1282       break;
1283   }
1284 }
1285
1286 static void
1287 gst_rtp_jitter_buffer_get_property (GObject * object,
1288     guint prop_id, GValue * value, GParamSpec * pspec)
1289 {
1290   GstRtpJitterBuffer *jitterbuffer;
1291   GstRtpJitterBufferPrivate *priv;
1292
1293   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
1294   priv = jitterbuffer->priv;
1295
1296   switch (prop_id) {
1297     case PROP_LATENCY:
1298       JBUF_LOCK (priv);
1299       g_value_set_uint (value, priv->latency_ms);
1300       JBUF_UNLOCK (priv);
1301       break;
1302     case PROP_DROP_ON_LATENCY:
1303       g_value_set_boolean (value, priv->drop_on_latency);
1304       break;
1305     case PROP_TS_OFFSET:
1306       JBUF_LOCK (priv);
1307       g_value_set_int64 (value, priv->ts_offset);
1308       JBUF_UNLOCK (priv);
1309       break;
1310     default:
1311       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1312       break;
1313   }
1314 }