rtpjitterbuffer: Don't forget to unlock the mutex when receiving GAPs in TCP streams
[platform/upstream/gst-plugins-good.git] / gst / rtpmanager / gstrtpjitterbuffer.c
1 /*
2  * Farsight Voice+Video library
3  *
4  *  Copyright 2007 Collabora Ltd,
5  *  Copyright 2007 Nokia Corporation
6  *   @author: Philippe Kalaf <philippe.kalaf@collabora.co.uk>.
7  *  Copyright 2007 Wim Taymans <wim.taymans@gmail.com>
8  *
9  * This library is free software; you can redistribute it and/or
10  * modify it under the terms of the GNU Library General Public
11  * License as published by the Free Software Foundation; either
12  * version 2 of the License, or (at your option) any later version.
13  *
14  * This library is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17  * Library General Public License for more details.
18  *
19  * You should have received a copy of the GNU Library General Public
20  * License along with this library; if not, write to the
21  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
22  * Boston, MA 02110-1301, USA.
23  *
24  */
25
26 /**
27  * SECTION:element-rtpjitterbuffer
28  *
29  * This element reorders and removes duplicate RTP packets as they are received
30  * from a network source.
31  *
32  * The element needs the clock-rate of the RTP payload in order to estimate the
33  * delay. This information is obtained either from the caps on the sink pad or,
34  * when no caps are present, from the #GstRtpJitterBuffer::request-pt-map signal.
35  * To clear the previous pt-map use the #GstRtpJitterBuffer::clear-pt-map signal.
36  *
37  * The rtpjitterbuffer will wait for missing packets up to a configurable time
38  * limit using the #GstRtpJitterBuffer:latency property. Packets arriving too
39  * late are considered to be lost packets. If the #GstRtpJitterBuffer:do-lost
40  * property is set, lost packets will result in a custom serialized downstream
41  * event of name GstRTPPacketLost. The lost packet events are usually used by a
42  * depayloader or other element to create concealment data or some other logic
43  * to gracefully handle the missing packets.
44  *
45  * The jitterbuffer will use the DTS (or PTS if no DTS is set) of the incomming
46  * buffer and the rtptime inside the RTP packet to create a PTS on the outgoing
47  * buffer.
48  *
49  * The jitterbuffer can also be configured to send early retransmission events
50  * upstream by setting the #GstRtpJitterBuffer:do-retransmission property. In
51  * this mode, the jitterbuffer tries to estimate when a packet should arrive and
52  * sends a custom upstream event named GstRTPRetransmissionRequest when the
53  * packet is considered late. The initial expected packet arrival time is
54  * calculated as follows:
55  *
56  * - If seqnum N arrived at time T, seqnum N+1 is expected to arrive at
57  *     T + packet-spacing + #GstRtpJitterBuffer:rtx-delay. The packet spacing is
58  *     calculated from the DTS (or PTS is no DTS) of two consecutive RTP
59  *     packets with different rtptime.
60  *
61  * - If seqnum N0 arrived at time T0 and seqnum Nm arrived at time Tm,
62  *     seqnum Ni is expected at time Ti = T0 + i*(Tm - T0)/(Nm - N0). Any
63  *     previously scheduled timeout is overwritten.
64  *
65  * - If seqnum N arrived, all seqnum older than
66  *     N - #GstRtpJitterBuffer:rtx-delay-reorder are considered late
67  *     immediately. This is to request fast feedback for abonormally reorder
68  *     packets before any of the previous timeouts is triggered.
69  *
70  * A late packet triggers the GstRTPRetransmissionRequest custom upstream
71  * event. After the initial timeout expires and the retransmission event is
72  * sent, the timeout is scheduled for
73  * T + #GstRtpJitterBuffer:rtx-retry-timeout. If the missing packet did not
74  * arrive after #GstRtpJitterBuffer:rtx-retry-timeout, a new
75  * GstRTPRetransmissionRequest is sent upstream and the timeout is rescheduled
76  * again for T + #GstRtpJitterBuffer:rtx-retry-timeout. This repeats until
77  * #GstRtpJitterBuffer:rtx-retry-period elapsed, at which point no further
78  * retransmission requests are sent and the regular logic is performed to
79  * schedule a lost packet as discussed above.
80  *
81  * This element acts as a live element and so adds #GstRtpJitterBuffer:latency
82  * to the pipeline.
83  *
84  * This element will automatically be used inside rtpbin.
85  *
86  * <refsect2>
87  * <title>Example pipelines</title>
88  * |[
89  * gst-launch-1.0 rtspsrc location=rtsp://192.168.1.133:8554/mpeg1or2AudioVideoTest ! rtpjitterbuffer ! rtpmpvdepay ! mpeg2dec ! xvimagesink
90  * ]| Connect to a streaming server and decode the MPEG video. The jitterbuffer is
91  * inserted into the pipeline to smooth out network jitter and to reorder the
92  * out-of-order RTP packets.
93  * </refsect2>
94  */
95
96 #ifdef HAVE_CONFIG_H
97 #include "config.h"
98 #endif
99
100 #include <stdlib.h>
101 #include <string.h>
102 #include <gst/rtp/gstrtpbuffer.h>
103
104 #include "gstrtpjitterbuffer.h"
105 #include "rtpjitterbuffer.h"
106 #include "rtpstats.h"
107
108 #include <gst/glib-compat-private.h>
109
110 GST_DEBUG_CATEGORY (rtpjitterbuffer_debug);
111 #define GST_CAT_DEFAULT (rtpjitterbuffer_debug)
112
113 /* RTPJitterBuffer signals and args */
114 enum
115 {
116   SIGNAL_REQUEST_PT_MAP,
117   SIGNAL_CLEAR_PT_MAP,
118   SIGNAL_HANDLE_SYNC,
119   SIGNAL_ON_NPT_STOP,
120   SIGNAL_SET_ACTIVE,
121   LAST_SIGNAL
122 };
123
124 #define DEFAULT_LATENCY_MS          200
125 #define DEFAULT_DROP_ON_LATENCY     FALSE
126 #define DEFAULT_TS_OFFSET           0
127 #define DEFAULT_DO_LOST             FALSE
128 #define DEFAULT_MODE                RTP_JITTER_BUFFER_MODE_SLAVE
129 #define DEFAULT_PERCENT             0
130 #define DEFAULT_DO_RETRANSMISSION   FALSE
131 #define DEFAULT_RTX_DELAY           -1
132 #define DEFAULT_RTX_MIN_DELAY       0
133 #define DEFAULT_RTX_DELAY_REORDER   3
134 #define DEFAULT_RTX_RETRY_TIMEOUT   -1
135 #define DEFAULT_RTX_MIN_RETRY_TIMEOUT   -1
136 #define DEFAULT_RTX_RETRY_PERIOD    -1
137
138 #define DEFAULT_AUTO_RTX_DELAY (20 * GST_MSECOND)
139 #define DEFAULT_AUTO_RTX_TIMEOUT (40 * GST_MSECOND)
140
141 enum
142 {
143   PROP_0,
144   PROP_LATENCY,
145   PROP_DROP_ON_LATENCY,
146   PROP_TS_OFFSET,
147   PROP_DO_LOST,
148   PROP_MODE,
149   PROP_PERCENT,
150   PROP_DO_RETRANSMISSION,
151   PROP_RTX_DELAY,
152   PROP_RTX_MIN_DELAY,
153   PROP_RTX_DELAY_REORDER,
154   PROP_RTX_RETRY_TIMEOUT,
155   PROP_RTX_MIN_RETRY_TIMEOUT,
156   PROP_RTX_RETRY_PERIOD,
157   PROP_STATS,
158   PROP_LAST
159 };
160
161 #define JBUF_LOCK(priv)   (g_mutex_lock (&(priv)->jbuf_lock))
162
163 #define JBUF_LOCK_CHECK(priv,label) G_STMT_START {    \
164   JBUF_LOCK (priv);                                   \
165   if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK))    \
166     goto label;                                       \
167 } G_STMT_END
168 #define JBUF_UNLOCK(priv) (g_mutex_unlock (&(priv)->jbuf_lock))
169
170 #define JBUF_WAIT_TIMER(priv)   G_STMT_START {            \
171   GST_DEBUG ("waiting timer");                            \
172   (priv)->waiting_timer = TRUE;                           \
173   g_cond_wait (&(priv)->jbuf_timer, &(priv)->jbuf_lock);  \
174   (priv)->waiting_timer = FALSE;                          \
175   GST_DEBUG ("waiting timer done");                       \
176 } G_STMT_END
177 #define JBUF_SIGNAL_TIMER(priv) G_STMT_START {            \
178   if (G_UNLIKELY ((priv)->waiting_timer)) {               \
179     GST_DEBUG ("signal timer");                           \
180     g_cond_signal (&(priv)->jbuf_timer);                  \
181   }                                                       \
182 } G_STMT_END
183
184 #define JBUF_WAIT_EVENT(priv,label) G_STMT_START {       \
185   GST_DEBUG ("waiting event");                           \
186   (priv)->waiting_event = TRUE;                          \
187   g_cond_wait (&(priv)->jbuf_event, &(priv)->jbuf_lock); \
188   (priv)->waiting_event = FALSE;                         \
189   GST_DEBUG ("waiting event done");                      \
190   if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK))       \
191     goto label;                                          \
192 } G_STMT_END
193 #define JBUF_SIGNAL_EVENT(priv) G_STMT_START {           \
194   if (G_UNLIKELY ((priv)->waiting_event)) {              \
195     GST_DEBUG ("signal event");                          \
196     g_cond_signal (&(priv)->jbuf_event);                 \
197   }                                                      \
198 } G_STMT_END
199
200 #define JBUF_WAIT_QUERY(priv,label) G_STMT_START {       \
201   GST_DEBUG ("waiting query");                           \
202   (priv)->waiting_query = TRUE;                          \
203   g_cond_wait (&(priv)->jbuf_query, &(priv)->jbuf_lock); \
204   (priv)->waiting_query = FALSE;                         \
205   GST_DEBUG ("waiting query done");                      \
206   if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK))       \
207     goto label;                                          \
208 } G_STMT_END
209 #define JBUF_SIGNAL_QUERY(priv,res) G_STMT_START {       \
210   (priv)->last_query = res;                              \
211   if (G_UNLIKELY ((priv)->waiting_query)) {              \
212     GST_DEBUG ("signal query");                          \
213     g_cond_signal (&(priv)->jbuf_query);                 \
214   }                                                      \
215 } G_STMT_END
216
217
218 struct _GstRtpJitterBufferPrivate
219 {
220   GstPad *sinkpad, *srcpad;
221   GstPad *rtcpsinkpad;
222
223   RTPJitterBuffer *jbuf;
224   GMutex jbuf_lock;
225   gboolean waiting_timer;
226   GCond jbuf_timer;
227   gboolean waiting_event;
228   GCond jbuf_event;
229   gboolean waiting_query;
230   GCond jbuf_query;
231   gboolean last_query;
232   gboolean discont;
233   gboolean ts_discont;
234   gboolean active;
235   guint64 out_offset;
236
237   gboolean timer_running;
238   GThread *timer_thread;
239
240   /* properties */
241   guint latency_ms;
242   guint64 latency_ns;
243   gboolean drop_on_latency;
244   gint64 ts_offset;
245   gboolean do_lost;
246   gboolean do_retransmission;
247   gint rtx_delay;
248   guint rtx_min_delay;
249   gint rtx_delay_reorder;
250   gint rtx_retry_timeout;
251   gint rtx_min_retry_timeout;
252   gint rtx_retry_period;
253
254   /* the last seqnum we pushed out */
255   guint32 last_popped_seqnum;
256   /* the next expected seqnum we push */
257   guint32 next_seqnum;
258   /* last output time */
259   GstClockTime last_out_time;
260   /* last valid input timestamp and rtptime pair */
261   GstClockTime ips_dts;
262   guint64 ips_rtptime;
263   GstClockTime packet_spacing;
264
265   /* the next expected seqnum we receive */
266   GstClockTime last_in_dts;
267   guint32 last_in_seqnum;
268   guint32 next_in_seqnum;
269
270   GArray *timers;
271
272   /* start and stop ranges */
273   GstClockTime npt_start;
274   GstClockTime npt_stop;
275   guint64 ext_timestamp;
276   guint64 last_elapsed;
277   guint64 estimated_eos;
278   GstClockID eos_id;
279
280   /* state */
281   gboolean eos;
282   guint last_percent;
283
284   /* clock rate and rtp timestamp offset */
285   gint last_pt;
286   gint32 clock_rate;
287   gint64 clock_base;
288   gint64 prev_ts_offset;
289
290   /* when we are shutting down */
291   GstFlowReturn srcresult;
292   gboolean blocked;
293
294   /* for sync */
295   GstSegment segment;
296   GstClockID clock_id;
297   GstClockTime timer_timeout;
298   guint16 timer_seqnum;
299   /* the latency of the upstream peer, we have to take this into account when
300    * synchronizing the buffers. */
301   GstClockTime peer_latency;
302   guint64 ext_rtptime;
303   GstBuffer *last_sr;
304
305   /* some accounting */
306   guint64 num_late;
307   guint64 num_duplicates;
308   guint64 num_rtx_requests;
309   guint64 num_rtx_success;
310   guint64 num_rtx_failed;
311   gdouble avg_rtx_num;
312   guint64 avg_rtx_rtt;
313
314   /* for the jitter */
315   GstClockTime last_dts;
316   guint64 last_rtptime;
317   GstClockTime avg_jitter;
318 };
319
320 typedef enum
321 {
322   TIMER_TYPE_EXPECTED,
323   TIMER_TYPE_LOST,
324   TIMER_TYPE_DEADLINE,
325   TIMER_TYPE_EOS
326 } TimerType;
327
328 typedef struct
329 {
330   guint idx;
331   guint16 seqnum;
332   guint num;
333   TimerType type;
334   GstClockTime timeout;
335   GstClockTime duration;
336   GstClockTime rtx_base;
337   GstClockTime rtx_delay;
338   GstClockTime rtx_retry;
339   GstClockTime rtx_last;
340   guint num_rtx_retry;
341 } TimerData;
342
343 #define GST_RTP_JITTER_BUFFER_GET_PRIVATE(o) \
344   (G_TYPE_INSTANCE_GET_PRIVATE ((o), GST_TYPE_RTP_JITTER_BUFFER, \
345                                 GstRtpJitterBufferPrivate))
346
347 static GstStaticPadTemplate gst_rtp_jitter_buffer_sink_template =
348 GST_STATIC_PAD_TEMPLATE ("sink",
349     GST_PAD_SINK,
350     GST_PAD_ALWAYS,
351     GST_STATIC_CAPS ("application/x-rtp"
352         /* "clock-rate = (int) [ 1, 2147483647 ], "
353          * "payload = (int) , "
354          * "encoding-name = (string) "
355          */ )
356     );
357
358 static GstStaticPadTemplate gst_rtp_jitter_buffer_sink_rtcp_template =
359 GST_STATIC_PAD_TEMPLATE ("sink_rtcp",
360     GST_PAD_SINK,
361     GST_PAD_REQUEST,
362     GST_STATIC_CAPS ("application/x-rtcp")
363     );
364
365 static GstStaticPadTemplate gst_rtp_jitter_buffer_src_template =
366 GST_STATIC_PAD_TEMPLATE ("src",
367     GST_PAD_SRC,
368     GST_PAD_ALWAYS,
369     GST_STATIC_CAPS ("application/x-rtp"
370         /* "payload = (int) , "
371          * "clock-rate = (int) , "
372          * "encoding-name = (string) "
373          */ )
374     );
375
376 static guint gst_rtp_jitter_buffer_signals[LAST_SIGNAL] = { 0 };
377
378 #define gst_rtp_jitter_buffer_parent_class parent_class
379 G_DEFINE_TYPE (GstRtpJitterBuffer, gst_rtp_jitter_buffer, GST_TYPE_ELEMENT);
380
381 /* object overrides */
382 static void gst_rtp_jitter_buffer_set_property (GObject * object,
383     guint prop_id, const GValue * value, GParamSpec * pspec);
384 static void gst_rtp_jitter_buffer_get_property (GObject * object,
385     guint prop_id, GValue * value, GParamSpec * pspec);
386 static void gst_rtp_jitter_buffer_finalize (GObject * object);
387
388 /* element overrides */
389 static GstStateChangeReturn gst_rtp_jitter_buffer_change_state (GstElement
390     * element, GstStateChange transition);
391 static GstPad *gst_rtp_jitter_buffer_request_new_pad (GstElement * element,
392     GstPadTemplate * templ, const gchar * name, const GstCaps * filter);
393 static void gst_rtp_jitter_buffer_release_pad (GstElement * element,
394     GstPad * pad);
395 static GstClock *gst_rtp_jitter_buffer_provide_clock (GstElement * element);
396
397 /* pad overrides */
398 static GstCaps *gst_rtp_jitter_buffer_getcaps (GstPad * pad, GstCaps * filter);
399 static GstIterator *gst_rtp_jitter_buffer_iterate_internal_links (GstPad * pad,
400     GstObject * parent);
401
402 /* sinkpad overrides */
403 static gboolean gst_rtp_jitter_buffer_sink_event (GstPad * pad,
404     GstObject * parent, GstEvent * event);
405 static GstFlowReturn gst_rtp_jitter_buffer_chain (GstPad * pad,
406     GstObject * parent, GstBuffer * buffer);
407
408 static gboolean gst_rtp_jitter_buffer_sink_rtcp_event (GstPad * pad,
409     GstObject * parent, GstEvent * event);
410 static GstFlowReturn gst_rtp_jitter_buffer_chain_rtcp (GstPad * pad,
411     GstObject * parent, GstBuffer * buffer);
412
413 static gboolean gst_rtp_jitter_buffer_sink_query (GstPad * pad,
414     GstObject * parent, GstQuery * query);
415
416 /* srcpad overrides */
417 static gboolean gst_rtp_jitter_buffer_src_event (GstPad * pad,
418     GstObject * parent, GstEvent * event);
419 static gboolean gst_rtp_jitter_buffer_src_activate_mode (GstPad * pad,
420     GstObject * parent, GstPadMode mode, gboolean active);
421 static void gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer);
422 static gboolean gst_rtp_jitter_buffer_src_query (GstPad * pad,
423     GstObject * parent, GstQuery * query);
424
425 static void
426 gst_rtp_jitter_buffer_clear_pt_map (GstRtpJitterBuffer * jitterbuffer);
427 static GstClockTime
428 gst_rtp_jitter_buffer_set_active (GstRtpJitterBuffer * jitterbuffer,
429     gboolean active, guint64 base_time);
430 static void do_handle_sync (GstRtpJitterBuffer * jitterbuffer);
431
432 static void unschedule_current_timer (GstRtpJitterBuffer * jitterbuffer);
433 static void remove_all_timers (GstRtpJitterBuffer * jitterbuffer);
434
435 static void wait_next_timeout (GstRtpJitterBuffer * jitterbuffer);
436
437 static GstStructure *gst_rtp_jitter_buffer_create_stats (GstRtpJitterBuffer *
438     jitterbuffer);
439
440 static void
441 gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass)
442 {
443   GObjectClass *gobject_class;
444   GstElementClass *gstelement_class;
445
446   gobject_class = (GObjectClass *) klass;
447   gstelement_class = (GstElementClass *) klass;
448
449   g_type_class_add_private (klass, sizeof (GstRtpJitterBufferPrivate));
450
451   gobject_class->finalize = gst_rtp_jitter_buffer_finalize;
452
453   gobject_class->set_property = gst_rtp_jitter_buffer_set_property;
454   gobject_class->get_property = gst_rtp_jitter_buffer_get_property;
455
456   /**
457    * GstRtpJitterBuffer:latency:
458    *
459    * The maximum latency of the jitterbuffer. Packets will be kept in the buffer
460    * for at most this time.
461    */
462   g_object_class_install_property (gobject_class, PROP_LATENCY,
463       g_param_spec_uint ("latency", "Buffer latency in ms",
464           "Amount of ms to buffer", 0, G_MAXUINT, DEFAULT_LATENCY_MS,
465           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
466   /**
467    * GstRtpJitterBuffer:drop-on-latency:
468    *
469    * Drop oldest buffers when the queue is completely filled.
470    */
471   g_object_class_install_property (gobject_class, PROP_DROP_ON_LATENCY,
472       g_param_spec_boolean ("drop-on-latency",
473           "Drop buffers when maximum latency is reached",
474           "Tells the jitterbuffer to never exceed the given latency in size",
475           DEFAULT_DROP_ON_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
476   /**
477    * GstRtpJitterBuffer:ts-offset:
478    *
479    * Adjust GStreamer output buffer timestamps in the jitterbuffer with offset.
480    * This is mainly used to ensure interstream synchronisation.
481    */
482   g_object_class_install_property (gobject_class, PROP_TS_OFFSET,
483       g_param_spec_int64 ("ts-offset", "Timestamp Offset",
484           "Adjust buffer timestamps with offset in nanoseconds", G_MININT64,
485           G_MAXINT64, DEFAULT_TS_OFFSET,
486           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
487
488   /**
489    * GstRtpJitterBuffer:do-lost:
490    *
491    * Send out a GstRTPPacketLost event downstream when a packet is considered
492    * lost.
493    */
494   g_object_class_install_property (gobject_class, PROP_DO_LOST,
495       g_param_spec_boolean ("do-lost", "Do Lost",
496           "Send an event downstream when a packet is lost", DEFAULT_DO_LOST,
497           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
498
499   /**
500    * GstRtpJitterBuffer:mode:
501    *
502    * Control the buffering and timestamping mode used by the jitterbuffer.
503    */
504   g_object_class_install_property (gobject_class, PROP_MODE,
505       g_param_spec_enum ("mode", "Mode",
506           "Control the buffering algorithm in use", RTP_TYPE_JITTER_BUFFER_MODE,
507           DEFAULT_MODE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
508   /**
509    * GstRtpJitterBuffer:percent:
510    *
511    * The percent of the jitterbuffer that is filled.
512    */
513   g_object_class_install_property (gobject_class, PROP_PERCENT,
514       g_param_spec_int ("percent", "percent",
515           "The buffer filled percent", 0, 100,
516           0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
517   /**
518    * GstRtpJitterBuffer:do-retransmission:
519    *
520    * Send out a GstRTPRetransmission event upstream when a packet is considered
521    * late and should be retransmitted.
522    *
523    * Since: 1.2
524    */
525   g_object_class_install_property (gobject_class, PROP_DO_RETRANSMISSION,
526       g_param_spec_boolean ("do-retransmission", "Do Retransmission",
527           "Send retransmission events upstream when a packet is late",
528           DEFAULT_DO_RETRANSMISSION,
529           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
530
531   /**
532    * GstRtpJitterBuffer:rtx-delay:
533    *
534    * When a packet did not arrive at the expected time, wait this extra amount
535    * of time before sending a retransmission event.
536    *
537    * When -1 is used, the max jitter will be used as extra delay.
538    *
539    * Since: 1.2
540    */
541   g_object_class_install_property (gobject_class, PROP_RTX_DELAY,
542       g_param_spec_int ("rtx-delay", "RTX Delay",
543           "Extra time in ms to wait before sending retransmission "
544           "event (-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_DELAY,
545           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
546
547   /**
548    * GstRtpJitterBuffer:rtx-min-delay:
549    *
550    * When a packet did not arrive at the expected time, wait at least this extra amount
551    * of time before sending a retransmission event.
552    *
553    * Since: 1.6
554    */
555   g_object_class_install_property (gobject_class, PROP_RTX_MIN_DELAY,
556       g_param_spec_uint ("rtx-min-delay", "Minimum RTX Delay",
557           "Minimum time in ms to wait before sending retransmission "
558           "event", 0, G_MAXUINT, DEFAULT_RTX_MIN_DELAY,
559           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
560   /**
561    * GstRtpJitterBuffer:rtx-delay-reorder:
562    *
563    * Assume that a retransmission event should be sent when we see
564    * this much packet reordering.
565    *
566    * When -1 is used, the value will be estimated based on observed packet
567    * reordering.
568    *
569    * Since: 1.2
570    */
571   g_object_class_install_property (gobject_class, PROP_RTX_DELAY_REORDER,
572       g_param_spec_int ("rtx-delay-reorder", "RTX Delay Reorder",
573           "Sending retransmission event when this much reordering (-1 automatic)",
574           -1, G_MAXINT, DEFAULT_RTX_DELAY_REORDER,
575           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
576   /**
577    * GstRtpJitterBuffer::rtx-retry-timeout:
578    *
579    * When no packet has been received after sending a retransmission event
580    * for this time, retry sending a retransmission event.
581    *
582    * When -1 is used, the value will be estimated based on observed round
583    * trip time.
584    *
585    * Since: 1.2
586    */
587   g_object_class_install_property (gobject_class, PROP_RTX_RETRY_TIMEOUT,
588       g_param_spec_int ("rtx-retry-timeout", "RTX Retry Timeout",
589           "Retry sending a transmission event after this timeout in "
590           "ms (-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_RETRY_TIMEOUT,
591           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
592   /**
593    * GstRtpJitterBuffer::rtx-min-retry-timeout:
594    *
595    * The minimum amount of time between retry timeouts. When
596    * GstRtpJitterBuffer::rtx-retry-timeout is -1, this value ensures a
597    * minimum interval between retry timeouts.
598    *
599    * When -1 is used, the value will be estimated based on the
600    * packet spacing.
601    *
602    * Since: 1.6
603    */
604   g_object_class_install_property (gobject_class, PROP_RTX_MIN_RETRY_TIMEOUT,
605       g_param_spec_int ("rtx-min-retry-timeout", "RTX Min Retry Timeout",
606           "Minimum timeout between sending a transmission event in "
607           "ms (-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_MIN_RETRY_TIMEOUT,
608           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
609   /**
610    * GstRtpJitterBuffer:rtx-retry-period:
611    *
612    * The amount of time to try to get a retransmission.
613    *
614    * When -1 is used, the value will be estimated based on the jitterbuffer
615    * latency and the observed round trip time.
616    *
617    * Since: 1.2
618    */
619   g_object_class_install_property (gobject_class, PROP_RTX_RETRY_PERIOD,
620       g_param_spec_int ("rtx-retry-period", "RTX Retry Period",
621           "Try to get a retransmission for this many ms "
622           "(-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_RETRY_PERIOD,
623           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
624   /**
625    * GstRtpJitterBuffer:stats:
626    *
627    * Various jitterbuffer statistics. This property returns a GstStructure
628    * with name application/x-rtp-jitterbuffer-stats with the following fields:
629    *
630    *  "rtx-count"         G_TYPE_UINT64 The number of retransmissions requested
631    *  "rtx-success-count" G_TYPE_UINT64 The number of successful retransmissions
632    *  "rtx-per-packet"    G_TYPE_DOUBLE Average number of RTX per packet
633    *  "rtx-rtt"           G_TYPE_UINT64 Average round trip time per RTX
634    *
635    * Since: 1.4
636    */
637   g_object_class_install_property (gobject_class, PROP_STATS,
638       g_param_spec_boxed ("stats", "Statistics",
639           "Various statistics", GST_TYPE_STRUCTURE,
640           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
641
642   /**
643    * GstRtpJitterBuffer::request-pt-map:
644    * @buffer: the object which received the signal
645    * @pt: the pt
646    *
647    * Request the payload type as #GstCaps for @pt.
648    */
649   gst_rtp_jitter_buffer_signals[SIGNAL_REQUEST_PT_MAP] =
650       g_signal_new ("request-pt-map", G_TYPE_FROM_CLASS (klass),
651       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
652           request_pt_map), NULL, NULL, g_cclosure_marshal_generic,
653       GST_TYPE_CAPS, 1, G_TYPE_UINT);
654   /**
655    * GstRtpJitterBuffer::handle-sync:
656    * @buffer: the object which received the signal
657    * @struct: a GstStructure containing sync values.
658    *
659    * Be notified of new sync values.
660    */
661   gst_rtp_jitter_buffer_signals[SIGNAL_HANDLE_SYNC] =
662       g_signal_new ("handle-sync", G_TYPE_FROM_CLASS (klass),
663       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
664           handle_sync), NULL, NULL, g_cclosure_marshal_VOID__BOXED,
665       G_TYPE_NONE, 1, GST_TYPE_STRUCTURE | G_SIGNAL_TYPE_STATIC_SCOPE);
666
667   /**
668    * GstRtpJitterBuffer::on-npt-stop:
669    * @buffer: the object which received the signal
670    *
671    * Signal that the jitterbufer has pushed the RTP packet that corresponds to
672    * the npt-stop position.
673    */
674   gst_rtp_jitter_buffer_signals[SIGNAL_ON_NPT_STOP] =
675       g_signal_new ("on-npt-stop", G_TYPE_FROM_CLASS (klass),
676       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
677           on_npt_stop), NULL, NULL, g_cclosure_marshal_VOID__VOID,
678       G_TYPE_NONE, 0, G_TYPE_NONE);
679
680   /**
681    * GstRtpJitterBuffer::clear-pt-map:
682    * @buffer: the object which received the signal
683    *
684    * Invalidate the clock-rate as obtained with the
685    * #GstRtpJitterBuffer::request-pt-map signal.
686    */
687   gst_rtp_jitter_buffer_signals[SIGNAL_CLEAR_PT_MAP] =
688       g_signal_new ("clear-pt-map", G_TYPE_FROM_CLASS (klass),
689       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
690       G_STRUCT_OFFSET (GstRtpJitterBufferClass, clear_pt_map), NULL, NULL,
691       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
692
693   /**
694    * GstRtpJitterBuffer::set-active:
695    * @buffer: the object which received the signal
696    *
697    * Start pushing out packets with the given base time. This signal is only
698    * useful in buffering mode.
699    *
700    * Returns: the time of the last pushed packet.
701    */
702   gst_rtp_jitter_buffer_signals[SIGNAL_SET_ACTIVE] =
703       g_signal_new ("set-active", G_TYPE_FROM_CLASS (klass),
704       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
705       G_STRUCT_OFFSET (GstRtpJitterBufferClass, set_active), NULL, NULL,
706       g_cclosure_marshal_generic, G_TYPE_UINT64, 2, G_TYPE_BOOLEAN,
707       G_TYPE_UINT64);
708
709   gstelement_class->change_state =
710       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_change_state);
711   gstelement_class->request_new_pad =
712       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_request_new_pad);
713   gstelement_class->release_pad =
714       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_release_pad);
715   gstelement_class->provide_clock =
716       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_provide_clock);
717
718   gst_element_class_add_pad_template (gstelement_class,
719       gst_static_pad_template_get (&gst_rtp_jitter_buffer_src_template));
720   gst_element_class_add_pad_template (gstelement_class,
721       gst_static_pad_template_get (&gst_rtp_jitter_buffer_sink_template));
722   gst_element_class_add_pad_template (gstelement_class,
723       gst_static_pad_template_get (&gst_rtp_jitter_buffer_sink_rtcp_template));
724
725   gst_element_class_set_static_metadata (gstelement_class,
726       "RTP packet jitter-buffer", "Filter/Network/RTP",
727       "A buffer that deals with network jitter and other transmission faults",
728       "Philippe Kalaf <philippe.kalaf@collabora.co.uk>, "
729       "Wim Taymans <wim.taymans@gmail.com>");
730
731   klass->clear_pt_map = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_clear_pt_map);
732   klass->set_active = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_set_active);
733
734   GST_DEBUG_CATEGORY_INIT
735       (rtpjitterbuffer_debug, "rtpjitterbuffer", 0, "RTP Jitter Buffer");
736 }
737
738 static void
739 gst_rtp_jitter_buffer_init (GstRtpJitterBuffer * jitterbuffer)
740 {
741   GstRtpJitterBufferPrivate *priv;
742
743   priv = GST_RTP_JITTER_BUFFER_GET_PRIVATE (jitterbuffer);
744   jitterbuffer->priv = priv;
745
746   priv->latency_ms = DEFAULT_LATENCY_MS;
747   priv->latency_ns = priv->latency_ms * GST_MSECOND;
748   priv->drop_on_latency = DEFAULT_DROP_ON_LATENCY;
749   priv->do_lost = DEFAULT_DO_LOST;
750   priv->do_retransmission = DEFAULT_DO_RETRANSMISSION;
751   priv->rtx_delay = DEFAULT_RTX_DELAY;
752   priv->rtx_min_delay = DEFAULT_RTX_MIN_DELAY;
753   priv->rtx_delay_reorder = DEFAULT_RTX_DELAY_REORDER;
754   priv->rtx_retry_timeout = DEFAULT_RTX_RETRY_TIMEOUT;
755   priv->rtx_min_retry_timeout = DEFAULT_RTX_MIN_RETRY_TIMEOUT;
756   priv->rtx_retry_period = DEFAULT_RTX_RETRY_PERIOD;
757
758   priv->last_dts = -1;
759   priv->last_rtptime = -1;
760   priv->avg_jitter = 0;
761   priv->timers = g_array_new (FALSE, TRUE, sizeof (TimerData));
762   priv->jbuf = rtp_jitter_buffer_new ();
763   g_mutex_init (&priv->jbuf_lock);
764   g_cond_init (&priv->jbuf_timer);
765   g_cond_init (&priv->jbuf_event);
766   g_cond_init (&priv->jbuf_query);
767
768   /* reset skew detection initialy */
769   rtp_jitter_buffer_reset_skew (priv->jbuf);
770   rtp_jitter_buffer_set_delay (priv->jbuf, priv->latency_ns);
771   rtp_jitter_buffer_set_buffering (priv->jbuf, FALSE);
772   priv->active = TRUE;
773
774   priv->srcpad =
775       gst_pad_new_from_static_template (&gst_rtp_jitter_buffer_src_template,
776       "src");
777
778   gst_pad_set_activatemode_function (priv->srcpad,
779       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_activate_mode));
780   gst_pad_set_query_function (priv->srcpad,
781       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_query));
782   gst_pad_set_event_function (priv->srcpad,
783       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_event));
784
785   priv->sinkpad =
786       gst_pad_new_from_static_template (&gst_rtp_jitter_buffer_sink_template,
787       "sink");
788
789   gst_pad_set_chain_function (priv->sinkpad,
790       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_chain));
791   gst_pad_set_event_function (priv->sinkpad,
792       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_sink_event));
793   gst_pad_set_query_function (priv->sinkpad,
794       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_sink_query));
795
796   gst_element_add_pad (GST_ELEMENT (jitterbuffer), priv->srcpad);
797   gst_element_add_pad (GST_ELEMENT (jitterbuffer), priv->sinkpad);
798
799   GST_OBJECT_FLAG_SET (jitterbuffer, GST_ELEMENT_FLAG_PROVIDE_CLOCK);
800 }
801
802 #define IS_DROPABLE(it) (((it)->type == ITEM_TYPE_BUFFER) || ((it)->type == ITEM_TYPE_LOST))
803
804 #define ITEM_TYPE_BUFFER        0
805 #define ITEM_TYPE_LOST          1
806 #define ITEM_TYPE_EVENT         2
807 #define ITEM_TYPE_QUERY         3
808
809 static RTPJitterBufferItem *
810 alloc_item (gpointer data, guint type, GstClockTime dts, GstClockTime pts,
811     guint seqnum, guint count, guint rtptime)
812 {
813   RTPJitterBufferItem *item;
814
815   item = g_slice_new (RTPJitterBufferItem);
816   item->data = data;
817   item->next = NULL;
818   item->prev = NULL;
819   item->type = type;
820   item->dts = dts;
821   item->pts = pts;
822   item->seqnum = seqnum;
823   item->count = count;
824   item->rtptime = rtptime;
825
826   return item;
827 }
828
829 static void
830 free_item (RTPJitterBufferItem * item)
831 {
832   if (item->data && item->type != ITEM_TYPE_QUERY)
833     gst_mini_object_unref (item->data);
834   g_slice_free (RTPJitterBufferItem, item);
835 }
836
837 static void
838 free_item_and_retain_events (RTPJitterBufferItem * item, gpointer user_data)
839 {
840   GList **l = user_data;
841
842   if (item->data && item->type == ITEM_TYPE_EVENT
843       && GST_EVENT_IS_STICKY (item->data)) {
844     *l = g_list_prepend (*l, item->data);
845   } else if (item->data && item->type != ITEM_TYPE_QUERY) {
846     gst_mini_object_unref (item->data);
847   }
848   g_slice_free (RTPJitterBufferItem, item);
849 }
850
851 static void
852 gst_rtp_jitter_buffer_finalize (GObject * object)
853 {
854   GstRtpJitterBuffer *jitterbuffer;
855   GstRtpJitterBufferPrivate *priv;
856
857   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
858   priv = jitterbuffer->priv;
859
860   g_array_free (priv->timers, TRUE);
861   g_mutex_clear (&priv->jbuf_lock);
862   g_cond_clear (&priv->jbuf_timer);
863   g_cond_clear (&priv->jbuf_event);
864   g_cond_clear (&priv->jbuf_query);
865
866   rtp_jitter_buffer_flush (priv->jbuf, (GFunc) free_item, NULL);
867   g_object_unref (priv->jbuf);
868
869   G_OBJECT_CLASS (parent_class)->finalize (object);
870 }
871
872 static GstIterator *
873 gst_rtp_jitter_buffer_iterate_internal_links (GstPad * pad, GstObject * parent)
874 {
875   GstRtpJitterBuffer *jitterbuffer;
876   GstPad *otherpad = NULL;
877   GstIterator *it = NULL;
878   GValue val = { 0, };
879
880   jitterbuffer = GST_RTP_JITTER_BUFFER_CAST (parent);
881
882   if (pad == jitterbuffer->priv->sinkpad) {
883     otherpad = jitterbuffer->priv->srcpad;
884   } else if (pad == jitterbuffer->priv->srcpad) {
885     otherpad = jitterbuffer->priv->sinkpad;
886   } else if (pad == jitterbuffer->priv->rtcpsinkpad) {
887     it = gst_iterator_new_single (GST_TYPE_PAD, NULL);
888   }
889
890   if (it == NULL) {
891     g_value_init (&val, GST_TYPE_PAD);
892     g_value_set_object (&val, otherpad);
893     it = gst_iterator_new_single (GST_TYPE_PAD, &val);
894     g_value_unset (&val);
895   }
896
897   return it;
898 }
899
900 static GstPad *
901 create_rtcp_sink (GstRtpJitterBuffer * jitterbuffer)
902 {
903   GstRtpJitterBufferPrivate *priv;
904
905   priv = jitterbuffer->priv;
906
907   GST_DEBUG_OBJECT (jitterbuffer, "creating RTCP sink pad");
908
909   priv->rtcpsinkpad =
910       gst_pad_new_from_static_template
911       (&gst_rtp_jitter_buffer_sink_rtcp_template, "sink_rtcp");
912   gst_pad_set_chain_function (priv->rtcpsinkpad,
913       gst_rtp_jitter_buffer_chain_rtcp);
914   gst_pad_set_event_function (priv->rtcpsinkpad,
915       (GstPadEventFunction) gst_rtp_jitter_buffer_sink_rtcp_event);
916   gst_pad_set_iterate_internal_links_function (priv->rtcpsinkpad,
917       gst_rtp_jitter_buffer_iterate_internal_links);
918   gst_pad_set_active (priv->rtcpsinkpad, TRUE);
919   gst_element_add_pad (GST_ELEMENT_CAST (jitterbuffer), priv->rtcpsinkpad);
920
921   return priv->rtcpsinkpad;
922 }
923
924 static void
925 remove_rtcp_sink (GstRtpJitterBuffer * jitterbuffer)
926 {
927   GstRtpJitterBufferPrivate *priv;
928
929   priv = jitterbuffer->priv;
930
931   GST_DEBUG_OBJECT (jitterbuffer, "removing RTCP sink pad");
932
933   gst_pad_set_active (priv->rtcpsinkpad, FALSE);
934
935   gst_element_remove_pad (GST_ELEMENT_CAST (jitterbuffer), priv->rtcpsinkpad);
936   priv->rtcpsinkpad = NULL;
937 }
938
939 static GstPad *
940 gst_rtp_jitter_buffer_request_new_pad (GstElement * element,
941     GstPadTemplate * templ, const gchar * name, const GstCaps * filter)
942 {
943   GstRtpJitterBuffer *jitterbuffer;
944   GstElementClass *klass;
945   GstPad *result;
946   GstRtpJitterBufferPrivate *priv;
947
948   g_return_val_if_fail (templ != NULL, NULL);
949   g_return_val_if_fail (GST_IS_RTP_JITTER_BUFFER (element), NULL);
950
951   jitterbuffer = GST_RTP_JITTER_BUFFER_CAST (element);
952   priv = jitterbuffer->priv;
953   klass = GST_ELEMENT_GET_CLASS (element);
954
955   GST_DEBUG_OBJECT (element, "requesting pad %s", GST_STR_NULL (name));
956
957   /* figure out the template */
958   if (templ == gst_element_class_get_pad_template (klass, "sink_rtcp")) {
959     if (priv->rtcpsinkpad != NULL)
960       goto exists;
961
962     result = create_rtcp_sink (jitterbuffer);
963   } else
964     goto wrong_template;
965
966   return result;
967
968   /* ERRORS */
969 wrong_template:
970   {
971     g_warning ("rtpjitterbuffer: this is not our template");
972     return NULL;
973   }
974 exists:
975   {
976     g_warning ("rtpjitterbuffer: pad already requested");
977     return NULL;
978   }
979 }
980
981 static void
982 gst_rtp_jitter_buffer_release_pad (GstElement * element, GstPad * pad)
983 {
984   GstRtpJitterBuffer *jitterbuffer;
985   GstRtpJitterBufferPrivate *priv;
986
987   g_return_if_fail (GST_IS_RTP_JITTER_BUFFER (element));
988   g_return_if_fail (GST_IS_PAD (pad));
989
990   jitterbuffer = GST_RTP_JITTER_BUFFER_CAST (element);
991   priv = jitterbuffer->priv;
992
993   GST_DEBUG_OBJECT (element, "releasing pad %s:%s", GST_DEBUG_PAD_NAME (pad));
994
995   if (priv->rtcpsinkpad == pad) {
996     remove_rtcp_sink (jitterbuffer);
997   } else
998     goto wrong_pad;
999
1000   return;
1001
1002   /* ERRORS */
1003 wrong_pad:
1004   {
1005     g_warning ("gstjitterbuffer: asked to release an unknown pad");
1006     return;
1007   }
1008 }
1009
1010 static GstClock *
1011 gst_rtp_jitter_buffer_provide_clock (GstElement * element)
1012 {
1013   return gst_system_clock_obtain ();
1014 }
1015
1016 static void
1017 gst_rtp_jitter_buffer_clear_pt_map (GstRtpJitterBuffer * jitterbuffer)
1018 {
1019   GstRtpJitterBufferPrivate *priv;
1020
1021   priv = jitterbuffer->priv;
1022
1023   /* this will trigger a new pt-map request signal, FIXME, do something better. */
1024
1025   JBUF_LOCK (priv);
1026   priv->clock_rate = -1;
1027   /* do not clear current content, but refresh state for new arrival */
1028   GST_DEBUG_OBJECT (jitterbuffer, "reset jitterbuffer");
1029   rtp_jitter_buffer_reset_skew (priv->jbuf);
1030   JBUF_UNLOCK (priv);
1031 }
1032
1033 static GstClockTime
1034 gst_rtp_jitter_buffer_set_active (GstRtpJitterBuffer * jbuf, gboolean active,
1035     guint64 offset)
1036 {
1037   GstRtpJitterBufferPrivate *priv;
1038   GstClockTime last_out;
1039   RTPJitterBufferItem *item;
1040
1041   priv = jbuf->priv;
1042
1043   JBUF_LOCK (priv);
1044   GST_DEBUG_OBJECT (jbuf, "setting active %d with offset %" GST_TIME_FORMAT,
1045       active, GST_TIME_ARGS (offset));
1046
1047   if (active != priv->active) {
1048     /* add the amount of time spent in paused to the output offset. All
1049      * outgoing buffers will have this offset applied to their timestamps in
1050      * order to make them arrive in time in the sink. */
1051     priv->out_offset = offset;
1052     GST_DEBUG_OBJECT (jbuf, "out offset %" GST_TIME_FORMAT,
1053         GST_TIME_ARGS (priv->out_offset));
1054     priv->active = active;
1055     JBUF_SIGNAL_EVENT (priv);
1056   }
1057   if (!active) {
1058     rtp_jitter_buffer_set_buffering (priv->jbuf, TRUE);
1059   }
1060   if ((item = rtp_jitter_buffer_peek (priv->jbuf))) {
1061     /* head buffer timestamp and offset gives our output time */
1062     last_out = item->dts + priv->ts_offset;
1063   } else {
1064     /* use last known time when the buffer is empty */
1065     last_out = priv->last_out_time;
1066   }
1067   JBUF_UNLOCK (priv);
1068
1069   return last_out;
1070 }
1071
1072 static GstCaps *
1073 gst_rtp_jitter_buffer_getcaps (GstPad * pad, GstCaps * filter)
1074 {
1075   GstRtpJitterBuffer *jitterbuffer;
1076   GstRtpJitterBufferPrivate *priv;
1077   GstPad *other;
1078   GstCaps *caps;
1079   GstCaps *templ;
1080
1081   jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
1082   priv = jitterbuffer->priv;
1083
1084   other = (pad == priv->srcpad ? priv->sinkpad : priv->srcpad);
1085
1086   caps = gst_pad_peer_query_caps (other, filter);
1087
1088   templ = gst_pad_get_pad_template_caps (pad);
1089   if (caps == NULL) {
1090     GST_DEBUG_OBJECT (jitterbuffer, "use template");
1091     caps = templ;
1092   } else {
1093     GstCaps *intersect;
1094
1095     GST_DEBUG_OBJECT (jitterbuffer, "intersect with template");
1096
1097     intersect = gst_caps_intersect (caps, templ);
1098     gst_caps_unref (caps);
1099     gst_caps_unref (templ);
1100
1101     caps = intersect;
1102   }
1103   gst_object_unref (jitterbuffer);
1104
1105   return caps;
1106 }
1107
1108 /*
1109  * Must be called with JBUF_LOCK held
1110  */
1111
1112 static gboolean
1113 gst_jitter_buffer_sink_parse_caps (GstRtpJitterBuffer * jitterbuffer,
1114     GstCaps * caps)
1115 {
1116   GstRtpJitterBufferPrivate *priv;
1117   GstStructure *caps_struct;
1118   guint val;
1119   GstClockTime tval;
1120
1121   priv = jitterbuffer->priv;
1122
1123   /* first parse the caps */
1124   caps_struct = gst_caps_get_structure (caps, 0);
1125
1126   GST_DEBUG_OBJECT (jitterbuffer, "got caps");
1127
1128   /* we need a clock-rate to convert the rtp timestamps to GStreamer time and to
1129    * measure the amount of data in the buffer */
1130   if (!gst_structure_get_int (caps_struct, "clock-rate", &priv->clock_rate))
1131     goto error;
1132
1133   if (priv->clock_rate <= 0)
1134     goto wrong_rate;
1135
1136   GST_DEBUG_OBJECT (jitterbuffer, "got clock-rate %d", priv->clock_rate);
1137
1138   rtp_jitter_buffer_set_clock_rate (priv->jbuf, priv->clock_rate);
1139
1140   /* The clock base is the RTP timestamp corrsponding to the npt-start value. We
1141    * can use this to track the amount of time elapsed on the sender. */
1142   if (gst_structure_get_uint (caps_struct, "clock-base", &val))
1143     priv->clock_base = val;
1144   else
1145     priv->clock_base = -1;
1146
1147   priv->ext_timestamp = priv->clock_base;
1148
1149   GST_DEBUG_OBJECT (jitterbuffer, "got clock-base %" G_GINT64_FORMAT,
1150       priv->clock_base);
1151
1152   if (gst_structure_get_uint (caps_struct, "seqnum-base", &val)) {
1153     /* first expected seqnum, only update when we didn't have a previous base. */
1154     if (priv->next_in_seqnum == -1)
1155       priv->next_in_seqnum = val;
1156     if (priv->next_seqnum == -1) {
1157       priv->next_seqnum = val;
1158       JBUF_SIGNAL_EVENT (priv);
1159     }
1160   }
1161
1162   GST_DEBUG_OBJECT (jitterbuffer, "got seqnum-base %d", priv->next_in_seqnum);
1163
1164   /* the start and stop times. The seqnum-base corresponds to the start time. We
1165    * will keep track of the seqnums on the output and when we reach the one
1166    * corresponding to npt-stop, we emit the npt-stop-reached signal */
1167   if (gst_structure_get_clock_time (caps_struct, "npt-start", &tval))
1168     priv->npt_start = tval;
1169   else
1170     priv->npt_start = 0;
1171
1172   if (gst_structure_get_clock_time (caps_struct, "npt-stop", &tval))
1173     priv->npt_stop = tval;
1174   else
1175     priv->npt_stop = -1;
1176
1177   GST_DEBUG_OBJECT (jitterbuffer,
1178       "npt start/stop: %" GST_TIME_FORMAT "-%" GST_TIME_FORMAT,
1179       GST_TIME_ARGS (priv->npt_start), GST_TIME_ARGS (priv->npt_stop));
1180
1181   return TRUE;
1182
1183   /* ERRORS */
1184 error:
1185   {
1186     GST_DEBUG_OBJECT (jitterbuffer, "No clock-rate in caps!");
1187     return FALSE;
1188   }
1189 wrong_rate:
1190   {
1191     GST_DEBUG_OBJECT (jitterbuffer, "Invalid clock-rate %d", priv->clock_rate);
1192     return FALSE;
1193   }
1194 }
1195
1196 static void
1197 gst_rtp_jitter_buffer_flush_start (GstRtpJitterBuffer * jitterbuffer)
1198 {
1199   GstRtpJitterBufferPrivate *priv;
1200
1201   priv = jitterbuffer->priv;
1202
1203   JBUF_LOCK (priv);
1204   /* mark ourselves as flushing */
1205   priv->srcresult = GST_FLOW_FLUSHING;
1206   GST_DEBUG_OBJECT (jitterbuffer, "Disabling pop on queue");
1207   /* this unblocks any waiting pops on the src pad task */
1208   JBUF_SIGNAL_EVENT (priv);
1209   JBUF_SIGNAL_QUERY (priv, FALSE);
1210   JBUF_UNLOCK (priv);
1211 }
1212
1213 static void
1214 gst_rtp_jitter_buffer_flush_stop (GstRtpJitterBuffer * jitterbuffer)
1215 {
1216   GstRtpJitterBufferPrivate *priv;
1217
1218   priv = jitterbuffer->priv;
1219
1220   JBUF_LOCK (priv);
1221   GST_DEBUG_OBJECT (jitterbuffer, "Enabling pop on queue");
1222   /* Mark as non flushing */
1223   priv->srcresult = GST_FLOW_OK;
1224   gst_segment_init (&priv->segment, GST_FORMAT_TIME);
1225   priv->last_popped_seqnum = -1;
1226   priv->last_out_time = -1;
1227   priv->next_seqnum = -1;
1228   priv->ips_rtptime = -1;
1229   priv->ips_dts = GST_CLOCK_TIME_NONE;
1230   priv->packet_spacing = 0;
1231   priv->next_in_seqnum = -1;
1232   priv->clock_rate = -1;
1233   priv->last_pt = -1;
1234   priv->eos = FALSE;
1235   priv->estimated_eos = -1;
1236   priv->last_elapsed = 0;
1237   priv->ext_timestamp = -1;
1238   priv->avg_jitter = 0;
1239   priv->last_dts = -1;
1240   priv->last_rtptime = -1;
1241   GST_DEBUG_OBJECT (jitterbuffer, "flush and reset jitterbuffer");
1242   rtp_jitter_buffer_flush (priv->jbuf, (GFunc) free_item, NULL);
1243   rtp_jitter_buffer_disable_buffering (priv->jbuf, FALSE);
1244   rtp_jitter_buffer_reset_skew (priv->jbuf);
1245   remove_all_timers (jitterbuffer);
1246   JBUF_UNLOCK (priv);
1247 }
1248
1249 static gboolean
1250 gst_rtp_jitter_buffer_src_activate_mode (GstPad * pad, GstObject * parent,
1251     GstPadMode mode, gboolean active)
1252 {
1253   gboolean result;
1254   GstRtpJitterBuffer *jitterbuffer = NULL;
1255
1256   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1257
1258   switch (mode) {
1259     case GST_PAD_MODE_PUSH:
1260       if (active) {
1261         /* allow data processing */
1262         gst_rtp_jitter_buffer_flush_stop (jitterbuffer);
1263
1264         /* start pushing out buffers */
1265         GST_DEBUG_OBJECT (jitterbuffer, "Starting task on srcpad");
1266         result = gst_pad_start_task (jitterbuffer->priv->srcpad,
1267             (GstTaskFunction) gst_rtp_jitter_buffer_loop, jitterbuffer, NULL);
1268       } else {
1269         /* make sure all data processing stops ASAP */
1270         gst_rtp_jitter_buffer_flush_start (jitterbuffer);
1271
1272         /* NOTE this will hardlock if the state change is called from the src pad
1273          * task thread because we will _join() the thread. */
1274         GST_DEBUG_OBJECT (jitterbuffer, "Stopping task on srcpad");
1275         result = gst_pad_stop_task (pad);
1276       }
1277       break;
1278     default:
1279       result = FALSE;
1280       break;
1281   }
1282   return result;
1283 }
1284
1285 static GstStateChangeReturn
1286 gst_rtp_jitter_buffer_change_state (GstElement * element,
1287     GstStateChange transition)
1288 {
1289   GstRtpJitterBuffer *jitterbuffer;
1290   GstRtpJitterBufferPrivate *priv;
1291   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
1292
1293   jitterbuffer = GST_RTP_JITTER_BUFFER (element);
1294   priv = jitterbuffer->priv;
1295
1296   switch (transition) {
1297     case GST_STATE_CHANGE_NULL_TO_READY:
1298       break;
1299     case GST_STATE_CHANGE_READY_TO_PAUSED:
1300       JBUF_LOCK (priv);
1301       /* reset negotiated values */
1302       priv->clock_rate = -1;
1303       priv->clock_base = -1;
1304       priv->peer_latency = 0;
1305       priv->last_pt = -1;
1306       /* block until we go to PLAYING */
1307       priv->blocked = TRUE;
1308       priv->timer_running = TRUE;
1309       priv->timer_thread =
1310           g_thread_new ("timer", (GThreadFunc) wait_next_timeout, jitterbuffer);
1311       JBUF_UNLOCK (priv);
1312       break;
1313     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
1314       JBUF_LOCK (priv);
1315       /* unblock to allow streaming in PLAYING */
1316       priv->blocked = FALSE;
1317       JBUF_SIGNAL_EVENT (priv);
1318       JBUF_SIGNAL_TIMER (priv);
1319       JBUF_UNLOCK (priv);
1320       break;
1321     default:
1322       break;
1323   }
1324
1325   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1326
1327   switch (transition) {
1328     case GST_STATE_CHANGE_READY_TO_PAUSED:
1329       /* we are a live element because we sync to the clock, which we can only
1330        * do in the PLAYING state */
1331       if (ret != GST_STATE_CHANGE_FAILURE)
1332         ret = GST_STATE_CHANGE_NO_PREROLL;
1333       break;
1334     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
1335       JBUF_LOCK (priv);
1336       /* block to stop streaming when PAUSED */
1337       priv->blocked = TRUE;
1338       unschedule_current_timer (jitterbuffer);
1339       JBUF_UNLOCK (priv);
1340       if (ret != GST_STATE_CHANGE_FAILURE)
1341         ret = GST_STATE_CHANGE_NO_PREROLL;
1342       break;
1343     case GST_STATE_CHANGE_PAUSED_TO_READY:
1344       JBUF_LOCK (priv);
1345       gst_buffer_replace (&priv->last_sr, NULL);
1346       priv->timer_running = FALSE;
1347       unschedule_current_timer (jitterbuffer);
1348       JBUF_SIGNAL_TIMER (priv);
1349       JBUF_SIGNAL_QUERY (priv, FALSE);
1350       JBUF_UNLOCK (priv);
1351       g_thread_join (priv->timer_thread);
1352       priv->timer_thread = NULL;
1353       break;
1354     case GST_STATE_CHANGE_READY_TO_NULL:
1355       break;
1356     default:
1357       break;
1358   }
1359
1360   return ret;
1361 }
1362
1363 static gboolean
1364 gst_rtp_jitter_buffer_src_event (GstPad * pad, GstObject * parent,
1365     GstEvent * event)
1366 {
1367   gboolean ret = TRUE;
1368   GstRtpJitterBuffer *jitterbuffer;
1369   GstRtpJitterBufferPrivate *priv;
1370
1371   jitterbuffer = GST_RTP_JITTER_BUFFER_CAST (parent);
1372   priv = jitterbuffer->priv;
1373
1374   GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
1375
1376   switch (GST_EVENT_TYPE (event)) {
1377     case GST_EVENT_LATENCY:
1378     {
1379       GstClockTime latency;
1380
1381       gst_event_parse_latency (event, &latency);
1382
1383       GST_DEBUG_OBJECT (jitterbuffer,
1384           "configuring latency of %" GST_TIME_FORMAT, GST_TIME_ARGS (latency));
1385
1386       JBUF_LOCK (priv);
1387       /* adjust the overall buffer delay to the total pipeline latency in
1388        * buffering mode because if downstream consumes too fast (because of
1389        * large latency or queues, we would start rebuffering again. */
1390       if (rtp_jitter_buffer_get_mode (priv->jbuf) ==
1391           RTP_JITTER_BUFFER_MODE_BUFFER) {
1392         rtp_jitter_buffer_set_delay (priv->jbuf, latency);
1393       }
1394       JBUF_UNLOCK (priv);
1395
1396       ret = gst_pad_push_event (priv->sinkpad, event);
1397       break;
1398     }
1399     default:
1400       ret = gst_pad_push_event (priv->sinkpad, event);
1401       break;
1402   }
1403
1404   return ret;
1405 }
1406
1407 /* handles and stores the event in the jitterbuffer, must be called with
1408  * LOCK */
1409 static gboolean
1410 queue_event (GstRtpJitterBuffer * jitterbuffer, GstEvent * event)
1411 {
1412   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1413   RTPJitterBufferItem *item;
1414   gboolean head;
1415
1416   switch (GST_EVENT_TYPE (event)) {
1417     case GST_EVENT_CAPS:
1418     {
1419       GstCaps *caps;
1420
1421       gst_event_parse_caps (event, &caps);
1422       gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps);
1423       break;
1424     }
1425     case GST_EVENT_SEGMENT:
1426       gst_event_copy_segment (event, &priv->segment);
1427
1428       /* we need time for now */
1429       if (priv->segment.format != GST_FORMAT_TIME)
1430         goto newseg_wrong_format;
1431
1432       GST_DEBUG_OBJECT (jitterbuffer,
1433           "segment:  %" GST_SEGMENT_FORMAT, &priv->segment);
1434       break;
1435     case GST_EVENT_EOS:
1436       priv->eos = TRUE;
1437       rtp_jitter_buffer_disable_buffering (priv->jbuf, TRUE);
1438       break;
1439     default:
1440       break;
1441   }
1442
1443
1444   GST_DEBUG_OBJECT (jitterbuffer, "adding event");
1445   item = alloc_item (event, ITEM_TYPE_EVENT, -1, -1, -1, 0, -1);
1446   rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL);
1447   if (head)
1448     JBUF_SIGNAL_EVENT (priv);
1449
1450   return TRUE;
1451
1452   /* ERRORS */
1453 newseg_wrong_format:
1454   {
1455     GST_DEBUG_OBJECT (jitterbuffer, "received non TIME newsegment");
1456     gst_event_unref (event);
1457     return FALSE;
1458   }
1459 }
1460
1461 static gboolean
1462 gst_rtp_jitter_buffer_sink_event (GstPad * pad, GstObject * parent,
1463     GstEvent * event)
1464 {
1465   gboolean ret = TRUE;
1466   GstRtpJitterBuffer *jitterbuffer;
1467   GstRtpJitterBufferPrivate *priv;
1468
1469   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1470   priv = jitterbuffer->priv;
1471
1472   GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
1473
1474   switch (GST_EVENT_TYPE (event)) {
1475     case GST_EVENT_FLUSH_START:
1476       ret = gst_pad_push_event (priv->srcpad, event);
1477       gst_rtp_jitter_buffer_flush_start (jitterbuffer);
1478       /* wait for the loop to go into PAUSED */
1479       gst_pad_pause_task (priv->srcpad);
1480       break;
1481     case GST_EVENT_FLUSH_STOP:
1482       ret = gst_pad_push_event (priv->srcpad, event);
1483       ret =
1484           gst_rtp_jitter_buffer_src_activate_mode (priv->srcpad, parent,
1485           GST_PAD_MODE_PUSH, TRUE);
1486       break;
1487     default:
1488       if (GST_EVENT_IS_SERIALIZED (event)) {
1489         /* serialized events go in the queue */
1490         JBUF_LOCK (priv);
1491         if (priv->srcresult != GST_FLOW_OK) {
1492           /* Errors in sticky event pushing are no problem and ignored here
1493            * as they will cause more meaningful errors during data flow.
1494            * For EOS events, that are not followed by data flow, we still
1495            * return FALSE here though.
1496            */
1497           if (!GST_EVENT_IS_STICKY (event) ||
1498               GST_EVENT_TYPE (event) == GST_EVENT_EOS)
1499             goto out_flow_error;
1500         }
1501         /* refuse more events on EOS */
1502         if (priv->eos)
1503           goto out_eos;
1504         ret = queue_event (jitterbuffer, event);
1505         JBUF_UNLOCK (priv);
1506       } else {
1507         /* non-serialized events are forwarded downstream immediately */
1508         ret = gst_pad_push_event (priv->srcpad, event);
1509       }
1510       break;
1511   }
1512   return ret;
1513
1514   /* ERRORS */
1515 out_flow_error:
1516   {
1517     GST_DEBUG_OBJECT (jitterbuffer,
1518         "refusing event, we have a downstream flow error: %s",
1519         gst_flow_get_name (priv->srcresult));
1520     JBUF_UNLOCK (priv);
1521     gst_event_unref (event);
1522     return FALSE;
1523   }
1524 out_eos:
1525   {
1526     GST_DEBUG_OBJECT (jitterbuffer, "refusing event, we are EOS");
1527     JBUF_UNLOCK (priv);
1528     gst_event_unref (event);
1529     return FALSE;
1530   }
1531 }
1532
1533 static gboolean
1534 gst_rtp_jitter_buffer_sink_rtcp_event (GstPad * pad, GstObject * parent,
1535     GstEvent * event)
1536 {
1537   gboolean ret = TRUE;
1538   GstRtpJitterBuffer *jitterbuffer;
1539
1540   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1541
1542   GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
1543
1544   switch (GST_EVENT_TYPE (event)) {
1545     case GST_EVENT_FLUSH_START:
1546       gst_event_unref (event);
1547       break;
1548     case GST_EVENT_FLUSH_STOP:
1549       gst_event_unref (event);
1550       break;
1551     default:
1552       ret = gst_pad_event_default (pad, parent, event);
1553       break;
1554   }
1555
1556   return ret;
1557 }
1558
1559 /*
1560  * Must be called with JBUF_LOCK held, will release the LOCK when emiting the
1561  * signal. The function returns GST_FLOW_ERROR when a parsing error happened and
1562  * GST_FLOW_FLUSHING when the element is shutting down. On success
1563  * GST_FLOW_OK is returned.
1564  */
1565 static GstFlowReturn
1566 gst_rtp_jitter_buffer_get_clock_rate (GstRtpJitterBuffer * jitterbuffer,
1567     guint8 pt)
1568 {
1569   GValue ret = { 0 };
1570   GValue args[2] = { {0}, {0} };
1571   GstCaps *caps;
1572   gboolean res;
1573
1574   g_value_init (&args[0], GST_TYPE_ELEMENT);
1575   g_value_set_object (&args[0], jitterbuffer);
1576   g_value_init (&args[1], G_TYPE_UINT);
1577   g_value_set_uint (&args[1], pt);
1578
1579   g_value_init (&ret, GST_TYPE_CAPS);
1580   g_value_set_boxed (&ret, NULL);
1581
1582   JBUF_UNLOCK (jitterbuffer->priv);
1583   g_signal_emitv (args, gst_rtp_jitter_buffer_signals[SIGNAL_REQUEST_PT_MAP], 0,
1584       &ret);
1585   JBUF_LOCK_CHECK (jitterbuffer->priv, out_flushing);
1586
1587   g_value_unset (&args[0]);
1588   g_value_unset (&args[1]);
1589   caps = (GstCaps *) g_value_dup_boxed (&ret);
1590   g_value_unset (&ret);
1591   if (!caps)
1592     goto no_caps;
1593
1594   res = gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps);
1595   gst_caps_unref (caps);
1596
1597   if (G_UNLIKELY (!res))
1598     goto parse_failed;
1599
1600   return GST_FLOW_OK;
1601
1602   /* ERRORS */
1603 no_caps:
1604   {
1605     GST_DEBUG_OBJECT (jitterbuffer, "could not get caps");
1606     return GST_FLOW_ERROR;
1607   }
1608 out_flushing:
1609   {
1610     GST_DEBUG_OBJECT (jitterbuffer, "we are flushing");
1611     return GST_FLOW_FLUSHING;
1612   }
1613 parse_failed:
1614   {
1615     GST_DEBUG_OBJECT (jitterbuffer, "parse failed");
1616     return GST_FLOW_ERROR;
1617   }
1618 }
1619
1620 /* call with jbuf lock held */
1621 static GstMessage *
1622 check_buffering_percent (GstRtpJitterBuffer * jitterbuffer, gint percent)
1623 {
1624   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1625   GstMessage *message = NULL;
1626
1627   if (percent == -1)
1628     return NULL;
1629
1630   /* Post a buffering message */
1631   if (priv->last_percent != percent) {
1632     priv->last_percent = percent;
1633     message =
1634         gst_message_new_buffering (GST_OBJECT_CAST (jitterbuffer), percent);
1635     gst_message_set_buffering_stats (message, GST_BUFFERING_LIVE, -1, -1, -1);
1636   }
1637
1638   return message;
1639 }
1640
1641 static GstClockTime
1642 apply_offset (GstRtpJitterBuffer * jitterbuffer, GstClockTime timestamp)
1643 {
1644   GstRtpJitterBufferPrivate *priv;
1645
1646   priv = jitterbuffer->priv;
1647
1648   if (timestamp == -1)
1649     return -1;
1650
1651   /* apply the timestamp offset, this is used for inter stream sync */
1652   timestamp += priv->ts_offset;
1653   /* add the offset, this is used when buffering */
1654   timestamp += priv->out_offset;
1655
1656   return timestamp;
1657 }
1658
1659 static TimerData *
1660 find_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type, guint16 seqnum)
1661 {
1662   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1663   TimerData *timer = NULL;
1664   gint i, len;
1665
1666   len = priv->timers->len;
1667   for (i = 0; i < len; i++) {
1668     TimerData *test = &g_array_index (priv->timers, TimerData, i);
1669     if (test->seqnum == seqnum && test->type == type) {
1670       timer = test;
1671       break;
1672     }
1673   }
1674   return timer;
1675 }
1676
1677 static void
1678 unschedule_current_timer (GstRtpJitterBuffer * jitterbuffer)
1679 {
1680   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1681
1682   if (priv->clock_id) {
1683     GST_DEBUG_OBJECT (jitterbuffer, "unschedule current timer");
1684     gst_clock_id_unschedule (priv->clock_id);
1685     priv->clock_id = NULL;
1686   }
1687 }
1688
1689 static GstClockTime
1690 get_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
1691 {
1692   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1693   GstClockTime test_timeout;
1694
1695   if ((test_timeout = timer->timeout) == -1)
1696     return -1;
1697
1698   if (timer->type != TIMER_TYPE_EXPECTED) {
1699     /* add our latency and offset to get output times. */
1700     test_timeout = apply_offset (jitterbuffer, test_timeout);
1701     test_timeout += priv->latency_ns;
1702   }
1703   return test_timeout;
1704 }
1705
1706 static void
1707 recalculate_timer (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
1708 {
1709   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1710
1711   if (priv->clock_id) {
1712     GstClockTime timeout = get_timeout (jitterbuffer, timer);
1713
1714     GST_DEBUG ("%" GST_TIME_FORMAT " <> %" GST_TIME_FORMAT,
1715         GST_TIME_ARGS (timeout), GST_TIME_ARGS (priv->timer_timeout));
1716
1717     if (timeout == -1 || timeout < priv->timer_timeout)
1718       unschedule_current_timer (jitterbuffer);
1719   }
1720 }
1721
1722 static TimerData *
1723 add_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type,
1724     guint16 seqnum, guint num, GstClockTime timeout, GstClockTime delay,
1725     GstClockTime duration)
1726 {
1727   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1728   TimerData *timer;
1729   gint len;
1730
1731   GST_DEBUG_OBJECT (jitterbuffer,
1732       "add timer %d for seqnum %d to %" GST_TIME_FORMAT ", delay %"
1733       GST_TIME_FORMAT, type, seqnum, GST_TIME_ARGS (timeout),
1734       GST_TIME_ARGS (delay));
1735
1736   len = priv->timers->len;
1737   g_array_set_size (priv->timers, len + 1);
1738   timer = &g_array_index (priv->timers, TimerData, len);
1739   timer->idx = len;
1740   timer->type = type;
1741   timer->seqnum = seqnum;
1742   timer->num = num;
1743   timer->timeout = timeout + delay;
1744   timer->duration = duration;
1745   if (type == TIMER_TYPE_EXPECTED) {
1746     timer->rtx_base = timeout;
1747     timer->rtx_delay = delay;
1748     timer->rtx_retry = 0;
1749   }
1750   timer->num_rtx_retry = 0;
1751   recalculate_timer (jitterbuffer, timer);
1752   JBUF_SIGNAL_TIMER (priv);
1753
1754   return timer;
1755 }
1756
1757 static void
1758 reschedule_timer (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
1759     guint16 seqnum, GstClockTime timeout, GstClockTime delay, gboolean reset)
1760 {
1761   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1762   gboolean seqchange, timechange;
1763   guint16 oldseq;
1764
1765   seqchange = timer->seqnum != seqnum;
1766   timechange = timer->timeout != timeout;
1767
1768   if (!seqchange && !timechange)
1769     return;
1770
1771   oldseq = timer->seqnum;
1772
1773   GST_DEBUG_OBJECT (jitterbuffer,
1774       "replace timer for seqnum %d->%d to %" GST_TIME_FORMAT,
1775       oldseq, seqnum, GST_TIME_ARGS (timeout + delay));
1776
1777   timer->timeout = timeout + delay;
1778   timer->seqnum = seqnum;
1779   if (reset) {
1780     timer->rtx_base = timeout;
1781     timer->rtx_delay = delay;
1782     timer->rtx_retry = 0;
1783   }
1784   if (seqchange)
1785     timer->num_rtx_retry = 0;
1786
1787   if (priv->clock_id) {
1788     /* we changed the seqnum and there is a timer currently waiting with this
1789      * seqnum, unschedule it */
1790     if (seqchange && priv->timer_seqnum == oldseq)
1791       unschedule_current_timer (jitterbuffer);
1792     /* we changed the time, check if it is earlier than what we are waiting
1793      * for and unschedule if so */
1794     else if (timechange)
1795       recalculate_timer (jitterbuffer, timer);
1796   }
1797 }
1798
1799 static TimerData *
1800 set_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type,
1801     guint16 seqnum, GstClockTime timeout)
1802 {
1803   TimerData *timer;
1804
1805   /* find the seqnum timer */
1806   timer = find_timer (jitterbuffer, type, seqnum);
1807   if (timer == NULL) {
1808     timer = add_timer (jitterbuffer, type, seqnum, 0, timeout, 0, -1);
1809   } else {
1810     reschedule_timer (jitterbuffer, timer, seqnum, timeout, 0, FALSE);
1811   }
1812   return timer;
1813 }
1814
1815 static void
1816 remove_timer (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
1817 {
1818   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1819   guint idx;
1820
1821   if (priv->clock_id && priv->timer_seqnum == timer->seqnum)
1822     unschedule_current_timer (jitterbuffer);
1823
1824   idx = timer->idx;
1825   GST_DEBUG_OBJECT (jitterbuffer, "removed index %d", idx);
1826   g_array_remove_index_fast (priv->timers, idx);
1827   timer->idx = idx;
1828 }
1829
1830 static void
1831 remove_all_timers (GstRtpJitterBuffer * jitterbuffer)
1832 {
1833   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1834   GST_DEBUG_OBJECT (jitterbuffer, "removed all timers");
1835   g_array_set_size (priv->timers, 0);
1836   unschedule_current_timer (jitterbuffer);
1837 }
1838
1839 /* get the extra delay to wait before sending RTX */
1840 static GstClockTime
1841 get_rtx_delay (GstRtpJitterBufferPrivate * priv)
1842 {
1843   GstClockTime delay;
1844
1845   if (priv->rtx_delay == -1) {
1846     if (priv->avg_jitter == 0)
1847       delay = DEFAULT_AUTO_RTX_DELAY;
1848     else
1849       /* jitter is in nanoseconds, 2x jitter is a good margin */
1850       delay = priv->avg_jitter * 2;
1851   } else {
1852     delay = priv->rtx_delay * GST_MSECOND;
1853   }
1854   if (priv->rtx_min_delay > 0)
1855     delay = MAX (delay, priv->rtx_min_delay * GST_MSECOND);
1856
1857   return delay;
1858 }
1859
1860 /* we just received a packet with seqnum and dts.
1861  *
1862  * First check for old seqnum that we are still expecting. If the gap with the
1863  * current seqnum is too big, unschedule the timeouts.
1864  *
1865  * If we have a valid packet spacing estimate we can set a timer for when we
1866  * should receive the next packet.
1867  * If we don't have a valid estimate, we remove any timer we might have
1868  * had for this packet.
1869  */
1870 static void
1871 update_timers (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum,
1872     GstClockTime dts, gboolean do_next_seqnum)
1873 {
1874   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1875   TimerData *timer = NULL;
1876   gint i, len;
1877
1878   /* go through all timers and unschedule the ones with a large gap, also find
1879    * the timer for the seqnum */
1880   len = priv->timers->len;
1881   for (i = 0; i < len; i++) {
1882     TimerData *test = &g_array_index (priv->timers, TimerData, i);
1883     gint gap;
1884
1885     gap = gst_rtp_buffer_compare_seqnum (test->seqnum, seqnum);
1886
1887     GST_DEBUG_OBJECT (jitterbuffer, "%d, %d, #%d<->#%d gap %d", i,
1888         test->type, test->seqnum, seqnum, gap);
1889
1890     if (gap == 0) {
1891       GST_DEBUG ("found timer for current seqnum");
1892       /* the timer for the current seqnum */
1893       timer = test;
1894       /* when no retransmission, we can stop now, we only need to find the
1895        * timer for the current seqnum */
1896       if (!priv->do_retransmission)
1897         break;
1898     } else if (gap > priv->rtx_delay_reorder) {
1899       /* max gap, we exceeded the max reorder distance and we don't expect the
1900        * missing packet to be this reordered */
1901       if (test->num_rtx_retry == 0 && test->type == TIMER_TYPE_EXPECTED)
1902         reschedule_timer (jitterbuffer, test, test->seqnum, -1, 0, FALSE);
1903     }
1904   }
1905
1906   do_next_seqnum = do_next_seqnum && priv->packet_spacing > 0
1907       && priv->do_retransmission;
1908
1909   if (timer && timer->type != TIMER_TYPE_DEADLINE) {
1910     if (timer->num_rtx_retry > 0) {
1911       GstClockTime rtx_last, delay;
1912
1913       /* we scheduled a retry for this packet and now we have it */
1914       priv->num_rtx_success++;
1915       /* all the previous retry attempts failed */
1916       priv->num_rtx_failed += timer->num_rtx_retry - 1;
1917       /* number of retries before receiving the packet */
1918       if (priv->avg_rtx_num == 0.0)
1919         priv->avg_rtx_num = timer->num_rtx_retry;
1920       else
1921         priv->avg_rtx_num = (timer->num_rtx_retry + 7 * priv->avg_rtx_num) / 8;
1922       /* calculate the delay between retransmission request and receiving this
1923        * packet, start with when we scheduled this timeout last */
1924       rtx_last = timer->rtx_last;
1925       if (dts != GST_CLOCK_TIME_NONE && dts > rtx_last) {
1926         /* we have a valid delay if this packet arrived after we scheduled the
1927          * request */
1928         delay = dts - rtx_last;
1929         if (priv->avg_rtx_rtt == 0)
1930           priv->avg_rtx_rtt = delay;
1931         else
1932           priv->avg_rtx_rtt = (delay + 7 * priv->avg_rtx_rtt) / 8;
1933       } else
1934         delay = 0;
1935
1936       GST_LOG_OBJECT (jitterbuffer,
1937           "RTX success %" G_GUINT64_FORMAT ", failed %" G_GUINT64_FORMAT
1938           ", requests %" G_GUINT64_FORMAT ", dups %" G_GUINT64_FORMAT
1939           ", avg-num %g, delay %" GST_TIME_FORMAT ", avg-rtt %" GST_TIME_FORMAT,
1940           priv->num_rtx_success, priv->num_rtx_failed, priv->num_rtx_requests,
1941           priv->num_duplicates, priv->avg_rtx_num, GST_TIME_ARGS (delay),
1942           GST_TIME_ARGS (priv->avg_rtx_rtt));
1943
1944       /* don't try to estimate the next seqnum because this is a retransmitted
1945        * packet and it probably did not arrive with the expected packet
1946        * spacing. */
1947       do_next_seqnum = FALSE;
1948     }
1949   }
1950
1951   if (do_next_seqnum) {
1952     GstClockTime expected, delay;
1953
1954     /* calculate expected arrival time of the next seqnum */
1955     expected = dts + priv->packet_spacing;
1956
1957     delay = get_rtx_delay (priv);
1958
1959     /* and update/install timer for next seqnum */
1960     if (timer)
1961       reschedule_timer (jitterbuffer, timer, priv->next_in_seqnum, expected,
1962           delay, TRUE);
1963     else
1964       add_timer (jitterbuffer, TIMER_TYPE_EXPECTED, priv->next_in_seqnum, 0,
1965           expected, delay, priv->packet_spacing);
1966   } else if (timer && timer->type != TIMER_TYPE_DEADLINE) {
1967     /* if we had a timer, remove it, we don't know when to expect the next
1968      * packet. */
1969     remove_timer (jitterbuffer, timer);
1970   }
1971 }
1972
1973 static void
1974 calculate_packet_spacing (GstRtpJitterBuffer * jitterbuffer, guint32 rtptime,
1975     GstClockTime dts)
1976 {
1977   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1978
1979   /* we need consecutive seqnums with a different
1980    * rtptime to estimate the packet spacing. */
1981   if (priv->ips_rtptime != rtptime) {
1982     /* rtptime changed, check dts diff */
1983     if (priv->ips_dts != -1 && dts != -1 && dts > priv->ips_dts) {
1984       priv->packet_spacing = dts - priv->ips_dts;
1985       GST_DEBUG_OBJECT (jitterbuffer,
1986           "new packet spacing %" GST_TIME_FORMAT,
1987           GST_TIME_ARGS (priv->packet_spacing));
1988     }
1989     priv->ips_rtptime = rtptime;
1990     priv->ips_dts = dts;
1991   }
1992 }
1993
1994 static void
1995 calculate_expected (GstRtpJitterBuffer * jitterbuffer, guint32 expected,
1996     guint16 seqnum, GstClockTime dts, gint gap)
1997 {
1998   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1999   GstClockTime total_duration, duration, expected_dts;
2000   TimerType type;
2001
2002   GST_DEBUG_OBJECT (jitterbuffer,
2003       "dts %" GST_TIME_FORMAT ", last %" GST_TIME_FORMAT,
2004       GST_TIME_ARGS (dts), GST_TIME_ARGS (priv->last_in_dts));
2005
2006   /* the total duration spanned by the missing packets */
2007   if (dts >= priv->last_in_dts)
2008     total_duration = dts - priv->last_in_dts;
2009   else
2010     total_duration = 0;
2011
2012   /* interpolate between the current time and the last time based on
2013    * number of packets we are missing, this is the estimated duration
2014    * for the missing packet based on equidistant packet spacing. */
2015   duration = total_duration / (gap + 1);
2016
2017   GST_DEBUG_OBJECT (jitterbuffer, "duration %" GST_TIME_FORMAT,
2018       GST_TIME_ARGS (duration));
2019
2020   if (total_duration > priv->latency_ns) {
2021     GstClockTime gap_time;
2022     guint lost_packets;
2023
2024     gap_time = total_duration - priv->latency_ns;
2025
2026     if (duration > 0) {
2027       lost_packets = gap_time / duration;
2028       gap_time = lost_packets * duration;
2029     } else {
2030       lost_packets = gap;
2031     }
2032
2033     /* too many lost packets, some of the missing packets are already
2034      * too late and we can generate lost packet events for them. */
2035     GST_DEBUG_OBJECT (jitterbuffer, "too many lost packets %" GST_TIME_FORMAT
2036         " > %" GST_TIME_FORMAT ", consider %u lost",
2037         GST_TIME_ARGS (total_duration), GST_TIME_ARGS (priv->latency_ns),
2038         lost_packets);
2039
2040     /* this timer will fire immediately and the lost event will be pushed from
2041      * the timer thread */
2042     add_timer (jitterbuffer, TIMER_TYPE_LOST, expected, lost_packets,
2043         priv->last_in_dts + duration, 0, gap_time);
2044
2045     expected += lost_packets;
2046     priv->last_in_dts += gap_time;
2047   }
2048
2049   expected_dts = priv->last_in_dts + duration;
2050
2051   if (priv->do_retransmission) {
2052     TimerData *timer;
2053
2054     type = TIMER_TYPE_EXPECTED;
2055     /* if we had a timer for the first missing packet, update it. */
2056     if ((timer = find_timer (jitterbuffer, type, expected))) {
2057       GstClockTime timeout = timer->timeout;
2058
2059       timer->duration = duration;
2060       if (timeout > expected_dts) {
2061         GstClockTime delay = timeout - expected_dts - timer->rtx_retry;
2062         reschedule_timer (jitterbuffer, timer, timer->seqnum, expected_dts,
2063             delay, TRUE);
2064       }
2065       expected++;
2066       expected_dts += duration;
2067     }
2068   } else {
2069     type = TIMER_TYPE_LOST;
2070   }
2071
2072   while (gst_rtp_buffer_compare_seqnum (expected, seqnum) > 0) {
2073     add_timer (jitterbuffer, type, expected, 0, expected_dts, 0, duration);
2074     expected_dts += duration;
2075     expected++;
2076   }
2077 }
2078
2079 static void
2080 calculate_jitter (GstRtpJitterBuffer * jitterbuffer, GstClockTime dts,
2081     guint rtptime)
2082 {
2083   gint32 rtpdiff;
2084   GstClockTimeDiff dtsdiff, rtpdiffns, diff;
2085   GstRtpJitterBufferPrivate *priv;
2086
2087   priv = jitterbuffer->priv;
2088
2089   if (G_UNLIKELY (dts == GST_CLOCK_TIME_NONE) || priv->clock_rate <= 0)
2090     goto no_time;
2091
2092   if (priv->last_dts != -1)
2093     dtsdiff = dts - priv->last_dts;
2094   else
2095     dtsdiff = 0;
2096
2097   if (priv->last_rtptime != -1)
2098     rtpdiff = rtptime - (guint32) priv->last_rtptime;
2099   else
2100     rtpdiff = 0;
2101
2102   priv->last_dts = dts;
2103   priv->last_rtptime = rtptime;
2104
2105   if (rtpdiff > 0)
2106     rtpdiffns =
2107         gst_util_uint64_scale_int (rtpdiff, GST_SECOND, priv->clock_rate);
2108   else
2109     rtpdiffns =
2110         -gst_util_uint64_scale_int (-rtpdiff, GST_SECOND, priv->clock_rate);
2111
2112   diff = ABS (dtsdiff - rtpdiffns);
2113
2114   /* jitter is stored in nanoseconds */
2115   priv->avg_jitter = (diff + (15 * priv->avg_jitter)) >> 4;
2116
2117   GST_LOG_OBJECT (jitterbuffer,
2118       "dtsdiff %" GST_TIME_FORMAT " rtptime %" GST_TIME_FORMAT
2119       ", clock-rate %d, diff %" GST_TIME_FORMAT ", jitter: %" GST_TIME_FORMAT,
2120       GST_TIME_ARGS (dtsdiff), GST_TIME_ARGS (rtpdiffns), priv->clock_rate,
2121       GST_TIME_ARGS (diff), GST_TIME_ARGS (priv->avg_jitter));
2122
2123   return;
2124
2125   /* ERRORS */
2126 no_time:
2127   {
2128     GST_DEBUG_OBJECT (jitterbuffer,
2129         "no dts or no clock-rate, can't calculate jitter");
2130     return;
2131   }
2132 }
2133
2134 static GstFlowReturn
2135 gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent,
2136     GstBuffer * buffer)
2137 {
2138   GstRtpJitterBuffer *jitterbuffer;
2139   GstRtpJitterBufferPrivate *priv;
2140   guint16 seqnum;
2141   guint32 expected, rtptime;
2142   GstFlowReturn ret = GST_FLOW_OK;
2143   GstClockTime dts, pts;
2144   guint64 latency_ts;
2145   gboolean head;
2146   gint percent = -1;
2147   guint8 pt;
2148   GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
2149   gboolean do_next_seqnum = FALSE;
2150   RTPJitterBufferItem *item;
2151   GstMessage *msg = NULL;
2152
2153   jitterbuffer = GST_RTP_JITTER_BUFFER_CAST (parent);
2154
2155   priv = jitterbuffer->priv;
2156
2157   if (G_UNLIKELY (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp)))
2158     goto invalid_buffer;
2159
2160   pt = gst_rtp_buffer_get_payload_type (&rtp);
2161   seqnum = gst_rtp_buffer_get_seq (&rtp);
2162   rtptime = gst_rtp_buffer_get_timestamp (&rtp);
2163   gst_rtp_buffer_unmap (&rtp);
2164
2165   /* make sure we have PTS and DTS set */
2166   pts = GST_BUFFER_PTS (buffer);
2167   dts = GST_BUFFER_DTS (buffer);
2168   if (dts == -1)
2169     dts = pts;
2170   else if (pts == -1)
2171     pts = dts;
2172
2173   /* take the DTS of the buffer. This is the time when the packet was
2174    * received and is used to calculate jitter and clock skew. We will adjust
2175    * this DTS with the smoothed value after processing it in the
2176    * jitterbuffer and assign it as the PTS. */
2177   /* bring to running time */
2178   dts = gst_segment_to_running_time (&priv->segment, GST_FORMAT_TIME, dts);
2179
2180   GST_DEBUG_OBJECT (jitterbuffer,
2181       "Received packet #%d at time %" GST_TIME_FORMAT ", discont %d", seqnum,
2182       GST_TIME_ARGS (dts), GST_BUFFER_IS_DISCONT (buffer));
2183
2184   JBUF_LOCK_CHECK (priv, out_flushing);
2185
2186   if (G_UNLIKELY (priv->last_pt != pt)) {
2187     GstCaps *caps;
2188
2189     GST_DEBUG_OBJECT (jitterbuffer, "pt changed from %u to %u", priv->last_pt,
2190         pt);
2191
2192     priv->last_pt = pt;
2193     /* reset clock-rate so that we get a new one */
2194     priv->clock_rate = -1;
2195
2196     /* Try to get the clock-rate from the caps first if we can. If there are no
2197      * caps we must fire the signal to get the clock-rate. */
2198     if ((caps = gst_pad_get_current_caps (pad))) {
2199       gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps);
2200       gst_caps_unref (caps);
2201     }
2202   }
2203
2204   if (G_UNLIKELY (priv->clock_rate == -1)) {
2205     /* no clock rate given on the caps, try to get one with the signal */
2206     if (gst_rtp_jitter_buffer_get_clock_rate (jitterbuffer,
2207             pt) == GST_FLOW_FLUSHING)
2208       goto out_flushing;
2209
2210     if (G_UNLIKELY (priv->clock_rate == -1))
2211       goto no_clock_rate;
2212   }
2213
2214   /* don't accept more data on EOS */
2215   if (G_UNLIKELY (priv->eos))
2216     goto have_eos;
2217
2218   calculate_jitter (jitterbuffer, dts, rtptime);
2219
2220   expected = priv->next_in_seqnum;
2221
2222   /* now check against our expected seqnum */
2223   if (G_LIKELY (expected != -1)) {
2224     gint gap;
2225
2226     /* now calculate gap */
2227     gap = gst_rtp_buffer_compare_seqnum (expected, seqnum);
2228
2229     GST_DEBUG_OBJECT (jitterbuffer, "expected #%d, got #%d, gap of %d",
2230         expected, seqnum, gap);
2231
2232     if (G_LIKELY (gap == 0)) {
2233       /* packet is expected */
2234       calculate_packet_spacing (jitterbuffer, rtptime, dts);
2235       do_next_seqnum = TRUE;
2236     } else {
2237       gboolean reset = FALSE;
2238
2239       if (!GST_CLOCK_TIME_IS_VALID (dts)) {
2240         /* We would run into calculations with GST_CLOCK_TIME_NONE below
2241          * and can't compensate for anything without DTS on RTP packets
2242          */
2243         goto gap_but_no_dts;
2244       } else if (gap < 0) {
2245         /* we received an old packet */
2246         if (G_UNLIKELY (gap < -RTP_MAX_MISORDER)) {
2247           /* too old packet, reset */
2248           GST_DEBUG_OBJECT (jitterbuffer, "reset: buffer too old %d < %d", gap,
2249               -RTP_MAX_MISORDER);
2250           reset = TRUE;
2251         } else {
2252           GST_DEBUG_OBJECT (jitterbuffer, "old packet received");
2253         }
2254       } else {
2255         /* new packet, we are missing some packets */
2256         if (G_UNLIKELY (gap > RTP_MAX_DROPOUT)) {
2257           /* packet too far in future, reset */
2258           GST_DEBUG_OBJECT (jitterbuffer, "reset: buffer too new %d > %d", gap,
2259               RTP_MAX_DROPOUT);
2260           reset = TRUE;
2261         } else {
2262           GST_DEBUG_OBJECT (jitterbuffer, "%d missing packets", gap);
2263           /* fill in the gap with EXPECTED timers */
2264           calculate_expected (jitterbuffer, expected, seqnum, dts, gap);
2265
2266           do_next_seqnum = TRUE;
2267         }
2268       }
2269       if (G_UNLIKELY (reset)) {
2270         GList *events = NULL, *l;
2271
2272         GST_DEBUG_OBJECT (jitterbuffer, "flush and reset jitterbuffer");
2273         rtp_jitter_buffer_flush (priv->jbuf,
2274             (GFunc) free_item_and_retain_events, &events);
2275         rtp_jitter_buffer_reset_skew (priv->jbuf);
2276         remove_all_timers (jitterbuffer);
2277         priv->last_popped_seqnum = -1;
2278         priv->next_seqnum = seqnum;
2279         do_next_seqnum = TRUE;
2280
2281         /* Insert all sticky events again in order, otherwise we would
2282          * potentially loose STREAM_START, CAPS or SEGMENT events
2283          */
2284         events = g_list_reverse (events);
2285         for (l = events; l; l = l->next) {
2286           RTPJitterBufferItem *item;
2287
2288           item = alloc_item (l->data, ITEM_TYPE_EVENT, -1, -1, -1, 0, -1);
2289           rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL);
2290         }
2291         g_list_free (events);
2292
2293         JBUF_SIGNAL_EVENT (priv);
2294       }
2295       /* reset spacing estimation when gap */
2296       priv->ips_rtptime = -1;
2297       priv->ips_dts = GST_CLOCK_TIME_NONE;
2298     }
2299   } else {
2300     GST_DEBUG_OBJECT (jitterbuffer, "First buffer #%d", seqnum);
2301     /* we don't know what the next_in_seqnum should be, wait for the last
2302      * possible moment to push this buffer, maybe we get an earlier seqnum
2303      * while we wait */
2304     set_timer (jitterbuffer, TIMER_TYPE_DEADLINE, seqnum, dts);
2305     do_next_seqnum = TRUE;
2306     /* take rtptime and dts to calculate packet spacing */
2307     priv->ips_rtptime = rtptime;
2308     priv->ips_dts = dts;
2309   }
2310   if (do_next_seqnum) {
2311     priv->last_in_seqnum = seqnum;
2312     priv->last_in_dts = dts;
2313     priv->next_in_seqnum = (seqnum + 1) & 0xffff;
2314   }
2315
2316   /* let's check if this buffer is too late, we can only accept packets with
2317    * bigger seqnum than the one we last pushed. */
2318   if (G_LIKELY (priv->last_popped_seqnum != -1)) {
2319     gint gap;
2320
2321     gap = gst_rtp_buffer_compare_seqnum (priv->last_popped_seqnum, seqnum);
2322
2323     /* priv->last_popped_seqnum >= seqnum, we're too late. */
2324     if (G_UNLIKELY (gap <= 0))
2325       goto too_late;
2326   }
2327
2328   /* let's drop oldest packet if the queue is already full and drop-on-latency
2329    * is set. We can only do this when there actually is a latency. When no
2330    * latency is set, we just pump it in the queue and let the other end push it
2331    * out as fast as possible. */
2332   if (priv->latency_ms && priv->drop_on_latency) {
2333     latency_ts =
2334         gst_util_uint64_scale_int (priv->latency_ms, priv->clock_rate, 1000);
2335
2336     if (G_UNLIKELY (rtp_jitter_buffer_get_ts_diff (priv->jbuf) >= latency_ts)) {
2337       RTPJitterBufferItem *old_item;
2338
2339       old_item = rtp_jitter_buffer_peek (priv->jbuf);
2340
2341       if (IS_DROPABLE (old_item)) {
2342         old_item = rtp_jitter_buffer_pop (priv->jbuf, &percent);
2343         GST_DEBUG_OBJECT (jitterbuffer, "Queue full, dropping old packet %p",
2344             old_item);
2345         priv->next_seqnum = (old_item->seqnum + 1) & 0xffff;
2346         free_item (old_item);
2347       }
2348       /* we might have removed some head buffers, signal the pushing thread to
2349        * see if it can push now */
2350       JBUF_SIGNAL_EVENT (priv);
2351     }
2352   }
2353
2354   item = alloc_item (buffer, ITEM_TYPE_BUFFER, dts, pts, seqnum, 1, rtptime);
2355
2356   /* now insert the packet into the queue in sorted order. This function returns
2357    * FALSE if a packet with the same seqnum was already in the queue, meaning we
2358    * have a duplicate. */
2359   if (G_UNLIKELY (!rtp_jitter_buffer_insert (priv->jbuf, item,
2360               &head, &percent)))
2361     goto duplicate;
2362
2363   /* update timers */
2364   update_timers (jitterbuffer, seqnum, dts, do_next_seqnum);
2365
2366   /* we had an unhandled SR, handle it now */
2367   if (priv->last_sr)
2368     do_handle_sync (jitterbuffer);
2369
2370   if (G_UNLIKELY (head)) {
2371     /* signal addition of new buffer when the _loop is waiting. */
2372     if (G_LIKELY (priv->active))
2373       JBUF_SIGNAL_EVENT (priv);
2374
2375     /* let's unschedule and unblock any waiting buffers. We only want to do this
2376      * when the head buffer changed */
2377     if (G_UNLIKELY (priv->clock_id)) {
2378       GST_DEBUG_OBJECT (jitterbuffer, "Unscheduling waiting new buffer");
2379       unschedule_current_timer (jitterbuffer);
2380     }
2381   }
2382
2383   GST_DEBUG_OBJECT (jitterbuffer,
2384       "Pushed packet #%d, now %d packets, head: %d, " "percent %d", seqnum,
2385       rtp_jitter_buffer_num_packets (priv->jbuf), head, percent);
2386
2387   msg = check_buffering_percent (jitterbuffer, percent);
2388
2389 finished:
2390   JBUF_UNLOCK (priv);
2391
2392   if (msg)
2393     gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer), msg);
2394
2395   return ret;
2396
2397   /* ERRORS */
2398 invalid_buffer:
2399   {
2400     /* this is not fatal but should be filtered earlier */
2401     GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
2402         ("Received invalid RTP payload, dropping"));
2403     gst_buffer_unref (buffer);
2404     return GST_FLOW_OK;
2405   }
2406 no_clock_rate:
2407   {
2408     GST_WARNING_OBJECT (jitterbuffer,
2409         "No clock-rate in caps!, dropping buffer");
2410     gst_buffer_unref (buffer);
2411     goto finished;
2412   }
2413 out_flushing:
2414   {
2415     ret = priv->srcresult;
2416     GST_DEBUG_OBJECT (jitterbuffer, "flushing %s", gst_flow_get_name (ret));
2417     gst_buffer_unref (buffer);
2418     goto finished;
2419   }
2420 have_eos:
2421   {
2422     ret = GST_FLOW_EOS;
2423     GST_WARNING_OBJECT (jitterbuffer, "we are EOS, refusing buffer");
2424     gst_buffer_unref (buffer);
2425     goto finished;
2426   }
2427 too_late:
2428   {
2429     GST_WARNING_OBJECT (jitterbuffer, "Packet #%d too late as #%d was already"
2430         " popped, dropping", seqnum, priv->last_popped_seqnum);
2431     priv->num_late++;
2432     gst_buffer_unref (buffer);
2433     goto finished;
2434   }
2435 duplicate:
2436   {
2437     GST_WARNING_OBJECT (jitterbuffer, "Duplicate packet #%d detected, dropping",
2438         seqnum);
2439     priv->num_duplicates++;
2440     free_item (item);
2441     goto finished;
2442   }
2443 gap_but_no_dts:
2444   {
2445     /* this is fatal as we can't compensate for gaps without DTS */
2446     GST_ELEMENT_ERROR (jitterbuffer, STREAM, DECODE, (NULL),
2447         ("Received packet without DTS after a gap"));
2448     gst_buffer_unref (buffer);
2449     ret = GST_FLOW_ERROR;
2450     goto finished;
2451   }
2452 }
2453
2454 static GstClockTime
2455 compute_elapsed (GstRtpJitterBuffer * jitterbuffer, RTPJitterBufferItem * item)
2456 {
2457   guint64 ext_time, elapsed;
2458   guint32 rtp_time;
2459   GstRtpJitterBufferPrivate *priv;
2460
2461   priv = jitterbuffer->priv;
2462   rtp_time = item->rtptime;
2463
2464   GST_LOG_OBJECT (jitterbuffer, "rtp %" G_GUINT32_FORMAT ", ext %"
2465       G_GUINT64_FORMAT, rtp_time, priv->ext_timestamp);
2466
2467   if (rtp_time < priv->ext_timestamp) {
2468     ext_time = priv->ext_timestamp;
2469   } else {
2470     ext_time = gst_rtp_buffer_ext_timestamp (&priv->ext_timestamp, rtp_time);
2471   }
2472
2473   if (ext_time > priv->clock_base)
2474     elapsed = ext_time - priv->clock_base;
2475   else
2476     elapsed = 0;
2477
2478   elapsed = gst_util_uint64_scale_int (elapsed, GST_SECOND, priv->clock_rate);
2479   return elapsed;
2480 }
2481
2482 static void
2483 update_estimated_eos (GstRtpJitterBuffer * jitterbuffer,
2484     RTPJitterBufferItem * item)
2485 {
2486   guint64 total, elapsed, left, estimated;
2487   GstClockTime out_time;
2488   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2489
2490   if (priv->npt_stop == -1 || priv->ext_timestamp == -1
2491       || priv->clock_base == -1 || priv->clock_rate <= 0)
2492     return;
2493
2494   /* compute the elapsed time */
2495   elapsed = compute_elapsed (jitterbuffer, item);
2496
2497   /* do nothing if elapsed time doesn't increment */
2498   if (priv->last_elapsed && elapsed <= priv->last_elapsed)
2499     return;
2500
2501   priv->last_elapsed = elapsed;
2502
2503   /* this is the total time we need to play */
2504   total = priv->npt_stop - priv->npt_start;
2505   GST_LOG_OBJECT (jitterbuffer, "total %" GST_TIME_FORMAT,
2506       GST_TIME_ARGS (total));
2507
2508   /* this is how much time there is left */
2509   if (total > elapsed)
2510     left = total - elapsed;
2511   else
2512     left = 0;
2513
2514   /* if we have less time left that the size of the buffer, we will not
2515    * be able to keep it filled, disabled buffering then */
2516   if (left < rtp_jitter_buffer_get_delay (priv->jbuf)) {
2517     GST_DEBUG_OBJECT (jitterbuffer, "left %" GST_TIME_FORMAT
2518         ", disable buffering close to EOS", GST_TIME_ARGS (left));
2519     rtp_jitter_buffer_disable_buffering (priv->jbuf, TRUE);
2520   }
2521
2522   /* this is the current time as running-time */
2523   out_time = item->dts;
2524
2525   if (elapsed > 0)
2526     estimated = gst_util_uint64_scale (out_time, total, elapsed);
2527   else {
2528     /* if there is almost nothing left,
2529      * we may never advance enough to end up in the above case */
2530     if (total < GST_SECOND)
2531       estimated = GST_SECOND;
2532     else
2533       estimated = -1;
2534   }
2535   GST_LOG_OBJECT (jitterbuffer, "elapsed %" GST_TIME_FORMAT ", estimated %"
2536       GST_TIME_FORMAT, GST_TIME_ARGS (elapsed), GST_TIME_ARGS (estimated));
2537
2538   if (estimated != -1 && priv->estimated_eos != estimated) {
2539     set_timer (jitterbuffer, TIMER_TYPE_EOS, -1, estimated);
2540     priv->estimated_eos = estimated;
2541   }
2542 }
2543
2544 /* take a buffer from the queue and push it */
2545 static GstFlowReturn
2546 pop_and_push_next (GstRtpJitterBuffer * jitterbuffer, guint seqnum)
2547 {
2548   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2549   GstFlowReturn result = GST_FLOW_OK;
2550   RTPJitterBufferItem *item;
2551   GstBuffer *outbuf = NULL;
2552   GstEvent *outevent = NULL;
2553   GstQuery *outquery = NULL;
2554   GstClockTime dts, pts;
2555   gint percent = -1;
2556   gboolean do_push = TRUE;
2557   guint type;
2558   GstMessage *msg;
2559
2560   /* when we get here we are ready to pop and push the buffer */
2561   item = rtp_jitter_buffer_pop (priv->jbuf, &percent);
2562   type = item->type;
2563
2564   switch (type) {
2565     case ITEM_TYPE_BUFFER:
2566
2567       /* we need to make writable to change the flags and timestamps */
2568       outbuf = gst_buffer_make_writable (item->data);
2569
2570       if (G_UNLIKELY (priv->discont)) {
2571         /* set DISCONT flag when we missed a packet. We pushed the buffer writable
2572          * into the jitterbuffer so we can modify now. */
2573         GST_DEBUG_OBJECT (jitterbuffer, "mark output buffer discont");
2574         GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_DISCONT);
2575         priv->discont = FALSE;
2576       }
2577       if (G_UNLIKELY (priv->ts_discont)) {
2578         GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_RESYNC);
2579         priv->ts_discont = FALSE;
2580       }
2581
2582       dts =
2583           gst_segment_to_position (&priv->segment, GST_FORMAT_TIME, item->dts);
2584       pts =
2585           gst_segment_to_position (&priv->segment, GST_FORMAT_TIME, item->pts);
2586
2587       /* apply timestamp with offset to buffer now */
2588       GST_BUFFER_DTS (outbuf) = apply_offset (jitterbuffer, dts);
2589       GST_BUFFER_PTS (outbuf) = apply_offset (jitterbuffer, pts);
2590
2591       /* update the elapsed time when we need to check against the npt stop time. */
2592       update_estimated_eos (jitterbuffer, item);
2593
2594       priv->last_out_time = GST_BUFFER_PTS (outbuf);
2595       break;
2596     case ITEM_TYPE_LOST:
2597       priv->discont = TRUE;
2598       if (!priv->do_lost)
2599         do_push = FALSE;
2600       /* FALLTHROUGH */
2601     case ITEM_TYPE_EVENT:
2602       outevent = item->data;
2603       break;
2604     case ITEM_TYPE_QUERY:
2605       outquery = item->data;
2606       break;
2607   }
2608
2609   /* now we are ready to push the buffer. Save the seqnum and release the lock
2610    * so the other end can push stuff in the queue again. */
2611   if (seqnum != -1) {
2612     priv->last_popped_seqnum = seqnum;
2613     priv->next_seqnum = (seqnum + item->count) & 0xffff;
2614   }
2615   msg = check_buffering_percent (jitterbuffer, percent);
2616   JBUF_UNLOCK (priv);
2617
2618   item->data = NULL;
2619   free_item (item);
2620
2621   if (msg)
2622     gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer), msg);
2623
2624   switch (type) {
2625     case ITEM_TYPE_BUFFER:
2626       /* push buffer */
2627       GST_DEBUG_OBJECT (jitterbuffer,
2628           "Pushing buffer %d, dts %" GST_TIME_FORMAT ", pts %" GST_TIME_FORMAT,
2629           seqnum, GST_TIME_ARGS (GST_BUFFER_DTS (outbuf)),
2630           GST_TIME_ARGS (GST_BUFFER_PTS (outbuf)));
2631       result = gst_pad_push (priv->srcpad, outbuf);
2632
2633       JBUF_LOCK_CHECK (priv, out_flushing);
2634       break;
2635     case ITEM_TYPE_LOST:
2636     case ITEM_TYPE_EVENT:
2637       GST_DEBUG_OBJECT (jitterbuffer, "%sPushing event %" GST_PTR_FORMAT
2638           ", seqnum %d", do_push ? "" : "NOT ", outevent, seqnum);
2639
2640       if (do_push)
2641         gst_pad_push_event (priv->srcpad, outevent);
2642       else
2643         gst_event_unref (outevent);
2644
2645       result = GST_FLOW_OK;
2646
2647       JBUF_LOCK_CHECK (priv, out_flushing);
2648       break;
2649     case ITEM_TYPE_QUERY:
2650     {
2651       gboolean res;
2652
2653       res = gst_pad_peer_query (priv->srcpad, outquery);
2654
2655       JBUF_LOCK_CHECK (priv, out_flushing);
2656       result = GST_FLOW_OK;
2657       GST_LOG_OBJECT (jitterbuffer, "did query %p, return %d", outquery, res);
2658       JBUF_SIGNAL_QUERY (priv, res);
2659       break;
2660     }
2661   }
2662   return result;
2663
2664   /* ERRORS */
2665 out_flushing:
2666   {
2667     return priv->srcresult;
2668   }
2669 }
2670
2671 #define GST_FLOW_WAIT GST_FLOW_CUSTOM_SUCCESS
2672
2673 /* Peek a buffer and compare the seqnum to the expected seqnum.
2674  * If all is fine, the buffer is pushed.
2675  * If something is wrong, we wait for some event
2676  */
2677 static GstFlowReturn
2678 handle_next_buffer (GstRtpJitterBuffer * jitterbuffer)
2679 {
2680   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2681   GstFlowReturn result = GST_FLOW_OK;
2682   RTPJitterBufferItem *item;
2683   guint seqnum;
2684   guint32 next_seqnum;
2685   gint gap;
2686
2687   /* only push buffers when PLAYING and active and not buffering */
2688   if (priv->blocked || !priv->active ||
2689       rtp_jitter_buffer_is_buffering (priv->jbuf))
2690     return GST_FLOW_WAIT;
2691
2692 again:
2693   /* peek a buffer, we're just looking at the sequence number.
2694    * If all is fine, we'll pop and push it. If the sequence number is wrong we
2695    * wait for a timeout or something to change.
2696    * The peeked buffer is valid for as long as we hold the jitterbuffer lock. */
2697   item = rtp_jitter_buffer_peek (priv->jbuf);
2698   if (item == NULL)
2699     goto wait;
2700
2701   /* get the seqnum and the next expected seqnum */
2702   seqnum = item->seqnum;
2703   if (seqnum == -1)
2704     goto do_push;
2705
2706   next_seqnum = priv->next_seqnum;
2707
2708   /* get the gap between this and the previous packet. If we don't know the
2709    * previous packet seqnum assume no gap. */
2710   if (G_UNLIKELY (next_seqnum == -1)) {
2711     GST_DEBUG_OBJECT (jitterbuffer, "First buffer #%d", seqnum);
2712     /* we don't know what the next_seqnum should be, the chain function should
2713      * have scheduled a DEADLINE timer that will increment next_seqnum when it
2714      * fires, so wait for that */
2715     result = GST_FLOW_WAIT;
2716   } else {
2717     /* else calculate GAP */
2718     gap = gst_rtp_buffer_compare_seqnum (next_seqnum, seqnum);
2719
2720     if (G_LIKELY (gap == 0)) {
2721     do_push:
2722       /* no missing packet, pop and push */
2723       result = pop_and_push_next (jitterbuffer, seqnum);
2724     } else if (G_UNLIKELY (gap < 0)) {
2725       RTPJitterBufferItem *item;
2726       /* if we have a packet that we already pushed or considered dropped, pop it
2727        * off and get the next packet */
2728       GST_DEBUG_OBJECT (jitterbuffer, "Old packet #%d, next #%d dropping",
2729           seqnum, next_seqnum);
2730       item = rtp_jitter_buffer_pop (priv->jbuf, NULL);
2731       free_item (item);
2732       goto again;
2733     } else {
2734       /* the chain function has scheduled timers to request retransmission or
2735        * when to consider the packet lost, wait for that */
2736       GST_DEBUG_OBJECT (jitterbuffer,
2737           "Sequence number GAP detected: expected %d instead of %d (%d missing)",
2738           next_seqnum, seqnum, gap);
2739       result = GST_FLOW_WAIT;
2740     }
2741   }
2742   return result;
2743
2744 wait:
2745   {
2746     GST_DEBUG_OBJECT (jitterbuffer, "no buffer, going to wait");
2747     if (priv->eos)
2748       result = GST_FLOW_EOS;
2749     else
2750       result = GST_FLOW_WAIT;
2751     return result;
2752   }
2753 }
2754
2755 static GstClockTime
2756 get_rtx_retry_timeout (GstRtpJitterBufferPrivate * priv)
2757 {
2758   GstClockTime rtx_retry_timeout;
2759   GstClockTime rtx_min_retry_timeout;
2760
2761   if (priv->rtx_retry_timeout == -1) {
2762     if (priv->avg_rtx_rtt == 0)
2763       rtx_retry_timeout = DEFAULT_AUTO_RTX_TIMEOUT;
2764     else
2765       /* we want to ask for a retransmission after we waited for a
2766        * complete RTT and the additional jitter */
2767       rtx_retry_timeout = priv->avg_rtx_rtt + priv->avg_jitter * 2;
2768   } else {
2769     rtx_retry_timeout = priv->rtx_retry_timeout * GST_MSECOND;
2770   }
2771   /* make sure we don't retry too often. On very low latency networks,
2772    * the RTT and jitter can be very low. */
2773   if (priv->rtx_min_retry_timeout == -1) {
2774     rtx_min_retry_timeout = priv->packet_spacing;
2775   } else {
2776     rtx_min_retry_timeout = priv->rtx_min_retry_timeout * GST_MSECOND;
2777   }
2778   rtx_retry_timeout = MAX (rtx_retry_timeout, rtx_min_retry_timeout);
2779
2780   return rtx_retry_timeout;
2781 }
2782
2783 static GstClockTime
2784 get_rtx_retry_period (GstRtpJitterBufferPrivate * priv,
2785     GstClockTime rtx_retry_timeout)
2786 {
2787   GstClockTime rtx_retry_period;
2788
2789   if (priv->rtx_retry_period == -1) {
2790     /* we retry up to the configured jitterbuffer size but leaving some
2791      * room for the retransmission to arrive in time */
2792     if (rtx_retry_timeout > priv->latency_ns) {
2793       rtx_retry_period = 0;
2794     } else {
2795       rtx_retry_period = priv->latency_ns - rtx_retry_timeout;
2796     }
2797   } else {
2798     rtx_retry_period = priv->rtx_retry_period * GST_MSECOND;
2799   }
2800   return rtx_retry_period;
2801 }
2802
2803 /* the timeout for when we expected a packet expired */
2804 static gboolean
2805 do_expected_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
2806     GstClockTime now)
2807 {
2808   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2809   GstEvent *event;
2810   guint delay, delay_ms, avg_rtx_rtt_ms;
2811   guint rtx_retry_timeout_ms, rtx_retry_period_ms;
2812   GstClockTime rtx_retry_period;
2813   GstClockTime rtx_retry_timeout;
2814   GstClock *clock;
2815
2816   GST_DEBUG_OBJECT (jitterbuffer, "expected %d didn't arrive, now %"
2817       GST_TIME_FORMAT, timer->seqnum, GST_TIME_ARGS (now));
2818
2819   rtx_retry_timeout = get_rtx_retry_timeout (priv);
2820   rtx_retry_period = get_rtx_retry_period (priv, rtx_retry_timeout);
2821
2822   GST_DEBUG_OBJECT (jitterbuffer, "timeout %" GST_TIME_FORMAT ", period %"
2823       GST_TIME_FORMAT, GST_TIME_ARGS (rtx_retry_timeout),
2824       GST_TIME_ARGS (rtx_retry_period));
2825
2826   delay = timer->rtx_delay + timer->rtx_retry;
2827
2828   delay_ms = GST_TIME_AS_MSECONDS (delay);
2829   rtx_retry_timeout_ms = GST_TIME_AS_MSECONDS (rtx_retry_timeout);
2830   rtx_retry_period_ms = GST_TIME_AS_MSECONDS (rtx_retry_period);
2831   avg_rtx_rtt_ms = GST_TIME_AS_MSECONDS (priv->avg_rtx_rtt);
2832
2833   event = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM,
2834       gst_structure_new ("GstRTPRetransmissionRequest",
2835           "seqnum", G_TYPE_UINT, (guint) timer->seqnum,
2836           "running-time", G_TYPE_UINT64, timer->rtx_base,
2837           "delay", G_TYPE_UINT, delay_ms,
2838           "retry", G_TYPE_UINT, timer->num_rtx_retry,
2839           "frequency", G_TYPE_UINT, rtx_retry_timeout_ms,
2840           "period", G_TYPE_UINT, rtx_retry_period_ms,
2841           "deadline", G_TYPE_UINT, priv->latency_ms,
2842           "packet-spacing", G_TYPE_UINT64, priv->packet_spacing,
2843           "avg-rtt", G_TYPE_UINT, avg_rtx_rtt_ms, NULL));
2844
2845   priv->num_rtx_requests++;
2846   timer->num_rtx_retry++;
2847
2848   GST_OBJECT_LOCK (jitterbuffer);
2849   if ((clock = GST_ELEMENT_CLOCK (jitterbuffer))) {
2850     timer->rtx_last = gst_clock_get_time (clock);
2851     timer->rtx_last -= GST_ELEMENT_CAST (jitterbuffer)->base_time;
2852   } else {
2853     timer->rtx_last = now;
2854   }
2855   GST_OBJECT_UNLOCK (jitterbuffer);
2856
2857   /* calculate the timeout for the next retransmission attempt */
2858   timer->rtx_retry += rtx_retry_timeout;
2859   GST_DEBUG_OBJECT (jitterbuffer, "base %" GST_TIME_FORMAT ", delay %"
2860       GST_TIME_FORMAT ", retry %" GST_TIME_FORMAT ", num_retry %u",
2861       GST_TIME_ARGS (timer->rtx_base), GST_TIME_ARGS (timer->rtx_delay),
2862       GST_TIME_ARGS (timer->rtx_retry), timer->num_rtx_retry);
2863
2864   if (timer->rtx_retry + timer->rtx_delay > rtx_retry_period) {
2865     GST_DEBUG_OBJECT (jitterbuffer, "reschedule as LOST timer");
2866     /* too many retransmission request, we now convert the timer
2867      * to a lost timer, leave the num_rtx_retry as it is for stats */
2868     timer->type = TIMER_TYPE_LOST;
2869     timer->rtx_delay = 0;
2870     timer->rtx_retry = 0;
2871   }
2872   reschedule_timer (jitterbuffer, timer, timer->seqnum,
2873       timer->rtx_base + timer->rtx_retry, timer->rtx_delay, FALSE);
2874
2875   JBUF_UNLOCK (priv);
2876   gst_pad_push_event (priv->sinkpad, event);
2877   JBUF_LOCK (priv);
2878
2879   return FALSE;
2880 }
2881
2882 /* a packet is lost */
2883 static gboolean
2884 do_lost_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
2885     GstClockTime now)
2886 {
2887   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2888   GstClockTime duration, timestamp;
2889   guint seqnum, lost_packets, num_rtx_retry, next_in_seqnum;
2890   gboolean late, head;
2891   GstEvent *event;
2892   RTPJitterBufferItem *item;
2893
2894   seqnum = timer->seqnum;
2895   timestamp = apply_offset (jitterbuffer, timer->timeout);
2896   duration = timer->duration;
2897   if (duration == GST_CLOCK_TIME_NONE && priv->packet_spacing > 0)
2898     duration = priv->packet_spacing;
2899   lost_packets = MAX (timer->num, 1);
2900   late = timer->num > 0;
2901   num_rtx_retry = timer->num_rtx_retry;
2902
2903   /* we had a gap and thus we lost some packets. Create an event for this.  */
2904   if (lost_packets > 1)
2905     GST_DEBUG_OBJECT (jitterbuffer, "Packets #%d -> #%d lost", seqnum,
2906         seqnum + lost_packets - 1);
2907   else
2908     GST_DEBUG_OBJECT (jitterbuffer, "Packet #%d lost", seqnum);
2909
2910   priv->num_late += lost_packets;
2911   priv->num_rtx_failed += num_rtx_retry;
2912
2913   next_in_seqnum = (seqnum + lost_packets) & 0xffff;
2914
2915   /* we now only accept seqnum bigger than this */
2916   if (gst_rtp_buffer_compare_seqnum (priv->next_in_seqnum, next_in_seqnum) > 0)
2917     priv->next_in_seqnum = next_in_seqnum;
2918
2919   /* create paket lost event */
2920   event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM,
2921       gst_structure_new ("GstRTPPacketLost",
2922           "seqnum", G_TYPE_UINT, (guint) seqnum,
2923           "timestamp", G_TYPE_UINT64, timestamp,
2924           "duration", G_TYPE_UINT64, duration,
2925           "late", G_TYPE_BOOLEAN, late,
2926           "retry", G_TYPE_UINT, num_rtx_retry, NULL));
2927
2928   item = alloc_item (event, ITEM_TYPE_LOST, -1, -1, seqnum, lost_packets, -1);
2929   rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL);
2930
2931   /* remove timer now */
2932   remove_timer (jitterbuffer, timer);
2933   if (head)
2934     JBUF_SIGNAL_EVENT (priv);
2935
2936   return TRUE;
2937 }
2938
2939 static gboolean
2940 do_eos_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
2941     GstClockTime now)
2942 {
2943   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2944
2945   GST_INFO_OBJECT (jitterbuffer, "got the NPT timeout");
2946   remove_timer (jitterbuffer, timer);
2947   if (!priv->eos) {
2948     /* there was no EOS in the buffer, put one in there now */
2949     queue_event (jitterbuffer, gst_event_new_eos ());
2950   }
2951   JBUF_SIGNAL_EVENT (priv);
2952
2953   return TRUE;
2954 }
2955
2956 static gboolean
2957 do_deadline_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
2958     GstClockTime now)
2959 {
2960   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2961
2962   GST_INFO_OBJECT (jitterbuffer, "got deadline timeout");
2963
2964   /* timer seqnum might have been obsoleted by caps seqnum-base,
2965    * only mess with current ongoing seqnum if still unknown */
2966   if (priv->next_seqnum == -1)
2967     priv->next_seqnum = timer->seqnum;
2968   remove_timer (jitterbuffer, timer);
2969   JBUF_SIGNAL_EVENT (priv);
2970
2971   return TRUE;
2972 }
2973
2974 static gboolean
2975 do_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
2976     GstClockTime now)
2977 {
2978   gboolean removed = FALSE;
2979
2980   switch (timer->type) {
2981     case TIMER_TYPE_EXPECTED:
2982       removed = do_expected_timeout (jitterbuffer, timer, now);
2983       break;
2984     case TIMER_TYPE_LOST:
2985       removed = do_lost_timeout (jitterbuffer, timer, now);
2986       break;
2987     case TIMER_TYPE_DEADLINE:
2988       removed = do_deadline_timeout (jitterbuffer, timer, now);
2989       break;
2990     case TIMER_TYPE_EOS:
2991       removed = do_eos_timeout (jitterbuffer, timer, now);
2992       break;
2993   }
2994   return removed;
2995 }
2996
2997 /* called when we need to wait for the next timeout.
2998  *
2999  * We loop over the array of recorded timeouts and wait for the earliest one.
3000  * When it timed out, do the logic associated with the timer.
3001  *
3002  * If there are no timers, we wait on a gcond until something new happens.
3003  */
3004 static void
3005 wait_next_timeout (GstRtpJitterBuffer * jitterbuffer)
3006 {
3007   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
3008   GstClockTime now = 0;
3009
3010   JBUF_LOCK (priv);
3011   while (priv->timer_running) {
3012     TimerData *timer = NULL;
3013     GstClockTime timer_timeout = -1;
3014     gint i, len;
3015
3016     GST_DEBUG_OBJECT (jitterbuffer, "now %" GST_TIME_FORMAT,
3017         GST_TIME_ARGS (now));
3018
3019     len = priv->timers->len;
3020     for (i = 0; i < len; i++) {
3021       TimerData *test = &g_array_index (priv->timers, TimerData, i);
3022       GstClockTime test_timeout = get_timeout (jitterbuffer, test);
3023       gboolean save_best = FALSE;
3024
3025       GST_DEBUG_OBJECT (jitterbuffer, "%d, %d, %d, %" GST_TIME_FORMAT,
3026           i, test->type, test->seqnum, GST_TIME_ARGS (test_timeout));
3027
3028       /* find the smallest timeout */
3029       if (timer == NULL) {
3030         save_best = TRUE;
3031       } else if (timer_timeout == -1) {
3032         /* we already have an immediate timeout, the new timer must be an
3033          * immediate timer with smaller seqnum to become the best */
3034         if (test_timeout == -1
3035             && (gst_rtp_buffer_compare_seqnum (test->seqnum,
3036                     timer->seqnum) > 0))
3037           save_best = TRUE;
3038       } else if (test_timeout == -1) {
3039         /* first immediate timer */
3040         save_best = TRUE;
3041       } else if (test_timeout < timer_timeout) {
3042         /* earlier timer */
3043         save_best = TRUE;
3044       } else if (test_timeout == timer_timeout
3045           && (gst_rtp_buffer_compare_seqnum (test->seqnum,
3046                   timer->seqnum) > 0)) {
3047         /* same timer, smaller seqnum */
3048         save_best = TRUE;
3049       }
3050       if (save_best) {
3051         GST_DEBUG_OBJECT (jitterbuffer, "new best %d", i);
3052         timer = test;
3053         timer_timeout = test_timeout;
3054       }
3055     }
3056     if (timer && !priv->blocked) {
3057       GstClock *clock;
3058       GstClockTime sync_time;
3059       GstClockID id;
3060       GstClockReturn ret;
3061       GstClockTimeDiff clock_jitter;
3062
3063       if (timer_timeout == -1 || timer_timeout <= now) {
3064         do_timeout (jitterbuffer, timer, now);
3065         /* check here, do_timeout could have released the lock */
3066         if (!priv->timer_running)
3067           break;
3068         continue;
3069       }
3070
3071       GST_OBJECT_LOCK (jitterbuffer);
3072       clock = GST_ELEMENT_CLOCK (jitterbuffer);
3073       if (!clock) {
3074         GST_OBJECT_UNLOCK (jitterbuffer);
3075         /* let's just push if there is no clock */
3076         GST_DEBUG_OBJECT (jitterbuffer, "No clock, timeout right away");
3077         now = timer_timeout;
3078         continue;
3079       }
3080
3081       /* prepare for sync against clock */
3082       sync_time = timer_timeout + GST_ELEMENT_CAST (jitterbuffer)->base_time;
3083       /* add latency of peer to get input time */
3084       sync_time += priv->peer_latency;
3085
3086       GST_DEBUG_OBJECT (jitterbuffer, "sync to timestamp %" GST_TIME_FORMAT
3087           " with sync time %" GST_TIME_FORMAT,
3088           GST_TIME_ARGS (timer_timeout), GST_TIME_ARGS (sync_time));
3089
3090       /* create an entry for the clock */
3091       id = priv->clock_id = gst_clock_new_single_shot_id (clock, sync_time);
3092       priv->timer_timeout = timer_timeout;
3093       priv->timer_seqnum = timer->seqnum;
3094       GST_OBJECT_UNLOCK (jitterbuffer);
3095
3096       /* release the lock so that the other end can push stuff or unlock */
3097       JBUF_UNLOCK (priv);
3098
3099       ret = gst_clock_id_wait (id, &clock_jitter);
3100
3101       JBUF_LOCK (priv);
3102       if (!priv->timer_running) {
3103         gst_clock_id_unref (id);
3104         priv->clock_id = NULL;
3105         break;
3106       }
3107
3108       if (ret != GST_CLOCK_UNSCHEDULED) {
3109         now = timer_timeout + MAX (clock_jitter, 0);
3110         GST_DEBUG_OBJECT (jitterbuffer, "sync done, %d, #%d, %" G_GINT64_FORMAT,
3111             ret, priv->timer_seqnum, clock_jitter);
3112       } else {
3113         GST_DEBUG_OBJECT (jitterbuffer, "sync unscheduled");
3114       }
3115       /* and free the entry */
3116       gst_clock_id_unref (id);
3117       priv->clock_id = NULL;
3118     } else {
3119       /* no timers, wait for activity */
3120       JBUF_WAIT_TIMER (priv);
3121     }
3122   }
3123   JBUF_UNLOCK (priv);
3124
3125   GST_DEBUG_OBJECT (jitterbuffer, "we are stopping");
3126   return;
3127 }
3128
3129 /*
3130  * This funcion implements the main pushing loop on the source pad.
3131  *
3132  * It first tries to push as many buffers as possible. If there is a seqnum
3133  * mismatch, we wait for the next timeouts.
3134  */
3135 static void
3136 gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer)
3137 {
3138   GstRtpJitterBufferPrivate *priv;
3139   GstFlowReturn result = GST_FLOW_OK;
3140
3141   priv = jitterbuffer->priv;
3142
3143   JBUF_LOCK_CHECK (priv, flushing);
3144   do {
3145     result = handle_next_buffer (jitterbuffer);
3146     if (G_LIKELY (result == GST_FLOW_WAIT)) {
3147       /* now wait for the next event */
3148       JBUF_WAIT_EVENT (priv, flushing);
3149       result = GST_FLOW_OK;
3150     }
3151   }
3152   while (result == GST_FLOW_OK);
3153   /* store result for upstream */
3154   priv->srcresult = result;
3155   /* if we get here we need to pause */
3156   goto pause;
3157
3158   /* ERRORS */
3159 flushing:
3160   {
3161     result = priv->srcresult;
3162     goto pause;
3163   }
3164 pause:
3165   {
3166     GstEvent *event;
3167
3168     JBUF_SIGNAL_QUERY (priv, FALSE);
3169     JBUF_UNLOCK (priv);
3170
3171     GST_DEBUG_OBJECT (jitterbuffer, "pausing task, reason %s",
3172         gst_flow_get_name (result));
3173     gst_pad_pause_task (priv->srcpad);
3174     if (result == GST_FLOW_EOS) {
3175       event = gst_event_new_eos ();
3176       gst_pad_push_event (priv->srcpad, event);
3177     }
3178     return;
3179   }
3180 }
3181
3182 /* collect the info from the lastest RTCP packet and the jitterbuffer sync, do
3183  * some sanity checks and then emit the handle-sync signal with the parameters.
3184  * This function must be called with the LOCK */
3185 static void
3186 do_handle_sync (GstRtpJitterBuffer * jitterbuffer)
3187 {
3188   GstRtpJitterBufferPrivate *priv;
3189   guint64 base_rtptime, base_time;
3190   guint32 clock_rate;
3191   guint64 last_rtptime;
3192   guint64 clock_base;
3193   guint64 ext_rtptime, diff;
3194   gboolean valid = TRUE, keep = FALSE;
3195
3196   priv = jitterbuffer->priv;
3197
3198   /* get the last values from the jitterbuffer */
3199   rtp_jitter_buffer_get_sync (priv->jbuf, &base_rtptime, &base_time,
3200       &clock_rate, &last_rtptime);
3201
3202   clock_base = priv->clock_base;
3203   ext_rtptime = priv->ext_rtptime;
3204
3205   GST_DEBUG_OBJECT (jitterbuffer, "ext SR %" G_GUINT64_FORMAT ", base %"
3206       G_GUINT64_FORMAT ", clock-rate %" G_GUINT32_FORMAT
3207       ", clock-base %" G_GUINT64_FORMAT ", last-rtptime %" G_GUINT64_FORMAT,
3208       ext_rtptime, base_rtptime, clock_rate, clock_base, last_rtptime);
3209
3210   if (base_rtptime == -1 || clock_rate == -1 || base_time == -1) {
3211     /* we keep this SR packet for later. When we get a valid RTP packet the
3212      * above values will be set and we can try to use the SR packet */
3213     GST_DEBUG_OBJECT (jitterbuffer, "keeping for later, no RTP values");
3214     keep = TRUE;
3215   } else {
3216     /* we can't accept anything that happened before we did the last resync */
3217     if (base_rtptime > ext_rtptime) {
3218       GST_DEBUG_OBJECT (jitterbuffer, "dropping, older than base time");
3219       valid = FALSE;
3220     } else {
3221       /* the SR RTP timestamp must be something close to what we last observed
3222        * in the jitterbuffer */
3223       if (ext_rtptime > last_rtptime) {
3224         /* check how far ahead it is to our RTP timestamps */
3225         diff = ext_rtptime - last_rtptime;
3226         /* if bigger than 1 second, we drop it */
3227         if (diff > clock_rate) {
3228           GST_DEBUG_OBJECT (jitterbuffer, "too far ahead");
3229           /* should drop this, but some RTSP servers end up with bogus
3230            * way too ahead RTCP packet when repeated PAUSE/PLAY,
3231            * so still trigger rptbin sync but invalidate RTCP data
3232            * (sync might use other methods) */
3233           ext_rtptime = -1;
3234         }
3235         GST_DEBUG_OBJECT (jitterbuffer, "ext last %" G_GUINT64_FORMAT ", diff %"
3236             G_GUINT64_FORMAT, last_rtptime, diff);
3237       }
3238     }
3239   }
3240
3241   if (keep) {
3242     GST_DEBUG_OBJECT (jitterbuffer, "keeping RTCP packet for later");
3243   } else if (valid) {
3244     GstStructure *s;
3245
3246     s = gst_structure_new ("application/x-rtp-sync",
3247         "base-rtptime", G_TYPE_UINT64, base_rtptime,
3248         "base-time", G_TYPE_UINT64, base_time,
3249         "clock-rate", G_TYPE_UINT, clock_rate,
3250         "clock-base", G_TYPE_UINT64, clock_base,
3251         "sr-ext-rtptime", G_TYPE_UINT64, ext_rtptime,
3252         "sr-buffer", GST_TYPE_BUFFER, priv->last_sr, NULL);
3253
3254     GST_DEBUG_OBJECT (jitterbuffer, "signaling sync");
3255     gst_buffer_replace (&priv->last_sr, NULL);
3256     JBUF_UNLOCK (priv);
3257     g_signal_emit (jitterbuffer,
3258         gst_rtp_jitter_buffer_signals[SIGNAL_HANDLE_SYNC], 0, s);
3259     JBUF_LOCK (priv);
3260     gst_structure_free (s);
3261   } else {
3262     GST_DEBUG_OBJECT (jitterbuffer, "dropping RTCP packet");
3263     gst_buffer_replace (&priv->last_sr, NULL);
3264   }
3265 }
3266
3267 static GstFlowReturn
3268 gst_rtp_jitter_buffer_chain_rtcp (GstPad * pad, GstObject * parent,
3269     GstBuffer * buffer)
3270 {
3271   GstRtpJitterBuffer *jitterbuffer;
3272   GstRtpJitterBufferPrivate *priv;
3273   GstFlowReturn ret = GST_FLOW_OK;
3274   guint32 ssrc;
3275   GstRTCPPacket packet;
3276   guint64 ext_rtptime;
3277   guint32 rtptime;
3278   GstRTCPBuffer rtcp = { NULL, };
3279
3280   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
3281
3282   if (G_UNLIKELY (!gst_rtcp_buffer_validate (buffer)))
3283     goto invalid_buffer;
3284
3285   priv = jitterbuffer->priv;
3286
3287   gst_rtcp_buffer_map (buffer, GST_MAP_READ, &rtcp);
3288
3289   if (!gst_rtcp_buffer_get_first_packet (&rtcp, &packet))
3290     goto empty_buffer;
3291
3292   /* first packet must be SR or RR or else the validate would have failed */
3293   switch (gst_rtcp_packet_get_type (&packet)) {
3294     case GST_RTCP_TYPE_SR:
3295       gst_rtcp_packet_sr_get_sender_info (&packet, &ssrc, NULL, &rtptime,
3296           NULL, NULL);
3297       break;
3298     default:
3299       goto ignore_buffer;
3300   }
3301   gst_rtcp_buffer_unmap (&rtcp);
3302
3303   GST_DEBUG_OBJECT (jitterbuffer, "received RTCP of SSRC %08x", ssrc);
3304
3305   JBUF_LOCK (priv);
3306   /* convert the RTP timestamp to our extended timestamp, using the same offset
3307    * we used in the jitterbuffer */
3308   ext_rtptime = priv->jbuf->ext_rtptime;
3309   ext_rtptime = gst_rtp_buffer_ext_timestamp (&ext_rtptime, rtptime);
3310
3311   priv->ext_rtptime = ext_rtptime;
3312   gst_buffer_replace (&priv->last_sr, buffer);
3313
3314   do_handle_sync (jitterbuffer);
3315   JBUF_UNLOCK (priv);
3316
3317 done:
3318   gst_buffer_unref (buffer);
3319
3320   return ret;
3321
3322 invalid_buffer:
3323   {
3324     /* this is not fatal but should be filtered earlier */
3325     GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
3326         ("Received invalid RTCP payload, dropping"));
3327     ret = GST_FLOW_OK;
3328     goto done;
3329   }
3330 empty_buffer:
3331   {
3332     /* this is not fatal but should be filtered earlier */
3333     GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
3334         ("Received empty RTCP payload, dropping"));
3335     gst_rtcp_buffer_unmap (&rtcp);
3336     ret = GST_FLOW_OK;
3337     goto done;
3338   }
3339 ignore_buffer:
3340   {
3341     GST_DEBUG_OBJECT (jitterbuffer, "ignoring RTCP packet");
3342     gst_rtcp_buffer_unmap (&rtcp);
3343     ret = GST_FLOW_OK;
3344     goto done;
3345   }
3346 }
3347
3348 static gboolean
3349 gst_rtp_jitter_buffer_sink_query (GstPad * pad, GstObject * parent,
3350     GstQuery * query)
3351 {
3352   gboolean res = FALSE;
3353   GstRtpJitterBuffer *jitterbuffer;
3354   GstRtpJitterBufferPrivate *priv;
3355
3356   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
3357   priv = jitterbuffer->priv;
3358
3359   switch (GST_QUERY_TYPE (query)) {
3360     case GST_QUERY_CAPS:
3361     {
3362       GstCaps *filter, *caps;
3363
3364       gst_query_parse_caps (query, &filter);
3365       caps = gst_rtp_jitter_buffer_getcaps (pad, filter);
3366       gst_query_set_caps_result (query, caps);
3367       gst_caps_unref (caps);
3368       res = TRUE;
3369       break;
3370     }
3371     default:
3372       if (GST_QUERY_IS_SERIALIZED (query)) {
3373         RTPJitterBufferItem *item;
3374         gboolean head;
3375
3376         JBUF_LOCK_CHECK (priv, out_flushing);
3377         if (rtp_jitter_buffer_get_mode (priv->jbuf) !=
3378             RTP_JITTER_BUFFER_MODE_BUFFER) {
3379           GST_DEBUG_OBJECT (jitterbuffer, "adding serialized query");
3380           item = alloc_item (query, ITEM_TYPE_QUERY, -1, -1, -1, 0, -1);
3381           rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL);
3382           if (head)
3383             JBUF_SIGNAL_EVENT (priv);
3384           JBUF_WAIT_QUERY (priv, out_flushing);
3385           res = priv->last_query;
3386         } else {
3387           GST_DEBUG_OBJECT (jitterbuffer, "refusing query, we are buffering");
3388           res = FALSE;
3389         }
3390         JBUF_UNLOCK (priv);
3391       } else {
3392         res = gst_pad_query_default (pad, parent, query);
3393       }
3394       break;
3395   }
3396   return res;
3397   /* ERRORS */
3398 out_flushing:
3399   {
3400     GST_DEBUG_OBJECT (jitterbuffer, "we are flushing");
3401     JBUF_UNLOCK (priv);
3402     return FALSE;
3403   }
3404
3405 }
3406
3407 static gboolean
3408 gst_rtp_jitter_buffer_src_query (GstPad * pad, GstObject * parent,
3409     GstQuery * query)
3410 {
3411   GstRtpJitterBuffer *jitterbuffer;
3412   GstRtpJitterBufferPrivate *priv;
3413   gboolean res = FALSE;
3414
3415   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
3416   priv = jitterbuffer->priv;
3417
3418   switch (GST_QUERY_TYPE (query)) {
3419     case GST_QUERY_LATENCY:
3420     {
3421       /* We need to send the query upstream and add the returned latency to our
3422        * own */
3423       GstClockTime min_latency, max_latency;
3424       gboolean us_live;
3425       GstClockTime our_latency;
3426
3427       if ((res = gst_pad_peer_query (priv->sinkpad, query))) {
3428         gst_query_parse_latency (query, &us_live, &min_latency, &max_latency);
3429
3430         GST_DEBUG_OBJECT (jitterbuffer, "Peer latency: min %"
3431             GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
3432             GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
3433
3434         /* store this so that we can safely sync on the peer buffers. */
3435         JBUF_LOCK (priv);
3436         priv->peer_latency = min_latency;
3437         our_latency = priv->latency_ns;
3438         JBUF_UNLOCK (priv);
3439
3440         GST_DEBUG_OBJECT (jitterbuffer, "Our latency: %" GST_TIME_FORMAT,
3441             GST_TIME_ARGS (our_latency));
3442
3443         /* we add some latency but can buffer an infinite amount of time */
3444         min_latency += our_latency;
3445         max_latency = -1;
3446
3447         GST_DEBUG_OBJECT (jitterbuffer, "Calculated total latency : min %"
3448             GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
3449             GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
3450
3451         gst_query_set_latency (query, TRUE, min_latency, max_latency);
3452       }
3453       break;
3454     }
3455     case GST_QUERY_POSITION:
3456     {
3457       GstClockTime start, last_out;
3458       GstFormat fmt;
3459
3460       gst_query_parse_position (query, &fmt, NULL);
3461       if (fmt != GST_FORMAT_TIME) {
3462         res = gst_pad_query_default (pad, parent, query);
3463         break;
3464       }
3465
3466       JBUF_LOCK (priv);
3467       start = priv->npt_start;
3468       last_out = priv->last_out_time;
3469       JBUF_UNLOCK (priv);
3470
3471       GST_DEBUG_OBJECT (jitterbuffer, "npt start %" GST_TIME_FORMAT
3472           ", last out %" GST_TIME_FORMAT, GST_TIME_ARGS (start),
3473           GST_TIME_ARGS (last_out));
3474
3475       if (GST_CLOCK_TIME_IS_VALID (start) && GST_CLOCK_TIME_IS_VALID (last_out)) {
3476         /* bring 0-based outgoing time to stream time */
3477         gst_query_set_position (query, GST_FORMAT_TIME, start + last_out);
3478         res = TRUE;
3479       } else {
3480         res = gst_pad_query_default (pad, parent, query);
3481       }
3482       break;
3483     }
3484     case GST_QUERY_CAPS:
3485     {
3486       GstCaps *filter, *caps;
3487
3488       gst_query_parse_caps (query, &filter);
3489       caps = gst_rtp_jitter_buffer_getcaps (pad, filter);
3490       gst_query_set_caps_result (query, caps);
3491       gst_caps_unref (caps);
3492       res = TRUE;
3493       break;
3494     }
3495     default:
3496       res = gst_pad_query_default (pad, parent, query);
3497       break;
3498   }
3499
3500   return res;
3501 }
3502
3503 static void
3504 gst_rtp_jitter_buffer_set_property (GObject * object,
3505     guint prop_id, const GValue * value, GParamSpec * pspec)
3506 {
3507   GstRtpJitterBuffer *jitterbuffer;
3508   GstRtpJitterBufferPrivate *priv;
3509
3510   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
3511   priv = jitterbuffer->priv;
3512
3513   switch (prop_id) {
3514     case PROP_LATENCY:
3515     {
3516       guint new_latency, old_latency;
3517
3518       new_latency = g_value_get_uint (value);
3519
3520       JBUF_LOCK (priv);
3521       old_latency = priv->latency_ms;
3522       priv->latency_ms = new_latency;
3523       priv->latency_ns = priv->latency_ms * GST_MSECOND;
3524       rtp_jitter_buffer_set_delay (priv->jbuf, priv->latency_ns);
3525       JBUF_UNLOCK (priv);
3526
3527       /* post message if latency changed, this will inform the parent pipeline
3528        * that a latency reconfiguration is possible/needed. */
3529       if (new_latency != old_latency) {
3530         GST_DEBUG_OBJECT (jitterbuffer, "latency changed to: %" GST_TIME_FORMAT,
3531             GST_TIME_ARGS (new_latency * GST_MSECOND));
3532
3533         gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer),
3534             gst_message_new_latency (GST_OBJECT_CAST (jitterbuffer)));
3535       }
3536       break;
3537     }
3538     case PROP_DROP_ON_LATENCY:
3539       JBUF_LOCK (priv);
3540       priv->drop_on_latency = g_value_get_boolean (value);
3541       JBUF_UNLOCK (priv);
3542       break;
3543     case PROP_TS_OFFSET:
3544       JBUF_LOCK (priv);
3545       priv->ts_offset = g_value_get_int64 (value);
3546       priv->ts_discont = TRUE;
3547       JBUF_UNLOCK (priv);
3548       break;
3549     case PROP_DO_LOST:
3550       JBUF_LOCK (priv);
3551       priv->do_lost = g_value_get_boolean (value);
3552       JBUF_UNLOCK (priv);
3553       break;
3554     case PROP_MODE:
3555       JBUF_LOCK (priv);
3556       rtp_jitter_buffer_set_mode (priv->jbuf, g_value_get_enum (value));
3557       JBUF_UNLOCK (priv);
3558       break;
3559     case PROP_DO_RETRANSMISSION:
3560       JBUF_LOCK (priv);
3561       priv->do_retransmission = g_value_get_boolean (value);
3562       JBUF_UNLOCK (priv);
3563       break;
3564     case PROP_RTX_DELAY:
3565       JBUF_LOCK (priv);
3566       priv->rtx_delay = g_value_get_int (value);
3567       JBUF_UNLOCK (priv);
3568       break;
3569     case PROP_RTX_MIN_DELAY:
3570       JBUF_LOCK (priv);
3571       priv->rtx_min_delay = g_value_get_uint (value);
3572       JBUF_UNLOCK (priv);
3573       break;
3574     case PROP_RTX_DELAY_REORDER:
3575       JBUF_LOCK (priv);
3576       priv->rtx_delay_reorder = g_value_get_int (value);
3577       JBUF_UNLOCK (priv);
3578       break;
3579     case PROP_RTX_RETRY_TIMEOUT:
3580       JBUF_LOCK (priv);
3581       priv->rtx_retry_timeout = g_value_get_int (value);
3582       JBUF_UNLOCK (priv);
3583       break;
3584     case PROP_RTX_MIN_RETRY_TIMEOUT:
3585       JBUF_LOCK (priv);
3586       priv->rtx_min_retry_timeout = g_value_get_int (value);
3587       JBUF_UNLOCK (priv);
3588       break;
3589     case PROP_RTX_RETRY_PERIOD:
3590       JBUF_LOCK (priv);
3591       priv->rtx_retry_period = g_value_get_int (value);
3592       JBUF_UNLOCK (priv);
3593       break;
3594     default:
3595       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
3596       break;
3597   }
3598 }
3599
3600 static void
3601 gst_rtp_jitter_buffer_get_property (GObject * object,
3602     guint prop_id, GValue * value, GParamSpec * pspec)
3603 {
3604   GstRtpJitterBuffer *jitterbuffer;
3605   GstRtpJitterBufferPrivate *priv;
3606
3607   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
3608   priv = jitterbuffer->priv;
3609
3610   switch (prop_id) {
3611     case PROP_LATENCY:
3612       JBUF_LOCK (priv);
3613       g_value_set_uint (value, priv->latency_ms);
3614       JBUF_UNLOCK (priv);
3615       break;
3616     case PROP_DROP_ON_LATENCY:
3617       JBUF_LOCK (priv);
3618       g_value_set_boolean (value, priv->drop_on_latency);
3619       JBUF_UNLOCK (priv);
3620       break;
3621     case PROP_TS_OFFSET:
3622       JBUF_LOCK (priv);
3623       g_value_set_int64 (value, priv->ts_offset);
3624       JBUF_UNLOCK (priv);
3625       break;
3626     case PROP_DO_LOST:
3627       JBUF_LOCK (priv);
3628       g_value_set_boolean (value, priv->do_lost);
3629       JBUF_UNLOCK (priv);
3630       break;
3631     case PROP_MODE:
3632       JBUF_LOCK (priv);
3633       g_value_set_enum (value, rtp_jitter_buffer_get_mode (priv->jbuf));
3634       JBUF_UNLOCK (priv);
3635       break;
3636     case PROP_PERCENT:
3637     {
3638       gint percent;
3639
3640       JBUF_LOCK (priv);
3641       if (priv->srcresult != GST_FLOW_OK)
3642         percent = 100;
3643       else
3644         percent = rtp_jitter_buffer_get_percent (priv->jbuf);
3645
3646       g_value_set_int (value, percent);
3647       JBUF_UNLOCK (priv);
3648       break;
3649     }
3650     case PROP_DO_RETRANSMISSION:
3651       JBUF_LOCK (priv);
3652       g_value_set_boolean (value, priv->do_retransmission);
3653       JBUF_UNLOCK (priv);
3654       break;
3655     case PROP_RTX_DELAY:
3656       JBUF_LOCK (priv);
3657       g_value_set_int (value, priv->rtx_delay);
3658       JBUF_UNLOCK (priv);
3659       break;
3660     case PROP_RTX_MIN_DELAY:
3661       JBUF_LOCK (priv);
3662       g_value_set_uint (value, priv->rtx_min_delay);
3663       JBUF_UNLOCK (priv);
3664       break;
3665     case PROP_RTX_DELAY_REORDER:
3666       JBUF_LOCK (priv);
3667       g_value_set_int (value, priv->rtx_delay_reorder);
3668       JBUF_UNLOCK (priv);
3669       break;
3670     case PROP_RTX_RETRY_TIMEOUT:
3671       JBUF_LOCK (priv);
3672       g_value_set_int (value, priv->rtx_retry_timeout);
3673       JBUF_UNLOCK (priv);
3674       break;
3675     case PROP_RTX_MIN_RETRY_TIMEOUT:
3676       JBUF_LOCK (priv);
3677       g_value_set_int (value, priv->rtx_min_retry_timeout);
3678       JBUF_UNLOCK (priv);
3679       break;
3680     case PROP_RTX_RETRY_PERIOD:
3681       JBUF_LOCK (priv);
3682       g_value_set_int (value, priv->rtx_retry_period);
3683       JBUF_UNLOCK (priv);
3684       break;
3685     case PROP_STATS:
3686       g_value_take_boxed (value,
3687           gst_rtp_jitter_buffer_create_stats (jitterbuffer));
3688       break;
3689     default:
3690       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
3691       break;
3692   }
3693 }
3694
3695 static GstStructure *
3696 gst_rtp_jitter_buffer_create_stats (GstRtpJitterBuffer * jbuf)
3697 {
3698   GstStructure *s;
3699
3700   JBUF_LOCK (jbuf->priv);
3701   s = gst_structure_new ("application/x-rtp-jitterbuffer-stats",
3702       "rtx-count", G_TYPE_UINT64, jbuf->priv->num_rtx_requests,
3703       "rtx-success-count", G_TYPE_UINT64, jbuf->priv->num_rtx_success,
3704       "rtx-per-packet", G_TYPE_DOUBLE, jbuf->priv->avg_rtx_num,
3705       "rtx-rtt", G_TYPE_UINT64, jbuf->priv->avg_rtx_rtt, NULL);
3706   JBUF_UNLOCK (jbuf->priv);
3707
3708   return s;
3709 }