From 087dee1f620d42d0680833adb08cd2bf3d298711 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Sat, 22 Dec 2001 21:18:17 +0000 Subject: [PATCH] This is an attempt at not segfaulting on errors but reporting some usefull info instead. Original commit message from CVS: This is an attempt at not segfaulting on errors but reporting some usefull info instead. - bin changes so errors can propagate. - changed the _FAST macros to _CAST because that is what they do. - removed all references to cothreads out of the core, they are really a scheduler issue, handler with a sched_private gpointer. - added a live buffer count, for debugging buffer leaks. - added error checking in gst_scheduler_state_transition this solves the "out of cothreads" problem. - GST_ELEMENT_NO_ENTRY == GST_ELEMENT_INFINITE_LOOP - added 2 private element flasg for use by the scheduler (_COTHREAD_STOPPING) is now - added scheduler entry points: - _yield : to create possible scheduling points. - _interrupt: to stop execution of an element. - _error: to signal en error condition to the scheduler. - improved error messages for pads. - signal gst_element_error where appropriate. - added the a new bin to the parent before entering it so one can reference its children. - queue memleak fixes on dispose. - added possible deadlock detection in queue (turned off be default) - GstBasicScheduler is a real class of its own now, hiding its internal variables. - GST_ELEMENT_IS_COTHREAD_STOPPING is gone. either call explicit _yield operations, or make a sane loop. - Better state change handling in filesrc. Better error reporting/recovery too. - updated core plugins. - detect non decoupled elements on scheduler boundries and error. --- gst/autoplug/gstautoplugcache.c | 103 +++++------ gst/cothreads.c | 1 - gst/elements/gstaggregator.c | 64 ++++--- gst/elements/gstfakesrc.c | 69 ++++--- gst/elements/gstfilesrc.c | 41 +++-- gst/elements/gstidentity.c | 25 ++- gst/gstbin.c | 26 ++- gst/gstbin.h | 12 +- gst/gstbuffer.c | 20 +- gst/gstbuffer.h | 2 + gst/gstelement.c | 41 ++++- gst/gstelement.h | 32 ++-- gst/gstobject.h | 18 +- gst/gstpad.c | 83 ++++++--- gst/gstpad.h | 31 ++-- gst/gstparse.c | 11 +- gst/gstqueue.c | 88 ++++++--- gst/gstqueue.h | 1 + gst/gstscheduler.c | 91 +++++++++- gst/gstscheduler.h | 23 ++- gst/gstthread.c | 7 +- gst/gsttypefind.c | 2 +- gst/schedulers/gstbasicscheduler.c | 363 +++++++++++++++++++++++++------------ plugins/elements/gstaggregator.c | 64 ++++--- plugins/elements/gstfakesrc.c | 69 ++++--- plugins/elements/gstfilesrc.c | 41 +++-- plugins/elements/gstidentity.c | 25 ++- plugins/elements/gstqueue.c | 88 ++++++--- plugins/elements/gstqueue.h | 1 + 29 files changed, 917 insertions(+), 525 deletions(-) diff --git a/gst/autoplug/gstautoplugcache.c b/gst/autoplug/gstautoplugcache.c index 6bb5721..326310c 100644 --- a/gst/autoplug/gstautoplugcache.c +++ b/gst/autoplug/gstautoplugcache.c @@ -203,72 +203,69 @@ gst_autoplugcache_loop (GstElement *element) * the playout pointer hits the end of cache again it has to start pulling. */ - do { - /* the first time through, the current_playout pointer is going to be NULL */ - if (cache->current_playout == NULL) { - /* get a buffer */ - buf = gst_pad_pull (cache->sinkpad); + /* the first time through, the current_playout pointer is going to be NULL */ + if (cache->current_playout == NULL) { + /* get a buffer */ + buf = gst_pad_pull (cache->sinkpad); - /* add it to the cache, though cache == NULL */ - gst_buffer_ref (buf); - cache->cache = g_list_prepend (cache->cache, buf); - cache->buffer_count++; + /* add it to the cache, though cache == NULL */ + gst_buffer_ref (buf); + cache->cache = g_list_prepend (cache->cache, buf); + cache->buffer_count++; - /* set the current_playout pointer */ - cache->current_playout = cache->cache; + /* set the current_playout pointer */ + cache->current_playout = cache->cache; - g_signal_emit (G_OBJECT(cache), gst_autoplugcache_signals[FIRST_BUFFER], 0, buf); - - /* send the buffer on its way */ - gst_pad_push (cache->srcpad, buf); - } - - /* the steady state is where the playout is at the front of the cache */ - else if (g_list_previous(cache->current_playout) == NULL) { + g_signal_emit (G_OBJECT(cache), gst_autoplugcache_signals[FIRST_BUFFER], 0, buf); - /* if we've been told to fire an empty signal (after a reset) */ - if (cache->fire_empty) { - int oldstate = GST_STATE(cache); - GST_DEBUG(0,"at front of cache, about to pull, but firing signal\n"); + /* send the buffer on its way */ + gst_pad_push (cache->srcpad, buf); + } + /* the steady state is where the playout is at the front of the cache */ + else if (g_list_previous(cache->current_playout) == NULL) { + + /* if we've been told to fire an empty signal (after a reset) */ + if (cache->fire_empty) { + int oldstate = GST_STATE(cache); + GST_DEBUG(0,"at front of cache, about to pull, but firing signal\n"); + gst_object_ref (GST_OBJECT (cache)); + g_signal_emit (G_OBJECT(cache), gst_autoplugcache_signals[CACHE_EMPTY], 0, NULL); + if (GST_STATE(cache) != oldstate) { gst_object_ref (GST_OBJECT (cache)); - g_signal_emit (G_OBJECT(cache), gst_autoplugcache_signals[CACHE_EMPTY], 0, NULL); - if (GST_STATE(cache) != oldstate) { - gst_object_ref (GST_OBJECT (cache)); - GST_DEBUG(GST_CAT_AUTOPLUG, "state changed during signal, aborting\n"); - cothread_switch(cothread_current_main()); - } - gst_object_unref (GST_OBJECT (cache)); + GST_DEBUG(GST_CAT_AUTOPLUG, "state changed during signal, aborting\n"); + cothread_switch(cothread_current_main()); } + gst_object_unref (GST_OBJECT (cache)); + } - /* get a buffer */ - buf = gst_pad_pull (cache->sinkpad); + /* get a buffer */ + buf = gst_pad_pull (cache->sinkpad); - /* add it to the front of the cache */ - gst_buffer_ref (buf); - cache->cache = g_list_prepend (cache->cache, buf); - cache->buffer_count++; + /* add it to the front of the cache */ + gst_buffer_ref (buf); + cache->cache = g_list_prepend (cache->cache, buf); + cache->buffer_count++; - /* set the current_playout pointer */ - cache->current_playout = cache->cache; + /* set the current_playout pointer */ + cache->current_playout = cache->cache; - /* send the buffer on its way */ - gst_pad_push (cache->srcpad, buf); - } - - /* otherwise we're trundling through existing cached buffers */ - else { - /* move the current_playout pointer */ - cache->current_playout = g_list_previous (cache->current_playout); + /* send the buffer on its way */ + gst_pad_push (cache->srcpad, buf); + } - if (cache->fire_first) { - g_signal_emit (G_OBJECT(cache), gst_autoplugcache_signals[FIRST_BUFFER], 0, buf); - cache->fire_first = FALSE; - } + /* otherwise we're trundling through existing cached buffers */ + else { + /* move the current_playout pointer */ + cache->current_playout = g_list_previous (cache->current_playout); - /* push that buffer */ - gst_pad_push (cache->srcpad, GST_BUFFER(cache->current_playout->data)); + if (cache->fire_first) { + g_signal_emit (G_OBJECT(cache), gst_autoplugcache_signals[FIRST_BUFFER], 0, buf); + cache->fire_first = FALSE; } - } while (!GST_FLAG_IS_SET (element, GST_ELEMENT_COTHREAD_STOPPING)); + + /* push that buffer */ + gst_pad_push (cache->srcpad, GST_BUFFER(cache->current_playout->data)); + } } static GstPadNegotiateReturn diff --git a/gst/cothreads.c b/gst/cothreads.c index 48aa164..f9b668e 100644 --- a/gst/cothreads.c +++ b/gst/cothreads.c @@ -166,7 +166,6 @@ cothread_create (cothread_context *ctx) cothread_destroy (ctx->threads[slot]); break; } - } sp = CURRENT_STACK_FRAME; diff --git a/gst/elements/gstaggregator.c b/gst/elements/gstaggregator.c index 4a6baf5..e6f2337 100644 --- a/gst/elements/gstaggregator.c +++ b/gst/elements/gstaggregator.c @@ -272,46 +272,44 @@ gst_aggregator_loop (GstElement *element) aggregator = GST_AGGREGATOR (element); - do { - if (aggregator->sched == AGGREGATOR_LOOP || - aggregator->sched == AGGREGATOR_LOOP_PEEK) { - GList *pads = aggregator->sinkpads; - - while (pads) { - GstPad *pad = GST_PAD (pads->data); - pads = g_list_next (pads); - - if (aggregator->sched == AGGREGATOR_LOOP_PEEK) { - buf = gst_pad_peek (pad); - if (buf == NULL) - continue; - - g_assert (buf == gst_pad_pull (pad)); - debug = "loop_peek"; - } - else { - buf = gst_pad_pull (pad); - debug = "loop"; - } - gst_aggregator_push (aggregator, pad, buf, debug); - } - } - else { - if (aggregator->sched == AGGREGATOR_LOOP_SELECT) { - GstPad *pad; + if (aggregator->sched == AGGREGATOR_LOOP || + aggregator->sched == AGGREGATOR_LOOP_PEEK) { + GList *pads = aggregator->sinkpads; - debug = "loop_select"; + while (pads) { + GstPad *pad = GST_PAD (pads->data); + pads = g_list_next (pads); - pad = gst_pad_select (aggregator->sinkpads); - buf = gst_pad_pull (pad); + if (aggregator->sched == AGGREGATOR_LOOP_PEEK) { + buf = gst_pad_peek (pad); + if (buf == NULL) + continue; - gst_aggregator_push (aggregator, pad, buf, debug); + g_assert (buf == gst_pad_pull (pad)); + debug = "loop_peek"; } else { - g_assert_not_reached (); + buf = gst_pad_pull (pad); + debug = "loop"; } + gst_aggregator_push (aggregator, pad, buf, debug); + } + } + else { + if (aggregator->sched == AGGREGATOR_LOOP_SELECT) { + GstPad *pad; + + debug = "loop_select"; + + pad = gst_pad_select (aggregator->sinkpads); + buf = gst_pad_pull (pad); + + gst_aggregator_push (aggregator, pad, buf, debug); } - } while (!GST_ELEMENT_IS_COTHREAD_STOPPING (element)); + else { + g_assert_not_reached (); + } + } } /** diff --git a/gst/elements/gstfakesrc.c b/gst/elements/gstfakesrc.c index 7d8d6e3..1f87031 100644 --- a/gst/elements/gstfakesrc.c +++ b/gst/elements/gstfakesrc.c @@ -387,7 +387,7 @@ gst_fakesrc_set_property (GObject *object, guint prop_id, const GValue *value, G case ARG_OUTPUT: break; case ARG_DATA: - src->data = g_value_get_int (value); + src->data = g_value_get_enum (value); switch (src->data) { case FAKESRC_DATA_ALLOCATE: if (src->parent) { @@ -403,7 +403,7 @@ gst_fakesrc_set_property (GObject *object, guint prop_id, const GValue *value, G } break; case ARG_SIZETYPE: - src->sizetype = g_value_get_int (value); + src->sizetype = g_value_get_enum (value); break; case ARG_SIZEMIN: src->sizemin = g_value_get_int (value); @@ -415,7 +415,7 @@ gst_fakesrc_set_property (GObject *object, guint prop_id, const GValue *value, G src->parentsize = g_value_get_int (value); break; case ARG_FILLTYPE: - src->filltype = g_value_get_int (value); + src->filltype = g_value_get_enum (value); break; case ARG_PATTERN: break; @@ -455,13 +455,13 @@ gst_fakesrc_get_property (GObject *object, guint prop_id, GValue *value, GParamS g_value_set_boolean (value, src->loop_based); break; case ARG_OUTPUT: - g_value_set_int (value, src->output); + g_value_set_enum (value, src->output); break; case ARG_DATA: - g_value_set_int (value, src->data); + g_value_set_enum (value, src->data); break; case ARG_SIZETYPE: - g_value_set_int (value, src->sizetype); + g_value_set_enum (value, src->sizetype); break; case ARG_SIZEMIN: g_value_set_int (value, src->sizemin); @@ -473,7 +473,7 @@ gst_fakesrc_get_property (GObject *object, guint prop_id, GValue *value, GParamS g_value_set_int (value, src->parentsize); break; case ARG_FILLTYPE: - g_value_set_int (value, src->filltype); + g_value_set_enum (value, src->filltype); break; case ARG_PATTERN: g_value_set_string (value, src->pattern); @@ -689,49 +689,46 @@ static void gst_fakesrc_loop(GstElement *element) { GstFakeSrc *src; + GList *pads; g_return_if_fail(element != NULL); g_return_if_fail(GST_IS_FAKESRC(element)); src = GST_FAKESRC (element); - do { - GList *pads; - - pads = GST_ELEMENT (src)->pads; + pads = gst_element_get_pad_list (element); - while (pads) { - GstPad *pad = GST_PAD (pads->data); - GstBuffer *buf; + while (pads) { + GstPad *pad = GST_PAD (pads->data); + GstBuffer *buf; - if (src->rt_num_buffers == 0) { - src->eos = TRUE; - } - else { - if (src->rt_num_buffers > 0) - src->rt_num_buffers--; - } + if (src->rt_num_buffers == 0) { + src->eos = TRUE; + } + else { + if (src->rt_num_buffers > 0) + src->rt_num_buffers--; + } - if (src->eos) { - gst_pad_push(pad, GST_BUFFER(gst_event_new (GST_EVENT_EOS))); - return; - } + if (src->eos) { + gst_pad_push(pad, GST_BUFFER(gst_event_new (GST_EVENT_EOS))); + return; + } - buf = gst_fakesrc_create_buffer (src); - GST_BUFFER_TIMESTAMP (buf) = src->buffer_count++; + buf = gst_fakesrc_create_buffer (src); + GST_BUFFER_TIMESTAMP (buf) = src->buffer_count++; - if (!src->silent) { - gst_element_info (element, "fakesrc: loop ******* (%s:%s) > (%d bytes, %llu)", - GST_DEBUG_PAD_NAME (pad), GST_BUFFER_SIZE (buf), GST_BUFFER_TIMESTAMP (buf)); - } + if (!src->silent) { + gst_element_info (element, "fakesrc: loop ******* (%s:%s) > (%d bytes, %llu)", + GST_DEBUG_PAD_NAME (pad), GST_BUFFER_SIZE (buf), GST_BUFFER_TIMESTAMP (buf)); + } - g_signal_emit (G_OBJECT (src), gst_fakesrc_signals[SIGNAL_HANDOFF], 0, + g_signal_emit (G_OBJECT (src), gst_fakesrc_signals[SIGNAL_HANDOFF], 0, buf, pad); - gst_pad_push (pad, buf); + gst_pad_push (pad, buf); - pads = g_list_next (pads); - } - } while (!GST_ELEMENT_IS_COTHREAD_STOPPING (element)); + pads = g_list_next (pads); + } } static GstElementStateReturn diff --git a/gst/elements/gstfilesrc.c b/gst/elements/gstfilesrc.c index 346f648..db05b10 100644 --- a/gst/elements/gstfilesrc.c +++ b/gst/elements/gstfilesrc.c @@ -30,6 +30,7 @@ #include #include #include +#include /********************************************************************** @@ -327,10 +328,10 @@ gst_filesrc_map_region (GstFileSrc *src, off_t offset, size_t size) /* mmap() the data into this new buffer */ GST_BUFFER_DATA(buf) = mmap (NULL, size, PROT_READ, MAP_SHARED, src->fd, offset); if (GST_BUFFER_DATA(buf) == NULL) { - fprintf (stderr, "ERROR: gstfilesrc couldn't map file!\n"); + gst_element_error (GST_ELEMENT (src), "couldn't map file"); } else if (GST_BUFFER_DATA(buf) == MAP_FAILED) { - g_error ("gstfilesrc mmap(0x%x, %d, 0x%llx) : %s", - size, src->fd, offset, sys_errlist[errno]); + gst_element_error (GST_ELEMENT (src), "mmap (0x%x, %d, 0x%llx) : %s", + size, src->fd, offset, strerror (errno)); } #ifdef MADV_SEQUENTIAL /* madvise to tell the kernel what to do with it */ @@ -533,8 +534,8 @@ gst_filesrc_open_file (GstFileSrc *src) /* open the file */ src->fd = open (src->filename, O_RDONLY); if (src->fd < 0) { - perror ("open"); - gst_element_error (GST_ELEMENT (src), g_strconcat("opening file \"", src->filename, "\"", NULL)); + gst_element_error (GST_ELEMENT (src), "opening file \"%s\" (%s)", + src->filename, strerror (errno), NULL); return FALSE; } else { /* find the file length */ @@ -557,7 +558,6 @@ gst_filesrc_close_file (GstFileSrc *src) { g_return_if_fail (GST_FLAG_IS_SET (src, GST_FILESRC_OPEN)); - g_print ("close\n"); /* close the file */ close (src->fd); @@ -565,6 +565,8 @@ gst_filesrc_close_file (GstFileSrc *src) src->fd = 0; src->filelen = 0; src->curoffset = 0; + if (src->mapbuf) + gst_buffer_unref (src->mapbuf); GST_FLAG_UNSET (src, GST_FILESRC_OPEN); } @@ -575,17 +577,22 @@ gst_filesrc_change_state (GstElement *element) { GstFileSrc *src = GST_FILESRC(element); - if (GST_STATE_PENDING (element) == GST_STATE_NULL) { - if (GST_FLAG_IS_SET (element, GST_FILESRC_OPEN)) - gst_filesrc_close_file (GST_FILESRC (element)); - } if (GST_STATE_PENDING (element) == GST_STATE_READY) { - src->curoffset = 0; - } else { - - if (!GST_FLAG_IS_SET (element, GST_FILESRC_OPEN)) { - if (!gst_filesrc_open_file (GST_FILESRC (element))) - return GST_STATE_FAILURE; - } + switch (GST_STATE_TRANSITION (element)) { + case GST_STATE_NULL_TO_READY: + if (!GST_FLAG_IS_SET (element, GST_FILESRC_OPEN)) { + if (!gst_filesrc_open_file (GST_FILESRC (element))) + return GST_STATE_FAILURE; + } + break; + case GST_STATE_READY_TO_NULL: + if (GST_FLAG_IS_SET (element, GST_FILESRC_OPEN)) + gst_filesrc_close_file (GST_FILESRC (element)); + break; + case GST_STATE_READY_TO_PAUSED: + case GST_STATE_PAUSED_TO_READY: + src->curoffset = 0; + default: + break; } if (GST_ELEMENT_CLASS (parent_class)->change_state) diff --git a/gst/elements/gstidentity.c b/gst/elements/gstidentity.c index e8195aa..ddd4c02 100644 --- a/gst/elements/gstidentity.c +++ b/gst/elements/gstidentity.c @@ -205,27 +205,24 @@ gst_identity_loop (GstElement *element) identity = GST_IDENTITY (element); - do { - buf = gst_pad_pull (identity->sinkpad); + buf = gst_pad_pull (identity->sinkpad); - for (i=identity->duplicate; i; i--) { - if (!identity->silent) - g_print("identity: loop ******* (%s:%s)i (%d bytes, %llu) \n", + for (i=identity->duplicate; i; i--) { + if (!identity->silent) + g_print("identity: loop ******* (%s:%s)i (%d bytes, %llu) \n", GST_DEBUG_PAD_NAME (identity->sinkpad), GST_BUFFER_SIZE (buf), GST_BUFFER_TIMESTAMP (buf)); - g_signal_emit (G_OBJECT (identity), gst_identity_signals[SIGNAL_HANDOFF], 0, + g_signal_emit (G_OBJECT (identity), gst_identity_signals[SIGNAL_HANDOFF], 0, buf); - if (i>1) - gst_buffer_ref (buf); - - gst_pad_push (identity->srcpad, buf); + if (i>1) + gst_buffer_ref (buf); - if (identity->sleep_time) - usleep (identity->sleep_time); - } + gst_pad_push (identity->srcpad, buf); - } while (!GST_ELEMENT_IS_COTHREAD_STOPPING(element)); + if (identity->sleep_time) + usleep (identity->sleep_time); + } } static void diff --git a/gst/gstbin.c b/gst/gstbin.c index 19674e8..7a0b845 100644 --- a/gst/gstbin.c +++ b/gst/gstbin.c @@ -138,7 +138,6 @@ gst_bin_init (GstBin * bin) bin->numchildren = 0; bin->children = NULL; - bin->eoscond = g_cond_new (); } /** @@ -364,8 +363,8 @@ gst_bin_remove (GstBin * bin, GstElement * element) } void -gst_bin_child_state_change (GstBin * bin, GstElementState old, GstElementState new, - GstElement * child) +gst_bin_child_state_change (GstBin *bin, GstElementState old, GstElementState new, + GstElement *child) { gint old_idx = 0, new_idx = 0, i; @@ -394,6 +393,22 @@ gst_bin_child_state_change (GstBin * bin, GstElementState old, GstElementState n GST_UNLOCK (bin); } +void +gst_bin_child_error (GstBin *bin, GstElement *child) +{ + if (GST_STATE (bin) != GST_STATE_NULL) { + /* + GST_STATE_PENDING (bin) = ((GST_STATE (bin) >> 1)); + if (gst_element_set_state (bin, GST_STATE (bin)>>1) != GST_STATE_SUCCESS) { + gst_element_error (GST_ELEMENT (bin), "bin \"%s\" couldn't change state on error from child \"%s\"", + GST_ELEMENT_NAME (bin), GST_ELEMENT_NAME (child)); + } + */ + gst_element_info (GST_ELEMENT (bin), "bin \"%s\" stopped because child \"%s\" signalled an error", + GST_ELEMENT_NAME (bin), GST_ELEMENT_NAME (child)); + } +} + static void gst_bin_send_event (GstElement *element, GstEvent *event) { @@ -437,6 +452,9 @@ gst_bin_change_state (GstElement * element) gst_element_set_state (child, old_state); if (GST_ELEMENT_SCHED (child) == GST_ELEMENT_SCHED (element)) { + /* reset to what is was */ + GST_STATE_PENDING (element) = old_state; + gst_bin_change_state (element); return GST_STATE_FAILURE; } break; @@ -551,8 +569,6 @@ gst_bin_dispose (GObject * object) bin->children = NULL; bin->numchildren = 0; - g_cond_free (bin->eoscond); - G_OBJECT_CLASS (parent_class)->dispose (object); } diff --git a/gst/gstbin.h b/gst/gstbin.h index 7e36704..ae9fe1c 100644 --- a/gst/gstbin.h +++ b/gst/gstbin.h @@ -38,15 +38,15 @@ extern GType _gst_bin_type; # define GST_IS_BIN(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), GST_TYPE_BIN)) # define GST_IS_BIN_CLASS(obj) (G_TYPE_CHECK_CLASS_TYPE ((klass), GST_TYPE_BIN)) -#define GST_BIN_FAST(obj) ((GstBin*)(obj)) -#define GST_BIN_CLASS_FAST(klass) ((GstBinClass*)(klass)) +#define GST_BIN_CAST(obj) ((GstBin*)(obj)) +#define GST_BIN_CLASS_CAST(klass) ((GstBinClass*)(klass)) #ifdef GST_TYPE_PARANOID # define GST_BIN(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_BIN, GstBin)) # define GST_BIN_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_BIN, GstBinClass)) #else -# define GST_BIN GST_BIN_FAST -# define GST_BIN_CLASS GST_BIN_CLASS_FAST +# define GST_BIN GST_BIN_CAST +# define GST_BIN_CLASS GST_BIN_CLASS_CAST #endif typedef enum { @@ -71,11 +71,10 @@ struct _GstBin { /* our children */ gint numchildren; GList *children; - GCond *eoscond; GstElementState child_states[GST_NUM_STATES]; - cothread_context *threadcontext; + gpointer sched_private; }; struct _GstBinClass { @@ -113,6 +112,7 @@ gboolean gst_bin_iterate (GstBin *bin); /* internal */ void gst_bin_child_state_change (GstBin *bin, GstElementState oldstate, GstElementState newstate, GstElement *child); +void gst_bin_child_error (GstBin *bin, GstElement *child); #ifdef __cplusplus } diff --git a/gst/gstbuffer.c b/gst/gstbuffer.c index 5f304f2..841a1fc 100644 --- a/gst/gstbuffer.c +++ b/gst/gstbuffer.c @@ -31,6 +31,7 @@ GType _gst_buffer_type; static GMemChunk *_gst_buffer_chunk; static GMutex *_gst_buffer_chunk_lock; +static gint _gst_buffer_live; void _gst_buffer_initialize (void) @@ -58,6 +59,20 @@ _gst_buffer_initialize (void) _gst_buffer_chunk_lock = g_mutex_new (); _gst_buffer_type = g_type_register_static (G_TYPE_INT, "GstBuffer", &buffer_info, 0); + + _gst_buffer_live = 0; +} + +/** + * gst_buffer_print_stats: + * + * Print statistics about live buffers. + */ +void +gst_buffer_print_stats (void) +{ + g_log (g_log_domain_gstreamer, G_LOG_LEVEL_INFO, + "%d live buffers", _gst_buffer_live); } /** @@ -74,6 +89,7 @@ gst_buffer_new (void) g_mutex_lock (_gst_buffer_chunk_lock); buffer = g_mem_chunk_alloc (_gst_buffer_chunk); + _gst_buffer_live++; g_mutex_unlock (_gst_buffer_chunk_lock); GST_INFO (GST_CAT_BUFFER,"creating new buffer %p",buffer); @@ -153,11 +169,12 @@ gst_buffer_create_sub (GstBuffer *parent, g_mutex_lock (_gst_buffer_chunk_lock); buffer = g_mem_chunk_alloc (_gst_buffer_chunk); - GST_DATA_TYPE(buffer) = _gst_buffer_type; + _gst_buffer_live++; g_mutex_unlock (_gst_buffer_chunk_lock); GST_INFO (GST_CAT_BUFFER,"creating new subbuffer %p from parent %p (size %u, offset %u)", buffer, parent, size, offset); + GST_DATA_TYPE(buffer) = _gst_buffer_type; buffer->lock = g_mutex_new (); #ifdef HAVE_ATOMIC_H atomic_set (&buffer->refcount, 1); @@ -292,6 +309,7 @@ gst_buffer_destroy (GstBuffer *buffer) /* remove it entirely from memory */ g_mutex_lock (_gst_buffer_chunk_lock); g_mem_chunk_free (_gst_buffer_chunk,buffer); + _gst_buffer_live--; g_mutex_unlock (_gst_buffer_chunk_lock); } diff --git a/gst/gstbuffer.h b/gst/gstbuffer.h index 17edbd8..89af2cc 100644 --- a/gst/gstbuffer.h +++ b/gst/gstbuffer.h @@ -167,6 +167,8 @@ GstBuffer* gst_buffer_append (GstBuffer *buffer, GstBuffer *append); gboolean gst_buffer_is_span_fast (GstBuffer *buf1, GstBuffer *buf2); +void gst_buffer_print_stats (void); + #ifdef __cplusplus } #endif /* __cplusplus */ diff --git a/gst/gstelement.c b/gst/gstelement.c index 6315fd6..fd0b3c2 100644 --- a/gst/gstelement.c +++ b/gst/gstelement.c @@ -173,8 +173,8 @@ gst_element_init (GstElement *element) element->numsinkpads = 0; element->pads = NULL; element->loopfunc = NULL; - element->threadstate = NULL; element->sched = NULL; + element->sched_private = NULL; element->state_mutex = g_mutex_new (); element->state_cond = g_cond_new (); } @@ -798,10 +798,25 @@ void gst_element_error (GstElement *element, const gchar *error, ...) { va_list var_args; + GstObject *parent; + g_return_if_fail (GST_IS_ELEMENT (element)); + g_return_if_fail (error != NULL); + va_start (var_args, error); gst_element_message (element, "error", error, var_args); va_end (var_args); + + parent = GST_ELEMENT_PARENT (element); + + if (parent && GST_IS_BIN (parent)) { + gst_bin_child_error (GST_BIN (parent), element); + } + + if (element->sched) { + gst_scheduler_error (element->sched, element); + } + } /** @@ -817,6 +832,9 @@ gst_element_info (GstElement *element, const gchar *info, ...) { va_list var_args; + g_return_if_fail (GST_IS_ELEMENT (element)); + g_return_if_fail (info != NULL); + va_start (var_args, info); gst_element_message (element, "info", info, var_args); va_end (var_args); @@ -965,7 +983,7 @@ static GstElementStateReturn gst_element_change_state (GstElement *element) { GstElementState old_state; - //GstEvent *event; + GstObject *parent; g_return_val_if_fail (element != NULL, GST_STATE_FAILURE); g_return_val_if_fail (GST_IS_ELEMENT (element), GST_STATE_FAILURE); @@ -973,7 +991,7 @@ gst_element_change_state (GstElement *element) old_state = GST_STATE (element); if (GST_STATE_PENDING (element) == GST_STATE_VOID_PENDING || old_state == GST_STATE_PENDING (element)) { - GST_INFO (GST_CAT_STATES, "no state change needed for element %s (VOID_PENDING)\n", GST_ELEMENT_NAME (element)); + GST_INFO (GST_CAT_STATES, "no state change needed for element %s (VOID_PENDING)", GST_ELEMENT_NAME (element)); return GST_STATE_SUCCESS; } @@ -983,8 +1001,12 @@ gst_element_change_state (GstElement *element) GST_STATE_TRANSITION (element)); /* tell the scheduler if we have one */ - if (element->sched) - gst_scheduler_state_transition (element->sched, element, GST_STATE_TRANSITION (element)); + if (element->sched) { + if (gst_scheduler_state_transition (element->sched, element, GST_STATE_TRANSITION (element)) + != GST_STATE_SUCCESS) { + return GST_STATE_FAILURE; + } + } GST_STATE (element) = GST_STATE_PENDING (element); GST_STATE_PENDING (element) = GST_STATE_VOID_PENDING; @@ -993,11 +1015,11 @@ gst_element_change_state (GstElement *element) g_cond_signal (element->state_cond); g_mutex_unlock (element->state_mutex); - if (GST_ELEMENT_PARENT (element)) { - gst_bin_child_state_change (GST_BIN (GST_ELEMENT_PARENT (element)), old_state, GST_STATE (element), element); + parent = GST_ELEMENT_PARENT (element); + + if (parent && GST_IS_BIN (parent)) { + gst_bin_child_state_change (GST_BIN (parent), old_state, GST_STATE (element), element); } - //event = gst_event_new_state_change (old_state, GST_STATE (element)); - //gst_element_send_event (element, event); return GST_STATE_SUCCESS; } @@ -1328,7 +1350,6 @@ gst_element_signal_eos (GstElement *element) GST_DEBUG(GST_CAT_EVENT, "signaling EOS on element %s\n",GST_OBJECT_NAME(element)); g_signal_emit (G_OBJECT (element), gst_element_signals[EOS], 0); - GST_FLAG_SET(element,GST_ELEMENT_COTHREAD_STOPPING); } diff --git a/gst/gstelement.h b/gst/gstelement.h index 036c606..08e7d26 100644 --- a/gst/gstelement.h +++ b/gst/gstelement.h @@ -58,8 +58,8 @@ extern GType _gst_element_type; #define GST_TYPE_ELEMENT (_gst_element_type) -#define GST_ELEMENT_FAST(obj) ((GstElement*)(obj)) -#define GST_ELEMENT_CLASS_FAST(klass) ((GstElementClass*)(klass)) +#define GST_ELEMENT_CAST(obj) ((GstElement*)(obj)) +#define GST_ELEMENT_CLASS_CAST(klass) ((GstElementClass*)(klass)) #define GST_IS_ELEMENT(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), GST_TYPE_ELEMENT)) #define GST_IS_ELEMENT_CLASS(obj) (G_TYPE_CHECK_CLASS_TYPE ((klass), GST_TYPE_ELEMENT)) @@ -67,8 +67,8 @@ extern GType _gst_element_type; # define GST_ELEMENT(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_ELEMENT, GstElement)) # define GST_ELEMENT_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_ELEMENT, GstElementClass)) #else -# define GST_ELEMENT GST_ELEMENT_FAST -# define GST_ELEMENT_CLASS GST_ELEMENT_CLASS_FAST +# define GST_ELEMENT GST_ELEMENT_CAST +# define GST_ELEMENT_CLASS GST_ELEMENT_CLASS_CAST #endif typedef enum { @@ -82,18 +82,16 @@ typedef enum { /* this element is incable of seeking (FIXME: does this apply to filters?) */ GST_ELEMENT_NO_SEEK, - /***** !!!!! need to have a flag that says that an element must - *not* be an entry into a scheduling chain !!!!! *****/ - /* this element for some reason doesn't obey COTHREAD_STOPPING, or - has some other reason why it can't be the entry */ - GST_ELEMENT_NO_ENTRY, + /* this element, for some reason, has a loop function that performs + * an infinite loop without calls to gst_element_yield () */ + GST_ELEMENT_INFINITE_LOOP, + + /* private flags that can be used by the scheduler */ + GST_ELEMENT_SCHEDULER_PRIVATE1, + GST_ELEMENT_SCHEDULER_PRIVATE2, /* there is a new loopfunction ready for placement */ GST_ELEMENT_NEW_LOOPFUNC, - /* the cothread holding this element needs to be stopped */ - GST_ELEMENT_COTHREAD_STOPPING, - /* the element has to be scheduled as a cothread for any sanity */ - GST_ELEMENT_USE_COTHREAD, /* if this element can handle events */ GST_ELEMENT_EVENT_AWARE, @@ -103,7 +101,6 @@ typedef enum { } GstElementFlags; #define GST_ELEMENT_IS_THREAD_SUGGESTED(obj) (GST_FLAG_IS_SET(obj,GST_ELEMENT_THREAD_SUGGESTED)) -#define GST_ELEMENT_IS_COTHREAD_STOPPING(obj) (GST_FLAG_IS_SET(obj,GST_ELEMENT_COTHREAD_STOPPING)) #define GST_ELEMENT_IS_EOS(obj) (GST_FLAG_IS_SET(obj,GST_ELEMENT_EOS)) #define GST_ELEMENT_IS_EVENT_AWARE(obj) (GST_FLAG_IS_SET(obj,GST_ELEMENT_EVENT_AWARE)) @@ -127,9 +124,10 @@ struct _GstElement { guint8 current_state; guint8 pending_state; GstElement *manager; - GstScheduler *sched; GstElementLoopFunction loopfunc; - cothread_state *threadstate; + + GstScheduler *sched; + gpointer sched_private; /* element pads */ guint16 numpads; @@ -185,6 +183,8 @@ const gchar* gst_element_get_name (GstElement *element); void gst_element_set_parent (GstElement *element, GstObject *parent); GstObject* gst_element_get_parent (GstElement *element); +#define gst_element_yield(element) gst_scheduler_yield(GST_ELEMENT_SCHED(element),element) +#define gst_element_interrupt(element) gst_scheduler_interrupt(GST_ELEMENT_SCHED(element),element) void gst_element_set_sched (GstElement *element, GstScheduler *sched); GstScheduler* gst_element_get_sched (GstElement *element); diff --git a/gst/gstobject.h b/gst/gstobject.h index d889dfb..0f2078b 100644 --- a/gst/gstobject.h +++ b/gst/gstobject.h @@ -53,15 +53,15 @@ extern GType _gst_object_type; # define GST_IS_OBJECT(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), GST_TYPE_OBJECT)) # define GST_IS_OBJECT_CLASS(obj) (G_TYPE_CHECK_CLASS_TYPE ((klass), GST_TYPE_OBJECT)) -#define GST_OBJECT_FAST(obj) ((GstObject*)(obj)) -#define GST_OBJECT_CLASS_FAST(klass) ((GstObjectClass*)(klass)) +#define GST_OBJECT_CAST(obj) ((GstObject*)(obj)) +#define GST_OBJECT_CLASS_CAST(klass) ((GstObjectClass*)(klass)) #ifdef GST_TYPE_PARANOID # define GST_OBJECT(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_OBJECT, GstObject)) # define GST_OBJECT_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_OBJECT, GstObjectClass)) #else -# define GST_OBJECT GST_OBJECT_FAST -# define GST_OBJECT_CLASS GST_OBJECT_CLASS_FAST +# define GST_OBJECT GST_OBJECT_CAST +# define GST_OBJECT_CLASS GST_OBJECT_CLASS_CAST #endif /*typedef struct _GstObject GstObject; */ @@ -115,7 +115,7 @@ struct _GstObjectClass { #endif }; -#define GST_FLAGS(obj) (GST_OBJECT (obj)->flags) +#define GST_FLAGS(obj) (GST_OBJECT_CAST (obj)->flags) #define GST_FLAG_IS_SET(obj,flag) (GST_FLAGS (obj) & (1<<(flag))) #define GST_FLAG_SET(obj,flag) G_STMT_START{ (GST_FLAGS (obj) |= (1<<(flag))); }G_STMT_END #define GST_FLAG_UNSET(obj,flag) G_STMT_START{ (GST_FLAGS (obj) &= ~(1<<(flag))); }G_STMT_END @@ -127,10 +127,10 @@ struct _GstObjectClass { #define GST_OBJECT_FLOATING(obj) (GST_FLAG_IS_SET (obj, GST_FLOATING)) /* object locking */ -#define GST_LOCK(obj) (g_mutex_lock(GST_OBJECT(obj)->lock)) -#define GST_TRYLOCK(obj) (g_mutex_trylock(GST_OBJECT(obj)->lock)) -#define GST_UNLOCK(obj) (g_mutex_unlock(GST_OBJECT(obj)->lock)) -#define GST_GET_LOCK(obj) (GST_OBJECT(obj)->lock) +#define GST_LOCK(obj) (g_mutex_lock(GST_OBJECT_CAST(obj)->lock)) +#define GST_TRYLOCK(obj) (g_mutex_trylock(GST_OBJECT_CAST(obj)->lock)) +#define GST_UNLOCK(obj) (g_mutex_unlock(GST_OBJECT_CAST(obj)->lock)) +#define GST_GET_LOCK(obj) (GST_OBJECT_CAST(obj)->lock) /* normal GObject stuff */ diff --git a/gst/gstpad.c b/gst/gstpad.c index 2c52a5e..cf7e65c 100644 --- a/gst/gstpad.c +++ b/gst/gstpad.c @@ -192,6 +192,9 @@ gst_real_pad_init (GstRealPad *pad) pad->direction = GST_PAD_UNKNOWN; pad->peer = NULL; + pad->sched = NULL; + pad->sched_private = NULL; + pad->chainfunc = NULL; pad->getfunc = NULL; pad->getregionfunc = NULL; @@ -564,13 +567,11 @@ void gst_pad_connect (GstPad *srcpad, GstPad *sinkpad) { - if (!gst_pad_try_connect (srcpad, sinkpad)) -/* FIXME: g_critical is glib-2.0, not glib-1.2 - g_critical ("couldn't connect %s:%s and %s:%s", -*/ + if (!gst_pad_try_connect (srcpad, sinkpad)) { g_warning ("couldn't connect %s:%s and %s:%s", GST_DEBUG_PAD_NAME (srcpad), GST_DEBUG_PAD_NAME (sinkpad)); + } } /** @@ -1451,18 +1452,33 @@ gst_pad_push (GstPad *pad, GstBuffer *buf) GST_DEBUG_ENTER ("(%s:%s)", GST_DEBUG_PAD_NAME (pad)); g_return_if_fail (GST_PAD_DIRECTION (pad) == GST_PAD_SRC); + if (!peer) { - g_warning ("gst_pad_push but %s:%s is unconnected", GST_DEBUG_PAD_NAME (pad)); - return; + g_warning ("push on pad %s:%s but it is unconnected", GST_DEBUG_PAD_NAME (pad)); } - - if (peer->chainhandler) { - GST_DEBUG (GST_CAT_DATAFLOW, "calling chainhandler &%s of peer pad %s:%s\n", - GST_DEBUG_FUNCPTR_NAME (peer->chainhandler), GST_DEBUG_PAD_NAME (GST_PAD (peer))); - (peer->chainhandler) (GST_PAD_FAST (peer), buf); - } else { - g_warning ("gst_pad_push but %s:%s has but no chainhandler", GST_DEBUG_PAD_NAME (peer)); + if (peer->chainhandler) { + if (buf) { + GST_DEBUG (GST_CAT_DATAFLOW, "calling chainhandler &%s of peer pad %s:%s\n", + GST_DEBUG_FUNCPTR_NAME (peer->chainhandler), GST_DEBUG_PAD_NAME (GST_PAD (peer))); + (peer->chainhandler) (GST_PAD_CAST (peer), buf); + return; + } + else { + g_warning ("trying to push a NULL buffer on pad %s:%s", GST_DEBUG_PAD_NAME (peer)); + return; + } + } + else { + g_warning ("(internal error) push on pad %s:%s but it has no chainhandler", GST_DEBUG_PAD_NAME (peer)); + } + } + /* clean up the mess here */ + if (buf != NULL) { + if (GST_IS_BUFFER (buf)) + gst_buffer_unref (buf); + else + gst_pad_event_default (pad, GST_EVENT (buf)); } } #endif @@ -1486,18 +1502,33 @@ gst_pad_pull (GstPad *pad) g_return_val_if_fail (GST_PAD_DIRECTION (pad) == GST_PAD_SINK, NULL); if (!peer) { - g_warning ("gst_pad_pull but %s:%s is unconnected", GST_DEBUG_PAD_NAME(pad)); - return NULL; + gst_element_error (GST_PAD_PARENT (pad), + "pull on pad %s:%s but it was unconnected", + GST_ELEMENT_NAME (GST_PAD_PARENT (pad)), GST_PAD_NAME (pad), + NULL); } - - if (peer->gethandler) { - GST_DEBUG (GST_CAT_DATAFLOW, "calling gethandler %s of peer pad %s:%s\n", - GST_DEBUG_FUNCPTR_NAME (peer->gethandler), GST_DEBUG_PAD_NAME (peer)); - return (peer->gethandler) (GST_PAD_FAST (peer)); - } else { - g_warning ("gst_pad_pull but %s:%s has no gethandler", GST_DEBUG_PAD_NAME (peer)); - return NULL; + else { + if (peer->gethandler) { + GstBuffer *buf; + + GST_DEBUG (GST_CAT_DATAFLOW, "calling gethandler %s of peer pad %s:%s\n", + GST_DEBUG_FUNCPTR_NAME (peer->gethandler), GST_DEBUG_PAD_NAME (peer)); + + buf = (peer->gethandler) (GST_PAD_CAST (peer)); + if (buf) + return buf; + /* no null buffers allowed */ + gst_element_error (GST_PAD_PARENT (pad), + "NULL buffer during pull on %s:%s", GST_DEBUG_PAD_NAME (pad), NULL); + + } else { + gst_element_error (GST_PAD_PARENT (pad), + "(internal error) pull on pad %s:%s but the peer pad %s:%s has no gethandler", + GST_DEBUG_PAD_NAME (pad), GST_DEBUG_PAD_NAME (peer), + NULL); + } } + return NULL; } #endif @@ -1535,8 +1566,8 @@ gst_pad_pullregion (GstPad *pad, GstRegionType type, guint64 offset, guint64 len if (peer->pullregionfunc) { GST_DEBUG (GST_CAT_DATAFLOW, "calling pullregionfunc &%s of peer pad %s:%s\n", - GST_DEBUG_FUNCPTR_NAME (peer->pullregionfunc), GST_DEBUG_PAD_NAME(GST_PAD_FAST (peer))); - result = (peer->pullregionfunc) (GST_PAD_FAST (peer), type, offset, len); + GST_DEBUG_FUNCPTR_NAME (peer->pullregionfunc), GST_DEBUG_PAD_NAME(GST_PAD_CAST (peer))); + result = (peer->pullregionfunc) (GST_PAD_CAST (peer), type, offset, len); } else { GST_DEBUG (GST_CAT_DATAFLOW,"no pullregionfunc\n"); result = NULL; @@ -1986,6 +2017,8 @@ gst_pad_event_default (GstPad *pad, GstEvent *event) pads = g_list_next (pads); } } + /* we have to try to schedule another element because this one is deisabled */ + gst_element_yield (element); break; default: g_warning ("no default handler for event\n"); diff --git a/gst/gstpad.h b/gst/gstpad.h index d2f6653..aa08ce6 100644 --- a/gst/gstpad.h +++ b/gst/gstpad.h @@ -48,8 +48,8 @@ extern GType _gst_ghost_pad_type; */ #define GST_TYPE_PAD (_gst_pad_type) -#define GST_PAD_FAST(obj) ((GstPad*)(obj)) -#define GST_PAD_CLASS_FAST(klass) ((GstPadClass*)(klass)) +#define GST_PAD_CAST(obj) ((GstPad*)(obj)) +#define GST_PAD_CLASS_CAST(klass) ((GstPadClass*)(klass)) #define GST_IS_PAD(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), GST_TYPE_PAD)) #define GST_IS_PAD_FAST(obj) (G_OBJECT_TYPE(obj) == GST_TYPE_REAL_PAD || \ G_OBJECT_TYPE(obj) == GST_TYPE_GHOST_PAD) @@ -59,8 +59,8 @@ extern GType _gst_ghost_pad_type; # define GST_PAD(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_PAD, GstPad)) # define GST_PAD_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_PAD, GstPadClass)) #else -# define GST_PAD GST_PAD_FAST -# define GST_PAD_CLASS GST_PAD_CLASS_FAST +# define GST_PAD GST_PAD_CAST +# define GST_PAD_CLASS GST_PAD_CLASS_CAST #endif /* @@ -68,8 +68,8 @@ extern GType _gst_ghost_pad_type; */ #define GST_TYPE_REAL_PAD (_gst_real_pad_type) -#define GST_REAL_PAD_FAST(obj) ((GstRealPad*)(obj)) -#define GST_REAL_PAD_CLASS_FAST(klass) ((GstRealPadClass*)(klass)) +#define GST_REAL_PAD_CAST(obj) ((GstRealPad*)(obj)) +#define GST_REAL_PAD_CLASS_CAST(klass) ((GstRealPadClass*)(klass)) #define GST_IS_REAL_PAD(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), GST_TYPE_REAL_PAD)) #define GST_IS_REAL_PAD_FAST(obj) (G_OBJECT_TYPE(obj) == GST_TYPE_REAL_PAD) #define GST_IS_REAL_PAD_CLASS(obj) (G_TYPE_CHECK_CLASS_TYPE ((klass), GST_TYPE_REAL_PAD)) @@ -78,8 +78,8 @@ extern GType _gst_ghost_pad_type; # define GST_REAL_PAD(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_REAL_PAD, GstRealPad)) # define GST_REAL_PAD_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_REAL_PAD, GstRealPadClass)) #else -# define GST_REAL_PAD GST_REAL_PAD_FAST -# define GST_REAL_PAD_CLASS GST_REAL_PAD_CLASS_FAST +# define GST_REAL_PAD GST_REAL_PAD_CAST +# define GST_REAL_PAD_CLASS GST_REAL_PAD_CLASS_CAST #endif /* @@ -87,8 +87,8 @@ extern GType _gst_ghost_pad_type; */ #define GST_TYPE_GHOST_PAD (_gst_ghost_pad_type) -#define GST_GHOST_PAD_FAST(obj) ((GstGhostPad*)(obj)) -#define GST_GHOST_PAD_CLASS_FAST(klass) ((GstGhostPadClass*)(klass)) +#define GST_GHOST_PAD_CAST(obj) ((GstGhostPad*)(obj)) +#define GST_GHOST_PAD_CLASS_CAST(klass) ((GstGhostPadClass*)(klass)) #define GST_IS_GHOST_PAD(obj) (G_TYPE_CHECK_INSTANCE_TYPE ((obj), GST_TYPE_GHOST_PAD)) #define GST_IS_GHOST_PAD_FAST(obj) (G_OBJECT_TYPE(obj) == GST_TYPE_GHOST_PAD) #define GST_IS_GHOST_PAD_CLASS(obj) (G_TYPE_CHECK_CLASS_TYPE ((klass), GST_TYPE_GHOST_PAD)) @@ -97,8 +97,8 @@ extern GType _gst_ghost_pad_type; # define GST_GHOST_PAD(obj) (G_TYPE_CHECK_INSTANCE_CAST ((obj), GST_TYPE_GHOST_PAD, GstGhostPad)) # define GST_GHOST_PAD_CLASS(klass) (G_TYPE_CHECK_CLASS_CAST ((klass), GST_TYPE_GHOST_PAD, GstGhostPadClass)) #else -# define GST_GHOST_PAD GST_GHOST_PAD_FAST -# define GST_GHOST_PAD_CLASS GST_GHOST_PAD_CLASS_FAST +# define GST_GHOST_PAD GST_GHOST_PAD_CAST +# define GST_GHOST_PAD_CLASS GST_GHOST_PAD_CLASS_CAST #endif @@ -169,7 +169,8 @@ struct _GstRealPad { GstCaps *caps; GstPadDirection direction; - cothread_state *threadstate; + GstScheduler *sched; + gpointer sched_private; GstRealPad *peer; @@ -178,8 +179,6 @@ struct _GstRealPad { guint64 offset; guint64 len; - GstScheduler *sched; - GstPadChainFunction chainfunc; GstPadChainFunction chainhandler; GstPadGetFunction getfunc; @@ -257,7 +256,7 @@ struct _GstGhostPadClass { #define GST_GPAD_REALPAD(pad) (((GstGhostPad *)(pad))->realpad) /* Generic */ -#define GST_PAD_REALIZE(pad) (GST_IS_REAL_PAD(pad) ? ((GstRealPad *)(pad)) : GST_GPAD_REALPAD(pad)) +#define GST_PAD_REALIZE(pad) (GST_IS_REAL_PAD(pad) ? ((GstRealPad *)(pad)) : GST_GPAD_REALPAD(pad)) #define GST_PAD_DIRECTION(pad) GST_RPAD_DIRECTION(GST_PAD_REALIZE(pad)) #define GST_PAD_CAPS(pad) GST_RPAD_CAPS(GST_PAD_REALIZE(pad)) #define GST_PAD_PEER(pad) GST_RPAD_PEER(GST_PAD_REALIZE(pad)) diff --git a/gst/gstparse.c b/gst/gstparse.c index 26ec647..49fc423 100644 --- a/gst/gstparse.c +++ b/gst/gstparse.c @@ -104,6 +104,7 @@ gst_parse_launch_cmdline (int argc, char *argv[], GstBin * parent, gst_parse_pri GList *pads; gint elementcount = 0; gint retval = 0; + gboolean backref = FALSE; priv->binlevel++; @@ -174,13 +175,14 @@ gst_parse_launch_cmdline (int argc, char *argv[], GstBin * parent, gst_parse_pri GstElement *new; GST_DEBUG (0, "have pad for element %s\n", element_name); - new = gst_bin_get_by_name (parent, element_name); + new = gst_bin_get_by_name_recurse_up (parent, element_name); if (!new) { GST_DEBUG (0, "element %s does not exist! trying to continue\n", element_name); } else { previous = new; srcpadname = ptr + 1; + backref = TRUE; } } @@ -321,6 +323,7 @@ gst_parse_launch_cmdline (int argc, char *argv[], GstBin * parent, gst_parse_pri i++; continue; } + gst_bin_add (GST_BIN (parent), element); j = gst_parse_launch_cmdline (argc - i, argv + i + 1, GST_BIN (element), priv); /* check for parse error */ @@ -347,9 +350,9 @@ gst_parse_launch_cmdline (int argc, char *argv[], GstBin * parent, gst_parse_pri return GST_PARSE_ERROR_NOSUCH_ELEMENT; } GST_DEBUG (0, "CREATED element %s\n", GST_ELEMENT_NAME (element)); + gst_bin_add (GST_BIN (parent), element); } - gst_bin_add (GST_BIN (parent), element); elementcount++; g_slist_free (sinkpads); @@ -412,7 +415,7 @@ gst_parse_launch_cmdline (int argc, char *argv[], GstBin * parent, gst_parse_pri else GST_DEBUG (0, "have sink pad %s:%s\n", GST_DEBUG_PAD_NAME (GST_PARSE_LISTPAD (sinkpads))); - if (!srcpads && sinkpads && previous) { + if (!srcpads && sinkpads && previous && srcpadname) { dyn_connect *connect = g_malloc (sizeof (dyn_connect)); connect->srcpadname = srcpadname; @@ -442,7 +445,7 @@ gst_parse_launch_cmdline (int argc, char *argv[], GstBin * parent, gst_parse_pri sinkpads = NULL; /* if we're the first element, ghost all the sinkpads */ - if (elementcount == 1) { + if (elementcount == 1 && !backref) { DEBUG ("first element, ghosting all of %s's sink pads to parent %s\n", GST_ELEMENT_NAME (element), GST_ELEMENT_NAME (GST_ELEMENT (parent))); pads = gst_element_get_pad_list (element); diff --git a/gst/gstqueue.c b/gst/gstqueue.c index 4375cec..0a00fb8 100644 --- a/gst/gstqueue.c +++ b/gst/gstqueue.c @@ -65,11 +65,13 @@ enum { ARG_LEAKY, ARG_LEVEL, ARG_MAX_LEVEL, + ARG_MAY_DEADLOCK, }; static void gst_queue_class_init (GstQueueClass *klass); static void gst_queue_init (GstQueue *queue); +static void gst_queue_dispose (GObject *object); static void gst_queue_set_property (GObject *object, guint prop_id, const GValue *value, GParamSpec *pspec); @@ -141,18 +143,22 @@ gst_queue_class_init (GstQueueClass *klass) parent_class = g_type_class_ref (GST_TYPE_ELEMENT); - g_object_class_install_property(G_OBJECT_CLASS(klass), ARG_LEAKY, - g_param_spec_enum("leaky","Leaky","Where the queue leaks, if at all.", - GST_TYPE_QUEUE_LEAKY,GST_QUEUE_NO_LEAK,G_PARAM_READWRITE)); - g_object_class_install_property(G_OBJECT_CLASS(klass), ARG_LEVEL, - g_param_spec_int("level","Level","How many buffers are in the queue.", - 0,G_MAXINT,0,G_PARAM_READABLE)); - g_object_class_install_property(G_OBJECT_CLASS(klass), ARG_MAX_LEVEL, - g_param_spec_int("max_level","Maximum Level","How many buffers the queue holds.", - 0,G_MAXINT,100,G_PARAM_READWRITE)); - - gobject_class->set_property = GST_DEBUG_FUNCPTR(gst_queue_set_property); - gobject_class->get_property = GST_DEBUG_FUNCPTR(gst_queue_get_property); + g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_LEAKY, + g_param_spec_enum ("leaky", "Leaky", "Where the queue leaks, if at all.", + GST_TYPE_QUEUE_LEAKY, GST_QUEUE_NO_LEAK, G_PARAM_READWRITE)); + g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_LEVEL, + g_param_spec_int ("level", "Level", "How many buffers are in the queue.", + 0, G_MAXINT, 0, G_PARAM_READABLE)); + g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_MAX_LEVEL, + g_param_spec_int ("max_level", "Maximum Level", "How many buffers the queue holds.", + 0, G_MAXINT, 100, G_PARAM_READWRITE)); + g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_MAY_DEADLOCK, + g_param_spec_boolean ("may_deadlock", "May Deadlock", "The queue may deadlock if it's full and not PLAYING", + TRUE, G_PARAM_READWRITE)); + + gobject_class->dispose = GST_DEBUG_FUNCPTR (gst_queue_dispose); + gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_queue_set_property); + gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_queue_get_property); gstelement_class->change_state = GST_DEBUG_FUNCPTR(gst_queue_change_state); } @@ -182,6 +188,7 @@ gst_queue_init (GstQueue *queue) queue->size_buffers = 100; /* 100 buffers */ queue->size_bytes = 100 * 1024; /* 100KB */ queue->size_time = 1000000000LL; /* 1sec */ + queue->may_deadlock = TRUE; queue->qlock = g_mutex_new (); queue->reader = FALSE; @@ -191,6 +198,18 @@ gst_queue_init (GstQueue *queue) GST_DEBUG_ELEMENT (GST_CAT_THREAD, queue, "initialized queue's not_empty & not_full conditions\n"); } +static void +gst_queue_dispose (GObject *object) +{ + GstQueue *queue = GST_QUEUE (object); + + g_mutex_free (queue->qlock); + g_cond_free (queue->not_empty); + g_cond_free (queue->not_full); + + G_OBJECT_CLASS (parent_class)->dispose (object); +} + static GstBufferPool* gst_queue_get_bufferpool (GstPad *pad) { @@ -334,10 +353,21 @@ restart: while (GST_STATE_PENDING (queue) != GST_STATE_VOID_PENDING) { GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "interrupted!!\n"); g_mutex_unlock (queue->qlock); - cothread_switch(cothread_current_main()); + gst_element_interrupt (GST_ELEMENT (queue)); goto restart; } - g_assert (GST_STATE (queue) == GST_STATE_PLAYING); + if (GST_STATE (queue) != GST_STATE_PLAYING) { + /* this means the other end is shut down */ + /* try to signal to resolve the error */ + if (!queue->may_deadlock) { + g_mutex_unlock (queue->qlock); + gst_element_error (GST_ELEMENT (queue), "deadlock found, source pad elements are shut down"); + return; + } + else { + gst_element_info (GST_ELEMENT (queue), "waiting for the app to restart source pad elements"); + } + } GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for not_full, level:%d/%d\n", queue->level_buffers, queue->size_buffers); if (queue->writer) @@ -402,10 +432,20 @@ restart: while (GST_STATE_PENDING (queue) != GST_STATE_VOID_PENDING) { GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "interrupted!!\n"); g_mutex_unlock (queue->qlock); - cothread_switch(cothread_current_main()); + gst_element_interrupt (GST_ELEMENT (queue)); goto restart; } - g_assert (GST_STATE (queue) == GST_STATE_PLAYING); + if (GST_STATE (queue) != GST_STATE_PLAYING) { + /* this means the other end is shut down */ + if (!queue->may_deadlock) { + g_mutex_unlock (queue->qlock); + gst_element_error (GST_ELEMENT (queue), "deadlock found, sink pad elements are shut down"); + return NULL; + } + else { + gst_element_info (GST_ELEMENT (queue), "waiting for the app to restart sink pad elements"); + } + } GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for not_empty, level:%d/%d\n", queue->level_buffers, queue->size_buffers); if (queue->reader) @@ -513,10 +553,13 @@ gst_queue_set_property (GObject *object, guint prop_id, const GValue *value, GPa switch (prop_id) { case ARG_LEAKY: - queue->leaky = g_value_get_int(value); + queue->leaky = g_value_get_int (value); break; case ARG_MAX_LEVEL: - queue->size_buffers = g_value_get_int(value); + queue->size_buffers = g_value_get_int (value); + break; + case ARG_MAY_DEADLOCK: + queue->may_deadlock = g_value_get_boolean (value); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); @@ -536,13 +579,16 @@ gst_queue_get_property (GObject *object, guint prop_id, GValue *value, GParamSpe switch (prop_id) { case ARG_LEAKY: - g_value_set_int(value, queue->leaky); + g_value_set_int (value, queue->leaky); break; case ARG_LEVEL: - g_value_set_int(value, queue->level_buffers); + g_value_set_int (value, queue->level_buffers); break; case ARG_MAX_LEVEL: - g_value_set_int(value, queue->size_buffers); + g_value_set_int (value, queue->size_buffers); + break; + case ARG_MAY_DEADLOCK: + g_value_set_boolean (value, queue->may_deadlock); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); diff --git a/gst/gstqueue.h b/gst/gstqueue.h index fb091e1..3af0ef6 100644 --- a/gst/gstqueue.h +++ b/gst/gstqueue.h @@ -74,6 +74,7 @@ struct _GstQueue { guint64 size_time; /* size of queue in time */ gint leaky; /* whether the queue is leaky, and if so at which end */ + gboolean may_deadlock; /* it the queue should fail on possible deadlocks */ GMutex *qlock; /* lock for queue (vs object lock) */ /* we are single reader and single writer queue */ diff --git a/gst/gstscheduler.c b/gst/gstscheduler.c index e49ca5c..cfa59c8 100644 --- a/gst/gstscheduler.c +++ b/gst/gstscheduler.c @@ -76,6 +76,8 @@ gst_scheduler_init (GstScheduler *sched) void gst_scheduler_setup (GstScheduler *sched) { + g_return_if_fail (GST_IS_SCHEDULER (sched)); + if (CLASS (sched)->setup) CLASS (sched)->setup (sched); } @@ -89,6 +91,8 @@ gst_scheduler_setup (GstScheduler *sched) void gst_scheduler_reset (GstScheduler *sched) { + g_return_if_fail (GST_IS_SCHEDULER (sched)); + if (CLASS (sched)->reset) CLASS (sched)->reset (sched); } @@ -104,6 +108,10 @@ gst_scheduler_reset (GstScheduler *sched) void gst_scheduler_pad_connect (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad) { + g_return_if_fail (GST_IS_SCHEDULER (sched)); + g_return_if_fail (GST_IS_PAD (srcpad)); + g_return_if_fail (GST_IS_PAD (sinkpad)); + if (CLASS (sched)->pad_connect) CLASS (sched)->pad_connect (sched, srcpad, sinkpad); } @@ -119,6 +127,10 @@ gst_scheduler_pad_connect (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad) void gst_scheduler_pad_disconnect (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad) { + g_return_if_fail (GST_IS_SCHEDULER (sched)); + g_return_if_fail (GST_IS_PAD (srcpad)); + g_return_if_fail (GST_IS_PAD (sinkpad)); + if (CLASS (sched)->pad_disconnect) CLASS (sched)->pad_disconnect (sched, srcpad, sinkpad); } @@ -135,6 +147,9 @@ gst_scheduler_pad_disconnect (GstScheduler *sched, GstPad *srcpad, GstPad *sinkp GstPad * gst_scheduler_pad_select (GstScheduler *sched, GList *padlist) { + g_return_if_fail (GST_IS_SCHEDULER (sched)); + g_return_if_fail (padlist != NULL); + if (CLASS (sched)->pad_select) CLASS (sched)->pad_select (sched, padlist); } @@ -149,6 +164,9 @@ gst_scheduler_pad_select (GstScheduler *sched, GList *padlist) void gst_scheduler_add_element (GstScheduler *sched, GstElement *element) { + g_return_if_fail (GST_IS_SCHEDULER (sched)); + g_return_if_fail (GST_IS_ELEMENT (element)); + if (CLASS (sched)->add_element) CLASS (sched)->add_element (sched, element); } @@ -161,11 +179,16 @@ gst_scheduler_add_element (GstScheduler *sched, GstElement *element) * * Tell the scheduler that an element changed its state. */ -void +GstElementStateReturn gst_scheduler_state_transition (GstScheduler *sched, GstElement *element, gint transition) { + g_return_val_if_fail (GST_IS_SCHEDULER (sched), GST_STATE_FAILURE); + g_return_val_if_fail (GST_IS_ELEMENT (element), GST_STATE_FAILURE); + if (CLASS (sched)->state_transition) - CLASS (sched)->state_transition (sched, element, transition); + return CLASS (sched)->state_transition (sched, element, transition); + + return GST_STATE_SUCCESS; } /** @@ -178,6 +201,9 @@ gst_scheduler_state_transition (GstScheduler *sched, GstElement *element, gint t void gst_scheduler_remove_element (GstScheduler *sched, GstElement *element) { + g_return_if_fail (GST_IS_SCHEDULER (sched)); + g_return_if_fail (GST_IS_ELEMENT (element)); + if (CLASS (sched)->remove_element) CLASS (sched)->remove_element (sched, element); } @@ -192,6 +218,9 @@ gst_scheduler_remove_element (GstScheduler *sched, GstElement *element) void gst_scheduler_lock_element (GstScheduler *sched, GstElement *element) { + g_return_if_fail (GST_IS_SCHEDULER (sched)); + g_return_if_fail (GST_IS_ELEMENT (element)); + if (CLASS (sched)->lock_element) CLASS (sched)->lock_element (sched, element); } @@ -206,11 +235,65 @@ gst_scheduler_lock_element (GstScheduler *sched, GstElement *element) void gst_scheduler_unlock_element (GstScheduler *sched, GstElement *element) { + g_return_if_fail (GST_IS_SCHEDULER (sched)); + g_return_if_fail (GST_IS_ELEMENT (element)); + if (CLASS (sched)->unlock_element) CLASS (sched)->unlock_element (sched, element); } /** + * gst_scheduler_error: + * @sched: the schedulerr + * @element: the element with the error + * + * Tell the scheduler an element was in error + */ +void +gst_scheduler_error (GstScheduler *sched, GstElement *element) +{ + g_return_if_fail (GST_IS_SCHEDULER (sched)); + g_return_if_fail (GST_IS_ELEMENT (element)); + + if (CLASS (sched)->error) + CLASS (sched)->error (sched, element); +} + +/** + * gst_scheduler_yield: + * @sched: the schedulerr + * @element: the element requesting a yield + * + * Tell the scheduler to schedule another element. + */ +void +gst_scheduler_yield (GstScheduler *sched, GstElement *element) +{ + g_return_if_fail (GST_IS_SCHEDULER (sched)); + g_return_if_fail (GST_IS_ELEMENT (element)); + + if (CLASS (sched)->yield) + CLASS (sched)->yield (sched, element); +} + +/** + * gst_scheduler_interrupt: + * @sched: the schedulerr + * @element: the element requesting an interrupt + * + * Tell the scheduler to interrupt execution of this element. + */ +void +gst_scheduler_interrupt (GstScheduler *sched, GstElement *element) +{ + g_return_if_fail (GST_IS_SCHEDULER (sched)); + g_return_if_fail (GST_IS_ELEMENT (element)); + + if (CLASS (sched)->interrupt) + CLASS (sched)->interrupt (sched, element); +} + +/** * gst_scheduler_iterate: * @sched: the schedulerr * @@ -221,6 +304,8 @@ gst_scheduler_unlock_element (GstScheduler *sched, GstElement *element) gboolean gst_scheduler_iterate (GstScheduler *sched) { + g_return_if_fail (GST_IS_SCHEDULER (sched)); + if (CLASS (sched)->iterate) CLASS (sched)->iterate (sched); } @@ -235,6 +320,8 @@ gst_scheduler_iterate (GstScheduler *sched) void gst_scheduler_show (GstScheduler *sched) { + g_return_if_fail (GST_IS_SCHEDULER (sched)); + if (CLASS (sched)->show) CLASS (sched)->show (sched); } diff --git a/gst/gstscheduler.h b/gst/gstscheduler.h index 1b2dac5..31e5b5a 100644 --- a/gst/gstscheduler.h +++ b/gst/gstscheduler.h @@ -54,31 +54,31 @@ struct _GstScheduler { GstObject object; GstElement *parent; - - GList *elements; - gint num_elements; - - GList *chains; - gint num_chains; }; struct _GstSchedulerClass { GstObjectClass parent_class; + /* virtual methods */ void (*setup) (GstScheduler *sched); void (*reset) (GstScheduler *sched); void (*add_element) (GstScheduler *sched, GstElement *element); - void (*remove_element) (GstScheduler *sched, GstElement *element); - void (*state_transition) (GstScheduler *sched, GstElement *element, gint transition); + void (*remove_element) (GstScheduler *sched, GstElement *element); + GstElementStateReturn + (*state_transition) (GstScheduler *sched, GstElement *element, gint transition); void (*lock_element) (GstScheduler *sched, GstElement *element); void (*unlock_element) (GstScheduler *sched, GstElement *element); + void (*yield) (GstScheduler *sched, GstElement *element); + void (*interrupt) (GstScheduler *sched, GstElement *element); + void (*error) (GstScheduler *sched, GstElement *element); void (*pad_connect) (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad); void (*pad_disconnect) (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad); void (*pad_select) (GstScheduler *sched, GList *padlist); gboolean (*iterate) (GstScheduler *sched); - /* for debugging */ void (*show) (GstScheduler *sched); + + /* signals go here */ }; GType gst_scheduler_get_type (void); @@ -89,9 +89,12 @@ void gst_scheduler_setup (GstScheduler *sched); void gst_scheduler_reset (GstScheduler *sched); void gst_scheduler_add_element (GstScheduler *sched, GstElement *element); void gst_scheduler_remove_element (GstScheduler *sched, GstElement *element); -void gst_scheduler_state_transition (GstScheduler *sched, GstElement *element, gint transition); +GstElementStateReturn gst_scheduler_state_transition (GstScheduler *sched, GstElement *element, gint transition); void gst_scheduler_lock_element (GstScheduler *sched, GstElement *element); void gst_scheduler_unlock_element (GstScheduler *sched, GstElement *element); +void gst_scheduler_yield (GstScheduler *sched, GstElement *element); +void gst_scheduler_interrupt (GstScheduler *sched, GstElement *element); +void gst_scheduler_error (GstScheduler *sched, GstElement *element); void gst_scheduler_pad_connect (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad); void gst_scheduler_pad_disconnect (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad); GstPad* gst_scheduler_pad_select (GstScheduler *sched, GList *padlist); diff --git a/gst/gstthread.c b/gst/gstthread.c index e76b8d5..321fe62 100644 --- a/gst/gstthread.c +++ b/gst/gstthread.c @@ -147,8 +147,6 @@ gst_thread_init (GstThread *thread) thread->ppid = getpid(); thread->thread_id = -1; - -/* gst_element_set_manager(GST_ELEMENT(thread),GST_ELEMENT(thread)); */ } static void @@ -303,7 +301,8 @@ gst_thread_change_state (GstElement * element) break; case GST_STATE_PLAYING_TO_PAUSED: { - GList *elements = (element->sched)->elements; + //GList *elements = (element->sched)->elements; + GList *elements = gst_bin_get_list(GST_BIN (thread)); THR_INFO ("pausing thread"); @@ -313,7 +312,7 @@ gst_thread_change_state (GstElement * element) * + the pending state was already set by gstelement.c::set_state() * + find every queue we manage, and signal its empty and full conditions */ - g_mutex_lock (thread->lock); + g_mutex_lock (thread->lock); GST_FLAG_UNSET (thread, GST_THREAD_STATE_SPINNING); diff --git a/gst/gsttypefind.c b/gst/gsttypefind.c index 27032f0..dda6135 100644 --- a/gst/gsttypefind.c +++ b/gst/gsttypefind.c @@ -197,7 +197,7 @@ gst_typefind_chain (GstPad *pad, GstBuffer *buf) if (GST_STATE(typefind) != oldstate) { gst_object_unref (GST_OBJECT (typefind)); GST_DEBUG(0, "state changed during signal, aborting\n"); - cothread_switch(cothread_current_main()); + gst_element_yield (typefind); } gst_object_unref (GST_OBJECT (typefind)); } diff --git a/gst/schedulers/gstbasicscheduler.c b/gst/schedulers/gstbasicscheduler.c index 02f3848..be83954 100644 --- a/gst/schedulers/gstbasicscheduler.c +++ b/gst/schedulers/gstbasicscheduler.c @@ -25,8 +25,18 @@ typedef struct _GstSchedulerChain GstSchedulerChain; +#define GST_PAD_THREADSTATE(pad) (cothread_state*) (GST_PAD_CAST (pad)->sched_private) +#define GST_ELEMENT_THREADSTATE(elem) (cothread_state*) (GST_ELEMENT_CAST (elem)->sched_private) +#define GST_BIN_THREADCONTEXT(bin) (cothread_context*) (GST_BIN_CAST (bin)->sched_private) + +#define GST_ELEMENT_COTHREAD_STOPPING GST_ELEMENT_SCHEDULER_PRIVATE1 +#define GST_ELEMENT_IS_COTHREAD_STOPPING(element) GST_FLAG_IS_SET((element), GST_ELEMENT_COTHREAD_STOPPING) + +typedef struct _GstBasicScheduler GstBasicScheduler; +typedef struct _GstBasicSchedulerClass GstBasicSchedulerClass; + struct _GstSchedulerChain { - GstScheduler *sched; + GstBasicScheduler *sched; GList *disabled; @@ -39,10 +49,37 @@ struct _GstSchedulerChain { gboolean schedule; }; +#define GST_TYPE_BASIC_SCHEDULER \ + (gst_basic_scheduler_get_type()) +#define GST_BASIC_SCHEDULER(obj) \ + (G_TYPE_CHECK_INSTANCE_CAST((obj),GST_TYPE_BASIC_SCHEDULER,GstBasicScheduler)) +#define GST_BASIC_SCHEDULER_CLASS(klass) \ + (G_TYPE_CHECK_CLASS_CAST((klass),GST_TYPE_BASIC_SCHEDULER,GstBasicSchedulerClass)) +#define GST_IS_BASIC_SCHEDULER(obj) \ + (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_BASIC_SCHEDULER)) +#define GST_IS_BASIC_SCHEDULER_CLASS(obj) \ + (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_BASIC_SCHEDULER)) + +#define GST_BASIC_SCHEDULER_CAST(sched) ((GstBasicScheduler *)(sched)) + +struct _GstBasicScheduler { + GstScheduler parent; + + GList *elements; + gint num_elements; + + GList *chains; + gint num_chains; +}; + +struct _GstBasicSchedulerClass { + GstSchedulerClass parent_class; +}; + static GType _gst_basic_scheduler_type = 0; -static void gst_basic_scheduler_class_init (GstSchedulerClass * klass); -static void gst_basic_scheduler_init (GstScheduler * scheduler); +static void gst_basic_scheduler_class_init (GstBasicSchedulerClass * klass); +static void gst_basic_scheduler_init (GstBasicScheduler * scheduler); static void gst_basic_scheduler_dispose (GObject *object); @@ -50,9 +87,13 @@ static void gst_basic_scheduler_setup (GstScheduler *sched); static void gst_basic_scheduler_reset (GstScheduler *sched); static void gst_basic_scheduler_add_element (GstScheduler *sched, GstElement *element); static void gst_basic_scheduler_remove_element (GstScheduler *sched, GstElement *element); -static void gst_basic_scheduler_state_transition (GstScheduler *sched, GstElement *element, gint transition); +static GstElementStateReturn + gst_basic_scheduler_state_transition (GstScheduler *sched, GstElement *element, gint transition); static void gst_basic_scheduler_lock_element (GstScheduler *sched, GstElement *element); static void gst_basic_scheduler_unlock_element (GstScheduler *sched, GstElement *element); +static void gst_basic_scheduler_yield (GstScheduler *sched, GstElement *element); +static void gst_basic_scheduler_interrupt (GstScheduler *sched, GstElement *element); +static void gst_basic_scheduler_error (GstScheduler *sched, GstElement *element); static void gst_basic_scheduler_pad_connect (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad); static void gst_basic_scheduler_pad_disconnect (GstScheduler *sched, GstPad *srcpad, GstPad *sinkpad); static GstPad* gst_basic_scheduler_pad_select (GstScheduler *sched, GList *padlist); @@ -67,13 +108,13 @@ gst_basic_scheduler_get_type (void) { if (!_gst_basic_scheduler_type) { static const GTypeInfo scheduler_info = { - sizeof (GstSchedulerClass), + sizeof (GstBasicSchedulerClass), NULL, NULL, (GClassInitFunc) gst_basic_scheduler_class_init, NULL, NULL, - sizeof (GstScheduler), + sizeof (GstBasicScheduler), 0, (GInstanceInitFunc) gst_basic_scheduler_init, NULL @@ -85,40 +126,48 @@ gst_basic_scheduler_get_type (void) } static void -gst_basic_scheduler_class_init (GstSchedulerClass * klass) +gst_basic_scheduler_class_init (GstBasicSchedulerClass * klass) { GObjectClass *gobject_class; GstObjectClass *gstobject_class; + GstSchedulerClass *gstscheduler_class; gobject_class = (GObjectClass*)klass; gstobject_class = (GstObjectClass*)klass; + gstscheduler_class = (GstSchedulerClass*)klass; parent_class = g_type_class_ref (GST_TYPE_SCHEDULER); gobject_class->dispose = GST_DEBUG_FUNCPTR (gst_basic_scheduler_dispose); - klass->setup = GST_DEBUG_FUNCPTR (gst_basic_scheduler_setup); - klass->reset = GST_DEBUG_FUNCPTR (gst_basic_scheduler_reset); - klass->add_element = GST_DEBUG_FUNCPTR (gst_basic_scheduler_add_element); - klass->remove_element = GST_DEBUG_FUNCPTR (gst_basic_scheduler_remove_element); - klass->state_transition = GST_DEBUG_FUNCPTR (gst_basic_scheduler_state_transition); - klass->lock_element = GST_DEBUG_FUNCPTR (gst_basic_scheduler_lock_element); - klass->unlock_element = GST_DEBUG_FUNCPTR (gst_basic_scheduler_unlock_element); - klass->pad_connect = GST_DEBUG_FUNCPTR (gst_basic_scheduler_pad_connect); - klass->pad_disconnect = GST_DEBUG_FUNCPTR (gst_basic_scheduler_pad_disconnect); - klass->pad_select = GST_DEBUG_FUNCPTR (gst_basic_scheduler_pad_select); - klass->iterate = GST_DEBUG_FUNCPTR (gst_basic_scheduler_iterate); + gstscheduler_class->setup = GST_DEBUG_FUNCPTR (gst_basic_scheduler_setup); + gstscheduler_class->reset = GST_DEBUG_FUNCPTR (gst_basic_scheduler_reset); + gstscheduler_class->add_element = GST_DEBUG_FUNCPTR (gst_basic_scheduler_add_element); + gstscheduler_class->remove_element = GST_DEBUG_FUNCPTR (gst_basic_scheduler_remove_element); + gstscheduler_class->state_transition = GST_DEBUG_FUNCPTR (gst_basic_scheduler_state_transition); + gstscheduler_class->lock_element = GST_DEBUG_FUNCPTR (gst_basic_scheduler_lock_element); + gstscheduler_class->unlock_element = GST_DEBUG_FUNCPTR (gst_basic_scheduler_unlock_element); + gstscheduler_class->yield = GST_DEBUG_FUNCPTR (gst_basic_scheduler_yield); + gstscheduler_class->interrupt = GST_DEBUG_FUNCPTR (gst_basic_scheduler_interrupt); + gstscheduler_class->error = GST_DEBUG_FUNCPTR (gst_basic_scheduler_error); + gstscheduler_class->pad_connect = GST_DEBUG_FUNCPTR (gst_basic_scheduler_pad_connect); + gstscheduler_class->pad_disconnect = GST_DEBUG_FUNCPTR (gst_basic_scheduler_pad_disconnect); + gstscheduler_class->pad_select = GST_DEBUG_FUNCPTR (gst_basic_scheduler_pad_select); + gstscheduler_class->iterate = GST_DEBUG_FUNCPTR (gst_basic_scheduler_iterate); } static void -gst_basic_scheduler_init (GstScheduler *scheduler) +gst_basic_scheduler_init (GstBasicScheduler *scheduler) { + scheduler->elements = NULL; + scheduler->num_elements = 0; + scheduler->chains = NULL; + scheduler->num_chains = 0; } static void gst_basic_scheduler_dispose (GObject *object) { - G_OBJECT_CLASS (parent_class)->dispose (object); } @@ -152,7 +201,7 @@ GstPluginDesc plugin_desc = { static int gst_basic_scheduler_loopfunc_wrapper (int argc, char *argv[]) { - GstElement *element = GST_ELEMENT (argv); + GstElement *element = GST_ELEMENT_CAST (argv); G_GNUC_UNUSED const gchar *name = GST_ELEMENT_NAME (element); GST_DEBUG_ENTER ("(%d,'%s')", argc, name); @@ -162,6 +211,7 @@ gst_basic_scheduler_loopfunc_wrapper (int argc, char *argv[]) GST_DEBUG_FUNCPTR_NAME (element->loopfunc), name); (element->loopfunc) (element); GST_DEBUG (GST_CAT_DATAFLOW, "element %s ended loop function\n", name); + } while (!GST_ELEMENT_IS_COTHREAD_STOPPING (element)); GST_FLAG_UNSET (element, GST_ELEMENT_COTHREAD_STOPPING); @@ -172,7 +222,7 @@ gst_basic_scheduler_loopfunc_wrapper (int argc, char *argv[]) static int gst_basic_scheduler_chain_wrapper (int argc, char *argv[]) { - GstElement *element = GST_ELEMENT (argv); + GstElement *element = GST_ELEMENT_CAST (argv); G_GNUC_UNUSED const gchar *name = GST_ELEMENT_NAME (element); GST_DEBUG_ENTER ("(\"%s\")", name); @@ -189,7 +239,9 @@ gst_basic_scheduler_chain_wrapper (int argc, char *argv[]) pads = g_list_next (pads); if (!GST_IS_REAL_PAD (pad)) continue; - realpad = GST_REAL_PAD (pad); + + realpad = GST_REAL_PAD_CAST (pad); + if (GST_RPAD_DIRECTION (realpad) == GST_PAD_SINK) { GstBuffer *buf; @@ -204,10 +256,14 @@ gst_basic_scheduler_chain_wrapper (int argc, char *argv[]) GST_DEBUG (GST_CAT_DATAFLOW, "calling chain function of %s:%s\n", name, GST_PAD_NAME (pad)); GST_RPAD_CHAINFUNC (realpad) (pad, buf); + GST_DEBUG (GST_CAT_DATAFLOW, "calling chain function of %s:%s done\n", name, + GST_PAD_NAME (pad)); } } - GST_DEBUG (GST_CAT_DATAFLOW, "calling chain function of %s:%s done\n", name, - GST_PAD_NAME (pad)); + else { + gst_element_error (element, "NULL buffer detected. Is \"%s:%s\" connected?", + name, GST_PAD_NAME (pad), NULL); + } } } } while (!GST_ELEMENT_IS_COTHREAD_STOPPING (element)); @@ -220,7 +276,7 @@ gst_basic_scheduler_chain_wrapper (int argc, char *argv[]) static int gst_basic_scheduler_src_wrapper (int argc, char *argv[]) { - GstElement *element = GST_ELEMENT (argv); + GstElement *element = GST_ELEMENT_CAST (argv); GList *pads; GstRealPad *realpad; GstBuffer *buf = NULL; @@ -231,9 +287,12 @@ gst_basic_scheduler_src_wrapper (int argc, char *argv[]) do { pads = element->pads; while (pads) { + if (!GST_IS_REAL_PAD (pads->data)) continue; - realpad = (GstRealPad *) (pads->data); + + realpad = GST_REAL_PAD_CAST (pads->data); + pads = g_list_next (pads); if (GST_RPAD_DIRECTION (realpad) == GST_PAD_SRC) { GST_DEBUG (GST_CAT_DATAFLOW, "calling _getfunc for %s:%s\n", GST_DEBUG_PAD_NAME (realpad)); @@ -243,7 +302,7 @@ gst_basic_scheduler_src_wrapper (int argc, char *argv[]) /* fprintf(stderr,"error, no getregionfunc in \"%s\"\n", name); */ /* else */ buf = - (GST_RPAD_GETREGIONFUNC (realpad)) ((GstPad *) realpad, realpad->regiontype, + (GST_RPAD_GETREGIONFUNC (realpad)) (GST_PAD_CAST (realpad), realpad->regiontype, realpad->offset, realpad->len); realpad->regiontype = GST_REGION_VOID; } @@ -252,12 +311,12 @@ gst_basic_scheduler_src_wrapper (int argc, char *argv[]) /* if (GST_RPAD_GETFUNC(realpad) == NULL) */ /* fprintf(stderr,"error, no getfunc in \"%s\"\n", name); */ /* else */ - buf = GST_RPAD_GETFUNC (realpad) ((GstPad *) realpad); + buf = GST_RPAD_GETFUNC (realpad) (GST_PAD_CAST (realpad)); } GST_DEBUG (GST_CAT_DATAFLOW, "calling gst_pad_push on pad %s:%s\n", GST_DEBUG_PAD_NAME (realpad)); - gst_pad_push ((GstPad *) realpad, buf); + gst_pad_push (GST_PAD_CAST (realpad), buf); } } } while (!GST_ELEMENT_IS_COTHREAD_STOPPING (element)); @@ -281,8 +340,8 @@ gst_basic_scheduler_chainhandler_proxy (GstPad * pad, GstBuffer * buf) */ while (GST_RPAD_BUFPEN (pad) != NULL) { GST_DEBUG (GST_CAT_DATAFLOW, "switching to %p to empty bufpen\n", - GST_ELEMENT (GST_PAD_PARENT (pad))->threadstate); - cothread_switch (GST_ELEMENT (GST_PAD_PARENT (pad))->threadstate); + GST_ELEMENT_THREADSTATE (GST_PAD_PARENT (pad))); + cothread_switch (GST_ELEMENT_THREADSTATE (GST_PAD_PARENT (pad))); /* we may no longer be the same pad, check. */ if (GST_RPAD_PEER (peer) != (GstRealPad *) pad) { @@ -295,8 +354,8 @@ gst_basic_scheduler_chainhandler_proxy (GstPad * pad, GstBuffer * buf) /* now fill the bufferpen and switch so it can be consumed */ GST_RPAD_BUFPEN (GST_RPAD_PEER (pad)) = buf; GST_DEBUG (GST_CAT_DATAFLOW, "switching to %p\n", - GST_ELEMENT (GST_PAD_PARENT (pad))->threadstate); - cothread_switch (GST_ELEMENT (GST_PAD_PARENT (pad))->threadstate); + GST_ELEMENT_THREADSTATE (GST_PAD_PARENT (pad))); + cothread_switch (GST_ELEMENT_THREADSTATE (GST_PAD_PARENT (pad))); GST_DEBUG (GST_CAT_DATAFLOW, "done switching\n"); } @@ -314,12 +373,12 @@ gst_basic_scheduler_select_proxy (GstPad * pad, GstBuffer * buf) /* now fill the bufferpen and switch so it can be consumed */ GST_RPAD_BUFPEN (GST_RPAD_PEER (pad)) = buf; GST_DEBUG (GST_CAT_DATAFLOW, "switching to %p\n", - GST_ELEMENT (GST_PAD_PARENT (pad))->threadstate); + GST_ELEMENT_THREADSTATE (GST_PAD_PARENT (pad))); g_print ("%p %s\n", GST_ELEMENT (GST_PAD_PARENT (pad)), gst_element_get_name (GST_ELEMENT (GST_PAD_PARENT (pad)))); GST_ELEMENT (GST_PAD_PARENT (pad))->select_pad = pad; GST_FLAG_UNSET (GST_PAD_PARENT (pad), GST_ELEMENT_COTHREAD_STOPPING); - cothread_switch (GST_ELEMENT (GST_PAD_PARENT (pad))->threadstate); + cothread_switch (GST_ELEMENT_THREADSTATE (GST_PAD_PARENT (pad))); g_print ("done switching\n"); GST_DEBUG (GST_CAT_DATAFLOW, "done switching\n"); @@ -339,8 +398,8 @@ gst_basic_scheduler_gethandler_proxy (GstPad * pad) while (GST_RPAD_BUFPEN (pad) == NULL) { GST_DEBUG (GST_CAT_DATAFLOW, "switching to \"%s\": %p to fill bufpen\n", GST_ELEMENT_NAME (GST_ELEMENT (GST_PAD_PARENT (pad))), - GST_ELEMENT (GST_PAD_PARENT (pad))->threadstate); - cothread_switch (GST_ELEMENT (GST_PAD_PARENT (pad))->threadstate); + GST_ELEMENT_THREADSTATE (GST_PAD_PARENT (pad))); + cothread_switch (GST_ELEMENT_THREADSTATE (GST_PAD_PARENT (pad))); /* we may no longer be the same pad, check. */ if (GST_RPAD_PEER (peer) != (GstRealPad *) pad) { @@ -374,8 +433,8 @@ gst_basic_scheduler_pullregionfunc_proxy (GstPad * pad, GstRegionType type, guin /* we will loop switching to the peer until it's filled up the bufferpen */ while (GST_RPAD_BUFPEN (pad) == NULL) { GST_DEBUG (GST_CAT_DATAFLOW, "switching to %p to fill bufpen\n", - GST_ELEMENT (GST_PAD_PARENT (pad))->threadstate); - cothread_switch (GST_ELEMENT (GST_PAD_PARENT (pad))->threadstate); + GST_ELEMENT_THREADSTATE (GST_PAD_PARENT (pad))); + cothread_switch (GST_ELEMENT_THREADSTATE (GST_PAD_PARENT (pad))); /* we may no longer be the same pad, check. */ if (GST_RPAD_PEER (peer) != (GstRealPad *) pad) { @@ -392,7 +451,7 @@ gst_basic_scheduler_pullregionfunc_proxy (GstPad * pad, GstRegionType type, guin } -static void +static gboolean gst_basic_scheduler_cothreaded_chain (GstBin * bin, GstSchedulerChain * chain) { GList *elements; @@ -403,15 +462,19 @@ gst_basic_scheduler_cothreaded_chain (GstBin * bin, GstSchedulerChain * chain) GST_DEBUG (GST_CAT_SCHEDULING, "chain is using COTHREADS\n"); - g_assert (bin->threadcontext != NULL); - + g_assert (GST_BIN_THREADCONTEXT (bin) != NULL); /* walk through all the chain's elements */ elements = chain->elements; while (elements) { - element = GST_ELEMENT (elements->data); + gboolean decoupled; + gint same_sched = 0; + + element = GST_ELEMENT_CAST (elements->data); elements = g_list_next (elements); + decoupled = (GST_FLAG_IS_SET (element, GST_ELEMENT_DECOUPLED) ? TRUE : FALSE); + /* start out without a wrapper function, we select it later */ wrapper_function = NULL; @@ -423,7 +486,7 @@ gst_basic_scheduler_cothreaded_chain (GstBin * bin, GstSchedulerChain * chain) else { /* otherwise we need to decide what kind of cothread */ /* if it's not DECOUPLED, we decide based on whether it's a source or not */ - if (!GST_FLAG_IS_SET (element, GST_ELEMENT_DECOUPLED)) { + if (!decoupled) { /* if it doesn't have any sinks, it must be a source (duh) */ if (element->numsinkpads == 0) { wrapper_function = GST_DEBUG_FUNCPTR (gst_basic_scheduler_src_wrapper); @@ -441,16 +504,28 @@ gst_basic_scheduler_cothreaded_chain (GstBin * bin, GstSchedulerChain * chain) /* now we have to walk through the pads to set up their state */ pads = gst_element_get_pad_list (element); while (pads) { + GstPad *peerpad; + pad = GST_PAD (pads->data); pads = g_list_next (pads); + if (!GST_IS_REAL_PAD (pad)) continue; + + peerpad = GST_PAD (GST_RPAD_PEER (pad)); /* if the element is DECOUPLED or outside the manager, we have to chain */ if ((wrapper_function == NULL) || - (GST_RPAD_PEER (pad) && - (GST_ELEMENT (GST_PAD_PARENT (GST_PAD (GST_RPAD_PEER (pad))))->sched != chain->sched)) - ) { + (peerpad && (GST_ELEMENT_CAST (GST_PAD_PARENT (peerpad))->sched != GST_SCHEDULER (chain->sched)))) { + + if (!decoupled && GST_RPAD_PEER (pad) && + !GST_FLAG_IS_SET (GST_PAD_PARENT (peerpad), GST_ELEMENT_DECOUPLED)) { + /* whoa non decoupled with different schedulers */ + gst_element_error (element, "element \"%s\" is not decoupled but has pads in different schedulers", + GST_ELEMENT_NAME (element), NULL); + return FALSE; + } + /* set the chain proxies */ if (GST_RPAD_DIRECTION (pad) == GST_PAD_SINK) { GST_DEBUG (GST_CAT_SCHEDULING, "copying chain function into push proxy for %s:%s\n", @@ -464,17 +539,17 @@ gst_basic_scheduler_cothreaded_chain (GstBin * bin, GstSchedulerChain * chain) GST_RPAD_PULLREGIONFUNC (pad) = GST_RPAD_GETREGIONFUNC (pad); } - /* otherwise we really are a cothread */ } + /* otherwise we really are a cothread */ else { if (gst_pad_get_direction (pad) == GST_PAD_SINK) { GST_DEBUG (GST_CAT_SCHEDULING, "setting cothreaded push proxy for sinkpad %s:%s\n", - GST_DEBUG_PAD_NAME (pad)); + GST_DEBUG_PAD_NAME (pad)); GST_RPAD_CHAINHANDLER (pad) = GST_DEBUG_FUNCPTR (gst_basic_scheduler_chainhandler_proxy); } else { GST_DEBUG (GST_CAT_SCHEDULING, "setting cothreaded pull proxy for srcpad %s:%s\n", - GST_DEBUG_PAD_NAME (pad)); + GST_DEBUG_PAD_NAME (pad)); GST_RPAD_GETHANDLER (pad) = GST_DEBUG_FUNCPTR (gst_basic_scheduler_gethandler_proxy); GST_RPAD_PULLREGIONFUNC (pad) = GST_DEBUG_FUNCPTR (gst_basic_scheduler_pullregionfunc_proxy); } @@ -483,17 +558,24 @@ gst_basic_scheduler_cothreaded_chain (GstBin * bin, GstSchedulerChain * chain) /* need to set up the cothread now */ if (wrapper_function != NULL) { - if (element->threadstate == NULL) { - /* FIXME handle cothread_create returning NULL */ - element->threadstate = cothread_create (bin->threadcontext); - GST_DEBUG (GST_CAT_SCHEDULING, "created cothread %p for '%s'\n", element->threadstate, + if (GST_ELEMENT_THREADSTATE (element) == NULL) { + GST_ELEMENT_THREADSTATE (element) = cothread_create (GST_BIN_THREADCONTEXT (bin)); + if (GST_ELEMENT_THREADSTATE (element) == NULL) { + gst_element_error (element, "could not create cothread for \"%s\"", + GST_ELEMENT_NAME (element), NULL); + return FALSE; + } + GST_DEBUG (GST_CAT_SCHEDULING, "created cothread %p for '%s'\n", + GST_ELEMENT_THREADSTATE (element), GST_ELEMENT_NAME (element)); } - cothread_setfunc (element->threadstate, wrapper_function, 0, (char **) element); + cothread_setfunc (GST_ELEMENT_THREADSTATE (element), wrapper_function, 0, (char **) element); GST_DEBUG (GST_CAT_SCHEDULING, "set wrapper function for '%s' to &%s\n", GST_ELEMENT_NAME (element), GST_DEBUG_FUNCPTR_NAME (wrapper_function)); } } + + return TRUE; } /* @@ -533,7 +615,7 @@ gst_basic_scheduler_chained_chain (GstBin *bin, _GstBinChain *chain) { static GstSchedulerChain * -gst_basic_scheduler_chain_new (GstScheduler * sched) +gst_basic_scheduler_chain_new (GstBasicScheduler * sched) { GstSchedulerChain *chain = g_new (GstSchedulerChain, 1); @@ -559,7 +641,7 @@ gst_basic_scheduler_chain_new (GstScheduler * sched) static void gst_basic_scheduler_chain_destroy (GstSchedulerChain * chain) { - GstScheduler *sched = chain->sched; + GstBasicScheduler *sched = chain->sched; /* remove the chain from the schedulers' list of chains */ sched->chains = g_list_remove (sched->chains, chain); @@ -582,14 +664,14 @@ gst_basic_scheduler_chain_add_element (GstSchedulerChain * chain, GstElement * e chain); /* set the sched pointer for the element */ - element->sched = chain->sched; + element->sched = GST_SCHEDULER (chain->sched); /* add the element to the list of 'disabled' elements */ chain->disabled = g_list_prepend (chain->disabled, element); chain->num_elements++; } -static void +static gboolean gst_basic_scheduler_chain_enable_element (GstSchedulerChain * chain, GstElement * element) { GST_INFO (GST_CAT_SCHEDULING, "enabling element \"%s\" in chain %p", GST_ELEMENT_NAME (element), @@ -602,7 +684,7 @@ gst_basic_scheduler_chain_enable_element (GstSchedulerChain * chain, GstElement chain->elements = g_list_prepend (chain->elements, element); /* reschedule the chain */ - gst_basic_scheduler_cothreaded_chain (GST_BIN (chain->sched->parent), chain); + return gst_basic_scheduler_cothreaded_chain (GST_BIN (GST_SCHEDULER (chain->sched)->parent), chain); } static void @@ -633,9 +715,9 @@ gst_basic_scheduler_chain_remove_element (GstSchedulerChain * chain, GstElement gst_basic_scheduler_chain_disable_element (chain, element); } /* we have to check for a threadstate here because a queue doesn't have one */ - if (element->threadstate) { - cothread_free (element->threadstate); - element->threadstate = NULL; + if (GST_ELEMENT_THREADSTATE (element)) { + cothread_free (GST_ELEMENT_THREADSTATE (element)); + GST_ELEMENT_THREADSTATE (element) = NULL; } /* remove the element from the list of elements */ @@ -645,13 +727,10 @@ gst_basic_scheduler_chain_remove_element (GstSchedulerChain * chain, GstElement /* if there are no more elements in the chain, destroy the chain */ if (chain->num_elements == 0) gst_basic_scheduler_chain_destroy (chain); - - /* unset the sched pointer for the element */ - element->sched = NULL; } static void -gst_basic_scheduler_chain_elements (GstScheduler * sched, GstElement * element1, GstElement * element2) +gst_basic_scheduler_chain_elements (GstBasicScheduler * sched, GstElement * element1, GstElement * element2) { GList *chains; GstSchedulerChain *chain; @@ -722,7 +801,7 @@ gst_basic_scheduler_chain_elements (GstScheduler * sched, GstElement * element1, /* find the chain within the scheduler that holds the element, if any */ static GstSchedulerChain * -gst_basic_scheduler_find_chain (GstScheduler * sched, GstElement * element) +gst_basic_scheduler_find_chain (GstBasicScheduler * sched, GstElement * element) { GList *chains; GstSchedulerChain *chain; @@ -788,9 +867,9 @@ gst_basic_scheduler_setup (GstScheduler *sched) GstBin *bin = GST_BIN (sched->parent); /* first create thread context */ - if (bin->threadcontext == NULL) { + if (GST_BIN_THREADCONTEXT (bin) == NULL) { GST_DEBUG (GST_CAT_SCHEDULING, "initializing cothread context\n"); - bin->threadcontext = cothread_context_init (); + GST_BIN_THREADCONTEXT (bin) = cothread_context_init (); } } @@ -799,18 +878,18 @@ gst_basic_scheduler_reset (GstScheduler *sched) { cothread_context *ctx; GstBin *bin = GST_BIN (GST_SCHED_PARENT (sched)); - GList *elements = sched->elements; + GList *elements = GST_BASIC_SCHEDULER_CAST (sched)->elements; while (elements) { - GST_ELEMENT (elements->data)->threadstate = NULL; + GST_ELEMENT_THREADSTATE (elements->data) = NULL; elements = g_list_next (elements); } - ctx = GST_BIN (GST_SCHED_PARENT (sched))->threadcontext; + ctx = GST_BIN_THREADCONTEXT (GST_SCHED_PARENT (sched)); cothread_context_free (ctx); - GST_BIN (GST_SCHED_PARENT (sched))->threadcontext = NULL; + GST_BIN_THREADCONTEXT (GST_SCHED_PARENT (sched)) = NULL; } static void @@ -820,9 +899,7 @@ gst_basic_scheduler_add_element (GstScheduler * sched, GstElement * element) GstPad *pad; GstElement *peerelement; GstSchedulerChain *chain; - - g_return_if_fail (element != NULL); - g_return_if_fail (GST_IS_ELEMENT (element)); + GstBasicScheduler *bsched = GST_BASIC_SCHEDULER (sched); /* if it's already in this scheduler, don't bother doing anything */ if (GST_ELEMENT_SCHED (element) == sched) @@ -844,11 +921,11 @@ gst_basic_scheduler_add_element (GstScheduler * sched, GstElement * element) return; /* first add it to the list of elements that are to be scheduled */ - sched->elements = g_list_prepend (sched->elements, element); - sched->num_elements++; + bsched->elements = g_list_prepend (bsched->elements, element); + bsched->num_elements++; /* create a chain to hold it, and add */ - chain = gst_basic_scheduler_chain_new (sched); + chain = gst_basic_scheduler_chain_new (bsched); gst_basic_scheduler_chain_add_element (chain, element); /* set the sched pointer in all the pads */ @@ -870,7 +947,7 @@ gst_basic_scheduler_add_element (GstScheduler * sched, GstElement * element) if (GST_ELEMENT_SCHED (element) == GST_ELEMENT_SCHED (peerelement)) { GST_INFO (GST_CAT_SCHEDULING, "peer is in same scheduler, chaining together"); /* make sure that the two elements are in the same chain */ - gst_basic_scheduler_chain_elements (sched, element, peerelement); + gst_basic_scheduler_chain_elements (bsched, element, peerelement); } } } @@ -880,67 +957,104 @@ static void gst_basic_scheduler_remove_element (GstScheduler * sched, GstElement * element) { GstSchedulerChain *chain; + GstBasicScheduler *bsched = GST_BASIC_SCHEDULER (sched); - g_return_if_fail (element != NULL); - g_return_if_fail (GST_IS_ELEMENT (element)); - - if (g_list_find (sched->elements, element)) { + if (g_list_find (bsched->elements, element)) { GST_INFO (GST_CAT_SCHEDULING, "removing element \"%s\" from scheduler", GST_ELEMENT_NAME (element)); /* find what chain the element is in */ - chain = gst_basic_scheduler_find_chain (sched, element); + chain = gst_basic_scheduler_find_chain (bsched, element); /* remove it from its chain */ gst_basic_scheduler_chain_remove_element (chain, element); /* remove it from the list of elements */ - sched->elements = g_list_remove (sched->elements, element); - sched->num_elements--; + bsched->elements = g_list_remove (bsched->elements, element); + bsched->num_elements--; /* unset the scheduler pointer in the element */ GST_ELEMENT_SCHED (element) = NULL; } } -static void +static GstElementStateReturn gst_basic_scheduler_state_transition (GstScheduler *sched, GstElement *element, gint transition) { GstSchedulerChain *chain; + GstBasicScheduler *bsched = GST_BASIC_SCHEDULER (sched); /* find the chain the element is in */ - chain = gst_basic_scheduler_find_chain (sched, element); + chain = gst_basic_scheduler_find_chain (bsched, element); /* remove it from the chain */ if (chain) { if (transition == GST_STATE_PLAYING_TO_PAUSED) gst_basic_scheduler_chain_disable_element (chain, element); if (transition == GST_STATE_PAUSED_TO_PLAYING) - gst_basic_scheduler_chain_enable_element (chain, element); + if (!gst_basic_scheduler_chain_enable_element (chain, element)) { + GST_INFO (GST_CAT_SCHEDULING, "could not enable element \"%s\"", GST_ELEMENT_NAME (element)); + return GST_STATE_FAILURE; + } } else { GST_INFO (GST_CAT_SCHEDULING, "element \"%s\" not found in any chain, no state change", GST_ELEMENT_NAME (element)); } + + return GST_STATE_SUCCESS; } static void gst_basic_scheduler_lock_element (GstScheduler * sched, GstElement * element) { - if (element->threadstate) - cothread_lock (element->threadstate); + if (GST_ELEMENT_THREADSTATE (element)) + cothread_lock (GST_ELEMENT_THREADSTATE (element)); } static void gst_basic_scheduler_unlock_element (GstScheduler * sched, GstElement * element) { - if (element->threadstate) - cothread_unlock (element->threadstate); + if (GST_ELEMENT_THREADSTATE (element)) + cothread_unlock (GST_ELEMENT_THREADSTATE (element)); +} + +static void +gst_basic_scheduler_yield (GstScheduler *sched, GstElement *element) +{ + if (GST_ELEMENT_IS_COTHREAD_STOPPING (element)) { + cothread_switch (cothread_current_main ()); + } +} + +static void +gst_basic_scheduler_interrupt (GstScheduler *sched, GstElement *element) +{ + cothread_switch (cothread_current_main ()); } static void -gst_basic_scheduler_pad_connect (GstScheduler * sched, GstPad * srcpad, GstPad * sinkpad) +gst_basic_scheduler_error (GstScheduler *sched, GstElement *element) +{ + GstBasicScheduler *bsched = GST_BASIC_SCHEDULER (sched); + + if (GST_ELEMENT_THREADSTATE (element)) { + GstSchedulerChain *chain; + + chain = gst_basic_scheduler_find_chain (bsched, element); + if (chain) + gst_basic_scheduler_chain_disable_element (chain, element); + + GST_STATE_PENDING (GST_SCHEDULER (sched)->parent) = GST_STATE_PAUSED; + + cothread_switch (cothread_current_main ()); + } +} + +static void +gst_basic_scheduler_pad_connect (GstScheduler * sched, GstPad *srcpad, GstPad *sinkpad) { GstElement *srcelement, *sinkelement; + GstBasicScheduler *bsched = GST_BASIC_SCHEDULER (sched); srcelement = GST_PAD_PARENT (srcpad); g_return_if_fail (srcelement != NULL); @@ -955,7 +1069,7 @@ gst_basic_scheduler_pad_connect (GstScheduler * sched, GstPad * srcpad, GstPad * if (GST_ELEMENT_SCHED (srcelement) == GST_ELEMENT_SCHED (sinkelement)) { GST_INFO (GST_CAT_SCHEDULING, "peer %s:%s is in same scheduler, chaining together", GST_DEBUG_PAD_NAME (sinkpad)); - gst_basic_scheduler_chain_elements (sched, srcelement, sinkelement); + gst_basic_scheduler_chain_elements (bsched, srcelement, sinkelement); } } @@ -965,21 +1079,22 @@ gst_basic_scheduler_pad_disconnect (GstScheduler * sched, GstPad * srcpad, GstPa GstSchedulerChain *chain; GstElement *element1, *element2; GstSchedulerChain *chain1, *chain2; + GstBasicScheduler *bsched = GST_BASIC_SCHEDULER (sched); GST_INFO (GST_CAT_SCHEDULING, "disconnecting pads %s:%s and %s:%s", GST_DEBUG_PAD_NAME (srcpad), GST_DEBUG_PAD_NAME (sinkpad)); /* we need to have the parent elements of each pad */ - element1 = GST_ELEMENT (GST_PAD_PARENT (srcpad)); - element2 = GST_ELEMENT (GST_PAD_PARENT (sinkpad)); + element1 = GST_ELEMENT_CAST (GST_PAD_PARENT (srcpad)); + element2 = GST_ELEMENT_CAST (GST_PAD_PARENT (sinkpad)); /* first task is to remove the old chain they belonged to. * this can be accomplished by taking either of the elements, * since they are guaranteed to be in the same chain * FIXME is it potentially better to make an attempt at splitting cleaner?? */ - chain1 = gst_basic_scheduler_find_chain (sched, element1); - chain2 = gst_basic_scheduler_find_chain (sched, element2); + chain1 = gst_basic_scheduler_find_chain (bsched, element1); + chain2 = gst_basic_scheduler_find_chain (bsched, element2); if (chain1 != chain2) { /* elements not in the same chain don't need to be separated */ @@ -992,14 +1107,14 @@ gst_basic_scheduler_pad_disconnect (GstScheduler * sched, GstPad * srcpad, GstPa gst_basic_scheduler_chain_destroy (chain1); /* now create a new chain to hold element1 and build it from scratch */ - chain1 = gst_basic_scheduler_chain_new (sched); + chain1 = gst_basic_scheduler_chain_new (bsched); gst_basic_scheduler_chain_recursive_add (chain1, element1); } /* check the other element to see if it landed in the newly created chain */ - if (gst_basic_scheduler_find_chain (sched, element2) == NULL) { + if (gst_basic_scheduler_find_chain (bsched, element2) == NULL) { /* if not in chain, create chain and build from scratch */ - chain2 = gst_basic_scheduler_chain_new (sched); + chain2 = gst_basic_scheduler_chain_new (bsched); gst_basic_scheduler_chain_recursive_add (chain2, element2); } } @@ -1034,7 +1149,7 @@ gst_basic_scheduler_pad_select (GstScheduler * sched, GList * padlist) if (pad != NULL) { GstRealPad *peer = GST_RPAD_PEER (pad); - cothread_switch (GST_ELEMENT (GST_PAD_PARENT (peer))->threadstate); + cothread_switch (GST_ELEMENT_THREADSTATE (GST_PAD_PARENT (peer))); g_print ("%p %s\n", GST_ELEMENT (GST_PAD_PARENT (pad)), gst_element_get_name (GST_ELEMENT (GST_PAD_PARENT (pad)))); @@ -1055,14 +1170,13 @@ gst_basic_scheduler_iterate (GstScheduler * sched) GstElement *entry; gboolean eos = FALSE; GList *elements; + gint scheduled = 0; + GstBasicScheduler *bsched = GST_BASIC_SCHEDULER (sched); GST_DEBUG_ENTER ("(\"%s\")", GST_ELEMENT_NAME (bin)); - g_return_val_if_fail (bin != NULL, TRUE); - g_return_val_if_fail (GST_IS_BIN (bin), TRUE); - /* step through all the chains */ - chains = sched->chains; + chains = bsched->chains; if (chains == NULL) return FALSE; @@ -1080,14 +1194,14 @@ gst_basic_scheduler_iterate (GstScheduler * sched) GST_DEBUG (GST_CAT_SCHEDULING, "there are %d elements in this chain\n", chain->num_elements); elements = chain->elements; while (elements) { - entry = GST_ELEMENT (elements->data); + entry = GST_ELEMENT_CAST (elements->data); elements = g_list_next (elements); if (GST_FLAG_IS_SET (entry, GST_ELEMENT_DECOUPLED)) { GST_DEBUG (GST_CAT_SCHEDULING, "entry \"%s\" is DECOUPLED, skipping\n", GST_ELEMENT_NAME (entry)); entry = NULL; } - else if (GST_FLAG_IS_SET (entry, GST_ELEMENT_NO_ENTRY)) { + else if (GST_FLAG_IS_SET (entry, GST_ELEMENT_INFINITE_LOOP)) { GST_DEBUG (GST_CAT_SCHEDULING, "entry \"%s\" is not valid, skipping\n", GST_ELEMENT_NAME (entry)); entry = NULL; @@ -1099,7 +1213,13 @@ gst_basic_scheduler_iterate (GstScheduler * sched) GST_FLAG_SET (entry, GST_ELEMENT_COTHREAD_STOPPING); GST_DEBUG (GST_CAT_DATAFLOW, "set COTHREAD_STOPPING flag on \"%s\"(@%p)\n", GST_ELEMENT_NAME (entry), entry); - cothread_switch (entry->threadstate); + if (GST_ELEMENT_THREADSTATE (entry)) { + cothread_switch (GST_ELEMENT_THREADSTATE (entry)); + } + else { + GST_DEBUG (GST_CAT_DATAFLOW, "cothread switch not possible, element has no threadstate\n"); + return FALSE; + } /* following is a check to see if the chain was interrupted due to a * top-half state_change(). (i.e., if there's a pending state.) @@ -1113,16 +1233,18 @@ gst_basic_scheduler_iterate (GstScheduler * sched) GST_STATE_PENDING (GST_SCHEDULER (sched)->parent)); return FALSE; } - + scheduled++; } else { GST_INFO (GST_CAT_DATAFLOW, "NO ENTRY INTO CHAIN!"); - eos = TRUE; + if (scheduled == 0) + eos = TRUE; } } else { GST_INFO (GST_CAT_DATAFLOW, "NO ENABLED ELEMENTS IN CHAIN!!"); - eos = TRUE; + if (scheduled == 0) + eos = TRUE; } } @@ -1137,6 +1259,7 @@ gst_basic_scheduler_show (GstScheduler * sched) GList *chains, *elements; GstElement *element; GstSchedulerChain *chain; + GstBasicScheduler *bsched = GST_BASIC_SCHEDULER (sched); if (sched == NULL) { g_print ("scheduler doesn't exist for this element\n"); @@ -1147,8 +1270,8 @@ gst_basic_scheduler_show (GstScheduler * sched) g_print ("SCHEDULER DUMP FOR MANAGING BIN \"%s\"\n", GST_ELEMENT_NAME (sched->parent)); - g_print ("scheduler has %d elements in it: ", sched->num_elements); - elements = sched->elements; + g_print ("scheduler has %d elements in it: ", bsched->num_elements); + elements = bsched->elements; while (elements) { element = GST_ELEMENT (elements->data); elements = g_list_next (elements); @@ -1157,8 +1280,8 @@ gst_basic_scheduler_show (GstScheduler * sched) } g_print ("\n"); - g_print ("scheduler has %d chains in it\n", sched->num_chains); - chains = sched->chains; + g_print ("scheduler has %d chains in it\n", bsched->num_chains); + chains = bsched->chains; while (chains) { chain = (GstSchedulerChain *) (chains->data); chains = g_list_next (chains); diff --git a/plugins/elements/gstaggregator.c b/plugins/elements/gstaggregator.c index 4a6baf5..e6f2337 100644 --- a/plugins/elements/gstaggregator.c +++ b/plugins/elements/gstaggregator.c @@ -272,46 +272,44 @@ gst_aggregator_loop (GstElement *element) aggregator = GST_AGGREGATOR (element); - do { - if (aggregator->sched == AGGREGATOR_LOOP || - aggregator->sched == AGGREGATOR_LOOP_PEEK) { - GList *pads = aggregator->sinkpads; - - while (pads) { - GstPad *pad = GST_PAD (pads->data); - pads = g_list_next (pads); - - if (aggregator->sched == AGGREGATOR_LOOP_PEEK) { - buf = gst_pad_peek (pad); - if (buf == NULL) - continue; - - g_assert (buf == gst_pad_pull (pad)); - debug = "loop_peek"; - } - else { - buf = gst_pad_pull (pad); - debug = "loop"; - } - gst_aggregator_push (aggregator, pad, buf, debug); - } - } - else { - if (aggregator->sched == AGGREGATOR_LOOP_SELECT) { - GstPad *pad; + if (aggregator->sched == AGGREGATOR_LOOP || + aggregator->sched == AGGREGATOR_LOOP_PEEK) { + GList *pads = aggregator->sinkpads; - debug = "loop_select"; + while (pads) { + GstPad *pad = GST_PAD (pads->data); + pads = g_list_next (pads); - pad = gst_pad_select (aggregator->sinkpads); - buf = gst_pad_pull (pad); + if (aggregator->sched == AGGREGATOR_LOOP_PEEK) { + buf = gst_pad_peek (pad); + if (buf == NULL) + continue; - gst_aggregator_push (aggregator, pad, buf, debug); + g_assert (buf == gst_pad_pull (pad)); + debug = "loop_peek"; } else { - g_assert_not_reached (); + buf = gst_pad_pull (pad); + debug = "loop"; } + gst_aggregator_push (aggregator, pad, buf, debug); + } + } + else { + if (aggregator->sched == AGGREGATOR_LOOP_SELECT) { + GstPad *pad; + + debug = "loop_select"; + + pad = gst_pad_select (aggregator->sinkpads); + buf = gst_pad_pull (pad); + + gst_aggregator_push (aggregator, pad, buf, debug); } - } while (!GST_ELEMENT_IS_COTHREAD_STOPPING (element)); + else { + g_assert_not_reached (); + } + } } /** diff --git a/plugins/elements/gstfakesrc.c b/plugins/elements/gstfakesrc.c index 7d8d6e3..1f87031 100644 --- a/plugins/elements/gstfakesrc.c +++ b/plugins/elements/gstfakesrc.c @@ -387,7 +387,7 @@ gst_fakesrc_set_property (GObject *object, guint prop_id, const GValue *value, G case ARG_OUTPUT: break; case ARG_DATA: - src->data = g_value_get_int (value); + src->data = g_value_get_enum (value); switch (src->data) { case FAKESRC_DATA_ALLOCATE: if (src->parent) { @@ -403,7 +403,7 @@ gst_fakesrc_set_property (GObject *object, guint prop_id, const GValue *value, G } break; case ARG_SIZETYPE: - src->sizetype = g_value_get_int (value); + src->sizetype = g_value_get_enum (value); break; case ARG_SIZEMIN: src->sizemin = g_value_get_int (value); @@ -415,7 +415,7 @@ gst_fakesrc_set_property (GObject *object, guint prop_id, const GValue *value, G src->parentsize = g_value_get_int (value); break; case ARG_FILLTYPE: - src->filltype = g_value_get_int (value); + src->filltype = g_value_get_enum (value); break; case ARG_PATTERN: break; @@ -455,13 +455,13 @@ gst_fakesrc_get_property (GObject *object, guint prop_id, GValue *value, GParamS g_value_set_boolean (value, src->loop_based); break; case ARG_OUTPUT: - g_value_set_int (value, src->output); + g_value_set_enum (value, src->output); break; case ARG_DATA: - g_value_set_int (value, src->data); + g_value_set_enum (value, src->data); break; case ARG_SIZETYPE: - g_value_set_int (value, src->sizetype); + g_value_set_enum (value, src->sizetype); break; case ARG_SIZEMIN: g_value_set_int (value, src->sizemin); @@ -473,7 +473,7 @@ gst_fakesrc_get_property (GObject *object, guint prop_id, GValue *value, GParamS g_value_set_int (value, src->parentsize); break; case ARG_FILLTYPE: - g_value_set_int (value, src->filltype); + g_value_set_enum (value, src->filltype); break; case ARG_PATTERN: g_value_set_string (value, src->pattern); @@ -689,49 +689,46 @@ static void gst_fakesrc_loop(GstElement *element) { GstFakeSrc *src; + GList *pads; g_return_if_fail(element != NULL); g_return_if_fail(GST_IS_FAKESRC(element)); src = GST_FAKESRC (element); - do { - GList *pads; - - pads = GST_ELEMENT (src)->pads; + pads = gst_element_get_pad_list (element); - while (pads) { - GstPad *pad = GST_PAD (pads->data); - GstBuffer *buf; + while (pads) { + GstPad *pad = GST_PAD (pads->data); + GstBuffer *buf; - if (src->rt_num_buffers == 0) { - src->eos = TRUE; - } - else { - if (src->rt_num_buffers > 0) - src->rt_num_buffers--; - } + if (src->rt_num_buffers == 0) { + src->eos = TRUE; + } + else { + if (src->rt_num_buffers > 0) + src->rt_num_buffers--; + } - if (src->eos) { - gst_pad_push(pad, GST_BUFFER(gst_event_new (GST_EVENT_EOS))); - return; - } + if (src->eos) { + gst_pad_push(pad, GST_BUFFER(gst_event_new (GST_EVENT_EOS))); + return; + } - buf = gst_fakesrc_create_buffer (src); - GST_BUFFER_TIMESTAMP (buf) = src->buffer_count++; + buf = gst_fakesrc_create_buffer (src); + GST_BUFFER_TIMESTAMP (buf) = src->buffer_count++; - if (!src->silent) { - gst_element_info (element, "fakesrc: loop ******* (%s:%s) > (%d bytes, %llu)", - GST_DEBUG_PAD_NAME (pad), GST_BUFFER_SIZE (buf), GST_BUFFER_TIMESTAMP (buf)); - } + if (!src->silent) { + gst_element_info (element, "fakesrc: loop ******* (%s:%s) > (%d bytes, %llu)", + GST_DEBUG_PAD_NAME (pad), GST_BUFFER_SIZE (buf), GST_BUFFER_TIMESTAMP (buf)); + } - g_signal_emit (G_OBJECT (src), gst_fakesrc_signals[SIGNAL_HANDOFF], 0, + g_signal_emit (G_OBJECT (src), gst_fakesrc_signals[SIGNAL_HANDOFF], 0, buf, pad); - gst_pad_push (pad, buf); + gst_pad_push (pad, buf); - pads = g_list_next (pads); - } - } while (!GST_ELEMENT_IS_COTHREAD_STOPPING (element)); + pads = g_list_next (pads); + } } static GstElementStateReturn diff --git a/plugins/elements/gstfilesrc.c b/plugins/elements/gstfilesrc.c index 346f648..db05b10 100644 --- a/plugins/elements/gstfilesrc.c +++ b/plugins/elements/gstfilesrc.c @@ -30,6 +30,7 @@ #include #include #include +#include /********************************************************************** @@ -327,10 +328,10 @@ gst_filesrc_map_region (GstFileSrc *src, off_t offset, size_t size) /* mmap() the data into this new buffer */ GST_BUFFER_DATA(buf) = mmap (NULL, size, PROT_READ, MAP_SHARED, src->fd, offset); if (GST_BUFFER_DATA(buf) == NULL) { - fprintf (stderr, "ERROR: gstfilesrc couldn't map file!\n"); + gst_element_error (GST_ELEMENT (src), "couldn't map file"); } else if (GST_BUFFER_DATA(buf) == MAP_FAILED) { - g_error ("gstfilesrc mmap(0x%x, %d, 0x%llx) : %s", - size, src->fd, offset, sys_errlist[errno]); + gst_element_error (GST_ELEMENT (src), "mmap (0x%x, %d, 0x%llx) : %s", + size, src->fd, offset, strerror (errno)); } #ifdef MADV_SEQUENTIAL /* madvise to tell the kernel what to do with it */ @@ -533,8 +534,8 @@ gst_filesrc_open_file (GstFileSrc *src) /* open the file */ src->fd = open (src->filename, O_RDONLY); if (src->fd < 0) { - perror ("open"); - gst_element_error (GST_ELEMENT (src), g_strconcat("opening file \"", src->filename, "\"", NULL)); + gst_element_error (GST_ELEMENT (src), "opening file \"%s\" (%s)", + src->filename, strerror (errno), NULL); return FALSE; } else { /* find the file length */ @@ -557,7 +558,6 @@ gst_filesrc_close_file (GstFileSrc *src) { g_return_if_fail (GST_FLAG_IS_SET (src, GST_FILESRC_OPEN)); - g_print ("close\n"); /* close the file */ close (src->fd); @@ -565,6 +565,8 @@ gst_filesrc_close_file (GstFileSrc *src) src->fd = 0; src->filelen = 0; src->curoffset = 0; + if (src->mapbuf) + gst_buffer_unref (src->mapbuf); GST_FLAG_UNSET (src, GST_FILESRC_OPEN); } @@ -575,17 +577,22 @@ gst_filesrc_change_state (GstElement *element) { GstFileSrc *src = GST_FILESRC(element); - if (GST_STATE_PENDING (element) == GST_STATE_NULL) { - if (GST_FLAG_IS_SET (element, GST_FILESRC_OPEN)) - gst_filesrc_close_file (GST_FILESRC (element)); - } if (GST_STATE_PENDING (element) == GST_STATE_READY) { - src->curoffset = 0; - } else { - - if (!GST_FLAG_IS_SET (element, GST_FILESRC_OPEN)) { - if (!gst_filesrc_open_file (GST_FILESRC (element))) - return GST_STATE_FAILURE; - } + switch (GST_STATE_TRANSITION (element)) { + case GST_STATE_NULL_TO_READY: + if (!GST_FLAG_IS_SET (element, GST_FILESRC_OPEN)) { + if (!gst_filesrc_open_file (GST_FILESRC (element))) + return GST_STATE_FAILURE; + } + break; + case GST_STATE_READY_TO_NULL: + if (GST_FLAG_IS_SET (element, GST_FILESRC_OPEN)) + gst_filesrc_close_file (GST_FILESRC (element)); + break; + case GST_STATE_READY_TO_PAUSED: + case GST_STATE_PAUSED_TO_READY: + src->curoffset = 0; + default: + break; } if (GST_ELEMENT_CLASS (parent_class)->change_state) diff --git a/plugins/elements/gstidentity.c b/plugins/elements/gstidentity.c index e8195aa..ddd4c02 100644 --- a/plugins/elements/gstidentity.c +++ b/plugins/elements/gstidentity.c @@ -205,27 +205,24 @@ gst_identity_loop (GstElement *element) identity = GST_IDENTITY (element); - do { - buf = gst_pad_pull (identity->sinkpad); + buf = gst_pad_pull (identity->sinkpad); - for (i=identity->duplicate; i; i--) { - if (!identity->silent) - g_print("identity: loop ******* (%s:%s)i (%d bytes, %llu) \n", + for (i=identity->duplicate; i; i--) { + if (!identity->silent) + g_print("identity: loop ******* (%s:%s)i (%d bytes, %llu) \n", GST_DEBUG_PAD_NAME (identity->sinkpad), GST_BUFFER_SIZE (buf), GST_BUFFER_TIMESTAMP (buf)); - g_signal_emit (G_OBJECT (identity), gst_identity_signals[SIGNAL_HANDOFF], 0, + g_signal_emit (G_OBJECT (identity), gst_identity_signals[SIGNAL_HANDOFF], 0, buf); - if (i>1) - gst_buffer_ref (buf); - - gst_pad_push (identity->srcpad, buf); + if (i>1) + gst_buffer_ref (buf); - if (identity->sleep_time) - usleep (identity->sleep_time); - } + gst_pad_push (identity->srcpad, buf); - } while (!GST_ELEMENT_IS_COTHREAD_STOPPING(element)); + if (identity->sleep_time) + usleep (identity->sleep_time); + } } static void diff --git a/plugins/elements/gstqueue.c b/plugins/elements/gstqueue.c index 4375cec..0a00fb8 100644 --- a/plugins/elements/gstqueue.c +++ b/plugins/elements/gstqueue.c @@ -65,11 +65,13 @@ enum { ARG_LEAKY, ARG_LEVEL, ARG_MAX_LEVEL, + ARG_MAY_DEADLOCK, }; static void gst_queue_class_init (GstQueueClass *klass); static void gst_queue_init (GstQueue *queue); +static void gst_queue_dispose (GObject *object); static void gst_queue_set_property (GObject *object, guint prop_id, const GValue *value, GParamSpec *pspec); @@ -141,18 +143,22 @@ gst_queue_class_init (GstQueueClass *klass) parent_class = g_type_class_ref (GST_TYPE_ELEMENT); - g_object_class_install_property(G_OBJECT_CLASS(klass), ARG_LEAKY, - g_param_spec_enum("leaky","Leaky","Where the queue leaks, if at all.", - GST_TYPE_QUEUE_LEAKY,GST_QUEUE_NO_LEAK,G_PARAM_READWRITE)); - g_object_class_install_property(G_OBJECT_CLASS(klass), ARG_LEVEL, - g_param_spec_int("level","Level","How many buffers are in the queue.", - 0,G_MAXINT,0,G_PARAM_READABLE)); - g_object_class_install_property(G_OBJECT_CLASS(klass), ARG_MAX_LEVEL, - g_param_spec_int("max_level","Maximum Level","How many buffers the queue holds.", - 0,G_MAXINT,100,G_PARAM_READWRITE)); - - gobject_class->set_property = GST_DEBUG_FUNCPTR(gst_queue_set_property); - gobject_class->get_property = GST_DEBUG_FUNCPTR(gst_queue_get_property); + g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_LEAKY, + g_param_spec_enum ("leaky", "Leaky", "Where the queue leaks, if at all.", + GST_TYPE_QUEUE_LEAKY, GST_QUEUE_NO_LEAK, G_PARAM_READWRITE)); + g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_LEVEL, + g_param_spec_int ("level", "Level", "How many buffers are in the queue.", + 0, G_MAXINT, 0, G_PARAM_READABLE)); + g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_MAX_LEVEL, + g_param_spec_int ("max_level", "Maximum Level", "How many buffers the queue holds.", + 0, G_MAXINT, 100, G_PARAM_READWRITE)); + g_object_class_install_property (G_OBJECT_CLASS (klass), ARG_MAY_DEADLOCK, + g_param_spec_boolean ("may_deadlock", "May Deadlock", "The queue may deadlock if it's full and not PLAYING", + TRUE, G_PARAM_READWRITE)); + + gobject_class->dispose = GST_DEBUG_FUNCPTR (gst_queue_dispose); + gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_queue_set_property); + gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_queue_get_property); gstelement_class->change_state = GST_DEBUG_FUNCPTR(gst_queue_change_state); } @@ -182,6 +188,7 @@ gst_queue_init (GstQueue *queue) queue->size_buffers = 100; /* 100 buffers */ queue->size_bytes = 100 * 1024; /* 100KB */ queue->size_time = 1000000000LL; /* 1sec */ + queue->may_deadlock = TRUE; queue->qlock = g_mutex_new (); queue->reader = FALSE; @@ -191,6 +198,18 @@ gst_queue_init (GstQueue *queue) GST_DEBUG_ELEMENT (GST_CAT_THREAD, queue, "initialized queue's not_empty & not_full conditions\n"); } +static void +gst_queue_dispose (GObject *object) +{ + GstQueue *queue = GST_QUEUE (object); + + g_mutex_free (queue->qlock); + g_cond_free (queue->not_empty); + g_cond_free (queue->not_full); + + G_OBJECT_CLASS (parent_class)->dispose (object); +} + static GstBufferPool* gst_queue_get_bufferpool (GstPad *pad) { @@ -334,10 +353,21 @@ restart: while (GST_STATE_PENDING (queue) != GST_STATE_VOID_PENDING) { GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "interrupted!!\n"); g_mutex_unlock (queue->qlock); - cothread_switch(cothread_current_main()); + gst_element_interrupt (GST_ELEMENT (queue)); goto restart; } - g_assert (GST_STATE (queue) == GST_STATE_PLAYING); + if (GST_STATE (queue) != GST_STATE_PLAYING) { + /* this means the other end is shut down */ + /* try to signal to resolve the error */ + if (!queue->may_deadlock) { + g_mutex_unlock (queue->qlock); + gst_element_error (GST_ELEMENT (queue), "deadlock found, source pad elements are shut down"); + return; + } + else { + gst_element_info (GST_ELEMENT (queue), "waiting for the app to restart source pad elements"); + } + } GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for not_full, level:%d/%d\n", queue->level_buffers, queue->size_buffers); if (queue->writer) @@ -402,10 +432,20 @@ restart: while (GST_STATE_PENDING (queue) != GST_STATE_VOID_PENDING) { GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "interrupted!!\n"); g_mutex_unlock (queue->qlock); - cothread_switch(cothread_current_main()); + gst_element_interrupt (GST_ELEMENT (queue)); goto restart; } - g_assert (GST_STATE (queue) == GST_STATE_PLAYING); + if (GST_STATE (queue) != GST_STATE_PLAYING) { + /* this means the other end is shut down */ + if (!queue->may_deadlock) { + g_mutex_unlock (queue->qlock); + gst_element_error (GST_ELEMENT (queue), "deadlock found, sink pad elements are shut down"); + return NULL; + } + else { + gst_element_info (GST_ELEMENT (queue), "waiting for the app to restart sink pad elements"); + } + } GST_DEBUG_ELEMENT (GST_CAT_DATAFLOW, queue, "waiting for not_empty, level:%d/%d\n", queue->level_buffers, queue->size_buffers); if (queue->reader) @@ -513,10 +553,13 @@ gst_queue_set_property (GObject *object, guint prop_id, const GValue *value, GPa switch (prop_id) { case ARG_LEAKY: - queue->leaky = g_value_get_int(value); + queue->leaky = g_value_get_int (value); break; case ARG_MAX_LEVEL: - queue->size_buffers = g_value_get_int(value); + queue->size_buffers = g_value_get_int (value); + break; + case ARG_MAY_DEADLOCK: + queue->may_deadlock = g_value_get_boolean (value); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); @@ -536,13 +579,16 @@ gst_queue_get_property (GObject *object, guint prop_id, GValue *value, GParamSpe switch (prop_id) { case ARG_LEAKY: - g_value_set_int(value, queue->leaky); + g_value_set_int (value, queue->leaky); break; case ARG_LEVEL: - g_value_set_int(value, queue->level_buffers); + g_value_set_int (value, queue->level_buffers); break; case ARG_MAX_LEVEL: - g_value_set_int(value, queue->size_buffers); + g_value_set_int (value, queue->size_buffers); + break; + case ARG_MAY_DEADLOCK: + g_value_set_boolean (value, queue->may_deadlock); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); diff --git a/plugins/elements/gstqueue.h b/plugins/elements/gstqueue.h index fb091e1..3af0ef6 100644 --- a/plugins/elements/gstqueue.h +++ b/plugins/elements/gstqueue.h @@ -74,6 +74,7 @@ struct _GstQueue { guint64 size_time; /* size of queue in time */ gint leaky; /* whether the queue is leaky, and if so at which end */ + gboolean may_deadlock; /* it the queue should fail on possible deadlocks */ GMutex *qlock; /* lock for queue (vs object lock) */ /* we are single reader and single writer queue */ -- 2.7.4