This is an attempt at not segfaulting on errors but reporting some usefull info instead.
authorWim Taymans <wim.taymans@gmail.com>
Sat, 22 Dec 2001 21:18:17 +0000 (21:18 +0000)
committerWim Taymans <wim.taymans@gmail.com>
Sat, 22 Dec 2001 21:18:17 +0000 (21:18 +0000)
Original commit message from CVS:
This is an attempt at not segfaulting on errors but reporting some
usefull info instead.
- bin changes so errors can propagate.
- changed the _FAST macros to _CAST because that is what they do.
- removed all references to cothreads out of the core, they are
really a scheduler issue, handler with a sched_private gpointer.
- added a live buffer count, for debugging buffer leaks.
- added error checking in gst_scheduler_state_transition this solves the
"out of cothreads" problem.
- GST_ELEMENT_NO_ENTRY == GST_ELEMENT_INFINITE_LOOP
- added 2 private element flasg for use by the scheduler
(_COTHREAD_STOPPING) is now
- added scheduler entry points:
- _yield : to create possible scheduling points.
- _interrupt: to stop execution of an element.
- _error: to signal en error condition to the scheduler.
- improved error messages for pads.
- signal gst_element_error where appropriate.
- added the a new bin to the parent before entering it so one can reference
its children.
- queue memleak fixes on dispose.
- added possible deadlock detection in queue (turned off be default)
- GstBasicScheduler is a real class of its own now, hiding its internal
variables.
- GST_ELEMENT_IS_COTHREAD_STOPPING is gone. either call explicit _yield
operations, or make a sane loop.
- Better state change handling in filesrc. Better error reporting/recovery
too.
- updated core plugins.
- detect non decoupled elements on scheduler boundries and error.

29 files changed:
gst/autoplug/gstautoplugcache.c
gst/cothreads.c
gst/elements/gstaggregator.c
gst/elements/gstfakesrc.c
gst/elements/gstfilesrc.c
gst/elements/gstidentity.c
gst/gstbin.c
gst/gstbin.h
gst/gstbuffer.c
gst/gstbuffer.h
gst/gstelement.c
gst/gstelement.h
gst/gstobject.h
gst/gstpad.c
gst/gstpad.h
gst/gstparse.c
gst/gstqueue.c
gst/gstqueue.h
gst/gstscheduler.c
gst/gstscheduler.h
gst/gstthread.c
gst/gsttypefind.c
gst/schedulers/gstbasicscheduler.c
plugins/elements/gstaggregator.c
plugins/elements/gstfakesrc.c
plugins/elements/gstfilesrc.c
plugins/elements/gstidentity.c
plugins/elements/gstqueue.c
plugins/elements/gstqueue.h

index 6bb5721..326310c 100644 (file)
@@ -203,72 +203,69 @@ gst_autoplugcache_loop (GstElement *element)
    * the playout pointer hits the end of cache again it has to start pulling.
    */
 
-  do {
-    /* the first time through, the current_playout pointer is going to be NULL */
-    if (cache->current_playout == NULL) {
-      /* get a buffer */
-      buf = gst_pad_pull (cache->sinkpad);
+  /* the first time through, the current_playout pointer is going to be NULL */
+  if (cache->current_playout == NULL) {
+    /* get a buffer */
+    buf = gst_pad_pull (cache->sinkpad);
 
-      /* add it to the cache, though cache == NULL */
-      gst_buffer_ref (buf);
-      cache->cache = g_list_prepend (cache->cache, buf);
-      cache->buffer_count++;
+    /* add it to the cache, though cache == NULL */
+    gst_buffer_ref (buf);
+    cache->cache = g_list_prepend (cache->cache, buf);
+    cache->buffer_count++;
 
-      /* set the current_playout pointer */
-      cache->current_playout = cache->cache;
+    /* set the current_playout pointer */
+    cache->current_playout = cache->cache;
 
-      g_signal_emit (G_OBJECT(cache), gst_autoplugcache_signals[FIRST_BUFFER], 0, buf);
-
-      /* send the buffer on its way */
-      gst_pad_push (cache->srcpad, buf);
-    }
-
-    /* the steady state is where the playout is at the front of the cache */
-    else if (g_list_previous(cache->current_playout) == NULL) {
+    g_signal_emit (G_OBJECT(cache), gst_autoplugcache_signals[FIRST_BUFFER], 0, buf);
 
-      /* if we've been told to fire an empty signal (after a reset) */
-      if (cache->fire_empty) {
-        int oldstate = GST_STATE(cache);
-        GST_DEBUG(0,"at front of cache, about to pull, but firing signal\n");
+    /* send the buffer on its way */
+    gst_pad_push (cache->srcpad, buf);
+  }
+  /* the steady state is where the playout is at the front of the cache */
+  else if (g_list_previous(cache->current_playout) == NULL) {
+
+    /* if we've been told to fire an empty signal (after a reset) */
+    if (cache->fire_empty) {
+      int oldstate = GST_STATE(cache);
+      GST_DEBUG(0,"at front of cache, about to pull, but firing signal\n");
+      gst_object_ref (GST_OBJECT (cache));
+      g_signal_emit (G_OBJECT(cache), gst_autoplugcache_signals[CACHE_EMPTY], 0, NULL);
+      if (GST_STATE(cache) != oldstate) {
         gst_object_ref (GST_OBJECT (cache));
-        g_signal_emit (G_OBJECT(cache), gst_autoplugcache_signals[CACHE_EMPTY], 0, NULL);
-        if (GST_STATE(cache) != oldstate) {
-          gst_object_ref (GST_OBJECT (cache));
-          GST_DEBUG(GST_CAT_AUTOPLUG, "state changed during signal, aborting\n");
-          cothread_switch(cothread_current_main());
-        }
-        gst_object_unref (GST_OBJECT (cache));
+        GST_DEBUG(GST_CAT_AUTOPLUG, "state changed during signal, aborting\n");
+        cothread_switch(cothread_current_main());
       }
+      gst_object_unref (GST_OBJECT (cache));
+    }
 
-      /* get a buffer */
-      buf = gst_pad_pull (cache->sinkpad);
+    /* get a buffer */
+    buf = gst_pad_pull (cache->sinkpad);
 
-      /* add it to the front of the cache */
-      gst_buffer_ref (buf);
-      cache->cache = g_list_prepend (cache->cache, buf);
-      cache->buffer_count++;
+    /* add it to the front of the cache */
+    gst_buffer_ref (buf);
+    cache->cache = g_list_prepend (cache->cache, buf);
+    cache->buffer_count++;
 
-      /* set the current_playout pointer */
-      cache->current_playout = cache->cache;
+    /* set the current_playout pointer */
+    cache->current_playout = cache->cache;
 
-      /* send the buffer on its way */
-      gst_pad_push (cache->srcpad, buf);
-    }
-
-    /* otherwise we're trundling through existing cached buffers */
-    else {
-      /* move the current_playout pointer */
-      cache->current_playout = g_list_previous (cache->current_playout);
+    /* send the buffer on its way */
+    gst_pad_push (cache->srcpad, buf);
+  }
 
-      if (cache->fire_first) {
-        g_signal_emit (G_OBJECT(cache), gst_autoplugcache_signals[FIRST_BUFFER], 0, buf);
-        cache->fire_first = FALSE;
-      }
+  /* otherwise we're trundling through existing cached buffers */
+  else {
+    /* move the current_playout pointer */
+    cache->current_playout = g_list_previous (cache->current_playout);
 
-      /* push that buffer */
-      gst_pad_push (cache->srcpad, GST_BUFFER(cache->current_playout->data));
+    if (cache->fire_first) {
+      g_signal_emit (G_OBJECT(cache), gst_autoplugcache_signals[FIRST_BUFFER], 0, buf);
+      cache->fire_first = FALSE;
     }
-  } while (!GST_FLAG_IS_SET (element, GST_ELEMENT_COTHREAD_STOPPING));
+
+    /* push that buffer */
+    gst_pad_push (cache->srcpad, GST_BUFFER(cache->current_playout->data));
+  }
 }
 
 static GstPadNegotiateReturn
index 48aa164..f9b668e 100644 (file)
@@ -166,7 +166,6 @@ cothread_create (cothread_context *ctx)
       cothread_destroy (ctx->threads[slot]);
       break;
     }
-           
   }
 
   sp = CURRENT_STACK_FRAME;
index 4a6baf5..e6f2337 100644 (file)
@@ -272,46 +272,44 @@ gst_aggregator_loop (GstElement *element)
 
   aggregator = GST_AGGREGATOR (element);
 
-  do {
-    if (aggregator->sched == AGGREGATOR_LOOP ||
-        aggregator->sched == AGGREGATOR_LOOP_PEEK) {
-      GList *pads = aggregator->sinkpads;
-
-      while (pads) {
-       GstPad *pad = GST_PAD (pads->data);
-       pads = g_list_next (pads);
-
-        if (aggregator->sched == AGGREGATOR_LOOP_PEEK) {
-         buf = gst_pad_peek (pad);
-         if (buf == NULL)
-           continue;
-
-         g_assert (buf == gst_pad_pull (pad));
-         debug = "loop_peek";
-        }
-       else {
-         buf = gst_pad_pull (pad);
-         debug = "loop";
-       }
-       gst_aggregator_push (aggregator, pad, buf, debug);
-      }
-    }
-    else {
-      if (aggregator->sched == AGGREGATOR_LOOP_SELECT) {
-       GstPad *pad;
+  if (aggregator->sched == AGGREGATOR_LOOP ||
+      aggregator->sched == AGGREGATOR_LOOP_PEEK) {
+    GList *pads = aggregator->sinkpads;
 
-       debug = "loop_select";
+    while (pads) {
+      GstPad *pad = GST_PAD (pads->data);
+      pads = g_list_next (pads);
 
-       pad = gst_pad_select (aggregator->sinkpads);
-       buf = gst_pad_pull (pad);
+      if (aggregator->sched == AGGREGATOR_LOOP_PEEK) {
+       buf = gst_pad_peek (pad);
+       if (buf == NULL)
+         continue;
 
-       gst_aggregator_push (aggregator, pad, buf, debug);
+       g_assert (buf == gst_pad_pull (pad));
+       debug = "loop_peek";
       }
       else {
-       g_assert_not_reached ();
+       buf = gst_pad_pull (pad);
+       debug = "loop";
       }
+      gst_aggregator_push (aggregator, pad, buf, debug);
+    }
+  }
+  else {
+    if (aggregator->sched == AGGREGATOR_LOOP_SELECT) {
+      GstPad *pad;
+
+      debug = "loop_select";
+
+      pad = gst_pad_select (aggregator->sinkpads);
+      buf = gst_pad_pull (pad);
+
+      gst_aggregator_push (aggregator, pad, buf, debug);
     }
-  } while (!GST_ELEMENT_IS_COTHREAD_STOPPING (element));
+    else {
+      g_assert_not_reached ();
+    }
+  }
 }
 
 /**
index 7d8d6e3..1f87031 100644 (file)
@@ -387,7 +387,7 @@ gst_fakesrc_set_property (GObject *object, guint prop_id, const GValue *value, G
     case ARG_OUTPUT:
       break;
     case ARG_DATA:
-      src->data = g_value_get_int (value);
+      src->data = g_value_get_enum (value);
       switch (src->data) {
        case FAKESRC_DATA_ALLOCATE:
           if (src->parent) {
@@ -403,7 +403,7 @@ gst_fakesrc_set_property (GObject *object, guint prop_id, const GValue *value, G
       }
       break;
     case ARG_SIZETYPE:
-      src->sizetype = g_value_get_int (value);
+      src->sizetype = g_value_get_enum (value);
       break;
     case ARG_SIZEMIN:
       src->sizemin = g_value_get_int (value);
@@ -415,7 +415,7 @@ gst_fakesrc_set_property (GObject *object, guint prop_id, const GValue *value, G
       src->parentsize = g_value_get_int (value);
       break;
     case ARG_FILLTYPE:
-      src->filltype = g_value_get_int (value);
+      src->filltype = g_value_get_enum (value);
       break;
     case ARG_PATTERN:
       break;
@@ -455,13 +455,13 @@ gst_fakesrc_get_property (GObject *object, guint prop_id, GValue *value, GParamS
       g_value_set_boolean (value, src->loop_based);
       break;
     case ARG_OUTPUT:
-      g_value_set_int (value, src->output);
+      g_value_set_enum (value, src->output);
       break;
     case ARG_DATA:
-      g_value_set_int (value, src->data);
+      g_value_set_enum (value, src->data);
       break;
     case ARG_SIZETYPE:
-      g_value_set_int (value, src->sizetype);
+      g_value_set_enum (value, src->sizetype);
       break;
     case ARG_SIZEMIN:
       g_value_set_int (value, src->sizemin);
@@ -473,7 +473,7 @@ gst_fakesrc_get_property (GObject *object, guint prop_id, GValue *value, GParamS
       g_value_set_int (value, src->parentsize);
       break;
     case ARG_FILLTYPE:
-      g_value_set_int (value, src->filltype);
+      g_value_set_enum (value, src->filltype);
       break;
     case ARG_PATTERN:
       g_value_set_string (value, src->pattern);
@@ -689,49 +689,46 @@ static void
 gst_fakesrc_loop(GstElement *element)
 {
   GstFakeSrc *src;
+  GList *pads;
 
   g_return_if_fail(element != NULL);
   g_return_if_fail(GST_IS_FAKESRC(element));
 
   src = GST_FAKESRC (element);
 
-  do {
-    GList *pads;
-
-    pads = GST_ELEMENT (src)->pads;
+  pads = gst_element_get_pad_list (element);
 
-    while (pads) {
-      GstPad *pad = GST_PAD (pads->data);
-      GstBuffer *buf;
+  while (pads) {
+    GstPad *pad = GST_PAD (pads->data);
+    GstBuffer *buf;
 
-      if (src->rt_num_buffers == 0) {
-       src->eos = TRUE;
-      }
-      else {
-        if (src->rt_num_buffers > 0)
-          src->rt_num_buffers--;
-      }
+    if (src->rt_num_buffers == 0) {
+      src->eos = TRUE;
+    }
+    else {
+      if (src->rt_num_buffers > 0)
+        src->rt_num_buffers--;
+    }
 
-      if (src->eos) {
-        gst_pad_push(pad, GST_BUFFER(gst_event_new (GST_EVENT_EOS)));
-       return;
-      }
+    if (src->eos) {
+      gst_pad_push(pad, GST_BUFFER(gst_event_new (GST_EVENT_EOS)));
+      return;
+    }
 
-      buf = gst_fakesrc_create_buffer (src);
-      GST_BUFFER_TIMESTAMP (buf) = src->buffer_count++;
+    buf = gst_fakesrc_create_buffer (src);
+    GST_BUFFER_TIMESTAMP (buf) = src->buffer_count++;
 
-      if (!src->silent) {
-        gst_element_info (element, "fakesrc:  loop    ******* (%s:%s)  > (%d bytes, %llu)",
-                            GST_DEBUG_PAD_NAME (pad), GST_BUFFER_SIZE (buf), GST_BUFFER_TIMESTAMP (buf));
-      }
+    if (!src->silent) {
+      gst_element_info (element, "fakesrc:  loop    ******* (%s:%s)  > (%d bytes, %llu)",
+                          GST_DEBUG_PAD_NAME (pad), GST_BUFFER_SIZE (buf), GST_BUFFER_TIMESTAMP (buf));
+    }
 
-      g_signal_emit (G_OBJECT (src), gst_fakesrc_signals[SIGNAL_HANDOFF], 0,
+    g_signal_emit (G_OBJECT (src), gst_fakesrc_signals[SIGNAL_HANDOFF], 0,
                        buf, pad);
-      gst_pad_push (pad, buf);
+    gst_pad_push (pad, buf);
 
-      pads = g_list_next (pads);
-    }
-  } while (!GST_ELEMENT_IS_COTHREAD_STOPPING (element));
+    pads = g_list_next (pads);
+  }
 }
 
 static GstElementStateReturn
index 346f648..db05b10 100644 (file)
@@ -30,6 +30,7 @@
 #include <unistd.h>
 #include <sys/mman.h>
 #include <errno.h>
+#include <string.h>
 
 
 /**********************************************************************
@@ -327,10 +328,10 @@ gst_filesrc_map_region (GstFileSrc *src, off_t offset, size_t size)
   /* mmap() the data into this new buffer */
   GST_BUFFER_DATA(buf) = mmap (NULL, size, PROT_READ, MAP_SHARED, src->fd, offset);
   if (GST_BUFFER_DATA(buf) == NULL) {
-    fprintf (stderr, "ERROR: gstfilesrc couldn't map file!\n");
+    gst_element_error (GST_ELEMENT (src), "couldn't map file");
   } else if (GST_BUFFER_DATA(buf) == MAP_FAILED) {
-    g_error ("gstfilesrc mmap(0x%x, %d, 0x%llx) : %s",
-            size, src->fd, offset, sys_errlist[errno]);
+    gst_element_error (GST_ELEMENT (src), "mmap (0x%x, %d, 0x%llx) : %s",
+            size, src->fd, offset, strerror (errno));
   }
 #ifdef MADV_SEQUENTIAL
   /* madvise to tell the kernel what to do with it */
@@ -533,8 +534,8 @@ gst_filesrc_open_file (GstFileSrc *src)
   /* open the file */
   src->fd = open (src->filename, O_RDONLY);
   if (src->fd < 0) {
-    perror ("open");
-    gst_element_error (GST_ELEMENT (src), g_strconcat("opening file \"", src->filename, "\"", NULL));
+    gst_element_error (GST_ELEMENT (src), "opening file \"%s\" (%s)", 
+                      src->filename, strerror (errno), NULL);
     return FALSE;
   } else {
     /* find the file length */
@@ -557,7 +558,6 @@ gst_filesrc_close_file (GstFileSrc *src)
 {
   g_return_if_fail (GST_FLAG_IS_SET (src, GST_FILESRC_OPEN));
 
-  g_print ("close\n");
   /* close the file */
   close (src->fd);
 
@@ -565,6 +565,8 @@ gst_filesrc_close_file (GstFileSrc *src)
   src->fd = 0;
   src->filelen = 0;
   src->curoffset = 0;
+  if (src->mapbuf)
+    gst_buffer_unref (src->mapbuf);
 
   GST_FLAG_UNSET (src, GST_FILESRC_OPEN);
 }
@@ -575,17 +577,22 @@ gst_filesrc_change_state (GstElement *element)
 {
   GstFileSrc *src = GST_FILESRC(element);
 
-  if (GST_STATE_PENDING (element) == GST_STATE_NULL) {
-    if (GST_FLAG_IS_SET (element, GST_FILESRC_OPEN))
-      gst_filesrc_close_file (GST_FILESRC (element));
-  } if (GST_STATE_PENDING (element) == GST_STATE_READY) {
-    src->curoffset = 0;
-  } else {
-
-    if (!GST_FLAG_IS_SET (element, GST_FILESRC_OPEN)) {
-      if (!gst_filesrc_open_file (GST_FILESRC (element)))
-        return GST_STATE_FAILURE;
-    }
+  switch (GST_STATE_TRANSITION (element)) {
+    case GST_STATE_NULL_TO_READY:
+      if (!GST_FLAG_IS_SET (element, GST_FILESRC_OPEN)) {
+        if (!gst_filesrc_open_file (GST_FILESRC (element)))
+          return GST_STATE_FAILURE;
+      }
+      break;
+    case GST_STATE_READY_TO_NULL:
+      if (GST_FLAG_IS_SET (element, GST_FILESRC_OPEN))
+        gst_filesrc_close_file (GST_FILESRC (element));
+      break;
+    case GST_STATE_READY_TO_PAUSED:
+    case GST_STATE_PAUSED_TO_READY:
+      src->curoffset = 0;
+    default:
+      break;
   }
 
   if (GST_ELEMENT_CLASS (parent_class)->change_state)
index e8195aa..ddd4c02 100644 (file)
@@ -205,27 +205,24 @@ gst_identity_loop (GstElement *element)
 
   identity = GST_IDENTITY (element);
   
-  do {
-    buf = gst_pad_pull (identity->sinkpad);
+  buf = gst_pad_pull (identity->sinkpad);
     
-    for (i=identity->duplicate; i; i--) {
-      if (!identity->silent)
-        g_print("identity: loop    ******* (%s:%s)i (%d bytes, %llu) \n",
+  for (i=identity->duplicate; i; i--) {
+    if (!identity->silent)
+      g_print("identity: loop    ******* (%s:%s)i (%d bytes, %llu) \n",
                      GST_DEBUG_PAD_NAME (identity->sinkpad), GST_BUFFER_SIZE (buf), GST_BUFFER_TIMESTAMP (buf));
 
-      g_signal_emit (G_OBJECT (identity), gst_identity_signals[SIGNAL_HANDOFF], 0,
+    g_signal_emit (G_OBJECT (identity), gst_identity_signals[SIGNAL_HANDOFF], 0,
                               buf);
 
-      if (i>1) 
-       gst_buffer_ref (buf);
-
-      gst_pad_push (identity->srcpad, buf);
+    if (i>1) 
+      gst_buffer_ref (buf);
 
-      if (identity->sleep_time)
-        usleep (identity->sleep_time);
-    }
+    gst_pad_push (identity->srcpad, buf);
 
-  } while (!GST_ELEMENT_IS_COTHREAD_STOPPING(element));
+    if (identity->sleep_time)
+      usleep (identity->sleep_time);
+  }
 }
 
 static void 
index 19674e8..7a0b845 100644 (file)
@@ -138,7 +138,6 @@ gst_bin_init (GstBin * bin)
 
   bin->numchildren = 0;
   bin->children = NULL;
-  bin->eoscond = g_cond_new ();
 }
 
 /**
@@ -364,8 +363,8 @@ gst_bin_remove (GstBin * bin, GstElement * element)
 }
 
 void
-gst_bin_child_state_change (GstBin * bin, GstElementState old, GstElementState new,
-                           GstElement * child)
+gst_bin_child_state_change (GstBin *bin, GstElementState old, GstElementState new,
+                           GstElement *child)
 {
   gint old_idx = 0, new_idx = 0, i;
 
@@ -394,6 +393,22 @@ gst_bin_child_state_change (GstBin * bin, GstElementState old, GstElementState n
   GST_UNLOCK (bin);
 }
 
+void
+gst_bin_child_error (GstBin *bin, GstElement *child)
+{
+  if (GST_STATE (bin) != GST_STATE_NULL) {
+       /*
+    GST_STATE_PENDING (bin) = ((GST_STATE (bin) >> 1));
+    if (gst_element_set_state (bin, GST_STATE (bin)>>1) != GST_STATE_SUCCESS) {
+      gst_element_error (GST_ELEMENT (bin), "bin \"%s\" couldn't change state on error from child \"%s\"", 
+                   GST_ELEMENT_NAME (bin), GST_ELEMENT_NAME (child));
+    }
+  */
+    gst_element_info (GST_ELEMENT (bin), "bin \"%s\" stopped because child \"%s\" signalled an error",
+                   GST_ELEMENT_NAME (bin), GST_ELEMENT_NAME (child));
+  }
+}
+
 static void
 gst_bin_send_event (GstElement *element, GstEvent *event)
 {
@@ -437,6 +452,9 @@ gst_bin_change_state (GstElement * element)
 
        gst_element_set_state (child, old_state);
        if (GST_ELEMENT_SCHED (child) == GST_ELEMENT_SCHED (element)) {
+          /* reset to what is was */
+          GST_STATE_PENDING (element) = old_state;
+          gst_bin_change_state (element);
          return GST_STATE_FAILURE;
        }
        break;
@@ -551,8 +569,6 @@ gst_bin_dispose (GObject * object)
   bin->children = NULL;
   bin->numchildren = 0;
 
-  g_cond_free (bin->eoscond);
-
   G_OBJECT_CLASS (parent_class)->dispose (object);
 }
 
index 7e36704..ae9fe1c 100644 (file)
@@ -38,15 +38,15 @@ extern GType _gst_bin_type;
 # define GST_IS_BIN(obj)             (G_TYPE_CHECK_INSTANCE_TYPE ((obj), GST_TYPE_BIN))
 # define GST_IS_BIN_CLASS(obj)       (G_TYPE_CHECK_CLASS_TYPE ((klass), GST_TYPE_BIN))
 
-#define GST_BIN_FAST(obj)            ((GstBin*)(obj))
-#define GST_BIN_CLASS_FAST(klass)    ((GstBinClass*)(klass))
+#define GST_BIN_CAST(obj)            ((GstBin*)(obj))
+#define GST_BIN_CLASS_CAST(klass)    ((GstBinClass*)(klass))
 
 #ifdef GST_TYPE_PARANOID
 # define GST_BIN(obj)                (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_BIN, GstBin))
 # define GST_BIN_CLASS(klass)        (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_BIN, GstBinClass))
 #else
-# define GST_BIN                     GST_BIN_FAST
-# define GST_BIN_CLASS               GST_BIN_CLASS_FAST
+# define GST_BIN                     GST_BIN_CAST
+# define GST_BIN_CLASS               GST_BIN_CLASS_CAST
 #endif
 
 typedef enum {
@@ -71,11 +71,10 @@ struct _GstBin {
   /* our children */
   gint numchildren;
   GList *children;
-  GCond *eoscond;
 
   GstElementState child_states[GST_NUM_STATES];
   
-  cothread_context *threadcontext;
+  gpointer sched_private;
 };
 
 struct _GstBinClass {
@@ -113,6 +112,7 @@ gboolean    gst_bin_iterate                 (GstBin *bin);
 /* internal */
 void           gst_bin_child_state_change      (GstBin *bin, GstElementState oldstate, 
                                                 GstElementState newstate, GstElement *child);
+void           gst_bin_child_error             (GstBin *bin, GstElement *child);
 
 #ifdef __cplusplus
 }
index 5f304f2..841a1fc 100644 (file)
@@ -31,6 +31,7 @@ GType _gst_buffer_type;
 
 static GMemChunk *_gst_buffer_chunk;
 static GMutex *_gst_buffer_chunk_lock;
+static gint _gst_buffer_live;
 
 void 
 _gst_buffer_initialize (void) 
@@ -58,6 +59,20 @@ _gst_buffer_initialize (void)
   _gst_buffer_chunk_lock = g_mutex_new ();
 
   _gst_buffer_type = g_type_register_static (G_TYPE_INT, "GstBuffer", &buffer_info, 0);
+
+  _gst_buffer_live = 0;
+}
+
+/**
+ * gst_buffer_print_stats:
+ *
+ * Print statistics about live buffers.
+ */
+void
+gst_buffer_print_stats (void)
+{
+  g_log (g_log_domain_gstreamer, G_LOG_LEVEL_INFO, 
+                 "%d live buffers", _gst_buffer_live);
 }
 
 /**
@@ -74,6 +89,7 @@ gst_buffer_new (void)
 
   g_mutex_lock (_gst_buffer_chunk_lock);
   buffer = g_mem_chunk_alloc (_gst_buffer_chunk);
+  _gst_buffer_live++;
   g_mutex_unlock (_gst_buffer_chunk_lock);
   GST_INFO (GST_CAT_BUFFER,"creating new buffer %p",buffer);
 
@@ -153,11 +169,12 @@ gst_buffer_create_sub (GstBuffer *parent,
 
   g_mutex_lock (_gst_buffer_chunk_lock);
   buffer = g_mem_chunk_alloc (_gst_buffer_chunk);
-  GST_DATA_TYPE(buffer) = _gst_buffer_type;
+  _gst_buffer_live++;
   g_mutex_unlock (_gst_buffer_chunk_lock);
   GST_INFO (GST_CAT_BUFFER,"creating new subbuffer %p from parent %p (size %u, offset %u)", 
                  buffer, parent, size, offset);
 
+  GST_DATA_TYPE(buffer) = _gst_buffer_type;
   buffer->lock = g_mutex_new ();
 #ifdef HAVE_ATOMIC_H
   atomic_set (&buffer->refcount, 1);
@@ -292,6 +309,7 @@ gst_buffer_destroy (GstBuffer *buffer)
   /* remove it entirely from memory */
   g_mutex_lock (_gst_buffer_chunk_lock);
   g_mem_chunk_free (_gst_buffer_chunk,buffer);
+  _gst_buffer_live--;
   g_mutex_unlock (_gst_buffer_chunk_lock);
 }
 
index 17edbd8..89af2cc 100644 (file)
@@ -167,6 +167,8 @@ GstBuffer*  gst_buffer_append               (GstBuffer *buffer, GstBuffer *append);
 
 gboolean       gst_buffer_is_span_fast         (GstBuffer *buf1, GstBuffer *buf2);
 
+void           gst_buffer_print_stats          (void);
+
 #ifdef __cplusplus
 }
 #endif /* __cplusplus */
index 6315fd6..fd0b3c2 100644 (file)
@@ -173,8 +173,8 @@ gst_element_init (GstElement *element)
   element->numsinkpads = 0;
   element->pads = NULL;
   element->loopfunc = NULL;
-  element->threadstate = NULL;
   element->sched = NULL;
+  element->sched_private = NULL;
   element->state_mutex = g_mutex_new ();
   element->state_cond = g_cond_new ();
 }
@@ -798,10 +798,25 @@ void
 gst_element_error (GstElement *element, const gchar *error, ...)
 {
   va_list var_args;
+  GstObject *parent;
   
+  g_return_if_fail (GST_IS_ELEMENT (element));
+  g_return_if_fail (error != NULL);
+
   va_start (var_args, error);
   gst_element_message (element, "error", error, var_args);
   va_end (var_args);
+
+  parent = GST_ELEMENT_PARENT (element);
+
+  if (parent && GST_IS_BIN (parent)) {
+    gst_bin_child_error (GST_BIN (parent), element);
+  }
+
+  if (element->sched) {
+    gst_scheduler_error (element->sched, element); 
+  }
+
 }
 
 /**
@@ -817,6 +832,9 @@ gst_element_info (GstElement *element, const gchar *info, ...)
 {
   va_list var_args;
   
+  g_return_if_fail (GST_IS_ELEMENT (element));
+  g_return_if_fail (info != NULL);
+
   va_start (var_args, info);
   gst_element_message (element, "info", info, var_args);
   va_end (var_args);
@@ -965,7 +983,7 @@ static GstElementStateReturn
 gst_element_change_state (GstElement *element)
 {
   GstElementState old_state;
-  //GstEvent *event;
+  GstObject *parent;
 
   g_return_val_if_fail (element != NULL, GST_STATE_FAILURE);
   g_return_val_if_fail (GST_IS_ELEMENT (element), GST_STATE_FAILURE);
@@ -973,7 +991,7 @@ gst_element_change_state (GstElement *element)
   old_state = GST_STATE (element);
 
   if (GST_STATE_PENDING (element) == GST_STATE_VOID_PENDING || old_state == GST_STATE_PENDING (element)) {
-    GST_INFO (GST_CAT_STATES, "no state change needed for element %s (VOID_PENDING)\n", GST_ELEMENT_NAME (element));
+    GST_INFO (GST_CAT_STATES, "no state change needed for element %s (VOID_PENDING)", GST_ELEMENT_NAME (element));
     return GST_STATE_SUCCESS;
   }
   
@@ -983,8 +1001,12 @@ gst_element_change_state (GstElement *element)
                     GST_STATE_TRANSITION (element));
 
   /* tell the scheduler if we have one */
-  if (element->sched)
-    gst_scheduler_state_transition (element->sched, element, GST_STATE_TRANSITION (element));
+  if (element->sched) {
+    if (gst_scheduler_state_transition (element->sched, element, GST_STATE_TRANSITION (element)) 
+                   != GST_STATE_SUCCESS) {
+      return GST_STATE_FAILURE;
+    }
+  }
 
   GST_STATE (element) = GST_STATE_PENDING (element);
   GST_STATE_PENDING (element) = GST_STATE_VOID_PENDING;
@@ -993,11 +1015,11 @@ gst_element_change_state (GstElement *element)
   g_cond_signal (element->state_cond);
   g_mutex_unlock (element->state_mutex);
 
-  if (GST_ELEMENT_PARENT (element)) {
-    gst_bin_child_state_change (GST_BIN (GST_ELEMENT_PARENT (element)), old_state, GST_STATE (element), element);
+  parent = GST_ELEMENT_PARENT (element);
+
+  if (parent && GST_IS_BIN (parent)) {
+    gst_bin_child_state_change (GST_BIN (parent), old_state, GST_STATE (element), element);
   }
-  //event = gst_event_new_state_change (old_state, GST_STATE (element));
-  //gst_element_send_event (element, event);
 
   return GST_STATE_SUCCESS;
 }
@@ -1328,7 +1350,6 @@ gst_element_signal_eos (GstElement *element)
 
   GST_DEBUG(GST_CAT_EVENT, "signaling EOS on element %s\n",GST_OBJECT_NAME(element));
   g_signal_emit (G_OBJECT (element), gst_element_signals[EOS], 0);
-  GST_FLAG_SET(element,GST_ELEMENT_COTHREAD_STOPPING);
 }
 
 
index 036c606..08e7d26 100644 (file)
@@ -58,8 +58,8 @@ extern GType _gst_element_type;
 
 #define GST_TYPE_ELEMENT               (_gst_element_type)
 
-#define GST_ELEMENT_FAST(obj)          ((GstElement*)(obj))
-#define GST_ELEMENT_CLASS_FAST(klass)  ((GstElementClass*)(klass))
+#define GST_ELEMENT_CAST(obj)          ((GstElement*)(obj))
+#define GST_ELEMENT_CLASS_CAST(klass)  ((GstElementClass*)(klass))
 #define GST_IS_ELEMENT(obj)            (G_TYPE_CHECK_INSTANCE_TYPE ((obj), GST_TYPE_ELEMENT))
 #define GST_IS_ELEMENT_CLASS(obj)      (G_TYPE_CHECK_CLASS_TYPE ((klass), GST_TYPE_ELEMENT))
 
@@ -67,8 +67,8 @@ extern GType _gst_element_type;
 # define GST_ELEMENT(obj)              (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_ELEMENT, GstElement))
 # define GST_ELEMENT_CLASS(klass)       (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_ELEMENT, GstElementClass))
 #else
-# define GST_ELEMENT                    GST_ELEMENT_FAST
-# define GST_ELEMENT_CLASS              GST_ELEMENT_CLASS_FAST
+# define GST_ELEMENT                    GST_ELEMENT_CAST
+# define GST_ELEMENT_CLASS              GST_ELEMENT_CLASS_CAST
 #endif
 
 typedef enum {
@@ -82,18 +82,16 @@ typedef enum {
   /* this element is incable of seeking (FIXME: does this apply to filters?) */
   GST_ELEMENT_NO_SEEK,
 
-  /***** !!!!! need to have a flag that says that an element must
-    *not* be an entry into a scheduling chain !!!!! *****/
-  /* this element for some reason doesn't obey COTHREAD_STOPPING, or
-     has some other reason why it can't be the entry */
-  GST_ELEMENT_NO_ENTRY,
+  /* this element, for some reason, has a loop function that performs
+   * an infinite loop without calls to gst_element_yield () */
+  GST_ELEMENT_INFINITE_LOOP,
+
+  /* private flags that can be used by the scheduler */
+  GST_ELEMENT_SCHEDULER_PRIVATE1,
+  GST_ELEMENT_SCHEDULER_PRIVATE2,
 
   /* there is a new loopfunction ready for placement */
   GST_ELEMENT_NEW_LOOPFUNC,
-  /* the cothread holding this element needs to be stopped */
-  GST_ELEMENT_COTHREAD_STOPPING,
-  /* the element has to be scheduled as a cothread for any sanity */
-  GST_ELEMENT_USE_COTHREAD,
 
   /* if this element can handle events */
   GST_ELEMENT_EVENT_AWARE,
@@ -103,7 +101,6 @@ typedef enum {
 } GstElementFlags;
 
 #define GST_ELEMENT_IS_THREAD_SUGGESTED(obj)   (GST_FLAG_IS_SET(obj,GST_ELEMENT_THREAD_SUGGESTED))
-#define GST_ELEMENT_IS_COTHREAD_STOPPING(obj)  (GST_FLAG_IS_SET(obj,GST_ELEMENT_COTHREAD_STOPPING))
 #define GST_ELEMENT_IS_EOS(obj)                        (GST_FLAG_IS_SET(obj,GST_ELEMENT_EOS))
 #define GST_ELEMENT_IS_EVENT_AWARE(obj)                (GST_FLAG_IS_SET(obj,GST_ELEMENT_EVENT_AWARE))
 
@@ -127,9 +124,10 @@ struct _GstElement {
   guint8               current_state;
   guint8               pending_state;
   GstElement           *manager;
-  GstScheduler                 *sched;
   GstElementLoopFunction loopfunc;
-  cothread_state       *threadstate;
+
+  GstScheduler                 *sched;
+  gpointer             sched_private;
 
   /* element pads */
   guint16              numpads;
@@ -185,6 +183,8 @@ const gchar*            gst_element_get_name            (GstElement *element);
 void                    gst_element_set_parent          (GstElement *element, GstObject *parent);
 GstObject*              gst_element_get_parent          (GstElement *element);
 
+#define                        gst_element_yield(element)      gst_scheduler_yield(GST_ELEMENT_SCHED(element),element)
+#define                        gst_element_interrupt(element)  gst_scheduler_interrupt(GST_ELEMENT_SCHED(element),element)
 void                   gst_element_set_sched           (GstElement *element, GstScheduler *sched);
 GstScheduler*          gst_element_get_sched           (GstElement *element);
 
index d889dfb..0f2078b 100644 (file)
@@ -53,15 +53,15 @@ extern GType _gst_object_type;
 # define GST_IS_OBJECT(obj)             (G_TYPE_CHECK_INSTANCE_TYPE ((obj), GST_TYPE_OBJECT))
 # define GST_IS_OBJECT_CLASS(obj)       (G_TYPE_CHECK_CLASS_TYPE ((klass), GST_TYPE_OBJECT))
 
-#define GST_OBJECT_FAST(obj)            ((GstObject*)(obj))
-#define GST_OBJECT_CLASS_FAST(klass)    ((GstObjectClass*)(klass))
+#define GST_OBJECT_CAST(obj)            ((GstObject*)(obj))
+#define GST_OBJECT_CLASS_CAST(klass)    ((GstObjectClass*)(klass))
 
 #ifdef GST_TYPE_PARANOID
 # define GST_OBJECT(obj)                (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_OBJECT, GstObject))
 # define GST_OBJECT_CLASS(klass)        (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_OBJECT, GstObjectClass))
 #else
-# define GST_OBJECT                     GST_OBJECT_FAST
-# define GST_OBJECT_CLASS               GST_OBJECT_CLASS_FAST
+# define GST_OBJECT                     GST_OBJECT_CAST
+# define GST_OBJECT_CLASS               GST_OBJECT_CLASS_CAST
 #endif
 
 /*typedef struct _GstObject GstObject; */ 
@@ -115,7 +115,7 @@ struct _GstObjectClass {
 #endif
 };
 
-#define GST_FLAGS(obj)                 (GST_OBJECT (obj)->flags)
+#define GST_FLAGS(obj)                 (GST_OBJECT_CAST (obj)->flags)
 #define GST_FLAG_IS_SET(obj,flag)      (GST_FLAGS (obj) & (1<<(flag)))
 #define GST_FLAG_SET(obj,flag)         G_STMT_START{ (GST_FLAGS (obj) |= (1<<(flag))); }G_STMT_END
 #define GST_FLAG_UNSET(obj,flag)       G_STMT_START{ (GST_FLAGS (obj) &= ~(1<<(flag))); }G_STMT_END
@@ -127,10 +127,10 @@ struct _GstObjectClass {
 #define GST_OBJECT_FLOATING(obj)       (GST_FLAG_IS_SET (obj, GST_FLOATING))
 
 /* object locking */
-#define GST_LOCK(obj)          (g_mutex_lock(GST_OBJECT(obj)->lock))
-#define GST_TRYLOCK(obj)       (g_mutex_trylock(GST_OBJECT(obj)->lock))
-#define GST_UNLOCK(obj)                (g_mutex_unlock(GST_OBJECT(obj)->lock))
-#define GST_GET_LOCK(obj)      (GST_OBJECT(obj)->lock)
+#define GST_LOCK(obj)          (g_mutex_lock(GST_OBJECT_CAST(obj)->lock))
+#define GST_TRYLOCK(obj)       (g_mutex_trylock(GST_OBJECT_CAST(obj)->lock))
+#define GST_UNLOCK(obj)                (g_mutex_unlock(GST_OBJECT_CAST(obj)->lock))
+#define GST_GET_LOCK(obj)      (GST_OBJECT_CAST(obj)->lock)
 
 
 /* normal GObject stuff */
index 2c52a5e..cf7e65c 100644 (file)
@@ -192,6 +192,9 @@ gst_real_pad_init (GstRealPad *pad)
   pad->direction = GST_PAD_UNKNOWN;
   pad->peer = NULL;
 
+  pad->sched = NULL;
+  pad->sched_private = NULL;
+
   pad->chainfunc = NULL;
   pad->getfunc = NULL;
   pad->getregionfunc = NULL;
@@ -564,13 +567,11 @@ void
 gst_pad_connect (GstPad *srcpad,
                 GstPad *sinkpad)
 {
-  if (!gst_pad_try_connect (srcpad, sinkpad))
-/* FIXME: g_critical is glib-2.0, not glib-1.2
-    g_critical ("couldn't connect %s:%s and %s:%s",
-*/
+  if (!gst_pad_try_connect (srcpad, sinkpad)) {
     g_warning ("couldn't connect %s:%s and %s:%s",
                GST_DEBUG_PAD_NAME (srcpad),
                GST_DEBUG_PAD_NAME (sinkpad));
+  }
 }
 
 /**
@@ -1451,18 +1452,33 @@ gst_pad_push (GstPad *pad, GstBuffer *buf)
   GST_DEBUG_ENTER ("(%s:%s)", GST_DEBUG_PAD_NAME (pad));
 
   g_return_if_fail (GST_PAD_DIRECTION (pad) == GST_PAD_SRC);
+
   if (!peer) {
-    g_warning ("gst_pad_push but %s:%s is unconnected", GST_DEBUG_PAD_NAME (pad));
-    return;
+    g_warning ("push on pad %s:%s but it is unconnected", GST_DEBUG_PAD_NAME (pad));
   }
-  
-  if (peer->chainhandler) {
-    GST_DEBUG (GST_CAT_DATAFLOW, "calling chainhandler &%s of peer pad %s:%s\n",
-          GST_DEBUG_FUNCPTR_NAME (peer->chainhandler), GST_DEBUG_PAD_NAME (GST_PAD (peer)));
-    (peer->chainhandler) (GST_PAD_FAST (peer), buf);
-  } 
   else {
-    g_warning ("gst_pad_push but %s:%s has but no chainhandler", GST_DEBUG_PAD_NAME (peer));
+    if (peer->chainhandler) {
+      if (buf) {
+        GST_DEBUG (GST_CAT_DATAFLOW, "calling chainhandler &%s of peer pad %s:%s\n",
+            GST_DEBUG_FUNCPTR_NAME (peer->chainhandler), GST_DEBUG_PAD_NAME (GST_PAD (peer)));
+        (peer->chainhandler) (GST_PAD_CAST (peer), buf);
+       return;
+      }
+      else {
+        g_warning ("trying to push a NULL buffer on pad %s:%s", GST_DEBUG_PAD_NAME (peer));
+       return;
+      }
+    } 
+    else {
+      g_warning ("(internal error) push on pad %s:%s but it has no chainhandler", GST_DEBUG_PAD_NAME (peer));
+    }
+  }
+  /* clean up the mess here */
+  if (buf != NULL) {
+    if (GST_IS_BUFFER (buf))
+      gst_buffer_unref (buf);
+    else
+      gst_pad_event_default (pad, GST_EVENT (buf));
   }
 }
 #endif
@@ -1486,18 +1502,33 @@ gst_pad_pull (GstPad *pad)
   g_return_val_if_fail (GST_PAD_DIRECTION (pad) == GST_PAD_SINK, NULL);
 
   if (!peer) {
-    g_warning ("gst_pad_pull but %s:%s is unconnected", GST_DEBUG_PAD_NAME(pad));
-    return NULL;
+    gst_element_error (GST_PAD_PARENT (pad), 
+                   "pull on pad %s:%s but it was unconnected", 
+                   GST_ELEMENT_NAME (GST_PAD_PARENT (pad)), GST_PAD_NAME (pad),
+                   NULL);
   }
-
-  if (peer->gethandler) {
-    GST_DEBUG (GST_CAT_DATAFLOW, "calling gethandler %s of peer pad %s:%s\n",
-      GST_DEBUG_FUNCPTR_NAME (peer->gethandler), GST_DEBUG_PAD_NAME (peer));
-    return (peer->gethandler) (GST_PAD_FAST (peer));
-  } else {
-    g_warning ("gst_pad_pull but %s:%s has no gethandler", GST_DEBUG_PAD_NAME (peer));
-    return NULL;
+  else {
+    if (peer->gethandler) {
+      GstBuffer *buf;
+
+      GST_DEBUG (GST_CAT_DATAFLOW, "calling gethandler %s of peer pad %s:%s\n",
+        GST_DEBUG_FUNCPTR_NAME (peer->gethandler), GST_DEBUG_PAD_NAME (peer));
+
+      buf = (peer->gethandler) (GST_PAD_CAST (peer));
+      if (buf)
+        return buf;
+      /* no null buffers allowed */
+      gst_element_error (GST_PAD_PARENT (pad), 
+                   "NULL buffer during pull on %s:%s", GST_DEBUG_PAD_NAME (pad), NULL);
+         
+    } else {
+      gst_element_error (GST_PAD_PARENT (pad), 
+                   "(internal error) pull on pad %s:%s but the peer pad %s:%s has no gethandler", 
+                   GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (peer),
+                   NULL);
+    }
   }
+  return NULL;
 }
 #endif
 
@@ -1535,8 +1566,8 @@ gst_pad_pullregion (GstPad *pad, GstRegionType type, guint64 offset, guint64 len
 
     if (peer->pullregionfunc) {
       GST_DEBUG (GST_CAT_DATAFLOW, "calling pullregionfunc &%s of peer pad %s:%s\n",
-          GST_DEBUG_FUNCPTR_NAME (peer->pullregionfunc), GST_DEBUG_PAD_NAME(GST_PAD_FAST (peer)));
-      result = (peer->pullregionfunc) (GST_PAD_FAST (peer), type, offset, len);
+          GST_DEBUG_FUNCPTR_NAME (peer->pullregionfunc), GST_DEBUG_PAD_NAME(GST_PAD_CAST (peer)));
+      result = (peer->pullregionfunc) (GST_PAD_CAST (peer), type, offset, len);
     } else {
       GST_DEBUG (GST_CAT_DATAFLOW,"no pullregionfunc\n");
       result = NULL;
@@ -1986,6 +2017,8 @@ gst_pad_event_default (GstPad *pad, GstEvent *event)
           pads = g_list_next (pads);
        }
       }
+      /* we have to try to schedule another element because this one is deisabled */
+      gst_element_yield (element);
       break;
     default:
       g_warning ("no default handler for event\n");
index d2f6653..aa08ce6 100644 (file)
@@ -48,8 +48,8 @@ extern GType _gst_ghost_pad_type;
  */
 #define GST_TYPE_PAD                   (_gst_pad_type)
 
-#define GST_PAD_FAST(obj)              ((GstPad*)(obj))
-#define GST_PAD_CLASS_FAST(klass)      ((GstPadClass*)(klass))
+#define GST_PAD_CAST(obj)              ((GstPad*)(obj))
+#define GST_PAD_CLASS_CAST(klass)      ((GstPadClass*)(klass))
 #define GST_IS_PAD(obj)                        (G_TYPE_CHECK_INSTANCE_TYPE ((obj), GST_TYPE_PAD))
 #define GST_IS_PAD_FAST(obj)           (G_OBJECT_TYPE(obj) == GST_TYPE_REAL_PAD || \
                                         G_OBJECT_TYPE(obj) == GST_TYPE_GHOST_PAD)
@@ -59,8 +59,8 @@ extern GType _gst_ghost_pad_type;
 # define GST_PAD(obj)                  (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_PAD, GstPad))
 # define GST_PAD_CLASS(klass)          (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_PAD, GstPadClass))
 #else
-# define GST_PAD                       GST_PAD_FAST
-# define GST_PAD_CLASS                 GST_PAD_CLASS_FAST
+# define GST_PAD                       GST_PAD_CAST
+# define GST_PAD_CLASS                 GST_PAD_CLASS_CAST
 #endif
 
 /* 
@@ -68,8 +68,8 @@ extern GType _gst_ghost_pad_type;
  */
 #define GST_TYPE_REAL_PAD              (_gst_real_pad_type)
 
-#define GST_REAL_PAD_FAST(obj)         ((GstRealPad*)(obj))
-#define GST_REAL_PAD_CLASS_FAST(klass) ((GstRealPadClass*)(klass))
+#define GST_REAL_PAD_CAST(obj)         ((GstRealPad*)(obj))
+#define GST_REAL_PAD_CLASS_CAST(klass) ((GstRealPadClass*)(klass))
 #define GST_IS_REAL_PAD(obj)           (G_TYPE_CHECK_INSTANCE_TYPE ((obj), GST_TYPE_REAL_PAD))
 #define GST_IS_REAL_PAD_FAST(obj)      (G_OBJECT_TYPE(obj) == GST_TYPE_REAL_PAD)
 #define GST_IS_REAL_PAD_CLASS(obj)     (G_TYPE_CHECK_CLASS_TYPE ((klass), GST_TYPE_REAL_PAD))
@@ -78,8 +78,8 @@ extern GType _gst_ghost_pad_type;
 # define GST_REAL_PAD(obj)             (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_REAL_PAD, GstRealPad))
 # define GST_REAL_PAD_CLASS(klass)     (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_REAL_PAD, GstRealPadClass))
 #else
-# define GST_REAL_PAD                  GST_REAL_PAD_FAST
-# define GST_REAL_PAD_CLASS            GST_REAL_PAD_CLASS_FAST
+# define GST_REAL_PAD                  GST_REAL_PAD_CAST
+# define GST_REAL_PAD_CLASS            GST_REAL_PAD_CLASS_CAST
 #endif
 
 /* 
@@ -87,8 +87,8 @@ extern GType _gst_ghost_pad_type;
  */
 #define GST_TYPE_GHOST_PAD             (_gst_ghost_pad_type)
 
-#define GST_GHOST_PAD_FAST(obj)                ((GstGhostPad*)(obj))
-#define GST_GHOST_PAD_CLASS_FAST(klass)        ((GstGhostPadClass*)(klass))
+#define GST_GHOST_PAD_CAST(obj)                ((GstGhostPad*)(obj))
+#define GST_GHOST_PAD_CLASS_CAST(klass)        ((GstGhostPadClass*)(klass))
 #define GST_IS_GHOST_PAD(obj)          (G_TYPE_CHECK_INSTANCE_TYPE ((obj), GST_TYPE_GHOST_PAD))
 #define GST_IS_GHOST_PAD_FAST(obj)     (G_OBJECT_TYPE(obj) == GST_TYPE_GHOST_PAD)
 #define GST_IS_GHOST_PAD_CLASS(obj)    (G_TYPE_CHECK_CLASS_TYPE ((klass), GST_TYPE_GHOST_PAD))
@@ -97,8 +97,8 @@ extern GType _gst_ghost_pad_type;
 # define GST_GHOST_PAD(obj)            (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_GHOST_PAD, GstGhostPad))
 # define GST_GHOST_PAD_CLASS(klass)    (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_GHOST_PAD, GstGhostPadClass))
 #else
-# define GST_GHOST_PAD                 GST_GHOST_PAD_FAST
-# define GST_GHOST_PAD_CLASS           GST_GHOST_PAD_CLASS_FAST
+# define GST_GHOST_PAD                 GST_GHOST_PAD_CAST
+# define GST_GHOST_PAD_CLASS           GST_GHOST_PAD_CLASS_CAST
 #endif
 
 
@@ -169,7 +169,8 @@ struct _GstRealPad {
   GstCaps                      *caps;
   GstPadDirection              direction;
 
-  cothread_state               *threadstate;
+  GstScheduler                 *sched;
+  gpointer                     sched_private;
 
   GstRealPad                   *peer;
 
@@ -178,8 +179,6 @@ struct _GstRealPad {
   guint64                      offset;
   guint64                      len;
 
-  GstScheduler                 *sched;
-
   GstPadChainFunction          chainfunc;
   GstPadChainFunction          chainhandler;
   GstPadGetFunction            getfunc;
@@ -257,7 +256,7 @@ struct _GstGhostPadClass {
 #define GST_GPAD_REALPAD(pad)          (((GstGhostPad *)(pad))->realpad)
 
 /* Generic */
-#define GST_PAD_REALIZE(pad)   (GST_IS_REAL_PAD(pad) ? ((GstRealPad *)(pad)) : GST_GPAD_REALPAD(pad))
+#define GST_PAD_REALIZE(pad)           (GST_IS_REAL_PAD(pad) ? ((GstRealPad *)(pad)) : GST_GPAD_REALPAD(pad))
 #define GST_PAD_DIRECTION(pad)         GST_RPAD_DIRECTION(GST_PAD_REALIZE(pad))
 #define GST_PAD_CAPS(pad)              GST_RPAD_CAPS(GST_PAD_REALIZE(pad))
 #define GST_PAD_PEER(pad)              GST_RPAD_PEER(GST_PAD_REALIZE(pad))
index 26ec647..49fc423 100644 (file)
@@ -104,6 +104,7 @@ gst_parse_launch_cmdline (int argc, char *argv[], GstBin * parent, gst_parse_pri
   GList *pads;
   gint elementcount = 0;
   gint retval = 0;
+  gboolean backref = FALSE;
 
   priv->binlevel++;
 
@@ -174,13 +175,14 @@ gst_parse_launch_cmdline (int argc, char *argv[], GstBin * parent, gst_parse_pri
        GstElement *new;
 
        GST_DEBUG (0, "have pad for element %s\n", element_name);
-       new = gst_bin_get_by_name (parent, element_name);
+       new = gst_bin_get_by_name_recurse_up (parent, element_name);
        if (!new) {
           GST_DEBUG (0, "element %s does not exist! trying to continue\n", element_name);
        }
        else {
           previous = new;
          srcpadname = ptr + 1;
+         backref = TRUE;
        }
       }
       
@@ -321,6 +323,7 @@ gst_parse_launch_cmdline (int argc, char *argv[], GstBin * parent, gst_parse_pri
          i++;
          continue;
        }
+        gst_bin_add (GST_BIN (parent), element);
 
        j = gst_parse_launch_cmdline (argc - i, argv + i + 1, GST_BIN (element), priv);
        /* check for parse error */
@@ -347,9 +350,9 @@ gst_parse_launch_cmdline (int argc, char *argv[], GstBin * parent, gst_parse_pri
          return GST_PARSE_ERROR_NOSUCH_ELEMENT;
        }
        GST_DEBUG (0, "CREATED element %s\n", GST_ELEMENT_NAME (element));
+        gst_bin_add (GST_BIN (parent), element);
       }
 
-      gst_bin_add (GST_BIN (parent), element);
       elementcount++;
 
       g_slist_free (sinkpads);
@@ -412,7 +415,7 @@ gst_parse_launch_cmdline (int argc, char *argv[], GstBin * parent, gst_parse_pri
       else
        GST_DEBUG (0, "have sink pad %s:%s\n", GST_DEBUG_PAD_NAME (GST_PARSE_LISTPAD (sinkpads)));
 
-      if (!srcpads && sinkpads && previous) {
+      if (!srcpads && sinkpads && previous && srcpadname) {
        dyn_connect *connect = g_malloc (sizeof (dyn_connect));
 
        connect->srcpadname = srcpadname;
@@ -442,7 +445,7 @@ gst_parse_launch_cmdline (int argc, char *argv[], GstBin * parent, gst_parse_pri
       sinkpads = NULL;
 
       /* if we're the first element, ghost all the sinkpads */
-      if (elementcount == 1) {
+      if (elementcount == 1 && !backref) {
        DEBUG ("first element, ghosting all of %s's sink pads to parent %s\n",
               GST_ELEMENT_NAME (element), GST_ELEMENT_NAME (GST_ELEMENT (parent)));
        pads = gst_element_get_pad_list (element);
index 4375cec..0a00fb8 100644 (file)
@@ -65,11 +65,13 @@ enum {
   ARG_LEAKY,
   ARG_LEVEL,
   ARG_MAX_LEVEL,
+  ARG_MAY_DEADLOCK,
 };
 
 
 static void                    gst_queue_class_init            (GstQueueClass *klass);
 static void                    gst_queue_init                  (GstQueue *queue);
+static void                    gst_queue_dispose               (GObject *object);
 
 static void                    gst_queue_set_property          (GObject *object, guint prop_id, 
                                                                 const GValue *value, GParamSpec *pspec);
@@ -141,18 +143,22 @@ gst_queue_class_init (GstQueueClass *klass)
 
   parent_class = g_type_class_ref (GST_TYPE_ELEMENT);
 
-  g_object_class_install_property(G_OBJECT_CLASS(klass), ARG_LEAKY,
-    g_param_spec_enum("leaky","Leaky","Where the queue leaks, if at all.",
-                      GST_TYPE_QUEUE_LEAKY,GST_QUEUE_NO_LEAK,G_PARAM_READWRITE));
-  g_object_class_install_property(G_OBJECT_CLASS(klass), ARG_LEVEL,
-    g_param_spec_int("level","Level","How many buffers are in the queue.",
-                     0,G_MAXINT,0,G_PARAM_READABLE));
-  g_object_class_install_property(G_OBJECT_CLASS(klass), ARG_MAX_LEVEL,
-    g_param_spec_int("max_level","Maximum Level","How many buffers the queue holds.",
-                     0,G_MAXINT,100,G_PARAM_READWRITE));
-
-  gobject_class->set_property = GST_DEBUG_FUNCPTR(gst_queue_set_property);
-  gobject_class->get_property = GST_DEBUG_FUNCPTR(gst_queue_get_property);
+  g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_LEAKY,
+    g_param_spec_enum ("leaky", "Leaky", "Where the queue leaks, if at all.",
+                       GST_TYPE_QUEUE_LEAKY, GST_QUEUE_NO_LEAK, G_PARAM_READWRITE));
+  g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_LEVEL,
+    g_param_spec_int ("level", "Level", "How many buffers are in the queue.",
+                      0, G_MAXINT, 0, G_PARAM_READABLE));
+  g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_MAX_LEVEL,
+    g_param_spec_int ("max_level", "Maximum Level", "How many buffers the queue holds.",
+                      0, G_MAXINT, 100, G_PARAM_READWRITE));
+  g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_MAY_DEADLOCK,
+    g_param_spec_boolean ("may_deadlock", "May Deadlock", "The queue may deadlock if it's full and not PLAYING",
+                      TRUE, G_PARAM_READWRITE));
+
+  gobject_class->dispose                = GST_DEBUG_FUNCPTR (gst_queue_dispose);
+  gobject_class->set_property          = GST_DEBUG_FUNCPTR (gst_queue_set_property);
+  gobject_class->get_property          = GST_DEBUG_FUNCPTR (gst_queue_get_property);
 
   gstelement_class->change_state = GST_DEBUG_FUNCPTR(gst_queue_change_state);
 }
@@ -182,6 +188,7 @@ gst_queue_init (GstQueue *queue)
   queue->size_buffers = 100;           /* 100 buffers */
   queue->size_bytes = 100 * 1024;      /* 100KB */
   queue->size_time = 1000000000LL;     /* 1sec */
+  queue->may_deadlock = TRUE;
 
   queue->qlock = g_mutex_new ();
   queue->reader = FALSE;
@@ -191,6 +198,18 @@ gst_queue_init (GstQueue *queue)
   GST_DEBUG_ELEMENT (GST_CAT_THREAD, queue, "initialized queue's not_empty & not_full conditions\n");
 }
 
+static void
+gst_queue_dispose (GObject *object)
+{
+  GstQueue *queue = GST_QUEUE (object);
+
+  g_mutex_free (queue->qlock);
+  g_cond_free (queue->not_empty);
+  g_cond_free (queue->not_full);
+
+  G_OBJECT_CLASS (parent_class)->dispose (object);
+}
+
 static GstBufferPool*
 gst_queue_get_bufferpool (GstPad *pad)
 {
@@ -334,10 +353,21 @@ restart:
       while (GST_STATE_PENDING (queue) != GST_STATE_VOID_PENDING) {
         GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "interrupted!!\n");
         g_mutex_unlock (queue->qlock);
-        cothread_switch(cothread_current_main());
+       gst_element_interrupt (GST_ELEMENT (queue));
        goto restart;
       }
-      g_assert (GST_STATE (queue) == GST_STATE_PLAYING);
+      if (GST_STATE (queue) != GST_STATE_PLAYING) {
+       /* this means the other end is shut down */
+       /* try to signal to resolve the error */
+       if (!queue->may_deadlock) {
+          g_mutex_unlock (queue->qlock);
+          gst_element_error (GST_ELEMENT (queue), "deadlock found, source pad elements are shut down");
+         return;
+       }
+       else {
+          gst_element_info (GST_ELEMENT (queue), "waiting for the app to restart source pad elements");
+       }
+      }
 
       GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for not_full, level:%d/%d\n", queue->level_buffers, queue->size_buffers);
       if (queue->writer)
@@ -402,10 +432,20 @@ restart:
     while (GST_STATE_PENDING (queue) != GST_STATE_VOID_PENDING) {
       GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "interrupted!!\n");
       g_mutex_unlock (queue->qlock);
-      cothread_switch(cothread_current_main());
+      gst_element_interrupt (GST_ELEMENT (queue));
       goto restart;
     }
-    g_assert (GST_STATE (queue) == GST_STATE_PLAYING);
+    if (GST_STATE (queue) != GST_STATE_PLAYING) {
+      /* this means the other end is shut down */
+      if (!queue->may_deadlock) {
+        g_mutex_unlock (queue->qlock);
+        gst_element_error (GST_ELEMENT (queue), "deadlock found, sink pad elements are shut down");
+        return NULL;
+      }
+      else {
+        gst_element_info (GST_ELEMENT (queue), "waiting for the app to restart sink pad elements");
+      }
+    }
 
     GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for not_empty, level:%d/%d\n", queue->level_buffers, queue->size_buffers);
     if (queue->reader)
@@ -513,10 +553,13 @@ gst_queue_set_property (GObject *object, guint prop_id, const GValue *value, GPa
 
   switch (prop_id) {
     case ARG_LEAKY:
-      queue->leaky = g_value_get_int(value);
+      queue->leaky = g_value_get_int (value);
       break;
     case ARG_MAX_LEVEL:
-      queue->size_buffers = g_value_get_int(value);
+      queue->size_buffers = g_value_get_int (value);
+      break;
+    case ARG_MAY_DEADLOCK:
+      queue->may_deadlock = g_value_get_boolean (value);
       break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
@@ -536,13 +579,16 @@ gst_queue_get_property (GObject *object, guint prop_id, GValue *value, GParamSpe
 
   switch (prop_id) {
     case ARG_LEAKY:
-      g_value_set_int(value, queue->leaky);
+      g_value_set_int (value, queue->leaky);
       break;
     case ARG_LEVEL:
-      g_value_set_int(value, queue->level_buffers);
+      g_value_set_int (value, queue->level_buffers);
       break;
     case ARG_MAX_LEVEL:
-      g_value_set_int(value, queue->size_buffers);
+      g_value_set_int (value, queue->size_buffers);
+      break;
+    case ARG_MAY_DEADLOCK:
+      g_value_set_boolean (value, queue->may_deadlock);
       break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
index fb091e1..3af0ef6 100644 (file)
@@ -74,6 +74,7 @@ struct _GstQueue {
   guint64 size_time;   /* size of queue in time */
 
   gint leaky;          /* whether the queue is leaky, and if so at which end */
+  gboolean may_deadlock; /* it the queue should fail on possible deadlocks */
 
   GMutex *qlock;       /* lock for queue (vs object lock) */
   /* we are single reader and single writer queue */
index e49ca5c..cfa59c8 100644 (file)
@@ -76,6 +76,8 @@ gst_scheduler_init (GstScheduler *sched)
 void
 gst_scheduler_setup (GstScheduler *sched)
 {
+  g_return_if_fail (GST_IS_SCHEDULER (sched));
+
   if (CLASS (sched)->setup)
     CLASS (sched)->setup (sched);
 }
@@ -89,6 +91,8 @@ gst_scheduler_setup (GstScheduler *sched)
 void
 gst_scheduler_reset (GstScheduler *sched)
 {
+  g_return_if_fail (GST_IS_SCHEDULER (sched));
+
   if (CLASS (sched)->reset)
     CLASS (sched)->reset (sched);
 }
@@ -104,6 +108,10 @@ gst_scheduler_reset (GstScheduler *sched)
 void
 gst_scheduler_pad_connect (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad)
 {
+  g_return_if_fail (GST_IS_SCHEDULER (sched));
+  g_return_if_fail (GST_IS_PAD (srcpad));
+  g_return_if_fail (GST_IS_PAD (sinkpad));
+
   if (CLASS (sched)->pad_connect)
     CLASS (sched)->pad_connect (sched, srcpad, sinkpad);
 }
@@ -119,6 +127,10 @@ gst_scheduler_pad_connect (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad)
 void
 gst_scheduler_pad_disconnect (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad)
 {
+  g_return_if_fail (GST_IS_SCHEDULER (sched));
+  g_return_if_fail (GST_IS_PAD (srcpad));
+  g_return_if_fail (GST_IS_PAD (sinkpad));
+
   if (CLASS (sched)->pad_disconnect)
     CLASS (sched)->pad_disconnect (sched, srcpad, sinkpad);
 }
@@ -135,6 +147,9 @@ gst_scheduler_pad_disconnect (GstScheduler *sched, GstPad *srcpad, GstPad *sinkp
 GstPad *
 gst_scheduler_pad_select (GstScheduler *sched, GList *padlist)
 {
+  g_return_if_fail (GST_IS_SCHEDULER (sched));
+  g_return_if_fail (padlist != NULL);
+
   if (CLASS (sched)->pad_select)
     CLASS (sched)->pad_select (sched, padlist);
 }
@@ -149,6 +164,9 @@ gst_scheduler_pad_select (GstScheduler *sched, GList *padlist)
 void
 gst_scheduler_add_element (GstScheduler *sched, GstElement *element)
 {
+  g_return_if_fail (GST_IS_SCHEDULER (sched));
+  g_return_if_fail (GST_IS_ELEMENT (element));
+
   if (CLASS (sched)->add_element)
     CLASS (sched)->add_element (sched, element);
 }
@@ -161,11 +179,16 @@ gst_scheduler_add_element (GstScheduler *sched, GstElement *element)
  *
  * Tell the scheduler that an element changed its state.
  */
-void
+GstElementStateReturn
 gst_scheduler_state_transition (GstScheduler *sched, GstElement *element, gint transition)
 {
+  g_return_val_if_fail (GST_IS_SCHEDULER (sched), GST_STATE_FAILURE);
+  g_return_val_if_fail (GST_IS_ELEMENT (element), GST_STATE_FAILURE);
+
   if (CLASS (sched)->state_transition)
-    CLASS (sched)->state_transition (sched, element, transition);
+    return CLASS (sched)->state_transition (sched, element, transition);
+
+  return GST_STATE_SUCCESS;
 }
 
 /**
@@ -178,6 +201,9 @@ gst_scheduler_state_transition (GstScheduler *sched, GstElement *element, gint t
 void
 gst_scheduler_remove_element (GstScheduler *sched, GstElement *element)
 {
+  g_return_if_fail (GST_IS_SCHEDULER (sched));
+  g_return_if_fail (GST_IS_ELEMENT (element));
+
   if (CLASS (sched)->remove_element)
     CLASS (sched)->remove_element (sched, element);
 }
@@ -192,6 +218,9 @@ gst_scheduler_remove_element (GstScheduler *sched, GstElement *element)
 void
 gst_scheduler_lock_element (GstScheduler *sched, GstElement *element)
 {
+  g_return_if_fail (GST_IS_SCHEDULER (sched));
+  g_return_if_fail (GST_IS_ELEMENT (element));
+
   if (CLASS (sched)->lock_element)
     CLASS (sched)->lock_element (sched, element);
 }
@@ -206,11 +235,65 @@ gst_scheduler_lock_element (GstScheduler *sched, GstElement *element)
 void
 gst_scheduler_unlock_element (GstScheduler *sched, GstElement *element)
 {
+  g_return_if_fail (GST_IS_SCHEDULER (sched));
+  g_return_if_fail (GST_IS_ELEMENT (element));
+
   if (CLASS (sched)->unlock_element)
     CLASS (sched)->unlock_element (sched, element);
 }
 
 /**
+ * gst_scheduler_error:
+ * @sched: the schedulerr
+ * @element: the element with the error
+ *
+ * Tell the scheduler an element was in error
+ */
+void
+gst_scheduler_error (GstScheduler *sched, GstElement *element)
+{
+  g_return_if_fail (GST_IS_SCHEDULER (sched));
+  g_return_if_fail (GST_IS_ELEMENT (element));
+
+  if (CLASS (sched)->error)
+    CLASS (sched)->error (sched, element);
+}
+
+/**
+ * gst_scheduler_yield:
+ * @sched: the schedulerr
+ * @element: the element requesting a yield
+ *
+ * Tell the scheduler to schedule another element.
+ */
+void
+gst_scheduler_yield (GstScheduler *sched, GstElement *element)
+{
+  g_return_if_fail (GST_IS_SCHEDULER (sched));
+  g_return_if_fail (GST_IS_ELEMENT (element));
+
+  if (CLASS (sched)->yield)
+    CLASS (sched)->yield (sched, element);
+}
+
+/**
+ * gst_scheduler_interrupt:
+ * @sched: the schedulerr
+ * @element: the element requesting an interrupt
+ *
+ * Tell the scheduler to interrupt execution of this element.
+ */
+void
+gst_scheduler_interrupt (GstScheduler *sched, GstElement *element)
+{
+  g_return_if_fail (GST_IS_SCHEDULER (sched));
+  g_return_if_fail (GST_IS_ELEMENT (element));
+
+  if (CLASS (sched)->interrupt)
+    CLASS (sched)->interrupt (sched, element);
+}
+
+/**
  * gst_scheduler_iterate:
  * @sched: the schedulerr
  *
@@ -221,6 +304,8 @@ gst_scheduler_unlock_element (GstScheduler *sched, GstElement *element)
 gboolean
 gst_scheduler_iterate (GstScheduler *sched)
 {
+  g_return_if_fail (GST_IS_SCHEDULER (sched));
+
   if (CLASS (sched)->iterate)
     CLASS (sched)->iterate (sched);
 }
@@ -235,6 +320,8 @@ gst_scheduler_iterate (GstScheduler *sched)
 void
 gst_scheduler_show (GstScheduler *sched)
 {
+  g_return_if_fail (GST_IS_SCHEDULER (sched));
+
   if (CLASS (sched)->show)
     CLASS (sched)->show (sched);
 }
index 1b2dac5..31e5b5a 100644 (file)
@@ -54,31 +54,31 @@ struct _GstScheduler {
   GstObject object;
 
   GstElement *parent;
-
-  GList *elements;
-  gint num_elements;
-
-  GList *chains;
-  gint num_chains;
 };
 
 struct _GstSchedulerClass {
   GstObjectClass parent_class;
 
+  /* virtual methods */
   void (*setup)                        (GstScheduler *sched);
   void (*reset)                        (GstScheduler *sched);
   void (*add_element)          (GstScheduler *sched, GstElement *element);
-  void (*remove_element)       (GstScheduler *sched, GstElement *element);
-  void (*state_transition)     (GstScheduler *sched, GstElement *element, gint transition);
+  void         (*remove_element)       (GstScheduler *sched, GstElement *element);
+  GstElementStateReturn 
+         (*state_transition)   (GstScheduler *sched, GstElement *element, gint transition);
   void (*lock_element)         (GstScheduler *sched, GstElement *element);
   void (*unlock_element)       (GstScheduler *sched, GstElement *element);
+  void (*yield)                        (GstScheduler *sched, GstElement *element);
+  void (*interrupt)            (GstScheduler *sched, GstElement *element);
+  void (*error)                        (GstScheduler *sched, GstElement *element);
   void (*pad_connect)          (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad);
   void (*pad_disconnect)       (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad);
   void (*pad_select)           (GstScheduler *sched, GList *padlist);
   gboolean (*iterate)          (GstScheduler *sched);
-
   /* for debugging */
   void (*show)                 (GstScheduler *sched);
+
+  /* signals go here */
 };
 
 GType                  gst_scheduler_get_type          (void);
@@ -89,9 +89,12 @@ void                 gst_scheduler_setup             (GstScheduler *sched);
 void                   gst_scheduler_reset             (GstScheduler *sched);
 void                   gst_scheduler_add_element       (GstScheduler *sched, GstElement *element);
 void                   gst_scheduler_remove_element    (GstScheduler *sched, GstElement *element);
-void                   gst_scheduler_state_transition  (GstScheduler *sched, GstElement *element, gint transition);
+GstElementStateReturn  gst_scheduler_state_transition  (GstScheduler *sched, GstElement *element, gint transition);
 void                   gst_scheduler_lock_element      (GstScheduler *sched, GstElement *element);
 void                   gst_scheduler_unlock_element    (GstScheduler *sched, GstElement *element);
+void                   gst_scheduler_yield             (GstScheduler *sched, GstElement *element);
+void                   gst_scheduler_interrupt         (GstScheduler *sched, GstElement *element);
+void                   gst_scheduler_error             (GstScheduler *sched, GstElement *element);
 void                   gst_scheduler_pad_connect       (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad);
 void                   gst_scheduler_pad_disconnect    (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad);
 GstPad*                 gst_scheduler_pad_select       (GstScheduler *sched, GList *padlist);
index e76b8d5..321fe62 100644 (file)
@@ -147,8 +147,6 @@ gst_thread_init (GstThread *thread)
 
   thread->ppid = getpid();
   thread->thread_id = -1;
-
-/*  gst_element_set_manager(GST_ELEMENT(thread),GST_ELEMENT(thread)); */
 }
 
 static void
@@ -303,7 +301,8 @@ gst_thread_change_state (GstElement * element)
       break;
     case GST_STATE_PLAYING_TO_PAUSED:
     {
-      GList *elements = (element->sched)->elements;
+      //GList *elements = (element->sched)->elements;
+      GList *elements = gst_bin_get_list(GST_BIN (thread));
 
       THR_INFO ("pausing thread");
 
@@ -313,7 +312,7 @@ gst_thread_change_state (GstElement * element)
        * + the pending state was already set by gstelement.c::set_state()
        * + find every queue we manage, and signal its empty and full conditions
        */ 
-        g_mutex_lock (thread->lock);
+      g_mutex_lock (thread->lock);
 
       GST_FLAG_UNSET (thread, GST_THREAD_STATE_SPINNING);
 
index 27032f0..dda6135 100644 (file)
@@ -197,7 +197,7 @@ gst_typefind_chain (GstPad *pad, GstBuffer *buf)
         if (GST_STATE(typefind) != oldstate) {
          gst_object_unref (GST_OBJECT (typefind));
           GST_DEBUG(0, "state changed during signal, aborting\n");
-          cothread_switch(cothread_current_main());
+         gst_element_yield (typefind);
         }
        gst_object_unref (GST_OBJECT (typefind));
 }
index 02f3848..be83954 100644 (file)
 
 typedef struct _GstSchedulerChain GstSchedulerChain;
 
+#define GST_PAD_THREADSTATE(pad)       (cothread_state*) (GST_PAD_CAST (pad)->sched_private)
+#define GST_ELEMENT_THREADSTATE(elem)  (cothread_state*) (GST_ELEMENT_CAST (elem)->sched_private)
+#define GST_BIN_THREADCONTEXT(bin)     (cothread_context*) (GST_BIN_CAST (bin)->sched_private)
+
+#define GST_ELEMENT_COTHREAD_STOPPING                  GST_ELEMENT_SCHEDULER_PRIVATE1
+#define GST_ELEMENT_IS_COTHREAD_STOPPING(element)      GST_FLAG_IS_SET((element), GST_ELEMENT_COTHREAD_STOPPING)
+
+typedef struct _GstBasicScheduler GstBasicScheduler;
+typedef struct _GstBasicSchedulerClass GstBasicSchedulerClass;
+
 struct _GstSchedulerChain {
-  GstScheduler *sched;
+  GstBasicScheduler *sched;
 
   GList *disabled;
 
@@ -39,10 +49,37 @@ struct _GstSchedulerChain {
   gboolean schedule;
 };
 
+#define GST_TYPE_BASIC_SCHEDULER \
+  (gst_basic_scheduler_get_type())
+#define GST_BASIC_SCHEDULER(obj) \
+  (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_BASIC_SCHEDULER,GstBasicScheduler))
+#define GST_BASIC_SCHEDULER_CLASS(klass) \
+  (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_BASIC_SCHEDULER,GstBasicSchedulerClass))
+#define GST_IS_BASIC_SCHEDULER(obj) \
+  (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_BASIC_SCHEDULER))
+#define GST_IS_BASIC_SCHEDULER_CLASS(obj) \
+  (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_BASIC_SCHEDULER))
+
+#define GST_BASIC_SCHEDULER_CAST(sched)        ((GstBasicScheduler *)(sched))
+
+struct _GstBasicScheduler {
+  GstScheduler parent;
+
+  GList *elements;
+  gint num_elements;
+
+  GList *chains;
+  gint num_chains;
+};
+
+struct _GstBasicSchedulerClass {
+  GstSchedulerClass parent_class;
+};
+
 static GType _gst_basic_scheduler_type = 0;
 
-static void    gst_basic_scheduler_class_init          (GstSchedulerClass * klass);
-static void    gst_basic_scheduler_init                (GstScheduler * scheduler);
+static void    gst_basic_scheduler_class_init          (GstBasicSchedulerClass * klass);
+static void    gst_basic_scheduler_init                (GstBasicScheduler * scheduler);
 
 static void    gst_basic_scheduler_dispose             (GObject *object);
 
@@ -50,9 +87,13 @@ static void  gst_basic_scheduler_setup               (GstScheduler *sched);
 static void    gst_basic_scheduler_reset               (GstScheduler *sched);
 static void    gst_basic_scheduler_add_element         (GstScheduler *sched, GstElement *element);
 static void     gst_basic_scheduler_remove_element     (GstScheduler *sched, GstElement *element);
-static void     gst_basic_scheduler_state_transition   (GstScheduler *sched, GstElement *element, gint transition);
+static GstElementStateReturn  
+               gst_basic_scheduler_state_transition    (GstScheduler *sched, GstElement *element, gint transition);
 static void    gst_basic_scheduler_lock_element        (GstScheduler *sched, GstElement *element);
 static void    gst_basic_scheduler_unlock_element      (GstScheduler *sched, GstElement *element);
+static void    gst_basic_scheduler_yield               (GstScheduler *sched, GstElement *element);
+static void    gst_basic_scheduler_interrupt           (GstScheduler *sched, GstElement *element);
+static void    gst_basic_scheduler_error               (GstScheduler *sched, GstElement *element);
 static void     gst_basic_scheduler_pad_connect                (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad);
 static void     gst_basic_scheduler_pad_disconnect     (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad);
 static GstPad*  gst_basic_scheduler_pad_select                 (GstScheduler *sched, GList *padlist);
@@ -67,13 +108,13 @@ gst_basic_scheduler_get_type (void)
 {
   if (!_gst_basic_scheduler_type) {
     static const GTypeInfo scheduler_info = {
-      sizeof (GstSchedulerClass),
+      sizeof (GstBasicSchedulerClass),
       NULL,
       NULL,
       (GClassInitFunc) gst_basic_scheduler_class_init,
       NULL,
       NULL,
-      sizeof (GstScheduler),
+      sizeof (GstBasicScheduler),
       0,
       (GInstanceInitFunc) gst_basic_scheduler_init,
       NULL
@@ -85,40 +126,48 @@ gst_basic_scheduler_get_type (void)
 }
 
 static void
-gst_basic_scheduler_class_init (GstSchedulerClass * klass)
+gst_basic_scheduler_class_init (GstBasicSchedulerClass * klass)
 {
   GObjectClass *gobject_class;
   GstObjectClass *gstobject_class;
+  GstSchedulerClass *gstscheduler_class;
 
   gobject_class = (GObjectClass*)klass;
   gstobject_class = (GstObjectClass*)klass;
+  gstscheduler_class = (GstSchedulerClass*)klass;
 
   parent_class = g_type_class_ref (GST_TYPE_SCHEDULER);
 
   gobject_class->dispose       = GST_DEBUG_FUNCPTR (gst_basic_scheduler_dispose);
 
-  klass->setup                         = GST_DEBUG_FUNCPTR (gst_basic_scheduler_setup);
-  klass->reset                 = GST_DEBUG_FUNCPTR (gst_basic_scheduler_reset);
-  klass->add_element           = GST_DEBUG_FUNCPTR (gst_basic_scheduler_add_element);
-  klass->remove_element        = GST_DEBUG_FUNCPTR (gst_basic_scheduler_remove_element);
-  klass->state_transition      = GST_DEBUG_FUNCPTR (gst_basic_scheduler_state_transition);
-  klass->lock_element          = GST_DEBUG_FUNCPTR (gst_basic_scheduler_lock_element);
-  klass->unlock_element        = GST_DEBUG_FUNCPTR (gst_basic_scheduler_unlock_element);
-  klass->pad_connect           = GST_DEBUG_FUNCPTR (gst_basic_scheduler_pad_connect);
-  klass->pad_disconnect        = GST_DEBUG_FUNCPTR (gst_basic_scheduler_pad_disconnect);
-  klass->pad_select            = GST_DEBUG_FUNCPTR (gst_basic_scheduler_pad_select);
-  klass->iterate               = GST_DEBUG_FUNCPTR (gst_basic_scheduler_iterate);
+  gstscheduler_class->setup            = GST_DEBUG_FUNCPTR (gst_basic_scheduler_setup);
+  gstscheduler_class->reset            = GST_DEBUG_FUNCPTR (gst_basic_scheduler_reset);
+  gstscheduler_class->add_element      = GST_DEBUG_FUNCPTR (gst_basic_scheduler_add_element);
+  gstscheduler_class->remove_element   = GST_DEBUG_FUNCPTR (gst_basic_scheduler_remove_element);
+  gstscheduler_class->state_transition         = GST_DEBUG_FUNCPTR (gst_basic_scheduler_state_transition);
+  gstscheduler_class->lock_element     = GST_DEBUG_FUNCPTR (gst_basic_scheduler_lock_element);
+  gstscheduler_class->unlock_element   = GST_DEBUG_FUNCPTR (gst_basic_scheduler_unlock_element);
+  gstscheduler_class->yield            = GST_DEBUG_FUNCPTR (gst_basic_scheduler_yield);
+  gstscheduler_class->interrupt        = GST_DEBUG_FUNCPTR (gst_basic_scheduler_interrupt);
+  gstscheduler_class->error            = GST_DEBUG_FUNCPTR (gst_basic_scheduler_error);
+  gstscheduler_class->pad_connect      = GST_DEBUG_FUNCPTR (gst_basic_scheduler_pad_connect);
+  gstscheduler_class->pad_disconnect   = GST_DEBUG_FUNCPTR (gst_basic_scheduler_pad_disconnect);
+  gstscheduler_class->pad_select       = GST_DEBUG_FUNCPTR (gst_basic_scheduler_pad_select);
+  gstscheduler_class->iterate          = GST_DEBUG_FUNCPTR (gst_basic_scheduler_iterate);
 }
 
 static void
-gst_basic_scheduler_init (GstScheduler *scheduler)
+gst_basic_scheduler_init (GstBasicScheduler *scheduler)
 {
+  scheduler->elements = NULL;
+  scheduler->num_elements = 0;
+  scheduler->chains = NULL;
+  scheduler->num_chains = 0;
 }
 
 static void
 gst_basic_scheduler_dispose (GObject *object)
 {
-  
   G_OBJECT_CLASS (parent_class)->dispose (object);
 }
 
@@ -152,7 +201,7 @@ GstPluginDesc plugin_desc = {
 static int
 gst_basic_scheduler_loopfunc_wrapper (int argc, char *argv[])
 {
-  GstElement *element = GST_ELEMENT (argv);
+  GstElement *element = GST_ELEMENT_CAST (argv);
   G_GNUC_UNUSED const gchar *name = GST_ELEMENT_NAME (element);
 
   GST_DEBUG_ENTER ("(%d,'%s')", argc, name);
@@ -162,6 +211,7 @@ gst_basic_scheduler_loopfunc_wrapper (int argc, char *argv[])
               GST_DEBUG_FUNCPTR_NAME (element->loopfunc), name);
     (element->loopfunc) (element);
     GST_DEBUG (GST_CAT_DATAFLOW, "element %s ended loop function\n", name);
+
   } while (!GST_ELEMENT_IS_COTHREAD_STOPPING (element));
   GST_FLAG_UNSET (element, GST_ELEMENT_COTHREAD_STOPPING);
 
@@ -172,7 +222,7 @@ gst_basic_scheduler_loopfunc_wrapper (int argc, char *argv[])
 static int
 gst_basic_scheduler_chain_wrapper (int argc, char *argv[])
 {
-  GstElement *element = GST_ELEMENT (argv);
+  GstElement *element = GST_ELEMENT_CAST (argv);
   G_GNUC_UNUSED const gchar *name = GST_ELEMENT_NAME (element);
 
   GST_DEBUG_ENTER ("(\"%s\")", name);
@@ -189,7 +239,9 @@ gst_basic_scheduler_chain_wrapper (int argc, char *argv[])
       pads = g_list_next (pads);
       if (!GST_IS_REAL_PAD (pad))
        continue;
-      realpad = GST_REAL_PAD (pad);
+
+      realpad = GST_REAL_PAD_CAST (pad);
+
       if (GST_RPAD_DIRECTION (realpad) == GST_PAD_SINK) {
        GstBuffer *buf;
 
@@ -204,10 +256,14 @@ gst_basic_scheduler_chain_wrapper (int argc, char *argv[])
            GST_DEBUG (GST_CAT_DATAFLOW, "calling chain function of %s:%s\n", name,
                       GST_PAD_NAME (pad));
            GST_RPAD_CHAINFUNC (realpad) (pad, buf);
+           GST_DEBUG (GST_CAT_DATAFLOW, "calling chain function of %s:%s done\n", name,
+                      GST_PAD_NAME (pad));
          }
        }
-       GST_DEBUG (GST_CAT_DATAFLOW, "calling chain function of %s:%s done\n", name,
-                  GST_PAD_NAME (pad));
+       else {
+          gst_element_error (element, "NULL buffer detected. Is \"%s:%s\" connected?",
+                         name, GST_PAD_NAME (pad), NULL);
+       }
       }
     }
   } while (!GST_ELEMENT_IS_COTHREAD_STOPPING (element));
@@ -220,7 +276,7 @@ gst_basic_scheduler_chain_wrapper (int argc, char *argv[])
 static int
 gst_basic_scheduler_src_wrapper (int argc, char *argv[])
 {
-  GstElement *element = GST_ELEMENT (argv);
+  GstElement *element = GST_ELEMENT_CAST (argv);
   GList *pads;
   GstRealPad *realpad;
   GstBuffer *buf = NULL;
@@ -231,9 +287,12 @@ gst_basic_scheduler_src_wrapper (int argc, char *argv[])
   do {
     pads = element->pads;
     while (pads) {
+
       if (!GST_IS_REAL_PAD (pads->data))
        continue;
-      realpad = (GstRealPad *) (pads->data);
+
+      realpad = GST_REAL_PAD_CAST (pads->data);
+
       pads = g_list_next (pads);
       if (GST_RPAD_DIRECTION (realpad) == GST_PAD_SRC) {
        GST_DEBUG (GST_CAT_DATAFLOW, "calling _getfunc for %s:%s\n", GST_DEBUG_PAD_NAME (realpad));
@@ -243,7 +302,7 @@ gst_basic_scheduler_src_wrapper (int argc, char *argv[])
 /*            fprintf(stderr,"error, no getregionfunc in \"%s\"\n", name); */
 /*          else                                                          */
          buf =
-           (GST_RPAD_GETREGIONFUNC (realpad)) ((GstPad *) realpad, realpad->regiontype,
+           (GST_RPAD_GETREGIONFUNC (realpad)) (GST_PAD_CAST (realpad), realpad->regiontype,
                                                realpad->offset, realpad->len);
          realpad->regiontype = GST_REGION_VOID;
        }
@@ -252,12 +311,12 @@ gst_basic_scheduler_src_wrapper (int argc, char *argv[])
 /*          if (GST_RPAD_GETFUNC(realpad) == NULL)                     */
 /*            fprintf(stderr,"error, no getfunc in \"%s\"\n", name);   */
 /*          else                                                       */
-         buf = GST_RPAD_GETFUNC (realpad) ((GstPad *) realpad);
+         buf = GST_RPAD_GETFUNC (realpad) (GST_PAD_CAST (realpad));
        }
 
        GST_DEBUG (GST_CAT_DATAFLOW, "calling gst_pad_push on pad %s:%s\n",
                   GST_DEBUG_PAD_NAME (realpad));
-       gst_pad_push ((GstPad *) realpad, buf);
+       gst_pad_push (GST_PAD_CAST (realpad), buf);
       }
     }
   } while (!GST_ELEMENT_IS_COTHREAD_STOPPING (element));
@@ -281,8 +340,8 @@ gst_basic_scheduler_chainhandler_proxy (GstPad * pad, GstBuffer * buf)
    */
   while (GST_RPAD_BUFPEN (pad) != NULL) {
     GST_DEBUG (GST_CAT_DATAFLOW, "switching to %p to empty bufpen\n",
-              GST_ELEMENT (GST_PAD_PARENT (pad))->threadstate);
-    cothread_switch (GST_ELEMENT (GST_PAD_PARENT (pad))->threadstate);
+              GST_ELEMENT_THREADSTATE (GST_PAD_PARENT (pad)));
+    cothread_switch (GST_ELEMENT_THREADSTATE (GST_PAD_PARENT (pad)));
 
     /* we may no longer be the same pad, check. */
     if (GST_RPAD_PEER (peer) != (GstRealPad *) pad) {
@@ -295,8 +354,8 @@ gst_basic_scheduler_chainhandler_proxy (GstPad * pad, GstBuffer * buf)
   /* now fill the bufferpen and switch so it can be consumed */
   GST_RPAD_BUFPEN (GST_RPAD_PEER (pad)) = buf;
   GST_DEBUG (GST_CAT_DATAFLOW, "switching to %p\n",
-            GST_ELEMENT (GST_PAD_PARENT (pad))->threadstate);
-  cothread_switch (GST_ELEMENT (GST_PAD_PARENT (pad))->threadstate);
+            GST_ELEMENT_THREADSTATE (GST_PAD_PARENT (pad)));
+  cothread_switch (GST_ELEMENT_THREADSTATE (GST_PAD_PARENT (pad)));
 
   GST_DEBUG (GST_CAT_DATAFLOW, "done switching\n");
 }
@@ -314,12 +373,12 @@ gst_basic_scheduler_select_proxy (GstPad * pad, GstBuffer * buf)
   /* now fill the bufferpen and switch so it can be consumed */
   GST_RPAD_BUFPEN (GST_RPAD_PEER (pad)) = buf;
   GST_DEBUG (GST_CAT_DATAFLOW, "switching to %p\n",
-            GST_ELEMENT (GST_PAD_PARENT (pad))->threadstate);
+            GST_ELEMENT_THREADSTATE (GST_PAD_PARENT (pad)));
   g_print ("%p %s\n", GST_ELEMENT (GST_PAD_PARENT (pad)),
           gst_element_get_name (GST_ELEMENT (GST_PAD_PARENT (pad))));
   GST_ELEMENT (GST_PAD_PARENT (pad))->select_pad = pad;
   GST_FLAG_UNSET (GST_PAD_PARENT (pad), GST_ELEMENT_COTHREAD_STOPPING);
-  cothread_switch (GST_ELEMENT (GST_PAD_PARENT (pad))->threadstate);
+  cothread_switch (GST_ELEMENT_THREADSTATE (GST_PAD_PARENT (pad)));
 
   g_print ("done switching\n");
   GST_DEBUG (GST_CAT_DATAFLOW, "done switching\n");
@@ -339,8 +398,8 @@ gst_basic_scheduler_gethandler_proxy (GstPad * pad)
   while (GST_RPAD_BUFPEN (pad) == NULL) {
     GST_DEBUG (GST_CAT_DATAFLOW, "switching to \"%s\": %p to fill bufpen\n",
               GST_ELEMENT_NAME (GST_ELEMENT (GST_PAD_PARENT (pad))),
-              GST_ELEMENT (GST_PAD_PARENT (pad))->threadstate);
-    cothread_switch (GST_ELEMENT (GST_PAD_PARENT (pad))->threadstate);
+              GST_ELEMENT_THREADSTATE (GST_PAD_PARENT (pad)));
+    cothread_switch (GST_ELEMENT_THREADSTATE (GST_PAD_PARENT (pad)));
 
     /* we may no longer be the same pad, check. */
     if (GST_RPAD_PEER (peer) != (GstRealPad *) pad) {
@@ -374,8 +433,8 @@ gst_basic_scheduler_pullregionfunc_proxy (GstPad * pad, GstRegionType type, guin
   /* we will loop switching to the peer until it's filled up the bufferpen */
   while (GST_RPAD_BUFPEN (pad) == NULL) {
     GST_DEBUG (GST_CAT_DATAFLOW, "switching to %p to fill bufpen\n",
-              GST_ELEMENT (GST_PAD_PARENT (pad))->threadstate);
-    cothread_switch (GST_ELEMENT (GST_PAD_PARENT (pad))->threadstate);
+              GST_ELEMENT_THREADSTATE (GST_PAD_PARENT (pad)));
+    cothread_switch (GST_ELEMENT_THREADSTATE (GST_PAD_PARENT (pad)));
 
     /* we may no longer be the same pad, check. */
     if (GST_RPAD_PEER (peer) != (GstRealPad *) pad) {
@@ -392,7 +451,7 @@ gst_basic_scheduler_pullregionfunc_proxy (GstPad * pad, GstRegionType type, guin
 }
 
 
-static void
+static gboolean
 gst_basic_scheduler_cothreaded_chain (GstBin * bin, GstSchedulerChain * chain)
 {
   GList *elements;
@@ -403,15 +462,19 @@ gst_basic_scheduler_cothreaded_chain (GstBin * bin, GstSchedulerChain * chain)
 
   GST_DEBUG (GST_CAT_SCHEDULING, "chain is using COTHREADS\n");
 
-  g_assert (bin->threadcontext != NULL);
-
+  g_assert (GST_BIN_THREADCONTEXT (bin) != NULL);
 
   /* walk through all the chain's elements */
   elements = chain->elements;
   while (elements) {
-    element = GST_ELEMENT (elements->data);
+    gboolean decoupled;
+    gint same_sched = 0;
+
+    element = GST_ELEMENT_CAST (elements->data);
     elements = g_list_next (elements);
 
+    decoupled = (GST_FLAG_IS_SET (element, GST_ELEMENT_DECOUPLED) ? TRUE :  FALSE);
+
     /* start out without a wrapper function, we select it later */
     wrapper_function = NULL;
 
@@ -423,7 +486,7 @@ gst_basic_scheduler_cothreaded_chain (GstBin * bin, GstSchedulerChain * chain)
     else {
       /* otherwise we need to decide what kind of cothread */
       /* if it's not DECOUPLED, we decide based on whether it's a source or not */
-      if (!GST_FLAG_IS_SET (element, GST_ELEMENT_DECOUPLED)) {
+      if (!decoupled) {
        /* if it doesn't have any sinks, it must be a source (duh) */
        if (element->numsinkpads == 0) {
          wrapper_function = GST_DEBUG_FUNCPTR (gst_basic_scheduler_src_wrapper);
@@ -441,16 +504,28 @@ gst_basic_scheduler_cothreaded_chain (GstBin * bin, GstSchedulerChain * chain)
     /* now we have to walk through the pads to set up their state */
     pads = gst_element_get_pad_list (element);
     while (pads) {
+      GstPad *peerpad;
+
       pad = GST_PAD (pads->data);
       pads = g_list_next (pads);
+
       if (!GST_IS_REAL_PAD (pad))
        continue;
+      
+      peerpad = GST_PAD (GST_RPAD_PEER (pad));
 
       /* if the element is DECOUPLED or outside the manager, we have to chain */
       if ((wrapper_function == NULL) ||
-         (GST_RPAD_PEER (pad) &&
-          (GST_ELEMENT (GST_PAD_PARENT (GST_PAD (GST_RPAD_PEER (pad))))->sched != chain->sched))
-       ) {
+         (peerpad && (GST_ELEMENT_CAST (GST_PAD_PARENT (peerpad))->sched != GST_SCHEDULER (chain->sched)))) {
+
+       if (!decoupled && GST_RPAD_PEER (pad) && 
+           !GST_FLAG_IS_SET (GST_PAD_PARENT (peerpad), GST_ELEMENT_DECOUPLED)) {
+          /* whoa non decoupled with different schedulers */
+          gst_element_error (element, "element \"%s\" is not decoupled but has pads in different schedulers",
+                         GST_ELEMENT_NAME (element), NULL);
+         return FALSE;
+       }
+       
        /* set the chain proxies */
        if (GST_RPAD_DIRECTION (pad) == GST_PAD_SINK) {
          GST_DEBUG (GST_CAT_SCHEDULING, "copying chain function into push proxy for %s:%s\n",
@@ -464,17 +539,17 @@ gst_basic_scheduler_cothreaded_chain (GstBin * bin, GstSchedulerChain * chain)
          GST_RPAD_PULLREGIONFUNC (pad) = GST_RPAD_GETREGIONFUNC (pad);
        }
 
-       /* otherwise we really are a cothread */
       }
+      /* otherwise we really are a cothread */
       else {
        if (gst_pad_get_direction (pad) == GST_PAD_SINK) {
          GST_DEBUG (GST_CAT_SCHEDULING, "setting cothreaded push proxy for sinkpad %s:%s\n",
-                    GST_DEBUG_PAD_NAME (pad));
+            GST_DEBUG_PAD_NAME (pad));
          GST_RPAD_CHAINHANDLER (pad) = GST_DEBUG_FUNCPTR (gst_basic_scheduler_chainhandler_proxy);
        }
        else {
          GST_DEBUG (GST_CAT_SCHEDULING, "setting cothreaded pull proxy for srcpad %s:%s\n",
-                    GST_DEBUG_PAD_NAME (pad));
+            GST_DEBUG_PAD_NAME (pad));
          GST_RPAD_GETHANDLER (pad) = GST_DEBUG_FUNCPTR (gst_basic_scheduler_gethandler_proxy);
          GST_RPAD_PULLREGIONFUNC (pad) = GST_DEBUG_FUNCPTR (gst_basic_scheduler_pullregionfunc_proxy);
        }
@@ -483,17 +558,24 @@ gst_basic_scheduler_cothreaded_chain (GstBin * bin, GstSchedulerChain * chain)
 
     /* need to set up the cothread now */
     if (wrapper_function != NULL) {
-      if (element->threadstate == NULL) {
-       /* FIXME handle cothread_create returning NULL */
-       element->threadstate = cothread_create (bin->threadcontext);
-       GST_DEBUG (GST_CAT_SCHEDULING, "created cothread %p for '%s'\n", element->threadstate,
+      if (GST_ELEMENT_THREADSTATE (element) == NULL) {
+       GST_ELEMENT_THREADSTATE (element) = cothread_create (GST_BIN_THREADCONTEXT (bin));
+       if (GST_ELEMENT_THREADSTATE (element) == NULL) {
+          gst_element_error (element, "could not create cothread for \"%s\"", 
+                         GST_ELEMENT_NAME (element), NULL);
+         return FALSE;
+       }
+       GST_DEBUG (GST_CAT_SCHEDULING, "created cothread %p for '%s'\n", 
+                  GST_ELEMENT_THREADSTATE (element),
                   GST_ELEMENT_NAME (element));
       }
-      cothread_setfunc (element->threadstate, wrapper_function, 0, (char **) element);
+      cothread_setfunc (GST_ELEMENT_THREADSTATE (element), wrapper_function, 0, (char **) element);
       GST_DEBUG (GST_CAT_SCHEDULING, "set wrapper function for '%s' to &%s\n",
                 GST_ELEMENT_NAME (element), GST_DEBUG_FUNCPTR_NAME (wrapper_function));
     }
   }
+
+  return TRUE;
 }
 
 /*
@@ -533,7 +615,7 @@ gst_basic_scheduler_chained_chain (GstBin *bin, _GstBinChain *chain) {
 
 
 static GstSchedulerChain *
-gst_basic_scheduler_chain_new (GstScheduler * sched)
+gst_basic_scheduler_chain_new (GstBasicScheduler * sched)
 {
   GstSchedulerChain *chain = g_new (GstSchedulerChain, 1);
 
@@ -559,7 +641,7 @@ gst_basic_scheduler_chain_new (GstScheduler * sched)
 static void
 gst_basic_scheduler_chain_destroy (GstSchedulerChain * chain)
 {
-  GstScheduler *sched = chain->sched;
+  GstBasicScheduler *sched = chain->sched;
 
   /* remove the chain from the schedulers' list of chains */
   sched->chains = g_list_remove (sched->chains, chain);
@@ -582,14 +664,14 @@ gst_basic_scheduler_chain_add_element (GstSchedulerChain * chain, GstElement * e
            chain);
 
   /* set the sched pointer for the element */
-  element->sched = chain->sched;
+  element->sched = GST_SCHEDULER (chain->sched);
 
   /* add the element to the list of 'disabled' elements */
   chain->disabled = g_list_prepend (chain->disabled, element);
   chain->num_elements++;
 }
 
-static void
+static gboolean
 gst_basic_scheduler_chain_enable_element (GstSchedulerChain * chain, GstElement * element)
 {
   GST_INFO (GST_CAT_SCHEDULING, "enabling element \"%s\" in chain %p", GST_ELEMENT_NAME (element),
@@ -602,7 +684,7 @@ gst_basic_scheduler_chain_enable_element (GstSchedulerChain * chain, GstElement
   chain->elements = g_list_prepend (chain->elements, element);
 
   /* reschedule the chain */
-  gst_basic_scheduler_cothreaded_chain (GST_BIN (chain->sched->parent), chain);
+  return gst_basic_scheduler_cothreaded_chain (GST_BIN (GST_SCHEDULER (chain->sched)->parent), chain);
 }
 
 static void
@@ -633,9 +715,9 @@ gst_basic_scheduler_chain_remove_element (GstSchedulerChain * chain, GstElement
     gst_basic_scheduler_chain_disable_element (chain, element);
   }
   /* we have to check for a threadstate here because a queue doesn't have one */
-  if (element->threadstate) {
-    cothread_free (element->threadstate);
-    element->threadstate = NULL;
+  if (GST_ELEMENT_THREADSTATE (element)) {
+    cothread_free (GST_ELEMENT_THREADSTATE (element));
+    GST_ELEMENT_THREADSTATE (element) = NULL;
   }
 
   /* remove the element from the list of elements */
@@ -645,13 +727,10 @@ gst_basic_scheduler_chain_remove_element (GstSchedulerChain * chain, GstElement
   /* if there are no more elements in the chain, destroy the chain */
   if (chain->num_elements == 0)
     gst_basic_scheduler_chain_destroy (chain);
-
-  /* unset the sched pointer for the element */
-  element->sched = NULL;
 }
 
 static void
-gst_basic_scheduler_chain_elements (GstScheduler * sched, GstElement * element1, GstElement * element2)
+gst_basic_scheduler_chain_elements (GstBasicScheduler * sched, GstElement * element1, GstElement * element2)
 {
   GList *chains;
   GstSchedulerChain *chain;
@@ -722,7 +801,7 @@ gst_basic_scheduler_chain_elements (GstScheduler * sched, GstElement * element1,
 
 /* find the chain within the scheduler that holds the element, if any */
 static GstSchedulerChain *
-gst_basic_scheduler_find_chain (GstScheduler * sched, GstElement * element)
+gst_basic_scheduler_find_chain (GstBasicScheduler * sched, GstElement * element)
 {
   GList *chains;
   GstSchedulerChain *chain;
@@ -788,9 +867,9 @@ gst_basic_scheduler_setup (GstScheduler *sched)
   GstBin *bin = GST_BIN (sched->parent);
 
   /* first create thread context */
-  if (bin->threadcontext == NULL) {
+  if (GST_BIN_THREADCONTEXT (bin) == NULL) {
     GST_DEBUG (GST_CAT_SCHEDULING, "initializing cothread context\n");
-    bin->threadcontext = cothread_context_init ();
+    GST_BIN_THREADCONTEXT (bin) = cothread_context_init ();
   }
 }
 
@@ -799,18 +878,18 @@ gst_basic_scheduler_reset (GstScheduler *sched)
 {
   cothread_context *ctx;
   GstBin *bin = GST_BIN (GST_SCHED_PARENT (sched));
-  GList *elements = sched->elements;
+  GList *elements = GST_BASIC_SCHEDULER_CAST (sched)->elements;
 
   while (elements) {
-    GST_ELEMENT (elements->data)->threadstate = NULL;
+    GST_ELEMENT_THREADSTATE (elements->data) = NULL;
     elements = g_list_next (elements);
   }
   
-  ctx = GST_BIN (GST_SCHED_PARENT (sched))->threadcontext;
+  ctx = GST_BIN_THREADCONTEXT (GST_SCHED_PARENT (sched));
 
   cothread_context_free (ctx);
   
-  GST_BIN (GST_SCHED_PARENT (sched))->threadcontext = NULL;
+  GST_BIN_THREADCONTEXT (GST_SCHED_PARENT (sched)) = NULL;
 }
 
 static void
@@ -820,9 +899,7 @@ gst_basic_scheduler_add_element (GstScheduler * sched, GstElement * element)
   GstPad *pad;
   GstElement *peerelement;
   GstSchedulerChain *chain;
-
-  g_return_if_fail (element != NULL);
-  g_return_if_fail (GST_IS_ELEMENT (element));
+  GstBasicScheduler *bsched = GST_BASIC_SCHEDULER (sched);
 
   /* if it's already in this scheduler, don't bother doing anything */
   if (GST_ELEMENT_SCHED (element) == sched)
@@ -844,11 +921,11 @@ gst_basic_scheduler_add_element (GstScheduler * sched, GstElement * element)
     return;
 
   /* first add it to the list of elements that are to be scheduled */
-  sched->elements = g_list_prepend (sched->elements, element);
-  sched->num_elements++;
+  bsched->elements = g_list_prepend (bsched->elements, element);
+  bsched->num_elements++;
 
   /* create a chain to hold it, and add */
-  chain = gst_basic_scheduler_chain_new (sched);
+  chain = gst_basic_scheduler_chain_new (bsched);
   gst_basic_scheduler_chain_add_element (chain, element);
 
   /* set the sched pointer in all the pads */
@@ -870,7 +947,7 @@ gst_basic_scheduler_add_element (GstScheduler * sched, GstElement * element)
       if (GST_ELEMENT_SCHED (element) == GST_ELEMENT_SCHED (peerelement)) {
        GST_INFO (GST_CAT_SCHEDULING, "peer is in same scheduler, chaining together");
        /* make sure that the two elements are in the same chain */
-       gst_basic_scheduler_chain_elements (sched, element, peerelement);
+       gst_basic_scheduler_chain_elements (bsched, element, peerelement);
       }
     }
   }
@@ -880,67 +957,104 @@ static void
 gst_basic_scheduler_remove_element (GstScheduler * sched, GstElement * element)
 {
   GstSchedulerChain *chain;
+  GstBasicScheduler *bsched = GST_BASIC_SCHEDULER (sched);
 
-  g_return_if_fail (element != NULL);
-  g_return_if_fail (GST_IS_ELEMENT (element));
-
-  if (g_list_find (sched->elements, element)) {
+  if (g_list_find (bsched->elements, element)) {
     GST_INFO (GST_CAT_SCHEDULING, "removing element \"%s\" from scheduler",
              GST_ELEMENT_NAME (element));
 
     /* find what chain the element is in */
-    chain = gst_basic_scheduler_find_chain (sched, element);
+    chain = gst_basic_scheduler_find_chain (bsched, element);
 
     /* remove it from its chain */
     gst_basic_scheduler_chain_remove_element (chain, element);
 
     /* remove it from the list of elements */
-    sched->elements = g_list_remove (sched->elements, element);
-    sched->num_elements--;
+    bsched->elements = g_list_remove (bsched->elements, element);
+    bsched->num_elements--;
 
     /* unset the scheduler pointer in the element */
     GST_ELEMENT_SCHED (element) = NULL;
   }
 }
 
-static void
+static GstElementStateReturn
 gst_basic_scheduler_state_transition (GstScheduler *sched, GstElement *element, gint transition)
 {
   GstSchedulerChain *chain;
+  GstBasicScheduler *bsched = GST_BASIC_SCHEDULER (sched);
 
   /* find the chain the element is in */
-  chain = gst_basic_scheduler_find_chain (sched, element);
+  chain = gst_basic_scheduler_find_chain (bsched, element);
 
   /* remove it from the chain */
   if (chain) {
     if (transition == GST_STATE_PLAYING_TO_PAUSED) 
       gst_basic_scheduler_chain_disable_element (chain, element);
     if (transition == GST_STATE_PAUSED_TO_PLAYING) 
-      gst_basic_scheduler_chain_enable_element (chain, element);
+      if (!gst_basic_scheduler_chain_enable_element (chain, element)) {
+        GST_INFO (GST_CAT_SCHEDULING, "could not enable element \"%s\"", GST_ELEMENT_NAME (element));
+        return GST_STATE_FAILURE;
+      }
   }
   else {
     GST_INFO (GST_CAT_SCHEDULING, "element \"%s\" not found in any chain, no state change", GST_ELEMENT_NAME (element));
   }
+
+  return GST_STATE_SUCCESS;
 }
 
 static void
 gst_basic_scheduler_lock_element (GstScheduler * sched, GstElement * element)
 {
-  if (element->threadstate)
-    cothread_lock (element->threadstate);
+  if (GST_ELEMENT_THREADSTATE (element))
+    cothread_lock (GST_ELEMENT_THREADSTATE (element));
 }
 
 static void
 gst_basic_scheduler_unlock_element (GstScheduler * sched, GstElement * element)
 {
-  if (element->threadstate)
-    cothread_unlock (element->threadstate);
+  if (GST_ELEMENT_THREADSTATE (element))
+    cothread_unlock (GST_ELEMENT_THREADSTATE (element));
+}
+
+static void
+gst_basic_scheduler_yield (GstScheduler *sched, GstElement *element)
+{
+  if (GST_ELEMENT_IS_COTHREAD_STOPPING (element)) {
+    cothread_switch (cothread_current_main ());
+  }
+}
+
+static void
+gst_basic_scheduler_interrupt (GstScheduler *sched, GstElement *element)
+{
+  cothread_switch (cothread_current_main ());
 }
 
 static void
-gst_basic_scheduler_pad_connect (GstScheduler * sched, GstPad * srcpad, GstPad * sinkpad)
+gst_basic_scheduler_error (GstScheduler *sched, GstElement *element)
+{
+  GstBasicScheduler *bsched = GST_BASIC_SCHEDULER (sched);
+
+  if (GST_ELEMENT_THREADSTATE (element)) {
+    GstSchedulerChain *chain;
+    
+    chain = gst_basic_scheduler_find_chain (bsched, element);
+    if (chain)
+      gst_basic_scheduler_chain_disable_element (chain, element);
+
+    GST_STATE_PENDING (GST_SCHEDULER (sched)->parent) = GST_STATE_PAUSED;
+
+    cothread_switch (cothread_current_main ());
+  }
+}
+
+static void
+gst_basic_scheduler_pad_connect (GstScheduler * sched, GstPad *srcpad, GstPad *sinkpad)
 {
   GstElement *srcelement, *sinkelement;
+  GstBasicScheduler *bsched = GST_BASIC_SCHEDULER (sched);
 
   srcelement = GST_PAD_PARENT (srcpad);
   g_return_if_fail (srcelement != NULL);
@@ -955,7 +1069,7 @@ gst_basic_scheduler_pad_connect (GstScheduler * sched, GstPad * srcpad, GstPad *
   if (GST_ELEMENT_SCHED (srcelement) == GST_ELEMENT_SCHED (sinkelement)) {
     GST_INFO (GST_CAT_SCHEDULING, "peer %s:%s is in same scheduler, chaining together",
              GST_DEBUG_PAD_NAME (sinkpad));
-    gst_basic_scheduler_chain_elements (sched, srcelement, sinkelement);
+    gst_basic_scheduler_chain_elements (bsched, srcelement, sinkelement);
   }
 }
 
@@ -965,21 +1079,22 @@ gst_basic_scheduler_pad_disconnect (GstScheduler * sched, GstPad * srcpad, GstPa
   GstSchedulerChain *chain;
   GstElement *element1, *element2;
   GstSchedulerChain *chain1, *chain2;
+  GstBasicScheduler *bsched = GST_BASIC_SCHEDULER (sched);
 
   GST_INFO (GST_CAT_SCHEDULING, "disconnecting pads %s:%s and %s:%s",
            GST_DEBUG_PAD_NAME (srcpad), GST_DEBUG_PAD_NAME (sinkpad));
 
   /* we need to have the parent elements of each pad */
-  element1 = GST_ELEMENT (GST_PAD_PARENT (srcpad));
-  element2 = GST_ELEMENT (GST_PAD_PARENT (sinkpad));
+  element1 = GST_ELEMENT_CAST (GST_PAD_PARENT (srcpad));
+  element2 = GST_ELEMENT_CAST (GST_PAD_PARENT (sinkpad));
 
   /* first task is to remove the old chain they belonged to.
    * this can be accomplished by taking either of the elements,
    * since they are guaranteed to be in the same chain
    * FIXME is it potentially better to make an attempt at splitting cleaner??
    */
-  chain1 = gst_basic_scheduler_find_chain (sched, element1);
-  chain2 = gst_basic_scheduler_find_chain (sched, element2);
+  chain1 = gst_basic_scheduler_find_chain (bsched, element1);
+  chain2 = gst_basic_scheduler_find_chain (bsched, element2);
 
   if (chain1 != chain2) {
     /* elements not in the same chain don't need to be separated */
@@ -992,14 +1107,14 @@ gst_basic_scheduler_pad_disconnect (GstScheduler * sched, GstPad * srcpad, GstPa
     gst_basic_scheduler_chain_destroy (chain1);
 
     /* now create a new chain to hold element1 and build it from scratch */
-    chain1 = gst_basic_scheduler_chain_new (sched);
+    chain1 = gst_basic_scheduler_chain_new (bsched);
     gst_basic_scheduler_chain_recursive_add (chain1, element1);
   }
 
   /* check the other element to see if it landed in the newly created chain */
-  if (gst_basic_scheduler_find_chain (sched, element2) == NULL) {
+  if (gst_basic_scheduler_find_chain (bsched, element2) == NULL) {
     /* if not in chain, create chain and build from scratch */
-    chain2 = gst_basic_scheduler_chain_new (sched);
+    chain2 = gst_basic_scheduler_chain_new (bsched);
     gst_basic_scheduler_chain_recursive_add (chain2, element2);
   }
 }
@@ -1034,7 +1149,7 @@ gst_basic_scheduler_pad_select (GstScheduler * sched, GList * padlist)
   if (pad != NULL) {
     GstRealPad *peer = GST_RPAD_PEER (pad);
 
-    cothread_switch (GST_ELEMENT (GST_PAD_PARENT (peer))->threadstate);
+    cothread_switch (GST_ELEMENT_THREADSTATE (GST_PAD_PARENT (peer)));
 
     g_print ("%p %s\n", GST_ELEMENT (GST_PAD_PARENT (pad)),
             gst_element_get_name (GST_ELEMENT (GST_PAD_PARENT (pad))));
@@ -1055,14 +1170,13 @@ gst_basic_scheduler_iterate (GstScheduler * sched)
   GstElement *entry;
   gboolean eos = FALSE;
   GList *elements;
+  gint scheduled = 0;
+  GstBasicScheduler *bsched = GST_BASIC_SCHEDULER (sched);
 
   GST_DEBUG_ENTER ("(\"%s\")", GST_ELEMENT_NAME (bin));
 
-  g_return_val_if_fail (bin != NULL, TRUE);
-  g_return_val_if_fail (GST_IS_BIN (bin), TRUE);
-
   /* step through all the chains */
-  chains = sched->chains;
+  chains = bsched->chains;
 
   if (chains == NULL)
     return FALSE;
@@ -1080,14 +1194,14 @@ gst_basic_scheduler_iterate (GstScheduler * sched)
       GST_DEBUG (GST_CAT_SCHEDULING, "there are %d elements in this chain\n", chain->num_elements);
       elements = chain->elements;
       while (elements) {
-       entry = GST_ELEMENT (elements->data);
+       entry = GST_ELEMENT_CAST (elements->data);
        elements = g_list_next (elements);
        if (GST_FLAG_IS_SET (entry, GST_ELEMENT_DECOUPLED)) {
          GST_DEBUG (GST_CAT_SCHEDULING, "entry \"%s\" is DECOUPLED, skipping\n",
                     GST_ELEMENT_NAME (entry));
          entry = NULL;
        }
-       else if (GST_FLAG_IS_SET (entry, GST_ELEMENT_NO_ENTRY)) {
+       else if (GST_FLAG_IS_SET (entry, GST_ELEMENT_INFINITE_LOOP)) {
          GST_DEBUG (GST_CAT_SCHEDULING, "entry \"%s\" is not valid, skipping\n",
                     GST_ELEMENT_NAME (entry));
          entry = NULL;
@@ -1099,7 +1213,13 @@ gst_basic_scheduler_iterate (GstScheduler * sched)
        GST_FLAG_SET (entry, GST_ELEMENT_COTHREAD_STOPPING);
        GST_DEBUG (GST_CAT_DATAFLOW, "set COTHREAD_STOPPING flag on \"%s\"(@%p)\n",
                   GST_ELEMENT_NAME (entry), entry);
-       cothread_switch (entry->threadstate);
+       if (GST_ELEMENT_THREADSTATE (entry)) {
+         cothread_switch (GST_ELEMENT_THREADSTATE (entry));
+       }
+       else {
+         GST_DEBUG (GST_CAT_DATAFLOW, "cothread switch not possible, element has no threadstate\n");
+         return FALSE;
+       }
 
        /* following is a check to see if the chain was interrupted due to a
         * top-half state_change().  (i.e., if there's a pending state.)
@@ -1113,16 +1233,18 @@ gst_basic_scheduler_iterate (GstScheduler * sched)
                     GST_STATE_PENDING (GST_SCHEDULER (sched)->parent));
          return FALSE;
        }
-
+       scheduled++;
       }
       else {
        GST_INFO (GST_CAT_DATAFLOW, "NO ENTRY INTO CHAIN!");
-        eos = TRUE;
+        if (scheduled == 0) 
+          eos = TRUE;
       }
     }
     else {
       GST_INFO (GST_CAT_DATAFLOW, "NO ENABLED ELEMENTS IN CHAIN!!");
-      eos = TRUE;
+      if (scheduled == 0) 
+        eos = TRUE;
     }
   }
 
@@ -1137,6 +1259,7 @@ gst_basic_scheduler_show (GstScheduler * sched)
   GList *chains, *elements;
   GstElement *element;
   GstSchedulerChain *chain;
+  GstBasicScheduler *bsched = GST_BASIC_SCHEDULER (sched);
 
   if (sched == NULL) {
     g_print ("scheduler doesn't exist for this element\n");
@@ -1147,8 +1270,8 @@ gst_basic_scheduler_show (GstScheduler * sched)
 
   g_print ("SCHEDULER DUMP FOR MANAGING BIN \"%s\"\n", GST_ELEMENT_NAME (sched->parent));
 
-  g_print ("scheduler has %d elements in it: ", sched->num_elements);
-  elements = sched->elements;
+  g_print ("scheduler has %d elements in it: ", bsched->num_elements);
+  elements = bsched->elements;
   while (elements) {
     element = GST_ELEMENT (elements->data);
     elements = g_list_next (elements);
@@ -1157,8 +1280,8 @@ gst_basic_scheduler_show (GstScheduler * sched)
   }
   g_print ("\n");
 
-  g_print ("scheduler has %d chains in it\n", sched->num_chains);
-  chains = sched->chains;
+  g_print ("scheduler has %d chains in it\n", bsched->num_chains);
+  chains = bsched->chains;
   while (chains) {
     chain = (GstSchedulerChain *) (chains->data);
     chains = g_list_next (chains);
index 4a6baf5..e6f2337 100644 (file)
@@ -272,46 +272,44 @@ gst_aggregator_loop (GstElement *element)
 
   aggregator = GST_AGGREGATOR (element);
 
-  do {
-    if (aggregator->sched == AGGREGATOR_LOOP ||
-        aggregator->sched == AGGREGATOR_LOOP_PEEK) {
-      GList *pads = aggregator->sinkpads;
-
-      while (pads) {
-       GstPad *pad = GST_PAD (pads->data);
-       pads = g_list_next (pads);
-
-        if (aggregator->sched == AGGREGATOR_LOOP_PEEK) {
-         buf = gst_pad_peek (pad);
-         if (buf == NULL)
-           continue;
-
-         g_assert (buf == gst_pad_pull (pad));
-         debug = "loop_peek";
-        }
-       else {
-         buf = gst_pad_pull (pad);
-         debug = "loop";
-       }
-       gst_aggregator_push (aggregator, pad, buf, debug);
-      }
-    }
-    else {
-      if (aggregator->sched == AGGREGATOR_LOOP_SELECT) {
-       GstPad *pad;
+  if (aggregator->sched == AGGREGATOR_LOOP ||
+      aggregator->sched == AGGREGATOR_LOOP_PEEK) {
+    GList *pads = aggregator->sinkpads;
 
-       debug = "loop_select";
+    while (pads) {
+      GstPad *pad = GST_PAD (pads->data);
+      pads = g_list_next (pads);
 
-       pad = gst_pad_select (aggregator->sinkpads);
-       buf = gst_pad_pull (pad);
+      if (aggregator->sched == AGGREGATOR_LOOP_PEEK) {
+       buf = gst_pad_peek (pad);
+       if (buf == NULL)
+         continue;
 
-       gst_aggregator_push (aggregator, pad, buf, debug);
+       g_assert (buf == gst_pad_pull (pad));
+       debug = "loop_peek";
       }
       else {
-       g_assert_not_reached ();
+       buf = gst_pad_pull (pad);
+       debug = "loop";
       }
+      gst_aggregator_push (aggregator, pad, buf, debug);
+    }
+  }
+  else {
+    if (aggregator->sched == AGGREGATOR_LOOP_SELECT) {
+      GstPad *pad;
+
+      debug = "loop_select";
+
+      pad = gst_pad_select (aggregator->sinkpads);
+      buf = gst_pad_pull (pad);
+
+      gst_aggregator_push (aggregator, pad, buf, debug);
     }
-  } while (!GST_ELEMENT_IS_COTHREAD_STOPPING (element));
+    else {
+      g_assert_not_reached ();
+    }
+  }
 }
 
 /**
index 7d8d6e3..1f87031 100644 (file)
@@ -387,7 +387,7 @@ gst_fakesrc_set_property (GObject *object, guint prop_id, const GValue *value, G
     case ARG_OUTPUT:
       break;
     case ARG_DATA:
-      src->data = g_value_get_int (value);
+      src->data = g_value_get_enum (value);
       switch (src->data) {
        case FAKESRC_DATA_ALLOCATE:
           if (src->parent) {
@@ -403,7 +403,7 @@ gst_fakesrc_set_property (GObject *object, guint prop_id, const GValue *value, G
       }
       break;
     case ARG_SIZETYPE:
-      src->sizetype = g_value_get_int (value);
+      src->sizetype = g_value_get_enum (value);
       break;
     case ARG_SIZEMIN:
       src->sizemin = g_value_get_int (value);
@@ -415,7 +415,7 @@ gst_fakesrc_set_property (GObject *object, guint prop_id, const GValue *value, G
       src->parentsize = g_value_get_int (value);
       break;
     case ARG_FILLTYPE:
-      src->filltype = g_value_get_int (value);
+      src->filltype = g_value_get_enum (value);
       break;
     case ARG_PATTERN:
       break;
@@ -455,13 +455,13 @@ gst_fakesrc_get_property (GObject *object, guint prop_id, GValue *value, GParamS
       g_value_set_boolean (value, src->loop_based);
       break;
     case ARG_OUTPUT:
-      g_value_set_int (value, src->output);
+      g_value_set_enum (value, src->output);
       break;
     case ARG_DATA:
-      g_value_set_int (value, src->data);
+      g_value_set_enum (value, src->data);
       break;
     case ARG_SIZETYPE:
-      g_value_set_int (value, src->sizetype);
+      g_value_set_enum (value, src->sizetype);
       break;
     case ARG_SIZEMIN:
       g_value_set_int (value, src->sizemin);
@@ -473,7 +473,7 @@ gst_fakesrc_get_property (GObject *object, guint prop_id, GValue *value, GParamS
       g_value_set_int (value, src->parentsize);
       break;
     case ARG_FILLTYPE:
-      g_value_set_int (value, src->filltype);
+      g_value_set_enum (value, src->filltype);
       break;
     case ARG_PATTERN:
       g_value_set_string (value, src->pattern);
@@ -689,49 +689,46 @@ static void
 gst_fakesrc_loop(GstElement *element)
 {
   GstFakeSrc *src;
+  GList *pads;
 
   g_return_if_fail(element != NULL);
   g_return_if_fail(GST_IS_FAKESRC(element));
 
   src = GST_FAKESRC (element);
 
-  do {
-    GList *pads;
-
-    pads = GST_ELEMENT (src)->pads;
+  pads = gst_element_get_pad_list (element);
 
-    while (pads) {
-      GstPad *pad = GST_PAD (pads->data);
-      GstBuffer *buf;
+  while (pads) {
+    GstPad *pad = GST_PAD (pads->data);
+    GstBuffer *buf;
 
-      if (src->rt_num_buffers == 0) {
-       src->eos = TRUE;
-      }
-      else {
-        if (src->rt_num_buffers > 0)
-          src->rt_num_buffers--;
-      }
+    if (src->rt_num_buffers == 0) {
+      src->eos = TRUE;
+    }
+    else {
+      if (src->rt_num_buffers > 0)
+        src->rt_num_buffers--;
+    }
 
-      if (src->eos) {
-        gst_pad_push(pad, GST_BUFFER(gst_event_new (GST_EVENT_EOS)));
-       return;
-      }
+    if (src->eos) {
+      gst_pad_push(pad, GST_BUFFER(gst_event_new (GST_EVENT_EOS)));
+      return;
+    }
 
-      buf = gst_fakesrc_create_buffer (src);
-      GST_BUFFER_TIMESTAMP (buf) = src->buffer_count++;
+    buf = gst_fakesrc_create_buffer (src);
+    GST_BUFFER_TIMESTAMP (buf) = src->buffer_count++;
 
-      if (!src->silent) {
-        gst_element_info (element, "fakesrc:  loop    ******* (%s:%s)  > (%d bytes, %llu)",
-                            GST_DEBUG_PAD_NAME (pad), GST_BUFFER_SIZE (buf), GST_BUFFER_TIMESTAMP (buf));
-      }
+    if (!src->silent) {
+      gst_element_info (element, "fakesrc:  loop    ******* (%s:%s)  > (%d bytes, %llu)",
+                          GST_DEBUG_PAD_NAME (pad), GST_BUFFER_SIZE (buf), GST_BUFFER_TIMESTAMP (buf));
+    }
 
-      g_signal_emit (G_OBJECT (src), gst_fakesrc_signals[SIGNAL_HANDOFF], 0,
+    g_signal_emit (G_OBJECT (src), gst_fakesrc_signals[SIGNAL_HANDOFF], 0,
                        buf, pad);
-      gst_pad_push (pad, buf);
+    gst_pad_push (pad, buf);
 
-      pads = g_list_next (pads);
-    }
-  } while (!GST_ELEMENT_IS_COTHREAD_STOPPING (element));
+    pads = g_list_next (pads);
+  }
 }
 
 static GstElementStateReturn
index 346f648..db05b10 100644 (file)
@@ -30,6 +30,7 @@
 #include <unistd.h>
 #include <sys/mman.h>
 #include <errno.h>
+#include <string.h>
 
 
 /**********************************************************************
@@ -327,10 +328,10 @@ gst_filesrc_map_region (GstFileSrc *src, off_t offset, size_t size)
   /* mmap() the data into this new buffer */
   GST_BUFFER_DATA(buf) = mmap (NULL, size, PROT_READ, MAP_SHARED, src->fd, offset);
   if (GST_BUFFER_DATA(buf) == NULL) {
-    fprintf (stderr, "ERROR: gstfilesrc couldn't map file!\n");
+    gst_element_error (GST_ELEMENT (src), "couldn't map file");
   } else if (GST_BUFFER_DATA(buf) == MAP_FAILED) {
-    g_error ("gstfilesrc mmap(0x%x, %d, 0x%llx) : %s",
-            size, src->fd, offset, sys_errlist[errno]);
+    gst_element_error (GST_ELEMENT (src), "mmap (0x%x, %d, 0x%llx) : %s",
+            size, src->fd, offset, strerror (errno));
   }
 #ifdef MADV_SEQUENTIAL
   /* madvise to tell the kernel what to do with it */
@@ -533,8 +534,8 @@ gst_filesrc_open_file (GstFileSrc *src)
   /* open the file */
   src->fd = open (src->filename, O_RDONLY);
   if (src->fd < 0) {
-    perror ("open");
-    gst_element_error (GST_ELEMENT (src), g_strconcat("opening file \"", src->filename, "\"", NULL));
+    gst_element_error (GST_ELEMENT (src), "opening file \"%s\" (%s)", 
+                      src->filename, strerror (errno), NULL);
     return FALSE;
   } else {
     /* find the file length */
@@ -557,7 +558,6 @@ gst_filesrc_close_file (GstFileSrc *src)
 {
   g_return_if_fail (GST_FLAG_IS_SET (src, GST_FILESRC_OPEN));
 
-  g_print ("close\n");
   /* close the file */
   close (src->fd);
 
@@ -565,6 +565,8 @@ gst_filesrc_close_file (GstFileSrc *src)
   src->fd = 0;
   src->filelen = 0;
   src->curoffset = 0;
+  if (src->mapbuf)
+    gst_buffer_unref (src->mapbuf);
 
   GST_FLAG_UNSET (src, GST_FILESRC_OPEN);
 }
@@ -575,17 +577,22 @@ gst_filesrc_change_state (GstElement *element)
 {
   GstFileSrc *src = GST_FILESRC(element);
 
-  if (GST_STATE_PENDING (element) == GST_STATE_NULL) {
-    if (GST_FLAG_IS_SET (element, GST_FILESRC_OPEN))
-      gst_filesrc_close_file (GST_FILESRC (element));
-  } if (GST_STATE_PENDING (element) == GST_STATE_READY) {
-    src->curoffset = 0;
-  } else {
-
-    if (!GST_FLAG_IS_SET (element, GST_FILESRC_OPEN)) {
-      if (!gst_filesrc_open_file (GST_FILESRC (element)))
-        return GST_STATE_FAILURE;
-    }
+  switch (GST_STATE_TRANSITION (element)) {
+    case GST_STATE_NULL_TO_READY:
+      if (!GST_FLAG_IS_SET (element, GST_FILESRC_OPEN)) {
+        if (!gst_filesrc_open_file (GST_FILESRC (element)))
+          return GST_STATE_FAILURE;
+      }
+      break;
+    case GST_STATE_READY_TO_NULL:
+      if (GST_FLAG_IS_SET (element, GST_FILESRC_OPEN))
+        gst_filesrc_close_file (GST_FILESRC (element));
+      break;
+    case GST_STATE_READY_TO_PAUSED:
+    case GST_STATE_PAUSED_TO_READY:
+      src->curoffset = 0;
+    default:
+      break;
   }
 
   if (GST_ELEMENT_CLASS (parent_class)->change_state)
index e8195aa..ddd4c02 100644 (file)
@@ -205,27 +205,24 @@ gst_identity_loop (GstElement *element)
 
   identity = GST_IDENTITY (element);
   
-  do {
-    buf = gst_pad_pull (identity->sinkpad);
+  buf = gst_pad_pull (identity->sinkpad);
     
-    for (i=identity->duplicate; i; i--) {
-      if (!identity->silent)
-        g_print("identity: loop    ******* (%s:%s)i (%d bytes, %llu) \n",
+  for (i=identity->duplicate; i; i--) {
+    if (!identity->silent)
+      g_print("identity: loop    ******* (%s:%s)i (%d bytes, %llu) \n",
                      GST_DEBUG_PAD_NAME (identity->sinkpad), GST_BUFFER_SIZE (buf), GST_BUFFER_TIMESTAMP (buf));
 
-      g_signal_emit (G_OBJECT (identity), gst_identity_signals[SIGNAL_HANDOFF], 0,
+    g_signal_emit (G_OBJECT (identity), gst_identity_signals[SIGNAL_HANDOFF], 0,
                               buf);
 
-      if (i>1) 
-       gst_buffer_ref (buf);
-
-      gst_pad_push (identity->srcpad, buf);
+    if (i>1) 
+      gst_buffer_ref (buf);
 
-      if (identity->sleep_time)
-        usleep (identity->sleep_time);
-    }
+    gst_pad_push (identity->srcpad, buf);
 
-  } while (!GST_ELEMENT_IS_COTHREAD_STOPPING(element));
+    if (identity->sleep_time)
+      usleep (identity->sleep_time);
+  }
 }
 
 static void 
index 4375cec..0a00fb8 100644 (file)
@@ -65,11 +65,13 @@ enum {
   ARG_LEAKY,
   ARG_LEVEL,
   ARG_MAX_LEVEL,
+  ARG_MAY_DEADLOCK,
 };
 
 
 static void                    gst_queue_class_init            (GstQueueClass *klass);
 static void                    gst_queue_init                  (GstQueue *queue);
+static void                    gst_queue_dispose               (GObject *object);
 
 static void                    gst_queue_set_property          (GObject *object, guint prop_id, 
                                                                 const GValue *value, GParamSpec *pspec);
@@ -141,18 +143,22 @@ gst_queue_class_init (GstQueueClass *klass)
 
   parent_class = g_type_class_ref (GST_TYPE_ELEMENT);
 
-  g_object_class_install_property(G_OBJECT_CLASS(klass), ARG_LEAKY,
-    g_param_spec_enum("leaky","Leaky","Where the queue leaks, if at all.",
-                      GST_TYPE_QUEUE_LEAKY,GST_QUEUE_NO_LEAK,G_PARAM_READWRITE));
-  g_object_class_install_property(G_OBJECT_CLASS(klass), ARG_LEVEL,
-    g_param_spec_int("level","Level","How many buffers are in the queue.",
-                     0,G_MAXINT,0,G_PARAM_READABLE));
-  g_object_class_install_property(G_OBJECT_CLASS(klass), ARG_MAX_LEVEL,
-    g_param_spec_int("max_level","Maximum Level","How many buffers the queue holds.",
-                     0,G_MAXINT,100,G_PARAM_READWRITE));
-
-  gobject_class->set_property = GST_DEBUG_FUNCPTR(gst_queue_set_property);
-  gobject_class->get_property = GST_DEBUG_FUNCPTR(gst_queue_get_property);
+  g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_LEAKY,
+    g_param_spec_enum ("leaky", "Leaky", "Where the queue leaks, if at all.",
+                       GST_TYPE_QUEUE_LEAKY, GST_QUEUE_NO_LEAK, G_PARAM_READWRITE));
+  g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_LEVEL,
+    g_param_spec_int ("level", "Level", "How many buffers are in the queue.",
+                      0, G_MAXINT, 0, G_PARAM_READABLE));
+  g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_MAX_LEVEL,
+    g_param_spec_int ("max_level", "Maximum Level", "How many buffers the queue holds.",
+                      0, G_MAXINT, 100, G_PARAM_READWRITE));
+  g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_MAY_DEADLOCK,
+    g_param_spec_boolean ("may_deadlock", "May Deadlock", "The queue may deadlock if it's full and not PLAYING",
+                      TRUE, G_PARAM_READWRITE));
+
+  gobject_class->dispose                = GST_DEBUG_FUNCPTR (gst_queue_dispose);
+  gobject_class->set_property          = GST_DEBUG_FUNCPTR (gst_queue_set_property);
+  gobject_class->get_property          = GST_DEBUG_FUNCPTR (gst_queue_get_property);
 
   gstelement_class->change_state = GST_DEBUG_FUNCPTR(gst_queue_change_state);
 }
@@ -182,6 +188,7 @@ gst_queue_init (GstQueue *queue)
   queue->size_buffers = 100;           /* 100 buffers */
   queue->size_bytes = 100 * 1024;      /* 100KB */
   queue->size_time = 1000000000LL;     /* 1sec */
+  queue->may_deadlock = TRUE;
 
   queue->qlock = g_mutex_new ();
   queue->reader = FALSE;
@@ -191,6 +198,18 @@ gst_queue_init (GstQueue *queue)
   GST_DEBUG_ELEMENT (GST_CAT_THREAD, queue, "initialized queue's not_empty & not_full conditions\n");
 }
 
+static void
+gst_queue_dispose (GObject *object)
+{
+  GstQueue *queue = GST_QUEUE (object);
+
+  g_mutex_free (queue->qlock);
+  g_cond_free (queue->not_empty);
+  g_cond_free (queue->not_full);
+
+  G_OBJECT_CLASS (parent_class)->dispose (object);
+}
+
 static GstBufferPool*
 gst_queue_get_bufferpool (GstPad *pad)
 {
@@ -334,10 +353,21 @@ restart:
       while (GST_STATE_PENDING (queue) != GST_STATE_VOID_PENDING) {
         GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "interrupted!!\n");
         g_mutex_unlock (queue->qlock);
-        cothread_switch(cothread_current_main());
+       gst_element_interrupt (GST_ELEMENT (queue));
        goto restart;
       }
-      g_assert (GST_STATE (queue) == GST_STATE_PLAYING);
+      if (GST_STATE (queue) != GST_STATE_PLAYING) {
+       /* this means the other end is shut down */
+       /* try to signal to resolve the error */
+       if (!queue->may_deadlock) {
+          g_mutex_unlock (queue->qlock);
+          gst_element_error (GST_ELEMENT (queue), "deadlock found, source pad elements are shut down");
+         return;
+       }
+       else {
+          gst_element_info (GST_ELEMENT (queue), "waiting for the app to restart source pad elements");
+       }
+      }
 
       GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for not_full, level:%d/%d\n", queue->level_buffers, queue->size_buffers);
       if (queue->writer)
@@ -402,10 +432,20 @@ restart:
     while (GST_STATE_PENDING (queue) != GST_STATE_VOID_PENDING) {
       GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "interrupted!!\n");
       g_mutex_unlock (queue->qlock);
-      cothread_switch(cothread_current_main());
+      gst_element_interrupt (GST_ELEMENT (queue));
       goto restart;
     }
-    g_assert (GST_STATE (queue) == GST_STATE_PLAYING);
+    if (GST_STATE (queue) != GST_STATE_PLAYING) {
+      /* this means the other end is shut down */
+      if (!queue->may_deadlock) {
+        g_mutex_unlock (queue->qlock);
+        gst_element_error (GST_ELEMENT (queue), "deadlock found, sink pad elements are shut down");
+        return NULL;
+      }
+      else {
+        gst_element_info (GST_ELEMENT (queue), "waiting for the app to restart sink pad elements");
+      }
+    }
 
     GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for not_empty, level:%d/%d\n", queue->level_buffers, queue->size_buffers);
     if (queue->reader)
@@ -513,10 +553,13 @@ gst_queue_set_property (GObject *object, guint prop_id, const GValue *value, GPa
 
   switch (prop_id) {
     case ARG_LEAKY:
-      queue->leaky = g_value_get_int(value);
+      queue->leaky = g_value_get_int (value);
       break;
     case ARG_MAX_LEVEL:
-      queue->size_buffers = g_value_get_int(value);
+      queue->size_buffers = g_value_get_int (value);
+      break;
+    case ARG_MAY_DEADLOCK:
+      queue->may_deadlock = g_value_get_boolean (value);
       break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
@@ -536,13 +579,16 @@ gst_queue_get_property (GObject *object, guint prop_id, GValue *value, GParamSpe
 
   switch (prop_id) {
     case ARG_LEAKY:
-      g_value_set_int(value, queue->leaky);
+      g_value_set_int (value, queue->leaky);
       break;
     case ARG_LEVEL:
-      g_value_set_int(value, queue->level_buffers);
+      g_value_set_int (value, queue->level_buffers);
       break;
     case ARG_MAX_LEVEL:
-      g_value_set_int(value, queue->size_buffers);
+      g_value_set_int (value, queue->size_buffers);
+      break;
+    case ARG_MAY_DEADLOCK:
+      g_value_set_boolean (value, queue->may_deadlock);
       break;
     default:
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
index fb091e1..3af0ef6 100644 (file)
@@ -74,6 +74,7 @@ struct _GstQueue {
   guint64 size_time;   /* size of queue in time */
 
   gint leaky;          /* whether the queue is leaky, and if so at which end */
+  gboolean may_deadlock; /* it the queue should fail on possible deadlocks */
 
   GMutex *qlock;       /* lock for queue (vs object lock) */
   /* we are single reader and single writer queue */