*/
/**
- * SECTION:element-gstrtpjitterbuffer
+ * SECTION:element-rtpjitterbuffer
*
* This element reorders and removes duplicate RTP packets as they are received
- * from a network source. It will also wait for missing packets up to a
- * configurable time limit using the #GstRtpJitterBuffer:latency property.
- * Packets arriving too late are considered to be lost packets.
- *
- * This element acts as a live element and so adds #GstRtpJitterBuffer:latency
- * to the pipeline.
+ * from a network source.
*
* The element needs the clock-rate of the RTP payload in order to estimate the
* delay. This information is obtained either from the caps on the sink pad or,
* when no caps are present, from the #GstRtpJitterBuffer::request-pt-map signal.
* To clear the previous pt-map use the #GstRtpJitterBuffer::clear-pt-map signal.
*
- * This element will automatically be used inside gstrtpbin.
+ * The rtpjitterbuffer will wait for missing packets up to a configurable time
+ * limit using the #GstRtpJitterBuffer:latency property. Packets arriving too
+ * late are considered to be lost packets. If the #GstRtpJitterBuffer:do-lost
+ * property is set, lost packets will result in a custom serialized downstream
+ * event of name GstRTPPacketLost. The lost packet events are usually used by a
+ * depayloader or other element to create concealment data or some other logic
+ * to gracefully handle the missing packets.
+ *
+ * The jitterbuffer will use the DTS (or PTS if no DTS is set) of the incomming
+ * buffer and the rtptime inside the RTP packet to create a PTS on the outgoing
+ * buffer.
+ *
+ * The jitterbuffer can also be configured to send early retransmission events
+ * upstream by setting the #GstRtpJitterBuffer:do-retransmission property. In
+ * this mode, the jitterbuffer tries to estimate when a packet should arrive and
+ * sends a custom upstream event named GstRTPRetransmissionRequest when the
+ * packet is considered late. The initial expected packet arrival time is
+ * calculated as follows:
+ *
+ * - If seqnum N arrived at time T, seqnum N+1 is expected to arrive at
+ * T + packet-spacing + #GstRtpJitterBuffer:rtx-delay. The packet spacing is
+ * calculated from the DTS (or PTS is no DTS) of two consecutive RTP
+ * packets with different rtptime.
+ *
+ * - If seqnum N0 arrived at time T0 and seqnum Nm arrived at time Tm,
+ * seqnum Ni is expected at time Ti = T0 + i*(Tm - T0)/(Nm - N0). Any
+ * previously scheduled timeout is overwritten.
+ *
+ * - If seqnum N arrived, all seqnum older than
+ * N - #GstRtpJitterBuffer:rtx-delay-reorder are considered late
+ * immediately. This is to request fast feedback for abonormally reorder
+ * packets before any of the previous timeouts is triggered.
+ *
+ * A late packet triggers the GstRTPRetransmissionRequest custom upstream
+ * event. After the initial timeout expires and the retransmission event is
+ * sent, the timeout is scheduled for
+ * T + #GstRtpJitterBuffer:rtx-retry-timeout. If the missing packet did not
+ * arrive after #GstRtpJitterBuffer:rtx-retry-timeout, a new
+ * GstRTPRetransmissionRequest is sent upstream and the timeout is rescheduled
+ * again for T + #GstRtpJitterBuffer:rtx-retry-timeout. This repeats until
+ * #GstRtpJitterBuffer:rtx-retry-period elapsed, at which point no further
+ * retransmission requests are sent and the regular logic is performed to
+ * schedule a lost packet as discussed above.
+ *
+ * This element acts as a live element and so adds #GstRtpJitterBuffer:latency
+ * to the pipeline.
+ *
+ * This element will automatically be used inside rtpbin.
*
* <refsect2>
* <title>Example pipelines</title>
* |[
- * gst-launch-1.0 rtspsrc location=rtsp://192.168.1.133:8554/mpeg1or2AudioVideoTest ! gstrtpjitterbuffer ! rtpmpvdepay ! mpeg2dec ! xvimagesink
+ * gst-launch-1.0 rtspsrc location=rtsp://192.168.1.133:8554/mpeg1or2AudioVideoTest ! rtpjitterbuffer ! rtpmpvdepay ! mpeg2dec ! xvimagesink
* ]| Connect to a streaming server and decode the MPEG video. The jitterbuffer is
* inserted into the pipeline to smooth out network jitter and to reorder the
* out-of-order RTP packets.
#include <string.h>
#include <gst/rtp/gstrtpbuffer.h>
-#include "gstrtpbin-marshal.h"
-
#include "gstrtpjitterbuffer.h"
#include "rtpjitterbuffer.h"
#include "rtpstats.h"
LAST_SIGNAL
};
-#define DEFAULT_LATENCY_MS 200
-#define DEFAULT_DROP_ON_LATENCY FALSE
-#define DEFAULT_TS_OFFSET 0
-#define DEFAULT_DO_LOST FALSE
-#define DEFAULT_MODE RTP_JITTER_BUFFER_MODE_SLAVE
-#define DEFAULT_PERCENT 0
+#define DEFAULT_LATENCY_MS 200
+#define DEFAULT_DROP_ON_LATENCY FALSE
+#define DEFAULT_TS_OFFSET 0
+#define DEFAULT_DO_LOST FALSE
+#define DEFAULT_MODE RTP_JITTER_BUFFER_MODE_SLAVE
+#define DEFAULT_PERCENT 0
+#define DEFAULT_DO_RETRANSMISSION FALSE
+#define DEFAULT_RTX_DELAY 20
+#define DEFAULT_RTX_DELAY_REORDER 3
+#define DEFAULT_RTX_RETRY_TIMEOUT 40
+#define DEFAULT_RTX_RETRY_PERIOD 160
enum
{
PROP_DO_LOST,
PROP_MODE,
PROP_PERCENT,
+ PROP_DO_RETRANSMISSION,
+ PROP_RTX_DELAY,
+ PROP_RTX_DELAY_REORDER,
+ PROP_RTX_RETRY_TIMEOUT,
+ PROP_RTX_RETRY_PERIOD,
PROP_LAST
};
if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK)) \
goto label; \
} G_STMT_END
-
#define JBUF_UNLOCK(priv) (g_mutex_unlock (&(priv)->jbuf_lock))
-#define JBUF_WAIT(priv) (g_cond_wait (&(priv)->jbuf_cond, &(priv)->jbuf_lock))
-#define JBUF_WAIT_CHECK(priv,label) G_STMT_START { \
- JBUF_WAIT(priv); \
- if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK)) \
- goto label; \
+#define JBUF_WAIT_TIMER(priv) G_STMT_START { \
+ GST_DEBUG ("waiting timer"); \
+ (priv)->waiting_timer = TRUE; \
+ g_cond_wait (&(priv)->jbuf_timer, &(priv)->jbuf_lock); \
+ (priv)->waiting_timer = FALSE; \
+ GST_DEBUG ("waiting timer done"); \
+} G_STMT_END
+#define JBUF_SIGNAL_TIMER(priv) G_STMT_START { \
+ if (G_UNLIKELY ((priv)->waiting_timer)) { \
+ GST_DEBUG ("signal timer"); \
+ g_cond_signal (&(priv)->jbuf_timer); \
+ } \
} G_STMT_END
-#define JBUF_SIGNAL(priv) (g_cond_signal (&(priv)->jbuf_cond))
+#define JBUF_WAIT_EVENT(priv,label) G_STMT_START { \
+ GST_DEBUG ("waiting event"); \
+ (priv)->waiting_event = TRUE; \
+ g_cond_wait (&(priv)->jbuf_event, &(priv)->jbuf_lock); \
+ (priv)->waiting_event = FALSE; \
+ GST_DEBUG ("waiting event done"); \
+ if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK)) \
+ goto label; \
+} G_STMT_END
+#define JBUF_SIGNAL_EVENT(priv) G_STMT_START { \
+ if (G_UNLIKELY ((priv)->waiting_event)) { \
+ GST_DEBUG ("signal event"); \
+ g_cond_signal (&(priv)->jbuf_event); \
+ } \
+} G_STMT_END
struct _GstRtpJitterBufferPrivate
{
RTPJitterBuffer *jbuf;
GMutex jbuf_lock;
- GCond jbuf_cond;
- gboolean waiting;
+ gboolean waiting_timer;
+ GCond jbuf_timer;
+ gboolean waiting_event;
+ GCond jbuf_event;
gboolean discont;
gboolean ts_discont;
gboolean active;
guint64 out_offset;
+ gboolean timer_running;
+ GThread *timer_thread;
+
/* properties */
guint latency_ms;
guint64 latency_ns;
gboolean drop_on_latency;
gint64 ts_offset;
gboolean do_lost;
+ gboolean do_retransmission;
+ gint rtx_delay;
+ gint rtx_delay_reorder;
+ gint rtx_retry_timeout;
+ gint rtx_retry_period;
/* the last seqnum we pushed out */
guint32 last_popped_seqnum;
guint32 next_seqnum;
/* last output time */
GstClockTime last_out_time;
- GstClockTime last_out_pts;
+ /* last valid input timestamp and rtptime pair */
+ GstClockTime ips_dts;
+ guint64 ips_rtptime;
+ GstClockTime packet_spacing;
+
/* the next expected seqnum we receive */
+ GstClockTime last_in_dts;
+ guint32 last_in_seqnum;
guint32 next_in_seqnum;
GArray *timers;
/* for sync */
GstSegment segment;
GstClockID clock_id;
- gboolean unscheduled;
+ GstClockTime timer_timeout;
+ guint16 timer_seqnum;
/* the latency of the upstream peer, we have to take this into account when
* synchronizing the buffers. */
GstClockTime peer_latency;
/* some accounting */
guint64 num_late;
guint64 num_duplicates;
+ guint64 num_rtx_requests;
+ guint64 num_rtx_success;
+ guint64 num_rtx_failed;
+ gdouble avg_rtx_num;
+ guint64 avg_rtx_rtt;
};
typedef enum
{
guint idx;
guint16 seqnum;
+ guint num;
TimerType type;
GstClockTime timeout;
+ GstClockTime duration;
+ GstClockTime rtx_base;
+ GstClockTime rtx_delay;
+ GstClockTime rtx_retry;
+ GstClockTime rtx_last;
+ guint num_rtx_retry;
} TimerData;
#define GST_RTP_JITTER_BUFFER_GET_PRIVATE(o) \
gst_rtp_jitter_buffer_set_active (GstRtpJitterBuffer * jitterbuffer,
gboolean active, guint64 base_time);
static void do_handle_sync (GstRtpJitterBuffer * jitterbuffer);
+
+static void unschedule_current_timer (GstRtpJitterBuffer * jitterbuffer);
static void remove_all_timers (GstRtpJitterBuffer * jitterbuffer);
+static void wait_next_timeout (GstRtpJitterBuffer * jitterbuffer);
+
static void
gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass)
{
"The buffer filled percent", 0, 100,
0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
/**
+ * GstRtpJitterBuffer::do-retransmission:
+ *
+ * Send out a GstRTPRetransmission event upstream when a packet is considered
+ * late and should be retransmitted.
+ *
+ * Since: 1.2
+ */
+ g_object_class_install_property (gobject_class, PROP_DO_RETRANSMISSION,
+ g_param_spec_boolean ("do-retransmission", "Do Retransmission",
+ "Send retransmission events upstream when a packet is late",
+ DEFAULT_DO_RETRANSMISSION,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+ /**
+ * GstRtpJitterBuffer::rtx-delay:
+ *
+ * When a packet did not arrive at the expected time, wait this extra amount
+ * of time before sending a retransmission event.
+ *
+ * When -1 is used, the max jitter will be used as extra delay.
+ *
+ * Since: 1.2
+ */
+ g_object_class_install_property (gobject_class, PROP_RTX_DELAY,
+ g_param_spec_int ("rtx-delay", "RTX Delay",
+ "Extra time in ms to wait before sending retransmission "
+ "event (-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_DELAY,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ /**
+ * GstRtpJitterBuffer::rtx-delay-reorder:
+ *
+ * Assume that a retransmission event should be sent when we see
+ * this much packet reordering.
+ *
+ * When -1 is used, the value will be estimated based on observed packet
+ * reordering.
+ *
+ * Since: 1.2
+ */
+ g_object_class_install_property (gobject_class, PROP_RTX_DELAY_REORDER,
+ g_param_spec_int ("rtx-delay-reorder", "RTX Delay Reorder",
+ "Sending retransmission event when this much reordering (-1 automatic)",
+ -1, G_MAXINT, DEFAULT_RTX_DELAY_REORDER,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ /**
+ * GstRtpJitterBuffer::rtx-retry-timeout:
+ *
+ * When no packet has been received after sending a retransmission event
+ * for this time, retry sending a retransmission event.
+ *
+ * When -1 is used, the value will be estimated based on observed round
+ * trip time.
+ *
+ * Since: 1.2
+ */
+ g_object_class_install_property (gobject_class, PROP_RTX_RETRY_TIMEOUT,
+ g_param_spec_int ("rtx-retry-timeout", "RTX Retry Timeout",
+ "Retry sending a transmission event after this timeout in "
+ "ms (-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_RETRY_TIMEOUT,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+ /**
+ * GstRtpJitterBuffer::rtx-retry-period:
+ *
+ * The amount of time to try to get a retransmission.
+ *
+ * When -1 is used, the value will be estimated based on the jitterbuffer
+ * latency and the observed round trip time.
+ *
+ * Since: 1.2
+ */
+ g_object_class_install_property (gobject_class, PROP_RTX_RETRY_PERIOD,
+ g_param_spec_int ("rtx-retry-period", "RTX Retry Period",
+ "Try to get a retransmission for this many ms "
+ "(-1 automatic)", -1, G_MAXINT, DEFAULT_RTX_RETRY_PERIOD,
+ G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
+
+ /**
* GstRtpJitterBuffer::request-pt-map:
* @buffer: the object which received the signal
* @pt: the pt
gst_rtp_jitter_buffer_signals[SIGNAL_REQUEST_PT_MAP] =
g_signal_new ("request-pt-map", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (GstRtpJitterBufferClass,
- request_pt_map), NULL, NULL, gst_rtp_bin_marshal_BOXED__UINT,
+ request_pt_map), NULL, NULL, g_cclosure_marshal_generic,
GST_TYPE_CAPS, 1, G_TYPE_UINT);
/**
* GstRtpJitterBuffer::handle-sync:
g_signal_new ("set-active", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
G_STRUCT_OFFSET (GstRtpJitterBufferClass, set_active), NULL, NULL,
- gst_rtp_bin_marshal_UINT64__BOOL_UINT64, G_TYPE_UINT64, 2, G_TYPE_BOOLEAN,
+ g_cclosure_marshal_generic, G_TYPE_UINT64, 2, G_TYPE_BOOLEAN,
G_TYPE_UINT64);
gstelement_class->change_state =
klass->set_active = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_set_active);
GST_DEBUG_CATEGORY_INIT
- (rtpjitterbuffer_debug, "gstrtpjitterbuffer", 0, "RTP Jitter Buffer");
+ (rtpjitterbuffer_debug, "rtpjitterbuffer", 0, "RTP Jitter Buffer");
}
static void
priv->latency_ns = priv->latency_ms * GST_MSECOND;
priv->drop_on_latency = DEFAULT_DROP_ON_LATENCY;
priv->do_lost = DEFAULT_DO_LOST;
- priv->timers = g_array_new (FALSE, TRUE, sizeof (TimerData));
+ priv->do_retransmission = DEFAULT_DO_RETRANSMISSION;
+ priv->rtx_delay = DEFAULT_RTX_DELAY;
+ priv->rtx_delay_reorder = DEFAULT_RTX_DELAY_REORDER;
+ priv->rtx_retry_timeout = DEFAULT_RTX_RETRY_TIMEOUT;
+ priv->rtx_retry_period = DEFAULT_RTX_RETRY_PERIOD;
+ priv->timers = g_array_new (FALSE, TRUE, sizeof (TimerData));
priv->jbuf = rtp_jitter_buffer_new ();
g_mutex_init (&priv->jbuf_lock);
- g_cond_init (&priv->jbuf_cond);
+ g_cond_init (&priv->jbuf_timer);
+ g_cond_init (&priv->jbuf_event);
/* reset skew detection initialy */
rtp_jitter_buffer_reset_skew (priv->jbuf);
GST_OBJECT_FLAG_SET (jitterbuffer, GST_ELEMENT_FLAG_PROVIDE_CLOCK);
}
+#define ITEM_TYPE_BUFFER 0
+#define ITEM_TYPE_LOST 1
+
+static RTPJitterBufferItem *
+alloc_item (gpointer data, guint type, GstClockTime dts, GstClockTime pts,
+ guint seqnum, guint count, guint rtptime)
+{
+ RTPJitterBufferItem *item;
+
+ item = g_slice_new (RTPJitterBufferItem);
+ item->data = data;
+ item->next = NULL;
+ item->prev = NULL;
+ item->type = type;
+ item->dts = dts;
+ item->pts = pts;
+ item->seqnum = seqnum;
+ item->count = count;
+ item->rtptime = rtptime;
+
+ return item;
+}
+
+static void
+free_item (RTPJitterBufferItem * item)
+{
+ if (item->data)
+ gst_mini_object_unref (item->data);
+ g_slice_free (RTPJitterBufferItem, item);
+}
+
static void
gst_rtp_jitter_buffer_finalize (GObject * object)
{
GstRtpJitterBuffer *jitterbuffer;
+ GstRtpJitterBufferPrivate *priv;
jitterbuffer = GST_RTP_JITTER_BUFFER (object);
+ priv = jitterbuffer->priv;
- g_array_free (jitterbuffer->priv->timers, TRUE);
- g_mutex_clear (&jitterbuffer->priv->jbuf_lock);
- g_cond_clear (&jitterbuffer->priv->jbuf_cond);
+ g_array_free (priv->timers, TRUE);
+ g_mutex_clear (&priv->jbuf_lock);
+ g_cond_clear (&priv->jbuf_timer);
+ g_cond_clear (&priv->jbuf_event);
- g_object_unref (jitterbuffer->priv->jbuf);
+ rtp_jitter_buffer_flush (priv->jbuf, (GFunc) free_item, NULL);
+ g_object_unref (priv->jbuf);
G_OBJECT_CLASS (parent_class)->finalize (object);
}
/* ERRORS */
wrong_template:
{
- g_warning ("gstrtpjitterbuffer: this is not our template");
+ g_warning ("rtpjitterbuffer: this is not our template");
return NULL;
}
exists:
{
- g_warning ("gstrtpjitterbuffer: pad already requested");
+ g_warning ("rtpjitterbuffer: pad already requested");
return NULL;
}
}
{
GstRtpJitterBufferPrivate *priv;
GstClockTime last_out;
- GstBuffer *head;
+ RTPJitterBufferItem *item;
priv = jbuf->priv;
GST_DEBUG_OBJECT (jbuf, "out offset %" GST_TIME_FORMAT,
GST_TIME_ARGS (priv->out_offset));
priv->active = active;
- JBUF_SIGNAL (priv);
+ JBUF_SIGNAL_EVENT (priv);
}
if (!active) {
rtp_jitter_buffer_set_buffering (priv->jbuf, TRUE);
}
- if ((head = rtp_jitter_buffer_peek (priv->jbuf))) {
+ if ((item = rtp_jitter_buffer_peek (priv->jbuf))) {
/* head buffer timestamp and offset gives our output time */
- last_out = GST_BUFFER_TIMESTAMP (head) + priv->ts_offset;
+ last_out = item->dts + priv->ts_offset;
} else {
/* use last known time when the buffer is empty */
last_out = priv->last_out_time;
GST_DEBUG_OBJECT (jitterbuffer, "got clock-rate %d", priv->clock_rate);
+ rtp_jitter_buffer_set_clock_rate (priv->jbuf, priv->clock_rate);
+
/* The clock base is the RTP timestamp corrsponding to the npt-start value. We
* can use this to track the amount of time elapsed on the sender. */
if (gst_structure_get_uint (caps_struct, "clock-base", &val))
priv->srcresult = GST_FLOW_FLUSHING;
GST_DEBUG_OBJECT (jitterbuffer, "Disabling pop on queue");
/* this unblocks any waiting pops on the src pad task */
- JBUF_SIGNAL (priv);
- /* unlock clock, we just unschedule, the entry will be released by the
- * locking streaming thread. */
- if (priv->clock_id) {
- gst_clock_id_unschedule (priv->clock_id);
- priv->unscheduled = TRUE;
- }
+ JBUF_SIGNAL_EVENT (priv);
JBUF_UNLOCK (priv);
}
gst_segment_init (&priv->segment, GST_FORMAT_TIME);
priv->last_popped_seqnum = -1;
priv->last_out_time = -1;
- priv->last_out_pts = -1;
priv->next_seqnum = -1;
+ priv->ips_rtptime = -1;
+ priv->ips_dts = GST_CLOCK_TIME_NONE;
+ priv->packet_spacing = 0;
priv->next_in_seqnum = -1;
priv->clock_rate = -1;
priv->eos = FALSE;
priv->last_elapsed = 0;
priv->ext_timestamp = -1;
GST_DEBUG_OBJECT (jitterbuffer, "flush and reset jitterbuffer");
- rtp_jitter_buffer_flush (priv->jbuf);
+ rtp_jitter_buffer_flush (priv->jbuf, (GFunc) free_item, NULL);
rtp_jitter_buffer_reset_skew (priv->jbuf);
remove_all_timers (jitterbuffer);
JBUF_UNLOCK (priv);
priv->last_pt = -1;
/* block until we go to PLAYING */
priv->blocked = TRUE;
+ priv->timer_running = TRUE;
+ priv->timer_thread =
+ g_thread_new ("timer", (GThreadFunc) wait_next_timeout, jitterbuffer);
JBUF_UNLOCK (priv);
break;
case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
JBUF_LOCK (priv);
/* unblock to allow streaming in PLAYING */
priv->blocked = FALSE;
- JBUF_SIGNAL (priv);
+ JBUF_SIGNAL_EVENT (priv);
+ JBUF_SIGNAL_TIMER (priv);
JBUF_UNLOCK (priv);
break;
default:
JBUF_LOCK (priv);
/* block to stop streaming when PAUSED */
priv->blocked = TRUE;
+ unschedule_current_timer (jitterbuffer);
JBUF_UNLOCK (priv);
if (ret != GST_STATE_CHANGE_FAILURE)
ret = GST_STATE_CHANGE_NO_PREROLL;
break;
case GST_STATE_CHANGE_PAUSED_TO_READY:
+ JBUF_LOCK (priv);
gst_buffer_replace (&priv->last_sr, NULL);
+ priv->timer_running = FALSE;
+ unschedule_current_timer (jitterbuffer);
+ JBUF_SIGNAL_TIMER (priv);
+ JBUF_UNLOCK (priv);
+ g_thread_join (priv->timer_thread);
+ priv->timer_thread = NULL;
break;
case GST_STATE_CHANGE_READY_TO_NULL:
break;
break;
}
case GST_EVENT_FLUSH_START:
- gst_rtp_jitter_buffer_flush_start (jitterbuffer);
ret = gst_pad_push_event (priv->srcpad, event);
+ gst_rtp_jitter_buffer_flush_start (jitterbuffer);
+ /* wait for the loop to go into PAUSED */
+ gst_pad_pause_task (priv->srcpad);
break;
case GST_EVENT_FLUSH_STOP:
ret = gst_pad_push_event (priv->srcpad, event);
if (ret && !priv->eos) {
GST_INFO_OBJECT (jitterbuffer, "queuing EOS");
priv->eos = TRUE;
- JBUF_SIGNAL (priv);
+ JBUF_SIGNAL_EVENT (priv);
} else if (priv->eos) {
GST_DEBUG_OBJECT (jitterbuffer, "dropping EOS, we are already EOS");
} else {
gst_element_post_message (GST_ELEMENT_CAST (jitterbuffer), message);
}
-static GstFlowReturn
-gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent,
- GstBuffer * buffer)
+static GstClockTime
+apply_offset (GstRtpJitterBuffer * jitterbuffer, GstClockTime timestamp)
{
- GstRtpJitterBuffer *jitterbuffer;
GstRtpJitterBufferPrivate *priv;
- guint16 seqnum;
- GstFlowReturn ret = GST_FLOW_OK;
- GstClockTime timestamp;
- guint64 latency_ts;
- gboolean tail;
- gint percent = -1;
- guint8 pt;
- GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
-
- jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
priv = jitterbuffer->priv;
- if (G_UNLIKELY (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp)))
- goto invalid_buffer;
-
- pt = gst_rtp_buffer_get_payload_type (&rtp);
- seqnum = gst_rtp_buffer_get_seq (&rtp);
- gst_rtp_buffer_unmap (&rtp);
-
- /* take the timestamp of the buffer. This is the time when the packet was
- * received and is used to calculate jitter and clock skew. We will adjust
- * this timestamp with the smoothed value after processing it in the
- * jitterbuffer. */
- timestamp = GST_BUFFER_TIMESTAMP (buffer);
- /* bring to running time */
- timestamp = gst_segment_to_running_time (&priv->segment, GST_FORMAT_TIME,
- timestamp);
-
- GST_DEBUG_OBJECT (jitterbuffer,
- "Received packet #%d at time %" GST_TIME_FORMAT, seqnum,
- GST_TIME_ARGS (timestamp));
-
- JBUF_LOCK_CHECK (priv, out_flushing);
+ if (timestamp == -1)
+ return -1;
- if (G_UNLIKELY (priv->last_pt != pt)) {
- GstCaps *caps;
+ /* apply the timestamp offset, this is used for inter stream sync */
+ timestamp += priv->ts_offset;
+ /* add the offset, this is used when buffering */
+ timestamp += priv->out_offset;
- GST_DEBUG_OBJECT (jitterbuffer, "pt changed from %u to %u", priv->last_pt,
- pt);
+ return timestamp;
+}
- priv->last_pt = pt;
- /* reset clock-rate so that we get a new one */
- priv->clock_rate = -1;
+static TimerData *
+find_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type, guint16 seqnum)
+{
+ GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
+ TimerData *timer = NULL;
+ gint i, len;
- /* Try to get the clock-rate from the caps first if we can. If there are no
- * caps we must fire the signal to get the clock-rate. */
- if ((caps = gst_pad_get_current_caps (pad))) {
- gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps);
- gst_caps_unref (caps);
+ len = priv->timers->len;
+ for (i = 0; i < len; i++) {
+ TimerData *test = &g_array_index (priv->timers, TimerData, i);
+ if (test->seqnum == seqnum && test->type == type) {
+ timer = test;
+ break;
}
}
+ return timer;
+}
- if (G_UNLIKELY (priv->clock_rate == -1)) {
- /* no clock rate given on the caps, try to get one with the signal */
- if (gst_rtp_jitter_buffer_get_clock_rate (jitterbuffer,
- pt) == GST_FLOW_FLUSHING)
- goto out_flushing;
+static void
+unschedule_current_timer (GstRtpJitterBuffer * jitterbuffer)
+{
+ GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
- if (G_UNLIKELY (priv->clock_rate == -1))
- goto no_clock_rate;
+ if (priv->clock_id) {
+ GST_DEBUG_OBJECT (jitterbuffer, "unschedule current timer");
+ gst_clock_id_unschedule (priv->clock_id);
+ priv->clock_id = NULL;
}
+}
- /* don't accept more data on EOS */
- if (G_UNLIKELY (priv->eos))
- goto have_eos;
+static GstClockTime
+get_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
+{
+ GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
+ GstClockTime test_timeout;
- /* now check against our expected seqnum */
- if (G_LIKELY (priv->next_in_seqnum != -1)) {
- gint gap;
- gboolean reset = FALSE;
-
- gap = gst_rtp_buffer_compare_seqnum (priv->next_in_seqnum, seqnum);
- if (G_UNLIKELY (gap != 0)) {
- GST_DEBUG_OBJECT (jitterbuffer, "expected #%d, got #%d, gap of %d",
- priv->next_in_seqnum, seqnum, gap);
- /* priv->next_in_seqnum >= seqnum, this packet is too late or the
- * sender might have been restarted with different seqnum. */
- if (gap < -RTP_MAX_MISORDER) {
- GST_DEBUG_OBJECT (jitterbuffer, "reset: buffer too old %d", gap);
- reset = TRUE;
- }
- /* priv->next_in_seqnum < seqnum, this is a new packet */
- else if (G_UNLIKELY (gap > RTP_MAX_DROPOUT)) {
- GST_DEBUG_OBJECT (jitterbuffer, "reset: too many dropped packets %d",
- gap);
- reset = TRUE;
- } else {
- GST_DEBUG_OBJECT (jitterbuffer, "tolerable gap");
- }
- }
- if (G_UNLIKELY (reset)) {
- GST_DEBUG_OBJECT (jitterbuffer, "flush and reset jitterbuffer");
- rtp_jitter_buffer_flush (priv->jbuf);
- rtp_jitter_buffer_reset_skew (priv->jbuf);
- remove_all_timers (jitterbuffer);
- priv->last_popped_seqnum = -1;
- priv->next_seqnum = seqnum;
- }
+ if ((test_timeout = timer->timeout) == -1)
+ return -1;
+
+ if (timer->type != TIMER_TYPE_EXPECTED) {
+ /* add our latency and offset to get output times. */
+ test_timeout = apply_offset (jitterbuffer, test_timeout);
+ test_timeout += priv->latency_ns;
}
- priv->next_in_seqnum = (seqnum + 1) & 0xffff;
+ return test_timeout;
+}
- /* let's check if this buffer is too late, we can only accept packets with
- * bigger seqnum than the one we last pushed. */
- if (G_LIKELY (priv->last_popped_seqnum != -1)) {
- gint gap;
+static void
+recalculate_timer (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
+{
+ GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
- gap = gst_rtp_buffer_compare_seqnum (priv->last_popped_seqnum, seqnum);
+ if (priv->clock_id) {
+ GstClockTime timeout = get_timeout (jitterbuffer, timer);
- /* priv->last_popped_seqnum >= seqnum, we're too late. */
- if (G_UNLIKELY (gap <= 0))
- goto too_late;
+ GST_DEBUG ("%" GST_TIME_FORMAT " <> %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (timeout), GST_TIME_ARGS (priv->timer_timeout));
+
+ if (timeout == -1 || timeout < priv->timer_timeout)
+ unschedule_current_timer (jitterbuffer);
}
+}
- /* let's drop oldest packet if the queue is already full and drop-on-latency
- * is set. We can only do this when there actually is a latency. When no
- * latency is set, we just pump it in the queue and let the other end push it
- * out as fast as possible. */
- if (priv->latency_ms && priv->drop_on_latency) {
- latency_ts =
- gst_util_uint64_scale_int (priv->latency_ms, priv->clock_rate, 1000);
+static TimerData *
+add_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type,
+ guint16 seqnum, guint num, GstClockTime timeout, GstClockTime delay,
+ GstClockTime duration)
+{
+ GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
+ TimerData *timer;
+ gint len;
- if (G_UNLIKELY (rtp_jitter_buffer_get_ts_diff (priv->jbuf) >= latency_ts)) {
- GstBuffer *old_buf;
+ GST_DEBUG_OBJECT (jitterbuffer,
+ "add timer for seqnum %d to %" GST_TIME_FORMAT ", delay %"
+ GST_TIME_FORMAT, seqnum, GST_TIME_ARGS (timeout), GST_TIME_ARGS (delay));
- old_buf = rtp_jitter_buffer_pop (priv->jbuf, &percent);
+ len = priv->timers->len;
+ g_array_set_size (priv->timers, len + 1);
+ timer = &g_array_index (priv->timers, TimerData, len);
+ timer->idx = len;
+ timer->type = type;
+ timer->seqnum = seqnum;
+ timer->num = num;
+ timer->timeout = timeout + delay;
+ timer->duration = duration;
+ if (type == TIMER_TYPE_EXPECTED) {
+ timer->rtx_base = timeout;
+ timer->rtx_delay = delay;
+ timer->rtx_retry = 0;
+ }
+ timer->num_rtx_retry = 0;
+ recalculate_timer (jitterbuffer, timer);
+ JBUF_SIGNAL_TIMER (priv);
- GST_DEBUG_OBJECT (jitterbuffer, "Queue full, dropping old packet %p",
- old_buf);
+ return timer;
+}
- gst_buffer_unref (old_buf);
- }
- }
+static void
+reschedule_timer (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
+ guint16 seqnum, GstClockTime timeout, GstClockTime delay, gboolean reset)
+{
+ GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
+ gboolean seqchange, timechange;
+ guint16 oldseq;
- /* we need to make the metadata writable before pushing it in the jitterbuffer
- * because the jitterbuffer will update the timestamp */
- buffer = gst_buffer_make_writable (buffer);
+ seqchange = timer->seqnum != seqnum;
+ timechange = timer->timeout != timeout;
- /* now insert the packet into the queue in sorted order. This function returns
- * FALSE if a packet with the same seqnum was already in the queue, meaning we
- * have a duplicate. */
- if (G_UNLIKELY (!rtp_jitter_buffer_insert (priv->jbuf, buffer, timestamp,
- priv->clock_rate, &tail, &percent)))
- goto duplicate;
+ if (!seqchange && !timechange)
+ return;
- /* we had an unhandled SR, handle it now */
- if (priv->last_sr)
- do_handle_sync (jitterbuffer);
+ oldseq = timer->seqnum;
- /* signal addition of new buffer when the _loop is waiting. */
- if (priv->waiting && priv->active)
- JBUF_SIGNAL (priv);
+ GST_DEBUG_OBJECT (jitterbuffer,
+ "replace timer for seqnum %d->%d to %" GST_TIME_FORMAT,
+ oldseq, seqnum, GST_TIME_ARGS (timeout + delay));
+
+ timer->timeout = timeout + delay;
+ timer->seqnum = seqnum;
+ if (reset) {
+ timer->rtx_base = timeout;
+ timer->rtx_delay = delay;
+ timer->rtx_retry = 0;
+ }
- /* let's unschedule and unblock any waiting buffers. We only want to do this
- * when the tail buffer changed */
- if (G_UNLIKELY (priv->clock_id && tail)) {
- GST_DEBUG_OBJECT (jitterbuffer, "Unscheduling waiting new buffer");
- gst_clock_id_unschedule (priv->clock_id);
- priv->unscheduled = TRUE;
+ if (priv->clock_id) {
+ /* we changed the seqnum and there is a timer currently waiting with this
+ * seqnum, unschedule it */
+ if (seqchange && priv->timer_seqnum == oldseq)
+ unschedule_current_timer (jitterbuffer);
+ /* we changed the time, check if it is earlier than what we are waiting
+ * for and unschedule if so */
+ else if (timechange)
+ recalculate_timer (jitterbuffer, timer);
}
+}
- GST_DEBUG_OBJECT (jitterbuffer, "Pushed packet #%d, now %d packets, tail: %d",
- seqnum, rtp_jitter_buffer_num_packets (priv->jbuf), tail);
+static TimerData *
+set_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type,
+ guint16 seqnum, GstClockTime timeout)
+{
+ TimerData *timer;
- check_buffering_percent (jitterbuffer, &percent);
+ /* find the seqnum timer */
+ timer = find_timer (jitterbuffer, type, seqnum);
+ if (timer == NULL) {
+ timer = add_timer (jitterbuffer, type, seqnum, 0, timeout, 0, -1);
+ } else {
+ reschedule_timer (jitterbuffer, timer, seqnum, timeout, 0, FALSE);
+ }
+ return timer;
+}
-finished:
- JBUF_UNLOCK (priv);
+static void
+remove_timer (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
+{
+ GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
+ guint idx;
- if (percent != -1)
- post_buffering_percent (jitterbuffer, percent);
+ if (priv->clock_id && priv->timer_seqnum == timer->seqnum)
+ unschedule_current_timer (jitterbuffer);
- return ret;
+ idx = timer->idx;
+ GST_DEBUG_OBJECT (jitterbuffer, "removed index %d", idx);
+ g_array_remove_index_fast (priv->timers, idx);
+ timer->idx = idx;
+}
- /* ERRORS */
-invalid_buffer:
- {
- /* this is not fatal but should be filtered earlier */
- GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
- ("Received invalid RTP payload, dropping"));
+static void
+remove_all_timers (GstRtpJitterBuffer * jitterbuffer)
+{
+ GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
+ GST_DEBUG_OBJECT (jitterbuffer, "removed all timers");
+ g_array_set_size (priv->timers, 0);
+ unschedule_current_timer (jitterbuffer);
+}
+
+/* we just received a packet with seqnum and dts.
+ *
+ * First check for old seqnum that we are still expecting. If the gap with the
+ * current seqnum is too big, unschedule the timeouts.
+ *
+ * If we have a valid packet spacing estimate we can set a timer for when we
+ * should receive the next packet.
+ * If we don't have a valid estimate, we remove any timer we might have
+ * had for this packet.
+ */
+static void
+update_timers (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum,
+ GstClockTime dts, gboolean do_next_seqnum)
+{
+ GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
+ TimerData *timer = NULL;
+ gint i, len;
+
+ /* go through all timers and unschedule the ones with a large gap, also find
+ * the timer for the seqnum */
+ len = priv->timers->len;
+ for (i = 0; i < len; i++) {
+ TimerData *test = &g_array_index (priv->timers, TimerData, i);
+ gint gap;
+
+ gap = gst_rtp_buffer_compare_seqnum (test->seqnum, seqnum);
+
+ GST_DEBUG_OBJECT (jitterbuffer, "%d, #%d<->#%d gap %d", i,
+ test->seqnum, seqnum, gap);
+
+ if (gap == 0) {
+ GST_DEBUG ("found timer for current seqnum");
+ /* the timer for the current seqnum */
+ timer = test;
+ } else if (gap > priv->rtx_delay_reorder) {
+ /* max gap, we exceeded the max reorder distance and we don't expect the
+ * missing packet to be this reordered */
+ if (test->num_rtx_retry == 0 && test->type == TIMER_TYPE_EXPECTED)
+ reschedule_timer (jitterbuffer, test, test->seqnum, -1, 0, FALSE);
+ }
+ }
+
+ if (priv->packet_spacing > 0 && do_next_seqnum && priv->do_retransmission) {
+ GstClockTime expected, delay;
+
+ /* calculate expected arrival time of the next seqnum */
+ expected = dts + priv->packet_spacing;
+ delay = priv->rtx_delay * GST_MSECOND;
+
+ /* and update/install timer for next seqnum */
+ if (timer)
+ reschedule_timer (jitterbuffer, timer, priv->next_in_seqnum, expected,
+ delay, TRUE);
+ else
+ add_timer (jitterbuffer, TIMER_TYPE_EXPECTED, priv->next_in_seqnum, 0,
+ expected, delay, priv->packet_spacing);
+ } else if (timer && timer->type != TIMER_TYPE_DEADLINE) {
+
+ if (timer->num_rtx_retry > 0) {
+ GstClockTime rtx_last;
+
+ /* we scheduled a retry for this packet and now we have it */
+ priv->num_rtx_success++;
+ /* all the previous retry attempts failed */
+ priv->num_rtx_failed += timer->num_rtx_retry - 1;
+ /* number of retries before receiving the packet */
+ if (priv->avg_rtx_num == 0.0)
+ priv->avg_rtx_num = timer->num_rtx_retry;
+ else
+ priv->avg_rtx_num = (timer->num_rtx_retry + 7 * priv->avg_rtx_num) / 8;
+ /* calculate the delay between retransmission request and receiving this
+ * packet, start with when we scheduled this timeout last */
+ rtx_last = timer->rtx_last;
+ if (dts > rtx_last) {
+ GstClockTime delay;
+ /* we have a valid delay if this packet arrived after we scheduled the
+ * request */
+ delay = dts - rtx_last;
+ if (priv->avg_rtx_rtt == 0)
+ priv->avg_rtx_rtt = delay;
+ else
+ priv->avg_rtx_rtt = (delay + 7 * priv->avg_rtx_rtt) / 8;
+ }
+ GST_LOG_OBJECT (jitterbuffer,
+ "RTX success %" G_GUINT64_FORMAT ", failed %" G_GUINT64_FORMAT
+ ", requests %" G_GUINT64_FORMAT ", dups %" G_GUINT64_FORMAT
+ ", avg-num %g, avg-rtt %" G_GUINT64_FORMAT, priv->num_rtx_success,
+ priv->num_rtx_failed, priv->num_rtx_requests, priv->num_duplicates,
+ priv->avg_rtx_num, priv->avg_rtx_rtt);
+ }
+ /* if we had a timer, remove it, we don't know when to expect the next
+ * packet. */
+ remove_timer (jitterbuffer, timer);
+ /* we signal the _loop function because this new packet could be the one
+ * it was waiting for */
+ JBUF_SIGNAL_EVENT (priv);
+ }
+}
+
+static void
+calculate_packet_spacing (GstRtpJitterBuffer * jitterbuffer, guint32 rtptime,
+ GstClockTime dts)
+{
+ GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
+
+ /* we need consecutive seqnums with a different
+ * rtptime to estimate the packet spacing. */
+ if (priv->ips_rtptime != rtptime) {
+ /* rtptime changed, check dts diff */
+ if (priv->ips_dts != -1 && dts != -1 && dts > priv->ips_dts) {
+ priv->packet_spacing = dts - priv->ips_dts;
+ GST_DEBUG_OBJECT (jitterbuffer,
+ "new packet spacing %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (priv->packet_spacing));
+ }
+ priv->ips_rtptime = rtptime;
+ priv->ips_dts = dts;
+ }
+}
+
+static void
+calculate_expected (GstRtpJitterBuffer * jitterbuffer, guint32 expected,
+ guint16 seqnum, GstClockTime dts, gint gap)
+{
+ GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
+ GstClockTime total_duration, duration, expected_dts;
+ TimerType type;
+
+ GST_DEBUG_OBJECT (jitterbuffer,
+ "dts %" GST_TIME_FORMAT ", last %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (dts), GST_TIME_ARGS (priv->last_in_dts));
+
+ /* the total duration spanned by the missing packets */
+ if (dts >= priv->last_in_dts)
+ total_duration = dts - priv->last_in_dts;
+ else
+ total_duration = 0;
+
+ /* interpolate between the current time and the last time based on
+ * number of packets we are missing, this is the estimated duration
+ * for the missing packet based on equidistant packet spacing. */
+ duration = total_duration / (gap + 1);
+
+ GST_DEBUG_OBJECT (jitterbuffer, "duration %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (duration));
+
+ if (total_duration > priv->latency_ns) {
+ GstClockTime gap_time;
+ guint lost_packets;
+
+ gap_time = total_duration - priv->latency_ns;
+
+ if (duration > 0) {
+ lost_packets = gap_time / duration;
+ gap_time = lost_packets * duration;
+ } else {
+ lost_packets = gap;
+ }
+
+ /* too many lost packets, some of the missing packets are already
+ * too late and we can generate lost packet events for them. */
+ GST_DEBUG_OBJECT (jitterbuffer, "too many lost packets %" GST_TIME_FORMAT
+ " > %" GST_TIME_FORMAT ", consider %u lost",
+ GST_TIME_ARGS (total_duration), GST_TIME_ARGS (priv->latency_ns),
+ lost_packets);
+
+ /* this timer will fire immediately and the lost event will be pushed from
+ * the timer thread */
+ add_timer (jitterbuffer, TIMER_TYPE_LOST, expected, lost_packets,
+ priv->last_in_dts + duration, 0, gap_time);
+
+ expected += lost_packets;
+ priv->last_in_dts += gap_time;
+ }
+
+ expected_dts = priv->last_in_dts + duration;
+
+ if (priv->do_retransmission) {
+ TimerData *timer;
+
+ type = TIMER_TYPE_EXPECTED;
+ /* if we had a timer for the first missing packet, update it. */
+ if ((timer = find_timer (jitterbuffer, type, expected))) {
+ GstClockTime timeout = timer->timeout;
+
+ timer->duration = duration;
+ if (timeout > expected_dts) {
+ GstClockTime delay = timeout - expected_dts - timer->rtx_retry;
+ reschedule_timer (jitterbuffer, timer, timer->seqnum, expected_dts,
+ delay, TRUE);
+ }
+ expected++;
+ expected_dts += duration;
+ }
+ } else {
+ type = TIMER_TYPE_LOST;
+ }
+
+ while (expected < seqnum) {
+ add_timer (jitterbuffer, type, expected, 0, expected_dts, 0, duration);
+ expected_dts += duration;
+ expected++;
+ }
+}
+
+static GstFlowReturn
+gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent,
+ GstBuffer * buffer)
+{
+ GstRtpJitterBuffer *jitterbuffer;
+ GstRtpJitterBufferPrivate *priv;
+ guint16 seqnum;
+ guint32 expected, rtptime;
+ GstFlowReturn ret = GST_FLOW_OK;
+ GstClockTime dts, pts;
+ guint64 latency_ts;
+ gboolean tail;
+ gint percent = -1;
+ guint8 pt;
+ GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
+ gboolean do_next_seqnum = FALSE;
+ RTPJitterBufferItem *item;
+
+ jitterbuffer = GST_RTP_JITTER_BUFFER (parent);
+
+ priv = jitterbuffer->priv;
+
+ if (G_UNLIKELY (!gst_rtp_buffer_map (buffer, GST_MAP_READ, &rtp)))
+ goto invalid_buffer;
+
+ pt = gst_rtp_buffer_get_payload_type (&rtp);
+ seqnum = gst_rtp_buffer_get_seq (&rtp);
+ rtptime = gst_rtp_buffer_get_timestamp (&rtp);
+ gst_rtp_buffer_unmap (&rtp);
+
+ /* make sure we have PTS and DTS set */
+ pts = GST_BUFFER_PTS (buffer);
+ dts = GST_BUFFER_DTS (buffer);
+ if (dts == -1)
+ dts = pts;
+ else if (pts == -1)
+ pts = dts;
+
+ /* take the DTS of the buffer. This is the time when the packet was
+ * received and is used to calculate jitter and clock skew. We will adjust
+ * this DTS with the smoothed value after processing it in the
+ * jitterbuffer and assign it as the PTS. */
+ /* bring to running time */
+ dts = gst_segment_to_running_time (&priv->segment, GST_FORMAT_TIME, dts);
+
+ GST_DEBUG_OBJECT (jitterbuffer,
+ "Received packet #%d at time %" GST_TIME_FORMAT ", discont %d", seqnum,
+ GST_TIME_ARGS (dts), GST_BUFFER_IS_DISCONT (buffer));
+
+ JBUF_LOCK_CHECK (priv, out_flushing);
+
+ if (G_UNLIKELY (priv->last_pt != pt)) {
+ GstCaps *caps;
+
+ GST_DEBUG_OBJECT (jitterbuffer, "pt changed from %u to %u", priv->last_pt,
+ pt);
+
+ priv->last_pt = pt;
+ /* reset clock-rate so that we get a new one */
+ priv->clock_rate = -1;
+
+ /* Try to get the clock-rate from the caps first if we can. If there are no
+ * caps we must fire the signal to get the clock-rate. */
+ if ((caps = gst_pad_get_current_caps (pad))) {
+ gst_jitter_buffer_sink_parse_caps (jitterbuffer, caps);
+ gst_caps_unref (caps);
+ }
+ }
+
+ if (G_UNLIKELY (priv->clock_rate == -1)) {
+ /* no clock rate given on the caps, try to get one with the signal */
+ if (gst_rtp_jitter_buffer_get_clock_rate (jitterbuffer,
+ pt) == GST_FLOW_FLUSHING)
+ goto out_flushing;
+
+ if (G_UNLIKELY (priv->clock_rate == -1))
+ goto no_clock_rate;
+ }
+
+ /* don't accept more data on EOS */
+ if (G_UNLIKELY (priv->eos))
+ goto have_eos;
+
+ expected = priv->next_in_seqnum;
+
+ /* now check against our expected seqnum */
+ if (G_LIKELY (expected != -1)) {
+ gint gap;
+
+ /* now calculate gap */
+ gap = gst_rtp_buffer_compare_seqnum (expected, seqnum);
+
+ GST_DEBUG_OBJECT (jitterbuffer, "expected #%d, got #%d, gap of %d",
+ expected, seqnum, gap);
+
+ if (G_LIKELY (gap == 0)) {
+ /* packet is expected */
+ calculate_packet_spacing (jitterbuffer, rtptime, dts);
+ do_next_seqnum = TRUE;
+ } else {
+ gboolean reset = FALSE;
+
+ if (gap < 0) {
+ /* we received an old packet */
+ if (G_UNLIKELY (gap < -RTP_MAX_MISORDER)) {
+ /* too old packet, reset */
+ GST_DEBUG_OBJECT (jitterbuffer, "reset: buffer too old %d < %d", gap,
+ -RTP_MAX_MISORDER);
+ reset = TRUE;
+ } else {
+ GST_DEBUG_OBJECT (jitterbuffer, "old packet received");
+ }
+ } else {
+ /* new packet, we are missing some packets */
+ if (G_UNLIKELY (gap > RTP_MAX_DROPOUT)) {
+ /* packet too far in future, reset */
+ GST_DEBUG_OBJECT (jitterbuffer, "reset: buffer too new %d > %d", gap,
+ RTP_MAX_DROPOUT);
+ reset = TRUE;
+ } else {
+ GST_DEBUG_OBJECT (jitterbuffer, "%d missing packets", gap);
+ /* fill in the gap with EXPECTED timers */
+ calculate_expected (jitterbuffer, expected, seqnum, dts, gap);
+
+ do_next_seqnum = TRUE;
+ }
+ }
+ if (G_UNLIKELY (reset)) {
+ GST_DEBUG_OBJECT (jitterbuffer, "flush and reset jitterbuffer");
+ rtp_jitter_buffer_flush (priv->jbuf, (GFunc) free_item, NULL);
+ rtp_jitter_buffer_reset_skew (priv->jbuf);
+ remove_all_timers (jitterbuffer);
+ priv->last_popped_seqnum = -1;
+ priv->next_seqnum = seqnum;
+ do_next_seqnum = TRUE;
+ }
+ /* reset spacing estimation when gap */
+ priv->ips_rtptime = -1;
+ priv->ips_dts = GST_CLOCK_TIME_NONE;
+ }
+ } else {
+ GST_DEBUG_OBJECT (jitterbuffer, "First buffer #%d", seqnum);
+ /* we don't know what the next_in_seqnum should be, wait for the last
+ * possible moment to push this buffer, maybe we get an earlier seqnum
+ * while we wait */
+ set_timer (jitterbuffer, TIMER_TYPE_DEADLINE, seqnum, dts);
+ do_next_seqnum = TRUE;
+ /* take rtptime and dts to calculate packet spacing */
+ priv->ips_rtptime = rtptime;
+ priv->ips_dts = dts;
+ }
+ if (do_next_seqnum) {
+ priv->last_in_seqnum = seqnum;
+ priv->last_in_dts = dts;
+ priv->next_in_seqnum = (seqnum + 1) & 0xffff;
+ }
+
+ /* let's check if this buffer is too late, we can only accept packets with
+ * bigger seqnum than the one we last pushed. */
+ if (G_LIKELY (priv->last_popped_seqnum != -1)) {
+ gint gap;
+
+ gap = gst_rtp_buffer_compare_seqnum (priv->last_popped_seqnum, seqnum);
+
+ /* priv->last_popped_seqnum >= seqnum, we're too late. */
+ if (G_UNLIKELY (gap <= 0))
+ goto too_late;
+ }
+
+ /* let's drop oldest packet if the queue is already full and drop-on-latency
+ * is set. We can only do this when there actually is a latency. When no
+ * latency is set, we just pump it in the queue and let the other end push it
+ * out as fast as possible. */
+ if (priv->latency_ms && priv->drop_on_latency) {
+ latency_ts =
+ gst_util_uint64_scale_int (priv->latency_ms, priv->clock_rate, 1000);
+
+ if (G_UNLIKELY (rtp_jitter_buffer_get_ts_diff (priv->jbuf) >= latency_ts)) {
+ RTPJitterBufferItem *old_item;
+
+ old_item = rtp_jitter_buffer_pop (priv->jbuf, &percent);
+ GST_DEBUG_OBJECT (jitterbuffer, "Queue full, dropping old packet %p",
+ old_item);
+ free_item (old_item);
+ }
+ }
+
+ item = alloc_item (buffer, ITEM_TYPE_BUFFER, dts, pts, seqnum, 1, rtptime);
+
+ /* now insert the packet into the queue in sorted order. This function returns
+ * FALSE if a packet with the same seqnum was already in the queue, meaning we
+ * have a duplicate. */
+ if (G_UNLIKELY (!rtp_jitter_buffer_insert (priv->jbuf, item,
+ &tail, &percent)))
+ goto duplicate;
+
+ /* update timers */
+ update_timers (jitterbuffer, seqnum, dts, do_next_seqnum);
+
+ /* we had an unhandled SR, handle it now */
+ if (priv->last_sr)
+ do_handle_sync (jitterbuffer);
+
+ /* signal addition of new buffer when the _loop is waiting. */
+ if (priv->active && priv->waiting_timer)
+ JBUF_SIGNAL_EVENT (priv);
+
+ /* let's unschedule and unblock any waiting buffers. We only want to do this
+ * when the tail buffer changed */
+ if (G_UNLIKELY (priv->clock_id && tail)) {
+ GST_DEBUG_OBJECT (jitterbuffer, "Unscheduling waiting new buffer");
+ unschedule_current_timer (jitterbuffer);
+ }
+
+ GST_DEBUG_OBJECT (jitterbuffer, "Pushed packet #%d, now %d packets, tail: %d",
+ seqnum, rtp_jitter_buffer_num_packets (priv->jbuf), tail);
+
+ check_buffering_percent (jitterbuffer, &percent);
+
+finished:
+ JBUF_UNLOCK (priv);
+
+ if (percent != -1)
+ post_buffering_percent (jitterbuffer, percent);
+
+ return ret;
+
+ /* ERRORS */
+invalid_buffer:
+ {
+ /* this is not fatal but should be filtered earlier */
+ GST_ELEMENT_WARNING (jitterbuffer, STREAM, DECODE, (NULL),
+ ("Received invalid RTP payload, dropping"));
gst_buffer_unref (buffer);
return GST_FLOW_OK;
}
GST_WARNING_OBJECT (jitterbuffer, "Duplicate packet #%d detected, dropping",
seqnum);
priv->num_duplicates++;
- gst_buffer_unref (buffer);
+ free_item (item);
goto finished;
}
}
static GstClockTime
-apply_offset (GstRtpJitterBuffer * jitterbuffer, GstClockTime timestamp)
-{
- GstRtpJitterBufferPrivate *priv;
-
- priv = jitterbuffer->priv;
-
- if (timestamp == -1)
- return -1;
-
- /* apply the timestamp offset, this is used for inter stream sync */
- timestamp += priv->ts_offset;
- /* add the offset, this is used when buffering */
- timestamp += priv->out_offset;
-
- return timestamp;
-}
-
-#define GST_FLOW_WAIT GST_FLOW_CUSTOM_SUCCESS
-
-static TimerData *
-find_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type,
- guint16 seqnum, gboolean * created)
-{
- GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
- TimerData *timer;
- gint i, len;
- gboolean found = FALSE;
-
- len = priv->timers->len;
- for (i = 0; i < len; i++) {
- timer = &g_array_index (priv->timers, TimerData, i);
- if (timer->seqnum == seqnum && timer->type == type) {
- found = TRUE;
- break;
- }
- }
- if (!found) {
- /* not found, create */
- g_array_set_size (priv->timers, len + 1);
- timer = &g_array_index (priv->timers, TimerData, len);
- timer->idx = len;
- timer->type = type;
- timer->seqnum = seqnum;
- }
- if (created)
- *created = !found;
-
- return timer;
-}
-
-static GstFlowReturn
-set_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type,
- guint16 seqnum, GstClockTime timeout)
-{
- TimerData *timer;
-
- GST_DEBUG_OBJECT (jitterbuffer,
- "set timer for seqnum %d to %" GST_TIME_FORMAT, seqnum,
- GST_TIME_ARGS (timeout));
-
- /* find the seqnum timer */
- timer = find_timer (jitterbuffer, type, seqnum, NULL);
- timer->timeout = timeout;
-
- return GST_FLOW_WAIT;
-}
-
-static void
-remove_timer (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
-{
- GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
- guint idx;
-
- idx = timer->idx;
- GST_DEBUG_OBJECT (jitterbuffer, "removed index %d", idx);
- g_array_remove_index_fast (priv->timers, idx);
- timer->idx = idx;
-}
-
-static void
-remove_all_timers (GstRtpJitterBuffer * jitterbuffer)
-{
- GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
- GST_DEBUG_OBJECT (jitterbuffer, "removed all timers");
- g_array_set_size (priv->timers, 0);
-}
-
-static GstClockTime
-compute_elapsed (GstRtpJitterBuffer * jitterbuffer, GstBuffer * outbuf)
+compute_elapsed (GstRtpJitterBuffer * jitterbuffer, RTPJitterBufferItem * item)
{
guint64 ext_time, elapsed;
guint32 rtp_time;
GstRtpJitterBufferPrivate *priv;
- GstRTPBuffer rtp = GST_RTP_BUFFER_INIT;
priv = jitterbuffer->priv;
- gst_rtp_buffer_map (outbuf, GST_MAP_READ, &rtp);
- rtp_time = gst_rtp_buffer_get_timestamp (&rtp);
- gst_rtp_buffer_unmap (&rtp);
+ rtp_time = item->rtptime;
GST_LOG_OBJECT (jitterbuffer, "rtp %" G_GUINT32_FORMAT ", ext %"
G_GUINT64_FORMAT, rtp_time, priv->ext_timestamp);
}
static void
-update_estimated_eos (GstRtpJitterBuffer * jitterbuffer, GstBuffer * outbuf)
+update_estimated_eos (GstRtpJitterBuffer * jitterbuffer,
+ RTPJitterBufferItem * item)
{
GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
&& priv->clock_base != -1 && priv->clock_rate > 0) {
guint64 elapsed, estimated;
- elapsed = compute_elapsed (jitterbuffer, outbuf);
+ elapsed = compute_elapsed (jitterbuffer, item);
if (elapsed > priv->last_elapsed || !priv->last_elapsed) {
guint64 left;
GST_LOG_OBJECT (jitterbuffer, "left %" GST_TIME_FORMAT,
GST_TIME_ARGS (left));
- out_time = GST_BUFFER_PTS (outbuf);
+ out_time = item->dts;
if (elapsed > 0)
estimated = gst_util_uint64_scale (out_time, left, elapsed);
/* take a buffer from the queue and push it */
static GstFlowReturn
-pop_and_push_next (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum,
- GstClockTime pts)
+pop_and_push_next (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum)
{
GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
GstFlowReturn result;
+ RTPJitterBufferItem *item;
GstBuffer *outbuf;
+ GstEvent *outevent;
+ GstClockTime dts, pts;
gint percent = -1;
- GstClockTime out_time;
+ gboolean is_buffer, do_push = TRUE;
/* when we get here we are ready to pop and push the buffer */
- outbuf = rtp_jitter_buffer_pop (priv->jbuf, &percent);
+ item = rtp_jitter_buffer_pop (priv->jbuf, &percent);
- check_buffering_percent (jitterbuffer, &percent);
+ is_buffer = GST_IS_BUFFER (item->data);
- if (G_UNLIKELY (priv->discont)) {
- /* set DISCONT flag when we missed a packet. We pushed the buffer writable
- * into the jitterbuffer so we can modify now. */
- GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_DISCONT);
- priv->discont = FALSE;
- }
- if (G_UNLIKELY (priv->ts_discont)) {
- GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_RESYNC);
- priv->ts_discont = FALSE;
- }
+ if (is_buffer) {
+ check_buffering_percent (jitterbuffer, &percent);
+
+ /* we need to make writable to change the flags and timestamps */
+ outbuf = gst_buffer_make_writable (item->data);
+
+ if (G_UNLIKELY (priv->discont)) {
+ /* set DISCONT flag when we missed a packet. We pushed the buffer writable
+ * into the jitterbuffer so we can modify now. */
+ GST_DEBUG_OBJECT (jitterbuffer, "mark output buffer discont");
+ GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_DISCONT);
+ priv->discont = FALSE;
+ }
+ if (G_UNLIKELY (priv->ts_discont)) {
+ GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_RESYNC);
+ priv->ts_discont = FALSE;
+ }
+
+ dts = gst_segment_to_position (&priv->segment, GST_FORMAT_TIME, item->dts);
+ pts = gst_segment_to_position (&priv->segment, GST_FORMAT_TIME, item->pts);
- /* apply timestamp with offset to buffer now */
- out_time = apply_offset (jitterbuffer, pts);
- GST_BUFFER_PTS (outbuf) = out_time;
- GST_BUFFER_DTS (outbuf) = out_time;
+ /* apply timestamp with offset to buffer now */
+ GST_BUFFER_DTS (outbuf) = apply_offset (jitterbuffer, dts);
+ GST_BUFFER_PTS (outbuf) = apply_offset (jitterbuffer, pts);
- /* update the elapsed time when we need to check against the npt stop time. */
- update_estimated_eos (jitterbuffer, outbuf);
+ /* update the elapsed time when we need to check against the npt stop time. */
+ update_estimated_eos (jitterbuffer, item);
+
+ priv->last_out_time = GST_BUFFER_PTS (outbuf);
+ } else {
+ outevent = item->data;
+ if (item->type == ITEM_TYPE_LOST) {
+ priv->discont = TRUE;
+ if (!priv->do_lost)
+ do_push = FALSE;
+ }
+ }
/* now we are ready to push the buffer. Save the seqnum and release the lock
* so the other end can push stuff in the queue again. */
priv->last_popped_seqnum = seqnum;
- priv->last_out_time = out_time;
- priv->last_out_pts = pts;
- priv->next_seqnum = (seqnum + 1) & 0xffff;
+ priv->next_seqnum = (seqnum + item->count) & 0xffff;
JBUF_UNLOCK (priv);
- if (percent != -1)
- post_buffering_percent (jitterbuffer, percent);
+ item->data = NULL;
+ free_item (item);
- /* push buffer */
- GST_DEBUG_OBJECT (jitterbuffer,
- "Pushing buffer %d, timestamp %" GST_TIME_FORMAT, seqnum,
- GST_TIME_ARGS (out_time));
- result = gst_pad_push (priv->srcpad, outbuf);
+ if (is_buffer) {
+ /* push buffer */
+ if (percent != -1)
+ post_buffering_percent (jitterbuffer, percent);
+ GST_DEBUG_OBJECT (jitterbuffer,
+ "Pushing buffer %d, dts %" GST_TIME_FORMAT ", pts %" GST_TIME_FORMAT,
+ seqnum, GST_TIME_ARGS (GST_BUFFER_DTS (outbuf)),
+ GST_TIME_ARGS (GST_BUFFER_PTS (outbuf)));
+ result = gst_pad_push (priv->srcpad, outbuf);
+ } else {
+ GST_DEBUG_OBJECT (jitterbuffer, "Pushing event %d", seqnum);
+
+ if (do_push)
+ gst_pad_push_event (priv->srcpad, outevent);
+ else
+ gst_event_unref (outevent);
+
+ result = GST_FLOW_OK;
+ }
JBUF_LOCK_CHECK (priv, out_flushing);
return result;
}
}
-static GstClockTime
-estimate_pts (GstRtpJitterBuffer * jitterbuffer, GstClockTime pts, gint gap)
-{
- GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
- GstClockTime duration;
-
- if (pts == -1 || priv->last_out_pts == -1)
- return pts;
-
- GST_DEBUG_OBJECT (jitterbuffer,
- "pts %" GST_TIME_FORMAT ", last %" GST_TIME_FORMAT,
- GST_TIME_ARGS (pts), GST_TIME_ARGS (priv->last_out_pts));
-
- /* interpolate between the current time and the last time based on
- * number of packets we are missing, this is the estimated duration
- * for the missing packet based on equidistant packet spacing. Also make
- * sure we never go negative. */
- if (pts >= priv->last_out_pts)
- duration = (pts - priv->last_out_pts) / (gap + 1);
- else
- /* packet already lost, timer will timeout quickly */
- duration = 0;
-
- GST_DEBUG_OBJECT (jitterbuffer, "duration %" GST_TIME_FORMAT,
- GST_TIME_ARGS (duration));
-
- /* add this duration to the timestamp of the last packet we pushed */
- pts = (priv->last_out_pts + duration);
-
- return pts;
-}
+#define GST_FLOW_WAIT GST_FLOW_CUSTOM_SUCCESS
/* Peek a buffer and compare the seqnum to the expected seqnum.
* If all is fine, the buffer is pushed.
- * If something is wrong, a timeout is set. We set 2 kinds of timeouts:
- * * deadline: to the ultimate time we can still push the packet. We
- * do this for the first packet to make sure we have the previous
- * packets.
- * * lost: the ultimate time we can receive a packet before we have
- * to consider it lost. We estimate this based on the packet
- * spacing.
+ * If something is wrong, we wait for some event
*/
static GstFlowReturn
handle_next_buffer (GstRtpJitterBuffer * jitterbuffer)
{
GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
GstFlowReturn result = GST_FLOW_OK;
- GstBuffer *outbuf;
+ RTPJitterBufferItem *item;
guint16 seqnum;
- GstClockTime pts;
guint32 next_seqnum;
gint gap;
- GstRTPBuffer rtp = { NULL, };
/* only push buffers when PLAYING and active and not buffering */
if (priv->blocked || !priv->active ||
again:
/* peek a buffer, we're just looking at the sequence number.
* If all is fine, we'll pop and push it. If the sequence number is wrong we
- * wait on the timestamp. In the chain function we will unlock the wait when a
- * new buffer is available. The peeked buffer is valid for as long as we hold
- * the jitterbuffer lock. */
- outbuf = rtp_jitter_buffer_peek (priv->jbuf);
- if (outbuf == NULL)
+ * wait for a timeout or something to change.
+ * The peeked buffer is valid for as long as we hold the jitterbuffer lock. */
+ item = rtp_jitter_buffer_peek (priv->jbuf);
+ if (item == NULL)
goto wait;
/* get the seqnum and the next expected seqnum */
- gst_rtp_buffer_map (outbuf, GST_MAP_READ, &rtp);
- seqnum = gst_rtp_buffer_get_seq (&rtp);
- gst_rtp_buffer_unmap (&rtp);
+ seqnum = item->seqnum;
next_seqnum = priv->next_seqnum;
- /* get the timestamp, this is already corrected for clock skew by the
- * jitterbuffer */
- pts = GST_BUFFER_PTS (outbuf);
-
/* get the gap between this and the previous packet. If we don't know the
* previous packet seqnum assume no gap. */
if (G_UNLIKELY (next_seqnum == -1)) {
GST_DEBUG_OBJECT (jitterbuffer, "First buffer #%d", seqnum);
- /* we don't know what the next_seqnum should be, wait for the last
- * possible moment to push this buffer, maybe we get an earlier seqnum
- * while we wait */
- result = set_timer (jitterbuffer, TIMER_TYPE_DEADLINE, seqnum, pts);
+ /* we don't know what the next_seqnum should be, the chain function should
+ * have scheduled a DEADLINE timer that will increment next_seqnum when it
+ * fires, so wait for that */
+ result = GST_FLOW_WAIT;
} else {
/* else calculate GAP */
gap = gst_rtp_buffer_compare_seqnum (next_seqnum, seqnum);
if (G_LIKELY (gap == 0)) {
/* no missing packet, pop and push */
- result = pop_and_push_next (jitterbuffer, seqnum, pts);
+ result = pop_and_push_next (jitterbuffer, seqnum);
} else if (G_UNLIKELY (gap < 0)) {
+ RTPJitterBufferItem *item;
/* if we have a packet that we already pushed or considered dropped, pop it
* off and get the next packet */
GST_DEBUG_OBJECT (jitterbuffer, "Old packet #%d, next #%d dropping",
seqnum, next_seqnum);
- outbuf = rtp_jitter_buffer_pop (priv->jbuf, NULL);
- gst_buffer_unref (outbuf);
+ item = rtp_jitter_buffer_pop (priv->jbuf, NULL);
+ free_item (item);
goto again;
} else {
+ /* the chain function has scheduled timers to request retransmission or
+ * when to consider the packet lost, wait for that */
GST_DEBUG_OBJECT (jitterbuffer,
"Sequence number GAP detected: expected %d instead of %d (%d missing)",
next_seqnum, seqnum, gap);
- /* packet missing, estimate when we should ultimately push this packet */
- pts = estimate_pts (jitterbuffer, pts, gap);
- /* and set a timer for it */
- result = set_timer (jitterbuffer, TIMER_TYPE_LOST, next_seqnum, pts);
+ result = GST_FLOW_WAIT;
}
}
return result;
wait:
{
GST_DEBUG_OBJECT (jitterbuffer, "no buffer, going to wait");
- return GST_FLOW_WAIT;
+ if (priv->eos)
+ result = GST_FLOW_EOS;
+ else
+ result = GST_FLOW_WAIT;
+ return result;
}
}
-/* a packet is lost */
-static GstFlowReturn
-do_lost_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
- GstClockTimeDiff clock_jitter)
+/* the timeout for when we expected a packet expired */
+static gboolean
+do_expected_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
+ GstClockTime now)
{
GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
- GstClockTime duration = GST_CLOCK_TIME_NONE;
- guint32 lost_packets = 1;
- gboolean lost_packets_late = FALSE;
-
-#if 0
- if (clock_jitter > 0
- && clock_jitter > (priv->latency_ns + priv->peer_latency)) {
- GstClockTimeDiff total_duration;
- GstClockTime out_time_diff;
-
- out_time_diff =
- apply_offset (jitterbuffer, timer->timeout) - timer->timeout;
- total_duration = MIN (out_time_diff, clock_jitter);
-
- if (duration > 0)
- lost_packets = total_duration / duration;
- else
- lost_packets = gap;
- total_duration = lost_packets * duration;
+ GstEvent *event;
+ guint delay;
+
+ GST_DEBUG_OBJECT (jitterbuffer, "expected %d didn't arrive", timer->seqnum);
+
+ delay = timer->rtx_delay + timer->rtx_retry;
+ event = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM,
+ gst_structure_new ("GstRTPRetransmissionRequest",
+ "seqnum", G_TYPE_UINT, (guint) timer->seqnum,
+ "running-time", G_TYPE_UINT64, timer->rtx_base,
+ "delay", G_TYPE_UINT, GST_TIME_AS_MSECONDS (delay),
+ "retry", G_TYPE_UINT, timer->num_rtx_retry,
+ "frequency", G_TYPE_UINT, priv->rtx_retry_timeout,
+ "period", G_TYPE_UINT, priv->rtx_retry_period,
+ "deadline", G_TYPE_UINT, priv->latency_ms,
+ "packet-spacing", G_TYPE_UINT64, priv->packet_spacing, NULL));
+
+ priv->num_rtx_requests++;
+ timer->num_rtx_retry++;
+ timer->rtx_last = now;
+
+ /* calculate the timeout for the next retransmission attempt */
+ timer->rtx_retry += (priv->rtx_retry_timeout * GST_MSECOND);
+ GST_DEBUG_OBJECT (jitterbuffer, "base %" GST_TIME_FORMAT ", delay %"
+ GST_TIME_FORMAT ", retry %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (timer->rtx_base), GST_TIME_ARGS (timer->rtx_delay),
+ GST_TIME_ARGS (timer->rtx_retry));
+
+ if (timer->rtx_retry + timer->rtx_delay >
+ (priv->rtx_retry_period * GST_MSECOND)) {
+ GST_DEBUG_OBJECT (jitterbuffer, "reschedule as LOST timer");
+ /* too many retransmission request, we now convert the timer
+ * to a lost timer, leave the num_rtx_retry as it is for stats */
+ timer->type = TIMER_TYPE_LOST;
+ timer->rtx_delay = 0;
+ timer->rtx_retry = 0;
+ }
+ reschedule_timer (jitterbuffer, timer, timer->seqnum,
+ timer->rtx_base + timer->rtx_retry, timer->rtx_delay, FALSE);
- GST_DEBUG_OBJECT (jitterbuffer,
- "Current sync_time has expired a long time ago (+%" GST_TIME_FORMAT
- ") Cover up %d lost packets with duration %" GST_TIME_FORMAT,
- GST_TIME_ARGS (clock_jitter),
- lost_packets, GST_TIME_ARGS (total_duration));
+ JBUF_UNLOCK (priv);
+ gst_pad_push_event (priv->sinkpad, event);
+ JBUF_LOCK (priv);
- duration = total_duration;
- lost_packets_late = TRUE;
- }
-#endif
+ return FALSE;
+}
+
+/* a packet is lost */
+static gboolean
+do_lost_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
+ GstClockTime now)
+{
+ GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
+ GstClockTime duration, timestamp;
+ guint seqnum, lost_packets, num_rtx_retry;
+ gboolean late;
+ GstEvent *event;
+ RTPJitterBufferItem *item;
+
+ seqnum = timer->seqnum;
+ timestamp = apply_offset (jitterbuffer, timer->timeout);
+ duration = timer->duration;
+ if (duration == GST_CLOCK_TIME_NONE && priv->packet_spacing > 0)
+ duration = priv->packet_spacing;
+ lost_packets = MAX (timer->num, 1);
+ late = timer->num > 0;
+ num_rtx_retry = timer->num_rtx_retry;
/* we had a gap and thus we lost some packets. Create an event for this. */
if (lost_packets > 1)
- GST_DEBUG_OBJECT (jitterbuffer, "Packets #%d -> #%d lost", timer->seqnum,
- timer->seqnum + lost_packets - 1);
+ GST_DEBUG_OBJECT (jitterbuffer, "Packets #%d -> #%d lost", seqnum,
+ seqnum + lost_packets - 1);
else
- GST_DEBUG_OBJECT (jitterbuffer, "Packet #%d lost", timer->seqnum);
+ GST_DEBUG_OBJECT (jitterbuffer, "Packet #%d lost", seqnum);
priv->num_late += lost_packets;
- priv->discont = TRUE;
+ priv->num_rtx_failed += num_rtx_retry;
+
+ /* create paket lost event */
+ event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM,
+ gst_structure_new ("GstRTPPacketLost",
+ "seqnum", G_TYPE_UINT, (guint) seqnum,
+ "timestamp", G_TYPE_UINT64, timestamp,
+ "duration", G_TYPE_UINT64, duration,
+ "late", G_TYPE_BOOLEAN, late,
+ "retry", G_TYPE_UINT, num_rtx_retry, NULL));
+
+ item = alloc_item (event, ITEM_TYPE_LOST, -1, -1, seqnum, lost_packets, -1);
+ rtp_jitter_buffer_insert (priv->jbuf, item, NULL, NULL);
- /* update our expected next packet */
- priv->last_popped_seqnum = timer->seqnum;
- priv->last_out_time = apply_offset (jitterbuffer, timer->timeout);
- priv->last_out_pts = timer->timeout;
- priv->next_seqnum = (timer->seqnum + lost_packets) & 0xffff;
/* remove timer now */
remove_timer (jitterbuffer, timer);
+ JBUF_SIGNAL_EVENT (priv);
- if (priv->do_lost) {
- GstEvent *event;
+ return TRUE;
+}
- /* create paket lost event */
- event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM,
- gst_structure_new ("GstRTPPacketLost",
- "seqnum", G_TYPE_UINT, (guint) priv->last_popped_seqnum,
- "timestamp", G_TYPE_UINT64, priv->last_out_time,
- "duration", G_TYPE_UINT64, duration,
- "late", G_TYPE_BOOLEAN, lost_packets_late, NULL));
- JBUF_UNLOCK (priv);
- gst_pad_push_event (priv->srcpad, event);
- JBUF_LOCK_CHECK (priv, flushing);
- }
- return GST_FLOW_OK;
+static gboolean
+do_eos_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
+ GstClockTime now)
+{
+ GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
- /* ERRORS */
-flushing:
- {
- GST_DEBUG_OBJECT (jitterbuffer, "we are flushing");
- return priv->srcresult;
- }
+ GST_INFO_OBJECT (jitterbuffer, "got the NPT timeout");
+ remove_timer (jitterbuffer, timer);
+ JBUF_SIGNAL_EVENT (priv);
+
+ return TRUE;
}
-static GstFlowReturn
-do_eos_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
+static gboolean
+do_deadline_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
+ GstClockTime now)
{
- GST_INFO_OBJECT (jitterbuffer, "got the NPT timeout");
+ GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
+
+ GST_INFO_OBJECT (jitterbuffer, "got deadline timeout");
+
+ priv->next_seqnum = timer->seqnum;
remove_timer (jitterbuffer, timer);
+ JBUF_SIGNAL_EVENT (priv);
- return GST_FLOW_EOS;
+ return TRUE;
+}
+
+static gboolean
+do_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
+ GstClockTime now)
+{
+ gboolean removed = FALSE;
+
+ switch (timer->type) {
+ case TIMER_TYPE_EXPECTED:
+ removed = do_expected_timeout (jitterbuffer, timer, now);
+ break;
+ case TIMER_TYPE_LOST:
+ removed = do_lost_timeout (jitterbuffer, timer, now);
+ break;
+ case TIMER_TYPE_DEADLINE:
+ removed = do_deadline_timeout (jitterbuffer, timer, now);
+ break;
+ case TIMER_TYPE_EOS:
+ removed = do_eos_timeout (jitterbuffer, timer, now);
+ break;
+ }
+ return removed;
}
/* called when we need to wait for the next timeout.
*
* If there are no timers, we wait on a gcond until something new happens.
*/
-static GstFlowReturn
+static void
wait_next_timeout (GstRtpJitterBuffer * jitterbuffer)
{
GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
- GstFlowReturn result = GST_FLOW_OK;
- gint i, len;
- TimerData *timer = NULL;
-
- len = priv->timers->len;
- for (i = 0; i < len; i++) {
- TimerData *test = &g_array_index (priv->timers, TimerData, i);
+ GstClockTime now = 0;
- /* find the smallest timeout */
- if (timer == NULL || test->timeout == -1 || test->timeout < timer->timeout)
- timer = test;
- }
- if (timer) {
- GstClock *clock;
- GstClockTime sync_time;
- GstClockID id;
- GstClockReturn ret;
- GstClockTimeDiff clock_jitter;
-
- /* no timestamp, timeout immeditately */
- if (timer->timeout == -1)
- goto do_timeout;
-
- GST_OBJECT_LOCK (jitterbuffer);
- clock = GST_ELEMENT_CLOCK (jitterbuffer);
- if (!clock) {
- GST_OBJECT_UNLOCK (jitterbuffer);
- /* let's just push if there is no clock */
- GST_DEBUG_OBJECT (jitterbuffer, "No clock, timeout right away");
- goto do_timeout;
+ JBUF_LOCK (priv);
+ while (priv->timer_running) {
+ TimerData *timer = NULL;
+ GstClockTime timer_timeout = -1;
+ gint i, len;
+
+ GST_DEBUG_OBJECT (jitterbuffer, "now %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (now));
+
+ len = priv->timers->len;
+ for (i = 0; i < len; i++) {
+ TimerData *test = &g_array_index (priv->timers, TimerData, i);
+ GstClockTime test_timeout = get_timeout (jitterbuffer, test);
+ gboolean save_best = FALSE;
+
+ GST_DEBUG_OBJECT (jitterbuffer, "%d, %d, %d, %" GST_TIME_FORMAT,
+ i, test->type, test->seqnum, GST_TIME_ARGS (test_timeout));
+
+ /* find the smallest timeout */
+ if (timer == NULL) {
+ save_best = TRUE;
+ } else if (timer_timeout == -1) {
+ /* we already have an immediate timeout, the new timer must be an
+ * immediate timer with smaller seqnum to become the best */
+ if (test_timeout == -1 && test->seqnum < timer->seqnum)
+ save_best = TRUE;
+ } else if (test_timeout == -1) {
+ /* first immediate timer */
+ save_best = TRUE;
+ } else if (test_timeout < timer_timeout) {
+ /* earlier timer */
+ save_best = TRUE;
+ } else if (test_timeout == timer_timeout && test->seqnum < timer->seqnum) {
+ /* same timer, smaller seqnum */
+ save_best = TRUE;
+ }
+ if (save_best) {
+ GST_DEBUG_OBJECT (jitterbuffer, "new best %d", i);
+ timer = test;
+ timer_timeout = test_timeout;
+ }
}
+ if (timer && !priv->blocked) {
+ GstClock *clock;
+ GstClockTime sync_time;
+ GstClockID id;
+ GstClockReturn ret;
+ GstClockTimeDiff clock_jitter;
+
+ if (timer_timeout == -1 || timer_timeout <= now) {
+ do_timeout (jitterbuffer, timer, now);
+ /* check here, do_timeout could have released the lock */
+ if (!priv->timer_running)
+ break;
+ continue;
+ }
- /* prepare for sync against clock */
- sync_time = timer->timeout + GST_ELEMENT_CAST (jitterbuffer)->base_time;
- /* add latency of peer to get input time */
- sync_time += priv->peer_latency;
+ GST_OBJECT_LOCK (jitterbuffer);
+ clock = GST_ELEMENT_CLOCK (jitterbuffer);
+ if (!clock) {
+ GST_OBJECT_UNLOCK (jitterbuffer);
+ /* let's just push if there is no clock */
+ GST_DEBUG_OBJECT (jitterbuffer, "No clock, timeout right away");
+ now = timer_timeout;
+ continue;
+ }
- /* add our latency and offset to get output times. */
- sync_time = apply_offset (jitterbuffer, sync_time);
- sync_time += priv->latency_ns;
+ /* prepare for sync against clock */
+ sync_time = timer_timeout + GST_ELEMENT_CAST (jitterbuffer)->base_time;
+ /* add latency of peer to get input time */
+ sync_time += priv->peer_latency;
- GST_DEBUG_OBJECT (jitterbuffer, "sync to timestamp %" GST_TIME_FORMAT
- " with sync time %" GST_TIME_FORMAT,
- GST_TIME_ARGS (timer->timeout), GST_TIME_ARGS (sync_time));
+ GST_DEBUG_OBJECT (jitterbuffer, "sync to timestamp %" GST_TIME_FORMAT
+ " with sync time %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (timer_timeout), GST_TIME_ARGS (sync_time));
- /* create an entry for the clock */
- id = priv->clock_id = gst_clock_new_single_shot_id (clock, sync_time);
- priv->unscheduled = FALSE;
- GST_OBJECT_UNLOCK (jitterbuffer);
+ /* create an entry for the clock */
+ id = priv->clock_id = gst_clock_new_single_shot_id (clock, sync_time);
+ priv->timer_timeout = timer_timeout;
+ priv->timer_seqnum = timer->seqnum;
+ GST_OBJECT_UNLOCK (jitterbuffer);
- /* release the lock so that the other end can push stuff or unlock */
- JBUF_UNLOCK (priv);
+ /* release the lock so that the other end can push stuff or unlock */
+ JBUF_UNLOCK (priv);
- ret = gst_clock_id_wait (id, &clock_jitter);
+ ret = gst_clock_id_wait (id, &clock_jitter);
- JBUF_LOCK (priv);
- GST_DEBUG_OBJECT (jitterbuffer, "sync done, %d, %" G_GINT64_FORMAT,
- ret, clock_jitter);
- /* and free the entry */
- gst_clock_id_unref (id);
- priv->clock_id = NULL;
-
- /* at this point, the clock could have been unlocked by a timeout, a new
- * tail element was added to the queue or because we are shutting down. Check
- * for shutdown first. */
- if G_UNLIKELY
- ((priv->srcresult != GST_FLOW_OK))
- goto flushing;
-
- /* if we got unscheduled and we are not flushing, it's because a new tail
- * element became available in the queue or we flushed the queue.
- * Grab it and try to push or sync. */
- if (ret == GST_CLOCK_UNSCHEDULED || priv->unscheduled) {
- GST_DEBUG_OBJECT (jitterbuffer, "Wait got unscheduled");
- goto done;
- }
- do_timeout:
- switch (timer->type) {
- case TIMER_TYPE_EXPECTED:
- remove_timer (jitterbuffer, timer);
- break;
- case TIMER_TYPE_LOST:
- result = do_lost_timeout (jitterbuffer, timer, clock_jitter);
- break;
- case TIMER_TYPE_DEADLINE:
- priv->next_seqnum = timer->seqnum;
- remove_timer (jitterbuffer, timer);
- break;
- case TIMER_TYPE_EOS:
- result = do_eos_timeout (jitterbuffer, timer);
+ JBUF_LOCK (priv);
+ if (!priv->timer_running)
break;
+
+ if (ret != GST_CLOCK_UNSCHEDULED) {
+ now = timer_timeout + MAX (clock_jitter, 0);
+ GST_DEBUG_OBJECT (jitterbuffer, "sync done, %d, #%d, %" G_GINT64_FORMAT,
+ ret, priv->timer_seqnum, clock_jitter);
+ } else {
+ GST_DEBUG_OBJECT (jitterbuffer, "sync unscheduled");
+ }
+ /* and free the entry */
+ gst_clock_id_unref (id);
+ priv->clock_id = NULL;
+ } else {
+ /* no timers, wait for activity */
+ JBUF_WAIT_TIMER (priv);
}
- } else {
- /* no timers, wait for activity */
- GST_DEBUG_OBJECT (jitterbuffer, "waiting");
- priv->waiting = TRUE;
- JBUF_WAIT_CHECK (priv, flushing);
- priv->waiting = FALSE;
- GST_DEBUG_OBJECT (jitterbuffer, "waiting done");
}
+ JBUF_UNLOCK (priv);
-done:
- return result;
-
-flushing:
- {
- GST_DEBUG_OBJECT (jitterbuffer, "we are flushing");
- return priv->srcresult;
- }
+ GST_DEBUG_OBJECT (jitterbuffer, "we are stopping");
+ return;
}
/*
* This funcion implements the main pushing loop on the source pad.
*
* It first tries to push as many buffers as possible. If there is a seqnum
- * mismatch, a timeout is created and this function goes waiting for the
- * next timeout.
+ * mismatch, we wait for the next timeouts.
*/
static void
gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer)
JBUF_LOCK_CHECK (priv, flushing);
do {
result = handle_next_buffer (jitterbuffer);
- if (G_LIKELY (result == GST_FLOW_WAIT))
+ if (G_LIKELY (result == GST_FLOW_WAIT)) {
/* now wait for the next event */
- result = wait_next_timeout (jitterbuffer);
+ JBUF_WAIT_EVENT (priv, flushing);
+ result = GST_FLOW_OK;
+ }
}
while (result == GST_FLOW_OK);
JBUF_UNLOCK (priv);
rtp_jitter_buffer_set_mode (priv->jbuf, g_value_get_enum (value));
JBUF_UNLOCK (priv);
break;
+ case PROP_DO_RETRANSMISSION:
+ JBUF_LOCK (priv);
+ priv->do_retransmission = g_value_get_boolean (value);
+ JBUF_UNLOCK (priv);
+ break;
+ case PROP_RTX_DELAY:
+ JBUF_LOCK (priv);
+ priv->rtx_delay = g_value_get_int (value);
+ JBUF_UNLOCK (priv);
+ break;
+ case PROP_RTX_DELAY_REORDER:
+ JBUF_LOCK (priv);
+ priv->rtx_delay_reorder = g_value_get_int (value);
+ JBUF_UNLOCK (priv);
+ break;
+ case PROP_RTX_RETRY_TIMEOUT:
+ JBUF_LOCK (priv);
+ priv->rtx_retry_timeout = g_value_get_int (value);
+ JBUF_UNLOCK (priv);
+ break;
+ case PROP_RTX_RETRY_PERIOD:
+ JBUF_LOCK (priv);
+ priv->rtx_retry_period = g_value_get_int (value);
+ JBUF_UNLOCK (priv);
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
JBUF_UNLOCK (priv);
break;
}
+ case PROP_DO_RETRANSMISSION:
+ JBUF_LOCK (priv);
+ g_value_set_boolean (value, priv->do_retransmission);
+ JBUF_UNLOCK (priv);
+ break;
+ case PROP_RTX_DELAY:
+ JBUF_LOCK (priv);
+ g_value_set_int (value, priv->rtx_delay);
+ JBUF_UNLOCK (priv);
+ break;
+ case PROP_RTX_DELAY_REORDER:
+ JBUF_LOCK (priv);
+ g_value_set_int (value, priv->rtx_delay_reorder);
+ JBUF_UNLOCK (priv);
+ break;
+ case PROP_RTX_RETRY_TIMEOUT:
+ JBUF_LOCK (priv);
+ g_value_set_int (value, priv->rtx_retry_timeout);
+ JBUF_UNLOCK (priv);
+ break;
+ case PROP_RTX_RETRY_PERIOD:
+ JBUF_LOCK (priv);
+ g_value_set_int (value, priv->rtx_retry_period);
+ JBUF_UNLOCK (priv);
+ break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;