rtpjitterbuffer: improve clear-pt-map handling
[platform/upstream/gst-plugins-good.git] / gst / rtpmanager / gstrtpjitterbuffer.c
1 /*
2  * Farsight Voice+Video library
3  *
4  *  Copyright 2007 Collabora Ltd,
5  *  Copyright 2007 Nokia Corporation
6  *   @author: Philippe Kalaf <philippe.kalaf@collabora.co.uk>.
7  *  Copyright 2007 Wim Taymans <wim.taymans@gmail.com>
8  *
9  * This library is free software; you can redistribute it and/or
10  * modify it under the terms of the GNU Library General Public
11  * License as published by the Free Software Foundation; either
12  * version 2 of the License, or (at your option) any later version.
13  *
14  * This library is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17  * Library General Public License for more details.
18  *
19  * You should have received a copy of the GNU Library General Public
20  * License along with this library; if not, write to the
21  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
22  * Boston, MA 02110-1301, USA.
23  *
24  */
25
26 /**
27  * SECTION:element-rtpjitterbuffer
28  *
29  * This element reorders and removes duplicate RTP packets as they are received
30  * from a network source.
31  *
32  * The element needs the clock-rate of the RTP payload in order to estimate the
33  * delay. This information is obtained either from the caps on the sink pad or,
34  * when no caps are present, from the #GstRtpJitterBuffer::request-pt-map signal.
35  * To clear the previous pt-map use the #GstRtpJitterBuffer::clear-pt-map signal.
36  *
37  * The rtpjitterbuffer will wait for missing packets up to a configurable time
38  * limit using the #GstRtpJitterBuffer:latency property. Packets arriving too
39  * late are considered to be lost packets. If the #GstRtpJitterBuffer:do-lost
40  * property is set, lost packets will result in a custom serialized downstream
41  * event of name GstRTPPacketLost. The lost packet events are usually used by a
42  * depayloader or other element to create concealment data or some other logic
43  * to gracefully handle the missing packets.
44  *
45  * The jitterbuffer will use the DTS (or PTS if no DTS is set) of the incomming
46  * buffer and the rtptime inside the RTP packet to create a PTS on the outgoing
47  * buffer.
48  *
49  * The jitterbuffer can also be configured to send early retransmission events
50  * upstream by setting the #GstRtpJitterBuffer:do-retransmission property. In
51  * this mode, the jitterbuffer tries to estimate when a packet should arrive and
52  * sends a custom upstream event named GstRTPRetransmissionRequest when the
53  * packet is considered late. The initial expected packet arrival time is
54  * calculated as follows:
55  *
56  * - If seqnum N arrived at time T, seqnum N+1 is expected to arrive at
57  *     T + packet-spacing + #GstRtpJitterBuffer:rtx-delay. The packet spacing is
58  *     calculated from the DTS (or PTS is no DTS) of two consecutive RTP
59  *     packets with different rtptime.
60  *
61  * - If seqnum N0 arrived at time T0 and seqnum Nm arrived at time Tm,
62  *     seqnum Ni is expected at time Ti = T0 + i*(Tm - T0)/(Nm - N0). Any
63  *     previously scheduled timeout is overwritten.
64  *
65  * - If seqnum N arrived, all seqnum older than
66  *     N - #GstRtpJitterBuffer:rtx-delay-reorder are considered late
67  *     immediately. This is to request fast feedback for abonormally reorder
68  *     packets before any of the previous timeouts is triggered.
69  *
70  * A late packet triggers the GstRTPRetransmissionRequest custom upstream
71  * event. After the initial timeout expires and the retransmission event is
72  * sent, the timeout is scheduled for
73  * T + #GstRtpJitterBuffer:rtx-retry-timeout. If the missing packet did not
74  * arrive after #GstRtpJitterBuffer:rtx-retry-timeout, a new
75  * GstRTPRetransmissionRequest is sent upstream and the timeout is rescheduled
76  * again for T + #GstRtpJitterBuffer:rtx-retry-timeout. This repeats until
77  * #GstRtpJitterBuffer:rtx-retry-period elapsed, at which point no further
78  * retransmission requests are sent and the regular logic is performed to
79  * schedule a lost packet as discussed above.
80  *
81  * This element acts as a live element and so adds #GstRtpJitterBuffer:latency
82  * to the pipeline.
83  *
84  * This element will automatically be used inside rtpbin.
85  *
86  * <refsect2>
87  * <title>Example pipelines</title>
88  * |[
89  * gst-launch-1.0 rtspsrc location=rtsp://192.168.1.133:8554/mpeg1or2AudioVideoTest ! rtpjitterbuffer ! rtpmpvdepay ! mpeg2dec ! xvimagesink
90  * ]| Connect to a streaming server and decode the MPEG video. The jitterbuffer is
91  * inserted into the pipeline to smooth out network jitter and to reorder the
92  * out-of-order RTP packets.
93  * </refsect2>
94  *
95  * Last reviewed on 2007-05-28 (0.10.5)
96  */
97
98 #ifdef HAVE_CONFIG_H
99 #include "config.h"
100 #endif
101
102 #include <stdlib.h>
103 #include <string.h>
104 #include <gst/rtp/gstrtpbuffer.h>
105
106 #include "gstrtpjitterbuffer.h"
107 #include "rtpjitterbuffer.h"
108 #include "rtpstats.h"
109
110 #include <gst/glib-compat-private.h>
111
112 GST_DEBUG_CATEGORY (rtpjitterbuffer_debug);
113 #define GST_CAT_DEFAULT (rtpjitterbuffer_debug)
114
115 /* RTPJitterBuffer signals and args */
116 enum
117 {
118   SIGNAL_REQUEST_PT_MAP,
119   SIGNAL_CLEAR_PT_MAP,
120   SIGNAL_HANDLE_SYNC,
121   SIGNAL_ON_NPT_STOP,
122   SIGNAL_SET_ACTIVE,
123   LAST_SIGNAL
124 };
125
126 #define DEFAULT_LATENCY_MS          200
127 #define DEFAULT_DROP_ON_LATENCY     FALSE
128 #define DEFAULT_TS_OFFSET           0
129 #define DEFAULT_DO_LOST             FALSE
130 #define DEFAULT_MODE                RTP_JITTER_BUFFER_MODE_SLAVE
131 #define DEFAULT_PERCENT             0
132 #define DEFAULT_DO_RETRANSMISSION   FALSE
133 #define DEFAULT_RTX_DELAY           20
134 #define DEFAULT_RTX_DELAY_REORDER   3
135 #define DEFAULT_RTX_RETRY_TIMEOUT   40
136 #define DEFAULT_RTX_RETRY_PERIOD    160
137
138 enum
139 {
140   PROP_0,
141   PROP_LATENCY,
142   PROP_DROP_ON_LATENCY,
143   PROP_TS_OFFSET,
144   PROP_DO_LOST,
145   PROP_MODE,
146   PROP_PERCENT,
147   PROP_DO_RETRANSMISSION,
148   PROP_RTX_DELAY,
149   PROP_RTX_DELAY_REORDER,
150   PROP_RTX_RETRY_TIMEOUT,
151   PROP_RTX_RETRY_PERIOD,
152   PROP_STATS,
153   PROP_LAST
154 };
155
156 #define JBUF_LOCK(priv)   (g_mutex_lock (&(priv)->jbuf_lock))
157
158 #define JBUF_LOCK_CHECK(priv,label) G_STMT_START {    \
159   JBUF_LOCK (priv);                                   \
160   if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK))    \
161     goto label;                                       \
162 } G_STMT_END
163 #define JBUF_UNLOCK(priv) (g_mutex_unlock (&(priv)->jbuf_lock))
164
165 #define JBUF_WAIT_TIMER(priv)   G_STMT_START {            \
166   GST_DEBUG ("waiting timer");                            \
167   (priv)->waiting_timer = TRUE;                           \
168   g_cond_wait (&(priv)->jbuf_timer, &(priv)->jbuf_lock);  \
169   (priv)->waiting_timer = FALSE;                          \
170   GST_DEBUG ("waiting timer done");                       \
171 } G_STMT_END
172 #define JBUF_SIGNAL_TIMER(priv) G_STMT_START {            \
173   if (G_UNLIKELY ((priv)->waiting_timer)) {               \
174     GST_DEBUG ("signal timer");                           \
175     g_cond_signal (&(priv)->jbuf_timer);                  \
176   }                                                       \
177 } G_STMT_END
178
179 #define JBUF_WAIT_EVENT(priv,label) G_STMT_START {       \
180   GST_DEBUG ("waiting event");                           \
181   (priv)->waiting_event = TRUE;                          \
182   g_cond_wait (&(priv)->jbuf_event, &(priv)->jbuf_lock); \
183   (priv)->waiting_event = FALSE;                         \
184   GST_DEBUG ("waiting event done");                      \
185   if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK))       \
186     goto label;                                          \
187 } G_STMT_END
188 #define JBUF_SIGNAL_EVENT(priv) G_STMT_START {           \
189   if (G_UNLIKELY ((priv)->waiting_event)) {              \
190     GST_DEBUG ("signal event");                          \
191     g_cond_signal (&(priv)->jbuf_event);                 \
192   }                                                      \
193 } G_STMT_END
194
195 struct _GstRtpJitterBufferPrivate
196 {
197   GstPad *sinkpad, *srcpad;
198   GstPad *rtcpsinkpad;
199
200   RTPJitterBuffer *jbuf;
201   GMutex jbuf_lock;
202   gboolean waiting_timer;
203   GCond jbuf_timer;
204   gboolean waiting_event;
205   GCond jbuf_event;
206   gboolean discont;
207   gboolean ts_discont;
208   gboolean active;
209   guint64 out_offset;
210
211   gboolean timer_running;
212   GThread *timer_thread;
213
214   /* properties */
215   guint latency_ms;
216   guint64 latency_ns;
217   gboolean drop_on_latency;
218   gint64 ts_offset;
219   gboolean do_lost;
220   gboolean do_retransmission;
221   gint rtx_delay;
222   gint rtx_delay_reorder;
223   gint rtx_retry_timeout;
224   gint rtx_retry_period;
225
226   /* the last seqnum we pushed out */
227   guint32 last_popped_seqnum;
228   /* the next expected seqnum we push */
229   guint32 next_seqnum;
230   /* last output time */
231   GstClockTime last_out_time;
232   /* last valid input timestamp and rtptime pair */
233   GstClockTime ips_dts;
234   guint64 ips_rtptime;
235   GstClockTime packet_spacing;
236
237   /* the next expected seqnum we receive */
238   GstClockTime last_in_dts;
239   guint32 last_in_seqnum;
240   guint32 next_in_seqnum;
241
242   GArray *timers;
243
244   /* start and stop ranges */
245   GstClockTime npt_start;
246   GstClockTime npt_stop;
247   guint64 ext_timestamp;
248   guint64 last_elapsed;
249   guint64 estimated_eos;
250   GstClockID eos_id;
251
252   /* state */
253   gboolean eos;
254
255   /* clock rate and rtp timestamp offset */
256   gint last_pt;
257   gint32 clock_rate;
258   gint64 clock_base;
259   gint64 prev_ts_offset;
260
261   /* when we are shutting down */
262   GstFlowReturn srcresult;
263   gboolean blocked;
264
265   /* for sync */
266   GstSegment segment;
267   GstClockID clock_id;
268   GstClockTime timer_timeout;
269   guint16 timer_seqnum;
270   /* the latency of the upstream peer, we have to take this into account when
271    * synchronizing the buffers. */
272   GstClockTime peer_latency;
273   guint64 ext_rtptime;
274   GstBuffer *last_sr;
275
276   /* some accounting */
277   guint64 num_late;
278   guint64 num_duplicates;
279   guint64 num_rtx_requests;
280   guint64 num_rtx_success;
281   guint64 num_rtx_failed;
282   gdouble avg_rtx_num;
283   guint64 avg_rtx_rtt;
284 };
285
286 typedef enum
287 {
288   TIMER_TYPE_EXPECTED,
289   TIMER_TYPE_LOST,
290   TIMER_TYPE_DEADLINE,
291   TIMER_TYPE_EOS
292 } TimerType;
293
294 typedef struct
295 {
296   guint idx;
297   guint16 seqnum;
298   guint num;
299   TimerType type;
300   GstClockTime timeout;
301   GstClockTime duration;
302   GstClockTime rtx_base;
303   GstClockTime rtx_delay;
304   GstClockTime rtx_retry;
305   GstClockTime rtx_last;
306   guint num_rtx_retry;
307 } TimerData;
308
309 #define GST_RTP_JITTER_BUFFER_GET_PRIVATE(o) \
310   (G_TYPE_INSTANCE_GET_PRIVATE ((o), GST_TYPE_RTP_JITTER_BUFFER, \
311                                 GstRtpJitterBufferPrivate))
312
313 static GstStaticPadTemplate gst_rtp_jitter_buffer_sink_template =
314 GST_STATIC_PAD_TEMPLATE ("sink",
315     GST_PAD_SINK,
316     GST_PAD_ALWAYS,
317     GST_STATIC_CAPS ("application/x-rtp, "
318         "clock-rate = (int) [ 1, 2147483647 ]"
319         /* "payload = (int) , "
320          * "encoding-name = (string) "
321          */ )
322     );
323
324 static GstStaticPadTemplate gst_rtp_jitter_buffer_sink_rtcp_template =
325 GST_STATIC_PAD_TEMPLATE ("sink_rtcp",
326     GST_PAD_SINK,
327     GST_PAD_REQUEST,
328     GST_STATIC_CAPS ("application/x-rtcp")
329     );
330
331 static GstStaticPadTemplate gst_rtp_jitter_buffer_src_template =
332 GST_STATIC_PAD_TEMPLATE ("src",
333     GST_PAD_SRC,
334     GST_PAD_ALWAYS,
335     GST_STATIC_CAPS ("application/x-rtp"
336         /* "payload = (int) , "
337          * "clock-rate = (int) , "
338          * "encoding-name = (string) "
339          */ )
340     );
341
342 static guint gst_rtp_jitter_buffer_signals[LAST_SIGNAL] = { 0 };
343
344 #define gst_rtp_jitter_buffer_parent_class parent_class
345 G_DEFINE_TYPE (GstRtpJitterBuffer, gst_rtp_jitter_buffer, GST_TYPE_ELEMENT);
346
347 /* object overrides */
348 static void gst_rtp_jitter_buffer_set_property (GObject * object,
349     guint prop_id, const GValue * value, GParamSpec * pspec);
350 static void gst_rtp_jitter_buffer_get_property (GObject * object,
351     guint prop_id, GValue * value, GParamSpec * pspec);
352 static void gst_rtp_jitter_buffer_finalize (GObject * object);
353
354 /* element overrides */
355 static GstStateChangeReturn gst_rtp_jitter_buffer_change_state (GstElement
356     * element, GstStateChange transition);
357 static GstPad *gst_rtp_jitter_buffer_request_new_pad (GstElement * element,
358     GstPadTemplate * templ, const gchar * name, const GstCaps * filter);
359 static void gst_rtp_jitter_buffer_release_pad (GstElement * element,
360     GstPad * pad);
361 static GstClock *gst_rtp_jitter_buffer_provide_clock (GstElement * element);
362
363 /* pad overrides */
364 static GstCaps *gst_rtp_jitter_buffer_getcaps (GstPad * pad, GstCaps * filter);
365 static GstIterator *gst_rtp_jitter_buffer_iterate_internal_links (GstPad * pad,
366     GstObject * parent);
367
368 /* sinkpad overrides */
369 static gboolean gst_rtp_jitter_buffer_sink_event (GstPad * pad,
370     GstObject * parent, GstEvent * event);
371 static GstFlowReturn gst_rtp_jitter_buffer_chain (GstPad * pad,
372     GstObject * parent, GstBuffer * buffer);
373
374 static gboolean gst_rtp_jitter_buffer_sink_rtcp_event (GstPad * pad,
375     GstObject * parent, GstEvent * event);
376 static GstFlowReturn gst_rtp_jitter_buffer_chain_rtcp (GstPad * pad,
377     GstObject * parent, GstBuffer * buffer);
378
379 static gboolean gst_rtp_jitter_buffer_sink_query (GstPad * pad,
380     GstObject * parent, GstQuery * query);
381
382 /* srcpad overrides */
383 static gboolean gst_rtp_jitter_buffer_src_event (GstPad * pad,
384     GstObject * parent, GstEvent * event);
385 static gboolean gst_rtp_jitter_buffer_src_activate_mode (GstPad * pad,
386     GstObject * parent, GstPadMode mode, gboolean active);
387 static void gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer);
388 static gboolean gst_rtp_jitter_buffer_src_query (GstPad * pad,
389     GstObject * parent, GstQuery * query);
390
391 static void
392 gst_rtp_jitter_buffer_clear_pt_map (GstRtpJitterBuffer * jitterbuffer);
393 static GstClockTime
394 gst_rtp_jitter_buffer_set_active (GstRtpJitterBuffer * jitterbuffer,
395     gboolean active, guint64 base_time);
396 static void do_handle_sync (GstRtpJitterBuffer * jitterbuffer);
397
398 static void unschedule_current_timer (GstRtpJitterBuffer * jitterbuffer);
399 static void remove_all_timers (GstRtpJitterBuffer * jitterbuffer);
400
401 static void wait_next_timeout (GstRtpJitterBuffer * jitterbuffer);
402
403 static GstStructure *gst_rtp_jitter_buffer_create_stats (GstRtpJitterBuffer *
404     jitterbuffer);
405
406 static void
407 gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass)
408 {
409   GObjectClass *gobject_class;
410   GstElementClass *gstelement_class;
411
412   gobject_class = (GObjectClass *) klass;
413   gstelement_class = (GstElementClass *) klass;
414
415   g_type_class_add_private (klass, sizeof (GstRtpJitterBufferPrivate));
416
417   gobject_class->finalize = gst_rtp_jitter_buffer_finalize;
418
419   gobject_class->set_property = gst_rtp_jitter_buffer_set_property;
420   gobject_class->get_property = gst_rtp_jitter_buffer_get_property;
421
422   /**
423    * GstRtpJitterBuffer:latency:
424    *
425    * The maximum latency of the jitterbuffer. Packets will be kept in the buffer
426    * for at most this time.
427    */
428   g_object_class_install_property (gobject_class, PROP_LATENCY,
429       g_param_spec_uint ("latency", "Buffer latency in ms",
430           "Amount of ms to buffer", 0, G_MAXUINT, DEFAULT_LATENCY_MS,
431           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
432   /**
433    * GstRtpJitterBuffer:drop-on-latency:
434    *
435    * Drop oldest buffers when the queue is completely filled.
436    */
437   g_object_class_install_property (gobject_class, PROP_DROP_ON_LATENCY,
438       g_param_spec_boolean ("drop-on-latency",
439           "Drop buffers when maximum latency is reached",
440           "Tells the jitterbuffer to never exceed the given latency in size",
441           DEFAULT_DROP_ON_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
442   /**
443    * GstRtpJitterBuffer:ts-offset:
444    *
445    * Adjust GStreamer output buffer timestamps in the jitterbuffer with offset.
446    * This is mainly used to ensure interstream synchronisation.
447    */
448   g_object_class_install_property (gobject_class, PROP_TS_OFFSET,
449       g_param_spec_int64 ("ts-offset", "Timestamp Offset",
450           "Adjust buffer timestamps with offset in nanoseconds", G_MININT64,
451           G_MAXINT64, DEFAULT_TS_OFFSET,
452           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
453
454   /**
455    * GstRtpJitterBuffer:do-lost:
456    *
457    * Send out a GstRTPPacketLost event downstream when a packet is considered
458    * lost.
459    */
460   g_object_class_install_property (gobject_class, PROP_DO_LOST,
461       g_param_spec_boolean ("do-lost", "Do Lost",
462           "Send an event downstream when a packet is lost", DEFAULT_DO_LOST,
463           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
464
465   /**
466    * GstRtpJitterBuffer:mode:
467    *
468    * Control the buffering and timestamping mode used by the jitterbuffer.
469    */
470   g_object_class_install_property (gobject_class, PROP_MODE,
471       g_param_spec_enum ("mode", "Mode",
472           "Control the buffering algorithm in use", RTP_TYPE_JITTER_BUFFER_MODE,
473           DEFAULT_MODE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
474   /**
475    * GstRtpJitterBuffer:percent:
476    *
477    * The percent of the jitterbuffer that is filled.
478    */
479   g_object_class_install_property (gobject_class, PROP_PERCENT,
480       g_param_spec_int ("percent", "percent",
481           "The buffer filled percent", 0, 100,
482           0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
483   /**
484    * GstRtpJitterBuffer:do-retransmission:
485    *
486    * Send out a GstRTPRetransmission event upstream when a packet is considered
487    * late and should be retransmitted.
488    *
489    * Since: 1.2
490    */
491   g_object_class_install_property (gobject_class, PROP_DO_RETRANSMISSION,
492       g_param_spec_boolean ("do-retransmission", "Do Retransmission",
493           "Send retransmission events upstream when a packet is late",
494           DEFAULT_DO_RETRANSMISSION,
495           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
496
497   /**
498    * GstRtpJitterBuffer:rtx-delay:
499    *
500    * When a packet did not arrive at the expected time, wait this extra amount
501    * of time before sending a retransmission event.
502    *
503    * When -1 is used, the max jitter will be used as extra delay.
504    *
505    * Since: 1.2
506    */
507   g_object_class_install_property (gobject_class, PROP_RTX_DELAY,
508       g_param_spec_int ("rtx-delay", "RTX Delay",
509           "Extra time in ms to wait before sending retransmission "
510           "event (-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_DELAY,
511           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
512   /**
513    * GstRtpJitterBuffer:rtx-delay-reorder:
514    *
515    * Assume that a retransmission event should be sent when we see
516    * this much packet reordering.
517    *
518    * When -1 is used, the value will be estimated based on observed packet
519    * reordering.
520    *
521    * Since: 1.2
522    */
523   g_object_class_install_property (gobject_class, PROP_RTX_DELAY_REORDER,
524       g_param_spec_int ("rtx-delay-reorder", "RTX Delay Reorder",
525           "Sending retransmission event when this much reordering (-1 automatic)",
526           -1, G_MAXINT, DEFAULT_RTX_DELAY_REORDER,
527           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
528   /**
529    * GstRtpJitterBuffer::rtx-retry-timeout:
530    *
531    * When no packet has been received after sending a retransmission event
532    * for this time, retry sending a retransmission event.
533    *
534    * When -1 is used, the value will be estimated based on observed round
535    * trip time.
536    *
537    * Since: 1.2
538    */
539   g_object_class_install_property (gobject_class, PROP_RTX_RETRY_TIMEOUT,
540       g_param_spec_int ("rtx-retry-timeout", "RTX Retry Timeout",
541           "Retry sending a transmission event after this timeout in "
542           "ms (-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_RETRY_TIMEOUT,
543           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
544   /**
545    * GstRtpJitterBuffer:rtx-retry-period:
546    *
547    * The amount of time to try to get a retransmission.
548    *
549    * When -1 is used, the value will be estimated based on the jitterbuffer
550    * latency and the observed round trip time.
551    *
552    * Since: 1.2
553    */
554   g_object_class_install_property (gobject_class, PROP_RTX_RETRY_PERIOD,
555       g_param_spec_int ("rtx-retry-period", "RTX Retry Period",
556           "Try to get a retransmission for this many ms "
557           "(-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_RETRY_PERIOD,
558           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
559   /**
560    * GstRtpJitterBuffer:stats:
561    *
562    * Various jitterbuffer statistics. This property returns a GstStructure
563    * with name application/x-rtp-jitterbuffer-stats with the following fields:
564    *
565    *  "rtx-count"         G_TYPE_UINT64 The number of retransmissions requested
566    *  "rtx-success-count" G_TYPE_UINT64 The number of successful retransmissions
567    *  "rtx-per-packet"    G_TYPE_DOUBLE Average number of RTX per packet
568    *  "rtx-rtt"           G_TYPE_UINT64 Average round trip time per RTX
569    *
570    * Since: 1.4
571    */
572   g_object_class_install_property (gobject_class, PROP_STATS,
573       g_param_spec_boxed ("stats", "Statistics",
574           "Various statistics", GST_TYPE_STRUCTURE,
575           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
576
577   /**
578    * GstRtpJitterBuffer::request-pt-map:
579    * @buffer: the object which received the signal
580    * @pt: the pt
581    *
582    * Request the payload type as #GstCaps for @pt.
583    */
584   gst_rtp_jitter_buffer_signals[SIGNAL_REQUEST_PT_MAP] =
585       g_signal_new ("request-pt-map", G_TYPE_FROM_CLASS (klass),
586       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
587           request_pt_map), NULL, NULL, g_cclosure_marshal_generic,
588       GST_TYPE_CAPS, 1, G_TYPE_UINT);
589   /**
590    * GstRtpJitterBuffer::handle-sync:
591    * @buffer: the object which received the signal
592    * @struct: a GstStructure containing sync values.
593    *
594    * Be notified of new sync values.
595    */
596   gst_rtp_jitter_buffer_signals[SIGNAL_HANDLE_SYNC] =
597       g_signal_new ("handle-sync", G_TYPE_FROM_CLASS (klass),
598       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
599           handle_sync), NULL, NULL, g_cclosure_marshal_VOID__BOXED,
600       G_TYPE_NONE, 1, GST_TYPE_STRUCTURE | G_SIGNAL_TYPE_STATIC_SCOPE);
601
602   /**
603    * GstRtpJitterBuffer::on-npt-stop:
604    * @buffer: the object which received the signal
605    *
606    * Signal that the jitterbufer has pushed the RTP packet that corresponds to
607    * the npt-stop position.
608    */
609   gst_rtp_jitter_buffer_signals[SIGNAL_ON_NPT_STOP] =
610       g_signal_new ("on-npt-stop", G_TYPE_FROM_CLASS (klass),
611       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
612           on_npt_stop), NULL, NULL, g_cclosure_marshal_VOID__VOID,
613       G_TYPE_NONE, 0, G_TYPE_NONE);
614
615   /**
616    * GstRtpJitterBuffer::clear-pt-map:
617    * @buffer: the object which received the signal
618    *
619    * Invalidate the clock-rate as obtained with the
620    * #GstRtpJitterBuffer::request-pt-map signal.
621    */
622   gst_rtp_jitter_buffer_signals[SIGNAL_CLEAR_PT_MAP] =
623       g_signal_new ("clear-pt-map", G_TYPE_FROM_CLASS (klass),
624       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
625       G_STRUCT_OFFSET (GstRtpJitterBufferClass, clear_pt_map), NULL, NULL,
626       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
627
628   /**
629    * GstRtpJitterBuffer::set-active:
630    * @buffer: the object which received the signal
631    *
632    * Start pushing out packets with the given base time. This signal is only
633    * useful in buffering mode.
634    *
635    * Returns: the time of the last pushed packet.
636    */
637   gst_rtp_jitter_buffer_signals[SIGNAL_SET_ACTIVE] =
638       g_signal_new ("set-active", G_TYPE_FROM_CLASS (klass),
639       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
640       G_STRUCT_OFFSET (GstRtpJitterBufferClass, set_active), NULL, NULL,
641       g_cclosure_marshal_generic, G_TYPE_UINT64, 2, G_TYPE_BOOLEAN,
642       G_TYPE_UINT64);
643
644   gstelement_class->change_state =
645       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_change_state);
646   gstelement_class->request_new_pad =
647       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_request_new_pad);
648   gstelement_class->release_pad =
649       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_release_pad);
650   gstelement_class->provide_clock =
651       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_provide_clock);
652
653   gst_element_class_add_pad_template (gstelement_class,
654       gst_static_pad_template_get (&gst_rtp_jitter_buffer_src_template));
655   gst_element_class_add_pad_template (gstelement_class,
656       gst_static_pad_template_get (&gst_rtp_jitter_buffer_sink_template));
657   gst_element_class_add_pad_template (gstelement_class,
658       gst_static_pad_template_get (&gst_rtp_jitter_buffer_sink_rtcp_template));
659
660   gst_element_class_set_static_metadata (gstelement_class,
661       "RTP packet jitter-buffer", "Filter/Network/RTP",
662       "A buffer that deals with network jitter and other transmission faults",
663       "Philippe Kalaf <philippe.kalaf@collabora.co.uk>, "
664       "Wim Taymans <wim.taymans@gmail.com>");
665
666   klass->clear_pt_map = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_clear_pt_map);
667   klass->set_active = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_set_active);
668
669   GST_DEBUG_CATEGORY_INIT
670       (rtpjitterbuffer_debug, "rtpjitterbuffer", 0, "RTP Jitter Buffer");
671 }
672
673 static void
674 gst_rtp_jitter_buffer_init (GstRtpJitterBuffer * jitterbuffer)
675 {
676   GstRtpJitterBufferPrivate *priv;
677
678   priv = GST_RTP_JITTER_BUFFER_GET_PRIVATE (jitterbuffer);
679   jitterbuffer->priv = priv;
680
681   priv->latency_ms = DEFAULT_LATENCY_MS;
682   priv->latency_ns = priv->latency_ms * GST_MSECOND;
683   priv->drop_on_latency = DEFAULT_DROP_ON_LATENCY;
684   priv->do_lost = DEFAULT_DO_LOST;
685   priv->do_retransmission = DEFAULT_DO_RETRANSMISSION;
686   priv->rtx_delay = DEFAULT_RTX_DELAY;
687   priv->rtx_delay_reorder = DEFAULT_RTX_DELAY_REORDER;
688   priv->rtx_retry_timeout = DEFAULT_RTX_RETRY_TIMEOUT;
689   priv->rtx_retry_period = DEFAULT_RTX_RETRY_PERIOD;
690
691   priv->timers = g_array_new (FALSE, TRUE, sizeof (TimerData));
692   priv->jbuf = rtp_jitter_buffer_new ();
693   g_mutex_init (&priv->jbuf_lock);
694   g_cond_init (&priv->jbuf_timer);
695   g_cond_init (&priv->jbuf_event);
696
697   /* reset skew detection initialy */
698   rtp_jitter_buffer_reset_skew (priv->jbuf);
699   rtp_jitter_buffer_set_delay (priv->jbuf, priv->latency_ns);
700   rtp_jitter_buffer_set_buffering (priv->jbuf, FALSE);
701   priv->active = TRUE;
702
703   priv->srcpad =
704       gst_pad_new_from_static_template (&gst_rtp_jitter_buffer_src_template,
705       "src");
706
707   gst_pad_set_activatemode_function (priv->srcpad,
708       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_activate_mode));
709   gst_pad_set_query_function (priv->srcpad,
710       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_query));
711   gst_pad_set_event_function (priv->srcpad,
712       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_event));
713
714   priv->sinkpad =
715       gst_pad_new_from_static_template (&gst_rtp_jitter_buffer_sink_template,
716       "sink");
717
718   gst_pad_set_chain_function (priv->sinkpad,
719       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_chain));
720   gst_pad_set_event_function (priv->sinkpad,
721       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_sink_event));
722   gst_pad_set_query_function (priv->sinkpad,
723       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_sink_query));
724
725   gst_element_add_pad (GST_ELEMENT (jitterbuffer), priv->srcpad);
726   gst_element_add_pad (GST_ELEMENT (jitterbuffer), priv->sinkpad);
727
728   GST_OBJECT_FLAG_SET (jitterbuffer, GST_ELEMENT_FLAG_PROVIDE_CLOCK);
729 }
730
731 #define ITEM_TYPE_BUFFER        0
732 #define ITEM_TYPE_LOST          1
733
734 static RTPJitterBufferItem *
735 alloc_item (gpointer data, guint type, GstClockTime dts, GstClockTime pts,
736     guint seqnum, guint count, guint rtptime)
737 {
738   RTPJitterBufferItem *item;
739
740   item = g_slice_new (RTPJitterBufferItem);
741   item->data = data;
742   item->next = NULL;
743   item->prev = NULL;
744   item->type = type;
745   item->dts = dts;
746   item->pts = pts;
747   item->seqnum = seqnum;
748   item->count = count;
749   item->rtptime = rtptime;
750
751   return item;
752 }
753
754 static void
755 free_item (RTPJitterBufferItem * item)
756 {
757   if (item->data)
758     gst_mini_object_unref (item->data);
759   g_slice_free (RTPJitterBufferItem, item);
760 }
761
762 static void
763 gst_rtp_jitter_buffer_finalize (GObject * object)
764 {
765   GstRtpJitterBuffer *jitterbuffer;
766   GstRtpJitterBufferPrivate *priv;
767
768   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
769   priv = jitterbuffer->priv;
770
771   g_array_free (priv->timers, TRUE);
772   g_mutex_clear (&priv->jbuf_lock);
773   g_cond_clear (&priv->jbuf_timer);
774   g_cond_clear (&priv->jbuf_event);
775
776   rtp_jitter_buffer_flush (priv->jbuf, (GFunc) free_item, NULL);
777   g_object_unref (priv->jbuf);
778
779   G_OBJECT_CLASS (parent_class)->finalize (object);
780 }
781
782 static GstIterator *
783 gst_rtp_jitter_buffer_iterate_internal_links (GstPad * pad, GstObject * parent)
784 {
785   GstRtpJitterBuffer *jitterbuffer;
786   GstPad *otherpad = NULL;
787   GstIterator *it;
788   GValue val = { 0, };
789
790   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
791
792   if (pad == jitterbuffer->priv->sinkpad) {
793     otherpad = jitterbuffer->priv->srcpad;
794   } else if (pad == jitterbuffer->priv->srcpad) {
795     otherpad = jitterbuffer->priv->sinkpad;
796   } else if (pad == jitterbuffer->priv->rtcpsinkpad) {
797     otherpad = NULL;
798   }
799
800   g_value_init (&val, GST_TYPE_PAD);
801   g_value_set_object (&val, otherpad);
802   it = gst_iterator_new_single (GST_TYPE_PAD, &val);
803   g_value_unset (&val);
804
805   return it;
806 }
807
808 static GstPad *
809 create_rtcp_sink (GstRtpJitterBuffer * jitterbuffer)
810 {
811   GstRtpJitterBufferPrivate *priv;
812
813   priv = jitterbuffer->priv;
814
815   GST_DEBUG_OBJECT (jitterbuffer, "creating RTCP sink pad");
816
817   priv->rtcpsinkpad =
818       gst_pad_new_from_static_template
819       (&gst_rtp_jitter_buffer_sink_rtcp_template, "sink_rtcp");
820   gst_pad_set_chain_function (priv->rtcpsinkpad,
821       gst_rtp_jitter_buffer_chain_rtcp);
822   gst_pad_set_event_function (priv->rtcpsinkpad,
823       (GstPadEventFunction) gst_rtp_jitter_buffer_sink_rtcp_event);
824   gst_pad_set_iterate_internal_links_function (priv->rtcpsinkpad,
825       gst_rtp_jitter_buffer_iterate_internal_links);
826   gst_pad_set_active (priv->rtcpsinkpad, TRUE);
827   gst_element_add_pad (GST_ELEMENT_CAST (jitterbuffer), priv->rtcpsinkpad);
828
829   return priv->rtcpsinkpad;
830 }
831
832 static void
833 remove_rtcp_sink (GstRtpJitterBuffer * jitterbuffer)
834 {
835   GstRtpJitterBufferPrivate *priv;
836
837   priv = jitterbuffer->priv;
838
839   GST_DEBUG_OBJECT (jitterbuffer, "removing RTCP sink pad");
840
841   gst_pad_set_active (priv->rtcpsinkpad, FALSE);
842
843   gst_element_remove_pad (GST_ELEMENT_CAST (jitterbuffer), priv->rtcpsinkpad);
844   priv->rtcpsinkpad = NULL;
845 }
846
847 static GstPad *
848 gst_rtp_jitter_buffer_request_new_pad (GstElement * element,
849     GstPadTemplate * templ, const gchar * name, const GstCaps * filter)
850 {
851   GstRtpJitterBuffer *jitterbuffer;
852   GstElementClass *klass;
853   GstPad *result;
854   GstRtpJitterBufferPrivate *priv;
855
856   g_return_val_if_fail (templ != NULL, NULL);
857   g_return_val_if_fail (GST_IS_RTP_JITTER_BUFFER (element), NULL);
858
859   jitterbuffer = GST_RTP_JITTER_BUFFER (element);
860   priv = jitterbuffer->priv;
861   klass = GST_ELEMENT_GET_CLASS (element);
862
863   GST_DEBUG_OBJECT (element, "requesting pad %s", GST_STR_NULL (name));
864
865   /* figure out the template */
866   if (templ == gst_element_class_get_pad_template (klass, "sink_rtcp")) {
867     if (priv->rtcpsinkpad != NULL)
868       goto exists;
869
870     result = create_rtcp_sink (jitterbuffer);
871   } else
872     goto wrong_template;
873
874   return result;
875
876   /* ERRORS */
877 wrong_template:
878   {
879     g_warning ("rtpjitterbuffer: this is not our template");
880     return NULL;
881   }
882 exists:
883   {
884     g_warning ("rtpjitterbuffer: pad already requested");
885     return NULL;
886   }
887 }
888
889 static void
890 gst_rtp_jitter_buffer_release_pad (GstElement * element, GstPad * pad)
891 {
892   GstRtpJitterBuffer *jitterbuffer;
893   GstRtpJitterBufferPrivate *priv;
894
895   g_return_if_fail (GST_IS_RTP_JITTER_BUFFER (element));
896   g_return_if_fail (GST_IS_PAD (pad));
897
898   jitterbuffer = GST_RTP_JITTER_BUFFER (element);
899   priv = jitterbuffer->priv;
900
901   GST_DEBUG_OBJECT (element, "releasing pad %s:%s", GST_DEBUG_PAD_NAME (pad));
902
903   if (priv->rtcpsinkpad == pad) {
904     remove_rtcp_sink (jitterbuffer);
905   } else
906     goto wrong_pad;
907
908   return;
909
910   /* ERRORS */
911 wrong_pad:
912   {
913     g_warning ("gstjitterbuffer: asked to release an unknown pad");
914     return;
915   }
916 }
917
918 static GstClock *
919 gst_rtp_jitter_buffer_provide_clock (GstElement * element)
920 {
921   return gst_system_clock_obtain ();
922 }
923
924 static void
925 gst_rtp_jitter_buffer_clear_pt_map (GstRtpJitterBuffer * jitterbuffer)
926 {
927   GstRtpJitterBufferPrivate *priv;
928
929   priv = jitterbuffer->priv;
930
931   /* this will trigger a new pt-map request signal, FIXME, do something better. */
932
933   JBUF_LOCK (priv);
934   priv->clock_rate = -1;
935   /* do not clear current content, but refresh state for new arrival */
936   GST_DEBUG_OBJECT (jitterbuffer, "reset jitterbuffer");
937   rtp_jitter_buffer_reset_skew (priv->jbuf);
938   JBUF_UNLOCK (priv);
939 }
940
941 static GstClockTime
942 gst_rtp_jitter_buffer_set_active (GstRtpJitterBuffer * jbuf, gboolean active,
943     guint64 offset)
944 {
945   GstRtpJitterBufferPrivate *priv;
946   GstClockTime last_out;
947   RTPJitterBufferItem *item;
948
949   priv = jbuf->priv;
950
951   JBUF_LOCK (priv);
952   GST_DEBUG_OBJECT (jbuf, "setting active %d with offset %" GST_TIME_FORMAT,
953       active, GST_TIME_ARGS (offset));
954
955   if (active != priv->active) {
956     /* add the amount of time spent in paused to the output offset. All
957      * outgoing buffers will have this offset applied to their timestamps in
958      * order to make them arrive in time in the sink. */
959     priv->out_offset = offset;
960     GST_DEBUG_OBJECT (jbuf, "out offset %" GST_TIME_FORMAT,
961         GST_TIME_ARGS (priv->out_offset));
962     priv->active = active;
963     JBUF_SIGNAL_EVENT (priv);
964   }
965   if (!active) {
966     rtp_jitter_buffer_set_buffering (priv->jbuf, TRUE);
967   }
968   if ((item = rtp_jitter_buffer_peek (priv->jbuf))) {
969     /* head buffer timestamp and offset gives our output time */
970     last_out = item->dts + priv->ts_offset;
971   } else {
972     /* use last known time when the buffer is empty */
973     last_out = priv->last_out_time;
974   }
975   JBUF_UNLOCK (priv);
976
977   return last_out;
978 }
979
980 static GstCaps *
981 gst_rtp_jitter_buffer_getcaps (GstPad * pad, GstCaps * filter)
982 {
983   GstRtpJitterBuffer *jitterbuffer;
984   GstRtpJitterBufferPrivate *priv;
985   GstPad *other;
986   GstCaps *caps;
987   GstCaps *templ;
988
989   jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
990   priv = jitterbuffer->priv;
991
992   other = (pad == priv->srcpad ? priv->sinkpad : priv->srcpad);
993
994   caps = gst_pad_peer_query_caps (other, filter);
995
996   templ = gst_pad_get_pad_template_caps (pad);
997   if (caps == NULL) {
998     GST_DEBUG_OBJECT (jitterbuffer, "use template");
999     caps = templ;
1000   } else {
1001     GstCaps *intersect;
1002
1003     GST_DEBUG_OBJECT (jitterbuffer, "intersect with template");
1004
1005     intersect = gst_caps_intersect (caps, templ);
1006     gst_caps_unref (caps);
1007     gst_caps_unref (templ);
1008
1009     caps = intersect;
1010   }
1011   gst_object_unref (jitterbuffer);
1012
1013   return caps;
1014 }
1015
1016 /*
1017  * Must be called with JBUF_LOCK held
1018  */
1019
1020 static gboolean
1021 gst_jitter_buffer_sink_parse_caps (GstRtpJitterBuffer * jitterbuffer,
1022     GstCaps * caps)
1023 {
1024   GstRtpJitterBufferPrivate *priv;
1025   GstStructure *caps_struct;
1026   guint val;
1027   GstClockTime tval;
1028
1029   priv = jitterbuffer->priv;
1030
1031   /* first parse the caps */
1032   caps_struct = gst_caps_get_structure (caps, 0);
1033
1034   GST_DEBUG_OBJECT (jitterbuffer, "got caps");
1035
1036   /* we need a clock-rate to convert the rtp timestamps to GStreamer time and to
1037    * measure the amount of data in the buffer */
1038   if (!gst_structure_get_int (caps_struct, "clock-rate", &priv->clock_rate))
1039     goto error;
1040
1041   if (priv->clock_rate <= 0)
1042     goto wrong_rate;
1043
1044   GST_DEBUG_OBJECT (jitterbuffer, "got clock-rate %d", priv->clock_rate);
1045
1046   rtp_jitter_buffer_set_clock_rate (priv->jbuf, priv->clock_rate);
1047
1048   /* The clock base is the RTP timestamp corrsponding to the npt-start value. We
1049    * can use this to track the amount of time elapsed on the sender. */
1050   if (gst_structure_get_uint (caps_struct, "clock-base", &val))
1051     priv->clock_base = val;
1052   else
1053     priv->clock_base = -1;
1054
1055   priv->ext_timestamp = priv->clock_base;
1056
1057   GST_DEBUG_OBJECT (jitterbuffer, "got clock-base %" G_GINT64_FORMAT,
1058       priv->clock_base);
1059
1060   if (gst_structure_get_uint (caps_struct, "seqnum-base", &val)) {
1061     /* first expected seqnum, only update when we didn't have a previous base. */
1062     if (priv->next_in_seqnum == -1)
1063       priv->next_in_seqnum = val;
1064     if (priv->next_seqnum == -1)
1065       priv->next_seqnum = val;
1066   }
1067
1068   GST_DEBUG_OBJECT (jitterbuffer, "got seqnum-base %d", priv->next_in_seqnum);
1069
1070   /* the start and stop times. The seqnum-base corresponds to the start time. We
1071    * will keep track of the seqnums on the output and when we reach the one
1072    * corresponding to npt-stop, we emit the npt-stop-reached signal */
1073   if (gst_structure_get_clock_time (caps_struct, "npt-start", &tval))
1074     priv->npt_start = tval;
1075   else
1076     priv->npt_start = 0;
1077
1078   if (gst_structure_get_clock_time (caps_struct, "npt-stop", &tval))
1079     priv->npt_stop = tval;
1080   else
1081     priv->npt_stop = -1;
1082
1083   GST_DEBUG_OBJECT (jitterbuffer,
1084       "npt start/stop: %" GST_TIME_FORMAT "-%" GST_TIME_FORMAT,
1085       GST_TIME_ARGS (priv->npt_start), GST_TIME_ARGS (priv->npt_stop));
1086
1087   return TRUE;
1088
1089   /* ERRORS */
1090 error:
1091   {
1092     GST_DEBUG_OBJECT (jitterbuffer, "No clock-rate in caps!");
1093     return FALSE;
1094   }
1095 wrong_rate:
1096   {
1097     GST_DEBUG_OBJECT (jitterbuffer, "Invalid clock-rate %d", priv->clock_rate);
1098     return FALSE;
1099   }
1100 }
1101
1102 static void
1103 gst_rtp_jitter_buffer_flush_start (GstRtpJitterBuffer * jitterbuffer)
1104 {
1105   GstRtpJitterBufferPrivate *priv;
1106
1107   priv = jitterbuffer->priv;
1108
1109   JBUF_LOCK (priv);
1110   /* mark ourselves as flushing */
1111   priv->srcresult = GST_FLOW_FLUSHING;
1112   GST_DEBUG_OBJECT (jitterbuffer, "Disabling pop on queue");
1113   /* this unblocks any waiting pops on the src pad task */
1114   JBUF_SIGNAL_EVENT (priv);
1115   JBUF_UNLOCK (priv);
1116 }
1117
1118 static void
1119 gst_rtp_jitter_buffer_flush_stop (GstRtpJitterBuffer * jitterbuffer)
1120 {
1121   GstRtpJitterBufferPrivate *priv;
1122
1123   priv = jitterbuffer->priv;
1124
1125   JBUF_LOCK (priv);
1126   GST_DEBUG_OBJECT (jitterbuffer, "Enabling pop on queue");
1127   /* Mark as non flushing */
1128   priv->srcresult = GST_FLOW_OK;
1129   gst_segment_init (&priv->segment, GST_FORMAT_TIME);
1130   priv->last_popped_seqnum = -1;
1131   priv->last_out_time = -1;
1132   priv->next_seqnum = -1;
1133   priv->ips_rtptime = -1;
1134   priv->ips_dts = GST_CLOCK_TIME_NONE;
1135   priv->packet_spacing = 0;
1136   priv->next_in_seqnum = -1;
1137   priv->clock_rate = -1;
1138   priv->eos = FALSE;
1139   priv->estimated_eos = -1;
1140   priv->last_elapsed = 0;
1141   priv->ext_timestamp = -1;
1142   GST_DEBUG_OBJECT (jitterbuffer, "flush and reset jitterbuffer");
1143   rtp_jitter_buffer_flush (priv->jbuf, (GFunc) free_item, NULL);
1144   rtp_jitter_buffer_reset_skew (priv->jbuf);
1145   remove_all_timers (jitterbuffer);
1146   JBUF_UNLOCK (priv);
1147 }
1148
1149 static gboolean
1150 gst_rtp_jitter_buffer_src_activate_mode (GstPad * pad, GstObject * parent,
1151     GstPadMode mode, gboolean active)
1152 {
1153   gboolean result;
1154   GstRtpJitterBuffer *jitterbuffer = NULL;
1155
1156   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1157
1158   switch (mode) {
1159     case GST_PAD_MODE_PUSH:
1160       if (active) {
1161         /* allow data processing */
1162         gst_rtp_jitter_buffer_flush_stop (jitterbuffer);
1163
1164         /* start pushing out buffers */
1165         GST_DEBUG_OBJECT (jitterbuffer, "Starting task on srcpad");
1166         result = gst_pad_start_task (jitterbuffer->priv->srcpad,
1167             (GstTaskFunction) gst_rtp_jitter_buffer_loop, jitterbuffer, NULL);
1168       } else {
1169         /* make sure all data processing stops ASAP */
1170         gst_rtp_jitter_buffer_flush_start (jitterbuffer);
1171
1172         /* NOTE this will hardlock if the state change is called from the src pad
1173          * task thread because we will _join() the thread. */
1174         GST_DEBUG_OBJECT (jitterbuffer, "Stopping task on srcpad");
1175         result = gst_pad_stop_task (pad);
1176       }
1177       break;
1178     default:
1179       result = FALSE;
1180       break;
1181   }
1182   return result;
1183 }
1184
1185 static GstStateChangeReturn
1186 gst_rtp_jitter_buffer_change_state (GstElement * element,
1187     GstStateChange transition)
1188 {
1189   GstRtpJitterBuffer *jitterbuffer;
1190   GstRtpJitterBufferPrivate *priv;
1191   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
1192
1193   jitterbuffer = GST_RTP_JITTER_BUFFER (element);
1194   priv = jitterbuffer->priv;
1195
1196   switch (transition) {
1197     case GST_STATE_CHANGE_NULL_TO_READY:
1198       break;
1199     case GST_STATE_CHANGE_READY_TO_PAUSED:
1200       JBUF_LOCK (priv);
1201       /* reset negotiated values */
1202       priv->clock_rate = -1;
1203       priv->clock_base = -1;
1204       priv->peer_latency = 0;
1205       priv->last_pt = -1;
1206       /* block until we go to PLAYING */
1207       priv->blocked = TRUE;
1208       priv->timer_running = TRUE;
1209       priv->timer_thread =
1210           g_thread_new ("timer", (GThreadFunc) wait_next_timeout, jitterbuffer);
1211       JBUF_UNLOCK (priv);
1212       break;
1213     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
1214       JBUF_LOCK (priv);
1215       /* unblock to allow streaming in PLAYING */
1216       priv->blocked = FALSE;
1217       JBUF_SIGNAL_EVENT (priv);
1218       JBUF_SIGNAL_TIMER (priv);
1219       JBUF_UNLOCK (priv);
1220       break;
1221     default:
1222       break;
1223   }
1224
1225   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1226
1227   switch (transition) {
1228     case GST_STATE_CHANGE_READY_TO_PAUSED:
1229       /* we are a live element because we sync to the clock, which we can only
1230        * do in the PLAYING state */
1231       if (ret != GST_STATE_CHANGE_FAILURE)
1232         ret = GST_STATE_CHANGE_NO_PREROLL;
1233       break;
1234     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
1235       JBUF_LOCK (priv);
1236       /* block to stop streaming when PAUSED */
1237       priv->blocked = TRUE;
1238       unschedule_current_timer (jitterbuffer);
1239       JBUF_UNLOCK (priv);
1240       if (ret != GST_STATE_CHANGE_FAILURE)
1241         ret = GST_STATE_CHANGE_NO_PREROLL;
1242       break;
1243     case GST_STATE_CHANGE_PAUSED_TO_READY:
1244       JBUF_LOCK (priv);
1245       gst_buffer_replace (&priv->last_sr, NULL);
1246       priv->timer_running = FALSE;
1247       unschedule_current_timer (jitterbuffer);
1248       JBUF_SIGNAL_TIMER (priv);
1249       JBUF_UNLOCK (priv);
1250       g_thread_join (priv->timer_thread);
1251       priv->timer_thread = NULL;
1252       break;
1253     case GST_STATE_CHANGE_READY_TO_NULL:
1254       break;
1255     default:
1256       break;
1257   }
1258
1259   return ret;
1260 }
1261
1262 static gboolean
1263 gst_rtp_jitter_buffer_src_event (GstPad * pad, GstObject * parent,
1264     GstEvent * event)
1265 {
1266   gboolean ret = TRUE;
1267   GstRtpJitterBuffer *jitterbuffer;
1268   GstRtpJitterBufferPrivate *priv;
1269
1270   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1271   priv = jitterbuffer->priv;
1272
1273   GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
1274
1275   switch (GST_EVENT_TYPE (event)) {
1276     case GST_EVENT_LATENCY:
1277     {
1278       GstClockTime latency;
1279
1280       gst_event_parse_latency (event, &latency);
1281
1282       GST_DEBUG_OBJECT (jitterbuffer,
1283           "configuring latency of %" GST_TIME_FORMAT, GST_TIME_ARGS (latency));
1284
1285       JBUF_LOCK (priv);
1286       /* adjust the overall buffer delay to the total pipeline latency in
1287        * buffering mode because if downstream consumes too fast (because of
1288        * large latency or queues, we would start rebuffering again. */
1289       if (rtp_jitter_buffer_get_mode (priv->jbuf) ==
1290           RTP_JITTER_BUFFER_MODE_BUFFER) {
1291         rtp_jitter_buffer_set_delay (priv->jbuf, latency);
1292       }
1293       JBUF_UNLOCK (priv);
1294
1295       ret = gst_pad_push_event (priv->sinkpad, event);
1296       break;
1297     }
1298     default:
1299       ret = gst_pad_push_event (priv->sinkpad, event);
1300       break;
1301   }
1302
1303   return ret;
1304 }
1305
1306 static gboolean
1307 gst_rtp_jitter_buffer_sink_event (GstPad * pad, GstObject * parent,
1308     GstEvent * event)
1309 {
1310   gboolean ret = TRUE;
1311   GstRtpJitterBuffer *jitterbuffer;
1312   GstRtpJitterBufferPrivate *priv;
1313
1314   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1315   priv = jitterbuffer->priv;
1316
1317   GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
1318
1319   switch (GST_EVENT_TYPE (event)) {
1320     case GST_EVENT_CAPS:
1321     {
1322       GstCaps *caps;
1323
1324       gst_event_parse_caps (event, &caps);
1325
1326       JBUF_LOCK (priv);
1327       ret = gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps);
1328       JBUF_UNLOCK (priv);
1329
1330       /* set same caps on srcpad on success */
1331       if (ret)
1332         ret = gst_pad_push_event (priv->srcpad, event);
1333       else
1334         gst_event_unref (event);
1335       break;
1336     }
1337     case GST_EVENT_SEGMENT:
1338     {
1339       gst_event_copy_segment (event, &priv->segment);
1340
1341       /* we need time for now */
1342       if (priv->segment.format != GST_FORMAT_TIME)
1343         goto newseg_wrong_format;
1344
1345       GST_DEBUG_OBJECT (jitterbuffer,
1346           "newsegment:  %" GST_SEGMENT_FORMAT, &priv->segment);
1347
1348       /* FIXME, push SEGMENT in the queue. Sorting order might be difficult. */
1349       ret = gst_pad_push_event (priv->srcpad, event);
1350       break;
1351     }
1352     case GST_EVENT_FLUSH_START:
1353       ret = gst_pad_push_event (priv->srcpad, event);
1354       gst_rtp_jitter_buffer_flush_start (jitterbuffer);
1355       /* wait for the loop to go into PAUSED */
1356       gst_pad_pause_task (priv->srcpad);
1357       break;
1358     case GST_EVENT_FLUSH_STOP:
1359       ret = gst_pad_push_event (priv->srcpad, event);
1360       ret =
1361           gst_rtp_jitter_buffer_src_activate_mode (priv->srcpad, parent,
1362           GST_PAD_MODE_PUSH, TRUE);
1363       break;
1364     case GST_EVENT_EOS:
1365     {
1366       /* push EOS in queue. We always push it at the head */
1367       JBUF_LOCK (priv);
1368       /* check for flushing, we need to discard the event and return FALSE when
1369        * we are flushing */
1370       ret = priv->srcresult == GST_FLOW_OK;
1371       if (ret && !priv->eos) {
1372         GST_INFO_OBJECT (jitterbuffer, "queuing EOS");
1373         priv->eos = TRUE;
1374         JBUF_SIGNAL_EVENT (priv);
1375       } else if (priv->eos) {
1376         GST_DEBUG_OBJECT (jitterbuffer, "dropping EOS, we are already EOS");
1377       } else {
1378         GST_DEBUG_OBJECT (jitterbuffer, "dropping EOS, reason %s",
1379             gst_flow_get_name (priv->srcresult));
1380       }
1381       JBUF_UNLOCK (priv);
1382       gst_event_unref (event);
1383       break;
1384     }
1385     default:
1386       ret = gst_pad_push_event (priv->srcpad, event);
1387       break;
1388   }
1389
1390 done:
1391
1392   return ret;
1393
1394   /* ERRORS */
1395 newseg_wrong_format:
1396   {
1397     GST_DEBUG_OBJECT (jitterbuffer, "received non TIME newsegment");
1398     ret = FALSE;
1399     gst_event_unref (event);
1400     goto done;
1401   }
1402 }
1403
1404 static gboolean
1405 gst_rtp_jitter_buffer_sink_rtcp_event (GstPad * pad, GstObject * parent,
1406     GstEvent * event)
1407 {
1408   gboolean ret = TRUE;
1409   GstRtpJitterBuffer *jitterbuffer;
1410
1411   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1412
1413   GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
1414
1415   switch (GST_EVENT_TYPE (event)) {
1416     case GST_EVENT_FLUSH_START:
1417       gst_event_unref (event);
1418       break;
1419     case GST_EVENT_FLUSH_STOP:
1420       gst_event_unref (event);
1421       break;
1422     default:
1423       ret = gst_pad_event_default (pad, parent, event);
1424       break;
1425   }
1426
1427   return ret;
1428 }
1429
1430 /*
1431  * Must be called with JBUF_LOCK held, will release the LOCK when emiting the
1432  * signal. The function returns GST_FLOW_ERROR when a parsing error happened and
1433  * GST_FLOW_FLUSHING when the element is shutting down. On success
1434  * GST_FLOW_OK is returned.
1435  */
1436 static GstFlowReturn
1437 gst_rtp_jitter_buffer_get_clock_rate (GstRtpJitterBuffer * jitterbuffer,
1438     guint8 pt)
1439 {
1440   GValue ret = { 0 };
1441   GValue args[2] = { {0}, {0} };
1442   GstCaps *caps;
1443   gboolean res;
1444
1445   g_value_init (&args[0], GST_TYPE_ELEMENT);
1446   g_value_set_object (&args[0], jitterbuffer);
1447   g_value_init (&args[1], G_TYPE_UINT);
1448   g_value_set_uint (&args[1], pt);
1449
1450   g_value_init (&ret, GST_TYPE_CAPS);
1451   g_value_set_boxed (&ret, NULL);
1452
1453   JBUF_UNLOCK (jitterbuffer->priv);
1454   g_signal_emitv (args, gst_rtp_jitter_buffer_signals[SIGNAL_REQUEST_PT_MAP], 0,
1455       &ret);
1456   JBUF_LOCK_CHECK (jitterbuffer->priv, out_flushing);
1457
1458   g_value_unset (&args[0]);
1459   g_value_unset (&args[1]);
1460   caps = (GstCaps *) g_value_dup_boxed (&ret);
1461   g_value_unset (&ret);
1462   if (!caps)
1463     goto no_caps;
1464
1465   res = gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps);
1466   gst_caps_unref (caps);
1467
1468   if (G_UNLIKELY (!res))
1469     goto parse_failed;
1470
1471   return GST_FLOW_OK;
1472
1473   /* ERRORS */
1474 no_caps:
1475   {
1476     GST_DEBUG_OBJECT (jitterbuffer, "could not get caps");
1477     return GST_FLOW_ERROR;
1478   }
1479 out_flushing:
1480   {
1481     GST_DEBUG_OBJECT (jitterbuffer, "we are flushing");
1482     return GST_FLOW_FLUSHING;
1483   }
1484 parse_failed:
1485   {
1486     GST_DEBUG_OBJECT (jitterbuffer, "parse failed");
1487     return GST_FLOW_ERROR;
1488   }
1489 }
1490
1491 /* call with jbuf lock held */
1492 static void
1493 check_buffering_percent (GstRtpJitterBuffer * jitterbuffer, gint * percent)
1494 {
1495   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1496
1497   /* too short a stream, or too close to EOS will never really fill buffer */
1498   if (*percent != -1 && priv->npt_stop != -1 &&
1499       priv->npt_stop - priv->npt_start <=
1500       rtp_jitter_buffer_get_delay (priv->jbuf)) {
1501     GST_DEBUG_OBJECT (jitterbuffer, "short stream; faking full buffer");
1502     rtp_jitter_buffer_set_buffering (priv->jbuf, FALSE);
1503     *percent = 100;
1504   }
1505 }
1506
1507 static void
1508 post_buffering_percent (GstRtpJitterBuffer * jitterbuffer, gint percent)
1509 {
1510   GstMessage *message;
1511
1512   /* Post a buffering message */
1513   message = gst_message_new_buffering (GST_OBJECT_CAST (jitterbuffer), percent);
1514   gst_message_set_buffering_stats (message, GST_BUFFERING_LIVE, -1, -1, -1);
1515
1516   gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer), message);
1517 }
1518
1519 static GstClockTime
1520 apply_offset (GstRtpJitterBuffer * jitterbuffer, GstClockTime timestamp)
1521 {
1522   GstRtpJitterBufferPrivate *priv;
1523
1524   priv = jitterbuffer->priv;
1525
1526   if (timestamp == -1)
1527     return -1;
1528
1529   /* apply the timestamp offset, this is used for inter stream sync */
1530   timestamp += priv->ts_offset;
1531   /* add the offset, this is used when buffering */
1532   timestamp += priv->out_offset;
1533
1534   return timestamp;
1535 }
1536
1537 static TimerData *
1538 find_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type, guint16 seqnum)
1539 {
1540   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1541   TimerData *timer = NULL;
1542   gint i, len;
1543
1544   len = priv->timers->len;
1545   for (i = 0; i < len; i++) {
1546     TimerData *test = &g_array_index (priv->timers, TimerData, i);
1547     if (test->seqnum == seqnum && test->type == type) {
1548       timer = test;
1549       break;
1550     }
1551   }
1552   return timer;
1553 }
1554
1555 static void
1556 unschedule_current_timer (GstRtpJitterBuffer * jitterbuffer)
1557 {
1558   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1559
1560   if (priv->clock_id) {
1561     GST_DEBUG_OBJECT (jitterbuffer, "unschedule current timer");
1562     gst_clock_id_unschedule (priv->clock_id);
1563     priv->clock_id = NULL;
1564   }
1565 }
1566
1567 static GstClockTime
1568 get_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
1569 {
1570   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1571   GstClockTime test_timeout;
1572
1573   if ((test_timeout = timer->timeout) == -1)
1574     return -1;
1575
1576   if (timer->type != TIMER_TYPE_EXPECTED) {
1577     /* add our latency and offset to get output times. */
1578     test_timeout = apply_offset (jitterbuffer, test_timeout);
1579     test_timeout += priv->latency_ns;
1580   }
1581   return test_timeout;
1582 }
1583
1584 static void
1585 recalculate_timer (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
1586 {
1587   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1588
1589   if (priv->clock_id) {
1590     GstClockTime timeout = get_timeout (jitterbuffer, timer);
1591
1592     GST_DEBUG ("%" GST_TIME_FORMAT " <> %" GST_TIME_FORMAT,
1593         GST_TIME_ARGS (timeout), GST_TIME_ARGS (priv->timer_timeout));
1594
1595     if (timeout == -1 || timeout < priv->timer_timeout)
1596       unschedule_current_timer (jitterbuffer);
1597   }
1598 }
1599
1600 static TimerData *
1601 add_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type,
1602     guint16 seqnum, guint num, GstClockTime timeout, GstClockTime delay,
1603     GstClockTime duration)
1604 {
1605   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1606   TimerData *timer;
1607   gint len;
1608
1609   GST_DEBUG_OBJECT (jitterbuffer,
1610       "add timer for seqnum %d to %" GST_TIME_FORMAT ", delay %"
1611       GST_TIME_FORMAT, seqnum, GST_TIME_ARGS (timeout), GST_TIME_ARGS (delay));
1612
1613   len = priv->timers->len;
1614   g_array_set_size (priv->timers, len + 1);
1615   timer = &g_array_index (priv->timers, TimerData, len);
1616   timer->idx = len;
1617   timer->type = type;
1618   timer->seqnum = seqnum;
1619   timer->num = num;
1620   timer->timeout = timeout + delay;
1621   timer->duration = duration;
1622   if (type == TIMER_TYPE_EXPECTED) {
1623     timer->rtx_base = timeout;
1624     timer->rtx_delay = delay;
1625     timer->rtx_retry = 0;
1626   }
1627   timer->num_rtx_retry = 0;
1628   recalculate_timer (jitterbuffer, timer);
1629   JBUF_SIGNAL_TIMER (priv);
1630
1631   return timer;
1632 }
1633
1634 static void
1635 reschedule_timer (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
1636     guint16 seqnum, GstClockTime timeout, GstClockTime delay, gboolean reset)
1637 {
1638   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1639   gboolean seqchange, timechange;
1640   guint16 oldseq;
1641
1642   seqchange = timer->seqnum != seqnum;
1643   timechange = timer->timeout != timeout;
1644
1645   if (!seqchange && !timechange)
1646     return;
1647
1648   oldseq = timer->seqnum;
1649
1650   GST_DEBUG_OBJECT (jitterbuffer,
1651       "replace timer for seqnum %d->%d to %" GST_TIME_FORMAT,
1652       oldseq, seqnum, GST_TIME_ARGS (timeout + delay));
1653
1654   timer->timeout = timeout + delay;
1655   timer->seqnum = seqnum;
1656   if (reset) {
1657     timer->rtx_base = timeout;
1658     timer->rtx_delay = delay;
1659     timer->rtx_retry = 0;
1660   }
1661
1662   if (priv->clock_id) {
1663     /* we changed the seqnum and there is a timer currently waiting with this
1664      * seqnum, unschedule it */
1665     if (seqchange && priv->timer_seqnum == oldseq)
1666       unschedule_current_timer (jitterbuffer);
1667     /* we changed the time, check if it is earlier than what we are waiting
1668      * for and unschedule if so */
1669     else if (timechange)
1670       recalculate_timer (jitterbuffer, timer);
1671   }
1672 }
1673
1674 static TimerData *
1675 set_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type,
1676     guint16 seqnum, GstClockTime timeout)
1677 {
1678   TimerData *timer;
1679
1680   /* find the seqnum timer */
1681   timer = find_timer (jitterbuffer, type, seqnum);
1682   if (timer == NULL) {
1683     timer = add_timer (jitterbuffer, type, seqnum, 0, timeout, 0, -1);
1684   } else {
1685     reschedule_timer (jitterbuffer, timer, seqnum, timeout, 0, FALSE);
1686   }
1687   return timer;
1688 }
1689
1690 static void
1691 remove_timer (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
1692 {
1693   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1694   guint idx;
1695
1696   if (priv->clock_id && priv->timer_seqnum == timer->seqnum)
1697     unschedule_current_timer (jitterbuffer);
1698
1699   idx = timer->idx;
1700   GST_DEBUG_OBJECT (jitterbuffer, "removed index %d", idx);
1701   g_array_remove_index_fast (priv->timers, idx);
1702   timer->idx = idx;
1703 }
1704
1705 static void
1706 remove_all_timers (GstRtpJitterBuffer * jitterbuffer)
1707 {
1708   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1709   GST_DEBUG_OBJECT (jitterbuffer, "removed all timers");
1710   g_array_set_size (priv->timers, 0);
1711   unschedule_current_timer (jitterbuffer);
1712 }
1713
1714 /* we just received a packet with seqnum and dts.
1715  *
1716  * First check for old seqnum that we are still expecting. If the gap with the
1717  * current seqnum is too big, unschedule the timeouts.
1718  *
1719  * If we have a valid packet spacing estimate we can set a timer for when we
1720  * should receive the next packet.
1721  * If we don't have a valid estimate, we remove any timer we might have
1722  * had for this packet.
1723  */
1724 static void
1725 update_timers (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum,
1726     GstClockTime dts, gboolean do_next_seqnum)
1727 {
1728   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1729   TimerData *timer = NULL;
1730   gint i, len;
1731
1732   /* go through all timers and unschedule the ones with a large gap, also find
1733    * the timer for the seqnum */
1734   len = priv->timers->len;
1735   for (i = 0; i < len; i++) {
1736     TimerData *test = &g_array_index (priv->timers, TimerData, i);
1737     gint gap;
1738
1739     gap = gst_rtp_buffer_compare_seqnum (test->seqnum, seqnum);
1740
1741     GST_DEBUG_OBJECT (jitterbuffer, "%d, #%d<->#%d gap %d", i,
1742         test->seqnum, seqnum, gap);
1743
1744     if (gap == 0) {
1745       GST_DEBUG ("found timer for current seqnum");
1746       /* the timer for the current seqnum */
1747       timer = test;
1748     } else if (gap > priv->rtx_delay_reorder) {
1749       /* max gap, we exceeded the max reorder distance and we don't expect the
1750        * missing packet to be this reordered */
1751       if (test->num_rtx_retry == 0 && test->type == TIMER_TYPE_EXPECTED)
1752         reschedule_timer (jitterbuffer, test, test->seqnum, -1, 0, FALSE);
1753     }
1754   }
1755
1756   if (priv->packet_spacing > 0 && do_next_seqnum && priv->do_retransmission) {
1757     GstClockTime expected, delay;
1758
1759     /* calculate expected arrival time of the next seqnum */
1760     expected = dts + priv->packet_spacing;
1761     delay = priv->rtx_delay * GST_MSECOND;
1762
1763     /* and update/install timer for next seqnum */
1764     if (timer)
1765       reschedule_timer (jitterbuffer, timer, priv->next_in_seqnum, expected,
1766           delay, TRUE);
1767     else
1768       add_timer (jitterbuffer, TIMER_TYPE_EXPECTED, priv->next_in_seqnum, 0,
1769           expected, delay, priv->packet_spacing);
1770   } else if (timer && timer->type != TIMER_TYPE_DEADLINE) {
1771
1772     if (timer->num_rtx_retry > 0) {
1773       GstClockTime rtx_last;
1774
1775       /* we scheduled a retry for this packet and now we have it */
1776       priv->num_rtx_success++;
1777       /* all the previous retry attempts failed */
1778       priv->num_rtx_failed += timer->num_rtx_retry - 1;
1779       /* number of retries before receiving the packet */
1780       if (priv->avg_rtx_num == 0.0)
1781         priv->avg_rtx_num = timer->num_rtx_retry;
1782       else
1783         priv->avg_rtx_num = (timer->num_rtx_retry + 7 * priv->avg_rtx_num) / 8;
1784       /* calculate the delay between retransmission request and receiving this
1785        * packet, start with when we scheduled this timeout last */
1786       rtx_last = timer->rtx_last;
1787       if (dts > rtx_last) {
1788         GstClockTime delay;
1789         /* we have a valid delay if this packet arrived after we scheduled the
1790          * request */
1791         delay = dts - rtx_last;
1792         if (priv->avg_rtx_rtt == 0)
1793           priv->avg_rtx_rtt = delay;
1794         else
1795           priv->avg_rtx_rtt = (delay + 7 * priv->avg_rtx_rtt) / 8;
1796       }
1797       GST_LOG_OBJECT (jitterbuffer,
1798           "RTX success %" G_GUINT64_FORMAT ", failed %" G_GUINT64_FORMAT
1799           ", requests %" G_GUINT64_FORMAT ", dups %" G_GUINT64_FORMAT
1800           ", avg-num %g, avg-rtt %" G_GUINT64_FORMAT, priv->num_rtx_success,
1801           priv->num_rtx_failed, priv->num_rtx_requests, priv->num_duplicates,
1802           priv->avg_rtx_num, priv->avg_rtx_rtt);
1803     }
1804     /* if we had a timer, remove it, we don't know when to expect the next
1805      * packet. */
1806     remove_timer (jitterbuffer, timer);
1807     /* we signal the _loop function because this new packet could be the one
1808      * it was waiting for */
1809     JBUF_SIGNAL_EVENT (priv);
1810   }
1811 }
1812
1813 static void
1814 calculate_packet_spacing (GstRtpJitterBuffer * jitterbuffer, guint32 rtptime,
1815     GstClockTime dts)
1816 {
1817   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1818
1819   /* we need consecutive seqnums with a different
1820    * rtptime to estimate the packet spacing. */
1821   if (priv->ips_rtptime != rtptime) {
1822     /* rtptime changed, check dts diff */
1823     if (priv->ips_dts != -1 && dts != -1 && dts > priv->ips_dts) {
1824       priv->packet_spacing = dts - priv->ips_dts;
1825       GST_DEBUG_OBJECT (jitterbuffer,
1826           "new packet spacing %" GST_TIME_FORMAT,
1827           GST_TIME_ARGS (priv->packet_spacing));
1828     }
1829     priv->ips_rtptime = rtptime;
1830     priv->ips_dts = dts;
1831   }
1832 }
1833
1834 static void
1835 calculate_expected (GstRtpJitterBuffer * jitterbuffer, guint32 expected,
1836     guint16 seqnum, GstClockTime dts, gint gap)
1837 {
1838   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1839   GstClockTime total_duration, duration, expected_dts;
1840   TimerType type;
1841
1842   GST_DEBUG_OBJECT (jitterbuffer,
1843       "dts %" GST_TIME_FORMAT ", last %" GST_TIME_FORMAT,
1844       GST_TIME_ARGS (dts), GST_TIME_ARGS (priv->last_in_dts));
1845
1846   /* the total duration spanned by the missing packets */
1847   if (dts >= priv->last_in_dts)
1848     total_duration = dts - priv->last_in_dts;
1849   else
1850     total_duration = 0;
1851
1852   /* interpolate between the current time and the last time based on
1853    * number of packets we are missing, this is the estimated duration
1854    * for the missing packet based on equidistant packet spacing. */
1855   duration = total_duration / (gap + 1);
1856
1857   GST_DEBUG_OBJECT (jitterbuffer, "duration %" GST_TIME_FORMAT,
1858       GST_TIME_ARGS (duration));
1859
1860   if (total_duration > priv->latency_ns) {
1861     GstClockTime gap_time;
1862     guint lost_packets;
1863
1864     gap_time = total_duration - priv->latency_ns;
1865
1866     if (duration > 0) {
1867       lost_packets = gap_time / duration;
1868       gap_time = lost_packets * duration;
1869     } else {
1870       lost_packets = gap;
1871     }
1872
1873     /* too many lost packets, some of the missing packets are already
1874      * too late and we can generate lost packet events for them. */
1875     GST_DEBUG_OBJECT (jitterbuffer, "too many lost packets %" GST_TIME_FORMAT
1876         " > %" GST_TIME_FORMAT ", consider %u lost",
1877         GST_TIME_ARGS (total_duration), GST_TIME_ARGS (priv->latency_ns),
1878         lost_packets);
1879
1880     /* this timer will fire immediately and the lost event will be pushed from
1881      * the timer thread */
1882     add_timer (jitterbuffer, TIMER_TYPE_LOST, expected, lost_packets,
1883         priv->last_in_dts + duration, 0, gap_time);
1884
1885     expected += lost_packets;
1886     priv->last_in_dts += gap_time;
1887   }
1888
1889   expected_dts = priv->last_in_dts + duration;
1890
1891   if (priv->do_retransmission) {
1892     TimerData *timer;
1893
1894     type = TIMER_TYPE_EXPECTED;
1895     /* if we had a timer for the first missing packet, update it. */
1896     if ((timer = find_timer (jitterbuffer, type, expected))) {
1897       GstClockTime timeout = timer->timeout;
1898
1899       timer->duration = duration;
1900       if (timeout > expected_dts) {
1901         GstClockTime delay = timeout - expected_dts - timer->rtx_retry;
1902         reschedule_timer (jitterbuffer, timer, timer->seqnum, expected_dts,
1903             delay, TRUE);
1904       }
1905       expected++;
1906       expected_dts += duration;
1907     }
1908   } else {
1909     type = TIMER_TYPE_LOST;
1910   }
1911
1912   while (expected < seqnum) {
1913     add_timer (jitterbuffer, type, expected, 0, expected_dts, 0, duration);
1914     expected_dts += duration;
1915     expected++;
1916   }
1917 }
1918
1919 static GstFlowReturn
1920 gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent,
1921     GstBuffer * buffer)
1922 {
1923   GstRtpJitterBuffer *jitterbuffer;
1924   GstRtpJitterBufferPrivate *priv;
1925   guint16 seqnum;
1926   guint32 expected, rtptime;
1927   GstFlowReturn ret = GST_FLOW_OK;
1928   GstClockTime dts, pts;
1929   guint64 latency_ts;
1930   gboolean tail;
1931   gint percent = -1;
1932   guint8 pt;
1933   GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
1934   gboolean do_next_seqnum = FALSE;
1935   RTPJitterBufferItem *item;
1936
1937   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1938
1939   priv = jitterbuffer->priv;
1940
1941   if (G_UNLIKELY (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp)))
1942     goto invalid_buffer;
1943
1944   pt = gst_rtp_buffer_get_payload_type (&rtp);
1945   seqnum = gst_rtp_buffer_get_seq (&rtp);
1946   rtptime = gst_rtp_buffer_get_timestamp (&rtp);
1947   gst_rtp_buffer_unmap (&rtp);
1948
1949   /* make sure we have PTS and DTS set */
1950   pts = GST_BUFFER_PTS (buffer);
1951   dts = GST_BUFFER_DTS (buffer);
1952   if (dts == -1)
1953     dts = pts;
1954   else if (pts == -1)
1955     pts = dts;
1956
1957   /* take the DTS of the buffer. This is the time when the packet was
1958    * received and is used to calculate jitter and clock skew. We will adjust
1959    * this DTS with the smoothed value after processing it in the
1960    * jitterbuffer and assign it as the PTS. */
1961   /* bring to running time */
1962   dts = gst_segment_to_running_time (&priv->segment, GST_FORMAT_TIME, dts);
1963
1964   GST_DEBUG_OBJECT (jitterbuffer,
1965       "Received packet #%d at time %" GST_TIME_FORMAT ", discont %d", seqnum,
1966       GST_TIME_ARGS (dts), GST_BUFFER_IS_DISCONT (buffer));
1967
1968   JBUF_LOCK_CHECK (priv, out_flushing);
1969
1970   if (G_UNLIKELY (priv->last_pt != pt)) {
1971     GstCaps *caps;
1972
1973     GST_DEBUG_OBJECT (jitterbuffer, "pt changed from %u to %u", priv->last_pt,
1974         pt);
1975
1976     priv->last_pt = pt;
1977     /* reset clock-rate so that we get a new one */
1978     priv->clock_rate = -1;
1979
1980     /* Try to get the clock-rate from the caps first if we can. If there are no
1981      * caps we must fire the signal to get the clock-rate. */
1982     if ((caps = gst_pad_get_current_caps (pad))) {
1983       gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps);
1984       gst_caps_unref (caps);
1985     }
1986   }
1987
1988   if (G_UNLIKELY (priv->clock_rate == -1)) {
1989     /* no clock rate given on the caps, try to get one with the signal */
1990     if (gst_rtp_jitter_buffer_get_clock_rate (jitterbuffer,
1991             pt) == GST_FLOW_FLUSHING)
1992       goto out_flushing;
1993
1994     if (G_UNLIKELY (priv->clock_rate == -1))
1995       goto no_clock_rate;
1996   }
1997
1998   /* don't accept more data on EOS */
1999   if (G_UNLIKELY (priv->eos))
2000     goto have_eos;
2001
2002   expected = priv->next_in_seqnum;
2003
2004   /* now check against our expected seqnum */
2005   if (G_LIKELY (expected != -1)) {
2006     gint gap;
2007
2008     /* now calculate gap */
2009     gap = gst_rtp_buffer_compare_seqnum (expected, seqnum);
2010
2011     GST_DEBUG_OBJECT (jitterbuffer, "expected #%d, got #%d, gap of %d",
2012         expected, seqnum, gap);
2013
2014     if (G_LIKELY (gap == 0)) {
2015       /* packet is expected */
2016       calculate_packet_spacing (jitterbuffer, rtptime, dts);
2017       do_next_seqnum = TRUE;
2018     } else {
2019       gboolean reset = FALSE;
2020
2021       if (gap < 0) {
2022         /* we received an old packet */
2023         if (G_UNLIKELY (gap < -RTP_MAX_MISORDER)) {
2024           /* too old packet, reset */
2025           GST_DEBUG_OBJECT (jitterbuffer, "reset: buffer too old %d < %d", gap,
2026               -RTP_MAX_MISORDER);
2027           reset = TRUE;
2028         } else {
2029           GST_DEBUG_OBJECT (jitterbuffer, "old packet received");
2030         }
2031       } else {
2032         /* new packet, we are missing some packets */
2033         if (G_UNLIKELY (gap > RTP_MAX_DROPOUT)) {
2034           /* packet too far in future, reset */
2035           GST_DEBUG_OBJECT (jitterbuffer, "reset: buffer too new %d > %d", gap,
2036               RTP_MAX_DROPOUT);
2037           reset = TRUE;
2038         } else {
2039           GST_DEBUG_OBJECT (jitterbuffer, "%d missing packets", gap);
2040           /* fill in the gap with EXPECTED timers */
2041           calculate_expected (jitterbuffer, expected, seqnum, dts, gap);
2042
2043           do_next_seqnum = TRUE;
2044         }
2045       }
2046       if (G_UNLIKELY (reset)) {
2047         GST_DEBUG_OBJECT (jitterbuffer, "flush and reset jitterbuffer");
2048         rtp_jitter_buffer_flush (priv->jbuf, (GFunc) free_item, NULL);
2049         rtp_jitter_buffer_reset_skew (priv->jbuf);
2050         remove_all_timers (jitterbuffer);
2051         priv->last_popped_seqnum = -1;
2052         priv->next_seqnum = seqnum;
2053         do_next_seqnum = TRUE;
2054       }
2055       /* reset spacing estimation when gap */
2056       priv->ips_rtptime = -1;
2057       priv->ips_dts = GST_CLOCK_TIME_NONE;
2058     }
2059   } else {
2060     GST_DEBUG_OBJECT (jitterbuffer, "First buffer #%d", seqnum);
2061     /* we don't know what the next_in_seqnum should be, wait for the last
2062      * possible moment to push this buffer, maybe we get an earlier seqnum
2063      * while we wait */
2064     set_timer (jitterbuffer, TIMER_TYPE_DEADLINE, seqnum, dts);
2065     do_next_seqnum = TRUE;
2066     /* take rtptime and dts to calculate packet spacing */
2067     priv->ips_rtptime = rtptime;
2068     priv->ips_dts = dts;
2069   }
2070   if (do_next_seqnum) {
2071     priv->last_in_seqnum = seqnum;
2072     priv->last_in_dts = dts;
2073     priv->next_in_seqnum = (seqnum + 1) & 0xffff;
2074   }
2075
2076   /* let's check if this buffer is too late, we can only accept packets with
2077    * bigger seqnum than the one we last pushed. */
2078   if (G_LIKELY (priv->last_popped_seqnum != -1)) {
2079     gint gap;
2080
2081     gap = gst_rtp_buffer_compare_seqnum (priv->last_popped_seqnum, seqnum);
2082
2083     /* priv->last_popped_seqnum >= seqnum, we're too late. */
2084     if (G_UNLIKELY (gap <= 0))
2085       goto too_late;
2086   }
2087
2088   /* let's drop oldest packet if the queue is already full and drop-on-latency
2089    * is set. We can only do this when there actually is a latency. When no
2090    * latency is set, we just pump it in the queue and let the other end push it
2091    * out as fast as possible. */
2092   if (priv->latency_ms && priv->drop_on_latency) {
2093     latency_ts =
2094         gst_util_uint64_scale_int (priv->latency_ms, priv->clock_rate, 1000);
2095
2096     if (G_UNLIKELY (rtp_jitter_buffer_get_ts_diff (priv->jbuf) >= latency_ts)) {
2097       RTPJitterBufferItem *old_item;
2098
2099       old_item = rtp_jitter_buffer_pop (priv->jbuf, &percent);
2100       GST_DEBUG_OBJECT (jitterbuffer, "Queue full, dropping old packet %p",
2101           old_item);
2102       priv->next_seqnum = (old_item->seqnum + 1) & 0xffff;
2103       free_item (old_item);
2104     }
2105   }
2106
2107   item = alloc_item (buffer, ITEM_TYPE_BUFFER, dts, pts, seqnum, 1, rtptime);
2108
2109   /* now insert the packet into the queue in sorted order. This function returns
2110    * FALSE if a packet with the same seqnum was already in the queue, meaning we
2111    * have a duplicate. */
2112   if (G_UNLIKELY (!rtp_jitter_buffer_insert (priv->jbuf, item,
2113               &tail, &percent)))
2114     goto duplicate;
2115
2116   /* update timers */
2117   update_timers (jitterbuffer, seqnum, dts, do_next_seqnum);
2118
2119   /* we had an unhandled SR, handle it now */
2120   if (priv->last_sr)
2121     do_handle_sync (jitterbuffer);
2122
2123   /* signal addition of new buffer when the _loop is waiting. */
2124   if (priv->active)
2125     JBUF_SIGNAL_EVENT (priv);
2126
2127   /* let's unschedule and unblock any waiting buffers. We only want to do this
2128    * when the tail buffer changed */
2129   if (G_UNLIKELY (priv->clock_id && tail)) {
2130     GST_DEBUG_OBJECT (jitterbuffer, "Unscheduling waiting new buffer");
2131     unschedule_current_timer (jitterbuffer);
2132   }
2133
2134   GST_DEBUG_OBJECT (jitterbuffer, "Pushed packet #%d, now %d packets, tail: %d",
2135       seqnum, rtp_jitter_buffer_num_packets (priv->jbuf), tail);
2136
2137   check_buffering_percent (jitterbuffer, &percent);
2138
2139 finished:
2140   JBUF_UNLOCK (priv);
2141
2142   if (percent != -1)
2143     post_buffering_percent (jitterbuffer, percent);
2144
2145   return ret;
2146
2147   /* ERRORS */
2148 invalid_buffer:
2149   {
2150     /* this is not fatal but should be filtered earlier */
2151     GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
2152         ("Received invalid RTP payload, dropping"));
2153     gst_buffer_unref (buffer);
2154     return GST_FLOW_OK;
2155   }
2156 no_clock_rate:
2157   {
2158     GST_WARNING_OBJECT (jitterbuffer,
2159         "No clock-rate in caps!, dropping buffer");
2160     gst_buffer_unref (buffer);
2161     goto finished;
2162   }
2163 out_flushing:
2164   {
2165     ret = priv->srcresult;
2166     GST_DEBUG_OBJECT (jitterbuffer, "flushing %s", gst_flow_get_name (ret));
2167     gst_buffer_unref (buffer);
2168     goto finished;
2169   }
2170 have_eos:
2171   {
2172     ret = GST_FLOW_EOS;
2173     GST_WARNING_OBJECT (jitterbuffer, "we are EOS, refusing buffer");
2174     gst_buffer_unref (buffer);
2175     goto finished;
2176   }
2177 too_late:
2178   {
2179     GST_WARNING_OBJECT (jitterbuffer, "Packet #%d too late as #%d was already"
2180         " popped, dropping", seqnum, priv->last_popped_seqnum);
2181     priv->num_late++;
2182     gst_buffer_unref (buffer);
2183     goto finished;
2184   }
2185 duplicate:
2186   {
2187     GST_WARNING_OBJECT (jitterbuffer, "Duplicate packet #%d detected, dropping",
2188         seqnum);
2189     priv->num_duplicates++;
2190     free_item (item);
2191     goto finished;
2192   }
2193 }
2194
2195 static GstClockTime
2196 compute_elapsed (GstRtpJitterBuffer * jitterbuffer, RTPJitterBufferItem * item)
2197 {
2198   guint64 ext_time, elapsed;
2199   guint32 rtp_time;
2200   GstRtpJitterBufferPrivate *priv;
2201
2202   priv = jitterbuffer->priv;
2203   rtp_time = item->rtptime;
2204
2205   GST_LOG_OBJECT (jitterbuffer, "rtp %" G_GUINT32_FORMAT ", ext %"
2206       G_GUINT64_FORMAT, rtp_time, priv->ext_timestamp);
2207
2208   if (rtp_time < priv->ext_timestamp) {
2209     ext_time = priv->ext_timestamp;
2210   } else {
2211     ext_time = gst_rtp_buffer_ext_timestamp (&priv->ext_timestamp, rtp_time);
2212   }
2213
2214   if (ext_time > priv->clock_base)
2215     elapsed = ext_time - priv->clock_base;
2216   else
2217     elapsed = 0;
2218
2219   elapsed = gst_util_uint64_scale_int (elapsed, GST_SECOND, priv->clock_rate);
2220   return elapsed;
2221 }
2222
2223 static void
2224 update_estimated_eos (GstRtpJitterBuffer * jitterbuffer,
2225     RTPJitterBufferItem * item)
2226 {
2227   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2228
2229   if (priv->npt_stop != -1 && priv->ext_timestamp != -1
2230       && priv->clock_base != -1 && priv->clock_rate > 0) {
2231     guint64 elapsed, estimated;
2232
2233     elapsed = compute_elapsed (jitterbuffer, item);
2234
2235     if (elapsed > priv->last_elapsed || !priv->last_elapsed) {
2236       guint64 left;
2237       GstClockTime out_time;
2238
2239       priv->last_elapsed = elapsed;
2240
2241       left = priv->npt_stop - priv->npt_start;
2242       GST_LOG_OBJECT (jitterbuffer, "left %" GST_TIME_FORMAT,
2243           GST_TIME_ARGS (left));
2244
2245       out_time = item->dts;
2246
2247       if (elapsed > 0)
2248         estimated = gst_util_uint64_scale (out_time, left, elapsed);
2249       else {
2250         /* if there is almost nothing left,
2251          * we may never advance enough to end up in the above case */
2252         if (left < GST_SECOND)
2253           estimated = GST_SECOND;
2254         else
2255           estimated = -1;
2256       }
2257
2258       GST_LOG_OBJECT (jitterbuffer, "elapsed %" GST_TIME_FORMAT ", estimated %"
2259           GST_TIME_FORMAT, GST_TIME_ARGS (elapsed), GST_TIME_ARGS (estimated));
2260
2261       if (estimated != -1 && priv->estimated_eos != estimated) {
2262         set_timer (jitterbuffer, TIMER_TYPE_EOS, -1, estimated);
2263         priv->estimated_eos = estimated;
2264       }
2265     }
2266   }
2267 }
2268
2269 /* take a buffer from the queue and push it */
2270 static GstFlowReturn
2271 pop_and_push_next (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum)
2272 {
2273   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2274   GstFlowReturn result;
2275   RTPJitterBufferItem *item;
2276   GstBuffer *outbuf;
2277   GstEvent *outevent;
2278   GstClockTime dts, pts;
2279   gint percent = -1;
2280   gboolean is_buffer, do_push = TRUE;
2281
2282   /* when we get here we are ready to pop and push the buffer */
2283   item = rtp_jitter_buffer_pop (priv->jbuf, &percent);
2284
2285   is_buffer = GST_IS_BUFFER (item->data);
2286
2287   if (is_buffer) {
2288     check_buffering_percent (jitterbuffer, &percent);
2289
2290     /* we need to make writable to change the flags and timestamps */
2291     outbuf = gst_buffer_make_writable (item->data);
2292
2293     if (G_UNLIKELY (priv->discont)) {
2294       /* set DISCONT flag when we missed a packet. We pushed the buffer writable
2295        * into the jitterbuffer so we can modify now. */
2296       GST_DEBUG_OBJECT (jitterbuffer, "mark output buffer discont");
2297       GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_DISCONT);
2298       priv->discont = FALSE;
2299     }
2300     if (G_UNLIKELY (priv->ts_discont)) {
2301       GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_RESYNC);
2302       priv->ts_discont = FALSE;
2303     }
2304
2305     dts = gst_segment_to_position (&priv->segment, GST_FORMAT_TIME, item->dts);
2306     pts = gst_segment_to_position (&priv->segment, GST_FORMAT_TIME, item->pts);
2307
2308     /* apply timestamp with offset to buffer now */
2309     GST_BUFFER_DTS (outbuf) = apply_offset (jitterbuffer, dts);
2310     GST_BUFFER_PTS (outbuf) = apply_offset (jitterbuffer, pts);
2311
2312     /* update the elapsed time when we need to check against the npt stop time. */
2313     update_estimated_eos (jitterbuffer, item);
2314
2315     priv->last_out_time = GST_BUFFER_PTS (outbuf);
2316   } else {
2317     outevent = item->data;
2318     if (item->type == ITEM_TYPE_LOST) {
2319       priv->discont = TRUE;
2320       if (!priv->do_lost)
2321         do_push = FALSE;
2322     }
2323   }
2324
2325   /* now we are ready to push the buffer. Save the seqnum and release the lock
2326    * so the other end can push stuff in the queue again. */
2327   priv->last_popped_seqnum = seqnum;
2328   priv->next_seqnum = (seqnum + item->count) & 0xffff;
2329   JBUF_UNLOCK (priv);
2330
2331   item->data = NULL;
2332   free_item (item);
2333
2334   if (is_buffer) {
2335     /* push buffer */
2336     if (percent != -1)
2337       post_buffering_percent (jitterbuffer, percent);
2338
2339     GST_DEBUG_OBJECT (jitterbuffer,
2340         "Pushing buffer %d, dts %" GST_TIME_FORMAT ", pts %" GST_TIME_FORMAT,
2341         seqnum, GST_TIME_ARGS (GST_BUFFER_DTS (outbuf)),
2342         GST_TIME_ARGS (GST_BUFFER_PTS (outbuf)));
2343     result = gst_pad_push (priv->srcpad, outbuf);
2344   } else {
2345     GST_DEBUG_OBJECT (jitterbuffer, "Pushing event %d", seqnum);
2346
2347     if (do_push)
2348       gst_pad_push_event (priv->srcpad, outevent);
2349     else
2350       gst_event_unref (outevent);
2351
2352     result = GST_FLOW_OK;
2353   }
2354   JBUF_LOCK_CHECK (priv, out_flushing);
2355
2356   return result;
2357
2358   /* ERRORS */
2359 out_flushing:
2360   {
2361     return priv->srcresult;
2362   }
2363 }
2364
2365 #define GST_FLOW_WAIT GST_FLOW_CUSTOM_SUCCESS
2366
2367 /* Peek a buffer and compare the seqnum to the expected seqnum.
2368  * If all is fine, the buffer is pushed.
2369  * If something is wrong, we wait for some event
2370  */
2371 static GstFlowReturn
2372 handle_next_buffer (GstRtpJitterBuffer * jitterbuffer)
2373 {
2374   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2375   GstFlowReturn result = GST_FLOW_OK;
2376   RTPJitterBufferItem *item;
2377   guint16 seqnum;
2378   guint32 next_seqnum;
2379   gint gap;
2380
2381   /* only push buffers when PLAYING and active and not buffering */
2382   if (priv->blocked || !priv->active ||
2383       rtp_jitter_buffer_is_buffering (priv->jbuf))
2384     return GST_FLOW_WAIT;
2385
2386 again:
2387   /* peek a buffer, we're just looking at the sequence number.
2388    * If all is fine, we'll pop and push it. If the sequence number is wrong we
2389    * wait for a timeout or something to change.
2390    * The peeked buffer is valid for as long as we hold the jitterbuffer lock. */
2391   item = rtp_jitter_buffer_peek (priv->jbuf);
2392   if (item == NULL)
2393     goto wait;
2394
2395   /* get the seqnum and the next expected seqnum */
2396   seqnum = item->seqnum;
2397
2398   next_seqnum = priv->next_seqnum;
2399
2400   /* get the gap between this and the previous packet. If we don't know the
2401    * previous packet seqnum assume no gap. */
2402   if (G_UNLIKELY (next_seqnum == -1)) {
2403     GST_DEBUG_OBJECT (jitterbuffer, "First buffer #%d", seqnum);
2404     /* we don't know what the next_seqnum should be, the chain function should
2405      * have scheduled a DEADLINE timer that will increment next_seqnum when it
2406      * fires, so wait for that */
2407     result = GST_FLOW_WAIT;
2408   } else {
2409     /* else calculate GAP */
2410     gap = gst_rtp_buffer_compare_seqnum (next_seqnum, seqnum);
2411
2412     if (G_LIKELY (gap == 0)) {
2413       /* no missing packet, pop and push */
2414       result = pop_and_push_next (jitterbuffer, seqnum);
2415     } else if (G_UNLIKELY (gap < 0)) {
2416       RTPJitterBufferItem *item;
2417       /* if we have a packet that we already pushed or considered dropped, pop it
2418        * off and get the next packet */
2419       GST_DEBUG_OBJECT (jitterbuffer, "Old packet #%d, next #%d dropping",
2420           seqnum, next_seqnum);
2421       item = rtp_jitter_buffer_pop (priv->jbuf, NULL);
2422       free_item (item);
2423       goto again;
2424     } else {
2425       /* the chain function has scheduled timers to request retransmission or
2426        * when to consider the packet lost, wait for that */
2427       GST_DEBUG_OBJECT (jitterbuffer,
2428           "Sequence number GAP detected: expected %d instead of %d (%d missing)",
2429           next_seqnum, seqnum, gap);
2430       result = GST_FLOW_WAIT;
2431     }
2432   }
2433   return result;
2434
2435 wait:
2436   {
2437     GST_DEBUG_OBJECT (jitterbuffer, "no buffer, going to wait");
2438     if (priv->eos)
2439       result = GST_FLOW_EOS;
2440     else
2441       result = GST_FLOW_WAIT;
2442     return result;
2443   }
2444 }
2445
2446 /* the timeout for when we expected a packet expired */
2447 static gboolean
2448 do_expected_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
2449     GstClockTime now)
2450 {
2451   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2452   GstEvent *event;
2453   guint delay;
2454
2455   GST_DEBUG_OBJECT (jitterbuffer, "expected %d didn't arrive", timer->seqnum);
2456
2457   delay = timer->rtx_delay + timer->rtx_retry;
2458   event = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM,
2459       gst_structure_new ("GstRTPRetransmissionRequest",
2460           "seqnum", G_TYPE_UINT, (guint) timer->seqnum,
2461           "running-time", G_TYPE_UINT64, timer->rtx_base,
2462           "delay", G_TYPE_UINT, GST_TIME_AS_MSECONDS (delay),
2463           "retry", G_TYPE_UINT, timer->num_rtx_retry,
2464           "frequency", G_TYPE_UINT, priv->rtx_retry_timeout,
2465           "period", G_TYPE_UINT, priv->rtx_retry_period,
2466           "deadline", G_TYPE_UINT, priv->latency_ms,
2467           "packet-spacing", G_TYPE_UINT64, priv->packet_spacing, NULL));
2468
2469   priv->num_rtx_requests++;
2470   timer->num_rtx_retry++;
2471   timer->rtx_last = now;
2472
2473   /* calculate the timeout for the next retransmission attempt */
2474   timer->rtx_retry += (priv->rtx_retry_timeout * GST_MSECOND);
2475   GST_DEBUG_OBJECT (jitterbuffer, "base %" GST_TIME_FORMAT ", delay %"
2476       GST_TIME_FORMAT ", retry %" GST_TIME_FORMAT,
2477       GST_TIME_ARGS (timer->rtx_base), GST_TIME_ARGS (timer->rtx_delay),
2478       GST_TIME_ARGS (timer->rtx_retry));
2479
2480   if (timer->rtx_retry + timer->rtx_delay >
2481       (priv->rtx_retry_period * GST_MSECOND)) {
2482     GST_DEBUG_OBJECT (jitterbuffer, "reschedule as LOST timer");
2483     /* too many retransmission request, we now convert the timer
2484      * to a lost timer, leave the num_rtx_retry as it is for stats */
2485     timer->type = TIMER_TYPE_LOST;
2486     timer->rtx_delay = 0;
2487     timer->rtx_retry = 0;
2488   }
2489   reschedule_timer (jitterbuffer, timer, timer->seqnum,
2490       timer->rtx_base + timer->rtx_retry, timer->rtx_delay, FALSE);
2491
2492   JBUF_UNLOCK (priv);
2493   gst_pad_push_event (priv->sinkpad, event);
2494   JBUF_LOCK (priv);
2495
2496   return FALSE;
2497 }
2498
2499 /* a packet is lost */
2500 static gboolean
2501 do_lost_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
2502     GstClockTime now)
2503 {
2504   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2505   GstClockTime duration, timestamp;
2506   guint seqnum, lost_packets, num_rtx_retry;
2507   gboolean late;
2508   GstEvent *event;
2509   RTPJitterBufferItem *item;
2510
2511   seqnum = timer->seqnum;
2512   timestamp = apply_offset (jitterbuffer, timer->timeout);
2513   duration = timer->duration;
2514   if (duration == GST_CLOCK_TIME_NONE && priv->packet_spacing > 0)
2515     duration = priv->packet_spacing;
2516   lost_packets = MAX (timer->num, 1);
2517   late = timer->num > 0;
2518   num_rtx_retry = timer->num_rtx_retry;
2519
2520   /* we had a gap and thus we lost some packets. Create an event for this.  */
2521   if (lost_packets > 1)
2522     GST_DEBUG_OBJECT (jitterbuffer, "Packets #%d -> #%d lost", seqnum,
2523         seqnum + lost_packets - 1);
2524   else
2525     GST_DEBUG_OBJECT (jitterbuffer, "Packet #%d lost", seqnum);
2526
2527   priv->num_late += lost_packets;
2528   priv->num_rtx_failed += num_rtx_retry;
2529
2530   /* create paket lost event */
2531   event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM,
2532       gst_structure_new ("GstRTPPacketLost",
2533           "seqnum", G_TYPE_UINT, (guint) seqnum,
2534           "timestamp", G_TYPE_UINT64, timestamp,
2535           "duration", G_TYPE_UINT64, duration,
2536           "late", G_TYPE_BOOLEAN, late,
2537           "retry", G_TYPE_UINT, num_rtx_retry, NULL));
2538
2539   item = alloc_item (event, ITEM_TYPE_LOST, -1, -1, seqnum, lost_packets, -1);
2540   rtp_jitter_buffer_insert (priv->jbuf, item, NULL, NULL);
2541
2542   /* remove timer now */
2543   remove_timer (jitterbuffer, timer);
2544   JBUF_SIGNAL_EVENT (priv);
2545
2546   return TRUE;
2547 }
2548
2549 static gboolean
2550 do_eos_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
2551     GstClockTime now)
2552 {
2553   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2554
2555   GST_INFO_OBJECT (jitterbuffer, "got the NPT timeout");
2556   remove_timer (jitterbuffer, timer);
2557   JBUF_SIGNAL_EVENT (priv);
2558
2559   return TRUE;
2560 }
2561
2562 static gboolean
2563 do_deadline_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
2564     GstClockTime now)
2565 {
2566   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2567
2568   GST_INFO_OBJECT (jitterbuffer, "got deadline timeout");
2569
2570   priv->next_seqnum = timer->seqnum;
2571   remove_timer (jitterbuffer, timer);
2572   JBUF_SIGNAL_EVENT (priv);
2573
2574   return TRUE;
2575 }
2576
2577 static gboolean
2578 do_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
2579     GstClockTime now)
2580 {
2581   gboolean removed = FALSE;
2582
2583   switch (timer->type) {
2584     case TIMER_TYPE_EXPECTED:
2585       removed = do_expected_timeout (jitterbuffer, timer, now);
2586       break;
2587     case TIMER_TYPE_LOST:
2588       removed = do_lost_timeout (jitterbuffer, timer, now);
2589       break;
2590     case TIMER_TYPE_DEADLINE:
2591       removed = do_deadline_timeout (jitterbuffer, timer, now);
2592       break;
2593     case TIMER_TYPE_EOS:
2594       removed = do_eos_timeout (jitterbuffer, timer, now);
2595       break;
2596   }
2597   return removed;
2598 }
2599
2600 /* called when we need to wait for the next timeout.
2601  *
2602  * We loop over the array of recorded timeouts and wait for the earliest one.
2603  * When it timed out, do the logic associated with the timer.
2604  *
2605  * If there are no timers, we wait on a gcond until something new happens.
2606  */
2607 static void
2608 wait_next_timeout (GstRtpJitterBuffer * jitterbuffer)
2609 {
2610   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2611   GstClockTime now = 0;
2612
2613   JBUF_LOCK (priv);
2614   while (priv->timer_running) {
2615     TimerData *timer = NULL;
2616     GstClockTime timer_timeout = -1;
2617     gint i, len;
2618
2619     GST_DEBUG_OBJECT (jitterbuffer, "now %" GST_TIME_FORMAT,
2620         GST_TIME_ARGS (now));
2621
2622     len = priv->timers->len;
2623     for (i = 0; i < len; i++) {
2624       TimerData *test = &g_array_index (priv->timers, TimerData, i);
2625       GstClockTime test_timeout = get_timeout (jitterbuffer, test);
2626       gboolean save_best = FALSE;
2627
2628       GST_DEBUG_OBJECT (jitterbuffer, "%d, %d, %d, %" GST_TIME_FORMAT,
2629           i, test->type, test->seqnum, GST_TIME_ARGS (test_timeout));
2630
2631       /* find the smallest timeout */
2632       if (timer == NULL) {
2633         save_best = TRUE;
2634       } else if (timer_timeout == -1) {
2635         /* we already have an immediate timeout, the new timer must be an
2636          * immediate timer with smaller seqnum to become the best */
2637         if (test_timeout == -1 && test->seqnum < timer->seqnum)
2638           save_best = TRUE;
2639       } else if (test_timeout == -1) {
2640         /* first immediate timer */
2641         save_best = TRUE;
2642       } else if (test_timeout < timer_timeout) {
2643         /* earlier timer */
2644         save_best = TRUE;
2645       } else if (test_timeout == timer_timeout && test->seqnum < timer->seqnum) {
2646         /* same timer, smaller seqnum */
2647         save_best = TRUE;
2648       }
2649       if (save_best) {
2650         GST_DEBUG_OBJECT (jitterbuffer, "new best %d", i);
2651         timer = test;
2652         timer_timeout = test_timeout;
2653       }
2654     }
2655     if (timer && !priv->blocked) {
2656       GstClock *clock;
2657       GstClockTime sync_time;
2658       GstClockID id;
2659       GstClockReturn ret;
2660       GstClockTimeDiff clock_jitter;
2661
2662       if (timer_timeout == -1 || timer_timeout <= now) {
2663         do_timeout (jitterbuffer, timer, now);
2664         /* check here, do_timeout could have released the lock */
2665         if (!priv->timer_running)
2666           break;
2667         continue;
2668       }
2669
2670       GST_OBJECT_LOCK (jitterbuffer);
2671       clock = GST_ELEMENT_CLOCK (jitterbuffer);
2672       if (!clock) {
2673         GST_OBJECT_UNLOCK (jitterbuffer);
2674         /* let's just push if there is no clock */
2675         GST_DEBUG_OBJECT (jitterbuffer, "No clock, timeout right away");
2676         now = timer_timeout;
2677         continue;
2678       }
2679
2680       /* prepare for sync against clock */
2681       sync_time = timer_timeout + GST_ELEMENT_CAST (jitterbuffer)->base_time;
2682       /* add latency of peer to get input time */
2683       sync_time += priv->peer_latency;
2684
2685       GST_DEBUG_OBJECT (jitterbuffer, "sync to timestamp %" GST_TIME_FORMAT
2686           " with sync time %" GST_TIME_FORMAT,
2687           GST_TIME_ARGS (timer_timeout), GST_TIME_ARGS (sync_time));
2688
2689       /* create an entry for the clock */
2690       id = priv->clock_id = gst_clock_new_single_shot_id (clock, sync_time);
2691       priv->timer_timeout = timer_timeout;
2692       priv->timer_seqnum = timer->seqnum;
2693       GST_OBJECT_UNLOCK (jitterbuffer);
2694
2695       /* release the lock so that the other end can push stuff or unlock */
2696       JBUF_UNLOCK (priv);
2697
2698       ret = gst_clock_id_wait (id, &clock_jitter);
2699
2700       JBUF_LOCK (priv);
2701       if (!priv->timer_running)
2702         break;
2703
2704       if (ret != GST_CLOCK_UNSCHEDULED) {
2705         now = timer_timeout + MAX (clock_jitter, 0);
2706         GST_DEBUG_OBJECT (jitterbuffer, "sync done, %d, #%d, %" G_GINT64_FORMAT,
2707             ret, priv->timer_seqnum, clock_jitter);
2708       } else {
2709         GST_DEBUG_OBJECT (jitterbuffer, "sync unscheduled");
2710       }
2711       /* and free the entry */
2712       gst_clock_id_unref (id);
2713       priv->clock_id = NULL;
2714     } else {
2715       /* no timers, wait for activity */
2716       JBUF_WAIT_TIMER (priv);
2717     }
2718   }
2719   JBUF_UNLOCK (priv);
2720
2721   GST_DEBUG_OBJECT (jitterbuffer, "we are stopping");
2722   return;
2723 }
2724
2725 /*
2726  * This funcion implements the main pushing loop on the source pad.
2727  *
2728  * It first tries to push as many buffers as possible. If there is a seqnum
2729  * mismatch, we wait for the next timeouts.
2730  */
2731 static void
2732 gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer)
2733 {
2734   GstRtpJitterBufferPrivate *priv;
2735   GstFlowReturn result;
2736
2737   priv = jitterbuffer->priv;
2738
2739   JBUF_LOCK_CHECK (priv, flushing);
2740   do {
2741     result = handle_next_buffer (jitterbuffer);
2742     if (G_LIKELY (result == GST_FLOW_WAIT)) {
2743       /* now wait for the next event */
2744       JBUF_WAIT_EVENT (priv, flushing);
2745       result = GST_FLOW_OK;
2746     }
2747   }
2748   while (result == GST_FLOW_OK);
2749   /* store result for upstream */
2750   priv->srcresult = result;
2751   JBUF_UNLOCK (priv);
2752
2753   /* if we get here we need to pause */
2754   goto pause;
2755
2756   /* ERRORS */
2757 flushing:
2758   {
2759     result = priv->srcresult;
2760     JBUF_UNLOCK (priv);
2761     goto pause;
2762   }
2763 pause:
2764   {
2765     const gchar *reason = gst_flow_get_name (result);
2766     GstEvent *event;
2767
2768     GST_DEBUG_OBJECT (jitterbuffer, "pausing task, reason %s", reason);
2769     gst_pad_pause_task (priv->srcpad);
2770     if (result == GST_FLOW_EOS) {
2771       event = gst_event_new_eos ();
2772       gst_pad_push_event (priv->srcpad, event);
2773     }
2774     return;
2775   }
2776 }
2777
2778 /* collect the info from the lastest RTCP packet and the jitterbuffer sync, do
2779  * some sanity checks and then emit the handle-sync signal with the parameters.
2780  * This function must be called with the LOCK */
2781 static void
2782 do_handle_sync (GstRtpJitterBuffer * jitterbuffer)
2783 {
2784   GstRtpJitterBufferPrivate *priv;
2785   guint64 base_rtptime, base_time;
2786   guint32 clock_rate;
2787   guint64 last_rtptime;
2788   guint64 clock_base;
2789   guint64 ext_rtptime, diff;
2790   gboolean drop = FALSE;
2791
2792   priv = jitterbuffer->priv;
2793
2794   /* get the last values from the jitterbuffer */
2795   rtp_jitter_buffer_get_sync (priv->jbuf, &base_rtptime, &base_time,
2796       &clock_rate, &last_rtptime);
2797
2798   clock_base = priv->clock_base;
2799   ext_rtptime = priv->ext_rtptime;
2800
2801   GST_DEBUG_OBJECT (jitterbuffer, "ext SR %" G_GUINT64_FORMAT ", base %"
2802       G_GUINT64_FORMAT ", clock-rate %" G_GUINT32_FORMAT
2803       ", clock-base %" G_GUINT64_FORMAT ", last-rtptime %" G_GUINT64_FORMAT,
2804       ext_rtptime, base_rtptime, clock_rate, clock_base, last_rtptime);
2805
2806   if (base_rtptime == -1 || clock_rate == -1 || base_time == -1) {
2807     GST_DEBUG_OBJECT (jitterbuffer, "dropping, no RTP values");
2808     drop = TRUE;
2809   } else {
2810     /* we can't accept anything that happened before we did the last resync */
2811     if (base_rtptime > ext_rtptime) {
2812       GST_DEBUG_OBJECT (jitterbuffer, "dropping, older than base time");
2813       drop = TRUE;
2814     } else {
2815       /* the SR RTP timestamp must be something close to what we last observed
2816        * in the jitterbuffer */
2817       if (ext_rtptime > last_rtptime) {
2818         /* check how far ahead it is to our RTP timestamps */
2819         diff = ext_rtptime - last_rtptime;
2820         /* if bigger than 1 second, we drop it */
2821         if (diff > clock_rate) {
2822           GST_DEBUG_OBJECT (jitterbuffer, "too far ahead");
2823           /* should drop this, but some RTSP servers end up with bogus
2824            * way too ahead RTCP packet when repeated PAUSE/PLAY,
2825            * so still trigger rptbin sync but invalidate RTCP data
2826            * (sync might use other methods) */
2827           ext_rtptime = -1;
2828         }
2829         GST_DEBUG_OBJECT (jitterbuffer, "ext last %" G_GUINT64_FORMAT ", diff %"
2830             G_GUINT64_FORMAT, last_rtptime, diff);
2831       }
2832     }
2833   }
2834
2835   if (!drop) {
2836     GstStructure *s;
2837
2838     s = gst_structure_new ("application/x-rtp-sync",
2839         "base-rtptime", G_TYPE_UINT64, base_rtptime,
2840         "base-time", G_TYPE_UINT64, base_time,
2841         "clock-rate", G_TYPE_UINT, clock_rate,
2842         "clock-base", G_TYPE_UINT64, clock_base,
2843         "sr-ext-rtptime", G_TYPE_UINT64, ext_rtptime,
2844         "sr-buffer", GST_TYPE_BUFFER, priv->last_sr, NULL);
2845
2846     GST_DEBUG_OBJECT (jitterbuffer, "signaling sync");
2847     gst_buffer_replace (&priv->last_sr, NULL);
2848     JBUF_UNLOCK (priv);
2849     g_signal_emit (jitterbuffer,
2850         gst_rtp_jitter_buffer_signals[SIGNAL_HANDLE_SYNC], 0, s);
2851     JBUF_LOCK (priv);
2852     gst_structure_free (s);
2853   } else {
2854     GST_DEBUG_OBJECT (jitterbuffer, "dropping RTCP packet");
2855   }
2856 }
2857
2858 static GstFlowReturn
2859 gst_rtp_jitter_buffer_chain_rtcp (GstPad * pad, GstObject * parent,
2860     GstBuffer * buffer)
2861 {
2862   GstRtpJitterBuffer *jitterbuffer;
2863   GstRtpJitterBufferPrivate *priv;
2864   GstFlowReturn ret = GST_FLOW_OK;
2865   guint32 ssrc;
2866   GstRTCPPacket packet;
2867   guint64 ext_rtptime;
2868   guint32 rtptime;
2869   GstRTCPBuffer rtcp = { NULL, };
2870
2871   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
2872
2873   if (G_UNLIKELY (!gst_rtcp_buffer_validate (buffer)))
2874     goto invalid_buffer;
2875
2876   priv = jitterbuffer->priv;
2877
2878   gst_rtcp_buffer_map (buffer, GST_MAP_READ, &rtcp);
2879
2880   if (!gst_rtcp_buffer_get_first_packet (&rtcp, &packet))
2881     goto empty_buffer;
2882
2883   /* first packet must be SR or RR or else the validate would have failed */
2884   switch (gst_rtcp_packet_get_type (&packet)) {
2885     case GST_RTCP_TYPE_SR:
2886       gst_rtcp_packet_sr_get_sender_info (&packet, &ssrc, NULL, &rtptime,
2887           NULL, NULL);
2888       break;
2889     default:
2890       goto ignore_buffer;
2891   }
2892   gst_rtcp_buffer_unmap (&rtcp);
2893
2894   GST_DEBUG_OBJECT (jitterbuffer, "received RTCP of SSRC %08x", ssrc);
2895
2896   JBUF_LOCK (priv);
2897   /* convert the RTP timestamp to our extended timestamp, using the same offset
2898    * we used in the jitterbuffer */
2899   ext_rtptime = priv->jbuf->ext_rtptime;
2900   ext_rtptime = gst_rtp_buffer_ext_timestamp (&ext_rtptime, rtptime);
2901
2902   priv->ext_rtptime = ext_rtptime;
2903   gst_buffer_replace (&priv->last_sr, buffer);
2904
2905   do_handle_sync (jitterbuffer);
2906   JBUF_UNLOCK (priv);
2907
2908 done:
2909   gst_buffer_unref (buffer);
2910
2911   return ret;
2912
2913 invalid_buffer:
2914   {
2915     /* this is not fatal but should be filtered earlier */
2916     GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
2917         ("Received invalid RTCP payload, dropping"));
2918     ret = GST_FLOW_OK;
2919     goto done;
2920   }
2921 empty_buffer:
2922   {
2923     /* this is not fatal but should be filtered earlier */
2924     GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
2925         ("Received empty RTCP payload, dropping"));
2926     gst_rtcp_buffer_unmap (&rtcp);
2927     ret = GST_FLOW_OK;
2928     goto done;
2929   }
2930 ignore_buffer:
2931   {
2932     GST_DEBUG_OBJECT (jitterbuffer, "ignoring RTCP packet");
2933     gst_rtcp_buffer_unmap (&rtcp);
2934     ret = GST_FLOW_OK;
2935     goto done;
2936   }
2937 }
2938
2939 static gboolean
2940 gst_rtp_jitter_buffer_sink_query (GstPad * pad, GstObject * parent,
2941     GstQuery * query)
2942 {
2943   gboolean res = FALSE;
2944
2945   switch (GST_QUERY_TYPE (query)) {
2946     case GST_QUERY_CAPS:
2947     {
2948       GstCaps *filter, *caps;
2949
2950       gst_query_parse_caps (query, &filter);
2951       caps = gst_rtp_jitter_buffer_getcaps (pad, filter);
2952       gst_query_set_caps_result (query, caps);
2953       gst_caps_unref (caps);
2954       res = TRUE;
2955       break;
2956     }
2957     default:
2958       if (GST_QUERY_IS_SERIALIZED (query)) {
2959         GST_WARNING_OBJECT (pad, "unhandled serialized query");
2960         res = FALSE;
2961       } else {
2962         res = gst_pad_query_default (pad, parent, query);
2963       }
2964       break;
2965   }
2966   return res;
2967 }
2968
2969 static gboolean
2970 gst_rtp_jitter_buffer_src_query (GstPad * pad, GstObject * parent,
2971     GstQuery * query)
2972 {
2973   GstRtpJitterBuffer *jitterbuffer;
2974   GstRtpJitterBufferPrivate *priv;
2975   gboolean res = FALSE;
2976
2977   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
2978   priv = jitterbuffer->priv;
2979
2980   switch (GST_QUERY_TYPE (query)) {
2981     case GST_QUERY_LATENCY:
2982     {
2983       /* We need to send the query upstream and add the returned latency to our
2984        * own */
2985       GstClockTime min_latency, max_latency;
2986       gboolean us_live;
2987       GstClockTime our_latency;
2988
2989       if ((res = gst_pad_peer_query (priv->sinkpad, query))) {
2990         gst_query_parse_latency (query, &us_live, &min_latency, &max_latency);
2991
2992         GST_DEBUG_OBJECT (jitterbuffer, "Peer latency: min %"
2993             GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
2994             GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
2995
2996         /* store this so that we can safely sync on the peer buffers. */
2997         JBUF_LOCK (priv);
2998         priv->peer_latency = min_latency;
2999         our_latency = priv->latency_ns;
3000         JBUF_UNLOCK (priv);
3001
3002         GST_DEBUG_OBJECT (jitterbuffer, "Our latency: %" GST_TIME_FORMAT,
3003             GST_TIME_ARGS (our_latency));
3004
3005         /* we add some latency but can buffer an infinite amount of time */
3006         min_latency += our_latency;
3007         max_latency = -1;
3008
3009         GST_DEBUG_OBJECT (jitterbuffer, "Calculated total latency : min %"
3010             GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
3011             GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
3012
3013         gst_query_set_latency (query, TRUE, min_latency, max_latency);
3014       }
3015       break;
3016     }
3017     case GST_QUERY_POSITION:
3018     {
3019       GstClockTime start, last_out;
3020       GstFormat fmt;
3021
3022       gst_query_parse_position (query, &fmt, NULL);
3023       if (fmt != GST_FORMAT_TIME) {
3024         res = gst_pad_query_default (pad, parent, query);
3025         break;
3026       }
3027
3028       JBUF_LOCK (priv);
3029       start = priv->npt_start;
3030       last_out = priv->last_out_time;
3031       JBUF_UNLOCK (priv);
3032
3033       GST_DEBUG_OBJECT (jitterbuffer, "npt start %" GST_TIME_FORMAT
3034           ", last out %" GST_TIME_FORMAT, GST_TIME_ARGS (start),
3035           GST_TIME_ARGS (last_out));
3036
3037       if (GST_CLOCK_TIME_IS_VALID (start) && GST_CLOCK_TIME_IS_VALID (last_out)) {
3038         /* bring 0-based outgoing time to stream time */
3039         gst_query_set_position (query, GST_FORMAT_TIME, start + last_out);
3040         res = TRUE;
3041       } else {
3042         res = gst_pad_query_default (pad, parent, query);
3043       }
3044       break;
3045     }
3046     case GST_QUERY_CAPS:
3047     {
3048       GstCaps *filter, *caps;
3049
3050       gst_query_parse_caps (query, &filter);
3051       caps = gst_rtp_jitter_buffer_getcaps (pad, filter);
3052       gst_query_set_caps_result (query, caps);
3053       gst_caps_unref (caps);
3054       res = TRUE;
3055       break;
3056     }
3057     default:
3058       res = gst_pad_query_default (pad, parent, query);
3059       break;
3060   }
3061
3062   return res;
3063 }
3064
3065 static void
3066 gst_rtp_jitter_buffer_set_property (GObject * object,
3067     guint prop_id, const GValue * value, GParamSpec * pspec)
3068 {
3069   GstRtpJitterBuffer *jitterbuffer;
3070   GstRtpJitterBufferPrivate *priv;
3071
3072   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
3073   priv = jitterbuffer->priv;
3074
3075   switch (prop_id) {
3076     case PROP_LATENCY:
3077     {
3078       guint new_latency, old_latency;
3079
3080       new_latency = g_value_get_uint (value);
3081
3082       JBUF_LOCK (priv);
3083       old_latency = priv->latency_ms;
3084       priv->latency_ms = new_latency;
3085       priv->latency_ns = priv->latency_ms * GST_MSECOND;
3086       rtp_jitter_buffer_set_delay (priv->jbuf, priv->latency_ns);
3087       JBUF_UNLOCK (priv);
3088
3089       /* post message if latency changed, this will inform the parent pipeline
3090        * that a latency reconfiguration is possible/needed. */
3091       if (new_latency != old_latency) {
3092         GST_DEBUG_OBJECT (jitterbuffer, "latency changed to: %" GST_TIME_FORMAT,
3093             GST_TIME_ARGS (new_latency * GST_MSECOND));
3094
3095         gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer),
3096             gst_message_new_latency (GST_OBJECT_CAST (jitterbuffer)));
3097       }
3098       break;
3099     }
3100     case PROP_DROP_ON_LATENCY:
3101       JBUF_LOCK (priv);
3102       priv->drop_on_latency = g_value_get_boolean (value);
3103       JBUF_UNLOCK (priv);
3104       break;
3105     case PROP_TS_OFFSET:
3106       JBUF_LOCK (priv);
3107       priv->ts_offset = g_value_get_int64 (value);
3108       priv->ts_discont = TRUE;
3109       JBUF_UNLOCK (priv);
3110       break;
3111     case PROP_DO_LOST:
3112       JBUF_LOCK (priv);
3113       priv->do_lost = g_value_get_boolean (value);
3114       JBUF_UNLOCK (priv);
3115       break;
3116     case PROP_MODE:
3117       JBUF_LOCK (priv);
3118       rtp_jitter_buffer_set_mode (priv->jbuf, g_value_get_enum (value));
3119       JBUF_UNLOCK (priv);
3120       break;
3121     case PROP_DO_RETRANSMISSION:
3122       JBUF_LOCK (priv);
3123       priv->do_retransmission = g_value_get_boolean (value);
3124       JBUF_UNLOCK (priv);
3125       break;
3126     case PROP_RTX_DELAY:
3127       JBUF_LOCK (priv);
3128       priv->rtx_delay = g_value_get_int (value);
3129       JBUF_UNLOCK (priv);
3130       break;
3131     case PROP_RTX_DELAY_REORDER:
3132       JBUF_LOCK (priv);
3133       priv->rtx_delay_reorder = g_value_get_int (value);
3134       JBUF_UNLOCK (priv);
3135       break;
3136     case PROP_RTX_RETRY_TIMEOUT:
3137       JBUF_LOCK (priv);
3138       priv->rtx_retry_timeout = g_value_get_int (value);
3139       JBUF_UNLOCK (priv);
3140       break;
3141     case PROP_RTX_RETRY_PERIOD:
3142       JBUF_LOCK (priv);
3143       priv->rtx_retry_period = g_value_get_int (value);
3144       JBUF_UNLOCK (priv);
3145       break;
3146     default:
3147       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
3148       break;
3149   }
3150 }
3151
3152 static void
3153 gst_rtp_jitter_buffer_get_property (GObject * object,
3154     guint prop_id, GValue * value, GParamSpec * pspec)
3155 {
3156   GstRtpJitterBuffer *jitterbuffer;
3157   GstRtpJitterBufferPrivate *priv;
3158
3159   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
3160   priv = jitterbuffer->priv;
3161
3162   switch (prop_id) {
3163     case PROP_LATENCY:
3164       JBUF_LOCK (priv);
3165       g_value_set_uint (value, priv->latency_ms);
3166       JBUF_UNLOCK (priv);
3167       break;
3168     case PROP_DROP_ON_LATENCY:
3169       JBUF_LOCK (priv);
3170       g_value_set_boolean (value, priv->drop_on_latency);
3171       JBUF_UNLOCK (priv);
3172       break;
3173     case PROP_TS_OFFSET:
3174       JBUF_LOCK (priv);
3175       g_value_set_int64 (value, priv->ts_offset);
3176       JBUF_UNLOCK (priv);
3177       break;
3178     case PROP_DO_LOST:
3179       JBUF_LOCK (priv);
3180       g_value_set_boolean (value, priv->do_lost);
3181       JBUF_UNLOCK (priv);
3182       break;
3183     case PROP_MODE:
3184       JBUF_LOCK (priv);
3185       g_value_set_enum (value, rtp_jitter_buffer_get_mode (priv->jbuf));
3186       JBUF_UNLOCK (priv);
3187       break;
3188     case PROP_PERCENT:
3189     {
3190       gint percent;
3191
3192       JBUF_LOCK (priv);
3193       if (priv->srcresult != GST_FLOW_OK)
3194         percent = 100;
3195       else
3196         percent = rtp_jitter_buffer_get_percent (priv->jbuf);
3197
3198       g_value_set_int (value, percent);
3199       JBUF_UNLOCK (priv);
3200       break;
3201     }
3202     case PROP_DO_RETRANSMISSION:
3203       JBUF_LOCK (priv);
3204       g_value_set_boolean (value, priv->do_retransmission);
3205       JBUF_UNLOCK (priv);
3206       break;
3207     case PROP_RTX_DELAY:
3208       JBUF_LOCK (priv);
3209       g_value_set_int (value, priv->rtx_delay);
3210       JBUF_UNLOCK (priv);
3211       break;
3212     case PROP_RTX_DELAY_REORDER:
3213       JBUF_LOCK (priv);
3214       g_value_set_int (value, priv->rtx_delay_reorder);
3215       JBUF_UNLOCK (priv);
3216       break;
3217     case PROP_RTX_RETRY_TIMEOUT:
3218       JBUF_LOCK (priv);
3219       g_value_set_int (value, priv->rtx_retry_timeout);
3220       JBUF_UNLOCK (priv);
3221       break;
3222     case PROP_RTX_RETRY_PERIOD:
3223       JBUF_LOCK (priv);
3224       g_value_set_int (value, priv->rtx_retry_period);
3225       JBUF_UNLOCK (priv);
3226       break;
3227     case PROP_STATS:
3228       g_value_take_boxed (value,
3229           gst_rtp_jitter_buffer_create_stats (jitterbuffer));
3230       break;
3231     default:
3232       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
3233       break;
3234   }
3235 }
3236
3237 static GstStructure *
3238 gst_rtp_jitter_buffer_create_stats (GstRtpJitterBuffer * jbuf)
3239 {
3240   GstStructure *s;
3241
3242   JBUF_LOCK (jbuf->priv);
3243   s = gst_structure_new ("application/x-rtp-jitterbuffer-stats",
3244       "rtx-count", G_TYPE_UINT64, jbuf->priv->num_rtx_requests,
3245       "rtx-success-count", G_TYPE_UINT64, jbuf->priv->num_rtx_success,
3246       "rtx-per-packet", G_TYPE_DOUBLE, jbuf->priv->avg_rtx_num,
3247       "rtx-rtt", G_TYPE_UINT64, jbuf->priv->avg_rtx_rtt, NULL);
3248   JBUF_UNLOCK (jbuf->priv);
3249
3250   return s;
3251 }