jitterbuffer: use corrected timeout when rescheduling
[platform/upstream/gst-plugins-good.git] / gst / rtpmanager / gstrtpjitterbuffer.c
1 /*
2  * Farsight Voice+Video library
3  *
4  *  Copyright 2007 Collabora Ltd,
5  *  Copyright 2007 Nokia Corporation
6  *   @author: Philippe Kalaf <philippe.kalaf@collabora.co.uk>.
7  *  Copyright 2007 Wim Taymans <wim.taymans@gmail.com>
8  *
9  * This library is free software; you can redistribute it and/or
10  * modify it under the terms of the GNU Library General Public
11  * License as published by the Free Software Foundation; either
12  * version 2 of the License, or (at your option) any later version.
13  *
14  * This library is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17  * Library General Public License for more details.
18  *
19  * You should have received a copy of the GNU Library General Public
20  * License along with this library; if not, write to the
21  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
22  * Boston, MA 02110-1301, USA.
23  *
24  */
25
26 /**
27  * SECTION:element-gstrtpjitterbuffer
28  *
29  * This element reorders and removes duplicate RTP packets as they are received
30  * from a network source. It will also wait for missing packets up to a
31  * configurable time limit using the #GstRtpJitterBuffer:latency property.
32  * Packets arriving too late are considered to be lost packets.
33  *
34  * This element acts as a live element and so adds #GstRtpJitterBuffer:latency
35  * to the pipeline.
36  *
37  * The element needs the clock-rate of the RTP payload in order to estimate the
38  * delay. This information is obtained either from the caps on the sink pad or,
39  * when no caps are present, from the #GstRtpJitterBuffer::request-pt-map signal.
40  * To clear the previous pt-map use the #GstRtpJitterBuffer::clear-pt-map signal.
41  *
42  * This element will automatically be used inside gstrtpbin.
43  *
44  * <refsect2>
45  * <title>Example pipelines</title>
46  * |[
47  * gst-launch-1.0 rtspsrc location=rtsp://192.168.1.133:8554/mpeg1or2AudioVideoTest ! gstrtpjitterbuffer ! rtpmpvdepay ! mpeg2dec ! xvimagesink
48  * ]| Connect to a streaming server and decode the MPEG video. The jitterbuffer is
49  * inserted into the pipeline to smooth out network jitter and to reorder the
50  * out-of-order RTP packets.
51  * </refsect2>
52  *
53  * Last reviewed on 2007-05-28 (0.10.5)
54  */
55
56 #ifdef HAVE_CONFIG_H
57 #include "config.h"
58 #endif
59
60 #include <stdlib.h>
61 #include <string.h>
62 #include <gst/rtp/gstrtpbuffer.h>
63
64 #include "gstrtpbin-marshal.h"
65
66 #include "gstrtpjitterbuffer.h"
67 #include "rtpjitterbuffer.h"
68 #include "rtpstats.h"
69
70 #include <gst/glib-compat-private.h>
71
72 GST_DEBUG_CATEGORY (rtpjitterbuffer_debug);
73 #define GST_CAT_DEFAULT (rtpjitterbuffer_debug)
74
75 /* RTPJitterBuffer signals and args */
76 enum
77 {
78   SIGNAL_REQUEST_PT_MAP,
79   SIGNAL_CLEAR_PT_MAP,
80   SIGNAL_HANDLE_SYNC,
81   SIGNAL_ON_NPT_STOP,
82   SIGNAL_SET_ACTIVE,
83   LAST_SIGNAL
84 };
85
86 #define DEFAULT_LATENCY_MS      200
87 #define DEFAULT_DROP_ON_LATENCY FALSE
88 #define DEFAULT_TS_OFFSET       0
89 #define DEFAULT_DO_LOST         FALSE
90 #define DEFAULT_MODE            RTP_JITTER_BUFFER_MODE_SLAVE
91 #define DEFAULT_PERCENT         0
92
93 enum
94 {
95   PROP_0,
96   PROP_LATENCY,
97   PROP_DROP_ON_LATENCY,
98   PROP_TS_OFFSET,
99   PROP_DO_LOST,
100   PROP_MODE,
101   PROP_PERCENT,
102   PROP_LAST
103 };
104
105 #define JBUF_LOCK(priv)   (g_mutex_lock (&(priv)->jbuf_lock))
106
107 #define JBUF_LOCK_CHECK(priv,label) G_STMT_START {    \
108   JBUF_LOCK (priv);                                   \
109   if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK))    \
110     goto label;                                       \
111 } G_STMT_END
112
113 #define JBUF_UNLOCK(priv) (g_mutex_unlock (&(priv)->jbuf_lock))
114 #define JBUF_WAIT(priv)   (g_cond_wait (&(priv)->jbuf_cond, &(priv)->jbuf_lock))
115
116 #define JBUF_WAIT_CHECK(priv,label) G_STMT_START {    \
117   JBUF_WAIT(priv);                                    \
118   if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK))    \
119     goto label;                                       \
120 } G_STMT_END
121
122 #define JBUF_SIGNAL(priv) (g_cond_signal (&(priv)->jbuf_cond))
123
124 struct _GstRtpJitterBufferPrivate
125 {
126   GstPad *sinkpad, *srcpad;
127   GstPad *rtcpsinkpad;
128
129   RTPJitterBuffer *jbuf;
130   GMutex jbuf_lock;
131   GCond jbuf_cond;
132   gboolean waiting;
133   gboolean discont;
134   gboolean ts_discont;
135   gboolean active;
136   guint64 out_offset;
137
138   /* properties */
139   guint latency_ms;
140   guint64 latency_ns;
141   gboolean drop_on_latency;
142   gint64 ts_offset;
143   gboolean do_lost;
144
145   /* the last seqnum we pushed out */
146   guint32 last_popped_seqnum;
147   /* the next expected seqnum we push */
148   guint32 next_seqnum;
149   /* last output time */
150   GstClockTime last_out_time;
151   GstClockTime last_out_dts;
152   GstClockTime last_out_pts;
153   /* last valid input timestamp and rtptime pair */
154   GstClockTime last_in_dts;
155   GstClockTime last_in_rtptime;
156   GstClockTime packet_spacing;
157   /* the next expected seqnum we receive */
158   guint32 next_in_seqnum;
159
160   GArray *timers;
161
162   /* start and stop ranges */
163   GstClockTime npt_start;
164   GstClockTime npt_stop;
165   guint64 ext_timestamp;
166   guint64 last_elapsed;
167   guint64 estimated_eos;
168   GstClockID eos_id;
169
170   /* state */
171   gboolean eos;
172
173   /* clock rate and rtp timestamp offset */
174   gint last_pt;
175   gint32 clock_rate;
176   gint64 clock_base;
177   gint64 prev_ts_offset;
178
179   /* when we are shutting down */
180   GstFlowReturn srcresult;
181   gboolean blocked;
182
183   /* for sync */
184   GstSegment segment;
185   GstClockID clock_id;
186   GstClockTime timer_timeout;
187   guint16 timer_seqnum;
188   gboolean unscheduled;
189   /* the latency of the upstream peer, we have to take this into account when
190    * synchronizing the buffers. */
191   GstClockTime peer_latency;
192   guint64 ext_rtptime;
193   GstBuffer *last_sr;
194
195   /* some accounting */
196   guint64 num_late;
197   guint64 num_duplicates;
198 };
199
200 typedef enum
201 {
202   TIMER_TYPE_EXPECTED,
203   TIMER_TYPE_LOST,
204   TIMER_TYPE_DEADLINE,
205   TIMER_TYPE_EOS
206 } TimerType;
207
208 typedef struct
209 {
210   guint idx;
211   guint16 seqnum;
212   TimerType type;
213   GstClockTime timeout;
214 } TimerData;
215
216 #define GST_RTP_JITTER_BUFFER_GET_PRIVATE(o) \
217   (G_TYPE_INSTANCE_GET_PRIVATE ((o), GST_TYPE_RTP_JITTER_BUFFER, \
218                                 GstRtpJitterBufferPrivate))
219
220 static GstStaticPadTemplate gst_rtp_jitter_buffer_sink_template =
221 GST_STATIC_PAD_TEMPLATE ("sink",
222     GST_PAD_SINK,
223     GST_PAD_ALWAYS,
224     GST_STATIC_CAPS ("application/x-rtp, "
225         "clock-rate = (int) [ 1, 2147483647 ]"
226         /* "payload = (int) , "
227          * "encoding-name = (string) "
228          */ )
229     );
230
231 static GstStaticPadTemplate gst_rtp_jitter_buffer_sink_rtcp_template =
232 GST_STATIC_PAD_TEMPLATE ("sink_rtcp",
233     GST_PAD_SINK,
234     GST_PAD_REQUEST,
235     GST_STATIC_CAPS ("application/x-rtcp")
236     );
237
238 static GstStaticPadTemplate gst_rtp_jitter_buffer_src_template =
239 GST_STATIC_PAD_TEMPLATE ("src",
240     GST_PAD_SRC,
241     GST_PAD_ALWAYS,
242     GST_STATIC_CAPS ("application/x-rtp"
243         /* "payload = (int) , "
244          * "clock-rate = (int) , "
245          * "encoding-name = (string) "
246          */ )
247     );
248
249 static guint gst_rtp_jitter_buffer_signals[LAST_SIGNAL] = { 0 };
250
251 #define gst_rtp_jitter_buffer_parent_class parent_class
252 G_DEFINE_TYPE (GstRtpJitterBuffer, gst_rtp_jitter_buffer, GST_TYPE_ELEMENT);
253
254 /* object overrides */
255 static void gst_rtp_jitter_buffer_set_property (GObject * object,
256     guint prop_id, const GValue * value, GParamSpec * pspec);
257 static void gst_rtp_jitter_buffer_get_property (GObject * object,
258     guint prop_id, GValue * value, GParamSpec * pspec);
259 static void gst_rtp_jitter_buffer_finalize (GObject * object);
260
261 /* element overrides */
262 static GstStateChangeReturn gst_rtp_jitter_buffer_change_state (GstElement
263     * element, GstStateChange transition);
264 static GstPad *gst_rtp_jitter_buffer_request_new_pad (GstElement * element,
265     GstPadTemplate * templ, const gchar * name, const GstCaps * filter);
266 static void gst_rtp_jitter_buffer_release_pad (GstElement * element,
267     GstPad * pad);
268 static GstClock *gst_rtp_jitter_buffer_provide_clock (GstElement * element);
269
270 /* pad overrides */
271 static GstCaps *gst_rtp_jitter_buffer_getcaps (GstPad * pad, GstCaps * filter);
272 static GstIterator *gst_rtp_jitter_buffer_iterate_internal_links (GstPad * pad,
273     GstObject * parent);
274
275 /* sinkpad overrides */
276 static gboolean gst_rtp_jitter_buffer_sink_event (GstPad * pad,
277     GstObject * parent, GstEvent * event);
278 static GstFlowReturn gst_rtp_jitter_buffer_chain (GstPad * pad,
279     GstObject * parent, GstBuffer * buffer);
280
281 static gboolean gst_rtp_jitter_buffer_sink_rtcp_event (GstPad * pad,
282     GstObject * parent, GstEvent * event);
283 static GstFlowReturn gst_rtp_jitter_buffer_chain_rtcp (GstPad * pad,
284     GstObject * parent, GstBuffer * buffer);
285
286 static gboolean gst_rtp_jitter_buffer_sink_query (GstPad * pad,
287     GstObject * parent, GstQuery * query);
288
289 /* srcpad overrides */
290 static gboolean gst_rtp_jitter_buffer_src_event (GstPad * pad,
291     GstObject * parent, GstEvent * event);
292 static gboolean gst_rtp_jitter_buffer_src_activate_mode (GstPad * pad,
293     GstObject * parent, GstPadMode mode, gboolean active);
294 static void gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer);
295 static gboolean gst_rtp_jitter_buffer_src_query (GstPad * pad,
296     GstObject * parent, GstQuery * query);
297
298 static void
299 gst_rtp_jitter_buffer_clear_pt_map (GstRtpJitterBuffer * jitterbuffer);
300 static GstClockTime
301 gst_rtp_jitter_buffer_set_active (GstRtpJitterBuffer * jitterbuffer,
302     gboolean active, guint64 base_time);
303 static void do_handle_sync (GstRtpJitterBuffer * jitterbuffer);
304
305 static void unschedule_current_timer (GstRtpJitterBuffer * jitterbuffer);
306 static void remove_all_timers (GstRtpJitterBuffer * jitterbuffer);
307
308 static void
309 gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass)
310 {
311   GObjectClass *gobject_class;
312   GstElementClass *gstelement_class;
313
314   gobject_class = (GObjectClass *) klass;
315   gstelement_class = (GstElementClass *) klass;
316
317   g_type_class_add_private (klass, sizeof (GstRtpJitterBufferPrivate));
318
319   gobject_class->finalize = gst_rtp_jitter_buffer_finalize;
320
321   gobject_class->set_property = gst_rtp_jitter_buffer_set_property;
322   gobject_class->get_property = gst_rtp_jitter_buffer_get_property;
323
324   /**
325    * GstRtpJitterBuffer::latency:
326    *
327    * The maximum latency of the jitterbuffer. Packets will be kept in the buffer
328    * for at most this time.
329    */
330   g_object_class_install_property (gobject_class, PROP_LATENCY,
331       g_param_spec_uint ("latency", "Buffer latency in ms",
332           "Amount of ms to buffer", 0, G_MAXUINT, DEFAULT_LATENCY_MS,
333           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
334   /**
335    * GstRtpJitterBuffer::drop-on-latency:
336    *
337    * Drop oldest buffers when the queue is completely filled.
338    */
339   g_object_class_install_property (gobject_class, PROP_DROP_ON_LATENCY,
340       g_param_spec_boolean ("drop-on-latency",
341           "Drop buffers when maximum latency is reached",
342           "Tells the jitterbuffer to never exceed the given latency in size",
343           DEFAULT_DROP_ON_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
344   /**
345    * GstRtpJitterBuffer::ts-offset:
346    *
347    * Adjust GStreamer output buffer timestamps in the jitterbuffer with offset.
348    * This is mainly used to ensure interstream synchronisation.
349    */
350   g_object_class_install_property (gobject_class, PROP_TS_OFFSET,
351       g_param_spec_int64 ("ts-offset", "Timestamp Offset",
352           "Adjust buffer timestamps with offset in nanoseconds", G_MININT64,
353           G_MAXINT64, DEFAULT_TS_OFFSET,
354           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
355
356   /**
357    * GstRtpJitterBuffer::do-lost:
358    *
359    * Send out a GstRTPPacketLost event downstream when a packet is considered
360    * lost.
361    */
362   g_object_class_install_property (gobject_class, PROP_DO_LOST,
363       g_param_spec_boolean ("do-lost", "Do Lost",
364           "Send an event downstream when a packet is lost", DEFAULT_DO_LOST,
365           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
366
367   /**
368    * GstRtpJitterBuffer::mode:
369    *
370    * Control the buffering and timestamping mode used by the jitterbuffer.
371    */
372   g_object_class_install_property (gobject_class, PROP_MODE,
373       g_param_spec_enum ("mode", "Mode",
374           "Control the buffering algorithm in use", RTP_TYPE_JITTER_BUFFER_MODE,
375           DEFAULT_MODE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
376   /**
377    * GstRtpJitterBuffer::percent:
378    *
379    * The percent of the jitterbuffer that is filled.
380    *
381    * Since: 0.10.19
382    */
383   g_object_class_install_property (gobject_class, PROP_PERCENT,
384       g_param_spec_int ("percent", "percent",
385           "The buffer filled percent", 0, 100,
386           0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
387   /**
388    * GstRtpJitterBuffer::request-pt-map:
389    * @buffer: the object which received the signal
390    * @pt: the pt
391    *
392    * Request the payload type as #GstCaps for @pt.
393    */
394   gst_rtp_jitter_buffer_signals[SIGNAL_REQUEST_PT_MAP] =
395       g_signal_new ("request-pt-map", G_TYPE_FROM_CLASS (klass),
396       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
397           request_pt_map), NULL, NULL, gst_rtp_bin_marshal_BOXED__UINT,
398       GST_TYPE_CAPS, 1, G_TYPE_UINT);
399   /**
400    * GstRtpJitterBuffer::handle-sync:
401    * @buffer: the object which received the signal
402    * @struct: a GstStructure containing sync values.
403    *
404    * Be notified of new sync values.
405    */
406   gst_rtp_jitter_buffer_signals[SIGNAL_HANDLE_SYNC] =
407       g_signal_new ("handle-sync", G_TYPE_FROM_CLASS (klass),
408       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
409           handle_sync), NULL, NULL, g_cclosure_marshal_VOID__BOXED,
410       G_TYPE_NONE, 1, GST_TYPE_STRUCTURE | G_SIGNAL_TYPE_STATIC_SCOPE);
411
412   /**
413    * GstRtpJitterBuffer::on-npt-stop
414    * @buffer: the object which received the signal
415    *
416    * Signal that the jitterbufer has pushed the RTP packet that corresponds to
417    * the npt-stop position.
418    */
419   gst_rtp_jitter_buffer_signals[SIGNAL_ON_NPT_STOP] =
420       g_signal_new ("on-npt-stop", G_TYPE_FROM_CLASS (klass),
421       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
422           on_npt_stop), NULL, NULL, g_cclosure_marshal_VOID__VOID,
423       G_TYPE_NONE, 0, G_TYPE_NONE);
424
425   /**
426    * GstRtpJitterBuffer::clear-pt-map:
427    * @buffer: the object which received the signal
428    *
429    * Invalidate the clock-rate as obtained with the
430    * #GstRtpJitterBuffer::request-pt-map signal.
431    */
432   gst_rtp_jitter_buffer_signals[SIGNAL_CLEAR_PT_MAP] =
433       g_signal_new ("clear-pt-map", G_TYPE_FROM_CLASS (klass),
434       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
435       G_STRUCT_OFFSET (GstRtpJitterBufferClass, clear_pt_map), NULL, NULL,
436       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
437
438   /**
439    * GstRtpJitterBuffer::set-active:
440    * @buffer: the object which received the signal
441    *
442    * Start pushing out packets with the given base time. This signal is only
443    * useful in buffering mode.
444    *
445    * Returns: the time of the last pushed packet.
446    *
447    * Since: 0.10.19
448    */
449   gst_rtp_jitter_buffer_signals[SIGNAL_SET_ACTIVE] =
450       g_signal_new ("set-active", G_TYPE_FROM_CLASS (klass),
451       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
452       G_STRUCT_OFFSET (GstRtpJitterBufferClass, set_active), NULL, NULL,
453       gst_rtp_bin_marshal_UINT64__BOOL_UINT64, G_TYPE_UINT64, 2, G_TYPE_BOOLEAN,
454       G_TYPE_UINT64);
455
456   gstelement_class->change_state =
457       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_change_state);
458   gstelement_class->request_new_pad =
459       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_request_new_pad);
460   gstelement_class->release_pad =
461       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_release_pad);
462   gstelement_class->provide_clock =
463       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_provide_clock);
464
465   gst_element_class_add_pad_template (gstelement_class,
466       gst_static_pad_template_get (&gst_rtp_jitter_buffer_src_template));
467   gst_element_class_add_pad_template (gstelement_class,
468       gst_static_pad_template_get (&gst_rtp_jitter_buffer_sink_template));
469   gst_element_class_add_pad_template (gstelement_class,
470       gst_static_pad_template_get (&gst_rtp_jitter_buffer_sink_rtcp_template));
471
472   gst_element_class_set_static_metadata (gstelement_class,
473       "RTP packet jitter-buffer", "Filter/Network/RTP",
474       "A buffer that deals with network jitter and other transmission faults",
475       "Philippe Kalaf <philippe.kalaf@collabora.co.uk>, "
476       "Wim Taymans <wim.taymans@gmail.com>");
477
478   klass->clear_pt_map = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_clear_pt_map);
479   klass->set_active = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_set_active);
480
481   GST_DEBUG_CATEGORY_INIT
482       (rtpjitterbuffer_debug, "gstrtpjitterbuffer", 0, "RTP Jitter Buffer");
483 }
484
485 static void
486 gst_rtp_jitter_buffer_init (GstRtpJitterBuffer * jitterbuffer)
487 {
488   GstRtpJitterBufferPrivate *priv;
489
490   priv = GST_RTP_JITTER_BUFFER_GET_PRIVATE (jitterbuffer);
491   jitterbuffer->priv = priv;
492
493   priv->latency_ms = DEFAULT_LATENCY_MS;
494   priv->latency_ns = priv->latency_ms * GST_MSECOND;
495   priv->drop_on_latency = DEFAULT_DROP_ON_LATENCY;
496   priv->do_lost = DEFAULT_DO_LOST;
497   priv->timers = g_array_new (FALSE, TRUE, sizeof (TimerData));
498
499   priv->jbuf = rtp_jitter_buffer_new ();
500   g_mutex_init (&priv->jbuf_lock);
501   g_cond_init (&priv->jbuf_cond);
502
503   /* reset skew detection initialy */
504   rtp_jitter_buffer_reset_skew (priv->jbuf);
505   rtp_jitter_buffer_set_delay (priv->jbuf, priv->latency_ns);
506   rtp_jitter_buffer_set_buffering (priv->jbuf, FALSE);
507   priv->active = TRUE;
508
509   priv->srcpad =
510       gst_pad_new_from_static_template (&gst_rtp_jitter_buffer_src_template,
511       "src");
512
513   gst_pad_set_activatemode_function (priv->srcpad,
514       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_activate_mode));
515   gst_pad_set_query_function (priv->srcpad,
516       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_query));
517   gst_pad_set_event_function (priv->srcpad,
518       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_event));
519
520   priv->sinkpad =
521       gst_pad_new_from_static_template (&gst_rtp_jitter_buffer_sink_template,
522       "sink");
523
524   gst_pad_set_chain_function (priv->sinkpad,
525       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_chain));
526   gst_pad_set_event_function (priv->sinkpad,
527       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_sink_event));
528   gst_pad_set_query_function (priv->sinkpad,
529       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_sink_query));
530
531   gst_element_add_pad (GST_ELEMENT (jitterbuffer), priv->srcpad);
532   gst_element_add_pad (GST_ELEMENT (jitterbuffer), priv->sinkpad);
533
534   GST_OBJECT_FLAG_SET (jitterbuffer, GST_ELEMENT_FLAG_PROVIDE_CLOCK);
535 }
536
537 static void
538 gst_rtp_jitter_buffer_finalize (GObject * object)
539 {
540   GstRtpJitterBuffer *jitterbuffer;
541
542   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
543
544   g_array_free (jitterbuffer->priv->timers, TRUE);
545   g_mutex_clear (&jitterbuffer->priv->jbuf_lock);
546   g_cond_clear (&jitterbuffer->priv->jbuf_cond);
547
548   g_object_unref (jitterbuffer->priv->jbuf);
549
550   G_OBJECT_CLASS (parent_class)->finalize (object);
551 }
552
553 static GstIterator *
554 gst_rtp_jitter_buffer_iterate_internal_links (GstPad * pad, GstObject * parent)
555 {
556   GstRtpJitterBuffer *jitterbuffer;
557   GstPad *otherpad = NULL;
558   GstIterator *it;
559   GValue val = { 0, };
560
561   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
562
563   if (pad == jitterbuffer->priv->sinkpad) {
564     otherpad = jitterbuffer->priv->srcpad;
565   } else if (pad == jitterbuffer->priv->srcpad) {
566     otherpad = jitterbuffer->priv->sinkpad;
567   } else if (pad == jitterbuffer->priv->rtcpsinkpad) {
568     otherpad = NULL;
569   }
570
571   g_value_init (&val, GST_TYPE_PAD);
572   g_value_set_object (&val, otherpad);
573   it = gst_iterator_new_single (GST_TYPE_PAD, &val);
574   g_value_unset (&val);
575
576   return it;
577 }
578
579 static GstPad *
580 create_rtcp_sink (GstRtpJitterBuffer * jitterbuffer)
581 {
582   GstRtpJitterBufferPrivate *priv;
583
584   priv = jitterbuffer->priv;
585
586   GST_DEBUG_OBJECT (jitterbuffer, "creating RTCP sink pad");
587
588   priv->rtcpsinkpad =
589       gst_pad_new_from_static_template
590       (&gst_rtp_jitter_buffer_sink_rtcp_template, "sink_rtcp");
591   gst_pad_set_chain_function (priv->rtcpsinkpad,
592       gst_rtp_jitter_buffer_chain_rtcp);
593   gst_pad_set_event_function (priv->rtcpsinkpad,
594       (GstPadEventFunction) gst_rtp_jitter_buffer_sink_rtcp_event);
595   gst_pad_set_iterate_internal_links_function (priv->rtcpsinkpad,
596       gst_rtp_jitter_buffer_iterate_internal_links);
597   gst_pad_set_active (priv->rtcpsinkpad, TRUE);
598   gst_element_add_pad (GST_ELEMENT_CAST (jitterbuffer), priv->rtcpsinkpad);
599
600   return priv->rtcpsinkpad;
601 }
602
603 static void
604 remove_rtcp_sink (GstRtpJitterBuffer * jitterbuffer)
605 {
606   GstRtpJitterBufferPrivate *priv;
607
608   priv = jitterbuffer->priv;
609
610   GST_DEBUG_OBJECT (jitterbuffer, "removing RTCP sink pad");
611
612   gst_pad_set_active (priv->rtcpsinkpad, FALSE);
613
614   gst_element_remove_pad (GST_ELEMENT_CAST (jitterbuffer), priv->rtcpsinkpad);
615   priv->rtcpsinkpad = NULL;
616 }
617
618 static GstPad *
619 gst_rtp_jitter_buffer_request_new_pad (GstElement * element,
620     GstPadTemplate * templ, const gchar * name, const GstCaps * filter)
621 {
622   GstRtpJitterBuffer *jitterbuffer;
623   GstElementClass *klass;
624   GstPad *result;
625   GstRtpJitterBufferPrivate *priv;
626
627   g_return_val_if_fail (templ != NULL, NULL);
628   g_return_val_if_fail (GST_IS_RTP_JITTER_BUFFER (element), NULL);
629
630   jitterbuffer = GST_RTP_JITTER_BUFFER (element);
631   priv = jitterbuffer->priv;
632   klass = GST_ELEMENT_GET_CLASS (element);
633
634   GST_DEBUG_OBJECT (element, "requesting pad %s", GST_STR_NULL (name));
635
636   /* figure out the template */
637   if (templ == gst_element_class_get_pad_template (klass, "sink_rtcp")) {
638     if (priv->rtcpsinkpad != NULL)
639       goto exists;
640
641     result = create_rtcp_sink (jitterbuffer);
642   } else
643     goto wrong_template;
644
645   return result;
646
647   /* ERRORS */
648 wrong_template:
649   {
650     g_warning ("gstrtpjitterbuffer: this is not our template");
651     return NULL;
652   }
653 exists:
654   {
655     g_warning ("gstrtpjitterbuffer: pad already requested");
656     return NULL;
657   }
658 }
659
660 static void
661 gst_rtp_jitter_buffer_release_pad (GstElement * element, GstPad * pad)
662 {
663   GstRtpJitterBuffer *jitterbuffer;
664   GstRtpJitterBufferPrivate *priv;
665
666   g_return_if_fail (GST_IS_RTP_JITTER_BUFFER (element));
667   g_return_if_fail (GST_IS_PAD (pad));
668
669   jitterbuffer = GST_RTP_JITTER_BUFFER (element);
670   priv = jitterbuffer->priv;
671
672   GST_DEBUG_OBJECT (element, "releasing pad %s:%s", GST_DEBUG_PAD_NAME (pad));
673
674   if (priv->rtcpsinkpad == pad) {
675     remove_rtcp_sink (jitterbuffer);
676   } else
677     goto wrong_pad;
678
679   return;
680
681   /* ERRORS */
682 wrong_pad:
683   {
684     g_warning ("gstjitterbuffer: asked to release an unknown pad");
685     return;
686   }
687 }
688
689 static GstClock *
690 gst_rtp_jitter_buffer_provide_clock (GstElement * element)
691 {
692   return gst_system_clock_obtain ();
693 }
694
695 static void
696 gst_rtp_jitter_buffer_clear_pt_map (GstRtpJitterBuffer * jitterbuffer)
697 {
698   GstRtpJitterBufferPrivate *priv;
699
700   priv = jitterbuffer->priv;
701
702   /* this will trigger a new pt-map request signal, FIXME, do something better. */
703
704   JBUF_LOCK (priv);
705   priv->clock_rate = -1;
706   /* do not clear current content, but refresh state for new arrival */
707   GST_DEBUG_OBJECT (jitterbuffer, "reset jitterbuffer");
708   rtp_jitter_buffer_reset_skew (priv->jbuf);
709   priv->last_popped_seqnum = -1;
710   priv->next_seqnum = -1;
711   JBUF_UNLOCK (priv);
712 }
713
714 static GstClockTime
715 gst_rtp_jitter_buffer_set_active (GstRtpJitterBuffer * jbuf, gboolean active,
716     guint64 offset)
717 {
718   GstRtpJitterBufferPrivate *priv;
719   GstClockTime last_out;
720   GstBuffer *head;
721
722   priv = jbuf->priv;
723
724   JBUF_LOCK (priv);
725   GST_DEBUG_OBJECT (jbuf, "setting active %d with offset %" GST_TIME_FORMAT,
726       active, GST_TIME_ARGS (offset));
727
728   if (active != priv->active) {
729     /* add the amount of time spent in paused to the output offset. All
730      * outgoing buffers will have this offset applied to their timestamps in
731      * order to make them arrive in time in the sink. */
732     priv->out_offset = offset;
733     GST_DEBUG_OBJECT (jbuf, "out offset %" GST_TIME_FORMAT,
734         GST_TIME_ARGS (priv->out_offset));
735     priv->active = active;
736     JBUF_SIGNAL (priv);
737   }
738   if (!active) {
739     rtp_jitter_buffer_set_buffering (priv->jbuf, TRUE);
740   }
741   if ((head = rtp_jitter_buffer_peek (priv->jbuf))) {
742     /* head buffer timestamp and offset gives our output time */
743     last_out = GST_BUFFER_DTS (head) + priv->ts_offset;
744   } else {
745     /* use last known time when the buffer is empty */
746     last_out = priv->last_out_time;
747   }
748   JBUF_UNLOCK (priv);
749
750   return last_out;
751 }
752
753 static GstCaps *
754 gst_rtp_jitter_buffer_getcaps (GstPad * pad, GstCaps * filter)
755 {
756   GstRtpJitterBuffer *jitterbuffer;
757   GstRtpJitterBufferPrivate *priv;
758   GstPad *other;
759   GstCaps *caps;
760   GstCaps *templ;
761
762   jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
763   priv = jitterbuffer->priv;
764
765   other = (pad == priv->srcpad ? priv->sinkpad : priv->srcpad);
766
767   caps = gst_pad_peer_query_caps (other, filter);
768
769   templ = gst_pad_get_pad_template_caps (pad);
770   if (caps == NULL) {
771     GST_DEBUG_OBJECT (jitterbuffer, "use template");
772     caps = templ;
773   } else {
774     GstCaps *intersect;
775
776     GST_DEBUG_OBJECT (jitterbuffer, "intersect with template");
777
778     intersect = gst_caps_intersect (caps, templ);
779     gst_caps_unref (caps);
780     gst_caps_unref (templ);
781
782     caps = intersect;
783   }
784   gst_object_unref (jitterbuffer);
785
786   return caps;
787 }
788
789 /*
790  * Must be called with JBUF_LOCK held
791  */
792
793 static gboolean
794 gst_jitter_buffer_sink_parse_caps (GstRtpJitterBuffer * jitterbuffer,
795     GstCaps * caps)
796 {
797   GstRtpJitterBufferPrivate *priv;
798   GstStructure *caps_struct;
799   guint val;
800   GstClockTime tval;
801
802   priv = jitterbuffer->priv;
803
804   /* first parse the caps */
805   caps_struct = gst_caps_get_structure (caps, 0);
806
807   GST_DEBUG_OBJECT (jitterbuffer, "got caps");
808
809   /* we need a clock-rate to convert the rtp timestamps to GStreamer time and to
810    * measure the amount of data in the buffer */
811   if (!gst_structure_get_int (caps_struct, "clock-rate", &priv->clock_rate))
812     goto error;
813
814   if (priv->clock_rate <= 0)
815     goto wrong_rate;
816
817   GST_DEBUG_OBJECT (jitterbuffer, "got clock-rate %d", priv->clock_rate);
818
819   /* The clock base is the RTP timestamp corrsponding to the npt-start value. We
820    * can use this to track the amount of time elapsed on the sender. */
821   if (gst_structure_get_uint (caps_struct, "clock-base", &val))
822     priv->clock_base = val;
823   else
824     priv->clock_base = -1;
825
826   priv->ext_timestamp = priv->clock_base;
827
828   GST_DEBUG_OBJECT (jitterbuffer, "got clock-base %" G_GINT64_FORMAT,
829       priv->clock_base);
830
831   if (gst_structure_get_uint (caps_struct, "seqnum-base", &val)) {
832     /* first expected seqnum, only update when we didn't have a previous base. */
833     if (priv->next_in_seqnum == -1)
834       priv->next_in_seqnum = val;
835     if (priv->next_seqnum == -1)
836       priv->next_seqnum = val;
837   }
838
839   GST_DEBUG_OBJECT (jitterbuffer, "got seqnum-base %d", priv->next_in_seqnum);
840
841   /* the start and stop times. The seqnum-base corresponds to the start time. We
842    * will keep track of the seqnums on the output and when we reach the one
843    * corresponding to npt-stop, we emit the npt-stop-reached signal */
844   if (gst_structure_get_clock_time (caps_struct, "npt-start", &tval))
845     priv->npt_start = tval;
846   else
847     priv->npt_start = 0;
848
849   if (gst_structure_get_clock_time (caps_struct, "npt-stop", &tval))
850     priv->npt_stop = tval;
851   else
852     priv->npt_stop = -1;
853
854   GST_DEBUG_OBJECT (jitterbuffer,
855       "npt start/stop: %" GST_TIME_FORMAT "-%" GST_TIME_FORMAT,
856       GST_TIME_ARGS (priv->npt_start), GST_TIME_ARGS (priv->npt_stop));
857
858   return TRUE;
859
860   /* ERRORS */
861 error:
862   {
863     GST_DEBUG_OBJECT (jitterbuffer, "No clock-rate in caps!");
864     return FALSE;
865   }
866 wrong_rate:
867   {
868     GST_DEBUG_OBJECT (jitterbuffer, "Invalid clock-rate %d", priv->clock_rate);
869     return FALSE;
870   }
871 }
872
873 static void
874 gst_rtp_jitter_buffer_flush_start (GstRtpJitterBuffer * jitterbuffer)
875 {
876   GstRtpJitterBufferPrivate *priv;
877
878   priv = jitterbuffer->priv;
879
880   JBUF_LOCK (priv);
881   /* mark ourselves as flushing */
882   priv->srcresult = GST_FLOW_FLUSHING;
883   GST_DEBUG_OBJECT (jitterbuffer, "Disabling pop on queue");
884   /* this unblocks any waiting pops on the src pad task */
885   JBUF_SIGNAL (priv);
886   /* unlock clock, we just unschedule, the entry will be released by the
887    * locking streaming thread. */
888   unschedule_current_timer (jitterbuffer);
889   JBUF_UNLOCK (priv);
890 }
891
892 static void
893 gst_rtp_jitter_buffer_flush_stop (GstRtpJitterBuffer * jitterbuffer)
894 {
895   GstRtpJitterBufferPrivate *priv;
896
897   priv = jitterbuffer->priv;
898
899   JBUF_LOCK (priv);
900   GST_DEBUG_OBJECT (jitterbuffer, "Enabling pop on queue");
901   /* Mark as non flushing */
902   priv->srcresult = GST_FLOW_OK;
903   gst_segment_init (&priv->segment, GST_FORMAT_TIME);
904   priv->last_popped_seqnum = -1;
905   priv->last_out_time = -1;
906   priv->last_out_dts = -1;
907   priv->last_out_pts = -1;
908   priv->next_seqnum = -1;
909   priv->last_in_rtptime = -1;
910   priv->last_in_dts = GST_CLOCK_TIME_NONE;
911   priv->packet_spacing = 0;
912   priv->next_in_seqnum = -1;
913   priv->clock_rate = -1;
914   priv->eos = FALSE;
915   priv->estimated_eos = -1;
916   priv->last_elapsed = 0;
917   priv->ext_timestamp = -1;
918   GST_DEBUG_OBJECT (jitterbuffer, "flush and reset jitterbuffer");
919   rtp_jitter_buffer_flush (priv->jbuf);
920   rtp_jitter_buffer_reset_skew (priv->jbuf);
921   remove_all_timers (jitterbuffer);
922   JBUF_UNLOCK (priv);
923 }
924
925 static gboolean
926 gst_rtp_jitter_buffer_src_activate_mode (GstPad * pad, GstObject * parent,
927     GstPadMode mode, gboolean active)
928 {
929   gboolean result;
930   GstRtpJitterBuffer *jitterbuffer = NULL;
931
932   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
933
934   switch (mode) {
935     case GST_PAD_MODE_PUSH:
936       if (active) {
937         /* allow data processing */
938         gst_rtp_jitter_buffer_flush_stop (jitterbuffer);
939
940         /* start pushing out buffers */
941         GST_DEBUG_OBJECT (jitterbuffer, "Starting task on srcpad");
942         result = gst_pad_start_task (jitterbuffer->priv->srcpad,
943             (GstTaskFunction) gst_rtp_jitter_buffer_loop, jitterbuffer, NULL);
944       } else {
945         /* make sure all data processing stops ASAP */
946         gst_rtp_jitter_buffer_flush_start (jitterbuffer);
947
948         /* NOTE this will hardlock if the state change is called from the src pad
949          * task thread because we will _join() the thread. */
950         GST_DEBUG_OBJECT (jitterbuffer, "Stopping task on srcpad");
951         result = gst_pad_stop_task (pad);
952       }
953       break;
954     default:
955       result = FALSE;
956       break;
957   }
958   return result;
959 }
960
961 static GstStateChangeReturn
962 gst_rtp_jitter_buffer_change_state (GstElement * element,
963     GstStateChange transition)
964 {
965   GstRtpJitterBuffer *jitterbuffer;
966   GstRtpJitterBufferPrivate *priv;
967   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
968
969   jitterbuffer = GST_RTP_JITTER_BUFFER (element);
970   priv = jitterbuffer->priv;
971
972   switch (transition) {
973     case GST_STATE_CHANGE_NULL_TO_READY:
974       break;
975     case GST_STATE_CHANGE_READY_TO_PAUSED:
976       JBUF_LOCK (priv);
977       /* reset negotiated values */
978       priv->clock_rate = -1;
979       priv->clock_base = -1;
980       priv->peer_latency = 0;
981       priv->last_pt = -1;
982       /* block until we go to PLAYING */
983       priv->blocked = TRUE;
984       JBUF_UNLOCK (priv);
985       break;
986     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
987       JBUF_LOCK (priv);
988       /* unblock to allow streaming in PLAYING */
989       priv->blocked = FALSE;
990       JBUF_SIGNAL (priv);
991       JBUF_UNLOCK (priv);
992       break;
993     default:
994       break;
995   }
996
997   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
998
999   switch (transition) {
1000     case GST_STATE_CHANGE_READY_TO_PAUSED:
1001       /* we are a live element because we sync to the clock, which we can only
1002        * do in the PLAYING state */
1003       if (ret != GST_STATE_CHANGE_FAILURE)
1004         ret = GST_STATE_CHANGE_NO_PREROLL;
1005       break;
1006     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
1007       JBUF_LOCK (priv);
1008       /* block to stop streaming when PAUSED */
1009       priv->blocked = TRUE;
1010       JBUF_UNLOCK (priv);
1011       if (ret != GST_STATE_CHANGE_FAILURE)
1012         ret = GST_STATE_CHANGE_NO_PREROLL;
1013       break;
1014     case GST_STATE_CHANGE_PAUSED_TO_READY:
1015       gst_buffer_replace (&priv->last_sr, NULL);
1016       break;
1017     case GST_STATE_CHANGE_READY_TO_NULL:
1018       break;
1019     default:
1020       break;
1021   }
1022
1023   return ret;
1024 }
1025
1026 static gboolean
1027 gst_rtp_jitter_buffer_src_event (GstPad * pad, GstObject * parent,
1028     GstEvent * event)
1029 {
1030   gboolean ret = TRUE;
1031   GstRtpJitterBuffer *jitterbuffer;
1032   GstRtpJitterBufferPrivate *priv;
1033
1034   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1035   priv = jitterbuffer->priv;
1036
1037   GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
1038
1039   switch (GST_EVENT_TYPE (event)) {
1040     case GST_EVENT_LATENCY:
1041     {
1042       GstClockTime latency;
1043
1044       gst_event_parse_latency (event, &latency);
1045
1046       GST_DEBUG_OBJECT (jitterbuffer,
1047           "configuring latency of %" GST_TIME_FORMAT, GST_TIME_ARGS (latency));
1048
1049       JBUF_LOCK (priv);
1050       /* adjust the overall buffer delay to the total pipeline latency in
1051        * buffering mode because if downstream consumes too fast (because of
1052        * large latency or queues, we would start rebuffering again. */
1053       if (rtp_jitter_buffer_get_mode (priv->jbuf) ==
1054           RTP_JITTER_BUFFER_MODE_BUFFER) {
1055         rtp_jitter_buffer_set_delay (priv->jbuf, latency);
1056       }
1057       JBUF_UNLOCK (priv);
1058
1059       ret = gst_pad_push_event (priv->sinkpad, event);
1060       break;
1061     }
1062     default:
1063       ret = gst_pad_push_event (priv->sinkpad, event);
1064       break;
1065   }
1066
1067   return ret;
1068 }
1069
1070 static gboolean
1071 gst_rtp_jitter_buffer_sink_event (GstPad * pad, GstObject * parent,
1072     GstEvent * event)
1073 {
1074   gboolean ret = TRUE;
1075   GstRtpJitterBuffer *jitterbuffer;
1076   GstRtpJitterBufferPrivate *priv;
1077
1078   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1079   priv = jitterbuffer->priv;
1080
1081   GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
1082
1083   switch (GST_EVENT_TYPE (event)) {
1084     case GST_EVENT_CAPS:
1085     {
1086       GstCaps *caps;
1087
1088       gst_event_parse_caps (event, &caps);
1089
1090       JBUF_LOCK (priv);
1091       ret = gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps);
1092       JBUF_UNLOCK (priv);
1093
1094       /* set same caps on srcpad on success */
1095       if (ret)
1096         ret = gst_pad_push_event (priv->srcpad, event);
1097       else
1098         gst_event_unref (event);
1099       break;
1100     }
1101     case GST_EVENT_SEGMENT:
1102     {
1103       gst_event_copy_segment (event, &priv->segment);
1104
1105       /* we need time for now */
1106       if (priv->segment.format != GST_FORMAT_TIME)
1107         goto newseg_wrong_format;
1108
1109       GST_DEBUG_OBJECT (jitterbuffer,
1110           "newsegment:  %" GST_SEGMENT_FORMAT, &priv->segment);
1111
1112       /* FIXME, push SEGMENT in the queue. Sorting order might be difficult. */
1113       ret = gst_pad_push_event (priv->srcpad, event);
1114       break;
1115     }
1116     case GST_EVENT_FLUSH_START:
1117       gst_rtp_jitter_buffer_flush_start (jitterbuffer);
1118       ret = gst_pad_push_event (priv->srcpad, event);
1119       break;
1120     case GST_EVENT_FLUSH_STOP:
1121       ret = gst_pad_push_event (priv->srcpad, event);
1122       ret =
1123           gst_rtp_jitter_buffer_src_activate_mode (priv->srcpad, parent,
1124           GST_PAD_MODE_PUSH, TRUE);
1125       break;
1126     case GST_EVENT_EOS:
1127     {
1128       /* push EOS in queue. We always push it at the head */
1129       JBUF_LOCK (priv);
1130       /* check for flushing, we need to discard the event and return FALSE when
1131        * we are flushing */
1132       ret = priv->srcresult == GST_FLOW_OK;
1133       if (ret && !priv->eos) {
1134         GST_INFO_OBJECT (jitterbuffer, "queuing EOS");
1135         priv->eos = TRUE;
1136         JBUF_SIGNAL (priv);
1137       } else if (priv->eos) {
1138         GST_DEBUG_OBJECT (jitterbuffer, "dropping EOS, we are already EOS");
1139       } else {
1140         GST_DEBUG_OBJECT (jitterbuffer, "dropping EOS, reason %s",
1141             gst_flow_get_name (priv->srcresult));
1142       }
1143       JBUF_UNLOCK (priv);
1144       gst_event_unref (event);
1145       break;
1146     }
1147     default:
1148       ret = gst_pad_push_event (priv->srcpad, event);
1149       break;
1150   }
1151
1152 done:
1153
1154   return ret;
1155
1156   /* ERRORS */
1157 newseg_wrong_format:
1158   {
1159     GST_DEBUG_OBJECT (jitterbuffer, "received non TIME newsegment");
1160     ret = FALSE;
1161     gst_event_unref (event);
1162     goto done;
1163   }
1164 }
1165
1166 static gboolean
1167 gst_rtp_jitter_buffer_sink_rtcp_event (GstPad * pad, GstObject * parent,
1168     GstEvent * event)
1169 {
1170   gboolean ret = TRUE;
1171   GstRtpJitterBuffer *jitterbuffer;
1172
1173   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1174
1175   GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
1176
1177   switch (GST_EVENT_TYPE (event)) {
1178     case GST_EVENT_FLUSH_START:
1179       gst_event_unref (event);
1180       break;
1181     case GST_EVENT_FLUSH_STOP:
1182       gst_event_unref (event);
1183       break;
1184     default:
1185       ret = gst_pad_event_default (pad, parent, event);
1186       break;
1187   }
1188
1189   return ret;
1190 }
1191
1192 /*
1193  * Must be called with JBUF_LOCK held, will release the LOCK when emiting the
1194  * signal. The function returns GST_FLOW_ERROR when a parsing error happened and
1195  * GST_FLOW_FLUSHING when the element is shutting down. On success
1196  * GST_FLOW_OK is returned.
1197  */
1198 static GstFlowReturn
1199 gst_rtp_jitter_buffer_get_clock_rate (GstRtpJitterBuffer * jitterbuffer,
1200     guint8 pt)
1201 {
1202   GValue ret = { 0 };
1203   GValue args[2] = { {0}, {0} };
1204   GstCaps *caps;
1205   gboolean res;
1206
1207   g_value_init (&args[0], GST_TYPE_ELEMENT);
1208   g_value_set_object (&args[0], jitterbuffer);
1209   g_value_init (&args[1], G_TYPE_UINT);
1210   g_value_set_uint (&args[1], pt);
1211
1212   g_value_init (&ret, GST_TYPE_CAPS);
1213   g_value_set_boxed (&ret, NULL);
1214
1215   JBUF_UNLOCK (jitterbuffer->priv);
1216   g_signal_emitv (args, gst_rtp_jitter_buffer_signals[SIGNAL_REQUEST_PT_MAP], 0,
1217       &ret);
1218   JBUF_LOCK_CHECK (jitterbuffer->priv, out_flushing);
1219
1220   g_value_unset (&args[0]);
1221   g_value_unset (&args[1]);
1222   caps = (GstCaps *) g_value_dup_boxed (&ret);
1223   g_value_unset (&ret);
1224   if (!caps)
1225     goto no_caps;
1226
1227   res = gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps);
1228   gst_caps_unref (caps);
1229
1230   if (G_UNLIKELY (!res))
1231     goto parse_failed;
1232
1233   return GST_FLOW_OK;
1234
1235   /* ERRORS */
1236 no_caps:
1237   {
1238     GST_DEBUG_OBJECT (jitterbuffer, "could not get caps");
1239     return GST_FLOW_ERROR;
1240   }
1241 out_flushing:
1242   {
1243     GST_DEBUG_OBJECT (jitterbuffer, "we are flushing");
1244     return GST_FLOW_FLUSHING;
1245   }
1246 parse_failed:
1247   {
1248     GST_DEBUG_OBJECT (jitterbuffer, "parse failed");
1249     return GST_FLOW_ERROR;
1250   }
1251 }
1252
1253 /* call with jbuf lock held */
1254 static void
1255 check_buffering_percent (GstRtpJitterBuffer * jitterbuffer, gint * percent)
1256 {
1257   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1258
1259   /* too short a stream, or too close to EOS will never really fill buffer */
1260   if (*percent != -1 && priv->npt_stop != -1 &&
1261       priv->npt_stop - priv->npt_start <=
1262       rtp_jitter_buffer_get_delay (priv->jbuf)) {
1263     GST_DEBUG_OBJECT (jitterbuffer, "short stream; faking full buffer");
1264     rtp_jitter_buffer_set_buffering (priv->jbuf, FALSE);
1265     *percent = 100;
1266   }
1267 }
1268
1269 static void
1270 post_buffering_percent (GstRtpJitterBuffer * jitterbuffer, gint percent)
1271 {
1272   GstMessage *message;
1273
1274   /* Post a buffering message */
1275   message = gst_message_new_buffering (GST_OBJECT_CAST (jitterbuffer), percent);
1276   gst_message_set_buffering_stats (message, GST_BUFFERING_LIVE, -1, -1, -1);
1277
1278   gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer), message);
1279 }
1280
1281 static GstClockTime
1282 apply_offset (GstRtpJitterBuffer * jitterbuffer, GstClockTime timestamp)
1283 {
1284   GstRtpJitterBufferPrivate *priv;
1285
1286   priv = jitterbuffer->priv;
1287
1288   if (timestamp == -1)
1289     return -1;
1290
1291   /* apply the timestamp offset, this is used for inter stream sync */
1292   timestamp += priv->ts_offset;
1293   /* add the offset, this is used when buffering */
1294   timestamp += priv->out_offset;
1295
1296   return timestamp;
1297 }
1298
1299 static TimerData *
1300 find_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type, guint16 seqnum)
1301 {
1302   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1303   TimerData *timer = NULL;
1304   gint i, len;
1305
1306   len = priv->timers->len;
1307   for (i = 0; i < len; i++) {
1308     TimerData *test = &g_array_index (priv->timers, TimerData, i);
1309     if (test->seqnum == seqnum && test->type == type) {
1310       timer = test;
1311       break;
1312     }
1313   }
1314   return timer;
1315 }
1316
1317 static void
1318 unschedule_current_timer (GstRtpJitterBuffer * jitterbuffer)
1319 {
1320   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1321
1322   if (priv->clock_id) {
1323     GST_DEBUG_OBJECT (jitterbuffer, "unschedule current timer");
1324     gst_clock_id_unschedule (priv->clock_id);
1325     priv->unscheduled = TRUE;
1326   }
1327 }
1328
1329 static GstClockTime
1330 get_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
1331 {
1332   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1333   GstClockTime test_timeout;
1334
1335   if ((test_timeout = timer->timeout) == -1)
1336     return -1;
1337
1338   if (timer->type != TIMER_TYPE_EXPECTED) {
1339     /* add our latency and offset to get output times. */
1340     test_timeout = apply_offset (jitterbuffer, test_timeout);
1341     test_timeout += priv->latency_ns;
1342   }
1343   return test_timeout;
1344 }
1345
1346 static void
1347 recalculate_timer (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
1348 {
1349   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1350
1351   if (priv->clock_id) {
1352     GstClockTime timeout = get_timeout (jitterbuffer, timer);
1353
1354     if (timeout == -1 || timeout < priv->timer_timeout)
1355       unschedule_current_timer (jitterbuffer);
1356   }
1357 }
1358
1359 static TimerData *
1360 add_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type,
1361     guint16 seqnum, GstClockTime timeout)
1362 {
1363   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1364   TimerData *timer;
1365   gint len;
1366
1367   GST_DEBUG_OBJECT (jitterbuffer,
1368       "add timer for seqnum %d to %" GST_TIME_FORMAT,
1369       seqnum, GST_TIME_ARGS (timeout));
1370
1371   len = priv->timers->len;
1372   g_array_set_size (priv->timers, len + 1);
1373   timer = &g_array_index (priv->timers, TimerData, len);
1374   timer->idx = len;
1375   timer->type = type;
1376   timer->seqnum = seqnum;
1377   timer->timeout = timeout;
1378
1379   recalculate_timer (jitterbuffer, timer);
1380
1381   return timer;
1382 }
1383
1384 static void
1385 reschedule_timer (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
1386     guint16 seqnum, GstClockTime timeout)
1387 {
1388   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1389   gboolean seqchange, timechange;
1390   guint16 oldseq;
1391
1392   seqchange = timer->seqnum != seqnum;
1393   timechange = timer->timeout != timeout;
1394
1395   if (!seqchange && !timechange)
1396     return;
1397
1398   oldseq = timer->seqnum;
1399
1400   GST_DEBUG_OBJECT (jitterbuffer,
1401       "replace timer for seqnum %d->%d to %" GST_TIME_FORMAT,
1402       oldseq, seqnum, GST_TIME_ARGS (timeout));
1403
1404   timer->timeout = timeout;
1405   timer->seqnum = seqnum;
1406
1407   if (priv->clock_id) {
1408     /* we changed the seqnum and there is a timer currently waiting with this
1409      * seqnum, unschedule it */
1410     if (seqchange && priv->timer_seqnum == oldseq)
1411       unschedule_current_timer (jitterbuffer);
1412     /* we changed the time, check if it is earlier than what we are waiting
1413      * for and unschedule if so */
1414     else if (timechange)
1415       recalculate_timer (jitterbuffer, timer);
1416   }
1417 }
1418
1419 static TimerData *
1420 set_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type,
1421     guint16 seqnum, GstClockTime timeout)
1422 {
1423   TimerData *timer;
1424
1425   /* find the seqnum timer */
1426   timer = find_timer (jitterbuffer, type, seqnum);
1427   if (timer == NULL) {
1428     timer = add_timer (jitterbuffer, type, seqnum, timeout);
1429   } else {
1430     reschedule_timer (jitterbuffer, timer, seqnum, timeout);
1431   }
1432   return timer;
1433 }
1434
1435 static void
1436 remove_timer (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
1437 {
1438   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1439   guint idx;
1440
1441   if (priv->clock_id && priv->timer_seqnum == timer->seqnum)
1442     unschedule_current_timer (jitterbuffer);
1443
1444   idx = timer->idx;
1445   GST_DEBUG_OBJECT (jitterbuffer, "removed index %d", idx);
1446   g_array_remove_index_fast (priv->timers, idx);
1447   timer->idx = idx;
1448 }
1449
1450 static void
1451 remove_all_timers (GstRtpJitterBuffer * jitterbuffer)
1452 {
1453   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1454   GST_DEBUG_OBJECT (jitterbuffer, "removed all timers");
1455   g_array_set_size (priv->timers, 0);
1456   unschedule_current_timer (jitterbuffer);
1457 }
1458
1459 /* we just received a packet with seqnum and dts.
1460  *
1461  * First check for old seqnum that we are still expecting. If the gap with the
1462  * current timestamp is too big, unschedule the timeouts.
1463  *
1464  * If we have a valid packet spacing estimate we can set a timer for when we
1465  * should receive the next packet.
1466  * If we don't have a valid estimate, we remove any timer we might have
1467  * had for this packet.
1468  */
1469 static void
1470 update_timers (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum,
1471     GstClockTime dts)
1472 {
1473   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1474   TimerData *timer = NULL;
1475   gint i, len;
1476
1477   /* go through all timers and unschedule the ones with a large gap, also find
1478    * the timer for the seqnum */
1479   len = priv->timers->len;
1480   for (i = 0; i < len; i++) {
1481     TimerData *test = &g_array_index (priv->timers, TimerData, i);
1482     gint gap;
1483
1484     if (test->type != TIMER_TYPE_EXPECTED)
1485       continue;
1486
1487     gap = gst_rtp_buffer_compare_seqnum (test->seqnum, seqnum);
1488
1489     GST_DEBUG_OBJECT (jitterbuffer, "%d, #%d<->#%d gap %d", i,
1490         test->seqnum, seqnum, gap);
1491
1492     if (gap == 0) {
1493       /* the timer for the current seqnum */
1494       timer = test;
1495     } else if (gap > 5) {
1496       /* max gap, we exceeded the max reorder distance and we don't expect the
1497        * missing packet to be this reordered */
1498       reschedule_timer (jitterbuffer, test, test->seqnum, -1);
1499     }
1500   }
1501
1502   if (priv->packet_spacing > 0) {
1503     GstClockTime expected;
1504
1505     /* calculate expected arrival time of the next seqnum */
1506     expected = dts + priv->packet_spacing + 20 * GST_MSECOND;
1507     /* and update/install timer for next seqnum */
1508     if (timer)
1509       reschedule_timer (jitterbuffer, timer, priv->next_in_seqnum, expected);
1510     else
1511       timer = add_timer (jitterbuffer, TIMER_TYPE_EXPECTED,
1512           priv->next_in_seqnum, expected);
1513   } else if (timer) {
1514     /* if we had a timer, remove it, we don't know when to expect the next
1515      * packet. */
1516     remove_timer (jitterbuffer, timer);
1517   }
1518 }
1519
1520 static GstFlowReturn
1521 gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent,
1522     GstBuffer * buffer)
1523 {
1524   GstRtpJitterBuffer *jitterbuffer;
1525   GstRtpJitterBufferPrivate *priv;
1526   guint16 seqnum;
1527   guint32 rtptime;
1528   GstFlowReturn ret = GST_FLOW_OK;
1529   GstClockTime dts, pts;
1530   guint64 latency_ts;
1531   gboolean tail;
1532   gint percent = -1;
1533   guint8 pt;
1534   GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
1535
1536   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1537
1538   priv = jitterbuffer->priv;
1539
1540   if (G_UNLIKELY (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp)))
1541     goto invalid_buffer;
1542
1543   pt = gst_rtp_buffer_get_payload_type (&rtp);
1544   seqnum = gst_rtp_buffer_get_seq (&rtp);
1545   rtptime = gst_rtp_buffer_get_timestamp (&rtp);
1546   gst_rtp_buffer_unmap (&rtp);
1547
1548   /* make sure we have PTS and DTS set */
1549   pts = GST_BUFFER_PTS (buffer);
1550   dts = GST_BUFFER_DTS (buffer);
1551   if (dts == -1)
1552     dts = pts;
1553   else if (pts == -1)
1554     pts = dts;
1555
1556   /* take the DTS of the buffer. This is the time when the packet was
1557    * received and is used to calculate jitter and clock skew. We will adjust
1558    * this PTS with the smoothed value after processing it in the
1559    * jitterbuffer and assign it as the PTS. */
1560   /* bring to running time */
1561   dts = gst_segment_to_running_time (&priv->segment, GST_FORMAT_TIME, dts);
1562
1563   GST_DEBUG_OBJECT (jitterbuffer,
1564       "Received packet #%d at time %" GST_TIME_FORMAT, seqnum,
1565       GST_TIME_ARGS (dts));
1566
1567   JBUF_LOCK_CHECK (priv, out_flushing);
1568
1569   if (G_UNLIKELY (priv->last_pt != pt)) {
1570     GstCaps *caps;
1571
1572     GST_DEBUG_OBJECT (jitterbuffer, "pt changed from %u to %u", priv->last_pt,
1573         pt);
1574
1575     priv->last_pt = pt;
1576     /* reset clock-rate so that we get a new one */
1577     priv->clock_rate = -1;
1578
1579     /* Try to get the clock-rate from the caps first if we can. If there are no
1580      * caps we must fire the signal to get the clock-rate. */
1581     if ((caps = gst_pad_get_current_caps (pad))) {
1582       gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps);
1583       gst_caps_unref (caps);
1584     }
1585   }
1586
1587   if (G_UNLIKELY (priv->clock_rate == -1)) {
1588     /* no clock rate given on the caps, try to get one with the signal */
1589     if (gst_rtp_jitter_buffer_get_clock_rate (jitterbuffer,
1590             pt) == GST_FLOW_FLUSHING)
1591       goto out_flushing;
1592
1593     if (G_UNLIKELY (priv->clock_rate == -1))
1594       goto no_clock_rate;
1595   }
1596
1597   /* don't accept more data on EOS */
1598   if (G_UNLIKELY (priv->eos))
1599     goto have_eos;
1600
1601   /* now check against our expected seqnum */
1602   if (G_LIKELY (priv->next_in_seqnum != -1)) {
1603     gint gap;
1604
1605     gap = gst_rtp_buffer_compare_seqnum (priv->next_in_seqnum, seqnum);
1606     if (G_UNLIKELY (gap != 0)) {
1607       gboolean reset = FALSE;
1608
1609       GST_DEBUG_OBJECT (jitterbuffer, "expected #%d, got #%d, gap of %d",
1610           priv->next_in_seqnum, seqnum, gap);
1611       /* priv->next_in_seqnum >= seqnum, this packet is too late or the
1612        * sender might have been restarted with different seqnum. */
1613       if (gap < -RTP_MAX_MISORDER) {
1614         GST_DEBUG_OBJECT (jitterbuffer, "reset: buffer too old %d", gap);
1615         reset = TRUE;
1616       }
1617       /* priv->next_in_seqnum < seqnum, this is a new packet */
1618       else if (G_UNLIKELY (gap > RTP_MAX_DROPOUT)) {
1619         GST_DEBUG_OBJECT (jitterbuffer, "reset: too many dropped packets %d",
1620             gap);
1621         reset = TRUE;
1622       } else {
1623         GST_DEBUG_OBJECT (jitterbuffer, "tolerable gap");
1624       }
1625       if (G_UNLIKELY (reset)) {
1626         GST_DEBUG_OBJECT (jitterbuffer, "flush and reset jitterbuffer");
1627         rtp_jitter_buffer_flush (priv->jbuf);
1628         rtp_jitter_buffer_reset_skew (priv->jbuf);
1629         remove_all_timers (jitterbuffer);
1630         priv->last_popped_seqnum = -1;
1631         priv->next_seqnum = seqnum;
1632       }
1633       /* reset spacing estimation when gap */
1634       priv->last_in_rtptime = -1;
1635       priv->last_in_dts = -1;
1636     } else {
1637       /* packet is expected, we need consecutive seqnums with a different
1638        * rtptime to estimate the packet spacing. */
1639       if (priv->last_in_rtptime != rtptime) {
1640         /* rtptime changed, check dts diff */
1641         if (priv->last_in_dts != -1 && dts != -1 && dts > priv->last_in_dts) {
1642           priv->packet_spacing = dts - priv->last_in_dts;
1643           GST_DEBUG_OBJECT (jitterbuffer,
1644               "new packet spacing %" GST_TIME_FORMAT,
1645               GST_TIME_ARGS (priv->packet_spacing));
1646         }
1647         priv->last_in_rtptime = rtptime;
1648         priv->last_in_dts = dts;
1649       }
1650     }
1651   }
1652   priv->next_in_seqnum = (seqnum + 1) & 0xffff;
1653
1654   /* let's check if this buffer is too late, we can only accept packets with
1655    * bigger seqnum than the one we last pushed. */
1656   if (G_LIKELY (priv->last_popped_seqnum != -1)) {
1657     gint gap;
1658
1659     gap = gst_rtp_buffer_compare_seqnum (priv->last_popped_seqnum, seqnum);
1660
1661     /* priv->last_popped_seqnum >= seqnum, we're too late. */
1662     if (G_UNLIKELY (gap <= 0))
1663       goto too_late;
1664   }
1665
1666   /* let's drop oldest packet if the queue is already full and drop-on-latency
1667    * is set. We can only do this when there actually is a latency. When no
1668    * latency is set, we just pump it in the queue and let the other end push it
1669    * out as fast as possible. */
1670   if (priv->latency_ms && priv->drop_on_latency) {
1671     latency_ts =
1672         gst_util_uint64_scale_int (priv->latency_ms, priv->clock_rate, 1000);
1673
1674     if (G_UNLIKELY (rtp_jitter_buffer_get_ts_diff (priv->jbuf) >= latency_ts)) {
1675       GstBuffer *old_buf;
1676
1677       old_buf = rtp_jitter_buffer_pop (priv->jbuf, &percent);
1678
1679       GST_DEBUG_OBJECT (jitterbuffer, "Queue full, dropping old packet %p",
1680           old_buf);
1681
1682       gst_buffer_unref (old_buf);
1683     }
1684   }
1685
1686   /* we need to make the metadata writable before pushing it in the jitterbuffer
1687    * because the jitterbuffer will update the PTS */
1688   buffer = gst_buffer_make_writable (buffer);
1689   GST_BUFFER_DTS (buffer) = dts;
1690   GST_BUFFER_PTS (buffer) = pts;
1691
1692   /* now insert the packet into the queue in sorted order. This function returns
1693    * FALSE if a packet with the same seqnum was already in the queue, meaning we
1694    * have a duplicate. */
1695   if (G_UNLIKELY (!rtp_jitter_buffer_insert (priv->jbuf, buffer, dts,
1696               priv->clock_rate, &tail, &percent)))
1697     goto duplicate;
1698
1699   /* update timers */
1700   update_timers (jitterbuffer, seqnum, dts);
1701
1702   /* we had an unhandled SR, handle it now */
1703   if (priv->last_sr)
1704     do_handle_sync (jitterbuffer);
1705
1706   /* signal addition of new buffer when the _loop is waiting. */
1707   if (priv->waiting && priv->active)
1708     JBUF_SIGNAL (priv);
1709
1710   /* let's unschedule and unblock any waiting buffers. We only want to do this
1711    * when the tail buffer changed */
1712   if (G_UNLIKELY (priv->clock_id && tail)) {
1713     GST_DEBUG_OBJECT (jitterbuffer, "Unscheduling waiting new buffer");
1714     unschedule_current_timer (jitterbuffer);
1715   }
1716
1717   GST_DEBUG_OBJECT (jitterbuffer, "Pushed packet #%d, now %d packets, tail: %d",
1718       seqnum, rtp_jitter_buffer_num_packets (priv->jbuf), tail);
1719
1720   check_buffering_percent (jitterbuffer, &percent);
1721
1722 finished:
1723   JBUF_UNLOCK (priv);
1724
1725   if (percent != -1)
1726     post_buffering_percent (jitterbuffer, percent);
1727
1728   return ret;
1729
1730   /* ERRORS */
1731 invalid_buffer:
1732   {
1733     /* this is not fatal but should be filtered earlier */
1734     GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
1735         ("Received invalid RTP payload, dropping"));
1736     gst_buffer_unref (buffer);
1737     return GST_FLOW_OK;
1738   }
1739 no_clock_rate:
1740   {
1741     GST_WARNING_OBJECT (jitterbuffer,
1742         "No clock-rate in caps!, dropping buffer");
1743     gst_buffer_unref (buffer);
1744     goto finished;
1745   }
1746 out_flushing:
1747   {
1748     ret = priv->srcresult;
1749     GST_DEBUG_OBJECT (jitterbuffer, "flushing %s", gst_flow_get_name (ret));
1750     gst_buffer_unref (buffer);
1751     goto finished;
1752   }
1753 have_eos:
1754   {
1755     ret = GST_FLOW_EOS;
1756     GST_WARNING_OBJECT (jitterbuffer, "we are EOS, refusing buffer");
1757     gst_buffer_unref (buffer);
1758     goto finished;
1759   }
1760 too_late:
1761   {
1762     GST_WARNING_OBJECT (jitterbuffer, "Packet #%d too late as #%d was already"
1763         " popped, dropping", seqnum, priv->last_popped_seqnum);
1764     priv->num_late++;
1765     gst_buffer_unref (buffer);
1766     goto finished;
1767   }
1768 duplicate:
1769   {
1770     GST_WARNING_OBJECT (jitterbuffer, "Duplicate packet #%d detected, dropping",
1771         seqnum);
1772     priv->num_duplicates++;
1773     gst_buffer_unref (buffer);
1774     goto finished;
1775   }
1776 }
1777
1778 static GstClockTime
1779 compute_elapsed (GstRtpJitterBuffer * jitterbuffer, GstBuffer * outbuf)
1780 {
1781   guint64 ext_time, elapsed;
1782   guint32 rtp_time;
1783   GstRtpJitterBufferPrivate *priv;
1784   GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
1785
1786   priv = jitterbuffer->priv;
1787   gst_rtp_buffer_map (outbuf, GST_MAP_READ, &rtp);
1788   rtp_time = gst_rtp_buffer_get_timestamp (&rtp);
1789   gst_rtp_buffer_unmap (&rtp);
1790
1791   GST_LOG_OBJECT (jitterbuffer, "rtp %" G_GUINT32_FORMAT ", ext %"
1792       G_GUINT64_FORMAT, rtp_time, priv->ext_timestamp);
1793
1794   if (rtp_time < priv->ext_timestamp) {
1795     ext_time = priv->ext_timestamp;
1796   } else {
1797     ext_time = gst_rtp_buffer_ext_timestamp (&priv->ext_timestamp, rtp_time);
1798   }
1799
1800   if (ext_time > priv->clock_base)
1801     elapsed = ext_time - priv->clock_base;
1802   else
1803     elapsed = 0;
1804
1805   elapsed = gst_util_uint64_scale_int (elapsed, GST_SECOND, priv->clock_rate);
1806   return elapsed;
1807 }
1808
1809 static void
1810 update_estimated_eos (GstRtpJitterBuffer * jitterbuffer, GstBuffer * outbuf)
1811 {
1812   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1813
1814   if (priv->npt_stop != -1 && priv->ext_timestamp != -1
1815       && priv->clock_base != -1 && priv->clock_rate > 0) {
1816     guint64 elapsed, estimated;
1817
1818     elapsed = compute_elapsed (jitterbuffer, outbuf);
1819
1820     if (elapsed > priv->last_elapsed || !priv->last_elapsed) {
1821       guint64 left;
1822       GstClockTime out_time;
1823
1824       priv->last_elapsed = elapsed;
1825
1826       left = priv->npt_stop - priv->npt_start;
1827       GST_LOG_OBJECT (jitterbuffer, "left %" GST_TIME_FORMAT,
1828           GST_TIME_ARGS (left));
1829
1830       out_time = GST_BUFFER_DTS (outbuf);
1831
1832       if (elapsed > 0)
1833         estimated = gst_util_uint64_scale (out_time, left, elapsed);
1834       else {
1835         /* if there is almost nothing left,
1836          * we may never advance enough to end up in the above case */
1837         if (left < GST_SECOND)
1838           estimated = GST_SECOND;
1839         else
1840           estimated = -1;
1841       }
1842
1843       GST_LOG_OBJECT (jitterbuffer, "elapsed %" GST_TIME_FORMAT ", estimated %"
1844           GST_TIME_FORMAT, GST_TIME_ARGS (elapsed), GST_TIME_ARGS (estimated));
1845
1846       if (estimated != -1 && priv->estimated_eos != estimated) {
1847         set_timer (jitterbuffer, TIMER_TYPE_EOS, -1, estimated);
1848         priv->estimated_eos = estimated;
1849       }
1850     }
1851   }
1852 }
1853
1854 /* take a buffer from the queue and push it */
1855 static GstFlowReturn
1856 pop_and_push_next (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum)
1857 {
1858   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1859   GstFlowReturn result;
1860   GstBuffer *outbuf;
1861   GstClockTime dts, pts;
1862   gint percent = -1;
1863
1864   /* when we get here we are ready to pop and push the buffer */
1865   outbuf = rtp_jitter_buffer_pop (priv->jbuf, &percent);
1866
1867   check_buffering_percent (jitterbuffer, &percent);
1868
1869   if (G_UNLIKELY (priv->discont)) {
1870     /* set DISCONT flag when we missed a packet. We pushed the buffer writable
1871      * into the jitterbuffer so we can modify now. */
1872     GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_DISCONT);
1873     priv->discont = FALSE;
1874   }
1875   if (G_UNLIKELY (priv->ts_discont)) {
1876     GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_RESYNC);
1877     priv->ts_discont = FALSE;
1878   }
1879
1880   dts = GST_BUFFER_DTS (outbuf);
1881   pts = GST_BUFFER_PTS (outbuf);
1882
1883   /* apply timestamp with offset to buffer now */
1884   GST_BUFFER_DTS (outbuf) = apply_offset (jitterbuffer, dts);
1885   GST_BUFFER_PTS (outbuf) = apply_offset (jitterbuffer, pts);
1886
1887   /* update the elapsed time when we need to check against the npt stop time. */
1888   update_estimated_eos (jitterbuffer, outbuf);
1889
1890   /* now we are ready to push the buffer. Save the seqnum and release the lock
1891    * so the other end can push stuff in the queue again. */
1892   priv->last_popped_seqnum = seqnum;
1893   priv->last_out_time = GST_BUFFER_PTS (outbuf);
1894   priv->last_out_dts = dts;
1895   priv->last_out_pts = pts;
1896   priv->next_seqnum = (seqnum + 1) & 0xffff;
1897   JBUF_UNLOCK (priv);
1898
1899   if (percent != -1)
1900     post_buffering_percent (jitterbuffer, percent);
1901
1902   /* push buffer */
1903   GST_DEBUG_OBJECT (jitterbuffer,
1904       "Pushing buffer %d, dts %" GST_TIME_FORMAT ", pts %" GST_TIME_FORMAT,
1905       seqnum, GST_TIME_ARGS (GST_BUFFER_DTS (outbuf)),
1906       GST_TIME_ARGS (GST_BUFFER_PTS (outbuf)));
1907
1908   result = gst_pad_push (priv->srcpad, outbuf);
1909
1910   JBUF_LOCK_CHECK (priv, out_flushing);
1911
1912   return result;
1913
1914   /* ERRORS */
1915 out_flushing:
1916   {
1917     return priv->srcresult;
1918   }
1919 }
1920
1921 static GstClockTime
1922 estimate_dts (GstRtpJitterBuffer * jitterbuffer, GstClockTime dts, gint gap)
1923 {
1924   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1925   GstClockTime duration;
1926
1927   if (dts == -1 || priv->last_out_dts == -1)
1928     return dts;
1929
1930   GST_DEBUG_OBJECT (jitterbuffer,
1931       "dts %" GST_TIME_FORMAT ", last %" GST_TIME_FORMAT,
1932       GST_TIME_ARGS (dts), GST_TIME_ARGS (priv->last_out_dts));
1933
1934   /* interpolate between the current time and the last time based on
1935    * number of packets we are missing, this is the estimated duration
1936    * for the missing packet based on equidistant packet spacing. Also make
1937    * sure we never go negative. */
1938   if (dts >= priv->last_out_dts)
1939     duration = (dts - priv->last_out_dts) / (gap + 1);
1940   else
1941     /* packet already lost, timer will timeout quickly */
1942     duration = 0;
1943
1944   GST_DEBUG_OBJECT (jitterbuffer, "duration %" GST_TIME_FORMAT,
1945       GST_TIME_ARGS (duration));
1946
1947   /* add this duration to the timestamp of the last packet we pushed */
1948   dts = (priv->last_out_dts + duration);
1949
1950   return dts;
1951 }
1952
1953 #define GST_FLOW_WAIT GST_FLOW_CUSTOM_SUCCESS
1954
1955 /* Peek a buffer and compare the seqnum to the expected seqnum.
1956  * If all is fine, the buffer is pushed.
1957  * If something is wrong, a timeout is set. We set 2 kinds of timeouts:
1958  *   * deadline: to the ultimate time we can still push the packet. We
1959  *     do this for the first packet to make sure we have the previous
1960  *     packets.
1961  *   * lost: the ultimate time we can receive a packet before we have
1962  *     to consider it lost. We estimate this based on the packet
1963  *     spacing.
1964  */
1965 static GstFlowReturn
1966 handle_next_buffer (GstRtpJitterBuffer * jitterbuffer)
1967 {
1968   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1969   GstFlowReturn result = GST_FLOW_OK;
1970   GstBuffer *outbuf;
1971   guint16 seqnum;
1972   GstClockTime dts;
1973   guint32 next_seqnum;
1974   gint gap;
1975   GstRTPBuffer rtp = { NULL, };
1976
1977   /* only push buffers when PLAYING and active and not buffering */
1978   if (priv->blocked || !priv->active ||
1979       rtp_jitter_buffer_is_buffering (priv->jbuf))
1980     return GST_FLOW_WAIT;
1981
1982 again:
1983   /* peek a buffer, we're just looking at the sequence number.
1984    * If all is fine, we'll pop and push it. If the sequence number is wrong we
1985    * wait on the DTS. In the chain function we will unlock the wait when a
1986    * new buffer is available. The peeked buffer is valid for as long as we hold
1987    * the jitterbuffer lock. */
1988   outbuf = rtp_jitter_buffer_peek (priv->jbuf);
1989   if (outbuf == NULL)
1990     goto wait;
1991
1992   /* get the seqnum and the next expected seqnum */
1993   gst_rtp_buffer_map (outbuf, GST_MAP_READ, &rtp);
1994   seqnum = gst_rtp_buffer_get_seq (&rtp);
1995   gst_rtp_buffer_unmap (&rtp);
1996
1997   next_seqnum = priv->next_seqnum;
1998
1999   dts = GST_BUFFER_DTS (outbuf);
2000
2001   /* get the gap between this and the previous packet. If we don't know the
2002    * previous packet seqnum assume no gap. */
2003   if (G_UNLIKELY (next_seqnum == -1)) {
2004     GST_DEBUG_OBJECT (jitterbuffer, "First buffer #%d", seqnum);
2005     /* we don't know what the next_seqnum should be, wait for the last
2006      * possible moment to push this buffer, maybe we get an earlier seqnum
2007      * while we wait */
2008     set_timer (jitterbuffer, TIMER_TYPE_DEADLINE, seqnum, dts);
2009     result = GST_FLOW_WAIT;
2010   } else {
2011     /* else calculate GAP */
2012     gap = gst_rtp_buffer_compare_seqnum (next_seqnum, seqnum);
2013
2014     if (G_LIKELY (gap == 0)) {
2015       /* no missing packet, pop and push */
2016       result = pop_and_push_next (jitterbuffer, seqnum);
2017     } else if (G_UNLIKELY (gap < 0)) {
2018       /* if we have a packet that we already pushed or considered dropped, pop it
2019        * off and get the next packet */
2020       GST_DEBUG_OBJECT (jitterbuffer, "Old packet #%d, next #%d dropping",
2021           seqnum, next_seqnum);
2022       outbuf = rtp_jitter_buffer_pop (priv->jbuf, NULL);
2023       gst_buffer_unref (outbuf);
2024       goto again;
2025     } else {
2026       GST_DEBUG_OBJECT (jitterbuffer,
2027           "Sequence number GAP detected: expected %d instead of %d (%d missing)",
2028           next_seqnum, seqnum, gap);
2029       /* packet missing, estimate when we should ultimately push this packet */
2030       dts = estimate_dts (jitterbuffer, dts, gap);
2031       /* and set a timer for it */
2032       set_timer (jitterbuffer, TIMER_TYPE_LOST, next_seqnum, dts);
2033       result = GST_FLOW_WAIT;
2034     }
2035   }
2036   return result;
2037
2038 wait:
2039   {
2040     GST_DEBUG_OBJECT (jitterbuffer, "no buffer, going to wait");
2041     return GST_FLOW_WAIT;
2042   }
2043 }
2044
2045 /* a packet is lost */
2046 static GstFlowReturn
2047 do_lost_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
2048     GstClockTimeDiff clock_jitter)
2049 {
2050   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2051   GstClockTime duration = GST_CLOCK_TIME_NONE;
2052   guint32 lost_packets = 1;
2053   gboolean lost_packets_late = FALSE;
2054
2055 #if 0
2056   if (clock_jitter > 0
2057       && clock_jitter > (priv->latency_ns + priv->peer_latency)) {
2058     GstClockTimeDiff total_duration;
2059     GstClockTime out_time_diff;
2060
2061     out_time_diff =
2062         apply_offset (jitterbuffer, timer->timeout) - timer->timeout;
2063     total_duration = MIN (out_time_diff, clock_jitter);
2064
2065     if (duration > 0)
2066       lost_packets = total_duration / duration;
2067     else
2068       lost_packets = gap;
2069     total_duration = lost_packets * duration;
2070
2071     GST_DEBUG_OBJECT (jitterbuffer,
2072         "Current sync_time has expired a long time ago (+%" GST_TIME_FORMAT
2073         ") Cover up %d lost packets with duration %" GST_TIME_FORMAT,
2074         GST_TIME_ARGS (clock_jitter),
2075         lost_packets, GST_TIME_ARGS (total_duration));
2076
2077     duration = total_duration;
2078     lost_packets_late = TRUE;
2079   }
2080 #endif
2081
2082   /* we had a gap and thus we lost some packets. Create an event for this.  */
2083   if (lost_packets > 1)
2084     GST_DEBUG_OBJECT (jitterbuffer, "Packets #%d -> #%d lost", timer->seqnum,
2085         timer->seqnum + lost_packets - 1);
2086   else
2087     GST_DEBUG_OBJECT (jitterbuffer, "Packet #%d lost", timer->seqnum);
2088
2089   priv->num_late += lost_packets;
2090   priv->discont = TRUE;
2091
2092   /* update our expected next packet */
2093   priv->last_popped_seqnum = timer->seqnum;
2094   priv->last_out_time = apply_offset (jitterbuffer, timer->timeout);
2095   priv->last_out_dts = timer->timeout;
2096   priv->last_out_pts = timer->timeout;
2097   priv->next_seqnum = (timer->seqnum + lost_packets) & 0xffff;
2098   /* remove timer now */
2099   remove_timer (jitterbuffer, timer);
2100
2101   if (priv->do_lost) {
2102     GstEvent *event;
2103
2104     /* create paket lost event */
2105     event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM,
2106         gst_structure_new ("GstRTPPacketLost",
2107             "seqnum", G_TYPE_UINT, (guint) priv->last_popped_seqnum,
2108             "timestamp", G_TYPE_UINT64, priv->last_out_time,
2109             "duration", G_TYPE_UINT64, duration,
2110             "late", G_TYPE_BOOLEAN, lost_packets_late, NULL));
2111     JBUF_UNLOCK (priv);
2112     gst_pad_push_event (priv->srcpad, event);
2113     JBUF_LOCK_CHECK (priv, flushing);
2114   }
2115   return GST_FLOW_OK;
2116
2117   /* ERRORS */
2118 flushing:
2119   {
2120     GST_DEBUG_OBJECT (jitterbuffer, "we are flushing");
2121     return priv->srcresult;
2122   }
2123 }
2124
2125 static GstFlowReturn
2126 do_eos_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
2127 {
2128   GST_INFO_OBJECT (jitterbuffer, "got the NPT timeout");
2129   remove_timer (jitterbuffer, timer);
2130
2131   return GST_FLOW_EOS;
2132 }
2133
2134 /* called when we need to wait for the next timeout.
2135  *
2136  * We loop over the array of recorded timeouts and wait for the earliest one.
2137  * When it timed out, do the logic associated with the timer.
2138  *
2139  * If there are no timers, we wait on a gcond until something new happens.
2140  */
2141 static GstFlowReturn
2142 wait_next_timeout (GstRtpJitterBuffer * jitterbuffer)
2143 {
2144   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2145   GstFlowReturn result = GST_FLOW_OK;
2146   gint i, len;
2147   TimerData *timer = NULL;
2148   GstClockTime timer_timeout = -1;
2149   gint timer_idx;
2150
2151   len = priv->timers->len;
2152   for (i = 0; i < len; i++) {
2153     TimerData *test = &g_array_index (priv->timers, TimerData, i);
2154     GstClockTime test_timeout = get_timeout (jitterbuffer, test);
2155
2156     GST_DEBUG_OBJECT (jitterbuffer, "%d, %d, %" GST_TIME_FORMAT,
2157         i, test->seqnum, GST_TIME_ARGS (test_timeout));
2158
2159     /* find the smallest timeout */
2160     if (timer == NULL || test_timeout == -1 || test_timeout < timer_timeout) {
2161       timer = test;
2162       timer_timeout = test_timeout;
2163       if (timer_timeout == -1)
2164         break;
2165     }
2166   }
2167   if (timer) {
2168     GstClock *clock;
2169     GstClockTime sync_time;
2170     GstClockID id;
2171     GstClockReturn ret;
2172     GstClockTimeDiff clock_jitter;
2173
2174     /* no timestamp, timeout immeditately */
2175     if (timer_timeout == -1)
2176       goto do_timeout;
2177
2178     GST_OBJECT_LOCK (jitterbuffer);
2179     clock = GST_ELEMENT_CLOCK (jitterbuffer);
2180     if (!clock) {
2181       GST_OBJECT_UNLOCK (jitterbuffer);
2182       /* let's just push if there is no clock */
2183       GST_DEBUG_OBJECT (jitterbuffer, "No clock, timeout right away");
2184       goto do_timeout;
2185     }
2186
2187     /* prepare for sync against clock */
2188     sync_time = timer_timeout + GST_ELEMENT_CAST (jitterbuffer)->base_time;
2189     /* add latency of peer to get input time */
2190     sync_time += priv->peer_latency;
2191
2192     GST_DEBUG_OBJECT (jitterbuffer, "sync to timestamp %" GST_TIME_FORMAT
2193         " with sync time %" GST_TIME_FORMAT,
2194         GST_TIME_ARGS (timer_timeout), GST_TIME_ARGS (sync_time));
2195
2196     /* create an entry for the clock */
2197     id = priv->clock_id = gst_clock_new_single_shot_id (clock, sync_time);
2198     priv->unscheduled = FALSE;
2199     priv->timer_timeout = timer_timeout;
2200     priv->timer_seqnum = timer->seqnum;
2201     timer_idx = timer->idx;
2202     GST_OBJECT_UNLOCK (jitterbuffer);
2203
2204     /* release the lock so that the other end can push stuff or unlock */
2205     JBUF_UNLOCK (priv);
2206
2207     ret = gst_clock_id_wait (id, &clock_jitter);
2208
2209     JBUF_LOCK (priv);
2210     GST_DEBUG_OBJECT (jitterbuffer, "sync done, %d, #%d, %" G_GINT64_FORMAT,
2211         ret, priv->timer_seqnum, clock_jitter);
2212     /* and free the entry */
2213     gst_clock_id_unref (id);
2214     priv->clock_id = NULL;
2215
2216     /* at this point, the clock could have been unlocked by a timeout, a new
2217      * tail element was added to the queue or because we are shutting down. Check
2218      * for shutdown first. */
2219     if G_UNLIKELY
2220       ((priv->srcresult != GST_FLOW_OK))
2221           goto flushing;
2222
2223     /* we released the lock, the array might have changed */
2224     timer = &g_array_index (priv->timers, TimerData, timer_idx);
2225     /* if changed to timeout immediately, do so */
2226     if (timer->timeout == -1)
2227       goto do_timeout;
2228
2229     /* if we got unscheduled and we are not flushing, it's because a new tail
2230      * element became available in the queue or we flushed the queue.
2231      * Grab it and try to push or sync. */
2232     if (ret == GST_CLOCK_UNSCHEDULED || priv->unscheduled) {
2233       GST_DEBUG_OBJECT (jitterbuffer, "Wait got unscheduled");
2234       goto done;
2235     }
2236
2237   do_timeout:
2238     switch (timer->type) {
2239       case TIMER_TYPE_EXPECTED:
2240         GST_DEBUG_OBJECT (jitterbuffer, "expected %d didn't arrive",
2241             timer->seqnum);
2242         remove_timer (jitterbuffer, timer);
2243         break;
2244       case TIMER_TYPE_LOST:
2245         result = do_lost_timeout (jitterbuffer, timer, clock_jitter);
2246         break;
2247       case TIMER_TYPE_DEADLINE:
2248         priv->next_seqnum = timer->seqnum;
2249         remove_timer (jitterbuffer, timer);
2250         break;
2251       case TIMER_TYPE_EOS:
2252         result = do_eos_timeout (jitterbuffer, timer);
2253         break;
2254     }
2255   } else {
2256     /* no timers, wait for activity */
2257     GST_DEBUG_OBJECT (jitterbuffer, "waiting");
2258     priv->waiting = TRUE;
2259     JBUF_WAIT_CHECK (priv, flushing);
2260     priv->waiting = FALSE;
2261     GST_DEBUG_OBJECT (jitterbuffer, "waiting done");
2262   }
2263
2264 done:
2265   return result;
2266
2267 flushing:
2268   {
2269     GST_DEBUG_OBJECT (jitterbuffer, "we are flushing");
2270     return priv->srcresult;
2271   }
2272 }
2273
2274 /*
2275  * This funcion implements the main pushing loop on the source pad.
2276  *
2277  * It first tries to push as many buffers as possible. If there is a seqnum
2278  * mismatch, a timeout is created and this function goes waiting for the
2279  * next timeout.
2280  */
2281 static void
2282 gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer)
2283 {
2284   GstRtpJitterBufferPrivate *priv;
2285   GstFlowReturn result;
2286
2287   priv = jitterbuffer->priv;
2288
2289   JBUF_LOCK_CHECK (priv, flushing);
2290   do {
2291     result = handle_next_buffer (jitterbuffer);
2292     if (G_LIKELY (result == GST_FLOW_WAIT))
2293       /* now wait for the next event */
2294       result = wait_next_timeout (jitterbuffer);
2295   }
2296   while (result == GST_FLOW_OK);
2297   JBUF_UNLOCK (priv);
2298
2299   /* if we get here we need to pause */
2300   goto pause;
2301
2302   /* ERRORS */
2303 flushing:
2304   {
2305     result = priv->srcresult;
2306     JBUF_UNLOCK (priv);
2307     goto pause;
2308   }
2309 pause:
2310   {
2311     const gchar *reason = gst_flow_get_name (result);
2312     GstEvent *event;
2313
2314     GST_DEBUG_OBJECT (jitterbuffer, "pausing task, reason %s", reason);
2315     gst_pad_pause_task (priv->srcpad);
2316     if (result == GST_FLOW_EOS) {
2317       event = gst_event_new_eos ();
2318       gst_pad_push_event (priv->srcpad, event);
2319     }
2320     return;
2321   }
2322 }
2323
2324 /* collect the info from the lastest RTCP packet and the jitterbuffer sync, do
2325  * some sanity checks and then emit the handle-sync signal with the parameters.
2326  * This function must be called with the LOCK */
2327 static void
2328 do_handle_sync (GstRtpJitterBuffer * jitterbuffer)
2329 {
2330   GstRtpJitterBufferPrivate *priv;
2331   guint64 base_rtptime, base_time;
2332   guint32 clock_rate;
2333   guint64 last_rtptime;
2334   guint64 clock_base;
2335   guint64 ext_rtptime, diff;
2336   gboolean drop = FALSE;
2337
2338   priv = jitterbuffer->priv;
2339
2340   /* get the last values from the jitterbuffer */
2341   rtp_jitter_buffer_get_sync (priv->jbuf, &base_rtptime, &base_time,
2342       &clock_rate, &last_rtptime);
2343
2344   clock_base = priv->clock_base;
2345   ext_rtptime = priv->ext_rtptime;
2346
2347   GST_DEBUG_OBJECT (jitterbuffer, "ext SR %" G_GUINT64_FORMAT ", base %"
2348       G_GUINT64_FORMAT ", clock-rate %" G_GUINT32_FORMAT
2349       ", clock-base %" G_GUINT64_FORMAT ", last-rtptime %" G_GUINT64_FORMAT,
2350       ext_rtptime, base_rtptime, clock_rate, clock_base, last_rtptime);
2351
2352   if (base_rtptime == -1 || clock_rate == -1 || base_time == -1) {
2353     GST_DEBUG_OBJECT (jitterbuffer, "dropping, no RTP values");
2354     drop = TRUE;
2355   } else {
2356     /* we can't accept anything that happened before we did the last resync */
2357     if (base_rtptime > ext_rtptime) {
2358       GST_DEBUG_OBJECT (jitterbuffer, "dropping, older than base time");
2359       drop = TRUE;
2360     } else {
2361       /* the SR RTP timestamp must be something close to what we last observed
2362        * in the jitterbuffer */
2363       if (ext_rtptime > last_rtptime) {
2364         /* check how far ahead it is to our RTP timestamps */
2365         diff = ext_rtptime - last_rtptime;
2366         /* if bigger than 1 second, we drop it */
2367         if (diff > clock_rate) {
2368           GST_DEBUG_OBJECT (jitterbuffer, "too far ahead");
2369           /* should drop this, but some RTSP servers end up with bogus
2370            * way too ahead RTCP packet when repeated PAUSE/PLAY,
2371            * so still trigger rptbin sync but invalidate RTCP data
2372            * (sync might use other methods) */
2373           ext_rtptime = -1;
2374         }
2375         GST_DEBUG_OBJECT (jitterbuffer, "ext last %" G_GUINT64_FORMAT ", diff %"
2376             G_GUINT64_FORMAT, last_rtptime, diff);
2377       }
2378     }
2379   }
2380
2381   if (!drop) {
2382     GstStructure *s;
2383
2384     s = gst_structure_new ("application/x-rtp-sync",
2385         "base-rtptime", G_TYPE_UINT64, base_rtptime,
2386         "base-time", G_TYPE_UINT64, base_time,
2387         "clock-rate", G_TYPE_UINT, clock_rate,
2388         "clock-base", G_TYPE_UINT64, clock_base,
2389         "sr-ext-rtptime", G_TYPE_UINT64, ext_rtptime,
2390         "sr-buffer", GST_TYPE_BUFFER, priv->last_sr, NULL);
2391
2392     GST_DEBUG_OBJECT (jitterbuffer, "signaling sync");
2393     gst_buffer_replace (&priv->last_sr, NULL);
2394     JBUF_UNLOCK (priv);
2395     g_signal_emit (jitterbuffer,
2396         gst_rtp_jitter_buffer_signals[SIGNAL_HANDLE_SYNC], 0, s);
2397     JBUF_LOCK (priv);
2398     gst_structure_free (s);
2399   } else {
2400     GST_DEBUG_OBJECT (jitterbuffer, "dropping RTCP packet");
2401   }
2402 }
2403
2404 static GstFlowReturn
2405 gst_rtp_jitter_buffer_chain_rtcp (GstPad * pad, GstObject * parent,
2406     GstBuffer * buffer)
2407 {
2408   GstRtpJitterBuffer *jitterbuffer;
2409   GstRtpJitterBufferPrivate *priv;
2410   GstFlowReturn ret = GST_FLOW_OK;
2411   guint32 ssrc;
2412   GstRTCPPacket packet;
2413   guint64 ext_rtptime;
2414   guint32 rtptime;
2415   GstRTCPBuffer rtcp = { NULL, };
2416
2417   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
2418
2419   if (G_UNLIKELY (!gst_rtcp_buffer_validate (buffer)))
2420     goto invalid_buffer;
2421
2422   priv = jitterbuffer->priv;
2423
2424   gst_rtcp_buffer_map (buffer, GST_MAP_READ, &rtcp);
2425
2426   if (!gst_rtcp_buffer_get_first_packet (&rtcp, &packet))
2427     goto empty_buffer;
2428
2429   /* first packet must be SR or RR or else the validate would have failed */
2430   switch (gst_rtcp_packet_get_type (&packet)) {
2431     case GST_RTCP_TYPE_SR:
2432       gst_rtcp_packet_sr_get_sender_info (&packet, &ssrc, NULL, &rtptime,
2433           NULL, NULL);
2434       break;
2435     default:
2436       goto ignore_buffer;
2437   }
2438   gst_rtcp_buffer_unmap (&rtcp);
2439
2440   GST_DEBUG_OBJECT (jitterbuffer, "received RTCP of SSRC %08x", ssrc);
2441
2442   JBUF_LOCK (priv);
2443   /* convert the RTP timestamp to our extended timestamp, using the same offset
2444    * we used in the jitterbuffer */
2445   ext_rtptime = priv->jbuf->ext_rtptime;
2446   ext_rtptime = gst_rtp_buffer_ext_timestamp (&ext_rtptime, rtptime);
2447
2448   priv->ext_rtptime = ext_rtptime;
2449   gst_buffer_replace (&priv->last_sr, buffer);
2450
2451   do_handle_sync (jitterbuffer);
2452   JBUF_UNLOCK (priv);
2453
2454 done:
2455   gst_buffer_unref (buffer);
2456
2457   return ret;
2458
2459 invalid_buffer:
2460   {
2461     /* this is not fatal but should be filtered earlier */
2462     GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
2463         ("Received invalid RTCP payload, dropping"));
2464     ret = GST_FLOW_OK;
2465     goto done;
2466   }
2467 empty_buffer:
2468   {
2469     /* this is not fatal but should be filtered earlier */
2470     GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
2471         ("Received empty RTCP payload, dropping"));
2472     gst_rtcp_buffer_unmap (&rtcp);
2473     ret = GST_FLOW_OK;
2474     goto done;
2475   }
2476 ignore_buffer:
2477   {
2478     GST_DEBUG_OBJECT (jitterbuffer, "ignoring RTCP packet");
2479     gst_rtcp_buffer_unmap (&rtcp);
2480     ret = GST_FLOW_OK;
2481     goto done;
2482   }
2483 }
2484
2485 static gboolean
2486 gst_rtp_jitter_buffer_sink_query (GstPad * pad, GstObject * parent,
2487     GstQuery * query)
2488 {
2489   gboolean res = FALSE;
2490
2491   switch (GST_QUERY_TYPE (query)) {
2492     case GST_QUERY_CAPS:
2493     {
2494       GstCaps *filter, *caps;
2495
2496       gst_query_parse_caps (query, &filter);
2497       caps = gst_rtp_jitter_buffer_getcaps (pad, filter);
2498       gst_query_set_caps_result (query, caps);
2499       gst_caps_unref (caps);
2500       res = TRUE;
2501       break;
2502     }
2503     default:
2504       if (GST_QUERY_IS_SERIALIZED (query)) {
2505         GST_WARNING_OBJECT (pad, "unhandled serialized query");
2506         res = FALSE;
2507       } else {
2508         res = gst_pad_query_default (pad, parent, query);
2509       }
2510       break;
2511   }
2512   return res;
2513 }
2514
2515 static gboolean
2516 gst_rtp_jitter_buffer_src_query (GstPad * pad, GstObject * parent,
2517     GstQuery * query)
2518 {
2519   GstRtpJitterBuffer *jitterbuffer;
2520   GstRtpJitterBufferPrivate *priv;
2521   gboolean res = FALSE;
2522
2523   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
2524   priv = jitterbuffer->priv;
2525
2526   switch (GST_QUERY_TYPE (query)) {
2527     case GST_QUERY_LATENCY:
2528     {
2529       /* We need to send the query upstream and add the returned latency to our
2530        * own */
2531       GstClockTime min_latency, max_latency;
2532       gboolean us_live;
2533       GstClockTime our_latency;
2534
2535       if ((res = gst_pad_peer_query (priv->sinkpad, query))) {
2536         gst_query_parse_latency (query, &us_live, &min_latency, &max_latency);
2537
2538         GST_DEBUG_OBJECT (jitterbuffer, "Peer latency: min %"
2539             GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
2540             GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
2541
2542         /* store this so that we can safely sync on the peer buffers. */
2543         JBUF_LOCK (priv);
2544         priv->peer_latency = min_latency;
2545         our_latency = priv->latency_ns;
2546         JBUF_UNLOCK (priv);
2547
2548         GST_DEBUG_OBJECT (jitterbuffer, "Our latency: %" GST_TIME_FORMAT,
2549             GST_TIME_ARGS (our_latency));
2550
2551         /* we add some latency but can buffer an infinite amount of time */
2552         min_latency += our_latency;
2553         max_latency = -1;
2554
2555         GST_DEBUG_OBJECT (jitterbuffer, "Calculated total latency : min %"
2556             GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
2557             GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
2558
2559         gst_query_set_latency (query, TRUE, min_latency, max_latency);
2560       }
2561       break;
2562     }
2563     case GST_QUERY_POSITION:
2564     {
2565       GstClockTime start, last_out;
2566       GstFormat fmt;
2567
2568       gst_query_parse_position (query, &fmt, NULL);
2569       if (fmt != GST_FORMAT_TIME) {
2570         res = gst_pad_query_default (pad, parent, query);
2571         break;
2572       }
2573
2574       JBUF_LOCK (priv);
2575       start = priv->npt_start;
2576       last_out = priv->last_out_time;
2577       JBUF_UNLOCK (priv);
2578
2579       GST_DEBUG_OBJECT (jitterbuffer, "npt start %" GST_TIME_FORMAT
2580           ", last out %" GST_TIME_FORMAT, GST_TIME_ARGS (start),
2581           GST_TIME_ARGS (last_out));
2582
2583       if (GST_CLOCK_TIME_IS_VALID (start) && GST_CLOCK_TIME_IS_VALID (last_out)) {
2584         /* bring 0-based outgoing time to stream time */
2585         gst_query_set_position (query, GST_FORMAT_TIME, start + last_out);
2586         res = TRUE;
2587       } else {
2588         res = gst_pad_query_default (pad, parent, query);
2589       }
2590       break;
2591     }
2592     case GST_QUERY_CAPS:
2593     {
2594       GstCaps *filter, *caps;
2595
2596       gst_query_parse_caps (query, &filter);
2597       caps = gst_rtp_jitter_buffer_getcaps (pad, filter);
2598       gst_query_set_caps_result (query, caps);
2599       gst_caps_unref (caps);
2600       res = TRUE;
2601       break;
2602     }
2603     default:
2604       res = gst_pad_query_default (pad, parent, query);
2605       break;
2606   }
2607
2608   return res;
2609 }
2610
2611 static void
2612 gst_rtp_jitter_buffer_set_property (GObject * object,
2613     guint prop_id, const GValue * value, GParamSpec * pspec)
2614 {
2615   GstRtpJitterBuffer *jitterbuffer;
2616   GstRtpJitterBufferPrivate *priv;
2617
2618   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
2619   priv = jitterbuffer->priv;
2620
2621   switch (prop_id) {
2622     case PROP_LATENCY:
2623     {
2624       guint new_latency, old_latency;
2625
2626       new_latency = g_value_get_uint (value);
2627
2628       JBUF_LOCK (priv);
2629       old_latency = priv->latency_ms;
2630       priv->latency_ms = new_latency;
2631       priv->latency_ns = priv->latency_ms * GST_MSECOND;
2632       rtp_jitter_buffer_set_delay (priv->jbuf, priv->latency_ns);
2633       JBUF_UNLOCK (priv);
2634
2635       /* post message if latency changed, this will inform the parent pipeline
2636        * that a latency reconfiguration is possible/needed. */
2637       if (new_latency != old_latency) {
2638         GST_DEBUG_OBJECT (jitterbuffer, "latency changed to: %" GST_TIME_FORMAT,
2639             GST_TIME_ARGS (new_latency * GST_MSECOND));
2640
2641         gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer),
2642             gst_message_new_latency (GST_OBJECT_CAST (jitterbuffer)));
2643       }
2644       break;
2645     }
2646     case PROP_DROP_ON_LATENCY:
2647       JBUF_LOCK (priv);
2648       priv->drop_on_latency = g_value_get_boolean (value);
2649       JBUF_UNLOCK (priv);
2650       break;
2651     case PROP_TS_OFFSET:
2652       JBUF_LOCK (priv);
2653       priv->ts_offset = g_value_get_int64 (value);
2654       priv->ts_discont = TRUE;
2655       JBUF_UNLOCK (priv);
2656       break;
2657     case PROP_DO_LOST:
2658       JBUF_LOCK (priv);
2659       priv->do_lost = g_value_get_boolean (value);
2660       JBUF_UNLOCK (priv);
2661       break;
2662     case PROP_MODE:
2663       JBUF_LOCK (priv);
2664       rtp_jitter_buffer_set_mode (priv->jbuf, g_value_get_enum (value));
2665       JBUF_UNLOCK (priv);
2666       break;
2667     default:
2668       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2669       break;
2670   }
2671 }
2672
2673 static void
2674 gst_rtp_jitter_buffer_get_property (GObject * object,
2675     guint prop_id, GValue * value, GParamSpec * pspec)
2676 {
2677   GstRtpJitterBuffer *jitterbuffer;
2678   GstRtpJitterBufferPrivate *priv;
2679
2680   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
2681   priv = jitterbuffer->priv;
2682
2683   switch (prop_id) {
2684     case PROP_LATENCY:
2685       JBUF_LOCK (priv);
2686       g_value_set_uint (value, priv->latency_ms);
2687       JBUF_UNLOCK (priv);
2688       break;
2689     case PROP_DROP_ON_LATENCY:
2690       JBUF_LOCK (priv);
2691       g_value_set_boolean (value, priv->drop_on_latency);
2692       JBUF_UNLOCK (priv);
2693       break;
2694     case PROP_TS_OFFSET:
2695       JBUF_LOCK (priv);
2696       g_value_set_int64 (value, priv->ts_offset);
2697       JBUF_UNLOCK (priv);
2698       break;
2699     case PROP_DO_LOST:
2700       JBUF_LOCK (priv);
2701       g_value_set_boolean (value, priv->do_lost);
2702       JBUF_UNLOCK (priv);
2703       break;
2704     case PROP_MODE:
2705       JBUF_LOCK (priv);
2706       g_value_set_enum (value, rtp_jitter_buffer_get_mode (priv->jbuf));
2707       JBUF_UNLOCK (priv);
2708       break;
2709     case PROP_PERCENT:
2710     {
2711       gint percent;
2712
2713       JBUF_LOCK (priv);
2714       if (priv->srcresult != GST_FLOW_OK)
2715         percent = 100;
2716       else
2717         percent = rtp_jitter_buffer_get_percent (priv->jbuf);
2718
2719       g_value_set_int (value, percent);
2720       JBUF_UNLOCK (priv);
2721       break;
2722     }
2723     default:
2724       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2725       break;
2726   }
2727 }