2 * Farsight Voice+Video library
4 * Copyright 2007 Collabora Ltd,
5 * Copyright 2007 Nokia Corporation
6 * @author: Philippe Kalaf <philippe.kalaf@collabora.co.uk>.
7 * Copyright 2007 Wim Taymans <wim.taymans@gmail.com>
9 * This library is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Library General Public
11 * License as published by the Free Software Foundation; either
12 * version 2 of the License, or (at your option) any later version.
14 * This library is distributed in the hope that it will be useful,
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 * Library General Public License for more details.
19 * You should have received a copy of the GNU Library General Public
20 * License along with this library; if not, write to the
21 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
22 * Boston, MA 02110-1301, USA.
27 * SECTION:element-rtpjitterbuffer
29 * This element reorders and removes duplicate RTP packets as they are received
30 * from a network source.
32 * The element needs the clock-rate of the RTP payload in order to estimate the
33 * delay. This information is obtained either from the caps on the sink pad or,
34 * when no caps are present, from the #GstRtpJitterBuffer::request-pt-map signal.
35 * To clear the previous pt-map use the #GstRtpJitterBuffer::clear-pt-map signal.
37 * The rtpjitterbuffer will wait for missing packets up to a configurable time
38 * limit using the #GstRtpJitterBuffer:latency property. Packets arriving too
39 * late are considered to be lost packets. If the #GstRtpJitterBuffer:do-lost
40 * property is set, lost packets will result in a custom serialized downstream
41 * event of name GstRTPPacketLost. The lost packet events are usually used by a
42 * depayloader or other element to create concealment data or some other logic
43 * to gracefully handle the missing packets.
45 * The jitterbuffer will use the DTS (or PTS if no DTS is set) of the incomming
46 * buffer and the rtptime inside the RTP packet to create a PTS on the outgoing
49 * The jitterbuffer can also be configured to send early retransmission events
50 * upstream by setting the #GstRtpJitterBuffer:do-retransmission property. In
51 * this mode, the jitterbuffer tries to estimate when a packet should arrive and
52 * sends a custom upstream event named GstRTPRetransmissionRequest when the
53 * packet is considered late. The initial expected packet arrival time is
54 * calculated as follows:
56 * - If seqnum N arrived at time T, seqnum N+1 is expected to arrive at
57 * T + packet-spacing + #GstRtpJitterBuffer:rtx-delay. The packet spacing is
58 * calculated from the DTS (or PTS is no DTS) of two consecutive RTP
59 * packets with different rtptime.
61 * - If seqnum N0 arrived at time T0 and seqnum Nm arrived at time Tm,
62 * seqnum Ni is expected at time Ti = T0 + i*(Tm - T0)/(Nm - N0). Any
63 * previously scheduled timeout is overwritten.
65 * - If seqnum N arrived, all seqnum older than
66 * N - #GstRtpJitterBuffer:rtx-delay-reorder are considered late
67 * immediately. This is to request fast feedback for abonormally reorder
68 * packets before any of the previous timeouts is triggered.
70 * A late packet triggers the GstRTPRetransmissionRequest custom upstream
71 * event. After the initial timeout expires and the retransmission event is
72 * sent, the timeout is scheduled for
73 * T + #GstRtpJitterBuffer:rtx-retry-timeout. If the missing packet did not
74 * arrive after #GstRtpJitterBuffer:rtx-retry-timeout, a new
75 * GstRTPRetransmissionRequest is sent upstream and the timeout is rescheduled
76 * again for T + #GstRtpJitterBuffer:rtx-retry-timeout. This repeats until
77 * #GstRtpJitterBuffer:rtx-retry-period elapsed, at which point no further
78 * retransmission requests are sent and the regular logic is performed to
79 * schedule a lost packet as discussed above.
81 * This element acts as a live element and so adds #GstRtpJitterBuffer:latency
84 * This element will automatically be used inside rtpbin.
87 * <title>Example pipelines</title>
89 * gst-launch-1.0 rtspsrc location=rtsp://192.168.1.133:8554/mpeg1or2AudioVideoTest ! rtpjitterbuffer ! rtpmpvdepay ! mpeg2dec ! xvimagesink
90 * ]| Connect to a streaming server and decode the MPEG video. The jitterbuffer is
91 * inserted into the pipeline to smooth out network jitter and to reorder the
92 * out-of-order RTP packets.
95 * Last reviewed on 2007-05-28 (0.10.5)
104 #include <gst/rtp/gstrtpbuffer.h>
106 #include "gstrtpjitterbuffer.h"
107 #include "rtpjitterbuffer.h"
108 #include "rtpstats.h"
110 #include <gst/glib-compat-private.h>
112 GST_DEBUG_CATEGORY (rtpjitterbuffer_debug);
113 #define GST_CAT_DEFAULT (rtpjitterbuffer_debug)
115 /* RTPJitterBuffer signals and args */
118 SIGNAL_REQUEST_PT_MAP,
126 #define DEFAULT_LATENCY_MS 200
127 #define DEFAULT_DROP_ON_LATENCY FALSE
128 #define DEFAULT_TS_OFFSET 0
129 #define DEFAULT_DO_LOST FALSE
130 #define DEFAULT_MODE RTP_JITTER_BUFFER_MODE_SLAVE
131 #define DEFAULT_PERCENT 0
132 #define DEFAULT_DO_RETRANSMISSION FALSE
133 #define DEFAULT_RTX_DELAY 20
134 #define DEFAULT_RTX_DELAY_REORDER 3
135 #define DEFAULT_RTX_RETRY_TIMEOUT 40
136 #define DEFAULT_RTX_RETRY_PERIOD 160
142 PROP_DROP_ON_LATENCY,
147 PROP_DO_RETRANSMISSION,
149 PROP_RTX_DELAY_REORDER,
150 PROP_RTX_RETRY_TIMEOUT,
151 PROP_RTX_RETRY_PERIOD,
155 #define JBUF_LOCK(priv) (g_mutex_lock (&(priv)->jbuf_lock))
157 #define JBUF_LOCK_CHECK(priv,label) G_STMT_START { \
159 if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK)) \
162 #define JBUF_UNLOCK(priv) (g_mutex_unlock (&(priv)->jbuf_lock))
164 #define JBUF_WAIT_TIMER(priv) G_STMT_START { \
165 GST_DEBUG ("waiting timer"); \
166 (priv)->waiting_timer = TRUE; \
167 g_cond_wait (&(priv)->jbuf_timer, &(priv)->jbuf_lock); \
168 (priv)->waiting_timer = FALSE; \
169 GST_DEBUG ("waiting timer done"); \
171 #define JBUF_SIGNAL_TIMER(priv) G_STMT_START { \
172 if (G_UNLIKELY ((priv)->waiting_timer)) { \
173 GST_DEBUG ("signal timer"); \
174 g_cond_signal (&(priv)->jbuf_timer); \
178 #define JBUF_WAIT_EVENT(priv,label) G_STMT_START { \
179 GST_DEBUG ("waiting event"); \
180 (priv)->waiting_event = TRUE; \
181 g_cond_wait (&(priv)->jbuf_event, &(priv)->jbuf_lock); \
182 (priv)->waiting_event = FALSE; \
183 GST_DEBUG ("waiting event done"); \
184 if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK)) \
187 #define JBUF_SIGNAL_EVENT(priv) G_STMT_START { \
188 if (G_UNLIKELY ((priv)->waiting_event)) { \
189 GST_DEBUG ("signal event"); \
190 g_cond_signal (&(priv)->jbuf_event); \
194 struct _GstRtpJitterBufferPrivate
196 GstPad *sinkpad, *srcpad;
199 RTPJitterBuffer *jbuf;
201 gboolean waiting_timer;
203 gboolean waiting_event;
210 gboolean timer_running;
211 GThread *timer_thread;
216 gboolean drop_on_latency;
219 gboolean do_retransmission;
221 gint rtx_delay_reorder;
222 gint rtx_retry_timeout;
223 gint rtx_retry_period;
225 /* the last seqnum we pushed out */
226 guint32 last_popped_seqnum;
227 /* the next expected seqnum we push */
229 /* last output time */
230 GstClockTime last_out_time;
231 /* last valid input timestamp and rtptime pair */
232 GstClockTime ips_dts;
234 GstClockTime packet_spacing;
236 /* the next expected seqnum we receive */
237 GstClockTime last_in_dts;
238 guint32 last_in_seqnum;
239 guint32 next_in_seqnum;
243 /* start and stop ranges */
244 GstClockTime npt_start;
245 GstClockTime npt_stop;
246 guint64 ext_timestamp;
247 guint64 last_elapsed;
248 guint64 estimated_eos;
254 /* clock rate and rtp timestamp offset */
258 gint64 prev_ts_offset;
260 /* when we are shutting down */
261 GstFlowReturn srcresult;
267 GstClockTime timer_timeout;
268 guint16 timer_seqnum;
269 /* the latency of the upstream peer, we have to take this into account when
270 * synchronizing the buffers. */
271 GstClockTime peer_latency;
275 /* some accounting */
277 guint64 num_duplicates;
278 guint64 num_rtx_requests;
279 guint64 num_rtx_success;
280 guint64 num_rtx_failed;
299 GstClockTime timeout;
300 GstClockTime duration;
301 GstClockTime rtx_base;
302 GstClockTime rtx_delay;
303 GstClockTime rtx_retry;
304 GstClockTime rtx_last;
308 #define GST_RTP_JITTER_BUFFER_GET_PRIVATE(o) \
309 (G_TYPE_INSTANCE_GET_PRIVATE ((o), GST_TYPE_RTP_JITTER_BUFFER, \
310 GstRtpJitterBufferPrivate))
312 static GstStaticPadTemplate gst_rtp_jitter_buffer_sink_template =
313 GST_STATIC_PAD_TEMPLATE ("sink",
316 GST_STATIC_CAPS ("application/x-rtp, "
317 "clock-rate = (int) [ 1, 2147483647 ]"
318 /* "payload = (int) , "
319 * "encoding-name = (string) "
323 static GstStaticPadTemplate gst_rtp_jitter_buffer_sink_rtcp_template =
324 GST_STATIC_PAD_TEMPLATE ("sink_rtcp",
327 GST_STATIC_CAPS ("application/x-rtcp")
330 static GstStaticPadTemplate gst_rtp_jitter_buffer_src_template =
331 GST_STATIC_PAD_TEMPLATE ("src",
334 GST_STATIC_CAPS ("application/x-rtp"
335 /* "payload = (int) , "
336 * "clock-rate = (int) , "
337 * "encoding-name = (string) "
341 static guint gst_rtp_jitter_buffer_signals[LAST_SIGNAL] = { 0 };
343 #define gst_rtp_jitter_buffer_parent_class parent_class
344 G_DEFINE_TYPE (GstRtpJitterBuffer, gst_rtp_jitter_buffer, GST_TYPE_ELEMENT);
346 /* object overrides */
347 static void gst_rtp_jitter_buffer_set_property (GObject * object,
348 guint prop_id, const GValue * value, GParamSpec * pspec);
349 static void gst_rtp_jitter_buffer_get_property (GObject * object,
350 guint prop_id, GValue * value, GParamSpec * pspec);
351 static void gst_rtp_jitter_buffer_finalize (GObject * object);
353 /* element overrides */
354 static GstStateChangeReturn gst_rtp_jitter_buffer_change_state (GstElement
355 * element, GstStateChange transition);
356 static GstPad *gst_rtp_jitter_buffer_request_new_pad (GstElement * element,
357 GstPadTemplate * templ, const gchar * name, const GstCaps * filter);
358 static void gst_rtp_jitter_buffer_release_pad (GstElement * element,
360 static GstClock *gst_rtp_jitter_buffer_provide_clock (GstElement * element);
363 static GstCaps *gst_rtp_jitter_buffer_getcaps (GstPad * pad, GstCaps * filter);
364 static GstIterator *gst_rtp_jitter_buffer_iterate_internal_links (GstPad * pad,
367 /* sinkpad overrides */
368 static gboolean gst_rtp_jitter_buffer_sink_event (GstPad * pad,
369 GstObject * parent, GstEvent * event);
370 static GstFlowReturn gst_rtp_jitter_buffer_chain (GstPad * pad,
371 GstObject * parent, GstBuffer * buffer);
373 static gboolean gst_rtp_jitter_buffer_sink_rtcp_event (GstPad * pad,
374 GstObject * parent, GstEvent * event);
375 static GstFlowReturn gst_rtp_jitter_buffer_chain_rtcp (GstPad * pad,
376 GstObject * parent, GstBuffer * buffer);
378 static gboolean gst_rtp_jitter_buffer_sink_query (GstPad * pad,
379 GstObject * parent, GstQuery * query);
381 /* srcpad overrides */
382 static gboolean gst_rtp_jitter_buffer_src_event (GstPad * pad,
383 GstObject * parent, GstEvent * event);
384 static gboolean gst_rtp_jitter_buffer_src_activate_mode (GstPad * pad,
385 GstObject * parent, GstPadMode mode, gboolean active);
386 static void gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer);
387 static gboolean gst_rtp_jitter_buffer_src_query (GstPad * pad,
388 GstObject * parent, GstQuery * query);
391 gst_rtp_jitter_buffer_clear_pt_map (GstRtpJitterBuffer * jitterbuffer);
393 gst_rtp_jitter_buffer_set_active (GstRtpJitterBuffer * jitterbuffer,
394 gboolean active, guint64 base_time);
395 static void do_handle_sync (GstRtpJitterBuffer * jitterbuffer);
397 static void unschedule_current_timer (GstRtpJitterBuffer * jitterbuffer);
398 static void remove_all_timers (GstRtpJitterBuffer * jitterbuffer);
400 static void wait_next_timeout (GstRtpJitterBuffer * jitterbuffer);
403 gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass)
405 GObjectClass *gobject_class;
406 GstElementClass *gstelement_class;
408 gobject_class = (GObjectClass *) klass;
409 gstelement_class = (GstElementClass *) klass;
411 g_type_class_add_private (klass, sizeof (GstRtpJitterBufferPrivate));
413 gobject_class->finalize = gst_rtp_jitter_buffer_finalize;
415 gobject_class->set_property = gst_rtp_jitter_buffer_set_property;
416 gobject_class->get_property = gst_rtp_jitter_buffer_get_property;
419 * GstRtpJitterBuffer::latency:
421 * The maximum latency of the jitterbuffer. Packets will be kept in the buffer
422 * for at most this time.
424 g_object_class_install_property (gobject_class, PROP_LATENCY,
425 g_param_spec_uint ("latency", "Buffer latency in ms",
426 "Amount of ms to buffer", 0, G_MAXUINT, DEFAULT_LATENCY_MS,
427 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
429 * GstRtpJitterBuffer::drop-on-latency:
431 * Drop oldest buffers when the queue is completely filled.
433 g_object_class_install_property (gobject_class, PROP_DROP_ON_LATENCY,
434 g_param_spec_boolean ("drop-on-latency",
435 "Drop buffers when maximum latency is reached",
436 "Tells the jitterbuffer to never exceed the given latency in size",
437 DEFAULT_DROP_ON_LATENCY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
439 * GstRtpJitterBuffer::ts-offset:
441 * Adjust GStreamer output buffer timestamps in the jitterbuffer with offset.
442 * This is mainly used to ensure interstream synchronisation.
444 g_object_class_install_property (gobject_class, PROP_TS_OFFSET,
445 g_param_spec_int64 ("ts-offset", "Timestamp Offset",
446 "Adjust buffer timestamps with offset in nanoseconds", G_MININT64,
447 G_MAXINT64, DEFAULT_TS_OFFSET,
448 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
451 * GstRtpJitterBuffer::do-lost:
453 * Send out a GstRTPPacketLost event downstream when a packet is considered
456 g_object_class_install_property (gobject_class, PROP_DO_LOST,
457 g_param_spec_boolean ("do-lost", "Do Lost",
458 "Send an event downstream when a packet is lost", DEFAULT_DO_LOST,
459 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
462 * GstRtpJitterBuffer::mode:
464 * Control the buffering and timestamping mode used by the jitterbuffer.
466 g_object_class_install_property (gobject_class, PROP_MODE,
467 g_param_spec_enum ("mode", "Mode",
468 "Control the buffering algorithm in use", RTP_TYPE_JITTER_BUFFER_MODE,
469 DEFAULT_MODE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
471 * GstRtpJitterBuffer::percent:
473 * The percent of the jitterbuffer that is filled.
477 g_object_class_install_property (gobject_class, PROP_PERCENT,
478 g_param_spec_int ("percent", "percent",
479 "The buffer filled percent", 0, 100,
480 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
482 * GstRtpJitterBuffer::do-retransmission:
484 * Send out a GstRTPRetransmission event upstream when a packet is considered
485 * late and should be retransmitted.
489 g_object_class_install_property (gobject_class, PROP_DO_RETRANSMISSION,
490 g_param_spec_boolean ("do-retransmission", "Do Retransmission",
491 "Send retransmission events upstream when a packet is late",
492 DEFAULT_DO_RETRANSMISSION,
493 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
496 * GstRtpJitterBuffer::rtx-delay:
498 * When a packet did not arrive at the expected time, wait this extra amount
499 * of time before sending a retransmission event.
501 * When -1 is used, the max jitter will be used as extra delay.
505 g_object_class_install_property (gobject_class, PROP_RTX_DELAY,
506 g_param_spec_int ("rtx-delay", "RTX Delay",
507 "Extra time in ms to wait before sending retransmission "
508 "event (-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_DELAY,
509 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
511 * GstRtpJitterBuffer::rtx-delay-reorder:
513 * Assume that a retransmission event should be sent when we see
514 * this much packet reordering.
516 * When -1 is used, the value will be estimated based on observed packet
521 g_object_class_install_property (gobject_class, PROP_RTX_DELAY_REORDER,
522 g_param_spec_int ("rtx-delay-reorder", "RTX Delay Reorder",
523 "Sending retransmission event when this much reordering (-1 automatic)",
524 -1, G_MAXINT, DEFAULT_RTX_DELAY_REORDER,
525 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
527 * GstRtpJitterBuffer::rtx-retry-timeout:
529 * When no packet has been received after sending a retransmission event
530 * for this time, retry sending a retransmission event.
532 * When -1 is used, the value will be estimated based on observed round
537 g_object_class_install_property (gobject_class, PROP_RTX_RETRY_TIMEOUT,
538 g_param_spec_int ("rtx-retry-timeout", "RTX Retry Timeout",
539 "Retry sending a transmission event after this timeout in "
540 "ms (-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_RETRY_TIMEOUT,
541 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
543 * GstRtpJitterBuffer::rtx-retry-period:
545 * The amount of time to try to get a retransmission.
547 * When -1 is used, the value will be estimated based on the jitterbuffer
548 * latency and the observed round trip time.
552 g_object_class_install_property (gobject_class, PROP_RTX_RETRY_PERIOD,
553 g_param_spec_int ("rtx-retry-period", "RTX Retry Period",
554 "Try to get a retransmission for this many ms "
555 "(-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_RETRY_PERIOD,
556 G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
559 * GstRtpJitterBuffer::request-pt-map:
560 * @buffer: the object which received the signal
563 * Request the payload type as #GstCaps for @pt.
565 gst_rtp_jitter_buffer_signals[SIGNAL_REQUEST_PT_MAP] =
566 g_signal_new ("request-pt-map", G_TYPE_FROM_CLASS (klass),
567 G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
568 request_pt_map), NULL, NULL, g_cclosure_marshal_generic,
569 GST_TYPE_CAPS, 1, G_TYPE_UINT);
571 * GstRtpJitterBuffer::handle-sync:
572 * @buffer: the object which received the signal
573 * @struct: a GstStructure containing sync values.
575 * Be notified of new sync values.
577 gst_rtp_jitter_buffer_signals[SIGNAL_HANDLE_SYNC] =
578 g_signal_new ("handle-sync", G_TYPE_FROM_CLASS (klass),
579 G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
580 handle_sync), NULL, NULL, g_cclosure_marshal_VOID__BOXED,
581 G_TYPE_NONE, 1, GST_TYPE_STRUCTURE | G_SIGNAL_TYPE_STATIC_SCOPE);
584 * GstRtpJitterBuffer::on-npt-stop
585 * @buffer: the object which received the signal
587 * Signal that the jitterbufer has pushed the RTP packet that corresponds to
588 * the npt-stop position.
590 gst_rtp_jitter_buffer_signals[SIGNAL_ON_NPT_STOP] =
591 g_signal_new ("on-npt-stop", G_TYPE_FROM_CLASS (klass),
592 G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
593 on_npt_stop), NULL, NULL, g_cclosure_marshal_VOID__VOID,
594 G_TYPE_NONE, 0, G_TYPE_NONE);
597 * GstRtpJitterBuffer::clear-pt-map:
598 * @buffer: the object which received the signal
600 * Invalidate the clock-rate as obtained with the
601 * #GstRtpJitterBuffer::request-pt-map signal.
603 gst_rtp_jitter_buffer_signals[SIGNAL_CLEAR_PT_MAP] =
604 g_signal_new ("clear-pt-map", G_TYPE_FROM_CLASS (klass),
605 G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
606 G_STRUCT_OFFSET (GstRtpJitterBufferClass, clear_pt_map), NULL, NULL,
607 g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
610 * GstRtpJitterBuffer::set-active:
611 * @buffer: the object which received the signal
613 * Start pushing out packets with the given base time. This signal is only
614 * useful in buffering mode.
616 * Returns: the time of the last pushed packet.
620 gst_rtp_jitter_buffer_signals[SIGNAL_SET_ACTIVE] =
621 g_signal_new ("set-active", G_TYPE_FROM_CLASS (klass),
622 G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
623 G_STRUCT_OFFSET (GstRtpJitterBufferClass, set_active), NULL, NULL,
624 g_cclosure_marshal_generic, G_TYPE_UINT64, 2, G_TYPE_BOOLEAN,
627 gstelement_class->change_state =
628 GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_change_state);
629 gstelement_class->request_new_pad =
630 GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_request_new_pad);
631 gstelement_class->release_pad =
632 GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_release_pad);
633 gstelement_class->provide_clock =
634 GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_provide_clock);
636 gst_element_class_add_pad_template (gstelement_class,
637 gst_static_pad_template_get (&gst_rtp_jitter_buffer_src_template));
638 gst_element_class_add_pad_template (gstelement_class,
639 gst_static_pad_template_get (&gst_rtp_jitter_buffer_sink_template));
640 gst_element_class_add_pad_template (gstelement_class,
641 gst_static_pad_template_get (&gst_rtp_jitter_buffer_sink_rtcp_template));
643 gst_element_class_set_static_metadata (gstelement_class,
644 "RTP packet jitter-buffer", "Filter/Network/RTP",
645 "A buffer that deals with network jitter and other transmission faults",
646 "Philippe Kalaf <philippe.kalaf@collabora.co.uk>, "
647 "Wim Taymans <wim.taymans@gmail.com>");
649 klass->clear_pt_map = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_clear_pt_map);
650 klass->set_active = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_set_active);
652 GST_DEBUG_CATEGORY_INIT
653 (rtpjitterbuffer_debug, "rtpjitterbuffer", 0, "RTP Jitter Buffer");
657 gst_rtp_jitter_buffer_init (GstRtpJitterBuffer * jitterbuffer)
659 GstRtpJitterBufferPrivate *priv;
661 priv = GST_RTP_JITTER_BUFFER_GET_PRIVATE (jitterbuffer);
662 jitterbuffer->priv = priv;
664 priv->latency_ms = DEFAULT_LATENCY_MS;
665 priv->latency_ns = priv->latency_ms * GST_MSECOND;
666 priv->drop_on_latency = DEFAULT_DROP_ON_LATENCY;
667 priv->do_lost = DEFAULT_DO_LOST;
668 priv->do_retransmission = DEFAULT_DO_RETRANSMISSION;
669 priv->rtx_delay = DEFAULT_RTX_DELAY;
670 priv->rtx_delay_reorder = DEFAULT_RTX_DELAY_REORDER;
671 priv->rtx_retry_timeout = DEFAULT_RTX_RETRY_TIMEOUT;
672 priv->rtx_retry_period = DEFAULT_RTX_RETRY_PERIOD;
674 priv->timers = g_array_new (FALSE, TRUE, sizeof (TimerData));
675 priv->jbuf = rtp_jitter_buffer_new ();
676 g_mutex_init (&priv->jbuf_lock);
677 g_cond_init (&priv->jbuf_timer);
678 g_cond_init (&priv->jbuf_event);
680 /* reset skew detection initialy */
681 rtp_jitter_buffer_reset_skew (priv->jbuf);
682 rtp_jitter_buffer_set_delay (priv->jbuf, priv->latency_ns);
683 rtp_jitter_buffer_set_buffering (priv->jbuf, FALSE);
687 gst_pad_new_from_static_template (&gst_rtp_jitter_buffer_src_template,
690 gst_pad_set_activatemode_function (priv->srcpad,
691 GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_activate_mode));
692 gst_pad_set_query_function (priv->srcpad,
693 GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_query));
694 gst_pad_set_event_function (priv->srcpad,
695 GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_src_event));
698 gst_pad_new_from_static_template (&gst_rtp_jitter_buffer_sink_template,
701 gst_pad_set_chain_function (priv->sinkpad,
702 GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_chain));
703 gst_pad_set_event_function (priv->sinkpad,
704 GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_sink_event));
705 gst_pad_set_query_function (priv->sinkpad,
706 GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_sink_query));
708 gst_element_add_pad (GST_ELEMENT (jitterbuffer), priv->srcpad);
709 gst_element_add_pad (GST_ELEMENT (jitterbuffer), priv->sinkpad);
711 GST_OBJECT_FLAG_SET (jitterbuffer, GST_ELEMENT_FLAG_PROVIDE_CLOCK);
714 #define ITEM_TYPE_BUFFER 0
715 #define ITEM_TYPE_LOST 1
717 static RTPJitterBufferItem *
718 alloc_item (gpointer data, guint type, GstClockTime dts, GstClockTime pts,
719 guint seqnum, guint count, guint rtptime)
721 RTPJitterBufferItem *item;
723 item = g_slice_new (RTPJitterBufferItem);
730 item->seqnum = seqnum;
732 item->rtptime = rtptime;
738 free_item (RTPJitterBufferItem * item)
741 gst_mini_object_unref (item->data);
742 g_slice_free (RTPJitterBufferItem, item);
746 gst_rtp_jitter_buffer_finalize (GObject * object)
748 GstRtpJitterBuffer *jitterbuffer;
749 GstRtpJitterBufferPrivate *priv;
751 jitterbuffer = GST_RTP_JITTER_BUFFER (object);
752 priv = jitterbuffer->priv;
754 g_array_free (priv->timers, TRUE);
755 g_mutex_clear (&priv->jbuf_lock);
756 g_cond_clear (&priv->jbuf_timer);
757 g_cond_clear (&priv->jbuf_event);
759 rtp_jitter_buffer_flush (priv->jbuf, (GFunc) free_item, NULL);
760 g_object_unref (priv->jbuf);
762 G_OBJECT_CLASS (parent_class)->finalize (object);
766 gst_rtp_jitter_buffer_iterate_internal_links (GstPad * pad, GstObject * parent)
768 GstRtpJitterBuffer *jitterbuffer;
769 GstPad *otherpad = NULL;
773 jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
775 if (pad == jitterbuffer->priv->sinkpad) {
776 otherpad = jitterbuffer->priv->srcpad;
777 } else if (pad == jitterbuffer->priv->srcpad) {
778 otherpad = jitterbuffer->priv->sinkpad;
779 } else if (pad == jitterbuffer->priv->rtcpsinkpad) {
783 g_value_init (&val, GST_TYPE_PAD);
784 g_value_set_object (&val, otherpad);
785 it = gst_iterator_new_single (GST_TYPE_PAD, &val);
786 g_value_unset (&val);
792 create_rtcp_sink (GstRtpJitterBuffer * jitterbuffer)
794 GstRtpJitterBufferPrivate *priv;
796 priv = jitterbuffer->priv;
798 GST_DEBUG_OBJECT (jitterbuffer, "creating RTCP sink pad");
801 gst_pad_new_from_static_template
802 (&gst_rtp_jitter_buffer_sink_rtcp_template, "sink_rtcp");
803 gst_pad_set_chain_function (priv->rtcpsinkpad,
804 gst_rtp_jitter_buffer_chain_rtcp);
805 gst_pad_set_event_function (priv->rtcpsinkpad,
806 (GstPadEventFunction) gst_rtp_jitter_buffer_sink_rtcp_event);
807 gst_pad_set_iterate_internal_links_function (priv->rtcpsinkpad,
808 gst_rtp_jitter_buffer_iterate_internal_links);
809 gst_pad_set_active (priv->rtcpsinkpad, TRUE);
810 gst_element_add_pad (GST_ELEMENT_CAST (jitterbuffer), priv->rtcpsinkpad);
812 return priv->rtcpsinkpad;
816 remove_rtcp_sink (GstRtpJitterBuffer * jitterbuffer)
818 GstRtpJitterBufferPrivate *priv;
820 priv = jitterbuffer->priv;
822 GST_DEBUG_OBJECT (jitterbuffer, "removing RTCP sink pad");
824 gst_pad_set_active (priv->rtcpsinkpad, FALSE);
826 gst_element_remove_pad (GST_ELEMENT_CAST (jitterbuffer), priv->rtcpsinkpad);
827 priv->rtcpsinkpad = NULL;
831 gst_rtp_jitter_buffer_request_new_pad (GstElement * element,
832 GstPadTemplate * templ, const gchar * name, const GstCaps * filter)
834 GstRtpJitterBuffer *jitterbuffer;
835 GstElementClass *klass;
837 GstRtpJitterBufferPrivate *priv;
839 g_return_val_if_fail (templ != NULL, NULL);
840 g_return_val_if_fail (GST_IS_RTP_JITTER_BUFFER (element), NULL);
842 jitterbuffer = GST_RTP_JITTER_BUFFER (element);
843 priv = jitterbuffer->priv;
844 klass = GST_ELEMENT_GET_CLASS (element);
846 GST_DEBUG_OBJECT (element, "requesting pad %s", GST_STR_NULL (name));
848 /* figure out the template */
849 if (templ == gst_element_class_get_pad_template (klass, "sink_rtcp")) {
850 if (priv->rtcpsinkpad != NULL)
853 result = create_rtcp_sink (jitterbuffer);
862 g_warning ("rtpjitterbuffer: this is not our template");
867 g_warning ("rtpjitterbuffer: pad already requested");
873 gst_rtp_jitter_buffer_release_pad (GstElement * element, GstPad * pad)
875 GstRtpJitterBuffer *jitterbuffer;
876 GstRtpJitterBufferPrivate *priv;
878 g_return_if_fail (GST_IS_RTP_JITTER_BUFFER (element));
879 g_return_if_fail (GST_IS_PAD (pad));
881 jitterbuffer = GST_RTP_JITTER_BUFFER (element);
882 priv = jitterbuffer->priv;
884 GST_DEBUG_OBJECT (element, "releasing pad %s:%s", GST_DEBUG_PAD_NAME (pad));
886 if (priv->rtcpsinkpad == pad) {
887 remove_rtcp_sink (jitterbuffer);
896 g_warning ("gstjitterbuffer: asked to release an unknown pad");
902 gst_rtp_jitter_buffer_provide_clock (GstElement * element)
904 return gst_system_clock_obtain ();
908 gst_rtp_jitter_buffer_clear_pt_map (GstRtpJitterBuffer * jitterbuffer)
910 GstRtpJitterBufferPrivate *priv;
912 priv = jitterbuffer->priv;
914 /* this will trigger a new pt-map request signal, FIXME, do something better. */
917 priv->clock_rate = -1;
918 /* do not clear current content, but refresh state for new arrival */
919 GST_DEBUG_OBJECT (jitterbuffer, "reset jitterbuffer");
920 rtp_jitter_buffer_reset_skew (priv->jbuf);
921 priv->last_popped_seqnum = -1;
922 priv->next_seqnum = -1;
927 gst_rtp_jitter_buffer_set_active (GstRtpJitterBuffer * jbuf, gboolean active,
930 GstRtpJitterBufferPrivate *priv;
931 GstClockTime last_out;
932 RTPJitterBufferItem *item;
937 GST_DEBUG_OBJECT (jbuf, "setting active %d with offset %" GST_TIME_FORMAT,
938 active, GST_TIME_ARGS (offset));
940 if (active != priv->active) {
941 /* add the amount of time spent in paused to the output offset. All
942 * outgoing buffers will have this offset applied to their timestamps in
943 * order to make them arrive in time in the sink. */
944 priv->out_offset = offset;
945 GST_DEBUG_OBJECT (jbuf, "out offset %" GST_TIME_FORMAT,
946 GST_TIME_ARGS (priv->out_offset));
947 priv->active = active;
948 JBUF_SIGNAL_EVENT (priv);
951 rtp_jitter_buffer_set_buffering (priv->jbuf, TRUE);
953 if ((item = rtp_jitter_buffer_peek (priv->jbuf))) {
954 /* head buffer timestamp and offset gives our output time */
955 last_out = item->dts + priv->ts_offset;
957 /* use last known time when the buffer is empty */
958 last_out = priv->last_out_time;
966 gst_rtp_jitter_buffer_getcaps (GstPad * pad, GstCaps * filter)
968 GstRtpJitterBuffer *jitterbuffer;
969 GstRtpJitterBufferPrivate *priv;
974 jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
975 priv = jitterbuffer->priv;
977 other = (pad == priv->srcpad ? priv->sinkpad : priv->srcpad);
979 caps = gst_pad_peer_query_caps (other, filter);
981 templ = gst_pad_get_pad_template_caps (pad);
983 GST_DEBUG_OBJECT (jitterbuffer, "use template");
988 GST_DEBUG_OBJECT (jitterbuffer, "intersect with template");
990 intersect = gst_caps_intersect (caps, templ);
991 gst_caps_unref (caps);
992 gst_caps_unref (templ);
996 gst_object_unref (jitterbuffer);
1002 * Must be called with JBUF_LOCK held
1006 gst_jitter_buffer_sink_parse_caps (GstRtpJitterBuffer * jitterbuffer,
1009 GstRtpJitterBufferPrivate *priv;
1010 GstStructure *caps_struct;
1014 priv = jitterbuffer->priv;
1016 /* first parse the caps */
1017 caps_struct = gst_caps_get_structure (caps, 0);
1019 GST_DEBUG_OBJECT (jitterbuffer, "got caps");
1021 /* we need a clock-rate to convert the rtp timestamps to GStreamer time and to
1022 * measure the amount of data in the buffer */
1023 if (!gst_structure_get_int (caps_struct, "clock-rate", &priv->clock_rate))
1026 if (priv->clock_rate <= 0)
1029 GST_DEBUG_OBJECT (jitterbuffer, "got clock-rate %d", priv->clock_rate);
1031 rtp_jitter_buffer_set_clock_rate (priv->jbuf, priv->clock_rate);
1033 /* The clock base is the RTP timestamp corrsponding to the npt-start value. We
1034 * can use this to track the amount of time elapsed on the sender. */
1035 if (gst_structure_get_uint (caps_struct, "clock-base", &val))
1036 priv->clock_base = val;
1038 priv->clock_base = -1;
1040 priv->ext_timestamp = priv->clock_base;
1042 GST_DEBUG_OBJECT (jitterbuffer, "got clock-base %" G_GINT64_FORMAT,
1045 if (gst_structure_get_uint (caps_struct, "seqnum-base", &val)) {
1046 /* first expected seqnum, only update when we didn't have a previous base. */
1047 if (priv->next_in_seqnum == -1)
1048 priv->next_in_seqnum = val;
1049 if (priv->next_seqnum == -1)
1050 priv->next_seqnum = val;
1053 GST_DEBUG_OBJECT (jitterbuffer, "got seqnum-base %d", priv->next_in_seqnum);
1055 /* the start and stop times. The seqnum-base corresponds to the start time. We
1056 * will keep track of the seqnums on the output and when we reach the one
1057 * corresponding to npt-stop, we emit the npt-stop-reached signal */
1058 if (gst_structure_get_clock_time (caps_struct, "npt-start", &tval))
1059 priv->npt_start = tval;
1061 priv->npt_start = 0;
1063 if (gst_structure_get_clock_time (caps_struct, "npt-stop", &tval))
1064 priv->npt_stop = tval;
1066 priv->npt_stop = -1;
1068 GST_DEBUG_OBJECT (jitterbuffer,
1069 "npt start/stop: %" GST_TIME_FORMAT "-%" GST_TIME_FORMAT,
1070 GST_TIME_ARGS (priv->npt_start), GST_TIME_ARGS (priv->npt_stop));
1077 GST_DEBUG_OBJECT (jitterbuffer, "No clock-rate in caps!");
1082 GST_DEBUG_OBJECT (jitterbuffer, "Invalid clock-rate %d", priv->clock_rate);
1088 gst_rtp_jitter_buffer_flush_start (GstRtpJitterBuffer * jitterbuffer)
1090 GstRtpJitterBufferPrivate *priv;
1092 priv = jitterbuffer->priv;
1095 /* mark ourselves as flushing */
1096 priv->srcresult = GST_FLOW_FLUSHING;
1097 GST_DEBUG_OBJECT (jitterbuffer, "Disabling pop on queue");
1098 /* this unblocks any waiting pops on the src pad task */
1099 JBUF_SIGNAL_EVENT (priv);
1104 gst_rtp_jitter_buffer_flush_stop (GstRtpJitterBuffer * jitterbuffer)
1106 GstRtpJitterBufferPrivate *priv;
1108 priv = jitterbuffer->priv;
1111 GST_DEBUG_OBJECT (jitterbuffer, "Enabling pop on queue");
1112 /* Mark as non flushing */
1113 priv->srcresult = GST_FLOW_OK;
1114 gst_segment_init (&priv->segment, GST_FORMAT_TIME);
1115 priv->last_popped_seqnum = -1;
1116 priv->last_out_time = -1;
1117 priv->next_seqnum = -1;
1118 priv->ips_rtptime = -1;
1119 priv->ips_dts = GST_CLOCK_TIME_NONE;
1120 priv->packet_spacing = 0;
1121 priv->next_in_seqnum = -1;
1122 priv->clock_rate = -1;
1124 priv->estimated_eos = -1;
1125 priv->last_elapsed = 0;
1126 priv->ext_timestamp = -1;
1127 GST_DEBUG_OBJECT (jitterbuffer, "flush and reset jitterbuffer");
1128 rtp_jitter_buffer_flush (priv->jbuf, (GFunc) free_item, NULL);
1129 rtp_jitter_buffer_reset_skew (priv->jbuf);
1130 remove_all_timers (jitterbuffer);
1135 gst_rtp_jitter_buffer_src_activate_mode (GstPad * pad, GstObject * parent,
1136 GstPadMode mode, gboolean active)
1139 GstRtpJitterBuffer *jitterbuffer = NULL;
1141 jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1144 case GST_PAD_MODE_PUSH:
1146 /* allow data processing */
1147 gst_rtp_jitter_buffer_flush_stop (jitterbuffer);
1149 /* start pushing out buffers */
1150 GST_DEBUG_OBJECT (jitterbuffer, "Starting task on srcpad");
1151 result = gst_pad_start_task (jitterbuffer->priv->srcpad,
1152 (GstTaskFunction) gst_rtp_jitter_buffer_loop, jitterbuffer, NULL);
1154 /* make sure all data processing stops ASAP */
1155 gst_rtp_jitter_buffer_flush_start (jitterbuffer);
1157 /* NOTE this will hardlock if the state change is called from the src pad
1158 * task thread because we will _join() the thread. */
1159 GST_DEBUG_OBJECT (jitterbuffer, "Stopping task on srcpad");
1160 result = gst_pad_stop_task (pad);
1170 static GstStateChangeReturn
1171 gst_rtp_jitter_buffer_change_state (GstElement * element,
1172 GstStateChange transition)
1174 GstRtpJitterBuffer *jitterbuffer;
1175 GstRtpJitterBufferPrivate *priv;
1176 GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
1178 jitterbuffer = GST_RTP_JITTER_BUFFER (element);
1179 priv = jitterbuffer->priv;
1181 switch (transition) {
1182 case GST_STATE_CHANGE_NULL_TO_READY:
1184 case GST_STATE_CHANGE_READY_TO_PAUSED:
1186 /* reset negotiated values */
1187 priv->clock_rate = -1;
1188 priv->clock_base = -1;
1189 priv->peer_latency = 0;
1191 /* block until we go to PLAYING */
1192 priv->blocked = TRUE;
1193 priv->timer_running = TRUE;
1194 priv->timer_thread =
1195 g_thread_new ("timer", (GThreadFunc) wait_next_timeout, jitterbuffer);
1198 case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
1200 /* unblock to allow streaming in PLAYING */
1201 priv->blocked = FALSE;
1202 JBUF_SIGNAL_EVENT (priv);
1203 JBUF_SIGNAL_TIMER (priv);
1210 ret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
1212 switch (transition) {
1213 case GST_STATE_CHANGE_READY_TO_PAUSED:
1214 /* we are a live element because we sync to the clock, which we can only
1215 * do in the PLAYING state */
1216 if (ret != GST_STATE_CHANGE_FAILURE)
1217 ret = GST_STATE_CHANGE_NO_PREROLL;
1219 case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
1221 /* block to stop streaming when PAUSED */
1222 priv->blocked = TRUE;
1223 unschedule_current_timer (jitterbuffer);
1225 if (ret != GST_STATE_CHANGE_FAILURE)
1226 ret = GST_STATE_CHANGE_NO_PREROLL;
1228 case GST_STATE_CHANGE_PAUSED_TO_READY:
1230 gst_buffer_replace (&priv->last_sr, NULL);
1231 priv->timer_running = FALSE;
1232 unschedule_current_timer (jitterbuffer);
1233 JBUF_SIGNAL_TIMER (priv);
1235 g_thread_join (priv->timer_thread);
1236 priv->timer_thread = NULL;
1238 case GST_STATE_CHANGE_READY_TO_NULL:
1248 gst_rtp_jitter_buffer_src_event (GstPad * pad, GstObject * parent,
1251 gboolean ret = TRUE;
1252 GstRtpJitterBuffer *jitterbuffer;
1253 GstRtpJitterBufferPrivate *priv;
1255 jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1256 priv = jitterbuffer->priv;
1258 GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
1260 switch (GST_EVENT_TYPE (event)) {
1261 case GST_EVENT_LATENCY:
1263 GstClockTime latency;
1265 gst_event_parse_latency (event, &latency);
1267 GST_DEBUG_OBJECT (jitterbuffer,
1268 "configuring latency of %" GST_TIME_FORMAT, GST_TIME_ARGS (latency));
1271 /* adjust the overall buffer delay to the total pipeline latency in
1272 * buffering mode because if downstream consumes too fast (because of
1273 * large latency or queues, we would start rebuffering again. */
1274 if (rtp_jitter_buffer_get_mode (priv->jbuf) ==
1275 RTP_JITTER_BUFFER_MODE_BUFFER) {
1276 rtp_jitter_buffer_set_delay (priv->jbuf, latency);
1280 ret = gst_pad_push_event (priv->sinkpad, event);
1284 ret = gst_pad_push_event (priv->sinkpad, event);
1292 gst_rtp_jitter_buffer_sink_event (GstPad * pad, GstObject * parent,
1295 gboolean ret = TRUE;
1296 GstRtpJitterBuffer *jitterbuffer;
1297 GstRtpJitterBufferPrivate *priv;
1299 jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1300 priv = jitterbuffer->priv;
1302 GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
1304 switch (GST_EVENT_TYPE (event)) {
1305 case GST_EVENT_CAPS:
1309 gst_event_parse_caps (event, &caps);
1312 ret = gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps);
1315 /* set same caps on srcpad on success */
1317 ret = gst_pad_push_event (priv->srcpad, event);
1319 gst_event_unref (event);
1322 case GST_EVENT_SEGMENT:
1324 gst_event_copy_segment (event, &priv->segment);
1326 /* we need time for now */
1327 if (priv->segment.format != GST_FORMAT_TIME)
1328 goto newseg_wrong_format;
1330 GST_DEBUG_OBJECT (jitterbuffer,
1331 "newsegment: %" GST_SEGMENT_FORMAT, &priv->segment);
1333 /* FIXME, push SEGMENT in the queue. Sorting order might be difficult. */
1334 ret = gst_pad_push_event (priv->srcpad, event);
1337 case GST_EVENT_FLUSH_START:
1338 ret = gst_pad_push_event (priv->srcpad, event);
1339 gst_rtp_jitter_buffer_flush_start (jitterbuffer);
1340 /* wait for the loop to go into PAUSED */
1341 gst_pad_pause_task (priv->srcpad);
1343 case GST_EVENT_FLUSH_STOP:
1344 ret = gst_pad_push_event (priv->srcpad, event);
1346 gst_rtp_jitter_buffer_src_activate_mode (priv->srcpad, parent,
1347 GST_PAD_MODE_PUSH, TRUE);
1351 /* push EOS in queue. We always push it at the head */
1353 /* check for flushing, we need to discard the event and return FALSE when
1354 * we are flushing */
1355 ret = priv->srcresult == GST_FLOW_OK;
1356 if (ret && !priv->eos) {
1357 GST_INFO_OBJECT (jitterbuffer, "queuing EOS");
1359 JBUF_SIGNAL_EVENT (priv);
1360 } else if (priv->eos) {
1361 GST_DEBUG_OBJECT (jitterbuffer, "dropping EOS, we are already EOS");
1363 GST_DEBUG_OBJECT (jitterbuffer, "dropping EOS, reason %s",
1364 gst_flow_get_name (priv->srcresult));
1367 gst_event_unref (event);
1371 ret = gst_pad_push_event (priv->srcpad, event);
1380 newseg_wrong_format:
1382 GST_DEBUG_OBJECT (jitterbuffer, "received non TIME newsegment");
1384 gst_event_unref (event);
1390 gst_rtp_jitter_buffer_sink_rtcp_event (GstPad * pad, GstObject * parent,
1393 gboolean ret = TRUE;
1394 GstRtpJitterBuffer *jitterbuffer;
1396 jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1398 GST_DEBUG_OBJECT (jitterbuffer, "received %s", GST_EVENT_TYPE_NAME (event));
1400 switch (GST_EVENT_TYPE (event)) {
1401 case GST_EVENT_FLUSH_START:
1402 gst_event_unref (event);
1404 case GST_EVENT_FLUSH_STOP:
1405 gst_event_unref (event);
1408 ret = gst_pad_event_default (pad, parent, event);
1416 * Must be called with JBUF_LOCK held, will release the LOCK when emiting the
1417 * signal. The function returns GST_FLOW_ERROR when a parsing error happened and
1418 * GST_FLOW_FLUSHING when the element is shutting down. On success
1419 * GST_FLOW_OK is returned.
1421 static GstFlowReturn
1422 gst_rtp_jitter_buffer_get_clock_rate (GstRtpJitterBuffer * jitterbuffer,
1426 GValue args[2] = { {0}, {0} };
1430 g_value_init (&args[0], GST_TYPE_ELEMENT);
1431 g_value_set_object (&args[0], jitterbuffer);
1432 g_value_init (&args[1], G_TYPE_UINT);
1433 g_value_set_uint (&args[1], pt);
1435 g_value_init (&ret, GST_TYPE_CAPS);
1436 g_value_set_boxed (&ret, NULL);
1438 JBUF_UNLOCK (jitterbuffer->priv);
1439 g_signal_emitv (args, gst_rtp_jitter_buffer_signals[SIGNAL_REQUEST_PT_MAP], 0,
1441 JBUF_LOCK_CHECK (jitterbuffer->priv, out_flushing);
1443 g_value_unset (&args[0]);
1444 g_value_unset (&args[1]);
1445 caps = (GstCaps *) g_value_dup_boxed (&ret);
1446 g_value_unset (&ret);
1450 res = gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps);
1451 gst_caps_unref (caps);
1453 if (G_UNLIKELY (!res))
1461 GST_DEBUG_OBJECT (jitterbuffer, "could not get caps");
1462 return GST_FLOW_ERROR;
1466 GST_DEBUG_OBJECT (jitterbuffer, "we are flushing");
1467 return GST_FLOW_FLUSHING;
1471 GST_DEBUG_OBJECT (jitterbuffer, "parse failed");
1472 return GST_FLOW_ERROR;
1476 /* call with jbuf lock held */
1478 check_buffering_percent (GstRtpJitterBuffer * jitterbuffer, gint * percent)
1480 GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1482 /* too short a stream, or too close to EOS will never really fill buffer */
1483 if (*percent != -1 && priv->npt_stop != -1 &&
1484 priv->npt_stop - priv->npt_start <=
1485 rtp_jitter_buffer_get_delay (priv->jbuf)) {
1486 GST_DEBUG_OBJECT (jitterbuffer, "short stream; faking full buffer");
1487 rtp_jitter_buffer_set_buffering (priv->jbuf, FALSE);
1493 post_buffering_percent (GstRtpJitterBuffer * jitterbuffer, gint percent)
1495 GstMessage *message;
1497 /* Post a buffering message */
1498 message = gst_message_new_buffering (GST_OBJECT_CAST (jitterbuffer), percent);
1499 gst_message_set_buffering_stats (message, GST_BUFFERING_LIVE, -1, -1, -1);
1501 gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer), message);
1505 apply_offset (GstRtpJitterBuffer * jitterbuffer, GstClockTime timestamp)
1507 GstRtpJitterBufferPrivate *priv;
1509 priv = jitterbuffer->priv;
1511 if (timestamp == -1)
1514 /* apply the timestamp offset, this is used for inter stream sync */
1515 timestamp += priv->ts_offset;
1516 /* add the offset, this is used when buffering */
1517 timestamp += priv->out_offset;
1523 find_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type, guint16 seqnum)
1525 GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1526 TimerData *timer = NULL;
1529 len = priv->timers->len;
1530 for (i = 0; i < len; i++) {
1531 TimerData *test = &g_array_index (priv->timers, TimerData, i);
1532 if (test->seqnum == seqnum && test->type == type) {
1541 unschedule_current_timer (GstRtpJitterBuffer * jitterbuffer)
1543 GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1545 if (priv->clock_id) {
1546 GST_DEBUG_OBJECT (jitterbuffer, "unschedule current timer");
1547 gst_clock_id_unschedule (priv->clock_id);
1548 priv->clock_id = NULL;
1553 get_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
1555 GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1556 GstClockTime test_timeout;
1558 if ((test_timeout = timer->timeout) == -1)
1561 if (timer->type != TIMER_TYPE_EXPECTED) {
1562 /* add our latency and offset to get output times. */
1563 test_timeout = apply_offset (jitterbuffer, test_timeout);
1564 test_timeout += priv->latency_ns;
1566 return test_timeout;
1570 recalculate_timer (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
1572 GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1574 if (priv->clock_id) {
1575 GstClockTime timeout = get_timeout (jitterbuffer, timer);
1577 GST_DEBUG ("%" GST_TIME_FORMAT " <> %" GST_TIME_FORMAT,
1578 GST_TIME_ARGS (timeout), GST_TIME_ARGS (priv->timer_timeout));
1580 if (timeout == -1 || timeout < priv->timer_timeout)
1581 unschedule_current_timer (jitterbuffer);
1586 add_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type,
1587 guint16 seqnum, guint num, GstClockTime timeout, GstClockTime delay,
1588 GstClockTime duration)
1590 GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1594 GST_DEBUG_OBJECT (jitterbuffer,
1595 "add timer for seqnum %d to %" GST_TIME_FORMAT ", delay %"
1596 GST_TIME_FORMAT, seqnum, GST_TIME_ARGS (timeout), GST_TIME_ARGS (delay));
1598 len = priv->timers->len;
1599 g_array_set_size (priv->timers, len + 1);
1600 timer = &g_array_index (priv->timers, TimerData, len);
1603 timer->seqnum = seqnum;
1605 timer->timeout = timeout + delay;
1606 timer->duration = duration;
1607 if (type == TIMER_TYPE_EXPECTED) {
1608 timer->rtx_base = timeout;
1609 timer->rtx_delay = delay;
1610 timer->rtx_retry = 0;
1612 timer->num_rtx_retry = 0;
1613 recalculate_timer (jitterbuffer, timer);
1614 JBUF_SIGNAL_TIMER (priv);
1620 reschedule_timer (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
1621 guint16 seqnum, GstClockTime timeout, GstClockTime delay, gboolean reset)
1623 GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1624 gboolean seqchange, timechange;
1627 seqchange = timer->seqnum != seqnum;
1628 timechange = timer->timeout != timeout;
1630 if (!seqchange && !timechange)
1633 oldseq = timer->seqnum;
1635 GST_DEBUG_OBJECT (jitterbuffer,
1636 "replace timer for seqnum %d->%d to %" GST_TIME_FORMAT,
1637 oldseq, seqnum, GST_TIME_ARGS (timeout + delay));
1639 timer->timeout = timeout + delay;
1640 timer->seqnum = seqnum;
1642 timer->rtx_base = timeout;
1643 timer->rtx_delay = delay;
1644 timer->rtx_retry = 0;
1647 if (priv->clock_id) {
1648 /* we changed the seqnum and there is a timer currently waiting with this
1649 * seqnum, unschedule it */
1650 if (seqchange && priv->timer_seqnum == oldseq)
1651 unschedule_current_timer (jitterbuffer);
1652 /* we changed the time, check if it is earlier than what we are waiting
1653 * for and unschedule if so */
1654 else if (timechange)
1655 recalculate_timer (jitterbuffer, timer);
1660 set_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type,
1661 guint16 seqnum, GstClockTime timeout)
1665 /* find the seqnum timer */
1666 timer = find_timer (jitterbuffer, type, seqnum);
1667 if (timer == NULL) {
1668 timer = add_timer (jitterbuffer, type, seqnum, 0, timeout, 0, -1);
1670 reschedule_timer (jitterbuffer, timer, seqnum, timeout, 0, FALSE);
1676 remove_timer (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
1678 GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1681 if (priv->clock_id && priv->timer_seqnum == timer->seqnum)
1682 unschedule_current_timer (jitterbuffer);
1685 GST_DEBUG_OBJECT (jitterbuffer, "removed index %d", idx);
1686 g_array_remove_index_fast (priv->timers, idx);
1691 remove_all_timers (GstRtpJitterBuffer * jitterbuffer)
1693 GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1694 GST_DEBUG_OBJECT (jitterbuffer, "removed all timers");
1695 g_array_set_size (priv->timers, 0);
1696 unschedule_current_timer (jitterbuffer);
1699 /* we just received a packet with seqnum and dts.
1701 * First check for old seqnum that we are still expecting. If the gap with the
1702 * current seqnum is too big, unschedule the timeouts.
1704 * If we have a valid packet spacing estimate we can set a timer for when we
1705 * should receive the next packet.
1706 * If we don't have a valid estimate, we remove any timer we might have
1707 * had for this packet.
1710 update_timers (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum,
1711 GstClockTime dts, gboolean do_next_seqnum)
1713 GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1714 TimerData *timer = NULL;
1717 /* go through all timers and unschedule the ones with a large gap, also find
1718 * the timer for the seqnum */
1719 len = priv->timers->len;
1720 for (i = 0; i < len; i++) {
1721 TimerData *test = &g_array_index (priv->timers, TimerData, i);
1724 gap = gst_rtp_buffer_compare_seqnum (test->seqnum, seqnum);
1726 GST_DEBUG_OBJECT (jitterbuffer, "%d, #%d<->#%d gap %d", i,
1727 test->seqnum, seqnum, gap);
1730 GST_DEBUG ("found timer for current seqnum");
1731 /* the timer for the current seqnum */
1733 } else if (gap > priv->rtx_delay_reorder) {
1734 /* max gap, we exceeded the max reorder distance and we don't expect the
1735 * missing packet to be this reordered */
1736 if (test->num_rtx_retry == 0 && test->type == TIMER_TYPE_EXPECTED)
1737 reschedule_timer (jitterbuffer, test, test->seqnum, -1, 0, FALSE);
1741 if (priv->packet_spacing > 0 && do_next_seqnum && priv->do_retransmission) {
1742 GstClockTime expected, delay;
1744 /* calculate expected arrival time of the next seqnum */
1745 expected = dts + priv->packet_spacing;
1746 delay = priv->rtx_delay * GST_MSECOND;
1748 /* and update/install timer for next seqnum */
1750 reschedule_timer (jitterbuffer, timer, priv->next_in_seqnum, expected,
1753 add_timer (jitterbuffer, TIMER_TYPE_EXPECTED, priv->next_in_seqnum, 0,
1754 expected, delay, priv->packet_spacing);
1755 } else if (timer && timer->type != TIMER_TYPE_DEADLINE) {
1757 if (timer->num_rtx_retry > 0) {
1758 GstClockTime rtx_last;
1760 /* we scheduled a retry for this packet and now we have it */
1761 priv->num_rtx_success++;
1762 /* all the previous retry attempts failed */
1763 priv->num_rtx_failed += timer->num_rtx_retry - 1;
1764 /* number of retries before receiving the packet */
1765 if (priv->avg_rtx_num == 0.0)
1766 priv->avg_rtx_num = timer->num_rtx_retry;
1768 priv->avg_rtx_num = (timer->num_rtx_retry + 7 * priv->avg_rtx_num) / 8;
1769 /* calculate the delay between retransmission request and receiving this
1770 * packet, start with when we scheduled this timeout last */
1771 rtx_last = timer->rtx_last;
1772 if (dts > rtx_last) {
1774 /* we have a valid delay if this packet arrived after we scheduled the
1776 delay = dts - rtx_last;
1777 if (priv->avg_rtx_rtt == 0)
1778 priv->avg_rtx_rtt = delay;
1780 priv->avg_rtx_rtt = (delay + 7 * priv->avg_rtx_rtt) / 8;
1782 GST_LOG_OBJECT (jitterbuffer,
1783 "RTX success %" G_GUINT64_FORMAT ", failed %" G_GUINT64_FORMAT
1784 ", requests %" G_GUINT64_FORMAT ", dups %" G_GUINT64_FORMAT
1785 ", avg-num %g, avg-rtt %" G_GUINT64_FORMAT, priv->num_rtx_success,
1786 priv->num_rtx_failed, priv->num_rtx_requests, priv->num_duplicates,
1787 priv->avg_rtx_num, priv->avg_rtx_rtt);
1789 /* if we had a timer, remove it, we don't know when to expect the next
1791 remove_timer (jitterbuffer, timer);
1792 /* we signal the _loop function because this new packet could be the one
1793 * it was waiting for */
1794 JBUF_SIGNAL_EVENT (priv);
1799 calculate_packet_spacing (GstRtpJitterBuffer * jitterbuffer, guint32 rtptime,
1802 GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1804 /* we need consecutive seqnums with a different
1805 * rtptime to estimate the packet spacing. */
1806 if (priv->ips_rtptime != rtptime) {
1807 /* rtptime changed, check dts diff */
1808 if (priv->ips_dts != -1 && dts != -1 && dts > priv->ips_dts) {
1809 priv->packet_spacing = dts - priv->ips_dts;
1810 GST_DEBUG_OBJECT (jitterbuffer,
1811 "new packet spacing %" GST_TIME_FORMAT,
1812 GST_TIME_ARGS (priv->packet_spacing));
1814 priv->ips_rtptime = rtptime;
1815 priv->ips_dts = dts;
1820 calculate_expected (GstRtpJitterBuffer * jitterbuffer, guint32 expected,
1821 guint16 seqnum, GstClockTime dts, gint gap)
1823 GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
1824 GstClockTime total_duration, duration, expected_dts;
1827 GST_DEBUG_OBJECT (jitterbuffer,
1828 "dts %" GST_TIME_FORMAT ", last %" GST_TIME_FORMAT,
1829 GST_TIME_ARGS (dts), GST_TIME_ARGS (priv->last_in_dts));
1831 /* the total duration spanned by the missing packets */
1832 if (dts >= priv->last_in_dts)
1833 total_duration = dts - priv->last_in_dts;
1837 /* interpolate between the current time and the last time based on
1838 * number of packets we are missing, this is the estimated duration
1839 * for the missing packet based on equidistant packet spacing. */
1840 duration = total_duration / (gap + 1);
1842 GST_DEBUG_OBJECT (jitterbuffer, "duration %" GST_TIME_FORMAT,
1843 GST_TIME_ARGS (duration));
1845 if (total_duration > priv->latency_ns) {
1846 GstClockTime gap_time;
1849 gap_time = total_duration - priv->latency_ns;
1852 lost_packets = gap_time / duration;
1853 gap_time = lost_packets * duration;
1858 /* too many lost packets, some of the missing packets are already
1859 * too late and we can generate lost packet events for them. */
1860 GST_DEBUG_OBJECT (jitterbuffer, "too many lost packets %" GST_TIME_FORMAT
1861 " > %" GST_TIME_FORMAT ", consider %u lost",
1862 GST_TIME_ARGS (total_duration), GST_TIME_ARGS (priv->latency_ns),
1865 /* this timer will fire immediately and the lost event will be pushed from
1866 * the timer thread */
1867 add_timer (jitterbuffer, TIMER_TYPE_LOST, expected, lost_packets,
1868 priv->last_in_dts + duration, 0, gap_time);
1870 expected += lost_packets;
1871 priv->last_in_dts += gap_time;
1874 expected_dts = priv->last_in_dts + duration;
1876 if (priv->do_retransmission) {
1879 type = TIMER_TYPE_EXPECTED;
1880 /* if we had a timer for the first missing packet, update it. */
1881 if ((timer = find_timer (jitterbuffer, type, expected))) {
1882 GstClockTime timeout = timer->timeout;
1884 timer->duration = duration;
1885 if (timeout > expected_dts) {
1886 GstClockTime delay = timeout - expected_dts - timer->rtx_retry;
1887 reschedule_timer (jitterbuffer, timer, timer->seqnum, expected_dts,
1891 expected_dts += duration;
1894 type = TIMER_TYPE_LOST;
1897 while (expected < seqnum) {
1898 add_timer (jitterbuffer, type, expected, 0, expected_dts, 0, duration);
1899 expected_dts += duration;
1904 static GstFlowReturn
1905 gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent,
1908 GstRtpJitterBuffer *jitterbuffer;
1909 GstRtpJitterBufferPrivate *priv;
1911 guint32 expected, rtptime;
1912 GstFlowReturn ret = GST_FLOW_OK;
1913 GstClockTime dts, pts;
1918 GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
1919 gboolean do_next_seqnum = FALSE;
1920 RTPJitterBufferItem *item;
1922 jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
1924 priv = jitterbuffer->priv;
1926 if (G_UNLIKELY (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp)))
1927 goto invalid_buffer;
1929 pt = gst_rtp_buffer_get_payload_type (&rtp);
1930 seqnum = gst_rtp_buffer_get_seq (&rtp);
1931 rtptime = gst_rtp_buffer_get_timestamp (&rtp);
1932 gst_rtp_buffer_unmap (&rtp);
1934 /* make sure we have PTS and DTS set */
1935 pts = GST_BUFFER_PTS (buffer);
1936 dts = GST_BUFFER_DTS (buffer);
1942 /* take the DTS of the buffer. This is the time when the packet was
1943 * received and is used to calculate jitter and clock skew. We will adjust
1944 * this DTS with the smoothed value after processing it in the
1945 * jitterbuffer and assign it as the PTS. */
1946 /* bring to running time */
1947 dts = gst_segment_to_running_time (&priv->segment, GST_FORMAT_TIME, dts);
1949 GST_DEBUG_OBJECT (jitterbuffer,
1950 "Received packet #%d at time %" GST_TIME_FORMAT ", discont %d", seqnum,
1951 GST_TIME_ARGS (dts), GST_BUFFER_IS_DISCONT (buffer));
1953 JBUF_LOCK_CHECK (priv, out_flushing);
1955 if (G_UNLIKELY (priv->last_pt != pt)) {
1958 GST_DEBUG_OBJECT (jitterbuffer, "pt changed from %u to %u", priv->last_pt,
1962 /* reset clock-rate so that we get a new one */
1963 priv->clock_rate = -1;
1965 /* Try to get the clock-rate from the caps first if we can. If there are no
1966 * caps we must fire the signal to get the clock-rate. */
1967 if ((caps = gst_pad_get_current_caps (pad))) {
1968 gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps);
1969 gst_caps_unref (caps);
1973 if (G_UNLIKELY (priv->clock_rate == -1)) {
1974 /* no clock rate given on the caps, try to get one with the signal */
1975 if (gst_rtp_jitter_buffer_get_clock_rate (jitterbuffer,
1976 pt) == GST_FLOW_FLUSHING)
1979 if (G_UNLIKELY (priv->clock_rate == -1))
1983 /* don't accept more data on EOS */
1984 if (G_UNLIKELY (priv->eos))
1987 expected = priv->next_in_seqnum;
1989 /* now check against our expected seqnum */
1990 if (G_LIKELY (expected != -1)) {
1993 /* now calculate gap */
1994 gap = gst_rtp_buffer_compare_seqnum (expected, seqnum);
1996 GST_DEBUG_OBJECT (jitterbuffer, "expected #%d, got #%d, gap of %d",
1997 expected, seqnum, gap);
1999 if (G_LIKELY (gap == 0)) {
2000 /* packet is expected */
2001 calculate_packet_spacing (jitterbuffer, rtptime, dts);
2002 do_next_seqnum = TRUE;
2004 gboolean reset = FALSE;
2007 /* we received an old packet */
2008 if (G_UNLIKELY (gap < -RTP_MAX_MISORDER)) {
2009 /* too old packet, reset */
2010 GST_DEBUG_OBJECT (jitterbuffer, "reset: buffer too old %d < %d", gap,
2014 GST_DEBUG_OBJECT (jitterbuffer, "old packet received");
2017 /* new packet, we are missing some packets */
2018 if (G_UNLIKELY (gap > RTP_MAX_DROPOUT)) {
2019 /* packet too far in future, reset */
2020 GST_DEBUG_OBJECT (jitterbuffer, "reset: buffer too new %d > %d", gap,
2024 GST_DEBUG_OBJECT (jitterbuffer, "%d missing packets", gap);
2025 /* fill in the gap with EXPECTED timers */
2026 calculate_expected (jitterbuffer, expected, seqnum, dts, gap);
2028 do_next_seqnum = TRUE;
2031 if (G_UNLIKELY (reset)) {
2032 GST_DEBUG_OBJECT (jitterbuffer, "flush and reset jitterbuffer");
2033 rtp_jitter_buffer_flush (priv->jbuf, (GFunc) free_item, NULL);
2034 rtp_jitter_buffer_reset_skew (priv->jbuf);
2035 remove_all_timers (jitterbuffer);
2036 priv->last_popped_seqnum = -1;
2037 priv->next_seqnum = seqnum;
2038 do_next_seqnum = TRUE;
2040 /* reset spacing estimation when gap */
2041 priv->ips_rtptime = -1;
2042 priv->ips_dts = GST_CLOCK_TIME_NONE;
2045 GST_DEBUG_OBJECT (jitterbuffer, "First buffer #%d", seqnum);
2046 /* we don't know what the next_in_seqnum should be, wait for the last
2047 * possible moment to push this buffer, maybe we get an earlier seqnum
2049 set_timer (jitterbuffer, TIMER_TYPE_DEADLINE, seqnum, dts);
2050 do_next_seqnum = TRUE;
2051 /* take rtptime and dts to calculate packet spacing */
2052 priv->ips_rtptime = rtptime;
2053 priv->ips_dts = dts;
2055 if (do_next_seqnum) {
2056 priv->last_in_seqnum = seqnum;
2057 priv->last_in_dts = dts;
2058 priv->next_in_seqnum = (seqnum + 1) & 0xffff;
2061 /* let's check if this buffer is too late, we can only accept packets with
2062 * bigger seqnum than the one we last pushed. */
2063 if (G_LIKELY (priv->last_popped_seqnum != -1)) {
2066 gap = gst_rtp_buffer_compare_seqnum (priv->last_popped_seqnum, seqnum);
2068 /* priv->last_popped_seqnum >= seqnum, we're too late. */
2069 if (G_UNLIKELY (gap <= 0))
2073 /* let's drop oldest packet if the queue is already full and drop-on-latency
2074 * is set. We can only do this when there actually is a latency. When no
2075 * latency is set, we just pump it in the queue and let the other end push it
2076 * out as fast as possible. */
2077 if (priv->latency_ms && priv->drop_on_latency) {
2079 gst_util_uint64_scale_int (priv->latency_ms, priv->clock_rate, 1000);
2081 if (G_UNLIKELY (rtp_jitter_buffer_get_ts_diff (priv->jbuf) >= latency_ts)) {
2082 RTPJitterBufferItem *old_item;
2084 old_item = rtp_jitter_buffer_pop (priv->jbuf, &percent);
2085 GST_DEBUG_OBJECT (jitterbuffer, "Queue full, dropping old packet %p",
2087 priv->next_seqnum = (old_item->seqnum + 1) & 0xffff;
2088 free_item (old_item);
2092 item = alloc_item (buffer, ITEM_TYPE_BUFFER, dts, pts, seqnum, 1, rtptime);
2094 /* now insert the packet into the queue in sorted order. This function returns
2095 * FALSE if a packet with the same seqnum was already in the queue, meaning we
2096 * have a duplicate. */
2097 if (G_UNLIKELY (!rtp_jitter_buffer_insert (priv->jbuf, item,
2102 update_timers (jitterbuffer, seqnum, dts, do_next_seqnum);
2104 /* we had an unhandled SR, handle it now */
2106 do_handle_sync (jitterbuffer);
2108 /* signal addition of new buffer when the _loop is waiting. */
2109 if (priv->active && priv->waiting_timer)
2110 JBUF_SIGNAL_EVENT (priv);
2112 /* let's unschedule and unblock any waiting buffers. We only want to do this
2113 * when the tail buffer changed */
2114 if (G_UNLIKELY (priv->clock_id && tail)) {
2115 GST_DEBUG_OBJECT (jitterbuffer, "Unscheduling waiting new buffer");
2116 unschedule_current_timer (jitterbuffer);
2119 GST_DEBUG_OBJECT (jitterbuffer, "Pushed packet #%d, now %d packets, tail: %d",
2120 seqnum, rtp_jitter_buffer_num_packets (priv->jbuf), tail);
2122 check_buffering_percent (jitterbuffer, &percent);
2128 post_buffering_percent (jitterbuffer, percent);
2135 /* this is not fatal but should be filtered earlier */
2136 GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
2137 ("Received invalid RTP payload, dropping"));
2138 gst_buffer_unref (buffer);
2143 GST_WARNING_OBJECT (jitterbuffer,
2144 "No clock-rate in caps!, dropping buffer");
2145 gst_buffer_unref (buffer);
2150 ret = priv->srcresult;
2151 GST_DEBUG_OBJECT (jitterbuffer, "flushing %s", gst_flow_get_name (ret));
2152 gst_buffer_unref (buffer);
2158 GST_WARNING_OBJECT (jitterbuffer, "we are EOS, refusing buffer");
2159 gst_buffer_unref (buffer);
2164 GST_WARNING_OBJECT (jitterbuffer, "Packet #%d too late as #%d was already"
2165 " popped, dropping", seqnum, priv->last_popped_seqnum);
2167 gst_buffer_unref (buffer);
2172 GST_WARNING_OBJECT (jitterbuffer, "Duplicate packet #%d detected, dropping",
2174 priv->num_duplicates++;
2181 compute_elapsed (GstRtpJitterBuffer * jitterbuffer, RTPJitterBufferItem * item)
2183 guint64 ext_time, elapsed;
2185 GstRtpJitterBufferPrivate *priv;
2187 priv = jitterbuffer->priv;
2188 rtp_time = item->rtptime;
2190 GST_LOG_OBJECT (jitterbuffer, "rtp %" G_GUINT32_FORMAT ", ext %"
2191 G_GUINT64_FORMAT, rtp_time, priv->ext_timestamp);
2193 if (rtp_time < priv->ext_timestamp) {
2194 ext_time = priv->ext_timestamp;
2196 ext_time = gst_rtp_buffer_ext_timestamp (&priv->ext_timestamp, rtp_time);
2199 if (ext_time > priv->clock_base)
2200 elapsed = ext_time - priv->clock_base;
2204 elapsed = gst_util_uint64_scale_int (elapsed, GST_SECOND, priv->clock_rate);
2209 update_estimated_eos (GstRtpJitterBuffer * jitterbuffer,
2210 RTPJitterBufferItem * item)
2212 GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2214 if (priv->npt_stop != -1 && priv->ext_timestamp != -1
2215 && priv->clock_base != -1 && priv->clock_rate > 0) {
2216 guint64 elapsed, estimated;
2218 elapsed = compute_elapsed (jitterbuffer, item);
2220 if (elapsed > priv->last_elapsed || !priv->last_elapsed) {
2222 GstClockTime out_time;
2224 priv->last_elapsed = elapsed;
2226 left = priv->npt_stop - priv->npt_start;
2227 GST_LOG_OBJECT (jitterbuffer, "left %" GST_TIME_FORMAT,
2228 GST_TIME_ARGS (left));
2230 out_time = item->dts;
2233 estimated = gst_util_uint64_scale (out_time, left, elapsed);
2235 /* if there is almost nothing left,
2236 * we may never advance enough to end up in the above case */
2237 if (left < GST_SECOND)
2238 estimated = GST_SECOND;
2243 GST_LOG_OBJECT (jitterbuffer, "elapsed %" GST_TIME_FORMAT ", estimated %"
2244 GST_TIME_FORMAT, GST_TIME_ARGS (elapsed), GST_TIME_ARGS (estimated));
2246 if (estimated != -1 && priv->estimated_eos != estimated) {
2247 set_timer (jitterbuffer, TIMER_TYPE_EOS, -1, estimated);
2248 priv->estimated_eos = estimated;
2254 /* take a buffer from the queue and push it */
2255 static GstFlowReturn
2256 pop_and_push_next (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum)
2258 GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2259 GstFlowReturn result;
2260 RTPJitterBufferItem *item;
2263 GstClockTime dts, pts;
2265 gboolean is_buffer, do_push = TRUE;
2267 /* when we get here we are ready to pop and push the buffer */
2268 item = rtp_jitter_buffer_pop (priv->jbuf, &percent);
2270 is_buffer = GST_IS_BUFFER (item->data);
2273 check_buffering_percent (jitterbuffer, &percent);
2275 /* we need to make writable to change the flags and timestamps */
2276 outbuf = gst_buffer_make_writable (item->data);
2278 if (G_UNLIKELY (priv->discont)) {
2279 /* set DISCONT flag when we missed a packet. We pushed the buffer writable
2280 * into the jitterbuffer so we can modify now. */
2281 GST_DEBUG_OBJECT (jitterbuffer, "mark output buffer discont");
2282 GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_DISCONT);
2283 priv->discont = FALSE;
2285 if (G_UNLIKELY (priv->ts_discont)) {
2286 GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_RESYNC);
2287 priv->ts_discont = FALSE;
2290 dts = gst_segment_to_position (&priv->segment, GST_FORMAT_TIME, item->dts);
2291 pts = gst_segment_to_position (&priv->segment, GST_FORMAT_TIME, item->pts);
2293 /* apply timestamp with offset to buffer now */
2294 GST_BUFFER_DTS (outbuf) = apply_offset (jitterbuffer, dts);
2295 GST_BUFFER_PTS (outbuf) = apply_offset (jitterbuffer, pts);
2297 /* update the elapsed time when we need to check against the npt stop time. */
2298 update_estimated_eos (jitterbuffer, item);
2300 priv->last_out_time = GST_BUFFER_PTS (outbuf);
2302 outevent = item->data;
2303 if (item->type == ITEM_TYPE_LOST) {
2304 priv->discont = TRUE;
2310 /* now we are ready to push the buffer. Save the seqnum and release the lock
2311 * so the other end can push stuff in the queue again. */
2312 priv->last_popped_seqnum = seqnum;
2313 priv->next_seqnum = (seqnum + item->count) & 0xffff;
2322 post_buffering_percent (jitterbuffer, percent);
2324 GST_DEBUG_OBJECT (jitterbuffer,
2325 "Pushing buffer %d, dts %" GST_TIME_FORMAT ", pts %" GST_TIME_FORMAT,
2326 seqnum, GST_TIME_ARGS (GST_BUFFER_DTS (outbuf)),
2327 GST_TIME_ARGS (GST_BUFFER_PTS (outbuf)));
2328 result = gst_pad_push (priv->srcpad, outbuf);
2330 GST_DEBUG_OBJECT (jitterbuffer, "Pushing event %d", seqnum);
2333 gst_pad_push_event (priv->srcpad, outevent);
2335 gst_event_unref (outevent);
2337 result = GST_FLOW_OK;
2339 JBUF_LOCK_CHECK (priv, out_flushing);
2346 return priv->srcresult;
2350 #define GST_FLOW_WAIT GST_FLOW_CUSTOM_SUCCESS
2352 /* Peek a buffer and compare the seqnum to the expected seqnum.
2353 * If all is fine, the buffer is pushed.
2354 * If something is wrong, we wait for some event
2356 static GstFlowReturn
2357 handle_next_buffer (GstRtpJitterBuffer * jitterbuffer)
2359 GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2360 GstFlowReturn result = GST_FLOW_OK;
2361 RTPJitterBufferItem *item;
2363 guint32 next_seqnum;
2366 /* only push buffers when PLAYING and active and not buffering */
2367 if (priv->blocked || !priv->active ||
2368 rtp_jitter_buffer_is_buffering (priv->jbuf))
2369 return GST_FLOW_WAIT;
2372 /* peek a buffer, we're just looking at the sequence number.
2373 * If all is fine, we'll pop and push it. If the sequence number is wrong we
2374 * wait for a timeout or something to change.
2375 * The peeked buffer is valid for as long as we hold the jitterbuffer lock. */
2376 item = rtp_jitter_buffer_peek (priv->jbuf);
2380 /* get the seqnum and the next expected seqnum */
2381 seqnum = item->seqnum;
2383 next_seqnum = priv->next_seqnum;
2385 /* get the gap between this and the previous packet. If we don't know the
2386 * previous packet seqnum assume no gap. */
2387 if (G_UNLIKELY (next_seqnum == -1)) {
2388 GST_DEBUG_OBJECT (jitterbuffer, "First buffer #%d", seqnum);
2389 /* we don't know what the next_seqnum should be, the chain function should
2390 * have scheduled a DEADLINE timer that will increment next_seqnum when it
2391 * fires, so wait for that */
2392 result = GST_FLOW_WAIT;
2394 /* else calculate GAP */
2395 gap = gst_rtp_buffer_compare_seqnum (next_seqnum, seqnum);
2397 if (G_LIKELY (gap == 0)) {
2398 /* no missing packet, pop and push */
2399 result = pop_and_push_next (jitterbuffer, seqnum);
2400 } else if (G_UNLIKELY (gap < 0)) {
2401 RTPJitterBufferItem *item;
2402 /* if we have a packet that we already pushed or considered dropped, pop it
2403 * off and get the next packet */
2404 GST_DEBUG_OBJECT (jitterbuffer, "Old packet #%d, next #%d dropping",
2405 seqnum, next_seqnum);
2406 item = rtp_jitter_buffer_pop (priv->jbuf, NULL);
2410 /* the chain function has scheduled timers to request retransmission or
2411 * when to consider the packet lost, wait for that */
2412 GST_DEBUG_OBJECT (jitterbuffer,
2413 "Sequence number GAP detected: expected %d instead of %d (%d missing)",
2414 next_seqnum, seqnum, gap);
2415 result = GST_FLOW_WAIT;
2422 GST_DEBUG_OBJECT (jitterbuffer, "no buffer, going to wait");
2424 result = GST_FLOW_EOS;
2426 result = GST_FLOW_WAIT;
2431 /* the timeout for when we expected a packet expired */
2433 do_expected_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
2436 GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2440 GST_DEBUG_OBJECT (jitterbuffer, "expected %d didn't arrive", timer->seqnum);
2442 delay = timer->rtx_delay + timer->rtx_retry;
2443 event = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM,
2444 gst_structure_new ("GstRTPRetransmissionRequest",
2445 "seqnum", G_TYPE_UINT, (guint) timer->seqnum,
2446 "running-time", G_TYPE_UINT64, timer->rtx_base,
2447 "delay", G_TYPE_UINT, GST_TIME_AS_MSECONDS (delay),
2448 "retry", G_TYPE_UINT, timer->num_rtx_retry,
2449 "frequency", G_TYPE_UINT, priv->rtx_retry_timeout,
2450 "period", G_TYPE_UINT, priv->rtx_retry_period,
2451 "deadline", G_TYPE_UINT, priv->latency_ms,
2452 "packet-spacing", G_TYPE_UINT64, priv->packet_spacing, NULL));
2454 priv->num_rtx_requests++;
2455 timer->num_rtx_retry++;
2456 timer->rtx_last = now;
2458 /* calculate the timeout for the next retransmission attempt */
2459 timer->rtx_retry += (priv->rtx_retry_timeout * GST_MSECOND);
2460 GST_DEBUG_OBJECT (jitterbuffer, "base %" GST_TIME_FORMAT ", delay %"
2461 GST_TIME_FORMAT ", retry %" GST_TIME_FORMAT,
2462 GST_TIME_ARGS (timer->rtx_base), GST_TIME_ARGS (timer->rtx_delay),
2463 GST_TIME_ARGS (timer->rtx_retry));
2465 if (timer->rtx_retry + timer->rtx_delay >
2466 (priv->rtx_retry_period * GST_MSECOND)) {
2467 GST_DEBUG_OBJECT (jitterbuffer, "reschedule as LOST timer");
2468 /* too many retransmission request, we now convert the timer
2469 * to a lost timer, leave the num_rtx_retry as it is for stats */
2470 timer->type = TIMER_TYPE_LOST;
2471 timer->rtx_delay = 0;
2472 timer->rtx_retry = 0;
2474 reschedule_timer (jitterbuffer, timer, timer->seqnum,
2475 timer->rtx_base + timer->rtx_retry, timer->rtx_delay, FALSE);
2478 gst_pad_push_event (priv->sinkpad, event);
2484 /* a packet is lost */
2486 do_lost_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
2489 GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2490 GstClockTime duration, timestamp;
2491 guint seqnum, lost_packets, num_rtx_retry;
2494 RTPJitterBufferItem *item;
2496 seqnum = timer->seqnum;
2497 timestamp = apply_offset (jitterbuffer, timer->timeout);
2498 duration = timer->duration;
2499 if (duration == GST_CLOCK_TIME_NONE && priv->packet_spacing > 0)
2500 duration = priv->packet_spacing;
2501 lost_packets = MAX (timer->num, 1);
2502 late = timer->num > 0;
2503 num_rtx_retry = timer->num_rtx_retry;
2505 /* we had a gap and thus we lost some packets. Create an event for this. */
2506 if (lost_packets > 1)
2507 GST_DEBUG_OBJECT (jitterbuffer, "Packets #%d -> #%d lost", seqnum,
2508 seqnum + lost_packets - 1);
2510 GST_DEBUG_OBJECT (jitterbuffer, "Packet #%d lost", seqnum);
2512 priv->num_late += lost_packets;
2513 priv->num_rtx_failed += num_rtx_retry;
2515 /* create paket lost event */
2516 event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM,
2517 gst_structure_new ("GstRTPPacketLost",
2518 "seqnum", G_TYPE_UINT, (guint) seqnum,
2519 "timestamp", G_TYPE_UINT64, timestamp,
2520 "duration", G_TYPE_UINT64, duration,
2521 "late", G_TYPE_BOOLEAN, late,
2522 "retry", G_TYPE_UINT, num_rtx_retry, NULL));
2524 item = alloc_item (event, ITEM_TYPE_LOST, -1, -1, seqnum, lost_packets, -1);
2525 rtp_jitter_buffer_insert (priv->jbuf, item, NULL, NULL);
2527 /* remove timer now */
2528 remove_timer (jitterbuffer, timer);
2529 JBUF_SIGNAL_EVENT (priv);
2535 do_eos_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
2538 GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2540 GST_INFO_OBJECT (jitterbuffer, "got the NPT timeout");
2541 remove_timer (jitterbuffer, timer);
2542 JBUF_SIGNAL_EVENT (priv);
2548 do_deadline_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
2551 GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2553 GST_INFO_OBJECT (jitterbuffer, "got deadline timeout");
2555 priv->next_seqnum = timer->seqnum;
2556 remove_timer (jitterbuffer, timer);
2557 JBUF_SIGNAL_EVENT (priv);
2563 do_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
2566 gboolean removed = FALSE;
2568 switch (timer->type) {
2569 case TIMER_TYPE_EXPECTED:
2570 removed = do_expected_timeout (jitterbuffer, timer, now);
2572 case TIMER_TYPE_LOST:
2573 removed = do_lost_timeout (jitterbuffer, timer, now);
2575 case TIMER_TYPE_DEADLINE:
2576 removed = do_deadline_timeout (jitterbuffer, timer, now);
2578 case TIMER_TYPE_EOS:
2579 removed = do_eos_timeout (jitterbuffer, timer, now);
2585 /* called when we need to wait for the next timeout.
2587 * We loop over the array of recorded timeouts and wait for the earliest one.
2588 * When it timed out, do the logic associated with the timer.
2590 * If there are no timers, we wait on a gcond until something new happens.
2593 wait_next_timeout (GstRtpJitterBuffer * jitterbuffer)
2595 GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
2596 GstClockTime now = 0;
2599 while (priv->timer_running) {
2600 TimerData *timer = NULL;
2601 GstClockTime timer_timeout = -1;
2604 GST_DEBUG_OBJECT (jitterbuffer, "now %" GST_TIME_FORMAT,
2605 GST_TIME_ARGS (now));
2607 len = priv->timers->len;
2608 for (i = 0; i < len; i++) {
2609 TimerData *test = &g_array_index (priv->timers, TimerData, i);
2610 GstClockTime test_timeout = get_timeout (jitterbuffer, test);
2611 gboolean save_best = FALSE;
2613 GST_DEBUG_OBJECT (jitterbuffer, "%d, %d, %d, %" GST_TIME_FORMAT,
2614 i, test->type, test->seqnum, GST_TIME_ARGS (test_timeout));
2616 /* find the smallest timeout */
2617 if (timer == NULL) {
2619 } else if (timer_timeout == -1) {
2620 /* we already have an immediate timeout, the new timer must be an
2621 * immediate timer with smaller seqnum to become the best */
2622 if (test_timeout == -1 && test->seqnum < timer->seqnum)
2624 } else if (test_timeout == -1) {
2625 /* first immediate timer */
2627 } else if (test_timeout < timer_timeout) {
2630 } else if (test_timeout == timer_timeout && test->seqnum < timer->seqnum) {
2631 /* same timer, smaller seqnum */
2635 GST_DEBUG_OBJECT (jitterbuffer, "new best %d", i);
2637 timer_timeout = test_timeout;
2640 if (timer && !priv->blocked) {
2642 GstClockTime sync_time;
2645 GstClockTimeDiff clock_jitter;
2647 if (timer_timeout == -1 || timer_timeout <= now) {
2648 do_timeout (jitterbuffer, timer, now);
2649 /* check here, do_timeout could have released the lock */
2650 if (!priv->timer_running)
2655 GST_OBJECT_LOCK (jitterbuffer);
2656 clock = GST_ELEMENT_CLOCK (jitterbuffer);
2658 GST_OBJECT_UNLOCK (jitterbuffer);
2659 /* let's just push if there is no clock */
2660 GST_DEBUG_OBJECT (jitterbuffer, "No clock, timeout right away");
2661 now = timer_timeout;
2665 /* prepare for sync against clock */
2666 sync_time = timer_timeout + GST_ELEMENT_CAST (jitterbuffer)->base_time;
2667 /* add latency of peer to get input time */
2668 sync_time += priv->peer_latency;
2670 GST_DEBUG_OBJECT (jitterbuffer, "sync to timestamp %" GST_TIME_FORMAT
2671 " with sync time %" GST_TIME_FORMAT,
2672 GST_TIME_ARGS (timer_timeout), GST_TIME_ARGS (sync_time));
2674 /* create an entry for the clock */
2675 id = priv->clock_id = gst_clock_new_single_shot_id (clock, sync_time);
2676 priv->timer_timeout = timer_timeout;
2677 priv->timer_seqnum = timer->seqnum;
2678 GST_OBJECT_UNLOCK (jitterbuffer);
2680 /* release the lock so that the other end can push stuff or unlock */
2683 ret = gst_clock_id_wait (id, &clock_jitter);
2686 if (!priv->timer_running)
2689 if (ret != GST_CLOCK_UNSCHEDULED) {
2690 now = timer_timeout + MAX (clock_jitter, 0);
2691 GST_DEBUG_OBJECT (jitterbuffer, "sync done, %d, #%d, %" G_GINT64_FORMAT,
2692 ret, priv->timer_seqnum, clock_jitter);
2694 GST_DEBUG_OBJECT (jitterbuffer, "sync unscheduled");
2696 /* and free the entry */
2697 gst_clock_id_unref (id);
2698 priv->clock_id = NULL;
2700 /* no timers, wait for activity */
2701 JBUF_WAIT_TIMER (priv);
2706 GST_DEBUG_OBJECT (jitterbuffer, "we are stopping");
2711 * This funcion implements the main pushing loop on the source pad.
2713 * It first tries to push as many buffers as possible. If there is a seqnum
2714 * mismatch, we wait for the next timeouts.
2717 gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer)
2719 GstRtpJitterBufferPrivate *priv;
2720 GstFlowReturn result;
2722 priv = jitterbuffer->priv;
2724 JBUF_LOCK_CHECK (priv, flushing);
2726 result = handle_next_buffer (jitterbuffer);
2727 if (G_LIKELY (result == GST_FLOW_WAIT)) {
2728 /* now wait for the next event */
2729 JBUF_WAIT_EVENT (priv, flushing);
2730 result = GST_FLOW_OK;
2733 while (result == GST_FLOW_OK);
2736 /* if we get here we need to pause */
2742 result = priv->srcresult;
2748 const gchar *reason = gst_flow_get_name (result);
2751 GST_DEBUG_OBJECT (jitterbuffer, "pausing task, reason %s", reason);
2752 gst_pad_pause_task (priv->srcpad);
2753 if (result == GST_FLOW_EOS) {
2754 event = gst_event_new_eos ();
2755 gst_pad_push_event (priv->srcpad, event);
2761 /* collect the info from the lastest RTCP packet and the jitterbuffer sync, do
2762 * some sanity checks and then emit the handle-sync signal with the parameters.
2763 * This function must be called with the LOCK */
2765 do_handle_sync (GstRtpJitterBuffer * jitterbuffer)
2767 GstRtpJitterBufferPrivate *priv;
2768 guint64 base_rtptime, base_time;
2770 guint64 last_rtptime;
2772 guint64 ext_rtptime, diff;
2773 gboolean drop = FALSE;
2775 priv = jitterbuffer->priv;
2777 /* get the last values from the jitterbuffer */
2778 rtp_jitter_buffer_get_sync (priv->jbuf, &base_rtptime, &base_time,
2779 &clock_rate, &last_rtptime);
2781 clock_base = priv->clock_base;
2782 ext_rtptime = priv->ext_rtptime;
2784 GST_DEBUG_OBJECT (jitterbuffer, "ext SR %" G_GUINT64_FORMAT ", base %"
2785 G_GUINT64_FORMAT ", clock-rate %" G_GUINT32_FORMAT
2786 ", clock-base %" G_GUINT64_FORMAT ", last-rtptime %" G_GUINT64_FORMAT,
2787 ext_rtptime, base_rtptime, clock_rate, clock_base, last_rtptime);
2789 if (base_rtptime == -1 || clock_rate == -1 || base_time == -1) {
2790 GST_DEBUG_OBJECT (jitterbuffer, "dropping, no RTP values");
2793 /* we can't accept anything that happened before we did the last resync */
2794 if (base_rtptime > ext_rtptime) {
2795 GST_DEBUG_OBJECT (jitterbuffer, "dropping, older than base time");
2798 /* the SR RTP timestamp must be something close to what we last observed
2799 * in the jitterbuffer */
2800 if (ext_rtptime > last_rtptime) {
2801 /* check how far ahead it is to our RTP timestamps */
2802 diff = ext_rtptime - last_rtptime;
2803 /* if bigger than 1 second, we drop it */
2804 if (diff > clock_rate) {
2805 GST_DEBUG_OBJECT (jitterbuffer, "too far ahead");
2806 /* should drop this, but some RTSP servers end up with bogus
2807 * way too ahead RTCP packet when repeated PAUSE/PLAY,
2808 * so still trigger rptbin sync but invalidate RTCP data
2809 * (sync might use other methods) */
2812 GST_DEBUG_OBJECT (jitterbuffer, "ext last %" G_GUINT64_FORMAT ", diff %"
2813 G_GUINT64_FORMAT, last_rtptime, diff);
2821 s = gst_structure_new ("application/x-rtp-sync",
2822 "base-rtptime", G_TYPE_UINT64, base_rtptime,
2823 "base-time", G_TYPE_UINT64, base_time,
2824 "clock-rate", G_TYPE_UINT, clock_rate,
2825 "clock-base", G_TYPE_UINT64, clock_base,
2826 "sr-ext-rtptime", G_TYPE_UINT64, ext_rtptime,
2827 "sr-buffer", GST_TYPE_BUFFER, priv->last_sr, NULL);
2829 GST_DEBUG_OBJECT (jitterbuffer, "signaling sync");
2830 gst_buffer_replace (&priv->last_sr, NULL);
2832 g_signal_emit (jitterbuffer,
2833 gst_rtp_jitter_buffer_signals[SIGNAL_HANDLE_SYNC], 0, s);
2835 gst_structure_free (s);
2837 GST_DEBUG_OBJECT (jitterbuffer, "dropping RTCP packet");
2841 static GstFlowReturn
2842 gst_rtp_jitter_buffer_chain_rtcp (GstPad * pad, GstObject * parent,
2845 GstRtpJitterBuffer *jitterbuffer;
2846 GstRtpJitterBufferPrivate *priv;
2847 GstFlowReturn ret = GST_FLOW_OK;
2849 GstRTCPPacket packet;
2850 guint64 ext_rtptime;
2852 GstRTCPBuffer rtcp = { NULL, };
2854 jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
2856 if (G_UNLIKELY (!gst_rtcp_buffer_validate (buffer)))
2857 goto invalid_buffer;
2859 priv = jitterbuffer->priv;
2861 gst_rtcp_buffer_map (buffer, GST_MAP_READ, &rtcp);
2863 if (!gst_rtcp_buffer_get_first_packet (&rtcp, &packet))
2866 /* first packet must be SR or RR or else the validate would have failed */
2867 switch (gst_rtcp_packet_get_type (&packet)) {
2868 case GST_RTCP_TYPE_SR:
2869 gst_rtcp_packet_sr_get_sender_info (&packet, &ssrc, NULL, &rtptime,
2875 gst_rtcp_buffer_unmap (&rtcp);
2877 GST_DEBUG_OBJECT (jitterbuffer, "received RTCP of SSRC %08x", ssrc);
2880 /* convert the RTP timestamp to our extended timestamp, using the same offset
2881 * we used in the jitterbuffer */
2882 ext_rtptime = priv->jbuf->ext_rtptime;
2883 ext_rtptime = gst_rtp_buffer_ext_timestamp (&ext_rtptime, rtptime);
2885 priv->ext_rtptime = ext_rtptime;
2886 gst_buffer_replace (&priv->last_sr, buffer);
2888 do_handle_sync (jitterbuffer);
2892 gst_buffer_unref (buffer);
2898 /* this is not fatal but should be filtered earlier */
2899 GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
2900 ("Received invalid RTCP payload, dropping"));
2906 /* this is not fatal but should be filtered earlier */
2907 GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
2908 ("Received empty RTCP payload, dropping"));
2909 gst_rtcp_buffer_unmap (&rtcp);
2915 GST_DEBUG_OBJECT (jitterbuffer, "ignoring RTCP packet");
2916 gst_rtcp_buffer_unmap (&rtcp);
2923 gst_rtp_jitter_buffer_sink_query (GstPad * pad, GstObject * parent,
2926 gboolean res = FALSE;
2928 switch (GST_QUERY_TYPE (query)) {
2929 case GST_QUERY_CAPS:
2931 GstCaps *filter, *caps;
2933 gst_query_parse_caps (query, &filter);
2934 caps = gst_rtp_jitter_buffer_getcaps (pad, filter);
2935 gst_query_set_caps_result (query, caps);
2936 gst_caps_unref (caps);
2941 if (GST_QUERY_IS_SERIALIZED (query)) {
2942 GST_WARNING_OBJECT (pad, "unhandled serialized query");
2945 res = gst_pad_query_default (pad, parent, query);
2953 gst_rtp_jitter_buffer_src_query (GstPad * pad, GstObject * parent,
2956 GstRtpJitterBuffer *jitterbuffer;
2957 GstRtpJitterBufferPrivate *priv;
2958 gboolean res = FALSE;
2960 jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
2961 priv = jitterbuffer->priv;
2963 switch (GST_QUERY_TYPE (query)) {
2964 case GST_QUERY_LATENCY:
2966 /* We need to send the query upstream and add the returned latency to our
2968 GstClockTime min_latency, max_latency;
2970 GstClockTime our_latency;
2972 if ((res = gst_pad_peer_query (priv->sinkpad, query))) {
2973 gst_query_parse_latency (query, &us_live, &min_latency, &max_latency);
2975 GST_DEBUG_OBJECT (jitterbuffer, "Peer latency: min %"
2976 GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
2977 GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
2979 /* store this so that we can safely sync on the peer buffers. */
2981 priv->peer_latency = min_latency;
2982 our_latency = priv->latency_ns;
2985 GST_DEBUG_OBJECT (jitterbuffer, "Our latency: %" GST_TIME_FORMAT,
2986 GST_TIME_ARGS (our_latency));
2988 /* we add some latency but can buffer an infinite amount of time */
2989 min_latency += our_latency;
2992 GST_DEBUG_OBJECT (jitterbuffer, "Calculated total latency : min %"
2993 GST_TIME_FORMAT " max %" GST_TIME_FORMAT,
2994 GST_TIME_ARGS (min_latency), GST_TIME_ARGS (max_latency));
2996 gst_query_set_latency (query, TRUE, min_latency, max_latency);
3000 case GST_QUERY_POSITION:
3002 GstClockTime start, last_out;
3005 gst_query_parse_position (query, &fmt, NULL);
3006 if (fmt != GST_FORMAT_TIME) {
3007 res = gst_pad_query_default (pad, parent, query);
3012 start = priv->npt_start;
3013 last_out = priv->last_out_time;
3016 GST_DEBUG_OBJECT (jitterbuffer, "npt start %" GST_TIME_FORMAT
3017 ", last out %" GST_TIME_FORMAT, GST_TIME_ARGS (start),
3018 GST_TIME_ARGS (last_out));
3020 if (GST_CLOCK_TIME_IS_VALID (start) && GST_CLOCK_TIME_IS_VALID (last_out)) {
3021 /* bring 0-based outgoing time to stream time */
3022 gst_query_set_position (query, GST_FORMAT_TIME, start + last_out);
3025 res = gst_pad_query_default (pad, parent, query);
3029 case GST_QUERY_CAPS:
3031 GstCaps *filter, *caps;
3033 gst_query_parse_caps (query, &filter);
3034 caps = gst_rtp_jitter_buffer_getcaps (pad, filter);
3035 gst_query_set_caps_result (query, caps);
3036 gst_caps_unref (caps);
3041 res = gst_pad_query_default (pad, parent, query);
3049 gst_rtp_jitter_buffer_set_property (GObject * object,
3050 guint prop_id, const GValue * value, GParamSpec * pspec)
3052 GstRtpJitterBuffer *jitterbuffer;
3053 GstRtpJitterBufferPrivate *priv;
3055 jitterbuffer = GST_RTP_JITTER_BUFFER (object);
3056 priv = jitterbuffer->priv;
3061 guint new_latency, old_latency;
3063 new_latency = g_value_get_uint (value);
3066 old_latency = priv->latency_ms;
3067 priv->latency_ms = new_latency;
3068 priv->latency_ns = priv->latency_ms * GST_MSECOND;
3069 rtp_jitter_buffer_set_delay (priv->jbuf, priv->latency_ns);
3072 /* post message if latency changed, this will inform the parent pipeline
3073 * that a latency reconfiguration is possible/needed. */
3074 if (new_latency != old_latency) {
3075 GST_DEBUG_OBJECT (jitterbuffer, "latency changed to: %" GST_TIME_FORMAT,
3076 GST_TIME_ARGS (new_latency * GST_MSECOND));
3078 gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer),
3079 gst_message_new_latency (GST_OBJECT_CAST (jitterbuffer)));
3083 case PROP_DROP_ON_LATENCY:
3085 priv->drop_on_latency = g_value_get_boolean (value);
3088 case PROP_TS_OFFSET:
3090 priv->ts_offset = g_value_get_int64 (value);
3091 priv->ts_discont = TRUE;
3096 priv->do_lost = g_value_get_boolean (value);
3101 rtp_jitter_buffer_set_mode (priv->jbuf, g_value_get_enum (value));
3104 case PROP_DO_RETRANSMISSION:
3106 priv->do_retransmission = g_value_get_boolean (value);
3109 case PROP_RTX_DELAY:
3111 priv->rtx_delay = g_value_get_int (value);
3114 case PROP_RTX_DELAY_REORDER:
3116 priv->rtx_delay_reorder = g_value_get_int (value);
3119 case PROP_RTX_RETRY_TIMEOUT:
3121 priv->rtx_retry_timeout = g_value_get_int (value);
3124 case PROP_RTX_RETRY_PERIOD:
3126 priv->rtx_retry_period = g_value_get_int (value);
3130 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
3136 gst_rtp_jitter_buffer_get_property (GObject * object,
3137 guint prop_id, GValue * value, GParamSpec * pspec)
3139 GstRtpJitterBuffer *jitterbuffer;
3140 GstRtpJitterBufferPrivate *priv;
3142 jitterbuffer = GST_RTP_JITTER_BUFFER (object);
3143 priv = jitterbuffer->priv;
3148 g_value_set_uint (value, priv->latency_ms);
3151 case PROP_DROP_ON_LATENCY:
3153 g_value_set_boolean (value, priv->drop_on_latency);
3156 case PROP_TS_OFFSET:
3158 g_value_set_int64 (value, priv->ts_offset);
3163 g_value_set_boolean (value, priv->do_lost);
3168 g_value_set_enum (value, rtp_jitter_buffer_get_mode (priv->jbuf));
3176 if (priv->srcresult != GST_FLOW_OK)
3179 percent = rtp_jitter_buffer_get_percent (priv->jbuf);
3181 g_value_set_int (value, percent);
3185 case PROP_DO_RETRANSMISSION:
3187 g_value_set_boolean (value, priv->do_retransmission);
3190 case PROP_RTX_DELAY:
3192 g_value_set_int (value, priv->rtx_delay);
3195 case PROP_RTX_DELAY_REORDER:
3197 g_value_set_int (value, priv->rtx_delay_reorder);
3200 case PROP_RTX_RETRY_TIMEOUT:
3202 g_value_set_int (value, priv->rtx_retry_timeout);
3205 case PROP_RTX_RETRY_PERIOD:
3207 g_value_set_int (value, priv->rtx_retry_period);
3211 G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);