examples/: Update a couple of the examples to work again.
authorJan Schmidt <thaytan@mad.scientist.com>
Wed, 6 Jul 2005 16:22:47 +0000 (16:22 +0000)
committerJan Schmidt <thaytan@mad.scientist.com>
Wed, 6 Jul 2005 16:22:47 +0000 (16:22 +0000)
Original commit message from CVS:
* examples/Makefile.am:
* examples/helloworld/helloworld.c: (event_loop), (main):
* examples/queue/queue.c: (event_loop), (main):
* examples/queue2/queue2.c: (main):
Update a couple of the examples to work again.

* gst/base/gstbasesink.c: (gst_base_sink_preroll_queue_empty),
(gst_base_sink_preroll_queue_flush), (gst_base_sink_handle_event):
Spelling corrections and extra debug.

* gst/gstbin.c: (gst_bin_class_init), (gst_bin_init), (is_eos),
(gst_bin_add_func), (bin_element_is_sink), (gst_bin_get_state),
(gst_bin_change_state), (gst_bin_dispose), (bin_bus_handler):
* gst/gstbin.h:
* gst/gstpipeline.c: (gst_pipeline_init), (gst_pipeline_dispose),
(gst_pipeline_change_state):
* gst/gstpipeline.h:
Move the bus handler for children to the GstBin, and create a
separate bus for receiving messages from children to the one the
bus sends 'upwards' on.

14 files changed:
examples/Makefile.am
examples/helloworld/helloworld.c
examples/queue/queue.c
examples/queue2/queue2.c
gst/base/gstbasesink.c
gst/gstbin.c
gst/gstbin.h
gst/gstpipeline.c
gst/gstpipeline.h
libs/gst/base/gstbasesink.c
tests/old/examples/Makefile.am
tests/old/examples/helloworld/helloworld.c
tests/old/examples/queue/queue.c
tests/old/examples/queue2/queue2.c

index 3646ccf2aae6f2c0bd20740581b13e966e4bce54..1d1d24723a09052c518ec819bcc3ea0903a29a64 100644 (file)
@@ -7,9 +7,6 @@ endif
 dirs = \
        helloworld                      \
        queue                           \
-       queue2                          \
-       queue3                          \
-       queue4                          \
        launch                          \
        thread                          \
        plugins                         \
@@ -20,6 +17,10 @@ dirs = \
        pwg                             \
        retag
 
+#queue2                                \
+#queue3                                \
+#queue4                                
+
 SUBDIRS = $(dirs)                      \
        $(GST_LOADSAVE_DIRS)
 
index 9cf72e842cf0026da1a438abda87279803ecc650..d641f54b3b89bc2d48e506ae1306f8c29ab2b5d9 100644 (file)
@@ -1,10 +1,48 @@
 #include <stdlib.h>
 #include <gst/gst.h>
 
+static void
+event_loop (GstElement * pipe)
+{
+  GstBus *bus;
+  GstMessageType revent;
+  GstMessage *message = NULL;
+
+  bus = gst_element_get_bus (GST_ELEMENT (pipe));
+
+  while (TRUE) {
+    revent = gst_bus_poll (bus, GST_MESSAGE_ANY, -1);
+
+    message = gst_bus_pop (bus);
+    g_assert (message != NULL);
+
+    switch (revent) {
+      case GST_MESSAGE_EOS:
+        gst_message_unref (message);
+        return;
+      case GST_MESSAGE_WARNING:
+      case GST_MESSAGE_ERROR:{
+        GError *gerror;
+        gchar *debug;
+
+        gst_message_parse_error (message, &gerror, &debug);
+        gst_object_default_error (GST_MESSAGE_SRC (message), gerror, debug);
+        gst_message_unref (message);
+        g_error_free (gerror);
+        g_free (debug);
+        return;
+      }
+      default:
+        gst_message_unref (message);
+        break;
+    }
+  }
+}
+
 int
 main (int argc, char *argv[])
 {
-  GstElement *bin, *filesrc, *decoder, *osssink;
+  GstElement *bin, *filesrc, *decoder, *audiosink;
 
   gst_init (&argc, &argv);
 
@@ -29,19 +67,20 @@ main (int argc, char *argv[])
     return -1;
   }
   /* and an audio sink */
-  osssink = gst_element_factory_make ("osssink", "play_audio");
-  g_assert (osssink);
+  audiosink = gst_element_factory_make ("alsasink", "play_audio");
+  g_assert (audiosink);
 
   /* add objects to the main pipeline */
-  gst_bin_add_many (GST_BIN (bin), filesrc, decoder, osssink, NULL);
+  gst_bin_add_many (GST_BIN (bin), filesrc, decoder, audiosink, NULL);
 
   /* link the elements */
-  gst_element_link_many (filesrc, decoder, osssink, NULL);
+  gst_element_link_many (filesrc, decoder, audiosink, NULL);
 
   /* start playing */
   gst_element_set_state (bin, GST_STATE_PLAYING);
 
-  while (gst_bin_iterate (GST_BIN (bin)));
+  /* Run event loop listening for bus messages until EOS or ERROR */
+  event_loop (bin);
 
   /* stop the bin */
   gst_element_set_state (bin, GST_STATE_NULL);
index 67b8a2321fa71503f7c3d3e28d7e8345c87d2b6e..6dad8701c3cdc7e0d4a640db2277a701445e7426 100644 (file)
@@ -1,12 +1,57 @@
 #include <stdlib.h>
 #include <gst/gst.h>
 
+/* This example uses the queue element to create a buffer between 2 elements.
+ * The scheduler automatically uses 2 threads, 1 to feed and another to consume
+ * data from the queue buffer
+ */
+
+/* Event loop to listen to events posted on the GstBus from the pipeline. Exits
+ * on EOS or ERROR events
+ */
+static void
+event_loop (GstElement * pipe)
+{
+  GstBus *bus;
+  GstMessageType revent;
+  GstMessage *message = NULL;
+
+  bus = gst_element_get_bus (GST_ELEMENT (pipe));
+
+  while (TRUE) {
+    revent = gst_bus_poll (bus, GST_MESSAGE_ANY, -1);
+
+    message = gst_bus_pop (bus);
+    g_assert (message != NULL);
+
+    switch (revent) {
+      case GST_MESSAGE_EOS:
+        gst_message_unref (message);
+        return;
+      case GST_MESSAGE_WARNING:
+      case GST_MESSAGE_ERROR:{
+        GError *gerror;
+        gchar *debug;
+
+        gst_message_parse_error (message, &gerror, &debug);
+        gst_object_default_error (GST_MESSAGE_SRC (message), gerror, debug);
+        gst_message_unref (message);
+        g_error_free (gerror);
+        g_free (debug);
+        return;
+      }
+      default:
+        gst_message_unref (message);
+        break;
+    }
+  }
+}
+
 int
 main (int argc, char *argv[])
 {
-  GstElement *filesrc, *osssink, *parse, *decode, *queue;
+  GstElement *filesrc, *audiosink, *decode, *queue;
   GstElement *pipeline;
-  GstElement *thread;
 
   gst_init (&argc, &argv);
 
@@ -15,10 +60,6 @@ main (int argc, char *argv[])
     exit (-1);
   }
 
-  /* create a new thread to hold the elements */
-  thread = gst_thread_new ("thread");
-  g_assert (thread != NULL);
-
   /* create a new pipeline to hold the elements */
   pipeline = gst_pipeline_new ("pipeline");
   g_assert (pipeline != NULL);
@@ -28,27 +69,27 @@ main (int argc, char *argv[])
   g_assert (filesrc != NULL);
   g_object_set (G_OBJECT (filesrc), "location", argv[1], NULL);
 
-  parse = gst_element_factory_make ("mp3parse", "parse");
   decode = gst_element_factory_make ("mad", "decode");
+  g_assert (decode != NULL);
 
   queue = gst_element_factory_make ("queue", "queue");
+  g_assert (queue != NULL);
 
   /* and an audio sink */
-  osssink = gst_element_factory_make ("osssink", "play_audio");
-  g_assert (osssink != NULL);
+  audiosink = gst_element_factory_make ("alsasink", "play_audio");
+  g_assert (audiosink != NULL);
 
   /* add objects to the main pipeline */
-  gst_bin_add_many (GST_BIN (pipeline), filesrc, parse, decode, queue, NULL);
-
-  gst_bin_add (GST_BIN (thread), osssink);
-  gst_bin_add (GST_BIN (pipeline), thread);
+  gst_bin_add_many (GST_BIN (pipeline), filesrc, decode, queue, audiosink,
+      NULL);
 
-  gst_element_link_many (filesrc, parse, decode, queue, osssink, NULL);
+  gst_element_link_many (filesrc, decode, queue, audiosink, NULL);
 
   /* start playing */
   gst_element_set_state (GST_ELEMENT (pipeline), GST_STATE_PLAYING);
 
-  while (gst_bin_iterate (GST_BIN (pipeline)));
+  /* Listen for EOS */
+  event_loop (pipeline);
 
   gst_element_set_state (GST_ELEMENT (pipeline), GST_STATE_NULL);
 
index 7d5862600bc0717826531eaa74e8a04a9bb30591..49b67ff459fc300ae671517689151090d1036208 100644 (file)
@@ -15,9 +15,8 @@ eos (GstElement * element, gpointer data)
 int
 main (int argc, char *argv[])
 {
-  GstElement *filesrc, *osssink, *queue;
+  GstElement *filesrc, *audiosink, *queue;
   GstElement *pipeline;
-  GstElement *thread;
 
   gst_init (&argc, &argv);
 
@@ -26,10 +25,6 @@ main (int argc, char *argv[])
     exit (-1);
   }
 
-  /* create a new thread to hold the elements */
-  thread = gst_thread_new ("thread");
-  g_assert (thread != NULL);
-
   /* create a new bin to hold the elements */
   pipeline = gst_pipeline_new ("pipeline");
   g_assert (pipeline != NULL);
@@ -43,18 +38,18 @@ main (int argc, char *argv[])
   queue = gst_element_factory_make ("queue", "queue");
 
   /* and an audio sink */
-  osssink = gst_element_factory_make ("osssink", "play_audio");
-  g_assert (osssink != NULL);
+  audiosink = gst_element_factory_make ("alsasink", "play_audio");
+  g_assert (audiosink != NULL);
 
   /* add objects to the main pipeline */
   /*
      gst_pipeline_add_src(GST_PIPELINE(pipeline), filesrc);
      gst_pipeline_add_sink(GST_PIPELINE(pipeline), queue);
 
-     gst_bin_add(GST_BIN(thread), osssink);
+     gst_bin_add(GST_BIN (pipeline), audiosink);
 
      gst_pad_link(gst_element_get_pad(queue,"src"),
-     gst_element_get_pad(osssink,"sink"));
+     gst_element_get_pad(audiosink,"sink"));
 
      if (!gst_pipeline_autoplug(GST_PIPELINE(pipeline))) {
      g_print("cannot autoplug pipeline\n");
index 78befff6f6d1156ae748272ad4030a255033ebcb..046bf99e48be4e3d2db459ff89a6d42a998d2640 100644 (file)
@@ -402,10 +402,10 @@ gst_base_sink_preroll_queue_empty (GstBaseSink * basesink, GstPad * pad)
       GST_PREROLL_UNLOCK (pad);
 
       if (is_buffer) {
-        GST_DEBUG ("poped buffer %p", obj);
+        GST_DEBUG ("popped buffer %p", obj);
         ret = gst_base_sink_handle_buffer (basesink, GST_BUFFER (obj));
       } else {
-        GST_DEBUG ("poped event %p", obj);
+        GST_DEBUG ("popped event %p", obj);
         gst_base_sink_handle_event (basesink, GST_EVENT (obj));
         ret = GST_FLOW_OK;
       }
@@ -427,7 +427,7 @@ gst_base_sink_preroll_queue_flush (GstBaseSink * basesink, GstPad * pad)
   GST_DEBUG ("flushing queue %p", basesink);
   if (q) {
     while ((obj = g_queue_pop_head (q))) {
-      GST_DEBUG ("poped %p", obj);
+      GST_DEBUG ("popped %p", obj);
       gst_mini_object_unref (obj);
     }
   }
@@ -814,6 +814,7 @@ gst_base_sink_handle_event (GstBaseSink * basesink, GstEvent * event)
       /* if we are still EOS, we can post the EOS message */
       if (basesink->eos) {
         /* ok, now we can post the message */
+        GST_DEBUG_OBJECT (basesink, "Now posting EOS");
         gst_element_post_message (GST_ELEMENT (basesink),
             gst_message_new_eos (GST_OBJECT (basesink)));
       }
index 99cb8dc4a49c7163c15b3f835d78f085ff66a440..fbd6b08e9906361cb8ccada4a799c8e6ac90df9f 100644 (file)
@@ -68,10 +68,11 @@ static GstClock *gst_bin_get_clock_func (GstElement * element);
 static void gst_bin_set_clock_func (GstElement * element, GstClock * clock);
 
 static void gst_bin_set_manager (GstElement * element, GstPipeline * manager);
-static void gst_bin_set_bus (GstElement * element, GstBus * bus);
 static void gst_bin_set_scheduler (GstElement * element, GstScheduler * sched);
 
 static gboolean gst_bin_send_event (GstElement * element, GstEvent * event);
+static GstBusSyncReply bin_bus_handler (GstBus * bus,
+    GstMessage * message, GstBin * bin);
 static gboolean gst_bin_query (GstElement * element, GstQuery * query);
 
 #ifndef GST_DISABLE_LOADSAVE
@@ -179,7 +180,6 @@ gst_bin_class_init (GstBinClass * klass)
   gstelement_class->get_clock = GST_DEBUG_FUNCPTR (gst_bin_get_clock_func);
   gstelement_class->set_clock = GST_DEBUG_FUNCPTR (gst_bin_set_clock_func);
   gstelement_class->set_manager = GST_DEBUG_FUNCPTR (gst_bin_set_manager);
-  gstelement_class->set_bus = GST_DEBUG_FUNCPTR (gst_bin_set_bus);
   gstelement_class->set_scheduler = GST_DEBUG_FUNCPTR (gst_bin_set_scheduler);
 
   gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_bin_send_event);
@@ -192,9 +192,23 @@ gst_bin_class_init (GstBinClass * klass)
 static void
 gst_bin_init (GstBin * bin)
 {
+  GstBus *bus;
+
   bin->numchildren = 0;
   bin->children = NULL;
   bin->children_cookie = 0;
+  bin->eosed = NULL;
+
+  /* Set up a bus for listening to child elements, 
+   * and one for sending messages up the hierarchy */
+  bus = g_object_new (gst_bus_get_type (), NULL);
+  bin->child_bus = bus;
+  gst_bus_set_sync_handler (bus, (GstBusSyncHandler) bin_bus_handler, bin);
+
+  bus = g_object_new (gst_bus_get_type (), NULL);
+  gst_element_set_bus (GST_ELEMENT (bin), bus);
+  /* set_bus refs the bus via gst_object_replace, we drop our ref */
+  gst_object_unref (bus);
 }
 
 /**
@@ -281,29 +295,6 @@ gst_bin_get_clock_func (GstElement * element)
   return result;
 }
 
-/* set the bus on all of the children in this bin
- *
- * MT safe
- */
-static void
-gst_bin_set_bus (GstElement * element, GstBus * bus)
-{
-  GList *children;
-  GstBin *bin;
-
-  bin = GST_BIN (element);
-
-  parent_class->set_bus (element, bus);
-
-  GST_LOCK (bin);
-  for (children = bin->children; children; children = g_list_next (children)) {
-    GstElement *child = GST_ELEMENT (children->data);
-
-    gst_element_set_bus (child, bus);
-  }
-  GST_UNLOCK (bin);
-}
-
 /* set the scheduler on all of the children in this bin
  *
  * MT safe
@@ -348,6 +339,53 @@ gst_bin_set_manager (GstElement * element, GstPipeline * manager)
   GST_UNLOCK (element);
 }
 
+static gboolean
+is_eos (GstBin * bin)
+{
+  GstIterator *sinks;
+  gboolean result = TRUE;
+  gboolean done = FALSE;
+
+  sinks = gst_bin_iterate_sinks (bin);
+  while (!done) {
+    gpointer data;
+
+    switch (gst_iterator_next (sinks, &data)) {
+      case GST_ITERATOR_OK:
+      {
+        GstElement *element = GST_ELEMENT (data);
+        GList *eosed;
+        gchar *name;
+
+        name = gst_element_get_name (element);
+        eosed = g_list_find (bin->eosed, element);
+        if (!eosed) {
+          GST_DEBUG ("element %s did not post EOS yet", name);
+          result = FALSE;
+          done = TRUE;
+        } else {
+          GST_DEBUG ("element %s posted EOS", name);
+        }
+        g_free (name);
+        gst_object_unref (element);
+        break;
+      }
+      case GST_ITERATOR_RESYNC:
+        result = TRUE;
+        gst_iterator_resync (sinks);
+        break;
+      case GST_ITERATOR_DONE:
+        done = TRUE;
+        break;
+      default:
+        g_assert_not_reached ();
+        break;
+    }
+  }
+  gst_iterator_free (sinks);
+  return result;
+}
+
 /* add an element to this bin
  *
  * MT safe
@@ -389,7 +427,7 @@ gst_bin_add_func (GstBin * bin, GstElement * element)
   bin->children_cookie++;
 
   gst_element_set_manager (element, GST_ELEMENT (bin)->manager);
-  gst_element_set_bus (element, GST_ELEMENT (bin)->bus);
+  gst_element_set_bus (element, bin->child_bus);
   gst_element_set_scheduler (element, GST_ELEMENT_SCHEDULER (bin));
   gst_element_set_clock (element, GST_ELEMENT_CLOCK (bin));
 
@@ -697,11 +735,11 @@ bin_element_is_sink (GstElement * child, GstBin * bin)
    * get its name safely. */
   GST_LOCK (child);
   is_sink = GST_FLAG_IS_SET (child, GST_ELEMENT_IS_SINK);
-  GST_UNLOCK (child);
 
   GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, bin,
       "child %s %s sink", GST_OBJECT_NAME (child), is_sink ? "is" : "is not");
 
+  GST_UNLOCK (child);
   return is_sink ? 0 : 1;
 }
 
@@ -1112,6 +1150,12 @@ gst_bin_change_state (GstElement * element)
   if (pending == GST_STATE_VOID_PENDING)
     return GST_STATE_SUCCESS;
 
+  /* Clear eosed element list on READY-> PAUSED */
+  if (GST_STATE_TRANSITION (element) == GST_STATE_READY_TO_PAUSED) {
+    g_list_free (bin->eosed);
+    bin->eosed = NULL;
+  }
+
   /* all elements added to these queues should have their refcount
    * incremented */
   elem_queue = g_queue_new ();
@@ -1335,6 +1379,10 @@ gst_bin_dispose (GObject * object)
   /* ref to not hit 0 again */
   gst_object_ref (object);
 
+  g_list_free (bin->eosed);
+  gst_object_unref (bin->child_bus);
+  gst_element_set_bus (GST_ELEMENT (bin), NULL);
+
   while (bin->children) {
     gst_bin_remove (bin, GST_ELEMENT (bin->children->data));
   }
@@ -1393,6 +1441,39 @@ gst_bin_send_event (GstElement * element, GstEvent * event)
   return res;
 }
 
+/* FIXME, make me threadsafe */
+static GstBusSyncReply
+bin_bus_handler (GstBus * bus, GstMessage * message, GstBin * bin)
+{
+  /* we don't want messages from the streaming thread while we're doing the
+   * state change. We do want them from the state change functions. */
+  switch (GST_MESSAGE_TYPE (message)) {
+    case GST_MESSAGE_EOS:
+      GST_DEBUG_OBJECT (bin, "got EOS message from %s",
+          gst_object_get_name (GST_MESSAGE_SRC (message)));
+
+      GST_LOCK (bin->child_bus);
+      bin->eosed = g_list_prepend (bin->eosed, GST_MESSAGE_SRC (message));
+      GST_UNLOCK (bin->child_bus);
+
+      if (is_eos (bin)) {
+        GST_DEBUG_OBJECT (bin, "all sinks posted EOS");
+        gst_bus_post (GST_ELEMENT (bin)->bus,
+            gst_message_new_eos (GST_OBJECT (bin)));
+      }
+
+      /* we drop all EOS messages */
+      gst_message_unref (message);
+      break;
+    default:
+      /* Send all other messages upward */
+      gst_bus_post (GST_ELEMENT (bin)->bus, message);
+      break;
+  }
+
+  return GST_BUS_DROP;
+}
+
 static gboolean
 gst_bin_query (GstElement * element, GstQuery * query)
 {
index c96ec0103607bc77ab63fd2afd438fbb4065be18..3006eee084246c46d48d6766c732e9e4cd5c4eac 100644 (file)
@@ -26,6 +26,7 @@
 
 #include <gst/gstelement.h>
 #include <gst/gstiterator.h>
+#include <gst/gstbus.h>
 
 G_BEGIN_DECLS
 
@@ -70,6 +71,9 @@ struct _GstBin {
   GList                *children;
   guint32       children_cookie;
 
+  GstBus        *child_bus;    /* Bus we set on our children */
+  GList         *eosed;         /* list of elements that posted EOS */
+
   /*< private >*/
   gpointer _gst_reserved[GST_PADDING];
 };
index 42622e28fd4e214de5f15bd518c904eeddbf3693..9aaacaf61cf3280c5c981f2652e7db7810e024e2 100644 (file)
@@ -63,8 +63,6 @@ static void gst_pipeline_get_property (GObject * object, guint prop_id,
 
 static gboolean gst_pipeline_send_event (GstElement * element,
     GstEvent * event);
-static GstBusSyncReply pipeline_bus_handler (GstBus * bus, GstMessage * message,
-    GstPipeline * pipeline);
 
 static GstClock *gst_pipeline_get_clock_func (GstElement * element);
 static GstElementStateReturn gst_pipeline_change_state (GstElement * element);
@@ -141,7 +139,6 @@ gst_pipeline_init (GTypeInstance * instance, gpointer g_class)
 {
   GstScheduler *scheduler;
   GstPipeline *pipeline = GST_PIPELINE (instance);
-  GstBus *bus;
 
   /* get an instance of the default scheduler */
   scheduler = gst_scheduler_factory_make (NULL, GST_ELEMENT (pipeline));
@@ -159,18 +156,10 @@ gst_pipeline_init (GTypeInstance * instance, gpointer g_class)
     gst_object_unref ((GstObject *) scheduler);
   }
 
-  pipeline->eosed = NULL;
   pipeline->delay = DEFAULT_DELAY;
   pipeline->play_timeout = DEFAULT_PLAY_TIMEOUT;
   /* we are our own manager */
   GST_ELEMENT_MANAGER (pipeline) = pipeline;
-
-  bus = g_object_new (gst_bus_get_type (), NULL);
-  gst_bus_set_sync_handler (bus,
-      (GstBusSyncHandler) pipeline_bus_handler, pipeline);
-  gst_element_set_bus (GST_ELEMENT (pipeline), bus);
-  /* set_bus refs the bus via gst_object_replace, we drop our ref */
-  gst_object_unref ((GstObject *) bus);
 }
 
 static void
@@ -178,7 +167,6 @@ gst_pipeline_dispose (GObject * object)
 {
   GstPipeline *pipeline = GST_PIPELINE (object);
 
-  gst_element_set_bus (GST_ELEMENT (pipeline), NULL);
   gst_scheduler_reset (GST_ELEMENT_SCHEDULER (object));
   gst_element_set_scheduler (GST_ELEMENT (pipeline), NULL);
   gst_object_replace ((GstObject **) & pipeline->fixed_clock, NULL);
@@ -228,53 +216,6 @@ gst_pipeline_get_property (GObject * object, guint prop_id,
   GST_UNLOCK (pipeline);
 }
 
-static gboolean
-is_eos (GstPipeline * pipeline)
-{
-  GstIterator *sinks;
-  gboolean result = TRUE;
-  gboolean done = FALSE;
-
-  sinks = gst_bin_iterate_sinks (GST_BIN (pipeline));
-  while (!done) {
-    gpointer data;
-
-    switch (gst_iterator_next (sinks, &data)) {
-      case GST_ITERATOR_OK:
-      {
-        GstElement *element = GST_ELEMENT (data);
-        GList *eosed;
-        gchar *name;
-
-        name = gst_element_get_name (element);
-        eosed = g_list_find (pipeline->eosed, element);
-        if (!eosed) {
-          GST_DEBUG ("element %s did not post EOS yet", name);
-          result = FALSE;
-          done = TRUE;
-        } else {
-          GST_DEBUG ("element %s posted EOS", name);
-        }
-        g_free (name);
-        gst_object_unref (element);
-        break;
-      }
-      case GST_ITERATOR_RESYNC:
-        result = TRUE;
-        gst_iterator_resync (sinks);
-        break;
-      case GST_ITERATOR_DONE:
-        done = TRUE;
-        break;
-      default:
-        g_assert_not_reached ();
-        break;
-    }
-  }
-  gst_iterator_free (sinks);
-  return result;
-}
-
 /* sending an event on the pipeline pauses the pipeline if it
  * was playing.
  */
@@ -318,47 +259,6 @@ gst_pipeline_send_event (GstElement * element, GstEvent * event)
   return res;
 }
 
-/* FIXME, make me threadsafe */
-static GstBusSyncReply
-pipeline_bus_handler (GstBus * bus, GstMessage * message,
-    GstPipeline * pipeline)
-{
-  GstBusSyncReply result = GST_BUS_PASS;
-  gboolean posteos = FALSE;
-
-  /* we don't want messages from the streaming thread while we're doing the 
-   * state change. We do want them from the state change functions. */
-
-  switch (GST_MESSAGE_TYPE (message)) {
-    case GST_MESSAGE_EOS:
-      if (GST_MESSAGE_SRC (message) != GST_OBJECT (pipeline)) {
-        GST_DEBUG ("got EOS message");
-        GST_LOCK (bus);
-        pipeline->eosed =
-            g_list_prepend (pipeline->eosed, GST_MESSAGE_SRC (message));
-        GST_UNLOCK (bus);
-        if (is_eos (pipeline)) {
-          posteos = TRUE;
-          GST_DEBUG ("all sinks posted EOS");
-        }
-        /* we drop all EOS messages */
-        result = GST_BUS_DROP;
-        gst_message_unref (message);
-      }
-      break;
-    case GST_MESSAGE_ERROR:
-      break;
-    default:
-      break;
-  }
-
-  if (posteos) {
-    gst_bus_post (bus, gst_message_new_eos (GST_OBJECT (pipeline)));
-  }
-
-  return result;
-}
-
 /**
  * gst_pipeline_new:
  * @name: name of new pipeline
@@ -396,7 +296,6 @@ gst_pipeline_change_state (GstElement * element)
       clock = gst_element_get_clock (element);
       gst_element_set_clock (element, clock);
       gst_object_unref (clock);
-      pipeline->eosed = NULL;
       break;
     }
     case GST_STATE_PAUSED_TO_PLAYING:
index c726a8f5009f8d6e1eeb19ed7c811b6cde1bb09d..2f3cdae4276d1b7de1e3e3ed5488d7bcda28af6a 100644 (file)
@@ -26,7 +26,6 @@
 
 #include <gst/gsttypes.h>
 #include <gst/gstbin.h>
-#include <gst/gstbus.h>
 
 G_BEGIN_DECLS
 
@@ -54,8 +53,6 @@ struct _GstPipeline {
   GstClockTime   delay;
   GstClockTime   play_timeout;
 
-  GList                *eosed;         /* list of elements that posted EOS */
-
   /*< private >*/
   gpointer _gst_reserved[GST_PADDING];
 };
index 78befff6f6d1156ae748272ad4030a255033ebcb..046bf99e48be4e3d2db459ff89a6d42a998d2640 100644 (file)
@@ -402,10 +402,10 @@ gst_base_sink_preroll_queue_empty (GstBaseSink * basesink, GstPad * pad)
       GST_PREROLL_UNLOCK (pad);
 
       if (is_buffer) {
-        GST_DEBUG ("poped buffer %p", obj);
+        GST_DEBUG ("popped buffer %p", obj);
         ret = gst_base_sink_handle_buffer (basesink, GST_BUFFER (obj));
       } else {
-        GST_DEBUG ("poped event %p", obj);
+        GST_DEBUG ("popped event %p", obj);
         gst_base_sink_handle_event (basesink, GST_EVENT (obj));
         ret = GST_FLOW_OK;
       }
@@ -427,7 +427,7 @@ gst_base_sink_preroll_queue_flush (GstBaseSink * basesink, GstPad * pad)
   GST_DEBUG ("flushing queue %p", basesink);
   if (q) {
     while ((obj = g_queue_pop_head (q))) {
-      GST_DEBUG ("poped %p", obj);
+      GST_DEBUG ("popped %p", obj);
       gst_mini_object_unref (obj);
     }
   }
@@ -814,6 +814,7 @@ gst_base_sink_handle_event (GstBaseSink * basesink, GstEvent * event)
       /* if we are still EOS, we can post the EOS message */
       if (basesink->eos) {
         /* ok, now we can post the message */
+        GST_DEBUG_OBJECT (basesink, "Now posting EOS");
         gst_element_post_message (GST_ELEMENT (basesink),
             gst_message_new_eos (GST_OBJECT (basesink)));
       }
index 3646ccf2aae6f2c0bd20740581b13e966e4bce54..1d1d24723a09052c518ec819bcc3ea0903a29a64 100644 (file)
@@ -7,9 +7,6 @@ endif
 dirs = \
        helloworld                      \
        queue                           \
-       queue2                          \
-       queue3                          \
-       queue4                          \
        launch                          \
        thread                          \
        plugins                         \
@@ -20,6 +17,10 @@ dirs = \
        pwg                             \
        retag
 
+#queue2                                \
+#queue3                                \
+#queue4                                
+
 SUBDIRS = $(dirs)                      \
        $(GST_LOADSAVE_DIRS)
 
index 9cf72e842cf0026da1a438abda87279803ecc650..d641f54b3b89bc2d48e506ae1306f8c29ab2b5d9 100644 (file)
@@ -1,10 +1,48 @@
 #include <stdlib.h>
 #include <gst/gst.h>
 
+static void
+event_loop (GstElement * pipe)
+{
+  GstBus *bus;
+  GstMessageType revent;
+  GstMessage *message = NULL;
+
+  bus = gst_element_get_bus (GST_ELEMENT (pipe));
+
+  while (TRUE) {
+    revent = gst_bus_poll (bus, GST_MESSAGE_ANY, -1);
+
+    message = gst_bus_pop (bus);
+    g_assert (message != NULL);
+
+    switch (revent) {
+      case GST_MESSAGE_EOS:
+        gst_message_unref (message);
+        return;
+      case GST_MESSAGE_WARNING:
+      case GST_MESSAGE_ERROR:{
+        GError *gerror;
+        gchar *debug;
+
+        gst_message_parse_error (message, &gerror, &debug);
+        gst_object_default_error (GST_MESSAGE_SRC (message), gerror, debug);
+        gst_message_unref (message);
+        g_error_free (gerror);
+        g_free (debug);
+        return;
+      }
+      default:
+        gst_message_unref (message);
+        break;
+    }
+  }
+}
+
 int
 main (int argc, char *argv[])
 {
-  GstElement *bin, *filesrc, *decoder, *osssink;
+  GstElement *bin, *filesrc, *decoder, *audiosink;
 
   gst_init (&argc, &argv);
 
@@ -29,19 +67,20 @@ main (int argc, char *argv[])
     return -1;
   }
   /* and an audio sink */
-  osssink = gst_element_factory_make ("osssink", "play_audio");
-  g_assert (osssink);
+  audiosink = gst_element_factory_make ("alsasink", "play_audio");
+  g_assert (audiosink);
 
   /* add objects to the main pipeline */
-  gst_bin_add_many (GST_BIN (bin), filesrc, decoder, osssink, NULL);
+  gst_bin_add_many (GST_BIN (bin), filesrc, decoder, audiosink, NULL);
 
   /* link the elements */
-  gst_element_link_many (filesrc, decoder, osssink, NULL);
+  gst_element_link_many (filesrc, decoder, audiosink, NULL);
 
   /* start playing */
   gst_element_set_state (bin, GST_STATE_PLAYING);
 
-  while (gst_bin_iterate (GST_BIN (bin)));
+  /* Run event loop listening for bus messages until EOS or ERROR */
+  event_loop (bin);
 
   /* stop the bin */
   gst_element_set_state (bin, GST_STATE_NULL);
index 67b8a2321fa71503f7c3d3e28d7e8345c87d2b6e..6dad8701c3cdc7e0d4a640db2277a701445e7426 100644 (file)
@@ -1,12 +1,57 @@
 #include <stdlib.h>
 #include <gst/gst.h>
 
+/* This example uses the queue element to create a buffer between 2 elements.
+ * The scheduler automatically uses 2 threads, 1 to feed and another to consume
+ * data from the queue buffer
+ */
+
+/* Event loop to listen to events posted on the GstBus from the pipeline. Exits
+ * on EOS or ERROR events
+ */
+static void
+event_loop (GstElement * pipe)
+{
+  GstBus *bus;
+  GstMessageType revent;
+  GstMessage *message = NULL;
+
+  bus = gst_element_get_bus (GST_ELEMENT (pipe));
+
+  while (TRUE) {
+    revent = gst_bus_poll (bus, GST_MESSAGE_ANY, -1);
+
+    message = gst_bus_pop (bus);
+    g_assert (message != NULL);
+
+    switch (revent) {
+      case GST_MESSAGE_EOS:
+        gst_message_unref (message);
+        return;
+      case GST_MESSAGE_WARNING:
+      case GST_MESSAGE_ERROR:{
+        GError *gerror;
+        gchar *debug;
+
+        gst_message_parse_error (message, &gerror, &debug);
+        gst_object_default_error (GST_MESSAGE_SRC (message), gerror, debug);
+        gst_message_unref (message);
+        g_error_free (gerror);
+        g_free (debug);
+        return;
+      }
+      default:
+        gst_message_unref (message);
+        break;
+    }
+  }
+}
+
 int
 main (int argc, char *argv[])
 {
-  GstElement *filesrc, *osssink, *parse, *decode, *queue;
+  GstElement *filesrc, *audiosink, *decode, *queue;
   GstElement *pipeline;
-  GstElement *thread;
 
   gst_init (&argc, &argv);
 
@@ -15,10 +60,6 @@ main (int argc, char *argv[])
     exit (-1);
   }
 
-  /* create a new thread to hold the elements */
-  thread = gst_thread_new ("thread");
-  g_assert (thread != NULL);
-
   /* create a new pipeline to hold the elements */
   pipeline = gst_pipeline_new ("pipeline");
   g_assert (pipeline != NULL);
@@ -28,27 +69,27 @@ main (int argc, char *argv[])
   g_assert (filesrc != NULL);
   g_object_set (G_OBJECT (filesrc), "location", argv[1], NULL);
 
-  parse = gst_element_factory_make ("mp3parse", "parse");
   decode = gst_element_factory_make ("mad", "decode");
+  g_assert (decode != NULL);
 
   queue = gst_element_factory_make ("queue", "queue");
+  g_assert (queue != NULL);
 
   /* and an audio sink */
-  osssink = gst_element_factory_make ("osssink", "play_audio");
-  g_assert (osssink != NULL);
+  audiosink = gst_element_factory_make ("alsasink", "play_audio");
+  g_assert (audiosink != NULL);
 
   /* add objects to the main pipeline */
-  gst_bin_add_many (GST_BIN (pipeline), filesrc, parse, decode, queue, NULL);
-
-  gst_bin_add (GST_BIN (thread), osssink);
-  gst_bin_add (GST_BIN (pipeline), thread);
+  gst_bin_add_many (GST_BIN (pipeline), filesrc, decode, queue, audiosink,
+      NULL);
 
-  gst_element_link_many (filesrc, parse, decode, queue, osssink, NULL);
+  gst_element_link_many (filesrc, decode, queue, audiosink, NULL);
 
   /* start playing */
   gst_element_set_state (GST_ELEMENT (pipeline), GST_STATE_PLAYING);
 
-  while (gst_bin_iterate (GST_BIN (pipeline)));
+  /* Listen for EOS */
+  event_loop (pipeline);
 
   gst_element_set_state (GST_ELEMENT (pipeline), GST_STATE_NULL);
 
index 7d5862600bc0717826531eaa74e8a04a9bb30591..49b67ff459fc300ae671517689151090d1036208 100644 (file)
@@ -15,9 +15,8 @@ eos (GstElement * element, gpointer data)
 int
 main (int argc, char *argv[])
 {
-  GstElement *filesrc, *osssink, *queue;
+  GstElement *filesrc, *audiosink, *queue;
   GstElement *pipeline;
-  GstElement *thread;
 
   gst_init (&argc, &argv);
 
@@ -26,10 +25,6 @@ main (int argc, char *argv[])
     exit (-1);
   }
 
-  /* create a new thread to hold the elements */
-  thread = gst_thread_new ("thread");
-  g_assert (thread != NULL);
-
   /* create a new bin to hold the elements */
   pipeline = gst_pipeline_new ("pipeline");
   g_assert (pipeline != NULL);
@@ -43,18 +38,18 @@ main (int argc, char *argv[])
   queue = gst_element_factory_make ("queue", "queue");
 
   /* and an audio sink */
-  osssink = gst_element_factory_make ("osssink", "play_audio");
-  g_assert (osssink != NULL);
+  audiosink = gst_element_factory_make ("alsasink", "play_audio");
+  g_assert (audiosink != NULL);
 
   /* add objects to the main pipeline */
   /*
      gst_pipeline_add_src(GST_PIPELINE(pipeline), filesrc);
      gst_pipeline_add_sink(GST_PIPELINE(pipeline), queue);
 
-     gst_bin_add(GST_BIN(thread), osssink);
+     gst_bin_add(GST_BIN (pipeline), audiosink);
 
      gst_pad_link(gst_element_get_pad(queue,"src"),
-     gst_element_get_pad(osssink,"sink"));
+     gst_element_get_pad(audiosink,"sink"));
 
      if (!gst_pipeline_autoplug(GST_PIPELINE(pipeline))) {
      g_print("cannot autoplug pipeline\n");