d9200d5ace0da9e96267bc5f40c0dbf40ee4e99a
[platform/upstream/gst-plugins-good.git] / gst / rtpmanager / gstrtpjitterbuffer.c
1 /*
2  * Farsight Voice+Video library
3  *
4  *  Copyright 2007 Collabora Ltd,
5  *  Copyright 2007 Nokia Corporation
6  *   @author: Philippe Kalaf <philippe.kalaf@collabora.co.uk>.
7  *  Copyright 2007 Wim Taymans <wim.taymans@gmail.com>
8  *
9  * This library is free software; you can redistribute it and/or
10  * modify it under the terms of the GNU Library General Public
11  * License as published by the Free Software Foundation; either
12  * version 2 of the License, or (at your option) any later version.
13  *
14  * This library is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17  * Library General Public License for more details.
18  *
19  * You should have received a copy of the GNU Library General Public
20  * License along with this library; if not, write to the
21  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
22  * Boston, MA 02110-1301, USA.
23  *
24  */
25
26 /**
27  * SECTION:element-gstrtpjitterbuffer
28  *
29  * This element reorders and removes duplicate RTP packets as they are received
30  * from a network source. It will also wait for missing packets up to a
31  * configurable time limit using the #GstRtpJitterBuffer:latency property.
32  * Packets arriving too late are considered to be lost packets.
33  *
34  * This element acts as a live element and so adds #GstRtpJitterBuffer:latency
35  * to the pipeline.
36  *
37  * The element needs the clock-rate of the RTP payload in order to estimate the
38  * delay. This information is obtained either from the caps on the sink pad or,
39  * when no caps are present, from the #GstRtpJitterBuffer::request-pt-map signal.
40  * To clear the previous pt-map use the #GstRtpJitterBuffer::clear-pt-map signal.
41  *
42  * This element will automatically be used inside gstrtpbin.
43  *
44  * <refsect2>
45  * <title>Example pipelines</title>
46  * |[
47  * gst-launch-1.0 rtspsrc location=rtsp://192.168.1.133:8554/mpeg1or2AudioVideoTest ! gstrtpjitterbuffer ! rtpmpvdepay ! mpeg2dec ! xvimagesink
48  * ]| Connect to a streaming server and decode the MPEG video. The jitterbuffer is
49  * inserted into the pipeline to smooth out network jitter and to reorder the
50  * out-of-order RTP packets.
51  * </refsect2>
52  *
53  * Last reviewed on 2007-05-28 (0.10.5)
54  */
55
56 #ifdef HAVE_CONFIG_H
57 #include "config.h"
58 #endif
59
60 #include <stdlib.h>
61 #include <string.h>
62 #include <gst/rtp/gstrtpbuffer.h>
63
64 #include "gstrtpbin-marshal.h"
65
66 #include "gstrtpjitterbuffer.h"
67 #include "rtpjitterbuffer.h"
68 #include "rtpstats.h"
69
70 #include <gst/glib-compat-private.h>
71
72 GST_DEBUG_CATEGORY (rtpjitterbuffer_debug);
73 #define GST_CAT_DEFAULT (rtpjitterbuffer_debug)
74
75 /* RTPJitterBuffer signals and args */
76 enum
77 {
78   SIGNAL_REQUEST_PT_MAP,
79   SIGNAL_CLEAR_PT_MAP,
80   SIGNAL_HANDLE_SYNC,
81   SIGNAL_ON_NPT_STOP,
82   SIGNAL_SET_ACTIVE,
83   LAST_SIGNAL
84 };
85
86 #define DEFAULT_LATENCY_MS      200
87 #define DEFAULT_DROP_ON_LATENCY FALSE
88 #define DEFAULT_TS_OFFSET       0
89 #define DEFAULT_DO_LOST         FALSE
90 #define DEFAULT_MODE            RTP_JITTER_BUFFER_MODE_SLAVE
91 #define DEFAULT_PERCENT         0
92
93 enum
94 {
95   PROP_0,
96   PROP_LATENCY,
97   PROP_DROP_ON_LATENCY,
98   PROP_TS_OFFSET,
99   PROP_DO_LOST,
100   PROP_MODE,
101   PROP_PERCENT,
102   PROP_LAST
103 };
104
105 #define JBUF_LOCK(priv)   (g_mutex_lock (&(priv)->jbuf_lock))
106
107 #define JBUF_LOCK_CHECK(priv,label) G_STMT_START {    \
108   JBUF_LOCK (priv);                                   \
109   if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK))    \
110     goto label;                                       \
111 } G_STMT_END
112
113 #define JBUF_UNLOCK(priv) (g_mutex_unlock (&(priv)->jbuf_lock))
114 #define JBUF_WAIT(priv)   (g_cond_wait (&(priv)->jbuf_cond, &(priv)->jbuf_lock))
115
116 #define JBUF_WAIT_CHECK(priv,label) G_STMT_START {    \
117   JBUF_WAIT(priv);                                    \
118   if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK))    \
119     goto label;                                       \
120 } G_STMT_END
121
122 #define JBUF_SIGNAL(priv) (g_cond_signal (&(priv)->jbuf_cond))
123
124 struct _GstRtpJitterBufferPrivate
125 {
126   GstPad *sinkpad, *srcpad;
127   GstPad *rtcpsinkpad;
128
129   RTPJitterBuffer *jbuf;
130   GMutex jbuf_lock;
131   GCond jbuf_cond;
132   gboolean waiting;
133   gboolean discont;
134   gboolean ts_discont;
135   gboolean active;
136   guint64 out_offset;
137
138   /* properties */
139   guint latency_ms;
140   guint64 latency_ns;
141   gboolean drop_on_latency;
142   gint64 ts_offset;
143   gboolean do_lost;
144
145   /* the last seqnum we pushed out */
146   guint32 last_popped_seqnum;
147   /* the next expected seqnum we push */
148   guint32 next_seqnum;
149   /* last output time */
150   GstClockTime last_out_time;
151   GstClockTime last_out_pts;
152   /* the next expected seqnum we receive */
153   guint32 next_in_seqnum;
154
155   GArray *timers;
156
157   /* start and stop ranges */
158   GstClockTime npt_start;
159   GstClockTime npt_stop;
160   guint64 ext_timestamp;
161   guint64 last_elapsed;
162   guint64 estimated_eos;
163   GstClockID eos_id;
164
165   /* state */
166   gboolean eos;
167
168   /* clock rate and rtp timestamp offset */
169   gint last_pt;
170   gint32 clock_rate;
171   gint64 clock_base;
172   gint64 prev_ts_offset;
173
174   /* when we are shutting down */
175   GstFlowReturn srcresult;
176   gboolean blocked;
177
178   /* for sync */
179   GstSegment segment;
180   GstClockID clock_id;
181   gboolean unscheduled;
182   /* the latency of the upstream peer, we have to take this into account when
183    * synchronizing the buffers. */
184   GstClockTime peer_latency;
185   guint64 ext_rtptime;
186   GstBuffer *last_sr;
187
188   /* some accounting */
189   guint64 num_late;
190   guint64 num_duplicates;
191 };
192
193 typedef enum
194 {
195   TIMER_TYPE_EXPECTED,
196   TIMER_TYPE_LOST,
197   TIMER_TYPE_DEADLINE,
198   TIMER_TYPE_EOS
199 } TimerType;
200
201 typedef struct
202 {
203   guint idx;
204   guint16 seqnum;
205   TimerType type;
206   GstClockTime timeout;
207 } TimerData;
208
209 #define GST_RTP_JITTER_BUFFER_GET_PRIVATE(o) \
210   (G_TYPE_INSTANCE_GET_PRIVATE ((o), GST_TYPE_RTP_JITTER_BUFFER, \
211                                 GstRtpJitterBufferPrivate))
212
213 static GstStaticPadTemplate gst_rtp_jitter_buffer_sink_template =
214 GST_STATIC_PAD_TEMPLATE ("sink",
215     GST_PAD_SINK,
216     GST_PAD_ALWAYS,
217     GST_STATIC_CAPS ("application/x-rtp, "
218         "clock-rate = (int) [ 1, 2147483647 ]"
219         /* "payload = (int) , "
220          * "encoding-name = (string) "
221          */ )
222     );
223
224 static GstStaticPadTemplate gst_rtp_jitter_buffer_sink_rtcp_template =
225 GST_STATIC_PAD_TEMPLATE ("sink_rtcp",
226     GST_PAD_SINK,
227     GST_PAD_REQUEST,
228     GST_STATIC_CAPS ("application/x-rtcp")
229     );
230
231 static GstStaticPadTemplate gst_rtp_jitter_buffer_src_template =
232 GST_STATIC_PAD_TEMPLATE ("src",
233     GST_PAD_SRC,
234     GST_PAD_ALWAYS,
235     GST_STATIC_CAPS ("application/x-rtp"
236         /* "payload = (int) , "
237          * "clock-rate = (int) , "
238          * "encoding-name = (string) "
239          */ )
240     );
241
242 static guint gst_rtp_jitter_buffer_signals[LAST_SIGNAL] = { 0 };
243
244 #define gst_rtp_jitter_buffer_parent_class parent_class
245 G_DEFINE_TYPE (GstRtpJitterBuffer, gst_rtp_jitter_buffer, GST_TYPE_ELEMENT);
246
247 /* object overrides */
248 static void gst_rtp_jitter_buffer_set_property (GObject * object,
249     guint prop_id, const GValue * value, GParamSpec * pspec);
250 static void gst_rtp_jitter_buffer_get_property (GObject * object,
251     guint prop_id, GValue * value, GParamSpec * pspec);
252 static void gst_rtp_jitter_buffer_finalize (GObject * object);
253
254 /* element overrides */
255 static GstStateChangeReturn gst_rtp_jitter_buffer_change_state (GstElement
256     * element, GstStateChange transition);
257 static GstPad *gst_rtp_jitter_buffer_request_new_pad (GstElement * element,
258     GstPadTemplate * templ, const gchar * name, const GstCaps * filter);
259 static void gst_rtp_jitter_buffer_release_pad (GstElement * element,
260     GstPad * pad);
261 static GstClock *gst_rtp_jitter_buffer_provide_clock (GstElement * element);
262
263 /* pad overrides */
264 static GstCaps *gst_rtp_jitter_buffer_getcaps (GstPad * pad, GstCaps * filter);
265 static GstIterator *gst_rtp_jitter_buffer_iterate_internal_links (GstPad * pad,
266     GstObject * parent);
267
268 /* sinkpad overrides */
269 static gboolean gst_rtp_jitter_buffer_sink_event (GstPad * pad,
270     GstObject * parent, GstEvent * event);
271 static GstFlowReturn gst_rtp_jitter_buffer_chain (GstPad * pad,
272     GstObject * parent, GstBuffer * buffer);
273
274 static gboolean gst_rtp_jitter_buffer_sink_rtcp_event (GstPad * pad,
275     GstObject * parent, GstEvent * event);
276 static GstFlowReturn gst_rtp_jitter_buffer_chain_rtcp (GstPad * pad,
277     GstObject * parent, GstBuffer * buffer);
278
279 static gboolean gst_rtp_jitter_buffer_sink_query (GstPad * pad,
280     GstObject * parent, GstQuery * query);
281
282 /* srcpad overrides */
283 static gboolean gst_rtp_jitter_buffer_src_event (GstPad * pad,
284     GstObject * parent, GstEvent * event);
285 static gboolean gst_rtp_jitter_buffer_src_activate_mode (GstPad * pad,
286     GstObject * parent, GstPadMode mode, gboolean active);
287 static void gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer);
288 static gboolean gst_rtp_jitter_buffer_src_query (GstPad * pad,
289     GstObject * parent, GstQuery * query);
290
291 static void
292 gst_rtp_jitter_buffer_clear_pt_map (GstRtpJitterBuffer * jitterbuffer);
293 static GstClockTime
294 gst_rtp_jitter_buffer_set_active (GstRtpJitterBuffer * jitterbuffer,
295     gboolean active, guint64 base_time);
296 static void do_handle_sync (GstRtpJitterBuffer * jitterbuffer);
297 static void remove_all_timers (GstRtpJitterBuffer * jitterbuffer);
298
299 static void
300 gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass)
301 {
302   GObjectClass *gobject_class;
303   GstElementClass *gstelement_class;
304
305   gobject_class = (GObjectClass *) klass;
306   gstelement_class = (GstElementClass *) klass;
307
308   g_type_class_add_private (klass, sizeof (GstRtpJitterBufferPrivate));
309
310   gobject_class->finalize = gst_rtp_jitter_buffer_finalize;
311
312   gobject_class->set_property = gst_rtp_jitter_buffer_set_property;
313   gobject_class->get_property = gst_rtp_jitter_buffer_get_property;
314
315   /**
316    * GstRtpJitterBuffer::latency:
317    *
318    * The maximum latency of the jitterbuffer. Packets will be kept in the buffer
319    * for at most this time.
320    */
321   g_object_class_install_property (gobject_class, PROP_LATENCY,
322       g_param_spec_uint ("latency", "Buffer latency in ms",
323           "Amount of ms to buffer", 0, G_MAXUINT, DEFAULT_LATENCY_MS,
324           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
325   /**
326    * GstRtpJitterBuffer::drop-on-latency:
327    *
328    * Drop oldest buffers when the queue is completely filled.
329    */
330   g_object_class_install_property (gobject_class, PROP_DROP_ON_LATENCY,
331       g_param_spec_boolean ("drop-on-latency",
332           "Drop buffers when maximum latency is reached",
333           "Tells the jitterbuffer to never exceed the given latency in size",
334           DEFAULT_DROP_ON_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
335   /**
336    * GstRtpJitterBuffer::ts-offset:
337    *
338    * Adjust GStreamer output buffer timestamps in the jitterbuffer with offset.
339    * This is mainly used to ensure interstream synchronisation.
340    */
341   g_object_class_install_property (gobject_class, PROP_TS_OFFSET,
342       g_param_spec_int64 ("ts-offset", "Timestamp Offset",
343           "Adjust buffer timestamps with offset in nanoseconds", G_MININT64,
344           G_MAXINT64, DEFAULT_TS_OFFSET,
345           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
346
347   /**
348    * GstRtpJitterBuffer::do-lost:
349    *
350    * Send out a GstRTPPacketLost event downstream when a packet is considered
351    * lost.
352    */
353   g_object_class_install_property (gobject_class, PROP_DO_LOST,
354       g_param_spec_boolean ("do-lost", "Do Lost",
355           "Send an event downstream when a packet is lost", DEFAULT_DO_LOST,
356           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
357
358   /**
359    * GstRtpJitterBuffer::mode:
360    *
361    * Control the buffering and timestamping mode used by the jitterbuffer.
362    */
363   g_object_class_install_property (gobject_class, PROP_MODE,
364       g_param_spec_enum ("mode", "Mode",
365           "Control the buffering algorithm in use", RTP_TYPE_JITTER_BUFFER_MODE,
366           DEFAULT_MODE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
367   /**
368    * GstRtpJitterBuffer::percent:
369    *
370    * The percent of the jitterbuffer that is filled.
371    *
372    * Since: 0.10.19
373    */
374   g_object_class_install_property (gobject_class, PROP_PERCENT,
375       g_param_spec_int ("percent", "percent",
376           "The buffer filled percent", 0, 100,
377           0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
378   /**
379    * GstRtpJitterBuffer::request-pt-map:
380    * @buffer: the object which received the signal
381    * @pt: the pt
382    *
383    * Request the payload type as #GstCaps for @pt.
384    */
385   gst_rtp_jitter_buffer_signals[SIGNAL_REQUEST_PT_MAP] =
386       g_signal_new ("request-pt-map", G_TYPE_FROM_CLASS (klass),
387       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
388           request_pt_map), NULL, NULL, gst_rtp_bin_marshal_BOXED__UINT,
389       GST_TYPE_CAPS, 1, G_TYPE_UINT);
390   /**
391    * GstRtpJitterBuffer::handle-sync:
392    * @buffer: the object which received the signal
393    * @struct: a GstStructure containing sync values.
394    *
395    * Be notified of new sync values.
396    */
397   gst_rtp_jitter_buffer_signals[SIGNAL_HANDLE_SYNC] =
398       g_signal_new ("handle-sync", G_TYPE_FROM_CLASS (klass),
399       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
400           handle_sync), NULL, NULL, g_cclosure_marshal_VOID__BOXED,
401       G_TYPE_NONE, 1, GST_TYPE_STRUCTURE | G_SIGNAL_TYPE_STATIC_SCOPE);
402
403   /**
404    * GstRtpJitterBuffer::on-npt-stop
405    * @buffer: the object which received the signal
406    *
407    * Signal that the jitterbufer has pushed the RTP packet that corresponds to
408    * the npt-stop position.
409    */
410   gst_rtp_jitter_buffer_signals[SIGNAL_ON_NPT_STOP] =
411       g_signal_new ("on-npt-stop", G_TYPE_FROM_CLASS (klass),
412       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
413           on_npt_stop), NULL, NULL, g_cclosure_marshal_VOID__VOID,
414       G_TYPE_NONE, 0, G_TYPE_NONE);
415
416   /**
417    * GstRtpJitterBuffer::clear-pt-map:
418    * @buffer: the object which received the signal
419    *
420    * Invalidate the clock-rate as obtained with the
421    * #GstRtpJitterBuffer::request-pt-map signal.
422    */
423   gst_rtp_jitter_buffer_signals[SIGNAL_CLEAR_PT_MAP] =
424       g_signal_new ("clear-pt-map", G_TYPE_FROM_CLASS (klass),
425       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
426       G_STRUCT_OFFSET (GstRtpJitterBufferClass, clear_pt_map), NULL, NULL,
427       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
428
429   /**
430    * GstRtpJitterBuffer::set-active:
431    * @buffer: the object which received the signal
432    *
433    * Start pushing out packets with the given base time. This signal is only
434    * useful in buffering mode.
435    *
436    * Returns: the time of the last pushed packet.
437    *
438    * Since: 0.10.19
439    */
440   gst_rtp_jitter_buffer_signals[SIGNAL_SET_ACTIVE] =
441       g_signal_new ("set-active", G_TYPE_FROM_CLASS (klass),
442       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
443       G_STRUCT_OFFSET (GstRtpJitterBufferClass, set_active), NULL, NULL,
444       gst_rtp_bin_marshal_UINT64__BOOL_UINT64, G_TYPE_UINT64, 2, G_TYPE_BOOLEAN,
445       G_TYPE_UINT64);
446
447   gstelement_class->change_state =
448       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_change_state);
449   gstelement_class->request_new_pad =
450       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_request_new_pad);
451   gstelement_class->release_pad =
452       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_release_pad);
453   gstelement_class->provide_clock =
454       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_provide_clock);
455
456   gst_element_class_add_pad_template (gstelement_class,
457       gst_static_pad_template_get (&gst_rtp_jitter_buffer_src_template));
458   gst_element_class_add_pad_template (gstelement_class,
459       gst_static_pad_template_get (&gst_rtp_jitter_buffer_sink_template));
460   gst_element_class_add_pad_template (gstelement_class,
461       gst_static_pad_template_get (&gst_rtp_jitter_buffer_sink_rtcp_template));
462
463   gst_element_class_set_static_metadata (gstelement_class,
464       "RTP packet jitter-buffer", "Filter/Network/RTP",
465       "A buffer that deals with network jitter and other transmission faults",
466       "Philippe Kalaf <philippe.kalaf@collabora.co.uk>, "
467       "Wim Taymans <wim.taymans@gmail.com>");
468
469   klass->clear_pt_map = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_clear_pt_map);
470   klass->set_active = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_set_active);
471
472   GST_DEBUG_CATEGORY_INIT
473       (rtpjitterbuffer_debug, "gstrtpjitterbuffer", 0, "RTP Jitter Buffer");
474 }
475
476 static void
477 gst_rtp_jitter_buffer_init (GstRtpJitterBuffer * jitterbuffer)
478 {
479   GstRtpJitterBufferPrivate *priv;
480
481   priv = GST_RTP_JITTER_BUFFER_GET_PRIVATE (jitterbuffer);
482   jitterbuffer->priv = priv;
483
484   priv->latency_ms = DEFAULT_LATENCY_MS;
485   priv->latency_ns = priv->latency_ms * GST_MSECOND;
486   priv->drop_on_latency = DEFAULT_DROP_ON_LATENCY;
487   priv->do_lost = DEFAULT_DO_LOST;
488   priv->timers = g_array_new (FALSE, TRUE, sizeof (TimerData));
489
490   priv->jbuf = rtp_jitter_buffer_new ();
491   g_mutex_init (&priv->jbuf_lock);
492   g_cond_init (&priv->jbuf_cond);
493
494   /* reset skew detection initialy */
495   rtp_jitter_buffer_reset_skew (priv->jbuf);
496   rtp_jitter_buffer_set_delay (priv->jbuf, priv->latency_ns);
497   rtp_jitter_buffer_set_buffering (priv->jbuf, FALSE);
498   priv->active = TRUE;
499
500   priv->srcpad =
501       gst_pad_new_from_static_template (&gst_rtp_jitter_buffer_src_template,
502       "src");
503
504   gst_pad_set_activatemode_function (priv->srcpad,
505       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_activate_mode));
506   gst_pad_set_query_function (priv->srcpad,
507       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_query));
508   gst_pad_set_event_function (priv->srcpad,
509       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_event));
510
511   priv->sinkpad =
512       gst_pad_new_from_static_template (&gst_rtp_jitter_buffer_sink_template,
513       "sink");
514
515   gst_pad_set_chain_function (priv->sinkpad,
516       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_chain));
517   gst_pad_set_event_function (priv->sinkpad,
518       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_sink_event));
519   gst_pad_set_query_function (priv->sinkpad,
520       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_sink_query));
521
522   gst_element_add_pad (GST_ELEMENT (jitterbuffer), priv->srcpad);
523   gst_element_add_pad (GST_ELEMENT (jitterbuffer), priv->sinkpad);
524
525   GST_OBJECT_FLAG_SET (jitterbuffer, GST_ELEMENT_FLAG_PROVIDE_CLOCK);
526 }
527
528 static void
529 gst_rtp_jitter_buffer_finalize (GObject * object)
530 {
531   GstRtpJitterBuffer *jitterbuffer;
532
533   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
534
535   g_array_free (jitterbuffer->priv->timers, TRUE);
536   g_mutex_clear (&jitterbuffer->priv->jbuf_lock);
537   g_cond_clear (&jitterbuffer->priv->jbuf_cond);
538
539   g_object_unref (jitterbuffer->priv->jbuf);
540
541   G_OBJECT_CLASS (parent_class)->finalize (object);
542 }
543
544 static GstIterator *
545 gst_rtp_jitter_buffer_iterate_internal_links (GstPad * pad, GstObject * parent)
546 {
547   GstRtpJitterBuffer *jitterbuffer;
548   GstPad *otherpad = NULL;
549   GstIterator *it;
550   GValue val = { 0, };
551
552   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
553
554   if (pad == jitterbuffer->priv->sinkpad) {
555     otherpad = jitterbuffer->priv->srcpad;
556   } else if (pad == jitterbuffer->priv->srcpad) {
557     otherpad = jitterbuffer->priv->sinkpad;
558   } else if (pad == jitterbuffer->priv->rtcpsinkpad) {
559     otherpad = NULL;
560   }
561
562   g_value_init (&val, GST_TYPE_PAD);
563   g_value_set_object (&val, otherpad);
564   it = gst_iterator_new_single (GST_TYPE_PAD, &val);
565   g_value_unset (&val);
566
567   return it;
568 }
569
570 static GstPad *
571 create_rtcp_sink (GstRtpJitterBuffer * jitterbuffer)
572 {
573   GstRtpJitterBufferPrivate *priv;
574
575   priv = jitterbuffer->priv;
576
577   GST_DEBUG_OBJECT (jitterbuffer, "creating RTCP sink pad");
578
579   priv->rtcpsinkpad =
580       gst_pad_new_from_static_template
581       (&gst_rtp_jitter_buffer_sink_rtcp_template, "sink_rtcp");
582   gst_pad_set_chain_function (priv->rtcpsinkpad,
583       gst_rtp_jitter_buffer_chain_rtcp);
584   gst_pad_set_event_function (priv->rtcpsinkpad,
585       (GstPadEventFunction) gst_rtp_jitter_buffer_sink_rtcp_event);
586   gst_pad_set_iterate_internal_links_function (priv->rtcpsinkpad,
587       gst_rtp_jitter_buffer_iterate_internal_links);
588   gst_pad_set_active (priv->rtcpsinkpad, TRUE);
589   gst_element_add_pad (GST_ELEMENT_CAST (jitterbuffer), priv->rtcpsinkpad);
590
591   return priv->rtcpsinkpad;
592 }
593
594 static void
595 remove_rtcp_sink (GstRtpJitterBuffer * jitterbuffer)
596 {
597   GstRtpJitterBufferPrivate *priv;
598
599   priv = jitterbuffer->priv;
600
601   GST_DEBUG_OBJECT (jitterbuffer, "removing RTCP sink pad");
602
603   gst_pad_set_active (priv->rtcpsinkpad, FALSE);
604
605   gst_element_remove_pad (GST_ELEMENT_CAST (jitterbuffer), priv->rtcpsinkpad);
606   priv->rtcpsinkpad = NULL;
607 }
608
609 static GstPad *
610 gst_rtp_jitter_buffer_request_new_pad (GstElement * element,
611     GstPadTemplate * templ, const gchar * name, const GstCaps * filter)
612 {
613   GstRtpJitterBuffer *jitterbuffer;
614   GstElementClass *klass;
615   GstPad *result;
616   GstRtpJitterBufferPrivate *priv;
617
618   g_return_val_if_fail (templ != NULL, NULL);
619   g_return_val_if_fail (GST_IS_RTP_JITTER_BUFFER (element), NULL);
620
621   jitterbuffer = GST_RTP_JITTER_BUFFER (element);
622   priv = jitterbuffer->priv;
623   klass = GST_ELEMENT_GET_CLASS (element);
624
625   GST_DEBUG_OBJECT (element, "requesting pad %s", GST_STR_NULL (name));
626
627   /* figure out the template */
628   if (templ == gst_element_class_get_pad_template (klass, "sink_rtcp")) {
629     if (priv->rtcpsinkpad != NULL)
630       goto exists;
631
632     result = create_rtcp_sink (jitterbuffer);
633   } else
634     goto wrong_template;
635
636   return result;
637
638   /* ERRORS */
639 wrong_template:
640   {
641     g_warning ("gstrtpjitterbuffer: this is not our template");
642     return NULL;
643   }
644 exists:
645   {
646     g_warning ("gstrtpjitterbuffer: pad already requested");
647     return NULL;
648   }
649 }
650
651 static void
652 gst_rtp_jitter_buffer_release_pad (GstElement * element, GstPad * pad)
653 {
654   GstRtpJitterBuffer *jitterbuffer;
655   GstRtpJitterBufferPrivate *priv;
656
657   g_return_if_fail (GST_IS_RTP_JITTER_BUFFER (element));
658   g_return_if_fail (GST_IS_PAD (pad));
659
660   jitterbuffer = GST_RTP_JITTER_BUFFER (element);
661   priv = jitterbuffer->priv;
662
663   GST_DEBUG_OBJECT (element, "releasing pad %s:%s", GST_DEBUG_PAD_NAME (pad));
664
665   if (priv->rtcpsinkpad == pad) {
666     remove_rtcp_sink (jitterbuffer);
667   } else
668     goto wrong_pad;
669
670   return;
671
672   /* ERRORS */
673 wrong_pad:
674   {
675     g_warning ("gstjitterbuffer: asked to release an unknown pad");
676     return;
677   }
678 }
679
680 static GstClock *
681 gst_rtp_jitter_buffer_provide_clock (GstElement * element)
682 {
683   return gst_system_clock_obtain ();
684 }
685
686 static void
687 gst_rtp_jitter_buffer_clear_pt_map (GstRtpJitterBuffer * jitterbuffer)
688 {
689   GstRtpJitterBufferPrivate *priv;
690
691   priv = jitterbuffer->priv;
692
693   /* this will trigger a new pt-map request signal, FIXME, do something better. */
694
695   JBUF_LOCK (priv);
696   priv->clock_rate = -1;
697   /* do not clear current content, but refresh state for new arrival */
698   GST_DEBUG_OBJECT (jitterbuffer, "reset jitterbuffer");
699   rtp_jitter_buffer_reset_skew (priv->jbuf);
700   priv->last_popped_seqnum = -1;
701   priv->next_seqnum = -1;
702   JBUF_UNLOCK (priv);
703 }
704
705 static GstClockTime
706 gst_rtp_jitter_buffer_set_active (GstRtpJitterBuffer * jbuf, gboolean active,
707     guint64 offset)
708 {
709   GstRtpJitterBufferPrivate *priv;
710   GstClockTime last_out;
711   GstBuffer *head;
712
713   priv = jbuf->priv;
714
715   JBUF_LOCK (priv);
716   GST_DEBUG_OBJECT (jbuf, "setting active %d with offset %" GST_TIME_FORMAT,
717       active, GST_TIME_ARGS (offset));
718
719   if (active != priv->active) {
720     /* add the amount of time spent in paused to the output offset. All
721      * outgoing buffers will have this offset applied to their timestamps in
722      * order to make them arrive in time in the sink. */
723     priv->out_offset = offset;
724     GST_DEBUG_OBJECT (jbuf, "out offset %" GST_TIME_FORMAT,
725         GST_TIME_ARGS (priv->out_offset));
726     priv->active = active;
727     JBUF_SIGNAL (priv);
728   }
729   if (!active) {
730     rtp_jitter_buffer_set_buffering (priv->jbuf, TRUE);
731   }
732   if ((head = rtp_jitter_buffer_peek (priv->jbuf))) {
733     /* head buffer timestamp and offset gives our output time */
734     last_out = GST_BUFFER_TIMESTAMP (head) + priv->ts_offset;
735   } else {
736     /* use last known time when the buffer is empty */
737     last_out = priv->last_out_time;
738   }
739   JBUF_UNLOCK (priv);
740
741   return last_out;
742 }
743
744 static GstCaps *
745 gst_rtp_jitter_buffer_getcaps (GstPad * pad, GstCaps * filter)
746 {
747   GstRtpJitterBuffer *jitterbuffer;
748   GstRtpJitterBufferPrivate *priv;
749   GstPad *other;
750   GstCaps *caps;
751   GstCaps *templ;
752
753   jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
754   priv = jitterbuffer->priv;
755
756   other = (pad == priv->srcpad ? priv->sinkpad : priv->srcpad);
757
758   caps = gst_pad_peer_query_caps (other, filter);
759
760   templ = gst_pad_get_pad_template_caps (pad);
761   if (caps == NULL) {
762     GST_DEBUG_OBJECT (jitterbuffer, "use template");
763     caps = templ;
764   } else {
765     GstCaps *intersect;
766
767     GST_DEBUG_OBJECT (jitterbuffer, "intersect with template");
768
769     intersect = gst_caps_intersect (caps, templ);
770     gst_caps_unref (caps);
771     gst_caps_unref (templ);
772
773     caps = intersect;
774   }
775   gst_object_unref (jitterbuffer);
776
777   return caps;
778 }
779
780 /*
781  * Must be called with JBUF_LOCK held
782  */
783
784 static gboolean
785 gst_jitter_buffer_sink_parse_caps (GstRtpJitterBuffer * jitterbuffer,
786     GstCaps * caps)
787 {
788   GstRtpJitterBufferPrivate *priv;
789   GstStructure *caps_struct;
790   guint val;
791   GstClockTime tval;
792
793   priv = jitterbuffer->priv;
794
795   /* first parse the caps */
796   caps_struct = gst_caps_get_structure (caps, 0);
797
798   GST_DEBUG_OBJECT (jitterbuffer, "got caps");
799
800   /* we need a clock-rate to convert the rtp timestamps to GStreamer time and to
801    * measure the amount of data in the buffer */
802   if (!gst_structure_get_int (caps_struct, "clock-rate", &priv->clock_rate))
803     goto error;
804
805   if (priv->clock_rate <= 0)
806     goto wrong_rate;
807
808   GST_DEBUG_OBJECT (jitterbuffer, "got clock-rate %d", priv->clock_rate);
809
810   /* The clock base is the RTP timestamp corrsponding to the npt-start value. We
811    * can use this to track the amount of time elapsed on the sender. */
812   if (gst_structure_get_uint (caps_struct, "clock-base", &val))
813     priv->clock_base = val;
814   else
815     priv->clock_base = -1;
816
817   priv->ext_timestamp = priv->clock_base;
818
819   GST_DEBUG_OBJECT (jitterbuffer, "got clock-base %" G_GINT64_FORMAT,
820       priv->clock_base);
821
822   if (gst_structure_get_uint (caps_struct, "seqnum-base", &val)) {
823     /* first expected seqnum, only update when we didn't have a previous base. */
824     if (priv->next_in_seqnum == -1)
825       priv->next_in_seqnum = val;
826     if (priv->next_seqnum == -1)
827       priv->next_seqnum = val;
828   }
829
830   GST_DEBUG_OBJECT (jitterbuffer, "got seqnum-base %d", priv->next_in_seqnum);
831
832   /* the start and stop times. The seqnum-base corresponds to the start time. We
833    * will keep track of the seqnums on the output and when we reach the one
834    * corresponding to npt-stop, we emit the npt-stop-reached signal */
835   if (gst_structure_get_clock_time (caps_struct, "npt-start", &tval))
836     priv->npt_start = tval;
837   else
838     priv->npt_start = 0;
839
840   if (gst_structure_get_clock_time (caps_struct, "npt-stop", &tval))
841     priv->npt_stop = tval;
842   else
843     priv->npt_stop = -1;
844
845   GST_DEBUG_OBJECT (jitterbuffer,
846       "npt start/stop: %" GST_TIME_FORMAT "-%" GST_TIME_FORMAT,
847       GST_TIME_ARGS (priv->npt_start), GST_TIME_ARGS (priv->npt_stop));
848
849   return TRUE;
850
851   /* ERRORS */
852 error:
853   {
854     GST_DEBUG_OBJECT (jitterbuffer, "No clock-rate in caps!");
855     return FALSE;
856   }
857 wrong_rate:
858   {
859     GST_DEBUG_OBJECT (jitterbuffer, "Invalid clock-rate %d", priv->clock_rate);
860     return FALSE;
861   }
862 }
863
864 static void
865 gst_rtp_jitter_buffer_flush_start (GstRtpJitterBuffer * jitterbuffer)
866 {
867   GstRtpJitterBufferPrivate *priv;
868
869   priv = jitterbuffer->priv;
870
871   JBUF_LOCK (priv);
872   /* mark ourselves as flushing */
873   priv->srcresult = GST_FLOW_FLUSHING;
874   GST_DEBUG_OBJECT (jitterbuffer, "Disabling pop on queue");
875   /* this unblocks any waiting pops on the src pad task */
876   JBUF_SIGNAL (priv);
877   /* unlock clock, we just unschedule, the entry will be released by the
878    * locking streaming thread. */
879   if (priv->clock_id) {
880     gst_clock_id_unschedule (priv->clock_id);
881     priv->unscheduled = TRUE;
882   }
883   JBUF_UNLOCK (priv);
884 }
885
886 static void
887 gst_rtp_jitter_buffer_flush_stop (GstRtpJitterBuffer * jitterbuffer)
888 {
889   GstRtpJitterBufferPrivate *priv;
890
891   priv = jitterbuffer->priv;
892
893   JBUF_LOCK (priv);
894   GST_DEBUG_OBJECT (jitterbuffer, "Enabling pop on queue");
895   /* Mark as non flushing */
896   priv->srcresult = GST_FLOW_OK;
897   gst_segment_init (&priv->segment, GST_FORMAT_TIME);
898   priv->last_popped_seqnum = -1;
899   priv->last_out_time = -1;
900   priv->last_out_pts = -1;
901   priv->next_seqnum = -1;
902   priv->next_in_seqnum = -1;
903   priv->clock_rate = -1;
904   priv->eos = FALSE;
905   priv->estimated_eos = -1;
906   priv->last_elapsed = 0;
907   priv->ext_timestamp = -1;
908   GST_DEBUG_OBJECT (jitterbuffer, "flush and reset jitterbuffer");
909   rtp_jitter_buffer_flush (priv->jbuf);
910   rtp_jitter_buffer_reset_skew (priv->jbuf);
911   remove_all_timers (jitterbuffer);
912   JBUF_UNLOCK (priv);
913 }
914
915 static gboolean
916 gst_rtp_jitter_buffer_src_activate_mode (GstPad * pad, GstObject * parent,
917     GstPadMode mode, gboolean active)
918 {
919   gboolean result;
920   GstRtpJitterBuffer *jitterbuffer = NULL;
921
922   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
923
924   switch (mode) {
925     case GST_PAD_MODE_PUSH:
926       if (active) {
927         /* allow data processing */
928         gst_rtp_jitter_buffer_flush_stop (jitterbuffer);
929
930         /* start pushing out buffers */
931         GST_DEBUG_OBJECT (jitterbuffer, "Starting task on srcpad");
932         result = gst_pad_start_task (jitterbuffer->priv->srcpad,
933             (GstTaskFunction) gst_rtp_jitter_buffer_loop, jitterbuffer, NULL);
934       } else {
935         /* make sure all data processing stops ASAP */
936         gst_rtp_jitter_buffer_flush_start (jitterbuffer);
937
938         /* NOTE this will hardlock if the state change is called from the src pad
939          * task thread because we will _join() the thread. */
940         GST_DEBUG_OBJECT (jitterbuffer, "Stopping task on srcpad");
941         result = gst_pad_stop_task (pad);
942       }
943       break;
944     default:
945       result = FALSE;
946       break;
947   }
948   return result;
949 }
950
951 static GstStateChangeReturn
952 gst_rtp_jitter_buffer_change_state (GstElement * element,
953     GstStateChange transition)
954 {
955   GstRtpJitterBuffer *jitterbuffer;
956   GstRtpJitterBufferPrivate *priv;
957   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
958
959   jitterbuffer = GST_RTP_JITTER_BUFFER (element);
960   priv = jitterbuffer->priv;
961
962   switch (transition) {
963     case GST_STATE_CHANGE_NULL_TO_READY:
964       break;
965     case GST_STATE_CHANGE_READY_TO_PAUSED:
966       JBUF_LOCK (priv);
967       /* reset negotiated values */
968       priv->clock_rate = -1;
969       priv->clock_base = -1;
970       priv->peer_latency = 0;
971       priv->last_pt = -1;
972       /* block until we go to PLAYING */
973       priv->blocked = TRUE;
974       JBUF_UNLOCK (priv);
975       break;
976     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
977       JBUF_LOCK (priv);
978       /* unblock to allow streaming in PLAYING */
979       priv->blocked = FALSE;
980       JBUF_SIGNAL (priv);
981       JBUF_UNLOCK (priv);
982       break;
983     default:
984       break;
985   }
986
987   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
988
989   switch (transition) {
990     case GST_STATE_CHANGE_READY_TO_PAUSED:
991       /* we are a live element because we sync to the clock, which we can only
992        * do in the PLAYING state */
993       if (ret != GST_STATE_CHANGE_FAILURE)
994         ret = GST_STATE_CHANGE_NO_PREROLL;
995       break;
996     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
997       JBUF_LOCK (priv);
998       /* block to stop streaming when PAUSED */
999       priv->blocked = TRUE;
1000       JBUF_UNLOCK (priv);
1001       if (ret != GST_STATE_CHANGE_FAILURE)
1002         ret = GST_STATE_CHANGE_NO_PREROLL;
1003       break;
1004     case GST_STATE_CHANGE_PAUSED_TO_READY:
1005       gst_buffer_replace (&priv->last_sr, NULL);
1006       break;
1007     case GST_STATE_CHANGE_READY_TO_NULL:
1008       break;
1009     default:
1010       break;
1011   }
1012
1013   return ret;
1014 }
1015
1016 static gboolean
1017 gst_rtp_jitter_buffer_src_event (GstPad * pad, GstObject * parent,
1018     GstEvent * event)
1019 {
1020   gboolean ret = TRUE;
1021   GstRtpJitterBuffer *jitterbuffer;
1022   GstRtpJitterBufferPrivate *priv;
1023
1024   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1025   priv = jitterbuffer->priv;
1026
1027   GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
1028
1029   switch (GST_EVENT_TYPE (event)) {
1030     case GST_EVENT_LATENCY:
1031     {
1032       GstClockTime latency;
1033
1034       gst_event_parse_latency (event, &latency);
1035
1036       GST_DEBUG_OBJECT (jitterbuffer,
1037           "configuring latency of %" GST_TIME_FORMAT, GST_TIME_ARGS (latency));
1038
1039       JBUF_LOCK (priv);
1040       /* adjust the overall buffer delay to the total pipeline latency in
1041        * buffering mode because if downstream consumes too fast (because of
1042        * large latency or queues, we would start rebuffering again. */
1043       if (rtp_jitter_buffer_get_mode (priv->jbuf) ==
1044           RTP_JITTER_BUFFER_MODE_BUFFER) {
1045         rtp_jitter_buffer_set_delay (priv->jbuf, latency);
1046       }
1047       JBUF_UNLOCK (priv);
1048
1049       ret = gst_pad_push_event (priv->sinkpad, event);
1050       break;
1051     }
1052     default:
1053       ret = gst_pad_push_event (priv->sinkpad, event);
1054       break;
1055   }
1056
1057   return ret;
1058 }
1059
1060 static gboolean
1061 gst_rtp_jitter_buffer_sink_event (GstPad * pad, GstObject * parent,
1062     GstEvent * event)
1063 {
1064   gboolean ret = TRUE;
1065   GstRtpJitterBuffer *jitterbuffer;
1066   GstRtpJitterBufferPrivate *priv;
1067
1068   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1069   priv = jitterbuffer->priv;
1070
1071   GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
1072
1073   switch (GST_EVENT_TYPE (event)) {
1074     case GST_EVENT_CAPS:
1075     {
1076       GstCaps *caps;
1077
1078       gst_event_parse_caps (event, &caps);
1079
1080       JBUF_LOCK (priv);
1081       ret = gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps);
1082       JBUF_UNLOCK (priv);
1083
1084       /* set same caps on srcpad on success */
1085       if (ret)
1086         ret = gst_pad_push_event (priv->srcpad, event);
1087       else
1088         gst_event_unref (event);
1089       break;
1090     }
1091     case GST_EVENT_SEGMENT:
1092     {
1093       gst_event_copy_segment (event, &priv->segment);
1094
1095       /* we need time for now */
1096       if (priv->segment.format != GST_FORMAT_TIME)
1097         goto newseg_wrong_format;
1098
1099       GST_DEBUG_OBJECT (jitterbuffer,
1100           "newsegment:  %" GST_SEGMENT_FORMAT, &priv->segment);
1101
1102       /* FIXME, push SEGMENT in the queue. Sorting order might be difficult. */
1103       ret = gst_pad_push_event (priv->srcpad, event);
1104       break;
1105     }
1106     case GST_EVENT_FLUSH_START:
1107       gst_rtp_jitter_buffer_flush_start (jitterbuffer);
1108       ret = gst_pad_push_event (priv->srcpad, event);
1109       break;
1110     case GST_EVENT_FLUSH_STOP:
1111       ret = gst_pad_push_event (priv->srcpad, event);
1112       ret =
1113           gst_rtp_jitter_buffer_src_activate_mode (priv->srcpad, parent,
1114           GST_PAD_MODE_PUSH, TRUE);
1115       break;
1116     case GST_EVENT_EOS:
1117     {
1118       /* push EOS in queue. We always push it at the head */
1119       JBUF_LOCK (priv);
1120       /* check for flushing, we need to discard the event and return FALSE when
1121        * we are flushing */
1122       ret = priv->srcresult == GST_FLOW_OK;
1123       if (ret && !priv->eos) {
1124         GST_INFO_OBJECT (jitterbuffer, "queuing EOS");
1125         priv->eos = TRUE;
1126         JBUF_SIGNAL (priv);
1127       } else if (priv->eos) {
1128         GST_DEBUG_OBJECT (jitterbuffer, "dropping EOS, we are already EOS");
1129       } else {
1130         GST_DEBUG_OBJECT (jitterbuffer, "dropping EOS, reason %s",
1131             gst_flow_get_name (priv->srcresult));
1132       }
1133       JBUF_UNLOCK (priv);
1134       gst_event_unref (event);
1135       break;
1136     }
1137     default:
1138       ret = gst_pad_push_event (priv->srcpad, event);
1139       break;
1140   }
1141
1142 done:
1143
1144   return ret;
1145
1146   /* ERRORS */
1147 newseg_wrong_format:
1148   {
1149     GST_DEBUG_OBJECT (jitterbuffer, "received non TIME newsegment");
1150     ret = FALSE;
1151     gst_event_unref (event);
1152     goto done;
1153   }
1154 }
1155
1156 static gboolean
1157 gst_rtp_jitter_buffer_sink_rtcp_event (GstPad * pad, GstObject * parent,
1158     GstEvent * event)
1159 {
1160   gboolean ret = TRUE;
1161   GstRtpJitterBuffer *jitterbuffer;
1162
1163   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1164
1165   GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
1166
1167   switch (GST_EVENT_TYPE (event)) {
1168     case GST_EVENT_FLUSH_START:
1169       gst_event_unref (event);
1170       break;
1171     case GST_EVENT_FLUSH_STOP:
1172       gst_event_unref (event);
1173       break;
1174     default:
1175       ret = gst_pad_event_default (pad, parent, event);
1176       break;
1177   }
1178
1179   return ret;
1180 }
1181
1182 /*
1183  * Must be called with JBUF_LOCK held, will release the LOCK when emiting the
1184  * signal. The function returns GST_FLOW_ERROR when a parsing error happened and
1185  * GST_FLOW_FLUSHING when the element is shutting down. On success
1186  * GST_FLOW_OK is returned.
1187  */
1188 static GstFlowReturn
1189 gst_rtp_jitter_buffer_get_clock_rate (GstRtpJitterBuffer * jitterbuffer,
1190     guint8 pt)
1191 {
1192   GValue ret = { 0 };
1193   GValue args[2] = { {0}, {0} };
1194   GstCaps *caps;
1195   gboolean res;
1196
1197   g_value_init (&args[0], GST_TYPE_ELEMENT);
1198   g_value_set_object (&args[0], jitterbuffer);
1199   g_value_init (&args[1], G_TYPE_UINT);
1200   g_value_set_uint (&args[1], pt);
1201
1202   g_value_init (&ret, GST_TYPE_CAPS);
1203   g_value_set_boxed (&ret, NULL);
1204
1205   JBUF_UNLOCK (jitterbuffer->priv);
1206   g_signal_emitv (args, gst_rtp_jitter_buffer_signals[SIGNAL_REQUEST_PT_MAP], 0,
1207       &ret);
1208   JBUF_LOCK_CHECK (jitterbuffer->priv, out_flushing);
1209
1210   g_value_unset (&args[0]);
1211   g_value_unset (&args[1]);
1212   caps = (GstCaps *) g_value_dup_boxed (&ret);
1213   g_value_unset (&ret);
1214   if (!caps)
1215     goto no_caps;
1216
1217   res = gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps);
1218   gst_caps_unref (caps);
1219
1220   if (G_UNLIKELY (!res))
1221     goto parse_failed;
1222
1223   return GST_FLOW_OK;
1224
1225   /* ERRORS */
1226 no_caps:
1227   {
1228     GST_DEBUG_OBJECT (jitterbuffer, "could not get caps");
1229     return GST_FLOW_ERROR;
1230   }
1231 out_flushing:
1232   {
1233     GST_DEBUG_OBJECT (jitterbuffer, "we are flushing");
1234     return GST_FLOW_FLUSHING;
1235   }
1236 parse_failed:
1237   {
1238     GST_DEBUG_OBJECT (jitterbuffer, "parse failed");
1239     return GST_FLOW_ERROR;
1240   }
1241 }
1242
1243 /* call with jbuf lock held */
1244 static void
1245 check_buffering_percent (GstRtpJitterBuffer * jitterbuffer, gint * percent)
1246 {
1247   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1248
1249   /* too short a stream, or too close to EOS will never really fill buffer */
1250   if (*percent != -1 && priv->npt_stop != -1 &&
1251       priv->npt_stop - priv->npt_start <=
1252       rtp_jitter_buffer_get_delay (priv->jbuf)) {
1253     GST_DEBUG_OBJECT (jitterbuffer, "short stream; faking full buffer");
1254     rtp_jitter_buffer_set_buffering (priv->jbuf, FALSE);
1255     *percent = 100;
1256   }
1257 }
1258
1259 static void
1260 post_buffering_percent (GstRtpJitterBuffer * jitterbuffer, gint percent)
1261 {
1262   GstMessage *message;
1263
1264   /* Post a buffering message */
1265   message = gst_message_new_buffering (GST_OBJECT_CAST (jitterbuffer), percent);
1266   gst_message_set_buffering_stats (message, GST_BUFFERING_LIVE, -1, -1, -1);
1267
1268   gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer), message);
1269 }
1270
1271 static GstFlowReturn
1272 gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent,
1273     GstBuffer * buffer)
1274 {
1275   GstRtpJitterBuffer *jitterbuffer;
1276   GstRtpJitterBufferPrivate *priv;
1277   guint16 seqnum;
1278   GstFlowReturn ret = GST_FLOW_OK;
1279   GstClockTime timestamp;
1280   guint64 latency_ts;
1281   gboolean tail;
1282   gint percent = -1;
1283   guint8 pt;
1284   GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
1285
1286   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1287
1288   priv = jitterbuffer->priv;
1289
1290   if (G_UNLIKELY (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp)))
1291     goto invalid_buffer;
1292
1293   pt = gst_rtp_buffer_get_payload_type (&rtp);
1294   seqnum = gst_rtp_buffer_get_seq (&rtp);
1295   gst_rtp_buffer_unmap (&rtp);
1296
1297   /* take the timestamp of the buffer. This is the time when the packet was
1298    * received and is used to calculate jitter and clock skew. We will adjust
1299    * this timestamp with the smoothed value after processing it in the
1300    * jitterbuffer. */
1301   timestamp = GST_BUFFER_TIMESTAMP (buffer);
1302   /* bring to running time */
1303   timestamp = gst_segment_to_running_time (&priv->segment, GST_FORMAT_TIME,
1304       timestamp);
1305
1306   GST_DEBUG_OBJECT (jitterbuffer,
1307       "Received packet #%d at time %" GST_TIME_FORMAT, seqnum,
1308       GST_TIME_ARGS (timestamp));
1309
1310   JBUF_LOCK_CHECK (priv, out_flushing);
1311
1312   if (G_UNLIKELY (priv->last_pt != pt)) {
1313     GstCaps *caps;
1314
1315     GST_DEBUG_OBJECT (jitterbuffer, "pt changed from %u to %u", priv->last_pt,
1316         pt);
1317
1318     priv->last_pt = pt;
1319     /* reset clock-rate so that we get a new one */
1320     priv->clock_rate = -1;
1321
1322     /* Try to get the clock-rate from the caps first if we can. If there are no
1323      * caps we must fire the signal to get the clock-rate. */
1324     if ((caps = gst_pad_get_current_caps (pad))) {
1325       gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps);
1326       gst_caps_unref (caps);
1327     }
1328   }
1329
1330   if (G_UNLIKELY (priv->clock_rate == -1)) {
1331     /* no clock rate given on the caps, try to get one with the signal */
1332     if (gst_rtp_jitter_buffer_get_clock_rate (jitterbuffer,
1333             pt) == GST_FLOW_FLUSHING)
1334       goto out_flushing;
1335
1336     if (G_UNLIKELY (priv->clock_rate == -1))
1337       goto no_clock_rate;
1338   }
1339
1340   /* don't accept more data on EOS */
1341   if (G_UNLIKELY (priv->eos))
1342     goto have_eos;
1343
1344   /* now check against our expected seqnum */
1345   if (G_LIKELY (priv->next_in_seqnum != -1)) {
1346     gint gap;
1347     gboolean reset = FALSE;
1348
1349     gap = gst_rtp_buffer_compare_seqnum (priv->next_in_seqnum, seqnum);
1350     if (G_UNLIKELY (gap != 0)) {
1351       GST_DEBUG_OBJECT (jitterbuffer, "expected #%d, got #%d, gap of %d",
1352           priv->next_in_seqnum, seqnum, gap);
1353       /* priv->next_in_seqnum >= seqnum, this packet is too late or the
1354        * sender might have been restarted with different seqnum. */
1355       if (gap < -RTP_MAX_MISORDER) {
1356         GST_DEBUG_OBJECT (jitterbuffer, "reset: buffer too old %d", gap);
1357         reset = TRUE;
1358       }
1359       /* priv->next_in_seqnum < seqnum, this is a new packet */
1360       else if (G_UNLIKELY (gap > RTP_MAX_DROPOUT)) {
1361         GST_DEBUG_OBJECT (jitterbuffer, "reset: too many dropped packets %d",
1362             gap);
1363         reset = TRUE;
1364       } else {
1365         GST_DEBUG_OBJECT (jitterbuffer, "tolerable gap");
1366       }
1367     }
1368     if (G_UNLIKELY (reset)) {
1369       GST_DEBUG_OBJECT (jitterbuffer, "flush and reset jitterbuffer");
1370       rtp_jitter_buffer_flush (priv->jbuf);
1371       rtp_jitter_buffer_reset_skew (priv->jbuf);
1372       remove_all_timers (jitterbuffer);
1373       priv->last_popped_seqnum = -1;
1374       priv->next_seqnum = seqnum;
1375     }
1376   }
1377   priv->next_in_seqnum = (seqnum + 1) & 0xffff;
1378
1379   /* let's check if this buffer is too late, we can only accept packets with
1380    * bigger seqnum than the one we last pushed. */
1381   if (G_LIKELY (priv->last_popped_seqnum != -1)) {
1382     gint gap;
1383
1384     gap = gst_rtp_buffer_compare_seqnum (priv->last_popped_seqnum, seqnum);
1385
1386     /* priv->last_popped_seqnum >= seqnum, we're too late. */
1387     if (G_UNLIKELY (gap <= 0))
1388       goto too_late;
1389   }
1390
1391   /* let's drop oldest packet if the queue is already full and drop-on-latency
1392    * is set. We can only do this when there actually is a latency. When no
1393    * latency is set, we just pump it in the queue and let the other end push it
1394    * out as fast as possible. */
1395   if (priv->latency_ms && priv->drop_on_latency) {
1396     latency_ts =
1397         gst_util_uint64_scale_int (priv->latency_ms, priv->clock_rate, 1000);
1398
1399     if (G_UNLIKELY (rtp_jitter_buffer_get_ts_diff (priv->jbuf) >= latency_ts)) {
1400       GstBuffer *old_buf;
1401
1402       old_buf = rtp_jitter_buffer_pop (priv->jbuf, &percent);
1403
1404       GST_DEBUG_OBJECT (jitterbuffer, "Queue full, dropping old packet %p",
1405           old_buf);
1406
1407       gst_buffer_unref (old_buf);
1408     }
1409   }
1410
1411   /* we need to make the metadata writable before pushing it in the jitterbuffer
1412    * because the jitterbuffer will update the timestamp */
1413   buffer = gst_buffer_make_writable (buffer);
1414
1415   /* now insert the packet into the queue in sorted order. This function returns
1416    * FALSE if a packet with the same seqnum was already in the queue, meaning we
1417    * have a duplicate. */
1418   if (G_UNLIKELY (!rtp_jitter_buffer_insert (priv->jbuf, buffer, timestamp,
1419               priv->clock_rate, &tail, &percent)))
1420     goto duplicate;
1421
1422   /* we had an unhandled SR, handle it now */
1423   if (priv->last_sr)
1424     do_handle_sync (jitterbuffer);
1425
1426   /* signal addition of new buffer when the _loop is waiting. */
1427   if (priv->waiting && priv->active)
1428     JBUF_SIGNAL (priv);
1429
1430   /* let's unschedule and unblock any waiting buffers. We only want to do this
1431    * when the tail buffer changed */
1432   if (G_UNLIKELY (priv->clock_id && tail)) {
1433     GST_DEBUG_OBJECT (jitterbuffer, "Unscheduling waiting new buffer");
1434     gst_clock_id_unschedule (priv->clock_id);
1435     priv->unscheduled = TRUE;
1436   }
1437
1438   GST_DEBUG_OBJECT (jitterbuffer, "Pushed packet #%d, now %d packets, tail: %d",
1439       seqnum, rtp_jitter_buffer_num_packets (priv->jbuf), tail);
1440
1441   check_buffering_percent (jitterbuffer, &percent);
1442
1443 finished:
1444   JBUF_UNLOCK (priv);
1445
1446   if (percent != -1)
1447     post_buffering_percent (jitterbuffer, percent);
1448
1449   return ret;
1450
1451   /* ERRORS */
1452 invalid_buffer:
1453   {
1454     /* this is not fatal but should be filtered earlier */
1455     GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
1456         ("Received invalid RTP payload, dropping"));
1457     gst_buffer_unref (buffer);
1458     return GST_FLOW_OK;
1459   }
1460 no_clock_rate:
1461   {
1462     GST_WARNING_OBJECT (jitterbuffer,
1463         "No clock-rate in caps!, dropping buffer");
1464     gst_buffer_unref (buffer);
1465     goto finished;
1466   }
1467 out_flushing:
1468   {
1469     ret = priv->srcresult;
1470     GST_DEBUG_OBJECT (jitterbuffer, "flushing %s", gst_flow_get_name (ret));
1471     gst_buffer_unref (buffer);
1472     goto finished;
1473   }
1474 have_eos:
1475   {
1476     ret = GST_FLOW_EOS;
1477     GST_WARNING_OBJECT (jitterbuffer, "we are EOS, refusing buffer");
1478     gst_buffer_unref (buffer);
1479     goto finished;
1480   }
1481 too_late:
1482   {
1483     GST_WARNING_OBJECT (jitterbuffer, "Packet #%d too late as #%d was already"
1484         " popped, dropping", seqnum, priv->last_popped_seqnum);
1485     priv->num_late++;
1486     gst_buffer_unref (buffer);
1487     goto finished;
1488   }
1489 duplicate:
1490   {
1491     GST_WARNING_OBJECT (jitterbuffer, "Duplicate packet #%d detected, dropping",
1492         seqnum);
1493     priv->num_duplicates++;
1494     gst_buffer_unref (buffer);
1495     goto finished;
1496   }
1497 }
1498
1499 static GstClockTime
1500 apply_offset (GstRtpJitterBuffer * jitterbuffer, GstClockTime timestamp)
1501 {
1502   GstRtpJitterBufferPrivate *priv;
1503
1504   priv = jitterbuffer->priv;
1505
1506   if (timestamp == -1)
1507     return -1;
1508
1509   /* apply the timestamp offset, this is used for inter stream sync */
1510   timestamp += priv->ts_offset;
1511   /* add the offset, this is used when buffering */
1512   timestamp += priv->out_offset;
1513
1514   return timestamp;
1515 }
1516
1517 #define GST_FLOW_WAIT GST_FLOW_CUSTOM_SUCCESS
1518
1519 static TimerData *
1520 find_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type,
1521     guint16 seqnum, gboolean * created)
1522 {
1523   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1524   TimerData *timer;
1525   gint i, len;
1526   gboolean found = FALSE;
1527
1528   len = priv->timers->len;
1529   for (i = 0; i < len; i++) {
1530     timer = &g_array_index (priv->timers, TimerData, i);
1531     if (timer->seqnum == seqnum && timer->type == type) {
1532       found = TRUE;
1533       break;
1534     }
1535   }
1536   if (!found) {
1537     /* not found, create */
1538     g_array_set_size (priv->timers, len + 1);
1539     timer = &g_array_index (priv->timers, TimerData, len);
1540     timer->idx = len;
1541     timer->type = type;
1542     timer->seqnum = seqnum;
1543   }
1544   if (created)
1545     *created = !found;
1546
1547   return timer;
1548 }
1549
1550 static GstFlowReturn
1551 set_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type,
1552     guint16 seqnum, GstClockTime timeout)
1553 {
1554   TimerData *timer;
1555
1556   GST_DEBUG_OBJECT (jitterbuffer,
1557       "set timer for seqnum %d to %" GST_TIME_FORMAT, seqnum,
1558       GST_TIME_ARGS (timeout));
1559
1560   /* find the seqnum timer */
1561   timer = find_timer (jitterbuffer, type, seqnum, NULL);
1562   timer->timeout = timeout;
1563
1564   return GST_FLOW_WAIT;
1565 }
1566
1567 static void
1568 remove_timer (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
1569 {
1570   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1571   guint idx;
1572
1573   idx = timer->idx;
1574   GST_DEBUG_OBJECT (jitterbuffer, "removed index %d", idx);
1575   g_array_remove_index_fast (priv->timers, idx);
1576   timer->idx = idx;
1577 }
1578
1579 static void
1580 remove_all_timers (GstRtpJitterBuffer * jitterbuffer)
1581 {
1582   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1583   GST_DEBUG_OBJECT (jitterbuffer, "removed all timers");
1584   g_array_set_size (priv->timers, 0);
1585 }
1586
1587 static GstClockTime
1588 compute_elapsed (GstRtpJitterBuffer * jitterbuffer, GstBuffer * outbuf)
1589 {
1590   guint64 ext_time, elapsed;
1591   guint32 rtp_time;
1592   GstRtpJitterBufferPrivate *priv;
1593   GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
1594
1595   priv = jitterbuffer->priv;
1596   gst_rtp_buffer_map (outbuf, GST_MAP_READ, &rtp);
1597   rtp_time = gst_rtp_buffer_get_timestamp (&rtp);
1598   gst_rtp_buffer_unmap (&rtp);
1599
1600   GST_LOG_OBJECT (jitterbuffer, "rtp %" G_GUINT32_FORMAT ", ext %"
1601       G_GUINT64_FORMAT, rtp_time, priv->ext_timestamp);
1602
1603   if (rtp_time < priv->ext_timestamp) {
1604     ext_time = priv->ext_timestamp;
1605   } else {
1606     ext_time = gst_rtp_buffer_ext_timestamp (&priv->ext_timestamp, rtp_time);
1607   }
1608
1609   if (ext_time > priv->clock_base)
1610     elapsed = ext_time - priv->clock_base;
1611   else
1612     elapsed = 0;
1613
1614   elapsed = gst_util_uint64_scale_int (elapsed, GST_SECOND, priv->clock_rate);
1615   return elapsed;
1616 }
1617
1618 static void
1619 update_estimated_eos (GstRtpJitterBuffer * jitterbuffer, GstBuffer * outbuf)
1620 {
1621   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1622
1623   if (priv->npt_stop != -1 && priv->ext_timestamp != -1
1624       && priv->clock_base != -1 && priv->clock_rate > 0) {
1625     guint64 elapsed, estimated;
1626
1627     elapsed = compute_elapsed (jitterbuffer, outbuf);
1628
1629     if (elapsed > priv->last_elapsed || !priv->last_elapsed) {
1630       guint64 left;
1631       GstClockTime out_time;
1632
1633       priv->last_elapsed = elapsed;
1634
1635       left = priv->npt_stop - priv->npt_start;
1636       GST_LOG_OBJECT (jitterbuffer, "left %" GST_TIME_FORMAT,
1637           GST_TIME_ARGS (left));
1638
1639       out_time = GST_BUFFER_PTS (outbuf);
1640
1641       if (elapsed > 0)
1642         estimated = gst_util_uint64_scale (out_time, left, elapsed);
1643       else {
1644         /* if there is almost nothing left,
1645          * we may never advance enough to end up in the above case */
1646         if (left < GST_SECOND)
1647           estimated = GST_SECOND;
1648         else
1649           estimated = -1;
1650       }
1651
1652       GST_LOG_OBJECT (jitterbuffer, "elapsed %" GST_TIME_FORMAT ", estimated %"
1653           GST_TIME_FORMAT, GST_TIME_ARGS (elapsed), GST_TIME_ARGS (estimated));
1654
1655       if (estimated != -1 && priv->estimated_eos != estimated) {
1656         set_timer (jitterbuffer, TIMER_TYPE_EOS, -1, estimated);
1657         priv->estimated_eos = estimated;
1658       }
1659     }
1660   }
1661 }
1662
1663 /* take a buffer from the queue and push it */
1664 static GstFlowReturn
1665 pop_and_push_next (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum,
1666     GstClockTime pts)
1667 {
1668   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1669   GstFlowReturn result;
1670   GstBuffer *outbuf;
1671   gint percent = -1;
1672   GstClockTime out_time;
1673
1674   /* when we get here we are ready to pop and push the buffer */
1675   outbuf = rtp_jitter_buffer_pop (priv->jbuf, &percent);
1676
1677   check_buffering_percent (jitterbuffer, &percent);
1678
1679   if (G_UNLIKELY (priv->discont)) {
1680     /* set DISCONT flag when we missed a packet. We pushed the buffer writable
1681      * into the jitterbuffer so we can modify now. */
1682     GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_DISCONT);
1683     priv->discont = FALSE;
1684   }
1685   if (G_UNLIKELY (priv->ts_discont)) {
1686     GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_RESYNC);
1687     priv->ts_discont = FALSE;
1688   }
1689
1690   /* apply timestamp with offset to buffer now */
1691   out_time = apply_offset (jitterbuffer, pts);
1692   GST_BUFFER_PTS (outbuf) = out_time;
1693   GST_BUFFER_DTS (outbuf) = out_time;
1694
1695   /* update the elapsed time when we need to check against the npt stop time. */
1696   update_estimated_eos (jitterbuffer, outbuf);
1697
1698   /* now we are ready to push the buffer. Save the seqnum and release the lock
1699    * so the other end can push stuff in the queue again. */
1700   priv->last_popped_seqnum = seqnum;
1701   priv->last_out_time = out_time;
1702   priv->last_out_pts = pts;
1703   priv->next_seqnum = (seqnum + 1) & 0xffff;
1704   JBUF_UNLOCK (priv);
1705
1706   if (percent != -1)
1707     post_buffering_percent (jitterbuffer, percent);
1708
1709   /* push buffer */
1710   GST_DEBUG_OBJECT (jitterbuffer,
1711       "Pushing buffer %d, timestamp %" GST_TIME_FORMAT, seqnum,
1712       GST_TIME_ARGS (out_time));
1713   result = gst_pad_push (priv->srcpad, outbuf);
1714
1715   JBUF_LOCK_CHECK (priv, out_flushing);
1716
1717   return result;
1718
1719   /* ERRORS */
1720 out_flushing:
1721   {
1722     return priv->srcresult;
1723   }
1724 }
1725
1726 static GstClockTime
1727 estimate_pts (GstRtpJitterBuffer * jitterbuffer, GstClockTime pts, gint gap)
1728 {
1729   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1730   GstClockTime duration;
1731
1732   if (pts == -1 || priv->last_out_pts == -1)
1733     return pts;
1734
1735   GST_DEBUG_OBJECT (jitterbuffer,
1736       "pts %" GST_TIME_FORMAT ", last %" GST_TIME_FORMAT,
1737       GST_TIME_ARGS (pts), GST_TIME_ARGS (priv->last_out_pts));
1738
1739   /* interpolate between the current time and the last time based on
1740    * number of packets we are missing, this is the estimated duration
1741    * for the missing packet based on equidistant packet spacing. Also make
1742    * sure we never go negative. */
1743   if (pts >= priv->last_out_pts)
1744     duration = (pts - priv->last_out_pts) / (gap + 1);
1745   else
1746     /* packet already lost, timer will timeout quickly */
1747     duration = 0;
1748
1749   GST_DEBUG_OBJECT (jitterbuffer, "duration %" GST_TIME_FORMAT,
1750       GST_TIME_ARGS (duration));
1751
1752   /* add this duration to the timestamp of the last packet we pushed */
1753   pts = (priv->last_out_pts + duration);
1754
1755   return pts;
1756 }
1757
1758 /* Peek a buffer and compare the seqnum to the expected seqnum.
1759  * If all is fine, the buffer is pushed.
1760  * If something is wrong, a timeout is set. We set 2 kinds of timeouts:
1761  *   * deadline: to the ultimate time we can still push the packet. We
1762  *     do this for the first packet to make sure we have the previous
1763  *     packets.
1764  *   * lost: the ultimate time we can receive a packet before we have
1765  *     to consider it lost. We estimate this based on the packet
1766  *     spacing.
1767  */
1768 static GstFlowReturn
1769 handle_next_buffer (GstRtpJitterBuffer * jitterbuffer)
1770 {
1771   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1772   GstFlowReturn result = GST_FLOW_OK;
1773   GstBuffer *outbuf;
1774   guint16 seqnum;
1775   GstClockTime pts;
1776   guint32 next_seqnum;
1777   gint gap;
1778   GstRTPBuffer rtp = { NULL, };
1779
1780   /* only push buffers when PLAYING and active and not buffering */
1781   if (priv->blocked || !priv->active ||
1782       rtp_jitter_buffer_is_buffering (priv->jbuf))
1783     return GST_FLOW_WAIT;
1784
1785 again:
1786   /* peek a buffer, we're just looking at the sequence number.
1787    * If all is fine, we'll pop and push it. If the sequence number is wrong we
1788    * wait on the timestamp. In the chain function we will unlock the wait when a
1789    * new buffer is available. The peeked buffer is valid for as long as we hold
1790    * the jitterbuffer lock. */
1791   outbuf = rtp_jitter_buffer_peek (priv->jbuf);
1792   if (outbuf == NULL)
1793     goto wait;
1794
1795   /* get the seqnum and the next expected seqnum */
1796   gst_rtp_buffer_map (outbuf, GST_MAP_READ, &rtp);
1797   seqnum = gst_rtp_buffer_get_seq (&rtp);
1798   gst_rtp_buffer_unmap (&rtp);
1799
1800   next_seqnum = priv->next_seqnum;
1801
1802   /* get the timestamp, this is already corrected for clock skew by the
1803    * jitterbuffer */
1804   pts = GST_BUFFER_PTS (outbuf);
1805
1806   /* get the gap between this and the previous packet. If we don't know the
1807    * previous packet seqnum assume no gap. */
1808   if (G_UNLIKELY (next_seqnum == -1)) {
1809     GST_DEBUG_OBJECT (jitterbuffer, "First buffer #%d", seqnum);
1810     /* we don't know what the next_seqnum should be, wait for the last
1811      * possible moment to push this buffer, maybe we get an earlier seqnum
1812      * while we wait */
1813     result = set_timer (jitterbuffer, TIMER_TYPE_DEADLINE, seqnum, pts);
1814   } else {
1815     /* else calculate GAP */
1816     gap = gst_rtp_buffer_compare_seqnum (next_seqnum, seqnum);
1817
1818     if (G_LIKELY (gap == 0)) {
1819       /* no missing packet, pop and push */
1820       result = pop_and_push_next (jitterbuffer, seqnum, pts);
1821     } else if (G_UNLIKELY (gap < 0)) {
1822       /* if we have a packet that we already pushed or considered dropped, pop it
1823        * off and get the next packet */
1824       GST_DEBUG_OBJECT (jitterbuffer, "Old packet #%d, next #%d dropping",
1825           seqnum, next_seqnum);
1826       outbuf = rtp_jitter_buffer_pop (priv->jbuf, NULL);
1827       gst_buffer_unref (outbuf);
1828       goto again;
1829     } else {
1830       GST_DEBUG_OBJECT (jitterbuffer,
1831           "Sequence number GAP detected: expected %d instead of %d (%d missing)",
1832           next_seqnum, seqnum, gap);
1833       /* packet missing, estimate when we should ultimately push this packet */
1834       pts = estimate_pts (jitterbuffer, pts, gap);
1835       /* and set a timer for it */
1836       result = set_timer (jitterbuffer, TIMER_TYPE_LOST, next_seqnum, pts);
1837     }
1838   }
1839   return result;
1840
1841 wait:
1842   {
1843     GST_DEBUG_OBJECT (jitterbuffer, "no buffer, going to wait");
1844     return GST_FLOW_WAIT;
1845   }
1846 }
1847
1848 /* a packet is lost */
1849 static GstFlowReturn
1850 do_lost_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
1851     GstClockTimeDiff clock_jitter)
1852 {
1853   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1854   GstClockTime duration = GST_CLOCK_TIME_NONE;
1855   guint32 lost_packets = 1;
1856   gboolean lost_packets_late = FALSE;
1857
1858 #if 0
1859   if (clock_jitter > 0
1860       && clock_jitter > (priv->latency_ns + priv->peer_latency)) {
1861     GstClockTimeDiff total_duration;
1862     GstClockTime out_time_diff;
1863
1864     out_time_diff =
1865         apply_offset (jitterbuffer, timer->timeout) - timer->timeout;
1866     total_duration = MIN (out_time_diff, clock_jitter);
1867
1868     if (duration > 0)
1869       lost_packets = total_duration / duration;
1870     else
1871       lost_packets = gap;
1872     total_duration = lost_packets * duration;
1873
1874     GST_DEBUG_OBJECT (jitterbuffer,
1875         "Current sync_time has expired a long time ago (+%" GST_TIME_FORMAT
1876         ") Cover up %d lost packets with duration %" GST_TIME_FORMAT,
1877         GST_TIME_ARGS (clock_jitter),
1878         lost_packets, GST_TIME_ARGS (total_duration));
1879
1880     duration = total_duration;
1881     lost_packets_late = TRUE;
1882   }
1883 #endif
1884
1885   /* we had a gap and thus we lost some packets. Create an event for this.  */
1886   if (lost_packets > 1)
1887     GST_DEBUG_OBJECT (jitterbuffer, "Packets #%d -> #%d lost", timer->seqnum,
1888         timer->seqnum + lost_packets - 1);
1889   else
1890     GST_DEBUG_OBJECT (jitterbuffer, "Packet #%d lost", timer->seqnum);
1891
1892   priv->num_late += lost_packets;
1893   priv->discont = TRUE;
1894
1895   /* update our expected next packet */
1896   priv->last_popped_seqnum = timer->seqnum;
1897   priv->last_out_time = apply_offset (jitterbuffer, timer->timeout);
1898   priv->last_out_pts = timer->timeout;
1899   priv->next_seqnum = (timer->seqnum + lost_packets) & 0xffff;
1900   /* remove timer now */
1901   remove_timer (jitterbuffer, timer);
1902
1903   if (priv->do_lost) {
1904     GstEvent *event;
1905
1906     /* create paket lost event */
1907     event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM,
1908         gst_structure_new ("GstRTPPacketLost",
1909             "seqnum", G_TYPE_UINT, (guint) priv->last_popped_seqnum,
1910             "timestamp", G_TYPE_UINT64, priv->last_out_time,
1911             "duration", G_TYPE_UINT64, duration,
1912             "late", G_TYPE_BOOLEAN, lost_packets_late, NULL));
1913     JBUF_UNLOCK (priv);
1914     gst_pad_push_event (priv->srcpad, event);
1915     JBUF_LOCK_CHECK (priv, flushing);
1916   }
1917   return GST_FLOW_OK;
1918
1919   /* ERRORS */
1920 flushing:
1921   {
1922     GST_DEBUG_OBJECT (jitterbuffer, "we are flushing");
1923     return priv->srcresult;
1924   }
1925 }
1926
1927 static GstFlowReturn
1928 do_eos_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
1929 {
1930   GST_INFO_OBJECT (jitterbuffer, "got the NPT timeout");
1931   remove_timer (jitterbuffer, timer);
1932
1933   return GST_FLOW_EOS;
1934 }
1935
1936 /* called when we need to wait for the next timeout.
1937  *
1938  * We loop over the array of recorded timeouts and wait for the earliest one.
1939  * When it timed out, do the logic associated with the timer.
1940  *
1941  * If there are no timers, we wait on a gcond until something new happens.
1942  */
1943 static GstFlowReturn
1944 wait_next_timeout (GstRtpJitterBuffer * jitterbuffer)
1945 {
1946   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1947   GstFlowReturn result = GST_FLOW_OK;
1948   gint i, len;
1949   TimerData *timer = NULL;
1950
1951   len = priv->timers->len;
1952   for (i = 0; i < len; i++) {
1953     TimerData *test = &g_array_index (priv->timers, TimerData, i);
1954
1955     /* find the smallest timeout */
1956     if (timer == NULL || test->timeout == -1 || test->timeout < timer->timeout)
1957       timer = test;
1958   }
1959   if (timer) {
1960     GstClock *clock;
1961     GstClockTime sync_time;
1962     GstClockID id;
1963     GstClockReturn ret;
1964     GstClockTimeDiff clock_jitter;
1965
1966     /* no timestamp, timeout immeditately */
1967     if (timer->timeout == -1)
1968       goto do_timeout;
1969
1970     GST_OBJECT_LOCK (jitterbuffer);
1971     clock = GST_ELEMENT_CLOCK (jitterbuffer);
1972     if (!clock) {
1973       GST_OBJECT_UNLOCK (jitterbuffer);
1974       /* let's just push if there is no clock */
1975       GST_DEBUG_OBJECT (jitterbuffer, "No clock, timeout right away");
1976       goto do_timeout;
1977     }
1978
1979     /* prepare for sync against clock */
1980     sync_time = timer->timeout + GST_ELEMENT_CAST (jitterbuffer)->base_time;
1981     /* add latency of peer to get input time */
1982     sync_time += priv->peer_latency;
1983
1984     /* add our latency and offset to get output times. */
1985     sync_time = apply_offset (jitterbuffer, sync_time);
1986     sync_time += priv->latency_ns;
1987
1988     GST_DEBUG_OBJECT (jitterbuffer, "sync to timestamp %" GST_TIME_FORMAT
1989         " with sync time %" GST_TIME_FORMAT,
1990         GST_TIME_ARGS (timer->timeout), GST_TIME_ARGS (sync_time));
1991
1992     /* create an entry for the clock */
1993     id = priv->clock_id = gst_clock_new_single_shot_id (clock, sync_time);
1994     priv->unscheduled = FALSE;
1995     GST_OBJECT_UNLOCK (jitterbuffer);
1996
1997     /* release the lock so that the other end can push stuff or unlock */
1998     JBUF_UNLOCK (priv);
1999
2000     ret = gst_clock_id_wait (id, &clock_jitter);
2001
2002     JBUF_LOCK (priv);
2003     GST_DEBUG_OBJECT (jitterbuffer, "sync done, %d, %" G_GINT64_FORMAT,
2004         ret, clock_jitter);
2005     /* and free the entry */
2006     gst_clock_id_unref (id);
2007     priv->clock_id = NULL;
2008
2009     /* at this point, the clock could have been unlocked by a timeout, a new
2010      * tail element was added to the queue or because we are shutting down. Check
2011      * for shutdown first. */
2012     if G_UNLIKELY
2013       ((priv->srcresult != GST_FLOW_OK))
2014           goto flushing;
2015
2016     /* if we got unscheduled and we are not flushing, it's because a new tail
2017      * element became available in the queue or we flushed the queue.
2018      * Grab it and try to push or sync. */
2019     if (ret == GST_CLOCK_UNSCHEDULED || priv->unscheduled) {
2020       GST_DEBUG_OBJECT (jitterbuffer, "Wait got unscheduled");
2021       goto done;
2022     }
2023   do_timeout:
2024     switch (timer->type) {
2025       case TIMER_TYPE_EXPECTED:
2026         remove_timer (jitterbuffer, timer);
2027         break;
2028       case TIMER_TYPE_LOST:
2029         result = do_lost_timeout (jitterbuffer, timer, clock_jitter);
2030         break;
2031       case TIMER_TYPE_DEADLINE:
2032         priv->next_seqnum = timer->seqnum;
2033         remove_timer (jitterbuffer, timer);
2034         break;
2035       case TIMER_TYPE_EOS:
2036         result = do_eos_timeout (jitterbuffer, timer);
2037         break;
2038     }
2039   } else {
2040     /* no timers, wait for activity */
2041     GST_DEBUG_OBJECT (jitterbuffer, "waiting");
2042     priv->waiting = TRUE;
2043     JBUF_WAIT_CHECK (priv, flushing);
2044     priv->waiting = FALSE;
2045     GST_DEBUG_OBJECT (jitterbuffer, "waiting done");
2046   }
2047
2048 done:
2049   return result;
2050
2051 flushing:
2052   {
2053     GST_DEBUG_OBJECT (jitterbuffer, "we are flushing");
2054     return priv->srcresult;
2055   }
2056 }
2057
2058 /*
2059  * This funcion implements the main pushing loop on the source pad.
2060  *
2061  * It first tries to push as many buffers as possible. If there is a seqnum
2062  * mismatch, a timeout is created and this function goes waiting for the
2063  * next timeout.
2064  */
2065 static void
2066 gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer)
2067 {
2068   GstRtpJitterBufferPrivate *priv;
2069   GstFlowReturn result;
2070
2071   priv = jitterbuffer->priv;
2072
2073   JBUF_LOCK_CHECK (priv, flushing);
2074   do {
2075     result = handle_next_buffer (jitterbuffer);
2076     if (G_LIKELY (result == GST_FLOW_WAIT))
2077       /* now wait for the next event */
2078       result = wait_next_timeout (jitterbuffer);
2079   }
2080   while (result == GST_FLOW_OK);
2081   JBUF_UNLOCK (priv);
2082
2083   /* if we get here we need to pause */
2084   goto pause;
2085
2086   /* ERRORS */
2087 flushing:
2088   {
2089     result = priv->srcresult;
2090     JBUF_UNLOCK (priv);
2091     goto pause;
2092   }
2093 pause:
2094   {
2095     const gchar *reason = gst_flow_get_name (result);
2096     GstEvent *event;
2097
2098     GST_DEBUG_OBJECT (jitterbuffer, "pausing task, reason %s", reason);
2099     gst_pad_pause_task (priv->srcpad);
2100     if (result == GST_FLOW_EOS) {
2101       event = gst_event_new_eos ();
2102       gst_pad_push_event (priv->srcpad, event);
2103     }
2104     return;
2105   }
2106 }
2107
2108 /* collect the info from the lastest RTCP packet and the jitterbuffer sync, do
2109  * some sanity checks and then emit the handle-sync signal with the parameters.
2110  * This function must be called with the LOCK */
2111 static void
2112 do_handle_sync (GstRtpJitterBuffer * jitterbuffer)
2113 {
2114   GstRtpJitterBufferPrivate *priv;
2115   guint64 base_rtptime, base_time;
2116   guint32 clock_rate;
2117   guint64 last_rtptime;
2118   guint64 clock_base;
2119   guint64 ext_rtptime, diff;
2120   gboolean drop = FALSE;
2121
2122   priv = jitterbuffer->priv;
2123
2124   /* get the last values from the jitterbuffer */
2125   rtp_jitter_buffer_get_sync (priv->jbuf, &base_rtptime, &base_time,
2126       &clock_rate, &last_rtptime);
2127
2128   clock_base = priv->clock_base;
2129   ext_rtptime = priv->ext_rtptime;
2130
2131   GST_DEBUG_OBJECT (jitterbuffer, "ext SR %" G_GUINT64_FORMAT ", base %"
2132       G_GUINT64_FORMAT ", clock-rate %" G_GUINT32_FORMAT
2133       ", clock-base %" G_GUINT64_FORMAT ", last-rtptime %" G_GUINT64_FORMAT,
2134       ext_rtptime, base_rtptime, clock_rate, clock_base, last_rtptime);
2135
2136   if (base_rtptime == -1 || clock_rate == -1 || base_time == -1) {
2137     GST_DEBUG_OBJECT (jitterbuffer, "dropping, no RTP values");
2138     drop = TRUE;
2139   } else {
2140     /* we can't accept anything that happened before we did the last resync */
2141     if (base_rtptime > ext_rtptime) {
2142       GST_DEBUG_OBJECT (jitterbuffer, "dropping, older than base time");
2143       drop = TRUE;
2144     } else {
2145       /* the SR RTP timestamp must be something close to what we last observed
2146        * in the jitterbuffer */
2147       if (ext_rtptime > last_rtptime) {
2148         /* check how far ahead it is to our RTP timestamps */
2149         diff = ext_rtptime - last_rtptime;
2150         /* if bigger than 1 second, we drop it */
2151         if (diff > clock_rate) {
2152           GST_DEBUG_OBJECT (jitterbuffer, "too far ahead");
2153           /* should drop this, but some RTSP servers end up with bogus
2154            * way too ahead RTCP packet when repeated PAUSE/PLAY,
2155            * so still trigger rptbin sync but invalidate RTCP data
2156            * (sync might use other methods) */
2157           ext_rtptime = -1;
2158         }
2159         GST_DEBUG_OBJECT (jitterbuffer, "ext last %" G_GUINT64_FORMAT ", diff %"
2160             G_GUINT64_FORMAT, last_rtptime, diff);
2161       }
2162     }
2163   }
2164
2165   if (!drop) {
2166     GstStructure *s;
2167
2168     s = gst_structure_new ("application/x-rtp-sync",
2169         "base-rtptime", G_TYPE_UINT64, base_rtptime,
2170         "base-time", G_TYPE_UINT64, base_time,
2171         "clock-rate", G_TYPE_UINT, clock_rate,
2172         "clock-base", G_TYPE_UINT64, clock_base,
2173         "sr-ext-rtptime", G_TYPE_UINT64, ext_rtptime,
2174         "sr-buffer", GST_TYPE_BUFFER, priv->last_sr, NULL);
2175
2176     GST_DEBUG_OBJECT (jitterbuffer, "signaling sync");
2177     gst_buffer_replace (&priv->last_sr, NULL);
2178     JBUF_UNLOCK (priv);
2179     g_signal_emit (jitterbuffer,
2180         gst_rtp_jitter_buffer_signals[SIGNAL_HANDLE_SYNC], 0, s);
2181     JBUF_LOCK (priv);
2182     gst_structure_free (s);
2183   } else {
2184     GST_DEBUG_OBJECT (jitterbuffer, "dropping RTCP packet");
2185   }
2186 }
2187
2188 static GstFlowReturn
2189 gst_rtp_jitter_buffer_chain_rtcp (GstPad * pad, GstObject * parent,
2190     GstBuffer * buffer)
2191 {
2192   GstRtpJitterBuffer *jitterbuffer;
2193   GstRtpJitterBufferPrivate *priv;
2194   GstFlowReturn ret = GST_FLOW_OK;
2195   guint32 ssrc;
2196   GstRTCPPacket packet;
2197   guint64 ext_rtptime;
2198   guint32 rtptime;
2199   GstRTCPBuffer rtcp = { NULL, };
2200
2201   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
2202
2203   if (G_UNLIKELY (!gst_rtcp_buffer_validate (buffer)))
2204     goto invalid_buffer;
2205
2206   priv = jitterbuffer->priv;
2207
2208   gst_rtcp_buffer_map (buffer, GST_MAP_READ, &rtcp);
2209
2210   if (!gst_rtcp_buffer_get_first_packet (&rtcp, &packet))
2211     goto empty_buffer;
2212
2213   /* first packet must be SR or RR or else the validate would have failed */
2214   switch (gst_rtcp_packet_get_type (&packet)) {
2215     case GST_RTCP_TYPE_SR:
2216       gst_rtcp_packet_sr_get_sender_info (&packet, &ssrc, NULL, &rtptime,
2217           NULL, NULL);
2218       break;
2219     default:
2220       goto ignore_buffer;
2221   }
2222   gst_rtcp_buffer_unmap (&rtcp);
2223
2224   GST_DEBUG_OBJECT (jitterbuffer, "received RTCP of SSRC %08x", ssrc);
2225
2226   JBUF_LOCK (priv);
2227   /* convert the RTP timestamp to our extended timestamp, using the same offset
2228    * we used in the jitterbuffer */
2229   ext_rtptime = priv->jbuf->ext_rtptime;
2230   ext_rtptime = gst_rtp_buffer_ext_timestamp (&ext_rtptime, rtptime);
2231
2232   priv->ext_rtptime = ext_rtptime;
2233   gst_buffer_replace (&priv->last_sr, buffer);
2234
2235   do_handle_sync (jitterbuffer);
2236   JBUF_UNLOCK (priv);
2237
2238 done:
2239   gst_buffer_unref (buffer);
2240
2241   return ret;
2242
2243 invalid_buffer:
2244   {
2245     /* this is not fatal but should be filtered earlier */
2246     GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
2247         ("Received invalid RTCP payload, dropping"));
2248     ret = GST_FLOW_OK;
2249     goto done;
2250   }
2251 empty_buffer:
2252   {
2253     /* this is not fatal but should be filtered earlier */
2254     GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
2255         ("Received empty RTCP payload, dropping"));
2256     gst_rtcp_buffer_unmap (&rtcp);
2257     ret = GST_FLOW_OK;
2258     goto done;
2259   }
2260 ignore_buffer:
2261   {
2262     GST_DEBUG_OBJECT (jitterbuffer, "ignoring RTCP packet");
2263     gst_rtcp_buffer_unmap (&rtcp);
2264     ret = GST_FLOW_OK;
2265     goto done;
2266   }
2267 }
2268
2269 static gboolean
2270 gst_rtp_jitter_buffer_sink_query (GstPad * pad, GstObject * parent,
2271     GstQuery * query)
2272 {
2273   gboolean res = FALSE;
2274
2275   switch (GST_QUERY_TYPE (query)) {
2276     case GST_QUERY_CAPS:
2277     {
2278       GstCaps *filter, *caps;
2279
2280       gst_query_parse_caps (query, &filter);
2281       caps = gst_rtp_jitter_buffer_getcaps (pad, filter);
2282       gst_query_set_caps_result (query, caps);
2283       gst_caps_unref (caps);
2284       res = TRUE;
2285       break;
2286     }
2287     default:
2288       if (GST_QUERY_IS_SERIALIZED (query)) {
2289         GST_WARNING_OBJECT (pad, "unhandled serialized query");
2290         res = FALSE;
2291       } else {
2292         res = gst_pad_query_default (pad, parent, query);
2293       }
2294       break;
2295   }
2296   return res;
2297 }
2298
2299 static gboolean
2300 gst_rtp_jitter_buffer_src_query (GstPad * pad, GstObject * parent,
2301     GstQuery * query)
2302 {
2303   GstRtpJitterBuffer *jitterbuffer;
2304   GstRtpJitterBufferPrivate *priv;
2305   gboolean res = FALSE;
2306
2307   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
2308   priv = jitterbuffer->priv;
2309
2310   switch (GST_QUERY_TYPE (query)) {
2311     case GST_QUERY_LATENCY:
2312     {
2313       /* We need to send the query upstream and add the returned latency to our
2314        * own */
2315       GstClockTime min_latency, max_latency;
2316       gboolean us_live;
2317       GstClockTime our_latency;
2318
2319       if ((res = gst_pad_peer_query (priv->sinkpad, query))) {
2320         gst_query_parse_latency (query, &us_live, &min_latency, &max_latency);
2321
2322         GST_DEBUG_OBJECT (jitterbuffer, "Peer latency: min %"
2323             GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
2324             GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
2325
2326         /* store this so that we can safely sync on the peer buffers. */
2327         JBUF_LOCK (priv);
2328         priv->peer_latency = min_latency;
2329         our_latency = priv->latency_ns;
2330         JBUF_UNLOCK (priv);
2331
2332         GST_DEBUG_OBJECT (jitterbuffer, "Our latency: %" GST_TIME_FORMAT,
2333             GST_TIME_ARGS (our_latency));
2334
2335         /* we add some latency but can buffer an infinite amount of time */
2336         min_latency += our_latency;
2337         max_latency = -1;
2338
2339         GST_DEBUG_OBJECT (jitterbuffer, "Calculated total latency : min %"
2340             GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
2341             GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
2342
2343         gst_query_set_latency (query, TRUE, min_latency, max_latency);
2344       }
2345       break;
2346     }
2347     case GST_QUERY_POSITION:
2348     {
2349       GstClockTime start, last_out;
2350       GstFormat fmt;
2351
2352       gst_query_parse_position (query, &fmt, NULL);
2353       if (fmt != GST_FORMAT_TIME) {
2354         res = gst_pad_query_default (pad, parent, query);
2355         break;
2356       }
2357
2358       JBUF_LOCK (priv);
2359       start = priv->npt_start;
2360       last_out = priv->last_out_time;
2361       JBUF_UNLOCK (priv);
2362
2363       GST_DEBUG_OBJECT (jitterbuffer, "npt start %" GST_TIME_FORMAT
2364           ", last out %" GST_TIME_FORMAT, GST_TIME_ARGS (start),
2365           GST_TIME_ARGS (last_out));
2366
2367       if (GST_CLOCK_TIME_IS_VALID (start) && GST_CLOCK_TIME_IS_VALID (last_out)) {
2368         /* bring 0-based outgoing time to stream time */
2369         gst_query_set_position (query, GST_FORMAT_TIME, start + last_out);
2370         res = TRUE;
2371       } else {
2372         res = gst_pad_query_default (pad, parent, query);
2373       }
2374       break;
2375     }
2376     case GST_QUERY_CAPS:
2377     {
2378       GstCaps *filter, *caps;
2379
2380       gst_query_parse_caps (query, &filter);
2381       caps = gst_rtp_jitter_buffer_getcaps (pad, filter);
2382       gst_query_set_caps_result (query, caps);
2383       gst_caps_unref (caps);
2384       res = TRUE;
2385       break;
2386     }
2387     default:
2388       res = gst_pad_query_default (pad, parent, query);
2389       break;
2390   }
2391
2392   return res;
2393 }
2394
2395 static void
2396 gst_rtp_jitter_buffer_set_property (GObject * object,
2397     guint prop_id, const GValue * value, GParamSpec * pspec)
2398 {
2399   GstRtpJitterBuffer *jitterbuffer;
2400   GstRtpJitterBufferPrivate *priv;
2401
2402   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
2403   priv = jitterbuffer->priv;
2404
2405   switch (prop_id) {
2406     case PROP_LATENCY:
2407     {
2408       guint new_latency, old_latency;
2409
2410       new_latency = g_value_get_uint (value);
2411
2412       JBUF_LOCK (priv);
2413       old_latency = priv->latency_ms;
2414       priv->latency_ms = new_latency;
2415       priv->latency_ns = priv->latency_ms * GST_MSECOND;
2416       rtp_jitter_buffer_set_delay (priv->jbuf, priv->latency_ns);
2417       JBUF_UNLOCK (priv);
2418
2419       /* post message if latency changed, this will inform the parent pipeline
2420        * that a latency reconfiguration is possible/needed. */
2421       if (new_latency != old_latency) {
2422         GST_DEBUG_OBJECT (jitterbuffer, "latency changed to: %" GST_TIME_FORMAT,
2423             GST_TIME_ARGS (new_latency * GST_MSECOND));
2424
2425         gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer),
2426             gst_message_new_latency (GST_OBJECT_CAST (jitterbuffer)));
2427       }
2428       break;
2429     }
2430     case PROP_DROP_ON_LATENCY:
2431       JBUF_LOCK (priv);
2432       priv->drop_on_latency = g_value_get_boolean (value);
2433       JBUF_UNLOCK (priv);
2434       break;
2435     case PROP_TS_OFFSET:
2436       JBUF_LOCK (priv);
2437       priv->ts_offset = g_value_get_int64 (value);
2438       priv->ts_discont = TRUE;
2439       JBUF_UNLOCK (priv);
2440       break;
2441     case PROP_DO_LOST:
2442       JBUF_LOCK (priv);
2443       priv->do_lost = g_value_get_boolean (value);
2444       JBUF_UNLOCK (priv);
2445       break;
2446     case PROP_MODE:
2447       JBUF_LOCK (priv);
2448       rtp_jitter_buffer_set_mode (priv->jbuf, g_value_get_enum (value));
2449       JBUF_UNLOCK (priv);
2450       break;
2451     default:
2452       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2453       break;
2454   }
2455 }
2456
2457 static void
2458 gst_rtp_jitter_buffer_get_property (GObject * object,
2459     guint prop_id, GValue * value, GParamSpec * pspec)
2460 {
2461   GstRtpJitterBuffer *jitterbuffer;
2462   GstRtpJitterBufferPrivate *priv;
2463
2464   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
2465   priv = jitterbuffer->priv;
2466
2467   switch (prop_id) {
2468     case PROP_LATENCY:
2469       JBUF_LOCK (priv);
2470       g_value_set_uint (value, priv->latency_ms);
2471       JBUF_UNLOCK (priv);
2472       break;
2473     case PROP_DROP_ON_LATENCY:
2474       JBUF_LOCK (priv);
2475       g_value_set_boolean (value, priv->drop_on_latency);
2476       JBUF_UNLOCK (priv);
2477       break;
2478     case PROP_TS_OFFSET:
2479       JBUF_LOCK (priv);
2480       g_value_set_int64 (value, priv->ts_offset);
2481       JBUF_UNLOCK (priv);
2482       break;
2483     case PROP_DO_LOST:
2484       JBUF_LOCK (priv);
2485       g_value_set_boolean (value, priv->do_lost);
2486       JBUF_UNLOCK (priv);
2487       break;
2488     case PROP_MODE:
2489       JBUF_LOCK (priv);
2490       g_value_set_enum (value, rtp_jitter_buffer_get_mode (priv->jbuf));
2491       JBUF_UNLOCK (priv);
2492       break;
2493     case PROP_PERCENT:
2494     {
2495       gint percent;
2496
2497       JBUF_LOCK (priv);
2498       if (priv->srcresult != GST_FLOW_OK)
2499         percent = 100;
2500       else
2501         percent = rtp_jitter_buffer_get_percent (priv->jbuf);
2502
2503       g_value_set_int (value, percent);
2504       JBUF_UNLOCK (priv);
2505       break;
2506     }
2507     default:
2508       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2509       break;
2510   }
2511 }