2 * Farsight Voice+Video library
4 * Copyright 2007 Collabora Ltd,
5 * Copyright 2007 Nokia Corporation
6 * @author: Philippe Kalaf <philippe.kalaf@collabora.co.uk>.
7 * Copyright 2007 Wim Taymans <wim.taymans@gmail.com>
9 * This library is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Library General Public
11 * License as published by the Free Software Foundation; either
12 * version 2 of the License, or (at your option) any later version.
14 * This library is distributed in the hope that it will be useful,
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 * Library General Public License for more details.
19 * You should have received a copy of the GNU Library General Public
20 * License along with this library; if not, write to the
21 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
22 * Boston, MA 02110-1301, USA.
27 * SECTION:element-rtpjitterbuffer
29 * This element reorders and removes duplicate RTP packets as they are received
30 * from a network source.
32 * The element needs the clock-rate of the RTP payload in order to estimate the
33 * delay. This information is obtained either from the caps on the sink pad or,
34 * when no caps are present, from the #GstRtpJitterBuffer::request-pt-map signal.
35 * To clear the previous pt-map use the #GstRtpJitterBuffer::clear-pt-map signal.
37 * The rtpjitterbuffer will wait for missing packets up to a configurable time
38 * limit using the #GstRtpJitterBuffer:latency property. Packets arriving too
39 * late are considered to be lost packets. If the #GstRtpJitterBuffer:do-lost
40 * property is set, lost packets will result in a custom serialized downstream
41 * event of name GstRTPPacketLost. The lost packet events are usually used by a
42 * depayloader or other element to create concealment data or some other logic
43 * to gracefully handle the missing packets.
45 * The jitterbuffer will use the DTS (or PTS if no DTS is set) of the incoming
46 * buffer and the rtptime inside the RTP packet to create a PTS on the outgoing
49 * The jitterbuffer can also be configured to send early retransmission events
50 * upstream by setting the #GstRtpJitterBuffer:do-retransmission property. In
51 * this mode, the jitterbuffer tries to estimate when a packet should arrive and
52 * sends a custom upstream event named GstRTPRetransmissionRequest when the
53 * packet is considered late. The initial expected packet arrival time is
54 * calculated as follows:
56 * - If seqnum N arrived at time T, seqnum N+1 is expected to arrive at
57 * T + packet-spacing + #GstRtpJitterBuffer:rtx-delay. The packet spacing is
58 * calculated from the DTS (or PTS if no DTS) of two consecutive RTP
59 * packets with different rtptime.
61 * - If seqnum N0 arrived at time T0 and seqnum Nm arrived at time Tm,
62 * seqnum Ni is expected at time Ti = T0 + i*(Tm - T0)/(Nm - N0). Any
63 * previously scheduled timeout is overwritten.
65 * - If seqnum N arrived, all seqnum older than
66 * N - #GstRtpJitterBuffer:rtx-delay-reorder are considered late
67 * immediately. This is to request fast feedback for abnormally reordered
68 * packets before any of the previous timeouts is triggered.
70 * A late packet triggers the GstRTPRetransmissionRequest custom upstream
71 * event. After the initial timeout expires and the retransmission event is
72 * sent, the timeout is scheduled for
73 * T + #GstRtpJitterBuffer:rtx-retry-timeout. If the missing packet did not
74 * arrive after #GstRtpJitterBuffer:rtx-retry-timeout, a new
75 * GstRTPRetransmissionRequest is sent upstream and the timeout is rescheduled
76 * again for T + #GstRtpJitterBuffer:rtx-retry-timeout. This repeats until
77 * #GstRtpJitterBuffer:rtx-retry-period elapsed, at which point no further
78 * retransmission requests are sent and the regular logic is performed to
79 * schedule a lost packet as discussed above.
81 * This element acts as a live element and so adds #GstRtpJitterBuffer:latency
84 * This element will automatically be used inside rtpbin.
87 * <title>Example pipelines</title>
89 * gst-launch-1.0 rtspsrc location=rtsp://192.168.1.133:8554/mpeg1or2AudioVideoTest ! rtpjitterbuffer ! rtpmpvdepay ! mpeg2dec ! xvimagesink
90 * ]| Connect to a streaming server and decode the MPEG video. The jitterbuffer is
91 * inserted into the pipeline to smooth out network jitter and to reorder the
92 * out-of-order RTP packets.
102 #include <gst/rtp/gstrtpbuffer.h>
104 #include "gstrtpjitterbuffer.h"
105 #include "rtpjitterbuffer.h"
106 #include "rtpstats.h"
108 #include <gst/glib-compat-private.h>
110 GST_DEBUG_CATEGORY (rtpjitterbuffer_debug);
111 #define GST_CAT_DEFAULT (rtpjitterbuffer_debug)
113 /* RTPJitterBuffer signals and args */
116 SIGNAL_REQUEST_PT_MAP,
124 #define DEFAULT_LATENCY_MS 200
125 #define DEFAULT_DROP_ON_LATENCY FALSE
126 #define DEFAULT_TS_OFFSET 0
127 #define DEFAULT_DO_LOST FALSE
128 #define DEFAULT_MODE RTP_JITTER_BUFFER_MODE_SLAVE
129 #define DEFAULT_PERCENT 0
130 #define DEFAULT_DO_RETRANSMISSION FALSE
131 #define DEFAULT_RTX_DELAY -1
132 #define DEFAULT_RTX_MIN_DELAY 0
133 #define DEFAULT_RTX_DELAY_REORDER 3
134 #define DEFAULT_RTX_RETRY_TIMEOUT -1
135 #define DEFAULT_RTX_MIN_RETRY_TIMEOUT -1
136 #define DEFAULT_RTX_RETRY_PERIOD -1
137 #define DEFAULT_RTX_MAX_RETRIES -1
139 #define DEFAULT_AUTO_RTX_DELAY (20 * GST_MSECOND)
140 #define DEFAULT_AUTO_RTX_TIMEOUT (40 * GST_MSECOND)
146 PROP_DROP_ON_LATENCY,
151 PROP_DO_RETRANSMISSION,
154 PROP_RTX_DELAY_REORDER,
155 PROP_RTX_RETRY_TIMEOUT,
156 PROP_RTX_MIN_RETRY_TIMEOUT,
157 PROP_RTX_RETRY_PERIOD,
158 PROP_RTX_MAX_RETRIES,
/* Locking and wake-up helpers for the jitterbuffer private state.
 *
 * - JBUF_LOCK / JBUF_UNLOCK take and release priv->jbuf_lock.
 * - JBUF_WAIT_TIMER / JBUF_WAIT_EVENT / JBUF_WAIT_QUERY set the matching
 *   priv->waiting_* flag, block in g_cond_wait() on jbuf_timer, jbuf_event or
 *   jbuf_query with jbuf_lock as the mutex, and clear the flag again once
 *   woken. g_cond_wait() requires the mutex to be held, so these must be used
 *   with jbuf_lock already taken.
 * - JBUF_SIGNAL_TIMER / JBUF_SIGNAL_EVENT / JBUF_SIGNAL_QUERY only call
 *   g_cond_signal() when the matching waiting_* flag is set.
 *   JBUF_SIGNAL_QUERY first stores res in priv->last_query, whether or not
 *   anybody is waiting.
 * - JBUF_LOCK_CHECK, JBUF_WAIT_EVENT and JBUF_WAIT_QUERY test
 *   priv->srcresult against GST_FLOW_OK.
 *   NOTE(review): the statement executed when the result is not OK is not
 *   visible in this excerpt; presumably a jump to `label` -- confirm.
 *   NOTE(review): those three checks use `priv` without parentheses, unlike
 *   every other use of the macro argument here. */
163 #define JBUF_LOCK(priv) (g_mutex_lock (&(priv)->jbuf_lock))
165 #define JBUF_LOCK_CHECK(priv,label) G_STMT_START { \
167 if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK)) \
170 #define JBUF_UNLOCK(priv) (g_mutex_unlock (&(priv)->jbuf_lock))
172 #define JBUF_WAIT_TIMER(priv) G_STMT_START { \
173 GST_DEBUG ("waiting timer"); \
174 (priv)->waiting_timer = TRUE; \
175 g_cond_wait (&(priv)->jbuf_timer, &(priv)->jbuf_lock); \
176 (priv)->waiting_timer = FALSE; \
177 GST_DEBUG ("waiting timer done"); \
179 #define JBUF_SIGNAL_TIMER(priv) G_STMT_START { \
180 if (G_UNLIKELY ((priv)->waiting_timer)) { \
181 GST_DEBUG ("signal timer"); \
182 g_cond_signal (&(priv)->jbuf_timer); \
186 #define JBUF_WAIT_EVENT(priv,label) G_STMT_START { \
187 GST_DEBUG ("waiting event"); \
188 (priv)->waiting_event = TRUE; \
189 g_cond_wait (&(priv)->jbuf_event, &(priv)->jbuf_lock); \
190 (priv)->waiting_event = FALSE; \
191 GST_DEBUG ("waiting event done"); \
192 if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK)) \
195 #define JBUF_SIGNAL_EVENT(priv) G_STMT_START { \
196 if (G_UNLIKELY ((priv)->waiting_event)) { \
197 GST_DEBUG ("signal event"); \
198 g_cond_signal (&(priv)->jbuf_event); \
202 #define JBUF_WAIT_QUERY(priv,label) G_STMT_START { \
203 GST_DEBUG ("waiting query"); \
204 (priv)->waiting_query = TRUE; \
205 g_cond_wait (&(priv)->jbuf_query, &(priv)->jbuf_lock); \
206 (priv)->waiting_query = FALSE; \
207 GST_DEBUG ("waiting query done"); \
208 if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK)) \
211 #define JBUF_SIGNAL_QUERY(priv,res) G_STMT_START { \
212 (priv)->last_query = res; \
213 if (G_UNLIKELY ((priv)->waiting_query)) { \
214 GST_DEBUG ("signal query"); \
215 g_cond_signal (&(priv)->jbuf_query); \
220 struct _GstRtpJitterBufferPrivate
222 GstPad *sinkpad, *srcpad;
225 RTPJitterBuffer *jbuf;
227 gboolean waiting_timer;
229 gboolean waiting_event;
231 gboolean waiting_query;
239 gboolean timer_running;
240 GThread *timer_thread;
245 gboolean drop_on_latency;
248 gboolean do_retransmission;
251 gint rtx_delay_reorder;
252 gint rtx_retry_timeout;
253 gint rtx_min_retry_timeout;
254 gint rtx_retry_period;
255 gint rtx_max_retries;
257 /* the last seqnum we pushed out */
258 guint32 last_popped_seqnum;
259 /* the next expected seqnum we push */
261 /* seqnum-base, if known */
263 /* last output time */
264 GstClockTime last_out_time;
265 /* last valid input timestamp and rtptime pair */
266 GstClockTime ips_dts;
268 GstClockTime packet_spacing;
270 /* the next expected seqnum we receive */
271 GstClockTime last_in_dts;
272 guint32 last_in_seqnum;
273 guint32 next_in_seqnum;
277 /* start and stop ranges */
278 GstClockTime npt_start;
279 GstClockTime npt_stop;
280 guint64 ext_timestamp;
281 guint64 last_elapsed;
282 guint64 estimated_eos;
289 /* clock rate and rtp timestamp offset */
293 gint64 prev_ts_offset;
295 /* when we are shutting down */
296 GstFlowReturn srcresult;
302 GstClockTime timer_timeout;
303 guint16 timer_seqnum;
304 /* the latency of the upstream peer, we have to take this into account when
305 * synchronizing the buffers. */
306 GstClockTime peer_latency;
310 /* some accounting */
312 guint64 num_duplicates;
313 guint64 num_rtx_requests;
314 guint64 num_rtx_success;
315 guint64 num_rtx_failed;
320 GstClockTime last_dts;
321 guint64 last_rtptime;
322 GstClockTime avg_jitter;
339 GstClockTime timeout;
340 GstClockTime duration;
341 GstClockTime rtx_base;
342 GstClockTime rtx_delay;
343 GstClockTime rtx_retry;
344 GstClockTime rtx_last;
348 #define GST_RTP_JITTER_BUFFER_GET_PRIVATE(o) \
349 (G_TYPE_INSTANCE_GET_PRIVATE ((o), GST_TYPE_RTP_JITTER_BUFFER, \
350 GstRtpJitterBufferPrivate))
352 static GstStaticPadTemplate gst_rtp_jitter_buffer_sink_template =
353 GST_STATIC_PAD_TEMPLATE ("sink",
356 GST_STATIC_CAPS ("application/x-rtp"
357 /* "clock-rate = (int) [ 1, 2147483647 ], "
358 * "payload = (int) , "
359 * "encoding-name = (string) "
363 static GstStaticPadTemplate gst_rtp_jitter_buffer_sink_rtcp_template =
364 GST_STATIC_PAD_TEMPLATE ("sink_rtcp",
367 GST_STATIC_CAPS ("application/x-rtcp")
370 static GstStaticPadTemplate gst_rtp_jitter_buffer_src_template =
371 GST_STATIC_PAD_TEMPLATE ("src",
374 GST_STATIC_CAPS ("application/x-rtp"
375 /* "payload = (int) , "
376 * "clock-rate = (int) , "
377 * "encoding-name = (string) "
381 static guint gst_rtp_jitter_buffer_signals[LAST_SIGNAL] = { 0 };
383 #define gst_rtp_jitter_buffer_parent_class parent_class
384 G_DEFINE_TYPE (GstRtpJitterBuffer, gst_rtp_jitter_buffer, GST_TYPE_ELEMENT);
386 /* object overrides */
387 static void gst_rtp_jitter_buffer_set_property (GObject * object,
388 guint prop_id, const GValue * value, GParamSpec * pspec);
389 static void gst_rtp_jitter_buffer_get_property (GObject * object,
390 guint prop_id, GValue * value, GParamSpec * pspec);
391 static void gst_rtp_jitter_buffer_finalize (GObject * object);
393 /* element overrides */
394 static GstStateChangeReturn gst_rtp_jitter_buffer_change_state (GstElement
395 * element, GstStateChange transition);
396 static GstPad *gst_rtp_jitter_buffer_request_new_pad (GstElement * element,
397 GstPadTemplate * templ, const gchar * name, const GstCaps * filter);
398 static void gst_rtp_jitter_buffer_release_pad (GstElement * element,
400 static GstClock *gst_rtp_jitter_buffer_provide_clock (GstElement * element);
403 static GstCaps *gst_rtp_jitter_buffer_getcaps (GstPad * pad, GstCaps * filter);
404 static GstIterator *gst_rtp_jitter_buffer_iterate_internal_links (GstPad * pad,
407 /* sinkpad overrides */
408 static gboolean gst_rtp_jitter_buffer_sink_event (GstPad * pad,
409 GstObject * parent, GstEvent * event);
410 static GstFlowReturn gst_rtp_jitter_buffer_chain (GstPad * pad,
411 GstObject * parent, GstBuffer * buffer);
413 static gboolean gst_rtp_jitter_buffer_sink_rtcp_event (GstPad * pad,
414 GstObject * parent, GstEvent * event);
415 static GstFlowReturn gst_rtp_jitter_buffer_chain_rtcp (GstPad * pad,
416 GstObject * parent, GstBuffer * buffer);
418 static gboolean gst_rtp_jitter_buffer_sink_query (GstPad * pad,
419 GstObject * parent, GstQuery * query);
421 /* srcpad overrides */
422 static gboolean gst_rtp_jitter_buffer_src_event (GstPad * pad,
423 GstObject * parent, GstEvent * event);
424 static gboolean gst_rtp_jitter_buffer_src_activate_mode (GstPad * pad,
425 GstObject * parent, GstPadMode mode, gboolean active);
426 static void gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer);
427 static gboolean gst_rtp_jitter_buffer_src_query (GstPad * pad,
428 GstObject * parent, GstQuery * query);
431 gst_rtp_jitter_buffer_clear_pt_map (GstRtpJitterBuffer * jitterbuffer);
433 gst_rtp_jitter_buffer_set_active (GstRtpJitterBuffer * jitterbuffer,
434 gboolean active, guint64 base_time);
435 static void do_handle_sync (GstRtpJitterBuffer * jitterbuffer);
437 static void unschedule_current_timer (GstRtpJitterBuffer * jitterbuffer);
438 static void remove_all_timers (GstRtpJitterBuffer * jitterbuffer);
440 static void wait_next_timeout (GstRtpJitterBuffer * jitterbuffer);
442 static GstStructure *gst_rtp_jitter_buffer_create_stats (GstRtpJitterBuffer *
446 gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass)
448 GObjectClass *gobject_class;
449 GstElementClass *gstelement_class;
451 gobject_class = (GObjectClass *) klass;
452 gstelement_class = (GstElementClass *) klass;
454 g_type_class_add_private (klass, sizeof (GstRtpJitterBufferPrivate));
456 gobject_class->finalize = gst_rtp_jitter_buffer_finalize;
458 gobject_class->set_property = gst_rtp_jitter_buffer_set_property;
459 gobject_class->get_property = gst_rtp_jitter_buffer_get_property;
462 * GstRtpJitterBuffer:latency:
464 * The maximum latency of the jitterbuffer. Packets will be kept in the buffer
465 * for at most this time.
467 g_object_class_install_property (gobject_class, PROP_LATENCY,
468 g_param_spec_uint ("latency", "Buffer latency in ms",
469 "Amount of ms to buffer", 0, G_MAXUINT, DEFAULT_LATENCY_MS,
470 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
472 * GstRtpJitterBuffer:drop-on-latency:
474 * Drop oldest buffers when the queue is completely filled.
476 g_object_class_install_property (gobject_class, PROP_DROP_ON_LATENCY,
477 g_param_spec_boolean ("drop-on-latency",
478 "Drop buffers when maximum latency is reached",
479 "Tells the jitterbuffer to never exceed the given latency in size",
480 DEFAULT_DROP_ON_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
482 * GstRtpJitterBuffer:ts-offset:
484 * Adjust GStreamer output buffer timestamps in the jitterbuffer with offset.
485 * This is mainly used to ensure interstream synchronisation.
487 g_object_class_install_property (gobject_class, PROP_TS_OFFSET,
488 g_param_spec_int64 ("ts-offset", "Timestamp Offset",
489 "Adjust buffer timestamps with offset in nanoseconds", G_MININT64,
490 G_MAXINT64, DEFAULT_TS_OFFSET,
491 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
494 * GstRtpJitterBuffer:do-lost:
496 * Send out a GstRTPPacketLost event downstream when a packet is considered
499 g_object_class_install_property (gobject_class, PROP_DO_LOST,
500 g_param_spec_boolean ("do-lost", "Do Lost",
501 "Send an event downstream when a packet is lost", DEFAULT_DO_LOST,
502 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
505 * GstRtpJitterBuffer:mode:
507 * Control the buffering and timestamping mode used by the jitterbuffer.
509 g_object_class_install_property (gobject_class, PROP_MODE,
510 g_param_spec_enum ("mode", "Mode",
511 "Control the buffering algorithm in use", RTP_TYPE_JITTER_BUFFER_MODE,
512 DEFAULT_MODE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
514 * GstRtpJitterBuffer:percent:
516 * The percent of the jitterbuffer that is filled.
518 g_object_class_install_property (gobject_class, PROP_PERCENT,
519 g_param_spec_int ("percent", "percent",
520 "The buffer filled percent", 0, 100,
521 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
523 * GstRtpJitterBuffer:do-retransmission:
525 * Send out a GstRTPRetransmissionRequest event upstream when a packet is considered
526 * late and should be retransmitted.
530 g_object_class_install_property (gobject_class, PROP_DO_RETRANSMISSION,
531 g_param_spec_boolean ("do-retransmission", "Do Retransmission",
532 "Send retransmission events upstream when a packet is late",
533 DEFAULT_DO_RETRANSMISSION,
534 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
537 * GstRtpJitterBuffer:rtx-delay:
539 * When a packet did not arrive at the expected time, wait this extra amount
540 * of time before sending a retransmission event.
542 * When -1 is used, the max jitter will be used as extra delay.
546 g_object_class_install_property (gobject_class, PROP_RTX_DELAY,
547 g_param_spec_int ("rtx-delay", "RTX Delay",
548 "Extra time in ms to wait before sending retransmission "
549 "event (-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_DELAY,
550 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
553 * GstRtpJitterBuffer:rtx-min-delay:
555 * When a packet did not arrive at the expected time, wait at least this extra amount
556 * of time before sending a retransmission event.
560 g_object_class_install_property (gobject_class, PROP_RTX_MIN_DELAY,
561 g_param_spec_uint ("rtx-min-delay", "Minimum RTX Delay",
562 "Minimum time in ms to wait before sending retransmission "
563 "event", 0, G_MAXUINT, DEFAULT_RTX_MIN_DELAY,
564 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
566 * GstRtpJitterBuffer:rtx-delay-reorder:
568 * Assume that a retransmission event should be sent when we see
569 * this much packet reordering.
571 * When -1 is used, the value will be estimated based on observed packet
576 g_object_class_install_property (gobject_class, PROP_RTX_DELAY_REORDER,
577 g_param_spec_int ("rtx-delay-reorder", "RTX Delay Reorder",
578 "Sending retransmission event when this much reordering (-1 automatic)",
579 -1, G_MAXINT, DEFAULT_RTX_DELAY_REORDER,
580 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
582 * GstRtpJitterBuffer:rtx-retry-timeout:
584 * When no packet has been received after sending a retransmission event
585 * for this time, retry sending a retransmission event.
587 * When -1 is used, the value will be estimated based on observed round
592 g_object_class_install_property (gobject_class, PROP_RTX_RETRY_TIMEOUT,
593 g_param_spec_int ("rtx-retry-timeout", "RTX Retry Timeout",
594 "Retry sending a transmission event after this timeout in "
595 "ms (-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_RETRY_TIMEOUT,
596 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
598 * GstRtpJitterBuffer:rtx-min-retry-timeout:
600 * The minimum amount of time between retry timeouts. When
601 * #GstRtpJitterBuffer:rtx-retry-timeout is -1, this value ensures a
602 * minimum interval between retry timeouts.
604 * When -1 is used, the value will be estimated based on the
609 g_object_class_install_property (gobject_class, PROP_RTX_MIN_RETRY_TIMEOUT,
610 g_param_spec_int ("rtx-min-retry-timeout", "RTX Min Retry Timeout",
611 "Minimum timeout between sending a transmission event in "
612 "ms (-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_MIN_RETRY_TIMEOUT,
613 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
615 * GstRtpJitterBuffer:rtx-retry-period:
617 * The amount of time to try to get a retransmission.
619 * When -1 is used, the value will be estimated based on the jitterbuffer
620 * latency and the observed round trip time.
624 g_object_class_install_property (gobject_class, PROP_RTX_RETRY_PERIOD,
625 g_param_spec_int ("rtx-retry-period", "RTX Retry Period",
626 "Try to get a retransmission for this many ms "
627 "(-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_RETRY_PERIOD,
628 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
630 * GstRtpJitterBuffer:rtx-max-retries:
632 * The maximum number of retries to request a retransmission.
634 * This implies that at most (rtx-max-retries + 1) retransmissions will be requested.
635 * When -1 is used, the number of retransmission request will not be limited.
639 g_object_class_install_property (gobject_class, PROP_RTX_MAX_RETRIES,
640 g_param_spec_int ("rtx-max-retries", "RTX Max Retries",
641 "The maximum number of retries to request a retransmission. "
642 "(-1 not limited)", -1, G_MAXINT, DEFAULT_RTX_MAX_RETRIES,
643 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
645 * GstRtpJitterBuffer:stats:
647 * Various jitterbuffer statistics. This property returns a GstStructure
648 * with name application/x-rtp-jitterbuffer-stats with the following fields:
650 * "rtx-count" G_TYPE_UINT64 The number of retransmissions requested
651 * "rtx-success-count" G_TYPE_UINT64 The number of successful retransmissions
652 * "rtx-per-packet" G_TYPE_DOUBLE Average number of RTX per packet
653 * "rtx-rtt" G_TYPE_UINT64 Average round trip time per RTX
657 g_object_class_install_property (gobject_class, PROP_STATS,
658 g_param_spec_boxed ("stats", "Statistics",
659 "Various statistics", GST_TYPE_STRUCTURE,
660 G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
663 * GstRtpJitterBuffer::request-pt-map:
664 * @buffer: the object which received the signal
667 * Request the payload type as #GstCaps for @pt.
669 gst_rtp_jitter_buffer_signals[SIGNAL_REQUEST_PT_MAP] =
670 g_signal_new ("request-pt-map", G_TYPE_FROM_CLASS (klass),
671 G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
672 request_pt_map), NULL, NULL, g_cclosure_marshal_generic,
673 GST_TYPE_CAPS, 1, G_TYPE_UINT);
675 * GstRtpJitterBuffer::handle-sync:
676 * @buffer: the object which received the signal
677 * @struct: a GstStructure containing sync values.
679 * Be notified of new sync values.
681 gst_rtp_jitter_buffer_signals[SIGNAL_HANDLE_SYNC] =
682 g_signal_new ("handle-sync", G_TYPE_FROM_CLASS (klass),
683 G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
684 handle_sync), NULL, NULL, g_cclosure_marshal_VOID__BOXED,
685 G_TYPE_NONE, 1, GST_TYPE_STRUCTURE | G_SIGNAL_TYPE_STATIC_SCOPE);
688 * GstRtpJitterBuffer::on-npt-stop:
689 * @buffer: the object which received the signal
691 * Signal that the jitterbuffer has pushed the RTP packet that corresponds to
692 * the npt-stop position.
694 gst_rtp_jitter_buffer_signals[SIGNAL_ON_NPT_STOP] =
695 g_signal_new ("on-npt-stop", G_TYPE_FROM_CLASS (klass),
696 G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
697 on_npt_stop), NULL, NULL, g_cclosure_marshal_VOID__VOID,
698 G_TYPE_NONE, 0, G_TYPE_NONE);
701 * GstRtpJitterBuffer::clear-pt-map:
702 * @buffer: the object which received the signal
704 * Invalidate the clock-rate as obtained with the
705 * #GstRtpJitterBuffer::request-pt-map signal.
707 gst_rtp_jitter_buffer_signals[SIGNAL_CLEAR_PT_MAP] =
708 g_signal_new ("clear-pt-map", G_TYPE_FROM_CLASS (klass),
709 G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
710 G_STRUCT_OFFSET (GstRtpJitterBufferClass, clear_pt_map), NULL, NULL,
711 g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
714 * GstRtpJitterBuffer::set-active:
715 * @buffer: the object which received the signal
717 * Start pushing out packets with the given base time. This signal is only
718 * useful in buffering mode.
720 * Returns: the time of the last pushed packet.
722 gst_rtp_jitter_buffer_signals[SIGNAL_SET_ACTIVE] =
723 g_signal_new ("set-active", G_TYPE_FROM_CLASS (klass),
724 G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
725 G_STRUCT_OFFSET (GstRtpJitterBufferClass, set_active), NULL, NULL,
726 g_cclosure_marshal_generic, G_TYPE_UINT64, 2, G_TYPE_BOOLEAN,
729 gstelement_class->change_state =
730 GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_change_state);
731 gstelement_class->request_new_pad =
732 GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_request_new_pad);
733 gstelement_class->release_pad =
734 GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_release_pad);
735 gstelement_class->provide_clock =
736 GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_provide_clock);
738 gst_element_class_add_pad_template (gstelement_class,
739 gst_static_pad_template_get (&gst_rtp_jitter_buffer_src_template));
740 gst_element_class_add_pad_template (gstelement_class,
741 gst_static_pad_template_get (&gst_rtp_jitter_buffer_sink_template));
742 gst_element_class_add_pad_template (gstelement_class,
743 gst_static_pad_template_get (&gst_rtp_jitter_buffer_sink_rtcp_template));
745 gst_element_class_set_static_metadata (gstelement_class,
746 "RTP packet jitter-buffer", "Filter/Network/RTP",
747 "A buffer that deals with network jitter and other transmission faults",
748 "Philippe Kalaf <philippe.kalaf@collabora.co.uk>, "
749 "Wim Taymans <wim.taymans@gmail.com>");
751 klass->clear_pt_map = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_clear_pt_map);
752 klass->set_active = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_set_active);
754 GST_DEBUG_CATEGORY_INIT
755 (rtpjitterbuffer_debug, "rtpjitterbuffer", 0, "RTP Jitter Buffer");
/* Instance initialiser.
 *
 * Fetches the private struct and stores it in jitterbuffer->priv, loads the
 * tunables with their DEFAULT_* values, creates the timer array, the
 * RTPJitterBuffer and the lock/conditions, then builds the src and sink pads,
 * installs their callbacks and adds them to the element.
 *
 * NOTE(review): several short lines of this function are missing from this
 * excerpt (presumably including the assignments that receive the two new
 * pads and the pad names passed to the template constructor). */
759 gst_rtp_jitter_buffer_init (GstRtpJitterBuffer * jitterbuffer)
761 GstRtpJitterBufferPrivate *priv;
763 priv = GST_RTP_JITTER_BUFFER_GET_PRIVATE (jitterbuffer);
764 jitterbuffer->priv = priv;
/* configurable properties start at their compile-time defaults; the latency
 * is kept both in milliseconds and converted to nanoseconds */
766 priv->latency_ms = DEFAULT_LATENCY_MS;
767 priv->latency_ns = priv->latency_ms * GST_MSECOND;
768 priv->drop_on_latency = DEFAULT_DROP_ON_LATENCY;
769 priv->do_lost = DEFAULT_DO_LOST;
770 priv->do_retransmission = DEFAULT_DO_RETRANSMISSION;
771 priv->rtx_delay = DEFAULT_RTX_DELAY;
772 priv->rtx_min_delay = DEFAULT_RTX_MIN_DELAY;
773 priv->rtx_delay_reorder = DEFAULT_RTX_DELAY_REORDER;
774 priv->rtx_retry_timeout = DEFAULT_RTX_RETRY_TIMEOUT;
775 priv->rtx_min_retry_timeout = DEFAULT_RTX_MIN_RETRY_TIMEOUT;
776 priv->rtx_retry_period = DEFAULT_RTX_RETRY_PERIOD;
777 priv->rtx_max_retries = DEFAULT_RTX_MAX_RETRIES;
/* last_rtptime is declared guint64, so -1 stores the all-ones value;
 * presumably used as a "not set yet" marker -- confirm against its readers */
780 priv->last_rtptime = -1;
781 priv->avg_jitter = 0;
/* array of TimerData: not zero-terminated, new elements cleared to 0 */
782 priv->timers = g_array_new (FALSE, TRUE, sizeof (TimerData));
783 priv->jbuf = rtp_jitter_buffer_new ();
/* one mutex guards the private state; one condition per kind of waiter */
784 g_mutex_init (&priv->jbuf_lock);
785 g_cond_init (&priv->jbuf_timer);
786 g_cond_init (&priv->jbuf_event);
787 g_cond_init (&priv->jbuf_query);
789 /* reset skew detection initially */
790 rtp_jitter_buffer_reset_skew (priv->jbuf);
791 rtp_jitter_buffer_set_delay (priv->jbuf, priv->latency_ns);
792 rtp_jitter_buffer_set_buffering (priv->jbuf, FALSE);
/* source pad: activation-mode, query and event handlers */
796 gst_pad_new_from_static_template (&gst_rtp_jitter_buffer_src_template,
799 gst_pad_set_activatemode_function (priv->srcpad,
800 GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_activate_mode));
801 gst_pad_set_query_function (priv->srcpad,
802 GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_query));
803 gst_pad_set_event_function (priv->srcpad,
804 GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_event));
/* RTP sink pad: chain, event and query handlers */
807 gst_pad_new_from_static_template (&gst_rtp_jitter_buffer_sink_template,
810 gst_pad_set_chain_function (priv->sinkpad,
811 GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_chain));
812 gst_pad_set_event_function (priv->sinkpad,
813 GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_sink_event));
814 gst_pad_set_query_function (priv->sinkpad,
815 GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_sink_query));
817 gst_element_add_pad (GST_ELEMENT (jitterbuffer), priv->srcpad);
818 gst_element_add_pad (GST_ELEMENT (jitterbuffer), priv->sinkpad);
/* flag the element as a clock provider; the class installs a provide_clock
 * implementation for it */
820 GST_OBJECT_FLAG_SET (jitterbuffer, GST_ELEMENT_FLAG_PROVIDE_CLOCK);
823 #define IS_DROPABLE(it) (((it)->type == ITEM_TYPE_BUFFER) || ((it)->type == ITEM_TYPE_LOST))
825 #define ITEM_TYPE_BUFFER 0
826 #define ITEM_TYPE_LOST 1
827 #define ITEM_TYPE_EVENT 2
828 #define ITEM_TYPE_QUERY 3
/* Allocate a new RTPJitterBufferItem from the GLib slice allocator and fill
 * it from the arguments. The matching release is g_slice_free(), as done in
 * free_item().
 *
 * NOTE(review): only the seqnum and rtptime assignments are visible in this
 * excerpt; the remaining field initialisation and the return statement are
 * missing, so how data/type/dts/pts/count are stored cannot be confirmed
 * from here. */
830 static RTPJitterBufferItem *
831 alloc_item (gpointer data, guint type, GstClockTime dts, GstClockTime pts,
832 guint seqnum, guint count, guint rtptime)
834 RTPJitterBufferItem *item;
836 item = g_slice_new (RTPJitterBufferItem);
843 item->seqnum = seqnum;
845 item->rtptime = rtptime;
/* Release one jitterbuffer item: drop the reference on item->data, then give
 * the item itself back to the slice allocator. */
851 free_item (RTPJitterBufferItem * item)
/* items of type ITEM_TYPE_QUERY are never unreffed here; presumably the
 * query stays owned by whoever is waiting for its result -- confirm */
853 if (item->data && item->type != ITEM_TYPE_QUERY)
854 gst_mini_object_unref (item->data);
855 g_slice_free (RTPJitterBufferItem, item);
/* Variant of free_item() that keeps sticky events alive.
 *
 * @item: the item to release
 * @user_data: a GList ** that collects the retained events
 *
 * The data of a sticky event item is prepended to the list without being
 * unreffed, so the list takes over that reference. Any other non-query data
 * is unreffed. The item itself is always returned to the slice allocator. */
859 free_item_and_retain_events (RTPJitterBufferItem * item, gpointer user_data)
861 GList **l = user_data;
863 if (item->data && item->type == ITEM_TYPE_EVENT
864 && GST_EVENT_IS_STICKY (item->data)) {
/* prepending means the list ends up in reverse order of the calls */
865 *l = g_list_prepend (*l, item->data);
866 } else if (item->data && item->type != ITEM_TYPE_QUERY) {
867 gst_mini_object_unref (item->data);
869 g_slice_free (RTPJitterBufferItem, item);
/* GObject finalize: releases everything the instance initialiser created
 * (timer array, mutex, the three conditions and the RTPJitterBuffer) and then
 * chains up to the parent class. */
873 gst_rtp_jitter_buffer_finalize (GObject * object)
875 GstRtpJitterBuffer *jitterbuffer;
876 GstRtpJitterBufferPrivate *priv;
878 jitterbuffer = GST_RTP_JITTER_BUFFER (object);
879 priv = jitterbuffer->priv;
/* TRUE: free the element storage together with the GArray wrapper */
881 g_array_free (priv->timers, TRUE);
882 g_mutex_clear (&priv->jbuf_lock);
883 g_cond_clear (&priv->jbuf_timer);
884 g_cond_clear (&priv->jbuf_event);
885 g_cond_clear (&priv->jbuf_query);
/* free_item is handed to the flush as the per-item callback, so queued
 * items are released before the buffer object itself is dropped */
887 rtp_jitter_buffer_flush (priv->jbuf, (GFunc) free_item, NULL);
888 g_object_unref (priv->jbuf);
890 G_OBJECT_CLASS (parent_class)->finalize (object);
/* Internal-links handler shared by the element's pads.
 *
 * The sink pad is paired with the src pad and vice versa. For the RTCP sink
 * pad an iterator is created from a NULL value instead of a pad.
 *
 * NOTE(review): the lines between the RTCP branch and the GValue code are
 * missing from this excerpt, as is the return statement; presumably the
 * GValue path only runs when otherpad was set -- confirm. */
894 gst_rtp_jitter_buffer_iterate_internal_links (GstPad * pad, GstObject * parent)
896 GstRtpJitterBuffer *jitterbuffer;
897 GstPad *otherpad = NULL;
898 GstIterator *it = NULL;
901 jitterbuffer = GST_RTP_JITTER_BUFFER_CAST (parent);
903 if (pad == jitterbuffer->priv->sinkpad) {
904 otherpad = jitterbuffer->priv->srcpad;
905 } else if (pad == jitterbuffer->priv->srcpad) {
906 otherpad = jitterbuffer->priv->sinkpad;
907 } else if (pad == jitterbuffer->priv->rtcpsinkpad) {
908 it = gst_iterator_new_single (GST_TYPE_PAD, NULL);
/* wrap the peer pad in a temporary GValue for the single-item iterator;
 * the value is unset again right after the iterator is created */
912 g_value_init (&val, GST_TYPE_PAD);
913 g_value_set_object (&val, otherpad);
914 it = gst_iterator_new_single (GST_TYPE_PAD, &val);
915 g_value_unset (&val);
/* Create the RTCP sink pad from the "sink_rtcp" static template, install its
 * chain, event and internal-links handlers, activate it and add it to the
 * element.
 *
 * Returns: priv->rtcpsinkpad.
 *
 * NOTE(review): the line that receives the result of
 * gst_pad_new_from_static_template() is missing from this excerpt;
 * presumably it assigns priv->rtcpsinkpad -- confirm. */
922 create_rtcp_sink (GstRtpJitterBuffer * jitterbuffer)
924 GstRtpJitterBufferPrivate *priv;
926 priv = jitterbuffer->priv;
928 GST_DEBUG_OBJECT (jitterbuffer, "creating RTCP sink pad");
931 gst_pad_new_from_static_template
932 (&gst_rtp_jitter_buffer_sink_rtcp_template, "sink_rtcp");
933 gst_pad_set_chain_function (priv->rtcpsinkpad,
934 gst_rtp_jitter_buffer_chain_rtcp);
935 gst_pad_set_event_function (priv->rtcpsinkpad,
936 (GstPadEventFunction) gst_rtp_jitter_buffer_sink_rtcp_event);
937 gst_pad_set_iterate_internal_links_function (priv->rtcpsinkpad,
938 gst_rtp_jitter_buffer_iterate_internal_links);
/* the pad is activated before it is added to the element */
939 gst_pad_set_active (priv->rtcpsinkpad, TRUE);
940 gst_element_add_pad (GST_ELEMENT_CAST (jitterbuffer), priv->rtcpsinkpad);
942 return priv->rtcpsinkpad;
/* Tear down the RTCP sink pad: deactivate it, remove it from the element and
 * clear priv->rtcpsinkpad so that a new one can be requested later. */
946 remove_rtcp_sink (GstRtpJitterBuffer * jitterbuffer)
948 GstRtpJitterBufferPrivate *priv;
950 priv = jitterbuffer->priv;
952 GST_DEBUG_OBJECT (jitterbuffer, "removing RTCP sink pad");
954 gst_pad_set_active (priv->rtcpsinkpad, FALSE);
956 gst_element_remove_pad (GST_ELEMENT_CAST (jitterbuffer), priv->rtcpsinkpad);
957 priv->rtcpsinkpad = NULL;
/* GstElement request_new_pad implementation.
 *
 * @element: the jitterbuffer
 * @templ: the pad template being requested (must not be NULL)
 * @name: requested pad name, only used for the debug message here
 * @filter: not used in the visible code
 *
 * Only the "sink_rtcp" template of this class is accepted, and it is served
 * by create_rtcp_sink(). The two warnings cover a foreign template and an
 * RTCP pad that already exists.
 *
 * NOTE(review): the jumps/labels connecting the checks to the warnings and
 * the return statements are missing from this excerpt. */
961 gst_rtp_jitter_buffer_request_new_pad (GstElement * element,
962 GstPadTemplate * templ, const gchar * name, const GstCaps * filter)
964 GstRtpJitterBuffer *jitterbuffer;
965 GstElementClass *klass;
967 GstRtpJitterBufferPrivate *priv;
969 g_return_val_if_fail (templ != NULL, NULL);
970 g_return_val_if_fail (GST_IS_RTP_JITTER_BUFFER (element), NULL);
972 jitterbuffer = GST_RTP_JITTER_BUFFER_CAST (element);
973 priv = jitterbuffer->priv;
974 klass = GST_ELEMENT_GET_CLASS (element);
976 GST_DEBUG_OBJECT (element, "requesting pad %s", GST_STR_NULL (name));
978 /* figure out the template */
979 if (templ == gst_element_class_get_pad_template (klass, "sink_rtcp")) {
/* a non-NULL rtcpsinkpad means the single RTCP pad is already in use */
980 if (priv->rtcpsinkpad != NULL)
983 result = create_rtcp_sink (jitterbuffer);
992 g_warning ("rtpjitterbuffer: this is not our template");
997 g_warning ("rtpjitterbuffer: pad already requested");
/* GstElement release_pad implementation.
 *
 * Releasing the RTCP sink pad tears it down via remove_rtcp_sink(). A
 * warning exists for pads this element does not know.
 * NOTE(review): the control flow leading to the warning is not visible in
 * this excerpt. */
1003 gst_rtp_jitter_buffer_release_pad (GstElement * element, GstPad * pad)
1005 GstRtpJitterBuffer *jitterbuffer;
1006 GstRtpJitterBufferPrivate *priv;
1008 g_return_if_fail (GST_IS_RTP_JITTER_BUFFER (element));
1009 g_return_if_fail (GST_IS_PAD (pad));
1011 jitterbuffer = GST_RTP_JITTER_BUFFER_CAST (element);
1012 priv = jitterbuffer->priv;
1014 GST_DEBUG_OBJECT (element, "releasing pad %s:%s", GST_DEBUG_PAD_NAME (pad));
1016 if (priv->rtcpsinkpad == pad) {
1017 remove_rtcp_sink (jitterbuffer);
1026 g_warning ("gstjitterbuffer: asked to release an unknown pad");
/* GstElement provide_clock implementation: always hands out the result of
 * gst_system_clock_obtain(); @element is not used. */
1032 gst_rtp_jitter_buffer_provide_clock (GstElement * element)
1034 return gst_system_clock_obtain ();
/* Class handler installed as klass->clear_pt_map.
 *
 * Forgets the known clock-rate by setting it to -1 and resets the skew
 * estimation of the underlying RTPJitterBuffer; queued content is kept.
 * NOTE(review): any locking around these statements is not visible in this
 * excerpt. */
1038 gst_rtp_jitter_buffer_clear_pt_map (GstRtpJitterBuffer * jitterbuffer)
1040 GstRtpJitterBufferPrivate *priv;
1042 priv = jitterbuffer->priv;
1044 /* this will trigger a new pt-map request signal, FIXME, do something better. */
1047 priv->clock_rate = -1;
1048 /* do not clear current content, but refresh state for new arrival */
1049 GST_DEBUG_OBJECT (jitterbuffer, "reset jitterbuffer");
1050 rtp_jitter_buffer_reset_skew (priv->jbuf);
/* Switches the jitterbuffer between active and inactive.
 *
 * When @active differs from priv->active, the given offset is stored in
 * priv->out_offset, the new state is recorded and waiters are woken with
 * JBUF_SIGNAL_EVENT(). Buffering is (re)enabled on priv->jbuf and last_out
 * is computed from the head item (dts + ts_offset) or, when the queue is
 * empty, from priv->last_out_time.
 *
 * NOTE(review): the last parameter (named `offset` in the body), the
 * condition guarding rtp_jitter_buffer_set_buffering(), and what is done
 * with last_out are on lines not visible in this excerpt. */
1055 gst_rtp_jitter_buffer_set_active (GstRtpJitterBuffer * jbuf, gboolean active,
1058 GstRtpJitterBufferPrivate *priv;
1059 GstClockTime last_out;
1060 RTPJitterBufferItem *item;
1065 GST_DEBUG_OBJECT (jbuf, "setting active %d with offset %" GST_TIME_FORMAT,
1066 active, GST_TIME_ARGS (offset));
1068 if (active != priv->active) {
1069 /* add the amount of time spent in paused to the output offset. All
1070 * outgoing buffers will have this offset applied to their timestamps in
1071 * order to make them arrive in time in the sink.
 * Note that the value is assigned here, not accumulated. */
1072 priv->out_offset = offset;
1073 GST_DEBUG_OBJECT (jbuf, "out offset %" GST_TIME_FORMAT,
1074 GST_TIME_ARGS (priv->out_offset));
1075 priv->active = active;
1076 JBUF_SIGNAL_EVENT (priv);
1079 rtp_jitter_buffer_set_buffering (priv->jbuf, TRUE);
1081 if ((item = rtp_jitter_buffer_peek (priv->jbuf))) {
1082 /* head buffer timestamp and offset gives our output time */
1083 last_out = item->dts + priv->ts_offset;
1085 /* use last known time when the buffer is empty */
1086 last_out = priv->last_out_time;
/* Computes the caps for @pad by proxying the query to the opposite pad.
 *
 * The peer of the other pad (sink for the src pad and vice versa) is queried
 * with @filter; the result is combined with the template caps of @pad. The
 * debug messages show two outcomes: "use template" and "intersect with
 * template", the latter going through gst_caps_intersect() and dropping the
 * references on both inputs.
 *
 * NOTE(review): the result of gst_pad_get_parent() is dereferenced without
 * a visible NULL check. The condition choosing between the two outcomes and
 * the return statement are on lines not visible in this excerpt. */
1094 gst_rtp_jitter_buffer_getcaps (GstPad * pad, GstCaps * filter)
1096 GstRtpJitterBuffer *jitterbuffer;
1097 GstRtpJitterBufferPrivate *priv;
/* gst_pad_get_parent() hands out a reference; it is released at the end */
1102 jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
1103 priv = jitterbuffer->priv;
1105 other = (pad == priv->srcpad ? priv->sinkpad : priv->srcpad);
1107 caps = gst_pad_peer_query_caps (other, filter);
1109 templ = gst_pad_get_pad_template_caps (pad);
1111 GST_DEBUG_OBJECT (jitterbuffer, "use template");
1116 GST_DEBUG_OBJECT (jitterbuffer, "intersect with template");
1118 intersect = gst_caps_intersect (caps, templ);
1119 gst_caps_unref (caps);
1120 gst_caps_unref (templ);
1124 gst_object_unref (jitterbuffer);
1130 * Must be called with JBUF_LOCK held
/* Extracts the RTP session parameters from the first structure of the caps.
 *
 * Fields read: "clock-rate" (mandatory, must be > 0, also pushed into
 * priv->jbuf), "clock-base", "seqnum-base", "npt-start" and "npt-stop".
 * Missing optional fields fall back to -1 (clock_base, seqnum_base,
 * npt_stop) or 0 (npt_start). The two trailing debug messages belong to the
 * failure paths for a missing and for a non-positive clock-rate.
 *
 * NOTE(review): the second parameter (used as `caps`), the return values and
 * the goto/label lines are not visible in this excerpt. */
1134 gst_jitter_buffer_sink_parse_caps (GstRtpJitterBuffer * jitterbuffer,
1137 GstRtpJitterBufferPrivate *priv;
1138 GstStructure *caps_struct;
1142 priv = jitterbuffer->priv;
1144 /* first parse the caps */
1145 caps_struct = gst_caps_get_structure (caps, 0);
1147 GST_DEBUG_OBJECT (jitterbuffer, "got caps");
1149 /* we need a clock-rate to convert the rtp timestamps to GStreamer time and to
1150 * measure the amount of data in the buffer */
1151 if (!gst_structure_get_int (caps_struct, "clock-rate", &priv->clock_rate))
1154 if (priv->clock_rate <= 0)
1157 GST_DEBUG_OBJECT (jitterbuffer, "got clock-rate %d", priv->clock_rate);
1159 rtp_jitter_buffer_set_clock_rate (priv->jbuf, priv->clock_rate);
1161 /* The clock base is the RTP timestamp corresponding to the npt-start value. We
1162 * can use this to track the amount of time elapsed on the sender. */
1163 if (gst_structure_get_uint (caps_struct, "clock-base", &val))
1164 priv->clock_base = val;
1166 priv->clock_base = -1;
/* the extended timestamp starts out at the clock base (or -1 when unknown) */
1168 priv->ext_timestamp = priv->clock_base;
1170 GST_DEBUG_OBJECT (jitterbuffer, "got clock-base %" G_GINT64_FORMAT,
1173 if (gst_structure_get_uint (caps_struct, "seqnum-base", &val)) {
1174 /* first expected seqnum, only update when we didn't have a previous base. */
1175 if (priv->next_in_seqnum == -1)
1176 priv->next_in_seqnum = val;
1177 if (priv->next_seqnum == -1) {
1178 priv->next_seqnum = val;
1179 JBUF_SIGNAL_EVENT (priv);
1181 priv->seqnum_base = val;
1183 priv->seqnum_base = -1;
/* note: the value logged below is next_in_seqnum, not seqnum_base */
1186 GST_DEBUG_OBJECT (jitterbuffer, "got seqnum-base %d", priv->next_in_seqnum);
1188 /* the start and stop times. The seqnum-base corresponds to the start time. We
1189 * will keep track of the seqnums on the output and when we reach the one
1190 * corresponding to npt-stop, we emit the npt-stop-reached signal */
1191 if (gst_structure_get_clock_time (caps_struct, "npt-start", &tval))
1192 priv->npt_start = tval;
1194 priv->npt_start = 0;
1196 if (gst_structure_get_clock_time (caps_struct, "npt-stop", &tval))
1197 priv->npt_stop = tval;
1199 priv->npt_stop = -1;
1201 GST_DEBUG_OBJECT (jitterbuffer,
1202 "npt start/stop: %" GST_TIME_FORMAT "-%" GST_TIME_FORMAT,
1203 GST_TIME_ARGS (priv->npt_start), GST_TIME_ARGS (priv->npt_stop));
1210 GST_DEBUG_OBJECT (jitterbuffer, "No clock-rate in caps!");
1215 GST_DEBUG_OBJECT (jitterbuffer, "Invalid clock-rate %d", priv->clock_rate);
/* Puts the element into the flushing state.
 *
 * priv->srcresult becomes GST_FLOW_FLUSHING and both the event and the
 * query condition are signalled so that blocked waiters can notice it.
 *
 * NOTE(review): lock/unlock statements are on lines not visible in this
 * excerpt. */
1221 gst_rtp_jitter_buffer_flush_start (GstRtpJitterBuffer * jitterbuffer)
1223 GstRtpJitterBufferPrivate *priv;
1225 priv = jitterbuffer->priv;
1228 /* mark ourselves as flushing */
1229 priv->srcresult = GST_FLOW_FLUSHING;
1230 GST_DEBUG_OBJECT (jitterbuffer, "Disabling pop on queue");
1231 /* this unblocks any waiting pops on the src pad task */
1232 JBUF_SIGNAL_EVENT (priv);
/* a pending query wait is released with FALSE as its result argument */
1233 JBUF_SIGNAL_QUERY (priv, FALSE);
/* Leaves the flushing state and returns the element to a pristine state.
 *
 * priv->srcresult goes back to GST_FLOW_OK, the segment is re-initialised in
 * TIME format and the sequence-number, timing and statistics fields listed
 * below are reset to their "unknown" values. Queued items are released with
 * free_item, the skew estimation is reset and all timers are removed.
 *
 * NOTE(review): lock/unlock statements and a few short assignments are on
 * lines not visible in this excerpt. */
1238 gst_rtp_jitter_buffer_flush_stop (GstRtpJitterBuffer * jitterbuffer)
1240 GstRtpJitterBufferPrivate *priv;
1242 priv = jitterbuffer->priv;
1245 GST_DEBUG_OBJECT (jitterbuffer, "Enabling pop on queue");
1246 /* Mark as non flushing */
1247 priv->srcresult = GST_FLOW_OK;
1248 gst_segment_init (&priv->segment, GST_FORMAT_TIME);
/* sequence number tracking and packet-spacing estimation start over */
1249 priv->last_popped_seqnum = -1;
1250 priv->last_out_time = -1;
1251 priv->next_seqnum = -1;
1252 priv->seqnum_base = -1;
1253 priv->ips_rtptime = -1;
1254 priv->ips_dts = GST_CLOCK_TIME_NONE;
1255 priv->packet_spacing = 0;
1256 priv->next_in_seqnum = -1;
/* -1 makes the chain function look up the clock-rate again */
1257 priv->clock_rate = -1;
1260 priv->estimated_eos = -1;
1261 priv->last_elapsed = 0;
1262 priv->ext_timestamp = -1;
/* jitter statistics start over as well */
1263 priv->avg_jitter = 0;
1264 priv->last_dts = -1;
1265 priv->last_rtptime = -1;
1266 GST_DEBUG_OBJECT (jitterbuffer, "flush and reset jitterbuffer");
1267 rtp_jitter_buffer_flush (priv->jbuf, (GFunc) free_item, NULL);
1268 rtp_jitter_buffer_disable_buffering (priv->jbuf, FALSE);
1269 rtp_jitter_buffer_reset_skew (priv->jbuf);
1270 remove_all_timers (jitterbuffer);
/* Pad activation handler for the source pad.
 *
 * For GST_PAD_MODE_PUSH two paths are visible: one calls flush_stop and
 * starts gst_rtp_jitter_buffer_loop as the pad task, the other calls
 * flush_start and stops the task on @pad.
 *
 * NOTE(review): the test on @active selecting between the two paths, the
 * handling of other modes and the return statement are on lines not visible
 * in this excerpt. */
1275 gst_rtp_jitter_buffer_src_activate_mode (GstPad * pad, GstObject * parent,
1276 GstPadMode mode, gboolean active)
1279 GstRtpJitterBuffer *jitterbuffer = NULL;
1281 jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1284 case GST_PAD_MODE_PUSH:
1286 /* allow data processing */
1287 gst_rtp_jitter_buffer_flush_stop (jitterbuffer);
1289 /* start pushing out buffers */
1290 GST_DEBUG_OBJECT (jitterbuffer, "Starting task on srcpad");
1291 result = gst_pad_start_task (jitterbuffer->priv->srcpad,
1292 (GstTaskFunction) gst_rtp_jitter_buffer_loop, jitterbuffer, NULL);
1294 /* make sure all data processing stops ASAP */
1295 gst_rtp_jitter_buffer_flush_start (jitterbuffer);
1297 /* NOTE this will hardlock if the state change is called from the src pad
1298 * task thread because we will _join() the thread. */
1299 GST_DEBUG_OBJECT (jitterbuffer, "Stopping task on srcpad");
1300 result = gst_pad_stop_task (pad);
/* GstElement state change handler.
 *
 * Before chaining up to the parent class:
 *  - READY_TO_PAUSED resets the negotiated values, marks the element as
 *    blocked and starts the "timer" thread running wait_next_timeout;
 *  - PAUSED_TO_PLAYING clears the blocked flag and wakes the event and
 *    timer waiters.
 * After chaining up:
 *  - READY_TO_PAUSED and PLAYING_TO_PAUSED turn any non-failure result into
 *    GST_STATE_CHANGE_NO_PREROLL; PLAYING_TO_PAUSED also blocks again and
 *    unschedules the current timer;
 *  - PAUSED_TO_READY drops the last SR buffer, asks the timer thread to
 *    stop (timer_running = FALSE plus signals) and joins it.
 *
 * NOTE(review): lock/unlock, break and return lines are not visible in this
 * excerpt. */
1310 static GstStateChangeReturn
1311 gst_rtp_jitter_buffer_change_state (GstElement * element,
1312 GstStateChange transition)
1314 GstRtpJitterBuffer *jitterbuffer;
1315 GstRtpJitterBufferPrivate *priv;
1316 GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
1318 jitterbuffer = GST_RTP_JITTER_BUFFER (element);
1319 priv = jitterbuffer->priv;
1321 switch (transition) {
1322 case GST_STATE_CHANGE_NULL_TO_READY:
1324 case GST_STATE_CHANGE_READY_TO_PAUSED:
1326 /* reset negotiated values */
1327 priv->clock_rate = -1;
1328 priv->clock_base = -1;
1329 priv->peer_latency = 0;
1331 /* block until we go to PLAYING */
1332 priv->blocked = TRUE;
1333 priv->timer_running = TRUE;
1334 priv->timer_thread =
1335 g_thread_new ("timer", (GThreadFunc) wait_next_timeout, jitterbuffer);
1338 case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
1340 /* unblock to allow streaming in PLAYING */
1341 priv->blocked = FALSE;
1342 JBUF_SIGNAL_EVENT (priv);
1343 JBUF_SIGNAL_TIMER (priv);
/* let the parent class perform the actual state change */
1350 ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1352 switch (transition) {
1353 case GST_STATE_CHANGE_READY_TO_PAUSED:
1354 /* we are a live element because we sync to the clock, which we can only
1355 * do in the PLAYING state */
1356 if (ret != GST_STATE_CHANGE_FAILURE)
1357 ret = GST_STATE_CHANGE_NO_PREROLL;
1359 case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
1361 /* block to stop streaming when PAUSED */
1362 priv->blocked = TRUE;
1363 unschedule_current_timer (jitterbuffer);
1365 if (ret != GST_STATE_CHANGE_FAILURE)
1366 ret = GST_STATE_CHANGE_NO_PREROLL;
1368 case GST_STATE_CHANGE_PAUSED_TO_READY:
1370 gst_buffer_replace (&priv->last_sr, NULL);
/* ask the timer thread to exit and wake it up so it sees the flag */
1371 priv->timer_running = FALSE;
1372 unschedule_current_timer (jitterbuffer);
1373 JBUF_SIGNAL_TIMER (priv);
1374 JBUF_SIGNAL_QUERY (priv, FALSE);
/* wait for the timer thread started in READY_TO_PAUSED to finish */
1376 g_thread_join (priv->timer_thread);
1377 priv->timer_thread = NULL;
1379 case GST_STATE_CHANGE_READY_TO_NULL:
/* Event handler for the source pad (events travelling upstream).
 *
 * LATENCY events are parsed; when priv->jbuf runs in
 * RTP_JITTER_BUFFER_MODE_BUFFER the parsed latency is installed as the
 * buffer delay. In both visible paths the event is then forwarded through
 * the sink pad and the push result becomes the return value `ret`.
 *
 * NOTE(review): the third parameter (used as `event`), lock/unlock, break
 * and the return statement are on lines not visible in this excerpt. */
1389 gst_rtp_jitter_buffer_src_event (GstPad * pad, GstObject * parent,
1392 gboolean ret = TRUE;
1393 GstRtpJitterBuffer *jitterbuffer;
1394 GstRtpJitterBufferPrivate *priv;
1396 jitterbuffer = GST_RTP_JITTER_BUFFER_CAST (parent);
1397 priv = jitterbuffer->priv;
1399 GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
1401 switch (GST_EVENT_TYPE (event)) {
1402 case GST_EVENT_LATENCY:
1404 GstClockTime latency;
1406 gst_event_parse_latency (event, &latency);
1408 GST_DEBUG_OBJECT (jitterbuffer,
1409 "configuring latency of %" GST_TIME_FORMAT, GST_TIME_ARGS (latency));
1412 /* adjust the overall buffer delay to the total pipeline latency in
1413 * buffering mode because if downstream consumes too fast (because of
1414 * large latency or queues), we would start rebuffering again. */
1415 if (rtp_jitter_buffer_get_mode (priv->jbuf) ==
1416 RTP_JITTER_BUFFER_MODE_BUFFER) {
1417 rtp_jitter_buffer_set_delay (priv->jbuf, latency);
1421 ret = gst_pad_push_event (priv->sinkpad, event);
1425 ret = gst_pad_push_event (priv->sinkpad, event);
/* Handles a serialized sink event and stores it in the jitterbuffer queue.
 *
 * CAPS events are parsed with gst_jitter_buffer_sink_parse_caps(); SEGMENT
 * events are copied into priv->segment and must be in GST_FORMAT_TIME,
 * otherwise control jumps to newseg_wrong_format where the event is
 * unreffed. The event is wrapped with alloc_item() as ITEM_TYPE_EVENT,
 * inserted into priv->jbuf and waiters are woken with JBUF_SIGNAL_EVENT().
 *
 * NOTE(review): the event type that triggers
 * rtp_jitter_buffer_disable_buffering(), the use of `head` and the return
 * values are on lines not visible in this excerpt. */
1432 /* handles and stores the event in the jitterbuffer, must be called with
1435 queue_event (GstRtpJitterBuffer * jitterbuffer, GstEvent * event)
1437 GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1438 RTPJitterBufferItem *item;
1441 switch (GST_EVENT_TYPE (event)) {
1442 case GST_EVENT_CAPS:
1446 gst_event_parse_caps (event, &caps);
1447 gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps);
1450 case GST_EVENT_SEGMENT:
1451 gst_event_copy_segment (event, &priv->segment);
1453 /* we need time for now */
1454 if (priv->segment.format != GST_FORMAT_TIME)
1455 goto newseg_wrong_format;
1457 GST_DEBUG_OBJECT (jitterbuffer,
1458 "segment: %" GST_SEGMENT_FORMAT, &priv->segment);
1462 rtp_jitter_buffer_disable_buffering (priv->jbuf, TRUE);
1469 GST_DEBUG_OBJECT (jitterbuffer, "adding event");
1470 item = alloc_item (event, ITEM_TYPE_EVENT, -1, -1, -1, 0, -1);
1471 rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL);
1473 JBUF_SIGNAL_EVENT (priv);
1478 newseg_wrong_format:
1480 GST_DEBUG_OBJECT (jitterbuffer, "received non TIME newsegment");
1481 gst_event_unref (event);
/* Event handler for the RTP sink pad.
 *
 * FLUSH_START is pushed downstream first, then the element enters the
 * flushing state and the src pad task is paused. FLUSH_STOP is pushed
 * downstream and the src pad is re-activated in push mode. Other serialized
 * events are queued through queue_event() unless priv->srcresult reports a
 * flow problem; non-serialized events are pushed downstream immediately.
 * Both error paths at the end unref the event.
 *
 * NOTE(review): the third parameter (used as `event`), lock/unlock, the EOS
 * test, labels and return statements are on lines not visible in this
 * excerpt. */
1487 gst_rtp_jitter_buffer_sink_event (GstPad * pad, GstObject * parent,
1490 gboolean ret = TRUE;
1491 GstRtpJitterBuffer *jitterbuffer;
1492 GstRtpJitterBufferPrivate *priv;
1494 jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1495 priv = jitterbuffer->priv;
1497 GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
1499 switch (GST_EVENT_TYPE (event)) {
1500 case GST_EVENT_FLUSH_START:
1501 ret = gst_pad_push_event (priv->srcpad, event);
1502 gst_rtp_jitter_buffer_flush_start (jitterbuffer);
1503 /* wait for the loop to go into PAUSED */
1504 gst_pad_pause_task (priv->srcpad);
1506 case GST_EVENT_FLUSH_STOP:
1507 ret = gst_pad_push_event (priv->srcpad, event);
1509 gst_rtp_jitter_buffer_src_activate_mode (priv->srcpad, parent,
1510 GST_PAD_MODE_PUSH, TRUE);
1513 if (GST_EVENT_IS_SERIALIZED (event)) {
1514 /* serialized events go in the queue */
1516 if (priv->srcresult != GST_FLOW_OK) {
1517 /* Errors in sticky event pushing are no problem and ignored here
1518 * as they will cause more meaningful errors during data flow.
1519 * For EOS events, that are not followed by data flow, we still
1520 * return FALSE here though.
1522 if (!GST_EVENT_IS_STICKY (event) ||
1523 GST_EVENT_TYPE (event) == GST_EVENT_EOS)
1524 goto out_flow_error;
1526 /* refuse more events on EOS */
1529 ret = queue_event (jitterbuffer, event);
1532 /* non-serialized events are forwarded downstream immediately */
1533 ret = gst_pad_push_event (priv->srcpad, event);
1542 GST_DEBUG_OBJECT (jitterbuffer,
1543 "refusing event, we have a downstream flow error: %s",
1544 gst_flow_get_name (priv->srcresult));
1546 gst_event_unref (event);
1551 GST_DEBUG_OBJECT (jitterbuffer, "refusing event, we are EOS");
1553 gst_event_unref (event);
/* Event handler for the RTCP sink pad.
 *
 * FLUSH_START and FLUSH_STOP are consumed here (the event is unreffed and
 * not forwarded); the remaining visible path hands the event to
 * gst_pad_event_default().
 *
 * NOTE(review): the third parameter (used as `event`), the break/default
 * lines and the return statement are not visible in this excerpt. */
1559 gst_rtp_jitter_buffer_sink_rtcp_event (GstPad * pad, GstObject * parent,
1562 gboolean ret = TRUE;
1563 GstRtpJitterBuffer *jitterbuffer;
1565 jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1567 GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
1569 switch (GST_EVENT_TYPE (event)) {
1570 case GST_EVENT_FLUSH_START:
1571 gst_event_unref (event);
1573 case GST_EVENT_FLUSH_STOP:
1574 gst_event_unref (event);
1577 ret = gst_pad_event_default (pad, parent, event);
1585 * Must be called with JBUF_LOCK held, will release the LOCK when emitting the
1586 * signal. The function returns GST_FLOW_ERROR when a parsing error happened and
1587 * GST_FLOW_FLUSHING when the element is shutting down. On success
1588 * GST_FLOW_OK is returned.
/* Asks the application for the caps of a payload type by emitting the
 * request-pt-map signal, then parses the returned caps.
 *
 * The signal arguments are the element itself and the payload type `pt`;
 * the return value slot is a boxed GstCaps initialised to NULL. The JBUF
 * lock is dropped around the emission and re-taken with JBUF_LOCK_CHECK,
 * which jumps to out_flushing. The three trailing paths return
 * GST_FLOW_ERROR ("could not get caps"), GST_FLOW_FLUSHING and
 * GST_FLOW_ERROR ("parse failed").
 *
 * NOTE(review): the second parameter, the declarations of `ret`, `caps` and
 * `res`, the NULL-caps test, the labels and the success return are on lines
 * not visible in this excerpt. */
1590 static GstFlowReturn
1591 gst_rtp_jitter_buffer_get_clock_rate (GstRtpJitterBuffer * jitterbuffer,
1595 GValue args[2] = { {0}, {0} };
1599 g_value_init (&args[0], GST_TYPE_ELEMENT);
1600 g_value_set_object (&args[0], jitterbuffer);
1601 g_value_init (&args[1], G_TYPE_UINT);
1602 g_value_set_uint (&args[1], pt);
1604 g_value_init (&ret, GST_TYPE_CAPS);
1605 g_value_set_boxed (&ret, NULL);
/* signal handlers run without the jitterbuffer lock held */
1607 JBUF_UNLOCK (jitterbuffer->priv);
1608 g_signal_emitv (args, gst_rtp_jitter_buffer_signals[SIGNAL_REQUEST_PT_MAP], 0,
1610 JBUF_LOCK_CHECK (jitterbuffer->priv, out_flushing);
1612 g_value_unset (&args[0]);
1613 g_value_unset (&args[1]);
/* take our own copy of the caps before the GValue is released */
1614 caps = (GstCaps *) g_value_dup_boxed (&ret);
1615 g_value_unset (&ret);
1619 res = gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps);
1620 gst_caps_unref (caps);
1622 if (G_UNLIKELY (!res))
1630 GST_DEBUG_OBJECT (jitterbuffer, "could not get caps");
1631 return GST_FLOW_ERROR;
1635 GST_DEBUG_OBJECT (jitterbuffer, "we are flushing");
1636 return GST_FLOW_FLUSHING;
1640 GST_DEBUG_OBJECT (jitterbuffer, "parse failed");
1641 return GST_FLOW_ERROR;
1645 /* call with jbuf lock held.
 *
 * Builds a buffering message when @percent differs from the value remembered
 * in priv->last_percent, and remembers the new value. The message carries
 * GST_BUFFERING_LIVE as buffering mode with the three numeric statistics set
 * to -1.
 *
 * NOTE(review): the return type, any early-out conditions, the assignment
 * target of gst_message_new_buffering() and the return statement are on
 * lines not visible in this excerpt. */
1647 check_buffering_percent (GstRtpJitterBuffer * jitterbuffer, gint percent)
1649 GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1650 GstMessage *message = NULL;
1655 /* Post a buffering message */
1656 if (priv->last_percent != percent) {
1657 priv->last_percent = percent;
1659 gst_message_new_buffering (GST_OBJECT_CAST (jitterbuffer), percent);
1660 gst_message_set_buffering_stats (message, GST_BUFFERING_LIVE, -1, -1, -1);
/* Converts a jitterbuffer timestamp into an output timestamp by adding
 * priv->ts_offset and priv->out_offset to it.
 *
 * A @timestamp of -1 is special-cased before any offset is added.
 *
 * NOTE(review): the return type and the return statements (including what
 * is returned for -1) are on lines not visible in this excerpt. */
1667 apply_offset (GstRtpJitterBuffer * jitterbuffer, GstClockTime timestamp)
1669 GstRtpJitterBufferPrivate *priv;
1671 priv = jitterbuffer->priv;
1673 if (timestamp == -1)
1676 /* apply the timestamp offset, this is used for inter stream sync */
1677 timestamp += priv->ts_offset;
1678 /* add the offset, this is used when buffering */
1679 timestamp += priv->out_offset;
/* Linear search through priv->timers for an entry whose seqnum and type
 * both match the arguments.
 *
 * `timer` starts out as NULL, so that is presumably the result when nothing
 * matches.
 *
 * NOTE(review): the return type, the body of the match branch and the
 * return statement are on lines not visible in this excerpt. */
1685 find_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type, guint16 seqnum)
1687 GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1688 TimerData *timer = NULL;
1691 len = priv->timers->len;
1692 for (i = 0; i < len; i++) {
1693 TimerData *test = &g_array_index (priv->timers, TimerData, i);
1694 if (test->seqnum == seqnum && test->type == type) {
/* Cancels the clock wait that is currently pending, if any.
 *
 * When priv->clock_id is set, the id is unscheduled and the field is
 * cleared; otherwise nothing happens. */
1703 unschedule_current_timer (GstRtpJitterBuffer * jitterbuffer)
1705 GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1707 if (priv->clock_id) {
1708 GST_DEBUG_OBJECT (jitterbuffer, "unschedule current timer");
1709 gst_clock_id_unschedule (priv->clock_id);
1710 priv->clock_id = NULL;
/* Returns the time at which @timer should fire.
 *
 * For every timer type except TIMER_TYPE_EXPECTED the stored timeout is
 * converted with apply_offset() and priv->latency_ns is added on top.
 * TIMER_TYPE_EXPECTED timers use their stored timeout unchanged. A stored
 * timeout of -1 is special-cased first.
 *
 * NOTE(review): the return type and the statement executed for the -1 case
 * are on lines not visible in this excerpt. */
1715 get_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
1717 GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1718 GstClockTime test_timeout;
1720 if ((test_timeout = timer->timeout) == -1)
1723 if (timer->type != TIMER_TYPE_EXPECTED) {
1724 /* add our latency and offset to get output times. */
1725 test_timeout = apply_offset (jitterbuffer, test_timeout);
1726 test_timeout += priv->latency_ns;
1728 return test_timeout;
/* Interrupts the pending clock wait when @timer has to fire before it.
 *
 * Only acts while a wait is pending (priv->clock_id set): the wait is
 * unscheduled when the timer's timeout is -1 or lies before
 * priv->timer_timeout, the time currently being waited for. */
1732 recalculate_timer (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
1734 GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1736 if (priv->clock_id) {
1737 GstClockTime timeout = get_timeout (jitterbuffer, timer);
1739 GST_DEBUG ("%" GST_TIME_FORMAT " <> %" GST_TIME_FORMAT,
1740 GST_TIME_ARGS (timeout), GST_TIME_ARGS (priv->timer_timeout));
1742 if (timeout == -1 || timeout < priv->timer_timeout)
1743 unschedule_current_timer (jitterbuffer);
/* Appends a new timer for @seqnum to priv->timers.
 *
 * The array is grown by one element and the new slot is filled in: the
 * stored timeout is @timeout + @delay, and for TIMER_TYPE_EXPECTED the
 * retransmission base, delay and retry fields are initialised as well. The
 * pending clock wait is re-evaluated and the timer thread is signalled.
 *
 * NOTE(review): the return type, the declarations of `timer` and `len`,
 * the assignments of the remaining fields and the return statement are on
 * lines not visible in this excerpt; set_timer() uses the return value as a
 * TimerData pointer. */
1748 add_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type,
1749 guint16 seqnum, guint num, GstClockTime timeout, GstClockTime delay,
1750 GstClockTime duration)
1752 GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1756 GST_DEBUG_OBJECT (jitterbuffer,
1757 "add timer %d for seqnum %d to %" GST_TIME_FORMAT ", delay %"
1758 GST_TIME_FORMAT, type, seqnum, GST_TIME_ARGS (timeout),
1759 GST_TIME_ARGS (delay));
/* grow the array by one and use the new last element */
1761 len = priv->timers->len;
1762 g_array_set_size (priv->timers, len + 1);
1763 timer = &g_array_index (priv->timers, TimerData, len);
1766 timer->seqnum = seqnum;
1768 timer->timeout = timeout + delay;
1769 timer->duration = duration;
1770 if (type == TIMER_TYPE_EXPECTED) {
1771 timer->rtx_base = timeout;
1772 timer->rtx_delay = delay;
1773 timer->rtx_retry = 0;
1775 timer->num_rtx_retry = 0;
/* the new timer may be due before the one currently being waited on */
1776 recalculate_timer (jitterbuffer, timer);
1777 JBUF_SIGNAL_TIMER (priv);
/* Updates an existing timer with a new sequence number and/or timeout.
 *
 * Nothing is changed when both the seqnum and the timeout compare equal to
 * the stored values. Otherwise the timer gets @seqnum and @timeout + @delay,
 * the retransmission fields are refreshed and, while a clock wait is
 * pending, that wait is unscheduled or re-evaluated as described in the
 * inline comments.
 *
 * NOTE(review): `timechange` compares the stored timeout (which already
 * includes the previous delay) with the bare @timeout argument -- confirm
 * this is intended. The role of @reset, the declaration of `oldseq` and the
 * conditions around the rtx_* and num_rtx_retry assignments are on lines
 * not visible in this excerpt. */
1783 reschedule_timer (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
1784 guint16 seqnum, GstClockTime timeout, GstClockTime delay, gboolean reset)
1786 GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1787 gboolean seqchange, timechange;
1790 seqchange = timer->seqnum != seqnum;
1791 timechange = timer->timeout != timeout;
1793 if (!seqchange && !timechange)
/* remember the old seqnum: it identifies the wait that may be pending */
1796 oldseq = timer->seqnum;
1798 GST_DEBUG_OBJECT (jitterbuffer,
1799 "replace timer for seqnum %d->%d to %" GST_TIME_FORMAT,
1800 oldseq, seqnum, GST_TIME_ARGS (timeout + delay));
1802 timer->timeout = timeout + delay;
1803 timer->seqnum = seqnum;
1805 timer->rtx_base = timeout;
1806 timer->rtx_delay = delay;
1807 timer->rtx_retry = 0;
1810 timer->num_rtx_retry = 0;
1812 if (priv->clock_id) {
1813 /* we changed the seqnum and there is a timer currently waiting with this
1814 * seqnum, unschedule it */
1815 if (seqchange && priv->timer_seqnum == oldseq)
1816 unschedule_current_timer (jitterbuffer);
1817 /* we changed the time, check if it is earlier than what we are waiting
1818 * for and unschedule if so */
1819 else if (timechange)
1820 recalculate_timer (jitterbuffer, timer);
/* Installs a timer of @type for @seqnum firing at @timeout.
 *
 * An existing timer with the same type and seqnum is rescheduled (delay 0,
 * reset FALSE); when none exists a new one is added with num 0, delay 0 and
 * duration -1.
 *
 * NOTE(review): the return type, the declaration of `timer` and the return
 * statement are on lines not visible in this excerpt. */
1825 set_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type,
1826 guint16 seqnum, GstClockTime timeout)
1830 /* find the seqnum timer */
1831 timer = find_timer (jitterbuffer, type, seqnum);
1832 if (timer == NULL) {
1833 timer = add_timer (jitterbuffer, type, seqnum, 0, timeout, 0, -1);
1835 reschedule_timer (jitterbuffer, timer, seqnum, timeout, 0, FALSE);
/* Removes @timer from priv->timers.
 *
 * If a clock wait is pending for the same seqnum it is unscheduled first.
 * The entry is removed with g_array_remove_index_fast().
 *
 * NOTE(review): the declaration and assignment of `idx` are on lines not
 * visible in this excerpt. Per the GLib documentation the fast removal does
 * not preserve element order, so indices and pointers into the array held
 * by callers should be treated as stale afterwards -- confirm against the
 * callers. */
1841 remove_timer (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
1843 GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1846 if (priv->clock_id && priv->timer_seqnum == timer->seqnum)
1847 unschedule_current_timer (jitterbuffer);
1850 GST_DEBUG_OBJECT (jitterbuffer, "removed index %d", idx);
1851 g_array_remove_index_fast (priv->timers, idx);
/* Drops every timer: the timer array is truncated to zero length and the
 * clock wait that may be pending is unscheduled. */
1856 remove_all_timers (GstRtpJitterBuffer * jitterbuffer)
1858 GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1859 GST_DEBUG_OBJECT (jitterbuffer, "removed all timers");
1860 g_array_set_size (priv->timers, 0);
1861 unschedule_current_timer (jitterbuffer);
1864 /* get the extra delay to wait before sending RTX.
 *
 * With priv->rtx_delay == -1 (automatic) the delay is
 * DEFAULT_AUTO_RTX_DELAY while no jitter has been measured and twice the
 * average jitter otherwise. Any other rtx_delay value is taken as
 * milliseconds. A positive priv->rtx_min_delay (milliseconds) acts as a
 * lower bound.
 *
 * NOTE(review): the return type, the declaration of `delay` and the return
 * statement are on lines not visible in this excerpt. */
1866 get_rtx_delay (GstRtpJitterBufferPrivate * priv)
1870 if (priv->rtx_delay == -1) {
1871 if (priv->avg_jitter == 0)
1872 delay = DEFAULT_AUTO_RTX_DELAY;
1874 /* jitter is in nanoseconds, 2x jitter is a good margin */
1875 delay = priv->avg_jitter * 2;
1877 delay = priv->rtx_delay * GST_MSECOND;
1879 if (priv->rtx_min_delay > 0)
1880 delay = MAX (delay, priv->rtx_min_delay * GST_MSECOND);
/* Timer bookkeeping for a packet that just arrived.
 *
 * Walks all timers, comparing each timer's seqnum with @seqnum. EXPECTED
 * timers that were never retried and whose gap exceeds
 * priv->rtx_delay_reorder are rescheduled with a timeout of -1. When the
 * timer found for @seqnum had retransmission retries pending, the RTX
 * statistics (success/failed counters, average retry count, average
 * round-trip time) are updated and no estimate for the next packet is made.
 * Finally a timer for priv->next_in_seqnum is installed or updated at
 * @dts + packet_spacing, or the found timer is removed.
 *
 * NOTE(review): several short lines (declarations, the gap == 0 test, else
 * branches, break statements) are not visible in this excerpt. */
1885 /* we just received a packet with seqnum and dts.
1887 * First check for old seqnum that we are still expecting. If the gap with the
1888 * current seqnum is too big, unschedule the timeouts.
1890 * If we have a valid packet spacing estimate we can set a timer for when we
1891 * should receive the next packet.
1892 * If we don't have a valid estimate, we remove any timer we might have
1893 * had for this packet.
1896 update_timers (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum,
1897 GstClockTime dts, gboolean do_next_seqnum)
1899 GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1900 TimerData *timer = NULL;
1903 /* go through all timers and unschedule the ones with a large gap, also find
1904 * the timer for the seqnum */
1905 len = priv->timers->len;
1906 for (i = 0; i < len; i++) {
1907 TimerData *test = &g_array_index (priv->timers, TimerData, i);
1910 gap = gst_rtp_buffer_compare_seqnum (test->seqnum, seqnum);
1912 GST_DEBUG_OBJECT (jitterbuffer, "%d, %d, #%d<->#%d gap %d", i,
1913 test->type, test->seqnum, seqnum, gap);
1916 GST_DEBUG ("found timer for current seqnum");
1917 /* the timer for the current seqnum */
1919 /* when no retransmission, we can stop now, we only need to find the
1920 * timer for the current seqnum */
1921 if (!priv->do_retransmission)
1923 } else if (gap > priv->rtx_delay_reorder) {
1924 /* max gap, we exceeded the max reorder distance and we don't expect the
1925 * missing packet to be this reordered */
1926 if (test->num_rtx_retry == 0 && test->type == TIMER_TYPE_EXPECTED)
1927 reschedule_timer (jitterbuffer, test, test->seqnum, -1, 0, FALSE);
/* estimating the next packet needs a spacing estimate and RTX enabled */
1931 do_next_seqnum = do_next_seqnum && priv->packet_spacing > 0
1932 && priv->do_retransmission;
1934 if (timer && timer->type != TIMER_TYPE_DEADLINE) {
1935 if (timer->num_rtx_retry > 0) {
1936 GstClockTime rtx_last, delay;
1938 /* we scheduled a retry for this packet and now we have it */
1939 priv->num_rtx_success++;
1940 /* all the previous retry attempts failed */
1941 priv->num_rtx_failed += timer->num_rtx_retry - 1;
1942 /* number of retries before receiving the packet; running average that
 * gives the new sample a weight of 1/8 */
1943 if (priv->avg_rtx_num == 0.0)
1944 priv->avg_rtx_num = timer->num_rtx_retry;
1946 priv->avg_rtx_num = (timer->num_rtx_retry + 7 * priv->avg_rtx_num) / 8;
1947 /* calculate the delay between retransmission request and receiving this
1948 * packet, start with when we scheduled this timeout last */
1949 rtx_last = timer->rtx_last;
1950 if (dts != GST_CLOCK_TIME_NONE && dts > rtx_last) {
1951 /* we have a valid delay if this packet arrived after we scheduled the
1953 delay = dts - rtx_last;
1954 if (priv->avg_rtx_rtt == 0)
1955 priv->avg_rtx_rtt = delay;
1957 priv->avg_rtx_rtt = (delay + 7 * priv->avg_rtx_rtt) / 8;
1961 GST_LOG_OBJECT (jitterbuffer,
1962 "RTX success %" G_GUINT64_FORMAT ", failed %" G_GUINT64_FORMAT
1963 ", requests %" G_GUINT64_FORMAT ", dups %" G_GUINT64_FORMAT
1964 ", avg-num %g, delay %" GST_TIME_FORMAT ", avg-rtt %" GST_TIME_FORMAT,
1965 priv->num_rtx_success, priv->num_rtx_failed, priv->num_rtx_requests,
1966 priv->num_duplicates, priv->avg_rtx_num, GST_TIME_ARGS (delay),
1967 GST_TIME_ARGS (priv->avg_rtx_rtt));
1969 /* don't try to estimate the next seqnum because this is a retransmitted
1970 * packet and it probably did not arrive with the expected packet
1972 do_next_seqnum = FALSE;
1976 if (do_next_seqnum) {
1977 GstClockTime expected, delay;
1979 /* calculate expected arrival time of the next seqnum */
1980 expected = dts + priv->packet_spacing;
1982 delay = get_rtx_delay (priv);
1984 /* and update/install timer for next seqnum */
1986 reschedule_timer (jitterbuffer, timer, priv->next_in_seqnum, expected,
1989 add_timer (jitterbuffer, TIMER_TYPE_EXPECTED, priv->next_in_seqnum, 0,
1990 expected, delay, priv->packet_spacing);
1991 } else if (timer && timer->type != TIMER_TYPE_DEADLINE) {
1992 /* if we had a timer, remove it, we don't know when to expect the next
1994 remove_timer (jitterbuffer, timer);
/* Updates the inter-packet spacing estimate from the arrival time (`dts`)
 * of a packet.
 *
 * Only acts when @rtptime differs from the remembered priv->ips_rtptime.
 * If both the remembered and the new dts are valid and the new one is
 * later, their difference becomes priv->packet_spacing. The remembered
 * rtptime/dts pair is then replaced by the current one.
 *
 * NOTE(review): the last parameter (used as `dts`) is on a line not visible
 * in this excerpt. */
1999 calculate_packet_spacing (GstRtpJitterBuffer * jitterbuffer, guint32 rtptime,
2002 GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2004 /* we need consecutive seqnums with a different
2005 * rtptime to estimate the packet spacing. */
2006 if (priv->ips_rtptime != rtptime) {
2007 /* rtptime changed, check dts diff */
2008 if (priv->ips_dts != -1 && dts != -1 && dts > priv->ips_dts) {
2009 priv->packet_spacing = dts - priv->ips_dts;
2010 GST_DEBUG_OBJECT (jitterbuffer,
2011 "new packet spacing %" GST_TIME_FORMAT,
2012 GST_TIME_ARGS (priv->packet_spacing));
2014 priv->ips_rtptime = rtptime;
2015 priv->ips_dts = dts;
/* Installs timers for the packets missing between @expected and @seqnum.
 *
 * The time between priv->last_in_dts and @dts is divided evenly over the
 * @gap + 1 packet intervals to estimate a per-packet duration. When the
 * span exceeds the configured latency, the packets that are already too
 * late are covered by one TIMER_TYPE_LOST timer and skipped. The remaining
 * missing seqnums get one timer each, of type TIMER_TYPE_EXPECTED when
 * retransmission is enabled and TIMER_TYPE_LOST otherwise, spaced
 * `duration` apart.
 *
 * NOTE(review): `gap_time / duration` divides by a value that is 0 when
 * total_duration < gap + 1; a guard may exist on a line not visible in this
 * excerpt -- confirm. Other short lines (else branches, declarations of
 * `type` and `timer`, the loop increment of `expected`) are not visible
 * either. */
2020 calculate_expected (GstRtpJitterBuffer * jitterbuffer, guint32 expected,
2021 guint16 seqnum, GstClockTime dts, gint gap)
2023 GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2024 GstClockTime total_duration, duration, expected_dts;
2026 guint lost_packets = 0;
2028 GST_DEBUG_OBJECT (jitterbuffer,
2029 "dts %" GST_TIME_FORMAT ", last %" GST_TIME_FORMAT,
2030 GST_TIME_ARGS (dts), GST_TIME_ARGS (priv->last_in_dts));
2032 /* the total duration spanned by the missing packets */
2033 if (dts >= priv->last_in_dts)
2034 total_duration = dts - priv->last_in_dts;
2038 /* interpolate between the current time and the last time based on
2039 * number of packets we are missing, this is the estimated duration
2040 * for the missing packet based on equidistant packet spacing. */
2041 duration = total_duration / (gap + 1);
2043 GST_DEBUG_OBJECT (jitterbuffer, "duration %" GST_TIME_FORMAT,
2044 GST_TIME_ARGS (duration));
2046 if (total_duration > priv->latency_ns) {
2047 GstClockTime gap_time;
/* the part of the span that lies beyond what the latency can cover */
2049 gap_time = total_duration - priv->latency_ns;
/* round the late span down to a whole number of packets */
2052 lost_packets = gap_time / duration;
2053 gap_time = lost_packets * duration;
2058 /* too many lost packets, some of the missing packets are already
2059 * too late and we can generate lost packet events for them. */
2060 GST_DEBUG_OBJECT (jitterbuffer, "too many lost packets %" GST_TIME_FORMAT
2061 " > %" GST_TIME_FORMAT ", consider %u lost",
2062 GST_TIME_ARGS (total_duration), GST_TIME_ARGS (priv->latency_ns),
2065 /* this timer will fire immediately and the lost event will be pushed from
2066 * the timer thread */
2067 add_timer (jitterbuffer, TIMER_TYPE_LOST, expected, lost_packets,
2068 priv->last_in_dts + duration, 0, gap_time);
/* continue after the packets that were just declared lost */
2070 expected += lost_packets;
2071 priv->last_in_dts += gap_time;
2074 expected_dts = priv->last_in_dts + (lost_packets + 1) * duration;
2076 if (priv->do_retransmission) {
2079 type = TIMER_TYPE_EXPECTED;
2080 /* if we had a timer for the first missing packet, update it. */
2081 if ((timer = find_timer (jitterbuffer, type, expected))) {
2082 GstClockTime timeout = timer->timeout;
2084 timer->duration = duration;
2085 if (timeout > (expected_dts + timer->rtx_retry)) {
2086 GstClockTime delay = timeout - expected_dts - timer->rtx_retry;
2087 reschedule_timer (jitterbuffer, timer, timer->seqnum, expected_dts,
2091 expected_dts += duration;
2094 type = TIMER_TYPE_LOST;
/* one timer per remaining missing seqnum, each `duration` later */
2097 while (gst_rtp_buffer_compare_seqnum (expected, seqnum) > 0) {
2098 add_timer (jitterbuffer, type, expected, 0, expected_dts, 0, duration);
2099 expected_dts += duration;
/* Updates the running jitter estimate priv->avg_jitter.
 *
 * Requires a valid @dts and a positive clock-rate; otherwise the trailing
 * debug message ("no dts or no clock-rate") is the visible outcome. The
 * arrival-time difference to the previous packet (dtsdiff) is compared with
 * the RTP timestamp difference converted to nanoseconds (rtpdiffns); the
 * absolute difference is folded into avg_jitter with a weight of 1/16.
 *
 * NOTE(review): the last parameter (used as `rtptime`), the declaration of
 * `rtpdiff`, the else branches for the first packet, the sign test choosing
 * between the two scale calls and the labels are on lines not visible in
 * this excerpt. */
2105 calculate_jitter (GstRtpJitterBuffer * jitterbuffer, GstClockTime dts,
2109 GstClockTimeDiff dtsdiff, rtpdiffns, diff;
2110 GstRtpJitterBufferPrivate *priv;
2112 priv = jitterbuffer->priv;
2114 if (G_UNLIKELY (dts == GST_CLOCK_TIME_NONE) || priv->clock_rate <= 0)
2117 if (priv->last_dts != -1)
2118 dtsdiff = dts - priv->last_dts;
2122 if (priv->last_rtptime != -1)
2123 rtpdiff = rtptime - (guint32) priv->last_rtptime;
/* remember this packet for the next call */
2127 priv->last_dts = dts;
2128 priv->last_rtptime = rtptime;
/* convert the RTP difference from clock-rate units to nanoseconds; the
 * second form negates before and after scaling */
2132 gst_util_uint64_scale_int (rtpdiff, GST_SECOND, priv->clock_rate);
2135 -gst_util_uint64_scale_int (-rtpdiff, GST_SECOND, priv->clock_rate);
2137 diff = ABS (dtsdiff - rtpdiffns);
2139 /* jitter is stored in nanoseconds */
2140 priv->avg_jitter = (diff + (15 * priv->avg_jitter)) >> 4;
2142 GST_LOG_OBJECT (jitterbuffer,
2143 "dtsdiff %" GST_TIME_FORMAT " rtptime %" GST_TIME_FORMAT
2144 ", clock-rate %d, diff %" GST_TIME_FORMAT ", jitter: %" GST_TIME_FORMAT,
2145 GST_TIME_ARGS (dtsdiff), GST_TIME_ARGS (rtpdiffns), priv->clock_rate,
2146 GST_TIME_ARGS (diff), GST_TIME_ARGS (priv->avg_jitter));
2153 GST_DEBUG_OBJECT (jitterbuffer,
2154 "no dts or no clock-rate, can't calculate jitter");
2159 static GstFlowReturn
2160 gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent,
2163 GstRtpJitterBuffer *jitterbuffer;
2164 GstRtpJitterBufferPrivate *priv;
2166 guint32 expected, rtptime;
2167 GstFlowReturn ret = GST_FLOW_OK;
2168 GstClockTime dts, pts;
2173 GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
2174 gboolean do_next_seqnum = FALSE;
2175 RTPJitterBufferItem *item;
2176 GstMessage *msg = NULL;
2178 jitterbuffer = GST_RTP_JITTER_BUFFER_CAST (parent);
2180 priv = jitterbuffer->priv;
2182 if (G_UNLIKELY (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp)))
2183 goto invalid_buffer;
2185 pt = gst_rtp_buffer_get_payload_type (&rtp);
2186 seqnum = gst_rtp_buffer_get_seq (&rtp);
2187 rtptime = gst_rtp_buffer_get_timestamp (&rtp);
2188 gst_rtp_buffer_unmap (&rtp);
2190 /* make sure we have PTS and DTS set */
2191 pts = GST_BUFFER_PTS (buffer);
2192 dts = GST_BUFFER_DTS (buffer);
2198 /* take the DTS of the buffer. This is the time when the packet was
2199 * received and is used to calculate jitter and clock skew. We will adjust
2200 * this DTS with the smoothed value after processing it in the
2201 * jitterbuffer and assign it as the PTS. */
2202 /* bring to running time */
2203 dts = gst_segment_to_running_time (&priv->segment, GST_FORMAT_TIME, dts);
2205 GST_DEBUG_OBJECT (jitterbuffer,
2206 "Received packet #%d at time %" GST_TIME_FORMAT ", discont %d", seqnum,
2207 GST_TIME_ARGS (dts), GST_BUFFER_IS_DISCONT (buffer));
2209 JBUF_LOCK_CHECK (priv, out_flushing);
2211 if (G_UNLIKELY (priv->last_pt != pt)) {
2214 GST_DEBUG_OBJECT (jitterbuffer, "pt changed from %u to %u", priv->last_pt,
2218 /* reset clock-rate so that we get a new one */
2219 priv->clock_rate = -1;
2221 /* Try to get the clock-rate from the caps first if we can. If there are no
2222 * caps we must fire the signal to get the clock-rate. */
2223 if ((caps = gst_pad_get_current_caps (pad))) {
2224 gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps);
2225 gst_caps_unref (caps);
2229 if (G_UNLIKELY (priv->clock_rate == -1)) {
2230 /* no clock rate given on the caps, try to get one with the signal */
2231 if (gst_rtp_jitter_buffer_get_clock_rate (jitterbuffer,
2232 pt) == GST_FLOW_FLUSHING)
2235 if (G_UNLIKELY (priv->clock_rate == -1))
2239 /* don't accept more data on EOS */
2240 if (G_UNLIKELY (priv->eos))
2243 calculate_jitter (jitterbuffer, dts, rtptime);
2245 if (priv->seqnum_base != -1) {
2248 gap = gst_rtp_buffer_compare_seqnum (priv->seqnum_base, seqnum);
2251 GST_DEBUG_OBJECT (jitterbuffer,
2252 "packet seqnum #%d before seqnum-base #%d", seqnum,
2254 gst_buffer_unref (buffer);
2257 } else if (gap > 16384) {
2258 /* From now on don't compare against the seqnum base anymore as
2259 * at some point in the future we will wrap around and also that
2260 * much reordering is very unlikely */
2261 priv->seqnum_base = -1;
2265 expected = priv->next_in_seqnum;
2267 /* now check against our expected seqnum */
2268 if (G_LIKELY (expected != -1)) {
2271 /* now calculate gap */
2272 gap = gst_rtp_buffer_compare_seqnum (expected, seqnum);
2274 GST_DEBUG_OBJECT (jitterbuffer, "expected #%d, got #%d, gap of %d",
2275 expected, seqnum, gap);
2277 if (G_LIKELY (gap == 0)) {
2278 /* packet is expected */
2279 calculate_packet_spacing (jitterbuffer, rtptime, dts);
2280 do_next_seqnum = TRUE;
2282 gboolean reset = FALSE;
2284 if (!GST_CLOCK_TIME_IS_VALID (dts)) {
2285 /* We would run into calculations with GST_CLOCK_TIME_NONE below
2286 * and can't compensate for anything without DTS on RTP packets
2288 goto gap_but_no_dts;
2289 } else if (gap < 0) {
2290 /* we received an old packet */
2291 if (G_UNLIKELY (gap < -RTP_MAX_MISORDER)) {
2292 /* too old packet, reset */
2293 GST_DEBUG_OBJECT (jitterbuffer, "reset: buffer too old %d < %d", gap,
2297 GST_DEBUG_OBJECT (jitterbuffer, "old packet received");
2300 /* new packet, we are missing some packets */
2301 if (G_UNLIKELY (gap > RTP_MAX_DROPOUT)) {
2302 /* packet too far in future, reset */
2303 GST_DEBUG_OBJECT (jitterbuffer, "reset: buffer too new %d > %d", gap,
2307 GST_DEBUG_OBJECT (jitterbuffer, "%d missing packets", gap);
2308 /* fill in the gap with EXPECTED timers */
2309 calculate_expected (jitterbuffer, expected, seqnum, dts, gap);
2311 do_next_seqnum = TRUE;
2314 if (G_UNLIKELY (reset)) {
2315 GList *events = NULL, *l;
2317 GST_DEBUG_OBJECT (jitterbuffer, "flush and reset jitterbuffer");
2318 rtp_jitter_buffer_flush (priv->jbuf,
2319 (GFunc) free_item_and_retain_events, &events);
2320 rtp_jitter_buffer_reset_skew (priv->jbuf);
2321 remove_all_timers (jitterbuffer);
2322 priv->discont = TRUE;
2323 priv->last_popped_seqnum = -1;
2324 priv->next_seqnum = seqnum;
2325 do_next_seqnum = TRUE;
2327 /* Insert all sticky events again in order, otherwise we would
2328 * potentially lose STREAM_START, CAPS or SEGMENT events
2330 events = g_list_reverse (events);
2331 for (l = events; l; l = l->next) {
2332 RTPJitterBufferItem *item;
2334 item = alloc_item (l->data, ITEM_TYPE_EVENT, -1, -1, -1, 0, -1);
2335 rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL);
2337 g_list_free (events);
2339 JBUF_SIGNAL_EVENT (priv);
2341 /* reset spacing estimation when gap */
2342 priv->ips_rtptime = -1;
2343 priv->ips_dts = GST_CLOCK_TIME_NONE;
2346 GST_DEBUG_OBJECT (jitterbuffer, "First buffer #%d", seqnum);
2347 /* we don't know what the next_in_seqnum should be, wait for the last
2348 * possible moment to push this buffer, maybe we get an earlier seqnum
2350 set_timer (jitterbuffer, TIMER_TYPE_DEADLINE, seqnum, dts);
2351 do_next_seqnum = TRUE;
2352 /* take rtptime and dts to calculate packet spacing */
2353 priv->ips_rtptime = rtptime;
2354 priv->ips_dts = dts;
2356 if (do_next_seqnum) {
2357 priv->last_in_seqnum = seqnum;
2358 priv->last_in_dts = dts;
2359 priv->next_in_seqnum = (seqnum + 1) & 0xffff;
2362 /* let's check if this buffer is too late, we can only accept packets with
2363 * bigger seqnum than the one we last pushed. */
2364 if (G_LIKELY (priv->last_popped_seqnum != -1)) {
2367 gap = gst_rtp_buffer_compare_seqnum (priv->last_popped_seqnum, seqnum);
2369 /* priv->last_popped_seqnum >= seqnum, we're too late. */
2370 if (G_UNLIKELY (gap <= 0))
2374 /* let's drop oldest packet if the queue is already full and drop-on-latency
2375 * is set. We can only do this when there actually is a latency. When no
2376 * latency is set, we just pump it in the queue and let the other end push it
2377 * out as fast as possible. */
2378 if (priv->latency_ms && priv->drop_on_latency) {
2380 gst_util_uint64_scale_int (priv->latency_ms, priv->clock_rate, 1000);
2382 if (G_UNLIKELY (rtp_jitter_buffer_get_ts_diff (priv->jbuf) >= latency_ts)) {
2383 RTPJitterBufferItem *old_item;
2385 old_item = rtp_jitter_buffer_peek (priv->jbuf);
2387 if (IS_DROPABLE (old_item)) {
2388 old_item = rtp_jitter_buffer_pop (priv->jbuf, &percent);
2389 GST_DEBUG_OBJECT (jitterbuffer, "Queue full, dropping old packet %p",
2391 priv->next_seqnum = (old_item->seqnum + 1) & 0xffff;
2392 free_item (old_item);
2394 /* we might have removed some head buffers, signal the pushing thread to
2395 * see if it can push now */
2396 JBUF_SIGNAL_EVENT (priv);
2400 item = alloc_item (buffer, ITEM_TYPE_BUFFER, dts, pts, seqnum, 1, rtptime);
2402 /* now insert the packet into the queue in sorted order. This function returns
2403 * FALSE if a packet with the same seqnum was already in the queue, meaning we
2404 * have a duplicate. */
2405 if (G_UNLIKELY (!rtp_jitter_buffer_insert (priv->jbuf, item,
2410 update_timers (jitterbuffer, seqnum, dts, do_next_seqnum);
2412 /* we had an unhandled SR, handle it now */
2414 do_handle_sync (jitterbuffer);
2416 if (G_UNLIKELY (head)) {
2417 /* signal addition of new buffer when the _loop is waiting. */
2418 if (G_LIKELY (priv->active))
2419 JBUF_SIGNAL_EVENT (priv);
2421 /* let's unschedule and unblock any waiting buffers. We only want to do this
2422 * when the head buffer changed */
2423 if (G_UNLIKELY (priv->clock_id)) {
2424 GST_DEBUG_OBJECT (jitterbuffer, "Unscheduling waiting new buffer");
2425 unschedule_current_timer (jitterbuffer);
2429 GST_DEBUG_OBJECT (jitterbuffer,
2430 "Pushed packet #%d, now %d packets, head: %d, " "percent %d", seqnum,
2431 rtp_jitter_buffer_num_packets (priv->jbuf), head, percent);
2433 msg = check_buffering_percent (jitterbuffer, percent);
2439 gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer), msg);
2446 /* this is not fatal but should be filtered earlier */
2447 GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
2448 ("Received invalid RTP payload, dropping"));
2449 gst_buffer_unref (buffer);
2454 GST_WARNING_OBJECT (jitterbuffer,
2455 "No clock-rate in caps!, dropping buffer");
2456 gst_buffer_unref (buffer);
2461 ret = priv->srcresult;
2462 GST_DEBUG_OBJECT (jitterbuffer, "flushing %s", gst_flow_get_name (ret));
2463 gst_buffer_unref (buffer);
2469 GST_WARNING_OBJECT (jitterbuffer, "we are EOS, refusing buffer");
2470 gst_buffer_unref (buffer);
2475 GST_WARNING_OBJECT (jitterbuffer, "Packet #%d too late as #%d was already"
2476 " popped, dropping", seqnum, priv->last_popped_seqnum);
2478 gst_buffer_unref (buffer);
2483 GST_WARNING_OBJECT (jitterbuffer, "Duplicate packet #%d detected, dropping",
2485 priv->num_duplicates++;
2491 /* this is fatal as we can't compensate for gaps without DTS */
2492 GST_ELEMENT_ERROR (jitterbuffer, STREAM, DECODE, (NULL),
2493 ("Received packet without DTS after a gap"));
2494 gst_buffer_unref (buffer);
2495 ret = GST_FLOW_ERROR;
2501 compute_elapsed (GstRtpJitterBuffer * jitterbuffer, RTPJitterBufferItem * item)
2503 guint64 ext_time, elapsed;
2505 GstRtpJitterBufferPrivate *priv;
2507 priv = jitterbuffer->priv;
2508 rtp_time = item->rtptime;
2510 GST_LOG_OBJECT (jitterbuffer, "rtp %" G_GUINT32_FORMAT ", ext %"
2511 G_GUINT64_FORMAT, rtp_time, priv->ext_timestamp);
2513 if (rtp_time < priv->ext_timestamp) {
2514 ext_time = priv->ext_timestamp;
2516 ext_time = gst_rtp_buffer_ext_timestamp (&priv->ext_timestamp, rtp_time);
2519 if (ext_time > priv->clock_base)
2520 elapsed = ext_time - priv->clock_base;
2524 elapsed = gst_util_uint64_scale_int (elapsed, GST_SECOND, priv->clock_rate);
2529 update_estimated_eos (GstRtpJitterBuffer * jitterbuffer,
2530 RTPJitterBufferItem * item)
2532 guint64 total, elapsed, left, estimated;
2533 GstClockTime out_time;
2534 GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2536 if (priv->npt_stop == -1 || priv->ext_timestamp == -1
2537 || priv->clock_base == -1 || priv->clock_rate <= 0)
2540 /* compute the elapsed time */
2541 elapsed = compute_elapsed (jitterbuffer, item);
2543 /* do nothing if elapsed time doesn't increment */
2544 if (priv->last_elapsed && elapsed <= priv->last_elapsed)
2547 priv->last_elapsed = elapsed;
2549 /* this is the total time we need to play */
2550 total = priv->npt_stop - priv->npt_start;
2551 GST_LOG_OBJECT (jitterbuffer, "total %" GST_TIME_FORMAT,
2552 GST_TIME_ARGS (total));
2554 /* this is how much time there is left */
2555 if (total > elapsed)
2556 left = total - elapsed;
2560 /* if we have less time left that the size of the buffer, we will not
2561 * be able to keep it filled, disabled buffering then */
2562 if (left < rtp_jitter_buffer_get_delay (priv->jbuf)) {
2563 GST_DEBUG_OBJECT (jitterbuffer, "left %" GST_TIME_FORMAT
2564 ", disable buffering close to EOS", GST_TIME_ARGS (left));
2565 rtp_jitter_buffer_disable_buffering (priv->jbuf, TRUE);
2568 /* this is the current time as running-time */
2569 out_time = item->dts;
2572 estimated = gst_util_uint64_scale (out_time, total, elapsed);
2574 /* if there is almost nothing left,
2575 * we may never advance enough to end up in the above case */
2576 if (total < GST_SECOND)
2577 estimated = GST_SECOND;
2581 GST_LOG_OBJECT (jitterbuffer, "elapsed %" GST_TIME_FORMAT ", estimated %"
2582 GST_TIME_FORMAT, GST_TIME_ARGS (elapsed), GST_TIME_ARGS (estimated));
2584 if (estimated != -1 && priv->estimated_eos != estimated) {
2585 set_timer (jitterbuffer, TIMER_TYPE_EOS, -1, estimated);
2586 priv->estimated_eos = estimated;
2590 /* take a buffer from the queue and push it */
2591 static GstFlowReturn
2592 pop_and_push_next (GstRtpJitterBuffer * jitterbuffer, guint seqnum)
2594 GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2595 GstFlowReturn result = GST_FLOW_OK;
2596 RTPJitterBufferItem *item;
2597 GstBuffer *outbuf = NULL;
2598 GstEvent *outevent = NULL;
2599 GstQuery *outquery = NULL;
2600 GstClockTime dts, pts;
2602 gboolean do_push = TRUE;
2606 /* when we get here we are ready to pop and push the buffer */
2607 item = rtp_jitter_buffer_pop (priv->jbuf, &percent);
2611 case ITEM_TYPE_BUFFER:
2613 /* we need to make writable to change the flags and timestamps */
2614 outbuf = gst_buffer_make_writable (item->data);
2616 if (G_UNLIKELY (priv->discont)) {
2617 /* set DISCONT flag when we missed a packet. We pushed the buffer writable
2618 * into the jitterbuffer so we can modify now. */
2619 GST_DEBUG_OBJECT (jitterbuffer, "mark output buffer discont");
2620 GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_DISCONT);
2621 priv->discont = FALSE;
2623 if (G_UNLIKELY (priv->ts_discont)) {
2624 GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_RESYNC);
2625 priv->ts_discont = FALSE;
2629 gst_segment_to_position (&priv->segment, GST_FORMAT_TIME, item->dts);
2631 gst_segment_to_position (&priv->segment, GST_FORMAT_TIME, item->pts);
2633 /* apply timestamp with offset to buffer now */
2634 GST_BUFFER_DTS (outbuf) = apply_offset (jitterbuffer, dts);
2635 GST_BUFFER_PTS (outbuf) = apply_offset (jitterbuffer, pts);
2637 /* update the elapsed time when we need to check against the npt stop time. */
2638 update_estimated_eos (jitterbuffer, item);
2640 priv->last_out_time = GST_BUFFER_PTS (outbuf);
2642 case ITEM_TYPE_LOST:
2643 priv->discont = TRUE;
2647 case ITEM_TYPE_EVENT:
2648 outevent = item->data;
2650 case ITEM_TYPE_QUERY:
2651 outquery = item->data;
2655 /* now we are ready to push the buffer. Save the seqnum and release the lock
2656 * so the other end can push stuff in the queue again. */
2658 priv->last_popped_seqnum = seqnum;
2659 priv->next_seqnum = (seqnum + item->count) & 0xffff;
2661 msg = check_buffering_percent (jitterbuffer, percent);
2668 gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer), msg);
2671 case ITEM_TYPE_BUFFER:
2673 GST_DEBUG_OBJECT (jitterbuffer,
2674 "Pushing buffer %d, dts %" GST_TIME_FORMAT ", pts %" GST_TIME_FORMAT,
2675 seqnum, GST_TIME_ARGS (GST_BUFFER_DTS (outbuf)),
2676 GST_TIME_ARGS (GST_BUFFER_PTS (outbuf)));
2677 result = gst_pad_push (priv->srcpad, outbuf);
2679 JBUF_LOCK_CHECK (priv, out_flushing);
2681 case ITEM_TYPE_LOST:
2682 case ITEM_TYPE_EVENT:
2683 GST_DEBUG_OBJECT (jitterbuffer, "%sPushing event %" GST_PTR_FORMAT
2684 ", seqnum %d", do_push ? "" : "NOT ", outevent, seqnum);
2687 gst_pad_push_event (priv->srcpad, outevent);
2689 gst_event_unref (outevent);
2691 result = GST_FLOW_OK;
2693 JBUF_LOCK_CHECK (priv, out_flushing);
2695 case ITEM_TYPE_QUERY:
2699 res = gst_pad_peer_query (priv->srcpad, outquery);
2701 JBUF_LOCK_CHECK (priv, out_flushing);
2702 result = GST_FLOW_OK;
2703 GST_LOG_OBJECT (jitterbuffer, "did query %p, return %d", outquery, res);
2704 JBUF_SIGNAL_QUERY (priv, res);
2713 return priv->srcresult;
2717 #define GST_FLOW_WAIT GST_FLOW_CUSTOM_SUCCESS
2719 /* Peek a buffer and compare the seqnum to the expected seqnum.
2720 * If all is fine, the buffer is pushed.
2721 * If something is wrong, we wait for some event
2723 static GstFlowReturn
2724 handle_next_buffer (GstRtpJitterBuffer * jitterbuffer)
2726 GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2727 GstFlowReturn result = GST_FLOW_OK;
2728 RTPJitterBufferItem *item;
2730 guint32 next_seqnum;
2733 /* only push buffers when PLAYING and active and not buffering */
2734 if (priv->blocked || !priv->active ||
2735 rtp_jitter_buffer_is_buffering (priv->jbuf))
2736 return GST_FLOW_WAIT;
2739 /* peek a buffer, we're just looking at the sequence number.
2740 * If all is fine, we'll pop and push it. If the sequence number is wrong we
2741 * wait for a timeout or something to change.
2742 * The peeked buffer is valid for as long as we hold the jitterbuffer lock. */
2743 item = rtp_jitter_buffer_peek (priv->jbuf);
2747 /* get the seqnum and the next expected seqnum */
2748 seqnum = item->seqnum;
2752 next_seqnum = priv->next_seqnum;
2754 /* get the gap between this and the previous packet. If we don't know the
2755 * previous packet seqnum assume no gap. */
2756 if (G_UNLIKELY (next_seqnum == -1)) {
2757 GST_DEBUG_OBJECT (jitterbuffer, "First buffer #%d", seqnum);
2758 /* we don't know what the next_seqnum should be, the chain function should
2759 * have scheduled a DEADLINE timer that will increment next_seqnum when it
2760 * fires, so wait for that */
2761 result = GST_FLOW_WAIT;
2763 /* else calculate GAP */
2764 gap = gst_rtp_buffer_compare_seqnum (next_seqnum, seqnum);
2766 if (G_LIKELY (gap == 0)) {
2768 /* no missing packet, pop and push */
2769 result = pop_and_push_next (jitterbuffer, seqnum);
2770 } else if (G_UNLIKELY (gap < 0)) {
2771 RTPJitterBufferItem *item;
2772 /* if we have a packet that we already pushed or considered dropped, pop it
2773 * off and get the next packet */
2774 GST_DEBUG_OBJECT (jitterbuffer, "Old packet #%d, next #%d dropping",
2775 seqnum, next_seqnum);
2776 item = rtp_jitter_buffer_pop (priv->jbuf, NULL);
2780 /* the chain function has scheduled timers to request retransmission or
2781 * when to consider the packet lost, wait for that */
2782 GST_DEBUG_OBJECT (jitterbuffer,
2783 "Sequence number GAP detected: expected %d instead of %d (%d missing)",
2784 next_seqnum, seqnum, gap);
2785 result = GST_FLOW_WAIT;
2792 GST_DEBUG_OBJECT (jitterbuffer, "no buffer, going to wait");
2794 result = GST_FLOW_EOS;
2796 result = GST_FLOW_WAIT;
2802 get_rtx_retry_timeout (GstRtpJitterBufferPrivate * priv)
2804 GstClockTime rtx_retry_timeout;
2805 GstClockTime rtx_min_retry_timeout;
2807 if (priv->rtx_retry_timeout == -1) {
2808 if (priv->avg_rtx_rtt == 0)
2809 rtx_retry_timeout = DEFAULT_AUTO_RTX_TIMEOUT;
2811 /* we want to ask for a retransmission after we waited for a
2812 * complete RTT and the additional jitter */
2813 rtx_retry_timeout = priv->avg_rtx_rtt + priv->avg_jitter * 2;
2815 rtx_retry_timeout = priv->rtx_retry_timeout * GST_MSECOND;
2817 /* make sure we don't retry too often. On very low latency networks,
2818 * the RTT and jitter can be very low. */
2819 if (priv->rtx_min_retry_timeout == -1) {
2820 rtx_min_retry_timeout = priv->packet_spacing;
2822 rtx_min_retry_timeout = priv->rtx_min_retry_timeout * GST_MSECOND;
2824 rtx_retry_timeout = MAX (rtx_retry_timeout, rtx_min_retry_timeout);
2826 return rtx_retry_timeout;
2830 get_rtx_retry_period (GstRtpJitterBufferPrivate * priv,
2831 GstClockTime rtx_retry_timeout)
2833 GstClockTime rtx_retry_period;
2835 if (priv->rtx_retry_period == -1) {
2836 /* we retry up to the configured jitterbuffer size but leaving some
2837 * room for the retransmission to arrive in time */
2838 if (rtx_retry_timeout > priv->latency_ns) {
2839 rtx_retry_period = 0;
2841 rtx_retry_period = priv->latency_ns - rtx_retry_timeout;
2844 rtx_retry_period = priv->rtx_retry_period * GST_MSECOND;
2846 return rtx_retry_period;
2849 /* the timeout for when we expected a packet expired */
2851 do_expected_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
2854 GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2856 guint delay, delay_ms, avg_rtx_rtt_ms;
2857 guint rtx_retry_timeout_ms, rtx_retry_period_ms;
2858 GstClockTime rtx_retry_period;
2859 GstClockTime rtx_retry_timeout;
2862 GST_DEBUG_OBJECT (jitterbuffer, "expected %d didn't arrive, now %"
2863 GST_TIME_FORMAT, timer->seqnum, GST_TIME_ARGS (now));
2865 rtx_retry_timeout = get_rtx_retry_timeout (priv);
2866 rtx_retry_period = get_rtx_retry_period (priv, rtx_retry_timeout);
2868 GST_DEBUG_OBJECT (jitterbuffer, "timeout %" GST_TIME_FORMAT ", period %"
2869 GST_TIME_FORMAT, GST_TIME_ARGS (rtx_retry_timeout),
2870 GST_TIME_ARGS (rtx_retry_period));
2872 delay = timer->rtx_delay + timer->rtx_retry;
2874 delay_ms = GST_TIME_AS_MSECONDS (delay);
2875 rtx_retry_timeout_ms = GST_TIME_AS_MSECONDS (rtx_retry_timeout);
2876 rtx_retry_period_ms = GST_TIME_AS_MSECONDS (rtx_retry_period);
2877 avg_rtx_rtt_ms = GST_TIME_AS_MSECONDS (priv->avg_rtx_rtt);
2879 event = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM,
2880 gst_structure_new ("GstRTPRetransmissionRequest",
2881 "seqnum", G_TYPE_UINT, (guint) timer->seqnum,
2882 "running-time", G_TYPE_UINT64, timer->rtx_base,
2883 "delay", G_TYPE_UINT, delay_ms,
2884 "retry", G_TYPE_UINT, timer->num_rtx_retry,
2885 "frequency", G_TYPE_UINT, rtx_retry_timeout_ms,
2886 "period", G_TYPE_UINT, rtx_retry_period_ms,
2887 "deadline", G_TYPE_UINT, priv->latency_ms,
2888 "packet-spacing", G_TYPE_UINT64, priv->packet_spacing,
2889 "avg-rtt", G_TYPE_UINT, avg_rtx_rtt_ms, NULL));
2891 priv->num_rtx_requests++;
2892 timer->num_rtx_retry++;
2894 GST_OBJECT_LOCK (jitterbuffer);
2895 if ((clock = GST_ELEMENT_CLOCK (jitterbuffer))) {
2896 timer->rtx_last = gst_clock_get_time (clock);
2897 timer->rtx_last -= GST_ELEMENT_CAST (jitterbuffer)->base_time;
2899 timer->rtx_last = now;
2901 GST_OBJECT_UNLOCK (jitterbuffer);
2903 /* calculate the timeout for the next retransmission attempt */
2904 timer->rtx_retry += rtx_retry_timeout;
2905 GST_DEBUG_OBJECT (jitterbuffer, "base %" GST_TIME_FORMAT ", delay %"
2906 GST_TIME_FORMAT ", retry %" GST_TIME_FORMAT ", num_retry %u",
2907 GST_TIME_ARGS (timer->rtx_base), GST_TIME_ARGS (timer->rtx_delay),
2908 GST_TIME_ARGS (timer->rtx_retry), timer->num_rtx_retry);
2909 if ((priv->rtx_max_retries != -1
2910 && timer->num_rtx_retry >= priv->rtx_max_retries)
2911 || (timer->rtx_retry + timer->rtx_delay > rtx_retry_period)) {
2912 GST_DEBUG_OBJECT (jitterbuffer, "reschedule as LOST timer");
2913 /* too many retransmission request, we now convert the timer
2914 * to a lost timer, leave the num_rtx_retry as it is for stats */
2915 timer->type = TIMER_TYPE_LOST;
2916 timer->rtx_delay = 0;
2917 timer->rtx_retry = 0;
2919 reschedule_timer (jitterbuffer, timer, timer->seqnum,
2920 timer->rtx_base + timer->rtx_retry, timer->rtx_delay, FALSE);
2923 gst_pad_push_event (priv->sinkpad, event);
2929 /* a packet is lost */
2931 do_lost_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
2934 GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2935 GstClockTime duration, timestamp;
2936 guint seqnum, lost_packets, num_rtx_retry, next_in_seqnum;
2937 gboolean late, head;
2939 RTPJitterBufferItem *item;
2941 seqnum = timer->seqnum;
2942 timestamp = apply_offset (jitterbuffer, timer->timeout);
2943 duration = timer->duration;
2944 if (duration == GST_CLOCK_TIME_NONE && priv->packet_spacing > 0)
2945 duration = priv->packet_spacing;
2946 lost_packets = MAX (timer->num, 1);
2947 late = timer->num > 0;
2948 num_rtx_retry = timer->num_rtx_retry;
2950 /* we had a gap and thus we lost some packets. Create an event for this. */
2951 if (lost_packets > 1)
2952 GST_DEBUG_OBJECT (jitterbuffer, "Packets #%d -> #%d lost", seqnum,
2953 seqnum + lost_packets - 1);
2955 GST_DEBUG_OBJECT (jitterbuffer, "Packet #%d lost", seqnum);
2957 priv->num_late += lost_packets;
2958 priv->num_rtx_failed += num_rtx_retry;
2960 next_in_seqnum = (seqnum + lost_packets) & 0xffff;
2962 /* we now only accept seqnum bigger than this */
2963 if (gst_rtp_buffer_compare_seqnum (priv->next_in_seqnum, next_in_seqnum) > 0)
2964 priv->next_in_seqnum = next_in_seqnum;
2966 /* create paket lost event */
2967 event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM,
2968 gst_structure_new ("GstRTPPacketLost",
2969 "seqnum", G_TYPE_UINT, (guint) seqnum,
2970 "timestamp", G_TYPE_UINT64, timestamp,
2971 "duration", G_TYPE_UINT64, duration,
2972 "late", G_TYPE_BOOLEAN, late,
2973 "retry", G_TYPE_UINT, num_rtx_retry, NULL));
2975 item = alloc_item (event, ITEM_TYPE_LOST, -1, -1, seqnum, lost_packets, -1);
2976 rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL);
2978 /* remove timer now */
2979 remove_timer (jitterbuffer, timer);
2981 JBUF_SIGNAL_EVENT (priv);
2987 do_eos_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
2990 GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2992 GST_INFO_OBJECT (jitterbuffer, "got the NPT timeout");
2993 remove_timer (jitterbuffer, timer);
2995 /* there was no EOS in the buffer, put one in there now */
2996 queue_event (jitterbuffer, gst_event_new_eos ());
2998 JBUF_SIGNAL_EVENT (priv);
3004 do_deadline_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
3007 GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
3009 GST_INFO_OBJECT (jitterbuffer, "got deadline timeout");
3011 /* timer seqnum might have been obsoleted by caps seqnum-base,
3012 * only mess with current ongoing seqnum if still unknown */
3013 if (priv->next_seqnum == -1)
3014 priv->next_seqnum = timer->seqnum;
3015 remove_timer (jitterbuffer, timer);
3016 JBUF_SIGNAL_EVENT (priv);
3022 do_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
3025 gboolean removed = FALSE;
3027 switch (timer->type) {
3028 case TIMER_TYPE_EXPECTED:
3029 removed = do_expected_timeout (jitterbuffer, timer, now);
3031 case TIMER_TYPE_LOST:
3032 removed = do_lost_timeout (jitterbuffer, timer, now);
3034 case TIMER_TYPE_DEADLINE:
3035 removed = do_deadline_timeout (jitterbuffer, timer, now);
3037 case TIMER_TYPE_EOS:
3038 removed = do_eos_timeout (jitterbuffer, timer, now);
3044 /* called when we need to wait for the next timeout.
3046 * We loop over the array of recorded timeouts and wait for the earliest one.
3047 * When it timed out, do the logic associated with the timer.
3049 * If there are no timers, we wait on a gcond until something new happens.
3052 wait_next_timeout (GstRtpJitterBuffer * jitterbuffer)
3054 GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
3055 GstClockTime now = 0;
3058 while (priv->timer_running) {
3059 TimerData *timer = NULL;
3060 GstClockTime timer_timeout = -1;
3063 GST_DEBUG_OBJECT (jitterbuffer, "now %" GST_TIME_FORMAT,
3064 GST_TIME_ARGS (now));
3066 len = priv->timers->len;
3067 for (i = 0; i < len; i++) {
3068 TimerData *test = &g_array_index (priv->timers, TimerData, i);
3069 GstClockTime test_timeout = get_timeout (jitterbuffer, test);
3070 gboolean save_best = FALSE;
3072 GST_DEBUG_OBJECT (jitterbuffer, "%d, %d, %d, %" GST_TIME_FORMAT,
3073 i, test->type, test->seqnum, GST_TIME_ARGS (test_timeout));
3075 /* find the smallest timeout */
3076 if (timer == NULL) {
3078 } else if (timer_timeout == -1) {
3079 /* we already have an immediate timeout, the new timer must be an
3080 * immediate timer with smaller seqnum to become the best */
3081 if (test_timeout == -1
3082 && (gst_rtp_buffer_compare_seqnum (test->seqnum,
3083 timer->seqnum) > 0))
3085 } else if (test_timeout == -1) {
3086 /* first immediate timer */
3088 } else if (test_timeout < timer_timeout) {
3091 } else if (test_timeout == timer_timeout
3092 && (gst_rtp_buffer_compare_seqnum (test->seqnum,
3093 timer->seqnum) > 0)) {
3094 /* same timer, smaller seqnum */
3098 GST_DEBUG_OBJECT (jitterbuffer, "new best %d", i);
3100 timer_timeout = test_timeout;
3103 if (timer && !priv->blocked) {
3105 GstClockTime sync_time;
3108 GstClockTimeDiff clock_jitter;
3110 if (timer_timeout == -1 || timer_timeout <= now) {
3111 do_timeout (jitterbuffer, timer, now);
3112 /* check here, do_timeout could have released the lock */
3113 if (!priv->timer_running)
3118 GST_OBJECT_LOCK (jitterbuffer);
3119 clock = GST_ELEMENT_CLOCK (jitterbuffer);
3121 GST_OBJECT_UNLOCK (jitterbuffer);
3122 /* let's just push if there is no clock */
3123 GST_DEBUG_OBJECT (jitterbuffer, "No clock, timeout right away");
3124 now = timer_timeout;
3128 /* prepare for sync against clock */
3129 sync_time = timer_timeout + GST_ELEMENT_CAST (jitterbuffer)->base_time;
3130 /* add latency of peer to get input time */
3131 sync_time += priv->peer_latency;
3133 GST_DEBUG_OBJECT (jitterbuffer, "sync to timestamp %" GST_TIME_FORMAT
3134 " with sync time %" GST_TIME_FORMAT,
3135 GST_TIME_ARGS (timer_timeout), GST_TIME_ARGS (sync_time));
3137 /* create an entry for the clock */
3138 id = priv->clock_id = gst_clock_new_single_shot_id (clock, sync_time);
3139 priv->timer_timeout = timer_timeout;
3140 priv->timer_seqnum = timer->seqnum;
3141 GST_OBJECT_UNLOCK (jitterbuffer);
3143 /* release the lock so that the other end can push stuff or unlock */
3146 ret = gst_clock_id_wait (id, &clock_jitter);
3149 if (!priv->timer_running) {
3150 gst_clock_id_unref (id);
3151 priv->clock_id = NULL;
3155 if (ret != GST_CLOCK_UNSCHEDULED) {
3156 now = timer_timeout + MAX (clock_jitter, 0);
3157 GST_DEBUG_OBJECT (jitterbuffer, "sync done, %d, #%d, %" G_GINT64_FORMAT,
3158 ret, priv->timer_seqnum, clock_jitter);
3160 GST_DEBUG_OBJECT (jitterbuffer, "sync unscheduled");
3162 /* and free the entry */
3163 gst_clock_id_unref (id);
3164 priv->clock_id = NULL;
3166 /* no timers, wait for activity */
3167 JBUF_WAIT_TIMER (priv);
3172 GST_DEBUG_OBJECT (jitterbuffer, "we are stopping");
3177 * This funcion implements the main pushing loop on the source pad.
3179 * It first tries to push as many buffers as possible. If there is a seqnum
3180 * mismatch, we wait for the next timeouts.
3183 gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer)
3185 GstRtpJitterBufferPrivate *priv;
3186 GstFlowReturn result = GST_FLOW_OK;
3188 priv = jitterbuffer->priv;
3190 JBUF_LOCK_CHECK (priv, flushing);
3192 result = handle_next_buffer (jitterbuffer);
3193 if (G_LIKELY (result == GST_FLOW_WAIT)) {
3194 /* now wait for the next event */
3195 JBUF_WAIT_EVENT (priv, flushing);
3196 result = GST_FLOW_OK;
3199 while (result == GST_FLOW_OK);
3200 /* store result for upstream */
3201 priv->srcresult = result;
3202 /* if we get here we need to pause */
3208 result = priv->srcresult;
3215 JBUF_SIGNAL_QUERY (priv, FALSE);
3218 GST_DEBUG_OBJECT (jitterbuffer, "pausing task, reason %s",
3219 gst_flow_get_name (result));
3220 gst_pad_pause_task (priv->srcpad);
3221 if (result == GST_FLOW_EOS) {
3222 event = gst_event_new_eos ();
3223 gst_pad_push_event (priv->srcpad, event);
3229 /* collect the info from the lastest RTCP packet and the jitterbuffer sync, do
3230 * some sanity checks and then emit the handle-sync signal with the parameters.
3231 * This function must be called with the LOCK */
3233 do_handle_sync (GstRtpJitterBuffer * jitterbuffer)
3235 GstRtpJitterBufferPrivate *priv;
3236 guint64 base_rtptime, base_time;
3238 guint64 last_rtptime;
3240 guint64 ext_rtptime, diff;
3241 gboolean valid = TRUE, keep = FALSE;
3243 priv = jitterbuffer->priv;
3245 /* get the last values from the jitterbuffer */
3246 rtp_jitter_buffer_get_sync (priv->jbuf, &base_rtptime, &base_time,
3247 &clock_rate, &last_rtptime);
3249 clock_base = priv->clock_base;
3250 ext_rtptime = priv->ext_rtptime;
3252 GST_DEBUG_OBJECT (jitterbuffer, "ext SR %" G_GUINT64_FORMAT ", base %"
3253 G_GUINT64_FORMAT ", clock-rate %" G_GUINT32_FORMAT
3254 ", clock-base %" G_GUINT64_FORMAT ", last-rtptime %" G_GUINT64_FORMAT,
3255 ext_rtptime, base_rtptime, clock_rate, clock_base, last_rtptime);
3257 if (base_rtptime == -1 || clock_rate == -1 || base_time == -1) {
3258 /* we keep this SR packet for later. When we get a valid RTP packet the
3259 * above values will be set and we can try to use the SR packet */
3260 GST_DEBUG_OBJECT (jitterbuffer, "keeping for later, no RTP values");
3263 /* we can't accept anything that happened before we did the last resync */
3264 if (base_rtptime > ext_rtptime) {
3265 GST_DEBUG_OBJECT (jitterbuffer, "dropping, older than base time");
3268 /* the SR RTP timestamp must be something close to what we last observed
3269 * in the jitterbuffer */
3270 if (ext_rtptime > last_rtptime) {
3271 /* check how far ahead it is to our RTP timestamps */
3272 diff = ext_rtptime - last_rtptime;
3273 /* if bigger than 1 second, we drop it */
3274 if (diff > clock_rate) {
3275 GST_DEBUG_OBJECT (jitterbuffer, "too far ahead");
3276 /* should drop this, but some RTSP servers end up with bogus
3277 * way too ahead RTCP packet when repeated PAUSE/PLAY,
3278 * so still trigger rptbin sync but invalidate RTCP data
3279 * (sync might use other methods) */
3282 GST_DEBUG_OBJECT (jitterbuffer, "ext last %" G_GUINT64_FORMAT ", diff %"
3283 G_GUINT64_FORMAT, last_rtptime, diff);
3289 GST_DEBUG_OBJECT (jitterbuffer, "keeping RTCP packet for later");
3293 s = gst_structure_new ("application/x-rtp-sync",
3294 "base-rtptime", G_TYPE_UINT64, base_rtptime,
3295 "base-time", G_TYPE_UINT64, base_time,
3296 "clock-rate", G_TYPE_UINT, clock_rate,
3297 "clock-base", G_TYPE_UINT64, clock_base,
3298 "sr-ext-rtptime", G_TYPE_UINT64, ext_rtptime,
3299 "sr-buffer", GST_TYPE_BUFFER, priv->last_sr, NULL);
3301 GST_DEBUG_OBJECT (jitterbuffer, "signaling sync");
3302 gst_buffer_replace (&priv->last_sr, NULL);
3304 g_signal_emit (jitterbuffer,
3305 gst_rtp_jitter_buffer_signals[SIGNAL_HANDLE_SYNC], 0, s);
3307 gst_structure_free (s);
3309 GST_DEBUG_OBJECT (jitterbuffer, "dropping RTCP packet");
3310 gst_buffer_replace (&priv->last_sr, NULL);
3314 static GstFlowReturn
3315 gst_rtp_jitter_buffer_chain_rtcp (GstPad * pad, GstObject * parent,
3318 GstRtpJitterBuffer *jitterbuffer;
3319 GstRtpJitterBufferPrivate *priv;
3320 GstFlowReturn ret = GST_FLOW_OK;
3322 GstRTCPPacket packet;
3323 guint64 ext_rtptime;
3325 GstRTCPBuffer rtcp = { NULL, };
3327 jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
3329 if (G_UNLIKELY (!gst_rtcp_buffer_validate (buffer)))
3330 goto invalid_buffer;
3332 priv = jitterbuffer->priv;
3334 gst_rtcp_buffer_map (buffer, GST_MAP_READ, &rtcp);
3336 if (!gst_rtcp_buffer_get_first_packet (&rtcp, &packet))
3339 /* first packet must be SR or RR or else the validate would have failed */
3340 switch (gst_rtcp_packet_get_type (&packet)) {
3341 case GST_RTCP_TYPE_SR:
3342 gst_rtcp_packet_sr_get_sender_info (&packet, &ssrc, NULL, &rtptime,
3348 gst_rtcp_buffer_unmap (&rtcp);
3350 GST_DEBUG_OBJECT (jitterbuffer, "received RTCP of SSRC %08x", ssrc);
3353 /* convert the RTP timestamp to our extended timestamp, using the same offset
3354 * we used in the jitterbuffer */
3355 ext_rtptime = priv->jbuf->ext_rtptime;
3356 ext_rtptime = gst_rtp_buffer_ext_timestamp (&ext_rtptime, rtptime);
3358 priv->ext_rtptime = ext_rtptime;
3359 gst_buffer_replace (&priv->last_sr, buffer);
3361 do_handle_sync (jitterbuffer);
3365 gst_buffer_unref (buffer);
3371 /* this is not fatal but should be filtered earlier */
3372 GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
3373 ("Received invalid RTCP payload, dropping"));
3379 /* this is not fatal but should be filtered earlier */
3380 GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
3381 ("Received empty RTCP payload, dropping"));
3382 gst_rtcp_buffer_unmap (&rtcp);
3388 GST_DEBUG_OBJECT (jitterbuffer, "ignoring RTCP packet");
3389 gst_rtcp_buffer_unmap (&rtcp);
/* Query handler for the sink pad.
 *
 * CAPS queries are answered with the result of
 * gst_rtp_jitter_buffer_getcaps(). Serialized queries are queued as an item
 * in the jitterbuffer (unless it is in BUFFER mode) and the result is read
 * back from priv->last_query. Other queries are passed to
 * gst_pad_query_default().
 * NOTE(review): the exact branch layout (else/break/labels) is not fully
 * visible here -- confirm against the complete function. */
3396 gst_rtp_jitter_buffer_sink_query (GstPad * pad, GstObject * parent,
3399 gboolean res = FALSE;
3400 GstRtpJitterBuffer *jitterbuffer;
3401 GstRtpJitterBufferPrivate *priv;
3403 jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
3404 priv = jitterbuffer->priv;
3406 switch (GST_QUERY_TYPE (query)) {
3407 case GST_QUERY_CAPS:
3409 GstCaps *filter, *caps;
/* compute the caps for this pad, honouring the filter carried by the query */
3411 gst_query_parse_caps (query, &filter);
3412 caps = gst_rtp_jitter_buffer_getcaps (pad, filter);
3413 gst_query_set_caps_result (query, caps);
/* drop our reference to the caps now that they were set on the query */
3414 gst_caps_unref (caps);
3419 if (GST_QUERY_IS_SERIALIZED (query)) {
3420 RTPJitterBufferItem *item;
/* presumably takes the jitterbuffer lock and jumps to out_flushing when the
 * element is flushing -- confirm against the macro definition */
3423 JBUF_LOCK_CHECK (priv, out_flushing);
/* serialized queries are only queued when we are not in BUFFER mode */
3424 if (rtp_jitter_buffer_get_mode (priv->jbuf) !=
3425 RTP_JITTER_BUFFER_MODE_BUFFER) {
3426 GST_DEBUG_OBJECT (jitterbuffer, "adding serialized query");
/* wrap the query in an ITEM_TYPE_QUERY item and insert it in the queue */
3427 item = alloc_item (query, ITEM_TYPE_QUERY, -1, -1, -1, 0, -1);
3428 rtp_jitter_buffer_insert (priv->jbuf, item, &head, NULL);
/* signal the event condition, then wait on the query condition; the answer
 * is taken from priv->last_query afterwards.
 * NOTE(review): presumably the streaming thread stores it there -- confirm */
3430 JBUF_SIGNAL_EVENT (priv);
3431 JBUF_WAIT_QUERY (priv, out_flushing);
3432 res = priv->last_query;
3434 GST_DEBUG_OBJECT (jitterbuffer, "refusing query, we are buffering");
/* let the default pad query handler deal with the query */
3439 res = gst_pad_query_default (pad, parent, query);
/* target of the out_flushing jumps used by the JBUF_* macros above */
3447 GST_DEBUG_OBJECT (jitterbuffer, "we are flushing");
/* Query handler for the source pad.
 *
 * LATENCY: the query is sent to the peer of the sink pad and the configured
 *   jitterbuffer latency (priv->latency_ns) is added to the minimum latency
 *   reported by the peer.
 * POSITION: for GST_FORMAT_TIME the position is priv->npt_start plus
 *   priv->last_out_time when both are valid; other formats use the default
 *   handler.
 * CAPS: answered with gst_rtp_jitter_buffer_getcaps().
 * Everything else is passed to gst_pad_query_default(). */
3455 gst_rtp_jitter_buffer_src_query (GstPad * pad, GstObject * parent,
3458 GstRtpJitterBuffer *jitterbuffer;
3459 GstRtpJitterBufferPrivate *priv;
3460 gboolean res = FALSE;
3462 jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
3463 priv = jitterbuffer->priv;
3465 switch (GST_QUERY_TYPE (query)) {
3466 case GST_QUERY_LATENCY:
3468 /* We need to send the query upstream and add the returned latency to our
3470 GstClockTime min_latency, max_latency;
3472 GstClockTime our_latency;
3474 if ((res = gst_pad_peer_query (priv->sinkpad, query))) {
3475 gst_query_parse_latency (query, &us_live, &min_latency, &max_latency);
3477 GST_DEBUG_OBJECT (jitterbuffer, "Peer latency: min %"
3478 GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
3479 GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
3481 /* store this so that we can safely sync on the peer buffers. */
3483 priv->peer_latency = min_latency;
/* our own contribution is the configured latency in nanoseconds */
3484 our_latency = priv->latency_ns;
3487 GST_DEBUG_OBJECT (jitterbuffer, "Our latency: %" GST_TIME_FORMAT,
3488 GST_TIME_ARGS (our_latency));
3490 /* we add some latency but can buffer an infinite amount of time */
/* NOTE(review): only the min_latency update is visible here; how
 * max_latency is adjusted should be confirmed against the full function */
3491 min_latency += our_latency;
3494 GST_DEBUG_OBJECT (jitterbuffer, "Calculated total latency : min %"
3495 GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
3496 GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
/* the result is always reported as live (TRUE), independent of us_live */
3498 gst_query_set_latency (query, TRUE, min_latency, max_latency);
3502 case GST_QUERY_POSITION:
3504 GstClockTime start, last_out;
/* only TIME positions are computed here; other formats use the default */
3507 gst_query_parse_position (query, &fmt, NULL);
3508 if (fmt != GST_FORMAT_TIME) {
3509 res = gst_pad_query_default (pad, parent, query);
/* take a snapshot of the values needed for the position calculation */
3514 start = priv->npt_start;
3515 last_out = priv->last_out_time;
3518 GST_DEBUG_OBJECT (jitterbuffer, "npt start %" GST_TIME_FORMAT
3519 ", last out %" GST_TIME_FORMAT, GST_TIME_ARGS (start),
3520 GST_TIME_ARGS (last_out));
3522 if (GST_CLOCK_TIME_IS_VALID (start) && GST_CLOCK_TIME_IS_VALID (last_out)) {
3523 /* bring 0-based outgoing time to stream time */
3524 gst_query_set_position (query, GST_FORMAT_TIME, start + last_out);
/* presumably the fallback when either value is invalid -- the else is not
 * visible here, confirm */
3527 res = gst_pad_query_default (pad, parent, query);
3531 case GST_QUERY_CAPS:
3533 GstCaps *filter, *caps;
/* compute the caps for this pad, honouring the filter carried by the query */
3535 gst_query_parse_caps (query, &filter);
3536 caps = gst_rtp_jitter_buffer_getcaps (pad, filter);
3537 gst_query_set_caps_result (query, caps);
3538 gst_caps_unref (caps);
/* any other query type is left to the default handler */
3543 res = gst_pad_query_default (pad, parent, query);
/* GObject property setter.
 *
 * Copies the value of the property identified by @prop_id from @value into
 * the matching field of the private structure (or into priv->jbuf for the
 * delay and mode). A latency change additionally posts a latency message on
 * the element. Unknown ids are reported with
 * G_OBJECT_WARN_INVALID_PROPERTY_ID().
 * NOTE(review): some case labels and any locking around the assignments are
 * not visible in this excerpt -- confirm against the complete function. */
3551 gst_rtp_jitter_buffer_set_property (GObject * object,
3552 guint prop_id, const GValue * value, GParamSpec * pspec)
3554 GstRtpJitterBuffer *jitterbuffer;
3555 GstRtpJitterBufferPrivate *priv;
3557 jitterbuffer = GST_RTP_JITTER_BUFFER (object);
3558 priv = jitterbuffer->priv;
/* latency handling: the value is an unsigned number of milliseconds */
3563 guint new_latency, old_latency;
3565 new_latency = g_value_get_uint (value);
/* remember the previous value so a change can be detected below */
3568 old_latency = priv->latency_ms;
3569 priv->latency_ms = new_latency;
/* keep a nanosecond copy and hand it to the jitterbuffer as its delay */
3570 priv->latency_ns = priv->latency_ms * GST_MSECOND;
3571 rtp_jitter_buffer_set_delay (priv->jbuf, priv->latency_ns);
3574 /* post message if latency changed, this will inform the parent pipeline
3575 * that a latency reconfiguration is possible/needed. */
3576 if (new_latency != old_latency) {
3577 GST_DEBUG_OBJECT (jitterbuffer, "latency changed to: %" GST_TIME_FORMAT,
3578 GST_TIME_ARGS (new_latency * GST_MSECOND));
3580 gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer),
3581 gst_message_new_latency (GST_OBJECT_CAST (jitterbuffer)));
3585 case PROP_DROP_ON_LATENCY:
3587 priv->drop_on_latency = g_value_get_boolean (value);
3590 case PROP_TS_OFFSET:
/* a new offset also sets the ts_discont flag */
3592 priv->ts_offset = g_value_get_int64 (value);
3593 priv->ts_discont = TRUE;
3598 priv->do_lost = g_value_get_boolean (value);
/* the mode is stored in the jitterbuffer object, not in priv */
3603 rtp_jitter_buffer_set_mode (priv->jbuf, g_value_get_enum (value));
3606 case PROP_DO_RETRANSMISSION:
3608 priv->do_retransmission = g_value_get_boolean (value);
3611 case PROP_RTX_DELAY:
3613 priv->rtx_delay = g_value_get_int (value);
3616 case PROP_RTX_MIN_DELAY:
/* unlike the other rtx-* properties this one is read as an unsigned value */
3618 priv->rtx_min_delay = g_value_get_uint (value);
3621 case PROP_RTX_DELAY_REORDER:
3623 priv->rtx_delay_reorder = g_value_get_int (value);
3626 case PROP_RTX_RETRY_TIMEOUT:
3628 priv->rtx_retry_timeout = g_value_get_int (value);
3631 case PROP_RTX_MIN_RETRY_TIMEOUT:
3633 priv->rtx_min_retry_timeout = g_value_get_int (value);
3636 case PROP_RTX_RETRY_PERIOD:
3638 priv->rtx_retry_period = g_value_get_int (value);
3641 case PROP_RTX_MAX_RETRIES:
3643 priv->rtx_max_retries = g_value_get_int (value);
/* unknown property id */
3647 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
/* GObject property getter.
 *
 * Stores the current value of the property identified by @prop_id in
 * @value. Most values are read from the private structure; the mode and the
 * fill percentage are read from priv->jbuf and the stats are built by
 * gst_rtp_jitter_buffer_create_stats(). Unknown ids are reported with
 * G_OBJECT_WARN_INVALID_PROPERTY_ID().
 * NOTE(review): some case labels and any locking around the reads are not
 * visible in this excerpt -- confirm against the complete function. */
3653 gst_rtp_jitter_buffer_get_property (GObject * object,
3654 guint prop_id, GValue * value, GParamSpec * pspec)
3656 GstRtpJitterBuffer *jitterbuffer;
3657 GstRtpJitterBufferPrivate *priv;
3659 jitterbuffer = GST_RTP_JITTER_BUFFER (object);
3660 priv = jitterbuffer->priv;
/* latency is reported in milliseconds */
3665 g_value_set_uint (value, priv->latency_ms);
3668 case PROP_DROP_ON_LATENCY:
3670 g_value_set_boolean (value, priv->drop_on_latency);
3673 case PROP_TS_OFFSET:
3675 g_value_set_int64 (value, priv->ts_offset);
3680 g_value_set_boolean (value, priv->do_lost);
/* the mode is read from the jitterbuffer object, not from priv */
3685 g_value_set_enum (value, rtp_jitter_buffer_get_mode (priv->jbuf));
/* fill percentage: the value used when srcresult is not GST_FLOW_OK is not
 * visible here (TODO confirm); in the visible path it is taken from the
 * jitterbuffer */
3693 if (priv->srcresult != GST_FLOW_OK)
3696 percent = rtp_jitter_buffer_get_percent (priv->jbuf);
3698 g_value_set_int (value, percent);
3702 case PROP_DO_RETRANSMISSION:
3704 g_value_set_boolean (value, priv->do_retransmission);
3707 case PROP_RTX_DELAY:
3709 g_value_set_int (value, priv->rtx_delay);
3712 case PROP_RTX_MIN_DELAY:
/* unlike the other rtx-* properties this one is an unsigned value */
3714 g_value_set_uint (value, priv->rtx_min_delay);
3717 case PROP_RTX_DELAY_REORDER:
3719 g_value_set_int (value, priv->rtx_delay_reorder);
3722 case PROP_RTX_RETRY_TIMEOUT:
3724 g_value_set_int (value, priv->rtx_retry_timeout);
3727 case PROP_RTX_MIN_RETRY_TIMEOUT:
3729 g_value_set_int (value, priv->rtx_min_retry_timeout);
3732 case PROP_RTX_RETRY_PERIOD:
3734 g_value_set_int (value, priv->rtx_retry_period);
3737 case PROP_RTX_MAX_RETRIES:
3739 g_value_set_int (value, priv->rtx_max_retries);
/* the freshly created stats structure is handed over to the GValue */
3743 g_value_take_boxed (value,
3744 gst_rtp_jitter_buffer_create_stats (jitterbuffer));
/* unknown property id */
3747 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
3752 static GstStructure *
3753 gst_rtp_jitter_buffer_create_stats (GstRtpJitterBuffer * jbuf)
3757 JBUF_LOCK (jbuf->priv);
3758 s = gst_structure_new ("application/x-rtp-jitterbuffer-stats",
3759 "rtx-count", G_TYPE_UINT64, jbuf->priv->num_rtx_requests,
3760 "rtx-success-count", G_TYPE_UINT64, jbuf->priv->num_rtx_success,
3761 "rtx-per-packet", G_TYPE_DOUBLE, jbuf->priv->avg_rtx_num,
3762 "rtx-rtt", G_TYPE_UINT64, jbuf->priv->avg_rtx_rtt, NULL);
3763 JBUF_UNLOCK (jbuf->priv);