*/
/**
- * SECTION:element-gstrtpjitterbuffer
+ * SECTION:element-rtpjitterbuffer
*
* This element reorders and removes duplicate RTP packets as they are received
- * from a network source. It will also wait for missing packets up to a
- * configurable time limit using the #GstRtpJitterBuffer:latency property.
- * Packets arriving too late are considered to be lost packets.
- *
- * This element acts as a live element and so adds #GstRtpJitterBuffer:latency
- * to the pipeline.
+ * from a network source.
*
* The element needs the clock-rate of the RTP payload in order to estimate the
* delay. This information is obtained either from the caps on the sink pad or,
* when no caps are present, from the #GstRtpJitterBuffer::request-pt-map signal.
* To clear the previous pt-map use the #GstRtpJitterBuffer::clear-pt-map signal.
*
- * This element will automatically be used inside gstrtpbin.
+ * The rtpjitterbuffer will wait for missing packets up to a configurable time
+ * limit using the #GstRtpJitterBuffer:latency property. Packets arriving too
+ * late are considered to be lost packets. If the #GstRtpJitterBuffer:do-lost
+ * property is set, lost packets will result in a custom serialized downstream
+ * event of name GstRTPPacketLost. The lost packet events are usually used by a
+ * depayloader or other element to create concealment data or some other logic
+ * to gracefully handle the missing packets.
+ *
+ * The jitterbuffer will use the DTS (or PTS if no DTS is set) of the incomming
+ * buffer and the rtptime inside the RTP packet to create a PTS on the outgoing
+ * buffer.
+ *
+ * The jitterbuffer can also be configured to send early retransmission events
+ * upstream by setting the #GstRtpJitterBuffer:do-retransmission property. In
+ * this mode, the jitterbuffer tries to estimate when a packet should arrive and
+ * sends a custom upstream event named GstRTPRetransmissionRequest when the
+ * packet is considered late. The initial expected packet arrival time is
+ * calculated as follows:
+ *
+ * - If seqnum N arrived at time T, seqnum N+1 is expected to arrive at
+ * T + packet-spacing + #GstRtpJitterBuffer:rtx-delay. The packet spacing is
+ * calculated from the DTS (or PTS is no DTS) of two consecutive RTP
+ * packets with different rtptime.
+ *
+ * - If seqnum N0 arrived at time T0 and seqnum Nm arrived at time Tm,
+ * seqnum Ni is expected at time Ti = T0 + i*(Tm - T0)/(Nm - N0). Any
+ * previously scheduled timeout is overwritten.
+ *
+ * - If seqnum N arrived, all seqnum older than
+ * N - #GstRtpJitterBuffer:rtx-delay-reorder are considered late
+ * immediately. This is to request fast feedback for abonormally reorder
+ * packets before any of the previous timeouts is triggered.
+ *
+ * A late packet triggers the GstRTPRetransmissionRequest custom upstream
+ * event. After the initial timeout expires and the retransmission event is
+ * sent, the timeout is scheduled for
+ * T + #GstRtpJitterBuffer:rtx-retry-timeout. If the missing packet did not
+ * arrive after #GstRtpJitterBuffer:rtx-retry-timeout, a new
+ * GstRTPRetransmissionRequest is sent upstream and the timeout is rescheduled
+ * again for T + #GstRtpJitterBuffer:rtx-retry-timeout. This repeats until
+ * #GstRtpJitterBuffer:rtx-retry-period elapsed, at which point no further
+ * retransmission requests are sent and the regular logic is performed to
+ * schedule a lost packet as discussed above.
+ *
+ * This element acts as a live element and so adds #GstRtpJitterBuffer:latency
+ * to the pipeline.
+ *
+ * This element will automatically be used inside rtpbin.
*
* <refsect2>
* <title>Example pipelines</title>
* |[
- * gst-launch-1.0 rtspsrc location=rtsp://192.168.1.133:8554/mpeg1or2AudioVideoTest ! gstrtpjitterbuffer ! rtpmpvdepay ! mpeg2dec ! xvimagesink
+ * gst-launch-1.0 rtspsrc location=rtsp://192.168.1.133:8554/mpeg1or2AudioVideoTest ! rtpjitterbuffer ! rtpmpvdepay ! mpeg2dec ! xvimagesink
* ]| Connect to a streaming server and decode the MPEG video. The jitterbuffer is
* inserted into the pipeline to smooth out network jitter and to reorder the
* out-of-order RTP packets.
#define JBUF_UNLOCK(priv) (g_mutex_unlock (&(priv)->jbuf_lock))
#define JBUF_WAIT_TIMER(priv) G_STMT_START { \
+ GST_DEBUG ("waiting timer"); \
(priv)->waiting_timer = TRUE; \
g_cond_wait (&(priv)->jbuf_timer, &(priv)->jbuf_lock); \
(priv)->waiting_timer = FALSE; \
+ GST_DEBUG ("waiting timer done"); \
} G_STMT_END
-#define JBUF_SIGNAL_TIMER(priv) G_STMT_START { \
- if (G_UNLIKELY ((priv)->waiting_timer)) \
- g_cond_signal (&(priv)->jbuf_timer); \
+#define JBUF_SIGNAL_TIMER(priv) G_STMT_START { \
+ if (G_UNLIKELY ((priv)->waiting_timer)) { \
+ GST_DEBUG ("signal timer"); \
+ g_cond_signal (&(priv)->jbuf_timer); \
+ } \
} G_STMT_END
#define JBUF_WAIT_EVENT(priv,label) G_STMT_START { \
+ GST_DEBUG ("waiting event"); \
(priv)->waiting_event = TRUE; \
g_cond_wait (&(priv)->jbuf_event, &(priv)->jbuf_lock); \
(priv)->waiting_event = FALSE; \
+ GST_DEBUG ("waiting event done"); \
if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK)) \
goto label; \
} G_STMT_END
-#define JBUF_SIGNAL_EVENT(priv) G_STMT_START { \
- if (G_UNLIKELY ((priv)->waiting_event)) \
- g_cond_signal (&(priv)->jbuf_event); \
+#define JBUF_SIGNAL_EVENT(priv) G_STMT_START { \
+ if (G_UNLIKELY ((priv)->waiting_event)) { \
+ GST_DEBUG ("signal event"); \
+ g_cond_signal (&(priv)->jbuf_event); \
+ } \
} G_STMT_END
struct _GstRtpJitterBufferPrivate
/* some accounting */
guint64 num_late;
guint64 num_duplicates;
+ guint64 num_rtx_requests;
+ guint64 num_rtx_success;
+ guint64 num_rtx_failed;
+ gdouble avg_rtx_num;
+ guint64 avg_rtx_rtt;
};
typedef enum
GstClockTime rtx_base;
GstClockTime rtx_delay;
GstClockTime rtx_retry;
+ GstClockTime rtx_last;
+ guint num_rtx_retry;
} TimerData;
#define GST_RTP_JITTER_BUFFER_GET_PRIVATE(o) \
klass->set_active = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_set_active);
GST_DEBUG_CATEGORY_INIT
- (rtpjitterbuffer_debug, "gstrtpjitterbuffer", 0, "RTP Jitter Buffer");
+ (rtpjitterbuffer_debug, "rtpjitterbuffer", 0, "RTP Jitter Buffer");
}
static void
static RTPJitterBufferItem *
alloc_item (gpointer data, guint type, GstClockTime dts, GstClockTime pts,
- guint seqnum, guint rtptime)
+ guint seqnum, guint count, guint rtptime)
{
RTPJitterBufferItem *item;
item->dts = dts;
item->pts = pts;
item->seqnum = seqnum;
+ item->count = count;
item->rtptime = rtptime;
return item;
/* ERRORS */
wrong_template:
{
- g_warning ("gstrtpjitterbuffer: this is not our template");
+ g_warning ("rtpjitterbuffer: this is not our template");
return NULL;
}
exists:
{
- g_warning ("gstrtpjitterbuffer: pad already requested");
+ g_warning ("rtpjitterbuffer: pad already requested");
return NULL;
}
}
/* unblock to allow streaming in PLAYING */
priv->blocked = FALSE;
JBUF_SIGNAL_EVENT (priv);
+ JBUF_SIGNAL_TIMER (priv);
JBUF_UNLOCK (priv);
break;
default:
JBUF_LOCK (priv);
/* block to stop streaming when PAUSED */
priv->blocked = TRUE;
+ unschedule_current_timer (jitterbuffer);
JBUF_UNLOCK (priv);
if (ret != GST_STATE_CHANGE_FAILURE)
ret = GST_STATE_CHANGE_NO_PREROLL;
case GST_EVENT_FLUSH_START:
ret = gst_pad_push_event (priv->srcpad, event);
gst_rtp_jitter_buffer_flush_start (jitterbuffer);
+ /* wait for the loop to go into PAUSED */
+ gst_pad_pause_task (priv->srcpad);
break;
case GST_EVENT_FLUSH_STOP:
ret = gst_pad_push_event (priv->srcpad, event);
timer->rtx_delay = delay;
timer->rtx_retry = 0;
}
+ timer->num_rtx_retry = 0;
recalculate_timer (jitterbuffer, timer);
JBUF_SIGNAL_TIMER (priv);
GST_DEBUG_OBJECT (jitterbuffer,
"replace timer for seqnum %d->%d to %" GST_TIME_FORMAT,
- oldseq, seqnum, GST_TIME_ARGS (timeout));
+ oldseq, seqnum, GST_TIME_ARGS (timeout + delay));
timer->timeout = timeout + delay;
timer->seqnum = seqnum;
/* we just received a packet with seqnum and dts.
*
* First check for old seqnum that we are still expecting. If the gap with the
- * current timestamp is too big, unschedule the timeouts.
+ * current seqnum is too big, unschedule the timeouts.
*
* If we have a valid packet spacing estimate we can set a timer for when we
* should receive the next packet.
} else if (gap > priv->rtx_delay_reorder) {
/* max gap, we exceeded the max reorder distance and we don't expect the
* missing packet to be this reordered */
- if (test->rtx_retry == 0 && test->type == TIMER_TYPE_EXPECTED)
+ if (test->num_rtx_retry == 0 && test->type == TIMER_TYPE_EXPECTED)
reschedule_timer (jitterbuffer, test, test->seqnum, -1, 0, FALSE);
}
}
add_timer (jitterbuffer, TIMER_TYPE_EXPECTED, priv->next_in_seqnum, 0,
expected, delay, priv->packet_spacing);
} else if (timer && timer->type != TIMER_TYPE_DEADLINE) {
+
+ if (timer->num_rtx_retry > 0) {
+ GstClockTime rtx_last;
+
+ /* we scheduled a retry for this packet and now we have it */
+ priv->num_rtx_success++;
+ /* all the previous retry attempts failed */
+ priv->num_rtx_failed += timer->num_rtx_retry - 1;
+ /* number of retries before receiving the packet */
+ if (priv->avg_rtx_num == 0.0)
+ priv->avg_rtx_num = timer->num_rtx_retry;
+ else
+ priv->avg_rtx_num = (timer->num_rtx_retry + 7 * priv->avg_rtx_num) / 8;
+ /* calculate the delay between retransmission request and receiving this
+ * packet, start with when we scheduled this timeout last */
+ rtx_last = timer->rtx_last;
+ if (dts > rtx_last) {
+ GstClockTime delay;
+ /* we have a valid delay if this packet arrived after we scheduled the
+ * request */
+ delay = dts - rtx_last;
+ if (priv->avg_rtx_rtt == 0)
+ priv->avg_rtx_rtt = delay;
+ else
+ priv->avg_rtx_rtt = (delay + 7 * priv->avg_rtx_rtt) / 8;
+ }
+ GST_LOG_OBJECT (jitterbuffer,
+ "RTX success %" G_GUINT64_FORMAT ", failed %" G_GUINT64_FORMAT
+ ", requests %" G_GUINT64_FORMAT ", dups %" G_GUINT64_FORMAT
+ ", avg-num %g, avg-rtt %" G_GUINT64_FORMAT, priv->num_rtx_success,
+ priv->num_rtx_failed, priv->num_rtx_requests, priv->num_duplicates,
+ priv->avg_rtx_num, priv->avg_rtx_rtt);
+ }
/* if we had a timer, remove it, we don't know when to expect the next
* packet. */
remove_timer (jitterbuffer, timer);
+ /* we signal the _loop function because this new packet could be the one
+ * it was waiting for */
JBUF_SIGNAL_EVENT (priv);
}
}
}
static void
-send_lost_event (GstRtpJitterBuffer * jitterbuffer, guint seqnum,
- guint lost_packets, GstClockTime timestamp, GstClockTime duration,
- gboolean late)
-{
- GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
- guint next_seqnum;
-
- /* we had a gap and thus we lost some packets. Create an event for this. */
- if (lost_packets > 1)
- GST_DEBUG_OBJECT (jitterbuffer, "Packets #%d -> #%d lost", seqnum,
- seqnum + lost_packets - 1);
- else
- GST_DEBUG_OBJECT (jitterbuffer, "Packet #%d lost", seqnum);
-
- priv->num_late += lost_packets;
-
- next_seqnum = seqnum + lost_packets - 1;
-
- if (priv->do_lost) {
- GstEvent *event;
- RTPJitterBufferItem *item;
-
- /* create paket lost event */
- event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM,
- gst_structure_new ("GstRTPPacketLost",
- "seqnum", G_TYPE_UINT, (guint) seqnum,
- "timestamp", G_TYPE_UINT64, timestamp,
- "duration", G_TYPE_UINT64, duration,
- "late", G_TYPE_BOOLEAN, late, NULL));
-
- item = alloc_item (event, ITEM_TYPE_LOST, -1, -1, next_seqnum, -1);
- rtp_jitter_buffer_insert (priv->jbuf, item, NULL, NULL);
- }
- if (seqnum == priv->next_seqnum) {
- GST_DEBUG_OBJECT (jitterbuffer, "lost seqnum %d == %d next_seqnum -> %d",
- seqnum, priv->next_seqnum, next_seqnum);
- priv->next_seqnum = next_seqnum & 0xffff;
- priv->last_popped_seqnum = next_seqnum;
- priv->last_out_time = timestamp;
- }
-}
-
-static void
calculate_expected (GstRtpJitterBuffer * jitterbuffer, guint32 expected,
guint16 seqnum, GstClockTime dts, gint gap)
{
}
}
- item = alloc_item (buffer, ITEM_TYPE_BUFFER, dts, pts, seqnum, rtptime);
+ item = alloc_item (buffer, ITEM_TYPE_BUFFER, dts, pts, seqnum, 1, rtptime);
/* now insert the packet into the queue in sorted order. This function returns
* FALSE if a packet with the same seqnum was already in the queue, meaning we
GstEvent *outevent;
GstClockTime dts, pts;
gint percent = -1;
- gboolean is_buffer;
+ gboolean is_buffer, do_push = TRUE;
/* when we get here we are ready to pop and push the buffer */
item = rtp_jitter_buffer_pop (priv->jbuf, &percent);
priv->last_out_time = GST_BUFFER_PTS (outbuf);
} else {
outevent = item->data;
- if (item->type == ITEM_TYPE_LOST)
+ if (item->type == ITEM_TYPE_LOST) {
priv->discont = TRUE;
+ if (!priv->do_lost)
+ do_push = FALSE;
+ }
}
/* now we are ready to push the buffer. Save the seqnum and release the lock
* so the other end can push stuff in the queue again. */
priv->last_popped_seqnum = seqnum;
- priv->next_seqnum = (seqnum + 1) & 0xffff;
+ priv->next_seqnum = (seqnum + item->count) & 0xffff;
JBUF_UNLOCK (priv);
item->data = NULL;
} else {
GST_DEBUG_OBJECT (jitterbuffer, "Pushing event %d", seqnum);
- gst_pad_push_event (priv->srcpad, outevent);
+ if (do_push)
+ gst_pad_push_event (priv->srcpad, outevent);
+ else
+ gst_event_unref (outevent);
+
result = GST_FLOW_OK;
}
JBUF_LOCK_CHECK (priv, out_flushing);
{
GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
GstEvent *event;
+ guint delay;
GST_DEBUG_OBJECT (jitterbuffer, "expected %d didn't arrive", timer->seqnum);
+ delay = timer->rtx_delay + timer->rtx_retry;
event = gst_event_new_custom (GST_EVENT_CUSTOM_UPSTREAM,
gst_structure_new ("GstRTPRetransmissionRequest",
"seqnum", G_TYPE_UINT, (guint) timer->seqnum,
"running-time", G_TYPE_UINT64, timer->rtx_base,
- "delay", G_TYPE_UINT,
- GST_TIME_AS_MSECONDS (timer->rtx_delay + timer->rtx_retry),
- "frequency", G_TYPE_UINT, priv->rtx_retry_timeout, "period",
- G_TYPE_UINT, priv->rtx_retry_period, "deadline", G_TYPE_UINT,
- priv->latency_ms, "packet-spacing", G_TYPE_UINT64,
- priv->packet_spacing, NULL));
+ "delay", G_TYPE_UINT, GST_TIME_AS_MSECONDS (delay),
+ "retry", G_TYPE_UINT, timer->num_rtx_retry,
+ "frequency", G_TYPE_UINT, priv->rtx_retry_timeout,
+ "period", G_TYPE_UINT, priv->rtx_retry_period,
+ "deadline", G_TYPE_UINT, priv->latency_ms,
+ "packet-spacing", G_TYPE_UINT64, priv->packet_spacing, NULL));
- JBUF_UNLOCK (priv);
- gst_pad_push_event (priv->sinkpad, event);
- JBUF_LOCK (priv);
+ priv->num_rtx_requests++;
+ timer->num_rtx_retry++;
+ timer->rtx_last = now;
/* calculate the timeout for the next retransmission attempt */
timer->rtx_retry += (priv->rtx_retry_timeout * GST_MSECOND);
+ GST_DEBUG_OBJECT (jitterbuffer, "base %" GST_TIME_FORMAT ", delay %"
+ GST_TIME_FORMAT ", retry %" GST_TIME_FORMAT,
+ GST_TIME_ARGS (timer->rtx_base), GST_TIME_ARGS (timer->rtx_delay),
+ GST_TIME_ARGS (timer->rtx_retry));
+
if (timer->rtx_retry + timer->rtx_delay >
(priv->rtx_retry_period * GST_MSECOND)) {
GST_DEBUG_OBJECT (jitterbuffer, "reschedule as LOST timer");
/* too many retransmission request, we now convert the timer
- * to a lost timer */
+ * to a lost timer, leave the num_rtx_retry as it is for stats */
timer->type = TIMER_TYPE_LOST;
timer->rtx_delay = 0;
timer->rtx_retry = 0;
reschedule_timer (jitterbuffer, timer, timer->seqnum,
timer->rtx_base + timer->rtx_retry, timer->rtx_delay, FALSE);
+ JBUF_UNLOCK (priv);
+ gst_pad_push_event (priv->sinkpad, event);
+ JBUF_LOCK (priv);
+
return FALSE;
}
{
GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
GstClockTime duration, timestamp;
- guint seqnum, num;
+ guint seqnum, lost_packets, num_rtx_retry;
gboolean late;
+ GstEvent *event;
+ RTPJitterBufferItem *item;
seqnum = timer->seqnum;
timestamp = apply_offset (jitterbuffer, timer->timeout);
duration = timer->duration;
if (duration == GST_CLOCK_TIME_NONE && priv->packet_spacing > 0)
duration = priv->packet_spacing;
- num = MAX (timer->num, 1);
+ lost_packets = MAX (timer->num, 1);
late = timer->num > 0;
+ num_rtx_retry = timer->num_rtx_retry;
- /* remove timer now */
- remove_timer (jitterbuffer, timer);
+ /* we had a gap and thus we lost some packets. Create an event for this. */
+ if (lost_packets > 1)
+ GST_DEBUG_OBJECT (jitterbuffer, "Packets #%d -> #%d lost", seqnum,
+ seqnum + lost_packets - 1);
+ else
+ GST_DEBUG_OBJECT (jitterbuffer, "Packet #%d lost", seqnum);
+
+ priv->num_late += lost_packets;
+ priv->num_rtx_failed += num_rtx_retry;
+
+ /* create paket lost event */
+ event = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM,
+ gst_structure_new ("GstRTPPacketLost",
+ "seqnum", G_TYPE_UINT, (guint) seqnum,
+ "timestamp", G_TYPE_UINT64, timestamp,
+ "duration", G_TYPE_UINT64, duration,
+ "late", G_TYPE_BOOLEAN, late,
+ "retry", G_TYPE_UINT, num_rtx_retry, NULL));
- /* this releases the lock */
- send_lost_event (jitterbuffer, seqnum, num, timestamp, duration, late);
+ item = alloc_item (event, ITEM_TYPE_LOST, -1, -1, seqnum, lost_packets, -1);
+ rtp_jitter_buffer_insert (priv->jbuf, item, NULL, NULL);
- /* now we can let the pushing thread try again */
+ /* remove timer now */
+ remove_timer (jitterbuffer, timer);
JBUF_SIGNAL_EVENT (priv);
return TRUE;
timer_timeout = test_timeout;
}
}
- if (timer) {
+ if (timer && !priv->blocked) {
GstClock *clock;
GstClockTime sync_time;
GstClockID id;
priv->clock_id = NULL;
} else {
/* no timers, wait for activity */
- GST_DEBUG_OBJECT (jitterbuffer, "waiting");
JBUF_WAIT_TIMER (priv);
- GST_DEBUG_OBJECT (jitterbuffer, "waiting done");
}
}
JBUF_UNLOCK (priv);
do {
result = handle_next_buffer (jitterbuffer);
if (G_LIKELY (result == GST_FLOW_WAIT)) {
- GST_DEBUG_OBJECT (jitterbuffer, "waiting for event");
/* now wait for the next event */
JBUF_WAIT_EVENT (priv, flushing);
- GST_DEBUG_OBJECT (jitterbuffer, "waiting for event done");
result = GST_FLOW_OK;
}
}