gst/rtpmanager/gstrtpjitterbuffer.c: jitterbuffer can buffer an unlimited amount...
[platform/upstream/gst-plugins-good.git] / gst / rtpmanager / gstrtpjitterbuffer.c
1 /*
2  * Farsight Voice+Video library
3  *
4  *  Copyright 2007 Collabora Ltd, 
5  *  Copyright 2007 Nokia Corporation
6  *   @author: Philippe Kalaf <philippe.kalaf@collabora.co.uk>.
7  *  Copyright 2007 Wim Taymans <wim.taymans@gmail.com>
8  *
9  * This library is free software; you can redistribute it and/or
10  * modify it under the terms of the GNU Library General Public
11  * License as published by the Free Software Foundation; either
12  * version 2 of the License, or (at your option) any later version.
13  *
14  * This library is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17  * Library General Public License for more details.
18  *
19  * You should have received a copy of the GNU Library General Public
20  * License along with this library; if not, write to the
21  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
22  * Boston, MA 02111-1307, USA.
23  *
24  */
25
26 /**
27  * SECTION:element-gstrtpjitterbuffer
28  * @short_description: buffer, reorder and remove duplicate RTP packets to
29  * compensate for network oddities.
30  *
31  * <refsect2>
32  * <para>
33  * This element reorders and removes duplicate RTP packets as they are received
34  * from a network source. It will also wait for missing packets up to a
35  * configurable time limit using the ::latency property. Packets arriving too
36  * late are considered to be lost packets.
37  * </para>
38  * <para>
39  * This element acts as a live element and so adds ::latency to the pipeline.
40  * </para>
41  * <para>
42  * The element needs the clock-rate of the RTP payload in order to estimate the
43  * delay. This information is obtained either from the caps on the sink pad or,
44  * when no caps are present, from the ::request-pt-map signal. To clear the
45  * previous pt-map use the ::clear-pt-map signal.
46  * </para>
47  * <para>
48  * This element will automatically be used inside gstrtpbin.
49  * </para>
50  * <title>Example pipelines</title>
51  * <para>
52  * <programlisting>
53  * gst-launch rtspsrc location=rtsp://192.168.1.133:8554/mpeg1or2AudioVideoTest ! gstrtpjitterbuffer ! rtpmpvdepay ! mpeg2dec ! xvimagesink
54  * </programlisting>
55  * Connect to a streaming server and decode the MPEG video. The jitterbuffer is
56  * inserted into the pipeline to smooth out network jitter and to reorder the
57  * out-of-order RTP packets.
58  * </para>
59  * </refsect2>
60  *
61  * Last reviewed on 2007-05-28 (0.10.5)
62  */
63
64 #ifdef HAVE_CONFIG_H
65 #include "config.h"
66 #endif
67
68 #include <stdlib.h>
69 #include <string.h>
70 #include <gst/rtp/gstrtpbuffer.h>
71
72 #include "gstrtpbin-marshal.h"
73
74 #include "gstrtpjitterbuffer.h"
75 #include "rtpjitterbuffer.h"
76
77 GST_DEBUG_CATEGORY (rtpjitterbuffer_debug);
78 #define GST_CAT_DEFAULT (rtpjitterbuffer_debug)
79
80 /* low and high threshold tell the queue when to start and stop buffering */
81 #define LOW_THRESHOLD 0.2
82 #define HIGH_THRESHOLD 0.8
83
84 /* elementfactory information */
85 static const GstElementDetails gst_rtp_jitter_buffer_details =
86 GST_ELEMENT_DETAILS ("RTP packet jitter-buffer",
87     "Filter/Network/RTP",
88     "A buffer that deals with network jitter and other transmission faults",
89     "Philippe Kalaf <philippe.kalaf@collabora.co.uk>, "
90     "Wim Taymans <wim.taymans@gmail.com>");
91
92 /* RTPJitterBuffer signals and args */
93 enum
94 {
95   SIGNAL_REQUEST_PT_MAP,
96   SIGNAL_CLEAR_PT_MAP,
97   LAST_SIGNAL
98 };
99
100 #define DEFAULT_LATENCY_MS      200
101 #define DEFAULT_DROP_ON_LATENCY FALSE
102 #define DEFAULT_TS_OFFSET       0
103
104 enum
105 {
106   PROP_0,
107   PROP_LATENCY,
108   PROP_DROP_ON_LATENCY,
109   PROP_TS_OFFSET
110 };
111
112 #define JBUF_LOCK(priv)   (g_mutex_lock ((priv)->jbuf_lock))
113
114 #define JBUF_LOCK_CHECK(priv,label) G_STMT_START {    \
115   JBUF_LOCK (priv);                                   \
116   if (priv->srcresult != GST_FLOW_OK)                 \
117     goto label;                                       \
118 } G_STMT_END
119
120 #define JBUF_UNLOCK(priv) (g_mutex_unlock ((priv)->jbuf_lock))
121 #define JBUF_WAIT(priv)   (g_cond_wait ((priv)->jbuf_cond, (priv)->jbuf_lock))
122
123 #define JBUF_WAIT_CHECK(priv,label) G_STMT_START {    \
124   JBUF_WAIT(priv);                                    \
125   if (priv->srcresult != GST_FLOW_OK)                 \
126     goto label;                                       \
127 } G_STMT_END
128
129 #define JBUF_SIGNAL(priv) (g_cond_signal ((priv)->jbuf_cond))
130
131 struct _GstRtpJitterBufferPrivate
132 {
133   GstPad *sinkpad, *srcpad;
134
135   RTPJitterBuffer *jbuf;
136   GMutex *jbuf_lock;
137   GCond *jbuf_cond;
138   gboolean waiting;
139
140   /* properties */
141   guint latency_ms;
142   gboolean drop_on_latency;
143   gint64 ts_offset;
144
145   /* the last seqnum we pushed out */
146   guint32 last_popped_seqnum;
147   /* the next expected seqnum */
148   guint32 next_seqnum;
149
150   /* state */
151   gboolean eos;
152
153   /* clock rate and rtp timestamp offset */
154   gint32 clock_rate;
155   gint64 clock_base;
156   gint64 prev_ts_offset;
157
158   /* when we are shutting down */
159   GstFlowReturn srcresult;
160   gboolean blocked;
161
162   /* for sync */
163   GstSegment segment;
164   GstClockID clock_id;
165   guint32 waiting_seqnum;
166   /* the latency of the upstream peer, we have to take this into account when
167    * synchronizing the buffers. */
168   GstClockTime peer_latency;
169
170   /* some accounting */
171   guint64 num_late;
172   guint64 num_duplicates;
173 };
174
175 #define GST_RTP_JITTER_BUFFER_GET_PRIVATE(o) \
176   (G_TYPE_INSTANCE_GET_PRIVATE ((o), GST_TYPE_RTP_JITTER_BUFFER, \
177                                 GstRtpJitterBufferPrivate))
178
179 static GstStaticPadTemplate gst_rtp_jitter_buffer_sink_template =
180 GST_STATIC_PAD_TEMPLATE ("sink",
181     GST_PAD_SINK,
182     GST_PAD_ALWAYS,
183     GST_STATIC_CAPS ("application/x-rtp, "
184         "clock-rate = (int) [ 1, 2147483647 ]"
185         /* "payload = (int) , "
186          * "encoding-name = (string) "
187          */ )
188     );
189
190 static GstStaticPadTemplate gst_rtp_jitter_buffer_src_template =
191 GST_STATIC_PAD_TEMPLATE ("src",
192     GST_PAD_SRC,
193     GST_PAD_ALWAYS,
194     GST_STATIC_CAPS ("application/x-rtp"
195         /* "payload = (int) , "
196          * "clock-rate = (int) , "
197          * "encoding-name = (string) "
198          */ )
199     );
200
201 static guint gst_rtp_jitter_buffer_signals[LAST_SIGNAL] = { 0 };
202
203 GST_BOILERPLATE (GstRtpJitterBuffer, gst_rtp_jitter_buffer, GstElement,
204     GST_TYPE_ELEMENT);
205
206 /* object overrides */
207 static void gst_rtp_jitter_buffer_set_property (GObject * object,
208     guint prop_id, const GValue * value, GParamSpec * pspec);
209 static void gst_rtp_jitter_buffer_get_property (GObject * object,
210     guint prop_id, GValue * value, GParamSpec * pspec);
211 static void gst_rtp_jitter_buffer_finalize (GObject * object);
212
213 /* element overrides */
214 static GstStateChangeReturn gst_rtp_jitter_buffer_change_state (GstElement
215     * element, GstStateChange transition);
216
217 /* pad overrides */
218 static GstCaps *gst_rtp_jitter_buffer_getcaps (GstPad * pad);
219
220 /* sinkpad overrides */
221 static gboolean gst_jitter_buffer_sink_setcaps (GstPad * pad, GstCaps * caps);
222 static gboolean gst_rtp_jitter_buffer_sink_event (GstPad * pad,
223     GstEvent * event);
224 static GstFlowReturn gst_rtp_jitter_buffer_chain (GstPad * pad,
225     GstBuffer * buffer);
226
227 /* srcpad overrides */
228 static gboolean
229 gst_rtp_jitter_buffer_src_activate_push (GstPad * pad, gboolean active);
230 static void gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer);
231 static gboolean gst_rtp_jitter_buffer_query (GstPad * pad, GstQuery * query);
232
233 static void
234 gst_rtp_jitter_buffer_clear_pt_map (GstRtpJitterBuffer * jitterbuffer);
235
236 static void
237 gst_rtp_jitter_buffer_base_init (gpointer klass)
238 {
239   GstElementClass *element_class = GST_ELEMENT_CLASS (klass);
240
241   gst_element_class_add_pad_template (element_class,
242       gst_static_pad_template_get (&gst_rtp_jitter_buffer_src_template));
243   gst_element_class_add_pad_template (element_class,
244       gst_static_pad_template_get (&gst_rtp_jitter_buffer_sink_template));
245   gst_element_class_set_details (element_class, &gst_rtp_jitter_buffer_details);
246 }
247
248 static void
249 gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass)
250 {
251   GObjectClass *gobject_class;
252   GstElementClass *gstelement_class;
253
254   gobject_class = (GObjectClass *) klass;
255   gstelement_class = (GstElementClass *) klass;
256
257   g_type_class_add_private (klass, sizeof (GstRtpJitterBufferPrivate));
258
259   gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_finalize);
260
261   gobject_class->set_property = gst_rtp_jitter_buffer_set_property;
262   gobject_class->get_property = gst_rtp_jitter_buffer_get_property;
263
264   /**
265    * GstRtpJitterBuffer::latency:
266    * 
267    * The maximum latency of the jitterbuffer. Packets will be kept in the buffer
268    * for at most this time.
269    */
270   g_object_class_install_property (gobject_class, PROP_LATENCY,
271       g_param_spec_uint ("latency", "Buffer latency in ms",
272           "Amount of ms to buffer", 0, G_MAXUINT, DEFAULT_LATENCY_MS,
273           G_PARAM_READWRITE));
274   /**
275    * GstRtpJitterBuffer::drop-on-latency:
276    * 
277    * Drop oldest buffers when the queue is completely filled. 
278    */
279   g_object_class_install_property (gobject_class, PROP_DROP_ON_LATENCY,
280       g_param_spec_boolean ("drop-on-latency",
281           "Drop buffers when maximum latency is reached",
282           "Tells the jitterbuffer to never exceed the given latency in size",
283           DEFAULT_DROP_ON_LATENCY, G_PARAM_READWRITE));
284   /**
285    * GstRtpJitterBuffer::ts-offset:
286    * 
287    * Adjust RTP timestamps in the jitterbuffer with offset.
288    */
289   g_object_class_install_property (gobject_class, PROP_TS_OFFSET,
290       g_param_spec_int64 ("ts-offset",
291           "Timestamp Offset",
292           "Adjust buffer RTP timestamps with offset in nanoseconds", G_MININT64,
293           G_MAXINT64, DEFAULT_TS_OFFSET, G_PARAM_READWRITE));
294   /**
295    * GstRtpJitterBuffer::request-pt-map:
296    * @buffer: the object which received the signal
297    * @pt: the pt
298    *
299    * Request the payload type as #GstCaps for @pt.
300    */
301   gst_rtp_jitter_buffer_signals[SIGNAL_REQUEST_PT_MAP] =
302       g_signal_new ("request-pt-map", G_TYPE_FROM_CLASS (klass),
303       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
304           request_pt_map), NULL, NULL, gst_rtp_bin_marshal_BOXED__UINT,
305       GST_TYPE_CAPS, 1, G_TYPE_UINT);
306   /**
307    * GstRtpJitterBuffer::clear-pt-map:
308    * @buffer: the object which received the signal
309    *
310    * Invalidate the clock-rate as obtained with the ::request-pt-map signal.
311    */
312   gst_rtp_jitter_buffer_signals[SIGNAL_CLEAR_PT_MAP] =
313       g_signal_new ("clear-pt-map", G_TYPE_FROM_CLASS (klass),
314       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
315           clear_pt_map), NULL, NULL, g_cclosure_marshal_VOID__VOID,
316       G_TYPE_NONE, 0, G_TYPE_NONE);
317
318   gstelement_class->change_state = gst_rtp_jitter_buffer_change_state;
319
320   klass->clear_pt_map = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_clear_pt_map);
321
322   GST_DEBUG_CATEGORY_INIT
323       (rtpjitterbuffer_debug, "gstrtpjitterbuffer", 0, "RTP Jitter Buffer");
324 }
325
326 static void
327 gst_rtp_jitter_buffer_init (GstRtpJitterBuffer * jitterbuffer,
328     GstRtpJitterBufferClass * klass)
329 {
330   GstRtpJitterBufferPrivate *priv;
331
332   priv = GST_RTP_JITTER_BUFFER_GET_PRIVATE (jitterbuffer);
333   jitterbuffer->priv = priv;
334
335   priv->latency_ms = DEFAULT_LATENCY_MS;
336   priv->drop_on_latency = DEFAULT_DROP_ON_LATENCY;
337
338   priv->jbuf = rtp_jitter_buffer_new ();
339   priv->jbuf_lock = g_mutex_new ();
340   priv->jbuf_cond = g_cond_new ();
341
342   priv->waiting_seqnum = -1;
343
344   priv->srcpad =
345       gst_pad_new_from_static_template (&gst_rtp_jitter_buffer_src_template,
346       "src");
347
348   gst_pad_set_activatepush_function (priv->srcpad,
349       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_activate_push));
350   gst_pad_set_query_function (priv->srcpad,
351       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_query));
352   gst_pad_set_getcaps_function (priv->srcpad,
353       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_getcaps));
354
355   priv->sinkpad =
356       gst_pad_new_from_static_template (&gst_rtp_jitter_buffer_sink_template,
357       "sink");
358
359   gst_pad_set_chain_function (priv->sinkpad,
360       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_chain));
361   gst_pad_set_event_function (priv->sinkpad,
362       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_sink_event));
363   gst_pad_set_setcaps_function (priv->sinkpad,
364       GST_DEBUG_FUNCPTR (gst_jitter_buffer_sink_setcaps));
365   gst_pad_set_getcaps_function (priv->sinkpad,
366       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_getcaps));
367
368   gst_element_add_pad (GST_ELEMENT (jitterbuffer), priv->srcpad);
369   gst_element_add_pad (GST_ELEMENT (jitterbuffer), priv->sinkpad);
370 }
371
372 static void
373 gst_rtp_jitter_buffer_finalize (GObject * object)
374 {
375   GstRtpJitterBuffer *jitterbuffer;
376
377   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
378
379   g_mutex_free (jitterbuffer->priv->jbuf_lock);
380   g_cond_free (jitterbuffer->priv->jbuf_cond);
381
382   g_object_unref (jitterbuffer->priv->jbuf);
383
384   G_OBJECT_CLASS (parent_class)->finalize (object);
385 }
386
387 static void
388 gst_rtp_jitter_buffer_clear_pt_map (GstRtpJitterBuffer * jitterbuffer)
389 {
390   GstRtpJitterBufferPrivate *priv;
391
392   priv = jitterbuffer->priv;
393
394   /* this will trigger a new pt-map request signal, FIXME, do something better. */
395   priv->clock_rate = -1;
396 }
397
398 static GstCaps *
399 gst_rtp_jitter_buffer_getcaps (GstPad * pad)
400 {
401   GstRtpJitterBuffer *jitterbuffer;
402   GstRtpJitterBufferPrivate *priv;
403   GstPad *other;
404   GstCaps *caps;
405   const GstCaps *templ;
406
407   jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
408   priv = jitterbuffer->priv;
409
410   other = (pad == priv->srcpad ? priv->sinkpad : priv->srcpad);
411
412   caps = gst_pad_peer_get_caps (other);
413
414   templ = gst_pad_get_pad_template_caps (pad);
415   if (caps == NULL) {
416     GST_DEBUG_OBJECT (jitterbuffer, "copy template");
417     caps = gst_caps_copy (templ);
418   } else {
419     GstCaps *intersect;
420
421     GST_DEBUG_OBJECT (jitterbuffer, "intersect with template");
422
423     intersect = gst_caps_intersect (caps, templ);
424     gst_caps_unref (caps);
425
426     caps = intersect;
427   }
428   gst_object_unref (jitterbuffer);
429
430   return caps;
431 }
432
433 static gboolean
434 gst_jitter_buffer_sink_parse_caps (GstRtpJitterBuffer * jitterbuffer,
435     GstCaps * caps)
436 {
437   GstRtpJitterBufferPrivate *priv;
438   GstStructure *caps_struct;
439   guint val;
440
441   priv = jitterbuffer->priv;
442
443   /* first parse the caps */
444   caps_struct = gst_caps_get_structure (caps, 0);
445
446   GST_DEBUG_OBJECT (jitterbuffer, "got caps");
447
448   /* we need a clock-rate to convert the rtp timestamps to GStreamer time and to
449    * measure the amount of data in the buffer */
450   if (!gst_structure_get_int (caps_struct, "clock-rate", &priv->clock_rate))
451     goto error;
452
453   if (priv->clock_rate <= 0)
454     goto wrong_rate;
455
456   rtp_jitter_buffer_set_clock_rate (priv->jbuf, priv->clock_rate);
457
458   GST_DEBUG_OBJECT (jitterbuffer, "got clock-rate %d", priv->clock_rate);
459
460   /* gah, clock-base is uint. If we don't have a base, we will use the first
461    * buffer timestamp as the base time. This will screw up sync but it's better
462    * than nothing. */
463   if (gst_structure_get_uint (caps_struct, "clock-base", &val))
464     priv->clock_base = val;
465   else
466     priv->clock_base = -1;
467
468   GST_DEBUG_OBJECT (jitterbuffer, "got clock-base %" G_GINT64_FORMAT,
469       priv->clock_base);
470
471   /* first expected seqnum */
472   if (gst_structure_get_uint (caps_struct, "seqnum-base", &val))
473     priv->next_seqnum = val;
474   else
475     priv->next_seqnum = -1;
476
477   GST_DEBUG_OBJECT (jitterbuffer, "got seqnum-base %d", priv->next_seqnum);
478
479   return TRUE;
480
481   /* ERRORS */
482 error:
483   {
484     GST_DEBUG_OBJECT (jitterbuffer, "No clock-rate in caps!");
485     return FALSE;
486   }
487 wrong_rate:
488   {
489     GST_DEBUG_OBJECT (jitterbuffer, "Invalid clock-rate %d", priv->clock_rate);
490     return FALSE;
491   }
492 }
493
494 static gboolean
495 gst_jitter_buffer_sink_setcaps (GstPad * pad, GstCaps * caps)
496 {
497   GstRtpJitterBuffer *jitterbuffer;
498   GstRtpJitterBufferPrivate *priv;
499   gboolean res;
500
501   jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
502   priv = jitterbuffer->priv;
503
504   res = gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps);
505
506   /* set same caps on srcpad on success */
507   if (res)
508     gst_pad_set_caps (priv->srcpad, caps);
509
510   gst_object_unref (jitterbuffer);
511
512   return res;
513 }
514
515 static void
516 gst_rtp_jitter_buffer_flush_start (GstRtpJitterBuffer * jitterbuffer)
517 {
518   GstRtpJitterBufferPrivate *priv;
519
520   priv = jitterbuffer->priv;
521
522   JBUF_LOCK (priv);
523   /* mark ourselves as flushing */
524   priv->srcresult = GST_FLOW_WRONG_STATE;
525   GST_DEBUG_OBJECT (jitterbuffer, "Disabling pop on queue");
526   /* this unblocks any waiting pops on the src pad task */
527   JBUF_SIGNAL (priv);
528   /* unlock clock, we just unschedule, the entry will be released by the 
529    * locking streaming thread. */
530   if (priv->clock_id)
531     gst_clock_id_unschedule (priv->clock_id);
532   JBUF_UNLOCK (priv);
533 }
534
535 static void
536 gst_rtp_jitter_buffer_flush_stop (GstRtpJitterBuffer * jitterbuffer)
537 {
538   GstRtpJitterBufferPrivate *priv;
539
540   priv = jitterbuffer->priv;
541
542   JBUF_LOCK (priv);
543   GST_DEBUG_OBJECT (jitterbuffer, "Enabling pop on queue");
544   /* Mark as non flushing */
545   priv->srcresult = GST_FLOW_OK;
546   gst_segment_init (&priv->segment, GST_FORMAT_TIME);
547   priv->last_popped_seqnum = -1;
548   priv->next_seqnum = -1;
549   priv->clock_rate = -1;
550   priv->eos = FALSE;
551   rtp_jitter_buffer_flush (priv->jbuf);
552   rtp_jitter_buffer_reset_skew (priv->jbuf);
553   JBUF_UNLOCK (priv);
554 }
555
556 static gboolean
557 gst_rtp_jitter_buffer_src_activate_push (GstPad * pad, gboolean active)
558 {
559   gboolean result = TRUE;
560   GstRtpJitterBuffer *jitterbuffer = NULL;
561
562   jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
563
564   if (active) {
565     /* allow data processing */
566     gst_rtp_jitter_buffer_flush_stop (jitterbuffer);
567
568     /* start pushing out buffers */
569     GST_DEBUG_OBJECT (jitterbuffer, "Starting task on srcpad");
570     gst_pad_start_task (jitterbuffer->priv->srcpad,
571         (GstTaskFunction) gst_rtp_jitter_buffer_loop, jitterbuffer);
572   } else {
573     /* make sure all data processing stops ASAP */
574     gst_rtp_jitter_buffer_flush_start (jitterbuffer);
575
576     /* NOTE this will hardlock if the state change is called from the src pad
577      * task thread because we will _join() the thread. */
578     GST_DEBUG_OBJECT (jitterbuffer, "Stopping task on srcpad");
579     result = gst_pad_stop_task (pad);
580   }
581
582   gst_object_unref (jitterbuffer);
583
584   return result;
585 }
586
587 static GstStateChangeReturn
588 gst_rtp_jitter_buffer_change_state (GstElement * element,
589     GstStateChange transition)
590 {
591   GstRtpJitterBuffer *jitterbuffer;
592   GstRtpJitterBufferPrivate *priv;
593   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
594
595   jitterbuffer = GST_RTP_JITTER_BUFFER (element);
596   priv = jitterbuffer->priv;
597
598   switch (transition) {
599     case GST_STATE_CHANGE_NULL_TO_READY:
600       break;
601     case GST_STATE_CHANGE_READY_TO_PAUSED:
602       JBUF_LOCK (priv);
603       /* reset negotiated values */
604       priv->clock_rate = -1;
605       priv->clock_base = -1;
606       priv->peer_latency = 0;
607       /* block until we go to PLAYING */
608       priv->blocked = TRUE;
609       /* reset skew detection initialy */
610       rtp_jitter_buffer_reset_skew (priv->jbuf);
611       JBUF_UNLOCK (priv);
612       break;
613     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
614       JBUF_LOCK (priv);
615       /* unblock to allow streaming in PLAYING */
616       priv->blocked = FALSE;
617       JBUF_SIGNAL (priv);
618       JBUF_UNLOCK (priv);
619       break;
620     default:
621       break;
622   }
623
624   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
625
626   switch (transition) {
627     case GST_STATE_CHANGE_READY_TO_PAUSED:
628       /* we are a live element because we sync to the clock, which we can only
629        * do in the PLAYING state */
630       if (ret != GST_STATE_CHANGE_FAILURE)
631         ret = GST_STATE_CHANGE_NO_PREROLL;
632       break;
633     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
634       JBUF_LOCK (priv);
635       /* block to stop streaming when PAUSED */
636       priv->blocked = TRUE;
637       JBUF_UNLOCK (priv);
638       if (ret != GST_STATE_CHANGE_FAILURE)
639         ret = GST_STATE_CHANGE_NO_PREROLL;
640       break;
641     case GST_STATE_CHANGE_PAUSED_TO_READY:
642       break;
643     case GST_STATE_CHANGE_READY_TO_NULL:
644       break;
645     default:
646       break;
647   }
648
649   return ret;
650 }
651
652 /**
653  * Performs comparison 'b - a' with check for overflows.
654  */
655 static inline gint
656 priv_compare_rtp_seq_lt (guint16 a, guint16 b)
657 {
658   /* check if diff more than half of the 16bit range */
659   if (abs (b - a) > (1 << 15)) {
660     /* one of a/b has wrapped */
661     return a - b;
662   } else {
663     return b - a;
664   }
665 }
666
667 static gboolean
668 gst_rtp_jitter_buffer_sink_event (GstPad * pad, GstEvent * event)
669 {
670   gboolean ret = TRUE;
671   GstRtpJitterBuffer *jitterbuffer;
672   GstRtpJitterBufferPrivate *priv;
673
674   jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
675   priv = jitterbuffer->priv;
676
677   GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
678
679   switch (GST_EVENT_TYPE (event)) {
680     case GST_EVENT_NEWSEGMENT:
681     {
682       GstFormat format;
683       gdouble rate, arate;
684       gint64 start, stop, time;
685       gboolean update;
686
687       gst_event_parse_new_segment_full (event, &update, &rate, &arate, &format,
688           &start, &stop, &time);
689
690       /* we need time for now */
691       if (format != GST_FORMAT_TIME)
692         goto newseg_wrong_format;
693
694       GST_DEBUG_OBJECT (jitterbuffer,
695           "newsegment: update %d, rate %g, arate %g, start %" GST_TIME_FORMAT
696           ", stop %" GST_TIME_FORMAT ", time %" GST_TIME_FORMAT,
697           update, rate, arate, GST_TIME_ARGS (start), GST_TIME_ARGS (stop),
698           GST_TIME_ARGS (time));
699
700       /* now configure the values, we need these to time the release of the
701        * buffers on the srcpad. */
702       gst_segment_set_newsegment_full (&priv->segment, update,
703           rate, arate, format, start, stop, time);
704
705       /* FIXME, push SEGMENT in the queue. Sorting order might be difficult. */
706       ret = gst_pad_push_event (priv->srcpad, event);
707       break;
708     }
709     case GST_EVENT_FLUSH_START:
710       gst_rtp_jitter_buffer_flush_start (jitterbuffer);
711       ret = gst_pad_push_event (priv->srcpad, event);
712       break;
713     case GST_EVENT_FLUSH_STOP:
714       ret = gst_pad_push_event (priv->srcpad, event);
715       ret = gst_rtp_jitter_buffer_src_activate_push (priv->srcpad, TRUE);
716       break;
717     case GST_EVENT_EOS:
718     {
719       /* push EOS in queue. We always push it at the head */
720       JBUF_LOCK (priv);
721       /* check for flushing, we need to discard the event and return FALSE when
722        * we are flushing */
723       ret = priv->srcresult == GST_FLOW_OK;
724       if (ret && !priv->eos) {
725         GST_DEBUG_OBJECT (jitterbuffer, "queuing EOS");
726         priv->eos = TRUE;
727         JBUF_SIGNAL (priv);
728       } else if (priv->eos) {
729         GST_DEBUG_OBJECT (jitterbuffer, "dropping EOS, we are already EOS");
730       } else {
731         GST_DEBUG_OBJECT (jitterbuffer, "dropping EOS, reason %s",
732             gst_flow_get_name (priv->srcresult));
733       }
734       JBUF_UNLOCK (priv);
735       gst_event_unref (event);
736       break;
737     }
738     default:
739       ret = gst_pad_push_event (priv->srcpad, event);
740       break;
741   }
742
743 done:
744   gst_object_unref (jitterbuffer);
745
746   return ret;
747
748   /* ERRORS */
749 newseg_wrong_format:
750   {
751     GST_DEBUG_OBJECT (jitterbuffer, "received non TIME newsegment");
752     ret = FALSE;
753     goto done;
754   }
755 }
756
757 static gboolean
758 gst_rtp_jitter_buffer_get_clock_rate (GstRtpJitterBuffer * jitterbuffer,
759     guint8 pt)
760 {
761   GValue ret = { 0 };
762   GValue args[2] = { {0}, {0} };
763   GstCaps *caps;
764   gboolean res;
765
766   g_value_init (&args[0], GST_TYPE_ELEMENT);
767   g_value_set_object (&args[0], jitterbuffer);
768   g_value_init (&args[1], G_TYPE_UINT);
769   g_value_set_uint (&args[1], pt);
770
771   g_value_init (&ret, GST_TYPE_CAPS);
772   g_value_set_boxed (&ret, NULL);
773
774   g_signal_emitv (args, gst_rtp_jitter_buffer_signals[SIGNAL_REQUEST_PT_MAP], 0,
775       &ret);
776
777   caps = (GstCaps *) g_value_get_boxed (&ret);
778   if (!caps)
779     goto no_caps;
780
781   res = gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps);
782
783   return res;
784
785   /* ERRORS */
786 no_caps:
787   {
788     GST_DEBUG_OBJECT (jitterbuffer, "could not get caps");
789     return FALSE;
790   }
791 }
792
793 static GstFlowReturn
794 gst_rtp_jitter_buffer_chain (GstPad * pad, GstBuffer * buffer)
795 {
796   GstRtpJitterBuffer *jitterbuffer;
797   GstRtpJitterBufferPrivate *priv;
798   guint16 seqnum;
799   GstFlowReturn ret = GST_FLOW_OK;
800   GstClockTime timestamp;
801   guint64 latency_ts;
802   gboolean tail;
803
804   jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
805
806   if (!gst_rtp_buffer_validate (buffer))
807     goto invalid_buffer;
808
809   priv = jitterbuffer->priv;
810
811   if (priv->clock_rate == -1) {
812     guint8 pt;
813
814     /* no clock rate given on the caps, try to get one with the signal */
815     pt = gst_rtp_buffer_get_payload_type (buffer);
816
817     gst_rtp_jitter_buffer_get_clock_rate (jitterbuffer, pt);
818     if (priv->clock_rate == -1)
819       goto not_negotiated;
820
821     rtp_jitter_buffer_set_clock_rate (priv->jbuf, priv->clock_rate);
822   }
823
824   /* take the timestamp of the buffer. This is the time when the packet was
825    * received and is used to calculate jitter and clock skew. We will adjust
826    * this timestamp with the smoothed value after processing it in the
827    * jitterbuffer. */
828   timestamp = GST_BUFFER_TIMESTAMP (buffer);
829   /* bring to running time */
830   timestamp = gst_segment_to_running_time (&priv->segment, GST_FORMAT_TIME,
831       timestamp);
832
833   seqnum = gst_rtp_buffer_get_seq (buffer);
834   GST_DEBUG_OBJECT (jitterbuffer,
835       "Received packet #%d at time %" GST_TIME_FORMAT, seqnum,
836       GST_TIME_ARGS (timestamp));
837
838   JBUF_LOCK_CHECK (priv, out_flushing);
839   /* don't accept more data on EOS */
840   if (priv->eos)
841     goto have_eos;
842
843   /* let's check if this buffer is too late, we cannot accept packets with
844    * bigger seqnum than the one we already pushed. */
845   if (priv->last_popped_seqnum != -1) {
846     if (priv_compare_rtp_seq_lt (priv->last_popped_seqnum, seqnum) < 0)
847       goto too_late;
848   }
849
850   /* let's drop oldest packet if the queue is already full and drop-on-latency
851    * is set. We can only do this when there actually is a latency. When no
852    * latency is set, we just pump it in the queue and let the other end push it
853    * out as fast as possible. */
854   if (priv->latency_ms && priv->drop_on_latency) {
855
856     latency_ts =
857         gst_util_uint64_scale_int (priv->latency_ms, priv->clock_rate, 1000);
858
859     if (rtp_jitter_buffer_get_ts_diff (priv->jbuf) >= latency_ts) {
860       GstBuffer *old_buf;
861
862       GST_DEBUG_OBJECT (jitterbuffer, "Queue full, dropping old packet #%d",
863           seqnum);
864
865       old_buf = rtp_jitter_buffer_pop (priv->jbuf);
866       gst_buffer_unref (old_buf);
867     }
868   }
869
870   /* now insert the packet into the queue in sorted order. This function returns
871    * FALSE if a packet with the same seqnum was already in the queue, meaning we
872    * have a duplicate. */
873   if (!rtp_jitter_buffer_insert (priv->jbuf, buffer, timestamp, &tail))
874     goto duplicate;
875
876   /* signal addition of new buffer when the _loop is waiting. */
877   if (priv->waiting)
878     JBUF_SIGNAL (priv);
879
880   /* let's unschedule and unblock any waiting buffers. We only want to do this
881    * when the tail buffer changed */
882   if (priv->clock_id && tail) {
883     GST_DEBUG_OBJECT (jitterbuffer,
884         "Unscheduling waiting buffer, new tail buffer");
885     gst_clock_id_unschedule (priv->clock_id);
886   }
887
888   GST_DEBUG_OBJECT (jitterbuffer, "Pushed packet #%d, now %d packets",
889       seqnum, rtp_jitter_buffer_num_packets (priv->jbuf));
890
891 finished:
892   JBUF_UNLOCK (priv);
893
894   gst_object_unref (jitterbuffer);
895
896   return ret;
897
898   /* ERRORS */
899 invalid_buffer:
900   {
901     /* this is fatal and should be filtered earlier */
902     GST_ELEMENT_ERROR (jitterbuffer, STREAM, DECODE, (NULL),
903         ("Received invalid RTP payload"));
904     gst_buffer_unref (buffer);
905     gst_object_unref (jitterbuffer);
906     return GST_FLOW_ERROR;
907   }
908 not_negotiated:
909   {
910     GST_WARNING_OBJECT (jitterbuffer, "No clock-rate in caps!");
911     gst_buffer_unref (buffer);
912     gst_object_unref (jitterbuffer);
913     return GST_FLOW_NOT_NEGOTIATED;
914   }
915 out_flushing:
916   {
917     ret = priv->srcresult;
918     GST_DEBUG_OBJECT (jitterbuffer, "flushing %s", gst_flow_get_name (ret));
919     gst_buffer_unref (buffer);
920     goto finished;
921   }
922 have_eos:
923   {
924     ret = GST_FLOW_UNEXPECTED;
925     GST_WARNING_OBJECT (jitterbuffer, "we are EOS, refusing buffer");
926     gst_buffer_unref (buffer);
927     goto finished;
928   }
929 too_late:
930   {
931     GST_WARNING_OBJECT (jitterbuffer, "Packet #%d too late as #%d was already"
932         " popped, dropping", seqnum, priv->last_popped_seqnum);
933     priv->num_late++;
934     gst_buffer_unref (buffer);
935     goto finished;
936   }
937 duplicate:
938   {
939     GST_WARNING_OBJECT (jitterbuffer, "Duplicate packet #%d detected, dropping",
940         seqnum);
941     priv->num_duplicates++;
942     gst_buffer_unref (buffer);
943     goto finished;
944   }
945 }
946
947 static GstClockTime
948 apply_offset (GstRtpJitterBuffer * jitterbuffer, GstClockTime timestamp)
949 {
950   GstRtpJitterBufferPrivate *priv;
951
952   priv = jitterbuffer->priv;
953
954   if (timestamp == -1)
955     return -1;
956
957   /* apply the timestamp offset */
958   timestamp += priv->ts_offset;
959
960   return timestamp;
961 }
962
963 /**
964  * This funcion will push out buffers on the source pad.
965  *
966  * For each pushed buffer, the seqnum is recorded, if the next buffer B has a
967  * different seqnum (missing packets before B), this function will wait for the
968  * missing packet to arrive up to the timestamp of buffer B.
969  */
970 static void
971 gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer)
972 {
973   GstRtpJitterBufferPrivate *priv;
974   GstBuffer *outbuf = NULL;
975   GstFlowReturn result;
976   guint16 seqnum;
977   GstClockTime timestamp, out_time;
978
979   priv = jitterbuffer->priv;
980
981   JBUF_LOCK_CHECK (priv, flushing);
982 again:
983   GST_DEBUG_OBJECT (jitterbuffer, "Peeking item");
984   while (TRUE) {
985
986     /* always wait if we are blocked */
987     if (!priv->blocked) {
988       /* if we have a packet, we can grab it */
989       if (rtp_jitter_buffer_num_packets (priv->jbuf) > 0)
990         break;
991       /* no packets but we are EOS, do eos logic */
992       if (priv->eos)
993         goto do_eos;
994     }
995     /* wait for packets or flushing now */
996     priv->waiting = TRUE;
997     JBUF_WAIT_CHECK (priv, flushing);
998     priv->waiting = FALSE;
999   }
1000
1001   /* peek a buffer, we're just looking at the timestamp and the sequence number.
1002    * If all is fine, we'll pop and push it. If the sequence number is wrong we
1003    * wait on the timestamp. In the chain function we will unlock the wait when a
1004    * new buffer is available. The peeked buffer is valid for as long as we hold
1005    * the jitterbuffer lock. */
1006   outbuf = rtp_jitter_buffer_peek (priv->jbuf);
1007   seqnum = gst_rtp_buffer_get_seq (outbuf);
1008
1009   /* get the timestamp, this is already corrected for clock skew by the
1010    * jitterbuffer */
1011   timestamp = GST_BUFFER_TIMESTAMP (outbuf);
1012
1013   GST_DEBUG_OBJECT (jitterbuffer,
1014       "Peeked buffer #%d, timestamp %" GST_TIME_FORMAT ", now %d left",
1015       seqnum, GST_TIME_ARGS (timestamp),
1016       rtp_jitter_buffer_num_packets (priv->jbuf));
1017
1018   /* apply our timestamp offset to the incomming buffer, this will be our output
1019    * timestamp. */
1020   out_time = apply_offset (jitterbuffer, timestamp);
1021
1022   /* If we don't know what the next seqnum should be (== -1) we have to wait
1023    * because it might be possible that we are not receiving this buffer in-order,
1024    * a buffer with a lower seqnum could arrive later and we want to push that
1025    * earlier buffer before this buffer then.
1026    * If we know the expected seqnum, we can compare it to the current seqnum to
1027    * determine if we have missing a packet. If we have a missing packet (which
1028    * must be before this packet) we can wait for it until the deadline for this
1029    * packet expires. */
1030   if ((priv->next_seqnum == -1 || priv->next_seqnum != seqnum)
1031       && out_time != -1) {
1032     GstClockID id;
1033     GstClockTime sync_time;
1034     GstClockReturn ret;
1035     GstClock *clock;
1036
1037     if (priv->next_seqnum != -1) {
1038       /* we expected next_seqnum but received something else, that's a gap */
1039       GST_WARNING_OBJECT (jitterbuffer,
1040           "Sequence number GAP detected: expected %d instead of %d",
1041           priv->next_seqnum, seqnum);
1042     } else {
1043       /* we don't know what the next_seqnum should be, wait for the last
1044        * possible moment to push this buffer, maybe we get an earlier seqnum
1045        * while we wait */
1046       GST_DEBUG_OBJECT (jitterbuffer, "First buffer %d, do sync", seqnum);
1047     }
1048
1049     GST_OBJECT_LOCK (jitterbuffer);
1050     clock = GST_ELEMENT_CLOCK (jitterbuffer);
1051     if (!clock) {
1052       GST_OBJECT_UNLOCK (jitterbuffer);
1053       /* let's just push if there is no clock */
1054       goto push_buffer;
1055     }
1056
1057     GST_DEBUG_OBJECT (jitterbuffer, "sync to timestamp %" GST_TIME_FORMAT,
1058         GST_TIME_ARGS (out_time));
1059
1060     /* prepare for sync against clock */
1061     sync_time = out_time + GST_ELEMENT_CAST (jitterbuffer)->base_time;
1062     /* add latency, this includes our own latency and the peer latency. */
1063     sync_time += (priv->latency_ms * GST_MSECOND);
1064     sync_time += priv->peer_latency;
1065
1066     /* create an entry for the clock */
1067     id = priv->clock_id = gst_clock_new_single_shot_id (clock, sync_time);
1068     priv->waiting_seqnum = seqnum;
1069     GST_OBJECT_UNLOCK (jitterbuffer);
1070
1071     /* release the lock so that the other end can push stuff or unlock */
1072     JBUF_UNLOCK (priv);
1073
1074     ret = gst_clock_id_wait (id, NULL);
1075
1076     JBUF_LOCK (priv);
1077     /* and free the entry */
1078     gst_clock_id_unref (id);
1079     priv->clock_id = NULL;
1080     priv->waiting_seqnum = -1;
1081
1082     /* at this point, the clock could have been unlocked by a timeout, a new
1083      * tail element was added to the queue or because we are shutting down. Check
1084      * for shutdown first. */
1085     if (priv->srcresult != GST_FLOW_OK)
1086       goto flushing;
1087
1088     /* if we got unscheduled and we are not flushing, it's because a new tail
1089      * element became available in the queue. Grab it and try to push or sync. */
1090     if (ret == GST_CLOCK_UNSCHEDULED) {
1091       GST_DEBUG_OBJECT (jitterbuffer,
1092           "Wait got unscheduled, will retry to push with new buffer");
1093       goto again;
1094     }
1095     /* Get new timestamp, latency might have changed */
1096     out_time = apply_offset (jitterbuffer, timestamp);
1097   }
1098 push_buffer:
1099   /* when we get here we are ready to pop and push the buffer */
1100   outbuf = rtp_jitter_buffer_pop (priv->jbuf);
1101
1102   /* check if we are pushing something unexpected */
1103   if (priv->next_seqnum != -1 && priv->next_seqnum != seqnum) {
1104     gint dropped;
1105
1106     /* calc number of missing packets, careful for wraparounds */
1107     dropped = priv_compare_rtp_seq_lt (priv->next_seqnum, seqnum);
1108
1109     GST_DEBUG_OBJECT (jitterbuffer,
1110         "Pushing DISCONT after dropping %d (%d to %d)", dropped,
1111         priv->next_seqnum, seqnum);
1112
1113     /* update stats */
1114     priv->num_late += dropped;
1115
1116     /* set DISCONT flag when we missed a packet. */
1117     outbuf = gst_buffer_make_metadata_writable (outbuf);
1118     GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_DISCONT);
1119   }
1120
1121   /* apply timestamp with offset to buffer now */
1122   GST_BUFFER_TIMESTAMP (outbuf) = out_time;
1123
1124   /* now we are ready to push the buffer. Save the seqnum and release the lock
1125    * so the other end can push stuff in the queue again. */
1126   priv->last_popped_seqnum = seqnum;
1127   priv->next_seqnum = (seqnum + 1) & 0xffff;
1128   JBUF_UNLOCK (priv);
1129
1130   /* push buffer */
1131   GST_DEBUG_OBJECT (jitterbuffer,
1132       "Pushing buffer %d, timestamp %" GST_TIME_FORMAT, seqnum,
1133       GST_TIME_ARGS (out_time));
1134   result = gst_pad_push (priv->srcpad, outbuf);
1135   if (result != GST_FLOW_OK)
1136     goto pause;
1137
1138   return;
1139
1140   /* ERRORS */
1141 do_eos:
1142   {
1143     /* store result, we are flushing now */
1144     GST_DEBUG_OBJECT (jitterbuffer, "We are EOS, pushing EOS downstream");
1145     priv->srcresult = GST_FLOW_UNEXPECTED;
1146     gst_pad_pause_task (priv->srcpad);
1147     gst_pad_push_event (priv->srcpad, gst_event_new_eos ());
1148     JBUF_UNLOCK (priv);
1149     return;
1150   }
1151 flushing:
1152   {
1153     GST_DEBUG_OBJECT (jitterbuffer, "we are flushing");
1154     gst_pad_pause_task (priv->srcpad);
1155     if (outbuf)
1156       gst_buffer_unref (outbuf);
1157     JBUF_UNLOCK (priv);
1158     return;
1159   }
1160 pause:
1161   {
1162     const gchar *reason = gst_flow_get_name (result);
1163
1164     GST_DEBUG_OBJECT (jitterbuffer, "pausing task, reason %s", reason);
1165
1166     JBUF_LOCK (priv);
1167     /* store result */
1168     priv->srcresult = result;
1169     /* we don't post errors or anything because upstream will do that for us
1170      * when we pass the return value upstream. */
1171     gst_pad_pause_task (priv->srcpad);
1172     JBUF_UNLOCK (priv);
1173     return;
1174   }
1175 }
1176
1177 static gboolean
1178 gst_rtp_jitter_buffer_query (GstPad * pad, GstQuery * query)
1179 {
1180   GstRtpJitterBuffer *jitterbuffer;
1181   GstRtpJitterBufferPrivate *priv;
1182   gboolean res = FALSE;
1183
1184   jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
1185   priv = jitterbuffer->priv;
1186
1187   switch (GST_QUERY_TYPE (query)) {
1188     case GST_QUERY_LATENCY:
1189     {
1190       /* We need to send the query upstream and add the returned latency to our
1191        * own */
1192       GstClockTime min_latency, max_latency;
1193       gboolean us_live;
1194       GstClockTime our_latency;
1195
1196       if ((res = gst_pad_peer_query (priv->sinkpad, query))) {
1197         gst_query_parse_latency (query, &us_live, &min_latency, &max_latency);
1198
1199         GST_DEBUG_OBJECT (jitterbuffer, "Peer latency: min %"
1200             GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
1201             GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
1202
1203         /* store this so that we can safely sync on the peer buffers. */
1204         JBUF_LOCK (priv);
1205         priv->peer_latency = min_latency;
1206         our_latency = ((guint64) priv->latency_ms) * GST_MSECOND;
1207         JBUF_UNLOCK (priv);
1208
1209         GST_DEBUG_OBJECT (jitterbuffer, "Our latency: %" GST_TIME_FORMAT,
1210             GST_TIME_ARGS (our_latency));
1211
1212         /* we add some latency but can buffer an infinite amount of time */
1213         min_latency += our_latency;
1214         max_latency = -1;
1215
1216         GST_DEBUG_OBJECT (jitterbuffer, "Calculated total latency : min %"
1217             GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
1218             GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
1219
1220         gst_query_set_latency (query, TRUE, min_latency, max_latency);
1221       }
1222       break;
1223     }
1224     default:
1225       res = gst_pad_query_default (pad, query);
1226       break;
1227   }
1228   return res;
1229 }
1230
1231 static void
1232 gst_rtp_jitter_buffer_set_property (GObject * object,
1233     guint prop_id, const GValue * value, GParamSpec * pspec)
1234 {
1235   GstRtpJitterBuffer *jitterbuffer;
1236   GstRtpJitterBufferPrivate *priv;
1237
1238   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
1239   priv = jitterbuffer->priv;
1240
1241   switch (prop_id) {
1242     case PROP_LATENCY:
1243     {
1244       guint new_latency, old_latency;
1245
1246       new_latency = g_value_get_uint (value);
1247
1248       JBUF_LOCK (priv);
1249       old_latency = priv->latency_ms;
1250       priv->latency_ms = new_latency;
1251       JBUF_UNLOCK (priv);
1252
1253       /* post message if latency changed, this will inform the parent pipeline
1254        * that a latency reconfiguration is possible/needed. */
1255       if (new_latency != old_latency) {
1256         GST_DEBUG_OBJECT (jitterbuffer, "latency changed to: %" GST_TIME_FORMAT,
1257             GST_TIME_ARGS (new_latency * GST_MSECOND));
1258
1259         gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer),
1260             gst_message_new_latency (GST_OBJECT_CAST (jitterbuffer)));
1261       }
1262       break;
1263     }
1264     case PROP_DROP_ON_LATENCY:
1265       priv->drop_on_latency = g_value_get_boolean (value);
1266       break;
1267     case PROP_TS_OFFSET:
1268       JBUF_LOCK (priv);
1269       priv->ts_offset = g_value_get_int64 (value);
1270       JBUF_UNLOCK (priv);
1271       break;
1272     default:
1273       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1274       break;
1275   }
1276 }
1277
1278 static void
1279 gst_rtp_jitter_buffer_get_property (GObject * object,
1280     guint prop_id, GValue * value, GParamSpec * pspec)
1281 {
1282   GstRtpJitterBuffer *jitterbuffer;
1283   GstRtpJitterBufferPrivate *priv;
1284
1285   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
1286   priv = jitterbuffer->priv;
1287
1288   switch (prop_id) {
1289     case PROP_LATENCY:
1290       JBUF_LOCK (priv);
1291       g_value_set_uint (value, priv->latency_ms);
1292       JBUF_UNLOCK (priv);
1293       break;
1294     case PROP_DROP_ON_LATENCY:
1295       g_value_set_boolean (value, priv->drop_on_latency);
1296       break;
1297     case PROP_TS_OFFSET:
1298       JBUF_LOCK (priv);
1299       g_value_set_int64 (value, priv->ts_offset);
1300       JBUF_UNLOCK (priv);
1301       break;
1302     default:
1303       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1304       break;
1305   }
1306 }