rtpjitterbuffer: Run all timers immediately on EOS
authorOlivier CrĂȘte <olivier.crete@collabora.com>
Fri, 14 Dec 2018 00:17:43 +0000 (19:17 -0500)
committerNicolas Dufresne <nicolas@ndufresne.ca>
Fri, 14 Dec 2018 12:10:16 +0000 (12:10 +0000)
When the EOS event is received, run all timers immediately and avoid
pushing the EOS downstream before this has been run. This ensures that
the lost packet statistics are accurate.

gst/rtpmanager/gstrtpjitterbuffer.c
tests/check/elements/rtpjitterbuffer.c

index f1d580c..2fbbd4b 100644 (file)
@@ -202,14 +202,14 @@ enum
 
 #define JBUF_WAIT_TIMER(priv)   G_STMT_START {            \
   GST_DEBUG ("waiting timer");                            \
-  (priv)->waiting_timer = TRUE;                           \
+  (priv)->waiting_timer++;                                \
   g_cond_wait (&(priv)->jbuf_timer, &(priv)->jbuf_lock);  \
-  (priv)->waiting_timer = FALSE;                          \
+  (priv)->waiting_timer--;                                \
   GST_DEBUG ("waiting timer done");                       \
 } G_STMT_END
 #define JBUF_SIGNAL_TIMER(priv) G_STMT_START {            \
   if (G_UNLIKELY ((priv)->waiting_timer)) {               \
-    GST_DEBUG ("signal timer");                           \
+    GST_DEBUG ("signal timer, %d waiters", (priv)->waiting_timer); \
     g_cond_signal (&(priv)->jbuf_timer);                  \
   }                                                       \
 } G_STMT_END
@@ -2302,6 +2302,8 @@ remove_timer (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
   GST_DEBUG_OBJECT (jitterbuffer, "removed index %d", idx);
   g_array_remove_index_fast (priv->timers, idx);
   timer->idx = idx;
+
+  JBUF_SIGNAL_TIMER (priv);
 }
 
 static void
@@ -2311,6 +2313,7 @@ remove_all_timers (GstRtpJitterBuffer * jitterbuffer)
   GST_DEBUG_OBJECT (jitterbuffer, "removed all timers");
   g_array_set_size (priv->timers, 0);
   unschedule_current_timer (jitterbuffer);
+  JBUF_SIGNAL_TIMER (priv);
 }
 
 /* get the extra delay to wait before sending RTX */
@@ -3495,6 +3498,17 @@ pop_and_push_next (GstRtpJitterBuffer * jitterbuffer, guint seqnum)
     priv->next_seqnum = (seqnum + item->count) & 0xffff;
   }
   msg = check_buffering_percent (jitterbuffer, percent);
+
+  if (type == ITEM_TYPE_EVENT && outevent &&
+      GST_EVENT_TYPE (outevent) == GST_EVENT_EOS) {
+    g_assert (priv->eos);
+    while (priv->timers->len > 0) {
+      /* Stopping timers */
+      unschedule_current_timer (jitterbuffer);
+      JBUF_WAIT_TIMER (priv);
+    }
+  }
+
   JBUF_UNLOCK (priv);
 
   item->data = NULL;
@@ -4001,7 +4015,9 @@ wait_next_timeout (GstRtpJitterBuffer * jitterbuffer)
      * otherwise always be 0
      */
     GST_OBJECT_LOCK (jitterbuffer);
-    if (GST_ELEMENT_CLOCK (jitterbuffer)) {
+    if (priv->eos) {
+      now = GST_CLOCK_TIME_NONE;
+    } else if (GST_ELEMENT_CLOCK (jitterbuffer)) {
       now =
           gst_clock_get_time (GST_ELEMENT_CLOCK (jitterbuffer)) -
           GST_ELEMENT_CAST (jitterbuffer)->base_time;
@@ -4076,7 +4092,7 @@ wait_next_timeout (GstRtpJitterBuffer * jitterbuffer)
       GstClockReturn ret;
       GstClockTimeDiff clock_jitter;
 
-      if (timer_timeout == -1 || timer_timeout <= now) {
+      if (timer_timeout == -1 || timer_timeout <= now || priv->eos) {
         /* We have normally removed all lost timers in the loop above */
         g_assert (timer->type != TIMER_TYPE_LOST);
 
index 80da5b5..7b5bcbb 100644 (file)
@@ -300,6 +300,77 @@ GST_START_TEST (test_push_unordered)
 
 GST_END_TEST;
 
+gboolean is_eos;
+
+static gboolean
+eos_event_function (GstPad * pad, GstObject * parent, GstEvent * event)
+{
+  if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
+    g_mutex_lock (&check_mutex);
+    is_eos = TRUE;
+    g_cond_signal (&check_cond);
+    g_mutex_unlock (&check_mutex);
+  }
+
+  return TRUE;
+}
+
+GST_START_TEST (test_push_eos)
+{
+  GstElement *jitterbuffer;
+  const guint num_buffers = 5;
+  GstBuffer *buffer;
+  GList *node;
+  GstStructure *stats;
+  guint64 pushed, lost, late, duplicates;
+  int n = 0;
+
+  is_eos = FALSE;
+
+  jitterbuffer = setup_jitterbuffer (num_buffers);
+  gst_pad_set_event_function (mysinkpad, eos_event_function);
+
+  g_object_set (jitterbuffer, "latency", 1, NULL);
+
+  fail_unless (start_jitterbuffer (jitterbuffer)
+      == GST_STATE_CHANGE_SUCCESS, "could not set to playing");
+
+  /* push buffers: 0,1,2, */
+  for (node = inbuffers; node; node = g_list_next (node)) {
+    n++;
+    /* Skip 1 */
+    if (n == 2) {
+      continue;
+    }
+    buffer = (GstBuffer *) node->data;
+    fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
+  }
+
+  gst_pad_push_event (mysrcpad, gst_event_new_eos ());
+
+  g_mutex_lock (&check_mutex);
+  while (!is_eos)
+    g_cond_wait (&check_cond, &check_mutex);
+  g_mutex_unlock (&check_mutex);
+
+  /* Verify statistics */
+  g_object_get (jitterbuffer, "stats", &stats, NULL);
+  gst_structure_get (stats, "num-pushed", G_TYPE_UINT64, &pushed,
+      "num-lost", G_TYPE_UINT64, &lost,
+      "num-late", G_TYPE_UINT64, &late,
+      "num-duplicates", G_TYPE_UINT64, &duplicates, NULL);
+  fail_unless_equals_int (pushed, g_list_length (inbuffers) - 1);
+  fail_unless_equals_int (lost, 1);
+  fail_unless_equals_int (late, 0);
+  fail_unless_equals_int (duplicates, 0);
+  gst_structure_free (stats);
+
+  /* cleanup */
+  cleanup_jitterbuffer (jitterbuffer);
+}
+
+GST_END_TEST;
+
 GST_START_TEST (test_basetime)
 {
   GstElement *jitterbuffer;
@@ -2140,6 +2211,7 @@ rtpjitterbuffer_suite (void)
   tcase_add_test (tc_chain, test_push_forward_seq);
   tcase_add_test (tc_chain, test_push_backward_seq);
   tcase_add_test (tc_chain, test_push_unordered);
+  tcase_add_test (tc_chain, test_push_eos);
   tcase_add_test (tc_chain, test_basetime);
   tcase_add_test (tc_chain, test_clear_pt_map);