rtpjitterbuffer: push the lost event from the timer thread
authorWim Taymans <wim.taymans@collabora.co.uk>
Wed, 18 Sep 2013 12:57:09 +0000 (14:57 +0200)
committerWim Taymans <wim.taymans@collabora.co.uk>
Wed, 18 Sep 2013 12:57:09 +0000 (14:57 +0200)
Instead of pushing the lost event from the chain function, schedule a timeout
that will push the lost event from the timer thread. This avoid blocking the
upstream thread while we push and sync the event.

gst/rtpmanager/gstrtpjitterbuffer.c

index c79ef71..cf33956 100644 (file)
@@ -239,6 +239,7 @@ typedef struct
 {
   guint idx;
   guint16 seqnum;
+  guint num;
   TimerType type;
   GstClockTime timeout;
   GstClockTime duration;
@@ -1486,7 +1487,7 @@ recalculate_timer (GstRtpJitterBuffer * jitterbuffer, TimerData * timer)
 
 static TimerData *
 add_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type,
-    guint16 seqnum, GstClockTime timeout, GstClockTime duration)
+    guint16 seqnum, guint num, GstClockTime timeout, GstClockTime duration)
 {
   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
   TimerData *timer;
@@ -1502,6 +1503,7 @@ add_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type,
   timer->idx = len;
   timer->type = type;
   timer->seqnum = seqnum;
+  timer->num = num;
   timer->timeout = timeout;
   timer->duration = duration;
   if (type == TIMER_TYPE_EXPECTED) {
@@ -1562,7 +1564,7 @@ set_timer (GstRtpJitterBuffer * jitterbuffer, TimerType type,
   /* find the seqnum timer */
   timer = find_timer (jitterbuffer, type, seqnum);
   if (timer == NULL) {
-    timer = add_timer (jitterbuffer, type, seqnum, timeout, -1);
+    timer = add_timer (jitterbuffer, type, seqnum, 0, timeout, -1);
   } else {
     reschedule_timer (jitterbuffer, timer, seqnum, timeout);
   }
@@ -1644,7 +1646,7 @@ update_timers (GstRtpJitterBuffer * jitterbuffer, guint16 seqnum,
     if (timer)
       reschedule_timer (jitterbuffer, timer, priv->next_in_seqnum, expected);
     else
-      add_timer (jitterbuffer, TIMER_TYPE_EXPECTED, priv->next_in_seqnum,
+      add_timer (jitterbuffer, TIMER_TYPE_EXPECTED, priv->next_in_seqnum, 0,
           expected, priv->packet_spacing);
   } else if (timer && timer->type != TIMER_TYPE_DEADLINE) {
     /* if we had a timer, remove it, we don't know when to expect the next
@@ -1675,7 +1677,7 @@ calculate_packet_spacing (GstRtpJitterBuffer * jitterbuffer, guint32 rtptime,
   }
 }
 
-static GstFlowReturn
+static void
 send_lost_event (GstRtpJitterBuffer * jitterbuffer, guint seqnum,
     guint lost_packets, GstClockTime timestamp, GstClockTime duration,
     gboolean late)
@@ -1710,25 +1712,15 @@ send_lost_event (GstRtpJitterBuffer * jitterbuffer, guint seqnum,
             "late", G_TYPE_BOOLEAN, late, NULL));
     JBUF_UNLOCK (priv);
     gst_pad_push_event (priv->srcpad, event);
-    JBUF_LOCK_CHECK (priv, flushing);
-  }
-  return GST_FLOW_OK;
-
-  /* ERRORS */
-flushing:
-  {
-    GST_DEBUG_OBJECT (jitterbuffer, "we are flushing");
-    return GST_FLOW_FLUSHING;
+    JBUF_LOCK (priv);
   }
 }
 
-
-static GstFlowReturn
+static void
 calculate_expected (GstRtpJitterBuffer * jitterbuffer, guint32 expected,
     guint16 seqnum, GstClockTime dts, gint gap)
 {
   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
-  GstFlowReturn ret = GST_FLOW_OK;
   GstClockTime total_duration, duration, expected_dts;
   TimerType type;
 
@@ -1770,9 +1762,10 @@ calculate_expected (GstRtpJitterBuffer * jitterbuffer, guint32 expected,
         GST_TIME_ARGS (total_duration), GST_TIME_ARGS (priv->latency_ns),
         lost_packets);
 
-    ret =
-        send_lost_event (jitterbuffer, expected, lost_packets,
-        priv->last_in_dts + duration, gap_time, TRUE);
+    /* this timer will fire immediately and the lost event will be pushed from
+     * the timer thread */
+    add_timer (jitterbuffer, TIMER_TYPE_LOST, expected, lost_packets,
+        priv->last_in_dts + duration, gap_time);
 
     expected += lost_packets;
     priv->last_in_dts += gap_time;
@@ -1790,11 +1783,10 @@ calculate_expected (GstRtpJitterBuffer * jitterbuffer, guint32 expected,
   }
 
   while (expected < seqnum) {
-    add_timer (jitterbuffer, type, expected, expected_dts, duration);
+    add_timer (jitterbuffer, type, expected, 0, expected_dts, duration);
     expected_dts += duration;
     expected++;
   }
-  return ret;
 }
 
 static GstFlowReturn
@@ -1918,9 +1910,7 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstObject * parent,
         } else {
           GST_DEBUG_OBJECT (jitterbuffer, "%d missing packets", gap);
           /* fill in the gap with EXPECTED timers */
-          ret = calculate_expected (jitterbuffer, expected, seqnum, dts, gap);
-          if (ret != GST_FLOW_OK)
-            goto out_flushing;
+          calculate_expected (jitterbuffer, expected, seqnum, dts, gap);
 
           do_next_seqnum = TRUE;
         }
@@ -2350,19 +2340,22 @@ do_lost_timeout (GstRtpJitterBuffer * jitterbuffer, TimerData * timer,
 {
   GstRtpJitterBufferPrivate *priv = jitterbuffer->priv;
   GstClockTime duration, timestamp;
-  guint seqnum;
+  guint seqnum, num;
+  gboolean late;
 
   seqnum = timer->seqnum;
   timestamp = apply_offset (jitterbuffer, timer->timeout);
   duration = timer->duration;
   if (duration == GST_CLOCK_TIME_NONE && priv->packet_spacing > 0)
     duration = priv->packet_spacing;
+  num = MAX (timer->num, 1);
+  late = timer->num > 0;
 
   /* remove timer now */
   remove_timer (jitterbuffer, timer);
   JBUF_SIGNAL_EVENT (priv);
 
-  send_lost_event (jitterbuffer, seqnum, 1, timestamp, duration, FALSE);
+  send_lost_event (jitterbuffer, seqnum, num, timestamp, duration, late);
 
   return TRUE;
 }