gst/rtpmanager/gstrtpbin.*: Add signal to notify listeners when a sender becomes...
[platform/upstream/gst-plugins-good.git] / gst / rtpmanager / gstrtpjitterbuffer.c
1 /*
2  * Farsight Voice+Video library
3  *
4  *  Copyright 2007 Collabora Ltd, 
5  *  Copyright 2007 Nokia Corporation
6  *   @author: Philippe Kalaf <philippe.kalaf@collabora.co.uk>.
7  *  Copyright 2007 Wim Taymans <wim.taymans@gmail.com>
8  *
9  * This library is free software; you can redistribute it and/or
10  * modify it under the terms of the GNU Library General Public
11  * License as published by the Free Software Foundation; either
12  * version 2 of the License, or (at your option) any later version.
13  *
14  * This library is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17  * Library General Public License for more details.
18  *
19  * You should have received a copy of the GNU Library General Public
20  * License along with this library; if not, write to the
21  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
22  * Boston, MA 02111-1307, USA.
23  *
24  */
25
26 /**
27  * SECTION:element-gstrtpjitterbuffer
28  *
29  * This element reorders and removes duplicate RTP packets as they are received
30  * from a network source. It will also wait for missing packets up to a
31  * configurable time limit using the #GstRtpJitterBuffer:latency property.
32  * Packets arriving too late are considered to be lost packets.
33  * 
34  * This element acts as a live element and so adds #GstRtpJitterBuffer:latency
35  * to the pipeline.
36  * 
37  * The element needs the clock-rate of the RTP payload in order to estimate the
38  * delay. This information is obtained either from the caps on the sink pad or,
39  * when no caps are present, from the #GstRtpJitterBuffer::request-pt-map signal.
40  * To clear the previous pt-map use the #GstRtpJitterBuffer::clear-pt-map signal.
41  * 
42  * This element will automatically be used inside gstrtpbin.
43  * 
44  * <refsect2>
45  * <title>Example pipelines</title>
46  * |[
47  * gst-launch rtspsrc location=rtsp://192.168.1.133:8554/mpeg1or2AudioVideoTest ! gstrtpjitterbuffer ! rtpmpvdepay ! mpeg2dec ! xvimagesink
48  * ]| Connect to a streaming server and decode the MPEG video. The jitterbuffer is
49  * inserted into the pipeline to smooth out network jitter and to reorder the
50  * out-of-order RTP packets.
51  * </refsect2>
52  *
53  * Last reviewed on 2007-05-28 (0.10.5)
54  */
55
56 #ifdef HAVE_CONFIG_H
57 #include "config.h"
58 #endif
59
60 #include <stdlib.h>
61 #include <string.h>
62 #include <gst/rtp/gstrtpbuffer.h>
63
64 #include "gstrtpbin-marshal.h"
65
66 #include "gstrtpjitterbuffer.h"
67 #include "rtpjitterbuffer.h"
68 #include "rtpstats.h"
69
70 GST_DEBUG_CATEGORY (rtpjitterbuffer_debug);
71 #define GST_CAT_DEFAULT (rtpjitterbuffer_debug)
72
73 /* low and high threshold tell the queue when to start and stop buffering */
74 #define LOW_THRESHOLD 0.2
75 #define HIGH_THRESHOLD 0.8
76
77 /* elementfactory information */
78 static const GstElementDetails gst_rtp_jitter_buffer_details =
79 GST_ELEMENT_DETAILS ("RTP packet jitter-buffer",
80     "Filter/Network/RTP",
81     "A buffer that deals with network jitter and other transmission faults",
82     "Philippe Kalaf <philippe.kalaf@collabora.co.uk>, "
83     "Wim Taymans <wim.taymans@gmail.com>");
84
85 /* RTPJitterBuffer signals and args */
86 enum
87 {
88   SIGNAL_REQUEST_PT_MAP,
89   SIGNAL_CLEAR_PT_MAP,
90   LAST_SIGNAL
91 };
92
93 #define DEFAULT_LATENCY_MS      200
94 #define DEFAULT_DROP_ON_LATENCY FALSE
95 #define DEFAULT_TS_OFFSET       0
96 #define DEFAULT_DO_LOST         FALSE
97
98 enum
99 {
100   PROP_0,
101   PROP_LATENCY,
102   PROP_DROP_ON_LATENCY,
103   PROP_TS_OFFSET,
104   PROP_DO_LOST,
105   PROP_LAST
106 };
107
108 #define JBUF_LOCK(priv)   (g_mutex_lock ((priv)->jbuf_lock))
109
110 #define JBUF_LOCK_CHECK(priv,label) G_STMT_START {    \
111   JBUF_LOCK (priv);                                   \
112   if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK))    \
113     goto label;                                       \
114 } G_STMT_END
115
116 #define JBUF_UNLOCK(priv) (g_mutex_unlock ((priv)->jbuf_lock))
117 #define JBUF_WAIT(priv)   (g_cond_wait ((priv)->jbuf_cond, (priv)->jbuf_lock))
118
119 #define JBUF_WAIT_CHECK(priv,label) G_STMT_START {    \
120   JBUF_WAIT(priv);                                    \
121   if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK))    \
122     goto label;                                       \
123 } G_STMT_END
124
125 #define JBUF_SIGNAL(priv) (g_cond_signal ((priv)->jbuf_cond))
126
127 struct _GstRtpJitterBufferPrivate
128 {
129   GstPad *sinkpad, *srcpad;
130
131   RTPJitterBuffer *jbuf;
132   GMutex *jbuf_lock;
133   GCond *jbuf_cond;
134   gboolean waiting;
135   gboolean discont;
136
137   /* properties */
138   guint latency_ms;
139   gboolean drop_on_latency;
140   gint64 ts_offset;
141   gboolean do_lost;
142
143   /* the last seqnum we pushed out */
144   guint32 last_popped_seqnum;
145   /* the next expected seqnum */
146   guint32 next_seqnum;
147   /* last output time */
148   GstClockTime last_out_time;
149
150   /* state */
151   gboolean eos;
152
153   /* clock rate and rtp timestamp offset */
154   gint last_pt;
155   gint32 clock_rate;
156   gint64 clock_base;
157   gint64 prev_ts_offset;
158
159   /* when we are shutting down */
160   GstFlowReturn srcresult;
161   gboolean blocked;
162
163   /* for sync */
164   GstSegment segment;
165   GstClockID clock_id;
166   /* the latency of the upstream peer, we have to take this into account when
167    * synchronizing the buffers. */
168   GstClockTime peer_latency;
169
170   /* some accounting */
171   guint64 num_late;
172   guint64 num_duplicates;
173 };
174
175 #define GST_RTP_JITTER_BUFFER_GET_PRIVATE(o) \
176   (G_TYPE_INSTANCE_GET_PRIVATE ((o), GST_TYPE_RTP_JITTER_BUFFER, \
177                                 GstRtpJitterBufferPrivate))
178
179 static GstStaticPadTemplate gst_rtp_jitter_buffer_sink_template =
180 GST_STATIC_PAD_TEMPLATE ("sink",
181     GST_PAD_SINK,
182     GST_PAD_ALWAYS,
183     GST_STATIC_CAPS ("application/x-rtp, "
184         "clock-rate = (int) [ 1, 2147483647 ]"
185         /* "payload = (int) , "
186          * "encoding-name = (string) "
187          */ )
188     );
189
190 static GstStaticPadTemplate gst_rtp_jitter_buffer_src_template =
191 GST_STATIC_PAD_TEMPLATE ("src",
192     GST_PAD_SRC,
193     GST_PAD_ALWAYS,
194     GST_STATIC_CAPS ("application/x-rtp"
195         /* "payload = (int) , "
196          * "clock-rate = (int) , "
197          * "encoding-name = (string) "
198          */ )
199     );
200
201 static guint gst_rtp_jitter_buffer_signals[LAST_SIGNAL] = { 0 };
202
203 GST_BOILERPLATE (GstRtpJitterBuffer, gst_rtp_jitter_buffer, GstElement,
204     GST_TYPE_ELEMENT);
205
206 /* object overrides */
207 static void gst_rtp_jitter_buffer_set_property (GObject * object,
208     guint prop_id, const GValue * value, GParamSpec * pspec);
209 static void gst_rtp_jitter_buffer_get_property (GObject * object,
210     guint prop_id, GValue * value, GParamSpec * pspec);
211 static void gst_rtp_jitter_buffer_finalize (GObject * object);
212
213 /* element overrides */
214 static GstStateChangeReturn gst_rtp_jitter_buffer_change_state (GstElement
215     * element, GstStateChange transition);
216
217 /* pad overrides */
218 static GstCaps *gst_rtp_jitter_buffer_getcaps (GstPad * pad);
219
220 /* sinkpad overrides */
221 static gboolean gst_jitter_buffer_sink_setcaps (GstPad * pad, GstCaps * caps);
222 static gboolean gst_rtp_jitter_buffer_src_event (GstPad * pad,
223     GstEvent * event);
224 static gboolean gst_rtp_jitter_buffer_sink_event (GstPad * pad,
225     GstEvent * event);
226 static GstFlowReturn gst_rtp_jitter_buffer_chain (GstPad * pad,
227     GstBuffer * buffer);
228
229 /* srcpad overrides */
230 static gboolean
231 gst_rtp_jitter_buffer_src_activate_push (GstPad * pad, gboolean active);
232 static void gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer);
233 static gboolean gst_rtp_jitter_buffer_query (GstPad * pad, GstQuery * query);
234
235 static void
236 gst_rtp_jitter_buffer_clear_pt_map (GstRtpJitterBuffer * jitterbuffer);
237
238 static void
239 gst_rtp_jitter_buffer_base_init (gpointer klass)
240 {
241   GstElementClass *element_class = GST_ELEMENT_CLASS (klass);
242
243   gst_element_class_add_pad_template (element_class,
244       gst_static_pad_template_get (&gst_rtp_jitter_buffer_src_template));
245   gst_element_class_add_pad_template (element_class,
246       gst_static_pad_template_get (&gst_rtp_jitter_buffer_sink_template));
247   gst_element_class_set_details (element_class, &gst_rtp_jitter_buffer_details);
248 }
249
250 static void
251 gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass)
252 {
253   GObjectClass *gobject_class;
254   GstElementClass *gstelement_class;
255
256   gobject_class = (GObjectClass *) klass;
257   gstelement_class = (GstElementClass *) klass;
258
259   g_type_class_add_private (klass, sizeof (GstRtpJitterBufferPrivate));
260
261   gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_finalize);
262
263   gobject_class->set_property = gst_rtp_jitter_buffer_set_property;
264   gobject_class->get_property = gst_rtp_jitter_buffer_get_property;
265
266   /**
267    * GstRtpJitterBuffer::latency:
268    * 
269    * The maximum latency of the jitterbuffer. Packets will be kept in the buffer
270    * for at most this time.
271    */
272   g_object_class_install_property (gobject_class, PROP_LATENCY,
273       g_param_spec_uint ("latency", "Buffer latency in ms",
274           "Amount of ms to buffer", 0, G_MAXUINT, DEFAULT_LATENCY_MS,
275           G_PARAM_READWRITE));
276   /**
277    * GstRtpJitterBuffer::drop-on-latency:
278    * 
279    * Drop oldest buffers when the queue is completely filled. 
280    */
281   g_object_class_install_property (gobject_class, PROP_DROP_ON_LATENCY,
282       g_param_spec_boolean ("drop-on-latency",
283           "Drop buffers when maximum latency is reached",
284           "Tells the jitterbuffer to never exceed the given latency in size",
285           DEFAULT_DROP_ON_LATENCY, G_PARAM_READWRITE));
286   /**
287    * GstRtpJitterBuffer::ts-offset:
288    * 
289    * Adjust GStreamer output buffer timestamps in the jitterbuffer with offset.
290    * This is mainly used to ensure interstream synchronisation.
291    */
292   g_object_class_install_property (gobject_class, PROP_TS_OFFSET,
293       g_param_spec_int64 ("ts-offset", "Timestamp Offset",
294           "Adjust buffer timestamps with offset in nanoseconds", G_MININT64,
295           G_MAXINT64, DEFAULT_TS_OFFSET,
296           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
297
298   /**
299    * GstRtpJitterBuffer::do-lost:
300    * 
301    * Send out a GstRTPPacketLost event downstream when a packet is considered
302    * lost.
303    */
304   g_object_class_install_property (gobject_class, PROP_DO_LOST,
305       g_param_spec_boolean ("do-lost", "Do Lost",
306           "Send an event downstream when a packet is lost", DEFAULT_DO_LOST,
307           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
308   /**
309    * GstRtpJitterBuffer::request-pt-map:
310    * @buffer: the object which received the signal
311    * @pt: the pt
312    *
313    * Request the payload type as #GstCaps for @pt.
314    */
315   gst_rtp_jitter_buffer_signals[SIGNAL_REQUEST_PT_MAP] =
316       g_signal_new ("request-pt-map", G_TYPE_FROM_CLASS (klass),
317       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
318           request_pt_map), NULL, NULL, gst_rtp_bin_marshal_BOXED__UINT,
319       GST_TYPE_CAPS, 1, G_TYPE_UINT);
320   /**
321    * GstRtpJitterBuffer::clear-pt-map:
322    * @buffer: the object which received the signal
323    *
324    * Invalidate the clock-rate as obtained with the
325    * #GstRtpJitterBuffer::request-pt-map signal.
326    */
327   gst_rtp_jitter_buffer_signals[SIGNAL_CLEAR_PT_MAP] =
328       g_signal_new ("clear-pt-map", G_TYPE_FROM_CLASS (klass),
329       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
330           clear_pt_map), NULL, NULL, g_cclosure_marshal_VOID__VOID,
331       G_TYPE_NONE, 0, G_TYPE_NONE);
332
333   gstelement_class->change_state = gst_rtp_jitter_buffer_change_state;
334
335   klass->clear_pt_map = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_clear_pt_map);
336
337   GST_DEBUG_CATEGORY_INIT
338       (rtpjitterbuffer_debug, "gstrtpjitterbuffer", 0, "RTP Jitter Buffer");
339 }
340
341 static void
342 gst_rtp_jitter_buffer_init (GstRtpJitterBuffer * jitterbuffer,
343     GstRtpJitterBufferClass * klass)
344 {
345   GstRtpJitterBufferPrivate *priv;
346
347   priv = GST_RTP_JITTER_BUFFER_GET_PRIVATE (jitterbuffer);
348   jitterbuffer->priv = priv;
349
350   priv->latency_ms = DEFAULT_LATENCY_MS;
351   priv->drop_on_latency = DEFAULT_DROP_ON_LATENCY;
352   priv->do_lost = DEFAULT_DO_LOST;
353
354   priv->jbuf = rtp_jitter_buffer_new ();
355   priv->jbuf_lock = g_mutex_new ();
356   priv->jbuf_cond = g_cond_new ();
357
358   priv->srcpad =
359       gst_pad_new_from_static_template (&gst_rtp_jitter_buffer_src_template,
360       "src");
361
362   gst_pad_set_activatepush_function (priv->srcpad,
363       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_activate_push));
364   gst_pad_set_query_function (priv->srcpad,
365       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_query));
366   gst_pad_set_getcaps_function (priv->srcpad,
367       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_getcaps));
368   gst_pad_set_event_function (priv->srcpad,
369       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_event));
370
371   priv->sinkpad =
372       gst_pad_new_from_static_template (&gst_rtp_jitter_buffer_sink_template,
373       "sink");
374
375   gst_pad_set_chain_function (priv->sinkpad,
376       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_chain));
377   gst_pad_set_event_function (priv->sinkpad,
378       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_sink_event));
379   gst_pad_set_setcaps_function (priv->sinkpad,
380       GST_DEBUG_FUNCPTR (gst_jitter_buffer_sink_setcaps));
381   gst_pad_set_getcaps_function (priv->sinkpad,
382       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_getcaps));
383
384   gst_element_add_pad (GST_ELEMENT (jitterbuffer), priv->srcpad);
385   gst_element_add_pad (GST_ELEMENT (jitterbuffer), priv->sinkpad);
386 }
387
388 static void
389 gst_rtp_jitter_buffer_finalize (GObject * object)
390 {
391   GstRtpJitterBuffer *jitterbuffer;
392
393   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
394
395   g_mutex_free (jitterbuffer->priv->jbuf_lock);
396   g_cond_free (jitterbuffer->priv->jbuf_cond);
397
398   g_object_unref (jitterbuffer->priv->jbuf);
399
400   G_OBJECT_CLASS (parent_class)->finalize (object);
401 }
402
403 static void
404 gst_rtp_jitter_buffer_clear_pt_map (GstRtpJitterBuffer * jitterbuffer)
405 {
406   GstRtpJitterBufferPrivate *priv;
407
408   priv = jitterbuffer->priv;
409
410   /* this will trigger a new pt-map request signal, FIXME, do something better. */
411   priv->clock_rate = -1;
412 }
413
414 static GstCaps *
415 gst_rtp_jitter_buffer_getcaps (GstPad * pad)
416 {
417   GstRtpJitterBuffer *jitterbuffer;
418   GstRtpJitterBufferPrivate *priv;
419   GstPad *other;
420   GstCaps *caps;
421   const GstCaps *templ;
422
423   jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
424   priv = jitterbuffer->priv;
425
426   other = (pad == priv->srcpad ? priv->sinkpad : priv->srcpad);
427
428   caps = gst_pad_peer_get_caps (other);
429
430   templ = gst_pad_get_pad_template_caps (pad);
431   if (caps == NULL) {
432     GST_DEBUG_OBJECT (jitterbuffer, "copy template");
433     caps = gst_caps_copy (templ);
434   } else {
435     GstCaps *intersect;
436
437     GST_DEBUG_OBJECT (jitterbuffer, "intersect with template");
438
439     intersect = gst_caps_intersect (caps, templ);
440     gst_caps_unref (caps);
441
442     caps = intersect;
443   }
444   gst_object_unref (jitterbuffer);
445
446   return caps;
447 }
448
449 static gboolean
450 gst_jitter_buffer_sink_parse_caps (GstRtpJitterBuffer * jitterbuffer,
451     GstCaps * caps)
452 {
453   GstRtpJitterBufferPrivate *priv;
454   GstStructure *caps_struct;
455   guint val;
456
457   priv = jitterbuffer->priv;
458
459   /* first parse the caps */
460   caps_struct = gst_caps_get_structure (caps, 0);
461
462   GST_DEBUG_OBJECT (jitterbuffer, "got caps");
463
464   /* we need a clock-rate to convert the rtp timestamps to GStreamer time and to
465    * measure the amount of data in the buffer */
466   if (!gst_structure_get_int (caps_struct, "clock-rate", &priv->clock_rate))
467     goto error;
468
469   if (priv->clock_rate <= 0)
470     goto wrong_rate;
471
472   GST_DEBUG_OBJECT (jitterbuffer, "got clock-rate %d", priv->clock_rate);
473
474   /* gah, clock-base is uint. If we don't have a base, we will use the first
475    * buffer timestamp as the base time. This will screw up sync but it's better
476    * than nothing. */
477   if (gst_structure_get_uint (caps_struct, "clock-base", &val))
478     priv->clock_base = val;
479   else
480     priv->clock_base = -1;
481
482   GST_DEBUG_OBJECT (jitterbuffer, "got clock-base %" G_GINT64_FORMAT,
483       priv->clock_base);
484
485   /* first expected seqnum */
486   if (gst_structure_get_uint (caps_struct, "seqnum-base", &val))
487     priv->next_seqnum = val;
488   else
489     priv->next_seqnum = -1;
490
491   GST_DEBUG_OBJECT (jitterbuffer, "got seqnum-base %d", priv->next_seqnum);
492
493   return TRUE;
494
495   /* ERRORS */
496 error:
497   {
498     GST_DEBUG_OBJECT (jitterbuffer, "No clock-rate in caps!");
499     return FALSE;
500   }
501 wrong_rate:
502   {
503     GST_DEBUG_OBJECT (jitterbuffer, "Invalid clock-rate %d", priv->clock_rate);
504     return FALSE;
505   }
506 }
507
508 static gboolean
509 gst_jitter_buffer_sink_setcaps (GstPad * pad, GstCaps * caps)
510 {
511   GstRtpJitterBuffer *jitterbuffer;
512   GstRtpJitterBufferPrivate *priv;
513   gboolean res;
514
515   jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
516   priv = jitterbuffer->priv;
517
518   res = gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps);
519
520   /* set same caps on srcpad on success */
521   if (res)
522     gst_pad_set_caps (priv->srcpad, caps);
523
524   gst_object_unref (jitterbuffer);
525
526   return res;
527 }
528
529 static void
530 gst_rtp_jitter_buffer_flush_start (GstRtpJitterBuffer * jitterbuffer)
531 {
532   GstRtpJitterBufferPrivate *priv;
533
534   priv = jitterbuffer->priv;
535
536   JBUF_LOCK (priv);
537   /* mark ourselves as flushing */
538   priv->srcresult = GST_FLOW_WRONG_STATE;
539   GST_DEBUG_OBJECT (jitterbuffer, "Disabling pop on queue");
540   /* this unblocks any waiting pops on the src pad task */
541   JBUF_SIGNAL (priv);
542   /* unlock clock, we just unschedule, the entry will be released by the 
543    * locking streaming thread. */
544   if (priv->clock_id)
545     gst_clock_id_unschedule (priv->clock_id);
546   JBUF_UNLOCK (priv);
547 }
548
549 static void
550 gst_rtp_jitter_buffer_flush_stop (GstRtpJitterBuffer * jitterbuffer)
551 {
552   GstRtpJitterBufferPrivate *priv;
553
554   priv = jitterbuffer->priv;
555
556   JBUF_LOCK (priv);
557   GST_DEBUG_OBJECT (jitterbuffer, "Enabling pop on queue");
558   /* Mark as non flushing */
559   priv->srcresult = GST_FLOW_OK;
560   gst_segment_init (&priv->segment, GST_FORMAT_TIME);
561   priv->last_popped_seqnum = -1;
562   priv->last_out_time = -1;
563   priv->next_seqnum = -1;
564   priv->clock_rate = -1;
565   priv->eos = FALSE;
566   rtp_jitter_buffer_flush (priv->jbuf);
567   rtp_jitter_buffer_reset_skew (priv->jbuf);
568   JBUF_UNLOCK (priv);
569 }
570
571 static gboolean
572 gst_rtp_jitter_buffer_src_activate_push (GstPad * pad, gboolean active)
573 {
574   gboolean result = TRUE;
575   GstRtpJitterBuffer *jitterbuffer = NULL;
576
577   jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
578
579   if (active) {
580     /* allow data processing */
581     gst_rtp_jitter_buffer_flush_stop (jitterbuffer);
582
583     /* start pushing out buffers */
584     GST_DEBUG_OBJECT (jitterbuffer, "Starting task on srcpad");
585     gst_pad_start_task (jitterbuffer->priv->srcpad,
586         (GstTaskFunction) gst_rtp_jitter_buffer_loop, jitterbuffer);
587   } else {
588     /* make sure all data processing stops ASAP */
589     gst_rtp_jitter_buffer_flush_start (jitterbuffer);
590
591     /* NOTE this will hardlock if the state change is called from the src pad
592      * task thread because we will _join() the thread. */
593     GST_DEBUG_OBJECT (jitterbuffer, "Stopping task on srcpad");
594     result = gst_pad_stop_task (pad);
595   }
596
597   gst_object_unref (jitterbuffer);
598
599   return result;
600 }
601
602 static GstStateChangeReturn
603 gst_rtp_jitter_buffer_change_state (GstElement * element,
604     GstStateChange transition)
605 {
606   GstRtpJitterBuffer *jitterbuffer;
607   GstRtpJitterBufferPrivate *priv;
608   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
609
610   jitterbuffer = GST_RTP_JITTER_BUFFER (element);
611   priv = jitterbuffer->priv;
612
613   switch (transition) {
614     case GST_STATE_CHANGE_NULL_TO_READY:
615       break;
616     case GST_STATE_CHANGE_READY_TO_PAUSED:
617       JBUF_LOCK (priv);
618       /* reset negotiated values */
619       priv->clock_rate = -1;
620       priv->clock_base = -1;
621       priv->peer_latency = 0;
622       priv->last_pt = -1;
623       /* block until we go to PLAYING */
624       priv->blocked = TRUE;
625       /* reset skew detection initialy */
626       rtp_jitter_buffer_reset_skew (priv->jbuf);
627       JBUF_UNLOCK (priv);
628       break;
629     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
630       JBUF_LOCK (priv);
631       /* unblock to allow streaming in PLAYING */
632       priv->blocked = FALSE;
633       JBUF_SIGNAL (priv);
634       JBUF_UNLOCK (priv);
635       break;
636     default:
637       break;
638   }
639
640   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
641
642   switch (transition) {
643     case GST_STATE_CHANGE_READY_TO_PAUSED:
644       /* we are a live element because we sync to the clock, which we can only
645        * do in the PLAYING state */
646       if (ret != GST_STATE_CHANGE_FAILURE)
647         ret = GST_STATE_CHANGE_NO_PREROLL;
648       break;
649     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
650       JBUF_LOCK (priv);
651       /* block to stop streaming when PAUSED */
652       priv->blocked = TRUE;
653       JBUF_UNLOCK (priv);
654       if (ret != GST_STATE_CHANGE_FAILURE)
655         ret = GST_STATE_CHANGE_NO_PREROLL;
656       break;
657     case GST_STATE_CHANGE_PAUSED_TO_READY:
658       break;
659     case GST_STATE_CHANGE_READY_TO_NULL:
660       break;
661     default:
662       break;
663   }
664
665   return ret;
666 }
667
668 static gboolean
669 gst_rtp_jitter_buffer_src_event (GstPad * pad, GstEvent * event)
670 {
671   gboolean ret = TRUE;
672   GstRtpJitterBuffer *jitterbuffer;
673   GstRtpJitterBufferPrivate *priv;
674
675   jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
676   priv = jitterbuffer->priv;
677
678   GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
679
680   switch (GST_EVENT_TYPE (event)) {
681     default:
682       ret = gst_pad_push_event (priv->sinkpad, event);
683       break;
684   }
685   gst_object_unref (jitterbuffer);
686
687   return ret;
688 }
689
690 static gboolean
691 gst_rtp_jitter_buffer_sink_event (GstPad * pad, GstEvent * event)
692 {
693   gboolean ret = TRUE;
694   GstRtpJitterBuffer *jitterbuffer;
695   GstRtpJitterBufferPrivate *priv;
696
697   jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
698   priv = jitterbuffer->priv;
699
700   GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
701
702   switch (GST_EVENT_TYPE (event)) {
703     case GST_EVENT_NEWSEGMENT:
704     {
705       GstFormat format;
706       gdouble rate, arate;
707       gint64 start, stop, time;
708       gboolean update;
709
710       gst_event_parse_new_segment_full (event, &update, &rate, &arate, &format,
711           &start, &stop, &time);
712
713       /* we need time for now */
714       if (format != GST_FORMAT_TIME)
715         goto newseg_wrong_format;
716
717       GST_DEBUG_OBJECT (jitterbuffer,
718           "newsegment: update %d, rate %g, arate %g, start %" GST_TIME_FORMAT
719           ", stop %" GST_TIME_FORMAT ", time %" GST_TIME_FORMAT,
720           update, rate, arate, GST_TIME_ARGS (start), GST_TIME_ARGS (stop),
721           GST_TIME_ARGS (time));
722
723       /* now configure the values, we need these to time the release of the
724        * buffers on the srcpad. */
725       gst_segment_set_newsegment_full (&priv->segment, update,
726           rate, arate, format, start, stop, time);
727
728       /* FIXME, push SEGMENT in the queue. Sorting order might be difficult. */
729       ret = gst_pad_push_event (priv->srcpad, event);
730       break;
731     }
732     case GST_EVENT_FLUSH_START:
733       gst_rtp_jitter_buffer_flush_start (jitterbuffer);
734       ret = gst_pad_push_event (priv->srcpad, event);
735       break;
736     case GST_EVENT_FLUSH_STOP:
737       ret = gst_pad_push_event (priv->srcpad, event);
738       ret = gst_rtp_jitter_buffer_src_activate_push (priv->srcpad, TRUE);
739       break;
740     case GST_EVENT_EOS:
741     {
742       /* push EOS in queue. We always push it at the head */
743       JBUF_LOCK (priv);
744       /* check for flushing, we need to discard the event and return FALSE when
745        * we are flushing */
746       ret = priv->srcresult == GST_FLOW_OK;
747       if (ret && !priv->eos) {
748         GST_DEBUG_OBJECT (jitterbuffer, "queuing EOS");
749         priv->eos = TRUE;
750         JBUF_SIGNAL (priv);
751       } else if (priv->eos) {
752         GST_DEBUG_OBJECT (jitterbuffer, "dropping EOS, we are already EOS");
753       } else {
754         GST_DEBUG_OBJECT (jitterbuffer, "dropping EOS, reason %s",
755             gst_flow_get_name (priv->srcresult));
756       }
757       JBUF_UNLOCK (priv);
758       gst_event_unref (event);
759       break;
760     }
761     default:
762       ret = gst_pad_push_event (priv->srcpad, event);
763       break;
764   }
765
766 done:
767   gst_object_unref (jitterbuffer);
768
769   return ret;
770
771   /* ERRORS */
772 newseg_wrong_format:
773   {
774     GST_DEBUG_OBJECT (jitterbuffer, "received non TIME newsegment");
775     ret = FALSE;
776     goto done;
777   }
778 }
779
780 static gboolean
781 gst_rtp_jitter_buffer_get_clock_rate (GstRtpJitterBuffer * jitterbuffer,
782     guint8 pt)
783 {
784   GValue ret = { 0 };
785   GValue args[2] = { {0}, {0} };
786   GstCaps *caps;
787   gboolean res;
788
789   g_value_init (&args[0], GST_TYPE_ELEMENT);
790   g_value_set_object (&args[0], jitterbuffer);
791   g_value_init (&args[1], G_TYPE_UINT);
792   g_value_set_uint (&args[1], pt);
793
794   g_value_init (&ret, GST_TYPE_CAPS);
795   g_value_set_boxed (&ret, NULL);
796
797   g_signal_emitv (args, gst_rtp_jitter_buffer_signals[SIGNAL_REQUEST_PT_MAP], 0,
798       &ret);
799
800   g_value_unset (&args[0]);
801   g_value_unset (&args[1]);
802   caps = (GstCaps *) g_value_dup_boxed (&ret);
803   g_value_unset (&ret);
804   if (!caps)
805     goto no_caps;
806
807   res = gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps);
808
809   gst_caps_unref (caps);
810
811   return res;
812
813   /* ERRORS */
814 no_caps:
815   {
816     GST_DEBUG_OBJECT (jitterbuffer, "could not get caps");
817     return FALSE;
818   }
819 }
820
821 static GstFlowReturn
822 gst_rtp_jitter_buffer_chain (GstPad * pad, GstBuffer * buffer)
823 {
824   GstRtpJitterBuffer *jitterbuffer;
825   GstRtpJitterBufferPrivate *priv;
826   guint16 seqnum;
827   GstFlowReturn ret = GST_FLOW_OK;
828   GstClockTime timestamp;
829   guint64 latency_ts;
830   gboolean tail;
831
832   jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
833
834   if (G_UNLIKELY (!gst_rtp_buffer_validate (buffer)))
835     goto invalid_buffer;
836
837   priv = jitterbuffer->priv;
838
839   if (G_UNLIKELY (priv->last_pt != gst_rtp_buffer_get_payload_type (buffer))) {
840     GstCaps *caps;
841
842     priv->last_pt = gst_rtp_buffer_get_payload_type (buffer);
843     /* reset clock-rate so that we get a new one */
844     priv->clock_rate = -1;
845     /* Try to get the clock-rate from the caps first if we can. If there are no
846      * caps we must fire the signal to get the clock-rate. */
847     if ((caps = GST_BUFFER_CAPS (buffer))) {
848       gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps);
849     }
850   }
851
852   if (G_UNLIKELY (priv->clock_rate == -1)) {
853     guint8 pt;
854
855     /* no clock rate given on the caps, try to get one with the signal */
856     pt = gst_rtp_buffer_get_payload_type (buffer);
857
858     gst_rtp_jitter_buffer_get_clock_rate (jitterbuffer, pt);
859     if (G_UNLIKELY (priv->clock_rate == -1))
860       goto not_negotiated;
861   }
862
863   /* take the timestamp of the buffer. This is the time when the packet was
864    * received and is used to calculate jitter and clock skew. We will adjust
865    * this timestamp with the smoothed value after processing it in the
866    * jitterbuffer. */
867   timestamp = GST_BUFFER_TIMESTAMP (buffer);
868   /* bring to running time */
869   timestamp = gst_segment_to_running_time (&priv->segment, GST_FORMAT_TIME,
870       timestamp);
871
872   seqnum = gst_rtp_buffer_get_seq (buffer);
873   GST_DEBUG_OBJECT (jitterbuffer,
874       "Received packet #%d at time %" GST_TIME_FORMAT, seqnum,
875       GST_TIME_ARGS (timestamp));
876
877   JBUF_LOCK_CHECK (priv, out_flushing);
878   /* don't accept more data on EOS */
879   if (G_UNLIKELY (priv->eos))
880     goto have_eos;
881
882   /* let's check if this buffer is too late, we can only accept packets with
883    * bigger seqnum than the one we last pushed. */
884   if (G_LIKELY (priv->last_popped_seqnum != -1)) {
885     gint gap;
886     gboolean reset = FALSE;
887
888     gap = gst_rtp_buffer_compare_seqnum (priv->last_popped_seqnum, seqnum);
889
890     if (G_UNLIKELY (gap <= 0)) {
891       /* priv->last_popped_seqnum >= seqnum, this packet is too late or the
892        * sender might have been restarted with different seqnum. */
893       if (gap < -RTP_MAX_MISORDER) {
894         GST_DEBUG_OBJECT (jitterbuffer, "reset: buffer too old %d", gap);
895         reset = TRUE;
896       } else {
897         goto too_late;
898       }
899     } else {
900       /* priv->last_popped_seqnum < seqnum, this is a new packet */
901       if (G_UNLIKELY (gap > RTP_MAX_DROPOUT)) {
902         GST_DEBUG_OBJECT (jitterbuffer, "reset: too many dropped packets %d",
903             gap);
904         reset = TRUE;
905       } else {
906         GST_DEBUG_OBJECT (jitterbuffer, "dropped packets %d but <= %d", gap,
907             RTP_MAX_DROPOUT);
908       }
909     }
910     if (G_UNLIKELY (reset)) {
911       priv->last_popped_seqnum = -1;
912       priv->next_seqnum = -1;
913       rtp_jitter_buffer_reset_skew (priv->jbuf);
914     }
915   }
916
917   /* let's drop oldest packet if the queue is already full and drop-on-latency
918    * is set. We can only do this when there actually is a latency. When no
919    * latency is set, we just pump it in the queue and let the other end push it
920    * out as fast as possible. */
921   if (priv->latency_ms && priv->drop_on_latency) {
922
923     latency_ts =
924         gst_util_uint64_scale_int (priv->latency_ms, priv->clock_rate, 1000);
925
926     if (G_UNLIKELY (rtp_jitter_buffer_get_ts_diff (priv->jbuf) >= latency_ts)) {
927       GstBuffer *old_buf;
928
929       old_buf = rtp_jitter_buffer_pop (priv->jbuf);
930
931       GST_DEBUG_OBJECT (jitterbuffer, "Queue full, dropping old packet #%d",
932           gst_rtp_buffer_get_seq (old_buf));
933
934       gst_buffer_unref (old_buf);
935     }
936   }
937
938   /* we need to make the metadata writable before pushing it in the jitterbuffer
939    * because the jitterbuffer will update the timestamp */
940   buffer = gst_buffer_make_metadata_writable (buffer);
941
942   /* now insert the packet into the queue in sorted order. This function returns
943    * FALSE if a packet with the same seqnum was already in the queue, meaning we
944    * have a duplicate. */
945   if (G_UNLIKELY (!rtp_jitter_buffer_insert (priv->jbuf, buffer, timestamp,
946               priv->clock_rate, &tail)))
947     goto duplicate;
948
949   /* signal addition of new buffer when the _loop is waiting. */
950   if (priv->waiting)
951     JBUF_SIGNAL (priv);
952
953   /* let's unschedule and unblock any waiting buffers. We only want to do this
954    * when the tail buffer changed */
955   if (G_UNLIKELY (priv->clock_id && tail)) {
956     GST_DEBUG_OBJECT (jitterbuffer,
957         "Unscheduling waiting buffer, new tail buffer");
958     gst_clock_id_unschedule (priv->clock_id);
959   }
960
961   GST_DEBUG_OBJECT (jitterbuffer, "Pushed packet #%d, now %d packets",
962       seqnum, rtp_jitter_buffer_num_packets (priv->jbuf));
963
964 finished:
965   JBUF_UNLOCK (priv);
966
967   gst_object_unref (jitterbuffer);
968
969   return ret;
970
971   /* ERRORS */
972 invalid_buffer:
973   {
974     /* this is not fatal but should be filtered earlier */
975     GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
976         ("Received invalid RTP payload, dropping"));
977     gst_buffer_unref (buffer);
978     gst_object_unref (jitterbuffer);
979     return GST_FLOW_OK;
980   }
981 not_negotiated:
982   {
983     GST_WARNING_OBJECT (jitterbuffer, "No clock-rate in caps!");
984     gst_buffer_unref (buffer);
985     gst_object_unref (jitterbuffer);
986     return GST_FLOW_OK;
987   }
988 out_flushing:
989   {
990     ret = priv->srcresult;
991     GST_DEBUG_OBJECT (jitterbuffer, "flushing %s", gst_flow_get_name (ret));
992     gst_buffer_unref (buffer);
993     goto finished;
994   }
995 have_eos:
996   {
997     ret = GST_FLOW_UNEXPECTED;
998     GST_WARNING_OBJECT (jitterbuffer, "we are EOS, refusing buffer");
999     gst_buffer_unref (buffer);
1000     goto finished;
1001   }
1002 too_late:
1003   {
1004     GST_WARNING_OBJECT (jitterbuffer, "Packet #%d too late as #%d was already"
1005         " popped, dropping", seqnum, priv->last_popped_seqnum);
1006     priv->num_late++;
1007     gst_buffer_unref (buffer);
1008     goto finished;
1009   }
1010 duplicate:
1011   {
1012     GST_WARNING_OBJECT (jitterbuffer, "Duplicate packet #%d detected, dropping",
1013         seqnum);
1014     priv->num_duplicates++;
1015     gst_buffer_unref (buffer);
1016     goto finished;
1017   }
1018 }
1019
1020 static GstClockTime
1021 apply_offset (GstRtpJitterBuffer * jitterbuffer, GstClockTime timestamp)
1022 {
1023   GstRtpJitterBufferPrivate *priv;
1024
1025   priv = jitterbuffer->priv;
1026
1027   if (timestamp == -1)
1028     return -1;
1029
1030   /* apply the timestamp offset */
1031   timestamp += priv->ts_offset;
1032
1033   return timestamp;
1034 }
1035
1036 /**
1037  * This funcion will push out buffers on the source pad.
1038  *
1039  * For each pushed buffer, the seqnum is recorded, if the next buffer B has a
1040  * different seqnum (missing packets before B), this function will wait for the
1041  * missing packet to arrive up to the timestamp of buffer B.
1042  */
1043 static void
1044 gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer)
1045 {
1046   GstRtpJitterBufferPrivate *priv;
1047   GstBuffer *outbuf;
1048   GstFlowReturn result;
1049   guint16 seqnum;
1050   guint32 next_seqnum;
1051   GstClockTime timestamp, out_time;
1052   gboolean discont = FALSE;
1053   gint gap;
1054
1055   priv = jitterbuffer->priv;
1056
1057   JBUF_LOCK_CHECK (priv, flushing);
1058 again:
1059   GST_DEBUG_OBJECT (jitterbuffer, "Peeking item");
1060   while (TRUE) {
1061     /* always wait if we are blocked */
1062     if (G_LIKELY (!priv->blocked)) {
1063       /* if we have a packet, we can exit the loop and grab it */
1064       if (rtp_jitter_buffer_num_packets (priv->jbuf) > 0)
1065         break;
1066       /* no packets but we are EOS, do eos logic */
1067       if (G_UNLIKELY (priv->eos))
1068         goto do_eos;
1069     }
1070     /* underrun, wait for packets or flushing now */
1071     priv->waiting = TRUE;
1072     JBUF_WAIT_CHECK (priv, flushing);
1073     priv->waiting = FALSE;
1074   }
1075
1076   /* peek a buffer, we're just looking at the timestamp and the sequence number.
1077    * If all is fine, we'll pop and push it. If the sequence number is wrong we
1078    * wait on the timestamp. In the chain function we will unlock the wait when a
1079    * new buffer is available. The peeked buffer is valid for as long as we hold
1080    * the jitterbuffer lock. */
1081   outbuf = rtp_jitter_buffer_peek (priv->jbuf);
1082
1083   /* get the seqnum and the next expected seqnum */
1084   seqnum = gst_rtp_buffer_get_seq (outbuf);
1085   next_seqnum = priv->next_seqnum;
1086
1087   /* get the timestamp, this is already corrected for clock skew by the
1088    * jitterbuffer */
1089   timestamp = GST_BUFFER_TIMESTAMP (outbuf);
1090
1091   GST_DEBUG_OBJECT (jitterbuffer,
1092       "Peeked buffer #%d, expect #%d, timestamp %" GST_TIME_FORMAT
1093       ", now %d left", seqnum, next_seqnum, GST_TIME_ARGS (timestamp),
1094       rtp_jitter_buffer_num_packets (priv->jbuf));
1095
1096   /* apply our timestamp offset to the incomming buffer, this will be our output
1097    * timestamp. */
1098   out_time = apply_offset (jitterbuffer, timestamp);
1099
1100   /* get the gap between this and the previous packet. If we don't know the
1101    * previous packet seqnum assume no gap. */
1102   if (G_LIKELY (next_seqnum != -1)) {
1103     gap = gst_rtp_buffer_compare_seqnum (next_seqnum, seqnum);
1104
1105     /* if we have a packet that we already pushed or considered dropped, pop it
1106      * off and get the next packet */
1107     if (G_UNLIKELY (gap < 0)) {
1108       GST_DEBUG_OBJECT (jitterbuffer, "Old packet #%d, next #%d dropping",
1109           seqnum, next_seqnum);
1110       outbuf = rtp_jitter_buffer_pop (priv->jbuf);
1111       gst_buffer_unref (outbuf);
1112       goto again;
1113     }
1114   } else {
1115     GST_DEBUG_OBJECT (jitterbuffer, "no next seqnum known, first packet");
1116     gap = -1;
1117   }
1118
1119   /* If we don't know what the next seqnum should be (== -1) we have to wait
1120    * because it might be possible that we are not receiving this buffer in-order,
1121    * a buffer with a lower seqnum could arrive later and we want to push that
1122    * earlier buffer before this buffer then.
1123    * If we know the expected seqnum, we can compare it to the current seqnum to
1124    * determine if we have missing a packet. If we have a missing packet (which
1125    * must be before this packet) we can wait for it until the deadline for this
1126    * packet expires. */
1127   if (G_UNLIKELY (gap != 0 && out_time != -1)) {
1128     GstClockID id;
1129     GstClockTime sync_time;
1130     GstClockReturn ret;
1131     GstClock *clock;
1132     GstClockTime duration = GST_CLOCK_TIME_NONE;
1133
1134     if (gap > 0) {
1135       /* we have a gap */
1136       GST_WARNING_OBJECT (jitterbuffer,
1137           "Sequence number GAP detected: expected %d instead of %d (%d missing)",
1138           next_seqnum, seqnum, gap);
1139
1140       if (priv->last_out_time != -1) {
1141         GST_DEBUG_OBJECT (jitterbuffer,
1142             "out_time %" GST_TIME_FORMAT ", last %" GST_TIME_FORMAT,
1143             GST_TIME_ARGS (out_time), GST_TIME_ARGS (priv->last_out_time));
1144         /* interpolate between the current time and the last time based on
1145          * number of packets we are missing, this is the estimated duration
1146          * for the missing packet based on equidistant packet spacing. Also make
1147          * sure we never go negative. */
1148         if (out_time > priv->last_out_time)
1149           duration = (out_time - priv->last_out_time) / (gap + 1);
1150         else
1151           goto lost;
1152
1153         GST_DEBUG_OBJECT (jitterbuffer, "duration %" GST_TIME_FORMAT,
1154             GST_TIME_ARGS (duration));
1155         /* add this duration to the timestamp of the last packet we pushed */
1156         out_time = (priv->last_out_time + duration);
1157       }
1158     } else {
1159       /* we don't know what the next_seqnum should be, wait for the last
1160        * possible moment to push this buffer, maybe we get an earlier seqnum
1161        * while we wait */
1162       GST_DEBUG_OBJECT (jitterbuffer, "First buffer %d, do sync", seqnum);
1163     }
1164
1165     GST_OBJECT_LOCK (jitterbuffer);
1166     clock = GST_ELEMENT_CLOCK (jitterbuffer);
1167     if (!clock) {
1168       GST_OBJECT_UNLOCK (jitterbuffer);
1169       /* let's just push if there is no clock */
1170       goto push_buffer;
1171     }
1172
1173     GST_DEBUG_OBJECT (jitterbuffer, "sync to timestamp %" GST_TIME_FORMAT,
1174         GST_TIME_ARGS (out_time));
1175
1176     /* prepare for sync against clock */
1177     sync_time = out_time + GST_ELEMENT_CAST (jitterbuffer)->base_time;
1178     /* add latency, this includes our own latency and the peer latency. */
1179     sync_time += (priv->latency_ms * GST_MSECOND);
1180     sync_time += priv->peer_latency;
1181
1182     /* create an entry for the clock */
1183     id = priv->clock_id = gst_clock_new_single_shot_id (clock, sync_time);
1184     GST_OBJECT_UNLOCK (jitterbuffer);
1185
1186     /* release the lock so that the other end can push stuff or unlock */
1187     JBUF_UNLOCK (priv);
1188
1189     ret = gst_clock_id_wait (id, NULL);
1190
1191     JBUF_LOCK (priv);
1192     /* and free the entry */
1193     gst_clock_id_unref (id);
1194     priv->clock_id = NULL;
1195
1196     /* at this point, the clock could have been unlocked by a timeout, a new
1197      * tail element was added to the queue or because we are shutting down. Check
1198      * for shutdown first. */
1199     if G_UNLIKELY
1200       ((priv->srcresult != GST_FLOW_OK))
1201           goto flushing;
1202
1203     /* if we got unscheduled and we are not flushing, it's because a new tail
1204      * element became available in the queue. Grab it and try to push or sync. */
1205     if (ret == GST_CLOCK_UNSCHEDULED) {
1206       GST_DEBUG_OBJECT (jitterbuffer,
1207           "Wait got unscheduled, will retry to push with new buffer");
1208       goto again;
1209     }
1210
1211   lost:
1212     /* we now timed out, this means we lost a packet or finished synchronizing
1213      * on the first buffer. */
1214     if (gap > 0) {
1215       GstEvent *event;
1216
1217       /* we had a gap and thus we lost a packet. Create an event for this.  */
1218       GST_DEBUG_OBJECT (jitterbuffer, "Packet #%d lost", next_seqnum);
1219       priv->num_late++;
1220       discont = TRUE;
1221
1222       if (priv->do_lost) {
1223         /* create paket lost event */
1224         event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM,
1225             gst_structure_new ("GstRTPPacketLost",
1226                 "seqnum", G_TYPE_UINT, (guint) next_seqnum,
1227                 "timestamp", G_TYPE_UINT64, out_time,
1228                 "duration", G_TYPE_UINT64, duration, NULL));
1229         gst_pad_push_event (priv->srcpad, event);
1230       }
1231
1232       /* update our expected next packet */
1233       priv->last_popped_seqnum = next_seqnum;
1234       priv->last_out_time = out_time;
1235       priv->next_seqnum = (next_seqnum + 1) & 0xffff;
1236       /* look for next packet */
1237       goto again;
1238     }
1239
1240     /* there was no known gap,just the first packet, exit the loop and push */
1241     GST_DEBUG_OBJECT (jitterbuffer, "First packet #%d synced", seqnum);
1242
1243     /* get new timestamp, latency might have changed */
1244     out_time = apply_offset (jitterbuffer, timestamp);
1245   }
1246 push_buffer:
1247
1248   /* when we get here we are ready to pop and push the buffer */
1249   outbuf = rtp_jitter_buffer_pop (priv->jbuf);
1250
1251   if (G_UNLIKELY (discont || priv->discont)) {
1252     /* set DISCONT flag when we missed a packet. We pushed the buffer writable
1253      * into the jitterbuffer so we can modify now. */
1254     GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_DISCONT);
1255     priv->discont = FALSE;
1256   }
1257
1258   /* apply timestamp with offset to buffer now */
1259   GST_BUFFER_TIMESTAMP (outbuf) = out_time;
1260
1261   /* now we are ready to push the buffer. Save the seqnum and release the lock
1262    * so the other end can push stuff in the queue again. */
1263   priv->last_popped_seqnum = seqnum;
1264   priv->last_out_time = out_time;
1265   priv->next_seqnum = (seqnum + 1) & 0xffff;
1266   JBUF_UNLOCK (priv);
1267
1268   /* push buffer */
1269   GST_DEBUG_OBJECT (jitterbuffer,
1270       "Pushing buffer %d, timestamp %" GST_TIME_FORMAT, seqnum,
1271       GST_TIME_ARGS (out_time));
1272   result = gst_pad_push (priv->srcpad, outbuf);
1273   if (G_UNLIKELY (result != GST_FLOW_OK))
1274     goto pause;
1275
1276   return;
1277
1278   /* ERRORS */
1279 do_eos:
1280   {
1281     /* store result, we are flushing now */
1282     GST_DEBUG_OBJECT (jitterbuffer, "We are EOS, pushing EOS downstream");
1283     priv->srcresult = GST_FLOW_UNEXPECTED;
1284     gst_pad_pause_task (priv->srcpad);
1285     gst_pad_push_event (priv->srcpad, gst_event_new_eos ());
1286     JBUF_UNLOCK (priv);
1287     return;
1288   }
1289 flushing:
1290   {
1291     GST_DEBUG_OBJECT (jitterbuffer, "we are flushing");
1292     gst_pad_pause_task (priv->srcpad);
1293     JBUF_UNLOCK (priv);
1294     return;
1295   }
1296 pause:
1297   {
1298     const gchar *reason = gst_flow_get_name (result);
1299
1300     GST_DEBUG_OBJECT (jitterbuffer, "pausing task, reason %s", reason);
1301
1302     JBUF_LOCK (priv);
1303     /* store result */
1304     priv->srcresult = result;
1305     /* we don't post errors or anything because upstream will do that for us
1306      * when we pass the return value upstream. */
1307     gst_pad_pause_task (priv->srcpad);
1308     JBUF_UNLOCK (priv);
1309     return;
1310   }
1311 }
1312
1313 static gboolean
1314 gst_rtp_jitter_buffer_query (GstPad * pad, GstQuery * query)
1315 {
1316   GstRtpJitterBuffer *jitterbuffer;
1317   GstRtpJitterBufferPrivate *priv;
1318   gboolean res = FALSE;
1319
1320   jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
1321   priv = jitterbuffer->priv;
1322
1323   switch (GST_QUERY_TYPE (query)) {
1324     case GST_QUERY_LATENCY:
1325     {
1326       /* We need to send the query upstream and add the returned latency to our
1327        * own */
1328       GstClockTime min_latency, max_latency;
1329       gboolean us_live;
1330       GstClockTime our_latency;
1331
1332       if ((res = gst_pad_peer_query (priv->sinkpad, query))) {
1333         gst_query_parse_latency (query, &us_live, &min_latency, &max_latency);
1334
1335         GST_DEBUG_OBJECT (jitterbuffer, "Peer latency: min %"
1336             GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
1337             GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
1338
1339         /* store this so that we can safely sync on the peer buffers. */
1340         JBUF_LOCK (priv);
1341         priv->peer_latency = min_latency;
1342         our_latency = ((guint64) priv->latency_ms) * GST_MSECOND;
1343         JBUF_UNLOCK (priv);
1344
1345         GST_DEBUG_OBJECT (jitterbuffer, "Our latency: %" GST_TIME_FORMAT,
1346             GST_TIME_ARGS (our_latency));
1347
1348         /* we add some latency but can buffer an infinite amount of time */
1349         min_latency += our_latency;
1350         max_latency = -1;
1351
1352         GST_DEBUG_OBJECT (jitterbuffer, "Calculated total latency : min %"
1353             GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
1354             GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
1355
1356         gst_query_set_latency (query, TRUE, min_latency, max_latency);
1357       }
1358       break;
1359     }
1360     default:
1361       res = gst_pad_query_default (pad, query);
1362       break;
1363   }
1364
1365   gst_object_unref (jitterbuffer);
1366
1367   return res;
1368 }
1369
1370 static void
1371 gst_rtp_jitter_buffer_set_property (GObject * object,
1372     guint prop_id, const GValue * value, GParamSpec * pspec)
1373 {
1374   GstRtpJitterBuffer *jitterbuffer;
1375   GstRtpJitterBufferPrivate *priv;
1376
1377   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
1378   priv = jitterbuffer->priv;
1379
1380   switch (prop_id) {
1381     case PROP_LATENCY:
1382     {
1383       guint new_latency, old_latency;
1384
1385       new_latency = g_value_get_uint (value);
1386
1387       JBUF_LOCK (priv);
1388       old_latency = priv->latency_ms;
1389       priv->latency_ms = new_latency;
1390       JBUF_UNLOCK (priv);
1391
1392       /* post message if latency changed, this will inform the parent pipeline
1393        * that a latency reconfiguration is possible/needed. */
1394       if (new_latency != old_latency) {
1395         GST_DEBUG_OBJECT (jitterbuffer, "latency changed to: %" GST_TIME_FORMAT,
1396             GST_TIME_ARGS (new_latency * GST_MSECOND));
1397
1398         gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer),
1399             gst_message_new_latency (GST_OBJECT_CAST (jitterbuffer)));
1400       }
1401       break;
1402     }
1403     case PROP_DROP_ON_LATENCY:
1404       JBUF_LOCK (priv);
1405       priv->drop_on_latency = g_value_get_boolean (value);
1406       JBUF_UNLOCK (priv);
1407       break;
1408     case PROP_TS_OFFSET:
1409       JBUF_LOCK (priv);
1410       priv->ts_offset = g_value_get_int64 (value);
1411       /* FIXME, we don't really have a method for signaling a timestamp
1412        * DISCONT without also making this a data discont. */
1413       /* priv->discont = TRUE; */
1414       JBUF_UNLOCK (priv);
1415       break;
1416     case PROP_DO_LOST:
1417       JBUF_LOCK (priv);
1418       priv->do_lost = g_value_get_boolean (value);
1419       JBUF_UNLOCK (priv);
1420       break;
1421     default:
1422       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1423       break;
1424   }
1425 }
1426
1427 static void
1428 gst_rtp_jitter_buffer_get_property (GObject * object,
1429     guint prop_id, GValue * value, GParamSpec * pspec)
1430 {
1431   GstRtpJitterBuffer *jitterbuffer;
1432   GstRtpJitterBufferPrivate *priv;
1433
1434   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
1435   priv = jitterbuffer->priv;
1436
1437   switch (prop_id) {
1438     case PROP_LATENCY:
1439       JBUF_LOCK (priv);
1440       g_value_set_uint (value, priv->latency_ms);
1441       JBUF_UNLOCK (priv);
1442       break;
1443     case PROP_DROP_ON_LATENCY:
1444       JBUF_LOCK (priv);
1445       g_value_set_boolean (value, priv->drop_on_latency);
1446       JBUF_UNLOCK (priv);
1447       break;
1448     case PROP_TS_OFFSET:
1449       JBUF_LOCK (priv);
1450       g_value_set_int64 (value, priv->ts_offset);
1451       JBUF_UNLOCK (priv);
1452       break;
1453     case PROP_DO_LOST:
1454       JBUF_LOCK (priv);
1455       g_value_set_boolean (value, priv->do_lost);
1456       JBUF_UNLOCK (priv);
1457       break;
1458     default:
1459       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1460       break;
1461   }
1462 }
1463
1464 void
1465 gst_rtp_jitter_buffer_get_sync (GstRtpJitterBuffer * buffer, guint64 * rtptime,
1466     guint64 * timestamp)
1467 {
1468   GstRtpJitterBufferPrivate *priv;
1469
1470   g_return_if_fail (GST_IS_RTP_JITTER_BUFFER (buffer));
1471
1472   priv = buffer->priv;
1473
1474   JBUF_LOCK (priv);
1475   rtp_jitter_buffer_get_sync (priv->jbuf, rtptime, timestamp);
1476   JBUF_UNLOCK (priv);
1477 }