WIP: rtpjitterbuffer: Add RFC7273 media clock handling
[platform/upstream/gst-plugins-good.git] / gst / rtpmanager / gstrtpjitterbuffer.c
1 /*
2  * Farsight Voice+Video library
3  *
4  *  Copyright 2007 Collabora Ltd,
5  *  Copyright 2007 Nokia Corporation
6  *   @author: Philippe Kalaf <philippe.kalaf@collabora.co.uk>.
7  *  Copyright 2007 Wim Taymans <wim.taymans@gmail.com>
8  *  Copyright 2015 Kurento (http://kurento.org/)
9  *   @author: Miguel ParĂ­s <mparisdiaz@gmail.com>
10  *
11  * This library is free software; you can redistribute it and/or
12  * modify it under the terms of the GNU Library General Public
13  * License as published by the Free Software Foundation; either
14  * version 2 of the License, or (at your option) any later version.
15  *
16  * This library is distributed in the hope that it will be useful,
17  * but WITHOUT ANY WARRANTY; without even the implied warranty of
18  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
19  * Library General Public License for more details.
20  *
21  * You should have received a copy of the GNU Library General Public
22  * License along with this library; if not, write to the
23  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
24  * Boston, MA 02110-1301, USA.
25  *
26  */
27
28 /**
29  * SECTION:element-rtpjitterbuffer
30  *
31  * This element reorders and removes duplicate RTP packets as they are received
32  * from a network source.
33  *
34  * The element needs the clock-rate of the RTP payload in order to estimate the
35  * delay. This information is obtained either from the caps on the sink pad or,
36  * when no caps are present, from the #GstRtpJitterBuffer::request-pt-map signal.
37  * To clear the previous pt-map use the #GstRtpJitterBuffer::clear-pt-map signal.
38  *
39  * The rtpjitterbuffer will wait for missing packets up to a configurable time
40  * limit using the #GstRtpJitterBuffer:latency property. Packets arriving too
41  * late are considered to be lost packets. If the #GstRtpJitterBuffer:do-lost
42  * property is set, lost packets will result in a custom serialized downstream
43  * event of name GstRTPPacketLost. The lost packet events are usually used by a
44  * depayloader or other element to create concealment data or some other logic
45  * to gracefully handle the missing packets.
46  *
47  * The jitterbuffer will use the DTS (or PTS if no DTS is set) of the incomming
48  * buffer and the rtptime inside the RTP packet to create a PTS on the outgoing
49  * buffer.
50  *
51  * The jitterbuffer can also be configured to send early retransmission events
52  * upstream by setting the #GstRtpJitterBuffer:do-retransmission property. In
53  * this mode, the jitterbuffer tries to estimate when a packet should arrive and
54  * sends a custom upstream event named GstRTPRetransmissionRequest when the
55  * packet is considered late. The initial expected packet arrival time is
56  * calculated as follows:
57  *
58  * - If seqnum N arrived at time T, seqnum N+1 is expected to arrive at
59  *     T + packet-spacing + #GstRtpJitterBuffer:rtx-delay. The packet spacing is
60  *     calculated from the DTS (or PTS is no DTS) of two consecutive RTP
61  *     packets with different rtptime.
62  *
63  * - If seqnum N0 arrived at time T0 and seqnum Nm arrived at time Tm,
64  *     seqnum Ni is expected at time Ti = T0 + i*(Tm - T0)/(Nm - N0). Any
65  *     previously scheduled timeout is overwritten.
66  *
67  * - If seqnum N arrived, all seqnum older than
68  *     N - #GstRtpJitterBuffer:rtx-delay-reorder are considered late
69  *     immediately. This is to request fast feedback for abonormally reorder
70  *     packets before any of the previous timeouts is triggered.
71  *
72  * A late packet triggers the GstRTPRetransmissionRequest custom upstream
73  * event. After the initial timeout expires and the retransmission event is
74  * sent, the timeout is scheduled for
75  * T + #GstRtpJitterBuffer:rtx-retry-timeout. If the missing packet did not
76  * arrive after #GstRtpJitterBuffer:rtx-retry-timeout, a new
77  * GstRTPRetransmissionRequest is sent upstream and the timeout is rescheduled
78  * again for T + #GstRtpJitterBuffer:rtx-retry-timeout. This repeats until
79  * #GstRtpJitterBuffer:rtx-retry-period elapsed, at which point no further
80  * retransmission requests are sent and the regular logic is performed to
81  * schedule a lost packet as discussed above.
82  *
83  * This element acts as a live element and so adds #GstRtpJitterBuffer:latency
84  * to the pipeline.
85  *
86  * This element will automatically be used inside rtpbin.
87  *
88  * <refsect2>
89  * <title>Example pipelines</title>
90  * |[
91  * gst-launch-1.0 rtspsrc location=rtsp://192.168.1.133:8554/mpeg1or2AudioVideoTest ! rtpjitterbuffer ! rtpmpvdepay ! mpeg2dec ! xvimagesink
92  * ]| Connect to a streaming server and decode the MPEG video. The jitterbuffer is
93  * inserted into the pipeline to smooth out network jitter and to reorder the
94  * out-of-order RTP packets.
95  * </refsect2>
96  */
97
98 #ifdef HAVE_CONFIG_H
99 #include "config.h"
100 #endif
101
102 #include <stdlib.h>
103 #include <stdio.h>
104 #include <string.h>
105 #include <gst/rtp/gstrtpbuffer.h>
106 #include <gst/net/net.h>
107
108 #include "gstrtpjitterbuffer.h"
109 #include "rtpjitterbuffer.h"
110 #include "rtpstats.h"
111
112 #include <gst/glib-compat-private.h>
113
114 GST_DEBUG_CATEGORY (rtpjitterbuffer_debug);
115 #define GST_CAT_DEFAULT (rtpjitterbuffer_debug)
116
117 /* RTPJitterBuffer signals and args */
118 enum
119 {
120   SIGNAL_REQUEST_PT_MAP,
121   SIGNAL_CLEAR_PT_MAP,
122   SIGNAL_HANDLE_SYNC,
123   SIGNAL_ON_NPT_STOP,
124   SIGNAL_SET_ACTIVE,
125   LAST_SIGNAL
126 };
127
128 #define DEFAULT_LATENCY_MS          200
129 #define DEFAULT_DROP_ON_LATENCY     FALSE
130 #define DEFAULT_TS_OFFSET           0
131 #define DEFAULT_DO_LOST             FALSE
132 #define DEFAULT_MODE                RTP_JITTER_BUFFER_MODE_SLAVE
133 #define DEFAULT_PERCENT             0
134 #define DEFAULT_DO_RETRANSMISSION   FALSE
135 #define DEFAULT_RTX_NEXT_SEQNUM     TRUE
136 #define DEFAULT_RTX_DELAY           -1
137 #define DEFAULT_RTX_MIN_DELAY       0
138 #define DEFAULT_RTX_DELAY_REORDER   3
139 #define DEFAULT_RTX_RETRY_TIMEOUT   -1
140 #define DEFAULT_RTX_MIN_RETRY_TIMEOUT   -1
141 #define DEFAULT_RTX_RETRY_PERIOD    -1
142 #define DEFAULT_RTX_MAX_RETRIES    -1
143 #define DEFAULT_MAX_RTCP_RTP_TIME_DIFF 1000
144 #define DEFAULT_MAX_DROPOUT_TIME    60000
145 #define DEFAULT_MAX_MISORDER_TIME   2000
146
147 #define DEFAULT_AUTO_RTX_DELAY (20 * GST_MSECOND)
148 #define DEFAULT_AUTO_RTX_TIMEOUT (40 * GST_MSECOND)
149
150 enum
151 {
152   PROP_0,
153   PROP_LATENCY,
154   PROP_DROP_ON_LATENCY,
155   PROP_TS_OFFSET,
156   PROP_DO_LOST,
157   PROP_MODE,
158   PROP_PERCENT,
159   PROP_DO_RETRANSMISSION,
160   PROP_RTX_NEXT_SEQNUM,
161   PROP_RTX_DELAY,
162   PROP_RTX_MIN_DELAY,
163   PROP_RTX_DELAY_REORDER,
164   PROP_RTX_RETRY_TIMEOUT,
165   PROP_RTX_MIN_RETRY_TIMEOUT,
166   PROP_RTX_RETRY_PERIOD,
167   PROP_RTX_MAX_RETRIES,
168   PROP_STATS,
169   PROP_MAX_RTCP_RTP_TIME_DIFF,
170   PROP_MAX_DROPOUT_TIME,
171   PROP_MAX_MISORDER_TIME
172 };
173
174 #define JBUF_LOCK(priv)   (g_mutex_lock (&(priv)->jbuf_lock))
175
176 #define JBUF_LOCK_CHECK(priv,label) G_STMT_START {    \
177   JBUF_LOCK (priv);                                   \
178   if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK))    \
179     goto label;                                       \
180 } G_STMT_END
181 #define JBUF_UNLOCK(priv) (g_mutex_unlock (&(priv)->jbuf_lock))
182
183 #define JBUF_WAIT_TIMER(priv)   G_STMT_START {            \
184   GST_DEBUG ("waiting timer");                            \
185   (priv)->waiting_timer = TRUE;                           \
186   g_cond_wait (&(priv)->jbuf_timer, &(priv)->jbuf_lock);  \
187   (priv)->waiting_timer = FALSE;                          \
188   GST_DEBUG ("waiting timer done");                       \
189 } G_STMT_END
190 #define JBUF_SIGNAL_TIMER(priv) G_STMT_START {            \
191   if (G_UNLIKELY ((priv)->waiting_timer)) {               \
192     GST_DEBUG ("signal timer");                           \
193     g_cond_signal (&(priv)->jbuf_timer);                  \
194   }                                                       \
195 } G_STMT_END
196
197 #define JBUF_WAIT_EVENT(priv,label) G_STMT_START {       \
198   GST_DEBUG ("waiting event");                           \
199   (priv)->waiting_event = TRUE;                          \
200   g_cond_wait (&(priv)->jbuf_event, &(priv)->jbuf_lock); \
201   (priv)->waiting_event = FALSE;                         \
202   GST_DEBUG ("waiting event done");                      \
203   if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK))       \
204     goto label;                                          \
205 } G_STMT_END
206 #define JBUF_SIGNAL_EVENT(priv) G_STMT_START {           \
207   if (G_UNLIKELY ((priv)->waiting_event)) {              \
208     GST_DEBUG ("signal event");                          \
209     g_cond_signal (&(priv)->jbuf_event);                 \
210   }                                                      \
211 } G_STMT_END
212
213 #define JBUF_WAIT_QUERY(priv,label) G_STMT_START {       \
214   GST_DEBUG ("waiting query");                           \
215   (priv)->waiting_query = TRUE;                          \
216   g_cond_wait (&(priv)->jbuf_query, &(priv)->jbuf_lock); \
217   (priv)->waiting_query = FALSE;                         \
218   GST_DEBUG ("waiting query done");                      \
219   if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK))       \
220     goto label;                                          \
221 } G_STMT_END
222 #define JBUF_SIGNAL_QUERY(priv,res) G_STMT_START {       \
223   (priv)->last_query = res;                              \
224   if (G_UNLIKELY ((priv)->waiting_query)) {              \
225     GST_DEBUG ("signal query");                          \
226     g_cond_signal (&(priv)->jbuf_query);                 \
227   }                                                      \
228 } G_STMT_END
229
230
231 struct _GstRtpJitterBufferPrivate
232 {
233   GstPad *sinkpad, *srcpad;
234   GstPad *rtcpsinkpad;
235
236   RTPJitterBuffer *jbuf;
237   GMutex jbuf_lock;
238   gboolean waiting_timer;
239   GCond jbuf_timer;
240   gboolean waiting_event;
241   GCond jbuf_event;
242   gboolean waiting_query;
243   GCond jbuf_query;
244   gboolean last_query;
245   gboolean discont;
246   gboolean ts_discont;
247   gboolean active;
248   guint64 out_offset;
249
250   gboolean timer_running;
251   GThread *timer_thread;
252
253   /* properties */
254   guint latency_ms;
255   guint64 latency_ns;
256   gboolean drop_on_latency;
257   gint64 ts_offset;
258   gboolean do_lost;
259   gboolean do_retransmission;
260   gboolean rtx_next_seqnum;
261   gint rtx_delay;
262   guint rtx_min_delay;
263   gint rtx_delay_reorder;
264   gint rtx_retry_timeout;
265   gint rtx_min_retry_timeout;
266   gint rtx_retry_period;
267   gint rtx_max_retries;
268   gint max_rtcp_rtp_time_diff;
269   guint32 max_dropout_time;
270   guint32 max_misorder_time;
271
272   /* the last seqnum we pushed out */
273   guint32 last_popped_seqnum;
274   /* the next expected seqnum we push */
275   guint32 next_seqnum;
276   /* seqnum-base, if known */
277   guint32 seqnum_base;
278   /* last output time */
279   GstClockTime last_out_time;
280   /* last valid input timestamp and rtptime pair */
281   GstClockTime ips_dts;
282   guint64 ips_rtptime;
283   GstClockTime packet_spacing;
284
285   GQueue gap_packets;
286
287   /* the next expected seqnum we receive */
288   GstClockTime last_in_dts;
289   guint32 next_in_seqnum;
290
291   GArray *timers;
292
293   /* start and stop ranges */
294   GstClockTime npt_start;
295   GstClockTime npt_stop;
296   guint64 ext_timestamp;
297   guint64 last_elapsed;
298   guint64 estimated_eos;
299   GstClockID eos_id;
300
301   /* state */
302   gboolean eos;
303   guint last_percent;
304
305   /* clock rate and rtp timestamp offset */
306   gint last_pt;
307   gint32 clock_rate;
308   gint64 clock_base;
309   gint64 prev_ts_offset;
310
311   /* when we are shutting down */
312   GstFlowReturn srcresult;
313   gboolean blocked;
314
315   /* for sync */
316   GstSegment segment;
317   GstClockID clock_id;
318   GstClockTime timer_timeout;
319   guint16 timer_seqnum;
320   /* the latency of the upstream peer, we have to take this into account when
321    * synchronizing the buffers. */
322   GstClockTime peer_latency;
323   guint64 ext_rtptime;
324   GstBuffer *last_sr;
325
326   /* some accounting */
327   guint64 num_late;
328   guint64 num_duplicates;
329   guint64 num_rtx_requests;
330   guint64 num_rtx_success;
331   guint64 num_rtx_failed;
332   gdouble avg_rtx_num;
333   guint64 avg_rtx_rtt;
334   RTPPacketRateCtx packet_rate_ctx;
335
336   /* for the jitter */
337   GstClockTime last_dts;
338   guint64 last_rtptime;
339   GstClockTime avg_jitter;
340 };
341
342 typedef enum
343 {
344   TIMER_TYPE_EXPECTED,
345   TIMER_TYPE_LOST,
346   TIMER_TYPE_DEADLINE,
347   TIMER_TYPE_EOS
348 } TimerType;
349
350 typedef struct
351 {
352   guint idx;
353   guint16 seqnum;
354   guint num;
355   TimerType type;
356   GstClockTime timeout;
357   GstClockTime duration;
358   GstClockTime rtx_base;
359   GstClockTime rtx_delay;
360   GstClockTime rtx_retry;
361   GstClockTime rtx_last;
362   guint num_rtx_retry;
363 } TimerData;
364
365 #define GST_RTP_JITTER_BUFFER_GET_PRIVATE(o) \
366   (G_TYPE_INSTANCE_GET_PRIVATE ((o), GST_TYPE_RTP_JITTER_BUFFER, \
367                                 GstRtpJitterBufferPrivate))
368
369 static GstStaticPadTemplate gst_rtp_jitter_buffer_sink_template =
370 GST_STATIC_PAD_TEMPLATE ("sink",
371     GST_PAD_SINK,
372     GST_PAD_ALWAYS,
373     GST_STATIC_CAPS ("application/x-rtp"
374         /* "clock-rate = (int) [ 1, 2147483647 ], "
375          * "payload = (int) , "
376          * "encoding-name = (string) "
377          */ )
378     );
379
380 static GstStaticPadTemplate gst_rtp_jitter_buffer_sink_rtcp_template =
381 GST_STATIC_PAD_TEMPLATE ("sink_rtcp",
382     GST_PAD_SINK,
383     GST_PAD_REQUEST,
384     GST_STATIC_CAPS ("application/x-rtcp")
385     );
386
387 static GstStaticPadTemplate gst_rtp_jitter_buffer_src_template =
388 GST_STATIC_PAD_TEMPLATE ("src",
389     GST_PAD_SRC,
390     GST_PAD_ALWAYS,
391     GST_STATIC_CAPS ("application/x-rtp"
392         /* "payload = (int) , "
393          * "clock-rate = (int) , "
394          * "encoding-name = (string) "
395          */ )
396     );
397
398 static guint gst_rtp_jitter_buffer_signals[LAST_SIGNAL] = { 0 };
399
400 #define gst_rtp_jitter_buffer_parent_class parent_class
401 G_DEFINE_TYPE (GstRtpJitterBuffer, gst_rtp_jitter_buffer, GST_TYPE_ELEMENT);
402
403 /* object overrides */
404 static void gst_rtp_jitter_buffer_set_property (GObject * object,
405     guint prop_id, const GValue * value, GParamSpec * pspec);
406 static void gst_rtp_jitter_buffer_get_property (GObject * object,
407     guint prop_id, GValue * value, GParamSpec * pspec);
408 static void gst_rtp_jitter_buffer_finalize (GObject * object);
409
410 /* element overrides */
411 static GstStateChangeReturn gst_rtp_jitter_buffer_change_state (GstElement
412     * element, GstStateChange transition);
413 static GstPad *gst_rtp_jitter_buffer_request_new_pad (GstElement * element,
414     GstPadTemplate * templ, const gchar * name, const GstCaps * filter);
415 static void gst_rtp_jitter_buffer_release_pad (GstElement * element,
416     GstPad * pad);
417 static GstClock *gst_rtp_jitter_buffer_provide_clock (GstElement * element);
418 static gboolean gst_rtp_jitter_buffer_set_clock (GstElement * element,
419     GstClock * clock);
420
421 /* pad overrides */
422 static GstCaps *gst_rtp_jitter_buffer_getcaps (GstPad * pad, GstCaps * filter);
423 static GstIterator *gst_rtp_jitter_buffer_iterate_internal_links (GstPad * pad,
424     GstObject * parent);
425
426 /* sinkpad overrides */
427 static gboolean gst_rtp_jitter_buffer_sink_event (GstPad * pad,
428     GstObject * parent, GstEvent * event);
429 static GstFlowReturn gst_rtp_jitter_buffer_chain (GstPad * pad,
430     GstObject * parent, GstBuffer * buffer);
431
432 static gboolean gst_rtp_jitter_buffer_sink_rtcp_event (GstPad * pad,
433     GstObject * parent, GstEvent * event);
434 static GstFlowReturn gst_rtp_jitter_buffer_chain_rtcp (GstPad * pad,
435     GstObject * parent, GstBuffer * buffer);
436
437 static gboolean gst_rtp_jitter_buffer_sink_query (GstPad * pad,
438     GstObject * parent, GstQuery * query);
439
440 /* srcpad overrides */
441 static gboolean gst_rtp_jitter_buffer_src_event (GstPad * pad,
442     GstObject * parent, GstEvent * event);
443 static gboolean gst_rtp_jitter_buffer_src_activate_mode (GstPad * pad,
444     GstObject * parent, GstPadMode mode, gboolean active);
445 static void gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer);
446 static gboolean gst_rtp_jitter_buffer_src_query (GstPad * pad,
447     GstObject * parent, GstQuery * query);
448
449 static void
450 gst_rtp_jitter_buffer_clear_pt_map (GstRtpJitterBuffer * jitterbuffer);
451 static GstClockTime
452 gst_rtp_jitter_buffer_set_active (GstRtpJitterBuffer * jitterbuffer,
453     gboolean active, guint64 base_time);
454 static void do_handle_sync (GstRtpJitterBuffer * jitterbuffer);
455
456 static void unschedule_current_timer (GstRtpJitterBuffer * jitterbuffer);
457 static void remove_all_timers (GstRtpJitterBuffer * jitterbuffer);
458
459 static void wait_next_timeout (GstRtpJitterBuffer * jitterbuffer);
460
461 static GstStructure *gst_rtp_jitter_buffer_create_stats (GstRtpJitterBuffer *
462     jitterbuffer);
463
464 static void
465 gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass)
466 {
467   GObjectClass *gobject_class;
468   GstElementClass *gstelement_class;
469
470   gobject_class = (GObjectClass *) klass;
471   gstelement_class = (GstElementClass *) klass;
472
473   g_type_class_add_private (klass, sizeof (GstRtpJitterBufferPrivate));
474
475   gobject_class->finalize = gst_rtp_jitter_buffer_finalize;
476
477   gobject_class->set_property = gst_rtp_jitter_buffer_set_property;
478   gobject_class->get_property = gst_rtp_jitter_buffer_get_property;
479
480   /**
481    * GstRtpJitterBuffer:latency:
482    *
483    * The maximum latency of the jitterbuffer. Packets will be kept in the buffer
484    * for at most this time.
485    */
486   g_object_class_install_property (gobject_class, PROP_LATENCY,
487       g_param_spec_uint ("latency", "Buffer latency in ms",
488           "Amount of ms to buffer", 0, G_MAXUINT, DEFAULT_LATENCY_MS,
489           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
490   /**
491    * GstRtpJitterBuffer:drop-on-latency:
492    *
493    * Drop oldest buffers when the queue is completely filled.
494    */
495   g_object_class_install_property (gobject_class, PROP_DROP_ON_LATENCY,
496       g_param_spec_boolean ("drop-on-latency",
497           "Drop buffers when maximum latency is reached",
498           "Tells the jitterbuffer to never exceed the given latency in size",
499           DEFAULT_DROP_ON_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
500   /**
501    * GstRtpJitterBuffer:ts-offset:
502    *
503    * Adjust GStreamer output buffer timestamps in the jitterbuffer with offset.
504    * This is mainly used to ensure interstream synchronisation.
505    */
506   g_object_class_install_property (gobject_class, PROP_TS_OFFSET,
507       g_param_spec_int64 ("ts-offset", "Timestamp Offset",
508           "Adjust buffer timestamps with offset in nanoseconds", G_MININT64,
509           G_MAXINT64, DEFAULT_TS_OFFSET,
510           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
511
512   /**
513    * GstRtpJitterBuffer:do-lost:
514    *
515    * Send out a GstRTPPacketLost event downstream when a packet is considered
516    * lost.
517    */
518   g_object_class_install_property (gobject_class, PROP_DO_LOST,
519       g_param_spec_boolean ("do-lost", "Do Lost",
520           "Send an event downstream when a packet is lost", DEFAULT_DO_LOST,
521           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
522
523   /**
524    * GstRtpJitterBuffer:mode:
525    *
526    * Control the buffering and timestamping mode used by the jitterbuffer.
527    */
528   g_object_class_install_property (gobject_class, PROP_MODE,
529       g_param_spec_enum ("mode", "Mode",
530           "Control the buffering algorithm in use", RTP_TYPE_JITTER_BUFFER_MODE,
531           DEFAULT_MODE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
532   /**
533    * GstRtpJitterBuffer:percent:
534    *
535    * The percent of the jitterbuffer that is filled.
536    */
537   g_object_class_install_property (gobject_class, PROP_PERCENT,
538       g_param_spec_int ("percent", "percent",
539           "The buffer filled percent", 0, 100,
540           0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
541   /**
542    * GstRtpJitterBuffer:do-retransmission:
543    *
544    * Send out a GstRTPRetransmission event upstream when a packet is considered
545    * late and should be retransmitted.
546    *
547    * Since: 1.2
548    */
549   g_object_class_install_property (gobject_class, PROP_DO_RETRANSMISSION,
550       g_param_spec_boolean ("do-retransmission", "Do Retransmission",
551           "Send retransmission events upstream when a packet is late",
552           DEFAULT_DO_RETRANSMISSION,
553           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
554
555   /**
556    * GstRtpJitterBuffer:rtx-next-seqnum
557    *
558    * Estimate when the next packet should arrive and schedule a retransmission
559    * request for it.
560    * This is, when packet N arrives, a GstRTPRetransmission event is schedule
561    * for packet N+1. So it will be requested if it does not arrive at the expected time.
562    * The expected time is calculated using the dts of N and the packet spacing.
563    *
564    * Since: 1.6
565    */
566   g_object_class_install_property (gobject_class, PROP_RTX_NEXT_SEQNUM,
567       g_param_spec_boolean ("rtx-next-seqnum", "RTX next seqnum",
568           "Estimate when the next packet should arrive and schedule a "
569           "retransmission request for it.",
570           DEFAULT_RTX_NEXT_SEQNUM, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
571
572   /**
573    * GstRtpJitterBuffer:rtx-delay:
574    *
575    * When a packet did not arrive at the expected time, wait this extra amount
576    * of time before sending a retransmission event.
577    *
578    * When -1 is used, the max jitter will be used as extra delay.
579    *
580    * Since: 1.2
581    */
582   g_object_class_install_property (gobject_class, PROP_RTX_DELAY,
583       g_param_spec_int ("rtx-delay", "RTX Delay",
584           "Extra time in ms to wait before sending retransmission "
585           "event (-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_DELAY,
586           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
587
588   /**
589    * GstRtpJitterBuffer:rtx-min-delay:
590    *
591    * When a packet did not arrive at the expected time, wait at least this extra amount
592    * of time before sending a retransmission event.
593    *
594    * Since: 1.6
595    */
596   g_object_class_install_property (gobject_class, PROP_RTX_MIN_DELAY,
597       g_param_spec_uint ("rtx-min-delay", "Minimum RTX Delay",
598           "Minimum time in ms to wait before sending retransmission "
599           "event", 0, G_MAXUINT, DEFAULT_RTX_MIN_DELAY,
600           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
601   /**
602    * GstRtpJitterBuffer:rtx-delay-reorder:
603    *
604    * Assume that a retransmission event should be sent when we see
605    * this much packet reordering.
606    *
607    * When -1 is used, the value will be estimated based on observed packet
608    * reordering.
609    *
610    * Since: 1.2
611    */
612   g_object_class_install_property (gobject_class, PROP_RTX_DELAY_REORDER,
613       g_param_spec_int ("rtx-delay-reorder", "RTX Delay Reorder",
614           "Sending retransmission event when this much reordering (-1 automatic)",
615           -1, G_MAXINT, DEFAULT_RTX_DELAY_REORDER,
616           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
617   /**
618    * GstRtpJitterBuffer::rtx-retry-timeout:
619    *
620    * When no packet has been received after sending a retransmission event
621    * for this time, retry sending a retransmission event.
622    *
623    * When -1 is used, the value will be estimated based on observed round
624    * trip time.
625    *
626    * Since: 1.2
627    */
628   g_object_class_install_property (gobject_class, PROP_RTX_RETRY_TIMEOUT,
629       g_param_spec_int ("rtx-retry-timeout", "RTX Retry Timeout",
630           "Retry sending a transmission event after this timeout in "
631           "ms (-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_RETRY_TIMEOUT,
632           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
633   /**
634    * GstRtpJitterBuffer::rtx-min-retry-timeout:
635    *
636    * The minimum amount of time between retry timeouts. When
637    * GstRtpJitterBuffer::rtx-retry-timeout is -1, this value ensures a
638    * minimum interval between retry timeouts.
639    *
640    * When -1 is used, the value will be estimated based on the
641    * packet spacing.
642    *
643    * Since: 1.6
644    */
645   g_object_class_install_property (gobject_class, PROP_RTX_MIN_RETRY_TIMEOUT,
646       g_param_spec_int ("rtx-min-retry-timeout", "RTX Min Retry Timeout",
647           "Minimum timeout between sending a transmission event in "
648           "ms (-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_MIN_RETRY_TIMEOUT,
649           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
650   /**
651    * GstRtpJitterBuffer:rtx-retry-period:
652    *
653    * The amount of time to try to get a retransmission.
654    *
655    * When -1 is used, the value will be estimated based on the jitterbuffer
656    * latency and the observed round trip time.
657    *
658    * Since: 1.2
659    */
660   g_object_class_install_property (gobject_class, PROP_RTX_RETRY_PERIOD,
661       g_param_spec_int ("rtx-retry-period", "RTX Retry Period",
662           "Try to get a retransmission for this many ms "
663           "(-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_RETRY_PERIOD,
664           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
665   /**
666    * GstRtpJitterBuffer:rtx-max-retries:
667    *
668    * The maximum number of retries to request a retransmission.
669    *
670    * This implies that as maximum (rtx-max-retries + 1) retransmissions will be requested.
671    * When -1 is used, the number of retransmission request will not be limited.
672    *
673    * Since: 1.6
674    */
675   g_object_class_install_property (gobject_class, PROP_RTX_MAX_RETRIES,
676       g_param_spec_int ("rtx-max-retries", "RTX Max Retries",
677           "The maximum number of retries to request a retransmission. "
678           "(-1 not limited)", -1, G_MAXINT, DEFAULT_RTX_MAX_RETRIES,
679           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
680
681   g_object_class_install_property (gobject_class, PROP_MAX_DROPOUT_TIME,
682       g_param_spec_uint ("max-dropout-time", "Max dropout time",
683           "The maximum time (milliseconds) of missing packets tolerated.",
684           0, G_MAXUINT, DEFAULT_MAX_DROPOUT_TIME,
685           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
686
687   g_object_class_install_property (gobject_class, PROP_MAX_MISORDER_TIME,
688       g_param_spec_uint ("max-misorder-time", "Max misorder time",
689           "The maximum time (milliseconds) of misordered packets tolerated.",
690           0, G_MAXUINT, DEFAULT_MAX_MISORDER_TIME,
691           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
692   /**
693    * GstRtpJitterBuffer:stats:
694    *
695    * Various jitterbuffer statistics. This property returns a GstStructure
696    * with name application/x-rtp-jitterbuffer-stats with the following fields:
697    *
698    * <itemizedlist>
699    * <listitem>
700    *   <para>
701    *   #guint64
702    *   <classname>&quot;rtx-count&quot;</classname>:
703    *   the number of retransmissions requested.
704    *   </para>
705    * </listitem>
706    * <listitem>
707    *   <para>
708    *   #guint64
709    *   <classname>&quot;rtx-success-count&quot;</classname>:
710    *   the number of successful retransmissions.
711    *   </para>
712    * </listitem>
713    * <listitem>
714    *   <para>
715    *   #gdouble
716    *   <classname>&quot;rtx-per-packet&quot;</classname>:
717    *   average number of RTX per packet.
718    *   </para>
719    * </listitem>
720    * <listitem>
721    *   <para>
722    *   #guint64
723    *   <classname>&quot;rtx-rtt&quot;</classname>:
724    *   average round trip time per RTX.
725    *   </para>
726    * </listitem>
727    * </itemizedlist>
728    *
729    * Since: 1.4
730    */
731   g_object_class_install_property (gobject_class, PROP_STATS,
732       g_param_spec_boxed ("stats", "Statistics",
733           "Various statistics", GST_TYPE_STRUCTURE,
734           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
735
736   /**
737    * GstRtpJitterBuffer:max-rtcp-rtp-time-diff
738    *
739    * The maximum amount of time in ms that the RTP time in the RTCP SRs
740    * is allowed to be ahead of the last RTP packet we received. Use
741    * -1 to disable ignoring of RTCP packets.
742    *
743    * Since: 1.8
744    */
745   g_object_class_install_property (gobject_class, PROP_MAX_RTCP_RTP_TIME_DIFF,
746       g_param_spec_int ("max-rtcp-rtp-time-diff", "Max RTCP RTP Time Diff",
747           "Maximum amount of time in ms that the RTP time in RTCP SRs "
748           "is allowed to be ahead (-1 disabled)", -1, G_MAXINT,
749           DEFAULT_MAX_RTCP_RTP_TIME_DIFF,
750           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
751
752   /**
753    * GstRtpJitterBuffer::request-pt-map:
754    * @buffer: the object which received the signal
755    * @pt: the pt
756    *
757    * Request the payload type as #GstCaps for @pt.
758    */
759   gst_rtp_jitter_buffer_signals[SIGNAL_REQUEST_PT_MAP] =
760       g_signal_new ("request-pt-map", G_TYPE_FROM_CLASS (klass),
761       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
762           request_pt_map), NULL, NULL, g_cclosure_marshal_generic,
763       GST_TYPE_CAPS, 1, G_TYPE_UINT);
764   /**
765    * GstRtpJitterBuffer::handle-sync:
766    * @buffer: the object which received the signal
767    * @struct: a GstStructure containing sync values.
768    *
769    * Be notified of new sync values.
770    */
771   gst_rtp_jitter_buffer_signals[SIGNAL_HANDLE_SYNC] =
772       g_signal_new ("handle-sync", G_TYPE_FROM_CLASS (klass),
773       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
774           handle_sync), NULL, NULL, g_cclosure_marshal_VOID__BOXED,
775       G_TYPE_NONE, 1, GST_TYPE_STRUCTURE | G_SIGNAL_TYPE_STATIC_SCOPE);
776
777   /**
778    * GstRtpJitterBuffer::on-npt-stop:
779    * @buffer: the object which received the signal
780    *
781    * Signal that the jitterbufer has pushed the RTP packet that corresponds to
782    * the npt-stop position.
783    */
784   gst_rtp_jitter_buffer_signals[SIGNAL_ON_NPT_STOP] =
785       g_signal_new ("on-npt-stop", G_TYPE_FROM_CLASS (klass),
786       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
787           on_npt_stop), NULL, NULL, g_cclosure_marshal_VOID__VOID,
788       G_TYPE_NONE, 0, G_TYPE_NONE);
789
790   /**
791    * GstRtpJitterBuffer::clear-pt-map:
792    * @buffer: the object which received the signal
793    *
794    * Invalidate the clock-rate as obtained with the
795    * #GstRtpJitterBuffer::request-pt-map signal.
796    */
797   gst_rtp_jitter_buffer_signals[SIGNAL_CLEAR_PT_MAP] =
798       g_signal_new ("clear-pt-map", G_TYPE_FROM_CLASS (klass),
799       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
800       G_STRUCT_OFFSET (GstRtpJitterBufferClass, clear_pt_map), NULL, NULL,
801       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
802
803   /**
804    * GstRtpJitterBuffer::set-active:
805    * @buffer: the object which received the signal
806    *
807    * Start pushing out packets with the given base time. This signal is only
808    * useful in buffering mode.
809    *
810    * Returns: the time of the last pushed packet.
811    */
812   gst_rtp_jitter_buffer_signals[SIGNAL_SET_ACTIVE] =
813       g_signal_new ("set-active", G_TYPE_FROM_CLASS (klass),
814       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
815       G_STRUCT_OFFSET (GstRtpJitterBufferClass, set_active), NULL, NULL,
816       g_cclosure_marshal_generic, G_TYPE_UINT64, 2, G_TYPE_BOOLEAN,
817       G_TYPE_UINT64);
818
819   gstelement_class->change_state =
820       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_change_state);
821   gstelement_class->request_new_pad =
822       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_request_new_pad);
823   gstelement_class->release_pad =
824       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_release_pad);
825   gstelement_class->provide_clock =
826       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_provide_clock);
827   gstelement_class->set_clock =
828       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_set_clock);
829
830   gst_element_class_add_pad_template (gstelement_class,
831       gst_static_pad_template_get (&gst_rtp_jitter_buffer_src_template));
832   gst_element_class_add_pad_template (gstelement_class,
833       gst_static_pad_template_get (&gst_rtp_jitter_buffer_sink_template));
834   gst_element_class_add_pad_template (gstelement_class,
835       gst_static_pad_template_get (&gst_rtp_jitter_buffer_sink_rtcp_template));
836
837   gst_element_class_set_static_metadata (gstelement_class,
838       "RTP packet jitter-buffer", "Filter/Network/RTP",
839       "A buffer that deals with network jitter and other transmission faults",
840       "Philippe Kalaf <philippe.kalaf@collabora.co.uk>, "
841       "Wim Taymans <wim.taymans@gmail.com>");
842
843   klass->clear_pt_map = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_clear_pt_map);
844   klass->set_active = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_set_active);
845
846   GST_DEBUG_CATEGORY_INIT
847       (rtpjitterbuffer_debug, "rtpjitterbuffer", 0, "RTP Jitter Buffer");
848 }
849
850 static void
851 gst_rtp_jitter_buffer_init (GstRtpJitterBuffer * jitterbuffer)
852 {
853   GstRtpJitterBufferPrivate *priv;
854
855   priv = GST_RTP_JITTER_BUFFER_GET_PRIVATE (jitterbuffer);
856   jitterbuffer->priv = priv;
857
858   priv->latency_ms = DEFAULT_LATENCY_MS;
859   priv->latency_ns = priv->latency_ms * GST_MSECOND;
860   priv->drop_on_latency = DEFAULT_DROP_ON_LATENCY;
861   priv->do_lost = DEFAULT_DO_LOST;
862   priv->do_retransmission = DEFAULT_DO_RETRANSMISSION;
863   priv->rtx_next_seqnum = DEFAULT_RTX_NEXT_SEQNUM;
864   priv->rtx_delay = DEFAULT_RTX_DELAY;
865   priv->rtx_min_delay = DEFAULT_RTX_MIN_DELAY;
866   priv->rtx_delay_reorder = DEFAULT_RTX_DELAY_REORDER;
867   priv->rtx_retry_timeout = DEFAULT_RTX_RETRY_TIMEOUT;
868   priv->rtx_min_retry_timeout = DEFAULT_RTX_MIN_RETRY_TIMEOUT;
869   priv->rtx_retry_period = DEFAULT_RTX_RETRY_PERIOD;
870   priv->rtx_max_retries = DEFAULT_RTX_MAX_RETRIES;
871   priv->max_rtcp_rtp_time_diff = DEFAULT_MAX_RTCP_RTP_TIME_DIFF;
872   priv->max_dropout_time = DEFAULT_MAX_DROPOUT_TIME;
873   priv->max_misorder_time = DEFAULT_MAX_MISORDER_TIME;
874
875   priv->last_dts = -1;
876   priv->last_rtptime = -1;
877   priv->avg_jitter = 0;
878   priv->timers = g_array_new (FALSE, TRUE, sizeof (TimerData));
879   priv->jbuf = rtp_jitter_buffer_new ();
880   g_mutex_init (&priv->jbuf_lock);
881   g_cond_init (&priv->jbuf_timer);
882   g_cond_init (&priv->jbuf_event);
883   g_cond_init (&priv->jbuf_query);
884   g_queue_init (&priv->gap_packets);
885
886   /* reset skew detection initialy */
887   rtp_jitter_buffer_reset_skew (priv->jbuf);
888   rtp_jitter_buffer_set_delay (priv->jbuf, priv->latency_ns);
889   rtp_jitter_buffer_set_buffering (priv->jbuf, FALSE);
890   priv->active = TRUE;
891
892   priv->srcpad =
893       gst_pad_new_from_static_template (&gst_rtp_jitter_buffer_src_template,
894       "src");
895
896   gst_pad_set_activatemode_function (priv->srcpad,
897       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_activate_mode));
898   gst_pad_set_query_function (priv->srcpad,
899       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_query));
900   gst_pad_set_event_function (priv->srcpad,
901       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_event));
902
903   priv->sinkpad =
904       gst_pad_new_from_static_template (&gst_rtp_jitter_buffer_sink_template,
905       "sink");
906
907   gst_pad_set_chain_function (priv->sinkpad,
908       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_chain));
909   gst_pad_set_event_function (priv->sinkpad,
910       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_sink_event));
911   gst_pad_set_query_function (priv->sinkpad,
912       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_sink_query));
913
914   gst_element_add_pad (GST_ELEMENT (jitterbuffer), priv->srcpad);
915   gst_element_add_pad (GST_ELEMENT (jitterbuffer), priv->sinkpad);
916
917   GST_OBJECT_FLAG_SET (jitterbuffer, GST_ELEMENT_FLAG_PROVIDE_CLOCK);
918 }
919
920 #define IS_DROPABLE(it) (((it)->type == ITEM_TYPE_BUFFER) || ((it)->type == ITEM_TYPE_LOST))
921
922 #define ITEM_TYPE_BUFFER        0
923 #define ITEM_TYPE_LOST          1
924 #define ITEM_TYPE_EVENT         2
925 #define ITEM_TYPE_QUERY         3
926
927 static RTPJitterBufferItem *
928 alloc_item (gpointer data, guint type, GstClockTime dts, GstClockTime pts,
929     guint seqnum, guint count, guint rtptime)
930 {
931   RTPJitterBufferItem *item;
932
933   item = g_slice_new (RTPJitterBufferItem);
934   item->data = data;
935   item->next = NULL;
936   item->prev = NULL;
937   item->type = type;
938   item->dts = dts;
939   item->pts = pts;
940   item->seqnum = seqnum;
941   item->count = count;
942   item->rtptime = rtptime;
943
944   return item;
945 }
946
947 static void
948 free_item (RTPJitterBufferItem * item)
949 {
950   g_return_if_fail (item != NULL);
951
952   if (item->data && item->type != ITEM_TYPE_QUERY)
953     gst_mini_object_unref (item->data);
954   g_slice_free (RTPJitterBufferItem, item);
955 }
956
957 static void
958 free_item_and_retain_events (RTPJitterBufferItem * item, gpointer user_data)
959 {
960   GList **l = user_data;
961
962   if (item->data && item->type == ITEM_TYPE_EVENT
963       && GST_EVENT_IS_STICKY (item->data)) {
964     *l = g_list_prepend (*l, item->data);
965   } else if (item->data && item->type != ITEM_TYPE_QUERY) {
966     gst_mini_object_unref (item->data);
967   }
968   g_slice_free (RTPJitterBufferItem, item);
969 }
970
971 static void
972 gst_rtp_jitter_buffer_finalize (GObject * object)
973 {
974   GstRtpJitterBuffer *jitterbuffer;
975   GstRtpJitterBufferPrivate *priv;
976
977   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
978   priv = jitterbuffer->priv;
979
980   g_array_free (priv->timers, TRUE);
981   g_mutex_clear (&priv->jbuf_lock);
982   g_cond_clear (&priv->jbuf_timer);
983   g_cond_clear (&priv->jbuf_event);
984   g_cond_clear (&priv->jbuf_query);
985
986   rtp_jitter_buffer_flush (priv->jbuf, (GFunc) free_item, NULL);
987   g_queue_foreach (&priv->gap_packets, (GFunc) gst_buffer_unref, NULL);
988   g_queue_clear (&priv->gap_packets);
989   g_object_unref (priv->jbuf);
990
991   G_OBJECT_CLASS (parent_class)->finalize (object);
992 }
993
994 static GstIterator *
995 gst_rtp_jitter_buffer_iterate_internal_links (GstPad * pad, GstObject * parent)
996 {
997   GstRtpJitterBuffer *jitterbuffer;
998   GstPad *otherpad = NULL;
999   GstIterator *it = NULL;
1000   GValue val = { 0, };
1001
1002   jitterbuffer = GST_RTP_JITTER_BUFFER_CAST (parent);
1003
1004   if (pad == jitterbuffer->priv->sinkpad) {
1005     otherpad = jitterbuffer->priv->srcpad;
1006   } else if (pad == jitterbuffer->priv->srcpad) {
1007     otherpad = jitterbuffer->priv->sinkpad;
1008   } else if (pad == jitterbuffer->priv->rtcpsinkpad) {
1009     it = gst_iterator_new_single (GST_TYPE_PAD, NULL);
1010   }
1011
1012   if (it == NULL) {
1013     g_value_init (&val, GST_TYPE_PAD);
1014     g_value_set_object (&val, otherpad);
1015     it = gst_iterator_new_single (GST_TYPE_PAD, &val);
1016     g_value_unset (&val);
1017   }
1018
1019   return it;
1020 }
1021
1022 static GstPad *
1023 create_rtcp_sink (GstRtpJitterBuffer * jitterbuffer)
1024 {
1025   GstRtpJitterBufferPrivate *priv;
1026
1027   priv = jitterbuffer->priv;
1028
1029   GST_DEBUG_OBJECT (jitterbuffer, "creating RTCP sink pad");
1030
1031   priv->rtcpsinkpad =
1032       gst_pad_new_from_static_template
1033       (&gst_rtp_jitter_buffer_sink_rtcp_template, "sink_rtcp");
1034   gst_pad_set_chain_function (priv->rtcpsinkpad,
1035       gst_rtp_jitter_buffer_chain_rtcp);
1036   gst_pad_set_event_function (priv->rtcpsinkpad,
1037       (GstPadEventFunction) gst_rtp_jitter_buffer_sink_rtcp_event);
1038   gst_pad_set_iterate_internal_links_function (priv->rtcpsinkpad,
1039       gst_rtp_jitter_buffer_iterate_internal_links);
1040   gst_pad_set_active (priv->rtcpsinkpad, TRUE);
1041   gst_element_add_pad (GST_ELEMENT_CAST (jitterbuffer), priv->rtcpsinkpad);
1042
1043   return priv->rtcpsinkpad;
1044 }
1045
1046 static void
1047 remove_rtcp_sink (GstRtpJitterBuffer * jitterbuffer)
1048 {
1049   GstRtpJitterBufferPrivate *priv;
1050
1051   priv = jitterbuffer->priv;
1052
1053   GST_DEBUG_OBJECT (jitterbuffer, "removing RTCP sink pad");
1054
1055   gst_pad_set_active (priv->rtcpsinkpad, FALSE);
1056
1057   gst_element_remove_pad (GST_ELEMENT_CAST (jitterbuffer), priv->rtcpsinkpad);
1058   priv->rtcpsinkpad = NULL;
1059 }
1060
1061 static GstPad *
1062 gst_rtp_jitter_buffer_request_new_pad (GstElement * element,
1063     GstPadTemplate * templ, const gchar * name, const GstCaps * filter)
1064 {
1065   GstRtpJitterBuffer *jitterbuffer;
1066   GstElementClass *klass;
1067   GstPad *result;
1068   GstRtpJitterBufferPrivate *priv;
1069
1070   g_return_val_if_fail (templ != NULL, NULL);
1071   g_return_val_if_fail (GST_IS_RTP_JITTER_BUFFER (element), NULL);
1072
1073   jitterbuffer = GST_RTP_JITTER_BUFFER_CAST (element);
1074   priv = jitterbuffer->priv;
1075   klass = GST_ELEMENT_GET_CLASS (element);
1076
1077   GST_DEBUG_OBJECT (element, "requesting pad %s", GST_STR_NULL (name));
1078
1079   /* figure out the template */
1080   if (templ == gst_element_class_get_pad_template (klass, "sink_rtcp")) {
1081     if (priv->rtcpsinkpad != NULL)
1082       goto exists;
1083
1084     result = create_rtcp_sink (jitterbuffer);
1085   } else
1086     goto wrong_template;
1087
1088   return result;
1089
1090   /* ERRORS */
1091 wrong_template:
1092   {
1093     g_warning ("rtpjitterbuffer: this is not our template");
1094     return NULL;
1095   }
1096 exists:
1097   {
1098     g_warning ("rtpjitterbuffer: pad already requested");
1099     return NULL;
1100   }
1101 }
1102
1103 static void
1104 gst_rtp_jitter_buffer_release_pad (GstElement * element, GstPad * pad)
1105 {
1106   GstRtpJitterBuffer *jitterbuffer;
1107   GstRtpJitterBufferPrivate *priv;
1108
1109   g_return_if_fail (GST_IS_RTP_JITTER_BUFFER (element));
1110   g_return_if_fail (GST_IS_PAD (pad));
1111
1112   jitterbuffer = GST_RTP_JITTER_BUFFER_CAST (element);
1113   priv = jitterbuffer->priv;
1114
1115   GST_DEBUG_OBJECT (element, "releasing pad %s:%s", GST_DEBUG_PAD_NAME (pad));
1116
1117   if (priv->rtcpsinkpad == pad) {
1118     remove_rtcp_sink (jitterbuffer);
1119   } else
1120     goto wrong_pad;
1121
1122   return;
1123
1124   /* ERRORS */
1125 wrong_pad:
1126   {
1127     g_warning ("gstjitterbuffer: asked to release an unknown pad");
1128     return;
1129   }
1130 }
1131
1132 static GstClock *
1133 gst_rtp_jitter_buffer_provide_clock (GstElement * element)
1134 {
1135   return gst_system_clock_obtain ();
1136 }
1137
1138 static gboolean
1139 gst_rtp_jitter_buffer_set_clock (GstElement * element, GstClock * clock)
1140 {
1141   GstRtpJitterBuffer *jitterbuffer = GST_RTP_JITTER_BUFFER (element);
1142
1143   rtp_jitter_buffer_set_pipeline_clock (jitterbuffer->priv->jbuf, clock);
1144
1145   return TRUE;
1146 }
1147
1148 static void
1149 gst_rtp_jitter_buffer_clear_pt_map (GstRtpJitterBuffer * jitterbuffer)
1150 {
1151   GstRtpJitterBufferPrivate *priv;
1152
1153   priv = jitterbuffer->priv;
1154
1155   /* this will trigger a new pt-map request signal, FIXME, do something better. */
1156
1157   JBUF_LOCK (priv);
1158   priv->clock_rate = -1;
1159   /* do not clear current content, but refresh state for new arrival */
1160   GST_DEBUG_OBJECT (jitterbuffer, "reset jitterbuffer");
1161   rtp_jitter_buffer_reset_skew (priv->jbuf);
1162   JBUF_UNLOCK (priv);
1163 }
1164
1165 static GstClockTime
1166 gst_rtp_jitter_buffer_set_active (GstRtpJitterBuffer * jbuf, gboolean active,
1167     guint64 offset)
1168 {
1169   GstRtpJitterBufferPrivate *priv;
1170   GstClockTime last_out;
1171   RTPJitterBufferItem *item;
1172
1173   priv = jbuf->priv;
1174
1175   JBUF_LOCK (priv);
1176   GST_DEBUG_OBJECT (jbuf, "setting active %d with offset %" GST_TIME_FORMAT,
1177       active, GST_TIME_ARGS (offset));
1178
1179   if (active != priv->active) {
1180     /* add the amount of time spent in paused to the output offset. All
1181      * outgoing buffers will have this offset applied to their timestamps in
1182      * order to make them arrive in time in the sink. */
1183     priv->out_offset = offset;
1184     GST_DEBUG_OBJECT (jbuf, "out offset %" GST_TIME_FORMAT,
1185         GST_TIME_ARGS (priv->out_offset));
1186     priv->active = active;
1187     JBUF_SIGNAL_EVENT (priv);
1188   }
1189   if (!active) {
1190     rtp_jitter_buffer_set_buffering (priv->jbuf, TRUE);
1191   }
1192   if ((item = rtp_jitter_buffer_peek (priv->jbuf))) {
1193     /* head buffer timestamp and offset gives our output time */
1194     last_out = item->dts + priv->ts_offset;
1195   } else {
1196     /* use last known time when the buffer is empty */
1197     last_out = priv->last_out_time;
1198   }
1199   JBUF_UNLOCK (priv);
1200
1201   return last_out;
1202 }
1203
1204 static GstCaps *
1205 gst_rtp_jitter_buffer_getcaps (GstPad * pad, GstCaps * filter)
1206 {
1207   GstRtpJitterBuffer *jitterbuffer;
1208   GstRtpJitterBufferPrivate *priv;
1209   GstPad *other;
1210   GstCaps *caps;
1211   GstCaps *templ;
1212
1213   jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
1214   priv = jitterbuffer->priv;
1215
1216   other = (pad == priv->srcpad ? priv->sinkpad : priv->srcpad);
1217
1218   caps = gst_pad_peer_query_caps (other, filter);
1219
1220   templ = gst_pad_get_pad_template_caps (pad);
1221   if (caps == NULL) {
1222     GST_DEBUG_OBJECT (jitterbuffer, "use template");
1223     caps = templ;
1224   } else {
1225     GstCaps *intersect;
1226
1227     GST_DEBUG_OBJECT (jitterbuffer, "intersect with template");
1228
1229     intersect = gst_caps_intersect (caps, templ);
1230     gst_caps_unref (caps);
1231     gst_caps_unref (templ);
1232
1233     caps = intersect;
1234   }
1235   gst_object_unref (jitterbuffer);
1236
1237   return caps;
1238 }
1239
1240 /*
1241  * Must be called with JBUF_LOCK held
1242  */
1243
1244 static gboolean
1245 gst_jitter_buffer_sink_parse_caps (GstRtpJitterBuffer * jitterbuffer,
1246     GstCaps * caps)
1247 {
1248   GstRtpJitterBufferPrivate *priv;
1249   GstStructure *caps_struct;
1250   guint val;
1251   GstClockTime tval;
1252   const gchar *ts_refclk, *mediaclk;
1253
1254   priv = jitterbuffer->priv;
1255
1256   /* first parse the caps */
1257   caps_struct = gst_caps_get_structure (caps, 0);
1258
1259   GST_DEBUG_OBJECT (jitterbuffer, "got caps");
1260
1261   /* we need a clock-rate to convert the rtp timestamps to GStreamer time and to
1262    * measure the amount of data in the buffer */
1263   if (!gst_structure_get_int (caps_struct, "clock-rate", &priv->clock_rate))
1264     goto error;
1265
1266   if (priv->clock_rate <= 0)
1267     goto wrong_rate;
1268
1269   GST_DEBUG_OBJECT (jitterbuffer, "got clock-rate %d", priv->clock_rate);
1270
1271   rtp_jitter_buffer_set_clock_rate (priv->jbuf, priv->clock_rate);
1272
1273   /* The clock base is the RTP timestamp corrsponding to the npt-start value. We
1274    * can use this to track the amount of time elapsed on the sender. */
1275   if (gst_structure_get_uint (caps_struct, "clock-base", &val))
1276     priv->clock_base = val;
1277   else
1278     priv->clock_base = -1;
1279
1280   priv->ext_timestamp = priv->clock_base;
1281
1282   GST_DEBUG_OBJECT (jitterbuffer, "got clock-base %" G_GINT64_FORMAT,
1283       priv->clock_base);
1284
1285   if (gst_structure_get_uint (caps_struct, "seqnum-base", &val)) {
1286     /* first expected seqnum, only update when we didn't have a previous base. */
1287     if (priv->next_in_seqnum == -1)
1288       priv->next_in_seqnum = val;
1289     if (priv->next_seqnum == -1) {
1290       priv->next_seqnum = val;
1291       JBUF_SIGNAL_EVENT (priv);
1292     }
1293     priv->seqnum_base = val;
1294   } else {
1295     priv->seqnum_base = -1;
1296   }
1297
1298   GST_DEBUG_OBJECT (jitterbuffer, "got seqnum-base %d", priv->next_in_seqnum);
1299
1300   /* the start and stop times. The seqnum-base corresponds to the start time. We
1301    * will keep track of the seqnums on the output and when we reach the one
1302    * corresponding to npt-stop, we emit the npt-stop-reached signal */
1303   if (gst_structure_get_clock_time (caps_struct, "npt-start", &tval))
1304     priv->npt_start = tval;
1305   else
1306     priv->npt_start = 0;
1307
1308   if (gst_structure_get_clock_time (caps_struct, "npt-stop", &tval))
1309     priv->npt_stop = tval;
1310   else
1311     priv->npt_stop = -1;
1312
1313   GST_DEBUG_OBJECT (jitterbuffer,
1314       "npt start/stop: %" GST_TIME_FORMAT "-%" GST_TIME_FORMAT,
1315       GST_TIME_ARGS (priv->npt_start), GST_TIME_ARGS (priv->npt_stop));
1316
1317   if ((ts_refclk = gst_structure_get_string (caps_struct, "a-ts-refclk"))) {
1318     GstClock *clock = NULL;
1319     guint64 clock_offset = -1;
1320
1321     GST_DEBUG_OBJECT (jitterbuffer, "Have timestamp reference clock %s",
1322         ts_refclk);
1323
1324     if (g_str_has_prefix (ts_refclk, "ntp=")) {
1325       if (g_str_has_prefix (ts_refclk, "ntp=/traceable/")) {
1326         GST_FIXME_OBJECT (jitterbuffer, "Can't handle traceable NTP clocks");
1327       } else {
1328         const gchar *host, *portstr;
1329         gchar *hostname;
1330         guint port;
1331
1332         host = ts_refclk + sizeof ("ntp=") - 1;
1333         if (host[0] == '[') {
1334           /* IPv6 */
1335           portstr = strchr (host, ']');
1336           if (portstr && portstr[1] == ':')
1337             portstr = portstr + 1;
1338           else
1339             portstr = NULL;
1340         } else {
1341           portstr = strrchr (host, ':');
1342         }
1343
1344
1345         if (!portstr || sscanf (portstr, ":%u", &port) != 1)
1346           port = 123;
1347
1348         if (portstr)
1349           hostname = g_strndup (host, (portstr - host));
1350         else
1351           hostname = g_strdup (host);
1352
1353         clock = gst_ntp_clock_new (NULL, hostname, port, 0);
1354         g_free (hostname);
1355       }
1356     } else if (g_str_has_prefix (ts_refclk, "ptp=IEEE1588-2008:")) {
1357       const gchar *domainstr =
1358           ts_refclk + sizeof ("ptp=IEEE1588-2008:XX-XX-XX-XX-XX-XX-XX-XX") - 1;
1359       guint domain;
1360
1361       if (domainstr[0] != ':' || sscanf (domainstr, ":%u", &domain) != 1)
1362         domain = 0;
1363
1364       clock = gst_ptp_clock_new (NULL, domain);
1365     } else {
1366       GST_FIXME_OBJECT (jitterbuffer, "Unsupported timestamp reference clock");
1367     }
1368
1369     if ((mediaclk = gst_structure_get_string (caps_struct, "a-mediaclk"))) {
1370       GST_DEBUG_OBJECT (jitterbuffer, "Got media clock %s", mediaclk);
1371
1372       if (!g_str_has_prefix (mediaclk, "direct=")
1373           || sscanf (mediaclk, "direct=%" G_GUINT64_FORMAT, &clock_offset) != 1)
1374         GST_FIXME_OBJECT (jitterbuffer, "Unsupported media clock");
1375       if (strstr (mediaclk, "rate=") != NULL) {
1376         GST_FIXME_OBJECT (jitterbuffer, "Rate property not supported");
1377         clock_offset = -1;
1378       }
1379     }
1380
1381     rtp_jitter_buffer_set_media_clock (priv->jbuf, clock, clock_offset);
1382   } else {
1383     rtp_jitter_buffer_set_media_clock (priv->jbuf, NULL, -1);
1384   }
1385
1386   return TRUE;
1387
1388   /* ERRORS */
1389 error:
1390   {
1391     GST_DEBUG_OBJECT (jitterbuffer, "No clock-rate in caps!");
1392     return FALSE;
1393   }
1394 wrong_rate:
1395   {
1396     GST_DEBUG_OBJECT (jitterbuffer, "Invalid clock-rate %d", priv->clock_rate);
1397     return FALSE;
1398   }
1399 }
1400
1401 static void
1402 gst_rtp_jitter_buffer_flush_start (GstRtpJitterBuffer * jitterbuffer)
1403 {
1404   GstRtpJitterBufferPrivate *priv;
1405
1406   priv = jitterbuffer->priv;
1407
1408   JBUF_LOCK (priv);
1409   /* mark ourselves as flushing */
1410   priv->srcresult = GST_FLOW_FLUSHING;
1411   GST_DEBUG_OBJECT (jitterbuffer, "Disabling pop on queue");
1412   /* this unblocks any waiting pops on the src pad task */
1413   JBUF_SIGNAL_EVENT (priv);
1414   JBUF_SIGNAL_QUERY (priv, FALSE);
1415   JBUF_UNLOCK (priv);
1416 }
1417
1418 static void
1419 gst_rtp_jitter_buffer_flush_stop (GstRtpJitterBuffer * jitterbuffer)
1420 {
1421   GstRtpJitterBufferPrivate *priv;
1422
1423   priv = jitterbuffer->priv;
1424
1425   JBUF_LOCK (priv);
1426   GST_DEBUG_OBJECT (jitterbuffer, "Enabling pop on queue");
1427   /* Mark as non flushing */
1428   priv->srcresult = GST_FLOW_OK;
1429   gst_segment_init (&priv->segment, GST_FORMAT_TIME);
1430   priv->last_popped_seqnum = -1;
1431   priv->last_out_time = -1;
1432   priv->next_seqnum = -1;
1433   priv->seqnum_base = -1;
1434   priv->ips_rtptime = -1;
1435   priv->ips_dts = GST_CLOCK_TIME_NONE;
1436   priv->packet_spacing = 0;
1437   priv->next_in_seqnum = -1;
1438   priv->clock_rate = -1;
1439   priv->last_pt = -1;
1440   priv->eos = FALSE;
1441   priv->estimated_eos = -1;
1442   priv->last_elapsed = 0;
1443   priv->ext_timestamp = -1;
1444   priv->avg_jitter = 0;
1445   priv->last_dts = -1;
1446   priv->last_rtptime = -1;
1447   priv->last_in_dts = 0;
1448   GST_DEBUG_OBJECT (jitterbuffer, "flush and reset jitterbuffer");
1449   rtp_jitter_buffer_flush (priv->jbuf, (GFunc) free_item, NULL);
1450   rtp_jitter_buffer_disable_buffering (priv->jbuf, FALSE);
1451   rtp_jitter_buffer_reset_skew (priv->jbuf);
1452   remove_all_timers (jitterbuffer);
1453   g_queue_foreach (&priv->gap_packets, (GFunc) gst_buffer_unref, NULL);
1454   g_queue_clear (&priv->gap_packets);
1455   JBUF_UNLOCK (priv);
1456 }
1457
1458 static gboolean
1459 gst_rtp_jitter_buffer_src_activate_mode (GstPad * pad, GstObject * parent,
1460     GstPadMode mode, gboolean active)
1461 {
1462   gboolean result;
1463   GstRtpJitterBuffer *jitterbuffer = NULL;
1464
1465   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1466
1467   switch (mode) {
1468     case GST_PAD_MODE_PUSH:
1469       if (active) {
1470         /* allow data processing */
1471         gst_rtp_jitter_buffer_flush_stop (jitterbuffer);
1472
1473         /* start pushing out buffers */
1474         GST_DEBUG_OBJECT (jitterbuffer, "Starting task on srcpad");
1475         result = gst_pad_start_task (jitterbuffer->priv->srcpad,
1476             (GstTaskFunction) gst_rtp_jitter_buffer_loop, jitterbuffer, NULL);
1477       } else {
1478         /* make sure all data processing stops ASAP */
1479         gst_rtp_jitter_buffer_flush_start (jitterbuffer);
1480
1481         /* NOTE this will hardlock if the state change is called from the src pad
1482          * task thread because we will _join() the thread. */
1483         GST_DEBUG_OBJECT (jitterbuffer, "Stopping task on srcpad");
1484         result = gst_pad_stop_task (pad);
1485       }
1486       break;
1487     default:
1488       result = FALSE;
1489       break;
1490   }
1491   return result;
1492 }
1493
1494 static GstStateChangeReturn
1495 gst_rtp_jitter_buffer_change_state (GstElement * element,
1496     GstStateChange transition)
1497 {
1498   GstRtpJitterBuffer *jitterbuffer;
1499   GstRtpJitterBufferPrivate *priv;
1500   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
1501
1502   jitterbuffer = GST_RTP_JITTER_BUFFER (element);
1503   priv = jitterbuffer->priv;
1504
1505   switch (transition) {
1506     case GST_STATE_CHANGE_NULL_TO_READY:
1507       break;
1508     case GST_STATE_CHANGE_READY_TO_PAUSED:
1509       JBUF_LOCK (priv);
1510       /* reset negotiated values */
1511       priv->clock_rate = -1;
1512       priv->clock_base = -1;
1513       priv->peer_latency = 0;
1514       priv->last_pt = -1;
1515       /* block until we go to PLAYING */
1516       priv->blocked = TRUE;
1517       priv->timer_running = TRUE;
1518       priv->timer_thread =
1519           g_thread_new ("timer", (GThreadFunc) wait_next_timeout, jitterbuffer);
1520       JBUF_UNLOCK (priv);
1521       break;
1522     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
1523       JBUF_LOCK (priv);
1524       /* unblock to allow streaming in PLAYING */
1525       priv->blocked = FALSE;
1526       JBUF_SIGNAL_EVENT (priv);
1527       JBUF_SIGNAL_TIMER (priv);
1528       JBUF_UNLOCK (priv);
1529       break;
1530     default:
1531       break;
1532   }
1533
1534   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1535
1536   switch (transition) {
1537     case GST_STATE_CHANGE_READY_TO_PAUSED:
1538       /* we are a live element because we sync to the clock, which we can only
1539        * do in the PLAYING state */
1540       if (ret != GST_STATE_CHANGE_FAILURE)
1541         ret = GST_STATE_CHANGE_NO_PREROLL;
1542       break;
1543     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
1544       JBUF_LOCK (priv);
1545       /* block to stop streaming when PAUSED */
1546       priv->blocked = TRUE;
1547       unschedule_current_timer (jitterbuffer);
1548       JBUF_UNLOCK (priv);
1549       if (ret != GST_STATE_CHANGE_FAILURE)
1550         ret = GST_STATE_CHANGE_NO_PREROLL;
1551       break;
1552     case GST_STATE_CHANGE_PAUSED_TO_READY:
1553       JBUF_LOCK (priv);
1554       gst_buffer_replace (&priv->last_sr, NULL);
1555       priv->timer_running = FALSE;
1556       unschedule_current_timer (jitterbuffer);
1557       JBUF_SIGNAL_TIMER (priv);
1558       JBUF_SIGNAL_QUERY (priv, FALSE);
1559       JBUF_UNLOCK (priv);
1560       g_thread_join (priv->timer_thread);
1561       priv->timer_thread = NULL;
1562       break;
1563     case GST_STATE_CHANGE_READY_TO_NULL:
1564       break;
1565     default:
1566       break;
1567   }
1568
1569   return ret;
1570 }
1571
1572 static gboolean
1573 gst_rtp_jitter_buffer_src_event (GstPad * pad, GstObject * parent,
1574     GstEvent * event)
1575 {
1576   gboolean ret = TRUE;
1577   GstRtpJitterBuffer *jitterbuffer;
1578   GstRtpJitterBufferPrivate *priv;
1579
1580   jitterbuffer = GST_RTP_JITTER_BUFFER_CAST (parent);
1581   priv = jitterbuffer->priv;
1582
1583   GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
1584
1585   switch (GST_EVENT_TYPE (event)) {
1586     case GST_EVENT_LATENCY:
1587     {
1588       GstClockTime latency;
1589
1590       gst_event_parse_latency (event, &latency);
1591
1592       GST_DEBUG_OBJECT (jitterbuffer,
1593           "configuring latency of %" GST_TIME_FORMAT, GST_TIME_ARGS (latency));
1594
1595       JBUF_LOCK (priv);
1596       /* adjust the overall buffer delay to the total pipeline latency in
1597        * buffering mode because if downstream consumes too fast (because of
1598        * large latency or queues, we would start rebuffering again. */
1599       if (rtp_jitter_buffer_get_mode (priv->jbuf) ==
1600           RTP_JITTER_BUFFER_MODE_BUFFER) {
1601         rtp_jitter_buffer_set_delay (priv->jbuf, latency);
1602       }
1603       JBUF_UNLOCK (priv);
1604
1605       ret = gst_pad_push_event (priv->sinkpad, event);
1606       break;
1607     }
1608     default:
1609       ret = gst_pad_push_event (priv->sinkpad, event);
1610       break;
1611   }
1612
1613   return ret;
1614 }
1615
1616 /* handles and stores the event in the jitterbuffer, must be called with
1617  * LOCK */
1618 static gboolean
1619 queue_event (GstRtpJitterBuffer * jitterbuffer, GstEvent * event)
1620 {
1621   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1622   RTPJitterBufferItem *item;
1623   gboolean head;
1624
1625   switch (GST_EVENT_TYPE (event)) {
1626     case GST_EVENT_CAPS:
1627     {
1628       GstCaps *caps;
1629
1630       gst_event_parse_caps (event, &caps);
1631       gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps);
1632       break;
1633     }
1634     case GST_EVENT_SEGMENT:
1635       gst_event_copy_segment (event, &priv->segment);
1636
1637       /* we need time for now */
1638       if (priv->segment.format != GST_FORMAT_TIME)
1639         goto newseg_wrong_format;
1640
1641       GST_DEBUG_OBJECT (jitterbuffer,
1642           "segment:  %" GST_SEGMENT_FORMAT, &priv->segment);
1643       break;
1644     case GST_EVENT_EOS:
1645       priv->eos = TRUE;
1646       rtp_jitter_buffer_disable_buffering (priv->jbuf, TRUE);
1647       break;
1648     default:
1649       break;
1650   }
1651
1652
1653   GST_DEBUG_OBJECT (jitterbuffer, "adding event");
1654   item = alloc_item (event, ITEM_TYPE_EVENT, -1, -1, -1, 0, -1);
1655   rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL);
1656   if (head)
1657     JBUF_SIGNAL_EVENT (priv);
1658
1659   return TRUE;
1660
1661   /* ERRORS */
1662 newseg_wrong_format:
1663   {
1664     GST_DEBUG_OBJECT (jitterbuffer, "received non TIME newsegment");
1665     gst_event_unref (event);
1666     return FALSE;
1667   }
1668 }
1669
1670 static gboolean
1671 gst_rtp_jitter_buffer_sink_event (GstPad * pad, GstObject * parent,
1672     GstEvent * event)
1673 {
1674   gboolean ret = TRUE;
1675   GstRtpJitterBuffer *jitterbuffer;
1676   GstRtpJitterBufferPrivate *priv;
1677
1678   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1679   priv = jitterbuffer->priv;
1680
1681   GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
1682
1683   switch (GST_EVENT_TYPE (event)) {
1684     case GST_EVENT_FLUSH_START:
1685       ret = gst_pad_push_event (priv->srcpad, event);
1686       gst_rtp_jitter_buffer_flush_start (jitterbuffer);
1687       /* wait for the loop to go into PAUSED */
1688       gst_pad_pause_task (priv->srcpad);
1689       break;
1690     case GST_EVENT_FLUSH_STOP:
1691       ret = gst_pad_push_event (priv->srcpad, event);
1692       ret =
1693           gst_rtp_jitter_buffer_src_activate_mode (priv->srcpad, parent,
1694           GST_PAD_MODE_PUSH, TRUE);
1695       break;
1696     default:
1697       if (GST_EVENT_IS_SERIALIZED (event)) {
1698         /* serialized events go in the queue */
1699         JBUF_LOCK (priv);
1700         if (priv->srcresult != GST_FLOW_OK) {
1701           /* Errors in sticky event pushing are no problem and ignored here
1702            * as they will cause more meaningful errors during data flow.
1703            * For EOS events, that are not followed by data flow, we still
1704            * return FALSE here though.
1705            */
1706           if (!GST_EVENT_IS_STICKY (event) ||
1707               GST_EVENT_TYPE (event) == GST_EVENT_EOS)
1708             goto out_flow_error;
1709         }
1710         /* refuse more events on EOS */
1711         if (priv->eos)
1712           goto out_eos;
1713         ret = queue_event (jitterbuffer, event);
1714         JBUF_UNLOCK (priv);
1715       } else {
1716         /* non-serialized events are forwarded downstream immediately */
1717         ret = gst_pad_push_event (priv->srcpad, event);
1718       }
1719       break;
1720   }
1721   return ret;
1722
1723   /* ERRORS */
1724 out_flow_error:
1725   {
1726     GST_DEBUG_OBJECT (jitterbuffer,
1727         "refusing event, we have a downstream flow error: %s",
1728         gst_flow_get_name (priv->srcresult));
1729     JBUF_UNLOCK (priv);
1730     gst_event_unref (event);
1731     return FALSE;
1732   }
1733 out_eos:
1734   {
1735     GST_DEBUG_OBJECT (jitterbuffer, "refusing event, we are EOS");
1736     JBUF_UNLOCK (priv);
1737     gst_event_unref (event);
1738     return FALSE;
1739   }
1740 }
1741
1742 static gboolean
1743 gst_rtp_jitter_buffer_sink_rtcp_event (GstPad * pad, GstObject * parent,
1744     GstEvent * event)
1745 {
1746   gboolean ret = TRUE;
1747   GstRtpJitterBuffer *jitterbuffer;
1748
1749   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1750
1751   GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
1752
1753   switch (GST_EVENT_TYPE (event)) {
1754     case GST_EVENT_FLUSH_START:
1755       gst_event_unref (event);
1756       break;
1757     case GST_EVENT_FLUSH_STOP:
1758       gst_event_unref (event);
1759       break;
1760     default:
1761       ret = gst_pad_event_default (pad, parent, event);
1762       break;
1763   }
1764
1765   return ret;
1766 }
1767
1768 /*
1769  * Must be called with JBUF_LOCK held, will release the LOCK when emiting the
1770  * signal. The function returns GST_FLOW_ERROR when a parsing error happened and
1771  * GST_FLOW_FLUSHING when the element is shutting down. On success
1772  * GST_FLOW_OK is returned.
1773  */
1774 static GstFlowReturn
1775 gst_rtp_jitter_buffer_get_clock_rate (GstRtpJitterBuffer * jitterbuffer,
1776     guint8 pt)
1777 {
1778   GValue ret = { 0 };
1779   GValue args[2] = { {0}, {0} };
1780   GstCaps *caps;
1781   gboolean res;
1782
1783   g_value_init (&args[0], GST_TYPE_ELEMENT);
1784   g_value_set_object (&args[0], jitterbuffer);
1785   g_value_init (&args[1], G_TYPE_UINT);
1786   g_value_set_uint (&args[1], pt);
1787
1788   g_value_init (&ret, GST_TYPE_CAPS);
1789   g_value_set_boxed (&ret, NULL);
1790
1791   JBUF_UNLOCK (jitterbuffer->priv);
1792   g_signal_emitv (args, gst_rtp_jitter_buffer_signals[SIGNAL_REQUEST_PT_MAP], 0,
1793       &ret);
1794   JBUF_LOCK_CHECK (jitterbuffer->priv, out_flushing);
1795
1796   g_value_unset (&args[0]);
1797   g_value_unset (&args[1]);
1798   caps = (GstCaps *) g_value_dup_boxed (&ret);
1799   g_value_unset (&ret);
1800   if (!caps)
1801     goto no_caps;
1802
1803   res = gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps);
1804   gst_caps_unref (caps);
1805
1806   if (G_UNLIKELY (!res))
1807     goto parse_failed;
1808
1809   return GST_FLOW_OK;
1810
1811   /* ERRORS */
1812 no_caps:
1813   {
1814     GST_DEBUG_OBJECT (jitterbuffer, "could not get caps");
1815     return GST_FLOW_ERROR;
1816   }
1817 out_flushing:
1818   {
1819     GST_DEBUG_OBJECT (jitterbuffer, "we are flushing");
1820     return GST_FLOW_FLUSHING;
1821   }
1822 parse_failed:
1823   {
1824     GST_DEBUG_OBJECT (jitterbuffer, "parse failed");
1825     return GST_FLOW_ERROR;
1826   }
1827 }
1828
1829 /* call with jbuf lock held */
1830 static GstMessage *
1831 check_buffering_percent (GstRtpJitterBuffer * jitterbuffer, gint percent)
1832 {
1833   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1834   GstMessage *message = NULL;
1835
1836   if (percent == -1)
1837     return NULL;
1838
1839   /* Post a buffering message */
1840   if (priv->last_percent != percent) {
1841     priv->last_percent = percent;
1842     message =
1843         gst_message_new_buffering (GST_OBJECT_CAST (jitterbuffer), percent);
1844     gst_message_set_buffering_stats (message, GST_BUFFERING_LIVE, -1, -1, -1);
1845   }
1846
1847   return message;
1848 }
1849
1850 static GstClockTime
1851 apply_offset (GstRtpJitterBuffer * jitterbuffer, GstClockTime timestamp)
1852 {
1853   GstRtpJitterBufferPrivate *priv;
1854
1855   priv = jitterbuffer->priv;
1856
1857   if (timestamp == -1)
1858     return -1;
1859
1860   /* apply the timestamp offset, this is used for inter stream sync */
1861   timestamp += priv->ts_offset;
1862   /* add the offset, this is used when buffering */
1863   timestamp += priv->out_offset;
1864
1865   return timestamp;
1866 }
1867
1868 static TimerData *
1869 find_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type, guint16 seqnum)
1870 {
1871   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1872   TimerData *timer = NULL;
1873   gint i, len;
1874
1875   len = priv->timers->len;
1876   for (i = 0; i < len; i++) {
1877     TimerData *test = &g_array_index (priv->timers, TimerData, i);
1878     if (test->seqnum == seqnum && test->type == type) {
1879       timer = test;
1880       break;
1881     }
1882   }
1883   return timer;
1884 }
1885
1886 static void
1887 unschedule_current_timer (GstRtpJitterBuffer * jitterbuffer)
1888 {
1889   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1890
1891   if (priv->clock_id) {
1892     GST_DEBUG_OBJECT (jitterbuffer, "unschedule current timer");
1893     gst_clock_id_unschedule (priv->clock_id);
1894     priv->clock_id = NULL;
1895   }
1896 }
1897
1898 static GstClockTime
1899 get_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
1900 {
1901   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1902   GstClockTime test_timeout;
1903
1904   if ((test_timeout = timer->timeout) == -1)
1905     return -1;
1906
1907   if (timer->type != TIMER_TYPE_EXPECTED) {
1908     /* add our latency and offset to get output times. */
1909     test_timeout = apply_offset (jitterbuffer, test_timeout);
1910     test_timeout += priv->latency_ns;
1911   }
1912   return test_timeout;
1913 }
1914
1915 static void
1916 recalculate_timer (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
1917 {
1918   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1919
1920   if (priv->clock_id) {
1921     GstClockTime timeout = get_timeout (jitterbuffer, timer);
1922
1923     GST_DEBUG ("%" GST_TIME_FORMAT " <> %" GST_TIME_FORMAT,
1924         GST_TIME_ARGS (timeout), GST_TIME_ARGS (priv->timer_timeout));
1925
1926     if (timeout == -1 || timeout < priv->timer_timeout)
1927       unschedule_current_timer (jitterbuffer);
1928   }
1929 }
1930
1931 static TimerData *
1932 add_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type,
1933     guint16 seqnum, guint num, GstClockTime timeout, GstClockTime delay,
1934     GstClockTime duration)
1935 {
1936   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1937   TimerData *timer;
1938   gint len;
1939
1940   GST_DEBUG_OBJECT (jitterbuffer,
1941       "add timer %d for seqnum %d to %" GST_TIME_FORMAT ", delay %"
1942       GST_TIME_FORMAT, type, seqnum, GST_TIME_ARGS (timeout),
1943       GST_TIME_ARGS (delay));
1944
1945   len = priv->timers->len;
1946   g_array_set_size (priv->timers, len + 1);
1947   timer = &g_array_index (priv->timers, TimerData, len);
1948   timer->idx = len;
1949   timer->type = type;
1950   timer->seqnum = seqnum;
1951   timer->num = num;
1952   timer->timeout = timeout + delay;
1953   timer->duration = duration;
1954   if (type == TIMER_TYPE_EXPECTED) {
1955     timer->rtx_base = timeout;
1956     timer->rtx_delay = delay;
1957     timer->rtx_retry = 0;
1958   }
1959   timer->num_rtx_retry = 0;
1960   recalculate_timer (jitterbuffer, timer);
1961   JBUF_SIGNAL_TIMER (priv);
1962
1963   return timer;
1964 }
1965
1966 static void
1967 reschedule_timer (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
1968     guint16 seqnum, GstClockTime timeout, GstClockTime delay, gboolean reset)
1969 {
1970   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1971   gboolean seqchange, timechange;
1972   guint16 oldseq;
1973
1974   seqchange = timer->seqnum != seqnum;
1975   timechange = timer->timeout != timeout;
1976
1977   if (!seqchange && !timechange)
1978     return;
1979
1980   oldseq = timer->seqnum;
1981
1982   GST_DEBUG_OBJECT (jitterbuffer,
1983       "replace timer for seqnum %d->%d to %" GST_TIME_FORMAT,
1984       oldseq, seqnum, GST_TIME_ARGS (timeout + delay));
1985
1986   timer->timeout = timeout + delay;
1987   timer->seqnum = seqnum;
1988   if (reset) {
1989     timer->rtx_base = timeout;
1990     timer->rtx_delay = delay;
1991     timer->rtx_retry = 0;
1992   }
1993   if (seqchange)
1994     timer->num_rtx_retry = 0;
1995
1996   if (priv->clock_id) {
1997     /* we changed the seqnum and there is a timer currently waiting with this
1998      * seqnum, unschedule it */
1999     if (seqchange && priv->timer_seqnum == oldseq)
2000       unschedule_current_timer (jitterbuffer);
2001     /* we changed the time, check if it is earlier than what we are waiting
2002      * for and unschedule if so */
2003     else if (timechange)
2004       recalculate_timer (jitterbuffer, timer);
2005   }
2006 }
2007
2008 static TimerData *
2009 set_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type,
2010     guint16 seqnum, GstClockTime timeout)
2011 {
2012   TimerData *timer;
2013
2014   /* find the seqnum timer */
2015   timer = find_timer (jitterbuffer, type, seqnum);
2016   if (timer == NULL) {
2017     timer = add_timer (jitterbuffer, type, seqnum, 0, timeout, 0, -1);
2018   } else {
2019     reschedule_timer (jitterbuffer, timer, seqnum, timeout, 0, FALSE);
2020   }
2021   return timer;
2022 }
2023
2024 static void
2025 remove_timer (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
2026 {
2027   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2028   guint idx;
2029
2030   if (priv->clock_id && priv->timer_seqnum == timer->seqnum)
2031     unschedule_current_timer (jitterbuffer);
2032
2033   idx = timer->idx;
2034   GST_DEBUG_OBJECT (jitterbuffer, "removed index %d", idx);
2035   g_array_remove_index_fast (priv->timers, idx);
2036   timer->idx = idx;
2037 }
2038
2039 static void
2040 remove_all_timers (GstRtpJitterBuffer * jitterbuffer)
2041 {
2042   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2043   GST_DEBUG_OBJECT (jitterbuffer, "removed all timers");
2044   g_array_set_size (priv->timers, 0);
2045   unschedule_current_timer (jitterbuffer);
2046 }
2047
2048 /* get the extra delay to wait before sending RTX */
2049 static GstClockTime
2050 get_rtx_delay (GstRtpJitterBufferPrivate * priv)
2051 {
2052   GstClockTime delay;
2053
2054   if (priv->rtx_delay == -1) {
2055     if (priv->avg_jitter == 0 && priv->packet_spacing == 0) {
2056       delay = DEFAULT_AUTO_RTX_DELAY;
2057     } else {
2058       /* jitter is in nanoseconds, maximum of 2x jitter and half the
2059        * packet spacing is a good margin */
2060       delay = MAX (priv->avg_jitter * 2, priv->packet_spacing / 2);
2061     }
2062   } else {
2063     delay = priv->rtx_delay * GST_MSECOND;
2064   }
2065   if (priv->rtx_min_delay > 0)
2066     delay = MAX (delay, priv->rtx_min_delay * GST_MSECOND);
2067
2068   return delay;
2069 }
2070
2071 /* we just received a packet with seqnum and dts.
2072  *
2073  * First check for old seqnum that we are still expecting. If the gap with the
2074  * current seqnum is too big, unschedule the timeouts.
2075  *
2076  * If we have a valid packet spacing estimate we can set a timer for when we
2077  * should receive the next packet.
2078  * If we don't have a valid estimate, we remove any timer we might have
2079  * had for this packet.
2080  */
2081 static void
2082 update_timers (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum,
2083     GstClockTime dts, gboolean do_next_seqnum)
2084 {
2085   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2086   TimerData *timer = NULL;
2087   gint i, len;
2088
2089   /* go through all timers and unschedule the ones with a large gap, also find
2090    * the timer for the seqnum */
2091   len = priv->timers->len;
2092   for (i = 0; i < len; i++) {
2093     TimerData *test = &g_array_index (priv->timers, TimerData, i);
2094     gint gap;
2095
2096     gap = gst_rtp_buffer_compare_seqnum (test->seqnum, seqnum);
2097
2098     GST_DEBUG_OBJECT (jitterbuffer, "%d, %d, #%d<->#%d gap %d", i,
2099         test->type, test->seqnum, seqnum, gap);
2100
2101     if (gap == 0) {
2102       GST_DEBUG ("found timer for current seqnum");
2103       /* the timer for the current seqnum */
2104       timer = test;
2105       /* when no retransmission, we can stop now, we only need to find the
2106        * timer for the current seqnum */
2107       if (!priv->do_retransmission)
2108         break;
2109     } else if (gap > priv->rtx_delay_reorder) {
2110       /* max gap, we exceeded the max reorder distance and we don't expect the
2111        * missing packet to be this reordered */
2112       if (test->num_rtx_retry == 0 && test->type == TIMER_TYPE_EXPECTED)
2113         reschedule_timer (jitterbuffer, test, test->seqnum, -1, 0, FALSE);
2114     }
2115   }
2116
2117   do_next_seqnum = do_next_seqnum && priv->packet_spacing > 0
2118       && priv->do_retransmission && priv->rtx_next_seqnum;
2119
2120   if (timer && timer->type != TIMER_TYPE_DEADLINE) {
2121     if (timer->num_rtx_retry > 0) {
2122       GstClockTime rtx_last, delay;
2123
2124       /* we scheduled a retry for this packet and now we have it */
2125       priv->num_rtx_success++;
2126       /* all the previous retry attempts failed */
2127       priv->num_rtx_failed += timer->num_rtx_retry - 1;
2128       /* number of retries before receiving the packet */
2129       if (priv->avg_rtx_num == 0.0)
2130         priv->avg_rtx_num = timer->num_rtx_retry;
2131       else
2132         priv->avg_rtx_num = (timer->num_rtx_retry + 7 * priv->avg_rtx_num) / 8;
2133       /* calculate the delay between retransmission request and receiving this
2134        * packet, start with when we scheduled this timeout last */
2135       rtx_last = timer->rtx_last;
2136       if (dts != GST_CLOCK_TIME_NONE && dts > rtx_last) {
2137         /* we have a valid delay if this packet arrived after we scheduled the
2138          * request */
2139         delay = dts - rtx_last;
2140         if (priv->avg_rtx_rtt == 0)
2141           priv->avg_rtx_rtt = delay;
2142         else
2143           priv->avg_rtx_rtt = (delay + 7 * priv->avg_rtx_rtt) / 8;
2144       } else
2145         delay = 0;
2146
2147       GST_LOG_OBJECT (jitterbuffer,
2148           "RTX success %" G_GUINT64_FORMAT ", failed %" G_GUINT64_FORMAT
2149           ", requests %" G_GUINT64_FORMAT ", dups %" G_GUINT64_FORMAT
2150           ", avg-num %g, delay %" GST_TIME_FORMAT ", avg-rtt %" GST_TIME_FORMAT,
2151           priv->num_rtx_success, priv->num_rtx_failed, priv->num_rtx_requests,
2152           priv->num_duplicates, priv->avg_rtx_num, GST_TIME_ARGS (delay),
2153           GST_TIME_ARGS (priv->avg_rtx_rtt));
2154
2155       /* don't try to estimate the next seqnum because this is a retransmitted
2156        * packet and it probably did not arrive with the expected packet
2157        * spacing. */
2158       do_next_seqnum = FALSE;
2159     }
2160   }
2161
2162   if (do_next_seqnum && dts != GST_CLOCK_TIME_NONE) {
2163     GstClockTime expected, delay;
2164
2165     /* calculate expected arrival time of the next seqnum */
2166     expected = dts + priv->packet_spacing;
2167
2168     delay = get_rtx_delay (priv);
2169
2170     /* and update/install timer for next seqnum */
2171     if (timer) {
2172       reschedule_timer (jitterbuffer, timer, priv->next_in_seqnum, expected,
2173           delay, TRUE);
2174     } else {
2175       add_timer (jitterbuffer, TIMER_TYPE_EXPECTED, priv->next_in_seqnum, 0,
2176           expected, delay, priv->packet_spacing);
2177     }
2178   } else if (timer && timer->type != TIMER_TYPE_DEADLINE) {
2179     /* if we had a timer, remove it, we don't know when to expect the next
2180      * packet. */
2181     remove_timer (jitterbuffer, timer);
2182   }
2183 }
2184
2185 static void
2186 calculate_packet_spacing (GstRtpJitterBuffer * jitterbuffer, guint32 rtptime,
2187     GstClockTime dts)
2188 {
2189   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2190
2191   /* we need consecutive seqnums with a different
2192    * rtptime to estimate the packet spacing. */
2193   if (priv->ips_rtptime != rtptime) {
2194     /* rtptime changed, check dts diff */
2195     if (priv->ips_dts != -1 && dts != -1 && dts > priv->ips_dts) {
2196       GstClockTime new_packet_spacing = dts - priv->ips_dts;
2197       GstClockTime old_packet_spacing = priv->packet_spacing;
2198
2199       /* Biased towards bigger packet spacings to prevent
2200        * too many unneeded retransmission requests for next
2201        * packets that just arrive a little later than we would
2202        * expect */
2203       if (old_packet_spacing > new_packet_spacing)
2204         priv->packet_spacing =
2205             (new_packet_spacing + 3 * old_packet_spacing) / 4;
2206       else if (old_packet_spacing > 0)
2207         priv->packet_spacing =
2208             (3 * new_packet_spacing + old_packet_spacing) / 4;
2209       else
2210         priv->packet_spacing = new_packet_spacing;
2211
2212       GST_DEBUG_OBJECT (jitterbuffer,
2213           "new packet spacing %" GST_TIME_FORMAT
2214           " old packet spacing %" GST_TIME_FORMAT
2215           " combined to %" GST_TIME_FORMAT,
2216           GST_TIME_ARGS (new_packet_spacing),
2217           GST_TIME_ARGS (old_packet_spacing),
2218           GST_TIME_ARGS (priv->packet_spacing));
2219     }
2220     priv->ips_rtptime = rtptime;
2221     priv->ips_dts = dts;
2222   }
2223 }
2224
2225 static void
2226 calculate_expected (GstRtpJitterBuffer * jitterbuffer, guint32 expected,
2227     guint16 seqnum, GstClockTime dts, gint gap)
2228 {
2229   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2230   GstClockTime total_duration, duration, expected_dts;
2231   TimerType type;
2232
2233   GST_DEBUG_OBJECT (jitterbuffer,
2234       "dts %" GST_TIME_FORMAT ", last %" GST_TIME_FORMAT,
2235       GST_TIME_ARGS (dts), GST_TIME_ARGS (priv->last_in_dts));
2236
2237   if (dts == GST_CLOCK_TIME_NONE) {
2238     GST_WARNING_OBJECT (jitterbuffer, "Have no DTS");
2239     return;
2240   }
2241
2242   /* the total duration spanned by the missing packets */
2243   if (dts >= priv->last_in_dts)
2244     total_duration = dts - priv->last_in_dts;
2245   else
2246     total_duration = 0;
2247
2248   /* interpolate between the current time and the last time based on
2249    * number of packets we are missing, this is the estimated duration
2250    * for the missing packet based on equidistant packet spacing. */
2251   duration = total_duration / (gap + 1);
2252
2253   GST_DEBUG_OBJECT (jitterbuffer, "duration %" GST_TIME_FORMAT,
2254       GST_TIME_ARGS (duration));
2255
2256   if (total_duration > priv->latency_ns) {
2257     GstClockTime gap_time;
2258     guint lost_packets;
2259
2260     if (duration > 0) {
2261       GstClockTime gap_dur = gap * duration;
2262       if (gap_dur > priv->latency_ns)
2263         gap_time = gap_dur - priv->latency_ns;
2264       else
2265         gap_time = 0;
2266       lost_packets = gap_time / duration;
2267     } else {
2268       gap_time = total_duration - priv->latency_ns;
2269       lost_packets = gap;
2270     }
2271
2272     /* too many lost packets, some of the missing packets are already
2273      * too late and we can generate lost packet events for them. */
2274     GST_DEBUG_OBJECT (jitterbuffer,
2275         "lost packets (%d, #%d->#%d) duration too large %" GST_TIME_FORMAT
2276         " > %" GST_TIME_FORMAT ", consider %u lost (%" GST_TIME_FORMAT ")",
2277         gap, expected, seqnum, GST_TIME_ARGS (total_duration),
2278         GST_TIME_ARGS (priv->latency_ns), lost_packets,
2279         GST_TIME_ARGS (gap_time));
2280
2281     /* this timer will fire immediately and the lost event will be pushed from
2282      * the timer thread */
2283     if (lost_packets > 0) {
2284       add_timer (jitterbuffer, TIMER_TYPE_LOST, expected, lost_packets,
2285           priv->last_in_dts + duration, 0, gap_time);
2286       expected += lost_packets;
2287       priv->last_in_dts += gap_time;
2288     }
2289   }
2290
2291   expected_dts = priv->last_in_dts + duration;
2292
2293   if (priv->do_retransmission) {
2294     TimerData *timer;
2295
2296     type = TIMER_TYPE_EXPECTED;
2297     /* if we had a timer for the first missing packet, update it. */
2298     if ((timer = find_timer (jitterbuffer, type, expected))) {
2299       GstClockTime timeout = timer->timeout;
2300
2301       timer->duration = duration;
2302       if (timeout > (expected_dts + timer->rtx_retry)) {
2303         GstClockTime delay = timeout - expected_dts - timer->rtx_retry;
2304         reschedule_timer (jitterbuffer, timer, timer->seqnum, expected_dts,
2305             delay, TRUE);
2306       }
2307       expected++;
2308       expected_dts += duration;
2309     }
2310   } else {
2311     type = TIMER_TYPE_LOST;
2312   }
2313
2314   while (gst_rtp_buffer_compare_seqnum (expected, seqnum) > 0) {
2315     add_timer (jitterbuffer, type, expected, 0, expected_dts, 0, duration);
2316     expected_dts += duration;
2317     expected++;
2318   }
2319 }
2320
2321 static void
2322 calculate_jitter (GstRtpJitterBuffer * jitterbuffer, GstClockTime dts,
2323     guint rtptime)
2324 {
2325   gint32 rtpdiff;
2326   GstClockTimeDiff dtsdiff, rtpdiffns, diff;
2327   GstRtpJitterBufferPrivate *priv;
2328
2329   priv = jitterbuffer->priv;
2330
2331   if (G_UNLIKELY (dts == GST_CLOCK_TIME_NONE) || priv->clock_rate <= 0)
2332     goto no_time;
2333
2334   if (priv->last_dts != -1)
2335     dtsdiff = dts - priv->last_dts;
2336   else
2337     dtsdiff = 0;
2338
2339   if (priv->last_rtptime != -1)
2340     rtpdiff = rtptime - (guint32) priv->last_rtptime;
2341   else
2342     rtpdiff = 0;
2343
2344   priv->last_dts = dts;
2345   priv->last_rtptime = rtptime;
2346
2347   if (rtpdiff > 0)
2348     rtpdiffns =
2349         gst_util_uint64_scale_int (rtpdiff, GST_SECOND, priv->clock_rate);
2350   else
2351     rtpdiffns =
2352         -gst_util_uint64_scale_int (-rtpdiff, GST_SECOND, priv->clock_rate);
2353
2354   diff = ABS (dtsdiff - rtpdiffns);
2355
2356   /* jitter is stored in nanoseconds */
2357   priv->avg_jitter = (diff + (15 * priv->avg_jitter)) >> 4;
2358
2359   GST_LOG_OBJECT (jitterbuffer,
2360       "dtsdiff %" GST_TIME_FORMAT " rtptime %" GST_TIME_FORMAT
2361       ", clock-rate %d, diff %" GST_TIME_FORMAT ", jitter: %" GST_TIME_FORMAT,
2362       GST_TIME_ARGS (dtsdiff), GST_TIME_ARGS (rtpdiffns), priv->clock_rate,
2363       GST_TIME_ARGS (diff), GST_TIME_ARGS (priv->avg_jitter));
2364
2365   return;
2366
2367   /* ERRORS */
2368 no_time:
2369   {
2370     GST_DEBUG_OBJECT (jitterbuffer,
2371         "no dts or no clock-rate, can't calculate jitter");
2372     return;
2373   }
2374 }
2375
2376 static gint
2377 compare_buffer_seqnum (GstBuffer * a, GstBuffer * b, gpointer user_data)
2378 {
2379   GstRTPBuffer rtp_a = GST_RTP_BUFFER_INIT;
2380   GstRTPBuffer rtp_b = GST_RTP_BUFFER_INIT;
2381   guint seq_a, seq_b;
2382
2383   gst_rtp_buffer_map (a, GST_MAP_READ, &rtp_a);
2384   seq_a = gst_rtp_buffer_get_seq (&rtp_a);
2385   gst_rtp_buffer_unmap (&rtp_a);
2386
2387   gst_rtp_buffer_map (b, GST_MAP_READ, &rtp_b);
2388   seq_b = gst_rtp_buffer_get_seq (&rtp_b);
2389   gst_rtp_buffer_unmap (&rtp_b);
2390
2391   return gst_rtp_buffer_compare_seqnum (seq_b, seq_a);
2392 }
2393
2394 static gboolean
2395 handle_big_gap_buffer (GstRtpJitterBuffer * jitterbuffer, gboolean future,
2396     GstBuffer * buffer, guint8 pt, guint16 seqnum, gint gap, guint max_dropout,
2397     guint max_misorder)
2398 {
2399   GstRtpJitterBufferPrivate *priv;
2400   guint gap_packets_length;
2401   gboolean reset = FALSE;
2402
2403   priv = jitterbuffer->priv;
2404
2405   if ((gap_packets_length = g_queue_get_length (&priv->gap_packets)) > 0) {
2406     GList *l;
2407     guint32 prev_gap_seq = -1;
2408     gboolean all_consecutive = TRUE;
2409
2410     g_queue_insert_sorted (&priv->gap_packets, buffer,
2411         (GCompareDataFunc) compare_buffer_seqnum, NULL);
2412
2413     for (l = priv->gap_packets.head; l; l = l->next) {
2414       GstBuffer *gap_buffer = l->data;
2415       GstRTPBuffer gap_rtp = GST_RTP_BUFFER_INIT;
2416       guint32 gap_seq;
2417
2418       gst_rtp_buffer_map (gap_buffer, GST_MAP_READ, &gap_rtp);
2419
2420       all_consecutive = (gst_rtp_buffer_get_payload_type (&gap_rtp) == pt);
2421
2422       gap_seq = gst_rtp_buffer_get_seq (&gap_rtp);
2423       if (prev_gap_seq == -1)
2424         prev_gap_seq = gap_seq;
2425       else if (gst_rtp_buffer_compare_seqnum (gap_seq, prev_gap_seq) != -1)
2426         all_consecutive = FALSE;
2427       else
2428         prev_gap_seq = gap_seq;
2429
2430       gst_rtp_buffer_unmap (&gap_rtp);
2431       if (!all_consecutive)
2432         break;
2433     }
2434
2435     if (all_consecutive && gap_packets_length > 3) {
2436       GST_DEBUG_OBJECT (jitterbuffer,
2437           "buffer too %s %d < %d, got 5 consecutive ones - reset",
2438           (future ? "new" : "old"), gap,
2439           (future ? max_dropout : -max_misorder));
2440       reset = TRUE;
2441     } else if (!all_consecutive) {
2442       g_queue_foreach (&priv->gap_packets, (GFunc) gst_buffer_unref, NULL);
2443       g_queue_clear (&priv->gap_packets);
2444       GST_DEBUG_OBJECT (jitterbuffer,
2445           "buffer too %s %d < %d, got no 5 consecutive ones - dropping",
2446           (future ? "new" : "old"), gap,
2447           (future ? max_dropout : -max_misorder));
2448       buffer = NULL;
2449     } else {
2450       GST_DEBUG_OBJECT (jitterbuffer,
2451           "buffer too %s %d < %d, got %u consecutive ones - waiting",
2452           (future ? "new" : "old"), gap,
2453           (future ? max_dropout : -max_misorder), gap_packets_length + 1);
2454       buffer = NULL;
2455     }
2456   } else {
2457     GST_DEBUG_OBJECT (jitterbuffer,
2458         "buffer too %s %d < %d, first one - waiting", (future ? "new" : "old"),
2459         gap, -max_misorder);
2460     g_queue_push_tail (&priv->gap_packets, buffer);
2461     buffer = NULL;
2462   }
2463
2464   return reset;
2465 }
2466
2467 static GstClockTime
2468 get_current_running_time (GstRtpJitterBuffer * jitterbuffer)
2469 {
2470   GstClock *clock = gst_element_get_clock (GST_ELEMENT_CAST (jitterbuffer));
2471   GstClockTime running_time = GST_CLOCK_TIME_NONE;
2472
2473   if (clock) {
2474     GstClockTime base_time =
2475         gst_element_get_base_time (GST_ELEMENT_CAST (jitterbuffer));
2476     GstClockTime clock_time = gst_clock_get_time (clock);
2477
2478     if (clock_time > base_time)
2479       running_time = clock_time - base_time;
2480     else
2481       running_time = 0;
2482
2483     gst_object_unref (clock);
2484   }
2485
2486   return running_time;
2487 }
2488
2489 static GstFlowReturn
2490 gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent,
2491     GstBuffer * buffer)
2492 {
2493   GstRtpJitterBuffer *jitterbuffer;
2494   GstRtpJitterBufferPrivate *priv;
2495   guint16 seqnum;
2496   guint32 expected, rtptime;
2497   GstFlowReturn ret = GST_FLOW_OK;
2498   GstClockTime dts, pts;
2499   guint64 latency_ts;
2500   gboolean head;
2501   gint percent = -1;
2502   guint8 pt;
2503   GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
2504   gboolean do_next_seqnum = FALSE;
2505   RTPJitterBufferItem *item;
2506   GstMessage *msg = NULL;
2507   gboolean estimated_dts = FALSE;
2508   guint32 packet_rate, max_dropout, max_misorder;
2509
2510   jitterbuffer = GST_RTP_JITTER_BUFFER_CAST (parent);
2511
2512   priv = jitterbuffer->priv;
2513
2514   if (G_UNLIKELY (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp)))
2515     goto invalid_buffer;
2516
2517   pt = gst_rtp_buffer_get_payload_type (&rtp);
2518   seqnum = gst_rtp_buffer_get_seq (&rtp);
2519   rtptime = gst_rtp_buffer_get_timestamp (&rtp);
2520   gst_rtp_buffer_unmap (&rtp);
2521
2522   /* make sure we have PTS and DTS set */
2523   pts = GST_BUFFER_PTS (buffer);
2524   dts = GST_BUFFER_DTS (buffer);
2525   if (dts == -1)
2526     dts = pts;
2527   else if (pts == -1)
2528     pts = dts;
2529
2530   if (dts == -1) {
2531     /* If we have no DTS here, i.e. no capture time, get one from the
2532      * clock now to have something to calculate with in the future. */
2533     dts = get_current_running_time (jitterbuffer);
2534     pts = dts;
2535
2536     /* Remember that we estimated the DTS if we are running already
2537      * and this is not our first packet (or first packet after a reset).
2538      * If it's the first packet, we somehow must generate a timestamp for
2539      * everything, otherwise we can't calculate any times
2540      */
2541     estimated_dts = (priv->next_in_seqnum != -1);
2542   } else {
2543     /* take the DTS of the buffer. This is the time when the packet was
2544      * received and is used to calculate jitter and clock skew. We will adjust
2545      * this DTS with the smoothed value after processing it in the
2546      * jitterbuffer and assign it as the PTS. */
2547     /* bring to running time */
2548     dts = gst_segment_to_running_time (&priv->segment, GST_FORMAT_TIME, dts);
2549   }
2550
2551   GST_DEBUG_OBJECT (jitterbuffer,
2552       "Received packet #%d at time %" GST_TIME_FORMAT ", discont %d", seqnum,
2553       GST_TIME_ARGS (dts), GST_BUFFER_IS_DISCONT (buffer));
2554
2555   JBUF_LOCK_CHECK (priv, out_flushing);
2556
2557   if (G_UNLIKELY (priv->last_pt != pt)) {
2558     GstCaps *caps;
2559
2560     GST_DEBUG_OBJECT (jitterbuffer, "pt changed from %u to %u", priv->last_pt,
2561         pt);
2562
2563     priv->last_pt = pt;
2564     /* reset clock-rate so that we get a new one */
2565     priv->clock_rate = -1;
2566
2567     /* Try to get the clock-rate from the caps first if we can. If there are no
2568      * caps we must fire the signal to get the clock-rate. */
2569     if ((caps = gst_pad_get_current_caps (pad))) {
2570       gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps);
2571       gst_caps_unref (caps);
2572     }
2573   }
2574
2575   if (G_UNLIKELY (priv->clock_rate == -1)) {
2576     /* no clock rate given on the caps, try to get one with the signal */
2577     if (gst_rtp_jitter_buffer_get_clock_rate (jitterbuffer,
2578             pt) == GST_FLOW_FLUSHING)
2579       goto out_flushing;
2580
2581     if (G_UNLIKELY (priv->clock_rate == -1))
2582       goto no_clock_rate;
2583   }
2584
2585   /* don't accept more data on EOS */
2586   if (G_UNLIKELY (priv->eos))
2587     goto have_eos;
2588
2589   calculate_jitter (jitterbuffer, dts, rtptime);
2590
2591   if (priv->seqnum_base != -1) {
2592     gint gap;
2593
2594     gap = gst_rtp_buffer_compare_seqnum (priv->seqnum_base, seqnum);
2595
2596     if (gap < 0) {
2597       GST_DEBUG_OBJECT (jitterbuffer,
2598           "packet seqnum #%d before seqnum-base #%d", seqnum,
2599           priv->seqnum_base);
2600       gst_buffer_unref (buffer);
2601       ret = GST_FLOW_OK;
2602       goto finished;
2603     } else if (gap > 16384) {
2604       /* From now on don't compare against the seqnum base anymore as
2605        * at some point in the future we will wrap around and also that
2606        * much reordering is very unlikely */
2607       priv->seqnum_base = -1;
2608     }
2609   }
2610
2611   expected = priv->next_in_seqnum;
2612
2613   packet_rate =
2614       gst_rtp_packet_rate_ctx_update (&priv->packet_rate_ctx, seqnum, rtptime);
2615   max_dropout =
2616       gst_rtp_packet_rate_ctx_get_max_dropout (&priv->packet_rate_ctx,
2617       priv->max_dropout_time);
2618   max_misorder =
2619       gst_rtp_packet_rate_ctx_get_max_misorder (&priv->packet_rate_ctx,
2620       priv->max_misorder_time);
2621   GST_TRACE_OBJECT (jitterbuffer,
2622       "packet_rate: %d, max_dropout: %d, max_misorder: %d", packet_rate,
2623       max_dropout, max_misorder);
2624
2625   /* now check against our expected seqnum */
2626   if (G_LIKELY (expected != -1)) {
2627     gint gap;
2628
2629     /* now calculate gap */
2630     gap = gst_rtp_buffer_compare_seqnum (expected, seqnum);
2631
2632     GST_DEBUG_OBJECT (jitterbuffer, "expected #%d, got #%d, gap of %d",
2633         expected, seqnum, gap);
2634
2635     if (G_LIKELY (gap == 0)) {
2636       /* packet is expected */
2637       calculate_packet_spacing (jitterbuffer, rtptime, dts);
2638       do_next_seqnum = TRUE;
2639     } else {
2640       gboolean reset = FALSE;
2641
2642       if (gap < 0) {
2643         /* we received an old packet */
2644         if (G_UNLIKELY (gap != -1 && gap < -max_misorder)) {
2645           reset =
2646               handle_big_gap_buffer (jitterbuffer, FALSE, buffer, pt, seqnum,
2647               gap, max_dropout, max_misorder);
2648           buffer = NULL;
2649         } else {
2650           GST_DEBUG_OBJECT (jitterbuffer, "old packet received");
2651         }
2652       } else {
2653         /* new packet, we are missing some packets */
2654         if (G_UNLIKELY (priv->timers->len >= max_dropout)) {
2655           /* If we have timers for more than RTP_MAX_DROPOUT packets
2656            * pending this means that we have a huge gap overall. We can
2657            * reset the jitterbuffer at this point because there's
2658            * just too much data missing to be able to do anything
2659            * sensible with the past data. Just try again from the
2660            * next packet */
2661           GST_WARNING_OBJECT (jitterbuffer,
2662               "%d pending timers > %d - resetting", priv->timers->len,
2663               max_dropout);
2664           reset = TRUE;
2665           gst_buffer_unref (buffer);
2666           buffer = NULL;
2667         } else if (G_UNLIKELY (gap >= max_dropout)) {
2668           reset =
2669               handle_big_gap_buffer (jitterbuffer, TRUE, buffer, pt, seqnum,
2670               gap, max_dropout, max_misorder);
2671           buffer = NULL;
2672         } else {
2673           GST_DEBUG_OBJECT (jitterbuffer, "%d missing packets", gap);
2674           /* fill in the gap with EXPECTED timers */
2675           calculate_expected (jitterbuffer, expected, seqnum, dts, gap);
2676
2677           do_next_seqnum = TRUE;
2678         }
2679       }
2680       if (G_UNLIKELY (reset)) {
2681         GList *events = NULL, *l;
2682         GList *buffers;
2683
2684         GST_DEBUG_OBJECT (jitterbuffer, "flush and reset jitterbuffer");
2685         rtp_jitter_buffer_flush (priv->jbuf,
2686             (GFunc) free_item_and_retain_events, &events);
2687         rtp_jitter_buffer_reset_skew (priv->jbuf);
2688         remove_all_timers (jitterbuffer);
2689         priv->discont = TRUE;
2690         priv->last_popped_seqnum = -1;
2691
2692         if (priv->gap_packets.head) {
2693           GstBuffer *gap_buffer = priv->gap_packets.head->data;
2694           GstRTPBuffer gap_rtp = GST_RTP_BUFFER_INIT;
2695
2696           gst_rtp_buffer_map (gap_buffer, GST_MAP_READ, &gap_rtp);
2697           priv->next_seqnum = gst_rtp_buffer_get_seq (&gap_rtp);
2698           gst_rtp_buffer_unmap (&gap_rtp);
2699         } else {
2700           priv->next_seqnum = seqnum;
2701         }
2702
2703         priv->last_in_dts = -1;
2704         priv->next_in_seqnum = -1;
2705
2706         /* Insert all sticky events again in order, otherwise we would
2707          * potentially loose STREAM_START, CAPS or SEGMENT events
2708          */
2709         events = g_list_reverse (events);
2710         for (l = events; l; l = l->next) {
2711           RTPJitterBufferItem *item;
2712
2713           item = alloc_item (l->data, ITEM_TYPE_EVENT, -1, -1, -1, 0, -1);
2714           rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL);
2715         }
2716         g_list_free (events);
2717
2718         JBUF_SIGNAL_EVENT (priv);
2719
2720         /* reset spacing estimation when gap */
2721         priv->ips_rtptime = -1;
2722         priv->ips_dts = GST_CLOCK_TIME_NONE;
2723
2724         buffers = g_list_copy (priv->gap_packets.head);
2725         g_queue_clear (&priv->gap_packets);
2726
2727         priv->ips_rtptime = -1;
2728         priv->ips_dts = GST_CLOCK_TIME_NONE;
2729         JBUF_UNLOCK (jitterbuffer->priv);
2730
2731         for (l = buffers; l; l = l->next) {
2732           ret = gst_rtp_jitter_buffer_chain (pad, parent, l->data);
2733           l->data = NULL;
2734           if (ret != GST_FLOW_OK)
2735             break;
2736         }
2737         for (; l; l = l->next)
2738           gst_buffer_unref (l->data);
2739         g_list_free (buffers);
2740
2741         return ret;
2742       }
2743       /* reset spacing estimation when gap */
2744       priv->ips_rtptime = -1;
2745       priv->ips_dts = GST_CLOCK_TIME_NONE;
2746     }
2747   } else {
2748     GST_DEBUG_OBJECT (jitterbuffer, "First buffer #%d", seqnum);
2749
2750     /* we don't know what the next_in_seqnum should be, wait for the last
2751      * possible moment to push this buffer, maybe we get an earlier seqnum
2752      * while we wait */
2753     set_timer (jitterbuffer, TIMER_TYPE_DEADLINE, seqnum, dts);
2754     do_next_seqnum = TRUE;
2755     /* take rtptime and dts to calculate packet spacing */
2756     priv->ips_rtptime = rtptime;
2757     priv->ips_dts = dts;
2758   }
2759
2760   /* We had no huge gap, let's drop all the gap packets */
2761   if (buffer != NULL) {
2762     GST_DEBUG_OBJECT (jitterbuffer, "Clearing gap packets");
2763     g_queue_foreach (&priv->gap_packets, (GFunc) gst_buffer_unref, NULL);
2764     g_queue_clear (&priv->gap_packets);
2765   } else {
2766     GST_DEBUG_OBJECT (jitterbuffer,
2767         "Had big gap, waiting for more consecutive packets");
2768     JBUF_UNLOCK (jitterbuffer->priv);
2769     return GST_FLOW_OK;
2770   }
2771
2772   if (do_next_seqnum) {
2773     priv->last_in_dts = dts;
2774     priv->next_in_seqnum = (seqnum + 1) & 0xffff;
2775   }
2776
2777   /* let's check if this buffer is too late, we can only accept packets with
2778    * bigger seqnum than the one we last pushed. */
2779   if (G_LIKELY (priv->last_popped_seqnum != -1)) {
2780     gint gap;
2781
2782     gap = gst_rtp_buffer_compare_seqnum (priv->last_popped_seqnum, seqnum);
2783
2784     /* priv->last_popped_seqnum >= seqnum, we're too late. */
2785     if (G_UNLIKELY (gap <= 0))
2786       goto too_late;
2787   }
2788
2789   /* let's drop oldest packet if the queue is already full and drop-on-latency
2790    * is set. We can only do this when there actually is a latency. When no
2791    * latency is set, we just pump it in the queue and let the other end push it
2792    * out as fast as possible. */
2793   if (priv->latency_ms && priv->drop_on_latency) {
2794     latency_ts =
2795         gst_util_uint64_scale_int (priv->latency_ms, priv->clock_rate, 1000);
2796
2797     if (G_UNLIKELY (rtp_jitter_buffer_get_ts_diff (priv->jbuf) >= latency_ts)) {
2798       RTPJitterBufferItem *old_item;
2799
2800       old_item = rtp_jitter_buffer_peek (priv->jbuf);
2801
2802       if (IS_DROPABLE (old_item)) {
2803         old_item = rtp_jitter_buffer_pop (priv->jbuf, &percent);
2804         GST_DEBUG_OBJECT (jitterbuffer, "Queue full, dropping old packet %p",
2805             old_item);
2806         priv->next_seqnum = (old_item->seqnum + 1) & 0xffff;
2807         free_item (old_item);
2808       }
2809       /* we might have removed some head buffers, signal the pushing thread to
2810        * see if it can push now */
2811       JBUF_SIGNAL_EVENT (priv);
2812     }
2813   }
2814
2815   /* If we estimated the DTS, don't consider it in the clock skew calculations
2816    * later. The code above always sets dts to pts or the other way around if
2817    * any of those is valid in the buffer, so we know that if we estimated the
2818    * dts that both are unknown */
2819   if (estimated_dts)
2820     item =
2821         alloc_item (buffer, ITEM_TYPE_BUFFER, GST_CLOCK_TIME_NONE,
2822         GST_CLOCK_TIME_NONE, seqnum, 1, rtptime);
2823   else
2824     item = alloc_item (buffer, ITEM_TYPE_BUFFER, dts, pts, seqnum, 1, rtptime);
2825
2826   /* now insert the packet into the queue in sorted order. This function returns
2827    * FALSE if a packet with the same seqnum was already in the queue, meaning we
2828    * have a duplicate. */
2829   if (G_UNLIKELY (!rtp_jitter_buffer_insert (priv->jbuf, item,
2830               &head, &percent)))
2831     goto duplicate;
2832
2833   /* update timers */
2834   update_timers (jitterbuffer, seqnum, dts, do_next_seqnum);
2835
2836   /* we had an unhandled SR, handle it now */
2837   if (priv->last_sr)
2838     do_handle_sync (jitterbuffer);
2839
2840   if (G_UNLIKELY (head)) {
2841     /* signal addition of new buffer when the _loop is waiting. */
2842     if (G_LIKELY (priv->active))
2843       JBUF_SIGNAL_EVENT (priv);
2844
2845     /* let's unschedule and unblock any waiting buffers. We only want to do this
2846      * when the head buffer changed */
2847     if (G_UNLIKELY (priv->clock_id)) {
2848       GST_DEBUG_OBJECT (jitterbuffer, "Unscheduling waiting new buffer");
2849       unschedule_current_timer (jitterbuffer);
2850     }
2851   }
2852
2853   GST_DEBUG_OBJECT (jitterbuffer,
2854       "Pushed packet #%d, now %d packets, head: %d, " "percent %d", seqnum,
2855       rtp_jitter_buffer_num_packets (priv->jbuf), head, percent);
2856
2857   msg = check_buffering_percent (jitterbuffer, percent);
2858
2859 finished:
2860   JBUF_UNLOCK (priv);
2861
2862   if (msg)
2863     gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer), msg);
2864
2865   return ret;
2866
2867   /* ERRORS */
2868 invalid_buffer:
2869   {
2870     /* this is not fatal but should be filtered earlier */
2871     GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
2872         ("Received invalid RTP payload, dropping"));
2873     gst_buffer_unref (buffer);
2874     return GST_FLOW_OK;
2875   }
2876 no_clock_rate:
2877   {
2878     GST_WARNING_OBJECT (jitterbuffer,
2879         "No clock-rate in caps!, dropping buffer");
2880     gst_buffer_unref (buffer);
2881     goto finished;
2882   }
2883 out_flushing:
2884   {
2885     ret = priv->srcresult;
2886     GST_DEBUG_OBJECT (jitterbuffer, "flushing %s", gst_flow_get_name (ret));
2887     gst_buffer_unref (buffer);
2888     goto finished;
2889   }
2890 have_eos:
2891   {
2892     ret = GST_FLOW_EOS;
2893     GST_WARNING_OBJECT (jitterbuffer, "we are EOS, refusing buffer");
2894     gst_buffer_unref (buffer);
2895     goto finished;
2896   }
2897 too_late:
2898   {
2899     GST_WARNING_OBJECT (jitterbuffer, "Packet #%d too late as #%d was already"
2900         " popped, dropping", seqnum, priv->last_popped_seqnum);
2901     priv->num_late++;
2902     gst_buffer_unref (buffer);
2903     goto finished;
2904   }
2905 duplicate:
2906   {
2907     GST_WARNING_OBJECT (jitterbuffer, "Duplicate packet #%d detected, dropping",
2908         seqnum);
2909     priv->num_duplicates++;
2910     free_item (item);
2911     goto finished;
2912   }
2913 }
2914
2915 static GstClockTime
2916 compute_elapsed (GstRtpJitterBuffer * jitterbuffer, RTPJitterBufferItem * item)
2917 {
2918   guint64 ext_time, elapsed;
2919   guint32 rtp_time;
2920   GstRtpJitterBufferPrivate *priv;
2921
2922   priv = jitterbuffer->priv;
2923   rtp_time = item->rtptime;
2924
2925   GST_LOG_OBJECT (jitterbuffer, "rtp %" G_GUINT32_FORMAT ", ext %"
2926       G_GUINT64_FORMAT, rtp_time, priv->ext_timestamp);
2927
2928   ext_time = priv->ext_timestamp;
2929   ext_time = gst_rtp_buffer_ext_timestamp (&ext_time, rtp_time);
2930   if (ext_time < priv->ext_timestamp) {
2931     ext_time = priv->ext_timestamp;
2932   } else {
2933     priv->ext_timestamp = ext_time;
2934   }
2935
2936   if (ext_time > priv->clock_base)
2937     elapsed = ext_time - priv->clock_base;
2938   else
2939     elapsed = 0;
2940
2941   elapsed = gst_util_uint64_scale_int (elapsed, GST_SECOND, priv->clock_rate);
2942   return elapsed;
2943 }
2944
2945 static void
2946 update_estimated_eos (GstRtpJitterBuffer * jitterbuffer,
2947     RTPJitterBufferItem * item)
2948 {
2949   guint64 total, elapsed, left, estimated;
2950   GstClockTime out_time;
2951   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2952
2953   if (priv->npt_stop == -1 || priv->ext_timestamp == -1
2954       || priv->clock_base == -1 || priv->clock_rate <= 0)
2955     return;
2956
2957   /* compute the elapsed time */
2958   elapsed = compute_elapsed (jitterbuffer, item);
2959
2960   /* do nothing if elapsed time doesn't increment */
2961   if (priv->last_elapsed && elapsed <= priv->last_elapsed)
2962     return;
2963
2964   priv->last_elapsed = elapsed;
2965
2966   /* this is the total time we need to play */
2967   total = priv->npt_stop - priv->npt_start;
2968   GST_LOG_OBJECT (jitterbuffer, "total %" GST_TIME_FORMAT,
2969       GST_TIME_ARGS (total));
2970
2971   /* this is how much time there is left */
2972   if (total > elapsed)
2973     left = total - elapsed;
2974   else
2975     left = 0;
2976
2977   /* if we have less time left that the size of the buffer, we will not
2978    * be able to keep it filled, disabled buffering then */
2979   if (left < rtp_jitter_buffer_get_delay (priv->jbuf)) {
2980     GST_DEBUG_OBJECT (jitterbuffer, "left %" GST_TIME_FORMAT
2981         ", disable buffering close to EOS", GST_TIME_ARGS (left));
2982     rtp_jitter_buffer_disable_buffering (priv->jbuf, TRUE);
2983   }
2984
2985   /* this is the current time as running-time */
2986   out_time = item->dts;
2987
2988   if (elapsed > 0)
2989     estimated = gst_util_uint64_scale (out_time, total, elapsed);
2990   else {
2991     /* if there is almost nothing left,
2992      * we may never advance enough to end up in the above case */
2993     if (total < GST_SECOND)
2994       estimated = GST_SECOND;
2995     else
2996       estimated = -1;
2997   }
2998   GST_LOG_OBJECT (jitterbuffer, "elapsed %" GST_TIME_FORMAT ", estimated %"
2999       GST_TIME_FORMAT, GST_TIME_ARGS (elapsed), GST_TIME_ARGS (estimated));
3000
3001   if (estimated != -1 && priv->estimated_eos != estimated) {
3002     set_timer (jitterbuffer, TIMER_TYPE_EOS, -1, estimated);
3003     priv->estimated_eos = estimated;
3004   }
3005 }
3006
3007 /* take a buffer from the queue and push it */
3008 static GstFlowReturn
3009 pop_and_push_next (GstRtpJitterBuffer * jitterbuffer, guint seqnum)
3010 {
3011   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
3012   GstFlowReturn result = GST_FLOW_OK;
3013   RTPJitterBufferItem *item;
3014   GstBuffer *outbuf = NULL;
3015   GstEvent *outevent = NULL;
3016   GstQuery *outquery = NULL;
3017   GstClockTime dts, pts;
3018   gint percent = -1;
3019   gboolean do_push = TRUE;
3020   guint type;
3021   GstMessage *msg;
3022
3023   /* when we get here we are ready to pop and push the buffer */
3024   item = rtp_jitter_buffer_pop (priv->jbuf, &percent);
3025   type = item->type;
3026
3027   switch (type) {
3028     case ITEM_TYPE_BUFFER:
3029
3030       /* we need to make writable to change the flags and timestamps */
3031       outbuf = gst_buffer_make_writable (item->data);
3032
3033       if (G_UNLIKELY (priv->discont)) {
3034         /* set DISCONT flag when we missed a packet. We pushed the buffer writable
3035          * into the jitterbuffer so we can modify now. */
3036         GST_DEBUG_OBJECT (jitterbuffer, "mark output buffer discont");
3037         GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_DISCONT);
3038         priv->discont = FALSE;
3039       }
3040       if (G_UNLIKELY (priv->ts_discont)) {
3041         GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_RESYNC);
3042         priv->ts_discont = FALSE;
3043       }
3044
3045       dts =
3046           gst_segment_position_from_running_time (&priv->segment,
3047           GST_FORMAT_TIME, item->dts);
3048       pts =
3049           gst_segment_position_from_running_time (&priv->segment,
3050           GST_FORMAT_TIME, item->pts);
3051
3052       /* apply timestamp with offset to buffer now */
3053       GST_BUFFER_DTS (outbuf) = apply_offset (jitterbuffer, dts);
3054       GST_BUFFER_PTS (outbuf) = apply_offset (jitterbuffer, pts);
3055
3056       /* update the elapsed time when we need to check against the npt stop time. */
3057       update_estimated_eos (jitterbuffer, item);
3058
3059       priv->last_out_time = GST_BUFFER_PTS (outbuf);
3060       break;
3061     case ITEM_TYPE_LOST:
3062       priv->discont = TRUE;
3063       if (!priv->do_lost)
3064         do_push = FALSE;
3065       /* FALLTHROUGH */
3066     case ITEM_TYPE_EVENT:
3067       outevent = item->data;
3068       break;
3069     case ITEM_TYPE_QUERY:
3070       outquery = item->data;
3071       break;
3072   }
3073
3074   /* now we are ready to push the buffer. Save the seqnum and release the lock
3075    * so the other end can push stuff in the queue again. */
3076   if (seqnum != -1) {
3077     priv->last_popped_seqnum = seqnum;
3078     priv->next_seqnum = (seqnum + item->count) & 0xffff;
3079   }
3080   msg = check_buffering_percent (jitterbuffer, percent);
3081   JBUF_UNLOCK (priv);
3082
3083   item->data = NULL;
3084   free_item (item);
3085
3086   if (msg)
3087     gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer), msg);
3088
3089   switch (type) {
3090     case ITEM_TYPE_BUFFER:
3091       /* push buffer */
3092       GST_DEBUG_OBJECT (jitterbuffer,
3093           "Pushing buffer %d, dts %" GST_TIME_FORMAT ", pts %" GST_TIME_FORMAT,
3094           seqnum, GST_TIME_ARGS (GST_BUFFER_DTS (outbuf)),
3095           GST_TIME_ARGS (GST_BUFFER_PTS (outbuf)));
3096       result = gst_pad_push (priv->srcpad, outbuf);
3097
3098       JBUF_LOCK_CHECK (priv, out_flushing);
3099       break;
3100     case ITEM_TYPE_LOST:
3101     case ITEM_TYPE_EVENT:
3102       /* We got not enough consecutive packets with a huge gap, we can
3103        * as well just drop them here now on EOS */
3104       if (GST_EVENT_TYPE (outevent) == GST_EVENT_EOS) {
3105         GST_DEBUG_OBJECT (jitterbuffer, "Clearing gap packets on EOS");
3106         g_queue_foreach (&priv->gap_packets, (GFunc) gst_buffer_unref, NULL);
3107         g_queue_clear (&priv->gap_packets);
3108       }
3109
3110       GST_DEBUG_OBJECT (jitterbuffer, "%sPushing event %" GST_PTR_FORMAT
3111           ", seqnum %d", do_push ? "" : "NOT ", outevent, seqnum);
3112
3113       if (do_push)
3114         gst_pad_push_event (priv->srcpad, outevent);
3115       else
3116         gst_event_unref (outevent);
3117
3118       result = GST_FLOW_OK;
3119
3120       JBUF_LOCK_CHECK (priv, out_flushing);
3121       break;
3122     case ITEM_TYPE_QUERY:
3123     {
3124       gboolean res;
3125
3126       res = gst_pad_peer_query (priv->srcpad, outquery);
3127
3128       JBUF_LOCK_CHECK (priv, out_flushing);
3129       result = GST_FLOW_OK;
3130       GST_LOG_OBJECT (jitterbuffer, "did query %p, return %d", outquery, res);
3131       JBUF_SIGNAL_QUERY (priv, res);
3132       break;
3133     }
3134   }
3135   return result;
3136
3137   /* ERRORS */
3138 out_flushing:
3139   {
3140     return priv->srcresult;
3141   }
3142 }
3143
3144 #define GST_FLOW_WAIT GST_FLOW_CUSTOM_SUCCESS
3145
3146 /* Peek a buffer and compare the seqnum to the expected seqnum.
3147  * If all is fine, the buffer is pushed.
3148  * If something is wrong, we wait for some event
3149  */
3150 static GstFlowReturn
3151 handle_next_buffer (GstRtpJitterBuffer * jitterbuffer)
3152 {
3153   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
3154   GstFlowReturn result;
3155   RTPJitterBufferItem *item;
3156   guint seqnum;
3157   guint32 next_seqnum;
3158
3159   /* only push buffers when PLAYING and active and not buffering */
3160   if (priv->blocked || !priv->active ||
3161       rtp_jitter_buffer_is_buffering (priv->jbuf)) {
3162     return GST_FLOW_WAIT;
3163   }
3164
3165   /* peek a buffer, we're just looking at the sequence number.
3166    * If all is fine, we'll pop and push it. If the sequence number is wrong we
3167    * wait for a timeout or something to change.
3168    * The peeked buffer is valid for as long as we hold the jitterbuffer lock. */
3169   item = rtp_jitter_buffer_peek (priv->jbuf);
3170   if (item == NULL) {
3171     goto wait;
3172   }
3173
3174   /* get the seqnum and the next expected seqnum */
3175   seqnum = item->seqnum;
3176   if (seqnum == -1) {
3177     return pop_and_push_next (jitterbuffer, seqnum);
3178   }
3179
3180   next_seqnum = priv->next_seqnum;
3181
3182   /* get the gap between this and the previous packet. If we don't know the
3183    * previous packet seqnum assume no gap. */
3184   if (G_UNLIKELY (next_seqnum == -1)) {
3185     GST_DEBUG_OBJECT (jitterbuffer, "First buffer #%d", seqnum);
3186     /* we don't know what the next_seqnum should be, the chain function should
3187      * have scheduled a DEADLINE timer that will increment next_seqnum when it
3188      * fires, so wait for that */
3189     result = GST_FLOW_WAIT;
3190   } else {
3191     gint gap = gst_rtp_buffer_compare_seqnum (next_seqnum, seqnum);
3192
3193     if (G_LIKELY (gap == 0)) {
3194       /* no missing packet, pop and push */
3195       result = pop_and_push_next (jitterbuffer, seqnum);
3196     } else if (G_UNLIKELY (gap < 0)) {
3197       /* if we have a packet that we already pushed or considered dropped, pop it
3198        * off and get the next packet */
3199       GST_DEBUG_OBJECT (jitterbuffer, "Old packet #%d, next #%d dropping",
3200           seqnum, next_seqnum);
3201       item = rtp_jitter_buffer_pop (priv->jbuf, NULL);
3202       free_item (item);
3203       result = GST_FLOW_OK;
3204     } else {
3205       /* the chain function has scheduled timers to request retransmission or
3206        * when to consider the packet lost, wait for that */
3207       GST_DEBUG_OBJECT (jitterbuffer,
3208           "Sequence number GAP detected: expected %d instead of %d (%d missing)",
3209           next_seqnum, seqnum, gap);
3210       result = GST_FLOW_WAIT;
3211     }
3212   }
3213
3214   return result;
3215
3216 wait:
3217   {
3218     GST_DEBUG_OBJECT (jitterbuffer, "no buffer, going to wait");
3219     if (priv->eos) {
3220       return GST_FLOW_EOS;
3221     } else {
3222       return GST_FLOW_WAIT;
3223     }
3224   }
3225 }
3226
3227 static GstClockTime
3228 get_rtx_retry_timeout (GstRtpJitterBufferPrivate * priv)
3229 {
3230   GstClockTime rtx_retry_timeout;
3231   GstClockTime rtx_min_retry_timeout;
3232
3233   if (priv->rtx_retry_timeout == -1) {
3234     if (priv->avg_rtx_rtt == 0)
3235       rtx_retry_timeout = DEFAULT_AUTO_RTX_TIMEOUT;
3236     else
3237       /* we want to ask for a retransmission after we waited for a
3238        * complete RTT and the additional jitter */
3239       rtx_retry_timeout = priv->avg_rtx_rtt + priv->avg_jitter * 2;
3240   } else {
3241     rtx_retry_timeout = priv->rtx_retry_timeout * GST_MSECOND;
3242   }
3243   /* make sure we don't retry too often. On very low latency networks,
3244    * the RTT and jitter can be very low. */
3245   if (priv->rtx_min_retry_timeout == -1) {
3246     rtx_min_retry_timeout = priv->packet_spacing;
3247   } else {
3248     rtx_min_retry_timeout = priv->rtx_min_retry_timeout * GST_MSECOND;
3249   }
3250   rtx_retry_timeout = MAX (rtx_retry_timeout, rtx_min_retry_timeout);
3251
3252   return rtx_retry_timeout;
3253 }
3254
3255 static GstClockTime
3256 get_rtx_retry_period (GstRtpJitterBufferPrivate * priv,
3257     GstClockTime rtx_retry_timeout)
3258 {
3259   GstClockTime rtx_retry_period;
3260
3261   if (priv->rtx_retry_period == -1) {
3262     /* we retry up to the configured jitterbuffer size but leaving some
3263      * room for the retransmission to arrive in time */
3264     if (rtx_retry_timeout > priv->latency_ns) {
3265       rtx_retry_period = 0;
3266     } else {
3267       rtx_retry_period = priv->latency_ns - rtx_retry_timeout;
3268     }
3269   } else {
3270     rtx_retry_period = priv->rtx_retry_period * GST_MSECOND;
3271   }
3272   return rtx_retry_period;
3273 }
3274
3275 /* the timeout for when we expected a packet expired */
3276 static gboolean
3277 do_expected_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
3278     GstClockTime now)
3279 {
3280   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
3281   GstEvent *event;
3282   guint delay, delay_ms, avg_rtx_rtt_ms;
3283   guint rtx_retry_timeout_ms, rtx_retry_period_ms;
3284   GstClockTime rtx_retry_period;
3285   GstClockTime rtx_retry_timeout;
3286   GstClock *clock;
3287
3288   GST_DEBUG_OBJECT (jitterbuffer, "expected %d didn't arrive, now %"
3289       GST_TIME_FORMAT, timer->seqnum, GST_TIME_ARGS (now));
3290
3291   rtx_retry_timeout = get_rtx_retry_timeout (priv);
3292   rtx_retry_period = get_rtx_retry_period (priv, rtx_retry_timeout);
3293
3294   GST_DEBUG_OBJECT (jitterbuffer, "timeout %" GST_TIME_FORMAT ", period %"
3295       GST_TIME_FORMAT, GST_TIME_ARGS (rtx_retry_timeout),
3296       GST_TIME_ARGS (rtx_retry_period));
3297
3298   delay = timer->rtx_delay + timer->rtx_retry;
3299
3300   delay_ms = GST_TIME_AS_MSECONDS (delay);
3301   rtx_retry_timeout_ms = GST_TIME_AS_MSECONDS (rtx_retry_timeout);
3302   rtx_retry_period_ms = GST_TIME_AS_MSECONDS (rtx_retry_period);
3303   avg_rtx_rtt_ms = GST_TIME_AS_MSECONDS (priv->avg_rtx_rtt);
3304
3305   event = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM,
3306       gst_structure_new ("GstRTPRetransmissionRequest",
3307           "seqnum", G_TYPE_UINT, (guint) timer->seqnum,
3308           "running-time", G_TYPE_UINT64, timer->rtx_base,
3309           "delay", G_TYPE_UINT, delay_ms,
3310           "retry", G_TYPE_UINT, timer->num_rtx_retry,
3311           "frequency", G_TYPE_UINT, rtx_retry_timeout_ms,
3312           "period", G_TYPE_UINT, rtx_retry_period_ms,
3313           "deadline", G_TYPE_UINT, priv->latency_ms,
3314           "packet-spacing", G_TYPE_UINT64, priv->packet_spacing,
3315           "avg-rtt", G_TYPE_UINT, avg_rtx_rtt_ms, NULL));
3316
3317   priv->num_rtx_requests++;
3318   timer->num_rtx_retry++;
3319
3320   GST_OBJECT_LOCK (jitterbuffer);
3321   if ((clock = GST_ELEMENT_CLOCK (jitterbuffer))) {
3322     timer->rtx_last = gst_clock_get_time (clock);
3323     timer->rtx_last -= GST_ELEMENT_CAST (jitterbuffer)->base_time;
3324   } else {
3325     timer->rtx_last = now;
3326   }
3327   GST_OBJECT_UNLOCK (jitterbuffer);
3328
3329   /* calculate the timeout for the next retransmission attempt */
3330   timer->rtx_retry += rtx_retry_timeout;
3331   GST_DEBUG_OBJECT (jitterbuffer, "base %" GST_TIME_FORMAT ", delay %"
3332       GST_TIME_FORMAT ", retry %" GST_TIME_FORMAT ", num_retry %u",
3333       GST_TIME_ARGS (timer->rtx_base), GST_TIME_ARGS (timer->rtx_delay),
3334       GST_TIME_ARGS (timer->rtx_retry), timer->num_rtx_retry);
3335   if ((priv->rtx_max_retries != -1
3336           && timer->num_rtx_retry >= priv->rtx_max_retries)
3337       || (timer->rtx_retry + timer->rtx_delay > rtx_retry_period)) {
3338     GST_DEBUG_OBJECT (jitterbuffer, "reschedule as LOST timer");
3339     /* too many retransmission request, we now convert the timer
3340      * to a lost timer, leave the num_rtx_retry as it is for stats */
3341     timer->type = TIMER_TYPE_LOST;
3342     timer->rtx_delay = 0;
3343     timer->rtx_retry = 0;
3344   }
3345   reschedule_timer (jitterbuffer, timer, timer->seqnum,
3346       timer->rtx_base + timer->rtx_retry, timer->rtx_delay, FALSE);
3347
3348   JBUF_UNLOCK (priv);
3349   gst_pad_push_event (priv->sinkpad, event);
3350   JBUF_LOCK (priv);
3351
3352   return FALSE;
3353 }
3354
3355 /* a packet is lost */
3356 static gboolean
3357 do_lost_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
3358     GstClockTime now)
3359 {
3360   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
3361   GstClockTime duration, timestamp;
3362   guint seqnum, lost_packets, num_rtx_retry, next_in_seqnum;
3363   gboolean head;
3364   GstEvent *event;
3365   RTPJitterBufferItem *item;
3366
3367   seqnum = timer->seqnum;
3368   timestamp = apply_offset (jitterbuffer, timer->timeout);
3369   duration = timer->duration;
3370   if (duration == GST_CLOCK_TIME_NONE && priv->packet_spacing > 0)
3371     duration = priv->packet_spacing;
3372   lost_packets = MAX (timer->num, 1);
3373   num_rtx_retry = timer->num_rtx_retry;
3374
3375   /* we had a gap and thus we lost some packets. Create an event for this.  */
3376   if (lost_packets > 1)
3377     GST_DEBUG_OBJECT (jitterbuffer, "Packets #%d -> #%d lost", seqnum,
3378         seqnum + lost_packets - 1);
3379   else
3380     GST_DEBUG_OBJECT (jitterbuffer, "Packet #%d lost", seqnum);
3381
3382   priv->num_late += lost_packets;
3383   priv->num_rtx_failed += num_rtx_retry;
3384
3385   next_in_seqnum = (seqnum + lost_packets) & 0xffff;
3386
3387   /* we now only accept seqnum bigger than this */
3388   if (gst_rtp_buffer_compare_seqnum (priv->next_in_seqnum, next_in_seqnum) > 0)
3389     priv->next_in_seqnum = next_in_seqnum;
3390
3391   /* create paket lost event */
3392   event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM,
3393       gst_structure_new ("GstRTPPacketLost",
3394           "seqnum", G_TYPE_UINT, (guint) seqnum,
3395           "timestamp", G_TYPE_UINT64, timestamp,
3396           "duration", G_TYPE_UINT64, duration,
3397           "retry", G_TYPE_UINT, num_rtx_retry, NULL));
3398
3399   item = alloc_item (event, ITEM_TYPE_LOST, -1, -1, seqnum, lost_packets, -1);
3400   rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL);
3401
3402   /* remove timer now */
3403   remove_timer (jitterbuffer, timer);
3404   if (head)
3405     JBUF_SIGNAL_EVENT (priv);
3406
3407   return TRUE;
3408 }
3409
3410 static gboolean
3411 do_eos_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
3412     GstClockTime now)
3413 {
3414   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
3415
3416   GST_INFO_OBJECT (jitterbuffer, "got the NPT timeout");
3417   remove_timer (jitterbuffer, timer);
3418   if (!priv->eos) {
3419     /* there was no EOS in the buffer, put one in there now */
3420     queue_event (jitterbuffer, gst_event_new_eos ());
3421   }
3422   JBUF_SIGNAL_EVENT (priv);
3423
3424   return TRUE;
3425 }
3426
3427 static gboolean
3428 do_deadline_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
3429     GstClockTime now)
3430 {
3431   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
3432
3433   GST_INFO_OBJECT (jitterbuffer, "got deadline timeout");
3434
3435   /* timer seqnum might have been obsoleted by caps seqnum-base,
3436    * only mess with current ongoing seqnum if still unknown */
3437   if (priv->next_seqnum == -1)
3438     priv->next_seqnum = timer->seqnum;
3439   remove_timer (jitterbuffer, timer);
3440   JBUF_SIGNAL_EVENT (priv);
3441
3442   return TRUE;
3443 }
3444
3445 static gboolean
3446 do_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
3447     GstClockTime now)
3448 {
3449   gboolean removed = FALSE;
3450
3451   switch (timer->type) {
3452     case TIMER_TYPE_EXPECTED:
3453       removed = do_expected_timeout (jitterbuffer, timer, now);
3454       break;
3455     case TIMER_TYPE_LOST:
3456       removed = do_lost_timeout (jitterbuffer, timer, now);
3457       break;
3458     case TIMER_TYPE_DEADLINE:
3459       removed = do_deadline_timeout (jitterbuffer, timer, now);
3460       break;
3461     case TIMER_TYPE_EOS:
3462       removed = do_eos_timeout (jitterbuffer, timer, now);
3463       break;
3464   }
3465   return removed;
3466 }
3467
3468 /* called when we need to wait for the next timeout.
3469  *
3470  * We loop over the array of recorded timeouts and wait for the earliest one.
3471  * When it timed out, do the logic associated with the timer.
3472  *
3473  * If there are no timers, we wait on a gcond until something new happens.
3474  */
3475 static void
3476 wait_next_timeout (GstRtpJitterBuffer * jitterbuffer)
3477 {
3478   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
3479   GstClockTime now = 0;
3480
3481   JBUF_LOCK (priv);
3482   while (priv->timer_running) {
3483     TimerData *timer = NULL;
3484     GstClockTime timer_timeout = -1;
3485     gint i, len;
3486
3487     /* If we have a clock, update "now" now with the very
3488      * latest running time we have. If timers are unscheduled below we
3489      * otherwise wouldn't update now (it's only updated when timers
3490      * expire), and also for the very first loop iteration now would
3491      * otherwise always be 0
3492      */
3493     GST_OBJECT_LOCK (jitterbuffer);
3494     if (GST_ELEMENT_CLOCK (jitterbuffer)) {
3495       now =
3496           gst_clock_get_time (GST_ELEMENT_CLOCK (jitterbuffer)) -
3497           GST_ELEMENT_CAST (jitterbuffer)->base_time;
3498     }
3499     GST_OBJECT_UNLOCK (jitterbuffer);
3500
3501     GST_DEBUG_OBJECT (jitterbuffer, "now %" GST_TIME_FORMAT,
3502         GST_TIME_ARGS (now));
3503
3504     len = priv->timers->len;
3505     for (i = 0; i < len; i++) {
3506       TimerData *test = &g_array_index (priv->timers, TimerData, i);
3507       GstClockTime test_timeout = get_timeout (jitterbuffer, test);
3508       gboolean save_best = FALSE;
3509
3510       GST_DEBUG_OBJECT (jitterbuffer, "%d, %d, %d, %" GST_TIME_FORMAT,
3511           i, test->type, test->seqnum, GST_TIME_ARGS (test_timeout));
3512
3513       /* find the smallest timeout */
3514       if (timer == NULL) {
3515         save_best = TRUE;
3516       } else if (timer_timeout == -1) {
3517         /* we already have an immediate timeout, the new timer must be an
3518          * immediate timer with smaller seqnum to become the best */
3519         if (test_timeout == -1
3520             && (gst_rtp_buffer_compare_seqnum (test->seqnum,
3521                     timer->seqnum) > 0))
3522           save_best = TRUE;
3523       } else if (test_timeout == -1) {
3524         /* first immediate timer */
3525         save_best = TRUE;
3526       } else if (test_timeout < timer_timeout) {
3527         /* earlier timer */
3528         save_best = TRUE;
3529       } else if (test_timeout == timer_timeout
3530           && (gst_rtp_buffer_compare_seqnum (test->seqnum,
3531                   timer->seqnum) > 0)) {
3532         /* same timer, smaller seqnum */
3533         save_best = TRUE;
3534       }
3535       if (save_best) {
3536         GST_DEBUG_OBJECT (jitterbuffer, "new best %d", i);
3537         timer = test;
3538         timer_timeout = test_timeout;
3539       }
3540     }
3541     if (timer && !priv->blocked) {
3542       GstClock *clock;
3543       GstClockTime sync_time;
3544       GstClockID id;
3545       GstClockReturn ret;
3546       GstClockTimeDiff clock_jitter;
3547
3548       if (timer_timeout == -1 || timer_timeout <= now) {
3549         do_timeout (jitterbuffer, timer, now);
3550         /* check here, do_timeout could have released the lock */
3551         if (!priv->timer_running)
3552           break;
3553         continue;
3554       }
3555
3556       GST_OBJECT_LOCK (jitterbuffer);
3557       clock = GST_ELEMENT_CLOCK (jitterbuffer);
3558       if (!clock) {
3559         GST_OBJECT_UNLOCK (jitterbuffer);
3560         /* let's just push if there is no clock */
3561         GST_DEBUG_OBJECT (jitterbuffer, "No clock, timeout right away");
3562         now = timer_timeout;
3563         continue;
3564       }
3565
3566       /* prepare for sync against clock */
3567       sync_time = timer_timeout + GST_ELEMENT_CAST (jitterbuffer)->base_time;
3568       /* add latency of peer to get input time */
3569       sync_time += priv->peer_latency;
3570
3571       GST_DEBUG_OBJECT (jitterbuffer, "sync to timestamp %" GST_TIME_FORMAT
3572           " with sync time %" GST_TIME_FORMAT,
3573           GST_TIME_ARGS (timer_timeout), GST_TIME_ARGS (sync_time));
3574
3575       /* create an entry for the clock */
3576       id = priv->clock_id = gst_clock_new_single_shot_id (clock, sync_time);
3577       priv->timer_timeout = timer_timeout;
3578       priv->timer_seqnum = timer->seqnum;
3579       GST_OBJECT_UNLOCK (jitterbuffer);
3580
3581       /* release the lock so that the other end can push stuff or unlock */
3582       JBUF_UNLOCK (priv);
3583
3584       ret = gst_clock_id_wait (id, &clock_jitter);
3585
3586       JBUF_LOCK (priv);
3587       if (!priv->timer_running) {
3588         gst_clock_id_unref (id);
3589         priv->clock_id = NULL;
3590         break;
3591       }
3592
3593       if (ret != GST_CLOCK_UNSCHEDULED) {
3594         now = timer_timeout + MAX (clock_jitter, 0);
3595         GST_DEBUG_OBJECT (jitterbuffer,
3596             "sync done, %d, #%d, %" GST_STIME_FORMAT, ret, priv->timer_seqnum,
3597             GST_STIME_ARGS (clock_jitter));
3598       } else {
3599         GST_DEBUG_OBJECT (jitterbuffer, "sync unscheduled");
3600       }
3601       /* and free the entry */
3602       gst_clock_id_unref (id);
3603       priv->clock_id = NULL;
3604     } else {
3605       /* no timers, wait for activity */
3606       JBUF_WAIT_TIMER (priv);
3607     }
3608   }
3609   JBUF_UNLOCK (priv);
3610
3611   GST_DEBUG_OBJECT (jitterbuffer, "we are stopping");
3612   return;
3613 }
3614
3615 /*
3616  * This funcion implements the main pushing loop on the source pad.
3617  *
3618  * It first tries to push as many buffers as possible. If there is a seqnum
3619  * mismatch, we wait for the next timeouts.
3620  */
3621 static void
3622 gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer)
3623 {
3624   GstRtpJitterBufferPrivate *priv;
3625   GstFlowReturn result = GST_FLOW_OK;
3626
3627   priv = jitterbuffer->priv;
3628
3629   JBUF_LOCK_CHECK (priv, flushing);
3630   do {
3631     result = handle_next_buffer (jitterbuffer);
3632     if (G_LIKELY (result == GST_FLOW_WAIT)) {
3633       /* now wait for the next event */
3634       JBUF_WAIT_EVENT (priv, flushing);
3635       result = GST_FLOW_OK;
3636     }
3637   } while (result == GST_FLOW_OK);
3638   /* store result for upstream */
3639   priv->srcresult = result;
3640   /* if we get here we need to pause */
3641   goto pause;
3642
3643   /* ERRORS */
3644 flushing:
3645   {
3646     result = priv->srcresult;
3647     goto pause;
3648   }
3649 pause:
3650   {
3651     GstEvent *event;
3652
3653     JBUF_SIGNAL_QUERY (priv, FALSE);
3654     JBUF_UNLOCK (priv);
3655
3656     GST_DEBUG_OBJECT (jitterbuffer, "pausing task, reason %s",
3657         gst_flow_get_name (result));
3658     gst_pad_pause_task (priv->srcpad);
3659     if (result == GST_FLOW_EOS) {
3660       event = gst_event_new_eos ();
3661       gst_pad_push_event (priv->srcpad, event);
3662     }
3663     return;
3664   }
3665 }
3666
3667 /* collect the info from the lastest RTCP packet and the jitterbuffer sync, do
3668  * some sanity checks and then emit the handle-sync signal with the parameters.
3669  * This function must be called with the LOCK */
3670 static void
3671 do_handle_sync (GstRtpJitterBuffer * jitterbuffer)
3672 {
3673   GstRtpJitterBufferPrivate *priv;
3674   guint64 base_rtptime, base_time;
3675   guint32 clock_rate;
3676   guint64 last_rtptime;
3677   guint64 clock_base;
3678   guint64 ext_rtptime, diff;
3679   gboolean valid = TRUE, keep = FALSE;
3680
3681   priv = jitterbuffer->priv;
3682
3683   /* get the last values from the jitterbuffer */
3684   rtp_jitter_buffer_get_sync (priv->jbuf, &base_rtptime, &base_time,
3685       &clock_rate, &last_rtptime);
3686
3687   clock_base = priv->clock_base;
3688   ext_rtptime = priv->ext_rtptime;
3689
3690   GST_DEBUG_OBJECT (jitterbuffer, "ext SR %" G_GUINT64_FORMAT ", base %"
3691       G_GUINT64_FORMAT ", clock-rate %" G_GUINT32_FORMAT
3692       ", clock-base %" G_GUINT64_FORMAT ", last-rtptime %" G_GUINT64_FORMAT,
3693       ext_rtptime, base_rtptime, clock_rate, clock_base, last_rtptime);
3694
3695   if (base_rtptime == -1 || clock_rate == -1 || base_time == -1) {
3696     /* we keep this SR packet for later. When we get a valid RTP packet the
3697      * above values will be set and we can try to use the SR packet */
3698     GST_DEBUG_OBJECT (jitterbuffer, "keeping for later, no RTP values");
3699     keep = TRUE;
3700   } else {
3701     /* we can't accept anything that happened before we did the last resync */
3702     if (base_rtptime > ext_rtptime) {
3703       GST_DEBUG_OBJECT (jitterbuffer, "dropping, older than base time");
3704       valid = FALSE;
3705     } else {
3706       /* the SR RTP timestamp must be something close to what we last observed
3707        * in the jitterbuffer */
3708       if (ext_rtptime > last_rtptime) {
3709         /* check how far ahead it is to our RTP timestamps */
3710         diff = ext_rtptime - last_rtptime;
3711         /* if bigger than 1 second, we drop it */
3712         if (jitterbuffer->priv->max_rtcp_rtp_time_diff != -1 &&
3713             diff >
3714             gst_util_uint64_scale (jitterbuffer->priv->max_rtcp_rtp_time_diff,
3715                 clock_rate, 1000)) {
3716           GST_DEBUG_OBJECT (jitterbuffer, "too far ahead");
3717           /* should drop this, but some RTSP servers end up with bogus
3718            * way too ahead RTCP packet when repeated PAUSE/PLAY,
3719            * so still trigger rptbin sync but invalidate RTCP data
3720            * (sync might use other methods) */
3721           ext_rtptime = -1;
3722         }
3723         GST_DEBUG_OBJECT (jitterbuffer, "ext last %" G_GUINT64_FORMAT ", diff %"
3724             G_GUINT64_FORMAT, last_rtptime, diff);
3725       }
3726     }
3727   }
3728
3729   if (keep) {
3730     GST_DEBUG_OBJECT (jitterbuffer, "keeping RTCP packet for later");
3731   } else if (valid) {
3732     GstStructure *s;
3733
3734     s = gst_structure_new ("application/x-rtp-sync",
3735         "base-rtptime", G_TYPE_UINT64, base_rtptime,
3736         "base-time", G_TYPE_UINT64, base_time,
3737         "clock-rate", G_TYPE_UINT, clock_rate,
3738         "clock-base", G_TYPE_UINT64, clock_base,
3739         "sr-ext-rtptime", G_TYPE_UINT64, ext_rtptime,
3740         "sr-buffer", GST_TYPE_BUFFER, priv->last_sr, NULL);
3741
3742     GST_DEBUG_OBJECT (jitterbuffer, "signaling sync");
3743     gst_buffer_replace (&priv->last_sr, NULL);
3744     JBUF_UNLOCK (priv);
3745     g_signal_emit (jitterbuffer,
3746         gst_rtp_jitter_buffer_signals[SIGNAL_HANDLE_SYNC], 0, s);
3747     JBUF_LOCK (priv);
3748     gst_structure_free (s);
3749   } else {
3750     GST_DEBUG_OBJECT (jitterbuffer, "dropping RTCP packet");
3751     gst_buffer_replace (&priv->last_sr, NULL);
3752   }
3753 }
3754
3755 static GstFlowReturn
3756 gst_rtp_jitter_buffer_chain_rtcp (GstPad * pad, GstObject * parent,
3757     GstBuffer * buffer)
3758 {
3759   GstRtpJitterBuffer *jitterbuffer;
3760   GstRtpJitterBufferPrivate *priv;
3761   GstFlowReturn ret = GST_FLOW_OK;
3762   guint32 ssrc;
3763   GstRTCPPacket packet;
3764   guint64 ext_rtptime;
3765   guint32 rtptime;
3766   GstRTCPBuffer rtcp = { NULL, };
3767
3768   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
3769
3770   if (G_UNLIKELY (!gst_rtcp_buffer_validate_reduced (buffer)))
3771     goto invalid_buffer;
3772
3773   priv = jitterbuffer->priv;
3774
3775   gst_rtcp_buffer_map (buffer, GST_MAP_READ, &rtcp);
3776
3777   if (!gst_rtcp_buffer_get_first_packet (&rtcp, &packet))
3778     goto empty_buffer;
3779
3780   /* first packet must be SR or RR or else the validate would have failed */
3781   switch (gst_rtcp_packet_get_type (&packet)) {
3782     case GST_RTCP_TYPE_SR:
3783       gst_rtcp_packet_sr_get_sender_info (&packet, &ssrc, NULL, &rtptime,
3784           NULL, NULL);
3785       break;
3786     default:
3787       goto ignore_buffer;
3788   }
3789   gst_rtcp_buffer_unmap (&rtcp);
3790
3791   GST_DEBUG_OBJECT (jitterbuffer, "received RTCP of SSRC %08x", ssrc);
3792
3793   JBUF_LOCK (priv);
3794   /* convert the RTP timestamp to our extended timestamp, using the same offset
3795    * we used in the jitterbuffer */
3796   ext_rtptime = priv->jbuf->ext_rtptime;
3797   ext_rtptime = gst_rtp_buffer_ext_timestamp (&ext_rtptime, rtptime);
3798
3799   priv->ext_rtptime = ext_rtptime;
3800   gst_buffer_replace (&priv->last_sr, buffer);
3801
3802   do_handle_sync (jitterbuffer);
3803   JBUF_UNLOCK (priv);
3804
3805 done:
3806   gst_buffer_unref (buffer);
3807
3808   return ret;
3809
3810 invalid_buffer:
3811   {
3812     /* this is not fatal but should be filtered earlier */
3813     GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
3814         ("Received invalid RTCP payload, dropping"));
3815     ret = GST_FLOW_OK;
3816     goto done;
3817   }
3818 empty_buffer:
3819   {
3820     /* this is not fatal but should be filtered earlier */
3821     GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
3822         ("Received empty RTCP payload, dropping"));
3823     gst_rtcp_buffer_unmap (&rtcp);
3824     ret = GST_FLOW_OK;
3825     goto done;
3826   }
3827 ignore_buffer:
3828   {
3829     GST_DEBUG_OBJECT (jitterbuffer, "ignoring RTCP packet");
3830     gst_rtcp_buffer_unmap (&rtcp);
3831     ret = GST_FLOW_OK;
3832     goto done;
3833   }
3834 }
3835
3836 static gboolean
3837 gst_rtp_jitter_buffer_sink_query (GstPad * pad, GstObject * parent,
3838     GstQuery * query)
3839 {
3840   gboolean res = FALSE;
3841   GstRtpJitterBuffer *jitterbuffer;
3842   GstRtpJitterBufferPrivate *priv;
3843
3844   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
3845   priv = jitterbuffer->priv;
3846
3847   switch (GST_QUERY_TYPE (query)) {
3848     case GST_QUERY_CAPS:
3849     {
3850       GstCaps *filter, *caps;
3851
3852       gst_query_parse_caps (query, &filter);
3853       caps = gst_rtp_jitter_buffer_getcaps (pad, filter);
3854       gst_query_set_caps_result (query, caps);
3855       gst_caps_unref (caps);
3856       res = TRUE;
3857       break;
3858     }
3859     default:
3860       if (GST_QUERY_IS_SERIALIZED (query)) {
3861         RTPJitterBufferItem *item;
3862         gboolean head;
3863
3864         JBUF_LOCK_CHECK (priv, out_flushing);
3865         if (rtp_jitter_buffer_get_mode (priv->jbuf) !=
3866             RTP_JITTER_BUFFER_MODE_BUFFER) {
3867           GST_DEBUG_OBJECT (jitterbuffer, "adding serialized query");
3868           item = alloc_item (query, ITEM_TYPE_QUERY, -1, -1, -1, 0, -1);
3869           rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL);
3870           if (head)
3871             JBUF_SIGNAL_EVENT (priv);
3872           JBUF_WAIT_QUERY (priv, out_flushing);
3873           res = priv->last_query;
3874         } else {
3875           GST_DEBUG_OBJECT (jitterbuffer, "refusing query, we are buffering");
3876           res = FALSE;
3877         }
3878         JBUF_UNLOCK (priv);
3879       } else {
3880         res = gst_pad_query_default (pad, parent, query);
3881       }
3882       break;
3883   }
3884   return res;
3885   /* ERRORS */
3886 out_flushing:
3887   {
3888     GST_DEBUG_OBJECT (jitterbuffer, "we are flushing");
3889     JBUF_UNLOCK (priv);
3890     return FALSE;
3891   }
3892
3893 }
3894
3895 static gboolean
3896 gst_rtp_jitter_buffer_src_query (GstPad * pad, GstObject * parent,
3897     GstQuery * query)
3898 {
3899   GstRtpJitterBuffer *jitterbuffer;
3900   GstRtpJitterBufferPrivate *priv;
3901   gboolean res = FALSE;
3902
3903   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
3904   priv = jitterbuffer->priv;
3905
3906   switch (GST_QUERY_TYPE (query)) {
3907     case GST_QUERY_LATENCY:
3908     {
3909       /* We need to send the query upstream and add the returned latency to our
3910        * own */
3911       GstClockTime min_latency, max_latency;
3912       gboolean us_live;
3913       GstClockTime our_latency;
3914
3915       if ((res = gst_pad_peer_query (priv->sinkpad, query))) {
3916         gst_query_parse_latency (query, &us_live, &min_latency, &max_latency);
3917
3918         GST_DEBUG_OBJECT (jitterbuffer, "Peer latency: min %"
3919             GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
3920             GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
3921
3922         /* store this so that we can safely sync on the peer buffers. */
3923         JBUF_LOCK (priv);
3924         priv->peer_latency = min_latency;
3925         our_latency = priv->latency_ns;
3926         JBUF_UNLOCK (priv);
3927
3928         GST_DEBUG_OBJECT (jitterbuffer, "Our latency: %" GST_TIME_FORMAT,
3929             GST_TIME_ARGS (our_latency));
3930
3931         /* we add some latency but can buffer an infinite amount of time */
3932         min_latency += our_latency;
3933         max_latency = -1;
3934
3935         GST_DEBUG_OBJECT (jitterbuffer, "Calculated total latency : min %"
3936             GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
3937             GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
3938
3939         gst_query_set_latency (query, TRUE, min_latency, max_latency);
3940       }
3941       break;
3942     }
3943     case GST_QUERY_POSITION:
3944     {
3945       GstClockTime start, last_out;
3946       GstFormat fmt;
3947
3948       gst_query_parse_position (query, &fmt, NULL);
3949       if (fmt != GST_FORMAT_TIME) {
3950         res = gst_pad_query_default (pad, parent, query);
3951         break;
3952       }
3953
3954       JBUF_LOCK (priv);
3955       start = priv->npt_start;
3956       last_out = priv->last_out_time;
3957       JBUF_UNLOCK (priv);
3958
3959       GST_DEBUG_OBJECT (jitterbuffer, "npt start %" GST_TIME_FORMAT
3960           ", last out %" GST_TIME_FORMAT, GST_TIME_ARGS (start),
3961           GST_TIME_ARGS (last_out));
3962
3963       if (GST_CLOCK_TIME_IS_VALID (start) && GST_CLOCK_TIME_IS_VALID (last_out)) {
3964         /* bring 0-based outgoing time to stream time */
3965         gst_query_set_position (query, GST_FORMAT_TIME, start + last_out);
3966         res = TRUE;
3967       } else {
3968         res = gst_pad_query_default (pad, parent, query);
3969       }
3970       break;
3971     }
3972     case GST_QUERY_CAPS:
3973     {
3974       GstCaps *filter, *caps;
3975
3976       gst_query_parse_caps (query, &filter);
3977       caps = gst_rtp_jitter_buffer_getcaps (pad, filter);
3978       gst_query_set_caps_result (query, caps);
3979       gst_caps_unref (caps);
3980       res = TRUE;
3981       break;
3982     }
3983     default:
3984       res = gst_pad_query_default (pad, parent, query);
3985       break;
3986   }
3987
3988   return res;
3989 }
3990
3991 static void
3992 gst_rtp_jitter_buffer_set_property (GObject * object,
3993     guint prop_id, const GValue * value, GParamSpec * pspec)
3994 {
3995   GstRtpJitterBuffer *jitterbuffer;
3996   GstRtpJitterBufferPrivate *priv;
3997
3998   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
3999   priv = jitterbuffer->priv;
4000
4001   switch (prop_id) {
4002     case PROP_LATENCY:
4003     {
4004       guint new_latency, old_latency;
4005
4006       new_latency = g_value_get_uint (value);
4007
4008       JBUF_LOCK (priv);
4009       old_latency = priv->latency_ms;
4010       priv->latency_ms = new_latency;
4011       priv->latency_ns = priv->latency_ms * GST_MSECOND;
4012       rtp_jitter_buffer_set_delay (priv->jbuf, priv->latency_ns);
4013       JBUF_UNLOCK (priv);
4014
4015       /* post message if latency changed, this will inform the parent pipeline
4016        * that a latency reconfiguration is possible/needed. */
4017       if (new_latency != old_latency) {
4018         GST_DEBUG_OBJECT (jitterbuffer, "latency changed to: %" GST_TIME_FORMAT,
4019             GST_TIME_ARGS (new_latency * GST_MSECOND));
4020
4021         gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer),
4022             gst_message_new_latency (GST_OBJECT_CAST (jitterbuffer)));
4023       }
4024       break;
4025     }
4026     case PROP_DROP_ON_LATENCY:
4027       JBUF_LOCK (priv);
4028       priv->drop_on_latency = g_value_get_boolean (value);
4029       JBUF_UNLOCK (priv);
4030       break;
4031     case PROP_TS_OFFSET:
4032       JBUF_LOCK (priv);
4033       priv->ts_offset = g_value_get_int64 (value);
4034       priv->ts_discont = TRUE;
4035       JBUF_UNLOCK (priv);
4036       break;
4037     case PROP_DO_LOST:
4038       JBUF_LOCK (priv);
4039       priv->do_lost = g_value_get_boolean (value);
4040       JBUF_UNLOCK (priv);
4041       break;
4042     case PROP_MODE:
4043       JBUF_LOCK (priv);
4044       rtp_jitter_buffer_set_mode (priv->jbuf, g_value_get_enum (value));
4045       JBUF_UNLOCK (priv);
4046       break;
4047     case PROP_DO_RETRANSMISSION:
4048       JBUF_LOCK (priv);
4049       priv->do_retransmission = g_value_get_boolean (value);
4050       JBUF_UNLOCK (priv);
4051       break;
4052     case PROP_RTX_NEXT_SEQNUM:
4053       JBUF_LOCK (priv);
4054       priv->rtx_next_seqnum = g_value_get_boolean (value);
4055       JBUF_UNLOCK (priv);
4056       break;
4057     case PROP_RTX_DELAY:
4058       JBUF_LOCK (priv);
4059       priv->rtx_delay = g_value_get_int (value);
4060       JBUF_UNLOCK (priv);
4061       break;
4062     case PROP_RTX_MIN_DELAY:
4063       JBUF_LOCK (priv);
4064       priv->rtx_min_delay = g_value_get_uint (value);
4065       JBUF_UNLOCK (priv);
4066       break;
4067     case PROP_RTX_DELAY_REORDER:
4068       JBUF_LOCK (priv);
4069       priv->rtx_delay_reorder = g_value_get_int (value);
4070       JBUF_UNLOCK (priv);
4071       break;
4072     case PROP_RTX_RETRY_TIMEOUT:
4073       JBUF_LOCK (priv);
4074       priv->rtx_retry_timeout = g_value_get_int (value);
4075       JBUF_UNLOCK (priv);
4076       break;
4077     case PROP_RTX_MIN_RETRY_TIMEOUT:
4078       JBUF_LOCK (priv);
4079       priv->rtx_min_retry_timeout = g_value_get_int (value);
4080       JBUF_UNLOCK (priv);
4081       break;
4082     case PROP_RTX_RETRY_PERIOD:
4083       JBUF_LOCK (priv);
4084       priv->rtx_retry_period = g_value_get_int (value);
4085       JBUF_UNLOCK (priv);
4086       break;
4087     case PROP_RTX_MAX_RETRIES:
4088       JBUF_LOCK (priv);
4089       priv->rtx_max_retries = g_value_get_int (value);
4090       JBUF_UNLOCK (priv);
4091       break;
4092     case PROP_MAX_RTCP_RTP_TIME_DIFF:
4093       JBUF_LOCK (priv);
4094       priv->max_rtcp_rtp_time_diff = g_value_get_int (value);
4095       JBUF_UNLOCK (priv);
4096       break;
4097     case PROP_MAX_DROPOUT_TIME:
4098       JBUF_LOCK (priv);
4099       priv->max_dropout_time = g_value_get_uint (value);
4100       JBUF_UNLOCK (priv);
4101       break;
4102     case PROP_MAX_MISORDER_TIME:
4103       JBUF_LOCK (priv);
4104       priv->max_misorder_time = g_value_get_uint (value);
4105       JBUF_UNLOCK (priv);
4106       break;
4107     default:
4108       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
4109       break;
4110   }
4111 }
4112
4113 static void
4114 gst_rtp_jitter_buffer_get_property (GObject * object,
4115     guint prop_id, GValue * value, GParamSpec * pspec)
4116 {
4117   GstRtpJitterBuffer *jitterbuffer;
4118   GstRtpJitterBufferPrivate *priv;
4119
4120   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
4121   priv = jitterbuffer->priv;
4122
4123   switch (prop_id) {
4124     case PROP_LATENCY:
4125       JBUF_LOCK (priv);
4126       g_value_set_uint (value, priv->latency_ms);
4127       JBUF_UNLOCK (priv);
4128       break;
4129     case PROP_DROP_ON_LATENCY:
4130       JBUF_LOCK (priv);
4131       g_value_set_boolean (value, priv->drop_on_latency);
4132       JBUF_UNLOCK (priv);
4133       break;
4134     case PROP_TS_OFFSET:
4135       JBUF_LOCK (priv);
4136       g_value_set_int64 (value, priv->ts_offset);
4137       JBUF_UNLOCK (priv);
4138       break;
4139     case PROP_DO_LOST:
4140       JBUF_LOCK (priv);
4141       g_value_set_boolean (value, priv->do_lost);
4142       JBUF_UNLOCK (priv);
4143       break;
4144     case PROP_MODE:
4145       JBUF_LOCK (priv);
4146       g_value_set_enum (value, rtp_jitter_buffer_get_mode (priv->jbuf));
4147       JBUF_UNLOCK (priv);
4148       break;
4149     case PROP_PERCENT:
4150     {
4151       gint percent;
4152
4153       JBUF_LOCK (priv);
4154       if (priv->srcresult != GST_FLOW_OK)
4155         percent = 100;
4156       else
4157         percent = rtp_jitter_buffer_get_percent (priv->jbuf);
4158
4159       g_value_set_int (value, percent);
4160       JBUF_UNLOCK (priv);
4161       break;
4162     }
4163     case PROP_DO_RETRANSMISSION:
4164       JBUF_LOCK (priv);
4165       g_value_set_boolean (value, priv->do_retransmission);
4166       JBUF_UNLOCK (priv);
4167       break;
4168     case PROP_RTX_NEXT_SEQNUM:
4169       JBUF_LOCK (priv);
4170       g_value_set_boolean (value, priv->rtx_next_seqnum);
4171       JBUF_UNLOCK (priv);
4172       break;
4173     case PROP_RTX_DELAY:
4174       JBUF_LOCK (priv);
4175       g_value_set_int (value, priv->rtx_delay);
4176       JBUF_UNLOCK (priv);
4177       break;
4178     case PROP_RTX_MIN_DELAY:
4179       JBUF_LOCK (priv);
4180       g_value_set_uint (value, priv->rtx_min_delay);
4181       JBUF_UNLOCK (priv);
4182       break;
4183     case PROP_RTX_DELAY_REORDER:
4184       JBUF_LOCK (priv);
4185       g_value_set_int (value, priv->rtx_delay_reorder);
4186       JBUF_UNLOCK (priv);
4187       break;
4188     case PROP_RTX_RETRY_TIMEOUT:
4189       JBUF_LOCK (priv);
4190       g_value_set_int (value, priv->rtx_retry_timeout);
4191       JBUF_UNLOCK (priv);
4192       break;
4193     case PROP_RTX_MIN_RETRY_TIMEOUT:
4194       JBUF_LOCK (priv);
4195       g_value_set_int (value, priv->rtx_min_retry_timeout);
4196       JBUF_UNLOCK (priv);
4197       break;
4198     case PROP_RTX_RETRY_PERIOD:
4199       JBUF_LOCK (priv);
4200       g_value_set_int (value, priv->rtx_retry_period);
4201       JBUF_UNLOCK (priv);
4202       break;
4203     case PROP_RTX_MAX_RETRIES:
4204       JBUF_LOCK (priv);
4205       g_value_set_int (value, priv->rtx_max_retries);
4206       JBUF_UNLOCK (priv);
4207       break;
4208     case PROP_STATS:
4209       g_value_take_boxed (value,
4210           gst_rtp_jitter_buffer_create_stats (jitterbuffer));
4211       break;
4212     case PROP_MAX_RTCP_RTP_TIME_DIFF:
4213       JBUF_LOCK (priv);
4214       g_value_set_int (value, priv->max_rtcp_rtp_time_diff);
4215       JBUF_UNLOCK (priv);
4216       break;
4217     case PROP_MAX_DROPOUT_TIME:
4218       JBUF_LOCK (priv);
4219       g_value_set_uint (value, priv->max_dropout_time);
4220       JBUF_UNLOCK (priv);
4221       break;
4222     case PROP_MAX_MISORDER_TIME:
4223       JBUF_LOCK (priv);
4224       g_value_set_uint (value, priv->max_misorder_time);
4225       JBUF_UNLOCK (priv);
4226       break;
4227     default:
4228       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
4229       break;
4230   }
4231 }
4232
4233 static GstStructure *
4234 gst_rtp_jitter_buffer_create_stats (GstRtpJitterBuffer * jbuf)
4235 {
4236   GstStructure *s;
4237
4238   JBUF_LOCK (jbuf->priv);
4239   s = gst_structure_new ("application/x-rtp-jitterbuffer-stats",
4240       "rtx-count", G_TYPE_UINT64, jbuf->priv->num_rtx_requests,
4241       "rtx-success-count", G_TYPE_UINT64, jbuf->priv->num_rtx_success,
4242       "rtx-per-packet", G_TYPE_DOUBLE, jbuf->priv->avg_rtx_num,
4243       "rtx-rtt", G_TYPE_UINT64, jbuf->priv->avg_rtx_rtt, NULL);
4244   JBUF_UNLOCK (jbuf->priv);
4245
4246   return s;
4247 }