rtpjitterbuffer: Drop packets with sequence numbers before the seqnum-base
[platform/upstream/gst-plugins-good.git] / gst / rtpmanager / gstrtpjitterbuffer.c
1 /*
2  * Farsight Voice+Video library
3  *
4  *  Copyright 2007 Collabora Ltd,
5  *  Copyright 2007 Nokia Corporation
6  *   @author: Philippe Kalaf <philippe.kalaf@collabora.co.uk>.
7  *  Copyright 2007 Wim Taymans <wim.taymans@gmail.com>
8  *
9  * This library is free software; you can redistribute it and/or
10  * modify it under the terms of the GNU Library General Public
11  * License as published by the Free Software Foundation; either
12  * version 2 of the License, or (at your option) any later version.
13  *
14  * This library is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17  * Library General Public License for more details.
18  *
19  * You should have received a copy of the GNU Library General Public
20  * License along with this library; if not, write to the
21  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
22  * Boston, MA 02110-1301, USA.
23  *
24  */
25
26 /**
27  * SECTION:element-rtpjitterbuffer
28  *
29  * This element reorders and removes duplicate RTP packets as they are received
30  * from a network source.
31  *
32  * The element needs the clock-rate of the RTP payload in order to estimate the
33  * delay. This information is obtained either from the caps on the sink pad or,
34  * when no caps are present, from the #GstRtpJitterBuffer::request-pt-map signal.
35  * To clear the previous pt-map use the #GstRtpJitterBuffer::clear-pt-map signal.
36  *
37  * The rtpjitterbuffer will wait for missing packets up to a configurable time
38  * limit using the #GstRtpJitterBuffer:latency property. Packets arriving too
39  * late are considered to be lost packets. If the #GstRtpJitterBuffer:do-lost
40  * property is set, lost packets will result in a custom serialized downstream
41  * event of name GstRTPPacketLost. The lost packet events are usually used by a
42  * depayloader or other element to create concealment data or some other logic
43  * to gracefully handle the missing packets.
44  *
45  * The jitterbuffer will use the DTS (or PTS if no DTS is set) of the incomming
46  * buffer and the rtptime inside the RTP packet to create a PTS on the outgoing
47  * buffer.
48  *
49  * The jitterbuffer can also be configured to send early retransmission events
50  * upstream by setting the #GstRtpJitterBuffer:do-retransmission property. In
51  * this mode, the jitterbuffer tries to estimate when a packet should arrive and
52  * sends a custom upstream event named GstRTPRetransmissionRequest when the
53  * packet is considered late. The initial expected packet arrival time is
54  * calculated as follows:
55  *
56  * - If seqnum N arrived at time T, seqnum N+1 is expected to arrive at
57  *     T + packet-spacing + #GstRtpJitterBuffer:rtx-delay. The packet spacing is
58  *     calculated from the DTS (or PTS is no DTS) of two consecutive RTP
59  *     packets with different rtptime.
60  *
61  * - If seqnum N0 arrived at time T0 and seqnum Nm arrived at time Tm,
62  *     seqnum Ni is expected at time Ti = T0 + i*(Tm - T0)/(Nm - N0). Any
63  *     previously scheduled timeout is overwritten.
64  *
65  * - If seqnum N arrived, all seqnum older than
66  *     N - #GstRtpJitterBuffer:rtx-delay-reorder are considered late
67  *     immediately. This is to request fast feedback for abonormally reorder
68  *     packets before any of the previous timeouts is triggered.
69  *
70  * A late packet triggers the GstRTPRetransmissionRequest custom upstream
71  * event. After the initial timeout expires and the retransmission event is
72  * sent, the timeout is scheduled for
73  * T + #GstRtpJitterBuffer:rtx-retry-timeout. If the missing packet did not
74  * arrive after #GstRtpJitterBuffer:rtx-retry-timeout, a new
75  * GstRTPRetransmissionRequest is sent upstream and the timeout is rescheduled
76  * again for T + #GstRtpJitterBuffer:rtx-retry-timeout. This repeats until
77  * #GstRtpJitterBuffer:rtx-retry-period elapsed, at which point no further
78  * retransmission requests are sent and the regular logic is performed to
79  * schedule a lost packet as discussed above.
80  *
81  * This element acts as a live element and so adds #GstRtpJitterBuffer:latency
82  * to the pipeline.
83  *
84  * This element will automatically be used inside rtpbin.
85  *
86  * <refsect2>
87  * <title>Example pipelines</title>
88  * |[
89  * gst-launch-1.0 rtspsrc location=rtsp://192.168.1.133:8554/mpeg1or2AudioVideoTest ! rtpjitterbuffer ! rtpmpvdepay ! mpeg2dec ! xvimagesink
90  * ]| Connect to a streaming server and decode the MPEG video. The jitterbuffer is
91  * inserted into the pipeline to smooth out network jitter and to reorder the
92  * out-of-order RTP packets.
93  * </refsect2>
94  */
95
96 #ifdef HAVE_CONFIG_H
97 #include "config.h"
98 #endif
99
100 #include <stdlib.h>
101 #include <string.h>
102 #include <gst/rtp/gstrtpbuffer.h>
103
104 #include "gstrtpjitterbuffer.h"
105 #include "rtpjitterbuffer.h"
106 #include "rtpstats.h"
107
108 #include <gst/glib-compat-private.h>
109
110 GST_DEBUG_CATEGORY (rtpjitterbuffer_debug);
111 #define GST_CAT_DEFAULT (rtpjitterbuffer_debug)
112
113 /* RTPJitterBuffer signals and args */
114 enum
115 {
116   SIGNAL_REQUEST_PT_MAP,
117   SIGNAL_CLEAR_PT_MAP,
118   SIGNAL_HANDLE_SYNC,
119   SIGNAL_ON_NPT_STOP,
120   SIGNAL_SET_ACTIVE,
121   LAST_SIGNAL
122 };
123
124 #define DEFAULT_LATENCY_MS          200
125 #define DEFAULT_DROP_ON_LATENCY     FALSE
126 #define DEFAULT_TS_OFFSET           0
127 #define DEFAULT_DO_LOST             FALSE
128 #define DEFAULT_MODE                RTP_JITTER_BUFFER_MODE_SLAVE
129 #define DEFAULT_PERCENT             0
130 #define DEFAULT_DO_RETRANSMISSION   FALSE
131 #define DEFAULT_RTX_DELAY           -1
132 #define DEFAULT_RTX_MIN_DELAY       0
133 #define DEFAULT_RTX_DELAY_REORDER   3
134 #define DEFAULT_RTX_RETRY_TIMEOUT   -1
135 #define DEFAULT_RTX_MIN_RETRY_TIMEOUT   -1
136 #define DEFAULT_RTX_RETRY_PERIOD    -1
137
138 #define DEFAULT_AUTO_RTX_DELAY (20 * GST_MSECOND)
139 #define DEFAULT_AUTO_RTX_TIMEOUT (40 * GST_MSECOND)
140
141 enum
142 {
143   PROP_0,
144   PROP_LATENCY,
145   PROP_DROP_ON_LATENCY,
146   PROP_TS_OFFSET,
147   PROP_DO_LOST,
148   PROP_MODE,
149   PROP_PERCENT,
150   PROP_DO_RETRANSMISSION,
151   PROP_RTX_DELAY,
152   PROP_RTX_MIN_DELAY,
153   PROP_RTX_DELAY_REORDER,
154   PROP_RTX_RETRY_TIMEOUT,
155   PROP_RTX_MIN_RETRY_TIMEOUT,
156   PROP_RTX_RETRY_PERIOD,
157   PROP_STATS,
158   PROP_LAST
159 };
160
161 #define JBUF_LOCK(priv)   (g_mutex_lock (&(priv)->jbuf_lock))
162
163 #define JBUF_LOCK_CHECK(priv,label) G_STMT_START {    \
164   JBUF_LOCK (priv);                                   \
165   if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK))    \
166     goto label;                                       \
167 } G_STMT_END
168 #define JBUF_UNLOCK(priv) (g_mutex_unlock (&(priv)->jbuf_lock))
169
170 #define JBUF_WAIT_TIMER(priv)   G_STMT_START {            \
171   GST_DEBUG ("waiting timer");                            \
172   (priv)->waiting_timer = TRUE;                           \
173   g_cond_wait (&(priv)->jbuf_timer, &(priv)->jbuf_lock);  \
174   (priv)->waiting_timer = FALSE;                          \
175   GST_DEBUG ("waiting timer done");                       \
176 } G_STMT_END
177 #define JBUF_SIGNAL_TIMER(priv) G_STMT_START {            \
178   if (G_UNLIKELY ((priv)->waiting_timer)) {               \
179     GST_DEBUG ("signal timer");                           \
180     g_cond_signal (&(priv)->jbuf_timer);                  \
181   }                                                       \
182 } G_STMT_END
183
184 #define JBUF_WAIT_EVENT(priv,label) G_STMT_START {       \
185   GST_DEBUG ("waiting event");                           \
186   (priv)->waiting_event = TRUE;                          \
187   g_cond_wait (&(priv)->jbuf_event, &(priv)->jbuf_lock); \
188   (priv)->waiting_event = FALSE;                         \
189   GST_DEBUG ("waiting event done");                      \
190   if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK))       \
191     goto label;                                          \
192 } G_STMT_END
193 #define JBUF_SIGNAL_EVENT(priv) G_STMT_START {           \
194   if (G_UNLIKELY ((priv)->waiting_event)) {              \
195     GST_DEBUG ("signal event");                          \
196     g_cond_signal (&(priv)->jbuf_event);                 \
197   }                                                      \
198 } G_STMT_END
199
200 #define JBUF_WAIT_QUERY(priv,label) G_STMT_START {       \
201   GST_DEBUG ("waiting query");                           \
202   (priv)->waiting_query = TRUE;                          \
203   g_cond_wait (&(priv)->jbuf_query, &(priv)->jbuf_lock); \
204   (priv)->waiting_query = FALSE;                         \
205   GST_DEBUG ("waiting query done");                      \
206   if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK))       \
207     goto label;                                          \
208 } G_STMT_END
209 #define JBUF_SIGNAL_QUERY(priv,res) G_STMT_START {       \
210   (priv)->last_query = res;                              \
211   if (G_UNLIKELY ((priv)->waiting_query)) {              \
212     GST_DEBUG ("signal query");                          \
213     g_cond_signal (&(priv)->jbuf_query);                 \
214   }                                                      \
215 } G_STMT_END
216
217
218 struct _GstRtpJitterBufferPrivate
219 {
220   GstPad *sinkpad, *srcpad;
221   GstPad *rtcpsinkpad;
222
223   RTPJitterBuffer *jbuf;
224   GMutex jbuf_lock;
225   gboolean waiting_timer;
226   GCond jbuf_timer;
227   gboolean waiting_event;
228   GCond jbuf_event;
229   gboolean waiting_query;
230   GCond jbuf_query;
231   gboolean last_query;
232   gboolean discont;
233   gboolean ts_discont;
234   gboolean active;
235   guint64 out_offset;
236
237   gboolean timer_running;
238   GThread *timer_thread;
239
240   /* properties */
241   guint latency_ms;
242   guint64 latency_ns;
243   gboolean drop_on_latency;
244   gint64 ts_offset;
245   gboolean do_lost;
246   gboolean do_retransmission;
247   gint rtx_delay;
248   guint rtx_min_delay;
249   gint rtx_delay_reorder;
250   gint rtx_retry_timeout;
251   gint rtx_min_retry_timeout;
252   gint rtx_retry_period;
253
254   /* the last seqnum we pushed out */
255   guint32 last_popped_seqnum;
256   /* the next expected seqnum we push */
257   guint32 next_seqnum;
258   /* seqnum-base, if known */
259   guint32 seqnum_base;
260   /* last output time */
261   GstClockTime last_out_time;
262   /* last valid input timestamp and rtptime pair */
263   GstClockTime ips_dts;
264   guint64 ips_rtptime;
265   GstClockTime packet_spacing;
266
267   /* the next expected seqnum we receive */
268   GstClockTime last_in_dts;
269   guint32 last_in_seqnum;
270   guint32 next_in_seqnum;
271
272   GArray *timers;
273
274   /* start and stop ranges */
275   GstClockTime npt_start;
276   GstClockTime npt_stop;
277   guint64 ext_timestamp;
278   guint64 last_elapsed;
279   guint64 estimated_eos;
280   GstClockID eos_id;
281
282   /* state */
283   gboolean eos;
284   guint last_percent;
285
286   /* clock rate and rtp timestamp offset */
287   gint last_pt;
288   gint32 clock_rate;
289   gint64 clock_base;
290   gint64 prev_ts_offset;
291
292   /* when we are shutting down */
293   GstFlowReturn srcresult;
294   gboolean blocked;
295
296   /* for sync */
297   GstSegment segment;
298   GstClockID clock_id;
299   GstClockTime timer_timeout;
300   guint16 timer_seqnum;
301   /* the latency of the upstream peer, we have to take this into account when
302    * synchronizing the buffers. */
303   GstClockTime peer_latency;
304   guint64 ext_rtptime;
305   GstBuffer *last_sr;
306
307   /* some accounting */
308   guint64 num_late;
309   guint64 num_duplicates;
310   guint64 num_rtx_requests;
311   guint64 num_rtx_success;
312   guint64 num_rtx_failed;
313   gdouble avg_rtx_num;
314   guint64 avg_rtx_rtt;
315
316   /* for the jitter */
317   GstClockTime last_dts;
318   guint64 last_rtptime;
319   GstClockTime avg_jitter;
320 };
321
322 typedef enum
323 {
324   TIMER_TYPE_EXPECTED,
325   TIMER_TYPE_LOST,
326   TIMER_TYPE_DEADLINE,
327   TIMER_TYPE_EOS
328 } TimerType;
329
330 typedef struct
331 {
332   guint idx;
333   guint16 seqnum;
334   guint num;
335   TimerType type;
336   GstClockTime timeout;
337   GstClockTime duration;
338   GstClockTime rtx_base;
339   GstClockTime rtx_delay;
340   GstClockTime rtx_retry;
341   GstClockTime rtx_last;
342   guint num_rtx_retry;
343 } TimerData;
344
345 #define GST_RTP_JITTER_BUFFER_GET_PRIVATE(o) \
346   (G_TYPE_INSTANCE_GET_PRIVATE ((o), GST_TYPE_RTP_JITTER_BUFFER, \
347                                 GstRtpJitterBufferPrivate))
348
349 static GstStaticPadTemplate gst_rtp_jitter_buffer_sink_template =
350 GST_STATIC_PAD_TEMPLATE ("sink",
351     GST_PAD_SINK,
352     GST_PAD_ALWAYS,
353     GST_STATIC_CAPS ("application/x-rtp"
354         /* "clock-rate = (int) [ 1, 2147483647 ], "
355          * "payload = (int) , "
356          * "encoding-name = (string) "
357          */ )
358     );
359
360 static GstStaticPadTemplate gst_rtp_jitter_buffer_sink_rtcp_template =
361 GST_STATIC_PAD_TEMPLATE ("sink_rtcp",
362     GST_PAD_SINK,
363     GST_PAD_REQUEST,
364     GST_STATIC_CAPS ("application/x-rtcp")
365     );
366
367 static GstStaticPadTemplate gst_rtp_jitter_buffer_src_template =
368 GST_STATIC_PAD_TEMPLATE ("src",
369     GST_PAD_SRC,
370     GST_PAD_ALWAYS,
371     GST_STATIC_CAPS ("application/x-rtp"
372         /* "payload = (int) , "
373          * "clock-rate = (int) , "
374          * "encoding-name = (string) "
375          */ )
376     );
377
378 static guint gst_rtp_jitter_buffer_signals[LAST_SIGNAL] = { 0 };
379
380 #define gst_rtp_jitter_buffer_parent_class parent_class
381 G_DEFINE_TYPE (GstRtpJitterBuffer, gst_rtp_jitter_buffer, GST_TYPE_ELEMENT);
382
383 /* object overrides */
384 static void gst_rtp_jitter_buffer_set_property (GObject * object,
385     guint prop_id, const GValue * value, GParamSpec * pspec);
386 static void gst_rtp_jitter_buffer_get_property (GObject * object,
387     guint prop_id, GValue * value, GParamSpec * pspec);
388 static void gst_rtp_jitter_buffer_finalize (GObject * object);
389
390 /* element overrides */
391 static GstStateChangeReturn gst_rtp_jitter_buffer_change_state (GstElement
392     * element, GstStateChange transition);
393 static GstPad *gst_rtp_jitter_buffer_request_new_pad (GstElement * element,
394     GstPadTemplate * templ, const gchar * name, const GstCaps * filter);
395 static void gst_rtp_jitter_buffer_release_pad (GstElement * element,
396     GstPad * pad);
397 static GstClock *gst_rtp_jitter_buffer_provide_clock (GstElement * element);
398
399 /* pad overrides */
400 static GstCaps *gst_rtp_jitter_buffer_getcaps (GstPad * pad, GstCaps * filter);
401 static GstIterator *gst_rtp_jitter_buffer_iterate_internal_links (GstPad * pad,
402     GstObject * parent);
403
404 /* sinkpad overrides */
405 static gboolean gst_rtp_jitter_buffer_sink_event (GstPad * pad,
406     GstObject * parent, GstEvent * event);
407 static GstFlowReturn gst_rtp_jitter_buffer_chain (GstPad * pad,
408     GstObject * parent, GstBuffer * buffer);
409
410 static gboolean gst_rtp_jitter_buffer_sink_rtcp_event (GstPad * pad,
411     GstObject * parent, GstEvent * event);
412 static GstFlowReturn gst_rtp_jitter_buffer_chain_rtcp (GstPad * pad,
413     GstObject * parent, GstBuffer * buffer);
414
415 static gboolean gst_rtp_jitter_buffer_sink_query (GstPad * pad,
416     GstObject * parent, GstQuery * query);
417
418 /* srcpad overrides */
419 static gboolean gst_rtp_jitter_buffer_src_event (GstPad * pad,
420     GstObject * parent, GstEvent * event);
421 static gboolean gst_rtp_jitter_buffer_src_activate_mode (GstPad * pad,
422     GstObject * parent, GstPadMode mode, gboolean active);
423 static void gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer);
424 static gboolean gst_rtp_jitter_buffer_src_query (GstPad * pad,
425     GstObject * parent, GstQuery * query);
426
427 static void
428 gst_rtp_jitter_buffer_clear_pt_map (GstRtpJitterBuffer * jitterbuffer);
429 static GstClockTime
430 gst_rtp_jitter_buffer_set_active (GstRtpJitterBuffer * jitterbuffer,
431     gboolean active, guint64 base_time);
432 static void do_handle_sync (GstRtpJitterBuffer * jitterbuffer);
433
434 static void unschedule_current_timer (GstRtpJitterBuffer * jitterbuffer);
435 static void remove_all_timers (GstRtpJitterBuffer * jitterbuffer);
436
437 static void wait_next_timeout (GstRtpJitterBuffer * jitterbuffer);
438
439 static GstStructure *gst_rtp_jitter_buffer_create_stats (GstRtpJitterBuffer *
440     jitterbuffer);
441
442 static void
443 gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass)
444 {
445   GObjectClass *gobject_class;
446   GstElementClass *gstelement_class;
447
448   gobject_class = (GObjectClass *) klass;
449   gstelement_class = (GstElementClass *) klass;
450
451   g_type_class_add_private (klass, sizeof (GstRtpJitterBufferPrivate));
452
453   gobject_class->finalize = gst_rtp_jitter_buffer_finalize;
454
455   gobject_class->set_property = gst_rtp_jitter_buffer_set_property;
456   gobject_class->get_property = gst_rtp_jitter_buffer_get_property;
457
458   /**
459    * GstRtpJitterBuffer:latency:
460    *
461    * The maximum latency of the jitterbuffer. Packets will be kept in the buffer
462    * for at most this time.
463    */
464   g_object_class_install_property (gobject_class, PROP_LATENCY,
465       g_param_spec_uint ("latency", "Buffer latency in ms",
466           "Amount of ms to buffer", 0, G_MAXUINT, DEFAULT_LATENCY_MS,
467           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
468   /**
469    * GstRtpJitterBuffer:drop-on-latency:
470    *
471    * Drop oldest buffers when the queue is completely filled.
472    */
473   g_object_class_install_property (gobject_class, PROP_DROP_ON_LATENCY,
474       g_param_spec_boolean ("drop-on-latency",
475           "Drop buffers when maximum latency is reached",
476           "Tells the jitterbuffer to never exceed the given latency in size",
477           DEFAULT_DROP_ON_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
478   /**
479    * GstRtpJitterBuffer:ts-offset:
480    *
481    * Adjust GStreamer output buffer timestamps in the jitterbuffer with offset.
482    * This is mainly used to ensure interstream synchronisation.
483    */
484   g_object_class_install_property (gobject_class, PROP_TS_OFFSET,
485       g_param_spec_int64 ("ts-offset", "Timestamp Offset",
486           "Adjust buffer timestamps with offset in nanoseconds", G_MININT64,
487           G_MAXINT64, DEFAULT_TS_OFFSET,
488           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
489
490   /**
491    * GstRtpJitterBuffer:do-lost:
492    *
493    * Send out a GstRTPPacketLost event downstream when a packet is considered
494    * lost.
495    */
496   g_object_class_install_property (gobject_class, PROP_DO_LOST,
497       g_param_spec_boolean ("do-lost", "Do Lost",
498           "Send an event downstream when a packet is lost", DEFAULT_DO_LOST,
499           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
500
501   /**
502    * GstRtpJitterBuffer:mode:
503    *
504    * Control the buffering and timestamping mode used by the jitterbuffer.
505    */
506   g_object_class_install_property (gobject_class, PROP_MODE,
507       g_param_spec_enum ("mode", "Mode",
508           "Control the buffering algorithm in use", RTP_TYPE_JITTER_BUFFER_MODE,
509           DEFAULT_MODE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
510   /**
511    * GstRtpJitterBuffer:percent:
512    *
513    * The percent of the jitterbuffer that is filled.
514    */
515   g_object_class_install_property (gobject_class, PROP_PERCENT,
516       g_param_spec_int ("percent", "percent",
517           "The buffer filled percent", 0, 100,
518           0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
519   /**
520    * GstRtpJitterBuffer:do-retransmission:
521    *
522    * Send out a GstRTPRetransmission event upstream when a packet is considered
523    * late and should be retransmitted.
524    *
525    * Since: 1.2
526    */
527   g_object_class_install_property (gobject_class, PROP_DO_RETRANSMISSION,
528       g_param_spec_boolean ("do-retransmission", "Do Retransmission",
529           "Send retransmission events upstream when a packet is late",
530           DEFAULT_DO_RETRANSMISSION,
531           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
532
533   /**
534    * GstRtpJitterBuffer:rtx-delay:
535    *
536    * When a packet did not arrive at the expected time, wait this extra amount
537    * of time before sending a retransmission event.
538    *
539    * When -1 is used, the max jitter will be used as extra delay.
540    *
541    * Since: 1.2
542    */
543   g_object_class_install_property (gobject_class, PROP_RTX_DELAY,
544       g_param_spec_int ("rtx-delay", "RTX Delay",
545           "Extra time in ms to wait before sending retransmission "
546           "event (-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_DELAY,
547           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
548
549   /**
550    * GstRtpJitterBuffer:rtx-min-delay:
551    *
552    * When a packet did not arrive at the expected time, wait at least this extra amount
553    * of time before sending a retransmission event.
554    *
555    * Since: 1.6
556    */
557   g_object_class_install_property (gobject_class, PROP_RTX_MIN_DELAY,
558       g_param_spec_uint ("rtx-min-delay", "Minimum RTX Delay",
559           "Minimum time in ms to wait before sending retransmission "
560           "event", 0, G_MAXUINT, DEFAULT_RTX_MIN_DELAY,
561           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
562   /**
563    * GstRtpJitterBuffer:rtx-delay-reorder:
564    *
565    * Assume that a retransmission event should be sent when we see
566    * this much packet reordering.
567    *
568    * When -1 is used, the value will be estimated based on observed packet
569    * reordering.
570    *
571    * Since: 1.2
572    */
573   g_object_class_install_property (gobject_class, PROP_RTX_DELAY_REORDER,
574       g_param_spec_int ("rtx-delay-reorder", "RTX Delay Reorder",
575           "Sending retransmission event when this much reordering (-1 automatic)",
576           -1, G_MAXINT, DEFAULT_RTX_DELAY_REORDER,
577           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
578   /**
579    * GstRtpJitterBuffer::rtx-retry-timeout:
580    *
581    * When no packet has been received after sending a retransmission event
582    * for this time, retry sending a retransmission event.
583    *
584    * When -1 is used, the value will be estimated based on observed round
585    * trip time.
586    *
587    * Since: 1.2
588    */
589   g_object_class_install_property (gobject_class, PROP_RTX_RETRY_TIMEOUT,
590       g_param_spec_int ("rtx-retry-timeout", "RTX Retry Timeout",
591           "Retry sending a transmission event after this timeout in "
592           "ms (-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_RETRY_TIMEOUT,
593           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
594   /**
595    * GstRtpJitterBuffer::rtx-min-retry-timeout:
596    *
597    * The minimum amount of time between retry timeouts. When
598    * GstRtpJitterBuffer::rtx-retry-timeout is -1, this value ensures a
599    * minimum interval between retry timeouts.
600    *
601    * When -1 is used, the value will be estimated based on the
602    * packet spacing.
603    *
604    * Since: 1.6
605    */
606   g_object_class_install_property (gobject_class, PROP_RTX_MIN_RETRY_TIMEOUT,
607       g_param_spec_int ("rtx-min-retry-timeout", "RTX Min Retry Timeout",
608           "Minimum timeout between sending a transmission event in "
609           "ms (-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_MIN_RETRY_TIMEOUT,
610           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
611   /**
612    * GstRtpJitterBuffer:rtx-retry-period:
613    *
614    * The amount of time to try to get a retransmission.
615    *
616    * When -1 is used, the value will be estimated based on the jitterbuffer
617    * latency and the observed round trip time.
618    *
619    * Since: 1.2
620    */
621   g_object_class_install_property (gobject_class, PROP_RTX_RETRY_PERIOD,
622       g_param_spec_int ("rtx-retry-period", "RTX Retry Period",
623           "Try to get a retransmission for this many ms "
624           "(-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_RETRY_PERIOD,
625           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
626   /**
627    * GstRtpJitterBuffer:stats:
628    *
629    * Various jitterbuffer statistics. This property returns a GstStructure
630    * with name application/x-rtp-jitterbuffer-stats with the following fields:
631    *
632    *  "rtx-count"         G_TYPE_UINT64 The number of retransmissions requested
633    *  "rtx-success-count" G_TYPE_UINT64 The number of successful retransmissions
634    *  "rtx-per-packet"    G_TYPE_DOUBLE Average number of RTX per packet
635    *  "rtx-rtt"           G_TYPE_UINT64 Average round trip time per RTX
636    *
637    * Since: 1.4
638    */
639   g_object_class_install_property (gobject_class, PROP_STATS,
640       g_param_spec_boxed ("stats", "Statistics",
641           "Various statistics", GST_TYPE_STRUCTURE,
642           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
643
644   /**
645    * GstRtpJitterBuffer::request-pt-map:
646    * @buffer: the object which received the signal
647    * @pt: the pt
648    *
649    * Request the payload type as #GstCaps for @pt.
650    */
651   gst_rtp_jitter_buffer_signals[SIGNAL_REQUEST_PT_MAP] =
652       g_signal_new ("request-pt-map", G_TYPE_FROM_CLASS (klass),
653       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
654           request_pt_map), NULL, NULL, g_cclosure_marshal_generic,
655       GST_TYPE_CAPS, 1, G_TYPE_UINT);
656   /**
657    * GstRtpJitterBuffer::handle-sync:
658    * @buffer: the object which received the signal
659    * @struct: a GstStructure containing sync values.
660    *
661    * Be notified of new sync values.
662    */
663   gst_rtp_jitter_buffer_signals[SIGNAL_HANDLE_SYNC] =
664       g_signal_new ("handle-sync", G_TYPE_FROM_CLASS (klass),
665       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
666           handle_sync), NULL, NULL, g_cclosure_marshal_VOID__BOXED,
667       G_TYPE_NONE, 1, GST_TYPE_STRUCTURE | G_SIGNAL_TYPE_STATIC_SCOPE);
668
669   /**
670    * GstRtpJitterBuffer::on-npt-stop:
671    * @buffer: the object which received the signal
672    *
673    * Signal that the jitterbufer has pushed the RTP packet that corresponds to
674    * the npt-stop position.
675    */
676   gst_rtp_jitter_buffer_signals[SIGNAL_ON_NPT_STOP] =
677       g_signal_new ("on-npt-stop", G_TYPE_FROM_CLASS (klass),
678       G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
679           on_npt_stop), NULL, NULL, g_cclosure_marshal_VOID__VOID,
680       G_TYPE_NONE, 0, G_TYPE_NONE);
681
682   /**
683    * GstRtpJitterBuffer::clear-pt-map:
684    * @buffer: the object which received the signal
685    *
686    * Invalidate the clock-rate as obtained with the
687    * #GstRtpJitterBuffer::request-pt-map signal.
688    */
689   gst_rtp_jitter_buffer_signals[SIGNAL_CLEAR_PT_MAP] =
690       g_signal_new ("clear-pt-map", G_TYPE_FROM_CLASS (klass),
691       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
692       G_STRUCT_OFFSET (GstRtpJitterBufferClass, clear_pt_map), NULL, NULL,
693       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
694
695   /**
696    * GstRtpJitterBuffer::set-active:
697    * @buffer: the object which received the signal
698    *
699    * Start pushing out packets with the given base time. This signal is only
700    * useful in buffering mode.
701    *
702    * Returns: the time of the last pushed packet.
703    */
704   gst_rtp_jitter_buffer_signals[SIGNAL_SET_ACTIVE] =
705       g_signal_new ("set-active", G_TYPE_FROM_CLASS (klass),
706       G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
707       G_STRUCT_OFFSET (GstRtpJitterBufferClass, set_active), NULL, NULL,
708       g_cclosure_marshal_generic, G_TYPE_UINT64, 2, G_TYPE_BOOLEAN,
709       G_TYPE_UINT64);
710
711   gstelement_class->change_state =
712       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_change_state);
713   gstelement_class->request_new_pad =
714       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_request_new_pad);
715   gstelement_class->release_pad =
716       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_release_pad);
717   gstelement_class->provide_clock =
718       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_provide_clock);
719
720   gst_element_class_add_pad_template (gstelement_class,
721       gst_static_pad_template_get (&gst_rtp_jitter_buffer_src_template));
722   gst_element_class_add_pad_template (gstelement_class,
723       gst_static_pad_template_get (&gst_rtp_jitter_buffer_sink_template));
724   gst_element_class_add_pad_template (gstelement_class,
725       gst_static_pad_template_get (&gst_rtp_jitter_buffer_sink_rtcp_template));
726
727   gst_element_class_set_static_metadata (gstelement_class,
728       "RTP packet jitter-buffer", "Filter/Network/RTP",
729       "A buffer that deals with network jitter and other transmission faults",
730       "Philippe Kalaf <philippe.kalaf@collabora.co.uk>, "
731       "Wim Taymans <wim.taymans@gmail.com>");
732
733   klass->clear_pt_map = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_clear_pt_map);
734   klass->set_active = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_set_active);
735
736   GST_DEBUG_CATEGORY_INIT
737       (rtpjitterbuffer_debug, "rtpjitterbuffer", 0, "RTP Jitter Buffer");
738 }
739
740 static void
741 gst_rtp_jitter_buffer_init (GstRtpJitterBuffer * jitterbuffer)
742 {
743   GstRtpJitterBufferPrivate *priv;
744
745   priv = GST_RTP_JITTER_BUFFER_GET_PRIVATE (jitterbuffer);
746   jitterbuffer->priv = priv;
747
748   priv->latency_ms = DEFAULT_LATENCY_MS;
749   priv->latency_ns = priv->latency_ms * GST_MSECOND;
750   priv->drop_on_latency = DEFAULT_DROP_ON_LATENCY;
751   priv->do_lost = DEFAULT_DO_LOST;
752   priv->do_retransmission = DEFAULT_DO_RETRANSMISSION;
753   priv->rtx_delay = DEFAULT_RTX_DELAY;
754   priv->rtx_min_delay = DEFAULT_RTX_MIN_DELAY;
755   priv->rtx_delay_reorder = DEFAULT_RTX_DELAY_REORDER;
756   priv->rtx_retry_timeout = DEFAULT_RTX_RETRY_TIMEOUT;
757   priv->rtx_min_retry_timeout = DEFAULT_RTX_MIN_RETRY_TIMEOUT;
758   priv->rtx_retry_period = DEFAULT_RTX_RETRY_PERIOD;
759
760   priv->last_dts = -1;
761   priv->last_rtptime = -1;
762   priv->avg_jitter = 0;
763   priv->timers = g_array_new (FALSE, TRUE, sizeof (TimerData));
764   priv->jbuf = rtp_jitter_buffer_new ();
765   g_mutex_init (&priv->jbuf_lock);
766   g_cond_init (&priv->jbuf_timer);
767   g_cond_init (&priv->jbuf_event);
768   g_cond_init (&priv->jbuf_query);
769
770   /* reset skew detection initialy */
771   rtp_jitter_buffer_reset_skew (priv->jbuf);
772   rtp_jitter_buffer_set_delay (priv->jbuf, priv->latency_ns);
773   rtp_jitter_buffer_set_buffering (priv->jbuf, FALSE);
774   priv->active = TRUE;
775
776   priv->srcpad =
777       gst_pad_new_from_static_template (&gst_rtp_jitter_buffer_src_template,
778       "src");
779
780   gst_pad_set_activatemode_function (priv->srcpad,
781       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_activate_mode));
782   gst_pad_set_query_function (priv->srcpad,
783       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_query));
784   gst_pad_set_event_function (priv->srcpad,
785       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_event));
786
787   priv->sinkpad =
788       gst_pad_new_from_static_template (&gst_rtp_jitter_buffer_sink_template,
789       "sink");
790
791   gst_pad_set_chain_function (priv->sinkpad,
792       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_chain));
793   gst_pad_set_event_function (priv->sinkpad,
794       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_sink_event));
795   gst_pad_set_query_function (priv->sinkpad,
796       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_sink_query));
797
798   gst_element_add_pad (GST_ELEMENT (jitterbuffer), priv->srcpad);
799   gst_element_add_pad (GST_ELEMENT (jitterbuffer), priv->sinkpad);
800
801   GST_OBJECT_FLAG_SET (jitterbuffer, GST_ELEMENT_FLAG_PROVIDE_CLOCK);
802 }
803
804 #define IS_DROPABLE(it) (((it)->type == ITEM_TYPE_BUFFER) || ((it)->type == ITEM_TYPE_LOST))
805
806 #define ITEM_TYPE_BUFFER        0
807 #define ITEM_TYPE_LOST          1
808 #define ITEM_TYPE_EVENT         2
809 #define ITEM_TYPE_QUERY         3
810
811 static RTPJitterBufferItem *
812 alloc_item (gpointer data, guint type, GstClockTime dts, GstClockTime pts,
813     guint seqnum, guint count, guint rtptime)
814 {
815   RTPJitterBufferItem *item;
816
817   item = g_slice_new (RTPJitterBufferItem);
818   item->data = data;
819   item->next = NULL;
820   item->prev = NULL;
821   item->type = type;
822   item->dts = dts;
823   item->pts = pts;
824   item->seqnum = seqnum;
825   item->count = count;
826   item->rtptime = rtptime;
827
828   return item;
829 }
830
831 static void
832 free_item (RTPJitterBufferItem * item)
833 {
834   if (item->data && item->type != ITEM_TYPE_QUERY)
835     gst_mini_object_unref (item->data);
836   g_slice_free (RTPJitterBufferItem, item);
837 }
838
839 static void
840 free_item_and_retain_events (RTPJitterBufferItem * item, gpointer user_data)
841 {
842   GList **l = user_data;
843
844   if (item->data && item->type == ITEM_TYPE_EVENT
845       && GST_EVENT_IS_STICKY (item->data)) {
846     *l = g_list_prepend (*l, item->data);
847   } else if (item->data && item->type != ITEM_TYPE_QUERY) {
848     gst_mini_object_unref (item->data);
849   }
850   g_slice_free (RTPJitterBufferItem, item);
851 }
852
853 static void
854 gst_rtp_jitter_buffer_finalize (GObject * object)
855 {
856   GstRtpJitterBuffer *jitterbuffer;
857   GstRtpJitterBufferPrivate *priv;
858
859   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
860   priv = jitterbuffer->priv;
861
862   g_array_free (priv->timers, TRUE);
863   g_mutex_clear (&priv->jbuf_lock);
864   g_cond_clear (&priv->jbuf_timer);
865   g_cond_clear (&priv->jbuf_event);
866   g_cond_clear (&priv->jbuf_query);
867
868   rtp_jitter_buffer_flush (priv->jbuf, (GFunc) free_item, NULL);
869   g_object_unref (priv->jbuf);
870
871   G_OBJECT_CLASS (parent_class)->finalize (object);
872 }
873
874 static GstIterator *
875 gst_rtp_jitter_buffer_iterate_internal_links (GstPad * pad, GstObject * parent)
876 {
877   GstRtpJitterBuffer *jitterbuffer;
878   GstPad *otherpad = NULL;
879   GstIterator *it = NULL;
880   GValue val = { 0, };
881
882   jitterbuffer = GST_RTP_JITTER_BUFFER_CAST (parent);
883
884   if (pad == jitterbuffer->priv->sinkpad) {
885     otherpad = jitterbuffer->priv->srcpad;
886   } else if (pad == jitterbuffer->priv->srcpad) {
887     otherpad = jitterbuffer->priv->sinkpad;
888   } else if (pad == jitterbuffer->priv->rtcpsinkpad) {
889     it = gst_iterator_new_single (GST_TYPE_PAD, NULL);
890   }
891
892   if (it == NULL) {
893     g_value_init (&val, GST_TYPE_PAD);
894     g_value_set_object (&val, otherpad);
895     it = gst_iterator_new_single (GST_TYPE_PAD, &val);
896     g_value_unset (&val);
897   }
898
899   return it;
900 }
901
902 static GstPad *
903 create_rtcp_sink (GstRtpJitterBuffer * jitterbuffer)
904 {
905   GstRtpJitterBufferPrivate *priv;
906
907   priv = jitterbuffer->priv;
908
909   GST_DEBUG_OBJECT (jitterbuffer, "creating RTCP sink pad");
910
911   priv->rtcpsinkpad =
912       gst_pad_new_from_static_template
913       (&gst_rtp_jitter_buffer_sink_rtcp_template, "sink_rtcp");
914   gst_pad_set_chain_function (priv->rtcpsinkpad,
915       gst_rtp_jitter_buffer_chain_rtcp);
916   gst_pad_set_event_function (priv->rtcpsinkpad,
917       (GstPadEventFunction) gst_rtp_jitter_buffer_sink_rtcp_event);
918   gst_pad_set_iterate_internal_links_function (priv->rtcpsinkpad,
919       gst_rtp_jitter_buffer_iterate_internal_links);
920   gst_pad_set_active (priv->rtcpsinkpad, TRUE);
921   gst_element_add_pad (GST_ELEMENT_CAST (jitterbuffer), priv->rtcpsinkpad);
922
923   return priv->rtcpsinkpad;
924 }
925
926 static void
927 remove_rtcp_sink (GstRtpJitterBuffer * jitterbuffer)
928 {
929   GstRtpJitterBufferPrivate *priv;
930
931   priv = jitterbuffer->priv;
932
933   GST_DEBUG_OBJECT (jitterbuffer, "removing RTCP sink pad");
934
935   gst_pad_set_active (priv->rtcpsinkpad, FALSE);
936
937   gst_element_remove_pad (GST_ELEMENT_CAST (jitterbuffer), priv->rtcpsinkpad);
938   priv->rtcpsinkpad = NULL;
939 }
940
941 static GstPad *
942 gst_rtp_jitter_buffer_request_new_pad (GstElement * element,
943     GstPadTemplate * templ, const gchar * name, const GstCaps * filter)
944 {
945   GstRtpJitterBuffer *jitterbuffer;
946   GstElementClass *klass;
947   GstPad *result;
948   GstRtpJitterBufferPrivate *priv;
949
950   g_return_val_if_fail (templ != NULL, NULL);
951   g_return_val_if_fail (GST_IS_RTP_JITTER_BUFFER (element), NULL);
952
953   jitterbuffer = GST_RTP_JITTER_BUFFER_CAST (element);
954   priv = jitterbuffer->priv;
955   klass = GST_ELEMENT_GET_CLASS (element);
956
957   GST_DEBUG_OBJECT (element, "requesting pad %s", GST_STR_NULL (name));
958
959   /* figure out the template */
960   if (templ == gst_element_class_get_pad_template (klass, "sink_rtcp")) {
961     if (priv->rtcpsinkpad != NULL)
962       goto exists;
963
964     result = create_rtcp_sink (jitterbuffer);
965   } else
966     goto wrong_template;
967
968   return result;
969
970   /* ERRORS */
971 wrong_template:
972   {
973     g_warning ("rtpjitterbuffer: this is not our template");
974     return NULL;
975   }
976 exists:
977   {
978     g_warning ("rtpjitterbuffer: pad already requested");
979     return NULL;
980   }
981 }
982
983 static void
984 gst_rtp_jitter_buffer_release_pad (GstElement * element, GstPad * pad)
985 {
986   GstRtpJitterBuffer *jitterbuffer;
987   GstRtpJitterBufferPrivate *priv;
988
989   g_return_if_fail (GST_IS_RTP_JITTER_BUFFER (element));
990   g_return_if_fail (GST_IS_PAD (pad));
991
992   jitterbuffer = GST_RTP_JITTER_BUFFER_CAST (element);
993   priv = jitterbuffer->priv;
994
995   GST_DEBUG_OBJECT (element, "releasing pad %s:%s", GST_DEBUG_PAD_NAME (pad));
996
997   if (priv->rtcpsinkpad == pad) {
998     remove_rtcp_sink (jitterbuffer);
999   } else
1000     goto wrong_pad;
1001
1002   return;
1003
1004   /* ERRORS */
1005 wrong_pad:
1006   {
1007     g_warning ("gstjitterbuffer: asked to release an unknown pad");
1008     return;
1009   }
1010 }
1011
1012 static GstClock *
1013 gst_rtp_jitter_buffer_provide_clock (GstElement * element)
1014 {
1015   return gst_system_clock_obtain ();
1016 }
1017
1018 static void
1019 gst_rtp_jitter_buffer_clear_pt_map (GstRtpJitterBuffer * jitterbuffer)
1020 {
1021   GstRtpJitterBufferPrivate *priv;
1022
1023   priv = jitterbuffer->priv;
1024
1025   /* this will trigger a new pt-map request signal, FIXME, do something better. */
1026
1027   JBUF_LOCK (priv);
1028   priv->clock_rate = -1;
1029   /* do not clear current content, but refresh state for new arrival */
1030   GST_DEBUG_OBJECT (jitterbuffer, "reset jitterbuffer");
1031   rtp_jitter_buffer_reset_skew (priv->jbuf);
1032   JBUF_UNLOCK (priv);
1033 }
1034
1035 static GstClockTime
1036 gst_rtp_jitter_buffer_set_active (GstRtpJitterBuffer * jbuf, gboolean active,
1037     guint64 offset)
1038 {
1039   GstRtpJitterBufferPrivate *priv;
1040   GstClockTime last_out;
1041   RTPJitterBufferItem *item;
1042
1043   priv = jbuf->priv;
1044
1045   JBUF_LOCK (priv);
1046   GST_DEBUG_OBJECT (jbuf, "setting active %d with offset %" GST_TIME_FORMAT,
1047       active, GST_TIME_ARGS (offset));
1048
1049   if (active != priv->active) {
1050     /* add the amount of time spent in paused to the output offset. All
1051      * outgoing buffers will have this offset applied to their timestamps in
1052      * order to make them arrive in time in the sink. */
1053     priv->out_offset = offset;
1054     GST_DEBUG_OBJECT (jbuf, "out offset %" GST_TIME_FORMAT,
1055         GST_TIME_ARGS (priv->out_offset));
1056     priv->active = active;
1057     JBUF_SIGNAL_EVENT (priv);
1058   }
1059   if (!active) {
1060     rtp_jitter_buffer_set_buffering (priv->jbuf, TRUE);
1061   }
1062   if ((item = rtp_jitter_buffer_peek (priv->jbuf))) {
1063     /* head buffer timestamp and offset gives our output time */
1064     last_out = item->dts + priv->ts_offset;
1065   } else {
1066     /* use last known time when the buffer is empty */
1067     last_out = priv->last_out_time;
1068   }
1069   JBUF_UNLOCK (priv);
1070
1071   return last_out;
1072 }
1073
1074 static GstCaps *
1075 gst_rtp_jitter_buffer_getcaps (GstPad * pad, GstCaps * filter)
1076 {
1077   GstRtpJitterBuffer *jitterbuffer;
1078   GstRtpJitterBufferPrivate *priv;
1079   GstPad *other;
1080   GstCaps *caps;
1081   GstCaps *templ;
1082
1083   jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
1084   priv = jitterbuffer->priv;
1085
1086   other = (pad == priv->srcpad ? priv->sinkpad : priv->srcpad);
1087
1088   caps = gst_pad_peer_query_caps (other, filter);
1089
1090   templ = gst_pad_get_pad_template_caps (pad);
1091   if (caps == NULL) {
1092     GST_DEBUG_OBJECT (jitterbuffer, "use template");
1093     caps = templ;
1094   } else {
1095     GstCaps *intersect;
1096
1097     GST_DEBUG_OBJECT (jitterbuffer, "intersect with template");
1098
1099     intersect = gst_caps_intersect (caps, templ);
1100     gst_caps_unref (caps);
1101     gst_caps_unref (templ);
1102
1103     caps = intersect;
1104   }
1105   gst_object_unref (jitterbuffer);
1106
1107   return caps;
1108 }
1109
1110 /*
1111  * Must be called with JBUF_LOCK held
1112  */
1113
1114 static gboolean
1115 gst_jitter_buffer_sink_parse_caps (GstRtpJitterBuffer * jitterbuffer,
1116     GstCaps * caps)
1117 {
1118   GstRtpJitterBufferPrivate *priv;
1119   GstStructure *caps_struct;
1120   guint val;
1121   GstClockTime tval;
1122
1123   priv = jitterbuffer->priv;
1124
1125   /* first parse the caps */
1126   caps_struct = gst_caps_get_structure (caps, 0);
1127
1128   GST_DEBUG_OBJECT (jitterbuffer, "got caps");
1129
1130   /* we need a clock-rate to convert the rtp timestamps to GStreamer time and to
1131    * measure the amount of data in the buffer */
1132   if (!gst_structure_get_int (caps_struct, "clock-rate", &priv->clock_rate))
1133     goto error;
1134
1135   if (priv->clock_rate <= 0)
1136     goto wrong_rate;
1137
1138   GST_DEBUG_OBJECT (jitterbuffer, "got clock-rate %d", priv->clock_rate);
1139
1140   rtp_jitter_buffer_set_clock_rate (priv->jbuf, priv->clock_rate);
1141
1142   /* The clock base is the RTP timestamp corrsponding to the npt-start value. We
1143    * can use this to track the amount of time elapsed on the sender. */
1144   if (gst_structure_get_uint (caps_struct, "clock-base", &val))
1145     priv->clock_base = val;
1146   else
1147     priv->clock_base = -1;
1148
1149   priv->ext_timestamp = priv->clock_base;
1150
1151   GST_DEBUG_OBJECT (jitterbuffer, "got clock-base %" G_GINT64_FORMAT,
1152       priv->clock_base);
1153
1154   if (gst_structure_get_uint (caps_struct, "seqnum-base", &val)) {
1155     /* first expected seqnum, only update when we didn't have a previous base. */
1156     if (priv->next_in_seqnum == -1)
1157       priv->next_in_seqnum = val;
1158     if (priv->next_seqnum == -1) {
1159       priv->next_seqnum = val;
1160       JBUF_SIGNAL_EVENT (priv);
1161     }
1162     priv->seqnum_base = val;
1163   } else {
1164     priv->seqnum_base = -1;
1165   }
1166
1167   GST_DEBUG_OBJECT (jitterbuffer, "got seqnum-base %d", priv->next_in_seqnum);
1168
1169   /* the start and stop times. The seqnum-base corresponds to the start time. We
1170    * will keep track of the seqnums on the output and when we reach the one
1171    * corresponding to npt-stop, we emit the npt-stop-reached signal */
1172   if (gst_structure_get_clock_time (caps_struct, "npt-start", &tval))
1173     priv->npt_start = tval;
1174   else
1175     priv->npt_start = 0;
1176
1177   if (gst_structure_get_clock_time (caps_struct, "npt-stop", &tval))
1178     priv->npt_stop = tval;
1179   else
1180     priv->npt_stop = -1;
1181
1182   GST_DEBUG_OBJECT (jitterbuffer,
1183       "npt start/stop: %" GST_TIME_FORMAT "-%" GST_TIME_FORMAT,
1184       GST_TIME_ARGS (priv->npt_start), GST_TIME_ARGS (priv->npt_stop));
1185
1186   return TRUE;
1187
1188   /* ERRORS */
1189 error:
1190   {
1191     GST_DEBUG_OBJECT (jitterbuffer, "No clock-rate in caps!");
1192     return FALSE;
1193   }
1194 wrong_rate:
1195   {
1196     GST_DEBUG_OBJECT (jitterbuffer, "Invalid clock-rate %d", priv->clock_rate);
1197     return FALSE;
1198   }
1199 }
1200
1201 static void
1202 gst_rtp_jitter_buffer_flush_start (GstRtpJitterBuffer * jitterbuffer)
1203 {
1204   GstRtpJitterBufferPrivate *priv;
1205
1206   priv = jitterbuffer->priv;
1207
1208   JBUF_LOCK (priv);
1209   /* mark ourselves as flushing */
1210   priv->srcresult = GST_FLOW_FLUSHING;
1211   GST_DEBUG_OBJECT (jitterbuffer, "Disabling pop on queue");
1212   /* this unblocks any waiting pops on the src pad task */
1213   JBUF_SIGNAL_EVENT (priv);
1214   JBUF_SIGNAL_QUERY (priv, FALSE);
1215   JBUF_UNLOCK (priv);
1216 }
1217
1218 static void
1219 gst_rtp_jitter_buffer_flush_stop (GstRtpJitterBuffer * jitterbuffer)
1220 {
1221   GstRtpJitterBufferPrivate *priv;
1222
1223   priv = jitterbuffer->priv;
1224
1225   JBUF_LOCK (priv);
1226   GST_DEBUG_OBJECT (jitterbuffer, "Enabling pop on queue");
1227   /* Mark as non flushing */
1228   priv->srcresult = GST_FLOW_OK;
1229   gst_segment_init (&priv->segment, GST_FORMAT_TIME);
1230   priv->last_popped_seqnum = -1;
1231   priv->last_out_time = -1;
1232   priv->next_seqnum = -1;
1233   priv->seqnum_base = -1;
1234   priv->ips_rtptime = -1;
1235   priv->ips_dts = GST_CLOCK_TIME_NONE;
1236   priv->packet_spacing = 0;
1237   priv->next_in_seqnum = -1;
1238   priv->clock_rate = -1;
1239   priv->last_pt = -1;
1240   priv->eos = FALSE;
1241   priv->estimated_eos = -1;
1242   priv->last_elapsed = 0;
1243   priv->ext_timestamp = -1;
1244   priv->avg_jitter = 0;
1245   priv->last_dts = -1;
1246   priv->last_rtptime = -1;
1247   GST_DEBUG_OBJECT (jitterbuffer, "flush and reset jitterbuffer");
1248   rtp_jitter_buffer_flush (priv->jbuf, (GFunc) free_item, NULL);
1249   rtp_jitter_buffer_disable_buffering (priv->jbuf, FALSE);
1250   rtp_jitter_buffer_reset_skew (priv->jbuf);
1251   remove_all_timers (jitterbuffer);
1252   JBUF_UNLOCK (priv);
1253 }
1254
1255 static gboolean
1256 gst_rtp_jitter_buffer_src_activate_mode (GstPad * pad, GstObject * parent,
1257     GstPadMode mode, gboolean active)
1258 {
1259   gboolean result;
1260   GstRtpJitterBuffer *jitterbuffer = NULL;
1261
1262   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1263
1264   switch (mode) {
1265     case GST_PAD_MODE_PUSH:
1266       if (active) {
1267         /* allow data processing */
1268         gst_rtp_jitter_buffer_flush_stop (jitterbuffer);
1269
1270         /* start pushing out buffers */
1271         GST_DEBUG_OBJECT (jitterbuffer, "Starting task on srcpad");
1272         result = gst_pad_start_task (jitterbuffer->priv->srcpad,
1273             (GstTaskFunction) gst_rtp_jitter_buffer_loop, jitterbuffer, NULL);
1274       } else {
1275         /* make sure all data processing stops ASAP */
1276         gst_rtp_jitter_buffer_flush_start (jitterbuffer);
1277
1278         /* NOTE this will hardlock if the state change is called from the src pad
1279          * task thread because we will _join() the thread. */
1280         GST_DEBUG_OBJECT (jitterbuffer, "Stopping task on srcpad");
1281         result = gst_pad_stop_task (pad);
1282       }
1283       break;
1284     default:
1285       result = FALSE;
1286       break;
1287   }
1288   return result;
1289 }
1290
1291 static GstStateChangeReturn
1292 gst_rtp_jitter_buffer_change_state (GstElement * element,
1293     GstStateChange transition)
1294 {
1295   GstRtpJitterBuffer *jitterbuffer;
1296   GstRtpJitterBufferPrivate *priv;
1297   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
1298
1299   jitterbuffer = GST_RTP_JITTER_BUFFER (element);
1300   priv = jitterbuffer->priv;
1301
1302   switch (transition) {
1303     case GST_STATE_CHANGE_NULL_TO_READY:
1304       break;
1305     case GST_STATE_CHANGE_READY_TO_PAUSED:
1306       JBUF_LOCK (priv);
1307       /* reset negotiated values */
1308       priv->clock_rate = -1;
1309       priv->clock_base = -1;
1310       priv->peer_latency = 0;
1311       priv->last_pt = -1;
1312       /* block until we go to PLAYING */
1313       priv->blocked = TRUE;
1314       priv->timer_running = TRUE;
1315       priv->timer_thread =
1316           g_thread_new ("timer", (GThreadFunc) wait_next_timeout, jitterbuffer);
1317       JBUF_UNLOCK (priv);
1318       break;
1319     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
1320       JBUF_LOCK (priv);
1321       /* unblock to allow streaming in PLAYING */
1322       priv->blocked = FALSE;
1323       JBUF_SIGNAL_EVENT (priv);
1324       JBUF_SIGNAL_TIMER (priv);
1325       JBUF_UNLOCK (priv);
1326       break;
1327     default:
1328       break;
1329   }
1330
1331   ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1332
1333   switch (transition) {
1334     case GST_STATE_CHANGE_READY_TO_PAUSED:
1335       /* we are a live element because we sync to the clock, which we can only
1336        * do in the PLAYING state */
1337       if (ret != GST_STATE_CHANGE_FAILURE)
1338         ret = GST_STATE_CHANGE_NO_PREROLL;
1339       break;
1340     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
1341       JBUF_LOCK (priv);
1342       /* block to stop streaming when PAUSED */
1343       priv->blocked = TRUE;
1344       unschedule_current_timer (jitterbuffer);
1345       JBUF_UNLOCK (priv);
1346       if (ret != GST_STATE_CHANGE_FAILURE)
1347         ret = GST_STATE_CHANGE_NO_PREROLL;
1348       break;
1349     case GST_STATE_CHANGE_PAUSED_TO_READY:
1350       JBUF_LOCK (priv);
1351       gst_buffer_replace (&priv->last_sr, NULL);
1352       priv->timer_running = FALSE;
1353       unschedule_current_timer (jitterbuffer);
1354       JBUF_SIGNAL_TIMER (priv);
1355       JBUF_SIGNAL_QUERY (priv, FALSE);
1356       JBUF_UNLOCK (priv);
1357       g_thread_join (priv->timer_thread);
1358       priv->timer_thread = NULL;
1359       break;
1360     case GST_STATE_CHANGE_READY_TO_NULL:
1361       break;
1362     default:
1363       break;
1364   }
1365
1366   return ret;
1367 }
1368
1369 static gboolean
1370 gst_rtp_jitter_buffer_src_event (GstPad * pad, GstObject * parent,
1371     GstEvent * event)
1372 {
1373   gboolean ret = TRUE;
1374   GstRtpJitterBuffer *jitterbuffer;
1375   GstRtpJitterBufferPrivate *priv;
1376
1377   jitterbuffer = GST_RTP_JITTER_BUFFER_CAST (parent);
1378   priv = jitterbuffer->priv;
1379
1380   GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
1381
1382   switch (GST_EVENT_TYPE (event)) {
1383     case GST_EVENT_LATENCY:
1384     {
1385       GstClockTime latency;
1386
1387       gst_event_parse_latency (event, &latency);
1388
1389       GST_DEBUG_OBJECT (jitterbuffer,
1390           "configuring latency of %" GST_TIME_FORMAT, GST_TIME_ARGS (latency));
1391
1392       JBUF_LOCK (priv);
1393       /* adjust the overall buffer delay to the total pipeline latency in
1394        * buffering mode because if downstream consumes too fast (because of
1395        * large latency or queues, we would start rebuffering again. */
1396       if (rtp_jitter_buffer_get_mode (priv->jbuf) ==
1397           RTP_JITTER_BUFFER_MODE_BUFFER) {
1398         rtp_jitter_buffer_set_delay (priv->jbuf, latency);
1399       }
1400       JBUF_UNLOCK (priv);
1401
1402       ret = gst_pad_push_event (priv->sinkpad, event);
1403       break;
1404     }
1405     default:
1406       ret = gst_pad_push_event (priv->sinkpad, event);
1407       break;
1408   }
1409
1410   return ret;
1411 }
1412
1413 /* handles and stores the event in the jitterbuffer, must be called with
1414  * LOCK */
1415 static gboolean
1416 queue_event (GstRtpJitterBuffer * jitterbuffer, GstEvent * event)
1417 {
1418   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1419   RTPJitterBufferItem *item;
1420   gboolean head;
1421
1422   switch (GST_EVENT_TYPE (event)) {
1423     case GST_EVENT_CAPS:
1424     {
1425       GstCaps *caps;
1426
1427       gst_event_parse_caps (event, &caps);
1428       gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps);
1429       break;
1430     }
1431     case GST_EVENT_SEGMENT:
1432       gst_event_copy_segment (event, &priv->segment);
1433
1434       /* we need time for now */
1435       if (priv->segment.format != GST_FORMAT_TIME)
1436         goto newseg_wrong_format;
1437
1438       GST_DEBUG_OBJECT (jitterbuffer,
1439           "segment:  %" GST_SEGMENT_FORMAT, &priv->segment);
1440       break;
1441     case GST_EVENT_EOS:
1442       priv->eos = TRUE;
1443       rtp_jitter_buffer_disable_buffering (priv->jbuf, TRUE);
1444       break;
1445     default:
1446       break;
1447   }
1448
1449
1450   GST_DEBUG_OBJECT (jitterbuffer, "adding event");
1451   item = alloc_item (event, ITEM_TYPE_EVENT, -1, -1, -1, 0, -1);
1452   rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL);
1453   if (head)
1454     JBUF_SIGNAL_EVENT (priv);
1455
1456   return TRUE;
1457
1458   /* ERRORS */
1459 newseg_wrong_format:
1460   {
1461     GST_DEBUG_OBJECT (jitterbuffer, "received non TIME newsegment");
1462     gst_event_unref (event);
1463     return FALSE;
1464   }
1465 }
1466
1467 static gboolean
1468 gst_rtp_jitter_buffer_sink_event (GstPad * pad, GstObject * parent,
1469     GstEvent * event)
1470 {
1471   gboolean ret = TRUE;
1472   GstRtpJitterBuffer *jitterbuffer;
1473   GstRtpJitterBufferPrivate *priv;
1474
1475   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1476   priv = jitterbuffer->priv;
1477
1478   GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
1479
1480   switch (GST_EVENT_TYPE (event)) {
1481     case GST_EVENT_FLUSH_START:
1482       ret = gst_pad_push_event (priv->srcpad, event);
1483       gst_rtp_jitter_buffer_flush_start (jitterbuffer);
1484       /* wait for the loop to go into PAUSED */
1485       gst_pad_pause_task (priv->srcpad);
1486       break;
1487     case GST_EVENT_FLUSH_STOP:
1488       ret = gst_pad_push_event (priv->srcpad, event);
1489       ret =
1490           gst_rtp_jitter_buffer_src_activate_mode (priv->srcpad, parent,
1491           GST_PAD_MODE_PUSH, TRUE);
1492       break;
1493     default:
1494       if (GST_EVENT_IS_SERIALIZED (event)) {
1495         /* serialized events go in the queue */
1496         JBUF_LOCK (priv);
1497         if (priv->srcresult != GST_FLOW_OK) {
1498           /* Errors in sticky event pushing are no problem and ignored here
1499            * as they will cause more meaningful errors during data flow.
1500            * For EOS events, that are not followed by data flow, we still
1501            * return FALSE here though.
1502            */
1503           if (!GST_EVENT_IS_STICKY (event) ||
1504               GST_EVENT_TYPE (event) == GST_EVENT_EOS)
1505             goto out_flow_error;
1506         }
1507         /* refuse more events on EOS */
1508         if (priv->eos)
1509           goto out_eos;
1510         ret = queue_event (jitterbuffer, event);
1511         JBUF_UNLOCK (priv);
1512       } else {
1513         /* non-serialized events are forwarded downstream immediately */
1514         ret = gst_pad_push_event (priv->srcpad, event);
1515       }
1516       break;
1517   }
1518   return ret;
1519
1520   /* ERRORS */
1521 out_flow_error:
1522   {
1523     GST_DEBUG_OBJECT (jitterbuffer,
1524         "refusing event, we have a downstream flow error: %s",
1525         gst_flow_get_name (priv->srcresult));
1526     JBUF_UNLOCK (priv);
1527     gst_event_unref (event);
1528     return FALSE;
1529   }
1530 out_eos:
1531   {
1532     GST_DEBUG_OBJECT (jitterbuffer, "refusing event, we are EOS");
1533     JBUF_UNLOCK (priv);
1534     gst_event_unref (event);
1535     return FALSE;
1536   }
1537 }
1538
1539 static gboolean
1540 gst_rtp_jitter_buffer_sink_rtcp_event (GstPad * pad, GstObject * parent,
1541     GstEvent * event)
1542 {
1543   gboolean ret = TRUE;
1544   GstRtpJitterBuffer *jitterbuffer;
1545
1546   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1547
1548   GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
1549
1550   switch (GST_EVENT_TYPE (event)) {
1551     case GST_EVENT_FLUSH_START:
1552       gst_event_unref (event);
1553       break;
1554     case GST_EVENT_FLUSH_STOP:
1555       gst_event_unref (event);
1556       break;
1557     default:
1558       ret = gst_pad_event_default (pad, parent, event);
1559       break;
1560   }
1561
1562   return ret;
1563 }
1564
1565 /*
1566  * Must be called with JBUF_LOCK held, will release the LOCK when emiting the
1567  * signal. The function returns GST_FLOW_ERROR when a parsing error happened and
1568  * GST_FLOW_FLUSHING when the element is shutting down. On success
1569  * GST_FLOW_OK is returned.
1570  */
1571 static GstFlowReturn
1572 gst_rtp_jitter_buffer_get_clock_rate (GstRtpJitterBuffer * jitterbuffer,
1573     guint8 pt)
1574 {
1575   GValue ret = { 0 };
1576   GValue args[2] = { {0}, {0} };
1577   GstCaps *caps;
1578   gboolean res;
1579
1580   g_value_init (&args[0], GST_TYPE_ELEMENT);
1581   g_value_set_object (&args[0], jitterbuffer);
1582   g_value_init (&args[1], G_TYPE_UINT);
1583   g_value_set_uint (&args[1], pt);
1584
1585   g_value_init (&ret, GST_TYPE_CAPS);
1586   g_value_set_boxed (&ret, NULL);
1587
1588   JBUF_UNLOCK (jitterbuffer->priv);
1589   g_signal_emitv (args, gst_rtp_jitter_buffer_signals[SIGNAL_REQUEST_PT_MAP], 0,
1590       &ret);
1591   JBUF_LOCK_CHECK (jitterbuffer->priv, out_flushing);
1592
1593   g_value_unset (&args[0]);
1594   g_value_unset (&args[1]);
1595   caps = (GstCaps *) g_value_dup_boxed (&ret);
1596   g_value_unset (&ret);
1597   if (!caps)
1598     goto no_caps;
1599
1600   res = gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps);
1601   gst_caps_unref (caps);
1602
1603   if (G_UNLIKELY (!res))
1604     goto parse_failed;
1605
1606   return GST_FLOW_OK;
1607
1608   /* ERRORS */
1609 no_caps:
1610   {
1611     GST_DEBUG_OBJECT (jitterbuffer, "could not get caps");
1612     return GST_FLOW_ERROR;
1613   }
1614 out_flushing:
1615   {
1616     GST_DEBUG_OBJECT (jitterbuffer, "we are flushing");
1617     return GST_FLOW_FLUSHING;
1618   }
1619 parse_failed:
1620   {
1621     GST_DEBUG_OBJECT (jitterbuffer, "parse failed");
1622     return GST_FLOW_ERROR;
1623   }
1624 }
1625
1626 /* call with jbuf lock held */
1627 static GstMessage *
1628 check_buffering_percent (GstRtpJitterBuffer * jitterbuffer, gint percent)
1629 {
1630   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1631   GstMessage *message = NULL;
1632
1633   if (percent == -1)
1634     return NULL;
1635
1636   /* Post a buffering message */
1637   if (priv->last_percent != percent) {
1638     priv->last_percent = percent;
1639     message =
1640         gst_message_new_buffering (GST_OBJECT_CAST (jitterbuffer), percent);
1641     gst_message_set_buffering_stats (message, GST_BUFFERING_LIVE, -1, -1, -1);
1642   }
1643
1644   return message;
1645 }
1646
1647 static GstClockTime
1648 apply_offset (GstRtpJitterBuffer * jitterbuffer, GstClockTime timestamp)
1649 {
1650   GstRtpJitterBufferPrivate *priv;
1651
1652   priv = jitterbuffer->priv;
1653
1654   if (timestamp == -1)
1655     return -1;
1656
1657   /* apply the timestamp offset, this is used for inter stream sync */
1658   timestamp += priv->ts_offset;
1659   /* add the offset, this is used when buffering */
1660   timestamp += priv->out_offset;
1661
1662   return timestamp;
1663 }
1664
1665 static TimerData *
1666 find_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type, guint16 seqnum)
1667 {
1668   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1669   TimerData *timer = NULL;
1670   gint i, len;
1671
1672   len = priv->timers->len;
1673   for (i = 0; i < len; i++) {
1674     TimerData *test = &g_array_index (priv->timers, TimerData, i);
1675     if (test->seqnum == seqnum && test->type == type) {
1676       timer = test;
1677       break;
1678     }
1679   }
1680   return timer;
1681 }
1682
1683 static void
1684 unschedule_current_timer (GstRtpJitterBuffer * jitterbuffer)
1685 {
1686   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1687
1688   if (priv->clock_id) {
1689     GST_DEBUG_OBJECT (jitterbuffer, "unschedule current timer");
1690     gst_clock_id_unschedule (priv->clock_id);
1691     priv->clock_id = NULL;
1692   }
1693 }
1694
1695 static GstClockTime
1696 get_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
1697 {
1698   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1699   GstClockTime test_timeout;
1700
1701   if ((test_timeout = timer->timeout) == -1)
1702     return -1;
1703
1704   if (timer->type != TIMER_TYPE_EXPECTED) {
1705     /* add our latency and offset to get output times. */
1706     test_timeout = apply_offset (jitterbuffer, test_timeout);
1707     test_timeout += priv->latency_ns;
1708   }
1709   return test_timeout;
1710 }
1711
1712 static void
1713 recalculate_timer (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
1714 {
1715   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1716
1717   if (priv->clock_id) {
1718     GstClockTime timeout = get_timeout (jitterbuffer, timer);
1719
1720     GST_DEBUG ("%" GST_TIME_FORMAT " <> %" GST_TIME_FORMAT,
1721         GST_TIME_ARGS (timeout), GST_TIME_ARGS (priv->timer_timeout));
1722
1723     if (timeout == -1 || timeout < priv->timer_timeout)
1724       unschedule_current_timer (jitterbuffer);
1725   }
1726 }
1727
1728 static TimerData *
1729 add_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type,
1730     guint16 seqnum, guint num, GstClockTime timeout, GstClockTime delay,
1731     GstClockTime duration)
1732 {
1733   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1734   TimerData *timer;
1735   gint len;
1736
1737   GST_DEBUG_OBJECT (jitterbuffer,
1738       "add timer %d for seqnum %d to %" GST_TIME_FORMAT ", delay %"
1739       GST_TIME_FORMAT, type, seqnum, GST_TIME_ARGS (timeout),
1740       GST_TIME_ARGS (delay));
1741
1742   len = priv->timers->len;
1743   g_array_set_size (priv->timers, len + 1);
1744   timer = &g_array_index (priv->timers, TimerData, len);
1745   timer->idx = len;
1746   timer->type = type;
1747   timer->seqnum = seqnum;
1748   timer->num = num;
1749   timer->timeout = timeout + delay;
1750   timer->duration = duration;
1751   if (type == TIMER_TYPE_EXPECTED) {
1752     timer->rtx_base = timeout;
1753     timer->rtx_delay = delay;
1754     timer->rtx_retry = 0;
1755   }
1756   timer->num_rtx_retry = 0;
1757   recalculate_timer (jitterbuffer, timer);
1758   JBUF_SIGNAL_TIMER (priv);
1759
1760   return timer;
1761 }
1762
1763 static void
1764 reschedule_timer (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
1765     guint16 seqnum, GstClockTime timeout, GstClockTime delay, gboolean reset)
1766 {
1767   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1768   gboolean seqchange, timechange;
1769   guint16 oldseq;
1770
1771   seqchange = timer->seqnum != seqnum;
1772   timechange = timer->timeout != timeout;
1773
1774   if (!seqchange && !timechange)
1775     return;
1776
1777   oldseq = timer->seqnum;
1778
1779   GST_DEBUG_OBJECT (jitterbuffer,
1780       "replace timer for seqnum %d->%d to %" GST_TIME_FORMAT,
1781       oldseq, seqnum, GST_TIME_ARGS (timeout + delay));
1782
1783   timer->timeout = timeout + delay;
1784   timer->seqnum = seqnum;
1785   if (reset) {
1786     timer->rtx_base = timeout;
1787     timer->rtx_delay = delay;
1788     timer->rtx_retry = 0;
1789   }
1790   if (seqchange)
1791     timer->num_rtx_retry = 0;
1792
1793   if (priv->clock_id) {
1794     /* we changed the seqnum and there is a timer currently waiting with this
1795      * seqnum, unschedule it */
1796     if (seqchange && priv->timer_seqnum == oldseq)
1797       unschedule_current_timer (jitterbuffer);
1798     /* we changed the time, check if it is earlier than what we are waiting
1799      * for and unschedule if so */
1800     else if (timechange)
1801       recalculate_timer (jitterbuffer, timer);
1802   }
1803 }
1804
1805 static TimerData *
1806 set_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type,
1807     guint16 seqnum, GstClockTime timeout)
1808 {
1809   TimerData *timer;
1810
1811   /* find the seqnum timer */
1812   timer = find_timer (jitterbuffer, type, seqnum);
1813   if (timer == NULL) {
1814     timer = add_timer (jitterbuffer, type, seqnum, 0, timeout, 0, -1);
1815   } else {
1816     reschedule_timer (jitterbuffer, timer, seqnum, timeout, 0, FALSE);
1817   }
1818   return timer;
1819 }
1820
1821 static void
1822 remove_timer (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
1823 {
1824   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1825   guint idx;
1826
1827   if (priv->clock_id && priv->timer_seqnum == timer->seqnum)
1828     unschedule_current_timer (jitterbuffer);
1829
1830   idx = timer->idx;
1831   GST_DEBUG_OBJECT (jitterbuffer, "removed index %d", idx);
1832   g_array_remove_index_fast (priv->timers, idx);
1833   timer->idx = idx;
1834 }
1835
1836 static void
1837 remove_all_timers (GstRtpJitterBuffer * jitterbuffer)
1838 {
1839   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1840   GST_DEBUG_OBJECT (jitterbuffer, "removed all timers");
1841   g_array_set_size (priv->timers, 0);
1842   unschedule_current_timer (jitterbuffer);
1843 }
1844
1845 /* get the extra delay to wait before sending RTX */
1846 static GstClockTime
1847 get_rtx_delay (GstRtpJitterBufferPrivate * priv)
1848 {
1849   GstClockTime delay;
1850
1851   if (priv->rtx_delay == -1) {
1852     if (priv->avg_jitter == 0)
1853       delay = DEFAULT_AUTO_RTX_DELAY;
1854     else
1855       /* jitter is in nanoseconds, 2x jitter is a good margin */
1856       delay = priv->avg_jitter * 2;
1857   } else {
1858     delay = priv->rtx_delay * GST_MSECOND;
1859   }
1860   if (priv->rtx_min_delay > 0)
1861     delay = MAX (delay, priv->rtx_min_delay * GST_MSECOND);
1862
1863   return delay;
1864 }
1865
1866 /* we just received a packet with seqnum and dts.
1867  *
1868  * First check for old seqnum that we are still expecting. If the gap with the
1869  * current seqnum is too big, unschedule the timeouts.
1870  *
1871  * If we have a valid packet spacing estimate we can set a timer for when we
1872  * should receive the next packet.
1873  * If we don't have a valid estimate, we remove any timer we might have
1874  * had for this packet.
1875  */
1876 static void
1877 update_timers (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum,
1878     GstClockTime dts, gboolean do_next_seqnum)
1879 {
1880   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1881   TimerData *timer = NULL;
1882   gint i, len;
1883
1884   /* go through all timers and unschedule the ones with a large gap, also find
1885    * the timer for the seqnum */
1886   len = priv->timers->len;
1887   for (i = 0; i < len; i++) {
1888     TimerData *test = &g_array_index (priv->timers, TimerData, i);
1889     gint gap;
1890
1891     gap = gst_rtp_buffer_compare_seqnum (test->seqnum, seqnum);
1892
1893     GST_DEBUG_OBJECT (jitterbuffer, "%d, %d, #%d<->#%d gap %d", i,
1894         test->type, test->seqnum, seqnum, gap);
1895
1896     if (gap == 0) {
1897       GST_DEBUG ("found timer for current seqnum");
1898       /* the timer for the current seqnum */
1899       timer = test;
1900       /* when no retransmission, we can stop now, we only need to find the
1901        * timer for the current seqnum */
1902       if (!priv->do_retransmission)
1903         break;
1904     } else if (gap > priv->rtx_delay_reorder) {
1905       /* max gap, we exceeded the max reorder distance and we don't expect the
1906        * missing packet to be this reordered */
1907       if (test->num_rtx_retry == 0 && test->type == TIMER_TYPE_EXPECTED)
1908         reschedule_timer (jitterbuffer, test, test->seqnum, -1, 0, FALSE);
1909     }
1910   }
1911
1912   do_next_seqnum = do_next_seqnum && priv->packet_spacing > 0
1913       && priv->do_retransmission;
1914
1915   if (timer && timer->type != TIMER_TYPE_DEADLINE) {
1916     if (timer->num_rtx_retry > 0) {
1917       GstClockTime rtx_last, delay;
1918
1919       /* we scheduled a retry for this packet and now we have it */
1920       priv->num_rtx_success++;
1921       /* all the previous retry attempts failed */
1922       priv->num_rtx_failed += timer->num_rtx_retry - 1;
1923       /* number of retries before receiving the packet */
1924       if (priv->avg_rtx_num == 0.0)
1925         priv->avg_rtx_num = timer->num_rtx_retry;
1926       else
1927         priv->avg_rtx_num = (timer->num_rtx_retry + 7 * priv->avg_rtx_num) / 8;
1928       /* calculate the delay between retransmission request and receiving this
1929        * packet, start with when we scheduled this timeout last */
1930       rtx_last = timer->rtx_last;
1931       if (dts != GST_CLOCK_TIME_NONE && dts > rtx_last) {
1932         /* we have a valid delay if this packet arrived after we scheduled the
1933          * request */
1934         delay = dts - rtx_last;
1935         if (priv->avg_rtx_rtt == 0)
1936           priv->avg_rtx_rtt = delay;
1937         else
1938           priv->avg_rtx_rtt = (delay + 7 * priv->avg_rtx_rtt) / 8;
1939       } else
1940         delay = 0;
1941
1942       GST_LOG_OBJECT (jitterbuffer,
1943           "RTX success %" G_GUINT64_FORMAT ", failed %" G_GUINT64_FORMAT
1944           ", requests %" G_GUINT64_FORMAT ", dups %" G_GUINT64_FORMAT
1945           ", avg-num %g, delay %" GST_TIME_FORMAT ", avg-rtt %" GST_TIME_FORMAT,
1946           priv->num_rtx_success, priv->num_rtx_failed, priv->num_rtx_requests,
1947           priv->num_duplicates, priv->avg_rtx_num, GST_TIME_ARGS (delay),
1948           GST_TIME_ARGS (priv->avg_rtx_rtt));
1949
1950       /* don't try to estimate the next seqnum because this is a retransmitted
1951        * packet and it probably did not arrive with the expected packet
1952        * spacing. */
1953       do_next_seqnum = FALSE;
1954     }
1955   }
1956
1957   if (do_next_seqnum) {
1958     GstClockTime expected, delay;
1959
1960     /* calculate expected arrival time of the next seqnum */
1961     expected = dts + priv->packet_spacing;
1962
1963     delay = get_rtx_delay (priv);
1964
1965     /* and update/install timer for next seqnum */
1966     if (timer)
1967       reschedule_timer (jitterbuffer, timer, priv->next_in_seqnum, expected,
1968           delay, TRUE);
1969     else
1970       add_timer (jitterbuffer, TIMER_TYPE_EXPECTED, priv->next_in_seqnum, 0,
1971           expected, delay, priv->packet_spacing);
1972   } else if (timer && timer->type != TIMER_TYPE_DEADLINE) {
1973     /* if we had a timer, remove it, we don't know when to expect the next
1974      * packet. */
1975     remove_timer (jitterbuffer, timer);
1976   }
1977 }
1978
1979 static void
1980 calculate_packet_spacing (GstRtpJitterBuffer * jitterbuffer, guint32 rtptime,
1981     GstClockTime dts)
1982 {
1983   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1984
1985   /* we need consecutive seqnums with a different
1986    * rtptime to estimate the packet spacing. */
1987   if (priv->ips_rtptime != rtptime) {
1988     /* rtptime changed, check dts diff */
1989     if (priv->ips_dts != -1 && dts != -1 && dts > priv->ips_dts) {
1990       priv->packet_spacing = dts - priv->ips_dts;
1991       GST_DEBUG_OBJECT (jitterbuffer,
1992           "new packet spacing %" GST_TIME_FORMAT,
1993           GST_TIME_ARGS (priv->packet_spacing));
1994     }
1995     priv->ips_rtptime = rtptime;
1996     priv->ips_dts = dts;
1997   }
1998 }
1999
2000 static void
2001 calculate_expected (GstRtpJitterBuffer * jitterbuffer, guint32 expected,
2002     guint16 seqnum, GstClockTime dts, gint gap)
2003 {
2004   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2005   GstClockTime total_duration, duration, expected_dts;
2006   TimerType type;
2007
2008   GST_DEBUG_OBJECT (jitterbuffer,
2009       "dts %" GST_TIME_FORMAT ", last %" GST_TIME_FORMAT,
2010       GST_TIME_ARGS (dts), GST_TIME_ARGS (priv->last_in_dts));
2011
2012   /* the total duration spanned by the missing packets */
2013   if (dts >= priv->last_in_dts)
2014     total_duration = dts - priv->last_in_dts;
2015   else
2016     total_duration = 0;
2017
2018   /* interpolate between the current time and the last time based on
2019    * number of packets we are missing, this is the estimated duration
2020    * for the missing packet based on equidistant packet spacing. */
2021   duration = total_duration / (gap + 1);
2022
2023   GST_DEBUG_OBJECT (jitterbuffer, "duration %" GST_TIME_FORMAT,
2024       GST_TIME_ARGS (duration));
2025
2026   if (total_duration > priv->latency_ns) {
2027     GstClockTime gap_time;
2028     guint lost_packets;
2029
2030     gap_time = total_duration - priv->latency_ns;
2031
2032     if (duration > 0) {
2033       lost_packets = gap_time / duration;
2034       gap_time = lost_packets * duration;
2035     } else {
2036       lost_packets = gap;
2037     }
2038
2039     /* too many lost packets, some of the missing packets are already
2040      * too late and we can generate lost packet events for them. */
2041     GST_DEBUG_OBJECT (jitterbuffer, "too many lost packets %" GST_TIME_FORMAT
2042         " > %" GST_TIME_FORMAT ", consider %u lost",
2043         GST_TIME_ARGS (total_duration), GST_TIME_ARGS (priv->latency_ns),
2044         lost_packets);
2045
2046     /* this timer will fire immediately and the lost event will be pushed from
2047      * the timer thread */
2048     add_timer (jitterbuffer, TIMER_TYPE_LOST, expected, lost_packets,
2049         priv->last_in_dts + duration, 0, gap_time);
2050
2051     expected += lost_packets;
2052     priv->last_in_dts += gap_time;
2053   }
2054
2055   expected_dts = priv->last_in_dts + duration;
2056
2057   if (priv->do_retransmission) {
2058     TimerData *timer;
2059
2060     type = TIMER_TYPE_EXPECTED;
2061     /* if we had a timer for the first missing packet, update it. */
2062     if ((timer = find_timer (jitterbuffer, type, expected))) {
2063       GstClockTime timeout = timer->timeout;
2064
2065       timer->duration = duration;
2066       if (timeout > expected_dts) {
2067         GstClockTime delay = timeout - expected_dts - timer->rtx_retry;
2068         reschedule_timer (jitterbuffer, timer, timer->seqnum, expected_dts,
2069             delay, TRUE);
2070       }
2071       expected++;
2072       expected_dts += duration;
2073     }
2074   } else {
2075     type = TIMER_TYPE_LOST;
2076   }
2077
2078   while (gst_rtp_buffer_compare_seqnum (expected, seqnum) > 0) {
2079     add_timer (jitterbuffer, type, expected, 0, expected_dts, 0, duration);
2080     expected_dts += duration;
2081     expected++;
2082   }
2083 }
2084
2085 static void
2086 calculate_jitter (GstRtpJitterBuffer * jitterbuffer, GstClockTime dts,
2087     guint rtptime)
2088 {
2089   gint32 rtpdiff;
2090   GstClockTimeDiff dtsdiff, rtpdiffns, diff;
2091   GstRtpJitterBufferPrivate *priv;
2092
2093   priv = jitterbuffer->priv;
2094
2095   if (G_UNLIKELY (dts == GST_CLOCK_TIME_NONE) || priv->clock_rate <= 0)
2096     goto no_time;
2097
2098   if (priv->last_dts != -1)
2099     dtsdiff = dts - priv->last_dts;
2100   else
2101     dtsdiff = 0;
2102
2103   if (priv->last_rtptime != -1)
2104     rtpdiff = rtptime - (guint32) priv->last_rtptime;
2105   else
2106     rtpdiff = 0;
2107
2108   priv->last_dts = dts;
2109   priv->last_rtptime = rtptime;
2110
2111   if (rtpdiff > 0)
2112     rtpdiffns =
2113         gst_util_uint64_scale_int (rtpdiff, GST_SECOND, priv->clock_rate);
2114   else
2115     rtpdiffns =
2116         -gst_util_uint64_scale_int (-rtpdiff, GST_SECOND, priv->clock_rate);
2117
2118   diff = ABS (dtsdiff - rtpdiffns);
2119
2120   /* jitter is stored in nanoseconds */
2121   priv->avg_jitter = (diff + (15 * priv->avg_jitter)) >> 4;
2122
2123   GST_LOG_OBJECT (jitterbuffer,
2124       "dtsdiff %" GST_TIME_FORMAT " rtptime %" GST_TIME_FORMAT
2125       ", clock-rate %d, diff %" GST_TIME_FORMAT ", jitter: %" GST_TIME_FORMAT,
2126       GST_TIME_ARGS (dtsdiff), GST_TIME_ARGS (rtpdiffns), priv->clock_rate,
2127       GST_TIME_ARGS (diff), GST_TIME_ARGS (priv->avg_jitter));
2128
2129   return;
2130
2131   /* ERRORS */
2132 no_time:
2133   {
2134     GST_DEBUG_OBJECT (jitterbuffer,
2135         "no dts or no clock-rate, can't calculate jitter");
2136     return;
2137   }
2138 }
2139
2140 static GstFlowReturn
2141 gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent,
2142     GstBuffer * buffer)
2143 {
2144   GstRtpJitterBuffer *jitterbuffer;
2145   GstRtpJitterBufferPrivate *priv;
2146   guint16 seqnum;
2147   guint32 expected, rtptime;
2148   GstFlowReturn ret = GST_FLOW_OK;
2149   GstClockTime dts, pts;
2150   guint64 latency_ts;
2151   gboolean head;
2152   gint percent = -1;
2153   guint8 pt;
2154   GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
2155   gboolean do_next_seqnum = FALSE;
2156   RTPJitterBufferItem *item;
2157   GstMessage *msg = NULL;
2158
2159   jitterbuffer = GST_RTP_JITTER_BUFFER_CAST (parent);
2160
2161   priv = jitterbuffer->priv;
2162
2163   if (G_UNLIKELY (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp)))
2164     goto invalid_buffer;
2165
2166   pt = gst_rtp_buffer_get_payload_type (&rtp);
2167   seqnum = gst_rtp_buffer_get_seq (&rtp);
2168   rtptime = gst_rtp_buffer_get_timestamp (&rtp);
2169   gst_rtp_buffer_unmap (&rtp);
2170
2171   /* make sure we have PTS and DTS set */
2172   pts = GST_BUFFER_PTS (buffer);
2173   dts = GST_BUFFER_DTS (buffer);
2174   if (dts == -1)
2175     dts = pts;
2176   else if (pts == -1)
2177     pts = dts;
2178
2179   /* take the DTS of the buffer. This is the time when the packet was
2180    * received and is used to calculate jitter and clock skew. We will adjust
2181    * this DTS with the smoothed value after processing it in the
2182    * jitterbuffer and assign it as the PTS. */
2183   /* bring to running time */
2184   dts = gst_segment_to_running_time (&priv->segment, GST_FORMAT_TIME, dts);
2185
2186   GST_DEBUG_OBJECT (jitterbuffer,
2187       "Received packet #%d at time %" GST_TIME_FORMAT ", discont %d", seqnum,
2188       GST_TIME_ARGS (dts), GST_BUFFER_IS_DISCONT (buffer));
2189
2190   JBUF_LOCK_CHECK (priv, out_flushing);
2191
2192   if (G_UNLIKELY (priv->last_pt != pt)) {
2193     GstCaps *caps;
2194
2195     GST_DEBUG_OBJECT (jitterbuffer, "pt changed from %u to %u", priv->last_pt,
2196         pt);
2197
2198     priv->last_pt = pt;
2199     /* reset clock-rate so that we get a new one */
2200     priv->clock_rate = -1;
2201
2202     /* Try to get the clock-rate from the caps first if we can. If there are no
2203      * caps we must fire the signal to get the clock-rate. */
2204     if ((caps = gst_pad_get_current_caps (pad))) {
2205       gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps);
2206       gst_caps_unref (caps);
2207     }
2208   }
2209
2210   if (G_UNLIKELY (priv->clock_rate == -1)) {
2211     /* no clock rate given on the caps, try to get one with the signal */
2212     if (gst_rtp_jitter_buffer_get_clock_rate (jitterbuffer,
2213             pt) == GST_FLOW_FLUSHING)
2214       goto out_flushing;
2215
2216     if (G_UNLIKELY (priv->clock_rate == -1))
2217       goto no_clock_rate;
2218   }
2219
2220   /* don't accept more data on EOS */
2221   if (G_UNLIKELY (priv->eos))
2222     goto have_eos;
2223
2224   calculate_jitter (jitterbuffer, dts, rtptime);
2225
2226   if (priv->seqnum_base != -1) {
2227     gint gap;
2228
2229     gap = gst_rtp_buffer_compare_seqnum (priv->seqnum_base, seqnum);
2230
2231     if (gap < 0) {
2232       GST_DEBUG_OBJECT (jitterbuffer,
2233           "packet seqnum #%d before seqnum-base #%d", seqnum,
2234           priv->seqnum_base);
2235       gst_buffer_unref (buffer);
2236       ret = GST_FLOW_OK;
2237       goto finished;
2238     } else if (gap > 16384) {
2239       /* From now on don't compare against the seqnum base anymore as
2240        * at some point in the future we will wrap around and also that
2241        * much reordering is very unlikely */
2242       priv->seqnum_base = -1;
2243     }
2244   }
2245
2246   expected = priv->next_in_seqnum;
2247
2248   /* now check against our expected seqnum */
2249   if (G_LIKELY (expected != -1)) {
2250     gint gap;
2251
2252     /* now calculate gap */
2253     gap = gst_rtp_buffer_compare_seqnum (expected, seqnum);
2254
2255     GST_DEBUG_OBJECT (jitterbuffer, "expected #%d, got #%d, gap of %d",
2256         expected, seqnum, gap);
2257
2258     if (G_LIKELY (gap == 0)) {
2259       /* packet is expected */
2260       calculate_packet_spacing (jitterbuffer, rtptime, dts);
2261       do_next_seqnum = TRUE;
2262     } else {
2263       gboolean reset = FALSE;
2264
2265       if (!GST_CLOCK_TIME_IS_VALID (dts)) {
2266         /* We would run into calculations with GST_CLOCK_TIME_NONE below
2267          * and can't compensate for anything without DTS on RTP packets
2268          */
2269         goto gap_but_no_dts;
2270       } else if (gap < 0) {
2271         /* we received an old packet */
2272         if (G_UNLIKELY (gap < -RTP_MAX_MISORDER)) {
2273           /* too old packet, reset */
2274           GST_DEBUG_OBJECT (jitterbuffer, "reset: buffer too old %d < %d", gap,
2275               -RTP_MAX_MISORDER);
2276           reset = TRUE;
2277         } else {
2278           GST_DEBUG_OBJECT (jitterbuffer, "old packet received");
2279         }
2280       } else {
2281         /* new packet, we are missing some packets */
2282         if (G_UNLIKELY (gap > RTP_MAX_DROPOUT)) {
2283           /* packet too far in future, reset */
2284           GST_DEBUG_OBJECT (jitterbuffer, "reset: buffer too new %d > %d", gap,
2285               RTP_MAX_DROPOUT);
2286           reset = TRUE;
2287         } else {
2288           GST_DEBUG_OBJECT (jitterbuffer, "%d missing packets", gap);
2289           /* fill in the gap with EXPECTED timers */
2290           calculate_expected (jitterbuffer, expected, seqnum, dts, gap);
2291
2292           do_next_seqnum = TRUE;
2293         }
2294       }
2295       if (G_UNLIKELY (reset)) {
2296         GList *events = NULL, *l;
2297
2298         GST_DEBUG_OBJECT (jitterbuffer, "flush and reset jitterbuffer");
2299         rtp_jitter_buffer_flush (priv->jbuf,
2300             (GFunc) free_item_and_retain_events, &events);
2301         rtp_jitter_buffer_reset_skew (priv->jbuf);
2302         remove_all_timers (jitterbuffer);
2303         priv->last_popped_seqnum = -1;
2304         priv->next_seqnum = seqnum;
2305         do_next_seqnum = TRUE;
2306
2307         /* Insert all sticky events again in order, otherwise we would
2308          * potentially loose STREAM_START, CAPS or SEGMENT events
2309          */
2310         events = g_list_reverse (events);
2311         for (l = events; l; l = l->next) {
2312           RTPJitterBufferItem *item;
2313
2314           item = alloc_item (l->data, ITEM_TYPE_EVENT, -1, -1, -1, 0, -1);
2315           rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL);
2316         }
2317         g_list_free (events);
2318
2319         JBUF_SIGNAL_EVENT (priv);
2320       }
2321       /* reset spacing estimation when gap */
2322       priv->ips_rtptime = -1;
2323       priv->ips_dts = GST_CLOCK_TIME_NONE;
2324     }
2325   } else {
2326     GST_DEBUG_OBJECT (jitterbuffer, "First buffer #%d", seqnum);
2327     /* we don't know what the next_in_seqnum should be, wait for the last
2328      * possible moment to push this buffer, maybe we get an earlier seqnum
2329      * while we wait */
2330     set_timer (jitterbuffer, TIMER_TYPE_DEADLINE, seqnum, dts);
2331     do_next_seqnum = TRUE;
2332     /* take rtptime and dts to calculate packet spacing */
2333     priv->ips_rtptime = rtptime;
2334     priv->ips_dts = dts;
2335   }
2336   if (do_next_seqnum) {
2337     priv->last_in_seqnum = seqnum;
2338     priv->last_in_dts = dts;
2339     priv->next_in_seqnum = (seqnum + 1) & 0xffff;
2340   }
2341
2342   /* let's check if this buffer is too late, we can only accept packets with
2343    * bigger seqnum than the one we last pushed. */
2344   if (G_LIKELY (priv->last_popped_seqnum != -1)) {
2345     gint gap;
2346
2347     gap = gst_rtp_buffer_compare_seqnum (priv->last_popped_seqnum, seqnum);
2348
2349     /* priv->last_popped_seqnum >= seqnum, we're too late. */
2350     if (G_UNLIKELY (gap <= 0))
2351       goto too_late;
2352   }
2353
2354   /* let's drop oldest packet if the queue is already full and drop-on-latency
2355    * is set. We can only do this when there actually is a latency. When no
2356    * latency is set, we just pump it in the queue and let the other end push it
2357    * out as fast as possible. */
2358   if (priv->latency_ms && priv->drop_on_latency) {
2359     latency_ts =
2360         gst_util_uint64_scale_int (priv->latency_ms, priv->clock_rate, 1000);
2361
2362     if (G_UNLIKELY (rtp_jitter_buffer_get_ts_diff (priv->jbuf) >= latency_ts)) {
2363       RTPJitterBufferItem *old_item;
2364
2365       old_item = rtp_jitter_buffer_peek (priv->jbuf);
2366
2367       if (IS_DROPABLE (old_item)) {
2368         old_item = rtp_jitter_buffer_pop (priv->jbuf, &percent);
2369         GST_DEBUG_OBJECT (jitterbuffer, "Queue full, dropping old packet %p",
2370             old_item);
2371         priv->next_seqnum = (old_item->seqnum + 1) & 0xffff;
2372         free_item (old_item);
2373       }
2374       /* we might have removed some head buffers, signal the pushing thread to
2375        * see if it can push now */
2376       JBUF_SIGNAL_EVENT (priv);
2377     }
2378   }
2379
2380   item = alloc_item (buffer, ITEM_TYPE_BUFFER, dts, pts, seqnum, 1, rtptime);
2381
2382   /* now insert the packet into the queue in sorted order. This function returns
2383    * FALSE if a packet with the same seqnum was already in the queue, meaning we
2384    * have a duplicate. */
2385   if (G_UNLIKELY (!rtp_jitter_buffer_insert (priv->jbuf, item,
2386               &head, &percent)))
2387     goto duplicate;
2388
2389   /* update timers */
2390   update_timers (jitterbuffer, seqnum, dts, do_next_seqnum);
2391
2392   /* we had an unhandled SR, handle it now */
2393   if (priv->last_sr)
2394     do_handle_sync (jitterbuffer);
2395
2396   if (G_UNLIKELY (head)) {
2397     /* signal addition of new buffer when the _loop is waiting. */
2398     if (G_LIKELY (priv->active))
2399       JBUF_SIGNAL_EVENT (priv);
2400
2401     /* let's unschedule and unblock any waiting buffers. We only want to do this
2402      * when the head buffer changed */
2403     if (G_UNLIKELY (priv->clock_id)) {
2404       GST_DEBUG_OBJECT (jitterbuffer, "Unscheduling waiting new buffer");
2405       unschedule_current_timer (jitterbuffer);
2406     }
2407   }
2408
2409   GST_DEBUG_OBJECT (jitterbuffer,
2410       "Pushed packet #%d, now %d packets, head: %d, " "percent %d", seqnum,
2411       rtp_jitter_buffer_num_packets (priv->jbuf), head, percent);
2412
2413   msg = check_buffering_percent (jitterbuffer, percent);
2414
2415 finished:
2416   JBUF_UNLOCK (priv);
2417
2418   if (msg)
2419     gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer), msg);
2420
2421   return ret;
2422
2423   /* ERRORS */
2424 invalid_buffer:
2425   {
2426     /* this is not fatal but should be filtered earlier */
2427     GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
2428         ("Received invalid RTP payload, dropping"));
2429     gst_buffer_unref (buffer);
2430     return GST_FLOW_OK;
2431   }
2432 no_clock_rate:
2433   {
2434     GST_WARNING_OBJECT (jitterbuffer,
2435         "No clock-rate in caps!, dropping buffer");
2436     gst_buffer_unref (buffer);
2437     goto finished;
2438   }
2439 out_flushing:
2440   {
2441     ret = priv->srcresult;
2442     GST_DEBUG_OBJECT (jitterbuffer, "flushing %s", gst_flow_get_name (ret));
2443     gst_buffer_unref (buffer);
2444     goto finished;
2445   }
2446 have_eos:
2447   {
2448     ret = GST_FLOW_EOS;
2449     GST_WARNING_OBJECT (jitterbuffer, "we are EOS, refusing buffer");
2450     gst_buffer_unref (buffer);
2451     goto finished;
2452   }
2453 too_late:
2454   {
2455     GST_WARNING_OBJECT (jitterbuffer, "Packet #%d too late as #%d was already"
2456         " popped, dropping", seqnum, priv->last_popped_seqnum);
2457     priv->num_late++;
2458     gst_buffer_unref (buffer);
2459     goto finished;
2460   }
2461 duplicate:
2462   {
2463     GST_WARNING_OBJECT (jitterbuffer, "Duplicate packet #%d detected, dropping",
2464         seqnum);
2465     priv->num_duplicates++;
2466     free_item (item);
2467     goto finished;
2468   }
2469 gap_but_no_dts:
2470   {
2471     /* this is fatal as we can't compensate for gaps without DTS */
2472     GST_ELEMENT_ERROR (jitterbuffer, STREAM, DECODE, (NULL),
2473         ("Received packet without DTS after a gap"));
2474     gst_buffer_unref (buffer);
2475     ret = GST_FLOW_ERROR;
2476     goto finished;
2477   }
2478 }
2479
2480 static GstClockTime
2481 compute_elapsed (GstRtpJitterBuffer * jitterbuffer, RTPJitterBufferItem * item)
2482 {
2483   guint64 ext_time, elapsed;
2484   guint32 rtp_time;
2485   GstRtpJitterBufferPrivate *priv;
2486
2487   priv = jitterbuffer->priv;
2488   rtp_time = item->rtptime;
2489
2490   GST_LOG_OBJECT (jitterbuffer, "rtp %" G_GUINT32_FORMAT ", ext %"
2491       G_GUINT64_FORMAT, rtp_time, priv->ext_timestamp);
2492
2493   if (rtp_time < priv->ext_timestamp) {
2494     ext_time = priv->ext_timestamp;
2495   } else {
2496     ext_time = gst_rtp_buffer_ext_timestamp (&priv->ext_timestamp, rtp_time);
2497   }
2498
2499   if (ext_time > priv->clock_base)
2500     elapsed = ext_time - priv->clock_base;
2501   else
2502     elapsed = 0;
2503
2504   elapsed = gst_util_uint64_scale_int (elapsed, GST_SECOND, priv->clock_rate);
2505   return elapsed;
2506 }
2507
2508 static void
2509 update_estimated_eos (GstRtpJitterBuffer * jitterbuffer,
2510     RTPJitterBufferItem * item)
2511 {
2512   guint64 total, elapsed, left, estimated;
2513   GstClockTime out_time;
2514   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2515
2516   if (priv->npt_stop == -1 || priv->ext_timestamp == -1
2517       || priv->clock_base == -1 || priv->clock_rate <= 0)
2518     return;
2519
2520   /* compute the elapsed time */
2521   elapsed = compute_elapsed (jitterbuffer, item);
2522
2523   /* do nothing if elapsed time doesn't increment */
2524   if (priv->last_elapsed && elapsed <= priv->last_elapsed)
2525     return;
2526
2527   priv->last_elapsed = elapsed;
2528
2529   /* this is the total time we need to play */
2530   total = priv->npt_stop - priv->npt_start;
2531   GST_LOG_OBJECT (jitterbuffer, "total %" GST_TIME_FORMAT,
2532       GST_TIME_ARGS (total));
2533
2534   /* this is how much time there is left */
2535   if (total > elapsed)
2536     left = total - elapsed;
2537   else
2538     left = 0;
2539
2540   /* if we have less time left that the size of the buffer, we will not
2541    * be able to keep it filled, disabled buffering then */
2542   if (left < rtp_jitter_buffer_get_delay (priv->jbuf)) {
2543     GST_DEBUG_OBJECT (jitterbuffer, "left %" GST_TIME_FORMAT
2544         ", disable buffering close to EOS", GST_TIME_ARGS (left));
2545     rtp_jitter_buffer_disable_buffering (priv->jbuf, TRUE);
2546   }
2547
2548   /* this is the current time as running-time */
2549   out_time = item->dts;
2550
2551   if (elapsed > 0)
2552     estimated = gst_util_uint64_scale (out_time, total, elapsed);
2553   else {
2554     /* if there is almost nothing left,
2555      * we may never advance enough to end up in the above case */
2556     if (total < GST_SECOND)
2557       estimated = GST_SECOND;
2558     else
2559       estimated = -1;
2560   }
2561   GST_LOG_OBJECT (jitterbuffer, "elapsed %" GST_TIME_FORMAT ", estimated %"
2562       GST_TIME_FORMAT, GST_TIME_ARGS (elapsed), GST_TIME_ARGS (estimated));
2563
2564   if (estimated != -1 && priv->estimated_eos != estimated) {
2565     set_timer (jitterbuffer, TIMER_TYPE_EOS, -1, estimated);
2566     priv->estimated_eos = estimated;
2567   }
2568 }
2569
2570 /* take a buffer from the queue and push it */
2571 static GstFlowReturn
2572 pop_and_push_next (GstRtpJitterBuffer * jitterbuffer, guint seqnum)
2573 {
2574   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2575   GstFlowReturn result = GST_FLOW_OK;
2576   RTPJitterBufferItem *item;
2577   GstBuffer *outbuf = NULL;
2578   GstEvent *outevent = NULL;
2579   GstQuery *outquery = NULL;
2580   GstClockTime dts, pts;
2581   gint percent = -1;
2582   gboolean do_push = TRUE;
2583   guint type;
2584   GstMessage *msg;
2585
2586   /* when we get here we are ready to pop and push the buffer */
2587   item = rtp_jitter_buffer_pop (priv->jbuf, &percent);
2588   type = item->type;
2589
2590   switch (type) {
2591     case ITEM_TYPE_BUFFER:
2592
2593       /* we need to make writable to change the flags and timestamps */
2594       outbuf = gst_buffer_make_writable (item->data);
2595
2596       if (G_UNLIKELY (priv->discont)) {
2597         /* set DISCONT flag when we missed a packet. We pushed the buffer writable
2598          * into the jitterbuffer so we can modify now. */
2599         GST_DEBUG_OBJECT (jitterbuffer, "mark output buffer discont");
2600         GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_DISCONT);
2601         priv->discont = FALSE;
2602       }
2603       if (G_UNLIKELY (priv->ts_discont)) {
2604         GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_RESYNC);
2605         priv->ts_discont = FALSE;
2606       }
2607
2608       dts =
2609           gst_segment_to_position (&priv->segment, GST_FORMAT_TIME, item->dts);
2610       pts =
2611           gst_segment_to_position (&priv->segment, GST_FORMAT_TIME, item->pts);
2612
2613       /* apply timestamp with offset to buffer now */
2614       GST_BUFFER_DTS (outbuf) = apply_offset (jitterbuffer, dts);
2615       GST_BUFFER_PTS (outbuf) = apply_offset (jitterbuffer, pts);
2616
2617       /* update the elapsed time when we need to check against the npt stop time. */
2618       update_estimated_eos (jitterbuffer, item);
2619
2620       priv->last_out_time = GST_BUFFER_PTS (outbuf);
2621       break;
2622     case ITEM_TYPE_LOST:
2623       priv->discont = TRUE;
2624       if (!priv->do_lost)
2625         do_push = FALSE;
2626       /* FALLTHROUGH */
2627     case ITEM_TYPE_EVENT:
2628       outevent = item->data;
2629       break;
2630     case ITEM_TYPE_QUERY:
2631       outquery = item->data;
2632       break;
2633   }
2634
2635   /* now we are ready to push the buffer. Save the seqnum and release the lock
2636    * so the other end can push stuff in the queue again. */
2637   if (seqnum != -1) {
2638     priv->last_popped_seqnum = seqnum;
2639     priv->next_seqnum = (seqnum + item->count) & 0xffff;
2640   }
2641   msg = check_buffering_percent (jitterbuffer, percent);
2642   JBUF_UNLOCK (priv);
2643
2644   item->data = NULL;
2645   free_item (item);
2646
2647   if (msg)
2648     gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer), msg);
2649
2650   switch (type) {
2651     case ITEM_TYPE_BUFFER:
2652       /* push buffer */
2653       GST_DEBUG_OBJECT (jitterbuffer,
2654           "Pushing buffer %d, dts %" GST_TIME_FORMAT ", pts %" GST_TIME_FORMAT,
2655           seqnum, GST_TIME_ARGS (GST_BUFFER_DTS (outbuf)),
2656           GST_TIME_ARGS (GST_BUFFER_PTS (outbuf)));
2657       result = gst_pad_push (priv->srcpad, outbuf);
2658
2659       JBUF_LOCK_CHECK (priv, out_flushing);
2660       break;
2661     case ITEM_TYPE_LOST:
2662     case ITEM_TYPE_EVENT:
2663       GST_DEBUG_OBJECT (jitterbuffer, "%sPushing event %" GST_PTR_FORMAT
2664           ", seqnum %d", do_push ? "" : "NOT ", outevent, seqnum);
2665
2666       if (do_push)
2667         gst_pad_push_event (priv->srcpad, outevent);
2668       else
2669         gst_event_unref (outevent);
2670
2671       result = GST_FLOW_OK;
2672
2673       JBUF_LOCK_CHECK (priv, out_flushing);
2674       break;
2675     case ITEM_TYPE_QUERY:
2676     {
2677       gboolean res;
2678
2679       res = gst_pad_peer_query (priv->srcpad, outquery);
2680
2681       JBUF_LOCK_CHECK (priv, out_flushing);
2682       result = GST_FLOW_OK;
2683       GST_LOG_OBJECT (jitterbuffer, "did query %p, return %d", outquery, res);
2684       JBUF_SIGNAL_QUERY (priv, res);
2685       break;
2686     }
2687   }
2688   return result;
2689
2690   /* ERRORS */
2691 out_flushing:
2692   {
2693     return priv->srcresult;
2694   }
2695 }
2696
2697 #define GST_FLOW_WAIT GST_FLOW_CUSTOM_SUCCESS
2698
2699 /* Peek a buffer and compare the seqnum to the expected seqnum.
2700  * If all is fine, the buffer is pushed.
2701  * If something is wrong, we wait for some event
2702  */
2703 static GstFlowReturn
2704 handle_next_buffer (GstRtpJitterBuffer * jitterbuffer)
2705 {
2706   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2707   GstFlowReturn result = GST_FLOW_OK;
2708   RTPJitterBufferItem *item;
2709   guint seqnum;
2710   guint32 next_seqnum;
2711   gint gap;
2712
2713   /* only push buffers when PLAYING and active and not buffering */
2714   if (priv->blocked || !priv->active ||
2715       rtp_jitter_buffer_is_buffering (priv->jbuf))
2716     return GST_FLOW_WAIT;
2717
2718 again:
2719   /* peek a buffer, we're just looking at the sequence number.
2720    * If all is fine, we'll pop and push it. If the sequence number is wrong we
2721    * wait for a timeout or something to change.
2722    * The peeked buffer is valid for as long as we hold the jitterbuffer lock. */
2723   item = rtp_jitter_buffer_peek (priv->jbuf);
2724   if (item == NULL)
2725     goto wait;
2726
2727   /* get the seqnum and the next expected seqnum */
2728   seqnum = item->seqnum;
2729   if (seqnum == -1)
2730     goto do_push;
2731
2732   next_seqnum = priv->next_seqnum;
2733
2734   /* get the gap between this and the previous packet. If we don't know the
2735    * previous packet seqnum assume no gap. */
2736   if (G_UNLIKELY (next_seqnum == -1)) {
2737     GST_DEBUG_OBJECT (jitterbuffer, "First buffer #%d", seqnum);
2738     /* we don't know what the next_seqnum should be, the chain function should
2739      * have scheduled a DEADLINE timer that will increment next_seqnum when it
2740      * fires, so wait for that */
2741     result = GST_FLOW_WAIT;
2742   } else {
2743     /* else calculate GAP */
2744     gap = gst_rtp_buffer_compare_seqnum (next_seqnum, seqnum);
2745
2746     if (G_LIKELY (gap == 0)) {
2747     do_push:
2748       /* no missing packet, pop and push */
2749       result = pop_and_push_next (jitterbuffer, seqnum);
2750     } else if (G_UNLIKELY (gap < 0)) {
2751       RTPJitterBufferItem *item;
2752       /* if we have a packet that we already pushed or considered dropped, pop it
2753        * off and get the next packet */
2754       GST_DEBUG_OBJECT (jitterbuffer, "Old packet #%d, next #%d dropping",
2755           seqnum, next_seqnum);
2756       item = rtp_jitter_buffer_pop (priv->jbuf, NULL);
2757       free_item (item);
2758       goto again;
2759     } else {
2760       /* the chain function has scheduled timers to request retransmission or
2761        * when to consider the packet lost, wait for that */
2762       GST_DEBUG_OBJECT (jitterbuffer,
2763           "Sequence number GAP detected: expected %d instead of %d (%d missing)",
2764           next_seqnum, seqnum, gap);
2765       result = GST_FLOW_WAIT;
2766     }
2767   }
2768   return result;
2769
2770 wait:
2771   {
2772     GST_DEBUG_OBJECT (jitterbuffer, "no buffer, going to wait");
2773     if (priv->eos)
2774       result = GST_FLOW_EOS;
2775     else
2776       result = GST_FLOW_WAIT;
2777     return result;
2778   }
2779 }
2780
2781 static GstClockTime
2782 get_rtx_retry_timeout (GstRtpJitterBufferPrivate * priv)
2783 {
2784   GstClockTime rtx_retry_timeout;
2785   GstClockTime rtx_min_retry_timeout;
2786
2787   if (priv->rtx_retry_timeout == -1) {
2788     if (priv->avg_rtx_rtt == 0)
2789       rtx_retry_timeout = DEFAULT_AUTO_RTX_TIMEOUT;
2790     else
2791       /* we want to ask for a retransmission after we waited for a
2792        * complete RTT and the additional jitter */
2793       rtx_retry_timeout = priv->avg_rtx_rtt + priv->avg_jitter * 2;
2794   } else {
2795     rtx_retry_timeout = priv->rtx_retry_timeout * GST_MSECOND;
2796   }
2797   /* make sure we don't retry too often. On very low latency networks,
2798    * the RTT and jitter can be very low. */
2799   if (priv->rtx_min_retry_timeout == -1) {
2800     rtx_min_retry_timeout = priv->packet_spacing;
2801   } else {
2802     rtx_min_retry_timeout = priv->rtx_min_retry_timeout * GST_MSECOND;
2803   }
2804   rtx_retry_timeout = MAX (rtx_retry_timeout, rtx_min_retry_timeout);
2805
2806   return rtx_retry_timeout;
2807 }
2808
2809 static GstClockTime
2810 get_rtx_retry_period (GstRtpJitterBufferPrivate * priv,
2811     GstClockTime rtx_retry_timeout)
2812 {
2813   GstClockTime rtx_retry_period;
2814
2815   if (priv->rtx_retry_period == -1) {
2816     /* we retry up to the configured jitterbuffer size but leaving some
2817      * room for the retransmission to arrive in time */
2818     if (rtx_retry_timeout > priv->latency_ns) {
2819       rtx_retry_period = 0;
2820     } else {
2821       rtx_retry_period = priv->latency_ns - rtx_retry_timeout;
2822     }
2823   } else {
2824     rtx_retry_period = priv->rtx_retry_period * GST_MSECOND;
2825   }
2826   return rtx_retry_period;
2827 }
2828
2829 /* the timeout for when we expected a packet expired */
2830 static gboolean
2831 do_expected_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
2832     GstClockTime now)
2833 {
2834   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2835   GstEvent *event;
2836   guint delay, delay_ms, avg_rtx_rtt_ms;
2837   guint rtx_retry_timeout_ms, rtx_retry_period_ms;
2838   GstClockTime rtx_retry_period;
2839   GstClockTime rtx_retry_timeout;
2840   GstClock *clock;
2841
2842   GST_DEBUG_OBJECT (jitterbuffer, "expected %d didn't arrive, now %"
2843       GST_TIME_FORMAT, timer->seqnum, GST_TIME_ARGS (now));
2844
2845   rtx_retry_timeout = get_rtx_retry_timeout (priv);
2846   rtx_retry_period = get_rtx_retry_period (priv, rtx_retry_timeout);
2847
2848   GST_DEBUG_OBJECT (jitterbuffer, "timeout %" GST_TIME_FORMAT ", period %"
2849       GST_TIME_FORMAT, GST_TIME_ARGS (rtx_retry_timeout),
2850       GST_TIME_ARGS (rtx_retry_period));
2851
2852   delay = timer->rtx_delay + timer->rtx_retry;
2853
2854   delay_ms = GST_TIME_AS_MSECONDS (delay);
2855   rtx_retry_timeout_ms = GST_TIME_AS_MSECONDS (rtx_retry_timeout);
2856   rtx_retry_period_ms = GST_TIME_AS_MSECONDS (rtx_retry_period);
2857   avg_rtx_rtt_ms = GST_TIME_AS_MSECONDS (priv->avg_rtx_rtt);
2858
2859   event = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM,
2860       gst_structure_new ("GstRTPRetransmissionRequest",
2861           "seqnum", G_TYPE_UINT, (guint) timer->seqnum,
2862           "running-time", G_TYPE_UINT64, timer->rtx_base,
2863           "delay", G_TYPE_UINT, delay_ms,
2864           "retry", G_TYPE_UINT, timer->num_rtx_retry,
2865           "frequency", G_TYPE_UINT, rtx_retry_timeout_ms,
2866           "period", G_TYPE_UINT, rtx_retry_period_ms,
2867           "deadline", G_TYPE_UINT, priv->latency_ms,
2868           "packet-spacing", G_TYPE_UINT64, priv->packet_spacing,
2869           "avg-rtt", G_TYPE_UINT, avg_rtx_rtt_ms, NULL));
2870
2871   priv->num_rtx_requests++;
2872   timer->num_rtx_retry++;
2873
2874   GST_OBJECT_LOCK (jitterbuffer);
2875   if ((clock = GST_ELEMENT_CLOCK (jitterbuffer))) {
2876     timer->rtx_last = gst_clock_get_time (clock);
2877     timer->rtx_last -= GST_ELEMENT_CAST (jitterbuffer)->base_time;
2878   } else {
2879     timer->rtx_last = now;
2880   }
2881   GST_OBJECT_UNLOCK (jitterbuffer);
2882
2883   /* calculate the timeout for the next retransmission attempt */
2884   timer->rtx_retry += rtx_retry_timeout;
2885   GST_DEBUG_OBJECT (jitterbuffer, "base %" GST_TIME_FORMAT ", delay %"
2886       GST_TIME_FORMAT ", retry %" GST_TIME_FORMAT ", num_retry %u",
2887       GST_TIME_ARGS (timer->rtx_base), GST_TIME_ARGS (timer->rtx_delay),
2888       GST_TIME_ARGS (timer->rtx_retry), timer->num_rtx_retry);
2889
2890   if (timer->rtx_retry + timer->rtx_delay > rtx_retry_period) {
2891     GST_DEBUG_OBJECT (jitterbuffer, "reschedule as LOST timer");
2892     /* too many retransmission request, we now convert the timer
2893      * to a lost timer, leave the num_rtx_retry as it is for stats */
2894     timer->type = TIMER_TYPE_LOST;
2895     timer->rtx_delay = 0;
2896     timer->rtx_retry = 0;
2897   }
2898   reschedule_timer (jitterbuffer, timer, timer->seqnum,
2899       timer->rtx_base + timer->rtx_retry, timer->rtx_delay, FALSE);
2900
2901   JBUF_UNLOCK (priv);
2902   gst_pad_push_event (priv->sinkpad, event);
2903   JBUF_LOCK (priv);
2904
2905   return FALSE;
2906 }
2907
2908 /* a packet is lost */
2909 static gboolean
2910 do_lost_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
2911     GstClockTime now)
2912 {
2913   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2914   GstClockTime duration, timestamp;
2915   guint seqnum, lost_packets, num_rtx_retry, next_in_seqnum;
2916   gboolean late, head;
2917   GstEvent *event;
2918   RTPJitterBufferItem *item;
2919
2920   seqnum = timer->seqnum;
2921   timestamp = apply_offset (jitterbuffer, timer->timeout);
2922   duration = timer->duration;
2923   if (duration == GST_CLOCK_TIME_NONE && priv->packet_spacing > 0)
2924     duration = priv->packet_spacing;
2925   lost_packets = MAX (timer->num, 1);
2926   late = timer->num > 0;
2927   num_rtx_retry = timer->num_rtx_retry;
2928
2929   /* we had a gap and thus we lost some packets. Create an event for this.  */
2930   if (lost_packets > 1)
2931     GST_DEBUG_OBJECT (jitterbuffer, "Packets #%d -> #%d lost", seqnum,
2932         seqnum + lost_packets - 1);
2933   else
2934     GST_DEBUG_OBJECT (jitterbuffer, "Packet #%d lost", seqnum);
2935
2936   priv->num_late += lost_packets;
2937   priv->num_rtx_failed += num_rtx_retry;
2938
2939   next_in_seqnum = (seqnum + lost_packets) & 0xffff;
2940
2941   /* we now only accept seqnum bigger than this */
2942   if (gst_rtp_buffer_compare_seqnum (priv->next_in_seqnum, next_in_seqnum) > 0)
2943     priv->next_in_seqnum = next_in_seqnum;
2944
2945   /* create paket lost event */
2946   event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM,
2947       gst_structure_new ("GstRTPPacketLost",
2948           "seqnum", G_TYPE_UINT, (guint) seqnum,
2949           "timestamp", G_TYPE_UINT64, timestamp,
2950           "duration", G_TYPE_UINT64, duration,
2951           "late", G_TYPE_BOOLEAN, late,
2952           "retry", G_TYPE_UINT, num_rtx_retry, NULL));
2953
2954   item = alloc_item (event, ITEM_TYPE_LOST, -1, -1, seqnum, lost_packets, -1);
2955   rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL);
2956
2957   /* remove timer now */
2958   remove_timer (jitterbuffer, timer);
2959   if (head)
2960     JBUF_SIGNAL_EVENT (priv);
2961
2962   return TRUE;
2963 }
2964
2965 static gboolean
2966 do_eos_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
2967     GstClockTime now)
2968 {
2969   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2970
2971   GST_INFO_OBJECT (jitterbuffer, "got the NPT timeout");
2972   remove_timer (jitterbuffer, timer);
2973   if (!priv->eos) {
2974     /* there was no EOS in the buffer, put one in there now */
2975     queue_event (jitterbuffer, gst_event_new_eos ());
2976   }
2977   JBUF_SIGNAL_EVENT (priv);
2978
2979   return TRUE;
2980 }
2981
2982 static gboolean
2983 do_deadline_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
2984     GstClockTime now)
2985 {
2986   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2987
2988   GST_INFO_OBJECT (jitterbuffer, "got deadline timeout");
2989
2990   /* timer seqnum might have been obsoleted by caps seqnum-base,
2991    * only mess with current ongoing seqnum if still unknown */
2992   if (priv->next_seqnum == -1)
2993     priv->next_seqnum = timer->seqnum;
2994   remove_timer (jitterbuffer, timer);
2995   JBUF_SIGNAL_EVENT (priv);
2996
2997   return TRUE;
2998 }
2999
3000 static gboolean
3001 do_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
3002     GstClockTime now)
3003 {
3004   gboolean removed = FALSE;
3005
3006   switch (timer->type) {
3007     case TIMER_TYPE_EXPECTED:
3008       removed = do_expected_timeout (jitterbuffer, timer, now);
3009       break;
3010     case TIMER_TYPE_LOST:
3011       removed = do_lost_timeout (jitterbuffer, timer, now);
3012       break;
3013     case TIMER_TYPE_DEADLINE:
3014       removed = do_deadline_timeout (jitterbuffer, timer, now);
3015       break;
3016     case TIMER_TYPE_EOS:
3017       removed = do_eos_timeout (jitterbuffer, timer, now);
3018       break;
3019   }
3020   return removed;
3021 }
3022
3023 /* called when we need to wait for the next timeout.
3024  *
3025  * We loop over the array of recorded timeouts and wait for the earliest one.
3026  * When it timed out, do the logic associated with the timer.
3027  *
3028  * If there are no timers, we wait on a gcond until something new happens.
3029  */
3030 static void
3031 wait_next_timeout (GstRtpJitterBuffer * jitterbuffer)
3032 {
3033   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
3034   GstClockTime now = 0;
3035
3036   JBUF_LOCK (priv);
3037   while (priv->timer_running) {
3038     TimerData *timer = NULL;
3039     GstClockTime timer_timeout = -1;
3040     gint i, len;
3041
3042     GST_DEBUG_OBJECT (jitterbuffer, "now %" GST_TIME_FORMAT,
3043         GST_TIME_ARGS (now));
3044
3045     len = priv->timers->len;
3046     for (i = 0; i < len; i++) {
3047       TimerData *test = &g_array_index (priv->timers, TimerData, i);
3048       GstClockTime test_timeout = get_timeout (jitterbuffer, test);
3049       gboolean save_best = FALSE;
3050
3051       GST_DEBUG_OBJECT (jitterbuffer, "%d, %d, %d, %" GST_TIME_FORMAT,
3052           i, test->type, test->seqnum, GST_TIME_ARGS (test_timeout));
3053
3054       /* find the smallest timeout */
3055       if (timer == NULL) {
3056         save_best = TRUE;
3057       } else if (timer_timeout == -1) {
3058         /* we already have an immediate timeout, the new timer must be an
3059          * immediate timer with smaller seqnum to become the best */
3060         if (test_timeout == -1
3061             && (gst_rtp_buffer_compare_seqnum (test->seqnum,
3062                     timer->seqnum) > 0))
3063           save_best = TRUE;
3064       } else if (test_timeout == -1) {
3065         /* first immediate timer */
3066         save_best = TRUE;
3067       } else if (test_timeout < timer_timeout) {
3068         /* earlier timer */
3069         save_best = TRUE;
3070       } else if (test_timeout == timer_timeout
3071           && (gst_rtp_buffer_compare_seqnum (test->seqnum,
3072                   timer->seqnum) > 0)) {
3073         /* same timer, smaller seqnum */
3074         save_best = TRUE;
3075       }
3076       if (save_best) {
3077         GST_DEBUG_OBJECT (jitterbuffer, "new best %d", i);
3078         timer = test;
3079         timer_timeout = test_timeout;
3080       }
3081     }
3082     if (timer && !priv->blocked) {
3083       GstClock *clock;
3084       GstClockTime sync_time;
3085       GstClockID id;
3086       GstClockReturn ret;
3087       GstClockTimeDiff clock_jitter;
3088
3089       if (timer_timeout == -1 || timer_timeout <= now) {
3090         do_timeout (jitterbuffer, timer, now);
3091         /* check here, do_timeout could have released the lock */
3092         if (!priv->timer_running)
3093           break;
3094         continue;
3095       }
3096
3097       GST_OBJECT_LOCK (jitterbuffer);
3098       clock = GST_ELEMENT_CLOCK (jitterbuffer);
3099       if (!clock) {
3100         GST_OBJECT_UNLOCK (jitterbuffer);
3101         /* let's just push if there is no clock */
3102         GST_DEBUG_OBJECT (jitterbuffer, "No clock, timeout right away");
3103         now = timer_timeout;
3104         continue;
3105       }
3106
3107       /* prepare for sync against clock */
3108       sync_time = timer_timeout + GST_ELEMENT_CAST (jitterbuffer)->base_time;
3109       /* add latency of peer to get input time */
3110       sync_time += priv->peer_latency;
3111
3112       GST_DEBUG_OBJECT (jitterbuffer, "sync to timestamp %" GST_TIME_FORMAT
3113           " with sync time %" GST_TIME_FORMAT,
3114           GST_TIME_ARGS (timer_timeout), GST_TIME_ARGS (sync_time));
3115
3116       /* create an entry for the clock */
3117       id = priv->clock_id = gst_clock_new_single_shot_id (clock, sync_time);
3118       priv->timer_timeout = timer_timeout;
3119       priv->timer_seqnum = timer->seqnum;
3120       GST_OBJECT_UNLOCK (jitterbuffer);
3121
3122       /* release the lock so that the other end can push stuff or unlock */
3123       JBUF_UNLOCK (priv);
3124
3125       ret = gst_clock_id_wait (id, &clock_jitter);
3126
3127       JBUF_LOCK (priv);
3128       if (!priv->timer_running) {
3129         gst_clock_id_unref (id);
3130         priv->clock_id = NULL;
3131         break;
3132       }
3133
3134       if (ret != GST_CLOCK_UNSCHEDULED) {
3135         now = timer_timeout + MAX (clock_jitter, 0);
3136         GST_DEBUG_OBJECT (jitterbuffer, "sync done, %d, #%d, %" G_GINT64_FORMAT,
3137             ret, priv->timer_seqnum, clock_jitter);
3138       } else {
3139         GST_DEBUG_OBJECT (jitterbuffer, "sync unscheduled");
3140       }
3141       /* and free the entry */
3142       gst_clock_id_unref (id);
3143       priv->clock_id = NULL;
3144     } else {
3145       /* no timers, wait for activity */
3146       JBUF_WAIT_TIMER (priv);
3147     }
3148   }
3149   JBUF_UNLOCK (priv);
3150
3151   GST_DEBUG_OBJECT (jitterbuffer, "we are stopping");
3152   return;
3153 }
3154
3155 /*
3156  * This funcion implements the main pushing loop on the source pad.
3157  *
3158  * It first tries to push as many buffers as possible. If there is a seqnum
3159  * mismatch, we wait for the next timeouts.
3160  */
3161 static void
3162 gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer)
3163 {
3164   GstRtpJitterBufferPrivate *priv;
3165   GstFlowReturn result = GST_FLOW_OK;
3166
3167   priv = jitterbuffer->priv;
3168
3169   JBUF_LOCK_CHECK (priv, flushing);
3170   do {
3171     result = handle_next_buffer (jitterbuffer);
3172     if (G_LIKELY (result == GST_FLOW_WAIT)) {
3173       /* now wait for the next event */
3174       JBUF_WAIT_EVENT (priv, flushing);
3175       result = GST_FLOW_OK;
3176     }
3177   }
3178   while (result == GST_FLOW_OK);
3179   /* store result for upstream */
3180   priv->srcresult = result;
3181   /* if we get here we need to pause */
3182   goto pause;
3183
3184   /* ERRORS */
3185 flushing:
3186   {
3187     result = priv->srcresult;
3188     goto pause;
3189   }
3190 pause:
3191   {
3192     GstEvent *event;
3193
3194     JBUF_SIGNAL_QUERY (priv, FALSE);
3195     JBUF_UNLOCK (priv);
3196
3197     GST_DEBUG_OBJECT (jitterbuffer, "pausing task, reason %s",
3198         gst_flow_get_name (result));
3199     gst_pad_pause_task (priv->srcpad);
3200     if (result == GST_FLOW_EOS) {
3201       event = gst_event_new_eos ();
3202       gst_pad_push_event (priv->srcpad, event);
3203     }
3204     return;
3205   }
3206 }
3207
3208 /* collect the info from the lastest RTCP packet and the jitterbuffer sync, do
3209  * some sanity checks and then emit the handle-sync signal with the parameters.
3210  * This function must be called with the LOCK */
3211 static void
3212 do_handle_sync (GstRtpJitterBuffer * jitterbuffer)
3213 {
3214   GstRtpJitterBufferPrivate *priv;
3215   guint64 base_rtptime, base_time;
3216   guint32 clock_rate;
3217   guint64 last_rtptime;
3218   guint64 clock_base;
3219   guint64 ext_rtptime, diff;
3220   gboolean valid = TRUE, keep = FALSE;
3221
3222   priv = jitterbuffer->priv;
3223
3224   /* get the last values from the jitterbuffer */
3225   rtp_jitter_buffer_get_sync (priv->jbuf, &base_rtptime, &base_time,
3226       &clock_rate, &last_rtptime);
3227
3228   clock_base = priv->clock_base;
3229   ext_rtptime = priv->ext_rtptime;
3230
3231   GST_DEBUG_OBJECT (jitterbuffer, "ext SR %" G_GUINT64_FORMAT ", base %"
3232       G_GUINT64_FORMAT ", clock-rate %" G_GUINT32_FORMAT
3233       ", clock-base %" G_GUINT64_FORMAT ", last-rtptime %" G_GUINT64_FORMAT,
3234       ext_rtptime, base_rtptime, clock_rate, clock_base, last_rtptime);
3235
3236   if (base_rtptime == -1 || clock_rate == -1 || base_time == -1) {
3237     /* we keep this SR packet for later. When we get a valid RTP packet the
3238      * above values will be set and we can try to use the SR packet */
3239     GST_DEBUG_OBJECT (jitterbuffer, "keeping for later, no RTP values");
3240     keep = TRUE;
3241   } else {
3242     /* we can't accept anything that happened before we did the last resync */
3243     if (base_rtptime > ext_rtptime) {
3244       GST_DEBUG_OBJECT (jitterbuffer, "dropping, older than base time");
3245       valid = FALSE;
3246     } else {
3247       /* the SR RTP timestamp must be something close to what we last observed
3248        * in the jitterbuffer */
3249       if (ext_rtptime > last_rtptime) {
3250         /* check how far ahead it is to our RTP timestamps */
3251         diff = ext_rtptime - last_rtptime;
3252         /* if bigger than 1 second, we drop it */
3253         if (diff > clock_rate) {
3254           GST_DEBUG_OBJECT (jitterbuffer, "too far ahead");
3255           /* should drop this, but some RTSP servers end up with bogus
3256            * way too ahead RTCP packet when repeated PAUSE/PLAY,
3257            * so still trigger rptbin sync but invalidate RTCP data
3258            * (sync might use other methods) */
3259           ext_rtptime = -1;
3260         }
3261         GST_DEBUG_OBJECT (jitterbuffer, "ext last %" G_GUINT64_FORMAT ", diff %"
3262             G_GUINT64_FORMAT, last_rtptime, diff);
3263       }
3264     }
3265   }
3266
3267   if (keep) {
3268     GST_DEBUG_OBJECT (jitterbuffer, "keeping RTCP packet for later");
3269   } else if (valid) {
3270     GstStructure *s;
3271
3272     s = gst_structure_new ("application/x-rtp-sync",
3273         "base-rtptime", G_TYPE_UINT64, base_rtptime,
3274         "base-time", G_TYPE_UINT64, base_time,
3275         "clock-rate", G_TYPE_UINT, clock_rate,
3276         "clock-base", G_TYPE_UINT64, clock_base,
3277         "sr-ext-rtptime", G_TYPE_UINT64, ext_rtptime,
3278         "sr-buffer", GST_TYPE_BUFFER, priv->last_sr, NULL);
3279
3280     GST_DEBUG_OBJECT (jitterbuffer, "signaling sync");
3281     gst_buffer_replace (&priv->last_sr, NULL);
3282     JBUF_UNLOCK (priv);
3283     g_signal_emit (jitterbuffer,
3284         gst_rtp_jitter_buffer_signals[SIGNAL_HANDLE_SYNC], 0, s);
3285     JBUF_LOCK (priv);
3286     gst_structure_free (s);
3287   } else {
3288     GST_DEBUG_OBJECT (jitterbuffer, "dropping RTCP packet");
3289     gst_buffer_replace (&priv->last_sr, NULL);
3290   }
3291 }
3292
3293 static GstFlowReturn
3294 gst_rtp_jitter_buffer_chain_rtcp (GstPad * pad, GstObject * parent,
3295     GstBuffer * buffer)
3296 {
3297   GstRtpJitterBuffer *jitterbuffer;
3298   GstRtpJitterBufferPrivate *priv;
3299   GstFlowReturn ret = GST_FLOW_OK;
3300   guint32 ssrc;
3301   GstRTCPPacket packet;
3302   guint64 ext_rtptime;
3303   guint32 rtptime;
3304   GstRTCPBuffer rtcp = { NULL, };
3305
3306   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
3307
3308   if (G_UNLIKELY (!gst_rtcp_buffer_validate (buffer)))
3309     goto invalid_buffer;
3310
3311   priv = jitterbuffer->priv;
3312
3313   gst_rtcp_buffer_map (buffer, GST_MAP_READ, &rtcp);
3314
3315   if (!gst_rtcp_buffer_get_first_packet (&rtcp, &packet))
3316     goto empty_buffer;
3317
3318   /* first packet must be SR or RR or else the validate would have failed */
3319   switch (gst_rtcp_packet_get_type (&packet)) {
3320     case GST_RTCP_TYPE_SR:
3321       gst_rtcp_packet_sr_get_sender_info (&packet, &ssrc, NULL, &rtptime,
3322           NULL, NULL);
3323       break;
3324     default:
3325       goto ignore_buffer;
3326   }
3327   gst_rtcp_buffer_unmap (&rtcp);
3328
3329   GST_DEBUG_OBJECT (jitterbuffer, "received RTCP of SSRC %08x", ssrc);
3330
3331   JBUF_LOCK (priv);
3332   /* convert the RTP timestamp to our extended timestamp, using the same offset
3333    * we used in the jitterbuffer */
3334   ext_rtptime = priv->jbuf->ext_rtptime;
3335   ext_rtptime = gst_rtp_buffer_ext_timestamp (&ext_rtptime, rtptime);
3336
3337   priv->ext_rtptime = ext_rtptime;
3338   gst_buffer_replace (&priv->last_sr, buffer);
3339
3340   do_handle_sync (jitterbuffer);
3341   JBUF_UNLOCK (priv);
3342
3343 done:
3344   gst_buffer_unref (buffer);
3345
3346   return ret;
3347
3348 invalid_buffer:
3349   {
3350     /* this is not fatal but should be filtered earlier */
3351     GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
3352         ("Received invalid RTCP payload, dropping"));
3353     ret = GST_FLOW_OK;
3354     goto done;
3355   }
3356 empty_buffer:
3357   {
3358     /* this is not fatal but should be filtered earlier */
3359     GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
3360         ("Received empty RTCP payload, dropping"));
3361     gst_rtcp_buffer_unmap (&rtcp);
3362     ret = GST_FLOW_OK;
3363     goto done;
3364   }
3365 ignore_buffer:
3366   {
3367     GST_DEBUG_OBJECT (jitterbuffer, "ignoring RTCP packet");
3368     gst_rtcp_buffer_unmap (&rtcp);
3369     ret = GST_FLOW_OK;
3370     goto done;
3371   }
3372 }
3373
3374 static gboolean
3375 gst_rtp_jitter_buffer_sink_query (GstPad * pad, GstObject * parent,
3376     GstQuery * query)
3377 {
3378   gboolean res = FALSE;
3379   GstRtpJitterBuffer *jitterbuffer;
3380   GstRtpJitterBufferPrivate *priv;
3381
3382   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
3383   priv = jitterbuffer->priv;
3384
3385   switch (GST_QUERY_TYPE (query)) {
3386     case GST_QUERY_CAPS:
3387     {
3388       GstCaps *filter, *caps;
3389
3390       gst_query_parse_caps (query, &filter);
3391       caps = gst_rtp_jitter_buffer_getcaps (pad, filter);
3392       gst_query_set_caps_result (query, caps);
3393       gst_caps_unref (caps);
3394       res = TRUE;
3395       break;
3396     }
3397     default:
3398       if (GST_QUERY_IS_SERIALIZED (query)) {
3399         RTPJitterBufferItem *item;
3400         gboolean head;
3401
3402         JBUF_LOCK_CHECK (priv, out_flushing);
3403         if (rtp_jitter_buffer_get_mode (priv->jbuf) !=
3404             RTP_JITTER_BUFFER_MODE_BUFFER) {
3405           GST_DEBUG_OBJECT (jitterbuffer, "adding serialized query");
3406           item = alloc_item (query, ITEM_TYPE_QUERY, -1, -1, -1, 0, -1);
3407           rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL);
3408           if (head)
3409             JBUF_SIGNAL_EVENT (priv);
3410           JBUF_WAIT_QUERY (priv, out_flushing);
3411           res = priv->last_query;
3412         } else {
3413           GST_DEBUG_OBJECT (jitterbuffer, "refusing query, we are buffering");
3414           res = FALSE;
3415         }
3416         JBUF_UNLOCK (priv);
3417       } else {
3418         res = gst_pad_query_default (pad, parent, query);
3419       }
3420       break;
3421   }
3422   return res;
3423   /* ERRORS */
3424 out_flushing:
3425   {
3426     GST_DEBUG_OBJECT (jitterbuffer, "we are flushing");
3427     JBUF_UNLOCK (priv);
3428     return FALSE;
3429   }
3430
3431 }
3432
3433 static gboolean
3434 gst_rtp_jitter_buffer_src_query (GstPad * pad, GstObject * parent,
3435     GstQuery * query)
3436 {
3437   GstRtpJitterBuffer *jitterbuffer;
3438   GstRtpJitterBufferPrivate *priv;
3439   gboolean res = FALSE;
3440
3441   jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
3442   priv = jitterbuffer->priv;
3443
3444   switch (GST_QUERY_TYPE (query)) {
3445     case GST_QUERY_LATENCY:
3446     {
3447       /* We need to send the query upstream and add the returned latency to our
3448        * own */
3449       GstClockTime min_latency, max_latency;
3450       gboolean us_live;
3451       GstClockTime our_latency;
3452
3453       if ((res = gst_pad_peer_query (priv->sinkpad, query))) {
3454         gst_query_parse_latency (query, &us_live, &min_latency, &max_latency);
3455
3456         GST_DEBUG_OBJECT (jitterbuffer, "Peer latency: min %"
3457             GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
3458             GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
3459
3460         /* store this so that we can safely sync on the peer buffers. */
3461         JBUF_LOCK (priv);
3462         priv->peer_latency = min_latency;
3463         our_latency = priv->latency_ns;
3464         JBUF_UNLOCK (priv);
3465
3466         GST_DEBUG_OBJECT (jitterbuffer, "Our latency: %" GST_TIME_FORMAT,
3467             GST_TIME_ARGS (our_latency));
3468
3469         /* we add some latency but can buffer an infinite amount of time */
3470         min_latency += our_latency;
3471         max_latency = -1;
3472
3473         GST_DEBUG_OBJECT (jitterbuffer, "Calculated total latency : min %"
3474             GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
3475             GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
3476
3477         gst_query_set_latency (query, TRUE, min_latency, max_latency);
3478       }
3479       break;
3480     }
3481     case GST_QUERY_POSITION:
3482     {
3483       GstClockTime start, last_out;
3484       GstFormat fmt;
3485
3486       gst_query_parse_position (query, &fmt, NULL);
3487       if (fmt != GST_FORMAT_TIME) {
3488         res = gst_pad_query_default (pad, parent, query);
3489         break;
3490       }
3491
3492       JBUF_LOCK (priv);
3493       start = priv->npt_start;
3494       last_out = priv->last_out_time;
3495       JBUF_UNLOCK (priv);
3496
3497       GST_DEBUG_OBJECT (jitterbuffer, "npt start %" GST_TIME_FORMAT
3498           ", last out %" GST_TIME_FORMAT, GST_TIME_ARGS (start),
3499           GST_TIME_ARGS (last_out));
3500
3501       if (GST_CLOCK_TIME_IS_VALID (start) && GST_CLOCK_TIME_IS_VALID (last_out)) {
3502         /* bring 0-based outgoing time to stream time */
3503         gst_query_set_position (query, GST_FORMAT_TIME, start + last_out);
3504         res = TRUE;
3505       } else {
3506         res = gst_pad_query_default (pad, parent, query);
3507       }
3508       break;
3509     }
3510     case GST_QUERY_CAPS:
3511     {
3512       GstCaps *filter, *caps;
3513
3514       gst_query_parse_caps (query, &filter);
3515       caps = gst_rtp_jitter_buffer_getcaps (pad, filter);
3516       gst_query_set_caps_result (query, caps);
3517       gst_caps_unref (caps);
3518       res = TRUE;
3519       break;
3520     }
3521     default:
3522       res = gst_pad_query_default (pad, parent, query);
3523       break;
3524   }
3525
3526   return res;
3527 }
3528
3529 static void
3530 gst_rtp_jitter_buffer_set_property (GObject * object,
3531     guint prop_id, const GValue * value, GParamSpec * pspec)
3532 {
3533   GstRtpJitterBuffer *jitterbuffer;
3534   GstRtpJitterBufferPrivate *priv;
3535
3536   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
3537   priv = jitterbuffer->priv;
3538
3539   switch (prop_id) {
3540     case PROP_LATENCY:
3541     {
3542       guint new_latency, old_latency;
3543
3544       new_latency = g_value_get_uint (value);
3545
3546       JBUF_LOCK (priv);
3547       old_latency = priv->latency_ms;
3548       priv->latency_ms = new_latency;
3549       priv->latency_ns = priv->latency_ms * GST_MSECOND;
3550       rtp_jitter_buffer_set_delay (priv->jbuf, priv->latency_ns);
3551       JBUF_UNLOCK (priv);
3552
3553       /* post message if latency changed, this will inform the parent pipeline
3554        * that a latency reconfiguration is possible/needed. */
3555       if (new_latency != old_latency) {
3556         GST_DEBUG_OBJECT (jitterbuffer, "latency changed to: %" GST_TIME_FORMAT,
3557             GST_TIME_ARGS (new_latency * GST_MSECOND));
3558
3559         gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer),
3560             gst_message_new_latency (GST_OBJECT_CAST (jitterbuffer)));
3561       }
3562       break;
3563     }
3564     case PROP_DROP_ON_LATENCY:
3565       JBUF_LOCK (priv);
3566       priv->drop_on_latency = g_value_get_boolean (value);
3567       JBUF_UNLOCK (priv);
3568       break;
3569     case PROP_TS_OFFSET:
3570       JBUF_LOCK (priv);
3571       priv->ts_offset = g_value_get_int64 (value);
3572       priv->ts_discont = TRUE;
3573       JBUF_UNLOCK (priv);
3574       break;
3575     case PROP_DO_LOST:
3576       JBUF_LOCK (priv);
3577       priv->do_lost = g_value_get_boolean (value);
3578       JBUF_UNLOCK (priv);
3579       break;
3580     case PROP_MODE:
3581       JBUF_LOCK (priv);
3582       rtp_jitter_buffer_set_mode (priv->jbuf, g_value_get_enum (value));
3583       JBUF_UNLOCK (priv);
3584       break;
3585     case PROP_DO_RETRANSMISSION:
3586       JBUF_LOCK (priv);
3587       priv->do_retransmission = g_value_get_boolean (value);
3588       JBUF_UNLOCK (priv);
3589       break;
3590     case PROP_RTX_DELAY:
3591       JBUF_LOCK (priv);
3592       priv->rtx_delay = g_value_get_int (value);
3593       JBUF_UNLOCK (priv);
3594       break;
3595     case PROP_RTX_MIN_DELAY:
3596       JBUF_LOCK (priv);
3597       priv->rtx_min_delay = g_value_get_uint (value);
3598       JBUF_UNLOCK (priv);
3599       break;
3600     case PROP_RTX_DELAY_REORDER:
3601       JBUF_LOCK (priv);
3602       priv->rtx_delay_reorder = g_value_get_int (value);
3603       JBUF_UNLOCK (priv);
3604       break;
3605     case PROP_RTX_RETRY_TIMEOUT:
3606       JBUF_LOCK (priv);
3607       priv->rtx_retry_timeout = g_value_get_int (value);
3608       JBUF_UNLOCK (priv);
3609       break;
3610     case PROP_RTX_MIN_RETRY_TIMEOUT:
3611       JBUF_LOCK (priv);
3612       priv->rtx_min_retry_timeout = g_value_get_int (value);
3613       JBUF_UNLOCK (priv);
3614       break;
3615     case PROP_RTX_RETRY_PERIOD:
3616       JBUF_LOCK (priv);
3617       priv->rtx_retry_period = g_value_get_int (value);
3618       JBUF_UNLOCK (priv);
3619       break;
3620     default:
3621       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
3622       break;
3623   }
3624 }
3625
3626 static void
3627 gst_rtp_jitter_buffer_get_property (GObject * object,
3628     guint prop_id, GValue * value, GParamSpec * pspec)
3629 {
3630   GstRtpJitterBuffer *jitterbuffer;
3631   GstRtpJitterBufferPrivate *priv;
3632
3633   jitterbuffer = GST_RTP_JITTER_BUFFER (object);
3634   priv = jitterbuffer->priv;
3635
3636   switch (prop_id) {
3637     case PROP_LATENCY:
3638       JBUF_LOCK (priv);
3639       g_value_set_uint (value, priv->latency_ms);
3640       JBUF_UNLOCK (priv);
3641       break;
3642     case PROP_DROP_ON_LATENCY:
3643       JBUF_LOCK (priv);
3644       g_value_set_boolean (value, priv->drop_on_latency);
3645       JBUF_UNLOCK (priv);
3646       break;
3647     case PROP_TS_OFFSET:
3648       JBUF_LOCK (priv);
3649       g_value_set_int64 (value, priv->ts_offset);
3650       JBUF_UNLOCK (priv);
3651       break;
3652     case PROP_DO_LOST:
3653       JBUF_LOCK (priv);
3654       g_value_set_boolean (value, priv->do_lost);
3655       JBUF_UNLOCK (priv);
3656       break;
3657     case PROP_MODE:
3658       JBUF_LOCK (priv);
3659       g_value_set_enum (value, rtp_jitter_buffer_get_mode (priv->jbuf));
3660       JBUF_UNLOCK (priv);
3661       break;
3662     case PROP_PERCENT:
3663     {
3664       gint percent;
3665
3666       JBUF_LOCK (priv);
3667       if (priv->srcresult != GST_FLOW_OK)
3668         percent = 100;
3669       else
3670         percent = rtp_jitter_buffer_get_percent (priv->jbuf);
3671
3672       g_value_set_int (value, percent);
3673       JBUF_UNLOCK (priv);
3674       break;
3675     }
3676     case PROP_DO_RETRANSMISSION:
3677       JBUF_LOCK (priv);
3678       g_value_set_boolean (value, priv->do_retransmission);
3679       JBUF_UNLOCK (priv);
3680       break;
3681     case PROP_RTX_DELAY:
3682       JBUF_LOCK (priv);
3683       g_value_set_int (value, priv->rtx_delay);
3684       JBUF_UNLOCK (priv);
3685       break;
3686     case PROP_RTX_MIN_DELAY:
3687       JBUF_LOCK (priv);
3688       g_value_set_uint (value, priv->rtx_min_delay);
3689       JBUF_UNLOCK (priv);
3690       break;
3691     case PROP_RTX_DELAY_REORDER:
3692       JBUF_LOCK (priv);
3693       g_value_set_int (value, priv->rtx_delay_reorder);
3694       JBUF_UNLOCK (priv);
3695       break;
3696     case PROP_RTX_RETRY_TIMEOUT:
3697       JBUF_LOCK (priv);
3698       g_value_set_int (value, priv->rtx_retry_timeout);
3699       JBUF_UNLOCK (priv);
3700       break;
3701     case PROP_RTX_MIN_RETRY_TIMEOUT:
3702       JBUF_LOCK (priv);
3703       g_value_set_int (value, priv->rtx_min_retry_timeout);
3704       JBUF_UNLOCK (priv);
3705       break;
3706     case PROP_RTX_RETRY_PERIOD:
3707       JBUF_LOCK (priv);
3708       g_value_set_int (value, priv->rtx_retry_period);
3709       JBUF_UNLOCK (priv);
3710       break;
3711     case PROP_STATS:
3712       g_value_take_boxed (value,
3713           gst_rtp_jitter_buffer_create_stats (jitterbuffer));
3714       break;
3715     default:
3716       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
3717       break;
3718   }
3719 }
3720
3721 static GstStructure *
3722 gst_rtp_jitter_buffer_create_stats (GstRtpJitterBuffer * jbuf)
3723 {
3724   GstStructure *s;
3725
3726   JBUF_LOCK (jbuf->priv);
3727   s = gst_structure_new ("application/x-rtp-jitterbuffer-stats",
3728       "rtx-count", G_TYPE_UINT64, jbuf->priv->num_rtx_requests,
3729       "rtx-success-count", G_TYPE_UINT64, jbuf->priv->num_rtx_success,
3730       "rtx-per-packet", G_TYPE_DOUBLE, jbuf->priv->avg_rtx_num,
3731       "rtx-rtt", G_TYPE_UINT64, jbuf->priv->avg_rtx_rtt, NULL);
3732   JBUF_UNLOCK (jbuf->priv);
3733
3734   return s;
3735 }