From d2ed0906a69ee89f55a916f68154f8f4cac8395d Mon Sep 17 00:00:00 2001 From: Andy Wingo Date: Sun, 26 May 2002 03:23:25 +0000 Subject: [PATCH] implemented threadsafe property set/get system as discussed in docs/random/wingo/threadsafe-properties some cleanups ... Original commit message from CVS: * implemented threadsafe property set/get system as discussed in docs/random/wingo/threadsafe-properties * some cleanups * this change will cause binary incompatibilities, better rebuild them plugins now, off to drink :-) --- gst/gstelement.c | 360 +++++++++++++++++++++++++++++++++++-- gst/gstelement.h | 66 ++++--- gst/gstobject.h | 2 +- gst/gstpad.c | 6 +- gst/gstthread.c | 13 +- gst/schedulers/gstbasicscheduler.c | 100 +++++++++-- 6 files changed, 490 insertions(+), 57 deletions(-) diff --git a/gst/gstelement.c b/gst/gstelement.c index 4b80c2f..56ed769 100644 --- a/gst/gstelement.c +++ b/gst/gstelement.c @@ -23,6 +23,7 @@ /* #define GST_DEBUG_ENABLED */ #include #include +#include #include "gst_private.h" #include "gstelement.h" @@ -54,9 +55,9 @@ static void gst_element_class_init (GstElementClass *klass); static void gst_element_init (GstElement *element); static void gst_element_base_class_init (GstElementClass *klass); -static void gst_element_set_property (GObject *object, guint prop_id, +static void gst_element_real_set_property (GObject *object, guint prop_id, const GValue *value, GParamSpec *pspec); -static void gst_element_get_property (GObject *object, guint prop_id, GValue *value, +static void gst_element_real_get_property (GObject *object, guint prop_id, GValue *value, GParamSpec *pspec); static void gst_element_dispatch_properties_changed (GObject * object, guint n_pspecs, GParamSpec **pspecs); @@ -139,8 +140,8 @@ gst_element_class_init (GstElementClass *klass) 2, G_TYPE_OBJECT, G_TYPE_PARAM); - gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_element_set_property); - gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_element_get_property); + gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_element_real_set_property); + gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_element_real_get_property); /* see the comments at gst_element_dispatch_properties_changed */ gobject_class->dispatch_properties_changed @@ -167,8 +168,8 @@ gst_element_base_class_init (GstElementClass *klass) gobject_class = (GObjectClass*) klass; - gobject_class->set_property = GST_DEBUG_FUNCPTR(gst_element_set_property); - gobject_class->get_property = GST_DEBUG_FUNCPTR(gst_element_get_property); + gobject_class->set_property = GST_DEBUG_FUNCPTR(gst_element_real_set_property); + gobject_class->get_property = GST_DEBUG_FUNCPTR(gst_element_real_get_property); } static void @@ -188,7 +189,7 @@ gst_element_init (GstElement *element) } static void -gst_element_set_property (GObject *object, guint prop_id, const GValue *value, GParamSpec *pspec) +gst_element_real_set_property (GObject *object, guint prop_id, const GValue *value, GParamSpec *pspec) { GstElementClass *oclass = CLASS (object); @@ -197,7 +198,7 @@ gst_element_set_property (GObject *object, guint prop_id, const GValue *value, G } static void -gst_element_get_property (GObject *object, guint prop_id, GValue *value, GParamSpec *pspec) +gst_element_real_get_property (GObject *object, guint prop_id, GValue *value, GParamSpec *pspec) { GstElementClass *oclass = CLASS (object); @@ -237,6 +238,339 @@ gst_element_dispatch_properties_changed (GObject *object, } } +typedef struct { + const GParamSpec *pspec; + const GValue *value; +} prop_value_t; + +static void +element_set_property (GstElement *element, const GParamSpec *pspec, const GValue *value) +{ + prop_value_t *prop_value = g_new0 (prop_value_t, 1); + + g_message ("Setting property %s::%s to %s for object %s\n", G_OBJECT_TYPE_NAME (element), + pspec->name, g_strdup_value_contents (value), GST_OBJECT_NAME (element)); + + prop_value->pspec = pspec; + prop_value->value = value; + + g_async_queue_push (element->prop_value_queue, prop_value); +} + +static void +element_get_property (GstElement *element, const GParamSpec *pspec, GValue *value) +{ + g_message ("Getting property %s::%s to %s for object %s\n", G_OBJECT_TYPE_NAME (element), + pspec->name, g_strdup_value_contents (value), GST_OBJECT_NAME (element)); + + g_mutex_lock (element->property_mutex); + g_object_get_property ((GObject*)element, pspec->name, value); + g_mutex_unlock (element->property_mutex); +} + +static void +gst_element_threadsafe_properties_pre_run (GstElement *element) +{ + GST_DEBUG (GST_CAT_THREAD, "locking element %s", GST_OBJECT_NAME (element)); + g_mutex_lock (element->property_mutex); + gst_element_set_pending_properties (element); +} + +static void +gst_element_threadsafe_properties_post_run (GstElement *element) +{ + GST_DEBUG (GST_CAT_THREAD, "unlocking element %s", GST_OBJECT_NAME (element)); + g_mutex_unlock (element->property_mutex); +} + +void +gst_element_enable_threadsafe_properties (GstElement *element) +{ + g_return_if_fail (GST_IS_ELEMENT (element)); + + GST_FLAG_SET (element, GST_ELEMENT_USE_THREADSAFE_PROPERTIES); + element->pre_run_func = gst_element_threadsafe_properties_pre_run; + element->post_run_func = gst_element_threadsafe_properties_post_run; + if (!element->prop_value_queue) + element->prop_value_queue = g_async_queue_new (); + if (!element->property_mutex) + element->property_mutex = g_mutex_new (); +} + +void +gst_element_disable_threadsafe_properties (GstElement *element) +{ + g_return_if_fail (GST_IS_ELEMENT (element)); + + GST_FLAG_UNSET (element, GST_ELEMENT_USE_THREADSAFE_PROPERTIES); + element->pre_run_func = NULL; + element->post_run_func = NULL; + /* let's keep around that async queue */ +} + +void +gst_element_set_pending_properties (GstElement *element) +{ + prop_value_t *prop_value; + + while ((prop_value = g_async_queue_try_pop (element->prop_value_queue))) { + g_object_set_property ((GObject*)element, prop_value->pspec->name, prop_value->value); + g_free (prop_value); + } +} + +/* following 6 functions taken mostly from gobject.c */ + +void +gst_element_set (GstElement *element, const gchar *first_property_name, ...) +{ + va_list var_args; + + g_return_if_fail (GST_IS_ELEMENT (element)); + + va_start (var_args, first_property_name); + gst_element_set_valist (element, first_property_name, var_args); + va_end (var_args); +} + +void +gst_element_get (GstElement *element, const gchar *first_property_name, ...) +{ + va_list var_args; + + g_return_if_fail (GST_IS_ELEMENT (element)); + + va_start (var_args, first_property_name); + gst_element_get_valist (element, first_property_name, var_args); + va_end (var_args); +} + +void +gst_element_set_valist (GstElement *element, const gchar *first_property_name, va_list var_args) +{ + const gchar *name; + GObject *object; + + g_return_if_fail (GST_IS_ELEMENT (element)); + + object = (GObject*)element; + + if (!GST_FLAG_IS_SET (element, GST_ELEMENT_USE_THREADSAFE_PROPERTIES)) { + g_object_set_valist (object, first_property_name, var_args); + return; + } + + g_object_ref (object); + + name = first_property_name; + + while (name) + { + GValue value = { 0, }; + GParamSpec *pspec; + gchar *error = NULL; + + pspec = g_object_class_find_property (G_OBJECT_GET_CLASS (object), name); + + if (!pspec) + { + g_warning ("%s: object class `%s' has no property named `%s'", + G_STRLOC, + G_OBJECT_TYPE_NAME (object), + name); + break; + } + if (!(pspec->flags & G_PARAM_WRITABLE)) + { + g_warning ("%s: property `%s' of object class `%s' is not writable", + G_STRLOC, + pspec->name, + G_OBJECT_TYPE_NAME (object)); + break; + } + + g_value_init (&value, G_PARAM_SPEC_VALUE_TYPE (pspec)); + + G_VALUE_COLLECT (&value, var_args, 0, &error); + if (error) + { + g_warning ("%s: %s", G_STRLOC, error); + g_free (error); + + /* we purposely leak the value here, it might not be + * in a sane state if an error condition occoured + */ + break; + } + + element_set_property (element, pspec, &value); + g_value_unset (&value); + + name = va_arg (var_args, gchar*); + } + + g_object_unref (object); +} + +void +gst_element_get_valist (GstElement *element, const gchar *first_property_name, va_list var_args) +{ + const gchar *name; + GObject *object; + + g_return_if_fail (GST_IS_ELEMENT (element)); + + object = (GObject*)element; + + if (!GST_FLAG_IS_SET (element, GST_ELEMENT_USE_THREADSAFE_PROPERTIES)) { + g_object_get_valist (object, first_property_name, var_args); + return; + } + + g_object_ref (object); + + name = first_property_name; + + while (name) + { + GValue value = { 0, }; + GParamSpec *pspec; + gchar *error; + + pspec = g_object_class_find_property (G_OBJECT_GET_CLASS (object), name); + + if (!pspec) + { + g_warning ("%s: object class `%s' has no property named `%s'", + G_STRLOC, + G_OBJECT_TYPE_NAME (object), + name); + break; + } + if (!(pspec->flags & G_PARAM_READABLE)) + { + g_warning ("%s: property `%s' of object class `%s' is not readable", + G_STRLOC, + pspec->name, + G_OBJECT_TYPE_NAME (object)); + break; + } + + g_value_init (&value, G_PARAM_SPEC_VALUE_TYPE (pspec)); + + element_get_property (element, pspec, &value); + + G_VALUE_LCOPY (&value, var_args, 0, &error); + if (error) + { + g_warning ("%s: %s", G_STRLOC, error); + g_free (error); + g_value_unset (&value); + break; + } + + g_value_unset (&value); + + name = va_arg (var_args, gchar*); + } + + g_object_unref (object); +} + +void +gst_element_set_property (GstElement *element, const gchar *property_name, const GValue *value) +{ + GParamSpec *pspec; + GObject *object; + + g_return_if_fail (GST_IS_ELEMENT (element)); + g_return_if_fail (property_name != NULL); + g_return_if_fail (G_IS_VALUE (value)); + + object = (GObject*)element; + + if (!GST_FLAG_IS_SET (element, GST_ELEMENT_USE_THREADSAFE_PROPERTIES)) { + g_object_set_property (object, property_name, value); + return; + } + + g_object_ref (object); + + pspec = g_object_class_find_property (G_OBJECT_GET_CLASS (object), property_name); + + if (!pspec) + g_warning ("%s: object class `%s' has no property named `%s'", + G_STRLOC, + G_OBJECT_TYPE_NAME (object), + property_name); + else + element_set_property (element, pspec, value); + + g_object_unref (object); +} + +void +gst_element_get_property (GstElement *element, const gchar *property_name, GValue *value) +{ + GParamSpec *pspec; + GObject *object; + + g_return_if_fail (GST_IS_ELEMENT (element)); + g_return_if_fail (property_name != NULL); + g_return_if_fail (G_IS_VALUE (value)); + + object = (GObject*)element; + + if (!GST_FLAG_IS_SET (element, GST_ELEMENT_USE_THREADSAFE_PROPERTIES)) { + g_object_get_property (object, property_name, value); + return; + } + + g_object_ref (object); + + pspec = g_object_class_find_property (G_OBJECT_GET_CLASS (object), property_name); + + if (!pspec) + g_warning ("%s: object class `%s' has no property named `%s'", + G_STRLOC, + G_OBJECT_TYPE_NAME (object), + property_name); + else + { + GValue *prop_value, tmp_value = { 0, }; + + /* auto-conversion of the callers value type + */ + if (G_VALUE_TYPE (value) == G_PARAM_SPEC_VALUE_TYPE (pspec)) + { + g_value_reset (value); + prop_value = value; + } + else if (!g_value_type_transformable (G_PARAM_SPEC_VALUE_TYPE (pspec), G_VALUE_TYPE (value))) + { + g_warning ("can't retrieve property `%s' of type `%s' as value of type `%s'", + pspec->name, + g_type_name (G_PARAM_SPEC_VALUE_TYPE (pspec)), + G_VALUE_TYPE_NAME (value)); + g_object_unref (object); + return; + } + else + { + g_value_init (&tmp_value, G_PARAM_SPEC_VALUE_TYPE (pspec)); + prop_value = &tmp_value; + } + element_get_property (element, pspec, prop_value); + if (prop_value != value) + { + g_value_transform (prop_value, value); + g_value_unset (&tmp_value); + } + } + + g_object_unref (object); +} + static GstPad* gst_element_request_pad (GstElement *element, GstPadTemplate *templ, const gchar* name) { @@ -1618,6 +1952,12 @@ gst_element_dispose (GObject *object) g_mutex_free (element->state_mutex); g_cond_free (element->state_cond); + if (element->prop_value_queue) + g_async_queue_unref (element->prop_value_queue); + element->prop_value_queue = NULL; + if (element->property_mutex) + g_mutex_free (element->property_mutex); + G_OBJECT_CLASS (parent_class)->dispose (object); } @@ -1897,7 +2237,7 @@ gst_element_state_get_name (GstElementState state) static void gst_element_populate_std_props (GObjectClass * klass, - const char *prop_name, guint arg_id, GParamFlags flags) + const gchar *prop_name, guint arg_id, GParamFlags flags) { GQuark prop_id = g_quark_from_string (prop_name); GParamSpec *pspec; @@ -2004,7 +2344,7 @@ gst_element_populate_std_props (GObjectClass * klass, * the flags determine readability / writeability. **/ void -gst_element_class_install_std_props (GstElementClass * klass, const char *first_name, ...) +gst_element_class_install_std_props (GstElementClass * klass, const gchar *first_name, ...) { const char *name; diff --git a/gst/gstelement.h b/gst/gstelement.h index a8a6a6c..73d7ba8 100644 --- a/gst/gstelement.h +++ b/gst/gstelement.h @@ -32,9 +32,7 @@ #include #include -#ifdef __cplusplus -extern "C" { -#endif /* __cplusplus */ +G_BEGIN_DECLS #define GST_NUM_STATES 4 @@ -80,23 +78,22 @@ typedef enum { GST_ELEMENT_THREAD_SUGGESTED, /* this element is incable of seeking (FIXME: does this apply to filters?) */ GST_ELEMENT_NO_SEEK, - /* this element, for some reason, has a loop function that performs * an infinite loop without calls to gst_element_yield () */ GST_ELEMENT_INFINITE_LOOP, - - /* private flags that can be used by the scheduler */ - GST_ELEMENT_SCHEDULER_PRIVATE1, - GST_ELEMENT_SCHEDULER_PRIVATE2, - /* there is a new loopfunction ready for placement */ GST_ELEMENT_NEW_LOOPFUNC, - /* if this element can handle events */ GST_ELEMENT_EVENT_AWARE, + /* use threadsafe property get/set implementation */ + GST_ELEMENT_USE_THREADSAFE_PROPERTIES, + + /* private flags that can be used by the scheduler */ + GST_ELEMENT_SCHEDULER_PRIVATE1, + GST_ELEMENT_SCHEDULER_PRIVATE2, /* use some padding for future expansion */ - GST_ELEMENT_FLAG_LAST = GST_OBJECT_FLAG_LAST + 12, + GST_ELEMENT_FLAG_LAST = GST_OBJECT_FLAG_LAST + 16, } GstElementFlags; #define GST_ELEMENT_IS_THREAD_SUGGESTED(obj) (GST_FLAG_IS_SET(obj,GST_ELEMENT_THREAD_SUGGESTED)) @@ -111,14 +108,14 @@ typedef enum { #define GST_ELEMENT_CLOCK(obj) (((GstElement*)(obj))->clock) #define GST_ELEMENT_PADS(obj) ((obj)->pads) -/*typedef struct _GstElement GstElement;*/ -/*typedef struct _GstElementClass GstElementClass;*/ typedef struct _GstElementFactory GstElementFactory; typedef struct _GstElementFactoryClass GstElementFactoryClass; typedef void (*GstElementLoopFunction) (GstElement *element); typedef void (*GstElementSetClockFunction) (GstElement *element, GstClock *clock); typedef GstClock* (*GstElementGetClockFunction) (GstElement *element); +typedef void (*GstElementPreRunFunction) (GstElement *element); +typedef void (*GstElementPostRunFunction) (GstElement *element); struct _GstElement { GstObject object; @@ -143,6 +140,11 @@ struct _GstElement { GMutex *state_mutex; GCond *state_cond; + + GstElementPreRunFunction pre_run_func; + GstElementPostRunFunction post_run_func; + GAsyncQueue *prop_value_queue; + GMutex *property_mutex; }; struct _GstElementClass { @@ -155,12 +157,12 @@ struct _GstElementClass { gint numpadtemplates; /* signal callbacks */ - void (*state_change) (GstElement *element, GstElementState old, GstElementState state); - void (*new_pad) (GstElement *element, GstPad *pad); - void (*pad_removed) (GstElement *element, GstPad *pad); - void (*error) (GstElement *element, GstElement *source, gchar *error); - void (*eos) (GstElement *element); - void (*deep_notify) (GstObject *object, GstObject *orig, GParamSpec *pspec); + void (*state_change) (GstElement *element, GstElementState old, GstElementState state); + void (*new_pad) (GstElement *element, GstPad *pad); + void (*pad_removed) (GstElement *element, GstPad *pad); + void (*error) (GstElement *element, GstElement *source, gchar *error); + void (*eos) (GstElement *element); + void (*deep_notify) (GstObject *object, GstObject *orig, GParamSpec *pspec); /* local pointers for get/set */ void (*set_property) (GObject *object, guint prop_id, const GValue *value, GParamSpec *pspec); @@ -175,6 +177,9 @@ struct _GstElementClass { }; void gst_element_class_add_pad_template (GstElementClass *klass, GstPadTemplate *templ); +void gst_element_class_install_std_props (GstElementClass *klass, + const gchar *first_name, ...); + GType gst_element_get_type (void); #define gst_element_destroy(element) gst_object_destroy (GST_OBJECT (element)) @@ -182,6 +187,22 @@ GType gst_element_get_type (void); void gst_element_set_loop_function (GstElement *element, GstElementLoopFunction loop); +/* threadsafe versions of their g_object_* counterparts */ +void gst_element_set (GstElement *element, const gchar *first_property_name, ...); +void gst_element_get (GstElement *element, const gchar *first_property_name, ...); +void gst_element_set_valist (GstElement *element, const gchar *first_property_name, + va_list var_args); +void gst_element_get_valist (GstElement *element, const gchar *first_property_name, + va_list var_args); +void gst_element_set_property (GstElement *element, const gchar *property_name, + const GValue *value); +void gst_element_get_property (GstElement *element, const gchar *property_name, + GValue *value); + +void gst_element_enable_threadsafe_properties (GstElement *element); +void gst_element_disable_threadsafe_properties (GstElement *element); +void gst_element_set_pending_properties (GstElement *element); + void gst_element_set_name (GstElement *element, const gchar *name); const gchar* gst_element_get_name (GstElement *element); @@ -248,9 +269,6 @@ const gchar* gst_element_state_get_name (GstElementState state); GstElementFactory* gst_element_get_factory (GstElement *element); -void gst_element_class_install_std_props (GstElementClass *klass, - const char *first_name, ...); - GstBin* gst_element_get_managing_bin (GstElement *element); @@ -314,9 +332,7 @@ GstElement* gst_element_factory_create (GstElementFactory *factory, /* FIXME this name is wrong, probably so is the one above it */ GstElement* gst_element_factory_make (const gchar *factoryname, const gchar *name); -#ifdef __cplusplus -} -#endif /* __cplusplus */ +G_END_DECLS #endif /* __GST_ELEMENT_H__ */ diff --git a/gst/gstobject.h b/gst/gstobject.h index 33c3eb6..74a05c9 100644 --- a/gst/gstobject.h +++ b/gst/gstobject.h @@ -84,7 +84,7 @@ struct _GstObject { /* locking for all sorts of things (like the refcount) */ GMutex *lock; - /* this objects parent */ + /* this object's parent */ GstObject *parent; guint32 flags; diff --git a/gst/gstpad.c b/gst/gstpad.c index b0b9451..229f411 100644 --- a/gst/gstpad.c +++ b/gst/gstpad.c @@ -1791,7 +1791,9 @@ gst_pad_push (GstPad *pad, GstBuffer *buf) GstBuffer* gst_pad_pull (GstPad *pad) { - GstRealPad *peer = GST_RPAD_PEER(pad); + GstRealPad *peer; + + peer = GST_RPAD_PEER (pad); GST_DEBUG_ENTER("(%s:%s)",GST_DEBUG_PAD_NAME(pad)); @@ -1811,8 +1813,10 @@ gst_pad_pull (GstPad *pad) GST_DEBUG_FUNCPTR_NAME (peer->gethandler), GST_DEBUG_PAD_NAME (peer)); buf = (peer->gethandler) (GST_PAD_CAST (peer)); + if (buf) return buf; + /* no null buffers allowed */ gst_element_error (GST_PAD_PARENT (pad), "NULL buffer during pull on %s:%s", GST_DEBUG_PAD_NAME (pad), NULL); diff --git a/gst/gstthread.c b/gst/gstthread.c index 7efe801..58fe4ec 100644 --- a/gst/gstthread.c +++ b/gst/gstthread.c @@ -290,6 +290,14 @@ gst_thread_change_state (GstElement * element) g_mutex_unlock (thread->lock); break; case GST_STATE_PAUSED_TO_PLAYING: + { + /* fixme: recurse into sub-bins */ + const GList *elements = gst_bin_get_list (GST_BIN (thread)); + while (elements) { + gst_element_enable_threadsafe_properties ((GstElement*)elements->data); + elements = g_list_next (elements); + } + THR_DEBUG ("telling thread to start spinning"); g_mutex_lock (thread->lock); THR_DEBUG ("signaling"); @@ -299,9 +307,10 @@ gst_thread_change_state (GstElement * element) THR_DEBUG ("got ack"); g_mutex_unlock (thread->lock); break; + } case GST_STATE_PLAYING_TO_PAUSED: { - GList *elements = (GList *) gst_bin_get_list (GST_BIN (thread)); + const GList *elements = (GList *) gst_bin_get_list (GST_BIN (thread)); THR_INFO ("pausing thread"); @@ -374,6 +383,8 @@ gst_thread_change_state (GstElement * element) } } } + + gst_element_disable_threadsafe_properties (element); } THR_DEBUG ("telling thread to pause, signaling"); g_cond_signal (thread->cond); diff --git a/gst/schedulers/gstbasicscheduler.c b/gst/schedulers/gstbasicscheduler.c index 32b847f..c2bb846 100644 --- a/gst/schedulers/gstbasicscheduler.c +++ b/gst/schedulers/gstbasicscheduler.c @@ -21,7 +21,7 @@ */ /*#define GST_DEBUG_ENABLED */ -#include +#include "../gst.h" #include "cothreads_compat.h" @@ -62,6 +62,7 @@ struct _GstSchedulerChain { (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_BASIC_SCHEDULER)) #define GST_BASIC_SCHEDULER_CAST(sched) ((GstBasicScheduler *)(sched)) +#define SCHED(element) GST_BASIC_SCHEDULER_CAST (GST_ELEMENT_SCHED (element)) typedef enum { GST_BASIC_SCHEDULER_STATE_NONE, @@ -87,6 +88,7 @@ struct _GstBasicScheduler { GstBasicSchedulerState state; cothread_context *context; + GstElement *current; }; struct _GstBasicSchedulerClass { @@ -123,6 +125,18 @@ static void gst_basic_scheduler_show (GstScheduler *sched); static GstSchedulerClass *parent_class = NULL; +#define do_element_switch(element) G_STMT_START{ \ + GstElement *from = SCHED (element)->current; \ + if (from->post_run_func) \ + from->post_run_func (from); \ + SCHED (element)->current = element; \ + do_cothread_switch (GST_ELEMENT_THREADSTATE (element)); \ + /* we assume other cothread switches will set ->current \ + * properly, no need to do it from this side */ \ + if (from->pre_run_func) \ + from->pre_run_func (from); \ +}G_STMT_END + static GType gst_basic_scheduler_get_type (void) { @@ -312,6 +326,9 @@ gst_basic_scheduler_src_wrapper (int argc, char *argv[]) GST_DEBUG_ENTER ("(%d,\"%s\")", argc, name); + if (element->pre_run_func) + element->pre_run_func (element); + do { pads = element->pads; while (pads) { @@ -358,8 +375,12 @@ gst_basic_scheduler_src_wrapper (int argc, char *argv[]) static void gst_basic_scheduler_chainhandler_proxy (GstPad * pad, GstBuffer * buf) { - GstRealPad *peer = GST_RPAD_PEER (pad); gint loop_count = 100; + GstElement *parent; + GstRealPad *peer; + + parent = GST_PAD_PARENT (pad); + peer = GST_RPAD_PEER (pad); GST_DEBUG_ENTER ("(%s:%s)", GST_DEBUG_PAD_NAME (pad)); GST_DEBUG (GST_CAT_DATAFLOW, "putting buffer %p in peer \"%s:%s\"'s pen", buf, @@ -370,8 +391,9 @@ gst_basic_scheduler_chainhandler_proxy (GstPad * pad, GstBuffer * buf) */ while (GST_RPAD_BUFPEN (GST_RPAD_PEER (pad)) != NULL && --loop_count) { GST_DEBUG (GST_CAT_DATAFLOW, "switching to %p to empty bufpen %d", - GST_ELEMENT_THREADSTATE (GST_PAD_PARENT (pad)), loop_count); - do_cothread_switch (GST_ELEMENT_THREADSTATE (GST_PAD_PARENT (pad))); + GST_ELEMENT_THREADSTATE (parent), loop_count); + + do_element_switch (parent); /* we may no longer be the same pad, check. */ if (GST_RPAD_PEER (peer) != (GstRealPad *) pad) { @@ -381,16 +403,19 @@ gst_basic_scheduler_chainhandler_proxy (GstPad * pad, GstBuffer * buf) } if (loop_count == 0) { - gst_element_error (GST_PAD_PARENT (pad), + gst_element_error (parent, "(internal error) maximum number of switches exceeded"); return; } + g_assert (GST_RPAD_BUFPEN (GST_RPAD_PEER (pad)) == NULL); + /* now fill the bufferpen and switch so it can be consumed */ GST_RPAD_BUFPEN (GST_RPAD_PEER (pad)) = buf; GST_DEBUG (GST_CAT_DATAFLOW, "switching to %p", GST_ELEMENT_THREADSTATE (GST_PAD_PARENT (pad))); - do_cothread_switch (GST_ELEMENT_THREADSTATE (GST_PAD_PARENT (pad))); + + do_element_switch (parent); GST_DEBUG (GST_CAT_DATAFLOW, "done switching"); } @@ -398,6 +423,10 @@ gst_basic_scheduler_chainhandler_proxy (GstPad * pad, GstBuffer * buf) static void gst_basic_scheduler_select_proxy (GstPad * pad, GstBuffer * buf) { + GstElement *parent; + + parent = GST_PAD_PARENT (pad); + GST_DEBUG_ENTER ("(%s:%s)", GST_DEBUG_PAD_NAME (pad)); GST_DEBUG (GST_CAT_DATAFLOW, "putting buffer %p in peer's pen", buf); @@ -406,11 +435,11 @@ gst_basic_scheduler_select_proxy (GstPad * pad, GstBuffer * buf) /* now fill the bufferpen and switch so it can be consumed */ GST_RPAD_BUFPEN (GST_RPAD_PEER (pad)) = buf; GST_DEBUG (GST_CAT_DATAFLOW, "switching to %p", - GST_ELEMENT_THREADSTATE (GST_PAD_PARENT (pad))); - GST_ELEMENT (GST_PAD_PARENT (pad))->select_pad = pad; - - do_cothread_switch (GST_ELEMENT_THREADSTATE (GST_PAD_PARENT (pad))); + GST_ELEMENT_THREADSTATE (parent)); + parent->select_pad = pad; + do_element_switch (parent); + GST_DEBUG (GST_CAT_DATAFLOW, "done switching"); } @@ -419,7 +448,11 @@ static GstBuffer * gst_basic_scheduler_gethandler_proxy (GstPad * pad) { GstBuffer *buf; - GstRealPad *peer = GST_RPAD_PEER (pad); + GstElement *parent; + GstRealPad *peer; + + parent = GST_PAD_PARENT (pad); + peer = GST_RPAD_PEER (pad); GST_DEBUG_ENTER ("(%s:%s)", GST_DEBUG_PAD_NAME (pad)); @@ -427,16 +460,17 @@ gst_basic_scheduler_gethandler_proxy (GstPad * pad) /* we will loop switching to the peer until it's filled up the bufferpen */ while (GST_RPAD_BUFPEN (pad) == NULL) { GST_DEBUG (GST_CAT_DATAFLOW, "switching to \"%s\": %p to fill bufpen", - GST_ELEMENT_NAME (GST_ELEMENT (GST_PAD_PARENT (pad))), - GST_ELEMENT_THREADSTATE (GST_PAD_PARENT (pad))); - do_cothread_switch (GST_ELEMENT_THREADSTATE (GST_PAD_PARENT (pad))); + GST_ELEMENT_NAME (parent), + GST_ELEMENT_THREADSTATE (parent)); + + do_element_switch (parent); /* we may no longer be the same pad, check. */ if (GST_RPAD_PEER (peer) != (GstRealPad *) pad) { GST_DEBUG (GST_CAT_DATAFLOW, "new pad in mid-switch!"); pad = (GstPad *) GST_RPAD_PEER (peer); if (!pad) { - gst_element_error (GST_ELEMENT (GST_PAD_PARENT (peer)), "pad unconnected"); + gst_element_error (parent, "pad unconnected"); } } } @@ -453,7 +487,11 @@ static GstBuffer * gst_basic_scheduler_pullregionfunc_proxy (GstPad * pad, GstRegionType type, guint64 offset, guint64 len) { GstBuffer *buf; - GstRealPad *peer = GST_RPAD_PEER (pad); + GstElement *parent; + GstRealPad *peer; + + parent = GST_PAD_PARENT (pad); + peer = GST_RPAD_PEER (pad); GST_DEBUG_ENTER ("%s:%s,%d,%lld,%lld", GST_DEBUG_PAD_NAME (pad), type, offset, len); @@ -466,8 +504,9 @@ gst_basic_scheduler_pullregionfunc_proxy (GstPad * pad, GstRegionType type, guin /* we will loop switching to the peer until it's filled up the bufferpen */ while (GST_RPAD_BUFPEN (pad) == NULL) { GST_DEBUG (GST_CAT_DATAFLOW, "switching to %p to fill bufpen", - GST_ELEMENT_THREADSTATE (GST_PAD_PARENT (pad))); - do_cothread_switch (GST_ELEMENT_THREADSTATE (GST_PAD_PARENT (pad))); + GST_ELEMENT_THREADSTATE (parent)); + + do_element_switch (parent); /* we may no longer be the same pad, check. */ if (GST_RPAD_PEER (peer) != (GstRealPad *) pad) { @@ -1064,7 +1103,13 @@ static void gst_basic_scheduler_yield (GstScheduler *sched, GstElement *element) { if (GST_ELEMENT_IS_COTHREAD_STOPPING (element)) { + if (element->post_run_func) + element->post_run_func (element); + + SCHED (element)->current = NULL; do_cothread_switch (do_cothread_get_main (((GstBasicScheduler *) sched)->context)); + + /* no need to do a pre_run, the cothread is stopping */ } } @@ -1072,6 +1117,11 @@ static gboolean gst_basic_scheduler_interrupt (GstScheduler *sched, GstElement *element) { GST_FLAG_SET (element, GST_ELEMENT_COTHREAD_STOPPING); + + if (element->post_run_func) + element->post_run_func (element); + + SCHED (element)->current = NULL; do_cothread_switch (do_cothread_get_main (((GstBasicScheduler *) sched)->context)); return FALSE; @@ -1091,6 +1141,10 @@ gst_basic_scheduler_error (GstScheduler *sched, GstElement *element) GST_SCHEDULER_STATE (sched) = GST_SCHEDULER_STATE_ERROR; + if (element->post_run_func) + element->post_run_func (element); + + SCHED (element)->current = NULL; do_cothread_switch (do_cothread_get_main (((GstBasicScheduler *) sched)->context)); } } @@ -1192,7 +1246,7 @@ gst_basic_scheduler_pad_select (GstScheduler * sched, GList * padlist) if (pad != NULL) { GstRealPad *peer = GST_RPAD_PEER (pad); - do_cothread_switch (GST_ELEMENT_THREADSTATE (GST_PAD_PARENT (peer))); + do_element_switch (GST_PAD_PARENT (peer)); pad = GST_ELEMENT (GST_PAD_PARENT (pad))->select_pad; @@ -1266,7 +1320,15 @@ gst_basic_scheduler_iterate (GstScheduler * sched) GST_DEBUG (GST_CAT_DATAFLOW, "set COTHREAD_STOPPING flag on \"%s\"(@%p)", GST_ELEMENT_NAME (entry), entry); if (GST_ELEMENT_THREADSTATE (entry)) { + if (entry->pre_run_func) + entry->pre_run_func (entry); + + bsched->current = entry; do_cothread_switch (GST_ELEMENT_THREADSTATE (entry)); + + if (bsched->current && bsched->current->post_run_func) + bsched->current->post_run_func (bsched->current); + state = GST_SCHEDULER_STATE (sched); /* if something changed, return - go on else */ if (GST_FLAG_IS_SET(bsched, GST_BASIC_SCHEDULER_CHANGE) && -- 2.7.4