jitterbuffer: improve debug
[platform/upstream/gst-plugins-good.git] / gst / rtpmanager / gstrtpjitterbuffer.c
1 /*
2  * Farsight Voice+Video library
3  *
4  *  Copyright 2007 Collabora Ltd,
5  *  Copyright 2007 Nokia Corporation
6  *   @author: Philippe Kalaf <philippe.kalaf@collabora.co.uk>.
7  *  Copyright 2007 Wim Taymans <wim.taymans@gmail.com>
8  *
9  * This library is free software; you can redistribute it and/or
10  * modify it under the terms of the GNU Library General Public
11  * License as published by the Free Software Foundation; either
12  * version 2 of the License, or (at your option) any later version.
13  *
14  * This library is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17  * Library General Public License for more details.
18  *
19  * You should have received a copy of the GNU Library General Public
20  * License along with this library; if not, write to the
21  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
22  * Boston, MA 02110-1301, USA.
23  *
24  */
25
26 /**
27  * SECTION:element-gstrtpjitterbuffer
28  *
29  * This element reorders and removes duplicate RTP packets as they are received
30  * from a network source. It will also wait for missing packets up to a
31  * configurable time limit using the #GstRtpJitterBuffer:latency property.
32  * Packets arriving too late are considered to be lost packets.
33  *
34  * This element acts as a live element and so adds #GstRtpJitterBuffer:latency
35  * to the pipeline.
36  *
37  * The element needs the clock-rate of the RTP payload in order to estimate the
38  * delay. This information is obtained either from the caps on the sink pad or,
39  * when no caps are present, from the #GstRtpJitterBuffer::request-pt-map signal.
40  * To clear the previous pt-map use the #GstRtpJitterBuffer::clear-pt-map signal.
41  *
42  * This element will automatically be used inside gstrtpbin.
43  *
44  * <refsect2>
45  * <title>Example pipelines</title>
46  * |[
47  * gst-launch-1.0 rtspsrc location=rtsp://192.168.1.133:8554/mpeg1or2AudioVideoTest ! gstrtpjitterbuffer ! rtpmpvdepay ! mpeg2dec ! xvimagesink
48  * ]| Connect to a streaming server and decode the MPEG video. The jitterbuffer is
49  * inserted into the pipeline to smooth out network jitter and to reorder the
50  * out-of-order RTP packets.
51  * </refsect2>
52  *
53  * Last reviewed on 2007-05-28 (0.10.5)
54  */
55
56 #ifdef HAVE_CONFIG_H
57 #include "config.h"
58 #endif
59
60 #include <stdlib.h>
61 #include <string.h>
62 #include <gst/rtp/gstrtpbuffer.h>
63
64 #include "gstrtpjitterbuffer.h"
65 #include "rtpjitterbuffer.h"
66 #include "rtpstats.h"
67
68 #include <gst/glib-compat-private.h>
69
70 GST_DEBUG_CATEGORY (rtpjitterbuffer_debug);
71 #define GST_CAT_DEFAULT (rtpjitterbuffer_debug)
72
73 /* RTPJitterBuffer signals and args */
74 enum
75 {
76   SIGNAL_REQUEST_PT_MAP,
77   SIGNAL_CLEAR_PT_MAP,
78   SIGNAL_HANDLE_SYNC,
79   SIGNAL_ON_NPT_STOP,
80   SIGNAL_SET_ACTIVE,
81   LAST_SIGNAL
82 };
83
84 #define DEFAULT_LATENCY_MS          200
85 #define DEFAULT_DROP_ON_LATENCY     FALSE
86 #define DEFAULT_TS_OFFSET           0
87 #define DEFAULT_DO_LOST             FALSE
88 #define DEFAULT_MODE                RTP_JITTER_BUFFER_MODE_SLAVE
89 #define DEFAULT_PERCENT             0
90 #define DEFAULT_DO_RETRANSMISSION   FALSE
91 #define DEFAULT_RTX_DELAY           20
92 #define DEFAULT_RTX_DELAY_REORDER   3
93 #define DEFAULT_RTX_RETRY_TIMEOUT   40
94 #define DEFAULT_RTX_RETRY_PERIOD    160
95
96 enum
97 {
98   PROP_0,
99   PROP_LATENCY,
100   PROP_DROP_ON_LATENCY,
101   PROP_TS_OFFSET,
102   PROP_DO_LOST,
103   PROP_MODE,
104   PROP_PERCENT,
105   PROP_DO_RETRANSMISSION,
106   PROP_RTX_DELAY,
107   PROP_RTX_DELAY_REORDER,
108   PROP_RTX_RETRY_TIMEOUT,
109   PROP_RTX_RETRY_PERIOD,
110   PROP_LAST
111 };
112
113 #define JBUF_LOCK(priv)   (g_mutex_lock (&(priv)->jbuf_lock))
114
115 #define JBUF_LOCK_CHECK(priv,label) G_STMT_START {    \
116   JBUF_LOCK (priv);                                   \
117   if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK))    \
118     goto label;                                       \
119 } G_STMT_END
120 #define JBUF_UNLOCK(priv) (g_mutex_unlock (&(priv)->jbuf_lock))
121
122 #define JBUF_WAIT_TIMER(priv)   G_STMT_START {            \
123   (priv)->waiting_timer = TRUE;                           \
124   g_cond_wait (&(priv)->jbuf_timer, &(priv)->jbuf_lock);  \
125   (priv)->waiting_timer = FALSE;                          \
126 } G_STMT_END
127 #define JBUF_SIGNAL_TIMER(priv) G_STMT_START {    \
128   if (G_UNLIKELY ((priv)->waiting_timer))         \
129     g_cond_signal (&(priv)->jbuf_timer);          \
130 } G_STMT_END
131
132 #define JBUF_WAIT_EVENT(priv,label) G_STMT_START {       \
133   (priv)->waiting_event = TRUE;                          \
134   g_cond_wait (&(priv)->jbuf_event, &(priv)->jbuf_lock); \
135   (priv)->waiting_event = FALSE;                         \
136   if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK))       \
137     goto label;                                          \
138 } G_STMT_END
139 #define JBUF_SIGNAL_EVENT(priv) G_STMT_START {    \
140   if (G_UNLIKELY ((priv)->waiting_event))         \
141     g_cond_signal (&(priv)->jbuf_event);          \
142 } G_STMT_END
143
144 struct _GstRtpJitterBufferPrivate
145 {
146   GstPad *sinkpad, *srcpad;
147   GstPad *rtcpsinkpad;
148
149   RTPJitterBuffer *jbuf;
150   GMutex jbuf_lock;
151   gboolean waiting_timer;
152   GCond jbuf_timer;
153   gboolean waiting_event;
154   GCond jbuf_event;
155   gboolean discont;
156   gboolean ts_discont;
157   gboolean active;
158   guint64 out_offset;
159
160   gboolean timer_running;
161   GThread *timer_thread;
162
163   /* properties */
164   guint latency_ms;
165   guint64 latency_ns;
166   gboolean drop_on_latency;
167   gint64 ts_offset;
168   gboolean do_lost;
169   gboolean do_retransmission;
170   gint rtx_delay;
171   gint rtx_delay_reorder;
172   gint rtx_retry_timeout;
173   gint rtx_retry_period;
174
175   /* the last seqnum we pushed out */
176   guint32 last_popped_seqnum;
177   /* the next expected seqnum we push */
178   guint32 next_seqnum;
179   /* last output time */
180   GstClockTime last_out_time;
181   /* last valid input timestamp and rtptime pair */
182   GstClockTime ips_dts;
183   guint64 ips_rtptime;
184   GstClockTime packet_spacing;
185
186   /* the next expected seqnum we receive */
187   GstClockTime last_in_dts;
188   guint32 last_in_seqnum;
189   guint32 next_in_seqnum;
190
191   GArray *timers;
192
193   /* start and stop ranges */
194   GstClockTime npt_start;
195   GstClockTime npt_stop;
196   guint64 ext_timestamp;
197   guint64 last_elapsed;
198   guint64 estimated_eos;
199   GstClockID eos_id;
200
201   /* state */
202   gboolean eos;
203
204   /* clock rate and rtp timestamp offset */
205   gint last_pt;
206   gint32 clock_rate;
207   gint64 clock_base;
208   gint64 prev_ts_offset;
209
210   /* when we are shutting down */
211   GstFlowReturn srcresult;
212   gboolean blocked;
213
214   /* for sync */
215   GstSegment segment;
216   GstClockID clock_id;
217   GstClockTime timer_timeout;
218   guint16 timer_seqnum;
219   /* the latency of the upstream peer, we have to take this into account when
220    * synchronizing the buffers. */
221   GstClockTime peer_latency;
222   guint64 ext_rtptime;
223   GstBuffer *last_sr;
224
225   /* some accounting */
226   guint64 num_late;
227   guint64 num_duplicates;
228 };
229
230 typedef enum
231 {
232   TIMER_TYPE_EXPECTED,
233   TIMER_TYPE_LOST,
234   TIMER_TYPE_DEADLINE,
235   TIMER_TYPE_EOS
236 } TimerType;
237
238 typedef struct
239 {
240   guint idx;
241   guint16 seqnum;
242   guint num;
243   TimerType type;
244   GstClockTime timeout;
245   GstClockTime duration;
246   GstClockTime rtx_base;
247   GstClockTime rtx_delay;
248   GstClockTime rtx_retry;
249 } TimerData;
250
251 #define GST_RTP_JITTER_BUFFER_GET_PRIVATE(o) \
252   (G_TYPE_INSTANCE_GET_PRIVATE ((o), GST_TYPE_RTP_JITTER_BUFFER, \
253                                 GstRtpJitterBufferPrivate))
254
255 static GstStaticPadTemplate gst_rtp_jitter_buffer_sink_template =
256 GST_STATIC_PAD_TEMPLATE ("sink",
257     GST_PAD_SINK,
258     GST_PAD_ALWAYS,
259     GST_STATIC_CAPS ("application/x-rtp, "
260         "clock-rate = (int) [ 1, 2147483647 ]"
261         /* "payload = (int) , "
262          * "encoding-name = (string) "
263          */ )
264     );
265
266 static GstStaticPadTemplate gst_rtp_jitter_buffer_sink_rtcp_template =
267 GST_STATIC_PAD_TEMPLATE ("sink_rtcp",
268     GST_PAD_SINK,
269     GST_PAD_REQUEST,
270     GST_STATIC_CAPS ("application/x-rtcp")
271     );
272
273 static GstStaticPadTemplate gst_rtp_jitter_buffer_src_template =
274 GST_STATIC_PAD_TEMPLATE ("src",
275     GST_PAD_SRC,
276     GST_PAD_ALWAYS,
277     GST_STATIC_CAPS ("application/x-rtp"
278         /* "payload = (int) , "
279          * "clock-rate = (int) , "
280          * "encoding-name = (string) "
281          */ )
282     );
283
284 static guint gst_rtp_jitter_buffer_signals[LAST_SIGNAL] = { 0 };
285
286 #define gst_rtp_jitter_buffer_parent_class parent_class
287 G_DEFINE_TYPE (GstRtpJitterBuffer, gst_rtp_jitter_buffer, GST_TYPE_ELEMENT);
288
289 /* object overrides */
290 static void gst_rtp_jitter_buffer_set_property (GObject * object,
291     guint prop_id, const GValue * value, GParamSpec * pspec);
292 static void gst_rtp_jitter_buffer_get_property (GObject * object,
293     guint prop_id, GValue * value, GParamSpec * pspec);
294 static void gst_rtp_jitter_buffer_finalize (GObject * object);
295
296 /* element overrides */
297 static GstStateChangeReturn gst_rtp_jitter_buffer_change_state (GstElement
298     * element, GstStateChange transition);
299 static GstPad *gst_rtp_jitter_buffer_request_new_pad (GstElement * element,
300     GstPadTemplate * templ, const gchar * name, const GstCaps * filter);
301 static void gst_rtp_jitter_buffer_release_pad (GstElement * element,
302     GstPad * pad);
303 static GstClock *gst_rtp_jitter_buffer_provide_clock (GstElement * element);
304
305 /* pad overrides */
306 static GstCaps *gst_rtp_jitter_buffer_getcaps (GstPad * pad, GstCaps * filter);
307 static GstIterator *gst_rtp_jitter_buffer_iterate_internal_links (GstPad * pad,
308     GstObject * parent);
309
310 /* sinkpad overrides */
311 static gboolean gst_rtp_jitter_buffer_sink_event (GstPad * pad,
312     GstObject * parent, GstEvent * event);
313 static GstFlowReturn gst_rtp_jitter_buffer_chain (GstPad * pad,
314     GstObject * parent, GstBuffer * buffer);
315
316 static gboolean gst_rtp_jitter_buffer_sink_rtcp_event (GstPad * pad,
317     GstObject * parent, GstEvent * event);
318 static GstFlowReturn gst_rtp_jitter_buffer_chain_rtcp (GstPad * pad,
319     GstObject * parent, GstBuffer * buffer);
320
321 static gboolean gst_rtp_jitter_buffer_sink_query (GstPad * pad,
322     GstObject * parent, GstQuery * query);
323
324 /* srcpad overrides */
325 static gboolean gst_rtp_jitter_buffer_src_event (GstPad * pad,
326     GstObject * parent, GstEvent * event);
327 static gboolean gst_rtp_jitter_buffer_src_activate_mode (GstPad * pad,
328     GstObject * parent, GstPadMode mode, gboolean active);
329 static void gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer);
330 static gboolean gst_rtp_jitter_buffer_src_query (GstPad * pad,
331     GstObject * parent, GstQuery * query);
332
333 static void
334 gst_rtp_jitter_buffer_clear_pt_map (GstRtpJitterBuffer * jitterbuffer);
335 static GstClockTime
336 gst_rtp_jitter_buffer_set_active (GstRtpJitterBuffer * jitterbuffer,
337     gboolean active, guint64 base_time);
338 static void do_handle_sync (GstRtpJitterBuffer * jitterbuffer);
339
340 static void unschedule_current_timer (GstRtpJitterBuffer * jitterbuffer);
341 static void remove_all_timers (GstRtpJitterBuffer * jitterbuffer);
342
343 static void wait_next_timeout (GstRtpJitterBuffer * jitterbuffer);
344
345 static void
346 gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass)
347 {
348   GObjectClass *gobject_class;
349   GstElementClass *gstelement_class;
350
351   gobject_class = (GObjectClass *) klass;
352   gstelement_class = (GstElementClass *) klass;
353
354   g_type_class_add_private (klass, sizeof (GstRtpJitterBufferPrivate));
355
356   gobject_class->finalize = gst_rtp_jitter_buffer_finalize;
357
358   gobject_class->set_property = gst_rtp_jitter_buffer_set_property;
359   gobject_class->get_property = gst_rtp_jitter_buffer_get_property;
360
361   /**
362    * GstRtpJitterBuffer::latency:
363    *
364    * The maximum latency of the jitterbuffer. Packets will be kept in the buffer
365    * for at most this time.
366    */
367   g_object_class_install_property (gobject_class, PROP_LATENCY,
368       g_param_spec_uint ("latency", "Buffer latency in ms",
369           "Amount of ms to buffer", 0, G_MAXUINT, DEFAULT_LATENCY_MS,
370           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
371   /**
372    * GstRtpJitterBuffer::drop-on-latency:
373    *
374    * Drop oldest buffers when the queue is completely filled.
375    */
376   g_object_class_install_property (gobject_class, PROP_DROP_ON_LATENCY,
377       g_param_spec_boolean ("drop-on-latency",
378           "Drop buffers when maximum latency is reached",
379           "Tells the jitterbuffer to never exceed the given latency in size",
380           DEFAULT_DROP_ON_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
381   /**
382    * GstRtpJitterBuffer::ts-offset:
383    *
384    * Adjust GStreamer output buffer timestamps in the jitterbuffer with offset.
385    * This is mainly used to ensure interstream synchronisation.
386    */
387   g_object_class_install_property (gobject_class, PROP_TS_OFFSET,
388       g_param_spec_int64 ("ts-offset", "Timestamp Offset",
389           "Adjust buffer timestamps with offset in nanoseconds", G_MININT64,
390           G_MAXINT64, DEFAULT_TS_OFFSET,
391           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
392
393   /**
394    * GstRtpJitterBuffer::do-lost:
395    *
396    * Send out a GstRTPPacketLost event downstream when a packet is considered
397    * lost.
398    */
399   g_object_class_install_property (gobject_class, PROP_DO_LOST,
400       g_param_spec_boolean ("do-lost", "Do Lost",
401           "Send an event downstream when a packet is lost", DEFAULT_DO_LOST,
402           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
403
404   /**
405    * GstRtpJitterBuffer::mode:
406    *
407    * Control the buffering and timestamping mode used by the jitterbuffer.
408    */
409   g_object_class_install_property (gobject_class, PROP_MODE,
410       g_param_spec_enum ("mode", "Mode",
411           "Control the buffering algorithm in use", RTP_TYPE_JITTER_BUFFER_MODE,
412           DEFAULT_MODE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
413   /**
414    * GstRtpJitterBuffer::percent:
415    *
416    * The percent of the jitterbuffer that is filled.
417    *
418    * Since: 0.10.19
419    */
420   g_object_class_install_property (gobject_class, PROP_PERCENT,
421       g_param_spec_int ("percent", "percent",
422           "The buffer filled percent", 0, 100,
423           0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
424   /**
425    * GstRtpJitterBuffer::do-retransmission:
426    *
427    * Send out a GstRTPRetransmission event upstream when a packet is considered
428    * late and should be retransmitted.
429    *
430    * Since: 1.2
431    */
432   g_object_class_install_property (gobject_class, PROP_DO_RETRANSMISSION,
433       g_param_spec_boolean ("do-retransmission", "Do Retransmission",
434           "Send retransmission events upstream when a packet is late",
435           DEFAULT_DO_RETRANSMISSION,
436           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
437
438   /**
439    * GstRtpJitterBuffer::rtx-delay:
440    *
441    * When a packet did not arrive at the expected time, wait this extra amount
442    * of time before sending a retransmission event.
443    *
444    * When -1 is used, the max jitter will be used as extra delay.
445    *
446    * Since: 1.2
447    */
448   g_object_class_install_property (gobject_class, PROP_RTX_DELAY,
449       g_param_spec_int ("rtx-delay", "RTX Delay",
450           "Extra time in ms to wait before sending retransmission "
451           "event (-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_DELAY,
452           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
453   /**
454    * GstRtpJitterBuffer::rtx-delay-reorder:
455    *
456    * Assume that a retransmission event should be sent when we see
457    * this much packet reordering.
458    *
459    * When -1 is used, the value will be estimated based on observed packet
460    * reordering.
461    *
462    * Since: 1.2
463    */
464   g_object_class_install_property (gobject_class, PROP_RTX_DELAY_REORDER,
465       g_param_spec_int ("rtx-delay-reorder", "RTX Delay Reorder",
466           "Sending retransmission event when this much reordering (-1 automatic)",
467           -1, G_MAXINT, DEFAULT_RTX_DELAY_REORDER,
468           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
469   /**
470    * GstRtpJitterBuffer::rtx-retry-timeout:
471    *
472    * When no packet has been received after sending a retransmission event
473    * for this time, retry sending a retransmission event.
474    *
475    * When -1 is used, the value will be estimated based on observed round
476    * trip time.
477    *
478    * Since: 1.2
479    */
480   g_object_class_install_property (gobject_class, PROP_RTX_RETRY_TIMEOUT,
481       g_param_spec_int ("rtx-retry-timeout", "RTX Retry Timeout",
482           "Retry sending a transmission event after this timeout in "
483           "ms (-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_RETRY_TIMEOUT,
484           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
485   /**
486    * GstRtpJitterBuffer::rtx-retry-period:
487    *
488    * The amount of time to try to get a retransmission.
489    *
490    * When -1 is used, the value will be estimated based on the jitterbuffer
491    * latency and the observed round trip time.
492    *
493    * Since: 1.2
494    */
495   g_object_class_install_property (gobject_class, PROP_RTX_RETRY_PERIOD,
496       g_param_spec_int ("rtx-retry-period", "RTX Retry Period",
497           "Try to get a retransmission for this many ms "
498           "(-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_RETRY_PERIOD,
499           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
500
501   /**
502    * GstRtpJitterBuffer::request-pt-map:
503    * @buffer: the object which received the signal
504    * @pt: the pt
505    *
506    * Request the payload type as #GstCaps for @pt.
507    */
508   gst_rtp_jitter_buffer_signals[SIGNAL_REQUEST_PT_MAP] =
509       g_signal_new ("request-pt-map", G_TYPE_FROM_CLASS (klass),
510       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
511           request_pt_map), NULL, NULL, g_cclosure_marshal_generic,
512       GST_TYPE_CAPS, 1, G_TYPE_UINT);
513   /**
514    * GstRtpJitterBuffer::handle-sync:
515    * @buffer: the object which received the signal
516    * @struct: a GstStructure containing sync values.
517    *
518    * Be notified of new sync values.
519    */
520   gst_rtp_jitter_buffer_signals[SIGNAL_HANDLE_SYNC] =
521       g_signal_new ("handle-sync", G_TYPE_FROM_CLASS (klass),
522       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
523           handle_sync), NULL, NULL, g_cclosure_marshal_VOID__BOXED,
524       G_TYPE_NONE, 1, GST_TYPE_STRUCTURE | G_SIGNAL_TYPE_STATIC_SCOPE);
525
526   /**
527    * GstRtpJitterBuffer::on-npt-stop
528    * @buffer: the object which received the signal
529    *
530    * Signal that the jitterbufer has pushed the RTP packet that corresponds to
531    * the npt-stop position.
532    */
533   gst_rtp_jitter_buffer_signals[SIGNAL_ON_NPT_STOP] =
534       g_signal_new ("on-npt-stop", G_TYPE_FROM_CLASS (klass),
535       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
536           on_npt_stop), NULL, NULL, g_cclosure_marshal_VOID__VOID,
537       G_TYPE_NONE, 0, G_TYPE_NONE);
538
539   /**
540    * GstRtpJitterBuffer::clear-pt-map:
541    * @buffer: the object which received the signal
542    *
543    * Invalidate the clock-rate as obtained with the
544    * #GstRtpJitterBuffer::request-pt-map signal.
545    */
546   gst_rtp_jitter_buffer_signals[SIGNAL_CLEAR_PT_MAP] =
547       g_signal_new ("clear-pt-map", G_TYPE_FROM_CLASS (klass),
548       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
549       G_STRUCT_OFFSET (GstRtpJitterBufferClass, clear_pt_map), NULL, NULL,
550       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
551
552   /**
553    * GstRtpJitterBuffer::set-active:
554    * @buffer: the object which received the signal
555    *
556    * Start pushing out packets with the given base time. This signal is only
557    * useful in buffering mode.
558    *
559    * Returns: the time of the last pushed packet.
560    *
561    * Since: 0.10.19
562    */
563   gst_rtp_jitter_buffer_signals[SIGNAL_SET_ACTIVE] =
564       g_signal_new ("set-active", G_TYPE_FROM_CLASS (klass),
565       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
566       G_STRUCT_OFFSET (GstRtpJitterBufferClass, set_active), NULL, NULL,
567       g_cclosure_marshal_generic, G_TYPE_UINT64, 2, G_TYPE_BOOLEAN,
568       G_TYPE_UINT64);
569
570   gstelement_class->change_state =
571       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_change_state);
572   gstelement_class->request_new_pad =
573       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_request_new_pad);
574   gstelement_class->release_pad =
575       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_release_pad);
576   gstelement_class->provide_clock =
577       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_provide_clock);
578
579   gst_element_class_add_pad_template (gstelement_class,
580       gst_static_pad_template_get (&gst_rtp_jitter_buffer_src_template));
581   gst_element_class_add_pad_template (gstelement_class,
582       gst_static_pad_template_get (&gst_rtp_jitter_buffer_sink_template));
583   gst_element_class_add_pad_template (gstelement_class,
584       gst_static_pad_template_get (&gst_rtp_jitter_buffer_sink_rtcp_template));
585
586   gst_element_class_set_static_metadata (gstelement_class,
587       "RTP packet jitter-buffer", "Filter/Network/RTP",
588       "A buffer that deals with network jitter and other transmission faults",
589       "Philippe Kalaf <philippe.kalaf@collabora.co.uk>, "
590       "Wim Taymans <wim.taymans@gmail.com>");
591
592   klass->clear_pt_map = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_clear_pt_map);
593   klass->set_active = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_set_active);
594
595   GST_DEBUG_CATEGORY_INIT
596       (rtpjitterbuffer_debug, "gstrtpjitterbuffer", 0, "RTP Jitter Buffer");
597 }
598
599 static void
600 gst_rtp_jitter_buffer_init (GstRtpJitterBuffer * jitterbuffer)
601 {
602   GstRtpJitterBufferPrivate *priv;
603
604   priv = GST_RTP_JITTER_BUFFER_GET_PRIVATE (jitterbuffer);
605   jitterbuffer->priv = priv;
606
607   priv->latency_ms = DEFAULT_LATENCY_MS;
608   priv->latency_ns = priv->latency_ms * GST_MSECOND;
609   priv->drop_on_latency = DEFAULT_DROP_ON_LATENCY;
610   priv->do_lost = DEFAULT_DO_LOST;
611   priv->do_retransmission = DEFAULT_DO_RETRANSMISSION;
612   priv->rtx_delay = DEFAULT_RTX_DELAY;
613   priv->rtx_delay_reorder = DEFAULT_RTX_DELAY_REORDER;
614   priv->rtx_retry_timeout = DEFAULT_RTX_RETRY_TIMEOUT;
615   priv->rtx_retry_period = DEFAULT_RTX_RETRY_PERIOD;
616
617   priv->timers = g_array_new (FALSE, TRUE, sizeof (TimerData));
618   priv->jbuf = rtp_jitter_buffer_new ();
619   g_mutex_init (&priv->jbuf_lock);
620   g_cond_init (&priv->jbuf_timer);
621   g_cond_init (&priv->jbuf_event);
622
623   /* reset skew detection initialy */
624   rtp_jitter_buffer_reset_skew (priv->jbuf);
625   rtp_jitter_buffer_set_delay (priv->jbuf, priv->latency_ns);
626   rtp_jitter_buffer_set_buffering (priv->jbuf, FALSE);
627   priv->active = TRUE;
628
629   priv->srcpad =
630       gst_pad_new_from_static_template (&gst_rtp_jitter_buffer_src_template,
631       "src");
632
633   gst_pad_set_activatemode_function (priv->srcpad,
634       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_activate_mode));
635   gst_pad_set_query_function (priv->srcpad,
636       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_query));
637   gst_pad_set_event_function (priv->srcpad,
638       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_event));
639
640   priv->sinkpad =
641       gst_pad_new_from_static_template (&gst_rtp_jitter_buffer_sink_template,
642       "sink");
643
644   gst_pad_set_chain_function (priv->sinkpad,
645       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_chain));
646   gst_pad_set_event_function (priv->sinkpad,
647       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_sink_event));
648   gst_pad_set_query_function (priv->sinkpad,
649       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_sink_query));
650
651   gst_element_add_pad (GST_ELEMENT (jitterbuffer), priv->srcpad);
652   gst_element_add_pad (GST_ELEMENT (jitterbuffer), priv->sinkpad);
653
654   GST_OBJECT_FLAG_SET (jitterbuffer, GST_ELEMENT_FLAG_PROVIDE_CLOCK);
655 }
656
657 static void
658 gst_rtp_jitter_buffer_finalize (GObject * object)
659 {
660   GstRtpJitterBuffer *jitterbuffer;
661
662   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
663
664   g_array_free (jitterbuffer->priv->timers, TRUE);
665   g_mutex_clear (&jitterbuffer->priv->jbuf_lock);
666   g_cond_clear (&jitterbuffer->priv->jbuf_timer);
667   g_cond_clear (&jitterbuffer->priv->jbuf_event);
668
669   g_object_unref (jitterbuffer->priv->jbuf);
670
671   G_OBJECT_CLASS (parent_class)->finalize (object);
672 }
673
674 static GstIterator *
675 gst_rtp_jitter_buffer_iterate_internal_links (GstPad * pad, GstObject * parent)
676 {
677   GstRtpJitterBuffer *jitterbuffer;
678   GstPad *otherpad = NULL;
679   GstIterator *it;
680   GValue val = { 0, };
681
682   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
683
684   if (pad == jitterbuffer->priv->sinkpad) {
685     otherpad = jitterbuffer->priv->srcpad;
686   } else if (pad == jitterbuffer->priv->srcpad) {
687     otherpad = jitterbuffer->priv->sinkpad;
688   } else if (pad == jitterbuffer->priv->rtcpsinkpad) {
689     otherpad = NULL;
690   }
691
692   g_value_init (&val, GST_TYPE_PAD);
693   g_value_set_object (&val, otherpad);
694   it = gst_iterator_new_single (GST_TYPE_PAD, &val);
695   g_value_unset (&val);
696
697   return it;
698 }
699
700 static GstPad *
701 create_rtcp_sink (GstRtpJitterBuffer * jitterbuffer)
702 {
703   GstRtpJitterBufferPrivate *priv;
704
705   priv = jitterbuffer->priv;
706
707   GST_DEBUG_OBJECT (jitterbuffer, "creating RTCP sink pad");
708
709   priv->rtcpsinkpad =
710       gst_pad_new_from_static_template
711       (&gst_rtp_jitter_buffer_sink_rtcp_template, "sink_rtcp");
712   gst_pad_set_chain_function (priv->rtcpsinkpad,
713       gst_rtp_jitter_buffer_chain_rtcp);
714   gst_pad_set_event_function (priv->rtcpsinkpad,
715       (GstPadEventFunction) gst_rtp_jitter_buffer_sink_rtcp_event);
716   gst_pad_set_iterate_internal_links_function (priv->rtcpsinkpad,
717       gst_rtp_jitter_buffer_iterate_internal_links);
718   gst_pad_set_active (priv->rtcpsinkpad, TRUE);
719   gst_element_add_pad (GST_ELEMENT_CAST (jitterbuffer), priv->rtcpsinkpad);
720
721   return priv->rtcpsinkpad;
722 }
723
724 static void
725 remove_rtcp_sink (GstRtpJitterBuffer * jitterbuffer)
726 {
727   GstRtpJitterBufferPrivate *priv;
728
729   priv = jitterbuffer->priv;
730
731   GST_DEBUG_OBJECT (jitterbuffer, "removing RTCP sink pad");
732
733   gst_pad_set_active (priv->rtcpsinkpad, FALSE);
734
735   gst_element_remove_pad (GST_ELEMENT_CAST (jitterbuffer), priv->rtcpsinkpad);
736   priv->rtcpsinkpad = NULL;
737 }
738
739 static GstPad *
740 gst_rtp_jitter_buffer_request_new_pad (GstElement * element,
741     GstPadTemplate * templ, const gchar * name, const GstCaps * filter)
742 {
743   GstRtpJitterBuffer *jitterbuffer;
744   GstElementClass *klass;
745   GstPad *result;
746   GstRtpJitterBufferPrivate *priv;
747
748   g_return_val_if_fail (templ != NULL, NULL);
749   g_return_val_if_fail (GST_IS_RTP_JITTER_BUFFER (element), NULL);
750
751   jitterbuffer = GST_RTP_JITTER_BUFFER (element);
752   priv = jitterbuffer->priv;
753   klass = GST_ELEMENT_GET_CLASS (element);
754
755   GST_DEBUG_OBJECT (element, "requesting pad %s", GST_STR_NULL (name));
756
757   /* figure out the template */
758   if (templ == gst_element_class_get_pad_template (klass, "sink_rtcp")) {
759     if (priv->rtcpsinkpad != NULL)
760       goto exists;
761
762     result = create_rtcp_sink (jitterbuffer);
763   } else
764     goto wrong_template;
765
766   return result;
767
768   /* ERRORS */
769 wrong_template:
770   {
771     g_warning ("gstrtpjitterbuffer: this is not our template");
772     return NULL;
773   }
774 exists:
775   {
776     g_warning ("gstrtpjitterbuffer: pad already requested");
777     return NULL;
778   }
779 }
780
781 static void
782 gst_rtp_jitter_buffer_release_pad (GstElement * element, GstPad * pad)
783 {
784   GstRtpJitterBuffer *jitterbuffer;
785   GstRtpJitterBufferPrivate *priv;
786
787   g_return_if_fail (GST_IS_RTP_JITTER_BUFFER (element));
788   g_return_if_fail (GST_IS_PAD (pad));
789
790   jitterbuffer = GST_RTP_JITTER_BUFFER (element);
791   priv = jitterbuffer->priv;
792
793   GST_DEBUG_OBJECT (element, "releasing pad %s:%s", GST_DEBUG_PAD_NAME (pad));
794
795   if (priv->rtcpsinkpad == pad) {
796     remove_rtcp_sink (jitterbuffer);
797   } else
798     goto wrong_pad;
799
800   return;
801
802   /* ERRORS */
803 wrong_pad:
804   {
805     g_warning ("gstjitterbuffer: asked to release an unknown pad");
806     return;
807   }
808 }
809
810 static GstClock *
811 gst_rtp_jitter_buffer_provide_clock (GstElement * element)
812 {
813   return gst_system_clock_obtain ();
814 }
815
816 static void
817 gst_rtp_jitter_buffer_clear_pt_map (GstRtpJitterBuffer * jitterbuffer)
818 {
819   GstRtpJitterBufferPrivate *priv;
820
821   priv = jitterbuffer->priv;
822
823   /* this will trigger a new pt-map request signal, FIXME, do something better. */
824
825   JBUF_LOCK (priv);
826   priv->clock_rate = -1;
827   /* do not clear current content, but refresh state for new arrival */
828   GST_DEBUG_OBJECT (jitterbuffer, "reset jitterbuffer");
829   rtp_jitter_buffer_reset_skew (priv->jbuf);
830   priv->last_popped_seqnum = -1;
831   priv->next_seqnum = -1;
832   JBUF_UNLOCK (priv);
833 }
834
835 static GstClockTime
836 gst_rtp_jitter_buffer_set_active (GstRtpJitterBuffer * jbuf, gboolean active,
837     guint64 offset)
838 {
839   GstRtpJitterBufferPrivate *priv;
840   GstClockTime last_out;
841   GstBuffer *head;
842
843   priv = jbuf->priv;
844
845   JBUF_LOCK (priv);
846   GST_DEBUG_OBJECT (jbuf, "setting active %d with offset %" GST_TIME_FORMAT,
847       active, GST_TIME_ARGS (offset));
848
849   if (active != priv->active) {
850     /* add the amount of time spent in paused to the output offset. All
851      * outgoing buffers will have this offset applied to their timestamps in
852      * order to make them arrive in time in the sink. */
853     priv->out_offset = offset;
854     GST_DEBUG_OBJECT (jbuf, "out offset %" GST_TIME_FORMAT,
855         GST_TIME_ARGS (priv->out_offset));
856     priv->active = active;
857     JBUF_SIGNAL_EVENT (priv);
858   }
859   if (!active) {
860     rtp_jitter_buffer_set_buffering (priv->jbuf, TRUE);
861   }
862   if ((head = rtp_jitter_buffer_peek (priv->jbuf))) {
863     /* head buffer timestamp and offset gives our output time */
864     last_out = GST_BUFFER_DTS (head) + priv->ts_offset;
865   } else {
866     /* use last known time when the buffer is empty */
867     last_out = priv->last_out_time;
868   }
869   JBUF_UNLOCK (priv);
870
871   return last_out;
872 }
873
874 static GstCaps *
875 gst_rtp_jitter_buffer_getcaps (GstPad * pad, GstCaps * filter)
876 {
877   GstRtpJitterBuffer *jitterbuffer;
878   GstRtpJitterBufferPrivate *priv;
879   GstPad *other;
880   GstCaps *caps;
881   GstCaps *templ;
882
883   jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
884   priv = jitterbuffer->priv;
885
886   other = (pad == priv->srcpad ? priv->sinkpad : priv->srcpad);
887
888   caps = gst_pad_peer_query_caps (other, filter);
889
890   templ = gst_pad_get_pad_template_caps (pad);
891   if (caps == NULL) {
892     GST_DEBUG_OBJECT (jitterbuffer, "use template");
893     caps = templ;
894   } else {
895     GstCaps *intersect;
896
897     GST_DEBUG_OBJECT (jitterbuffer, "intersect with template");
898
899     intersect = gst_caps_intersect (caps, templ);
900     gst_caps_unref (caps);
901     gst_caps_unref (templ);
902
903     caps = intersect;
904   }
905   gst_object_unref (jitterbuffer);
906
907   return caps;
908 }
909
910 /*
911  * Must be called with JBUF_LOCK held
912  */
913
914 static gboolean
915 gst_jitter_buffer_sink_parse_caps (GstRtpJitterBuffer * jitterbuffer,
916     GstCaps * caps)
917 {
918   GstRtpJitterBufferPrivate *priv;
919   GstStructure *caps_struct;
920   guint val;
921   GstClockTime tval;
922
923   priv = jitterbuffer->priv;
924
925   /* first parse the caps */
926   caps_struct = gst_caps_get_structure (caps, 0);
927
928   GST_DEBUG_OBJECT (jitterbuffer, "got caps");
929
930   /* we need a clock-rate to convert the rtp timestamps to GStreamer time and to
931    * measure the amount of data in the buffer */
932   if (!gst_structure_get_int (caps_struct, "clock-rate", &priv->clock_rate))
933     goto error;
934
935   if (priv->clock_rate <= 0)
936     goto wrong_rate;
937
938   GST_DEBUG_OBJECT (jitterbuffer, "got clock-rate %d", priv->clock_rate);
939
940   /* The clock base is the RTP timestamp corrsponding to the npt-start value. We
941    * can use this to track the amount of time elapsed on the sender. */
942   if (gst_structure_get_uint (caps_struct, "clock-base", &val))
943     priv->clock_base = val;
944   else
945     priv->clock_base = -1;
946
947   priv->ext_timestamp = priv->clock_base;
948
949   GST_DEBUG_OBJECT (jitterbuffer, "got clock-base %" G_GINT64_FORMAT,
950       priv->clock_base);
951
952   if (gst_structure_get_uint (caps_struct, "seqnum-base", &val)) {
953     /* first expected seqnum, only update when we didn't have a previous base. */
954     if (priv->next_in_seqnum == -1)
955       priv->next_in_seqnum = val;
956     if (priv->next_seqnum == -1)
957       priv->next_seqnum = val;
958   }
959
960   GST_DEBUG_OBJECT (jitterbuffer, "got seqnum-base %d", priv->next_in_seqnum);
961
962   /* the start and stop times. The seqnum-base corresponds to the start time. We
963    * will keep track of the seqnums on the output and when we reach the one
964    * corresponding to npt-stop, we emit the npt-stop-reached signal */
965   if (gst_structure_get_clock_time (caps_struct, "npt-start", &tval))
966     priv->npt_start = tval;
967   else
968     priv->npt_start = 0;
969
970   if (gst_structure_get_clock_time (caps_struct, "npt-stop", &tval))
971     priv->npt_stop = tval;
972   else
973     priv->npt_stop = -1;
974
975   GST_DEBUG_OBJECT (jitterbuffer,
976       "npt start/stop: %" GST_TIME_FORMAT "-%" GST_TIME_FORMAT,
977       GST_TIME_ARGS (priv->npt_start), GST_TIME_ARGS (priv->npt_stop));
978
979   return TRUE;
980
981   /* ERRORS */
982 error:
983   {
984     GST_DEBUG_OBJECT (jitterbuffer, "No clock-rate in caps!");
985     return FALSE;
986   }
987 wrong_rate:
988   {
989     GST_DEBUG_OBJECT (jitterbuffer, "Invalid clock-rate %d", priv->clock_rate);
990     return FALSE;
991   }
992 }
993
994 static void
995 gst_rtp_jitter_buffer_flush_start (GstRtpJitterBuffer * jitterbuffer)
996 {
997   GstRtpJitterBufferPrivate *priv;
998
999   priv = jitterbuffer->priv;
1000
1001   JBUF_LOCK (priv);
1002   /* mark ourselves as flushing */
1003   priv->srcresult = GST_FLOW_FLUSHING;
1004   GST_DEBUG_OBJECT (jitterbuffer, "Disabling pop on queue");
1005   /* this unblocks any waiting pops on the src pad task */
1006   JBUF_SIGNAL_EVENT (priv);
1007   /* unlock clock, we just unschedule, the entry will be released by the
1008    * locking streaming thread. */
1009   unschedule_current_timer (jitterbuffer);
1010   JBUF_UNLOCK (priv);
1011 }
1012
1013 static void
1014 gst_rtp_jitter_buffer_flush_stop (GstRtpJitterBuffer * jitterbuffer)
1015 {
1016   GstRtpJitterBufferPrivate *priv;
1017
1018   priv = jitterbuffer->priv;
1019
1020   JBUF_LOCK (priv);
1021   GST_DEBUG_OBJECT (jitterbuffer, "Enabling pop on queue");
1022   /* Mark as non flushing */
1023   priv->srcresult = GST_FLOW_OK;
1024   gst_segment_init (&priv->segment, GST_FORMAT_TIME);
1025   priv->last_popped_seqnum = -1;
1026   priv->last_out_time = -1;
1027   priv->next_seqnum = -1;
1028   priv->ips_rtptime = -1;
1029   priv->ips_dts = GST_CLOCK_TIME_NONE;
1030   priv->packet_spacing = 0;
1031   priv->next_in_seqnum = -1;
1032   priv->clock_rate = -1;
1033   priv->eos = FALSE;
1034   priv->estimated_eos = -1;
1035   priv->last_elapsed = 0;
1036   priv->ext_timestamp = -1;
1037   GST_DEBUG_OBJECT (jitterbuffer, "flush and reset jitterbuffer");
1038   rtp_jitter_buffer_flush (priv->jbuf);
1039   rtp_jitter_buffer_reset_skew (priv->jbuf);
1040   remove_all_timers (jitterbuffer);
1041   JBUF_UNLOCK (priv);
1042 }
1043
1044 static gboolean
1045 gst_rtp_jitter_buffer_src_activate_mode (GstPad * pad, GstObject * parent,
1046     GstPadMode mode, gboolean active)
1047 {
1048   gboolean result;
1049   GstRtpJitterBuffer *jitterbuffer = NULL;
1050
1051   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1052
1053   switch (mode) {
1054     case GST_PAD_MODE_PUSH:
1055       if (active) {
1056         /* allow data processing */
1057         gst_rtp_jitter_buffer_flush_stop (jitterbuffer);
1058
1059         /* start pushing out buffers */
1060         GST_DEBUG_OBJECT (jitterbuffer, "Starting task on srcpad");
1061         result = gst_pad_start_task (jitterbuffer->priv->srcpad,
1062             (GstTaskFunction) gst_rtp_jitter_buffer_loop, jitterbuffer, NULL);
1063       } else {
1064         /* make sure all data processing stops ASAP */
1065         gst_rtp_jitter_buffer_flush_start (jitterbuffer);
1066
1067         /* NOTE this will hardlock if the state change is called from the src pad
1068          * task thread because we will _join() the thread. */
1069         GST_DEBUG_OBJECT (jitterbuffer, "Stopping task on srcpad");
1070         result = gst_pad_stop_task (pad);
1071       }
1072       break;
1073     default:
1074       result = FALSE;
1075       break;
1076   }
1077   return result;
1078 }
1079
1080 static GstStateChangeReturn
1081 gst_rtp_jitter_buffer_change_state (GstElement * element,
1082     GstStateChange transition)
1083 {
1084   GstRtpJitterBuffer *jitterbuffer;
1085   GstRtpJitterBufferPrivate *priv;
1086   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
1087
1088   jitterbuffer = GST_RTP_JITTER_BUFFER (element);
1089   priv = jitterbuffer->priv;
1090
1091   switch (transition) {
1092     case GST_STATE_CHANGE_NULL_TO_READY:
1093       break;
1094     case GST_STATE_CHANGE_READY_TO_PAUSED:
1095       JBUF_LOCK (priv);
1096       /* reset negotiated values */
1097       priv->clock_rate = -1;
1098       priv->clock_base = -1;
1099       priv->peer_latency = 0;
1100       priv->last_pt = -1;
1101       /* block until we go to PLAYING */
1102       priv->blocked = TRUE;
1103       priv->timer_running = TRUE;
1104       priv->timer_thread =
1105           g_thread_new ("timer", (GThreadFunc) wait_next_timeout, jitterbuffer);
1106       JBUF_UNLOCK (priv);
1107       break;
1108     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
1109       JBUF_LOCK (priv);
1110       /* unblock to allow streaming in PLAYING */
1111       priv->blocked = FALSE;
1112       JBUF_SIGNAL_EVENT (priv);
1113       JBUF_UNLOCK (priv);
1114       break;
1115     default:
1116       break;
1117   }
1118
1119   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1120
1121   switch (transition) {
1122     case GST_STATE_CHANGE_READY_TO_PAUSED:
1123       /* we are a live element because we sync to the clock, which we can only
1124        * do in the PLAYING state */
1125       if (ret != GST_STATE_CHANGE_FAILURE)
1126         ret = GST_STATE_CHANGE_NO_PREROLL;
1127       break;
1128     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
1129       JBUF_LOCK (priv);
1130       /* block to stop streaming when PAUSED */
1131       priv->blocked = TRUE;
1132       JBUF_UNLOCK (priv);
1133       if (ret != GST_STATE_CHANGE_FAILURE)
1134         ret = GST_STATE_CHANGE_NO_PREROLL;
1135       break;
1136     case GST_STATE_CHANGE_PAUSED_TO_READY:
1137       JBUF_LOCK (priv);
1138       gst_buffer_replace (&priv->last_sr, NULL);
1139       priv->timer_running = FALSE;
1140       JBUF_SIGNAL_TIMER (priv);
1141       JBUF_UNLOCK (priv);
1142       g_thread_join (priv->timer_thread);
1143       priv->timer_thread = NULL;
1144       break;
1145     case GST_STATE_CHANGE_READY_TO_NULL:
1146       break;
1147     default:
1148       break;
1149   }
1150
1151   return ret;
1152 }
1153
1154 static gboolean
1155 gst_rtp_jitter_buffer_src_event (GstPad * pad, GstObject * parent,
1156     GstEvent * event)
1157 {
1158   gboolean ret = TRUE;
1159   GstRtpJitterBuffer *jitterbuffer;
1160   GstRtpJitterBufferPrivate *priv;
1161
1162   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1163   priv = jitterbuffer->priv;
1164
1165   GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
1166
1167   switch (GST_EVENT_TYPE (event)) {
1168     case GST_EVENT_LATENCY:
1169     {
1170       GstClockTime latency;
1171
1172       gst_event_parse_latency (event, &latency);
1173
1174       GST_DEBUG_OBJECT (jitterbuffer,
1175           "configuring latency of %" GST_TIME_FORMAT, GST_TIME_ARGS (latency));
1176
1177       JBUF_LOCK (priv);
1178       /* adjust the overall buffer delay to the total pipeline latency in
1179        * buffering mode because if downstream consumes too fast (because of
1180        * large latency or queues, we would start rebuffering again. */
1181       if (rtp_jitter_buffer_get_mode (priv->jbuf) ==
1182           RTP_JITTER_BUFFER_MODE_BUFFER) {
1183         rtp_jitter_buffer_set_delay (priv->jbuf, latency);
1184       }
1185       JBUF_UNLOCK (priv);
1186
1187       ret = gst_pad_push_event (priv->sinkpad, event);
1188       break;
1189     }
1190     default:
1191       ret = gst_pad_push_event (priv->sinkpad, event);
1192       break;
1193   }
1194
1195   return ret;
1196 }
1197
1198 static gboolean
1199 gst_rtp_jitter_buffer_sink_event (GstPad * pad, GstObject * parent,
1200     GstEvent * event)
1201 {
1202   gboolean ret = TRUE;
1203   GstRtpJitterBuffer *jitterbuffer;
1204   GstRtpJitterBufferPrivate *priv;
1205
1206   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1207   priv = jitterbuffer->priv;
1208
1209   GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
1210
1211   switch (GST_EVENT_TYPE (event)) {
1212     case GST_EVENT_CAPS:
1213     {
1214       GstCaps *caps;
1215
1216       gst_event_parse_caps (event, &caps);
1217
1218       JBUF_LOCK (priv);
1219       ret = gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps);
1220       JBUF_UNLOCK (priv);
1221
1222       /* set same caps on srcpad on success */
1223       if (ret)
1224         ret = gst_pad_push_event (priv->srcpad, event);
1225       else
1226         gst_event_unref (event);
1227       break;
1228     }
1229     case GST_EVENT_SEGMENT:
1230     {
1231       gst_event_copy_segment (event, &priv->segment);
1232
1233       /* we need time for now */
1234       if (priv->segment.format != GST_FORMAT_TIME)
1235         goto newseg_wrong_format;
1236
1237       GST_DEBUG_OBJECT (jitterbuffer,
1238           "newsegment:  %" GST_SEGMENT_FORMAT, &priv->segment);
1239
1240       /* FIXME, push SEGMENT in the queue. Sorting order might be difficult. */
1241       ret = gst_pad_push_event (priv->srcpad, event);
1242       break;
1243     }
1244     case GST_EVENT_FLUSH_START:
1245       ret = gst_pad_push_event (priv->srcpad, event);
1246       gst_rtp_jitter_buffer_flush_start (jitterbuffer);
1247       break;
1248     case GST_EVENT_FLUSH_STOP:
1249       ret = gst_pad_push_event (priv->srcpad, event);
1250       ret =
1251           gst_rtp_jitter_buffer_src_activate_mode (priv->srcpad, parent,
1252           GST_PAD_MODE_PUSH, TRUE);
1253       break;
1254     case GST_EVENT_EOS:
1255     {
1256       /* push EOS in queue. We always push it at the head */
1257       JBUF_LOCK (priv);
1258       /* check for flushing, we need to discard the event and return FALSE when
1259        * we are flushing */
1260       ret = priv->srcresult == GST_FLOW_OK;
1261       if (ret && !priv->eos) {
1262         GST_INFO_OBJECT (jitterbuffer, "queuing EOS");
1263         priv->eos = TRUE;
1264         JBUF_SIGNAL_EVENT (priv);
1265       } else if (priv->eos) {
1266         GST_DEBUG_OBJECT (jitterbuffer, "dropping EOS, we are already EOS");
1267       } else {
1268         GST_DEBUG_OBJECT (jitterbuffer, "dropping EOS, reason %s",
1269             gst_flow_get_name (priv->srcresult));
1270       }
1271       JBUF_UNLOCK (priv);
1272       gst_event_unref (event);
1273       break;
1274     }
1275     default:
1276       ret = gst_pad_push_event (priv->srcpad, event);
1277       break;
1278   }
1279
1280 done:
1281
1282   return ret;
1283
1284   /* ERRORS */
1285 newseg_wrong_format:
1286   {
1287     GST_DEBUG_OBJECT (jitterbuffer, "received non TIME newsegment");
1288     ret = FALSE;
1289     gst_event_unref (event);
1290     goto done;
1291   }
1292 }
1293
1294 static gboolean
1295 gst_rtp_jitter_buffer_sink_rtcp_event (GstPad * pad, GstObject * parent,
1296     GstEvent * event)
1297 {
1298   gboolean ret = TRUE;
1299   GstRtpJitterBuffer *jitterbuffer;
1300
1301   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1302
1303   GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
1304
1305   switch (GST_EVENT_TYPE (event)) {
1306     case GST_EVENT_FLUSH_START:
1307       gst_event_unref (event);
1308       break;
1309     case GST_EVENT_FLUSH_STOP:
1310       gst_event_unref (event);
1311       break;
1312     default:
1313       ret = gst_pad_event_default (pad, parent, event);
1314       break;
1315   }
1316
1317   return ret;
1318 }
1319
1320 /*
1321  * Must be called with JBUF_LOCK held, will release the LOCK when emiting the
1322  * signal. The function returns GST_FLOW_ERROR when a parsing error happened and
1323  * GST_FLOW_FLUSHING when the element is shutting down. On success
1324  * GST_FLOW_OK is returned.
1325  */
1326 static GstFlowReturn
1327 gst_rtp_jitter_buffer_get_clock_rate (GstRtpJitterBuffer * jitterbuffer,
1328     guint8 pt)
1329 {
1330   GValue ret = { 0 };
1331   GValue args[2] = { {0}, {0} };
1332   GstCaps *caps;
1333   gboolean res;
1334
1335   g_value_init (&args[0], GST_TYPE_ELEMENT);
1336   g_value_set_object (&args[0], jitterbuffer);
1337   g_value_init (&args[1], G_TYPE_UINT);
1338   g_value_set_uint (&args[1], pt);
1339
1340   g_value_init (&ret, GST_TYPE_CAPS);
1341   g_value_set_boxed (&ret, NULL);
1342
1343   JBUF_UNLOCK (jitterbuffer->priv);
1344   g_signal_emitv (args, gst_rtp_jitter_buffer_signals[SIGNAL_REQUEST_PT_MAP], 0,
1345       &ret);
1346   JBUF_LOCK_CHECK (jitterbuffer->priv, out_flushing);
1347
1348   g_value_unset (&args[0]);
1349   g_value_unset (&args[1]);
1350   caps = (GstCaps *) g_value_dup_boxed (&ret);
1351   g_value_unset (&ret);
1352   if (!caps)
1353     goto no_caps;
1354
1355   res = gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps);
1356   gst_caps_unref (caps);
1357
1358   if (G_UNLIKELY (!res))
1359     goto parse_failed;
1360
1361   return GST_FLOW_OK;
1362
1363   /* ERRORS */
1364 no_caps:
1365   {
1366     GST_DEBUG_OBJECT (jitterbuffer, "could not get caps");
1367     return GST_FLOW_ERROR;
1368   }
1369 out_flushing:
1370   {
1371     GST_DEBUG_OBJECT (jitterbuffer, "we are flushing");
1372     return GST_FLOW_FLUSHING;
1373   }
1374 parse_failed:
1375   {
1376     GST_DEBUG_OBJECT (jitterbuffer, "parse failed");
1377     return GST_FLOW_ERROR;
1378   }
1379 }
1380
1381 /* call with jbuf lock held */
1382 static void
1383 check_buffering_percent (GstRtpJitterBuffer * jitterbuffer, gint * percent)
1384 {
1385   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1386
1387   /* too short a stream, or too close to EOS will never really fill buffer */
1388   if (*percent != -1 && priv->npt_stop != -1 &&
1389       priv->npt_stop - priv->npt_start <=
1390       rtp_jitter_buffer_get_delay (priv->jbuf)) {
1391     GST_DEBUG_OBJECT (jitterbuffer, "short stream; faking full buffer");
1392     rtp_jitter_buffer_set_buffering (priv->jbuf, FALSE);
1393     *percent = 100;
1394   }
1395 }
1396
1397 static void
1398 post_buffering_percent (GstRtpJitterBuffer * jitterbuffer, gint percent)
1399 {
1400   GstMessage *message;
1401
1402   /* Post a buffering message */
1403   message = gst_message_new_buffering (GST_OBJECT_CAST (jitterbuffer), percent);
1404   gst_message_set_buffering_stats (message, GST_BUFFERING_LIVE, -1, -1, -1);
1405
1406   gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer), message);
1407 }
1408
1409 static GstClockTime
1410 apply_offset (GstRtpJitterBuffer * jitterbuffer, GstClockTime timestamp)
1411 {
1412   GstRtpJitterBufferPrivate *priv;
1413
1414   priv = jitterbuffer->priv;
1415
1416   if (timestamp == -1)
1417     return -1;
1418
1419   /* apply the timestamp offset, this is used for inter stream sync */
1420   timestamp += priv->ts_offset;
1421   /* add the offset, this is used when buffering */
1422   timestamp += priv->out_offset;
1423
1424   return timestamp;
1425 }
1426
1427 static TimerData *
1428 find_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type, guint16 seqnum)
1429 {
1430   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1431   TimerData *timer = NULL;
1432   gint i, len;
1433
1434   len = priv->timers->len;
1435   for (i = 0; i < len; i++) {
1436     TimerData *test = &g_array_index (priv->timers, TimerData, i);
1437     if (test->seqnum == seqnum && test->type == type) {
1438       timer = test;
1439       break;
1440     }
1441   }
1442   return timer;
1443 }
1444
1445 static void
1446 unschedule_current_timer (GstRtpJitterBuffer * jitterbuffer)
1447 {
1448   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1449
1450   if (priv->clock_id) {
1451     GST_DEBUG_OBJECT (jitterbuffer, "unschedule current timer");
1452     gst_clock_id_unschedule (priv->clock_id);
1453   }
1454 }
1455
1456 static GstClockTime
1457 get_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
1458 {
1459   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1460   GstClockTime test_timeout;
1461
1462   if ((test_timeout = timer->timeout) == -1)
1463     return -1;
1464
1465   if (timer->type != TIMER_TYPE_EXPECTED) {
1466     /* add our latency and offset to get output times. */
1467     test_timeout = apply_offset (jitterbuffer, test_timeout);
1468     test_timeout += priv->latency_ns;
1469   }
1470   return test_timeout;
1471 }
1472
1473 static void
1474 recalculate_timer (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
1475 {
1476   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1477
1478   if (priv->clock_id) {
1479     GstClockTime timeout = get_timeout (jitterbuffer, timer);
1480
1481     GST_DEBUG ("%" GST_TIME_FORMAT " <> %" GST_TIME_FORMAT,
1482         GST_TIME_ARGS (timeout), GST_TIME_ARGS (priv->timer_timeout));
1483
1484     if (timeout == -1 || timeout < priv->timer_timeout)
1485       unschedule_current_timer (jitterbuffer);
1486   }
1487 }
1488
1489 static TimerData *
1490 add_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type,
1491     guint16 seqnum, guint num, GstClockTime timeout, GstClockTime delay,
1492     GstClockTime duration)
1493 {
1494   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1495   TimerData *timer;
1496   gint len;
1497
1498   GST_DEBUG_OBJECT (jitterbuffer,
1499       "add timer for seqnum %d to %" GST_TIME_FORMAT ", delay %"
1500       GST_TIME_FORMAT, seqnum, GST_TIME_ARGS (timeout), GST_TIME_ARGS (delay));
1501
1502   len = priv->timers->len;
1503   g_array_set_size (priv->timers, len + 1);
1504   timer = &g_array_index (priv->timers, TimerData, len);
1505   timer->idx = len;
1506   timer->type = type;
1507   timer->seqnum = seqnum;
1508   timer->num = num;
1509   timer->timeout = timeout + delay;
1510   timer->duration = duration;
1511   if (type == TIMER_TYPE_EXPECTED) {
1512     timer->rtx_base = timeout;
1513     timer->rtx_delay = delay;
1514     timer->rtx_retry = 0;
1515   }
1516   recalculate_timer (jitterbuffer, timer);
1517   JBUF_SIGNAL_TIMER (priv);
1518
1519   return timer;
1520 }
1521
1522 static void
1523 reschedule_timer (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
1524     guint16 seqnum, GstClockTime timeout, GstClockTime delay)
1525 {
1526   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1527   gboolean seqchange, timechange;
1528   guint16 oldseq;
1529
1530   seqchange = timer->seqnum != seqnum;
1531   timechange = timer->timeout != timeout;
1532
1533   if (!seqchange && !timechange)
1534     return;
1535
1536   oldseq = timer->seqnum;
1537
1538   GST_DEBUG_OBJECT (jitterbuffer,
1539       "replace timer for seqnum %d->%d to %" GST_TIME_FORMAT,
1540       oldseq, seqnum, GST_TIME_ARGS (timeout));
1541
1542   timer->timeout = timeout + delay;
1543   timer->seqnum = seqnum;
1544   if (seqchange && timer->type == TIMER_TYPE_EXPECTED) {
1545     timer->rtx_base = timeout;
1546     timer->rtx_delay = delay;
1547     timer->rtx_retry = 0;
1548   }
1549
1550   if (priv->clock_id) {
1551     /* we changed the seqnum and there is a timer currently waiting with this
1552      * seqnum, unschedule it */
1553     if (seqchange && priv->timer_seqnum == oldseq)
1554       unschedule_current_timer (jitterbuffer);
1555     /* we changed the time, check if it is earlier than what we are waiting
1556      * for and unschedule if so */
1557     else if (timechange)
1558       recalculate_timer (jitterbuffer, timer);
1559   }
1560 }
1561
1562 static TimerData *
1563 set_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type,
1564     guint16 seqnum, GstClockTime timeout)
1565 {
1566   TimerData *timer;
1567
1568   /* find the seqnum timer */
1569   timer = find_timer (jitterbuffer, type, seqnum);
1570   if (timer == NULL) {
1571     timer = add_timer (jitterbuffer, type, seqnum, 0, timeout, 0, -1);
1572   } else {
1573     reschedule_timer (jitterbuffer, timer, seqnum, timeout, 0);
1574   }
1575   return timer;
1576 }
1577
1578 static void
1579 remove_timer (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
1580 {
1581   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1582   guint idx;
1583
1584   if (priv->clock_id && priv->timer_seqnum == timer->seqnum)
1585     unschedule_current_timer (jitterbuffer);
1586
1587   idx = timer->idx;
1588   GST_DEBUG_OBJECT (jitterbuffer, "removed index %d", idx);
1589   g_array_remove_index_fast (priv->timers, idx);
1590   timer->idx = idx;
1591 }
1592
1593 static void
1594 remove_all_timers (GstRtpJitterBuffer * jitterbuffer)
1595 {
1596   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1597   GST_DEBUG_OBJECT (jitterbuffer, "removed all timers");
1598   g_array_set_size (priv->timers, 0);
1599   unschedule_current_timer (jitterbuffer);
1600 }
1601
1602 /* we just received a packet with seqnum and dts.
1603  *
1604  * First check for old seqnum that we are still expecting. If the gap with the
1605  * current timestamp is too big, unschedule the timeouts.
1606  *
1607  * If we have a valid packet spacing estimate we can set a timer for when we
1608  * should receive the next packet.
1609  * If we don't have a valid estimate, we remove any timer we might have
1610  * had for this packet.
1611  */
1612 static void
1613 update_timers (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum,
1614     GstClockTime dts, gboolean do_next_seqnum)
1615 {
1616   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1617   TimerData *timer = NULL;
1618   gint i, len;
1619
1620   /* go through all timers and unschedule the ones with a large gap, also find
1621    * the timer for the seqnum */
1622   len = priv->timers->len;
1623   for (i = 0; i < len; i++) {
1624     TimerData *test = &g_array_index (priv->timers, TimerData, i);
1625     gint gap;
1626
1627     gap = gst_rtp_buffer_compare_seqnum (test->seqnum, seqnum);
1628
1629     GST_DEBUG_OBJECT (jitterbuffer, "%d, #%d<->#%d gap %d", i,
1630         test->seqnum, seqnum, gap);
1631
1632     if (gap == 0) {
1633       GST_DEBUG ("found timer for current seqnum");
1634       /* the timer for the current seqnum */
1635       timer = test;
1636     } else if (gap > priv->rtx_delay_reorder) {
1637       /* max gap, we exceeded the max reorder distance and we don't expect the
1638        * missing packet to be this reordered */
1639       if (test->rtx_retry == 0 && test->type == TIMER_TYPE_EXPECTED)
1640         reschedule_timer (jitterbuffer, test, test->seqnum, -1, 0);
1641     }
1642   }
1643
1644   if (priv->packet_spacing > 0 && do_next_seqnum && priv->do_retransmission) {
1645     GstClockTime expected, delay;
1646
1647     /* calculate expected arrival time of the next seqnum */
1648     expected = dts + priv->packet_spacing;
1649     delay = priv->rtx_delay * GST_MSECOND;
1650
1651     /* and update/install timer for next seqnum */
1652     if (timer)
1653       reschedule_timer (jitterbuffer, timer, priv->next_in_seqnum, expected,
1654           delay);
1655     else
1656       add_timer (jitterbuffer, TIMER_TYPE_EXPECTED, priv->next_in_seqnum, 0,
1657           expected, delay, priv->packet_spacing);
1658   } else if (timer && timer->type != TIMER_TYPE_DEADLINE) {
1659     /* if we had a timer, remove it, we don't know when to expect the next
1660      * packet. */
1661     remove_timer (jitterbuffer, timer);
1662     JBUF_SIGNAL_EVENT (priv);
1663   }
1664 }
1665
1666 static void
1667 calculate_packet_spacing (GstRtpJitterBuffer * jitterbuffer, guint32 rtptime,
1668     GstClockTime dts)
1669 {
1670   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1671
1672   /* we need consecutive seqnums with a different
1673    * rtptime to estimate the packet spacing. */
1674   if (priv->ips_rtptime != rtptime) {
1675     /* rtptime changed, check dts diff */
1676     if (priv->ips_dts != -1 && dts != -1 && dts > priv->ips_dts) {
1677       priv->packet_spacing = dts - priv->ips_dts;
1678       GST_DEBUG_OBJECT (jitterbuffer,
1679           "new packet spacing %" GST_TIME_FORMAT,
1680           GST_TIME_ARGS (priv->packet_spacing));
1681     }
1682     priv->ips_rtptime = rtptime;
1683     priv->ips_dts = dts;
1684   }
1685 }
1686
1687 static void
1688 send_lost_event (GstRtpJitterBuffer * jitterbuffer, guint seqnum,
1689     guint lost_packets, GstClockTime timestamp, GstClockTime duration,
1690     gboolean late)
1691 {
1692   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1693
1694   /* we had a gap and thus we lost some packets. Create an event for this.  */
1695   if (lost_packets > 1)
1696     GST_DEBUG_OBJECT (jitterbuffer, "Packets #%d -> #%d lost", seqnum,
1697         seqnum + lost_packets - 1);
1698   else
1699     GST_DEBUG_OBJECT (jitterbuffer, "Packet #%d lost", seqnum);
1700
1701   priv->num_late += lost_packets;
1702   priv->discont = TRUE;
1703
1704   /* update our expected next packet but make sure the seqnum increases */
1705   if (seqnum + lost_packets > priv->next_seqnum) {
1706     priv->next_seqnum = (seqnum + lost_packets) & 0xffff;
1707     priv->last_popped_seqnum = seqnum;
1708     priv->last_out_time = timestamp;
1709   }
1710   if (priv->do_lost) {
1711     GstEvent *event;
1712
1713     /* create paket lost event */
1714     event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM,
1715         gst_structure_new ("GstRTPPacketLost",
1716             "seqnum", G_TYPE_UINT, (guint) seqnum,
1717             "timestamp", G_TYPE_UINT64, timestamp,
1718             "duration", G_TYPE_UINT64, duration,
1719             "late", G_TYPE_BOOLEAN, late, NULL));
1720     JBUF_UNLOCK (priv);
1721     gst_pad_push_event (priv->srcpad, event);
1722     JBUF_LOCK (priv);
1723   }
1724 }
1725
1726 static void
1727 calculate_expected (GstRtpJitterBuffer * jitterbuffer, guint32 expected,
1728     guint16 seqnum, GstClockTime dts, gint gap)
1729 {
1730   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1731   GstClockTime total_duration, duration, expected_dts;
1732   TimerType type;
1733
1734   GST_DEBUG_OBJECT (jitterbuffer,
1735       "dts %" GST_TIME_FORMAT ", last %" GST_TIME_FORMAT,
1736       GST_TIME_ARGS (dts), GST_TIME_ARGS (priv->last_in_dts));
1737
1738   /* the total duration spanned by the missing packets */
1739   if (dts >= priv->last_in_dts)
1740     total_duration = dts - priv->last_in_dts;
1741   else
1742     total_duration = 0;
1743
1744   /* interpolate between the current time and the last time based on
1745    * number of packets we are missing, this is the estimated duration
1746    * for the missing packet based on equidistant packet spacing. */
1747   duration = total_duration / (gap + 1);
1748
1749   GST_DEBUG_OBJECT (jitterbuffer, "duration %" GST_TIME_FORMAT,
1750       GST_TIME_ARGS (duration));
1751
1752   if (total_duration > priv->latency_ns) {
1753     GstClockTime gap_time;
1754     guint lost_packets;
1755
1756     gap_time = total_duration - priv->latency_ns;
1757
1758     if (duration > 0) {
1759       lost_packets = gap_time / duration;
1760       gap_time = lost_packets * duration;
1761     } else {
1762       lost_packets = gap;
1763     }
1764
1765     /* too many lost packets, some of the missing packets are already
1766      * too late and we can generate lost packet events for them. */
1767     GST_DEBUG_OBJECT (jitterbuffer, "too many lost packets %" GST_TIME_FORMAT
1768         " > %" GST_TIME_FORMAT ", consider %u lost",
1769         GST_TIME_ARGS (total_duration), GST_TIME_ARGS (priv->latency_ns),
1770         lost_packets);
1771
1772     /* this timer will fire immediately and the lost event will be pushed from
1773      * the timer thread */
1774     add_timer (jitterbuffer, TIMER_TYPE_LOST, expected, lost_packets,
1775         priv->last_in_dts + duration, 0, gap_time);
1776
1777     expected += lost_packets;
1778     priv->last_in_dts += gap_time;
1779   }
1780
1781   expected_dts = priv->last_in_dts + duration;
1782
1783   if (priv->do_retransmission) {
1784     type = TIMER_TYPE_EXPECTED;
1785     /* if we had a timer for the first missing packet, leave it. */
1786     if (find_timer (jitterbuffer, type, expected))
1787       expected++;
1788   } else {
1789     type = TIMER_TYPE_LOST;
1790   }
1791
1792   while (expected < seqnum) {
1793     add_timer (jitterbuffer, type, expected, 0, expected_dts, 0, duration);
1794     expected_dts += duration;
1795     expected++;
1796   }
1797 }
1798
1799 static GstFlowReturn
1800 gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent,
1801     GstBuffer * buffer)
1802 {
1803   GstRtpJitterBuffer *jitterbuffer;
1804   GstRtpJitterBufferPrivate *priv;
1805   guint16 seqnum;
1806   guint32 expected, rtptime;
1807   GstFlowReturn ret = GST_FLOW_OK;
1808   GstClockTime dts, pts;
1809   guint64 latency_ts;
1810   gboolean tail;
1811   gint percent = -1;
1812   guint8 pt;
1813   GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
1814   gboolean do_next_seqnum = FALSE;
1815
1816   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1817
1818   priv = jitterbuffer->priv;
1819
1820   if (G_UNLIKELY (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp)))
1821     goto invalid_buffer;
1822
1823   pt = gst_rtp_buffer_get_payload_type (&rtp);
1824   seqnum = gst_rtp_buffer_get_seq (&rtp);
1825   rtptime = gst_rtp_buffer_get_timestamp (&rtp);
1826   gst_rtp_buffer_unmap (&rtp);
1827
1828   /* make sure we have PTS and DTS set */
1829   pts = GST_BUFFER_PTS (buffer);
1830   dts = GST_BUFFER_DTS (buffer);
1831   if (dts == -1)
1832     dts = pts;
1833   else if (pts == -1)
1834     pts = dts;
1835
1836   /* take the DTS of the buffer. This is the time when the packet was
1837    * received and is used to calculate jitter and clock skew. We will adjust
1838    * this DTS with the smoothed value after processing it in the
1839    * jitterbuffer and assign it as the PTS. */
1840   /* bring to running time */
1841   dts = gst_segment_to_running_time (&priv->segment, GST_FORMAT_TIME, dts);
1842
1843   GST_DEBUG_OBJECT (jitterbuffer,
1844       "Received packet #%d at time %" GST_TIME_FORMAT ", discont %d", seqnum,
1845       GST_TIME_ARGS (dts), GST_BUFFER_IS_DISCONT (buffer));
1846
1847   JBUF_LOCK_CHECK (priv, out_flushing);
1848
1849   if (G_UNLIKELY (priv->last_pt != pt)) {
1850     GstCaps *caps;
1851
1852     GST_DEBUG_OBJECT (jitterbuffer, "pt changed from %u to %u", priv->last_pt,
1853         pt);
1854
1855     priv->last_pt = pt;
1856     /* reset clock-rate so that we get a new one */
1857     priv->clock_rate = -1;
1858
1859     /* Try to get the clock-rate from the caps first if we can. If there are no
1860      * caps we must fire the signal to get the clock-rate. */
1861     if ((caps = gst_pad_get_current_caps (pad))) {
1862       gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps);
1863       gst_caps_unref (caps);
1864     }
1865   }
1866
1867   if (G_UNLIKELY (priv->clock_rate == -1)) {
1868     /* no clock rate given on the caps, try to get one with the signal */
1869     if (gst_rtp_jitter_buffer_get_clock_rate (jitterbuffer,
1870             pt) == GST_FLOW_FLUSHING)
1871       goto out_flushing;
1872
1873     if (G_UNLIKELY (priv->clock_rate == -1))
1874       goto no_clock_rate;
1875   }
1876
1877   /* don't accept more data on EOS */
1878   if (G_UNLIKELY (priv->eos))
1879     goto have_eos;
1880
1881   expected = priv->next_in_seqnum;
1882
1883   /* now check against our expected seqnum */
1884   if (G_LIKELY (expected != -1)) {
1885     gint gap;
1886
1887     /* now calculate gap */
1888     gap = gst_rtp_buffer_compare_seqnum (expected, seqnum);
1889
1890     GST_DEBUG_OBJECT (jitterbuffer, "expected #%d, got #%d, gap of %d",
1891         expected, seqnum, gap);
1892
1893     if (G_LIKELY (gap == 0)) {
1894       /* packet is expected */
1895       calculate_packet_spacing (jitterbuffer, rtptime, dts);
1896       do_next_seqnum = TRUE;
1897     } else {
1898       gboolean reset = FALSE;
1899
1900       if (gap < 0) {
1901         /* we received an old packet */
1902         if (G_UNLIKELY (gap < -RTP_MAX_MISORDER)) {
1903           /* too old packet, reset */
1904           GST_DEBUG_OBJECT (jitterbuffer, "reset: buffer too old %d < %d", gap,
1905               -RTP_MAX_MISORDER);
1906           reset = TRUE;
1907         } else {
1908           GST_DEBUG_OBJECT (jitterbuffer, "old packet received");
1909         }
1910       } else {
1911         /* new packet, we are missing some packets */
1912         if (G_UNLIKELY (gap > RTP_MAX_DROPOUT)) {
1913           /* packet too far in future, reset */
1914           GST_DEBUG_OBJECT (jitterbuffer, "reset: buffer too new %d > %d", gap,
1915               RTP_MAX_DROPOUT);
1916           reset = TRUE;
1917         } else {
1918           GST_DEBUG_OBJECT (jitterbuffer, "%d missing packets", gap);
1919           /* fill in the gap with EXPECTED timers */
1920           calculate_expected (jitterbuffer, expected, seqnum, dts, gap);
1921
1922           do_next_seqnum = TRUE;
1923         }
1924       }
1925       if (G_UNLIKELY (reset)) {
1926         GST_DEBUG_OBJECT (jitterbuffer, "flush and reset jitterbuffer");
1927         rtp_jitter_buffer_flush (priv->jbuf);
1928         rtp_jitter_buffer_reset_skew (priv->jbuf);
1929         remove_all_timers (jitterbuffer);
1930         priv->last_popped_seqnum = -1;
1931         priv->next_seqnum = seqnum;
1932         do_next_seqnum = TRUE;
1933       }
1934       /* reset spacing estimation when gap */
1935       priv->ips_rtptime = -1;
1936       priv->ips_dts = GST_CLOCK_TIME_NONE;
1937     }
1938   } else {
1939     GST_DEBUG_OBJECT (jitterbuffer, "First buffer #%d", seqnum);
1940     /* we don't know what the next_in_seqnum should be, wait for the last
1941      * possible moment to push this buffer, maybe we get an earlier seqnum
1942      * while we wait */
1943     set_timer (jitterbuffer, TIMER_TYPE_DEADLINE, seqnum, dts);
1944     do_next_seqnum = TRUE;
1945     /* take rtptime and dts to calculate packet spacing */
1946     priv->ips_rtptime = rtptime;
1947     priv->ips_dts = dts;
1948   }
1949   if (do_next_seqnum) {
1950     priv->last_in_seqnum = seqnum;
1951     priv->last_in_dts = dts;
1952     priv->next_in_seqnum = (seqnum + 1) & 0xffff;
1953   }
1954
1955   /* let's check if this buffer is too late, we can only accept packets with
1956    * bigger seqnum than the one we last pushed. */
1957   if (G_LIKELY (priv->last_popped_seqnum != -1)) {
1958     gint gap;
1959
1960     gap = gst_rtp_buffer_compare_seqnum (priv->last_popped_seqnum, seqnum);
1961
1962     /* priv->last_popped_seqnum >= seqnum, we're too late. */
1963     if (G_UNLIKELY (gap <= 0))
1964       goto too_late;
1965   }
1966
1967   /* let's drop oldest packet if the queue is already full and drop-on-latency
1968    * is set. We can only do this when there actually is a latency. When no
1969    * latency is set, we just pump it in the queue and let the other end push it
1970    * out as fast as possible. */
1971   if (priv->latency_ms && priv->drop_on_latency) {
1972     latency_ts =
1973         gst_util_uint64_scale_int (priv->latency_ms, priv->clock_rate, 1000);
1974
1975     if (G_UNLIKELY (rtp_jitter_buffer_get_ts_diff (priv->jbuf) >= latency_ts)) {
1976       GstBuffer *old_buf;
1977
1978       old_buf = rtp_jitter_buffer_pop (priv->jbuf, &percent);
1979
1980       GST_DEBUG_OBJECT (jitterbuffer, "Queue full, dropping old packet %p",
1981           old_buf);
1982
1983       gst_buffer_unref (old_buf);
1984     }
1985   }
1986
1987   /* we need to make the metadata writable before pushing it in the jitterbuffer
1988    * because the jitterbuffer will update the PTS */
1989   buffer = gst_buffer_make_writable (buffer);
1990   GST_BUFFER_DTS (buffer) = dts;
1991   GST_BUFFER_PTS (buffer) = pts;
1992
1993   /* now insert the packet into the queue in sorted order. This function returns
1994    * FALSE if a packet with the same seqnum was already in the queue, meaning we
1995    * have a duplicate. */
1996   if (G_UNLIKELY (!rtp_jitter_buffer_insert (priv->jbuf, buffer, dts,
1997               priv->clock_rate, &tail, &percent)))
1998     goto duplicate;
1999
2000   /* update timers */
2001   update_timers (jitterbuffer, seqnum, dts, do_next_seqnum);
2002
2003   /* we had an unhandled SR, handle it now */
2004   if (priv->last_sr)
2005     do_handle_sync (jitterbuffer);
2006
2007   /* signal addition of new buffer when the _loop is waiting. */
2008   if (priv->active && priv->waiting_timer)
2009     JBUF_SIGNAL_EVENT (priv);
2010
2011   /* let's unschedule and unblock any waiting buffers. We only want to do this
2012    * when the tail buffer changed */
2013   if (G_UNLIKELY (priv->clock_id && tail)) {
2014     GST_DEBUG_OBJECT (jitterbuffer, "Unscheduling waiting new buffer");
2015     unschedule_current_timer (jitterbuffer);
2016   }
2017
2018   GST_DEBUG_OBJECT (jitterbuffer, "Pushed packet #%d, now %d packets, tail: %d",
2019       seqnum, rtp_jitter_buffer_num_packets (priv->jbuf), tail);
2020
2021   check_buffering_percent (jitterbuffer, &percent);
2022
2023 finished:
2024   JBUF_UNLOCK (priv);
2025
2026   if (percent != -1)
2027     post_buffering_percent (jitterbuffer, percent);
2028
2029   return ret;
2030
2031   /* ERRORS */
2032 invalid_buffer:
2033   {
2034     /* this is not fatal but should be filtered earlier */
2035     GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
2036         ("Received invalid RTP payload, dropping"));
2037     gst_buffer_unref (buffer);
2038     return GST_FLOW_OK;
2039   }
2040 no_clock_rate:
2041   {
2042     GST_WARNING_OBJECT (jitterbuffer,
2043         "No clock-rate in caps!, dropping buffer");
2044     gst_buffer_unref (buffer);
2045     goto finished;
2046   }
2047 out_flushing:
2048   {
2049     ret = priv->srcresult;
2050     GST_DEBUG_OBJECT (jitterbuffer, "flushing %s", gst_flow_get_name (ret));
2051     gst_buffer_unref (buffer);
2052     goto finished;
2053   }
2054 have_eos:
2055   {
2056     ret = GST_FLOW_EOS;
2057     GST_WARNING_OBJECT (jitterbuffer, "we are EOS, refusing buffer");
2058     gst_buffer_unref (buffer);
2059     goto finished;
2060   }
2061 too_late:
2062   {
2063     GST_WARNING_OBJECT (jitterbuffer, "Packet #%d too late as #%d was already"
2064         " popped, dropping", seqnum, priv->last_popped_seqnum);
2065     priv->num_late++;
2066     gst_buffer_unref (buffer);
2067     goto finished;
2068   }
2069 duplicate:
2070   {
2071     GST_WARNING_OBJECT (jitterbuffer, "Duplicate packet #%d detected, dropping",
2072         seqnum);
2073     priv->num_duplicates++;
2074     gst_buffer_unref (buffer);
2075     goto finished;
2076   }
2077 }
2078
2079 static GstClockTime
2080 compute_elapsed (GstRtpJitterBuffer * jitterbuffer, GstBuffer * outbuf)
2081 {
2082   guint64 ext_time, elapsed;
2083   guint32 rtp_time;
2084   GstRtpJitterBufferPrivate *priv;
2085   GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
2086
2087   priv = jitterbuffer->priv;
2088   gst_rtp_buffer_map (outbuf, GST_MAP_READ, &rtp);
2089   rtp_time = gst_rtp_buffer_get_timestamp (&rtp);
2090   gst_rtp_buffer_unmap (&rtp);
2091
2092   GST_LOG_OBJECT (jitterbuffer, "rtp %" G_GUINT32_FORMAT ", ext %"
2093       G_GUINT64_FORMAT, rtp_time, priv->ext_timestamp);
2094
2095   if (rtp_time < priv->ext_timestamp) {
2096     ext_time = priv->ext_timestamp;
2097   } else {
2098     ext_time = gst_rtp_buffer_ext_timestamp (&priv->ext_timestamp, rtp_time);
2099   }
2100
2101   if (ext_time > priv->clock_base)
2102     elapsed = ext_time - priv->clock_base;
2103   else
2104     elapsed = 0;
2105
2106   elapsed = gst_util_uint64_scale_int (elapsed, GST_SECOND, priv->clock_rate);
2107   return elapsed;
2108 }
2109
2110 static void
2111 update_estimated_eos (GstRtpJitterBuffer * jitterbuffer, GstBuffer * outbuf)
2112 {
2113   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2114
2115   if (priv->npt_stop != -1 && priv->ext_timestamp != -1
2116       && priv->clock_base != -1 && priv->clock_rate > 0) {
2117     guint64 elapsed, estimated;
2118
2119     elapsed = compute_elapsed (jitterbuffer, outbuf);
2120
2121     if (elapsed > priv->last_elapsed || !priv->last_elapsed) {
2122       guint64 left;
2123       GstClockTime out_time;
2124
2125       priv->last_elapsed = elapsed;
2126
2127       left = priv->npt_stop - priv->npt_start;
2128       GST_LOG_OBJECT (jitterbuffer, "left %" GST_TIME_FORMAT,
2129           GST_TIME_ARGS (left));
2130
2131       out_time = GST_BUFFER_DTS (outbuf);
2132
2133       if (elapsed > 0)
2134         estimated = gst_util_uint64_scale (out_time, left, elapsed);
2135       else {
2136         /* if there is almost nothing left,
2137          * we may never advance enough to end up in the above case */
2138         if (left < GST_SECOND)
2139           estimated = GST_SECOND;
2140         else
2141           estimated = -1;
2142       }
2143
2144       GST_LOG_OBJECT (jitterbuffer, "elapsed %" GST_TIME_FORMAT ", estimated %"
2145           GST_TIME_FORMAT, GST_TIME_ARGS (elapsed), GST_TIME_ARGS (estimated));
2146
2147       if (estimated != -1 && priv->estimated_eos != estimated) {
2148         set_timer (jitterbuffer, TIMER_TYPE_EOS, -1, estimated);
2149         priv->estimated_eos = estimated;
2150       }
2151     }
2152   }
2153 }
2154
2155 /* take a buffer from the queue and push it */
2156 static GstFlowReturn
2157 pop_and_push_next (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum)
2158 {
2159   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2160   GstFlowReturn result;
2161   GstBuffer *outbuf;
2162   GstClockTime dts, pts;
2163   gint percent = -1;
2164
2165   /* when we get here we are ready to pop and push the buffer */
2166   outbuf = rtp_jitter_buffer_pop (priv->jbuf, &percent);
2167
2168   check_buffering_percent (jitterbuffer, &percent);
2169
2170   if (G_UNLIKELY (priv->discont)) {
2171     /* set DISCONT flag when we missed a packet. We pushed the buffer writable
2172      * into the jitterbuffer so we can modify now. */
2173     GST_DEBUG_OBJECT (jitterbuffer, "mark output buffer discont");
2174     GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_DISCONT);
2175     priv->discont = FALSE;
2176   }
2177   if (G_UNLIKELY (priv->ts_discont)) {
2178     GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_RESYNC);
2179     priv->ts_discont = FALSE;
2180   }
2181
2182   dts = GST_BUFFER_DTS (outbuf);
2183   pts = GST_BUFFER_PTS (outbuf);
2184
2185   dts = gst_segment_to_position (&priv->segment, GST_FORMAT_TIME, dts);
2186   pts = gst_segment_to_position (&priv->segment, GST_FORMAT_TIME, pts);
2187
2188   /* apply timestamp with offset to buffer now */
2189   GST_BUFFER_DTS (outbuf) = apply_offset (jitterbuffer, dts);
2190   GST_BUFFER_PTS (outbuf) = apply_offset (jitterbuffer, pts);
2191
2192   /* update the elapsed time when we need to check against the npt stop time. */
2193   update_estimated_eos (jitterbuffer, outbuf);
2194
2195   /* now we are ready to push the buffer. Save the seqnum and release the lock
2196    * so the other end can push stuff in the queue again. */
2197   priv->last_popped_seqnum = seqnum;
2198   priv->last_out_time = GST_BUFFER_PTS (outbuf);
2199   priv->next_seqnum = (seqnum + 1) & 0xffff;
2200   JBUF_UNLOCK (priv);
2201
2202   if (percent != -1)
2203     post_buffering_percent (jitterbuffer, percent);
2204
2205   /* push buffer */
2206   GST_DEBUG_OBJECT (jitterbuffer,
2207       "Pushing buffer %d, dts %" GST_TIME_FORMAT ", pts %" GST_TIME_FORMAT,
2208       seqnum, GST_TIME_ARGS (GST_BUFFER_DTS (outbuf)),
2209       GST_TIME_ARGS (GST_BUFFER_PTS (outbuf)));
2210
2211   result = gst_pad_push (priv->srcpad, outbuf);
2212
2213   JBUF_LOCK_CHECK (priv, out_flushing);
2214
2215   return result;
2216
2217   /* ERRORS */
2218 out_flushing:
2219   {
2220     return priv->srcresult;
2221   }
2222 }
2223
2224 #define GST_FLOW_WAIT GST_FLOW_CUSTOM_SUCCESS
2225
2226 /* Peek a buffer and compare the seqnum to the expected seqnum.
2227  * If all is fine, the buffer is pushed.
2228  * If something is wrong, we wait for some event
2229  */
2230 static GstFlowReturn
2231 handle_next_buffer (GstRtpJitterBuffer * jitterbuffer)
2232 {
2233   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2234   GstFlowReturn result = GST_FLOW_OK;
2235   GstBuffer *outbuf;
2236   guint16 seqnum;
2237   guint32 next_seqnum;
2238   gint gap;
2239   GstRTPBuffer rtp = { NULL, };
2240
2241   /* only push buffers when PLAYING and active and not buffering */
2242   if (priv->blocked || !priv->active ||
2243       rtp_jitter_buffer_is_buffering (priv->jbuf))
2244     return GST_FLOW_WAIT;
2245
2246 again:
2247   /* peek a buffer, we're just looking at the sequence number.
2248    * If all is fine, we'll pop and push it. If the sequence number is wrong we
2249    * wait for a timeout or something to change.
2250    * The peeked buffer is valid for as long as we hold the jitterbuffer lock. */
2251   outbuf = rtp_jitter_buffer_peek (priv->jbuf);
2252   if (outbuf == NULL)
2253     goto wait;
2254
2255   /* get the seqnum and the next expected seqnum */
2256   gst_rtp_buffer_map (outbuf, GST_MAP_READ, &rtp);
2257   seqnum = gst_rtp_buffer_get_seq (&rtp);
2258   gst_rtp_buffer_unmap (&rtp);
2259
2260   next_seqnum = priv->next_seqnum;
2261
2262   /* get the gap between this and the previous packet. If we don't know the
2263    * previous packet seqnum assume no gap. */
2264   if (G_UNLIKELY (next_seqnum == -1)) {
2265     GST_DEBUG_OBJECT (jitterbuffer, "First buffer #%d", seqnum);
2266     /* we don't know what the next_seqnum should be, the chain function should
2267      * have scheduled a DEADLINE timer that will increment next_seqnum when it
2268      * fires, so wait for that */
2269     result = GST_FLOW_WAIT;
2270   } else {
2271     /* else calculate GAP */
2272     gap = gst_rtp_buffer_compare_seqnum (next_seqnum, seqnum);
2273
2274     if (G_LIKELY (gap == 0)) {
2275       /* no missing packet, pop and push */
2276       result = pop_and_push_next (jitterbuffer, seqnum);
2277     } else if (G_UNLIKELY (gap < 0)) {
2278       /* if we have a packet that we already pushed or considered dropped, pop it
2279        * off and get the next packet */
2280       GST_DEBUG_OBJECT (jitterbuffer, "Old packet #%d, next #%d dropping",
2281           seqnum, next_seqnum);
2282       outbuf = rtp_jitter_buffer_pop (priv->jbuf, NULL);
2283       gst_buffer_unref (outbuf);
2284       goto again;
2285     } else {
2286       /* the chain function has scheduled timers to request retransmission or
2287        * when to consider the packet lost, wait for that */
2288       GST_DEBUG_OBJECT (jitterbuffer,
2289           "Sequence number GAP detected: expected %d instead of %d (%d missing)",
2290           next_seqnum, seqnum, gap);
2291       result = GST_FLOW_WAIT;
2292     }
2293   }
2294   return result;
2295
2296 wait:
2297   {
2298     GST_DEBUG_OBJECT (jitterbuffer, "no buffer, going to wait");
2299     if (priv->eos)
2300       result = GST_FLOW_EOS;
2301     else
2302       result = GST_FLOW_WAIT;
2303     return result;
2304   }
2305 }
2306
2307 /* the timeout for when we expected a packet expired */
2308 static gboolean
2309 do_expected_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
2310     GstClockTime now)
2311 {
2312   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2313   GstEvent *event;
2314
2315   GST_DEBUG_OBJECT (jitterbuffer, "expected %d didn't arrive", timer->seqnum);
2316
2317   event = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM,
2318       gst_structure_new ("GstRTPRetransmissionRequest",
2319           "seqnum", G_TYPE_UINT, (guint) timer->seqnum,
2320           "running-time", G_TYPE_UINT64, timer->rtx_base,
2321           "delay", G_TYPE_UINT,
2322           GST_TIME_AS_MSECONDS (timer->rtx_delay + timer->rtx_retry),
2323           "frequency", G_TYPE_UINT, priv->rtx_retry_timeout, "period",
2324           G_TYPE_UINT, priv->rtx_retry_period, "deadline", G_TYPE_UINT,
2325           priv->latency_ms, "packet-spacing", G_TYPE_UINT64,
2326           priv->packet_spacing, NULL));
2327
2328   JBUF_UNLOCK (priv);
2329   gst_pad_push_event (priv->sinkpad, event);
2330   JBUF_LOCK (priv);
2331
2332   /* calculate the timeout for the next retransmission attempt */
2333   timer->rtx_retry += (priv->rtx_retry_timeout * GST_MSECOND);
2334   if (timer->rtx_retry + timer->rtx_delay >
2335       (priv->rtx_retry_period * GST_MSECOND)) {
2336     GST_DEBUG_OBJECT (jitterbuffer, "reschedule as LOST timer");
2337     /* too many retransmission request, we now convert the timer
2338      * to a lost timer */
2339     timer->type = TIMER_TYPE_LOST;
2340     timer->rtx_delay = 0;
2341     timer->rtx_retry = 0;
2342   }
2343   reschedule_timer (jitterbuffer, timer, timer->seqnum,
2344       timer->rtx_base + timer->rtx_retry, timer->rtx_delay);
2345
2346   return FALSE;
2347 }
2348
2349 /* a packet is lost */
2350 static gboolean
2351 do_lost_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
2352     GstClockTime now)
2353 {
2354   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2355   GstClockTime duration, timestamp;
2356   guint seqnum, num;
2357   gboolean late;
2358
2359   seqnum = timer->seqnum;
2360   timestamp = apply_offset (jitterbuffer, timer->timeout);
2361   duration = timer->duration;
2362   if (duration == GST_CLOCK_TIME_NONE && priv->packet_spacing > 0)
2363     duration = priv->packet_spacing;
2364   num = MAX (timer->num, 1);
2365   late = timer->num > 0;
2366
2367   /* remove timer now */
2368   remove_timer (jitterbuffer, timer);
2369   JBUF_SIGNAL_EVENT (priv);
2370
2371   send_lost_event (jitterbuffer, seqnum, num, timestamp, duration, late);
2372
2373   return TRUE;
2374 }
2375
2376 static gboolean
2377 do_eos_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
2378     GstClockTime now)
2379 {
2380   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2381
2382   GST_INFO_OBJECT (jitterbuffer, "got the NPT timeout");
2383   remove_timer (jitterbuffer, timer);
2384   JBUF_SIGNAL_EVENT (priv);
2385
2386   return TRUE;
2387 }
2388
2389 static gboolean
2390 do_deadline_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
2391     GstClockTime now)
2392 {
2393   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2394
2395   GST_INFO_OBJECT (jitterbuffer, "got deadline timeout");
2396
2397   priv->next_seqnum = timer->seqnum;
2398   remove_timer (jitterbuffer, timer);
2399   JBUF_SIGNAL_EVENT (priv);
2400
2401   return TRUE;
2402 }
2403
2404 static gboolean
2405 do_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
2406     GstClockTime now)
2407 {
2408   gboolean removed = FALSE;
2409
2410   switch (timer->type) {
2411     case TIMER_TYPE_EXPECTED:
2412       removed = do_expected_timeout (jitterbuffer, timer, now);
2413       break;
2414     case TIMER_TYPE_LOST:
2415       removed = do_lost_timeout (jitterbuffer, timer, now);
2416       break;
2417     case TIMER_TYPE_DEADLINE:
2418       removed = do_deadline_timeout (jitterbuffer, timer, now);
2419       break;
2420     case TIMER_TYPE_EOS:
2421       removed = do_eos_timeout (jitterbuffer, timer, now);
2422       break;
2423   }
2424   return removed;
2425 }
2426
2427 /* called when we need to wait for the next timeout.
2428  *
2429  * We loop over the array of recorded timeouts and wait for the earliest one.
2430  * When it timed out, do the logic associated with the timer.
2431  *
2432  * If there are no timers, we wait on a gcond until something new happens.
2433  */
2434 static void
2435 wait_next_timeout (GstRtpJitterBuffer * jitterbuffer)
2436 {
2437   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2438   GstClockTime now = 0;
2439
2440   JBUF_LOCK (priv);
2441   while (priv->timer_running) {
2442     TimerData *timer = NULL;
2443     GstClockTime timer_timeout = -1;
2444     gint i, len;
2445
2446     GST_DEBUG_OBJECT (jitterbuffer, "now %" GST_TIME_FORMAT,
2447         GST_TIME_ARGS (now));
2448
2449     len = priv->timers->len;
2450     for (i = 0; i < len; i++) {
2451       TimerData *test = &g_array_index (priv->timers, TimerData, i);
2452       GstClockTime test_timeout = get_timeout (jitterbuffer, test);
2453
2454       GST_DEBUG_OBJECT (jitterbuffer, "%d, %d, %d, %" GST_TIME_FORMAT,
2455           i, test->type, test->seqnum, GST_TIME_ARGS (test_timeout));
2456
2457       /* no timestamp, timeout immeditately */
2458       if (test_timeout == -1 || test_timeout <= now) {
2459         if (do_timeout (jitterbuffer, test, now))
2460           len--;
2461         i--;
2462       } else if (timer == NULL || test_timeout < timer_timeout) {
2463         /* find the smallest timeout */
2464         timer = test;
2465         timer_timeout = test_timeout;
2466       }
2467     }
2468     if (timer) {
2469       GstClock *clock;
2470       GstClockTime sync_time;
2471       GstClockID id;
2472       GstClockReturn ret;
2473       GstClockTimeDiff clock_jitter;
2474
2475       GST_OBJECT_LOCK (jitterbuffer);
2476       clock = GST_ELEMENT_CLOCK (jitterbuffer);
2477       if (!clock) {
2478         GST_OBJECT_UNLOCK (jitterbuffer);
2479         /* let's just push if there is no clock */
2480         GST_DEBUG_OBJECT (jitterbuffer, "No clock, timeout right away");
2481         now = timer_timeout;
2482         continue;
2483       }
2484
2485       /* prepare for sync against clock */
2486       sync_time = timer_timeout + GST_ELEMENT_CAST (jitterbuffer)->base_time;
2487       /* add latency of peer to get input time */
2488       sync_time += priv->peer_latency;
2489
2490       GST_DEBUG_OBJECT (jitterbuffer, "sync to timestamp %" GST_TIME_FORMAT
2491           " with sync time %" GST_TIME_FORMAT,
2492           GST_TIME_ARGS (timer_timeout), GST_TIME_ARGS (sync_time));
2493
2494       /* create an entry for the clock */
2495       id = priv->clock_id = gst_clock_new_single_shot_id (clock, sync_time);
2496       priv->timer_timeout = timer_timeout;
2497       priv->timer_seqnum = timer->seqnum;
2498       GST_OBJECT_UNLOCK (jitterbuffer);
2499
2500       /* release the lock so that the other end can push stuff or unlock */
2501       JBUF_UNLOCK (priv);
2502
2503       ret = gst_clock_id_wait (id, &clock_jitter);
2504
2505       JBUF_LOCK (priv);
2506       if (!priv->timer_running)
2507         break;
2508
2509       if (ret != GST_CLOCK_UNSCHEDULED) {
2510         now = timer_timeout + MAX (clock_jitter, 0);
2511         GST_DEBUG_OBJECT (jitterbuffer, "sync done, %d, #%d, %" G_GINT64_FORMAT,
2512             ret, priv->timer_seqnum, clock_jitter);
2513       } else {
2514         GST_DEBUG_OBJECT (jitterbuffer, "sync unscheduled");
2515       }
2516       /* and free the entry */
2517       gst_clock_id_unref (id);
2518       priv->clock_id = NULL;
2519     } else {
2520       /* no timers, wait for activity */
2521       GST_DEBUG_OBJECT (jitterbuffer, "waiting");
2522       JBUF_WAIT_TIMER (priv);
2523       GST_DEBUG_OBJECT (jitterbuffer, "waiting done");
2524     }
2525   }
2526   JBUF_UNLOCK (priv);
2527
2528   GST_DEBUG_OBJECT (jitterbuffer, "we are stopping");
2529   return;
2530 }
2531
2532 /*
2533  * This funcion implements the main pushing loop on the source pad.
2534  *
2535  * It first tries to push as many buffers as possible. If there is a seqnum
2536  * mismatch, we wait for the next timeouts.
2537  */
2538 static void
2539 gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer)
2540 {
2541   GstRtpJitterBufferPrivate *priv;
2542   GstFlowReturn result;
2543
2544   priv = jitterbuffer->priv;
2545
2546   JBUF_LOCK_CHECK (priv, flushing);
2547   do {
2548     result = handle_next_buffer (jitterbuffer);
2549     if (G_LIKELY (result == GST_FLOW_WAIT)) {
2550       GST_DEBUG_OBJECT (jitterbuffer, "waiting for event");
2551       /* now wait for the next event */
2552       JBUF_WAIT_EVENT (priv, flushing);
2553       GST_DEBUG_OBJECT (jitterbuffer, "waiting for event done");
2554       result = GST_FLOW_OK;
2555     }
2556   }
2557   while (result == GST_FLOW_OK);
2558   JBUF_UNLOCK (priv);
2559
2560   /* if we get here we need to pause */
2561   goto pause;
2562
2563   /* ERRORS */
2564 flushing:
2565   {
2566     result = priv->srcresult;
2567     JBUF_UNLOCK (priv);
2568     goto pause;
2569   }
2570 pause:
2571   {
2572     const gchar *reason = gst_flow_get_name (result);
2573     GstEvent *event;
2574
2575     GST_DEBUG_OBJECT (jitterbuffer, "pausing task, reason %s", reason);
2576     gst_pad_pause_task (priv->srcpad);
2577     if (result == GST_FLOW_EOS) {
2578       event = gst_event_new_eos ();
2579       gst_pad_push_event (priv->srcpad, event);
2580     }
2581     return;
2582   }
2583 }
2584
2585 /* collect the info from the lastest RTCP packet and the jitterbuffer sync, do
2586  * some sanity checks and then emit the handle-sync signal with the parameters.
2587  * This function must be called with the LOCK */
2588 static void
2589 do_handle_sync (GstRtpJitterBuffer * jitterbuffer)
2590 {
2591   GstRtpJitterBufferPrivate *priv;
2592   guint64 base_rtptime, base_time;
2593   guint32 clock_rate;
2594   guint64 last_rtptime;
2595   guint64 clock_base;
2596   guint64 ext_rtptime, diff;
2597   gboolean drop = FALSE;
2598
2599   priv = jitterbuffer->priv;
2600
2601   /* get the last values from the jitterbuffer */
2602   rtp_jitter_buffer_get_sync (priv->jbuf, &base_rtptime, &base_time,
2603       &clock_rate, &last_rtptime);
2604
2605   clock_base = priv->clock_base;
2606   ext_rtptime = priv->ext_rtptime;
2607
2608   GST_DEBUG_OBJECT (jitterbuffer, "ext SR %" G_GUINT64_FORMAT ", base %"
2609       G_GUINT64_FORMAT ", clock-rate %" G_GUINT32_FORMAT
2610       ", clock-base %" G_GUINT64_FORMAT ", last-rtptime %" G_GUINT64_FORMAT,
2611       ext_rtptime, base_rtptime, clock_rate, clock_base, last_rtptime);
2612
2613   if (base_rtptime == -1 || clock_rate == -1 || base_time == -1) {
2614     GST_DEBUG_OBJECT (jitterbuffer, "dropping, no RTP values");
2615     drop = TRUE;
2616   } else {
2617     /* we can't accept anything that happened before we did the last resync */
2618     if (base_rtptime > ext_rtptime) {
2619       GST_DEBUG_OBJECT (jitterbuffer, "dropping, older than base time");
2620       drop = TRUE;
2621     } else {
2622       /* the SR RTP timestamp must be something close to what we last observed
2623        * in the jitterbuffer */
2624       if (ext_rtptime > last_rtptime) {
2625         /* check how far ahead it is to our RTP timestamps */
2626         diff = ext_rtptime - last_rtptime;
2627         /* if bigger than 1 second, we drop it */
2628         if (diff > clock_rate) {
2629           GST_DEBUG_OBJECT (jitterbuffer, "too far ahead");
2630           /* should drop this, but some RTSP servers end up with bogus
2631            * way too ahead RTCP packet when repeated PAUSE/PLAY,
2632            * so still trigger rptbin sync but invalidate RTCP data
2633            * (sync might use other methods) */
2634           ext_rtptime = -1;
2635         }
2636         GST_DEBUG_OBJECT (jitterbuffer, "ext last %" G_GUINT64_FORMAT ", diff %"
2637             G_GUINT64_FORMAT, last_rtptime, diff);
2638       }
2639     }
2640   }
2641
2642   if (!drop) {
2643     GstStructure *s;
2644
2645     s = gst_structure_new ("application/x-rtp-sync",
2646         "base-rtptime", G_TYPE_UINT64, base_rtptime,
2647         "base-time", G_TYPE_UINT64, base_time,
2648         "clock-rate", G_TYPE_UINT, clock_rate,
2649         "clock-base", G_TYPE_UINT64, clock_base,
2650         "sr-ext-rtptime", G_TYPE_UINT64, ext_rtptime,
2651         "sr-buffer", GST_TYPE_BUFFER, priv->last_sr, NULL);
2652
2653     GST_DEBUG_OBJECT (jitterbuffer, "signaling sync");
2654     gst_buffer_replace (&priv->last_sr, NULL);
2655     JBUF_UNLOCK (priv);
2656     g_signal_emit (jitterbuffer,
2657         gst_rtp_jitter_buffer_signals[SIGNAL_HANDLE_SYNC], 0, s);
2658     JBUF_LOCK (priv);
2659     gst_structure_free (s);
2660   } else {
2661     GST_DEBUG_OBJECT (jitterbuffer, "dropping RTCP packet");
2662   }
2663 }
2664
2665 static GstFlowReturn
2666 gst_rtp_jitter_buffer_chain_rtcp (GstPad * pad, GstObject * parent,
2667     GstBuffer * buffer)
2668 {
2669   GstRtpJitterBuffer *jitterbuffer;
2670   GstRtpJitterBufferPrivate *priv;
2671   GstFlowReturn ret = GST_FLOW_OK;
2672   guint32 ssrc;
2673   GstRTCPPacket packet;
2674   guint64 ext_rtptime;
2675   guint32 rtptime;
2676   GstRTCPBuffer rtcp = { NULL, };
2677
2678   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
2679
2680   if (G_UNLIKELY (!gst_rtcp_buffer_validate (buffer)))
2681     goto invalid_buffer;
2682
2683   priv = jitterbuffer->priv;
2684
2685   gst_rtcp_buffer_map (buffer, GST_MAP_READ, &rtcp);
2686
2687   if (!gst_rtcp_buffer_get_first_packet (&rtcp, &packet))
2688     goto empty_buffer;
2689
2690   /* first packet must be SR or RR or else the validate would have failed */
2691   switch (gst_rtcp_packet_get_type (&packet)) {
2692     case GST_RTCP_TYPE_SR:
2693       gst_rtcp_packet_sr_get_sender_info (&packet, &ssrc, NULL, &rtptime,
2694           NULL, NULL);
2695       break;
2696     default:
2697       goto ignore_buffer;
2698   }
2699   gst_rtcp_buffer_unmap (&rtcp);
2700
2701   GST_DEBUG_OBJECT (jitterbuffer, "received RTCP of SSRC %08x", ssrc);
2702
2703   JBUF_LOCK (priv);
2704   /* convert the RTP timestamp to our extended timestamp, using the same offset
2705    * we used in the jitterbuffer */
2706   ext_rtptime = priv->jbuf->ext_rtptime;
2707   ext_rtptime = gst_rtp_buffer_ext_timestamp (&ext_rtptime, rtptime);
2708
2709   priv->ext_rtptime = ext_rtptime;
2710   gst_buffer_replace (&priv->last_sr, buffer);
2711
2712   do_handle_sync (jitterbuffer);
2713   JBUF_UNLOCK (priv);
2714
2715 done:
2716   gst_buffer_unref (buffer);
2717
2718   return ret;
2719
2720 invalid_buffer:
2721   {
2722     /* this is not fatal but should be filtered earlier */
2723     GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
2724         ("Received invalid RTCP payload, dropping"));
2725     ret = GST_FLOW_OK;
2726     goto done;
2727   }
2728 empty_buffer:
2729   {
2730     /* this is not fatal but should be filtered earlier */
2731     GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
2732         ("Received empty RTCP payload, dropping"));
2733     gst_rtcp_buffer_unmap (&rtcp);
2734     ret = GST_FLOW_OK;
2735     goto done;
2736   }
2737 ignore_buffer:
2738   {
2739     GST_DEBUG_OBJECT (jitterbuffer, "ignoring RTCP packet");
2740     gst_rtcp_buffer_unmap (&rtcp);
2741     ret = GST_FLOW_OK;
2742     goto done;
2743   }
2744 }
2745
2746 static gboolean
2747 gst_rtp_jitter_buffer_sink_query (GstPad * pad, GstObject * parent,
2748     GstQuery * query)
2749 {
2750   gboolean res = FALSE;
2751
2752   switch (GST_QUERY_TYPE (query)) {
2753     case GST_QUERY_CAPS:
2754     {
2755       GstCaps *filter, *caps;
2756
2757       gst_query_parse_caps (query, &filter);
2758       caps = gst_rtp_jitter_buffer_getcaps (pad, filter);
2759       gst_query_set_caps_result (query, caps);
2760       gst_caps_unref (caps);
2761       res = TRUE;
2762       break;
2763     }
2764     default:
2765       if (GST_QUERY_IS_SERIALIZED (query)) {
2766         GST_WARNING_OBJECT (pad, "unhandled serialized query");
2767         res = FALSE;
2768       } else {
2769         res = gst_pad_query_default (pad, parent, query);
2770       }
2771       break;
2772   }
2773   return res;
2774 }
2775
2776 static gboolean
2777 gst_rtp_jitter_buffer_src_query (GstPad * pad, GstObject * parent,
2778     GstQuery * query)
2779 {
2780   GstRtpJitterBuffer *jitterbuffer;
2781   GstRtpJitterBufferPrivate *priv;
2782   gboolean res = FALSE;
2783
2784   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
2785   priv = jitterbuffer->priv;
2786
2787   switch (GST_QUERY_TYPE (query)) {
2788     case GST_QUERY_LATENCY:
2789     {
2790       /* We need to send the query upstream and add the returned latency to our
2791        * own */
2792       GstClockTime min_latency, max_latency;
2793       gboolean us_live;
2794       GstClockTime our_latency;
2795
2796       if ((res = gst_pad_peer_query (priv->sinkpad, query))) {
2797         gst_query_parse_latency (query, &us_live, &min_latency, &max_latency);
2798
2799         GST_DEBUG_OBJECT (jitterbuffer, "Peer latency: min %"
2800             GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
2801             GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
2802
2803         /* store this so that we can safely sync on the peer buffers. */
2804         JBUF_LOCK (priv);
2805         priv->peer_latency = min_latency;
2806         our_latency = priv->latency_ns;
2807         JBUF_UNLOCK (priv);
2808
2809         GST_DEBUG_OBJECT (jitterbuffer, "Our latency: %" GST_TIME_FORMAT,
2810             GST_TIME_ARGS (our_latency));
2811
2812         /* we add some latency but can buffer an infinite amount of time */
2813         min_latency += our_latency;
2814         max_latency = -1;
2815
2816         GST_DEBUG_OBJECT (jitterbuffer, "Calculated total latency : min %"
2817             GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
2818             GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
2819
2820         gst_query_set_latency (query, TRUE, min_latency, max_latency);
2821       }
2822       break;
2823     }
2824     case GST_QUERY_POSITION:
2825     {
2826       GstClockTime start, last_out;
2827       GstFormat fmt;
2828
2829       gst_query_parse_position (query, &fmt, NULL);
2830       if (fmt != GST_FORMAT_TIME) {
2831         res = gst_pad_query_default (pad, parent, query);
2832         break;
2833       }
2834
2835       JBUF_LOCK (priv);
2836       start = priv->npt_start;
2837       last_out = priv->last_out_time;
2838       JBUF_UNLOCK (priv);
2839
2840       GST_DEBUG_OBJECT (jitterbuffer, "npt start %" GST_TIME_FORMAT
2841           ", last out %" GST_TIME_FORMAT, GST_TIME_ARGS (start),
2842           GST_TIME_ARGS (last_out));
2843
2844       if (GST_CLOCK_TIME_IS_VALID (start) && GST_CLOCK_TIME_IS_VALID (last_out)) {
2845         /* bring 0-based outgoing time to stream time */
2846         gst_query_set_position (query, GST_FORMAT_TIME, start + last_out);
2847         res = TRUE;
2848       } else {
2849         res = gst_pad_query_default (pad, parent, query);
2850       }
2851       break;
2852     }
2853     case GST_QUERY_CAPS:
2854     {
2855       GstCaps *filter, *caps;
2856
2857       gst_query_parse_caps (query, &filter);
2858       caps = gst_rtp_jitter_buffer_getcaps (pad, filter);
2859       gst_query_set_caps_result (query, caps);
2860       gst_caps_unref (caps);
2861       res = TRUE;
2862       break;
2863     }
2864     default:
2865       res = gst_pad_query_default (pad, parent, query);
2866       break;
2867   }
2868
2869   return res;
2870 }
2871
2872 static void
2873 gst_rtp_jitter_buffer_set_property (GObject * object,
2874     guint prop_id, const GValue * value, GParamSpec * pspec)
2875 {
2876   GstRtpJitterBuffer *jitterbuffer;
2877   GstRtpJitterBufferPrivate *priv;
2878
2879   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
2880   priv = jitterbuffer->priv;
2881
2882   switch (prop_id) {
2883     case PROP_LATENCY:
2884     {
2885       guint new_latency, old_latency;
2886
2887       new_latency = g_value_get_uint (value);
2888
2889       JBUF_LOCK (priv);
2890       old_latency = priv->latency_ms;
2891       priv->latency_ms = new_latency;
2892       priv->latency_ns = priv->latency_ms * GST_MSECOND;
2893       rtp_jitter_buffer_set_delay (priv->jbuf, priv->latency_ns);
2894       JBUF_UNLOCK (priv);
2895
2896       /* post message if latency changed, this will inform the parent pipeline
2897        * that a latency reconfiguration is possible/needed. */
2898       if (new_latency != old_latency) {
2899         GST_DEBUG_OBJECT (jitterbuffer, "latency changed to: %" GST_TIME_FORMAT,
2900             GST_TIME_ARGS (new_latency * GST_MSECOND));
2901
2902         gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer),
2903             gst_message_new_latency (GST_OBJECT_CAST (jitterbuffer)));
2904       }
2905       break;
2906     }
2907     case PROP_DROP_ON_LATENCY:
2908       JBUF_LOCK (priv);
2909       priv->drop_on_latency = g_value_get_boolean (value);
2910       JBUF_UNLOCK (priv);
2911       break;
2912     case PROP_TS_OFFSET:
2913       JBUF_LOCK (priv);
2914       priv->ts_offset = g_value_get_int64 (value);
2915       priv->ts_discont = TRUE;
2916       JBUF_UNLOCK (priv);
2917       break;
2918     case PROP_DO_LOST:
2919       JBUF_LOCK (priv);
2920       priv->do_lost = g_value_get_boolean (value);
2921       JBUF_UNLOCK (priv);
2922       break;
2923     case PROP_MODE:
2924       JBUF_LOCK (priv);
2925       rtp_jitter_buffer_set_mode (priv->jbuf, g_value_get_enum (value));
2926       JBUF_UNLOCK (priv);
2927       break;
2928     case PROP_DO_RETRANSMISSION:
2929       JBUF_LOCK (priv);
2930       priv->do_retransmission = g_value_get_boolean (value);
2931       JBUF_UNLOCK (priv);
2932       break;
2933     case PROP_RTX_DELAY:
2934       JBUF_LOCK (priv);
2935       priv->rtx_delay = g_value_get_int (value);
2936       JBUF_UNLOCK (priv);
2937       break;
2938     case PROP_RTX_DELAY_REORDER:
2939       JBUF_LOCK (priv);
2940       priv->rtx_delay_reorder = g_value_get_int (value);
2941       JBUF_UNLOCK (priv);
2942       break;
2943     case PROP_RTX_RETRY_TIMEOUT:
2944       JBUF_LOCK (priv);
2945       priv->rtx_retry_timeout = g_value_get_int (value);
2946       JBUF_UNLOCK (priv);
2947       break;
2948     case PROP_RTX_RETRY_PERIOD:
2949       JBUF_LOCK (priv);
2950       priv->rtx_retry_period = g_value_get_int (value);
2951       JBUF_UNLOCK (priv);
2952       break;
2953     default:
2954       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2955       break;
2956   }
2957 }
2958
2959 static void
2960 gst_rtp_jitter_buffer_get_property (GObject * object,
2961     guint prop_id, GValue * value, GParamSpec * pspec)
2962 {
2963   GstRtpJitterBuffer *jitterbuffer;
2964   GstRtpJitterBufferPrivate *priv;
2965
2966   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
2967   priv = jitterbuffer->priv;
2968
2969   switch (prop_id) {
2970     case PROP_LATENCY:
2971       JBUF_LOCK (priv);
2972       g_value_set_uint (value, priv->latency_ms);
2973       JBUF_UNLOCK (priv);
2974       break;
2975     case PROP_DROP_ON_LATENCY:
2976       JBUF_LOCK (priv);
2977       g_value_set_boolean (value, priv->drop_on_latency);
2978       JBUF_UNLOCK (priv);
2979       break;
2980     case PROP_TS_OFFSET:
2981       JBUF_LOCK (priv);
2982       g_value_set_int64 (value, priv->ts_offset);
2983       JBUF_UNLOCK (priv);
2984       break;
2985     case PROP_DO_LOST:
2986       JBUF_LOCK (priv);
2987       g_value_set_boolean (value, priv->do_lost);
2988       JBUF_UNLOCK (priv);
2989       break;
2990     case PROP_MODE:
2991       JBUF_LOCK (priv);
2992       g_value_set_enum (value, rtp_jitter_buffer_get_mode (priv->jbuf));
2993       JBUF_UNLOCK (priv);
2994       break;
2995     case PROP_PERCENT:
2996     {
2997       gint percent;
2998
2999       JBUF_LOCK (priv);
3000       if (priv->srcresult != GST_FLOW_OK)
3001         percent = 100;
3002       else
3003         percent = rtp_jitter_buffer_get_percent (priv->jbuf);
3004
3005       g_value_set_int (value, percent);
3006       JBUF_UNLOCK (priv);
3007       break;
3008     }
3009     case PROP_DO_RETRANSMISSION:
3010       JBUF_LOCK (priv);
3011       g_value_set_boolean (value, priv->do_retransmission);
3012       JBUF_UNLOCK (priv);
3013       break;
3014     case PROP_RTX_DELAY:
3015       JBUF_LOCK (priv);
3016       g_value_set_int (value, priv->rtx_delay);
3017       JBUF_UNLOCK (priv);
3018       break;
3019     case PROP_RTX_DELAY_REORDER:
3020       JBUF_LOCK (priv);
3021       g_value_set_int (value, priv->rtx_delay_reorder);
3022       JBUF_UNLOCK (priv);
3023       break;
3024     case PROP_RTX_RETRY_TIMEOUT:
3025       JBUF_LOCK (priv);
3026       g_value_set_int (value, priv->rtx_retry_timeout);
3027       JBUF_UNLOCK (priv);
3028       break;
3029     case PROP_RTX_RETRY_PERIOD:
3030       JBUF_LOCK (priv);
3031       g_value_set_int (value, priv->rtx_retry_period);
3032       JBUF_UNLOCK (priv);
3033       break;
3034     default:
3035       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
3036       break;
3037   }
3038 }