jitterbuffer: update timers when removing
[platform/upstream/gst-plugins-good.git] / gst / rtpmanager / gstrtpjitterbuffer.c
1 /*
2  * Farsight Voice+Video library
3  *
4  *  Copyright 2007 Collabora Ltd,
5  *  Copyright 2007 Nokia Corporation
6  *   @author: Philippe Kalaf <philippe.kalaf@collabora.co.uk>.
7  *  Copyright 2007 Wim Taymans <wim.taymans@gmail.com>
8  *
9  * This library is free software; you can redistribute it and/or
10  * modify it under the terms of the GNU Library General Public
11  * License as published by the Free Software Foundation; either
12  * version 2 of the License, or (at your option) any later version.
13  *
14  * This library is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17  * Library General Public License for more details.
18  *
19  * You should have received a copy of the GNU Library General Public
20  * License along with this library; if not, write to the
21  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
22  * Boston, MA 02110-1301, USA.
23  *
24  */
25
26 /**
27  * SECTION:element-gstrtpjitterbuffer
28  *
29  * This element reorders and removes duplicate RTP packets as they are received
30  * from a network source. It will also wait for missing packets up to a
31  * configurable time limit using the #GstRtpJitterBuffer:latency property.
32  * Packets arriving too late are considered to be lost packets.
33  *
34  * This element acts as a live element and so adds #GstRtpJitterBuffer:latency
35  * to the pipeline.
36  *
37  * The element needs the clock-rate of the RTP payload in order to estimate the
38  * delay. This information is obtained either from the caps on the sink pad or,
39  * when no caps are present, from the #GstRtpJitterBuffer::request-pt-map signal.
40  * To clear the previous pt-map use the #GstRtpJitterBuffer::clear-pt-map signal.
41  *
42  * This element will automatically be used inside gstrtpbin.
43  *
44  * <refsect2>
45  * <title>Example pipelines</title>
46  * |[
47  * gst-launch-1.0 rtspsrc location=rtsp://192.168.1.133:8554/mpeg1or2AudioVideoTest ! gstrtpjitterbuffer ! rtpmpvdepay ! mpeg2dec ! xvimagesink
48  * ]| Connect to a streaming server and decode the MPEG video. The jitterbuffer is
49  * inserted into the pipeline to smooth out network jitter and to reorder the
50  * out-of-order RTP packets.
51  * </refsect2>
52  *
53  * Last reviewed on 2007-05-28 (0.10.5)
54  */
55
56 #ifdef HAVE_CONFIG_H
57 #include "config.h"
58 #endif
59
60 #include <stdlib.h>
61 #include <string.h>
62 #include <gst/rtp/gstrtpbuffer.h>
63
64 #include "gstrtpbin-marshal.h"
65
66 #include "gstrtpjitterbuffer.h"
67 #include "rtpjitterbuffer.h"
68 #include "rtpstats.h"
69
70 #include <gst/glib-compat-private.h>
71
72 GST_DEBUG_CATEGORY (rtpjitterbuffer_debug);
73 #define GST_CAT_DEFAULT (rtpjitterbuffer_debug)
74
75 /* RTPJitterBuffer signals and args */
76 enum
77 {
78   SIGNAL_REQUEST_PT_MAP,
79   SIGNAL_CLEAR_PT_MAP,
80   SIGNAL_HANDLE_SYNC,
81   SIGNAL_ON_NPT_STOP,
82   SIGNAL_SET_ACTIVE,
83   LAST_SIGNAL
84 };
85
86 #define DEFAULT_LATENCY_MS      200
87 #define DEFAULT_DROP_ON_LATENCY FALSE
88 #define DEFAULT_TS_OFFSET       0
89 #define DEFAULT_DO_LOST         FALSE
90 #define DEFAULT_MODE            RTP_JITTER_BUFFER_MODE_SLAVE
91 #define DEFAULT_PERCENT         0
92
93 enum
94 {
95   PROP_0,
96   PROP_LATENCY,
97   PROP_DROP_ON_LATENCY,
98   PROP_TS_OFFSET,
99   PROP_DO_LOST,
100   PROP_MODE,
101   PROP_PERCENT,
102   PROP_LAST
103 };
104
105 #define JBUF_LOCK(priv)   (g_mutex_lock (&(priv)->jbuf_lock))
106
107 #define JBUF_LOCK_CHECK(priv,label) G_STMT_START {    \
108   JBUF_LOCK (priv);                                   \
109   if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK))    \
110     goto label;                                       \
111 } G_STMT_END
112
113 #define JBUF_UNLOCK(priv) (g_mutex_unlock (&(priv)->jbuf_lock))
114 #define JBUF_WAIT(priv)   (g_cond_wait (&(priv)->jbuf_cond, &(priv)->jbuf_lock))
115
116 #define JBUF_WAIT_CHECK(priv,label) G_STMT_START {    \
117   JBUF_WAIT(priv);                                    \
118   if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK))    \
119     goto label;                                       \
120 } G_STMT_END
121
122 #define JBUF_SIGNAL(priv) (g_cond_signal (&(priv)->jbuf_cond))
123
124 struct _GstRtpJitterBufferPrivate
125 {
126   GstPad *sinkpad, *srcpad;
127   GstPad *rtcpsinkpad;
128
129   RTPJitterBuffer *jbuf;
130   GMutex jbuf_lock;
131   GCond jbuf_cond;
132   gboolean waiting;
133   gboolean discont;
134   gboolean ts_discont;
135   gboolean active;
136   guint64 out_offset;
137
138   /* properties */
139   guint latency_ms;
140   guint64 latency_ns;
141   gboolean drop_on_latency;
142   gint64 ts_offset;
143   gboolean do_lost;
144
145   /* the last seqnum we pushed out */
146   guint32 last_popped_seqnum;
147   /* the next expected seqnum we push */
148   guint32 next_seqnum;
149   /* last output time */
150   GstClockTime last_out_time;
151   GstClockTime last_out_dts;
152   GstClockTime last_out_pts;
153   /* last valid input timestamp and rtptime pair */
154   GstClockTime last_in_dts;
155   GstClockTime last_in_rtptime;
156   GstClockTime packet_spacing;
157   /* the next expected seqnum we receive */
158   guint32 next_in_seqnum;
159
160   GArray *timers;
161
162   /* start and stop ranges */
163   GstClockTime npt_start;
164   GstClockTime npt_stop;
165   guint64 ext_timestamp;
166   guint64 last_elapsed;
167   guint64 estimated_eos;
168   GstClockID eos_id;
169
170   /* state */
171   gboolean eos;
172
173   /* clock rate and rtp timestamp offset */
174   gint last_pt;
175   gint32 clock_rate;
176   gint64 clock_base;
177   gint64 prev_ts_offset;
178
179   /* when we are shutting down */
180   GstFlowReturn srcresult;
181   gboolean blocked;
182
183   /* for sync */
184   GstSegment segment;
185   GstClockID clock_id;
186   GstClockTime timer_timeout;
187   guint16 timer_seqnum;
188   gboolean unscheduled;
189   /* the latency of the upstream peer, we have to take this into account when
190    * synchronizing the buffers. */
191   GstClockTime peer_latency;
192   guint64 ext_rtptime;
193   GstBuffer *last_sr;
194
195   /* some accounting */
196   guint64 num_late;
197   guint64 num_duplicates;
198 };
199
200 typedef enum
201 {
202   TIMER_TYPE_EXPECTED,
203   TIMER_TYPE_LOST,
204   TIMER_TYPE_DEADLINE,
205   TIMER_TYPE_EOS
206 } TimerType;
207
208 typedef struct
209 {
210   guint idx;
211   guint16 seqnum;
212   TimerType type;
213   GstClockTime timeout;
214 } TimerData;
215
216 #define GST_RTP_JITTER_BUFFER_GET_PRIVATE(o) \
217   (G_TYPE_INSTANCE_GET_PRIVATE ((o), GST_TYPE_RTP_JITTER_BUFFER, \
218                                 GstRtpJitterBufferPrivate))
219
220 static GstStaticPadTemplate gst_rtp_jitter_buffer_sink_template =
221 GST_STATIC_PAD_TEMPLATE ("sink",
222     GST_PAD_SINK,
223     GST_PAD_ALWAYS,
224     GST_STATIC_CAPS ("application/x-rtp, "
225         "clock-rate = (int) [ 1, 2147483647 ]"
226         /* "payload = (int) , "
227          * "encoding-name = (string) "
228          */ )
229     );
230
231 static GstStaticPadTemplate gst_rtp_jitter_buffer_sink_rtcp_template =
232 GST_STATIC_PAD_TEMPLATE ("sink_rtcp",
233     GST_PAD_SINK,
234     GST_PAD_REQUEST,
235     GST_STATIC_CAPS ("application/x-rtcp")
236     );
237
238 static GstStaticPadTemplate gst_rtp_jitter_buffer_src_template =
239 GST_STATIC_PAD_TEMPLATE ("src",
240     GST_PAD_SRC,
241     GST_PAD_ALWAYS,
242     GST_STATIC_CAPS ("application/x-rtp"
243         /* "payload = (int) , "
244          * "clock-rate = (int) , "
245          * "encoding-name = (string) "
246          */ )
247     );
248
249 static guint gst_rtp_jitter_buffer_signals[LAST_SIGNAL] = { 0 };
250
251 #define gst_rtp_jitter_buffer_parent_class parent_class
252 G_DEFINE_TYPE (GstRtpJitterBuffer, gst_rtp_jitter_buffer, GST_TYPE_ELEMENT);
253
254 /* object overrides */
255 static void gst_rtp_jitter_buffer_set_property (GObject * object,
256     guint prop_id, const GValue * value, GParamSpec * pspec);
257 static void gst_rtp_jitter_buffer_get_property (GObject * object,
258     guint prop_id, GValue * value, GParamSpec * pspec);
259 static void gst_rtp_jitter_buffer_finalize (GObject * object);
260
261 /* element overrides */
262 static GstStateChangeReturn gst_rtp_jitter_buffer_change_state (GstElement
263     * element, GstStateChange transition);
264 static GstPad *gst_rtp_jitter_buffer_request_new_pad (GstElement * element,
265     GstPadTemplate * templ, const gchar * name, const GstCaps * filter);
266 static void gst_rtp_jitter_buffer_release_pad (GstElement * element,
267     GstPad * pad);
268 static GstClock *gst_rtp_jitter_buffer_provide_clock (GstElement * element);
269
270 /* pad overrides */
271 static GstCaps *gst_rtp_jitter_buffer_getcaps (GstPad * pad, GstCaps * filter);
272 static GstIterator *gst_rtp_jitter_buffer_iterate_internal_links (GstPad * pad,
273     GstObject * parent);
274
275 /* sinkpad overrides */
276 static gboolean gst_rtp_jitter_buffer_sink_event (GstPad * pad,
277     GstObject * parent, GstEvent * event);
278 static GstFlowReturn gst_rtp_jitter_buffer_chain (GstPad * pad,
279     GstObject * parent, GstBuffer * buffer);
280
281 static gboolean gst_rtp_jitter_buffer_sink_rtcp_event (GstPad * pad,
282     GstObject * parent, GstEvent * event);
283 static GstFlowReturn gst_rtp_jitter_buffer_chain_rtcp (GstPad * pad,
284     GstObject * parent, GstBuffer * buffer);
285
286 static gboolean gst_rtp_jitter_buffer_sink_query (GstPad * pad,
287     GstObject * parent, GstQuery * query);
288
289 /* srcpad overrides */
290 static gboolean gst_rtp_jitter_buffer_src_event (GstPad * pad,
291     GstObject * parent, GstEvent * event);
292 static gboolean gst_rtp_jitter_buffer_src_activate_mode (GstPad * pad,
293     GstObject * parent, GstPadMode mode, gboolean active);
294 static void gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer);
295 static gboolean gst_rtp_jitter_buffer_src_query (GstPad * pad,
296     GstObject * parent, GstQuery * query);
297
298 static void
299 gst_rtp_jitter_buffer_clear_pt_map (GstRtpJitterBuffer * jitterbuffer);
300 static GstClockTime
301 gst_rtp_jitter_buffer_set_active (GstRtpJitterBuffer * jitterbuffer,
302     gboolean active, guint64 base_time);
303 static void do_handle_sync (GstRtpJitterBuffer * jitterbuffer);
304
305 static void unschedule_current_timer (GstRtpJitterBuffer * jitterbuffer);
306 static void remove_all_timers (GstRtpJitterBuffer * jitterbuffer);
307
308 static void
309 gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass)
310 {
311   GObjectClass *gobject_class;
312   GstElementClass *gstelement_class;
313
314   gobject_class = (GObjectClass *) klass;
315   gstelement_class = (GstElementClass *) klass;
316
317   g_type_class_add_private (klass, sizeof (GstRtpJitterBufferPrivate));
318
319   gobject_class->finalize = gst_rtp_jitter_buffer_finalize;
320
321   gobject_class->set_property = gst_rtp_jitter_buffer_set_property;
322   gobject_class->get_property = gst_rtp_jitter_buffer_get_property;
323
324   /**
325    * GstRtpJitterBuffer::latency:
326    *
327    * The maximum latency of the jitterbuffer. Packets will be kept in the buffer
328    * for at most this time.
329    */
330   g_object_class_install_property (gobject_class, PROP_LATENCY,
331       g_param_spec_uint ("latency", "Buffer latency in ms",
332           "Amount of ms to buffer", 0, G_MAXUINT, DEFAULT_LATENCY_MS,
333           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
334   /**
335    * GstRtpJitterBuffer::drop-on-latency:
336    *
337    * Drop oldest buffers when the queue is completely filled.
338    */
339   g_object_class_install_property (gobject_class, PROP_DROP_ON_LATENCY,
340       g_param_spec_boolean ("drop-on-latency",
341           "Drop buffers when maximum latency is reached",
342           "Tells the jitterbuffer to never exceed the given latency in size",
343           DEFAULT_DROP_ON_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
344   /**
345    * GstRtpJitterBuffer::ts-offset:
346    *
347    * Adjust GStreamer output buffer timestamps in the jitterbuffer with offset.
348    * This is mainly used to ensure interstream synchronisation.
349    */
350   g_object_class_install_property (gobject_class, PROP_TS_OFFSET,
351       g_param_spec_int64 ("ts-offset", "Timestamp Offset",
352           "Adjust buffer timestamps with offset in nanoseconds", G_MININT64,
353           G_MAXINT64, DEFAULT_TS_OFFSET,
354           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
355
356   /**
357    * GstRtpJitterBuffer::do-lost:
358    *
359    * Send out a GstRTPPacketLost event downstream when a packet is considered
360    * lost.
361    */
362   g_object_class_install_property (gobject_class, PROP_DO_LOST,
363       g_param_spec_boolean ("do-lost", "Do Lost",
364           "Send an event downstream when a packet is lost", DEFAULT_DO_LOST,
365           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
366
367   /**
368    * GstRtpJitterBuffer::mode:
369    *
370    * Control the buffering and timestamping mode used by the jitterbuffer.
371    */
372   g_object_class_install_property (gobject_class, PROP_MODE,
373       g_param_spec_enum ("mode", "Mode",
374           "Control the buffering algorithm in use", RTP_TYPE_JITTER_BUFFER_MODE,
375           DEFAULT_MODE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
376   /**
377    * GstRtpJitterBuffer::percent:
378    *
379    * The percent of the jitterbuffer that is filled.
380    *
381    * Since: 0.10.19
382    */
383   g_object_class_install_property (gobject_class, PROP_PERCENT,
384       g_param_spec_int ("percent", "percent",
385           "The buffer filled percent", 0, 100,
386           0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
387   /**
388    * GstRtpJitterBuffer::request-pt-map:
389    * @buffer: the object which received the signal
390    * @pt: the pt
391    *
392    * Request the payload type as #GstCaps for @pt.
393    */
394   gst_rtp_jitter_buffer_signals[SIGNAL_REQUEST_PT_MAP] =
395       g_signal_new ("request-pt-map", G_TYPE_FROM_CLASS (klass),
396       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
397           request_pt_map), NULL, NULL, gst_rtp_bin_marshal_BOXED__UINT,
398       GST_TYPE_CAPS, 1, G_TYPE_UINT);
399   /**
400    * GstRtpJitterBuffer::handle-sync:
401    * @buffer: the object which received the signal
402    * @struct: a GstStructure containing sync values.
403    *
404    * Be notified of new sync values.
405    */
406   gst_rtp_jitter_buffer_signals[SIGNAL_HANDLE_SYNC] =
407       g_signal_new ("handle-sync", G_TYPE_FROM_CLASS (klass),
408       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
409           handle_sync), NULL, NULL, g_cclosure_marshal_VOID__BOXED,
410       G_TYPE_NONE, 1, GST_TYPE_STRUCTURE | G_SIGNAL_TYPE_STATIC_SCOPE);
411
412   /**
413    * GstRtpJitterBuffer::on-npt-stop
414    * @buffer: the object which received the signal
415    *
416    * Signal that the jitterbufer has pushed the RTP packet that corresponds to
417    * the npt-stop position.
418    */
419   gst_rtp_jitter_buffer_signals[SIGNAL_ON_NPT_STOP] =
420       g_signal_new ("on-npt-stop", G_TYPE_FROM_CLASS (klass),
421       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
422           on_npt_stop), NULL, NULL, g_cclosure_marshal_VOID__VOID,
423       G_TYPE_NONE, 0, G_TYPE_NONE);
424
425   /**
426    * GstRtpJitterBuffer::clear-pt-map:
427    * @buffer: the object which received the signal
428    *
429    * Invalidate the clock-rate as obtained with the
430    * #GstRtpJitterBuffer::request-pt-map signal.
431    */
432   gst_rtp_jitter_buffer_signals[SIGNAL_CLEAR_PT_MAP] =
433       g_signal_new ("clear-pt-map", G_TYPE_FROM_CLASS (klass),
434       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
435       G_STRUCT_OFFSET (GstRtpJitterBufferClass, clear_pt_map), NULL, NULL,
436       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
437
438   /**
439    * GstRtpJitterBuffer::set-active:
440    * @buffer: the object which received the signal
441    *
442    * Start pushing out packets with the given base time. This signal is only
443    * useful in buffering mode.
444    *
445    * Returns: the time of the last pushed packet.
446    *
447    * Since: 0.10.19
448    */
449   gst_rtp_jitter_buffer_signals[SIGNAL_SET_ACTIVE] =
450       g_signal_new ("set-active", G_TYPE_FROM_CLASS (klass),
451       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
452       G_STRUCT_OFFSET (GstRtpJitterBufferClass, set_active), NULL, NULL,
453       gst_rtp_bin_marshal_UINT64__BOOL_UINT64, G_TYPE_UINT64, 2, G_TYPE_BOOLEAN,
454       G_TYPE_UINT64);
455
456   gstelement_class->change_state =
457       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_change_state);
458   gstelement_class->request_new_pad =
459       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_request_new_pad);
460   gstelement_class->release_pad =
461       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_release_pad);
462   gstelement_class->provide_clock =
463       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_provide_clock);
464
465   gst_element_class_add_pad_template (gstelement_class,
466       gst_static_pad_template_get (&gst_rtp_jitter_buffer_src_template));
467   gst_element_class_add_pad_template (gstelement_class,
468       gst_static_pad_template_get (&gst_rtp_jitter_buffer_sink_template));
469   gst_element_class_add_pad_template (gstelement_class,
470       gst_static_pad_template_get (&gst_rtp_jitter_buffer_sink_rtcp_template));
471
472   gst_element_class_set_static_metadata (gstelement_class,
473       "RTP packet jitter-buffer", "Filter/Network/RTP",
474       "A buffer that deals with network jitter and other transmission faults",
475       "Philippe Kalaf <philippe.kalaf@collabora.co.uk>, "
476       "Wim Taymans <wim.taymans@gmail.com>");
477
478   klass->clear_pt_map = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_clear_pt_map);
479   klass->set_active = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_set_active);
480
481   GST_DEBUG_CATEGORY_INIT
482       (rtpjitterbuffer_debug, "gstrtpjitterbuffer", 0, "RTP Jitter Buffer");
483 }
484
485 static void
486 gst_rtp_jitter_buffer_init (GstRtpJitterBuffer * jitterbuffer)
487 {
488   GstRtpJitterBufferPrivate *priv;
489
490   priv = GST_RTP_JITTER_BUFFER_GET_PRIVATE (jitterbuffer);
491   jitterbuffer->priv = priv;
492
493   priv->latency_ms = DEFAULT_LATENCY_MS;
494   priv->latency_ns = priv->latency_ms * GST_MSECOND;
495   priv->drop_on_latency = DEFAULT_DROP_ON_LATENCY;
496   priv->do_lost = DEFAULT_DO_LOST;
497   priv->timers = g_array_new (FALSE, TRUE, sizeof (TimerData));
498
499   priv->jbuf = rtp_jitter_buffer_new ();
500   g_mutex_init (&priv->jbuf_lock);
501   g_cond_init (&priv->jbuf_cond);
502
503   /* reset skew detection initialy */
504   rtp_jitter_buffer_reset_skew (priv->jbuf);
505   rtp_jitter_buffer_set_delay (priv->jbuf, priv->latency_ns);
506   rtp_jitter_buffer_set_buffering (priv->jbuf, FALSE);
507   priv->active = TRUE;
508
509   priv->srcpad =
510       gst_pad_new_from_static_template (&gst_rtp_jitter_buffer_src_template,
511       "src");
512
513   gst_pad_set_activatemode_function (priv->srcpad,
514       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_activate_mode));
515   gst_pad_set_query_function (priv->srcpad,
516       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_query));
517   gst_pad_set_event_function (priv->srcpad,
518       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_event));
519
520   priv->sinkpad =
521       gst_pad_new_from_static_template (&gst_rtp_jitter_buffer_sink_template,
522       "sink");
523
524   gst_pad_set_chain_function (priv->sinkpad,
525       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_chain));
526   gst_pad_set_event_function (priv->sinkpad,
527       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_sink_event));
528   gst_pad_set_query_function (priv->sinkpad,
529       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_sink_query));
530
531   gst_element_add_pad (GST_ELEMENT (jitterbuffer), priv->srcpad);
532   gst_element_add_pad (GST_ELEMENT (jitterbuffer), priv->sinkpad);
533
534   GST_OBJECT_FLAG_SET (jitterbuffer, GST_ELEMENT_FLAG_PROVIDE_CLOCK);
535 }
536
537 static void
538 gst_rtp_jitter_buffer_finalize (GObject * object)
539 {
540   GstRtpJitterBuffer *jitterbuffer;
541
542   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
543
544   g_array_free (jitterbuffer->priv->timers, TRUE);
545   g_mutex_clear (&jitterbuffer->priv->jbuf_lock);
546   g_cond_clear (&jitterbuffer->priv->jbuf_cond);
547
548   g_object_unref (jitterbuffer->priv->jbuf);
549
550   G_OBJECT_CLASS (parent_class)->finalize (object);
551 }
552
553 static GstIterator *
554 gst_rtp_jitter_buffer_iterate_internal_links (GstPad * pad, GstObject * parent)
555 {
556   GstRtpJitterBuffer *jitterbuffer;
557   GstPad *otherpad = NULL;
558   GstIterator *it;
559   GValue val = { 0, };
560
561   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
562
563   if (pad == jitterbuffer->priv->sinkpad) {
564     otherpad = jitterbuffer->priv->srcpad;
565   } else if (pad == jitterbuffer->priv->srcpad) {
566     otherpad = jitterbuffer->priv->sinkpad;
567   } else if (pad == jitterbuffer->priv->rtcpsinkpad) {
568     otherpad = NULL;
569   }
570
571   g_value_init (&val, GST_TYPE_PAD);
572   g_value_set_object (&val, otherpad);
573   it = gst_iterator_new_single (GST_TYPE_PAD, &val);
574   g_value_unset (&val);
575
576   return it;
577 }
578
579 static GstPad *
580 create_rtcp_sink (GstRtpJitterBuffer * jitterbuffer)
581 {
582   GstRtpJitterBufferPrivate *priv;
583
584   priv = jitterbuffer->priv;
585
586   GST_DEBUG_OBJECT (jitterbuffer, "creating RTCP sink pad");
587
588   priv->rtcpsinkpad =
589       gst_pad_new_from_static_template
590       (&gst_rtp_jitter_buffer_sink_rtcp_template, "sink_rtcp");
591   gst_pad_set_chain_function (priv->rtcpsinkpad,
592       gst_rtp_jitter_buffer_chain_rtcp);
593   gst_pad_set_event_function (priv->rtcpsinkpad,
594       (GstPadEventFunction) gst_rtp_jitter_buffer_sink_rtcp_event);
595   gst_pad_set_iterate_internal_links_function (priv->rtcpsinkpad,
596       gst_rtp_jitter_buffer_iterate_internal_links);
597   gst_pad_set_active (priv->rtcpsinkpad, TRUE);
598   gst_element_add_pad (GST_ELEMENT_CAST (jitterbuffer), priv->rtcpsinkpad);
599
600   return priv->rtcpsinkpad;
601 }
602
603 static void
604 remove_rtcp_sink (GstRtpJitterBuffer * jitterbuffer)
605 {
606   GstRtpJitterBufferPrivate *priv;
607
608   priv = jitterbuffer->priv;
609
610   GST_DEBUG_OBJECT (jitterbuffer, "removing RTCP sink pad");
611
612   gst_pad_set_active (priv->rtcpsinkpad, FALSE);
613
614   gst_element_remove_pad (GST_ELEMENT_CAST (jitterbuffer), priv->rtcpsinkpad);
615   priv->rtcpsinkpad = NULL;
616 }
617
618 static GstPad *
619 gst_rtp_jitter_buffer_request_new_pad (GstElement * element,
620     GstPadTemplate * templ, const gchar * name, const GstCaps * filter)
621 {
622   GstRtpJitterBuffer *jitterbuffer;
623   GstElementClass *klass;
624   GstPad *result;
625   GstRtpJitterBufferPrivate *priv;
626
627   g_return_val_if_fail (templ != NULL, NULL);
628   g_return_val_if_fail (GST_IS_RTP_JITTER_BUFFER (element), NULL);
629
630   jitterbuffer = GST_RTP_JITTER_BUFFER (element);
631   priv = jitterbuffer->priv;
632   klass = GST_ELEMENT_GET_CLASS (element);
633
634   GST_DEBUG_OBJECT (element, "requesting pad %s", GST_STR_NULL (name));
635
636   /* figure out the template */
637   if (templ == gst_element_class_get_pad_template (klass, "sink_rtcp")) {
638     if (priv->rtcpsinkpad != NULL)
639       goto exists;
640
641     result = create_rtcp_sink (jitterbuffer);
642   } else
643     goto wrong_template;
644
645   return result;
646
647   /* ERRORS */
648 wrong_template:
649   {
650     g_warning ("gstrtpjitterbuffer: this is not our template");
651     return NULL;
652   }
653 exists:
654   {
655     g_warning ("gstrtpjitterbuffer: pad already requested");
656     return NULL;
657   }
658 }
659
660 static void
661 gst_rtp_jitter_buffer_release_pad (GstElement * element, GstPad * pad)
662 {
663   GstRtpJitterBuffer *jitterbuffer;
664   GstRtpJitterBufferPrivate *priv;
665
666   g_return_if_fail (GST_IS_RTP_JITTER_BUFFER (element));
667   g_return_if_fail (GST_IS_PAD (pad));
668
669   jitterbuffer = GST_RTP_JITTER_BUFFER (element);
670   priv = jitterbuffer->priv;
671
672   GST_DEBUG_OBJECT (element, "releasing pad %s:%s", GST_DEBUG_PAD_NAME (pad));
673
674   if (priv->rtcpsinkpad == pad) {
675     remove_rtcp_sink (jitterbuffer);
676   } else
677     goto wrong_pad;
678
679   return;
680
681   /* ERRORS */
682 wrong_pad:
683   {
684     g_warning ("gstjitterbuffer: asked to release an unknown pad");
685     return;
686   }
687 }
688
689 static GstClock *
690 gst_rtp_jitter_buffer_provide_clock (GstElement * element)
691 {
692   return gst_system_clock_obtain ();
693 }
694
695 static void
696 gst_rtp_jitter_buffer_clear_pt_map (GstRtpJitterBuffer * jitterbuffer)
697 {
698   GstRtpJitterBufferPrivate *priv;
699
700   priv = jitterbuffer->priv;
701
702   /* this will trigger a new pt-map request signal, FIXME, do something better. */
703
704   JBUF_LOCK (priv);
705   priv->clock_rate = -1;
706   /* do not clear current content, but refresh state for new arrival */
707   GST_DEBUG_OBJECT (jitterbuffer, "reset jitterbuffer");
708   rtp_jitter_buffer_reset_skew (priv->jbuf);
709   priv->last_popped_seqnum = -1;
710   priv->next_seqnum = -1;
711   JBUF_UNLOCK (priv);
712 }
713
714 static GstClockTime
715 gst_rtp_jitter_buffer_set_active (GstRtpJitterBuffer * jbuf, gboolean active,
716     guint64 offset)
717 {
718   GstRtpJitterBufferPrivate *priv;
719   GstClockTime last_out;
720   GstBuffer *head;
721
722   priv = jbuf->priv;
723
724   JBUF_LOCK (priv);
725   GST_DEBUG_OBJECT (jbuf, "setting active %d with offset %" GST_TIME_FORMAT,
726       active, GST_TIME_ARGS (offset));
727
728   if (active != priv->active) {
729     /* add the amount of time spent in paused to the output offset. All
730      * outgoing buffers will have this offset applied to their timestamps in
731      * order to make them arrive in time in the sink. */
732     priv->out_offset = offset;
733     GST_DEBUG_OBJECT (jbuf, "out offset %" GST_TIME_FORMAT,
734         GST_TIME_ARGS (priv->out_offset));
735     priv->active = active;
736     JBUF_SIGNAL (priv);
737   }
738   if (!active) {
739     rtp_jitter_buffer_set_buffering (priv->jbuf, TRUE);
740   }
741   if ((head = rtp_jitter_buffer_peek (priv->jbuf))) {
742     /* head buffer timestamp and offset gives our output time */
743     last_out = GST_BUFFER_DTS (head) + priv->ts_offset;
744   } else {
745     /* use last known time when the buffer is empty */
746     last_out = priv->last_out_time;
747   }
748   JBUF_UNLOCK (priv);
749
750   return last_out;
751 }
752
753 static GstCaps *
754 gst_rtp_jitter_buffer_getcaps (GstPad * pad, GstCaps * filter)
755 {
756   GstRtpJitterBuffer *jitterbuffer;
757   GstRtpJitterBufferPrivate *priv;
758   GstPad *other;
759   GstCaps *caps;
760   GstCaps *templ;
761
762   jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
763   priv = jitterbuffer->priv;
764
765   other = (pad == priv->srcpad ? priv->sinkpad : priv->srcpad);
766
767   caps = gst_pad_peer_query_caps (other, filter);
768
769   templ = gst_pad_get_pad_template_caps (pad);
770   if (caps == NULL) {
771     GST_DEBUG_OBJECT (jitterbuffer, "use template");
772     caps = templ;
773   } else {
774     GstCaps *intersect;
775
776     GST_DEBUG_OBJECT (jitterbuffer, "intersect with template");
777
778     intersect = gst_caps_intersect (caps, templ);
779     gst_caps_unref (caps);
780     gst_caps_unref (templ);
781
782     caps = intersect;
783   }
784   gst_object_unref (jitterbuffer);
785
786   return caps;
787 }
788
789 /*
790  * Must be called with JBUF_LOCK held
791  */
792
793 static gboolean
794 gst_jitter_buffer_sink_parse_caps (GstRtpJitterBuffer * jitterbuffer,
795     GstCaps * caps)
796 {
797   GstRtpJitterBufferPrivate *priv;
798   GstStructure *caps_struct;
799   guint val;
800   GstClockTime tval;
801
802   priv = jitterbuffer->priv;
803
804   /* first parse the caps */
805   caps_struct = gst_caps_get_structure (caps, 0);
806
807   GST_DEBUG_OBJECT (jitterbuffer, "got caps");
808
809   /* we need a clock-rate to convert the rtp timestamps to GStreamer time and to
810    * measure the amount of data in the buffer */
811   if (!gst_structure_get_int (caps_struct, "clock-rate", &priv->clock_rate))
812     goto error;
813
814   if (priv->clock_rate <= 0)
815     goto wrong_rate;
816
817   GST_DEBUG_OBJECT (jitterbuffer, "got clock-rate %d", priv->clock_rate);
818
819   /* The clock base is the RTP timestamp corrsponding to the npt-start value. We
820    * can use this to track the amount of time elapsed on the sender. */
821   if (gst_structure_get_uint (caps_struct, "clock-base", &val))
822     priv->clock_base = val;
823   else
824     priv->clock_base = -1;
825
826   priv->ext_timestamp = priv->clock_base;
827
828   GST_DEBUG_OBJECT (jitterbuffer, "got clock-base %" G_GINT64_FORMAT,
829       priv->clock_base);
830
831   if (gst_structure_get_uint (caps_struct, "seqnum-base", &val)) {
832     /* first expected seqnum, only update when we didn't have a previous base. */
833     if (priv->next_in_seqnum == -1)
834       priv->next_in_seqnum = val;
835     if (priv->next_seqnum == -1)
836       priv->next_seqnum = val;
837   }
838
839   GST_DEBUG_OBJECT (jitterbuffer, "got seqnum-base %d", priv->next_in_seqnum);
840
841   /* the start and stop times. The seqnum-base corresponds to the start time. We
842    * will keep track of the seqnums on the output and when we reach the one
843    * corresponding to npt-stop, we emit the npt-stop-reached signal */
844   if (gst_structure_get_clock_time (caps_struct, "npt-start", &tval))
845     priv->npt_start = tval;
846   else
847     priv->npt_start = 0;
848
849   if (gst_structure_get_clock_time (caps_struct, "npt-stop", &tval))
850     priv->npt_stop = tval;
851   else
852     priv->npt_stop = -1;
853
854   GST_DEBUG_OBJECT (jitterbuffer,
855       "npt start/stop: %" GST_TIME_FORMAT "-%" GST_TIME_FORMAT,
856       GST_TIME_ARGS (priv->npt_start), GST_TIME_ARGS (priv->npt_stop));
857
858   return TRUE;
859
860   /* ERRORS */
861 error:
862   {
863     GST_DEBUG_OBJECT (jitterbuffer, "No clock-rate in caps!");
864     return FALSE;
865   }
866 wrong_rate:
867   {
868     GST_DEBUG_OBJECT (jitterbuffer, "Invalid clock-rate %d", priv->clock_rate);
869     return FALSE;
870   }
871 }
872
873 static void
874 gst_rtp_jitter_buffer_flush_start (GstRtpJitterBuffer * jitterbuffer)
875 {
876   GstRtpJitterBufferPrivate *priv;
877
878   priv = jitterbuffer->priv;
879
880   JBUF_LOCK (priv);
881   /* mark ourselves as flushing */
882   priv->srcresult = GST_FLOW_FLUSHING;
883   GST_DEBUG_OBJECT (jitterbuffer, "Disabling pop on queue");
884   /* this unblocks any waiting pops on the src pad task */
885   JBUF_SIGNAL (priv);
886   /* unlock clock, we just unschedule, the entry will be released by the
887    * locking streaming thread. */
888   unschedule_current_timer (jitterbuffer);
889   JBUF_UNLOCK (priv);
890 }
891
892 static void
893 gst_rtp_jitter_buffer_flush_stop (GstRtpJitterBuffer * jitterbuffer)
894 {
895   GstRtpJitterBufferPrivate *priv;
896
897   priv = jitterbuffer->priv;
898
899   JBUF_LOCK (priv);
900   GST_DEBUG_OBJECT (jitterbuffer, "Enabling pop on queue");
901   /* Mark as non flushing */
902   priv->srcresult = GST_FLOW_OK;
903   gst_segment_init (&priv->segment, GST_FORMAT_TIME);
904   priv->last_popped_seqnum = -1;
905   priv->last_out_time = -1;
906   priv->last_out_dts = -1;
907   priv->last_out_pts = -1;
908   priv->next_seqnum = -1;
909   priv->last_in_rtptime = -1;
910   priv->last_in_dts = GST_CLOCK_TIME_NONE;
911   priv->packet_spacing = 0;
912   priv->next_in_seqnum = -1;
913   priv->clock_rate = -1;
914   priv->eos = FALSE;
915   priv->estimated_eos = -1;
916   priv->last_elapsed = 0;
917   priv->ext_timestamp = -1;
918   GST_DEBUG_OBJECT (jitterbuffer, "flush and reset jitterbuffer");
919   rtp_jitter_buffer_flush (priv->jbuf);
920   rtp_jitter_buffer_reset_skew (priv->jbuf);
921   remove_all_timers (jitterbuffer);
922   JBUF_UNLOCK (priv);
923 }
924
925 static gboolean
926 gst_rtp_jitter_buffer_src_activate_mode (GstPad * pad, GstObject * parent,
927     GstPadMode mode, gboolean active)
928 {
929   gboolean result;
930   GstRtpJitterBuffer *jitterbuffer = NULL;
931
932   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
933
934   switch (mode) {
935     case GST_PAD_MODE_PUSH:
936       if (active) {
937         /* allow data processing */
938         gst_rtp_jitter_buffer_flush_stop (jitterbuffer);
939
940         /* start pushing out buffers */
941         GST_DEBUG_OBJECT (jitterbuffer, "Starting task on srcpad");
942         result = gst_pad_start_task (jitterbuffer->priv->srcpad,
943             (GstTaskFunction) gst_rtp_jitter_buffer_loop, jitterbuffer, NULL);
944       } else {
945         /* make sure all data processing stops ASAP */
946         gst_rtp_jitter_buffer_flush_start (jitterbuffer);
947
948         /* NOTE this will hardlock if the state change is called from the src pad
949          * task thread because we will _join() the thread. */
950         GST_DEBUG_OBJECT (jitterbuffer, "Stopping task on srcpad");
951         result = gst_pad_stop_task (pad);
952       }
953       break;
954     default:
955       result = FALSE;
956       break;
957   }
958   return result;
959 }
960
961 static GstStateChangeReturn
962 gst_rtp_jitter_buffer_change_state (GstElement * element,
963     GstStateChange transition)
964 {
965   GstRtpJitterBuffer *jitterbuffer;
966   GstRtpJitterBufferPrivate *priv;
967   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
968
969   jitterbuffer = GST_RTP_JITTER_BUFFER (element);
970   priv = jitterbuffer->priv;
971
972   switch (transition) {
973     case GST_STATE_CHANGE_NULL_TO_READY:
974       break;
975     case GST_STATE_CHANGE_READY_TO_PAUSED:
976       JBUF_LOCK (priv);
977       /* reset negotiated values */
978       priv->clock_rate = -1;
979       priv->clock_base = -1;
980       priv->peer_latency = 0;
981       priv->last_pt = -1;
982       /* block until we go to PLAYING */
983       priv->blocked = TRUE;
984       JBUF_UNLOCK (priv);
985       break;
986     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
987       JBUF_LOCK (priv);
988       /* unblock to allow streaming in PLAYING */
989       priv->blocked = FALSE;
990       JBUF_SIGNAL (priv);
991       JBUF_UNLOCK (priv);
992       break;
993     default:
994       break;
995   }
996
997   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
998
999   switch (transition) {
1000     case GST_STATE_CHANGE_READY_TO_PAUSED:
1001       /* we are a live element because we sync to the clock, which we can only
1002        * do in the PLAYING state */
1003       if (ret != GST_STATE_CHANGE_FAILURE)
1004         ret = GST_STATE_CHANGE_NO_PREROLL;
1005       break;
1006     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
1007       JBUF_LOCK (priv);
1008       /* block to stop streaming when PAUSED */
1009       priv->blocked = TRUE;
1010       JBUF_UNLOCK (priv);
1011       if (ret != GST_STATE_CHANGE_FAILURE)
1012         ret = GST_STATE_CHANGE_NO_PREROLL;
1013       break;
1014     case GST_STATE_CHANGE_PAUSED_TO_READY:
1015       gst_buffer_replace (&priv->last_sr, NULL);
1016       break;
1017     case GST_STATE_CHANGE_READY_TO_NULL:
1018       break;
1019     default:
1020       break;
1021   }
1022
1023   return ret;
1024 }
1025
1026 static gboolean
1027 gst_rtp_jitter_buffer_src_event (GstPad * pad, GstObject * parent,
1028     GstEvent * event)
1029 {
1030   gboolean ret = TRUE;
1031   GstRtpJitterBuffer *jitterbuffer;
1032   GstRtpJitterBufferPrivate *priv;
1033
1034   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1035   priv = jitterbuffer->priv;
1036
1037   GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
1038
1039   switch (GST_EVENT_TYPE (event)) {
1040     case GST_EVENT_LATENCY:
1041     {
1042       GstClockTime latency;
1043
1044       gst_event_parse_latency (event, &latency);
1045
1046       GST_DEBUG_OBJECT (jitterbuffer,
1047           "configuring latency of %" GST_TIME_FORMAT, GST_TIME_ARGS (latency));
1048
1049       JBUF_LOCK (priv);
1050       /* adjust the overall buffer delay to the total pipeline latency in
1051        * buffering mode because if downstream consumes too fast (because of
1052        * large latency or queues, we would start rebuffering again. */
1053       if (rtp_jitter_buffer_get_mode (priv->jbuf) ==
1054           RTP_JITTER_BUFFER_MODE_BUFFER) {
1055         rtp_jitter_buffer_set_delay (priv->jbuf, latency);
1056       }
1057       JBUF_UNLOCK (priv);
1058
1059       ret = gst_pad_push_event (priv->sinkpad, event);
1060       break;
1061     }
1062     default:
1063       ret = gst_pad_push_event (priv->sinkpad, event);
1064       break;
1065   }
1066
1067   return ret;
1068 }
1069
1070 static gboolean
1071 gst_rtp_jitter_buffer_sink_event (GstPad * pad, GstObject * parent,
1072     GstEvent * event)
1073 {
1074   gboolean ret = TRUE;
1075   GstRtpJitterBuffer *jitterbuffer;
1076   GstRtpJitterBufferPrivate *priv;
1077
1078   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1079   priv = jitterbuffer->priv;
1080
1081   GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
1082
1083   switch (GST_EVENT_TYPE (event)) {
1084     case GST_EVENT_CAPS:
1085     {
1086       GstCaps *caps;
1087
1088       gst_event_parse_caps (event, &caps);
1089
1090       JBUF_LOCK (priv);
1091       ret = gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps);
1092       JBUF_UNLOCK (priv);
1093
1094       /* set same caps on srcpad on success */
1095       if (ret)
1096         ret = gst_pad_push_event (priv->srcpad, event);
1097       else
1098         gst_event_unref (event);
1099       break;
1100     }
1101     case GST_EVENT_SEGMENT:
1102     {
1103       gst_event_copy_segment (event, &priv->segment);
1104
1105       /* we need time for now */
1106       if (priv->segment.format != GST_FORMAT_TIME)
1107         goto newseg_wrong_format;
1108
1109       GST_DEBUG_OBJECT (jitterbuffer,
1110           "newsegment:  %" GST_SEGMENT_FORMAT, &priv->segment);
1111
1112       /* FIXME, push SEGMENT in the queue. Sorting order might be difficult. */
1113       ret = gst_pad_push_event (priv->srcpad, event);
1114       break;
1115     }
1116     case GST_EVENT_FLUSH_START:
1117       gst_rtp_jitter_buffer_flush_start (jitterbuffer);
1118       ret = gst_pad_push_event (priv->srcpad, event);
1119       break;
1120     case GST_EVENT_FLUSH_STOP:
1121       ret = gst_pad_push_event (priv->srcpad, event);
1122       ret =
1123           gst_rtp_jitter_buffer_src_activate_mode (priv->srcpad, parent,
1124           GST_PAD_MODE_PUSH, TRUE);
1125       break;
1126     case GST_EVENT_EOS:
1127     {
1128       /* push EOS in queue. We always push it at the head */
1129       JBUF_LOCK (priv);
1130       /* check for flushing, we need to discard the event and return FALSE when
1131        * we are flushing */
1132       ret = priv->srcresult == GST_FLOW_OK;
1133       if (ret && !priv->eos) {
1134         GST_INFO_OBJECT (jitterbuffer, "queuing EOS");
1135         priv->eos = TRUE;
1136         JBUF_SIGNAL (priv);
1137       } else if (priv->eos) {
1138         GST_DEBUG_OBJECT (jitterbuffer, "dropping EOS, we are already EOS");
1139       } else {
1140         GST_DEBUG_OBJECT (jitterbuffer, "dropping EOS, reason %s",
1141             gst_flow_get_name (priv->srcresult));
1142       }
1143       JBUF_UNLOCK (priv);
1144       gst_event_unref (event);
1145       break;
1146     }
1147     default:
1148       ret = gst_pad_push_event (priv->srcpad, event);
1149       break;
1150   }
1151
1152 done:
1153
1154   return ret;
1155
1156   /* ERRORS */
1157 newseg_wrong_format:
1158   {
1159     GST_DEBUG_OBJECT (jitterbuffer, "received non TIME newsegment");
1160     ret = FALSE;
1161     gst_event_unref (event);
1162     goto done;
1163   }
1164 }
1165
1166 static gboolean
1167 gst_rtp_jitter_buffer_sink_rtcp_event (GstPad * pad, GstObject * parent,
1168     GstEvent * event)
1169 {
1170   gboolean ret = TRUE;
1171   GstRtpJitterBuffer *jitterbuffer;
1172
1173   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1174
1175   GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
1176
1177   switch (GST_EVENT_TYPE (event)) {
1178     case GST_EVENT_FLUSH_START:
1179       gst_event_unref (event);
1180       break;
1181     case GST_EVENT_FLUSH_STOP:
1182       gst_event_unref (event);
1183       break;
1184     default:
1185       ret = gst_pad_event_default (pad, parent, event);
1186       break;
1187   }
1188
1189   return ret;
1190 }
1191
1192 /*
1193  * Must be called with JBUF_LOCK held, will release the LOCK when emiting the
1194  * signal. The function returns GST_FLOW_ERROR when a parsing error happened and
1195  * GST_FLOW_FLUSHING when the element is shutting down. On success
1196  * GST_FLOW_OK is returned.
1197  */
1198 static GstFlowReturn
1199 gst_rtp_jitter_buffer_get_clock_rate (GstRtpJitterBuffer * jitterbuffer,
1200     guint8 pt)
1201 {
1202   GValue ret = { 0 };
1203   GValue args[2] = { {0}, {0} };
1204   GstCaps *caps;
1205   gboolean res;
1206
1207   g_value_init (&args[0], GST_TYPE_ELEMENT);
1208   g_value_set_object (&args[0], jitterbuffer);
1209   g_value_init (&args[1], G_TYPE_UINT);
1210   g_value_set_uint (&args[1], pt);
1211
1212   g_value_init (&ret, GST_TYPE_CAPS);
1213   g_value_set_boxed (&ret, NULL);
1214
1215   JBUF_UNLOCK (jitterbuffer->priv);
1216   g_signal_emitv (args, gst_rtp_jitter_buffer_signals[SIGNAL_REQUEST_PT_MAP], 0,
1217       &ret);
1218   JBUF_LOCK_CHECK (jitterbuffer->priv, out_flushing);
1219
1220   g_value_unset (&args[0]);
1221   g_value_unset (&args[1]);
1222   caps = (GstCaps *) g_value_dup_boxed (&ret);
1223   g_value_unset (&ret);
1224   if (!caps)
1225     goto no_caps;
1226
1227   res = gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps);
1228   gst_caps_unref (caps);
1229
1230   if (G_UNLIKELY (!res))
1231     goto parse_failed;
1232
1233   return GST_FLOW_OK;
1234
1235   /* ERRORS */
1236 no_caps:
1237   {
1238     GST_DEBUG_OBJECT (jitterbuffer, "could not get caps");
1239     return GST_FLOW_ERROR;
1240   }
1241 out_flushing:
1242   {
1243     GST_DEBUG_OBJECT (jitterbuffer, "we are flushing");
1244     return GST_FLOW_FLUSHING;
1245   }
1246 parse_failed:
1247   {
1248     GST_DEBUG_OBJECT (jitterbuffer, "parse failed");
1249     return GST_FLOW_ERROR;
1250   }
1251 }
1252
1253 /* call with jbuf lock held */
1254 static void
1255 check_buffering_percent (GstRtpJitterBuffer * jitterbuffer, gint * percent)
1256 {
1257   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1258
1259   /* too short a stream, or too close to EOS will never really fill buffer */
1260   if (*percent != -1 && priv->npt_stop != -1 &&
1261       priv->npt_stop - priv->npt_start <=
1262       rtp_jitter_buffer_get_delay (priv->jbuf)) {
1263     GST_DEBUG_OBJECT (jitterbuffer, "short stream; faking full buffer");
1264     rtp_jitter_buffer_set_buffering (priv->jbuf, FALSE);
1265     *percent = 100;
1266   }
1267 }
1268
1269 static void
1270 post_buffering_percent (GstRtpJitterBuffer * jitterbuffer, gint percent)
1271 {
1272   GstMessage *message;
1273
1274   /* Post a buffering message */
1275   message = gst_message_new_buffering (GST_OBJECT_CAST (jitterbuffer), percent);
1276   gst_message_set_buffering_stats (message, GST_BUFFERING_LIVE, -1, -1, -1);
1277
1278   gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer), message);
1279 }
1280
1281 static TimerData *
1282 find_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type, guint16 seqnum)
1283 {
1284   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1285   TimerData *timer = NULL;
1286   gint i, len;
1287
1288   len = priv->timers->len;
1289   for (i = 0; i < len; i++) {
1290     TimerData *test = &g_array_index (priv->timers, TimerData, i);
1291     if (test->seqnum == seqnum && test->type == type) {
1292       timer = test;
1293       break;
1294     }
1295   }
1296   return timer;
1297 }
1298
1299 static void
1300 unschedule_current_timer (GstRtpJitterBuffer * jitterbuffer)
1301 {
1302   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1303
1304   if (priv->clock_id) {
1305     GST_DEBUG_OBJECT (jitterbuffer, "unschedule current timer");
1306     gst_clock_id_unschedule (priv->clock_id);
1307     priv->unscheduled = TRUE;
1308   }
1309 }
1310
1311 static void
1312 recalculate_timer (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
1313 {
1314   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1315
1316   if (priv->clock_id && (timer->timeout == -1
1317           || priv->timer_timeout > timer->timeout))
1318     unschedule_current_timer (jitterbuffer);
1319 }
1320
1321 static TimerData *
1322 add_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type,
1323     guint16 seqnum, GstClockTime timeout)
1324 {
1325   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1326   TimerData *timer;
1327   gint len;
1328
1329   GST_DEBUG_OBJECT (jitterbuffer,
1330       "add timer for seqnum %d to %" GST_TIME_FORMAT,
1331       seqnum, GST_TIME_ARGS (timeout));
1332
1333   len = priv->timers->len;
1334   g_array_set_size (priv->timers, len + 1);
1335   timer = &g_array_index (priv->timers, TimerData, len);
1336   timer->idx = len;
1337   timer->type = type;
1338   timer->seqnum = seqnum;
1339   timer->timeout = timeout;
1340
1341   recalculate_timer (jitterbuffer, timer);
1342
1343   return timer;
1344 }
1345
1346 static void
1347 reschedule_timer (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
1348     guint16 seqnum, GstClockTime timeout)
1349 {
1350   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1351   gboolean seqchange, timechange;
1352   guint16 oldseq;
1353
1354   seqchange = timer->seqnum != seqnum;
1355   timechange = timer->timeout != timeout;
1356
1357   if (!seqchange && !timechange)
1358     return;
1359
1360   oldseq = timer->seqnum;
1361
1362   GST_DEBUG_OBJECT (jitterbuffer,
1363       "replace timer for seqnum %d->%d to %" GST_TIME_FORMAT,
1364       oldseq, seqnum, GST_TIME_ARGS (timeout));
1365
1366   timer->timeout = timeout;
1367   timer->seqnum = seqnum;
1368
1369   if (priv->clock_id) {
1370     /* we changed the seqnum and there is a timer currently waiting with this
1371      * seqnum, unschedule it */
1372     if (seqchange && priv->timer_seqnum == oldseq)
1373       unschedule_current_timer (jitterbuffer);
1374     /* we changed the time, check if it is earlier than what we are waiting
1375      * for and unschedule if so */
1376     else if (timechange)
1377       recalculate_timer (jitterbuffer, timer);
1378   }
1379 }
1380
1381 static TimerData *
1382 set_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type,
1383     guint16 seqnum, GstClockTime timeout)
1384 {
1385   TimerData *timer;
1386
1387   /* find the seqnum timer */
1388   timer = find_timer (jitterbuffer, type, seqnum);
1389   if (timer == NULL) {
1390     timer = add_timer (jitterbuffer, type, seqnum, timeout);
1391   } else {
1392     reschedule_timer (jitterbuffer, timer, seqnum, timeout);
1393   }
1394   return timer;
1395 }
1396
1397 static void
1398 remove_timer (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
1399 {
1400   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1401   guint idx;
1402
1403   if (timer == NULL)
1404     return;
1405
1406   if (priv->clock_id && priv->timer_seqnum == timer->seqnum)
1407     unschedule_current_timer (jitterbuffer);
1408
1409   idx = timer->idx;
1410   GST_DEBUG_OBJECT (jitterbuffer, "removed index %d", idx);
1411   g_array_remove_index_fast (priv->timers, idx);
1412   timer->idx = idx;
1413 }
1414
1415 static void
1416 remove_all_timers (GstRtpJitterBuffer * jitterbuffer)
1417 {
1418   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1419   GST_DEBUG_OBJECT (jitterbuffer, "removed all timers");
1420   g_array_set_size (priv->timers, 0);
1421   unschedule_current_timer (jitterbuffer);
1422 }
1423
1424
1425 static GstFlowReturn
1426 gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent,
1427     GstBuffer * buffer)
1428 {
1429   GstRtpJitterBuffer *jitterbuffer;
1430   GstRtpJitterBufferPrivate *priv;
1431   guint16 seqnum;
1432   guint32 rtptime;
1433   GstFlowReturn ret = GST_FLOW_OK;
1434   GstClockTime dts, pts;
1435   guint64 latency_ts;
1436   gboolean tail;
1437   gint percent = -1;
1438   guint8 pt;
1439   GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
1440   TimerData *timer;
1441
1442   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1443
1444   priv = jitterbuffer->priv;
1445
1446   if (G_UNLIKELY (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp)))
1447     goto invalid_buffer;
1448
1449   pt = gst_rtp_buffer_get_payload_type (&rtp);
1450   seqnum = gst_rtp_buffer_get_seq (&rtp);
1451   rtptime = gst_rtp_buffer_get_timestamp (&rtp);
1452   gst_rtp_buffer_unmap (&rtp);
1453
1454   /* make sure we have PTS and DTS set */
1455   pts = GST_BUFFER_PTS (buffer);
1456   dts = GST_BUFFER_DTS (buffer);
1457   if (dts == -1)
1458     dts = pts;
1459   else if (pts == -1)
1460     pts = dts;
1461
1462   /* take the DTS of the buffer. This is the time when the packet was
1463    * received and is used to calculate jitter and clock skew. We will adjust
1464    * this PTS with the smoothed value after processing it in the
1465    * jitterbuffer and assign it as the PTS. */
1466   /* bring to running time */
1467   dts = gst_segment_to_running_time (&priv->segment, GST_FORMAT_TIME, dts);
1468
1469   GST_DEBUG_OBJECT (jitterbuffer,
1470       "Received packet #%d at time %" GST_TIME_FORMAT, seqnum,
1471       GST_TIME_ARGS (dts));
1472
1473   JBUF_LOCK_CHECK (priv, out_flushing);
1474
1475   if (G_UNLIKELY (priv->last_pt != pt)) {
1476     GstCaps *caps;
1477
1478     GST_DEBUG_OBJECT (jitterbuffer, "pt changed from %u to %u", priv->last_pt,
1479         pt);
1480
1481     priv->last_pt = pt;
1482     /* reset clock-rate so that we get a new one */
1483     priv->clock_rate = -1;
1484
1485     /* Try to get the clock-rate from the caps first if we can. If there are no
1486      * caps we must fire the signal to get the clock-rate. */
1487     if ((caps = gst_pad_get_current_caps (pad))) {
1488       gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps);
1489       gst_caps_unref (caps);
1490     }
1491   }
1492
1493   if (G_UNLIKELY (priv->clock_rate == -1)) {
1494     /* no clock rate given on the caps, try to get one with the signal */
1495     if (gst_rtp_jitter_buffer_get_clock_rate (jitterbuffer,
1496             pt) == GST_FLOW_FLUSHING)
1497       goto out_flushing;
1498
1499     if (G_UNLIKELY (priv->clock_rate == -1))
1500       goto no_clock_rate;
1501   }
1502
1503   /* don't accept more data on EOS */
1504   if (G_UNLIKELY (priv->eos))
1505     goto have_eos;
1506
1507   /* now check against our expected seqnum */
1508   if (G_LIKELY (priv->next_in_seqnum != -1)) {
1509     gint gap;
1510
1511     gap = gst_rtp_buffer_compare_seqnum (priv->next_in_seqnum, seqnum);
1512     if (G_UNLIKELY (gap != 0)) {
1513       gboolean reset = FALSE;
1514
1515       GST_DEBUG_OBJECT (jitterbuffer, "expected #%d, got #%d, gap of %d",
1516           priv->next_in_seqnum, seqnum, gap);
1517       /* priv->next_in_seqnum >= seqnum, this packet is too late or the
1518        * sender might have been restarted with different seqnum. */
1519       if (gap < -RTP_MAX_MISORDER) {
1520         GST_DEBUG_OBJECT (jitterbuffer, "reset: buffer too old %d", gap);
1521         reset = TRUE;
1522       }
1523       /* priv->next_in_seqnum < seqnum, this is a new packet */
1524       else if (G_UNLIKELY (gap > RTP_MAX_DROPOUT)) {
1525         GST_DEBUG_OBJECT (jitterbuffer, "reset: too many dropped packets %d",
1526             gap);
1527         reset = TRUE;
1528       } else {
1529         GST_DEBUG_OBJECT (jitterbuffer, "tolerable gap");
1530       }
1531       if (G_UNLIKELY (reset)) {
1532         GST_DEBUG_OBJECT (jitterbuffer, "flush and reset jitterbuffer");
1533         rtp_jitter_buffer_flush (priv->jbuf);
1534         rtp_jitter_buffer_reset_skew (priv->jbuf);
1535         remove_all_timers (jitterbuffer);
1536         priv->last_popped_seqnum = -1;
1537         priv->next_seqnum = seqnum;
1538       }
1539       /* reset spacing estimation when gap */
1540       priv->last_in_rtptime = -1;
1541       priv->last_in_dts = -1;
1542     } else {
1543       /* packet is expected, we need consecutive seqnums with a different
1544        * rtptime to estimate the packet spacing. */
1545       if (priv->last_in_rtptime != rtptime) {
1546         /* rtptime changed, check dts diff */
1547         if (priv->last_in_dts != -1 && dts != -1 && dts > priv->last_in_dts) {
1548           priv->packet_spacing = dts - priv->last_in_dts;
1549           GST_DEBUG_OBJECT (jitterbuffer,
1550               "new packet spacing %" GST_TIME_FORMAT,
1551               GST_TIME_ARGS (priv->packet_spacing));
1552         }
1553         priv->last_in_rtptime = rtptime;
1554         priv->last_in_dts = dts;
1555       }
1556     }
1557   }
1558   priv->next_in_seqnum = (seqnum + 1) & 0xffff;
1559
1560   /* find the timer for the current seqnum. */
1561   timer = find_timer (jitterbuffer, TIMER_TYPE_EXPECTED, seqnum);
1562   if (priv->packet_spacing > 0) {
1563     GstClockTime expected;
1564
1565     /* calculate expected arrival time of the next seqnum */
1566     expected = dts + priv->packet_spacing + 20 * GST_MSECOND;
1567     /* and update/install timer for next seqnum */
1568     if (timer)
1569       reschedule_timer (jitterbuffer, timer, priv->next_in_seqnum, expected);
1570     else
1571       timer = add_timer (jitterbuffer, TIMER_TYPE_EXPECTED,
1572           priv->next_in_seqnum, expected);
1573   } else {
1574     remove_timer (jitterbuffer, timer);
1575   }
1576
1577   /* let's check if this buffer is too late, we can only accept packets with
1578    * bigger seqnum than the one we last pushed. */
1579   if (G_LIKELY (priv->last_popped_seqnum != -1)) {
1580     gint gap;
1581
1582     gap = gst_rtp_buffer_compare_seqnum (priv->last_popped_seqnum, seqnum);
1583
1584     /* priv->last_popped_seqnum >= seqnum, we're too late. */
1585     if (G_UNLIKELY (gap <= 0))
1586       goto too_late;
1587   }
1588
1589   /* let's drop oldest packet if the queue is already full and drop-on-latency
1590    * is set. We can only do this when there actually is a latency. When no
1591    * latency is set, we just pump it in the queue and let the other end push it
1592    * out as fast as possible. */
1593   if (priv->latency_ms && priv->drop_on_latency) {
1594     latency_ts =
1595         gst_util_uint64_scale_int (priv->latency_ms, priv->clock_rate, 1000);
1596
1597     if (G_UNLIKELY (rtp_jitter_buffer_get_ts_diff (priv->jbuf) >= latency_ts)) {
1598       GstBuffer *old_buf;
1599
1600       old_buf = rtp_jitter_buffer_pop (priv->jbuf, &percent);
1601
1602       GST_DEBUG_OBJECT (jitterbuffer, "Queue full, dropping old packet %p",
1603           old_buf);
1604
1605       gst_buffer_unref (old_buf);
1606     }
1607   }
1608
1609   /* we need to make the metadata writable before pushing it in the jitterbuffer
1610    * because the jitterbuffer will update the PTS */
1611   buffer = gst_buffer_make_writable (buffer);
1612   GST_BUFFER_DTS (buffer) = dts;
1613   GST_BUFFER_PTS (buffer) = pts;
1614
1615   /* now insert the packet into the queue in sorted order. This function returns
1616    * FALSE if a packet with the same seqnum was already in the queue, meaning we
1617    * have a duplicate. */
1618   if (G_UNLIKELY (!rtp_jitter_buffer_insert (priv->jbuf, buffer, dts,
1619               priv->clock_rate, &tail, &percent)))
1620     goto duplicate;
1621
1622   /* we had an unhandled SR, handle it now */
1623   if (priv->last_sr)
1624     do_handle_sync (jitterbuffer);
1625
1626   /* signal addition of new buffer when the _loop is waiting. */
1627   if (priv->waiting && priv->active)
1628     JBUF_SIGNAL (priv);
1629
1630   /* let's unschedule and unblock any waiting buffers. We only want to do this
1631    * when the tail buffer changed */
1632   if (G_UNLIKELY (priv->clock_id && tail)) {
1633     GST_DEBUG_OBJECT (jitterbuffer, "Unscheduling waiting new buffer");
1634     unschedule_current_timer (jitterbuffer);
1635   }
1636
1637   GST_DEBUG_OBJECT (jitterbuffer, "Pushed packet #%d, now %d packets, tail: %d",
1638       seqnum, rtp_jitter_buffer_num_packets (priv->jbuf), tail);
1639
1640   check_buffering_percent (jitterbuffer, &percent);
1641
1642 finished:
1643   JBUF_UNLOCK (priv);
1644
1645   if (percent != -1)
1646     post_buffering_percent (jitterbuffer, percent);
1647
1648   return ret;
1649
1650   /* ERRORS */
1651 invalid_buffer:
1652   {
1653     /* this is not fatal but should be filtered earlier */
1654     GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
1655         ("Received invalid RTP payload, dropping"));
1656     gst_buffer_unref (buffer);
1657     return GST_FLOW_OK;
1658   }
1659 no_clock_rate:
1660   {
1661     GST_WARNING_OBJECT (jitterbuffer,
1662         "No clock-rate in caps!, dropping buffer");
1663     gst_buffer_unref (buffer);
1664     goto finished;
1665   }
1666 out_flushing:
1667   {
1668     ret = priv->srcresult;
1669     GST_DEBUG_OBJECT (jitterbuffer, "flushing %s", gst_flow_get_name (ret));
1670     gst_buffer_unref (buffer);
1671     goto finished;
1672   }
1673 have_eos:
1674   {
1675     ret = GST_FLOW_EOS;
1676     GST_WARNING_OBJECT (jitterbuffer, "we are EOS, refusing buffer");
1677     gst_buffer_unref (buffer);
1678     goto finished;
1679   }
1680 too_late:
1681   {
1682     GST_WARNING_OBJECT (jitterbuffer, "Packet #%d too late as #%d was already"
1683         " popped, dropping", seqnum, priv->last_popped_seqnum);
1684     priv->num_late++;
1685     gst_buffer_unref (buffer);
1686     goto finished;
1687   }
1688 duplicate:
1689   {
1690     GST_WARNING_OBJECT (jitterbuffer, "Duplicate packet #%d detected, dropping",
1691         seqnum);
1692     priv->num_duplicates++;
1693     gst_buffer_unref (buffer);
1694     goto finished;
1695   }
1696 }
1697
1698 static GstClockTime
1699 apply_offset (GstRtpJitterBuffer * jitterbuffer, GstClockTime timestamp)
1700 {
1701   GstRtpJitterBufferPrivate *priv;
1702
1703   priv = jitterbuffer->priv;
1704
1705   if (timestamp == -1)
1706     return -1;
1707
1708   /* apply the timestamp offset, this is used for inter stream sync */
1709   timestamp += priv->ts_offset;
1710   /* add the offset, this is used when buffering */
1711   timestamp += priv->out_offset;
1712
1713   return timestamp;
1714 }
1715
1716 static GstClockTime
1717 compute_elapsed (GstRtpJitterBuffer * jitterbuffer, GstBuffer * outbuf)
1718 {
1719   guint64 ext_time, elapsed;
1720   guint32 rtp_time;
1721   GstRtpJitterBufferPrivate *priv;
1722   GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
1723
1724   priv = jitterbuffer->priv;
1725   gst_rtp_buffer_map (outbuf, GST_MAP_READ, &rtp);
1726   rtp_time = gst_rtp_buffer_get_timestamp (&rtp);
1727   gst_rtp_buffer_unmap (&rtp);
1728
1729   GST_LOG_OBJECT (jitterbuffer, "rtp %" G_GUINT32_FORMAT ", ext %"
1730       G_GUINT64_FORMAT, rtp_time, priv->ext_timestamp);
1731
1732   if (rtp_time < priv->ext_timestamp) {
1733     ext_time = priv->ext_timestamp;
1734   } else {
1735     ext_time = gst_rtp_buffer_ext_timestamp (&priv->ext_timestamp, rtp_time);
1736   }
1737
1738   if (ext_time > priv->clock_base)
1739     elapsed = ext_time - priv->clock_base;
1740   else
1741     elapsed = 0;
1742
1743   elapsed = gst_util_uint64_scale_int (elapsed, GST_SECOND, priv->clock_rate);
1744   return elapsed;
1745 }
1746
1747 static void
1748 update_estimated_eos (GstRtpJitterBuffer * jitterbuffer, GstBuffer * outbuf)
1749 {
1750   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1751
1752   if (priv->npt_stop != -1 && priv->ext_timestamp != -1
1753       && priv->clock_base != -1 && priv->clock_rate > 0) {
1754     guint64 elapsed, estimated;
1755
1756     elapsed = compute_elapsed (jitterbuffer, outbuf);
1757
1758     if (elapsed > priv->last_elapsed || !priv->last_elapsed) {
1759       guint64 left;
1760       GstClockTime out_time;
1761
1762       priv->last_elapsed = elapsed;
1763
1764       left = priv->npt_stop - priv->npt_start;
1765       GST_LOG_OBJECT (jitterbuffer, "left %" GST_TIME_FORMAT,
1766           GST_TIME_ARGS (left));
1767
1768       out_time = GST_BUFFER_DTS (outbuf);
1769
1770       if (elapsed > 0)
1771         estimated = gst_util_uint64_scale (out_time, left, elapsed);
1772       else {
1773         /* if there is almost nothing left,
1774          * we may never advance enough to end up in the above case */
1775         if (left < GST_SECOND)
1776           estimated = GST_SECOND;
1777         else
1778           estimated = -1;
1779       }
1780
1781       GST_LOG_OBJECT (jitterbuffer, "elapsed %" GST_TIME_FORMAT ", estimated %"
1782           GST_TIME_FORMAT, GST_TIME_ARGS (elapsed), GST_TIME_ARGS (estimated));
1783
1784       if (estimated != -1 && priv->estimated_eos != estimated) {
1785         set_timer (jitterbuffer, TIMER_TYPE_EOS, -1, estimated);
1786         priv->estimated_eos = estimated;
1787       }
1788     }
1789   }
1790 }
1791
1792 /* take a buffer from the queue and push it */
1793 static GstFlowReturn
1794 pop_and_push_next (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum)
1795 {
1796   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1797   GstFlowReturn result;
1798   GstBuffer *outbuf;
1799   GstClockTime dts, pts;
1800   gint percent = -1;
1801
1802   /* when we get here we are ready to pop and push the buffer */
1803   outbuf = rtp_jitter_buffer_pop (priv->jbuf, &percent);
1804
1805   check_buffering_percent (jitterbuffer, &percent);
1806
1807   if (G_UNLIKELY (priv->discont)) {
1808     /* set DISCONT flag when we missed a packet. We pushed the buffer writable
1809      * into the jitterbuffer so we can modify now. */
1810     GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_DISCONT);
1811     priv->discont = FALSE;
1812   }
1813   if (G_UNLIKELY (priv->ts_discont)) {
1814     GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_RESYNC);
1815     priv->ts_discont = FALSE;
1816   }
1817
1818   dts = GST_BUFFER_DTS (outbuf);
1819   pts = GST_BUFFER_PTS (outbuf);
1820
1821   /* apply timestamp with offset to buffer now */
1822   GST_BUFFER_DTS (outbuf) = apply_offset (jitterbuffer, dts);
1823   GST_BUFFER_PTS (outbuf) = apply_offset (jitterbuffer, pts);
1824
1825   /* update the elapsed time when we need to check against the npt stop time. */
1826   update_estimated_eos (jitterbuffer, outbuf);
1827
1828   /* now we are ready to push the buffer. Save the seqnum and release the lock
1829    * so the other end can push stuff in the queue again. */
1830   priv->last_popped_seqnum = seqnum;
1831   priv->last_out_time = GST_BUFFER_PTS (outbuf);
1832   priv->last_out_dts = dts;
1833   priv->last_out_pts = pts;
1834   priv->next_seqnum = (seqnum + 1) & 0xffff;
1835   JBUF_UNLOCK (priv);
1836
1837   if (percent != -1)
1838     post_buffering_percent (jitterbuffer, percent);
1839
1840   /* push buffer */
1841   GST_DEBUG_OBJECT (jitterbuffer,
1842       "Pushing buffer %d, dts %" GST_TIME_FORMAT ", pts %" GST_TIME_FORMAT,
1843       seqnum, GST_TIME_ARGS (GST_BUFFER_DTS (outbuf)),
1844       GST_TIME_ARGS (GST_BUFFER_PTS (outbuf)));
1845
1846   result = gst_pad_push (priv->srcpad, outbuf);
1847
1848   JBUF_LOCK_CHECK (priv, out_flushing);
1849
1850   return result;
1851
1852   /* ERRORS */
1853 out_flushing:
1854   {
1855     return priv->srcresult;
1856   }
1857 }
1858
1859 static GstClockTime
1860 estimate_dts (GstRtpJitterBuffer * jitterbuffer, GstClockTime dts, gint gap)
1861 {
1862   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1863   GstClockTime duration;
1864
1865   if (dts == -1 || priv->last_out_dts == -1)
1866     return dts;
1867
1868   GST_DEBUG_OBJECT (jitterbuffer,
1869       "dts %" GST_TIME_FORMAT ", last %" GST_TIME_FORMAT,
1870       GST_TIME_ARGS (dts), GST_TIME_ARGS (priv->last_out_dts));
1871
1872   /* interpolate between the current time and the last time based on
1873    * number of packets we are missing, this is the estimated duration
1874    * for the missing packet based on equidistant packet spacing. Also make
1875    * sure we never go negative. */
1876   if (dts >= priv->last_out_dts)
1877     duration = (dts - priv->last_out_dts) / (gap + 1);
1878   else
1879     /* packet already lost, timer will timeout quickly */
1880     duration = 0;
1881
1882   GST_DEBUG_OBJECT (jitterbuffer, "duration %" GST_TIME_FORMAT,
1883       GST_TIME_ARGS (duration));
1884
1885   /* add this duration to the timestamp of the last packet we pushed */
1886   dts = (priv->last_out_dts + duration);
1887
1888   return dts;
1889 }
1890
1891 #define GST_FLOW_WAIT GST_FLOW_CUSTOM_SUCCESS
1892
1893 /* Peek a buffer and compare the seqnum to the expected seqnum.
1894  * If all is fine, the buffer is pushed.
1895  * If something is wrong, a timeout is set. We set 2 kinds of timeouts:
1896  *   * deadline: to the ultimate time we can still push the packet. We
1897  *     do this for the first packet to make sure we have the previous
1898  *     packets.
1899  *   * lost: the ultimate time we can receive a packet before we have
1900  *     to consider it lost. We estimate this based on the packet
1901  *     spacing.
1902  */
1903 static GstFlowReturn
1904 handle_next_buffer (GstRtpJitterBuffer * jitterbuffer)
1905 {
1906   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1907   GstFlowReturn result = GST_FLOW_OK;
1908   GstBuffer *outbuf;
1909   guint16 seqnum;
1910   GstClockTime dts;
1911   guint32 next_seqnum;
1912   gint gap;
1913   GstRTPBuffer rtp = { NULL, };
1914
1915   /* only push buffers when PLAYING and active and not buffering */
1916   if (priv->blocked || !priv->active ||
1917       rtp_jitter_buffer_is_buffering (priv->jbuf))
1918     return GST_FLOW_WAIT;
1919
1920 again:
1921   /* peek a buffer, we're just looking at the sequence number.
1922    * If all is fine, we'll pop and push it. If the sequence number is wrong we
1923    * wait on the DTS. In the chain function we will unlock the wait when a
1924    * new buffer is available. The peeked buffer is valid for as long as we hold
1925    * the jitterbuffer lock. */
1926   outbuf = rtp_jitter_buffer_peek (priv->jbuf);
1927   if (outbuf == NULL)
1928     goto wait;
1929
1930   /* get the seqnum and the next expected seqnum */
1931   gst_rtp_buffer_map (outbuf, GST_MAP_READ, &rtp);
1932   seqnum = gst_rtp_buffer_get_seq (&rtp);
1933   gst_rtp_buffer_unmap (&rtp);
1934
1935   next_seqnum = priv->next_seqnum;
1936
1937   dts = GST_BUFFER_DTS (outbuf);
1938
1939   /* get the gap between this and the previous packet. If we don't know the
1940    * previous packet seqnum assume no gap. */
1941   if (G_UNLIKELY (next_seqnum == -1)) {
1942     GST_DEBUG_OBJECT (jitterbuffer, "First buffer #%d", seqnum);
1943     /* we don't know what the next_seqnum should be, wait for the last
1944      * possible moment to push this buffer, maybe we get an earlier seqnum
1945      * while we wait */
1946     set_timer (jitterbuffer, TIMER_TYPE_DEADLINE, seqnum, dts);
1947     result = GST_FLOW_WAIT;
1948   } else {
1949     /* else calculate GAP */
1950     gap = gst_rtp_buffer_compare_seqnum (next_seqnum, seqnum);
1951
1952     if (G_LIKELY (gap == 0)) {
1953       /* no missing packet, pop and push */
1954       result = pop_and_push_next (jitterbuffer, seqnum);
1955     } else if (G_UNLIKELY (gap < 0)) {
1956       /* if we have a packet that we already pushed or considered dropped, pop it
1957        * off and get the next packet */
1958       GST_DEBUG_OBJECT (jitterbuffer, "Old packet #%d, next #%d dropping",
1959           seqnum, next_seqnum);
1960       outbuf = rtp_jitter_buffer_pop (priv->jbuf, NULL);
1961       gst_buffer_unref (outbuf);
1962       goto again;
1963     } else {
1964       GST_DEBUG_OBJECT (jitterbuffer,
1965           "Sequence number GAP detected: expected %d instead of %d (%d missing)",
1966           next_seqnum, seqnum, gap);
1967       /* packet missing, estimate when we should ultimately push this packet */
1968       dts = estimate_dts (jitterbuffer, dts, gap);
1969       /* and set a timer for it */
1970       set_timer (jitterbuffer, TIMER_TYPE_LOST, next_seqnum, dts);
1971       result = GST_FLOW_WAIT;
1972     }
1973   }
1974   return result;
1975
1976 wait:
1977   {
1978     GST_DEBUG_OBJECT (jitterbuffer, "no buffer, going to wait");
1979     return GST_FLOW_WAIT;
1980   }
1981 }
1982
1983 /* a packet is lost */
1984 static GstFlowReturn
1985 do_lost_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
1986     GstClockTimeDiff clock_jitter)
1987 {
1988   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1989   GstClockTime duration = GST_CLOCK_TIME_NONE;
1990   guint32 lost_packets = 1;
1991   gboolean lost_packets_late = FALSE;
1992
1993 #if 0
1994   if (clock_jitter > 0
1995       && clock_jitter > (priv->latency_ns + priv->peer_latency)) {
1996     GstClockTimeDiff total_duration;
1997     GstClockTime out_time_diff;
1998
1999     out_time_diff =
2000         apply_offset (jitterbuffer, timer->timeout) - timer->timeout;
2001     total_duration = MIN (out_time_diff, clock_jitter);
2002
2003     if (duration > 0)
2004       lost_packets = total_duration / duration;
2005     else
2006       lost_packets = gap;
2007     total_duration = lost_packets * duration;
2008
2009     GST_DEBUG_OBJECT (jitterbuffer,
2010         "Current sync_time has expired a long time ago (+%" GST_TIME_FORMAT
2011         ") Cover up %d lost packets with duration %" GST_TIME_FORMAT,
2012         GST_TIME_ARGS (clock_jitter),
2013         lost_packets, GST_TIME_ARGS (total_duration));
2014
2015     duration = total_duration;
2016     lost_packets_late = TRUE;
2017   }
2018 #endif
2019
2020   /* we had a gap and thus we lost some packets. Create an event for this.  */
2021   if (lost_packets > 1)
2022     GST_DEBUG_OBJECT (jitterbuffer, "Packets #%d -> #%d lost", timer->seqnum,
2023         timer->seqnum + lost_packets - 1);
2024   else
2025     GST_DEBUG_OBJECT (jitterbuffer, "Packet #%d lost", timer->seqnum);
2026
2027   priv->num_late += lost_packets;
2028   priv->discont = TRUE;
2029
2030   /* update our expected next packet */
2031   priv->last_popped_seqnum = timer->seqnum;
2032   priv->last_out_time = apply_offset (jitterbuffer, timer->timeout);
2033   priv->last_out_dts = timer->timeout;
2034   priv->last_out_pts = timer->timeout;
2035   priv->next_seqnum = (timer->seqnum + lost_packets) & 0xffff;
2036   /* remove timer now */
2037   remove_timer (jitterbuffer, timer);
2038
2039   if (priv->do_lost) {
2040     GstEvent *event;
2041
2042     /* create paket lost event */
2043     event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM,
2044         gst_structure_new ("GstRTPPacketLost",
2045             "seqnum", G_TYPE_UINT, (guint) priv->last_popped_seqnum,
2046             "timestamp", G_TYPE_UINT64, priv->last_out_time,
2047             "duration", G_TYPE_UINT64, duration,
2048             "late", G_TYPE_BOOLEAN, lost_packets_late, NULL));
2049     JBUF_UNLOCK (priv);
2050     gst_pad_push_event (priv->srcpad, event);
2051     JBUF_LOCK_CHECK (priv, flushing);
2052   }
2053   return GST_FLOW_OK;
2054
2055   /* ERRORS */
2056 flushing:
2057   {
2058     GST_DEBUG_OBJECT (jitterbuffer, "we are flushing");
2059     return priv->srcresult;
2060   }
2061 }
2062
2063 static GstFlowReturn
2064 do_eos_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
2065 {
2066   GST_INFO_OBJECT (jitterbuffer, "got the NPT timeout");
2067   remove_timer (jitterbuffer, timer);
2068
2069   return GST_FLOW_EOS;
2070 }
2071
2072 /* called when we need to wait for the next timeout.
2073  *
2074  * We loop over the array of recorded timeouts and wait for the earliest one.
2075  * When it timed out, do the logic associated with the timer.
2076  *
2077  * If there are no timers, we wait on a gcond until something new happens.
2078  */
2079 static GstFlowReturn
2080 wait_next_timeout (GstRtpJitterBuffer * jitterbuffer)
2081 {
2082   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2083   GstFlowReturn result = GST_FLOW_OK;
2084   gint i, len;
2085   TimerData *timer = NULL;
2086   GstClockTime timer_timeout = -1;
2087   gint timer_idx;
2088
2089   len = priv->timers->len;
2090   for (i = 0; i < len; i++) {
2091     TimerData *test = &g_array_index (priv->timers, TimerData, i);
2092     GstClockTime test_timeout;
2093
2094     GST_DEBUG_OBJECT (jitterbuffer, "%d, %d, %" GST_TIME_FORMAT,
2095         i, test->seqnum, GST_TIME_ARGS (test->timeout));
2096
2097     test_timeout = test->timeout;
2098     if (test_timeout == -1) {
2099       timer = test;
2100       timer_timeout = test_timeout;
2101       break;
2102     }
2103
2104     if (test->type != TIMER_TYPE_EXPECTED) {
2105       /* add our latency and offset to get output times. */
2106       test_timeout = apply_offset (jitterbuffer, test_timeout);
2107       test_timeout += priv->latency_ns;
2108     }
2109
2110     /* find the smallest timeout */
2111     if (timer == NULL || test_timeout < timer_timeout) {
2112       timer = test;
2113       timer_timeout = test_timeout;
2114     }
2115   }
2116   if (timer) {
2117     GstClock *clock;
2118     GstClockTime sync_time;
2119     GstClockID id;
2120     GstClockReturn ret;
2121     GstClockTimeDiff clock_jitter;
2122
2123     /* no timestamp, timeout immeditately */
2124     if (timer_timeout == -1)
2125       goto do_timeout;
2126
2127     GST_OBJECT_LOCK (jitterbuffer);
2128     clock = GST_ELEMENT_CLOCK (jitterbuffer);
2129     if (!clock) {
2130       GST_OBJECT_UNLOCK (jitterbuffer);
2131       /* let's just push if there is no clock */
2132       GST_DEBUG_OBJECT (jitterbuffer, "No clock, timeout right away");
2133       goto do_timeout;
2134     }
2135
2136     /* prepare for sync against clock */
2137     sync_time = timer_timeout + GST_ELEMENT_CAST (jitterbuffer)->base_time;
2138     /* add latency of peer to get input time */
2139     sync_time += priv->peer_latency;
2140
2141     GST_DEBUG_OBJECT (jitterbuffer, "sync to timestamp %" GST_TIME_FORMAT
2142         " with sync time %" GST_TIME_FORMAT,
2143         GST_TIME_ARGS (timer_timeout), GST_TIME_ARGS (sync_time));
2144
2145     /* create an entry for the clock */
2146     id = priv->clock_id = gst_clock_new_single_shot_id (clock, sync_time);
2147     priv->unscheduled = FALSE;
2148     priv->timer_timeout = timer_timeout;
2149     priv->timer_seqnum = timer->seqnum;
2150     timer_idx = timer->idx;
2151     GST_OBJECT_UNLOCK (jitterbuffer);
2152
2153     /* release the lock so that the other end can push stuff or unlock */
2154     JBUF_UNLOCK (priv);
2155
2156     ret = gst_clock_id_wait (id, &clock_jitter);
2157
2158     JBUF_LOCK (priv);
2159     GST_DEBUG_OBJECT (jitterbuffer, "sync done, %d, %" G_GINT64_FORMAT,
2160         ret, clock_jitter);
2161     /* and free the entry */
2162     gst_clock_id_unref (id);
2163     priv->clock_id = NULL;
2164
2165     /* at this point, the clock could have been unlocked by a timeout, a new
2166      * tail element was added to the queue or because we are shutting down. Check
2167      * for shutdown first. */
2168     if G_UNLIKELY
2169       ((priv->srcresult != GST_FLOW_OK))
2170           goto flushing;
2171
2172     /* we released the lock, the array might have changed */
2173     timer = &g_array_index (priv->timers, TimerData, timer_idx);
2174     /* if changed to timeout immediately, do so */
2175     if (timer->timeout == -1)
2176       goto do_timeout;
2177
2178     /* if we got unscheduled and we are not flushing, it's because a new tail
2179      * element became available in the queue or we flushed the queue.
2180      * Grab it and try to push or sync. */
2181     if (ret == GST_CLOCK_UNSCHEDULED || priv->unscheduled) {
2182       GST_DEBUG_OBJECT (jitterbuffer, "Wait got unscheduled");
2183       goto done;
2184     }
2185
2186   do_timeout:
2187     switch (timer->type) {
2188       case TIMER_TYPE_EXPECTED:
2189         GST_DEBUG_OBJECT (jitterbuffer, "expected %d didn't arrive",
2190             timer->seqnum);
2191         remove_timer (jitterbuffer, timer);
2192         break;
2193       case TIMER_TYPE_LOST:
2194         result = do_lost_timeout (jitterbuffer, timer, clock_jitter);
2195         break;
2196       case TIMER_TYPE_DEADLINE:
2197         priv->next_seqnum = timer->seqnum;
2198         remove_timer (jitterbuffer, timer);
2199         break;
2200       case TIMER_TYPE_EOS:
2201         result = do_eos_timeout (jitterbuffer, timer);
2202         break;
2203     }
2204   } else {
2205     /* no timers, wait for activity */
2206     GST_DEBUG_OBJECT (jitterbuffer, "waiting");
2207     priv->waiting = TRUE;
2208     JBUF_WAIT_CHECK (priv, flushing);
2209     priv->waiting = FALSE;
2210     GST_DEBUG_OBJECT (jitterbuffer, "waiting done");
2211   }
2212
2213 done:
2214   return result;
2215
2216 flushing:
2217   {
2218     GST_DEBUG_OBJECT (jitterbuffer, "we are flushing");
2219     return priv->srcresult;
2220   }
2221 }
2222
2223 /*
2224  * This funcion implements the main pushing loop on the source pad.
2225  *
2226  * It first tries to push as many buffers as possible. If there is a seqnum
2227  * mismatch, a timeout is created and this function goes waiting for the
2228  * next timeout.
2229  */
2230 static void
2231 gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer)
2232 {
2233   GstRtpJitterBufferPrivate *priv;
2234   GstFlowReturn result;
2235
2236   priv = jitterbuffer->priv;
2237
2238   JBUF_LOCK_CHECK (priv, flushing);
2239   do {
2240     result = handle_next_buffer (jitterbuffer);
2241     if (G_LIKELY (result == GST_FLOW_WAIT))
2242       /* now wait for the next event */
2243       result = wait_next_timeout (jitterbuffer);
2244   }
2245   while (result == GST_FLOW_OK);
2246   JBUF_UNLOCK (priv);
2247
2248   /* if we get here we need to pause */
2249   goto pause;
2250
2251   /* ERRORS */
2252 flushing:
2253   {
2254     result = priv->srcresult;
2255     JBUF_UNLOCK (priv);
2256     goto pause;
2257   }
2258 pause:
2259   {
2260     const gchar *reason = gst_flow_get_name (result);
2261     GstEvent *event;
2262
2263     GST_DEBUG_OBJECT (jitterbuffer, "pausing task, reason %s", reason);
2264     gst_pad_pause_task (priv->srcpad);
2265     if (result == GST_FLOW_EOS) {
2266       event = gst_event_new_eos ();
2267       gst_pad_push_event (priv->srcpad, event);
2268     }
2269     return;
2270   }
2271 }
2272
2273 /* collect the info from the lastest RTCP packet and the jitterbuffer sync, do
2274  * some sanity checks and then emit the handle-sync signal with the parameters.
2275  * This function must be called with the LOCK */
2276 static void
2277 do_handle_sync (GstRtpJitterBuffer * jitterbuffer)
2278 {
2279   GstRtpJitterBufferPrivate *priv;
2280   guint64 base_rtptime, base_time;
2281   guint32 clock_rate;
2282   guint64 last_rtptime;
2283   guint64 clock_base;
2284   guint64 ext_rtptime, diff;
2285   gboolean drop = FALSE;
2286
2287   priv = jitterbuffer->priv;
2288
2289   /* get the last values from the jitterbuffer */
2290   rtp_jitter_buffer_get_sync (priv->jbuf, &base_rtptime, &base_time,
2291       &clock_rate, &last_rtptime);
2292
2293   clock_base = priv->clock_base;
2294   ext_rtptime = priv->ext_rtptime;
2295
2296   GST_DEBUG_OBJECT (jitterbuffer, "ext SR %" G_GUINT64_FORMAT ", base %"
2297       G_GUINT64_FORMAT ", clock-rate %" G_GUINT32_FORMAT
2298       ", clock-base %" G_GUINT64_FORMAT ", last-rtptime %" G_GUINT64_FORMAT,
2299       ext_rtptime, base_rtptime, clock_rate, clock_base, last_rtptime);
2300
2301   if (base_rtptime == -1 || clock_rate == -1 || base_time == -1) {
2302     GST_DEBUG_OBJECT (jitterbuffer, "dropping, no RTP values");
2303     drop = TRUE;
2304   } else {
2305     /* we can't accept anything that happened before we did the last resync */
2306     if (base_rtptime > ext_rtptime) {
2307       GST_DEBUG_OBJECT (jitterbuffer, "dropping, older than base time");
2308       drop = TRUE;
2309     } else {
2310       /* the SR RTP timestamp must be something close to what we last observed
2311        * in the jitterbuffer */
2312       if (ext_rtptime > last_rtptime) {
2313         /* check how far ahead it is to our RTP timestamps */
2314         diff = ext_rtptime - last_rtptime;
2315         /* if bigger than 1 second, we drop it */
2316         if (diff > clock_rate) {
2317           GST_DEBUG_OBJECT (jitterbuffer, "too far ahead");
2318           /* should drop this, but some RTSP servers end up with bogus
2319            * way too ahead RTCP packet when repeated PAUSE/PLAY,
2320            * so still trigger rptbin sync but invalidate RTCP data
2321            * (sync might use other methods) */
2322           ext_rtptime = -1;
2323         }
2324         GST_DEBUG_OBJECT (jitterbuffer, "ext last %" G_GUINT64_FORMAT ", diff %"
2325             G_GUINT64_FORMAT, last_rtptime, diff);
2326       }
2327     }
2328   }
2329
2330   if (!drop) {
2331     GstStructure *s;
2332
2333     s = gst_structure_new ("application/x-rtp-sync",
2334         "base-rtptime", G_TYPE_UINT64, base_rtptime,
2335         "base-time", G_TYPE_UINT64, base_time,
2336         "clock-rate", G_TYPE_UINT, clock_rate,
2337         "clock-base", G_TYPE_UINT64, clock_base,
2338         "sr-ext-rtptime", G_TYPE_UINT64, ext_rtptime,
2339         "sr-buffer", GST_TYPE_BUFFER, priv->last_sr, NULL);
2340
2341     GST_DEBUG_OBJECT (jitterbuffer, "signaling sync");
2342     gst_buffer_replace (&priv->last_sr, NULL);
2343     JBUF_UNLOCK (priv);
2344     g_signal_emit (jitterbuffer,
2345         gst_rtp_jitter_buffer_signals[SIGNAL_HANDLE_SYNC], 0, s);
2346     JBUF_LOCK (priv);
2347     gst_structure_free (s);
2348   } else {
2349     GST_DEBUG_OBJECT (jitterbuffer, "dropping RTCP packet");
2350   }
2351 }
2352
2353 static GstFlowReturn
2354 gst_rtp_jitter_buffer_chain_rtcp (GstPad * pad, GstObject * parent,
2355     GstBuffer * buffer)
2356 {
2357   GstRtpJitterBuffer *jitterbuffer;
2358   GstRtpJitterBufferPrivate *priv;
2359   GstFlowReturn ret = GST_FLOW_OK;
2360   guint32 ssrc;
2361   GstRTCPPacket packet;
2362   guint64 ext_rtptime;
2363   guint32 rtptime;
2364   GstRTCPBuffer rtcp = { NULL, };
2365
2366   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
2367
2368   if (G_UNLIKELY (!gst_rtcp_buffer_validate (buffer)))
2369     goto invalid_buffer;
2370
2371   priv = jitterbuffer->priv;
2372
2373   gst_rtcp_buffer_map (buffer, GST_MAP_READ, &rtcp);
2374
2375   if (!gst_rtcp_buffer_get_first_packet (&rtcp, &packet))
2376     goto empty_buffer;
2377
2378   /* first packet must be SR or RR or else the validate would have failed */
2379   switch (gst_rtcp_packet_get_type (&packet)) {
2380     case GST_RTCP_TYPE_SR:
2381       gst_rtcp_packet_sr_get_sender_info (&packet, &ssrc, NULL, &rtptime,
2382           NULL, NULL);
2383       break;
2384     default:
2385       goto ignore_buffer;
2386   }
2387   gst_rtcp_buffer_unmap (&rtcp);
2388
2389   GST_DEBUG_OBJECT (jitterbuffer, "received RTCP of SSRC %08x", ssrc);
2390
2391   JBUF_LOCK (priv);
2392   /* convert the RTP timestamp to our extended timestamp, using the same offset
2393    * we used in the jitterbuffer */
2394   ext_rtptime = priv->jbuf->ext_rtptime;
2395   ext_rtptime = gst_rtp_buffer_ext_timestamp (&ext_rtptime, rtptime);
2396
2397   priv->ext_rtptime = ext_rtptime;
2398   gst_buffer_replace (&priv->last_sr, buffer);
2399
2400   do_handle_sync (jitterbuffer);
2401   JBUF_UNLOCK (priv);
2402
2403 done:
2404   gst_buffer_unref (buffer);
2405
2406   return ret;
2407
2408 invalid_buffer:
2409   {
2410     /* this is not fatal but should be filtered earlier */
2411     GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
2412         ("Received invalid RTCP payload, dropping"));
2413     ret = GST_FLOW_OK;
2414     goto done;
2415   }
2416 empty_buffer:
2417   {
2418     /* this is not fatal but should be filtered earlier */
2419     GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
2420         ("Received empty RTCP payload, dropping"));
2421     gst_rtcp_buffer_unmap (&rtcp);
2422     ret = GST_FLOW_OK;
2423     goto done;
2424   }
2425 ignore_buffer:
2426   {
2427     GST_DEBUG_OBJECT (jitterbuffer, "ignoring RTCP packet");
2428     gst_rtcp_buffer_unmap (&rtcp);
2429     ret = GST_FLOW_OK;
2430     goto done;
2431   }
2432 }
2433
2434 static gboolean
2435 gst_rtp_jitter_buffer_sink_query (GstPad * pad, GstObject * parent,
2436     GstQuery * query)
2437 {
2438   gboolean res = FALSE;
2439
2440   switch (GST_QUERY_TYPE (query)) {
2441     case GST_QUERY_CAPS:
2442     {
2443       GstCaps *filter, *caps;
2444
2445       gst_query_parse_caps (query, &filter);
2446       caps = gst_rtp_jitter_buffer_getcaps (pad, filter);
2447       gst_query_set_caps_result (query, caps);
2448       gst_caps_unref (caps);
2449       res = TRUE;
2450       break;
2451     }
2452     default:
2453       if (GST_QUERY_IS_SERIALIZED (query)) {
2454         GST_WARNING_OBJECT (pad, "unhandled serialized query");
2455         res = FALSE;
2456       } else {
2457         res = gst_pad_query_default (pad, parent, query);
2458       }
2459       break;
2460   }
2461   return res;
2462 }
2463
2464 static gboolean
2465 gst_rtp_jitter_buffer_src_query (GstPad * pad, GstObject * parent,
2466     GstQuery * query)
2467 {
2468   GstRtpJitterBuffer *jitterbuffer;
2469   GstRtpJitterBufferPrivate *priv;
2470   gboolean res = FALSE;
2471
2472   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
2473   priv = jitterbuffer->priv;
2474
2475   switch (GST_QUERY_TYPE (query)) {
2476     case GST_QUERY_LATENCY:
2477     {
2478       /* We need to send the query upstream and add the returned latency to our
2479        * own */
2480       GstClockTime min_latency, max_latency;
2481       gboolean us_live;
2482       GstClockTime our_latency;
2483
2484       if ((res = gst_pad_peer_query (priv->sinkpad, query))) {
2485         gst_query_parse_latency (query, &us_live, &min_latency, &max_latency);
2486
2487         GST_DEBUG_OBJECT (jitterbuffer, "Peer latency: min %"
2488             GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
2489             GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
2490
2491         /* store this so that we can safely sync on the peer buffers. */
2492         JBUF_LOCK (priv);
2493         priv->peer_latency = min_latency;
2494         our_latency = priv->latency_ns;
2495         JBUF_UNLOCK (priv);
2496
2497         GST_DEBUG_OBJECT (jitterbuffer, "Our latency: %" GST_TIME_FORMAT,
2498             GST_TIME_ARGS (our_latency));
2499
2500         /* we add some latency but can buffer an infinite amount of time */
2501         min_latency += our_latency;
2502         max_latency = -1;
2503
2504         GST_DEBUG_OBJECT (jitterbuffer, "Calculated total latency : min %"
2505             GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
2506             GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
2507
2508         gst_query_set_latency (query, TRUE, min_latency, max_latency);
2509       }
2510       break;
2511     }
2512     case GST_QUERY_POSITION:
2513     {
2514       GstClockTime start, last_out;
2515       GstFormat fmt;
2516
2517       gst_query_parse_position (query, &fmt, NULL);
2518       if (fmt != GST_FORMAT_TIME) {
2519         res = gst_pad_query_default (pad, parent, query);
2520         break;
2521       }
2522
2523       JBUF_LOCK (priv);
2524       start = priv->npt_start;
2525       last_out = priv->last_out_time;
2526       JBUF_UNLOCK (priv);
2527
2528       GST_DEBUG_OBJECT (jitterbuffer, "npt start %" GST_TIME_FORMAT
2529           ", last out %" GST_TIME_FORMAT, GST_TIME_ARGS (start),
2530           GST_TIME_ARGS (last_out));
2531
2532       if (GST_CLOCK_TIME_IS_VALID (start) && GST_CLOCK_TIME_IS_VALID (last_out)) {
2533         /* bring 0-based outgoing time to stream time */
2534         gst_query_set_position (query, GST_FORMAT_TIME, start + last_out);
2535         res = TRUE;
2536       } else {
2537         res = gst_pad_query_default (pad, parent, query);
2538       }
2539       break;
2540     }
2541     case GST_QUERY_CAPS:
2542     {
2543       GstCaps *filter, *caps;
2544
2545       gst_query_parse_caps (query, &filter);
2546       caps = gst_rtp_jitter_buffer_getcaps (pad, filter);
2547       gst_query_set_caps_result (query, caps);
2548       gst_caps_unref (caps);
2549       res = TRUE;
2550       break;
2551     }
2552     default:
2553       res = gst_pad_query_default (pad, parent, query);
2554       break;
2555   }
2556
2557   return res;
2558 }
2559
2560 static void
2561 gst_rtp_jitter_buffer_set_property (GObject * object,
2562     guint prop_id, const GValue * value, GParamSpec * pspec)
2563 {
2564   GstRtpJitterBuffer *jitterbuffer;
2565   GstRtpJitterBufferPrivate *priv;
2566
2567   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
2568   priv = jitterbuffer->priv;
2569
2570   switch (prop_id) {
2571     case PROP_LATENCY:
2572     {
2573       guint new_latency, old_latency;
2574
2575       new_latency = g_value_get_uint (value);
2576
2577       JBUF_LOCK (priv);
2578       old_latency = priv->latency_ms;
2579       priv->latency_ms = new_latency;
2580       priv->latency_ns = priv->latency_ms * GST_MSECOND;
2581       rtp_jitter_buffer_set_delay (priv->jbuf, priv->latency_ns);
2582       JBUF_UNLOCK (priv);
2583
2584       /* post message if latency changed, this will inform the parent pipeline
2585        * that a latency reconfiguration is possible/needed. */
2586       if (new_latency != old_latency) {
2587         GST_DEBUG_OBJECT (jitterbuffer, "latency changed to: %" GST_TIME_FORMAT,
2588             GST_TIME_ARGS (new_latency * GST_MSECOND));
2589
2590         gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer),
2591             gst_message_new_latency (GST_OBJECT_CAST (jitterbuffer)));
2592       }
2593       break;
2594     }
2595     case PROP_DROP_ON_LATENCY:
2596       JBUF_LOCK (priv);
2597       priv->drop_on_latency = g_value_get_boolean (value);
2598       JBUF_UNLOCK (priv);
2599       break;
2600     case PROP_TS_OFFSET:
2601       JBUF_LOCK (priv);
2602       priv->ts_offset = g_value_get_int64 (value);
2603       priv->ts_discont = TRUE;
2604       JBUF_UNLOCK (priv);
2605       break;
2606     case PROP_DO_LOST:
2607       JBUF_LOCK (priv);
2608       priv->do_lost = g_value_get_boolean (value);
2609       JBUF_UNLOCK (priv);
2610       break;
2611     case PROP_MODE:
2612       JBUF_LOCK (priv);
2613       rtp_jitter_buffer_set_mode (priv->jbuf, g_value_get_enum (value));
2614       JBUF_UNLOCK (priv);
2615       break;
2616     default:
2617       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2618       break;
2619   }
2620 }
2621
2622 static void
2623 gst_rtp_jitter_buffer_get_property (GObject * object,
2624     guint prop_id, GValue * value, GParamSpec * pspec)
2625 {
2626   GstRtpJitterBuffer *jitterbuffer;
2627   GstRtpJitterBufferPrivate *priv;
2628
2629   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
2630   priv = jitterbuffer->priv;
2631
2632   switch (prop_id) {
2633     case PROP_LATENCY:
2634       JBUF_LOCK (priv);
2635       g_value_set_uint (value, priv->latency_ms);
2636       JBUF_UNLOCK (priv);
2637       break;
2638     case PROP_DROP_ON_LATENCY:
2639       JBUF_LOCK (priv);
2640       g_value_set_boolean (value, priv->drop_on_latency);
2641       JBUF_UNLOCK (priv);
2642       break;
2643     case PROP_TS_OFFSET:
2644       JBUF_LOCK (priv);
2645       g_value_set_int64 (value, priv->ts_offset);
2646       JBUF_UNLOCK (priv);
2647       break;
2648     case PROP_DO_LOST:
2649       JBUF_LOCK (priv);
2650       g_value_set_boolean (value, priv->do_lost);
2651       JBUF_UNLOCK (priv);
2652       break;
2653     case PROP_MODE:
2654       JBUF_LOCK (priv);
2655       g_value_set_enum (value, rtp_jitter_buffer_get_mode (priv->jbuf));
2656       JBUF_UNLOCK (priv);
2657       break;
2658     case PROP_PERCENT:
2659     {
2660       gint percent;
2661
2662       JBUF_LOCK (priv);
2663       if (priv->srcresult != GST_FLOW_OK)
2664         percent = 100;
2665       else
2666         percent = rtp_jitter_buffer_get_percent (priv->jbuf);
2667
2668       g_value_set_int (value, percent);
2669       JBUF_UNLOCK (priv);
2670       break;
2671     }
2672     default:
2673       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2674       break;
2675   }
2676 }