GtkType type);
static void gst_bin_create_plan_func (GstBin *bin);
-static void gst_bin_iterate_func (GstBin *bin);
+static gboolean gst_bin_iterate_func (GstBin *bin);
static xmlNodePtr gst_bin_save_thyself (GstElement *element, xmlNodePtr parent);
static void gst_bin_restore_thyself (GstElement *element, xmlNodePtr parent,
bin->numchildren = 0;
bin->children = NULL;
bin->eos_providers = NULL;
+ bin->num_eos_providers = 0;
bin->chains = NULL;
// FIXME temporary testing measure
// bin->use_cothreads = TRUE;
*
* Iterates over the elements in this bin.
*/
-void
+gboolean
gst_bin_iterate (GstBin *bin)
{
GstBinClass *oclass;
+ gboolean eos = TRUE;
GST_DEBUG_ENTER("(\"%s\")",gst_element_get_name(GST_ELEMENT(bin)));
oclass = GST_BIN_CLASS (GTK_OBJECT (bin)->klass);
if (oclass->iterate)
- (oclass->iterate) (bin);
+ eos = (oclass->iterate) (bin);
GST_DEBUG_LEAVE("(\"%s\")",gst_element_get_name(GST_ELEMENT(bin)));
+
+ return eos;
}
/**
(oclass->create_plan) (bin);
}
+/* out internal element fired EOS, we decrement the number of pending EOS childs */
+static void
+gst_bin_received_eos (GstElement *element, GstBin *bin)
+{
+ GST_INFO_ELEMENT (GST_CAT_PLANNING, bin, "child %s fired eos, pending %d\n", gst_element_get_name (element),
+ bin->num_eos_providers);
+
+ if (bin->num_eos_providers) {
+ bin->num_eos_providers--;
+ }
+}
+
/**
* gst_bin_schedule:
* @bin: #GstBin to schedule
if (GST_IS_BIN (element)) {
GST_DEBUG (0,"flattened recurse into \"%s\"\n",elementname);
pending = g_slist_prepend (pending, element);
+
+ gtk_signal_connect (GTK_OBJECT (element), "eos", gst_bin_received_eos, bin);
+ bin->eos_providers = g_list_prepend (bin->eos_providers, element);
+ bin->num_eos_providers++;
+
// otherwise add it to the list of elements
} else {
GST_DEBUG (0,"found element \"%s\" that I manage\n",elementname);
GST_DEBUG_LEAVE("(\"%s\")",gst_element_get_name(GST_ELEMENT(bin)));
}
-static void
+static gboolean
gst_bin_iterate_func (GstBin *bin)
{
GList *chains;
GstPad *pad;
GstBuffer *buf = NULL;
gint num_scheduled = 0;
+ gboolean eos = FALSE;
GST_DEBUG_ENTER("(\"%s\")", gst_element_get_name (GST_ELEMENT (bin)));
- g_return_if_fail (bin != NULL);
- g_return_if_fail (GST_IS_BIN (bin));
- g_return_if_fail (GST_STATE (bin) == GST_STATE_PLAYING);
+ g_return_val_if_fail (bin != NULL, TRUE);
+ g_return_val_if_fail (GST_IS_BIN (bin), TRUE);
+ g_return_val_if_fail (GST_STATE (bin) == GST_STATE_PLAYING, TRUE);
// step through all the chains
chains = bin->chains;
num_scheduled++;
}
- if (!num_scheduled) {
+ /*
+ g_print ("bin \"%s\", eos providers:%d, scheduled: %d\n",
+ gst_element_get_name (GST_ELEMENT (bin)),
+ bin->num_eos_providers, num_scheduled);
+ */
+
+ if (!num_scheduled && !bin->num_eos_providers) {
gst_element_signal_eos (GST_ELEMENT (bin));
+ eos = TRUE;
}
GST_DEBUG_LEAVE("(%s)", gst_element_get_name (GST_ELEMENT (bin)));
+ return !eos;
}
void (*create_plan) (GstBin *bin);
void (*schedule) (GstBin *bin);
/* run a full iteration of operation */
- void (*iterate) (GstBin *bin);
+ gboolean (*iterate) (GstBin *bin);
};
struct __GstBinChain {
GstElementState state,
GtkType type);
-void gst_bin_iterate (GstBin *bin);
+gboolean gst_bin_iterate (GstBin *bin);
/* hack FIXME */
void gst_bin_use_cothreads (GstBin *bin,
destparent = gst_object_get_parent (GST_OBJECT (dest));
/* have to make sure that they have the same parents... */
+ /*
if (srcparent != destparent) {
GST_ERROR_OBJECT(srcparent,destparent,"%s and %s have different parents",
gst_element_get_name(src),gst_element_get_name(dest));
return;
}
+ */
/* we're satisified they can be connected, let's do it */
gst_pad_connect(srcpad,destpad);
* Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
* 2000 Wim Taymans <wtay@chello.be>
*
- * gstqueue.c:
+ * gstqueue.c:
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
#ifdef STATUS_ENABLED
#define STATUS(A) GST_DEBUG(0,A, gst_element_get_name(GST_ELEMENT(queue)))
#else
-#define STATUS(A)
+#define STATUS(A)
#endif
#include <pthread.h>
};
-static void gst_queue_class_init (GstQueueClass *klass);
-static void gst_queue_init (GstQueue *queue);
+static void gst_queue_class_init (GstQueueClass *klass);
+static void gst_queue_init (GstQueue *queue);
-static void gst_queue_set_arg (GtkObject *object, GtkArg *arg, guint id);
-static void gst_queue_get_arg (GtkObject *object, GtkArg *arg, guint id);
+static void gst_queue_set_arg (GtkObject *object, GtkArg *arg, guint id);
+static void gst_queue_get_arg (GtkObject *object, GtkArg *arg, guint id);
-static void gst_queue_chain (GstPad *pad, GstBuffer *buf);
+static gboolean gst_queue_handle_eos (GstPad *pad);
+static void gst_queue_chain (GstPad *pad, GstBuffer *buf);
static GstBuffer * gst_queue_get (GstPad *pad);
-static void gst_queue_flush (GstQueue *queue);
+static void gst_queue_flush (GstQueue *queue);
-static GstElementStateReturn gst_queue_change_state (GstElement *element);
+static GstElementStateReturn gst_queue_change_state (GstElement *element);
static GstElementClass *parent_class = NULL;
return queue_type;
}
-static void
-gst_queue_class_init (GstQueueClass *klass)
+static void
+gst_queue_class_init (GstQueueClass *klass)
{
GtkObjectClass *gtkobject_class;
GstElementClass *gstelement_class;
gtk_object_add_arg_type ("GstQueue::timeout", GTK_TYPE_INT,
GTK_ARG_READWRITE, ARG_TIMEOUT);
- gtkobject_class->set_arg = gst_queue_set_arg;
+ gtkobject_class->set_arg = gst_queue_set_arg;
gtkobject_class->get_arg = gst_queue_get_arg;
gstelement_class->change_state = gst_queue_change_state;
}
-static void
-gst_queue_init (GstQueue *queue)
+static void
+gst_queue_init (GstQueue *queue)
{
// scheduling on this kind of element is, well, interesting
GST_FLAG_SET (queue, GST_ELEMENT_DECOUPLED);
queue->sinkpad = gst_pad_new ("sink", GST_PAD_SINK);
gst_pad_set_chain_function (queue->sinkpad, GST_DEBUG_FUNCPTR(gst_queue_chain));
gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad);
+ gst_pad_set_eos_function (queue->sinkpad, gst_queue_handle_eos);
queue->srcpad = gst_pad_new ("src", GST_PAD_SRC);
gst_pad_set_get_function (queue->srcpad, GST_DEBUG_FUNCPTR(gst_queue_get));
queue->fullcond = g_cond_new ();
}
-static void
-gst_queue_cleanup_buffers (gpointer data, const gpointer user_data)
+static gboolean
+gst_queue_handle_eos (GstPad *pad)
+{
+ GstQueue *queue;
+
+ queue = GST_QUEUE(pad->parent);
+
+ GST_DEBUG (0,"queue: %s received eos\n", gst_element_get_name (GST_ELEMENT (queue)));
+
+ GST_LOCK (queue);
+ GST_DEBUG (0,"queue: %s has %d buffers left\n", gst_element_get_name (GST_ELEMENT (queue)),
+ queue->level_buffers);
+
+ GST_FLAG_SET (pad, GST_PAD_EOS);
+
+ g_cond_signal (queue->emptycond);
+
+ GST_UNLOCK (queue);
+
+ return TRUE;
+}
+
+static void
+gst_queue_cleanup_buffers (gpointer data, const gpointer user_data)
{
GST_DEBUG (0,"queue: %s cleaning buffer %p\n", (gchar *)user_data, data);
-
+
gst_buffer_unref (GST_BUFFER (data));
}
-static void
-gst_queue_flush (GstQueue *queue)
+static void
+gst_queue_flush (GstQueue *queue)
{
- g_slist_foreach (queue->queue, gst_queue_cleanup_buffers,
- (char *)gst_element_get_name (GST_ELEMENT (queue)));
+ g_slist_foreach (queue->queue, gst_queue_cleanup_buffers,
+ (char *)gst_element_get_name (GST_ELEMENT (queue)));
g_slist_free (queue->queue);
-
+
queue->queue = NULL;
queue->level_buffers = 0;
queue->timeval = NULL;
}
-static void
-gst_queue_chain (GstPad *pad, GstBuffer *buf)
+static void
+gst_queue_chain (GstPad *pad, GstBuffer *buf)
{
GstQueue *queue;
gboolean tosignal = FALSE;
}
static GstBuffer *
-gst_queue_get (GstPad *pad)
+gst_queue_get (GstPad *pad)
{
GstQueue *queue = GST_QUEUE (gst_pad_get_parent(pad));
GstBuffer *buf = NULL;
while (!queue->level_buffers) {
STATUS("queue: %s U released lock\n");
//g_cond_timed_wait (queue->emptycond, queue->emptylock, queue->timeval);
+ if (GST_FLAG_IS_SET (queue->sinkpad, GST_PAD_EOS)) {
+ gst_pad_set_eos (queue->srcpad);
+ return NULL;
+ }
//FIXME need to signal other thread in case signals got lost?
g_cond_signal (queue->fullcond);
g_cond_wait (queue->emptycond, GST_OBJECT(queue)->lock);
/* unlock now */
}
-static GstElementStateReturn
-gst_queue_change_state (GstElement *element)
+static GstElementStateReturn
+gst_queue_change_state (GstElement *element)
{
GstQueue *queue;
g_return_val_if_fail (GST_IS_QUEUE (element), GST_STATE_FAILURE);
}
-static void
-gst_queue_set_arg (GtkObject *object, GtkArg *arg, guint id)
+static void
+gst_queue_set_arg (GtkObject *object, GtkArg *arg, guint id)
{
GstQueue *queue;
/* it's not null if we got it, but it might not be ours */
g_return_if_fail (GST_IS_QUEUE (object));
-
+
queue = GST_QUEUE (object);
switch(id) {
}
}
-static void
-gst_queue_get_arg (GtkObject *object, GtkArg *arg, guint id)
+static void
+gst_queue_get_arg (GtkObject *object, GtkArg *arg, guint id)
{
GstQueue *queue;
/* it's not null if we got it, but it might not be ours */
g_return_if_fail (GST_IS_QUEUE (object));
-
+
queue = GST_QUEUE (object);
switch (id) {
if ((GST_RPAD_DIRECTION(pad) == GST_PAD_SINK) &&
(GST_FLAG_IS_SET (peerparent, GST_ELEMENT_DECOUPLED))) {
chain->entries = g_list_prepend (chain->entries, peerparent);
+ gtk_signal_connect (GTK_OBJECT (peerparent), "eos", gst_scheduler_handle_eos, chain);
GST_DEBUG (0,"added '%s' as DECOUPLED entry into the chain\n",gst_element_get_name(peerparent));
}
} else
gst_thread_signal_thread (thread);
while (!GST_FLAG_IS_SET (thread, GST_THREAD_STATE_REAPING)) {
- if (GST_FLAG_IS_SET (thread, GST_THREAD_STATE_SPINNING))
- gst_bin_iterate (GST_BIN (thread));
+ if (GST_FLAG_IS_SET (thread, GST_THREAD_STATE_SPINNING)) {
+ if (!gst_bin_iterate (GST_BIN (thread))) {
+ GST_FLAG_UNSET (thread, GST_THREAD_STATE_SPINNING);
+ }
+ }
else {
+ GST_DEBUG (0, "thread \"%s\" waiting\n", gst_element_get_name (GST_ELEMENT (thread)));
gst_thread_wait_thread (thread);
}
}
return NULL;
}
-static void
-gst_thread_signal_thread (GstThread *thread)
+static void
+gst_thread_signal_thread (GstThread *thread)
{
GST_DEBUG (0,"signaling thread\n");
g_mutex_lock (thread->lock);
}
-static void
+static void
gst_thread_restore_thyself (GstElement *element,
- xmlNodePtr parent,
- GHashTable *elements)
+ xmlNodePtr parent,
+ GHashTable *elements)
{
GST_DEBUG (0,"gstthread: restore\n");
GST_ELEMENT_CLASS (parent_class)->restore_thyself (element,parent, elements);
}
-static xmlNodePtr
+static xmlNodePtr
gst_thread_save_thyself (GstElement *element,
- xmlNodePtr parent)
+ xmlNodePtr parent)
{
if (GST_ELEMENT_CLASS (parent_class)->save_thyself)
GST_ELEMENT_CLASS (parent_class)->save_thyself (element,parent);
* Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
* 2000 Wim Taymans <wtay@chello.be>
*
- * gstqueue.c:
+ * gstqueue.c:
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
#ifdef STATUS_ENABLED
#define STATUS(A) GST_DEBUG(0,A, gst_element_get_name(GST_ELEMENT(queue)))
#else
-#define STATUS(A)
+#define STATUS(A)
#endif
#include <pthread.h>
};
-static void gst_queue_class_init (GstQueueClass *klass);
-static void gst_queue_init (GstQueue *queue);
+static void gst_queue_class_init (GstQueueClass *klass);
+static void gst_queue_init (GstQueue *queue);
-static void gst_queue_set_arg (GtkObject *object, GtkArg *arg, guint id);
-static void gst_queue_get_arg (GtkObject *object, GtkArg *arg, guint id);
+static void gst_queue_set_arg (GtkObject *object, GtkArg *arg, guint id);
+static void gst_queue_get_arg (GtkObject *object, GtkArg *arg, guint id);
-static void gst_queue_chain (GstPad *pad, GstBuffer *buf);
+static gboolean gst_queue_handle_eos (GstPad *pad);
+static void gst_queue_chain (GstPad *pad, GstBuffer *buf);
static GstBuffer * gst_queue_get (GstPad *pad);
-static void gst_queue_flush (GstQueue *queue);
+static void gst_queue_flush (GstQueue *queue);
-static GstElementStateReturn gst_queue_change_state (GstElement *element);
+static GstElementStateReturn gst_queue_change_state (GstElement *element);
static GstElementClass *parent_class = NULL;
return queue_type;
}
-static void
-gst_queue_class_init (GstQueueClass *klass)
+static void
+gst_queue_class_init (GstQueueClass *klass)
{
GtkObjectClass *gtkobject_class;
GstElementClass *gstelement_class;
gtk_object_add_arg_type ("GstQueue::timeout", GTK_TYPE_INT,
GTK_ARG_READWRITE, ARG_TIMEOUT);
- gtkobject_class->set_arg = gst_queue_set_arg;
+ gtkobject_class->set_arg = gst_queue_set_arg;
gtkobject_class->get_arg = gst_queue_get_arg;
gstelement_class->change_state = gst_queue_change_state;
}
-static void
-gst_queue_init (GstQueue *queue)
+static void
+gst_queue_init (GstQueue *queue)
{
// scheduling on this kind of element is, well, interesting
GST_FLAG_SET (queue, GST_ELEMENT_DECOUPLED);
queue->sinkpad = gst_pad_new ("sink", GST_PAD_SINK);
gst_pad_set_chain_function (queue->sinkpad, GST_DEBUG_FUNCPTR(gst_queue_chain));
gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad);
+ gst_pad_set_eos_function (queue->sinkpad, gst_queue_handle_eos);
queue->srcpad = gst_pad_new ("src", GST_PAD_SRC);
gst_pad_set_get_function (queue->srcpad, GST_DEBUG_FUNCPTR(gst_queue_get));
queue->fullcond = g_cond_new ();
}
-static void
-gst_queue_cleanup_buffers (gpointer data, const gpointer user_data)
+static gboolean
+gst_queue_handle_eos (GstPad *pad)
+{
+ GstQueue *queue;
+
+ queue = GST_QUEUE(pad->parent);
+
+ GST_DEBUG (0,"queue: %s received eos\n", gst_element_get_name (GST_ELEMENT (queue)));
+
+ GST_LOCK (queue);
+ GST_DEBUG (0,"queue: %s has %d buffers left\n", gst_element_get_name (GST_ELEMENT (queue)),
+ queue->level_buffers);
+
+ GST_FLAG_SET (pad, GST_PAD_EOS);
+
+ g_cond_signal (queue->emptycond);
+
+ GST_UNLOCK (queue);
+
+ return TRUE;
+}
+
+static void
+gst_queue_cleanup_buffers (gpointer data, const gpointer user_data)
{
GST_DEBUG (0,"queue: %s cleaning buffer %p\n", (gchar *)user_data, data);
-
+
gst_buffer_unref (GST_BUFFER (data));
}
-static void
-gst_queue_flush (GstQueue *queue)
+static void
+gst_queue_flush (GstQueue *queue)
{
- g_slist_foreach (queue->queue, gst_queue_cleanup_buffers,
- (char *)gst_element_get_name (GST_ELEMENT (queue)));
+ g_slist_foreach (queue->queue, gst_queue_cleanup_buffers,
+ (char *)gst_element_get_name (GST_ELEMENT (queue)));
g_slist_free (queue->queue);
-
+
queue->queue = NULL;
queue->level_buffers = 0;
queue->timeval = NULL;
}
-static void
-gst_queue_chain (GstPad *pad, GstBuffer *buf)
+static void
+gst_queue_chain (GstPad *pad, GstBuffer *buf)
{
GstQueue *queue;
gboolean tosignal = FALSE;
}
static GstBuffer *
-gst_queue_get (GstPad *pad)
+gst_queue_get (GstPad *pad)
{
GstQueue *queue = GST_QUEUE (gst_pad_get_parent(pad));
GstBuffer *buf = NULL;
while (!queue->level_buffers) {
STATUS("queue: %s U released lock\n");
//g_cond_timed_wait (queue->emptycond, queue->emptylock, queue->timeval);
+ if (GST_FLAG_IS_SET (queue->sinkpad, GST_PAD_EOS)) {
+ gst_pad_set_eos (queue->srcpad);
+ return NULL;
+ }
//FIXME need to signal other thread in case signals got lost?
g_cond_signal (queue->fullcond);
g_cond_wait (queue->emptycond, GST_OBJECT(queue)->lock);
/* unlock now */
}
-static GstElementStateReturn
-gst_queue_change_state (GstElement *element)
+static GstElementStateReturn
+gst_queue_change_state (GstElement *element)
{
GstQueue *queue;
g_return_val_if_fail (GST_IS_QUEUE (element), GST_STATE_FAILURE);
}
-static void
-gst_queue_set_arg (GtkObject *object, GtkArg *arg, guint id)
+static void
+gst_queue_set_arg (GtkObject *object, GtkArg *arg, guint id)
{
GstQueue *queue;
/* it's not null if we got it, but it might not be ours */
g_return_if_fail (GST_IS_QUEUE (object));
-
+
queue = GST_QUEUE (object);
switch(id) {
}
}
-static void
-gst_queue_get_arg (GtkObject *object, GtkArg *arg, guint id)
+static void
+gst_queue_get_arg (GtkObject *object, GtkArg *arg, guint id)
{
GstQueue *queue;
/* it's not null if we got it, but it might not be ours */
g_return_if_fail (GST_IS_QUEUE (object));
-
+
queue = GST_QUEUE (object);
switch (id) {