From e7f54b1e74ace8ccd7dcbda38f706d685b762f1f Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Thu, 27 Dec 2001 00:47:41 +0000 Subject: [PATCH] - Added a function to get the currently executing cothread Original commit message from CVS: - Added a function to get the currently executing cothread - Removed some useless includes - _interrupt now returns a boolean so the behaviour after the interrupt can be controlled by the scheduler. - Added a better way to set/get the default scheduler. - make thread and pipeline get the default scheduler. --- gst/cothreads.c | 15 +++++++++++++++ gst/cothreads.h | 2 +- gst/gst.h | 1 - gst/gstbin.h | 1 - gst/gstelement.c | 7 ++++--- gst/gstelement.h | 4 ++-- gst/gstpad.c | 4 +++- gst/gstpad.h | 1 - gst/gstpipeline.c | 13 ++++++++----- gst/gstpipeline.h | 1 + gst/gstqueue.c | 10 +++++----- gst/gstscheduler.c | 31 +++++++++++++++++++++++++++---- gst/gstscheduler.h | 7 +++++-- gst/gstthread.c | 29 +++++++++++++++++++---------- gst/schedulers/gstbasicscheduler.c | 8 ++++++-- plugins/elements/gstqueue.c | 10 +++++----- 16 files changed, 101 insertions(+), 43 deletions(-) diff --git a/gst/cothreads.c b/gst/cothreads.c index bbc91ee..a37ea45 100644 --- a/gst/cothreads.c +++ b/gst/cothreads.c @@ -308,6 +308,21 @@ cothread_current_main (void) return ctx->threads[0]; } +/** + * cothread_current: + * + * Get the currenttly executing cothread + * + * Returns: the #cothread_state of the current cothread + */ +cothread_state * +cothread_current (void) +{ + cothread_context *ctx = pthread_getspecific (_cothread_key); + + return ctx->threads[ctx->current]; +} + static void cothread_stub (void) { diff --git a/gst/cothreads.h b/gst/cothreads.h index 4fe370a..4bb06f8 100644 --- a/gst/cothreads.h +++ b/gst/cothreads.h @@ -76,7 +76,6 @@ void cothread_setfunc (cothread_state *thread, cothread_func func, int argc, char **argv); void cothread_stop (cothread_state *thread); -int cothread_getcurrent (void); void cothread_switch (cothread_state *thread); void cothread_set_data (cothread_state *thread, gchar *key, gpointer data); gpointer cothread_get_data (cothread_state *thread, gchar *key); @@ -87,5 +86,6 @@ void cothread_unlock (cothread_state *thread); cothread_state* cothread_main (cothread_context *ctx); cothread_state* cothread_current_main (void); +cothread_state* cothread_current (void); #endif /* __COTHREAD_H__ */ diff --git a/gst/gst.h b/gst/gst.h index 5fed063..ff3e641 100644 --- a/gst/gst.h +++ b/gst/gst.h @@ -47,7 +47,6 @@ #include #include #include -#include #include #include #include diff --git a/gst/gstbin.h b/gst/gstbin.h index ae9fe1c..ddef90a 100644 --- a/gst/gstbin.h +++ b/gst/gstbin.h @@ -25,7 +25,6 @@ #define __GST_BIN_H__ #include -#include #ifdef __cplusplus extern "C" { diff --git a/gst/gstelement.c b/gst/gstelement.c index f333539..679fb96 100644 --- a/gst/gstelement.c +++ b/gst/gstelement.c @@ -932,7 +932,6 @@ gst_element_set_state (GstElement *element, GstElementState state) g_return_val_if_fail (element != NULL, GST_STATE_FAILURE); g_return_val_if_fail (GST_IS_ELEMENT (element), GST_STATE_FAILURE); - g_return_val_if_fail (element->sched != NULL, GST_STATE_FAILURE); GST_DEBUG_ELEMENT (GST_CAT_STATES,element, "setting state from %s to %s\n", gst_element_statename(GST_STATE(element)), @@ -1292,12 +1291,14 @@ gst_element_yield (GstElement *element) } } -void +gboolean gst_element_interrupt (GstElement *element) { if (GST_ELEMENT_SCHED (element)) { - gst_scheduler_interrupt (GST_ELEMENT_SCHED (element), element); + return gst_scheduler_interrupt (GST_ELEMENT_SCHED (element), element); } + else + return FALSE; } /** diff --git a/gst/gstelement.h b/gst/gstelement.h index cf8642f..fa8fd48 100644 --- a/gst/gstelement.h +++ b/gst/gstelement.h @@ -29,7 +29,6 @@ #include #include #include -#include #include #ifdef __cplusplus @@ -103,6 +102,7 @@ typedef enum { #define GST_ELEMENT_IS_THREAD_SUGGESTED(obj) (GST_FLAG_IS_SET(obj,GST_ELEMENT_THREAD_SUGGESTED)) #define GST_ELEMENT_IS_EOS(obj) (GST_FLAG_IS_SET(obj,GST_ELEMENT_EOS)) #define GST_ELEMENT_IS_EVENT_AWARE(obj) (GST_FLAG_IS_SET(obj,GST_ELEMENT_EVENT_AWARE)) +#define GST_ELEMENT_IS_DECOUPLED(obj) (GST_FLAG_IS_SET(obj,GST_ELEMENT_DECOUPLED)) #define GST_ELEMENT_NAME(obj) (GST_OBJECT_NAME(obj)) #define GST_ELEMENT_PARENT(obj) (GST_OBJECT_PARENT(obj)) @@ -184,7 +184,7 @@ void gst_element_set_parent (GstElement *element, Gs GstObject* gst_element_get_parent (GstElement *element); void gst_element_yield (GstElement *element); -void gst_element_interrupt (GstElement *element); +gboolean gst_element_interrupt (GstElement *element); void gst_element_set_sched (GstElement *element, GstScheduler *sched); GstScheduler* gst_element_get_sched (GstElement *element); diff --git a/gst/gstpad.c b/gst/gstpad.c index 9e64db8..847eb9d 100644 --- a/gst/gstpad.c +++ b/gst/gstpad.c @@ -494,7 +494,9 @@ gst_pad_push_func(GstPad *pad, GstBuffer *buf) GST_DEBUG_FUNCPTR_NAME(GST_RPAD_CHAINFUNC(GST_RPAD_PEER(pad)))); (GST_RPAD_CHAINFUNC(GST_RPAD_PEER(pad)))(pad,buf); } else { - GST_DEBUG (GST_CAT_DATAFLOW,"got a problem here: default pad_push handler in place, no chain function\n"); + GST_DEBUG (GST_CAT_DATAFLOW,"default pad_push handler in place, no chain function\n"); + g_warning ("(internal error) default pad_push in place for pad %s:%s but it has no chain function", + GST_DEBUG_PAD_NAME (pad)); } } diff --git a/gst/gstpad.h b/gst/gstpad.h index bf69928..5f572f0 100644 --- a/gst/gstpad.h +++ b/gst/gstpad.h @@ -28,7 +28,6 @@ #include #include -#include #include #include diff --git a/gst/gstpipeline.c b/gst/gstpipeline.c index dd1b459..95f4697 100644 --- a/gst/gstpipeline.c +++ b/gst/gstpipeline.c @@ -97,21 +97,22 @@ gst_pipeline_class_init (GstPipelineClass *klass) static void gst_pipeline_init (GstPipeline *pipeline) { + const gchar *schedname; GstScheduler *scheduler; /* we're a manager by default */ GST_FLAG_SET (pipeline, GST_BIN_FLAG_MANAGER); - scheduler = gst_schedulerfactory_make ("basic", GST_ELEMENT (pipeline)); + schedname = gst_schedulerfactory_get_default_name (); + scheduler = gst_schedulerfactory_make (schedname, GST_ELEMENT (pipeline)); + GST_ELEMENT_SCHED (pipeline) = scheduler; gst_object_ref (GST_OBJECT (scheduler)); gst_object_sink (GST_OBJECT (scheduler)); gst_scheduler_setup (scheduler); - - GST_DEBUG (GST_CAT_PIPELINE, "pipeline's scheduler is %p\n", scheduler); } static void @@ -121,8 +122,10 @@ gst_pipeline_dispose (GObject *object) G_OBJECT_CLASS (parent_class)->dispose (object); - gst_scheduler_reset (GST_ELEMENT_SCHED (pipeline)); - gst_object_unref (GST_OBJECT (GST_ELEMENT_SCHED (pipeline))); + if (GST_ELEMENT_SCHED (pipeline)) { + gst_scheduler_reset (GST_ELEMENT_SCHED (pipeline)); + gst_object_unref (GST_OBJECT (GST_ELEMENT_SCHED (pipeline))); + } } /** diff --git a/gst/gstpipeline.h b/gst/gstpipeline.h index 3c0dbf0..7539c04 100644 --- a/gst/gstpipeline.h +++ b/gst/gstpipeline.h @@ -58,6 +58,7 @@ GType gst_pipeline_get_type (void); GstElement* gst_pipeline_new (const gchar *name); #define gst_pipeline_destroy(pipeline) gst_object_destroy(GST_OBJECT(pipeline)) + #ifdef __cplusplus } #endif /* __cplusplus */ diff --git a/gst/gstqueue.c b/gst/gstqueue.c index 140562e..3020513 100644 --- a/gst/gstqueue.c +++ b/gst/gstqueue.c @@ -284,7 +284,6 @@ gst_queue_chain (GstPad *pad, GstBuffer *buf) queue = GST_QUEUE (GST_OBJECT_PARENT (pad)); reader = FALSE; - restart: /* we have to lock the queue since we span threads */ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locking t:%ld\n", pthread_self ()); @@ -353,7 +352,8 @@ restart: while (GST_STATE_PENDING (queue) != GST_STATE_VOID_PENDING) { GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "interrupted!!\n"); g_mutex_unlock (queue->qlock); - gst_element_interrupt (GST_ELEMENT (queue)); + if (gst_element_interrupt (GST_ELEMENT (queue))) + return; goto restart; } if (GST_STATE (queue) != GST_STATE_PLAYING) { @@ -419,7 +419,6 @@ gst_queue_get (GstPad *pad) queue = GST_QUEUE (GST_OBJECT_PARENT (pad)); writer = FALSE; - restart: /* have to lock for thread-safety */ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locking t:%ld\n", pthread_self ()); @@ -434,7 +433,8 @@ restart: while (GST_STATE_PENDING (queue) != GST_STATE_VOID_PENDING) { GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "interrupted!!\n"); g_mutex_unlock (queue->qlock); - gst_element_interrupt (GST_ELEMENT (queue)); + if (gst_element_interrupt (GST_ELEMENT (queue))) + return NULL; goto restart; } if (GST_STATE (queue) != GST_STATE_PLAYING) { @@ -442,7 +442,7 @@ restart: if (!queue->may_deadlock) { g_mutex_unlock (queue->qlock); gst_element_error (GST_ELEMENT (queue), "deadlock found, sink pad elements are shut down"); - return NULL; + goto restart; } else { gst_element_info (GST_ELEMENT (queue), "waiting for the app to restart sink pad elements"); diff --git a/gst/gstscheduler.c b/gst/gstscheduler.c index cfa59c8..c08cbbb 100644 --- a/gst/gstscheduler.c +++ b/gst/gstscheduler.c @@ -31,6 +31,8 @@ static void gst_scheduler_init (GstScheduler *sched); static GstObjectClass *parent_class = NULL; +static gchar *_default_name = NULL; + GType gst_scheduler_get_type (void) { @@ -282,15 +284,20 @@ gst_scheduler_yield (GstScheduler *sched, GstElement *element) * @element: the element requesting an interrupt * * Tell the scheduler to interrupt execution of this element. + * + * Retruns: TRUE if the element should return NULL from the chain/get + * function. */ -void +gboolean gst_scheduler_interrupt (GstScheduler *sched, GstElement *element) { - g_return_if_fail (GST_IS_SCHEDULER (sched)); - g_return_if_fail (GST_IS_ELEMENT (element)); + g_return_val_if_fail (GST_IS_SCHEDULER (sched), FALSE); + g_return_val_if_fail (GST_IS_ELEMENT (element), FALSE); if (CLASS (sched)->interrupt) - CLASS (sched)->interrupt (sched, element); + return CLASS (sched)->interrupt (sched, element); + + return FALSE; } /** @@ -387,6 +394,7 @@ gst_schedulerfactory_class_init (GstSchedulerFactoryClass *klass) #endif _gst_schedulerfactories = NULL; + _default_name = g_strdup ("basic"); } static void @@ -536,6 +544,21 @@ gst_schedulerfactory_make (const gchar *name, GstElement *parent) return gst_schedulerfactory_create (factory, parent); } +void +gst_schedulerfactory_set_default_name (const gchar* name) +{ + if (_default_name) + g_free (_default_name); + + _default_name = g_strdup (name); +} + +const gchar* +gst_schedulerfactory_get_default_name (void) +{ + return _default_name; +} + #ifndef GST_DISABLE_REGISTRY static xmlNodePtr gst_schedulerfactory_save_thyself (GstObject *object, xmlNodePtr parent) diff --git a/gst/gstscheduler.h b/gst/gstscheduler.h index a3b8af2..ef69975 100644 --- a/gst/gstscheduler.h +++ b/gst/gstscheduler.h @@ -77,7 +77,7 @@ struct _GstSchedulerClass { void (*lock_element) (GstScheduler *sched, GstElement *element); void (*unlock_element) (GstScheduler *sched, GstElement *element); void (*yield) (GstScheduler *sched, GstElement *element); - void (*interrupt) (GstScheduler *sched, GstElement *element); + gboolean (*interrupt) (GstScheduler *sched, GstElement *element); void (*error) (GstScheduler *sched, GstElement *element); void (*pad_connect) (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad); void (*pad_disconnect) (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad); @@ -101,7 +101,7 @@ GstElementStateReturn gst_scheduler_state_transition (GstScheduler *sched, GstEl void gst_scheduler_lock_element (GstScheduler *sched, GstElement *element); void gst_scheduler_unlock_element (GstScheduler *sched, GstElement *element); void gst_scheduler_yield (GstScheduler *sched, GstElement *element); -void gst_scheduler_interrupt (GstScheduler *sched, GstElement *element); +gboolean gst_scheduler_interrupt (GstScheduler *sched, GstElement *element); void gst_scheduler_error (GstScheduler *sched, GstElement *element); void gst_scheduler_pad_connect (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad); void gst_scheduler_pad_disconnect (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad); @@ -150,6 +150,9 @@ GList* gst_schedulerfactory_get_list (void); GstScheduler* gst_schedulerfactory_create (GstSchedulerFactory *factory, GstElement *parent); GstScheduler* gst_schedulerfactory_make (const gchar *name, GstElement *parent); +void gst_schedulerfactory_set_default_name (const gchar* name); +const gchar* gst_schedulerfactory_get_default_name (void); + #ifdef __cplusplus } diff --git a/gst/gstthread.c b/gst/gstthread.c index 321fe62..7743f52 100644 --- a/gst/gstthread.c +++ b/gst/gstthread.c @@ -133,19 +133,27 @@ gst_thread_class_init (GstThreadClass *klass) static void gst_thread_init (GstThread *thread) { + const gchar *schedname; + GstScheduler *scheduler; - GST_DEBUG (GST_CAT_THREAD,"initializing thread\n"); + GST_DEBUG (GST_CAT_THREAD, "initializing thread\n"); /* we're a manager by default */ GST_FLAG_SET (thread, GST_BIN_FLAG_MANAGER); - thread->lock = g_mutex_new(); - thread->cond = g_cond_new(); + schedname = gst_schedulerfactory_get_default_name (); - GST_ELEMENT_SCHED(thread) = gst_schedulerfactory_make ("basic", GST_ELEMENT(thread)); - GST_DEBUG(GST_CAT_THREAD, "thread's scheduler is %p\n",GST_ELEMENT_SCHED(thread)); + scheduler = gst_schedulerfactory_make (schedname, GST_ELEMENT (thread)); - thread->ppid = getpid(); + GST_ELEMENT_SCHED (thread) = scheduler; + + gst_object_ref (GST_OBJECT (scheduler)); + gst_object_sink (GST_OBJECT (scheduler)); + + thread->lock = g_mutex_new (); + thread->cond = g_cond_new (); + + thread->ppid = getpid (); thread->thread_id = -1; } @@ -154,16 +162,17 @@ gst_thread_dispose (GObject *object) { GstThread *thread = GST_THREAD (object); - GST_DEBUG (GST_CAT_REFCOUNTING,"dispose\n"); + GST_DEBUG (GST_CAT_REFCOUNTING, "dispose\n"); g_mutex_free (thread->lock); g_cond_free (thread->cond); G_OBJECT_CLASS (parent_class)->dispose (object); - gst_object_destroy (GST_OBJECT (GST_ELEMENT_SCHED (thread))); - gst_object_unref (GST_OBJECT (GST_ELEMENT_SCHED (thread))); - + if (GST_ELEMENT_SCHED (thread)) { + gst_object_destroy (GST_OBJECT (GST_ELEMENT_SCHED (thread))); + gst_object_unref (GST_OBJECT (GST_ELEMENT_SCHED (thread))); + } } static void diff --git a/gst/schedulers/gstbasicscheduler.c b/gst/schedulers/gstbasicscheduler.c index 329e0bc..0fc128b 100644 --- a/gst/schedulers/gstbasicscheduler.c +++ b/gst/schedulers/gstbasicscheduler.c @@ -101,7 +101,7 @@ static GstElementStateReturn static void gst_basic_scheduler_lock_element (GstScheduler *sched, GstElement *element); static void gst_basic_scheduler_unlock_element (GstScheduler *sched, GstElement *element); static void gst_basic_scheduler_yield (GstScheduler *sched, GstElement *element); -static void gst_basic_scheduler_interrupt (GstScheduler *sched, GstElement *element); +static gboolean gst_basic_scheduler_interrupt (GstScheduler *sched, GstElement *element); static void gst_basic_scheduler_error (GstScheduler *sched, GstElement *element); static void gst_basic_scheduler_pad_connect (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad); static void gst_basic_scheduler_pad_disconnect (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad); @@ -330,6 +330,7 @@ gst_basic_scheduler_src_wrapper (int argc, char *argv[]) } } } while (!GST_ELEMENT_IS_COTHREAD_STOPPING (element)); +exit: GST_FLAG_UNSET (element, GST_ELEMENT_COTHREAD_STOPPING); GST_DEBUG_LEAVE (""); @@ -1056,10 +1057,13 @@ gst_basic_scheduler_yield (GstScheduler *sched, GstElement *element) } } -static void +static gboolean gst_basic_scheduler_interrupt (GstScheduler *sched, GstElement *element) { + GST_FLAG_SET (element, GST_ELEMENT_COTHREAD_STOPPING); cothread_switch (cothread_current_main ()); + + return FALSE; } static void diff --git a/plugins/elements/gstqueue.c b/plugins/elements/gstqueue.c index 140562e..3020513 100644 --- a/plugins/elements/gstqueue.c +++ b/plugins/elements/gstqueue.c @@ -284,7 +284,6 @@ gst_queue_chain (GstPad *pad, GstBuffer *buf) queue = GST_QUEUE (GST_OBJECT_PARENT (pad)); reader = FALSE; - restart: /* we have to lock the queue since we span threads */ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locking t:%ld\n", pthread_self ()); @@ -353,7 +352,8 @@ restart: while (GST_STATE_PENDING (queue) != GST_STATE_VOID_PENDING) { GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "interrupted!!\n"); g_mutex_unlock (queue->qlock); - gst_element_interrupt (GST_ELEMENT (queue)); + if (gst_element_interrupt (GST_ELEMENT (queue))) + return; goto restart; } if (GST_STATE (queue) != GST_STATE_PLAYING) { @@ -419,7 +419,6 @@ gst_queue_get (GstPad *pad) queue = GST_QUEUE (GST_OBJECT_PARENT (pad)); writer = FALSE; - restart: /* have to lock for thread-safety */ GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locking t:%ld\n", pthread_self ()); @@ -434,7 +433,8 @@ restart: while (GST_STATE_PENDING (queue) != GST_STATE_VOID_PENDING) { GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "interrupted!!\n"); g_mutex_unlock (queue->qlock); - gst_element_interrupt (GST_ELEMENT (queue)); + if (gst_element_interrupt (GST_ELEMENT (queue))) + return NULL; goto restart; } if (GST_STATE (queue) != GST_STATE_PLAYING) { @@ -442,7 +442,7 @@ restart: if (!queue->may_deadlock) { g_mutex_unlock (queue->qlock); gst_element_error (GST_ELEMENT (queue), "deadlock found, sink pad elements are shut down"); - return NULL; + goto restart; } else { gst_element_info (GST_ELEMENT (queue), "waiting for the app to restart sink pad elements"); -- 2.7.4