rtpjitterbuffer: Keep JBUF lock while processing timers
authorNicolas Dufresne <nicolas.dufresne@collabora.com>
Fri, 27 Mar 2020 19:48:32 +0000 (15:48 -0400)
committerNicolas Dufresne <nicolas.dufresne@collabora.com>
Mon, 8 Jun 2020 21:54:53 +0000 (17:54 -0400)
Until now, do_expected_timeout() was shortly dropping the JBUF_LOCK in order
to push RTX event event without causing deadlock. As a side effect, some
CPU hung would happen as the timerqueue would get filled while looping over
the due timers. To mitigate this, we were processing the lost timer first and
placing into a queue the remainign to be processed later.

In the gap caused by an unlock, we could endup receiving one of the seqnum
present in the pending timers. In that case, the timer would not be found and
a new one was created. When we then update the expected timer, the seqnum
would already exist and the updated timer would be lost.

In this patch we remove the unlock from do_expected_timeout() and place all
pending RTX event into a queue (instead of pending timer). Then, as soon as
we have selected a timer to wait (or if there is no timer to wait for) we send
all the upstream RTX events. As we no longer unlock, we no longer need to pop
more then one timer from the queue, and we do so with the lock held, which
blocks any new colliding timers from being created.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-good/-/merge_requests/616>

gst/rtpmanager/gstrtpjitterbuffer.c

index 03f2c7f..1d698ba 100644 (file)
@@ -3774,7 +3774,7 @@ update_rtx_stats (GstRtpJitterBuffer * jitterbuffer, const RtpTimer * timer,
 /* the timeout for when we expected a packet expired */
 static gboolean
 do_expected_timeout (GstRtpJitterBuffer * jitterbuffer, RtpTimer * timer,
-    GstClockTime now)
+    GstClockTime now, GQueue * events)
 {
   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
   GstEvent *event;
@@ -3812,6 +3812,7 @@ do_expected_timeout (GstRtpJitterBuffer * jitterbuffer, RtpTimer * timer,
           "deadline", G_TYPE_UINT, rtx_deadline_ms,
           "packet-spacing", G_TYPE_UINT64, priv->packet_spacing,
           "avg-rtt", G_TYPE_UINT, avg_rtx_rtt_ms, NULL));
+  g_queue_push_tail (events, event);
   GST_DEBUG_OBJECT (jitterbuffer, "Request RTX: %" GST_PTR_FORMAT, event);
 
   priv->num_rtx_requests++;
@@ -3849,10 +3850,6 @@ do_expected_timeout (GstRtpJitterBuffer * jitterbuffer, RtpTimer * timer,
   rtp_timer_queue_update_timer (priv->timers, timer, timer->seqnum,
       timer->rtx_base + timer->rtx_retry, timer->rtx_delay, offset, FALSE);
 
-  JBUF_UNLOCK (priv);
-  gst_pad_push_event (priv->sinkpad, event);
-  JBUF_LOCK (priv);
-
   return FALSE;
 }
 
@@ -3922,13 +3919,13 @@ do_deadline_timeout (GstRtpJitterBuffer * jitterbuffer, RtpTimer * timer,
 
 static gboolean
 do_timeout (GstRtpJitterBuffer * jitterbuffer, RtpTimer * timer,
-    GstClockTime now)
+    GstClockTime now, GQueue * events)
 {
   gboolean removed = FALSE;
 
   switch (timer->type) {
     case RTP_TIMER_EXPECTED:
-      removed = do_expected_timeout (jitterbuffer, timer, now);
+      removed = do_expected_timeout (jitterbuffer, timer, now, events);
       break;
     case RTP_TIMER_LOST:
       removed = do_lost_timeout (jitterbuffer, timer, now);
@@ -3943,6 +3940,35 @@ do_timeout (GstRtpJitterBuffer * jitterbuffer, RtpTimer * timer,
   return removed;
 }
 
+static void
+push_rtx_events_unlocked (GstRtpJitterBuffer * jitterbuffer, GQueue * events)
+{
+  GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
+  GstEvent *event;
+
+  while ((event = (GstEvent *) g_queue_pop_head (events)))
+    gst_pad_push_event (priv->sinkpad, event);
+}
+
+/* called with JBUF lock
+ *
+ * Pushes all events in @events queue.
+ *
+ * Returns: %TRUE if the timer thread is not longer running
+ */
+static void
+push_rtx_events (GstRtpJitterBuffer * jitterbuffer, GQueue * events)
+{
+  GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
+
+  if (events->length == 0)
+    return;
+
+  JBUF_UNLOCK (priv);
+  push_rtx_events_unlocked (jitterbuffer, events);
+  JBUF_LOCK (priv);
+}
+
 /* called when we need to wait for the next timeout.
  *
  * We loop over the array of recorded timeouts and wait for the earliest one.
@@ -3959,7 +3985,7 @@ wait_next_timeout (GstRtpJitterBuffer * jitterbuffer)
   JBUF_LOCK (priv);
   while (priv->timer_running) {
     RtpTimer *timer = NULL;
-    GQueue timers = G_QUEUE_INIT;
+    GQueue events = G_QUEUE_INIT;
 
     /* don't produce data in paused */
     while (priv->blocked) {
@@ -3992,25 +4018,8 @@ wait_next_timeout (GstRtpJitterBuffer * jitterbuffer)
       rtp_timer_queue_remove_until (priv->rtx_stats_timers, now);
 
     /* Iterate expired "normal" timers */
-    while ((timer = rtp_timer_queue_pop_until (priv->timers, now))) {
-      do {
-        if (timer->type == RTP_TIMER_LOST) {
-          GST_DEBUG_OBJECT (jitterbuffer, "Weeding out expired lost timers");
-          do_lost_timeout (jitterbuffer, timer, now);
-        } else {
-          g_queue_push_tail_link (&timers, (GList *) timer);
-        }
-      } while ((timer = rtp_timer_queue_pop_until (priv->timers, now)));
-
-      /* execute the remaining timers */
-      while ((timer = (RtpTimer *) g_queue_pop_head_link (&timers)))
-        do_timeout (jitterbuffer, timer, now);
-
-      /* do_expected_timeout(), called by do_timeout will drop the
-       * JBUF_LOCK, so we need to check if we are still running */
-      if (!priv->timer_running)
-        goto stopping;
-    }
+    while ((timer = rtp_timer_queue_pop_until (priv->timers, now)))
+      do_timeout (jitterbuffer, timer, now, &events);
 
     timer = rtp_timer_queue_peek_earliest (priv->timers);
     if (timer) {
@@ -4031,6 +4040,7 @@ wait_next_timeout (GstRtpJitterBuffer * jitterbuffer)
         /* let's just push if there is no clock */
         GST_DEBUG_OBJECT (jitterbuffer, "No clock, timeout right away");
         now = timer->timeout;
+        push_rtx_events (jitterbuffer, &events);
         continue;
       }
 
@@ -4052,11 +4062,14 @@ wait_next_timeout (GstRtpJitterBuffer * jitterbuffer)
       /* release the lock so that the other end can push stuff or unlock */
       JBUF_UNLOCK (priv);
 
+      push_rtx_events_unlocked (jitterbuffer, &events);
+
       ret = gst_clock_id_wait (id, &clock_jitter);
 
       JBUF_LOCK (priv);
 
       if (!priv->timer_running) {
+        g_queue_clear_full (&events, (GDestroyNotify) gst_event_unref);
         gst_clock_id_unref (id);
         priv->clock_id = NULL;
         break;
@@ -4075,6 +4088,8 @@ wait_next_timeout (GstRtpJitterBuffer * jitterbuffer)
       gst_clock_id_unref (id);
       priv->clock_id = NULL;
     } else {
+      push_rtx_events_unlocked (jitterbuffer, &events);
+
       /* when draining the timers, the pusher thread will reuse our
        * condition to wait for completion. Signal that thread before
        * sleeping again here */