Merge remote-tracking branch 'origin/master' into 0.11
[platform/upstream/gstreamer.git] / gst / rtpmanager / gstrtpjitterbuffer.c
1 /*
2  * Farsight Voice+Video library
3  *
4  *  Copyright 2007 Collabora Ltd,
5  *  Copyright 2007 Nokia Corporation
6  *   @author: Philippe Kalaf <philippe.kalaf@collabora.co.uk>.
7  *  Copyright 2007 Wim Taymans <wim.taymans@gmail.com>
8  *
9  * This library is free software; you can redistribute it and/or
10  * modify it under the terms of the GNU Library General Public
11  * License as published by the Free Software Foundation; either
12  * version 2 of the License, or (at your option) any later version.
13  *
14  * This library is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17  * Library General Public License for more details.
18  *
19  * You should have received a copy of the GNU Library General Public
20  * License along with this library; if not, write to the
21  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
22  * Boston, MA 02111-1307, USA.
23  *
24  */
25
26 /**
27  * SECTION:element-gstrtpjitterbuffer
28  *
29  * This element reorders and removes duplicate RTP packets as they are received
30  * from a network source. It will also wait for missing packets up to a
31  * configurable time limit using the #GstRtpJitterBuffer:latency property.
32  * Packets arriving too late are considered to be lost packets.
33  *
34  * This element acts as a live element and so adds #GstRtpJitterBuffer:latency
35  * to the pipeline.
36  *
37  * The element needs the clock-rate of the RTP payload in order to estimate the
38  * delay. This information is obtained either from the caps on the sink pad or,
39  * when no caps are present, from the #GstRtpJitterBuffer::request-pt-map signal.
40  * To clear the previous pt-map use the #GstRtpJitterBuffer::clear-pt-map signal.
41  *
42  * This element will automatically be used inside gstrtpbin.
43  *
44  * <refsect2>
45  * <title>Example pipelines</title>
46  * |[
47  * gst-launch rtspsrc location=rtsp://192.168.1.133:8554/mpeg1or2AudioVideoTest ! gstrtpjitterbuffer ! rtpmpvdepay ! mpeg2dec ! xvimagesink
48  * ]| Connect to a streaming server and decode the MPEG video. The jitterbuffer is
49  * inserted into the pipeline to smooth out network jitter and to reorder the
50  * out-of-order RTP packets.
51  * </refsect2>
52  *
53  * Last reviewed on 2007-05-28 (0.10.5)
54  */
55
56 #ifdef HAVE_CONFIG_H
57 #include "config.h"
58 #endif
59
60 #include <stdlib.h>
61 #include <string.h>
62 #include <gst/rtp/gstrtpbuffer.h>
63
64 #include "gstrtpbin-marshal.h"
65
66 #include "gstrtpjitterbuffer.h"
67 #include "rtpjitterbuffer.h"
68 #include "rtpstats.h"
69
70 GST_DEBUG_CATEGORY (rtpjitterbuffer_debug);
71 #define GST_CAT_DEFAULT (rtpjitterbuffer_debug)
72
73 /* RTPJitterBuffer signals and args */
74 enum
75 {
76   SIGNAL_REQUEST_PT_MAP,
77   SIGNAL_CLEAR_PT_MAP,
78   SIGNAL_HANDLE_SYNC,
79   SIGNAL_ON_NPT_STOP,
80   SIGNAL_SET_ACTIVE,
81   LAST_SIGNAL
82 };
83
84 #define DEFAULT_LATENCY_MS      200
85 #define DEFAULT_DROP_ON_LATENCY FALSE
86 #define DEFAULT_TS_OFFSET       0
87 #define DEFAULT_DO_LOST         FALSE
88 #define DEFAULT_MODE            RTP_JITTER_BUFFER_MODE_SLAVE
89 #define DEFAULT_PERCENT         0
90
91 enum
92 {
93   PROP_0,
94   PROP_LATENCY,
95   PROP_DROP_ON_LATENCY,
96   PROP_TS_OFFSET,
97   PROP_DO_LOST,
98   PROP_MODE,
99   PROP_PERCENT,
100   PROP_LAST
101 };
102
103 #define JBUF_LOCK(priv)   (g_mutex_lock ((priv)->jbuf_lock))
104
105 #define JBUF_LOCK_CHECK(priv,label) G_STMT_START {    \
106   JBUF_LOCK (priv);                                   \
107   if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK))    \
108     goto label;                                       \
109 } G_STMT_END
110
111 #define JBUF_UNLOCK(priv) (g_mutex_unlock ((priv)->jbuf_lock))
112 #define JBUF_WAIT(priv)   (g_cond_wait ((priv)->jbuf_cond, (priv)->jbuf_lock))
113
114 #define JBUF_WAIT_CHECK(priv,label) G_STMT_START {    \
115   JBUF_WAIT(priv);                                    \
116   if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK))    \
117     goto label;                                       \
118 } G_STMT_END
119
120 #define JBUF_SIGNAL(priv) (g_cond_signal ((priv)->jbuf_cond))
121
122 struct _GstRtpJitterBufferPrivate
123 {
124   GstPad *sinkpad, *srcpad;
125   GstPad *rtcpsinkpad;
126
127   RTPJitterBuffer *jbuf;
128   GMutex *jbuf_lock;
129   GCond *jbuf_cond;
130   gboolean waiting;
131   gboolean discont;
132   gboolean active;
133   guint64 out_offset;
134
135   /* properties */
136   guint latency_ms;
137   guint64 latency_ns;
138   gboolean drop_on_latency;
139   gint64 ts_offset;
140   gboolean do_lost;
141
142   /* the last seqnum we pushed out */
143   guint32 last_popped_seqnum;
144   /* the next expected seqnum we push */
145   guint32 next_seqnum;
146   /* last output time */
147   GstClockTime last_out_time;
148   /* the next expected seqnum we receive */
149   guint32 next_in_seqnum;
150
151   /* start and stop ranges */
152   GstClockTime npt_start;
153   GstClockTime npt_stop;
154   guint64 ext_timestamp;
155   guint64 last_elapsed;
156   guint64 estimated_eos;
157   GstClockID eos_id;
158   gboolean reached_npt_stop;
159
160   /* state */
161   gboolean eos;
162
163   /* clock rate and rtp timestamp offset */
164   gint last_pt;
165   gint32 clock_rate;
166   gint64 clock_base;
167   gint64 prev_ts_offset;
168
169   /* when we are shutting down */
170   GstFlowReturn srcresult;
171   gboolean blocked;
172
173   /* for sync */
174   GstSegment segment;
175   GstClockID clock_id;
176   gboolean unscheduled;
177   /* the latency of the upstream peer, we have to take this into account when
178    * synchronizing the buffers. */
179   GstClockTime peer_latency;
180
181   /* some accounting */
182   guint64 num_late;
183   guint64 num_duplicates;
184 };
185
186 #define GST_RTP_JITTER_BUFFER_GET_PRIVATE(o) \
187   (G_TYPE_INSTANCE_GET_PRIVATE ((o), GST_TYPE_RTP_JITTER_BUFFER, \
188                                 GstRtpJitterBufferPrivate))
189
190 static GstStaticPadTemplate gst_rtp_jitter_buffer_sink_template =
191 GST_STATIC_PAD_TEMPLATE ("sink",
192     GST_PAD_SINK,
193     GST_PAD_ALWAYS,
194     GST_STATIC_CAPS ("application/x-rtp, "
195         "clock-rate = (int) [ 1, 2147483647 ]"
196         /* "payload = (int) , "
197          * "encoding-name = (string) "
198          */ )
199     );
200
201 static GstStaticPadTemplate gst_rtp_jitter_buffer_sink_rtcp_template =
202 GST_STATIC_PAD_TEMPLATE ("sink_rtcp",
203     GST_PAD_SINK,
204     GST_PAD_REQUEST,
205     GST_STATIC_CAPS ("application/x-rtcp")
206     );
207
208 static GstStaticPadTemplate gst_rtp_jitter_buffer_src_template =
209 GST_STATIC_PAD_TEMPLATE ("src",
210     GST_PAD_SRC,
211     GST_PAD_ALWAYS,
212     GST_STATIC_CAPS ("application/x-rtp"
213         /* "payload = (int) , "
214          * "clock-rate = (int) , "
215          * "encoding-name = (string) "
216          */ )
217     );
218
219 static guint gst_rtp_jitter_buffer_signals[LAST_SIGNAL] = { 0 };
220
221 #define gst_rtp_jitter_buffer_parent_class parent_class
222 G_DEFINE_TYPE (GstRtpJitterBuffer, gst_rtp_jitter_buffer, GST_TYPE_ELEMENT);
223
224 /* object overrides */
225 static void gst_rtp_jitter_buffer_set_property (GObject * object,
226     guint prop_id, const GValue * value, GParamSpec * pspec);
227 static void gst_rtp_jitter_buffer_get_property (GObject * object,
228     guint prop_id, GValue * value, GParamSpec * pspec);
229 static void gst_rtp_jitter_buffer_finalize (GObject * object);
230
231 /* element overrides */
232 static GstStateChangeReturn gst_rtp_jitter_buffer_change_state (GstElement
233     * element, GstStateChange transition);
234 static GstPad *gst_rtp_jitter_buffer_request_new_pad (GstElement * element,
235     GstPadTemplate * templ, const gchar * name, const GstCaps * filter);
236 static void gst_rtp_jitter_buffer_release_pad (GstElement * element,
237     GstPad * pad);
238 static GstClock *gst_rtp_jitter_buffer_provide_clock (GstElement * element);
239
240 /* pad overrides */
241 static GstCaps *gst_rtp_jitter_buffer_getcaps (GstPad * pad, GstCaps * filter);
242 static GstIterator *gst_rtp_jitter_buffer_iterate_internal_links (GstPad * pad,
243     GstObject * parent);
244
245 /* sinkpad overrides */
246 static gboolean gst_rtp_jitter_buffer_sink_event (GstPad * pad,
247     GstObject * parent, GstEvent * event);
248 static GstFlowReturn gst_rtp_jitter_buffer_chain (GstPad * pad,
249     GstObject * parent, GstBuffer * buffer);
250
251 static gboolean gst_rtp_jitter_buffer_sink_rtcp_event (GstPad * pad,
252     GstObject * parent, GstEvent * event);
253 static GstFlowReturn gst_rtp_jitter_buffer_chain_rtcp (GstPad * pad,
254     GstObject * parent, GstBuffer * buffer);
255
256 static gboolean gst_rtp_jitter_buffer_sink_query (GstPad * pad,
257     GstObject * parent, GstQuery * query);
258
259 /* srcpad overrides */
260 static gboolean gst_rtp_jitter_buffer_src_event (GstPad * pad,
261     GstObject * parent, GstEvent * event);
262 static gboolean gst_rtp_jitter_buffer_src_activate_mode (GstPad * pad,
263     GstObject * parent, GstPadMode mode, gboolean active);
264 static void gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer);
265 static gboolean gst_rtp_jitter_buffer_src_query (GstPad * pad,
266     GstObject * parent, GstQuery * query);
267
268 static void
269 gst_rtp_jitter_buffer_clear_pt_map (GstRtpJitterBuffer * jitterbuffer);
270 static GstClockTime
271 gst_rtp_jitter_buffer_set_active (GstRtpJitterBuffer * jitterbuffer,
272     gboolean active, guint64 base_time);
273
274 static void
275 gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass)
276 {
277   GObjectClass *gobject_class;
278   GstElementClass *gstelement_class;
279
280   gobject_class = (GObjectClass *) klass;
281   gstelement_class = (GstElementClass *) klass;
282
283   g_type_class_add_private (klass, sizeof (GstRtpJitterBufferPrivate));
284
285   gobject_class->finalize = gst_rtp_jitter_buffer_finalize;
286
287   gobject_class->set_property = gst_rtp_jitter_buffer_set_property;
288   gobject_class->get_property = gst_rtp_jitter_buffer_get_property;
289
290   /**
291    * GstRtpJitterBuffer::latency:
292    *
293    * The maximum latency of the jitterbuffer. Packets will be kept in the buffer
294    * for at most this time.
295    */
296   g_object_class_install_property (gobject_class, PROP_LATENCY,
297       g_param_spec_uint ("latency", "Buffer latency in ms",
298           "Amount of ms to buffer", 0, G_MAXUINT, DEFAULT_LATENCY_MS,
299           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
300   /**
301    * GstRtpJitterBuffer::drop-on-latency:
302    *
303    * Drop oldest buffers when the queue is completely filled.
304    */
305   g_object_class_install_property (gobject_class, PROP_DROP_ON_LATENCY,
306       g_param_spec_boolean ("drop-on-latency",
307           "Drop buffers when maximum latency is reached",
308           "Tells the jitterbuffer to never exceed the given latency in size",
309           DEFAULT_DROP_ON_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
310   /**
311    * GstRtpJitterBuffer::ts-offset:
312    *
313    * Adjust GStreamer output buffer timestamps in the jitterbuffer with offset.
314    * This is mainly used to ensure interstream synchronisation.
315    */
316   g_object_class_install_property (gobject_class, PROP_TS_OFFSET,
317       g_param_spec_int64 ("ts-offset", "Timestamp Offset",
318           "Adjust buffer timestamps with offset in nanoseconds", G_MININT64,
319           G_MAXINT64, DEFAULT_TS_OFFSET,
320           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
321
322   /**
323    * GstRtpJitterBuffer::do-lost:
324    *
325    * Send out a GstRTPPacketLost event downstream when a packet is considered
326    * lost.
327    */
328   g_object_class_install_property (gobject_class, PROP_DO_LOST,
329       g_param_spec_boolean ("do-lost", "Do Lost",
330           "Send an event downstream when a packet is lost", DEFAULT_DO_LOST,
331           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
332
333   /**
334    * GstRtpJitterBuffer::mode:
335    *
336    * Control the buffering and timestamping mode used by the jitterbuffer.
337    */
338   g_object_class_install_property (gobject_class, PROP_MODE,
339       g_param_spec_enum ("mode", "Mode",
340           "Control the buffering algorithm in use", RTP_TYPE_JITTER_BUFFER_MODE,
341           DEFAULT_MODE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
342   /**
343    * GstRtpJitterBuffer::percent:
344    *
345    * The percent of the jitterbuffer that is filled.
346    *
347    * Since: 0.10.19
348    */
349   g_object_class_install_property (gobject_class, PROP_PERCENT,
350       g_param_spec_int ("percent", "percent",
351           "The buffer filled percent", 0, 100,
352           0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
353   /**
354    * GstRtpJitterBuffer::request-pt-map:
355    * @buffer: the object which received the signal
356    * @pt: the pt
357    *
358    * Request the payload type as #GstCaps for @pt.
359    */
360   gst_rtp_jitter_buffer_signals[SIGNAL_REQUEST_PT_MAP] =
361       g_signal_new ("request-pt-map", G_TYPE_FROM_CLASS (klass),
362       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
363           request_pt_map), NULL, NULL, gst_rtp_bin_marshal_BOXED__UINT,
364       GST_TYPE_CAPS, 1, G_TYPE_UINT);
365   /**
366    * GstRtpJitterBuffer::handle-sync:
367    * @buffer: the object which received the signal
368    * @struct: a GstStructure containing sync values.
369    *
370    * Be notified of new sync values.
371    */
372   gst_rtp_jitter_buffer_signals[SIGNAL_HANDLE_SYNC] =
373       g_signal_new ("handle-sync", G_TYPE_FROM_CLASS (klass),
374       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
375           handle_sync), NULL, NULL, g_cclosure_marshal_VOID__BOXED,
376       G_TYPE_NONE, 1, GST_TYPE_STRUCTURE | G_SIGNAL_TYPE_STATIC_SCOPE);
377
378   /**
379    * GstRtpJitterBuffer::on-npt-stop
380    * @buffer: the object which received the signal
381    *
382    * Signal that the jitterbufer has pushed the RTP packet that corresponds to
383    * the npt-stop position.
384    */
385   gst_rtp_jitter_buffer_signals[SIGNAL_ON_NPT_STOP] =
386       g_signal_new ("on-npt-stop", G_TYPE_FROM_CLASS (klass),
387       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
388           on_npt_stop), NULL, NULL, g_cclosure_marshal_VOID__VOID,
389       G_TYPE_NONE, 0, G_TYPE_NONE);
390
391   /**
392    * GstRtpJitterBuffer::clear-pt-map:
393    * @buffer: the object which received the signal
394    *
395    * Invalidate the clock-rate as obtained with the
396    * #GstRtpJitterBuffer::request-pt-map signal.
397    */
398   gst_rtp_jitter_buffer_signals[SIGNAL_CLEAR_PT_MAP] =
399       g_signal_new ("clear-pt-map", G_TYPE_FROM_CLASS (klass),
400       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
401       G_STRUCT_OFFSET (GstRtpJitterBufferClass, clear_pt_map), NULL, NULL,
402       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
403
404   /**
405    * GstRtpJitterBuffer::set-active:
406    * @buffer: the object which received the signal
407    *
408    * Start pushing out packets with the given base time. This signal is only
409    * useful in buffering mode.
410    *
411    * Returns: the time of the last pushed packet.
412    *
413    * Since: 0.10.19
414    */
415   gst_rtp_jitter_buffer_signals[SIGNAL_SET_ACTIVE] =
416       g_signal_new ("set-active", G_TYPE_FROM_CLASS (klass),
417       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
418       G_STRUCT_OFFSET (GstRtpJitterBufferClass, set_active), NULL, NULL,
419       gst_rtp_bin_marshal_UINT64__BOOL_UINT64, G_TYPE_UINT64, 2, G_TYPE_BOOLEAN,
420       G_TYPE_UINT64);
421
422   gstelement_class->change_state =
423       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_change_state);
424   gstelement_class->request_new_pad =
425       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_request_new_pad);
426   gstelement_class->release_pad =
427       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_release_pad);
428   gstelement_class->provide_clock =
429       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_provide_clock);
430
431   gst_element_class_add_pad_template (gstelement_class,
432       gst_static_pad_template_get (&gst_rtp_jitter_buffer_src_template));
433   gst_element_class_add_pad_template (gstelement_class,
434       gst_static_pad_template_get (&gst_rtp_jitter_buffer_sink_template));
435   gst_element_class_add_pad_template (gstelement_class,
436       gst_static_pad_template_get (&gst_rtp_jitter_buffer_sink_rtcp_template));
437
438   gst_element_class_set_details_simple (gstelement_class,
439       "RTP packet jitter-buffer", "Filter/Network/RTP",
440       "A buffer that deals with network jitter and other transmission faults",
441       "Philippe Kalaf <philippe.kalaf@collabora.co.uk>, "
442       "Wim Taymans <wim.taymans@gmail.com>");
443
444   klass->clear_pt_map = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_clear_pt_map);
445   klass->set_active = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_set_active);
446
447   GST_DEBUG_CATEGORY_INIT
448       (rtpjitterbuffer_debug, "gstrtpjitterbuffer", 0, "RTP Jitter Buffer");
449 }
450
451 static void
452 gst_rtp_jitter_buffer_init (GstRtpJitterBuffer * jitterbuffer)
453 {
454   GstRtpJitterBufferPrivate *priv;
455
456   priv = GST_RTP_JITTER_BUFFER_GET_PRIVATE (jitterbuffer);
457   jitterbuffer->priv = priv;
458
459   priv->latency_ms = DEFAULT_LATENCY_MS;
460   priv->latency_ns = priv->latency_ms * GST_MSECOND;
461   priv->drop_on_latency = DEFAULT_DROP_ON_LATENCY;
462   priv->do_lost = DEFAULT_DO_LOST;
463
464   priv->jbuf = rtp_jitter_buffer_new ();
465   priv->jbuf_lock = g_mutex_new ();
466   priv->jbuf_cond = g_cond_new ();
467
468   /* reset skew detection initialy */
469   rtp_jitter_buffer_reset_skew (priv->jbuf);
470   rtp_jitter_buffer_set_delay (priv->jbuf, priv->latency_ns);
471   rtp_jitter_buffer_set_buffering (priv->jbuf, FALSE);
472   priv->active = TRUE;
473
474   priv->srcpad =
475       gst_pad_new_from_static_template (&gst_rtp_jitter_buffer_src_template,
476       "src");
477
478   gst_pad_set_activatemode_function (priv->srcpad,
479       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_activate_mode));
480   gst_pad_set_query_function (priv->srcpad,
481       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_query));
482   gst_pad_set_event_function (priv->srcpad,
483       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_event));
484
485   priv->sinkpad =
486       gst_pad_new_from_static_template (&gst_rtp_jitter_buffer_sink_template,
487       "sink");
488
489   gst_pad_set_chain_function (priv->sinkpad,
490       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_chain));
491   gst_pad_set_event_function (priv->sinkpad,
492       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_sink_event));
493   gst_pad_set_query_function (priv->sinkpad,
494       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_sink_query));
495
496   gst_element_add_pad (GST_ELEMENT (jitterbuffer), priv->srcpad);
497   gst_element_add_pad (GST_ELEMENT (jitterbuffer), priv->sinkpad);
498
499   GST_OBJECT_FLAG_SET (jitterbuffer, GST_ELEMENT_FLAG_PROVIDE_CLOCK);
500 }
501
502 static void
503 gst_rtp_jitter_buffer_finalize (GObject * object)
504 {
505   GstRtpJitterBuffer *jitterbuffer;
506
507   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
508
509   g_mutex_free (jitterbuffer->priv->jbuf_lock);
510   g_cond_free (jitterbuffer->priv->jbuf_cond);
511
512   g_object_unref (jitterbuffer->priv->jbuf);
513
514   G_OBJECT_CLASS (parent_class)->finalize (object);
515 }
516
517 static GstIterator *
518 gst_rtp_jitter_buffer_iterate_internal_links (GstPad * pad, GstObject * parent)
519 {
520   GstRtpJitterBuffer *jitterbuffer;
521   GstPad *otherpad = NULL;
522   GstIterator *it;
523   GValue val = { 0, };
524
525   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
526
527   if (pad == jitterbuffer->priv->sinkpad) {
528     otherpad = jitterbuffer->priv->srcpad;
529   } else if (pad == jitterbuffer->priv->srcpad) {
530     otherpad = jitterbuffer->priv->sinkpad;
531   } else if (pad == jitterbuffer->priv->rtcpsinkpad) {
532     otherpad = NULL;
533   }
534
535   g_value_init (&val, GST_TYPE_PAD);
536   g_value_set_object (&val, otherpad);
537   it = gst_iterator_new_single (GST_TYPE_PAD, &val);
538   g_value_unset (&val);
539
540   return it;
541 }
542
543 static GstPad *
544 create_rtcp_sink (GstRtpJitterBuffer * jitterbuffer)
545 {
546   GstRtpJitterBufferPrivate *priv;
547
548   priv = jitterbuffer->priv;
549
550   GST_DEBUG_OBJECT (jitterbuffer, "creating RTCP sink pad");
551
552   priv->rtcpsinkpad =
553       gst_pad_new_from_static_template
554       (&gst_rtp_jitter_buffer_sink_rtcp_template, "sink_rtcp");
555   gst_pad_set_chain_function (priv->rtcpsinkpad,
556       gst_rtp_jitter_buffer_chain_rtcp);
557   gst_pad_set_event_function (priv->rtcpsinkpad,
558       (GstPadEventFunction) gst_rtp_jitter_buffer_sink_rtcp_event);
559   gst_pad_set_iterate_internal_links_function (priv->rtcpsinkpad,
560       gst_rtp_jitter_buffer_iterate_internal_links);
561   gst_pad_set_active (priv->rtcpsinkpad, TRUE);
562   gst_element_add_pad (GST_ELEMENT_CAST (jitterbuffer), priv->rtcpsinkpad);
563
564   return priv->rtcpsinkpad;
565 }
566
567 static void
568 remove_rtcp_sink (GstRtpJitterBuffer * jitterbuffer)
569 {
570   GstRtpJitterBufferPrivate *priv;
571
572   priv = jitterbuffer->priv;
573
574   GST_DEBUG_OBJECT (jitterbuffer, "removing RTCP sink pad");
575
576   gst_pad_set_active (priv->rtcpsinkpad, FALSE);
577
578   gst_element_remove_pad (GST_ELEMENT_CAST (jitterbuffer), priv->rtcpsinkpad);
579   priv->rtcpsinkpad = NULL;
580 }
581
582 static GstPad *
583 gst_rtp_jitter_buffer_request_new_pad (GstElement * element,
584     GstPadTemplate * templ, const gchar * name, const GstCaps * filter)
585 {
586   GstRtpJitterBuffer *jitterbuffer;
587   GstElementClass *klass;
588   GstPad *result;
589   GstRtpJitterBufferPrivate *priv;
590
591   g_return_val_if_fail (templ != NULL, NULL);
592   g_return_val_if_fail (GST_IS_RTP_JITTER_BUFFER (element), NULL);
593
594   jitterbuffer = GST_RTP_JITTER_BUFFER (element);
595   priv = jitterbuffer->priv;
596   klass = GST_ELEMENT_GET_CLASS (element);
597
598   GST_DEBUG_OBJECT (element, "requesting pad %s", GST_STR_NULL (name));
599
600   /* figure out the template */
601   if (templ == gst_element_class_get_pad_template (klass, "sink_rtcp")) {
602     if (priv->rtcpsinkpad != NULL)
603       goto exists;
604
605     result = create_rtcp_sink (jitterbuffer);
606   } else
607     goto wrong_template;
608
609   return result;
610
611   /* ERRORS */
612 wrong_template:
613   {
614     g_warning ("gstrtpjitterbuffer: this is not our template");
615     return NULL;
616   }
617 exists:
618   {
619     g_warning ("gstrtpjitterbuffer: pad already requested");
620     return NULL;
621   }
622 }
623
624 static void
625 gst_rtp_jitter_buffer_release_pad (GstElement * element, GstPad * pad)
626 {
627   GstRtpJitterBuffer *jitterbuffer;
628   GstRtpJitterBufferPrivate *priv;
629
630   g_return_if_fail (GST_IS_RTP_JITTER_BUFFER (element));
631   g_return_if_fail (GST_IS_PAD (pad));
632
633   jitterbuffer = GST_RTP_JITTER_BUFFER (element);
634   priv = jitterbuffer->priv;
635
636   GST_DEBUG_OBJECT (element, "releasing pad %s:%s", GST_DEBUG_PAD_NAME (pad));
637
638   if (priv->rtcpsinkpad == pad) {
639     remove_rtcp_sink (jitterbuffer);
640   } else
641     goto wrong_pad;
642
643   return;
644
645   /* ERRORS */
646 wrong_pad:
647   {
648     g_warning ("gstjitterbuffer: asked to release an unknown pad");
649     return;
650   }
651 }
652
653 static GstClock *
654 gst_rtp_jitter_buffer_provide_clock (GstElement * element)
655 {
656   return gst_system_clock_obtain ();
657 }
658
659 static void
660 gst_rtp_jitter_buffer_clear_pt_map (GstRtpJitterBuffer * jitterbuffer)
661 {
662   GstRtpJitterBufferPrivate *priv;
663
664   priv = jitterbuffer->priv;
665
666   /* this will trigger a new pt-map request signal, FIXME, do something better. */
667
668   JBUF_LOCK (priv);
669   priv->clock_rate = -1;
670   /* do not clear current content, but refresh state for new arrival */
671   GST_DEBUG_OBJECT (jitterbuffer, "reset jitterbuffer");
672   rtp_jitter_buffer_reset_skew (priv->jbuf);
673   priv->last_popped_seqnum = -1;
674   priv->next_seqnum = -1;
675   JBUF_UNLOCK (priv);
676 }
677
678 static GstClockTime
679 gst_rtp_jitter_buffer_set_active (GstRtpJitterBuffer * jbuf, gboolean active,
680     guint64 offset)
681 {
682   GstRtpJitterBufferPrivate *priv;
683   GstClockTime last_out;
684   GstBuffer *head;
685
686   priv = jbuf->priv;
687
688   JBUF_LOCK (priv);
689   GST_DEBUG_OBJECT (jbuf, "setting active %d with offset %" GST_TIME_FORMAT,
690       active, GST_TIME_ARGS (offset));
691
692   if (active != priv->active) {
693     /* add the amount of time spent in paused to the output offset. All
694      * outgoing buffers will have this offset applied to their timestamps in
695      * order to make them arrive in time in the sink. */
696     priv->out_offset = offset;
697     GST_DEBUG_OBJECT (jbuf, "out offset %" GST_TIME_FORMAT,
698         GST_TIME_ARGS (priv->out_offset));
699     priv->active = active;
700     JBUF_SIGNAL (priv);
701   }
702   if (!active) {
703     rtp_jitter_buffer_set_buffering (priv->jbuf, TRUE);
704   }
705   if ((head = rtp_jitter_buffer_peek (priv->jbuf))) {
706     /* head buffer timestamp and offset gives our output time */
707     last_out = GST_BUFFER_TIMESTAMP (head) + priv->ts_offset;
708   } else {
709     /* use last known time when the buffer is empty */
710     last_out = priv->last_out_time;
711   }
712   JBUF_UNLOCK (priv);
713
714   return last_out;
715 }
716
717 static GstCaps *
718 gst_rtp_jitter_buffer_getcaps (GstPad * pad, GstCaps * filter)
719 {
720   GstRtpJitterBuffer *jitterbuffer;
721   GstRtpJitterBufferPrivate *priv;
722   GstPad *other;
723   GstCaps *caps;
724   const GstCaps *templ;
725
726   jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
727   priv = jitterbuffer->priv;
728
729   other = (pad == priv->srcpad ? priv->sinkpad : priv->srcpad);
730
731   caps = gst_pad_peer_query_caps (other, filter);
732
733   templ = gst_pad_get_pad_template_caps (pad);
734   if (caps == NULL) {
735     GST_DEBUG_OBJECT (jitterbuffer, "copy template");
736     caps = gst_caps_copy (templ);
737   } else {
738     GstCaps *intersect;
739
740     GST_DEBUG_OBJECT (jitterbuffer, "intersect with template");
741
742     intersect = gst_caps_intersect (caps, templ);
743     gst_caps_unref (caps);
744
745     caps = intersect;
746   }
747   gst_object_unref (jitterbuffer);
748
749   return caps;
750 }
751
752 /*
753  * Must be called with JBUF_LOCK held
754  */
755
756 static gboolean
757 gst_jitter_buffer_sink_parse_caps (GstRtpJitterBuffer * jitterbuffer,
758     GstCaps * caps)
759 {
760   GstRtpJitterBufferPrivate *priv;
761   GstStructure *caps_struct;
762   guint val;
763   GstClockTime tval;
764
765   priv = jitterbuffer->priv;
766
767   /* first parse the caps */
768   caps_struct = gst_caps_get_structure (caps, 0);
769
770   GST_DEBUG_OBJECT (jitterbuffer, "got caps");
771
772   /* we need a clock-rate to convert the rtp timestamps to GStreamer time and to
773    * measure the amount of data in the buffer */
774   if (!gst_structure_get_int (caps_struct, "clock-rate", &priv->clock_rate))
775     goto error;
776
777   if (priv->clock_rate <= 0)
778     goto wrong_rate;
779
780   GST_DEBUG_OBJECT (jitterbuffer, "got clock-rate %d", priv->clock_rate);
781
782   /* The clock base is the RTP timestamp corrsponding to the npt-start value. We
783    * can use this to track the amount of time elapsed on the sender. */
784   if (gst_structure_get_uint (caps_struct, "clock-base", &val))
785     priv->clock_base = val;
786   else
787     priv->clock_base = -1;
788
789   priv->ext_timestamp = priv->clock_base;
790
791   GST_DEBUG_OBJECT (jitterbuffer, "got clock-base %" G_GINT64_FORMAT,
792       priv->clock_base);
793
794   if (gst_structure_get_uint (caps_struct, "seqnum-base", &val)) {
795     /* first expected seqnum, only update when we didn't have a previous base. */
796     if (priv->next_in_seqnum == -1)
797       priv->next_in_seqnum = val;
798     if (priv->next_seqnum == -1)
799       priv->next_seqnum = val;
800   }
801
802   GST_DEBUG_OBJECT (jitterbuffer, "got seqnum-base %d", priv->next_in_seqnum);
803
804   /* the start and stop times. The seqnum-base corresponds to the start time. We
805    * will keep track of the seqnums on the output and when we reach the one
806    * corresponding to npt-stop, we emit the npt-stop-reached signal */
807   if (gst_structure_get_clock_time (caps_struct, "npt-start", &tval))
808     priv->npt_start = tval;
809   else
810     priv->npt_start = 0;
811
812   if (gst_structure_get_clock_time (caps_struct, "npt-stop", &tval))
813     priv->npt_stop = tval;
814   else
815     priv->npt_stop = -1;
816
817   GST_DEBUG_OBJECT (jitterbuffer,
818       "npt start/stop: %" GST_TIME_FORMAT "-%" GST_TIME_FORMAT,
819       GST_TIME_ARGS (priv->npt_start), GST_TIME_ARGS (priv->npt_stop));
820
821   return TRUE;
822
823   /* ERRORS */
824 error:
825   {
826     GST_DEBUG_OBJECT (jitterbuffer, "No clock-rate in caps!");
827     return FALSE;
828   }
829 wrong_rate:
830   {
831     GST_DEBUG_OBJECT (jitterbuffer, "Invalid clock-rate %d", priv->clock_rate);
832     return FALSE;
833   }
834 }
835
836 static void
837 gst_rtp_jitter_buffer_flush_start (GstRtpJitterBuffer * jitterbuffer)
838 {
839   GstRtpJitterBufferPrivate *priv;
840
841   priv = jitterbuffer->priv;
842
843   JBUF_LOCK (priv);
844   /* mark ourselves as flushing */
845   priv->srcresult = GST_FLOW_WRONG_STATE;
846   GST_DEBUG_OBJECT (jitterbuffer, "Disabling pop on queue");
847   /* this unblocks any waiting pops on the src pad task */
848   JBUF_SIGNAL (priv);
849   /* unlock clock, we just unschedule, the entry will be released by the
850    * locking streaming thread. */
851   if (priv->clock_id) {
852     gst_clock_id_unschedule (priv->clock_id);
853     priv->unscheduled = TRUE;
854   }
855   JBUF_UNLOCK (priv);
856 }
857
858 static void
859 gst_rtp_jitter_buffer_flush_stop (GstRtpJitterBuffer * jitterbuffer)
860 {
861   GstRtpJitterBufferPrivate *priv;
862
863   priv = jitterbuffer->priv;
864
865   JBUF_LOCK (priv);
866   GST_DEBUG_OBJECT (jitterbuffer, "Enabling pop on queue");
867   /* Mark as non flushing */
868   priv->srcresult = GST_FLOW_OK;
869   gst_segment_init (&priv->segment, GST_FORMAT_TIME);
870   priv->last_popped_seqnum = -1;
871   priv->last_out_time = -1;
872   priv->next_seqnum = -1;
873   priv->next_in_seqnum = -1;
874   priv->clock_rate = -1;
875   priv->eos = FALSE;
876   priv->estimated_eos = -1;
877   priv->last_elapsed = 0;
878   priv->reached_npt_stop = FALSE;
879   priv->ext_timestamp = -1;
880   GST_DEBUG_OBJECT (jitterbuffer, "flush and reset jitterbuffer");
881   rtp_jitter_buffer_flush (priv->jbuf);
882   rtp_jitter_buffer_reset_skew (priv->jbuf);
883   JBUF_UNLOCK (priv);
884 }
885
886 static gboolean
887 gst_rtp_jitter_buffer_src_activate_mode (GstPad * pad, GstObject * parent,
888     GstPadMode mode, gboolean active)
889 {
890   gboolean result;
891   GstRtpJitterBuffer *jitterbuffer = NULL;
892
893   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
894
895   switch (mode) {
896     case GST_PAD_MODE_PUSH:
897       if (active) {
898         /* allow data processing */
899         gst_rtp_jitter_buffer_flush_stop (jitterbuffer);
900
901         /* start pushing out buffers */
902         GST_DEBUG_OBJECT (jitterbuffer, "Starting task on srcpad");
903         result = gst_pad_start_task (jitterbuffer->priv->srcpad,
904             (GstTaskFunction) gst_rtp_jitter_buffer_loop, jitterbuffer);
905       } else {
906         /* make sure all data processing stops ASAP */
907         gst_rtp_jitter_buffer_flush_start (jitterbuffer);
908
909         /* NOTE this will hardlock if the state change is called from the src pad
910          * task thread because we will _join() the thread. */
911         GST_DEBUG_OBJECT (jitterbuffer, "Stopping task on srcpad");
912         result = gst_pad_stop_task (pad);
913       }
914       break;
915     default:
916       result = FALSE;
917       break;
918   }
919   return result;
920 }
921
922 static GstStateChangeReturn
923 gst_rtp_jitter_buffer_change_state (GstElement * element,
924     GstStateChange transition)
925 {
926   GstRtpJitterBuffer *jitterbuffer;
927   GstRtpJitterBufferPrivate *priv;
928   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
929
930   jitterbuffer = GST_RTP_JITTER_BUFFER (element);
931   priv = jitterbuffer->priv;
932
933   switch (transition) {
934     case GST_STATE_CHANGE_NULL_TO_READY:
935       break;
936     case GST_STATE_CHANGE_READY_TO_PAUSED:
937       JBUF_LOCK (priv);
938       /* reset negotiated values */
939       priv->clock_rate = -1;
940       priv->clock_base = -1;
941       priv->peer_latency = 0;
942       priv->last_pt = -1;
943       /* block until we go to PLAYING */
944       priv->blocked = TRUE;
945       JBUF_UNLOCK (priv);
946       break;
947     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
948       JBUF_LOCK (priv);
949       /* unblock to allow streaming in PLAYING */
950       priv->blocked = FALSE;
951       JBUF_SIGNAL (priv);
952       JBUF_UNLOCK (priv);
953       break;
954     default:
955       break;
956   }
957
958   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
959
960   switch (transition) {
961     case GST_STATE_CHANGE_READY_TO_PAUSED:
962       /* we are a live element because we sync to the clock, which we can only
963        * do in the PLAYING state */
964       if (ret != GST_STATE_CHANGE_FAILURE)
965         ret = GST_STATE_CHANGE_NO_PREROLL;
966       break;
967     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
968       JBUF_LOCK (priv);
969       /* block to stop streaming when PAUSED */
970       priv->blocked = TRUE;
971       JBUF_UNLOCK (priv);
972       if (ret != GST_STATE_CHANGE_FAILURE)
973         ret = GST_STATE_CHANGE_NO_PREROLL;
974       break;
975     case GST_STATE_CHANGE_PAUSED_TO_READY:
976       break;
977     case GST_STATE_CHANGE_READY_TO_NULL:
978       break;
979     default:
980       break;
981   }
982
983   return ret;
984 }
985
986 static gboolean
987 gst_rtp_jitter_buffer_src_event (GstPad * pad, GstObject * parent,
988     GstEvent * event)
989 {
990   gboolean ret = TRUE;
991   GstRtpJitterBuffer *jitterbuffer;
992   GstRtpJitterBufferPrivate *priv;
993
994   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
995   priv = jitterbuffer->priv;
996
997   GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
998
999   switch (GST_EVENT_TYPE (event)) {
1000     case GST_EVENT_LATENCY:
1001     {
1002       GstClockTime latency;
1003
1004       gst_event_parse_latency (event, &latency);
1005
1006       JBUF_LOCK (priv);
1007       /* adjust the overall buffer delay to the total pipeline latency in
1008        * buffering mode because if downstream consumes too fast (because of
1009        * large latency or queues, we would start rebuffering again. */
1010       if (rtp_jitter_buffer_get_mode (priv->jbuf) ==
1011           RTP_JITTER_BUFFER_MODE_BUFFER) {
1012         rtp_jitter_buffer_set_delay (priv->jbuf, latency);
1013       }
1014       JBUF_UNLOCK (priv);
1015
1016       ret = gst_pad_push_event (priv->sinkpad, event);
1017       break;
1018     }
1019     default:
1020       ret = gst_pad_push_event (priv->sinkpad, event);
1021       break;
1022   }
1023
1024   return ret;
1025 }
1026
1027 static gboolean
1028 gst_rtp_jitter_buffer_sink_event (GstPad * pad, GstObject * parent,
1029     GstEvent * event)
1030 {
1031   gboolean ret = TRUE;
1032   GstRtpJitterBuffer *jitterbuffer;
1033   GstRtpJitterBufferPrivate *priv;
1034
1035   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1036   priv = jitterbuffer->priv;
1037
1038   GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
1039
1040   switch (GST_EVENT_TYPE (event)) {
1041     case GST_EVENT_CAPS:
1042     {
1043       GstCaps *caps;
1044
1045       gst_event_parse_caps (event, &caps);
1046
1047       JBUF_LOCK (priv);
1048       ret = gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps);
1049       JBUF_UNLOCK (priv);
1050
1051       /* set same caps on srcpad on success */
1052       if (ret)
1053         gst_pad_set_caps (priv->srcpad, caps);
1054
1055       gst_event_unref (event);
1056       break;
1057     }
1058     case GST_EVENT_SEGMENT:
1059     {
1060       gst_event_copy_segment (event, &priv->segment);
1061
1062       /* we need time for now */
1063       if (priv->segment.format != GST_FORMAT_TIME)
1064         goto newseg_wrong_format;
1065
1066       GST_DEBUG_OBJECT (jitterbuffer,
1067           "newsegment:  %" GST_SEGMENT_FORMAT, &priv->segment);
1068
1069       /* FIXME, push SEGMENT in the queue. Sorting order might be difficult. */
1070       ret = gst_pad_push_event (priv->srcpad, event);
1071       break;
1072     }
1073     case GST_EVENT_FLUSH_START:
1074       gst_rtp_jitter_buffer_flush_start (jitterbuffer);
1075       ret = gst_pad_push_event (priv->srcpad, event);
1076       break;
1077     case GST_EVENT_FLUSH_STOP:
1078       ret = gst_pad_push_event (priv->srcpad, event);
1079       ret =
1080           gst_rtp_jitter_buffer_src_activate_mode (priv->srcpad, parent,
1081           GST_PAD_MODE_PUSH, TRUE);
1082       break;
1083     case GST_EVENT_EOS:
1084     {
1085       /* push EOS in queue. We always push it at the head */
1086       JBUF_LOCK (priv);
1087       /* check for flushing, we need to discard the event and return FALSE when
1088        * we are flushing */
1089       ret = priv->srcresult == GST_FLOW_OK;
1090       if (ret && !priv->eos) {
1091         GST_INFO_OBJECT (jitterbuffer, "queuing EOS");
1092         priv->eos = TRUE;
1093         JBUF_SIGNAL (priv);
1094       } else if (priv->eos) {
1095         GST_DEBUG_OBJECT (jitterbuffer, "dropping EOS, we are already EOS");
1096       } else {
1097         GST_DEBUG_OBJECT (jitterbuffer, "dropping EOS, reason %s",
1098             gst_flow_get_name (priv->srcresult));
1099       }
1100       JBUF_UNLOCK (priv);
1101       gst_event_unref (event);
1102       break;
1103     }
1104     default:
1105       ret = gst_pad_push_event (priv->srcpad, event);
1106       break;
1107   }
1108
1109 done:
1110
1111   return ret;
1112
1113   /* ERRORS */
1114 newseg_wrong_format:
1115   {
1116     GST_DEBUG_OBJECT (jitterbuffer, "received non TIME newsegment");
1117     ret = FALSE;
1118     gst_event_unref (event);
1119     goto done;
1120   }
1121 }
1122
1123 static gboolean
1124 gst_rtp_jitter_buffer_sink_rtcp_event (GstPad * pad, GstObject * parent,
1125     GstEvent * event)
1126 {
1127   GstRtpJitterBuffer *jitterbuffer;
1128
1129   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1130
1131   GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
1132
1133   switch (GST_EVENT_TYPE (event)) {
1134     case GST_EVENT_FLUSH_START:
1135       break;
1136     case GST_EVENT_FLUSH_STOP:
1137       break;
1138     default:
1139       break;
1140   }
1141   gst_event_unref (event);
1142
1143   return TRUE;
1144 }
1145
1146 /*
1147  * Must be called with JBUF_LOCK held, will release the LOCK when emiting the
1148  * signal. The function returns GST_FLOW_ERROR when a parsing error happened and
1149  * GST_FLOW_WRONG_STATE when the element is shutting down. On success
1150  * GST_FLOW_OK is returned.
1151  */
1152 static GstFlowReturn
1153 gst_rtp_jitter_buffer_get_clock_rate (GstRtpJitterBuffer * jitterbuffer,
1154     guint8 pt)
1155 {
1156   GValue ret = { 0 };
1157   GValue args[2] = { {0}, {0} };
1158   GstCaps *caps;
1159   gboolean res;
1160
1161   g_value_init (&args[0], GST_TYPE_ELEMENT);
1162   g_value_set_object (&args[0], jitterbuffer);
1163   g_value_init (&args[1], G_TYPE_UINT);
1164   g_value_set_uint (&args[1], pt);
1165
1166   g_value_init (&ret, GST_TYPE_CAPS);
1167   g_value_set_boxed (&ret, NULL);
1168
1169   JBUF_UNLOCK (jitterbuffer->priv);
1170   g_signal_emitv (args, gst_rtp_jitter_buffer_signals[SIGNAL_REQUEST_PT_MAP], 0,
1171       &ret);
1172   JBUF_LOCK_CHECK (jitterbuffer->priv, out_flushing);
1173
1174   g_value_unset (&args[0]);
1175   g_value_unset (&args[1]);
1176   caps = (GstCaps *) g_value_dup_boxed (&ret);
1177   g_value_unset (&ret);
1178   if (!caps)
1179     goto no_caps;
1180
1181   res = gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps);
1182   gst_caps_unref (caps);
1183
1184   if (G_UNLIKELY (!res))
1185     goto parse_failed;
1186
1187   return GST_FLOW_OK;
1188
1189   /* ERRORS */
1190 no_caps:
1191   {
1192     GST_DEBUG_OBJECT (jitterbuffer, "could not get caps");
1193     return GST_FLOW_ERROR;
1194   }
1195 out_flushing:
1196   {
1197     GST_DEBUG_OBJECT (jitterbuffer, "we are flushing");
1198     return GST_FLOW_WRONG_STATE;
1199   }
1200 parse_failed:
1201   {
1202     GST_DEBUG_OBJECT (jitterbuffer, "parse failed");
1203     return GST_FLOW_ERROR;
1204   }
1205 }
1206
1207 /* call with jbuf lock held */
1208 static void
1209 check_buffering_percent (GstRtpJitterBuffer * jitterbuffer, gint * percent)
1210 {
1211   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1212
1213   /* too short a stream, or too close to EOS will never really fill buffer */
1214   if (*percent != -1 && priv->npt_stop != -1 &&
1215       priv->npt_stop - priv->npt_start <=
1216       rtp_jitter_buffer_get_delay (priv->jbuf)) {
1217     GST_DEBUG_OBJECT (jitterbuffer, "short stream; faking full buffer");
1218     rtp_jitter_buffer_set_buffering (priv->jbuf, FALSE);
1219     *percent = 100;
1220   }
1221 }
1222
1223 static void
1224 post_buffering_percent (GstRtpJitterBuffer * jitterbuffer, gint percent)
1225 {
1226   GstMessage *message;
1227
1228   /* Post a buffering message */
1229   message = gst_message_new_buffering (GST_OBJECT_CAST (jitterbuffer), percent);
1230   gst_message_set_buffering_stats (message, GST_BUFFERING_LIVE, -1, -1, -1);
1231
1232   gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer), message);
1233 }
1234
1235 static GstFlowReturn
1236 gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent,
1237     GstBuffer * buffer)
1238 {
1239   GstRtpJitterBuffer *jitterbuffer;
1240   GstRtpJitterBufferPrivate *priv;
1241   guint16 seqnum;
1242   GstFlowReturn ret = GST_FLOW_OK;
1243   GstClockTime timestamp;
1244   guint64 latency_ts;
1245   gboolean tail;
1246   gint percent = -1;
1247   guint8 pt;
1248   GstRTPBuffer rtp = { NULL };
1249
1250   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1251
1252   if (G_UNLIKELY (!gst_rtp_buffer_validate (buffer)))
1253     goto invalid_buffer;
1254
1255   priv = jitterbuffer->priv;
1256
1257   gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp);
1258   pt = gst_rtp_buffer_get_payload_type (&rtp);
1259   seqnum = gst_rtp_buffer_get_seq (&rtp);
1260   gst_rtp_buffer_unmap (&rtp);
1261
1262   /* take the timestamp of the buffer. This is the time when the packet was
1263    * received and is used to calculate jitter and clock skew. We will adjust
1264    * this timestamp with the smoothed value after processing it in the
1265    * jitterbuffer. */
1266   timestamp = GST_BUFFER_TIMESTAMP (buffer);
1267   /* bring to running time */
1268   timestamp = gst_segment_to_running_time (&priv->segment, GST_FORMAT_TIME,
1269       timestamp);
1270
1271   GST_DEBUG_OBJECT (jitterbuffer,
1272       "Received packet #%d at time %" GST_TIME_FORMAT, seqnum,
1273       GST_TIME_ARGS (timestamp));
1274
1275   JBUF_LOCK_CHECK (priv, out_flushing);
1276
1277   if (G_UNLIKELY (priv->last_pt != pt)) {
1278     GST_DEBUG_OBJECT (jitterbuffer, "pt changed from %u to %u", priv->last_pt,
1279         pt);
1280
1281     priv->last_pt = pt;
1282     /* reset clock-rate so that we get a new one */
1283     priv->clock_rate = -1;
1284 #if 0
1285     GstCaps *caps;
1286     /* Try to get the clock-rate from the caps first if we can. If there are no
1287      * caps we must fire the signal to get the clock-rate. */
1288     if ((caps = GST_BUFFER_CAPS (buffer))) {
1289       gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps);
1290     }
1291 #endif
1292   }
1293
1294   if (G_UNLIKELY (priv->clock_rate == -1)) {
1295     /* no clock rate given on the caps, try to get one with the signal */
1296     if (gst_rtp_jitter_buffer_get_clock_rate (jitterbuffer,
1297             pt) == GST_FLOW_WRONG_STATE)
1298       goto out_flushing;
1299
1300     if (G_UNLIKELY (priv->clock_rate == -1))
1301       goto no_clock_rate;
1302   }
1303
1304   /* don't accept more data on EOS */
1305   if (G_UNLIKELY (priv->eos))
1306     goto have_eos;
1307
1308   /* now check against our expected seqnum */
1309   if (G_LIKELY (priv->next_in_seqnum != -1)) {
1310     gint gap;
1311     gboolean reset = FALSE;
1312
1313     gap = gst_rtp_buffer_compare_seqnum (priv->next_in_seqnum, seqnum);
1314     if (G_UNLIKELY (gap != 0)) {
1315       GST_DEBUG_OBJECT (jitterbuffer, "expected #%d, got #%d, gap of %d",
1316           priv->next_in_seqnum, seqnum, gap);
1317       /* priv->next_in_seqnum >= seqnum, this packet is too late or the
1318        * sender might have been restarted with different seqnum. */
1319       if (gap < -RTP_MAX_MISORDER) {
1320         GST_DEBUG_OBJECT (jitterbuffer, "reset: buffer too old %d", gap);
1321         reset = TRUE;
1322       }
1323       /* priv->next_in_seqnum < seqnum, this is a new packet */
1324       else if (G_UNLIKELY (gap > RTP_MAX_DROPOUT)) {
1325         GST_DEBUG_OBJECT (jitterbuffer, "reset: too many dropped packets %d",
1326             gap);
1327         reset = TRUE;
1328       } else {
1329         GST_DEBUG_OBJECT (jitterbuffer, "tolerable gap");
1330       }
1331     }
1332     if (G_UNLIKELY (reset)) {
1333       GST_DEBUG_OBJECT (jitterbuffer, "flush and reset jitterbuffer");
1334       rtp_jitter_buffer_flush (priv->jbuf);
1335       rtp_jitter_buffer_reset_skew (priv->jbuf);
1336       priv->last_popped_seqnum = -1;
1337       priv->next_seqnum = seqnum;
1338     }
1339   }
1340   priv->next_in_seqnum = (seqnum + 1) & 0xffff;
1341
1342   /* let's check if this buffer is too late, we can only accept packets with
1343    * bigger seqnum than the one we last pushed. */
1344   if (G_LIKELY (priv->last_popped_seqnum != -1)) {
1345     gint gap;
1346
1347     gap = gst_rtp_buffer_compare_seqnum (priv->last_popped_seqnum, seqnum);
1348
1349     /* priv->last_popped_seqnum >= seqnum, we're too late. */
1350     if (G_UNLIKELY (gap <= 0))
1351       goto too_late;
1352   }
1353
1354   /* let's drop oldest packet if the queue is already full and drop-on-latency
1355    * is set. We can only do this when there actually is a latency. When no
1356    * latency is set, we just pump it in the queue and let the other end push it
1357    * out as fast as possible. */
1358   if (priv->latency_ms && priv->drop_on_latency) {
1359     latency_ts =
1360         gst_util_uint64_scale_int (priv->latency_ms, priv->clock_rate, 1000);
1361
1362     if (G_UNLIKELY (rtp_jitter_buffer_get_ts_diff (priv->jbuf) >= latency_ts)) {
1363       GstBuffer *old_buf;
1364
1365       old_buf = rtp_jitter_buffer_pop (priv->jbuf, &percent);
1366
1367       GST_DEBUG_OBJECT (jitterbuffer, "Queue full, dropping old packet %p",
1368           old_buf);
1369
1370       gst_buffer_unref (old_buf);
1371     }
1372   }
1373
1374   /* we need to make the metadata writable before pushing it in the jitterbuffer
1375    * because the jitterbuffer will update the timestamp */
1376   buffer = gst_buffer_make_writable (buffer);
1377
1378   /* now insert the packet into the queue in sorted order. This function returns
1379    * FALSE if a packet with the same seqnum was already in the queue, meaning we
1380    * have a duplicate. */
1381   if (G_UNLIKELY (!rtp_jitter_buffer_insert (priv->jbuf, buffer, timestamp,
1382               priv->clock_rate, &tail, &percent)))
1383     goto duplicate;
1384
1385   /* signal addition of new buffer when the _loop is waiting. */
1386   if (priv->waiting)
1387     JBUF_SIGNAL (priv);
1388
1389   /* let's unschedule and unblock any waiting buffers. We only want to do this
1390    * when the tail buffer changed */
1391   if (G_UNLIKELY (priv->clock_id && tail)) {
1392     GST_DEBUG_OBJECT (jitterbuffer,
1393         "Unscheduling waiting buffer, new tail buffer");
1394     gst_clock_id_unschedule (priv->clock_id);
1395     priv->unscheduled = TRUE;
1396   }
1397
1398   GST_DEBUG_OBJECT (jitterbuffer, "Pushed packet #%d, now %d packets, tail: %d",
1399       seqnum, rtp_jitter_buffer_num_packets (priv->jbuf), tail);
1400
1401   check_buffering_percent (jitterbuffer, &percent);
1402
1403 finished:
1404   JBUF_UNLOCK (priv);
1405
1406   if (percent != -1)
1407     post_buffering_percent (jitterbuffer, percent);
1408
1409   return ret;
1410
1411   /* ERRORS */
1412 invalid_buffer:
1413   {
1414     /* this is not fatal but should be filtered earlier */
1415     GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
1416         ("Received invalid RTP payload, dropping"));
1417     gst_buffer_unref (buffer);
1418     return GST_FLOW_OK;
1419   }
1420 no_clock_rate:
1421   {
1422     GST_WARNING_OBJECT (jitterbuffer,
1423         "No clock-rate in caps!, dropping buffer");
1424     gst_buffer_unref (buffer);
1425     goto finished;
1426   }
1427 out_flushing:
1428   {
1429     ret = priv->srcresult;
1430     GST_DEBUG_OBJECT (jitterbuffer, "flushing %s", gst_flow_get_name (ret));
1431     gst_buffer_unref (buffer);
1432     goto finished;
1433   }
1434 have_eos:
1435   {
1436     ret = GST_FLOW_UNEXPECTED;
1437     GST_WARNING_OBJECT (jitterbuffer, "we are EOS, refusing buffer");
1438     gst_buffer_unref (buffer);
1439     goto finished;
1440   }
1441 too_late:
1442   {
1443     GST_WARNING_OBJECT (jitterbuffer, "Packet #%d too late as #%d was already"
1444         " popped, dropping", seqnum, priv->last_popped_seqnum);
1445     priv->num_late++;
1446     gst_buffer_unref (buffer);
1447     goto finished;
1448   }
1449 duplicate:
1450   {
1451     GST_WARNING_OBJECT (jitterbuffer, "Duplicate packet #%d detected, dropping",
1452         seqnum);
1453     priv->num_duplicates++;
1454     gst_buffer_unref (buffer);
1455     goto finished;
1456   }
1457 }
1458
1459 static GstClockTime
1460 apply_offset (GstRtpJitterBuffer * jitterbuffer, GstClockTime timestamp)
1461 {
1462   GstRtpJitterBufferPrivate *priv;
1463
1464   priv = jitterbuffer->priv;
1465
1466   if (timestamp == -1)
1467     return -1;
1468
1469   /* apply the timestamp offset, this is used for inter stream sync */
1470   timestamp += priv->ts_offset;
1471   /* add the offset, this is used when buffering */
1472   timestamp += priv->out_offset;
1473
1474   return timestamp;
1475 }
1476
1477 static GstClockTime
1478 get_sync_time (GstRtpJitterBuffer * jitterbuffer, GstClockTime timestamp)
1479 {
1480   GstClockTime result;
1481   GstRtpJitterBufferPrivate *priv;
1482
1483   priv = jitterbuffer->priv;
1484
1485   result = timestamp + GST_ELEMENT_CAST (jitterbuffer)->base_time;
1486   /* add latency, this includes our own latency and the peer latency. */
1487   result += priv->latency_ns;
1488   result += priv->peer_latency;
1489
1490   return result;
1491 }
1492
1493 static gboolean
1494 eos_reached (GstClock * clock, GstClockTime time, GstClockID id,
1495     GstRtpJitterBuffer * jitterbuffer)
1496 {
1497   GstRtpJitterBufferPrivate *priv;
1498
1499   priv = jitterbuffer->priv;
1500
1501   JBUF_LOCK_CHECK (priv, flushing);
1502   if (priv->waiting) {
1503     GST_INFO_OBJECT (jitterbuffer, "got the NPT timeout");
1504     priv->reached_npt_stop = TRUE;
1505     JBUF_SIGNAL (priv);
1506   }
1507   JBUF_UNLOCK (priv);
1508
1509   return TRUE;
1510
1511   /* ERRORS */
1512 flushing:
1513   {
1514     JBUF_UNLOCK (priv);
1515     return FALSE;
1516   }
1517 }
1518
1519 static GstClockTime
1520 compute_elapsed (GstRtpJitterBuffer * jitterbuffer, GstBuffer * outbuf)
1521 {
1522   guint64 ext_time, elapsed;
1523   guint32 rtp_time;
1524   GstRtpJitterBufferPrivate *priv;
1525   GstRTPBuffer rtp = { NULL };
1526
1527   priv = jitterbuffer->priv;
1528   gst_rtp_buffer_map (outbuf, GST_MAP_READ, &rtp);
1529   rtp_time = gst_rtp_buffer_get_timestamp (&rtp);
1530   gst_rtp_buffer_unmap (&rtp);
1531
1532   GST_LOG_OBJECT (jitterbuffer, "rtp %" G_GUINT32_FORMAT ", ext %"
1533       G_GUINT64_FORMAT, rtp_time, priv->ext_timestamp);
1534
1535   if (rtp_time < priv->ext_timestamp) {
1536     ext_time = priv->ext_timestamp;
1537   } else {
1538     ext_time = gst_rtp_buffer_ext_timestamp (&priv->ext_timestamp, rtp_time);
1539   }
1540
1541   if (ext_time > priv->clock_base)
1542     elapsed = ext_time - priv->clock_base;
1543   else
1544     elapsed = 0;
1545
1546   elapsed = gst_util_uint64_scale_int (elapsed, GST_SECOND, priv->clock_rate);
1547   return elapsed;
1548 }
1549
1550 /*
1551  * This funcion will push out buffers on the source pad.
1552  *
1553  * For each pushed buffer, the seqnum is recorded, if the next buffer B has a
1554  * different seqnum (missing packets before B), this function will wait for the
1555  * missing packet to arrive up to the timestamp of buffer B.
1556  */
1557 static void
1558 gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer)
1559 {
1560   GstRtpJitterBufferPrivate *priv;
1561   GstBuffer *outbuf;
1562   GstFlowReturn result;
1563   guint16 seqnum;
1564   guint32 next_seqnum;
1565   GstClockTime timestamp, out_time;
1566   gboolean discont = FALSE;
1567   gint gap;
1568   GstClock *clock;
1569   GstClockID id;
1570   GstClockTime sync_time;
1571   gint percent = -1;
1572   GstRTPBuffer rtp = { NULL };
1573
1574   priv = jitterbuffer->priv;
1575
1576   JBUF_LOCK_CHECK (priv, flushing);
1577 again:
1578   GST_DEBUG_OBJECT (jitterbuffer, "Peeking item");
1579   while (TRUE) {
1580     id = NULL;
1581     /* always wait if we are blocked */
1582     if (G_LIKELY (!priv->blocked)) {
1583       /* we're buffering but not EOS, wait. */
1584       if (!priv->eos && (!priv->active
1585               || rtp_jitter_buffer_is_buffering (priv->jbuf))) {
1586         GstClockTime elapsed, delay, left;
1587
1588         if (priv->estimated_eos == -1)
1589           goto do_wait;
1590
1591         outbuf = rtp_jitter_buffer_peek (priv->jbuf);
1592         if (outbuf != NULL) {
1593           elapsed = compute_elapsed (jitterbuffer, outbuf);
1594           if (GST_BUFFER_DURATION_IS_VALID (outbuf))
1595             elapsed += GST_BUFFER_DURATION (outbuf);
1596         } else {
1597           GST_INFO_OBJECT (jitterbuffer, "no buffer, using last_elapsed");
1598           elapsed = priv->last_elapsed;
1599         }
1600
1601         delay = rtp_jitter_buffer_get_delay (priv->jbuf);
1602
1603         if (priv->estimated_eos > elapsed)
1604           left = priv->estimated_eos - elapsed;
1605         else
1606           left = 0;
1607
1608         GST_INFO_OBJECT (jitterbuffer, "buffering, elapsed %" GST_TIME_FORMAT
1609             " estimated_eos %" GST_TIME_FORMAT " left %" GST_TIME_FORMAT
1610             " delay %" GST_TIME_FORMAT,
1611             GST_TIME_ARGS (elapsed), GST_TIME_ARGS (priv->estimated_eos),
1612             GST_TIME_ARGS (left), GST_TIME_ARGS (delay));
1613         if (left > delay)
1614           goto do_wait;
1615       }
1616       /* if we have a packet, we can exit the loop and grab it */
1617       if (rtp_jitter_buffer_num_packets (priv->jbuf) > 0)
1618         break;
1619       /* no packets but we are EOS, do eos logic */
1620       if (G_UNLIKELY (priv->eos))
1621         goto do_eos;
1622       /* underrun, wait for packets or flushing now if we are expecting an EOS
1623        * timeout, set the async timer for it too */
1624       if (priv->estimated_eos != -1 && !priv->reached_npt_stop) {
1625         sync_time = get_sync_time (jitterbuffer, priv->estimated_eos);
1626
1627         GST_OBJECT_LOCK (jitterbuffer);
1628         clock = GST_ELEMENT_CLOCK (jitterbuffer);
1629         if (clock) {
1630           GST_INFO_OBJECT (jitterbuffer, "scheduling timeout");
1631           id = gst_clock_new_single_shot_id (clock, sync_time);
1632           gst_clock_id_wait_async (id, (GstClockCallback) eos_reached,
1633               jitterbuffer);
1634         }
1635         GST_OBJECT_UNLOCK (jitterbuffer);
1636       }
1637     }
1638   do_wait:
1639     /* now we wait */
1640     GST_DEBUG_OBJECT (jitterbuffer, "waiting");
1641     priv->waiting = TRUE;
1642     JBUF_WAIT (priv);
1643     priv->waiting = FALSE;
1644     GST_DEBUG_OBJECT (jitterbuffer, "waiting done");
1645
1646     if (id) {
1647       /* unschedule any pending async notifications we might have */
1648       gst_clock_id_unschedule (id);
1649       gst_clock_id_unref (id);
1650     }
1651     if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK))
1652       goto flushing;
1653
1654     if (id && priv->reached_npt_stop) {
1655       goto do_npt_stop;
1656     }
1657   }
1658
1659   /* peek a buffer, we're just looking at the timestamp and the sequence number.
1660    * If all is fine, we'll pop and push it. If the sequence number is wrong we
1661    * wait on the timestamp. In the chain function we will unlock the wait when a
1662    * new buffer is available. The peeked buffer is valid for as long as we hold
1663    * the jitterbuffer lock. */
1664   outbuf = rtp_jitter_buffer_peek (priv->jbuf);
1665
1666   /* get the seqnum and the next expected seqnum */
1667   gst_rtp_buffer_map (outbuf, GST_MAP_READ, &rtp);
1668   seqnum = gst_rtp_buffer_get_seq (&rtp);
1669   gst_rtp_buffer_unmap (&rtp);
1670   next_seqnum = priv->next_seqnum;
1671
1672   /* get the timestamp, this is already corrected for clock skew by the
1673    * jitterbuffer */
1674   timestamp = GST_BUFFER_TIMESTAMP (outbuf);
1675
1676   GST_DEBUG_OBJECT (jitterbuffer,
1677       "Peeked buffer #%d, expect #%d, timestamp %" GST_TIME_FORMAT
1678       ", now %d left", seqnum, next_seqnum, GST_TIME_ARGS (timestamp),
1679       rtp_jitter_buffer_num_packets (priv->jbuf));
1680
1681   /* apply our timestamp offset to the incomming buffer, this will be our output
1682    * timestamp. */
1683   out_time = apply_offset (jitterbuffer, timestamp);
1684
1685   /* get the gap between this and the previous packet. If we don't know the
1686    * previous packet seqnum assume no gap. */
1687   if (G_LIKELY (next_seqnum != -1)) {
1688     gap = gst_rtp_buffer_compare_seqnum (next_seqnum, seqnum);
1689
1690     /* if we have a packet that we already pushed or considered dropped, pop it
1691      * off and get the next packet */
1692     if (G_UNLIKELY (gap < 0)) {
1693       GST_DEBUG_OBJECT (jitterbuffer, "Old packet #%d, next #%d dropping",
1694           seqnum, next_seqnum);
1695       outbuf = rtp_jitter_buffer_pop (priv->jbuf, &percent);
1696       gst_buffer_unref (outbuf);
1697       goto again;
1698     }
1699   } else {
1700     GST_DEBUG_OBJECT (jitterbuffer, "no next seqnum known, first packet");
1701     gap = -1;
1702   }
1703
1704   /* If we don't know what the next seqnum should be (== -1) we have to wait
1705    * because it might be possible that we are not receiving this buffer in-order,
1706    * a buffer with a lower seqnum could arrive later and we want to push that
1707    * earlier buffer before this buffer then.
1708    * If we know the expected seqnum, we can compare it to the current seqnum to
1709    * determine if we have missing a packet. If we have a missing packet (which
1710    * must be before this packet) we can wait for it until the deadline for this
1711    * packet expires. */
1712   if (G_UNLIKELY (gap != 0 && out_time != -1)) {
1713     GstClockReturn ret;
1714     GstClockTime duration = GST_CLOCK_TIME_NONE;
1715
1716     if (gap > 0) {
1717       /* we have a gap */
1718       GST_DEBUG_OBJECT (jitterbuffer,
1719           "Sequence number GAP detected: expected %d instead of %d (%d missing)",
1720           next_seqnum, seqnum, gap);
1721
1722       if (priv->last_out_time != -1) {
1723         GST_DEBUG_OBJECT (jitterbuffer,
1724             "out_time %" GST_TIME_FORMAT ", last %" GST_TIME_FORMAT,
1725             GST_TIME_ARGS (out_time), GST_TIME_ARGS (priv->last_out_time));
1726         /* interpolate between the current time and the last time based on
1727          * number of packets we are missing, this is the estimated duration
1728          * for the missing packet based on equidistant packet spacing. Also make
1729          * sure we never go negative. */
1730         if (out_time >= priv->last_out_time)
1731           duration = (out_time - priv->last_out_time) / (gap + 1);
1732         else
1733           goto lost;
1734
1735         GST_DEBUG_OBJECT (jitterbuffer, "duration %" GST_TIME_FORMAT,
1736             GST_TIME_ARGS (duration));
1737         /* add this duration to the timestamp of the last packet we pushed */
1738         out_time = (priv->last_out_time + duration);
1739       }
1740     } else {
1741       /* we don't know what the next_seqnum should be, wait for the last
1742        * possible moment to push this buffer, maybe we get an earlier seqnum
1743        * while we wait */
1744       GST_DEBUG_OBJECT (jitterbuffer, "First buffer %d, do sync", seqnum);
1745     }
1746
1747     GST_OBJECT_LOCK (jitterbuffer);
1748     clock = GST_ELEMENT_CLOCK (jitterbuffer);
1749     if (!clock) {
1750       GST_OBJECT_UNLOCK (jitterbuffer);
1751       /* let's just push if there is no clock */
1752       GST_DEBUG_OBJECT (jitterbuffer, "No clock, push right away");
1753       goto push_buffer;
1754     }
1755
1756     GST_DEBUG_OBJECT (jitterbuffer, "sync to timestamp %" GST_TIME_FORMAT,
1757         GST_TIME_ARGS (out_time));
1758
1759     /* prepare for sync against clock */
1760     sync_time = get_sync_time (jitterbuffer, out_time);
1761
1762     /* create an entry for the clock */
1763     id = priv->clock_id = gst_clock_new_single_shot_id (clock, sync_time);
1764     priv->unscheduled = FALSE;
1765     GST_OBJECT_UNLOCK (jitterbuffer);
1766
1767     /* release the lock so that the other end can push stuff or unlock */
1768     JBUF_UNLOCK (priv);
1769
1770     ret = gst_clock_id_wait (id, NULL);
1771
1772     JBUF_LOCK (priv);
1773     /* and free the entry */
1774     gst_clock_id_unref (id);
1775     priv->clock_id = NULL;
1776
1777     /* at this point, the clock could have been unlocked by a timeout, a new
1778      * tail element was added to the queue or because we are shutting down. Check
1779      * for shutdown first. */
1780     if G_UNLIKELY
1781       ((priv->srcresult != GST_FLOW_OK))
1782           goto flushing;
1783
1784     /* if we got unscheduled and we are not flushing, it's because a new tail
1785      * element became available in the queue or we flushed the queue.
1786      * Grab it and try to push or sync. */
1787     if (ret == GST_CLOCK_UNSCHEDULED || priv->unscheduled) {
1788       GST_DEBUG_OBJECT (jitterbuffer,
1789           "Wait got unscheduled, will retry to push with new buffer");
1790       goto again;
1791     }
1792
1793   lost:
1794     /* we now timed out, this means we lost a packet or finished synchronizing
1795      * on the first buffer. */
1796     if (gap > 0) {
1797       GstEvent *event;
1798
1799       /* we had a gap and thus we lost a packet. Create an event for this.  */
1800       GST_DEBUG_OBJECT (jitterbuffer, "Packet #%d lost", next_seqnum);
1801       priv->num_late++;
1802       discont = TRUE;
1803
1804       /* update our expected next packet */
1805       priv->last_popped_seqnum = next_seqnum;
1806       priv->last_out_time = out_time;
1807       priv->next_seqnum = (next_seqnum + 1) & 0xffff;
1808
1809       if (priv->do_lost) {
1810         /* create paket lost event */
1811         event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM,
1812             gst_structure_new ("GstRTPPacketLost",
1813                 "seqnum", G_TYPE_UINT, (guint) next_seqnum,
1814                 "timestamp", G_TYPE_UINT64, out_time,
1815                 "duration", G_TYPE_UINT64, duration, NULL));
1816
1817         JBUF_UNLOCK (priv);
1818         gst_pad_push_event (priv->srcpad, event);
1819         JBUF_LOCK_CHECK (priv, flushing);
1820       }
1821       /* look for next packet */
1822       goto again;
1823     }
1824
1825     /* there was no known gap,just the first packet, exit the loop and push */
1826     GST_DEBUG_OBJECT (jitterbuffer, "First packet #%d synced", seqnum);
1827
1828     /* get new timestamp, latency might have changed */
1829     out_time = apply_offset (jitterbuffer, timestamp);
1830   }
1831 push_buffer:
1832
1833   /* when we get here we are ready to pop and push the buffer */
1834   outbuf = rtp_jitter_buffer_pop (priv->jbuf, &percent);
1835
1836   check_buffering_percent (jitterbuffer, &percent);
1837
1838   if (G_UNLIKELY (discont || priv->discont)) {
1839     /* set DISCONT flag when we missed a packet. We pushed the buffer writable
1840      * into the jitterbuffer so we can modify now. */
1841     GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_DISCONT);
1842     priv->discont = FALSE;
1843   }
1844
1845   /* apply timestamp with offset to buffer now */
1846   GST_BUFFER_TIMESTAMP (outbuf) = out_time;
1847
1848   /* update the elapsed time when we need to check against the npt stop time. */
1849   if (priv->npt_stop != -1 && priv->ext_timestamp != -1
1850       && priv->clock_base != -1 && priv->clock_rate > 0) {
1851     guint64 elapsed, estimated;
1852
1853     elapsed = compute_elapsed (jitterbuffer, outbuf);
1854
1855     if (elapsed > priv->last_elapsed || !priv->last_elapsed) {
1856       guint64 left;
1857
1858       priv->last_elapsed = elapsed;
1859
1860       left = priv->npt_stop - priv->npt_start;
1861       GST_LOG_OBJECT (jitterbuffer, "left %" GST_TIME_FORMAT,
1862           GST_TIME_ARGS (left));
1863
1864       if (elapsed > 0)
1865         estimated = gst_util_uint64_scale (out_time, left, elapsed);
1866       else {
1867         /* if there is almost nothing left,
1868          * we may never advance enough to end up in the above case */
1869         if (left < GST_SECOND)
1870           estimated = GST_SECOND;
1871         else
1872           estimated = -1;
1873       }
1874
1875       GST_LOG_OBJECT (jitterbuffer, "elapsed %" GST_TIME_FORMAT ", estimated %"
1876           GST_TIME_FORMAT, GST_TIME_ARGS (elapsed), GST_TIME_ARGS (estimated));
1877
1878       priv->estimated_eos = estimated;
1879     }
1880   }
1881
1882   /* now we are ready to push the buffer. Save the seqnum and release the lock
1883    * so the other end can push stuff in the queue again. */
1884   priv->last_popped_seqnum = seqnum;
1885   priv->last_out_time = out_time;
1886   priv->next_seqnum = (seqnum + 1) & 0xffff;
1887   JBUF_UNLOCK (priv);
1888
1889   if (percent != -1)
1890     post_buffering_percent (jitterbuffer, percent);
1891
1892   /* push buffer */
1893   GST_DEBUG_OBJECT (jitterbuffer,
1894       "Pushing buffer %d, timestamp %" GST_TIME_FORMAT, seqnum,
1895       GST_TIME_ARGS (out_time));
1896   result = gst_pad_push (priv->srcpad, outbuf);
1897   if (G_UNLIKELY (result != GST_FLOW_OK))
1898     goto pause;
1899
1900   return;
1901
1902   /* ERRORS */
1903 do_eos:
1904   {
1905     /* store result, we are flushing now */
1906     GST_DEBUG_OBJECT (jitterbuffer, "We are EOS, pushing EOS downstream");
1907     priv->srcresult = GST_FLOW_UNEXPECTED;
1908     gst_pad_pause_task (priv->srcpad);
1909     JBUF_UNLOCK (priv);
1910     gst_pad_push_event (priv->srcpad, gst_event_new_eos ());
1911     return;
1912   }
1913 do_npt_stop:
1914   {
1915     /* store result, we are flushing now */
1916     GST_DEBUG_OBJECT (jitterbuffer, "We reached the NPT stop");
1917     JBUF_UNLOCK (priv);
1918
1919     g_signal_emit (jitterbuffer,
1920         gst_rtp_jitter_buffer_signals[SIGNAL_ON_NPT_STOP], 0, NULL);
1921     return;
1922   }
1923 flushing:
1924   {
1925     GST_DEBUG_OBJECT (jitterbuffer, "we are flushing");
1926     gst_pad_pause_task (priv->srcpad);
1927     JBUF_UNLOCK (priv);
1928     return;
1929   }
1930 pause:
1931   {
1932     GST_DEBUG_OBJECT (jitterbuffer, "pausing task, reason %s",
1933         gst_flow_get_name (result));
1934
1935     JBUF_LOCK (priv);
1936     /* store result */
1937     priv->srcresult = result;
1938     /* we don't post errors or anything because upstream will do that for us
1939      * when we pass the return value upstream. */
1940     gst_pad_pause_task (priv->srcpad);
1941     JBUF_UNLOCK (priv);
1942     return;
1943   }
1944 }
1945
1946 static GstFlowReturn
1947 gst_rtp_jitter_buffer_chain_rtcp (GstPad * pad, GstObject * parent,
1948     GstBuffer * buffer)
1949 {
1950   GstRtpJitterBuffer *jitterbuffer;
1951   GstRtpJitterBufferPrivate *priv;
1952   GstFlowReturn ret = GST_FLOW_OK;
1953   guint64 base_rtptime, base_time;
1954   guint32 clock_rate;
1955   guint64 last_rtptime;
1956   guint32 ssrc;
1957   GstRTCPPacket packet;
1958   guint64 ext_rtptime, diff;
1959   guint32 rtptime;
1960   gboolean drop = FALSE;
1961   GstRTCPBuffer rtcp = { NULL };
1962   guint64 clock_base;
1963
1964   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1965
1966   if (G_UNLIKELY (!gst_rtcp_buffer_validate (buffer)))
1967     goto invalid_buffer;
1968
1969   priv = jitterbuffer->priv;
1970
1971   gst_rtcp_buffer_map (buffer, GST_MAP_READ, &rtcp);
1972
1973   if (!gst_rtcp_buffer_get_first_packet (&rtcp, &packet))
1974     goto empty_buffer;
1975
1976   /* first packet must be SR or RR or else the validate would have failed */
1977   switch (gst_rtcp_packet_get_type (&packet)) {
1978     case GST_RTCP_TYPE_SR:
1979       gst_rtcp_packet_sr_get_sender_info (&packet, &ssrc, NULL, &rtptime,
1980           NULL, NULL);
1981       break;
1982     default:
1983       goto ignore_buffer;
1984   }
1985   gst_rtcp_buffer_unmap (&rtcp);
1986
1987   GST_DEBUG_OBJECT (jitterbuffer, "received RTCP of SSRC %08x", ssrc);
1988
1989   JBUF_LOCK (priv);
1990   /* convert the RTP timestamp to our extended timestamp, using the same offset
1991    * we used in the jitterbuffer */
1992   ext_rtptime = priv->jbuf->ext_rtptime;
1993   ext_rtptime = gst_rtp_buffer_ext_timestamp (&ext_rtptime, rtptime);
1994
1995   /* get the last values from the jitterbuffer */
1996   rtp_jitter_buffer_get_sync (priv->jbuf, &base_rtptime, &base_time,
1997       &clock_rate, &last_rtptime);
1998
1999   clock_base = priv->clock_base;
2000
2001   GST_DEBUG_OBJECT (jitterbuffer, "ext SR %" G_GUINT64_FORMAT ", base %"
2002       G_GUINT64_FORMAT ", clock-rate %" G_GUINT32_FORMAT
2003       ", clock-base %" G_GUINT64_FORMAT,
2004       ext_rtptime, base_rtptime, clock_rate, clock_base);
2005
2006   if (base_rtptime == -1 || clock_rate == -1 || base_time == -1) {
2007     GST_DEBUG_OBJECT (jitterbuffer, "dropping, no RTP values");
2008     drop = TRUE;
2009   } else {
2010     /* we can't accept anything that happened before we did the last resync */
2011     if (base_rtptime > ext_rtptime) {
2012       GST_DEBUG_OBJECT (jitterbuffer, "dropping, older than base time");
2013       drop = TRUE;
2014     } else {
2015       /* the SR RTP timestamp must be something close to what we last observed
2016        * in the jitterbuffer */
2017       if (ext_rtptime > last_rtptime) {
2018         /* check how far ahead it is to our RTP timestamps */
2019         diff = ext_rtptime - last_rtptime;
2020         /* if bigger than 1 second, we drop it */
2021         if (diff > clock_rate) {
2022           GST_DEBUG_OBJECT (jitterbuffer, "too far ahead");
2023           /* should drop this, but some RTSP servers end up with bogus
2024            * way too ahead RTCP packet when repeated PAUSE/PLAY,
2025            * so still trigger rptbin sync but invalidate RTCP data
2026            * (sync might use other methods) */
2027           ext_rtptime = -1;
2028         }
2029         GST_DEBUG_OBJECT (jitterbuffer, "ext last %" G_GUINT64_FORMAT ", diff %"
2030             G_GUINT64_FORMAT, last_rtptime, diff);
2031       }
2032     }
2033   }
2034   JBUF_UNLOCK (priv);
2035
2036   if (!drop) {
2037     GstStructure *s;
2038
2039     s = gst_structure_new ("application/x-rtp-sync",
2040         "base-rtptime", G_TYPE_UINT64, base_rtptime,
2041         "base-time", G_TYPE_UINT64, base_time,
2042         "clock-rate", G_TYPE_UINT, clock_rate,
2043         "clock-base", G_TYPE_UINT64, clock_base,
2044         "sr-ext-rtptime", G_TYPE_UINT64, ext_rtptime,
2045         "sr-buffer", GST_TYPE_BUFFER, buffer, NULL);
2046
2047     GST_DEBUG_OBJECT (jitterbuffer, "signaling sync");
2048     g_signal_emit (jitterbuffer,
2049         gst_rtp_jitter_buffer_signals[SIGNAL_HANDLE_SYNC], 0, s);
2050     gst_structure_free (s);
2051   } else {
2052     GST_DEBUG_OBJECT (jitterbuffer, "dropping RTCP packet");
2053     ret = GST_FLOW_OK;
2054   }
2055
2056 done:
2057   gst_buffer_unref (buffer);
2058
2059   return ret;
2060
2061 invalid_buffer:
2062   {
2063     /* this is not fatal but should be filtered earlier */
2064     GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
2065         ("Received invalid RTCP payload, dropping"));
2066     ret = GST_FLOW_OK;
2067     goto done;
2068   }
2069 empty_buffer:
2070   {
2071     /* this is not fatal but should be filtered earlier */
2072     GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
2073         ("Received empty RTCP payload, dropping"));
2074     gst_rtcp_buffer_unmap (&rtcp);
2075     ret = GST_FLOW_OK;
2076     goto done;
2077   }
2078 ignore_buffer:
2079   {
2080     GST_DEBUG_OBJECT (jitterbuffer, "ignoring RTCP packet");
2081     gst_rtcp_buffer_unmap (&rtcp);
2082     ret = GST_FLOW_OK;
2083     goto done;
2084   }
2085 }
2086
2087 static gboolean
2088 gst_rtp_jitter_buffer_sink_query (GstPad * pad, GstObject * parent,
2089     GstQuery * query)
2090 {
2091   gboolean res = FALSE;
2092
2093   switch (GST_QUERY_TYPE (query)) {
2094     case GST_QUERY_CAPS:
2095     {
2096       GstCaps *filter, *caps;
2097
2098       gst_query_parse_caps (query, &filter);
2099       caps = gst_rtp_jitter_buffer_getcaps (pad, filter);
2100       gst_query_set_caps_result (query, caps);
2101       gst_caps_unref (caps);
2102       res = TRUE;
2103       break;
2104     }
2105     default:
2106       res = gst_pad_query_default (pad, parent, query);
2107       break;
2108   }
2109
2110   return res;
2111 }
2112
2113 static gboolean
2114 gst_rtp_jitter_buffer_src_query (GstPad * pad, GstObject * parent,
2115     GstQuery * query)
2116 {
2117   GstRtpJitterBuffer *jitterbuffer;
2118   GstRtpJitterBufferPrivate *priv;
2119   gboolean res = FALSE;
2120
2121   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
2122   priv = jitterbuffer->priv;
2123
2124   switch (GST_QUERY_TYPE (query)) {
2125     case GST_QUERY_LATENCY:
2126     {
2127       /* We need to send the query upstream and add the returned latency to our
2128        * own */
2129       GstClockTime min_latency, max_latency;
2130       gboolean us_live;
2131       GstClockTime our_latency;
2132
2133       if ((res = gst_pad_peer_query (priv->sinkpad, query))) {
2134         gst_query_parse_latency (query, &us_live, &min_latency, &max_latency);
2135
2136         GST_DEBUG_OBJECT (jitterbuffer, "Peer latency: min %"
2137             GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
2138             GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
2139
2140         /* store this so that we can safely sync on the peer buffers. */
2141         JBUF_LOCK (priv);
2142         priv->peer_latency = min_latency;
2143         our_latency = priv->latency_ns;
2144         JBUF_UNLOCK (priv);
2145
2146         GST_DEBUG_OBJECT (jitterbuffer, "Our latency: %" GST_TIME_FORMAT,
2147             GST_TIME_ARGS (our_latency));
2148
2149         /* we add some latency but can buffer an infinite amount of time */
2150         min_latency += our_latency;
2151         max_latency = -1;
2152
2153         GST_DEBUG_OBJECT (jitterbuffer, "Calculated total latency : min %"
2154             GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
2155             GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
2156
2157         gst_query_set_latency (query, TRUE, min_latency, max_latency);
2158       }
2159       break;
2160     }
2161     case GST_QUERY_POSITION:
2162     {
2163       GstClockTime start, last_out;
2164       GstFormat fmt;
2165
2166       gst_query_parse_position (query, &fmt, NULL);
2167       if (fmt != GST_FORMAT_TIME) {
2168         res = gst_pad_query_default (pad, parent, query);
2169         break;
2170       }
2171
2172       JBUF_LOCK (priv);
2173       start = priv->npt_start;
2174       last_out = priv->last_out_time;
2175       JBUF_UNLOCK (priv);
2176
2177       GST_DEBUG_OBJECT (jitterbuffer, "npt start %" GST_TIME_FORMAT
2178           ", last out %" GST_TIME_FORMAT, GST_TIME_ARGS (start),
2179           GST_TIME_ARGS (last_out));
2180
2181       if (GST_CLOCK_TIME_IS_VALID (start) && GST_CLOCK_TIME_IS_VALID (last_out)) {
2182         /* bring 0-based outgoing time to stream time */
2183         gst_query_set_position (query, GST_FORMAT_TIME, start + last_out);
2184         res = TRUE;
2185       } else {
2186         res = gst_pad_query_default (pad, parent, query);
2187       }
2188       break;
2189     }
2190     case GST_QUERY_CAPS:
2191     {
2192       GstCaps *filter, *caps;
2193
2194       gst_query_parse_caps (query, &filter);
2195       caps = gst_rtp_jitter_buffer_getcaps (pad, filter);
2196       gst_query_set_caps_result (query, caps);
2197       gst_caps_unref (caps);
2198       res = TRUE;
2199       break;
2200     }
2201     default:
2202       res = gst_pad_query_default (pad, parent, query);
2203       break;
2204   }
2205
2206   return res;
2207 }
2208
2209 static void
2210 gst_rtp_jitter_buffer_set_property (GObject * object,
2211     guint prop_id, const GValue * value, GParamSpec * pspec)
2212 {
2213   GstRtpJitterBuffer *jitterbuffer;
2214   GstRtpJitterBufferPrivate *priv;
2215
2216   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
2217   priv = jitterbuffer->priv;
2218
2219   switch (prop_id) {
2220     case PROP_LATENCY:
2221     {
2222       guint new_latency, old_latency;
2223
2224       new_latency = g_value_get_uint (value);
2225
2226       JBUF_LOCK (priv);
2227       old_latency = priv->latency_ms;
2228       priv->latency_ms = new_latency;
2229       priv->latency_ns = priv->latency_ms * GST_MSECOND;
2230       rtp_jitter_buffer_set_delay (priv->jbuf, priv->latency_ns);
2231       JBUF_UNLOCK (priv);
2232
2233       /* post message if latency changed, this will inform the parent pipeline
2234        * that a latency reconfiguration is possible/needed. */
2235       if (new_latency != old_latency) {
2236         GST_DEBUG_OBJECT (jitterbuffer, "latency changed to: %" GST_TIME_FORMAT,
2237             GST_TIME_ARGS (new_latency * GST_MSECOND));
2238
2239         gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer),
2240             gst_message_new_latency (GST_OBJECT_CAST (jitterbuffer)));
2241       }
2242       break;
2243     }
2244     case PROP_DROP_ON_LATENCY:
2245       JBUF_LOCK (priv);
2246       priv->drop_on_latency = g_value_get_boolean (value);
2247       JBUF_UNLOCK (priv);
2248       break;
2249     case PROP_TS_OFFSET:
2250       JBUF_LOCK (priv);
2251       priv->ts_offset = g_value_get_int64 (value);
2252       /* FIXME, we don't really have a method for signaling a timestamp
2253        * DISCONT without also making this a data discont. */
2254       /* priv->discont = TRUE; */
2255       JBUF_UNLOCK (priv);
2256       break;
2257     case PROP_DO_LOST:
2258       JBUF_LOCK (priv);
2259       priv->do_lost = g_value_get_boolean (value);
2260       JBUF_UNLOCK (priv);
2261       break;
2262     case PROP_MODE:
2263       JBUF_LOCK (priv);
2264       rtp_jitter_buffer_set_mode (priv->jbuf, g_value_get_enum (value));
2265       JBUF_UNLOCK (priv);
2266       break;
2267     default:
2268       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2269       break;
2270   }
2271 }
2272
2273 static void
2274 gst_rtp_jitter_buffer_get_property (GObject * object,
2275     guint prop_id, GValue * value, GParamSpec * pspec)
2276 {
2277   GstRtpJitterBuffer *jitterbuffer;
2278   GstRtpJitterBufferPrivate *priv;
2279
2280   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
2281   priv = jitterbuffer->priv;
2282
2283   switch (prop_id) {
2284     case PROP_LATENCY:
2285       JBUF_LOCK (priv);
2286       g_value_set_uint (value, priv->latency_ms);
2287       JBUF_UNLOCK (priv);
2288       break;
2289     case PROP_DROP_ON_LATENCY:
2290       JBUF_LOCK (priv);
2291       g_value_set_boolean (value, priv->drop_on_latency);
2292       JBUF_UNLOCK (priv);
2293       break;
2294     case PROP_TS_OFFSET:
2295       JBUF_LOCK (priv);
2296       g_value_set_int64 (value, priv->ts_offset);
2297       JBUF_UNLOCK (priv);
2298       break;
2299     case PROP_DO_LOST:
2300       JBUF_LOCK (priv);
2301       g_value_set_boolean (value, priv->do_lost);
2302       JBUF_UNLOCK (priv);
2303       break;
2304     case PROP_MODE:
2305       JBUF_LOCK (priv);
2306       g_value_set_enum (value, rtp_jitter_buffer_get_mode (priv->jbuf));
2307       JBUF_UNLOCK (priv);
2308       break;
2309     case PROP_PERCENT:
2310     {
2311       gint percent;
2312
2313       JBUF_LOCK (priv);
2314       if (priv->srcresult != GST_FLOW_OK)
2315         percent = 100;
2316       else
2317         percent = rtp_jitter_buffer_get_percent (priv->jbuf);
2318
2319       g_value_set_int (value, percent);
2320       JBUF_UNLOCK (priv);
2321       break;
2322     }
2323     default:
2324       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2325       break;
2326   }
2327 }