rtpjitterbuffer: When request retransmissions for future packets, consider the packet...
[platform/upstream/gst-plugins-good.git] / gst / rtpmanager / gstrtpjitterbuffer.c
1 /*
2  * Farsight Voice+Video library
3  *
4  *  Copyright 2007 Collabora Ltd,
5  *  Copyright 2007 Nokia Corporation
6  *   @author: Philippe Kalaf <philippe.kalaf@collabora.co.uk>.
7  *  Copyright 2007 Wim Taymans <wim.taymans@gmail.com>
8  *
9  * This library is free software; you can redistribute it and/or
10  * modify it under the terms of the GNU Library General Public
11  * License as published by the Free Software Foundation; either
12  * version 2 of the License, or (at your option) any later version.
13  *
14  * This library is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17  * Library General Public License for more details.
18  *
19  * You should have received a copy of the GNU Library General Public
20  * License along with this library; if not, write to the
21  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
22  * Boston, MA 02110-1301, USA.
23  *
24  */
25
26 /**
27  * SECTION:element-rtpjitterbuffer
28  *
29  * This element reorders and removes duplicate RTP packets as they are received
30  * from a network source.
31  *
32  * The element needs the clock-rate of the RTP payload in order to estimate the
33  * delay. This information is obtained either from the caps on the sink pad or,
34  * when no caps are present, from the #GstRtpJitterBuffer::request-pt-map signal.
35  * To clear the previous pt-map use the #GstRtpJitterBuffer::clear-pt-map signal.
36  *
37  * The rtpjitterbuffer will wait for missing packets up to a configurable time
38  * limit using the #GstRtpJitterBuffer:latency property. Packets arriving too
39  * late are considered to be lost packets. If the #GstRtpJitterBuffer:do-lost
40  * property is set, lost packets will result in a custom serialized downstream
41  * event of name GstRTPPacketLost. The lost packet events are usually used by a
42  * depayloader or other element to create concealment data or some other logic
43  * to gracefully handle the missing packets.
44  *
45  * The jitterbuffer will use the DTS (or PTS if no DTS is set) of the incomming
46  * buffer and the rtptime inside the RTP packet to create a PTS on the outgoing
47  * buffer.
48  *
49  * The jitterbuffer can also be configured to send early retransmission events
50  * upstream by setting the #GstRtpJitterBuffer:do-retransmission property. In
51  * this mode, the jitterbuffer tries to estimate when a packet should arrive and
52  * sends a custom upstream event named GstRTPRetransmissionRequest when the
53  * packet is considered late. The initial expected packet arrival time is
54  * calculated as follows:
55  *
56  * - If seqnum N arrived at time T, seqnum N+1 is expected to arrive at
57  *     T + packet-spacing + #GstRtpJitterBuffer:rtx-delay. The packet spacing is
58  *     calculated from the DTS (or PTS is no DTS) of two consecutive RTP
59  *     packets with different rtptime.
60  *
61  * - If seqnum N0 arrived at time T0 and seqnum Nm arrived at time Tm,
62  *     seqnum Ni is expected at time Ti = T0 + i*(Tm - T0)/(Nm - N0). Any
63  *     previously scheduled timeout is overwritten.
64  *
65  * - If seqnum N arrived, all seqnum older than
66  *     N - #GstRtpJitterBuffer:rtx-delay-reorder are considered late
67  *     immediately. This is to request fast feedback for abonormally reorder
68  *     packets before any of the previous timeouts is triggered.
69  *
70  * A late packet triggers the GstRTPRetransmissionRequest custom upstream
71  * event. After the initial timeout expires and the retransmission event is
72  * sent, the timeout is scheduled for
73  * T + #GstRtpJitterBuffer:rtx-retry-timeout. If the missing packet did not
74  * arrive after #GstRtpJitterBuffer:rtx-retry-timeout, a new
75  * GstRTPRetransmissionRequest is sent upstream and the timeout is rescheduled
76  * again for T + #GstRtpJitterBuffer:rtx-retry-timeout. This repeats until
77  * #GstRtpJitterBuffer:rtx-retry-period elapsed, at which point no further
78  * retransmission requests are sent and the regular logic is performed to
79  * schedule a lost packet as discussed above.
80  *
81  * This element acts as a live element and so adds #GstRtpJitterBuffer:latency
82  * to the pipeline.
83  *
84  * This element will automatically be used inside rtpbin.
85  *
86  * <refsect2>
87  * <title>Example pipelines</title>
88  * |[
89  * gst-launch-1.0 rtspsrc location=rtsp://192.168.1.133:8554/mpeg1or2AudioVideoTest ! rtpjitterbuffer ! rtpmpvdepay ! mpeg2dec ! xvimagesink
90  * ]| Connect to a streaming server and decode the MPEG video. The jitterbuffer is
91  * inserted into the pipeline to smooth out network jitter and to reorder the
92  * out-of-order RTP packets.
93  * </refsect2>
94  */
95
96 #ifdef HAVE_CONFIG_H
97 #include "config.h"
98 #endif
99
100 #include <stdlib.h>
101 #include <string.h>
102 #include <gst/rtp/gstrtpbuffer.h>
103
104 #include "gstrtpjitterbuffer.h"
105 #include "rtpjitterbuffer.h"
106 #include "rtpstats.h"
107
108 #include <gst/glib-compat-private.h>
109
110 GST_DEBUG_CATEGORY (rtpjitterbuffer_debug);
111 #define GST_CAT_DEFAULT (rtpjitterbuffer_debug)
112
113 /* RTPJitterBuffer signals and args */
114 enum
115 {
116   SIGNAL_REQUEST_PT_MAP,
117   SIGNAL_CLEAR_PT_MAP,
118   SIGNAL_HANDLE_SYNC,
119   SIGNAL_ON_NPT_STOP,
120   SIGNAL_SET_ACTIVE,
121   LAST_SIGNAL
122 };
123
124 #define DEFAULT_LATENCY_MS          200
125 #define DEFAULT_DROP_ON_LATENCY     FALSE
126 #define DEFAULT_TS_OFFSET           0
127 #define DEFAULT_DO_LOST             FALSE
128 #define DEFAULT_MODE                RTP_JITTER_BUFFER_MODE_SLAVE
129 #define DEFAULT_PERCENT             0
130 #define DEFAULT_DO_RETRANSMISSION   FALSE
131 #define DEFAULT_RTX_NEXT_SEQNUM     TRUE
132 #define DEFAULT_RTX_DELAY           -1
133 #define DEFAULT_RTX_MIN_DELAY       0
134 #define DEFAULT_RTX_DELAY_REORDER   3
135 #define DEFAULT_RTX_RETRY_TIMEOUT   -1
136 #define DEFAULT_RTX_MIN_RETRY_TIMEOUT   -1
137 #define DEFAULT_RTX_RETRY_PERIOD    -1
138 #define DEFAULT_RTX_MAX_RETRIES    -1
139
140 #define DEFAULT_AUTO_RTX_DELAY (20 * GST_MSECOND)
141 #define DEFAULT_AUTO_RTX_TIMEOUT (40 * GST_MSECOND)
142
143 enum
144 {
145   PROP_0,
146   PROP_LATENCY,
147   PROP_DROP_ON_LATENCY,
148   PROP_TS_OFFSET,
149   PROP_DO_LOST,
150   PROP_MODE,
151   PROP_PERCENT,
152   PROP_DO_RETRANSMISSION,
153   PROP_RTX_NEXT_SEQNUM,
154   PROP_RTX_DELAY,
155   PROP_RTX_MIN_DELAY,
156   PROP_RTX_DELAY_REORDER,
157   PROP_RTX_RETRY_TIMEOUT,
158   PROP_RTX_MIN_RETRY_TIMEOUT,
159   PROP_RTX_RETRY_PERIOD,
160   PROP_RTX_MAX_RETRIES,
161   PROP_STATS,
162   PROP_LAST
163 };
164
165 #define JBUF_LOCK(priv)   (g_mutex_lock (&(priv)->jbuf_lock))
166
167 #define JBUF_LOCK_CHECK(priv,label) G_STMT_START {    \
168   JBUF_LOCK (priv);                                   \
169   if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK))    \
170     goto label;                                       \
171 } G_STMT_END
172 #define JBUF_UNLOCK(priv) (g_mutex_unlock (&(priv)->jbuf_lock))
173
174 #define JBUF_WAIT_TIMER(priv)   G_STMT_START {            \
175   GST_DEBUG ("waiting timer");                            \
176   (priv)->waiting_timer = TRUE;                           \
177   g_cond_wait (&(priv)->jbuf_timer, &(priv)->jbuf_lock);  \
178   (priv)->waiting_timer = FALSE;                          \
179   GST_DEBUG ("waiting timer done");                       \
180 } G_STMT_END
181 #define JBUF_SIGNAL_TIMER(priv) G_STMT_START {            \
182   if (G_UNLIKELY ((priv)->waiting_timer)) {               \
183     GST_DEBUG ("signal timer");                           \
184     g_cond_signal (&(priv)->jbuf_timer);                  \
185   }                                                       \
186 } G_STMT_END
187
188 #define JBUF_WAIT_EVENT(priv,label) G_STMT_START {       \
189   GST_DEBUG ("waiting event");                           \
190   (priv)->waiting_event = TRUE;                          \
191   g_cond_wait (&(priv)->jbuf_event, &(priv)->jbuf_lock); \
192   (priv)->waiting_event = FALSE;                         \
193   GST_DEBUG ("waiting event done");                      \
194   if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK))       \
195     goto label;                                          \
196 } G_STMT_END
197 #define JBUF_SIGNAL_EVENT(priv) G_STMT_START {           \
198   if (G_UNLIKELY ((priv)->waiting_event)) {              \
199     GST_DEBUG ("signal event");                          \
200     g_cond_signal (&(priv)->jbuf_event);                 \
201   }                                                      \
202 } G_STMT_END
203
204 #define JBUF_WAIT_QUERY(priv,label) G_STMT_START {       \
205   GST_DEBUG ("waiting query");                           \
206   (priv)->waiting_query = TRUE;                          \
207   g_cond_wait (&(priv)->jbuf_query, &(priv)->jbuf_lock); \
208   (priv)->waiting_query = FALSE;                         \
209   GST_DEBUG ("waiting query done");                      \
210   if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK))       \
211     goto label;                                          \
212 } G_STMT_END
213 #define JBUF_SIGNAL_QUERY(priv,res) G_STMT_START {       \
214   (priv)->last_query = res;                              \
215   if (G_UNLIKELY ((priv)->waiting_query)) {              \
216     GST_DEBUG ("signal query");                          \
217     g_cond_signal (&(priv)->jbuf_query);                 \
218   }                                                      \
219 } G_STMT_END
220
221
222 struct _GstRtpJitterBufferPrivate
223 {
224   GstPad *sinkpad, *srcpad;
225   GstPad *rtcpsinkpad;
226
227   RTPJitterBuffer *jbuf;
228   GMutex jbuf_lock;
229   gboolean waiting_timer;
230   GCond jbuf_timer;
231   gboolean waiting_event;
232   GCond jbuf_event;
233   gboolean waiting_query;
234   GCond jbuf_query;
235   gboolean last_query;
236   gboolean discont;
237   gboolean ts_discont;
238   gboolean active;
239   guint64 out_offset;
240
241   gboolean timer_running;
242   GThread *timer_thread;
243
244   /* properties */
245   guint latency_ms;
246   guint64 latency_ns;
247   gboolean drop_on_latency;
248   gint64 ts_offset;
249   gboolean do_lost;
250   gboolean do_retransmission;
251   gboolean rtx_next_seqnum;
252   gint rtx_delay;
253   guint rtx_min_delay;
254   gint rtx_delay_reorder;
255   gint rtx_retry_timeout;
256   gint rtx_min_retry_timeout;
257   gint rtx_retry_period;
258   gint rtx_max_retries;
259
260   /* the last seqnum we pushed out */
261   guint32 last_popped_seqnum;
262   /* the next expected seqnum we push */
263   guint32 next_seqnum;
264   /* seqnum-base, if known */
265   guint32 seqnum_base;
266   /* last output time */
267   GstClockTime last_out_time;
268   /* last valid input timestamp and rtptime pair */
269   GstClockTime ips_dts;
270   guint64 ips_rtptime;
271   GstClockTime packet_spacing;
272
273   /* the next expected seqnum we receive */
274   GstClockTime last_in_dts;
275   guint32 last_in_seqnum;
276   guint32 next_in_seqnum;
277
278   GArray *timers;
279
280   /* start and stop ranges */
281   GstClockTime npt_start;
282   GstClockTime npt_stop;
283   guint64 ext_timestamp;
284   guint64 last_elapsed;
285   guint64 estimated_eos;
286   GstClockID eos_id;
287
288   /* state */
289   gboolean eos;
290   guint last_percent;
291
292   /* clock rate and rtp timestamp offset */
293   gint last_pt;
294   gint32 clock_rate;
295   gint64 clock_base;
296   gint64 prev_ts_offset;
297
298   /* when we are shutting down */
299   GstFlowReturn srcresult;
300   gboolean blocked;
301
302   /* for sync */
303   GstSegment segment;
304   GstClockID clock_id;
305   GstClockTime timer_timeout;
306   guint16 timer_seqnum;
307   /* the latency of the upstream peer, we have to take this into account when
308    * synchronizing the buffers. */
309   GstClockTime peer_latency;
310   guint64 ext_rtptime;
311   GstBuffer *last_sr;
312
313   /* some accounting */
314   guint64 num_late;
315   guint64 num_duplicates;
316   guint64 num_rtx_requests;
317   guint64 num_rtx_success;
318   guint64 num_rtx_failed;
319   gdouble avg_rtx_num;
320   guint64 avg_rtx_rtt;
321
322   /* for the jitter */
323   GstClockTime last_dts;
324   guint64 last_rtptime;
325   GstClockTime avg_jitter;
326 };
327
328 typedef enum
329 {
330   TIMER_TYPE_EXPECTED,
331   TIMER_TYPE_LOST,
332   TIMER_TYPE_DEADLINE,
333   TIMER_TYPE_EOS
334 } TimerType;
335
336 typedef struct
337 {
338   guint idx;
339   guint16 seqnum;
340   guint num;
341   TimerType type;
342   GstClockTime timeout;
343   GstClockTime duration;
344   GstClockTime rtx_base;
345   GstClockTime rtx_delay;
346   GstClockTime rtx_retry;
347   GstClockTime rtx_last;
348   guint num_rtx_retry;
349 } TimerData;
350
351 #define GST_RTP_JITTER_BUFFER_GET_PRIVATE(o) \
352   (G_TYPE_INSTANCE_GET_PRIVATE ((o), GST_TYPE_RTP_JITTER_BUFFER, \
353                                 GstRtpJitterBufferPrivate))
354
355 static GstStaticPadTemplate gst_rtp_jitter_buffer_sink_template =
356 GST_STATIC_PAD_TEMPLATE ("sink",
357     GST_PAD_SINK,
358     GST_PAD_ALWAYS,
359     GST_STATIC_CAPS ("application/x-rtp"
360         /* "clock-rate = (int) [ 1, 2147483647 ], "
361          * "payload = (int) , "
362          * "encoding-name = (string) "
363          */ )
364     );
365
366 static GstStaticPadTemplate gst_rtp_jitter_buffer_sink_rtcp_template =
367 GST_STATIC_PAD_TEMPLATE ("sink_rtcp",
368     GST_PAD_SINK,
369     GST_PAD_REQUEST,
370     GST_STATIC_CAPS ("application/x-rtcp")
371     );
372
373 static GstStaticPadTemplate gst_rtp_jitter_buffer_src_template =
374 GST_STATIC_PAD_TEMPLATE ("src",
375     GST_PAD_SRC,
376     GST_PAD_ALWAYS,
377     GST_STATIC_CAPS ("application/x-rtp"
378         /* "payload = (int) , "
379          * "clock-rate = (int) , "
380          * "encoding-name = (string) "
381          */ )
382     );
383
384 static guint gst_rtp_jitter_buffer_signals[LAST_SIGNAL] = { 0 };
385
386 #define gst_rtp_jitter_buffer_parent_class parent_class
387 G_DEFINE_TYPE (GstRtpJitterBuffer, gst_rtp_jitter_buffer, GST_TYPE_ELEMENT);
388
389 /* object overrides */
390 static void gst_rtp_jitter_buffer_set_property (GObject * object,
391     guint prop_id, const GValue * value, GParamSpec * pspec);
392 static void gst_rtp_jitter_buffer_get_property (GObject * object,
393     guint prop_id, GValue * value, GParamSpec * pspec);
394 static void gst_rtp_jitter_buffer_finalize (GObject * object);
395
396 /* element overrides */
397 static GstStateChangeReturn gst_rtp_jitter_buffer_change_state (GstElement
398     * element, GstStateChange transition);
399 static GstPad *gst_rtp_jitter_buffer_request_new_pad (GstElement * element,
400     GstPadTemplate * templ, const gchar * name, const GstCaps * filter);
401 static void gst_rtp_jitter_buffer_release_pad (GstElement * element,
402     GstPad * pad);
403 static GstClock *gst_rtp_jitter_buffer_provide_clock (GstElement * element);
404
405 /* pad overrides */
406 static GstCaps *gst_rtp_jitter_buffer_getcaps (GstPad * pad, GstCaps * filter);
407 static GstIterator *gst_rtp_jitter_buffer_iterate_internal_links (GstPad * pad,
408     GstObject * parent);
409
410 /* sinkpad overrides */
411 static gboolean gst_rtp_jitter_buffer_sink_event (GstPad * pad,
412     GstObject * parent, GstEvent * event);
413 static GstFlowReturn gst_rtp_jitter_buffer_chain (GstPad * pad,
414     GstObject * parent, GstBuffer * buffer);
415
416 static gboolean gst_rtp_jitter_buffer_sink_rtcp_event (GstPad * pad,
417     GstObject * parent, GstEvent * event);
418 static GstFlowReturn gst_rtp_jitter_buffer_chain_rtcp (GstPad * pad,
419     GstObject * parent, GstBuffer * buffer);
420
421 static gboolean gst_rtp_jitter_buffer_sink_query (GstPad * pad,
422     GstObject * parent, GstQuery * query);
423
424 /* srcpad overrides */
425 static gboolean gst_rtp_jitter_buffer_src_event (GstPad * pad,
426     GstObject * parent, GstEvent * event);
427 static gboolean gst_rtp_jitter_buffer_src_activate_mode (GstPad * pad,
428     GstObject * parent, GstPadMode mode, gboolean active);
429 static void gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer);
430 static gboolean gst_rtp_jitter_buffer_src_query (GstPad * pad,
431     GstObject * parent, GstQuery * query);
432
433 static void
434 gst_rtp_jitter_buffer_clear_pt_map (GstRtpJitterBuffer * jitterbuffer);
435 static GstClockTime
436 gst_rtp_jitter_buffer_set_active (GstRtpJitterBuffer * jitterbuffer,
437     gboolean active, guint64 base_time);
438 static void do_handle_sync (GstRtpJitterBuffer * jitterbuffer);
439
440 static void unschedule_current_timer (GstRtpJitterBuffer * jitterbuffer);
441 static void remove_all_timers (GstRtpJitterBuffer * jitterbuffer);
442
443 static void wait_next_timeout (GstRtpJitterBuffer * jitterbuffer);
444
445 static GstStructure *gst_rtp_jitter_buffer_create_stats (GstRtpJitterBuffer *
446     jitterbuffer);
447
448 static void
449 gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass)
450 {
451   GObjectClass *gobject_class;
452   GstElementClass *gstelement_class;
453
454   gobject_class = (GObjectClass *) klass;
455   gstelement_class = (GstElementClass *) klass;
456
457   g_type_class_add_private (klass, sizeof (GstRtpJitterBufferPrivate));
458
459   gobject_class->finalize = gst_rtp_jitter_buffer_finalize;
460
461   gobject_class->set_property = gst_rtp_jitter_buffer_set_property;
462   gobject_class->get_property = gst_rtp_jitter_buffer_get_property;
463
464   /**
465    * GstRtpJitterBuffer:latency:
466    *
467    * The maximum latency of the jitterbuffer. Packets will be kept in the buffer
468    * for at most this time.
469    */
470   g_object_class_install_property (gobject_class, PROP_LATENCY,
471       g_param_spec_uint ("latency", "Buffer latency in ms",
472           "Amount of ms to buffer", 0, G_MAXUINT, DEFAULT_LATENCY_MS,
473           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
474   /**
475    * GstRtpJitterBuffer:drop-on-latency:
476    *
477    * Drop oldest buffers when the queue is completely filled.
478    */
479   g_object_class_install_property (gobject_class, PROP_DROP_ON_LATENCY,
480       g_param_spec_boolean ("drop-on-latency",
481           "Drop buffers when maximum latency is reached",
482           "Tells the jitterbuffer to never exceed the given latency in size",
483           DEFAULT_DROP_ON_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
484   /**
485    * GstRtpJitterBuffer:ts-offset:
486    *
487    * Adjust GStreamer output buffer timestamps in the jitterbuffer with offset.
488    * This is mainly used to ensure interstream synchronisation.
489    */
490   g_object_class_install_property (gobject_class, PROP_TS_OFFSET,
491       g_param_spec_int64 ("ts-offset", "Timestamp Offset",
492           "Adjust buffer timestamps with offset in nanoseconds", G_MININT64,
493           G_MAXINT64, DEFAULT_TS_OFFSET,
494           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
495
496   /**
497    * GstRtpJitterBuffer:do-lost:
498    *
499    * Send out a GstRTPPacketLost event downstream when a packet is considered
500    * lost.
501    */
502   g_object_class_install_property (gobject_class, PROP_DO_LOST,
503       g_param_spec_boolean ("do-lost", "Do Lost",
504           "Send an event downstream when a packet is lost", DEFAULT_DO_LOST,
505           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
506
507   /**
508    * GstRtpJitterBuffer:mode:
509    *
510    * Control the buffering and timestamping mode used by the jitterbuffer.
511    */
512   g_object_class_install_property (gobject_class, PROP_MODE,
513       g_param_spec_enum ("mode", "Mode",
514           "Control the buffering algorithm in use", RTP_TYPE_JITTER_BUFFER_MODE,
515           DEFAULT_MODE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
516   /**
517    * GstRtpJitterBuffer:percent:
518    *
519    * The percent of the jitterbuffer that is filled.
520    */
521   g_object_class_install_property (gobject_class, PROP_PERCENT,
522       g_param_spec_int ("percent", "percent",
523           "The buffer filled percent", 0, 100,
524           0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
525   /**
526    * GstRtpJitterBuffer:do-retransmission:
527    *
528    * Send out a GstRTPRetransmission event upstream when a packet is considered
529    * late and should be retransmitted.
530    *
531    * Since: 1.2
532    */
533   g_object_class_install_property (gobject_class, PROP_DO_RETRANSMISSION,
534       g_param_spec_boolean ("do-retransmission", "Do Retransmission",
535           "Send retransmission events upstream when a packet is late",
536           DEFAULT_DO_RETRANSMISSION,
537           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
538
539   /**
540    * GstRtpJitterBuffer:rtx-next-seqnum
541    *
542    * Estimate when the next packet should arrive and schedule a retransmission
543    * request for it.
544    * This is, when packet N arrives, a GstRTPRetransmission event is schedule
545    * for packet N+1. So it will be requested if it does not arrive at the expected time.
546    * The expected time is calculated using the dts of N and the packet spacing.
547    *
548    * Since: 1.6
549    */
550   g_object_class_install_property (gobject_class, PROP_RTX_NEXT_SEQNUM,
551       g_param_spec_boolean ("rtx-next-seqnum", "RTX next seqnum",
552           "Estimate when the next packet should arrive and schedule a "
553           "retransmission request for it.",
554           DEFAULT_RTX_NEXT_SEQNUM, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
555
556   /**
557    * GstRtpJitterBuffer:rtx-delay:
558    *
559    * When a packet did not arrive at the expected time, wait this extra amount
560    * of time before sending a retransmission event.
561    *
562    * When -1 is used, the max jitter will be used as extra delay.
563    *
564    * Since: 1.2
565    */
566   g_object_class_install_property (gobject_class, PROP_RTX_DELAY,
567       g_param_spec_int ("rtx-delay", "RTX Delay",
568           "Extra time in ms to wait before sending retransmission "
569           "event (-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_DELAY,
570           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
571
572   /**
573    * GstRtpJitterBuffer:rtx-min-delay:
574    *
575    * When a packet did not arrive at the expected time, wait at least this extra amount
576    * of time before sending a retransmission event.
577    *
578    * Since: 1.6
579    */
580   g_object_class_install_property (gobject_class, PROP_RTX_MIN_DELAY,
581       g_param_spec_uint ("rtx-min-delay", "Minimum RTX Delay",
582           "Minimum time in ms to wait before sending retransmission "
583           "event", 0, G_MAXUINT, DEFAULT_RTX_MIN_DELAY,
584           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
585   /**
586    * GstRtpJitterBuffer:rtx-delay-reorder:
587    *
588    * Assume that a retransmission event should be sent when we see
589    * this much packet reordering.
590    *
591    * When -1 is used, the value will be estimated based on observed packet
592    * reordering.
593    *
594    * Since: 1.2
595    */
596   g_object_class_install_property (gobject_class, PROP_RTX_DELAY_REORDER,
597       g_param_spec_int ("rtx-delay-reorder", "RTX Delay Reorder",
598           "Sending retransmission event when this much reordering (-1 automatic)",
599           -1, G_MAXINT, DEFAULT_RTX_DELAY_REORDER,
600           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
601   /**
602    * GstRtpJitterBuffer::rtx-retry-timeout:
603    *
604    * When no packet has been received after sending a retransmission event
605    * for this time, retry sending a retransmission event.
606    *
607    * When -1 is used, the value will be estimated based on observed round
608    * trip time.
609    *
610    * Since: 1.2
611    */
612   g_object_class_install_property (gobject_class, PROP_RTX_RETRY_TIMEOUT,
613       g_param_spec_int ("rtx-retry-timeout", "RTX Retry Timeout",
614           "Retry sending a transmission event after this timeout in "
615           "ms (-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_RETRY_TIMEOUT,
616           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
617   /**
618    * GstRtpJitterBuffer::rtx-min-retry-timeout:
619    *
620    * The minimum amount of time between retry timeouts. When
621    * GstRtpJitterBuffer::rtx-retry-timeout is -1, this value ensures a
622    * minimum interval between retry timeouts.
623    *
624    * When -1 is used, the value will be estimated based on the
625    * packet spacing.
626    *
627    * Since: 1.6
628    */
629   g_object_class_install_property (gobject_class, PROP_RTX_MIN_RETRY_TIMEOUT,
630       g_param_spec_int ("rtx-min-retry-timeout", "RTX Min Retry Timeout",
631           "Minimum timeout between sending a transmission event in "
632           "ms (-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_MIN_RETRY_TIMEOUT,
633           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
634   /**
635    * GstRtpJitterBuffer:rtx-retry-period:
636    *
637    * The amount of time to try to get a retransmission.
638    *
639    * When -1 is used, the value will be estimated based on the jitterbuffer
640    * latency and the observed round trip time.
641    *
642    * Since: 1.2
643    */
644   g_object_class_install_property (gobject_class, PROP_RTX_RETRY_PERIOD,
645       g_param_spec_int ("rtx-retry-period", "RTX Retry Period",
646           "Try to get a retransmission for this many ms "
647           "(-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_RETRY_PERIOD,
648           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
649   /**
650    * GstRtpJitterBuffer:rtx-max-retries:
651    *
652    * The maximum number of retries to request a retransmission.
653    *
654    * This implies that as maximum (rtx-max-retries + 1) retransmissions will be requested.
655    * When -1 is used, the number of retransmission request will not be limited.
656    *
657    * Since: 1.6
658    */
659   g_object_class_install_property (gobject_class, PROP_RTX_MAX_RETRIES,
660       g_param_spec_int ("rtx-max-retries", "RTX Max Retries",
661           "The maximum number of retries to request a retransmission. "
662           "(-1 not limited)", -1, G_MAXINT, DEFAULT_RTX_MAX_RETRIES,
663           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
664   /**
665    * GstRtpJitterBuffer:stats:
666    *
667    * Various jitterbuffer statistics. This property returns a GstStructure
668    * with name application/x-rtp-jitterbuffer-stats with the following fields:
669    *
670    *  "rtx-count"         G_TYPE_UINT64 The number of retransmissions requested
671    *  "rtx-success-count" G_TYPE_UINT64 The number of successful retransmissions
672    *  "rtx-per-packet"    G_TYPE_DOUBLE Average number of RTX per packet
673    *  "rtx-rtt"           G_TYPE_UINT64 Average round trip time per RTX
674    *
675    * Since: 1.4
676    */
677   g_object_class_install_property (gobject_class, PROP_STATS,
678       g_param_spec_boxed ("stats", "Statistics",
679           "Various statistics", GST_TYPE_STRUCTURE,
680           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
681
682   /**
683    * GstRtpJitterBuffer::request-pt-map:
684    * @buffer: the object which received the signal
685    * @pt: the pt
686    *
687    * Request the payload type as #GstCaps for @pt.
688    */
689   gst_rtp_jitter_buffer_signals[SIGNAL_REQUEST_PT_MAP] =
690       g_signal_new ("request-pt-map", G_TYPE_FROM_CLASS (klass),
691       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
692           request_pt_map), NULL, NULL, g_cclosure_marshal_generic,
693       GST_TYPE_CAPS, 1, G_TYPE_UINT);
694   /**
695    * GstRtpJitterBuffer::handle-sync:
696    * @buffer: the object which received the signal
697    * @struct: a GstStructure containing sync values.
698    *
699    * Be notified of new sync values.
700    */
701   gst_rtp_jitter_buffer_signals[SIGNAL_HANDLE_SYNC] =
702       g_signal_new ("handle-sync", G_TYPE_FROM_CLASS (klass),
703       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
704           handle_sync), NULL, NULL, g_cclosure_marshal_VOID__BOXED,
705       G_TYPE_NONE, 1, GST_TYPE_STRUCTURE | G_SIGNAL_TYPE_STATIC_SCOPE);
706
707   /**
708    * GstRtpJitterBuffer::on-npt-stop:
709    * @buffer: the object which received the signal
710    *
711    * Signal that the jitterbufer has pushed the RTP packet that corresponds to
712    * the npt-stop position.
713    */
714   gst_rtp_jitter_buffer_signals[SIGNAL_ON_NPT_STOP] =
715       g_signal_new ("on-npt-stop", G_TYPE_FROM_CLASS (klass),
716       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
717           on_npt_stop), NULL, NULL, g_cclosure_marshal_VOID__VOID,
718       G_TYPE_NONE, 0, G_TYPE_NONE);
719
720   /**
721    * GstRtpJitterBuffer::clear-pt-map:
722    * @buffer: the object which received the signal
723    *
724    * Invalidate the clock-rate as obtained with the
725    * #GstRtpJitterBuffer::request-pt-map signal.
726    */
727   gst_rtp_jitter_buffer_signals[SIGNAL_CLEAR_PT_MAP] =
728       g_signal_new ("clear-pt-map", G_TYPE_FROM_CLASS (klass),
729       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
730       G_STRUCT_OFFSET (GstRtpJitterBufferClass, clear_pt_map), NULL, NULL,
731       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
732
733   /**
734    * GstRtpJitterBuffer::set-active:
735    * @buffer: the object which received the signal
736    *
737    * Start pushing out packets with the given base time. This signal is only
738    * useful in buffering mode.
739    *
740    * Returns: the time of the last pushed packet.
741    */
742   gst_rtp_jitter_buffer_signals[SIGNAL_SET_ACTIVE] =
743       g_signal_new ("set-active", G_TYPE_FROM_CLASS (klass),
744       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
745       G_STRUCT_OFFSET (GstRtpJitterBufferClass, set_active), NULL, NULL,
746       g_cclosure_marshal_generic, G_TYPE_UINT64, 2, G_TYPE_BOOLEAN,
747       G_TYPE_UINT64);
748
749   gstelement_class->change_state =
750       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_change_state);
751   gstelement_class->request_new_pad =
752       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_request_new_pad);
753   gstelement_class->release_pad =
754       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_release_pad);
755   gstelement_class->provide_clock =
756       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_provide_clock);
757
758   gst_element_class_add_pad_template (gstelement_class,
759       gst_static_pad_template_get (&gst_rtp_jitter_buffer_src_template));
760   gst_element_class_add_pad_template (gstelement_class,
761       gst_static_pad_template_get (&gst_rtp_jitter_buffer_sink_template));
762   gst_element_class_add_pad_template (gstelement_class,
763       gst_static_pad_template_get (&gst_rtp_jitter_buffer_sink_rtcp_template));
764
765   gst_element_class_set_static_metadata (gstelement_class,
766       "RTP packet jitter-buffer", "Filter/Network/RTP",
767       "A buffer that deals with network jitter and other transmission faults",
768       "Philippe Kalaf <philippe.kalaf@collabora.co.uk>, "
769       "Wim Taymans <wim.taymans@gmail.com>");
770
771   klass->clear_pt_map = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_clear_pt_map);
772   klass->set_active = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_set_active);
773
774   GST_DEBUG_CATEGORY_INIT
775       (rtpjitterbuffer_debug, "rtpjitterbuffer", 0, "RTP Jitter Buffer");
776 }
777
778 static void
779 gst_rtp_jitter_buffer_init (GstRtpJitterBuffer * jitterbuffer)
780 {
781   GstRtpJitterBufferPrivate *priv;
782
783   priv = GST_RTP_JITTER_BUFFER_GET_PRIVATE (jitterbuffer);
784   jitterbuffer->priv = priv;
785
786   priv->latency_ms = DEFAULT_LATENCY_MS;
787   priv->latency_ns = priv->latency_ms * GST_MSECOND;
788   priv->drop_on_latency = DEFAULT_DROP_ON_LATENCY;
789   priv->do_lost = DEFAULT_DO_LOST;
790   priv->do_retransmission = DEFAULT_DO_RETRANSMISSION;
791   priv->rtx_next_seqnum = DEFAULT_RTX_NEXT_SEQNUM;
792   priv->rtx_delay = DEFAULT_RTX_DELAY;
793   priv->rtx_min_delay = DEFAULT_RTX_MIN_DELAY;
794   priv->rtx_delay_reorder = DEFAULT_RTX_DELAY_REORDER;
795   priv->rtx_retry_timeout = DEFAULT_RTX_RETRY_TIMEOUT;
796   priv->rtx_min_retry_timeout = DEFAULT_RTX_MIN_RETRY_TIMEOUT;
797   priv->rtx_retry_period = DEFAULT_RTX_RETRY_PERIOD;
798   priv->rtx_max_retries = DEFAULT_RTX_MAX_RETRIES;
799
800   priv->last_dts = -1;
801   priv->last_rtptime = -1;
802   priv->avg_jitter = 0;
803   priv->timers = g_array_new (FALSE, TRUE, sizeof (TimerData));
804   priv->jbuf = rtp_jitter_buffer_new ();
805   g_mutex_init (&priv->jbuf_lock);
806   g_cond_init (&priv->jbuf_timer);
807   g_cond_init (&priv->jbuf_event);
808   g_cond_init (&priv->jbuf_query);
809
810   /* reset skew detection initialy */
811   rtp_jitter_buffer_reset_skew (priv->jbuf);
812   rtp_jitter_buffer_set_delay (priv->jbuf, priv->latency_ns);
813   rtp_jitter_buffer_set_buffering (priv->jbuf, FALSE);
814   priv->active = TRUE;
815
816   priv->srcpad =
817       gst_pad_new_from_static_template (&gst_rtp_jitter_buffer_src_template,
818       "src");
819
820   gst_pad_set_activatemode_function (priv->srcpad,
821       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_activate_mode));
822   gst_pad_set_query_function (priv->srcpad,
823       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_query));
824   gst_pad_set_event_function (priv->srcpad,
825       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_event));
826
827   priv->sinkpad =
828       gst_pad_new_from_static_template (&gst_rtp_jitter_buffer_sink_template,
829       "sink");
830
831   gst_pad_set_chain_function (priv->sinkpad,
832       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_chain));
833   gst_pad_set_event_function (priv->sinkpad,
834       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_sink_event));
835   gst_pad_set_query_function (priv->sinkpad,
836       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_sink_query));
837
838   gst_element_add_pad (GST_ELEMENT (jitterbuffer), priv->srcpad);
839   gst_element_add_pad (GST_ELEMENT (jitterbuffer), priv->sinkpad);
840
841   GST_OBJECT_FLAG_SET (jitterbuffer, GST_ELEMENT_FLAG_PROVIDE_CLOCK);
842 }
843
844 #define IS_DROPABLE(it) (((it)->type == ITEM_TYPE_BUFFER) || ((it)->type == ITEM_TYPE_LOST))
845
846 #define ITEM_TYPE_BUFFER        0
847 #define ITEM_TYPE_LOST          1
848 #define ITEM_TYPE_EVENT         2
849 #define ITEM_TYPE_QUERY         3
850
851 static RTPJitterBufferItem *
852 alloc_item (gpointer data, guint type, GstClockTime dts, GstClockTime pts,
853     guint seqnum, guint count, guint rtptime)
854 {
855   RTPJitterBufferItem *item;
856
857   item = g_slice_new (RTPJitterBufferItem);
858   item->data = data;
859   item->next = NULL;
860   item->prev = NULL;
861   item->type = type;
862   item->dts = dts;
863   item->pts = pts;
864   item->seqnum = seqnum;
865   item->count = count;
866   item->rtptime = rtptime;
867
868   return item;
869 }
870
871 static void
872 free_item (RTPJitterBufferItem * item)
873 {
874   if (item->data && item->type != ITEM_TYPE_QUERY)
875     gst_mini_object_unref (item->data);
876   g_slice_free (RTPJitterBufferItem, item);
877 }
878
879 static void
880 free_item_and_retain_events (RTPJitterBufferItem * item, gpointer user_data)
881 {
882   GList **l = user_data;
883
884   if (item->data && item->type == ITEM_TYPE_EVENT
885       && GST_EVENT_IS_STICKY (item->data)) {
886     *l = g_list_prepend (*l, item->data);
887   } else if (item->data && item->type != ITEM_TYPE_QUERY) {
888     gst_mini_object_unref (item->data);
889   }
890   g_slice_free (RTPJitterBufferItem, item);
891 }
892
893 static void
894 gst_rtp_jitter_buffer_finalize (GObject * object)
895 {
896   GstRtpJitterBuffer *jitterbuffer;
897   GstRtpJitterBufferPrivate *priv;
898
899   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
900   priv = jitterbuffer->priv;
901
902   g_array_free (priv->timers, TRUE);
903   g_mutex_clear (&priv->jbuf_lock);
904   g_cond_clear (&priv->jbuf_timer);
905   g_cond_clear (&priv->jbuf_event);
906   g_cond_clear (&priv->jbuf_query);
907
908   rtp_jitter_buffer_flush (priv->jbuf, (GFunc) free_item, NULL);
909   g_object_unref (priv->jbuf);
910
911   G_OBJECT_CLASS (parent_class)->finalize (object);
912 }
913
914 static GstIterator *
915 gst_rtp_jitter_buffer_iterate_internal_links (GstPad * pad, GstObject * parent)
916 {
917   GstRtpJitterBuffer *jitterbuffer;
918   GstPad *otherpad = NULL;
919   GstIterator *it = NULL;
920   GValue val = { 0, };
921
922   jitterbuffer = GST_RTP_JITTER_BUFFER_CAST (parent);
923
924   if (pad == jitterbuffer->priv->sinkpad) {
925     otherpad = jitterbuffer->priv->srcpad;
926   } else if (pad == jitterbuffer->priv->srcpad) {
927     otherpad = jitterbuffer->priv->sinkpad;
928   } else if (pad == jitterbuffer->priv->rtcpsinkpad) {
929     it = gst_iterator_new_single (GST_TYPE_PAD, NULL);
930   }
931
932   if (it == NULL) {
933     g_value_init (&val, GST_TYPE_PAD);
934     g_value_set_object (&val, otherpad);
935     it = gst_iterator_new_single (GST_TYPE_PAD, &val);
936     g_value_unset (&val);
937   }
938
939   return it;
940 }
941
942 static GstPad *
943 create_rtcp_sink (GstRtpJitterBuffer * jitterbuffer)
944 {
945   GstRtpJitterBufferPrivate *priv;
946
947   priv = jitterbuffer->priv;
948
949   GST_DEBUG_OBJECT (jitterbuffer, "creating RTCP sink pad");
950
951   priv->rtcpsinkpad =
952       gst_pad_new_from_static_template
953       (&gst_rtp_jitter_buffer_sink_rtcp_template, "sink_rtcp");
954   gst_pad_set_chain_function (priv->rtcpsinkpad,
955       gst_rtp_jitter_buffer_chain_rtcp);
956   gst_pad_set_event_function (priv->rtcpsinkpad,
957       (GstPadEventFunction) gst_rtp_jitter_buffer_sink_rtcp_event);
958   gst_pad_set_iterate_internal_links_function (priv->rtcpsinkpad,
959       gst_rtp_jitter_buffer_iterate_internal_links);
960   gst_pad_set_active (priv->rtcpsinkpad, TRUE);
961   gst_element_add_pad (GST_ELEMENT_CAST (jitterbuffer), priv->rtcpsinkpad);
962
963   return priv->rtcpsinkpad;
964 }
965
966 static void
967 remove_rtcp_sink (GstRtpJitterBuffer * jitterbuffer)
968 {
969   GstRtpJitterBufferPrivate *priv;
970
971   priv = jitterbuffer->priv;
972
973   GST_DEBUG_OBJECT (jitterbuffer, "removing RTCP sink pad");
974
975   gst_pad_set_active (priv->rtcpsinkpad, FALSE);
976
977   gst_element_remove_pad (GST_ELEMENT_CAST (jitterbuffer), priv->rtcpsinkpad);
978   priv->rtcpsinkpad = NULL;
979 }
980
981 static GstPad *
982 gst_rtp_jitter_buffer_request_new_pad (GstElement * element,
983     GstPadTemplate * templ, const gchar * name, const GstCaps * filter)
984 {
985   GstRtpJitterBuffer *jitterbuffer;
986   GstElementClass *klass;
987   GstPad *result;
988   GstRtpJitterBufferPrivate *priv;
989
990   g_return_val_if_fail (templ != NULL, NULL);
991   g_return_val_if_fail (GST_IS_RTP_JITTER_BUFFER (element), NULL);
992
993   jitterbuffer = GST_RTP_JITTER_BUFFER_CAST (element);
994   priv = jitterbuffer->priv;
995   klass = GST_ELEMENT_GET_CLASS (element);
996
997   GST_DEBUG_OBJECT (element, "requesting pad %s", GST_STR_NULL (name));
998
999   /* figure out the template */
1000   if (templ == gst_element_class_get_pad_template (klass, "sink_rtcp")) {
1001     if (priv->rtcpsinkpad != NULL)
1002       goto exists;
1003
1004     result = create_rtcp_sink (jitterbuffer);
1005   } else
1006     goto wrong_template;
1007
1008   return result;
1009
1010   /* ERRORS */
1011 wrong_template:
1012   {
1013     g_warning ("rtpjitterbuffer: this is not our template");
1014     return NULL;
1015   }
1016 exists:
1017   {
1018     g_warning ("rtpjitterbuffer: pad already requested");
1019     return NULL;
1020   }
1021 }
1022
1023 static void
1024 gst_rtp_jitter_buffer_release_pad (GstElement * element, GstPad * pad)
1025 {
1026   GstRtpJitterBuffer *jitterbuffer;
1027   GstRtpJitterBufferPrivate *priv;
1028
1029   g_return_if_fail (GST_IS_RTP_JITTER_BUFFER (element));
1030   g_return_if_fail (GST_IS_PAD (pad));
1031
1032   jitterbuffer = GST_RTP_JITTER_BUFFER_CAST (element);
1033   priv = jitterbuffer->priv;
1034
1035   GST_DEBUG_OBJECT (element, "releasing pad %s:%s", GST_DEBUG_PAD_NAME (pad));
1036
1037   if (priv->rtcpsinkpad == pad) {
1038     remove_rtcp_sink (jitterbuffer);
1039   } else
1040     goto wrong_pad;
1041
1042   return;
1043
1044   /* ERRORS */
1045 wrong_pad:
1046   {
1047     g_warning ("gstjitterbuffer: asked to release an unknown pad");
1048     return;
1049   }
1050 }
1051
1052 static GstClock *
1053 gst_rtp_jitter_buffer_provide_clock (GstElement * element)
1054 {
1055   return gst_system_clock_obtain ();
1056 }
1057
1058 static void
1059 gst_rtp_jitter_buffer_clear_pt_map (GstRtpJitterBuffer * jitterbuffer)
1060 {
1061   GstRtpJitterBufferPrivate *priv;
1062
1063   priv = jitterbuffer->priv;
1064
1065   /* this will trigger a new pt-map request signal, FIXME, do something better. */
1066
1067   JBUF_LOCK (priv);
1068   priv->clock_rate = -1;
1069   /* do not clear current content, but refresh state for new arrival */
1070   GST_DEBUG_OBJECT (jitterbuffer, "reset jitterbuffer");
1071   rtp_jitter_buffer_reset_skew (priv->jbuf);
1072   JBUF_UNLOCK (priv);
1073 }
1074
1075 static GstClockTime
1076 gst_rtp_jitter_buffer_set_active (GstRtpJitterBuffer * jbuf, gboolean active,
1077     guint64 offset)
1078 {
1079   GstRtpJitterBufferPrivate *priv;
1080   GstClockTime last_out;
1081   RTPJitterBufferItem *item;
1082
1083   priv = jbuf->priv;
1084
1085   JBUF_LOCK (priv);
1086   GST_DEBUG_OBJECT (jbuf, "setting active %d with offset %" GST_TIME_FORMAT,
1087       active, GST_TIME_ARGS (offset));
1088
1089   if (active != priv->active) {
1090     /* add the amount of time spent in paused to the output offset. All
1091      * outgoing buffers will have this offset applied to their timestamps in
1092      * order to make them arrive in time in the sink. */
1093     priv->out_offset = offset;
1094     GST_DEBUG_OBJECT (jbuf, "out offset %" GST_TIME_FORMAT,
1095         GST_TIME_ARGS (priv->out_offset));
1096     priv->active = active;
1097     JBUF_SIGNAL_EVENT (priv);
1098   }
1099   if (!active) {
1100     rtp_jitter_buffer_set_buffering (priv->jbuf, TRUE);
1101   }
1102   if ((item = rtp_jitter_buffer_peek (priv->jbuf))) {
1103     /* head buffer timestamp and offset gives our output time */
1104     last_out = item->dts + priv->ts_offset;
1105   } else {
1106     /* use last known time when the buffer is empty */
1107     last_out = priv->last_out_time;
1108   }
1109   JBUF_UNLOCK (priv);
1110
1111   return last_out;
1112 }
1113
1114 static GstCaps *
1115 gst_rtp_jitter_buffer_getcaps (GstPad * pad, GstCaps * filter)
1116 {
1117   GstRtpJitterBuffer *jitterbuffer;
1118   GstRtpJitterBufferPrivate *priv;
1119   GstPad *other;
1120   GstCaps *caps;
1121   GstCaps *templ;
1122
1123   jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
1124   priv = jitterbuffer->priv;
1125
1126   other = (pad == priv->srcpad ? priv->sinkpad : priv->srcpad);
1127
1128   caps = gst_pad_peer_query_caps (other, filter);
1129
1130   templ = gst_pad_get_pad_template_caps (pad);
1131   if (caps == NULL) {
1132     GST_DEBUG_OBJECT (jitterbuffer, "use template");
1133     caps = templ;
1134   } else {
1135     GstCaps *intersect;
1136
1137     GST_DEBUG_OBJECT (jitterbuffer, "intersect with template");
1138
1139     intersect = gst_caps_intersect (caps, templ);
1140     gst_caps_unref (caps);
1141     gst_caps_unref (templ);
1142
1143     caps = intersect;
1144   }
1145   gst_object_unref (jitterbuffer);
1146
1147   return caps;
1148 }
1149
1150 /*
1151  * Must be called with JBUF_LOCK held
1152  */
1153
1154 static gboolean
1155 gst_jitter_buffer_sink_parse_caps (GstRtpJitterBuffer * jitterbuffer,
1156     GstCaps * caps)
1157 {
1158   GstRtpJitterBufferPrivate *priv;
1159   GstStructure *caps_struct;
1160   guint val;
1161   GstClockTime tval;
1162
1163   priv = jitterbuffer->priv;
1164
1165   /* first parse the caps */
1166   caps_struct = gst_caps_get_structure (caps, 0);
1167
1168   GST_DEBUG_OBJECT (jitterbuffer, "got caps");
1169
1170   /* we need a clock-rate to convert the rtp timestamps to GStreamer time and to
1171    * measure the amount of data in the buffer */
1172   if (!gst_structure_get_int (caps_struct, "clock-rate", &priv->clock_rate))
1173     goto error;
1174
1175   if (priv->clock_rate <= 0)
1176     goto wrong_rate;
1177
1178   GST_DEBUG_OBJECT (jitterbuffer, "got clock-rate %d", priv->clock_rate);
1179
1180   rtp_jitter_buffer_set_clock_rate (priv->jbuf, priv->clock_rate);
1181
1182   /* The clock base is the RTP timestamp corrsponding to the npt-start value. We
1183    * can use this to track the amount of time elapsed on the sender. */
1184   if (gst_structure_get_uint (caps_struct, "clock-base", &val))
1185     priv->clock_base = val;
1186   else
1187     priv->clock_base = -1;
1188
1189   priv->ext_timestamp = priv->clock_base;
1190
1191   GST_DEBUG_OBJECT (jitterbuffer, "got clock-base %" G_GINT64_FORMAT,
1192       priv->clock_base);
1193
1194   if (gst_structure_get_uint (caps_struct, "seqnum-base", &val)) {
1195     /* first expected seqnum, only update when we didn't have a previous base. */
1196     if (priv->next_in_seqnum == -1)
1197       priv->next_in_seqnum = val;
1198     if (priv->next_seqnum == -1) {
1199       priv->next_seqnum = val;
1200       JBUF_SIGNAL_EVENT (priv);
1201     }
1202     priv->seqnum_base = val;
1203   } else {
1204     priv->seqnum_base = -1;
1205   }
1206
1207   GST_DEBUG_OBJECT (jitterbuffer, "got seqnum-base %d", priv->next_in_seqnum);
1208
1209   /* the start and stop times. The seqnum-base corresponds to the start time. We
1210    * will keep track of the seqnums on the output and when we reach the one
1211    * corresponding to npt-stop, we emit the npt-stop-reached signal */
1212   if (gst_structure_get_clock_time (caps_struct, "npt-start", &tval))
1213     priv->npt_start = tval;
1214   else
1215     priv->npt_start = 0;
1216
1217   if (gst_structure_get_clock_time (caps_struct, "npt-stop", &tval))
1218     priv->npt_stop = tval;
1219   else
1220     priv->npt_stop = -1;
1221
1222   GST_DEBUG_OBJECT (jitterbuffer,
1223       "npt start/stop: %" GST_TIME_FORMAT "-%" GST_TIME_FORMAT,
1224       GST_TIME_ARGS (priv->npt_start), GST_TIME_ARGS (priv->npt_stop));
1225
1226   return TRUE;
1227
1228   /* ERRORS */
1229 error:
1230   {
1231     GST_DEBUG_OBJECT (jitterbuffer, "No clock-rate in caps!");
1232     return FALSE;
1233   }
1234 wrong_rate:
1235   {
1236     GST_DEBUG_OBJECT (jitterbuffer, "Invalid clock-rate %d", priv->clock_rate);
1237     return FALSE;
1238   }
1239 }
1240
1241 static void
1242 gst_rtp_jitter_buffer_flush_start (GstRtpJitterBuffer * jitterbuffer)
1243 {
1244   GstRtpJitterBufferPrivate *priv;
1245
1246   priv = jitterbuffer->priv;
1247
1248   JBUF_LOCK (priv);
1249   /* mark ourselves as flushing */
1250   priv->srcresult = GST_FLOW_FLUSHING;
1251   GST_DEBUG_OBJECT (jitterbuffer, "Disabling pop on queue");
1252   /* this unblocks any waiting pops on the src pad task */
1253   JBUF_SIGNAL_EVENT (priv);
1254   JBUF_SIGNAL_QUERY (priv, FALSE);
1255   JBUF_UNLOCK (priv);
1256 }
1257
1258 static void
1259 gst_rtp_jitter_buffer_flush_stop (GstRtpJitterBuffer * jitterbuffer)
1260 {
1261   GstRtpJitterBufferPrivate *priv;
1262
1263   priv = jitterbuffer->priv;
1264
1265   JBUF_LOCK (priv);
1266   GST_DEBUG_OBJECT (jitterbuffer, "Enabling pop on queue");
1267   /* Mark as non flushing */
1268   priv->srcresult = GST_FLOW_OK;
1269   gst_segment_init (&priv->segment, GST_FORMAT_TIME);
1270   priv->last_popped_seqnum = -1;
1271   priv->last_out_time = -1;
1272   priv->next_seqnum = -1;
1273   priv->seqnum_base = -1;
1274   priv->ips_rtptime = -1;
1275   priv->ips_dts = GST_CLOCK_TIME_NONE;
1276   priv->packet_spacing = 0;
1277   priv->next_in_seqnum = -1;
1278   priv->clock_rate = -1;
1279   priv->last_pt = -1;
1280   priv->eos = FALSE;
1281   priv->estimated_eos = -1;
1282   priv->last_elapsed = 0;
1283   priv->ext_timestamp = -1;
1284   priv->avg_jitter = 0;
1285   priv->last_dts = -1;
1286   priv->last_rtptime = -1;
1287   GST_DEBUG_OBJECT (jitterbuffer, "flush and reset jitterbuffer");
1288   rtp_jitter_buffer_flush (priv->jbuf, (GFunc) free_item, NULL);
1289   rtp_jitter_buffer_disable_buffering (priv->jbuf, FALSE);
1290   rtp_jitter_buffer_reset_skew (priv->jbuf);
1291   remove_all_timers (jitterbuffer);
1292   JBUF_UNLOCK (priv);
1293 }
1294
1295 static gboolean
1296 gst_rtp_jitter_buffer_src_activate_mode (GstPad * pad, GstObject * parent,
1297     GstPadMode mode, gboolean active)
1298 {
1299   gboolean result;
1300   GstRtpJitterBuffer *jitterbuffer = NULL;
1301
1302   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1303
1304   switch (mode) {
1305     case GST_PAD_MODE_PUSH:
1306       if (active) {
1307         /* allow data processing */
1308         gst_rtp_jitter_buffer_flush_stop (jitterbuffer);
1309
1310         /* start pushing out buffers */
1311         GST_DEBUG_OBJECT (jitterbuffer, "Starting task on srcpad");
1312         result = gst_pad_start_task (jitterbuffer->priv->srcpad,
1313             (GstTaskFunction) gst_rtp_jitter_buffer_loop, jitterbuffer, NULL);
1314       } else {
1315         /* make sure all data processing stops ASAP */
1316         gst_rtp_jitter_buffer_flush_start (jitterbuffer);
1317
1318         /* NOTE this will hardlock if the state change is called from the src pad
1319          * task thread because we will _join() the thread. */
1320         GST_DEBUG_OBJECT (jitterbuffer, "Stopping task on srcpad");
1321         result = gst_pad_stop_task (pad);
1322       }
1323       break;
1324     default:
1325       result = FALSE;
1326       break;
1327   }
1328   return result;
1329 }
1330
1331 static GstStateChangeReturn
1332 gst_rtp_jitter_buffer_change_state (GstElement * element,
1333     GstStateChange transition)
1334 {
1335   GstRtpJitterBuffer *jitterbuffer;
1336   GstRtpJitterBufferPrivate *priv;
1337   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
1338
1339   jitterbuffer = GST_RTP_JITTER_BUFFER (element);
1340   priv = jitterbuffer->priv;
1341
1342   switch (transition) {
1343     case GST_STATE_CHANGE_NULL_TO_READY:
1344       break;
1345     case GST_STATE_CHANGE_READY_TO_PAUSED:
1346       JBUF_LOCK (priv);
1347       /* reset negotiated values */
1348       priv->clock_rate = -1;
1349       priv->clock_base = -1;
1350       priv->peer_latency = 0;
1351       priv->last_pt = -1;
1352       /* block until we go to PLAYING */
1353       priv->blocked = TRUE;
1354       priv->timer_running = TRUE;
1355       priv->timer_thread =
1356           g_thread_new ("timer", (GThreadFunc) wait_next_timeout, jitterbuffer);
1357       JBUF_UNLOCK (priv);
1358       break;
1359     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
1360       JBUF_LOCK (priv);
1361       /* unblock to allow streaming in PLAYING */
1362       priv->blocked = FALSE;
1363       JBUF_SIGNAL_EVENT (priv);
1364       JBUF_SIGNAL_TIMER (priv);
1365       JBUF_UNLOCK (priv);
1366       break;
1367     default:
1368       break;
1369   }
1370
1371   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1372
1373   switch (transition) {
1374     case GST_STATE_CHANGE_READY_TO_PAUSED:
1375       /* we are a live element because we sync to the clock, which we can only
1376        * do in the PLAYING state */
1377       if (ret != GST_STATE_CHANGE_FAILURE)
1378         ret = GST_STATE_CHANGE_NO_PREROLL;
1379       break;
1380     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
1381       JBUF_LOCK (priv);
1382       /* block to stop streaming when PAUSED */
1383       priv->blocked = TRUE;
1384       unschedule_current_timer (jitterbuffer);
1385       JBUF_UNLOCK (priv);
1386       if (ret != GST_STATE_CHANGE_FAILURE)
1387         ret = GST_STATE_CHANGE_NO_PREROLL;
1388       break;
1389     case GST_STATE_CHANGE_PAUSED_TO_READY:
1390       JBUF_LOCK (priv);
1391       gst_buffer_replace (&priv->last_sr, NULL);
1392       priv->timer_running = FALSE;
1393       unschedule_current_timer (jitterbuffer);
1394       JBUF_SIGNAL_TIMER (priv);
1395       JBUF_SIGNAL_QUERY (priv, FALSE);
1396       JBUF_UNLOCK (priv);
1397       g_thread_join (priv->timer_thread);
1398       priv->timer_thread = NULL;
1399       break;
1400     case GST_STATE_CHANGE_READY_TO_NULL:
1401       break;
1402     default:
1403       break;
1404   }
1405
1406   return ret;
1407 }
1408
1409 static gboolean
1410 gst_rtp_jitter_buffer_src_event (GstPad * pad, GstObject * parent,
1411     GstEvent * event)
1412 {
1413   gboolean ret = TRUE;
1414   GstRtpJitterBuffer *jitterbuffer;
1415   GstRtpJitterBufferPrivate *priv;
1416
1417   jitterbuffer = GST_RTP_JITTER_BUFFER_CAST (parent);
1418   priv = jitterbuffer->priv;
1419
1420   GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
1421
1422   switch (GST_EVENT_TYPE (event)) {
1423     case GST_EVENT_LATENCY:
1424     {
1425       GstClockTime latency;
1426
1427       gst_event_parse_latency (event, &latency);
1428
1429       GST_DEBUG_OBJECT (jitterbuffer,
1430           "configuring latency of %" GST_TIME_FORMAT, GST_TIME_ARGS (latency));
1431
1432       JBUF_LOCK (priv);
1433       /* adjust the overall buffer delay to the total pipeline latency in
1434        * buffering mode because if downstream consumes too fast (because of
1435        * large latency or queues, we would start rebuffering again. */
1436       if (rtp_jitter_buffer_get_mode (priv->jbuf) ==
1437           RTP_JITTER_BUFFER_MODE_BUFFER) {
1438         rtp_jitter_buffer_set_delay (priv->jbuf, latency);
1439       }
1440       JBUF_UNLOCK (priv);
1441
1442       ret = gst_pad_push_event (priv->sinkpad, event);
1443       break;
1444     }
1445     default:
1446       ret = gst_pad_push_event (priv->sinkpad, event);
1447       break;
1448   }
1449
1450   return ret;
1451 }
1452
1453 /* handles and stores the event in the jitterbuffer, must be called with
1454  * LOCK */
1455 static gboolean
1456 queue_event (GstRtpJitterBuffer * jitterbuffer, GstEvent * event)
1457 {
1458   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1459   RTPJitterBufferItem *item;
1460   gboolean head;
1461
1462   switch (GST_EVENT_TYPE (event)) {
1463     case GST_EVENT_CAPS:
1464     {
1465       GstCaps *caps;
1466
1467       gst_event_parse_caps (event, &caps);
1468       gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps);
1469       break;
1470     }
1471     case GST_EVENT_SEGMENT:
1472       gst_event_copy_segment (event, &priv->segment);
1473
1474       /* we need time for now */
1475       if (priv->segment.format != GST_FORMAT_TIME)
1476         goto newseg_wrong_format;
1477
1478       GST_DEBUG_OBJECT (jitterbuffer,
1479           "segment:  %" GST_SEGMENT_FORMAT, &priv->segment);
1480       break;
1481     case GST_EVENT_EOS:
1482       priv->eos = TRUE;
1483       rtp_jitter_buffer_disable_buffering (priv->jbuf, TRUE);
1484       break;
1485     default:
1486       break;
1487   }
1488
1489
1490   GST_DEBUG_OBJECT (jitterbuffer, "adding event");
1491   item = alloc_item (event, ITEM_TYPE_EVENT, -1, -1, -1, 0, -1);
1492   rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL);
1493   if (head)
1494     JBUF_SIGNAL_EVENT (priv);
1495
1496   return TRUE;
1497
1498   /* ERRORS */
1499 newseg_wrong_format:
1500   {
1501     GST_DEBUG_OBJECT (jitterbuffer, "received non TIME newsegment");
1502     gst_event_unref (event);
1503     return FALSE;
1504   }
1505 }
1506
1507 static gboolean
1508 gst_rtp_jitter_buffer_sink_event (GstPad * pad, GstObject * parent,
1509     GstEvent * event)
1510 {
1511   gboolean ret = TRUE;
1512   GstRtpJitterBuffer *jitterbuffer;
1513   GstRtpJitterBufferPrivate *priv;
1514
1515   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1516   priv = jitterbuffer->priv;
1517
1518   GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
1519
1520   switch (GST_EVENT_TYPE (event)) {
1521     case GST_EVENT_FLUSH_START:
1522       ret = gst_pad_push_event (priv->srcpad, event);
1523       gst_rtp_jitter_buffer_flush_start (jitterbuffer);
1524       /* wait for the loop to go into PAUSED */
1525       gst_pad_pause_task (priv->srcpad);
1526       break;
1527     case GST_EVENT_FLUSH_STOP:
1528       ret = gst_pad_push_event (priv->srcpad, event);
1529       ret =
1530           gst_rtp_jitter_buffer_src_activate_mode (priv->srcpad, parent,
1531           GST_PAD_MODE_PUSH, TRUE);
1532       break;
1533     default:
1534       if (GST_EVENT_IS_SERIALIZED (event)) {
1535         /* serialized events go in the queue */
1536         JBUF_LOCK (priv);
1537         if (priv->srcresult != GST_FLOW_OK) {
1538           /* Errors in sticky event pushing are no problem and ignored here
1539            * as they will cause more meaningful errors during data flow.
1540            * For EOS events, that are not followed by data flow, we still
1541            * return FALSE here though.
1542            */
1543           if (!GST_EVENT_IS_STICKY (event) ||
1544               GST_EVENT_TYPE (event) == GST_EVENT_EOS)
1545             goto out_flow_error;
1546         }
1547         /* refuse more events on EOS */
1548         if (priv->eos)
1549           goto out_eos;
1550         ret = queue_event (jitterbuffer, event);
1551         JBUF_UNLOCK (priv);
1552       } else {
1553         /* non-serialized events are forwarded downstream immediately */
1554         ret = gst_pad_push_event (priv->srcpad, event);
1555       }
1556       break;
1557   }
1558   return ret;
1559
1560   /* ERRORS */
1561 out_flow_error:
1562   {
1563     GST_DEBUG_OBJECT (jitterbuffer,
1564         "refusing event, we have a downstream flow error: %s",
1565         gst_flow_get_name (priv->srcresult));
1566     JBUF_UNLOCK (priv);
1567     gst_event_unref (event);
1568     return FALSE;
1569   }
1570 out_eos:
1571   {
1572     GST_DEBUG_OBJECT (jitterbuffer, "refusing event, we are EOS");
1573     JBUF_UNLOCK (priv);
1574     gst_event_unref (event);
1575     return FALSE;
1576   }
1577 }
1578
1579 static gboolean
1580 gst_rtp_jitter_buffer_sink_rtcp_event (GstPad * pad, GstObject * parent,
1581     GstEvent * event)
1582 {
1583   gboolean ret = TRUE;
1584   GstRtpJitterBuffer *jitterbuffer;
1585
1586   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1587
1588   GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
1589
1590   switch (GST_EVENT_TYPE (event)) {
1591     case GST_EVENT_FLUSH_START:
1592       gst_event_unref (event);
1593       break;
1594     case GST_EVENT_FLUSH_STOP:
1595       gst_event_unref (event);
1596       break;
1597     default:
1598       ret = gst_pad_event_default (pad, parent, event);
1599       break;
1600   }
1601
1602   return ret;
1603 }
1604
1605 /*
1606  * Must be called with JBUF_LOCK held, will release the LOCK when emiting the
1607  * signal. The function returns GST_FLOW_ERROR when a parsing error happened and
1608  * GST_FLOW_FLUSHING when the element is shutting down. On success
1609  * GST_FLOW_OK is returned.
1610  */
1611 static GstFlowReturn
1612 gst_rtp_jitter_buffer_get_clock_rate (GstRtpJitterBuffer * jitterbuffer,
1613     guint8 pt)
1614 {
1615   GValue ret = { 0 };
1616   GValue args[2] = { {0}, {0} };
1617   GstCaps *caps;
1618   gboolean res;
1619
1620   g_value_init (&args[0], GST_TYPE_ELEMENT);
1621   g_value_set_object (&args[0], jitterbuffer);
1622   g_value_init (&args[1], G_TYPE_UINT);
1623   g_value_set_uint (&args[1], pt);
1624
1625   g_value_init (&ret, GST_TYPE_CAPS);
1626   g_value_set_boxed (&ret, NULL);
1627
1628   JBUF_UNLOCK (jitterbuffer->priv);
1629   g_signal_emitv (args, gst_rtp_jitter_buffer_signals[SIGNAL_REQUEST_PT_MAP], 0,
1630       &ret);
1631   JBUF_LOCK_CHECK (jitterbuffer->priv, out_flushing);
1632
1633   g_value_unset (&args[0]);
1634   g_value_unset (&args[1]);
1635   caps = (GstCaps *) g_value_dup_boxed (&ret);
1636   g_value_unset (&ret);
1637   if (!caps)
1638     goto no_caps;
1639
1640   res = gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps);
1641   gst_caps_unref (caps);
1642
1643   if (G_UNLIKELY (!res))
1644     goto parse_failed;
1645
1646   return GST_FLOW_OK;
1647
1648   /* ERRORS */
1649 no_caps:
1650   {
1651     GST_DEBUG_OBJECT (jitterbuffer, "could not get caps");
1652     return GST_FLOW_ERROR;
1653   }
1654 out_flushing:
1655   {
1656     GST_DEBUG_OBJECT (jitterbuffer, "we are flushing");
1657     return GST_FLOW_FLUSHING;
1658   }
1659 parse_failed:
1660   {
1661     GST_DEBUG_OBJECT (jitterbuffer, "parse failed");
1662     return GST_FLOW_ERROR;
1663   }
1664 }
1665
1666 /* call with jbuf lock held */
1667 static GstMessage *
1668 check_buffering_percent (GstRtpJitterBuffer * jitterbuffer, gint percent)
1669 {
1670   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1671   GstMessage *message = NULL;
1672
1673   if (percent == -1)
1674     return NULL;
1675
1676   /* Post a buffering message */
1677   if (priv->last_percent != percent) {
1678     priv->last_percent = percent;
1679     message =
1680         gst_message_new_buffering (GST_OBJECT_CAST (jitterbuffer), percent);
1681     gst_message_set_buffering_stats (message, GST_BUFFERING_LIVE, -1, -1, -1);
1682   }
1683
1684   return message;
1685 }
1686
1687 static GstClockTime
1688 apply_offset (GstRtpJitterBuffer * jitterbuffer, GstClockTime timestamp)
1689 {
1690   GstRtpJitterBufferPrivate *priv;
1691
1692   priv = jitterbuffer->priv;
1693
1694   if (timestamp == -1)
1695     return -1;
1696
1697   /* apply the timestamp offset, this is used for inter stream sync */
1698   timestamp += priv->ts_offset;
1699   /* add the offset, this is used when buffering */
1700   timestamp += priv->out_offset;
1701
1702   return timestamp;
1703 }
1704
1705 static TimerData *
1706 find_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type, guint16 seqnum)
1707 {
1708   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1709   TimerData *timer = NULL;
1710   gint i, len;
1711
1712   len = priv->timers->len;
1713   for (i = 0; i < len; i++) {
1714     TimerData *test = &g_array_index (priv->timers, TimerData, i);
1715     if (test->seqnum == seqnum && test->type == type) {
1716       timer = test;
1717       break;
1718     }
1719   }
1720   return timer;
1721 }
1722
1723 static void
1724 unschedule_current_timer (GstRtpJitterBuffer * jitterbuffer)
1725 {
1726   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1727
1728   if (priv->clock_id) {
1729     GST_DEBUG_OBJECT (jitterbuffer, "unschedule current timer");
1730     gst_clock_id_unschedule (priv->clock_id);
1731     priv->clock_id = NULL;
1732   }
1733 }
1734
1735 static GstClockTime
1736 get_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
1737 {
1738   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1739   GstClockTime test_timeout;
1740
1741   if ((test_timeout = timer->timeout) == -1)
1742     return -1;
1743
1744   if (timer->type != TIMER_TYPE_EXPECTED) {
1745     /* add our latency and offset to get output times. */
1746     test_timeout = apply_offset (jitterbuffer, test_timeout);
1747     test_timeout += priv->latency_ns;
1748   }
1749   return test_timeout;
1750 }
1751
1752 static void
1753 recalculate_timer (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
1754 {
1755   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1756
1757   if (priv->clock_id) {
1758     GstClockTime timeout = get_timeout (jitterbuffer, timer);
1759
1760     GST_DEBUG ("%" GST_TIME_FORMAT " <> %" GST_TIME_FORMAT,
1761         GST_TIME_ARGS (timeout), GST_TIME_ARGS (priv->timer_timeout));
1762
1763     if (timeout == -1 || timeout < priv->timer_timeout)
1764       unschedule_current_timer (jitterbuffer);
1765   }
1766 }
1767
1768 static TimerData *
1769 add_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type,
1770     guint16 seqnum, guint num, GstClockTime timeout, GstClockTime delay,
1771     GstClockTime duration)
1772 {
1773   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1774   TimerData *timer;
1775   gint len;
1776
1777   GST_DEBUG_OBJECT (jitterbuffer,
1778       "add timer %d for seqnum %d to %" GST_TIME_FORMAT ", delay %"
1779       GST_TIME_FORMAT, type, seqnum, GST_TIME_ARGS (timeout),
1780       GST_TIME_ARGS (delay));
1781
1782   len = priv->timers->len;
1783   g_array_set_size (priv->timers, len + 1);
1784   timer = &g_array_index (priv->timers, TimerData, len);
1785   timer->idx = len;
1786   timer->type = type;
1787   timer->seqnum = seqnum;
1788   timer->num = num;
1789   timer->timeout = timeout + delay;
1790   timer->duration = duration;
1791   if (type == TIMER_TYPE_EXPECTED) {
1792     timer->rtx_base = timeout;
1793     timer->rtx_delay = delay;
1794     timer->rtx_retry = 0;
1795   }
1796   timer->num_rtx_retry = 0;
1797   recalculate_timer (jitterbuffer, timer);
1798   JBUF_SIGNAL_TIMER (priv);
1799
1800   return timer;
1801 }
1802
1803 static void
1804 reschedule_timer (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
1805     guint16 seqnum, GstClockTime timeout, GstClockTime delay, gboolean reset)
1806 {
1807   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1808   gboolean seqchange, timechange;
1809   guint16 oldseq;
1810
1811   seqchange = timer->seqnum != seqnum;
1812   timechange = timer->timeout != timeout;
1813
1814   if (!seqchange && !timechange)
1815     return;
1816
1817   oldseq = timer->seqnum;
1818
1819   GST_DEBUG_OBJECT (jitterbuffer,
1820       "replace timer for seqnum %d->%d to %" GST_TIME_FORMAT,
1821       oldseq, seqnum, GST_TIME_ARGS (timeout + delay));
1822
1823   timer->timeout = timeout + delay;
1824   timer->seqnum = seqnum;
1825   if (reset) {
1826     timer->rtx_base = timeout;
1827     timer->rtx_delay = delay;
1828     timer->rtx_retry = 0;
1829   }
1830   if (seqchange)
1831     timer->num_rtx_retry = 0;
1832
1833   if (priv->clock_id) {
1834     /* we changed the seqnum and there is a timer currently waiting with this
1835      * seqnum, unschedule it */
1836     if (seqchange && priv->timer_seqnum == oldseq)
1837       unschedule_current_timer (jitterbuffer);
1838     /* we changed the time, check if it is earlier than what we are waiting
1839      * for and unschedule if so */
1840     else if (timechange)
1841       recalculate_timer (jitterbuffer, timer);
1842   }
1843 }
1844
1845 static TimerData *
1846 set_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type,
1847     guint16 seqnum, GstClockTime timeout)
1848 {
1849   TimerData *timer;
1850
1851   /* find the seqnum timer */
1852   timer = find_timer (jitterbuffer, type, seqnum);
1853   if (timer == NULL) {
1854     timer = add_timer (jitterbuffer, type, seqnum, 0, timeout, 0, -1);
1855   } else {
1856     reschedule_timer (jitterbuffer, timer, seqnum, timeout, 0, FALSE);
1857   }
1858   return timer;
1859 }
1860
1861 static void
1862 remove_timer (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
1863 {
1864   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1865   guint idx;
1866
1867   if (priv->clock_id && priv->timer_seqnum == timer->seqnum)
1868     unschedule_current_timer (jitterbuffer);
1869
1870   idx = timer->idx;
1871   GST_DEBUG_OBJECT (jitterbuffer, "removed index %d", idx);
1872   g_array_remove_index_fast (priv->timers, idx);
1873   timer->idx = idx;
1874 }
1875
1876 static void
1877 remove_all_timers (GstRtpJitterBuffer * jitterbuffer)
1878 {
1879   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1880   GST_DEBUG_OBJECT (jitterbuffer, "removed all timers");
1881   g_array_set_size (priv->timers, 0);
1882   unschedule_current_timer (jitterbuffer);
1883 }
1884
1885 /* get the extra delay to wait before sending RTX */
1886 static GstClockTime
1887 get_rtx_delay (GstRtpJitterBufferPrivate * priv)
1888 {
1889   GstClockTime delay;
1890
1891   if (priv->rtx_delay == -1) {
1892     if (priv->avg_jitter == 0 && priv->packet_spacing == 0) {
1893       delay = DEFAULT_AUTO_RTX_DELAY;
1894     } else {
1895       /* jitter is in nanoseconds, maximum of 2x jitter and half the
1896        * packet spacing is a good margin */
1897       delay = MAX (priv->avg_jitter * 2, priv->packet_spacing / 2);
1898     }
1899   } else {
1900     delay = priv->rtx_delay * GST_MSECOND;
1901   }
1902   if (priv->rtx_min_delay > 0)
1903     delay = MAX (delay, priv->rtx_min_delay * GST_MSECOND);
1904
1905   return delay;
1906 }
1907
1908 /* we just received a packet with seqnum and dts.
1909  *
1910  * First check for old seqnum that we are still expecting. If the gap with the
1911  * current seqnum is too big, unschedule the timeouts.
1912  *
1913  * If we have a valid packet spacing estimate we can set a timer for when we
1914  * should receive the next packet.
1915  * If we don't have a valid estimate, we remove any timer we might have
1916  * had for this packet.
1917  */
1918 static void
1919 update_timers (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum,
1920     GstClockTime dts, gboolean do_next_seqnum)
1921 {
1922   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1923   TimerData *timer = NULL;
1924   gint i, len;
1925
1926   /* go through all timers and unschedule the ones with a large gap, also find
1927    * the timer for the seqnum */
1928   len = priv->timers->len;
1929   for (i = 0; i < len; i++) {
1930     TimerData *test = &g_array_index (priv->timers, TimerData, i);
1931     gint gap;
1932
1933     gap = gst_rtp_buffer_compare_seqnum (test->seqnum, seqnum);
1934
1935     GST_DEBUG_OBJECT (jitterbuffer, "%d, %d, #%d<->#%d gap %d", i,
1936         test->type, test->seqnum, seqnum, gap);
1937
1938     if (gap == 0) {
1939       GST_DEBUG ("found timer for current seqnum");
1940       /* the timer for the current seqnum */
1941       timer = test;
1942       /* when no retransmission, we can stop now, we only need to find the
1943        * timer for the current seqnum */
1944       if (!priv->do_retransmission)
1945         break;
1946     } else if (gap > priv->rtx_delay_reorder) {
1947       /* max gap, we exceeded the max reorder distance and we don't expect the
1948        * missing packet to be this reordered */
1949       if (test->num_rtx_retry == 0 && test->type == TIMER_TYPE_EXPECTED)
1950         reschedule_timer (jitterbuffer, test, test->seqnum, -1, 0, FALSE);
1951     }
1952   }
1953
1954   do_next_seqnum = do_next_seqnum && priv->packet_spacing > 0
1955       && priv->do_retransmission && priv->rtx_next_seqnum;
1956
1957   if (timer && timer->type != TIMER_TYPE_DEADLINE) {
1958     if (timer->num_rtx_retry > 0) {
1959       GstClockTime rtx_last, delay;
1960
1961       /* we scheduled a retry for this packet and now we have it */
1962       priv->num_rtx_success++;
1963       /* all the previous retry attempts failed */
1964       priv->num_rtx_failed += timer->num_rtx_retry - 1;
1965       /* number of retries before receiving the packet */
1966       if (priv->avg_rtx_num == 0.0)
1967         priv->avg_rtx_num = timer->num_rtx_retry;
1968       else
1969         priv->avg_rtx_num = (timer->num_rtx_retry + 7 * priv->avg_rtx_num) / 8;
1970       /* calculate the delay between retransmission request and receiving this
1971        * packet, start with when we scheduled this timeout last */
1972       rtx_last = timer->rtx_last;
1973       if (dts != GST_CLOCK_TIME_NONE && dts > rtx_last) {
1974         /* we have a valid delay if this packet arrived after we scheduled the
1975          * request */
1976         delay = dts - rtx_last;
1977         if (priv->avg_rtx_rtt == 0)
1978           priv->avg_rtx_rtt = delay;
1979         else
1980           priv->avg_rtx_rtt = (delay + 7 * priv->avg_rtx_rtt) / 8;
1981       } else
1982         delay = 0;
1983
1984       GST_LOG_OBJECT (jitterbuffer,
1985           "RTX success %" G_GUINT64_FORMAT ", failed %" G_GUINT64_FORMAT
1986           ", requests %" G_GUINT64_FORMAT ", dups %" G_GUINT64_FORMAT
1987           ", avg-num %g, delay %" GST_TIME_FORMAT ", avg-rtt %" GST_TIME_FORMAT,
1988           priv->num_rtx_success, priv->num_rtx_failed, priv->num_rtx_requests,
1989           priv->num_duplicates, priv->avg_rtx_num, GST_TIME_ARGS (delay),
1990           GST_TIME_ARGS (priv->avg_rtx_rtt));
1991
1992       /* don't try to estimate the next seqnum because this is a retransmitted
1993        * packet and it probably did not arrive with the expected packet
1994        * spacing. */
1995       do_next_seqnum = FALSE;
1996     }
1997   }
1998
1999   if (do_next_seqnum) {
2000     GstClockTime expected, delay;
2001
2002     /* calculate expected arrival time of the next seqnum */
2003     expected = dts + priv->packet_spacing;
2004
2005     delay = get_rtx_delay (priv);
2006
2007     /* and update/install timer for next seqnum */
2008     if (timer)
2009       reschedule_timer (jitterbuffer, timer, priv->next_in_seqnum, expected,
2010           delay, TRUE);
2011     else
2012       add_timer (jitterbuffer, TIMER_TYPE_EXPECTED, priv->next_in_seqnum, 0,
2013           expected, delay, priv->packet_spacing);
2014   } else if (timer && timer->type != TIMER_TYPE_DEADLINE) {
2015     /* if we had a timer, remove it, we don't know when to expect the next
2016      * packet. */
2017     remove_timer (jitterbuffer, timer);
2018   }
2019 }
2020
2021 static void
2022 calculate_packet_spacing (GstRtpJitterBuffer * jitterbuffer, guint32 rtptime,
2023     GstClockTime dts)
2024 {
2025   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2026
2027   /* we need consecutive seqnums with a different
2028    * rtptime to estimate the packet spacing. */
2029   if (priv->ips_rtptime != rtptime) {
2030     /* rtptime changed, check dts diff */
2031     if (priv->ips_dts != -1 && dts != -1 && dts > priv->ips_dts) {
2032       GstClockTime new_packet_spacing = dts - priv->ips_dts;
2033       GstClockTime old_packet_spacing = priv->packet_spacing;
2034
2035       /* Biased towards bigger packet spacings to prevent
2036        * too many unneeded retransmission requests for next
2037        * packets that just arrive a little later than we would
2038        * expect */
2039       if (old_packet_spacing > new_packet_spacing)
2040         priv->packet_spacing =
2041             (new_packet_spacing + 3 * old_packet_spacing) / 4;
2042       else if (old_packet_spacing > 0)
2043         priv->packet_spacing =
2044             (3 * new_packet_spacing + old_packet_spacing) / 4;
2045       else
2046         priv->packet_spacing = new_packet_spacing;
2047
2048       GST_DEBUG_OBJECT (jitterbuffer,
2049           "new packet spacing %" GST_TIME_FORMAT
2050           " old packet spacing %" GST_TIME_FORMAT
2051           " combined to %" GST_TIME_FORMAT,
2052           GST_TIME_ARGS (new_packet_spacing),
2053           GST_TIME_ARGS (old_packet_spacing),
2054           GST_TIME_ARGS (priv->packet_spacing));
2055     }
2056     priv->ips_rtptime = rtptime;
2057     priv->ips_dts = dts;
2058   }
2059 }
2060
2061 static void
2062 calculate_expected (GstRtpJitterBuffer * jitterbuffer, guint32 expected,
2063     guint16 seqnum, GstClockTime dts, gint gap)
2064 {
2065   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2066   GstClockTime total_duration, duration, expected_dts;
2067   TimerType type;
2068   guint lost_packets = 0;
2069
2070   GST_DEBUG_OBJECT (jitterbuffer,
2071       "dts %" GST_TIME_FORMAT ", last %" GST_TIME_FORMAT,
2072       GST_TIME_ARGS (dts), GST_TIME_ARGS (priv->last_in_dts));
2073
2074   /* the total duration spanned by the missing packets */
2075   if (dts >= priv->last_in_dts)
2076     total_duration = dts - priv->last_in_dts;
2077   else
2078     total_duration = 0;
2079
2080   /* interpolate between the current time and the last time based on
2081    * number of packets we are missing, this is the estimated duration
2082    * for the missing packet based on equidistant packet spacing. */
2083   duration = total_duration / (gap + 1);
2084
2085   GST_DEBUG_OBJECT (jitterbuffer, "duration %" GST_TIME_FORMAT,
2086       GST_TIME_ARGS (duration));
2087
2088   if (total_duration > priv->latency_ns) {
2089     GstClockTime gap_time;
2090
2091     gap_time = total_duration - priv->latency_ns;
2092
2093     if (duration > 0) {
2094       lost_packets = gap_time / duration;
2095       gap_time = lost_packets * duration;
2096     } else {
2097       lost_packets = gap;
2098     }
2099
2100     /* too many lost packets, some of the missing packets are already
2101      * too late and we can generate lost packet events for them. */
2102     GST_DEBUG_OBJECT (jitterbuffer, "too many lost packets %" GST_TIME_FORMAT
2103         " > %" GST_TIME_FORMAT ", consider %u lost",
2104         GST_TIME_ARGS (total_duration), GST_TIME_ARGS (priv->latency_ns),
2105         lost_packets);
2106
2107     /* this timer will fire immediately and the lost event will be pushed from
2108      * the timer thread */
2109     add_timer (jitterbuffer, TIMER_TYPE_LOST, expected, lost_packets,
2110         priv->last_in_dts + duration, 0, gap_time);
2111
2112     expected += lost_packets;
2113     priv->last_in_dts += gap_time;
2114   }
2115
2116   expected_dts = priv->last_in_dts + (lost_packets + 1) * duration;
2117
2118   if (priv->do_retransmission) {
2119     TimerData *timer;
2120
2121     type = TIMER_TYPE_EXPECTED;
2122     /* if we had a timer for the first missing packet, update it. */
2123     if ((timer = find_timer (jitterbuffer, type, expected))) {
2124       GstClockTime timeout = timer->timeout;
2125
2126       timer->duration = duration;
2127       if (timeout > (expected_dts + timer->rtx_retry)) {
2128         GstClockTime delay = timeout - expected_dts - timer->rtx_retry;
2129         reschedule_timer (jitterbuffer, timer, timer->seqnum, expected_dts,
2130             delay, TRUE);
2131       }
2132       expected++;
2133       expected_dts += duration;
2134     }
2135   } else {
2136     type = TIMER_TYPE_LOST;
2137   }
2138
2139   while (gst_rtp_buffer_compare_seqnum (expected, seqnum) > 0) {
2140     add_timer (jitterbuffer, type, expected, 0, expected_dts, 0, duration);
2141     expected_dts += duration;
2142     expected++;
2143   }
2144 }
2145
2146 static void
2147 calculate_jitter (GstRtpJitterBuffer * jitterbuffer, GstClockTime dts,
2148     guint rtptime)
2149 {
2150   gint32 rtpdiff;
2151   GstClockTimeDiff dtsdiff, rtpdiffns, diff;
2152   GstRtpJitterBufferPrivate *priv;
2153
2154   priv = jitterbuffer->priv;
2155
2156   if (G_UNLIKELY (dts == GST_CLOCK_TIME_NONE) || priv->clock_rate <= 0)
2157     goto no_time;
2158
2159   if (priv->last_dts != -1)
2160     dtsdiff = dts - priv->last_dts;
2161   else
2162     dtsdiff = 0;
2163
2164   if (priv->last_rtptime != -1)
2165     rtpdiff = rtptime - (guint32) priv->last_rtptime;
2166   else
2167     rtpdiff = 0;
2168
2169   priv->last_dts = dts;
2170   priv->last_rtptime = rtptime;
2171
2172   if (rtpdiff > 0)
2173     rtpdiffns =
2174         gst_util_uint64_scale_int (rtpdiff, GST_SECOND, priv->clock_rate);
2175   else
2176     rtpdiffns =
2177         -gst_util_uint64_scale_int (-rtpdiff, GST_SECOND, priv->clock_rate);
2178
2179   diff = ABS (dtsdiff - rtpdiffns);
2180
2181   /* jitter is stored in nanoseconds */
2182   priv->avg_jitter = (diff + (15 * priv->avg_jitter)) >> 4;
2183
2184   GST_LOG_OBJECT (jitterbuffer,
2185       "dtsdiff %" GST_TIME_FORMAT " rtptime %" GST_TIME_FORMAT
2186       ", clock-rate %d, diff %" GST_TIME_FORMAT ", jitter: %" GST_TIME_FORMAT,
2187       GST_TIME_ARGS (dtsdiff), GST_TIME_ARGS (rtpdiffns), priv->clock_rate,
2188       GST_TIME_ARGS (diff), GST_TIME_ARGS (priv->avg_jitter));
2189
2190   return;
2191
2192   /* ERRORS */
2193 no_time:
2194   {
2195     GST_DEBUG_OBJECT (jitterbuffer,
2196         "no dts or no clock-rate, can't calculate jitter");
2197     return;
2198   }
2199 }
2200
2201 static GstFlowReturn
2202 gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent,
2203     GstBuffer * buffer)
2204 {
2205   GstRtpJitterBuffer *jitterbuffer;
2206   GstRtpJitterBufferPrivate *priv;
2207   guint16 seqnum;
2208   guint32 expected, rtptime;
2209   GstFlowReturn ret = GST_FLOW_OK;
2210   GstClockTime dts, pts;
2211   guint64 latency_ts;
2212   gboolean head;
2213   gint percent = -1;
2214   guint8 pt;
2215   GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
2216   gboolean do_next_seqnum = FALSE;
2217   RTPJitterBufferItem *item;
2218   GstMessage *msg = NULL;
2219
2220   jitterbuffer = GST_RTP_JITTER_BUFFER_CAST (parent);
2221
2222   priv = jitterbuffer->priv;
2223
2224   if (G_UNLIKELY (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp)))
2225     goto invalid_buffer;
2226
2227   pt = gst_rtp_buffer_get_payload_type (&rtp);
2228   seqnum = gst_rtp_buffer_get_seq (&rtp);
2229   rtptime = gst_rtp_buffer_get_timestamp (&rtp);
2230   gst_rtp_buffer_unmap (&rtp);
2231
2232   /* make sure we have PTS and DTS set */
2233   pts = GST_BUFFER_PTS (buffer);
2234   dts = GST_BUFFER_DTS (buffer);
2235   if (dts == -1)
2236     dts = pts;
2237   else if (pts == -1)
2238     pts = dts;
2239
2240   /* take the DTS of the buffer. This is the time when the packet was
2241    * received and is used to calculate jitter and clock skew. We will adjust
2242    * this DTS with the smoothed value after processing it in the
2243    * jitterbuffer and assign it as the PTS. */
2244   /* bring to running time */
2245   dts = gst_segment_to_running_time (&priv->segment, GST_FORMAT_TIME, dts);
2246
2247   GST_DEBUG_OBJECT (jitterbuffer,
2248       "Received packet #%d at time %" GST_TIME_FORMAT ", discont %d", seqnum,
2249       GST_TIME_ARGS (dts), GST_BUFFER_IS_DISCONT (buffer));
2250
2251   JBUF_LOCK_CHECK (priv, out_flushing);
2252
2253   if (G_UNLIKELY (priv->last_pt != pt)) {
2254     GstCaps *caps;
2255
2256     GST_DEBUG_OBJECT (jitterbuffer, "pt changed from %u to %u", priv->last_pt,
2257         pt);
2258
2259     priv->last_pt = pt;
2260     /* reset clock-rate so that we get a new one */
2261     priv->clock_rate = -1;
2262
2263     /* Try to get the clock-rate from the caps first if we can. If there are no
2264      * caps we must fire the signal to get the clock-rate. */
2265     if ((caps = gst_pad_get_current_caps (pad))) {
2266       gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps);
2267       gst_caps_unref (caps);
2268     }
2269   }
2270
2271   if (G_UNLIKELY (priv->clock_rate == -1)) {
2272     /* no clock rate given on the caps, try to get one with the signal */
2273     if (gst_rtp_jitter_buffer_get_clock_rate (jitterbuffer,
2274             pt) == GST_FLOW_FLUSHING)
2275       goto out_flushing;
2276
2277     if (G_UNLIKELY (priv->clock_rate == -1))
2278       goto no_clock_rate;
2279   }
2280
2281   /* don't accept more data on EOS */
2282   if (G_UNLIKELY (priv->eos))
2283     goto have_eos;
2284
2285   calculate_jitter (jitterbuffer, dts, rtptime);
2286
2287   if (priv->seqnum_base != -1) {
2288     gint gap;
2289
2290     gap = gst_rtp_buffer_compare_seqnum (priv->seqnum_base, seqnum);
2291
2292     if (gap < 0) {
2293       GST_DEBUG_OBJECT (jitterbuffer,
2294           "packet seqnum #%d before seqnum-base #%d", seqnum,
2295           priv->seqnum_base);
2296       gst_buffer_unref (buffer);
2297       ret = GST_FLOW_OK;
2298       goto finished;
2299     } else if (gap > 16384) {
2300       /* From now on don't compare against the seqnum base anymore as
2301        * at some point in the future we will wrap around and also that
2302        * much reordering is very unlikely */
2303       priv->seqnum_base = -1;
2304     }
2305   }
2306
2307   expected = priv->next_in_seqnum;
2308
2309   /* now check against our expected seqnum */
2310   if (G_LIKELY (expected != -1)) {
2311     gint gap;
2312
2313     /* now calculate gap */
2314     gap = gst_rtp_buffer_compare_seqnum (expected, seqnum);
2315
2316     GST_DEBUG_OBJECT (jitterbuffer, "expected #%d, got #%d, gap of %d",
2317         expected, seqnum, gap);
2318
2319     if (G_LIKELY (gap == 0)) {
2320       /* packet is expected */
2321       calculate_packet_spacing (jitterbuffer, rtptime, dts);
2322       do_next_seqnum = TRUE;
2323     } else {
2324       gboolean reset = FALSE;
2325
2326       if (!GST_CLOCK_TIME_IS_VALID (dts)) {
2327         /* We would run into calculations with GST_CLOCK_TIME_NONE below
2328          * and can't compensate for anything without DTS on RTP packets
2329          */
2330         goto gap_but_no_dts;
2331       } else if (gap < 0) {
2332         /* we received an old packet */
2333         if (G_UNLIKELY (gap < -RTP_MAX_MISORDER)) {
2334           /* too old packet, reset */
2335           GST_DEBUG_OBJECT (jitterbuffer, "reset: buffer too old %d < %d", gap,
2336               -RTP_MAX_MISORDER);
2337           reset = TRUE;
2338         } else {
2339           GST_DEBUG_OBJECT (jitterbuffer, "old packet received");
2340         }
2341       } else {
2342         /* new packet, we are missing some packets */
2343         if (G_UNLIKELY (gap > RTP_MAX_DROPOUT)) {
2344           /* packet too far in future, reset */
2345           GST_DEBUG_OBJECT (jitterbuffer, "reset: buffer too new %d > %d", gap,
2346               RTP_MAX_DROPOUT);
2347           reset = TRUE;
2348         } else {
2349           GST_DEBUG_OBJECT (jitterbuffer, "%d missing packets", gap);
2350           /* fill in the gap with EXPECTED timers */
2351           calculate_expected (jitterbuffer, expected, seqnum, dts, gap);
2352
2353           do_next_seqnum = TRUE;
2354         }
2355       }
2356       if (G_UNLIKELY (reset)) {
2357         GList *events = NULL, *l;
2358
2359         GST_DEBUG_OBJECT (jitterbuffer, "flush and reset jitterbuffer");
2360         rtp_jitter_buffer_flush (priv->jbuf,
2361             (GFunc) free_item_and_retain_events, &events);
2362         rtp_jitter_buffer_reset_skew (priv->jbuf);
2363         remove_all_timers (jitterbuffer);
2364         priv->discont = TRUE;
2365         priv->last_popped_seqnum = -1;
2366         priv->next_seqnum = seqnum;
2367         do_next_seqnum = TRUE;
2368
2369         /* Insert all sticky events again in order, otherwise we would
2370          * potentially loose STREAM_START, CAPS or SEGMENT events
2371          */
2372         events = g_list_reverse (events);
2373         for (l = events; l; l = l->next) {
2374           RTPJitterBufferItem *item;
2375
2376           item = alloc_item (l->data, ITEM_TYPE_EVENT, -1, -1, -1, 0, -1);
2377           rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL);
2378         }
2379         g_list_free (events);
2380
2381         JBUF_SIGNAL_EVENT (priv);
2382       }
2383       /* reset spacing estimation when gap */
2384       priv->ips_rtptime = -1;
2385       priv->ips_dts = GST_CLOCK_TIME_NONE;
2386     }
2387   } else {
2388     GST_DEBUG_OBJECT (jitterbuffer, "First buffer #%d", seqnum);
2389     /* we don't know what the next_in_seqnum should be, wait for the last
2390      * possible moment to push this buffer, maybe we get an earlier seqnum
2391      * while we wait */
2392     set_timer (jitterbuffer, TIMER_TYPE_DEADLINE, seqnum, dts);
2393     do_next_seqnum = TRUE;
2394     /* take rtptime and dts to calculate packet spacing */
2395     priv->ips_rtptime = rtptime;
2396     priv->ips_dts = dts;
2397   }
2398   if (do_next_seqnum) {
2399     priv->last_in_seqnum = seqnum;
2400     priv->last_in_dts = dts;
2401     priv->next_in_seqnum = (seqnum + 1) & 0xffff;
2402   }
2403
2404   /* let's check if this buffer is too late, we can only accept packets with
2405    * bigger seqnum than the one we last pushed. */
2406   if (G_LIKELY (priv->last_popped_seqnum != -1)) {
2407     gint gap;
2408
2409     gap = gst_rtp_buffer_compare_seqnum (priv->last_popped_seqnum, seqnum);
2410
2411     /* priv->last_popped_seqnum >= seqnum, we're too late. */
2412     if (G_UNLIKELY (gap <= 0))
2413       goto too_late;
2414   }
2415
2416   /* let's drop oldest packet if the queue is already full and drop-on-latency
2417    * is set. We can only do this when there actually is a latency. When no
2418    * latency is set, we just pump it in the queue and let the other end push it
2419    * out as fast as possible. */
2420   if (priv->latency_ms && priv->drop_on_latency) {
2421     latency_ts =
2422         gst_util_uint64_scale_int (priv->latency_ms, priv->clock_rate, 1000);
2423
2424     if (G_UNLIKELY (rtp_jitter_buffer_get_ts_diff (priv->jbuf) >= latency_ts)) {
2425       RTPJitterBufferItem *old_item;
2426
2427       old_item = rtp_jitter_buffer_peek (priv->jbuf);
2428
2429       if (IS_DROPABLE (old_item)) {
2430         old_item = rtp_jitter_buffer_pop (priv->jbuf, &percent);
2431         GST_DEBUG_OBJECT (jitterbuffer, "Queue full, dropping old packet %p",
2432             old_item);
2433         priv->next_seqnum = (old_item->seqnum + 1) & 0xffff;
2434         free_item (old_item);
2435       }
2436       /* we might have removed some head buffers, signal the pushing thread to
2437        * see if it can push now */
2438       JBUF_SIGNAL_EVENT (priv);
2439     }
2440   }
2441
2442   item = alloc_item (buffer, ITEM_TYPE_BUFFER, dts, pts, seqnum, 1, rtptime);
2443
2444   /* now insert the packet into the queue in sorted order. This function returns
2445    * FALSE if a packet with the same seqnum was already in the queue, meaning we
2446    * have a duplicate. */
2447   if (G_UNLIKELY (!rtp_jitter_buffer_insert (priv->jbuf, item,
2448               &head, &percent)))
2449     goto duplicate;
2450
2451   /* update timers */
2452   update_timers (jitterbuffer, seqnum, dts, do_next_seqnum);
2453
2454   /* we had an unhandled SR, handle it now */
2455   if (priv->last_sr)
2456     do_handle_sync (jitterbuffer);
2457
2458   if (G_UNLIKELY (head)) {
2459     /* signal addition of new buffer when the _loop is waiting. */
2460     if (G_LIKELY (priv->active))
2461       JBUF_SIGNAL_EVENT (priv);
2462
2463     /* let's unschedule and unblock any waiting buffers. We only want to do this
2464      * when the head buffer changed */
2465     if (G_UNLIKELY (priv->clock_id)) {
2466       GST_DEBUG_OBJECT (jitterbuffer, "Unscheduling waiting new buffer");
2467       unschedule_current_timer (jitterbuffer);
2468     }
2469   }
2470
2471   GST_DEBUG_OBJECT (jitterbuffer,
2472       "Pushed packet #%d, now %d packets, head: %d, " "percent %d", seqnum,
2473       rtp_jitter_buffer_num_packets (priv->jbuf), head, percent);
2474
2475   msg = check_buffering_percent (jitterbuffer, percent);
2476
2477 finished:
2478   JBUF_UNLOCK (priv);
2479
2480   if (msg)
2481     gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer), msg);
2482
2483   return ret;
2484
2485   /* ERRORS */
2486 invalid_buffer:
2487   {
2488     /* this is not fatal but should be filtered earlier */
2489     GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
2490         ("Received invalid RTP payload, dropping"));
2491     gst_buffer_unref (buffer);
2492     return GST_FLOW_OK;
2493   }
2494 no_clock_rate:
2495   {
2496     GST_WARNING_OBJECT (jitterbuffer,
2497         "No clock-rate in caps!, dropping buffer");
2498     gst_buffer_unref (buffer);
2499     goto finished;
2500   }
2501 out_flushing:
2502   {
2503     ret = priv->srcresult;
2504     GST_DEBUG_OBJECT (jitterbuffer, "flushing %s", gst_flow_get_name (ret));
2505     gst_buffer_unref (buffer);
2506     goto finished;
2507   }
2508 have_eos:
2509   {
2510     ret = GST_FLOW_EOS;
2511     GST_WARNING_OBJECT (jitterbuffer, "we are EOS, refusing buffer");
2512     gst_buffer_unref (buffer);
2513     goto finished;
2514   }
2515 too_late:
2516   {
2517     GST_WARNING_OBJECT (jitterbuffer, "Packet #%d too late as #%d was already"
2518         " popped, dropping", seqnum, priv->last_popped_seqnum);
2519     priv->num_late++;
2520     gst_buffer_unref (buffer);
2521     goto finished;
2522   }
2523 duplicate:
2524   {
2525     GST_WARNING_OBJECT (jitterbuffer, "Duplicate packet #%d detected, dropping",
2526         seqnum);
2527     priv->num_duplicates++;
2528     free_item (item);
2529     goto finished;
2530   }
2531 gap_but_no_dts:
2532   {
2533     /* this is fatal as we can't compensate for gaps without DTS */
2534     GST_ELEMENT_ERROR (jitterbuffer, STREAM, DECODE, (NULL),
2535         ("Received packet without DTS after a gap"));
2536     gst_buffer_unref (buffer);
2537     ret = GST_FLOW_ERROR;
2538     goto finished;
2539   }
2540 }
2541
2542 static GstClockTime
2543 compute_elapsed (GstRtpJitterBuffer * jitterbuffer, RTPJitterBufferItem * item)
2544 {
2545   guint64 ext_time, elapsed;
2546   guint32 rtp_time;
2547   GstRtpJitterBufferPrivate *priv;
2548
2549   priv = jitterbuffer->priv;
2550   rtp_time = item->rtptime;
2551
2552   GST_LOG_OBJECT (jitterbuffer, "rtp %" G_GUINT32_FORMAT ", ext %"
2553       G_GUINT64_FORMAT, rtp_time, priv->ext_timestamp);
2554
2555   if (rtp_time < priv->ext_timestamp) {
2556     ext_time = priv->ext_timestamp;
2557   } else {
2558     ext_time = gst_rtp_buffer_ext_timestamp (&priv->ext_timestamp, rtp_time);
2559   }
2560
2561   if (ext_time > priv->clock_base)
2562     elapsed = ext_time - priv->clock_base;
2563   else
2564     elapsed = 0;
2565
2566   elapsed = gst_util_uint64_scale_int (elapsed, GST_SECOND, priv->clock_rate);
2567   return elapsed;
2568 }
2569
2570 static void
2571 update_estimated_eos (GstRtpJitterBuffer * jitterbuffer,
2572     RTPJitterBufferItem * item)
2573 {
2574   guint64 total, elapsed, left, estimated;
2575   GstClockTime out_time;
2576   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2577
2578   if (priv->npt_stop == -1 || priv->ext_timestamp == -1
2579       || priv->clock_base == -1 || priv->clock_rate <= 0)
2580     return;
2581
2582   /* compute the elapsed time */
2583   elapsed = compute_elapsed (jitterbuffer, item);
2584
2585   /* do nothing if elapsed time doesn't increment */
2586   if (priv->last_elapsed && elapsed <= priv->last_elapsed)
2587     return;
2588
2589   priv->last_elapsed = elapsed;
2590
2591   /* this is the total time we need to play */
2592   total = priv->npt_stop - priv->npt_start;
2593   GST_LOG_OBJECT (jitterbuffer, "total %" GST_TIME_FORMAT,
2594       GST_TIME_ARGS (total));
2595
2596   /* this is how much time there is left */
2597   if (total > elapsed)
2598     left = total - elapsed;
2599   else
2600     left = 0;
2601
2602   /* if we have less time left that the size of the buffer, we will not
2603    * be able to keep it filled, disabled buffering then */
2604   if (left < rtp_jitter_buffer_get_delay (priv->jbuf)) {
2605     GST_DEBUG_OBJECT (jitterbuffer, "left %" GST_TIME_FORMAT
2606         ", disable buffering close to EOS", GST_TIME_ARGS (left));
2607     rtp_jitter_buffer_disable_buffering (priv->jbuf, TRUE);
2608   }
2609
2610   /* this is the current time as running-time */
2611   out_time = item->dts;
2612
2613   if (elapsed > 0)
2614     estimated = gst_util_uint64_scale (out_time, total, elapsed);
2615   else {
2616     /* if there is almost nothing left,
2617      * we may never advance enough to end up in the above case */
2618     if (total < GST_SECOND)
2619       estimated = GST_SECOND;
2620     else
2621       estimated = -1;
2622   }
2623   GST_LOG_OBJECT (jitterbuffer, "elapsed %" GST_TIME_FORMAT ", estimated %"
2624       GST_TIME_FORMAT, GST_TIME_ARGS (elapsed), GST_TIME_ARGS (estimated));
2625
2626   if (estimated != -1 && priv->estimated_eos != estimated) {
2627     set_timer (jitterbuffer, TIMER_TYPE_EOS, -1, estimated);
2628     priv->estimated_eos = estimated;
2629   }
2630 }
2631
2632 /* take a buffer from the queue and push it */
2633 static GstFlowReturn
2634 pop_and_push_next (GstRtpJitterBuffer * jitterbuffer, guint seqnum)
2635 {
2636   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2637   GstFlowReturn result = GST_FLOW_OK;
2638   RTPJitterBufferItem *item;
2639   GstBuffer *outbuf = NULL;
2640   GstEvent *outevent = NULL;
2641   GstQuery *outquery = NULL;
2642   GstClockTime dts, pts;
2643   gint percent = -1;
2644   gboolean do_push = TRUE;
2645   guint type;
2646   GstMessage *msg;
2647
2648   /* when we get here we are ready to pop and push the buffer */
2649   item = rtp_jitter_buffer_pop (priv->jbuf, &percent);
2650   type = item->type;
2651
2652   switch (type) {
2653     case ITEM_TYPE_BUFFER:
2654
2655       /* we need to make writable to change the flags and timestamps */
2656       outbuf = gst_buffer_make_writable (item->data);
2657
2658       if (G_UNLIKELY (priv->discont)) {
2659         /* set DISCONT flag when we missed a packet. We pushed the buffer writable
2660          * into the jitterbuffer so we can modify now. */
2661         GST_DEBUG_OBJECT (jitterbuffer, "mark output buffer discont");
2662         GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_DISCONT);
2663         priv->discont = FALSE;
2664       }
2665       if (G_UNLIKELY (priv->ts_discont)) {
2666         GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_RESYNC);
2667         priv->ts_discont = FALSE;
2668       }
2669
2670       dts =
2671           gst_segment_to_position (&priv->segment, GST_FORMAT_TIME, item->dts);
2672       pts =
2673           gst_segment_to_position (&priv->segment, GST_FORMAT_TIME, item->pts);
2674
2675       /* apply timestamp with offset to buffer now */
2676       GST_BUFFER_DTS (outbuf) = apply_offset (jitterbuffer, dts);
2677       GST_BUFFER_PTS (outbuf) = apply_offset (jitterbuffer, pts);
2678
2679       /* update the elapsed time when we need to check against the npt stop time. */
2680       update_estimated_eos (jitterbuffer, item);
2681
2682       priv->last_out_time = GST_BUFFER_PTS (outbuf);
2683       break;
2684     case ITEM_TYPE_LOST:
2685       priv->discont = TRUE;
2686       if (!priv->do_lost)
2687         do_push = FALSE;
2688       /* FALLTHROUGH */
2689     case ITEM_TYPE_EVENT:
2690       outevent = item->data;
2691       break;
2692     case ITEM_TYPE_QUERY:
2693       outquery = item->data;
2694       break;
2695   }
2696
2697   /* now we are ready to push the buffer. Save the seqnum and release the lock
2698    * so the other end can push stuff in the queue again. */
2699   if (seqnum != -1) {
2700     priv->last_popped_seqnum = seqnum;
2701     priv->next_seqnum = (seqnum + item->count) & 0xffff;
2702   }
2703   msg = check_buffering_percent (jitterbuffer, percent);
2704   JBUF_UNLOCK (priv);
2705
2706   item->data = NULL;
2707   free_item (item);
2708
2709   if (msg)
2710     gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer), msg);
2711
2712   switch (type) {
2713     case ITEM_TYPE_BUFFER:
2714       /* push buffer */
2715       GST_DEBUG_OBJECT (jitterbuffer,
2716           "Pushing buffer %d, dts %" GST_TIME_FORMAT ", pts %" GST_TIME_FORMAT,
2717           seqnum, GST_TIME_ARGS (GST_BUFFER_DTS (outbuf)),
2718           GST_TIME_ARGS (GST_BUFFER_PTS (outbuf)));
2719       result = gst_pad_push (priv->srcpad, outbuf);
2720
2721       JBUF_LOCK_CHECK (priv, out_flushing);
2722       break;
2723     case ITEM_TYPE_LOST:
2724     case ITEM_TYPE_EVENT:
2725       GST_DEBUG_OBJECT (jitterbuffer, "%sPushing event %" GST_PTR_FORMAT
2726           ", seqnum %d", do_push ? "" : "NOT ", outevent, seqnum);
2727
2728       if (do_push)
2729         gst_pad_push_event (priv->srcpad, outevent);
2730       else
2731         gst_event_unref (outevent);
2732
2733       result = GST_FLOW_OK;
2734
2735       JBUF_LOCK_CHECK (priv, out_flushing);
2736       break;
2737     case ITEM_TYPE_QUERY:
2738     {
2739       gboolean res;
2740
2741       res = gst_pad_peer_query (priv->srcpad, outquery);
2742
2743       JBUF_LOCK_CHECK (priv, out_flushing);
2744       result = GST_FLOW_OK;
2745       GST_LOG_OBJECT (jitterbuffer, "did query %p, return %d", outquery, res);
2746       JBUF_SIGNAL_QUERY (priv, res);
2747       break;
2748     }
2749   }
2750   return result;
2751
2752   /* ERRORS */
2753 out_flushing:
2754   {
2755     return priv->srcresult;
2756   }
2757 }
2758
2759 #define GST_FLOW_WAIT GST_FLOW_CUSTOM_SUCCESS
2760
2761 /* Peek a buffer and compare the seqnum to the expected seqnum.
2762  * If all is fine, the buffer is pushed.
2763  * If something is wrong, we wait for some event
2764  */
2765 static GstFlowReturn
2766 handle_next_buffer (GstRtpJitterBuffer * jitterbuffer)
2767 {
2768   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2769   GstFlowReturn result = GST_FLOW_OK;
2770   RTPJitterBufferItem *item;
2771   guint seqnum;
2772   guint32 next_seqnum;
2773   gint gap;
2774
2775   /* only push buffers when PLAYING and active and not buffering */
2776   if (priv->blocked || !priv->active ||
2777       rtp_jitter_buffer_is_buffering (priv->jbuf))
2778     return GST_FLOW_WAIT;
2779
2780 again:
2781   /* peek a buffer, we're just looking at the sequence number.
2782    * If all is fine, we'll pop and push it. If the sequence number is wrong we
2783    * wait for a timeout or something to change.
2784    * The peeked buffer is valid for as long as we hold the jitterbuffer lock. */
2785   item = rtp_jitter_buffer_peek (priv->jbuf);
2786   if (item == NULL)
2787     goto wait;
2788
2789   /* get the seqnum and the next expected seqnum */
2790   seqnum = item->seqnum;
2791   if (seqnum == -1)
2792     goto do_push;
2793
2794   next_seqnum = priv->next_seqnum;
2795
2796   /* get the gap between this and the previous packet. If we don't know the
2797    * previous packet seqnum assume no gap. */
2798   if (G_UNLIKELY (next_seqnum == -1)) {
2799     GST_DEBUG_OBJECT (jitterbuffer, "First buffer #%d", seqnum);
2800     /* we don't know what the next_seqnum should be, the chain function should
2801      * have scheduled a DEADLINE timer that will increment next_seqnum when it
2802      * fires, so wait for that */
2803     result = GST_FLOW_WAIT;
2804   } else {
2805     /* else calculate GAP */
2806     gap = gst_rtp_buffer_compare_seqnum (next_seqnum, seqnum);
2807
2808     if (G_LIKELY (gap == 0)) {
2809     do_push:
2810       /* no missing packet, pop and push */
2811       result = pop_and_push_next (jitterbuffer, seqnum);
2812     } else if (G_UNLIKELY (gap < 0)) {
2813       RTPJitterBufferItem *item;
2814       /* if we have a packet that we already pushed or considered dropped, pop it
2815        * off and get the next packet */
2816       GST_DEBUG_OBJECT (jitterbuffer, "Old packet #%d, next #%d dropping",
2817           seqnum, next_seqnum);
2818       item = rtp_jitter_buffer_pop (priv->jbuf, NULL);
2819       free_item (item);
2820       goto again;
2821     } else {
2822       /* the chain function has scheduled timers to request retransmission or
2823        * when to consider the packet lost, wait for that */
2824       GST_DEBUG_OBJECT (jitterbuffer,
2825           "Sequence number GAP detected: expected %d instead of %d (%d missing)",
2826           next_seqnum, seqnum, gap);
2827       result = GST_FLOW_WAIT;
2828     }
2829   }
2830   return result;
2831
2832 wait:
2833   {
2834     GST_DEBUG_OBJECT (jitterbuffer, "no buffer, going to wait");
2835     if (priv->eos)
2836       result = GST_FLOW_EOS;
2837     else
2838       result = GST_FLOW_WAIT;
2839     return result;
2840   }
2841 }
2842
2843 static GstClockTime
2844 get_rtx_retry_timeout (GstRtpJitterBufferPrivate * priv)
2845 {
2846   GstClockTime rtx_retry_timeout;
2847   GstClockTime rtx_min_retry_timeout;
2848
2849   if (priv->rtx_retry_timeout == -1) {
2850     if (priv->avg_rtx_rtt == 0)
2851       rtx_retry_timeout = DEFAULT_AUTO_RTX_TIMEOUT;
2852     else
2853       /* we want to ask for a retransmission after we waited for a
2854        * complete RTT and the additional jitter */
2855       rtx_retry_timeout = priv->avg_rtx_rtt + priv->avg_jitter * 2;
2856   } else {
2857     rtx_retry_timeout = priv->rtx_retry_timeout * GST_MSECOND;
2858   }
2859   /* make sure we don't retry too often. On very low latency networks,
2860    * the RTT and jitter can be very low. */
2861   if (priv->rtx_min_retry_timeout == -1) {
2862     rtx_min_retry_timeout = priv->packet_spacing;
2863   } else {
2864     rtx_min_retry_timeout = priv->rtx_min_retry_timeout * GST_MSECOND;
2865   }
2866   rtx_retry_timeout = MAX (rtx_retry_timeout, rtx_min_retry_timeout);
2867
2868   return rtx_retry_timeout;
2869 }
2870
2871 static GstClockTime
2872 get_rtx_retry_period (GstRtpJitterBufferPrivate * priv,
2873     GstClockTime rtx_retry_timeout)
2874 {
2875   GstClockTime rtx_retry_period;
2876
2877   if (priv->rtx_retry_period == -1) {
2878     /* we retry up to the configured jitterbuffer size but leaving some
2879      * room for the retransmission to arrive in time */
2880     if (rtx_retry_timeout > priv->latency_ns) {
2881       rtx_retry_period = 0;
2882     } else {
2883       rtx_retry_period = priv->latency_ns - rtx_retry_timeout;
2884     }
2885   } else {
2886     rtx_retry_period = priv->rtx_retry_period * GST_MSECOND;
2887   }
2888   return rtx_retry_period;
2889 }
2890
2891 /* the timeout for when we expected a packet expired */
2892 static gboolean
2893 do_expected_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
2894     GstClockTime now)
2895 {
2896   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2897   GstEvent *event;
2898   guint delay, delay_ms, avg_rtx_rtt_ms;
2899   guint rtx_retry_timeout_ms, rtx_retry_period_ms;
2900   GstClockTime rtx_retry_period;
2901   GstClockTime rtx_retry_timeout;
2902   GstClock *clock;
2903
2904   GST_DEBUG_OBJECT (jitterbuffer, "expected %d didn't arrive, now %"
2905       GST_TIME_FORMAT, timer->seqnum, GST_TIME_ARGS (now));
2906
2907   rtx_retry_timeout = get_rtx_retry_timeout (priv);
2908   rtx_retry_period = get_rtx_retry_period (priv, rtx_retry_timeout);
2909
2910   GST_DEBUG_OBJECT (jitterbuffer, "timeout %" GST_TIME_FORMAT ", period %"
2911       GST_TIME_FORMAT, GST_TIME_ARGS (rtx_retry_timeout),
2912       GST_TIME_ARGS (rtx_retry_period));
2913
2914   delay = timer->rtx_delay + timer->rtx_retry;
2915
2916   delay_ms = GST_TIME_AS_MSECONDS (delay);
2917   rtx_retry_timeout_ms = GST_TIME_AS_MSECONDS (rtx_retry_timeout);
2918   rtx_retry_period_ms = GST_TIME_AS_MSECONDS (rtx_retry_period);
2919   avg_rtx_rtt_ms = GST_TIME_AS_MSECONDS (priv->avg_rtx_rtt);
2920
2921   event = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM,
2922       gst_structure_new ("GstRTPRetransmissionRequest",
2923           "seqnum", G_TYPE_UINT, (guint) timer->seqnum,
2924           "running-time", G_TYPE_UINT64, timer->rtx_base,
2925           "delay", G_TYPE_UINT, delay_ms,
2926           "retry", G_TYPE_UINT, timer->num_rtx_retry,
2927           "frequency", G_TYPE_UINT, rtx_retry_timeout_ms,
2928           "period", G_TYPE_UINT, rtx_retry_period_ms,
2929           "deadline", G_TYPE_UINT, priv->latency_ms,
2930           "packet-spacing", G_TYPE_UINT64, priv->packet_spacing,
2931           "avg-rtt", G_TYPE_UINT, avg_rtx_rtt_ms, NULL));
2932
2933   priv->num_rtx_requests++;
2934   timer->num_rtx_retry++;
2935
2936   GST_OBJECT_LOCK (jitterbuffer);
2937   if ((clock = GST_ELEMENT_CLOCK (jitterbuffer))) {
2938     timer->rtx_last = gst_clock_get_time (clock);
2939     timer->rtx_last -= GST_ELEMENT_CAST (jitterbuffer)->base_time;
2940   } else {
2941     timer->rtx_last = now;
2942   }
2943   GST_OBJECT_UNLOCK (jitterbuffer);
2944
2945   /* calculate the timeout for the next retransmission attempt */
2946   timer->rtx_retry += rtx_retry_timeout;
2947   GST_DEBUG_OBJECT (jitterbuffer, "base %" GST_TIME_FORMAT ", delay %"
2948       GST_TIME_FORMAT ", retry %" GST_TIME_FORMAT ", num_retry %u",
2949       GST_TIME_ARGS (timer->rtx_base), GST_TIME_ARGS (timer->rtx_delay),
2950       GST_TIME_ARGS (timer->rtx_retry), timer->num_rtx_retry);
2951   if ((priv->rtx_max_retries != -1
2952           && timer->num_rtx_retry >= priv->rtx_max_retries)
2953       || (timer->rtx_retry + timer->rtx_delay > rtx_retry_period)) {
2954     GST_DEBUG_OBJECT (jitterbuffer, "reschedule as LOST timer");
2955     /* too many retransmission request, we now convert the timer
2956      * to a lost timer, leave the num_rtx_retry as it is for stats */
2957     timer->type = TIMER_TYPE_LOST;
2958     timer->rtx_delay = 0;
2959     timer->rtx_retry = 0;
2960   }
2961   reschedule_timer (jitterbuffer, timer, timer->seqnum,
2962       timer->rtx_base + timer->rtx_retry, timer->rtx_delay, FALSE);
2963
2964   JBUF_UNLOCK (priv);
2965   gst_pad_push_event (priv->sinkpad, event);
2966   JBUF_LOCK (priv);
2967
2968   return FALSE;
2969 }
2970
2971 /* a packet is lost */
2972 static gboolean
2973 do_lost_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
2974     GstClockTime now)
2975 {
2976   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2977   GstClockTime duration, timestamp;
2978   guint seqnum, lost_packets, num_rtx_retry, next_in_seqnum;
2979   gboolean late, head;
2980   GstEvent *event;
2981   RTPJitterBufferItem *item;
2982
2983   seqnum = timer->seqnum;
2984   timestamp = apply_offset (jitterbuffer, timer->timeout);
2985   duration = timer->duration;
2986   if (duration == GST_CLOCK_TIME_NONE && priv->packet_spacing > 0)
2987     duration = priv->packet_spacing;
2988   lost_packets = MAX (timer->num, 1);
2989   late = timer->num > 0;
2990   num_rtx_retry = timer->num_rtx_retry;
2991
2992   /* we had a gap and thus we lost some packets. Create an event for this.  */
2993   if (lost_packets > 1)
2994     GST_DEBUG_OBJECT (jitterbuffer, "Packets #%d -> #%d lost", seqnum,
2995         seqnum + lost_packets - 1);
2996   else
2997     GST_DEBUG_OBJECT (jitterbuffer, "Packet #%d lost", seqnum);
2998
2999   priv->num_late += lost_packets;
3000   priv->num_rtx_failed += num_rtx_retry;
3001
3002   next_in_seqnum = (seqnum + lost_packets) & 0xffff;
3003
3004   /* we now only accept seqnum bigger than this */
3005   if (gst_rtp_buffer_compare_seqnum (priv->next_in_seqnum, next_in_seqnum) > 0)
3006     priv->next_in_seqnum = next_in_seqnum;
3007
3008   /* create paket lost event */
3009   event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM,
3010       gst_structure_new ("GstRTPPacketLost",
3011           "seqnum", G_TYPE_UINT, (guint) seqnum,
3012           "timestamp", G_TYPE_UINT64, timestamp,
3013           "duration", G_TYPE_UINT64, duration,
3014           "late", G_TYPE_BOOLEAN, late,
3015           "retry", G_TYPE_UINT, num_rtx_retry, NULL));
3016
3017   item = alloc_item (event, ITEM_TYPE_LOST, -1, -1, seqnum, lost_packets, -1);
3018   rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL);
3019
3020   /* remove timer now */
3021   remove_timer (jitterbuffer, timer);
3022   if (head)
3023     JBUF_SIGNAL_EVENT (priv);
3024
3025   return TRUE;
3026 }
3027
3028 static gboolean
3029 do_eos_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
3030     GstClockTime now)
3031 {
3032   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
3033
3034   GST_INFO_OBJECT (jitterbuffer, "got the NPT timeout");
3035   remove_timer (jitterbuffer, timer);
3036   if (!priv->eos) {
3037     /* there was no EOS in the buffer, put one in there now */
3038     queue_event (jitterbuffer, gst_event_new_eos ());
3039   }
3040   JBUF_SIGNAL_EVENT (priv);
3041
3042   return TRUE;
3043 }
3044
3045 static gboolean
3046 do_deadline_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
3047     GstClockTime now)
3048 {
3049   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
3050
3051   GST_INFO_OBJECT (jitterbuffer, "got deadline timeout");
3052
3053   /* timer seqnum might have been obsoleted by caps seqnum-base,
3054    * only mess with current ongoing seqnum if still unknown */
3055   if (priv->next_seqnum == -1)
3056     priv->next_seqnum = timer->seqnum;
3057   remove_timer (jitterbuffer, timer);
3058   JBUF_SIGNAL_EVENT (priv);
3059
3060   return TRUE;
3061 }
3062
3063 static gboolean
3064 do_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
3065     GstClockTime now)
3066 {
3067   gboolean removed = FALSE;
3068
3069   switch (timer->type) {
3070     case TIMER_TYPE_EXPECTED:
3071       removed = do_expected_timeout (jitterbuffer, timer, now);
3072       break;
3073     case TIMER_TYPE_LOST:
3074       removed = do_lost_timeout (jitterbuffer, timer, now);
3075       break;
3076     case TIMER_TYPE_DEADLINE:
3077       removed = do_deadline_timeout (jitterbuffer, timer, now);
3078       break;
3079     case TIMER_TYPE_EOS:
3080       removed = do_eos_timeout (jitterbuffer, timer, now);
3081       break;
3082   }
3083   return removed;
3084 }
3085
3086 /* called when we need to wait for the next timeout.
3087  *
3088  * We loop over the array of recorded timeouts and wait for the earliest one.
3089  * When it timed out, do the logic associated with the timer.
3090  *
3091  * If there are no timers, we wait on a gcond until something new happens.
3092  */
3093 static void
3094 wait_next_timeout (GstRtpJitterBuffer * jitterbuffer)
3095 {
3096   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
3097   GstClockTime now = 0;
3098
3099   JBUF_LOCK (priv);
3100   while (priv->timer_running) {
3101     TimerData *timer = NULL;
3102     GstClockTime timer_timeout = -1;
3103     gint i, len;
3104
3105     GST_DEBUG_OBJECT (jitterbuffer, "now %" GST_TIME_FORMAT,
3106         GST_TIME_ARGS (now));
3107
3108     len = priv->timers->len;
3109     for (i = 0; i < len; i++) {
3110       TimerData *test = &g_array_index (priv->timers, TimerData, i);
3111       GstClockTime test_timeout = get_timeout (jitterbuffer, test);
3112       gboolean save_best = FALSE;
3113
3114       GST_DEBUG_OBJECT (jitterbuffer, "%d, %d, %d, %" GST_TIME_FORMAT,
3115           i, test->type, test->seqnum, GST_TIME_ARGS (test_timeout));
3116
3117       /* find the smallest timeout */
3118       if (timer == NULL) {
3119         save_best = TRUE;
3120       } else if (timer_timeout == -1) {
3121         /* we already have an immediate timeout, the new timer must be an
3122          * immediate timer with smaller seqnum to become the best */
3123         if (test_timeout == -1
3124             && (gst_rtp_buffer_compare_seqnum (test->seqnum,
3125                     timer->seqnum) > 0))
3126           save_best = TRUE;
3127       } else if (test_timeout == -1) {
3128         /* first immediate timer */
3129         save_best = TRUE;
3130       } else if (test_timeout < timer_timeout) {
3131         /* earlier timer */
3132         save_best = TRUE;
3133       } else if (test_timeout == timer_timeout
3134           && (gst_rtp_buffer_compare_seqnum (test->seqnum,
3135                   timer->seqnum) > 0)) {
3136         /* same timer, smaller seqnum */
3137         save_best = TRUE;
3138       }
3139       if (save_best) {
3140         GST_DEBUG_OBJECT (jitterbuffer, "new best %d", i);
3141         timer = test;
3142         timer_timeout = test_timeout;
3143       }
3144     }
3145     if (timer && !priv->blocked) {
3146       GstClock *clock;
3147       GstClockTime sync_time;
3148       GstClockID id;
3149       GstClockReturn ret;
3150       GstClockTimeDiff clock_jitter;
3151
3152       if (timer_timeout == -1 || timer_timeout <= now) {
3153         do_timeout (jitterbuffer, timer, now);
3154         /* check here, do_timeout could have released the lock */
3155         if (!priv->timer_running)
3156           break;
3157         continue;
3158       }
3159
3160       GST_OBJECT_LOCK (jitterbuffer);
3161       clock = GST_ELEMENT_CLOCK (jitterbuffer);
3162       if (!clock) {
3163         GST_OBJECT_UNLOCK (jitterbuffer);
3164         /* let's just push if there is no clock */
3165         GST_DEBUG_OBJECT (jitterbuffer, "No clock, timeout right away");
3166         now = timer_timeout;
3167         continue;
3168       }
3169
3170       /* prepare for sync against clock */
3171       sync_time = timer_timeout + GST_ELEMENT_CAST (jitterbuffer)->base_time;
3172       /* add latency of peer to get input time */
3173       sync_time += priv->peer_latency;
3174
3175       GST_DEBUG_OBJECT (jitterbuffer, "sync to timestamp %" GST_TIME_FORMAT
3176           " with sync time %" GST_TIME_FORMAT,
3177           GST_TIME_ARGS (timer_timeout), GST_TIME_ARGS (sync_time));
3178
3179       /* create an entry for the clock */
3180       id = priv->clock_id = gst_clock_new_single_shot_id (clock, sync_time);
3181       priv->timer_timeout = timer_timeout;
3182       priv->timer_seqnum = timer->seqnum;
3183       GST_OBJECT_UNLOCK (jitterbuffer);
3184
3185       /* release the lock so that the other end can push stuff or unlock */
3186       JBUF_UNLOCK (priv);
3187
3188       ret = gst_clock_id_wait (id, &clock_jitter);
3189
3190       JBUF_LOCK (priv);
3191       if (!priv->timer_running) {
3192         gst_clock_id_unref (id);
3193         priv->clock_id = NULL;
3194         break;
3195       }
3196
3197       if (ret != GST_CLOCK_UNSCHEDULED) {
3198         now = timer_timeout + MAX (clock_jitter, 0);
3199         GST_DEBUG_OBJECT (jitterbuffer, "sync done, %d, #%d, %" G_GINT64_FORMAT,
3200             ret, priv->timer_seqnum, clock_jitter);
3201       } else {
3202         GST_DEBUG_OBJECT (jitterbuffer, "sync unscheduled");
3203       }
3204       /* and free the entry */
3205       gst_clock_id_unref (id);
3206       priv->clock_id = NULL;
3207     } else {
3208       /* no timers, wait for activity */
3209       JBUF_WAIT_TIMER (priv);
3210     }
3211   }
3212   JBUF_UNLOCK (priv);
3213
3214   GST_DEBUG_OBJECT (jitterbuffer, "we are stopping");
3215   return;
3216 }
3217
3218 /*
3219  * This funcion implements the main pushing loop on the source pad.
3220  *
3221  * It first tries to push as many buffers as possible. If there is a seqnum
3222  * mismatch, we wait for the next timeouts.
3223  */
3224 static void
3225 gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer)
3226 {
3227   GstRtpJitterBufferPrivate *priv;
3228   GstFlowReturn result = GST_FLOW_OK;
3229
3230   priv = jitterbuffer->priv;
3231
3232   JBUF_LOCK_CHECK (priv, flushing);
3233   do {
3234     result = handle_next_buffer (jitterbuffer);
3235     if (G_LIKELY (result == GST_FLOW_WAIT)) {
3236       /* now wait for the next event */
3237       JBUF_WAIT_EVENT (priv, flushing);
3238       result = GST_FLOW_OK;
3239     }
3240   }
3241   while (result == GST_FLOW_OK);
3242   /* store result for upstream */
3243   priv->srcresult = result;
3244   /* if we get here we need to pause */
3245   goto pause;
3246
3247   /* ERRORS */
3248 flushing:
3249   {
3250     result = priv->srcresult;
3251     goto pause;
3252   }
3253 pause:
3254   {
3255     GstEvent *event;
3256
3257     JBUF_SIGNAL_QUERY (priv, FALSE);
3258     JBUF_UNLOCK (priv);
3259
3260     GST_DEBUG_OBJECT (jitterbuffer, "pausing task, reason %s",
3261         gst_flow_get_name (result));
3262     gst_pad_pause_task (priv->srcpad);
3263     if (result == GST_FLOW_EOS) {
3264       event = gst_event_new_eos ();
3265       gst_pad_push_event (priv->srcpad, event);
3266     }
3267     return;
3268   }
3269 }
3270
3271 /* collect the info from the lastest RTCP packet and the jitterbuffer sync, do
3272  * some sanity checks and then emit the handle-sync signal with the parameters.
3273  * This function must be called with the LOCK */
3274 static void
3275 do_handle_sync (GstRtpJitterBuffer * jitterbuffer)
3276 {
3277   GstRtpJitterBufferPrivate *priv;
3278   guint64 base_rtptime, base_time;
3279   guint32 clock_rate;
3280   guint64 last_rtptime;
3281   guint64 clock_base;
3282   guint64 ext_rtptime, diff;
3283   gboolean valid = TRUE, keep = FALSE;
3284
3285   priv = jitterbuffer->priv;
3286
3287   /* get the last values from the jitterbuffer */
3288   rtp_jitter_buffer_get_sync (priv->jbuf, &base_rtptime, &base_time,
3289       &clock_rate, &last_rtptime);
3290
3291   clock_base = priv->clock_base;
3292   ext_rtptime = priv->ext_rtptime;
3293
3294   GST_DEBUG_OBJECT (jitterbuffer, "ext SR %" G_GUINT64_FORMAT ", base %"
3295       G_GUINT64_FORMAT ", clock-rate %" G_GUINT32_FORMAT
3296       ", clock-base %" G_GUINT64_FORMAT ", last-rtptime %" G_GUINT64_FORMAT,
3297       ext_rtptime, base_rtptime, clock_rate, clock_base, last_rtptime);
3298
3299   if (base_rtptime == -1 || clock_rate == -1 || base_time == -1) {
3300     /* we keep this SR packet for later. When we get a valid RTP packet the
3301      * above values will be set and we can try to use the SR packet */
3302     GST_DEBUG_OBJECT (jitterbuffer, "keeping for later, no RTP values");
3303     keep = TRUE;
3304   } else {
3305     /* we can't accept anything that happened before we did the last resync */
3306     if (base_rtptime > ext_rtptime) {
3307       GST_DEBUG_OBJECT (jitterbuffer, "dropping, older than base time");
3308       valid = FALSE;
3309     } else {
3310       /* the SR RTP timestamp must be something close to what we last observed
3311        * in the jitterbuffer */
3312       if (ext_rtptime > last_rtptime) {
3313         /* check how far ahead it is to our RTP timestamps */
3314         diff = ext_rtptime - last_rtptime;
3315         /* if bigger than 1 second, we drop it */
3316         if (diff > clock_rate) {
3317           GST_DEBUG_OBJECT (jitterbuffer, "too far ahead");
3318           /* should drop this, but some RTSP servers end up with bogus
3319            * way too ahead RTCP packet when repeated PAUSE/PLAY,
3320            * so still trigger rptbin sync but invalidate RTCP data
3321            * (sync might use other methods) */
3322           ext_rtptime = -1;
3323         }
3324         GST_DEBUG_OBJECT (jitterbuffer, "ext last %" G_GUINT64_FORMAT ", diff %"
3325             G_GUINT64_FORMAT, last_rtptime, diff);
3326       }
3327     }
3328   }
3329
3330   if (keep) {
3331     GST_DEBUG_OBJECT (jitterbuffer, "keeping RTCP packet for later");
3332   } else if (valid) {
3333     GstStructure *s;
3334
3335     s = gst_structure_new ("application/x-rtp-sync",
3336         "base-rtptime", G_TYPE_UINT64, base_rtptime,
3337         "base-time", G_TYPE_UINT64, base_time,
3338         "clock-rate", G_TYPE_UINT, clock_rate,
3339         "clock-base", G_TYPE_UINT64, clock_base,
3340         "sr-ext-rtptime", G_TYPE_UINT64, ext_rtptime,
3341         "sr-buffer", GST_TYPE_BUFFER, priv->last_sr, NULL);
3342
3343     GST_DEBUG_OBJECT (jitterbuffer, "signaling sync");
3344     gst_buffer_replace (&priv->last_sr, NULL);
3345     JBUF_UNLOCK (priv);
3346     g_signal_emit (jitterbuffer,
3347         gst_rtp_jitter_buffer_signals[SIGNAL_HANDLE_SYNC], 0, s);
3348     JBUF_LOCK (priv);
3349     gst_structure_free (s);
3350   } else {
3351     GST_DEBUG_OBJECT (jitterbuffer, "dropping RTCP packet");
3352     gst_buffer_replace (&priv->last_sr, NULL);
3353   }
3354 }
3355
3356 static GstFlowReturn
3357 gst_rtp_jitter_buffer_chain_rtcp (GstPad * pad, GstObject * parent,
3358     GstBuffer * buffer)
3359 {
3360   GstRtpJitterBuffer *jitterbuffer;
3361   GstRtpJitterBufferPrivate *priv;
3362   GstFlowReturn ret = GST_FLOW_OK;
3363   guint32 ssrc;
3364   GstRTCPPacket packet;
3365   guint64 ext_rtptime;
3366   guint32 rtptime;
3367   GstRTCPBuffer rtcp = { NULL, };
3368
3369   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
3370
3371   if (G_UNLIKELY (!gst_rtcp_buffer_validate (buffer)))
3372     goto invalid_buffer;
3373
3374   priv = jitterbuffer->priv;
3375
3376   gst_rtcp_buffer_map (buffer, GST_MAP_READ, &rtcp);
3377
3378   if (!gst_rtcp_buffer_get_first_packet (&rtcp, &packet))
3379     goto empty_buffer;
3380
3381   /* first packet must be SR or RR or else the validate would have failed */
3382   switch (gst_rtcp_packet_get_type (&packet)) {
3383     case GST_RTCP_TYPE_SR:
3384       gst_rtcp_packet_sr_get_sender_info (&packet, &ssrc, NULL, &rtptime,
3385           NULL, NULL);
3386       break;
3387     default:
3388       goto ignore_buffer;
3389   }
3390   gst_rtcp_buffer_unmap (&rtcp);
3391
3392   GST_DEBUG_OBJECT (jitterbuffer, "received RTCP of SSRC %08x", ssrc);
3393
3394   JBUF_LOCK (priv);
3395   /* convert the RTP timestamp to our extended timestamp, using the same offset
3396    * we used in the jitterbuffer */
3397   ext_rtptime = priv->jbuf->ext_rtptime;
3398   ext_rtptime = gst_rtp_buffer_ext_timestamp (&ext_rtptime, rtptime);
3399
3400   priv->ext_rtptime = ext_rtptime;
3401   gst_buffer_replace (&priv->last_sr, buffer);
3402
3403   do_handle_sync (jitterbuffer);
3404   JBUF_UNLOCK (priv);
3405
3406 done:
3407   gst_buffer_unref (buffer);
3408
3409   return ret;
3410
3411 invalid_buffer:
3412   {
3413     /* this is not fatal but should be filtered earlier */
3414     GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
3415         ("Received invalid RTCP payload, dropping"));
3416     ret = GST_FLOW_OK;
3417     goto done;
3418   }
3419 empty_buffer:
3420   {
3421     /* this is not fatal but should be filtered earlier */
3422     GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
3423         ("Received empty RTCP payload, dropping"));
3424     gst_rtcp_buffer_unmap (&rtcp);
3425     ret = GST_FLOW_OK;
3426     goto done;
3427   }
3428 ignore_buffer:
3429   {
3430     GST_DEBUG_OBJECT (jitterbuffer, "ignoring RTCP packet");
3431     gst_rtcp_buffer_unmap (&rtcp);
3432     ret = GST_FLOW_OK;
3433     goto done;
3434   }
3435 }
3436
3437 static gboolean
3438 gst_rtp_jitter_buffer_sink_query (GstPad * pad, GstObject * parent,
3439     GstQuery * query)
3440 {
3441   gboolean res = FALSE;
3442   GstRtpJitterBuffer *jitterbuffer;
3443   GstRtpJitterBufferPrivate *priv;
3444
3445   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
3446   priv = jitterbuffer->priv;
3447
3448   switch (GST_QUERY_TYPE (query)) {
3449     case GST_QUERY_CAPS:
3450     {
3451       GstCaps *filter, *caps;
3452
3453       gst_query_parse_caps (query, &filter);
3454       caps = gst_rtp_jitter_buffer_getcaps (pad, filter);
3455       gst_query_set_caps_result (query, caps);
3456       gst_caps_unref (caps);
3457       res = TRUE;
3458       break;
3459     }
3460     default:
3461       if (GST_QUERY_IS_SERIALIZED (query)) {
3462         RTPJitterBufferItem *item;
3463         gboolean head;
3464
3465         JBUF_LOCK_CHECK (priv, out_flushing);
3466         if (rtp_jitter_buffer_get_mode (priv->jbuf) !=
3467             RTP_JITTER_BUFFER_MODE_BUFFER) {
3468           GST_DEBUG_OBJECT (jitterbuffer, "adding serialized query");
3469           item = alloc_item (query, ITEM_TYPE_QUERY, -1, -1, -1, 0, -1);
3470           rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL);
3471           if (head)
3472             JBUF_SIGNAL_EVENT (priv);
3473           JBUF_WAIT_QUERY (priv, out_flushing);
3474           res = priv->last_query;
3475         } else {
3476           GST_DEBUG_OBJECT (jitterbuffer, "refusing query, we are buffering");
3477           res = FALSE;
3478         }
3479         JBUF_UNLOCK (priv);
3480       } else {
3481         res = gst_pad_query_default (pad, parent, query);
3482       }
3483       break;
3484   }
3485   return res;
3486   /* ERRORS */
3487 out_flushing:
3488   {
3489     GST_DEBUG_OBJECT (jitterbuffer, "we are flushing");
3490     JBUF_UNLOCK (priv);
3491     return FALSE;
3492   }
3493
3494 }
3495
3496 static gboolean
3497 gst_rtp_jitter_buffer_src_query (GstPad * pad, GstObject * parent,
3498     GstQuery * query)
3499 {
3500   GstRtpJitterBuffer *jitterbuffer;
3501   GstRtpJitterBufferPrivate *priv;
3502   gboolean res = FALSE;
3503
3504   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
3505   priv = jitterbuffer->priv;
3506
3507   switch (GST_QUERY_TYPE (query)) {
3508     case GST_QUERY_LATENCY:
3509     {
3510       /* We need to send the query upstream and add the returned latency to our
3511        * own */
3512       GstClockTime min_latency, max_latency;
3513       gboolean us_live;
3514       GstClockTime our_latency;
3515
3516       if ((res = gst_pad_peer_query (priv->sinkpad, query))) {
3517         gst_query_parse_latency (query, &us_live, &min_latency, &max_latency);
3518
3519         GST_DEBUG_OBJECT (jitterbuffer, "Peer latency: min %"
3520             GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
3521             GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
3522
3523         /* store this so that we can safely sync on the peer buffers. */
3524         JBUF_LOCK (priv);
3525         priv->peer_latency = min_latency;
3526         our_latency = priv->latency_ns;
3527         JBUF_UNLOCK (priv);
3528
3529         GST_DEBUG_OBJECT (jitterbuffer, "Our latency: %" GST_TIME_FORMAT,
3530             GST_TIME_ARGS (our_latency));
3531
3532         /* we add some latency but can buffer an infinite amount of time */
3533         min_latency += our_latency;
3534         max_latency = -1;
3535
3536         GST_DEBUG_OBJECT (jitterbuffer, "Calculated total latency : min %"
3537             GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
3538             GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
3539
3540         gst_query_set_latency (query, TRUE, min_latency, max_latency);
3541       }
3542       break;
3543     }
3544     case GST_QUERY_POSITION:
3545     {
3546       GstClockTime start, last_out;
3547       GstFormat fmt;
3548
3549       gst_query_parse_position (query, &fmt, NULL);
3550       if (fmt != GST_FORMAT_TIME) {
3551         res = gst_pad_query_default (pad, parent, query);
3552         break;
3553       }
3554
3555       JBUF_LOCK (priv);
3556       start = priv->npt_start;
3557       last_out = priv->last_out_time;
3558       JBUF_UNLOCK (priv);
3559
3560       GST_DEBUG_OBJECT (jitterbuffer, "npt start %" GST_TIME_FORMAT
3561           ", last out %" GST_TIME_FORMAT, GST_TIME_ARGS (start),
3562           GST_TIME_ARGS (last_out));
3563
3564       if (GST_CLOCK_TIME_IS_VALID (start) && GST_CLOCK_TIME_IS_VALID (last_out)) {
3565         /* bring 0-based outgoing time to stream time */
3566         gst_query_set_position (query, GST_FORMAT_TIME, start + last_out);
3567         res = TRUE;
3568       } else {
3569         res = gst_pad_query_default (pad, parent, query);
3570       }
3571       break;
3572     }
3573     case GST_QUERY_CAPS:
3574     {
3575       GstCaps *filter, *caps;
3576
3577       gst_query_parse_caps (query, &filter);
3578       caps = gst_rtp_jitter_buffer_getcaps (pad, filter);
3579       gst_query_set_caps_result (query, caps);
3580       gst_caps_unref (caps);
3581       res = TRUE;
3582       break;
3583     }
3584     default:
3585       res = gst_pad_query_default (pad, parent, query);
3586       break;
3587   }
3588
3589   return res;
3590 }
3591
3592 static void
3593 gst_rtp_jitter_buffer_set_property (GObject * object,
3594     guint prop_id, const GValue * value, GParamSpec * pspec)
3595 {
3596   GstRtpJitterBuffer *jitterbuffer;
3597   GstRtpJitterBufferPrivate *priv;
3598
3599   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
3600   priv = jitterbuffer->priv;
3601
3602   switch (prop_id) {
3603     case PROP_LATENCY:
3604     {
3605       guint new_latency, old_latency;
3606
3607       new_latency = g_value_get_uint (value);
3608
3609       JBUF_LOCK (priv);
3610       old_latency = priv->latency_ms;
3611       priv->latency_ms = new_latency;
3612       priv->latency_ns = priv->latency_ms * GST_MSECOND;
3613       rtp_jitter_buffer_set_delay (priv->jbuf, priv->latency_ns);
3614       JBUF_UNLOCK (priv);
3615
3616       /* post message if latency changed, this will inform the parent pipeline
3617        * that a latency reconfiguration is possible/needed. */
3618       if (new_latency != old_latency) {
3619         GST_DEBUG_OBJECT (jitterbuffer, "latency changed to: %" GST_TIME_FORMAT,
3620             GST_TIME_ARGS (new_latency * GST_MSECOND));
3621
3622         gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer),
3623             gst_message_new_latency (GST_OBJECT_CAST (jitterbuffer)));
3624       }
3625       break;
3626     }
3627     case PROP_DROP_ON_LATENCY:
3628       JBUF_LOCK (priv);
3629       priv->drop_on_latency = g_value_get_boolean (value);
3630       JBUF_UNLOCK (priv);
3631       break;
3632     case PROP_TS_OFFSET:
3633       JBUF_LOCK (priv);
3634       priv->ts_offset = g_value_get_int64 (value);
3635       priv->ts_discont = TRUE;
3636       JBUF_UNLOCK (priv);
3637       break;
3638     case PROP_DO_LOST:
3639       JBUF_LOCK (priv);
3640       priv->do_lost = g_value_get_boolean (value);
3641       JBUF_UNLOCK (priv);
3642       break;
3643     case PROP_MODE:
3644       JBUF_LOCK (priv);
3645       rtp_jitter_buffer_set_mode (priv->jbuf, g_value_get_enum (value));
3646       JBUF_UNLOCK (priv);
3647       break;
3648     case PROP_DO_RETRANSMISSION:
3649       JBUF_LOCK (priv);
3650       priv->do_retransmission = g_value_get_boolean (value);
3651       JBUF_UNLOCK (priv);
3652       break;
3653     case PROP_RTX_NEXT_SEQNUM:
3654       JBUF_LOCK (priv);
3655       priv->rtx_next_seqnum = g_value_get_boolean (value);
3656       JBUF_UNLOCK (priv);
3657       break;
3658     case PROP_RTX_DELAY:
3659       JBUF_LOCK (priv);
3660       priv->rtx_delay = g_value_get_int (value);
3661       JBUF_UNLOCK (priv);
3662       break;
3663     case PROP_RTX_MIN_DELAY:
3664       JBUF_LOCK (priv);
3665       priv->rtx_min_delay = g_value_get_uint (value);
3666       JBUF_UNLOCK (priv);
3667       break;
3668     case PROP_RTX_DELAY_REORDER:
3669       JBUF_LOCK (priv);
3670       priv->rtx_delay_reorder = g_value_get_int (value);
3671       JBUF_UNLOCK (priv);
3672       break;
3673     case PROP_RTX_RETRY_TIMEOUT:
3674       JBUF_LOCK (priv);
3675       priv->rtx_retry_timeout = g_value_get_int (value);
3676       JBUF_UNLOCK (priv);
3677       break;
3678     case PROP_RTX_MIN_RETRY_TIMEOUT:
3679       JBUF_LOCK (priv);
3680       priv->rtx_min_retry_timeout = g_value_get_int (value);
3681       JBUF_UNLOCK (priv);
3682       break;
3683     case PROP_RTX_RETRY_PERIOD:
3684       JBUF_LOCK (priv);
3685       priv->rtx_retry_period = g_value_get_int (value);
3686       JBUF_UNLOCK (priv);
3687       break;
3688     case PROP_RTX_MAX_RETRIES:
3689       JBUF_LOCK (priv);
3690       priv->rtx_max_retries = g_value_get_int (value);
3691       JBUF_UNLOCK (priv);
3692       break;
3693     default:
3694       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
3695       break;
3696   }
3697 }
3698
3699 static void
3700 gst_rtp_jitter_buffer_get_property (GObject * object,
3701     guint prop_id, GValue * value, GParamSpec * pspec)
3702 {
3703   GstRtpJitterBuffer *jitterbuffer;
3704   GstRtpJitterBufferPrivate *priv;
3705
3706   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
3707   priv = jitterbuffer->priv;
3708
3709   switch (prop_id) {
3710     case PROP_LATENCY:
3711       JBUF_LOCK (priv);
3712       g_value_set_uint (value, priv->latency_ms);
3713       JBUF_UNLOCK (priv);
3714       break;
3715     case PROP_DROP_ON_LATENCY:
3716       JBUF_LOCK (priv);
3717       g_value_set_boolean (value, priv->drop_on_latency);
3718       JBUF_UNLOCK (priv);
3719       break;
3720     case PROP_TS_OFFSET:
3721       JBUF_LOCK (priv);
3722       g_value_set_int64 (value, priv->ts_offset);
3723       JBUF_UNLOCK (priv);
3724       break;
3725     case PROP_DO_LOST:
3726       JBUF_LOCK (priv);
3727       g_value_set_boolean (value, priv->do_lost);
3728       JBUF_UNLOCK (priv);
3729       break;
3730     case PROP_MODE:
3731       JBUF_LOCK (priv);
3732       g_value_set_enum (value, rtp_jitter_buffer_get_mode (priv->jbuf));
3733       JBUF_UNLOCK (priv);
3734       break;
3735     case PROP_PERCENT:
3736     {
3737       gint percent;
3738
3739       JBUF_LOCK (priv);
3740       if (priv->srcresult != GST_FLOW_OK)
3741         percent = 100;
3742       else
3743         percent = rtp_jitter_buffer_get_percent (priv->jbuf);
3744
3745       g_value_set_int (value, percent);
3746       JBUF_UNLOCK (priv);
3747       break;
3748     }
3749     case PROP_DO_RETRANSMISSION:
3750       JBUF_LOCK (priv);
3751       g_value_set_boolean (value, priv->do_retransmission);
3752       JBUF_UNLOCK (priv);
3753       break;
3754     case PROP_RTX_NEXT_SEQNUM:
3755       JBUF_LOCK (priv);
3756       g_value_set_boolean (value, priv->rtx_next_seqnum);
3757       JBUF_UNLOCK (priv);
3758       break;
3759     case PROP_RTX_DELAY:
3760       JBUF_LOCK (priv);
3761       g_value_set_int (value, priv->rtx_delay);
3762       JBUF_UNLOCK (priv);
3763       break;
3764     case PROP_RTX_MIN_DELAY:
3765       JBUF_LOCK (priv);
3766       g_value_set_uint (value, priv->rtx_min_delay);
3767       JBUF_UNLOCK (priv);
3768       break;
3769     case PROP_RTX_DELAY_REORDER:
3770       JBUF_LOCK (priv);
3771       g_value_set_int (value, priv->rtx_delay_reorder);
3772       JBUF_UNLOCK (priv);
3773       break;
3774     case PROP_RTX_RETRY_TIMEOUT:
3775       JBUF_LOCK (priv);
3776       g_value_set_int (value, priv->rtx_retry_timeout);
3777       JBUF_UNLOCK (priv);
3778       break;
3779     case PROP_RTX_MIN_RETRY_TIMEOUT:
3780       JBUF_LOCK (priv);
3781       g_value_set_int (value, priv->rtx_min_retry_timeout);
3782       JBUF_UNLOCK (priv);
3783       break;
3784     case PROP_RTX_RETRY_PERIOD:
3785       JBUF_LOCK (priv);
3786       g_value_set_int (value, priv->rtx_retry_period);
3787       JBUF_UNLOCK (priv);
3788       break;
3789     case PROP_RTX_MAX_RETRIES:
3790       JBUF_LOCK (priv);
3791       g_value_set_int (value, priv->rtx_max_retries);
3792       JBUF_UNLOCK (priv);
3793       break;
3794     case PROP_STATS:
3795       g_value_take_boxed (value,
3796           gst_rtp_jitter_buffer_create_stats (jitterbuffer));
3797       break;
3798     default:
3799       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
3800       break;
3801   }
3802 }
3803
3804 static GstStructure *
3805 gst_rtp_jitter_buffer_create_stats (GstRtpJitterBuffer * jbuf)
3806 {
3807   GstStructure *s;
3808
3809   JBUF_LOCK (jbuf->priv);
3810   s = gst_structure_new ("application/x-rtp-jitterbuffer-stats",
3811       "rtx-count", G_TYPE_UINT64, jbuf->priv->num_rtx_requests,
3812       "rtx-success-count", G_TYPE_UINT64, jbuf->priv->num_rtx_success,
3813       "rtx-per-packet", G_TYPE_DOUBLE, jbuf->priv->avg_rtx_num,
3814       "rtx-rtt", G_TYPE_UINT64, jbuf->priv->avg_rtx_rtt, NULL);
3815   JBUF_UNLOCK (jbuf->priv);
3816
3817   return s;
3818 }