gst/gstqueue.c (gst_queue_init, gst_queue_link_sink)
authorMartin Soto <martinsoto@users.sourceforge.net>
Mon, 22 Nov 2004 23:50:37 +0000 (23:50 +0000)
committerMartin Soto <martinsoto@users.sourceforge.net>
Mon, 22 Nov 2004 23:50:37 +0000 (23:50 +0000)
Original commit message from CVS:
2004-11-23  Martin Soto  <martinsoto@users.sourceforge.net>

* gst/gstqueue.c (gst_queue_init, gst_queue_link_sink)
(gst_queue_link_src): Allow for renegotiating the caps of the sink
pad. The queue will now wait until it is empty and forward the new
caps to the source.
* gst/gstbin.c (gst_bin_set_element_sched)
(gst_bin_unset_element_sched): Make sure that all elements and
links are registered and unregistered with the scheduler exactly
once. This elaborates on a fix by Benjamin Otte, but
guarantees that decoupled elements are also registered.

ChangeLog
gst/gstbin.c
gst/gstqueue.c
plugins/elements/gstqueue.c

index 6b5f763..ddbd696 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,15 @@
+2004-11-23  Martin Soto  <martinsoto@users.sourceforge.net>
+
+       * gst/gstqueue.c (gst_queue_init, gst_queue_link_sink)
+       (gst_queue_link_src): Allow for renegotiating the caps of the sink
+       pad. The queue will now wait until it is empty and forward the new
+       caps to the source.
+       * gst/gstbin.c (gst_bin_set_element_sched)
+       (gst_bin_unset_element_sched): Make sure that all elements and
+       links are registered and unregistered with the scheduler exactly
+       once. This elaborates on a fix by Benjamin Otte, but
+       guarantees that decoupled elements are also registered.
+
 2004-11-11  Thomas Vander Stichele  <thomas at apestaart dot org>
 
        * docs/manual/quotes.xml:
index 30d476f..75f49e8 100644 (file)
@@ -326,42 +326,42 @@ gst_bin_set_element_sched (GstElement * element, GstScheduler * sched)
     /* set the children's schedule */
     g_list_foreach (GST_BIN (element)->children,
         (GFunc) gst_bin_set_element_sched, sched);
-  }
-  /* otherwise, if it's just a regular old element */
-  else if (!GST_FLAG_IS_SET (element, GST_ELEMENT_DECOUPLED)) {
+  } else {
+    /* otherwise, if it's just a regular old element */
     GList *pads;
 
     gst_scheduler_add_element (sched, element);
 
-    /* set the sched pointer in all the pads */
-    pads = element->pads;
-    while (pads) {
-      GstPad *pad;
-
-      pad = GST_PAD (pads->data);
-      pads = g_list_next (pads);
-
-      /* we only operate on real pads */
-      if (!GST_IS_REAL_PAD (pad))
-        continue;
-
-      /* if the peer element exists and is a candidate */
-      if (GST_PAD_PEER (pad)) {
-        if (gst_pad_get_scheduler (GST_PAD_PEER (pad)) == sched) {
-          GST_CAT_LOG (GST_CAT_SCHEDULING,
-              "peer is in same scheduler, telling scheduler");
-
-          if (GST_PAD_IS_SRC (pad))
-            gst_scheduler_pad_link (sched, pad, GST_PAD_PEER (pad));
-          else
-            gst_scheduler_pad_link (sched, GST_PAD_PEER (pad), pad);
+    if (!GST_FLAG_IS_SET (element, GST_ELEMENT_DECOUPLED)) {
+      /* set the sched pointer in all the pads */
+      pads = element->pads;
+      while (pads) {
+        GstPad *pad;
+
+        pad = GST_PAD (pads->data);
+        pads = g_list_next (pads);
+
+        /* we only operate on real pads */
+        if (!GST_IS_REAL_PAD (pad))
+          continue;
+
+        /* if the peer element exists and is a candidate */
+        if (GST_PAD_PEER (pad)) {
+          if (gst_pad_get_scheduler (GST_PAD_PEER (pad)) == sched) {
+            GST_CAT_LOG (GST_CAT_SCHEDULING,
+                "peer is in same scheduler, telling scheduler");
+
+            if (GST_PAD_IS_SRC (pad))
+              gst_scheduler_pad_link (sched, pad, GST_PAD_PEER (pad));
+            else
+              gst_scheduler_pad_link (sched, GST_PAD_PEER (pad), pad);
+          }
         }
       }
     }
   }
 }
 
-
 static void
 gst_bin_unset_element_sched (GstElement * element, GstScheduler * sched)
 {
@@ -391,35 +391,38 @@ gst_bin_unset_element_sched (GstElement * element, GstScheduler * sched)
         (GFunc) gst_bin_unset_element_sched, sched);
 
     gst_scheduler_remove_element (GST_ELEMENT_SCHED (element), element);
-  } else if (!GST_FLAG_IS_SET (element, GST_ELEMENT_DECOUPLED)) {
+  } else {
     /* otherwise, if it's just a regular old element */
     GList *pads;
 
-    /* set the sched pointer in all the pads */
-    pads = element->pads;
-    while (pads) {
-      GstPad *pad;
-
-      pad = GST_PAD (pads->data);
-      pads = g_list_next (pads);
-
-      /* we only operate on real pads */
-      if (!GST_IS_REAL_PAD (pad))
-        continue;
-
-      /* if the peer element exists and is a candidate */
-      if (GST_PAD_PEER (pad)) {
-        if (gst_pad_get_scheduler (GST_PAD_PEER (pad)) == sched) {
-          GST_CAT_LOG (GST_CAT_SCHEDULING,
-              "peer is in same scheduler, telling scheduler");
-
-          if (GST_PAD_IS_SRC (pad))
-            gst_scheduler_pad_unlink (sched, pad, GST_PAD_PEER (pad));
-          else
-            gst_scheduler_pad_unlink (sched, GST_PAD_PEER (pad), pad);
+    if (!GST_FLAG_IS_SET (element, GST_ELEMENT_DECOUPLED)) {
+      /* unset the sched pointer in all the pads */
+      pads = element->pads;
+      while (pads) {
+        GstPad *pad;
+
+        pad = GST_PAD (pads->data);
+        pads = g_list_next (pads);
+
+        /* we only operate on real pads */
+        if (!GST_IS_REAL_PAD (pad))
+          continue;
+
+        /* if the peer element exists and is a candidate */
+        if (GST_PAD_PEER (pad)) {
+          if (gst_pad_get_scheduler (GST_PAD_PEER (pad)) == sched) {
+            GST_CAT_LOG (GST_CAT_SCHEDULING,
+                "peer is in same scheduler, telling scheduler");
+
+            if (GST_PAD_IS_SRC (pad))
+              gst_scheduler_pad_unlink (sched, pad, GST_PAD_PEER (pad));
+            else
+              gst_scheduler_pad_unlink (sched, GST_PAD_PEER (pad), pad);
+          }
         }
       }
     }
+
     gst_scheduler_remove_element (GST_ELEMENT_SCHED (element), element);
   }
 }
index 6e54087..9e4b8d8 100644 (file)
@@ -41,6 +41,24 @@ static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
     GST_STATIC_CAPS_ANY);
 
 GST_DEBUG_CATEGORY_STATIC (queue_dataflow);
+#define GST_CAT_DEFAULT (queue_dataflow)
+
+#define STATUS(queue, msg) \
+  GST_CAT_LOG_OBJECT (queue_dataflow, queue, \
+                     "(%s:%s) " msg ": %u of %u-%u buffers, %u of %u-%u " \
+                     "bytes, %" G_GUINT64_FORMAT " of %" G_GUINT64_FORMAT \
+                     "-%" G_GUINT64_FORMAT " ns, %u elements", \
+                     GST_DEBUG_PAD_NAME (pad), \
+                     queue->cur_level.buffers, \
+                     queue->min_threshold.buffers, \
+                     queue->max_size.buffers, \
+                     queue->cur_level.bytes, \
+                     queue->min_threshold.bytes, \
+                     queue->max_size.bytes, \
+                     queue->cur_level.time, \
+                     queue->min_threshold.time, \
+                     queue->max_size.time, \
+                     queue->queue->length)
 
 static GstElementDetails gst_queue_details = GST_ELEMENT_DETAILS ("Queue",
     "Generic",
@@ -120,7 +138,9 @@ static gboolean gst_queue_handle_src_query (GstPad * pad,
     GstQueryType type, GstFormat * fmt, gint64 * value);
 
 static GstCaps *gst_queue_getcaps (GstPad * pad);
-static GstPadLinkReturn gst_queue_link (GstPad * pad, const GstCaps * caps);
+static GstPadLinkReturn
+gst_queue_link_sink (GstPad * pad, const GstCaps * caps);
+static GstPadLinkReturn gst_queue_link_src (GstPad * pad, const GstCaps * caps);
 static void gst_queue_locked_flush (GstQueue * queue);
 
 static GstElementStateReturn gst_queue_change_state (GstElement * element);
@@ -288,7 +308,7 @@ gst_queue_init (GstQueue * queue)
       GST_DEBUG_FUNCPTR (gst_queue_chain));
   gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad);
   gst_pad_set_link_function (queue->sinkpad,
-      GST_DEBUG_FUNCPTR (gst_queue_link));
+      GST_DEBUG_FUNCPTR (gst_queue_link_sink));
   gst_pad_set_getcaps_function (queue->sinkpad,
       GST_DEBUG_FUNCPTR (gst_queue_getcaps));
   gst_pad_set_active (queue->sinkpad, TRUE);
@@ -298,7 +318,8 @@ gst_queue_init (GstQueue * queue)
       "src");
   gst_pad_set_get_function (queue->srcpad, GST_DEBUG_FUNCPTR (gst_queue_get));
   gst_element_add_pad (GST_ELEMENT (queue), queue->srcpad);
-  gst_pad_set_link_function (queue->srcpad, GST_DEBUG_FUNCPTR (gst_queue_link));
+  gst_pad_set_link_function (queue->srcpad,
+      GST_DEBUG_FUNCPTR (gst_queue_link_src));
   gst_pad_set_getcaps_function (queue->srcpad,
       GST_DEBUG_FUNCPTR (gst_queue_getcaps));
   gst_pad_set_event_function (queue->srcpad,
@@ -374,7 +395,7 @@ gst_queue_getcaps (GstPad * pad)
 
   queue = GST_QUEUE (gst_pad_get_parent (pad));
 
-  if (queue->cur_level.bytes > 0) {
+  if (pad == queue->srcpad && queue->cur_level.bytes > 0) {
     return gst_caps_copy (queue->negotiated_caps);
   }
 
@@ -382,7 +403,45 @@ gst_queue_getcaps (GstPad * pad)
 }
 
 static GstPadLinkReturn
-gst_queue_link (GstPad * pad, const GstCaps * caps)
+gst_queue_link_sink (GstPad * pad, const GstCaps * caps)
+{
+  GstQueue *queue;
+  GstPadLinkReturn link_ret;
+
+  queue = GST_QUEUE (gst_pad_get_parent (pad));
+
+  GST_QUEUE_MUTEX_LOCK;
+
+  if (queue->cur_level.bytes > 0) {
+    if (gst_caps_is_equal (caps, queue->negotiated_caps)) {
+      GST_QUEUE_MUTEX_UNLOCK;
+      return GST_PAD_LINK_OK;
+    }
+
+    /* Wait until the queue is empty before attempting the pad
+       negotiation. */
+    STATUS (queue, "waiting for queue to get empty");
+    while (queue->cur_level.bytes > 0) {
+      g_cond_wait (queue->item_del, queue->qlock);
+    }
+    STATUS (queue, "queue is now empty");
+  }
+
+  link_ret = gst_pad_proxy_pad_link (pad, caps);
+
+  if (GST_PAD_LINK_SUCCESSFUL (link_ret)) {
+    /* we store an extra copy of the negotiated caps, just in case
+     * the pads become unnegotiated while we have buffers */
+    gst_caps_replace (&queue->negotiated_caps, gst_caps_copy (caps));
+  }
+
+  GST_QUEUE_MUTEX_UNLOCK;
+
+  return link_ret;
+}
+
+static GstPadLinkReturn
+gst_queue_link_src (GstPad * pad, const GstCaps * caps)
 {
   GstQueue *queue;
   GstPadLinkReturn link_ret;
@@ -465,23 +524,6 @@ gst_queue_handle_pending_events (GstQueue * queue)
   g_mutex_unlock (queue->event_lock);
 }
 
-#define STATUS(queue, msg) \
-  GST_CAT_LOG_OBJECT (queue_dataflow, queue, \
-                     "(%s:%s) " msg ": %u of %u-%u buffers, %u of %u-%u " \
-                     "bytes, %" G_GUINT64_FORMAT " of %" G_GUINT64_FORMAT \
-                     "-%" G_GUINT64_FORMAT " ns, %u elements", \
-                     GST_DEBUG_PAD_NAME (pad), \
-                     queue->cur_level.buffers, \
-                     queue->min_threshold.buffers, \
-                     queue->max_size.buffers, \
-                     queue->cur_level.bytes, \
-                     queue->min_threshold.bytes, \
-                     queue->max_size.bytes, \
-                     queue->cur_level.time, \
-                     queue->min_threshold.time, \
-                     queue->max_size.time, \
-                     queue->queue->length)
-
 static void
 gst_queue_chain (GstPad * pad, GstData * data)
 {
index 6e54087..9e4b8d8 100644 (file)
@@ -41,6 +41,24 @@ static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
     GST_STATIC_CAPS_ANY);
 
 GST_DEBUG_CATEGORY_STATIC (queue_dataflow);
+#define GST_CAT_DEFAULT (queue_dataflow)
+
+#define STATUS(queue, msg) \
+  GST_CAT_LOG_OBJECT (queue_dataflow, queue, \
+                     "(%s:%s) " msg ": %u of %u-%u buffers, %u of %u-%u " \
+                     "bytes, %" G_GUINT64_FORMAT " of %" G_GUINT64_FORMAT \
+                     "-%" G_GUINT64_FORMAT " ns, %u elements", \
+                     GST_DEBUG_PAD_NAME (pad), \
+                     queue->cur_level.buffers, \
+                     queue->min_threshold.buffers, \
+                     queue->max_size.buffers, \
+                     queue->cur_level.bytes, \
+                     queue->min_threshold.bytes, \
+                     queue->max_size.bytes, \
+                     queue->cur_level.time, \
+                     queue->min_threshold.time, \
+                     queue->max_size.time, \
+                     queue->queue->length)
 
 static GstElementDetails gst_queue_details = GST_ELEMENT_DETAILS ("Queue",
     "Generic",
@@ -120,7 +138,9 @@ static gboolean gst_queue_handle_src_query (GstPad * pad,
     GstQueryType type, GstFormat * fmt, gint64 * value);
 
 static GstCaps *gst_queue_getcaps (GstPad * pad);
-static GstPadLinkReturn gst_queue_link (GstPad * pad, const GstCaps * caps);
+static GstPadLinkReturn
+gst_queue_link_sink (GstPad * pad, const GstCaps * caps);
+static GstPadLinkReturn gst_queue_link_src (GstPad * pad, const GstCaps * caps);
 static void gst_queue_locked_flush (GstQueue * queue);
 
 static GstElementStateReturn gst_queue_change_state (GstElement * element);
@@ -288,7 +308,7 @@ gst_queue_init (GstQueue * queue)
       GST_DEBUG_FUNCPTR (gst_queue_chain));
   gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad);
   gst_pad_set_link_function (queue->sinkpad,
-      GST_DEBUG_FUNCPTR (gst_queue_link));
+      GST_DEBUG_FUNCPTR (gst_queue_link_sink));
   gst_pad_set_getcaps_function (queue->sinkpad,
       GST_DEBUG_FUNCPTR (gst_queue_getcaps));
   gst_pad_set_active (queue->sinkpad, TRUE);
@@ -298,7 +318,8 @@ gst_queue_init (GstQueue * queue)
       "src");
   gst_pad_set_get_function (queue->srcpad, GST_DEBUG_FUNCPTR (gst_queue_get));
   gst_element_add_pad (GST_ELEMENT (queue), queue->srcpad);
-  gst_pad_set_link_function (queue->srcpad, GST_DEBUG_FUNCPTR (gst_queue_link));
+  gst_pad_set_link_function (queue->srcpad,
+      GST_DEBUG_FUNCPTR (gst_queue_link_src));
   gst_pad_set_getcaps_function (queue->srcpad,
       GST_DEBUG_FUNCPTR (gst_queue_getcaps));
   gst_pad_set_event_function (queue->srcpad,
@@ -374,7 +395,7 @@ gst_queue_getcaps (GstPad * pad)
 
   queue = GST_QUEUE (gst_pad_get_parent (pad));
 
-  if (queue->cur_level.bytes > 0) {
+  if (pad == queue->srcpad && queue->cur_level.bytes > 0) {
     return gst_caps_copy (queue->negotiated_caps);
   }
 
@@ -382,7 +403,45 @@ gst_queue_getcaps (GstPad * pad)
 }
 
 static GstPadLinkReturn
-gst_queue_link (GstPad * pad, const GstCaps * caps)
+gst_queue_link_sink (GstPad * pad, const GstCaps * caps)
+{
+  GstQueue *queue;
+  GstPadLinkReturn link_ret;
+
+  queue = GST_QUEUE (gst_pad_get_parent (pad));
+
+  GST_QUEUE_MUTEX_LOCK;
+
+  if (queue->cur_level.bytes > 0) {
+    if (gst_caps_is_equal (caps, queue->negotiated_caps)) {
+      GST_QUEUE_MUTEX_UNLOCK;
+      return GST_PAD_LINK_OK;
+    }
+
+    /* Wait until the queue is empty before attempting the pad
+       negotiation. */
+    STATUS (queue, "waiting for queue to get empty");
+    while (queue->cur_level.bytes > 0) {
+      g_cond_wait (queue->item_del, queue->qlock);
+    }
+    STATUS (queue, "queue is now empty");
+  }
+
+  link_ret = gst_pad_proxy_pad_link (pad, caps);
+
+  if (GST_PAD_LINK_SUCCESSFUL (link_ret)) {
+    /* we store an extra copy of the negotiated caps, just in case
+     * the pads become unnegotiated while we have buffers */
+    gst_caps_replace (&queue->negotiated_caps, gst_caps_copy (caps));
+  }
+
+  GST_QUEUE_MUTEX_UNLOCK;
+
+  return link_ret;
+}
+
+static GstPadLinkReturn
+gst_queue_link_src (GstPad * pad, const GstCaps * caps)
 {
   GstQueue *queue;
   GstPadLinkReturn link_ret;
@@ -465,23 +524,6 @@ gst_queue_handle_pending_events (GstQueue * queue)
   g_mutex_unlock (queue->event_lock);
 }
 
-#define STATUS(queue, msg) \
-  GST_CAT_LOG_OBJECT (queue_dataflow, queue, \
-                     "(%s:%s) " msg ": %u of %u-%u buffers, %u of %u-%u " \
-                     "bytes, %" G_GUINT64_FORMAT " of %" G_GUINT64_FORMAT \
-                     "-%" G_GUINT64_FORMAT " ns, %u elements", \
-                     GST_DEBUG_PAD_NAME (pad), \
-                     queue->cur_level.buffers, \
-                     queue->min_threshold.buffers, \
-                     queue->max_size.buffers, \
-                     queue->cur_level.bytes, \
-                     queue->min_threshold.bytes, \
-                     queue->max_size.bytes, \
-                     queue->cur_level.time, \
-                     queue->min_threshold.time, \
-                     queue->max_size.time, \
-                     queue->queue->length)
-
 static void
 gst_queue_chain (GstPad * pad, GstData * data)
 {