rtpjitterbuffer: add option to reset retransmission timers
[platform/upstream/gst-plugins-good.git] / gst / rtpmanager / gstrtpjitterbuffer.c
1 /*
2  * Farsight Voice+Video library
3  *
4  *  Copyright 2007 Collabora Ltd,
5  *  Copyright 2007 Nokia Corporation
6  *   @author: Philippe Kalaf <philippe.kalaf@collabora.co.uk>.
7  *  Copyright 2007 Wim Taymans <wim.taymans@gmail.com>
8  *
9  * This library is free software; you can redistribute it and/or
10  * modify it under the terms of the GNU Library General Public
11  * License as published by the Free Software Foundation; either
12  * version 2 of the License, or (at your option) any later version.
13  *
14  * This library is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17  * Library General Public License for more details.
18  *
19  * You should have received a copy of the GNU Library General Public
20  * License along with this library; if not, write to the
21  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
22  * Boston, MA 02110-1301, USA.
23  *
24  */
25
26 /**
27  * SECTION:element-gstrtpjitterbuffer
28  *
29  * This element reorders and removes duplicate RTP packets as they are received
30  * from a network source. It will also wait for missing packets up to a
31  * configurable time limit using the #GstRtpJitterBuffer:latency property.
32  * Packets arriving too late are considered to be lost packets.
33  *
34  * This element acts as a live element and so adds #GstRtpJitterBuffer:latency
35  * to the pipeline.
36  *
37  * The element needs the clock-rate of the RTP payload in order to estimate the
38  * delay. This information is obtained either from the caps on the sink pad or,
39  * when no caps are present, from the #GstRtpJitterBuffer::request-pt-map signal.
40  * To clear the previous pt-map use the #GstRtpJitterBuffer::clear-pt-map signal.
41  *
42  * This element will automatically be used inside gstrtpbin.
43  *
44  * <refsect2>
45  * <title>Example pipelines</title>
46  * |[
47  * gst-launch-1.0 rtspsrc location=rtsp://192.168.1.133:8554/mpeg1or2AudioVideoTest ! gstrtpjitterbuffer ! rtpmpvdepay ! mpeg2dec ! xvimagesink
48  * ]| Connect to a streaming server and decode the MPEG video. The jitterbuffer is
49  * inserted into the pipeline to smooth out network jitter and to reorder the
50  * out-of-order RTP packets.
51  * </refsect2>
52  *
53  * Last reviewed on 2007-05-28 (0.10.5)
54  */
55
56 #ifdef HAVE_CONFIG_H
57 #include "config.h"
58 #endif
59
60 #include <stdlib.h>
61 #include <string.h>
62 #include <gst/rtp/gstrtpbuffer.h>
63
64 #include "gstrtpjitterbuffer.h"
65 #include "rtpjitterbuffer.h"
66 #include "rtpstats.h"
67
68 #include <gst/glib-compat-private.h>
69
70 GST_DEBUG_CATEGORY (rtpjitterbuffer_debug);
71 #define GST_CAT_DEFAULT (rtpjitterbuffer_debug)
72
73 /* RTPJitterBuffer signals and args */
74 enum
75 {
76   SIGNAL_REQUEST_PT_MAP,
77   SIGNAL_CLEAR_PT_MAP,
78   SIGNAL_HANDLE_SYNC,
79   SIGNAL_ON_NPT_STOP,
80   SIGNAL_SET_ACTIVE,
81   LAST_SIGNAL
82 };
83
84 #define DEFAULT_LATENCY_MS          200
85 #define DEFAULT_DROP_ON_LATENCY     FALSE
86 #define DEFAULT_TS_OFFSET           0
87 #define DEFAULT_DO_LOST             FALSE
88 #define DEFAULT_MODE                RTP_JITTER_BUFFER_MODE_SLAVE
89 #define DEFAULT_PERCENT             0
90 #define DEFAULT_DO_RETRANSMISSION   FALSE
91 #define DEFAULT_RTX_DELAY           20
92 #define DEFAULT_RTX_DELAY_REORDER   3
93 #define DEFAULT_RTX_RETRY_TIMEOUT   40
94 #define DEFAULT_RTX_RETRY_PERIOD    160
95
96 enum
97 {
98   PROP_0,
99   PROP_LATENCY,
100   PROP_DROP_ON_LATENCY,
101   PROP_TS_OFFSET,
102   PROP_DO_LOST,
103   PROP_MODE,
104   PROP_PERCENT,
105   PROP_DO_RETRANSMISSION,
106   PROP_RTX_DELAY,
107   PROP_RTX_DELAY_REORDER,
108   PROP_RTX_RETRY_TIMEOUT,
109   PROP_RTX_RETRY_PERIOD,
110   PROP_LAST
111 };
112
113 #define JBUF_LOCK(priv)   (g_mutex_lock (&(priv)->jbuf_lock))
114
115 #define JBUF_LOCK_CHECK(priv,label) G_STMT_START {    \
116   JBUF_LOCK (priv);                                   \
117   if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK))    \
118     goto label;                                       \
119 } G_STMT_END
120 #define JBUF_UNLOCK(priv) (g_mutex_unlock (&(priv)->jbuf_lock))
121
122 #define JBUF_WAIT_TIMER(priv)   G_STMT_START {            \
123   (priv)->waiting_timer = TRUE;                           \
124   g_cond_wait (&(priv)->jbuf_timer, &(priv)->jbuf_lock);  \
125   (priv)->waiting_timer = FALSE;                          \
126 } G_STMT_END
127 #define JBUF_SIGNAL_TIMER(priv) G_STMT_START {    \
128   if (G_UNLIKELY ((priv)->waiting_timer))         \
129     g_cond_signal (&(priv)->jbuf_timer);          \
130 } G_STMT_END
131
132 #define JBUF_WAIT_EVENT(priv,label) G_STMT_START {       \
133   (priv)->waiting_event = TRUE;                          \
134   g_cond_wait (&(priv)->jbuf_event, &(priv)->jbuf_lock); \
135   (priv)->waiting_event = FALSE;                         \
136   if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK))       \
137     goto label;                                          \
138 } G_STMT_END
139 #define JBUF_SIGNAL_EVENT(priv) G_STMT_START {    \
140   if (G_UNLIKELY ((priv)->waiting_event))         \
141     g_cond_signal (&(priv)->jbuf_event);          \
142 } G_STMT_END
143
144 struct _GstRtpJitterBufferPrivate
145 {
146   GstPad *sinkpad, *srcpad;
147   GstPad *rtcpsinkpad;
148
149   RTPJitterBuffer *jbuf;
150   GMutex jbuf_lock;
151   gboolean waiting_timer;
152   GCond jbuf_timer;
153   gboolean waiting_event;
154   GCond jbuf_event;
155   gboolean discont;
156   gboolean ts_discont;
157   gboolean active;
158   guint64 out_offset;
159
160   gboolean timer_running;
161   GThread *timer_thread;
162
163   /* properties */
164   guint latency_ms;
165   guint64 latency_ns;
166   gboolean drop_on_latency;
167   gint64 ts_offset;
168   gboolean do_lost;
169   gboolean do_retransmission;
170   gint rtx_delay;
171   gint rtx_delay_reorder;
172   gint rtx_retry_timeout;
173   gint rtx_retry_period;
174
175   /* the last seqnum we pushed out */
176   guint32 last_popped_seqnum;
177   /* the next expected seqnum we push */
178   guint32 next_seqnum;
179   /* last output time */
180   GstClockTime last_out_time;
181   /* last valid input timestamp and rtptime pair */
182   GstClockTime ips_dts;
183   guint64 ips_rtptime;
184   GstClockTime packet_spacing;
185
186   /* the next expected seqnum we receive */
187   GstClockTime last_in_dts;
188   guint32 last_in_seqnum;
189   guint32 next_in_seqnum;
190
191   GArray *timers;
192
193   /* start and stop ranges */
194   GstClockTime npt_start;
195   GstClockTime npt_stop;
196   guint64 ext_timestamp;
197   guint64 last_elapsed;
198   guint64 estimated_eos;
199   GstClockID eos_id;
200
201   /* state */
202   gboolean eos;
203
204   /* clock rate and rtp timestamp offset */
205   gint last_pt;
206   gint32 clock_rate;
207   gint64 clock_base;
208   gint64 prev_ts_offset;
209
210   /* when we are shutting down */
211   GstFlowReturn srcresult;
212   gboolean blocked;
213
214   /* for sync */
215   GstSegment segment;
216   GstClockID clock_id;
217   GstClockTime timer_timeout;
218   guint16 timer_seqnum;
219   /* the latency of the upstream peer, we have to take this into account when
220    * synchronizing the buffers. */
221   GstClockTime peer_latency;
222   guint64 ext_rtptime;
223   GstBuffer *last_sr;
224
225   /* some accounting */
226   guint64 num_late;
227   guint64 num_duplicates;
228 };
229
230 typedef enum
231 {
232   TIMER_TYPE_EXPECTED,
233   TIMER_TYPE_LOST,
234   TIMER_TYPE_DEADLINE,
235   TIMER_TYPE_EOS
236 } TimerType;
237
238 typedef struct
239 {
240   guint idx;
241   guint16 seqnum;
242   guint num;
243   TimerType type;
244   GstClockTime timeout;
245   GstClockTime duration;
246   GstClockTime rtx_base;
247   GstClockTime rtx_delay;
248   GstClockTime rtx_retry;
249 } TimerData;
250
251 #define GST_RTP_JITTER_BUFFER_GET_PRIVATE(o) \
252   (G_TYPE_INSTANCE_GET_PRIVATE ((o), GST_TYPE_RTP_JITTER_BUFFER, \
253                                 GstRtpJitterBufferPrivate))
254
255 static GstStaticPadTemplate gst_rtp_jitter_buffer_sink_template =
256 GST_STATIC_PAD_TEMPLATE ("sink",
257     GST_PAD_SINK,
258     GST_PAD_ALWAYS,
259     GST_STATIC_CAPS ("application/x-rtp, "
260         "clock-rate = (int) [ 1, 2147483647 ]"
261         /* "payload = (int) , "
262          * "encoding-name = (string) "
263          */ )
264     );
265
266 static GstStaticPadTemplate gst_rtp_jitter_buffer_sink_rtcp_template =
267 GST_STATIC_PAD_TEMPLATE ("sink_rtcp",
268     GST_PAD_SINK,
269     GST_PAD_REQUEST,
270     GST_STATIC_CAPS ("application/x-rtcp")
271     );
272
273 static GstStaticPadTemplate gst_rtp_jitter_buffer_src_template =
274 GST_STATIC_PAD_TEMPLATE ("src",
275     GST_PAD_SRC,
276     GST_PAD_ALWAYS,
277     GST_STATIC_CAPS ("application/x-rtp"
278         /* "payload = (int) , "
279          * "clock-rate = (int) , "
280          * "encoding-name = (string) "
281          */ )
282     );
283
284 static guint gst_rtp_jitter_buffer_signals[LAST_SIGNAL] = { 0 };
285
286 #define gst_rtp_jitter_buffer_parent_class parent_class
287 G_DEFINE_TYPE (GstRtpJitterBuffer, gst_rtp_jitter_buffer, GST_TYPE_ELEMENT);
288
289 /* object overrides */
290 static void gst_rtp_jitter_buffer_set_property (GObject * object,
291     guint prop_id, const GValue * value, GParamSpec * pspec);
292 static void gst_rtp_jitter_buffer_get_property (GObject * object,
293     guint prop_id, GValue * value, GParamSpec * pspec);
294 static void gst_rtp_jitter_buffer_finalize (GObject * object);
295
296 /* element overrides */
297 static GstStateChangeReturn gst_rtp_jitter_buffer_change_state (GstElement
298     * element, GstStateChange transition);
299 static GstPad *gst_rtp_jitter_buffer_request_new_pad (GstElement * element,
300     GstPadTemplate * templ, const gchar * name, const GstCaps * filter);
301 static void gst_rtp_jitter_buffer_release_pad (GstElement * element,
302     GstPad * pad);
303 static GstClock *gst_rtp_jitter_buffer_provide_clock (GstElement * element);
304
305 /* pad overrides */
306 static GstCaps *gst_rtp_jitter_buffer_getcaps (GstPad * pad, GstCaps * filter);
307 static GstIterator *gst_rtp_jitter_buffer_iterate_internal_links (GstPad * pad,
308     GstObject * parent);
309
310 /* sinkpad overrides */
311 static gboolean gst_rtp_jitter_buffer_sink_event (GstPad * pad,
312     GstObject * parent, GstEvent * event);
313 static GstFlowReturn gst_rtp_jitter_buffer_chain (GstPad * pad,
314     GstObject * parent, GstBuffer * buffer);
315
316 static gboolean gst_rtp_jitter_buffer_sink_rtcp_event (GstPad * pad,
317     GstObject * parent, GstEvent * event);
318 static GstFlowReturn gst_rtp_jitter_buffer_chain_rtcp (GstPad * pad,
319     GstObject * parent, GstBuffer * buffer);
320
321 static gboolean gst_rtp_jitter_buffer_sink_query (GstPad * pad,
322     GstObject * parent, GstQuery * query);
323
324 /* srcpad overrides */
325 static gboolean gst_rtp_jitter_buffer_src_event (GstPad * pad,
326     GstObject * parent, GstEvent * event);
327 static gboolean gst_rtp_jitter_buffer_src_activate_mode (GstPad * pad,
328     GstObject * parent, GstPadMode mode, gboolean active);
329 static void gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer);
330 static gboolean gst_rtp_jitter_buffer_src_query (GstPad * pad,
331     GstObject * parent, GstQuery * query);
332
333 static void
334 gst_rtp_jitter_buffer_clear_pt_map (GstRtpJitterBuffer * jitterbuffer);
335 static GstClockTime
336 gst_rtp_jitter_buffer_set_active (GstRtpJitterBuffer * jitterbuffer,
337     gboolean active, guint64 base_time);
338 static void do_handle_sync (GstRtpJitterBuffer * jitterbuffer);
339
340 static void unschedule_current_timer (GstRtpJitterBuffer * jitterbuffer);
341 static void remove_all_timers (GstRtpJitterBuffer * jitterbuffer);
342
343 static void wait_next_timeout (GstRtpJitterBuffer * jitterbuffer);
344
345 static void
346 gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass)
347 {
348   GObjectClass *gobject_class;
349   GstElementClass *gstelement_class;
350
351   gobject_class = (GObjectClass *) klass;
352   gstelement_class = (GstElementClass *) klass;
353
354   g_type_class_add_private (klass, sizeof (GstRtpJitterBufferPrivate));
355
356   gobject_class->finalize = gst_rtp_jitter_buffer_finalize;
357
358   gobject_class->set_property = gst_rtp_jitter_buffer_set_property;
359   gobject_class->get_property = gst_rtp_jitter_buffer_get_property;
360
361   /**
362    * GstRtpJitterBuffer::latency:
363    *
364    * The maximum latency of the jitterbuffer. Packets will be kept in the buffer
365    * for at most this time.
366    */
367   g_object_class_install_property (gobject_class, PROP_LATENCY,
368       g_param_spec_uint ("latency", "Buffer latency in ms",
369           "Amount of ms to buffer", 0, G_MAXUINT, DEFAULT_LATENCY_MS,
370           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
371   /**
372    * GstRtpJitterBuffer::drop-on-latency:
373    *
374    * Drop oldest buffers when the queue is completely filled.
375    */
376   g_object_class_install_property (gobject_class, PROP_DROP_ON_LATENCY,
377       g_param_spec_boolean ("drop-on-latency",
378           "Drop buffers when maximum latency is reached",
379           "Tells the jitterbuffer to never exceed the given latency in size",
380           DEFAULT_DROP_ON_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
381   /**
382    * GstRtpJitterBuffer::ts-offset:
383    *
384    * Adjust GStreamer output buffer timestamps in the jitterbuffer with offset.
385    * This is mainly used to ensure interstream synchronisation.
386    */
387   g_object_class_install_property (gobject_class, PROP_TS_OFFSET,
388       g_param_spec_int64 ("ts-offset", "Timestamp Offset",
389           "Adjust buffer timestamps with offset in nanoseconds", G_MININT64,
390           G_MAXINT64, DEFAULT_TS_OFFSET,
391           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
392
393   /**
394    * GstRtpJitterBuffer::do-lost:
395    *
396    * Send out a GstRTPPacketLost event downstream when a packet is considered
397    * lost.
398    */
399   g_object_class_install_property (gobject_class, PROP_DO_LOST,
400       g_param_spec_boolean ("do-lost", "Do Lost",
401           "Send an event downstream when a packet is lost", DEFAULT_DO_LOST,
402           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
403
404   /**
405    * GstRtpJitterBuffer::mode:
406    *
407    * Control the buffering and timestamping mode used by the jitterbuffer.
408    */
409   g_object_class_install_property (gobject_class, PROP_MODE,
410       g_param_spec_enum ("mode", "Mode",
411           "Control the buffering algorithm in use", RTP_TYPE_JITTER_BUFFER_MODE,
412           DEFAULT_MODE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
413   /**
414    * GstRtpJitterBuffer::percent:
415    *
416    * The percent of the jitterbuffer that is filled.
417    *
418    * Since: 0.10.19
419    */
420   g_object_class_install_property (gobject_class, PROP_PERCENT,
421       g_param_spec_int ("percent", "percent",
422           "The buffer filled percent", 0, 100,
423           0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
424   /**
425    * GstRtpJitterBuffer::do-retransmission:
426    *
427    * Send out a GstRTPRetransmission event upstream when a packet is considered
428    * late and should be retransmitted.
429    *
430    * Since: 1.2
431    */
432   g_object_class_install_property (gobject_class, PROP_DO_RETRANSMISSION,
433       g_param_spec_boolean ("do-retransmission", "Do Retransmission",
434           "Send retransmission events upstream when a packet is late",
435           DEFAULT_DO_RETRANSMISSION,
436           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
437
438   /**
439    * GstRtpJitterBuffer::rtx-delay:
440    *
441    * When a packet did not arrive at the expected time, wait this extra amount
442    * of time before sending a retransmission event.
443    *
444    * When -1 is used, the max jitter will be used as extra delay.
445    *
446    * Since: 1.2
447    */
448   g_object_class_install_property (gobject_class, PROP_RTX_DELAY,
449       g_param_spec_int ("rtx-delay", "RTX Delay",
450           "Extra time in ms to wait before sending retransmission "
451           "event (-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_DELAY,
452           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
453   /**
454    * GstRtpJitterBuffer::rtx-delay-reorder:
455    *
456    * Assume that a retransmission event should be sent when we see
457    * this much packet reordering.
458    *
459    * When -1 is used, the value will be estimated based on observed packet
460    * reordering.
461    *
462    * Since: 1.2
463    */
464   g_object_class_install_property (gobject_class, PROP_RTX_DELAY_REORDER,
465       g_param_spec_int ("rtx-delay-reorder", "RTX Delay Reorder",
466           "Sending retransmission event when this much reordering (-1 automatic)",
467           -1, G_MAXINT, DEFAULT_RTX_DELAY_REORDER,
468           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
469   /**
470    * GstRtpJitterBuffer::rtx-retry-timeout:
471    *
472    * When no packet has been received after sending a retransmission event
473    * for this time, retry sending a retransmission event.
474    *
475    * When -1 is used, the value will be estimated based on observed round
476    * trip time.
477    *
478    * Since: 1.2
479    */
480   g_object_class_install_property (gobject_class, PROP_RTX_RETRY_TIMEOUT,
481       g_param_spec_int ("rtx-retry-timeout", "RTX Retry Timeout",
482           "Retry sending a transmission event after this timeout in "
483           "ms (-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_RETRY_TIMEOUT,
484           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
485   /**
486    * GstRtpJitterBuffer::rtx-retry-period:
487    *
488    * The amount of time to try to get a retransmission.
489    *
490    * When -1 is used, the value will be estimated based on the jitterbuffer
491    * latency and the observed round trip time.
492    *
493    * Since: 1.2
494    */
495   g_object_class_install_property (gobject_class, PROP_RTX_RETRY_PERIOD,
496       g_param_spec_int ("rtx-retry-period", "RTX Retry Period",
497           "Try to get a retransmission for this many ms "
498           "(-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_RETRY_PERIOD,
499           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
500
501   /**
502    * GstRtpJitterBuffer::request-pt-map:
503    * @buffer: the object which received the signal
504    * @pt: the pt
505    *
506    * Request the payload type as #GstCaps for @pt.
507    */
508   gst_rtp_jitter_buffer_signals[SIGNAL_REQUEST_PT_MAP] =
509       g_signal_new ("request-pt-map", G_TYPE_FROM_CLASS (klass),
510       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
511           request_pt_map), NULL, NULL, g_cclosure_marshal_generic,
512       GST_TYPE_CAPS, 1, G_TYPE_UINT);
513   /**
514    * GstRtpJitterBuffer::handle-sync:
515    * @buffer: the object which received the signal
516    * @struct: a GstStructure containing sync values.
517    *
518    * Be notified of new sync values.
519    */
520   gst_rtp_jitter_buffer_signals[SIGNAL_HANDLE_SYNC] =
521       g_signal_new ("handle-sync", G_TYPE_FROM_CLASS (klass),
522       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
523           handle_sync), NULL, NULL, g_cclosure_marshal_VOID__BOXED,
524       G_TYPE_NONE, 1, GST_TYPE_STRUCTURE | G_SIGNAL_TYPE_STATIC_SCOPE);
525
526   /**
527    * GstRtpJitterBuffer::on-npt-stop
528    * @buffer: the object which received the signal
529    *
530    * Signal that the jitterbufer has pushed the RTP packet that corresponds to
531    * the npt-stop position.
532    */
533   gst_rtp_jitter_buffer_signals[SIGNAL_ON_NPT_STOP] =
534       g_signal_new ("on-npt-stop", G_TYPE_FROM_CLASS (klass),
535       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
536           on_npt_stop), NULL, NULL, g_cclosure_marshal_VOID__VOID,
537       G_TYPE_NONE, 0, G_TYPE_NONE);
538
539   /**
540    * GstRtpJitterBuffer::clear-pt-map:
541    * @buffer: the object which received the signal
542    *
543    * Invalidate the clock-rate as obtained with the
544    * #GstRtpJitterBuffer::request-pt-map signal.
545    */
546   gst_rtp_jitter_buffer_signals[SIGNAL_CLEAR_PT_MAP] =
547       g_signal_new ("clear-pt-map", G_TYPE_FROM_CLASS (klass),
548       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
549       G_STRUCT_OFFSET (GstRtpJitterBufferClass, clear_pt_map), NULL, NULL,
550       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
551
552   /**
553    * GstRtpJitterBuffer::set-active:
554    * @buffer: the object which received the signal
555    *
556    * Start pushing out packets with the given base time. This signal is only
557    * useful in buffering mode.
558    *
559    * Returns: the time of the last pushed packet.
560    *
561    * Since: 0.10.19
562    */
563   gst_rtp_jitter_buffer_signals[SIGNAL_SET_ACTIVE] =
564       g_signal_new ("set-active", G_TYPE_FROM_CLASS (klass),
565       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
566       G_STRUCT_OFFSET (GstRtpJitterBufferClass, set_active), NULL, NULL,
567       g_cclosure_marshal_generic, G_TYPE_UINT64, 2, G_TYPE_BOOLEAN,
568       G_TYPE_UINT64);
569
570   gstelement_class->change_state =
571       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_change_state);
572   gstelement_class->request_new_pad =
573       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_request_new_pad);
574   gstelement_class->release_pad =
575       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_release_pad);
576   gstelement_class->provide_clock =
577       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_provide_clock);
578
579   gst_element_class_add_pad_template (gstelement_class,
580       gst_static_pad_template_get (&gst_rtp_jitter_buffer_src_template));
581   gst_element_class_add_pad_template (gstelement_class,
582       gst_static_pad_template_get (&gst_rtp_jitter_buffer_sink_template));
583   gst_element_class_add_pad_template (gstelement_class,
584       gst_static_pad_template_get (&gst_rtp_jitter_buffer_sink_rtcp_template));
585
586   gst_element_class_set_static_metadata (gstelement_class,
587       "RTP packet jitter-buffer", "Filter/Network/RTP",
588       "A buffer that deals with network jitter and other transmission faults",
589       "Philippe Kalaf <philippe.kalaf@collabora.co.uk>, "
590       "Wim Taymans <wim.taymans@gmail.com>");
591
592   klass->clear_pt_map = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_clear_pt_map);
593   klass->set_active = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_set_active);
594
595   GST_DEBUG_CATEGORY_INIT
596       (rtpjitterbuffer_debug, "gstrtpjitterbuffer", 0, "RTP Jitter Buffer");
597 }
598
599 static void
600 gst_rtp_jitter_buffer_init (GstRtpJitterBuffer * jitterbuffer)
601 {
602   GstRtpJitterBufferPrivate *priv;
603
604   priv = GST_RTP_JITTER_BUFFER_GET_PRIVATE (jitterbuffer);
605   jitterbuffer->priv = priv;
606
607   priv->latency_ms = DEFAULT_LATENCY_MS;
608   priv->latency_ns = priv->latency_ms * GST_MSECOND;
609   priv->drop_on_latency = DEFAULT_DROP_ON_LATENCY;
610   priv->do_lost = DEFAULT_DO_LOST;
611   priv->do_retransmission = DEFAULT_DO_RETRANSMISSION;
612   priv->rtx_delay = DEFAULT_RTX_DELAY;
613   priv->rtx_delay_reorder = DEFAULT_RTX_DELAY_REORDER;
614   priv->rtx_retry_timeout = DEFAULT_RTX_RETRY_TIMEOUT;
615   priv->rtx_retry_period = DEFAULT_RTX_RETRY_PERIOD;
616
617   priv->timers = g_array_new (FALSE, TRUE, sizeof (TimerData));
618   priv->jbuf = rtp_jitter_buffer_new ();
619   g_mutex_init (&priv->jbuf_lock);
620   g_cond_init (&priv->jbuf_timer);
621   g_cond_init (&priv->jbuf_event);
622
623   /* reset skew detection initialy */
624   rtp_jitter_buffer_reset_skew (priv->jbuf);
625   rtp_jitter_buffer_set_delay (priv->jbuf, priv->latency_ns);
626   rtp_jitter_buffer_set_buffering (priv->jbuf, FALSE);
627   priv->active = TRUE;
628
629   priv->srcpad =
630       gst_pad_new_from_static_template (&gst_rtp_jitter_buffer_src_template,
631       "src");
632
633   gst_pad_set_activatemode_function (priv->srcpad,
634       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_activate_mode));
635   gst_pad_set_query_function (priv->srcpad,
636       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_query));
637   gst_pad_set_event_function (priv->srcpad,
638       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_event));
639
640   priv->sinkpad =
641       gst_pad_new_from_static_template (&gst_rtp_jitter_buffer_sink_template,
642       "sink");
643
644   gst_pad_set_chain_function (priv->sinkpad,
645       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_chain));
646   gst_pad_set_event_function (priv->sinkpad,
647       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_sink_event));
648   gst_pad_set_query_function (priv->sinkpad,
649       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_sink_query));
650
651   gst_element_add_pad (GST_ELEMENT (jitterbuffer), priv->srcpad);
652   gst_element_add_pad (GST_ELEMENT (jitterbuffer), priv->sinkpad);
653
654   GST_OBJECT_FLAG_SET (jitterbuffer, GST_ELEMENT_FLAG_PROVIDE_CLOCK);
655 }
656
657 static void
658 gst_rtp_jitter_buffer_finalize (GObject * object)
659 {
660   GstRtpJitterBuffer *jitterbuffer;
661
662   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
663
664   g_array_free (jitterbuffer->priv->timers, TRUE);
665   g_mutex_clear (&jitterbuffer->priv->jbuf_lock);
666   g_cond_clear (&jitterbuffer->priv->jbuf_timer);
667   g_cond_clear (&jitterbuffer->priv->jbuf_event);
668
669   g_object_unref (jitterbuffer->priv->jbuf);
670
671   G_OBJECT_CLASS (parent_class)->finalize (object);
672 }
673
674 static GstIterator *
675 gst_rtp_jitter_buffer_iterate_internal_links (GstPad * pad, GstObject * parent)
676 {
677   GstRtpJitterBuffer *jitterbuffer;
678   GstPad *otherpad = NULL;
679   GstIterator *it;
680   GValue val = { 0, };
681
682   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
683
684   if (pad == jitterbuffer->priv->sinkpad) {
685     otherpad = jitterbuffer->priv->srcpad;
686   } else if (pad == jitterbuffer->priv->srcpad) {
687     otherpad = jitterbuffer->priv->sinkpad;
688   } else if (pad == jitterbuffer->priv->rtcpsinkpad) {
689     otherpad = NULL;
690   }
691
692   g_value_init (&val, GST_TYPE_PAD);
693   g_value_set_object (&val, otherpad);
694   it = gst_iterator_new_single (GST_TYPE_PAD, &val);
695   g_value_unset (&val);
696
697   return it;
698 }
699
700 static GstPad *
701 create_rtcp_sink (GstRtpJitterBuffer * jitterbuffer)
702 {
703   GstRtpJitterBufferPrivate *priv;
704
705   priv = jitterbuffer->priv;
706
707   GST_DEBUG_OBJECT (jitterbuffer, "creating RTCP sink pad");
708
709   priv->rtcpsinkpad =
710       gst_pad_new_from_static_template
711       (&gst_rtp_jitter_buffer_sink_rtcp_template, "sink_rtcp");
712   gst_pad_set_chain_function (priv->rtcpsinkpad,
713       gst_rtp_jitter_buffer_chain_rtcp);
714   gst_pad_set_event_function (priv->rtcpsinkpad,
715       (GstPadEventFunction) gst_rtp_jitter_buffer_sink_rtcp_event);
716   gst_pad_set_iterate_internal_links_function (priv->rtcpsinkpad,
717       gst_rtp_jitter_buffer_iterate_internal_links);
718   gst_pad_set_active (priv->rtcpsinkpad, TRUE);
719   gst_element_add_pad (GST_ELEMENT_CAST (jitterbuffer), priv->rtcpsinkpad);
720
721   return priv->rtcpsinkpad;
722 }
723
724 static void
725 remove_rtcp_sink (GstRtpJitterBuffer * jitterbuffer)
726 {
727   GstRtpJitterBufferPrivate *priv;
728
729   priv = jitterbuffer->priv;
730
731   GST_DEBUG_OBJECT (jitterbuffer, "removing RTCP sink pad");
732
733   gst_pad_set_active (priv->rtcpsinkpad, FALSE);
734
735   gst_element_remove_pad (GST_ELEMENT_CAST (jitterbuffer), priv->rtcpsinkpad);
736   priv->rtcpsinkpad = NULL;
737 }
738
739 static GstPad *
740 gst_rtp_jitter_buffer_request_new_pad (GstElement * element,
741     GstPadTemplate * templ, const gchar * name, const GstCaps * filter)
742 {
743   GstRtpJitterBuffer *jitterbuffer;
744   GstElementClass *klass;
745   GstPad *result;
746   GstRtpJitterBufferPrivate *priv;
747
748   g_return_val_if_fail (templ != NULL, NULL);
749   g_return_val_if_fail (GST_IS_RTP_JITTER_BUFFER (element), NULL);
750
751   jitterbuffer = GST_RTP_JITTER_BUFFER (element);
752   priv = jitterbuffer->priv;
753   klass = GST_ELEMENT_GET_CLASS (element);
754
755   GST_DEBUG_OBJECT (element, "requesting pad %s", GST_STR_NULL (name));
756
757   /* figure out the template */
758   if (templ == gst_element_class_get_pad_template (klass, "sink_rtcp")) {
759     if (priv->rtcpsinkpad != NULL)
760       goto exists;
761
762     result = create_rtcp_sink (jitterbuffer);
763   } else
764     goto wrong_template;
765
766   return result;
767
768   /* ERRORS */
769 wrong_template:
770   {
771     g_warning ("gstrtpjitterbuffer: this is not our template");
772     return NULL;
773   }
774 exists:
775   {
776     g_warning ("gstrtpjitterbuffer: pad already requested");
777     return NULL;
778   }
779 }
780
781 static void
782 gst_rtp_jitter_buffer_release_pad (GstElement * element, GstPad * pad)
783 {
784   GstRtpJitterBuffer *jitterbuffer;
785   GstRtpJitterBufferPrivate *priv;
786
787   g_return_if_fail (GST_IS_RTP_JITTER_BUFFER (element));
788   g_return_if_fail (GST_IS_PAD (pad));
789
790   jitterbuffer = GST_RTP_JITTER_BUFFER (element);
791   priv = jitterbuffer->priv;
792
793   GST_DEBUG_OBJECT (element, "releasing pad %s:%s", GST_DEBUG_PAD_NAME (pad));
794
795   if (priv->rtcpsinkpad == pad) {
796     remove_rtcp_sink (jitterbuffer);
797   } else
798     goto wrong_pad;
799
800   return;
801
802   /* ERRORS */
803 wrong_pad:
804   {
805     g_warning ("gstjitterbuffer: asked to release an unknown pad");
806     return;
807   }
808 }
809
810 static GstClock *
811 gst_rtp_jitter_buffer_provide_clock (GstElement * element)
812 {
813   return gst_system_clock_obtain ();
814 }
815
816 static void
817 gst_rtp_jitter_buffer_clear_pt_map (GstRtpJitterBuffer * jitterbuffer)
818 {
819   GstRtpJitterBufferPrivate *priv;
820
821   priv = jitterbuffer->priv;
822
823   /* this will trigger a new pt-map request signal, FIXME, do something better. */
824
825   JBUF_LOCK (priv);
826   priv->clock_rate = -1;
827   /* do not clear current content, but refresh state for new arrival */
828   GST_DEBUG_OBJECT (jitterbuffer, "reset jitterbuffer");
829   rtp_jitter_buffer_reset_skew (priv->jbuf);
830   priv->last_popped_seqnum = -1;
831   priv->next_seqnum = -1;
832   JBUF_UNLOCK (priv);
833 }
834
835 static GstClockTime
836 gst_rtp_jitter_buffer_set_active (GstRtpJitterBuffer * jbuf, gboolean active,
837     guint64 offset)
838 {
839   GstRtpJitterBufferPrivate *priv;
840   GstClockTime last_out;
841   GstBuffer *head;
842
843   priv = jbuf->priv;
844
845   JBUF_LOCK (priv);
846   GST_DEBUG_OBJECT (jbuf, "setting active %d with offset %" GST_TIME_FORMAT,
847       active, GST_TIME_ARGS (offset));
848
849   if (active != priv->active) {
850     /* add the amount of time spent in paused to the output offset. All
851      * outgoing buffers will have this offset applied to their timestamps in
852      * order to make them arrive in time in the sink. */
853     priv->out_offset = offset;
854     GST_DEBUG_OBJECT (jbuf, "out offset %" GST_TIME_FORMAT,
855         GST_TIME_ARGS (priv->out_offset));
856     priv->active = active;
857     JBUF_SIGNAL_EVENT (priv);
858   }
859   if (!active) {
860     rtp_jitter_buffer_set_buffering (priv->jbuf, TRUE);
861   }
862   if ((head = rtp_jitter_buffer_peek (priv->jbuf))) {
863     /* head buffer timestamp and offset gives our output time */
864     last_out = GST_BUFFER_DTS (head) + priv->ts_offset;
865   } else {
866     /* use last known time when the buffer is empty */
867     last_out = priv->last_out_time;
868   }
869   JBUF_UNLOCK (priv);
870
871   return last_out;
872 }
873
874 static GstCaps *
875 gst_rtp_jitter_buffer_getcaps (GstPad * pad, GstCaps * filter)
876 {
877   GstRtpJitterBuffer *jitterbuffer;
878   GstRtpJitterBufferPrivate *priv;
879   GstPad *other;
880   GstCaps *caps;
881   GstCaps *templ;
882
883   jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
884   priv = jitterbuffer->priv;
885
886   other = (pad == priv->srcpad ? priv->sinkpad : priv->srcpad);
887
888   caps = gst_pad_peer_query_caps (other, filter);
889
890   templ = gst_pad_get_pad_template_caps (pad);
891   if (caps == NULL) {
892     GST_DEBUG_OBJECT (jitterbuffer, "use template");
893     caps = templ;
894   } else {
895     GstCaps *intersect;
896
897     GST_DEBUG_OBJECT (jitterbuffer, "intersect with template");
898
899     intersect = gst_caps_intersect (caps, templ);
900     gst_caps_unref (caps);
901     gst_caps_unref (templ);
902
903     caps = intersect;
904   }
905   gst_object_unref (jitterbuffer);
906
907   return caps;
908 }
909
910 /*
911  * Must be called with JBUF_LOCK held
912  */
913
914 static gboolean
915 gst_jitter_buffer_sink_parse_caps (GstRtpJitterBuffer * jitterbuffer,
916     GstCaps * caps)
917 {
918   GstRtpJitterBufferPrivate *priv;
919   GstStructure *caps_struct;
920   guint val;
921   GstClockTime tval;
922
923   priv = jitterbuffer->priv;
924
925   /* first parse the caps */
926   caps_struct = gst_caps_get_structure (caps, 0);
927
928   GST_DEBUG_OBJECT (jitterbuffer, "got caps");
929
930   /* we need a clock-rate to convert the rtp timestamps to GStreamer time and to
931    * measure the amount of data in the buffer */
932   if (!gst_structure_get_int (caps_struct, "clock-rate", &priv->clock_rate))
933     goto error;
934
935   if (priv->clock_rate <= 0)
936     goto wrong_rate;
937
938   GST_DEBUG_OBJECT (jitterbuffer, "got clock-rate %d", priv->clock_rate);
939
940   /* The clock base is the RTP timestamp corrsponding to the npt-start value. We
941    * can use this to track the amount of time elapsed on the sender. */
942   if (gst_structure_get_uint (caps_struct, "clock-base", &val))
943     priv->clock_base = val;
944   else
945     priv->clock_base = -1;
946
947   priv->ext_timestamp = priv->clock_base;
948
949   GST_DEBUG_OBJECT (jitterbuffer, "got clock-base %" G_GINT64_FORMAT,
950       priv->clock_base);
951
952   if (gst_structure_get_uint (caps_struct, "seqnum-base", &val)) {
953     /* first expected seqnum, only update when we didn't have a previous base. */
954     if (priv->next_in_seqnum == -1)
955       priv->next_in_seqnum = val;
956     if (priv->next_seqnum == -1)
957       priv->next_seqnum = val;
958   }
959
960   GST_DEBUG_OBJECT (jitterbuffer, "got seqnum-base %d", priv->next_in_seqnum);
961
962   /* the start and stop times. The seqnum-base corresponds to the start time. We
963    * will keep track of the seqnums on the output and when we reach the one
964    * corresponding to npt-stop, we emit the npt-stop-reached signal */
965   if (gst_structure_get_clock_time (caps_struct, "npt-start", &tval))
966     priv->npt_start = tval;
967   else
968     priv->npt_start = 0;
969
970   if (gst_structure_get_clock_time (caps_struct, "npt-stop", &tval))
971     priv->npt_stop = tval;
972   else
973     priv->npt_stop = -1;
974
975   GST_DEBUG_OBJECT (jitterbuffer,
976       "npt start/stop: %" GST_TIME_FORMAT "-%" GST_TIME_FORMAT,
977       GST_TIME_ARGS (priv->npt_start), GST_TIME_ARGS (priv->npt_stop));
978
979   return TRUE;
980
981   /* ERRORS */
982 error:
983   {
984     GST_DEBUG_OBJECT (jitterbuffer, "No clock-rate in caps!");
985     return FALSE;
986   }
987 wrong_rate:
988   {
989     GST_DEBUG_OBJECT (jitterbuffer, "Invalid clock-rate %d", priv->clock_rate);
990     return FALSE;
991   }
992 }
993
994 static void
995 gst_rtp_jitter_buffer_flush_start (GstRtpJitterBuffer * jitterbuffer)
996 {
997   GstRtpJitterBufferPrivate *priv;
998
999   priv = jitterbuffer->priv;
1000
1001   JBUF_LOCK (priv);
1002   /* mark ourselves as flushing */
1003   priv->srcresult = GST_FLOW_FLUSHING;
1004   GST_DEBUG_OBJECT (jitterbuffer, "Disabling pop on queue");
1005   /* this unblocks any waiting pops on the src pad task */
1006   JBUF_SIGNAL_EVENT (priv);
1007   JBUF_UNLOCK (priv);
1008 }
1009
1010 static void
1011 gst_rtp_jitter_buffer_flush_stop (GstRtpJitterBuffer * jitterbuffer)
1012 {
1013   GstRtpJitterBufferPrivate *priv;
1014
1015   priv = jitterbuffer->priv;
1016
1017   JBUF_LOCK (priv);
1018   GST_DEBUG_OBJECT (jitterbuffer, "Enabling pop on queue");
1019   /* Mark as non flushing */
1020   priv->srcresult = GST_FLOW_OK;
1021   gst_segment_init (&priv->segment, GST_FORMAT_TIME);
1022   priv->last_popped_seqnum = -1;
1023   priv->last_out_time = -1;
1024   priv->next_seqnum = -1;
1025   priv->ips_rtptime = -1;
1026   priv->ips_dts = GST_CLOCK_TIME_NONE;
1027   priv->packet_spacing = 0;
1028   priv->next_in_seqnum = -1;
1029   priv->clock_rate = -1;
1030   priv->eos = FALSE;
1031   priv->estimated_eos = -1;
1032   priv->last_elapsed = 0;
1033   priv->ext_timestamp = -1;
1034   GST_DEBUG_OBJECT (jitterbuffer, "flush and reset jitterbuffer");
1035   rtp_jitter_buffer_flush (priv->jbuf);
1036   rtp_jitter_buffer_reset_skew (priv->jbuf);
1037   remove_all_timers (jitterbuffer);
1038   JBUF_UNLOCK (priv);
1039 }
1040
1041 static gboolean
1042 gst_rtp_jitter_buffer_src_activate_mode (GstPad * pad, GstObject * parent,
1043     GstPadMode mode, gboolean active)
1044 {
1045   gboolean result;
1046   GstRtpJitterBuffer *jitterbuffer = NULL;
1047
1048   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1049
1050   switch (mode) {
1051     case GST_PAD_MODE_PUSH:
1052       if (active) {
1053         /* allow data processing */
1054         gst_rtp_jitter_buffer_flush_stop (jitterbuffer);
1055
1056         /* start pushing out buffers */
1057         GST_DEBUG_OBJECT (jitterbuffer, "Starting task on srcpad");
1058         result = gst_pad_start_task (jitterbuffer->priv->srcpad,
1059             (GstTaskFunction) gst_rtp_jitter_buffer_loop, jitterbuffer, NULL);
1060       } else {
1061         /* make sure all data processing stops ASAP */
1062         gst_rtp_jitter_buffer_flush_start (jitterbuffer);
1063
1064         /* NOTE this will hardlock if the state change is called from the src pad
1065          * task thread because we will _join() the thread. */
1066         GST_DEBUG_OBJECT (jitterbuffer, "Stopping task on srcpad");
1067         result = gst_pad_stop_task (pad);
1068       }
1069       break;
1070     default:
1071       result = FALSE;
1072       break;
1073   }
1074   return result;
1075 }
1076
1077 static GstStateChangeReturn
1078 gst_rtp_jitter_buffer_change_state (GstElement * element,
1079     GstStateChange transition)
1080 {
1081   GstRtpJitterBuffer *jitterbuffer;
1082   GstRtpJitterBufferPrivate *priv;
1083   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
1084
1085   jitterbuffer = GST_RTP_JITTER_BUFFER (element);
1086   priv = jitterbuffer->priv;
1087
1088   switch (transition) {
1089     case GST_STATE_CHANGE_NULL_TO_READY:
1090       break;
1091     case GST_STATE_CHANGE_READY_TO_PAUSED:
1092       JBUF_LOCK (priv);
1093       /* reset negotiated values */
1094       priv->clock_rate = -1;
1095       priv->clock_base = -1;
1096       priv->peer_latency = 0;
1097       priv->last_pt = -1;
1098       /* block until we go to PLAYING */
1099       priv->blocked = TRUE;
1100       priv->timer_running = TRUE;
1101       priv->timer_thread =
1102           g_thread_new ("timer", (GThreadFunc) wait_next_timeout, jitterbuffer);
1103       JBUF_UNLOCK (priv);
1104       break;
1105     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
1106       JBUF_LOCK (priv);
1107       /* unblock to allow streaming in PLAYING */
1108       priv->blocked = FALSE;
1109       JBUF_SIGNAL_EVENT (priv);
1110       JBUF_UNLOCK (priv);
1111       break;
1112     default:
1113       break;
1114   }
1115
1116   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1117
1118   switch (transition) {
1119     case GST_STATE_CHANGE_READY_TO_PAUSED:
1120       /* we are a live element because we sync to the clock, which we can only
1121        * do in the PLAYING state */
1122       if (ret != GST_STATE_CHANGE_FAILURE)
1123         ret = GST_STATE_CHANGE_NO_PREROLL;
1124       break;
1125     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
1126       JBUF_LOCK (priv);
1127       /* block to stop streaming when PAUSED */
1128       priv->blocked = TRUE;
1129       JBUF_UNLOCK (priv);
1130       if (ret != GST_STATE_CHANGE_FAILURE)
1131         ret = GST_STATE_CHANGE_NO_PREROLL;
1132       break;
1133     case GST_STATE_CHANGE_PAUSED_TO_READY:
1134       JBUF_LOCK (priv);
1135       gst_buffer_replace (&priv->last_sr, NULL);
1136       priv->timer_running = FALSE;
1137       unschedule_current_timer (jitterbuffer);
1138       JBUF_SIGNAL_TIMER (priv);
1139       JBUF_UNLOCK (priv);
1140       g_thread_join (priv->timer_thread);
1141       priv->timer_thread = NULL;
1142       break;
1143     case GST_STATE_CHANGE_READY_TO_NULL:
1144       break;
1145     default:
1146       break;
1147   }
1148
1149   return ret;
1150 }
1151
1152 static gboolean
1153 gst_rtp_jitter_buffer_src_event (GstPad * pad, GstObject * parent,
1154     GstEvent * event)
1155 {
1156   gboolean ret = TRUE;
1157   GstRtpJitterBuffer *jitterbuffer;
1158   GstRtpJitterBufferPrivate *priv;
1159
1160   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1161   priv = jitterbuffer->priv;
1162
1163   GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
1164
1165   switch (GST_EVENT_TYPE (event)) {
1166     case GST_EVENT_LATENCY:
1167     {
1168       GstClockTime latency;
1169
1170       gst_event_parse_latency (event, &latency);
1171
1172       GST_DEBUG_OBJECT (jitterbuffer,
1173           "configuring latency of %" GST_TIME_FORMAT, GST_TIME_ARGS (latency));
1174
1175       JBUF_LOCK (priv);
1176       /* adjust the overall buffer delay to the total pipeline latency in
1177        * buffering mode because if downstream consumes too fast (because of
1178        * large latency or queues, we would start rebuffering again. */
1179       if (rtp_jitter_buffer_get_mode (priv->jbuf) ==
1180           RTP_JITTER_BUFFER_MODE_BUFFER) {
1181         rtp_jitter_buffer_set_delay (priv->jbuf, latency);
1182       }
1183       JBUF_UNLOCK (priv);
1184
1185       ret = gst_pad_push_event (priv->sinkpad, event);
1186       break;
1187     }
1188     default:
1189       ret = gst_pad_push_event (priv->sinkpad, event);
1190       break;
1191   }
1192
1193   return ret;
1194 }
1195
1196 static gboolean
1197 gst_rtp_jitter_buffer_sink_event (GstPad * pad, GstObject * parent,
1198     GstEvent * event)
1199 {
1200   gboolean ret = TRUE;
1201   GstRtpJitterBuffer *jitterbuffer;
1202   GstRtpJitterBufferPrivate *priv;
1203
1204   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1205   priv = jitterbuffer->priv;
1206
1207   GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
1208
1209   switch (GST_EVENT_TYPE (event)) {
1210     case GST_EVENT_CAPS:
1211     {
1212       GstCaps *caps;
1213
1214       gst_event_parse_caps (event, &caps);
1215
1216       JBUF_LOCK (priv);
1217       ret = gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps);
1218       JBUF_UNLOCK (priv);
1219
1220       /* set same caps on srcpad on success */
1221       if (ret)
1222         ret = gst_pad_push_event (priv->srcpad, event);
1223       else
1224         gst_event_unref (event);
1225       break;
1226     }
1227     case GST_EVENT_SEGMENT:
1228     {
1229       gst_event_copy_segment (event, &priv->segment);
1230
1231       /* we need time for now */
1232       if (priv->segment.format != GST_FORMAT_TIME)
1233         goto newseg_wrong_format;
1234
1235       GST_DEBUG_OBJECT (jitterbuffer,
1236           "newsegment:  %" GST_SEGMENT_FORMAT, &priv->segment);
1237
1238       /* FIXME, push SEGMENT in the queue. Sorting order might be difficult. */
1239       ret = gst_pad_push_event (priv->srcpad, event);
1240       break;
1241     }
1242     case GST_EVENT_FLUSH_START:
1243       ret = gst_pad_push_event (priv->srcpad, event);
1244       gst_rtp_jitter_buffer_flush_start (jitterbuffer);
1245       break;
1246     case GST_EVENT_FLUSH_STOP:
1247       ret = gst_pad_push_event (priv->srcpad, event);
1248       ret =
1249           gst_rtp_jitter_buffer_src_activate_mode (priv->srcpad, parent,
1250           GST_PAD_MODE_PUSH, TRUE);
1251       break;
1252     case GST_EVENT_EOS:
1253     {
1254       /* push EOS in queue. We always push it at the head */
1255       JBUF_LOCK (priv);
1256       /* check for flushing, we need to discard the event and return FALSE when
1257        * we are flushing */
1258       ret = priv->srcresult == GST_FLOW_OK;
1259       if (ret && !priv->eos) {
1260         GST_INFO_OBJECT (jitterbuffer, "queuing EOS");
1261         priv->eos = TRUE;
1262         JBUF_SIGNAL_EVENT (priv);
1263       } else if (priv->eos) {
1264         GST_DEBUG_OBJECT (jitterbuffer, "dropping EOS, we are already EOS");
1265       } else {
1266         GST_DEBUG_OBJECT (jitterbuffer, "dropping EOS, reason %s",
1267             gst_flow_get_name (priv->srcresult));
1268       }
1269       JBUF_UNLOCK (priv);
1270       gst_event_unref (event);
1271       break;
1272     }
1273     default:
1274       ret = gst_pad_push_event (priv->srcpad, event);
1275       break;
1276   }
1277
1278 done:
1279
1280   return ret;
1281
1282   /* ERRORS */
1283 newseg_wrong_format:
1284   {
1285     GST_DEBUG_OBJECT (jitterbuffer, "received non TIME newsegment");
1286     ret = FALSE;
1287     gst_event_unref (event);
1288     goto done;
1289   }
1290 }
1291
1292 static gboolean
1293 gst_rtp_jitter_buffer_sink_rtcp_event (GstPad * pad, GstObject * parent,
1294     GstEvent * event)
1295 {
1296   gboolean ret = TRUE;
1297   GstRtpJitterBuffer *jitterbuffer;
1298
1299   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1300
1301   GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
1302
1303   switch (GST_EVENT_TYPE (event)) {
1304     case GST_EVENT_FLUSH_START:
1305       gst_event_unref (event);
1306       break;
1307     case GST_EVENT_FLUSH_STOP:
1308       gst_event_unref (event);
1309       break;
1310     default:
1311       ret = gst_pad_event_default (pad, parent, event);
1312       break;
1313   }
1314
1315   return ret;
1316 }
1317
1318 /*
1319  * Must be called with JBUF_LOCK held, will release the LOCK when emiting the
1320  * signal. The function returns GST_FLOW_ERROR when a parsing error happened and
1321  * GST_FLOW_FLUSHING when the element is shutting down. On success
1322  * GST_FLOW_OK is returned.
1323  */
1324 static GstFlowReturn
1325 gst_rtp_jitter_buffer_get_clock_rate (GstRtpJitterBuffer * jitterbuffer,
1326     guint8 pt)
1327 {
1328   GValue ret = { 0 };
1329   GValue args[2] = { {0}, {0} };
1330   GstCaps *caps;
1331   gboolean res;
1332
1333   g_value_init (&args[0], GST_TYPE_ELEMENT);
1334   g_value_set_object (&args[0], jitterbuffer);
1335   g_value_init (&args[1], G_TYPE_UINT);
1336   g_value_set_uint (&args[1], pt);
1337
1338   g_value_init (&ret, GST_TYPE_CAPS);
1339   g_value_set_boxed (&ret, NULL);
1340
1341   JBUF_UNLOCK (jitterbuffer->priv);
1342   g_signal_emitv (args, gst_rtp_jitter_buffer_signals[SIGNAL_REQUEST_PT_MAP], 0,
1343       &ret);
1344   JBUF_LOCK_CHECK (jitterbuffer->priv, out_flushing);
1345
1346   g_value_unset (&args[0]);
1347   g_value_unset (&args[1]);
1348   caps = (GstCaps *) g_value_dup_boxed (&ret);
1349   g_value_unset (&ret);
1350   if (!caps)
1351     goto no_caps;
1352
1353   res = gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps);
1354   gst_caps_unref (caps);
1355
1356   if (G_UNLIKELY (!res))
1357     goto parse_failed;
1358
1359   return GST_FLOW_OK;
1360
1361   /* ERRORS */
1362 no_caps:
1363   {
1364     GST_DEBUG_OBJECT (jitterbuffer, "could not get caps");
1365     return GST_FLOW_ERROR;
1366   }
1367 out_flushing:
1368   {
1369     GST_DEBUG_OBJECT (jitterbuffer, "we are flushing");
1370     return GST_FLOW_FLUSHING;
1371   }
1372 parse_failed:
1373   {
1374     GST_DEBUG_OBJECT (jitterbuffer, "parse failed");
1375     return GST_FLOW_ERROR;
1376   }
1377 }
1378
1379 /* call with jbuf lock held */
1380 static void
1381 check_buffering_percent (GstRtpJitterBuffer * jitterbuffer, gint * percent)
1382 {
1383   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1384
1385   /* too short a stream, or too close to EOS will never really fill buffer */
1386   if (*percent != -1 && priv->npt_stop != -1 &&
1387       priv->npt_stop - priv->npt_start <=
1388       rtp_jitter_buffer_get_delay (priv->jbuf)) {
1389     GST_DEBUG_OBJECT (jitterbuffer, "short stream; faking full buffer");
1390     rtp_jitter_buffer_set_buffering (priv->jbuf, FALSE);
1391     *percent = 100;
1392   }
1393 }
1394
1395 static void
1396 post_buffering_percent (GstRtpJitterBuffer * jitterbuffer, gint percent)
1397 {
1398   GstMessage *message;
1399
1400   /* Post a buffering message */
1401   message = gst_message_new_buffering (GST_OBJECT_CAST (jitterbuffer), percent);
1402   gst_message_set_buffering_stats (message, GST_BUFFERING_LIVE, -1, -1, -1);
1403
1404   gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer), message);
1405 }
1406
1407 static GstClockTime
1408 apply_offset (GstRtpJitterBuffer * jitterbuffer, GstClockTime timestamp)
1409 {
1410   GstRtpJitterBufferPrivate *priv;
1411
1412   priv = jitterbuffer->priv;
1413
1414   if (timestamp == -1)
1415     return -1;
1416
1417   /* apply the timestamp offset, this is used for inter stream sync */
1418   timestamp += priv->ts_offset;
1419   /* add the offset, this is used when buffering */
1420   timestamp += priv->out_offset;
1421
1422   return timestamp;
1423 }
1424
1425 static TimerData *
1426 find_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type, guint16 seqnum)
1427 {
1428   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1429   TimerData *timer = NULL;
1430   gint i, len;
1431
1432   len = priv->timers->len;
1433   for (i = 0; i < len; i++) {
1434     TimerData *test = &g_array_index (priv->timers, TimerData, i);
1435     if (test->seqnum == seqnum && test->type == type) {
1436       timer = test;
1437       break;
1438     }
1439   }
1440   return timer;
1441 }
1442
1443 static void
1444 unschedule_current_timer (GstRtpJitterBuffer * jitterbuffer)
1445 {
1446   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1447
1448   if (priv->clock_id) {
1449     GST_DEBUG_OBJECT (jitterbuffer, "unschedule current timer");
1450     gst_clock_id_unschedule (priv->clock_id);
1451     priv->clock_id = NULL;
1452   }
1453 }
1454
1455 static GstClockTime
1456 get_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
1457 {
1458   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1459   GstClockTime test_timeout;
1460
1461   if ((test_timeout = timer->timeout) == -1)
1462     return -1;
1463
1464   if (timer->type != TIMER_TYPE_EXPECTED) {
1465     /* add our latency and offset to get output times. */
1466     test_timeout = apply_offset (jitterbuffer, test_timeout);
1467     test_timeout += priv->latency_ns;
1468   }
1469   return test_timeout;
1470 }
1471
1472 static void
1473 recalculate_timer (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
1474 {
1475   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1476
1477   if (priv->clock_id) {
1478     GstClockTime timeout = get_timeout (jitterbuffer, timer);
1479
1480     GST_DEBUG ("%" GST_TIME_FORMAT " <> %" GST_TIME_FORMAT,
1481         GST_TIME_ARGS (timeout), GST_TIME_ARGS (priv->timer_timeout));
1482
1483     if (timeout == -1 || timeout < priv->timer_timeout)
1484       unschedule_current_timer (jitterbuffer);
1485   }
1486 }
1487
1488 static TimerData *
1489 add_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type,
1490     guint16 seqnum, guint num, GstClockTime timeout, GstClockTime delay,
1491     GstClockTime duration)
1492 {
1493   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1494   TimerData *timer;
1495   gint len;
1496
1497   GST_DEBUG_OBJECT (jitterbuffer,
1498       "add timer for seqnum %d to %" GST_TIME_FORMAT ", delay %"
1499       GST_TIME_FORMAT, seqnum, GST_TIME_ARGS (timeout), GST_TIME_ARGS (delay));
1500
1501   len = priv->timers->len;
1502   g_array_set_size (priv->timers, len + 1);
1503   timer = &g_array_index (priv->timers, TimerData, len);
1504   timer->idx = len;
1505   timer->type = type;
1506   timer->seqnum = seqnum;
1507   timer->num = num;
1508   timer->timeout = timeout + delay;
1509   timer->duration = duration;
1510   if (type == TIMER_TYPE_EXPECTED) {
1511     timer->rtx_base = timeout;
1512     timer->rtx_delay = delay;
1513     timer->rtx_retry = 0;
1514   }
1515   recalculate_timer (jitterbuffer, timer);
1516   JBUF_SIGNAL_TIMER (priv);
1517
1518   return timer;
1519 }
1520
1521 static void
1522 reschedule_timer (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
1523     guint16 seqnum, GstClockTime timeout, GstClockTime delay, gboolean reset)
1524 {
1525   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1526   gboolean seqchange, timechange;
1527   guint16 oldseq;
1528
1529   seqchange = timer->seqnum != seqnum;
1530   timechange = timer->timeout != timeout;
1531
1532   if (!seqchange && !timechange)
1533     return;
1534
1535   oldseq = timer->seqnum;
1536
1537   GST_DEBUG_OBJECT (jitterbuffer,
1538       "replace timer for seqnum %d->%d to %" GST_TIME_FORMAT,
1539       oldseq, seqnum, GST_TIME_ARGS (timeout));
1540
1541   timer->timeout = timeout + delay;
1542   timer->seqnum = seqnum;
1543   if (reset) {
1544     timer->rtx_base = timeout;
1545     timer->rtx_delay = delay;
1546     timer->rtx_retry = 0;
1547   }
1548
1549   if (priv->clock_id) {
1550     /* we changed the seqnum and there is a timer currently waiting with this
1551      * seqnum, unschedule it */
1552     if (seqchange && priv->timer_seqnum == oldseq)
1553       unschedule_current_timer (jitterbuffer);
1554     /* we changed the time, check if it is earlier than what we are waiting
1555      * for and unschedule if so */
1556     else if (timechange)
1557       recalculate_timer (jitterbuffer, timer);
1558   }
1559 }
1560
1561 static TimerData *
1562 set_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type,
1563     guint16 seqnum, GstClockTime timeout)
1564 {
1565   TimerData *timer;
1566
1567   /* find the seqnum timer */
1568   timer = find_timer (jitterbuffer, type, seqnum);
1569   if (timer == NULL) {
1570     timer = add_timer (jitterbuffer, type, seqnum, 0, timeout, 0, -1);
1571   } else {
1572     reschedule_timer (jitterbuffer, timer, seqnum, timeout, 0, FALSE);
1573   }
1574   return timer;
1575 }
1576
1577 static void
1578 remove_timer (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
1579 {
1580   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1581   guint idx;
1582
1583   if (priv->clock_id && priv->timer_seqnum == timer->seqnum)
1584     unschedule_current_timer (jitterbuffer);
1585
1586   idx = timer->idx;
1587   GST_DEBUG_OBJECT (jitterbuffer, "removed index %d", idx);
1588   g_array_remove_index_fast (priv->timers, idx);
1589   timer->idx = idx;
1590 }
1591
1592 static void
1593 remove_all_timers (GstRtpJitterBuffer * jitterbuffer)
1594 {
1595   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1596   GST_DEBUG_OBJECT (jitterbuffer, "removed all timers");
1597   g_array_set_size (priv->timers, 0);
1598   unschedule_current_timer (jitterbuffer);
1599 }
1600
1601 /* we just received a packet with seqnum and dts.
1602  *
1603  * First check for old seqnum that we are still expecting. If the gap with the
1604  * current timestamp is too big, unschedule the timeouts.
1605  *
1606  * If we have a valid packet spacing estimate we can set a timer for when we
1607  * should receive the next packet.
1608  * If we don't have a valid estimate, we remove any timer we might have
1609  * had for this packet.
1610  */
1611 static void
1612 update_timers (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum,
1613     GstClockTime dts, gboolean do_next_seqnum)
1614 {
1615   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1616   TimerData *timer = NULL;
1617   gint i, len;
1618
1619   /* go through all timers and unschedule the ones with a large gap, also find
1620    * the timer for the seqnum */
1621   len = priv->timers->len;
1622   for (i = 0; i < len; i++) {
1623     TimerData *test = &g_array_index (priv->timers, TimerData, i);
1624     gint gap;
1625
1626     gap = gst_rtp_buffer_compare_seqnum (test->seqnum, seqnum);
1627
1628     GST_DEBUG_OBJECT (jitterbuffer, "%d, #%d<->#%d gap %d", i,
1629         test->seqnum, seqnum, gap);
1630
1631     if (gap == 0) {
1632       GST_DEBUG ("found timer for current seqnum");
1633       /* the timer for the current seqnum */
1634       timer = test;
1635     } else if (gap > priv->rtx_delay_reorder) {
1636       /* max gap, we exceeded the max reorder distance and we don't expect the
1637        * missing packet to be this reordered */
1638       if (test->rtx_retry == 0 && test->type == TIMER_TYPE_EXPECTED)
1639         reschedule_timer (jitterbuffer, test, test->seqnum, -1, 0, FALSE);
1640     }
1641   }
1642
1643   if (priv->packet_spacing > 0 && do_next_seqnum && priv->do_retransmission) {
1644     GstClockTime expected, delay;
1645
1646     /* calculate expected arrival time of the next seqnum */
1647     expected = dts + priv->packet_spacing;
1648     delay = priv->rtx_delay * GST_MSECOND;
1649
1650     /* and update/install timer for next seqnum */
1651     if (timer)
1652       reschedule_timer (jitterbuffer, timer, priv->next_in_seqnum, expected,
1653           delay, TRUE);
1654     else
1655       add_timer (jitterbuffer, TIMER_TYPE_EXPECTED, priv->next_in_seqnum, 0,
1656           expected, delay, priv->packet_spacing);
1657   } else if (timer && timer->type != TIMER_TYPE_DEADLINE) {
1658     /* if we had a timer, remove it, we don't know when to expect the next
1659      * packet. */
1660     remove_timer (jitterbuffer, timer);
1661     JBUF_SIGNAL_EVENT (priv);
1662   }
1663 }
1664
1665 static void
1666 calculate_packet_spacing (GstRtpJitterBuffer * jitterbuffer, guint32 rtptime,
1667     GstClockTime dts)
1668 {
1669   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1670
1671   /* we need consecutive seqnums with a different
1672    * rtptime to estimate the packet spacing. */
1673   if (priv->ips_rtptime != rtptime) {
1674     /* rtptime changed, check dts diff */
1675     if (priv->ips_dts != -1 && dts != -1 && dts > priv->ips_dts) {
1676       priv->packet_spacing = dts - priv->ips_dts;
1677       GST_DEBUG_OBJECT (jitterbuffer,
1678           "new packet spacing %" GST_TIME_FORMAT,
1679           GST_TIME_ARGS (priv->packet_spacing));
1680     }
1681     priv->ips_rtptime = rtptime;
1682     priv->ips_dts = dts;
1683   }
1684 }
1685
1686 static void
1687 send_lost_event (GstRtpJitterBuffer * jitterbuffer, guint seqnum,
1688     guint lost_packets, GstClockTime timestamp, GstClockTime duration,
1689     gboolean late)
1690 {
1691   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1692
1693   /* we had a gap and thus we lost some packets. Create an event for this.  */
1694   if (lost_packets > 1)
1695     GST_DEBUG_OBJECT (jitterbuffer, "Packets #%d -> #%d lost", seqnum,
1696         seqnum + lost_packets - 1);
1697   else
1698     GST_DEBUG_OBJECT (jitterbuffer, "Packet #%d lost", seqnum);
1699
1700   priv->num_late += lost_packets;
1701   priv->discont = TRUE;
1702
1703   /* update our expected next packet but make sure the seqnum increases */
1704   if (seqnum + lost_packets > priv->next_seqnum) {
1705     priv->next_seqnum = (seqnum + lost_packets) & 0xffff;
1706     priv->last_popped_seqnum = seqnum;
1707     priv->last_out_time = timestamp;
1708   }
1709   if (priv->do_lost) {
1710     GstEvent *event;
1711
1712     /* create paket lost event */
1713     event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM,
1714         gst_structure_new ("GstRTPPacketLost",
1715             "seqnum", G_TYPE_UINT, (guint) seqnum,
1716             "timestamp", G_TYPE_UINT64, timestamp,
1717             "duration", G_TYPE_UINT64, duration,
1718             "late", G_TYPE_BOOLEAN, late, NULL));
1719     JBUF_UNLOCK (priv);
1720     gst_pad_push_event (priv->srcpad, event);
1721     JBUF_LOCK (priv);
1722   }
1723 }
1724
1725 static void
1726 calculate_expected (GstRtpJitterBuffer * jitterbuffer, guint32 expected,
1727     guint16 seqnum, GstClockTime dts, gint gap)
1728 {
1729   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1730   GstClockTime total_duration, duration, expected_dts;
1731   TimerType type;
1732
1733   GST_DEBUG_OBJECT (jitterbuffer,
1734       "dts %" GST_TIME_FORMAT ", last %" GST_TIME_FORMAT,
1735       GST_TIME_ARGS (dts), GST_TIME_ARGS (priv->last_in_dts));
1736
1737   /* the total duration spanned by the missing packets */
1738   if (dts >= priv->last_in_dts)
1739     total_duration = dts - priv->last_in_dts;
1740   else
1741     total_duration = 0;
1742
1743   /* interpolate between the current time and the last time based on
1744    * number of packets we are missing, this is the estimated duration
1745    * for the missing packet based on equidistant packet spacing. */
1746   duration = total_duration / (gap + 1);
1747
1748   GST_DEBUG_OBJECT (jitterbuffer, "duration %" GST_TIME_FORMAT,
1749       GST_TIME_ARGS (duration));
1750
1751   if (total_duration > priv->latency_ns) {
1752     GstClockTime gap_time;
1753     guint lost_packets;
1754
1755     gap_time = total_duration - priv->latency_ns;
1756
1757     if (duration > 0) {
1758       lost_packets = gap_time / duration;
1759       gap_time = lost_packets * duration;
1760     } else {
1761       lost_packets = gap;
1762     }
1763
1764     /* too many lost packets, some of the missing packets are already
1765      * too late and we can generate lost packet events for them. */
1766     GST_DEBUG_OBJECT (jitterbuffer, "too many lost packets %" GST_TIME_FORMAT
1767         " > %" GST_TIME_FORMAT ", consider %u lost",
1768         GST_TIME_ARGS (total_duration), GST_TIME_ARGS (priv->latency_ns),
1769         lost_packets);
1770
1771     /* this timer will fire immediately and the lost event will be pushed from
1772      * the timer thread */
1773     add_timer (jitterbuffer, TIMER_TYPE_LOST, expected, lost_packets,
1774         priv->last_in_dts + duration, 0, gap_time);
1775
1776     expected += lost_packets;
1777     priv->last_in_dts += gap_time;
1778   }
1779
1780   expected_dts = priv->last_in_dts + duration;
1781
1782   if (priv->do_retransmission) {
1783     type = TIMER_TYPE_EXPECTED;
1784     /* if we had a timer for the first missing packet, leave it. */
1785     if (find_timer (jitterbuffer, type, expected)) {
1786       expected++;
1787       expected_dts += duration;
1788     }
1789   } else {
1790     type = TIMER_TYPE_LOST;
1791   }
1792
1793   while (expected < seqnum) {
1794     add_timer (jitterbuffer, type, expected, 0, expected_dts, 0, duration);
1795     expected_dts += duration;
1796     expected++;
1797   }
1798 }
1799
1800 static GstFlowReturn
1801 gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent,
1802     GstBuffer * buffer)
1803 {
1804   GstRtpJitterBuffer *jitterbuffer;
1805   GstRtpJitterBufferPrivate *priv;
1806   guint16 seqnum;
1807   guint32 expected, rtptime;
1808   GstFlowReturn ret = GST_FLOW_OK;
1809   GstClockTime dts, pts;
1810   guint64 latency_ts;
1811   gboolean tail;
1812   gint percent = -1;
1813   guint8 pt;
1814   GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
1815   gboolean do_next_seqnum = FALSE;
1816
1817   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1818
1819   priv = jitterbuffer->priv;
1820
1821   if (G_UNLIKELY (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp)))
1822     goto invalid_buffer;
1823
1824   pt = gst_rtp_buffer_get_payload_type (&rtp);
1825   seqnum = gst_rtp_buffer_get_seq (&rtp);
1826   rtptime = gst_rtp_buffer_get_timestamp (&rtp);
1827   gst_rtp_buffer_unmap (&rtp);
1828
1829   /* make sure we have PTS and DTS set */
1830   pts = GST_BUFFER_PTS (buffer);
1831   dts = GST_BUFFER_DTS (buffer);
1832   if (dts == -1)
1833     dts = pts;
1834   else if (pts == -1)
1835     pts = dts;
1836
1837   /* take the DTS of the buffer. This is the time when the packet was
1838    * received and is used to calculate jitter and clock skew. We will adjust
1839    * this DTS with the smoothed value after processing it in the
1840    * jitterbuffer and assign it as the PTS. */
1841   /* bring to running time */
1842   dts = gst_segment_to_running_time (&priv->segment, GST_FORMAT_TIME, dts);
1843
1844   GST_DEBUG_OBJECT (jitterbuffer,
1845       "Received packet #%d at time %" GST_TIME_FORMAT ", discont %d", seqnum,
1846       GST_TIME_ARGS (dts), GST_BUFFER_IS_DISCONT (buffer));
1847
1848   JBUF_LOCK_CHECK (priv, out_flushing);
1849
1850   if (G_UNLIKELY (priv->last_pt != pt)) {
1851     GstCaps *caps;
1852
1853     GST_DEBUG_OBJECT (jitterbuffer, "pt changed from %u to %u", priv->last_pt,
1854         pt);
1855
1856     priv->last_pt = pt;
1857     /* reset clock-rate so that we get a new one */
1858     priv->clock_rate = -1;
1859
1860     /* Try to get the clock-rate from the caps first if we can. If there are no
1861      * caps we must fire the signal to get the clock-rate. */
1862     if ((caps = gst_pad_get_current_caps (pad))) {
1863       gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps);
1864       gst_caps_unref (caps);
1865     }
1866   }
1867
1868   if (G_UNLIKELY (priv->clock_rate == -1)) {
1869     /* no clock rate given on the caps, try to get one with the signal */
1870     if (gst_rtp_jitter_buffer_get_clock_rate (jitterbuffer,
1871             pt) == GST_FLOW_FLUSHING)
1872       goto out_flushing;
1873
1874     if (G_UNLIKELY (priv->clock_rate == -1))
1875       goto no_clock_rate;
1876   }
1877
1878   /* don't accept more data on EOS */
1879   if (G_UNLIKELY (priv->eos))
1880     goto have_eos;
1881
1882   expected = priv->next_in_seqnum;
1883
1884   /* now check against our expected seqnum */
1885   if (G_LIKELY (expected != -1)) {
1886     gint gap;
1887
1888     /* now calculate gap */
1889     gap = gst_rtp_buffer_compare_seqnum (expected, seqnum);
1890
1891     GST_DEBUG_OBJECT (jitterbuffer, "expected #%d, got #%d, gap of %d",
1892         expected, seqnum, gap);
1893
1894     if (G_LIKELY (gap == 0)) {
1895       /* packet is expected */
1896       calculate_packet_spacing (jitterbuffer, rtptime, dts);
1897       do_next_seqnum = TRUE;
1898     } else {
1899       gboolean reset = FALSE;
1900
1901       if (gap < 0) {
1902         /* we received an old packet */
1903         if (G_UNLIKELY (gap < -RTP_MAX_MISORDER)) {
1904           /* too old packet, reset */
1905           GST_DEBUG_OBJECT (jitterbuffer, "reset: buffer too old %d < %d", gap,
1906               -RTP_MAX_MISORDER);
1907           reset = TRUE;
1908         } else {
1909           GST_DEBUG_OBJECT (jitterbuffer, "old packet received");
1910         }
1911       } else {
1912         /* new packet, we are missing some packets */
1913         if (G_UNLIKELY (gap > RTP_MAX_DROPOUT)) {
1914           /* packet too far in future, reset */
1915           GST_DEBUG_OBJECT (jitterbuffer, "reset: buffer too new %d > %d", gap,
1916               RTP_MAX_DROPOUT);
1917           reset = TRUE;
1918         } else {
1919           GST_DEBUG_OBJECT (jitterbuffer, "%d missing packets", gap);
1920           /* fill in the gap with EXPECTED timers */
1921           calculate_expected (jitterbuffer, expected, seqnum, dts, gap);
1922
1923           do_next_seqnum = TRUE;
1924         }
1925       }
1926       if (G_UNLIKELY (reset)) {
1927         GST_DEBUG_OBJECT (jitterbuffer, "flush and reset jitterbuffer");
1928         rtp_jitter_buffer_flush (priv->jbuf);
1929         rtp_jitter_buffer_reset_skew (priv->jbuf);
1930         remove_all_timers (jitterbuffer);
1931         priv->last_popped_seqnum = -1;
1932         priv->next_seqnum = seqnum;
1933         do_next_seqnum = TRUE;
1934       }
1935       /* reset spacing estimation when gap */
1936       priv->ips_rtptime = -1;
1937       priv->ips_dts = GST_CLOCK_TIME_NONE;
1938     }
1939   } else {
1940     GST_DEBUG_OBJECT (jitterbuffer, "First buffer #%d", seqnum);
1941     /* we don't know what the next_in_seqnum should be, wait for the last
1942      * possible moment to push this buffer, maybe we get an earlier seqnum
1943      * while we wait */
1944     set_timer (jitterbuffer, TIMER_TYPE_DEADLINE, seqnum, dts);
1945     do_next_seqnum = TRUE;
1946     /* take rtptime and dts to calculate packet spacing */
1947     priv->ips_rtptime = rtptime;
1948     priv->ips_dts = dts;
1949   }
1950   if (do_next_seqnum) {
1951     priv->last_in_seqnum = seqnum;
1952     priv->last_in_dts = dts;
1953     priv->next_in_seqnum = (seqnum + 1) & 0xffff;
1954   }
1955
1956   /* let's check if this buffer is too late, we can only accept packets with
1957    * bigger seqnum than the one we last pushed. */
1958   if (G_LIKELY (priv->last_popped_seqnum != -1)) {
1959     gint gap;
1960
1961     gap = gst_rtp_buffer_compare_seqnum (priv->last_popped_seqnum, seqnum);
1962
1963     /* priv->last_popped_seqnum >= seqnum, we're too late. */
1964     if (G_UNLIKELY (gap <= 0))
1965       goto too_late;
1966   }
1967
1968   /* let's drop oldest packet if the queue is already full and drop-on-latency
1969    * is set. We can only do this when there actually is a latency. When no
1970    * latency is set, we just pump it in the queue and let the other end push it
1971    * out as fast as possible. */
1972   if (priv->latency_ms && priv->drop_on_latency) {
1973     latency_ts =
1974         gst_util_uint64_scale_int (priv->latency_ms, priv->clock_rate, 1000);
1975
1976     if (G_UNLIKELY (rtp_jitter_buffer_get_ts_diff (priv->jbuf) >= latency_ts)) {
1977       GstBuffer *old_buf;
1978
1979       old_buf = rtp_jitter_buffer_pop (priv->jbuf, &percent);
1980
1981       GST_DEBUG_OBJECT (jitterbuffer, "Queue full, dropping old packet %p",
1982           old_buf);
1983
1984       gst_buffer_unref (old_buf);
1985     }
1986   }
1987
1988   /* we need to make the metadata writable before pushing it in the jitterbuffer
1989    * because the jitterbuffer will update the PTS */
1990   buffer = gst_buffer_make_writable (buffer);
1991   GST_BUFFER_DTS (buffer) = dts;
1992   GST_BUFFER_PTS (buffer) = pts;
1993
1994   /* now insert the packet into the queue in sorted order. This function returns
1995    * FALSE if a packet with the same seqnum was already in the queue, meaning we
1996    * have a duplicate. */
1997   if (G_UNLIKELY (!rtp_jitter_buffer_insert (priv->jbuf, buffer, dts,
1998               priv->clock_rate, &tail, &percent)))
1999     goto duplicate;
2000
2001   /* update timers */
2002   update_timers (jitterbuffer, seqnum, dts, do_next_seqnum);
2003
2004   /* we had an unhandled SR, handle it now */
2005   if (priv->last_sr)
2006     do_handle_sync (jitterbuffer);
2007
2008   /* signal addition of new buffer when the _loop is waiting. */
2009   if (priv->active && priv->waiting_timer)
2010     JBUF_SIGNAL_EVENT (priv);
2011
2012   /* let's unschedule and unblock any waiting buffers. We only want to do this
2013    * when the tail buffer changed */
2014   if (G_UNLIKELY (priv->clock_id && tail)) {
2015     GST_DEBUG_OBJECT (jitterbuffer, "Unscheduling waiting new buffer");
2016     unschedule_current_timer (jitterbuffer);
2017   }
2018
2019   GST_DEBUG_OBJECT (jitterbuffer, "Pushed packet #%d, now %d packets, tail: %d",
2020       seqnum, rtp_jitter_buffer_num_packets (priv->jbuf), tail);
2021
2022   check_buffering_percent (jitterbuffer, &percent);
2023
2024 finished:
2025   JBUF_UNLOCK (priv);
2026
2027   if (percent != -1)
2028     post_buffering_percent (jitterbuffer, percent);
2029
2030   return ret;
2031
2032   /* ERRORS */
2033 invalid_buffer:
2034   {
2035     /* this is not fatal but should be filtered earlier */
2036     GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
2037         ("Received invalid RTP payload, dropping"));
2038     gst_buffer_unref (buffer);
2039     return GST_FLOW_OK;
2040   }
2041 no_clock_rate:
2042   {
2043     GST_WARNING_OBJECT (jitterbuffer,
2044         "No clock-rate in caps!, dropping buffer");
2045     gst_buffer_unref (buffer);
2046     goto finished;
2047   }
2048 out_flushing:
2049   {
2050     ret = priv->srcresult;
2051     GST_DEBUG_OBJECT (jitterbuffer, "flushing %s", gst_flow_get_name (ret));
2052     gst_buffer_unref (buffer);
2053     goto finished;
2054   }
2055 have_eos:
2056   {
2057     ret = GST_FLOW_EOS;
2058     GST_WARNING_OBJECT (jitterbuffer, "we are EOS, refusing buffer");
2059     gst_buffer_unref (buffer);
2060     goto finished;
2061   }
2062 too_late:
2063   {
2064     GST_WARNING_OBJECT (jitterbuffer, "Packet #%d too late as #%d was already"
2065         " popped, dropping", seqnum, priv->last_popped_seqnum);
2066     priv->num_late++;
2067     gst_buffer_unref (buffer);
2068     goto finished;
2069   }
2070 duplicate:
2071   {
2072     GST_WARNING_OBJECT (jitterbuffer, "Duplicate packet #%d detected, dropping",
2073         seqnum);
2074     priv->num_duplicates++;
2075     gst_buffer_unref (buffer);
2076     goto finished;
2077   }
2078 }
2079
2080 static GstClockTime
2081 compute_elapsed (GstRtpJitterBuffer * jitterbuffer, GstBuffer * outbuf)
2082 {
2083   guint64 ext_time, elapsed;
2084   guint32 rtp_time;
2085   GstRtpJitterBufferPrivate *priv;
2086   GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
2087
2088   priv = jitterbuffer->priv;
2089   gst_rtp_buffer_map (outbuf, GST_MAP_READ, &rtp);
2090   rtp_time = gst_rtp_buffer_get_timestamp (&rtp);
2091   gst_rtp_buffer_unmap (&rtp);
2092
2093   GST_LOG_OBJECT (jitterbuffer, "rtp %" G_GUINT32_FORMAT ", ext %"
2094       G_GUINT64_FORMAT, rtp_time, priv->ext_timestamp);
2095
2096   if (rtp_time < priv->ext_timestamp) {
2097     ext_time = priv->ext_timestamp;
2098   } else {
2099     ext_time = gst_rtp_buffer_ext_timestamp (&priv->ext_timestamp, rtp_time);
2100   }
2101
2102   if (ext_time > priv->clock_base)
2103     elapsed = ext_time - priv->clock_base;
2104   else
2105     elapsed = 0;
2106
2107   elapsed = gst_util_uint64_scale_int (elapsed, GST_SECOND, priv->clock_rate);
2108   return elapsed;
2109 }
2110
2111 static void
2112 update_estimated_eos (GstRtpJitterBuffer * jitterbuffer, GstBuffer * outbuf)
2113 {
2114   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2115
2116   if (priv->npt_stop != -1 && priv->ext_timestamp != -1
2117       && priv->clock_base != -1 && priv->clock_rate > 0) {
2118     guint64 elapsed, estimated;
2119
2120     elapsed = compute_elapsed (jitterbuffer, outbuf);
2121
2122     if (elapsed > priv->last_elapsed || !priv->last_elapsed) {
2123       guint64 left;
2124       GstClockTime out_time;
2125
2126       priv->last_elapsed = elapsed;
2127
2128       left = priv->npt_stop - priv->npt_start;
2129       GST_LOG_OBJECT (jitterbuffer, "left %" GST_TIME_FORMAT,
2130           GST_TIME_ARGS (left));
2131
2132       out_time = GST_BUFFER_DTS (outbuf);
2133
2134       if (elapsed > 0)
2135         estimated = gst_util_uint64_scale (out_time, left, elapsed);
2136       else {
2137         /* if there is almost nothing left,
2138          * we may never advance enough to end up in the above case */
2139         if (left < GST_SECOND)
2140           estimated = GST_SECOND;
2141         else
2142           estimated = -1;
2143       }
2144
2145       GST_LOG_OBJECT (jitterbuffer, "elapsed %" GST_TIME_FORMAT ", estimated %"
2146           GST_TIME_FORMAT, GST_TIME_ARGS (elapsed), GST_TIME_ARGS (estimated));
2147
2148       if (estimated != -1 && priv->estimated_eos != estimated) {
2149         set_timer (jitterbuffer, TIMER_TYPE_EOS, -1, estimated);
2150         priv->estimated_eos = estimated;
2151       }
2152     }
2153   }
2154 }
2155
2156 /* take a buffer from the queue and push it */
2157 static GstFlowReturn
2158 pop_and_push_next (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum)
2159 {
2160   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2161   GstFlowReturn result;
2162   GstBuffer *outbuf;
2163   GstClockTime dts, pts;
2164   gint percent = -1;
2165
2166   /* when we get here we are ready to pop and push the buffer */
2167   outbuf = rtp_jitter_buffer_pop (priv->jbuf, &percent);
2168
2169   check_buffering_percent (jitterbuffer, &percent);
2170
2171   if (G_UNLIKELY (priv->discont)) {
2172     /* set DISCONT flag when we missed a packet. We pushed the buffer writable
2173      * into the jitterbuffer so we can modify now. */
2174     GST_DEBUG_OBJECT (jitterbuffer, "mark output buffer discont");
2175     GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_DISCONT);
2176     priv->discont = FALSE;
2177   }
2178   if (G_UNLIKELY (priv->ts_discont)) {
2179     GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_RESYNC);
2180     priv->ts_discont = FALSE;
2181   }
2182
2183   dts = GST_BUFFER_DTS (outbuf);
2184   pts = GST_BUFFER_PTS (outbuf);
2185
2186   dts = gst_segment_to_position (&priv->segment, GST_FORMAT_TIME, dts);
2187   pts = gst_segment_to_position (&priv->segment, GST_FORMAT_TIME, pts);
2188
2189   /* apply timestamp with offset to buffer now */
2190   GST_BUFFER_DTS (outbuf) = apply_offset (jitterbuffer, dts);
2191   GST_BUFFER_PTS (outbuf) = apply_offset (jitterbuffer, pts);
2192
2193   /* update the elapsed time when we need to check against the npt stop time. */
2194   update_estimated_eos (jitterbuffer, outbuf);
2195
2196   /* now we are ready to push the buffer. Save the seqnum and release the lock
2197    * so the other end can push stuff in the queue again. */
2198   priv->last_popped_seqnum = seqnum;
2199   priv->last_out_time = GST_BUFFER_PTS (outbuf);
2200   priv->next_seqnum = (seqnum + 1) & 0xffff;
2201   JBUF_UNLOCK (priv);
2202
2203   if (percent != -1)
2204     post_buffering_percent (jitterbuffer, percent);
2205
2206   /* push buffer */
2207   GST_DEBUG_OBJECT (jitterbuffer,
2208       "Pushing buffer %d, dts %" GST_TIME_FORMAT ", pts %" GST_TIME_FORMAT,
2209       seqnum, GST_TIME_ARGS (GST_BUFFER_DTS (outbuf)),
2210       GST_TIME_ARGS (GST_BUFFER_PTS (outbuf)));
2211
2212   result = gst_pad_push (priv->srcpad, outbuf);
2213
2214   JBUF_LOCK_CHECK (priv, out_flushing);
2215
2216   return result;
2217
2218   /* ERRORS */
2219 out_flushing:
2220   {
2221     return priv->srcresult;
2222   }
2223 }
2224
2225 #define GST_FLOW_WAIT GST_FLOW_CUSTOM_SUCCESS
2226
2227 /* Peek a buffer and compare the seqnum to the expected seqnum.
2228  * If all is fine, the buffer is pushed.
2229  * If something is wrong, we wait for some event
2230  */
2231 static GstFlowReturn
2232 handle_next_buffer (GstRtpJitterBuffer * jitterbuffer)
2233 {
2234   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2235   GstFlowReturn result = GST_FLOW_OK;
2236   GstBuffer *outbuf;
2237   guint16 seqnum;
2238   guint32 next_seqnum;
2239   gint gap;
2240   GstRTPBuffer rtp = { NULL, };
2241
2242   /* only push buffers when PLAYING and active and not buffering */
2243   if (priv->blocked || !priv->active ||
2244       rtp_jitter_buffer_is_buffering (priv->jbuf))
2245     return GST_FLOW_WAIT;
2246
2247 again:
2248   /* peek a buffer, we're just looking at the sequence number.
2249    * If all is fine, we'll pop and push it. If the sequence number is wrong we
2250    * wait for a timeout or something to change.
2251    * The peeked buffer is valid for as long as we hold the jitterbuffer lock. */
2252   outbuf = rtp_jitter_buffer_peek (priv->jbuf);
2253   if (outbuf == NULL)
2254     goto wait;
2255
2256   /* get the seqnum and the next expected seqnum */
2257   gst_rtp_buffer_map (outbuf, GST_MAP_READ, &rtp);
2258   seqnum = gst_rtp_buffer_get_seq (&rtp);
2259   gst_rtp_buffer_unmap (&rtp);
2260
2261   next_seqnum = priv->next_seqnum;
2262
2263   /* get the gap between this and the previous packet. If we don't know the
2264    * previous packet seqnum assume no gap. */
2265   if (G_UNLIKELY (next_seqnum == -1)) {
2266     GST_DEBUG_OBJECT (jitterbuffer, "First buffer #%d", seqnum);
2267     /* we don't know what the next_seqnum should be, the chain function should
2268      * have scheduled a DEADLINE timer that will increment next_seqnum when it
2269      * fires, so wait for that */
2270     result = GST_FLOW_WAIT;
2271   } else {
2272     /* else calculate GAP */
2273     gap = gst_rtp_buffer_compare_seqnum (next_seqnum, seqnum);
2274
2275     if (G_LIKELY (gap == 0)) {
2276       /* no missing packet, pop and push */
2277       result = pop_and_push_next (jitterbuffer, seqnum);
2278     } else if (G_UNLIKELY (gap < 0)) {
2279       /* if we have a packet that we already pushed or considered dropped, pop it
2280        * off and get the next packet */
2281       GST_DEBUG_OBJECT (jitterbuffer, "Old packet #%d, next #%d dropping",
2282           seqnum, next_seqnum);
2283       outbuf = rtp_jitter_buffer_pop (priv->jbuf, NULL);
2284       gst_buffer_unref (outbuf);
2285       goto again;
2286     } else {
2287       /* the chain function has scheduled timers to request retransmission or
2288        * when to consider the packet lost, wait for that */
2289       GST_DEBUG_OBJECT (jitterbuffer,
2290           "Sequence number GAP detected: expected %d instead of %d (%d missing)",
2291           next_seqnum, seqnum, gap);
2292       result = GST_FLOW_WAIT;
2293     }
2294   }
2295   return result;
2296
2297 wait:
2298   {
2299     GST_DEBUG_OBJECT (jitterbuffer, "no buffer, going to wait");
2300     if (priv->eos)
2301       result = GST_FLOW_EOS;
2302     else
2303       result = GST_FLOW_WAIT;
2304     return result;
2305   }
2306 }
2307
2308 /* the timeout for when we expected a packet expired */
2309 static gboolean
2310 do_expected_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
2311     GstClockTime now)
2312 {
2313   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2314   GstEvent *event;
2315
2316   GST_DEBUG_OBJECT (jitterbuffer, "expected %d didn't arrive", timer->seqnum);
2317
2318   event = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM,
2319       gst_structure_new ("GstRTPRetransmissionRequest",
2320           "seqnum", G_TYPE_UINT, (guint) timer->seqnum,
2321           "running-time", G_TYPE_UINT64, timer->rtx_base,
2322           "delay", G_TYPE_UINT,
2323           GST_TIME_AS_MSECONDS (timer->rtx_delay + timer->rtx_retry),
2324           "frequency", G_TYPE_UINT, priv->rtx_retry_timeout, "period",
2325           G_TYPE_UINT, priv->rtx_retry_period, "deadline", G_TYPE_UINT,
2326           priv->latency_ms, "packet-spacing", G_TYPE_UINT64,
2327           priv->packet_spacing, NULL));
2328
2329   JBUF_UNLOCK (priv);
2330   gst_pad_push_event (priv->sinkpad, event);
2331   JBUF_LOCK (priv);
2332
2333   /* calculate the timeout for the next retransmission attempt */
2334   timer->rtx_retry += (priv->rtx_retry_timeout * GST_MSECOND);
2335   if (timer->rtx_retry + timer->rtx_delay >
2336       (priv->rtx_retry_period * GST_MSECOND)) {
2337     GST_DEBUG_OBJECT (jitterbuffer, "reschedule as LOST timer");
2338     /* too many retransmission request, we now convert the timer
2339      * to a lost timer */
2340     timer->type = TIMER_TYPE_LOST;
2341     timer->rtx_delay = 0;
2342     timer->rtx_retry = 0;
2343   }
2344   reschedule_timer (jitterbuffer, timer, timer->seqnum,
2345       timer->rtx_base + timer->rtx_retry, timer->rtx_delay, FALSE);
2346
2347   return FALSE;
2348 }
2349
2350 /* a packet is lost */
2351 static gboolean
2352 do_lost_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
2353     GstClockTime now)
2354 {
2355   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2356   GstClockTime duration, timestamp;
2357   guint seqnum, num;
2358   gboolean late;
2359
2360   seqnum = timer->seqnum;
2361   timestamp = apply_offset (jitterbuffer, timer->timeout);
2362   duration = timer->duration;
2363   if (duration == GST_CLOCK_TIME_NONE && priv->packet_spacing > 0)
2364     duration = priv->packet_spacing;
2365   num = MAX (timer->num, 1);
2366   late = timer->num > 0;
2367
2368   /* remove timer now */
2369   remove_timer (jitterbuffer, timer);
2370   JBUF_SIGNAL_EVENT (priv);
2371
2372   send_lost_event (jitterbuffer, seqnum, num, timestamp, duration, late);
2373
2374   return TRUE;
2375 }
2376
2377 static gboolean
2378 do_eos_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
2379     GstClockTime now)
2380 {
2381   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2382
2383   GST_INFO_OBJECT (jitterbuffer, "got the NPT timeout");
2384   remove_timer (jitterbuffer, timer);
2385   JBUF_SIGNAL_EVENT (priv);
2386
2387   return TRUE;
2388 }
2389
2390 static gboolean
2391 do_deadline_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
2392     GstClockTime now)
2393 {
2394   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2395
2396   GST_INFO_OBJECT (jitterbuffer, "got deadline timeout");
2397
2398   priv->next_seqnum = timer->seqnum;
2399   remove_timer (jitterbuffer, timer);
2400   JBUF_SIGNAL_EVENT (priv);
2401
2402   return TRUE;
2403 }
2404
2405 static gboolean
2406 do_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
2407     GstClockTime now)
2408 {
2409   gboolean removed = FALSE;
2410
2411   switch (timer->type) {
2412     case TIMER_TYPE_EXPECTED:
2413       removed = do_expected_timeout (jitterbuffer, timer, now);
2414       break;
2415     case TIMER_TYPE_LOST:
2416       removed = do_lost_timeout (jitterbuffer, timer, now);
2417       break;
2418     case TIMER_TYPE_DEADLINE:
2419       removed = do_deadline_timeout (jitterbuffer, timer, now);
2420       break;
2421     case TIMER_TYPE_EOS:
2422       removed = do_eos_timeout (jitterbuffer, timer, now);
2423       break;
2424   }
2425   return removed;
2426 }
2427
2428 /* called when we need to wait for the next timeout.
2429  *
2430  * We loop over the array of recorded timeouts and wait for the earliest one.
2431  * When it timed out, do the logic associated with the timer.
2432  *
2433  * If there are no timers, we wait on a gcond until something new happens.
2434  */
2435 static void
2436 wait_next_timeout (GstRtpJitterBuffer * jitterbuffer)
2437 {
2438   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2439   GstClockTime now = 0;
2440
2441   JBUF_LOCK (priv);
2442   while (priv->timer_running) {
2443     TimerData *timer = NULL;
2444     GstClockTime timer_timeout = -1;
2445     gint i, len;
2446
2447     GST_DEBUG_OBJECT (jitterbuffer, "now %" GST_TIME_FORMAT,
2448         GST_TIME_ARGS (now));
2449
2450     len = priv->timers->len;
2451     for (i = 0; i < len; i++) {
2452       TimerData *test = &g_array_index (priv->timers, TimerData, i);
2453       GstClockTime test_timeout = get_timeout (jitterbuffer, test);
2454
2455       GST_DEBUG_OBJECT (jitterbuffer, "%d, %d, %d, %" GST_TIME_FORMAT,
2456           i, test->type, test->seqnum, GST_TIME_ARGS (test_timeout));
2457
2458       /* no timestamp, timeout immeditately */
2459       if (test_timeout == -1 || test_timeout <= now) {
2460         if (do_timeout (jitterbuffer, test, now))
2461           len--;
2462         i--;
2463       } else if (timer == NULL || test_timeout < timer_timeout) {
2464         /* find the smallest timeout */
2465         timer = test;
2466         timer_timeout = test_timeout;
2467       }
2468     }
2469     if (timer) {
2470       GstClock *clock;
2471       GstClockTime sync_time;
2472       GstClockID id;
2473       GstClockReturn ret;
2474       GstClockTimeDiff clock_jitter;
2475
2476       /* check here, do_timeout could have released the lock */
2477       if (!priv->timer_running)
2478         break;
2479
2480       GST_OBJECT_LOCK (jitterbuffer);
2481       clock = GST_ELEMENT_CLOCK (jitterbuffer);
2482       if (!clock) {
2483         GST_OBJECT_UNLOCK (jitterbuffer);
2484         /* let's just push if there is no clock */
2485         GST_DEBUG_OBJECT (jitterbuffer, "No clock, timeout right away");
2486         now = timer_timeout;
2487         continue;
2488       }
2489
2490       /* prepare for sync against clock */
2491       sync_time = timer_timeout + GST_ELEMENT_CAST (jitterbuffer)->base_time;
2492       /* add latency of peer to get input time */
2493       sync_time += priv->peer_latency;
2494
2495       GST_DEBUG_OBJECT (jitterbuffer, "sync to timestamp %" GST_TIME_FORMAT
2496           " with sync time %" GST_TIME_FORMAT,
2497           GST_TIME_ARGS (timer_timeout), GST_TIME_ARGS (sync_time));
2498
2499       /* create an entry for the clock */
2500       id = priv->clock_id = gst_clock_new_single_shot_id (clock, sync_time);
2501       priv->timer_timeout = timer_timeout;
2502       priv->timer_seqnum = timer->seqnum;
2503       GST_OBJECT_UNLOCK (jitterbuffer);
2504
2505       /* release the lock so that the other end can push stuff or unlock */
2506       JBUF_UNLOCK (priv);
2507
2508       ret = gst_clock_id_wait (id, &clock_jitter);
2509
2510       JBUF_LOCK (priv);
2511       if (!priv->timer_running)
2512         break;
2513
2514       if (ret != GST_CLOCK_UNSCHEDULED) {
2515         now = timer_timeout + MAX (clock_jitter, 0);
2516         GST_DEBUG_OBJECT (jitterbuffer, "sync done, %d, #%d, %" G_GINT64_FORMAT,
2517             ret, priv->timer_seqnum, clock_jitter);
2518       } else {
2519         GST_DEBUG_OBJECT (jitterbuffer, "sync unscheduled");
2520       }
2521       /* and free the entry */
2522       gst_clock_id_unref (id);
2523       priv->clock_id = NULL;
2524     } else {
2525       /* no timers, wait for activity */
2526       GST_DEBUG_OBJECT (jitterbuffer, "waiting");
2527       JBUF_WAIT_TIMER (priv);
2528       GST_DEBUG_OBJECT (jitterbuffer, "waiting done");
2529     }
2530   }
2531   JBUF_UNLOCK (priv);
2532
2533   GST_DEBUG_OBJECT (jitterbuffer, "we are stopping");
2534   return;
2535 }
2536
2537 /*
2538  * This funcion implements the main pushing loop on the source pad.
2539  *
2540  * It first tries to push as many buffers as possible. If there is a seqnum
2541  * mismatch, we wait for the next timeouts.
2542  */
2543 static void
2544 gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer)
2545 {
2546   GstRtpJitterBufferPrivate *priv;
2547   GstFlowReturn result;
2548
2549   priv = jitterbuffer->priv;
2550
2551   JBUF_LOCK_CHECK (priv, flushing);
2552   do {
2553     result = handle_next_buffer (jitterbuffer);
2554     if (G_LIKELY (result == GST_FLOW_WAIT)) {
2555       GST_DEBUG_OBJECT (jitterbuffer, "waiting for event");
2556       /* now wait for the next event */
2557       JBUF_WAIT_EVENT (priv, flushing);
2558       GST_DEBUG_OBJECT (jitterbuffer, "waiting for event done");
2559       result = GST_FLOW_OK;
2560     }
2561   }
2562   while (result == GST_FLOW_OK);
2563   JBUF_UNLOCK (priv);
2564
2565   /* if we get here we need to pause */
2566   goto pause;
2567
2568   /* ERRORS */
2569 flushing:
2570   {
2571     result = priv->srcresult;
2572     JBUF_UNLOCK (priv);
2573     goto pause;
2574   }
2575 pause:
2576   {
2577     const gchar *reason = gst_flow_get_name (result);
2578     GstEvent *event;
2579
2580     GST_DEBUG_OBJECT (jitterbuffer, "pausing task, reason %s", reason);
2581     gst_pad_pause_task (priv->srcpad);
2582     if (result == GST_FLOW_EOS) {
2583       event = gst_event_new_eos ();
2584       gst_pad_push_event (priv->srcpad, event);
2585     }
2586     return;
2587   }
2588 }
2589
2590 /* collect the info from the lastest RTCP packet and the jitterbuffer sync, do
2591  * some sanity checks and then emit the handle-sync signal with the parameters.
2592  * This function must be called with the LOCK */
2593 static void
2594 do_handle_sync (GstRtpJitterBuffer * jitterbuffer)
2595 {
2596   GstRtpJitterBufferPrivate *priv;
2597   guint64 base_rtptime, base_time;
2598   guint32 clock_rate;
2599   guint64 last_rtptime;
2600   guint64 clock_base;
2601   guint64 ext_rtptime, diff;
2602   gboolean drop = FALSE;
2603
2604   priv = jitterbuffer->priv;
2605
2606   /* get the last values from the jitterbuffer */
2607   rtp_jitter_buffer_get_sync (priv->jbuf, &base_rtptime, &base_time,
2608       &clock_rate, &last_rtptime);
2609
2610   clock_base = priv->clock_base;
2611   ext_rtptime = priv->ext_rtptime;
2612
2613   GST_DEBUG_OBJECT (jitterbuffer, "ext SR %" G_GUINT64_FORMAT ", base %"
2614       G_GUINT64_FORMAT ", clock-rate %" G_GUINT32_FORMAT
2615       ", clock-base %" G_GUINT64_FORMAT ", last-rtptime %" G_GUINT64_FORMAT,
2616       ext_rtptime, base_rtptime, clock_rate, clock_base, last_rtptime);
2617
2618   if (base_rtptime == -1 || clock_rate == -1 || base_time == -1) {
2619     GST_DEBUG_OBJECT (jitterbuffer, "dropping, no RTP values");
2620     drop = TRUE;
2621   } else {
2622     /* we can't accept anything that happened before we did the last resync */
2623     if (base_rtptime > ext_rtptime) {
2624       GST_DEBUG_OBJECT (jitterbuffer, "dropping, older than base time");
2625       drop = TRUE;
2626     } else {
2627       /* the SR RTP timestamp must be something close to what we last observed
2628        * in the jitterbuffer */
2629       if (ext_rtptime > last_rtptime) {
2630         /* check how far ahead it is to our RTP timestamps */
2631         diff = ext_rtptime - last_rtptime;
2632         /* if bigger than 1 second, we drop it */
2633         if (diff > clock_rate) {
2634           GST_DEBUG_OBJECT (jitterbuffer, "too far ahead");
2635           /* should drop this, but some RTSP servers end up with bogus
2636            * way too ahead RTCP packet when repeated PAUSE/PLAY,
2637            * so still trigger rptbin sync but invalidate RTCP data
2638            * (sync might use other methods) */
2639           ext_rtptime = -1;
2640         }
2641         GST_DEBUG_OBJECT (jitterbuffer, "ext last %" G_GUINT64_FORMAT ", diff %"
2642             G_GUINT64_FORMAT, last_rtptime, diff);
2643       }
2644     }
2645   }
2646
2647   if (!drop) {
2648     GstStructure *s;
2649
2650     s = gst_structure_new ("application/x-rtp-sync",
2651         "base-rtptime", G_TYPE_UINT64, base_rtptime,
2652         "base-time", G_TYPE_UINT64, base_time,
2653         "clock-rate", G_TYPE_UINT, clock_rate,
2654         "clock-base", G_TYPE_UINT64, clock_base,
2655         "sr-ext-rtptime", G_TYPE_UINT64, ext_rtptime,
2656         "sr-buffer", GST_TYPE_BUFFER, priv->last_sr, NULL);
2657
2658     GST_DEBUG_OBJECT (jitterbuffer, "signaling sync");
2659     gst_buffer_replace (&priv->last_sr, NULL);
2660     JBUF_UNLOCK (priv);
2661     g_signal_emit (jitterbuffer,
2662         gst_rtp_jitter_buffer_signals[SIGNAL_HANDLE_SYNC], 0, s);
2663     JBUF_LOCK (priv);
2664     gst_structure_free (s);
2665   } else {
2666     GST_DEBUG_OBJECT (jitterbuffer, "dropping RTCP packet");
2667   }
2668 }
2669
2670 static GstFlowReturn
2671 gst_rtp_jitter_buffer_chain_rtcp (GstPad * pad, GstObject * parent,
2672     GstBuffer * buffer)
2673 {
2674   GstRtpJitterBuffer *jitterbuffer;
2675   GstRtpJitterBufferPrivate *priv;
2676   GstFlowReturn ret = GST_FLOW_OK;
2677   guint32 ssrc;
2678   GstRTCPPacket packet;
2679   guint64 ext_rtptime;
2680   guint32 rtptime;
2681   GstRTCPBuffer rtcp = { NULL, };
2682
2683   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
2684
2685   if (G_UNLIKELY (!gst_rtcp_buffer_validate (buffer)))
2686     goto invalid_buffer;
2687
2688   priv = jitterbuffer->priv;
2689
2690   gst_rtcp_buffer_map (buffer, GST_MAP_READ, &rtcp);
2691
2692   if (!gst_rtcp_buffer_get_first_packet (&rtcp, &packet))
2693     goto empty_buffer;
2694
2695   /* first packet must be SR or RR or else the validate would have failed */
2696   switch (gst_rtcp_packet_get_type (&packet)) {
2697     case GST_RTCP_TYPE_SR:
2698       gst_rtcp_packet_sr_get_sender_info (&packet, &ssrc, NULL, &rtptime,
2699           NULL, NULL);
2700       break;
2701     default:
2702       goto ignore_buffer;
2703   }
2704   gst_rtcp_buffer_unmap (&rtcp);
2705
2706   GST_DEBUG_OBJECT (jitterbuffer, "received RTCP of SSRC %08x", ssrc);
2707
2708   JBUF_LOCK (priv);
2709   /* convert the RTP timestamp to our extended timestamp, using the same offset
2710    * we used in the jitterbuffer */
2711   ext_rtptime = priv->jbuf->ext_rtptime;
2712   ext_rtptime = gst_rtp_buffer_ext_timestamp (&ext_rtptime, rtptime);
2713
2714   priv->ext_rtptime = ext_rtptime;
2715   gst_buffer_replace (&priv->last_sr, buffer);
2716
2717   do_handle_sync (jitterbuffer);
2718   JBUF_UNLOCK (priv);
2719
2720 done:
2721   gst_buffer_unref (buffer);
2722
2723   return ret;
2724
2725 invalid_buffer:
2726   {
2727     /* this is not fatal but should be filtered earlier */
2728     GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
2729         ("Received invalid RTCP payload, dropping"));
2730     ret = GST_FLOW_OK;
2731     goto done;
2732   }
2733 empty_buffer:
2734   {
2735     /* this is not fatal but should be filtered earlier */
2736     GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
2737         ("Received empty RTCP payload, dropping"));
2738     gst_rtcp_buffer_unmap (&rtcp);
2739     ret = GST_FLOW_OK;
2740     goto done;
2741   }
2742 ignore_buffer:
2743   {
2744     GST_DEBUG_OBJECT (jitterbuffer, "ignoring RTCP packet");
2745     gst_rtcp_buffer_unmap (&rtcp);
2746     ret = GST_FLOW_OK;
2747     goto done;
2748   }
2749 }
2750
2751 static gboolean
2752 gst_rtp_jitter_buffer_sink_query (GstPad * pad, GstObject * parent,
2753     GstQuery * query)
2754 {
2755   gboolean res = FALSE;
2756
2757   switch (GST_QUERY_TYPE (query)) {
2758     case GST_QUERY_CAPS:
2759     {
2760       GstCaps *filter, *caps;
2761
2762       gst_query_parse_caps (query, &filter);
2763       caps = gst_rtp_jitter_buffer_getcaps (pad, filter);
2764       gst_query_set_caps_result (query, caps);
2765       gst_caps_unref (caps);
2766       res = TRUE;
2767       break;
2768     }
2769     default:
2770       if (GST_QUERY_IS_SERIALIZED (query)) {
2771         GST_WARNING_OBJECT (pad, "unhandled serialized query");
2772         res = FALSE;
2773       } else {
2774         res = gst_pad_query_default (pad, parent, query);
2775       }
2776       break;
2777   }
2778   return res;
2779 }
2780
2781 static gboolean
2782 gst_rtp_jitter_buffer_src_query (GstPad * pad, GstObject * parent,
2783     GstQuery * query)
2784 {
2785   GstRtpJitterBuffer *jitterbuffer;
2786   GstRtpJitterBufferPrivate *priv;
2787   gboolean res = FALSE;
2788
2789   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
2790   priv = jitterbuffer->priv;
2791
2792   switch (GST_QUERY_TYPE (query)) {
2793     case GST_QUERY_LATENCY:
2794     {
2795       /* We need to send the query upstream and add the returned latency to our
2796        * own */
2797       GstClockTime min_latency, max_latency;
2798       gboolean us_live;
2799       GstClockTime our_latency;
2800
2801       if ((res = gst_pad_peer_query (priv->sinkpad, query))) {
2802         gst_query_parse_latency (query, &us_live, &min_latency, &max_latency);
2803
2804         GST_DEBUG_OBJECT (jitterbuffer, "Peer latency: min %"
2805             GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
2806             GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
2807
2808         /* store this so that we can safely sync on the peer buffers. */
2809         JBUF_LOCK (priv);
2810         priv->peer_latency = min_latency;
2811         our_latency = priv->latency_ns;
2812         JBUF_UNLOCK (priv);
2813
2814         GST_DEBUG_OBJECT (jitterbuffer, "Our latency: %" GST_TIME_FORMAT,
2815             GST_TIME_ARGS (our_latency));
2816
2817         /* we add some latency but can buffer an infinite amount of time */
2818         min_latency += our_latency;
2819         max_latency = -1;
2820
2821         GST_DEBUG_OBJECT (jitterbuffer, "Calculated total latency : min %"
2822             GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
2823             GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
2824
2825         gst_query_set_latency (query, TRUE, min_latency, max_latency);
2826       }
2827       break;
2828     }
2829     case GST_QUERY_POSITION:
2830     {
2831       GstClockTime start, last_out;
2832       GstFormat fmt;
2833
2834       gst_query_parse_position (query, &fmt, NULL);
2835       if (fmt != GST_FORMAT_TIME) {
2836         res = gst_pad_query_default (pad, parent, query);
2837         break;
2838       }
2839
2840       JBUF_LOCK (priv);
2841       start = priv->npt_start;
2842       last_out = priv->last_out_time;
2843       JBUF_UNLOCK (priv);
2844
2845       GST_DEBUG_OBJECT (jitterbuffer, "npt start %" GST_TIME_FORMAT
2846           ", last out %" GST_TIME_FORMAT, GST_TIME_ARGS (start),
2847           GST_TIME_ARGS (last_out));
2848
2849       if (GST_CLOCK_TIME_IS_VALID (start) && GST_CLOCK_TIME_IS_VALID (last_out)) {
2850         /* bring 0-based outgoing time to stream time */
2851         gst_query_set_position (query, GST_FORMAT_TIME, start + last_out);
2852         res = TRUE;
2853       } else {
2854         res = gst_pad_query_default (pad, parent, query);
2855       }
2856       break;
2857     }
2858     case GST_QUERY_CAPS:
2859     {
2860       GstCaps *filter, *caps;
2861
2862       gst_query_parse_caps (query, &filter);
2863       caps = gst_rtp_jitter_buffer_getcaps (pad, filter);
2864       gst_query_set_caps_result (query, caps);
2865       gst_caps_unref (caps);
2866       res = TRUE;
2867       break;
2868     }
2869     default:
2870       res = gst_pad_query_default (pad, parent, query);
2871       break;
2872   }
2873
2874   return res;
2875 }
2876
2877 static void
2878 gst_rtp_jitter_buffer_set_property (GObject * object,
2879     guint prop_id, const GValue * value, GParamSpec * pspec)
2880 {
2881   GstRtpJitterBuffer *jitterbuffer;
2882   GstRtpJitterBufferPrivate *priv;
2883
2884   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
2885   priv = jitterbuffer->priv;
2886
2887   switch (prop_id) {
2888     case PROP_LATENCY:
2889     {
2890       guint new_latency, old_latency;
2891
2892       new_latency = g_value_get_uint (value);
2893
2894       JBUF_LOCK (priv);
2895       old_latency = priv->latency_ms;
2896       priv->latency_ms = new_latency;
2897       priv->latency_ns = priv->latency_ms * GST_MSECOND;
2898       rtp_jitter_buffer_set_delay (priv->jbuf, priv->latency_ns);
2899       JBUF_UNLOCK (priv);
2900
2901       /* post message if latency changed, this will inform the parent pipeline
2902        * that a latency reconfiguration is possible/needed. */
2903       if (new_latency != old_latency) {
2904         GST_DEBUG_OBJECT (jitterbuffer, "latency changed to: %" GST_TIME_FORMAT,
2905             GST_TIME_ARGS (new_latency * GST_MSECOND));
2906
2907         gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer),
2908             gst_message_new_latency (GST_OBJECT_CAST (jitterbuffer)));
2909       }
2910       break;
2911     }
2912     case PROP_DROP_ON_LATENCY:
2913       JBUF_LOCK (priv);
2914       priv->drop_on_latency = g_value_get_boolean (value);
2915       JBUF_UNLOCK (priv);
2916       break;
2917     case PROP_TS_OFFSET:
2918       JBUF_LOCK (priv);
2919       priv->ts_offset = g_value_get_int64 (value);
2920       priv->ts_discont = TRUE;
2921       JBUF_UNLOCK (priv);
2922       break;
2923     case PROP_DO_LOST:
2924       JBUF_LOCK (priv);
2925       priv->do_lost = g_value_get_boolean (value);
2926       JBUF_UNLOCK (priv);
2927       break;
2928     case PROP_MODE:
2929       JBUF_LOCK (priv);
2930       rtp_jitter_buffer_set_mode (priv->jbuf, g_value_get_enum (value));
2931       JBUF_UNLOCK (priv);
2932       break;
2933     case PROP_DO_RETRANSMISSION:
2934       JBUF_LOCK (priv);
2935       priv->do_retransmission = g_value_get_boolean (value);
2936       JBUF_UNLOCK (priv);
2937       break;
2938     case PROP_RTX_DELAY:
2939       JBUF_LOCK (priv);
2940       priv->rtx_delay = g_value_get_int (value);
2941       JBUF_UNLOCK (priv);
2942       break;
2943     case PROP_RTX_DELAY_REORDER:
2944       JBUF_LOCK (priv);
2945       priv->rtx_delay_reorder = g_value_get_int (value);
2946       JBUF_UNLOCK (priv);
2947       break;
2948     case PROP_RTX_RETRY_TIMEOUT:
2949       JBUF_LOCK (priv);
2950       priv->rtx_retry_timeout = g_value_get_int (value);
2951       JBUF_UNLOCK (priv);
2952       break;
2953     case PROP_RTX_RETRY_PERIOD:
2954       JBUF_LOCK (priv);
2955       priv->rtx_retry_period = g_value_get_int (value);
2956       JBUF_UNLOCK (priv);
2957       break;
2958     default:
2959       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2960       break;
2961   }
2962 }
2963
2964 static void
2965 gst_rtp_jitter_buffer_get_property (GObject * object,
2966     guint prop_id, GValue * value, GParamSpec * pspec)
2967 {
2968   GstRtpJitterBuffer *jitterbuffer;
2969   GstRtpJitterBufferPrivate *priv;
2970
2971   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
2972   priv = jitterbuffer->priv;
2973
2974   switch (prop_id) {
2975     case PROP_LATENCY:
2976       JBUF_LOCK (priv);
2977       g_value_set_uint (value, priv->latency_ms);
2978       JBUF_UNLOCK (priv);
2979       break;
2980     case PROP_DROP_ON_LATENCY:
2981       JBUF_LOCK (priv);
2982       g_value_set_boolean (value, priv->drop_on_latency);
2983       JBUF_UNLOCK (priv);
2984       break;
2985     case PROP_TS_OFFSET:
2986       JBUF_LOCK (priv);
2987       g_value_set_int64 (value, priv->ts_offset);
2988       JBUF_UNLOCK (priv);
2989       break;
2990     case PROP_DO_LOST:
2991       JBUF_LOCK (priv);
2992       g_value_set_boolean (value, priv->do_lost);
2993       JBUF_UNLOCK (priv);
2994       break;
2995     case PROP_MODE:
2996       JBUF_LOCK (priv);
2997       g_value_set_enum (value, rtp_jitter_buffer_get_mode (priv->jbuf));
2998       JBUF_UNLOCK (priv);
2999       break;
3000     case PROP_PERCENT:
3001     {
3002       gint percent;
3003
3004       JBUF_LOCK (priv);
3005       if (priv->srcresult != GST_FLOW_OK)
3006         percent = 100;
3007       else
3008         percent = rtp_jitter_buffer_get_percent (priv->jbuf);
3009
3010       g_value_set_int (value, percent);
3011       JBUF_UNLOCK (priv);
3012       break;
3013     }
3014     case PROP_DO_RETRANSMISSION:
3015       JBUF_LOCK (priv);
3016       g_value_set_boolean (value, priv->do_retransmission);
3017       JBUF_UNLOCK (priv);
3018       break;
3019     case PROP_RTX_DELAY:
3020       JBUF_LOCK (priv);
3021       g_value_set_int (value, priv->rtx_delay);
3022       JBUF_UNLOCK (priv);
3023       break;
3024     case PROP_RTX_DELAY_REORDER:
3025       JBUF_LOCK (priv);
3026       g_value_set_int (value, priv->rtx_delay_reorder);
3027       JBUF_UNLOCK (priv);
3028       break;
3029     case PROP_RTX_RETRY_TIMEOUT:
3030       JBUF_LOCK (priv);
3031       g_value_set_int (value, priv->rtx_retry_timeout);
3032       JBUF_UNLOCK (priv);
3033       break;
3034     case PROP_RTX_RETRY_PERIOD:
3035       JBUF_LOCK (priv);
3036       g_value_set_int (value, priv->rtx_retry_period);
3037       JBUF_UNLOCK (priv);
3038       break;
3039     default:
3040       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
3041       break;
3042   }
3043 }