rtpjitterbuffer: Take a running average of the packet spacings instead of just the...
[platform/upstream/gst-plugins-good.git] / gst / rtpmanager / gstrtpjitterbuffer.c
1 /*
2  * Farsight Voice+Video library
3  *
4  *  Copyright 2007 Collabora Ltd,
5  *  Copyright 2007 Nokia Corporation
6  *   @author: Philippe Kalaf <philippe.kalaf@collabora.co.uk>.
7  *  Copyright 2007 Wim Taymans <wim.taymans@gmail.com>
8  *
9  * This library is free software; you can redistribute it and/or
10  * modify it under the terms of the GNU Library General Public
11  * License as published by the Free Software Foundation; either
12  * version 2 of the License, or (at your option) any later version.
13  *
14  * This library is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17  * Library General Public License for more details.
18  *
19  * You should have received a copy of the GNU Library General Public
20  * License along with this library; if not, write to the
21  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
22  * Boston, MA 02110-1301, USA.
23  *
24  */
25
26 /**
27  * SECTION:element-rtpjitterbuffer
28  *
29  * This element reorders and removes duplicate RTP packets as they are received
30  * from a network source.
31  *
32  * The element needs the clock-rate of the RTP payload in order to estimate the
33  * delay. This information is obtained either from the caps on the sink pad or,
34  * when no caps are present, from the #GstRtpJitterBuffer::request-pt-map signal.
35  * To clear the previous pt-map use the #GstRtpJitterBuffer::clear-pt-map signal.
36  *
37  * The rtpjitterbuffer will wait for missing packets up to a configurable time
38  * limit using the #GstRtpJitterBuffer:latency property. Packets arriving too
39  * late are considered to be lost packets. If the #GstRtpJitterBuffer:do-lost
40  * property is set, lost packets will result in a custom serialized downstream
41  * event of name GstRTPPacketLost. The lost packet events are usually used by a
42  * depayloader or other element to create concealment data or some other logic
43  * to gracefully handle the missing packets.
44  *
45  * The jitterbuffer will use the DTS (or PTS if no DTS is set) of the incomming
46  * buffer and the rtptime inside the RTP packet to create a PTS on the outgoing
47  * buffer.
48  *
49  * The jitterbuffer can also be configured to send early retransmission events
50  * upstream by setting the #GstRtpJitterBuffer:do-retransmission property. In
51  * this mode, the jitterbuffer tries to estimate when a packet should arrive and
52  * sends a custom upstream event named GstRTPRetransmissionRequest when the
53  * packet is considered late. The initial expected packet arrival time is
54  * calculated as follows:
55  *
56  * - If seqnum N arrived at time T, seqnum N+1 is expected to arrive at
57  *     T + packet-spacing + #GstRtpJitterBuffer:rtx-delay. The packet spacing is
58  *     calculated from the DTS (or PTS is no DTS) of two consecutive RTP
59  *     packets with different rtptime.
60  *
61  * - If seqnum N0 arrived at time T0 and seqnum Nm arrived at time Tm,
62  *     seqnum Ni is expected at time Ti = T0 + i*(Tm - T0)/(Nm - N0). Any
63  *     previously scheduled timeout is overwritten.
64  *
65  * - If seqnum N arrived, all seqnum older than
66  *     N - #GstRtpJitterBuffer:rtx-delay-reorder are considered late
67  *     immediately. This is to request fast feedback for abonormally reorder
68  *     packets before any of the previous timeouts is triggered.
69  *
70  * A late packet triggers the GstRTPRetransmissionRequest custom upstream
71  * event. After the initial timeout expires and the retransmission event is
72  * sent, the timeout is scheduled for
73  * T + #GstRtpJitterBuffer:rtx-retry-timeout. If the missing packet did not
74  * arrive after #GstRtpJitterBuffer:rtx-retry-timeout, a new
75  * GstRTPRetransmissionRequest is sent upstream and the timeout is rescheduled
76  * again for T + #GstRtpJitterBuffer:rtx-retry-timeout. This repeats until
77  * #GstRtpJitterBuffer:rtx-retry-period elapsed, at which point no further
78  * retransmission requests are sent and the regular logic is performed to
79  * schedule a lost packet as discussed above.
80  *
81  * This element acts as a live element and so adds #GstRtpJitterBuffer:latency
82  * to the pipeline.
83  *
84  * This element will automatically be used inside rtpbin.
85  *
86  * <refsect2>
87  * <title>Example pipelines</title>
88  * |[
89  * gst-launch-1.0 rtspsrc location=rtsp://192.168.1.133:8554/mpeg1or2AudioVideoTest ! rtpjitterbuffer ! rtpmpvdepay ! mpeg2dec ! xvimagesink
90  * ]| Connect to a streaming server and decode the MPEG video. The jitterbuffer is
91  * inserted into the pipeline to smooth out network jitter and to reorder the
92  * out-of-order RTP packets.
93  * </refsect2>
94  */
95
96 #ifdef HAVE_CONFIG_H
97 #include "config.h"
98 #endif
99
100 #include <stdlib.h>
101 #include <string.h>
102 #include <gst/rtp/gstrtpbuffer.h>
103
104 #include "gstrtpjitterbuffer.h"
105 #include "rtpjitterbuffer.h"
106 #include "rtpstats.h"
107
108 #include <gst/glib-compat-private.h>
109
110 GST_DEBUG_CATEGORY (rtpjitterbuffer_debug);
111 #define GST_CAT_DEFAULT (rtpjitterbuffer_debug)
112
113 /* RTPJitterBuffer signals and args */
114 enum
115 {
116   SIGNAL_REQUEST_PT_MAP,
117   SIGNAL_CLEAR_PT_MAP,
118   SIGNAL_HANDLE_SYNC,
119   SIGNAL_ON_NPT_STOP,
120   SIGNAL_SET_ACTIVE,
121   LAST_SIGNAL
122 };
123
124 #define DEFAULT_LATENCY_MS          200
125 #define DEFAULT_DROP_ON_LATENCY     FALSE
126 #define DEFAULT_TS_OFFSET           0
127 #define DEFAULT_DO_LOST             FALSE
128 #define DEFAULT_MODE                RTP_JITTER_BUFFER_MODE_SLAVE
129 #define DEFAULT_PERCENT             0
130 #define DEFAULT_DO_RETRANSMISSION   FALSE
131 #define DEFAULT_RTX_NEXT_SEQNUM     TRUE
132 #define DEFAULT_RTX_DELAY           -1
133 #define DEFAULT_RTX_MIN_DELAY       0
134 #define DEFAULT_RTX_DELAY_REORDER   3
135 #define DEFAULT_RTX_RETRY_TIMEOUT   -1
136 #define DEFAULT_RTX_MIN_RETRY_TIMEOUT   -1
137 #define DEFAULT_RTX_RETRY_PERIOD    -1
138 #define DEFAULT_RTX_MAX_RETRIES    -1
139
140 #define DEFAULT_AUTO_RTX_DELAY (20 * GST_MSECOND)
141 #define DEFAULT_AUTO_RTX_TIMEOUT (40 * GST_MSECOND)
142
143 enum
144 {
145   PROP_0,
146   PROP_LATENCY,
147   PROP_DROP_ON_LATENCY,
148   PROP_TS_OFFSET,
149   PROP_DO_LOST,
150   PROP_MODE,
151   PROP_PERCENT,
152   PROP_DO_RETRANSMISSION,
153   PROP_RTX_NEXT_SEQNUM,
154   PROP_RTX_DELAY,
155   PROP_RTX_MIN_DELAY,
156   PROP_RTX_DELAY_REORDER,
157   PROP_RTX_RETRY_TIMEOUT,
158   PROP_RTX_MIN_RETRY_TIMEOUT,
159   PROP_RTX_RETRY_PERIOD,
160   PROP_RTX_MAX_RETRIES,
161   PROP_STATS,
162   PROP_LAST
163 };
164
165 #define JBUF_LOCK(priv)   (g_mutex_lock (&(priv)->jbuf_lock))
166
167 #define JBUF_LOCK_CHECK(priv,label) G_STMT_START {    \
168   JBUF_LOCK (priv);                                   \
169   if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK))    \
170     goto label;                                       \
171 } G_STMT_END
172 #define JBUF_UNLOCK(priv) (g_mutex_unlock (&(priv)->jbuf_lock))
173
174 #define JBUF_WAIT_TIMER(priv)   G_STMT_START {            \
175   GST_DEBUG ("waiting timer");                            \
176   (priv)->waiting_timer = TRUE;                           \
177   g_cond_wait (&(priv)->jbuf_timer, &(priv)->jbuf_lock);  \
178   (priv)->waiting_timer = FALSE;                          \
179   GST_DEBUG ("waiting timer done");                       \
180 } G_STMT_END
181 #define JBUF_SIGNAL_TIMER(priv) G_STMT_START {            \
182   if (G_UNLIKELY ((priv)->waiting_timer)) {               \
183     GST_DEBUG ("signal timer");                           \
184     g_cond_signal (&(priv)->jbuf_timer);                  \
185   }                                                       \
186 } G_STMT_END
187
188 #define JBUF_WAIT_EVENT(priv,label) G_STMT_START {       \
189   GST_DEBUG ("waiting event");                           \
190   (priv)->waiting_event = TRUE;                          \
191   g_cond_wait (&(priv)->jbuf_event, &(priv)->jbuf_lock); \
192   (priv)->waiting_event = FALSE;                         \
193   GST_DEBUG ("waiting event done");                      \
194   if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK))       \
195     goto label;                                          \
196 } G_STMT_END
197 #define JBUF_SIGNAL_EVENT(priv) G_STMT_START {           \
198   if (G_UNLIKELY ((priv)->waiting_event)) {              \
199     GST_DEBUG ("signal event");                          \
200     g_cond_signal (&(priv)->jbuf_event);                 \
201   }                                                      \
202 } G_STMT_END
203
204 #define JBUF_WAIT_QUERY(priv,label) G_STMT_START {       \
205   GST_DEBUG ("waiting query");                           \
206   (priv)->waiting_query = TRUE;                          \
207   g_cond_wait (&(priv)->jbuf_query, &(priv)->jbuf_lock); \
208   (priv)->waiting_query = FALSE;                         \
209   GST_DEBUG ("waiting query done");                      \
210   if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK))       \
211     goto label;                                          \
212 } G_STMT_END
213 #define JBUF_SIGNAL_QUERY(priv,res) G_STMT_START {       \
214   (priv)->last_query = res;                              \
215   if (G_UNLIKELY ((priv)->waiting_query)) {              \
216     GST_DEBUG ("signal query");                          \
217     g_cond_signal (&(priv)->jbuf_query);                 \
218   }                                                      \
219 } G_STMT_END
220
221
222 struct _GstRtpJitterBufferPrivate
223 {
224   GstPad *sinkpad, *srcpad;
225   GstPad *rtcpsinkpad;
226
227   RTPJitterBuffer *jbuf;
228   GMutex jbuf_lock;
229   gboolean waiting_timer;
230   GCond jbuf_timer;
231   gboolean waiting_event;
232   GCond jbuf_event;
233   gboolean waiting_query;
234   GCond jbuf_query;
235   gboolean last_query;
236   gboolean discont;
237   gboolean ts_discont;
238   gboolean active;
239   guint64 out_offset;
240
241   gboolean timer_running;
242   GThread *timer_thread;
243
244   /* properties */
245   guint latency_ms;
246   guint64 latency_ns;
247   gboolean drop_on_latency;
248   gint64 ts_offset;
249   gboolean do_lost;
250   gboolean do_retransmission;
251   gboolean rtx_next_seqnum;
252   gint rtx_delay;
253   guint rtx_min_delay;
254   gint rtx_delay_reorder;
255   gint rtx_retry_timeout;
256   gint rtx_min_retry_timeout;
257   gint rtx_retry_period;
258   gint rtx_max_retries;
259
260   /* the last seqnum we pushed out */
261   guint32 last_popped_seqnum;
262   /* the next expected seqnum we push */
263   guint32 next_seqnum;
264   /* seqnum-base, if known */
265   guint32 seqnum_base;
266   /* last output time */
267   GstClockTime last_out_time;
268   /* last valid input timestamp and rtptime pair */
269   GstClockTime ips_dts;
270   guint64 ips_rtptime;
271   GstClockTime packet_spacing;
272
273   /* the next expected seqnum we receive */
274   GstClockTime last_in_dts;
275   guint32 last_in_seqnum;
276   guint32 next_in_seqnum;
277
278   GArray *timers;
279
280   /* start and stop ranges */
281   GstClockTime npt_start;
282   GstClockTime npt_stop;
283   guint64 ext_timestamp;
284   guint64 last_elapsed;
285   guint64 estimated_eos;
286   GstClockID eos_id;
287
288   /* state */
289   gboolean eos;
290   guint last_percent;
291
292   /* clock rate and rtp timestamp offset */
293   gint last_pt;
294   gint32 clock_rate;
295   gint64 clock_base;
296   gint64 prev_ts_offset;
297
298   /* when we are shutting down */
299   GstFlowReturn srcresult;
300   gboolean blocked;
301
302   /* for sync */
303   GstSegment segment;
304   GstClockID clock_id;
305   GstClockTime timer_timeout;
306   guint16 timer_seqnum;
307   /* the latency of the upstream peer, we have to take this into account when
308    * synchronizing the buffers. */
309   GstClockTime peer_latency;
310   guint64 ext_rtptime;
311   GstBuffer *last_sr;
312
313   /* some accounting */
314   guint64 num_late;
315   guint64 num_duplicates;
316   guint64 num_rtx_requests;
317   guint64 num_rtx_success;
318   guint64 num_rtx_failed;
319   gdouble avg_rtx_num;
320   guint64 avg_rtx_rtt;
321
322   /* for the jitter */
323   GstClockTime last_dts;
324   guint64 last_rtptime;
325   GstClockTime avg_jitter;
326 };
327
328 typedef enum
329 {
330   TIMER_TYPE_EXPECTED,
331   TIMER_TYPE_LOST,
332   TIMER_TYPE_DEADLINE,
333   TIMER_TYPE_EOS
334 } TimerType;
335
336 typedef struct
337 {
338   guint idx;
339   guint16 seqnum;
340   guint num;
341   TimerType type;
342   GstClockTime timeout;
343   GstClockTime duration;
344   GstClockTime rtx_base;
345   GstClockTime rtx_delay;
346   GstClockTime rtx_retry;
347   GstClockTime rtx_last;
348   guint num_rtx_retry;
349 } TimerData;
350
351 #define GST_RTP_JITTER_BUFFER_GET_PRIVATE(o) \
352   (G_TYPE_INSTANCE_GET_PRIVATE ((o), GST_TYPE_RTP_JITTER_BUFFER, \
353                                 GstRtpJitterBufferPrivate))
354
355 static GstStaticPadTemplate gst_rtp_jitter_buffer_sink_template =
356 GST_STATIC_PAD_TEMPLATE ("sink",
357     GST_PAD_SINK,
358     GST_PAD_ALWAYS,
359     GST_STATIC_CAPS ("application/x-rtp"
360         /* "clock-rate = (int) [ 1, 2147483647 ], "
361          * "payload = (int) , "
362          * "encoding-name = (string) "
363          */ )
364     );
365
366 static GstStaticPadTemplate gst_rtp_jitter_buffer_sink_rtcp_template =
367 GST_STATIC_PAD_TEMPLATE ("sink_rtcp",
368     GST_PAD_SINK,
369     GST_PAD_REQUEST,
370     GST_STATIC_CAPS ("application/x-rtcp")
371     );
372
373 static GstStaticPadTemplate gst_rtp_jitter_buffer_src_template =
374 GST_STATIC_PAD_TEMPLATE ("src",
375     GST_PAD_SRC,
376     GST_PAD_ALWAYS,
377     GST_STATIC_CAPS ("application/x-rtp"
378         /* "payload = (int) , "
379          * "clock-rate = (int) , "
380          * "encoding-name = (string) "
381          */ )
382     );
383
384 static guint gst_rtp_jitter_buffer_signals[LAST_SIGNAL] = { 0 };
385
386 #define gst_rtp_jitter_buffer_parent_class parent_class
387 G_DEFINE_TYPE (GstRtpJitterBuffer, gst_rtp_jitter_buffer, GST_TYPE_ELEMENT);
388
389 /* object overrides */
390 static void gst_rtp_jitter_buffer_set_property (GObject * object,
391     guint prop_id, const GValue * value, GParamSpec * pspec);
392 static void gst_rtp_jitter_buffer_get_property (GObject * object,
393     guint prop_id, GValue * value, GParamSpec * pspec);
394 static void gst_rtp_jitter_buffer_finalize (GObject * object);
395
396 /* element overrides */
397 static GstStateChangeReturn gst_rtp_jitter_buffer_change_state (GstElement
398     * element, GstStateChange transition);
399 static GstPad *gst_rtp_jitter_buffer_request_new_pad (GstElement * element,
400     GstPadTemplate * templ, const gchar * name, const GstCaps * filter);
401 static void gst_rtp_jitter_buffer_release_pad (GstElement * element,
402     GstPad * pad);
403 static GstClock *gst_rtp_jitter_buffer_provide_clock (GstElement * element);
404
405 /* pad overrides */
406 static GstCaps *gst_rtp_jitter_buffer_getcaps (GstPad * pad, GstCaps * filter);
407 static GstIterator *gst_rtp_jitter_buffer_iterate_internal_links (GstPad * pad,
408     GstObject * parent);
409
410 /* sinkpad overrides */
411 static gboolean gst_rtp_jitter_buffer_sink_event (GstPad * pad,
412     GstObject * parent, GstEvent * event);
413 static GstFlowReturn gst_rtp_jitter_buffer_chain (GstPad * pad,
414     GstObject * parent, GstBuffer * buffer);
415
416 static gboolean gst_rtp_jitter_buffer_sink_rtcp_event (GstPad * pad,
417     GstObject * parent, GstEvent * event);
418 static GstFlowReturn gst_rtp_jitter_buffer_chain_rtcp (GstPad * pad,
419     GstObject * parent, GstBuffer * buffer);
420
421 static gboolean gst_rtp_jitter_buffer_sink_query (GstPad * pad,
422     GstObject * parent, GstQuery * query);
423
424 /* srcpad overrides */
425 static gboolean gst_rtp_jitter_buffer_src_event (GstPad * pad,
426     GstObject * parent, GstEvent * event);
427 static gboolean gst_rtp_jitter_buffer_src_activate_mode (GstPad * pad,
428     GstObject * parent, GstPadMode mode, gboolean active);
429 static void gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer);
430 static gboolean gst_rtp_jitter_buffer_src_query (GstPad * pad,
431     GstObject * parent, GstQuery * query);
432
433 static void
434 gst_rtp_jitter_buffer_clear_pt_map (GstRtpJitterBuffer * jitterbuffer);
435 static GstClockTime
436 gst_rtp_jitter_buffer_set_active (GstRtpJitterBuffer * jitterbuffer,
437     gboolean active, guint64 base_time);
438 static void do_handle_sync (GstRtpJitterBuffer * jitterbuffer);
439
440 static void unschedule_current_timer (GstRtpJitterBuffer * jitterbuffer);
441 static void remove_all_timers (GstRtpJitterBuffer * jitterbuffer);
442
443 static void wait_next_timeout (GstRtpJitterBuffer * jitterbuffer);
444
445 static GstStructure *gst_rtp_jitter_buffer_create_stats (GstRtpJitterBuffer *
446     jitterbuffer);
447
448 static void
449 gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass)
450 {
451   GObjectClass *gobject_class;
452   GstElementClass *gstelement_class;
453
454   gobject_class = (GObjectClass *) klass;
455   gstelement_class = (GstElementClass *) klass;
456
457   g_type_class_add_private (klass, sizeof (GstRtpJitterBufferPrivate));
458
459   gobject_class->finalize = gst_rtp_jitter_buffer_finalize;
460
461   gobject_class->set_property = gst_rtp_jitter_buffer_set_property;
462   gobject_class->get_property = gst_rtp_jitter_buffer_get_property;
463
464   /**
465    * GstRtpJitterBuffer:latency:
466    *
467    * The maximum latency of the jitterbuffer. Packets will be kept in the buffer
468    * for at most this time.
469    */
470   g_object_class_install_property (gobject_class, PROP_LATENCY,
471       g_param_spec_uint ("latency", "Buffer latency in ms",
472           "Amount of ms to buffer", 0, G_MAXUINT, DEFAULT_LATENCY_MS,
473           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
474   /**
475    * GstRtpJitterBuffer:drop-on-latency:
476    *
477    * Drop oldest buffers when the queue is completely filled.
478    */
479   g_object_class_install_property (gobject_class, PROP_DROP_ON_LATENCY,
480       g_param_spec_boolean ("drop-on-latency",
481           "Drop buffers when maximum latency is reached",
482           "Tells the jitterbuffer to never exceed the given latency in size",
483           DEFAULT_DROP_ON_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
484   /**
485    * GstRtpJitterBuffer:ts-offset:
486    *
487    * Adjust GStreamer output buffer timestamps in the jitterbuffer with offset.
488    * This is mainly used to ensure interstream synchronisation.
489    */
490   g_object_class_install_property (gobject_class, PROP_TS_OFFSET,
491       g_param_spec_int64 ("ts-offset", "Timestamp Offset",
492           "Adjust buffer timestamps with offset in nanoseconds", G_MININT64,
493           G_MAXINT64, DEFAULT_TS_OFFSET,
494           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
495
496   /**
497    * GstRtpJitterBuffer:do-lost:
498    *
499    * Send out a GstRTPPacketLost event downstream when a packet is considered
500    * lost.
501    */
502   g_object_class_install_property (gobject_class, PROP_DO_LOST,
503       g_param_spec_boolean ("do-lost", "Do Lost",
504           "Send an event downstream when a packet is lost", DEFAULT_DO_LOST,
505           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
506
507   /**
508    * GstRtpJitterBuffer:mode:
509    *
510    * Control the buffering and timestamping mode used by the jitterbuffer.
511    */
512   g_object_class_install_property (gobject_class, PROP_MODE,
513       g_param_spec_enum ("mode", "Mode",
514           "Control the buffering algorithm in use", RTP_TYPE_JITTER_BUFFER_MODE,
515           DEFAULT_MODE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
516   /**
517    * GstRtpJitterBuffer:percent:
518    *
519    * The percent of the jitterbuffer that is filled.
520    */
521   g_object_class_install_property (gobject_class, PROP_PERCENT,
522       g_param_spec_int ("percent", "percent",
523           "The buffer filled percent", 0, 100,
524           0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
525   /**
526    * GstRtpJitterBuffer:do-retransmission:
527    *
528    * Send out a GstRTPRetransmission event upstream when a packet is considered
529    * late and should be retransmitted.
530    *
531    * Since: 1.2
532    */
533   g_object_class_install_property (gobject_class, PROP_DO_RETRANSMISSION,
534       g_param_spec_boolean ("do-retransmission", "Do Retransmission",
535           "Send retransmission events upstream when a packet is late",
536           DEFAULT_DO_RETRANSMISSION,
537           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
538
539   /**
540    * GstRtpJitterBuffer:rtx-next-seqnum
541    *
542    * Estimate when the next packet should arrive and schedule a retransmission
543    * request for it.
544    * This is, when packet N arrives, a GstRTPRetransmission event is schedule
545    * for packet N+1. So it will be requested if it does not arrive at the expected time.
546    * The expected time is calculated using the dts of N and the packet spacing.
547    *
548    * Since: 1.6
549    */
550   g_object_class_install_property (gobject_class, PROP_RTX_NEXT_SEQNUM,
551       g_param_spec_boolean ("rtx-next-seqnum", "RTX next seqnum",
552           "Estimate when the next packet should arrive and schedule a "
553           "retransmission request for it.",
554           DEFAULT_RTX_NEXT_SEQNUM, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
555
556   /**
557    * GstRtpJitterBuffer:rtx-delay:
558    *
559    * When a packet did not arrive at the expected time, wait this extra amount
560    * of time before sending a retransmission event.
561    *
562    * When -1 is used, the max jitter will be used as extra delay.
563    *
564    * Since: 1.2
565    */
566   g_object_class_install_property (gobject_class, PROP_RTX_DELAY,
567       g_param_spec_int ("rtx-delay", "RTX Delay",
568           "Extra time in ms to wait before sending retransmission "
569           "event (-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_DELAY,
570           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
571
572   /**
573    * GstRtpJitterBuffer:rtx-min-delay:
574    *
575    * When a packet did not arrive at the expected time, wait at least this extra amount
576    * of time before sending a retransmission event.
577    *
578    * Since: 1.6
579    */
580   g_object_class_install_property (gobject_class, PROP_RTX_MIN_DELAY,
581       g_param_spec_uint ("rtx-min-delay", "Minimum RTX Delay",
582           "Minimum time in ms to wait before sending retransmission "
583           "event", 0, G_MAXUINT, DEFAULT_RTX_MIN_DELAY,
584           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
585   /**
586    * GstRtpJitterBuffer:rtx-delay-reorder:
587    *
588    * Assume that a retransmission event should be sent when we see
589    * this much packet reordering.
590    *
591    * When -1 is used, the value will be estimated based on observed packet
592    * reordering.
593    *
594    * Since: 1.2
595    */
596   g_object_class_install_property (gobject_class, PROP_RTX_DELAY_REORDER,
597       g_param_spec_int ("rtx-delay-reorder", "RTX Delay Reorder",
598           "Sending retransmission event when this much reordering (-1 automatic)",
599           -1, G_MAXINT, DEFAULT_RTX_DELAY_REORDER,
600           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
601   /**
602    * GstRtpJitterBuffer::rtx-retry-timeout:
603    *
604    * When no packet has been received after sending a retransmission event
605    * for this time, retry sending a retransmission event.
606    *
607    * When -1 is used, the value will be estimated based on observed round
608    * trip time.
609    *
610    * Since: 1.2
611    */
612   g_object_class_install_property (gobject_class, PROP_RTX_RETRY_TIMEOUT,
613       g_param_spec_int ("rtx-retry-timeout", "RTX Retry Timeout",
614           "Retry sending a transmission event after this timeout in "
615           "ms (-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_RETRY_TIMEOUT,
616           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
617   /**
618    * GstRtpJitterBuffer::rtx-min-retry-timeout:
619    *
620    * The minimum amount of time between retry timeouts. When
621    * GstRtpJitterBuffer::rtx-retry-timeout is -1, this value ensures a
622    * minimum interval between retry timeouts.
623    *
624    * When -1 is used, the value will be estimated based on the
625    * packet spacing.
626    *
627    * Since: 1.6
628    */
629   g_object_class_install_property (gobject_class, PROP_RTX_MIN_RETRY_TIMEOUT,
630       g_param_spec_int ("rtx-min-retry-timeout", "RTX Min Retry Timeout",
631           "Minimum timeout between sending a transmission event in "
632           "ms (-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_MIN_RETRY_TIMEOUT,
633           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
634   /**
635    * GstRtpJitterBuffer:rtx-retry-period:
636    *
637    * The amount of time to try to get a retransmission.
638    *
639    * When -1 is used, the value will be estimated based on the jitterbuffer
640    * latency and the observed round trip time.
641    *
642    * Since: 1.2
643    */
644   g_object_class_install_property (gobject_class, PROP_RTX_RETRY_PERIOD,
645       g_param_spec_int ("rtx-retry-period", "RTX Retry Period",
646           "Try to get a retransmission for this many ms "
647           "(-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_RETRY_PERIOD,
648           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
649   /**
650    * GstRtpJitterBuffer:rtx-max-retries:
651    *
652    * The maximum number of retries to request a retransmission.
653    *
654    * This implies that as maximum (rtx-max-retries + 1) retransmissions will be requested.
655    * When -1 is used, the number of retransmission request will not be limited.
656    *
657    * Since: 1.6
658    */
659   g_object_class_install_property (gobject_class, PROP_RTX_MAX_RETRIES,
660       g_param_spec_int ("rtx-max-retries", "RTX Max Retries",
661           "The maximum number of retries to request a retransmission. "
662           "(-1 not limited)", -1, G_MAXINT, DEFAULT_RTX_MAX_RETRIES,
663           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
664   /**
665    * GstRtpJitterBuffer:stats:
666    *
667    * Various jitterbuffer statistics. This property returns a GstStructure
668    * with name application/x-rtp-jitterbuffer-stats with the following fields:
669    *
670    *  "rtx-count"         G_TYPE_UINT64 The number of retransmissions requested
671    *  "rtx-success-count" G_TYPE_UINT64 The number of successful retransmissions
672    *  "rtx-per-packet"    G_TYPE_DOUBLE Average number of RTX per packet
673    *  "rtx-rtt"           G_TYPE_UINT64 Average round trip time per RTX
674    *
675    * Since: 1.4
676    */
677   g_object_class_install_property (gobject_class, PROP_STATS,
678       g_param_spec_boxed ("stats", "Statistics",
679           "Various statistics", GST_TYPE_STRUCTURE,
680           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
681
682   /**
683    * GstRtpJitterBuffer::request-pt-map:
684    * @buffer: the object which received the signal
685    * @pt: the pt
686    *
687    * Request the payload type as #GstCaps for @pt.
688    */
689   gst_rtp_jitter_buffer_signals[SIGNAL_REQUEST_PT_MAP] =
690       g_signal_new ("request-pt-map", G_TYPE_FROM_CLASS (klass),
691       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
692           request_pt_map), NULL, NULL, g_cclosure_marshal_generic,
693       GST_TYPE_CAPS, 1, G_TYPE_UINT);
694   /**
695    * GstRtpJitterBuffer::handle-sync:
696    * @buffer: the object which received the signal
697    * @struct: a GstStructure containing sync values.
698    *
699    * Be notified of new sync values.
700    */
701   gst_rtp_jitter_buffer_signals[SIGNAL_HANDLE_SYNC] =
702       g_signal_new ("handle-sync", G_TYPE_FROM_CLASS (klass),
703       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
704           handle_sync), NULL, NULL, g_cclosure_marshal_VOID__BOXED,
705       G_TYPE_NONE, 1, GST_TYPE_STRUCTURE | G_SIGNAL_TYPE_STATIC_SCOPE);
706
707   /**
708    * GstRtpJitterBuffer::on-npt-stop:
709    * @buffer: the object which received the signal
710    *
711    * Signal that the jitterbufer has pushed the RTP packet that corresponds to
712    * the npt-stop position.
713    */
714   gst_rtp_jitter_buffer_signals[SIGNAL_ON_NPT_STOP] =
715       g_signal_new ("on-npt-stop", G_TYPE_FROM_CLASS (klass),
716       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
717           on_npt_stop), NULL, NULL, g_cclosure_marshal_VOID__VOID,
718       G_TYPE_NONE, 0, G_TYPE_NONE);
719
720   /**
721    * GstRtpJitterBuffer::clear-pt-map:
722    * @buffer: the object which received the signal
723    *
724    * Invalidate the clock-rate as obtained with the
725    * #GstRtpJitterBuffer::request-pt-map signal.
726    */
727   gst_rtp_jitter_buffer_signals[SIGNAL_CLEAR_PT_MAP] =
728       g_signal_new ("clear-pt-map", G_TYPE_FROM_CLASS (klass),
729       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
730       G_STRUCT_OFFSET (GstRtpJitterBufferClass, clear_pt_map), NULL, NULL,
731       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
732
733   /**
734    * GstRtpJitterBuffer::set-active:
735    * @buffer: the object which received the signal
736    *
737    * Start pushing out packets with the given base time. This signal is only
738    * useful in buffering mode.
739    *
740    * Returns: the time of the last pushed packet.
741    */
742   gst_rtp_jitter_buffer_signals[SIGNAL_SET_ACTIVE] =
743       g_signal_new ("set-active", G_TYPE_FROM_CLASS (klass),
744       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
745       G_STRUCT_OFFSET (GstRtpJitterBufferClass, set_active), NULL, NULL,
746       g_cclosure_marshal_generic, G_TYPE_UINT64, 2, G_TYPE_BOOLEAN,
747       G_TYPE_UINT64);
748
749   gstelement_class->change_state =
750       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_change_state);
751   gstelement_class->request_new_pad =
752       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_request_new_pad);
753   gstelement_class->release_pad =
754       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_release_pad);
755   gstelement_class->provide_clock =
756       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_provide_clock);
757
758   gst_element_class_add_pad_template (gstelement_class,
759       gst_static_pad_template_get (&gst_rtp_jitter_buffer_src_template));
760   gst_element_class_add_pad_template (gstelement_class,
761       gst_static_pad_template_get (&gst_rtp_jitter_buffer_sink_template));
762   gst_element_class_add_pad_template (gstelement_class,
763       gst_static_pad_template_get (&gst_rtp_jitter_buffer_sink_rtcp_template));
764
765   gst_element_class_set_static_metadata (gstelement_class,
766       "RTP packet jitter-buffer", "Filter/Network/RTP",
767       "A buffer that deals with network jitter and other transmission faults",
768       "Philippe Kalaf <philippe.kalaf@collabora.co.uk>, "
769       "Wim Taymans <wim.taymans@gmail.com>");
770
771   klass->clear_pt_map = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_clear_pt_map);
772   klass->set_active = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_set_active);
773
774   GST_DEBUG_CATEGORY_INIT
775       (rtpjitterbuffer_debug, "rtpjitterbuffer", 0, "RTP Jitter Buffer");
776 }
777
778 static void
779 gst_rtp_jitter_buffer_init (GstRtpJitterBuffer * jitterbuffer)
780 {
781   GstRtpJitterBufferPrivate *priv;
782
783   priv = GST_RTP_JITTER_BUFFER_GET_PRIVATE (jitterbuffer);
784   jitterbuffer->priv = priv;
785
786   priv->latency_ms = DEFAULT_LATENCY_MS;
787   priv->latency_ns = priv->latency_ms * GST_MSECOND;
788   priv->drop_on_latency = DEFAULT_DROP_ON_LATENCY;
789   priv->do_lost = DEFAULT_DO_LOST;
790   priv->do_retransmission = DEFAULT_DO_RETRANSMISSION;
791   priv->rtx_next_seqnum = DEFAULT_RTX_NEXT_SEQNUM;
792   priv->rtx_delay = DEFAULT_RTX_DELAY;
793   priv->rtx_min_delay = DEFAULT_RTX_MIN_DELAY;
794   priv->rtx_delay_reorder = DEFAULT_RTX_DELAY_REORDER;
795   priv->rtx_retry_timeout = DEFAULT_RTX_RETRY_TIMEOUT;
796   priv->rtx_min_retry_timeout = DEFAULT_RTX_MIN_RETRY_TIMEOUT;
797   priv->rtx_retry_period = DEFAULT_RTX_RETRY_PERIOD;
798   priv->rtx_max_retries = DEFAULT_RTX_MAX_RETRIES;
799
800   priv->last_dts = -1;
801   priv->last_rtptime = -1;
802   priv->avg_jitter = 0;
803   priv->timers = g_array_new (FALSE, TRUE, sizeof (TimerData));
804   priv->jbuf = rtp_jitter_buffer_new ();
805   g_mutex_init (&priv->jbuf_lock);
806   g_cond_init (&priv->jbuf_timer);
807   g_cond_init (&priv->jbuf_event);
808   g_cond_init (&priv->jbuf_query);
809
810   /* reset skew detection initialy */
811   rtp_jitter_buffer_reset_skew (priv->jbuf);
812   rtp_jitter_buffer_set_delay (priv->jbuf, priv->latency_ns);
813   rtp_jitter_buffer_set_buffering (priv->jbuf, FALSE);
814   priv->active = TRUE;
815
816   priv->srcpad =
817       gst_pad_new_from_static_template (&gst_rtp_jitter_buffer_src_template,
818       "src");
819
820   gst_pad_set_activatemode_function (priv->srcpad,
821       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_activate_mode));
822   gst_pad_set_query_function (priv->srcpad,
823       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_query));
824   gst_pad_set_event_function (priv->srcpad,
825       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_event));
826
827   priv->sinkpad =
828       gst_pad_new_from_static_template (&gst_rtp_jitter_buffer_sink_template,
829       "sink");
830
831   gst_pad_set_chain_function (priv->sinkpad,
832       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_chain));
833   gst_pad_set_event_function (priv->sinkpad,
834       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_sink_event));
835   gst_pad_set_query_function (priv->sinkpad,
836       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_sink_query));
837
838   gst_element_add_pad (GST_ELEMENT (jitterbuffer), priv->srcpad);
839   gst_element_add_pad (GST_ELEMENT (jitterbuffer), priv->sinkpad);
840
841   GST_OBJECT_FLAG_SET (jitterbuffer, GST_ELEMENT_FLAG_PROVIDE_CLOCK);
842 }
843
844 #define IS_DROPABLE(it) (((it)->type == ITEM_TYPE_BUFFER) || ((it)->type == ITEM_TYPE_LOST))
845
846 #define ITEM_TYPE_BUFFER        0
847 #define ITEM_TYPE_LOST          1
848 #define ITEM_TYPE_EVENT         2
849 #define ITEM_TYPE_QUERY         3
850
851 static RTPJitterBufferItem *
852 alloc_item (gpointer data, guint type, GstClockTime dts, GstClockTime pts,
853     guint seqnum, guint count, guint rtptime)
854 {
855   RTPJitterBufferItem *item;
856
857   item = g_slice_new (RTPJitterBufferItem);
858   item->data = data;
859   item->next = NULL;
860   item->prev = NULL;
861   item->type = type;
862   item->dts = dts;
863   item->pts = pts;
864   item->seqnum = seqnum;
865   item->count = count;
866   item->rtptime = rtptime;
867
868   return item;
869 }
870
871 static void
872 free_item (RTPJitterBufferItem * item)
873 {
874   if (item->data && item->type != ITEM_TYPE_QUERY)
875     gst_mini_object_unref (item->data);
876   g_slice_free (RTPJitterBufferItem, item);
877 }
878
879 static void
880 free_item_and_retain_events (RTPJitterBufferItem * item, gpointer user_data)
881 {
882   GList **l = user_data;
883
884   if (item->data && item->type == ITEM_TYPE_EVENT
885       && GST_EVENT_IS_STICKY (item->data)) {
886     *l = g_list_prepend (*l, item->data);
887   } else if (item->data && item->type != ITEM_TYPE_QUERY) {
888     gst_mini_object_unref (item->data);
889   }
890   g_slice_free (RTPJitterBufferItem, item);
891 }
892
893 static void
894 gst_rtp_jitter_buffer_finalize (GObject * object)
895 {
896   GstRtpJitterBuffer *jitterbuffer;
897   GstRtpJitterBufferPrivate *priv;
898
899   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
900   priv = jitterbuffer->priv;
901
902   g_array_free (priv->timers, TRUE);
903   g_mutex_clear (&priv->jbuf_lock);
904   g_cond_clear (&priv->jbuf_timer);
905   g_cond_clear (&priv->jbuf_event);
906   g_cond_clear (&priv->jbuf_query);
907
908   rtp_jitter_buffer_flush (priv->jbuf, (GFunc) free_item, NULL);
909   g_object_unref (priv->jbuf);
910
911   G_OBJECT_CLASS (parent_class)->finalize (object);
912 }
913
914 static GstIterator *
915 gst_rtp_jitter_buffer_iterate_internal_links (GstPad * pad, GstObject * parent)
916 {
917   GstRtpJitterBuffer *jitterbuffer;
918   GstPad *otherpad = NULL;
919   GstIterator *it = NULL;
920   GValue val = { 0, };
921
922   jitterbuffer = GST_RTP_JITTER_BUFFER_CAST (parent);
923
924   if (pad == jitterbuffer->priv->sinkpad) {
925     otherpad = jitterbuffer->priv->srcpad;
926   } else if (pad == jitterbuffer->priv->srcpad) {
927     otherpad = jitterbuffer->priv->sinkpad;
928   } else if (pad == jitterbuffer->priv->rtcpsinkpad) {
929     it = gst_iterator_new_single (GST_TYPE_PAD, NULL);
930   }
931
932   if (it == NULL) {
933     g_value_init (&val, GST_TYPE_PAD);
934     g_value_set_object (&val, otherpad);
935     it = gst_iterator_new_single (GST_TYPE_PAD, &val);
936     g_value_unset (&val);
937   }
938
939   return it;
940 }
941
942 static GstPad *
943 create_rtcp_sink (GstRtpJitterBuffer * jitterbuffer)
944 {
945   GstRtpJitterBufferPrivate *priv;
946
947   priv = jitterbuffer->priv;
948
949   GST_DEBUG_OBJECT (jitterbuffer, "creating RTCP sink pad");
950
951   priv->rtcpsinkpad =
952       gst_pad_new_from_static_template
953       (&gst_rtp_jitter_buffer_sink_rtcp_template, "sink_rtcp");
954   gst_pad_set_chain_function (priv->rtcpsinkpad,
955       gst_rtp_jitter_buffer_chain_rtcp);
956   gst_pad_set_event_function (priv->rtcpsinkpad,
957       (GstPadEventFunction) gst_rtp_jitter_buffer_sink_rtcp_event);
958   gst_pad_set_iterate_internal_links_function (priv->rtcpsinkpad,
959       gst_rtp_jitter_buffer_iterate_internal_links);
960   gst_pad_set_active (priv->rtcpsinkpad, TRUE);
961   gst_element_add_pad (GST_ELEMENT_CAST (jitterbuffer), priv->rtcpsinkpad);
962
963   return priv->rtcpsinkpad;
964 }
965
966 static void
967 remove_rtcp_sink (GstRtpJitterBuffer * jitterbuffer)
968 {
969   GstRtpJitterBufferPrivate *priv;
970
971   priv = jitterbuffer->priv;
972
973   GST_DEBUG_OBJECT (jitterbuffer, "removing RTCP sink pad");
974
975   gst_pad_set_active (priv->rtcpsinkpad, FALSE);
976
977   gst_element_remove_pad (GST_ELEMENT_CAST (jitterbuffer), priv->rtcpsinkpad);
978   priv->rtcpsinkpad = NULL;
979 }
980
981 static GstPad *
982 gst_rtp_jitter_buffer_request_new_pad (GstElement * element,
983     GstPadTemplate * templ, const gchar * name, const GstCaps * filter)
984 {
985   GstRtpJitterBuffer *jitterbuffer;
986   GstElementClass *klass;
987   GstPad *result;
988   GstRtpJitterBufferPrivate *priv;
989
990   g_return_val_if_fail (templ != NULL, NULL);
991   g_return_val_if_fail (GST_IS_RTP_JITTER_BUFFER (element), NULL);
992
993   jitterbuffer = GST_RTP_JITTER_BUFFER_CAST (element);
994   priv = jitterbuffer->priv;
995   klass = GST_ELEMENT_GET_CLASS (element);
996
997   GST_DEBUG_OBJECT (element, "requesting pad %s", GST_STR_NULL (name));
998
999   /* figure out the template */
1000   if (templ == gst_element_class_get_pad_template (klass, "sink_rtcp")) {
1001     if (priv->rtcpsinkpad != NULL)
1002       goto exists;
1003
1004     result = create_rtcp_sink (jitterbuffer);
1005   } else
1006     goto wrong_template;
1007
1008   return result;
1009
1010   /* ERRORS */
1011 wrong_template:
1012   {
1013     g_warning ("rtpjitterbuffer: this is not our template");
1014     return NULL;
1015   }
1016 exists:
1017   {
1018     g_warning ("rtpjitterbuffer: pad already requested");
1019     return NULL;
1020   }
1021 }
1022
1023 static void
1024 gst_rtp_jitter_buffer_release_pad (GstElement * element, GstPad * pad)
1025 {
1026   GstRtpJitterBuffer *jitterbuffer;
1027   GstRtpJitterBufferPrivate *priv;
1028
1029   g_return_if_fail (GST_IS_RTP_JITTER_BUFFER (element));
1030   g_return_if_fail (GST_IS_PAD (pad));
1031
1032   jitterbuffer = GST_RTP_JITTER_BUFFER_CAST (element);
1033   priv = jitterbuffer->priv;
1034
1035   GST_DEBUG_OBJECT (element, "releasing pad %s:%s", GST_DEBUG_PAD_NAME (pad));
1036
1037   if (priv->rtcpsinkpad == pad) {
1038     remove_rtcp_sink (jitterbuffer);
1039   } else
1040     goto wrong_pad;
1041
1042   return;
1043
1044   /* ERRORS */
1045 wrong_pad:
1046   {
1047     g_warning ("gstjitterbuffer: asked to release an unknown pad");
1048     return;
1049   }
1050 }
1051
1052 static GstClock *
1053 gst_rtp_jitter_buffer_provide_clock (GstElement * element)
1054 {
1055   return gst_system_clock_obtain ();
1056 }
1057
1058 static void
1059 gst_rtp_jitter_buffer_clear_pt_map (GstRtpJitterBuffer * jitterbuffer)
1060 {
1061   GstRtpJitterBufferPrivate *priv;
1062
1063   priv = jitterbuffer->priv;
1064
1065   /* this will trigger a new pt-map request signal, FIXME, do something better. */
1066
1067   JBUF_LOCK (priv);
1068   priv->clock_rate = -1;
1069   /* do not clear current content, but refresh state for new arrival */
1070   GST_DEBUG_OBJECT (jitterbuffer, "reset jitterbuffer");
1071   rtp_jitter_buffer_reset_skew (priv->jbuf);
1072   JBUF_UNLOCK (priv);
1073 }
1074
1075 static GstClockTime
1076 gst_rtp_jitter_buffer_set_active (GstRtpJitterBuffer * jbuf, gboolean active,
1077     guint64 offset)
1078 {
1079   GstRtpJitterBufferPrivate *priv;
1080   GstClockTime last_out;
1081   RTPJitterBufferItem *item;
1082
1083   priv = jbuf->priv;
1084
1085   JBUF_LOCK (priv);
1086   GST_DEBUG_OBJECT (jbuf, "setting active %d with offset %" GST_TIME_FORMAT,
1087       active, GST_TIME_ARGS (offset));
1088
1089   if (active != priv->active) {
1090     /* add the amount of time spent in paused to the output offset. All
1091      * outgoing buffers will have this offset applied to their timestamps in
1092      * order to make them arrive in time in the sink. */
1093     priv->out_offset = offset;
1094     GST_DEBUG_OBJECT (jbuf, "out offset %" GST_TIME_FORMAT,
1095         GST_TIME_ARGS (priv->out_offset));
1096     priv->active = active;
1097     JBUF_SIGNAL_EVENT (priv);
1098   }
1099   if (!active) {
1100     rtp_jitter_buffer_set_buffering (priv->jbuf, TRUE);
1101   }
1102   if ((item = rtp_jitter_buffer_peek (priv->jbuf))) {
1103     /* head buffer timestamp and offset gives our output time */
1104     last_out = item->dts + priv->ts_offset;
1105   } else {
1106     /* use last known time when the buffer is empty */
1107     last_out = priv->last_out_time;
1108   }
1109   JBUF_UNLOCK (priv);
1110
1111   return last_out;
1112 }
1113
1114 static GstCaps *
1115 gst_rtp_jitter_buffer_getcaps (GstPad * pad, GstCaps * filter)
1116 {
1117   GstRtpJitterBuffer *jitterbuffer;
1118   GstRtpJitterBufferPrivate *priv;
1119   GstPad *other;
1120   GstCaps *caps;
1121   GstCaps *templ;
1122
1123   jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
1124   priv = jitterbuffer->priv;
1125
1126   other = (pad == priv->srcpad ? priv->sinkpad : priv->srcpad);
1127
1128   caps = gst_pad_peer_query_caps (other, filter);
1129
1130   templ = gst_pad_get_pad_template_caps (pad);
1131   if (caps == NULL) {
1132     GST_DEBUG_OBJECT (jitterbuffer, "use template");
1133     caps = templ;
1134   } else {
1135     GstCaps *intersect;
1136
1137     GST_DEBUG_OBJECT (jitterbuffer, "intersect with template");
1138
1139     intersect = gst_caps_intersect (caps, templ);
1140     gst_caps_unref (caps);
1141     gst_caps_unref (templ);
1142
1143     caps = intersect;
1144   }
1145   gst_object_unref (jitterbuffer);
1146
1147   return caps;
1148 }
1149
1150 /*
1151  * Must be called with JBUF_LOCK held
1152  */
1153
1154 static gboolean
1155 gst_jitter_buffer_sink_parse_caps (GstRtpJitterBuffer * jitterbuffer,
1156     GstCaps * caps)
1157 {
1158   GstRtpJitterBufferPrivate *priv;
1159   GstStructure *caps_struct;
1160   guint val;
1161   GstClockTime tval;
1162
1163   priv = jitterbuffer->priv;
1164
1165   /* first parse the caps */
1166   caps_struct = gst_caps_get_structure (caps, 0);
1167
1168   GST_DEBUG_OBJECT (jitterbuffer, "got caps");
1169
1170   /* we need a clock-rate to convert the rtp timestamps to GStreamer time and to
1171    * measure the amount of data in the buffer */
1172   if (!gst_structure_get_int (caps_struct, "clock-rate", &priv->clock_rate))
1173     goto error;
1174
1175   if (priv->clock_rate <= 0)
1176     goto wrong_rate;
1177
1178   GST_DEBUG_OBJECT (jitterbuffer, "got clock-rate %d", priv->clock_rate);
1179
1180   rtp_jitter_buffer_set_clock_rate (priv->jbuf, priv->clock_rate);
1181
1182   /* The clock base is the RTP timestamp corrsponding to the npt-start value. We
1183    * can use this to track the amount of time elapsed on the sender. */
1184   if (gst_structure_get_uint (caps_struct, "clock-base", &val))
1185     priv->clock_base = val;
1186   else
1187     priv->clock_base = -1;
1188
1189   priv->ext_timestamp = priv->clock_base;
1190
1191   GST_DEBUG_OBJECT (jitterbuffer, "got clock-base %" G_GINT64_FORMAT,
1192       priv->clock_base);
1193
1194   if (gst_structure_get_uint (caps_struct, "seqnum-base", &val)) {
1195     /* first expected seqnum, only update when we didn't have a previous base. */
1196     if (priv->next_in_seqnum == -1)
1197       priv->next_in_seqnum = val;
1198     if (priv->next_seqnum == -1) {
1199       priv->next_seqnum = val;
1200       JBUF_SIGNAL_EVENT (priv);
1201     }
1202     priv->seqnum_base = val;
1203   } else {
1204     priv->seqnum_base = -1;
1205   }
1206
1207   GST_DEBUG_OBJECT (jitterbuffer, "got seqnum-base %d", priv->next_in_seqnum);
1208
1209   /* the start and stop times. The seqnum-base corresponds to the start time. We
1210    * will keep track of the seqnums on the output and when we reach the one
1211    * corresponding to npt-stop, we emit the npt-stop-reached signal */
1212   if (gst_structure_get_clock_time (caps_struct, "npt-start", &tval))
1213     priv->npt_start = tval;
1214   else
1215     priv->npt_start = 0;
1216
1217   if (gst_structure_get_clock_time (caps_struct, "npt-stop", &tval))
1218     priv->npt_stop = tval;
1219   else
1220     priv->npt_stop = -1;
1221
1222   GST_DEBUG_OBJECT (jitterbuffer,
1223       "npt start/stop: %" GST_TIME_FORMAT "-%" GST_TIME_FORMAT,
1224       GST_TIME_ARGS (priv->npt_start), GST_TIME_ARGS (priv->npt_stop));
1225
1226   return TRUE;
1227
1228   /* ERRORS */
1229 error:
1230   {
1231     GST_DEBUG_OBJECT (jitterbuffer, "No clock-rate in caps!");
1232     return FALSE;
1233   }
1234 wrong_rate:
1235   {
1236     GST_DEBUG_OBJECT (jitterbuffer, "Invalid clock-rate %d", priv->clock_rate);
1237     return FALSE;
1238   }
1239 }
1240
1241 static void
1242 gst_rtp_jitter_buffer_flush_start (GstRtpJitterBuffer * jitterbuffer)
1243 {
1244   GstRtpJitterBufferPrivate *priv;
1245
1246   priv = jitterbuffer->priv;
1247
1248   JBUF_LOCK (priv);
1249   /* mark ourselves as flushing */
1250   priv->srcresult = GST_FLOW_FLUSHING;
1251   GST_DEBUG_OBJECT (jitterbuffer, "Disabling pop on queue");
1252   /* this unblocks any waiting pops on the src pad task */
1253   JBUF_SIGNAL_EVENT (priv);
1254   JBUF_SIGNAL_QUERY (priv, FALSE);
1255   JBUF_UNLOCK (priv);
1256 }
1257
1258 static void
1259 gst_rtp_jitter_buffer_flush_stop (GstRtpJitterBuffer * jitterbuffer)
1260 {
1261   GstRtpJitterBufferPrivate *priv;
1262
1263   priv = jitterbuffer->priv;
1264
1265   JBUF_LOCK (priv);
1266   GST_DEBUG_OBJECT (jitterbuffer, "Enabling pop on queue");
1267   /* Mark as non flushing */
1268   priv->srcresult = GST_FLOW_OK;
1269   gst_segment_init (&priv->segment, GST_FORMAT_TIME);
1270   priv->last_popped_seqnum = -1;
1271   priv->last_out_time = -1;
1272   priv->next_seqnum = -1;
1273   priv->seqnum_base = -1;
1274   priv->ips_rtptime = -1;
1275   priv->ips_dts = GST_CLOCK_TIME_NONE;
1276   priv->packet_spacing = 0;
1277   priv->next_in_seqnum = -1;
1278   priv->clock_rate = -1;
1279   priv->last_pt = -1;
1280   priv->eos = FALSE;
1281   priv->estimated_eos = -1;
1282   priv->last_elapsed = 0;
1283   priv->ext_timestamp = -1;
1284   priv->avg_jitter = 0;
1285   priv->last_dts = -1;
1286   priv->last_rtptime = -1;
1287   GST_DEBUG_OBJECT (jitterbuffer, "flush and reset jitterbuffer");
1288   rtp_jitter_buffer_flush (priv->jbuf, (GFunc) free_item, NULL);
1289   rtp_jitter_buffer_disable_buffering (priv->jbuf, FALSE);
1290   rtp_jitter_buffer_reset_skew (priv->jbuf);
1291   remove_all_timers (jitterbuffer);
1292   JBUF_UNLOCK (priv);
1293 }
1294
1295 static gboolean
1296 gst_rtp_jitter_buffer_src_activate_mode (GstPad * pad, GstObject * parent,
1297     GstPadMode mode, gboolean active)
1298 {
1299   gboolean result;
1300   GstRtpJitterBuffer *jitterbuffer = NULL;
1301
1302   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1303
1304   switch (mode) {
1305     case GST_PAD_MODE_PUSH:
1306       if (active) {
1307         /* allow data processing */
1308         gst_rtp_jitter_buffer_flush_stop (jitterbuffer);
1309
1310         /* start pushing out buffers */
1311         GST_DEBUG_OBJECT (jitterbuffer, "Starting task on srcpad");
1312         result = gst_pad_start_task (jitterbuffer->priv->srcpad,
1313             (GstTaskFunction) gst_rtp_jitter_buffer_loop, jitterbuffer, NULL);
1314       } else {
1315         /* make sure all data processing stops ASAP */
1316         gst_rtp_jitter_buffer_flush_start (jitterbuffer);
1317
1318         /* NOTE this will hardlock if the state change is called from the src pad
1319          * task thread because we will _join() the thread. */
1320         GST_DEBUG_OBJECT (jitterbuffer, "Stopping task on srcpad");
1321         result = gst_pad_stop_task (pad);
1322       }
1323       break;
1324     default:
1325       result = FALSE;
1326       break;
1327   }
1328   return result;
1329 }
1330
1331 static GstStateChangeReturn
1332 gst_rtp_jitter_buffer_change_state (GstElement * element,
1333     GstStateChange transition)
1334 {
1335   GstRtpJitterBuffer *jitterbuffer;
1336   GstRtpJitterBufferPrivate *priv;
1337   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
1338
1339   jitterbuffer = GST_RTP_JITTER_BUFFER (element);
1340   priv = jitterbuffer->priv;
1341
1342   switch (transition) {
1343     case GST_STATE_CHANGE_NULL_TO_READY:
1344       break;
1345     case GST_STATE_CHANGE_READY_TO_PAUSED:
1346       JBUF_LOCK (priv);
1347       /* reset negotiated values */
1348       priv->clock_rate = -1;
1349       priv->clock_base = -1;
1350       priv->peer_latency = 0;
1351       priv->last_pt = -1;
1352       /* block until we go to PLAYING */
1353       priv->blocked = TRUE;
1354       priv->timer_running = TRUE;
1355       priv->timer_thread =
1356           g_thread_new ("timer", (GThreadFunc) wait_next_timeout, jitterbuffer);
1357       JBUF_UNLOCK (priv);
1358       break;
1359     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
1360       JBUF_LOCK (priv);
1361       /* unblock to allow streaming in PLAYING */
1362       priv->blocked = FALSE;
1363       JBUF_SIGNAL_EVENT (priv);
1364       JBUF_SIGNAL_TIMER (priv);
1365       JBUF_UNLOCK (priv);
1366       break;
1367     default:
1368       break;
1369   }
1370
1371   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1372
1373   switch (transition) {
1374     case GST_STATE_CHANGE_READY_TO_PAUSED:
1375       /* we are a live element because we sync to the clock, which we can only
1376        * do in the PLAYING state */
1377       if (ret != GST_STATE_CHANGE_FAILURE)
1378         ret = GST_STATE_CHANGE_NO_PREROLL;
1379       break;
1380     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
1381       JBUF_LOCK (priv);
1382       /* block to stop streaming when PAUSED */
1383       priv->blocked = TRUE;
1384       unschedule_current_timer (jitterbuffer);
1385       JBUF_UNLOCK (priv);
1386       if (ret != GST_STATE_CHANGE_FAILURE)
1387         ret = GST_STATE_CHANGE_NO_PREROLL;
1388       break;
1389     case GST_STATE_CHANGE_PAUSED_TO_READY:
1390       JBUF_LOCK (priv);
1391       gst_buffer_replace (&priv->last_sr, NULL);
1392       priv->timer_running = FALSE;
1393       unschedule_current_timer (jitterbuffer);
1394       JBUF_SIGNAL_TIMER (priv);
1395       JBUF_SIGNAL_QUERY (priv, FALSE);
1396       JBUF_UNLOCK (priv);
1397       g_thread_join (priv->timer_thread);
1398       priv->timer_thread = NULL;
1399       break;
1400     case GST_STATE_CHANGE_READY_TO_NULL:
1401       break;
1402     default:
1403       break;
1404   }
1405
1406   return ret;
1407 }
1408
1409 static gboolean
1410 gst_rtp_jitter_buffer_src_event (GstPad * pad, GstObject * parent,
1411     GstEvent * event)
1412 {
1413   gboolean ret = TRUE;
1414   GstRtpJitterBuffer *jitterbuffer;
1415   GstRtpJitterBufferPrivate *priv;
1416
1417   jitterbuffer = GST_RTP_JITTER_BUFFER_CAST (parent);
1418   priv = jitterbuffer->priv;
1419
1420   GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
1421
1422   switch (GST_EVENT_TYPE (event)) {
1423     case GST_EVENT_LATENCY:
1424     {
1425       GstClockTime latency;
1426
1427       gst_event_parse_latency (event, &latency);
1428
1429       GST_DEBUG_OBJECT (jitterbuffer,
1430           "configuring latency of %" GST_TIME_FORMAT, GST_TIME_ARGS (latency));
1431
1432       JBUF_LOCK (priv);
1433       /* adjust the overall buffer delay to the total pipeline latency in
1434        * buffering mode because if downstream consumes too fast (because of
1435        * large latency or queues, we would start rebuffering again. */
1436       if (rtp_jitter_buffer_get_mode (priv->jbuf) ==
1437           RTP_JITTER_BUFFER_MODE_BUFFER) {
1438         rtp_jitter_buffer_set_delay (priv->jbuf, latency);
1439       }
1440       JBUF_UNLOCK (priv);
1441
1442       ret = gst_pad_push_event (priv->sinkpad, event);
1443       break;
1444     }
1445     default:
1446       ret = gst_pad_push_event (priv->sinkpad, event);
1447       break;
1448   }
1449
1450   return ret;
1451 }
1452
1453 /* handles and stores the event in the jitterbuffer, must be called with
1454  * LOCK */
1455 static gboolean
1456 queue_event (GstRtpJitterBuffer * jitterbuffer, GstEvent * event)
1457 {
1458   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1459   RTPJitterBufferItem *item;
1460   gboolean head;
1461
1462   switch (GST_EVENT_TYPE (event)) {
1463     case GST_EVENT_CAPS:
1464     {
1465       GstCaps *caps;
1466
1467       gst_event_parse_caps (event, &caps);
1468       gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps);
1469       break;
1470     }
1471     case GST_EVENT_SEGMENT:
1472       gst_event_copy_segment (event, &priv->segment);
1473
1474       /* we need time for now */
1475       if (priv->segment.format != GST_FORMAT_TIME)
1476         goto newseg_wrong_format;
1477
1478       GST_DEBUG_OBJECT (jitterbuffer,
1479           "segment:  %" GST_SEGMENT_FORMAT, &priv->segment);
1480       break;
1481     case GST_EVENT_EOS:
1482       priv->eos = TRUE;
1483       rtp_jitter_buffer_disable_buffering (priv->jbuf, TRUE);
1484       break;
1485     default:
1486       break;
1487   }
1488
1489
1490   GST_DEBUG_OBJECT (jitterbuffer, "adding event");
1491   item = alloc_item (event, ITEM_TYPE_EVENT, -1, -1, -1, 0, -1);
1492   rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL);
1493   if (head)
1494     JBUF_SIGNAL_EVENT (priv);
1495
1496   return TRUE;
1497
1498   /* ERRORS */
1499 newseg_wrong_format:
1500   {
1501     GST_DEBUG_OBJECT (jitterbuffer, "received non TIME newsegment");
1502     gst_event_unref (event);
1503     return FALSE;
1504   }
1505 }
1506
1507 static gboolean
1508 gst_rtp_jitter_buffer_sink_event (GstPad * pad, GstObject * parent,
1509     GstEvent * event)
1510 {
1511   gboolean ret = TRUE;
1512   GstRtpJitterBuffer *jitterbuffer;
1513   GstRtpJitterBufferPrivate *priv;
1514
1515   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1516   priv = jitterbuffer->priv;
1517
1518   GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
1519
1520   switch (GST_EVENT_TYPE (event)) {
1521     case GST_EVENT_FLUSH_START:
1522       ret = gst_pad_push_event (priv->srcpad, event);
1523       gst_rtp_jitter_buffer_flush_start (jitterbuffer);
1524       /* wait for the loop to go into PAUSED */
1525       gst_pad_pause_task (priv->srcpad);
1526       break;
1527     case GST_EVENT_FLUSH_STOP:
1528       ret = gst_pad_push_event (priv->srcpad, event);
1529       ret =
1530           gst_rtp_jitter_buffer_src_activate_mode (priv->srcpad, parent,
1531           GST_PAD_MODE_PUSH, TRUE);
1532       break;
1533     default:
1534       if (GST_EVENT_IS_SERIALIZED (event)) {
1535         /* serialized events go in the queue */
1536         JBUF_LOCK (priv);
1537         if (priv->srcresult != GST_FLOW_OK) {
1538           /* Errors in sticky event pushing are no problem and ignored here
1539            * as they will cause more meaningful errors during data flow.
1540            * For EOS events, that are not followed by data flow, we still
1541            * return FALSE here though.
1542            */
1543           if (!GST_EVENT_IS_STICKY (event) ||
1544               GST_EVENT_TYPE (event) == GST_EVENT_EOS)
1545             goto out_flow_error;
1546         }
1547         /* refuse more events on EOS */
1548         if (priv->eos)
1549           goto out_eos;
1550         ret = queue_event (jitterbuffer, event);
1551         JBUF_UNLOCK (priv);
1552       } else {
1553         /* non-serialized events are forwarded downstream immediately */
1554         ret = gst_pad_push_event (priv->srcpad, event);
1555       }
1556       break;
1557   }
1558   return ret;
1559
1560   /* ERRORS */
1561 out_flow_error:
1562   {
1563     GST_DEBUG_OBJECT (jitterbuffer,
1564         "refusing event, we have a downstream flow error: %s",
1565         gst_flow_get_name (priv->srcresult));
1566     JBUF_UNLOCK (priv);
1567     gst_event_unref (event);
1568     return FALSE;
1569   }
1570 out_eos:
1571   {
1572     GST_DEBUG_OBJECT (jitterbuffer, "refusing event, we are EOS");
1573     JBUF_UNLOCK (priv);
1574     gst_event_unref (event);
1575     return FALSE;
1576   }
1577 }
1578
1579 static gboolean
1580 gst_rtp_jitter_buffer_sink_rtcp_event (GstPad * pad, GstObject * parent,
1581     GstEvent * event)
1582 {
1583   gboolean ret = TRUE;
1584   GstRtpJitterBuffer *jitterbuffer;
1585
1586   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1587
1588   GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
1589
1590   switch (GST_EVENT_TYPE (event)) {
1591     case GST_EVENT_FLUSH_START:
1592       gst_event_unref (event);
1593       break;
1594     case GST_EVENT_FLUSH_STOP:
1595       gst_event_unref (event);
1596       break;
1597     default:
1598       ret = gst_pad_event_default (pad, parent, event);
1599       break;
1600   }
1601
1602   return ret;
1603 }
1604
1605 /*
1606  * Must be called with JBUF_LOCK held, will release the LOCK when emiting the
1607  * signal. The function returns GST_FLOW_ERROR when a parsing error happened and
1608  * GST_FLOW_FLUSHING when the element is shutting down. On success
1609  * GST_FLOW_OK is returned.
1610  */
1611 static GstFlowReturn
1612 gst_rtp_jitter_buffer_get_clock_rate (GstRtpJitterBuffer * jitterbuffer,
1613     guint8 pt)
1614 {
1615   GValue ret = { 0 };
1616   GValue args[2] = { {0}, {0} };
1617   GstCaps *caps;
1618   gboolean res;
1619
1620   g_value_init (&args[0], GST_TYPE_ELEMENT);
1621   g_value_set_object (&args[0], jitterbuffer);
1622   g_value_init (&args[1], G_TYPE_UINT);
1623   g_value_set_uint (&args[1], pt);
1624
1625   g_value_init (&ret, GST_TYPE_CAPS);
1626   g_value_set_boxed (&ret, NULL);
1627
1628   JBUF_UNLOCK (jitterbuffer->priv);
1629   g_signal_emitv (args, gst_rtp_jitter_buffer_signals[SIGNAL_REQUEST_PT_MAP], 0,
1630       &ret);
1631   JBUF_LOCK_CHECK (jitterbuffer->priv, out_flushing);
1632
1633   g_value_unset (&args[0]);
1634   g_value_unset (&args[1]);
1635   caps = (GstCaps *) g_value_dup_boxed (&ret);
1636   g_value_unset (&ret);
1637   if (!caps)
1638     goto no_caps;
1639
1640   res = gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps);
1641   gst_caps_unref (caps);
1642
1643   if (G_UNLIKELY (!res))
1644     goto parse_failed;
1645
1646   return GST_FLOW_OK;
1647
1648   /* ERRORS */
1649 no_caps:
1650   {
1651     GST_DEBUG_OBJECT (jitterbuffer, "could not get caps");
1652     return GST_FLOW_ERROR;
1653   }
1654 out_flushing:
1655   {
1656     GST_DEBUG_OBJECT (jitterbuffer, "we are flushing");
1657     return GST_FLOW_FLUSHING;
1658   }
1659 parse_failed:
1660   {
1661     GST_DEBUG_OBJECT (jitterbuffer, "parse failed");
1662     return GST_FLOW_ERROR;
1663   }
1664 }
1665
1666 /* call with jbuf lock held */
1667 static GstMessage *
1668 check_buffering_percent (GstRtpJitterBuffer * jitterbuffer, gint percent)
1669 {
1670   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1671   GstMessage *message = NULL;
1672
1673   if (percent == -1)
1674     return NULL;
1675
1676   /* Post a buffering message */
1677   if (priv->last_percent != percent) {
1678     priv->last_percent = percent;
1679     message =
1680         gst_message_new_buffering (GST_OBJECT_CAST (jitterbuffer), percent);
1681     gst_message_set_buffering_stats (message, GST_BUFFERING_LIVE, -1, -1, -1);
1682   }
1683
1684   return message;
1685 }
1686
1687 static GstClockTime
1688 apply_offset (GstRtpJitterBuffer * jitterbuffer, GstClockTime timestamp)
1689 {
1690   GstRtpJitterBufferPrivate *priv;
1691
1692   priv = jitterbuffer->priv;
1693
1694   if (timestamp == -1)
1695     return -1;
1696
1697   /* apply the timestamp offset, this is used for inter stream sync */
1698   timestamp += priv->ts_offset;
1699   /* add the offset, this is used when buffering */
1700   timestamp += priv->out_offset;
1701
1702   return timestamp;
1703 }
1704
1705 static TimerData *
1706 find_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type, guint16 seqnum)
1707 {
1708   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1709   TimerData *timer = NULL;
1710   gint i, len;
1711
1712   len = priv->timers->len;
1713   for (i = 0; i < len; i++) {
1714     TimerData *test = &g_array_index (priv->timers, TimerData, i);
1715     if (test->seqnum == seqnum && test->type == type) {
1716       timer = test;
1717       break;
1718     }
1719   }
1720   return timer;
1721 }
1722
1723 static void
1724 unschedule_current_timer (GstRtpJitterBuffer * jitterbuffer)
1725 {
1726   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1727
1728   if (priv->clock_id) {
1729     GST_DEBUG_OBJECT (jitterbuffer, "unschedule current timer");
1730     gst_clock_id_unschedule (priv->clock_id);
1731     priv->clock_id = NULL;
1732   }
1733 }
1734
1735 static GstClockTime
1736 get_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
1737 {
1738   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1739   GstClockTime test_timeout;
1740
1741   if ((test_timeout = timer->timeout) == -1)
1742     return -1;
1743
1744   if (timer->type != TIMER_TYPE_EXPECTED) {
1745     /* add our latency and offset to get output times. */
1746     test_timeout = apply_offset (jitterbuffer, test_timeout);
1747     test_timeout += priv->latency_ns;
1748   }
1749   return test_timeout;
1750 }
1751
1752 static void
1753 recalculate_timer (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
1754 {
1755   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1756
1757   if (priv->clock_id) {
1758     GstClockTime timeout = get_timeout (jitterbuffer, timer);
1759
1760     GST_DEBUG ("%" GST_TIME_FORMAT " <> %" GST_TIME_FORMAT,
1761         GST_TIME_ARGS (timeout), GST_TIME_ARGS (priv->timer_timeout));
1762
1763     if (timeout == -1 || timeout < priv->timer_timeout)
1764       unschedule_current_timer (jitterbuffer);
1765   }
1766 }
1767
1768 static TimerData *
1769 add_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type,
1770     guint16 seqnum, guint num, GstClockTime timeout, GstClockTime delay,
1771     GstClockTime duration)
1772 {
1773   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1774   TimerData *timer;
1775   gint len;
1776
1777   GST_DEBUG_OBJECT (jitterbuffer,
1778       "add timer %d for seqnum %d to %" GST_TIME_FORMAT ", delay %"
1779       GST_TIME_FORMAT, type, seqnum, GST_TIME_ARGS (timeout),
1780       GST_TIME_ARGS (delay));
1781
1782   len = priv->timers->len;
1783   g_array_set_size (priv->timers, len + 1);
1784   timer = &g_array_index (priv->timers, TimerData, len);
1785   timer->idx = len;
1786   timer->type = type;
1787   timer->seqnum = seqnum;
1788   timer->num = num;
1789   timer->timeout = timeout + delay;
1790   timer->duration = duration;
1791   if (type == TIMER_TYPE_EXPECTED) {
1792     timer->rtx_base = timeout;
1793     timer->rtx_delay = delay;
1794     timer->rtx_retry = 0;
1795   }
1796   timer->num_rtx_retry = 0;
1797   recalculate_timer (jitterbuffer, timer);
1798   JBUF_SIGNAL_TIMER (priv);
1799
1800   return timer;
1801 }
1802
1803 static void
1804 reschedule_timer (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
1805     guint16 seqnum, GstClockTime timeout, GstClockTime delay, gboolean reset)
1806 {
1807   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1808   gboolean seqchange, timechange;
1809   guint16 oldseq;
1810
1811   seqchange = timer->seqnum != seqnum;
1812   timechange = timer->timeout != timeout;
1813
1814   if (!seqchange && !timechange)
1815     return;
1816
1817   oldseq = timer->seqnum;
1818
1819   GST_DEBUG_OBJECT (jitterbuffer,
1820       "replace timer for seqnum %d->%d to %" GST_TIME_FORMAT,
1821       oldseq, seqnum, GST_TIME_ARGS (timeout + delay));
1822
1823   timer->timeout = timeout + delay;
1824   timer->seqnum = seqnum;
1825   if (reset) {
1826     timer->rtx_base = timeout;
1827     timer->rtx_delay = delay;
1828     timer->rtx_retry = 0;
1829   }
1830   if (seqchange)
1831     timer->num_rtx_retry = 0;
1832
1833   if (priv->clock_id) {
1834     /* we changed the seqnum and there is a timer currently waiting with this
1835      * seqnum, unschedule it */
1836     if (seqchange && priv->timer_seqnum == oldseq)
1837       unschedule_current_timer (jitterbuffer);
1838     /* we changed the time, check if it is earlier than what we are waiting
1839      * for and unschedule if so */
1840     else if (timechange)
1841       recalculate_timer (jitterbuffer, timer);
1842   }
1843 }
1844
1845 static TimerData *
1846 set_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type,
1847     guint16 seqnum, GstClockTime timeout)
1848 {
1849   TimerData *timer;
1850
1851   /* find the seqnum timer */
1852   timer = find_timer (jitterbuffer, type, seqnum);
1853   if (timer == NULL) {
1854     timer = add_timer (jitterbuffer, type, seqnum, 0, timeout, 0, -1);
1855   } else {
1856     reschedule_timer (jitterbuffer, timer, seqnum, timeout, 0, FALSE);
1857   }
1858   return timer;
1859 }
1860
1861 static void
1862 remove_timer (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
1863 {
1864   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1865   guint idx;
1866
1867   if (priv->clock_id && priv->timer_seqnum == timer->seqnum)
1868     unschedule_current_timer (jitterbuffer);
1869
1870   idx = timer->idx;
1871   GST_DEBUG_OBJECT (jitterbuffer, "removed index %d", idx);
1872   g_array_remove_index_fast (priv->timers, idx);
1873   timer->idx = idx;
1874 }
1875
1876 static void
1877 remove_all_timers (GstRtpJitterBuffer * jitterbuffer)
1878 {
1879   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1880   GST_DEBUG_OBJECT (jitterbuffer, "removed all timers");
1881   g_array_set_size (priv->timers, 0);
1882   unschedule_current_timer (jitterbuffer);
1883 }
1884
1885 /* get the extra delay to wait before sending RTX */
1886 static GstClockTime
1887 get_rtx_delay (GstRtpJitterBufferPrivate * priv)
1888 {
1889   GstClockTime delay;
1890
1891   if (priv->rtx_delay == -1) {
1892     if (priv->avg_jitter == 0)
1893       delay = DEFAULT_AUTO_RTX_DELAY;
1894     else
1895       /* jitter is in nanoseconds, 2x jitter is a good margin */
1896       delay = priv->avg_jitter * 2;
1897   } else {
1898     delay = priv->rtx_delay * GST_MSECOND;
1899   }
1900   if (priv->rtx_min_delay > 0)
1901     delay = MAX (delay, priv->rtx_min_delay * GST_MSECOND);
1902
1903   return delay;
1904 }
1905
1906 /* we just received a packet with seqnum and dts.
1907  *
1908  * First check for old seqnum that we are still expecting. If the gap with the
1909  * current seqnum is too big, unschedule the timeouts.
1910  *
1911  * If we have a valid packet spacing estimate we can set a timer for when we
1912  * should receive the next packet.
1913  * If we don't have a valid estimate, we remove any timer we might have
1914  * had for this packet.
1915  */
1916 static void
1917 update_timers (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum,
1918     GstClockTime dts, gboolean do_next_seqnum)
1919 {
1920   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1921   TimerData *timer = NULL;
1922   gint i, len;
1923
1924   /* go through all timers and unschedule the ones with a large gap, also find
1925    * the timer for the seqnum */
1926   len = priv->timers->len;
1927   for (i = 0; i < len; i++) {
1928     TimerData *test = &g_array_index (priv->timers, TimerData, i);
1929     gint gap;
1930
1931     gap = gst_rtp_buffer_compare_seqnum (test->seqnum, seqnum);
1932
1933     GST_DEBUG_OBJECT (jitterbuffer, "%d, %d, #%d<->#%d gap %d", i,
1934         test->type, test->seqnum, seqnum, gap);
1935
1936     if (gap == 0) {
1937       GST_DEBUG ("found timer for current seqnum");
1938       /* the timer for the current seqnum */
1939       timer = test;
1940       /* when no retransmission, we can stop now, we only need to find the
1941        * timer for the current seqnum */
1942       if (!priv->do_retransmission)
1943         break;
1944     } else if (gap > priv->rtx_delay_reorder) {
1945       /* max gap, we exceeded the max reorder distance and we don't expect the
1946        * missing packet to be this reordered */
1947       if (test->num_rtx_retry == 0 && test->type == TIMER_TYPE_EXPECTED)
1948         reschedule_timer (jitterbuffer, test, test->seqnum, -1, 0, FALSE);
1949     }
1950   }
1951
1952   do_next_seqnum = do_next_seqnum && priv->packet_spacing > 0
1953       && priv->do_retransmission && priv->rtx_next_seqnum;
1954
1955   if (timer && timer->type != TIMER_TYPE_DEADLINE) {
1956     if (timer->num_rtx_retry > 0) {
1957       GstClockTime rtx_last, delay;
1958
1959       /* we scheduled a retry for this packet and now we have it */
1960       priv->num_rtx_success++;
1961       /* all the previous retry attempts failed */
1962       priv->num_rtx_failed += timer->num_rtx_retry - 1;
1963       /* number of retries before receiving the packet */
1964       if (priv->avg_rtx_num == 0.0)
1965         priv->avg_rtx_num = timer->num_rtx_retry;
1966       else
1967         priv->avg_rtx_num = (timer->num_rtx_retry + 7 * priv->avg_rtx_num) / 8;
1968       /* calculate the delay between retransmission request and receiving this
1969        * packet, start with when we scheduled this timeout last */
1970       rtx_last = timer->rtx_last;
1971       if (dts != GST_CLOCK_TIME_NONE && dts > rtx_last) {
1972         /* we have a valid delay if this packet arrived after we scheduled the
1973          * request */
1974         delay = dts - rtx_last;
1975         if (priv->avg_rtx_rtt == 0)
1976           priv->avg_rtx_rtt = delay;
1977         else
1978           priv->avg_rtx_rtt = (delay + 7 * priv->avg_rtx_rtt) / 8;
1979       } else
1980         delay = 0;
1981
1982       GST_LOG_OBJECT (jitterbuffer,
1983           "RTX success %" G_GUINT64_FORMAT ", failed %" G_GUINT64_FORMAT
1984           ", requests %" G_GUINT64_FORMAT ", dups %" G_GUINT64_FORMAT
1985           ", avg-num %g, delay %" GST_TIME_FORMAT ", avg-rtt %" GST_TIME_FORMAT,
1986           priv->num_rtx_success, priv->num_rtx_failed, priv->num_rtx_requests,
1987           priv->num_duplicates, priv->avg_rtx_num, GST_TIME_ARGS (delay),
1988           GST_TIME_ARGS (priv->avg_rtx_rtt));
1989
1990       /* don't try to estimate the next seqnum because this is a retransmitted
1991        * packet and it probably did not arrive with the expected packet
1992        * spacing. */
1993       do_next_seqnum = FALSE;
1994     }
1995   }
1996
1997   if (do_next_seqnum) {
1998     GstClockTime expected, delay;
1999
2000     /* calculate expected arrival time of the next seqnum */
2001     expected = dts + priv->packet_spacing;
2002
2003     delay = get_rtx_delay (priv);
2004
2005     /* and update/install timer for next seqnum */
2006     if (timer)
2007       reschedule_timer (jitterbuffer, timer, priv->next_in_seqnum, expected,
2008           delay, TRUE);
2009     else
2010       add_timer (jitterbuffer, TIMER_TYPE_EXPECTED, priv->next_in_seqnum, 0,
2011           expected, delay, priv->packet_spacing);
2012   } else if (timer && timer->type != TIMER_TYPE_DEADLINE) {
2013     /* if we had a timer, remove it, we don't know when to expect the next
2014      * packet. */
2015     remove_timer (jitterbuffer, timer);
2016   }
2017 }
2018
2019 static void
2020 calculate_packet_spacing (GstRtpJitterBuffer * jitterbuffer, guint32 rtptime,
2021     GstClockTime dts)
2022 {
2023   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2024
2025   /* we need consecutive seqnums with a different
2026    * rtptime to estimate the packet spacing. */
2027   if (priv->ips_rtptime != rtptime) {
2028     /* rtptime changed, check dts diff */
2029     if (priv->ips_dts != -1 && dts != -1 && dts > priv->ips_dts) {
2030       GstClockTime new_packet_spacing = dts - priv->ips_dts;
2031       GstClockTime old_packet_spacing = priv->packet_spacing;
2032
2033       /* Biased towards bigger packet spacings to prevent
2034        * too many unneeded retransmission requests for next
2035        * packets that just arrive a little later than we would
2036        * expect */
2037       if (old_packet_spacing > new_packet_spacing)
2038         priv->packet_spacing =
2039             (new_packet_spacing + 3 * old_packet_spacing) / 4;
2040       else if (old_packet_spacing > 0)
2041         priv->packet_spacing =
2042             (3 * new_packet_spacing + old_packet_spacing) / 4;
2043       else
2044         priv->packet_spacing = new_packet_spacing;
2045
2046       GST_DEBUG_OBJECT (jitterbuffer,
2047           "new packet spacing %" GST_TIME_FORMAT
2048           " old packet spacing %" GST_TIME_FORMAT
2049           " combined to %" GST_TIME_FORMAT,
2050           GST_TIME_ARGS (new_packet_spacing),
2051           GST_TIME_ARGS (old_packet_spacing),
2052           GST_TIME_ARGS (priv->packet_spacing));
2053     }
2054     priv->ips_rtptime = rtptime;
2055     priv->ips_dts = dts;
2056   }
2057 }
2058
2059 static void
2060 calculate_expected (GstRtpJitterBuffer * jitterbuffer, guint32 expected,
2061     guint16 seqnum, GstClockTime dts, gint gap)
2062 {
2063   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2064   GstClockTime total_duration, duration, expected_dts;
2065   TimerType type;
2066   guint lost_packets = 0;
2067
2068   GST_DEBUG_OBJECT (jitterbuffer,
2069       "dts %" GST_TIME_FORMAT ", last %" GST_TIME_FORMAT,
2070       GST_TIME_ARGS (dts), GST_TIME_ARGS (priv->last_in_dts));
2071
2072   /* the total duration spanned by the missing packets */
2073   if (dts >= priv->last_in_dts)
2074     total_duration = dts - priv->last_in_dts;
2075   else
2076     total_duration = 0;
2077
2078   /* interpolate between the current time and the last time based on
2079    * number of packets we are missing, this is the estimated duration
2080    * for the missing packet based on equidistant packet spacing. */
2081   duration = total_duration / (gap + 1);
2082
2083   GST_DEBUG_OBJECT (jitterbuffer, "duration %" GST_TIME_FORMAT,
2084       GST_TIME_ARGS (duration));
2085
2086   if (total_duration > priv->latency_ns) {
2087     GstClockTime gap_time;
2088
2089     gap_time = total_duration - priv->latency_ns;
2090
2091     if (duration > 0) {
2092       lost_packets = gap_time / duration;
2093       gap_time = lost_packets * duration;
2094     } else {
2095       lost_packets = gap;
2096     }
2097
2098     /* too many lost packets, some of the missing packets are already
2099      * too late and we can generate lost packet events for them. */
2100     GST_DEBUG_OBJECT (jitterbuffer, "too many lost packets %" GST_TIME_FORMAT
2101         " > %" GST_TIME_FORMAT ", consider %u lost",
2102         GST_TIME_ARGS (total_duration), GST_TIME_ARGS (priv->latency_ns),
2103         lost_packets);
2104
2105     /* this timer will fire immediately and the lost event will be pushed from
2106      * the timer thread */
2107     add_timer (jitterbuffer, TIMER_TYPE_LOST, expected, lost_packets,
2108         priv->last_in_dts + duration, 0, gap_time);
2109
2110     expected += lost_packets;
2111     priv->last_in_dts += gap_time;
2112   }
2113
2114   expected_dts = priv->last_in_dts + (lost_packets + 1) * duration;
2115
2116   if (priv->do_retransmission) {
2117     TimerData *timer;
2118
2119     type = TIMER_TYPE_EXPECTED;
2120     /* if we had a timer for the first missing packet, update it. */
2121     if ((timer = find_timer (jitterbuffer, type, expected))) {
2122       GstClockTime timeout = timer->timeout;
2123
2124       timer->duration = duration;
2125       if (timeout > (expected_dts + timer->rtx_retry)) {
2126         GstClockTime delay = timeout - expected_dts - timer->rtx_retry;
2127         reschedule_timer (jitterbuffer, timer, timer->seqnum, expected_dts,
2128             delay, TRUE);
2129       }
2130       expected++;
2131       expected_dts += duration;
2132     }
2133   } else {
2134     type = TIMER_TYPE_LOST;
2135   }
2136
2137   while (gst_rtp_buffer_compare_seqnum (expected, seqnum) > 0) {
2138     add_timer (jitterbuffer, type, expected, 0, expected_dts, 0, duration);
2139     expected_dts += duration;
2140     expected++;
2141   }
2142 }
2143
2144 static void
2145 calculate_jitter (GstRtpJitterBuffer * jitterbuffer, GstClockTime dts,
2146     guint rtptime)
2147 {
2148   gint32 rtpdiff;
2149   GstClockTimeDiff dtsdiff, rtpdiffns, diff;
2150   GstRtpJitterBufferPrivate *priv;
2151
2152   priv = jitterbuffer->priv;
2153
2154   if (G_UNLIKELY (dts == GST_CLOCK_TIME_NONE) || priv->clock_rate <= 0)
2155     goto no_time;
2156
2157   if (priv->last_dts != -1)
2158     dtsdiff = dts - priv->last_dts;
2159   else
2160     dtsdiff = 0;
2161
2162   if (priv->last_rtptime != -1)
2163     rtpdiff = rtptime - (guint32) priv->last_rtptime;
2164   else
2165     rtpdiff = 0;
2166
2167   priv->last_dts = dts;
2168   priv->last_rtptime = rtptime;
2169
2170   if (rtpdiff > 0)
2171     rtpdiffns =
2172         gst_util_uint64_scale_int (rtpdiff, GST_SECOND, priv->clock_rate);
2173   else
2174     rtpdiffns =
2175         -gst_util_uint64_scale_int (-rtpdiff, GST_SECOND, priv->clock_rate);
2176
2177   diff = ABS (dtsdiff - rtpdiffns);
2178
2179   /* jitter is stored in nanoseconds */
2180   priv->avg_jitter = (diff + (15 * priv->avg_jitter)) >> 4;
2181
2182   GST_LOG_OBJECT (jitterbuffer,
2183       "dtsdiff %" GST_TIME_FORMAT " rtptime %" GST_TIME_FORMAT
2184       ", clock-rate %d, diff %" GST_TIME_FORMAT ", jitter: %" GST_TIME_FORMAT,
2185       GST_TIME_ARGS (dtsdiff), GST_TIME_ARGS (rtpdiffns), priv->clock_rate,
2186       GST_TIME_ARGS (diff), GST_TIME_ARGS (priv->avg_jitter));
2187
2188   return;
2189
2190   /* ERRORS */
2191 no_time:
2192   {
2193     GST_DEBUG_OBJECT (jitterbuffer,
2194         "no dts or no clock-rate, can't calculate jitter");
2195     return;
2196   }
2197 }
2198
2199 static GstFlowReturn
2200 gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent,
2201     GstBuffer * buffer)
2202 {
2203   GstRtpJitterBuffer *jitterbuffer;
2204   GstRtpJitterBufferPrivate *priv;
2205   guint16 seqnum;
2206   guint32 expected, rtptime;
2207   GstFlowReturn ret = GST_FLOW_OK;
2208   GstClockTime dts, pts;
2209   guint64 latency_ts;
2210   gboolean head;
2211   gint percent = -1;
2212   guint8 pt;
2213   GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
2214   gboolean do_next_seqnum = FALSE;
2215   RTPJitterBufferItem *item;
2216   GstMessage *msg = NULL;
2217
2218   jitterbuffer = GST_RTP_JITTER_BUFFER_CAST (parent);
2219
2220   priv = jitterbuffer->priv;
2221
2222   if (G_UNLIKELY (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp)))
2223     goto invalid_buffer;
2224
2225   pt = gst_rtp_buffer_get_payload_type (&rtp);
2226   seqnum = gst_rtp_buffer_get_seq (&rtp);
2227   rtptime = gst_rtp_buffer_get_timestamp (&rtp);
2228   gst_rtp_buffer_unmap (&rtp);
2229
2230   /* make sure we have PTS and DTS set */
2231   pts = GST_BUFFER_PTS (buffer);
2232   dts = GST_BUFFER_DTS (buffer);
2233   if (dts == -1)
2234     dts = pts;
2235   else if (pts == -1)
2236     pts = dts;
2237
2238   /* take the DTS of the buffer. This is the time when the packet was
2239    * received and is used to calculate jitter and clock skew. We will adjust
2240    * this DTS with the smoothed value after processing it in the
2241    * jitterbuffer and assign it as the PTS. */
2242   /* bring to running time */
2243   dts = gst_segment_to_running_time (&priv->segment, GST_FORMAT_TIME, dts);
2244
2245   GST_DEBUG_OBJECT (jitterbuffer,
2246       "Received packet #%d at time %" GST_TIME_FORMAT ", discont %d", seqnum,
2247       GST_TIME_ARGS (dts), GST_BUFFER_IS_DISCONT (buffer));
2248
2249   JBUF_LOCK_CHECK (priv, out_flushing);
2250
2251   if (G_UNLIKELY (priv->last_pt != pt)) {
2252     GstCaps *caps;
2253
2254     GST_DEBUG_OBJECT (jitterbuffer, "pt changed from %u to %u", priv->last_pt,
2255         pt);
2256
2257     priv->last_pt = pt;
2258     /* reset clock-rate so that we get a new one */
2259     priv->clock_rate = -1;
2260
2261     /* Try to get the clock-rate from the caps first if we can. If there are no
2262      * caps we must fire the signal to get the clock-rate. */
2263     if ((caps = gst_pad_get_current_caps (pad))) {
2264       gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps);
2265       gst_caps_unref (caps);
2266     }
2267   }
2268
2269   if (G_UNLIKELY (priv->clock_rate == -1)) {
2270     /* no clock rate given on the caps, try to get one with the signal */
2271     if (gst_rtp_jitter_buffer_get_clock_rate (jitterbuffer,
2272             pt) == GST_FLOW_FLUSHING)
2273       goto out_flushing;
2274
2275     if (G_UNLIKELY (priv->clock_rate == -1))
2276       goto no_clock_rate;
2277   }
2278
2279   /* don't accept more data on EOS */
2280   if (G_UNLIKELY (priv->eos))
2281     goto have_eos;
2282
2283   calculate_jitter (jitterbuffer, dts, rtptime);
2284
2285   if (priv->seqnum_base != -1) {
2286     gint gap;
2287
2288     gap = gst_rtp_buffer_compare_seqnum (priv->seqnum_base, seqnum);
2289
2290     if (gap < 0) {
2291       GST_DEBUG_OBJECT (jitterbuffer,
2292           "packet seqnum #%d before seqnum-base #%d", seqnum,
2293           priv->seqnum_base);
2294       gst_buffer_unref (buffer);
2295       ret = GST_FLOW_OK;
2296       goto finished;
2297     } else if (gap > 16384) {
2298       /* From now on don't compare against the seqnum base anymore as
2299        * at some point in the future we will wrap around and also that
2300        * much reordering is very unlikely */
2301       priv->seqnum_base = -1;
2302     }
2303   }
2304
2305   expected = priv->next_in_seqnum;
2306
2307   /* now check against our expected seqnum */
2308   if (G_LIKELY (expected != -1)) {
2309     gint gap;
2310
2311     /* now calculate gap */
2312     gap = gst_rtp_buffer_compare_seqnum (expected, seqnum);
2313
2314     GST_DEBUG_OBJECT (jitterbuffer, "expected #%d, got #%d, gap of %d",
2315         expected, seqnum, gap);
2316
2317     if (G_LIKELY (gap == 0)) {
2318       /* packet is expected */
2319       calculate_packet_spacing (jitterbuffer, rtptime, dts);
2320       do_next_seqnum = TRUE;
2321     } else {
2322       gboolean reset = FALSE;
2323
2324       if (!GST_CLOCK_TIME_IS_VALID (dts)) {
2325         /* We would run into calculations with GST_CLOCK_TIME_NONE below
2326          * and can't compensate for anything without DTS on RTP packets
2327          */
2328         goto gap_but_no_dts;
2329       } else if (gap < 0) {
2330         /* we received an old packet */
2331         if (G_UNLIKELY (gap < -RTP_MAX_MISORDER)) {
2332           /* too old packet, reset */
2333           GST_DEBUG_OBJECT (jitterbuffer, "reset: buffer too old %d < %d", gap,
2334               -RTP_MAX_MISORDER);
2335           reset = TRUE;
2336         } else {
2337           GST_DEBUG_OBJECT (jitterbuffer, "old packet received");
2338         }
2339       } else {
2340         /* new packet, we are missing some packets */
2341         if (G_UNLIKELY (gap > RTP_MAX_DROPOUT)) {
2342           /* packet too far in future, reset */
2343           GST_DEBUG_OBJECT (jitterbuffer, "reset: buffer too new %d > %d", gap,
2344               RTP_MAX_DROPOUT);
2345           reset = TRUE;
2346         } else {
2347           GST_DEBUG_OBJECT (jitterbuffer, "%d missing packets", gap);
2348           /* fill in the gap with EXPECTED timers */
2349           calculate_expected (jitterbuffer, expected, seqnum, dts, gap);
2350
2351           do_next_seqnum = TRUE;
2352         }
2353       }
2354       if (G_UNLIKELY (reset)) {
2355         GList *events = NULL, *l;
2356
2357         GST_DEBUG_OBJECT (jitterbuffer, "flush and reset jitterbuffer");
2358         rtp_jitter_buffer_flush (priv->jbuf,
2359             (GFunc) free_item_and_retain_events, &events);
2360         rtp_jitter_buffer_reset_skew (priv->jbuf);
2361         remove_all_timers (jitterbuffer);
2362         priv->discont = TRUE;
2363         priv->last_popped_seqnum = -1;
2364         priv->next_seqnum = seqnum;
2365         do_next_seqnum = TRUE;
2366
2367         /* Insert all sticky events again in order, otherwise we would
2368          * potentially loose STREAM_START, CAPS or SEGMENT events
2369          */
2370         events = g_list_reverse (events);
2371         for (l = events; l; l = l->next) {
2372           RTPJitterBufferItem *item;
2373
2374           item = alloc_item (l->data, ITEM_TYPE_EVENT, -1, -1, -1, 0, -1);
2375           rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL);
2376         }
2377         g_list_free (events);
2378
2379         JBUF_SIGNAL_EVENT (priv);
2380       }
2381       /* reset spacing estimation when gap */
2382       priv->ips_rtptime = -1;
2383       priv->ips_dts = GST_CLOCK_TIME_NONE;
2384     }
2385   } else {
2386     GST_DEBUG_OBJECT (jitterbuffer, "First buffer #%d", seqnum);
2387     /* we don't know what the next_in_seqnum should be, wait for the last
2388      * possible moment to push this buffer, maybe we get an earlier seqnum
2389      * while we wait */
2390     set_timer (jitterbuffer, TIMER_TYPE_DEADLINE, seqnum, dts);
2391     do_next_seqnum = TRUE;
2392     /* take rtptime and dts to calculate packet spacing */
2393     priv->ips_rtptime = rtptime;
2394     priv->ips_dts = dts;
2395   }
2396   if (do_next_seqnum) {
2397     priv->last_in_seqnum = seqnum;
2398     priv->last_in_dts = dts;
2399     priv->next_in_seqnum = (seqnum + 1) & 0xffff;
2400   }
2401
2402   /* let's check if this buffer is too late, we can only accept packets with
2403    * bigger seqnum than the one we last pushed. */
2404   if (G_LIKELY (priv->last_popped_seqnum != -1)) {
2405     gint gap;
2406
2407     gap = gst_rtp_buffer_compare_seqnum (priv->last_popped_seqnum, seqnum);
2408
2409     /* priv->last_popped_seqnum >= seqnum, we're too late. */
2410     if (G_UNLIKELY (gap <= 0))
2411       goto too_late;
2412   }
2413
2414   /* let's drop oldest packet if the queue is already full and drop-on-latency
2415    * is set. We can only do this when there actually is a latency. When no
2416    * latency is set, we just pump it in the queue and let the other end push it
2417    * out as fast as possible. */
2418   if (priv->latency_ms && priv->drop_on_latency) {
2419     latency_ts =
2420         gst_util_uint64_scale_int (priv->latency_ms, priv->clock_rate, 1000);
2421
2422     if (G_UNLIKELY (rtp_jitter_buffer_get_ts_diff (priv->jbuf) >= latency_ts)) {
2423       RTPJitterBufferItem *old_item;
2424
2425       old_item = rtp_jitter_buffer_peek (priv->jbuf);
2426
2427       if (IS_DROPABLE (old_item)) {
2428         old_item = rtp_jitter_buffer_pop (priv->jbuf, &percent);
2429         GST_DEBUG_OBJECT (jitterbuffer, "Queue full, dropping old packet %p",
2430             old_item);
2431         priv->next_seqnum = (old_item->seqnum + 1) & 0xffff;
2432         free_item (old_item);
2433       }
2434       /* we might have removed some head buffers, signal the pushing thread to
2435        * see if it can push now */
2436       JBUF_SIGNAL_EVENT (priv);
2437     }
2438   }
2439
2440   item = alloc_item (buffer, ITEM_TYPE_BUFFER, dts, pts, seqnum, 1, rtptime);
2441
2442   /* now insert the packet into the queue in sorted order. This function returns
2443    * FALSE if a packet with the same seqnum was already in the queue, meaning we
2444    * have a duplicate. */
2445   if (G_UNLIKELY (!rtp_jitter_buffer_insert (priv->jbuf, item,
2446               &head, &percent)))
2447     goto duplicate;
2448
2449   /* update timers */
2450   update_timers (jitterbuffer, seqnum, dts, do_next_seqnum);
2451
2452   /* we had an unhandled SR, handle it now */
2453   if (priv->last_sr)
2454     do_handle_sync (jitterbuffer);
2455
2456   if (G_UNLIKELY (head)) {
2457     /* signal addition of new buffer when the _loop is waiting. */
2458     if (G_LIKELY (priv->active))
2459       JBUF_SIGNAL_EVENT (priv);
2460
2461     /* let's unschedule and unblock any waiting buffers. We only want to do this
2462      * when the head buffer changed */
2463     if (G_UNLIKELY (priv->clock_id)) {
2464       GST_DEBUG_OBJECT (jitterbuffer, "Unscheduling waiting new buffer");
2465       unschedule_current_timer (jitterbuffer);
2466     }
2467   }
2468
2469   GST_DEBUG_OBJECT (jitterbuffer,
2470       "Pushed packet #%d, now %d packets, head: %d, " "percent %d", seqnum,
2471       rtp_jitter_buffer_num_packets (priv->jbuf), head, percent);
2472
2473   msg = check_buffering_percent (jitterbuffer, percent);
2474
2475 finished:
2476   JBUF_UNLOCK (priv);
2477
2478   if (msg)
2479     gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer), msg);
2480
2481   return ret;
2482
2483   /* ERRORS */
2484 invalid_buffer:
2485   {
2486     /* this is not fatal but should be filtered earlier */
2487     GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
2488         ("Received invalid RTP payload, dropping"));
2489     gst_buffer_unref (buffer);
2490     return GST_FLOW_OK;
2491   }
2492 no_clock_rate:
2493   {
2494     GST_WARNING_OBJECT (jitterbuffer,
2495         "No clock-rate in caps!, dropping buffer");
2496     gst_buffer_unref (buffer);
2497     goto finished;
2498   }
2499 out_flushing:
2500   {
2501     ret = priv->srcresult;
2502     GST_DEBUG_OBJECT (jitterbuffer, "flushing %s", gst_flow_get_name (ret));
2503     gst_buffer_unref (buffer);
2504     goto finished;
2505   }
2506 have_eos:
2507   {
2508     ret = GST_FLOW_EOS;
2509     GST_WARNING_OBJECT (jitterbuffer, "we are EOS, refusing buffer");
2510     gst_buffer_unref (buffer);
2511     goto finished;
2512   }
2513 too_late:
2514   {
2515     GST_WARNING_OBJECT (jitterbuffer, "Packet #%d too late as #%d was already"
2516         " popped, dropping", seqnum, priv->last_popped_seqnum);
2517     priv->num_late++;
2518     gst_buffer_unref (buffer);
2519     goto finished;
2520   }
2521 duplicate:
2522   {
2523     GST_WARNING_OBJECT (jitterbuffer, "Duplicate packet #%d detected, dropping",
2524         seqnum);
2525     priv->num_duplicates++;
2526     free_item (item);
2527     goto finished;
2528   }
2529 gap_but_no_dts:
2530   {
2531     /* this is fatal as we can't compensate for gaps without DTS */
2532     GST_ELEMENT_ERROR (jitterbuffer, STREAM, DECODE, (NULL),
2533         ("Received packet without DTS after a gap"));
2534     gst_buffer_unref (buffer);
2535     ret = GST_FLOW_ERROR;
2536     goto finished;
2537   }
2538 }
2539
2540 static GstClockTime
2541 compute_elapsed (GstRtpJitterBuffer * jitterbuffer, RTPJitterBufferItem * item)
2542 {
2543   guint64 ext_time, elapsed;
2544   guint32 rtp_time;
2545   GstRtpJitterBufferPrivate *priv;
2546
2547   priv = jitterbuffer->priv;
2548   rtp_time = item->rtptime;
2549
2550   GST_LOG_OBJECT (jitterbuffer, "rtp %" G_GUINT32_FORMAT ", ext %"
2551       G_GUINT64_FORMAT, rtp_time, priv->ext_timestamp);
2552
2553   if (rtp_time < priv->ext_timestamp) {
2554     ext_time = priv->ext_timestamp;
2555   } else {
2556     ext_time = gst_rtp_buffer_ext_timestamp (&priv->ext_timestamp, rtp_time);
2557   }
2558
2559   if (ext_time > priv->clock_base)
2560     elapsed = ext_time - priv->clock_base;
2561   else
2562     elapsed = 0;
2563
2564   elapsed = gst_util_uint64_scale_int (elapsed, GST_SECOND, priv->clock_rate);
2565   return elapsed;
2566 }
2567
2568 static void
2569 update_estimated_eos (GstRtpJitterBuffer * jitterbuffer,
2570     RTPJitterBufferItem * item)
2571 {
2572   guint64 total, elapsed, left, estimated;
2573   GstClockTime out_time;
2574   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2575
2576   if (priv->npt_stop == -1 || priv->ext_timestamp == -1
2577       || priv->clock_base == -1 || priv->clock_rate <= 0)
2578     return;
2579
2580   /* compute the elapsed time */
2581   elapsed = compute_elapsed (jitterbuffer, item);
2582
2583   /* do nothing if elapsed time doesn't increment */
2584   if (priv->last_elapsed && elapsed <= priv->last_elapsed)
2585     return;
2586
2587   priv->last_elapsed = elapsed;
2588
2589   /* this is the total time we need to play */
2590   total = priv->npt_stop - priv->npt_start;
2591   GST_LOG_OBJECT (jitterbuffer, "total %" GST_TIME_FORMAT,
2592       GST_TIME_ARGS (total));
2593
2594   /* this is how much time there is left */
2595   if (total > elapsed)
2596     left = total - elapsed;
2597   else
2598     left = 0;
2599
2600   /* if we have less time left that the size of the buffer, we will not
2601    * be able to keep it filled, disabled buffering then */
2602   if (left < rtp_jitter_buffer_get_delay (priv->jbuf)) {
2603     GST_DEBUG_OBJECT (jitterbuffer, "left %" GST_TIME_FORMAT
2604         ", disable buffering close to EOS", GST_TIME_ARGS (left));
2605     rtp_jitter_buffer_disable_buffering (priv->jbuf, TRUE);
2606   }
2607
2608   /* this is the current time as running-time */
2609   out_time = item->dts;
2610
2611   if (elapsed > 0)
2612     estimated = gst_util_uint64_scale (out_time, total, elapsed);
2613   else {
2614     /* if there is almost nothing left,
2615      * we may never advance enough to end up in the above case */
2616     if (total < GST_SECOND)
2617       estimated = GST_SECOND;
2618     else
2619       estimated = -1;
2620   }
2621   GST_LOG_OBJECT (jitterbuffer, "elapsed %" GST_TIME_FORMAT ", estimated %"
2622       GST_TIME_FORMAT, GST_TIME_ARGS (elapsed), GST_TIME_ARGS (estimated));
2623
2624   if (estimated != -1 && priv->estimated_eos != estimated) {
2625     set_timer (jitterbuffer, TIMER_TYPE_EOS, -1, estimated);
2626     priv->estimated_eos = estimated;
2627   }
2628 }
2629
2630 /* take a buffer from the queue and push it */
2631 static GstFlowReturn
2632 pop_and_push_next (GstRtpJitterBuffer * jitterbuffer, guint seqnum)
2633 {
2634   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2635   GstFlowReturn result = GST_FLOW_OK;
2636   RTPJitterBufferItem *item;
2637   GstBuffer *outbuf = NULL;
2638   GstEvent *outevent = NULL;
2639   GstQuery *outquery = NULL;
2640   GstClockTime dts, pts;
2641   gint percent = -1;
2642   gboolean do_push = TRUE;
2643   guint type;
2644   GstMessage *msg;
2645
2646   /* when we get here we are ready to pop and push the buffer */
2647   item = rtp_jitter_buffer_pop (priv->jbuf, &percent);
2648   type = item->type;
2649
2650   switch (type) {
2651     case ITEM_TYPE_BUFFER:
2652
2653       /* we need to make writable to change the flags and timestamps */
2654       outbuf = gst_buffer_make_writable (item->data);
2655
2656       if (G_UNLIKELY (priv->discont)) {
2657         /* set DISCONT flag when we missed a packet. We pushed the buffer writable
2658          * into the jitterbuffer so we can modify now. */
2659         GST_DEBUG_OBJECT (jitterbuffer, "mark output buffer discont");
2660         GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_DISCONT);
2661         priv->discont = FALSE;
2662       }
2663       if (G_UNLIKELY (priv->ts_discont)) {
2664         GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_RESYNC);
2665         priv->ts_discont = FALSE;
2666       }
2667
2668       dts =
2669           gst_segment_to_position (&priv->segment, GST_FORMAT_TIME, item->dts);
2670       pts =
2671           gst_segment_to_position (&priv->segment, GST_FORMAT_TIME, item->pts);
2672
2673       /* apply timestamp with offset to buffer now */
2674       GST_BUFFER_DTS (outbuf) = apply_offset (jitterbuffer, dts);
2675       GST_BUFFER_PTS (outbuf) = apply_offset (jitterbuffer, pts);
2676
2677       /* update the elapsed time when we need to check against the npt stop time. */
2678       update_estimated_eos (jitterbuffer, item);
2679
2680       priv->last_out_time = GST_BUFFER_PTS (outbuf);
2681       break;
2682     case ITEM_TYPE_LOST:
2683       priv->discont = TRUE;
2684       if (!priv->do_lost)
2685         do_push = FALSE;
2686       /* FALLTHROUGH */
2687     case ITEM_TYPE_EVENT:
2688       outevent = item->data;
2689       break;
2690     case ITEM_TYPE_QUERY:
2691       outquery = item->data;
2692       break;
2693   }
2694
2695   /* now we are ready to push the buffer. Save the seqnum and release the lock
2696    * so the other end can push stuff in the queue again. */
2697   if (seqnum != -1) {
2698     priv->last_popped_seqnum = seqnum;
2699     priv->next_seqnum = (seqnum + item->count) & 0xffff;
2700   }
2701   msg = check_buffering_percent (jitterbuffer, percent);
2702   JBUF_UNLOCK (priv);
2703
2704   item->data = NULL;
2705   free_item (item);
2706
2707   if (msg)
2708     gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer), msg);
2709
2710   switch (type) {
2711     case ITEM_TYPE_BUFFER:
2712       /* push buffer */
2713       GST_DEBUG_OBJECT (jitterbuffer,
2714           "Pushing buffer %d, dts %" GST_TIME_FORMAT ", pts %" GST_TIME_FORMAT,
2715           seqnum, GST_TIME_ARGS (GST_BUFFER_DTS (outbuf)),
2716           GST_TIME_ARGS (GST_BUFFER_PTS (outbuf)));
2717       result = gst_pad_push (priv->srcpad, outbuf);
2718
2719       JBUF_LOCK_CHECK (priv, out_flushing);
2720       break;
2721     case ITEM_TYPE_LOST:
2722     case ITEM_TYPE_EVENT:
2723       GST_DEBUG_OBJECT (jitterbuffer, "%sPushing event %" GST_PTR_FORMAT
2724           ", seqnum %d", do_push ? "" : "NOT ", outevent, seqnum);
2725
2726       if (do_push)
2727         gst_pad_push_event (priv->srcpad, outevent);
2728       else
2729         gst_event_unref (outevent);
2730
2731       result = GST_FLOW_OK;
2732
2733       JBUF_LOCK_CHECK (priv, out_flushing);
2734       break;
2735     case ITEM_TYPE_QUERY:
2736     {
2737       gboolean res;
2738
2739       res = gst_pad_peer_query (priv->srcpad, outquery);
2740
2741       JBUF_LOCK_CHECK (priv, out_flushing);
2742       result = GST_FLOW_OK;
2743       GST_LOG_OBJECT (jitterbuffer, "did query %p, return %d", outquery, res);
2744       JBUF_SIGNAL_QUERY (priv, res);
2745       break;
2746     }
2747   }
2748   return result;
2749
2750   /* ERRORS */
2751 out_flushing:
2752   {
2753     return priv->srcresult;
2754   }
2755 }
2756
2757 #define GST_FLOW_WAIT GST_FLOW_CUSTOM_SUCCESS
2758
2759 /* Peek a buffer and compare the seqnum to the expected seqnum.
2760  * If all is fine, the buffer is pushed.
2761  * If something is wrong, we wait for some event
2762  */
2763 static GstFlowReturn
2764 handle_next_buffer (GstRtpJitterBuffer * jitterbuffer)
2765 {
2766   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2767   GstFlowReturn result = GST_FLOW_OK;
2768   RTPJitterBufferItem *item;
2769   guint seqnum;
2770   guint32 next_seqnum;
2771   gint gap;
2772
2773   /* only push buffers when PLAYING and active and not buffering */
2774   if (priv->blocked || !priv->active ||
2775       rtp_jitter_buffer_is_buffering (priv->jbuf))
2776     return GST_FLOW_WAIT;
2777
2778 again:
2779   /* peek a buffer, we're just looking at the sequence number.
2780    * If all is fine, we'll pop and push it. If the sequence number is wrong we
2781    * wait for a timeout or something to change.
2782    * The peeked buffer is valid for as long as we hold the jitterbuffer lock. */
2783   item = rtp_jitter_buffer_peek (priv->jbuf);
2784   if (item == NULL)
2785     goto wait;
2786
2787   /* get the seqnum and the next expected seqnum */
2788   seqnum = item->seqnum;
2789   if (seqnum == -1)
2790     goto do_push;
2791
2792   next_seqnum = priv->next_seqnum;
2793
2794   /* get the gap between this and the previous packet. If we don't know the
2795    * previous packet seqnum assume no gap. */
2796   if (G_UNLIKELY (next_seqnum == -1)) {
2797     GST_DEBUG_OBJECT (jitterbuffer, "First buffer #%d", seqnum);
2798     /* we don't know what the next_seqnum should be, the chain function should
2799      * have scheduled a DEADLINE timer that will increment next_seqnum when it
2800      * fires, so wait for that */
2801     result = GST_FLOW_WAIT;
2802   } else {
2803     /* else calculate GAP */
2804     gap = gst_rtp_buffer_compare_seqnum (next_seqnum, seqnum);
2805
2806     if (G_LIKELY (gap == 0)) {
2807     do_push:
2808       /* no missing packet, pop and push */
2809       result = pop_and_push_next (jitterbuffer, seqnum);
2810     } else if (G_UNLIKELY (gap < 0)) {
2811       RTPJitterBufferItem *item;
2812       /* if we have a packet that we already pushed or considered dropped, pop it
2813        * off and get the next packet */
2814       GST_DEBUG_OBJECT (jitterbuffer, "Old packet #%d, next #%d dropping",
2815           seqnum, next_seqnum);
2816       item = rtp_jitter_buffer_pop (priv->jbuf, NULL);
2817       free_item (item);
2818       goto again;
2819     } else {
2820       /* the chain function has scheduled timers to request retransmission or
2821        * when to consider the packet lost, wait for that */
2822       GST_DEBUG_OBJECT (jitterbuffer,
2823           "Sequence number GAP detected: expected %d instead of %d (%d missing)",
2824           next_seqnum, seqnum, gap);
2825       result = GST_FLOW_WAIT;
2826     }
2827   }
2828   return result;
2829
2830 wait:
2831   {
2832     GST_DEBUG_OBJECT (jitterbuffer, "no buffer, going to wait");
2833     if (priv->eos)
2834       result = GST_FLOW_EOS;
2835     else
2836       result = GST_FLOW_WAIT;
2837     return result;
2838   }
2839 }
2840
2841 static GstClockTime
2842 get_rtx_retry_timeout (GstRtpJitterBufferPrivate * priv)
2843 {
2844   GstClockTime rtx_retry_timeout;
2845   GstClockTime rtx_min_retry_timeout;
2846
2847   if (priv->rtx_retry_timeout == -1) {
2848     if (priv->avg_rtx_rtt == 0)
2849       rtx_retry_timeout = DEFAULT_AUTO_RTX_TIMEOUT;
2850     else
2851       /* we want to ask for a retransmission after we waited for a
2852        * complete RTT and the additional jitter */
2853       rtx_retry_timeout = priv->avg_rtx_rtt + priv->avg_jitter * 2;
2854   } else {
2855     rtx_retry_timeout = priv->rtx_retry_timeout * GST_MSECOND;
2856   }
2857   /* make sure we don't retry too often. On very low latency networks,
2858    * the RTT and jitter can be very low. */
2859   if (priv->rtx_min_retry_timeout == -1) {
2860     rtx_min_retry_timeout = priv->packet_spacing;
2861   } else {
2862     rtx_min_retry_timeout = priv->rtx_min_retry_timeout * GST_MSECOND;
2863   }
2864   rtx_retry_timeout = MAX (rtx_retry_timeout, rtx_min_retry_timeout);
2865
2866   return rtx_retry_timeout;
2867 }
2868
2869 static GstClockTime
2870 get_rtx_retry_period (GstRtpJitterBufferPrivate * priv,
2871     GstClockTime rtx_retry_timeout)
2872 {
2873   GstClockTime rtx_retry_period;
2874
2875   if (priv->rtx_retry_period == -1) {
2876     /* we retry up to the configured jitterbuffer size but leaving some
2877      * room for the retransmission to arrive in time */
2878     if (rtx_retry_timeout > priv->latency_ns) {
2879       rtx_retry_period = 0;
2880     } else {
2881       rtx_retry_period = priv->latency_ns - rtx_retry_timeout;
2882     }
2883   } else {
2884     rtx_retry_period = priv->rtx_retry_period * GST_MSECOND;
2885   }
2886   return rtx_retry_period;
2887 }
2888
2889 /* the timeout for when we expected a packet expired */
2890 static gboolean
2891 do_expected_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
2892     GstClockTime now)
2893 {
2894   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2895   GstEvent *event;
2896   guint delay, delay_ms, avg_rtx_rtt_ms;
2897   guint rtx_retry_timeout_ms, rtx_retry_period_ms;
2898   GstClockTime rtx_retry_period;
2899   GstClockTime rtx_retry_timeout;
2900   GstClock *clock;
2901
2902   GST_DEBUG_OBJECT (jitterbuffer, "expected %d didn't arrive, now %"
2903       GST_TIME_FORMAT, timer->seqnum, GST_TIME_ARGS (now));
2904
2905   rtx_retry_timeout = get_rtx_retry_timeout (priv);
2906   rtx_retry_period = get_rtx_retry_period (priv, rtx_retry_timeout);
2907
2908   GST_DEBUG_OBJECT (jitterbuffer, "timeout %" GST_TIME_FORMAT ", period %"
2909       GST_TIME_FORMAT, GST_TIME_ARGS (rtx_retry_timeout),
2910       GST_TIME_ARGS (rtx_retry_period));
2911
2912   delay = timer->rtx_delay + timer->rtx_retry;
2913
2914   delay_ms = GST_TIME_AS_MSECONDS (delay);
2915   rtx_retry_timeout_ms = GST_TIME_AS_MSECONDS (rtx_retry_timeout);
2916   rtx_retry_period_ms = GST_TIME_AS_MSECONDS (rtx_retry_period);
2917   avg_rtx_rtt_ms = GST_TIME_AS_MSECONDS (priv->avg_rtx_rtt);
2918
2919   event = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM,
2920       gst_structure_new ("GstRTPRetransmissionRequest",
2921           "seqnum", G_TYPE_UINT, (guint) timer->seqnum,
2922           "running-time", G_TYPE_UINT64, timer->rtx_base,
2923           "delay", G_TYPE_UINT, delay_ms,
2924           "retry", G_TYPE_UINT, timer->num_rtx_retry,
2925           "frequency", G_TYPE_UINT, rtx_retry_timeout_ms,
2926           "period", G_TYPE_UINT, rtx_retry_period_ms,
2927           "deadline", G_TYPE_UINT, priv->latency_ms,
2928           "packet-spacing", G_TYPE_UINT64, priv->packet_spacing,
2929           "avg-rtt", G_TYPE_UINT, avg_rtx_rtt_ms, NULL));
2930
2931   priv->num_rtx_requests++;
2932   timer->num_rtx_retry++;
2933
2934   GST_OBJECT_LOCK (jitterbuffer);
2935   if ((clock = GST_ELEMENT_CLOCK (jitterbuffer))) {
2936     timer->rtx_last = gst_clock_get_time (clock);
2937     timer->rtx_last -= GST_ELEMENT_CAST (jitterbuffer)->base_time;
2938   } else {
2939     timer->rtx_last = now;
2940   }
2941   GST_OBJECT_UNLOCK (jitterbuffer);
2942
2943   /* calculate the timeout for the next retransmission attempt */
2944   timer->rtx_retry += rtx_retry_timeout;
2945   GST_DEBUG_OBJECT (jitterbuffer, "base %" GST_TIME_FORMAT ", delay %"
2946       GST_TIME_FORMAT ", retry %" GST_TIME_FORMAT ", num_retry %u",
2947       GST_TIME_ARGS (timer->rtx_base), GST_TIME_ARGS (timer->rtx_delay),
2948       GST_TIME_ARGS (timer->rtx_retry), timer->num_rtx_retry);
2949   if ((priv->rtx_max_retries != -1
2950           && timer->num_rtx_retry >= priv->rtx_max_retries)
2951       || (timer->rtx_retry + timer->rtx_delay > rtx_retry_period)) {
2952     GST_DEBUG_OBJECT (jitterbuffer, "reschedule as LOST timer");
2953     /* too many retransmission request, we now convert the timer
2954      * to a lost timer, leave the num_rtx_retry as it is for stats */
2955     timer->type = TIMER_TYPE_LOST;
2956     timer->rtx_delay = 0;
2957     timer->rtx_retry = 0;
2958   }
2959   reschedule_timer (jitterbuffer, timer, timer->seqnum,
2960       timer->rtx_base + timer->rtx_retry, timer->rtx_delay, FALSE);
2961
2962   JBUF_UNLOCK (priv);
2963   gst_pad_push_event (priv->sinkpad, event);
2964   JBUF_LOCK (priv);
2965
2966   return FALSE;
2967 }
2968
2969 /* a packet is lost */
2970 static gboolean
2971 do_lost_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
2972     GstClockTime now)
2973 {
2974   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2975   GstClockTime duration, timestamp;
2976   guint seqnum, lost_packets, num_rtx_retry, next_in_seqnum;
2977   gboolean late, head;
2978   GstEvent *event;
2979   RTPJitterBufferItem *item;
2980
2981   seqnum = timer->seqnum;
2982   timestamp = apply_offset (jitterbuffer, timer->timeout);
2983   duration = timer->duration;
2984   if (duration == GST_CLOCK_TIME_NONE && priv->packet_spacing > 0)
2985     duration = priv->packet_spacing;
2986   lost_packets = MAX (timer->num, 1);
2987   late = timer->num > 0;
2988   num_rtx_retry = timer->num_rtx_retry;
2989
2990   /* we had a gap and thus we lost some packets. Create an event for this.  */
2991   if (lost_packets > 1)
2992     GST_DEBUG_OBJECT (jitterbuffer, "Packets #%d -> #%d lost", seqnum,
2993         seqnum + lost_packets - 1);
2994   else
2995     GST_DEBUG_OBJECT (jitterbuffer, "Packet #%d lost", seqnum);
2996
2997   priv->num_late += lost_packets;
2998   priv->num_rtx_failed += num_rtx_retry;
2999
3000   next_in_seqnum = (seqnum + lost_packets) & 0xffff;
3001
3002   /* we now only accept seqnum bigger than this */
3003   if (gst_rtp_buffer_compare_seqnum (priv->next_in_seqnum, next_in_seqnum) > 0)
3004     priv->next_in_seqnum = next_in_seqnum;
3005
3006   /* create paket lost event */
3007   event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM,
3008       gst_structure_new ("GstRTPPacketLost",
3009           "seqnum", G_TYPE_UINT, (guint) seqnum,
3010           "timestamp", G_TYPE_UINT64, timestamp,
3011           "duration", G_TYPE_UINT64, duration,
3012           "late", G_TYPE_BOOLEAN, late,
3013           "retry", G_TYPE_UINT, num_rtx_retry, NULL));
3014
3015   item = alloc_item (event, ITEM_TYPE_LOST, -1, -1, seqnum, lost_packets, -1);
3016   rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL);
3017
3018   /* remove timer now */
3019   remove_timer (jitterbuffer, timer);
3020   if (head)
3021     JBUF_SIGNAL_EVENT (priv);
3022
3023   return TRUE;
3024 }
3025
3026 static gboolean
3027 do_eos_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
3028     GstClockTime now)
3029 {
3030   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
3031
3032   GST_INFO_OBJECT (jitterbuffer, "got the NPT timeout");
3033   remove_timer (jitterbuffer, timer);
3034   if (!priv->eos) {
3035     /* there was no EOS in the buffer, put one in there now */
3036     queue_event (jitterbuffer, gst_event_new_eos ());
3037   }
3038   JBUF_SIGNAL_EVENT (priv);
3039
3040   return TRUE;
3041 }
3042
3043 static gboolean
3044 do_deadline_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
3045     GstClockTime now)
3046 {
3047   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
3048
3049   GST_INFO_OBJECT (jitterbuffer, "got deadline timeout");
3050
3051   /* timer seqnum might have been obsoleted by caps seqnum-base,
3052    * only mess with current ongoing seqnum if still unknown */
3053   if (priv->next_seqnum == -1)
3054     priv->next_seqnum = timer->seqnum;
3055   remove_timer (jitterbuffer, timer);
3056   JBUF_SIGNAL_EVENT (priv);
3057
3058   return TRUE;
3059 }
3060
3061 static gboolean
3062 do_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
3063     GstClockTime now)
3064 {
3065   gboolean removed = FALSE;
3066
3067   switch (timer->type) {
3068     case TIMER_TYPE_EXPECTED:
3069       removed = do_expected_timeout (jitterbuffer, timer, now);
3070       break;
3071     case TIMER_TYPE_LOST:
3072       removed = do_lost_timeout (jitterbuffer, timer, now);
3073       break;
3074     case TIMER_TYPE_DEADLINE:
3075       removed = do_deadline_timeout (jitterbuffer, timer, now);
3076       break;
3077     case TIMER_TYPE_EOS:
3078       removed = do_eos_timeout (jitterbuffer, timer, now);
3079       break;
3080   }
3081   return removed;
3082 }
3083
3084 /* called when we need to wait for the next timeout.
3085  *
3086  * We loop over the array of recorded timeouts and wait for the earliest one.
3087  * When it timed out, do the logic associated with the timer.
3088  *
3089  * If there are no timers, we wait on a gcond until something new happens.
3090  */
3091 static void
3092 wait_next_timeout (GstRtpJitterBuffer * jitterbuffer)
3093 {
3094   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
3095   GstClockTime now = 0;
3096
3097   JBUF_LOCK (priv);
3098   while (priv->timer_running) {
3099     TimerData *timer = NULL;
3100     GstClockTime timer_timeout = -1;
3101     gint i, len;
3102
3103     GST_DEBUG_OBJECT (jitterbuffer, "now %" GST_TIME_FORMAT,
3104         GST_TIME_ARGS (now));
3105
3106     len = priv->timers->len;
3107     for (i = 0; i < len; i++) {
3108       TimerData *test = &g_array_index (priv->timers, TimerData, i);
3109       GstClockTime test_timeout = get_timeout (jitterbuffer, test);
3110       gboolean save_best = FALSE;
3111
3112       GST_DEBUG_OBJECT (jitterbuffer, "%d, %d, %d, %" GST_TIME_FORMAT,
3113           i, test->type, test->seqnum, GST_TIME_ARGS (test_timeout));
3114
3115       /* find the smallest timeout */
3116       if (timer == NULL) {
3117         save_best = TRUE;
3118       } else if (timer_timeout == -1) {
3119         /* we already have an immediate timeout, the new timer must be an
3120          * immediate timer with smaller seqnum to become the best */
3121         if (test_timeout == -1
3122             && (gst_rtp_buffer_compare_seqnum (test->seqnum,
3123                     timer->seqnum) > 0))
3124           save_best = TRUE;
3125       } else if (test_timeout == -1) {
3126         /* first immediate timer */
3127         save_best = TRUE;
3128       } else if (test_timeout < timer_timeout) {
3129         /* earlier timer */
3130         save_best = TRUE;
3131       } else if (test_timeout == timer_timeout
3132           && (gst_rtp_buffer_compare_seqnum (test->seqnum,
3133                   timer->seqnum) > 0)) {
3134         /* same timer, smaller seqnum */
3135         save_best = TRUE;
3136       }
3137       if (save_best) {
3138         GST_DEBUG_OBJECT (jitterbuffer, "new best %d", i);
3139         timer = test;
3140         timer_timeout = test_timeout;
3141       }
3142     }
3143     if (timer && !priv->blocked) {
3144       GstClock *clock;
3145       GstClockTime sync_time;
3146       GstClockID id;
3147       GstClockReturn ret;
3148       GstClockTimeDiff clock_jitter;
3149
3150       if (timer_timeout == -1 || timer_timeout <= now) {
3151         do_timeout (jitterbuffer, timer, now);
3152         /* check here, do_timeout could have released the lock */
3153         if (!priv->timer_running)
3154           break;
3155         continue;
3156       }
3157
3158       GST_OBJECT_LOCK (jitterbuffer);
3159       clock = GST_ELEMENT_CLOCK (jitterbuffer);
3160       if (!clock) {
3161         GST_OBJECT_UNLOCK (jitterbuffer);
3162         /* let's just push if there is no clock */
3163         GST_DEBUG_OBJECT (jitterbuffer, "No clock, timeout right away");
3164         now = timer_timeout;
3165         continue;
3166       }
3167
3168       /* prepare for sync against clock */
3169       sync_time = timer_timeout + GST_ELEMENT_CAST (jitterbuffer)->base_time;
3170       /* add latency of peer to get input time */
3171       sync_time += priv->peer_latency;
3172
3173       GST_DEBUG_OBJECT (jitterbuffer, "sync to timestamp %" GST_TIME_FORMAT
3174           " with sync time %" GST_TIME_FORMAT,
3175           GST_TIME_ARGS (timer_timeout), GST_TIME_ARGS (sync_time));
3176
3177       /* create an entry for the clock */
3178       id = priv->clock_id = gst_clock_new_single_shot_id (clock, sync_time);
3179       priv->timer_timeout = timer_timeout;
3180       priv->timer_seqnum = timer->seqnum;
3181       GST_OBJECT_UNLOCK (jitterbuffer);
3182
3183       /* release the lock so that the other end can push stuff or unlock */
3184       JBUF_UNLOCK (priv);
3185
3186       ret = gst_clock_id_wait (id, &clock_jitter);
3187
3188       JBUF_LOCK (priv);
3189       if (!priv->timer_running) {
3190         gst_clock_id_unref (id);
3191         priv->clock_id = NULL;
3192         break;
3193       }
3194
3195       if (ret != GST_CLOCK_UNSCHEDULED) {
3196         now = timer_timeout + MAX (clock_jitter, 0);
3197         GST_DEBUG_OBJECT (jitterbuffer, "sync done, %d, #%d, %" G_GINT64_FORMAT,
3198             ret, priv->timer_seqnum, clock_jitter);
3199       } else {
3200         GST_DEBUG_OBJECT (jitterbuffer, "sync unscheduled");
3201       }
3202       /* and free the entry */
3203       gst_clock_id_unref (id);
3204       priv->clock_id = NULL;
3205     } else {
3206       /* no timers, wait for activity */
3207       JBUF_WAIT_TIMER (priv);
3208     }
3209   }
3210   JBUF_UNLOCK (priv);
3211
3212   GST_DEBUG_OBJECT (jitterbuffer, "we are stopping");
3213   return;
3214 }
3215
3216 /*
3217  * This funcion implements the main pushing loop on the source pad.
3218  *
3219  * It first tries to push as many buffers as possible. If there is a seqnum
3220  * mismatch, we wait for the next timeouts.
3221  */
3222 static void
3223 gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer)
3224 {
3225   GstRtpJitterBufferPrivate *priv;
3226   GstFlowReturn result = GST_FLOW_OK;
3227
3228   priv = jitterbuffer->priv;
3229
3230   JBUF_LOCK_CHECK (priv, flushing);
3231   do {
3232     result = handle_next_buffer (jitterbuffer);
3233     if (G_LIKELY (result == GST_FLOW_WAIT)) {
3234       /* now wait for the next event */
3235       JBUF_WAIT_EVENT (priv, flushing);
3236       result = GST_FLOW_OK;
3237     }
3238   }
3239   while (result == GST_FLOW_OK);
3240   /* store result for upstream */
3241   priv->srcresult = result;
3242   /* if we get here we need to pause */
3243   goto pause;
3244
3245   /* ERRORS */
3246 flushing:
3247   {
3248     result = priv->srcresult;
3249     goto pause;
3250   }
3251 pause:
3252   {
3253     GstEvent *event;
3254
3255     JBUF_SIGNAL_QUERY (priv, FALSE);
3256     JBUF_UNLOCK (priv);
3257
3258     GST_DEBUG_OBJECT (jitterbuffer, "pausing task, reason %s",
3259         gst_flow_get_name (result));
3260     gst_pad_pause_task (priv->srcpad);
3261     if (result == GST_FLOW_EOS) {
3262       event = gst_event_new_eos ();
3263       gst_pad_push_event (priv->srcpad, event);
3264     }
3265     return;
3266   }
3267 }
3268
3269 /* collect the info from the lastest RTCP packet and the jitterbuffer sync, do
3270  * some sanity checks and then emit the handle-sync signal with the parameters.
3271  * This function must be called with the LOCK */
3272 static void
3273 do_handle_sync (GstRtpJitterBuffer * jitterbuffer)
3274 {
3275   GstRtpJitterBufferPrivate *priv;
3276   guint64 base_rtptime, base_time;
3277   guint32 clock_rate;
3278   guint64 last_rtptime;
3279   guint64 clock_base;
3280   guint64 ext_rtptime, diff;
3281   gboolean valid = TRUE, keep = FALSE;
3282
3283   priv = jitterbuffer->priv;
3284
3285   /* get the last values from the jitterbuffer */
3286   rtp_jitter_buffer_get_sync (priv->jbuf, &base_rtptime, &base_time,
3287       &clock_rate, &last_rtptime);
3288
3289   clock_base = priv->clock_base;
3290   ext_rtptime = priv->ext_rtptime;
3291
3292   GST_DEBUG_OBJECT (jitterbuffer, "ext SR %" G_GUINT64_FORMAT ", base %"
3293       G_GUINT64_FORMAT ", clock-rate %" G_GUINT32_FORMAT
3294       ", clock-base %" G_GUINT64_FORMAT ", last-rtptime %" G_GUINT64_FORMAT,
3295       ext_rtptime, base_rtptime, clock_rate, clock_base, last_rtptime);
3296
3297   if (base_rtptime == -1 || clock_rate == -1 || base_time == -1) {
3298     /* we keep this SR packet for later. When we get a valid RTP packet the
3299      * above values will be set and we can try to use the SR packet */
3300     GST_DEBUG_OBJECT (jitterbuffer, "keeping for later, no RTP values");
3301     keep = TRUE;
3302   } else {
3303     /* we can't accept anything that happened before we did the last resync */
3304     if (base_rtptime > ext_rtptime) {
3305       GST_DEBUG_OBJECT (jitterbuffer, "dropping, older than base time");
3306       valid = FALSE;
3307     } else {
3308       /* the SR RTP timestamp must be something close to what we last observed
3309        * in the jitterbuffer */
3310       if (ext_rtptime > last_rtptime) {
3311         /* check how far ahead it is to our RTP timestamps */
3312         diff = ext_rtptime - last_rtptime;
3313         /* if bigger than 1 second, we drop it */
3314         if (diff > clock_rate) {
3315           GST_DEBUG_OBJECT (jitterbuffer, "too far ahead");
3316           /* should drop this, but some RTSP servers end up with bogus
3317            * way too ahead RTCP packet when repeated PAUSE/PLAY,
3318            * so still trigger rptbin sync but invalidate RTCP data
3319            * (sync might use other methods) */
3320           ext_rtptime = -1;
3321         }
3322         GST_DEBUG_OBJECT (jitterbuffer, "ext last %" G_GUINT64_FORMAT ", diff %"
3323             G_GUINT64_FORMAT, last_rtptime, diff);
3324       }
3325     }
3326   }
3327
3328   if (keep) {
3329     GST_DEBUG_OBJECT (jitterbuffer, "keeping RTCP packet for later");
3330   } else if (valid) {
3331     GstStructure *s;
3332
3333     s = gst_structure_new ("application/x-rtp-sync",
3334         "base-rtptime", G_TYPE_UINT64, base_rtptime,
3335         "base-time", G_TYPE_UINT64, base_time,
3336         "clock-rate", G_TYPE_UINT, clock_rate,
3337         "clock-base", G_TYPE_UINT64, clock_base,
3338         "sr-ext-rtptime", G_TYPE_UINT64, ext_rtptime,
3339         "sr-buffer", GST_TYPE_BUFFER, priv->last_sr, NULL);
3340
3341     GST_DEBUG_OBJECT (jitterbuffer, "signaling sync");
3342     gst_buffer_replace (&priv->last_sr, NULL);
3343     JBUF_UNLOCK (priv);
3344     g_signal_emit (jitterbuffer,
3345         gst_rtp_jitter_buffer_signals[SIGNAL_HANDLE_SYNC], 0, s);
3346     JBUF_LOCK (priv);
3347     gst_structure_free (s);
3348   } else {
3349     GST_DEBUG_OBJECT (jitterbuffer, "dropping RTCP packet");
3350     gst_buffer_replace (&priv->last_sr, NULL);
3351   }
3352 }
3353
3354 static GstFlowReturn
3355 gst_rtp_jitter_buffer_chain_rtcp (GstPad * pad, GstObject * parent,
3356     GstBuffer * buffer)
3357 {
3358   GstRtpJitterBuffer *jitterbuffer;
3359   GstRtpJitterBufferPrivate *priv;
3360   GstFlowReturn ret = GST_FLOW_OK;
3361   guint32 ssrc;
3362   GstRTCPPacket packet;
3363   guint64 ext_rtptime;
3364   guint32 rtptime;
3365   GstRTCPBuffer rtcp = { NULL, };
3366
3367   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
3368
3369   if (G_UNLIKELY (!gst_rtcp_buffer_validate (buffer)))
3370     goto invalid_buffer;
3371
3372   priv = jitterbuffer->priv;
3373
3374   gst_rtcp_buffer_map (buffer, GST_MAP_READ, &rtcp);
3375
3376   if (!gst_rtcp_buffer_get_first_packet (&rtcp, &packet))
3377     goto empty_buffer;
3378
3379   /* first packet must be SR or RR or else the validate would have failed */
3380   switch (gst_rtcp_packet_get_type (&packet)) {
3381     case GST_RTCP_TYPE_SR:
3382       gst_rtcp_packet_sr_get_sender_info (&packet, &ssrc, NULL, &rtptime,
3383           NULL, NULL);
3384       break;
3385     default:
3386       goto ignore_buffer;
3387   }
3388   gst_rtcp_buffer_unmap (&rtcp);
3389
3390   GST_DEBUG_OBJECT (jitterbuffer, "received RTCP of SSRC %08x", ssrc);
3391
3392   JBUF_LOCK (priv);
3393   /* convert the RTP timestamp to our extended timestamp, using the same offset
3394    * we used in the jitterbuffer */
3395   ext_rtptime = priv->jbuf->ext_rtptime;
3396   ext_rtptime = gst_rtp_buffer_ext_timestamp (&ext_rtptime, rtptime);
3397
3398   priv->ext_rtptime = ext_rtptime;
3399   gst_buffer_replace (&priv->last_sr, buffer);
3400
3401   do_handle_sync (jitterbuffer);
3402   JBUF_UNLOCK (priv);
3403
3404 done:
3405   gst_buffer_unref (buffer);
3406
3407   return ret;
3408
3409 invalid_buffer:
3410   {
3411     /* this is not fatal but should be filtered earlier */
3412     GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
3413         ("Received invalid RTCP payload, dropping"));
3414     ret = GST_FLOW_OK;
3415     goto done;
3416   }
3417 empty_buffer:
3418   {
3419     /* this is not fatal but should be filtered earlier */
3420     GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
3421         ("Received empty RTCP payload, dropping"));
3422     gst_rtcp_buffer_unmap (&rtcp);
3423     ret = GST_FLOW_OK;
3424     goto done;
3425   }
3426 ignore_buffer:
3427   {
3428     GST_DEBUG_OBJECT (jitterbuffer, "ignoring RTCP packet");
3429     gst_rtcp_buffer_unmap (&rtcp);
3430     ret = GST_FLOW_OK;
3431     goto done;
3432   }
3433 }
3434
3435 static gboolean
3436 gst_rtp_jitter_buffer_sink_query (GstPad * pad, GstObject * parent,
3437     GstQuery * query)
3438 {
3439   gboolean res = FALSE;
3440   GstRtpJitterBuffer *jitterbuffer;
3441   GstRtpJitterBufferPrivate *priv;
3442
3443   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
3444   priv = jitterbuffer->priv;
3445
3446   switch (GST_QUERY_TYPE (query)) {
3447     case GST_QUERY_CAPS:
3448     {
3449       GstCaps *filter, *caps;
3450
3451       gst_query_parse_caps (query, &filter);
3452       caps = gst_rtp_jitter_buffer_getcaps (pad, filter);
3453       gst_query_set_caps_result (query, caps);
3454       gst_caps_unref (caps);
3455       res = TRUE;
3456       break;
3457     }
3458     default:
3459       if (GST_QUERY_IS_SERIALIZED (query)) {
3460         RTPJitterBufferItem *item;
3461         gboolean head;
3462
3463         JBUF_LOCK_CHECK (priv, out_flushing);
3464         if (rtp_jitter_buffer_get_mode (priv->jbuf) !=
3465             RTP_JITTER_BUFFER_MODE_BUFFER) {
3466           GST_DEBUG_OBJECT (jitterbuffer, "adding serialized query");
3467           item = alloc_item (query, ITEM_TYPE_QUERY, -1, -1, -1, 0, -1);
3468           rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL);
3469           if (head)
3470             JBUF_SIGNAL_EVENT (priv);
3471           JBUF_WAIT_QUERY (priv, out_flushing);
3472           res = priv->last_query;
3473         } else {
3474           GST_DEBUG_OBJECT (jitterbuffer, "refusing query, we are buffering");
3475           res = FALSE;
3476         }
3477         JBUF_UNLOCK (priv);
3478       } else {
3479         res = gst_pad_query_default (pad, parent, query);
3480       }
3481       break;
3482   }
3483   return res;
3484   /* ERRORS */
3485 out_flushing:
3486   {
3487     GST_DEBUG_OBJECT (jitterbuffer, "we are flushing");
3488     JBUF_UNLOCK (priv);
3489     return FALSE;
3490   }
3491
3492 }
3493
3494 static gboolean
3495 gst_rtp_jitter_buffer_src_query (GstPad * pad, GstObject * parent,
3496     GstQuery * query)
3497 {
3498   GstRtpJitterBuffer *jitterbuffer;
3499   GstRtpJitterBufferPrivate *priv;
3500   gboolean res = FALSE;
3501
3502   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
3503   priv = jitterbuffer->priv;
3504
3505   switch (GST_QUERY_TYPE (query)) {
3506     case GST_QUERY_LATENCY:
3507     {
3508       /* We need to send the query upstream and add the returned latency to our
3509        * own */
3510       GstClockTime min_latency, max_latency;
3511       gboolean us_live;
3512       GstClockTime our_latency;
3513
3514       if ((res = gst_pad_peer_query (priv->sinkpad, query))) {
3515         gst_query_parse_latency (query, &us_live, &min_latency, &max_latency);
3516
3517         GST_DEBUG_OBJECT (jitterbuffer, "Peer latency: min %"
3518             GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
3519             GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
3520
3521         /* store this so that we can safely sync on the peer buffers. */
3522         JBUF_LOCK (priv);
3523         priv->peer_latency = min_latency;
3524         our_latency = priv->latency_ns;
3525         JBUF_UNLOCK (priv);
3526
3527         GST_DEBUG_OBJECT (jitterbuffer, "Our latency: %" GST_TIME_FORMAT,
3528             GST_TIME_ARGS (our_latency));
3529
3530         /* we add some latency but can buffer an infinite amount of time */
3531         min_latency += our_latency;
3532         max_latency = -1;
3533
3534         GST_DEBUG_OBJECT (jitterbuffer, "Calculated total latency : min %"
3535             GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
3536             GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
3537
3538         gst_query_set_latency (query, TRUE, min_latency, max_latency);
3539       }
3540       break;
3541     }
3542     case GST_QUERY_POSITION:
3543     {
3544       GstClockTime start, last_out;
3545       GstFormat fmt;
3546
3547       gst_query_parse_position (query, &fmt, NULL);
3548       if (fmt != GST_FORMAT_TIME) {
3549         res = gst_pad_query_default (pad, parent, query);
3550         break;
3551       }
3552
3553       JBUF_LOCK (priv);
3554       start = priv->npt_start;
3555       last_out = priv->last_out_time;
3556       JBUF_UNLOCK (priv);
3557
3558       GST_DEBUG_OBJECT (jitterbuffer, "npt start %" GST_TIME_FORMAT
3559           ", last out %" GST_TIME_FORMAT, GST_TIME_ARGS (start),
3560           GST_TIME_ARGS (last_out));
3561
3562       if (GST_CLOCK_TIME_IS_VALID (start) && GST_CLOCK_TIME_IS_VALID (last_out)) {
3563         /* bring 0-based outgoing time to stream time */
3564         gst_query_set_position (query, GST_FORMAT_TIME, start + last_out);
3565         res = TRUE;
3566       } else {
3567         res = gst_pad_query_default (pad, parent, query);
3568       }
3569       break;
3570     }
3571     case GST_QUERY_CAPS:
3572     {
3573       GstCaps *filter, *caps;
3574
3575       gst_query_parse_caps (query, &filter);
3576       caps = gst_rtp_jitter_buffer_getcaps (pad, filter);
3577       gst_query_set_caps_result (query, caps);
3578       gst_caps_unref (caps);
3579       res = TRUE;
3580       break;
3581     }
3582     default:
3583       res = gst_pad_query_default (pad, parent, query);
3584       break;
3585   }
3586
3587   return res;
3588 }
3589
3590 static void
3591 gst_rtp_jitter_buffer_set_property (GObject * object,
3592     guint prop_id, const GValue * value, GParamSpec * pspec)
3593 {
3594   GstRtpJitterBuffer *jitterbuffer;
3595   GstRtpJitterBufferPrivate *priv;
3596
3597   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
3598   priv = jitterbuffer->priv;
3599
3600   switch (prop_id) {
3601     case PROP_LATENCY:
3602     {
3603       guint new_latency, old_latency;
3604
3605       new_latency = g_value_get_uint (value);
3606
3607       JBUF_LOCK (priv);
3608       old_latency = priv->latency_ms;
3609       priv->latency_ms = new_latency;
3610       priv->latency_ns = priv->latency_ms * GST_MSECOND;
3611       rtp_jitter_buffer_set_delay (priv->jbuf, priv->latency_ns);
3612       JBUF_UNLOCK (priv);
3613
3614       /* post message if latency changed, this will inform the parent pipeline
3615        * that a latency reconfiguration is possible/needed. */
3616       if (new_latency != old_latency) {
3617         GST_DEBUG_OBJECT (jitterbuffer, "latency changed to: %" GST_TIME_FORMAT,
3618             GST_TIME_ARGS (new_latency * GST_MSECOND));
3619
3620         gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer),
3621             gst_message_new_latency (GST_OBJECT_CAST (jitterbuffer)));
3622       }
3623       break;
3624     }
3625     case PROP_DROP_ON_LATENCY:
3626       JBUF_LOCK (priv);
3627       priv->drop_on_latency = g_value_get_boolean (value);
3628       JBUF_UNLOCK (priv);
3629       break;
3630     case PROP_TS_OFFSET:
3631       JBUF_LOCK (priv);
3632       priv->ts_offset = g_value_get_int64 (value);
3633       priv->ts_discont = TRUE;
3634       JBUF_UNLOCK (priv);
3635       break;
3636     case PROP_DO_LOST:
3637       JBUF_LOCK (priv);
3638       priv->do_lost = g_value_get_boolean (value);
3639       JBUF_UNLOCK (priv);
3640       break;
3641     case PROP_MODE:
3642       JBUF_LOCK (priv);
3643       rtp_jitter_buffer_set_mode (priv->jbuf, g_value_get_enum (value));
3644       JBUF_UNLOCK (priv);
3645       break;
3646     case PROP_DO_RETRANSMISSION:
3647       JBUF_LOCK (priv);
3648       priv->do_retransmission = g_value_get_boolean (value);
3649       JBUF_UNLOCK (priv);
3650       break;
3651     case PROP_RTX_NEXT_SEQNUM:
3652       JBUF_LOCK (priv);
3653       priv->rtx_next_seqnum = g_value_get_boolean (value);
3654       JBUF_UNLOCK (priv);
3655       break;
3656     case PROP_RTX_DELAY:
3657       JBUF_LOCK (priv);
3658       priv->rtx_delay = g_value_get_int (value);
3659       JBUF_UNLOCK (priv);
3660       break;
3661     case PROP_RTX_MIN_DELAY:
3662       JBUF_LOCK (priv);
3663       priv->rtx_min_delay = g_value_get_uint (value);
3664       JBUF_UNLOCK (priv);
3665       break;
3666     case PROP_RTX_DELAY_REORDER:
3667       JBUF_LOCK (priv);
3668       priv->rtx_delay_reorder = g_value_get_int (value);
3669       JBUF_UNLOCK (priv);
3670       break;
3671     case PROP_RTX_RETRY_TIMEOUT:
3672       JBUF_LOCK (priv);
3673       priv->rtx_retry_timeout = g_value_get_int (value);
3674       JBUF_UNLOCK (priv);
3675       break;
3676     case PROP_RTX_MIN_RETRY_TIMEOUT:
3677       JBUF_LOCK (priv);
3678       priv->rtx_min_retry_timeout = g_value_get_int (value);
3679       JBUF_UNLOCK (priv);
3680       break;
3681     case PROP_RTX_RETRY_PERIOD:
3682       JBUF_LOCK (priv);
3683       priv->rtx_retry_period = g_value_get_int (value);
3684       JBUF_UNLOCK (priv);
3685       break;
3686     case PROP_RTX_MAX_RETRIES:
3687       JBUF_LOCK (priv);
3688       priv->rtx_max_retries = g_value_get_int (value);
3689       JBUF_UNLOCK (priv);
3690       break;
3691     default:
3692       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
3693       break;
3694   }
3695 }
3696
3697 static void
3698 gst_rtp_jitter_buffer_get_property (GObject * object,
3699     guint prop_id, GValue * value, GParamSpec * pspec)
3700 {
3701   GstRtpJitterBuffer *jitterbuffer;
3702   GstRtpJitterBufferPrivate *priv;
3703
3704   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
3705   priv = jitterbuffer->priv;
3706
3707   switch (prop_id) {
3708     case PROP_LATENCY:
3709       JBUF_LOCK (priv);
3710       g_value_set_uint (value, priv->latency_ms);
3711       JBUF_UNLOCK (priv);
3712       break;
3713     case PROP_DROP_ON_LATENCY:
3714       JBUF_LOCK (priv);
3715       g_value_set_boolean (value, priv->drop_on_latency);
3716       JBUF_UNLOCK (priv);
3717       break;
3718     case PROP_TS_OFFSET:
3719       JBUF_LOCK (priv);
3720       g_value_set_int64 (value, priv->ts_offset);
3721       JBUF_UNLOCK (priv);
3722       break;
3723     case PROP_DO_LOST:
3724       JBUF_LOCK (priv);
3725       g_value_set_boolean (value, priv->do_lost);
3726       JBUF_UNLOCK (priv);
3727       break;
3728     case PROP_MODE:
3729       JBUF_LOCK (priv);
3730       g_value_set_enum (value, rtp_jitter_buffer_get_mode (priv->jbuf));
3731       JBUF_UNLOCK (priv);
3732       break;
3733     case PROP_PERCENT:
3734     {
3735       gint percent;
3736
3737       JBUF_LOCK (priv);
3738       if (priv->srcresult != GST_FLOW_OK)
3739         percent = 100;
3740       else
3741         percent = rtp_jitter_buffer_get_percent (priv->jbuf);
3742
3743       g_value_set_int (value, percent);
3744       JBUF_UNLOCK (priv);
3745       break;
3746     }
3747     case PROP_DO_RETRANSMISSION:
3748       JBUF_LOCK (priv);
3749       g_value_set_boolean (value, priv->do_retransmission);
3750       JBUF_UNLOCK (priv);
3751       break;
3752     case PROP_RTX_NEXT_SEQNUM:
3753       JBUF_LOCK (priv);
3754       g_value_set_boolean (value, priv->rtx_next_seqnum);
3755       JBUF_UNLOCK (priv);
3756       break;
3757     case PROP_RTX_DELAY:
3758       JBUF_LOCK (priv);
3759       g_value_set_int (value, priv->rtx_delay);
3760       JBUF_UNLOCK (priv);
3761       break;
3762     case PROP_RTX_MIN_DELAY:
3763       JBUF_LOCK (priv);
3764       g_value_set_uint (value, priv->rtx_min_delay);
3765       JBUF_UNLOCK (priv);
3766       break;
3767     case PROP_RTX_DELAY_REORDER:
3768       JBUF_LOCK (priv);
3769       g_value_set_int (value, priv->rtx_delay_reorder);
3770       JBUF_UNLOCK (priv);
3771       break;
3772     case PROP_RTX_RETRY_TIMEOUT:
3773       JBUF_LOCK (priv);
3774       g_value_set_int (value, priv->rtx_retry_timeout);
3775       JBUF_UNLOCK (priv);
3776       break;
3777     case PROP_RTX_MIN_RETRY_TIMEOUT:
3778       JBUF_LOCK (priv);
3779       g_value_set_int (value, priv->rtx_min_retry_timeout);
3780       JBUF_UNLOCK (priv);
3781       break;
3782     case PROP_RTX_RETRY_PERIOD:
3783       JBUF_LOCK (priv);
3784       g_value_set_int (value, priv->rtx_retry_period);
3785       JBUF_UNLOCK (priv);
3786       break;
3787     case PROP_RTX_MAX_RETRIES:
3788       JBUF_LOCK (priv);
3789       g_value_set_int (value, priv->rtx_max_retries);
3790       JBUF_UNLOCK (priv);
3791       break;
3792     case PROP_STATS:
3793       g_value_take_boxed (value,
3794           gst_rtp_jitter_buffer_create_stats (jitterbuffer));
3795       break;
3796     default:
3797       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
3798       break;
3799   }
3800 }
3801
3802 static GstStructure *
3803 gst_rtp_jitter_buffer_create_stats (GstRtpJitterBuffer * jbuf)
3804 {
3805   GstStructure *s;
3806
3807   JBUF_LOCK (jbuf->priv);
3808   s = gst_structure_new ("application/x-rtp-jitterbuffer-stats",
3809       "rtx-count", G_TYPE_UINT64, jbuf->priv->num_rtx_requests,
3810       "rtx-success-count", G_TYPE_UINT64, jbuf->priv->num_rtx_success,
3811       "rtx-per-packet", G_TYPE_DOUBLE, jbuf->priv->avg_rtx_num,
3812       "rtx-rtt", G_TYPE_UINT64, jbuf->priv->avg_rtx_rtt, NULL);
3813   JBUF_UNLOCK (jbuf->priv);
3814
3815   return s;
3816 }