rtpjitterbuffer: Add "rtx-max-retries" property
[platform/upstream/gst-plugins-good.git] / gst / rtpmanager / gstrtpjitterbuffer.c
1 /*
2  * Farsight Voice+Video library
3  *
4  *  Copyright 2007 Collabora Ltd,
5  *  Copyright 2007 Nokia Corporation
6  *   @author: Philippe Kalaf <philippe.kalaf@collabora.co.uk>.
7  *  Copyright 2007 Wim Taymans <wim.taymans@gmail.com>
8  *
9  * This library is free software; you can redistribute it and/or
10  * modify it under the terms of the GNU Library General Public
11  * License as published by the Free Software Foundation; either
12  * version 2 of the License, or (at your option) any later version.
13  *
14  * This library is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17  * Library General Public License for more details.
18  *
19  * You should have received a copy of the GNU Library General Public
20  * License along with this library; if not, write to the
21  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
22  * Boston, MA 02110-1301, USA.
23  *
24  */
25
26 /**
27  * SECTION:element-rtpjitterbuffer
28  *
29  * This element reorders and removes duplicate RTP packets as they are received
30  * from a network source.
31  *
32  * The element needs the clock-rate of the RTP payload in order to estimate the
33  * delay. This information is obtained either from the caps on the sink pad or,
34  * when no caps are present, from the #GstRtpJitterBuffer::request-pt-map signal.
35  * To clear the previous pt-map use the #GstRtpJitterBuffer::clear-pt-map signal.
36  *
37  * The rtpjitterbuffer will wait for missing packets up to a configurable time
38  * limit using the #GstRtpJitterBuffer:latency property. Packets arriving too
39  * late are considered to be lost packets. If the #GstRtpJitterBuffer:do-lost
40  * property is set, lost packets will result in a custom serialized downstream
41  * event of name GstRTPPacketLost. The lost packet events are usually used by a
42  * depayloader or other element to create concealment data or some other logic
43  * to gracefully handle the missing packets.
44  *
45  * The jitterbuffer will use the DTS (or PTS if no DTS is set) of the incomming
46  * buffer and the rtptime inside the RTP packet to create a PTS on the outgoing
47  * buffer.
48  *
49  * The jitterbuffer can also be configured to send early retransmission events
50  * upstream by setting the #GstRtpJitterBuffer:do-retransmission property. In
51  * this mode, the jitterbuffer tries to estimate when a packet should arrive and
52  * sends a custom upstream event named GstRTPRetransmissionRequest when the
53  * packet is considered late. The initial expected packet arrival time is
54  * calculated as follows:
55  *
56  * - If seqnum N arrived at time T, seqnum N+1 is expected to arrive at
57  *     T + packet-spacing + #GstRtpJitterBuffer:rtx-delay. The packet spacing is
58  *     calculated from the DTS (or PTS is no DTS) of two consecutive RTP
59  *     packets with different rtptime.
60  *
61  * - If seqnum N0 arrived at time T0 and seqnum Nm arrived at time Tm,
62  *     seqnum Ni is expected at time Ti = T0 + i*(Tm - T0)/(Nm - N0). Any
63  *     previously scheduled timeout is overwritten.
64  *
65  * - If seqnum N arrived, all seqnum older than
66  *     N - #GstRtpJitterBuffer:rtx-delay-reorder are considered late
67  *     immediately. This is to request fast feedback for abonormally reorder
68  *     packets before any of the previous timeouts is triggered.
69  *
70  * A late packet triggers the GstRTPRetransmissionRequest custom upstream
71  * event. After the initial timeout expires and the retransmission event is
72  * sent, the timeout is scheduled for
73  * T + #GstRtpJitterBuffer:rtx-retry-timeout. If the missing packet did not
74  * arrive after #GstRtpJitterBuffer:rtx-retry-timeout, a new
75  * GstRTPRetransmissionRequest is sent upstream and the timeout is rescheduled
76  * again for T + #GstRtpJitterBuffer:rtx-retry-timeout. This repeats until
77  * #GstRtpJitterBuffer:rtx-retry-period elapsed, at which point no further
78  * retransmission requests are sent and the regular logic is performed to
79  * schedule a lost packet as discussed above.
80  *
81  * This element acts as a live element and so adds #GstRtpJitterBuffer:latency
82  * to the pipeline.
83  *
84  * This element will automatically be used inside rtpbin.
85  *
86  * <refsect2>
87  * <title>Example pipelines</title>
88  * |[
89  * gst-launch-1.0 rtspsrc location=rtsp://192.168.1.133:8554/mpeg1or2AudioVideoTest ! rtpjitterbuffer ! rtpmpvdepay ! mpeg2dec ! xvimagesink
90  * ]| Connect to a streaming server and decode the MPEG video. The jitterbuffer is
91  * inserted into the pipeline to smooth out network jitter and to reorder the
92  * out-of-order RTP packets.
93  * </refsect2>
94  */
95
96 #ifdef HAVE_CONFIG_H
97 #include "config.h"
98 #endif
99
100 #include <stdlib.h>
101 #include <string.h>
102 #include <gst/rtp/gstrtpbuffer.h>
103
104 #include "gstrtpjitterbuffer.h"
105 #include "rtpjitterbuffer.h"
106 #include "rtpstats.h"
107
108 #include <gst/glib-compat-private.h>
109
110 GST_DEBUG_CATEGORY (rtpjitterbuffer_debug);
111 #define GST_CAT_DEFAULT (rtpjitterbuffer_debug)
112
113 /* RTPJitterBuffer signals and args */
114 enum
115 {
116   SIGNAL_REQUEST_PT_MAP,
117   SIGNAL_CLEAR_PT_MAP,
118   SIGNAL_HANDLE_SYNC,
119   SIGNAL_ON_NPT_STOP,
120   SIGNAL_SET_ACTIVE,
121   LAST_SIGNAL
122 };
123
124 #define DEFAULT_LATENCY_MS          200
125 #define DEFAULT_DROP_ON_LATENCY     FALSE
126 #define DEFAULT_TS_OFFSET           0
127 #define DEFAULT_DO_LOST             FALSE
128 #define DEFAULT_MODE                RTP_JITTER_BUFFER_MODE_SLAVE
129 #define DEFAULT_PERCENT             0
130 #define DEFAULT_DO_RETRANSMISSION   FALSE
131 #define DEFAULT_RTX_DELAY           -1
132 #define DEFAULT_RTX_MIN_DELAY       0
133 #define DEFAULT_RTX_DELAY_REORDER   3
134 #define DEFAULT_RTX_RETRY_TIMEOUT   -1
135 #define DEFAULT_RTX_MIN_RETRY_TIMEOUT   -1
136 #define DEFAULT_RTX_RETRY_PERIOD    -1
137 #define DEFAULT_RTX_MAX_RETRIES    -1
138
139 #define DEFAULT_AUTO_RTX_DELAY (20 * GST_MSECOND)
140 #define DEFAULT_AUTO_RTX_TIMEOUT (40 * GST_MSECOND)
141
142 enum
143 {
144   PROP_0,
145   PROP_LATENCY,
146   PROP_DROP_ON_LATENCY,
147   PROP_TS_OFFSET,
148   PROP_DO_LOST,
149   PROP_MODE,
150   PROP_PERCENT,
151   PROP_DO_RETRANSMISSION,
152   PROP_RTX_DELAY,
153   PROP_RTX_MIN_DELAY,
154   PROP_RTX_DELAY_REORDER,
155   PROP_RTX_RETRY_TIMEOUT,
156   PROP_RTX_MIN_RETRY_TIMEOUT,
157   PROP_RTX_RETRY_PERIOD,
158   PROP_RTX_MAX_RETRIES,
159   PROP_STATS,
160   PROP_LAST
161 };
162
163 #define JBUF_LOCK(priv)   (g_mutex_lock (&(priv)->jbuf_lock))
164
165 #define JBUF_LOCK_CHECK(priv,label) G_STMT_START {    \
166   JBUF_LOCK (priv);                                   \
167   if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK))    \
168     goto label;                                       \
169 } G_STMT_END
170 #define JBUF_UNLOCK(priv) (g_mutex_unlock (&(priv)->jbuf_lock))
171
172 #define JBUF_WAIT_TIMER(priv)   G_STMT_START {            \
173   GST_DEBUG ("waiting timer");                            \
174   (priv)->waiting_timer = TRUE;                           \
175   g_cond_wait (&(priv)->jbuf_timer, &(priv)->jbuf_lock);  \
176   (priv)->waiting_timer = FALSE;                          \
177   GST_DEBUG ("waiting timer done");                       \
178 } G_STMT_END
179 #define JBUF_SIGNAL_TIMER(priv) G_STMT_START {            \
180   if (G_UNLIKELY ((priv)->waiting_timer)) {               \
181     GST_DEBUG ("signal timer");                           \
182     g_cond_signal (&(priv)->jbuf_timer);                  \
183   }                                                       \
184 } G_STMT_END
185
186 #define JBUF_WAIT_EVENT(priv,label) G_STMT_START {       \
187   GST_DEBUG ("waiting event");                           \
188   (priv)->waiting_event = TRUE;                          \
189   g_cond_wait (&(priv)->jbuf_event, &(priv)->jbuf_lock); \
190   (priv)->waiting_event = FALSE;                         \
191   GST_DEBUG ("waiting event done");                      \
192   if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK))       \
193     goto label;                                          \
194 } G_STMT_END
195 #define JBUF_SIGNAL_EVENT(priv) G_STMT_START {           \
196   if (G_UNLIKELY ((priv)->waiting_event)) {              \
197     GST_DEBUG ("signal event");                          \
198     g_cond_signal (&(priv)->jbuf_event);                 \
199   }                                                      \
200 } G_STMT_END
201
202 #define JBUF_WAIT_QUERY(priv,label) G_STMT_START {       \
203   GST_DEBUG ("waiting query");                           \
204   (priv)->waiting_query = TRUE;                          \
205   g_cond_wait (&(priv)->jbuf_query, &(priv)->jbuf_lock); \
206   (priv)->waiting_query = FALSE;                         \
207   GST_DEBUG ("waiting query done");                      \
208   if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK))       \
209     goto label;                                          \
210 } G_STMT_END
211 #define JBUF_SIGNAL_QUERY(priv,res) G_STMT_START {       \
212   (priv)->last_query = res;                              \
213   if (G_UNLIKELY ((priv)->waiting_query)) {              \
214     GST_DEBUG ("signal query");                          \
215     g_cond_signal (&(priv)->jbuf_query);                 \
216   }                                                      \
217 } G_STMT_END
218
219
220 struct _GstRtpJitterBufferPrivate
221 {
222   GstPad *sinkpad, *srcpad;
223   GstPad *rtcpsinkpad;
224
225   RTPJitterBuffer *jbuf;
226   GMutex jbuf_lock;
227   gboolean waiting_timer;
228   GCond jbuf_timer;
229   gboolean waiting_event;
230   GCond jbuf_event;
231   gboolean waiting_query;
232   GCond jbuf_query;
233   gboolean last_query;
234   gboolean discont;
235   gboolean ts_discont;
236   gboolean active;
237   guint64 out_offset;
238
239   gboolean timer_running;
240   GThread *timer_thread;
241
242   /* properties */
243   guint latency_ms;
244   guint64 latency_ns;
245   gboolean drop_on_latency;
246   gint64 ts_offset;
247   gboolean do_lost;
248   gboolean do_retransmission;
249   gint rtx_delay;
250   guint rtx_min_delay;
251   gint rtx_delay_reorder;
252   gint rtx_retry_timeout;
253   gint rtx_min_retry_timeout;
254   gint rtx_retry_period;
255   gint rtx_max_retries;
256
257   /* the last seqnum we pushed out */
258   guint32 last_popped_seqnum;
259   /* the next expected seqnum we push */
260   guint32 next_seqnum;
261   /* seqnum-base, if known */
262   guint32 seqnum_base;
263   /* last output time */
264   GstClockTime last_out_time;
265   /* last valid input timestamp and rtptime pair */
266   GstClockTime ips_dts;
267   guint64 ips_rtptime;
268   GstClockTime packet_spacing;
269
270   /* the next expected seqnum we receive */
271   GstClockTime last_in_dts;
272   guint32 last_in_seqnum;
273   guint32 next_in_seqnum;
274
275   GArray *timers;
276
277   /* start and stop ranges */
278   GstClockTime npt_start;
279   GstClockTime npt_stop;
280   guint64 ext_timestamp;
281   guint64 last_elapsed;
282   guint64 estimated_eos;
283   GstClockID eos_id;
284
285   /* state */
286   gboolean eos;
287   guint last_percent;
288
289   /* clock rate and rtp timestamp offset */
290   gint last_pt;
291   gint32 clock_rate;
292   gint64 clock_base;
293   gint64 prev_ts_offset;
294
295   /* when we are shutting down */
296   GstFlowReturn srcresult;
297   gboolean blocked;
298
299   /* for sync */
300   GstSegment segment;
301   GstClockID clock_id;
302   GstClockTime timer_timeout;
303   guint16 timer_seqnum;
304   /* the latency of the upstream peer, we have to take this into account when
305    * synchronizing the buffers. */
306   GstClockTime peer_latency;
307   guint64 ext_rtptime;
308   GstBuffer *last_sr;
309
310   /* some accounting */
311   guint64 num_late;
312   guint64 num_duplicates;
313   guint64 num_rtx_requests;
314   guint64 num_rtx_success;
315   guint64 num_rtx_failed;
316   gdouble avg_rtx_num;
317   guint64 avg_rtx_rtt;
318
319   /* for the jitter */
320   GstClockTime last_dts;
321   guint64 last_rtptime;
322   GstClockTime avg_jitter;
323 };
324
325 typedef enum
326 {
327   TIMER_TYPE_EXPECTED,
328   TIMER_TYPE_LOST,
329   TIMER_TYPE_DEADLINE,
330   TIMER_TYPE_EOS
331 } TimerType;
332
333 typedef struct
334 {
335   guint idx;
336   guint16 seqnum;
337   guint num;
338   TimerType type;
339   GstClockTime timeout;
340   GstClockTime duration;
341   GstClockTime rtx_base;
342   GstClockTime rtx_delay;
343   GstClockTime rtx_retry;
344   GstClockTime rtx_last;
345   guint num_rtx_retry;
346 } TimerData;
347
348 #define GST_RTP_JITTER_BUFFER_GET_PRIVATE(o) \
349   (G_TYPE_INSTANCE_GET_PRIVATE ((o), GST_TYPE_RTP_JITTER_BUFFER, \
350                                 GstRtpJitterBufferPrivate))
351
352 static GstStaticPadTemplate gst_rtp_jitter_buffer_sink_template =
353 GST_STATIC_PAD_TEMPLATE ("sink",
354     GST_PAD_SINK,
355     GST_PAD_ALWAYS,
356     GST_STATIC_CAPS ("application/x-rtp"
357         /* "clock-rate = (int) [ 1, 2147483647 ], "
358          * "payload = (int) , "
359          * "encoding-name = (string) "
360          */ )
361     );
362
363 static GstStaticPadTemplate gst_rtp_jitter_buffer_sink_rtcp_template =
364 GST_STATIC_PAD_TEMPLATE ("sink_rtcp",
365     GST_PAD_SINK,
366     GST_PAD_REQUEST,
367     GST_STATIC_CAPS ("application/x-rtcp")
368     );
369
370 static GstStaticPadTemplate gst_rtp_jitter_buffer_src_template =
371 GST_STATIC_PAD_TEMPLATE ("src",
372     GST_PAD_SRC,
373     GST_PAD_ALWAYS,
374     GST_STATIC_CAPS ("application/x-rtp"
375         /* "payload = (int) , "
376          * "clock-rate = (int) , "
377          * "encoding-name = (string) "
378          */ )
379     );
380
381 static guint gst_rtp_jitter_buffer_signals[LAST_SIGNAL] = { 0 };
382
383 #define gst_rtp_jitter_buffer_parent_class parent_class
384 G_DEFINE_TYPE (GstRtpJitterBuffer, gst_rtp_jitter_buffer, GST_TYPE_ELEMENT);
385
386 /* object overrides */
387 static void gst_rtp_jitter_buffer_set_property (GObject * object,
388     guint prop_id, const GValue * value, GParamSpec * pspec);
389 static void gst_rtp_jitter_buffer_get_property (GObject * object,
390     guint prop_id, GValue * value, GParamSpec * pspec);
391 static void gst_rtp_jitter_buffer_finalize (GObject * object);
392
393 /* element overrides */
394 static GstStateChangeReturn gst_rtp_jitter_buffer_change_state (GstElement
395     * element, GstStateChange transition);
396 static GstPad *gst_rtp_jitter_buffer_request_new_pad (GstElement * element,
397     GstPadTemplate * templ, const gchar * name, const GstCaps * filter);
398 static void gst_rtp_jitter_buffer_release_pad (GstElement * element,
399     GstPad * pad);
400 static GstClock *gst_rtp_jitter_buffer_provide_clock (GstElement * element);
401
402 /* pad overrides */
403 static GstCaps *gst_rtp_jitter_buffer_getcaps (GstPad * pad, GstCaps * filter);
404 static GstIterator *gst_rtp_jitter_buffer_iterate_internal_links (GstPad * pad,
405     GstObject * parent);
406
407 /* sinkpad overrides */
408 static gboolean gst_rtp_jitter_buffer_sink_event (GstPad * pad,
409     GstObject * parent, GstEvent * event);
410 static GstFlowReturn gst_rtp_jitter_buffer_chain (GstPad * pad,
411     GstObject * parent, GstBuffer * buffer);
412
413 static gboolean gst_rtp_jitter_buffer_sink_rtcp_event (GstPad * pad,
414     GstObject * parent, GstEvent * event);
415 static GstFlowReturn gst_rtp_jitter_buffer_chain_rtcp (GstPad * pad,
416     GstObject * parent, GstBuffer * buffer);
417
418 static gboolean gst_rtp_jitter_buffer_sink_query (GstPad * pad,
419     GstObject * parent, GstQuery * query);
420
421 /* srcpad overrides */
422 static gboolean gst_rtp_jitter_buffer_src_event (GstPad * pad,
423     GstObject * parent, GstEvent * event);
424 static gboolean gst_rtp_jitter_buffer_src_activate_mode (GstPad * pad,
425     GstObject * parent, GstPadMode mode, gboolean active);
426 static void gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer);
427 static gboolean gst_rtp_jitter_buffer_src_query (GstPad * pad,
428     GstObject * parent, GstQuery * query);
429
430 static void
431 gst_rtp_jitter_buffer_clear_pt_map (GstRtpJitterBuffer * jitterbuffer);
432 static GstClockTime
433 gst_rtp_jitter_buffer_set_active (GstRtpJitterBuffer * jitterbuffer,
434     gboolean active, guint64 base_time);
435 static void do_handle_sync (GstRtpJitterBuffer * jitterbuffer);
436
437 static void unschedule_current_timer (GstRtpJitterBuffer * jitterbuffer);
438 static void remove_all_timers (GstRtpJitterBuffer * jitterbuffer);
439
440 static void wait_next_timeout (GstRtpJitterBuffer * jitterbuffer);
441
442 static GstStructure *gst_rtp_jitter_buffer_create_stats (GstRtpJitterBuffer *
443     jitterbuffer);
444
445 static void
446 gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass)
447 {
448   GObjectClass *gobject_class;
449   GstElementClass *gstelement_class;
450
451   gobject_class = (GObjectClass *) klass;
452   gstelement_class = (GstElementClass *) klass;
453
454   g_type_class_add_private (klass, sizeof (GstRtpJitterBufferPrivate));
455
456   gobject_class->finalize = gst_rtp_jitter_buffer_finalize;
457
458   gobject_class->set_property = gst_rtp_jitter_buffer_set_property;
459   gobject_class->get_property = gst_rtp_jitter_buffer_get_property;
460
461   /**
462    * GstRtpJitterBuffer:latency:
463    *
464    * The maximum latency of the jitterbuffer. Packets will be kept in the buffer
465    * for at most this time.
466    */
467   g_object_class_install_property (gobject_class, PROP_LATENCY,
468       g_param_spec_uint ("latency", "Buffer latency in ms",
469           "Amount of ms to buffer", 0, G_MAXUINT, DEFAULT_LATENCY_MS,
470           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
471   /**
472    * GstRtpJitterBuffer:drop-on-latency:
473    *
474    * Drop oldest buffers when the queue is completely filled.
475    */
476   g_object_class_install_property (gobject_class, PROP_DROP_ON_LATENCY,
477       g_param_spec_boolean ("drop-on-latency",
478           "Drop buffers when maximum latency is reached",
479           "Tells the jitterbuffer to never exceed the given latency in size",
480           DEFAULT_DROP_ON_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
481   /**
482    * GstRtpJitterBuffer:ts-offset:
483    *
484    * Adjust GStreamer output buffer timestamps in the jitterbuffer with offset.
485    * This is mainly used to ensure interstream synchronisation.
486    */
487   g_object_class_install_property (gobject_class, PROP_TS_OFFSET,
488       g_param_spec_int64 ("ts-offset", "Timestamp Offset",
489           "Adjust buffer timestamps with offset in nanoseconds", G_MININT64,
490           G_MAXINT64, DEFAULT_TS_OFFSET,
491           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
492
493   /**
494    * GstRtpJitterBuffer:do-lost:
495    *
496    * Send out a GstRTPPacketLost event downstream when a packet is considered
497    * lost.
498    */
499   g_object_class_install_property (gobject_class, PROP_DO_LOST,
500       g_param_spec_boolean ("do-lost", "Do Lost",
501           "Send an event downstream when a packet is lost", DEFAULT_DO_LOST,
502           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
503
504   /**
505    * GstRtpJitterBuffer:mode:
506    *
507    * Control the buffering and timestamping mode used by the jitterbuffer.
508    */
509   g_object_class_install_property (gobject_class, PROP_MODE,
510       g_param_spec_enum ("mode", "Mode",
511           "Control the buffering algorithm in use", RTP_TYPE_JITTER_BUFFER_MODE,
512           DEFAULT_MODE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
513   /**
514    * GstRtpJitterBuffer:percent:
515    *
516    * The percent of the jitterbuffer that is filled.
517    */
518   g_object_class_install_property (gobject_class, PROP_PERCENT,
519       g_param_spec_int ("percent", "percent",
520           "The buffer filled percent", 0, 100,
521           0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
522   /**
523    * GstRtpJitterBuffer:do-retransmission:
524    *
525    * Send out a GstRTPRetransmission event upstream when a packet is considered
526    * late and should be retransmitted.
527    *
528    * Since: 1.2
529    */
530   g_object_class_install_property (gobject_class, PROP_DO_RETRANSMISSION,
531       g_param_spec_boolean ("do-retransmission", "Do Retransmission",
532           "Send retransmission events upstream when a packet is late",
533           DEFAULT_DO_RETRANSMISSION,
534           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
535
536   /**
537    * GstRtpJitterBuffer:rtx-delay:
538    *
539    * When a packet did not arrive at the expected time, wait this extra amount
540    * of time before sending a retransmission event.
541    *
542    * When -1 is used, the max jitter will be used as extra delay.
543    *
544    * Since: 1.2
545    */
546   g_object_class_install_property (gobject_class, PROP_RTX_DELAY,
547       g_param_spec_int ("rtx-delay", "RTX Delay",
548           "Extra time in ms to wait before sending retransmission "
549           "event (-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_DELAY,
550           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
551
552   /**
553    * GstRtpJitterBuffer:rtx-min-delay:
554    *
555    * When a packet did not arrive at the expected time, wait at least this extra amount
556    * of time before sending a retransmission event.
557    *
558    * Since: 1.6
559    */
560   g_object_class_install_property (gobject_class, PROP_RTX_MIN_DELAY,
561       g_param_spec_uint ("rtx-min-delay", "Minimum RTX Delay",
562           "Minimum time in ms to wait before sending retransmission "
563           "event", 0, G_MAXUINT, DEFAULT_RTX_MIN_DELAY,
564           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
565   /**
566    * GstRtpJitterBuffer:rtx-delay-reorder:
567    *
568    * Assume that a retransmission event should be sent when we see
569    * this much packet reordering.
570    *
571    * When -1 is used, the value will be estimated based on observed packet
572    * reordering.
573    *
574    * Since: 1.2
575    */
576   g_object_class_install_property (gobject_class, PROP_RTX_DELAY_REORDER,
577       g_param_spec_int ("rtx-delay-reorder", "RTX Delay Reorder",
578           "Sending retransmission event when this much reordering (-1 automatic)",
579           -1, G_MAXINT, DEFAULT_RTX_DELAY_REORDER,
580           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
581   /**
582    * GstRtpJitterBuffer::rtx-retry-timeout:
583    *
584    * When no packet has been received after sending a retransmission event
585    * for this time, retry sending a retransmission event.
586    *
587    * When -1 is used, the value will be estimated based on observed round
588    * trip time.
589    *
590    * Since: 1.2
591    */
592   g_object_class_install_property (gobject_class, PROP_RTX_RETRY_TIMEOUT,
593       g_param_spec_int ("rtx-retry-timeout", "RTX Retry Timeout",
594           "Retry sending a transmission event after this timeout in "
595           "ms (-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_RETRY_TIMEOUT,
596           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
597   /**
598    * GstRtpJitterBuffer::rtx-min-retry-timeout:
599    *
600    * The minimum amount of time between retry timeouts. When
601    * GstRtpJitterBuffer::rtx-retry-timeout is -1, this value ensures a
602    * minimum interval between retry timeouts.
603    *
604    * When -1 is used, the value will be estimated based on the
605    * packet spacing.
606    *
607    * Since: 1.6
608    */
609   g_object_class_install_property (gobject_class, PROP_RTX_MIN_RETRY_TIMEOUT,
610       g_param_spec_int ("rtx-min-retry-timeout", "RTX Min Retry Timeout",
611           "Minimum timeout between sending a transmission event in "
612           "ms (-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_MIN_RETRY_TIMEOUT,
613           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
614   /**
615    * GstRtpJitterBuffer:rtx-retry-period:
616    *
617    * The amount of time to try to get a retransmission.
618    *
619    * When -1 is used, the value will be estimated based on the jitterbuffer
620    * latency and the observed round trip time.
621    *
622    * Since: 1.2
623    */
624   g_object_class_install_property (gobject_class, PROP_RTX_RETRY_PERIOD,
625       g_param_spec_int ("rtx-retry-period", "RTX Retry Period",
626           "Try to get a retransmission for this many ms "
627           "(-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_RETRY_PERIOD,
628           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
629   /**
630    * GstRtpJitterBuffer:rtx-max-retries:
631    *
632    * The maximum number of retries to request a retransmission.
633    *
634    * This implies that as maximum (rtx-max-retries + 1) retransmissions will be requested.
635    * When -1 is used, the number of retransmission request will not be limited.
636    *
637    * Since: 1.6
638    */
639   g_object_class_install_property (gobject_class, PROP_RTX_MAX_RETRIES,
640       g_param_spec_int ("rtx-max-retries", "RTX Max Retries",
641           "The maximum number of retries to request a retransmission. "
642           "(-1 not limited)", -1, G_MAXINT, DEFAULT_RTX_MAX_RETRIES,
643           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
644   /**
645    * GstRtpJitterBuffer:stats:
646    *
647    * Various jitterbuffer statistics. This property returns a GstStructure
648    * with name application/x-rtp-jitterbuffer-stats with the following fields:
649    *
650    *  "rtx-count"         G_TYPE_UINT64 The number of retransmissions requested
651    *  "rtx-success-count" G_TYPE_UINT64 The number of successful retransmissions
652    *  "rtx-per-packet"    G_TYPE_DOUBLE Average number of RTX per packet
653    *  "rtx-rtt"           G_TYPE_UINT64 Average round trip time per RTX
654    *
655    * Since: 1.4
656    */
657   g_object_class_install_property (gobject_class, PROP_STATS,
658       g_param_spec_boxed ("stats", "Statistics",
659           "Various statistics", GST_TYPE_STRUCTURE,
660           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
661
662   /**
663    * GstRtpJitterBuffer::request-pt-map:
664    * @buffer: the object which received the signal
665    * @pt: the pt
666    *
667    * Request the payload type as #GstCaps for @pt.
668    */
669   gst_rtp_jitter_buffer_signals[SIGNAL_REQUEST_PT_MAP] =
670       g_signal_new ("request-pt-map", G_TYPE_FROM_CLASS (klass),
671       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
672           request_pt_map), NULL, NULL, g_cclosure_marshal_generic,
673       GST_TYPE_CAPS, 1, G_TYPE_UINT);
674   /**
675    * GstRtpJitterBuffer::handle-sync:
676    * @buffer: the object which received the signal
677    * @struct: a GstStructure containing sync values.
678    *
679    * Be notified of new sync values.
680    */
681   gst_rtp_jitter_buffer_signals[SIGNAL_HANDLE_SYNC] =
682       g_signal_new ("handle-sync", G_TYPE_FROM_CLASS (klass),
683       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
684           handle_sync), NULL, NULL, g_cclosure_marshal_VOID__BOXED,
685       G_TYPE_NONE, 1, GST_TYPE_STRUCTURE | G_SIGNAL_TYPE_STATIC_SCOPE);
686
687   /**
688    * GstRtpJitterBuffer::on-npt-stop:
689    * @buffer: the object which received the signal
690    *
691    * Signal that the jitterbufer has pushed the RTP packet that corresponds to
692    * the npt-stop position.
693    */
694   gst_rtp_jitter_buffer_signals[SIGNAL_ON_NPT_STOP] =
695       g_signal_new ("on-npt-stop", G_TYPE_FROM_CLASS (klass),
696       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
697           on_npt_stop), NULL, NULL, g_cclosure_marshal_VOID__VOID,
698       G_TYPE_NONE, 0, G_TYPE_NONE);
699
700   /**
701    * GstRtpJitterBuffer::clear-pt-map:
702    * @buffer: the object which received the signal
703    *
704    * Invalidate the clock-rate as obtained with the
705    * #GstRtpJitterBuffer::request-pt-map signal.
706    */
707   gst_rtp_jitter_buffer_signals[SIGNAL_CLEAR_PT_MAP] =
708       g_signal_new ("clear-pt-map", G_TYPE_FROM_CLASS (klass),
709       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
710       G_STRUCT_OFFSET (GstRtpJitterBufferClass, clear_pt_map), NULL, NULL,
711       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
712
713   /**
714    * GstRtpJitterBuffer::set-active:
715    * @buffer: the object which received the signal
716    *
717    * Start pushing out packets with the given base time. This signal is only
718    * useful in buffering mode.
719    *
720    * Returns: the time of the last pushed packet.
721    */
722   gst_rtp_jitter_buffer_signals[SIGNAL_SET_ACTIVE] =
723       g_signal_new ("set-active", G_TYPE_FROM_CLASS (klass),
724       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
725       G_STRUCT_OFFSET (GstRtpJitterBufferClass, set_active), NULL, NULL,
726       g_cclosure_marshal_generic, G_TYPE_UINT64, 2, G_TYPE_BOOLEAN,
727       G_TYPE_UINT64);
728
729   gstelement_class->change_state =
730       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_change_state);
731   gstelement_class->request_new_pad =
732       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_request_new_pad);
733   gstelement_class->release_pad =
734       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_release_pad);
735   gstelement_class->provide_clock =
736       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_provide_clock);
737
738   gst_element_class_add_pad_template (gstelement_class,
739       gst_static_pad_template_get (&gst_rtp_jitter_buffer_src_template));
740   gst_element_class_add_pad_template (gstelement_class,
741       gst_static_pad_template_get (&gst_rtp_jitter_buffer_sink_template));
742   gst_element_class_add_pad_template (gstelement_class,
743       gst_static_pad_template_get (&gst_rtp_jitter_buffer_sink_rtcp_template));
744
745   gst_element_class_set_static_metadata (gstelement_class,
746       "RTP packet jitter-buffer", "Filter/Network/RTP",
747       "A buffer that deals with network jitter and other transmission faults",
748       "Philippe Kalaf <philippe.kalaf@collabora.co.uk>, "
749       "Wim Taymans <wim.taymans@gmail.com>");
750
751   klass->clear_pt_map = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_clear_pt_map);
752   klass->set_active = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_set_active);
753
754   GST_DEBUG_CATEGORY_INIT
755       (rtpjitterbuffer_debug, "rtpjitterbuffer", 0, "RTP Jitter Buffer");
756 }
757
758 static void
759 gst_rtp_jitter_buffer_init (GstRtpJitterBuffer * jitterbuffer)
760 {
761   GstRtpJitterBufferPrivate *priv;
762
763   priv = GST_RTP_JITTER_BUFFER_GET_PRIVATE (jitterbuffer);
764   jitterbuffer->priv = priv;
765
766   priv->latency_ms = DEFAULT_LATENCY_MS;
767   priv->latency_ns = priv->latency_ms * GST_MSECOND;
768   priv->drop_on_latency = DEFAULT_DROP_ON_LATENCY;
769   priv->do_lost = DEFAULT_DO_LOST;
770   priv->do_retransmission = DEFAULT_DO_RETRANSMISSION;
771   priv->rtx_delay = DEFAULT_RTX_DELAY;
772   priv->rtx_min_delay = DEFAULT_RTX_MIN_DELAY;
773   priv->rtx_delay_reorder = DEFAULT_RTX_DELAY_REORDER;
774   priv->rtx_retry_timeout = DEFAULT_RTX_RETRY_TIMEOUT;
775   priv->rtx_min_retry_timeout = DEFAULT_RTX_MIN_RETRY_TIMEOUT;
776   priv->rtx_retry_period = DEFAULT_RTX_RETRY_PERIOD;
777   priv->rtx_max_retries = DEFAULT_RTX_MAX_RETRIES;
778
779   priv->last_dts = -1;
780   priv->last_rtptime = -1;
781   priv->avg_jitter = 0;
782   priv->timers = g_array_new (FALSE, TRUE, sizeof (TimerData));
783   priv->jbuf = rtp_jitter_buffer_new ();
784   g_mutex_init (&priv->jbuf_lock);
785   g_cond_init (&priv->jbuf_timer);
786   g_cond_init (&priv->jbuf_event);
787   g_cond_init (&priv->jbuf_query);
788
789   /* reset skew detection initialy */
790   rtp_jitter_buffer_reset_skew (priv->jbuf);
791   rtp_jitter_buffer_set_delay (priv->jbuf, priv->latency_ns);
792   rtp_jitter_buffer_set_buffering (priv->jbuf, FALSE);
793   priv->active = TRUE;
794
795   priv->srcpad =
796       gst_pad_new_from_static_template (&gst_rtp_jitter_buffer_src_template,
797       "src");
798
799   gst_pad_set_activatemode_function (priv->srcpad,
800       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_activate_mode));
801   gst_pad_set_query_function (priv->srcpad,
802       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_query));
803   gst_pad_set_event_function (priv->srcpad,
804       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_event));
805
806   priv->sinkpad =
807       gst_pad_new_from_static_template (&gst_rtp_jitter_buffer_sink_template,
808       "sink");
809
810   gst_pad_set_chain_function (priv->sinkpad,
811       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_chain));
812   gst_pad_set_event_function (priv->sinkpad,
813       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_sink_event));
814   gst_pad_set_query_function (priv->sinkpad,
815       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_sink_query));
816
817   gst_element_add_pad (GST_ELEMENT (jitterbuffer), priv->srcpad);
818   gst_element_add_pad (GST_ELEMENT (jitterbuffer), priv->sinkpad);
819
820   GST_OBJECT_FLAG_SET (jitterbuffer, GST_ELEMENT_FLAG_PROVIDE_CLOCK);
821 }
822
823 #define IS_DROPABLE(it) (((it)->type == ITEM_TYPE_BUFFER) || ((it)->type == ITEM_TYPE_LOST))
824
825 #define ITEM_TYPE_BUFFER        0
826 #define ITEM_TYPE_LOST          1
827 #define ITEM_TYPE_EVENT         2
828 #define ITEM_TYPE_QUERY         3
829
830 static RTPJitterBufferItem *
831 alloc_item (gpointer data, guint type, GstClockTime dts, GstClockTime pts,
832     guint seqnum, guint count, guint rtptime)
833 {
834   RTPJitterBufferItem *item;
835
836   item = g_slice_new (RTPJitterBufferItem);
837   item->data = data;
838   item->next = NULL;
839   item->prev = NULL;
840   item->type = type;
841   item->dts = dts;
842   item->pts = pts;
843   item->seqnum = seqnum;
844   item->count = count;
845   item->rtptime = rtptime;
846
847   return item;
848 }
849
850 static void
851 free_item (RTPJitterBufferItem * item)
852 {
853   if (item->data && item->type != ITEM_TYPE_QUERY)
854     gst_mini_object_unref (item->data);
855   g_slice_free (RTPJitterBufferItem, item);
856 }
857
858 static void
859 free_item_and_retain_events (RTPJitterBufferItem * item, gpointer user_data)
860 {
861   GList **l = user_data;
862
863   if (item->data && item->type == ITEM_TYPE_EVENT
864       && GST_EVENT_IS_STICKY (item->data)) {
865     *l = g_list_prepend (*l, item->data);
866   } else if (item->data && item->type != ITEM_TYPE_QUERY) {
867     gst_mini_object_unref (item->data);
868   }
869   g_slice_free (RTPJitterBufferItem, item);
870 }
871
872 static void
873 gst_rtp_jitter_buffer_finalize (GObject * object)
874 {
875   GstRtpJitterBuffer *jitterbuffer;
876   GstRtpJitterBufferPrivate *priv;
877
878   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
879   priv = jitterbuffer->priv;
880
881   g_array_free (priv->timers, TRUE);
882   g_mutex_clear (&priv->jbuf_lock);
883   g_cond_clear (&priv->jbuf_timer);
884   g_cond_clear (&priv->jbuf_event);
885   g_cond_clear (&priv->jbuf_query);
886
887   rtp_jitter_buffer_flush (priv->jbuf, (GFunc) free_item, NULL);
888   g_object_unref (priv->jbuf);
889
890   G_OBJECT_CLASS (parent_class)->finalize (object);
891 }
892
893 static GstIterator *
894 gst_rtp_jitter_buffer_iterate_internal_links (GstPad * pad, GstObject * parent)
895 {
896   GstRtpJitterBuffer *jitterbuffer;
897   GstPad *otherpad = NULL;
898   GstIterator *it = NULL;
899   GValue val = { 0, };
900
901   jitterbuffer = GST_RTP_JITTER_BUFFER_CAST (parent);
902
903   if (pad == jitterbuffer->priv->sinkpad) {
904     otherpad = jitterbuffer->priv->srcpad;
905   } else if (pad == jitterbuffer->priv->srcpad) {
906     otherpad = jitterbuffer->priv->sinkpad;
907   } else if (pad == jitterbuffer->priv->rtcpsinkpad) {
908     it = gst_iterator_new_single (GST_TYPE_PAD, NULL);
909   }
910
911   if (it == NULL) {
912     g_value_init (&val, GST_TYPE_PAD);
913     g_value_set_object (&val, otherpad);
914     it = gst_iterator_new_single (GST_TYPE_PAD, &val);
915     g_value_unset (&val);
916   }
917
918   return it;
919 }
920
921 static GstPad *
922 create_rtcp_sink (GstRtpJitterBuffer * jitterbuffer)
923 {
924   GstRtpJitterBufferPrivate *priv;
925
926   priv = jitterbuffer->priv;
927
928   GST_DEBUG_OBJECT (jitterbuffer, "creating RTCP sink pad");
929
930   priv->rtcpsinkpad =
931       gst_pad_new_from_static_template
932       (&gst_rtp_jitter_buffer_sink_rtcp_template, "sink_rtcp");
933   gst_pad_set_chain_function (priv->rtcpsinkpad,
934       gst_rtp_jitter_buffer_chain_rtcp);
935   gst_pad_set_event_function (priv->rtcpsinkpad,
936       (GstPadEventFunction) gst_rtp_jitter_buffer_sink_rtcp_event);
937   gst_pad_set_iterate_internal_links_function (priv->rtcpsinkpad,
938       gst_rtp_jitter_buffer_iterate_internal_links);
939   gst_pad_set_active (priv->rtcpsinkpad, TRUE);
940   gst_element_add_pad (GST_ELEMENT_CAST (jitterbuffer), priv->rtcpsinkpad);
941
942   return priv->rtcpsinkpad;
943 }
944
945 static void
946 remove_rtcp_sink (GstRtpJitterBuffer * jitterbuffer)
947 {
948   GstRtpJitterBufferPrivate *priv;
949
950   priv = jitterbuffer->priv;
951
952   GST_DEBUG_OBJECT (jitterbuffer, "removing RTCP sink pad");
953
954   gst_pad_set_active (priv->rtcpsinkpad, FALSE);
955
956   gst_element_remove_pad (GST_ELEMENT_CAST (jitterbuffer), priv->rtcpsinkpad);
957   priv->rtcpsinkpad = NULL;
958 }
959
960 static GstPad *
961 gst_rtp_jitter_buffer_request_new_pad (GstElement * element,
962     GstPadTemplate * templ, const gchar * name, const GstCaps * filter)
963 {
964   GstRtpJitterBuffer *jitterbuffer;
965   GstElementClass *klass;
966   GstPad *result;
967   GstRtpJitterBufferPrivate *priv;
968
969   g_return_val_if_fail (templ != NULL, NULL);
970   g_return_val_if_fail (GST_IS_RTP_JITTER_BUFFER (element), NULL);
971
972   jitterbuffer = GST_RTP_JITTER_BUFFER_CAST (element);
973   priv = jitterbuffer->priv;
974   klass = GST_ELEMENT_GET_CLASS (element);
975
976   GST_DEBUG_OBJECT (element, "requesting pad %s", GST_STR_NULL (name));
977
978   /* figure out the template */
979   if (templ == gst_element_class_get_pad_template (klass, "sink_rtcp")) {
980     if (priv->rtcpsinkpad != NULL)
981       goto exists;
982
983     result = create_rtcp_sink (jitterbuffer);
984   } else
985     goto wrong_template;
986
987   return result;
988
989   /* ERRORS */
990 wrong_template:
991   {
992     g_warning ("rtpjitterbuffer: this is not our template");
993     return NULL;
994   }
995 exists:
996   {
997     g_warning ("rtpjitterbuffer: pad already requested");
998     return NULL;
999   }
1000 }
1001
1002 static void
1003 gst_rtp_jitter_buffer_release_pad (GstElement * element, GstPad * pad)
1004 {
1005   GstRtpJitterBuffer *jitterbuffer;
1006   GstRtpJitterBufferPrivate *priv;
1007
1008   g_return_if_fail (GST_IS_RTP_JITTER_BUFFER (element));
1009   g_return_if_fail (GST_IS_PAD (pad));
1010
1011   jitterbuffer = GST_RTP_JITTER_BUFFER_CAST (element);
1012   priv = jitterbuffer->priv;
1013
1014   GST_DEBUG_OBJECT (element, "releasing pad %s:%s", GST_DEBUG_PAD_NAME (pad));
1015
1016   if (priv->rtcpsinkpad == pad) {
1017     remove_rtcp_sink (jitterbuffer);
1018   } else
1019     goto wrong_pad;
1020
1021   return;
1022
1023   /* ERRORS */
1024 wrong_pad:
1025   {
1026     g_warning ("gstjitterbuffer: asked to release an unknown pad");
1027     return;
1028   }
1029 }
1030
1031 static GstClock *
1032 gst_rtp_jitter_buffer_provide_clock (GstElement * element)
1033 {
1034   return gst_system_clock_obtain ();
1035 }
1036
1037 static void
1038 gst_rtp_jitter_buffer_clear_pt_map (GstRtpJitterBuffer * jitterbuffer)
1039 {
1040   GstRtpJitterBufferPrivate *priv;
1041
1042   priv = jitterbuffer->priv;
1043
1044   /* this will trigger a new pt-map request signal, FIXME, do something better. */
1045
1046   JBUF_LOCK (priv);
1047   priv->clock_rate = -1;
1048   /* do not clear current content, but refresh state for new arrival */
1049   GST_DEBUG_OBJECT (jitterbuffer, "reset jitterbuffer");
1050   rtp_jitter_buffer_reset_skew (priv->jbuf);
1051   JBUF_UNLOCK (priv);
1052 }
1053
1054 static GstClockTime
1055 gst_rtp_jitter_buffer_set_active (GstRtpJitterBuffer * jbuf, gboolean active,
1056     guint64 offset)
1057 {
1058   GstRtpJitterBufferPrivate *priv;
1059   GstClockTime last_out;
1060   RTPJitterBufferItem *item;
1061
1062   priv = jbuf->priv;
1063
1064   JBUF_LOCK (priv);
1065   GST_DEBUG_OBJECT (jbuf, "setting active %d with offset %" GST_TIME_FORMAT,
1066       active, GST_TIME_ARGS (offset));
1067
1068   if (active != priv->active) {
1069     /* add the amount of time spent in paused to the output offset. All
1070      * outgoing buffers will have this offset applied to their timestamps in
1071      * order to make them arrive in time in the sink. */
1072     priv->out_offset = offset;
1073     GST_DEBUG_OBJECT (jbuf, "out offset %" GST_TIME_FORMAT,
1074         GST_TIME_ARGS (priv->out_offset));
1075     priv->active = active;
1076     JBUF_SIGNAL_EVENT (priv);
1077   }
1078   if (!active) {
1079     rtp_jitter_buffer_set_buffering (priv->jbuf, TRUE);
1080   }
1081   if ((item = rtp_jitter_buffer_peek (priv->jbuf))) {
1082     /* head buffer timestamp and offset gives our output time */
1083     last_out = item->dts + priv->ts_offset;
1084   } else {
1085     /* use last known time when the buffer is empty */
1086     last_out = priv->last_out_time;
1087   }
1088   JBUF_UNLOCK (priv);
1089
1090   return last_out;
1091 }
1092
1093 static GstCaps *
1094 gst_rtp_jitter_buffer_getcaps (GstPad * pad, GstCaps * filter)
1095 {
1096   GstRtpJitterBuffer *jitterbuffer;
1097   GstRtpJitterBufferPrivate *priv;
1098   GstPad *other;
1099   GstCaps *caps;
1100   GstCaps *templ;
1101
1102   jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
1103   priv = jitterbuffer->priv;
1104
1105   other = (pad == priv->srcpad ? priv->sinkpad : priv->srcpad);
1106
1107   caps = gst_pad_peer_query_caps (other, filter);
1108
1109   templ = gst_pad_get_pad_template_caps (pad);
1110   if (caps == NULL) {
1111     GST_DEBUG_OBJECT (jitterbuffer, "use template");
1112     caps = templ;
1113   } else {
1114     GstCaps *intersect;
1115
1116     GST_DEBUG_OBJECT (jitterbuffer, "intersect with template");
1117
1118     intersect = gst_caps_intersect (caps, templ);
1119     gst_caps_unref (caps);
1120     gst_caps_unref (templ);
1121
1122     caps = intersect;
1123   }
1124   gst_object_unref (jitterbuffer);
1125
1126   return caps;
1127 }
1128
1129 /*
1130  * Must be called with JBUF_LOCK held
1131  */
1132
1133 static gboolean
1134 gst_jitter_buffer_sink_parse_caps (GstRtpJitterBuffer * jitterbuffer,
1135     GstCaps * caps)
1136 {
1137   GstRtpJitterBufferPrivate *priv;
1138   GstStructure *caps_struct;
1139   guint val;
1140   GstClockTime tval;
1141
1142   priv = jitterbuffer->priv;
1143
1144   /* first parse the caps */
1145   caps_struct = gst_caps_get_structure (caps, 0);
1146
1147   GST_DEBUG_OBJECT (jitterbuffer, "got caps");
1148
1149   /* we need a clock-rate to convert the rtp timestamps to GStreamer time and to
1150    * measure the amount of data in the buffer */
1151   if (!gst_structure_get_int (caps_struct, "clock-rate", &priv->clock_rate))
1152     goto error;
1153
1154   if (priv->clock_rate <= 0)
1155     goto wrong_rate;
1156
1157   GST_DEBUG_OBJECT (jitterbuffer, "got clock-rate %d", priv->clock_rate);
1158
1159   rtp_jitter_buffer_set_clock_rate (priv->jbuf, priv->clock_rate);
1160
1161   /* The clock base is the RTP timestamp corrsponding to the npt-start value. We
1162    * can use this to track the amount of time elapsed on the sender. */
1163   if (gst_structure_get_uint (caps_struct, "clock-base", &val))
1164     priv->clock_base = val;
1165   else
1166     priv->clock_base = -1;
1167
1168   priv->ext_timestamp = priv->clock_base;
1169
1170   GST_DEBUG_OBJECT (jitterbuffer, "got clock-base %" G_GINT64_FORMAT,
1171       priv->clock_base);
1172
1173   if (gst_structure_get_uint (caps_struct, "seqnum-base", &val)) {
1174     /* first expected seqnum, only update when we didn't have a previous base. */
1175     if (priv->next_in_seqnum == -1)
1176       priv->next_in_seqnum = val;
1177     if (priv->next_seqnum == -1) {
1178       priv->next_seqnum = val;
1179       JBUF_SIGNAL_EVENT (priv);
1180     }
1181     priv->seqnum_base = val;
1182   } else {
1183     priv->seqnum_base = -1;
1184   }
1185
1186   GST_DEBUG_OBJECT (jitterbuffer, "got seqnum-base %d", priv->next_in_seqnum);
1187
1188   /* the start and stop times. The seqnum-base corresponds to the start time. We
1189    * will keep track of the seqnums on the output and when we reach the one
1190    * corresponding to npt-stop, we emit the npt-stop-reached signal */
1191   if (gst_structure_get_clock_time (caps_struct, "npt-start", &tval))
1192     priv->npt_start = tval;
1193   else
1194     priv->npt_start = 0;
1195
1196   if (gst_structure_get_clock_time (caps_struct, "npt-stop", &tval))
1197     priv->npt_stop = tval;
1198   else
1199     priv->npt_stop = -1;
1200
1201   GST_DEBUG_OBJECT (jitterbuffer,
1202       "npt start/stop: %" GST_TIME_FORMAT "-%" GST_TIME_FORMAT,
1203       GST_TIME_ARGS (priv->npt_start), GST_TIME_ARGS (priv->npt_stop));
1204
1205   return TRUE;
1206
1207   /* ERRORS */
1208 error:
1209   {
1210     GST_DEBUG_OBJECT (jitterbuffer, "No clock-rate in caps!");
1211     return FALSE;
1212   }
1213 wrong_rate:
1214   {
1215     GST_DEBUG_OBJECT (jitterbuffer, "Invalid clock-rate %d", priv->clock_rate);
1216     return FALSE;
1217   }
1218 }
1219
1220 static void
1221 gst_rtp_jitter_buffer_flush_start (GstRtpJitterBuffer * jitterbuffer)
1222 {
1223   GstRtpJitterBufferPrivate *priv;
1224
1225   priv = jitterbuffer->priv;
1226
1227   JBUF_LOCK (priv);
1228   /* mark ourselves as flushing */
1229   priv->srcresult = GST_FLOW_FLUSHING;
1230   GST_DEBUG_OBJECT (jitterbuffer, "Disabling pop on queue");
1231   /* this unblocks any waiting pops on the src pad task */
1232   JBUF_SIGNAL_EVENT (priv);
1233   JBUF_SIGNAL_QUERY (priv, FALSE);
1234   JBUF_UNLOCK (priv);
1235 }
1236
1237 static void
1238 gst_rtp_jitter_buffer_flush_stop (GstRtpJitterBuffer * jitterbuffer)
1239 {
1240   GstRtpJitterBufferPrivate *priv;
1241
1242   priv = jitterbuffer->priv;
1243
1244   JBUF_LOCK (priv);
1245   GST_DEBUG_OBJECT (jitterbuffer, "Enabling pop on queue");
1246   /* Mark as non flushing */
1247   priv->srcresult = GST_FLOW_OK;
1248   gst_segment_init (&priv->segment, GST_FORMAT_TIME);
1249   priv->last_popped_seqnum = -1;
1250   priv->last_out_time = -1;
1251   priv->next_seqnum = -1;
1252   priv->seqnum_base = -1;
1253   priv->ips_rtptime = -1;
1254   priv->ips_dts = GST_CLOCK_TIME_NONE;
1255   priv->packet_spacing = 0;
1256   priv->next_in_seqnum = -1;
1257   priv->clock_rate = -1;
1258   priv->last_pt = -1;
1259   priv->eos = FALSE;
1260   priv->estimated_eos = -1;
1261   priv->last_elapsed = 0;
1262   priv->ext_timestamp = -1;
1263   priv->avg_jitter = 0;
1264   priv->last_dts = -1;
1265   priv->last_rtptime = -1;
1266   GST_DEBUG_OBJECT (jitterbuffer, "flush and reset jitterbuffer");
1267   rtp_jitter_buffer_flush (priv->jbuf, (GFunc) free_item, NULL);
1268   rtp_jitter_buffer_disable_buffering (priv->jbuf, FALSE);
1269   rtp_jitter_buffer_reset_skew (priv->jbuf);
1270   remove_all_timers (jitterbuffer);
1271   JBUF_UNLOCK (priv);
1272 }
1273
1274 static gboolean
1275 gst_rtp_jitter_buffer_src_activate_mode (GstPad * pad, GstObject * parent,
1276     GstPadMode mode, gboolean active)
1277 {
1278   gboolean result;
1279   GstRtpJitterBuffer *jitterbuffer = NULL;
1280
1281   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1282
1283   switch (mode) {
1284     case GST_PAD_MODE_PUSH:
1285       if (active) {
1286         /* allow data processing */
1287         gst_rtp_jitter_buffer_flush_stop (jitterbuffer);
1288
1289         /* start pushing out buffers */
1290         GST_DEBUG_OBJECT (jitterbuffer, "Starting task on srcpad");
1291         result = gst_pad_start_task (jitterbuffer->priv->srcpad,
1292             (GstTaskFunction) gst_rtp_jitter_buffer_loop, jitterbuffer, NULL);
1293       } else {
1294         /* make sure all data processing stops ASAP */
1295         gst_rtp_jitter_buffer_flush_start (jitterbuffer);
1296
1297         /* NOTE this will hardlock if the state change is called from the src pad
1298          * task thread because we will _join() the thread. */
1299         GST_DEBUG_OBJECT (jitterbuffer, "Stopping task on srcpad");
1300         result = gst_pad_stop_task (pad);
1301       }
1302       break;
1303     default:
1304       result = FALSE;
1305       break;
1306   }
1307   return result;
1308 }
1309
1310 static GstStateChangeReturn
1311 gst_rtp_jitter_buffer_change_state (GstElement * element,
1312     GstStateChange transition)
1313 {
1314   GstRtpJitterBuffer *jitterbuffer;
1315   GstRtpJitterBufferPrivate *priv;
1316   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
1317
1318   jitterbuffer = GST_RTP_JITTER_BUFFER (element);
1319   priv = jitterbuffer->priv;
1320
1321   switch (transition) {
1322     case GST_STATE_CHANGE_NULL_TO_READY:
1323       break;
1324     case GST_STATE_CHANGE_READY_TO_PAUSED:
1325       JBUF_LOCK (priv);
1326       /* reset negotiated values */
1327       priv->clock_rate = -1;
1328       priv->clock_base = -1;
1329       priv->peer_latency = 0;
1330       priv->last_pt = -1;
1331       /* block until we go to PLAYING */
1332       priv->blocked = TRUE;
1333       priv->timer_running = TRUE;
1334       priv->timer_thread =
1335           g_thread_new ("timer", (GThreadFunc) wait_next_timeout, jitterbuffer);
1336       JBUF_UNLOCK (priv);
1337       break;
1338     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
1339       JBUF_LOCK (priv);
1340       /* unblock to allow streaming in PLAYING */
1341       priv->blocked = FALSE;
1342       JBUF_SIGNAL_EVENT (priv);
1343       JBUF_SIGNAL_TIMER (priv);
1344       JBUF_UNLOCK (priv);
1345       break;
1346     default:
1347       break;
1348   }
1349
1350   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1351
1352   switch (transition) {
1353     case GST_STATE_CHANGE_READY_TO_PAUSED:
1354       /* we are a live element because we sync to the clock, which we can only
1355        * do in the PLAYING state */
1356       if (ret != GST_STATE_CHANGE_FAILURE)
1357         ret = GST_STATE_CHANGE_NO_PREROLL;
1358       break;
1359     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
1360       JBUF_LOCK (priv);
1361       /* block to stop streaming when PAUSED */
1362       priv->blocked = TRUE;
1363       unschedule_current_timer (jitterbuffer);
1364       JBUF_UNLOCK (priv);
1365       if (ret != GST_STATE_CHANGE_FAILURE)
1366         ret = GST_STATE_CHANGE_NO_PREROLL;
1367       break;
1368     case GST_STATE_CHANGE_PAUSED_TO_READY:
1369       JBUF_LOCK (priv);
1370       gst_buffer_replace (&priv->last_sr, NULL);
1371       priv->timer_running = FALSE;
1372       unschedule_current_timer (jitterbuffer);
1373       JBUF_SIGNAL_TIMER (priv);
1374       JBUF_SIGNAL_QUERY (priv, FALSE);
1375       JBUF_UNLOCK (priv);
1376       g_thread_join (priv->timer_thread);
1377       priv->timer_thread = NULL;
1378       break;
1379     case GST_STATE_CHANGE_READY_TO_NULL:
1380       break;
1381     default:
1382       break;
1383   }
1384
1385   return ret;
1386 }
1387
1388 static gboolean
1389 gst_rtp_jitter_buffer_src_event (GstPad * pad, GstObject * parent,
1390     GstEvent * event)
1391 {
1392   gboolean ret = TRUE;
1393   GstRtpJitterBuffer *jitterbuffer;
1394   GstRtpJitterBufferPrivate *priv;
1395
1396   jitterbuffer = GST_RTP_JITTER_BUFFER_CAST (parent);
1397   priv = jitterbuffer->priv;
1398
1399   GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
1400
1401   switch (GST_EVENT_TYPE (event)) {
1402     case GST_EVENT_LATENCY:
1403     {
1404       GstClockTime latency;
1405
1406       gst_event_parse_latency (event, &latency);
1407
1408       GST_DEBUG_OBJECT (jitterbuffer,
1409           "configuring latency of %" GST_TIME_FORMAT, GST_TIME_ARGS (latency));
1410
1411       JBUF_LOCK (priv);
1412       /* adjust the overall buffer delay to the total pipeline latency in
1413        * buffering mode because if downstream consumes too fast (because of
1414        * large latency or queues, we would start rebuffering again. */
1415       if (rtp_jitter_buffer_get_mode (priv->jbuf) ==
1416           RTP_JITTER_BUFFER_MODE_BUFFER) {
1417         rtp_jitter_buffer_set_delay (priv->jbuf, latency);
1418       }
1419       JBUF_UNLOCK (priv);
1420
1421       ret = gst_pad_push_event (priv->sinkpad, event);
1422       break;
1423     }
1424     default:
1425       ret = gst_pad_push_event (priv->sinkpad, event);
1426       break;
1427   }
1428
1429   return ret;
1430 }
1431
1432 /* handles and stores the event in the jitterbuffer, must be called with
1433  * LOCK */
1434 static gboolean
1435 queue_event (GstRtpJitterBuffer * jitterbuffer, GstEvent * event)
1436 {
1437   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1438   RTPJitterBufferItem *item;
1439   gboolean head;
1440
1441   switch (GST_EVENT_TYPE (event)) {
1442     case GST_EVENT_CAPS:
1443     {
1444       GstCaps *caps;
1445
1446       gst_event_parse_caps (event, &caps);
1447       gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps);
1448       break;
1449     }
1450     case GST_EVENT_SEGMENT:
1451       gst_event_copy_segment (event, &priv->segment);
1452
1453       /* we need time for now */
1454       if (priv->segment.format != GST_FORMAT_TIME)
1455         goto newseg_wrong_format;
1456
1457       GST_DEBUG_OBJECT (jitterbuffer,
1458           "segment:  %" GST_SEGMENT_FORMAT, &priv->segment);
1459       break;
1460     case GST_EVENT_EOS:
1461       priv->eos = TRUE;
1462       rtp_jitter_buffer_disable_buffering (priv->jbuf, TRUE);
1463       break;
1464     default:
1465       break;
1466   }
1467
1468
1469   GST_DEBUG_OBJECT (jitterbuffer, "adding event");
1470   item = alloc_item (event, ITEM_TYPE_EVENT, -1, -1, -1, 0, -1);
1471   rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL);
1472   if (head)
1473     JBUF_SIGNAL_EVENT (priv);
1474
1475   return TRUE;
1476
1477   /* ERRORS */
1478 newseg_wrong_format:
1479   {
1480     GST_DEBUG_OBJECT (jitterbuffer, "received non TIME newsegment");
1481     gst_event_unref (event);
1482     return FALSE;
1483   }
1484 }
1485
1486 static gboolean
1487 gst_rtp_jitter_buffer_sink_event (GstPad * pad, GstObject * parent,
1488     GstEvent * event)
1489 {
1490   gboolean ret = TRUE;
1491   GstRtpJitterBuffer *jitterbuffer;
1492   GstRtpJitterBufferPrivate *priv;
1493
1494   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1495   priv = jitterbuffer->priv;
1496
1497   GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
1498
1499   switch (GST_EVENT_TYPE (event)) {
1500     case GST_EVENT_FLUSH_START:
1501       ret = gst_pad_push_event (priv->srcpad, event);
1502       gst_rtp_jitter_buffer_flush_start (jitterbuffer);
1503       /* wait for the loop to go into PAUSED */
1504       gst_pad_pause_task (priv->srcpad);
1505       break;
1506     case GST_EVENT_FLUSH_STOP:
1507       ret = gst_pad_push_event (priv->srcpad, event);
1508       ret =
1509           gst_rtp_jitter_buffer_src_activate_mode (priv->srcpad, parent,
1510           GST_PAD_MODE_PUSH, TRUE);
1511       break;
1512     default:
1513       if (GST_EVENT_IS_SERIALIZED (event)) {
1514         /* serialized events go in the queue */
1515         JBUF_LOCK (priv);
1516         if (priv->srcresult != GST_FLOW_OK) {
1517           /* Errors in sticky event pushing are no problem and ignored here
1518            * as they will cause more meaningful errors during data flow.
1519            * For EOS events, that are not followed by data flow, we still
1520            * return FALSE here though.
1521            */
1522           if (!GST_EVENT_IS_STICKY (event) ||
1523               GST_EVENT_TYPE (event) == GST_EVENT_EOS)
1524             goto out_flow_error;
1525         }
1526         /* refuse more events on EOS */
1527         if (priv->eos)
1528           goto out_eos;
1529         ret = queue_event (jitterbuffer, event);
1530         JBUF_UNLOCK (priv);
1531       } else {
1532         /* non-serialized events are forwarded downstream immediately */
1533         ret = gst_pad_push_event (priv->srcpad, event);
1534       }
1535       break;
1536   }
1537   return ret;
1538
1539   /* ERRORS */
1540 out_flow_error:
1541   {
1542     GST_DEBUG_OBJECT (jitterbuffer,
1543         "refusing event, we have a downstream flow error: %s",
1544         gst_flow_get_name (priv->srcresult));
1545     JBUF_UNLOCK (priv);
1546     gst_event_unref (event);
1547     return FALSE;
1548   }
1549 out_eos:
1550   {
1551     GST_DEBUG_OBJECT (jitterbuffer, "refusing event, we are EOS");
1552     JBUF_UNLOCK (priv);
1553     gst_event_unref (event);
1554     return FALSE;
1555   }
1556 }
1557
1558 static gboolean
1559 gst_rtp_jitter_buffer_sink_rtcp_event (GstPad * pad, GstObject * parent,
1560     GstEvent * event)
1561 {
1562   gboolean ret = TRUE;
1563   GstRtpJitterBuffer *jitterbuffer;
1564
1565   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1566
1567   GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
1568
1569   switch (GST_EVENT_TYPE (event)) {
1570     case GST_EVENT_FLUSH_START:
1571       gst_event_unref (event);
1572       break;
1573     case GST_EVENT_FLUSH_STOP:
1574       gst_event_unref (event);
1575       break;
1576     default:
1577       ret = gst_pad_event_default (pad, parent, event);
1578       break;
1579   }
1580
1581   return ret;
1582 }
1583
1584 /*
1585  * Must be called with JBUF_LOCK held, will release the LOCK when emiting the
1586  * signal. The function returns GST_FLOW_ERROR when a parsing error happened and
1587  * GST_FLOW_FLUSHING when the element is shutting down. On success
1588  * GST_FLOW_OK is returned.
1589  */
1590 static GstFlowReturn
1591 gst_rtp_jitter_buffer_get_clock_rate (GstRtpJitterBuffer * jitterbuffer,
1592     guint8 pt)
1593 {
1594   GValue ret = { 0 };
1595   GValue args[2] = { {0}, {0} };
1596   GstCaps *caps;
1597   gboolean res;
1598
1599   g_value_init (&args[0], GST_TYPE_ELEMENT);
1600   g_value_set_object (&args[0], jitterbuffer);
1601   g_value_init (&args[1], G_TYPE_UINT);
1602   g_value_set_uint (&args[1], pt);
1603
1604   g_value_init (&ret, GST_TYPE_CAPS);
1605   g_value_set_boxed (&ret, NULL);
1606
1607   JBUF_UNLOCK (jitterbuffer->priv);
1608   g_signal_emitv (args, gst_rtp_jitter_buffer_signals[SIGNAL_REQUEST_PT_MAP], 0,
1609       &ret);
1610   JBUF_LOCK_CHECK (jitterbuffer->priv, out_flushing);
1611
1612   g_value_unset (&args[0]);
1613   g_value_unset (&args[1]);
1614   caps = (GstCaps *) g_value_dup_boxed (&ret);
1615   g_value_unset (&ret);
1616   if (!caps)
1617     goto no_caps;
1618
1619   res = gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps);
1620   gst_caps_unref (caps);
1621
1622   if (G_UNLIKELY (!res))
1623     goto parse_failed;
1624
1625   return GST_FLOW_OK;
1626
1627   /* ERRORS */
1628 no_caps:
1629   {
1630     GST_DEBUG_OBJECT (jitterbuffer, "could not get caps");
1631     return GST_FLOW_ERROR;
1632   }
1633 out_flushing:
1634   {
1635     GST_DEBUG_OBJECT (jitterbuffer, "we are flushing");
1636     return GST_FLOW_FLUSHING;
1637   }
1638 parse_failed:
1639   {
1640     GST_DEBUG_OBJECT (jitterbuffer, "parse failed");
1641     return GST_FLOW_ERROR;
1642   }
1643 }
1644
1645 /* call with jbuf lock held */
1646 static GstMessage *
1647 check_buffering_percent (GstRtpJitterBuffer * jitterbuffer, gint percent)
1648 {
1649   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1650   GstMessage *message = NULL;
1651
1652   if (percent == -1)
1653     return NULL;
1654
1655   /* Post a buffering message */
1656   if (priv->last_percent != percent) {
1657     priv->last_percent = percent;
1658     message =
1659         gst_message_new_buffering (GST_OBJECT_CAST (jitterbuffer), percent);
1660     gst_message_set_buffering_stats (message, GST_BUFFERING_LIVE, -1, -1, -1);
1661   }
1662
1663   return message;
1664 }
1665
1666 static GstClockTime
1667 apply_offset (GstRtpJitterBuffer * jitterbuffer, GstClockTime timestamp)
1668 {
1669   GstRtpJitterBufferPrivate *priv;
1670
1671   priv = jitterbuffer->priv;
1672
1673   if (timestamp == -1)
1674     return -1;
1675
1676   /* apply the timestamp offset, this is used for inter stream sync */
1677   timestamp += priv->ts_offset;
1678   /* add the offset, this is used when buffering */
1679   timestamp += priv->out_offset;
1680
1681   return timestamp;
1682 }
1683
1684 static TimerData *
1685 find_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type, guint16 seqnum)
1686 {
1687   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1688   TimerData *timer = NULL;
1689   gint i, len;
1690
1691   len = priv->timers->len;
1692   for (i = 0; i < len; i++) {
1693     TimerData *test = &g_array_index (priv->timers, TimerData, i);
1694     if (test->seqnum == seqnum && test->type == type) {
1695       timer = test;
1696       break;
1697     }
1698   }
1699   return timer;
1700 }
1701
1702 static void
1703 unschedule_current_timer (GstRtpJitterBuffer * jitterbuffer)
1704 {
1705   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1706
1707   if (priv->clock_id) {
1708     GST_DEBUG_OBJECT (jitterbuffer, "unschedule current timer");
1709     gst_clock_id_unschedule (priv->clock_id);
1710     priv->clock_id = NULL;
1711   }
1712 }
1713
1714 static GstClockTime
1715 get_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
1716 {
1717   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1718   GstClockTime test_timeout;
1719
1720   if ((test_timeout = timer->timeout) == -1)
1721     return -1;
1722
1723   if (timer->type != TIMER_TYPE_EXPECTED) {
1724     /* add our latency and offset to get output times. */
1725     test_timeout = apply_offset (jitterbuffer, test_timeout);
1726     test_timeout += priv->latency_ns;
1727   }
1728   return test_timeout;
1729 }
1730
1731 static void
1732 recalculate_timer (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
1733 {
1734   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1735
1736   if (priv->clock_id) {
1737     GstClockTime timeout = get_timeout (jitterbuffer, timer);
1738
1739     GST_DEBUG ("%" GST_TIME_FORMAT " <> %" GST_TIME_FORMAT,
1740         GST_TIME_ARGS (timeout), GST_TIME_ARGS (priv->timer_timeout));
1741
1742     if (timeout == -1 || timeout < priv->timer_timeout)
1743       unschedule_current_timer (jitterbuffer);
1744   }
1745 }
1746
1747 static TimerData *
1748 add_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type,
1749     guint16 seqnum, guint num, GstClockTime timeout, GstClockTime delay,
1750     GstClockTime duration)
1751 {
1752   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1753   TimerData *timer;
1754   gint len;
1755
1756   GST_DEBUG_OBJECT (jitterbuffer,
1757       "add timer %d for seqnum %d to %" GST_TIME_FORMAT ", delay %"
1758       GST_TIME_FORMAT, type, seqnum, GST_TIME_ARGS (timeout),
1759       GST_TIME_ARGS (delay));
1760
1761   len = priv->timers->len;
1762   g_array_set_size (priv->timers, len + 1);
1763   timer = &g_array_index (priv->timers, TimerData, len);
1764   timer->idx = len;
1765   timer->type = type;
1766   timer->seqnum = seqnum;
1767   timer->num = num;
1768   timer->timeout = timeout + delay;
1769   timer->duration = duration;
1770   if (type == TIMER_TYPE_EXPECTED) {
1771     timer->rtx_base = timeout;
1772     timer->rtx_delay = delay;
1773     timer->rtx_retry = 0;
1774   }
1775   timer->num_rtx_retry = 0;
1776   recalculate_timer (jitterbuffer, timer);
1777   JBUF_SIGNAL_TIMER (priv);
1778
1779   return timer;
1780 }
1781
1782 static void
1783 reschedule_timer (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
1784     guint16 seqnum, GstClockTime timeout, GstClockTime delay, gboolean reset)
1785 {
1786   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1787   gboolean seqchange, timechange;
1788   guint16 oldseq;
1789
1790   seqchange = timer->seqnum != seqnum;
1791   timechange = timer->timeout != timeout;
1792
1793   if (!seqchange && !timechange)
1794     return;
1795
1796   oldseq = timer->seqnum;
1797
1798   GST_DEBUG_OBJECT (jitterbuffer,
1799       "replace timer for seqnum %d->%d to %" GST_TIME_FORMAT,
1800       oldseq, seqnum, GST_TIME_ARGS (timeout + delay));
1801
1802   timer->timeout = timeout + delay;
1803   timer->seqnum = seqnum;
1804   if (reset) {
1805     timer->rtx_base = timeout;
1806     timer->rtx_delay = delay;
1807     timer->rtx_retry = 0;
1808   }
1809   if (seqchange)
1810     timer->num_rtx_retry = 0;
1811
1812   if (priv->clock_id) {
1813     /* we changed the seqnum and there is a timer currently waiting with this
1814      * seqnum, unschedule it */
1815     if (seqchange && priv->timer_seqnum == oldseq)
1816       unschedule_current_timer (jitterbuffer);
1817     /* we changed the time, check if it is earlier than what we are waiting
1818      * for and unschedule if so */
1819     else if (timechange)
1820       recalculate_timer (jitterbuffer, timer);
1821   }
1822 }
1823
1824 static TimerData *
1825 set_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type,
1826     guint16 seqnum, GstClockTime timeout)
1827 {
1828   TimerData *timer;
1829
1830   /* find the seqnum timer */
1831   timer = find_timer (jitterbuffer, type, seqnum);
1832   if (timer == NULL) {
1833     timer = add_timer (jitterbuffer, type, seqnum, 0, timeout, 0, -1);
1834   } else {
1835     reschedule_timer (jitterbuffer, timer, seqnum, timeout, 0, FALSE);
1836   }
1837   return timer;
1838 }
1839
1840 static void
1841 remove_timer (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
1842 {
1843   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1844   guint idx;
1845
1846   if (priv->clock_id && priv->timer_seqnum == timer->seqnum)
1847     unschedule_current_timer (jitterbuffer);
1848
1849   idx = timer->idx;
1850   GST_DEBUG_OBJECT (jitterbuffer, "removed index %d", idx);
1851   g_array_remove_index_fast (priv->timers, idx);
1852   timer->idx = idx;
1853 }
1854
1855 static void
1856 remove_all_timers (GstRtpJitterBuffer * jitterbuffer)
1857 {
1858   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1859   GST_DEBUG_OBJECT (jitterbuffer, "removed all timers");
1860   g_array_set_size (priv->timers, 0);
1861   unschedule_current_timer (jitterbuffer);
1862 }
1863
1864 /* get the extra delay to wait before sending RTX */
1865 static GstClockTime
1866 get_rtx_delay (GstRtpJitterBufferPrivate * priv)
1867 {
1868   GstClockTime delay;
1869
1870   if (priv->rtx_delay == -1) {
1871     if (priv->avg_jitter == 0)
1872       delay = DEFAULT_AUTO_RTX_DELAY;
1873     else
1874       /* jitter is in nanoseconds, 2x jitter is a good margin */
1875       delay = priv->avg_jitter * 2;
1876   } else {
1877     delay = priv->rtx_delay * GST_MSECOND;
1878   }
1879   if (priv->rtx_min_delay > 0)
1880     delay = MAX (delay, priv->rtx_min_delay * GST_MSECOND);
1881
1882   return delay;
1883 }
1884
1885 /* we just received a packet with seqnum and dts.
1886  *
1887  * First check for old seqnum that we are still expecting. If the gap with the
1888  * current seqnum is too big, unschedule the timeouts.
1889  *
1890  * If we have a valid packet spacing estimate we can set a timer for when we
1891  * should receive the next packet.
1892  * If we don't have a valid estimate, we remove any timer we might have
1893  * had for this packet.
1894  */
1895 static void
1896 update_timers (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum,
1897     GstClockTime dts, gboolean do_next_seqnum)
1898 {
1899   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1900   TimerData *timer = NULL;
1901   gint i, len;
1902
1903   /* go through all timers and unschedule the ones with a large gap, also find
1904    * the timer for the seqnum */
1905   len = priv->timers->len;
1906   for (i = 0; i < len; i++) {
1907     TimerData *test = &g_array_index (priv->timers, TimerData, i);
1908     gint gap;
1909
1910     gap = gst_rtp_buffer_compare_seqnum (test->seqnum, seqnum);
1911
1912     GST_DEBUG_OBJECT (jitterbuffer, "%d, %d, #%d<->#%d gap %d", i,
1913         test->type, test->seqnum, seqnum, gap);
1914
1915     if (gap == 0) {
1916       GST_DEBUG ("found timer for current seqnum");
1917       /* the timer for the current seqnum */
1918       timer = test;
1919       /* when no retransmission, we can stop now, we only need to find the
1920        * timer for the current seqnum */
1921       if (!priv->do_retransmission)
1922         break;
1923     } else if (gap > priv->rtx_delay_reorder) {
1924       /* max gap, we exceeded the max reorder distance and we don't expect the
1925        * missing packet to be this reordered */
1926       if (test->num_rtx_retry == 0 && test->type == TIMER_TYPE_EXPECTED)
1927         reschedule_timer (jitterbuffer, test, test->seqnum, -1, 0, FALSE);
1928     }
1929   }
1930
1931   do_next_seqnum = do_next_seqnum && priv->packet_spacing > 0
1932       && priv->do_retransmission;
1933
1934   if (timer && timer->type != TIMER_TYPE_DEADLINE) {
1935     if (timer->num_rtx_retry > 0) {
1936       GstClockTime rtx_last, delay;
1937
1938       /* we scheduled a retry for this packet and now we have it */
1939       priv->num_rtx_success++;
1940       /* all the previous retry attempts failed */
1941       priv->num_rtx_failed += timer->num_rtx_retry - 1;
1942       /* number of retries before receiving the packet */
1943       if (priv->avg_rtx_num == 0.0)
1944         priv->avg_rtx_num = timer->num_rtx_retry;
1945       else
1946         priv->avg_rtx_num = (timer->num_rtx_retry + 7 * priv->avg_rtx_num) / 8;
1947       /* calculate the delay between retransmission request and receiving this
1948        * packet, start with when we scheduled this timeout last */
1949       rtx_last = timer->rtx_last;
1950       if (dts != GST_CLOCK_TIME_NONE && dts > rtx_last) {
1951         /* we have a valid delay if this packet arrived after we scheduled the
1952          * request */
1953         delay = dts - rtx_last;
1954         if (priv->avg_rtx_rtt == 0)
1955           priv->avg_rtx_rtt = delay;
1956         else
1957           priv->avg_rtx_rtt = (delay + 7 * priv->avg_rtx_rtt) / 8;
1958       } else
1959         delay = 0;
1960
1961       GST_LOG_OBJECT (jitterbuffer,
1962           "RTX success %" G_GUINT64_FORMAT ", failed %" G_GUINT64_FORMAT
1963           ", requests %" G_GUINT64_FORMAT ", dups %" G_GUINT64_FORMAT
1964           ", avg-num %g, delay %" GST_TIME_FORMAT ", avg-rtt %" GST_TIME_FORMAT,
1965           priv->num_rtx_success, priv->num_rtx_failed, priv->num_rtx_requests,
1966           priv->num_duplicates, priv->avg_rtx_num, GST_TIME_ARGS (delay),
1967           GST_TIME_ARGS (priv->avg_rtx_rtt));
1968
1969       /* don't try to estimate the next seqnum because this is a retransmitted
1970        * packet and it probably did not arrive with the expected packet
1971        * spacing. */
1972       do_next_seqnum = FALSE;
1973     }
1974   }
1975
1976   if (do_next_seqnum) {
1977     GstClockTime expected, delay;
1978
1979     /* calculate expected arrival time of the next seqnum */
1980     expected = dts + priv->packet_spacing;
1981
1982     delay = get_rtx_delay (priv);
1983
1984     /* and update/install timer for next seqnum */
1985     if (timer)
1986       reschedule_timer (jitterbuffer, timer, priv->next_in_seqnum, expected,
1987           delay, TRUE);
1988     else
1989       add_timer (jitterbuffer, TIMER_TYPE_EXPECTED, priv->next_in_seqnum, 0,
1990           expected, delay, priv->packet_spacing);
1991   } else if (timer && timer->type != TIMER_TYPE_DEADLINE) {
1992     /* if we had a timer, remove it, we don't know when to expect the next
1993      * packet. */
1994     remove_timer (jitterbuffer, timer);
1995   }
1996 }
1997
1998 static void
1999 calculate_packet_spacing (GstRtpJitterBuffer * jitterbuffer, guint32 rtptime,
2000     GstClockTime dts)
2001 {
2002   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2003
2004   /* we need consecutive seqnums with a different
2005    * rtptime to estimate the packet spacing. */
2006   if (priv->ips_rtptime != rtptime) {
2007     /* rtptime changed, check dts diff */
2008     if (priv->ips_dts != -1 && dts != -1 && dts > priv->ips_dts) {
2009       priv->packet_spacing = dts - priv->ips_dts;
2010       GST_DEBUG_OBJECT (jitterbuffer,
2011           "new packet spacing %" GST_TIME_FORMAT,
2012           GST_TIME_ARGS (priv->packet_spacing));
2013     }
2014     priv->ips_rtptime = rtptime;
2015     priv->ips_dts = dts;
2016   }
2017 }
2018
2019 static void
2020 calculate_expected (GstRtpJitterBuffer * jitterbuffer, guint32 expected,
2021     guint16 seqnum, GstClockTime dts, gint gap)
2022 {
2023   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2024   GstClockTime total_duration, duration, expected_dts;
2025   TimerType type;
2026   guint lost_packets = 0;
2027
2028   GST_DEBUG_OBJECT (jitterbuffer,
2029       "dts %" GST_TIME_FORMAT ", last %" GST_TIME_FORMAT,
2030       GST_TIME_ARGS (dts), GST_TIME_ARGS (priv->last_in_dts));
2031
2032   /* the total duration spanned by the missing packets */
2033   if (dts >= priv->last_in_dts)
2034     total_duration = dts - priv->last_in_dts;
2035   else
2036     total_duration = 0;
2037
2038   /* interpolate between the current time and the last time based on
2039    * number of packets we are missing, this is the estimated duration
2040    * for the missing packet based on equidistant packet spacing. */
2041   duration = total_duration / (gap + 1);
2042
2043   GST_DEBUG_OBJECT (jitterbuffer, "duration %" GST_TIME_FORMAT,
2044       GST_TIME_ARGS (duration));
2045
2046   if (total_duration > priv->latency_ns) {
2047     GstClockTime gap_time;
2048
2049     gap_time = total_duration - priv->latency_ns;
2050
2051     if (duration > 0) {
2052       lost_packets = gap_time / duration;
2053       gap_time = lost_packets * duration;
2054     } else {
2055       lost_packets = gap;
2056     }
2057
2058     /* too many lost packets, some of the missing packets are already
2059      * too late and we can generate lost packet events for them. */
2060     GST_DEBUG_OBJECT (jitterbuffer, "too many lost packets %" GST_TIME_FORMAT
2061         " > %" GST_TIME_FORMAT ", consider %u lost",
2062         GST_TIME_ARGS (total_duration), GST_TIME_ARGS (priv->latency_ns),
2063         lost_packets);
2064
2065     /* this timer will fire immediately and the lost event will be pushed from
2066      * the timer thread */
2067     add_timer (jitterbuffer, TIMER_TYPE_LOST, expected, lost_packets,
2068         priv->last_in_dts + duration, 0, gap_time);
2069
2070     expected += lost_packets;
2071     priv->last_in_dts += gap_time;
2072   }
2073
2074   expected_dts = priv->last_in_dts + (lost_packets + 1) * duration;
2075
2076   if (priv->do_retransmission) {
2077     TimerData *timer;
2078
2079     type = TIMER_TYPE_EXPECTED;
2080     /* if we had a timer for the first missing packet, update it. */
2081     if ((timer = find_timer (jitterbuffer, type, expected))) {
2082       GstClockTime timeout = timer->timeout;
2083
2084       timer->duration = duration;
2085       if (timeout > (expected_dts + timer->rtx_retry)) {
2086         GstClockTime delay = timeout - expected_dts - timer->rtx_retry;
2087         reschedule_timer (jitterbuffer, timer, timer->seqnum, expected_dts,
2088             delay, TRUE);
2089       }
2090       expected++;
2091       expected_dts += duration;
2092     }
2093   } else {
2094     type = TIMER_TYPE_LOST;
2095   }
2096
2097   while (gst_rtp_buffer_compare_seqnum (expected, seqnum) > 0) {
2098     add_timer (jitterbuffer, type, expected, 0, expected_dts, 0, duration);
2099     expected_dts += duration;
2100     expected++;
2101   }
2102 }
2103
2104 static void
2105 calculate_jitter (GstRtpJitterBuffer * jitterbuffer, GstClockTime dts,
2106     guint rtptime)
2107 {
2108   gint32 rtpdiff;
2109   GstClockTimeDiff dtsdiff, rtpdiffns, diff;
2110   GstRtpJitterBufferPrivate *priv;
2111
2112   priv = jitterbuffer->priv;
2113
2114   if (G_UNLIKELY (dts == GST_CLOCK_TIME_NONE) || priv->clock_rate <= 0)
2115     goto no_time;
2116
2117   if (priv->last_dts != -1)
2118     dtsdiff = dts - priv->last_dts;
2119   else
2120     dtsdiff = 0;
2121
2122   if (priv->last_rtptime != -1)
2123     rtpdiff = rtptime - (guint32) priv->last_rtptime;
2124   else
2125     rtpdiff = 0;
2126
2127   priv->last_dts = dts;
2128   priv->last_rtptime = rtptime;
2129
2130   if (rtpdiff > 0)
2131     rtpdiffns =
2132         gst_util_uint64_scale_int (rtpdiff, GST_SECOND, priv->clock_rate);
2133   else
2134     rtpdiffns =
2135         -gst_util_uint64_scale_int (-rtpdiff, GST_SECOND, priv->clock_rate);
2136
2137   diff = ABS (dtsdiff - rtpdiffns);
2138
2139   /* jitter is stored in nanoseconds */
2140   priv->avg_jitter = (diff + (15 * priv->avg_jitter)) >> 4;
2141
2142   GST_LOG_OBJECT (jitterbuffer,
2143       "dtsdiff %" GST_TIME_FORMAT " rtptime %" GST_TIME_FORMAT
2144       ", clock-rate %d, diff %" GST_TIME_FORMAT ", jitter: %" GST_TIME_FORMAT,
2145       GST_TIME_ARGS (dtsdiff), GST_TIME_ARGS (rtpdiffns), priv->clock_rate,
2146       GST_TIME_ARGS (diff), GST_TIME_ARGS (priv->avg_jitter));
2147
2148   return;
2149
2150   /* ERRORS */
2151 no_time:
2152   {
2153     GST_DEBUG_OBJECT (jitterbuffer,
2154         "no dts or no clock-rate, can't calculate jitter");
2155     return;
2156   }
2157 }
2158
2159 static GstFlowReturn
2160 gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent,
2161     GstBuffer * buffer)
2162 {
2163   GstRtpJitterBuffer *jitterbuffer;
2164   GstRtpJitterBufferPrivate *priv;
2165   guint16 seqnum;
2166   guint32 expected, rtptime;
2167   GstFlowReturn ret = GST_FLOW_OK;
2168   GstClockTime dts, pts;
2169   guint64 latency_ts;
2170   gboolean head;
2171   gint percent = -1;
2172   guint8 pt;
2173   GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
2174   gboolean do_next_seqnum = FALSE;
2175   RTPJitterBufferItem *item;
2176   GstMessage *msg = NULL;
2177
2178   jitterbuffer = GST_RTP_JITTER_BUFFER_CAST (parent);
2179
2180   priv = jitterbuffer->priv;
2181
2182   if (G_UNLIKELY (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp)))
2183     goto invalid_buffer;
2184
2185   pt = gst_rtp_buffer_get_payload_type (&rtp);
2186   seqnum = gst_rtp_buffer_get_seq (&rtp);
2187   rtptime = gst_rtp_buffer_get_timestamp (&rtp);
2188   gst_rtp_buffer_unmap (&rtp);
2189
2190   /* make sure we have PTS and DTS set */
2191   pts = GST_BUFFER_PTS (buffer);
2192   dts = GST_BUFFER_DTS (buffer);
2193   if (dts == -1)
2194     dts = pts;
2195   else if (pts == -1)
2196     pts = dts;
2197
2198   /* take the DTS of the buffer. This is the time when the packet was
2199    * received and is used to calculate jitter and clock skew. We will adjust
2200    * this DTS with the smoothed value after processing it in the
2201    * jitterbuffer and assign it as the PTS. */
2202   /* bring to running time */
2203   dts = gst_segment_to_running_time (&priv->segment, GST_FORMAT_TIME, dts);
2204
2205   GST_DEBUG_OBJECT (jitterbuffer,
2206       "Received packet #%d at time %" GST_TIME_FORMAT ", discont %d", seqnum,
2207       GST_TIME_ARGS (dts), GST_BUFFER_IS_DISCONT (buffer));
2208
2209   JBUF_LOCK_CHECK (priv, out_flushing);
2210
2211   if (G_UNLIKELY (priv->last_pt != pt)) {
2212     GstCaps *caps;
2213
2214     GST_DEBUG_OBJECT (jitterbuffer, "pt changed from %u to %u", priv->last_pt,
2215         pt);
2216
2217     priv->last_pt = pt;
2218     /* reset clock-rate so that we get a new one */
2219     priv->clock_rate = -1;
2220
2221     /* Try to get the clock-rate from the caps first if we can. If there are no
2222      * caps we must fire the signal to get the clock-rate. */
2223     if ((caps = gst_pad_get_current_caps (pad))) {
2224       gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps);
2225       gst_caps_unref (caps);
2226     }
2227   }
2228
2229   if (G_UNLIKELY (priv->clock_rate == -1)) {
2230     /* no clock rate given on the caps, try to get one with the signal */
2231     if (gst_rtp_jitter_buffer_get_clock_rate (jitterbuffer,
2232             pt) == GST_FLOW_FLUSHING)
2233       goto out_flushing;
2234
2235     if (G_UNLIKELY (priv->clock_rate == -1))
2236       goto no_clock_rate;
2237   }
2238
2239   /* don't accept more data on EOS */
2240   if (G_UNLIKELY (priv->eos))
2241     goto have_eos;
2242
2243   calculate_jitter (jitterbuffer, dts, rtptime);
2244
2245   if (priv->seqnum_base != -1) {
2246     gint gap;
2247
2248     gap = gst_rtp_buffer_compare_seqnum (priv->seqnum_base, seqnum);
2249
2250     if (gap < 0) {
2251       GST_DEBUG_OBJECT (jitterbuffer,
2252           "packet seqnum #%d before seqnum-base #%d", seqnum,
2253           priv->seqnum_base);
2254       gst_buffer_unref (buffer);
2255       ret = GST_FLOW_OK;
2256       goto finished;
2257     } else if (gap > 16384) {
2258       /* From now on don't compare against the seqnum base anymore as
2259        * at some point in the future we will wrap around and also that
2260        * much reordering is very unlikely */
2261       priv->seqnum_base = -1;
2262     }
2263   }
2264
2265   expected = priv->next_in_seqnum;
2266
2267   /* now check against our expected seqnum */
2268   if (G_LIKELY (expected != -1)) {
2269     gint gap;
2270
2271     /* now calculate gap */
2272     gap = gst_rtp_buffer_compare_seqnum (expected, seqnum);
2273
2274     GST_DEBUG_OBJECT (jitterbuffer, "expected #%d, got #%d, gap of %d",
2275         expected, seqnum, gap);
2276
2277     if (G_LIKELY (gap == 0)) {
2278       /* packet is expected */
2279       calculate_packet_spacing (jitterbuffer, rtptime, dts);
2280       do_next_seqnum = TRUE;
2281     } else {
2282       gboolean reset = FALSE;
2283
2284       if (!GST_CLOCK_TIME_IS_VALID (dts)) {
2285         /* We would run into calculations with GST_CLOCK_TIME_NONE below
2286          * and can't compensate for anything without DTS on RTP packets
2287          */
2288         goto gap_but_no_dts;
2289       } else if (gap < 0) {
2290         /* we received an old packet */
2291         if (G_UNLIKELY (gap < -RTP_MAX_MISORDER)) {
2292           /* too old packet, reset */
2293           GST_DEBUG_OBJECT (jitterbuffer, "reset: buffer too old %d < %d", gap,
2294               -RTP_MAX_MISORDER);
2295           reset = TRUE;
2296         } else {
2297           GST_DEBUG_OBJECT (jitterbuffer, "old packet received");
2298         }
2299       } else {
2300         /* new packet, we are missing some packets */
2301         if (G_UNLIKELY (gap > RTP_MAX_DROPOUT)) {
2302           /* packet too far in future, reset */
2303           GST_DEBUG_OBJECT (jitterbuffer, "reset: buffer too new %d > %d", gap,
2304               RTP_MAX_DROPOUT);
2305           reset = TRUE;
2306         } else {
2307           GST_DEBUG_OBJECT (jitterbuffer, "%d missing packets", gap);
2308           /* fill in the gap with EXPECTED timers */
2309           calculate_expected (jitterbuffer, expected, seqnum, dts, gap);
2310
2311           do_next_seqnum = TRUE;
2312         }
2313       }
2314       if (G_UNLIKELY (reset)) {
2315         GList *events = NULL, *l;
2316
2317         GST_DEBUG_OBJECT (jitterbuffer, "flush and reset jitterbuffer");
2318         rtp_jitter_buffer_flush (priv->jbuf,
2319             (GFunc) free_item_and_retain_events, &events);
2320         rtp_jitter_buffer_reset_skew (priv->jbuf);
2321         remove_all_timers (jitterbuffer);
2322         priv->discont = TRUE;
2323         priv->last_popped_seqnum = -1;
2324         priv->next_seqnum = seqnum;
2325         do_next_seqnum = TRUE;
2326
2327         /* Insert all sticky events again in order, otherwise we would
2328          * potentially loose STREAM_START, CAPS or SEGMENT events
2329          */
2330         events = g_list_reverse (events);
2331         for (l = events; l; l = l->next) {
2332           RTPJitterBufferItem *item;
2333
2334           item = alloc_item (l->data, ITEM_TYPE_EVENT, -1, -1, -1, 0, -1);
2335           rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL);
2336         }
2337         g_list_free (events);
2338
2339         JBUF_SIGNAL_EVENT (priv);
2340       }
2341       /* reset spacing estimation when gap */
2342       priv->ips_rtptime = -1;
2343       priv->ips_dts = GST_CLOCK_TIME_NONE;
2344     }
2345   } else {
2346     GST_DEBUG_OBJECT (jitterbuffer, "First buffer #%d", seqnum);
2347     /* we don't know what the next_in_seqnum should be, wait for the last
2348      * possible moment to push this buffer, maybe we get an earlier seqnum
2349      * while we wait */
2350     set_timer (jitterbuffer, TIMER_TYPE_DEADLINE, seqnum, dts);
2351     do_next_seqnum = TRUE;
2352     /* take rtptime and dts to calculate packet spacing */
2353     priv->ips_rtptime = rtptime;
2354     priv->ips_dts = dts;
2355   }
2356   if (do_next_seqnum) {
2357     priv->last_in_seqnum = seqnum;
2358     priv->last_in_dts = dts;
2359     priv->next_in_seqnum = (seqnum + 1) & 0xffff;
2360   }
2361
2362   /* let's check if this buffer is too late, we can only accept packets with
2363    * bigger seqnum than the one we last pushed. */
2364   if (G_LIKELY (priv->last_popped_seqnum != -1)) {
2365     gint gap;
2366
2367     gap = gst_rtp_buffer_compare_seqnum (priv->last_popped_seqnum, seqnum);
2368
2369     /* priv->last_popped_seqnum >= seqnum, we're too late. */
2370     if (G_UNLIKELY (gap <= 0))
2371       goto too_late;
2372   }
2373
2374   /* let's drop oldest packet if the queue is already full and drop-on-latency
2375    * is set. We can only do this when there actually is a latency. When no
2376    * latency is set, we just pump it in the queue and let the other end push it
2377    * out as fast as possible. */
2378   if (priv->latency_ms && priv->drop_on_latency) {
2379     latency_ts =
2380         gst_util_uint64_scale_int (priv->latency_ms, priv->clock_rate, 1000);
2381
2382     if (G_UNLIKELY (rtp_jitter_buffer_get_ts_diff (priv->jbuf) >= latency_ts)) {
2383       RTPJitterBufferItem *old_item;
2384
2385       old_item = rtp_jitter_buffer_peek (priv->jbuf);
2386
2387       if (IS_DROPABLE (old_item)) {
2388         old_item = rtp_jitter_buffer_pop (priv->jbuf, &percent);
2389         GST_DEBUG_OBJECT (jitterbuffer, "Queue full, dropping old packet %p",
2390             old_item);
2391         priv->next_seqnum = (old_item->seqnum + 1) & 0xffff;
2392         free_item (old_item);
2393       }
2394       /* we might have removed some head buffers, signal the pushing thread to
2395        * see if it can push now */
2396       JBUF_SIGNAL_EVENT (priv);
2397     }
2398   }
2399
2400   item = alloc_item (buffer, ITEM_TYPE_BUFFER, dts, pts, seqnum, 1, rtptime);
2401
2402   /* now insert the packet into the queue in sorted order. This function returns
2403    * FALSE if a packet with the same seqnum was already in the queue, meaning we
2404    * have a duplicate. */
2405   if (G_UNLIKELY (!rtp_jitter_buffer_insert (priv->jbuf, item,
2406               &head, &percent)))
2407     goto duplicate;
2408
2409   /* update timers */
2410   update_timers (jitterbuffer, seqnum, dts, do_next_seqnum);
2411
2412   /* we had an unhandled SR, handle it now */
2413   if (priv->last_sr)
2414     do_handle_sync (jitterbuffer);
2415
2416   if (G_UNLIKELY (head)) {
2417     /* signal addition of new buffer when the _loop is waiting. */
2418     if (G_LIKELY (priv->active))
2419       JBUF_SIGNAL_EVENT (priv);
2420
2421     /* let's unschedule and unblock any waiting buffers. We only want to do this
2422      * when the head buffer changed */
2423     if (G_UNLIKELY (priv->clock_id)) {
2424       GST_DEBUG_OBJECT (jitterbuffer, "Unscheduling waiting new buffer");
2425       unschedule_current_timer (jitterbuffer);
2426     }
2427   }
2428
2429   GST_DEBUG_OBJECT (jitterbuffer,
2430       "Pushed packet #%d, now %d packets, head: %d, " "percent %d", seqnum,
2431       rtp_jitter_buffer_num_packets (priv->jbuf), head, percent);
2432
2433   msg = check_buffering_percent (jitterbuffer, percent);
2434
2435 finished:
2436   JBUF_UNLOCK (priv);
2437
2438   if (msg)
2439     gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer), msg);
2440
2441   return ret;
2442
2443   /* ERRORS */
2444 invalid_buffer:
2445   {
2446     /* this is not fatal but should be filtered earlier */
2447     GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
2448         ("Received invalid RTP payload, dropping"));
2449     gst_buffer_unref (buffer);
2450     return GST_FLOW_OK;
2451   }
2452 no_clock_rate:
2453   {
2454     GST_WARNING_OBJECT (jitterbuffer,
2455         "No clock-rate in caps!, dropping buffer");
2456     gst_buffer_unref (buffer);
2457     goto finished;
2458   }
2459 out_flushing:
2460   {
2461     ret = priv->srcresult;
2462     GST_DEBUG_OBJECT (jitterbuffer, "flushing %s", gst_flow_get_name (ret));
2463     gst_buffer_unref (buffer);
2464     goto finished;
2465   }
2466 have_eos:
2467   {
2468     ret = GST_FLOW_EOS;
2469     GST_WARNING_OBJECT (jitterbuffer, "we are EOS, refusing buffer");
2470     gst_buffer_unref (buffer);
2471     goto finished;
2472   }
2473 too_late:
2474   {
2475     GST_WARNING_OBJECT (jitterbuffer, "Packet #%d too late as #%d was already"
2476         " popped, dropping", seqnum, priv->last_popped_seqnum);
2477     priv->num_late++;
2478     gst_buffer_unref (buffer);
2479     goto finished;
2480   }
2481 duplicate:
2482   {
2483     GST_WARNING_OBJECT (jitterbuffer, "Duplicate packet #%d detected, dropping",
2484         seqnum);
2485     priv->num_duplicates++;
2486     free_item (item);
2487     goto finished;
2488   }
2489 gap_but_no_dts:
2490   {
2491     /* this is fatal as we can't compensate for gaps without DTS */
2492     GST_ELEMENT_ERROR (jitterbuffer, STREAM, DECODE, (NULL),
2493         ("Received packet without DTS after a gap"));
2494     gst_buffer_unref (buffer);
2495     ret = GST_FLOW_ERROR;
2496     goto finished;
2497   }
2498 }
2499
2500 static GstClockTime
2501 compute_elapsed (GstRtpJitterBuffer * jitterbuffer, RTPJitterBufferItem * item)
2502 {
2503   guint64 ext_time, elapsed;
2504   guint32 rtp_time;
2505   GstRtpJitterBufferPrivate *priv;
2506
2507   priv = jitterbuffer->priv;
2508   rtp_time = item->rtptime;
2509
2510   GST_LOG_OBJECT (jitterbuffer, "rtp %" G_GUINT32_FORMAT ", ext %"
2511       G_GUINT64_FORMAT, rtp_time, priv->ext_timestamp);
2512
2513   if (rtp_time < priv->ext_timestamp) {
2514     ext_time = priv->ext_timestamp;
2515   } else {
2516     ext_time = gst_rtp_buffer_ext_timestamp (&priv->ext_timestamp, rtp_time);
2517   }
2518
2519   if (ext_time > priv->clock_base)
2520     elapsed = ext_time - priv->clock_base;
2521   else
2522     elapsed = 0;
2523
2524   elapsed = gst_util_uint64_scale_int (elapsed, GST_SECOND, priv->clock_rate);
2525   return elapsed;
2526 }
2527
2528 static void
2529 update_estimated_eos (GstRtpJitterBuffer * jitterbuffer,
2530     RTPJitterBufferItem * item)
2531 {
2532   guint64 total, elapsed, left, estimated;
2533   GstClockTime out_time;
2534   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2535
2536   if (priv->npt_stop == -1 || priv->ext_timestamp == -1
2537       || priv->clock_base == -1 || priv->clock_rate <= 0)
2538     return;
2539
2540   /* compute the elapsed time */
2541   elapsed = compute_elapsed (jitterbuffer, item);
2542
2543   /* do nothing if elapsed time doesn't increment */
2544   if (priv->last_elapsed && elapsed <= priv->last_elapsed)
2545     return;
2546
2547   priv->last_elapsed = elapsed;
2548
2549   /* this is the total time we need to play */
2550   total = priv->npt_stop - priv->npt_start;
2551   GST_LOG_OBJECT (jitterbuffer, "total %" GST_TIME_FORMAT,
2552       GST_TIME_ARGS (total));
2553
2554   /* this is how much time there is left */
2555   if (total > elapsed)
2556     left = total - elapsed;
2557   else
2558     left = 0;
2559
2560   /* if we have less time left that the size of the buffer, we will not
2561    * be able to keep it filled, disabled buffering then */
2562   if (left < rtp_jitter_buffer_get_delay (priv->jbuf)) {
2563     GST_DEBUG_OBJECT (jitterbuffer, "left %" GST_TIME_FORMAT
2564         ", disable buffering close to EOS", GST_TIME_ARGS (left));
2565     rtp_jitter_buffer_disable_buffering (priv->jbuf, TRUE);
2566   }
2567
2568   /* this is the current time as running-time */
2569   out_time = item->dts;
2570
2571   if (elapsed > 0)
2572     estimated = gst_util_uint64_scale (out_time, total, elapsed);
2573   else {
2574     /* if there is almost nothing left,
2575      * we may never advance enough to end up in the above case */
2576     if (total < GST_SECOND)
2577       estimated = GST_SECOND;
2578     else
2579       estimated = -1;
2580   }
2581   GST_LOG_OBJECT (jitterbuffer, "elapsed %" GST_TIME_FORMAT ", estimated %"
2582       GST_TIME_FORMAT, GST_TIME_ARGS (elapsed), GST_TIME_ARGS (estimated));
2583
2584   if (estimated != -1 && priv->estimated_eos != estimated) {
2585     set_timer (jitterbuffer, TIMER_TYPE_EOS, -1, estimated);
2586     priv->estimated_eos = estimated;
2587   }
2588 }
2589
2590 /* take a buffer from the queue and push it */
2591 static GstFlowReturn
2592 pop_and_push_next (GstRtpJitterBuffer * jitterbuffer, guint seqnum)
2593 {
2594   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2595   GstFlowReturn result = GST_FLOW_OK;
2596   RTPJitterBufferItem *item;
2597   GstBuffer *outbuf = NULL;
2598   GstEvent *outevent = NULL;
2599   GstQuery *outquery = NULL;
2600   GstClockTime dts, pts;
2601   gint percent = -1;
2602   gboolean do_push = TRUE;
2603   guint type;
2604   GstMessage *msg;
2605
2606   /* when we get here we are ready to pop and push the buffer */
2607   item = rtp_jitter_buffer_pop (priv->jbuf, &percent);
2608   type = item->type;
2609
2610   switch (type) {
2611     case ITEM_TYPE_BUFFER:
2612
2613       /* we need to make writable to change the flags and timestamps */
2614       outbuf = gst_buffer_make_writable (item->data);
2615
2616       if (G_UNLIKELY (priv->discont)) {
2617         /* set DISCONT flag when we missed a packet. We pushed the buffer writable
2618          * into the jitterbuffer so we can modify now. */
2619         GST_DEBUG_OBJECT (jitterbuffer, "mark output buffer discont");
2620         GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_DISCONT);
2621         priv->discont = FALSE;
2622       }
2623       if (G_UNLIKELY (priv->ts_discont)) {
2624         GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_RESYNC);
2625         priv->ts_discont = FALSE;
2626       }
2627
2628       dts =
2629           gst_segment_to_position (&priv->segment, GST_FORMAT_TIME, item->dts);
2630       pts =
2631           gst_segment_to_position (&priv->segment, GST_FORMAT_TIME, item->pts);
2632
2633       /* apply timestamp with offset to buffer now */
2634       GST_BUFFER_DTS (outbuf) = apply_offset (jitterbuffer, dts);
2635       GST_BUFFER_PTS (outbuf) = apply_offset (jitterbuffer, pts);
2636
2637       /* update the elapsed time when we need to check against the npt stop time. */
2638       update_estimated_eos (jitterbuffer, item);
2639
2640       priv->last_out_time = GST_BUFFER_PTS (outbuf);
2641       break;
2642     case ITEM_TYPE_LOST:
2643       priv->discont = TRUE;
2644       if (!priv->do_lost)
2645         do_push = FALSE;
2646       /* FALLTHROUGH */
2647     case ITEM_TYPE_EVENT:
2648       outevent = item->data;
2649       break;
2650     case ITEM_TYPE_QUERY:
2651       outquery = item->data;
2652       break;
2653   }
2654
2655   /* now we are ready to push the buffer. Save the seqnum and release the lock
2656    * so the other end can push stuff in the queue again. */
2657   if (seqnum != -1) {
2658     priv->last_popped_seqnum = seqnum;
2659     priv->next_seqnum = (seqnum + item->count) & 0xffff;
2660   }
2661   msg = check_buffering_percent (jitterbuffer, percent);
2662   JBUF_UNLOCK (priv);
2663
2664   item->data = NULL;
2665   free_item (item);
2666
2667   if (msg)
2668     gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer), msg);
2669
2670   switch (type) {
2671     case ITEM_TYPE_BUFFER:
2672       /* push buffer */
2673       GST_DEBUG_OBJECT (jitterbuffer,
2674           "Pushing buffer %d, dts %" GST_TIME_FORMAT ", pts %" GST_TIME_FORMAT,
2675           seqnum, GST_TIME_ARGS (GST_BUFFER_DTS (outbuf)),
2676           GST_TIME_ARGS (GST_BUFFER_PTS (outbuf)));
2677       result = gst_pad_push (priv->srcpad, outbuf);
2678
2679       JBUF_LOCK_CHECK (priv, out_flushing);
2680       break;
2681     case ITEM_TYPE_LOST:
2682     case ITEM_TYPE_EVENT:
2683       GST_DEBUG_OBJECT (jitterbuffer, "%sPushing event %" GST_PTR_FORMAT
2684           ", seqnum %d", do_push ? "" : "NOT ", outevent, seqnum);
2685
2686       if (do_push)
2687         gst_pad_push_event (priv->srcpad, outevent);
2688       else
2689         gst_event_unref (outevent);
2690
2691       result = GST_FLOW_OK;
2692
2693       JBUF_LOCK_CHECK (priv, out_flushing);
2694       break;
2695     case ITEM_TYPE_QUERY:
2696     {
2697       gboolean res;
2698
2699       res = gst_pad_peer_query (priv->srcpad, outquery);
2700
2701       JBUF_LOCK_CHECK (priv, out_flushing);
2702       result = GST_FLOW_OK;
2703       GST_LOG_OBJECT (jitterbuffer, "did query %p, return %d", outquery, res);
2704       JBUF_SIGNAL_QUERY (priv, res);
2705       break;
2706     }
2707   }
2708   return result;
2709
2710   /* ERRORS */
2711 out_flushing:
2712   {
2713     return priv->srcresult;
2714   }
2715 }
2716
2717 #define GST_FLOW_WAIT GST_FLOW_CUSTOM_SUCCESS
2718
2719 /* Peek a buffer and compare the seqnum to the expected seqnum.
2720  * If all is fine, the buffer is pushed.
2721  * If something is wrong, we wait for some event
2722  */
2723 static GstFlowReturn
2724 handle_next_buffer (GstRtpJitterBuffer * jitterbuffer)
2725 {
2726   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2727   GstFlowReturn result = GST_FLOW_OK;
2728   RTPJitterBufferItem *item;
2729   guint seqnum;
2730   guint32 next_seqnum;
2731   gint gap;
2732
2733   /* only push buffers when PLAYING and active and not buffering */
2734   if (priv->blocked || !priv->active ||
2735       rtp_jitter_buffer_is_buffering (priv->jbuf))
2736     return GST_FLOW_WAIT;
2737
2738 again:
2739   /* peek a buffer, we're just looking at the sequence number.
2740    * If all is fine, we'll pop and push it. If the sequence number is wrong we
2741    * wait for a timeout or something to change.
2742    * The peeked buffer is valid for as long as we hold the jitterbuffer lock. */
2743   item = rtp_jitter_buffer_peek (priv->jbuf);
2744   if (item == NULL)
2745     goto wait;
2746
2747   /* get the seqnum and the next expected seqnum */
2748   seqnum = item->seqnum;
2749   if (seqnum == -1)
2750     goto do_push;
2751
2752   next_seqnum = priv->next_seqnum;
2753
2754   /* get the gap between this and the previous packet. If we don't know the
2755    * previous packet seqnum assume no gap. */
2756   if (G_UNLIKELY (next_seqnum == -1)) {
2757     GST_DEBUG_OBJECT (jitterbuffer, "First buffer #%d", seqnum);
2758     /* we don't know what the next_seqnum should be, the chain function should
2759      * have scheduled a DEADLINE timer that will increment next_seqnum when it
2760      * fires, so wait for that */
2761     result = GST_FLOW_WAIT;
2762   } else {
2763     /* else calculate GAP */
2764     gap = gst_rtp_buffer_compare_seqnum (next_seqnum, seqnum);
2765
2766     if (G_LIKELY (gap == 0)) {
2767     do_push:
2768       /* no missing packet, pop and push */
2769       result = pop_and_push_next (jitterbuffer, seqnum);
2770     } else if (G_UNLIKELY (gap < 0)) {
2771       RTPJitterBufferItem *item;
2772       /* if we have a packet that we already pushed or considered dropped, pop it
2773        * off and get the next packet */
2774       GST_DEBUG_OBJECT (jitterbuffer, "Old packet #%d, next #%d dropping",
2775           seqnum, next_seqnum);
2776       item = rtp_jitter_buffer_pop (priv->jbuf, NULL);
2777       free_item (item);
2778       goto again;
2779     } else {
2780       /* the chain function has scheduled timers to request retransmission or
2781        * when to consider the packet lost, wait for that */
2782       GST_DEBUG_OBJECT (jitterbuffer,
2783           "Sequence number GAP detected: expected %d instead of %d (%d missing)",
2784           next_seqnum, seqnum, gap);
2785       result = GST_FLOW_WAIT;
2786     }
2787   }
2788   return result;
2789
2790 wait:
2791   {
2792     GST_DEBUG_OBJECT (jitterbuffer, "no buffer, going to wait");
2793     if (priv->eos)
2794       result = GST_FLOW_EOS;
2795     else
2796       result = GST_FLOW_WAIT;
2797     return result;
2798   }
2799 }
2800
2801 static GstClockTime
2802 get_rtx_retry_timeout (GstRtpJitterBufferPrivate * priv)
2803 {
2804   GstClockTime rtx_retry_timeout;
2805   GstClockTime rtx_min_retry_timeout;
2806
2807   if (priv->rtx_retry_timeout == -1) {
2808     if (priv->avg_rtx_rtt == 0)
2809       rtx_retry_timeout = DEFAULT_AUTO_RTX_TIMEOUT;
2810     else
2811       /* we want to ask for a retransmission after we waited for a
2812        * complete RTT and the additional jitter */
2813       rtx_retry_timeout = priv->avg_rtx_rtt + priv->avg_jitter * 2;
2814   } else {
2815     rtx_retry_timeout = priv->rtx_retry_timeout * GST_MSECOND;
2816   }
2817   /* make sure we don't retry too often. On very low latency networks,
2818    * the RTT and jitter can be very low. */
2819   if (priv->rtx_min_retry_timeout == -1) {
2820     rtx_min_retry_timeout = priv->packet_spacing;
2821   } else {
2822     rtx_min_retry_timeout = priv->rtx_min_retry_timeout * GST_MSECOND;
2823   }
2824   rtx_retry_timeout = MAX (rtx_retry_timeout, rtx_min_retry_timeout);
2825
2826   return rtx_retry_timeout;
2827 }
2828
2829 static GstClockTime
2830 get_rtx_retry_period (GstRtpJitterBufferPrivate * priv,
2831     GstClockTime rtx_retry_timeout)
2832 {
2833   GstClockTime rtx_retry_period;
2834
2835   if (priv->rtx_retry_period == -1) {
2836     /* we retry up to the configured jitterbuffer size but leaving some
2837      * room for the retransmission to arrive in time */
2838     if (rtx_retry_timeout > priv->latency_ns) {
2839       rtx_retry_period = 0;
2840     } else {
2841       rtx_retry_period = priv->latency_ns - rtx_retry_timeout;
2842     }
2843   } else {
2844     rtx_retry_period = priv->rtx_retry_period * GST_MSECOND;
2845   }
2846   return rtx_retry_period;
2847 }
2848
2849 /* the timeout for when we expected a packet expired */
2850 static gboolean
2851 do_expected_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
2852     GstClockTime now)
2853 {
2854   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2855   GstEvent *event;
2856   guint delay, delay_ms, avg_rtx_rtt_ms;
2857   guint rtx_retry_timeout_ms, rtx_retry_period_ms;
2858   GstClockTime rtx_retry_period;
2859   GstClockTime rtx_retry_timeout;
2860   GstClock *clock;
2861
2862   GST_DEBUG_OBJECT (jitterbuffer, "expected %d didn't arrive, now %"
2863       GST_TIME_FORMAT, timer->seqnum, GST_TIME_ARGS (now));
2864
2865   rtx_retry_timeout = get_rtx_retry_timeout (priv);
2866   rtx_retry_period = get_rtx_retry_period (priv, rtx_retry_timeout);
2867
2868   GST_DEBUG_OBJECT (jitterbuffer, "timeout %" GST_TIME_FORMAT ", period %"
2869       GST_TIME_FORMAT, GST_TIME_ARGS (rtx_retry_timeout),
2870       GST_TIME_ARGS (rtx_retry_period));
2871
2872   delay = timer->rtx_delay + timer->rtx_retry;
2873
2874   delay_ms = GST_TIME_AS_MSECONDS (delay);
2875   rtx_retry_timeout_ms = GST_TIME_AS_MSECONDS (rtx_retry_timeout);
2876   rtx_retry_period_ms = GST_TIME_AS_MSECONDS (rtx_retry_period);
2877   avg_rtx_rtt_ms = GST_TIME_AS_MSECONDS (priv->avg_rtx_rtt);
2878
2879   event = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM,
2880       gst_structure_new ("GstRTPRetransmissionRequest",
2881           "seqnum", G_TYPE_UINT, (guint) timer->seqnum,
2882           "running-time", G_TYPE_UINT64, timer->rtx_base,
2883           "delay", G_TYPE_UINT, delay_ms,
2884           "retry", G_TYPE_UINT, timer->num_rtx_retry,
2885           "frequency", G_TYPE_UINT, rtx_retry_timeout_ms,
2886           "period", G_TYPE_UINT, rtx_retry_period_ms,
2887           "deadline", G_TYPE_UINT, priv->latency_ms,
2888           "packet-spacing", G_TYPE_UINT64, priv->packet_spacing,
2889           "avg-rtt", G_TYPE_UINT, avg_rtx_rtt_ms, NULL));
2890
2891   priv->num_rtx_requests++;
2892   timer->num_rtx_retry++;
2893
2894   GST_OBJECT_LOCK (jitterbuffer);
2895   if ((clock = GST_ELEMENT_CLOCK (jitterbuffer))) {
2896     timer->rtx_last = gst_clock_get_time (clock);
2897     timer->rtx_last -= GST_ELEMENT_CAST (jitterbuffer)->base_time;
2898   } else {
2899     timer->rtx_last = now;
2900   }
2901   GST_OBJECT_UNLOCK (jitterbuffer);
2902
2903   /* calculate the timeout for the next retransmission attempt */
2904   timer->rtx_retry += rtx_retry_timeout;
2905   GST_DEBUG_OBJECT (jitterbuffer, "base %" GST_TIME_FORMAT ", delay %"
2906       GST_TIME_FORMAT ", retry %" GST_TIME_FORMAT ", num_retry %u",
2907       GST_TIME_ARGS (timer->rtx_base), GST_TIME_ARGS (timer->rtx_delay),
2908       GST_TIME_ARGS (timer->rtx_retry), timer->num_rtx_retry);
2909   if ((priv->rtx_max_retries != -1
2910           && timer->num_rtx_retry >= priv->rtx_max_retries)
2911       || (timer->rtx_retry + timer->rtx_delay > rtx_retry_period)) {
2912     GST_DEBUG_OBJECT (jitterbuffer, "reschedule as LOST timer");
2913     /* too many retransmission request, we now convert the timer
2914      * to a lost timer, leave the num_rtx_retry as it is for stats */
2915     timer->type = TIMER_TYPE_LOST;
2916     timer->rtx_delay = 0;
2917     timer->rtx_retry = 0;
2918   }
2919   reschedule_timer (jitterbuffer, timer, timer->seqnum,
2920       timer->rtx_base + timer->rtx_retry, timer->rtx_delay, FALSE);
2921
2922   JBUF_UNLOCK (priv);
2923   gst_pad_push_event (priv->sinkpad, event);
2924   JBUF_LOCK (priv);
2925
2926   return FALSE;
2927 }
2928
2929 /* a packet is lost */
2930 static gboolean
2931 do_lost_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
2932     GstClockTime now)
2933 {
2934   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2935   GstClockTime duration, timestamp;
2936   guint seqnum, lost_packets, num_rtx_retry, next_in_seqnum;
2937   gboolean late, head;
2938   GstEvent *event;
2939   RTPJitterBufferItem *item;
2940
2941   seqnum = timer->seqnum;
2942   timestamp = apply_offset (jitterbuffer, timer->timeout);
2943   duration = timer->duration;
2944   if (duration == GST_CLOCK_TIME_NONE && priv->packet_spacing > 0)
2945     duration = priv->packet_spacing;
2946   lost_packets = MAX (timer->num, 1);
2947   late = timer->num > 0;
2948   num_rtx_retry = timer->num_rtx_retry;
2949
2950   /* we had a gap and thus we lost some packets. Create an event for this.  */
2951   if (lost_packets > 1)
2952     GST_DEBUG_OBJECT (jitterbuffer, "Packets #%d -> #%d lost", seqnum,
2953         seqnum + lost_packets - 1);
2954   else
2955     GST_DEBUG_OBJECT (jitterbuffer, "Packet #%d lost", seqnum);
2956
2957   priv->num_late += lost_packets;
2958   priv->num_rtx_failed += num_rtx_retry;
2959
2960   next_in_seqnum = (seqnum + lost_packets) & 0xffff;
2961
2962   /* we now only accept seqnum bigger than this */
2963   if (gst_rtp_buffer_compare_seqnum (priv->next_in_seqnum, next_in_seqnum) > 0)
2964     priv->next_in_seqnum = next_in_seqnum;
2965
2966   /* create paket lost event */
2967   event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM,
2968       gst_structure_new ("GstRTPPacketLost",
2969           "seqnum", G_TYPE_UINT, (guint) seqnum,
2970           "timestamp", G_TYPE_UINT64, timestamp,
2971           "duration", G_TYPE_UINT64, duration,
2972           "late", G_TYPE_BOOLEAN, late,
2973           "retry", G_TYPE_UINT, num_rtx_retry, NULL));
2974
2975   item = alloc_item (event, ITEM_TYPE_LOST, -1, -1, seqnum, lost_packets, -1);
2976   rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL);
2977
2978   /* remove timer now */
2979   remove_timer (jitterbuffer, timer);
2980   if (head)
2981     JBUF_SIGNAL_EVENT (priv);
2982
2983   return TRUE;
2984 }
2985
2986 static gboolean
2987 do_eos_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
2988     GstClockTime now)
2989 {
2990   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2991
2992   GST_INFO_OBJECT (jitterbuffer, "got the NPT timeout");
2993   remove_timer (jitterbuffer, timer);
2994   if (!priv->eos) {
2995     /* there was no EOS in the buffer, put one in there now */
2996     queue_event (jitterbuffer, gst_event_new_eos ());
2997   }
2998   JBUF_SIGNAL_EVENT (priv);
2999
3000   return TRUE;
3001 }
3002
3003 static gboolean
3004 do_deadline_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
3005     GstClockTime now)
3006 {
3007   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
3008
3009   GST_INFO_OBJECT (jitterbuffer, "got deadline timeout");
3010
3011   /* timer seqnum might have been obsoleted by caps seqnum-base,
3012    * only mess with current ongoing seqnum if still unknown */
3013   if (priv->next_seqnum == -1)
3014     priv->next_seqnum = timer->seqnum;
3015   remove_timer (jitterbuffer, timer);
3016   JBUF_SIGNAL_EVENT (priv);
3017
3018   return TRUE;
3019 }
3020
3021 static gboolean
3022 do_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
3023     GstClockTime now)
3024 {
3025   gboolean removed = FALSE;
3026
3027   switch (timer->type) {
3028     case TIMER_TYPE_EXPECTED:
3029       removed = do_expected_timeout (jitterbuffer, timer, now);
3030       break;
3031     case TIMER_TYPE_LOST:
3032       removed = do_lost_timeout (jitterbuffer, timer, now);
3033       break;
3034     case TIMER_TYPE_DEADLINE:
3035       removed = do_deadline_timeout (jitterbuffer, timer, now);
3036       break;
3037     case TIMER_TYPE_EOS:
3038       removed = do_eos_timeout (jitterbuffer, timer, now);
3039       break;
3040   }
3041   return removed;
3042 }
3043
3044 /* called when we need to wait for the next timeout.
3045  *
3046  * We loop over the array of recorded timeouts and wait for the earliest one.
3047  * When it timed out, do the logic associated with the timer.
3048  *
3049  * If there are no timers, we wait on a gcond until something new happens.
3050  */
3051 static void
3052 wait_next_timeout (GstRtpJitterBuffer * jitterbuffer)
3053 {
3054   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
3055   GstClockTime now = 0;
3056
3057   JBUF_LOCK (priv);
3058   while (priv->timer_running) {
3059     TimerData *timer = NULL;
3060     GstClockTime timer_timeout = -1;
3061     gint i, len;
3062
3063     GST_DEBUG_OBJECT (jitterbuffer, "now %" GST_TIME_FORMAT,
3064         GST_TIME_ARGS (now));
3065
3066     len = priv->timers->len;
3067     for (i = 0; i < len; i++) {
3068       TimerData *test = &g_array_index (priv->timers, TimerData, i);
3069       GstClockTime test_timeout = get_timeout (jitterbuffer, test);
3070       gboolean save_best = FALSE;
3071
3072       GST_DEBUG_OBJECT (jitterbuffer, "%d, %d, %d, %" GST_TIME_FORMAT,
3073           i, test->type, test->seqnum, GST_TIME_ARGS (test_timeout));
3074
3075       /* find the smallest timeout */
3076       if (timer == NULL) {
3077         save_best = TRUE;
3078       } else if (timer_timeout == -1) {
3079         /* we already have an immediate timeout, the new timer must be an
3080          * immediate timer with smaller seqnum to become the best */
3081         if (test_timeout == -1
3082             && (gst_rtp_buffer_compare_seqnum (test->seqnum,
3083                     timer->seqnum) > 0))
3084           save_best = TRUE;
3085       } else if (test_timeout == -1) {
3086         /* first immediate timer */
3087         save_best = TRUE;
3088       } else if (test_timeout < timer_timeout) {
3089         /* earlier timer */
3090         save_best = TRUE;
3091       } else if (test_timeout == timer_timeout
3092           && (gst_rtp_buffer_compare_seqnum (test->seqnum,
3093                   timer->seqnum) > 0)) {
3094         /* same timer, smaller seqnum */
3095         save_best = TRUE;
3096       }
3097       if (save_best) {
3098         GST_DEBUG_OBJECT (jitterbuffer, "new best %d", i);
3099         timer = test;
3100         timer_timeout = test_timeout;
3101       }
3102     }
3103     if (timer && !priv->blocked) {
3104       GstClock *clock;
3105       GstClockTime sync_time;
3106       GstClockID id;
3107       GstClockReturn ret;
3108       GstClockTimeDiff clock_jitter;
3109
3110       if (timer_timeout == -1 || timer_timeout <= now) {
3111         do_timeout (jitterbuffer, timer, now);
3112         /* check here, do_timeout could have released the lock */
3113         if (!priv->timer_running)
3114           break;
3115         continue;
3116       }
3117
3118       GST_OBJECT_LOCK (jitterbuffer);
3119       clock = GST_ELEMENT_CLOCK (jitterbuffer);
3120       if (!clock) {
3121         GST_OBJECT_UNLOCK (jitterbuffer);
3122         /* let's just push if there is no clock */
3123         GST_DEBUG_OBJECT (jitterbuffer, "No clock, timeout right away");
3124         now = timer_timeout;
3125         continue;
3126       }
3127
3128       /* prepare for sync against clock */
3129       sync_time = timer_timeout + GST_ELEMENT_CAST (jitterbuffer)->base_time;
3130       /* add latency of peer to get input time */
3131       sync_time += priv->peer_latency;
3132
3133       GST_DEBUG_OBJECT (jitterbuffer, "sync to timestamp %" GST_TIME_FORMAT
3134           " with sync time %" GST_TIME_FORMAT,
3135           GST_TIME_ARGS (timer_timeout), GST_TIME_ARGS (sync_time));
3136
3137       /* create an entry for the clock */
3138       id = priv->clock_id = gst_clock_new_single_shot_id (clock, sync_time);
3139       priv->timer_timeout = timer_timeout;
3140       priv->timer_seqnum = timer->seqnum;
3141       GST_OBJECT_UNLOCK (jitterbuffer);
3142
3143       /* release the lock so that the other end can push stuff or unlock */
3144       JBUF_UNLOCK (priv);
3145
3146       ret = gst_clock_id_wait (id, &clock_jitter);
3147
3148       JBUF_LOCK (priv);
3149       if (!priv->timer_running) {
3150         gst_clock_id_unref (id);
3151         priv->clock_id = NULL;
3152         break;
3153       }
3154
3155       if (ret != GST_CLOCK_UNSCHEDULED) {
3156         now = timer_timeout + MAX (clock_jitter, 0);
3157         GST_DEBUG_OBJECT (jitterbuffer, "sync done, %d, #%d, %" G_GINT64_FORMAT,
3158             ret, priv->timer_seqnum, clock_jitter);
3159       } else {
3160         GST_DEBUG_OBJECT (jitterbuffer, "sync unscheduled");
3161       }
3162       /* and free the entry */
3163       gst_clock_id_unref (id);
3164       priv->clock_id = NULL;
3165     } else {
3166       /* no timers, wait for activity */
3167       JBUF_WAIT_TIMER (priv);
3168     }
3169   }
3170   JBUF_UNLOCK (priv);
3171
3172   GST_DEBUG_OBJECT (jitterbuffer, "we are stopping");
3173   return;
3174 }
3175
3176 /*
3177  * This funcion implements the main pushing loop on the source pad.
3178  *
3179  * It first tries to push as many buffers as possible. If there is a seqnum
3180  * mismatch, we wait for the next timeouts.
3181  */
3182 static void
3183 gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer)
3184 {
3185   GstRtpJitterBufferPrivate *priv;
3186   GstFlowReturn result = GST_FLOW_OK;
3187
3188   priv = jitterbuffer->priv;
3189
3190   JBUF_LOCK_CHECK (priv, flushing);
3191   do {
3192     result = handle_next_buffer (jitterbuffer);
3193     if (G_LIKELY (result == GST_FLOW_WAIT)) {
3194       /* now wait for the next event */
3195       JBUF_WAIT_EVENT (priv, flushing);
3196       result = GST_FLOW_OK;
3197     }
3198   }
3199   while (result == GST_FLOW_OK);
3200   /* store result for upstream */
3201   priv->srcresult = result;
3202   /* if we get here we need to pause */
3203   goto pause;
3204
3205   /* ERRORS */
3206 flushing:
3207   {
3208     result = priv->srcresult;
3209     goto pause;
3210   }
3211 pause:
3212   {
3213     GstEvent *event;
3214
3215     JBUF_SIGNAL_QUERY (priv, FALSE);
3216     JBUF_UNLOCK (priv);
3217
3218     GST_DEBUG_OBJECT (jitterbuffer, "pausing task, reason %s",
3219         gst_flow_get_name (result));
3220     gst_pad_pause_task (priv->srcpad);
3221     if (result == GST_FLOW_EOS) {
3222       event = gst_event_new_eos ();
3223       gst_pad_push_event (priv->srcpad, event);
3224     }
3225     return;
3226   }
3227 }
3228
3229 /* collect the info from the lastest RTCP packet and the jitterbuffer sync, do
3230  * some sanity checks and then emit the handle-sync signal with the parameters.
3231  * This function must be called with the LOCK */
3232 static void
3233 do_handle_sync (GstRtpJitterBuffer * jitterbuffer)
3234 {
3235   GstRtpJitterBufferPrivate *priv;
3236   guint64 base_rtptime, base_time;
3237   guint32 clock_rate;
3238   guint64 last_rtptime;
3239   guint64 clock_base;
3240   guint64 ext_rtptime, diff;
3241   gboolean valid = TRUE, keep = FALSE;
3242
3243   priv = jitterbuffer->priv;
3244
3245   /* get the last values from the jitterbuffer */
3246   rtp_jitter_buffer_get_sync (priv->jbuf, &base_rtptime, &base_time,
3247       &clock_rate, &last_rtptime);
3248
3249   clock_base = priv->clock_base;
3250   ext_rtptime = priv->ext_rtptime;
3251
3252   GST_DEBUG_OBJECT (jitterbuffer, "ext SR %" G_GUINT64_FORMAT ", base %"
3253       G_GUINT64_FORMAT ", clock-rate %" G_GUINT32_FORMAT
3254       ", clock-base %" G_GUINT64_FORMAT ", last-rtptime %" G_GUINT64_FORMAT,
3255       ext_rtptime, base_rtptime, clock_rate, clock_base, last_rtptime);
3256
3257   if (base_rtptime == -1 || clock_rate == -1 || base_time == -1) {
3258     /* we keep this SR packet for later. When we get a valid RTP packet the
3259      * above values will be set and we can try to use the SR packet */
3260     GST_DEBUG_OBJECT (jitterbuffer, "keeping for later, no RTP values");
3261     keep = TRUE;
3262   } else {
3263     /* we can't accept anything that happened before we did the last resync */
3264     if (base_rtptime > ext_rtptime) {
3265       GST_DEBUG_OBJECT (jitterbuffer, "dropping, older than base time");
3266       valid = FALSE;
3267     } else {
3268       /* the SR RTP timestamp must be something close to what we last observed
3269        * in the jitterbuffer */
3270       if (ext_rtptime > last_rtptime) {
3271         /* check how far ahead it is to our RTP timestamps */
3272         diff = ext_rtptime - last_rtptime;
3273         /* if bigger than 1 second, we drop it */
3274         if (diff > clock_rate) {
3275           GST_DEBUG_OBJECT (jitterbuffer, "too far ahead");
3276           /* should drop this, but some RTSP servers end up with bogus
3277            * way too ahead RTCP packet when repeated PAUSE/PLAY,
3278            * so still trigger rptbin sync but invalidate RTCP data
3279            * (sync might use other methods) */
3280           ext_rtptime = -1;
3281         }
3282         GST_DEBUG_OBJECT (jitterbuffer, "ext last %" G_GUINT64_FORMAT ", diff %"
3283             G_GUINT64_FORMAT, last_rtptime, diff);
3284       }
3285     }
3286   }
3287
3288   if (keep) {
3289     GST_DEBUG_OBJECT (jitterbuffer, "keeping RTCP packet for later");
3290   } else if (valid) {
3291     GstStructure *s;
3292
3293     s = gst_structure_new ("application/x-rtp-sync",
3294         "base-rtptime", G_TYPE_UINT64, base_rtptime,
3295         "base-time", G_TYPE_UINT64, base_time,
3296         "clock-rate", G_TYPE_UINT, clock_rate,
3297         "clock-base", G_TYPE_UINT64, clock_base,
3298         "sr-ext-rtptime", G_TYPE_UINT64, ext_rtptime,
3299         "sr-buffer", GST_TYPE_BUFFER, priv->last_sr, NULL);
3300
3301     GST_DEBUG_OBJECT (jitterbuffer, "signaling sync");
3302     gst_buffer_replace (&priv->last_sr, NULL);
3303     JBUF_UNLOCK (priv);
3304     g_signal_emit (jitterbuffer,
3305         gst_rtp_jitter_buffer_signals[SIGNAL_HANDLE_SYNC], 0, s);
3306     JBUF_LOCK (priv);
3307     gst_structure_free (s);
3308   } else {
3309     GST_DEBUG_OBJECT (jitterbuffer, "dropping RTCP packet");
3310     gst_buffer_replace (&priv->last_sr, NULL);
3311   }
3312 }
3313
3314 static GstFlowReturn
3315 gst_rtp_jitter_buffer_chain_rtcp (GstPad * pad, GstObject * parent,
3316     GstBuffer * buffer)
3317 {
3318   GstRtpJitterBuffer *jitterbuffer;
3319   GstRtpJitterBufferPrivate *priv;
3320   GstFlowReturn ret = GST_FLOW_OK;
3321   guint32 ssrc;
3322   GstRTCPPacket packet;
3323   guint64 ext_rtptime;
3324   guint32 rtptime;
3325   GstRTCPBuffer rtcp = { NULL, };
3326
3327   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
3328
3329   if (G_UNLIKELY (!gst_rtcp_buffer_validate (buffer)))
3330     goto invalid_buffer;
3331
3332   priv = jitterbuffer->priv;
3333
3334   gst_rtcp_buffer_map (buffer, GST_MAP_READ, &rtcp);
3335
3336   if (!gst_rtcp_buffer_get_first_packet (&rtcp, &packet))
3337     goto empty_buffer;
3338
3339   /* first packet must be SR or RR or else the validate would have failed */
3340   switch (gst_rtcp_packet_get_type (&packet)) {
3341     case GST_RTCP_TYPE_SR:
3342       gst_rtcp_packet_sr_get_sender_info (&packet, &ssrc, NULL, &rtptime,
3343           NULL, NULL);
3344       break;
3345     default:
3346       goto ignore_buffer;
3347   }
3348   gst_rtcp_buffer_unmap (&rtcp);
3349
3350   GST_DEBUG_OBJECT (jitterbuffer, "received RTCP of SSRC %08x", ssrc);
3351
3352   JBUF_LOCK (priv);
3353   /* convert the RTP timestamp to our extended timestamp, using the same offset
3354    * we used in the jitterbuffer */
3355   ext_rtptime = priv->jbuf->ext_rtptime;
3356   ext_rtptime = gst_rtp_buffer_ext_timestamp (&ext_rtptime, rtptime);
3357
3358   priv->ext_rtptime = ext_rtptime;
3359   gst_buffer_replace (&priv->last_sr, buffer);
3360
3361   do_handle_sync (jitterbuffer);
3362   JBUF_UNLOCK (priv);
3363
3364 done:
3365   gst_buffer_unref (buffer);
3366
3367   return ret;
3368
3369 invalid_buffer:
3370   {
3371     /* this is not fatal but should be filtered earlier */
3372     GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
3373         ("Received invalid RTCP payload, dropping"));
3374     ret = GST_FLOW_OK;
3375     goto done;
3376   }
3377 empty_buffer:
3378   {
3379     /* this is not fatal but should be filtered earlier */
3380     GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
3381         ("Received empty RTCP payload, dropping"));
3382     gst_rtcp_buffer_unmap (&rtcp);
3383     ret = GST_FLOW_OK;
3384     goto done;
3385   }
3386 ignore_buffer:
3387   {
3388     GST_DEBUG_OBJECT (jitterbuffer, "ignoring RTCP packet");
3389     gst_rtcp_buffer_unmap (&rtcp);
3390     ret = GST_FLOW_OK;
3391     goto done;
3392   }
3393 }
3394
3395 static gboolean
3396 gst_rtp_jitter_buffer_sink_query (GstPad * pad, GstObject * parent,
3397     GstQuery * query)
3398 {
3399   gboolean res = FALSE;
3400   GstRtpJitterBuffer *jitterbuffer;
3401   GstRtpJitterBufferPrivate *priv;
3402
3403   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
3404   priv = jitterbuffer->priv;
3405
3406   switch (GST_QUERY_TYPE (query)) {
3407     case GST_QUERY_CAPS:
3408     {
3409       GstCaps *filter, *caps;
3410
3411       gst_query_parse_caps (query, &filter);
3412       caps = gst_rtp_jitter_buffer_getcaps (pad, filter);
3413       gst_query_set_caps_result (query, caps);
3414       gst_caps_unref (caps);
3415       res = TRUE;
3416       break;
3417     }
3418     default:
3419       if (GST_QUERY_IS_SERIALIZED (query)) {
3420         RTPJitterBufferItem *item;
3421         gboolean head;
3422
3423         JBUF_LOCK_CHECK (priv, out_flushing);
3424         if (rtp_jitter_buffer_get_mode (priv->jbuf) !=
3425             RTP_JITTER_BUFFER_MODE_BUFFER) {
3426           GST_DEBUG_OBJECT (jitterbuffer, "adding serialized query");
3427           item = alloc_item (query, ITEM_TYPE_QUERY, -1, -1, -1, 0, -1);
3428           rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL);
3429           if (head)
3430             JBUF_SIGNAL_EVENT (priv);
3431           JBUF_WAIT_QUERY (priv, out_flushing);
3432           res = priv->last_query;
3433         } else {
3434           GST_DEBUG_OBJECT (jitterbuffer, "refusing query, we are buffering");
3435           res = FALSE;
3436         }
3437         JBUF_UNLOCK (priv);
3438       } else {
3439         res = gst_pad_query_default (pad, parent, query);
3440       }
3441       break;
3442   }
3443   return res;
3444   /* ERRORS */
3445 out_flushing:
3446   {
3447     GST_DEBUG_OBJECT (jitterbuffer, "we are flushing");
3448     JBUF_UNLOCK (priv);
3449     return FALSE;
3450   }
3451
3452 }
3453
3454 static gboolean
3455 gst_rtp_jitter_buffer_src_query (GstPad * pad, GstObject * parent,
3456     GstQuery * query)
3457 {
3458   GstRtpJitterBuffer *jitterbuffer;
3459   GstRtpJitterBufferPrivate *priv;
3460   gboolean res = FALSE;
3461
3462   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
3463   priv = jitterbuffer->priv;
3464
3465   switch (GST_QUERY_TYPE (query)) {
3466     case GST_QUERY_LATENCY:
3467     {
3468       /* We need to send the query upstream and add the returned latency to our
3469        * own */
3470       GstClockTime min_latency, max_latency;
3471       gboolean us_live;
3472       GstClockTime our_latency;
3473
3474       if ((res = gst_pad_peer_query (priv->sinkpad, query))) {
3475         gst_query_parse_latency (query, &us_live, &min_latency, &max_latency);
3476
3477         GST_DEBUG_OBJECT (jitterbuffer, "Peer latency: min %"
3478             GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
3479             GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
3480
3481         /* store this so that we can safely sync on the peer buffers. */
3482         JBUF_LOCK (priv);
3483         priv->peer_latency = min_latency;
3484         our_latency = priv->latency_ns;
3485         JBUF_UNLOCK (priv);
3486
3487         GST_DEBUG_OBJECT (jitterbuffer, "Our latency: %" GST_TIME_FORMAT,
3488             GST_TIME_ARGS (our_latency));
3489
3490         /* we add some latency but can buffer an infinite amount of time */
3491         min_latency += our_latency;
3492         max_latency = -1;
3493
3494         GST_DEBUG_OBJECT (jitterbuffer, "Calculated total latency : min %"
3495             GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
3496             GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
3497
3498         gst_query_set_latency (query, TRUE, min_latency, max_latency);
3499       }
3500       break;
3501     }
3502     case GST_QUERY_POSITION:
3503     {
3504       GstClockTime start, last_out;
3505       GstFormat fmt;
3506
3507       gst_query_parse_position (query, &fmt, NULL);
3508       if (fmt != GST_FORMAT_TIME) {
3509         res = gst_pad_query_default (pad, parent, query);
3510         break;
3511       }
3512
3513       JBUF_LOCK (priv);
3514       start = priv->npt_start;
3515       last_out = priv->last_out_time;
3516       JBUF_UNLOCK (priv);
3517
3518       GST_DEBUG_OBJECT (jitterbuffer, "npt start %" GST_TIME_FORMAT
3519           ", last out %" GST_TIME_FORMAT, GST_TIME_ARGS (start),
3520           GST_TIME_ARGS (last_out));
3521
3522       if (GST_CLOCK_TIME_IS_VALID (start) && GST_CLOCK_TIME_IS_VALID (last_out)) {
3523         /* bring 0-based outgoing time to stream time */
3524         gst_query_set_position (query, GST_FORMAT_TIME, start + last_out);
3525         res = TRUE;
3526       } else {
3527         res = gst_pad_query_default (pad, parent, query);
3528       }
3529       break;
3530     }
3531     case GST_QUERY_CAPS:
3532     {
3533       GstCaps *filter, *caps;
3534
3535       gst_query_parse_caps (query, &filter);
3536       caps = gst_rtp_jitter_buffer_getcaps (pad, filter);
3537       gst_query_set_caps_result (query, caps);
3538       gst_caps_unref (caps);
3539       res = TRUE;
3540       break;
3541     }
3542     default:
3543       res = gst_pad_query_default (pad, parent, query);
3544       break;
3545   }
3546
3547   return res;
3548 }
3549
3550 static void
3551 gst_rtp_jitter_buffer_set_property (GObject * object,
3552     guint prop_id, const GValue * value, GParamSpec * pspec)
3553 {
3554   GstRtpJitterBuffer *jitterbuffer;
3555   GstRtpJitterBufferPrivate *priv;
3556
3557   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
3558   priv = jitterbuffer->priv;
3559
3560   switch (prop_id) {
3561     case PROP_LATENCY:
3562     {
3563       guint new_latency, old_latency;
3564
3565       new_latency = g_value_get_uint (value);
3566
3567       JBUF_LOCK (priv);
3568       old_latency = priv->latency_ms;
3569       priv->latency_ms = new_latency;
3570       priv->latency_ns = priv->latency_ms * GST_MSECOND;
3571       rtp_jitter_buffer_set_delay (priv->jbuf, priv->latency_ns);
3572       JBUF_UNLOCK (priv);
3573
3574       /* post message if latency changed, this will inform the parent pipeline
3575        * that a latency reconfiguration is possible/needed. */
3576       if (new_latency != old_latency) {
3577         GST_DEBUG_OBJECT (jitterbuffer, "latency changed to: %" GST_TIME_FORMAT,
3578             GST_TIME_ARGS (new_latency * GST_MSECOND));
3579
3580         gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer),
3581             gst_message_new_latency (GST_OBJECT_CAST (jitterbuffer)));
3582       }
3583       break;
3584     }
3585     case PROP_DROP_ON_LATENCY:
3586       JBUF_LOCK (priv);
3587       priv->drop_on_latency = g_value_get_boolean (value);
3588       JBUF_UNLOCK (priv);
3589       break;
3590     case PROP_TS_OFFSET:
3591       JBUF_LOCK (priv);
3592       priv->ts_offset = g_value_get_int64 (value);
3593       priv->ts_discont = TRUE;
3594       JBUF_UNLOCK (priv);
3595       break;
3596     case PROP_DO_LOST:
3597       JBUF_LOCK (priv);
3598       priv->do_lost = g_value_get_boolean (value);
3599       JBUF_UNLOCK (priv);
3600       break;
3601     case PROP_MODE:
3602       JBUF_LOCK (priv);
3603       rtp_jitter_buffer_set_mode (priv->jbuf, g_value_get_enum (value));
3604       JBUF_UNLOCK (priv);
3605       break;
3606     case PROP_DO_RETRANSMISSION:
3607       JBUF_LOCK (priv);
3608       priv->do_retransmission = g_value_get_boolean (value);
3609       JBUF_UNLOCK (priv);
3610       break;
3611     case PROP_RTX_DELAY:
3612       JBUF_LOCK (priv);
3613       priv->rtx_delay = g_value_get_int (value);
3614       JBUF_UNLOCK (priv);
3615       break;
3616     case PROP_RTX_MIN_DELAY:
3617       JBUF_LOCK (priv);
3618       priv->rtx_min_delay = g_value_get_uint (value);
3619       JBUF_UNLOCK (priv);
3620       break;
3621     case PROP_RTX_DELAY_REORDER:
3622       JBUF_LOCK (priv);
3623       priv->rtx_delay_reorder = g_value_get_int (value);
3624       JBUF_UNLOCK (priv);
3625       break;
3626     case PROP_RTX_RETRY_TIMEOUT:
3627       JBUF_LOCK (priv);
3628       priv->rtx_retry_timeout = g_value_get_int (value);
3629       JBUF_UNLOCK (priv);
3630       break;
3631     case PROP_RTX_MIN_RETRY_TIMEOUT:
3632       JBUF_LOCK (priv);
3633       priv->rtx_min_retry_timeout = g_value_get_int (value);
3634       JBUF_UNLOCK (priv);
3635       break;
3636     case PROP_RTX_RETRY_PERIOD:
3637       JBUF_LOCK (priv);
3638       priv->rtx_retry_period = g_value_get_int (value);
3639       JBUF_UNLOCK (priv);
3640       break;
3641     case PROP_RTX_MAX_RETRIES:
3642       JBUF_LOCK (priv);
3643       priv->rtx_max_retries = g_value_get_int (value);
3644       JBUF_UNLOCK (priv);
3645       break;
3646     default:
3647       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
3648       break;
3649   }
3650 }
3651
3652 static void
3653 gst_rtp_jitter_buffer_get_property (GObject * object,
3654     guint prop_id, GValue * value, GParamSpec * pspec)
3655 {
3656   GstRtpJitterBuffer *jitterbuffer;
3657   GstRtpJitterBufferPrivate *priv;
3658
3659   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
3660   priv = jitterbuffer->priv;
3661
3662   switch (prop_id) {
3663     case PROP_LATENCY:
3664       JBUF_LOCK (priv);
3665       g_value_set_uint (value, priv->latency_ms);
3666       JBUF_UNLOCK (priv);
3667       break;
3668     case PROP_DROP_ON_LATENCY:
3669       JBUF_LOCK (priv);
3670       g_value_set_boolean (value, priv->drop_on_latency);
3671       JBUF_UNLOCK (priv);
3672       break;
3673     case PROP_TS_OFFSET:
3674       JBUF_LOCK (priv);
3675       g_value_set_int64 (value, priv->ts_offset);
3676       JBUF_UNLOCK (priv);
3677       break;
3678     case PROP_DO_LOST:
3679       JBUF_LOCK (priv);
3680       g_value_set_boolean (value, priv->do_lost);
3681       JBUF_UNLOCK (priv);
3682       break;
3683     case PROP_MODE:
3684       JBUF_LOCK (priv);
3685       g_value_set_enum (value, rtp_jitter_buffer_get_mode (priv->jbuf));
3686       JBUF_UNLOCK (priv);
3687       break;
3688     case PROP_PERCENT:
3689     {
3690       gint percent;
3691
3692       JBUF_LOCK (priv);
3693       if (priv->srcresult != GST_FLOW_OK)
3694         percent = 100;
3695       else
3696         percent = rtp_jitter_buffer_get_percent (priv->jbuf);
3697
3698       g_value_set_int (value, percent);
3699       JBUF_UNLOCK (priv);
3700       break;
3701     }
3702     case PROP_DO_RETRANSMISSION:
3703       JBUF_LOCK (priv);
3704       g_value_set_boolean (value, priv->do_retransmission);
3705       JBUF_UNLOCK (priv);
3706       break;
3707     case PROP_RTX_DELAY:
3708       JBUF_LOCK (priv);
3709       g_value_set_int (value, priv->rtx_delay);
3710       JBUF_UNLOCK (priv);
3711       break;
3712     case PROP_RTX_MIN_DELAY:
3713       JBUF_LOCK (priv);
3714       g_value_set_uint (value, priv->rtx_min_delay);
3715       JBUF_UNLOCK (priv);
3716       break;
3717     case PROP_RTX_DELAY_REORDER:
3718       JBUF_LOCK (priv);
3719       g_value_set_int (value, priv->rtx_delay_reorder);
3720       JBUF_UNLOCK (priv);
3721       break;
3722     case PROP_RTX_RETRY_TIMEOUT:
3723       JBUF_LOCK (priv);
3724       g_value_set_int (value, priv->rtx_retry_timeout);
3725       JBUF_UNLOCK (priv);
3726       break;
3727     case PROP_RTX_MIN_RETRY_TIMEOUT:
3728       JBUF_LOCK (priv);
3729       g_value_set_int (value, priv->rtx_min_retry_timeout);
3730       JBUF_UNLOCK (priv);
3731       break;
3732     case PROP_RTX_RETRY_PERIOD:
3733       JBUF_LOCK (priv);
3734       g_value_set_int (value, priv->rtx_retry_period);
3735       JBUF_UNLOCK (priv);
3736       break;
3737     case PROP_RTX_MAX_RETRIES:
3738       JBUF_LOCK (priv);
3739       g_value_set_int (value, priv->rtx_max_retries);
3740       JBUF_UNLOCK (priv);
3741       break;
3742     case PROP_STATS:
3743       g_value_take_boxed (value,
3744           gst_rtp_jitter_buffer_create_stats (jitterbuffer));
3745       break;
3746     default:
3747       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
3748       break;
3749   }
3750 }
3751
3752 static GstStructure *
3753 gst_rtp_jitter_buffer_create_stats (GstRtpJitterBuffer * jbuf)
3754 {
3755   GstStructure *s;
3756
3757   JBUF_LOCK (jbuf->priv);
3758   s = gst_structure_new ("application/x-rtp-jitterbuffer-stats",
3759       "rtx-count", G_TYPE_UINT64, jbuf->priv->num_rtx_requests,
3760       "rtx-success-count", G_TYPE_UINT64, jbuf->priv->num_rtx_success,
3761       "rtx-per-packet", G_TYPE_DOUBLE, jbuf->priv->avg_rtx_num,
3762       "rtx-rtt", G_TYPE_UINT64, jbuf->priv->avg_rtx_rtt, NULL);
3763   JBUF_UNLOCK (jbuf->priv);
3764
3765   return s;
3766 }