rtpbin: more buffering updates
authorWim Taymans <wim.taymans@collabora.co.uk>
Mon, 5 Oct 2009 17:45:35 +0000 (19:45 +0200)
committerWim Taymans <wim.taymans@collabora.co.uk>
Fri, 12 Feb 2010 16:22:53 +0000 (17:22 +0100)
Add signal to pause the jitterbuffer. This will be emitted from gstrtpbin when
one of the jitterbuffers is buffering.
Make rtpbin collect the buffering messages and post a new buffering message with
the min value.
Remove the stats callback from jitterbuffer but pass a percent integer to
functions that affect the buffering state of the jitterbuffer. This allows us
then to post buffering messages from outside of the jitterbuffer lock.

gst/rtpmanager/gstrtpbin-marshal.list
gst/rtpmanager/gstrtpbin.c
gst/rtpmanager/gstrtpbin.h
gst/rtpmanager/gstrtpjitterbuffer.c
gst/rtpmanager/gstrtpjitterbuffer.h
gst/rtpmanager/rtpjitterbuffer.c
gst/rtpmanager/rtpjitterbuffer.h

index ed73e43..1f6d2b7 100644 (file)
@@ -6,3 +6,4 @@ VOID:UINT,OBJECT
 VOID:UINT
 VOID:UINT,UINT
 VOID:OBJECT,OBJECT
+VOID:BOOL,UINT64
index 0326878..16cf77f 100644 (file)
@@ -297,7 +297,7 @@ struct _GstRtpBinStream
   gulong buffer_handlesync_sig;
   gulong buffer_ptreq_sig;
   gulong buffer_ntpstop_sig;
-  gboolean buffering;
+  gint percent;
 
   /* the PT demuxer of the SSRC */
   GstElement *demux;
@@ -1167,6 +1167,7 @@ create_stream (GstRtpBinSession * session, guint32 ssrc)
 
   stream->have_sync = FALSE;
   stream->unix_delta = 0;
+  stream->percent = 100;
   session->streams = g_slist_prepend (session->streams, stream);
 
   /* provide clock_rate to the jitterbuffer when needed */
@@ -1175,6 +1176,9 @@ create_stream (GstRtpBinSession * session, guint32 ssrc)
   stream->buffer_ntpstop_sig = g_signal_connect (buffer, "on-npt-stop",
       (GCallback) on_npt_stop, stream);
 
+  g_object_set_data (G_OBJECT (buffer), "GstRtpBinSession", session);
+  g_object_set_data (G_OBJECT (buffer), "GstRtpBinStream", stream);
+
   /* configure latency and packet lost */
   g_object_set (buffer, "latency", rtpbin->latency, NULL);
   g_object_set (buffer, "do-lost", rtpbin->do_lost, NULL);
@@ -1779,9 +1783,80 @@ gst_rtp_bin_handle_message (GstBin * bin, GstMessage * message)
     case GST_MESSAGE_BUFFERING:
     {
       gint percent;
+      gint min_percent = 100;
+      GSList *sessions, *streams, *elements = NULL;
+      GstRtpBinStream *stream;
+      guint64 base_time = 0;
+      gboolean change = FALSE, active = FALSE;
 
       gst_message_parse_buffering (message, &percent);
 
+      stream =
+          g_object_get_data (G_OBJECT (GST_MESSAGE_SRC (message)),
+          "GstRtpBinStream");
+
+      GST_DEBUG_OBJECT (bin, "got percent %d from stream %p", percent, stream);
+
+      /* get the stream */
+      if (stream) {
+        GST_RTP_BIN_LOCK (rtpbin);
+        /* fill in the percent */
+        stream->percent = percent;
+
+        for (sessions = rtpbin->sessions; sessions;
+            sessions = g_slist_next (sessions)) {
+          GstRtpBinSession *session = (GstRtpBinSession *) sessions->data;
+
+          GST_RTP_SESSION_LOCK (session);
+          for (streams = session->streams; streams;
+              streams = g_slist_next (streams)) {
+            GstRtpBinStream *stream = (GstRtpBinStream *) streams->data;
+            GstElement *element = stream->buffer;
+
+            GST_DEBUG_OBJECT (bin, "stream %p percent %d", stream,
+                stream->percent);
+
+            /* find min percent */
+            if (min_percent > stream->percent)
+              min_percent = stream->percent;
+
+            elements = g_slist_prepend (elements, gst_object_ref (element));
+          }
+          GST_RTP_SESSION_UNLOCK (session);
+        }
+        GST_DEBUG_OBJECT (bin, "min percent %d", min_percent);
+
+        if (rtpbin->buffering) {
+          if (min_percent == 100) {
+            rtpbin->buffering = FALSE;
+            active = TRUE;
+            change = TRUE;
+          }
+        } else {
+          /* pause the streams */
+          rtpbin->buffering = TRUE;
+          active = FALSE;
+          change = TRUE;
+        }
+        GST_RTP_BIN_UNLOCK (rtpbin);
+
+        gst_message_unref (message);
+        message =
+            gst_message_new_buffering (GST_OBJECT_CAST (bin), min_percent);
+
+        if (change) {
+          while (elements) {
+            GstElement *element = elements->data;
+
+            GST_DEBUG_OBJECT (bin, "setting %p to %d", element, active);
+            g_signal_emit_by_name (element, "set-active", active, base_time,
+                NULL);
+            gst_object_unref (element);
+            elements = g_slist_delete_link (elements, elements);
+          }
+        }
+      }
+
       GST_BIN_CLASS (parent_class)->handle_message (bin, message);
       break;
     }
index 44b6b01..a222f7a 100644 (file)
@@ -49,6 +49,7 @@ struct _GstRtpBin {
   gboolean        do_lost;
   gboolean        ignore_pt;
   RTPJitterBufferMode buffer_mode;
+  gboolean        buffering;
   /* a list of session */
   GSList         *sessions;
 
index 2d7be3d..69dbdc8 100644 (file)
@@ -85,6 +85,7 @@ enum
   SIGNAL_CLEAR_PT_MAP,
   SIGNAL_HANDLE_SYNC,
   SIGNAL_ON_NPT_STOP,
+  SIGNAL_SET_ACTIVE,
   LAST_SIGNAL
 };
 
@@ -134,6 +135,7 @@ struct _GstRtpJitterBufferPrivate
   GCond *jbuf_cond;
   gboolean waiting;
   gboolean discont;
+  gboolean active;
 
   /* properties */
   guint latency_ms;
@@ -266,8 +268,8 @@ static gboolean gst_rtp_jitter_buffer_query (GstPad * pad, GstQuery * query);
 static void
 gst_rtp_jitter_buffer_clear_pt_map (GstRtpJitterBuffer * jitterbuffer);
 static void
-do_stats_cb (RTPJitterBuffer * jbuf, guint percent,
-    GstRtpJitterBuffer * jitterbuffer);
+gst_rtp_jitter_buffer_set_active (GstRtpJitterBuffer * jitterbuffer,
+    gboolean active, guint64 base_time);
 
 static void
 gst_rtp_jitter_buffer_base_init (gpointer klass)
@@ -403,6 +405,20 @@ gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass)
       G_STRUCT_OFFSET (GstRtpJitterBufferClass, clear_pt_map), NULL, NULL,
       g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0, G_TYPE_NONE);
 
+  /**
+   * GstRtpJitterBuffer::set-active:
+   * @buffer: the object which received the signal
+   *
+   * Start pushing out packets with the given base time. This signal is only
+   * useful in buffering mode.
+   */
+  gst_rtp_jitter_buffer_signals[SIGNAL_SET_ACTIVE] =
+      g_signal_new ("set-active", G_TYPE_FROM_CLASS (klass),
+      G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION,
+      G_STRUCT_OFFSET (GstRtpJitterBufferClass, set_active), NULL, NULL,
+      gst_rtp_bin_marshal_VOID__BOOL_UINT64, G_TYPE_NONE, 2, G_TYPE_BOOLEAN,
+      G_TYPE_UINT64);
+
   gstelement_class->change_state =
       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_change_state);
   gstelement_class->request_new_pad =
@@ -411,6 +427,7 @@ gst_rtp_jitter_buffer_class_init (GstRtpJitterBufferClass * klass)
       GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_release_pad);
 
   klass->clear_pt_map = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_clear_pt_map);
+  klass->set_active = GST_DEBUG_FUNCPTR (gst_rtp_jitter_buffer_set_active);
 
   GST_DEBUG_CATEGORY_INIT
       (rtpjitterbuffer_debug, "gstrtpjitterbuffer", 0, "RTP Jitter Buffer");
@@ -432,8 +449,6 @@ gst_rtp_jitter_buffer_init (GstRtpJitterBuffer * jitterbuffer,
 
   priv->jbuf = rtp_jitter_buffer_new ();
   rtp_jitter_buffer_set_delay (priv->jbuf, priv->latency_ns);
-  rtp_jitter_buffer_set_stats_cb (priv->jbuf, (RTPBufferingStats) do_stats_cb,
-      jitterbuffer);
   priv->jbuf_lock = g_mutex_new ();
   priv->jbuf_cond = g_cond_new ();
 
@@ -631,6 +646,22 @@ gst_rtp_jitter_buffer_clear_pt_map (GstRtpJitterBuffer * jitterbuffer)
   JBUF_UNLOCK (priv);
 }
 
+static void
+gst_rtp_jitter_buffer_set_active (GstRtpJitterBuffer * jbuf, gboolean active,
+    guint64 base_time)
+{
+  GstRtpJitterBufferPrivate *priv;
+
+  priv = jbuf->priv;
+
+  JBUF_LOCK (priv);
+  GST_DEBUG_OBJECT (jbuf, "setting active %d at time %" GST_TIME_FORMAT, active,
+      GST_TIME_ARGS (base_time));
+  priv->active = active;
+  JBUF_SIGNAL (priv);
+  JBUF_UNLOCK (priv);
+}
+
 static GstCaps *
 gst_rtp_jitter_buffer_getcaps (GstPad * pad)
 {
@@ -1119,8 +1150,7 @@ parse_failed:
 }
 
 static void
-do_stats_cb (RTPJitterBuffer * jbuf, guint percent,
-    GstRtpJitterBuffer * jitterbuffer)
+post_buffering_percent (GstRtpJitterBuffer * jitterbuffer, gint percent)
 {
   GstMessage *message;
 
@@ -1141,6 +1171,7 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstBuffer * buffer)
   GstClockTime timestamp;
   guint64 latency_ts;
   gboolean tail;
+  gint percent = -1;
   guint8 pt;
 
   jitterbuffer = GST_RTP_JITTER_BUFFER (gst_pad_get_parent (pad));
@@ -1256,7 +1287,7 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstBuffer * buffer)
     if (G_UNLIKELY (rtp_jitter_buffer_get_ts_diff (priv->jbuf) >= latency_ts)) {
       GstBuffer *old_buf;
 
-      old_buf = rtp_jitter_buffer_pop (priv->jbuf);
+      old_buf = rtp_jitter_buffer_pop (priv->jbuf, &percent);
 
       GST_DEBUG_OBJECT (jitterbuffer, "Queue full, dropping old packet #%d",
           gst_rtp_buffer_get_seq (old_buf));
@@ -1273,7 +1304,7 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstBuffer * buffer)
    * FALSE if a packet with the same seqnum was already in the queue, meaning we
    * have a duplicate. */
   if (G_UNLIKELY (!rtp_jitter_buffer_insert (priv->jbuf, buffer, timestamp,
-              priv->clock_rate, priv->latency_ns, &tail)))
+              priv->clock_rate, &tail, &percent)))
     goto duplicate;
 
   /* signal addition of new buffer when the _loop is waiting. */
@@ -1295,6 +1326,9 @@ gst_rtp_jitter_buffer_chain (GstPad * pad, GstBuffer * buffer)
 finished:
   JBUF_UNLOCK (priv);
 
+  if (percent != -1)
+    post_buffering_percent (jitterbuffer, percent);
+
   gst_object_unref (jitterbuffer);
 
   return ret;
@@ -1427,6 +1461,7 @@ gst_rtp_jitter_buffer_loop (GstRtpJitterBuffer * jitterbuffer)
   GstClock *clock;
   GstClockID id;
   GstClockTime sync_time;
+  gint percent = -1;
 
   priv = jitterbuffer->priv;
 
@@ -1438,7 +1473,8 @@ again:
     /* always wait if we are blocked */
     if (G_LIKELY (!priv->blocked)) {
       /* we're buffering but not EOS, wait. */
-      if (!priv->eos && rtp_jitter_buffer_is_buffering (priv->jbuf))
+      if (!priv->eos && (!priv->active
+              || rtp_jitter_buffer_is_buffering (priv->jbuf)))
         goto do_wait;
       /* if we have a packet, we can exit the loop and grab it */
       if (rtp_jitter_buffer_num_packets (priv->jbuf) > 0)
@@ -1517,7 +1553,7 @@ again:
     if (G_UNLIKELY (gap < 0)) {
       GST_DEBUG_OBJECT (jitterbuffer, "Old packet #%d, next #%d dropping",
           seqnum, next_seqnum);
-      outbuf = rtp_jitter_buffer_pop (priv->jbuf);
+      outbuf = rtp_jitter_buffer_pop (priv->jbuf, &percent);
       gst_buffer_unref (outbuf);
       goto again;
     }
@@ -1656,7 +1692,7 @@ again:
 push_buffer:
 
   /* when we get here we are ready to pop and push the buffer */
-  outbuf = rtp_jitter_buffer_pop (priv->jbuf);
+  outbuf = rtp_jitter_buffer_pop (priv->jbuf, &percent);
 
   if (G_UNLIKELY (discont || priv->discont)) {
     /* set DISCONT flag when we missed a packet. We pushed the buffer writable
@@ -1718,6 +1754,9 @@ push_buffer:
   priv->next_seqnum = (seqnum + 1) & 0xffff;
   JBUF_UNLOCK (priv);
 
+  if (percent != -1)
+    post_buffering_percent (jitterbuffer, percent);
+
   /* push buffer */
   GST_DEBUG_OBJECT (jitterbuffer,
       "Pushing buffer %d, timestamp %" GST_TIME_FORMAT, seqnum,
index 6d7610e..68604a2 100644 (file)
@@ -77,6 +77,8 @@ struct _GstRtpJitterBufferClass
   /* actions */
   void     (*clear_pt_map)   (GstRtpJitterBuffer *buffer);
 
+  void     (*set_active)     (GstRtpJitterBuffer *buffer, gboolean active, guint64 base_time);
+
   /*< private > */
   gpointer _gst_reserved[GST_PADDING];
 };
index 8397173..7c54da9 100644 (file)
@@ -120,24 +120,6 @@ rtp_jitter_buffer_new (void)
 }
 
 /**
- * rtp_jitter_buffer_set_stats_cb:
- * @jbuf: an #RTPJitterBuffer
- * @stats: the stats callback
- * @user_data: user data passed to the callback
- *
- * Install a callbacl that will be called when the buffering state of @jbuf
- * changed.
- */
-void
-rtp_jitter_buffer_set_stats_cb (RTPJitterBuffer * jbuf,
-    RTPBufferingStats stats_cb, gpointer user_data)
-{
-  jbuf->stats_cb = stats_cb;
-  jbuf->stats_data = user_data;
-}
-
-
-/**
  * rtp_jitter_buffer_get_mode:
  * @jbuf: an #RTPJitterBuffer
  *
@@ -225,7 +207,7 @@ rtp_jitter_buffer_resync (RTPJitterBuffer * jbuf, GstClockTime time,
 }
 
 static void
-update_buffer_level (RTPJitterBuffer * jbuf)
+update_buffer_level (RTPJitterBuffer * jbuf, gint * percent)
 {
   GstBuffer *high_buf, *low_buf;
   gboolean post = FALSE;
@@ -262,19 +244,19 @@ update_buffer_level (RTPJitterBuffer * jbuf)
     }
   }
   if (post) {
-    gint percent;
+    gint perc;
 
     if (jbuf->buffering) {
-      percent = (jbuf->level * 100 / jbuf->delay);
-      percent = MIN (percent, 100);
+      perc = (jbuf->level * 100 / jbuf->delay);
+      perc = MIN (perc, 100);
     } else {
-      percent = 100;
+      perc = 100;
     }
 
-    if (jbuf->stats_cb)
-      jbuf->stats_cb (jbuf, percent, jbuf->stats_data);
+    if (percent)
+      *percent = perc;
 
-    GST_DEBUG ("buffering %d", percent);
+    GST_DEBUG ("buffering %d", perc);
   }
 }
 
@@ -575,8 +557,7 @@ no_skew:
  */
 gboolean
 rtp_jitter_buffer_insert (RTPJitterBuffer * jbuf, GstBuffer * buf,
-    GstClockTime time, guint32 clock_rate, GstClockTime max_delay,
-    gboolean * tail)
+    GstClockTime time, guint32 clock_rate, gboolean * tail, gint * percent)
 {
   GList *list;
   guint32 rtptime;
@@ -642,7 +623,9 @@ rtp_jitter_buffer_insert (RTPJitterBuffer * jbuf, GstBuffer * buf,
 
   /* buffering mode, update buffer stats */
   if (jbuf->mode == RTP_JITTER_BUFFER_MODE_BUFFER)
-    update_buffer_level (jbuf);
+    update_buffer_level (jbuf, percent);
+  else
+    *percent = -1;
 
   /* tail was changed when we did not find a previous packet, we set the return
    * flag when requested. */
@@ -662,6 +645,7 @@ duplicate:
 /**
  * rtp_jitter_buffer_pop:
  * @jbuf: an #RTPJitterBuffer
+ * @percent: the buffering percent
  *
  * Pops the oldest buffer from the packet queue of @jbuf. The popped buffer will
  * have its timestamp adjusted with the incomming running_time and the detected
@@ -670,7 +654,7 @@ duplicate:
  * Returns: a #GstBuffer or %NULL when there was no packet in the queue.
  */
 GstBuffer *
-rtp_jitter_buffer_pop (RTPJitterBuffer * jbuf)
+rtp_jitter_buffer_pop (RTPJitterBuffer * jbuf, gint * percent)
 {
   GstBuffer *buf;
 
@@ -680,7 +664,9 @@ rtp_jitter_buffer_pop (RTPJitterBuffer * jbuf)
 
   /* buffering mode, update buffer stats */
   if (jbuf->mode == RTP_JITTER_BUFFER_MODE_BUFFER)
-    update_buffer_level (jbuf);
+    update_buffer_level (jbuf, percent);
+  else
+    *percent = -1;
 
   return buf;
 }
index c8f28d3..310cec0 100644 (file)
@@ -58,16 +58,6 @@ typedef enum {
 #define RTP_TYPE_JITTER_BUFFER_MODE (rtp_jitter_buffer_mode_get_type())
 GType rtp_jitter_buffer_mode_get_type (void);
 
-/**
- * RTPBufferingStats:
- * @jbuf: an #RTPJitterBuffer
- * @percent: the buffering percent
- * @user_data: user data specified when registering
- *
- * Called when buffering is going on in @jbuf.
- */
-typedef void (*RTPBufferingStats) (RTPJitterBuffer *jbuf, guint percent, gpointer user_data);
-
 #define RTP_JITTER_BUFFER_MAX_WINDOW 512
 /**
  * RTPJitterBuffer:
@@ -88,8 +78,6 @@ struct _RTPJitterBuffer {
   guint64           level;
   guint64           low_level;
   guint64           high_level;
-  RTPBufferingStats stats_cb;
-  gpointer          stats_data;
 
   /* for calculating skew */
   GstClockTime   base_time;
@@ -117,9 +105,6 @@ GType rtp_jitter_buffer_get_type (void);
 /* managing lifetime */
 RTPJitterBuffer*      rtp_jitter_buffer_new              (void);
 
-void                  rtp_jitter_buffer_set_stats_cb     (RTPJitterBuffer *jbuf, RTPBufferingStats stats_cb,
-                                                          gpointer user_data);
-
 RTPJitterBufferMode   rtp_jitter_buffer_get_mode         (RTPJitterBuffer *jbuf);
 void                  rtp_jitter_buffer_set_mode         (RTPJitterBuffer *jbuf, RTPJitterBufferMode mode);
 
@@ -131,10 +116,9 @@ void                  rtp_jitter_buffer_reset_skew       (RTPJitterBuffer *jbuf)
 gboolean              rtp_jitter_buffer_insert           (RTPJitterBuffer *jbuf, GstBuffer *buf,
                                                           GstClockTime time,
                                                           guint32 clock_rate,
-                                                          GstClockTime max_delay,
-                                                          gboolean *tail);
+                                                          gboolean *tail, gint *percent);
 GstBuffer *           rtp_jitter_buffer_peek             (RTPJitterBuffer *jbuf);
-GstBuffer *           rtp_jitter_buffer_pop              (RTPJitterBuffer *jbuf);
+GstBuffer *           rtp_jitter_buffer_pop              (RTPJitterBuffer *jbuf, gint *percent);
 
 void                  rtp_jitter_buffer_flush            (RTPJitterBuffer *jbuf);