upload tizen1.0 source
[framework/multimedia/gst-plugins-good0.10.git] / gst / rtpmanager / gstrtpjitterbuffer.c
1 /*
2  * Farsight Voice+Video library
3  *
4  *  Copyright 2007 Collabora Ltd,
5  *  Copyright 2007 Nokia Corporation
6  *   @author: Philippe Kalaf <philippe.kalaf@collabora.co.uk>.
7  *  Copyright 2007 Wim Taymans <wim.taymans@gmail.com>
8  *
9  * This library is free software; you can redistribute it and/or
10  * modify it under the terms of the GNU Library General Public
11  * License as published by the Free Software Foundation; either
12  * version 2 of the License, or (at your option) any later version.
13  *
14  * This library is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17  * Library General Public License for more details.
18  *
19  * You should have received a copy of the GNU Library General Public
20  * License along with this library; if not, write to the
21  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
22  * Boston, MA 02111-1307, USA.
23  *
24  */
25
26 /**
27  * SECTION:element-gstrtpjitterbuffer
28  *
29  * This element reorders and removes duplicate RTP packets as they are received
30  * from a network source. It will also wait for missing packets up to a
31  * configurable time limit using the #GstRtpJitterBuffer:latency property.
32  * Packets arriving too late are considered to be lost packets.
33  *
34  * This element acts as a live element and so adds #GstRtpJitterBuffer:latency
35  * to the pipeline.
36  *
37  * The element needs the clock-rate of the RTP payload in order to estimate the
38  * delay. This information is obtained either from the caps on the sink pad or,
39  * when no caps are present, from the #GstRtpJitterBuffer::request-pt-map signal.
40  * To clear the previous pt-map use the #GstRtpJitterBuffer::clear-pt-map signal.
41  *
42  * This element will automatically be used inside gstrtpbin.
43  *
44  * <refsect2>
45  * <title>Example pipelines</title>
46  * |[
47  * gst-launch rtspsrc location=rtsp://192.168.1.133:8554/mpeg1or2AudioVideoTest ! gstrtpjitterbuffer ! rtpmpvdepay ! mpeg2dec ! xvimagesink
48  * ]| Connect to a streaming server and decode the MPEG video. The jitterbuffer is
49  * inserted into the pipeline to smooth out network jitter and to reorder the
50  * out-of-order RTP packets.
51  * </refsect2>
52  *
53  * Last reviewed on 2007-05-28 (0.10.5)
54  */
55
56 #ifdef HAVE_CONFIG_H
57 #include "config.h"
58 #endif
59
60 #include <stdlib.h>
61 #include <string.h>
62 #include <gst/rtp/gstrtpbuffer.h>
63
64 #include "gstrtpbin-marshal.h"
65
66 #include "gstrtpjitterbuffer.h"
67 #include "rtpjitterbuffer.h"
68 #include "rtpstats.h"
69
70 GST_DEBUG_CATEGORY (rtpjitterbuffer_debug);
71 #define GST_CAT_DEFAULT (rtpjitterbuffer_debug)
72
73 /* RTPJitterBuffer signals and args */
74 enum
75 {
76   SIGNAL_REQUEST_PT_MAP,
77   SIGNAL_CLEAR_PT_MAP,
78   SIGNAL_HANDLE_SYNC,
79   SIGNAL_ON_NPT_STOP,
80   SIGNAL_SET_ACTIVE,
81   LAST_SIGNAL
82 };
83
84 #define DEFAULT_LATENCY_MS      200
85 #define DEFAULT_DROP_ON_LATENCY FALSE
86 #define DEFAULT_TS_OFFSET       0
87 #define DEFAULT_DO_LOST         FALSE
88 #define DEFAULT_MODE            RTP_JITTER_BUFFER_MODE_SLAVE
89 #define DEFAULT_PERCENT         0
90
91 enum
92 {
93   PROP_0,
94   PROP_LATENCY,
95   PROP_DROP_ON_LATENCY,
96   PROP_TS_OFFSET,
97   PROP_DO_LOST,
98   PROP_MODE,
99   PROP_PERCENT,
100   PROP_LAST
101 };
102
103 #define JBUF_LOCK(priv)   (g_mutex_lock ((priv)->jbuf_lock))
104
105 #define JBUF_LOCK_CHECK(priv,label) G_STMT_START {    \
106   JBUF_LOCK (priv);                                   \
107   if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK))    \
108     goto label;                                       \
109 } G_STMT_END
110
111 #define JBUF_UNLOCK(priv) (g_mutex_unlock ((priv)->jbuf_lock))
112 #define JBUF_WAIT(priv)   (g_cond_wait ((priv)->jbuf_cond, (priv)->jbuf_lock))
113
114 #define JBUF_WAIT_CHECK(priv,label) G_STMT_START {    \
115   JBUF_WAIT(priv);                                    \
116   if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK))    \
117     goto label;                                       \
118 } G_STMT_END
119
120 #define JBUF_SIGNAL(priv) (g_cond_signal ((priv)->jbuf_cond))
121
122 struct _GstRtpJitterBufferPrivate
123 {
124   GstPad *sinkpad, *srcpad;
125   GstPad *rtcpsinkpad;
126
127   RTPJitterBuffer *jbuf;
128   GMutex *jbuf_lock;
129   GCond *jbuf_cond;
130   gboolean waiting;
131   gboolean discont;
132   gboolean active;
133   guint64 out_offset;
134
135   /* properties */
136   guint latency_ms;
137   guint64 latency_ns;
138   gboolean drop_on_latency;
139   gint64 ts_offset;
140   gboolean do_lost;
141
142   /* the last seqnum we pushed out */
143   guint32 last_popped_seqnum;
144   /* the next expected seqnum we push */
145   guint32 next_seqnum;
146   /* last output time */
147   GstClockTime last_out_time;
148   /* the next expected seqnum we receive */
149   guint32 next_in_seqnum;
150
151   /* start and stop ranges */
152   GstClockTime npt_start;
153   GstClockTime npt_stop;
154   guint64 ext_timestamp;
155   guint64 last_elapsed;
156   guint64 estimated_eos;
157   GstClockID eos_id;
158   gboolean reached_npt_stop;
159
160   /* state */
161   gboolean eos;
162
163   /* clock rate and rtp timestamp offset */
164   gint last_pt;
165   gint32 clock_rate;
166   gint64 clock_base;
167   gint64 prev_ts_offset;
168
169   /* when we are shutting down */
170   GstFlowReturn srcresult;
171   gboolean blocked;
172
173   /* for sync */
174   GstSegment segment;
175   GstClockID clock_id;
176   gboolean unscheduled;
177   /* the latency of the upstream peer, we have to take this into account when
178    * synchronizing the buffers. */
179   GstClockTime peer_latency;
180
181   /* some accounting */
182   guint64 num_late;
183   guint64 num_duplicates;
184 };
185
186 #define GST_RTP_JITTER_BUFFER_GET_PRIVATE(o) \
187   (G_TYPE_INSTANCE_GET_PRIVATE ((o), GST_TYPE_RTP_JITTER_BUFFER, \
188                                 GstRtpJitterBufferPrivate))
189
190 static GstStaticPadTemplate gst_rtp_jitter_buffer_sink_template =
191 GST_STATIC_PAD_TEMPLATE ("sink",
192     GST_PAD_SINK,
193     GST_PAD_ALWAYS,
194     GST_STATIC_CAPS ("application/x-rtp, "
195         "clock-rate = (int) [ 1, 2147483647 ]"
196         /* "payload = (int) , "
197          * "encoding-name = (string) "
198          */ )
199     );
200
201 static GstStaticPadTemplate gst_rtp_jitter_buffer_sink_rtcp_template =
202 GST_STATIC_PAD_TEMPLATE ("sink_rtcp",
203     GST_PAD_SINK,
204     GST_PAD_REQUEST,
205     GST_STATIC_CAPS ("application/x-rtcp")
206     );
207
208 static GstStaticPadTemplate gst_rtp_jitter_buffer_src_template =
209 GST_STATIC_PAD_TEMPLATE ("src",
210     GST_PAD_SRC,
211     GST_PAD_ALWAYS,
212     GST_STATIC_CAPS ("application/x-rtp"
213         /* "payload = (int) , "
214          * "clock-rate = (int) , "
215          * "encoding-name = (string) "
216          */ )
217     );
218
219 static guint gst_rtp_jitter_buffer_signals[LAST_SIGNAL] = { 0 };
220
221 GST_BOILERPLATE (GstRtpJitterBuffer, gst_rtp_jitter_buffer, GstElement,
222     GST_TYPE_ELEMENT);
223
224 /* object overrides */
225 static void gst_rtp_jitter_buffer_set_property (GObject * object,
226     guint prop_id, const GValue * value, GParamSpec * pspec);
227 static void gst_rtp_jitter_buffer_get_property (GObject * object,
228     guint prop_id, GValue * value, GParamSpec * pspec);
229 static void gst_rtp_jitter_buffer_finalize (GObject * object);
230
231 /* element overrides */
232 static GstStateChangeReturn gst_rtp_jitter_buffer_change_state (GstElement
233     * element, GstStateChange transition);
234 static GstPad *gst_rtp_jitter_buffer_request_new_pad (GstElement * element,
235     GstPadTemplate * templ, const gchar * name);
236 static void gst_rtp_jitter_buffer_release_pad (GstElement * element,
237     GstPad * pad);
238 static GstClock *gst_rtp_jitter_buffer_provide_clock (GstElement * element);
239
240 /* pad overrides */
241 static GstCaps *gst_rtp_jitter_buffer_getcaps (GstPad * pad);
242 static GstIterator *gst_rtp_jitter_buffer_iterate_internal_links (GstPad * pad);
243
244 /* sinkpad overrides */
245 static gboolean gst_jitter_buffer_sink_setcaps (GstPad * pad, GstCaps * caps);
246 static gboolean gst_rtp_jitter_buffer_sink_event (GstPad * pad,
247     GstEvent * event);
248 static GstFlowReturn gst_rtp_jitter_buffer_chain (GstPad * pad,
249     GstBuffer * buffer);
250
251 static gboolean gst_rtp_jitter_buffer_sink_rtcp_event (GstPad * pad,
252     GstEvent * event);
253 static GstFlowReturn gst_rtp_jitter_buffer_chain_rtcp (GstPad * pad,
254     GstBuffer * buffer);
255
256 /* srcpad overrides */
257 static gboolean gst_rtp_jitter_buffer_src_event (GstPad * pad,
258     GstEvent * event);
259 static gboolean
260 gst_rtp_jitter_buffer_src_activate_push (GstPad * pad, gboolean active);
261 static void gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer);
262 static gboolean gst_rtp_jitter_buffer_query (GstPad * pad, GstQuery * query);
263
264 static void
265 gst_rtp_jitter_buffer_clear_pt_map (GstRtpJitterBuffer * jitterbuffer);
266 static GstClockTime
267 gst_rtp_jitter_buffer_set_active (GstRtpJitterBuffer * jitterbuffer,
268     gboolean active, guint64 base_time);
269
270 static void
271 gst_rtp_jitter_buffer_base_init (gpointer klass)
272 {
273   GstElementClass *element_class = GST_ELEMENT_CLASS (klass);
274
275   gst_element_class_add_pad_template (element_class,
276       gst_static_pad_template_get (&gst_rtp_jitter_buffer_src_template));
277   gst_element_class_add_pad_template (element_class,
278       gst_static_pad_template_get (&gst_rtp_jitter_buffer_sink_template));
279   gst_element_class_add_pad_template (element_class,
280       gst_static_pad_template_get (&gst_rtp_jitter_buffer_sink_rtcp_template));
281
282   gst_element_class_set_details_simple (element_class,
283       "RTP packet jitter-buffer", "Filter/Network/RTP",
284       "A buffer that deals with network jitter and other transmission faults",
285       "Philippe Kalaf <philippe.kalaf@collabora.co.uk>, "
286       "Wim Taymans <wim.taymans@gmail.com>");
287 }
288
289 static void
290 gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass)
291 {
292   GObjectClass *gobject_class;
293   GstElementClass *gstelement_class;
294
295   gobject_class = (GObjectClass *) klass;
296   gstelement_class = (GstElementClass *) klass;
297
298   g_type_class_add_private (klass, sizeof (GstRtpJitterBufferPrivate));
299
300   gobject_class->finalize = gst_rtp_jitter_buffer_finalize;
301
302   gobject_class->set_property = gst_rtp_jitter_buffer_set_property;
303   gobject_class->get_property = gst_rtp_jitter_buffer_get_property;
304
305   /**
306    * GstRtpJitterBuffer::latency:
307    *
308    * The maximum latency of the jitterbuffer. Packets will be kept in the buffer
309    * for at most this time.
310    */
311   g_object_class_install_property (gobject_class, PROP_LATENCY,
312       g_param_spec_uint ("latency", "Buffer latency in ms",
313           "Amount of ms to buffer", 0, G_MAXUINT, DEFAULT_LATENCY_MS,
314           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
315   /**
316    * GstRtpJitterBuffer::drop-on-latency:
317    *
318    * Drop oldest buffers when the queue is completely filled.
319    */
320   g_object_class_install_property (gobject_class, PROP_DROP_ON_LATENCY,
321       g_param_spec_boolean ("drop-on-latency",
322           "Drop buffers when maximum latency is reached",
323           "Tells the jitterbuffer to never exceed the given latency in size",
324           DEFAULT_DROP_ON_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
325   /**
326    * GstRtpJitterBuffer::ts-offset:
327    *
328    * Adjust GStreamer output buffer timestamps in the jitterbuffer with offset.
329    * This is mainly used to ensure interstream synchronisation.
330    */
331   g_object_class_install_property (gobject_class, PROP_TS_OFFSET,
332       g_param_spec_int64 ("ts-offset", "Timestamp Offset",
333           "Adjust buffer timestamps with offset in nanoseconds", G_MININT64,
334           G_MAXINT64, DEFAULT_TS_OFFSET,
335           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
336
337   /**
338    * GstRtpJitterBuffer::do-lost:
339    *
340    * Send out a GstRTPPacketLost event downstream when a packet is considered
341    * lost.
342    */
343   g_object_class_install_property (gobject_class, PROP_DO_LOST,
344       g_param_spec_boolean ("do-lost", "Do Lost",
345           "Send an event downstream when a packet is lost", DEFAULT_DO_LOST,
346           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
347
348   /**
349    * GstRtpJitterBuffer::mode:
350    *
351    * Control the buffering and timestamping mode used by the jitterbuffer.
352    */
353   g_object_class_install_property (gobject_class, PROP_MODE,
354       g_param_spec_enum ("mode", "Mode",
355           "Control the buffering algorithm in use", RTP_TYPE_JITTER_BUFFER_MODE,
356           DEFAULT_MODE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
357   /**
358    * GstRtpJitterBuffer::percent:
359    *
360    * The percent of the jitterbuffer that is filled.
361    *
362    * Since: 0.10.19
363    */
364   g_object_class_install_property (gobject_class, PROP_PERCENT,
365       g_param_spec_int ("percent", "percent",
366           "The buffer filled percent", 0, 100,
367           0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
368   /**
369    * GstRtpJitterBuffer::request-pt-map:
370    * @buffer: the object which received the signal
371    * @pt: the pt
372    *
373    * Request the payload type as #GstCaps for @pt.
374    */
375   gst_rtp_jitter_buffer_signals[SIGNAL_REQUEST_PT_MAP] =
376       g_signal_new ("request-pt-map", G_TYPE_FROM_CLASS (klass),
377       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
378           request_pt_map), NULL, NULL, gst_rtp_bin_marshal_BOXED__UINT,
379       GST_TYPE_CAPS, 1, G_TYPE_UINT);
380   /**
381    * GstRtpJitterBuffer::handle-sync:
382    * @buffer: the object which received the signal
383    * @struct: a GstStructure containing sync values.
384    *
385    * Be notified of new sync values.
386    */
387   gst_rtp_jitter_buffer_signals[SIGNAL_HANDLE_SYNC] =
388       g_signal_new ("handle-sync", G_TYPE_FROM_CLASS (klass),
389       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
390           handle_sync), NULL, NULL, g_cclosure_marshal_VOID__BOXED,
391       G_TYPE_NONE, 1, GST_TYPE_STRUCTURE | G_SIGNAL_TYPE_STATIC_SCOPE);
392
393   /**
394    * GstRtpJitterBuffer::on-npt-stop
395    * @buffer: the object which received the signal
396    *
397    * Signal that the jitterbufer has pushed the RTP packet that corresponds to
398    * the npt-stop position.
399    */
400   gst_rtp_jitter_buffer_signals[SIGNAL_ON_NPT_STOP] =
401       g_signal_new ("on-npt-stop", G_TYPE_FROM_CLASS (klass),
402       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
403           on_npt_stop), NULL, NULL, g_cclosure_marshal_VOID__VOID,
404       G_TYPE_NONE, 0, G_TYPE_NONE);
405
406   /**
407    * GstRtpJitterBuffer::clear-pt-map:
408    * @buffer: the object which received the signal
409    *
410    * Invalidate the clock-rate as obtained with the
411    * #GstRtpJitterBuffer::request-pt-map signal.
412    */
413   gst_rtp_jitter_buffer_signals[SIGNAL_CLEAR_PT_MAP] =
414       g_signal_new ("clear-pt-map", G_TYPE_FROM_CLASS (klass),
415       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
416       G_STRUCT_OFFSET (GstRtpJitterBufferClass, clear_pt_map), NULL, NULL,
417       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
418
419   /**
420    * GstRtpJitterBuffer::set-active:
421    * @buffer: the object which received the signal
422    *
423    * Start pushing out packets with the given base time. This signal is only
424    * useful in buffering mode.
425    *
426    * Returns: the time of the last pushed packet.
427    *
428    * Since: 0.10.19
429    */
430   gst_rtp_jitter_buffer_signals[SIGNAL_SET_ACTIVE] =
431       g_signal_new ("set-active", G_TYPE_FROM_CLASS (klass),
432       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
433       G_STRUCT_OFFSET (GstRtpJitterBufferClass, set_active), NULL, NULL,
434       gst_rtp_bin_marshal_UINT64__BOOL_UINT64, G_TYPE_UINT64, 2, G_TYPE_BOOLEAN,
435       G_TYPE_UINT64);
436
437   gstelement_class->change_state =
438       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_change_state);
439   gstelement_class->request_new_pad =
440       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_request_new_pad);
441   gstelement_class->release_pad =
442       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_release_pad);
443   gstelement_class->provide_clock =
444       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_provide_clock);
445
446   klass->clear_pt_map = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_clear_pt_map);
447   klass->set_active = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_set_active);
448
449   GST_DEBUG_CATEGORY_INIT
450       (rtpjitterbuffer_debug, "gstrtpjitterbuffer", 0, "RTP Jitter Buffer");
451 }
452
453 static void
454 gst_rtp_jitter_buffer_init (GstRtpJitterBuffer * jitterbuffer,
455     GstRtpJitterBufferClass * klass)
456 {
457   GstRtpJitterBufferPrivate *priv;
458
459   priv = GST_RTP_JITTER_BUFFER_GET_PRIVATE (jitterbuffer);
460   jitterbuffer->priv = priv;
461
462   priv->latency_ms = DEFAULT_LATENCY_MS;
463   priv->latency_ns = priv->latency_ms * GST_MSECOND;
464   priv->drop_on_latency = DEFAULT_DROP_ON_LATENCY;
465   priv->do_lost = DEFAULT_DO_LOST;
466
467   priv->jbuf = rtp_jitter_buffer_new ();
468   priv->jbuf_lock = g_mutex_new ();
469   priv->jbuf_cond = g_cond_new ();
470
471   /* reset skew detection initialy */
472   rtp_jitter_buffer_reset_skew (priv->jbuf);
473   rtp_jitter_buffer_set_delay (priv->jbuf, priv->latency_ns);
474   rtp_jitter_buffer_set_buffering (priv->jbuf, FALSE);
475   priv->active = TRUE;
476
477   priv->srcpad =
478       gst_pad_new_from_static_template (&gst_rtp_jitter_buffer_src_template,
479       "src");
480
481   gst_pad_set_activatepush_function (priv->srcpad,
482       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_activate_push));
483   gst_pad_set_query_function (priv->srcpad,
484       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_query));
485   gst_pad_set_getcaps_function (priv->srcpad,
486       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_getcaps));
487   gst_pad_set_event_function (priv->srcpad,
488       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_event));
489
490   priv->sinkpad =
491       gst_pad_new_from_static_template (&gst_rtp_jitter_buffer_sink_template,
492       "sink");
493
494   gst_pad_set_chain_function (priv->sinkpad,
495       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_chain));
496   gst_pad_set_event_function (priv->sinkpad,
497       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_sink_event));
498   gst_pad_set_setcaps_function (priv->sinkpad,
499       GST_DEBUG_FUNCPTR (gst_jitter_buffer_sink_setcaps));
500   gst_pad_set_getcaps_function (priv->sinkpad,
501       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_getcaps));
502
503   gst_element_add_pad (GST_ELEMENT (jitterbuffer), priv->srcpad);
504   gst_element_add_pad (GST_ELEMENT (jitterbuffer), priv->sinkpad);
505 }
506
507 static void
508 gst_rtp_jitter_buffer_finalize (GObject * object)
509 {
510   GstRtpJitterBuffer *jitterbuffer;
511
512   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
513
514   g_mutex_free (jitterbuffer->priv->jbuf_lock);
515   g_cond_free (jitterbuffer->priv->jbuf_cond);
516
517   g_object_unref (jitterbuffer->priv->jbuf);
518
519   G_OBJECT_CLASS (parent_class)->finalize (object);
520 }
521
522 static GstIterator *
523 gst_rtp_jitter_buffer_iterate_internal_links (GstPad * pad)
524 {
525   GstRtpJitterBuffer *jitterbuffer;
526   GstPad *otherpad = NULL;
527   GstIterator *it;
528
529   jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
530
531   if (pad == jitterbuffer->priv->sinkpad) {
532     otherpad = jitterbuffer->priv->srcpad;
533   } else if (pad == jitterbuffer->priv->srcpad) {
534     otherpad = jitterbuffer->priv->sinkpad;
535   } else if (pad == jitterbuffer->priv->rtcpsinkpad) {
536     otherpad = NULL;
537   }
538
539   it = gst_iterator_new_single (GST_TYPE_PAD, otherpad,
540       (GstCopyFunction) gst_object_ref, (GFreeFunc) gst_object_unref);
541
542   gst_object_unref (jitterbuffer);
543
544   return it;
545 }
546
547 static GstPad *
548 create_rtcp_sink (GstRtpJitterBuffer * jitterbuffer)
549 {
550   GstRtpJitterBufferPrivate *priv;
551
552   priv = jitterbuffer->priv;
553
554   GST_DEBUG_OBJECT (jitterbuffer, "creating RTCP sink pad");
555
556   priv->rtcpsinkpad =
557       gst_pad_new_from_static_template
558       (&gst_rtp_jitter_buffer_sink_rtcp_template, "sink_rtcp");
559   gst_pad_set_chain_function (priv->rtcpsinkpad,
560       gst_rtp_jitter_buffer_chain_rtcp);
561   gst_pad_set_event_function (priv->rtcpsinkpad,
562       (GstPadEventFunction) gst_rtp_jitter_buffer_sink_rtcp_event);
563   gst_pad_set_iterate_internal_links_function (priv->rtcpsinkpad,
564       gst_rtp_jitter_buffer_iterate_internal_links);
565   gst_pad_set_active (priv->rtcpsinkpad, TRUE);
566   gst_element_add_pad (GST_ELEMENT_CAST (jitterbuffer), priv->rtcpsinkpad);
567
568   return priv->rtcpsinkpad;
569 }
570
571 static void
572 remove_rtcp_sink (GstRtpJitterBuffer * jitterbuffer)
573 {
574   GstRtpJitterBufferPrivate *priv;
575
576   priv = jitterbuffer->priv;
577
578   GST_DEBUG_OBJECT (jitterbuffer, "removing RTCP sink pad");
579
580   gst_pad_set_active (priv->rtcpsinkpad, FALSE);
581
582   gst_element_remove_pad (GST_ELEMENT_CAST (jitterbuffer), priv->rtcpsinkpad);
583   priv->rtcpsinkpad = NULL;
584 }
585
586 static GstPad *
587 gst_rtp_jitter_buffer_request_new_pad (GstElement * element,
588     GstPadTemplate * templ, const gchar * name)
589 {
590   GstRtpJitterBuffer *jitterbuffer;
591   GstElementClass *klass;
592   GstPad *result;
593   GstRtpJitterBufferPrivate *priv;
594
595   g_return_val_if_fail (templ != NULL, NULL);
596   g_return_val_if_fail (GST_IS_RTP_JITTER_BUFFER (element), NULL);
597
598   jitterbuffer = GST_RTP_JITTER_BUFFER (element);
599   priv = jitterbuffer->priv;
600   klass = GST_ELEMENT_GET_CLASS (element);
601
602   GST_DEBUG_OBJECT (element, "requesting pad %s", GST_STR_NULL (name));
603
604   /* figure out the template */
605   if (templ == gst_element_class_get_pad_template (klass, "sink_rtcp")) {
606     if (priv->rtcpsinkpad != NULL)
607       goto exists;
608
609     result = create_rtcp_sink (jitterbuffer);
610   } else
611     goto wrong_template;
612
613   return result;
614
615   /* ERRORS */
616 wrong_template:
617   {
618     g_warning ("gstrtpjitterbuffer: this is not our template");
619     return NULL;
620   }
621 exists:
622   {
623     g_warning ("gstrtpjitterbuffer: pad already requested");
624     return NULL;
625   }
626 }
627
628 static void
629 gst_rtp_jitter_buffer_release_pad (GstElement * element, GstPad * pad)
630 {
631   GstRtpJitterBuffer *jitterbuffer;
632   GstRtpJitterBufferPrivate *priv;
633
634   g_return_if_fail (GST_IS_RTP_JITTER_BUFFER (element));
635   g_return_if_fail (GST_IS_PAD (pad));
636
637   jitterbuffer = GST_RTP_JITTER_BUFFER (element);
638   priv = jitterbuffer->priv;
639
640   GST_DEBUG_OBJECT (element, "releasing pad %s:%s", GST_DEBUG_PAD_NAME (pad));
641
642   if (priv->rtcpsinkpad == pad) {
643     remove_rtcp_sink (jitterbuffer);
644   } else
645     goto wrong_pad;
646
647   return;
648
649   /* ERRORS */
650 wrong_pad:
651   {
652     g_warning ("gstjitterbuffer: asked to release an unknown pad");
653     return;
654   }
655 }
656
657 static GstClock *
658 gst_rtp_jitter_buffer_provide_clock (GstElement * element)
659 {
660   return gst_system_clock_obtain ();
661 }
662
663 static void
664 gst_rtp_jitter_buffer_clear_pt_map (GstRtpJitterBuffer * jitterbuffer)
665 {
666   GstRtpJitterBufferPrivate *priv;
667
668   priv = jitterbuffer->priv;
669
670   /* this will trigger a new pt-map request signal, FIXME, do something better. */
671
672   JBUF_LOCK (priv);
673   priv->clock_rate = -1;
674   JBUF_UNLOCK (priv);
675 }
676
677 static GstClockTime
678 gst_rtp_jitter_buffer_set_active (GstRtpJitterBuffer * jbuf, gboolean active,
679     guint64 offset)
680 {
681   GstRtpJitterBufferPrivate *priv;
682   GstClockTime last_out;
683   GstBuffer *head;
684
685   priv = jbuf->priv;
686
687   JBUF_LOCK (priv);
688   GST_DEBUG_OBJECT (jbuf, "setting active %d with offset %" GST_TIME_FORMAT,
689       active, GST_TIME_ARGS (offset));
690
691   if (active != priv->active) {
692     /* add the amount of time spent in paused to the output offset. All
693      * outgoing buffers will have this offset applied to their timestamps in
694      * order to make them arrive in time in the sink. */
695     priv->out_offset = offset;
696     GST_DEBUG_OBJECT (jbuf, "out offset %" GST_TIME_FORMAT,
697         GST_TIME_ARGS (priv->out_offset));
698     priv->active = active;
699     JBUF_SIGNAL (priv);
700   }
701   if (!active) {
702     rtp_jitter_buffer_set_buffering (priv->jbuf, TRUE);
703   }
704   if ((head = rtp_jitter_buffer_peek (priv->jbuf))) {
705     /* head buffer timestamp and offset gives our output time */
706     last_out = GST_BUFFER_TIMESTAMP (head) + priv->ts_offset;
707   } else {
708     /* use last known time when the buffer is empty */
709     last_out = priv->last_out_time;
710   }
711   JBUF_UNLOCK (priv);
712
713   return last_out;
714 }
715
716 static GstCaps *
717 gst_rtp_jitter_buffer_getcaps (GstPad * pad)
718 {
719   GstRtpJitterBuffer *jitterbuffer;
720   GstRtpJitterBufferPrivate *priv;
721   GstPad *other;
722   GstCaps *caps;
723   const GstCaps *templ;
724
725   jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
726   priv = jitterbuffer->priv;
727
728   other = (pad == priv->srcpad ? priv->sinkpad : priv->srcpad);
729
730   caps = gst_pad_peer_get_caps (other);
731
732   templ = gst_pad_get_pad_template_caps (pad);
733   if (caps == NULL) {
734     GST_DEBUG_OBJECT (jitterbuffer, "copy template");
735     caps = gst_caps_copy (templ);
736   } else {
737     GstCaps *intersect;
738
739     GST_DEBUG_OBJECT (jitterbuffer, "intersect with template");
740
741     intersect = gst_caps_intersect (caps, templ);
742     gst_caps_unref (caps);
743
744     caps = intersect;
745   }
746   gst_object_unref (jitterbuffer);
747
748   return caps;
749 }
750
751 /*
752  * Must be called with JBUF_LOCK held
753  */
754
755 static gboolean
756 gst_jitter_buffer_sink_parse_caps (GstRtpJitterBuffer * jitterbuffer,
757     GstCaps * caps)
758 {
759   GstRtpJitterBufferPrivate *priv;
760   GstStructure *caps_struct;
761   guint val;
762   GstClockTime tval;
763
764   priv = jitterbuffer->priv;
765
766   /* first parse the caps */
767   caps_struct = gst_caps_get_structure (caps, 0);
768
769   GST_DEBUG_OBJECT (jitterbuffer, "got caps");
770
771   /* we need a clock-rate to convert the rtp timestamps to GStreamer time and to
772    * measure the amount of data in the buffer */
773   if (!gst_structure_get_int (caps_struct, "clock-rate", &priv->clock_rate))
774     goto error;
775
776   if (priv->clock_rate <= 0)
777     goto wrong_rate;
778
779   GST_DEBUG_OBJECT (jitterbuffer, "got clock-rate %d", priv->clock_rate);
780
781   /* The clock base is the RTP timestamp corrsponding to the npt-start value. We
782    * can use this to track the amount of time elapsed on the sender. */
783   if (gst_structure_get_uint (caps_struct, "clock-base", &val))
784     priv->clock_base = val;
785   else
786     priv->clock_base = -1;
787
788   priv->ext_timestamp = priv->clock_base;
789
790   GST_DEBUG_OBJECT (jitterbuffer, "got clock-base %" G_GINT64_FORMAT,
791       priv->clock_base);
792
793   if (gst_structure_get_uint (caps_struct, "seqnum-base", &val)) {
794     /* first expected seqnum, only update when we didn't have a previous base. */
795     if (priv->next_in_seqnum == -1)
796       priv->next_in_seqnum = val;
797     if (priv->next_seqnum == -1)
798       priv->next_seqnum = val;
799   }
800
801   GST_DEBUG_OBJECT (jitterbuffer, "got seqnum-base %d", priv->next_in_seqnum);
802
803   /* the start and stop times. The seqnum-base corresponds to the start time. We
804    * will keep track of the seqnums on the output and when we reach the one
805    * corresponding to npt-stop, we emit the npt-stop-reached signal */
806   if (gst_structure_get_clock_time (caps_struct, "npt-start", &tval))
807     priv->npt_start = tval;
808   else
809     priv->npt_start = 0;
810
811   if (gst_structure_get_clock_time (caps_struct, "npt-stop", &tval))
812     priv->npt_stop = tval;
813   else
814     priv->npt_stop = -1;
815
816   GST_DEBUG_OBJECT (jitterbuffer,
817       "npt start/stop: %" GST_TIME_FORMAT "-%" GST_TIME_FORMAT,
818       GST_TIME_ARGS (priv->npt_start), GST_TIME_ARGS (priv->npt_stop));
819
820   return TRUE;
821
822   /* ERRORS */
823 error:
824   {
825     GST_DEBUG_OBJECT (jitterbuffer, "No clock-rate in caps!");
826     return FALSE;
827   }
828 wrong_rate:
829   {
830     GST_DEBUG_OBJECT (jitterbuffer, "Invalid clock-rate %d", priv->clock_rate);
831     return FALSE;
832   }
833 }
834
835 static gboolean
836 gst_jitter_buffer_sink_setcaps (GstPad * pad, GstCaps * caps)
837 {
838   GstRtpJitterBuffer *jitterbuffer;
839   GstRtpJitterBufferPrivate *priv;
840   gboolean res;
841
842   jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
843   priv = jitterbuffer->priv;
844
845   JBUF_LOCK (priv);
846   res = gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps);
847   JBUF_UNLOCK (priv);
848
849   /* set same caps on srcpad on success */
850   if (res)
851     gst_pad_set_caps (priv->srcpad, caps);
852
853   gst_object_unref (jitterbuffer);
854
855   return res;
856 }
857
858 static void
859 gst_rtp_jitter_buffer_flush_start (GstRtpJitterBuffer * jitterbuffer)
860 {
861   GstRtpJitterBufferPrivate *priv;
862
863   priv = jitterbuffer->priv;
864
865   JBUF_LOCK (priv);
866   /* mark ourselves as flushing */
867   priv->srcresult = GST_FLOW_WRONG_STATE;
868   GST_DEBUG_OBJECT (jitterbuffer, "Disabling pop on queue");
869   /* this unblocks any waiting pops on the src pad task */
870   JBUF_SIGNAL (priv);
871   /* unlock clock, we just unschedule, the entry will be released by the
872    * locking streaming thread. */
873   if (priv->clock_id) {
874     gst_clock_id_unschedule (priv->clock_id);
875     priv->unscheduled = TRUE;
876   }
877   JBUF_UNLOCK (priv);
878 }
879
880 static void
881 gst_rtp_jitter_buffer_flush_stop (GstRtpJitterBuffer * jitterbuffer)
882 {
883   GstRtpJitterBufferPrivate *priv;
884
885   priv = jitterbuffer->priv;
886
887   JBUF_LOCK (priv);
888   GST_DEBUG_OBJECT (jitterbuffer, "Enabling pop on queue");
889   /* Mark as non flushing */
890   priv->srcresult = GST_FLOW_OK;
891   gst_segment_init (&priv->segment, GST_FORMAT_TIME);
892   priv->last_popped_seqnum = -1;
893   priv->last_out_time = -1;
894   priv->next_seqnum = -1;
895   priv->next_in_seqnum = -1;
896   priv->clock_rate = -1;
897   priv->eos = FALSE;
898   priv->estimated_eos = -1;
899   priv->last_elapsed = 0;
900   priv->reached_npt_stop = FALSE;
901   priv->ext_timestamp = -1;
902   GST_DEBUG_OBJECT (jitterbuffer, "flush and reset jitterbuffer");
903   rtp_jitter_buffer_flush (priv->jbuf);
904   rtp_jitter_buffer_reset_skew (priv->jbuf);
905   JBUF_UNLOCK (priv);
906 }
907
908 static gboolean
909 gst_rtp_jitter_buffer_src_activate_push (GstPad * pad, gboolean active)
910 {
911   gboolean result = TRUE;
912   GstRtpJitterBuffer *jitterbuffer = NULL;
913
914   jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
915
916   if (active) {
917     /* allow data processing */
918     gst_rtp_jitter_buffer_flush_stop (jitterbuffer);
919
920     /* start pushing out buffers */
921     GST_DEBUG_OBJECT (jitterbuffer, "Starting task on srcpad");
922     gst_pad_start_task (jitterbuffer->priv->srcpad,
923         (GstTaskFunction) gst_rtp_jitter_buffer_loop, jitterbuffer);
924   } else {
925     /* make sure all data processing stops ASAP */
926     gst_rtp_jitter_buffer_flush_start (jitterbuffer);
927
928     /* NOTE this will hardlock if the state change is called from the src pad
929      * task thread because we will _join() the thread. */
930     GST_DEBUG_OBJECT (jitterbuffer, "Stopping task on srcpad");
931     result = gst_pad_stop_task (pad);
932   }
933
934   gst_object_unref (jitterbuffer);
935
936   return result;
937 }
938
939 static GstStateChangeReturn
940 gst_rtp_jitter_buffer_change_state (GstElement * element,
941     GstStateChange transition)
942 {
943   GstRtpJitterBuffer *jitterbuffer;
944   GstRtpJitterBufferPrivate *priv;
945   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
946
947   jitterbuffer = GST_RTP_JITTER_BUFFER (element);
948   priv = jitterbuffer->priv;
949
950   switch (transition) {
951     case GST_STATE_CHANGE_NULL_TO_READY:
952       break;
953     case GST_STATE_CHANGE_READY_TO_PAUSED:
954       JBUF_LOCK (priv);
955       /* reset negotiated values */
956       priv->clock_rate = -1;
957       priv->clock_base = -1;
958       priv->peer_latency = 0;
959       priv->last_pt = -1;
960       /* block until we go to PLAYING */
961       priv->blocked = TRUE;
962       JBUF_UNLOCK (priv);
963       break;
964     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
965       JBUF_LOCK (priv);
966       /* unblock to allow streaming in PLAYING */
967       priv->blocked = FALSE;
968       JBUF_SIGNAL (priv);
969       JBUF_UNLOCK (priv);
970       break;
971     default:
972       break;
973   }
974
975   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
976
977   switch (transition) {
978     case GST_STATE_CHANGE_READY_TO_PAUSED:
979       /* we are a live element because we sync to the clock, which we can only
980        * do in the PLAYING state */
981       if (ret != GST_STATE_CHANGE_FAILURE)
982         ret = GST_STATE_CHANGE_NO_PREROLL;
983       break;
984     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
985       JBUF_LOCK (priv);
986       /* block to stop streaming when PAUSED */
987       priv->blocked = TRUE;
988       JBUF_UNLOCK (priv);
989       if (ret != GST_STATE_CHANGE_FAILURE)
990         ret = GST_STATE_CHANGE_NO_PREROLL;
991       break;
992     case GST_STATE_CHANGE_PAUSED_TO_READY:
993       break;
994     case GST_STATE_CHANGE_READY_TO_NULL:
995       break;
996     default:
997       break;
998   }
999
1000   return ret;
1001 }
1002
1003 static gboolean
1004 gst_rtp_jitter_buffer_src_event (GstPad * pad, GstEvent * event)
1005 {
1006   gboolean ret = TRUE;
1007   GstRtpJitterBuffer *jitterbuffer;
1008   GstRtpJitterBufferPrivate *priv;
1009
1010   jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
1011   if (G_UNLIKELY (jitterbuffer == NULL)) {
1012     gst_event_unref (event);
1013     return FALSE;
1014   }
1015   priv = jitterbuffer->priv;
1016
1017   GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
1018
1019   switch (GST_EVENT_TYPE (event)) {
1020     case GST_EVENT_LATENCY:
1021     {
1022       GstClockTime latency;
1023
1024       gst_event_parse_latency (event, &latency);
1025
1026       JBUF_LOCK (priv);
1027       /* adjust the overall buffer delay to the total pipeline latency in
1028        * buffering mode because if downstream consumes too fast (because of
1029        * large latency or queues, we would start rebuffering again. */
1030       if (rtp_jitter_buffer_get_mode (priv->jbuf) ==
1031           RTP_JITTER_BUFFER_MODE_BUFFER) {
1032         rtp_jitter_buffer_set_delay (priv->jbuf, latency);
1033       }
1034       JBUF_UNLOCK (priv);
1035
1036       ret = gst_pad_push_event (priv->sinkpad, event);
1037       break;
1038     }
1039     default:
1040       ret = gst_pad_push_event (priv->sinkpad, event);
1041       break;
1042   }
1043   gst_object_unref (jitterbuffer);
1044
1045   return ret;
1046 }
1047
1048 static gboolean
1049 gst_rtp_jitter_buffer_sink_event (GstPad * pad, GstEvent * event)
1050 {
1051   gboolean ret = TRUE;
1052   GstRtpJitterBuffer *jitterbuffer;
1053   GstRtpJitterBufferPrivate *priv;
1054
1055   jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
1056   if (G_UNLIKELY (jitterbuffer == NULL)) {
1057     gst_event_unref (event);
1058     return FALSE;
1059   }
1060   priv = jitterbuffer->priv;
1061
1062   GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
1063
1064   switch (GST_EVENT_TYPE (event)) {
1065     case GST_EVENT_NEWSEGMENT:
1066     {
1067       GstFormat format;
1068       gdouble rate, arate;
1069       gint64 start, stop, time;
1070       gboolean update;
1071
1072       gst_event_parse_new_segment_full (event, &update, &rate, &arate, &format,
1073           &start, &stop, &time);
1074
1075       /* we need time for now */
1076       if (format != GST_FORMAT_TIME)
1077         goto newseg_wrong_format;
1078
1079       GST_DEBUG_OBJECT (jitterbuffer,
1080           "newsegment: update %d, rate %g, arate %g, start %" GST_TIME_FORMAT
1081           ", stop %" GST_TIME_FORMAT ", time %" GST_TIME_FORMAT,
1082           update, rate, arate, GST_TIME_ARGS (start), GST_TIME_ARGS (stop),
1083           GST_TIME_ARGS (time));
1084
1085       /* now configure the values, we need these to time the release of the
1086        * buffers on the srcpad. */
1087       gst_segment_set_newsegment_full (&priv->segment, update,
1088           rate, arate, format, start, stop, time);
1089
1090       /* FIXME, push SEGMENT in the queue. Sorting order might be difficult. */
1091       ret = gst_pad_push_event (priv->srcpad, event);
1092       break;
1093     }
1094     case GST_EVENT_FLUSH_START:
1095       gst_rtp_jitter_buffer_flush_start (jitterbuffer);
1096       ret = gst_pad_push_event (priv->srcpad, event);
1097       break;
1098     case GST_EVENT_FLUSH_STOP:
1099       ret = gst_pad_push_event (priv->srcpad, event);
1100       ret = gst_rtp_jitter_buffer_src_activate_push (priv->srcpad, TRUE);
1101       break;
1102     case GST_EVENT_EOS:
1103     {
1104       /* push EOS in queue. We always push it at the head */
1105       JBUF_LOCK (priv);
1106       /* check for flushing, we need to discard the event and return FALSE when
1107        * we are flushing */
1108       ret = priv->srcresult == GST_FLOW_OK;
1109       if (ret && !priv->eos) {
1110         GST_INFO_OBJECT (jitterbuffer, "queuing EOS");
1111         priv->eos = TRUE;
1112         JBUF_SIGNAL (priv);
1113       } else if (priv->eos) {
1114         GST_DEBUG_OBJECT (jitterbuffer, "dropping EOS, we are already EOS");
1115       } else {
1116         GST_DEBUG_OBJECT (jitterbuffer, "dropping EOS, reason %s",
1117             gst_flow_get_name (priv->srcresult));
1118       }
1119       JBUF_UNLOCK (priv);
1120       gst_event_unref (event);
1121       break;
1122     }
1123     default:
1124       ret = gst_pad_push_event (priv->srcpad, event);
1125       break;
1126   }
1127
1128 done:
1129   gst_object_unref (jitterbuffer);
1130
1131   return ret;
1132
1133   /* ERRORS */
1134 newseg_wrong_format:
1135   {
1136     GST_DEBUG_OBJECT (jitterbuffer, "received non TIME newsegment");
1137     ret = FALSE;
1138     gst_event_unref (event);
1139     goto done;
1140   }
1141 }
1142
1143 static gboolean
1144 gst_rtp_jitter_buffer_sink_rtcp_event (GstPad * pad, GstEvent * event)
1145 {
1146   GstRtpJitterBuffer *jitterbuffer;
1147
1148   jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
1149
1150   GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
1151
1152   switch (GST_EVENT_TYPE (event)) {
1153     case GST_EVENT_FLUSH_START:
1154       break;
1155     case GST_EVENT_FLUSH_STOP:
1156       break;
1157     default:
1158       break;
1159   }
1160   gst_event_unref (event);
1161   gst_object_unref (jitterbuffer);
1162
1163   return TRUE;
1164 }
1165
1166 /*
1167  * Must be called with JBUF_LOCK held, will release the LOCK when emiting the
1168  * signal. The function returns GST_FLOW_ERROR when a parsing error happened and
1169  * GST_FLOW_WRONG_STATE when the element is shutting down. On success
1170  * GST_FLOW_OK is returned.
1171  */
1172 static GstFlowReturn
1173 gst_rtp_jitter_buffer_get_clock_rate (GstRtpJitterBuffer * jitterbuffer,
1174     guint8 pt)
1175 {
1176   GValue ret = { 0 };
1177   GValue args[2] = { {0}, {0} };
1178   GstCaps *caps;
1179   gboolean res;
1180
1181   g_value_init (&args[0], GST_TYPE_ELEMENT);
1182   g_value_set_object (&args[0], jitterbuffer);
1183   g_value_init (&args[1], G_TYPE_UINT);
1184   g_value_set_uint (&args[1], pt);
1185
1186   g_value_init (&ret, GST_TYPE_CAPS);
1187   g_value_set_boxed (&ret, NULL);
1188
1189   JBUF_UNLOCK (jitterbuffer->priv);
1190   g_signal_emitv (args, gst_rtp_jitter_buffer_signals[SIGNAL_REQUEST_PT_MAP], 0,
1191       &ret);
1192   JBUF_LOCK_CHECK (jitterbuffer->priv, out_flushing);
1193
1194   g_value_unset (&args[0]);
1195   g_value_unset (&args[1]);
1196   caps = (GstCaps *) g_value_dup_boxed (&ret);
1197   g_value_unset (&ret);
1198   if (!caps)
1199     goto no_caps;
1200
1201   res = gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps);
1202   gst_caps_unref (caps);
1203
1204   if (G_UNLIKELY (!res))
1205     goto parse_failed;
1206
1207   return GST_FLOW_OK;
1208
1209   /* ERRORS */
1210 no_caps:
1211   {
1212     GST_DEBUG_OBJECT (jitterbuffer, "could not get caps");
1213     return GST_FLOW_ERROR;
1214   }
1215 out_flushing:
1216   {
1217     GST_DEBUG_OBJECT (jitterbuffer, "we are flushing");
1218     return GST_FLOW_WRONG_STATE;
1219   }
1220 parse_failed:
1221   {
1222     GST_DEBUG_OBJECT (jitterbuffer, "parse failed");
1223     return GST_FLOW_ERROR;
1224   }
1225 }
1226
1227 /* call with jbuf lock held */
1228 static void
1229 check_buffering_percent (GstRtpJitterBuffer * jitterbuffer, gint * percent)
1230 {
1231   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1232
1233   /* too short a stream, or too close to EOS will never really fill buffer */
1234   if (*percent != -1 && priv->npt_stop != -1 &&
1235       priv->npt_stop - priv->npt_start <=
1236       rtp_jitter_buffer_get_delay (priv->jbuf)) {
1237     GST_DEBUG_OBJECT (jitterbuffer, "short stream; faking full buffer");
1238     rtp_jitter_buffer_set_buffering (priv->jbuf, FALSE);
1239     *percent = 100;
1240   }
1241 }
1242
1243 static void
1244 post_buffering_percent (GstRtpJitterBuffer * jitterbuffer, gint percent)
1245 {
1246   GstMessage *message;
1247
1248   /* Post a buffering message */
1249   message = gst_message_new_buffering (GST_OBJECT_CAST (jitterbuffer), percent);
1250   gst_message_set_buffering_stats (message, GST_BUFFERING_LIVE, -1, -1, -1);
1251
1252   gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer), message);
1253 }
1254
1255 static GstFlowReturn
1256 gst_rtp_jitter_buffer_chain (GstPad * pad, GstBuffer * buffer)
1257 {
1258   GstRtpJitterBuffer *jitterbuffer;
1259   GstRtpJitterBufferPrivate *priv;
1260   guint16 seqnum;
1261   GstFlowReturn ret = GST_FLOW_OK;
1262   GstClockTime timestamp;
1263   guint64 latency_ts;
1264   gboolean tail;
1265   gint percent = -1;
1266   guint8 pt;
1267
1268   jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
1269
1270   if (G_UNLIKELY (!gst_rtp_buffer_validate (buffer)))
1271     goto invalid_buffer;
1272
1273   priv = jitterbuffer->priv;
1274
1275   pt = gst_rtp_buffer_get_payload_type (buffer);
1276
1277   /* take the timestamp of the buffer. This is the time when the packet was
1278    * received and is used to calculate jitter and clock skew. We will adjust
1279    * this timestamp with the smoothed value after processing it in the
1280    * jitterbuffer. */
1281   timestamp = GST_BUFFER_TIMESTAMP (buffer);
1282   /* bring to running time */
1283   timestamp = gst_segment_to_running_time (&priv->segment, GST_FORMAT_TIME,
1284       timestamp);
1285
1286   seqnum = gst_rtp_buffer_get_seq (buffer);
1287
1288   GST_DEBUG_OBJECT (jitterbuffer,
1289       "Received packet #%d at time %" GST_TIME_FORMAT, seqnum,
1290       GST_TIME_ARGS (timestamp));
1291
1292   JBUF_LOCK_CHECK (priv, out_flushing);
1293
1294   if (G_UNLIKELY (priv->last_pt != pt)) {
1295     GstCaps *caps;
1296
1297     GST_DEBUG_OBJECT (jitterbuffer, "pt changed from %u to %u", priv->last_pt,
1298         pt);
1299
1300     priv->last_pt = pt;
1301     /* reset clock-rate so that we get a new one */
1302     priv->clock_rate = -1;
1303     /* Try to get the clock-rate from the caps first if we can. If there are no
1304      * caps we must fire the signal to get the clock-rate. */
1305     if ((caps = GST_BUFFER_CAPS (buffer))) {
1306       gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps);
1307     }
1308   }
1309
1310   if (G_UNLIKELY (priv->clock_rate == -1)) {
1311     /* no clock rate given on the caps, try to get one with the signal */
1312     if (gst_rtp_jitter_buffer_get_clock_rate (jitterbuffer,
1313             pt) == GST_FLOW_WRONG_STATE)
1314       goto out_flushing;
1315
1316     if (G_UNLIKELY (priv->clock_rate == -1))
1317       goto no_clock_rate;
1318   }
1319
1320   /* don't accept more data on EOS */
1321   if (G_UNLIKELY (priv->eos))
1322     goto have_eos;
1323
1324   /* now check against our expected seqnum */
1325   if (G_LIKELY (priv->next_in_seqnum != -1)) {
1326     gint gap;
1327     gboolean reset = FALSE;
1328
1329     gap = gst_rtp_buffer_compare_seqnum (priv->next_in_seqnum, seqnum);
1330     if (G_UNLIKELY (gap != 0)) {
1331       GST_DEBUG_OBJECT (jitterbuffer, "expected #%d, got #%d, gap of %d",
1332           priv->next_in_seqnum, seqnum, gap);
1333       /* priv->next_in_seqnum >= seqnum, this packet is too late or the
1334        * sender might have been restarted with different seqnum. */
1335       if (gap < -RTP_MAX_MISORDER) {
1336         GST_DEBUG_OBJECT (jitterbuffer, "reset: buffer too old %d", gap);
1337         reset = TRUE;
1338       }
1339       /* priv->next_in_seqnum < seqnum, this is a new packet */
1340       else if (G_UNLIKELY (gap > RTP_MAX_DROPOUT)) {
1341         GST_DEBUG_OBJECT (jitterbuffer, "reset: too many dropped packets %d",
1342             gap);
1343         reset = TRUE;
1344       } else {
1345         GST_DEBUG_OBJECT (jitterbuffer, "tolerable gap");
1346       }
1347     }
1348     if (G_UNLIKELY (reset)) {
1349       GST_DEBUG_OBJECT (jitterbuffer, "flush and reset jitterbuffer");
1350       rtp_jitter_buffer_flush (priv->jbuf);
1351       rtp_jitter_buffer_reset_skew (priv->jbuf);
1352       priv->last_popped_seqnum = -1;
1353       priv->next_seqnum = seqnum;
1354     }
1355   }
1356   priv->next_in_seqnum = (seqnum + 1) & 0xffff;
1357
1358   /* let's check if this buffer is too late, we can only accept packets with
1359    * bigger seqnum than the one we last pushed. */
1360   if (G_LIKELY (priv->last_popped_seqnum != -1)) {
1361     gint gap;
1362
1363     gap = gst_rtp_buffer_compare_seqnum (priv->last_popped_seqnum, seqnum);
1364
1365     /* priv->last_popped_seqnum >= seqnum, we're too late. */
1366     if (G_UNLIKELY (gap <= 0))
1367       goto too_late;
1368   }
1369
1370   /* let's drop oldest packet if the queue is already full and drop-on-latency
1371    * is set. We can only do this when there actually is a latency. When no
1372    * latency is set, we just pump it in the queue and let the other end push it
1373    * out as fast as possible. */
1374   if (priv->latency_ms && priv->drop_on_latency) {
1375     latency_ts =
1376         gst_util_uint64_scale_int (priv->latency_ms, priv->clock_rate, 1000);
1377
1378     if (G_UNLIKELY (rtp_jitter_buffer_get_ts_diff (priv->jbuf) >= latency_ts)) {
1379       GstBuffer *old_buf;
1380
1381       old_buf = rtp_jitter_buffer_pop (priv->jbuf, &percent);
1382
1383       GST_DEBUG_OBJECT (jitterbuffer, "Queue full, dropping old packet #%d",
1384           gst_rtp_buffer_get_seq (old_buf));
1385
1386       gst_buffer_unref (old_buf);
1387     }
1388   }
1389
1390   /* we need to make the metadata writable before pushing it in the jitterbuffer
1391    * because the jitterbuffer will update the timestamp */
1392   buffer = gst_buffer_make_metadata_writable (buffer);
1393
1394   /* now insert the packet into the queue in sorted order. This function returns
1395    * FALSE if a packet with the same seqnum was already in the queue, meaning we
1396    * have a duplicate. */
1397   if (G_UNLIKELY (!rtp_jitter_buffer_insert (priv->jbuf, buffer, timestamp,
1398               priv->clock_rate, &tail, &percent)))
1399     goto duplicate;
1400
1401   /* signal addition of new buffer when the _loop is waiting. */
1402   if (priv->waiting)
1403     JBUF_SIGNAL (priv);
1404
1405   /* let's unschedule and unblock any waiting buffers. We only want to do this
1406    * when the tail buffer changed */
1407   if (G_UNLIKELY (priv->clock_id && tail)) {
1408     GST_DEBUG_OBJECT (jitterbuffer,
1409         "Unscheduling waiting buffer, new tail buffer");
1410     gst_clock_id_unschedule (priv->clock_id);
1411     priv->unscheduled = TRUE;
1412   }
1413
1414   GST_DEBUG_OBJECT (jitterbuffer, "Pushed packet #%d, now %d packets, tail: %d",
1415       seqnum, rtp_jitter_buffer_num_packets (priv->jbuf), tail);
1416
1417   check_buffering_percent (jitterbuffer, &percent);
1418
1419 finished:
1420   JBUF_UNLOCK (priv);
1421
1422   if (percent != -1)
1423     post_buffering_percent (jitterbuffer, percent);
1424
1425   gst_object_unref (jitterbuffer);
1426
1427   return ret;
1428
1429   /* ERRORS */
1430 invalid_buffer:
1431   {
1432     /* this is not fatal but should be filtered earlier */
1433     GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
1434         ("Received invalid RTP payload, dropping"));
1435     gst_buffer_unref (buffer);
1436     gst_object_unref (jitterbuffer);
1437     return GST_FLOW_OK;
1438   }
1439 no_clock_rate:
1440   {
1441     GST_WARNING_OBJECT (jitterbuffer,
1442         "No clock-rate in caps!, dropping buffer");
1443     gst_buffer_unref (buffer);
1444     goto finished;
1445   }
1446 out_flushing:
1447   {
1448     ret = priv->srcresult;
1449     GST_DEBUG_OBJECT (jitterbuffer, "flushing %s", gst_flow_get_name (ret));
1450     gst_buffer_unref (buffer);
1451     goto finished;
1452   }
1453 have_eos:
1454   {
1455     ret = GST_FLOW_UNEXPECTED;
1456     GST_WARNING_OBJECT (jitterbuffer, "we are EOS, refusing buffer");
1457     gst_buffer_unref (buffer);
1458     goto finished;
1459   }
1460 too_late:
1461   {
1462     GST_WARNING_OBJECT (jitterbuffer, "Packet #%d too late as #%d was already"
1463         " popped, dropping", seqnum, priv->last_popped_seqnum);
1464     priv->num_late++;
1465     gst_buffer_unref (buffer);
1466     goto finished;
1467   }
1468 duplicate:
1469   {
1470     GST_WARNING_OBJECT (jitterbuffer, "Duplicate packet #%d detected, dropping",
1471         seqnum);
1472     priv->num_duplicates++;
1473     gst_buffer_unref (buffer);
1474     goto finished;
1475   }
1476 }
1477
1478 static GstClockTime
1479 apply_offset (GstRtpJitterBuffer * jitterbuffer, GstClockTime timestamp)
1480 {
1481   GstRtpJitterBufferPrivate *priv;
1482
1483   priv = jitterbuffer->priv;
1484
1485   if (timestamp == -1)
1486     return -1;
1487
1488   /* apply the timestamp offset, this is used for inter stream sync */
1489   timestamp += priv->ts_offset;
1490   /* add the offset, this is used when buffering */
1491   timestamp += priv->out_offset;
1492
1493   return timestamp;
1494 }
1495
1496 static GstClockTime
1497 get_sync_time (GstRtpJitterBuffer * jitterbuffer, GstClockTime timestamp)
1498 {
1499   GstClockTime result;
1500   GstRtpJitterBufferPrivate *priv;
1501
1502   priv = jitterbuffer->priv;
1503
1504   result = timestamp + GST_ELEMENT_CAST (jitterbuffer)->base_time;
1505   /* add latency, this includes our own latency and the peer latency. */
1506   result += priv->latency_ns;
1507   result += priv->peer_latency;
1508
1509   return result;
1510 }
1511
1512 static gboolean
1513 eos_reached (GstClock * clock, GstClockTime time, GstClockID id,
1514     GstRtpJitterBuffer * jitterbuffer)
1515 {
1516   GstRtpJitterBufferPrivate *priv;
1517
1518   priv = jitterbuffer->priv;
1519
1520   JBUF_LOCK_CHECK (priv, flushing);
1521   if (priv->waiting) {
1522     GST_INFO_OBJECT (jitterbuffer, "got the NPT timeout");
1523     priv->reached_npt_stop = TRUE;
1524     JBUF_SIGNAL (priv);
1525   }
1526   JBUF_UNLOCK (priv);
1527
1528   return TRUE;
1529
1530   /* ERRORS */
1531 flushing:
1532   {
1533     JBUF_UNLOCK (priv);
1534     return FALSE;
1535   }
1536 }
1537
1538 static GstClockTime
1539 compute_elapsed (GstRtpJitterBuffer * jitterbuffer, GstBuffer * outbuf)
1540 {
1541   guint64 ext_time, elapsed;
1542   guint32 rtp_time;
1543   GstRtpJitterBufferPrivate *priv;
1544
1545   priv = jitterbuffer->priv;
1546   rtp_time = gst_rtp_buffer_get_timestamp (outbuf);
1547
1548   GST_LOG_OBJECT (jitterbuffer, "rtp %" G_GUINT32_FORMAT ", ext %"
1549       G_GUINT64_FORMAT, rtp_time, priv->ext_timestamp);
1550
1551   if (rtp_time < priv->ext_timestamp) {
1552     ext_time = priv->ext_timestamp;
1553   } else {
1554     ext_time = gst_rtp_buffer_ext_timestamp (&priv->ext_timestamp, rtp_time);
1555   }
1556
1557   if (ext_time > priv->clock_base)
1558     elapsed = ext_time - priv->clock_base;
1559   else
1560     elapsed = 0;
1561
1562   elapsed = gst_util_uint64_scale_int (elapsed, GST_SECOND, priv->clock_rate);
1563   return elapsed;
1564 }
1565
1566 /*
1567  * This funcion will push out buffers on the source pad.
1568  *
1569  * For each pushed buffer, the seqnum is recorded, if the next buffer B has a
1570  * different seqnum (missing packets before B), this function will wait for the
1571  * missing packet to arrive up to the timestamp of buffer B.
1572  */
1573 static void
1574 gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer)
1575 {
1576   GstRtpJitterBufferPrivate *priv;
1577   GstBuffer *outbuf;
1578   GstFlowReturn result;
1579   guint16 seqnum;
1580   guint32 next_seqnum;
1581   GstClockTime timestamp, out_time;
1582   gboolean discont = FALSE;
1583   gint gap;
1584   GstClock *clock;
1585   GstClockID id;
1586   GstClockTime sync_time;
1587   gint percent = -1;
1588
1589   priv = jitterbuffer->priv;
1590
1591   JBUF_LOCK_CHECK (priv, flushing);
1592 again:
1593   GST_DEBUG_OBJECT (jitterbuffer, "Peeking item");
1594   while (TRUE) {
1595     id = NULL;
1596     /* always wait if we are blocked */
1597     if (G_LIKELY (!priv->blocked)) {
1598       /* we're buffering but not EOS, wait. */
1599       if (!priv->eos && (!priv->active
1600               || rtp_jitter_buffer_is_buffering (priv->jbuf))) {
1601         GstClockTime elapsed, delay, left;
1602
1603         if (priv->estimated_eos == -1)
1604           goto do_wait;
1605
1606         outbuf = rtp_jitter_buffer_peek (priv->jbuf);
1607         if (outbuf != NULL) {
1608           elapsed = compute_elapsed (jitterbuffer, outbuf);
1609           if (GST_BUFFER_DURATION_IS_VALID (outbuf))
1610             elapsed += GST_BUFFER_DURATION (outbuf);
1611         } else {
1612           GST_INFO_OBJECT (jitterbuffer, "no buffer, using last_elapsed");
1613           elapsed = priv->last_elapsed;
1614         }
1615
1616         delay = rtp_jitter_buffer_get_delay (priv->jbuf);
1617
1618         if (priv->estimated_eos > elapsed)
1619           left = priv->estimated_eos - elapsed;
1620         else
1621           left = 0;
1622
1623         GST_INFO_OBJECT (jitterbuffer, "buffering, elapsed %" GST_TIME_FORMAT
1624             " estimated_eos %" GST_TIME_FORMAT " left %" GST_TIME_FORMAT
1625             " delay %" GST_TIME_FORMAT,
1626             GST_TIME_ARGS (elapsed), GST_TIME_ARGS (priv->estimated_eos),
1627             GST_TIME_ARGS (left), GST_TIME_ARGS (delay));
1628         if (left > delay)
1629           goto do_wait;
1630       }
1631       /* if we have a packet, we can exit the loop and grab it */
1632       if (rtp_jitter_buffer_num_packets (priv->jbuf) > 0)
1633         break;
1634       /* no packets but we are EOS, do eos logic */
1635       if (G_UNLIKELY (priv->eos))
1636         goto do_eos;
1637       /* underrun, wait for packets or flushing now if we are expecting an EOS
1638        * timeout, set the async timer for it too */
1639       if (priv->estimated_eos != -1 && !priv->reached_npt_stop) {
1640         sync_time = get_sync_time (jitterbuffer, priv->estimated_eos);
1641
1642         GST_OBJECT_LOCK (jitterbuffer);
1643         clock = GST_ELEMENT_CLOCK (jitterbuffer);
1644         if (clock) {
1645           GST_INFO_OBJECT (jitterbuffer, "scheduling timeout");
1646           id = gst_clock_new_single_shot_id (clock, sync_time);
1647           gst_clock_id_wait_async (id, (GstClockCallback) eos_reached,
1648               jitterbuffer);
1649         }
1650         GST_OBJECT_UNLOCK (jitterbuffer);
1651       }
1652     }
1653   do_wait:
1654     /* now we wait */
1655     GST_DEBUG_OBJECT (jitterbuffer, "waiting");
1656     priv->waiting = TRUE;
1657     JBUF_WAIT (priv);
1658     priv->waiting = FALSE;
1659     GST_DEBUG_OBJECT (jitterbuffer, "waiting done");
1660
1661     if (id) {
1662       /* unschedule any pending async notifications we might have */
1663       gst_clock_id_unschedule (id);
1664       gst_clock_id_unref (id);
1665     }
1666     if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK))
1667       goto flushing;
1668
1669     if (id && priv->reached_npt_stop) {
1670       goto do_npt_stop;
1671     }
1672   }
1673
1674   /* peek a buffer, we're just looking at the timestamp and the sequence number.
1675    * If all is fine, we'll pop and push it. If the sequence number is wrong we
1676    * wait on the timestamp. In the chain function we will unlock the wait when a
1677    * new buffer is available. The peeked buffer is valid for as long as we hold
1678    * the jitterbuffer lock. */
1679   outbuf = rtp_jitter_buffer_peek (priv->jbuf);
1680
1681   /* get the seqnum and the next expected seqnum */
1682   seqnum = gst_rtp_buffer_get_seq (outbuf);
1683   next_seqnum = priv->next_seqnum;
1684
1685   /* get the timestamp, this is already corrected for clock skew by the
1686    * jitterbuffer */
1687   timestamp = GST_BUFFER_TIMESTAMP (outbuf);
1688
1689   GST_DEBUG_OBJECT (jitterbuffer,
1690       "Peeked buffer #%d, expect #%d, timestamp %" GST_TIME_FORMAT
1691       ", now %d left", seqnum, next_seqnum, GST_TIME_ARGS (timestamp),
1692       rtp_jitter_buffer_num_packets (priv->jbuf));
1693
1694   /* apply our timestamp offset to the incomming buffer, this will be our output
1695    * timestamp. */
1696   out_time = apply_offset (jitterbuffer, timestamp);
1697
1698   /* get the gap between this and the previous packet. If we don't know the
1699    * previous packet seqnum assume no gap. */
1700   if (G_LIKELY (next_seqnum != -1)) {
1701     gap = gst_rtp_buffer_compare_seqnum (next_seqnum, seqnum);
1702
1703     /* if we have a packet that we already pushed or considered dropped, pop it
1704      * off and get the next packet */
1705     if (G_UNLIKELY (gap < 0)) {
1706       GST_DEBUG_OBJECT (jitterbuffer, "Old packet #%d, next #%d dropping",
1707           seqnum, next_seqnum);
1708       outbuf = rtp_jitter_buffer_pop (priv->jbuf, &percent);
1709       gst_buffer_unref (outbuf);
1710       goto again;
1711     }
1712   } else {
1713     GST_DEBUG_OBJECT (jitterbuffer, "no next seqnum known, first packet");
1714     gap = -1;
1715   }
1716
1717   /* If we don't know what the next seqnum should be (== -1) we have to wait
1718    * because it might be possible that we are not receiving this buffer in-order,
1719    * a buffer with a lower seqnum could arrive later and we want to push that
1720    * earlier buffer before this buffer then.
1721    * If we know the expected seqnum, we can compare it to the current seqnum to
1722    * determine if we have missing a packet. If we have a missing packet (which
1723    * must be before this packet) we can wait for it until the deadline for this
1724    * packet expires. */
1725   if (G_UNLIKELY (gap != 0 && out_time != -1)) {
1726     GstClockReturn ret;
1727     GstClockTime duration = GST_CLOCK_TIME_NONE;
1728
1729     if (gap > 0) {
1730       /* we have a gap */
1731       GST_DEBUG_OBJECT (jitterbuffer,
1732           "Sequence number GAP detected: expected %d instead of %d (%d missing)",
1733           next_seqnum, seqnum, gap);
1734
1735       if (priv->last_out_time != -1) {
1736         GST_DEBUG_OBJECT (jitterbuffer,
1737             "out_time %" GST_TIME_FORMAT ", last %" GST_TIME_FORMAT,
1738             GST_TIME_ARGS (out_time), GST_TIME_ARGS (priv->last_out_time));
1739         /* interpolate between the current time and the last time based on
1740          * number of packets we are missing, this is the estimated duration
1741          * for the missing packet based on equidistant packet spacing. Also make
1742          * sure we never go negative. */
1743         if (out_time >= priv->last_out_time)
1744           duration = (out_time - priv->last_out_time) / (gap + 1);
1745         else
1746           goto lost;
1747
1748         GST_DEBUG_OBJECT (jitterbuffer, "duration %" GST_TIME_FORMAT,
1749             GST_TIME_ARGS (duration));
1750         /* add this duration to the timestamp of the last packet we pushed */
1751         out_time = (priv->last_out_time + duration);
1752       }
1753     } else {
1754       /* we don't know what the next_seqnum should be, wait for the last
1755        * possible moment to push this buffer, maybe we get an earlier seqnum
1756        * while we wait */
1757       GST_DEBUG_OBJECT (jitterbuffer, "First buffer %d, do sync", seqnum);
1758     }
1759
1760     GST_OBJECT_LOCK (jitterbuffer);
1761     clock = GST_ELEMENT_CLOCK (jitterbuffer);
1762     if (!clock) {
1763       GST_OBJECT_UNLOCK (jitterbuffer);
1764       /* let's just push if there is no clock */
1765       GST_DEBUG_OBJECT (jitterbuffer, "No clock, push right away");
1766       goto push_buffer;
1767     }
1768
1769     GST_DEBUG_OBJECT (jitterbuffer, "sync to timestamp %" GST_TIME_FORMAT,
1770         GST_TIME_ARGS (out_time));
1771
1772     /* prepare for sync against clock */
1773     sync_time = get_sync_time (jitterbuffer, out_time);
1774
1775     /* create an entry for the clock */
1776     id = priv->clock_id = gst_clock_new_single_shot_id (clock, sync_time);
1777     priv->unscheduled = FALSE;
1778     GST_OBJECT_UNLOCK (jitterbuffer);
1779
1780     /* release the lock so that the other end can push stuff or unlock */
1781     JBUF_UNLOCK (priv);
1782
1783     ret = gst_clock_id_wait (id, NULL);
1784
1785     JBUF_LOCK (priv);
1786     /* and free the entry */
1787     gst_clock_id_unref (id);
1788     priv->clock_id = NULL;
1789
1790     /* at this point, the clock could have been unlocked by a timeout, a new
1791      * tail element was added to the queue or because we are shutting down. Check
1792      * for shutdown first. */
1793     if G_UNLIKELY
1794       ((priv->srcresult != GST_FLOW_OK))
1795           goto flushing;
1796
1797     /* if we got unscheduled and we are not flushing, it's because a new tail
1798      * element became available in the queue or we flushed the queue.
1799      * Grab it and try to push or sync. */
1800     if (ret == GST_CLOCK_UNSCHEDULED || priv->unscheduled) {
1801       GST_DEBUG_OBJECT (jitterbuffer,
1802           "Wait got unscheduled, will retry to push with new buffer");
1803       goto again;
1804     }
1805
1806   lost:
1807     /* we now timed out, this means we lost a packet or finished synchronizing
1808      * on the first buffer. */
1809     if (gap > 0) {
1810       GstEvent *event;
1811
1812       /* we had a gap and thus we lost a packet. Create an event for this.  */
1813       GST_DEBUG_OBJECT (jitterbuffer, "Packet #%d lost", next_seqnum);
1814       priv->num_late++;
1815       discont = TRUE;
1816
1817       /* update our expected next packet */
1818       priv->last_popped_seqnum = next_seqnum;
1819       priv->last_out_time = out_time;
1820       priv->next_seqnum = (next_seqnum + 1) & 0xffff;
1821
1822       if (priv->do_lost) {
1823         /* create paket lost event */
1824         event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM,
1825             gst_structure_new ("GstRTPPacketLost",
1826                 "seqnum", G_TYPE_UINT, (guint) next_seqnum,
1827                 "timestamp", G_TYPE_UINT64, out_time,
1828                 "duration", G_TYPE_UINT64, duration, NULL));
1829
1830         JBUF_UNLOCK (priv);
1831         gst_pad_push_event (priv->srcpad, event);
1832         JBUF_LOCK_CHECK (priv, flushing);
1833       }
1834       /* look for next packet */
1835       goto again;
1836     }
1837
1838     /* there was no known gap,just the first packet, exit the loop and push */
1839     GST_DEBUG_OBJECT (jitterbuffer, "First packet #%d synced", seqnum);
1840
1841     /* get new timestamp, latency might have changed */
1842     out_time = apply_offset (jitterbuffer, timestamp);
1843   }
1844 push_buffer:
1845
1846   /* when we get here we are ready to pop and push the buffer */
1847   outbuf = rtp_jitter_buffer_pop (priv->jbuf, &percent);
1848
1849   check_buffering_percent (jitterbuffer, &percent);
1850
1851   if (G_UNLIKELY (discont || priv->discont)) {
1852     /* set DISCONT flag when we missed a packet. We pushed the buffer writable
1853      * into the jitterbuffer so we can modify now. */
1854     GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_DISCONT);
1855     priv->discont = FALSE;
1856   }
1857
1858   /* apply timestamp with offset to buffer now */
1859   GST_BUFFER_TIMESTAMP (outbuf) = out_time;
1860
1861   /* update the elapsed time when we need to check against the npt stop time. */
1862   if (priv->npt_stop != -1 && priv->ext_timestamp != -1
1863       && priv->clock_base != -1 && priv->clock_rate > 0) {
1864     guint64 elapsed, estimated;
1865
1866     elapsed = compute_elapsed (jitterbuffer, outbuf);
1867
1868     if (elapsed > priv->last_elapsed || !priv->last_elapsed) {
1869       guint64 left;
1870
1871       priv->last_elapsed = elapsed;
1872
1873       left = priv->npt_stop - priv->npt_start;
1874       GST_LOG_OBJECT (jitterbuffer, "left %" GST_TIME_FORMAT,
1875           GST_TIME_ARGS (left));
1876
1877       if (elapsed > 0)
1878         estimated = gst_util_uint64_scale (out_time, left, elapsed);
1879       else {
1880         /* if there is almost nothing left,
1881          * we may never advance enough to end up in the above case */
1882         if (left < GST_SECOND)
1883           estimated = GST_SECOND;
1884         else
1885           estimated = -1;
1886       }
1887
1888       GST_LOG_OBJECT (jitterbuffer, "elapsed %" GST_TIME_FORMAT ", estimated %"
1889           GST_TIME_FORMAT, GST_TIME_ARGS (elapsed), GST_TIME_ARGS (estimated));
1890
1891       priv->estimated_eos = estimated;
1892     }
1893   }
1894
1895   /* now we are ready to push the buffer. Save the seqnum and release the lock
1896    * so the other end can push stuff in the queue again. */
1897   priv->last_popped_seqnum = seqnum;
1898   priv->last_out_time = out_time;
1899   priv->next_seqnum = (seqnum + 1) & 0xffff;
1900   JBUF_UNLOCK (priv);
1901
1902   if (percent != -1)
1903     post_buffering_percent (jitterbuffer, percent);
1904
1905   /* push buffer */
1906   GST_DEBUG_OBJECT (jitterbuffer,
1907       "Pushing buffer %d, timestamp %" GST_TIME_FORMAT, seqnum,
1908       GST_TIME_ARGS (out_time));
1909   result = gst_pad_push (priv->srcpad, outbuf);
1910   if (G_UNLIKELY (result != GST_FLOW_OK))
1911     goto pause;
1912
1913   return;
1914
1915   /* ERRORS */
1916 do_eos:
1917   {
1918     /* store result, we are flushing now */
1919     GST_DEBUG_OBJECT (jitterbuffer, "We are EOS, pushing EOS downstream");
1920     priv->srcresult = GST_FLOW_UNEXPECTED;
1921     gst_pad_pause_task (priv->srcpad);
1922     JBUF_UNLOCK (priv);
1923     gst_pad_push_event (priv->srcpad, gst_event_new_eos ());
1924     return;
1925   }
1926 do_npt_stop:
1927   {
1928     /* store result, we are flushing now */
1929     GST_DEBUG_OBJECT (jitterbuffer, "We reached the NPT stop");
1930     JBUF_UNLOCK (priv);
1931
1932     g_signal_emit (jitterbuffer,
1933         gst_rtp_jitter_buffer_signals[SIGNAL_ON_NPT_STOP], 0, NULL);
1934     return;
1935   }
1936 flushing:
1937   {
1938     GST_DEBUG_OBJECT (jitterbuffer, "we are flushing");
1939     gst_pad_pause_task (priv->srcpad);
1940     JBUF_UNLOCK (priv);
1941     return;
1942   }
1943 pause:
1944   {
1945     GST_DEBUG_OBJECT (jitterbuffer, "pausing task, reason %s",
1946         gst_flow_get_name (result));
1947
1948     JBUF_LOCK (priv);
1949     /* store result */
1950     priv->srcresult = result;
1951     /* we don't post errors or anything because upstream will do that for us
1952      * when we pass the return value upstream. */
1953     gst_pad_pause_task (priv->srcpad);
1954     JBUF_UNLOCK (priv);
1955     return;
1956   }
1957 }
1958
1959 static GstFlowReturn
1960 gst_rtp_jitter_buffer_chain_rtcp (GstPad * pad, GstBuffer * buffer)
1961 {
1962   GstRtpJitterBuffer *jitterbuffer;
1963   GstRtpJitterBufferPrivate *priv;
1964   GstFlowReturn ret = GST_FLOW_OK;
1965   guint64 base_rtptime, base_time;
1966   guint32 clock_rate;
1967   guint64 last_rtptime;
1968   guint32 ssrc;
1969   GstRTCPPacket packet;
1970   guint64 ext_rtptime, diff;
1971   guint32 rtptime;
1972   gboolean drop = FALSE;
1973
1974   jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
1975
1976   if (G_UNLIKELY (!gst_rtcp_buffer_validate (buffer)))
1977     goto invalid_buffer;
1978
1979   priv = jitterbuffer->priv;
1980
1981   if (!gst_rtcp_buffer_get_first_packet (buffer, &packet))
1982     goto invalid_buffer;
1983
1984   /* first packet must be SR or RR or else the validate would have failed */
1985   switch (gst_rtcp_packet_get_type (&packet)) {
1986     case GST_RTCP_TYPE_SR:
1987       gst_rtcp_packet_sr_get_sender_info (&packet, &ssrc, NULL, &rtptime,
1988           NULL, NULL);
1989       break;
1990     default:
1991       goto ignore_buffer;
1992   }
1993
1994   GST_DEBUG_OBJECT (jitterbuffer, "received RTCP of SSRC %08x", ssrc);
1995
1996   JBUF_LOCK (priv);
1997   /* convert the RTP timestamp to our extended timestamp, using the same offset
1998    * we used in the jitterbuffer */
1999   ext_rtptime = priv->jbuf->ext_rtptime;
2000   ext_rtptime = gst_rtp_buffer_ext_timestamp (&ext_rtptime, rtptime);
2001
2002   /* get the last values from the jitterbuffer */
2003   rtp_jitter_buffer_get_sync (priv->jbuf, &base_rtptime, &base_time,
2004       &clock_rate, &last_rtptime);
2005
2006   GST_DEBUG_OBJECT (jitterbuffer, "ext SR %" G_GUINT64_FORMAT ", base %"
2007       G_GUINT64_FORMAT ", clock-rate %" G_GUINT32_FORMAT,
2008       ext_rtptime, base_rtptime, clock_rate);
2009
2010   if (base_rtptime == -1 || clock_rate == -1 || base_time == -1) {
2011     GST_DEBUG_OBJECT (jitterbuffer, "dropping, no RTP values");
2012     drop = TRUE;
2013   } else {
2014     /* we can't accept anything that happened before we did the last resync */
2015     if (base_rtptime > ext_rtptime) {
2016       GST_DEBUG_OBJECT (jitterbuffer, "dropping, older than base time");
2017       drop = TRUE;
2018     } else {
2019       /* the SR RTP timestamp must be something close to what we last observed
2020        * in the jitterbuffer */
2021       if (ext_rtptime > last_rtptime) {
2022         /* check how far ahead it is to our RTP timestamps */
2023         diff = ext_rtptime - last_rtptime;
2024         /* if bigger than 1 second, we drop it */
2025         if (diff > clock_rate) {
2026           GST_DEBUG_OBJECT (jitterbuffer, "dropping, too far ahead");
2027           drop = TRUE;
2028         }
2029         GST_DEBUG_OBJECT (jitterbuffer, "ext last %" G_GUINT64_FORMAT ", diff %"
2030             G_GUINT64_FORMAT, last_rtptime, diff);
2031       }
2032     }
2033   }
2034   JBUF_UNLOCK (priv);
2035
2036   if (!drop) {
2037     GstStructure *s;
2038
2039     s = gst_structure_new ("application/x-rtp-sync",
2040         "base-rtptime", G_TYPE_UINT64, base_rtptime,
2041         "base-time", G_TYPE_UINT64, base_time,
2042         "clock-rate", G_TYPE_UINT, clock_rate,
2043         "sr-ext-rtptime", G_TYPE_UINT64, ext_rtptime,
2044         "sr-buffer", GST_TYPE_BUFFER, buffer, NULL);
2045
2046     GST_DEBUG_OBJECT (jitterbuffer, "signaling sync");
2047     g_signal_emit (jitterbuffer,
2048         gst_rtp_jitter_buffer_signals[SIGNAL_HANDLE_SYNC], 0, s);
2049     gst_structure_free (s);
2050   } else {
2051     GST_DEBUG_OBJECT (jitterbuffer, "dropping RTCP packet");
2052     ret = GST_FLOW_OK;
2053   }
2054
2055 done:
2056   gst_buffer_unref (buffer);
2057   gst_object_unref (jitterbuffer);
2058
2059   return ret;
2060
2061 invalid_buffer:
2062   {
2063     /* this is not fatal but should be filtered earlier */
2064     GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
2065         ("Received invalid RTCP payload, dropping"));
2066     ret = GST_FLOW_OK;
2067     goto done;
2068   }
2069 ignore_buffer:
2070   {
2071     GST_DEBUG_OBJECT (jitterbuffer, "ignoring RTCP packet");
2072     ret = GST_FLOW_OK;
2073     goto done;
2074   }
2075 }
2076
2077 static gboolean
2078 gst_rtp_jitter_buffer_query (GstPad * pad, GstQuery * query)
2079 {
2080   GstRtpJitterBuffer *jitterbuffer;
2081   GstRtpJitterBufferPrivate *priv;
2082   gboolean res = FALSE;
2083
2084   jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
2085   if (G_UNLIKELY (jitterbuffer == NULL))
2086     return FALSE;
2087   priv = jitterbuffer->priv;
2088
2089   switch (GST_QUERY_TYPE (query)) {
2090     case GST_QUERY_LATENCY:
2091     {
2092       /* We need to send the query upstream and add the returned latency to our
2093        * own */
2094       GstClockTime min_latency, max_latency;
2095       gboolean us_live;
2096       GstClockTime our_latency;
2097
2098       if ((res = gst_pad_peer_query (priv->sinkpad, query))) {
2099         gst_query_parse_latency (query, &us_live, &min_latency, &max_latency);
2100
2101         GST_DEBUG_OBJECT (jitterbuffer, "Peer latency: min %"
2102             GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
2103             GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
2104
2105         /* store this so that we can safely sync on the peer buffers. */
2106         JBUF_LOCK (priv);
2107         priv->peer_latency = min_latency;
2108         our_latency = priv->latency_ns;
2109         JBUF_UNLOCK (priv);
2110
2111         GST_DEBUG_OBJECT (jitterbuffer, "Our latency: %" GST_TIME_FORMAT,
2112             GST_TIME_ARGS (our_latency));
2113
2114         /* we add some latency but can buffer an infinite amount of time */
2115         min_latency += our_latency;
2116         max_latency = -1;
2117
2118         GST_DEBUG_OBJECT (jitterbuffer, "Calculated total latency : min %"
2119             GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
2120             GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
2121
2122         gst_query_set_latency (query, TRUE, min_latency, max_latency);
2123       }
2124       break;
2125     }
2126     case GST_QUERY_POSITION:
2127     {
2128       GstClockTime start, last_out;
2129       GstFormat fmt;
2130
2131       gst_query_parse_position (query, &fmt, NULL);
2132       if (fmt != GST_FORMAT_TIME) {
2133         res = gst_pad_query_default (pad, query);
2134         break;
2135       }
2136
2137       JBUF_LOCK (priv);
2138       start = priv->npt_start;
2139       last_out = priv->last_out_time;
2140       JBUF_UNLOCK (priv);
2141
2142       GST_DEBUG_OBJECT (jitterbuffer, "npt start %" GST_TIME_FORMAT
2143           ", last out %" GST_TIME_FORMAT, GST_TIME_ARGS (start),
2144           GST_TIME_ARGS (last_out));
2145
2146       if (GST_CLOCK_TIME_IS_VALID (start) && GST_CLOCK_TIME_IS_VALID (last_out)) {
2147         /* bring 0-based outgoing time to stream time */
2148         gst_query_set_position (query, GST_FORMAT_TIME, start + last_out);
2149         res = TRUE;
2150       } else {
2151         res = gst_pad_query_default (pad, query);
2152       }
2153       break;
2154     }
2155     default:
2156       res = gst_pad_query_default (pad, query);
2157       break;
2158   }
2159
2160   gst_object_unref (jitterbuffer);
2161
2162   return res;
2163 }
2164
2165 static void
2166 gst_rtp_jitter_buffer_set_property (GObject * object,
2167     guint prop_id, const GValue * value, GParamSpec * pspec)
2168 {
2169   GstRtpJitterBuffer *jitterbuffer;
2170   GstRtpJitterBufferPrivate *priv;
2171
2172   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
2173   priv = jitterbuffer->priv;
2174
2175   switch (prop_id) {
2176     case PROP_LATENCY:
2177     {
2178       guint new_latency, old_latency;
2179
2180       new_latency = g_value_get_uint (value);
2181
2182       JBUF_LOCK (priv);
2183       old_latency = priv->latency_ms;
2184       priv->latency_ms = new_latency;
2185       priv->latency_ns = priv->latency_ms * GST_MSECOND;
2186       rtp_jitter_buffer_set_delay (priv->jbuf, priv->latency_ns);
2187       JBUF_UNLOCK (priv);
2188
2189       /* post message if latency changed, this will inform the parent pipeline
2190        * that a latency reconfiguration is possible/needed. */
2191       if (new_latency != old_latency) {
2192         GST_DEBUG_OBJECT (jitterbuffer, "latency changed to: %" GST_TIME_FORMAT,
2193             GST_TIME_ARGS (new_latency * GST_MSECOND));
2194
2195         gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer),
2196             gst_message_new_latency (GST_OBJECT_CAST (jitterbuffer)));
2197       }
2198       break;
2199     }
2200     case PROP_DROP_ON_LATENCY:
2201       JBUF_LOCK (priv);
2202       priv->drop_on_latency = g_value_get_boolean (value);
2203       JBUF_UNLOCK (priv);
2204       break;
2205     case PROP_TS_OFFSET:
2206       JBUF_LOCK (priv);
2207       priv->ts_offset = g_value_get_int64 (value);
2208       /* FIXME, we don't really have a method for signaling a timestamp
2209        * DISCONT without also making this a data discont. */
2210       /* priv->discont = TRUE; */
2211       JBUF_UNLOCK (priv);
2212       break;
2213     case PROP_DO_LOST:
2214       JBUF_LOCK (priv);
2215       priv->do_lost = g_value_get_boolean (value);
2216       JBUF_UNLOCK (priv);
2217       break;
2218     case PROP_MODE:
2219       JBUF_LOCK (priv);
2220       rtp_jitter_buffer_set_mode (priv->jbuf, g_value_get_enum (value));
2221       JBUF_UNLOCK (priv);
2222       break;
2223     default:
2224       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2225       break;
2226   }
2227 }
2228
2229 static void
2230 gst_rtp_jitter_buffer_get_property (GObject * object,
2231     guint prop_id, GValue * value, GParamSpec * pspec)
2232 {
2233   GstRtpJitterBuffer *jitterbuffer;
2234   GstRtpJitterBufferPrivate *priv;
2235
2236   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
2237   priv = jitterbuffer->priv;
2238
2239   switch (prop_id) {
2240     case PROP_LATENCY:
2241       JBUF_LOCK (priv);
2242       g_value_set_uint (value, priv->latency_ms);
2243       JBUF_UNLOCK (priv);
2244       break;
2245     case PROP_DROP_ON_LATENCY:
2246       JBUF_LOCK (priv);
2247       g_value_set_boolean (value, priv->drop_on_latency);
2248       JBUF_UNLOCK (priv);
2249       break;
2250     case PROP_TS_OFFSET:
2251       JBUF_LOCK (priv);
2252       g_value_set_int64 (value, priv->ts_offset);
2253       JBUF_UNLOCK (priv);
2254       break;
2255     case PROP_DO_LOST:
2256       JBUF_LOCK (priv);
2257       g_value_set_boolean (value, priv->do_lost);
2258       JBUF_UNLOCK (priv);
2259       break;
2260     case PROP_MODE:
2261       JBUF_LOCK (priv);
2262       g_value_set_enum (value, rtp_jitter_buffer_get_mode (priv->jbuf));
2263       JBUF_UNLOCK (priv);
2264       break;
2265     case PROP_PERCENT:
2266     {
2267       gint percent;
2268
2269       JBUF_LOCK (priv);
2270       if (priv->srcresult != GST_FLOW_OK)
2271         percent = 100;
2272       else
2273         percent = rtp_jitter_buffer_get_percent (priv->jbuf);
2274
2275       g_value_set_int (value, percent);
2276       JBUF_UNLOCK (priv);
2277       break;
2278     }
2279     default:
2280       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2281       break;
2282   }
2283 }