rtpjitterbuffer: serialize events in the buffer
[platform/upstream/gst-plugins-good.git] / gst / rtpmanager / gstrtpjitterbuffer.c
1 /*
2  * Farsight Voice+Video library
3  *
4  *  Copyright 2007 Collabora Ltd,
5  *  Copyright 2007 Nokia Corporation
6  *   @author: Philippe Kalaf <philippe.kalaf@collabora.co.uk>.
7  *  Copyright 2007 Wim Taymans <wim.taymans@gmail.com>
8  *
9  * This library is free software; you can redistribute it and/or
10  * modify it under the terms of the GNU Library General Public
11  * License as published by the Free Software Foundation; either
12  * version 2 of the License, or (at your option) any later version.
13  *
14  * This library is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17  * Library General Public License for more details.
18  *
19  * You should have received a copy of the GNU Library General Public
20  * License along with this library; if not, write to the
21  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
22  * Boston, MA 02110-1301, USA.
23  *
24  */
25
26 /**
27  * SECTION:element-rtpjitterbuffer
28  *
29  * This element reorders and removes duplicate RTP packets as they are received
30  * from a network source.
31  *
32  * The element needs the clock-rate of the RTP payload in order to estimate the
33  * delay. This information is obtained either from the caps on the sink pad or,
34  * when no caps are present, from the #GstRtpJitterBuffer::request-pt-map signal.
35  * To clear the previous pt-map use the #GstRtpJitterBuffer::clear-pt-map signal.
36  *
37  * The rtpjitterbuffer will wait for missing packets up to a configurable time
38  * limit using the #GstRtpJitterBuffer:latency property. Packets arriving too
39  * late are considered to be lost packets. If the #GstRtpJitterBuffer:do-lost
40  * property is set, lost packets will result in a custom serialized downstream
41  * event of name GstRTPPacketLost. The lost packet events are usually used by a
42  * depayloader or other element to create concealment data or some other logic
43  * to gracefully handle the missing packets.
44  *
45  * The jitterbuffer will use the DTS (or PTS if no DTS is set) of the incomming
46  * buffer and the rtptime inside the RTP packet to create a PTS on the outgoing
47  * buffer.
48  *
49  * The jitterbuffer can also be configured to send early retransmission events
50  * upstream by setting the #GstRtpJitterBuffer:do-retransmission property. In
51  * this mode, the jitterbuffer tries to estimate when a packet should arrive and
52  * sends a custom upstream event named GstRTPRetransmissionRequest when the
53  * packet is considered late. The initial expected packet arrival time is
54  * calculated as follows:
55  *
56  * - If seqnum N arrived at time T, seqnum N+1 is expected to arrive at
57  *     T + packet-spacing + #GstRtpJitterBuffer:rtx-delay. The packet spacing is
58  *     calculated from the DTS (or PTS is no DTS) of two consecutive RTP
59  *     packets with different rtptime.
60  *
61  * - If seqnum N0 arrived at time T0 and seqnum Nm arrived at time Tm,
62  *     seqnum Ni is expected at time Ti = T0 + i*(Tm - T0)/(Nm - N0). Any
63  *     previously scheduled timeout is overwritten.
64  *
65  * - If seqnum N arrived, all seqnum older than
66  *     N - #GstRtpJitterBuffer:rtx-delay-reorder are considered late
67  *     immediately. This is to request fast feedback for abonormally reorder
68  *     packets before any of the previous timeouts is triggered.
69  *
70  * A late packet triggers the GstRTPRetransmissionRequest custom upstream
71  * event. After the initial timeout expires and the retransmission event is
72  * sent, the timeout is scheduled for
73  * T + #GstRtpJitterBuffer:rtx-retry-timeout. If the missing packet did not
74  * arrive after #GstRtpJitterBuffer:rtx-retry-timeout, a new
75  * GstRTPRetransmissionRequest is sent upstream and the timeout is rescheduled
76  * again for T + #GstRtpJitterBuffer:rtx-retry-timeout. This repeats until
77  * #GstRtpJitterBuffer:rtx-retry-period elapsed, at which point no further
78  * retransmission requests are sent and the regular logic is performed to
79  * schedule a lost packet as discussed above.
80  *
81  * This element acts as a live element and so adds #GstRtpJitterBuffer:latency
82  * to the pipeline.
83  *
84  * This element will automatically be used inside rtpbin.
85  *
86  * <refsect2>
87  * <title>Example pipelines</title>
88  * |[
89  * gst-launch-1.0 rtspsrc location=rtsp://192.168.1.133:8554/mpeg1or2AudioVideoTest ! rtpjitterbuffer ! rtpmpvdepay ! mpeg2dec ! xvimagesink
90  * ]| Connect to a streaming server and decode the MPEG video. The jitterbuffer is
91  * inserted into the pipeline to smooth out network jitter and to reorder the
92  * out-of-order RTP packets.
93  * </refsect2>
94  *
95  * Last reviewed on 2007-05-28 (0.10.5)
96  */
97
98 #ifdef HAVE_CONFIG_H
99 #include "config.h"
100 #endif
101
102 #include <stdlib.h>
103 #include <string.h>
104 #include <gst/rtp/gstrtpbuffer.h>
105
106 #include "gstrtpjitterbuffer.h"
107 #include "rtpjitterbuffer.h"
108 #include "rtpstats.h"
109
110 #include <gst/glib-compat-private.h>
111
112 GST_DEBUG_CATEGORY (rtpjitterbuffer_debug);
113 #define GST_CAT_DEFAULT (rtpjitterbuffer_debug)
114
115 /* RTPJitterBuffer signals and args */
116 enum
117 {
118   SIGNAL_REQUEST_PT_MAP,
119   SIGNAL_CLEAR_PT_MAP,
120   SIGNAL_HANDLE_SYNC,
121   SIGNAL_ON_NPT_STOP,
122   SIGNAL_SET_ACTIVE,
123   LAST_SIGNAL
124 };
125
126 #define DEFAULT_LATENCY_MS          200
127 #define DEFAULT_DROP_ON_LATENCY     FALSE
128 #define DEFAULT_TS_OFFSET           0
129 #define DEFAULT_DO_LOST             FALSE
130 #define DEFAULT_MODE                RTP_JITTER_BUFFER_MODE_SLAVE
131 #define DEFAULT_PERCENT             0
132 #define DEFAULT_DO_RETRANSMISSION   FALSE
133 #define DEFAULT_RTX_DELAY           20
134 #define DEFAULT_RTX_DELAY_REORDER   3
135 #define DEFAULT_RTX_RETRY_TIMEOUT   40
136 #define DEFAULT_RTX_RETRY_PERIOD    160
137
138 enum
139 {
140   PROP_0,
141   PROP_LATENCY,
142   PROP_DROP_ON_LATENCY,
143   PROP_TS_OFFSET,
144   PROP_DO_LOST,
145   PROP_MODE,
146   PROP_PERCENT,
147   PROP_DO_RETRANSMISSION,
148   PROP_RTX_DELAY,
149   PROP_RTX_DELAY_REORDER,
150   PROP_RTX_RETRY_TIMEOUT,
151   PROP_RTX_RETRY_PERIOD,
152   PROP_STATS,
153   PROP_LAST
154 };
155
156 #define JBUF_LOCK(priv)   (g_mutex_lock (&(priv)->jbuf_lock))
157
158 #define JBUF_LOCK_CHECK(priv,label) G_STMT_START {    \
159   JBUF_LOCK (priv);                                   \
160   if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK))    \
161     goto label;                                       \
162 } G_STMT_END
163 #define JBUF_UNLOCK(priv) (g_mutex_unlock (&(priv)->jbuf_lock))
164
165 #define JBUF_WAIT_TIMER(priv)   G_STMT_START {            \
166   GST_DEBUG ("waiting timer");                            \
167   (priv)->waiting_timer = TRUE;                           \
168   g_cond_wait (&(priv)->jbuf_timer, &(priv)->jbuf_lock);  \
169   (priv)->waiting_timer = FALSE;                          \
170   GST_DEBUG ("waiting timer done");                       \
171 } G_STMT_END
172 #define JBUF_SIGNAL_TIMER(priv) G_STMT_START {            \
173   if (G_UNLIKELY ((priv)->waiting_timer)) {               \
174     GST_DEBUG ("signal timer");                           \
175     g_cond_signal (&(priv)->jbuf_timer);                  \
176   }                                                       \
177 } G_STMT_END
178
179 #define JBUF_WAIT_EVENT(priv,label) G_STMT_START {       \
180   GST_DEBUG ("waiting event");                           \
181   (priv)->waiting_event = TRUE;                          \
182   g_cond_wait (&(priv)->jbuf_event, &(priv)->jbuf_lock); \
183   (priv)->waiting_event = FALSE;                         \
184   GST_DEBUG ("waiting event done");                      \
185   if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK))       \
186     goto label;                                          \
187 } G_STMT_END
188 #define JBUF_SIGNAL_EVENT(priv) G_STMT_START {           \
189   if (G_UNLIKELY ((priv)->waiting_event)) {              \
190     GST_DEBUG ("signal event");                          \
191     g_cond_signal (&(priv)->jbuf_event);                 \
192   }                                                      \
193 } G_STMT_END
194
195 struct _GstRtpJitterBufferPrivate
196 {
197   GstPad *sinkpad, *srcpad;
198   GstPad *rtcpsinkpad;
199
200   RTPJitterBuffer *jbuf;
201   GMutex jbuf_lock;
202   gboolean waiting_timer;
203   GCond jbuf_timer;
204   gboolean waiting_event;
205   GCond jbuf_event;
206   gboolean discont;
207   gboolean ts_discont;
208   gboolean active;
209   guint64 out_offset;
210
211   gboolean timer_running;
212   GThread *timer_thread;
213
214   /* properties */
215   guint latency_ms;
216   guint64 latency_ns;
217   gboolean drop_on_latency;
218   gint64 ts_offset;
219   gboolean do_lost;
220   gboolean do_retransmission;
221   gint rtx_delay;
222   gint rtx_delay_reorder;
223   gint rtx_retry_timeout;
224   gint rtx_retry_period;
225
226   /* the last seqnum we pushed out */
227   guint32 last_popped_seqnum;
228   /* the next expected seqnum we push */
229   guint32 next_seqnum;
230   /* last output time */
231   GstClockTime last_out_time;
232   /* last valid input timestamp and rtptime pair */
233   GstClockTime ips_dts;
234   guint64 ips_rtptime;
235   GstClockTime packet_spacing;
236
237   /* the next expected seqnum we receive */
238   GstClockTime last_in_dts;
239   guint32 last_in_seqnum;
240   guint32 next_in_seqnum;
241
242   GArray *timers;
243
244   /* start and stop ranges */
245   GstClockTime npt_start;
246   GstClockTime npt_stop;
247   guint64 ext_timestamp;
248   guint64 last_elapsed;
249   guint64 estimated_eos;
250   GstClockID eos_id;
251
252   /* state */
253   gboolean eos;
254
255   /* clock rate and rtp timestamp offset */
256   gint last_pt;
257   gint32 clock_rate;
258   gint64 clock_base;
259   gint64 prev_ts_offset;
260
261   /* when we are shutting down */
262   GstFlowReturn srcresult;
263   gboolean blocked;
264
265   /* for sync */
266   GstSegment segment;
267   GstClockID clock_id;
268   GstClockTime timer_timeout;
269   guint16 timer_seqnum;
270   /* the latency of the upstream peer, we have to take this into account when
271    * synchronizing the buffers. */
272   GstClockTime peer_latency;
273   guint64 ext_rtptime;
274   GstBuffer *last_sr;
275
276   /* some accounting */
277   guint64 num_late;
278   guint64 num_duplicates;
279   guint64 num_rtx_requests;
280   guint64 num_rtx_success;
281   guint64 num_rtx_failed;
282   gdouble avg_rtx_num;
283   guint64 avg_rtx_rtt;
284 };
285
286 typedef enum
287 {
288   TIMER_TYPE_EXPECTED,
289   TIMER_TYPE_LOST,
290   TIMER_TYPE_DEADLINE,
291   TIMER_TYPE_EOS
292 } TimerType;
293
294 typedef struct
295 {
296   guint idx;
297   guint16 seqnum;
298   guint num;
299   TimerType type;
300   GstClockTime timeout;
301   GstClockTime duration;
302   GstClockTime rtx_base;
303   GstClockTime rtx_delay;
304   GstClockTime rtx_retry;
305   GstClockTime rtx_last;
306   guint num_rtx_retry;
307 } TimerData;
308
309 #define GST_RTP_JITTER_BUFFER_GET_PRIVATE(o) \
310   (G_TYPE_INSTANCE_GET_PRIVATE ((o), GST_TYPE_RTP_JITTER_BUFFER, \
311                                 GstRtpJitterBufferPrivate))
312
313 static GstStaticPadTemplate gst_rtp_jitter_buffer_sink_template =
314 GST_STATIC_PAD_TEMPLATE ("sink",
315     GST_PAD_SINK,
316     GST_PAD_ALWAYS,
317     GST_STATIC_CAPS ("application/x-rtp, "
318         "clock-rate = (int) [ 1, 2147483647 ]"
319         /* "payload = (int) , "
320          * "encoding-name = (string) "
321          */ )
322     );
323
324 static GstStaticPadTemplate gst_rtp_jitter_buffer_sink_rtcp_template =
325 GST_STATIC_PAD_TEMPLATE ("sink_rtcp",
326     GST_PAD_SINK,
327     GST_PAD_REQUEST,
328     GST_STATIC_CAPS ("application/x-rtcp")
329     );
330
331 static GstStaticPadTemplate gst_rtp_jitter_buffer_src_template =
332 GST_STATIC_PAD_TEMPLATE ("src",
333     GST_PAD_SRC,
334     GST_PAD_ALWAYS,
335     GST_STATIC_CAPS ("application/x-rtp"
336         /* "payload = (int) , "
337          * "clock-rate = (int) , "
338          * "encoding-name = (string) "
339          */ )
340     );
341
342 static guint gst_rtp_jitter_buffer_signals[LAST_SIGNAL] = { 0 };
343
344 #define gst_rtp_jitter_buffer_parent_class parent_class
345 G_DEFINE_TYPE (GstRtpJitterBuffer, gst_rtp_jitter_buffer, GST_TYPE_ELEMENT);
346
347 /* object overrides */
348 static void gst_rtp_jitter_buffer_set_property (GObject * object,
349     guint prop_id, const GValue * value, GParamSpec * pspec);
350 static void gst_rtp_jitter_buffer_get_property (GObject * object,
351     guint prop_id, GValue * value, GParamSpec * pspec);
352 static void gst_rtp_jitter_buffer_finalize (GObject * object);
353
354 /* element overrides */
355 static GstStateChangeReturn gst_rtp_jitter_buffer_change_state (GstElement
356     * element, GstStateChange transition);
357 static GstPad *gst_rtp_jitter_buffer_request_new_pad (GstElement * element,
358     GstPadTemplate * templ, const gchar * name, const GstCaps * filter);
359 static void gst_rtp_jitter_buffer_release_pad (GstElement * element,
360     GstPad * pad);
361 static GstClock *gst_rtp_jitter_buffer_provide_clock (GstElement * element);
362
363 /* pad overrides */
364 static GstCaps *gst_rtp_jitter_buffer_getcaps (GstPad * pad, GstCaps * filter);
365 static GstIterator *gst_rtp_jitter_buffer_iterate_internal_links (GstPad * pad,
366     GstObject * parent);
367
368 /* sinkpad overrides */
369 static gboolean gst_rtp_jitter_buffer_sink_event (GstPad * pad,
370     GstObject * parent, GstEvent * event);
371 static GstFlowReturn gst_rtp_jitter_buffer_chain (GstPad * pad,
372     GstObject * parent, GstBuffer * buffer);
373
374 static gboolean gst_rtp_jitter_buffer_sink_rtcp_event (GstPad * pad,
375     GstObject * parent, GstEvent * event);
376 static GstFlowReturn gst_rtp_jitter_buffer_chain_rtcp (GstPad * pad,
377     GstObject * parent, GstBuffer * buffer);
378
379 static gboolean gst_rtp_jitter_buffer_sink_query (GstPad * pad,
380     GstObject * parent, GstQuery * query);
381
382 /* srcpad overrides */
383 static gboolean gst_rtp_jitter_buffer_src_event (GstPad * pad,
384     GstObject * parent, GstEvent * event);
385 static gboolean gst_rtp_jitter_buffer_src_activate_mode (GstPad * pad,
386     GstObject * parent, GstPadMode mode, gboolean active);
387 static void gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer);
388 static gboolean gst_rtp_jitter_buffer_src_query (GstPad * pad,
389     GstObject * parent, GstQuery * query);
390
391 static void
392 gst_rtp_jitter_buffer_clear_pt_map (GstRtpJitterBuffer * jitterbuffer);
393 static GstClockTime
394 gst_rtp_jitter_buffer_set_active (GstRtpJitterBuffer * jitterbuffer,
395     gboolean active, guint64 base_time);
396 static void do_handle_sync (GstRtpJitterBuffer * jitterbuffer);
397
398 static void unschedule_current_timer (GstRtpJitterBuffer * jitterbuffer);
399 static void remove_all_timers (GstRtpJitterBuffer * jitterbuffer);
400
401 static void wait_next_timeout (GstRtpJitterBuffer * jitterbuffer);
402
403 static GstStructure *gst_rtp_jitter_buffer_create_stats (GstRtpJitterBuffer *
404     jitterbuffer);
405
406 static void
407 gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass)
408 {
409   GObjectClass *gobject_class;
410   GstElementClass *gstelement_class;
411
412   gobject_class = (GObjectClass *) klass;
413   gstelement_class = (GstElementClass *) klass;
414
415   g_type_class_add_private (klass, sizeof (GstRtpJitterBufferPrivate));
416
417   gobject_class->finalize = gst_rtp_jitter_buffer_finalize;
418
419   gobject_class->set_property = gst_rtp_jitter_buffer_set_property;
420   gobject_class->get_property = gst_rtp_jitter_buffer_get_property;
421
422   /**
423    * GstRtpJitterBuffer:latency:
424    *
425    * The maximum latency of the jitterbuffer. Packets will be kept in the buffer
426    * for at most this time.
427    */
428   g_object_class_install_property (gobject_class, PROP_LATENCY,
429       g_param_spec_uint ("latency", "Buffer latency in ms",
430           "Amount of ms to buffer", 0, G_MAXUINT, DEFAULT_LATENCY_MS,
431           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
432   /**
433    * GstRtpJitterBuffer:drop-on-latency:
434    *
435    * Drop oldest buffers when the queue is completely filled.
436    */
437   g_object_class_install_property (gobject_class, PROP_DROP_ON_LATENCY,
438       g_param_spec_boolean ("drop-on-latency",
439           "Drop buffers when maximum latency is reached",
440           "Tells the jitterbuffer to never exceed the given latency in size",
441           DEFAULT_DROP_ON_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
442   /**
443    * GstRtpJitterBuffer:ts-offset:
444    *
445    * Adjust GStreamer output buffer timestamps in the jitterbuffer with offset.
446    * This is mainly used to ensure interstream synchronisation.
447    */
448   g_object_class_install_property (gobject_class, PROP_TS_OFFSET,
449       g_param_spec_int64 ("ts-offset", "Timestamp Offset",
450           "Adjust buffer timestamps with offset in nanoseconds", G_MININT64,
451           G_MAXINT64, DEFAULT_TS_OFFSET,
452           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
453
454   /**
455    * GstRtpJitterBuffer:do-lost:
456    *
457    * Send out a GstRTPPacketLost event downstream when a packet is considered
458    * lost.
459    */
460   g_object_class_install_property (gobject_class, PROP_DO_LOST,
461       g_param_spec_boolean ("do-lost", "Do Lost",
462           "Send an event downstream when a packet is lost", DEFAULT_DO_LOST,
463           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
464
465   /**
466    * GstRtpJitterBuffer:mode:
467    *
468    * Control the buffering and timestamping mode used by the jitterbuffer.
469    */
470   g_object_class_install_property (gobject_class, PROP_MODE,
471       g_param_spec_enum ("mode", "Mode",
472           "Control the buffering algorithm in use", RTP_TYPE_JITTER_BUFFER_MODE,
473           DEFAULT_MODE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
474   /**
475    * GstRtpJitterBuffer:percent:
476    *
477    * The percent of the jitterbuffer that is filled.
478    */
479   g_object_class_install_property (gobject_class, PROP_PERCENT,
480       g_param_spec_int ("percent", "percent",
481           "The buffer filled percent", 0, 100,
482           0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
483   /**
484    * GstRtpJitterBuffer:do-retransmission:
485    *
486    * Send out a GstRTPRetransmission event upstream when a packet is considered
487    * late and should be retransmitted.
488    *
489    * Since: 1.2
490    */
491   g_object_class_install_property (gobject_class, PROP_DO_RETRANSMISSION,
492       g_param_spec_boolean ("do-retransmission", "Do Retransmission",
493           "Send retransmission events upstream when a packet is late",
494           DEFAULT_DO_RETRANSMISSION,
495           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
496
497   /**
498    * GstRtpJitterBuffer:rtx-delay:
499    *
500    * When a packet did not arrive at the expected time, wait this extra amount
501    * of time before sending a retransmission event.
502    *
503    * When -1 is used, the max jitter will be used as extra delay.
504    *
505    * Since: 1.2
506    */
507   g_object_class_install_property (gobject_class, PROP_RTX_DELAY,
508       g_param_spec_int ("rtx-delay", "RTX Delay",
509           "Extra time in ms to wait before sending retransmission "
510           "event (-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_DELAY,
511           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
512   /**
513    * GstRtpJitterBuffer:rtx-delay-reorder:
514    *
515    * Assume that a retransmission event should be sent when we see
516    * this much packet reordering.
517    *
518    * When -1 is used, the value will be estimated based on observed packet
519    * reordering.
520    *
521    * Since: 1.2
522    */
523   g_object_class_install_property (gobject_class, PROP_RTX_DELAY_REORDER,
524       g_param_spec_int ("rtx-delay-reorder", "RTX Delay Reorder",
525           "Sending retransmission event when this much reordering (-1 automatic)",
526           -1, G_MAXINT, DEFAULT_RTX_DELAY_REORDER,
527           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
528   /**
529    * GstRtpJitterBuffer::rtx-retry-timeout:
530    *
531    * When no packet has been received after sending a retransmission event
532    * for this time, retry sending a retransmission event.
533    *
534    * When -1 is used, the value will be estimated based on observed round
535    * trip time.
536    *
537    * Since: 1.2
538    */
539   g_object_class_install_property (gobject_class, PROP_RTX_RETRY_TIMEOUT,
540       g_param_spec_int ("rtx-retry-timeout", "RTX Retry Timeout",
541           "Retry sending a transmission event after this timeout in "
542           "ms (-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_RETRY_TIMEOUT,
543           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
544   /**
545    * GstRtpJitterBuffer:rtx-retry-period:
546    *
547    * The amount of time to try to get a retransmission.
548    *
549    * When -1 is used, the value will be estimated based on the jitterbuffer
550    * latency and the observed round trip time.
551    *
552    * Since: 1.2
553    */
554   g_object_class_install_property (gobject_class, PROP_RTX_RETRY_PERIOD,
555       g_param_spec_int ("rtx-retry-period", "RTX Retry Period",
556           "Try to get a retransmission for this many ms "
557           "(-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_RETRY_PERIOD,
558           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
559   /**
560    * GstRtpJitterBuffer:stats:
561    *
562    * Various jitterbuffer statistics. This property returns a GstStructure
563    * with name application/x-rtp-jitterbuffer-stats with the following fields:
564    *
565    *  "rtx-count"         G_TYPE_UINT64 The number of retransmissions requested
566    *  "rtx-success-count" G_TYPE_UINT64 The number of successful retransmissions
567    *  "rtx-per-packet"    G_TYPE_DOUBLE Average number of RTX per packet
568    *  "rtx-rtt"           G_TYPE_UINT64 Average round trip time per RTX
569    *
570    * Since: 1.4
571    */
572   g_object_class_install_property (gobject_class, PROP_STATS,
573       g_param_spec_boxed ("stats", "Statistics",
574           "Various statistics", GST_TYPE_STRUCTURE,
575           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
576
577   /**
578    * GstRtpJitterBuffer::request-pt-map:
579    * @buffer: the object which received the signal
580    * @pt: the pt
581    *
582    * Request the payload type as #GstCaps for @pt.
583    */
584   gst_rtp_jitter_buffer_signals[SIGNAL_REQUEST_PT_MAP] =
585       g_signal_new ("request-pt-map", G_TYPE_FROM_CLASS (klass),
586       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
587           request_pt_map), NULL, NULL, g_cclosure_marshal_generic,
588       GST_TYPE_CAPS, 1, G_TYPE_UINT);
589   /**
590    * GstRtpJitterBuffer::handle-sync:
591    * @buffer: the object which received the signal
592    * @struct: a GstStructure containing sync values.
593    *
594    * Be notified of new sync values.
595    */
596   gst_rtp_jitter_buffer_signals[SIGNAL_HANDLE_SYNC] =
597       g_signal_new ("handle-sync", G_TYPE_FROM_CLASS (klass),
598       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
599           handle_sync), NULL, NULL, g_cclosure_marshal_VOID__BOXED,
600       G_TYPE_NONE, 1, GST_TYPE_STRUCTURE | G_SIGNAL_TYPE_STATIC_SCOPE);
601
602   /**
603    * GstRtpJitterBuffer::on-npt-stop:
604    * @buffer: the object which received the signal
605    *
606    * Signal that the jitterbufer has pushed the RTP packet that corresponds to
607    * the npt-stop position.
608    */
609   gst_rtp_jitter_buffer_signals[SIGNAL_ON_NPT_STOP] =
610       g_signal_new ("on-npt-stop", G_TYPE_FROM_CLASS (klass),
611       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
612           on_npt_stop), NULL, NULL, g_cclosure_marshal_VOID__VOID,
613       G_TYPE_NONE, 0, G_TYPE_NONE);
614
615   /**
616    * GstRtpJitterBuffer::clear-pt-map:
617    * @buffer: the object which received the signal
618    *
619    * Invalidate the clock-rate as obtained with the
620    * #GstRtpJitterBuffer::request-pt-map signal.
621    */
622   gst_rtp_jitter_buffer_signals[SIGNAL_CLEAR_PT_MAP] =
623       g_signal_new ("clear-pt-map", G_TYPE_FROM_CLASS (klass),
624       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
625       G_STRUCT_OFFSET (GstRtpJitterBufferClass, clear_pt_map), NULL, NULL,
626       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
627
628   /**
629    * GstRtpJitterBuffer::set-active:
630    * @buffer: the object which received the signal
631    *
632    * Start pushing out packets with the given base time. This signal is only
633    * useful in buffering mode.
634    *
635    * Returns: the time of the last pushed packet.
636    */
637   gst_rtp_jitter_buffer_signals[SIGNAL_SET_ACTIVE] =
638       g_signal_new ("set-active", G_TYPE_FROM_CLASS (klass),
639       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
640       G_STRUCT_OFFSET (GstRtpJitterBufferClass, set_active), NULL, NULL,
641       g_cclosure_marshal_generic, G_TYPE_UINT64, 2, G_TYPE_BOOLEAN,
642       G_TYPE_UINT64);
643
644   gstelement_class->change_state =
645       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_change_state);
646   gstelement_class->request_new_pad =
647       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_request_new_pad);
648   gstelement_class->release_pad =
649       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_release_pad);
650   gstelement_class->provide_clock =
651       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_provide_clock);
652
653   gst_element_class_add_pad_template (gstelement_class,
654       gst_static_pad_template_get (&gst_rtp_jitter_buffer_src_template));
655   gst_element_class_add_pad_template (gstelement_class,
656       gst_static_pad_template_get (&gst_rtp_jitter_buffer_sink_template));
657   gst_element_class_add_pad_template (gstelement_class,
658       gst_static_pad_template_get (&gst_rtp_jitter_buffer_sink_rtcp_template));
659
660   gst_element_class_set_static_metadata (gstelement_class,
661       "RTP packet jitter-buffer", "Filter/Network/RTP",
662       "A buffer that deals with network jitter and other transmission faults",
663       "Philippe Kalaf <philippe.kalaf@collabora.co.uk>, "
664       "Wim Taymans <wim.taymans@gmail.com>");
665
666   klass->clear_pt_map = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_clear_pt_map);
667   klass->set_active = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_set_active);
668
669   GST_DEBUG_CATEGORY_INIT
670       (rtpjitterbuffer_debug, "rtpjitterbuffer", 0, "RTP Jitter Buffer");
671 }
672
673 static void
674 gst_rtp_jitter_buffer_init (GstRtpJitterBuffer * jitterbuffer)
675 {
676   GstRtpJitterBufferPrivate *priv;
677
678   priv = GST_RTP_JITTER_BUFFER_GET_PRIVATE (jitterbuffer);
679   jitterbuffer->priv = priv;
680
681   priv->latency_ms = DEFAULT_LATENCY_MS;
682   priv->latency_ns = priv->latency_ms * GST_MSECOND;
683   priv->drop_on_latency = DEFAULT_DROP_ON_LATENCY;
684   priv->do_lost = DEFAULT_DO_LOST;
685   priv->do_retransmission = DEFAULT_DO_RETRANSMISSION;
686   priv->rtx_delay = DEFAULT_RTX_DELAY;
687   priv->rtx_delay_reorder = DEFAULT_RTX_DELAY_REORDER;
688   priv->rtx_retry_timeout = DEFAULT_RTX_RETRY_TIMEOUT;
689   priv->rtx_retry_period = DEFAULT_RTX_RETRY_PERIOD;
690
691   priv->timers = g_array_new (FALSE, TRUE, sizeof (TimerData));
692   priv->jbuf = rtp_jitter_buffer_new ();
693   g_mutex_init (&priv->jbuf_lock);
694   g_cond_init (&priv->jbuf_timer);
695   g_cond_init (&priv->jbuf_event);
696
697   /* reset skew detection initialy */
698   rtp_jitter_buffer_reset_skew (priv->jbuf);
699   rtp_jitter_buffer_set_delay (priv->jbuf, priv->latency_ns);
700   rtp_jitter_buffer_set_buffering (priv->jbuf, FALSE);
701   priv->active = TRUE;
702
703   priv->srcpad =
704       gst_pad_new_from_static_template (&gst_rtp_jitter_buffer_src_template,
705       "src");
706
707   gst_pad_set_activatemode_function (priv->srcpad,
708       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_activate_mode));
709   gst_pad_set_query_function (priv->srcpad,
710       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_query));
711   gst_pad_set_event_function (priv->srcpad,
712       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_event));
713
714   priv->sinkpad =
715       gst_pad_new_from_static_template (&gst_rtp_jitter_buffer_sink_template,
716       "sink");
717
718   gst_pad_set_chain_function (priv->sinkpad,
719       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_chain));
720   gst_pad_set_event_function (priv->sinkpad,
721       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_sink_event));
722   gst_pad_set_query_function (priv->sinkpad,
723       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_sink_query));
724
725   gst_element_add_pad (GST_ELEMENT (jitterbuffer), priv->srcpad);
726   gst_element_add_pad (GST_ELEMENT (jitterbuffer), priv->sinkpad);
727
728   GST_OBJECT_FLAG_SET (jitterbuffer, GST_ELEMENT_FLAG_PROVIDE_CLOCK);
729 }
730
731 #define ITEM_TYPE_BUFFER        0
732 #define ITEM_TYPE_LOST          1
733 #define ITEM_TYPE_EVENT         2
734
735 static RTPJitterBufferItem *
736 alloc_item (gpointer data, guint type, GstClockTime dts, GstClockTime pts,
737     guint seqnum, guint count, guint rtptime)
738 {
739   RTPJitterBufferItem *item;
740
741   item = g_slice_new (RTPJitterBufferItem);
742   item->data = data;
743   item->next = NULL;
744   item->prev = NULL;
745   item->type = type;
746   item->dts = dts;
747   item->pts = pts;
748   item->seqnum = seqnum;
749   item->count = count;
750   item->rtptime = rtptime;
751
752   return item;
753 }
754
755 static void
756 free_item (RTPJitterBufferItem * item)
757 {
758   if (item->data)
759     gst_mini_object_unref (item->data);
760   g_slice_free (RTPJitterBufferItem, item);
761 }
762
763 static void
764 gst_rtp_jitter_buffer_finalize (GObject * object)
765 {
766   GstRtpJitterBuffer *jitterbuffer;
767   GstRtpJitterBufferPrivate *priv;
768
769   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
770   priv = jitterbuffer->priv;
771
772   g_array_free (priv->timers, TRUE);
773   g_mutex_clear (&priv->jbuf_lock);
774   g_cond_clear (&priv->jbuf_timer);
775   g_cond_clear (&priv->jbuf_event);
776
777   rtp_jitter_buffer_flush (priv->jbuf, (GFunc) free_item, NULL);
778   g_object_unref (priv->jbuf);
779
780   G_OBJECT_CLASS (parent_class)->finalize (object);
781 }
782
783 static GstIterator *
784 gst_rtp_jitter_buffer_iterate_internal_links (GstPad * pad, GstObject * parent)
785 {
786   GstRtpJitterBuffer *jitterbuffer;
787   GstPad *otherpad = NULL;
788   GstIterator *it;
789   GValue val = { 0, };
790
791   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
792
793   if (pad == jitterbuffer->priv->sinkpad) {
794     otherpad = jitterbuffer->priv->srcpad;
795   } else if (pad == jitterbuffer->priv->srcpad) {
796     otherpad = jitterbuffer->priv->sinkpad;
797   } else if (pad == jitterbuffer->priv->rtcpsinkpad) {
798     otherpad = NULL;
799   }
800
801   g_value_init (&val, GST_TYPE_PAD);
802   g_value_set_object (&val, otherpad);
803   it = gst_iterator_new_single (GST_TYPE_PAD, &val);
804   g_value_unset (&val);
805
806   return it;
807 }
808
809 static GstPad *
810 create_rtcp_sink (GstRtpJitterBuffer * jitterbuffer)
811 {
812   GstRtpJitterBufferPrivate *priv;
813
814   priv = jitterbuffer->priv;
815
816   GST_DEBUG_OBJECT (jitterbuffer, "creating RTCP sink pad");
817
818   priv->rtcpsinkpad =
819       gst_pad_new_from_static_template
820       (&gst_rtp_jitter_buffer_sink_rtcp_template, "sink_rtcp");
821   gst_pad_set_chain_function (priv->rtcpsinkpad,
822       gst_rtp_jitter_buffer_chain_rtcp);
823   gst_pad_set_event_function (priv->rtcpsinkpad,
824       (GstPadEventFunction) gst_rtp_jitter_buffer_sink_rtcp_event);
825   gst_pad_set_iterate_internal_links_function (priv->rtcpsinkpad,
826       gst_rtp_jitter_buffer_iterate_internal_links);
827   gst_pad_set_active (priv->rtcpsinkpad, TRUE);
828   gst_element_add_pad (GST_ELEMENT_CAST (jitterbuffer), priv->rtcpsinkpad);
829
830   return priv->rtcpsinkpad;
831 }
832
833 static void
834 remove_rtcp_sink (GstRtpJitterBuffer * jitterbuffer)
835 {
836   GstRtpJitterBufferPrivate *priv;
837
838   priv = jitterbuffer->priv;
839
840   GST_DEBUG_OBJECT (jitterbuffer, "removing RTCP sink pad");
841
842   gst_pad_set_active (priv->rtcpsinkpad, FALSE);
843
844   gst_element_remove_pad (GST_ELEMENT_CAST (jitterbuffer), priv->rtcpsinkpad);
845   priv->rtcpsinkpad = NULL;
846 }
847
848 static GstPad *
849 gst_rtp_jitter_buffer_request_new_pad (GstElement * element,
850     GstPadTemplate * templ, const gchar * name, const GstCaps * filter)
851 {
852   GstRtpJitterBuffer *jitterbuffer;
853   GstElementClass *klass;
854   GstPad *result;
855   GstRtpJitterBufferPrivate *priv;
856
857   g_return_val_if_fail (templ != NULL, NULL);
858   g_return_val_if_fail (GST_IS_RTP_JITTER_BUFFER (element), NULL);
859
860   jitterbuffer = GST_RTP_JITTER_BUFFER (element);
861   priv = jitterbuffer->priv;
862   klass = GST_ELEMENT_GET_CLASS (element);
863
864   GST_DEBUG_OBJECT (element, "requesting pad %s", GST_STR_NULL (name));
865
866   /* figure out the template */
867   if (templ == gst_element_class_get_pad_template (klass, "sink_rtcp")) {
868     if (priv->rtcpsinkpad != NULL)
869       goto exists;
870
871     result = create_rtcp_sink (jitterbuffer);
872   } else
873     goto wrong_template;
874
875   return result;
876
877   /* ERRORS */
878 wrong_template:
879   {
880     g_warning ("rtpjitterbuffer: this is not our template");
881     return NULL;
882   }
883 exists:
884   {
885     g_warning ("rtpjitterbuffer: pad already requested");
886     return NULL;
887   }
888 }
889
890 static void
891 gst_rtp_jitter_buffer_release_pad (GstElement * element, GstPad * pad)
892 {
893   GstRtpJitterBuffer *jitterbuffer;
894   GstRtpJitterBufferPrivate *priv;
895
896   g_return_if_fail (GST_IS_RTP_JITTER_BUFFER (element));
897   g_return_if_fail (GST_IS_PAD (pad));
898
899   jitterbuffer = GST_RTP_JITTER_BUFFER (element);
900   priv = jitterbuffer->priv;
901
902   GST_DEBUG_OBJECT (element, "releasing pad %s:%s", GST_DEBUG_PAD_NAME (pad));
903
904   if (priv->rtcpsinkpad == pad) {
905     remove_rtcp_sink (jitterbuffer);
906   } else
907     goto wrong_pad;
908
909   return;
910
911   /* ERRORS */
912 wrong_pad:
913   {
914     g_warning ("gstjitterbuffer: asked to release an unknown pad");
915     return;
916   }
917 }
918
919 static GstClock *
920 gst_rtp_jitter_buffer_provide_clock (GstElement * element)
921 {
922   return gst_system_clock_obtain ();
923 }
924
925 static void
926 gst_rtp_jitter_buffer_clear_pt_map (GstRtpJitterBuffer * jitterbuffer)
927 {
928   GstRtpJitterBufferPrivate *priv;
929
930   priv = jitterbuffer->priv;
931
932   /* this will trigger a new pt-map request signal, FIXME, do something better. */
933
934   JBUF_LOCK (priv);
935   priv->clock_rate = -1;
936   /* do not clear current content, but refresh state for new arrival */
937   GST_DEBUG_OBJECT (jitterbuffer, "reset jitterbuffer");
938   rtp_jitter_buffer_reset_skew (priv->jbuf);
939   JBUF_UNLOCK (priv);
940 }
941
942 static GstClockTime
943 gst_rtp_jitter_buffer_set_active (GstRtpJitterBuffer * jbuf, gboolean active,
944     guint64 offset)
945 {
946   GstRtpJitterBufferPrivate *priv;
947   GstClockTime last_out;
948   RTPJitterBufferItem *item;
949
950   priv = jbuf->priv;
951
952   JBUF_LOCK (priv);
953   GST_DEBUG_OBJECT (jbuf, "setting active %d with offset %" GST_TIME_FORMAT,
954       active, GST_TIME_ARGS (offset));
955
956   if (active != priv->active) {
957     /* add the amount of time spent in paused to the output offset. All
958      * outgoing buffers will have this offset applied to their timestamps in
959      * order to make them arrive in time in the sink. */
960     priv->out_offset = offset;
961     GST_DEBUG_OBJECT (jbuf, "out offset %" GST_TIME_FORMAT,
962         GST_TIME_ARGS (priv->out_offset));
963     priv->active = active;
964     JBUF_SIGNAL_EVENT (priv);
965   }
966   if (!active) {
967     rtp_jitter_buffer_set_buffering (priv->jbuf, TRUE);
968   }
969   if ((item = rtp_jitter_buffer_peek (priv->jbuf))) {
970     /* head buffer timestamp and offset gives our output time */
971     last_out = item->dts + priv->ts_offset;
972   } else {
973     /* use last known time when the buffer is empty */
974     last_out = priv->last_out_time;
975   }
976   JBUF_UNLOCK (priv);
977
978   return last_out;
979 }
980
981 static GstCaps *
982 gst_rtp_jitter_buffer_getcaps (GstPad * pad, GstCaps * filter)
983 {
984   GstRtpJitterBuffer *jitterbuffer;
985   GstRtpJitterBufferPrivate *priv;
986   GstPad *other;
987   GstCaps *caps;
988   GstCaps *templ;
989
990   jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
991   priv = jitterbuffer->priv;
992
993   other = (pad == priv->srcpad ? priv->sinkpad : priv->srcpad);
994
995   caps = gst_pad_peer_query_caps (other, filter);
996
997   templ = gst_pad_get_pad_template_caps (pad);
998   if (caps == NULL) {
999     GST_DEBUG_OBJECT (jitterbuffer, "use template");
1000     caps = templ;
1001   } else {
1002     GstCaps *intersect;
1003
1004     GST_DEBUG_OBJECT (jitterbuffer, "intersect with template");
1005
1006     intersect = gst_caps_intersect (caps, templ);
1007     gst_caps_unref (caps);
1008     gst_caps_unref (templ);
1009
1010     caps = intersect;
1011   }
1012   gst_object_unref (jitterbuffer);
1013
1014   return caps;
1015 }
1016
1017 /*
1018  * Must be called with JBUF_LOCK held
1019  */
1020
1021 static gboolean
1022 gst_jitter_buffer_sink_parse_caps (GstRtpJitterBuffer * jitterbuffer,
1023     GstCaps * caps)
1024 {
1025   GstRtpJitterBufferPrivate *priv;
1026   GstStructure *caps_struct;
1027   guint val;
1028   GstClockTime tval;
1029
1030   priv = jitterbuffer->priv;
1031
1032   /* first parse the caps */
1033   caps_struct = gst_caps_get_structure (caps, 0);
1034
1035   GST_DEBUG_OBJECT (jitterbuffer, "got caps");
1036
1037   /* we need a clock-rate to convert the rtp timestamps to GStreamer time and to
1038    * measure the amount of data in the buffer */
1039   if (!gst_structure_get_int (caps_struct, "clock-rate", &priv->clock_rate))
1040     goto error;
1041
1042   if (priv->clock_rate <= 0)
1043     goto wrong_rate;
1044
1045   GST_DEBUG_OBJECT (jitterbuffer, "got clock-rate %d", priv->clock_rate);
1046
1047   rtp_jitter_buffer_set_clock_rate (priv->jbuf, priv->clock_rate);
1048
1049   /* The clock base is the RTP timestamp corrsponding to the npt-start value. We
1050    * can use this to track the amount of time elapsed on the sender. */
1051   if (gst_structure_get_uint (caps_struct, "clock-base", &val))
1052     priv->clock_base = val;
1053   else
1054     priv->clock_base = -1;
1055
1056   priv->ext_timestamp = priv->clock_base;
1057
1058   GST_DEBUG_OBJECT (jitterbuffer, "got clock-base %" G_GINT64_FORMAT,
1059       priv->clock_base);
1060
1061   if (gst_structure_get_uint (caps_struct, "seqnum-base", &val)) {
1062     /* first expected seqnum, only update when we didn't have a previous base. */
1063     if (priv->next_in_seqnum == -1)
1064       priv->next_in_seqnum = val;
1065     if (priv->next_seqnum == -1)
1066       priv->next_seqnum = val;
1067   }
1068
1069   GST_DEBUG_OBJECT (jitterbuffer, "got seqnum-base %d", priv->next_in_seqnum);
1070
1071   /* the start and stop times. The seqnum-base corresponds to the start time. We
1072    * will keep track of the seqnums on the output and when we reach the one
1073    * corresponding to npt-stop, we emit the npt-stop-reached signal */
1074   if (gst_structure_get_clock_time (caps_struct, "npt-start", &tval))
1075     priv->npt_start = tval;
1076   else
1077     priv->npt_start = 0;
1078
1079   if (gst_structure_get_clock_time (caps_struct, "npt-stop", &tval))
1080     priv->npt_stop = tval;
1081   else
1082     priv->npt_stop = -1;
1083
1084   GST_DEBUG_OBJECT (jitterbuffer,
1085       "npt start/stop: %" GST_TIME_FORMAT "-%" GST_TIME_FORMAT,
1086       GST_TIME_ARGS (priv->npt_start), GST_TIME_ARGS (priv->npt_stop));
1087
1088   return TRUE;
1089
1090   /* ERRORS */
1091 error:
1092   {
1093     GST_DEBUG_OBJECT (jitterbuffer, "No clock-rate in caps!");
1094     return FALSE;
1095   }
1096 wrong_rate:
1097   {
1098     GST_DEBUG_OBJECT (jitterbuffer, "Invalid clock-rate %d", priv->clock_rate);
1099     return FALSE;
1100   }
1101 }
1102
1103 static void
1104 gst_rtp_jitter_buffer_flush_start (GstRtpJitterBuffer * jitterbuffer)
1105 {
1106   GstRtpJitterBufferPrivate *priv;
1107
1108   priv = jitterbuffer->priv;
1109
1110   JBUF_LOCK (priv);
1111   /* mark ourselves as flushing */
1112   priv->srcresult = GST_FLOW_FLUSHING;
1113   GST_DEBUG_OBJECT (jitterbuffer, "Disabling pop on queue");
1114   /* this unblocks any waiting pops on the src pad task */
1115   JBUF_SIGNAL_EVENT (priv);
1116   JBUF_UNLOCK (priv);
1117 }
1118
1119 static void
1120 gst_rtp_jitter_buffer_flush_stop (GstRtpJitterBuffer * jitterbuffer)
1121 {
1122   GstRtpJitterBufferPrivate *priv;
1123
1124   priv = jitterbuffer->priv;
1125
1126   JBUF_LOCK (priv);
1127   GST_DEBUG_OBJECT (jitterbuffer, "Enabling pop on queue");
1128   /* Mark as non flushing */
1129   priv->srcresult = GST_FLOW_OK;
1130   gst_segment_init (&priv->segment, GST_FORMAT_TIME);
1131   priv->last_popped_seqnum = -1;
1132   priv->last_out_time = -1;
1133   priv->next_seqnum = -1;
1134   priv->ips_rtptime = -1;
1135   priv->ips_dts = GST_CLOCK_TIME_NONE;
1136   priv->packet_spacing = 0;
1137   priv->next_in_seqnum = -1;
1138   priv->clock_rate = -1;
1139   priv->eos = FALSE;
1140   priv->estimated_eos = -1;
1141   priv->last_elapsed = 0;
1142   priv->ext_timestamp = -1;
1143   GST_DEBUG_OBJECT (jitterbuffer, "flush and reset jitterbuffer");
1144   rtp_jitter_buffer_flush (priv->jbuf, (GFunc) free_item, NULL);
1145   rtp_jitter_buffer_reset_skew (priv->jbuf);
1146   remove_all_timers (jitterbuffer);
1147   JBUF_UNLOCK (priv);
1148 }
1149
1150 static gboolean
1151 gst_rtp_jitter_buffer_src_activate_mode (GstPad * pad, GstObject * parent,
1152     GstPadMode mode, gboolean active)
1153 {
1154   gboolean result;
1155   GstRtpJitterBuffer *jitterbuffer = NULL;
1156
1157   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1158
1159   switch (mode) {
1160     case GST_PAD_MODE_PUSH:
1161       if (active) {
1162         /* allow data processing */
1163         gst_rtp_jitter_buffer_flush_stop (jitterbuffer);
1164
1165         /* start pushing out buffers */
1166         GST_DEBUG_OBJECT (jitterbuffer, "Starting task on srcpad");
1167         result = gst_pad_start_task (jitterbuffer->priv->srcpad,
1168             (GstTaskFunction) gst_rtp_jitter_buffer_loop, jitterbuffer, NULL);
1169       } else {
1170         /* make sure all data processing stops ASAP */
1171         gst_rtp_jitter_buffer_flush_start (jitterbuffer);
1172
1173         /* NOTE this will hardlock if the state change is called from the src pad
1174          * task thread because we will _join() the thread. */
1175         GST_DEBUG_OBJECT (jitterbuffer, "Stopping task on srcpad");
1176         result = gst_pad_stop_task (pad);
1177       }
1178       break;
1179     default:
1180       result = FALSE;
1181       break;
1182   }
1183   return result;
1184 }
1185
1186 static GstStateChangeReturn
1187 gst_rtp_jitter_buffer_change_state (GstElement * element,
1188     GstStateChange transition)
1189 {
1190   GstRtpJitterBuffer *jitterbuffer;
1191   GstRtpJitterBufferPrivate *priv;
1192   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
1193
1194   jitterbuffer = GST_RTP_JITTER_BUFFER (element);
1195   priv = jitterbuffer->priv;
1196
1197   switch (transition) {
1198     case GST_STATE_CHANGE_NULL_TO_READY:
1199       break;
1200     case GST_STATE_CHANGE_READY_TO_PAUSED:
1201       JBUF_LOCK (priv);
1202       /* reset negotiated values */
1203       priv->clock_rate = -1;
1204       priv->clock_base = -1;
1205       priv->peer_latency = 0;
1206       priv->last_pt = -1;
1207       /* block until we go to PLAYING */
1208       priv->blocked = TRUE;
1209       priv->timer_running = TRUE;
1210       priv->timer_thread =
1211           g_thread_new ("timer", (GThreadFunc) wait_next_timeout, jitterbuffer);
1212       JBUF_UNLOCK (priv);
1213       break;
1214     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
1215       JBUF_LOCK (priv);
1216       /* unblock to allow streaming in PLAYING */
1217       priv->blocked = FALSE;
1218       JBUF_SIGNAL_EVENT (priv);
1219       JBUF_SIGNAL_TIMER (priv);
1220       JBUF_UNLOCK (priv);
1221       break;
1222     default:
1223       break;
1224   }
1225
1226   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1227
1228   switch (transition) {
1229     case GST_STATE_CHANGE_READY_TO_PAUSED:
1230       /* we are a live element because we sync to the clock, which we can only
1231        * do in the PLAYING state */
1232       if (ret != GST_STATE_CHANGE_FAILURE)
1233         ret = GST_STATE_CHANGE_NO_PREROLL;
1234       break;
1235     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
1236       JBUF_LOCK (priv);
1237       /* block to stop streaming when PAUSED */
1238       priv->blocked = TRUE;
1239       unschedule_current_timer (jitterbuffer);
1240       JBUF_UNLOCK (priv);
1241       if (ret != GST_STATE_CHANGE_FAILURE)
1242         ret = GST_STATE_CHANGE_NO_PREROLL;
1243       break;
1244     case GST_STATE_CHANGE_PAUSED_TO_READY:
1245       JBUF_LOCK (priv);
1246       gst_buffer_replace (&priv->last_sr, NULL);
1247       priv->timer_running = FALSE;
1248       unschedule_current_timer (jitterbuffer);
1249       JBUF_SIGNAL_TIMER (priv);
1250       JBUF_UNLOCK (priv);
1251       g_thread_join (priv->timer_thread);
1252       priv->timer_thread = NULL;
1253       break;
1254     case GST_STATE_CHANGE_READY_TO_NULL:
1255       break;
1256     default:
1257       break;
1258   }
1259
1260   return ret;
1261 }
1262
1263 static gboolean
1264 gst_rtp_jitter_buffer_src_event (GstPad * pad, GstObject * parent,
1265     GstEvent * event)
1266 {
1267   gboolean ret = TRUE;
1268   GstRtpJitterBuffer *jitterbuffer;
1269   GstRtpJitterBufferPrivate *priv;
1270
1271   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1272   priv = jitterbuffer->priv;
1273
1274   GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
1275
1276   switch (GST_EVENT_TYPE (event)) {
1277     case GST_EVENT_LATENCY:
1278     {
1279       GstClockTime latency;
1280
1281       gst_event_parse_latency (event, &latency);
1282
1283       GST_DEBUG_OBJECT (jitterbuffer,
1284           "configuring latency of %" GST_TIME_FORMAT, GST_TIME_ARGS (latency));
1285
1286       JBUF_LOCK (priv);
1287       /* adjust the overall buffer delay to the total pipeline latency in
1288        * buffering mode because if downstream consumes too fast (because of
1289        * large latency or queues, we would start rebuffering again. */
1290       if (rtp_jitter_buffer_get_mode (priv->jbuf) ==
1291           RTP_JITTER_BUFFER_MODE_BUFFER) {
1292         rtp_jitter_buffer_set_delay (priv->jbuf, latency);
1293       }
1294       JBUF_UNLOCK (priv);
1295
1296       ret = gst_pad_push_event (priv->sinkpad, event);
1297       break;
1298     }
1299     default:
1300       ret = gst_pad_push_event (priv->sinkpad, event);
1301       break;
1302   }
1303
1304   return ret;
1305 }
1306
1307 /* handles and stores the event in the jitterbuffer, must be called with
1308  * LOCK */
1309 static gboolean
1310 queue_event (GstRtpJitterBuffer * jitterbuffer, GstEvent * event)
1311 {
1312   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1313   RTPJitterBufferItem *item;
1314
1315   switch (GST_EVENT_TYPE (event)) {
1316     case GST_EVENT_CAPS:
1317     {
1318       GstCaps *caps;
1319
1320       gst_event_parse_caps (event, &caps);
1321       if (!gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps))
1322         goto wrong_caps;
1323
1324       break;
1325     }
1326     case GST_EVENT_SEGMENT:
1327       gst_event_copy_segment (event, &priv->segment);
1328
1329       /* we need time for now */
1330       if (priv->segment.format != GST_FORMAT_TIME)
1331         goto newseg_wrong_format;
1332
1333       GST_DEBUG_OBJECT (jitterbuffer,
1334           "newsegment:  %" GST_SEGMENT_FORMAT, &priv->segment);
1335       break;
1336     case GST_EVENT_EOS:
1337       priv->eos = TRUE;
1338       break;
1339     default:
1340       break;
1341   }
1342
1343
1344   GST_DEBUG_OBJECT (jitterbuffer, "adding event");
1345   item = alloc_item (event, ITEM_TYPE_EVENT, -1, -1, -1, 0, -1);
1346   rtp_jitter_buffer_insert (priv->jbuf, item, NULL, NULL);
1347   JBUF_SIGNAL_EVENT (priv);
1348
1349   return TRUE;
1350
1351   /* ERRORS */
1352 wrong_caps:
1353   {
1354     GST_DEBUG_OBJECT (jitterbuffer, "received invalid caps");
1355     gst_event_unref (event);
1356     return FALSE;
1357   }
1358 newseg_wrong_format:
1359   {
1360     GST_DEBUG_OBJECT (jitterbuffer, "received non TIME newsegment");
1361     gst_event_unref (event);
1362     return FALSE;
1363   }
1364 }
1365
1366 static gboolean
1367 gst_rtp_jitter_buffer_sink_event (GstPad * pad, GstObject * parent,
1368     GstEvent * event)
1369 {
1370   gboolean ret = TRUE;
1371   GstRtpJitterBuffer *jitterbuffer;
1372   GstRtpJitterBufferPrivate *priv;
1373
1374   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1375   priv = jitterbuffer->priv;
1376
1377   GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
1378
1379   switch (GST_EVENT_TYPE (event)) {
1380     case GST_EVENT_FLUSH_START:
1381       ret = gst_pad_push_event (priv->srcpad, event);
1382       gst_rtp_jitter_buffer_flush_start (jitterbuffer);
1383       /* wait for the loop to go into PAUSED */
1384       gst_pad_pause_task (priv->srcpad);
1385       break;
1386     case GST_EVENT_FLUSH_STOP:
1387       ret = gst_pad_push_event (priv->srcpad, event);
1388       ret =
1389           gst_rtp_jitter_buffer_src_activate_mode (priv->srcpad, parent,
1390           GST_PAD_MODE_PUSH, TRUE);
1391       break;
1392     default:
1393       if (GST_EVENT_IS_SERIALIZED (event)) {
1394         /* serialized events go in the queue */
1395         JBUF_LOCK (priv);
1396         if (priv->srcresult != GST_FLOW_OK) {
1397           /* Errors in sticky event pushing are no problem and ignored here
1398            * as they will cause more meaningful errors during data flow.
1399            * For EOS events, that are not followed by data flow, we still
1400            * return FALSE here though.
1401            */
1402           if (!GST_EVENT_IS_STICKY (event) ||
1403               GST_EVENT_TYPE (event) == GST_EVENT_EOS)
1404             goto out_flow_error;
1405         }
1406         /* refuse more events on EOS */
1407         if (priv->eos)
1408           goto out_eos;
1409         ret = queue_event (jitterbuffer, event);
1410         JBUF_UNLOCK (priv);
1411       } else {
1412         /* non-serialized events are forwarded downstream immediately */
1413         ret = gst_pad_push_event (priv->srcpad, event);
1414       }
1415       break;
1416   }
1417   return ret;
1418
1419   /* ERRORS */
1420 out_flow_error:
1421   {
1422     GST_DEBUG_OBJECT (jitterbuffer,
1423         "refusing event, we have a downstream flow error: %s",
1424         gst_flow_get_name (priv->srcresult));
1425     JBUF_UNLOCK (priv);
1426     gst_event_unref (event);
1427     return FALSE;
1428   }
1429 out_eos:
1430   {
1431     GST_DEBUG_OBJECT (jitterbuffer, "refusing event, we are EOS");
1432     JBUF_UNLOCK (priv);
1433     gst_event_unref (event);
1434     return FALSE;
1435   }
1436 }
1437
1438 static gboolean
1439 gst_rtp_jitter_buffer_sink_rtcp_event (GstPad * pad, GstObject * parent,
1440     GstEvent * event)
1441 {
1442   gboolean ret = TRUE;
1443   GstRtpJitterBuffer *jitterbuffer;
1444
1445   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1446
1447   GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
1448
1449   switch (GST_EVENT_TYPE (event)) {
1450     case GST_EVENT_FLUSH_START:
1451       gst_event_unref (event);
1452       break;
1453     case GST_EVENT_FLUSH_STOP:
1454       gst_event_unref (event);
1455       break;
1456     default:
1457       ret = gst_pad_event_default (pad, parent, event);
1458       break;
1459   }
1460
1461   return ret;
1462 }
1463
1464 /*
1465  * Must be called with JBUF_LOCK held, will release the LOCK when emiting the
1466  * signal. The function returns GST_FLOW_ERROR when a parsing error happened and
1467  * GST_FLOW_FLUSHING when the element is shutting down. On success
1468  * GST_FLOW_OK is returned.
1469  */
1470 static GstFlowReturn
1471 gst_rtp_jitter_buffer_get_clock_rate (GstRtpJitterBuffer * jitterbuffer,
1472     guint8 pt)
1473 {
1474   GValue ret = { 0 };
1475   GValue args[2] = { {0}, {0} };
1476   GstCaps *caps;
1477   gboolean res;
1478
1479   g_value_init (&args[0], GST_TYPE_ELEMENT);
1480   g_value_set_object (&args[0], jitterbuffer);
1481   g_value_init (&args[1], G_TYPE_UINT);
1482   g_value_set_uint (&args[1], pt);
1483
1484   g_value_init (&ret, GST_TYPE_CAPS);
1485   g_value_set_boxed (&ret, NULL);
1486
1487   JBUF_UNLOCK (jitterbuffer->priv);
1488   g_signal_emitv (args, gst_rtp_jitter_buffer_signals[SIGNAL_REQUEST_PT_MAP], 0,
1489       &ret);
1490   JBUF_LOCK_CHECK (jitterbuffer->priv, out_flushing);
1491
1492   g_value_unset (&args[0]);
1493   g_value_unset (&args[1]);
1494   caps = (GstCaps *) g_value_dup_boxed (&ret);
1495   g_value_unset (&ret);
1496   if (!caps)
1497     goto no_caps;
1498
1499   res = gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps);
1500   gst_caps_unref (caps);
1501
1502   if (G_UNLIKELY (!res))
1503     goto parse_failed;
1504
1505   return GST_FLOW_OK;
1506
1507   /* ERRORS */
1508 no_caps:
1509   {
1510     GST_DEBUG_OBJECT (jitterbuffer, "could not get caps");
1511     return GST_FLOW_ERROR;
1512   }
1513 out_flushing:
1514   {
1515     GST_DEBUG_OBJECT (jitterbuffer, "we are flushing");
1516     return GST_FLOW_FLUSHING;
1517   }
1518 parse_failed:
1519   {
1520     GST_DEBUG_OBJECT (jitterbuffer, "parse failed");
1521     return GST_FLOW_ERROR;
1522   }
1523 }
1524
1525 /* call with jbuf lock held */
1526 static void
1527 check_buffering_percent (GstRtpJitterBuffer * jitterbuffer, gint * percent)
1528 {
1529   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1530
1531   /* too short a stream, or too close to EOS will never really fill buffer */
1532   if (*percent != -1 && priv->npt_stop != -1 &&
1533       priv->npt_stop - priv->npt_start <=
1534       rtp_jitter_buffer_get_delay (priv->jbuf)) {
1535     GST_DEBUG_OBJECT (jitterbuffer, "short stream; faking full buffer");
1536     rtp_jitter_buffer_set_buffering (priv->jbuf, FALSE);
1537     *percent = 100;
1538   }
1539 }
1540
1541 static void
1542 post_buffering_percent (GstRtpJitterBuffer * jitterbuffer, gint percent)
1543 {
1544   GstMessage *message;
1545
1546   /* Post a buffering message */
1547   message = gst_message_new_buffering (GST_OBJECT_CAST (jitterbuffer), percent);
1548   gst_message_set_buffering_stats (message, GST_BUFFERING_LIVE, -1, -1, -1);
1549
1550   gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer), message);
1551 }
1552
1553 static GstClockTime
1554 apply_offset (GstRtpJitterBuffer * jitterbuffer, GstClockTime timestamp)
1555 {
1556   GstRtpJitterBufferPrivate *priv;
1557
1558   priv = jitterbuffer->priv;
1559
1560   if (timestamp == -1)
1561     return -1;
1562
1563   /* apply the timestamp offset, this is used for inter stream sync */
1564   timestamp += priv->ts_offset;
1565   /* add the offset, this is used when buffering */
1566   timestamp += priv->out_offset;
1567
1568   return timestamp;
1569 }
1570
1571 static TimerData *
1572 find_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type, guint16 seqnum)
1573 {
1574   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1575   TimerData *timer = NULL;
1576   gint i, len;
1577
1578   len = priv->timers->len;
1579   for (i = 0; i < len; i++) {
1580     TimerData *test = &g_array_index (priv->timers, TimerData, i);
1581     if (test->seqnum == seqnum && test->type == type) {
1582       timer = test;
1583       break;
1584     }
1585   }
1586   return timer;
1587 }
1588
1589 static void
1590 unschedule_current_timer (GstRtpJitterBuffer * jitterbuffer)
1591 {
1592   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1593
1594   if (priv->clock_id) {
1595     GST_DEBUG_OBJECT (jitterbuffer, "unschedule current timer");
1596     gst_clock_id_unschedule (priv->clock_id);
1597     priv->clock_id = NULL;
1598   }
1599 }
1600
1601 static GstClockTime
1602 get_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
1603 {
1604   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1605   GstClockTime test_timeout;
1606
1607   if ((test_timeout = timer->timeout) == -1)
1608     return -1;
1609
1610   if (timer->type != TIMER_TYPE_EXPECTED) {
1611     /* add our latency and offset to get output times. */
1612     test_timeout = apply_offset (jitterbuffer, test_timeout);
1613     test_timeout += priv->latency_ns;
1614   }
1615   return test_timeout;
1616 }
1617
1618 static void
1619 recalculate_timer (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
1620 {
1621   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1622
1623   if (priv->clock_id) {
1624     GstClockTime timeout = get_timeout (jitterbuffer, timer);
1625
1626     GST_DEBUG ("%" GST_TIME_FORMAT " <> %" GST_TIME_FORMAT,
1627         GST_TIME_ARGS (timeout), GST_TIME_ARGS (priv->timer_timeout));
1628
1629     if (timeout == -1 || timeout < priv->timer_timeout)
1630       unschedule_current_timer (jitterbuffer);
1631   }
1632 }
1633
1634 static TimerData *
1635 add_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type,
1636     guint16 seqnum, guint num, GstClockTime timeout, GstClockTime delay,
1637     GstClockTime duration)
1638 {
1639   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1640   TimerData *timer;
1641   gint len;
1642
1643   GST_DEBUG_OBJECT (jitterbuffer,
1644       "add timer for seqnum %d to %" GST_TIME_FORMAT ", delay %"
1645       GST_TIME_FORMAT, seqnum, GST_TIME_ARGS (timeout), GST_TIME_ARGS (delay));
1646
1647   len = priv->timers->len;
1648   g_array_set_size (priv->timers, len + 1);
1649   timer = &g_array_index (priv->timers, TimerData, len);
1650   timer->idx = len;
1651   timer->type = type;
1652   timer->seqnum = seqnum;
1653   timer->num = num;
1654   timer->timeout = timeout + delay;
1655   timer->duration = duration;
1656   if (type == TIMER_TYPE_EXPECTED) {
1657     timer->rtx_base = timeout;
1658     timer->rtx_delay = delay;
1659     timer->rtx_retry = 0;
1660   }
1661   timer->num_rtx_retry = 0;
1662   recalculate_timer (jitterbuffer, timer);
1663   JBUF_SIGNAL_TIMER (priv);
1664
1665   return timer;
1666 }
1667
1668 static void
1669 reschedule_timer (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
1670     guint16 seqnum, GstClockTime timeout, GstClockTime delay, gboolean reset)
1671 {
1672   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1673   gboolean seqchange, timechange;
1674   guint16 oldseq;
1675
1676   seqchange = timer->seqnum != seqnum;
1677   timechange = timer->timeout != timeout;
1678
1679   if (!seqchange && !timechange)
1680     return;
1681
1682   oldseq = timer->seqnum;
1683
1684   GST_DEBUG_OBJECT (jitterbuffer,
1685       "replace timer for seqnum %d->%d to %" GST_TIME_FORMAT,
1686       oldseq, seqnum, GST_TIME_ARGS (timeout + delay));
1687
1688   timer->timeout = timeout + delay;
1689   timer->seqnum = seqnum;
1690   if (reset) {
1691     timer->rtx_base = timeout;
1692     timer->rtx_delay = delay;
1693     timer->rtx_retry = 0;
1694   }
1695
1696   if (priv->clock_id) {
1697     /* we changed the seqnum and there is a timer currently waiting with this
1698      * seqnum, unschedule it */
1699     if (seqchange && priv->timer_seqnum == oldseq)
1700       unschedule_current_timer (jitterbuffer);
1701     /* we changed the time, check if it is earlier than what we are waiting
1702      * for and unschedule if so */
1703     else if (timechange)
1704       recalculate_timer (jitterbuffer, timer);
1705   }
1706 }
1707
1708 static TimerData *
1709 set_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type,
1710     guint16 seqnum, GstClockTime timeout)
1711 {
1712   TimerData *timer;
1713
1714   /* find the seqnum timer */
1715   timer = find_timer (jitterbuffer, type, seqnum);
1716   if (timer == NULL) {
1717     timer = add_timer (jitterbuffer, type, seqnum, 0, timeout, 0, -1);
1718   } else {
1719     reschedule_timer (jitterbuffer, timer, seqnum, timeout, 0, FALSE);
1720   }
1721   return timer;
1722 }
1723
1724 static void
1725 remove_timer (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
1726 {
1727   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1728   guint idx;
1729
1730   if (priv->clock_id && priv->timer_seqnum == timer->seqnum)
1731     unschedule_current_timer (jitterbuffer);
1732
1733   idx = timer->idx;
1734   GST_DEBUG_OBJECT (jitterbuffer, "removed index %d", idx);
1735   g_array_remove_index_fast (priv->timers, idx);
1736   timer->idx = idx;
1737 }
1738
1739 static void
1740 remove_all_timers (GstRtpJitterBuffer * jitterbuffer)
1741 {
1742   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1743   GST_DEBUG_OBJECT (jitterbuffer, "removed all timers");
1744   g_array_set_size (priv->timers, 0);
1745   unschedule_current_timer (jitterbuffer);
1746 }
1747
1748 /* we just received a packet with seqnum and dts.
1749  *
1750  * First check for old seqnum that we are still expecting. If the gap with the
1751  * current seqnum is too big, unschedule the timeouts.
1752  *
1753  * If we have a valid packet spacing estimate we can set a timer for when we
1754  * should receive the next packet.
1755  * If we don't have a valid estimate, we remove any timer we might have
1756  * had for this packet.
1757  */
1758 static void
1759 update_timers (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum,
1760     GstClockTime dts, gboolean do_next_seqnum)
1761 {
1762   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1763   TimerData *timer = NULL;
1764   gint i, len;
1765
1766   /* go through all timers and unschedule the ones with a large gap, also find
1767    * the timer for the seqnum */
1768   len = priv->timers->len;
1769   for (i = 0; i < len; i++) {
1770     TimerData *test = &g_array_index (priv->timers, TimerData, i);
1771     gint gap;
1772
1773     gap = gst_rtp_buffer_compare_seqnum (test->seqnum, seqnum);
1774
1775     GST_DEBUG_OBJECT (jitterbuffer, "%d, #%d<->#%d gap %d", i,
1776         test->seqnum, seqnum, gap);
1777
1778     if (gap == 0) {
1779       GST_DEBUG ("found timer for current seqnum");
1780       /* the timer for the current seqnum */
1781       timer = test;
1782     } else if (gap > priv->rtx_delay_reorder) {
1783       /* max gap, we exceeded the max reorder distance and we don't expect the
1784        * missing packet to be this reordered */
1785       if (test->num_rtx_retry == 0 && test->type == TIMER_TYPE_EXPECTED)
1786         reschedule_timer (jitterbuffer, test, test->seqnum, -1, 0, FALSE);
1787     }
1788   }
1789
1790   if (priv->packet_spacing > 0 && do_next_seqnum && priv->do_retransmission) {
1791     GstClockTime expected, delay;
1792
1793     /* calculate expected arrival time of the next seqnum */
1794     expected = dts + priv->packet_spacing;
1795     delay = priv->rtx_delay * GST_MSECOND;
1796
1797     /* and update/install timer for next seqnum */
1798     if (timer)
1799       reschedule_timer (jitterbuffer, timer, priv->next_in_seqnum, expected,
1800           delay, TRUE);
1801     else
1802       add_timer (jitterbuffer, TIMER_TYPE_EXPECTED, priv->next_in_seqnum, 0,
1803           expected, delay, priv->packet_spacing);
1804   } else if (timer && timer->type != TIMER_TYPE_DEADLINE) {
1805
1806     if (timer->num_rtx_retry > 0) {
1807       GstClockTime rtx_last;
1808
1809       /* we scheduled a retry for this packet and now we have it */
1810       priv->num_rtx_success++;
1811       /* all the previous retry attempts failed */
1812       priv->num_rtx_failed += timer->num_rtx_retry - 1;
1813       /* number of retries before receiving the packet */
1814       if (priv->avg_rtx_num == 0.0)
1815         priv->avg_rtx_num = timer->num_rtx_retry;
1816       else
1817         priv->avg_rtx_num = (timer->num_rtx_retry + 7 * priv->avg_rtx_num) / 8;
1818       /* calculate the delay between retransmission request and receiving this
1819        * packet, start with when we scheduled this timeout last */
1820       rtx_last = timer->rtx_last;
1821       if (dts > rtx_last) {
1822         GstClockTime delay;
1823         /* we have a valid delay if this packet arrived after we scheduled the
1824          * request */
1825         delay = dts - rtx_last;
1826         if (priv->avg_rtx_rtt == 0)
1827           priv->avg_rtx_rtt = delay;
1828         else
1829           priv->avg_rtx_rtt = (delay + 7 * priv->avg_rtx_rtt) / 8;
1830       }
1831       GST_LOG_OBJECT (jitterbuffer,
1832           "RTX success %" G_GUINT64_FORMAT ", failed %" G_GUINT64_FORMAT
1833           ", requests %" G_GUINT64_FORMAT ", dups %" G_GUINT64_FORMAT
1834           ", avg-num %g, avg-rtt %" G_GUINT64_FORMAT, priv->num_rtx_success,
1835           priv->num_rtx_failed, priv->num_rtx_requests, priv->num_duplicates,
1836           priv->avg_rtx_num, priv->avg_rtx_rtt);
1837     }
1838     /* if we had a timer, remove it, we don't know when to expect the next
1839      * packet. */
1840     remove_timer (jitterbuffer, timer);
1841     /* we signal the _loop function because this new packet could be the one
1842      * it was waiting for */
1843     JBUF_SIGNAL_EVENT (priv);
1844   }
1845 }
1846
1847 static void
1848 calculate_packet_spacing (GstRtpJitterBuffer * jitterbuffer, guint32 rtptime,
1849     GstClockTime dts)
1850 {
1851   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1852
1853   /* we need consecutive seqnums with a different
1854    * rtptime to estimate the packet spacing. */
1855   if (priv->ips_rtptime != rtptime) {
1856     /* rtptime changed, check dts diff */
1857     if (priv->ips_dts != -1 && dts != -1 && dts > priv->ips_dts) {
1858       priv->packet_spacing = dts - priv->ips_dts;
1859       GST_DEBUG_OBJECT (jitterbuffer,
1860           "new packet spacing %" GST_TIME_FORMAT,
1861           GST_TIME_ARGS (priv->packet_spacing));
1862     }
1863     priv->ips_rtptime = rtptime;
1864     priv->ips_dts = dts;
1865   }
1866 }
1867
1868 static void
1869 calculate_expected (GstRtpJitterBuffer * jitterbuffer, guint32 expected,
1870     guint16 seqnum, GstClockTime dts, gint gap)
1871 {
1872   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1873   GstClockTime total_duration, duration, expected_dts;
1874   TimerType type;
1875
1876   GST_DEBUG_OBJECT (jitterbuffer,
1877       "dts %" GST_TIME_FORMAT ", last %" GST_TIME_FORMAT,
1878       GST_TIME_ARGS (dts), GST_TIME_ARGS (priv->last_in_dts));
1879
1880   /* the total duration spanned by the missing packets */
1881   if (dts >= priv->last_in_dts)
1882     total_duration = dts - priv->last_in_dts;
1883   else
1884     total_duration = 0;
1885
1886   /* interpolate between the current time and the last time based on
1887    * number of packets we are missing, this is the estimated duration
1888    * for the missing packet based on equidistant packet spacing. */
1889   duration = total_duration / (gap + 1);
1890
1891   GST_DEBUG_OBJECT (jitterbuffer, "duration %" GST_TIME_FORMAT,
1892       GST_TIME_ARGS (duration));
1893
1894   if (total_duration > priv->latency_ns) {
1895     GstClockTime gap_time;
1896     guint lost_packets;
1897
1898     gap_time = total_duration - priv->latency_ns;
1899
1900     if (duration > 0) {
1901       lost_packets = gap_time / duration;
1902       gap_time = lost_packets * duration;
1903     } else {
1904       lost_packets = gap;
1905     }
1906
1907     /* too many lost packets, some of the missing packets are already
1908      * too late and we can generate lost packet events for them. */
1909     GST_DEBUG_OBJECT (jitterbuffer, "too many lost packets %" GST_TIME_FORMAT
1910         " > %" GST_TIME_FORMAT ", consider %u lost",
1911         GST_TIME_ARGS (total_duration), GST_TIME_ARGS (priv->latency_ns),
1912         lost_packets);
1913
1914     /* this timer will fire immediately and the lost event will be pushed from
1915      * the timer thread */
1916     add_timer (jitterbuffer, TIMER_TYPE_LOST, expected, lost_packets,
1917         priv->last_in_dts + duration, 0, gap_time);
1918
1919     expected += lost_packets;
1920     priv->last_in_dts += gap_time;
1921   }
1922
1923   expected_dts = priv->last_in_dts + duration;
1924
1925   if (priv->do_retransmission) {
1926     TimerData *timer;
1927
1928     type = TIMER_TYPE_EXPECTED;
1929     /* if we had a timer for the first missing packet, update it. */
1930     if ((timer = find_timer (jitterbuffer, type, expected))) {
1931       GstClockTime timeout = timer->timeout;
1932
1933       timer->duration = duration;
1934       if (timeout > expected_dts) {
1935         GstClockTime delay = timeout - expected_dts - timer->rtx_retry;
1936         reschedule_timer (jitterbuffer, timer, timer->seqnum, expected_dts,
1937             delay, TRUE);
1938       }
1939       expected++;
1940       expected_dts += duration;
1941     }
1942   } else {
1943     type = TIMER_TYPE_LOST;
1944   }
1945
1946   while (expected < seqnum) {
1947     add_timer (jitterbuffer, type, expected, 0, expected_dts, 0, duration);
1948     expected_dts += duration;
1949     expected++;
1950   }
1951 }
1952
1953 static GstFlowReturn
1954 gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent,
1955     GstBuffer * buffer)
1956 {
1957   GstRtpJitterBuffer *jitterbuffer;
1958   GstRtpJitterBufferPrivate *priv;
1959   guint16 seqnum;
1960   guint32 expected, rtptime;
1961   GstFlowReturn ret = GST_FLOW_OK;
1962   GstClockTime dts, pts;
1963   guint64 latency_ts;
1964   gboolean tail;
1965   gint percent = -1;
1966   guint8 pt;
1967   GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
1968   gboolean do_next_seqnum = FALSE;
1969   RTPJitterBufferItem *item;
1970
1971   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1972
1973   priv = jitterbuffer->priv;
1974
1975   if (G_UNLIKELY (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp)))
1976     goto invalid_buffer;
1977
1978   pt = gst_rtp_buffer_get_payload_type (&rtp);
1979   seqnum = gst_rtp_buffer_get_seq (&rtp);
1980   rtptime = gst_rtp_buffer_get_timestamp (&rtp);
1981   gst_rtp_buffer_unmap (&rtp);
1982
1983   /* make sure we have PTS and DTS set */
1984   pts = GST_BUFFER_PTS (buffer);
1985   dts = GST_BUFFER_DTS (buffer);
1986   if (dts == -1)
1987     dts = pts;
1988   else if (pts == -1)
1989     pts = dts;
1990
1991   /* take the DTS of the buffer. This is the time when the packet was
1992    * received and is used to calculate jitter and clock skew. We will adjust
1993    * this DTS with the smoothed value after processing it in the
1994    * jitterbuffer and assign it as the PTS. */
1995   /* bring to running time */
1996   dts = gst_segment_to_running_time (&priv->segment, GST_FORMAT_TIME, dts);
1997
1998   GST_DEBUG_OBJECT (jitterbuffer,
1999       "Received packet #%d at time %" GST_TIME_FORMAT ", discont %d", seqnum,
2000       GST_TIME_ARGS (dts), GST_BUFFER_IS_DISCONT (buffer));
2001
2002   JBUF_LOCK_CHECK (priv, out_flushing);
2003
2004   if (G_UNLIKELY (priv->last_pt != pt)) {
2005     GstCaps *caps;
2006
2007     GST_DEBUG_OBJECT (jitterbuffer, "pt changed from %u to %u", priv->last_pt,
2008         pt);
2009
2010     priv->last_pt = pt;
2011     /* reset clock-rate so that we get a new one */
2012     priv->clock_rate = -1;
2013
2014     /* Try to get the clock-rate from the caps first if we can. If there are no
2015      * caps we must fire the signal to get the clock-rate. */
2016     if ((caps = gst_pad_get_current_caps (pad))) {
2017       gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps);
2018       gst_caps_unref (caps);
2019     }
2020   }
2021
2022   if (G_UNLIKELY (priv->clock_rate == -1)) {
2023     /* no clock rate given on the caps, try to get one with the signal */
2024     if (gst_rtp_jitter_buffer_get_clock_rate (jitterbuffer,
2025             pt) == GST_FLOW_FLUSHING)
2026       goto out_flushing;
2027
2028     if (G_UNLIKELY (priv->clock_rate == -1))
2029       goto no_clock_rate;
2030   }
2031
2032   /* don't accept more data on EOS */
2033   if (G_UNLIKELY (priv->eos))
2034     goto have_eos;
2035
2036   expected = priv->next_in_seqnum;
2037
2038   /* now check against our expected seqnum */
2039   if (G_LIKELY (expected != -1)) {
2040     gint gap;
2041
2042     /* now calculate gap */
2043     gap = gst_rtp_buffer_compare_seqnum (expected, seqnum);
2044
2045     GST_DEBUG_OBJECT (jitterbuffer, "expected #%d, got #%d, gap of %d",
2046         expected, seqnum, gap);
2047
2048     if (G_LIKELY (gap == 0)) {
2049       /* packet is expected */
2050       calculate_packet_spacing (jitterbuffer, rtptime, dts);
2051       do_next_seqnum = TRUE;
2052     } else {
2053       gboolean reset = FALSE;
2054
2055       if (gap < 0) {
2056         /* we received an old packet */
2057         if (G_UNLIKELY (gap < -RTP_MAX_MISORDER)) {
2058           /* too old packet, reset */
2059           GST_DEBUG_OBJECT (jitterbuffer, "reset: buffer too old %d < %d", gap,
2060               -RTP_MAX_MISORDER);
2061           reset = TRUE;
2062         } else {
2063           GST_DEBUG_OBJECT (jitterbuffer, "old packet received");
2064         }
2065       } else {
2066         /* new packet, we are missing some packets */
2067         if (G_UNLIKELY (gap > RTP_MAX_DROPOUT)) {
2068           /* packet too far in future, reset */
2069           GST_DEBUG_OBJECT (jitterbuffer, "reset: buffer too new %d > %d", gap,
2070               RTP_MAX_DROPOUT);
2071           reset = TRUE;
2072         } else {
2073           GST_DEBUG_OBJECT (jitterbuffer, "%d missing packets", gap);
2074           /* fill in the gap with EXPECTED timers */
2075           calculate_expected (jitterbuffer, expected, seqnum, dts, gap);
2076
2077           do_next_seqnum = TRUE;
2078         }
2079       }
2080       if (G_UNLIKELY (reset)) {
2081         GST_DEBUG_OBJECT (jitterbuffer, "flush and reset jitterbuffer");
2082         rtp_jitter_buffer_flush (priv->jbuf, (GFunc) free_item, NULL);
2083         rtp_jitter_buffer_reset_skew (priv->jbuf);
2084         remove_all_timers (jitterbuffer);
2085         priv->last_popped_seqnum = -1;
2086         priv->next_seqnum = seqnum;
2087         do_next_seqnum = TRUE;
2088       }
2089       /* reset spacing estimation when gap */
2090       priv->ips_rtptime = -1;
2091       priv->ips_dts = GST_CLOCK_TIME_NONE;
2092     }
2093   } else {
2094     GST_DEBUG_OBJECT (jitterbuffer, "First buffer #%d", seqnum);
2095     /* we don't know what the next_in_seqnum should be, wait for the last
2096      * possible moment to push this buffer, maybe we get an earlier seqnum
2097      * while we wait */
2098     set_timer (jitterbuffer, TIMER_TYPE_DEADLINE, seqnum, dts);
2099     do_next_seqnum = TRUE;
2100     /* take rtptime and dts to calculate packet spacing */
2101     priv->ips_rtptime = rtptime;
2102     priv->ips_dts = dts;
2103   }
2104   if (do_next_seqnum) {
2105     priv->last_in_seqnum = seqnum;
2106     priv->last_in_dts = dts;
2107     priv->next_in_seqnum = (seqnum + 1) & 0xffff;
2108   }
2109
2110   /* let's check if this buffer is too late, we can only accept packets with
2111    * bigger seqnum than the one we last pushed. */
2112   if (G_LIKELY (priv->last_popped_seqnum != -1)) {
2113     gint gap;
2114
2115     gap = gst_rtp_buffer_compare_seqnum (priv->last_popped_seqnum, seqnum);
2116
2117     /* priv->last_popped_seqnum >= seqnum, we're too late. */
2118     if (G_UNLIKELY (gap <= 0))
2119       goto too_late;
2120   }
2121
2122   /* let's drop oldest packet if the queue is already full and drop-on-latency
2123    * is set. We can only do this when there actually is a latency. When no
2124    * latency is set, we just pump it in the queue and let the other end push it
2125    * out as fast as possible. */
2126   if (priv->latency_ms && priv->drop_on_latency) {
2127     latency_ts =
2128         gst_util_uint64_scale_int (priv->latency_ms, priv->clock_rate, 1000);
2129
2130     if (G_UNLIKELY (rtp_jitter_buffer_get_ts_diff (priv->jbuf) >= latency_ts)) {
2131       RTPJitterBufferItem *old_item;
2132
2133       old_item = rtp_jitter_buffer_pop (priv->jbuf, &percent);
2134       GST_DEBUG_OBJECT (jitterbuffer, "Queue full, dropping old packet %p",
2135           old_item);
2136       priv->next_seqnum = (old_item->seqnum + 1) & 0xffff;
2137       free_item (old_item);
2138     }
2139   }
2140
2141   item = alloc_item (buffer, ITEM_TYPE_BUFFER, dts, pts, seqnum, 1, rtptime);
2142
2143   /* now insert the packet into the queue in sorted order. This function returns
2144    * FALSE if a packet with the same seqnum was already in the queue, meaning we
2145    * have a duplicate. */
2146   if (G_UNLIKELY (!rtp_jitter_buffer_insert (priv->jbuf, item,
2147               &tail, &percent)))
2148     goto duplicate;
2149
2150   /* update timers */
2151   update_timers (jitterbuffer, seqnum, dts, do_next_seqnum);
2152
2153   /* we had an unhandled SR, handle it now */
2154   if (priv->last_sr)
2155     do_handle_sync (jitterbuffer);
2156
2157   /* signal addition of new buffer when the _loop is waiting. */
2158   if (priv->active)
2159     JBUF_SIGNAL_EVENT (priv);
2160
2161   /* let's unschedule and unblock any waiting buffers. We only want to do this
2162    * when the tail buffer changed */
2163   if (G_UNLIKELY (priv->clock_id && tail)) {
2164     GST_DEBUG_OBJECT (jitterbuffer, "Unscheduling waiting new buffer");
2165     unschedule_current_timer (jitterbuffer);
2166   }
2167
2168   GST_DEBUG_OBJECT (jitterbuffer, "Pushed packet #%d, now %d packets, tail: %d",
2169       seqnum, rtp_jitter_buffer_num_packets (priv->jbuf), tail);
2170
2171   check_buffering_percent (jitterbuffer, &percent);
2172
2173 finished:
2174   JBUF_UNLOCK (priv);
2175
2176   if (percent != -1)
2177     post_buffering_percent (jitterbuffer, percent);
2178
2179   return ret;
2180
2181   /* ERRORS */
2182 invalid_buffer:
2183   {
2184     /* this is not fatal but should be filtered earlier */
2185     GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
2186         ("Received invalid RTP payload, dropping"));
2187     gst_buffer_unref (buffer);
2188     return GST_FLOW_OK;
2189   }
2190 no_clock_rate:
2191   {
2192     GST_WARNING_OBJECT (jitterbuffer,
2193         "No clock-rate in caps!, dropping buffer");
2194     gst_buffer_unref (buffer);
2195     goto finished;
2196   }
2197 out_flushing:
2198   {
2199     ret = priv->srcresult;
2200     GST_DEBUG_OBJECT (jitterbuffer, "flushing %s", gst_flow_get_name (ret));
2201     gst_buffer_unref (buffer);
2202     goto finished;
2203   }
2204 have_eos:
2205   {
2206     ret = GST_FLOW_EOS;
2207     GST_WARNING_OBJECT (jitterbuffer, "we are EOS, refusing buffer");
2208     gst_buffer_unref (buffer);
2209     goto finished;
2210   }
2211 too_late:
2212   {
2213     GST_WARNING_OBJECT (jitterbuffer, "Packet #%d too late as #%d was already"
2214         " popped, dropping", seqnum, priv->last_popped_seqnum);
2215     priv->num_late++;
2216     gst_buffer_unref (buffer);
2217     goto finished;
2218   }
2219 duplicate:
2220   {
2221     GST_WARNING_OBJECT (jitterbuffer, "Duplicate packet #%d detected, dropping",
2222         seqnum);
2223     priv->num_duplicates++;
2224     free_item (item);
2225     goto finished;
2226   }
2227 }
2228
2229 static GstClockTime
2230 compute_elapsed (GstRtpJitterBuffer * jitterbuffer, RTPJitterBufferItem * item)
2231 {
2232   guint64 ext_time, elapsed;
2233   guint32 rtp_time;
2234   GstRtpJitterBufferPrivate *priv;
2235
2236   priv = jitterbuffer->priv;
2237   rtp_time = item->rtptime;
2238
2239   GST_LOG_OBJECT (jitterbuffer, "rtp %" G_GUINT32_FORMAT ", ext %"
2240       G_GUINT64_FORMAT, rtp_time, priv->ext_timestamp);
2241
2242   if (rtp_time < priv->ext_timestamp) {
2243     ext_time = priv->ext_timestamp;
2244   } else {
2245     ext_time = gst_rtp_buffer_ext_timestamp (&priv->ext_timestamp, rtp_time);
2246   }
2247
2248   if (ext_time > priv->clock_base)
2249     elapsed = ext_time - priv->clock_base;
2250   else
2251     elapsed = 0;
2252
2253   elapsed = gst_util_uint64_scale_int (elapsed, GST_SECOND, priv->clock_rate);
2254   return elapsed;
2255 }
2256
2257 static void
2258 update_estimated_eos (GstRtpJitterBuffer * jitterbuffer,
2259     RTPJitterBufferItem * item)
2260 {
2261   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2262
2263   if (priv->npt_stop != -1 && priv->ext_timestamp != -1
2264       && priv->clock_base != -1 && priv->clock_rate > 0) {
2265     guint64 elapsed, estimated;
2266
2267     elapsed = compute_elapsed (jitterbuffer, item);
2268
2269     if (elapsed > priv->last_elapsed || !priv->last_elapsed) {
2270       guint64 left;
2271       GstClockTime out_time;
2272
2273       priv->last_elapsed = elapsed;
2274
2275       left = priv->npt_stop - priv->npt_start;
2276       GST_LOG_OBJECT (jitterbuffer, "left %" GST_TIME_FORMAT,
2277           GST_TIME_ARGS (left));
2278
2279       out_time = item->dts;
2280
2281       if (elapsed > 0)
2282         estimated = gst_util_uint64_scale (out_time, left, elapsed);
2283       else {
2284         /* if there is almost nothing left,
2285          * we may never advance enough to end up in the above case */
2286         if (left < GST_SECOND)
2287           estimated = GST_SECOND;
2288         else
2289           estimated = -1;
2290       }
2291
2292       GST_LOG_OBJECT (jitterbuffer, "elapsed %" GST_TIME_FORMAT ", estimated %"
2293           GST_TIME_FORMAT, GST_TIME_ARGS (elapsed), GST_TIME_ARGS (estimated));
2294
2295       if (estimated != -1 && priv->estimated_eos != estimated) {
2296         set_timer (jitterbuffer, TIMER_TYPE_EOS, -1, estimated);
2297         priv->estimated_eos = estimated;
2298       }
2299     }
2300   }
2301 }
2302
2303 /* take a buffer from the queue and push it */
2304 static GstFlowReturn
2305 pop_and_push_next (GstRtpJitterBuffer * jitterbuffer, guint seqnum)
2306 {
2307   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2308   GstFlowReturn result;
2309   RTPJitterBufferItem *item;
2310   GstBuffer *outbuf;
2311   GstEvent *outevent;
2312   GstClockTime dts, pts;
2313   gint percent = -1;
2314   gboolean is_buffer, do_push = TRUE;
2315
2316   /* when we get here we are ready to pop and push the buffer */
2317   item = rtp_jitter_buffer_pop (priv->jbuf, &percent);
2318
2319   is_buffer = GST_IS_BUFFER (item->data);
2320
2321   if (is_buffer) {
2322     check_buffering_percent (jitterbuffer, &percent);
2323
2324     /* we need to make writable to change the flags and timestamps */
2325     outbuf = gst_buffer_make_writable (item->data);
2326
2327     if (G_UNLIKELY (priv->discont)) {
2328       /* set DISCONT flag when we missed a packet. We pushed the buffer writable
2329        * into the jitterbuffer so we can modify now. */
2330       GST_DEBUG_OBJECT (jitterbuffer, "mark output buffer discont");
2331       GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_DISCONT);
2332       priv->discont = FALSE;
2333     }
2334     if (G_UNLIKELY (priv->ts_discont)) {
2335       GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_RESYNC);
2336       priv->ts_discont = FALSE;
2337     }
2338
2339     dts = gst_segment_to_position (&priv->segment, GST_FORMAT_TIME, item->dts);
2340     pts = gst_segment_to_position (&priv->segment, GST_FORMAT_TIME, item->pts);
2341
2342     /* apply timestamp with offset to buffer now */
2343     GST_BUFFER_DTS (outbuf) = apply_offset (jitterbuffer, dts);
2344     GST_BUFFER_PTS (outbuf) = apply_offset (jitterbuffer, pts);
2345
2346     /* update the elapsed time when we need to check against the npt stop time. */
2347     update_estimated_eos (jitterbuffer, item);
2348
2349     priv->last_out_time = GST_BUFFER_PTS (outbuf);
2350   } else {
2351     outevent = item->data;
2352     if (item->type == ITEM_TYPE_LOST) {
2353       priv->discont = TRUE;
2354       if (!priv->do_lost)
2355         do_push = FALSE;
2356     }
2357   }
2358
2359   /* now we are ready to push the buffer. Save the seqnum and release the lock
2360    * so the other end can push stuff in the queue again. */
2361   if (seqnum != -1) {
2362     priv->last_popped_seqnum = seqnum;
2363     priv->next_seqnum = (seqnum + item->count) & 0xffff;
2364   }
2365   JBUF_UNLOCK (priv);
2366
2367   item->data = NULL;
2368   free_item (item);
2369
2370   if (is_buffer) {
2371     /* push buffer */
2372     if (percent != -1)
2373       post_buffering_percent (jitterbuffer, percent);
2374
2375     GST_DEBUG_OBJECT (jitterbuffer,
2376         "Pushing buffer %d, dts %" GST_TIME_FORMAT ", pts %" GST_TIME_FORMAT,
2377         seqnum, GST_TIME_ARGS (GST_BUFFER_DTS (outbuf)),
2378         GST_TIME_ARGS (GST_BUFFER_PTS (outbuf)));
2379     result = gst_pad_push (priv->srcpad, outbuf);
2380   } else {
2381     GST_DEBUG_OBJECT (jitterbuffer, "Pushing event %d", seqnum);
2382
2383     if (do_push)
2384       gst_pad_push_event (priv->srcpad, outevent);
2385     else
2386       gst_event_unref (outevent);
2387
2388     result = GST_FLOW_OK;
2389   }
2390   JBUF_LOCK_CHECK (priv, out_flushing);
2391
2392   return result;
2393
2394   /* ERRORS */
2395 out_flushing:
2396   {
2397     return priv->srcresult;
2398   }
2399 }
2400
2401 #define GST_FLOW_WAIT GST_FLOW_CUSTOM_SUCCESS
2402
2403 /* Peek a buffer and compare the seqnum to the expected seqnum.
2404  * If all is fine, the buffer is pushed.
2405  * If something is wrong, we wait for some event
2406  */
2407 static GstFlowReturn
2408 handle_next_buffer (GstRtpJitterBuffer * jitterbuffer)
2409 {
2410   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2411   GstFlowReturn result = GST_FLOW_OK;
2412   RTPJitterBufferItem *item;
2413   guint seqnum;
2414   guint32 next_seqnum;
2415   gint gap;
2416
2417   /* only push buffers when PLAYING and active and not buffering */
2418   if (priv->blocked || !priv->active ||
2419       rtp_jitter_buffer_is_buffering (priv->jbuf))
2420     return GST_FLOW_WAIT;
2421
2422 again:
2423   /* peek a buffer, we're just looking at the sequence number.
2424    * If all is fine, we'll pop and push it. If the sequence number is wrong we
2425    * wait for a timeout or something to change.
2426    * The peeked buffer is valid for as long as we hold the jitterbuffer lock. */
2427   item = rtp_jitter_buffer_peek (priv->jbuf);
2428   if (item == NULL)
2429     goto wait;
2430
2431   /* get the seqnum and the next expected seqnum */
2432   seqnum = item->seqnum;
2433   if (seqnum == -1)
2434     goto do_push;
2435
2436   next_seqnum = priv->next_seqnum;
2437
2438   /* get the gap between this and the previous packet. If we don't know the
2439    * previous packet seqnum assume no gap. */
2440   if (G_UNLIKELY (next_seqnum == -1)) {
2441     GST_DEBUG_OBJECT (jitterbuffer, "First buffer #%d", seqnum);
2442     /* we don't know what the next_seqnum should be, the chain function should
2443      * have scheduled a DEADLINE timer that will increment next_seqnum when it
2444      * fires, so wait for that */
2445     result = GST_FLOW_WAIT;
2446   } else {
2447     /* else calculate GAP */
2448     gap = gst_rtp_buffer_compare_seqnum (next_seqnum, seqnum);
2449
2450     if (G_LIKELY (gap == 0)) {
2451     do_push:
2452       /* no missing packet, pop and push */
2453       result = pop_and_push_next (jitterbuffer, seqnum);
2454     } else if (G_UNLIKELY (gap < 0)) {
2455       RTPJitterBufferItem *item;
2456       /* if we have a packet that we already pushed or considered dropped, pop it
2457        * off and get the next packet */
2458       GST_DEBUG_OBJECT (jitterbuffer, "Old packet #%d, next #%d dropping",
2459           seqnum, next_seqnum);
2460       item = rtp_jitter_buffer_pop (priv->jbuf, NULL);
2461       free_item (item);
2462       goto again;
2463     } else {
2464       /* the chain function has scheduled timers to request retransmission or
2465        * when to consider the packet lost, wait for that */
2466       GST_DEBUG_OBJECT (jitterbuffer,
2467           "Sequence number GAP detected: expected %d instead of %d (%d missing)",
2468           next_seqnum, seqnum, gap);
2469       result = GST_FLOW_WAIT;
2470     }
2471   }
2472   return result;
2473
2474 wait:
2475   {
2476     GST_DEBUG_OBJECT (jitterbuffer, "no buffer, going to wait");
2477     if (priv->eos)
2478       result = GST_FLOW_EOS;
2479     else
2480       result = GST_FLOW_WAIT;
2481     return result;
2482   }
2483 }
2484
2485 /* the timeout for when we expected a packet expired */
2486 static gboolean
2487 do_expected_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
2488     GstClockTime now)
2489 {
2490   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2491   GstEvent *event;
2492   guint delay;
2493
2494   GST_DEBUG_OBJECT (jitterbuffer, "expected %d didn't arrive", timer->seqnum);
2495
2496   delay = timer->rtx_delay + timer->rtx_retry;
2497   event = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM,
2498       gst_structure_new ("GstRTPRetransmissionRequest",
2499           "seqnum", G_TYPE_UINT, (guint) timer->seqnum,
2500           "running-time", G_TYPE_UINT64, timer->rtx_base,
2501           "delay", G_TYPE_UINT, GST_TIME_AS_MSECONDS (delay),
2502           "retry", G_TYPE_UINT, timer->num_rtx_retry,
2503           "frequency", G_TYPE_UINT, priv->rtx_retry_timeout,
2504           "period", G_TYPE_UINT, priv->rtx_retry_period,
2505           "deadline", G_TYPE_UINT, priv->latency_ms,
2506           "packet-spacing", G_TYPE_UINT64, priv->packet_spacing, NULL));
2507
2508   priv->num_rtx_requests++;
2509   timer->num_rtx_retry++;
2510   timer->rtx_last = now;
2511
2512   /* calculate the timeout for the next retransmission attempt */
2513   timer->rtx_retry += (priv->rtx_retry_timeout * GST_MSECOND);
2514   GST_DEBUG_OBJECT (jitterbuffer, "base %" GST_TIME_FORMAT ", delay %"
2515       GST_TIME_FORMAT ", retry %" GST_TIME_FORMAT,
2516       GST_TIME_ARGS (timer->rtx_base), GST_TIME_ARGS (timer->rtx_delay),
2517       GST_TIME_ARGS (timer->rtx_retry));
2518
2519   if (timer->rtx_retry + timer->rtx_delay >
2520       (priv->rtx_retry_period * GST_MSECOND)) {
2521     GST_DEBUG_OBJECT (jitterbuffer, "reschedule as LOST timer");
2522     /* too many retransmission request, we now convert the timer
2523      * to a lost timer, leave the num_rtx_retry as it is for stats */
2524     timer->type = TIMER_TYPE_LOST;
2525     timer->rtx_delay = 0;
2526     timer->rtx_retry = 0;
2527   }
2528   reschedule_timer (jitterbuffer, timer, timer->seqnum,
2529       timer->rtx_base + timer->rtx_retry, timer->rtx_delay, FALSE);
2530
2531   JBUF_UNLOCK (priv);
2532   gst_pad_push_event (priv->sinkpad, event);
2533   JBUF_LOCK (priv);
2534
2535   return FALSE;
2536 }
2537
2538 /* a packet is lost */
2539 static gboolean
2540 do_lost_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
2541     GstClockTime now)
2542 {
2543   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2544   GstClockTime duration, timestamp;
2545   guint seqnum, lost_packets, num_rtx_retry;
2546   gboolean late;
2547   GstEvent *event;
2548   RTPJitterBufferItem *item;
2549
2550   seqnum = timer->seqnum;
2551   timestamp = apply_offset (jitterbuffer, timer->timeout);
2552   duration = timer->duration;
2553   if (duration == GST_CLOCK_TIME_NONE && priv->packet_spacing > 0)
2554     duration = priv->packet_spacing;
2555   lost_packets = MAX (timer->num, 1);
2556   late = timer->num > 0;
2557   num_rtx_retry = timer->num_rtx_retry;
2558
2559   /* we had a gap and thus we lost some packets. Create an event for this.  */
2560   if (lost_packets > 1)
2561     GST_DEBUG_OBJECT (jitterbuffer, "Packets #%d -> #%d lost", seqnum,
2562         seqnum + lost_packets - 1);
2563   else
2564     GST_DEBUG_OBJECT (jitterbuffer, "Packet #%d lost", seqnum);
2565
2566   priv->num_late += lost_packets;
2567   priv->num_rtx_failed += num_rtx_retry;
2568
2569   /* create paket lost event */
2570   event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM,
2571       gst_structure_new ("GstRTPPacketLost",
2572           "seqnum", G_TYPE_UINT, (guint) seqnum,
2573           "timestamp", G_TYPE_UINT64, timestamp,
2574           "duration", G_TYPE_UINT64, duration,
2575           "late", G_TYPE_BOOLEAN, late,
2576           "retry", G_TYPE_UINT, num_rtx_retry, NULL));
2577
2578   item = alloc_item (event, ITEM_TYPE_LOST, -1, -1, seqnum, lost_packets, -1);
2579   rtp_jitter_buffer_insert (priv->jbuf, item, NULL, NULL);
2580
2581   /* remove timer now */
2582   remove_timer (jitterbuffer, timer);
2583   JBUF_SIGNAL_EVENT (priv);
2584
2585   return TRUE;
2586 }
2587
2588 static gboolean
2589 do_eos_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
2590     GstClockTime now)
2591 {
2592   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2593
2594   GST_INFO_OBJECT (jitterbuffer, "got the NPT timeout");
2595   remove_timer (jitterbuffer, timer);
2596   JBUF_SIGNAL_EVENT (priv);
2597
2598   return TRUE;
2599 }
2600
2601 static gboolean
2602 do_deadline_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
2603     GstClockTime now)
2604 {
2605   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2606
2607   GST_INFO_OBJECT (jitterbuffer, "got deadline timeout");
2608
2609   priv->next_seqnum = timer->seqnum;
2610   remove_timer (jitterbuffer, timer);
2611   JBUF_SIGNAL_EVENT (priv);
2612
2613   return TRUE;
2614 }
2615
2616 static gboolean
2617 do_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
2618     GstClockTime now)
2619 {
2620   gboolean removed = FALSE;
2621
2622   switch (timer->type) {
2623     case TIMER_TYPE_EXPECTED:
2624       removed = do_expected_timeout (jitterbuffer, timer, now);
2625       break;
2626     case TIMER_TYPE_LOST:
2627       removed = do_lost_timeout (jitterbuffer, timer, now);
2628       break;
2629     case TIMER_TYPE_DEADLINE:
2630       removed = do_deadline_timeout (jitterbuffer, timer, now);
2631       break;
2632     case TIMER_TYPE_EOS:
2633       removed = do_eos_timeout (jitterbuffer, timer, now);
2634       break;
2635   }
2636   return removed;
2637 }
2638
2639 /* called when we need to wait for the next timeout.
2640  *
2641  * We loop over the array of recorded timeouts and wait for the earliest one.
2642  * When it timed out, do the logic associated with the timer.
2643  *
2644  * If there are no timers, we wait on a gcond until something new happens.
2645  */
2646 static void
2647 wait_next_timeout (GstRtpJitterBuffer * jitterbuffer)
2648 {
2649   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2650   GstClockTime now = 0;
2651
2652   JBUF_LOCK (priv);
2653   while (priv->timer_running) {
2654     TimerData *timer = NULL;
2655     GstClockTime timer_timeout = -1;
2656     gint i, len;
2657
2658     GST_DEBUG_OBJECT (jitterbuffer, "now %" GST_TIME_FORMAT,
2659         GST_TIME_ARGS (now));
2660
2661     len = priv->timers->len;
2662     for (i = 0; i < len; i++) {
2663       TimerData *test = &g_array_index (priv->timers, TimerData, i);
2664       GstClockTime test_timeout = get_timeout (jitterbuffer, test);
2665       gboolean save_best = FALSE;
2666
2667       GST_DEBUG_OBJECT (jitterbuffer, "%d, %d, %d, %" GST_TIME_FORMAT,
2668           i, test->type, test->seqnum, GST_TIME_ARGS (test_timeout));
2669
2670       /* find the smallest timeout */
2671       if (timer == NULL) {
2672         save_best = TRUE;
2673       } else if (timer_timeout == -1) {
2674         /* we already have an immediate timeout, the new timer must be an
2675          * immediate timer with smaller seqnum to become the best */
2676         if (test_timeout == -1 && test->seqnum < timer->seqnum)
2677           save_best = TRUE;
2678       } else if (test_timeout == -1) {
2679         /* first immediate timer */
2680         save_best = TRUE;
2681       } else if (test_timeout < timer_timeout) {
2682         /* earlier timer */
2683         save_best = TRUE;
2684       } else if (test_timeout == timer_timeout && test->seqnum < timer->seqnum) {
2685         /* same timer, smaller seqnum */
2686         save_best = TRUE;
2687       }
2688       if (save_best) {
2689         GST_DEBUG_OBJECT (jitterbuffer, "new best %d", i);
2690         timer = test;
2691         timer_timeout = test_timeout;
2692       }
2693     }
2694     if (timer && !priv->blocked) {
2695       GstClock *clock;
2696       GstClockTime sync_time;
2697       GstClockID id;
2698       GstClockReturn ret;
2699       GstClockTimeDiff clock_jitter;
2700
2701       if (timer_timeout == -1 || timer_timeout <= now) {
2702         do_timeout (jitterbuffer, timer, now);
2703         /* check here, do_timeout could have released the lock */
2704         if (!priv->timer_running)
2705           break;
2706         continue;
2707       }
2708
2709       GST_OBJECT_LOCK (jitterbuffer);
2710       clock = GST_ELEMENT_CLOCK (jitterbuffer);
2711       if (!clock) {
2712         GST_OBJECT_UNLOCK (jitterbuffer);
2713         /* let's just push if there is no clock */
2714         GST_DEBUG_OBJECT (jitterbuffer, "No clock, timeout right away");
2715         now = timer_timeout;
2716         continue;
2717       }
2718
2719       /* prepare for sync against clock */
2720       sync_time = timer_timeout + GST_ELEMENT_CAST (jitterbuffer)->base_time;
2721       /* add latency of peer to get input time */
2722       sync_time += priv->peer_latency;
2723
2724       GST_DEBUG_OBJECT (jitterbuffer, "sync to timestamp %" GST_TIME_FORMAT
2725           " with sync time %" GST_TIME_FORMAT,
2726           GST_TIME_ARGS (timer_timeout), GST_TIME_ARGS (sync_time));
2727
2728       /* create an entry for the clock */
2729       id = priv->clock_id = gst_clock_new_single_shot_id (clock, sync_time);
2730       priv->timer_timeout = timer_timeout;
2731       priv->timer_seqnum = timer->seqnum;
2732       GST_OBJECT_UNLOCK (jitterbuffer);
2733
2734       /* release the lock so that the other end can push stuff or unlock */
2735       JBUF_UNLOCK (priv);
2736
2737       ret = gst_clock_id_wait (id, &clock_jitter);
2738
2739       JBUF_LOCK (priv);
2740       if (!priv->timer_running)
2741         break;
2742
2743       if (ret != GST_CLOCK_UNSCHEDULED) {
2744         now = timer_timeout + MAX (clock_jitter, 0);
2745         GST_DEBUG_OBJECT (jitterbuffer, "sync done, %d, #%d, %" G_GINT64_FORMAT,
2746             ret, priv->timer_seqnum, clock_jitter);
2747       } else {
2748         GST_DEBUG_OBJECT (jitterbuffer, "sync unscheduled");
2749       }
2750       /* and free the entry */
2751       gst_clock_id_unref (id);
2752       priv->clock_id = NULL;
2753     } else {
2754       /* no timers, wait for activity */
2755       JBUF_WAIT_TIMER (priv);
2756     }
2757   }
2758   JBUF_UNLOCK (priv);
2759
2760   GST_DEBUG_OBJECT (jitterbuffer, "we are stopping");
2761   return;
2762 }
2763
2764 /*
2765  * This funcion implements the main pushing loop on the source pad.
2766  *
2767  * It first tries to push as many buffers as possible. If there is a seqnum
2768  * mismatch, we wait for the next timeouts.
2769  */
2770 static void
2771 gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer)
2772 {
2773   GstRtpJitterBufferPrivate *priv;
2774   GstFlowReturn result;
2775
2776   priv = jitterbuffer->priv;
2777
2778   JBUF_LOCK_CHECK (priv, flushing);
2779   do {
2780     result = handle_next_buffer (jitterbuffer);
2781     if (G_LIKELY (result == GST_FLOW_WAIT)) {
2782       /* now wait for the next event */
2783       JBUF_WAIT_EVENT (priv, flushing);
2784       result = GST_FLOW_OK;
2785     }
2786   }
2787   while (result == GST_FLOW_OK);
2788   /* store result for upstream */
2789   priv->srcresult = result;
2790   JBUF_UNLOCK (priv);
2791
2792   /* if we get here we need to pause */
2793   goto pause;
2794
2795   /* ERRORS */
2796 flushing:
2797   {
2798     result = priv->srcresult;
2799     JBUF_UNLOCK (priv);
2800     goto pause;
2801   }
2802 pause:
2803   {
2804     const gchar *reason = gst_flow_get_name (result);
2805     GstEvent *event;
2806
2807     GST_DEBUG_OBJECT (jitterbuffer, "pausing task, reason %s", reason);
2808     gst_pad_pause_task (priv->srcpad);
2809     if (result == GST_FLOW_EOS) {
2810       event = gst_event_new_eos ();
2811       gst_pad_push_event (priv->srcpad, event);
2812     }
2813     return;
2814   }
2815 }
2816
2817 /* collect the info from the lastest RTCP packet and the jitterbuffer sync, do
2818  * some sanity checks and then emit the handle-sync signal with the parameters.
2819  * This function must be called with the LOCK */
2820 static void
2821 do_handle_sync (GstRtpJitterBuffer * jitterbuffer)
2822 {
2823   GstRtpJitterBufferPrivate *priv;
2824   guint64 base_rtptime, base_time;
2825   guint32 clock_rate;
2826   guint64 last_rtptime;
2827   guint64 clock_base;
2828   guint64 ext_rtptime, diff;
2829   gboolean drop = FALSE;
2830
2831   priv = jitterbuffer->priv;
2832
2833   /* get the last values from the jitterbuffer */
2834   rtp_jitter_buffer_get_sync (priv->jbuf, &base_rtptime, &base_time,
2835       &clock_rate, &last_rtptime);
2836
2837   clock_base = priv->clock_base;
2838   ext_rtptime = priv->ext_rtptime;
2839
2840   GST_DEBUG_OBJECT (jitterbuffer, "ext SR %" G_GUINT64_FORMAT ", base %"
2841       G_GUINT64_FORMAT ", clock-rate %" G_GUINT32_FORMAT
2842       ", clock-base %" G_GUINT64_FORMAT ", last-rtptime %" G_GUINT64_FORMAT,
2843       ext_rtptime, base_rtptime, clock_rate, clock_base, last_rtptime);
2844
2845   if (base_rtptime == -1 || clock_rate == -1 || base_time == -1) {
2846     GST_DEBUG_OBJECT (jitterbuffer, "dropping, no RTP values");
2847     drop = TRUE;
2848   } else {
2849     /* we can't accept anything that happened before we did the last resync */
2850     if (base_rtptime > ext_rtptime) {
2851       GST_DEBUG_OBJECT (jitterbuffer, "dropping, older than base time");
2852       drop = TRUE;
2853     } else {
2854       /* the SR RTP timestamp must be something close to what we last observed
2855        * in the jitterbuffer */
2856       if (ext_rtptime > last_rtptime) {
2857         /* check how far ahead it is to our RTP timestamps */
2858         diff = ext_rtptime - last_rtptime;
2859         /* if bigger than 1 second, we drop it */
2860         if (diff > clock_rate) {
2861           GST_DEBUG_OBJECT (jitterbuffer, "too far ahead");
2862           /* should drop this, but some RTSP servers end up with bogus
2863            * way too ahead RTCP packet when repeated PAUSE/PLAY,
2864            * so still trigger rptbin sync but invalidate RTCP data
2865            * (sync might use other methods) */
2866           ext_rtptime = -1;
2867         }
2868         GST_DEBUG_OBJECT (jitterbuffer, "ext last %" G_GUINT64_FORMAT ", diff %"
2869             G_GUINT64_FORMAT, last_rtptime, diff);
2870       }
2871     }
2872   }
2873
2874   if (!drop) {
2875     GstStructure *s;
2876
2877     s = gst_structure_new ("application/x-rtp-sync",
2878         "base-rtptime", G_TYPE_UINT64, base_rtptime,
2879         "base-time", G_TYPE_UINT64, base_time,
2880         "clock-rate", G_TYPE_UINT, clock_rate,
2881         "clock-base", G_TYPE_UINT64, clock_base,
2882         "sr-ext-rtptime", G_TYPE_UINT64, ext_rtptime,
2883         "sr-buffer", GST_TYPE_BUFFER, priv->last_sr, NULL);
2884
2885     GST_DEBUG_OBJECT (jitterbuffer, "signaling sync");
2886     gst_buffer_replace (&priv->last_sr, NULL);
2887     JBUF_UNLOCK (priv);
2888     g_signal_emit (jitterbuffer,
2889         gst_rtp_jitter_buffer_signals[SIGNAL_HANDLE_SYNC], 0, s);
2890     JBUF_LOCK (priv);
2891     gst_structure_free (s);
2892   } else {
2893     GST_DEBUG_OBJECT (jitterbuffer, "dropping RTCP packet");
2894   }
2895 }
2896
2897 static GstFlowReturn
2898 gst_rtp_jitter_buffer_chain_rtcp (GstPad * pad, GstObject * parent,
2899     GstBuffer * buffer)
2900 {
2901   GstRtpJitterBuffer *jitterbuffer;
2902   GstRtpJitterBufferPrivate *priv;
2903   GstFlowReturn ret = GST_FLOW_OK;
2904   guint32 ssrc;
2905   GstRTCPPacket packet;
2906   guint64 ext_rtptime;
2907   guint32 rtptime;
2908   GstRTCPBuffer rtcp = { NULL, };
2909
2910   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
2911
2912   if (G_UNLIKELY (!gst_rtcp_buffer_validate (buffer)))
2913     goto invalid_buffer;
2914
2915   priv = jitterbuffer->priv;
2916
2917   gst_rtcp_buffer_map (buffer, GST_MAP_READ, &rtcp);
2918
2919   if (!gst_rtcp_buffer_get_first_packet (&rtcp, &packet))
2920     goto empty_buffer;
2921
2922   /* first packet must be SR or RR or else the validate would have failed */
2923   switch (gst_rtcp_packet_get_type (&packet)) {
2924     case GST_RTCP_TYPE_SR:
2925       gst_rtcp_packet_sr_get_sender_info (&packet, &ssrc, NULL, &rtptime,
2926           NULL, NULL);
2927       break;
2928     default:
2929       goto ignore_buffer;
2930   }
2931   gst_rtcp_buffer_unmap (&rtcp);
2932
2933   GST_DEBUG_OBJECT (jitterbuffer, "received RTCP of SSRC %08x", ssrc);
2934
2935   JBUF_LOCK (priv);
2936   /* convert the RTP timestamp to our extended timestamp, using the same offset
2937    * we used in the jitterbuffer */
2938   ext_rtptime = priv->jbuf->ext_rtptime;
2939   ext_rtptime = gst_rtp_buffer_ext_timestamp (&ext_rtptime, rtptime);
2940
2941   priv->ext_rtptime = ext_rtptime;
2942   gst_buffer_replace (&priv->last_sr, buffer);
2943
2944   do_handle_sync (jitterbuffer);
2945   JBUF_UNLOCK (priv);
2946
2947 done:
2948   gst_buffer_unref (buffer);
2949
2950   return ret;
2951
2952 invalid_buffer:
2953   {
2954     /* this is not fatal but should be filtered earlier */
2955     GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
2956         ("Received invalid RTCP payload, dropping"));
2957     ret = GST_FLOW_OK;
2958     goto done;
2959   }
2960 empty_buffer:
2961   {
2962     /* this is not fatal but should be filtered earlier */
2963     GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
2964         ("Received empty RTCP payload, dropping"));
2965     gst_rtcp_buffer_unmap (&rtcp);
2966     ret = GST_FLOW_OK;
2967     goto done;
2968   }
2969 ignore_buffer:
2970   {
2971     GST_DEBUG_OBJECT (jitterbuffer, "ignoring RTCP packet");
2972     gst_rtcp_buffer_unmap (&rtcp);
2973     ret = GST_FLOW_OK;
2974     goto done;
2975   }
2976 }
2977
2978 static gboolean
2979 gst_rtp_jitter_buffer_sink_query (GstPad * pad, GstObject * parent,
2980     GstQuery * query)
2981 {
2982   gboolean res = FALSE;
2983
2984   switch (GST_QUERY_TYPE (query)) {
2985     case GST_QUERY_CAPS:
2986     {
2987       GstCaps *filter, *caps;
2988
2989       gst_query_parse_caps (query, &filter);
2990       caps = gst_rtp_jitter_buffer_getcaps (pad, filter);
2991       gst_query_set_caps_result (query, caps);
2992       gst_caps_unref (caps);
2993       res = TRUE;
2994       break;
2995     }
2996     default:
2997       if (GST_QUERY_IS_SERIALIZED (query)) {
2998         GST_WARNING_OBJECT (pad, "unhandled serialized query");
2999         res = FALSE;
3000       } else {
3001         res = gst_pad_query_default (pad, parent, query);
3002       }
3003       break;
3004   }
3005   return res;
3006 }
3007
3008 static gboolean
3009 gst_rtp_jitter_buffer_src_query (GstPad * pad, GstObject * parent,
3010     GstQuery * query)
3011 {
3012   GstRtpJitterBuffer *jitterbuffer;
3013   GstRtpJitterBufferPrivate *priv;
3014   gboolean res = FALSE;
3015
3016   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
3017   priv = jitterbuffer->priv;
3018
3019   switch (GST_QUERY_TYPE (query)) {
3020     case GST_QUERY_LATENCY:
3021     {
3022       /* We need to send the query upstream and add the returned latency to our
3023        * own */
3024       GstClockTime min_latency, max_latency;
3025       gboolean us_live;
3026       GstClockTime our_latency;
3027
3028       if ((res = gst_pad_peer_query (priv->sinkpad, query))) {
3029         gst_query_parse_latency (query, &us_live, &min_latency, &max_latency);
3030
3031         GST_DEBUG_OBJECT (jitterbuffer, "Peer latency: min %"
3032             GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
3033             GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
3034
3035         /* store this so that we can safely sync on the peer buffers. */
3036         JBUF_LOCK (priv);
3037         priv->peer_latency = min_latency;
3038         our_latency = priv->latency_ns;
3039         JBUF_UNLOCK (priv);
3040
3041         GST_DEBUG_OBJECT (jitterbuffer, "Our latency: %" GST_TIME_FORMAT,
3042             GST_TIME_ARGS (our_latency));
3043
3044         /* we add some latency but can buffer an infinite amount of time */
3045         min_latency += our_latency;
3046         max_latency = -1;
3047
3048         GST_DEBUG_OBJECT (jitterbuffer, "Calculated total latency : min %"
3049             GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
3050             GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
3051
3052         gst_query_set_latency (query, TRUE, min_latency, max_latency);
3053       }
3054       break;
3055     }
3056     case GST_QUERY_POSITION:
3057     {
3058       GstClockTime start, last_out;
3059       GstFormat fmt;
3060
3061       gst_query_parse_position (query, &fmt, NULL);
3062       if (fmt != GST_FORMAT_TIME) {
3063         res = gst_pad_query_default (pad, parent, query);
3064         break;
3065       }
3066
3067       JBUF_LOCK (priv);
3068       start = priv->npt_start;
3069       last_out = priv->last_out_time;
3070       JBUF_UNLOCK (priv);
3071
3072       GST_DEBUG_OBJECT (jitterbuffer, "npt start %" GST_TIME_FORMAT
3073           ", last out %" GST_TIME_FORMAT, GST_TIME_ARGS (start),
3074           GST_TIME_ARGS (last_out));
3075
3076       if (GST_CLOCK_TIME_IS_VALID (start) && GST_CLOCK_TIME_IS_VALID (last_out)) {
3077         /* bring 0-based outgoing time to stream time */
3078         gst_query_set_position (query, GST_FORMAT_TIME, start + last_out);
3079         res = TRUE;
3080       } else {
3081         res = gst_pad_query_default (pad, parent, query);
3082       }
3083       break;
3084     }
3085     case GST_QUERY_CAPS:
3086     {
3087       GstCaps *filter, *caps;
3088
3089       gst_query_parse_caps (query, &filter);
3090       caps = gst_rtp_jitter_buffer_getcaps (pad, filter);
3091       gst_query_set_caps_result (query, caps);
3092       gst_caps_unref (caps);
3093       res = TRUE;
3094       break;
3095     }
3096     default:
3097       res = gst_pad_query_default (pad, parent, query);
3098       break;
3099   }
3100
3101   return res;
3102 }
3103
3104 static void
3105 gst_rtp_jitter_buffer_set_property (GObject * object,
3106     guint prop_id, const GValue * value, GParamSpec * pspec)
3107 {
3108   GstRtpJitterBuffer *jitterbuffer;
3109   GstRtpJitterBufferPrivate *priv;
3110
3111   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
3112   priv = jitterbuffer->priv;
3113
3114   switch (prop_id) {
3115     case PROP_LATENCY:
3116     {
3117       guint new_latency, old_latency;
3118
3119       new_latency = g_value_get_uint (value);
3120
3121       JBUF_LOCK (priv);
3122       old_latency = priv->latency_ms;
3123       priv->latency_ms = new_latency;
3124       priv->latency_ns = priv->latency_ms * GST_MSECOND;
3125       rtp_jitter_buffer_set_delay (priv->jbuf, priv->latency_ns);
3126       JBUF_UNLOCK (priv);
3127
3128       /* post message if latency changed, this will inform the parent pipeline
3129        * that a latency reconfiguration is possible/needed. */
3130       if (new_latency != old_latency) {
3131         GST_DEBUG_OBJECT (jitterbuffer, "latency changed to: %" GST_TIME_FORMAT,
3132             GST_TIME_ARGS (new_latency * GST_MSECOND));
3133
3134         gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer),
3135             gst_message_new_latency (GST_OBJECT_CAST (jitterbuffer)));
3136       }
3137       break;
3138     }
3139     case PROP_DROP_ON_LATENCY:
3140       JBUF_LOCK (priv);
3141       priv->drop_on_latency = g_value_get_boolean (value);
3142       JBUF_UNLOCK (priv);
3143       break;
3144     case PROP_TS_OFFSET:
3145       JBUF_LOCK (priv);
3146       priv->ts_offset = g_value_get_int64 (value);
3147       priv->ts_discont = TRUE;
3148       JBUF_UNLOCK (priv);
3149       break;
3150     case PROP_DO_LOST:
3151       JBUF_LOCK (priv);
3152       priv->do_lost = g_value_get_boolean (value);
3153       JBUF_UNLOCK (priv);
3154       break;
3155     case PROP_MODE:
3156       JBUF_LOCK (priv);
3157       rtp_jitter_buffer_set_mode (priv->jbuf, g_value_get_enum (value));
3158       JBUF_UNLOCK (priv);
3159       break;
3160     case PROP_DO_RETRANSMISSION:
3161       JBUF_LOCK (priv);
3162       priv->do_retransmission = g_value_get_boolean (value);
3163       JBUF_UNLOCK (priv);
3164       break;
3165     case PROP_RTX_DELAY:
3166       JBUF_LOCK (priv);
3167       priv->rtx_delay = g_value_get_int (value);
3168       JBUF_UNLOCK (priv);
3169       break;
3170     case PROP_RTX_DELAY_REORDER:
3171       JBUF_LOCK (priv);
3172       priv->rtx_delay_reorder = g_value_get_int (value);
3173       JBUF_UNLOCK (priv);
3174       break;
3175     case PROP_RTX_RETRY_TIMEOUT:
3176       JBUF_LOCK (priv);
3177       priv->rtx_retry_timeout = g_value_get_int (value);
3178       JBUF_UNLOCK (priv);
3179       break;
3180     case PROP_RTX_RETRY_PERIOD:
3181       JBUF_LOCK (priv);
3182       priv->rtx_retry_period = g_value_get_int (value);
3183       JBUF_UNLOCK (priv);
3184       break;
3185     default:
3186       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
3187       break;
3188   }
3189 }
3190
3191 static void
3192 gst_rtp_jitter_buffer_get_property (GObject * object,
3193     guint prop_id, GValue * value, GParamSpec * pspec)
3194 {
3195   GstRtpJitterBuffer *jitterbuffer;
3196   GstRtpJitterBufferPrivate *priv;
3197
3198   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
3199   priv = jitterbuffer->priv;
3200
3201   switch (prop_id) {
3202     case PROP_LATENCY:
3203       JBUF_LOCK (priv);
3204       g_value_set_uint (value, priv->latency_ms);
3205       JBUF_UNLOCK (priv);
3206       break;
3207     case PROP_DROP_ON_LATENCY:
3208       JBUF_LOCK (priv);
3209       g_value_set_boolean (value, priv->drop_on_latency);
3210       JBUF_UNLOCK (priv);
3211       break;
3212     case PROP_TS_OFFSET:
3213       JBUF_LOCK (priv);
3214       g_value_set_int64 (value, priv->ts_offset);
3215       JBUF_UNLOCK (priv);
3216       break;
3217     case PROP_DO_LOST:
3218       JBUF_LOCK (priv);
3219       g_value_set_boolean (value, priv->do_lost);
3220       JBUF_UNLOCK (priv);
3221       break;
3222     case PROP_MODE:
3223       JBUF_LOCK (priv);
3224       g_value_set_enum (value, rtp_jitter_buffer_get_mode (priv->jbuf));
3225       JBUF_UNLOCK (priv);
3226       break;
3227     case PROP_PERCENT:
3228     {
3229       gint percent;
3230
3231       JBUF_LOCK (priv);
3232       if (priv->srcresult != GST_FLOW_OK)
3233         percent = 100;
3234       else
3235         percent = rtp_jitter_buffer_get_percent (priv->jbuf);
3236
3237       g_value_set_int (value, percent);
3238       JBUF_UNLOCK (priv);
3239       break;
3240     }
3241     case PROP_DO_RETRANSMISSION:
3242       JBUF_LOCK (priv);
3243       g_value_set_boolean (value, priv->do_retransmission);
3244       JBUF_UNLOCK (priv);
3245       break;
3246     case PROP_RTX_DELAY:
3247       JBUF_LOCK (priv);
3248       g_value_set_int (value, priv->rtx_delay);
3249       JBUF_UNLOCK (priv);
3250       break;
3251     case PROP_RTX_DELAY_REORDER:
3252       JBUF_LOCK (priv);
3253       g_value_set_int (value, priv->rtx_delay_reorder);
3254       JBUF_UNLOCK (priv);
3255       break;
3256     case PROP_RTX_RETRY_TIMEOUT:
3257       JBUF_LOCK (priv);
3258       g_value_set_int (value, priv->rtx_retry_timeout);
3259       JBUF_UNLOCK (priv);
3260       break;
3261     case PROP_RTX_RETRY_PERIOD:
3262       JBUF_LOCK (priv);
3263       g_value_set_int (value, priv->rtx_retry_period);
3264       JBUF_UNLOCK (priv);
3265       break;
3266     case PROP_STATS:
3267       g_value_take_boxed (value,
3268           gst_rtp_jitter_buffer_create_stats (jitterbuffer));
3269       break;
3270     default:
3271       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
3272       break;
3273   }
3274 }
3275
3276 static GstStructure *
3277 gst_rtp_jitter_buffer_create_stats (GstRtpJitterBuffer * jbuf)
3278 {
3279   GstStructure *s;
3280
3281   JBUF_LOCK (jbuf->priv);
3282   s = gst_structure_new ("application/x-rtp-jitterbuffer-stats",
3283       "rtx-count", G_TYPE_UINT64, jbuf->priv->num_rtx_requests,
3284       "rtx-success-count", G_TYPE_UINT64, jbuf->priv->num_rtx_success,
3285       "rtx-per-packet", G_TYPE_DOUBLE, jbuf->priv->avg_rtx_num,
3286       "rtx-rtt", G_TYPE_UINT64, jbuf->priv->avg_rtx_rtt, NULL);
3287   JBUF_UNLOCK (jbuf->priv);
3288
3289   return s;
3290 }