jitterbuffer: use separate thread for timeouts
authorWim Taymans <wim.taymans@collabora.co.uk>
Mon, 16 Sep 2013 13:53:47 +0000 (15:53 +0200)
committerWim Taymans <wim.taymans@collabora.co.uk>
Mon, 16 Sep 2013 13:55:55 +0000 (15:55 +0200)
Use a separate thread for scheduling the timeouts instead of using the
downstream streaming thread that might block at any time.

gst/rtpmanager/gstrtpjitterbuffer.c

index 6326476..8391a13 100644 (file)
@@ -117,17 +117,29 @@ enum
   if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK))    \
     goto label;                                       \
 } G_STMT_END
-
 #define JBUF_UNLOCK(priv) (g_mutex_unlock (&(priv)->jbuf_lock))
-#define JBUF_WAIT(priv)   (g_cond_wait (&(priv)->jbuf_cond, &(priv)->jbuf_lock))
 
-#define JBUF_WAIT_CHECK(priv,label) G_STMT_START {    \
-  JBUF_WAIT(priv);                                    \
-  if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK))    \
-    goto label;                                       \
+#define JBUF_WAIT_TIMER(priv)   G_STMT_START {            \
+  (priv)->waiting_timer = TRUE;                           \
+  g_cond_wait (&(priv)->jbuf_timer, &(priv)->jbuf_lock);  \
+  (priv)->waiting_timer = FALSE;                          \
+} G_STMT_END
+#define JBUF_SIGNAL_TIMER(priv) G_STMT_START {    \
+  if (G_UNLIKELY ((priv)->waiting_timer))         \
+    g_cond_signal (&(priv)->jbuf_timer);          \
 } G_STMT_END
 
-#define JBUF_SIGNAL(priv) (g_cond_signal (&(priv)->jbuf_cond))
+#define JBUF_WAIT_EVENT(priv,label) G_STMT_START {       \
+  (priv)->waiting_event = TRUE;                          \
+  g_cond_wait (&(priv)->jbuf_event, &(priv)->jbuf_lock); \
+  (priv)->waiting_event = FALSE;                         \
+  if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK))       \
+    goto label;                                          \
+} G_STMT_END
+#define JBUF_SIGNAL_EVENT(priv) G_STMT_START {    \
+  if (G_UNLIKELY ((priv)->waiting_event))         \
+    g_cond_signal (&(priv)->jbuf_event);          \
+} G_STMT_END
 
 struct _GstRtpJitterBufferPrivate
 {
@@ -136,13 +148,18 @@ struct _GstRtpJitterBufferPrivate
 
   RTPJitterBuffer *jbuf;
   GMutex jbuf_lock;
-  GCond jbuf_cond;
-  gboolean waiting;
+  gboolean waiting_timer;
+  GCond jbuf_timer;
+  gboolean waiting_event;
+  GCond jbuf_event;
   gboolean discont;
   gboolean ts_discont;
   gboolean active;
   guint64 out_offset;
 
+  gboolean timer_running;
+  GThread *timer_thread;
+
   /* properties */
   guint latency_ms;
   guint64 latency_ns;
@@ -321,6 +338,8 @@ static void do_handle_sync (GstRtpJitterBuffer * jitterbuffer);
 static void unschedule_current_timer (GstRtpJitterBuffer * jitterbuffer);
 static void remove_all_timers (GstRtpJitterBuffer * jitterbuffer);
 
+static void wait_next_timeout (GstRtpJitterBuffer * jitterbuffer);
+
 static void
 gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass)
 {
@@ -596,7 +615,8 @@ gst_rtp_jitter_buffer_init (GstRtpJitterBuffer * jitterbuffer)
   priv->timers = g_array_new (FALSE, TRUE, sizeof (TimerData));
   priv->jbuf = rtp_jitter_buffer_new ();
   g_mutex_init (&priv->jbuf_lock);
-  g_cond_init (&priv->jbuf_cond);
+  g_cond_init (&priv->jbuf_timer);
+  g_cond_init (&priv->jbuf_event);
 
   /* reset skew detection initialy */
   rtp_jitter_buffer_reset_skew (priv->jbuf);
@@ -641,7 +661,8 @@ gst_rtp_jitter_buffer_finalize (GObject * object)
 
   g_array_free (jitterbuffer->priv->timers, TRUE);
   g_mutex_clear (&jitterbuffer->priv->jbuf_lock);
-  g_cond_clear (&jitterbuffer->priv->jbuf_cond);
+  g_cond_clear (&jitterbuffer->priv->jbuf_timer);
+  g_cond_clear (&jitterbuffer->priv->jbuf_event);
 
   g_object_unref (jitterbuffer->priv->jbuf);
 
@@ -831,7 +852,7 @@ gst_rtp_jitter_buffer_set_active (GstRtpJitterBuffer * jbuf, gboolean active,
     GST_DEBUG_OBJECT (jbuf, "out offset %" GST_TIME_FORMAT,
         GST_TIME_ARGS (priv->out_offset));
     priv->active = active;
-    JBUF_SIGNAL (priv);
+    JBUF_SIGNAL_EVENT (priv);
   }
   if (!active) {
     rtp_jitter_buffer_set_buffering (priv->jbuf, TRUE);
@@ -980,7 +1001,7 @@ gst_rtp_jitter_buffer_flush_start (GstRtpJitterBuffer * jitterbuffer)
   priv->srcresult = GST_FLOW_FLUSHING;
   GST_DEBUG_OBJECT (jitterbuffer, "Disabling pop on queue");
   /* this unblocks any waiting pops on the src pad task */
-  JBUF_SIGNAL (priv);
+  JBUF_SIGNAL_EVENT (priv);
   /* unlock clock, we just unschedule, the entry will be released by the
    * locking streaming thread. */
   unschedule_current_timer (jitterbuffer);
@@ -1077,13 +1098,16 @@ gst_rtp_jitter_buffer_change_state (GstElement * element,
       priv->last_pt = -1;
       /* block until we go to PLAYING */
       priv->blocked = TRUE;
+      priv->timer_running = TRUE;
+      priv->timer_thread =
+          g_thread_new ("timer", (GThreadFunc) wait_next_timeout, jitterbuffer);
       JBUF_UNLOCK (priv);
       break;
     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
       JBUF_LOCK (priv);
       /* unblock to allow streaming in PLAYING */
       priv->blocked = FALSE;
-      JBUF_SIGNAL (priv);
+      JBUF_SIGNAL_EVENT (priv);
       JBUF_UNLOCK (priv);
       break;
     default:
@@ -1108,7 +1132,13 @@ gst_rtp_jitter_buffer_change_state (GstElement * element,
         ret = GST_STATE_CHANGE_NO_PREROLL;
       break;
     case GST_STATE_CHANGE_PAUSED_TO_READY:
+      JBUF_LOCK (priv);
       gst_buffer_replace (&priv->last_sr, NULL);
+      priv->timer_running = FALSE;
+      JBUF_SIGNAL_TIMER (priv);
+      JBUF_UNLOCK (priv);
+      g_thread_join (priv->timer_thread);
+      priv->timer_thread = NULL;
       break;
     case GST_STATE_CHANGE_READY_TO_NULL:
       break;
@@ -1229,7 +1259,7 @@ gst_rtp_jitter_buffer_sink_event (GstPad * pad, GstObject * parent,
       if (ret && !priv->eos) {
         GST_INFO_OBJECT (jitterbuffer, "queuing EOS");
         priv->eos = TRUE;
-        JBUF_SIGNAL (priv);
+        JBUF_SIGNAL_EVENT (priv);
       } else if (priv->eos) {
         GST_DEBUG_OBJECT (jitterbuffer, "dropping EOS, we are already EOS");
       } else {
@@ -1479,6 +1509,7 @@ add_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type,
     timer->rtx_retry = 0;
   }
   recalculate_timer (jitterbuffer, timer);
+  JBUF_SIGNAL_TIMER (priv);
 
   return timer;
 }
@@ -1599,6 +1630,7 @@ update_timers (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum,
       continue;
 
     if (gap == 0) {
+      GST_DEBUG ("found timer for current seqnum");
       /* the timer for the current seqnum */
       timer = test;
     } else if (gap > priv->rtx_delay_reorder) {
@@ -1624,6 +1656,7 @@ update_timers (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum,
     /* if we had a timer, remove it, we don't know when to expect the next
      * packet. */
     remove_timer (jitterbuffer, timer);
+    JBUF_SIGNAL_EVENT (priv);
   }
 }
 
@@ -1676,8 +1709,9 @@ calculate_expected (GstRtpJitterBuffer * jitterbuffer, guint32 expected,
   expected_dts = priv->last_in_dts + duration;
 
   if (priv->do_retransmission) {
-    expected++;
     type = TIMER_TYPE_EXPECTED;
+    if (find_timer (jitterbuffer, type, expected))
+      expected++;
   } else {
     type = TIMER_TYPE_LOST;
   }
@@ -1894,8 +1928,8 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent,
     do_handle_sync (jitterbuffer);
 
   /* signal addition of new buffer when the _loop is waiting. */
-  if (priv->waiting && priv->active)
-    JBUF_SIGNAL (priv);
+  if (priv->active && priv->waiting_timer)
+    JBUF_SIGNAL_EVENT (priv);
 
   /* let's unschedule and unblock any waiting buffers. We only want to do this
    * when the tail buffer changed */
@@ -2194,7 +2228,7 @@ wait:
 }
 
 /* the timeout for when we expected a packet expired */
-static GstFlowReturn
+static void
 do_expected_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
     GstClockTimeDiff clock_jitter)
 {
@@ -2228,12 +2262,10 @@ do_expected_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
   }
   reschedule_timer (jitterbuffer, timer, timer->seqnum,
       timer->rtx_base + timer->rtx_retry);
-
-  return priv->srcresult;
 }
 
 /* a packet is lost */
-static GstFlowReturn
+static void
 do_lost_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
     GstClockTimeDiff clock_jitter)
 {
@@ -2286,6 +2318,7 @@ do_lost_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
     priv->next_seqnum = (timer->seqnum + lost_packets) & 0xffff;
   /* remove timer now */
   remove_timer (jitterbuffer, timer);
+  JBUF_SIGNAL_EVENT (priv);
 
   if (priv->do_lost) {
     GstEvent *event;
@@ -2299,25 +2332,31 @@ do_lost_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
             "late", G_TYPE_BOOLEAN, lost_packets_late, NULL));
     JBUF_UNLOCK (priv);
     gst_pad_push_event (priv->srcpad, event);
-    JBUF_LOCK_CHECK (priv, flushing);
-  }
-  return GST_FLOW_OK;
-
-  /* ERRORS */
-flushing:
-  {
-    GST_DEBUG_OBJECT (jitterbuffer, "we are flushing");
-    return priv->srcresult;
+    JBUF_LOCK (priv);
   }
 }
 
-static GstFlowReturn
+static void
 do_eos_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
 {
+  GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
+
   GST_INFO_OBJECT (jitterbuffer, "got the NPT timeout");
   remove_timer (jitterbuffer, timer);
+  JBUF_SIGNAL_EVENT (priv);
+}
 
-  return GST_FLOW_EOS;
+static void
+do_deadline_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
+    GstClockTimeDiff clock_jitter)
+{
+  GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
+
+  GST_INFO_OBJECT (jitterbuffer, "got deadline timeout");
+
+  priv->next_seqnum = timer->seqnum;
+  remove_timer (jitterbuffer, timer);
+  JBUF_SIGNAL_EVENT (priv);
 }
 
 /* called when we need to wait for the next timeout.
@@ -2327,137 +2366,127 @@ do_eos_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
  *
  * If there are no timers, we wait on a gcond until something new happens.
  */
-static GstFlowReturn
+static void
 wait_next_timeout (GstRtpJitterBuffer * jitterbuffer)
 {
   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
-  GstFlowReturn result = GST_FLOW_OK;
-  gint i, len;
-  TimerData *timer = NULL;
-  GstClockTime timer_timeout = -1;
-  gint timer_idx;
 
-  len = priv->timers->len;
-  for (i = 0; i < len; i++) {
-    TimerData *test = &g_array_index (priv->timers, TimerData, i);
-    GstClockTime test_timeout = get_timeout (jitterbuffer, test);
-
-    GST_DEBUG_OBJECT (jitterbuffer, "%d, %d, %" GST_TIME_FORMAT,
-        i, test->seqnum, GST_TIME_ARGS (test_timeout));
-
-    /* find the smallest timeout */
-    if (timer == NULL || test_timeout == -1 || test_timeout < timer_timeout) {
-      timer = test;
-      timer_timeout = test_timeout;
-      if (timer_timeout == -1)
-        break;
+  JBUF_LOCK (priv);
+  while (priv->timer_running) {
+    TimerData *timer = NULL;
+    GstClockTime timer_timeout = -1;
+    gint i, len;
+    gint timer_idx;
+
+    len = priv->timers->len;
+    for (i = 0; i < len; i++) {
+      TimerData *test = &g_array_index (priv->timers, TimerData, i);
+      GstClockTime test_timeout = get_timeout (jitterbuffer, test);
+
+      GST_DEBUG_OBJECT (jitterbuffer, "%d, %d, %d, %" GST_TIME_FORMAT,
+          i, test->type, test->seqnum, GST_TIME_ARGS (test_timeout));
+
+      /* find the smallest timeout */
+      if (timer == NULL || test_timeout == -1 || test_timeout < timer_timeout) {
+        timer = test;
+        timer_timeout = test_timeout;
+        if (timer_timeout == -1)
+          break;
+      }
     }
-  }
-  if (timer) {
-    GstClock *clock;
-    GstClockTime sync_time;
-    GstClockID id;
-    GstClockReturn ret;
-    GstClockTimeDiff clock_jitter;
-
-    /* no timestamp, timeout immeditately */
-    if (timer_timeout == -1)
-      goto do_timeout;
+    if (timer) {
+      GstClock *clock;
+      GstClockTime sync_time;
+      GstClockID id;
+      GstClockReturn ret;
+      GstClockTimeDiff clock_jitter;
+
+      /* no timestamp, timeout immeditately */
+      if (timer_timeout == -1)
+        goto do_timeout;
+
+      GST_OBJECT_LOCK (jitterbuffer);
+      clock = GST_ELEMENT_CLOCK (jitterbuffer);
+      if (!clock) {
+        GST_OBJECT_UNLOCK (jitterbuffer);
+        /* let's just push if there is no clock */
+        GST_DEBUG_OBJECT (jitterbuffer, "No clock, timeout right away");
+        goto do_timeout;
+      }
 
-    GST_OBJECT_LOCK (jitterbuffer);
-    clock = GST_ELEMENT_CLOCK (jitterbuffer);
-    if (!clock) {
+      /* prepare for sync against clock */
+      sync_time = timer_timeout + GST_ELEMENT_CAST (jitterbuffer)->base_time;
+      /* add latency of peer to get input time */
+      sync_time += priv->peer_latency;
+
+      GST_DEBUG_OBJECT (jitterbuffer, "sync to timestamp %" GST_TIME_FORMAT
+          " with sync time %" GST_TIME_FORMAT,
+          GST_TIME_ARGS (timer_timeout), GST_TIME_ARGS (sync_time));
+
+      /* create an entry for the clock */
+      id = priv->clock_id = gst_clock_new_single_shot_id (clock, sync_time);
+      priv->unscheduled = FALSE;
+      priv->timer_timeout = timer_timeout;
+      priv->timer_seqnum = timer->seqnum;
+      timer_idx = timer->idx;
       GST_OBJECT_UNLOCK (jitterbuffer);
-      /* let's just push if there is no clock */
-      GST_DEBUG_OBJECT (jitterbuffer, "No clock, timeout right away");
-      goto do_timeout;
-    }
-
-    /* prepare for sync against clock */
-    sync_time = timer_timeout + GST_ELEMENT_CAST (jitterbuffer)->base_time;
-    /* add latency of peer to get input time */
-    sync_time += priv->peer_latency;
 
-    GST_DEBUG_OBJECT (jitterbuffer, "sync to timestamp %" GST_TIME_FORMAT
-        " with sync time %" GST_TIME_FORMAT,
-        GST_TIME_ARGS (timer_timeout), GST_TIME_ARGS (sync_time));
-
-    /* create an entry for the clock */
-    id = priv->clock_id = gst_clock_new_single_shot_id (clock, sync_time);
-    priv->unscheduled = FALSE;
-    priv->timer_timeout = timer_timeout;
-    priv->timer_seqnum = timer->seqnum;
-    timer_idx = timer->idx;
-    GST_OBJECT_UNLOCK (jitterbuffer);
-
-    /* release the lock so that the other end can push stuff or unlock */
-    JBUF_UNLOCK (priv);
-
-    ret = gst_clock_id_wait (id, &clock_jitter);
+      /* release the lock so that the other end can push stuff or unlock */
+      JBUF_UNLOCK (priv);
 
-    JBUF_LOCK (priv);
-    GST_DEBUG_OBJECT (jitterbuffer, "sync done, %d, #%d, %" G_GINT64_FORMAT,
-        ret, priv->timer_seqnum, clock_jitter);
-    /* and free the entry */
-    gst_clock_id_unref (id);
-    priv->clock_id = NULL;
-
-    /* at this point, the clock could have been unlocked by a timeout, a new
-     * tail element was added to the queue or because we are shutting down. Check
-     * for shutdown first. */
-    if (G_UNLIKELY (priv->srcresult != GST_FLOW_OK))
-      goto flushing;
-
-    if (priv->timers->len <= timer_idx)
-      goto done;
-
-    /* we released the lock, the array might have changed */
-    timer = &g_array_index (priv->timers, TimerData, timer_idx);
-    /* if changed to timeout immediately, do so */
-    if (timer->timeout == -1)
-      goto do_timeout;
-
-    /* if we got unscheduled and we are not flushing, it's because a new tail
-     * element became available in the queue or we flushed the queue.
-     * Grab it and try to push or sync. */
-    if (ret == GST_CLOCK_UNSCHEDULED || priv->unscheduled) {
-      GST_DEBUG_OBJECT (jitterbuffer, "Wait got unscheduled");
-      goto done;
-    }
+      ret = gst_clock_id_wait (id, &clock_jitter);
 
-  do_timeout:
-    switch (timer->type) {
-      case TIMER_TYPE_EXPECTED:
-        result = do_expected_timeout (jitterbuffer, timer, clock_jitter);
-        break;
-      case TIMER_TYPE_LOST:
-        result = do_lost_timeout (jitterbuffer, timer, clock_jitter);
-        break;
-      case TIMER_TYPE_DEADLINE:
-        priv->next_seqnum = timer->seqnum;
-        remove_timer (jitterbuffer, timer);
-        break;
-      case TIMER_TYPE_EOS:
-        result = do_eos_timeout (jitterbuffer, timer);
+      JBUF_LOCK (priv);
+      if (!priv->timer_running)
         break;
-    }
-  } else {
-    /* no timers, wait for activity */
-    GST_DEBUG_OBJECT (jitterbuffer, "waiting");
-    priv->waiting = TRUE;
-    JBUF_WAIT_CHECK (priv, flushing);
-    priv->waiting = FALSE;
-    GST_DEBUG_OBJECT (jitterbuffer, "waiting done");
-  }
 
-done:
-  return result;
+      GST_DEBUG_OBJECT (jitterbuffer, "sync done, %d, #%d, %" G_GINT64_FORMAT,
+          ret, priv->timer_seqnum, clock_jitter);
+      /* and free the entry */
+      gst_clock_id_unref (id);
+      priv->clock_id = NULL;
+
+      if (priv->timers->len <= timer_idx)
+        continue;
+
+      /* we released the lock, the array might have changed */
+      timer = &g_array_index (priv->timers, TimerData, timer_idx);
+      /* if changed to timeout immediately, do so */
+      if (timer->timeout == -1)
+        goto do_timeout;
+
+      /* if we got unscheduled and we are not flushing, it's because a new tail
+       * element became available in the queue or we flushed the queue.
+       * Grab it and try to push or sync. */
+      if (ret == GST_CLOCK_UNSCHEDULED || priv->unscheduled) {
+        GST_DEBUG_OBJECT (jitterbuffer, "Wait got unscheduled");
+        continue;
+      }
 
-flushing:
-  {
-    GST_DEBUG_OBJECT (jitterbuffer, "we are flushing");
-    return priv->srcresult;
+    do_timeout:
+      switch (timer->type) {
+        case TIMER_TYPE_EXPECTED:
+          do_expected_timeout (jitterbuffer, timer, clock_jitter);
+          break;
+        case TIMER_TYPE_LOST:
+          do_lost_timeout (jitterbuffer, timer, clock_jitter);
+          break;
+        case TIMER_TYPE_DEADLINE:
+          do_deadline_timeout (jitterbuffer, timer, clock_jitter);
+          break;
+        case TIMER_TYPE_EOS:
+          do_eos_timeout (jitterbuffer, timer);
+          break;
+      }
+    } else {
+      /* no timers, wait for activity */
+      GST_DEBUG_OBJECT (jitterbuffer, "waiting");
+      JBUF_WAIT_TIMER (priv);
+      GST_DEBUG_OBJECT (jitterbuffer, "waiting done");
+    }
   }
+  GST_DEBUG_OBJECT (jitterbuffer, "we are stopping");
+  return;
 }
 
 /*
@@ -2477,9 +2506,13 @@ gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer)
   JBUF_LOCK_CHECK (priv, flushing);
   do {
     result = handle_next_buffer (jitterbuffer);
-    if (G_LIKELY (result == GST_FLOW_WAIT))
+    if (G_LIKELY (result == GST_FLOW_WAIT)) {
+      GST_DEBUG_OBJECT (jitterbuffer, "waiting for event");
       /* now wait for the next event */
-      result = wait_next_timeout (jitterbuffer);
+      JBUF_WAIT_EVENT (priv, flushing);
+      GST_DEBUG_OBJECT (jitterbuffer, "waiting for event done");
+      result = GST_FLOW_OK;
+    }
   }
   while (result == GST_FLOW_OK);
   JBUF_UNLOCK (priv);