* the playout pointer hits the end of cache again it has to start pulling.
*/
- do {
- /* the first time through, the current_playout pointer is going to be NULL */
- if (cache->current_playout == NULL) {
- /* get a buffer */
- buf = gst_pad_pull (cache->sinkpad);
+ /* the first time through, the current_playout pointer is going to be NULL */
+ if (cache->current_playout == NULL) {
+ /* get a buffer */
+ buf = gst_pad_pull (cache->sinkpad);
- /* add it to the cache, though cache == NULL */
- gst_buffer_ref (buf);
- cache->cache = g_list_prepend (cache->cache, buf);
- cache->buffer_count++;
+ /* add it to the cache, though cache == NULL */
+ gst_buffer_ref (buf);
+ cache->cache = g_list_prepend (cache->cache, buf);
+ cache->buffer_count++;
- /* set the current_playout pointer */
- cache->current_playout = cache->cache;
+ /* set the current_playout pointer */
+ cache->current_playout = cache->cache;
- g_signal_emit (G_OBJECT(cache), gst_autoplugcache_signals[FIRST_BUFFER], 0, buf);
-
- /* send the buffer on its way */
- gst_pad_push (cache->srcpad, buf);
- }
-
- /* the steady state is where the playout is at the front of the cache */
- else if (g_list_previous(cache->current_playout) == NULL) {
+ g_signal_emit (G_OBJECT(cache), gst_autoplugcache_signals[FIRST_BUFFER], 0, buf);
- /* if we've been told to fire an empty signal (after a reset) */
- if (cache->fire_empty) {
- int oldstate = GST_STATE(cache);
- GST_DEBUG(0,"at front of cache, about to pull, but firing signal\n");
+ /* send the buffer on its way */
+ gst_pad_push (cache->srcpad, buf);
+ }
+ /* the steady state is where the playout is at the front of the cache */
+ else if (g_list_previous(cache->current_playout) == NULL) {
+
+ /* if we've been told to fire an empty signal (after a reset) */
+ if (cache->fire_empty) {
+ int oldstate = GST_STATE(cache);
+ GST_DEBUG(0,"at front of cache, about to pull, but firing signal\n");
+ gst_object_ref (GST_OBJECT (cache));
+ g_signal_emit (G_OBJECT(cache), gst_autoplugcache_signals[CACHE_EMPTY], 0, NULL);
+ if (GST_STATE(cache) != oldstate) {
gst_object_ref (GST_OBJECT (cache));
- g_signal_emit (G_OBJECT(cache), gst_autoplugcache_signals[CACHE_EMPTY], 0, NULL);
- if (GST_STATE(cache) != oldstate) {
- gst_object_ref (GST_OBJECT (cache));
- GST_DEBUG(GST_CAT_AUTOPLUG, "state changed during signal, aborting\n");
- cothread_switch(cothread_current_main());
- }
- gst_object_unref (GST_OBJECT (cache));
+ GST_DEBUG(GST_CAT_AUTOPLUG, "state changed during signal, aborting\n");
+ cothread_switch(cothread_current_main());
}
+ gst_object_unref (GST_OBJECT (cache));
+ }
- /* get a buffer */
- buf = gst_pad_pull (cache->sinkpad);
+ /* get a buffer */
+ buf = gst_pad_pull (cache->sinkpad);
- /* add it to the front of the cache */
- gst_buffer_ref (buf);
- cache->cache = g_list_prepend (cache->cache, buf);
- cache->buffer_count++;
+ /* add it to the front of the cache */
+ gst_buffer_ref (buf);
+ cache->cache = g_list_prepend (cache->cache, buf);
+ cache->buffer_count++;
- /* set the current_playout pointer */
- cache->current_playout = cache->cache;
+ /* set the current_playout pointer */
+ cache->current_playout = cache->cache;
- /* send the buffer on its way */
- gst_pad_push (cache->srcpad, buf);
- }
-
- /* otherwise we're trundling through existing cached buffers */
- else {
- /* move the current_playout pointer */
- cache->current_playout = g_list_previous (cache->current_playout);
+ /* send the buffer on its way */
+ gst_pad_push (cache->srcpad, buf);
+ }
- if (cache->fire_first) {
- g_signal_emit (G_OBJECT(cache), gst_autoplugcache_signals[FIRST_BUFFER], 0, buf);
- cache->fire_first = FALSE;
- }
+ /* otherwise we're trundling through existing cached buffers */
+ else {
+ /* move the current_playout pointer */
+ cache->current_playout = g_list_previous (cache->current_playout);
- /* push that buffer */
- gst_pad_push (cache->srcpad, GST_BUFFER(cache->current_playout->data));
+ if (cache->fire_first) {
+ g_signal_emit (G_OBJECT(cache), gst_autoplugcache_signals[FIRST_BUFFER], 0, buf);
+ cache->fire_first = FALSE;
}
- } while (!GST_FLAG_IS_SET (element, GST_ELEMENT_COTHREAD_STOPPING));
+
+ /* push that buffer */
+ gst_pad_push (cache->srcpad, GST_BUFFER(cache->current_playout->data));
+ }
}
static GstPadNegotiateReturn
cothread_destroy (ctx->threads[slot]);
break;
}
-
}
sp = CURRENT_STACK_FRAME;
aggregator = GST_AGGREGATOR (element);
- do {
- if (aggregator->sched == AGGREGATOR_LOOP ||
- aggregator->sched == AGGREGATOR_LOOP_PEEK) {
- GList *pads = aggregator->sinkpads;
-
- while (pads) {
- GstPad *pad = GST_PAD (pads->data);
- pads = g_list_next (pads);
-
- if (aggregator->sched == AGGREGATOR_LOOP_PEEK) {
- buf = gst_pad_peek (pad);
- if (buf == NULL)
- continue;
-
- g_assert (buf == gst_pad_pull (pad));
- debug = "loop_peek";
- }
- else {
- buf = gst_pad_pull (pad);
- debug = "loop";
- }
- gst_aggregator_push (aggregator, pad, buf, debug);
- }
- }
- else {
- if (aggregator->sched == AGGREGATOR_LOOP_SELECT) {
- GstPad *pad;
+ if (aggregator->sched == AGGREGATOR_LOOP ||
+ aggregator->sched == AGGREGATOR_LOOP_PEEK) {
+ GList *pads = aggregator->sinkpads;
- debug = "loop_select";
+ while (pads) {
+ GstPad *pad = GST_PAD (pads->data);
+ pads = g_list_next (pads);
- pad = gst_pad_select (aggregator->sinkpads);
- buf = gst_pad_pull (pad);
+ if (aggregator->sched == AGGREGATOR_LOOP_PEEK) {
+ buf = gst_pad_peek (pad);
+ if (buf == NULL)
+ continue;
- gst_aggregator_push (aggregator, pad, buf, debug);
+ g_assert (buf == gst_pad_pull (pad));
+ debug = "loop_peek";
}
else {
- g_assert_not_reached ();
+ buf = gst_pad_pull (pad);
+ debug = "loop";
}
+ gst_aggregator_push (aggregator, pad, buf, debug);
+ }
+ }
+ else {
+ if (aggregator->sched == AGGREGATOR_LOOP_SELECT) {
+ GstPad *pad;
+
+ debug = "loop_select";
+
+ pad = gst_pad_select (aggregator->sinkpads);
+ buf = gst_pad_pull (pad);
+
+ gst_aggregator_push (aggregator, pad, buf, debug);
}
- } while (!GST_ELEMENT_IS_COTHREAD_STOPPING (element));
+ else {
+ g_assert_not_reached ();
+ }
+ }
}
/**
case ARG_OUTPUT:
break;
case ARG_DATA:
- src->data = g_value_get_int (value);
+ src->data = g_value_get_enum (value);
switch (src->data) {
case FAKESRC_DATA_ALLOCATE:
if (src->parent) {
}
break;
case ARG_SIZETYPE:
- src->sizetype = g_value_get_int (value);
+ src->sizetype = g_value_get_enum (value);
break;
case ARG_SIZEMIN:
src->sizemin = g_value_get_int (value);
src->parentsize = g_value_get_int (value);
break;
case ARG_FILLTYPE:
- src->filltype = g_value_get_int (value);
+ src->filltype = g_value_get_enum (value);
break;
case ARG_PATTERN:
break;
g_value_set_boolean (value, src->loop_based);
break;
case ARG_OUTPUT:
- g_value_set_int (value, src->output);
+ g_value_set_enum (value, src->output);
break;
case ARG_DATA:
- g_value_set_int (value, src->data);
+ g_value_set_enum (value, src->data);
break;
case ARG_SIZETYPE:
- g_value_set_int (value, src->sizetype);
+ g_value_set_enum (value, src->sizetype);
break;
case ARG_SIZEMIN:
g_value_set_int (value, src->sizemin);
g_value_set_int (value, src->parentsize);
break;
case ARG_FILLTYPE:
- g_value_set_int (value, src->filltype);
+ g_value_set_enum (value, src->filltype);
break;
case ARG_PATTERN:
g_value_set_string (value, src->pattern);
gst_fakesrc_loop(GstElement *element)
{
GstFakeSrc *src;
+ GList *pads;
g_return_if_fail(element != NULL);
g_return_if_fail(GST_IS_FAKESRC(element));
src = GST_FAKESRC (element);
- do {
- GList *pads;
-
- pads = GST_ELEMENT (src)->pads;
+ pads = gst_element_get_pad_list (element);
- while (pads) {
- GstPad *pad = GST_PAD (pads->data);
- GstBuffer *buf;
+ while (pads) {
+ GstPad *pad = GST_PAD (pads->data);
+ GstBuffer *buf;
- if (src->rt_num_buffers == 0) {
- src->eos = TRUE;
- }
- else {
- if (src->rt_num_buffers > 0)
- src->rt_num_buffers--;
- }
+ if (src->rt_num_buffers == 0) {
+ src->eos = TRUE;
+ }
+ else {
+ if (src->rt_num_buffers > 0)
+ src->rt_num_buffers--;
+ }
- if (src->eos) {
- gst_pad_push(pad, GST_BUFFER(gst_event_new (GST_EVENT_EOS)));
- return;
- }
+ if (src->eos) {
+ gst_pad_push(pad, GST_BUFFER(gst_event_new (GST_EVENT_EOS)));
+ return;
+ }
- buf = gst_fakesrc_create_buffer (src);
- GST_BUFFER_TIMESTAMP (buf) = src->buffer_count++;
+ buf = gst_fakesrc_create_buffer (src);
+ GST_BUFFER_TIMESTAMP (buf) = src->buffer_count++;
- if (!src->silent) {
- gst_element_info (element, "fakesrc: loop ******* (%s:%s) > (%d bytes, %llu)",
- GST_DEBUG_PAD_NAME (pad), GST_BUFFER_SIZE (buf), GST_BUFFER_TIMESTAMP (buf));
- }
+ if (!src->silent) {
+ gst_element_info (element, "fakesrc: loop ******* (%s:%s) > (%d bytes, %llu)",
+ GST_DEBUG_PAD_NAME (pad), GST_BUFFER_SIZE (buf), GST_BUFFER_TIMESTAMP (buf));
+ }
- g_signal_emit (G_OBJECT (src), gst_fakesrc_signals[SIGNAL_HANDOFF], 0,
+ g_signal_emit (G_OBJECT (src), gst_fakesrc_signals[SIGNAL_HANDOFF], 0,
buf, pad);
- gst_pad_push (pad, buf);
+ gst_pad_push (pad, buf);
- pads = g_list_next (pads);
- }
- } while (!GST_ELEMENT_IS_COTHREAD_STOPPING (element));
+ pads = g_list_next (pads);
+ }
}
static GstElementStateReturn
#include <unistd.h>
#include <sys/mman.h>
#include <errno.h>
+#include <string.h>
/**********************************************************************
/* mmap() the data into this new buffer */
GST_BUFFER_DATA(buf) = mmap (NULL, size, PROT_READ, MAP_SHARED, src->fd, offset);
if (GST_BUFFER_DATA(buf) == NULL) {
- fprintf (stderr, "ERROR: gstfilesrc couldn't map file!\n");
+ gst_element_error (GST_ELEMENT (src), "couldn't map file");
} else if (GST_BUFFER_DATA(buf) == MAP_FAILED) {
- g_error ("gstfilesrc mmap(0x%x, %d, 0x%llx) : %s",
- size, src->fd, offset, sys_errlist[errno]);
+ gst_element_error (GST_ELEMENT (src), "mmap (0x%x, %d, 0x%llx) : %s",
+ size, src->fd, offset, strerror (errno));
}
#ifdef MADV_SEQUENTIAL
/* madvise to tell the kernel what to do with it */
/* open the file */
src->fd = open (src->filename, O_RDONLY);
if (src->fd < 0) {
- perror ("open");
- gst_element_error (GST_ELEMENT (src), g_strconcat("opening file \"", src->filename, "\"", NULL));
+ gst_element_error (GST_ELEMENT (src), "opening file \"%s\" (%s)",
+ src->filename, strerror (errno), NULL);
return FALSE;
} else {
/* find the file length */
{
g_return_if_fail (GST_FLAG_IS_SET (src, GST_FILESRC_OPEN));
- g_print ("close\n");
/* close the file */
close (src->fd);
src->fd = 0;
src->filelen = 0;
src->curoffset = 0;
+ if (src->mapbuf)
+ gst_buffer_unref (src->mapbuf);
GST_FLAG_UNSET (src, GST_FILESRC_OPEN);
}
{
GstFileSrc *src = GST_FILESRC(element);
- if (GST_STATE_PENDING (element) == GST_STATE_NULL) {
- if (GST_FLAG_IS_SET (element, GST_FILESRC_OPEN))
- gst_filesrc_close_file (GST_FILESRC (element));
- } if (GST_STATE_PENDING (element) == GST_STATE_READY) {
- src->curoffset = 0;
- } else {
-
- if (!GST_FLAG_IS_SET (element, GST_FILESRC_OPEN)) {
- if (!gst_filesrc_open_file (GST_FILESRC (element)))
- return GST_STATE_FAILURE;
- }
+ switch (GST_STATE_TRANSITION (element)) {
+ case GST_STATE_NULL_TO_READY:
+ if (!GST_FLAG_IS_SET (element, GST_FILESRC_OPEN)) {
+ if (!gst_filesrc_open_file (GST_FILESRC (element)))
+ return GST_STATE_FAILURE;
+ }
+ break;
+ case GST_STATE_READY_TO_NULL:
+ if (GST_FLAG_IS_SET (element, GST_FILESRC_OPEN))
+ gst_filesrc_close_file (GST_FILESRC (element));
+ break;
+ case GST_STATE_READY_TO_PAUSED:
+ case GST_STATE_PAUSED_TO_READY:
+ src->curoffset = 0;
+ default:
+ break;
}
if (GST_ELEMENT_CLASS (parent_class)->change_state)
identity = GST_IDENTITY (element);
- do {
- buf = gst_pad_pull (identity->sinkpad);
+ buf = gst_pad_pull (identity->sinkpad);
- for (i=identity->duplicate; i; i--) {
- if (!identity->silent)
- g_print("identity: loop ******* (%s:%s)i (%d bytes, %llu) \n",
+ for (i=identity->duplicate; i; i--) {
+ if (!identity->silent)
+ g_print("identity: loop ******* (%s:%s)i (%d bytes, %llu) \n",
GST_DEBUG_PAD_NAME (identity->sinkpad), GST_BUFFER_SIZE (buf), GST_BUFFER_TIMESTAMP (buf));
- g_signal_emit (G_OBJECT (identity), gst_identity_signals[SIGNAL_HANDOFF], 0,
+ g_signal_emit (G_OBJECT (identity), gst_identity_signals[SIGNAL_HANDOFF], 0,
buf);
- if (i>1)
- gst_buffer_ref (buf);
-
- gst_pad_push (identity->srcpad, buf);
+ if (i>1)
+ gst_buffer_ref (buf);
- if (identity->sleep_time)
- usleep (identity->sleep_time);
- }
+ gst_pad_push (identity->srcpad, buf);
- } while (!GST_ELEMENT_IS_COTHREAD_STOPPING(element));
+ if (identity->sleep_time)
+ usleep (identity->sleep_time);
+ }
}
static void
bin->numchildren = 0;
bin->children = NULL;
- bin->eoscond = g_cond_new ();
}
/**
}
void
-gst_bin_child_state_change (GstBin * bin, GstElementState old, GstElementState new,
- GstElement * child)
+gst_bin_child_state_change (GstBin *bin, GstElementState old, GstElementState new,
+ GstElement *child)
{
gint old_idx = 0, new_idx = 0, i;
GST_UNLOCK (bin);
}
+void
+gst_bin_child_error (GstBin *bin, GstElement *child)
+{
+ if (GST_STATE (bin) != GST_STATE_NULL) {
+ /*
+ GST_STATE_PENDING (bin) = ((GST_STATE (bin) >> 1));
+ if (gst_element_set_state (bin, GST_STATE (bin)>>1) != GST_STATE_SUCCESS) {
+ gst_element_error (GST_ELEMENT (bin), "bin \"%s\" couldn't change state on error from child \"%s\"",
+ GST_ELEMENT_NAME (bin), GST_ELEMENT_NAME (child));
+ }
+ */
+ gst_element_info (GST_ELEMENT (bin), "bin \"%s\" stopped because child \"%s\" signalled an error",
+ GST_ELEMENT_NAME (bin), GST_ELEMENT_NAME (child));
+ }
+}
+
static void
gst_bin_send_event (GstElement *element, GstEvent *event)
{
gst_element_set_state (child, old_state);
if (GST_ELEMENT_SCHED (child) == GST_ELEMENT_SCHED (element)) {
+ /* reset to what is was */
+ GST_STATE_PENDING (element) = old_state;
+ gst_bin_change_state (element);
return GST_STATE_FAILURE;
}
break;
bin->children = NULL;
bin->numchildren = 0;
- g_cond_free (bin->eoscond);
-
G_OBJECT_CLASS (parent_class)->dispose (object);
}
# define GST_IS_BIN(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), GST_TYPE_BIN))
# define GST_IS_BIN_CLASS(obj) (G_TYPE_CHECK_CLASS_TYPE ((klass), GST_TYPE_BIN))
-#define GST_BIN_FAST(obj) ((GstBin*)(obj))
-#define GST_BIN_CLASS_FAST(klass) ((GstBinClass*)(klass))
+#define GST_BIN_CAST(obj) ((GstBin*)(obj))
+#define GST_BIN_CLASS_CAST(klass) ((GstBinClass*)(klass))
#ifdef GST_TYPE_PARANOID
# define GST_BIN(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_BIN, GstBin))
# define GST_BIN_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_BIN, GstBinClass))
#else
-# define GST_BIN GST_BIN_FAST
-# define GST_BIN_CLASS GST_BIN_CLASS_FAST
+# define GST_BIN GST_BIN_CAST
+# define GST_BIN_CLASS GST_BIN_CLASS_CAST
#endif
typedef enum {
/* our children */
gint numchildren;
GList *children;
- GCond *eoscond;
GstElementState child_states[GST_NUM_STATES];
- cothread_context *threadcontext;
+ gpointer sched_private;
};
struct _GstBinClass {
/* internal */
void gst_bin_child_state_change (GstBin *bin, GstElementState oldstate,
GstElementState newstate, GstElement *child);
+void gst_bin_child_error (GstBin *bin, GstElement *child);
#ifdef __cplusplus
}
static GMemChunk *_gst_buffer_chunk;
static GMutex *_gst_buffer_chunk_lock;
+static gint _gst_buffer_live;
void
_gst_buffer_initialize (void)
_gst_buffer_chunk_lock = g_mutex_new ();
_gst_buffer_type = g_type_register_static (G_TYPE_INT, "GstBuffer", &buffer_info, 0);
+
+ _gst_buffer_live = 0;
+}
+
+/**
+ * gst_buffer_print_stats:
+ *
+ * Print statistics about live buffers.
+ */
+void
+gst_buffer_print_stats (void)
+{
+ g_log (g_log_domain_gstreamer, G_LOG_LEVEL_INFO,
+ "%d live buffers", _gst_buffer_live);
}
/**
g_mutex_lock (_gst_buffer_chunk_lock);
buffer = g_mem_chunk_alloc (_gst_buffer_chunk);
+ _gst_buffer_live++;
g_mutex_unlock (_gst_buffer_chunk_lock);
GST_INFO (GST_CAT_BUFFER,"creating new buffer %p",buffer);
g_mutex_lock (_gst_buffer_chunk_lock);
buffer = g_mem_chunk_alloc (_gst_buffer_chunk);
- GST_DATA_TYPE(buffer) = _gst_buffer_type;
+ _gst_buffer_live++;
g_mutex_unlock (_gst_buffer_chunk_lock);
GST_INFO (GST_CAT_BUFFER,"creating new subbuffer %p from parent %p (size %u, offset %u)",
buffer, parent, size, offset);
+ GST_DATA_TYPE(buffer) = _gst_buffer_type;
buffer->lock = g_mutex_new ();
#ifdef HAVE_ATOMIC_H
atomic_set (&buffer->refcount, 1);
/* remove it entirely from memory */
g_mutex_lock (_gst_buffer_chunk_lock);
g_mem_chunk_free (_gst_buffer_chunk,buffer);
+ _gst_buffer_live--;
g_mutex_unlock (_gst_buffer_chunk_lock);
}
gboolean gst_buffer_is_span_fast (GstBuffer *buf1, GstBuffer *buf2);
+void gst_buffer_print_stats (void);
+
#ifdef __cplusplus
}
#endif /* __cplusplus */
element->numsinkpads = 0;
element->pads = NULL;
element->loopfunc = NULL;
- element->threadstate = NULL;
element->sched = NULL;
+ element->sched_private = NULL;
element->state_mutex = g_mutex_new ();
element->state_cond = g_cond_new ();
}
gst_element_error (GstElement *element, const gchar *error, ...)
{
va_list var_args;
+ GstObject *parent;
+ g_return_if_fail (GST_IS_ELEMENT (element));
+ g_return_if_fail (error != NULL);
+
va_start (var_args, error);
gst_element_message (element, "error", error, var_args);
va_end (var_args);
+
+ parent = GST_ELEMENT_PARENT (element);
+
+ if (parent && GST_IS_BIN (parent)) {
+ gst_bin_child_error (GST_BIN (parent), element);
+ }
+
+ if (element->sched) {
+ gst_scheduler_error (element->sched, element);
+ }
+
}
/**
{
va_list var_args;
+ g_return_if_fail (GST_IS_ELEMENT (element));
+ g_return_if_fail (info != NULL);
+
va_start (var_args, info);
gst_element_message (element, "info", info, var_args);
va_end (var_args);
gst_element_change_state (GstElement *element)
{
GstElementState old_state;
- //GstEvent *event;
+ GstObject *parent;
g_return_val_if_fail (element != NULL, GST_STATE_FAILURE);
g_return_val_if_fail (GST_IS_ELEMENT (element), GST_STATE_FAILURE);
old_state = GST_STATE (element);
if (GST_STATE_PENDING (element) == GST_STATE_VOID_PENDING || old_state == GST_STATE_PENDING (element)) {
- GST_INFO (GST_CAT_STATES, "no state change needed for element %s (VOID_PENDING)\n", GST_ELEMENT_NAME (element));
+ GST_INFO (GST_CAT_STATES, "no state change needed for element %s (VOID_PENDING)", GST_ELEMENT_NAME (element));
return GST_STATE_SUCCESS;
}
GST_STATE_TRANSITION (element));
/* tell the scheduler if we have one */
- if (element->sched)
- gst_scheduler_state_transition (element->sched, element, GST_STATE_TRANSITION (element));
+ if (element->sched) {
+ if (gst_scheduler_state_transition (element->sched, element, GST_STATE_TRANSITION (element))
+ != GST_STATE_SUCCESS) {
+ return GST_STATE_FAILURE;
+ }
+ }
GST_STATE (element) = GST_STATE_PENDING (element);
GST_STATE_PENDING (element) = GST_STATE_VOID_PENDING;
g_cond_signal (element->state_cond);
g_mutex_unlock (element->state_mutex);
- if (GST_ELEMENT_PARENT (element)) {
- gst_bin_child_state_change (GST_BIN (GST_ELEMENT_PARENT (element)), old_state, GST_STATE (element), element);
+ parent = GST_ELEMENT_PARENT (element);
+
+ if (parent && GST_IS_BIN (parent)) {
+ gst_bin_child_state_change (GST_BIN (parent), old_state, GST_STATE (element), element);
}
- //event = gst_event_new_state_change (old_state, GST_STATE (element));
- //gst_element_send_event (element, event);
return GST_STATE_SUCCESS;
}
GST_DEBUG(GST_CAT_EVENT, "signaling EOS on element %s\n",GST_OBJECT_NAME(element));
g_signal_emit (G_OBJECT (element), gst_element_signals[EOS], 0);
- GST_FLAG_SET(element,GST_ELEMENT_COTHREAD_STOPPING);
}
#define GST_TYPE_ELEMENT (_gst_element_type)
-#define GST_ELEMENT_FAST(obj) ((GstElement*)(obj))
-#define GST_ELEMENT_CLASS_FAST(klass) ((GstElementClass*)(klass))
+#define GST_ELEMENT_CAST(obj) ((GstElement*)(obj))
+#define GST_ELEMENT_CLASS_CAST(klass) ((GstElementClass*)(klass))
#define GST_IS_ELEMENT(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), GST_TYPE_ELEMENT))
#define GST_IS_ELEMENT_CLASS(obj) (G_TYPE_CHECK_CLASS_TYPE ((klass), GST_TYPE_ELEMENT))
# define GST_ELEMENT(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_ELEMENT, GstElement))
# define GST_ELEMENT_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_ELEMENT, GstElementClass))
#else
-# define GST_ELEMENT GST_ELEMENT_FAST
-# define GST_ELEMENT_CLASS GST_ELEMENT_CLASS_FAST
+# define GST_ELEMENT GST_ELEMENT_CAST
+# define GST_ELEMENT_CLASS GST_ELEMENT_CLASS_CAST
#endif
typedef enum {
/* this element is incable of seeking (FIXME: does this apply to filters?) */
GST_ELEMENT_NO_SEEK,
- /***** !!!!! need to have a flag that says that an element must
- *not* be an entry into a scheduling chain !!!!! *****/
- /* this element for some reason doesn't obey COTHREAD_STOPPING, or
- has some other reason why it can't be the entry */
- GST_ELEMENT_NO_ENTRY,
+ /* this element, for some reason, has a loop function that performs
+ * an infinite loop without calls to gst_element_yield () */
+ GST_ELEMENT_INFINITE_LOOP,
+
+ /* private flags that can be used by the scheduler */
+ GST_ELEMENT_SCHEDULER_PRIVATE1,
+ GST_ELEMENT_SCHEDULER_PRIVATE2,
/* there is a new loopfunction ready for placement */
GST_ELEMENT_NEW_LOOPFUNC,
- /* the cothread holding this element needs to be stopped */
- GST_ELEMENT_COTHREAD_STOPPING,
- /* the element has to be scheduled as a cothread for any sanity */
- GST_ELEMENT_USE_COTHREAD,
/* if this element can handle events */
GST_ELEMENT_EVENT_AWARE,
} GstElementFlags;
#define GST_ELEMENT_IS_THREAD_SUGGESTED(obj) (GST_FLAG_IS_SET(obj,GST_ELEMENT_THREAD_SUGGESTED))
-#define GST_ELEMENT_IS_COTHREAD_STOPPING(obj) (GST_FLAG_IS_SET(obj,GST_ELEMENT_COTHREAD_STOPPING))
#define GST_ELEMENT_IS_EOS(obj) (GST_FLAG_IS_SET(obj,GST_ELEMENT_EOS))
#define GST_ELEMENT_IS_EVENT_AWARE(obj) (GST_FLAG_IS_SET(obj,GST_ELEMENT_EVENT_AWARE))
guint8 current_state;
guint8 pending_state;
GstElement *manager;
- GstScheduler *sched;
GstElementLoopFunction loopfunc;
- cothread_state *threadstate;
+
+ GstScheduler *sched;
+ gpointer sched_private;
/* element pads */
guint16 numpads;
void gst_element_set_parent (GstElement *element, GstObject *parent);
GstObject* gst_element_get_parent (GstElement *element);
+#define gst_element_yield(element) gst_scheduler_yield(GST_ELEMENT_SCHED(element),element)
+#define gst_element_interrupt(element) gst_scheduler_interrupt(GST_ELEMENT_SCHED(element),element)
void gst_element_set_sched (GstElement *element, GstScheduler *sched);
GstScheduler* gst_element_get_sched (GstElement *element);
# define GST_IS_OBJECT(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), GST_TYPE_OBJECT))
# define GST_IS_OBJECT_CLASS(obj) (G_TYPE_CHECK_CLASS_TYPE ((klass), GST_TYPE_OBJECT))
-#define GST_OBJECT_FAST(obj) ((GstObject*)(obj))
-#define GST_OBJECT_CLASS_FAST(klass) ((GstObjectClass*)(klass))
+#define GST_OBJECT_CAST(obj) ((GstObject*)(obj))
+#define GST_OBJECT_CLASS_CAST(klass) ((GstObjectClass*)(klass))
#ifdef GST_TYPE_PARANOID
# define GST_OBJECT(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_OBJECT, GstObject))
# define GST_OBJECT_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_OBJECT, GstObjectClass))
#else
-# define GST_OBJECT GST_OBJECT_FAST
-# define GST_OBJECT_CLASS GST_OBJECT_CLASS_FAST
+# define GST_OBJECT GST_OBJECT_CAST
+# define GST_OBJECT_CLASS GST_OBJECT_CLASS_CAST
#endif
/*typedef struct _GstObject GstObject; */
#endif
};
-#define GST_FLAGS(obj) (GST_OBJECT (obj)->flags)
+#define GST_FLAGS(obj) (GST_OBJECT_CAST (obj)->flags)
#define GST_FLAG_IS_SET(obj,flag) (GST_FLAGS (obj) & (1<<(flag)))
#define GST_FLAG_SET(obj,flag) G_STMT_START{ (GST_FLAGS (obj) |= (1<<(flag))); }G_STMT_END
#define GST_FLAG_UNSET(obj,flag) G_STMT_START{ (GST_FLAGS (obj) &= ~(1<<(flag))); }G_STMT_END
#define GST_OBJECT_FLOATING(obj) (GST_FLAG_IS_SET (obj, GST_FLOATING))
/* object locking */
-#define GST_LOCK(obj) (g_mutex_lock(GST_OBJECT(obj)->lock))
-#define GST_TRYLOCK(obj) (g_mutex_trylock(GST_OBJECT(obj)->lock))
-#define GST_UNLOCK(obj) (g_mutex_unlock(GST_OBJECT(obj)->lock))
-#define GST_GET_LOCK(obj) (GST_OBJECT(obj)->lock)
+#define GST_LOCK(obj) (g_mutex_lock(GST_OBJECT_CAST(obj)->lock))
+#define GST_TRYLOCK(obj) (g_mutex_trylock(GST_OBJECT_CAST(obj)->lock))
+#define GST_UNLOCK(obj) (g_mutex_unlock(GST_OBJECT_CAST(obj)->lock))
+#define GST_GET_LOCK(obj) (GST_OBJECT_CAST(obj)->lock)
/* normal GObject stuff */
pad->direction = GST_PAD_UNKNOWN;
pad->peer = NULL;
+ pad->sched = NULL;
+ pad->sched_private = NULL;
+
pad->chainfunc = NULL;
pad->getfunc = NULL;
pad->getregionfunc = NULL;
gst_pad_connect (GstPad *srcpad,
GstPad *sinkpad)
{
- if (!gst_pad_try_connect (srcpad, sinkpad))
-/* FIXME: g_critical is glib-2.0, not glib-1.2
- g_critical ("couldn't connect %s:%s and %s:%s",
-*/
+ if (!gst_pad_try_connect (srcpad, sinkpad)) {
g_warning ("couldn't connect %s:%s and %s:%s",
GST_DEBUG_PAD_NAME (srcpad),
GST_DEBUG_PAD_NAME (sinkpad));
+ }
}
/**
GST_DEBUG_ENTER ("(%s:%s)", GST_DEBUG_PAD_NAME (pad));
g_return_if_fail (GST_PAD_DIRECTION (pad) == GST_PAD_SRC);
+
if (!peer) {
- g_warning ("gst_pad_push but %s:%s is unconnected", GST_DEBUG_PAD_NAME (pad));
- return;
+ g_warning ("push on pad %s:%s but it is unconnected", GST_DEBUG_PAD_NAME (pad));
}
-
- if (peer->chainhandler) {
- GST_DEBUG (GST_CAT_DATAFLOW, "calling chainhandler &%s of peer pad %s:%s\n",
- GST_DEBUG_FUNCPTR_NAME (peer->chainhandler), GST_DEBUG_PAD_NAME (GST_PAD (peer)));
- (peer->chainhandler) (GST_PAD_FAST (peer), buf);
- }
else {
- g_warning ("gst_pad_push but %s:%s has but no chainhandler", GST_DEBUG_PAD_NAME (peer));
+ if (peer->chainhandler) {
+ if (buf) {
+ GST_DEBUG (GST_CAT_DATAFLOW, "calling chainhandler &%s of peer pad %s:%s\n",
+ GST_DEBUG_FUNCPTR_NAME (peer->chainhandler), GST_DEBUG_PAD_NAME (GST_PAD (peer)));
+ (peer->chainhandler) (GST_PAD_CAST (peer), buf);
+ return;
+ }
+ else {
+ g_warning ("trying to push a NULL buffer on pad %s:%s", GST_DEBUG_PAD_NAME (peer));
+ return;
+ }
+ }
+ else {
+ g_warning ("(internal error) push on pad %s:%s but it has no chainhandler", GST_DEBUG_PAD_NAME (peer));
+ }
+ }
+ /* clean up the mess here */
+ if (buf != NULL) {
+ if (GST_IS_BUFFER (buf))
+ gst_buffer_unref (buf);
+ else
+ gst_pad_event_default (pad, GST_EVENT (buf));
}
}
#endif
g_return_val_if_fail (GST_PAD_DIRECTION (pad) == GST_PAD_SINK, NULL);
if (!peer) {
- g_warning ("gst_pad_pull but %s:%s is unconnected", GST_DEBUG_PAD_NAME(pad));
- return NULL;
+ gst_element_error (GST_PAD_PARENT (pad),
+ "pull on pad %s:%s but it was unconnected",
+ GST_ELEMENT_NAME (GST_PAD_PARENT (pad)), GST_PAD_NAME (pad),
+ NULL);
}
-
- if (peer->gethandler) {
- GST_DEBUG (GST_CAT_DATAFLOW, "calling gethandler %s of peer pad %s:%s\n",
- GST_DEBUG_FUNCPTR_NAME (peer->gethandler), GST_DEBUG_PAD_NAME (peer));
- return (peer->gethandler) (GST_PAD_FAST (peer));
- } else {
- g_warning ("gst_pad_pull but %s:%s has no gethandler", GST_DEBUG_PAD_NAME (peer));
- return NULL;
+ else {
+ if (peer->gethandler) {
+ GstBuffer *buf;
+
+ GST_DEBUG (GST_CAT_DATAFLOW, "calling gethandler %s of peer pad %s:%s\n",
+ GST_DEBUG_FUNCPTR_NAME (peer->gethandler), GST_DEBUG_PAD_NAME (peer));
+
+ buf = (peer->gethandler) (GST_PAD_CAST (peer));
+ if (buf)
+ return buf;
+ /* no null buffers allowed */
+ gst_element_error (GST_PAD_PARENT (pad),
+ "NULL buffer during pull on %s:%s", GST_DEBUG_PAD_NAME (pad), NULL);
+
+ } else {
+ gst_element_error (GST_PAD_PARENT (pad),
+ "(internal error) pull on pad %s:%s but the peer pad %s:%s has no gethandler",
+ GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (peer),
+ NULL);
+ }
}
+ return NULL;
}
#endif
if (peer->pullregionfunc) {
GST_DEBUG (GST_CAT_DATAFLOW, "calling pullregionfunc &%s of peer pad %s:%s\n",
- GST_DEBUG_FUNCPTR_NAME (peer->pullregionfunc), GST_DEBUG_PAD_NAME(GST_PAD_FAST (peer)));
- result = (peer->pullregionfunc) (GST_PAD_FAST (peer), type, offset, len);
+ GST_DEBUG_FUNCPTR_NAME (peer->pullregionfunc), GST_DEBUG_PAD_NAME(GST_PAD_CAST (peer)));
+ result = (peer->pullregionfunc) (GST_PAD_CAST (peer), type, offset, len);
} else {
GST_DEBUG (GST_CAT_DATAFLOW,"no pullregionfunc\n");
result = NULL;
pads = g_list_next (pads);
}
}
+ /* we have to try to schedule another element because this one is deisabled */
+ gst_element_yield (element);
break;
default:
g_warning ("no default handler for event\n");
*/
#define GST_TYPE_PAD (_gst_pad_type)
-#define GST_PAD_FAST(obj) ((GstPad*)(obj))
-#define GST_PAD_CLASS_FAST(klass) ((GstPadClass*)(klass))
+#define GST_PAD_CAST(obj) ((GstPad*)(obj))
+#define GST_PAD_CLASS_CAST(klass) ((GstPadClass*)(klass))
#define GST_IS_PAD(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), GST_TYPE_PAD))
#define GST_IS_PAD_FAST(obj) (G_OBJECT_TYPE(obj) == GST_TYPE_REAL_PAD || \
G_OBJECT_TYPE(obj) == GST_TYPE_GHOST_PAD)
# define GST_PAD(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_PAD, GstPad))
# define GST_PAD_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_PAD, GstPadClass))
#else
-# define GST_PAD GST_PAD_FAST
-# define GST_PAD_CLASS GST_PAD_CLASS_FAST
+# define GST_PAD GST_PAD_CAST
+# define GST_PAD_CLASS GST_PAD_CLASS_CAST
#endif
/*
*/
#define GST_TYPE_REAL_PAD (_gst_real_pad_type)
-#define GST_REAL_PAD_FAST(obj) ((GstRealPad*)(obj))
-#define GST_REAL_PAD_CLASS_FAST(klass) ((GstRealPadClass*)(klass))
+#define GST_REAL_PAD_CAST(obj) ((GstRealPad*)(obj))
+#define GST_REAL_PAD_CLASS_CAST(klass) ((GstRealPadClass*)(klass))
#define GST_IS_REAL_PAD(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), GST_TYPE_REAL_PAD))
#define GST_IS_REAL_PAD_FAST(obj) (G_OBJECT_TYPE(obj) == GST_TYPE_REAL_PAD)
#define GST_IS_REAL_PAD_CLASS(obj) (G_TYPE_CHECK_CLASS_TYPE ((klass), GST_TYPE_REAL_PAD))
# define GST_REAL_PAD(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_REAL_PAD, GstRealPad))
# define GST_REAL_PAD_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_REAL_PAD, GstRealPadClass))
#else
-# define GST_REAL_PAD GST_REAL_PAD_FAST
-# define GST_REAL_PAD_CLASS GST_REAL_PAD_CLASS_FAST
+# define GST_REAL_PAD GST_REAL_PAD_CAST
+# define GST_REAL_PAD_CLASS GST_REAL_PAD_CLASS_CAST
#endif
/*
*/
#define GST_TYPE_GHOST_PAD (_gst_ghost_pad_type)
-#define GST_GHOST_PAD_FAST(obj) ((GstGhostPad*)(obj))
-#define GST_GHOST_PAD_CLASS_FAST(klass) ((GstGhostPadClass*)(klass))
+#define GST_GHOST_PAD_CAST(obj) ((GstGhostPad*)(obj))
+#define GST_GHOST_PAD_CLASS_CAST(klass) ((GstGhostPadClass*)(klass))
#define GST_IS_GHOST_PAD(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), GST_TYPE_GHOST_PAD))
#define GST_IS_GHOST_PAD_FAST(obj) (G_OBJECT_TYPE(obj) == GST_TYPE_GHOST_PAD)
#define GST_IS_GHOST_PAD_CLASS(obj) (G_TYPE_CHECK_CLASS_TYPE ((klass), GST_TYPE_GHOST_PAD))
# define GST_GHOST_PAD(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_GHOST_PAD, GstGhostPad))
# define GST_GHOST_PAD_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_GHOST_PAD, GstGhostPadClass))
#else
-# define GST_GHOST_PAD GST_GHOST_PAD_FAST
-# define GST_GHOST_PAD_CLASS GST_GHOST_PAD_CLASS_FAST
+# define GST_GHOST_PAD GST_GHOST_PAD_CAST
+# define GST_GHOST_PAD_CLASS GST_GHOST_PAD_CLASS_CAST
#endif
GstCaps *caps;
GstPadDirection direction;
- cothread_state *threadstate;
+ GstScheduler *sched;
+ gpointer sched_private;
GstRealPad *peer;
guint64 offset;
guint64 len;
- GstScheduler *sched;
-
GstPadChainFunction chainfunc;
GstPadChainFunction chainhandler;
GstPadGetFunction getfunc;
#define GST_GPAD_REALPAD(pad) (((GstGhostPad *)(pad))->realpad)
/* Generic */
-#define GST_PAD_REALIZE(pad) (GST_IS_REAL_PAD(pad) ? ((GstRealPad *)(pad)) : GST_GPAD_REALPAD(pad))
+#define GST_PAD_REALIZE(pad) (GST_IS_REAL_PAD(pad) ? ((GstRealPad *)(pad)) : GST_GPAD_REALPAD(pad))
#define GST_PAD_DIRECTION(pad) GST_RPAD_DIRECTION(GST_PAD_REALIZE(pad))
#define GST_PAD_CAPS(pad) GST_RPAD_CAPS(GST_PAD_REALIZE(pad))
#define GST_PAD_PEER(pad) GST_RPAD_PEER(GST_PAD_REALIZE(pad))
GList *pads;
gint elementcount = 0;
gint retval = 0;
+ gboolean backref = FALSE;
priv->binlevel++;
GstElement *new;
GST_DEBUG (0, "have pad for element %s\n", element_name);
- new = gst_bin_get_by_name (parent, element_name);
+ new = gst_bin_get_by_name_recurse_up (parent, element_name);
if (!new) {
GST_DEBUG (0, "element %s does not exist! trying to continue\n", element_name);
}
else {
previous = new;
srcpadname = ptr + 1;
+ backref = TRUE;
}
}
i++;
continue;
}
+ gst_bin_add (GST_BIN (parent), element);
j = gst_parse_launch_cmdline (argc - i, argv + i + 1, GST_BIN (element), priv);
/* check for parse error */
return GST_PARSE_ERROR_NOSUCH_ELEMENT;
}
GST_DEBUG (0, "CREATED element %s\n", GST_ELEMENT_NAME (element));
+ gst_bin_add (GST_BIN (parent), element);
}
- gst_bin_add (GST_BIN (parent), element);
elementcount++;
g_slist_free (sinkpads);
else
GST_DEBUG (0, "have sink pad %s:%s\n", GST_DEBUG_PAD_NAME (GST_PARSE_LISTPAD (sinkpads)));
- if (!srcpads && sinkpads && previous) {
+ if (!srcpads && sinkpads && previous && srcpadname) {
dyn_connect *connect = g_malloc (sizeof (dyn_connect));
connect->srcpadname = srcpadname;
sinkpads = NULL;
/* if we're the first element, ghost all the sinkpads */
- if (elementcount == 1) {
+ if (elementcount == 1 && !backref) {
DEBUG ("first element, ghosting all of %s's sink pads to parent %s\n",
GST_ELEMENT_NAME (element), GST_ELEMENT_NAME (GST_ELEMENT (parent)));
pads = gst_element_get_pad_list (element);
ARG_LEAKY,
ARG_LEVEL,
ARG_MAX_LEVEL,
+ ARG_MAY_DEADLOCK,
};
static void gst_queue_class_init (GstQueueClass *klass);
static void gst_queue_init (GstQueue *queue);
+static void gst_queue_dispose (GObject *object);
static void gst_queue_set_property (GObject *object, guint prop_id,
const GValue *value, GParamSpec *pspec);
parent_class = g_type_class_ref (GST_TYPE_ELEMENT);
- g_object_class_install_property(G_OBJECT_CLASS(klass), ARG_LEAKY,
- g_param_spec_enum("leaky","Leaky","Where the queue leaks, if at all.",
- GST_TYPE_QUEUE_LEAKY,GST_QUEUE_NO_LEAK,G_PARAM_READWRITE));
- g_object_class_install_property(G_OBJECT_CLASS(klass), ARG_LEVEL,
- g_param_spec_int("level","Level","How many buffers are in the queue.",
- 0,G_MAXINT,0,G_PARAM_READABLE));
- g_object_class_install_property(G_OBJECT_CLASS(klass), ARG_MAX_LEVEL,
- g_param_spec_int("max_level","Maximum Level","How many buffers the queue holds.",
- 0,G_MAXINT,100,G_PARAM_READWRITE));
-
- gobject_class->set_property = GST_DEBUG_FUNCPTR(gst_queue_set_property);
- gobject_class->get_property = GST_DEBUG_FUNCPTR(gst_queue_get_property);
+ g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_LEAKY,
+ g_param_spec_enum ("leaky", "Leaky", "Where the queue leaks, if at all.",
+ GST_TYPE_QUEUE_LEAKY, GST_QUEUE_NO_LEAK, G_PARAM_READWRITE));
+ g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_LEVEL,
+ g_param_spec_int ("level", "Level", "How many buffers are in the queue.",
+ 0, G_MAXINT, 0, G_PARAM_READABLE));
+ g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_MAX_LEVEL,
+ g_param_spec_int ("max_level", "Maximum Level", "How many buffers the queue holds.",
+ 0, G_MAXINT, 100, G_PARAM_READWRITE));
+ g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_MAY_DEADLOCK,
+ g_param_spec_boolean ("may_deadlock", "May Deadlock", "The queue may deadlock if it's full and not PLAYING",
+ TRUE, G_PARAM_READWRITE));
+
+ gobject_class->dispose = GST_DEBUG_FUNCPTR (gst_queue_dispose);
+ gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_queue_set_property);
+ gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_queue_get_property);
gstelement_class->change_state = GST_DEBUG_FUNCPTR(gst_queue_change_state);
}
queue->size_buffers = 100; /* 100 buffers */
queue->size_bytes = 100 * 1024; /* 100KB */
queue->size_time = 1000000000LL; /* 1sec */
+ queue->may_deadlock = TRUE;
queue->qlock = g_mutex_new ();
queue->reader = FALSE;
GST_DEBUG_ELEMENT (GST_CAT_THREAD, queue, "initialized queue's not_empty & not_full conditions\n");
}
+static void
+gst_queue_dispose (GObject *object)
+{
+ GstQueue *queue = GST_QUEUE (object);
+
+ g_mutex_free (queue->qlock);
+ g_cond_free (queue->not_empty);
+ g_cond_free (queue->not_full);
+
+ G_OBJECT_CLASS (parent_class)->dispose (object);
+}
+
static GstBufferPool*
gst_queue_get_bufferpool (GstPad *pad)
{
while (GST_STATE_PENDING (queue) != GST_STATE_VOID_PENDING) {
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "interrupted!!\n");
g_mutex_unlock (queue->qlock);
- cothread_switch(cothread_current_main());
+ gst_element_interrupt (GST_ELEMENT (queue));
goto restart;
}
- g_assert (GST_STATE (queue) == GST_STATE_PLAYING);
+ if (GST_STATE (queue) != GST_STATE_PLAYING) {
+ /* this means the other end is shut down */
+ /* try to signal to resolve the error */
+ if (!queue->may_deadlock) {
+ g_mutex_unlock (queue->qlock);
+ gst_element_error (GST_ELEMENT (queue), "deadlock found, source pad elements are shut down");
+ return;
+ }
+ else {
+ gst_element_info (GST_ELEMENT (queue), "waiting for the app to restart source pad elements");
+ }
+ }
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for not_full, level:%d/%d\n", queue->level_buffers, queue->size_buffers);
if (queue->writer)
while (GST_STATE_PENDING (queue) != GST_STATE_VOID_PENDING) {
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "interrupted!!\n");
g_mutex_unlock (queue->qlock);
- cothread_switch(cothread_current_main());
+ gst_element_interrupt (GST_ELEMENT (queue));
goto restart;
}
- g_assert (GST_STATE (queue) == GST_STATE_PLAYING);
+ if (GST_STATE (queue) != GST_STATE_PLAYING) {
+ /* this means the other end is shut down */
+ if (!queue->may_deadlock) {
+ g_mutex_unlock (queue->qlock);
+ gst_element_error (GST_ELEMENT (queue), "deadlock found, sink pad elements are shut down");
+ return NULL;
+ }
+ else {
+ gst_element_info (GST_ELEMENT (queue), "waiting for the app to restart sink pad elements");
+ }
+ }
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for not_empty, level:%d/%d\n", queue->level_buffers, queue->size_buffers);
if (queue->reader)
switch (prop_id) {
case ARG_LEAKY:
- queue->leaky = g_value_get_int(value);
+ queue->leaky = g_value_get_int (value);
break;
case ARG_MAX_LEVEL:
- queue->size_buffers = g_value_get_int(value);
+ queue->size_buffers = g_value_get_int (value);
+ break;
+ case ARG_MAY_DEADLOCK:
+ queue->may_deadlock = g_value_get_boolean (value);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
switch (prop_id) {
case ARG_LEAKY:
- g_value_set_int(value, queue->leaky);
+ g_value_set_int (value, queue->leaky);
break;
case ARG_LEVEL:
- g_value_set_int(value, queue->level_buffers);
+ g_value_set_int (value, queue->level_buffers);
break;
case ARG_MAX_LEVEL:
- g_value_set_int(value, queue->size_buffers);
+ g_value_set_int (value, queue->size_buffers);
+ break;
+ case ARG_MAY_DEADLOCK:
+ g_value_set_boolean (value, queue->may_deadlock);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
guint64 size_time; /* size of queue in time */
gint leaky; /* whether the queue is leaky, and if so at which end */
+ gboolean may_deadlock; /* it the queue should fail on possible deadlocks */
GMutex *qlock; /* lock for queue (vs object lock) */
/* we are single reader and single writer queue */
void
gst_scheduler_setup (GstScheduler *sched)
{
+ g_return_if_fail (GST_IS_SCHEDULER (sched));
+
if (CLASS (sched)->setup)
CLASS (sched)->setup (sched);
}
void
gst_scheduler_reset (GstScheduler *sched)
{
+ g_return_if_fail (GST_IS_SCHEDULER (sched));
+
if (CLASS (sched)->reset)
CLASS (sched)->reset (sched);
}
void
gst_scheduler_pad_connect (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad)
{
+ g_return_if_fail (GST_IS_SCHEDULER (sched));
+ g_return_if_fail (GST_IS_PAD (srcpad));
+ g_return_if_fail (GST_IS_PAD (sinkpad));
+
if (CLASS (sched)->pad_connect)
CLASS (sched)->pad_connect (sched, srcpad, sinkpad);
}
void
gst_scheduler_pad_disconnect (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad)
{
+ g_return_if_fail (GST_IS_SCHEDULER (sched));
+ g_return_if_fail (GST_IS_PAD (srcpad));
+ g_return_if_fail (GST_IS_PAD (sinkpad));
+
if (CLASS (sched)->pad_disconnect)
CLASS (sched)->pad_disconnect (sched, srcpad, sinkpad);
}
GstPad *
gst_scheduler_pad_select (GstScheduler *sched, GList *padlist)
{
+ g_return_if_fail (GST_IS_SCHEDULER (sched));
+ g_return_if_fail (padlist != NULL);
+
if (CLASS (sched)->pad_select)
CLASS (sched)->pad_select (sched, padlist);
}
void
gst_scheduler_add_element (GstScheduler *sched, GstElement *element)
{
+ g_return_if_fail (GST_IS_SCHEDULER (sched));
+ g_return_if_fail (GST_IS_ELEMENT (element));
+
if (CLASS (sched)->add_element)
CLASS (sched)->add_element (sched, element);
}
*
* Tell the scheduler that an element changed its state.
*/
-void
+GstElementStateReturn
gst_scheduler_state_transition (GstScheduler *sched, GstElement *element, gint transition)
{
+ g_return_val_if_fail (GST_IS_SCHEDULER (sched), GST_STATE_FAILURE);
+ g_return_val_if_fail (GST_IS_ELEMENT (element), GST_STATE_FAILURE);
+
if (CLASS (sched)->state_transition)
- CLASS (sched)->state_transition (sched, element, transition);
+ return CLASS (sched)->state_transition (sched, element, transition);
+
+ return GST_STATE_SUCCESS;
}
/**
void
gst_scheduler_remove_element (GstScheduler *sched, GstElement *element)
{
+ g_return_if_fail (GST_IS_SCHEDULER (sched));
+ g_return_if_fail (GST_IS_ELEMENT (element));
+
if (CLASS (sched)->remove_element)
CLASS (sched)->remove_element (sched, element);
}
void
gst_scheduler_lock_element (GstScheduler *sched, GstElement *element)
{
+ g_return_if_fail (GST_IS_SCHEDULER (sched));
+ g_return_if_fail (GST_IS_ELEMENT (element));
+
if (CLASS (sched)->lock_element)
CLASS (sched)->lock_element (sched, element);
}
void
gst_scheduler_unlock_element (GstScheduler *sched, GstElement *element)
{
+ g_return_if_fail (GST_IS_SCHEDULER (sched));
+ g_return_if_fail (GST_IS_ELEMENT (element));
+
if (CLASS (sched)->unlock_element)
CLASS (sched)->unlock_element (sched, element);
}
/**
+ * gst_scheduler_error:
+ * @sched: the schedulerr
+ * @element: the element with the error
+ *
+ * Tell the scheduler an element was in error
+ */
+void
+gst_scheduler_error (GstScheduler *sched, GstElement *element)
+{
+ g_return_if_fail (GST_IS_SCHEDULER (sched));
+ g_return_if_fail (GST_IS_ELEMENT (element));
+
+ if (CLASS (sched)->error)
+ CLASS (sched)->error (sched, element);
+}
+
+/**
+ * gst_scheduler_yield:
+ * @sched: the schedulerr
+ * @element: the element requesting a yield
+ *
+ * Tell the scheduler to schedule another element.
+ */
+void
+gst_scheduler_yield (GstScheduler *sched, GstElement *element)
+{
+ g_return_if_fail (GST_IS_SCHEDULER (sched));
+ g_return_if_fail (GST_IS_ELEMENT (element));
+
+ if (CLASS (sched)->yield)
+ CLASS (sched)->yield (sched, element);
+}
+
+/**
+ * gst_scheduler_interrupt:
+ * @sched: the schedulerr
+ * @element: the element requesting an interrupt
+ *
+ * Tell the scheduler to interrupt execution of this element.
+ */
+void
+gst_scheduler_interrupt (GstScheduler *sched, GstElement *element)
+{
+ g_return_if_fail (GST_IS_SCHEDULER (sched));
+ g_return_if_fail (GST_IS_ELEMENT (element));
+
+ if (CLASS (sched)->interrupt)
+ CLASS (sched)->interrupt (sched, element);
+}
+
+/**
* gst_scheduler_iterate:
* @sched: the schedulerr
*
gboolean
gst_scheduler_iterate (GstScheduler *sched)
{
+ g_return_if_fail (GST_IS_SCHEDULER (sched));
+
if (CLASS (sched)->iterate)
CLASS (sched)->iterate (sched);
}
void
gst_scheduler_show (GstScheduler *sched)
{
+ g_return_if_fail (GST_IS_SCHEDULER (sched));
+
if (CLASS (sched)->show)
CLASS (sched)->show (sched);
}
GstObject object;
GstElement *parent;
-
- GList *elements;
- gint num_elements;
-
- GList *chains;
- gint num_chains;
};
struct _GstSchedulerClass {
GstObjectClass parent_class;
+ /* virtual methods */
void (*setup) (GstScheduler *sched);
void (*reset) (GstScheduler *sched);
void (*add_element) (GstScheduler *sched, GstElement *element);
- void (*remove_element) (GstScheduler *sched, GstElement *element);
- void (*state_transition) (GstScheduler *sched, GstElement *element, gint transition);
+ void (*remove_element) (GstScheduler *sched, GstElement *element);
+ GstElementStateReturn
+ (*state_transition) (GstScheduler *sched, GstElement *element, gint transition);
void (*lock_element) (GstScheduler *sched, GstElement *element);
void (*unlock_element) (GstScheduler *sched, GstElement *element);
+ void (*yield) (GstScheduler *sched, GstElement *element);
+ void (*interrupt) (GstScheduler *sched, GstElement *element);
+ void (*error) (GstScheduler *sched, GstElement *element);
void (*pad_connect) (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad);
void (*pad_disconnect) (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad);
void (*pad_select) (GstScheduler *sched, GList *padlist);
gboolean (*iterate) (GstScheduler *sched);
-
/* for debugging */
void (*show) (GstScheduler *sched);
+
+ /* signals go here */
};
GType gst_scheduler_get_type (void);
void gst_scheduler_reset (GstScheduler *sched);
void gst_scheduler_add_element (GstScheduler *sched, GstElement *element);
void gst_scheduler_remove_element (GstScheduler *sched, GstElement *element);
-void gst_scheduler_state_transition (GstScheduler *sched, GstElement *element, gint transition);
+GstElementStateReturn gst_scheduler_state_transition (GstScheduler *sched, GstElement *element, gint transition);
void gst_scheduler_lock_element (GstScheduler *sched, GstElement *element);
void gst_scheduler_unlock_element (GstScheduler *sched, GstElement *element);
+void gst_scheduler_yield (GstScheduler *sched, GstElement *element);
+void gst_scheduler_interrupt (GstScheduler *sched, GstElement *element);
+void gst_scheduler_error (GstScheduler *sched, GstElement *element);
void gst_scheduler_pad_connect (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad);
void gst_scheduler_pad_disconnect (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad);
GstPad* gst_scheduler_pad_select (GstScheduler *sched, GList *padlist);
thread->ppid = getpid();
thread->thread_id = -1;
-
-/* gst_element_set_manager(GST_ELEMENT(thread),GST_ELEMENT(thread)); */
}
static void
break;
case GST_STATE_PLAYING_TO_PAUSED:
{
- GList *elements = (element->sched)->elements;
+ //GList *elements = (element->sched)->elements;
+ GList *elements = gst_bin_get_list(GST_BIN (thread));
THR_INFO ("pausing thread");
* + the pending state was already set by gstelement.c::set_state()
* + find every queue we manage, and signal its empty and full conditions
*/
- g_mutex_lock (thread->lock);
+ g_mutex_lock (thread->lock);
GST_FLAG_UNSET (thread, GST_THREAD_STATE_SPINNING);
if (GST_STATE(typefind) != oldstate) {
gst_object_unref (GST_OBJECT (typefind));
GST_DEBUG(0, "state changed during signal, aborting\n");
- cothread_switch(cothread_current_main());
+ gst_element_yield (typefind);
}
gst_object_unref (GST_OBJECT (typefind));
}
typedef struct _GstSchedulerChain GstSchedulerChain;
+#define GST_PAD_THREADSTATE(pad) (cothread_state*) (GST_PAD_CAST (pad)->sched_private)
+#define GST_ELEMENT_THREADSTATE(elem) (cothread_state*) (GST_ELEMENT_CAST (elem)->sched_private)
+#define GST_BIN_THREADCONTEXT(bin) (cothread_context*) (GST_BIN_CAST (bin)->sched_private)
+
+#define GST_ELEMENT_COTHREAD_STOPPING GST_ELEMENT_SCHEDULER_PRIVATE1
+#define GST_ELEMENT_IS_COTHREAD_STOPPING(element) GST_FLAG_IS_SET((element), GST_ELEMENT_COTHREAD_STOPPING)
+
+typedef struct _GstBasicScheduler GstBasicScheduler;
+typedef struct _GstBasicSchedulerClass GstBasicSchedulerClass;
+
struct _GstSchedulerChain {
- GstScheduler *sched;
+ GstBasicScheduler *sched;
GList *disabled;
gboolean schedule;
};
+#define GST_TYPE_BASIC_SCHEDULER \
+ (gst_basic_scheduler_get_type())
+#define GST_BASIC_SCHEDULER(obj) \
+ (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_BASIC_SCHEDULER,GstBasicScheduler))
+#define GST_BASIC_SCHEDULER_CLASS(klass) \
+ (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_BASIC_SCHEDULER,GstBasicSchedulerClass))
+#define GST_IS_BASIC_SCHEDULER(obj) \
+ (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_BASIC_SCHEDULER))
+#define GST_IS_BASIC_SCHEDULER_CLASS(obj) \
+ (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_BASIC_SCHEDULER))
+
+#define GST_BASIC_SCHEDULER_CAST(sched) ((GstBasicScheduler *)(sched))
+
+struct _GstBasicScheduler {
+ GstScheduler parent;
+
+ GList *elements;
+ gint num_elements;
+
+ GList *chains;
+ gint num_chains;
+};
+
+struct _GstBasicSchedulerClass {
+ GstSchedulerClass parent_class;
+};
+
static GType _gst_basic_scheduler_type = 0;
-static void gst_basic_scheduler_class_init (GstSchedulerClass * klass);
-static void gst_basic_scheduler_init (GstScheduler * scheduler);
+static void gst_basic_scheduler_class_init (GstBasicSchedulerClass * klass);
+static void gst_basic_scheduler_init (GstBasicScheduler * scheduler);
static void gst_basic_scheduler_dispose (GObject *object);
static void gst_basic_scheduler_reset (GstScheduler *sched);
static void gst_basic_scheduler_add_element (GstScheduler *sched, GstElement *element);
static void gst_basic_scheduler_remove_element (GstScheduler *sched, GstElement *element);
-static void gst_basic_scheduler_state_transition (GstScheduler *sched, GstElement *element, gint transition);
+static GstElementStateReturn
+ gst_basic_scheduler_state_transition (GstScheduler *sched, GstElement *element, gint transition);
static void gst_basic_scheduler_lock_element (GstScheduler *sched, GstElement *element);
static void gst_basic_scheduler_unlock_element (GstScheduler *sched, GstElement *element);
+static void gst_basic_scheduler_yield (GstScheduler *sched, GstElement *element);
+static void gst_basic_scheduler_interrupt (GstScheduler *sched, GstElement *element);
+static void gst_basic_scheduler_error (GstScheduler *sched, GstElement *element);
static void gst_basic_scheduler_pad_connect (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad);
static void gst_basic_scheduler_pad_disconnect (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad);
static GstPad* gst_basic_scheduler_pad_select (GstScheduler *sched, GList *padlist);
{
if (!_gst_basic_scheduler_type) {
static const GTypeInfo scheduler_info = {
- sizeof (GstSchedulerClass),
+ sizeof (GstBasicSchedulerClass),
NULL,
NULL,
(GClassInitFunc) gst_basic_scheduler_class_init,
NULL,
NULL,
- sizeof (GstScheduler),
+ sizeof (GstBasicScheduler),
0,
(GInstanceInitFunc) gst_basic_scheduler_init,
NULL
}
static void
-gst_basic_scheduler_class_init (GstSchedulerClass * klass)
+gst_basic_scheduler_class_init (GstBasicSchedulerClass * klass)
{
GObjectClass *gobject_class;
GstObjectClass *gstobject_class;
+ GstSchedulerClass *gstscheduler_class;
gobject_class = (GObjectClass*)klass;
gstobject_class = (GstObjectClass*)klass;
+ gstscheduler_class = (GstSchedulerClass*)klass;
parent_class = g_type_class_ref (GST_TYPE_SCHEDULER);
gobject_class->dispose = GST_DEBUG_FUNCPTR (gst_basic_scheduler_dispose);
- klass->setup = GST_DEBUG_FUNCPTR (gst_basic_scheduler_setup);
- klass->reset = GST_DEBUG_FUNCPTR (gst_basic_scheduler_reset);
- klass->add_element = GST_DEBUG_FUNCPTR (gst_basic_scheduler_add_element);
- klass->remove_element = GST_DEBUG_FUNCPTR (gst_basic_scheduler_remove_element);
- klass->state_transition = GST_DEBUG_FUNCPTR (gst_basic_scheduler_state_transition);
- klass->lock_element = GST_DEBUG_FUNCPTR (gst_basic_scheduler_lock_element);
- klass->unlock_element = GST_DEBUG_FUNCPTR (gst_basic_scheduler_unlock_element);
- klass->pad_connect = GST_DEBUG_FUNCPTR (gst_basic_scheduler_pad_connect);
- klass->pad_disconnect = GST_DEBUG_FUNCPTR (gst_basic_scheduler_pad_disconnect);
- klass->pad_select = GST_DEBUG_FUNCPTR (gst_basic_scheduler_pad_select);
- klass->iterate = GST_DEBUG_FUNCPTR (gst_basic_scheduler_iterate);
+ gstscheduler_class->setup = GST_DEBUG_FUNCPTR (gst_basic_scheduler_setup);
+ gstscheduler_class->reset = GST_DEBUG_FUNCPTR (gst_basic_scheduler_reset);
+ gstscheduler_class->add_element = GST_DEBUG_FUNCPTR (gst_basic_scheduler_add_element);
+ gstscheduler_class->remove_element = GST_DEBUG_FUNCPTR (gst_basic_scheduler_remove_element);
+ gstscheduler_class->state_transition = GST_DEBUG_FUNCPTR (gst_basic_scheduler_state_transition);
+ gstscheduler_class->lock_element = GST_DEBUG_FUNCPTR (gst_basic_scheduler_lock_element);
+ gstscheduler_class->unlock_element = GST_DEBUG_FUNCPTR (gst_basic_scheduler_unlock_element);
+ gstscheduler_class->yield = GST_DEBUG_FUNCPTR (gst_basic_scheduler_yield);
+ gstscheduler_class->interrupt = GST_DEBUG_FUNCPTR (gst_basic_scheduler_interrupt);
+ gstscheduler_class->error = GST_DEBUG_FUNCPTR (gst_basic_scheduler_error);
+ gstscheduler_class->pad_connect = GST_DEBUG_FUNCPTR (gst_basic_scheduler_pad_connect);
+ gstscheduler_class->pad_disconnect = GST_DEBUG_FUNCPTR (gst_basic_scheduler_pad_disconnect);
+ gstscheduler_class->pad_select = GST_DEBUG_FUNCPTR (gst_basic_scheduler_pad_select);
+ gstscheduler_class->iterate = GST_DEBUG_FUNCPTR (gst_basic_scheduler_iterate);
}
static void
-gst_basic_scheduler_init (GstScheduler *scheduler)
+gst_basic_scheduler_init (GstBasicScheduler *scheduler)
{
+ scheduler->elements = NULL;
+ scheduler->num_elements = 0;
+ scheduler->chains = NULL;
+ scheduler->num_chains = 0;
}
static void
gst_basic_scheduler_dispose (GObject *object)
{
-
G_OBJECT_CLASS (parent_class)->dispose (object);
}
static int
gst_basic_scheduler_loopfunc_wrapper (int argc, char *argv[])
{
- GstElement *element = GST_ELEMENT (argv);
+ GstElement *element = GST_ELEMENT_CAST (argv);
G_GNUC_UNUSED const gchar *name = GST_ELEMENT_NAME (element);
GST_DEBUG_ENTER ("(%d,'%s')", argc, name);
GST_DEBUG_FUNCPTR_NAME (element->loopfunc), name);
(element->loopfunc) (element);
GST_DEBUG (GST_CAT_DATAFLOW, "element %s ended loop function\n", name);
+
} while (!GST_ELEMENT_IS_COTHREAD_STOPPING (element));
GST_FLAG_UNSET (element, GST_ELEMENT_COTHREAD_STOPPING);
static int
gst_basic_scheduler_chain_wrapper (int argc, char *argv[])
{
- GstElement *element = GST_ELEMENT (argv);
+ GstElement *element = GST_ELEMENT_CAST (argv);
G_GNUC_UNUSED const gchar *name = GST_ELEMENT_NAME (element);
GST_DEBUG_ENTER ("(\"%s\")", name);
pads = g_list_next (pads);
if (!GST_IS_REAL_PAD (pad))
continue;
- realpad = GST_REAL_PAD (pad);
+
+ realpad = GST_REAL_PAD_CAST (pad);
+
if (GST_RPAD_DIRECTION (realpad) == GST_PAD_SINK) {
GstBuffer *buf;
GST_DEBUG (GST_CAT_DATAFLOW, "calling chain function of %s:%s\n", name,
GST_PAD_NAME (pad));
GST_RPAD_CHAINFUNC (realpad) (pad, buf);
+ GST_DEBUG (GST_CAT_DATAFLOW, "calling chain function of %s:%s done\n", name,
+ GST_PAD_NAME (pad));
}
}
- GST_DEBUG (GST_CAT_DATAFLOW, "calling chain function of %s:%s done\n", name,
- GST_PAD_NAME (pad));
+ else {
+ gst_element_error (element, "NULL buffer detected. Is \"%s:%s\" connected?",
+ name, GST_PAD_NAME (pad), NULL);
+ }
}
}
} while (!GST_ELEMENT_IS_COTHREAD_STOPPING (element));
static int
gst_basic_scheduler_src_wrapper (int argc, char *argv[])
{
- GstElement *element = GST_ELEMENT (argv);
+ GstElement *element = GST_ELEMENT_CAST (argv);
GList *pads;
GstRealPad *realpad;
GstBuffer *buf = NULL;
do {
pads = element->pads;
while (pads) {
+
if (!GST_IS_REAL_PAD (pads->data))
continue;
- realpad = (GstRealPad *) (pads->data);
+
+ realpad = GST_REAL_PAD_CAST (pads->data);
+
pads = g_list_next (pads);
if (GST_RPAD_DIRECTION (realpad) == GST_PAD_SRC) {
GST_DEBUG (GST_CAT_DATAFLOW, "calling _getfunc for %s:%s\n", GST_DEBUG_PAD_NAME (realpad));
/* fprintf(stderr,"error, no getregionfunc in \"%s\"\n", name); */
/* else */
buf =
- (GST_RPAD_GETREGIONFUNC (realpad)) ((GstPad *) realpad, realpad->regiontype,
+ (GST_RPAD_GETREGIONFUNC (realpad)) (GST_PAD_CAST (realpad), realpad->regiontype,
realpad->offset, realpad->len);
realpad->regiontype = GST_REGION_VOID;
}
/* if (GST_RPAD_GETFUNC(realpad) == NULL) */
/* fprintf(stderr,"error, no getfunc in \"%s\"\n", name); */
/* else */
- buf = GST_RPAD_GETFUNC (realpad) ((GstPad *) realpad);
+ buf = GST_RPAD_GETFUNC (realpad) (GST_PAD_CAST (realpad));
}
GST_DEBUG (GST_CAT_DATAFLOW, "calling gst_pad_push on pad %s:%s\n",
GST_DEBUG_PAD_NAME (realpad));
- gst_pad_push ((GstPad *) realpad, buf);
+ gst_pad_push (GST_PAD_CAST (realpad), buf);
}
}
} while (!GST_ELEMENT_IS_COTHREAD_STOPPING (element));
*/
while (GST_RPAD_BUFPEN (pad) != NULL) {
GST_DEBUG (GST_CAT_DATAFLOW, "switching to %p to empty bufpen\n",
- GST_ELEMENT (GST_PAD_PARENT (pad))->threadstate);
- cothread_switch (GST_ELEMENT (GST_PAD_PARENT (pad))->threadstate);
+ GST_ELEMENT_THREADSTATE (GST_PAD_PARENT (pad)));
+ cothread_switch (GST_ELEMENT_THREADSTATE (GST_PAD_PARENT (pad)));
/* we may no longer be the same pad, check. */
if (GST_RPAD_PEER (peer) != (GstRealPad *) pad) {
/* now fill the bufferpen and switch so it can be consumed */
GST_RPAD_BUFPEN (GST_RPAD_PEER (pad)) = buf;
GST_DEBUG (GST_CAT_DATAFLOW, "switching to %p\n",
- GST_ELEMENT (GST_PAD_PARENT (pad))->threadstate);
- cothread_switch (GST_ELEMENT (GST_PAD_PARENT (pad))->threadstate);
+ GST_ELEMENT_THREADSTATE (GST_PAD_PARENT (pad)));
+ cothread_switch (GST_ELEMENT_THREADSTATE (GST_PAD_PARENT (pad)));
GST_DEBUG (GST_CAT_DATAFLOW, "done switching\n");
}
/* now fill the bufferpen and switch so it can be consumed */
GST_RPAD_BUFPEN (GST_RPAD_PEER (pad)) = buf;
GST_DEBUG (GST_CAT_DATAFLOW, "switching to %p\n",
- GST_ELEMENT (GST_PAD_PARENT (pad))->threadstate);
+ GST_ELEMENT_THREADSTATE (GST_PAD_PARENT (pad)));
g_print ("%p %s\n", GST_ELEMENT (GST_PAD_PARENT (pad)),
gst_element_get_name (GST_ELEMENT (GST_PAD_PARENT (pad))));
GST_ELEMENT (GST_PAD_PARENT (pad))->select_pad = pad;
GST_FLAG_UNSET (GST_PAD_PARENT (pad), GST_ELEMENT_COTHREAD_STOPPING);
- cothread_switch (GST_ELEMENT (GST_PAD_PARENT (pad))->threadstate);
+ cothread_switch (GST_ELEMENT_THREADSTATE (GST_PAD_PARENT (pad)));
g_print ("done switching\n");
GST_DEBUG (GST_CAT_DATAFLOW, "done switching\n");
while (GST_RPAD_BUFPEN (pad) == NULL) {
GST_DEBUG (GST_CAT_DATAFLOW, "switching to \"%s\": %p to fill bufpen\n",
GST_ELEMENT_NAME (GST_ELEMENT (GST_PAD_PARENT (pad))),
- GST_ELEMENT (GST_PAD_PARENT (pad))->threadstate);
- cothread_switch (GST_ELEMENT (GST_PAD_PARENT (pad))->threadstate);
+ GST_ELEMENT_THREADSTATE (GST_PAD_PARENT (pad)));
+ cothread_switch (GST_ELEMENT_THREADSTATE (GST_PAD_PARENT (pad)));
/* we may no longer be the same pad, check. */
if (GST_RPAD_PEER (peer) != (GstRealPad *) pad) {
/* we will loop switching to the peer until it's filled up the bufferpen */
while (GST_RPAD_BUFPEN (pad) == NULL) {
GST_DEBUG (GST_CAT_DATAFLOW, "switching to %p to fill bufpen\n",
- GST_ELEMENT (GST_PAD_PARENT (pad))->threadstate);
- cothread_switch (GST_ELEMENT (GST_PAD_PARENT (pad))->threadstate);
+ GST_ELEMENT_THREADSTATE (GST_PAD_PARENT (pad)));
+ cothread_switch (GST_ELEMENT_THREADSTATE (GST_PAD_PARENT (pad)));
/* we may no longer be the same pad, check. */
if (GST_RPAD_PEER (peer) != (GstRealPad *) pad) {
}
-static void
+static gboolean
gst_basic_scheduler_cothreaded_chain (GstBin * bin, GstSchedulerChain * chain)
{
GList *elements;
GST_DEBUG (GST_CAT_SCHEDULING, "chain is using COTHREADS\n");
- g_assert (bin->threadcontext != NULL);
-
+ g_assert (GST_BIN_THREADCONTEXT (bin) != NULL);
/* walk through all the chain's elements */
elements = chain->elements;
while (elements) {
- element = GST_ELEMENT (elements->data);
+ gboolean decoupled;
+ gint same_sched = 0;
+
+ element = GST_ELEMENT_CAST (elements->data);
elements = g_list_next (elements);
+ decoupled = (GST_FLAG_IS_SET (element, GST_ELEMENT_DECOUPLED) ? TRUE : FALSE);
+
/* start out without a wrapper function, we select it later */
wrapper_function = NULL;
else {
/* otherwise we need to decide what kind of cothread */
/* if it's not DECOUPLED, we decide based on whether it's a source or not */
- if (!GST_FLAG_IS_SET (element, GST_ELEMENT_DECOUPLED)) {
+ if (!decoupled) {
/* if it doesn't have any sinks, it must be a source (duh) */
if (element->numsinkpads == 0) {
wrapper_function = GST_DEBUG_FUNCPTR (gst_basic_scheduler_src_wrapper);
/* now we have to walk through the pads to set up their state */
pads = gst_element_get_pad_list (element);
while (pads) {
+ GstPad *peerpad;
+
pad = GST_PAD (pads->data);
pads = g_list_next (pads);
+
if (!GST_IS_REAL_PAD (pad))
continue;
+
+ peerpad = GST_PAD (GST_RPAD_PEER (pad));
/* if the element is DECOUPLED or outside the manager, we have to chain */
if ((wrapper_function == NULL) ||
- (GST_RPAD_PEER (pad) &&
- (GST_ELEMENT (GST_PAD_PARENT (GST_PAD (GST_RPAD_PEER (pad))))->sched != chain->sched))
- ) {
+ (peerpad && (GST_ELEMENT_CAST (GST_PAD_PARENT (peerpad))->sched != GST_SCHEDULER (chain->sched)))) {
+
+ if (!decoupled && GST_RPAD_PEER (pad) &&
+ !GST_FLAG_IS_SET (GST_PAD_PARENT (peerpad), GST_ELEMENT_DECOUPLED)) {
+ /* whoa non decoupled with different schedulers */
+ gst_element_error (element, "element \"%s\" is not decoupled but has pads in different schedulers",
+ GST_ELEMENT_NAME (element), NULL);
+ return FALSE;
+ }
+
/* set the chain proxies */
if (GST_RPAD_DIRECTION (pad) == GST_PAD_SINK) {
GST_DEBUG (GST_CAT_SCHEDULING, "copying chain function into push proxy for %s:%s\n",
GST_RPAD_PULLREGIONFUNC (pad) = GST_RPAD_GETREGIONFUNC (pad);
}
- /* otherwise we really are a cothread */
}
+ /* otherwise we really are a cothread */
else {
if (gst_pad_get_direction (pad) == GST_PAD_SINK) {
GST_DEBUG (GST_CAT_SCHEDULING, "setting cothreaded push proxy for sinkpad %s:%s\n",
- GST_DEBUG_PAD_NAME (pad));
+ GST_DEBUG_PAD_NAME (pad));
GST_RPAD_CHAINHANDLER (pad) = GST_DEBUG_FUNCPTR (gst_basic_scheduler_chainhandler_proxy);
}
else {
GST_DEBUG (GST_CAT_SCHEDULING, "setting cothreaded pull proxy for srcpad %s:%s\n",
- GST_DEBUG_PAD_NAME (pad));
+ GST_DEBUG_PAD_NAME (pad));
GST_RPAD_GETHANDLER (pad) = GST_DEBUG_FUNCPTR (gst_basic_scheduler_gethandler_proxy);
GST_RPAD_PULLREGIONFUNC (pad) = GST_DEBUG_FUNCPTR (gst_basic_scheduler_pullregionfunc_proxy);
}
/* need to set up the cothread now */
if (wrapper_function != NULL) {
- if (element->threadstate == NULL) {
- /* FIXME handle cothread_create returning NULL */
- element->threadstate = cothread_create (bin->threadcontext);
- GST_DEBUG (GST_CAT_SCHEDULING, "created cothread %p for '%s'\n", element->threadstate,
+ if (GST_ELEMENT_THREADSTATE (element) == NULL) {
+ GST_ELEMENT_THREADSTATE (element) = cothread_create (GST_BIN_THREADCONTEXT (bin));
+ if (GST_ELEMENT_THREADSTATE (element) == NULL) {
+ gst_element_error (element, "could not create cothread for \"%s\"",
+ GST_ELEMENT_NAME (element), NULL);
+ return FALSE;
+ }
+ GST_DEBUG (GST_CAT_SCHEDULING, "created cothread %p for '%s'\n",
+ GST_ELEMENT_THREADSTATE (element),
GST_ELEMENT_NAME (element));
}
- cothread_setfunc (element->threadstate, wrapper_function, 0, (char **) element);
+ cothread_setfunc (GST_ELEMENT_THREADSTATE (element), wrapper_function, 0, (char **) element);
GST_DEBUG (GST_CAT_SCHEDULING, "set wrapper function for '%s' to &%s\n",
GST_ELEMENT_NAME (element), GST_DEBUG_FUNCPTR_NAME (wrapper_function));
}
}
+
+ return TRUE;
}
/*
static GstSchedulerChain *
-gst_basic_scheduler_chain_new (GstScheduler * sched)
+gst_basic_scheduler_chain_new (GstBasicScheduler * sched)
{
GstSchedulerChain *chain = g_new (GstSchedulerChain, 1);
static void
gst_basic_scheduler_chain_destroy (GstSchedulerChain * chain)
{
- GstScheduler *sched = chain->sched;
+ GstBasicScheduler *sched = chain->sched;
/* remove the chain from the schedulers' list of chains */
sched->chains = g_list_remove (sched->chains, chain);
chain);
/* set the sched pointer for the element */
- element->sched = chain->sched;
+ element->sched = GST_SCHEDULER (chain->sched);
/* add the element to the list of 'disabled' elements */
chain->disabled = g_list_prepend (chain->disabled, element);
chain->num_elements++;
}
-static void
+static gboolean
gst_basic_scheduler_chain_enable_element (GstSchedulerChain * chain, GstElement * element)
{
GST_INFO (GST_CAT_SCHEDULING, "enabling element \"%s\" in chain %p", GST_ELEMENT_NAME (element),
chain->elements = g_list_prepend (chain->elements, element);
/* reschedule the chain */
- gst_basic_scheduler_cothreaded_chain (GST_BIN (chain->sched->parent), chain);
+ return gst_basic_scheduler_cothreaded_chain (GST_BIN (GST_SCHEDULER (chain->sched)->parent), chain);
}
static void
gst_basic_scheduler_chain_disable_element (chain, element);
}
/* we have to check for a threadstate here because a queue doesn't have one */
- if (element->threadstate) {
- cothread_free (element->threadstate);
- element->threadstate = NULL;
+ if (GST_ELEMENT_THREADSTATE (element)) {
+ cothread_free (GST_ELEMENT_THREADSTATE (element));
+ GST_ELEMENT_THREADSTATE (element) = NULL;
}
/* remove the element from the list of elements */
/* if there are no more elements in the chain, destroy the chain */
if (chain->num_elements == 0)
gst_basic_scheduler_chain_destroy (chain);
-
- /* unset the sched pointer for the element */
- element->sched = NULL;
}
static void
-gst_basic_scheduler_chain_elements (GstScheduler * sched, GstElement * element1, GstElement * element2)
+gst_basic_scheduler_chain_elements (GstBasicScheduler * sched, GstElement * element1, GstElement * element2)
{
GList *chains;
GstSchedulerChain *chain;
/* find the chain within the scheduler that holds the element, if any */
static GstSchedulerChain *
-gst_basic_scheduler_find_chain (GstScheduler * sched, GstElement * element)
+gst_basic_scheduler_find_chain (GstBasicScheduler * sched, GstElement * element)
{
GList *chains;
GstSchedulerChain *chain;
GstBin *bin = GST_BIN (sched->parent);
/* first create thread context */
- if (bin->threadcontext == NULL) {
+ if (GST_BIN_THREADCONTEXT (bin) == NULL) {
GST_DEBUG (GST_CAT_SCHEDULING, "initializing cothread context\n");
- bin->threadcontext = cothread_context_init ();
+ GST_BIN_THREADCONTEXT (bin) = cothread_context_init ();
}
}
{
cothread_context *ctx;
GstBin *bin = GST_BIN (GST_SCHED_PARENT (sched));
- GList *elements = sched->elements;
+ GList *elements = GST_BASIC_SCHEDULER_CAST (sched)->elements;
while (elements) {
- GST_ELEMENT (elements->data)->threadstate = NULL;
+ GST_ELEMENT_THREADSTATE (elements->data) = NULL;
elements = g_list_next (elements);
}
- ctx = GST_BIN (GST_SCHED_PARENT (sched))->threadcontext;
+ ctx = GST_BIN_THREADCONTEXT (GST_SCHED_PARENT (sched));
cothread_context_free (ctx);
- GST_BIN (GST_SCHED_PARENT (sched))->threadcontext = NULL;
+ GST_BIN_THREADCONTEXT (GST_SCHED_PARENT (sched)) = NULL;
}
static void
GstPad *pad;
GstElement *peerelement;
GstSchedulerChain *chain;
-
- g_return_if_fail (element != NULL);
- g_return_if_fail (GST_IS_ELEMENT (element));
+ GstBasicScheduler *bsched = GST_BASIC_SCHEDULER (sched);
/* if it's already in this scheduler, don't bother doing anything */
if (GST_ELEMENT_SCHED (element) == sched)
return;
/* first add it to the list of elements that are to be scheduled */
- sched->elements = g_list_prepend (sched->elements, element);
- sched->num_elements++;
+ bsched->elements = g_list_prepend (bsched->elements, element);
+ bsched->num_elements++;
/* create a chain to hold it, and add */
- chain = gst_basic_scheduler_chain_new (sched);
+ chain = gst_basic_scheduler_chain_new (bsched);
gst_basic_scheduler_chain_add_element (chain, element);
/* set the sched pointer in all the pads */
if (GST_ELEMENT_SCHED (element) == GST_ELEMENT_SCHED (peerelement)) {
GST_INFO (GST_CAT_SCHEDULING, "peer is in same scheduler, chaining together");
/* make sure that the two elements are in the same chain */
- gst_basic_scheduler_chain_elements (sched, element, peerelement);
+ gst_basic_scheduler_chain_elements (bsched, element, peerelement);
}
}
}
gst_basic_scheduler_remove_element (GstScheduler * sched, GstElement * element)
{
GstSchedulerChain *chain;
+ GstBasicScheduler *bsched = GST_BASIC_SCHEDULER (sched);
- g_return_if_fail (element != NULL);
- g_return_if_fail (GST_IS_ELEMENT (element));
-
- if (g_list_find (sched->elements, element)) {
+ if (g_list_find (bsched->elements, element)) {
GST_INFO (GST_CAT_SCHEDULING, "removing element \"%s\" from scheduler",
GST_ELEMENT_NAME (element));
/* find what chain the element is in */
- chain = gst_basic_scheduler_find_chain (sched, element);
+ chain = gst_basic_scheduler_find_chain (bsched, element);
/* remove it from its chain */
gst_basic_scheduler_chain_remove_element (chain, element);
/* remove it from the list of elements */
- sched->elements = g_list_remove (sched->elements, element);
- sched->num_elements--;
+ bsched->elements = g_list_remove (bsched->elements, element);
+ bsched->num_elements--;
/* unset the scheduler pointer in the element */
GST_ELEMENT_SCHED (element) = NULL;
}
}
-static void
+static GstElementStateReturn
gst_basic_scheduler_state_transition (GstScheduler *sched, GstElement *element, gint transition)
{
GstSchedulerChain *chain;
+ GstBasicScheduler *bsched = GST_BASIC_SCHEDULER (sched);
/* find the chain the element is in */
- chain = gst_basic_scheduler_find_chain (sched, element);
+ chain = gst_basic_scheduler_find_chain (bsched, element);
/* remove it from the chain */
if (chain) {
if (transition == GST_STATE_PLAYING_TO_PAUSED)
gst_basic_scheduler_chain_disable_element (chain, element);
if (transition == GST_STATE_PAUSED_TO_PLAYING)
- gst_basic_scheduler_chain_enable_element (chain, element);
+ if (!gst_basic_scheduler_chain_enable_element (chain, element)) {
+ GST_INFO (GST_CAT_SCHEDULING, "could not enable element \"%s\"", GST_ELEMENT_NAME (element));
+ return GST_STATE_FAILURE;
+ }
}
else {
GST_INFO (GST_CAT_SCHEDULING, "element \"%s\" not found in any chain, no state change", GST_ELEMENT_NAME (element));
}
+
+ return GST_STATE_SUCCESS;
}
static void
gst_basic_scheduler_lock_element (GstScheduler * sched, GstElement * element)
{
- if (element->threadstate)
- cothread_lock (element->threadstate);
+ if (GST_ELEMENT_THREADSTATE (element))
+ cothread_lock (GST_ELEMENT_THREADSTATE (element));
}
static void
gst_basic_scheduler_unlock_element (GstScheduler * sched, GstElement * element)
{
- if (element->threadstate)
- cothread_unlock (element->threadstate);
+ if (GST_ELEMENT_THREADSTATE (element))
+ cothread_unlock (GST_ELEMENT_THREADSTATE (element));
+}
+
+static void
+gst_basic_scheduler_yield (GstScheduler *sched, GstElement *element)
+{
+ if (GST_ELEMENT_IS_COTHREAD_STOPPING (element)) {
+ cothread_switch (cothread_current_main ());
+ }
+}
+
+static void
+gst_basic_scheduler_interrupt (GstScheduler *sched, GstElement *element)
+{
+ cothread_switch (cothread_current_main ());
}
static void
-gst_basic_scheduler_pad_connect (GstScheduler * sched, GstPad * srcpad, GstPad * sinkpad)
+gst_basic_scheduler_error (GstScheduler *sched, GstElement *element)
+{
+ GstBasicScheduler *bsched = GST_BASIC_SCHEDULER (sched);
+
+ if (GST_ELEMENT_THREADSTATE (element)) {
+ GstSchedulerChain *chain;
+
+ chain = gst_basic_scheduler_find_chain (bsched, element);
+ if (chain)
+ gst_basic_scheduler_chain_disable_element (chain, element);
+
+ GST_STATE_PENDING (GST_SCHEDULER (sched)->parent) = GST_STATE_PAUSED;
+
+ cothread_switch (cothread_current_main ());
+ }
+}
+
+static void
+gst_basic_scheduler_pad_connect (GstScheduler * sched, GstPad *srcpad, GstPad *sinkpad)
{
GstElement *srcelement, *sinkelement;
+ GstBasicScheduler *bsched = GST_BASIC_SCHEDULER (sched);
srcelement = GST_PAD_PARENT (srcpad);
g_return_if_fail (srcelement != NULL);
if (GST_ELEMENT_SCHED (srcelement) == GST_ELEMENT_SCHED (sinkelement)) {
GST_INFO (GST_CAT_SCHEDULING, "peer %s:%s is in same scheduler, chaining together",
GST_DEBUG_PAD_NAME (sinkpad));
- gst_basic_scheduler_chain_elements (sched, srcelement, sinkelement);
+ gst_basic_scheduler_chain_elements (bsched, srcelement, sinkelement);
}
}
GstSchedulerChain *chain;
GstElement *element1, *element2;
GstSchedulerChain *chain1, *chain2;
+ GstBasicScheduler *bsched = GST_BASIC_SCHEDULER (sched);
GST_INFO (GST_CAT_SCHEDULING, "disconnecting pads %s:%s and %s:%s",
GST_DEBUG_PAD_NAME (srcpad), GST_DEBUG_PAD_NAME (sinkpad));
/* we need to have the parent elements of each pad */
- element1 = GST_ELEMENT (GST_PAD_PARENT (srcpad));
- element2 = GST_ELEMENT (GST_PAD_PARENT (sinkpad));
+ element1 = GST_ELEMENT_CAST (GST_PAD_PARENT (srcpad));
+ element2 = GST_ELEMENT_CAST (GST_PAD_PARENT (sinkpad));
/* first task is to remove the old chain they belonged to.
* this can be accomplished by taking either of the elements,
* since they are guaranteed to be in the same chain
* FIXME is it potentially better to make an attempt at splitting cleaner??
*/
- chain1 = gst_basic_scheduler_find_chain (sched, element1);
- chain2 = gst_basic_scheduler_find_chain (sched, element2);
+ chain1 = gst_basic_scheduler_find_chain (bsched, element1);
+ chain2 = gst_basic_scheduler_find_chain (bsched, element2);
if (chain1 != chain2) {
/* elements not in the same chain don't need to be separated */
gst_basic_scheduler_chain_destroy (chain1);
/* now create a new chain to hold element1 and build it from scratch */
- chain1 = gst_basic_scheduler_chain_new (sched);
+ chain1 = gst_basic_scheduler_chain_new (bsched);
gst_basic_scheduler_chain_recursive_add (chain1, element1);
}
/* check the other element to see if it landed in the newly created chain */
- if (gst_basic_scheduler_find_chain (sched, element2) == NULL) {
+ if (gst_basic_scheduler_find_chain (bsched, element2) == NULL) {
/* if not in chain, create chain and build from scratch */
- chain2 = gst_basic_scheduler_chain_new (sched);
+ chain2 = gst_basic_scheduler_chain_new (bsched);
gst_basic_scheduler_chain_recursive_add (chain2, element2);
}
}
if (pad != NULL) {
GstRealPad *peer = GST_RPAD_PEER (pad);
- cothread_switch (GST_ELEMENT (GST_PAD_PARENT (peer))->threadstate);
+ cothread_switch (GST_ELEMENT_THREADSTATE (GST_PAD_PARENT (peer)));
g_print ("%p %s\n", GST_ELEMENT (GST_PAD_PARENT (pad)),
gst_element_get_name (GST_ELEMENT (GST_PAD_PARENT (pad))));
GstElement *entry;
gboolean eos = FALSE;
GList *elements;
+ gint scheduled = 0;
+ GstBasicScheduler *bsched = GST_BASIC_SCHEDULER (sched);
GST_DEBUG_ENTER ("(\"%s\")", GST_ELEMENT_NAME (bin));
- g_return_val_if_fail (bin != NULL, TRUE);
- g_return_val_if_fail (GST_IS_BIN (bin), TRUE);
-
/* step through all the chains */
- chains = sched->chains;
+ chains = bsched->chains;
if (chains == NULL)
return FALSE;
GST_DEBUG (GST_CAT_SCHEDULING, "there are %d elements in this chain\n", chain->num_elements);
elements = chain->elements;
while (elements) {
- entry = GST_ELEMENT (elements->data);
+ entry = GST_ELEMENT_CAST (elements->data);
elements = g_list_next (elements);
if (GST_FLAG_IS_SET (entry, GST_ELEMENT_DECOUPLED)) {
GST_DEBUG (GST_CAT_SCHEDULING, "entry \"%s\" is DECOUPLED, skipping\n",
GST_ELEMENT_NAME (entry));
entry = NULL;
}
- else if (GST_FLAG_IS_SET (entry, GST_ELEMENT_NO_ENTRY)) {
+ else if (GST_FLAG_IS_SET (entry, GST_ELEMENT_INFINITE_LOOP)) {
GST_DEBUG (GST_CAT_SCHEDULING, "entry \"%s\" is not valid, skipping\n",
GST_ELEMENT_NAME (entry));
entry = NULL;
GST_FLAG_SET (entry, GST_ELEMENT_COTHREAD_STOPPING);
GST_DEBUG (GST_CAT_DATAFLOW, "set COTHREAD_STOPPING flag on \"%s\"(@%p)\n",
GST_ELEMENT_NAME (entry), entry);
- cothread_switch (entry->threadstate);
+ if (GST_ELEMENT_THREADSTATE (entry)) {
+ cothread_switch (GST_ELEMENT_THREADSTATE (entry));
+ }
+ else {
+ GST_DEBUG (GST_CAT_DATAFLOW, "cothread switch not possible, element has no threadstate\n");
+ return FALSE;
+ }
/* following is a check to see if the chain was interrupted due to a
* top-half state_change(). (i.e., if there's a pending state.)
GST_STATE_PENDING (GST_SCHEDULER (sched)->parent));
return FALSE;
}
-
+ scheduled++;
}
else {
GST_INFO (GST_CAT_DATAFLOW, "NO ENTRY INTO CHAIN!");
- eos = TRUE;
+ if (scheduled == 0)
+ eos = TRUE;
}
}
else {
GST_INFO (GST_CAT_DATAFLOW, "NO ENABLED ELEMENTS IN CHAIN!!");
- eos = TRUE;
+ if (scheduled == 0)
+ eos = TRUE;
}
}
GList *chains, *elements;
GstElement *element;
GstSchedulerChain *chain;
+ GstBasicScheduler *bsched = GST_BASIC_SCHEDULER (sched);
if (sched == NULL) {
g_print ("scheduler doesn't exist for this element\n");
g_print ("SCHEDULER DUMP FOR MANAGING BIN \"%s\"\n", GST_ELEMENT_NAME (sched->parent));
- g_print ("scheduler has %d elements in it: ", sched->num_elements);
- elements = sched->elements;
+ g_print ("scheduler has %d elements in it: ", bsched->num_elements);
+ elements = bsched->elements;
while (elements) {
element = GST_ELEMENT (elements->data);
elements = g_list_next (elements);
}
g_print ("\n");
- g_print ("scheduler has %d chains in it\n", sched->num_chains);
- chains = sched->chains;
+ g_print ("scheduler has %d chains in it\n", bsched->num_chains);
+ chains = bsched->chains;
while (chains) {
chain = (GstSchedulerChain *) (chains->data);
chains = g_list_next (chains);
aggregator = GST_AGGREGATOR (element);
- do {
- if (aggregator->sched == AGGREGATOR_LOOP ||
- aggregator->sched == AGGREGATOR_LOOP_PEEK) {
- GList *pads = aggregator->sinkpads;
-
- while (pads) {
- GstPad *pad = GST_PAD (pads->data);
- pads = g_list_next (pads);
-
- if (aggregator->sched == AGGREGATOR_LOOP_PEEK) {
- buf = gst_pad_peek (pad);
- if (buf == NULL)
- continue;
-
- g_assert (buf == gst_pad_pull (pad));
- debug = "loop_peek";
- }
- else {
- buf = gst_pad_pull (pad);
- debug = "loop";
- }
- gst_aggregator_push (aggregator, pad, buf, debug);
- }
- }
- else {
- if (aggregator->sched == AGGREGATOR_LOOP_SELECT) {
- GstPad *pad;
+ if (aggregator->sched == AGGREGATOR_LOOP ||
+ aggregator->sched == AGGREGATOR_LOOP_PEEK) {
+ GList *pads = aggregator->sinkpads;
- debug = "loop_select";
+ while (pads) {
+ GstPad *pad = GST_PAD (pads->data);
+ pads = g_list_next (pads);
- pad = gst_pad_select (aggregator->sinkpads);
- buf = gst_pad_pull (pad);
+ if (aggregator->sched == AGGREGATOR_LOOP_PEEK) {
+ buf = gst_pad_peek (pad);
+ if (buf == NULL)
+ continue;
- gst_aggregator_push (aggregator, pad, buf, debug);
+ g_assert (buf == gst_pad_pull (pad));
+ debug = "loop_peek";
}
else {
- g_assert_not_reached ();
+ buf = gst_pad_pull (pad);
+ debug = "loop";
}
+ gst_aggregator_push (aggregator, pad, buf, debug);
+ }
+ }
+ else {
+ if (aggregator->sched == AGGREGATOR_LOOP_SELECT) {
+ GstPad *pad;
+
+ debug = "loop_select";
+
+ pad = gst_pad_select (aggregator->sinkpads);
+ buf = gst_pad_pull (pad);
+
+ gst_aggregator_push (aggregator, pad, buf, debug);
}
- } while (!GST_ELEMENT_IS_COTHREAD_STOPPING (element));
+ else {
+ g_assert_not_reached ();
+ }
+ }
}
/**
case ARG_OUTPUT:
break;
case ARG_DATA:
- src->data = g_value_get_int (value);
+ src->data = g_value_get_enum (value);
switch (src->data) {
case FAKESRC_DATA_ALLOCATE:
if (src->parent) {
}
break;
case ARG_SIZETYPE:
- src->sizetype = g_value_get_int (value);
+ src->sizetype = g_value_get_enum (value);
break;
case ARG_SIZEMIN:
src->sizemin = g_value_get_int (value);
src->parentsize = g_value_get_int (value);
break;
case ARG_FILLTYPE:
- src->filltype = g_value_get_int (value);
+ src->filltype = g_value_get_enum (value);
break;
case ARG_PATTERN:
break;
g_value_set_boolean (value, src->loop_based);
break;
case ARG_OUTPUT:
- g_value_set_int (value, src->output);
+ g_value_set_enum (value, src->output);
break;
case ARG_DATA:
- g_value_set_int (value, src->data);
+ g_value_set_enum (value, src->data);
break;
case ARG_SIZETYPE:
- g_value_set_int (value, src->sizetype);
+ g_value_set_enum (value, src->sizetype);
break;
case ARG_SIZEMIN:
g_value_set_int (value, src->sizemin);
g_value_set_int (value, src->parentsize);
break;
case ARG_FILLTYPE:
- g_value_set_int (value, src->filltype);
+ g_value_set_enum (value, src->filltype);
break;
case ARG_PATTERN:
g_value_set_string (value, src->pattern);
gst_fakesrc_loop(GstElement *element)
{
GstFakeSrc *src;
+ GList *pads;
g_return_if_fail(element != NULL);
g_return_if_fail(GST_IS_FAKESRC(element));
src = GST_FAKESRC (element);
- do {
- GList *pads;
-
- pads = GST_ELEMENT (src)->pads;
+ pads = gst_element_get_pad_list (element);
- while (pads) {
- GstPad *pad = GST_PAD (pads->data);
- GstBuffer *buf;
+ while (pads) {
+ GstPad *pad = GST_PAD (pads->data);
+ GstBuffer *buf;
- if (src->rt_num_buffers == 0) {
- src->eos = TRUE;
- }
- else {
- if (src->rt_num_buffers > 0)
- src->rt_num_buffers--;
- }
+ if (src->rt_num_buffers == 0) {
+ src->eos = TRUE;
+ }
+ else {
+ if (src->rt_num_buffers > 0)
+ src->rt_num_buffers--;
+ }
- if (src->eos) {
- gst_pad_push(pad, GST_BUFFER(gst_event_new (GST_EVENT_EOS)));
- return;
- }
+ if (src->eos) {
+ gst_pad_push(pad, GST_BUFFER(gst_event_new (GST_EVENT_EOS)));
+ return;
+ }
- buf = gst_fakesrc_create_buffer (src);
- GST_BUFFER_TIMESTAMP (buf) = src->buffer_count++;
+ buf = gst_fakesrc_create_buffer (src);
+ GST_BUFFER_TIMESTAMP (buf) = src->buffer_count++;
- if (!src->silent) {
- gst_element_info (element, "fakesrc: loop ******* (%s:%s) > (%d bytes, %llu)",
- GST_DEBUG_PAD_NAME (pad), GST_BUFFER_SIZE (buf), GST_BUFFER_TIMESTAMP (buf));
- }
+ if (!src->silent) {
+ gst_element_info (element, "fakesrc: loop ******* (%s:%s) > (%d bytes, %llu)",
+ GST_DEBUG_PAD_NAME (pad), GST_BUFFER_SIZE (buf), GST_BUFFER_TIMESTAMP (buf));
+ }
- g_signal_emit (G_OBJECT (src), gst_fakesrc_signals[SIGNAL_HANDOFF], 0,
+ g_signal_emit (G_OBJECT (src), gst_fakesrc_signals[SIGNAL_HANDOFF], 0,
buf, pad);
- gst_pad_push (pad, buf);
+ gst_pad_push (pad, buf);
- pads = g_list_next (pads);
- }
- } while (!GST_ELEMENT_IS_COTHREAD_STOPPING (element));
+ pads = g_list_next (pads);
+ }
}
static GstElementStateReturn
#include <unistd.h>
#include <sys/mman.h>
#include <errno.h>
+#include <string.h>
/**********************************************************************
/* mmap() the data into this new buffer */
GST_BUFFER_DATA(buf) = mmap (NULL, size, PROT_READ, MAP_SHARED, src->fd, offset);
if (GST_BUFFER_DATA(buf) == NULL) {
- fprintf (stderr, "ERROR: gstfilesrc couldn't map file!\n");
+ gst_element_error (GST_ELEMENT (src), "couldn't map file");
} else if (GST_BUFFER_DATA(buf) == MAP_FAILED) {
- g_error ("gstfilesrc mmap(0x%x, %d, 0x%llx) : %s",
- size, src->fd, offset, sys_errlist[errno]);
+ gst_element_error (GST_ELEMENT (src), "mmap (0x%x, %d, 0x%llx) : %s",
+ size, src->fd, offset, strerror (errno));
}
#ifdef MADV_SEQUENTIAL
/* madvise to tell the kernel what to do with it */
/* open the file */
src->fd = open (src->filename, O_RDONLY);
if (src->fd < 0) {
- perror ("open");
- gst_element_error (GST_ELEMENT (src), g_strconcat("opening file \"", src->filename, "\"", NULL));
+ gst_element_error (GST_ELEMENT (src), "opening file \"%s\" (%s)",
+ src->filename, strerror (errno), NULL);
return FALSE;
} else {
/* find the file length */
{
g_return_if_fail (GST_FLAG_IS_SET (src, GST_FILESRC_OPEN));
- g_print ("close\n");
/* close the file */
close (src->fd);
src->fd = 0;
src->filelen = 0;
src->curoffset = 0;
+ if (src->mapbuf)
+ gst_buffer_unref (src->mapbuf);
GST_FLAG_UNSET (src, GST_FILESRC_OPEN);
}
{
GstFileSrc *src = GST_FILESRC(element);
- if (GST_STATE_PENDING (element) == GST_STATE_NULL) {
- if (GST_FLAG_IS_SET (element, GST_FILESRC_OPEN))
- gst_filesrc_close_file (GST_FILESRC (element));
- } if (GST_STATE_PENDING (element) == GST_STATE_READY) {
- src->curoffset = 0;
- } else {
-
- if (!GST_FLAG_IS_SET (element, GST_FILESRC_OPEN)) {
- if (!gst_filesrc_open_file (GST_FILESRC (element)))
- return GST_STATE_FAILURE;
- }
+ switch (GST_STATE_TRANSITION (element)) {
+ case GST_STATE_NULL_TO_READY:
+ if (!GST_FLAG_IS_SET (element, GST_FILESRC_OPEN)) {
+ if (!gst_filesrc_open_file (GST_FILESRC (element)))
+ return GST_STATE_FAILURE;
+ }
+ break;
+ case GST_STATE_READY_TO_NULL:
+ if (GST_FLAG_IS_SET (element, GST_FILESRC_OPEN))
+ gst_filesrc_close_file (GST_FILESRC (element));
+ break;
+ case GST_STATE_READY_TO_PAUSED:
+ case GST_STATE_PAUSED_TO_READY:
+ src->curoffset = 0;
+ default:
+ break;
}
if (GST_ELEMENT_CLASS (parent_class)->change_state)
identity = GST_IDENTITY (element);
- do {
- buf = gst_pad_pull (identity->sinkpad);
+ buf = gst_pad_pull (identity->sinkpad);
- for (i=identity->duplicate; i; i--) {
- if (!identity->silent)
- g_print("identity: loop ******* (%s:%s)i (%d bytes, %llu) \n",
+ for (i=identity->duplicate; i; i--) {
+ if (!identity->silent)
+ g_print("identity: loop ******* (%s:%s)i (%d bytes, %llu) \n",
GST_DEBUG_PAD_NAME (identity->sinkpad), GST_BUFFER_SIZE (buf), GST_BUFFER_TIMESTAMP (buf));
- g_signal_emit (G_OBJECT (identity), gst_identity_signals[SIGNAL_HANDOFF], 0,
+ g_signal_emit (G_OBJECT (identity), gst_identity_signals[SIGNAL_HANDOFF], 0,
buf);
- if (i>1)
- gst_buffer_ref (buf);
-
- gst_pad_push (identity->srcpad, buf);
+ if (i>1)
+ gst_buffer_ref (buf);
- if (identity->sleep_time)
- usleep (identity->sleep_time);
- }
+ gst_pad_push (identity->srcpad, buf);
- } while (!GST_ELEMENT_IS_COTHREAD_STOPPING(element));
+ if (identity->sleep_time)
+ usleep (identity->sleep_time);
+ }
}
static void
ARG_LEAKY,
ARG_LEVEL,
ARG_MAX_LEVEL,
+ ARG_MAY_DEADLOCK,
};
static void gst_queue_class_init (GstQueueClass *klass);
static void gst_queue_init (GstQueue *queue);
+static void gst_queue_dispose (GObject *object);
static void gst_queue_set_property (GObject *object, guint prop_id,
const GValue *value, GParamSpec *pspec);
parent_class = g_type_class_ref (GST_TYPE_ELEMENT);
- g_object_class_install_property(G_OBJECT_CLASS(klass), ARG_LEAKY,
- g_param_spec_enum("leaky","Leaky","Where the queue leaks, if at all.",
- GST_TYPE_QUEUE_LEAKY,GST_QUEUE_NO_LEAK,G_PARAM_READWRITE));
- g_object_class_install_property(G_OBJECT_CLASS(klass), ARG_LEVEL,
- g_param_spec_int("level","Level","How many buffers are in the queue.",
- 0,G_MAXINT,0,G_PARAM_READABLE));
- g_object_class_install_property(G_OBJECT_CLASS(klass), ARG_MAX_LEVEL,
- g_param_spec_int("max_level","Maximum Level","How many buffers the queue holds.",
- 0,G_MAXINT,100,G_PARAM_READWRITE));
-
- gobject_class->set_property = GST_DEBUG_FUNCPTR(gst_queue_set_property);
- gobject_class->get_property = GST_DEBUG_FUNCPTR(gst_queue_get_property);
+ g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_LEAKY,
+ g_param_spec_enum ("leaky", "Leaky", "Where the queue leaks, if at all.",
+ GST_TYPE_QUEUE_LEAKY, GST_QUEUE_NO_LEAK, G_PARAM_READWRITE));
+ g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_LEVEL,
+ g_param_spec_int ("level", "Level", "How many buffers are in the queue.",
+ 0, G_MAXINT, 0, G_PARAM_READABLE));
+ g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_MAX_LEVEL,
+ g_param_spec_int ("max_level", "Maximum Level", "How many buffers the queue holds.",
+ 0, G_MAXINT, 100, G_PARAM_READWRITE));
+ g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_MAY_DEADLOCK,
+ g_param_spec_boolean ("may_deadlock", "May Deadlock", "The queue may deadlock if it's full and not PLAYING",
+ TRUE, G_PARAM_READWRITE));
+
+ gobject_class->dispose = GST_DEBUG_FUNCPTR (gst_queue_dispose);
+ gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_queue_set_property);
+ gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_queue_get_property);
gstelement_class->change_state = GST_DEBUG_FUNCPTR(gst_queue_change_state);
}
queue->size_buffers = 100; /* 100 buffers */
queue->size_bytes = 100 * 1024; /* 100KB */
queue->size_time = 1000000000LL; /* 1sec */
+ queue->may_deadlock = TRUE;
queue->qlock = g_mutex_new ();
queue->reader = FALSE;
GST_DEBUG_ELEMENT (GST_CAT_THREAD, queue, "initialized queue's not_empty & not_full conditions\n");
}
+static void
+gst_queue_dispose (GObject *object)
+{
+ GstQueue *queue = GST_QUEUE (object);
+
+ g_mutex_free (queue->qlock);
+ g_cond_free (queue->not_empty);
+ g_cond_free (queue->not_full);
+
+ G_OBJECT_CLASS (parent_class)->dispose (object);
+}
+
static GstBufferPool*
gst_queue_get_bufferpool (GstPad *pad)
{
while (GST_STATE_PENDING (queue) != GST_STATE_VOID_PENDING) {
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "interrupted!!\n");
g_mutex_unlock (queue->qlock);
- cothread_switch(cothread_current_main());
+ gst_element_interrupt (GST_ELEMENT (queue));
goto restart;
}
- g_assert (GST_STATE (queue) == GST_STATE_PLAYING);
+ if (GST_STATE (queue) != GST_STATE_PLAYING) {
+ /* this means the other end is shut down */
+ /* try to signal to resolve the error */
+ if (!queue->may_deadlock) {
+ g_mutex_unlock (queue->qlock);
+ gst_element_error (GST_ELEMENT (queue), "deadlock found, source pad elements are shut down");
+ return;
+ }
+ else {
+ gst_element_info (GST_ELEMENT (queue), "waiting for the app to restart source pad elements");
+ }
+ }
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for not_full, level:%d/%d\n", queue->level_buffers, queue->size_buffers);
if (queue->writer)
while (GST_STATE_PENDING (queue) != GST_STATE_VOID_PENDING) {
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "interrupted!!\n");
g_mutex_unlock (queue->qlock);
- cothread_switch(cothread_current_main());
+ gst_element_interrupt (GST_ELEMENT (queue));
goto restart;
}
- g_assert (GST_STATE (queue) == GST_STATE_PLAYING);
+ if (GST_STATE (queue) != GST_STATE_PLAYING) {
+ /* this means the other end is shut down */
+ if (!queue->may_deadlock) {
+ g_mutex_unlock (queue->qlock);
+ gst_element_error (GST_ELEMENT (queue), "deadlock found, sink pad elements are shut down");
+ return NULL;
+ }
+ else {
+ gst_element_info (GST_ELEMENT (queue), "waiting for the app to restart sink pad elements");
+ }
+ }
GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for not_empty, level:%d/%d\n", queue->level_buffers, queue->size_buffers);
if (queue->reader)
switch (prop_id) {
case ARG_LEAKY:
- queue->leaky = g_value_get_int(value);
+ queue->leaky = g_value_get_int (value);
break;
case ARG_MAX_LEVEL:
- queue->size_buffers = g_value_get_int(value);
+ queue->size_buffers = g_value_get_int (value);
+ break;
+ case ARG_MAY_DEADLOCK:
+ queue->may_deadlock = g_value_get_boolean (value);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
switch (prop_id) {
case ARG_LEAKY:
- g_value_set_int(value, queue->leaky);
+ g_value_set_int (value, queue->leaky);
break;
case ARG_LEVEL:
- g_value_set_int(value, queue->level_buffers);
+ g_value_set_int (value, queue->level_buffers);
break;
case ARG_MAX_LEVEL:
- g_value_set_int(value, queue->size_buffers);
+ g_value_set_int (value, queue->size_buffers);
+ break;
+ case ARG_MAY_DEADLOCK:
+ g_value_set_boolean (value, queue->may_deadlock);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
guint64 size_time; /* size of queue in time */
gint leaky; /* whether the queue is leaky, and if so at which end */
+ gboolean may_deadlock; /* it the queue should fail on possible deadlocks */
GMutex *qlock; /* lock for queue (vs object lock) */
/* we are single reader and single writer queue */