return ctx->threads[0];
}
+/**
+ * cothread_current:
+ *
+ * Get the currenttly executing cothread
+ *
+ * Returns: the #cothread_state of the current cothread
+ */
+cothread_state *
+cothread_current (void)
+{
+ cothread_context *ctx = pthread_getspecific (_cothread_key);
+
+ return ctx->threads[ctx->current];
+}
+
static void
cothread_stub (void)
{
int argc, char **argv);
void cothread_stop (cothread_state *thread);
-int cothread_getcurrent (void);
void cothread_switch (cothread_state *thread);
void cothread_set_data (cothread_state *thread, gchar *key, gpointer data);
gpointer cothread_get_data (cothread_state *thread, gchar *key);
cothread_state* cothread_main (cothread_context *ctx);
cothread_state* cothread_current_main (void);
+cothread_state* cothread_current (void);
#endif /* __COTHREAD_H__ */
#include <gst/gstutils.h>
#include <gst/gsttrace.h>
#include <gst/gstxml.h>
-#include <gst/cothreads.h>
#include <gst/gstscheduler.h>
#include <gst/gsttimecache.h>
#include <gst/gstevent.h>
#define __GST_BIN_H__
#include <gst/gstelement.h>
-#include <gst/cothreads.h>
#ifdef __cplusplus
extern "C" {
g_return_val_if_fail (element != NULL, GST_STATE_FAILURE);
g_return_val_if_fail (GST_IS_ELEMENT (element), GST_STATE_FAILURE);
- g_return_val_if_fail (element->sched != NULL, GST_STATE_FAILURE);
GST_DEBUG_ELEMENT (GST_CAT_STATES,element, "setting state from %s to %s\n",
gst_element_statename(GST_STATE(element)),
}
}
-void
+gboolean
gst_element_interrupt (GstElement *element)
{
if (GST_ELEMENT_SCHED (element)) {
- gst_scheduler_interrupt (GST_ELEMENT_SCHED (element), element);
+ return gst_scheduler_interrupt (GST_ELEMENT_SCHED (element), element);
}
+ else
+ return FALSE;
}
/**
#include <gst/gsttypes.h>
#include <gst/gstobject.h>
#include <gst/gstpad.h>
-#include <gst/cothreads.h>
#include <gst/gstpluginfeature.h>
#ifdef __cplusplus
#define GST_ELEMENT_IS_THREAD_SUGGESTED(obj) (GST_FLAG_IS_SET(obj,GST_ELEMENT_THREAD_SUGGESTED))
#define GST_ELEMENT_IS_EOS(obj) (GST_FLAG_IS_SET(obj,GST_ELEMENT_EOS))
#define GST_ELEMENT_IS_EVENT_AWARE(obj) (GST_FLAG_IS_SET(obj,GST_ELEMENT_EVENT_AWARE))
+#define GST_ELEMENT_IS_DECOUPLED(obj) (GST_FLAG_IS_SET(obj,GST_ELEMENT_DECOUPLED))
#define GST_ELEMENT_NAME(obj) (GST_OBJECT_NAME(obj))
#define GST_ELEMENT_PARENT(obj) (GST_OBJECT_PARENT(obj))
GstObject* gst_element_get_parent (GstElement *element);
void gst_element_yield (GstElement *element);
-void gst_element_interrupt (GstElement *element);
+gboolean gst_element_interrupt (GstElement *element);
void gst_element_set_sched (GstElement *element, GstScheduler *sched);
GstScheduler* gst_element_get_sched (GstElement *element);
GST_DEBUG_FUNCPTR_NAME(GST_RPAD_CHAINFUNC(GST_RPAD_PEER(pad))));
(GST_RPAD_CHAINFUNC(GST_RPAD_PEER(pad)))(pad,buf);
} else {
- GST_DEBUG (GST_CAT_DATAFLOW,"got a problem here: default pad_push handler in place, no chain function\n");
+ GST_DEBUG (GST_CAT_DATAFLOW,"default pad_push handler in place, no chain function\n");
+ g_warning ("(internal error) default pad_push in place for pad %s:%s but it has no chain function",
+ GST_DEBUG_PAD_NAME (pad));
}
}
#include <gst/gstobject.h>
#include <gst/gstbuffer.h>
-#include <gst/cothreads.h>
#include <gst/gstcaps.h>
#include <gst/gstevent.h>
static void
gst_pipeline_init (GstPipeline *pipeline)
{
+ const gchar *schedname;
GstScheduler *scheduler;
/* we're a manager by default */
GST_FLAG_SET (pipeline, GST_BIN_FLAG_MANAGER);
- scheduler = gst_schedulerfactory_make ("basic", GST_ELEMENT (pipeline));
+ schedname = gst_schedulerfactory_get_default_name ();
+ scheduler = gst_schedulerfactory_make (schedname, GST_ELEMENT (pipeline));
+
GST_ELEMENT_SCHED (pipeline) = scheduler;
gst_object_ref (GST_OBJECT (scheduler));
gst_object_sink (GST_OBJECT (scheduler));
gst_scheduler_setup (scheduler);
-
- GST_DEBUG (GST_CAT_PIPELINE, "pipeline's scheduler is %p\n", scheduler);
}
static void
G_OBJECT_CLASS (parent_class)->dispose (object);
- gst_scheduler_reset (GST_ELEMENT_SCHED (pipeline));
- gst_object_unref (GST_OBJECT (GST_ELEMENT_SCHED (pipeline)));
+ if (GST_ELEMENT_SCHED (pipeline)) {
+ gst_scheduler_reset (GST_ELEMENT_SCHED (pipeline));
+ gst_object_unref (GST_OBJECT (GST_ELEMENT_SCHED (pipeline)));
+ }
}
/**
GstElement* gst_pipeline_new (const gchar *name);
#define gst_pipeline_destroy(pipeline) gst_object_destroy(GST_OBJECT(pipeline))
+
#ifdef __cplusplus
}
#endif /* __cplusplus */
queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
reader = FALSE;
-
restart:
/* we have to lock the queue since we span threads */
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locking t:%ld\n", pthread_self ());
while (GST_STATE_PENDING (queue) != GST_STATE_VOID_PENDING) {
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "interrupted!!\n");
g_mutex_unlock (queue->qlock);
- gst_element_interrupt (GST_ELEMENT (queue));
+ if (gst_element_interrupt (GST_ELEMENT (queue)))
+ return;
goto restart;
}
if (GST_STATE (queue) != GST_STATE_PLAYING) {
queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
writer = FALSE;
-
restart:
/* have to lock for thread-safety */
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locking t:%ld\n", pthread_self ());
while (GST_STATE_PENDING (queue) != GST_STATE_VOID_PENDING) {
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "interrupted!!\n");
g_mutex_unlock (queue->qlock);
- gst_element_interrupt (GST_ELEMENT (queue));
+ if (gst_element_interrupt (GST_ELEMENT (queue)))
+ return NULL;
goto restart;
}
if (GST_STATE (queue) != GST_STATE_PLAYING) {
if (!queue->may_deadlock) {
g_mutex_unlock (queue->qlock);
gst_element_error (GST_ELEMENT (queue), "deadlock found, sink pad elements are shut down");
- return NULL;
+ goto restart;
}
else {
gst_element_info (GST_ELEMENT (queue), "waiting for the app to restart sink pad elements");
static GstObjectClass *parent_class = NULL;
+static gchar *_default_name = NULL;
+
GType
gst_scheduler_get_type (void)
{
* @element: the element requesting an interrupt
*
* Tell the scheduler to interrupt execution of this element.
+ *
+ * Retruns: TRUE if the element should return NULL from the chain/get
+ * function.
*/
-void
+gboolean
gst_scheduler_interrupt (GstScheduler *sched, GstElement *element)
{
- g_return_if_fail (GST_IS_SCHEDULER (sched));
- g_return_if_fail (GST_IS_ELEMENT (element));
+ g_return_val_if_fail (GST_IS_SCHEDULER (sched), FALSE);
+ g_return_val_if_fail (GST_IS_ELEMENT (element), FALSE);
if (CLASS (sched)->interrupt)
- CLASS (sched)->interrupt (sched, element);
+ return CLASS (sched)->interrupt (sched, element);
+
+ return FALSE;
}
/**
#endif
_gst_schedulerfactories = NULL;
+ _default_name = g_strdup ("basic");
}
static void
return gst_schedulerfactory_create (factory, parent);
}
+void
+gst_schedulerfactory_set_default_name (const gchar* name)
+{
+ if (_default_name)
+ g_free (_default_name);
+
+ _default_name = g_strdup (name);
+}
+
+const gchar*
+gst_schedulerfactory_get_default_name (void)
+{
+ return _default_name;
+}
+
#ifndef GST_DISABLE_REGISTRY
static xmlNodePtr
gst_schedulerfactory_save_thyself (GstObject *object, xmlNodePtr parent)
void (*lock_element) (GstScheduler *sched, GstElement *element);
void (*unlock_element) (GstScheduler *sched, GstElement *element);
void (*yield) (GstScheduler *sched, GstElement *element);
- void (*interrupt) (GstScheduler *sched, GstElement *element);
+ gboolean (*interrupt) (GstScheduler *sched, GstElement *element);
void (*error) (GstScheduler *sched, GstElement *element);
void (*pad_connect) (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad);
void (*pad_disconnect) (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad);
void gst_scheduler_lock_element (GstScheduler *sched, GstElement *element);
void gst_scheduler_unlock_element (GstScheduler *sched, GstElement *element);
void gst_scheduler_yield (GstScheduler *sched, GstElement *element);
-void gst_scheduler_interrupt (GstScheduler *sched, GstElement *element);
+gboolean gst_scheduler_interrupt (GstScheduler *sched, GstElement *element);
void gst_scheduler_error (GstScheduler *sched, GstElement *element);
void gst_scheduler_pad_connect (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad);
void gst_scheduler_pad_disconnect (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad);
GstScheduler* gst_schedulerfactory_create (GstSchedulerFactory *factory, GstElement *parent);
GstScheduler* gst_schedulerfactory_make (const gchar *name, GstElement *parent);
+void gst_schedulerfactory_set_default_name (const gchar* name);
+const gchar* gst_schedulerfactory_get_default_name (void);
+
#ifdef __cplusplus
}
static void
gst_thread_init (GstThread *thread)
{
+ const gchar *schedname;
+ GstScheduler *scheduler;
- GST_DEBUG (GST_CAT_THREAD,"initializing thread\n");
+ GST_DEBUG (GST_CAT_THREAD, "initializing thread\n");
/* we're a manager by default */
GST_FLAG_SET (thread, GST_BIN_FLAG_MANAGER);
- thread->lock = g_mutex_new();
- thread->cond = g_cond_new();
+ schedname = gst_schedulerfactory_get_default_name ();
- GST_ELEMENT_SCHED(thread) = gst_schedulerfactory_make ("basic", GST_ELEMENT(thread));
- GST_DEBUG(GST_CAT_THREAD, "thread's scheduler is %p\n",GST_ELEMENT_SCHED(thread));
+ scheduler = gst_schedulerfactory_make (schedname, GST_ELEMENT (thread));
- thread->ppid = getpid();
+ GST_ELEMENT_SCHED (thread) = scheduler;
+
+ gst_object_ref (GST_OBJECT (scheduler));
+ gst_object_sink (GST_OBJECT (scheduler));
+
+ thread->lock = g_mutex_new ();
+ thread->cond = g_cond_new ();
+
+ thread->ppid = getpid ();
thread->thread_id = -1;
}
{
GstThread *thread = GST_THREAD (object);
- GST_DEBUG (GST_CAT_REFCOUNTING,"dispose\n");
+ GST_DEBUG (GST_CAT_REFCOUNTING, "dispose\n");
g_mutex_free (thread->lock);
g_cond_free (thread->cond);
G_OBJECT_CLASS (parent_class)->dispose (object);
- gst_object_destroy (GST_OBJECT (GST_ELEMENT_SCHED (thread)));
- gst_object_unref (GST_OBJECT (GST_ELEMENT_SCHED (thread)));
-
+ if (GST_ELEMENT_SCHED (thread)) {
+ gst_object_destroy (GST_OBJECT (GST_ELEMENT_SCHED (thread)));
+ gst_object_unref (GST_OBJECT (GST_ELEMENT_SCHED (thread)));
+ }
}
static void
static void gst_basic_scheduler_lock_element (GstScheduler *sched, GstElement *element);
static void gst_basic_scheduler_unlock_element (GstScheduler *sched, GstElement *element);
static void gst_basic_scheduler_yield (GstScheduler *sched, GstElement *element);
-static void gst_basic_scheduler_interrupt (GstScheduler *sched, GstElement *element);
+static gboolean gst_basic_scheduler_interrupt (GstScheduler *sched, GstElement *element);
static void gst_basic_scheduler_error (GstScheduler *sched, GstElement *element);
static void gst_basic_scheduler_pad_connect (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad);
static void gst_basic_scheduler_pad_disconnect (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad);
}
}
} while (!GST_ELEMENT_IS_COTHREAD_STOPPING (element));
+exit:
GST_FLAG_UNSET (element, GST_ELEMENT_COTHREAD_STOPPING);
GST_DEBUG_LEAVE ("");
}
}
-static void
+static gboolean
gst_basic_scheduler_interrupt (GstScheduler *sched, GstElement *element)
{
+ GST_FLAG_SET (element, GST_ELEMENT_COTHREAD_STOPPING);
cothread_switch (cothread_current_main ());
+
+ return FALSE;
}
static void
queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
reader = FALSE;
-
restart:
/* we have to lock the queue since we span threads */
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locking t:%ld\n", pthread_self ());
while (GST_STATE_PENDING (queue) != GST_STATE_VOID_PENDING) {
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "interrupted!!\n");
g_mutex_unlock (queue->qlock);
- gst_element_interrupt (GST_ELEMENT (queue));
+ if (gst_element_interrupt (GST_ELEMENT (queue)))
+ return;
goto restart;
}
if (GST_STATE (queue) != GST_STATE_PLAYING) {
queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
writer = FALSE;
-
restart:
/* have to lock for thread-safety */
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "locking t:%ld\n", pthread_self ());
while (GST_STATE_PENDING (queue) != GST_STATE_VOID_PENDING) {
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "interrupted!!\n");
g_mutex_unlock (queue->qlock);
- gst_element_interrupt (GST_ELEMENT (queue));
+ if (gst_element_interrupt (GST_ELEMENT (queue)))
+ return NULL;
goto restart;
}
if (GST_STATE (queue) != GST_STATE_PLAYING) {
if (!queue->may_deadlock) {
g_mutex_unlock (queue->qlock);
gst_element_error (GST_ELEMENT (queue), "deadlock found, sink pad elements are shut down");
- return NULL;
+ goto restart;
}
else {
gst_element_info (GST_ELEMENT (queue), "waiting for the app to restart sink pad elements");