rtpjitterbuffer: set correct expected time
[platform/upstream/gst-plugins-good.git] / gst / rtpmanager / gstrtpjitterbuffer.c
1 /*
2  * Farsight Voice+Video library
3  *
4  *  Copyright 2007 Collabora Ltd,
5  *  Copyright 2007 Nokia Corporation
6  *   @author: Philippe Kalaf <philippe.kalaf@collabora.co.uk>.
7  *  Copyright 2007 Wim Taymans <wim.taymans@gmail.com>
8  *
9  * This library is free software; you can redistribute it and/or
10  * modify it under the terms of the GNU Library General Public
11  * License as published by the Free Software Foundation; either
12  * version 2 of the License, or (at your option) any later version.
13  *
14  * This library is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17  * Library General Public License for more details.
18  *
19  * You should have received a copy of the GNU Library General Public
20  * License along with this library; if not, write to the
21  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
22  * Boston, MA 02110-1301, USA.
23  *
24  */
25
26 /**
27  * SECTION:element-gstrtpjitterbuffer
28  *
29  * This element reorders and removes duplicate RTP packets as they are received
30  * from a network source. It will also wait for missing packets up to a
31  * configurable time limit using the #GstRtpJitterBuffer:latency property.
32  * Packets arriving too late are considered to be lost packets.
33  *
34  * This element acts as a live element and so adds #GstRtpJitterBuffer:latency
35  * to the pipeline.
36  *
37  * The element needs the clock-rate of the RTP payload in order to estimate the
38  * delay. This information is obtained either from the caps on the sink pad or,
39  * when no caps are present, from the #GstRtpJitterBuffer::request-pt-map signal.
40  * To clear the previous pt-map use the #GstRtpJitterBuffer::clear-pt-map signal.
41  *
42  * This element will automatically be used inside gstrtpbin.
43  *
44  * <refsect2>
45  * <title>Example pipelines</title>
46  * |[
47  * gst-launch-1.0 rtspsrc location=rtsp://192.168.1.133:8554/mpeg1or2AudioVideoTest ! gstrtpjitterbuffer ! rtpmpvdepay ! mpeg2dec ! xvimagesink
48  * ]| Connect to a streaming server and decode the MPEG video. The jitterbuffer is
49  * inserted into the pipeline to smooth out network jitter and to reorder the
50  * out-of-order RTP packets.
51  * </refsect2>
52  *
53  * Last reviewed on 2007-05-28 (0.10.5)
54  */
55
56 #ifdef HAVE_CONFIG_H
57 #include "config.h"
58 #endif
59
60 #include <stdlib.h>
61 #include <string.h>
62 #include <gst/rtp/gstrtpbuffer.h>
63
64 #include "gstrtpjitterbuffer.h"
65 #include "rtpjitterbuffer.h"
66 #include "rtpstats.h"
67
68 #include <gst/glib-compat-private.h>
69
70 GST_DEBUG_CATEGORY (rtpjitterbuffer_debug);
71 #define GST_CAT_DEFAULT (rtpjitterbuffer_debug)
72
73 /* RTPJitterBuffer signals and args */
74 enum
75 {
76   SIGNAL_REQUEST_PT_MAP,
77   SIGNAL_CLEAR_PT_MAP,
78   SIGNAL_HANDLE_SYNC,
79   SIGNAL_ON_NPT_STOP,
80   SIGNAL_SET_ACTIVE,
81   LAST_SIGNAL
82 };
83
84 #define DEFAULT_LATENCY_MS          200
85 #define DEFAULT_DROP_ON_LATENCY     FALSE
86 #define DEFAULT_TS_OFFSET           0
87 #define DEFAULT_DO_LOST             FALSE
88 #define DEFAULT_MODE                RTP_JITTER_BUFFER_MODE_SLAVE
89 #define DEFAULT_PERCENT             0
90 #define DEFAULT_DO_RETRANSMISSION   FALSE
91 #define DEFAULT_RTX_DELAY           20
92 #define DEFAULT_RTX_DELAY_REORDER   3
93 #define DEFAULT_RTX_RETRY_TIMEOUT   40
94 #define DEFAULT_RTX_RETRY_PERIOD    160
95
96 enum
97 {
98   PROP_0,
99   PROP_LATENCY,
100   PROP_DROP_ON_LATENCY,
101   PROP_TS_OFFSET,
102   PROP_DO_LOST,
103   PROP_MODE,
104   PROP_PERCENT,
105   PROP_DO_RETRANSMISSION,
106   PROP_RTX_DELAY,
107   PROP_RTX_DELAY_REORDER,
108   PROP_RTX_RETRY_TIMEOUT,
109   PROP_RTX_RETRY_PERIOD,
110   PROP_LAST
111 };
112
113 #define JBUF_LOCK(priv)   (g_mutex_lock (&(priv)->jbuf_lock))
114
115 #define JBUF_LOCK_CHECK(priv,label) G_STMT_START {    \
116   JBUF_LOCK (priv);                                   \
117   if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK))    \
118     goto label;                                       \
119 } G_STMT_END
120 #define JBUF_UNLOCK(priv) (g_mutex_unlock (&(priv)->jbuf_lock))
121
122 #define JBUF_WAIT_TIMER(priv)   G_STMT_START {            \
123   (priv)->waiting_timer = TRUE;                           \
124   g_cond_wait (&(priv)->jbuf_timer, &(priv)->jbuf_lock);  \
125   (priv)->waiting_timer = FALSE;                          \
126 } G_STMT_END
127 #define JBUF_SIGNAL_TIMER(priv) G_STMT_START {    \
128   if (G_UNLIKELY ((priv)->waiting_timer))         \
129     g_cond_signal (&(priv)->jbuf_timer);          \
130 } G_STMT_END
131
132 #define JBUF_WAIT_EVENT(priv,label) G_STMT_START {       \
133   (priv)->waiting_event = TRUE;                          \
134   g_cond_wait (&(priv)->jbuf_event, &(priv)->jbuf_lock); \
135   (priv)->waiting_event = FALSE;                         \
136   if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK))       \
137     goto label;                                          \
138 } G_STMT_END
139 #define JBUF_SIGNAL_EVENT(priv) G_STMT_START {    \
140   if (G_UNLIKELY ((priv)->waiting_event))         \
141     g_cond_signal (&(priv)->jbuf_event);          \
142 } G_STMT_END
143
144 struct _GstRtpJitterBufferPrivate
145 {
146   GstPad *sinkpad, *srcpad;
147   GstPad *rtcpsinkpad;
148
149   RTPJitterBuffer *jbuf;
150   GMutex jbuf_lock;
151   gboolean waiting_timer;
152   GCond jbuf_timer;
153   gboolean waiting_event;
154   GCond jbuf_event;
155   gboolean discont;
156   gboolean ts_discont;
157   gboolean active;
158   guint64 out_offset;
159
160   gboolean timer_running;
161   GThread *timer_thread;
162
163   /* properties */
164   guint latency_ms;
165   guint64 latency_ns;
166   gboolean drop_on_latency;
167   gint64 ts_offset;
168   gboolean do_lost;
169   gboolean do_retransmission;
170   gint rtx_delay;
171   gint rtx_delay_reorder;
172   gint rtx_retry_timeout;
173   gint rtx_retry_period;
174
175   /* the last seqnum we pushed out */
176   guint32 last_popped_seqnum;
177   /* the next expected seqnum we push */
178   guint32 next_seqnum;
179   /* last output time */
180   GstClockTime last_out_time;
181   /* last valid input timestamp and rtptime pair */
182   GstClockTime ips_dts;
183   guint64 ips_rtptime;
184   GstClockTime packet_spacing;
185
186   /* the next expected seqnum we receive */
187   GstClockTime last_in_dts;
188   guint32 last_in_seqnum;
189   guint32 next_in_seqnum;
190
191   GArray *timers;
192
193   /* start and stop ranges */
194   GstClockTime npt_start;
195   GstClockTime npt_stop;
196   guint64 ext_timestamp;
197   guint64 last_elapsed;
198   guint64 estimated_eos;
199   GstClockID eos_id;
200
201   /* state */
202   gboolean eos;
203
204   /* clock rate and rtp timestamp offset */
205   gint last_pt;
206   gint32 clock_rate;
207   gint64 clock_base;
208   gint64 prev_ts_offset;
209
210   /* when we are shutting down */
211   GstFlowReturn srcresult;
212   gboolean blocked;
213
214   /* for sync */
215   GstSegment segment;
216   GstClockID clock_id;
217   GstClockTime timer_timeout;
218   guint16 timer_seqnum;
219   /* the latency of the upstream peer, we have to take this into account when
220    * synchronizing the buffers. */
221   GstClockTime peer_latency;
222   guint64 ext_rtptime;
223   GstBuffer *last_sr;
224
225   /* some accounting */
226   guint64 num_late;
227   guint64 num_duplicates;
228 };
229
230 typedef enum
231 {
232   TIMER_TYPE_EXPECTED,
233   TIMER_TYPE_LOST,
234   TIMER_TYPE_DEADLINE,
235   TIMER_TYPE_EOS
236 } TimerType;
237
238 typedef struct
239 {
240   guint idx;
241   guint16 seqnum;
242   guint num;
243   TimerType type;
244   GstClockTime timeout;
245   GstClockTime duration;
246   GstClockTime rtx_base;
247   GstClockTime rtx_delay;
248   GstClockTime rtx_retry;
249 } TimerData;
250
251 #define GST_RTP_JITTER_BUFFER_GET_PRIVATE(o) \
252   (G_TYPE_INSTANCE_GET_PRIVATE ((o), GST_TYPE_RTP_JITTER_BUFFER, \
253                                 GstRtpJitterBufferPrivate))
254
255 static GstStaticPadTemplate gst_rtp_jitter_buffer_sink_template =
256 GST_STATIC_PAD_TEMPLATE ("sink",
257     GST_PAD_SINK,
258     GST_PAD_ALWAYS,
259     GST_STATIC_CAPS ("application/x-rtp, "
260         "clock-rate = (int) [ 1, 2147483647 ]"
261         /* "payload = (int) , "
262          * "encoding-name = (string) "
263          */ )
264     );
265
266 static GstStaticPadTemplate gst_rtp_jitter_buffer_sink_rtcp_template =
267 GST_STATIC_PAD_TEMPLATE ("sink_rtcp",
268     GST_PAD_SINK,
269     GST_PAD_REQUEST,
270     GST_STATIC_CAPS ("application/x-rtcp")
271     );
272
273 static GstStaticPadTemplate gst_rtp_jitter_buffer_src_template =
274 GST_STATIC_PAD_TEMPLATE ("src",
275     GST_PAD_SRC,
276     GST_PAD_ALWAYS,
277     GST_STATIC_CAPS ("application/x-rtp"
278         /* "payload = (int) , "
279          * "clock-rate = (int) , "
280          * "encoding-name = (string) "
281          */ )
282     );
283
284 static guint gst_rtp_jitter_buffer_signals[LAST_SIGNAL] = { 0 };
285
286 #define gst_rtp_jitter_buffer_parent_class parent_class
287 G_DEFINE_TYPE (GstRtpJitterBuffer, gst_rtp_jitter_buffer, GST_TYPE_ELEMENT);
288
289 /* object overrides */
290 static void gst_rtp_jitter_buffer_set_property (GObject * object,
291     guint prop_id, const GValue * value, GParamSpec * pspec);
292 static void gst_rtp_jitter_buffer_get_property (GObject * object,
293     guint prop_id, GValue * value, GParamSpec * pspec);
294 static void gst_rtp_jitter_buffer_finalize (GObject * object);
295
296 /* element overrides */
297 static GstStateChangeReturn gst_rtp_jitter_buffer_change_state (GstElement
298     * element, GstStateChange transition);
299 static GstPad *gst_rtp_jitter_buffer_request_new_pad (GstElement * element,
300     GstPadTemplate * templ, const gchar * name, const GstCaps * filter);
301 static void gst_rtp_jitter_buffer_release_pad (GstElement * element,
302     GstPad * pad);
303 static GstClock *gst_rtp_jitter_buffer_provide_clock (GstElement * element);
304
305 /* pad overrides */
306 static GstCaps *gst_rtp_jitter_buffer_getcaps (GstPad * pad, GstCaps * filter);
307 static GstIterator *gst_rtp_jitter_buffer_iterate_internal_links (GstPad * pad,
308     GstObject * parent);
309
310 /* sinkpad overrides */
311 static gboolean gst_rtp_jitter_buffer_sink_event (GstPad * pad,
312     GstObject * parent, GstEvent * event);
313 static GstFlowReturn gst_rtp_jitter_buffer_chain (GstPad * pad,
314     GstObject * parent, GstBuffer * buffer);
315
316 static gboolean gst_rtp_jitter_buffer_sink_rtcp_event (GstPad * pad,
317     GstObject * parent, GstEvent * event);
318 static GstFlowReturn gst_rtp_jitter_buffer_chain_rtcp (GstPad * pad,
319     GstObject * parent, GstBuffer * buffer);
320
321 static gboolean gst_rtp_jitter_buffer_sink_query (GstPad * pad,
322     GstObject * parent, GstQuery * query);
323
324 /* srcpad overrides */
325 static gboolean gst_rtp_jitter_buffer_src_event (GstPad * pad,
326     GstObject * parent, GstEvent * event);
327 static gboolean gst_rtp_jitter_buffer_src_activate_mode (GstPad * pad,
328     GstObject * parent, GstPadMode mode, gboolean active);
329 static void gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer);
330 static gboolean gst_rtp_jitter_buffer_src_query (GstPad * pad,
331     GstObject * parent, GstQuery * query);
332
333 static void
334 gst_rtp_jitter_buffer_clear_pt_map (GstRtpJitterBuffer * jitterbuffer);
335 static GstClockTime
336 gst_rtp_jitter_buffer_set_active (GstRtpJitterBuffer * jitterbuffer,
337     gboolean active, guint64 base_time);
338 static void do_handle_sync (GstRtpJitterBuffer * jitterbuffer);
339
340 static void unschedule_current_timer (GstRtpJitterBuffer * jitterbuffer);
341 static void remove_all_timers (GstRtpJitterBuffer * jitterbuffer);
342
343 static void wait_next_timeout (GstRtpJitterBuffer * jitterbuffer);
344
345 static void
346 gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass)
347 {
348   GObjectClass *gobject_class;
349   GstElementClass *gstelement_class;
350
351   gobject_class = (GObjectClass *) klass;
352   gstelement_class = (GstElementClass *) klass;
353
354   g_type_class_add_private (klass, sizeof (GstRtpJitterBufferPrivate));
355
356   gobject_class->finalize = gst_rtp_jitter_buffer_finalize;
357
358   gobject_class->set_property = gst_rtp_jitter_buffer_set_property;
359   gobject_class->get_property = gst_rtp_jitter_buffer_get_property;
360
361   /**
362    * GstRtpJitterBuffer::latency:
363    *
364    * The maximum latency of the jitterbuffer. Packets will be kept in the buffer
365    * for at most this time.
366    */
367   g_object_class_install_property (gobject_class, PROP_LATENCY,
368       g_param_spec_uint ("latency", "Buffer latency in ms",
369           "Amount of ms to buffer", 0, G_MAXUINT, DEFAULT_LATENCY_MS,
370           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
371   /**
372    * GstRtpJitterBuffer::drop-on-latency:
373    *
374    * Drop oldest buffers when the queue is completely filled.
375    */
376   g_object_class_install_property (gobject_class, PROP_DROP_ON_LATENCY,
377       g_param_spec_boolean ("drop-on-latency",
378           "Drop buffers when maximum latency is reached",
379           "Tells the jitterbuffer to never exceed the given latency in size",
380           DEFAULT_DROP_ON_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
381   /**
382    * GstRtpJitterBuffer::ts-offset:
383    *
384    * Adjust GStreamer output buffer timestamps in the jitterbuffer with offset.
385    * This is mainly used to ensure interstream synchronisation.
386    */
387   g_object_class_install_property (gobject_class, PROP_TS_OFFSET,
388       g_param_spec_int64 ("ts-offset", "Timestamp Offset",
389           "Adjust buffer timestamps with offset in nanoseconds", G_MININT64,
390           G_MAXINT64, DEFAULT_TS_OFFSET,
391           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
392
393   /**
394    * GstRtpJitterBuffer::do-lost:
395    *
396    * Send out a GstRTPPacketLost event downstream when a packet is considered
397    * lost.
398    */
399   g_object_class_install_property (gobject_class, PROP_DO_LOST,
400       g_param_spec_boolean ("do-lost", "Do Lost",
401           "Send an event downstream when a packet is lost", DEFAULT_DO_LOST,
402           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
403
404   /**
405    * GstRtpJitterBuffer::mode:
406    *
407    * Control the buffering and timestamping mode used by the jitterbuffer.
408    */
409   g_object_class_install_property (gobject_class, PROP_MODE,
410       g_param_spec_enum ("mode", "Mode",
411           "Control the buffering algorithm in use", RTP_TYPE_JITTER_BUFFER_MODE,
412           DEFAULT_MODE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
413   /**
414    * GstRtpJitterBuffer::percent:
415    *
416    * The percent of the jitterbuffer that is filled.
417    *
418    * Since: 0.10.19
419    */
420   g_object_class_install_property (gobject_class, PROP_PERCENT,
421       g_param_spec_int ("percent", "percent",
422           "The buffer filled percent", 0, 100,
423           0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
424   /**
425    * GstRtpJitterBuffer::do-retransmission:
426    *
427    * Send out a GstRTPRetransmission event upstream when a packet is considered
428    * late and should be retransmitted.
429    *
430    * Since: 1.2
431    */
432   g_object_class_install_property (gobject_class, PROP_DO_RETRANSMISSION,
433       g_param_spec_boolean ("do-retransmission", "Do Retransmission",
434           "Send retransmission events upstream when a packet is late",
435           DEFAULT_DO_RETRANSMISSION,
436           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
437
438   /**
439    * GstRtpJitterBuffer::rtx-delay:
440    *
441    * When a packet did not arrive at the expected time, wait this extra amount
442    * of time before sending a retransmission event.
443    *
444    * When -1 is used, the max jitter will be used as extra delay.
445    *
446    * Since: 1.2
447    */
448   g_object_class_install_property (gobject_class, PROP_RTX_DELAY,
449       g_param_spec_int ("rtx-delay", "RTX Delay",
450           "Extra time in ms to wait before sending retransmission "
451           "event (-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_DELAY,
452           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
453   /**
454    * GstRtpJitterBuffer::rtx-delay-reorder:
455    *
456    * Assume that a retransmission event should be sent when we see
457    * this much packet reordering.
458    *
459    * When -1 is used, the value will be estimated based on observed packet
460    * reordering.
461    *
462    * Since: 1.2
463    */
464   g_object_class_install_property (gobject_class, PROP_RTX_DELAY_REORDER,
465       g_param_spec_int ("rtx-delay-reorder", "RTX Delay Reorder",
466           "Sending retransmission event when this much reordering (-1 automatic)",
467           -1, G_MAXINT, DEFAULT_RTX_DELAY_REORDER,
468           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
469   /**
470    * GstRtpJitterBuffer::rtx-retry-timeout:
471    *
472    * When no packet has been received after sending a retransmission event
473    * for this time, retry sending a retransmission event.
474    *
475    * When -1 is used, the value will be estimated based on observed round
476    * trip time.
477    *
478    * Since: 1.2
479    */
480   g_object_class_install_property (gobject_class, PROP_RTX_RETRY_TIMEOUT,
481       g_param_spec_int ("rtx-retry-timeout", "RTX Retry Timeout",
482           "Retry sending a transmission event after this timeout in "
483           "ms (-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_RETRY_TIMEOUT,
484           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
485   /**
486    * GstRtpJitterBuffer::rtx-retry-period:
487    *
488    * The amount of time to try to get a retransmission.
489    *
490    * When -1 is used, the value will be estimated based on the jitterbuffer
491    * latency and the observed round trip time.
492    *
493    * Since: 1.2
494    */
495   g_object_class_install_property (gobject_class, PROP_RTX_RETRY_PERIOD,
496       g_param_spec_int ("rtx-retry-period", "RTX Retry Period",
497           "Try to get a retransmission for this many ms "
498           "(-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_RETRY_PERIOD,
499           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
500
501   /**
502    * GstRtpJitterBuffer::request-pt-map:
503    * @buffer: the object which received the signal
504    * @pt: the pt
505    *
506    * Request the payload type as #GstCaps for @pt.
507    */
508   gst_rtp_jitter_buffer_signals[SIGNAL_REQUEST_PT_MAP] =
509       g_signal_new ("request-pt-map", G_TYPE_FROM_CLASS (klass),
510       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
511           request_pt_map), NULL, NULL, g_cclosure_marshal_generic,
512       GST_TYPE_CAPS, 1, G_TYPE_UINT);
513   /**
514    * GstRtpJitterBuffer::handle-sync:
515    * @buffer: the object which received the signal
516    * @struct: a GstStructure containing sync values.
517    *
518    * Be notified of new sync values.
519    */
520   gst_rtp_jitter_buffer_signals[SIGNAL_HANDLE_SYNC] =
521       g_signal_new ("handle-sync", G_TYPE_FROM_CLASS (klass),
522       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
523           handle_sync), NULL, NULL, g_cclosure_marshal_VOID__BOXED,
524       G_TYPE_NONE, 1, GST_TYPE_STRUCTURE | G_SIGNAL_TYPE_STATIC_SCOPE);
525
526   /**
527    * GstRtpJitterBuffer::on-npt-stop
528    * @buffer: the object which received the signal
529    *
530    * Signal that the jitterbufer has pushed the RTP packet that corresponds to
531    * the npt-stop position.
532    */
533   gst_rtp_jitter_buffer_signals[SIGNAL_ON_NPT_STOP] =
534       g_signal_new ("on-npt-stop", G_TYPE_FROM_CLASS (klass),
535       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
536           on_npt_stop), NULL, NULL, g_cclosure_marshal_VOID__VOID,
537       G_TYPE_NONE, 0, G_TYPE_NONE);
538
539   /**
540    * GstRtpJitterBuffer::clear-pt-map:
541    * @buffer: the object which received the signal
542    *
543    * Invalidate the clock-rate as obtained with the
544    * #GstRtpJitterBuffer::request-pt-map signal.
545    */
546   gst_rtp_jitter_buffer_signals[SIGNAL_CLEAR_PT_MAP] =
547       g_signal_new ("clear-pt-map", G_TYPE_FROM_CLASS (klass),
548       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
549       G_STRUCT_OFFSET (GstRtpJitterBufferClass, clear_pt_map), NULL, NULL,
550       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
551
552   /**
553    * GstRtpJitterBuffer::set-active:
554    * @buffer: the object which received the signal
555    *
556    * Start pushing out packets with the given base time. This signal is only
557    * useful in buffering mode.
558    *
559    * Returns: the time of the last pushed packet.
560    *
561    * Since: 0.10.19
562    */
563   gst_rtp_jitter_buffer_signals[SIGNAL_SET_ACTIVE] =
564       g_signal_new ("set-active", G_TYPE_FROM_CLASS (klass),
565       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
566       G_STRUCT_OFFSET (GstRtpJitterBufferClass, set_active), NULL, NULL,
567       g_cclosure_marshal_generic, G_TYPE_UINT64, 2, G_TYPE_BOOLEAN,
568       G_TYPE_UINT64);
569
570   gstelement_class->change_state =
571       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_change_state);
572   gstelement_class->request_new_pad =
573       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_request_new_pad);
574   gstelement_class->release_pad =
575       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_release_pad);
576   gstelement_class->provide_clock =
577       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_provide_clock);
578
579   gst_element_class_add_pad_template (gstelement_class,
580       gst_static_pad_template_get (&gst_rtp_jitter_buffer_src_template));
581   gst_element_class_add_pad_template (gstelement_class,
582       gst_static_pad_template_get (&gst_rtp_jitter_buffer_sink_template));
583   gst_element_class_add_pad_template (gstelement_class,
584       gst_static_pad_template_get (&gst_rtp_jitter_buffer_sink_rtcp_template));
585
586   gst_element_class_set_static_metadata (gstelement_class,
587       "RTP packet jitter-buffer", "Filter/Network/RTP",
588       "A buffer that deals with network jitter and other transmission faults",
589       "Philippe Kalaf <philippe.kalaf@collabora.co.uk>, "
590       "Wim Taymans <wim.taymans@gmail.com>");
591
592   klass->clear_pt_map = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_clear_pt_map);
593   klass->set_active = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_set_active);
594
595   GST_DEBUG_CATEGORY_INIT
596       (rtpjitterbuffer_debug, "gstrtpjitterbuffer", 0, "RTP Jitter Buffer");
597 }
598
599 static void
600 gst_rtp_jitter_buffer_init (GstRtpJitterBuffer * jitterbuffer)
601 {
602   GstRtpJitterBufferPrivate *priv;
603
604   priv = GST_RTP_JITTER_BUFFER_GET_PRIVATE (jitterbuffer);
605   jitterbuffer->priv = priv;
606
607   priv->latency_ms = DEFAULT_LATENCY_MS;
608   priv->latency_ns = priv->latency_ms * GST_MSECOND;
609   priv->drop_on_latency = DEFAULT_DROP_ON_LATENCY;
610   priv->do_lost = DEFAULT_DO_LOST;
611   priv->do_retransmission = DEFAULT_DO_RETRANSMISSION;
612   priv->rtx_delay = DEFAULT_RTX_DELAY;
613   priv->rtx_delay_reorder = DEFAULT_RTX_DELAY_REORDER;
614   priv->rtx_retry_timeout = DEFAULT_RTX_RETRY_TIMEOUT;
615   priv->rtx_retry_period = DEFAULT_RTX_RETRY_PERIOD;
616
617   priv->timers = g_array_new (FALSE, TRUE, sizeof (TimerData));
618   priv->jbuf = rtp_jitter_buffer_new ();
619   g_mutex_init (&priv->jbuf_lock);
620   g_cond_init (&priv->jbuf_timer);
621   g_cond_init (&priv->jbuf_event);
622
623   /* reset skew detection initialy */
624   rtp_jitter_buffer_reset_skew (priv->jbuf);
625   rtp_jitter_buffer_set_delay (priv->jbuf, priv->latency_ns);
626   rtp_jitter_buffer_set_buffering (priv->jbuf, FALSE);
627   priv->active = TRUE;
628
629   priv->srcpad =
630       gst_pad_new_from_static_template (&gst_rtp_jitter_buffer_src_template,
631       "src");
632
633   gst_pad_set_activatemode_function (priv->srcpad,
634       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_activate_mode));
635   gst_pad_set_query_function (priv->srcpad,
636       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_query));
637   gst_pad_set_event_function (priv->srcpad,
638       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_event));
639
640   priv->sinkpad =
641       gst_pad_new_from_static_template (&gst_rtp_jitter_buffer_sink_template,
642       "sink");
643
644   gst_pad_set_chain_function (priv->sinkpad,
645       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_chain));
646   gst_pad_set_event_function (priv->sinkpad,
647       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_sink_event));
648   gst_pad_set_query_function (priv->sinkpad,
649       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_sink_query));
650
651   gst_element_add_pad (GST_ELEMENT (jitterbuffer), priv->srcpad);
652   gst_element_add_pad (GST_ELEMENT (jitterbuffer), priv->sinkpad);
653
654   GST_OBJECT_FLAG_SET (jitterbuffer, GST_ELEMENT_FLAG_PROVIDE_CLOCK);
655 }
656
657 static void
658 gst_rtp_jitter_buffer_finalize (GObject * object)
659 {
660   GstRtpJitterBuffer *jitterbuffer;
661
662   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
663
664   g_array_free (jitterbuffer->priv->timers, TRUE);
665   g_mutex_clear (&jitterbuffer->priv->jbuf_lock);
666   g_cond_clear (&jitterbuffer->priv->jbuf_timer);
667   g_cond_clear (&jitterbuffer->priv->jbuf_event);
668
669   g_object_unref (jitterbuffer->priv->jbuf);
670
671   G_OBJECT_CLASS (parent_class)->finalize (object);
672 }
673
674 static GstIterator *
675 gst_rtp_jitter_buffer_iterate_internal_links (GstPad * pad, GstObject * parent)
676 {
677   GstRtpJitterBuffer *jitterbuffer;
678   GstPad *otherpad = NULL;
679   GstIterator *it;
680   GValue val = { 0, };
681
682   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
683
684   if (pad == jitterbuffer->priv->sinkpad) {
685     otherpad = jitterbuffer->priv->srcpad;
686   } else if (pad == jitterbuffer->priv->srcpad) {
687     otherpad = jitterbuffer->priv->sinkpad;
688   } else if (pad == jitterbuffer->priv->rtcpsinkpad) {
689     otherpad = NULL;
690   }
691
692   g_value_init (&val, GST_TYPE_PAD);
693   g_value_set_object (&val, otherpad);
694   it = gst_iterator_new_single (GST_TYPE_PAD, &val);
695   g_value_unset (&val);
696
697   return it;
698 }
699
700 static GstPad *
701 create_rtcp_sink (GstRtpJitterBuffer * jitterbuffer)
702 {
703   GstRtpJitterBufferPrivate *priv;
704
705   priv = jitterbuffer->priv;
706
707   GST_DEBUG_OBJECT (jitterbuffer, "creating RTCP sink pad");
708
709   priv->rtcpsinkpad =
710       gst_pad_new_from_static_template
711       (&gst_rtp_jitter_buffer_sink_rtcp_template, "sink_rtcp");
712   gst_pad_set_chain_function (priv->rtcpsinkpad,
713       gst_rtp_jitter_buffer_chain_rtcp);
714   gst_pad_set_event_function (priv->rtcpsinkpad,
715       (GstPadEventFunction) gst_rtp_jitter_buffer_sink_rtcp_event);
716   gst_pad_set_iterate_internal_links_function (priv->rtcpsinkpad,
717       gst_rtp_jitter_buffer_iterate_internal_links);
718   gst_pad_set_active (priv->rtcpsinkpad, TRUE);
719   gst_element_add_pad (GST_ELEMENT_CAST (jitterbuffer), priv->rtcpsinkpad);
720
721   return priv->rtcpsinkpad;
722 }
723
724 static void
725 remove_rtcp_sink (GstRtpJitterBuffer * jitterbuffer)
726 {
727   GstRtpJitterBufferPrivate *priv;
728
729   priv = jitterbuffer->priv;
730
731   GST_DEBUG_OBJECT (jitterbuffer, "removing RTCP sink pad");
732
733   gst_pad_set_active (priv->rtcpsinkpad, FALSE);
734
735   gst_element_remove_pad (GST_ELEMENT_CAST (jitterbuffer), priv->rtcpsinkpad);
736   priv->rtcpsinkpad = NULL;
737 }
738
739 static GstPad *
740 gst_rtp_jitter_buffer_request_new_pad (GstElement * element,
741     GstPadTemplate * templ, const gchar * name, const GstCaps * filter)
742 {
743   GstRtpJitterBuffer *jitterbuffer;
744   GstElementClass *klass;
745   GstPad *result;
746   GstRtpJitterBufferPrivate *priv;
747
748   g_return_val_if_fail (templ != NULL, NULL);
749   g_return_val_if_fail (GST_IS_RTP_JITTER_BUFFER (element), NULL);
750
751   jitterbuffer = GST_RTP_JITTER_BUFFER (element);
752   priv = jitterbuffer->priv;
753   klass = GST_ELEMENT_GET_CLASS (element);
754
755   GST_DEBUG_OBJECT (element, "requesting pad %s", GST_STR_NULL (name));
756
757   /* figure out the template */
758   if (templ == gst_element_class_get_pad_template (klass, "sink_rtcp")) {
759     if (priv->rtcpsinkpad != NULL)
760       goto exists;
761
762     result = create_rtcp_sink (jitterbuffer);
763   } else
764     goto wrong_template;
765
766   return result;
767
768   /* ERRORS */
769 wrong_template:
770   {
771     g_warning ("gstrtpjitterbuffer: this is not our template");
772     return NULL;
773   }
774 exists:
775   {
776     g_warning ("gstrtpjitterbuffer: pad already requested");
777     return NULL;
778   }
779 }
780
781 static void
782 gst_rtp_jitter_buffer_release_pad (GstElement * element, GstPad * pad)
783 {
784   GstRtpJitterBuffer *jitterbuffer;
785   GstRtpJitterBufferPrivate *priv;
786
787   g_return_if_fail (GST_IS_RTP_JITTER_BUFFER (element));
788   g_return_if_fail (GST_IS_PAD (pad));
789
790   jitterbuffer = GST_RTP_JITTER_BUFFER (element);
791   priv = jitterbuffer->priv;
792
793   GST_DEBUG_OBJECT (element, "releasing pad %s:%s", GST_DEBUG_PAD_NAME (pad));
794
795   if (priv->rtcpsinkpad == pad) {
796     remove_rtcp_sink (jitterbuffer);
797   } else
798     goto wrong_pad;
799
800   return;
801
802   /* ERRORS */
803 wrong_pad:
804   {
805     g_warning ("gstjitterbuffer: asked to release an unknown pad");
806     return;
807   }
808 }
809
810 static GstClock *
811 gst_rtp_jitter_buffer_provide_clock (GstElement * element)
812 {
813   return gst_system_clock_obtain ();
814 }
815
816 static void
817 gst_rtp_jitter_buffer_clear_pt_map (GstRtpJitterBuffer * jitterbuffer)
818 {
819   GstRtpJitterBufferPrivate *priv;
820
821   priv = jitterbuffer->priv;
822
823   /* this will trigger a new pt-map request signal, FIXME, do something better. */
824
825   JBUF_LOCK (priv);
826   priv->clock_rate = -1;
827   /* do not clear current content, but refresh state for new arrival */
828   GST_DEBUG_OBJECT (jitterbuffer, "reset jitterbuffer");
829   rtp_jitter_buffer_reset_skew (priv->jbuf);
830   priv->last_popped_seqnum = -1;
831   priv->next_seqnum = -1;
832   JBUF_UNLOCK (priv);
833 }
834
835 static GstClockTime
836 gst_rtp_jitter_buffer_set_active (GstRtpJitterBuffer * jbuf, gboolean active,
837     guint64 offset)
838 {
839   GstRtpJitterBufferPrivate *priv;
840   GstClockTime last_out;
841   GstBuffer *head;
842
843   priv = jbuf->priv;
844
845   JBUF_LOCK (priv);
846   GST_DEBUG_OBJECT (jbuf, "setting active %d with offset %" GST_TIME_FORMAT,
847       active, GST_TIME_ARGS (offset));
848
849   if (active != priv->active) {
850     /* add the amount of time spent in paused to the output offset. All
851      * outgoing buffers will have this offset applied to their timestamps in
852      * order to make them arrive in time in the sink. */
853     priv->out_offset = offset;
854     GST_DEBUG_OBJECT (jbuf, "out offset %" GST_TIME_FORMAT,
855         GST_TIME_ARGS (priv->out_offset));
856     priv->active = active;
857     JBUF_SIGNAL_EVENT (priv);
858   }
859   if (!active) {
860     rtp_jitter_buffer_set_buffering (priv->jbuf, TRUE);
861   }
862   if ((head = rtp_jitter_buffer_peek (priv->jbuf))) {
863     /* head buffer timestamp and offset gives our output time */
864     last_out = GST_BUFFER_DTS (head) + priv->ts_offset;
865   } else {
866     /* use last known time when the buffer is empty */
867     last_out = priv->last_out_time;
868   }
869   JBUF_UNLOCK (priv);
870
871   return last_out;
872 }
873
874 static GstCaps *
875 gst_rtp_jitter_buffer_getcaps (GstPad * pad, GstCaps * filter)
876 {
877   GstRtpJitterBuffer *jitterbuffer;
878   GstRtpJitterBufferPrivate *priv;
879   GstPad *other;
880   GstCaps *caps;
881   GstCaps *templ;
882
883   jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
884   priv = jitterbuffer->priv;
885
886   other = (pad == priv->srcpad ? priv->sinkpad : priv->srcpad);
887
888   caps = gst_pad_peer_query_caps (other, filter);
889
890   templ = gst_pad_get_pad_template_caps (pad);
891   if (caps == NULL) {
892     GST_DEBUG_OBJECT (jitterbuffer, "use template");
893     caps = templ;
894   } else {
895     GstCaps *intersect;
896
897     GST_DEBUG_OBJECT (jitterbuffer, "intersect with template");
898
899     intersect = gst_caps_intersect (caps, templ);
900     gst_caps_unref (caps);
901     gst_caps_unref (templ);
902
903     caps = intersect;
904   }
905   gst_object_unref (jitterbuffer);
906
907   return caps;
908 }
909
910 /*
911  * Must be called with JBUF_LOCK held
912  */
913
914 static gboolean
915 gst_jitter_buffer_sink_parse_caps (GstRtpJitterBuffer * jitterbuffer,
916     GstCaps * caps)
917 {
918   GstRtpJitterBufferPrivate *priv;
919   GstStructure *caps_struct;
920   guint val;
921   GstClockTime tval;
922
923   priv = jitterbuffer->priv;
924
925   /* first parse the caps */
926   caps_struct = gst_caps_get_structure (caps, 0);
927
928   GST_DEBUG_OBJECT (jitterbuffer, "got caps");
929
930   /* we need a clock-rate to convert the rtp timestamps to GStreamer time and to
931    * measure the amount of data in the buffer */
932   if (!gst_structure_get_int (caps_struct, "clock-rate", &priv->clock_rate))
933     goto error;
934
935   if (priv->clock_rate <= 0)
936     goto wrong_rate;
937
938   GST_DEBUG_OBJECT (jitterbuffer, "got clock-rate %d", priv->clock_rate);
939
940   /* The clock base is the RTP timestamp corrsponding to the npt-start value. We
941    * can use this to track the amount of time elapsed on the sender. */
942   if (gst_structure_get_uint (caps_struct, "clock-base", &val))
943     priv->clock_base = val;
944   else
945     priv->clock_base = -1;
946
947   priv->ext_timestamp = priv->clock_base;
948
949   GST_DEBUG_OBJECT (jitterbuffer, "got clock-base %" G_GINT64_FORMAT,
950       priv->clock_base);
951
952   if (gst_structure_get_uint (caps_struct, "seqnum-base", &val)) {
953     /* first expected seqnum, only update when we didn't have a previous base. */
954     if (priv->next_in_seqnum == -1)
955       priv->next_in_seqnum = val;
956     if (priv->next_seqnum == -1)
957       priv->next_seqnum = val;
958   }
959
960   GST_DEBUG_OBJECT (jitterbuffer, "got seqnum-base %d", priv->next_in_seqnum);
961
962   /* the start and stop times. The seqnum-base corresponds to the start time. We
963    * will keep track of the seqnums on the output and when we reach the one
964    * corresponding to npt-stop, we emit the npt-stop-reached signal */
965   if (gst_structure_get_clock_time (caps_struct, "npt-start", &tval))
966     priv->npt_start = tval;
967   else
968     priv->npt_start = 0;
969
970   if (gst_structure_get_clock_time (caps_struct, "npt-stop", &tval))
971     priv->npt_stop = tval;
972   else
973     priv->npt_stop = -1;
974
975   GST_DEBUG_OBJECT (jitterbuffer,
976       "npt start/stop: %" GST_TIME_FORMAT "-%" GST_TIME_FORMAT,
977       GST_TIME_ARGS (priv->npt_start), GST_TIME_ARGS (priv->npt_stop));
978
979   return TRUE;
980
981   /* ERRORS */
982 error:
983   {
984     GST_DEBUG_OBJECT (jitterbuffer, "No clock-rate in caps!");
985     return FALSE;
986   }
987 wrong_rate:
988   {
989     GST_DEBUG_OBJECT (jitterbuffer, "Invalid clock-rate %d", priv->clock_rate);
990     return FALSE;
991   }
992 }
993
994 static void
995 gst_rtp_jitter_buffer_flush_start (GstRtpJitterBuffer * jitterbuffer)
996 {
997   GstRtpJitterBufferPrivate *priv;
998
999   priv = jitterbuffer->priv;
1000
1001   JBUF_LOCK (priv);
1002   /* mark ourselves as flushing */
1003   priv->srcresult = GST_FLOW_FLUSHING;
1004   GST_DEBUG_OBJECT (jitterbuffer, "Disabling pop on queue");
1005   /* this unblocks any waiting pops on the src pad task */
1006   JBUF_SIGNAL_EVENT (priv);
1007   /* unlock clock, we just unschedule, the entry will be released by the
1008    * locking streaming thread. */
1009   unschedule_current_timer (jitterbuffer);
1010   JBUF_UNLOCK (priv);
1011 }
1012
1013 static void
1014 gst_rtp_jitter_buffer_flush_stop (GstRtpJitterBuffer * jitterbuffer)
1015 {
1016   GstRtpJitterBufferPrivate *priv;
1017
1018   priv = jitterbuffer->priv;
1019
1020   JBUF_LOCK (priv);
1021   GST_DEBUG_OBJECT (jitterbuffer, "Enabling pop on queue");
1022   /* Mark as non flushing */
1023   priv->srcresult = GST_FLOW_OK;
1024   gst_segment_init (&priv->segment, GST_FORMAT_TIME);
1025   priv->last_popped_seqnum = -1;
1026   priv->last_out_time = -1;
1027   priv->next_seqnum = -1;
1028   priv->ips_rtptime = -1;
1029   priv->ips_dts = GST_CLOCK_TIME_NONE;
1030   priv->packet_spacing = 0;
1031   priv->next_in_seqnum = -1;
1032   priv->clock_rate = -1;
1033   priv->eos = FALSE;
1034   priv->estimated_eos = -1;
1035   priv->last_elapsed = 0;
1036   priv->ext_timestamp = -1;
1037   GST_DEBUG_OBJECT (jitterbuffer, "flush and reset jitterbuffer");
1038   rtp_jitter_buffer_flush (priv->jbuf);
1039   rtp_jitter_buffer_reset_skew (priv->jbuf);
1040   remove_all_timers (jitterbuffer);
1041   JBUF_UNLOCK (priv);
1042 }
1043
1044 static gboolean
1045 gst_rtp_jitter_buffer_src_activate_mode (GstPad * pad, GstObject * parent,
1046     GstPadMode mode, gboolean active)
1047 {
1048   gboolean result;
1049   GstRtpJitterBuffer *jitterbuffer = NULL;
1050
1051   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1052
1053   switch (mode) {
1054     case GST_PAD_MODE_PUSH:
1055       if (active) {
1056         /* allow data processing */
1057         gst_rtp_jitter_buffer_flush_stop (jitterbuffer);
1058
1059         /* start pushing out buffers */
1060         GST_DEBUG_OBJECT (jitterbuffer, "Starting task on srcpad");
1061         result = gst_pad_start_task (jitterbuffer->priv->srcpad,
1062             (GstTaskFunction) gst_rtp_jitter_buffer_loop, jitterbuffer, NULL);
1063       } else {
1064         /* make sure all data processing stops ASAP */
1065         gst_rtp_jitter_buffer_flush_start (jitterbuffer);
1066
1067         /* NOTE this will hardlock if the state change is called from the src pad
1068          * task thread because we will _join() the thread. */
1069         GST_DEBUG_OBJECT (jitterbuffer, "Stopping task on srcpad");
1070         result = gst_pad_stop_task (pad);
1071       }
1072       break;
1073     default:
1074       result = FALSE;
1075       break;
1076   }
1077   return result;
1078 }
1079
1080 static GstStateChangeReturn
1081 gst_rtp_jitter_buffer_change_state (GstElement * element,
1082     GstStateChange transition)
1083 {
1084   GstRtpJitterBuffer *jitterbuffer;
1085   GstRtpJitterBufferPrivate *priv;
1086   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
1087
1088   jitterbuffer = GST_RTP_JITTER_BUFFER (element);
1089   priv = jitterbuffer->priv;
1090
1091   switch (transition) {
1092     case GST_STATE_CHANGE_NULL_TO_READY:
1093       break;
1094     case GST_STATE_CHANGE_READY_TO_PAUSED:
1095       JBUF_LOCK (priv);
1096       /* reset negotiated values */
1097       priv->clock_rate = -1;
1098       priv->clock_base = -1;
1099       priv->peer_latency = 0;
1100       priv->last_pt = -1;
1101       /* block until we go to PLAYING */
1102       priv->blocked = TRUE;
1103       priv->timer_running = TRUE;
1104       priv->timer_thread =
1105           g_thread_new ("timer", (GThreadFunc) wait_next_timeout, jitterbuffer);
1106       JBUF_UNLOCK (priv);
1107       break;
1108     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
1109       JBUF_LOCK (priv);
1110       /* unblock to allow streaming in PLAYING */
1111       priv->blocked = FALSE;
1112       JBUF_SIGNAL_EVENT (priv);
1113       JBUF_UNLOCK (priv);
1114       break;
1115     default:
1116       break;
1117   }
1118
1119   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1120
1121   switch (transition) {
1122     case GST_STATE_CHANGE_READY_TO_PAUSED:
1123       /* we are a live element because we sync to the clock, which we can only
1124        * do in the PLAYING state */
1125       if (ret != GST_STATE_CHANGE_FAILURE)
1126         ret = GST_STATE_CHANGE_NO_PREROLL;
1127       break;
1128     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
1129       JBUF_LOCK (priv);
1130       /* block to stop streaming when PAUSED */
1131       priv->blocked = TRUE;
1132       JBUF_UNLOCK (priv);
1133       if (ret != GST_STATE_CHANGE_FAILURE)
1134         ret = GST_STATE_CHANGE_NO_PREROLL;
1135       break;
1136     case GST_STATE_CHANGE_PAUSED_TO_READY:
1137       JBUF_LOCK (priv);
1138       gst_buffer_replace (&priv->last_sr, NULL);
1139       priv->timer_running = FALSE;
1140       JBUF_SIGNAL_TIMER (priv);
1141       JBUF_UNLOCK (priv);
1142       g_thread_join (priv->timer_thread);
1143       priv->timer_thread = NULL;
1144       break;
1145     case GST_STATE_CHANGE_READY_TO_NULL:
1146       break;
1147     default:
1148       break;
1149   }
1150
1151   return ret;
1152 }
1153
1154 static gboolean
1155 gst_rtp_jitter_buffer_src_event (GstPad * pad, GstObject * parent,
1156     GstEvent * event)
1157 {
1158   gboolean ret = TRUE;
1159   GstRtpJitterBuffer *jitterbuffer;
1160   GstRtpJitterBufferPrivate *priv;
1161
1162   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1163   priv = jitterbuffer->priv;
1164
1165   GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
1166
1167   switch (GST_EVENT_TYPE (event)) {
1168     case GST_EVENT_LATENCY:
1169     {
1170       GstClockTime latency;
1171
1172       gst_event_parse_latency (event, &latency);
1173
1174       GST_DEBUG_OBJECT (jitterbuffer,
1175           "configuring latency of %" GST_TIME_FORMAT, GST_TIME_ARGS (latency));
1176
1177       JBUF_LOCK (priv);
1178       /* adjust the overall buffer delay to the total pipeline latency in
1179        * buffering mode because if downstream consumes too fast (because of
1180        * large latency or queues, we would start rebuffering again. */
1181       if (rtp_jitter_buffer_get_mode (priv->jbuf) ==
1182           RTP_JITTER_BUFFER_MODE_BUFFER) {
1183         rtp_jitter_buffer_set_delay (priv->jbuf, latency);
1184       }
1185       JBUF_UNLOCK (priv);
1186
1187       ret = gst_pad_push_event (priv->sinkpad, event);
1188       break;
1189     }
1190     default:
1191       ret = gst_pad_push_event (priv->sinkpad, event);
1192       break;
1193   }
1194
1195   return ret;
1196 }
1197
1198 static gboolean
1199 gst_rtp_jitter_buffer_sink_event (GstPad * pad, GstObject * parent,
1200     GstEvent * event)
1201 {
1202   gboolean ret = TRUE;
1203   GstRtpJitterBuffer *jitterbuffer;
1204   GstRtpJitterBufferPrivate *priv;
1205
1206   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1207   priv = jitterbuffer->priv;
1208
1209   GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
1210
1211   switch (GST_EVENT_TYPE (event)) {
1212     case GST_EVENT_CAPS:
1213     {
1214       GstCaps *caps;
1215
1216       gst_event_parse_caps (event, &caps);
1217
1218       JBUF_LOCK (priv);
1219       ret = gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps);
1220       JBUF_UNLOCK (priv);
1221
1222       /* set same caps on srcpad on success */
1223       if (ret)
1224         ret = gst_pad_push_event (priv->srcpad, event);
1225       else
1226         gst_event_unref (event);
1227       break;
1228     }
1229     case GST_EVENT_SEGMENT:
1230     {
1231       gst_event_copy_segment (event, &priv->segment);
1232
1233       /* we need time for now */
1234       if (priv->segment.format != GST_FORMAT_TIME)
1235         goto newseg_wrong_format;
1236
1237       GST_DEBUG_OBJECT (jitterbuffer,
1238           "newsegment:  %" GST_SEGMENT_FORMAT, &priv->segment);
1239
1240       /* FIXME, push SEGMENT in the queue. Sorting order might be difficult. */
1241       ret = gst_pad_push_event (priv->srcpad, event);
1242       break;
1243     }
1244     case GST_EVENT_FLUSH_START:
1245       ret = gst_pad_push_event (priv->srcpad, event);
1246       gst_rtp_jitter_buffer_flush_start (jitterbuffer);
1247       break;
1248     case GST_EVENT_FLUSH_STOP:
1249       ret = gst_pad_push_event (priv->srcpad, event);
1250       ret =
1251           gst_rtp_jitter_buffer_src_activate_mode (priv->srcpad, parent,
1252           GST_PAD_MODE_PUSH, TRUE);
1253       break;
1254     case GST_EVENT_EOS:
1255     {
1256       /* push EOS in queue. We always push it at the head */
1257       JBUF_LOCK (priv);
1258       /* check for flushing, we need to discard the event and return FALSE when
1259        * we are flushing */
1260       ret = priv->srcresult == GST_FLOW_OK;
1261       if (ret && !priv->eos) {
1262         GST_INFO_OBJECT (jitterbuffer, "queuing EOS");
1263         priv->eos = TRUE;
1264         JBUF_SIGNAL_EVENT (priv);
1265       } else if (priv->eos) {
1266         GST_DEBUG_OBJECT (jitterbuffer, "dropping EOS, we are already EOS");
1267       } else {
1268         GST_DEBUG_OBJECT (jitterbuffer, "dropping EOS, reason %s",
1269             gst_flow_get_name (priv->srcresult));
1270       }
1271       JBUF_UNLOCK (priv);
1272       gst_event_unref (event);
1273       break;
1274     }
1275     default:
1276       ret = gst_pad_push_event (priv->srcpad, event);
1277       break;
1278   }
1279
1280 done:
1281
1282   return ret;
1283
1284   /* ERRORS */
1285 newseg_wrong_format:
1286   {
1287     GST_DEBUG_OBJECT (jitterbuffer, "received non TIME newsegment");
1288     ret = FALSE;
1289     gst_event_unref (event);
1290     goto done;
1291   }
1292 }
1293
1294 static gboolean
1295 gst_rtp_jitter_buffer_sink_rtcp_event (GstPad * pad, GstObject * parent,
1296     GstEvent * event)
1297 {
1298   gboolean ret = TRUE;
1299   GstRtpJitterBuffer *jitterbuffer;
1300
1301   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1302
1303   GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
1304
1305   switch (GST_EVENT_TYPE (event)) {
1306     case GST_EVENT_FLUSH_START:
1307       gst_event_unref (event);
1308       break;
1309     case GST_EVENT_FLUSH_STOP:
1310       gst_event_unref (event);
1311       break;
1312     default:
1313       ret = gst_pad_event_default (pad, parent, event);
1314       break;
1315   }
1316
1317   return ret;
1318 }
1319
1320 /*
1321  * Must be called with JBUF_LOCK held, will release the LOCK when emiting the
1322  * signal. The function returns GST_FLOW_ERROR when a parsing error happened and
1323  * GST_FLOW_FLUSHING when the element is shutting down. On success
1324  * GST_FLOW_OK is returned.
1325  */
1326 static GstFlowReturn
1327 gst_rtp_jitter_buffer_get_clock_rate (GstRtpJitterBuffer * jitterbuffer,
1328     guint8 pt)
1329 {
1330   GValue ret = { 0 };
1331   GValue args[2] = { {0}, {0} };
1332   GstCaps *caps;
1333   gboolean res;
1334
1335   g_value_init (&args[0], GST_TYPE_ELEMENT);
1336   g_value_set_object (&args[0], jitterbuffer);
1337   g_value_init (&args[1], G_TYPE_UINT);
1338   g_value_set_uint (&args[1], pt);
1339
1340   g_value_init (&ret, GST_TYPE_CAPS);
1341   g_value_set_boxed (&ret, NULL);
1342
1343   JBUF_UNLOCK (jitterbuffer->priv);
1344   g_signal_emitv (args, gst_rtp_jitter_buffer_signals[SIGNAL_REQUEST_PT_MAP], 0,
1345       &ret);
1346   JBUF_LOCK_CHECK (jitterbuffer->priv, out_flushing);
1347
1348   g_value_unset (&args[0]);
1349   g_value_unset (&args[1]);
1350   caps = (GstCaps *) g_value_dup_boxed (&ret);
1351   g_value_unset (&ret);
1352   if (!caps)
1353     goto no_caps;
1354
1355   res = gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps);
1356   gst_caps_unref (caps);
1357
1358   if (G_UNLIKELY (!res))
1359     goto parse_failed;
1360
1361   return GST_FLOW_OK;
1362
1363   /* ERRORS */
1364 no_caps:
1365   {
1366     GST_DEBUG_OBJECT (jitterbuffer, "could not get caps");
1367     return GST_FLOW_ERROR;
1368   }
1369 out_flushing:
1370   {
1371     GST_DEBUG_OBJECT (jitterbuffer, "we are flushing");
1372     return GST_FLOW_FLUSHING;
1373   }
1374 parse_failed:
1375   {
1376     GST_DEBUG_OBJECT (jitterbuffer, "parse failed");
1377     return GST_FLOW_ERROR;
1378   }
1379 }
1380
1381 /* call with jbuf lock held */
1382 static void
1383 check_buffering_percent (GstRtpJitterBuffer * jitterbuffer, gint * percent)
1384 {
1385   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1386
1387   /* too short a stream, or too close to EOS will never really fill buffer */
1388   if (*percent != -1 && priv->npt_stop != -1 &&
1389       priv->npt_stop - priv->npt_start <=
1390       rtp_jitter_buffer_get_delay (priv->jbuf)) {
1391     GST_DEBUG_OBJECT (jitterbuffer, "short stream; faking full buffer");
1392     rtp_jitter_buffer_set_buffering (priv->jbuf, FALSE);
1393     *percent = 100;
1394   }
1395 }
1396
1397 static void
1398 post_buffering_percent (GstRtpJitterBuffer * jitterbuffer, gint percent)
1399 {
1400   GstMessage *message;
1401
1402   /* Post a buffering message */
1403   message = gst_message_new_buffering (GST_OBJECT_CAST (jitterbuffer), percent);
1404   gst_message_set_buffering_stats (message, GST_BUFFERING_LIVE, -1, -1, -1);
1405
1406   gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer), message);
1407 }
1408
1409 static GstClockTime
1410 apply_offset (GstRtpJitterBuffer * jitterbuffer, GstClockTime timestamp)
1411 {
1412   GstRtpJitterBufferPrivate *priv;
1413
1414   priv = jitterbuffer->priv;
1415
1416   if (timestamp == -1)
1417     return -1;
1418
1419   /* apply the timestamp offset, this is used for inter stream sync */
1420   timestamp += priv->ts_offset;
1421   /* add the offset, this is used when buffering */
1422   timestamp += priv->out_offset;
1423
1424   return timestamp;
1425 }
1426
1427 static TimerData *
1428 find_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type, guint16 seqnum)
1429 {
1430   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1431   TimerData *timer = NULL;
1432   gint i, len;
1433
1434   len = priv->timers->len;
1435   for (i = 0; i < len; i++) {
1436     TimerData *test = &g_array_index (priv->timers, TimerData, i);
1437     if (test->seqnum == seqnum && test->type == type) {
1438       timer = test;
1439       break;
1440     }
1441   }
1442   return timer;
1443 }
1444
1445 static void
1446 unschedule_current_timer (GstRtpJitterBuffer * jitterbuffer)
1447 {
1448   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1449
1450   if (priv->clock_id) {
1451     GST_DEBUG_OBJECT (jitterbuffer, "unschedule current timer");
1452     gst_clock_id_unschedule (priv->clock_id);
1453   }
1454 }
1455
1456 static GstClockTime
1457 get_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
1458 {
1459   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1460   GstClockTime test_timeout;
1461
1462   if ((test_timeout = timer->timeout) == -1)
1463     return -1;
1464
1465   if (timer->type != TIMER_TYPE_EXPECTED) {
1466     /* add our latency and offset to get output times. */
1467     test_timeout = apply_offset (jitterbuffer, test_timeout);
1468     test_timeout += priv->latency_ns;
1469   }
1470   return test_timeout;
1471 }
1472
1473 static void
1474 recalculate_timer (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
1475 {
1476   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1477
1478   if (priv->clock_id) {
1479     GstClockTime timeout = get_timeout (jitterbuffer, timer);
1480
1481     GST_DEBUG ("%" GST_TIME_FORMAT " <> %" GST_TIME_FORMAT,
1482         GST_TIME_ARGS (timeout), GST_TIME_ARGS (priv->timer_timeout));
1483
1484     if (timeout == -1 || timeout < priv->timer_timeout)
1485       unschedule_current_timer (jitterbuffer);
1486   }
1487 }
1488
1489 static TimerData *
1490 add_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type,
1491     guint16 seqnum, guint num, GstClockTime timeout, GstClockTime delay,
1492     GstClockTime duration)
1493 {
1494   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1495   TimerData *timer;
1496   gint len;
1497
1498   GST_DEBUG_OBJECT (jitterbuffer,
1499       "add timer for seqnum %d to %" GST_TIME_FORMAT ", delay %"
1500       GST_TIME_FORMAT, seqnum, GST_TIME_ARGS (timeout), GST_TIME_ARGS (delay));
1501
1502   len = priv->timers->len;
1503   g_array_set_size (priv->timers, len + 1);
1504   timer = &g_array_index (priv->timers, TimerData, len);
1505   timer->idx = len;
1506   timer->type = type;
1507   timer->seqnum = seqnum;
1508   timer->num = num;
1509   timer->timeout = timeout + delay;
1510   timer->duration = duration;
1511   if (type == TIMER_TYPE_EXPECTED) {
1512     timer->rtx_base = timeout;
1513     timer->rtx_delay = delay;
1514     timer->rtx_retry = 0;
1515   }
1516   recalculate_timer (jitterbuffer, timer);
1517   JBUF_SIGNAL_TIMER (priv);
1518
1519   return timer;
1520 }
1521
1522 static void
1523 reschedule_timer (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
1524     guint16 seqnum, GstClockTime timeout, GstClockTime delay)
1525 {
1526   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1527   gboolean seqchange, timechange;
1528   guint16 oldseq;
1529
1530   seqchange = timer->seqnum != seqnum;
1531   timechange = timer->timeout != timeout;
1532
1533   if (!seqchange && !timechange)
1534     return;
1535
1536   oldseq = timer->seqnum;
1537
1538   GST_DEBUG_OBJECT (jitterbuffer,
1539       "replace timer for seqnum %d->%d to %" GST_TIME_FORMAT,
1540       oldseq, seqnum, GST_TIME_ARGS (timeout));
1541
1542   timer->timeout = timeout + delay;
1543   timer->seqnum = seqnum;
1544   if (seqchange && timer->type == TIMER_TYPE_EXPECTED) {
1545     timer->rtx_base = timeout;
1546     timer->rtx_delay = delay;
1547     timer->rtx_retry = 0;
1548   }
1549
1550   if (priv->clock_id) {
1551     /* we changed the seqnum and there is a timer currently waiting with this
1552      * seqnum, unschedule it */
1553     if (seqchange && priv->timer_seqnum == oldseq)
1554       unschedule_current_timer (jitterbuffer);
1555     /* we changed the time, check if it is earlier than what we are waiting
1556      * for and unschedule if so */
1557     else if (timechange)
1558       recalculate_timer (jitterbuffer, timer);
1559   }
1560 }
1561
1562 static TimerData *
1563 set_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type,
1564     guint16 seqnum, GstClockTime timeout)
1565 {
1566   TimerData *timer;
1567
1568   /* find the seqnum timer */
1569   timer = find_timer (jitterbuffer, type, seqnum);
1570   if (timer == NULL) {
1571     timer = add_timer (jitterbuffer, type, seqnum, 0, timeout, 0, -1);
1572   } else {
1573     reschedule_timer (jitterbuffer, timer, seqnum, timeout, 0);
1574   }
1575   return timer;
1576 }
1577
1578 static void
1579 remove_timer (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
1580 {
1581   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1582   guint idx;
1583
1584   if (priv->clock_id && priv->timer_seqnum == timer->seqnum)
1585     unschedule_current_timer (jitterbuffer);
1586
1587   idx = timer->idx;
1588   GST_DEBUG_OBJECT (jitterbuffer, "removed index %d", idx);
1589   g_array_remove_index_fast (priv->timers, idx);
1590   timer->idx = idx;
1591 }
1592
1593 static void
1594 remove_all_timers (GstRtpJitterBuffer * jitterbuffer)
1595 {
1596   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1597   GST_DEBUG_OBJECT (jitterbuffer, "removed all timers");
1598   g_array_set_size (priv->timers, 0);
1599   unschedule_current_timer (jitterbuffer);
1600 }
1601
1602 /* we just received a packet with seqnum and dts.
1603  *
1604  * First check for old seqnum that we are still expecting. If the gap with the
1605  * current timestamp is too big, unschedule the timeouts.
1606  *
1607  * If we have a valid packet spacing estimate we can set a timer for when we
1608  * should receive the next packet.
1609  * If we don't have a valid estimate, we remove any timer we might have
1610  * had for this packet.
1611  */
1612 static void
1613 update_timers (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum,
1614     GstClockTime dts, gboolean do_next_seqnum)
1615 {
1616   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1617   TimerData *timer = NULL;
1618   gint i, len;
1619
1620   /* go through all timers and unschedule the ones with a large gap, also find
1621    * the timer for the seqnum */
1622   len = priv->timers->len;
1623   for (i = 0; i < len; i++) {
1624     TimerData *test = &g_array_index (priv->timers, TimerData, i);
1625     gint gap;
1626
1627     gap = gst_rtp_buffer_compare_seqnum (test->seqnum, seqnum);
1628
1629     GST_DEBUG_OBJECT (jitterbuffer, "%d, #%d<->#%d gap %d", i,
1630         test->seqnum, seqnum, gap);
1631
1632     if (gap == 0) {
1633       GST_DEBUG ("found timer for current seqnum");
1634       /* the timer for the current seqnum */
1635       timer = test;
1636     } else if (gap > priv->rtx_delay_reorder) {
1637       /* max gap, we exceeded the max reorder distance and we don't expect the
1638        * missing packet to be this reordered */
1639       if (test->rtx_retry == 0 && test->type == TIMER_TYPE_EXPECTED)
1640         reschedule_timer (jitterbuffer, test, test->seqnum, -1, 0);
1641     }
1642   }
1643
1644   if (priv->packet_spacing > 0 && do_next_seqnum && priv->do_retransmission) {
1645     GstClockTime expected, delay;
1646
1647     /* calculate expected arrival time of the next seqnum */
1648     expected = dts + priv->packet_spacing;
1649     delay = priv->rtx_delay * GST_MSECOND;
1650
1651     /* and update/install timer for next seqnum */
1652     if (timer)
1653       reschedule_timer (jitterbuffer, timer, priv->next_in_seqnum, expected,
1654           delay);
1655     else
1656       add_timer (jitterbuffer, TIMER_TYPE_EXPECTED, priv->next_in_seqnum, 0,
1657           expected, delay, priv->packet_spacing);
1658   } else if (timer && timer->type != TIMER_TYPE_DEADLINE) {
1659     /* if we had a timer, remove it, we don't know when to expect the next
1660      * packet. */
1661     remove_timer (jitterbuffer, timer);
1662     JBUF_SIGNAL_EVENT (priv);
1663   }
1664 }
1665
1666 static void
1667 calculate_packet_spacing (GstRtpJitterBuffer * jitterbuffer, guint32 rtptime,
1668     GstClockTime dts)
1669 {
1670   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1671
1672   /* we need consecutive seqnums with a different
1673    * rtptime to estimate the packet spacing. */
1674   if (priv->ips_rtptime != rtptime) {
1675     /* rtptime changed, check dts diff */
1676     if (priv->ips_dts != -1 && dts != -1 && dts > priv->ips_dts) {
1677       priv->packet_spacing = dts - priv->ips_dts;
1678       GST_DEBUG_OBJECT (jitterbuffer,
1679           "new packet spacing %" GST_TIME_FORMAT,
1680           GST_TIME_ARGS (priv->packet_spacing));
1681     }
1682     priv->ips_rtptime = rtptime;
1683     priv->ips_dts = dts;
1684   }
1685 }
1686
1687 static void
1688 send_lost_event (GstRtpJitterBuffer * jitterbuffer, guint seqnum,
1689     guint lost_packets, GstClockTime timestamp, GstClockTime duration,
1690     gboolean late)
1691 {
1692   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1693
1694   /* we had a gap and thus we lost some packets. Create an event for this.  */
1695   if (lost_packets > 1)
1696     GST_DEBUG_OBJECT (jitterbuffer, "Packets #%d -> #%d lost", seqnum,
1697         seqnum + lost_packets - 1);
1698   else
1699     GST_DEBUG_OBJECT (jitterbuffer, "Packet #%d lost", seqnum);
1700
1701   priv->num_late += lost_packets;
1702   priv->discont = TRUE;
1703
1704   /* update our expected next packet but make sure the seqnum increases */
1705   if (seqnum + lost_packets > priv->next_seqnum) {
1706     priv->next_seqnum = (seqnum + lost_packets) & 0xffff;
1707     priv->last_popped_seqnum = seqnum;
1708     priv->last_out_time = timestamp;
1709   }
1710   if (priv->do_lost) {
1711     GstEvent *event;
1712
1713     /* create paket lost event */
1714     event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM,
1715         gst_structure_new ("GstRTPPacketLost",
1716             "seqnum", G_TYPE_UINT, (guint) seqnum,
1717             "timestamp", G_TYPE_UINT64, timestamp,
1718             "duration", G_TYPE_UINT64, duration,
1719             "late", G_TYPE_BOOLEAN, late, NULL));
1720     JBUF_UNLOCK (priv);
1721     gst_pad_push_event (priv->srcpad, event);
1722     JBUF_LOCK (priv);
1723   }
1724 }
1725
1726 static void
1727 calculate_expected (GstRtpJitterBuffer * jitterbuffer, guint32 expected,
1728     guint16 seqnum, GstClockTime dts, gint gap)
1729 {
1730   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1731   GstClockTime total_duration, duration, expected_dts;
1732   TimerType type;
1733
1734   GST_DEBUG_OBJECT (jitterbuffer,
1735       "dts %" GST_TIME_FORMAT ", last %" GST_TIME_FORMAT,
1736       GST_TIME_ARGS (dts), GST_TIME_ARGS (priv->last_in_dts));
1737
1738   /* the total duration spanned by the missing packets */
1739   if (dts >= priv->last_in_dts)
1740     total_duration = dts - priv->last_in_dts;
1741   else
1742     total_duration = 0;
1743
1744   /* interpolate between the current time and the last time based on
1745    * number of packets we are missing, this is the estimated duration
1746    * for the missing packet based on equidistant packet spacing. */
1747   duration = total_duration / (gap + 1);
1748
1749   GST_DEBUG_OBJECT (jitterbuffer, "duration %" GST_TIME_FORMAT,
1750       GST_TIME_ARGS (duration));
1751
1752   if (total_duration > priv->latency_ns) {
1753     GstClockTime gap_time;
1754     guint lost_packets;
1755
1756     gap_time = total_duration - priv->latency_ns;
1757
1758     if (duration > 0) {
1759       lost_packets = gap_time / duration;
1760       gap_time = lost_packets * duration;
1761     } else {
1762       lost_packets = gap;
1763     }
1764
1765     /* too many lost packets, some of the missing packets are already
1766      * too late and we can generate lost packet events for them. */
1767     GST_DEBUG_OBJECT (jitterbuffer, "too many lost packets %" GST_TIME_FORMAT
1768         " > %" GST_TIME_FORMAT ", consider %u lost",
1769         GST_TIME_ARGS (total_duration), GST_TIME_ARGS (priv->latency_ns),
1770         lost_packets);
1771
1772     /* this timer will fire immediately and the lost event will be pushed from
1773      * the timer thread */
1774     add_timer (jitterbuffer, TIMER_TYPE_LOST, expected, lost_packets,
1775         priv->last_in_dts + duration, 0, gap_time);
1776
1777     expected += lost_packets;
1778     priv->last_in_dts += gap_time;
1779   }
1780
1781   expected_dts = priv->last_in_dts + duration;
1782
1783   if (priv->do_retransmission) {
1784     type = TIMER_TYPE_EXPECTED;
1785     /* if we had a timer for the first missing packet, leave it. */
1786     if (find_timer (jitterbuffer, type, expected)) {
1787       expected++;
1788       expected_dts += duration;
1789     }
1790   } else {
1791     type = TIMER_TYPE_LOST;
1792   }
1793
1794   while (expected < seqnum) {
1795     add_timer (jitterbuffer, type, expected, 0, expected_dts, 0, duration);
1796     expected_dts += duration;
1797     expected++;
1798   }
1799 }
1800
1801 static GstFlowReturn
1802 gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent,
1803     GstBuffer * buffer)
1804 {
1805   GstRtpJitterBuffer *jitterbuffer;
1806   GstRtpJitterBufferPrivate *priv;
1807   guint16 seqnum;
1808   guint32 expected, rtptime;
1809   GstFlowReturn ret = GST_FLOW_OK;
1810   GstClockTime dts, pts;
1811   guint64 latency_ts;
1812   gboolean tail;
1813   gint percent = -1;
1814   guint8 pt;
1815   GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
1816   gboolean do_next_seqnum = FALSE;
1817
1818   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1819
1820   priv = jitterbuffer->priv;
1821
1822   if (G_UNLIKELY (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp)))
1823     goto invalid_buffer;
1824
1825   pt = gst_rtp_buffer_get_payload_type (&rtp);
1826   seqnum = gst_rtp_buffer_get_seq (&rtp);
1827   rtptime = gst_rtp_buffer_get_timestamp (&rtp);
1828   gst_rtp_buffer_unmap (&rtp);
1829
1830   /* make sure we have PTS and DTS set */
1831   pts = GST_BUFFER_PTS (buffer);
1832   dts = GST_BUFFER_DTS (buffer);
1833   if (dts == -1)
1834     dts = pts;
1835   else if (pts == -1)
1836     pts = dts;
1837
1838   /* take the DTS of the buffer. This is the time when the packet was
1839    * received and is used to calculate jitter and clock skew. We will adjust
1840    * this DTS with the smoothed value after processing it in the
1841    * jitterbuffer and assign it as the PTS. */
1842   /* bring to running time */
1843   dts = gst_segment_to_running_time (&priv->segment, GST_FORMAT_TIME, dts);
1844
1845   GST_DEBUG_OBJECT (jitterbuffer,
1846       "Received packet #%d at time %" GST_TIME_FORMAT ", discont %d", seqnum,
1847       GST_TIME_ARGS (dts), GST_BUFFER_IS_DISCONT (buffer));
1848
1849   JBUF_LOCK_CHECK (priv, out_flushing);
1850
1851   if (G_UNLIKELY (priv->last_pt != pt)) {
1852     GstCaps *caps;
1853
1854     GST_DEBUG_OBJECT (jitterbuffer, "pt changed from %u to %u", priv->last_pt,
1855         pt);
1856
1857     priv->last_pt = pt;
1858     /* reset clock-rate so that we get a new one */
1859     priv->clock_rate = -1;
1860
1861     /* Try to get the clock-rate from the caps first if we can. If there are no
1862      * caps we must fire the signal to get the clock-rate. */
1863     if ((caps = gst_pad_get_current_caps (pad))) {
1864       gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps);
1865       gst_caps_unref (caps);
1866     }
1867   }
1868
1869   if (G_UNLIKELY (priv->clock_rate == -1)) {
1870     /* no clock rate given on the caps, try to get one with the signal */
1871     if (gst_rtp_jitter_buffer_get_clock_rate (jitterbuffer,
1872             pt) == GST_FLOW_FLUSHING)
1873       goto out_flushing;
1874
1875     if (G_UNLIKELY (priv->clock_rate == -1))
1876       goto no_clock_rate;
1877   }
1878
1879   /* don't accept more data on EOS */
1880   if (G_UNLIKELY (priv->eos))
1881     goto have_eos;
1882
1883   expected = priv->next_in_seqnum;
1884
1885   /* now check against our expected seqnum */
1886   if (G_LIKELY (expected != -1)) {
1887     gint gap;
1888
1889     /* now calculate gap */
1890     gap = gst_rtp_buffer_compare_seqnum (expected, seqnum);
1891
1892     GST_DEBUG_OBJECT (jitterbuffer, "expected #%d, got #%d, gap of %d",
1893         expected, seqnum, gap);
1894
1895     if (G_LIKELY (gap == 0)) {
1896       /* packet is expected */
1897       calculate_packet_spacing (jitterbuffer, rtptime, dts);
1898       do_next_seqnum = TRUE;
1899     } else {
1900       gboolean reset = FALSE;
1901
1902       if (gap < 0) {
1903         /* we received an old packet */
1904         if (G_UNLIKELY (gap < -RTP_MAX_MISORDER)) {
1905           /* too old packet, reset */
1906           GST_DEBUG_OBJECT (jitterbuffer, "reset: buffer too old %d < %d", gap,
1907               -RTP_MAX_MISORDER);
1908           reset = TRUE;
1909         } else {
1910           GST_DEBUG_OBJECT (jitterbuffer, "old packet received");
1911         }
1912       } else {
1913         /* new packet, we are missing some packets */
1914         if (G_UNLIKELY (gap > RTP_MAX_DROPOUT)) {
1915           /* packet too far in future, reset */
1916           GST_DEBUG_OBJECT (jitterbuffer, "reset: buffer too new %d > %d", gap,
1917               RTP_MAX_DROPOUT);
1918           reset = TRUE;
1919         } else {
1920           GST_DEBUG_OBJECT (jitterbuffer, "%d missing packets", gap);
1921           /* fill in the gap with EXPECTED timers */
1922           calculate_expected (jitterbuffer, expected, seqnum, dts, gap);
1923
1924           do_next_seqnum = TRUE;
1925         }
1926       }
1927       if (G_UNLIKELY (reset)) {
1928         GST_DEBUG_OBJECT (jitterbuffer, "flush and reset jitterbuffer");
1929         rtp_jitter_buffer_flush (priv->jbuf);
1930         rtp_jitter_buffer_reset_skew (priv->jbuf);
1931         remove_all_timers (jitterbuffer);
1932         priv->last_popped_seqnum = -1;
1933         priv->next_seqnum = seqnum;
1934         do_next_seqnum = TRUE;
1935       }
1936       /* reset spacing estimation when gap */
1937       priv->ips_rtptime = -1;
1938       priv->ips_dts = GST_CLOCK_TIME_NONE;
1939     }
1940   } else {
1941     GST_DEBUG_OBJECT (jitterbuffer, "First buffer #%d", seqnum);
1942     /* we don't know what the next_in_seqnum should be, wait for the last
1943      * possible moment to push this buffer, maybe we get an earlier seqnum
1944      * while we wait */
1945     set_timer (jitterbuffer, TIMER_TYPE_DEADLINE, seqnum, dts);
1946     do_next_seqnum = TRUE;
1947     /* take rtptime and dts to calculate packet spacing */
1948     priv->ips_rtptime = rtptime;
1949     priv->ips_dts = dts;
1950   }
1951   if (do_next_seqnum) {
1952     priv->last_in_seqnum = seqnum;
1953     priv->last_in_dts = dts;
1954     priv->next_in_seqnum = (seqnum + 1) & 0xffff;
1955   }
1956
1957   /* let's check if this buffer is too late, we can only accept packets with
1958    * bigger seqnum than the one we last pushed. */
1959   if (G_LIKELY (priv->last_popped_seqnum != -1)) {
1960     gint gap;
1961
1962     gap = gst_rtp_buffer_compare_seqnum (priv->last_popped_seqnum, seqnum);
1963
1964     /* priv->last_popped_seqnum >= seqnum, we're too late. */
1965     if (G_UNLIKELY (gap <= 0))
1966       goto too_late;
1967   }
1968
1969   /* let's drop oldest packet if the queue is already full and drop-on-latency
1970    * is set. We can only do this when there actually is a latency. When no
1971    * latency is set, we just pump it in the queue and let the other end push it
1972    * out as fast as possible. */
1973   if (priv->latency_ms && priv->drop_on_latency) {
1974     latency_ts =
1975         gst_util_uint64_scale_int (priv->latency_ms, priv->clock_rate, 1000);
1976
1977     if (G_UNLIKELY (rtp_jitter_buffer_get_ts_diff (priv->jbuf) >= latency_ts)) {
1978       GstBuffer *old_buf;
1979
1980       old_buf = rtp_jitter_buffer_pop (priv->jbuf, &percent);
1981
1982       GST_DEBUG_OBJECT (jitterbuffer, "Queue full, dropping old packet %p",
1983           old_buf);
1984
1985       gst_buffer_unref (old_buf);
1986     }
1987   }
1988
1989   /* we need to make the metadata writable before pushing it in the jitterbuffer
1990    * because the jitterbuffer will update the PTS */
1991   buffer = gst_buffer_make_writable (buffer);
1992   GST_BUFFER_DTS (buffer) = dts;
1993   GST_BUFFER_PTS (buffer) = pts;
1994
1995   /* now insert the packet into the queue in sorted order. This function returns
1996    * FALSE if a packet with the same seqnum was already in the queue, meaning we
1997    * have a duplicate. */
1998   if (G_UNLIKELY (!rtp_jitter_buffer_insert (priv->jbuf, buffer, dts,
1999               priv->clock_rate, &tail, &percent)))
2000     goto duplicate;
2001
2002   /* update timers */
2003   update_timers (jitterbuffer, seqnum, dts, do_next_seqnum);
2004
2005   /* we had an unhandled SR, handle it now */
2006   if (priv->last_sr)
2007     do_handle_sync (jitterbuffer);
2008
2009   /* signal addition of new buffer when the _loop is waiting. */
2010   if (priv->active && priv->waiting_timer)
2011     JBUF_SIGNAL_EVENT (priv);
2012
2013   /* let's unschedule and unblock any waiting buffers. We only want to do this
2014    * when the tail buffer changed */
2015   if (G_UNLIKELY (priv->clock_id && tail)) {
2016     GST_DEBUG_OBJECT (jitterbuffer, "Unscheduling waiting new buffer");
2017     unschedule_current_timer (jitterbuffer);
2018   }
2019
2020   GST_DEBUG_OBJECT (jitterbuffer, "Pushed packet #%d, now %d packets, tail: %d",
2021       seqnum, rtp_jitter_buffer_num_packets (priv->jbuf), tail);
2022
2023   check_buffering_percent (jitterbuffer, &percent);
2024
2025 finished:
2026   JBUF_UNLOCK (priv);
2027
2028   if (percent != -1)
2029     post_buffering_percent (jitterbuffer, percent);
2030
2031   return ret;
2032
2033   /* ERRORS */
2034 invalid_buffer:
2035   {
2036     /* this is not fatal but should be filtered earlier */
2037     GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
2038         ("Received invalid RTP payload, dropping"));
2039     gst_buffer_unref (buffer);
2040     return GST_FLOW_OK;
2041   }
2042 no_clock_rate:
2043   {
2044     GST_WARNING_OBJECT (jitterbuffer,
2045         "No clock-rate in caps!, dropping buffer");
2046     gst_buffer_unref (buffer);
2047     goto finished;
2048   }
2049 out_flushing:
2050   {
2051     ret = priv->srcresult;
2052     GST_DEBUG_OBJECT (jitterbuffer, "flushing %s", gst_flow_get_name (ret));
2053     gst_buffer_unref (buffer);
2054     goto finished;
2055   }
2056 have_eos:
2057   {
2058     ret = GST_FLOW_EOS;
2059     GST_WARNING_OBJECT (jitterbuffer, "we are EOS, refusing buffer");
2060     gst_buffer_unref (buffer);
2061     goto finished;
2062   }
2063 too_late:
2064   {
2065     GST_WARNING_OBJECT (jitterbuffer, "Packet #%d too late as #%d was already"
2066         " popped, dropping", seqnum, priv->last_popped_seqnum);
2067     priv->num_late++;
2068     gst_buffer_unref (buffer);
2069     goto finished;
2070   }
2071 duplicate:
2072   {
2073     GST_WARNING_OBJECT (jitterbuffer, "Duplicate packet #%d detected, dropping",
2074         seqnum);
2075     priv->num_duplicates++;
2076     gst_buffer_unref (buffer);
2077     goto finished;
2078   }
2079 }
2080
2081 static GstClockTime
2082 compute_elapsed (GstRtpJitterBuffer * jitterbuffer, GstBuffer * outbuf)
2083 {
2084   guint64 ext_time, elapsed;
2085   guint32 rtp_time;
2086   GstRtpJitterBufferPrivate *priv;
2087   GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
2088
2089   priv = jitterbuffer->priv;
2090   gst_rtp_buffer_map (outbuf, GST_MAP_READ, &rtp);
2091   rtp_time = gst_rtp_buffer_get_timestamp (&rtp);
2092   gst_rtp_buffer_unmap (&rtp);
2093
2094   GST_LOG_OBJECT (jitterbuffer, "rtp %" G_GUINT32_FORMAT ", ext %"
2095       G_GUINT64_FORMAT, rtp_time, priv->ext_timestamp);
2096
2097   if (rtp_time < priv->ext_timestamp) {
2098     ext_time = priv->ext_timestamp;
2099   } else {
2100     ext_time = gst_rtp_buffer_ext_timestamp (&priv->ext_timestamp, rtp_time);
2101   }
2102
2103   if (ext_time > priv->clock_base)
2104     elapsed = ext_time - priv->clock_base;
2105   else
2106     elapsed = 0;
2107
2108   elapsed = gst_util_uint64_scale_int (elapsed, GST_SECOND, priv->clock_rate);
2109   return elapsed;
2110 }
2111
2112 static void
2113 update_estimated_eos (GstRtpJitterBuffer * jitterbuffer, GstBuffer * outbuf)
2114 {
2115   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2116
2117   if (priv->npt_stop != -1 && priv->ext_timestamp != -1
2118       && priv->clock_base != -1 && priv->clock_rate > 0) {
2119     guint64 elapsed, estimated;
2120
2121     elapsed = compute_elapsed (jitterbuffer, outbuf);
2122
2123     if (elapsed > priv->last_elapsed || !priv->last_elapsed) {
2124       guint64 left;
2125       GstClockTime out_time;
2126
2127       priv->last_elapsed = elapsed;
2128
2129       left = priv->npt_stop - priv->npt_start;
2130       GST_LOG_OBJECT (jitterbuffer, "left %" GST_TIME_FORMAT,
2131           GST_TIME_ARGS (left));
2132
2133       out_time = GST_BUFFER_DTS (outbuf);
2134
2135       if (elapsed > 0)
2136         estimated = gst_util_uint64_scale (out_time, left, elapsed);
2137       else {
2138         /* if there is almost nothing left,
2139          * we may never advance enough to end up in the above case */
2140         if (left < GST_SECOND)
2141           estimated = GST_SECOND;
2142         else
2143           estimated = -1;
2144       }
2145
2146       GST_LOG_OBJECT (jitterbuffer, "elapsed %" GST_TIME_FORMAT ", estimated %"
2147           GST_TIME_FORMAT, GST_TIME_ARGS (elapsed), GST_TIME_ARGS (estimated));
2148
2149       if (estimated != -1 && priv->estimated_eos != estimated) {
2150         set_timer (jitterbuffer, TIMER_TYPE_EOS, -1, estimated);
2151         priv->estimated_eos = estimated;
2152       }
2153     }
2154   }
2155 }
2156
2157 /* take a buffer from the queue and push it */
2158 static GstFlowReturn
2159 pop_and_push_next (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum)
2160 {
2161   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2162   GstFlowReturn result;
2163   GstBuffer *outbuf;
2164   GstClockTime dts, pts;
2165   gint percent = -1;
2166
2167   /* when we get here we are ready to pop and push the buffer */
2168   outbuf = rtp_jitter_buffer_pop (priv->jbuf, &percent);
2169
2170   check_buffering_percent (jitterbuffer, &percent);
2171
2172   if (G_UNLIKELY (priv->discont)) {
2173     /* set DISCONT flag when we missed a packet. We pushed the buffer writable
2174      * into the jitterbuffer so we can modify now. */
2175     GST_DEBUG_OBJECT (jitterbuffer, "mark output buffer discont");
2176     GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_DISCONT);
2177     priv->discont = FALSE;
2178   }
2179   if (G_UNLIKELY (priv->ts_discont)) {
2180     GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_RESYNC);
2181     priv->ts_discont = FALSE;
2182   }
2183
2184   dts = GST_BUFFER_DTS (outbuf);
2185   pts = GST_BUFFER_PTS (outbuf);
2186
2187   dts = gst_segment_to_position (&priv->segment, GST_FORMAT_TIME, dts);
2188   pts = gst_segment_to_position (&priv->segment, GST_FORMAT_TIME, pts);
2189
2190   /* apply timestamp with offset to buffer now */
2191   GST_BUFFER_DTS (outbuf) = apply_offset (jitterbuffer, dts);
2192   GST_BUFFER_PTS (outbuf) = apply_offset (jitterbuffer, pts);
2193
2194   /* update the elapsed time when we need to check against the npt stop time. */
2195   update_estimated_eos (jitterbuffer, outbuf);
2196
2197   /* now we are ready to push the buffer. Save the seqnum and release the lock
2198    * so the other end can push stuff in the queue again. */
2199   priv->last_popped_seqnum = seqnum;
2200   priv->last_out_time = GST_BUFFER_PTS (outbuf);
2201   priv->next_seqnum = (seqnum + 1) & 0xffff;
2202   JBUF_UNLOCK (priv);
2203
2204   if (percent != -1)
2205     post_buffering_percent (jitterbuffer, percent);
2206
2207   /* push buffer */
2208   GST_DEBUG_OBJECT (jitterbuffer,
2209       "Pushing buffer %d, dts %" GST_TIME_FORMAT ", pts %" GST_TIME_FORMAT,
2210       seqnum, GST_TIME_ARGS (GST_BUFFER_DTS (outbuf)),
2211       GST_TIME_ARGS (GST_BUFFER_PTS (outbuf)));
2212
2213   result = gst_pad_push (priv->srcpad, outbuf);
2214
2215   JBUF_LOCK_CHECK (priv, out_flushing);
2216
2217   return result;
2218
2219   /* ERRORS */
2220 out_flushing:
2221   {
2222     return priv->srcresult;
2223   }
2224 }
2225
2226 #define GST_FLOW_WAIT GST_FLOW_CUSTOM_SUCCESS
2227
2228 /* Peek a buffer and compare the seqnum to the expected seqnum.
2229  * If all is fine, the buffer is pushed.
2230  * If something is wrong, we wait for some event
2231  */
2232 static GstFlowReturn
2233 handle_next_buffer (GstRtpJitterBuffer * jitterbuffer)
2234 {
2235   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2236   GstFlowReturn result = GST_FLOW_OK;
2237   GstBuffer *outbuf;
2238   guint16 seqnum;
2239   guint32 next_seqnum;
2240   gint gap;
2241   GstRTPBuffer rtp = { NULL, };
2242
2243   /* only push buffers when PLAYING and active and not buffering */
2244   if (priv->blocked || !priv->active ||
2245       rtp_jitter_buffer_is_buffering (priv->jbuf))
2246     return GST_FLOW_WAIT;
2247
2248 again:
2249   /* peek a buffer, we're just looking at the sequence number.
2250    * If all is fine, we'll pop and push it. If the sequence number is wrong we
2251    * wait for a timeout or something to change.
2252    * The peeked buffer is valid for as long as we hold the jitterbuffer lock. */
2253   outbuf = rtp_jitter_buffer_peek (priv->jbuf);
2254   if (outbuf == NULL)
2255     goto wait;
2256
2257   /* get the seqnum and the next expected seqnum */
2258   gst_rtp_buffer_map (outbuf, GST_MAP_READ, &rtp);
2259   seqnum = gst_rtp_buffer_get_seq (&rtp);
2260   gst_rtp_buffer_unmap (&rtp);
2261
2262   next_seqnum = priv->next_seqnum;
2263
2264   /* get the gap between this and the previous packet. If we don't know the
2265    * previous packet seqnum assume no gap. */
2266   if (G_UNLIKELY (next_seqnum == -1)) {
2267     GST_DEBUG_OBJECT (jitterbuffer, "First buffer #%d", seqnum);
2268     /* we don't know what the next_seqnum should be, the chain function should
2269      * have scheduled a DEADLINE timer that will increment next_seqnum when it
2270      * fires, so wait for that */
2271     result = GST_FLOW_WAIT;
2272   } else {
2273     /* else calculate GAP */
2274     gap = gst_rtp_buffer_compare_seqnum (next_seqnum, seqnum);
2275
2276     if (G_LIKELY (gap == 0)) {
2277       /* no missing packet, pop and push */
2278       result = pop_and_push_next (jitterbuffer, seqnum);
2279     } else if (G_UNLIKELY (gap < 0)) {
2280       /* if we have a packet that we already pushed or considered dropped, pop it
2281        * off and get the next packet */
2282       GST_DEBUG_OBJECT (jitterbuffer, "Old packet #%d, next #%d dropping",
2283           seqnum, next_seqnum);
2284       outbuf = rtp_jitter_buffer_pop (priv->jbuf, NULL);
2285       gst_buffer_unref (outbuf);
2286       goto again;
2287     } else {
2288       /* the chain function has scheduled timers to request retransmission or
2289        * when to consider the packet lost, wait for that */
2290       GST_DEBUG_OBJECT (jitterbuffer,
2291           "Sequence number GAP detected: expected %d instead of %d (%d missing)",
2292           next_seqnum, seqnum, gap);
2293       result = GST_FLOW_WAIT;
2294     }
2295   }
2296   return result;
2297
2298 wait:
2299   {
2300     GST_DEBUG_OBJECT (jitterbuffer, "no buffer, going to wait");
2301     if (priv->eos)
2302       result = GST_FLOW_EOS;
2303     else
2304       result = GST_FLOW_WAIT;
2305     return result;
2306   }
2307 }
2308
2309 /* the timeout for when we expected a packet expired */
2310 static gboolean
2311 do_expected_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
2312     GstClockTime now)
2313 {
2314   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2315   GstEvent *event;
2316
2317   GST_DEBUG_OBJECT (jitterbuffer, "expected %d didn't arrive", timer->seqnum);
2318
2319   event = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM,
2320       gst_structure_new ("GstRTPRetransmissionRequest",
2321           "seqnum", G_TYPE_UINT, (guint) timer->seqnum,
2322           "running-time", G_TYPE_UINT64, timer->rtx_base,
2323           "delay", G_TYPE_UINT,
2324           GST_TIME_AS_MSECONDS (timer->rtx_delay + timer->rtx_retry),
2325           "frequency", G_TYPE_UINT, priv->rtx_retry_timeout, "period",
2326           G_TYPE_UINT, priv->rtx_retry_period, "deadline", G_TYPE_UINT,
2327           priv->latency_ms, "packet-spacing", G_TYPE_UINT64,
2328           priv->packet_spacing, NULL));
2329
2330   JBUF_UNLOCK (priv);
2331   gst_pad_push_event (priv->sinkpad, event);
2332   JBUF_LOCK (priv);
2333
2334   /* calculate the timeout for the next retransmission attempt */
2335   timer->rtx_retry += (priv->rtx_retry_timeout * GST_MSECOND);
2336   if (timer->rtx_retry + timer->rtx_delay >
2337       (priv->rtx_retry_period * GST_MSECOND)) {
2338     GST_DEBUG_OBJECT (jitterbuffer, "reschedule as LOST timer");
2339     /* too many retransmission request, we now convert the timer
2340      * to a lost timer */
2341     timer->type = TIMER_TYPE_LOST;
2342     timer->rtx_delay = 0;
2343     timer->rtx_retry = 0;
2344   }
2345   reschedule_timer (jitterbuffer, timer, timer->seqnum,
2346       timer->rtx_base + timer->rtx_retry, timer->rtx_delay);
2347
2348   return FALSE;
2349 }
2350
2351 /* a packet is lost */
2352 static gboolean
2353 do_lost_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
2354     GstClockTime now)
2355 {
2356   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2357   GstClockTime duration, timestamp;
2358   guint seqnum, num;
2359   gboolean late;
2360
2361   seqnum = timer->seqnum;
2362   timestamp = apply_offset (jitterbuffer, timer->timeout);
2363   duration = timer->duration;
2364   if (duration == GST_CLOCK_TIME_NONE && priv->packet_spacing > 0)
2365     duration = priv->packet_spacing;
2366   num = MAX (timer->num, 1);
2367   late = timer->num > 0;
2368
2369   /* remove timer now */
2370   remove_timer (jitterbuffer, timer);
2371   JBUF_SIGNAL_EVENT (priv);
2372
2373   send_lost_event (jitterbuffer, seqnum, num, timestamp, duration, late);
2374
2375   return TRUE;
2376 }
2377
2378 static gboolean
2379 do_eos_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
2380     GstClockTime now)
2381 {
2382   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2383
2384   GST_INFO_OBJECT (jitterbuffer, "got the NPT timeout");
2385   remove_timer (jitterbuffer, timer);
2386   JBUF_SIGNAL_EVENT (priv);
2387
2388   return TRUE;
2389 }
2390
2391 static gboolean
2392 do_deadline_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
2393     GstClockTime now)
2394 {
2395   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2396
2397   GST_INFO_OBJECT (jitterbuffer, "got deadline timeout");
2398
2399   priv->next_seqnum = timer->seqnum;
2400   remove_timer (jitterbuffer, timer);
2401   JBUF_SIGNAL_EVENT (priv);
2402
2403   return TRUE;
2404 }
2405
2406 static gboolean
2407 do_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
2408     GstClockTime now)
2409 {
2410   gboolean removed = FALSE;
2411
2412   switch (timer->type) {
2413     case TIMER_TYPE_EXPECTED:
2414       removed = do_expected_timeout (jitterbuffer, timer, now);
2415       break;
2416     case TIMER_TYPE_LOST:
2417       removed = do_lost_timeout (jitterbuffer, timer, now);
2418       break;
2419     case TIMER_TYPE_DEADLINE:
2420       removed = do_deadline_timeout (jitterbuffer, timer, now);
2421       break;
2422     case TIMER_TYPE_EOS:
2423       removed = do_eos_timeout (jitterbuffer, timer, now);
2424       break;
2425   }
2426   return removed;
2427 }
2428
2429 /* called when we need to wait for the next timeout.
2430  *
2431  * We loop over the array of recorded timeouts and wait for the earliest one.
2432  * When it timed out, do the logic associated with the timer.
2433  *
2434  * If there are no timers, we wait on a gcond until something new happens.
2435  */
2436 static void
2437 wait_next_timeout (GstRtpJitterBuffer * jitterbuffer)
2438 {
2439   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2440   GstClockTime now = 0;
2441
2442   JBUF_LOCK (priv);
2443   while (priv->timer_running) {
2444     TimerData *timer = NULL;
2445     GstClockTime timer_timeout = -1;
2446     gint i, len;
2447
2448     GST_DEBUG_OBJECT (jitterbuffer, "now %" GST_TIME_FORMAT,
2449         GST_TIME_ARGS (now));
2450
2451     len = priv->timers->len;
2452     for (i = 0; i < len; i++) {
2453       TimerData *test = &g_array_index (priv->timers, TimerData, i);
2454       GstClockTime test_timeout = get_timeout (jitterbuffer, test);
2455
2456       GST_DEBUG_OBJECT (jitterbuffer, "%d, %d, %d, %" GST_TIME_FORMAT,
2457           i, test->type, test->seqnum, GST_TIME_ARGS (test_timeout));
2458
2459       /* no timestamp, timeout immeditately */
2460       if (test_timeout == -1 || test_timeout <= now) {
2461         if (do_timeout (jitterbuffer, test, now))
2462           len--;
2463         i--;
2464       } else if (timer == NULL || test_timeout < timer_timeout) {
2465         /* find the smallest timeout */
2466         timer = test;
2467         timer_timeout = test_timeout;
2468       }
2469     }
2470     if (timer) {
2471       GstClock *clock;
2472       GstClockTime sync_time;
2473       GstClockID id;
2474       GstClockReturn ret;
2475       GstClockTimeDiff clock_jitter;
2476
2477       GST_OBJECT_LOCK (jitterbuffer);
2478       clock = GST_ELEMENT_CLOCK (jitterbuffer);
2479       if (!clock) {
2480         GST_OBJECT_UNLOCK (jitterbuffer);
2481         /* let's just push if there is no clock */
2482         GST_DEBUG_OBJECT (jitterbuffer, "No clock, timeout right away");
2483         now = timer_timeout;
2484         continue;
2485       }
2486
2487       /* prepare for sync against clock */
2488       sync_time = timer_timeout + GST_ELEMENT_CAST (jitterbuffer)->base_time;
2489       /* add latency of peer to get input time */
2490       sync_time += priv->peer_latency;
2491
2492       GST_DEBUG_OBJECT (jitterbuffer, "sync to timestamp %" GST_TIME_FORMAT
2493           " with sync time %" GST_TIME_FORMAT,
2494           GST_TIME_ARGS (timer_timeout), GST_TIME_ARGS (sync_time));
2495
2496       /* create an entry for the clock */
2497       id = priv->clock_id = gst_clock_new_single_shot_id (clock, sync_time);
2498       priv->timer_timeout = timer_timeout;
2499       priv->timer_seqnum = timer->seqnum;
2500       GST_OBJECT_UNLOCK (jitterbuffer);
2501
2502       /* release the lock so that the other end can push stuff or unlock */
2503       JBUF_UNLOCK (priv);
2504
2505       ret = gst_clock_id_wait (id, &clock_jitter);
2506
2507       JBUF_LOCK (priv);
2508       if (!priv->timer_running)
2509         break;
2510
2511       if (ret != GST_CLOCK_UNSCHEDULED) {
2512         now = timer_timeout + MAX (clock_jitter, 0);
2513         GST_DEBUG_OBJECT (jitterbuffer, "sync done, %d, #%d, %" G_GINT64_FORMAT,
2514             ret, priv->timer_seqnum, clock_jitter);
2515       } else {
2516         GST_DEBUG_OBJECT (jitterbuffer, "sync unscheduled");
2517       }
2518       /* and free the entry */
2519       gst_clock_id_unref (id);
2520       priv->clock_id = NULL;
2521     } else {
2522       /* no timers, wait for activity */
2523       GST_DEBUG_OBJECT (jitterbuffer, "waiting");
2524       JBUF_WAIT_TIMER (priv);
2525       GST_DEBUG_OBJECT (jitterbuffer, "waiting done");
2526     }
2527   }
2528   JBUF_UNLOCK (priv);
2529
2530   GST_DEBUG_OBJECT (jitterbuffer, "we are stopping");
2531   return;
2532 }
2533
2534 /*
2535  * This funcion implements the main pushing loop on the source pad.
2536  *
2537  * It first tries to push as many buffers as possible. If there is a seqnum
2538  * mismatch, we wait for the next timeouts.
2539  */
2540 static void
2541 gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer)
2542 {
2543   GstRtpJitterBufferPrivate *priv;
2544   GstFlowReturn result;
2545
2546   priv = jitterbuffer->priv;
2547
2548   JBUF_LOCK_CHECK (priv, flushing);
2549   do {
2550     result = handle_next_buffer (jitterbuffer);
2551     if (G_LIKELY (result == GST_FLOW_WAIT)) {
2552       GST_DEBUG_OBJECT (jitterbuffer, "waiting for event");
2553       /* now wait for the next event */
2554       JBUF_WAIT_EVENT (priv, flushing);
2555       GST_DEBUG_OBJECT (jitterbuffer, "waiting for event done");
2556       result = GST_FLOW_OK;
2557     }
2558   }
2559   while (result == GST_FLOW_OK);
2560   JBUF_UNLOCK (priv);
2561
2562   /* if we get here we need to pause */
2563   goto pause;
2564
2565   /* ERRORS */
2566 flushing:
2567   {
2568     result = priv->srcresult;
2569     JBUF_UNLOCK (priv);
2570     goto pause;
2571   }
2572 pause:
2573   {
2574     const gchar *reason = gst_flow_get_name (result);
2575     GstEvent *event;
2576
2577     GST_DEBUG_OBJECT (jitterbuffer, "pausing task, reason %s", reason);
2578     gst_pad_pause_task (priv->srcpad);
2579     if (result == GST_FLOW_EOS) {
2580       event = gst_event_new_eos ();
2581       gst_pad_push_event (priv->srcpad, event);
2582     }
2583     return;
2584   }
2585 }
2586
2587 /* collect the info from the lastest RTCP packet and the jitterbuffer sync, do
2588  * some sanity checks and then emit the handle-sync signal with the parameters.
2589  * This function must be called with the LOCK */
2590 static void
2591 do_handle_sync (GstRtpJitterBuffer * jitterbuffer)
2592 {
2593   GstRtpJitterBufferPrivate *priv;
2594   guint64 base_rtptime, base_time;
2595   guint32 clock_rate;
2596   guint64 last_rtptime;
2597   guint64 clock_base;
2598   guint64 ext_rtptime, diff;
2599   gboolean drop = FALSE;
2600
2601   priv = jitterbuffer->priv;
2602
2603   /* get the last values from the jitterbuffer */
2604   rtp_jitter_buffer_get_sync (priv->jbuf, &base_rtptime, &base_time,
2605       &clock_rate, &last_rtptime);
2606
2607   clock_base = priv->clock_base;
2608   ext_rtptime = priv->ext_rtptime;
2609
2610   GST_DEBUG_OBJECT (jitterbuffer, "ext SR %" G_GUINT64_FORMAT ", base %"
2611       G_GUINT64_FORMAT ", clock-rate %" G_GUINT32_FORMAT
2612       ", clock-base %" G_GUINT64_FORMAT ", last-rtptime %" G_GUINT64_FORMAT,
2613       ext_rtptime, base_rtptime, clock_rate, clock_base, last_rtptime);
2614
2615   if (base_rtptime == -1 || clock_rate == -1 || base_time == -1) {
2616     GST_DEBUG_OBJECT (jitterbuffer, "dropping, no RTP values");
2617     drop = TRUE;
2618   } else {
2619     /* we can't accept anything that happened before we did the last resync */
2620     if (base_rtptime > ext_rtptime) {
2621       GST_DEBUG_OBJECT (jitterbuffer, "dropping, older than base time");
2622       drop = TRUE;
2623     } else {
2624       /* the SR RTP timestamp must be something close to what we last observed
2625        * in the jitterbuffer */
2626       if (ext_rtptime > last_rtptime) {
2627         /* check how far ahead it is to our RTP timestamps */
2628         diff = ext_rtptime - last_rtptime;
2629         /* if bigger than 1 second, we drop it */
2630         if (diff > clock_rate) {
2631           GST_DEBUG_OBJECT (jitterbuffer, "too far ahead");
2632           /* should drop this, but some RTSP servers end up with bogus
2633            * way too ahead RTCP packet when repeated PAUSE/PLAY,
2634            * so still trigger rptbin sync but invalidate RTCP data
2635            * (sync might use other methods) */
2636           ext_rtptime = -1;
2637         }
2638         GST_DEBUG_OBJECT (jitterbuffer, "ext last %" G_GUINT64_FORMAT ", diff %"
2639             G_GUINT64_FORMAT, last_rtptime, diff);
2640       }
2641     }
2642   }
2643
2644   if (!drop) {
2645     GstStructure *s;
2646
2647     s = gst_structure_new ("application/x-rtp-sync",
2648         "base-rtptime", G_TYPE_UINT64, base_rtptime,
2649         "base-time", G_TYPE_UINT64, base_time,
2650         "clock-rate", G_TYPE_UINT, clock_rate,
2651         "clock-base", G_TYPE_UINT64, clock_base,
2652         "sr-ext-rtptime", G_TYPE_UINT64, ext_rtptime,
2653         "sr-buffer", GST_TYPE_BUFFER, priv->last_sr, NULL);
2654
2655     GST_DEBUG_OBJECT (jitterbuffer, "signaling sync");
2656     gst_buffer_replace (&priv->last_sr, NULL);
2657     JBUF_UNLOCK (priv);
2658     g_signal_emit (jitterbuffer,
2659         gst_rtp_jitter_buffer_signals[SIGNAL_HANDLE_SYNC], 0, s);
2660     JBUF_LOCK (priv);
2661     gst_structure_free (s);
2662   } else {
2663     GST_DEBUG_OBJECT (jitterbuffer, "dropping RTCP packet");
2664   }
2665 }
2666
2667 static GstFlowReturn
2668 gst_rtp_jitter_buffer_chain_rtcp (GstPad * pad, GstObject * parent,
2669     GstBuffer * buffer)
2670 {
2671   GstRtpJitterBuffer *jitterbuffer;
2672   GstRtpJitterBufferPrivate *priv;
2673   GstFlowReturn ret = GST_FLOW_OK;
2674   guint32 ssrc;
2675   GstRTCPPacket packet;
2676   guint64 ext_rtptime;
2677   guint32 rtptime;
2678   GstRTCPBuffer rtcp = { NULL, };
2679
2680   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
2681
2682   if (G_UNLIKELY (!gst_rtcp_buffer_validate (buffer)))
2683     goto invalid_buffer;
2684
2685   priv = jitterbuffer->priv;
2686
2687   gst_rtcp_buffer_map (buffer, GST_MAP_READ, &rtcp);
2688
2689   if (!gst_rtcp_buffer_get_first_packet (&rtcp, &packet))
2690     goto empty_buffer;
2691
2692   /* first packet must be SR or RR or else the validate would have failed */
2693   switch (gst_rtcp_packet_get_type (&packet)) {
2694     case GST_RTCP_TYPE_SR:
2695       gst_rtcp_packet_sr_get_sender_info (&packet, &ssrc, NULL, &rtptime,
2696           NULL, NULL);
2697       break;
2698     default:
2699       goto ignore_buffer;
2700   }
2701   gst_rtcp_buffer_unmap (&rtcp);
2702
2703   GST_DEBUG_OBJECT (jitterbuffer, "received RTCP of SSRC %08x", ssrc);
2704
2705   JBUF_LOCK (priv);
2706   /* convert the RTP timestamp to our extended timestamp, using the same offset
2707    * we used in the jitterbuffer */
2708   ext_rtptime = priv->jbuf->ext_rtptime;
2709   ext_rtptime = gst_rtp_buffer_ext_timestamp (&ext_rtptime, rtptime);
2710
2711   priv->ext_rtptime = ext_rtptime;
2712   gst_buffer_replace (&priv->last_sr, buffer);
2713
2714   do_handle_sync (jitterbuffer);
2715   JBUF_UNLOCK (priv);
2716
2717 done:
2718   gst_buffer_unref (buffer);
2719
2720   return ret;
2721
2722 invalid_buffer:
2723   {
2724     /* this is not fatal but should be filtered earlier */
2725     GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
2726         ("Received invalid RTCP payload, dropping"));
2727     ret = GST_FLOW_OK;
2728     goto done;
2729   }
2730 empty_buffer:
2731   {
2732     /* this is not fatal but should be filtered earlier */
2733     GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
2734         ("Received empty RTCP payload, dropping"));
2735     gst_rtcp_buffer_unmap (&rtcp);
2736     ret = GST_FLOW_OK;
2737     goto done;
2738   }
2739 ignore_buffer:
2740   {
2741     GST_DEBUG_OBJECT (jitterbuffer, "ignoring RTCP packet");
2742     gst_rtcp_buffer_unmap (&rtcp);
2743     ret = GST_FLOW_OK;
2744     goto done;
2745   }
2746 }
2747
2748 static gboolean
2749 gst_rtp_jitter_buffer_sink_query (GstPad * pad, GstObject * parent,
2750     GstQuery * query)
2751 {
2752   gboolean res = FALSE;
2753
2754   switch (GST_QUERY_TYPE (query)) {
2755     case GST_QUERY_CAPS:
2756     {
2757       GstCaps *filter, *caps;
2758
2759       gst_query_parse_caps (query, &filter);
2760       caps = gst_rtp_jitter_buffer_getcaps (pad, filter);
2761       gst_query_set_caps_result (query, caps);
2762       gst_caps_unref (caps);
2763       res = TRUE;
2764       break;
2765     }
2766     default:
2767       if (GST_QUERY_IS_SERIALIZED (query)) {
2768         GST_WARNING_OBJECT (pad, "unhandled serialized query");
2769         res = FALSE;
2770       } else {
2771         res = gst_pad_query_default (pad, parent, query);
2772       }
2773       break;
2774   }
2775   return res;
2776 }
2777
2778 static gboolean
2779 gst_rtp_jitter_buffer_src_query (GstPad * pad, GstObject * parent,
2780     GstQuery * query)
2781 {
2782   GstRtpJitterBuffer *jitterbuffer;
2783   GstRtpJitterBufferPrivate *priv;
2784   gboolean res = FALSE;
2785
2786   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
2787   priv = jitterbuffer->priv;
2788
2789   switch (GST_QUERY_TYPE (query)) {
2790     case GST_QUERY_LATENCY:
2791     {
2792       /* We need to send the query upstream and add the returned latency to our
2793        * own */
2794       GstClockTime min_latency, max_latency;
2795       gboolean us_live;
2796       GstClockTime our_latency;
2797
2798       if ((res = gst_pad_peer_query (priv->sinkpad, query))) {
2799         gst_query_parse_latency (query, &us_live, &min_latency, &max_latency);
2800
2801         GST_DEBUG_OBJECT (jitterbuffer, "Peer latency: min %"
2802             GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
2803             GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
2804
2805         /* store this so that we can safely sync on the peer buffers. */
2806         JBUF_LOCK (priv);
2807         priv->peer_latency = min_latency;
2808         our_latency = priv->latency_ns;
2809         JBUF_UNLOCK (priv);
2810
2811         GST_DEBUG_OBJECT (jitterbuffer, "Our latency: %" GST_TIME_FORMAT,
2812             GST_TIME_ARGS (our_latency));
2813
2814         /* we add some latency but can buffer an infinite amount of time */
2815         min_latency += our_latency;
2816         max_latency = -1;
2817
2818         GST_DEBUG_OBJECT (jitterbuffer, "Calculated total latency : min %"
2819             GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
2820             GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
2821
2822         gst_query_set_latency (query, TRUE, min_latency, max_latency);
2823       }
2824       break;
2825     }
2826     case GST_QUERY_POSITION:
2827     {
2828       GstClockTime start, last_out;
2829       GstFormat fmt;
2830
2831       gst_query_parse_position (query, &fmt, NULL);
2832       if (fmt != GST_FORMAT_TIME) {
2833         res = gst_pad_query_default (pad, parent, query);
2834         break;
2835       }
2836
2837       JBUF_LOCK (priv);
2838       start = priv->npt_start;
2839       last_out = priv->last_out_time;
2840       JBUF_UNLOCK (priv);
2841
2842       GST_DEBUG_OBJECT (jitterbuffer, "npt start %" GST_TIME_FORMAT
2843           ", last out %" GST_TIME_FORMAT, GST_TIME_ARGS (start),
2844           GST_TIME_ARGS (last_out));
2845
2846       if (GST_CLOCK_TIME_IS_VALID (start) && GST_CLOCK_TIME_IS_VALID (last_out)) {
2847         /* bring 0-based outgoing time to stream time */
2848         gst_query_set_position (query, GST_FORMAT_TIME, start + last_out);
2849         res = TRUE;
2850       } else {
2851         res = gst_pad_query_default (pad, parent, query);
2852       }
2853       break;
2854     }
2855     case GST_QUERY_CAPS:
2856     {
2857       GstCaps *filter, *caps;
2858
2859       gst_query_parse_caps (query, &filter);
2860       caps = gst_rtp_jitter_buffer_getcaps (pad, filter);
2861       gst_query_set_caps_result (query, caps);
2862       gst_caps_unref (caps);
2863       res = TRUE;
2864       break;
2865     }
2866     default:
2867       res = gst_pad_query_default (pad, parent, query);
2868       break;
2869   }
2870
2871   return res;
2872 }
2873
2874 static void
2875 gst_rtp_jitter_buffer_set_property (GObject * object,
2876     guint prop_id, const GValue * value, GParamSpec * pspec)
2877 {
2878   GstRtpJitterBuffer *jitterbuffer;
2879   GstRtpJitterBufferPrivate *priv;
2880
2881   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
2882   priv = jitterbuffer->priv;
2883
2884   switch (prop_id) {
2885     case PROP_LATENCY:
2886     {
2887       guint new_latency, old_latency;
2888
2889       new_latency = g_value_get_uint (value);
2890
2891       JBUF_LOCK (priv);
2892       old_latency = priv->latency_ms;
2893       priv->latency_ms = new_latency;
2894       priv->latency_ns = priv->latency_ms * GST_MSECOND;
2895       rtp_jitter_buffer_set_delay (priv->jbuf, priv->latency_ns);
2896       JBUF_UNLOCK (priv);
2897
2898       /* post message if latency changed, this will inform the parent pipeline
2899        * that a latency reconfiguration is possible/needed. */
2900       if (new_latency != old_latency) {
2901         GST_DEBUG_OBJECT (jitterbuffer, "latency changed to: %" GST_TIME_FORMAT,
2902             GST_TIME_ARGS (new_latency * GST_MSECOND));
2903
2904         gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer),
2905             gst_message_new_latency (GST_OBJECT_CAST (jitterbuffer)));
2906       }
2907       break;
2908     }
2909     case PROP_DROP_ON_LATENCY:
2910       JBUF_LOCK (priv);
2911       priv->drop_on_latency = g_value_get_boolean (value);
2912       JBUF_UNLOCK (priv);
2913       break;
2914     case PROP_TS_OFFSET:
2915       JBUF_LOCK (priv);
2916       priv->ts_offset = g_value_get_int64 (value);
2917       priv->ts_discont = TRUE;
2918       JBUF_UNLOCK (priv);
2919       break;
2920     case PROP_DO_LOST:
2921       JBUF_LOCK (priv);
2922       priv->do_lost = g_value_get_boolean (value);
2923       JBUF_UNLOCK (priv);
2924       break;
2925     case PROP_MODE:
2926       JBUF_LOCK (priv);
2927       rtp_jitter_buffer_set_mode (priv->jbuf, g_value_get_enum (value));
2928       JBUF_UNLOCK (priv);
2929       break;
2930     case PROP_DO_RETRANSMISSION:
2931       JBUF_LOCK (priv);
2932       priv->do_retransmission = g_value_get_boolean (value);
2933       JBUF_UNLOCK (priv);
2934       break;
2935     case PROP_RTX_DELAY:
2936       JBUF_LOCK (priv);
2937       priv->rtx_delay = g_value_get_int (value);
2938       JBUF_UNLOCK (priv);
2939       break;
2940     case PROP_RTX_DELAY_REORDER:
2941       JBUF_LOCK (priv);
2942       priv->rtx_delay_reorder = g_value_get_int (value);
2943       JBUF_UNLOCK (priv);
2944       break;
2945     case PROP_RTX_RETRY_TIMEOUT:
2946       JBUF_LOCK (priv);
2947       priv->rtx_retry_timeout = g_value_get_int (value);
2948       JBUF_UNLOCK (priv);
2949       break;
2950     case PROP_RTX_RETRY_PERIOD:
2951       JBUF_LOCK (priv);
2952       priv->rtx_retry_period = g_value_get_int (value);
2953       JBUF_UNLOCK (priv);
2954       break;
2955     default:
2956       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2957       break;
2958   }
2959 }
2960
2961 static void
2962 gst_rtp_jitter_buffer_get_property (GObject * object,
2963     guint prop_id, GValue * value, GParamSpec * pspec)
2964 {
2965   GstRtpJitterBuffer *jitterbuffer;
2966   GstRtpJitterBufferPrivate *priv;
2967
2968   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
2969   priv = jitterbuffer->priv;
2970
2971   switch (prop_id) {
2972     case PROP_LATENCY:
2973       JBUF_LOCK (priv);
2974       g_value_set_uint (value, priv->latency_ms);
2975       JBUF_UNLOCK (priv);
2976       break;
2977     case PROP_DROP_ON_LATENCY:
2978       JBUF_LOCK (priv);
2979       g_value_set_boolean (value, priv->drop_on_latency);
2980       JBUF_UNLOCK (priv);
2981       break;
2982     case PROP_TS_OFFSET:
2983       JBUF_LOCK (priv);
2984       g_value_set_int64 (value, priv->ts_offset);
2985       JBUF_UNLOCK (priv);
2986       break;
2987     case PROP_DO_LOST:
2988       JBUF_LOCK (priv);
2989       g_value_set_boolean (value, priv->do_lost);
2990       JBUF_UNLOCK (priv);
2991       break;
2992     case PROP_MODE:
2993       JBUF_LOCK (priv);
2994       g_value_set_enum (value, rtp_jitter_buffer_get_mode (priv->jbuf));
2995       JBUF_UNLOCK (priv);
2996       break;
2997     case PROP_PERCENT:
2998     {
2999       gint percent;
3000
3001       JBUF_LOCK (priv);
3002       if (priv->srcresult != GST_FLOW_OK)
3003         percent = 100;
3004       else
3005         percent = rtp_jitter_buffer_get_percent (priv->jbuf);
3006
3007       g_value_set_int (value, percent);
3008       JBUF_UNLOCK (priv);
3009       break;
3010     }
3011     case PROP_DO_RETRANSMISSION:
3012       JBUF_LOCK (priv);
3013       g_value_set_boolean (value, priv->do_retransmission);
3014       JBUF_UNLOCK (priv);
3015       break;
3016     case PROP_RTX_DELAY:
3017       JBUF_LOCK (priv);
3018       g_value_set_int (value, priv->rtx_delay);
3019       JBUF_UNLOCK (priv);
3020       break;
3021     case PROP_RTX_DELAY_REORDER:
3022       JBUF_LOCK (priv);
3023       g_value_set_int (value, priv->rtx_delay_reorder);
3024       JBUF_UNLOCK (priv);
3025       break;
3026     case PROP_RTX_RETRY_TIMEOUT:
3027       JBUF_LOCK (priv);
3028       g_value_set_int (value, priv->rtx_retry_timeout);
3029       JBUF_UNLOCK (priv);
3030       break;
3031     case PROP_RTX_RETRY_PERIOD:
3032       JBUF_LOCK (priv);
3033       g_value_set_int (value, priv->rtx_retry_period);
3034       JBUF_UNLOCK (priv);
3035       break;
3036     default:
3037       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
3038       break;
3039   }
3040 }