From ac3bb3acf622e87c2e9a44003b3f9108015ef2cb Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Wed, 18 Sep 2013 11:59:28 +0200 Subject: [PATCH] rtpjitterbuffer: handle large gaps with one lost event When we have a large number of missing packets, generate one lost event for all the packets that have no chance of being pushed out in time. Fix and activate unit test for large gaps. --- gst/rtpmanager/gstrtpjitterbuffer.c | 53 ++++++++++++++++---- tests/check/elements/rtpjitterbuffer.c | 88 +++++++++++++++++++++------------- 2 files changed, 99 insertions(+), 42 deletions(-) diff --git a/gst/rtpmanager/gstrtpjitterbuffer.c b/gst/rtpmanager/gstrtpjitterbuffer.c index 799fdba..493a8f9 100644 --- a/gst/rtpmanager/gstrtpjitterbuffer.c +++ b/gst/rtpmanager/gstrtpjitterbuffer.c @@ -1721,35 +1721,64 @@ flushing: } -static void +static GstFlowReturn calculate_expected (GstRtpJitterBuffer * jitterbuffer, guint32 expected, guint16 seqnum, GstClockTime dts, gint gap) { GstRtpJitterBufferPrivate *priv = jitterbuffer->priv; - GstClockTime duration, expected_dts; + GstFlowReturn ret = GST_FLOW_OK; + GstClockTime total_duration, duration, expected_dts; TimerType type; GST_DEBUG_OBJECT (jitterbuffer, "dts %" GST_TIME_FORMAT ", last %" GST_TIME_FORMAT, GST_TIME_ARGS (dts), GST_TIME_ARGS (priv->last_in_dts)); - /* interpolate between the current time and the last time based on - * number of packets we are missing, this is the estimated duration - * for the missing packet based on equidistant packet spacing. Also make - * sure we never go negative. */ + /* the total duration spanned by the missing packets */ if (dts >= priv->last_in_dts) - duration = (dts - priv->last_in_dts) / (gap + 1); + total_duration = dts - priv->last_in_dts; else - /* packet already lost, timer will timeout quickly */ - duration = 0; + total_duration = 0; + + /* interpolate between the current time and the last time based on + * number of packets we are missing, this is the estimated duration + * for the missing packet based on equidistant packet spacing. */ + duration = total_duration / (gap + 1); GST_DEBUG_OBJECT (jitterbuffer, "duration %" GST_TIME_FORMAT, GST_TIME_ARGS (duration)); + if (total_duration > priv->latency_ns) { + GstClockTime gap_time; + guint lost_packets; + + gap_time = total_duration - priv->latency_ns; + + if (duration > 0) + lost_packets = gap_time / duration; + else + lost_packets = gap; + + /* too many lost packets, some of the missing packets are already + * too late and we can generate lost packet events for them. */ + GST_DEBUG_OBJECT (jitterbuffer, "too many lost packets %" GST_TIME_FORMAT + " > %" GST_TIME_FORMAT ", consider %u lost", + GST_TIME_ARGS (total_duration), GST_TIME_ARGS (priv->latency_ns), + lost_packets); + + ret = + send_lost_event (jitterbuffer, expected, lost_packets, + priv->last_in_dts + duration, gap_time, TRUE); + + expected += lost_packets; + priv->last_in_dts += gap_time; + } + expected_dts = priv->last_in_dts + duration; if (priv->do_retransmission) { type = TIMER_TYPE_EXPECTED; + /* if we had a timer for the first missing packet, leave it. */ if (find_timer (jitterbuffer, type, expected)) expected++; } else { @@ -1761,6 +1790,7 @@ calculate_expected (GstRtpJitterBuffer * jitterbuffer, guint32 expected, expected_dts += duration; expected++; } + return ret; } static GstFlowReturn @@ -1884,7 +1914,10 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent, } else { GST_DEBUG_OBJECT (jitterbuffer, "%d missing packets", gap); /* fill in the gap with EXPECTED timers */ - calculate_expected (jitterbuffer, expected, seqnum, dts, gap); + ret = calculate_expected (jitterbuffer, expected, seqnum, dts, gap); + if (ret != GST_FLOW_OK) + goto out_flushing; + do_next_seqnum = TRUE; } } diff --git a/tests/check/elements/rtpjitterbuffer.c b/tests/check/elements/rtpjitterbuffer.c index dfc9a2b..181c563 100644 --- a/tests/check/elements/rtpjitterbuffer.c +++ b/tests/check/elements/rtpjitterbuffer.c @@ -25,6 +25,8 @@ #include #include +#include + /* For ease of programming we use globals to keep refs for our floating * src and sink pads we create; otherwise we always have to do get_pad, * get_peer, and then remove references in every test function */ @@ -323,7 +325,6 @@ GST_START_TEST (test_basetime) GST_END_TEST; -#if 0 static const guint payload_size = 160; static const guint clock_rate = 8000; static const guint pcmu_payload_type = 0; @@ -357,25 +358,30 @@ generate_test_buffer (GstClockTime gst_ts, GstBuffer *buf; guint8 *payload; guint i; + GstRTPBuffer rtp = GST_RTP_BUFFER_INIT; buf = gst_rtp_buffer_new_allocate (payload_size, 0, 0); GST_BUFFER_TIMESTAMP (buf) = gst_ts; - GST_BUFFER_CAPS (buf) = generate_caps (); - gst_rtp_buffer_set_payload_type (buf, pcmu_payload_type); - gst_rtp_buffer_set_marker (buf, marker_bit); - gst_rtp_buffer_set_seq (buf, seq_num); - gst_rtp_buffer_set_timestamp (buf, rtp_ts); - gst_rtp_buffer_set_ssrc (buf, test_ssrc); - - payload = gst_rtp_buffer_get_payload (buf); + //GST_BUFFER_CAPS (buf) = generate_caps (); + + gst_rtp_buffer_map (buf, GST_MAP_READWRITE, &rtp); + gst_rtp_buffer_set_payload_type (&rtp, pcmu_payload_type); + gst_rtp_buffer_set_marker (&rtp, marker_bit); + gst_rtp_buffer_set_seq (&rtp, seq_num); + gst_rtp_buffer_set_timestamp (&rtp, rtp_ts); + gst_rtp_buffer_set_ssrc (&rtp, test_ssrc); + + payload = gst_rtp_buffer_get_payload (&rtp); for (i = 0; i < payload_size; i++) payload[i] = 0xff; + gst_rtp_buffer_unmap (&rtp); + return buf; } static GstFlowReturn -test_sink_pad_chain_cb (GstPad * pad, GstBuffer * buffer) +test_sink_pad_chain_cb (GstPad * pad, GstObject * parent, GstBuffer * buffer) { TestData *data = gst_pad_get_element_private (pad); g_async_queue_push (data->buf_queue, buffer); @@ -383,10 +389,13 @@ test_sink_pad_chain_cb (GstPad * pad, GstBuffer * buffer) } static gboolean -test_sink_pad_event_cb (GstPad * pad, GstEvent * event) +test_sink_pad_event_cb (GstPad * pad, GstObject * parent, GstEvent * event) { TestData *data = gst_pad_get_element_private (pad); const GstStructure *structure = gst_event_get_structure (event); + + GST_DEBUG ("got event %" GST_PTR_FORMAT, event); + if (strcmp (gst_structure_get_name (structure), "GstRTPPacketLost") == 0) data->lost_event_count++; @@ -398,6 +407,8 @@ static void setup_testharness (TestData * data) { GstPad *jb_sink_pad, *jb_src_pad; + GstSegment seg; + GstMiniObject *obj; // create the testclock data->clock = gst_test_clock_new (); @@ -405,7 +416,7 @@ setup_testharness (TestData * data) gst_test_clock_set_time (GST_TEST_CLOCK (data->clock), 0); // rig up the jitter buffer - data->jitter_buffer = gst_element_factory_make ("gstrtpjitterbuffer", NULL); + data->jitter_buffer = gst_element_factory_make ("rtpjitterbuffer", NULL); g_assert (data->jitter_buffer); gst_element_set_clock (data->jitter_buffer, data->clock); g_object_set (data->jitter_buffer, "do-lost", TRUE, NULL); @@ -414,8 +425,7 @@ setup_testharness (TestData * data) // link in the test source-pad data->test_src_pad = gst_pad_new ("src", GST_PAD_SRC); - gst_pad_set_caps (data->test_src_pad, generate_caps ()); - jb_sink_pad = gst_element_get_pad (data->jitter_buffer, "sink"); + jb_sink_pad = gst_element_get_static_pad (data->jitter_buffer, "sink"); g_assert_cmpint (gst_pad_link (data->test_src_pad, jb_sink_pad), ==, GST_PAD_LINK_OK); g_assert (gst_pad_set_active (data->test_src_pad, TRUE)); @@ -426,7 +436,7 @@ setup_testharness (TestData * data) gst_pad_set_caps (data->test_sink_pad, generate_caps ()); gst_pad_set_chain_function (data->test_sink_pad, test_sink_pad_chain_cb); gst_pad_set_event_function (data->test_sink_pad, test_sink_pad_event_cb); - jb_src_pad = gst_element_get_pad (data->jitter_buffer, "src"); + jb_src_pad = gst_element_get_static_pad (data->jitter_buffer, "src"); g_assert_cmpint (gst_pad_link (jb_src_pad, data->test_sink_pad), ==, GST_PAD_LINK_OK); g_assert (gst_pad_set_active (data->test_sink_pad, TRUE)); @@ -440,6 +450,16 @@ setup_testharness (TestData * data) data->lost_event_count = 0; gst_pad_set_element_private (data->test_sink_pad, data); + + gst_segment_init (&seg, GST_FORMAT_TIME); + + gst_pad_push_event (data->test_src_pad, + gst_event_new_stream_start ("stream0")); + gst_pad_set_caps (data->test_src_pad, generate_caps ()); + gst_pad_push_event (data->test_src_pad, gst_event_new_segment (&seg)); + + while ((obj = g_async_queue_try_pop (data->event_queue))) + gst_mini_object_unref (obj); } static void @@ -501,12 +521,13 @@ verify_lost_event (GstEvent * event, guint32 expected_seqnum, GST_START_TEST (test_only_one_lost_event_on_large_gaps) { TestData data; - GstTestClockPendingID id; + GstClockID id, test_id; guint64 timeout; GstBuffer *in_buf, *out_buf; GstEvent *out_event; gint jb_latency_ms = 200; guint buffer_size_ms = (payload_size * 1000) / clock_rate; + GstRTPBuffer rtp = GST_RTP_BUFFER_INIT; setup_testharness (&data); timeout = 20 * G_USEC_PER_SEC; @@ -519,14 +540,13 @@ GST_START_TEST (test_only_one_lost_event_on_large_gaps) g_assert_cmpint (gst_pad_push (data.test_src_pad, in_buf), ==, GST_FLOW_OK); // wait for the first buffer to be synced to timestamp + latency - g_assert (gst_test_clock_wait_for_next_pending_id (GST_TEST_CLOCK - (data.clock), &id)); + gst_test_clock_wait_for_next_pending_id (GST_TEST_CLOCK (data.clock), &id); // increase the time to timestamp + latency and release the wait gst_test_clock_set_time (GST_TEST_CLOCK (data.clock), jb_latency_ms * GST_MSECOND); g_assert (gst_test_clock_process_next_clock_id (GST_TEST_CLOCK (data.clock)) - == id.clock_id); + == id); // check for the buffer coming out that was pushed in out_buf = g_async_queue_timeout_pop (data.buf_queue, timeout); @@ -550,25 +570,27 @@ GST_START_TEST (test_only_one_lost_event_on_large_gaps) g_assert_cmpint (gst_pad_push (data.test_src_pad, in_buf), ==, GST_FLOW_OK); // release the wait - g_assert (gst_test_clock_wait_for_next_pending_id (GST_TEST_CLOCK - (data.clock), &id)); - g_assert (gst_test_clock_process_next_clock_id (GST_TEST_CLOCK (data.clock)) - == id.clock_id); + GST_DEBUG ("wait for id"); + gst_test_clock_wait_for_next_pending_id (GST_TEST_CLOCK (data.clock), &id); + GST_DEBUG ("got wait id %p", id); + gst_test_clock_advance_time (GST_TEST_CLOCK (data.clock), GST_MSECOND * 20); + test_id = gst_test_clock_process_next_clock_id (GST_TEST_CLOCK (data.clock)); + GST_DEBUG ("process id %p", test_id); + g_assert (id == test_id); // we should now receive a packet-lost-event for buffers 1 through 489 out_event = g_async_queue_timeout_pop (data.event_queue, timeout); g_assert (out_event != NULL); g_assert_cmpint (data.lost_event_count, ==, 1); - verify_lost_event (out_event, 1, 1 * GST_MSECOND * 20, GST_MSECOND * 20 * 489, + verify_lost_event (out_event, 1, 1 * GST_MSECOND * 20, GST_MSECOND * 20 * 490, TRUE); // churn through sync_times until the new buffer gets pushed out while (g_async_queue_length (data.buf_queue) < 1) { if (gst_test_clock_peek_next_pending_id (GST_TEST_CLOCK (data.clock), &id)) { - if (id.time > gst_clock_get_time (data.clock)) { - gst_test_clock_set_time (GST_TEST_CLOCK (data.clock), id.time); - g_print ("setting time to %" GST_TIME_FORMAT "\n", - GST_TIME_ARGS (id.time)); + GstClockTime t = gst_clock_id_get_time (id); + if (t > gst_clock_get_time (data.clock)) { + gst_test_clock_set_time (GST_TEST_CLOCK (data.clock), t); } gst_test_clock_process_next_clock_id (GST_TEST_CLOCK (data.clock)); } @@ -577,19 +599,21 @@ GST_START_TEST (test_only_one_lost_event_on_large_gaps) out_buf = g_async_queue_timeout_pop (data.buf_queue, timeout); g_assert (out_buf != NULL); g_assert (GST_BUFFER_FLAG_IS_SET (out_buf, GST_BUFFER_FLAG_DISCONT)); - g_assert_cmpint (gst_rtp_buffer_get_seq (out_buf), ==, 500); + gst_rtp_buffer_map (out_buf, GST_MAP_READ, &rtp); + g_assert_cmpint (gst_rtp_buffer_get_seq (&rtp), ==, 500); + gst_rtp_buffer_unmap (&rtp); g_assert_cmpint (GST_BUFFER_TIMESTAMP (out_buf), ==, (10 * GST_SECOND)); // we get as many lost events as the the number of buffers the jitterbuffer // is able to wait for (+ the one we already got) - g_assert_cmpint (data.lost_event_count, ==, - jb_latency_ms / buffer_size_ms + 1); + g_assert_cmpint (data.lost_event_count, ==, jb_latency_ms / buffer_size_ms); destroy_testharness (&data); } GST_END_TEST; +#if 0 GST_START_TEST (test_two_lost_one_arrives_in_time) { TestData data; @@ -840,8 +864,8 @@ rtpjitterbuffer_suite (void) tcase_add_test (tc_chain, test_push_backward_seq); tcase_add_test (tc_chain, test_push_unordered); tcase_add_test (tc_chain, test_basetime); -#if 0 tcase_add_test (tc_chain, test_only_one_lost_event_on_large_gaps); +#if 0 tcase_add_test (tc_chain, test_two_lost_one_arrives_in_time); tcase_add_test (tc_chain, test_late_packets_still_makes_lost_events); tcase_add_test (tc_chain, test_all_packets_are_timestamped_zero); -- 2.7.4