+2005-05-25 Wim Taymans <wim@fluendo.com>
+
+ * gst/gstbin.c: (bin_element_is_sink), (has_ancestor),
+ (bin_element_is_semi_sink), (append_child), (gst_bin_change_state):
+ * gst/gstpad.c: (gst_pad_set_active), (gst_pad_link_prepare),
+ (gst_pad_link), (gst_pad_accept_caps), (gst_pad_query),
+ (gst_pad_send_event), (gst_pad_start_task):
+ * gst/gstqueue.c: (gst_queue_init), (gst_queue_locked_flush),
+ (gst_queue_handle_sink_event), (gst_queue_chain), (gst_queue_loop),
+ (gst_queue_sink_activate), (gst_queue_src_activate),
+ (gst_queue_change_state):
+ * gst/gstqueue.h:
+ Fix state changes for non sinks. We now change sinks, then elements
+ with unconnected srcpads, then the rest.
+ More efficient queue unlocking in flush and state changes.
+ Set the pad activate mode even if it does not have an activate
+ function.
+
2005-05-25 Ronald S. Bultje <rbultje@ronald.bitfreak.net>
* gst/base/gstbasesrc.c: (gst_basesrc_activate):
return result;
}
+/* returns 0 when TRUE because this is a GCompareFunc */
/* MT safe */
static gint
bin_element_is_sink (GstElement * child, GstBin * bin)
{
+ gboolean is_sink;
+
/* we lock the child here for the remainder of the function to
* get its name safely. */
GST_LOCK (child);
- if (GST_FLAG_IS_SET (child, GST_ELEMENT_IS_SINK)) {
+ is_sink = GST_FLAG_IS_SET (child, GST_ELEMENT_IS_SINK);
+ GST_UNLOCK (child);
+
+ GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, bin,
+ "child %s %s sink", GST_OBJECT_NAME (child), is_sink ? "is" : "is not");
+
+ return is_sink ? 0 : 1;
+}
+
+static gboolean
+has_ancestor (GstObject * object, GstObject * ancestor)
+{
+ GstObject *parent;
+ gboolean result = FALSE;
+
+ if (object == NULL)
+ return FALSE;
+
+ if (object == ancestor)
+ return TRUE;
+
+ parent = gst_object_get_parent (object);
+ result = has_ancestor (parent, ancestor);
+ if (parent)
+ gst_object_unref (GST_OBJECT_CAST (parent));
+
+ return result;
+}
+
+/* returns 0 when TRUE because this is a GCompareFunc.
+ * This function returns elements that have no connected srcpads and
+ * are therefore not reachable from a real sink. */
+/* MT safe */
+static gint
+bin_element_is_semi_sink (GstElement * child, GstBin * bin)
+{
+ int ret = 1;
+
+ /* we lock the child here for the remainder of the function to
+ * get its pads and name safely. */
+ GST_LOCK (child);
+
+ /* check if this is a sink element, these are the elements
+ * without (linked) source pads. */
+ if (child->numsrcpads == 0) {
+ /* shortcut */
GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, bin,
- "finding child %s as sink", GST_OBJECT_NAME (child));
- GST_UNLOCK (child);
- /* returns 0 because this is a GCompareFunc */
- return 0;
+ "adding child %s as sink", GST_OBJECT_NAME (child));
+ ret = 0;
} else {
- GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, bin,
- "child %s is not a sink", GST_OBJECT_NAME (child));
- GST_UNLOCK (child);
- return 1;
+ /* loop over all pads, try to figure out if this element
+ * is a semi sink because it has no linked source pads */
+ GList *pads;
+ gboolean connected_src = FALSE;
+
+ for (pads = child->srcpads; pads; pads = g_list_next (pads)) {
+ GstPad *peer;
+
+ if ((peer = gst_pad_get_peer (GST_PAD_CAST (pads->data)))) {
+ connected_src =
+ has_ancestor (GST_OBJECT_CAST (peer), GST_OBJECT_CAST (bin));
+ gst_object_unref (GST_OBJECT_CAST (peer));
+ if (connected_src) {
+ break;
+ }
+ }
+ }
+ if (connected_src) {
+ GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, bin,
+ "not adding child %s as sink: linked source pads",
+ GST_OBJECT_NAME (child));
+ } else {
+ GST_CAT_DEBUG_OBJECT (GST_CAT_STATES, bin,
+ "adding child %s as sink since it has unlinked source pads in this bin",
+ GST_OBJECT_NAME (child));
+ ret = 0;
+ }
}
+ GST_UNLOCK (child);
+
+ return ret;
}
static gint
return ret;
}
+static void
+append_child (gpointer child, GQueue * queue)
+{
+ g_queue_push_tail (queue, child);
+}
+
/* this function is called with the STATE_LOCK held. It works
* as follows:
*
GList *children;
guint32 children_cookie;
GQueue *elem_queue; /* list of elements waiting for a state change */
- GQueue *temp; /* temp queue of non sinks */
+ GQueue *semi_queue; /* list of elements with no connected srcpads */
+ GQueue *temp; /* temp queue of leftovers */
bin = GST_BIN (element);
/* all elements added to this queue should have their refcount
* incremented */
elem_queue = g_queue_new ();
+ semi_queue = g_queue_new ();
temp = g_queue_new ();
/* first step, find all sink elements, these are the elements
if (bin_element_is_sink (child, bin) == 0) {
g_queue_push_tail (elem_queue, child);
+ } else if (bin_element_is_semi_sink (child, bin) == 0) {
+ g_queue_push_tail (semi_queue, child);
} else {
g_queue_push_tail (temp, child);
}
if (G_UNLIKELY (children_cookie != bin->children_cookie)) {
/* undo what we had */
g_queue_foreach (elem_queue, (GFunc) gst_object_unref, NULL);
+ g_queue_foreach (semi_queue, (GFunc) gst_object_unref, NULL);
g_queue_foreach (temp, (GFunc) gst_object_unref, NULL);
while (g_queue_pop_head (elem_queue));
+ while (g_queue_pop_head (semi_queue));
while (g_queue_pop_head (temp));
goto restart;
}
}
GST_UNLOCK (bin);
+ /* now change state for semi sink elements first so add them in
+ * front of the other elements */
+ g_queue_foreach (temp, (GFunc) append_child, semi_queue);
+ g_queue_free (temp);
+
/* can be the case for a bin like ( identity ) */
- if (g_queue_is_empty (elem_queue) && !g_queue_is_empty (temp)) {
+ if (g_queue_is_empty (elem_queue) && !g_queue_is_empty (semi_queue)) {
GQueue *q = elem_queue;
- elem_queue = temp;
- temp = q;
+ elem_queue = semi_queue;
+ semi_queue = q;
}
/* second step, change state of elements in the queue */
/* take element */
qelement = g_queue_pop_head (elem_queue);
- /* we don't need it in the temp anymore */
- g_queue_remove_all (temp, qelement);
+ /* we don't need it in the semi_queue anymore */
+ g_queue_remove_all (semi_queue, qelement);
/* if queue is empty now, continue with a non-sink */
if (g_queue_is_empty (elem_queue)) {
GstElement *non_sink;
GST_DEBUG ("sinks and upstream elements exhausted");
- non_sink = g_queue_pop_head (temp);
+ non_sink = g_queue_pop_head (semi_queue);
if (non_sink) {
GST_DEBUG ("found lefover non-sink %s", GST_OBJECT_NAME (non_sink));
g_queue_push_tail (elem_queue, non_sink);
/* release refcounts in queue, should normally be empty */
g_queue_foreach (elem_queue, (GFunc) gst_object_unref, NULL);
g_queue_free (elem_queue);
- g_queue_foreach (temp, (GFunc) gst_object_unref, NULL);
- g_queue_free (temp);
+ g_queue_foreach (semi_queue, (GFunc) gst_object_unref, NULL);
+ g_queue_free (semi_queue);
return ret;
}
GST_LOCK (realpad);
if (result == FALSE)
goto activate_error;
-
- /* store the mode */
- GST_RPAD_ACTIVATE_MODE (realpad) = mode;
}
+ /* store the mode */
+ GST_RPAD_ACTIVATE_MODE (realpad) = mode;
/* when going to active allow data passing now */
if (active) {
was_ok:
{
GST_CAT_DEBUG (GST_CAT_PADS,
- "pad %s:%s was active", GST_DEBUG_PAD_NAME (realpad));
+ "pad %s:%s was active, old %d, new %d",
+ GST_DEBUG_PAD_NAME (realpad), old, mode);
GST_UNLOCK (realpad);
return TRUE;
}
static void gst_queue_locked_flush (GstQueue * queue);
static gboolean gst_queue_src_activate (GstPad * pad, GstActivateMode mode);
+static gboolean gst_queue_sink_activate (GstPad * pad, GstActivateMode mode);
static GstElementStateReturn gst_queue_change_state (GstElement * element);
"sink");
gst_pad_set_chain_function (queue->sinkpad,
GST_DEBUG_FUNCPTR (gst_queue_chain));
+ gst_pad_set_activate_function (queue->sinkpad,
+ GST_DEBUG_FUNCPTR (gst_queue_sink_activate));
gst_pad_set_event_function (queue->sinkpad,
GST_DEBUG_FUNCPTR (gst_queue_handle_sink_event));
gst_pad_set_link_function (queue->sinkpad,
queue->leaky = GST_QUEUE_NO_LEAK;
queue->may_deadlock = TRUE;
queue->block_timeout = GST_CLOCK_TIME_NONE;
- queue->flush = FALSE;
+ queue->flushing = FALSE;
queue->qlock = g_mutex_new ();
queue->item_add = g_cond_new ();
queue->cur_level.bytes = 0;
queue->cur_level.time = 0;
- /* make sure any pending buffers to be added are flushed too */
- queue->flush = TRUE;
-
/* we deleted something... */
g_cond_signal (queue->item_del);
}
/* forward event */
gst_pad_event_default (pad, event);
if (GST_EVENT_FLUSH_DONE (event)) {
+ GST_QUEUE_MUTEX_LOCK;
+ queue->flushing = FALSE;
gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue_loop,
queue->srcpad);
+ GST_QUEUE_MUTEX_UNLOCK;
} else {
/* now unblock the chain function */
GST_QUEUE_MUTEX_LOCK;
+ queue->flushing = TRUE;
gst_queue_locked_flush (queue);
+ /* unblock the loop function */
+ g_cond_signal (queue->item_add);
GST_QUEUE_MUTEX_UNLOCK;
STATUS (queue, "after flush");
- /* unblock the loop function */
- g_cond_signal (queue->item_add);
-
/* make sure it stops */
gst_pad_pause_task (queue->srcpad);
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "loop stopped");
STATUS (queue, "waiting for item_del signal from thread using qlock");
g_cond_wait (queue->item_del, queue->qlock);
- if (GST_RPAD_IS_FLUSHING (pad))
- goto out_flushing;
+ if (queue->flushing)
+#if 0
+ if (GST_RPAD_IS_FLUSHING (pad))
+#endif
+ goto out_flushing;
/* if there's a pending state change for this queue
* or its manager, switch back to iterator so bottom
break;
}
}
+#if 0
/* we are flushing */
if (GST_RPAD_IS_FLUSHING (pad))
goto out_flushing;
+#endif
g_queue_push_tail (queue->queue, buffer);
while (gst_queue_is_empty (queue)) {
STATUS (queue, "waiting for item_add");
+#if 0
/* we are flushing */
+ if (GST_RPAD_IS_FLUSHING (pad))
+ goto out_flushing;
if (GST_RPAD_IS_FLUSHING (queue->sinkpad))
goto out_flushing;
+#endif
+
+ if (queue->flushing)
+ goto out_flushing;
GST_LOG_OBJECT (queue, "doing g_cond_wait using qlock from thread %p",
g_thread_self ());
g_cond_wait (queue->item_add, queue->qlock);
+ if (queue->flushing)
+ goto out_flushing;
+#if 0
/* we got unlocked because we are flushing */
+ if (GST_RPAD_IS_FLUSHING (pad))
+ goto out_flushing;
if (GST_RPAD_IS_FLUSHING (queue->sinkpad))
goto out_flushing;
+#endif
GST_LOG_OBJECT (queue, "done g_cond_wait using qlock from thread %p",
g_thread_self ());
}
static gboolean
-gst_queue_src_activate (GstPad * pad, GstActivateMode mode)
+gst_queue_sink_activate (GstPad * pad, GstActivateMode mode)
{
gboolean result = FALSE;
GstQueue *queue;
queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
- if (mode == GST_ACTIVATE_PUSH) {
- result = gst_pad_start_task (pad, (GstTaskFunction) gst_queue_loop, pad);
- } else {
- /* step 1, unblock chain and loop functions */
- GST_QUEUE_MUTEX_LOCK;
- g_cond_signal (queue->item_add);
- g_cond_signal (queue->item_del);
- GST_QUEUE_MUTEX_UNLOCK;
+ switch (mode) {
+ case GST_ACTIVATE_PUSH:
+ queue->flushing = FALSE;
+ result = TRUE;
+ break;
+ case GST_ACTIVATE_PULL:
+ result = FALSE;
+ break;
+ case GST_ACTIVATE_NONE:
+ /* step 1, unblock chain and loop functions */
+ GST_QUEUE_MUTEX_LOCK;
+ queue->flushing = TRUE;
+ gst_queue_locked_flush (queue);
+ g_cond_signal (queue->item_del);
+ GST_QUEUE_MUTEX_UNLOCK;
- /* step 2, make sure streaming finishes */
- result = gst_pad_stop_task (pad);
+ /* step 2, make sure streaming finishes */
+ result = gst_pad_stop_task (pad);
+ break;
}
return result;
}
+static gboolean
+gst_queue_src_activate (GstPad * pad, GstActivateMode mode)
+{
+ gboolean result = FALSE;
+ GstQueue *queue;
+
+ queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
+
+ switch (mode) {
+ case GST_ACTIVATE_PUSH:
+ GST_QUEUE_MUTEX_LOCK;
+ queue->flushing = FALSE;
+ result = gst_pad_start_task (pad, (GstTaskFunction) gst_queue_loop, pad);
+ GST_QUEUE_MUTEX_UNLOCK;
+ break;
+ case GST_ACTIVATE_PULL:
+ result = FALSE;
+ break;
+ case GST_ACTIVATE_NONE:
+ /* step 1, unblock chain and loop functions */
+ GST_QUEUE_MUTEX_LOCK;
+ queue->flushing = TRUE;
+ g_cond_signal (queue->item_add);
+ GST_QUEUE_MUTEX_UNLOCK;
+
+ /* step 2, make sure streaming finishes */
+ result = gst_pad_stop_task (pad);
+ break;
+ }
+ return result;
+}
static GstElementStateReturn
gst_queue_change_state (GstElement * element)
case GST_STATE_NULL_TO_READY:
break;
case GST_STATE_READY_TO_PAUSED:
- GST_QUEUE_MUTEX_LOCK;
- gst_queue_locked_flush (queue);
- GST_QUEUE_MUTEX_UNLOCK;
break;
case GST_STATE_PAUSED_TO_PLAYING:
break;
case GST_STATE_PLAYING_TO_PAUSED:
break;
case GST_STATE_PAUSED_TO_READY:
- GST_QUEUE_MUTEX_LOCK;
- gst_queue_locked_flush (queue);
- GST_QUEUE_MUTEX_UNLOCK;
break;
case GST_STATE_READY_TO_NULL:
break;
/* it the queue should fail on possible deadlocks */
gboolean may_deadlock;
- gboolean flush;
+ gboolean flushing;
GMutex *qlock; /* lock for queue (vs object lock) */
GCond *item_add; /* signals buffers now available for reading */
static void gst_queue_locked_flush (GstQueue * queue);
static gboolean gst_queue_src_activate (GstPad * pad, GstActivateMode mode);
+static gboolean gst_queue_sink_activate (GstPad * pad, GstActivateMode mode);
static GstElementStateReturn gst_queue_change_state (GstElement * element);
"sink");
gst_pad_set_chain_function (queue->sinkpad,
GST_DEBUG_FUNCPTR (gst_queue_chain));
+ gst_pad_set_activate_function (queue->sinkpad,
+ GST_DEBUG_FUNCPTR (gst_queue_sink_activate));
gst_pad_set_event_function (queue->sinkpad,
GST_DEBUG_FUNCPTR (gst_queue_handle_sink_event));
gst_pad_set_link_function (queue->sinkpad,
queue->leaky = GST_QUEUE_NO_LEAK;
queue->may_deadlock = TRUE;
queue->block_timeout = GST_CLOCK_TIME_NONE;
- queue->flush = FALSE;
+ queue->flushing = FALSE;
queue->qlock = g_mutex_new ();
queue->item_add = g_cond_new ();
queue->cur_level.bytes = 0;
queue->cur_level.time = 0;
- /* make sure any pending buffers to be added are flushed too */
- queue->flush = TRUE;
-
/* we deleted something... */
g_cond_signal (queue->item_del);
}
/* forward event */
gst_pad_event_default (pad, event);
if (GST_EVENT_FLUSH_DONE (event)) {
+ GST_QUEUE_MUTEX_LOCK;
+ queue->flushing = FALSE;
gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue_loop,
queue->srcpad);
+ GST_QUEUE_MUTEX_UNLOCK;
} else {
/* now unblock the chain function */
GST_QUEUE_MUTEX_LOCK;
+ queue->flushing = TRUE;
gst_queue_locked_flush (queue);
+ /* unblock the loop function */
+ g_cond_signal (queue->item_add);
GST_QUEUE_MUTEX_UNLOCK;
STATUS (queue, "after flush");
- /* unblock the loop function */
- g_cond_signal (queue->item_add);
-
/* make sure it stops */
gst_pad_pause_task (queue->srcpad);
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "loop stopped");
STATUS (queue, "waiting for item_del signal from thread using qlock");
g_cond_wait (queue->item_del, queue->qlock);
- if (GST_RPAD_IS_FLUSHING (pad))
- goto out_flushing;
+ if (queue->flushing)
+#if 0
+ if (GST_RPAD_IS_FLUSHING (pad))
+#endif
+ goto out_flushing;
/* if there's a pending state change for this queue
* or its manager, switch back to iterator so bottom
break;
}
}
+#if 0
/* we are flushing */
if (GST_RPAD_IS_FLUSHING (pad))
goto out_flushing;
+#endif
g_queue_push_tail (queue->queue, buffer);
while (gst_queue_is_empty (queue)) {
STATUS (queue, "waiting for item_add");
+#if 0
/* we are flushing */
+ if (GST_RPAD_IS_FLUSHING (pad))
+ goto out_flushing;
if (GST_RPAD_IS_FLUSHING (queue->sinkpad))
goto out_flushing;
+#endif
+
+ if (queue->flushing)
+ goto out_flushing;
GST_LOG_OBJECT (queue, "doing g_cond_wait using qlock from thread %p",
g_thread_self ());
g_cond_wait (queue->item_add, queue->qlock);
+ if (queue->flushing)
+ goto out_flushing;
+#if 0
/* we got unlocked because we are flushing */
+ if (GST_RPAD_IS_FLUSHING (pad))
+ goto out_flushing;
if (GST_RPAD_IS_FLUSHING (queue->sinkpad))
goto out_flushing;
+#endif
GST_LOG_OBJECT (queue, "done g_cond_wait using qlock from thread %p",
g_thread_self ());
}
static gboolean
-gst_queue_src_activate (GstPad * pad, GstActivateMode mode)
+gst_queue_sink_activate (GstPad * pad, GstActivateMode mode)
{
gboolean result = FALSE;
GstQueue *queue;
queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
- if (mode == GST_ACTIVATE_PUSH) {
- result = gst_pad_start_task (pad, (GstTaskFunction) gst_queue_loop, pad);
- } else {
- /* step 1, unblock chain and loop functions */
- GST_QUEUE_MUTEX_LOCK;
- g_cond_signal (queue->item_add);
- g_cond_signal (queue->item_del);
- GST_QUEUE_MUTEX_UNLOCK;
+ switch (mode) {
+ case GST_ACTIVATE_PUSH:
+ queue->flushing = FALSE;
+ result = TRUE;
+ break;
+ case GST_ACTIVATE_PULL:
+ result = FALSE;
+ break;
+ case GST_ACTIVATE_NONE:
+ /* step 1, unblock chain and loop functions */
+ GST_QUEUE_MUTEX_LOCK;
+ queue->flushing = TRUE;
+ gst_queue_locked_flush (queue);
+ g_cond_signal (queue->item_del);
+ GST_QUEUE_MUTEX_UNLOCK;
- /* step 2, make sure streaming finishes */
- result = gst_pad_stop_task (pad);
+ /* step 2, make sure streaming finishes */
+ result = gst_pad_stop_task (pad);
+ break;
}
return result;
}
+static gboolean
+gst_queue_src_activate (GstPad * pad, GstActivateMode mode)
+{
+ gboolean result = FALSE;
+ GstQueue *queue;
+
+ queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
+
+ switch (mode) {
+ case GST_ACTIVATE_PUSH:
+ GST_QUEUE_MUTEX_LOCK;
+ queue->flushing = FALSE;
+ result = gst_pad_start_task (pad, (GstTaskFunction) gst_queue_loop, pad);
+ GST_QUEUE_MUTEX_UNLOCK;
+ break;
+ case GST_ACTIVATE_PULL:
+ result = FALSE;
+ break;
+ case GST_ACTIVATE_NONE:
+ /* step 1, unblock chain and loop functions */
+ GST_QUEUE_MUTEX_LOCK;
+ queue->flushing = TRUE;
+ g_cond_signal (queue->item_add);
+ GST_QUEUE_MUTEX_UNLOCK;
+
+ /* step 2, make sure streaming finishes */
+ result = gst_pad_stop_task (pad);
+ break;
+ }
+ return result;
+}
static GstElementStateReturn
gst_queue_change_state (GstElement * element)
case GST_STATE_NULL_TO_READY:
break;
case GST_STATE_READY_TO_PAUSED:
- GST_QUEUE_MUTEX_LOCK;
- gst_queue_locked_flush (queue);
- GST_QUEUE_MUTEX_UNLOCK;
break;
case GST_STATE_PAUSED_TO_PLAYING:
break;
case GST_STATE_PLAYING_TO_PAUSED:
break;
case GST_STATE_PAUSED_TO_READY:
- GST_QUEUE_MUTEX_LOCK;
- gst_queue_locked_flush (queue);
- GST_QUEUE_MUTEX_UNLOCK;
break;
case GST_STATE_READY_TO_NULL:
break;
/* it the queue should fail on possible deadlocks */
gboolean may_deadlock;
- gboolean flush;
+ gboolean flushing;
GMutex *qlock; /* lock for queue (vs object lock) */
GCond *item_add; /* signals buffers now available for reading */