rtpjitterbuffer: Fix "stats" property docs
[platform/upstream/gst-plugins-good.git] / gst / rtpmanager / gstrtpjitterbuffer.c
1 /*
2  * Farsight Voice+Video library
3  *
4  *  Copyright 2007 Collabora Ltd,
5  *  Copyright 2007 Nokia Corporation
6  *   @author: Philippe Kalaf <philippe.kalaf@collabora.co.uk>.
7  *  Copyright 2007 Wim Taymans <wim.taymans@gmail.com>
8  *
9  * This library is free software; you can redistribute it and/or
10  * modify it under the terms of the GNU Library General Public
11  * License as published by the Free Software Foundation; either
12  * version 2 of the License, or (at your option) any later version.
13  *
14  * This library is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17  * Library General Public License for more details.
18  *
19  * You should have received a copy of the GNU Library General Public
20  * License along with this library; if not, write to the
21  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
22  * Boston, MA 02110-1301, USA.
23  *
24  */
25
26 /**
27  * SECTION:element-rtpjitterbuffer
28  *
29  * This element reorders and removes duplicate RTP packets as they are received
30  * from a network source.
31  *
32  * The element needs the clock-rate of the RTP payload in order to estimate the
33  * delay. This information is obtained either from the caps on the sink pad or,
34  * when no caps are present, from the #GstRtpJitterBuffer::request-pt-map signal.
35  * To clear the previous pt-map use the #GstRtpJitterBuffer::clear-pt-map signal.
36  *
37  * The rtpjitterbuffer will wait for missing packets up to a configurable time
38  * limit using the #GstRtpJitterBuffer:latency property. Packets arriving too
39  * late are considered to be lost packets. If the #GstRtpJitterBuffer:do-lost
40  * property is set, lost packets will result in a custom serialized downstream
41  * event of name GstRTPPacketLost. The lost packet events are usually used by a
42  * depayloader or other element to create concealment data or some other logic
43  * to gracefully handle the missing packets.
44  *
45  * The jitterbuffer will use the DTS (or PTS if no DTS is set) of the incomming
46  * buffer and the rtptime inside the RTP packet to create a PTS on the outgoing
47  * buffer.
48  *
49  * The jitterbuffer can also be configured to send early retransmission events
50  * upstream by setting the #GstRtpJitterBuffer:do-retransmission property. In
51  * this mode, the jitterbuffer tries to estimate when a packet should arrive and
52  * sends a custom upstream event named GstRTPRetransmissionRequest when the
53  * packet is considered late. The initial expected packet arrival time is
54  * calculated as follows:
55  *
56  * - If seqnum N arrived at time T, seqnum N+1 is expected to arrive at
57  *     T + packet-spacing + #GstRtpJitterBuffer:rtx-delay. The packet spacing is
58  *     calculated from the DTS (or PTS is no DTS) of two consecutive RTP
59  *     packets with different rtptime.
60  *
61  * - If seqnum N0 arrived at time T0 and seqnum Nm arrived at time Tm,
62  *     seqnum Ni is expected at time Ti = T0 + i*(Tm - T0)/(Nm - N0). Any
63  *     previously scheduled timeout is overwritten.
64  *
65  * - If seqnum N arrived, all seqnum older than
66  *     N - #GstRtpJitterBuffer:rtx-delay-reorder are considered late
67  *     immediately. This is to request fast feedback for abonormally reorder
68  *     packets before any of the previous timeouts is triggered.
69  *
70  * A late packet triggers the GstRTPRetransmissionRequest custom upstream
71  * event. After the initial timeout expires and the retransmission event is
72  * sent, the timeout is scheduled for
73  * T + #GstRtpJitterBuffer:rtx-retry-timeout. If the missing packet did not
74  * arrive after #GstRtpJitterBuffer:rtx-retry-timeout, a new
75  * GstRTPRetransmissionRequest is sent upstream and the timeout is rescheduled
76  * again for T + #GstRtpJitterBuffer:rtx-retry-timeout. This repeats until
77  * #GstRtpJitterBuffer:rtx-retry-period elapsed, at which point no further
78  * retransmission requests are sent and the regular logic is performed to
79  * schedule a lost packet as discussed above.
80  *
81  * This element acts as a live element and so adds #GstRtpJitterBuffer:latency
82  * to the pipeline.
83  *
84  * This element will automatically be used inside rtpbin.
85  *
86  * <refsect2>
87  * <title>Example pipelines</title>
88  * |[
89  * gst-launch-1.0 rtspsrc location=rtsp://192.168.1.133:8554/mpeg1or2AudioVideoTest ! rtpjitterbuffer ! rtpmpvdepay ! mpeg2dec ! xvimagesink
90  * ]| Connect to a streaming server and decode the MPEG video. The jitterbuffer is
91  * inserted into the pipeline to smooth out network jitter and to reorder the
92  * out-of-order RTP packets.
93  * </refsect2>
94  */
95
96 #ifdef HAVE_CONFIG_H
97 #include "config.h"
98 #endif
99
100 #include <stdlib.h>
101 #include <string.h>
102 #include <gst/rtp/gstrtpbuffer.h>
103
104 #include "gstrtpjitterbuffer.h"
105 #include "rtpjitterbuffer.h"
106 #include "rtpstats.h"
107
108 #include <gst/glib-compat-private.h>
109
110 GST_DEBUG_CATEGORY (rtpjitterbuffer_debug);
111 #define GST_CAT_DEFAULT (rtpjitterbuffer_debug)
112
113 /* RTPJitterBuffer signals and args */
114 enum
115 {
116   SIGNAL_REQUEST_PT_MAP,
117   SIGNAL_CLEAR_PT_MAP,
118   SIGNAL_HANDLE_SYNC,
119   SIGNAL_ON_NPT_STOP,
120   SIGNAL_SET_ACTIVE,
121   LAST_SIGNAL
122 };
123
124 #define DEFAULT_LATENCY_MS          200
125 #define DEFAULT_DROP_ON_LATENCY     FALSE
126 #define DEFAULT_TS_OFFSET           0
127 #define DEFAULT_DO_LOST             FALSE
128 #define DEFAULT_MODE                RTP_JITTER_BUFFER_MODE_SLAVE
129 #define DEFAULT_PERCENT             0
130 #define DEFAULT_DO_RETRANSMISSION   FALSE
131 #define DEFAULT_RTX_NEXT_SEQNUM     TRUE
132 #define DEFAULT_RTX_DELAY           -1
133 #define DEFAULT_RTX_MIN_DELAY       0
134 #define DEFAULT_RTX_DELAY_REORDER   3
135 #define DEFAULT_RTX_RETRY_TIMEOUT   -1
136 #define DEFAULT_RTX_MIN_RETRY_TIMEOUT   -1
137 #define DEFAULT_RTX_RETRY_PERIOD    -1
138 #define DEFAULT_RTX_MAX_RETRIES    -1
139
140 #define DEFAULT_AUTO_RTX_DELAY (20 * GST_MSECOND)
141 #define DEFAULT_AUTO_RTX_TIMEOUT (40 * GST_MSECOND)
142
143 enum
144 {
145   PROP_0,
146   PROP_LATENCY,
147   PROP_DROP_ON_LATENCY,
148   PROP_TS_OFFSET,
149   PROP_DO_LOST,
150   PROP_MODE,
151   PROP_PERCENT,
152   PROP_DO_RETRANSMISSION,
153   PROP_RTX_NEXT_SEQNUM,
154   PROP_RTX_DELAY,
155   PROP_RTX_MIN_DELAY,
156   PROP_RTX_DELAY_REORDER,
157   PROP_RTX_RETRY_TIMEOUT,
158   PROP_RTX_MIN_RETRY_TIMEOUT,
159   PROP_RTX_RETRY_PERIOD,
160   PROP_RTX_MAX_RETRIES,
161   PROP_STATS
162 };
163
164 #define JBUF_LOCK(priv)   (g_mutex_lock (&(priv)->jbuf_lock))
165
166 #define JBUF_LOCK_CHECK(priv,label) G_STMT_START {    \
167   JBUF_LOCK (priv);                                   \
168   if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK))    \
169     goto label;                                       \
170 } G_STMT_END
171 #define JBUF_UNLOCK(priv) (g_mutex_unlock (&(priv)->jbuf_lock))
172
173 #define JBUF_WAIT_TIMER(priv)   G_STMT_START {            \
174   GST_DEBUG ("waiting timer");                            \
175   (priv)->waiting_timer = TRUE;                           \
176   g_cond_wait (&(priv)->jbuf_timer, &(priv)->jbuf_lock);  \
177   (priv)->waiting_timer = FALSE;                          \
178   GST_DEBUG ("waiting timer done");                       \
179 } G_STMT_END
180 #define JBUF_SIGNAL_TIMER(priv) G_STMT_START {            \
181   if (G_UNLIKELY ((priv)->waiting_timer)) {               \
182     GST_DEBUG ("signal timer");                           \
183     g_cond_signal (&(priv)->jbuf_timer);                  \
184   }                                                       \
185 } G_STMT_END
186
187 #define JBUF_WAIT_EVENT(priv,label) G_STMT_START {       \
188   GST_DEBUG ("waiting event");                           \
189   (priv)->waiting_event = TRUE;                          \
190   g_cond_wait (&(priv)->jbuf_event, &(priv)->jbuf_lock); \
191   (priv)->waiting_event = FALSE;                         \
192   GST_DEBUG ("waiting event done");                      \
193   if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK))       \
194     goto label;                                          \
195 } G_STMT_END
196 #define JBUF_SIGNAL_EVENT(priv) G_STMT_START {           \
197   if (G_UNLIKELY ((priv)->waiting_event)) {              \
198     GST_DEBUG ("signal event");                          \
199     g_cond_signal (&(priv)->jbuf_event);                 \
200   }                                                      \
201 } G_STMT_END
202
203 #define JBUF_WAIT_QUERY(priv,label) G_STMT_START {       \
204   GST_DEBUG ("waiting query");                           \
205   (priv)->waiting_query = TRUE;                          \
206   g_cond_wait (&(priv)->jbuf_query, &(priv)->jbuf_lock); \
207   (priv)->waiting_query = FALSE;                         \
208   GST_DEBUG ("waiting query done");                      \
209   if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK))       \
210     goto label;                                          \
211 } G_STMT_END
212 #define JBUF_SIGNAL_QUERY(priv,res) G_STMT_START {       \
213   (priv)->last_query = res;                              \
214   if (G_UNLIKELY ((priv)->waiting_query)) {              \
215     GST_DEBUG ("signal query");                          \
216     g_cond_signal (&(priv)->jbuf_query);                 \
217   }                                                      \
218 } G_STMT_END
219
220
221 struct _GstRtpJitterBufferPrivate
222 {
223   GstPad *sinkpad, *srcpad;
224   GstPad *rtcpsinkpad;
225
226   RTPJitterBuffer *jbuf;
227   GMutex jbuf_lock;
228   gboolean waiting_timer;
229   GCond jbuf_timer;
230   gboolean waiting_event;
231   GCond jbuf_event;
232   gboolean waiting_query;
233   GCond jbuf_query;
234   gboolean last_query;
235   gboolean discont;
236   gboolean ts_discont;
237   gboolean active;
238   guint64 out_offset;
239
240   gboolean timer_running;
241   GThread *timer_thread;
242
243   /* properties */
244   guint latency_ms;
245   guint64 latency_ns;
246   gboolean drop_on_latency;
247   gint64 ts_offset;
248   gboolean do_lost;
249   gboolean do_retransmission;
250   gboolean rtx_next_seqnum;
251   gint rtx_delay;
252   guint rtx_min_delay;
253   gint rtx_delay_reorder;
254   gint rtx_retry_timeout;
255   gint rtx_min_retry_timeout;
256   gint rtx_retry_period;
257   gint rtx_max_retries;
258
259   /* the last seqnum we pushed out */
260   guint32 last_popped_seqnum;
261   /* the next expected seqnum we push */
262   guint32 next_seqnum;
263   /* seqnum-base, if known */
264   guint32 seqnum_base;
265   /* last output time */
266   GstClockTime last_out_time;
267   /* last valid input timestamp and rtptime pair */
268   GstClockTime ips_dts;
269   guint64 ips_rtptime;
270   GstClockTime packet_spacing;
271
272   /* the next expected seqnum we receive */
273   GstClockTime last_in_dts;
274   guint32 last_in_seqnum;
275   guint32 next_in_seqnum;
276
277   GArray *timers;
278
279   /* start and stop ranges */
280   GstClockTime npt_start;
281   GstClockTime npt_stop;
282   guint64 ext_timestamp;
283   guint64 last_elapsed;
284   guint64 estimated_eos;
285   GstClockID eos_id;
286
287   /* state */
288   gboolean eos;
289   guint last_percent;
290
291   /* clock rate and rtp timestamp offset */
292   gint last_pt;
293   gint32 clock_rate;
294   gint64 clock_base;
295   gint64 prev_ts_offset;
296
297   /* when we are shutting down */
298   GstFlowReturn srcresult;
299   gboolean blocked;
300
301   /* for sync */
302   GstSegment segment;
303   GstClockID clock_id;
304   GstClockTime timer_timeout;
305   guint16 timer_seqnum;
306   /* the latency of the upstream peer, we have to take this into account when
307    * synchronizing the buffers. */
308   GstClockTime peer_latency;
309   guint64 ext_rtptime;
310   GstBuffer *last_sr;
311
312   /* some accounting */
313   guint64 num_late;
314   guint64 num_duplicates;
315   guint64 num_rtx_requests;
316   guint64 num_rtx_success;
317   guint64 num_rtx_failed;
318   gdouble avg_rtx_num;
319   guint64 avg_rtx_rtt;
320
321   /* for the jitter */
322   GstClockTime last_dts;
323   guint64 last_rtptime;
324   GstClockTime avg_jitter;
325 };
326
327 typedef enum
328 {
329   TIMER_TYPE_EXPECTED,
330   TIMER_TYPE_LOST,
331   TIMER_TYPE_DEADLINE,
332   TIMER_TYPE_EOS
333 } TimerType;
334
335 typedef struct
336 {
337   guint idx;
338   guint16 seqnum;
339   guint num;
340   TimerType type;
341   GstClockTime timeout;
342   GstClockTime duration;
343   GstClockTime rtx_base;
344   GstClockTime rtx_delay;
345   GstClockTime rtx_retry;
346   GstClockTime rtx_last;
347   guint num_rtx_retry;
348 } TimerData;
349
350 #define GST_RTP_JITTER_BUFFER_GET_PRIVATE(o) \
351   (G_TYPE_INSTANCE_GET_PRIVATE ((o), GST_TYPE_RTP_JITTER_BUFFER, \
352                                 GstRtpJitterBufferPrivate))
353
354 static GstStaticPadTemplate gst_rtp_jitter_buffer_sink_template =
355 GST_STATIC_PAD_TEMPLATE ("sink",
356     GST_PAD_SINK,
357     GST_PAD_ALWAYS,
358     GST_STATIC_CAPS ("application/x-rtp"
359         /* "clock-rate = (int) [ 1, 2147483647 ], "
360          * "payload = (int) , "
361          * "encoding-name = (string) "
362          */ )
363     );
364
365 static GstStaticPadTemplate gst_rtp_jitter_buffer_sink_rtcp_template =
366 GST_STATIC_PAD_TEMPLATE ("sink_rtcp",
367     GST_PAD_SINK,
368     GST_PAD_REQUEST,
369     GST_STATIC_CAPS ("application/x-rtcp")
370     );
371
372 static GstStaticPadTemplate gst_rtp_jitter_buffer_src_template =
373 GST_STATIC_PAD_TEMPLATE ("src",
374     GST_PAD_SRC,
375     GST_PAD_ALWAYS,
376     GST_STATIC_CAPS ("application/x-rtp"
377         /* "payload = (int) , "
378          * "clock-rate = (int) , "
379          * "encoding-name = (string) "
380          */ )
381     );
382
383 static guint gst_rtp_jitter_buffer_signals[LAST_SIGNAL] = { 0 };
384
385 #define gst_rtp_jitter_buffer_parent_class parent_class
386 G_DEFINE_TYPE (GstRtpJitterBuffer, gst_rtp_jitter_buffer, GST_TYPE_ELEMENT);
387
388 /* object overrides */
389 static void gst_rtp_jitter_buffer_set_property (GObject * object,
390     guint prop_id, const GValue * value, GParamSpec * pspec);
391 static void gst_rtp_jitter_buffer_get_property (GObject * object,
392     guint prop_id, GValue * value, GParamSpec * pspec);
393 static void gst_rtp_jitter_buffer_finalize (GObject * object);
394
395 /* element overrides */
396 static GstStateChangeReturn gst_rtp_jitter_buffer_change_state (GstElement
397     * element, GstStateChange transition);
398 static GstPad *gst_rtp_jitter_buffer_request_new_pad (GstElement * element,
399     GstPadTemplate * templ, const gchar * name, const GstCaps * filter);
400 static void gst_rtp_jitter_buffer_release_pad (GstElement * element,
401     GstPad * pad);
402 static GstClock *gst_rtp_jitter_buffer_provide_clock (GstElement * element);
403
404 /* pad overrides */
405 static GstCaps *gst_rtp_jitter_buffer_getcaps (GstPad * pad, GstCaps * filter);
406 static GstIterator *gst_rtp_jitter_buffer_iterate_internal_links (GstPad * pad,
407     GstObject * parent);
408
409 /* sinkpad overrides */
410 static gboolean gst_rtp_jitter_buffer_sink_event (GstPad * pad,
411     GstObject * parent, GstEvent * event);
412 static GstFlowReturn gst_rtp_jitter_buffer_chain (GstPad * pad,
413     GstObject * parent, GstBuffer * buffer);
414
415 static gboolean gst_rtp_jitter_buffer_sink_rtcp_event (GstPad * pad,
416     GstObject * parent, GstEvent * event);
417 static GstFlowReturn gst_rtp_jitter_buffer_chain_rtcp (GstPad * pad,
418     GstObject * parent, GstBuffer * buffer);
419
420 static gboolean gst_rtp_jitter_buffer_sink_query (GstPad * pad,
421     GstObject * parent, GstQuery * query);
422
423 /* srcpad overrides */
424 static gboolean gst_rtp_jitter_buffer_src_event (GstPad * pad,
425     GstObject * parent, GstEvent * event);
426 static gboolean gst_rtp_jitter_buffer_src_activate_mode (GstPad * pad,
427     GstObject * parent, GstPadMode mode, gboolean active);
428 static void gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer);
429 static gboolean gst_rtp_jitter_buffer_src_query (GstPad * pad,
430     GstObject * parent, GstQuery * query);
431
432 static void
433 gst_rtp_jitter_buffer_clear_pt_map (GstRtpJitterBuffer * jitterbuffer);
434 static GstClockTime
435 gst_rtp_jitter_buffer_set_active (GstRtpJitterBuffer * jitterbuffer,
436     gboolean active, guint64 base_time);
437 static void do_handle_sync (GstRtpJitterBuffer * jitterbuffer);
438
439 static void unschedule_current_timer (GstRtpJitterBuffer * jitterbuffer);
440 static void remove_all_timers (GstRtpJitterBuffer * jitterbuffer);
441
442 static void wait_next_timeout (GstRtpJitterBuffer * jitterbuffer);
443
444 static GstStructure *gst_rtp_jitter_buffer_create_stats (GstRtpJitterBuffer *
445     jitterbuffer);
446
447 static void
448 gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass)
449 {
450   GObjectClass *gobject_class;
451   GstElementClass *gstelement_class;
452
453   gobject_class = (GObjectClass *) klass;
454   gstelement_class = (GstElementClass *) klass;
455
456   g_type_class_add_private (klass, sizeof (GstRtpJitterBufferPrivate));
457
458   gobject_class->finalize = gst_rtp_jitter_buffer_finalize;
459
460   gobject_class->set_property = gst_rtp_jitter_buffer_set_property;
461   gobject_class->get_property = gst_rtp_jitter_buffer_get_property;
462
463   /**
464    * GstRtpJitterBuffer:latency:
465    *
466    * The maximum latency of the jitterbuffer. Packets will be kept in the buffer
467    * for at most this time.
468    */
469   g_object_class_install_property (gobject_class, PROP_LATENCY,
470       g_param_spec_uint ("latency", "Buffer latency in ms",
471           "Amount of ms to buffer", 0, G_MAXUINT, DEFAULT_LATENCY_MS,
472           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
473   /**
474    * GstRtpJitterBuffer:drop-on-latency:
475    *
476    * Drop oldest buffers when the queue is completely filled.
477    */
478   g_object_class_install_property (gobject_class, PROP_DROP_ON_LATENCY,
479       g_param_spec_boolean ("drop-on-latency",
480           "Drop buffers when maximum latency is reached",
481           "Tells the jitterbuffer to never exceed the given latency in size",
482           DEFAULT_DROP_ON_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
483   /**
484    * GstRtpJitterBuffer:ts-offset:
485    *
486    * Adjust GStreamer output buffer timestamps in the jitterbuffer with offset.
487    * This is mainly used to ensure interstream synchronisation.
488    */
489   g_object_class_install_property (gobject_class, PROP_TS_OFFSET,
490       g_param_spec_int64 ("ts-offset", "Timestamp Offset",
491           "Adjust buffer timestamps with offset in nanoseconds", G_MININT64,
492           G_MAXINT64, DEFAULT_TS_OFFSET,
493           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
494
495   /**
496    * GstRtpJitterBuffer:do-lost:
497    *
498    * Send out a GstRTPPacketLost event downstream when a packet is considered
499    * lost.
500    */
501   g_object_class_install_property (gobject_class, PROP_DO_LOST,
502       g_param_spec_boolean ("do-lost", "Do Lost",
503           "Send an event downstream when a packet is lost", DEFAULT_DO_LOST,
504           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
505
506   /**
507    * GstRtpJitterBuffer:mode:
508    *
509    * Control the buffering and timestamping mode used by the jitterbuffer.
510    */
511   g_object_class_install_property (gobject_class, PROP_MODE,
512       g_param_spec_enum ("mode", "Mode",
513           "Control the buffering algorithm in use", RTP_TYPE_JITTER_BUFFER_MODE,
514           DEFAULT_MODE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
515   /**
516    * GstRtpJitterBuffer:percent:
517    *
518    * The percent of the jitterbuffer that is filled.
519    */
520   g_object_class_install_property (gobject_class, PROP_PERCENT,
521       g_param_spec_int ("percent", "percent",
522           "The buffer filled percent", 0, 100,
523           0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
524   /**
525    * GstRtpJitterBuffer:do-retransmission:
526    *
527    * Send out a GstRTPRetransmission event upstream when a packet is considered
528    * late and should be retransmitted.
529    *
530    * Since: 1.2
531    */
532   g_object_class_install_property (gobject_class, PROP_DO_RETRANSMISSION,
533       g_param_spec_boolean ("do-retransmission", "Do Retransmission",
534           "Send retransmission events upstream when a packet is late",
535           DEFAULT_DO_RETRANSMISSION,
536           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
537
538   /**
539    * GstRtpJitterBuffer:rtx-next-seqnum
540    *
541    * Estimate when the next packet should arrive and schedule a retransmission
542    * request for it.
543    * This is, when packet N arrives, a GstRTPRetransmission event is schedule
544    * for packet N+1. So it will be requested if it does not arrive at the expected time.
545    * The expected time is calculated using the dts of N and the packet spacing.
546    *
547    * Since: 1.6
548    */
549   g_object_class_install_property (gobject_class, PROP_RTX_NEXT_SEQNUM,
550       g_param_spec_boolean ("rtx-next-seqnum", "RTX next seqnum",
551           "Estimate when the next packet should arrive and schedule a "
552           "retransmission request for it.",
553           DEFAULT_RTX_NEXT_SEQNUM, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
554
555   /**
556    * GstRtpJitterBuffer:rtx-delay:
557    *
558    * When a packet did not arrive at the expected time, wait this extra amount
559    * of time before sending a retransmission event.
560    *
561    * When -1 is used, the max jitter will be used as extra delay.
562    *
563    * Since: 1.2
564    */
565   g_object_class_install_property (gobject_class, PROP_RTX_DELAY,
566       g_param_spec_int ("rtx-delay", "RTX Delay",
567           "Extra time in ms to wait before sending retransmission "
568           "event (-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_DELAY,
569           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
570
571   /**
572    * GstRtpJitterBuffer:rtx-min-delay:
573    *
574    * When a packet did not arrive at the expected time, wait at least this extra amount
575    * of time before sending a retransmission event.
576    *
577    * Since: 1.6
578    */
579   g_object_class_install_property (gobject_class, PROP_RTX_MIN_DELAY,
580       g_param_spec_uint ("rtx-min-delay", "Minimum RTX Delay",
581           "Minimum time in ms to wait before sending retransmission "
582           "event", 0, G_MAXUINT, DEFAULT_RTX_MIN_DELAY,
583           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
584   /**
585    * GstRtpJitterBuffer:rtx-delay-reorder:
586    *
587    * Assume that a retransmission event should be sent when we see
588    * this much packet reordering.
589    *
590    * When -1 is used, the value will be estimated based on observed packet
591    * reordering.
592    *
593    * Since: 1.2
594    */
595   g_object_class_install_property (gobject_class, PROP_RTX_DELAY_REORDER,
596       g_param_spec_int ("rtx-delay-reorder", "RTX Delay Reorder",
597           "Sending retransmission event when this much reordering (-1 automatic)",
598           -1, G_MAXINT, DEFAULT_RTX_DELAY_REORDER,
599           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
600   /**
601    * GstRtpJitterBuffer::rtx-retry-timeout:
602    *
603    * When no packet has been received after sending a retransmission event
604    * for this time, retry sending a retransmission event.
605    *
606    * When -1 is used, the value will be estimated based on observed round
607    * trip time.
608    *
609    * Since: 1.2
610    */
611   g_object_class_install_property (gobject_class, PROP_RTX_RETRY_TIMEOUT,
612       g_param_spec_int ("rtx-retry-timeout", "RTX Retry Timeout",
613           "Retry sending a transmission event after this timeout in "
614           "ms (-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_RETRY_TIMEOUT,
615           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
616   /**
617    * GstRtpJitterBuffer::rtx-min-retry-timeout:
618    *
619    * The minimum amount of time between retry timeouts. When
620    * GstRtpJitterBuffer::rtx-retry-timeout is -1, this value ensures a
621    * minimum interval between retry timeouts.
622    *
623    * When -1 is used, the value will be estimated based on the
624    * packet spacing.
625    *
626    * Since: 1.6
627    */
628   g_object_class_install_property (gobject_class, PROP_RTX_MIN_RETRY_TIMEOUT,
629       g_param_spec_int ("rtx-min-retry-timeout", "RTX Min Retry Timeout",
630           "Minimum timeout between sending a transmission event in "
631           "ms (-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_MIN_RETRY_TIMEOUT,
632           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
633   /**
634    * GstRtpJitterBuffer:rtx-retry-period:
635    *
636    * The amount of time to try to get a retransmission.
637    *
638    * When -1 is used, the value will be estimated based on the jitterbuffer
639    * latency and the observed round trip time.
640    *
641    * Since: 1.2
642    */
643   g_object_class_install_property (gobject_class, PROP_RTX_RETRY_PERIOD,
644       g_param_spec_int ("rtx-retry-period", "RTX Retry Period",
645           "Try to get a retransmission for this many ms "
646           "(-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_RETRY_PERIOD,
647           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
648   /**
649    * GstRtpJitterBuffer:rtx-max-retries:
650    *
651    * The maximum number of retries to request a retransmission.
652    *
653    * This implies that as maximum (rtx-max-retries + 1) retransmissions will be requested.
654    * When -1 is used, the number of retransmission request will not be limited.
655    *
656    * Since: 1.6
657    */
658   g_object_class_install_property (gobject_class, PROP_RTX_MAX_RETRIES,
659       g_param_spec_int ("rtx-max-retries", "RTX Max Retries",
660           "The maximum number of retries to request a retransmission. "
661           "(-1 not limited)", -1, G_MAXINT, DEFAULT_RTX_MAX_RETRIES,
662           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
663   /**
664    * GstRtpJitterBuffer:stats:
665    *
666    * Various jitterbuffer statistics. This property returns a GstStructure
667    * with name application/x-rtp-jitterbuffer-stats with the following fields:
668    *
669    * <itemizedlist>
670    * <listitem>
671    *   <para>
672    *   #guint64
673    *   <classname>&quot;rtx-count&quot;</classname>:
674    *   the number of retransmissions requested.
675    *   </para>
676    * </listitem>
677    * <listitem>
678    *   <para>
679    *   #guint64
680    *   <classname>&quot;rtx-success-count&quot;</classname>:
681    *   the number of successful retransmissions.
682    *   </para>
683    * </listitem>
684    * <listitem>
685    *   <para>
686    *   #gdouble
687    *   <classname>&quot;rtx-per-packet&quot;</classname>:
688    *   average number of RTX per packet.
689    *   </para>
690    * </listitem>
691    * <listitem>
692    *   <para>
693    *   #guint64
694    *   <classname>&quot;rtx-rtt&quot;</classname>:
695    *   average round trip time per RTX.
696    *   </para>
697    * </listitem>
698    * </itemizedlist>
699    *
700    * Since: 1.4
701    */
702   g_object_class_install_property (gobject_class, PROP_STATS,
703       g_param_spec_boxed ("stats", "Statistics",
704           "Various statistics", GST_TYPE_STRUCTURE,
705           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
706
707   /**
708    * GstRtpJitterBuffer::request-pt-map:
709    * @buffer: the object which received the signal
710    * @pt: the pt
711    *
712    * Request the payload type as #GstCaps for @pt.
713    */
714   gst_rtp_jitter_buffer_signals[SIGNAL_REQUEST_PT_MAP] =
715       g_signal_new ("request-pt-map", G_TYPE_FROM_CLASS (klass),
716       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
717           request_pt_map), NULL, NULL, g_cclosure_marshal_generic,
718       GST_TYPE_CAPS, 1, G_TYPE_UINT);
719   /**
720    * GstRtpJitterBuffer::handle-sync:
721    * @buffer: the object which received the signal
722    * @struct: a GstStructure containing sync values.
723    *
724    * Be notified of new sync values.
725    */
726   gst_rtp_jitter_buffer_signals[SIGNAL_HANDLE_SYNC] =
727       g_signal_new ("handle-sync", G_TYPE_FROM_CLASS (klass),
728       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
729           handle_sync), NULL, NULL, g_cclosure_marshal_VOID__BOXED,
730       G_TYPE_NONE, 1, GST_TYPE_STRUCTURE | G_SIGNAL_TYPE_STATIC_SCOPE);
731
732   /**
733    * GstRtpJitterBuffer::on-npt-stop:
734    * @buffer: the object which received the signal
735    *
736    * Signal that the jitterbufer has pushed the RTP packet that corresponds to
737    * the npt-stop position.
738    */
739   gst_rtp_jitter_buffer_signals[SIGNAL_ON_NPT_STOP] =
740       g_signal_new ("on-npt-stop", G_TYPE_FROM_CLASS (klass),
741       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
742           on_npt_stop), NULL, NULL, g_cclosure_marshal_VOID__VOID,
743       G_TYPE_NONE, 0, G_TYPE_NONE);
744
745   /**
746    * GstRtpJitterBuffer::clear-pt-map:
747    * @buffer: the object which received the signal
748    *
749    * Invalidate the clock-rate as obtained with the
750    * #GstRtpJitterBuffer::request-pt-map signal.
751    */
752   gst_rtp_jitter_buffer_signals[SIGNAL_CLEAR_PT_MAP] =
753       g_signal_new ("clear-pt-map", G_TYPE_FROM_CLASS (klass),
754       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
755       G_STRUCT_OFFSET (GstRtpJitterBufferClass, clear_pt_map), NULL, NULL,
756       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
757
758   /**
759    * GstRtpJitterBuffer::set-active:
760    * @buffer: the object which received the signal
761    *
762    * Start pushing out packets with the given base time. This signal is only
763    * useful in buffering mode.
764    *
765    * Returns: the time of the last pushed packet.
766    */
767   gst_rtp_jitter_buffer_signals[SIGNAL_SET_ACTIVE] =
768       g_signal_new ("set-active", G_TYPE_FROM_CLASS (klass),
769       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
770       G_STRUCT_OFFSET (GstRtpJitterBufferClass, set_active), NULL, NULL,
771       g_cclosure_marshal_generic, G_TYPE_UINT64, 2, G_TYPE_BOOLEAN,
772       G_TYPE_UINT64);
773
774   gstelement_class->change_state =
775       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_change_state);
776   gstelement_class->request_new_pad =
777       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_request_new_pad);
778   gstelement_class->release_pad =
779       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_release_pad);
780   gstelement_class->provide_clock =
781       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_provide_clock);
782
783   gst_element_class_add_pad_template (gstelement_class,
784       gst_static_pad_template_get (&gst_rtp_jitter_buffer_src_template));
785   gst_element_class_add_pad_template (gstelement_class,
786       gst_static_pad_template_get (&gst_rtp_jitter_buffer_sink_template));
787   gst_element_class_add_pad_template (gstelement_class,
788       gst_static_pad_template_get (&gst_rtp_jitter_buffer_sink_rtcp_template));
789
790   gst_element_class_set_static_metadata (gstelement_class,
791       "RTP packet jitter-buffer", "Filter/Network/RTP",
792       "A buffer that deals with network jitter and other transmission faults",
793       "Philippe Kalaf <philippe.kalaf@collabora.co.uk>, "
794       "Wim Taymans <wim.taymans@gmail.com>");
795
796   klass->clear_pt_map = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_clear_pt_map);
797   klass->set_active = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_set_active);
798
799   GST_DEBUG_CATEGORY_INIT
800       (rtpjitterbuffer_debug, "rtpjitterbuffer", 0, "RTP Jitter Buffer");
801 }
802
803 static void
804 gst_rtp_jitter_buffer_init (GstRtpJitterBuffer * jitterbuffer)
805 {
806   GstRtpJitterBufferPrivate *priv;
807
808   priv = GST_RTP_JITTER_BUFFER_GET_PRIVATE (jitterbuffer);
809   jitterbuffer->priv = priv;
810
811   priv->latency_ms = DEFAULT_LATENCY_MS;
812   priv->latency_ns = priv->latency_ms * GST_MSECOND;
813   priv->drop_on_latency = DEFAULT_DROP_ON_LATENCY;
814   priv->do_lost = DEFAULT_DO_LOST;
815   priv->do_retransmission = DEFAULT_DO_RETRANSMISSION;
816   priv->rtx_next_seqnum = DEFAULT_RTX_NEXT_SEQNUM;
817   priv->rtx_delay = DEFAULT_RTX_DELAY;
818   priv->rtx_min_delay = DEFAULT_RTX_MIN_DELAY;
819   priv->rtx_delay_reorder = DEFAULT_RTX_DELAY_REORDER;
820   priv->rtx_retry_timeout = DEFAULT_RTX_RETRY_TIMEOUT;
821   priv->rtx_min_retry_timeout = DEFAULT_RTX_MIN_RETRY_TIMEOUT;
822   priv->rtx_retry_period = DEFAULT_RTX_RETRY_PERIOD;
823   priv->rtx_max_retries = DEFAULT_RTX_MAX_RETRIES;
824
825   priv->last_dts = -1;
826   priv->last_rtptime = -1;
827   priv->avg_jitter = 0;
828   priv->timers = g_array_new (FALSE, TRUE, sizeof (TimerData));
829   priv->jbuf = rtp_jitter_buffer_new ();
830   g_mutex_init (&priv->jbuf_lock);
831   g_cond_init (&priv->jbuf_timer);
832   g_cond_init (&priv->jbuf_event);
833   g_cond_init (&priv->jbuf_query);
834
835   /* reset skew detection initialy */
836   rtp_jitter_buffer_reset_skew (priv->jbuf);
837   rtp_jitter_buffer_set_delay (priv->jbuf, priv->latency_ns);
838   rtp_jitter_buffer_set_buffering (priv->jbuf, FALSE);
839   priv->active = TRUE;
840
841   priv->srcpad =
842       gst_pad_new_from_static_template (&gst_rtp_jitter_buffer_src_template,
843       "src");
844
845   gst_pad_set_activatemode_function (priv->srcpad,
846       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_activate_mode));
847   gst_pad_set_query_function (priv->srcpad,
848       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_query));
849   gst_pad_set_event_function (priv->srcpad,
850       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_event));
851
852   priv->sinkpad =
853       gst_pad_new_from_static_template (&gst_rtp_jitter_buffer_sink_template,
854       "sink");
855
856   gst_pad_set_chain_function (priv->sinkpad,
857       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_chain));
858   gst_pad_set_event_function (priv->sinkpad,
859       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_sink_event));
860   gst_pad_set_query_function (priv->sinkpad,
861       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_sink_query));
862
863   gst_element_add_pad (GST_ELEMENT (jitterbuffer), priv->srcpad);
864   gst_element_add_pad (GST_ELEMENT (jitterbuffer), priv->sinkpad);
865
866   GST_OBJECT_FLAG_SET (jitterbuffer, GST_ELEMENT_FLAG_PROVIDE_CLOCK);
867 }
868
869 #define IS_DROPABLE(it) (((it)->type == ITEM_TYPE_BUFFER) || ((it)->type == ITEM_TYPE_LOST))
870
871 #define ITEM_TYPE_BUFFER        0
872 #define ITEM_TYPE_LOST          1
873 #define ITEM_TYPE_EVENT         2
874 #define ITEM_TYPE_QUERY         3
875
876 static RTPJitterBufferItem *
877 alloc_item (gpointer data, guint type, GstClockTime dts, GstClockTime pts,
878     guint seqnum, guint count, guint rtptime)
879 {
880   RTPJitterBufferItem *item;
881
882   item = g_slice_new (RTPJitterBufferItem);
883   item->data = data;
884   item->next = NULL;
885   item->prev = NULL;
886   item->type = type;
887   item->dts = dts;
888   item->pts = pts;
889   item->seqnum = seqnum;
890   item->count = count;
891   item->rtptime = rtptime;
892
893   return item;
894 }
895
896 static void
897 free_item (RTPJitterBufferItem * item)
898 {
899   if (item->data && item->type != ITEM_TYPE_QUERY)
900     gst_mini_object_unref (item->data);
901   g_slice_free (RTPJitterBufferItem, item);
902 }
903
904 static void
905 free_item_and_retain_events (RTPJitterBufferItem * item, gpointer user_data)
906 {
907   GList **l = user_data;
908
909   if (item->data && item->type == ITEM_TYPE_EVENT
910       && GST_EVENT_IS_STICKY (item->data)) {
911     *l = g_list_prepend (*l, item->data);
912   } else if (item->data && item->type != ITEM_TYPE_QUERY) {
913     gst_mini_object_unref (item->data);
914   }
915   g_slice_free (RTPJitterBufferItem, item);
916 }
917
918 static void
919 gst_rtp_jitter_buffer_finalize (GObject * object)
920 {
921   GstRtpJitterBuffer *jitterbuffer;
922   GstRtpJitterBufferPrivate *priv;
923
924   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
925   priv = jitterbuffer->priv;
926
927   g_array_free (priv->timers, TRUE);
928   g_mutex_clear (&priv->jbuf_lock);
929   g_cond_clear (&priv->jbuf_timer);
930   g_cond_clear (&priv->jbuf_event);
931   g_cond_clear (&priv->jbuf_query);
932
933   rtp_jitter_buffer_flush (priv->jbuf, (GFunc) free_item, NULL);
934   g_object_unref (priv->jbuf);
935
936   G_OBJECT_CLASS (parent_class)->finalize (object);
937 }
938
939 static GstIterator *
940 gst_rtp_jitter_buffer_iterate_internal_links (GstPad * pad, GstObject * parent)
941 {
942   GstRtpJitterBuffer *jitterbuffer;
943   GstPad *otherpad = NULL;
944   GstIterator *it = NULL;
945   GValue val = { 0, };
946
947   jitterbuffer = GST_RTP_JITTER_BUFFER_CAST (parent);
948
949   if (pad == jitterbuffer->priv->sinkpad) {
950     otherpad = jitterbuffer->priv->srcpad;
951   } else if (pad == jitterbuffer->priv->srcpad) {
952     otherpad = jitterbuffer->priv->sinkpad;
953   } else if (pad == jitterbuffer->priv->rtcpsinkpad) {
954     it = gst_iterator_new_single (GST_TYPE_PAD, NULL);
955   }
956
957   if (it == NULL) {
958     g_value_init (&val, GST_TYPE_PAD);
959     g_value_set_object (&val, otherpad);
960     it = gst_iterator_new_single (GST_TYPE_PAD, &val);
961     g_value_unset (&val);
962   }
963
964   return it;
965 }
966
967 static GstPad *
968 create_rtcp_sink (GstRtpJitterBuffer * jitterbuffer)
969 {
970   GstRtpJitterBufferPrivate *priv;
971
972   priv = jitterbuffer->priv;
973
974   GST_DEBUG_OBJECT (jitterbuffer, "creating RTCP sink pad");
975
976   priv->rtcpsinkpad =
977       gst_pad_new_from_static_template
978       (&gst_rtp_jitter_buffer_sink_rtcp_template, "sink_rtcp");
979   gst_pad_set_chain_function (priv->rtcpsinkpad,
980       gst_rtp_jitter_buffer_chain_rtcp);
981   gst_pad_set_event_function (priv->rtcpsinkpad,
982       (GstPadEventFunction) gst_rtp_jitter_buffer_sink_rtcp_event);
983   gst_pad_set_iterate_internal_links_function (priv->rtcpsinkpad,
984       gst_rtp_jitter_buffer_iterate_internal_links);
985   gst_pad_set_active (priv->rtcpsinkpad, TRUE);
986   gst_element_add_pad (GST_ELEMENT_CAST (jitterbuffer), priv->rtcpsinkpad);
987
988   return priv->rtcpsinkpad;
989 }
990
991 static void
992 remove_rtcp_sink (GstRtpJitterBuffer * jitterbuffer)
993 {
994   GstRtpJitterBufferPrivate *priv;
995
996   priv = jitterbuffer->priv;
997
998   GST_DEBUG_OBJECT (jitterbuffer, "removing RTCP sink pad");
999
1000   gst_pad_set_active (priv->rtcpsinkpad, FALSE);
1001
1002   gst_element_remove_pad (GST_ELEMENT_CAST (jitterbuffer), priv->rtcpsinkpad);
1003   priv->rtcpsinkpad = NULL;
1004 }
1005
1006 static GstPad *
1007 gst_rtp_jitter_buffer_request_new_pad (GstElement * element,
1008     GstPadTemplate * templ, const gchar * name, const GstCaps * filter)
1009 {
1010   GstRtpJitterBuffer *jitterbuffer;
1011   GstElementClass *klass;
1012   GstPad *result;
1013   GstRtpJitterBufferPrivate *priv;
1014
1015   g_return_val_if_fail (templ != NULL, NULL);
1016   g_return_val_if_fail (GST_IS_RTP_JITTER_BUFFER (element), NULL);
1017
1018   jitterbuffer = GST_RTP_JITTER_BUFFER_CAST (element);
1019   priv = jitterbuffer->priv;
1020   klass = GST_ELEMENT_GET_CLASS (element);
1021
1022   GST_DEBUG_OBJECT (element, "requesting pad %s", GST_STR_NULL (name));
1023
1024   /* figure out the template */
1025   if (templ == gst_element_class_get_pad_template (klass, "sink_rtcp")) {
1026     if (priv->rtcpsinkpad != NULL)
1027       goto exists;
1028
1029     result = create_rtcp_sink (jitterbuffer);
1030   } else
1031     goto wrong_template;
1032
1033   return result;
1034
1035   /* ERRORS */
1036 wrong_template:
1037   {
1038     g_warning ("rtpjitterbuffer: this is not our template");
1039     return NULL;
1040   }
1041 exists:
1042   {
1043     g_warning ("rtpjitterbuffer: pad already requested");
1044     return NULL;
1045   }
1046 }
1047
1048 static void
1049 gst_rtp_jitter_buffer_release_pad (GstElement * element, GstPad * pad)
1050 {
1051   GstRtpJitterBuffer *jitterbuffer;
1052   GstRtpJitterBufferPrivate *priv;
1053
1054   g_return_if_fail (GST_IS_RTP_JITTER_BUFFER (element));
1055   g_return_if_fail (GST_IS_PAD (pad));
1056
1057   jitterbuffer = GST_RTP_JITTER_BUFFER_CAST (element);
1058   priv = jitterbuffer->priv;
1059
1060   GST_DEBUG_OBJECT (element, "releasing pad %s:%s", GST_DEBUG_PAD_NAME (pad));
1061
1062   if (priv->rtcpsinkpad == pad) {
1063     remove_rtcp_sink (jitterbuffer);
1064   } else
1065     goto wrong_pad;
1066
1067   return;
1068
1069   /* ERRORS */
1070 wrong_pad:
1071   {
1072     g_warning ("gstjitterbuffer: asked to release an unknown pad");
1073     return;
1074   }
1075 }
1076
1077 static GstClock *
1078 gst_rtp_jitter_buffer_provide_clock (GstElement * element)
1079 {
1080   return gst_system_clock_obtain ();
1081 }
1082
1083 static void
1084 gst_rtp_jitter_buffer_clear_pt_map (GstRtpJitterBuffer * jitterbuffer)
1085 {
1086   GstRtpJitterBufferPrivate *priv;
1087
1088   priv = jitterbuffer->priv;
1089
1090   /* this will trigger a new pt-map request signal, FIXME, do something better. */
1091
1092   JBUF_LOCK (priv);
1093   priv->clock_rate = -1;
1094   /* do not clear current content, but refresh state for new arrival */
1095   GST_DEBUG_OBJECT (jitterbuffer, "reset jitterbuffer");
1096   rtp_jitter_buffer_reset_skew (priv->jbuf);
1097   JBUF_UNLOCK (priv);
1098 }
1099
1100 static GstClockTime
1101 gst_rtp_jitter_buffer_set_active (GstRtpJitterBuffer * jbuf, gboolean active,
1102     guint64 offset)
1103 {
1104   GstRtpJitterBufferPrivate *priv;
1105   GstClockTime last_out;
1106   RTPJitterBufferItem *item;
1107
1108   priv = jbuf->priv;
1109
1110   JBUF_LOCK (priv);
1111   GST_DEBUG_OBJECT (jbuf, "setting active %d with offset %" GST_TIME_FORMAT,
1112       active, GST_TIME_ARGS (offset));
1113
1114   if (active != priv->active) {
1115     /* add the amount of time spent in paused to the output offset. All
1116      * outgoing buffers will have this offset applied to their timestamps in
1117      * order to make them arrive in time in the sink. */
1118     priv->out_offset = offset;
1119     GST_DEBUG_OBJECT (jbuf, "out offset %" GST_TIME_FORMAT,
1120         GST_TIME_ARGS (priv->out_offset));
1121     priv->active = active;
1122     JBUF_SIGNAL_EVENT (priv);
1123   }
1124   if (!active) {
1125     rtp_jitter_buffer_set_buffering (priv->jbuf, TRUE);
1126   }
1127   if ((item = rtp_jitter_buffer_peek (priv->jbuf))) {
1128     /* head buffer timestamp and offset gives our output time */
1129     last_out = item->dts + priv->ts_offset;
1130   } else {
1131     /* use last known time when the buffer is empty */
1132     last_out = priv->last_out_time;
1133   }
1134   JBUF_UNLOCK (priv);
1135
1136   return last_out;
1137 }
1138
1139 static GstCaps *
1140 gst_rtp_jitter_buffer_getcaps (GstPad * pad, GstCaps * filter)
1141 {
1142   GstRtpJitterBuffer *jitterbuffer;
1143   GstRtpJitterBufferPrivate *priv;
1144   GstPad *other;
1145   GstCaps *caps;
1146   GstCaps *templ;
1147
1148   jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
1149   priv = jitterbuffer->priv;
1150
1151   other = (pad == priv->srcpad ? priv->sinkpad : priv->srcpad);
1152
1153   caps = gst_pad_peer_query_caps (other, filter);
1154
1155   templ = gst_pad_get_pad_template_caps (pad);
1156   if (caps == NULL) {
1157     GST_DEBUG_OBJECT (jitterbuffer, "use template");
1158     caps = templ;
1159   } else {
1160     GstCaps *intersect;
1161
1162     GST_DEBUG_OBJECT (jitterbuffer, "intersect with template");
1163
1164     intersect = gst_caps_intersect (caps, templ);
1165     gst_caps_unref (caps);
1166     gst_caps_unref (templ);
1167
1168     caps = intersect;
1169   }
1170   gst_object_unref (jitterbuffer);
1171
1172   return caps;
1173 }
1174
1175 /*
1176  * Must be called with JBUF_LOCK held
1177  */
1178
1179 static gboolean
1180 gst_jitter_buffer_sink_parse_caps (GstRtpJitterBuffer * jitterbuffer,
1181     GstCaps * caps)
1182 {
1183   GstRtpJitterBufferPrivate *priv;
1184   GstStructure *caps_struct;
1185   guint val;
1186   GstClockTime tval;
1187
1188   priv = jitterbuffer->priv;
1189
1190   /* first parse the caps */
1191   caps_struct = gst_caps_get_structure (caps, 0);
1192
1193   GST_DEBUG_OBJECT (jitterbuffer, "got caps");
1194
1195   /* we need a clock-rate to convert the rtp timestamps to GStreamer time and to
1196    * measure the amount of data in the buffer */
1197   if (!gst_structure_get_int (caps_struct, "clock-rate", &priv->clock_rate))
1198     goto error;
1199
1200   if (priv->clock_rate <= 0)
1201     goto wrong_rate;
1202
1203   GST_DEBUG_OBJECT (jitterbuffer, "got clock-rate %d", priv->clock_rate);
1204
1205   rtp_jitter_buffer_set_clock_rate (priv->jbuf, priv->clock_rate);
1206
1207   /* The clock base is the RTP timestamp corrsponding to the npt-start value. We
1208    * can use this to track the amount of time elapsed on the sender. */
1209   if (gst_structure_get_uint (caps_struct, "clock-base", &val))
1210     priv->clock_base = val;
1211   else
1212     priv->clock_base = -1;
1213
1214   priv->ext_timestamp = priv->clock_base;
1215
1216   GST_DEBUG_OBJECT (jitterbuffer, "got clock-base %" G_GINT64_FORMAT,
1217       priv->clock_base);
1218
1219   if (gst_structure_get_uint (caps_struct, "seqnum-base", &val)) {
1220     /* first expected seqnum, only update when we didn't have a previous base. */
1221     if (priv->next_in_seqnum == -1)
1222       priv->next_in_seqnum = val;
1223     if (priv->next_seqnum == -1) {
1224       priv->next_seqnum = val;
1225       JBUF_SIGNAL_EVENT (priv);
1226     }
1227     priv->seqnum_base = val;
1228   } else {
1229     priv->seqnum_base = -1;
1230   }
1231
1232   GST_DEBUG_OBJECT (jitterbuffer, "got seqnum-base %d", priv->next_in_seqnum);
1233
1234   /* the start and stop times. The seqnum-base corresponds to the start time. We
1235    * will keep track of the seqnums on the output and when we reach the one
1236    * corresponding to npt-stop, we emit the npt-stop-reached signal */
1237   if (gst_structure_get_clock_time (caps_struct, "npt-start", &tval))
1238     priv->npt_start = tval;
1239   else
1240     priv->npt_start = 0;
1241
1242   if (gst_structure_get_clock_time (caps_struct, "npt-stop", &tval))
1243     priv->npt_stop = tval;
1244   else
1245     priv->npt_stop = -1;
1246
1247   GST_DEBUG_OBJECT (jitterbuffer,
1248       "npt start/stop: %" GST_TIME_FORMAT "-%" GST_TIME_FORMAT,
1249       GST_TIME_ARGS (priv->npt_start), GST_TIME_ARGS (priv->npt_stop));
1250
1251   return TRUE;
1252
1253   /* ERRORS */
1254 error:
1255   {
1256     GST_DEBUG_OBJECT (jitterbuffer, "No clock-rate in caps!");
1257     return FALSE;
1258   }
1259 wrong_rate:
1260   {
1261     GST_DEBUG_OBJECT (jitterbuffer, "Invalid clock-rate %d", priv->clock_rate);
1262     return FALSE;
1263   }
1264 }
1265
1266 static void
1267 gst_rtp_jitter_buffer_flush_start (GstRtpJitterBuffer * jitterbuffer)
1268 {
1269   GstRtpJitterBufferPrivate *priv;
1270
1271   priv = jitterbuffer->priv;
1272
1273   JBUF_LOCK (priv);
1274   /* mark ourselves as flushing */
1275   priv->srcresult = GST_FLOW_FLUSHING;
1276   GST_DEBUG_OBJECT (jitterbuffer, "Disabling pop on queue");
1277   /* this unblocks any waiting pops on the src pad task */
1278   JBUF_SIGNAL_EVENT (priv);
1279   JBUF_SIGNAL_QUERY (priv, FALSE);
1280   JBUF_UNLOCK (priv);
1281 }
1282
1283 static void
1284 gst_rtp_jitter_buffer_flush_stop (GstRtpJitterBuffer * jitterbuffer)
1285 {
1286   GstRtpJitterBufferPrivate *priv;
1287
1288   priv = jitterbuffer->priv;
1289
1290   JBUF_LOCK (priv);
1291   GST_DEBUG_OBJECT (jitterbuffer, "Enabling pop on queue");
1292   /* Mark as non flushing */
1293   priv->srcresult = GST_FLOW_OK;
1294   gst_segment_init (&priv->segment, GST_FORMAT_TIME);
1295   priv->last_popped_seqnum = -1;
1296   priv->last_out_time = -1;
1297   priv->next_seqnum = -1;
1298   priv->seqnum_base = -1;
1299   priv->ips_rtptime = -1;
1300   priv->ips_dts = GST_CLOCK_TIME_NONE;
1301   priv->packet_spacing = 0;
1302   priv->next_in_seqnum = -1;
1303   priv->clock_rate = -1;
1304   priv->last_pt = -1;
1305   priv->eos = FALSE;
1306   priv->estimated_eos = -1;
1307   priv->last_elapsed = 0;
1308   priv->ext_timestamp = -1;
1309   priv->avg_jitter = 0;
1310   priv->last_dts = -1;
1311   priv->last_rtptime = -1;
1312   GST_DEBUG_OBJECT (jitterbuffer, "flush and reset jitterbuffer");
1313   rtp_jitter_buffer_flush (priv->jbuf, (GFunc) free_item, NULL);
1314   rtp_jitter_buffer_disable_buffering (priv->jbuf, FALSE);
1315   rtp_jitter_buffer_reset_skew (priv->jbuf);
1316   remove_all_timers (jitterbuffer);
1317   JBUF_UNLOCK (priv);
1318 }
1319
1320 static gboolean
1321 gst_rtp_jitter_buffer_src_activate_mode (GstPad * pad, GstObject * parent,
1322     GstPadMode mode, gboolean active)
1323 {
1324   gboolean result;
1325   GstRtpJitterBuffer *jitterbuffer = NULL;
1326
1327   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1328
1329   switch (mode) {
1330     case GST_PAD_MODE_PUSH:
1331       if (active) {
1332         /* allow data processing */
1333         gst_rtp_jitter_buffer_flush_stop (jitterbuffer);
1334
1335         /* start pushing out buffers */
1336         GST_DEBUG_OBJECT (jitterbuffer, "Starting task on srcpad");
1337         result = gst_pad_start_task (jitterbuffer->priv->srcpad,
1338             (GstTaskFunction) gst_rtp_jitter_buffer_loop, jitterbuffer, NULL);
1339       } else {
1340         /* make sure all data processing stops ASAP */
1341         gst_rtp_jitter_buffer_flush_start (jitterbuffer);
1342
1343         /* NOTE this will hardlock if the state change is called from the src pad
1344          * task thread because we will _join() the thread. */
1345         GST_DEBUG_OBJECT (jitterbuffer, "Stopping task on srcpad");
1346         result = gst_pad_stop_task (pad);
1347       }
1348       break;
1349     default:
1350       result = FALSE;
1351       break;
1352   }
1353   return result;
1354 }
1355
1356 static GstStateChangeReturn
1357 gst_rtp_jitter_buffer_change_state (GstElement * element,
1358     GstStateChange transition)
1359 {
1360   GstRtpJitterBuffer *jitterbuffer;
1361   GstRtpJitterBufferPrivate *priv;
1362   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
1363
1364   jitterbuffer = GST_RTP_JITTER_BUFFER (element);
1365   priv = jitterbuffer->priv;
1366
1367   switch (transition) {
1368     case GST_STATE_CHANGE_NULL_TO_READY:
1369       break;
1370     case GST_STATE_CHANGE_READY_TO_PAUSED:
1371       JBUF_LOCK (priv);
1372       /* reset negotiated values */
1373       priv->clock_rate = -1;
1374       priv->clock_base = -1;
1375       priv->peer_latency = 0;
1376       priv->last_pt = -1;
1377       /* block until we go to PLAYING */
1378       priv->blocked = TRUE;
1379       priv->timer_running = TRUE;
1380       priv->timer_thread =
1381           g_thread_new ("timer", (GThreadFunc) wait_next_timeout, jitterbuffer);
1382       JBUF_UNLOCK (priv);
1383       break;
1384     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
1385       JBUF_LOCK (priv);
1386       /* unblock to allow streaming in PLAYING */
1387       priv->blocked = FALSE;
1388       JBUF_SIGNAL_EVENT (priv);
1389       JBUF_SIGNAL_TIMER (priv);
1390       JBUF_UNLOCK (priv);
1391       break;
1392     default:
1393       break;
1394   }
1395
1396   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1397
1398   switch (transition) {
1399     case GST_STATE_CHANGE_READY_TO_PAUSED:
1400       /* we are a live element because we sync to the clock, which we can only
1401        * do in the PLAYING state */
1402       if (ret != GST_STATE_CHANGE_FAILURE)
1403         ret = GST_STATE_CHANGE_NO_PREROLL;
1404       break;
1405     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
1406       JBUF_LOCK (priv);
1407       /* block to stop streaming when PAUSED */
1408       priv->blocked = TRUE;
1409       unschedule_current_timer (jitterbuffer);
1410       JBUF_UNLOCK (priv);
1411       if (ret != GST_STATE_CHANGE_FAILURE)
1412         ret = GST_STATE_CHANGE_NO_PREROLL;
1413       break;
1414     case GST_STATE_CHANGE_PAUSED_TO_READY:
1415       JBUF_LOCK (priv);
1416       gst_buffer_replace (&priv->last_sr, NULL);
1417       priv->timer_running = FALSE;
1418       unschedule_current_timer (jitterbuffer);
1419       JBUF_SIGNAL_TIMER (priv);
1420       JBUF_SIGNAL_QUERY (priv, FALSE);
1421       JBUF_UNLOCK (priv);
1422       g_thread_join (priv->timer_thread);
1423       priv->timer_thread = NULL;
1424       break;
1425     case GST_STATE_CHANGE_READY_TO_NULL:
1426       break;
1427     default:
1428       break;
1429   }
1430
1431   return ret;
1432 }
1433
1434 static gboolean
1435 gst_rtp_jitter_buffer_src_event (GstPad * pad, GstObject * parent,
1436     GstEvent * event)
1437 {
1438   gboolean ret = TRUE;
1439   GstRtpJitterBuffer *jitterbuffer;
1440   GstRtpJitterBufferPrivate *priv;
1441
1442   jitterbuffer = GST_RTP_JITTER_BUFFER_CAST (parent);
1443   priv = jitterbuffer->priv;
1444
1445   GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
1446
1447   switch (GST_EVENT_TYPE (event)) {
1448     case GST_EVENT_LATENCY:
1449     {
1450       GstClockTime latency;
1451
1452       gst_event_parse_latency (event, &latency);
1453
1454       GST_DEBUG_OBJECT (jitterbuffer,
1455           "configuring latency of %" GST_TIME_FORMAT, GST_TIME_ARGS (latency));
1456
1457       JBUF_LOCK (priv);
1458       /* adjust the overall buffer delay to the total pipeline latency in
1459        * buffering mode because if downstream consumes too fast (because of
1460        * large latency or queues, we would start rebuffering again. */
1461       if (rtp_jitter_buffer_get_mode (priv->jbuf) ==
1462           RTP_JITTER_BUFFER_MODE_BUFFER) {
1463         rtp_jitter_buffer_set_delay (priv->jbuf, latency);
1464       }
1465       JBUF_UNLOCK (priv);
1466
1467       ret = gst_pad_push_event (priv->sinkpad, event);
1468       break;
1469     }
1470     default:
1471       ret = gst_pad_push_event (priv->sinkpad, event);
1472       break;
1473   }
1474
1475   return ret;
1476 }
1477
1478 /* handles and stores the event in the jitterbuffer, must be called with
1479  * LOCK */
1480 static gboolean
1481 queue_event (GstRtpJitterBuffer * jitterbuffer, GstEvent * event)
1482 {
1483   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1484   RTPJitterBufferItem *item;
1485   gboolean head;
1486
1487   switch (GST_EVENT_TYPE (event)) {
1488     case GST_EVENT_CAPS:
1489     {
1490       GstCaps *caps;
1491
1492       gst_event_parse_caps (event, &caps);
1493       gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps);
1494       break;
1495     }
1496     case GST_EVENT_SEGMENT:
1497       gst_event_copy_segment (event, &priv->segment);
1498
1499       /* we need time for now */
1500       if (priv->segment.format != GST_FORMAT_TIME)
1501         goto newseg_wrong_format;
1502
1503       GST_DEBUG_OBJECT (jitterbuffer,
1504           "segment:  %" GST_SEGMENT_FORMAT, &priv->segment);
1505       break;
1506     case GST_EVENT_EOS:
1507       priv->eos = TRUE;
1508       rtp_jitter_buffer_disable_buffering (priv->jbuf, TRUE);
1509       break;
1510     default:
1511       break;
1512   }
1513
1514
1515   GST_DEBUG_OBJECT (jitterbuffer, "adding event");
1516   item = alloc_item (event, ITEM_TYPE_EVENT, -1, -1, -1, 0, -1);
1517   rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL);
1518   if (head)
1519     JBUF_SIGNAL_EVENT (priv);
1520
1521   return TRUE;
1522
1523   /* ERRORS */
1524 newseg_wrong_format:
1525   {
1526     GST_DEBUG_OBJECT (jitterbuffer, "received non TIME newsegment");
1527     gst_event_unref (event);
1528     return FALSE;
1529   }
1530 }
1531
1532 static gboolean
1533 gst_rtp_jitter_buffer_sink_event (GstPad * pad, GstObject * parent,
1534     GstEvent * event)
1535 {
1536   gboolean ret = TRUE;
1537   GstRtpJitterBuffer *jitterbuffer;
1538   GstRtpJitterBufferPrivate *priv;
1539
1540   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1541   priv = jitterbuffer->priv;
1542
1543   GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
1544
1545   switch (GST_EVENT_TYPE (event)) {
1546     case GST_EVENT_FLUSH_START:
1547       ret = gst_pad_push_event (priv->srcpad, event);
1548       gst_rtp_jitter_buffer_flush_start (jitterbuffer);
1549       /* wait for the loop to go into PAUSED */
1550       gst_pad_pause_task (priv->srcpad);
1551       break;
1552     case GST_EVENT_FLUSH_STOP:
1553       ret = gst_pad_push_event (priv->srcpad, event);
1554       ret =
1555           gst_rtp_jitter_buffer_src_activate_mode (priv->srcpad, parent,
1556           GST_PAD_MODE_PUSH, TRUE);
1557       break;
1558     default:
1559       if (GST_EVENT_IS_SERIALIZED (event)) {
1560         /* serialized events go in the queue */
1561         JBUF_LOCK (priv);
1562         if (priv->srcresult != GST_FLOW_OK) {
1563           /* Errors in sticky event pushing are no problem and ignored here
1564            * as they will cause more meaningful errors during data flow.
1565            * For EOS events, that are not followed by data flow, we still
1566            * return FALSE here though.
1567            */
1568           if (!GST_EVENT_IS_STICKY (event) ||
1569               GST_EVENT_TYPE (event) == GST_EVENT_EOS)
1570             goto out_flow_error;
1571         }
1572         /* refuse more events on EOS */
1573         if (priv->eos)
1574           goto out_eos;
1575         ret = queue_event (jitterbuffer, event);
1576         JBUF_UNLOCK (priv);
1577       } else {
1578         /* non-serialized events are forwarded downstream immediately */
1579         ret = gst_pad_push_event (priv->srcpad, event);
1580       }
1581       break;
1582   }
1583   return ret;
1584
1585   /* ERRORS */
1586 out_flow_error:
1587   {
1588     GST_DEBUG_OBJECT (jitterbuffer,
1589         "refusing event, we have a downstream flow error: %s",
1590         gst_flow_get_name (priv->srcresult));
1591     JBUF_UNLOCK (priv);
1592     gst_event_unref (event);
1593     return FALSE;
1594   }
1595 out_eos:
1596   {
1597     GST_DEBUG_OBJECT (jitterbuffer, "refusing event, we are EOS");
1598     JBUF_UNLOCK (priv);
1599     gst_event_unref (event);
1600     return FALSE;
1601   }
1602 }
1603
1604 static gboolean
1605 gst_rtp_jitter_buffer_sink_rtcp_event (GstPad * pad, GstObject * parent,
1606     GstEvent * event)
1607 {
1608   gboolean ret = TRUE;
1609   GstRtpJitterBuffer *jitterbuffer;
1610
1611   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1612
1613   GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
1614
1615   switch (GST_EVENT_TYPE (event)) {
1616     case GST_EVENT_FLUSH_START:
1617       gst_event_unref (event);
1618       break;
1619     case GST_EVENT_FLUSH_STOP:
1620       gst_event_unref (event);
1621       break;
1622     default:
1623       ret = gst_pad_event_default (pad, parent, event);
1624       break;
1625   }
1626
1627   return ret;
1628 }
1629
1630 /*
1631  * Must be called with JBUF_LOCK held, will release the LOCK when emiting the
1632  * signal. The function returns GST_FLOW_ERROR when a parsing error happened and
1633  * GST_FLOW_FLUSHING when the element is shutting down. On success
1634  * GST_FLOW_OK is returned.
1635  */
1636 static GstFlowReturn
1637 gst_rtp_jitter_buffer_get_clock_rate (GstRtpJitterBuffer * jitterbuffer,
1638     guint8 pt)
1639 {
1640   GValue ret = { 0 };
1641   GValue args[2] = { {0}, {0} };
1642   GstCaps *caps;
1643   gboolean res;
1644
1645   g_value_init (&args[0], GST_TYPE_ELEMENT);
1646   g_value_set_object (&args[0], jitterbuffer);
1647   g_value_init (&args[1], G_TYPE_UINT);
1648   g_value_set_uint (&args[1], pt);
1649
1650   g_value_init (&ret, GST_TYPE_CAPS);
1651   g_value_set_boxed (&ret, NULL);
1652
1653   JBUF_UNLOCK (jitterbuffer->priv);
1654   g_signal_emitv (args, gst_rtp_jitter_buffer_signals[SIGNAL_REQUEST_PT_MAP], 0,
1655       &ret);
1656   JBUF_LOCK_CHECK (jitterbuffer->priv, out_flushing);
1657
1658   g_value_unset (&args[0]);
1659   g_value_unset (&args[1]);
1660   caps = (GstCaps *) g_value_dup_boxed (&ret);
1661   g_value_unset (&ret);
1662   if (!caps)
1663     goto no_caps;
1664
1665   res = gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps);
1666   gst_caps_unref (caps);
1667
1668   if (G_UNLIKELY (!res))
1669     goto parse_failed;
1670
1671   return GST_FLOW_OK;
1672
1673   /* ERRORS */
1674 no_caps:
1675   {
1676     GST_DEBUG_OBJECT (jitterbuffer, "could not get caps");
1677     return GST_FLOW_ERROR;
1678   }
1679 out_flushing:
1680   {
1681     GST_DEBUG_OBJECT (jitterbuffer, "we are flushing");
1682     return GST_FLOW_FLUSHING;
1683   }
1684 parse_failed:
1685   {
1686     GST_DEBUG_OBJECT (jitterbuffer, "parse failed");
1687     return GST_FLOW_ERROR;
1688   }
1689 }
1690
1691 /* call with jbuf lock held */
1692 static GstMessage *
1693 check_buffering_percent (GstRtpJitterBuffer * jitterbuffer, gint percent)
1694 {
1695   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1696   GstMessage *message = NULL;
1697
1698   if (percent == -1)
1699     return NULL;
1700
1701   /* Post a buffering message */
1702   if (priv->last_percent != percent) {
1703     priv->last_percent = percent;
1704     message =
1705         gst_message_new_buffering (GST_OBJECT_CAST (jitterbuffer), percent);
1706     gst_message_set_buffering_stats (message, GST_BUFFERING_LIVE, -1, -1, -1);
1707   }
1708
1709   return message;
1710 }
1711
1712 static GstClockTime
1713 apply_offset (GstRtpJitterBuffer * jitterbuffer, GstClockTime timestamp)
1714 {
1715   GstRtpJitterBufferPrivate *priv;
1716
1717   priv = jitterbuffer->priv;
1718
1719   if (timestamp == -1)
1720     return -1;
1721
1722   /* apply the timestamp offset, this is used for inter stream sync */
1723   timestamp += priv->ts_offset;
1724   /* add the offset, this is used when buffering */
1725   timestamp += priv->out_offset;
1726
1727   return timestamp;
1728 }
1729
1730 static TimerData *
1731 find_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type, guint16 seqnum)
1732 {
1733   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1734   TimerData *timer = NULL;
1735   gint i, len;
1736
1737   len = priv->timers->len;
1738   for (i = 0; i < len; i++) {
1739     TimerData *test = &g_array_index (priv->timers, TimerData, i);
1740     if (test->seqnum == seqnum && test->type == type) {
1741       timer = test;
1742       break;
1743     }
1744   }
1745   return timer;
1746 }
1747
1748 static void
1749 unschedule_current_timer (GstRtpJitterBuffer * jitterbuffer)
1750 {
1751   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1752
1753   if (priv->clock_id) {
1754     GST_DEBUG_OBJECT (jitterbuffer, "unschedule current timer");
1755     gst_clock_id_unschedule (priv->clock_id);
1756     priv->clock_id = NULL;
1757   }
1758 }
1759
1760 static GstClockTime
1761 get_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
1762 {
1763   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1764   GstClockTime test_timeout;
1765
1766   if ((test_timeout = timer->timeout) == -1)
1767     return -1;
1768
1769   if (timer->type != TIMER_TYPE_EXPECTED) {
1770     /* add our latency and offset to get output times. */
1771     test_timeout = apply_offset (jitterbuffer, test_timeout);
1772     test_timeout += priv->latency_ns;
1773   }
1774   return test_timeout;
1775 }
1776
1777 static void
1778 recalculate_timer (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
1779 {
1780   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1781
1782   if (priv->clock_id) {
1783     GstClockTime timeout = get_timeout (jitterbuffer, timer);
1784
1785     GST_DEBUG ("%" GST_TIME_FORMAT " <> %" GST_TIME_FORMAT,
1786         GST_TIME_ARGS (timeout), GST_TIME_ARGS (priv->timer_timeout));
1787
1788     if (timeout == -1 || timeout < priv->timer_timeout)
1789       unschedule_current_timer (jitterbuffer);
1790   }
1791 }
1792
1793 static TimerData *
1794 add_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type,
1795     guint16 seqnum, guint num, GstClockTime timeout, GstClockTime delay,
1796     GstClockTime duration)
1797 {
1798   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1799   TimerData *timer;
1800   gint len;
1801
1802   GST_DEBUG_OBJECT (jitterbuffer,
1803       "add timer %d for seqnum %d to %" GST_TIME_FORMAT ", delay %"
1804       GST_TIME_FORMAT, type, seqnum, GST_TIME_ARGS (timeout),
1805       GST_TIME_ARGS (delay));
1806
1807   len = priv->timers->len;
1808   g_array_set_size (priv->timers, len + 1);
1809   timer = &g_array_index (priv->timers, TimerData, len);
1810   timer->idx = len;
1811   timer->type = type;
1812   timer->seqnum = seqnum;
1813   timer->num = num;
1814   timer->timeout = timeout + delay;
1815   timer->duration = duration;
1816   if (type == TIMER_TYPE_EXPECTED) {
1817     timer->rtx_base = timeout;
1818     timer->rtx_delay = delay;
1819     timer->rtx_retry = 0;
1820   }
1821   timer->num_rtx_retry = 0;
1822   recalculate_timer (jitterbuffer, timer);
1823   JBUF_SIGNAL_TIMER (priv);
1824
1825   return timer;
1826 }
1827
1828 static void
1829 reschedule_timer (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
1830     guint16 seqnum, GstClockTime timeout, GstClockTime delay, gboolean reset)
1831 {
1832   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1833   gboolean seqchange, timechange;
1834   guint16 oldseq;
1835
1836   seqchange = timer->seqnum != seqnum;
1837   timechange = timer->timeout != timeout;
1838
1839   if (!seqchange && !timechange)
1840     return;
1841
1842   oldseq = timer->seqnum;
1843
1844   GST_DEBUG_OBJECT (jitterbuffer,
1845       "replace timer for seqnum %d->%d to %" GST_TIME_FORMAT,
1846       oldseq, seqnum, GST_TIME_ARGS (timeout + delay));
1847
1848   timer->timeout = timeout + delay;
1849   timer->seqnum = seqnum;
1850   if (reset) {
1851     timer->rtx_base = timeout;
1852     timer->rtx_delay = delay;
1853     timer->rtx_retry = 0;
1854   }
1855   if (seqchange)
1856     timer->num_rtx_retry = 0;
1857
1858   if (priv->clock_id) {
1859     /* we changed the seqnum and there is a timer currently waiting with this
1860      * seqnum, unschedule it */
1861     if (seqchange && priv->timer_seqnum == oldseq)
1862       unschedule_current_timer (jitterbuffer);
1863     /* we changed the time, check if it is earlier than what we are waiting
1864      * for and unschedule if so */
1865     else if (timechange)
1866       recalculate_timer (jitterbuffer, timer);
1867   }
1868 }
1869
1870 static TimerData *
1871 set_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type,
1872     guint16 seqnum, GstClockTime timeout)
1873 {
1874   TimerData *timer;
1875
1876   /* find the seqnum timer */
1877   timer = find_timer (jitterbuffer, type, seqnum);
1878   if (timer == NULL) {
1879     timer = add_timer (jitterbuffer, type, seqnum, 0, timeout, 0, -1);
1880   } else {
1881     reschedule_timer (jitterbuffer, timer, seqnum, timeout, 0, FALSE);
1882   }
1883   return timer;
1884 }
1885
1886 static void
1887 remove_timer (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
1888 {
1889   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1890   guint idx;
1891
1892   if (priv->clock_id && priv->timer_seqnum == timer->seqnum)
1893     unschedule_current_timer (jitterbuffer);
1894
1895   idx = timer->idx;
1896   GST_DEBUG_OBJECT (jitterbuffer, "removed index %d", idx);
1897   g_array_remove_index_fast (priv->timers, idx);
1898   timer->idx = idx;
1899 }
1900
1901 static void
1902 remove_all_timers (GstRtpJitterBuffer * jitterbuffer)
1903 {
1904   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1905   GST_DEBUG_OBJECT (jitterbuffer, "removed all timers");
1906   g_array_set_size (priv->timers, 0);
1907   unschedule_current_timer (jitterbuffer);
1908 }
1909
1910 /* get the extra delay to wait before sending RTX */
1911 static GstClockTime
1912 get_rtx_delay (GstRtpJitterBufferPrivate * priv)
1913 {
1914   GstClockTime delay;
1915
1916   if (priv->rtx_delay == -1) {
1917     if (priv->avg_jitter == 0 && priv->packet_spacing == 0) {
1918       delay = DEFAULT_AUTO_RTX_DELAY;
1919     } else {
1920       /* jitter is in nanoseconds, maximum of 2x jitter and half the
1921        * packet spacing is a good margin */
1922       delay = MAX (priv->avg_jitter * 2, priv->packet_spacing / 2);
1923     }
1924   } else {
1925     delay = priv->rtx_delay * GST_MSECOND;
1926   }
1927   if (priv->rtx_min_delay > 0)
1928     delay = MAX (delay, priv->rtx_min_delay * GST_MSECOND);
1929
1930   return delay;
1931 }
1932
1933 /* we just received a packet with seqnum and dts.
1934  *
1935  * First check for old seqnum that we are still expecting. If the gap with the
1936  * current seqnum is too big, unschedule the timeouts.
1937  *
1938  * If we have a valid packet spacing estimate we can set a timer for when we
1939  * should receive the next packet.
1940  * If we don't have a valid estimate, we remove any timer we might have
1941  * had for this packet.
1942  */
1943 static void
1944 update_timers (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum,
1945     GstClockTime dts, gboolean do_next_seqnum)
1946 {
1947   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1948   TimerData *timer = NULL;
1949   gint i, len;
1950
1951   /* go through all timers and unschedule the ones with a large gap, also find
1952    * the timer for the seqnum */
1953   len = priv->timers->len;
1954   for (i = 0; i < len; i++) {
1955     TimerData *test = &g_array_index (priv->timers, TimerData, i);
1956     gint gap;
1957
1958     gap = gst_rtp_buffer_compare_seqnum (test->seqnum, seqnum);
1959
1960     GST_DEBUG_OBJECT (jitterbuffer, "%d, %d, #%d<->#%d gap %d", i,
1961         test->type, test->seqnum, seqnum, gap);
1962
1963     if (gap == 0) {
1964       GST_DEBUG ("found timer for current seqnum");
1965       /* the timer for the current seqnum */
1966       timer = test;
1967       /* when no retransmission, we can stop now, we only need to find the
1968        * timer for the current seqnum */
1969       if (!priv->do_retransmission)
1970         break;
1971     } else if (gap > priv->rtx_delay_reorder) {
1972       /* max gap, we exceeded the max reorder distance and we don't expect the
1973        * missing packet to be this reordered */
1974       if (test->num_rtx_retry == 0 && test->type == TIMER_TYPE_EXPECTED)
1975         reschedule_timer (jitterbuffer, test, test->seqnum, -1, 0, FALSE);
1976     }
1977   }
1978
1979   do_next_seqnum = do_next_seqnum && priv->packet_spacing > 0
1980       && priv->do_retransmission && priv->rtx_next_seqnum;
1981
1982   if (timer && timer->type != TIMER_TYPE_DEADLINE) {
1983     if (timer->num_rtx_retry > 0) {
1984       GstClockTime rtx_last, delay;
1985
1986       /* we scheduled a retry for this packet and now we have it */
1987       priv->num_rtx_success++;
1988       /* all the previous retry attempts failed */
1989       priv->num_rtx_failed += timer->num_rtx_retry - 1;
1990       /* number of retries before receiving the packet */
1991       if (priv->avg_rtx_num == 0.0)
1992         priv->avg_rtx_num = timer->num_rtx_retry;
1993       else
1994         priv->avg_rtx_num = (timer->num_rtx_retry + 7 * priv->avg_rtx_num) / 8;
1995       /* calculate the delay between retransmission request and receiving this
1996        * packet, start with when we scheduled this timeout last */
1997       rtx_last = timer->rtx_last;
1998       if (dts != GST_CLOCK_TIME_NONE && dts > rtx_last) {
1999         /* we have a valid delay if this packet arrived after we scheduled the
2000          * request */
2001         delay = dts - rtx_last;
2002         if (priv->avg_rtx_rtt == 0)
2003           priv->avg_rtx_rtt = delay;
2004         else
2005           priv->avg_rtx_rtt = (delay + 7 * priv->avg_rtx_rtt) / 8;
2006       } else
2007         delay = 0;
2008
2009       GST_LOG_OBJECT (jitterbuffer,
2010           "RTX success %" G_GUINT64_FORMAT ", failed %" G_GUINT64_FORMAT
2011           ", requests %" G_GUINT64_FORMAT ", dups %" G_GUINT64_FORMAT
2012           ", avg-num %g, delay %" GST_TIME_FORMAT ", avg-rtt %" GST_TIME_FORMAT,
2013           priv->num_rtx_success, priv->num_rtx_failed, priv->num_rtx_requests,
2014           priv->num_duplicates, priv->avg_rtx_num, GST_TIME_ARGS (delay),
2015           GST_TIME_ARGS (priv->avg_rtx_rtt));
2016
2017       /* don't try to estimate the next seqnum because this is a retransmitted
2018        * packet and it probably did not arrive with the expected packet
2019        * spacing. */
2020       do_next_seqnum = FALSE;
2021     }
2022   }
2023
2024   if (do_next_seqnum) {
2025     GstClockTime expected, delay;
2026
2027     /* calculate expected arrival time of the next seqnum */
2028     expected = dts + priv->packet_spacing;
2029
2030     delay = get_rtx_delay (priv);
2031
2032     /* and update/install timer for next seqnum */
2033     if (timer)
2034       reschedule_timer (jitterbuffer, timer, priv->next_in_seqnum, expected,
2035           delay, TRUE);
2036     else
2037       add_timer (jitterbuffer, TIMER_TYPE_EXPECTED, priv->next_in_seqnum, 0,
2038           expected, delay, priv->packet_spacing);
2039   } else if (timer && timer->type != TIMER_TYPE_DEADLINE) {
2040     /* if we had a timer, remove it, we don't know when to expect the next
2041      * packet. */
2042     remove_timer (jitterbuffer, timer);
2043   }
2044 }
2045
2046 static void
2047 calculate_packet_spacing (GstRtpJitterBuffer * jitterbuffer, guint32 rtptime,
2048     GstClockTime dts)
2049 {
2050   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2051
2052   /* we need consecutive seqnums with a different
2053    * rtptime to estimate the packet spacing. */
2054   if (priv->ips_rtptime != rtptime) {
2055     /* rtptime changed, check dts diff */
2056     if (priv->ips_dts != -1 && dts != -1 && dts > priv->ips_dts) {
2057       GstClockTime new_packet_spacing = dts - priv->ips_dts;
2058       GstClockTime old_packet_spacing = priv->packet_spacing;
2059
2060       /* Biased towards bigger packet spacings to prevent
2061        * too many unneeded retransmission requests for next
2062        * packets that just arrive a little later than we would
2063        * expect */
2064       if (old_packet_spacing > new_packet_spacing)
2065         priv->packet_spacing =
2066             (new_packet_spacing + 3 * old_packet_spacing) / 4;
2067       else if (old_packet_spacing > 0)
2068         priv->packet_spacing =
2069             (3 * new_packet_spacing + old_packet_spacing) / 4;
2070       else
2071         priv->packet_spacing = new_packet_spacing;
2072
2073       GST_DEBUG_OBJECT (jitterbuffer,
2074           "new packet spacing %" GST_TIME_FORMAT
2075           " old packet spacing %" GST_TIME_FORMAT
2076           " combined to %" GST_TIME_FORMAT,
2077           GST_TIME_ARGS (new_packet_spacing),
2078           GST_TIME_ARGS (old_packet_spacing),
2079           GST_TIME_ARGS (priv->packet_spacing));
2080     }
2081     priv->ips_rtptime = rtptime;
2082     priv->ips_dts = dts;
2083   }
2084 }
2085
2086 static void
2087 calculate_expected (GstRtpJitterBuffer * jitterbuffer, guint32 expected,
2088     guint16 seqnum, GstClockTime dts, gint gap)
2089 {
2090   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2091   GstClockTime total_duration, duration, expected_dts;
2092   TimerType type;
2093   guint lost_packets = 0;
2094
2095   GST_DEBUG_OBJECT (jitterbuffer,
2096       "dts %" GST_TIME_FORMAT ", last %" GST_TIME_FORMAT,
2097       GST_TIME_ARGS (dts), GST_TIME_ARGS (priv->last_in_dts));
2098
2099   /* the total duration spanned by the missing packets */
2100   if (dts >= priv->last_in_dts)
2101     total_duration = dts - priv->last_in_dts;
2102   else
2103     total_duration = 0;
2104
2105   /* interpolate between the current time and the last time based on
2106    * number of packets we are missing, this is the estimated duration
2107    * for the missing packet based on equidistant packet spacing. */
2108   duration = total_duration / (gap + 1);
2109
2110   GST_DEBUG_OBJECT (jitterbuffer, "duration %" GST_TIME_FORMAT,
2111       GST_TIME_ARGS (duration));
2112
2113   if (total_duration > priv->latency_ns) {
2114     GstClockTime gap_time;
2115
2116     gap_time = total_duration - priv->latency_ns;
2117
2118     if (duration > 0) {
2119       lost_packets = gap_time / duration;
2120       gap_time = lost_packets * duration;
2121     } else {
2122       lost_packets = gap;
2123     }
2124
2125     /* too many lost packets, some of the missing packets are already
2126      * too late and we can generate lost packet events for them. */
2127     GST_DEBUG_OBJECT (jitterbuffer, "too many lost packets %" GST_TIME_FORMAT
2128         " > %" GST_TIME_FORMAT ", consider %u lost",
2129         GST_TIME_ARGS (total_duration), GST_TIME_ARGS (priv->latency_ns),
2130         lost_packets);
2131
2132     /* this timer will fire immediately and the lost event will be pushed from
2133      * the timer thread */
2134     add_timer (jitterbuffer, TIMER_TYPE_LOST, expected, lost_packets,
2135         priv->last_in_dts + duration, 0, gap_time);
2136
2137     expected += lost_packets;
2138     priv->last_in_dts += gap_time;
2139   }
2140
2141   expected_dts = priv->last_in_dts + (lost_packets + 1) * duration;
2142
2143   if (priv->do_retransmission) {
2144     TimerData *timer;
2145
2146     type = TIMER_TYPE_EXPECTED;
2147     /* if we had a timer for the first missing packet, update it. */
2148     if ((timer = find_timer (jitterbuffer, type, expected))) {
2149       GstClockTime timeout = timer->timeout;
2150
2151       timer->duration = duration;
2152       if (timeout > (expected_dts + timer->rtx_retry)) {
2153         GstClockTime delay = timeout - expected_dts - timer->rtx_retry;
2154         reschedule_timer (jitterbuffer, timer, timer->seqnum, expected_dts,
2155             delay, TRUE);
2156       }
2157       expected++;
2158       expected_dts += duration;
2159     }
2160   } else {
2161     type = TIMER_TYPE_LOST;
2162   }
2163
2164   while (gst_rtp_buffer_compare_seqnum (expected, seqnum) > 0) {
2165     add_timer (jitterbuffer, type, expected, 0, expected_dts, 0, duration);
2166     expected_dts += duration;
2167     expected++;
2168   }
2169 }
2170
2171 static void
2172 calculate_jitter (GstRtpJitterBuffer * jitterbuffer, GstClockTime dts,
2173     guint rtptime)
2174 {
2175   gint32 rtpdiff;
2176   GstClockTimeDiff dtsdiff, rtpdiffns, diff;
2177   GstRtpJitterBufferPrivate *priv;
2178
2179   priv = jitterbuffer->priv;
2180
2181   if (G_UNLIKELY (dts == GST_CLOCK_TIME_NONE) || priv->clock_rate <= 0)
2182     goto no_time;
2183
2184   if (priv->last_dts != -1)
2185     dtsdiff = dts - priv->last_dts;
2186   else
2187     dtsdiff = 0;
2188
2189   if (priv->last_rtptime != -1)
2190     rtpdiff = rtptime - (guint32) priv->last_rtptime;
2191   else
2192     rtpdiff = 0;
2193
2194   priv->last_dts = dts;
2195   priv->last_rtptime = rtptime;
2196
2197   if (rtpdiff > 0)
2198     rtpdiffns =
2199         gst_util_uint64_scale_int (rtpdiff, GST_SECOND, priv->clock_rate);
2200   else
2201     rtpdiffns =
2202         -gst_util_uint64_scale_int (-rtpdiff, GST_SECOND, priv->clock_rate);
2203
2204   diff = ABS (dtsdiff - rtpdiffns);
2205
2206   /* jitter is stored in nanoseconds */
2207   priv->avg_jitter = (diff + (15 * priv->avg_jitter)) >> 4;
2208
2209   GST_LOG_OBJECT (jitterbuffer,
2210       "dtsdiff %" GST_TIME_FORMAT " rtptime %" GST_TIME_FORMAT
2211       ", clock-rate %d, diff %" GST_TIME_FORMAT ", jitter: %" GST_TIME_FORMAT,
2212       GST_TIME_ARGS (dtsdiff), GST_TIME_ARGS (rtpdiffns), priv->clock_rate,
2213       GST_TIME_ARGS (diff), GST_TIME_ARGS (priv->avg_jitter));
2214
2215   return;
2216
2217   /* ERRORS */
2218 no_time:
2219   {
2220     GST_DEBUG_OBJECT (jitterbuffer,
2221         "no dts or no clock-rate, can't calculate jitter");
2222     return;
2223   }
2224 }
2225
2226 static GstFlowReturn
2227 gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent,
2228     GstBuffer * buffer)
2229 {
2230   GstRtpJitterBuffer *jitterbuffer;
2231   GstRtpJitterBufferPrivate *priv;
2232   guint16 seqnum;
2233   guint32 expected, rtptime;
2234   GstFlowReturn ret = GST_FLOW_OK;
2235   GstClockTime dts, pts;
2236   guint64 latency_ts;
2237   gboolean head;
2238   gint percent = -1;
2239   guint8 pt;
2240   GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
2241   gboolean do_next_seqnum = FALSE;
2242   RTPJitterBufferItem *item;
2243   GstMessage *msg = NULL;
2244
2245   jitterbuffer = GST_RTP_JITTER_BUFFER_CAST (parent);
2246
2247   priv = jitterbuffer->priv;
2248
2249   if (G_UNLIKELY (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp)))
2250     goto invalid_buffer;
2251
2252   pt = gst_rtp_buffer_get_payload_type (&rtp);
2253   seqnum = gst_rtp_buffer_get_seq (&rtp);
2254   rtptime = gst_rtp_buffer_get_timestamp (&rtp);
2255   gst_rtp_buffer_unmap (&rtp);
2256
2257   /* make sure we have PTS and DTS set */
2258   pts = GST_BUFFER_PTS (buffer);
2259   dts = GST_BUFFER_DTS (buffer);
2260   if (dts == -1)
2261     dts = pts;
2262   else if (pts == -1)
2263     pts = dts;
2264
2265   /* take the DTS of the buffer. This is the time when the packet was
2266    * received and is used to calculate jitter and clock skew. We will adjust
2267    * this DTS with the smoothed value after processing it in the
2268    * jitterbuffer and assign it as the PTS. */
2269   /* bring to running time */
2270   dts = gst_segment_to_running_time (&priv->segment, GST_FORMAT_TIME, dts);
2271
2272   GST_DEBUG_OBJECT (jitterbuffer,
2273       "Received packet #%d at time %" GST_TIME_FORMAT ", discont %d", seqnum,
2274       GST_TIME_ARGS (dts), GST_BUFFER_IS_DISCONT (buffer));
2275
2276   JBUF_LOCK_CHECK (priv, out_flushing);
2277
2278   if (G_UNLIKELY (priv->last_pt != pt)) {
2279     GstCaps *caps;
2280
2281     GST_DEBUG_OBJECT (jitterbuffer, "pt changed from %u to %u", priv->last_pt,
2282         pt);
2283
2284     priv->last_pt = pt;
2285     /* reset clock-rate so that we get a new one */
2286     priv->clock_rate = -1;
2287
2288     /* Try to get the clock-rate from the caps first if we can. If there are no
2289      * caps we must fire the signal to get the clock-rate. */
2290     if ((caps = gst_pad_get_current_caps (pad))) {
2291       gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps);
2292       gst_caps_unref (caps);
2293     }
2294   }
2295
2296   if (G_UNLIKELY (priv->clock_rate == -1)) {
2297     /* no clock rate given on the caps, try to get one with the signal */
2298     if (gst_rtp_jitter_buffer_get_clock_rate (jitterbuffer,
2299             pt) == GST_FLOW_FLUSHING)
2300       goto out_flushing;
2301
2302     if (G_UNLIKELY (priv->clock_rate == -1))
2303       goto no_clock_rate;
2304   }
2305
2306   /* don't accept more data on EOS */
2307   if (G_UNLIKELY (priv->eos))
2308     goto have_eos;
2309
2310   calculate_jitter (jitterbuffer, dts, rtptime);
2311
2312   if (priv->seqnum_base != -1) {
2313     gint gap;
2314
2315     gap = gst_rtp_buffer_compare_seqnum (priv->seqnum_base, seqnum);
2316
2317     if (gap < 0) {
2318       GST_DEBUG_OBJECT (jitterbuffer,
2319           "packet seqnum #%d before seqnum-base #%d", seqnum,
2320           priv->seqnum_base);
2321       gst_buffer_unref (buffer);
2322       ret = GST_FLOW_OK;
2323       goto finished;
2324     } else if (gap > 16384) {
2325       /* From now on don't compare against the seqnum base anymore as
2326        * at some point in the future we will wrap around and also that
2327        * much reordering is very unlikely */
2328       priv->seqnum_base = -1;
2329     }
2330   }
2331
2332   expected = priv->next_in_seqnum;
2333
2334   /* now check against our expected seqnum */
2335   if (G_LIKELY (expected != -1)) {
2336     gint gap;
2337
2338     /* now calculate gap */
2339     gap = gst_rtp_buffer_compare_seqnum (expected, seqnum);
2340
2341     GST_DEBUG_OBJECT (jitterbuffer, "expected #%d, got #%d, gap of %d",
2342         expected, seqnum, gap);
2343
2344     if (G_LIKELY (gap == 0)) {
2345       /* packet is expected */
2346       calculate_packet_spacing (jitterbuffer, rtptime, dts);
2347       do_next_seqnum = TRUE;
2348     } else {
2349       gboolean reset = FALSE;
2350
2351       if (!GST_CLOCK_TIME_IS_VALID (dts)) {
2352         /* We would run into calculations with GST_CLOCK_TIME_NONE below
2353          * and can't compensate for anything without DTS on RTP packets
2354          */
2355         goto gap_but_no_dts;
2356       } else if (gap < 0) {
2357         /* we received an old packet */
2358         if (G_UNLIKELY (gap < -RTP_MAX_MISORDER)) {
2359           /* too old packet, reset */
2360           GST_DEBUG_OBJECT (jitterbuffer, "reset: buffer too old %d < %d", gap,
2361               -RTP_MAX_MISORDER);
2362           reset = TRUE;
2363         } else {
2364           GST_DEBUG_OBJECT (jitterbuffer, "old packet received");
2365         }
2366       } else {
2367         /* new packet, we are missing some packets */
2368         if (G_UNLIKELY (gap > RTP_MAX_DROPOUT)) {
2369           /* packet too far in future, reset */
2370           GST_DEBUG_OBJECT (jitterbuffer, "reset: buffer too new %d > %d", gap,
2371               RTP_MAX_DROPOUT);
2372           reset = TRUE;
2373         } else {
2374           GST_DEBUG_OBJECT (jitterbuffer, "%d missing packets", gap);
2375           /* fill in the gap with EXPECTED timers */
2376           calculate_expected (jitterbuffer, expected, seqnum, dts, gap);
2377
2378           do_next_seqnum = TRUE;
2379         }
2380       }
2381       if (G_UNLIKELY (reset)) {
2382         GList *events = NULL, *l;
2383
2384         GST_DEBUG_OBJECT (jitterbuffer, "flush and reset jitterbuffer");
2385         rtp_jitter_buffer_flush (priv->jbuf,
2386             (GFunc) free_item_and_retain_events, &events);
2387         rtp_jitter_buffer_reset_skew (priv->jbuf);
2388         remove_all_timers (jitterbuffer);
2389         priv->discont = TRUE;
2390         priv->last_popped_seqnum = -1;
2391         priv->next_seqnum = seqnum;
2392         do_next_seqnum = TRUE;
2393
2394         /* Insert all sticky events again in order, otherwise we would
2395          * potentially loose STREAM_START, CAPS or SEGMENT events
2396          */
2397         events = g_list_reverse (events);
2398         for (l = events; l; l = l->next) {
2399           RTPJitterBufferItem *item;
2400
2401           item = alloc_item (l->data, ITEM_TYPE_EVENT, -1, -1, -1, 0, -1);
2402           rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL);
2403         }
2404         g_list_free (events);
2405
2406         JBUF_SIGNAL_EVENT (priv);
2407       }
2408       /* reset spacing estimation when gap */
2409       priv->ips_rtptime = -1;
2410       priv->ips_dts = GST_CLOCK_TIME_NONE;
2411     }
2412   } else {
2413     GST_DEBUG_OBJECT (jitterbuffer, "First buffer #%d", seqnum);
2414     /* we don't know what the next_in_seqnum should be, wait for the last
2415      * possible moment to push this buffer, maybe we get an earlier seqnum
2416      * while we wait */
2417     set_timer (jitterbuffer, TIMER_TYPE_DEADLINE, seqnum, dts);
2418     do_next_seqnum = TRUE;
2419     /* take rtptime and dts to calculate packet spacing */
2420     priv->ips_rtptime = rtptime;
2421     priv->ips_dts = dts;
2422   }
2423   if (do_next_seqnum) {
2424     priv->last_in_seqnum = seqnum;
2425     priv->last_in_dts = dts;
2426     priv->next_in_seqnum = (seqnum + 1) & 0xffff;
2427   }
2428
2429   /* let's check if this buffer is too late, we can only accept packets with
2430    * bigger seqnum than the one we last pushed. */
2431   if (G_LIKELY (priv->last_popped_seqnum != -1)) {
2432     gint gap;
2433
2434     gap = gst_rtp_buffer_compare_seqnum (priv->last_popped_seqnum, seqnum);
2435
2436     /* priv->last_popped_seqnum >= seqnum, we're too late. */
2437     if (G_UNLIKELY (gap <= 0))
2438       goto too_late;
2439   }
2440
2441   /* let's drop oldest packet if the queue is already full and drop-on-latency
2442    * is set. We can only do this when there actually is a latency. When no
2443    * latency is set, we just pump it in the queue and let the other end push it
2444    * out as fast as possible. */
2445   if (priv->latency_ms && priv->drop_on_latency) {
2446     latency_ts =
2447         gst_util_uint64_scale_int (priv->latency_ms, priv->clock_rate, 1000);
2448
2449     if (G_UNLIKELY (rtp_jitter_buffer_get_ts_diff (priv->jbuf) >= latency_ts)) {
2450       RTPJitterBufferItem *old_item;
2451
2452       old_item = rtp_jitter_buffer_peek (priv->jbuf);
2453
2454       if (IS_DROPABLE (old_item)) {
2455         old_item = rtp_jitter_buffer_pop (priv->jbuf, &percent);
2456         GST_DEBUG_OBJECT (jitterbuffer, "Queue full, dropping old packet %p",
2457             old_item);
2458         priv->next_seqnum = (old_item->seqnum + 1) & 0xffff;
2459         free_item (old_item);
2460       }
2461       /* we might have removed some head buffers, signal the pushing thread to
2462        * see if it can push now */
2463       JBUF_SIGNAL_EVENT (priv);
2464     }
2465   }
2466
2467   item = alloc_item (buffer, ITEM_TYPE_BUFFER, dts, pts, seqnum, 1, rtptime);
2468
2469   /* now insert the packet into the queue in sorted order. This function returns
2470    * FALSE if a packet with the same seqnum was already in the queue, meaning we
2471    * have a duplicate. */
2472   if (G_UNLIKELY (!rtp_jitter_buffer_insert (priv->jbuf, item,
2473               &head, &percent)))
2474     goto duplicate;
2475
2476   /* update timers */
2477   update_timers (jitterbuffer, seqnum, dts, do_next_seqnum);
2478
2479   /* we had an unhandled SR, handle it now */
2480   if (priv->last_sr)
2481     do_handle_sync (jitterbuffer);
2482
2483   if (G_UNLIKELY (head)) {
2484     /* signal addition of new buffer when the _loop is waiting. */
2485     if (G_LIKELY (priv->active))
2486       JBUF_SIGNAL_EVENT (priv);
2487
2488     /* let's unschedule and unblock any waiting buffers. We only want to do this
2489      * when the head buffer changed */
2490     if (G_UNLIKELY (priv->clock_id)) {
2491       GST_DEBUG_OBJECT (jitterbuffer, "Unscheduling waiting new buffer");
2492       unschedule_current_timer (jitterbuffer);
2493     }
2494   }
2495
2496   GST_DEBUG_OBJECT (jitterbuffer,
2497       "Pushed packet #%d, now %d packets, head: %d, " "percent %d", seqnum,
2498       rtp_jitter_buffer_num_packets (priv->jbuf), head, percent);
2499
2500   msg = check_buffering_percent (jitterbuffer, percent);
2501
2502 finished:
2503   JBUF_UNLOCK (priv);
2504
2505   if (msg)
2506     gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer), msg);
2507
2508   return ret;
2509
2510   /* ERRORS */
2511 invalid_buffer:
2512   {
2513     /* this is not fatal but should be filtered earlier */
2514     GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
2515         ("Received invalid RTP payload, dropping"));
2516     gst_buffer_unref (buffer);
2517     return GST_FLOW_OK;
2518   }
2519 no_clock_rate:
2520   {
2521     GST_WARNING_OBJECT (jitterbuffer,
2522         "No clock-rate in caps!, dropping buffer");
2523     gst_buffer_unref (buffer);
2524     goto finished;
2525   }
2526 out_flushing:
2527   {
2528     ret = priv->srcresult;
2529     GST_DEBUG_OBJECT (jitterbuffer, "flushing %s", gst_flow_get_name (ret));
2530     gst_buffer_unref (buffer);
2531     goto finished;
2532   }
2533 have_eos:
2534   {
2535     ret = GST_FLOW_EOS;
2536     GST_WARNING_OBJECT (jitterbuffer, "we are EOS, refusing buffer");
2537     gst_buffer_unref (buffer);
2538     goto finished;
2539   }
2540 too_late:
2541   {
2542     GST_WARNING_OBJECT (jitterbuffer, "Packet #%d too late as #%d was already"
2543         " popped, dropping", seqnum, priv->last_popped_seqnum);
2544     priv->num_late++;
2545     gst_buffer_unref (buffer);
2546     goto finished;
2547   }
2548 duplicate:
2549   {
2550     GST_WARNING_OBJECT (jitterbuffer, "Duplicate packet #%d detected, dropping",
2551         seqnum);
2552     priv->num_duplicates++;
2553     free_item (item);
2554     goto finished;
2555   }
2556 gap_but_no_dts:
2557   {
2558     /* this is fatal as we can't compensate for gaps without DTS */
2559     GST_ELEMENT_ERROR (jitterbuffer, STREAM, DECODE, (NULL),
2560         ("Received packet without DTS after a gap"));
2561     gst_buffer_unref (buffer);
2562     ret = GST_FLOW_ERROR;
2563     goto finished;
2564   }
2565 }
2566
2567 static GstClockTime
2568 compute_elapsed (GstRtpJitterBuffer * jitterbuffer, RTPJitterBufferItem * item)
2569 {
2570   guint64 ext_time, elapsed;
2571   guint32 rtp_time;
2572   GstRtpJitterBufferPrivate *priv;
2573
2574   priv = jitterbuffer->priv;
2575   rtp_time = item->rtptime;
2576
2577   GST_LOG_OBJECT (jitterbuffer, "rtp %" G_GUINT32_FORMAT ", ext %"
2578       G_GUINT64_FORMAT, rtp_time, priv->ext_timestamp);
2579
2580   if (rtp_time < priv->ext_timestamp) {
2581     ext_time = priv->ext_timestamp;
2582   } else {
2583     ext_time = gst_rtp_buffer_ext_timestamp (&priv->ext_timestamp, rtp_time);
2584   }
2585
2586   if (ext_time > priv->clock_base)
2587     elapsed = ext_time - priv->clock_base;
2588   else
2589     elapsed = 0;
2590
2591   elapsed = gst_util_uint64_scale_int (elapsed, GST_SECOND, priv->clock_rate);
2592   return elapsed;
2593 }
2594
2595 static void
2596 update_estimated_eos (GstRtpJitterBuffer * jitterbuffer,
2597     RTPJitterBufferItem * item)
2598 {
2599   guint64 total, elapsed, left, estimated;
2600   GstClockTime out_time;
2601   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2602
2603   if (priv->npt_stop == -1 || priv->ext_timestamp == -1
2604       || priv->clock_base == -1 || priv->clock_rate <= 0)
2605     return;
2606
2607   /* compute the elapsed time */
2608   elapsed = compute_elapsed (jitterbuffer, item);
2609
2610   /* do nothing if elapsed time doesn't increment */
2611   if (priv->last_elapsed && elapsed <= priv->last_elapsed)
2612     return;
2613
2614   priv->last_elapsed = elapsed;
2615
2616   /* this is the total time we need to play */
2617   total = priv->npt_stop - priv->npt_start;
2618   GST_LOG_OBJECT (jitterbuffer, "total %" GST_TIME_FORMAT,
2619       GST_TIME_ARGS (total));
2620
2621   /* this is how much time there is left */
2622   if (total > elapsed)
2623     left = total - elapsed;
2624   else
2625     left = 0;
2626
2627   /* if we have less time left that the size of the buffer, we will not
2628    * be able to keep it filled, disabled buffering then */
2629   if (left < rtp_jitter_buffer_get_delay (priv->jbuf)) {
2630     GST_DEBUG_OBJECT (jitterbuffer, "left %" GST_TIME_FORMAT
2631         ", disable buffering close to EOS", GST_TIME_ARGS (left));
2632     rtp_jitter_buffer_disable_buffering (priv->jbuf, TRUE);
2633   }
2634
2635   /* this is the current time as running-time */
2636   out_time = item->dts;
2637
2638   if (elapsed > 0)
2639     estimated = gst_util_uint64_scale (out_time, total, elapsed);
2640   else {
2641     /* if there is almost nothing left,
2642      * we may never advance enough to end up in the above case */
2643     if (total < GST_SECOND)
2644       estimated = GST_SECOND;
2645     else
2646       estimated = -1;
2647   }
2648   GST_LOG_OBJECT (jitterbuffer, "elapsed %" GST_TIME_FORMAT ", estimated %"
2649       GST_TIME_FORMAT, GST_TIME_ARGS (elapsed), GST_TIME_ARGS (estimated));
2650
2651   if (estimated != -1 && priv->estimated_eos != estimated) {
2652     set_timer (jitterbuffer, TIMER_TYPE_EOS, -1, estimated);
2653     priv->estimated_eos = estimated;
2654   }
2655 }
2656
2657 /* take a buffer from the queue and push it */
2658 static GstFlowReturn
2659 pop_and_push_next (GstRtpJitterBuffer * jitterbuffer, guint seqnum)
2660 {
2661   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2662   GstFlowReturn result = GST_FLOW_OK;
2663   RTPJitterBufferItem *item;
2664   GstBuffer *outbuf = NULL;
2665   GstEvent *outevent = NULL;
2666   GstQuery *outquery = NULL;
2667   GstClockTime dts, pts;
2668   gint percent = -1;
2669   gboolean do_push = TRUE;
2670   guint type;
2671   GstMessage *msg;
2672
2673   /* when we get here we are ready to pop and push the buffer */
2674   item = rtp_jitter_buffer_pop (priv->jbuf, &percent);
2675   type = item->type;
2676
2677   switch (type) {
2678     case ITEM_TYPE_BUFFER:
2679
2680       /* we need to make writable to change the flags and timestamps */
2681       outbuf = gst_buffer_make_writable (item->data);
2682
2683       if (G_UNLIKELY (priv->discont)) {
2684         /* set DISCONT flag when we missed a packet. We pushed the buffer writable
2685          * into the jitterbuffer so we can modify now. */
2686         GST_DEBUG_OBJECT (jitterbuffer, "mark output buffer discont");
2687         GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_DISCONT);
2688         priv->discont = FALSE;
2689       }
2690       if (G_UNLIKELY (priv->ts_discont)) {
2691         GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_RESYNC);
2692         priv->ts_discont = FALSE;
2693       }
2694
2695       dts =
2696           gst_segment_to_position (&priv->segment, GST_FORMAT_TIME, item->dts);
2697       pts =
2698           gst_segment_to_position (&priv->segment, GST_FORMAT_TIME, item->pts);
2699
2700       /* apply timestamp with offset to buffer now */
2701       GST_BUFFER_DTS (outbuf) = apply_offset (jitterbuffer, dts);
2702       GST_BUFFER_PTS (outbuf) = apply_offset (jitterbuffer, pts);
2703
2704       /* update the elapsed time when we need to check against the npt stop time. */
2705       update_estimated_eos (jitterbuffer, item);
2706
2707       priv->last_out_time = GST_BUFFER_PTS (outbuf);
2708       break;
2709     case ITEM_TYPE_LOST:
2710       priv->discont = TRUE;
2711       if (!priv->do_lost)
2712         do_push = FALSE;
2713       /* FALLTHROUGH */
2714     case ITEM_TYPE_EVENT:
2715       outevent = item->data;
2716       break;
2717     case ITEM_TYPE_QUERY:
2718       outquery = item->data;
2719       break;
2720   }
2721
2722   /* now we are ready to push the buffer. Save the seqnum and release the lock
2723    * so the other end can push stuff in the queue again. */
2724   if (seqnum != -1) {
2725     priv->last_popped_seqnum = seqnum;
2726     priv->next_seqnum = (seqnum + item->count) & 0xffff;
2727   }
2728   msg = check_buffering_percent (jitterbuffer, percent);
2729   JBUF_UNLOCK (priv);
2730
2731   item->data = NULL;
2732   free_item (item);
2733
2734   if (msg)
2735     gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer), msg);
2736
2737   switch (type) {
2738     case ITEM_TYPE_BUFFER:
2739       /* push buffer */
2740       GST_DEBUG_OBJECT (jitterbuffer,
2741           "Pushing buffer %d, dts %" GST_TIME_FORMAT ", pts %" GST_TIME_FORMAT,
2742           seqnum, GST_TIME_ARGS (GST_BUFFER_DTS (outbuf)),
2743           GST_TIME_ARGS (GST_BUFFER_PTS (outbuf)));
2744       result = gst_pad_push (priv->srcpad, outbuf);
2745
2746       JBUF_LOCK_CHECK (priv, out_flushing);
2747       break;
2748     case ITEM_TYPE_LOST:
2749     case ITEM_TYPE_EVENT:
2750       GST_DEBUG_OBJECT (jitterbuffer, "%sPushing event %" GST_PTR_FORMAT
2751           ", seqnum %d", do_push ? "" : "NOT ", outevent, seqnum);
2752
2753       if (do_push)
2754         gst_pad_push_event (priv->srcpad, outevent);
2755       else
2756         gst_event_unref (outevent);
2757
2758       result = GST_FLOW_OK;
2759
2760       JBUF_LOCK_CHECK (priv, out_flushing);
2761       break;
2762     case ITEM_TYPE_QUERY:
2763     {
2764       gboolean res;
2765
2766       res = gst_pad_peer_query (priv->srcpad, outquery);
2767
2768       JBUF_LOCK_CHECK (priv, out_flushing);
2769       result = GST_FLOW_OK;
2770       GST_LOG_OBJECT (jitterbuffer, "did query %p, return %d", outquery, res);
2771       JBUF_SIGNAL_QUERY (priv, res);
2772       break;
2773     }
2774   }
2775   return result;
2776
2777   /* ERRORS */
2778 out_flushing:
2779   {
2780     return priv->srcresult;
2781   }
2782 }
2783
2784 #define GST_FLOW_WAIT GST_FLOW_CUSTOM_SUCCESS
2785
2786 /* Peek a buffer and compare the seqnum to the expected seqnum.
2787  * If all is fine, the buffer is pushed.
2788  * If something is wrong, we wait for some event
2789  */
2790 static GstFlowReturn
2791 handle_next_buffer (GstRtpJitterBuffer * jitterbuffer)
2792 {
2793   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2794   GstFlowReturn result = GST_FLOW_OK;
2795   RTPJitterBufferItem *item;
2796   guint seqnum;
2797   guint32 next_seqnum;
2798   gint gap;
2799
2800   /* only push buffers when PLAYING and active and not buffering */
2801   if (priv->blocked || !priv->active ||
2802       rtp_jitter_buffer_is_buffering (priv->jbuf))
2803     return GST_FLOW_WAIT;
2804
2805 again:
2806   /* peek a buffer, we're just looking at the sequence number.
2807    * If all is fine, we'll pop and push it. If the sequence number is wrong we
2808    * wait for a timeout or something to change.
2809    * The peeked buffer is valid for as long as we hold the jitterbuffer lock. */
2810   item = rtp_jitter_buffer_peek (priv->jbuf);
2811   if (item == NULL)
2812     goto wait;
2813
2814   /* get the seqnum and the next expected seqnum */
2815   seqnum = item->seqnum;
2816   if (seqnum == -1)
2817     goto do_push;
2818
2819   next_seqnum = priv->next_seqnum;
2820
2821   /* get the gap between this and the previous packet. If we don't know the
2822    * previous packet seqnum assume no gap. */
2823   if (G_UNLIKELY (next_seqnum == -1)) {
2824     GST_DEBUG_OBJECT (jitterbuffer, "First buffer #%d", seqnum);
2825     /* we don't know what the next_seqnum should be, the chain function should
2826      * have scheduled a DEADLINE timer that will increment next_seqnum when it
2827      * fires, so wait for that */
2828     result = GST_FLOW_WAIT;
2829   } else {
2830     /* else calculate GAP */
2831     gap = gst_rtp_buffer_compare_seqnum (next_seqnum, seqnum);
2832
2833     if (G_LIKELY (gap == 0)) {
2834     do_push:
2835       /* no missing packet, pop and push */
2836       result = pop_and_push_next (jitterbuffer, seqnum);
2837     } else if (G_UNLIKELY (gap < 0)) {
2838       RTPJitterBufferItem *item;
2839       /* if we have a packet that we already pushed or considered dropped, pop it
2840        * off and get the next packet */
2841       GST_DEBUG_OBJECT (jitterbuffer, "Old packet #%d, next #%d dropping",
2842           seqnum, next_seqnum);
2843       item = rtp_jitter_buffer_pop (priv->jbuf, NULL);
2844       free_item (item);
2845       goto again;
2846     } else {
2847       /* the chain function has scheduled timers to request retransmission or
2848        * when to consider the packet lost, wait for that */
2849       GST_DEBUG_OBJECT (jitterbuffer,
2850           "Sequence number GAP detected: expected %d instead of %d (%d missing)",
2851           next_seqnum, seqnum, gap);
2852       result = GST_FLOW_WAIT;
2853     }
2854   }
2855   return result;
2856
2857 wait:
2858   {
2859     GST_DEBUG_OBJECT (jitterbuffer, "no buffer, going to wait");
2860     if (priv->eos)
2861       result = GST_FLOW_EOS;
2862     else
2863       result = GST_FLOW_WAIT;
2864     return result;
2865   }
2866 }
2867
2868 static GstClockTime
2869 get_rtx_retry_timeout (GstRtpJitterBufferPrivate * priv)
2870 {
2871   GstClockTime rtx_retry_timeout;
2872   GstClockTime rtx_min_retry_timeout;
2873
2874   if (priv->rtx_retry_timeout == -1) {
2875     if (priv->avg_rtx_rtt == 0)
2876       rtx_retry_timeout = DEFAULT_AUTO_RTX_TIMEOUT;
2877     else
2878       /* we want to ask for a retransmission after we waited for a
2879        * complete RTT and the additional jitter */
2880       rtx_retry_timeout = priv->avg_rtx_rtt + priv->avg_jitter * 2;
2881   } else {
2882     rtx_retry_timeout = priv->rtx_retry_timeout * GST_MSECOND;
2883   }
2884   /* make sure we don't retry too often. On very low latency networks,
2885    * the RTT and jitter can be very low. */
2886   if (priv->rtx_min_retry_timeout == -1) {
2887     rtx_min_retry_timeout = priv->packet_spacing;
2888   } else {
2889     rtx_min_retry_timeout = priv->rtx_min_retry_timeout * GST_MSECOND;
2890   }
2891   rtx_retry_timeout = MAX (rtx_retry_timeout, rtx_min_retry_timeout);
2892
2893   return rtx_retry_timeout;
2894 }
2895
2896 static GstClockTime
2897 get_rtx_retry_period (GstRtpJitterBufferPrivate * priv,
2898     GstClockTime rtx_retry_timeout)
2899 {
2900   GstClockTime rtx_retry_period;
2901
2902   if (priv->rtx_retry_period == -1) {
2903     /* we retry up to the configured jitterbuffer size but leaving some
2904      * room for the retransmission to arrive in time */
2905     if (rtx_retry_timeout > priv->latency_ns) {
2906       rtx_retry_period = 0;
2907     } else {
2908       rtx_retry_period = priv->latency_ns - rtx_retry_timeout;
2909     }
2910   } else {
2911     rtx_retry_period = priv->rtx_retry_period * GST_MSECOND;
2912   }
2913   return rtx_retry_period;
2914 }
2915
2916 /* the timeout for when we expected a packet expired */
2917 static gboolean
2918 do_expected_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
2919     GstClockTime now)
2920 {
2921   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2922   GstEvent *event;
2923   guint delay, delay_ms, avg_rtx_rtt_ms;
2924   guint rtx_retry_timeout_ms, rtx_retry_period_ms;
2925   GstClockTime rtx_retry_period;
2926   GstClockTime rtx_retry_timeout;
2927   GstClock *clock;
2928
2929   GST_DEBUG_OBJECT (jitterbuffer, "expected %d didn't arrive, now %"
2930       GST_TIME_FORMAT, timer->seqnum, GST_TIME_ARGS (now));
2931
2932   rtx_retry_timeout = get_rtx_retry_timeout (priv);
2933   rtx_retry_period = get_rtx_retry_period (priv, rtx_retry_timeout);
2934
2935   GST_DEBUG_OBJECT (jitterbuffer, "timeout %" GST_TIME_FORMAT ", period %"
2936       GST_TIME_FORMAT, GST_TIME_ARGS (rtx_retry_timeout),
2937       GST_TIME_ARGS (rtx_retry_period));
2938
2939   delay = timer->rtx_delay + timer->rtx_retry;
2940
2941   delay_ms = GST_TIME_AS_MSECONDS (delay);
2942   rtx_retry_timeout_ms = GST_TIME_AS_MSECONDS (rtx_retry_timeout);
2943   rtx_retry_period_ms = GST_TIME_AS_MSECONDS (rtx_retry_period);
2944   avg_rtx_rtt_ms = GST_TIME_AS_MSECONDS (priv->avg_rtx_rtt);
2945
2946   event = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM,
2947       gst_structure_new ("GstRTPRetransmissionRequest",
2948           "seqnum", G_TYPE_UINT, (guint) timer->seqnum,
2949           "running-time", G_TYPE_UINT64, timer->rtx_base,
2950           "delay", G_TYPE_UINT, delay_ms,
2951           "retry", G_TYPE_UINT, timer->num_rtx_retry,
2952           "frequency", G_TYPE_UINT, rtx_retry_timeout_ms,
2953           "period", G_TYPE_UINT, rtx_retry_period_ms,
2954           "deadline", G_TYPE_UINT, priv->latency_ms,
2955           "packet-spacing", G_TYPE_UINT64, priv->packet_spacing,
2956           "avg-rtt", G_TYPE_UINT, avg_rtx_rtt_ms, NULL));
2957
2958   priv->num_rtx_requests++;
2959   timer->num_rtx_retry++;
2960
2961   GST_OBJECT_LOCK (jitterbuffer);
2962   if ((clock = GST_ELEMENT_CLOCK (jitterbuffer))) {
2963     timer->rtx_last = gst_clock_get_time (clock);
2964     timer->rtx_last -= GST_ELEMENT_CAST (jitterbuffer)->base_time;
2965   } else {
2966     timer->rtx_last = now;
2967   }
2968   GST_OBJECT_UNLOCK (jitterbuffer);
2969
2970   /* calculate the timeout for the next retransmission attempt */
2971   timer->rtx_retry += rtx_retry_timeout;
2972   GST_DEBUG_OBJECT (jitterbuffer, "base %" GST_TIME_FORMAT ", delay %"
2973       GST_TIME_FORMAT ", retry %" GST_TIME_FORMAT ", num_retry %u",
2974       GST_TIME_ARGS (timer->rtx_base), GST_TIME_ARGS (timer->rtx_delay),
2975       GST_TIME_ARGS (timer->rtx_retry), timer->num_rtx_retry);
2976   if ((priv->rtx_max_retries != -1
2977           && timer->num_rtx_retry >= priv->rtx_max_retries)
2978       || (timer->rtx_retry + timer->rtx_delay > rtx_retry_period)) {
2979     GST_DEBUG_OBJECT (jitterbuffer, "reschedule as LOST timer");
2980     /* too many retransmission request, we now convert the timer
2981      * to a lost timer, leave the num_rtx_retry as it is for stats */
2982     timer->type = TIMER_TYPE_LOST;
2983     timer->rtx_delay = 0;
2984     timer->rtx_retry = 0;
2985   }
2986   reschedule_timer (jitterbuffer, timer, timer->seqnum,
2987       timer->rtx_base + timer->rtx_retry, timer->rtx_delay, FALSE);
2988
2989   JBUF_UNLOCK (priv);
2990   gst_pad_push_event (priv->sinkpad, event);
2991   JBUF_LOCK (priv);
2992
2993   return FALSE;
2994 }
2995
2996 /* a packet is lost */
2997 static gboolean
2998 do_lost_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
2999     GstClockTime now)
3000 {
3001   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
3002   GstClockTime duration, timestamp;
3003   guint seqnum, lost_packets, num_rtx_retry, next_in_seqnum;
3004   gboolean late, head;
3005   GstEvent *event;
3006   RTPJitterBufferItem *item;
3007
3008   seqnum = timer->seqnum;
3009   timestamp = apply_offset (jitterbuffer, timer->timeout);
3010   duration = timer->duration;
3011   if (duration == GST_CLOCK_TIME_NONE && priv->packet_spacing > 0)
3012     duration = priv->packet_spacing;
3013   lost_packets = MAX (timer->num, 1);
3014   late = timer->num > 0;
3015   num_rtx_retry = timer->num_rtx_retry;
3016
3017   /* we had a gap and thus we lost some packets. Create an event for this.  */
3018   if (lost_packets > 1)
3019     GST_DEBUG_OBJECT (jitterbuffer, "Packets #%d -> #%d lost", seqnum,
3020         seqnum + lost_packets - 1);
3021   else
3022     GST_DEBUG_OBJECT (jitterbuffer, "Packet #%d lost", seqnum);
3023
3024   priv->num_late += lost_packets;
3025   priv->num_rtx_failed += num_rtx_retry;
3026
3027   next_in_seqnum = (seqnum + lost_packets) & 0xffff;
3028
3029   /* we now only accept seqnum bigger than this */
3030   if (gst_rtp_buffer_compare_seqnum (priv->next_in_seqnum, next_in_seqnum) > 0)
3031     priv->next_in_seqnum = next_in_seqnum;
3032
3033   /* create paket lost event */
3034   event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM,
3035       gst_structure_new ("GstRTPPacketLost",
3036           "seqnum", G_TYPE_UINT, (guint) seqnum,
3037           "timestamp", G_TYPE_UINT64, timestamp,
3038           "duration", G_TYPE_UINT64, duration,
3039           "late", G_TYPE_BOOLEAN, late,
3040           "retry", G_TYPE_UINT, num_rtx_retry, NULL));
3041
3042   item = alloc_item (event, ITEM_TYPE_LOST, -1, -1, seqnum, lost_packets, -1);
3043   rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL);
3044
3045   /* remove timer now */
3046   remove_timer (jitterbuffer, timer);
3047   if (head)
3048     JBUF_SIGNAL_EVENT (priv);
3049
3050   return TRUE;
3051 }
3052
3053 static gboolean
3054 do_eos_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
3055     GstClockTime now)
3056 {
3057   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
3058
3059   GST_INFO_OBJECT (jitterbuffer, "got the NPT timeout");
3060   remove_timer (jitterbuffer, timer);
3061   if (!priv->eos) {
3062     /* there was no EOS in the buffer, put one in there now */
3063     queue_event (jitterbuffer, gst_event_new_eos ());
3064   }
3065   JBUF_SIGNAL_EVENT (priv);
3066
3067   return TRUE;
3068 }
3069
3070 static gboolean
3071 do_deadline_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
3072     GstClockTime now)
3073 {
3074   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
3075
3076   GST_INFO_OBJECT (jitterbuffer, "got deadline timeout");
3077
3078   /* timer seqnum might have been obsoleted by caps seqnum-base,
3079    * only mess with current ongoing seqnum if still unknown */
3080   if (priv->next_seqnum == -1)
3081     priv->next_seqnum = timer->seqnum;
3082   remove_timer (jitterbuffer, timer);
3083   JBUF_SIGNAL_EVENT (priv);
3084
3085   return TRUE;
3086 }
3087
3088 static gboolean
3089 do_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
3090     GstClockTime now)
3091 {
3092   gboolean removed = FALSE;
3093
3094   switch (timer->type) {
3095     case TIMER_TYPE_EXPECTED:
3096       removed = do_expected_timeout (jitterbuffer, timer, now);
3097       break;
3098     case TIMER_TYPE_LOST:
3099       removed = do_lost_timeout (jitterbuffer, timer, now);
3100       break;
3101     case TIMER_TYPE_DEADLINE:
3102       removed = do_deadline_timeout (jitterbuffer, timer, now);
3103       break;
3104     case TIMER_TYPE_EOS:
3105       removed = do_eos_timeout (jitterbuffer, timer, now);
3106       break;
3107   }
3108   return removed;
3109 }
3110
3111 /* called when we need to wait for the next timeout.
3112  *
3113  * We loop over the array of recorded timeouts and wait for the earliest one.
3114  * When it timed out, do the logic associated with the timer.
3115  *
3116  * If there are no timers, we wait on a gcond until something new happens.
3117  */
3118 static void
3119 wait_next_timeout (GstRtpJitterBuffer * jitterbuffer)
3120 {
3121   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
3122   GstClockTime now = 0;
3123
3124   JBUF_LOCK (priv);
3125   while (priv->timer_running) {
3126     TimerData *timer = NULL;
3127     GstClockTime timer_timeout = -1;
3128     gint i, len;
3129
3130     GST_DEBUG_OBJECT (jitterbuffer, "now %" GST_TIME_FORMAT,
3131         GST_TIME_ARGS (now));
3132
3133     len = priv->timers->len;
3134     for (i = 0; i < len; i++) {
3135       TimerData *test = &g_array_index (priv->timers, TimerData, i);
3136       GstClockTime test_timeout = get_timeout (jitterbuffer, test);
3137       gboolean save_best = FALSE;
3138
3139       GST_DEBUG_OBJECT (jitterbuffer, "%d, %d, %d, %" GST_TIME_FORMAT,
3140           i, test->type, test->seqnum, GST_TIME_ARGS (test_timeout));
3141
3142       /* find the smallest timeout */
3143       if (timer == NULL) {
3144         save_best = TRUE;
3145       } else if (timer_timeout == -1) {
3146         /* we already have an immediate timeout, the new timer must be an
3147          * immediate timer with smaller seqnum to become the best */
3148         if (test_timeout == -1
3149             && (gst_rtp_buffer_compare_seqnum (test->seqnum,
3150                     timer->seqnum) > 0))
3151           save_best = TRUE;
3152       } else if (test_timeout == -1) {
3153         /* first immediate timer */
3154         save_best = TRUE;
3155       } else if (test_timeout < timer_timeout) {
3156         /* earlier timer */
3157         save_best = TRUE;
3158       } else if (test_timeout == timer_timeout
3159           && (gst_rtp_buffer_compare_seqnum (test->seqnum,
3160                   timer->seqnum) > 0)) {
3161         /* same timer, smaller seqnum */
3162         save_best = TRUE;
3163       }
3164       if (save_best) {
3165         GST_DEBUG_OBJECT (jitterbuffer, "new best %d", i);
3166         timer = test;
3167         timer_timeout = test_timeout;
3168       }
3169     }
3170     if (timer && !priv->blocked) {
3171       GstClock *clock;
3172       GstClockTime sync_time;
3173       GstClockID id;
3174       GstClockReturn ret;
3175       GstClockTimeDiff clock_jitter;
3176
3177       if (timer_timeout == -1 || timer_timeout <= now) {
3178         do_timeout (jitterbuffer, timer, now);
3179         /* check here, do_timeout could have released the lock */
3180         if (!priv->timer_running)
3181           break;
3182         continue;
3183       }
3184
3185       GST_OBJECT_LOCK (jitterbuffer);
3186       clock = GST_ELEMENT_CLOCK (jitterbuffer);
3187       if (!clock) {
3188         GST_OBJECT_UNLOCK (jitterbuffer);
3189         /* let's just push if there is no clock */
3190         GST_DEBUG_OBJECT (jitterbuffer, "No clock, timeout right away");
3191         now = timer_timeout;
3192         continue;
3193       }
3194
3195       /* prepare for sync against clock */
3196       sync_time = timer_timeout + GST_ELEMENT_CAST (jitterbuffer)->base_time;
3197       /* add latency of peer to get input time */
3198       sync_time += priv->peer_latency;
3199
3200       GST_DEBUG_OBJECT (jitterbuffer, "sync to timestamp %" GST_TIME_FORMAT
3201           " with sync time %" GST_TIME_FORMAT,
3202           GST_TIME_ARGS (timer_timeout), GST_TIME_ARGS (sync_time));
3203
3204       /* create an entry for the clock */
3205       id = priv->clock_id = gst_clock_new_single_shot_id (clock, sync_time);
3206       priv->timer_timeout = timer_timeout;
3207       priv->timer_seqnum = timer->seqnum;
3208       GST_OBJECT_UNLOCK (jitterbuffer);
3209
3210       /* release the lock so that the other end can push stuff or unlock */
3211       JBUF_UNLOCK (priv);
3212
3213       ret = gst_clock_id_wait (id, &clock_jitter);
3214
3215       JBUF_LOCK (priv);
3216       if (!priv->timer_running) {
3217         gst_clock_id_unref (id);
3218         priv->clock_id = NULL;
3219         break;
3220       }
3221
3222       if (ret != GST_CLOCK_UNSCHEDULED) {
3223         now = timer_timeout + MAX (clock_jitter, 0);
3224         GST_DEBUG_OBJECT (jitterbuffer, "sync done, %d, #%d, %" G_GINT64_FORMAT,
3225             ret, priv->timer_seqnum, clock_jitter);
3226       } else {
3227         GST_DEBUG_OBJECT (jitterbuffer, "sync unscheduled");
3228       }
3229       /* and free the entry */
3230       gst_clock_id_unref (id);
3231       priv->clock_id = NULL;
3232     } else {
3233       /* no timers, wait for activity */
3234       JBUF_WAIT_TIMER (priv);
3235     }
3236   }
3237   JBUF_UNLOCK (priv);
3238
3239   GST_DEBUG_OBJECT (jitterbuffer, "we are stopping");
3240   return;
3241 }
3242
3243 /*
3244  * This funcion implements the main pushing loop on the source pad.
3245  *
3246  * It first tries to push as many buffers as possible. If there is a seqnum
3247  * mismatch, we wait for the next timeouts.
3248  */
3249 static void
3250 gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer)
3251 {
3252   GstRtpJitterBufferPrivate *priv;
3253   GstFlowReturn result = GST_FLOW_OK;
3254
3255   priv = jitterbuffer->priv;
3256
3257   JBUF_LOCK_CHECK (priv, flushing);
3258   do {
3259     result = handle_next_buffer (jitterbuffer);
3260     if (G_LIKELY (result == GST_FLOW_WAIT)) {
3261       /* now wait for the next event */
3262       JBUF_WAIT_EVENT (priv, flushing);
3263       result = GST_FLOW_OK;
3264     }
3265   }
3266   while (result == GST_FLOW_OK);
3267   /* store result for upstream */
3268   priv->srcresult = result;
3269   /* if we get here we need to pause */
3270   goto pause;
3271
3272   /* ERRORS */
3273 flushing:
3274   {
3275     result = priv->srcresult;
3276     goto pause;
3277   }
3278 pause:
3279   {
3280     GstEvent *event;
3281
3282     JBUF_SIGNAL_QUERY (priv, FALSE);
3283     JBUF_UNLOCK (priv);
3284
3285     GST_DEBUG_OBJECT (jitterbuffer, "pausing task, reason %s",
3286         gst_flow_get_name (result));
3287     gst_pad_pause_task (priv->srcpad);
3288     if (result == GST_FLOW_EOS) {
3289       event = gst_event_new_eos ();
3290       gst_pad_push_event (priv->srcpad, event);
3291     }
3292     return;
3293   }
3294 }
3295
3296 /* collect the info from the lastest RTCP packet and the jitterbuffer sync, do
3297  * some sanity checks and then emit the handle-sync signal with the parameters.
3298  * This function must be called with the LOCK */
3299 static void
3300 do_handle_sync (GstRtpJitterBuffer * jitterbuffer)
3301 {
3302   GstRtpJitterBufferPrivate *priv;
3303   guint64 base_rtptime, base_time;
3304   guint32 clock_rate;
3305   guint64 last_rtptime;
3306   guint64 clock_base;
3307   guint64 ext_rtptime, diff;
3308   gboolean valid = TRUE, keep = FALSE;
3309
3310   priv = jitterbuffer->priv;
3311
3312   /* get the last values from the jitterbuffer */
3313   rtp_jitter_buffer_get_sync (priv->jbuf, &base_rtptime, &base_time,
3314       &clock_rate, &last_rtptime);
3315
3316   clock_base = priv->clock_base;
3317   ext_rtptime = priv->ext_rtptime;
3318
3319   GST_DEBUG_OBJECT (jitterbuffer, "ext SR %" G_GUINT64_FORMAT ", base %"
3320       G_GUINT64_FORMAT ", clock-rate %" G_GUINT32_FORMAT
3321       ", clock-base %" G_GUINT64_FORMAT ", last-rtptime %" G_GUINT64_FORMAT,
3322       ext_rtptime, base_rtptime, clock_rate, clock_base, last_rtptime);
3323
3324   if (base_rtptime == -1 || clock_rate == -1 || base_time == -1) {
3325     /* we keep this SR packet for later. When we get a valid RTP packet the
3326      * above values will be set and we can try to use the SR packet */
3327     GST_DEBUG_OBJECT (jitterbuffer, "keeping for later, no RTP values");
3328     keep = TRUE;
3329   } else {
3330     /* we can't accept anything that happened before we did the last resync */
3331     if (base_rtptime > ext_rtptime) {
3332       GST_DEBUG_OBJECT (jitterbuffer, "dropping, older than base time");
3333       valid = FALSE;
3334     } else {
3335       /* the SR RTP timestamp must be something close to what we last observed
3336        * in the jitterbuffer */
3337       if (ext_rtptime > last_rtptime) {
3338         /* check how far ahead it is to our RTP timestamps */
3339         diff = ext_rtptime - last_rtptime;
3340         /* if bigger than 1 second, we drop it */
3341         if (diff > clock_rate) {
3342           GST_DEBUG_OBJECT (jitterbuffer, "too far ahead");
3343           /* should drop this, but some RTSP servers end up with bogus
3344            * way too ahead RTCP packet when repeated PAUSE/PLAY,
3345            * so still trigger rptbin sync but invalidate RTCP data
3346            * (sync might use other methods) */
3347           ext_rtptime = -1;
3348         }
3349         GST_DEBUG_OBJECT (jitterbuffer, "ext last %" G_GUINT64_FORMAT ", diff %"
3350             G_GUINT64_FORMAT, last_rtptime, diff);
3351       }
3352     }
3353   }
3354
3355   if (keep) {
3356     GST_DEBUG_OBJECT (jitterbuffer, "keeping RTCP packet for later");
3357   } else if (valid) {
3358     GstStructure *s;
3359
3360     s = gst_structure_new ("application/x-rtp-sync",
3361         "base-rtptime", G_TYPE_UINT64, base_rtptime,
3362         "base-time", G_TYPE_UINT64, base_time,
3363         "clock-rate", G_TYPE_UINT, clock_rate,
3364         "clock-base", G_TYPE_UINT64, clock_base,
3365         "sr-ext-rtptime", G_TYPE_UINT64, ext_rtptime,
3366         "sr-buffer", GST_TYPE_BUFFER, priv->last_sr, NULL);
3367
3368     GST_DEBUG_OBJECT (jitterbuffer, "signaling sync");
3369     gst_buffer_replace (&priv->last_sr, NULL);
3370     JBUF_UNLOCK (priv);
3371     g_signal_emit (jitterbuffer,
3372         gst_rtp_jitter_buffer_signals[SIGNAL_HANDLE_SYNC], 0, s);
3373     JBUF_LOCK (priv);
3374     gst_structure_free (s);
3375   } else {
3376     GST_DEBUG_OBJECT (jitterbuffer, "dropping RTCP packet");
3377     gst_buffer_replace (&priv->last_sr, NULL);
3378   }
3379 }
3380
3381 static GstFlowReturn
3382 gst_rtp_jitter_buffer_chain_rtcp (GstPad * pad, GstObject * parent,
3383     GstBuffer * buffer)
3384 {
3385   GstRtpJitterBuffer *jitterbuffer;
3386   GstRtpJitterBufferPrivate *priv;
3387   GstFlowReturn ret = GST_FLOW_OK;
3388   guint32 ssrc;
3389   GstRTCPPacket packet;
3390   guint64 ext_rtptime;
3391   guint32 rtptime;
3392   GstRTCPBuffer rtcp = { NULL, };
3393
3394   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
3395
3396   if (G_UNLIKELY (!gst_rtcp_buffer_validate (buffer)))
3397     goto invalid_buffer;
3398
3399   priv = jitterbuffer->priv;
3400
3401   gst_rtcp_buffer_map (buffer, GST_MAP_READ, &rtcp);
3402
3403   if (!gst_rtcp_buffer_get_first_packet (&rtcp, &packet))
3404     goto empty_buffer;
3405
3406   /* first packet must be SR or RR or else the validate would have failed */
3407   switch (gst_rtcp_packet_get_type (&packet)) {
3408     case GST_RTCP_TYPE_SR:
3409       gst_rtcp_packet_sr_get_sender_info (&packet, &ssrc, NULL, &rtptime,
3410           NULL, NULL);
3411       break;
3412     default:
3413       goto ignore_buffer;
3414   }
3415   gst_rtcp_buffer_unmap (&rtcp);
3416
3417   GST_DEBUG_OBJECT (jitterbuffer, "received RTCP of SSRC %08x", ssrc);
3418
3419   JBUF_LOCK (priv);
3420   /* convert the RTP timestamp to our extended timestamp, using the same offset
3421    * we used in the jitterbuffer */
3422   ext_rtptime = priv->jbuf->ext_rtptime;
3423   ext_rtptime = gst_rtp_buffer_ext_timestamp (&ext_rtptime, rtptime);
3424
3425   priv->ext_rtptime = ext_rtptime;
3426   gst_buffer_replace (&priv->last_sr, buffer);
3427
3428   do_handle_sync (jitterbuffer);
3429   JBUF_UNLOCK (priv);
3430
3431 done:
3432   gst_buffer_unref (buffer);
3433
3434   return ret;
3435
3436 invalid_buffer:
3437   {
3438     /* this is not fatal but should be filtered earlier */
3439     GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
3440         ("Received invalid RTCP payload, dropping"));
3441     ret = GST_FLOW_OK;
3442     goto done;
3443   }
3444 empty_buffer:
3445   {
3446     /* this is not fatal but should be filtered earlier */
3447     GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
3448         ("Received empty RTCP payload, dropping"));
3449     gst_rtcp_buffer_unmap (&rtcp);
3450     ret = GST_FLOW_OK;
3451     goto done;
3452   }
3453 ignore_buffer:
3454   {
3455     GST_DEBUG_OBJECT (jitterbuffer, "ignoring RTCP packet");
3456     gst_rtcp_buffer_unmap (&rtcp);
3457     ret = GST_FLOW_OK;
3458     goto done;
3459   }
3460 }
3461
3462 static gboolean
3463 gst_rtp_jitter_buffer_sink_query (GstPad * pad, GstObject * parent,
3464     GstQuery * query)
3465 {
3466   gboolean res = FALSE;
3467   GstRtpJitterBuffer *jitterbuffer;
3468   GstRtpJitterBufferPrivate *priv;
3469
3470   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
3471   priv = jitterbuffer->priv;
3472
3473   switch (GST_QUERY_TYPE (query)) {
3474     case GST_QUERY_CAPS:
3475     {
3476       GstCaps *filter, *caps;
3477
3478       gst_query_parse_caps (query, &filter);
3479       caps = gst_rtp_jitter_buffer_getcaps (pad, filter);
3480       gst_query_set_caps_result (query, caps);
3481       gst_caps_unref (caps);
3482       res = TRUE;
3483       break;
3484     }
3485     default:
3486       if (GST_QUERY_IS_SERIALIZED (query)) {
3487         RTPJitterBufferItem *item;
3488         gboolean head;
3489
3490         JBUF_LOCK_CHECK (priv, out_flushing);
3491         if (rtp_jitter_buffer_get_mode (priv->jbuf) !=
3492             RTP_JITTER_BUFFER_MODE_BUFFER) {
3493           GST_DEBUG_OBJECT (jitterbuffer, "adding serialized query");
3494           item = alloc_item (query, ITEM_TYPE_QUERY, -1, -1, -1, 0, -1);
3495           rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL);
3496           if (head)
3497             JBUF_SIGNAL_EVENT (priv);
3498           JBUF_WAIT_QUERY (priv, out_flushing);
3499           res = priv->last_query;
3500         } else {
3501           GST_DEBUG_OBJECT (jitterbuffer, "refusing query, we are buffering");
3502           res = FALSE;
3503         }
3504         JBUF_UNLOCK (priv);
3505       } else {
3506         res = gst_pad_query_default (pad, parent, query);
3507       }
3508       break;
3509   }
3510   return res;
3511   /* ERRORS */
3512 out_flushing:
3513   {
3514     GST_DEBUG_OBJECT (jitterbuffer, "we are flushing");
3515     JBUF_UNLOCK (priv);
3516     return FALSE;
3517   }
3518
3519 }
3520
3521 static gboolean
3522 gst_rtp_jitter_buffer_src_query (GstPad * pad, GstObject * parent,
3523     GstQuery * query)
3524 {
3525   GstRtpJitterBuffer *jitterbuffer;
3526   GstRtpJitterBufferPrivate *priv;
3527   gboolean res = FALSE;
3528
3529   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
3530   priv = jitterbuffer->priv;
3531
3532   switch (GST_QUERY_TYPE (query)) {
3533     case GST_QUERY_LATENCY:
3534     {
3535       /* We need to send the query upstream and add the returned latency to our
3536        * own */
3537       GstClockTime min_latency, max_latency;
3538       gboolean us_live;
3539       GstClockTime our_latency;
3540
3541       if ((res = gst_pad_peer_query (priv->sinkpad, query))) {
3542         gst_query_parse_latency (query, &us_live, &min_latency, &max_latency);
3543
3544         GST_DEBUG_OBJECT (jitterbuffer, "Peer latency: min %"
3545             GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
3546             GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
3547
3548         /* store this so that we can safely sync on the peer buffers. */
3549         JBUF_LOCK (priv);
3550         priv->peer_latency = min_latency;
3551         our_latency = priv->latency_ns;
3552         JBUF_UNLOCK (priv);
3553
3554         GST_DEBUG_OBJECT (jitterbuffer, "Our latency: %" GST_TIME_FORMAT,
3555             GST_TIME_ARGS (our_latency));
3556
3557         /* we add some latency but can buffer an infinite amount of time */
3558         min_latency += our_latency;
3559         max_latency = -1;
3560
3561         GST_DEBUG_OBJECT (jitterbuffer, "Calculated total latency : min %"
3562             GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
3563             GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
3564
3565         gst_query_set_latency (query, TRUE, min_latency, max_latency);
3566       }
3567       break;
3568     }
3569     case GST_QUERY_POSITION:
3570     {
3571       GstClockTime start, last_out;
3572       GstFormat fmt;
3573
3574       gst_query_parse_position (query, &fmt, NULL);
3575       if (fmt != GST_FORMAT_TIME) {
3576         res = gst_pad_query_default (pad, parent, query);
3577         break;
3578       }
3579
3580       JBUF_LOCK (priv);
3581       start = priv->npt_start;
3582       last_out = priv->last_out_time;
3583       JBUF_UNLOCK (priv);
3584
3585       GST_DEBUG_OBJECT (jitterbuffer, "npt start %" GST_TIME_FORMAT
3586           ", last out %" GST_TIME_FORMAT, GST_TIME_ARGS (start),
3587           GST_TIME_ARGS (last_out));
3588
3589       if (GST_CLOCK_TIME_IS_VALID (start) && GST_CLOCK_TIME_IS_VALID (last_out)) {
3590         /* bring 0-based outgoing time to stream time */
3591         gst_query_set_position (query, GST_FORMAT_TIME, start + last_out);
3592         res = TRUE;
3593       } else {
3594         res = gst_pad_query_default (pad, parent, query);
3595       }
3596       break;
3597     }
3598     case GST_QUERY_CAPS:
3599     {
3600       GstCaps *filter, *caps;
3601
3602       gst_query_parse_caps (query, &filter);
3603       caps = gst_rtp_jitter_buffer_getcaps (pad, filter);
3604       gst_query_set_caps_result (query, caps);
3605       gst_caps_unref (caps);
3606       res = TRUE;
3607       break;
3608     }
3609     default:
3610       res = gst_pad_query_default (pad, parent, query);
3611       break;
3612   }
3613
3614   return res;
3615 }
3616
3617 static void
3618 gst_rtp_jitter_buffer_set_property (GObject * object,
3619     guint prop_id, const GValue * value, GParamSpec * pspec)
3620 {
3621   GstRtpJitterBuffer *jitterbuffer;
3622   GstRtpJitterBufferPrivate *priv;
3623
3624   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
3625   priv = jitterbuffer->priv;
3626
3627   switch (prop_id) {
3628     case PROP_LATENCY:
3629     {
3630       guint new_latency, old_latency;
3631
3632       new_latency = g_value_get_uint (value);
3633
3634       JBUF_LOCK (priv);
3635       old_latency = priv->latency_ms;
3636       priv->latency_ms = new_latency;
3637       priv->latency_ns = priv->latency_ms * GST_MSECOND;
3638       rtp_jitter_buffer_set_delay (priv->jbuf, priv->latency_ns);
3639       JBUF_UNLOCK (priv);
3640
3641       /* post message if latency changed, this will inform the parent pipeline
3642        * that a latency reconfiguration is possible/needed. */
3643       if (new_latency != old_latency) {
3644         GST_DEBUG_OBJECT (jitterbuffer, "latency changed to: %" GST_TIME_FORMAT,
3645             GST_TIME_ARGS (new_latency * GST_MSECOND));
3646
3647         gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer),
3648             gst_message_new_latency (GST_OBJECT_CAST (jitterbuffer)));
3649       }
3650       break;
3651     }
3652     case PROP_DROP_ON_LATENCY:
3653       JBUF_LOCK (priv);
3654       priv->drop_on_latency = g_value_get_boolean (value);
3655       JBUF_UNLOCK (priv);
3656       break;
3657     case PROP_TS_OFFSET:
3658       JBUF_LOCK (priv);
3659       priv->ts_offset = g_value_get_int64 (value);
3660       priv->ts_discont = TRUE;
3661       JBUF_UNLOCK (priv);
3662       break;
3663     case PROP_DO_LOST:
3664       JBUF_LOCK (priv);
3665       priv->do_lost = g_value_get_boolean (value);
3666       JBUF_UNLOCK (priv);
3667       break;
3668     case PROP_MODE:
3669       JBUF_LOCK (priv);
3670       rtp_jitter_buffer_set_mode (priv->jbuf, g_value_get_enum (value));
3671       JBUF_UNLOCK (priv);
3672       break;
3673     case PROP_DO_RETRANSMISSION:
3674       JBUF_LOCK (priv);
3675       priv->do_retransmission = g_value_get_boolean (value);
3676       JBUF_UNLOCK (priv);
3677       break;
3678     case PROP_RTX_NEXT_SEQNUM:
3679       JBUF_LOCK (priv);
3680       priv->rtx_next_seqnum = g_value_get_boolean (value);
3681       JBUF_UNLOCK (priv);
3682       break;
3683     case PROP_RTX_DELAY:
3684       JBUF_LOCK (priv);
3685       priv->rtx_delay = g_value_get_int (value);
3686       JBUF_UNLOCK (priv);
3687       break;
3688     case PROP_RTX_MIN_DELAY:
3689       JBUF_LOCK (priv);
3690       priv->rtx_min_delay = g_value_get_uint (value);
3691       JBUF_UNLOCK (priv);
3692       break;
3693     case PROP_RTX_DELAY_REORDER:
3694       JBUF_LOCK (priv);
3695       priv->rtx_delay_reorder = g_value_get_int (value);
3696       JBUF_UNLOCK (priv);
3697       break;
3698     case PROP_RTX_RETRY_TIMEOUT:
3699       JBUF_LOCK (priv);
3700       priv->rtx_retry_timeout = g_value_get_int (value);
3701       JBUF_UNLOCK (priv);
3702       break;
3703     case PROP_RTX_MIN_RETRY_TIMEOUT:
3704       JBUF_LOCK (priv);
3705       priv->rtx_min_retry_timeout = g_value_get_int (value);
3706       JBUF_UNLOCK (priv);
3707       break;
3708     case PROP_RTX_RETRY_PERIOD:
3709       JBUF_LOCK (priv);
3710       priv->rtx_retry_period = g_value_get_int (value);
3711       JBUF_UNLOCK (priv);
3712       break;
3713     case PROP_RTX_MAX_RETRIES:
3714       JBUF_LOCK (priv);
3715       priv->rtx_max_retries = g_value_get_int (value);
3716       JBUF_UNLOCK (priv);
3717       break;
3718     default:
3719       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
3720       break;
3721   }
3722 }
3723
3724 static void
3725 gst_rtp_jitter_buffer_get_property (GObject * object,
3726     guint prop_id, GValue * value, GParamSpec * pspec)
3727 {
3728   GstRtpJitterBuffer *jitterbuffer;
3729   GstRtpJitterBufferPrivate *priv;
3730
3731   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
3732   priv = jitterbuffer->priv;
3733
3734   switch (prop_id) {
3735     case PROP_LATENCY:
3736       JBUF_LOCK (priv);
3737       g_value_set_uint (value, priv->latency_ms);
3738       JBUF_UNLOCK (priv);
3739       break;
3740     case PROP_DROP_ON_LATENCY:
3741       JBUF_LOCK (priv);
3742       g_value_set_boolean (value, priv->drop_on_latency);
3743       JBUF_UNLOCK (priv);
3744       break;
3745     case PROP_TS_OFFSET:
3746       JBUF_LOCK (priv);
3747       g_value_set_int64 (value, priv->ts_offset);
3748       JBUF_UNLOCK (priv);
3749       break;
3750     case PROP_DO_LOST:
3751       JBUF_LOCK (priv);
3752       g_value_set_boolean (value, priv->do_lost);
3753       JBUF_UNLOCK (priv);
3754       break;
3755     case PROP_MODE:
3756       JBUF_LOCK (priv);
3757       g_value_set_enum (value, rtp_jitter_buffer_get_mode (priv->jbuf));
3758       JBUF_UNLOCK (priv);
3759       break;
3760     case PROP_PERCENT:
3761     {
3762       gint percent;
3763
3764       JBUF_LOCK (priv);
3765       if (priv->srcresult != GST_FLOW_OK)
3766         percent = 100;
3767       else
3768         percent = rtp_jitter_buffer_get_percent (priv->jbuf);
3769
3770       g_value_set_int (value, percent);
3771       JBUF_UNLOCK (priv);
3772       break;
3773     }
3774     case PROP_DO_RETRANSMISSION:
3775       JBUF_LOCK (priv);
3776       g_value_set_boolean (value, priv->do_retransmission);
3777       JBUF_UNLOCK (priv);
3778       break;
3779     case PROP_RTX_NEXT_SEQNUM:
3780       JBUF_LOCK (priv);
3781       g_value_set_boolean (value, priv->rtx_next_seqnum);
3782       JBUF_UNLOCK (priv);
3783       break;
3784     case PROP_RTX_DELAY:
3785       JBUF_LOCK (priv);
3786       g_value_set_int (value, priv->rtx_delay);
3787       JBUF_UNLOCK (priv);
3788       break;
3789     case PROP_RTX_MIN_DELAY:
3790       JBUF_LOCK (priv);
3791       g_value_set_uint (value, priv->rtx_min_delay);
3792       JBUF_UNLOCK (priv);
3793       break;
3794     case PROP_RTX_DELAY_REORDER:
3795       JBUF_LOCK (priv);
3796       g_value_set_int (value, priv->rtx_delay_reorder);
3797       JBUF_UNLOCK (priv);
3798       break;
3799     case PROP_RTX_RETRY_TIMEOUT:
3800       JBUF_LOCK (priv);
3801       g_value_set_int (value, priv->rtx_retry_timeout);
3802       JBUF_UNLOCK (priv);
3803       break;
3804     case PROP_RTX_MIN_RETRY_TIMEOUT:
3805       JBUF_LOCK (priv);
3806       g_value_set_int (value, priv->rtx_min_retry_timeout);
3807       JBUF_UNLOCK (priv);
3808       break;
3809     case PROP_RTX_RETRY_PERIOD:
3810       JBUF_LOCK (priv);
3811       g_value_set_int (value, priv->rtx_retry_period);
3812       JBUF_UNLOCK (priv);
3813       break;
3814     case PROP_RTX_MAX_RETRIES:
3815       JBUF_LOCK (priv);
3816       g_value_set_int (value, priv->rtx_max_retries);
3817       JBUF_UNLOCK (priv);
3818       break;
3819     case PROP_STATS:
3820       g_value_take_boxed (value,
3821           gst_rtp_jitter_buffer_create_stats (jitterbuffer));
3822       break;
3823     default:
3824       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
3825       break;
3826   }
3827 }
3828
3829 static GstStructure *
3830 gst_rtp_jitter_buffer_create_stats (GstRtpJitterBuffer * jbuf)
3831 {
3832   GstStructure *s;
3833
3834   JBUF_LOCK (jbuf->priv);
3835   s = gst_structure_new ("application/x-rtp-jitterbuffer-stats",
3836       "rtx-count", G_TYPE_UINT64, jbuf->priv->num_rtx_requests,
3837       "rtx-success-count", G_TYPE_UINT64, jbuf->priv->num_rtx_success,
3838       "rtx-per-packet", G_TYPE_DOUBLE, jbuf->priv->avg_rtx_num,
3839       "rtx-rtt", G_TYPE_UINT64, jbuf->priv->avg_rtx_rtt, NULL);
3840   JBUF_UNLOCK (jbuf->priv);
3841
3842   return s;
3843 }