Add two new functions for filler events (which are used to synchronize streams if...
authorRonald S. Bultje <rbultje@ronald.bitfreak.net>
Sat, 8 Jan 2005 18:10:50 +0000 (18:10 +0000)
committerRonald S. Bultje <rbultje@ronald.bitfreak.net>
Sat, 8 Jan 2005 18:10:50 +0000 (18:10 +0000)
Original commit message from CVS:
* docs/gst/gstreamer-sections.txt:
* docs/gst/tmpl/gstevent.sgml:
* gst/gstevent.c: (gst_event_new_filler_stamped),
(gst_event_filler_get_duration):
* gst/gstevent.h:
Add two new functions for filler events (which are used to
synchronize streams if one of them is not having any data
for a while) without interrupting the actual data-stream.
Basically a no-op.
* gst/gstqueue.c: (gst_queue_init), (gst_queue_getcaps),
(gst_queue_link_sink), (gst_queue_link_src),
(gst_queue_change_state):
Allow for renegotiation while filled. Required for stream
switching while playing.

ChangeLog
docs/gst/gstreamer-sections.txt
docs/gst/tmpl/gstevent.sgml
gst/gstevent.c
gst/gstevent.h
gst/gstqueue.c
plugins/elements/gstqueue.c

index f2e4573..acd6b43 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,20 @@
+2005-01-08  Ronald S. Bultje  <rbultje@ronald.bitfreak.net>
+
+       * docs/gst/gstreamer-sections.txt:
+       * docs/gst/tmpl/gstevent.sgml:
+       * gst/gstevent.c: (gst_event_new_filler_stamped),
+       (gst_event_filler_get_duration):
+       * gst/gstevent.h:
+         Add two new functions for filler events (which are used to
+         synchronize streams if one of them is not having any data
+         for a while) without interrupting the actual data-stream.
+         Basically a no-op.
+       * gst/gstqueue.c: (gst_queue_init), (gst_queue_getcaps),
+       (gst_queue_link_sink), (gst_queue_link_src),
+       (gst_queue_change_state):
+         Allow for renegotiation while filled. Required for stream
+         switching while playing.
+
 2005-01-08  Benjamin Otte  <otte@gnome.org>
 
        * gst/gstelement.c: (gst_element_link_many):
index c05cf9c..8d39e7c 100644 (file)
@@ -598,6 +598,8 @@ gst_event_new_discontinuous
 gst_event_new_discontinuous_valist
 gst_event_discont_get_value
 gst_event_new_filler
+gst_event_new_filler_stamped
+gst_event_filler_get_duration
 gst_event_new_flush
 <SUBSECTION Standard>
 GST_EVENT
index ccf8369..5a6c1ad 100644 (file)
@@ -442,6 +442,25 @@ Create a new dummy event that should be ignored
 
 
 
+<!-- ##### FUNCTION gst_event_new_filler_stamped ##### -->
+<para>
+
+</para>
+
+@time: 
+@duration: 
+@Returns: 
+
+
+<!-- ##### FUNCTION gst_event_filler_get_duration ##### -->
+<para>
+
+</para>
+
+@event: 
+@Returns: 
+
+
 <!-- ##### MACRO gst_event_new_flush ##### -->
 <para>
 Create a new flush event.
index a9f6a65..a6ff14c 100644 (file)
@@ -25,6 +25,7 @@
 #include "gst_private.h"
 #include "gstdata_private.h"
 
+#include "gstclock.h"
 #include "gstinfo.h"
 #include "gstmemchunk.h"
 #include "gstevent.h"
@@ -353,3 +354,72 @@ gst_event_new_segment_seek (GstSeekType type, gint64 start, gint64 stop)
 
   return event;
 }
+
+/**
+ * gst_event_new_filler_stamped:
+ * @time: timestamp of the filler, in nanoseconds.
+ * @duration: duration of the filler, in nanoseconds.
+ *
+ * Creates "filler" data, which is basically empty data that is used to
+ * synchronize streams if one stream has no data for a while. This is
+ * used to prevent deadlocks.
+ *
+ * Returns: the newly created event.
+ */
+
+GstEvent *
+gst_event_new_filler_stamped (guint64 time, guint64 duration)
+{
+  GstEvent *event = gst_event_new_filler ();
+
+  GST_EVENT_TIMESTAMP (event) = time;
+  if (GST_CLOCK_TIME_IS_VALID (duration)) {
+    GValue value = { 0 };
+
+    event->event_data.structure.structure =
+        gst_structure_new ("application/x-gst-filler", NULL);
+    g_value_init (&value, G_TYPE_UINT64);
+    g_value_set_uint64 (&value, duration);
+    gst_structure_set_value (event->event_data.structure.structure,
+        "duration", &value);
+    g_value_unset (&value);
+  }
+
+  return event;
+}
+
+/**
+ * gst_event_filler_get_duration:
+ * @event: the event to get the duration from.
+ *
+ * Filler events are used to synchronize streams (and thereby prevent
+ * application deadlocks) if one stream receives no data for a while.
+ * This function gets the duration of a filler event, which is the
+ * amount of time from the start of this event (see GST_EVENT_TIMESTAMP())
+ * that no data is available.
+ *
+ * Returns: duration of the lack of data, or GST_CLOCK_TIME_NONE.
+ */
+
+guint64
+gst_event_filler_get_duration (GstEvent * event)
+{
+  const GValue *value;
+
+  g_return_val_if_fail (event != NULL, GST_CLOCK_TIME_NONE);
+  g_return_val_if_fail (GST_EVENT_TYPE (event) == GST_EVENT_FILLER,
+      GST_CLOCK_TIME_NONE);
+
+  /* check the event */
+  if (!event->event_data.structure.structure)
+    return GST_CLOCK_TIME_NONE;
+  value = gst_structure_get_value (event->event_data.structure.structure,
+      "duration");
+  if (!value)
+    return GST_CLOCK_TIME_NONE;
+  g_return_val_if_fail (G_VALUE_TYPE (value) == G_TYPE_UINT64,
+      GST_CLOCK_TIME_NONE);
+
+  /* return */
+  return g_value_get_uint64 (value);
+}
index c8e4b5a..d10b7f1 100644 (file)
@@ -224,6 +224,9 @@ GstEvent*   gst_event_new_discontinuous_valist      (gboolean new_media,
 gboolean       gst_event_discont_get_value     (GstEvent *event, GstFormat format, gint64 *value);
 
 #define                gst_event_new_filler()          gst_event_new(GST_EVENT_FILLER)
+GstEvent*      gst_event_new_filler_stamped    (guint64 time,
+                                                guint64 duration);
+guint64                gst_event_filler_get_duration   (GstEvent *event);
 
 /* flush events */
 #define                gst_event_new_flush()           gst_event_new(GST_EVENT_FLUSH)
index 17ee2cc..11662bd 100644 (file)
@@ -41,6 +41,24 @@ static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
     GST_STATIC_CAPS_ANY);
 
 GST_DEBUG_CATEGORY_STATIC (queue_dataflow);
+#define GST_CAT_DEFAULT (queue_dataflow)
+
+#define STATUS(queue, msg) \
+  GST_CAT_LOG_OBJECT (queue_dataflow, queue, \
+                     "(%s:%s) " msg ": %u of %u-%u buffers, %u of %u-%u " \
+                     "bytes, %" G_GUINT64_FORMAT " of %" G_GUINT64_FORMAT \
+                     "-%" G_GUINT64_FORMAT " ns, %u elements", \
+                     GST_DEBUG_PAD_NAME (pad), \
+                     queue->cur_level.buffers, \
+                     queue->min_threshold.buffers, \
+                     queue->max_size.buffers, \
+                     queue->cur_level.bytes, \
+                     queue->min_threshold.bytes, \
+                     queue->max_size.bytes, \
+                     queue->cur_level.time, \
+                     queue->min_threshold.time, \
+                     queue->max_size.time, \
+                     queue->queue->length)
 
 static GstElementDetails gst_queue_details = GST_ELEMENT_DETAILS ("Queue",
     "Generic",
@@ -120,7 +138,9 @@ static gboolean gst_queue_handle_src_query (GstPad * pad,
     GstQueryType type, GstFormat * fmt, gint64 * value);
 
 static GstCaps *gst_queue_getcaps (GstPad * pad);
-static GstPadLinkReturn gst_queue_link (GstPad * pad, const GstCaps * caps);
+static GstPadLinkReturn
+gst_queue_link_sink (GstPad * pad, const GstCaps * caps);
+static GstPadLinkReturn gst_queue_link_src (GstPad * pad, const GstCaps * caps);
 static void gst_queue_locked_flush (GstQueue * queue);
 
 static GstElementStateReturn gst_queue_change_state (GstElement * element);
@@ -288,7 +308,7 @@ gst_queue_init (GstQueue * queue)
       GST_DEBUG_FUNCPTR (gst_queue_chain));
   gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad);
   gst_pad_set_link_function (queue->sinkpad,
-      GST_DEBUG_FUNCPTR (gst_queue_link));
+      GST_DEBUG_FUNCPTR (gst_queue_link_sink));
   gst_pad_set_getcaps_function (queue->sinkpad,
       GST_DEBUG_FUNCPTR (gst_queue_getcaps));
   gst_pad_set_active (queue->sinkpad, TRUE);
@@ -298,7 +318,8 @@ gst_queue_init (GstQueue * queue)
       "src");
   gst_pad_set_get_function (queue->srcpad, GST_DEBUG_FUNCPTR (gst_queue_get));
   gst_element_add_pad (GST_ELEMENT (queue), queue->srcpad);
-  gst_pad_set_link_function (queue->srcpad, GST_DEBUG_FUNCPTR (gst_queue_link));
+  gst_pad_set_link_function (queue->srcpad,
+      GST_DEBUG_FUNCPTR (gst_queue_link_src));
   gst_pad_set_getcaps_function (queue->srcpad,
       GST_DEBUG_FUNCPTR (gst_queue_getcaps));
   gst_pad_set_event_function (queue->srcpad,
@@ -374,7 +395,7 @@ gst_queue_getcaps (GstPad * pad)
 
   queue = GST_QUEUE (gst_pad_get_parent (pad));
 
-  if (queue->cur_level.bytes > 0) {
+  if (pad == queue->srcpad && queue->cur_level.bytes > 0) {
     return gst_caps_copy (queue->negotiated_caps);
   }
 
@@ -382,7 +403,50 @@ gst_queue_getcaps (GstPad * pad)
 }
 
 static GstPadLinkReturn
-gst_queue_link (GstPad * pad, const GstCaps * caps)
+gst_queue_link_sink (GstPad * pad, const GstCaps * caps)
+{
+  GstQueue *queue;
+  GstPadLinkReturn link_ret;
+
+  queue = GST_QUEUE (gst_pad_get_parent (pad));
+
+  if (queue->cur_level.bytes > 0) {
+    if (gst_caps_is_equal (caps, queue->negotiated_caps)) {
+      return GST_PAD_LINK_OK;
+    } else if (GST_STATE (queue) != GST_STATE_PLAYING) {
+      return GST_PAD_LINK_DELAYED;
+    }
+
+    /* Wait until the queue is empty before attempting the pad
+       negotiation. */
+    GST_QUEUE_MUTEX_LOCK;
+
+    STATUS (queue, "waiting for queue to get empty");
+    while (queue->cur_level.bytes > 0) {
+      g_cond_wait (queue->item_del, queue->qlock);
+      if (queue->interrupt) {
+        GST_QUEUE_MUTEX_UNLOCK;
+        return GST_PAD_LINK_DELAYED;
+      }
+    }
+    STATUS (queue, "queue is now empty");
+
+    GST_QUEUE_MUTEX_UNLOCK;
+  }
+
+  link_ret = gst_pad_proxy_pad_link (pad, caps);
+
+  if (GST_PAD_LINK_SUCCESSFUL (link_ret)) {
+    /* we store an extra copy of the negotiated caps, just in case
+     * the pads become unnegotiated while we have buffers */
+    gst_caps_replace (&queue->negotiated_caps, gst_caps_copy (caps));
+  }
+
+  return link_ret;
+}
+
+static GstPadLinkReturn
+gst_queue_link_src (GstPad * pad, const GstCaps * caps)
 {
   GstQueue *queue;
   GstPadLinkReturn link_ret;
@@ -465,23 +529,6 @@ gst_queue_handle_pending_events (GstQueue * queue)
   g_mutex_unlock (queue->event_lock);
 }
 
-#define STATUS(queue, msg) \
-  GST_CAT_LOG_OBJECT (queue_dataflow, queue, \
-                     "(%s:%s) " msg ": %u of %u-%u buffers, %u of %u-%u " \
-                     "bytes, %" G_GUINT64_FORMAT " of %" G_GUINT64_FORMAT \
-                     "-%" G_GUINT64_FORMAT " ns, %u elements", \
-                     GST_DEBUG_PAD_NAME (pad), \
-                     queue->cur_level.buffers, \
-                     queue->min_threshold.buffers, \
-                     queue->max_size.buffers, \
-                     queue->cur_level.bytes, \
-                     queue->min_threshold.bytes, \
-                     queue->max_size.bytes, \
-                     queue->cur_level.time, \
-                     queue->min_threshold.time, \
-                     queue->max_size.time, \
-                     queue->queue->length)
-
 static void
 gst_queue_chain (GstPad * pad, GstData * data)
 {
@@ -961,7 +1008,8 @@ gst_queue_change_state (GstElement * element)
 
   queue = GST_QUEUE (element);
 
-  GST_CAT_LOG_OBJECT (GST_CAT_STATES, element, "starting state change");
+  GST_CAT_LOG_OBJECT (GST_CAT_STATES, element,
+      "starting state change 0x%x", GST_STATE_TRANSITION (element));
 
   /* lock the queue so another thread (not in sync with this thread's state)
    * can't call this queue's _get (or whatever)
@@ -1009,6 +1057,8 @@ gst_queue_change_state (GstElement * element)
       break;
   }
 
+  GST_QUEUE_MUTEX_UNLOCK;
+
   if (GST_ELEMENT_CLASS (parent_class)->change_state)
     ret = GST_ELEMENT_CLASS (parent_class)->change_state (element);
 
@@ -1018,6 +1068,10 @@ gst_queue_change_state (GstElement * element)
   gst_pad_set_active (queue->sinkpad, TRUE);
   gst_pad_set_active (queue->srcpad, TRUE);
 
+  GST_CAT_LOG_OBJECT (GST_CAT_STATES, element, "done with state change");
+
+  return ret;
+
 unlock:
   GST_QUEUE_MUTEX_UNLOCK;
 
index 17ee2cc..11662bd 100644 (file)
@@ -41,6 +41,24 @@ static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
     GST_STATIC_CAPS_ANY);
 
 GST_DEBUG_CATEGORY_STATIC (queue_dataflow);
+#define GST_CAT_DEFAULT (queue_dataflow)
+
+#define STATUS(queue, msg) \
+  GST_CAT_LOG_OBJECT (queue_dataflow, queue, \
+                     "(%s:%s) " msg ": %u of %u-%u buffers, %u of %u-%u " \
+                     "bytes, %" G_GUINT64_FORMAT " of %" G_GUINT64_FORMAT \
+                     "-%" G_GUINT64_FORMAT " ns, %u elements", \
+                     GST_DEBUG_PAD_NAME (pad), \
+                     queue->cur_level.buffers, \
+                     queue->min_threshold.buffers, \
+                     queue->max_size.buffers, \
+                     queue->cur_level.bytes, \
+                     queue->min_threshold.bytes, \
+                     queue->max_size.bytes, \
+                     queue->cur_level.time, \
+                     queue->min_threshold.time, \
+                     queue->max_size.time, \
+                     queue->queue->length)
 
 static GstElementDetails gst_queue_details = GST_ELEMENT_DETAILS ("Queue",
     "Generic",
@@ -120,7 +138,9 @@ static gboolean gst_queue_handle_src_query (GstPad * pad,
     GstQueryType type, GstFormat * fmt, gint64 * value);
 
 static GstCaps *gst_queue_getcaps (GstPad * pad);
-static GstPadLinkReturn gst_queue_link (GstPad * pad, const GstCaps * caps);
+static GstPadLinkReturn
+gst_queue_link_sink (GstPad * pad, const GstCaps * caps);
+static GstPadLinkReturn gst_queue_link_src (GstPad * pad, const GstCaps * caps);
 static void gst_queue_locked_flush (GstQueue * queue);
 
 static GstElementStateReturn gst_queue_change_state (GstElement * element);
@@ -288,7 +308,7 @@ gst_queue_init (GstQueue * queue)
       GST_DEBUG_FUNCPTR (gst_queue_chain));
   gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad);
   gst_pad_set_link_function (queue->sinkpad,
-      GST_DEBUG_FUNCPTR (gst_queue_link));
+      GST_DEBUG_FUNCPTR (gst_queue_link_sink));
   gst_pad_set_getcaps_function (queue->sinkpad,
       GST_DEBUG_FUNCPTR (gst_queue_getcaps));
   gst_pad_set_active (queue->sinkpad, TRUE);
@@ -298,7 +318,8 @@ gst_queue_init (GstQueue * queue)
       "src");
   gst_pad_set_get_function (queue->srcpad, GST_DEBUG_FUNCPTR (gst_queue_get));
   gst_element_add_pad (GST_ELEMENT (queue), queue->srcpad);
-  gst_pad_set_link_function (queue->srcpad, GST_DEBUG_FUNCPTR (gst_queue_link));
+  gst_pad_set_link_function (queue->srcpad,
+      GST_DEBUG_FUNCPTR (gst_queue_link_src));
   gst_pad_set_getcaps_function (queue->srcpad,
       GST_DEBUG_FUNCPTR (gst_queue_getcaps));
   gst_pad_set_event_function (queue->srcpad,
@@ -374,7 +395,7 @@ gst_queue_getcaps (GstPad * pad)
 
   queue = GST_QUEUE (gst_pad_get_parent (pad));
 
-  if (queue->cur_level.bytes > 0) {
+  if (pad == queue->srcpad && queue->cur_level.bytes > 0) {
     return gst_caps_copy (queue->negotiated_caps);
   }
 
@@ -382,7 +403,50 @@ gst_queue_getcaps (GstPad * pad)
 }
 
 static GstPadLinkReturn
-gst_queue_link (GstPad * pad, const GstCaps * caps)
+gst_queue_link_sink (GstPad * pad, const GstCaps * caps)
+{
+  GstQueue *queue;
+  GstPadLinkReturn link_ret;
+
+  queue = GST_QUEUE (gst_pad_get_parent (pad));
+
+  if (queue->cur_level.bytes > 0) {
+    if (gst_caps_is_equal (caps, queue->negotiated_caps)) {
+      return GST_PAD_LINK_OK;
+    } else if (GST_STATE (queue) != GST_STATE_PLAYING) {
+      return GST_PAD_LINK_DELAYED;
+    }
+
+    /* Wait until the queue is empty before attempting the pad
+       negotiation. */
+    GST_QUEUE_MUTEX_LOCK;
+
+    STATUS (queue, "waiting for queue to get empty");
+    while (queue->cur_level.bytes > 0) {
+      g_cond_wait (queue->item_del, queue->qlock);
+      if (queue->interrupt) {
+        GST_QUEUE_MUTEX_UNLOCK;
+        return GST_PAD_LINK_DELAYED;
+      }
+    }
+    STATUS (queue, "queue is now empty");
+
+    GST_QUEUE_MUTEX_UNLOCK;
+  }
+
+  link_ret = gst_pad_proxy_pad_link (pad, caps);
+
+  if (GST_PAD_LINK_SUCCESSFUL (link_ret)) {
+    /* we store an extra copy of the negotiated caps, just in case
+     * the pads become unnegotiated while we have buffers */
+    gst_caps_replace (&queue->negotiated_caps, gst_caps_copy (caps));
+  }
+
+  return link_ret;
+}
+
+static GstPadLinkReturn
+gst_queue_link_src (GstPad * pad, const GstCaps * caps)
 {
   GstQueue *queue;
   GstPadLinkReturn link_ret;
@@ -465,23 +529,6 @@ gst_queue_handle_pending_events (GstQueue * queue)
   g_mutex_unlock (queue->event_lock);
 }
 
-#define STATUS(queue, msg) \
-  GST_CAT_LOG_OBJECT (queue_dataflow, queue, \
-                     "(%s:%s) " msg ": %u of %u-%u buffers, %u of %u-%u " \
-                     "bytes, %" G_GUINT64_FORMAT " of %" G_GUINT64_FORMAT \
-                     "-%" G_GUINT64_FORMAT " ns, %u elements", \
-                     GST_DEBUG_PAD_NAME (pad), \
-                     queue->cur_level.buffers, \
-                     queue->min_threshold.buffers, \
-                     queue->max_size.buffers, \
-                     queue->cur_level.bytes, \
-                     queue->min_threshold.bytes, \
-                     queue->max_size.bytes, \
-                     queue->cur_level.time, \
-                     queue->min_threshold.time, \
-                     queue->max_size.time, \
-                     queue->queue->length)
-
 static void
 gst_queue_chain (GstPad * pad, GstData * data)
 {
@@ -961,7 +1008,8 @@ gst_queue_change_state (GstElement * element)
 
   queue = GST_QUEUE (element);
 
-  GST_CAT_LOG_OBJECT (GST_CAT_STATES, element, "starting state change");
+  GST_CAT_LOG_OBJECT (GST_CAT_STATES, element,
+      "starting state change 0x%x", GST_STATE_TRANSITION (element));
 
   /* lock the queue so another thread (not in sync with this thread's state)
    * can't call this queue's _get (or whatever)
@@ -1009,6 +1057,8 @@ gst_queue_change_state (GstElement * element)
       break;
   }
 
+  GST_QUEUE_MUTEX_UNLOCK;
+
   if (GST_ELEMENT_CLASS (parent_class)->change_state)
     ret = GST_ELEMENT_CLASS (parent_class)->change_state (element);
 
@@ -1018,6 +1068,10 @@ gst_queue_change_state (GstElement * element)
   gst_pad_set_active (queue->sinkpad, TRUE);
   gst_pad_set_active (queue->srcpad, TRUE);
 
+  GST_CAT_LOG_OBJECT (GST_CAT_STATES, element, "done with state change");
+
+  return ret;
+
 unlock:
   GST_QUEUE_MUTEX_UNLOCK;