From d8575222375b28d6111c506214dec66fb97a4617 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Olivier=20Cr=C3=AAte?= Date: Thu, 13 Dec 2018 19:17:43 -0500 Subject: [PATCH] rtpjitterbuffer: Run all timers immediately on EOS When the EOS event is received, run all timers immediately and avoid pushing the EOS downstream before this has been run. This ensures that the lost packet statistics are accurate. --- gst/rtpmanager/gstrtpjitterbuffer.c | 26 +++++++++--- tests/check/elements/rtpjitterbuffer.c | 72 ++++++++++++++++++++++++++++++++++ 2 files changed, 93 insertions(+), 5 deletions(-) diff --git a/gst/rtpmanager/gstrtpjitterbuffer.c b/gst/rtpmanager/gstrtpjitterbuffer.c index f1d580c..2fbbd4b 100644 --- a/gst/rtpmanager/gstrtpjitterbuffer.c +++ b/gst/rtpmanager/gstrtpjitterbuffer.c @@ -202,14 +202,14 @@ enum #define JBUF_WAIT_TIMER(priv) G_STMT_START { \ GST_DEBUG ("waiting timer"); \ - (priv)->waiting_timer = TRUE; \ + (priv)->waiting_timer++; \ g_cond_wait (&(priv)->jbuf_timer, &(priv)->jbuf_lock); \ - (priv)->waiting_timer = FALSE; \ + (priv)->waiting_timer--; \ GST_DEBUG ("waiting timer done"); \ } G_STMT_END #define JBUF_SIGNAL_TIMER(priv) G_STMT_START { \ if (G_UNLIKELY ((priv)->waiting_timer)) { \ - GST_DEBUG ("signal timer"); \ + GST_DEBUG ("signal timer, %d waiters", (priv)->waiting_timer); \ g_cond_signal (&(priv)->jbuf_timer); \ } \ } G_STMT_END @@ -2302,6 +2302,8 @@ remove_timer (GstRtpJitterBuffer * jitterbuffer, TimerData * timer) GST_DEBUG_OBJECT (jitterbuffer, "removed index %d", idx); g_array_remove_index_fast (priv->timers, idx); timer->idx = idx; + + JBUF_SIGNAL_TIMER (priv); } static void @@ -2311,6 +2313,7 @@ remove_all_timers (GstRtpJitterBuffer * jitterbuffer) GST_DEBUG_OBJECT (jitterbuffer, "removed all timers"); g_array_set_size (priv->timers, 0); unschedule_current_timer (jitterbuffer); + JBUF_SIGNAL_TIMER (priv); } /* get the extra delay to wait before sending RTX */ @@ -3495,6 +3498,17 @@ pop_and_push_next (GstRtpJitterBuffer * jitterbuffer, guint seqnum) priv->next_seqnum = (seqnum + item->count) & 0xffff; } msg = check_buffering_percent (jitterbuffer, percent); + + if (type == ITEM_TYPE_EVENT && outevent && + GST_EVENT_TYPE (outevent) == GST_EVENT_EOS) { + g_assert (priv->eos); + while (priv->timers->len > 0) { + /* Stopping timers */ + unschedule_current_timer (jitterbuffer); + JBUF_WAIT_TIMER (priv); + } + } + JBUF_UNLOCK (priv); item->data = NULL; @@ -4001,7 +4015,9 @@ wait_next_timeout (GstRtpJitterBuffer * jitterbuffer) * otherwise always be 0 */ GST_OBJECT_LOCK (jitterbuffer); - if (GST_ELEMENT_CLOCK (jitterbuffer)) { + if (priv->eos) { + now = GST_CLOCK_TIME_NONE; + } else if (GST_ELEMENT_CLOCK (jitterbuffer)) { now = gst_clock_get_time (GST_ELEMENT_CLOCK (jitterbuffer)) - GST_ELEMENT_CAST (jitterbuffer)->base_time; @@ -4076,7 +4092,7 @@ wait_next_timeout (GstRtpJitterBuffer * jitterbuffer) GstClockReturn ret; GstClockTimeDiff clock_jitter; - if (timer_timeout == -1 || timer_timeout <= now) { + if (timer_timeout == -1 || timer_timeout <= now || priv->eos) { /* We have normally removed all lost timers in the loop above */ g_assert (timer->type != TIMER_TYPE_LOST); diff --git a/tests/check/elements/rtpjitterbuffer.c b/tests/check/elements/rtpjitterbuffer.c index 80da5b5..7b5bcbb 100644 --- a/tests/check/elements/rtpjitterbuffer.c +++ b/tests/check/elements/rtpjitterbuffer.c @@ -300,6 +300,77 @@ GST_START_TEST (test_push_unordered) GST_END_TEST; +gboolean is_eos; + +static gboolean +eos_event_function (GstPad * pad, GstObject * parent, GstEvent * event) +{ + if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) { + g_mutex_lock (&check_mutex); + is_eos = TRUE; + g_cond_signal (&check_cond); + g_mutex_unlock (&check_mutex); + } + + return TRUE; +} + +GST_START_TEST (test_push_eos) +{ + GstElement *jitterbuffer; + const guint num_buffers = 5; + GstBuffer *buffer; + GList *node; + GstStructure *stats; + guint64 pushed, lost, late, duplicates; + int n = 0; + + is_eos = FALSE; + + jitterbuffer = setup_jitterbuffer (num_buffers); + gst_pad_set_event_function (mysinkpad, eos_event_function); + + g_object_set (jitterbuffer, "latency", 1, NULL); + + fail_unless (start_jitterbuffer (jitterbuffer) + == GST_STATE_CHANGE_SUCCESS, "could not set to playing"); + + /* push buffers: 0,1,2, */ + for (node = inbuffers; node; node = g_list_next (node)) { + n++; + /* Skip 1 */ + if (n == 2) { + continue; + } + buffer = (GstBuffer *) node->data; + fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK); + } + + gst_pad_push_event (mysrcpad, gst_event_new_eos ()); + + g_mutex_lock (&check_mutex); + while (!is_eos) + g_cond_wait (&check_cond, &check_mutex); + g_mutex_unlock (&check_mutex); + + /* Verify statistics */ + g_object_get (jitterbuffer, "stats", &stats, NULL); + gst_structure_get (stats, "num-pushed", G_TYPE_UINT64, &pushed, + "num-lost", G_TYPE_UINT64, &lost, + "num-late", G_TYPE_UINT64, &late, + "num-duplicates", G_TYPE_UINT64, &duplicates, NULL); + fail_unless_equals_int (pushed, g_list_length (inbuffers) - 1); + fail_unless_equals_int (lost, 1); + fail_unless_equals_int (late, 0); + fail_unless_equals_int (duplicates, 0); + gst_structure_free (stats); + + /* cleanup */ + cleanup_jitterbuffer (jitterbuffer); +} + +GST_END_TEST; + GST_START_TEST (test_basetime) { GstElement *jitterbuffer; @@ -2140,6 +2211,7 @@ rtpjitterbuffer_suite (void) tcase_add_test (tc_chain, test_push_forward_seq); tcase_add_test (tc_chain, test_push_backward_seq); tcase_add_test (tc_chain, test_push_unordered); + tcase_add_test (tc_chain, test_push_eos); tcase_add_test (tc_chain, test_basetime); tcase_add_test (tc_chain, test_clear_pt_map); -- 2.7.4