- Added a function to get the currently executing cothread
authorWim Taymans <wim.taymans@gmail.com>
Thu, 27 Dec 2001 00:47:41 +0000 (00:47 +0000)
committerWim Taymans <wim.taymans@gmail.com>
Thu, 27 Dec 2001 00:47:41 +0000 (00:47 +0000)
Original commit message from CVS:
- Added a function to get the currently executing cothread
- Removed some useless includes
- _interrupt now returns a boolean so the behaviour after the interrupt
can be controlled by the scheduler.
- Added a better way to set/get the default scheduler.
- make thread and pipeline get the default scheduler.

16 files changed:
gst/cothreads.c
gst/cothreads.h
gst/gst.h
gst/gstbin.h
gst/gstelement.c
gst/gstelement.h
gst/gstpad.c
gst/gstpad.h
gst/gstpipeline.c
gst/gstpipeline.h
gst/gstqueue.c
gst/gstscheduler.c
gst/gstscheduler.h
gst/gstthread.c
gst/schedulers/gstbasicscheduler.c
plugins/elements/gstqueue.c

index bbc91ee..a37ea45 100644 (file)
@@ -308,6 +308,21 @@ cothread_current_main (void)
   return ctx->threads[0];
 }
 
+/**
+ * cothread_current:
+ *
+ * Get the currenttly executing cothread
+ *
+ * Returns: the #cothread_state of the current cothread
+ */
+cothread_state *
+cothread_current (void)
+{
+  cothread_context *ctx = pthread_getspecific (_cothread_key);
+
+  return ctx->threads[ctx->current];
+}
+
 static void
 cothread_stub (void)
 {
index 4fe370a..4bb06f8 100644 (file)
@@ -76,7 +76,6 @@ void                          cothread_setfunc        (cothread_state *thread, cothread_func func,
                                                         int argc, char **argv);
 void                           cothread_stop           (cothread_state *thread);
 
-int                            cothread_getcurrent     (void);
 void                           cothread_switch         (cothread_state *thread);
 void                           cothread_set_data       (cothread_state *thread, gchar *key, gpointer data);
 gpointer                       cothread_get_data       (cothread_state *thread, gchar *key);
@@ -87,5 +86,6 @@ void                          cothread_unlock         (cothread_state *thread);
 
 cothread_state*                        cothread_main           (cothread_context *ctx);
 cothread_state*                        cothread_current_main   (void);
+cothread_state*                        cothread_current        (void);
 
 #endif /* __COTHREAD_H__ */
index 5fed063..ff3e641 100644 (file)
--- a/gst/gst.h
+++ b/gst/gst.h
@@ -47,7 +47,6 @@
 #include <gst/gstutils.h>
 #include <gst/gsttrace.h>
 #include <gst/gstxml.h>
-#include <gst/cothreads.h>
 #include <gst/gstscheduler.h>
 #include <gst/gsttimecache.h>
 #include <gst/gstevent.h>
index ae9fe1c..ddef90a 100644 (file)
@@ -25,7 +25,6 @@
 #define __GST_BIN_H__
 
 #include <gst/gstelement.h>
-#include <gst/cothreads.h>
 
 #ifdef __cplusplus
 extern "C" {
index f333539..679fb96 100644 (file)
@@ -932,7 +932,6 @@ gst_element_set_state (GstElement *element, GstElementState state)
 
   g_return_val_if_fail (element != NULL, GST_STATE_FAILURE);
   g_return_val_if_fail (GST_IS_ELEMENT (element), GST_STATE_FAILURE);
-  g_return_val_if_fail (element->sched != NULL, GST_STATE_FAILURE);
 
   GST_DEBUG_ELEMENT (GST_CAT_STATES,element, "setting state from %s to %s\n",
                      gst_element_statename(GST_STATE(element)),
@@ -1292,12 +1291,14 @@ gst_element_yield (GstElement *element)
   }
 }
 
-void
+gboolean
 gst_element_interrupt (GstElement *element)
 {
   if (GST_ELEMENT_SCHED (element)) {
-    gst_scheduler_interrupt (GST_ELEMENT_SCHED (element), element);
+    return gst_scheduler_interrupt (GST_ELEMENT_SCHED (element), element);
   }
+  else 
+    return FALSE;
 }
 
 /**
index cf8642f..fa8fd48 100644 (file)
@@ -29,7 +29,6 @@
 #include <gst/gsttypes.h>
 #include <gst/gstobject.h>
 #include <gst/gstpad.h>
-#include <gst/cothreads.h>
 #include <gst/gstpluginfeature.h>
 
 #ifdef __cplusplus
@@ -103,6 +102,7 @@ typedef enum {
 #define GST_ELEMENT_IS_THREAD_SUGGESTED(obj)   (GST_FLAG_IS_SET(obj,GST_ELEMENT_THREAD_SUGGESTED))
 #define GST_ELEMENT_IS_EOS(obj)                        (GST_FLAG_IS_SET(obj,GST_ELEMENT_EOS))
 #define GST_ELEMENT_IS_EVENT_AWARE(obj)                (GST_FLAG_IS_SET(obj,GST_ELEMENT_EVENT_AWARE))
+#define GST_ELEMENT_IS_DECOUPLED(obj)          (GST_FLAG_IS_SET(obj,GST_ELEMENT_DECOUPLED))
 
 #define GST_ELEMENT_NAME(obj)                  (GST_OBJECT_NAME(obj))
 #define GST_ELEMENT_PARENT(obj)                        (GST_OBJECT_PARENT(obj))
@@ -184,7 +184,7 @@ void                    gst_element_set_parent          (GstElement *element, Gs
 GstObject*              gst_element_get_parent          (GstElement *element);
 
 void                   gst_element_yield               (GstElement *element);
-void                   gst_element_interrupt           (GstElement *element);
+gboolean               gst_element_interrupt           (GstElement *element);
 void                   gst_element_set_sched           (GstElement *element, GstScheduler *sched);
 GstScheduler*          gst_element_get_sched           (GstElement *element);
 
index 9e64db8..847eb9d 100644 (file)
@@ -494,7 +494,9 @@ gst_pad_push_func(GstPad *pad, GstBuffer *buf)
                GST_DEBUG_FUNCPTR_NAME(GST_RPAD_CHAINFUNC(GST_RPAD_PEER(pad))));
     (GST_RPAD_CHAINFUNC(GST_RPAD_PEER(pad)))(pad,buf);
   } else {
-    GST_DEBUG (GST_CAT_DATAFLOW,"got a problem here: default pad_push handler in place, no chain function\n");
+    GST_DEBUG (GST_CAT_DATAFLOW,"default pad_push handler in place, no chain function\n");
+    g_warning ("(internal error) default pad_push in place for pad %s:%s but it has no chain function", 
+                   GST_DEBUG_PAD_NAME (pad));
   }
 }
 
index bf69928..5f572f0 100644 (file)
@@ -28,7 +28,6 @@
 
 #include <gst/gstobject.h>
 #include <gst/gstbuffer.h>
-#include <gst/cothreads.h>
 #include <gst/gstcaps.h>
 #include <gst/gstevent.h>
 
index dd1b459..95f4697 100644 (file)
@@ -97,21 +97,22 @@ gst_pipeline_class_init (GstPipelineClass *klass)
 static void
 gst_pipeline_init (GstPipeline *pipeline)
 {
+  const gchar *schedname;
   GstScheduler *scheduler;
 
   /* we're a manager by default */
   GST_FLAG_SET (pipeline, GST_BIN_FLAG_MANAGER);
 
-  scheduler = gst_schedulerfactory_make ("basic", GST_ELEMENT (pipeline));
+  schedname = gst_schedulerfactory_get_default_name ();
 
+  scheduler = gst_schedulerfactory_make (schedname, GST_ELEMENT (pipeline));
+         
   GST_ELEMENT_SCHED (pipeline) = scheduler;
 
   gst_object_ref (GST_OBJECT (scheduler));
   gst_object_sink (GST_OBJECT (scheduler));
 
   gst_scheduler_setup (scheduler);
-
-  GST_DEBUG (GST_CAT_PIPELINE, "pipeline's scheduler is %p\n", scheduler);
 }
 
 static void
@@ -121,8 +122,10 @@ gst_pipeline_dispose (GObject *object)
 
   G_OBJECT_CLASS (parent_class)->dispose (object);
 
-  gst_scheduler_reset (GST_ELEMENT_SCHED (pipeline));
-  gst_object_unref (GST_OBJECT (GST_ELEMENT_SCHED (pipeline)));
+  if (GST_ELEMENT_SCHED (pipeline)) {
+    gst_scheduler_reset (GST_ELEMENT_SCHED (pipeline));
+    gst_object_unref (GST_OBJECT (GST_ELEMENT_SCHED (pipeline)));
+  }
 }
 
 /**
index 3c0dbf0..7539c04 100644 (file)
@@ -58,6 +58,7 @@ GType         gst_pipeline_get_type           (void);
 GstElement*    gst_pipeline_new                (const gchar *name);
 #define                gst_pipeline_destroy(pipeline)  gst_object_destroy(GST_OBJECT(pipeline))
 
+
 #ifdef __cplusplus
 }
 #endif /* __cplusplus */
index 140562e..3020513 100644 (file)
@@ -284,7 +284,6 @@ gst_queue_chain (GstPad *pad, GstBuffer *buf)
   queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
 
   reader = FALSE;
-
 restart:
   /* we have to lock the queue since we span threads */
   GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locking t:%ld\n", pthread_self ());
@@ -353,7 +352,8 @@ restart:
       while (GST_STATE_PENDING (queue) != GST_STATE_VOID_PENDING) {
         GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "interrupted!!\n");
         g_mutex_unlock (queue->qlock);
-       gst_element_interrupt (GST_ELEMENT (queue));
+       if (gst_element_interrupt (GST_ELEMENT (queue)))
+          return;
        goto restart;
       }
       if (GST_STATE (queue) != GST_STATE_PLAYING) {
@@ -419,7 +419,6 @@ gst_queue_get (GstPad *pad)
   queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
 
   writer = FALSE;
-
 restart:
   /* have to lock for thread-safety */
   GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locking t:%ld\n", pthread_self ());
@@ -434,7 +433,8 @@ restart:
     while (GST_STATE_PENDING (queue) != GST_STATE_VOID_PENDING) {
       GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "interrupted!!\n");
       g_mutex_unlock (queue->qlock);
-      gst_element_interrupt (GST_ELEMENT (queue));
+      if (gst_element_interrupt (GST_ELEMENT (queue)))
+        return NULL;
       goto restart;
     }
     if (GST_STATE (queue) != GST_STATE_PLAYING) {
@@ -442,7 +442,7 @@ restart:
       if (!queue->may_deadlock) {
         g_mutex_unlock (queue->qlock);
         gst_element_error (GST_ELEMENT (queue), "deadlock found, sink pad elements are shut down");
-        return NULL;
+        goto restart;
       }
       else {
         gst_element_info (GST_ELEMENT (queue), "waiting for the app to restart sink pad elements");
index cfa59c8..c08cbbb 100644 (file)
@@ -31,6 +31,8 @@ static void   gst_scheduler_init              (GstScheduler *sched);
 
 static GstObjectClass *parent_class = NULL;
 
+static gchar *_default_name = NULL;
+
 GType
 gst_scheduler_get_type (void)
 {
@@ -282,15 +284,20 @@ gst_scheduler_yield (GstScheduler *sched, GstElement *element)
  * @element: the element requesting an interrupt
  *
  * Tell the scheduler to interrupt execution of this element.
+ *
+ * Retruns: TRUE if the element should return NULL from the chain/get
+ * function.
  */
-void
+gboolean
 gst_scheduler_interrupt (GstScheduler *sched, GstElement *element)
 {
-  g_return_if_fail (GST_IS_SCHEDULER (sched));
-  g_return_if_fail (GST_IS_ELEMENT (element));
+  g_return_val_if_fail (GST_IS_SCHEDULER (sched), FALSE);
+  g_return_val_if_fail (GST_IS_ELEMENT (element), FALSE);
 
   if (CLASS (sched)->interrupt)
-    CLASS (sched)->interrupt (sched, element);
+    return CLASS (sched)->interrupt (sched, element);
+
+  return FALSE;
 }
 
 /**
@@ -387,6 +394,7 @@ gst_schedulerfactory_class_init (GstSchedulerFactoryClass *klass)
 #endif
 
   _gst_schedulerfactories = NULL;
+  _default_name = g_strdup ("basic");
 }
 
 static void
@@ -536,6 +544,21 @@ gst_schedulerfactory_make (const gchar *name, GstElement *parent)
   return gst_schedulerfactory_create (factory, parent);
 }
 
+void
+gst_schedulerfactory_set_default_name (const gchar* name)
+{
+  if (_default_name)
+    g_free (_default_name);
+
+  _default_name = g_strdup (name);
+}
+
+const gchar*
+gst_schedulerfactory_get_default_name (void)
+{
+  return _default_name;
+}
+
 #ifndef GST_DISABLE_REGISTRY
 static xmlNodePtr
 gst_schedulerfactory_save_thyself (GstObject *object, xmlNodePtr parent)
index a3b8af2..ef69975 100644 (file)
@@ -77,7 +77,7 @@ struct _GstSchedulerClass {
   void                         (*lock_element)         (GstScheduler *sched, GstElement *element);
   void                         (*unlock_element)       (GstScheduler *sched, GstElement *element);
   void                         (*yield)                (GstScheduler *sched, GstElement *element);
-  void                         (*interrupt)            (GstScheduler *sched, GstElement *element);
+  gboolean             (*interrupt)            (GstScheduler *sched, GstElement *element);
   void                         (*error)                (GstScheduler *sched, GstElement *element);
   void                         (*pad_connect)          (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad);
   void                         (*pad_disconnect)       (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad);
@@ -101,7 +101,7 @@ GstElementStateReturn       gst_scheduler_state_transition  (GstScheduler *sched, GstEl
 void                   gst_scheduler_lock_element      (GstScheduler *sched, GstElement *element);
 void                   gst_scheduler_unlock_element    (GstScheduler *sched, GstElement *element);
 void                   gst_scheduler_yield             (GstScheduler *sched, GstElement *element);
-void                   gst_scheduler_interrupt         (GstScheduler *sched, GstElement *element);
+gboolean               gst_scheduler_interrupt         (GstScheduler *sched, GstElement *element);
 void                   gst_scheduler_error             (GstScheduler *sched, GstElement *element);
 void                   gst_scheduler_pad_connect       (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad);
 void                   gst_scheduler_pad_disconnect    (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad);
@@ -150,6 +150,9 @@ GList*                      gst_schedulerfactory_get_list           (void);
 GstScheduler*          gst_schedulerfactory_create             (GstSchedulerFactory *factory, GstElement *parent);
 GstScheduler*          gst_schedulerfactory_make               (const gchar *name, GstElement *parent);
 
+void                   gst_schedulerfactory_set_default_name   (const gchar* name);
+const gchar*           gst_schedulerfactory_get_default_name   (void);
+
 
 #ifdef __cplusplus   
 }
index 321fe62..7743f52 100644 (file)
@@ -133,19 +133,27 @@ gst_thread_class_init (GstThreadClass *klass)
 static void
 gst_thread_init (GstThread *thread)
 {
+  const gchar *schedname;
+  GstScheduler *scheduler;
 
-  GST_DEBUG (GST_CAT_THREAD,"initializing thread\n");
+  GST_DEBUG (GST_CAT_THREAD, "initializing thread\n");
 
   /* we're a manager by default */
   GST_FLAG_SET (thread, GST_BIN_FLAG_MANAGER);
 
-  thread->lock = g_mutex_new();
-  thread->cond = g_cond_new();
+  schedname = gst_schedulerfactory_get_default_name ();
 
-  GST_ELEMENT_SCHED(thread) = gst_schedulerfactory_make ("basic", GST_ELEMENT(thread));
-  GST_DEBUG(GST_CAT_THREAD, "thread's scheduler is %p\n",GST_ELEMENT_SCHED(thread));
+  scheduler = gst_schedulerfactory_make (schedname, GST_ELEMENT (thread));
 
-  thread->ppid = getpid();
+  GST_ELEMENT_SCHED (thread) = scheduler;
+
+  gst_object_ref (GST_OBJECT (scheduler));
+  gst_object_sink (GST_OBJECT (scheduler));
+
+  thread->lock = g_mutex_new ();
+  thread->cond = g_cond_new ();
+
+  thread->ppid = getpid ();
   thread->thread_id = -1;
 }
 
@@ -154,16 +162,17 @@ gst_thread_dispose (GObject *object)
 {
   GstThread *thread = GST_THREAD (object);
 
-  GST_DEBUG (GST_CAT_REFCOUNTING,"dispose\n");
+  GST_DEBUG (GST_CAT_REFCOUNTING, "dispose\n");
 
   g_mutex_free (thread->lock);
   g_cond_free (thread->cond);
 
   G_OBJECT_CLASS (parent_class)->dispose (object);
 
-  gst_object_destroy (GST_OBJECT (GST_ELEMENT_SCHED (thread)));
-  gst_object_unref (GST_OBJECT (GST_ELEMENT_SCHED (thread)));
-
+  if (GST_ELEMENT_SCHED (thread)) {
+    gst_object_destroy (GST_OBJECT (GST_ELEMENT_SCHED (thread)));
+    gst_object_unref (GST_OBJECT (GST_ELEMENT_SCHED (thread)));
+  }
 }
 
 static void
index 329e0bc..0fc128b 100644 (file)
@@ -101,7 +101,7 @@ static GstElementStateReturn
 static void            gst_basic_scheduler_lock_element        (GstScheduler *sched, GstElement *element);
 static void            gst_basic_scheduler_unlock_element      (GstScheduler *sched, GstElement *element);
 static void            gst_basic_scheduler_yield               (GstScheduler *sched, GstElement *element);
-static void            gst_basic_scheduler_interrupt           (GstScheduler *sched, GstElement *element);
+static gboolean                gst_basic_scheduler_interrupt           (GstScheduler *sched, GstElement *element);
 static void            gst_basic_scheduler_error               (GstScheduler *sched, GstElement *element);
 static void            gst_basic_scheduler_pad_connect         (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad);
 static void            gst_basic_scheduler_pad_disconnect      (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad);
@@ -330,6 +330,7 @@ gst_basic_scheduler_src_wrapper (int argc, char *argv[])
       }
     }
   } while (!GST_ELEMENT_IS_COTHREAD_STOPPING (element));
+exit:
   GST_FLAG_UNSET (element, GST_ELEMENT_COTHREAD_STOPPING);
 
   GST_DEBUG_LEAVE ("");
@@ -1056,10 +1057,13 @@ gst_basic_scheduler_yield (GstScheduler *sched, GstElement *element)
   }
 }
 
-static void
+static gboolean
 gst_basic_scheduler_interrupt (GstScheduler *sched, GstElement *element)
 {
+  GST_FLAG_SET (element, GST_ELEMENT_COTHREAD_STOPPING);
   cothread_switch (cothread_current_main ());
+
+  return FALSE;
 }
 
 static void
index 140562e..3020513 100644 (file)
@@ -284,7 +284,6 @@ gst_queue_chain (GstPad *pad, GstBuffer *buf)
   queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
 
   reader = FALSE;
-
 restart:
   /* we have to lock the queue since we span threads */
   GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locking t:%ld\n", pthread_self ());
@@ -353,7 +352,8 @@ restart:
       while (GST_STATE_PENDING (queue) != GST_STATE_VOID_PENDING) {
         GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "interrupted!!\n");
         g_mutex_unlock (queue->qlock);
-       gst_element_interrupt (GST_ELEMENT (queue));
+       if (gst_element_interrupt (GST_ELEMENT (queue)))
+          return;
        goto restart;
       }
       if (GST_STATE (queue) != GST_STATE_PLAYING) {
@@ -419,7 +419,6 @@ gst_queue_get (GstPad *pad)
   queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
 
   writer = FALSE;
-
 restart:
   /* have to lock for thread-safety */
   GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locking t:%ld\n", pthread_self ());
@@ -434,7 +433,8 @@ restart:
     while (GST_STATE_PENDING (queue) != GST_STATE_VOID_PENDING) {
       GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "interrupted!!\n");
       g_mutex_unlock (queue->qlock);
-      gst_element_interrupt (GST_ELEMENT (queue));
+      if (gst_element_interrupt (GST_ELEMENT (queue)))
+        return NULL;
       goto restart;
     }
     if (GST_STATE (queue) != GST_STATE_PLAYING) {
@@ -442,7 +442,7 @@ restart:
       if (!queue->may_deadlock) {
         g_mutex_unlock (queue->qlock);
         gst_element_error (GST_ELEMENT (queue), "deadlock found, sink pad elements are shut down");
-        return NULL;
+        goto restart;
       }
       else {
         gst_element_info (GST_ELEMENT (queue), "waiting for the app to restart sink pad elements");