2005-07-19 Wim Taymans <wim@fluendo.com>
+ * gst/gstqueue.c: (gst_queue_init), (gst_queue_handle_sink_event),
+ (gst_queue_chain), (gst_queue_loop), (gst_queue_handle_src_event),
+ (gst_queue_handle_src_query), (gst_queue_sink_activate_push),
+ (gst_queue_src_activate_push), (gst_queue_change_state),
+ (gst_queue_get_property):
+ * gst/gstqueue.h:
+ Propagate GstFlowReturn more intelligently upstream and output
+ an ERROR/EOS when streaming stopped due to fatal error.
+
+2005-07-19 Wim Taymans <wim@fluendo.com>
+
* tools/gst-launch.c: (check_intr), (event_loop), (main):
Don't block forever for the state change to complete, the
pipeline already did with a sensible timeout.
queue->leaky = GST_QUEUE_NO_LEAK;
queue->may_deadlock = TRUE;
queue->block_timeout = GST_CLOCK_TIME_NONE;
- queue->flushing = FALSE;
+ queue->srcresult = GST_FLOW_WRONG_STATE;
queue->qlock = g_mutex_new ();
queue->item_add = g_cond_new ();
gst_pad_push_event (queue->srcpad, event);
if (GST_EVENT_FLUSH_DONE (event)) {
GST_QUEUE_MUTEX_LOCK;
- queue->flushing = FALSE;
+ gst_queue_locked_flush (queue);
+ queue->srcresult = GST_FLOW_OK;
gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue_loop,
queue->srcpad);
GST_QUEUE_MUTEX_UNLOCK;
} else {
/* now unblock the chain function */
GST_QUEUE_MUTEX_LOCK;
- queue->flushing = TRUE;
- gst_queue_locked_flush (queue);
+ queue->srcresult = GST_FLOW_WRONG_STATE;
/* unblock the loop function */
g_cond_signal (queue->item_add);
GST_QUEUE_MUTEX_UNLOCK;
STATUS (queue, "after flush");
- /* make sure it stops */
+ /* make sure it pauses */
gst_pad_pause_task (queue->srcpad);
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "loop stopped");
}
/* we have to lock the queue since we span threads */
GST_QUEUE_MUTEX_LOCK;
+ if (queue->srcresult != GST_FLOW_OK)
+ goto out_flushing;
+
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
"adding buffer %p of size %d", buffer, GST_BUFFER_SIZE (buffer));
g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_OVERRUN], 0);
GST_QUEUE_MUTEX_LOCK;
+ if (queue->srcresult != GST_FLOW_OK)
+ goto out_flushing;
+
/* how are we going to make space for this buffer? */
switch (queue->leaky) {
/* leak current buffer */
STATUS (queue, "waiting for item_del signal from thread using qlock");
g_cond_wait (queue->item_del, queue->qlock);
- if (queue->flushing)
+ if (queue->srcresult != GST_FLOW_OK)
goto out_flushing;
/* if there's a pending state change for this queue
GST_QUEUE_MUTEX_UNLOCK;
g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_RUNNING], 0);
GST_QUEUE_MUTEX_LOCK;
+
+ if (queue->srcresult != GST_FLOW_OK)
+ goto out_flushing;
+
break;
}
}
return GST_FLOW_OK;
+ /* special conditions */
out_unref:
{
GST_QUEUE_MUTEX_UNLOCK;
}
out_flushing:
{
- GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because of flush");
+ GstFlowReturn ret = queue->srcresult;
+
+ GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+ "exit because task paused, reason: %d", ret);
GST_QUEUE_MUTEX_UNLOCK;
- gst_pad_pause_task (queue->srcpad);
gst_buffer_unref (buffer);
- return GST_FLOW_WRONG_STATE;
+ return ret;
}
}
GST_QUEUE_MUTEX_LOCK;
restart:
+ if (queue->srcresult != GST_FLOW_OK)
+ goto out_flushing;
+
while (gst_queue_is_empty (queue)) {
GST_QUEUE_MUTEX_UNLOCK;
g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_UNDERRUN], 0);
GST_QUEUE_MUTEX_LOCK;
+ if (queue->srcresult != GST_FLOW_OK)
+ goto out_flushing;
+
STATUS (queue, "pre-empty wait");
while (gst_queue_is_empty (queue)) {
STATUS (queue, "waiting for item_add");
- if (queue->flushing)
- goto out_flushing;
-
GST_LOG_OBJECT (queue, "doing g_cond_wait using qlock from thread %p",
g_thread_self ());
g_cond_wait (queue->item_add, queue->qlock);
- if (queue->flushing)
+ /* we released the lock in the g_cond above so we might be
+ * flushing now */
+ if (queue->srcresult != GST_FLOW_OK)
goto out_flushing;
GST_LOG_OBJECT (queue, "done g_cond_wait using qlock from thread %p",
GST_QUEUE_MUTEX_UNLOCK;
g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_RUNNING], 0);
GST_QUEUE_MUTEX_LOCK;
+
+ if (queue->srcresult != GST_FLOW_OK)
+ goto out_flushing;
}
/* There's something in the list now, whatever it is */
GST_QUEUE_MUTEX_UNLOCK;
result = gst_pad_push (pad, GST_BUFFER (data));
GST_QUEUE_MUTEX_LOCK;
+ /* can opt to check for srcresult here but the push should
+ * return an error value that is more accurate */
if (result != GST_FLOW_OK) {
+ queue->srcresult = result;
+ if (GST_FLOW_IS_FATAL (result)) {
+ GST_ELEMENT_ERROR (queue, STREAM, STOPPED,
+ ("streaming stopped, reason %d", result),
+ ("streaming stopped, reason %d", result));
+ gst_pad_push_event (queue->srcpad, gst_event_new (GST_EVENT_EOS));
+ }
gst_pad_pause_task (queue->srcpad);
}
} else {
if (GST_EVENT_TYPE (data) == GST_EVENT_EOS) {
+ /* all incomming data is now unexpected */
+ queue->srcresult = GST_FLOW_UNEXPECTED;
+ /* and we don't need to process anymore */
gst_pad_pause_task (queue->srcpad);
restart = FALSE;
}
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "signalling item_del");
g_cond_signal (queue->item_del);
GST_QUEUE_MUTEX_UNLOCK;
+
return;
out_flushing:
- GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because of flush");
- gst_pad_pause_task (pad);
- GST_QUEUE_MUTEX_UNLOCK;
- return;
+ {
+ GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+ "exit because task paused, reason: %d", queue->srcresult);
+ GST_QUEUE_MUTEX_UNLOCK;
+
+ return;
+ }
}
gst_queue_handle_src_event (GstPad * pad, GstEvent * event)
{
gboolean res = TRUE;
-
-#ifndef GST_DISABLE_GST_DEBUG
GstQueue *queue = GST_QUEUE (GST_PAD_PARENT (pad));
+#ifndef GST_DISABLE_GST_DEBUG
GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "got event %p (%d)",
event, GST_EVENT_TYPE (event));
#endif
- res = gst_pad_event_default (pad, event);
+ res = gst_pad_push_event (queue->sinkpad, event);
return res;
}
gst_queue_handle_src_query (GstPad * pad, GstQuery * query)
{
GstQueue *queue = GST_QUEUE (GST_PAD_PARENT (pad));
+ GstPad *peer;
+ gboolean res;
- if (!GST_PAD_PEER (queue->sinkpad))
+ if (!(peer = gst_pad_get_peer (queue->sinkpad)))
return FALSE;
- if (!gst_pad_query (GST_PAD_PEER (queue->sinkpad), query))
+
+ res = gst_pad_query (peer, query);
+ gst_object_unref (peer);
+ if (!res)
return FALSE;
switch (GST_QUERY_TYPE (query)) {
gboolean result = FALSE;
GstQueue *queue;
- queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
+ queue = GST_QUEUE (gst_pad_get_parent (pad));
if (active) {
- queue->flushing = FALSE;
+ queue->srcresult = GST_FLOW_OK;
result = TRUE;
} else {
/* step 1, unblock chain and loop functions */
GST_QUEUE_MUTEX_LOCK;
- queue->flushing = TRUE;
+ queue->srcresult = GST_FLOW_WRONG_STATE;
gst_queue_locked_flush (queue);
g_cond_signal (queue->item_del);
GST_QUEUE_MUTEX_UNLOCK;
/* step 2, make sure streaming finishes */
result = gst_pad_stop_task (pad);
}
+ gst_object_unref (queue);
return result;
}
gboolean result = FALSE;
GstQueue *queue;
- queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
+ queue = GST_QUEUE (gst_pad_get_parent (pad));
if (active) {
GST_QUEUE_MUTEX_LOCK;
- queue->flushing = FALSE;
+ queue->srcresult = GST_FLOW_OK;
result = gst_pad_start_task (pad, (GstTaskFunction) gst_queue_loop, pad);
GST_QUEUE_MUTEX_UNLOCK;
} else {
/* step 1, unblock chain and loop functions */
GST_QUEUE_MUTEX_LOCK;
- queue->flushing = TRUE;
+ queue->srcresult = GST_FLOW_WRONG_STATE;
g_cond_signal (queue->item_add);
GST_QUEUE_MUTEX_UNLOCK;
result = gst_pad_stop_task (pad);
}
+ gst_object_unref (queue);
+
return result;
}
GST_CAT_LOG_OBJECT (GST_CAT_STATES, element, "starting state change");
- /* lock the queue so another thread (not in sync with this thread's state)
- * can't call this queue's _loop (or whatever) */
-
switch (GST_STATE_TRANSITION (element)) {
case GST_STATE_NULL_TO_READY:
break;
break;
}
- GST_CAT_LOG_OBJECT (GST_CAT_STATES, element, "done with state change");
-
return ret;
}
{
GstQueue *queue = GST_QUEUE (object);
+ GST_QUEUE_MUTEX_LOCK;
+
switch (prop_id) {
case ARG_CUR_LEVEL_BYTES:
g_value_set_uint (value, queue->cur_level.bytes);
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
+
+ GST_QUEUE_MUTEX_UNLOCK;
}
GstPad *sinkpad;
GstPad *srcpad;
+ /* flowreturn when srcpad is paused */
+ GstFlowReturn srcresult;
+
/* the queue of data we're keeping our grubby hands on */
GQueue *queue;
/* it the queue should fail on possible deadlocks */
gboolean may_deadlock;
- gboolean flushing;
GMutex *qlock; /* lock for queue (vs object lock) */
GCond *item_add; /* signals buffers now available for reading */
queue->leaky = GST_QUEUE_NO_LEAK;
queue->may_deadlock = TRUE;
queue->block_timeout = GST_CLOCK_TIME_NONE;
- queue->flushing = FALSE;
+ queue->srcresult = GST_FLOW_WRONG_STATE;
queue->qlock = g_mutex_new ();
queue->item_add = g_cond_new ();
gst_pad_push_event (queue->srcpad, event);
if (GST_EVENT_FLUSH_DONE (event)) {
GST_QUEUE_MUTEX_LOCK;
- queue->flushing = FALSE;
+ gst_queue_locked_flush (queue);
+ queue->srcresult = GST_FLOW_OK;
gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue_loop,
queue->srcpad);
GST_QUEUE_MUTEX_UNLOCK;
} else {
/* now unblock the chain function */
GST_QUEUE_MUTEX_LOCK;
- queue->flushing = TRUE;
- gst_queue_locked_flush (queue);
+ queue->srcresult = GST_FLOW_WRONG_STATE;
/* unblock the loop function */
g_cond_signal (queue->item_add);
GST_QUEUE_MUTEX_UNLOCK;
STATUS (queue, "after flush");
- /* make sure it stops */
+ /* make sure it pauses */
gst_pad_pause_task (queue->srcpad);
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "loop stopped");
}
/* we have to lock the queue since we span threads */
GST_QUEUE_MUTEX_LOCK;
+ if (queue->srcresult != GST_FLOW_OK)
+ goto out_flushing;
+
GST_CAT_LOG_OBJECT (queue_dataflow, queue,
"adding buffer %p of size %d", buffer, GST_BUFFER_SIZE (buffer));
g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_OVERRUN], 0);
GST_QUEUE_MUTEX_LOCK;
+ if (queue->srcresult != GST_FLOW_OK)
+ goto out_flushing;
+
/* how are we going to make space for this buffer? */
switch (queue->leaky) {
/* leak current buffer */
STATUS (queue, "waiting for item_del signal from thread using qlock");
g_cond_wait (queue->item_del, queue->qlock);
- if (queue->flushing)
+ if (queue->srcresult != GST_FLOW_OK)
goto out_flushing;
/* if there's a pending state change for this queue
GST_QUEUE_MUTEX_UNLOCK;
g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_RUNNING], 0);
GST_QUEUE_MUTEX_LOCK;
+
+ if (queue->srcresult != GST_FLOW_OK)
+ goto out_flushing;
+
break;
}
}
return GST_FLOW_OK;
+ /* special conditions */
out_unref:
{
GST_QUEUE_MUTEX_UNLOCK;
}
out_flushing:
{
- GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because of flush");
+ GstFlowReturn ret = queue->srcresult;
+
+ GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+ "exit because task paused, reason: %d", ret);
GST_QUEUE_MUTEX_UNLOCK;
- gst_pad_pause_task (queue->srcpad);
gst_buffer_unref (buffer);
- return GST_FLOW_WRONG_STATE;
+ return ret;
}
}
GST_QUEUE_MUTEX_LOCK;
restart:
+ if (queue->srcresult != GST_FLOW_OK)
+ goto out_flushing;
+
while (gst_queue_is_empty (queue)) {
GST_QUEUE_MUTEX_UNLOCK;
g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_UNDERRUN], 0);
GST_QUEUE_MUTEX_LOCK;
+ if (queue->srcresult != GST_FLOW_OK)
+ goto out_flushing;
+
STATUS (queue, "pre-empty wait");
while (gst_queue_is_empty (queue)) {
STATUS (queue, "waiting for item_add");
- if (queue->flushing)
- goto out_flushing;
-
GST_LOG_OBJECT (queue, "doing g_cond_wait using qlock from thread %p",
g_thread_self ());
g_cond_wait (queue->item_add, queue->qlock);
- if (queue->flushing)
+ /* we released the lock in the g_cond above so we might be
+ * flushing now */
+ if (queue->srcresult != GST_FLOW_OK)
goto out_flushing;
GST_LOG_OBJECT (queue, "done g_cond_wait using qlock from thread %p",
GST_QUEUE_MUTEX_UNLOCK;
g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_RUNNING], 0);
GST_QUEUE_MUTEX_LOCK;
+
+ if (queue->srcresult != GST_FLOW_OK)
+ goto out_flushing;
}
/* There's something in the list now, whatever it is */
GST_QUEUE_MUTEX_UNLOCK;
result = gst_pad_push (pad, GST_BUFFER (data));
GST_QUEUE_MUTEX_LOCK;
+ /* can opt to check for srcresult here but the push should
+ * return an error value that is more accurate */
if (result != GST_FLOW_OK) {
+ queue->srcresult = result;
+ if (GST_FLOW_IS_FATAL (result)) {
+ GST_ELEMENT_ERROR (queue, STREAM, STOPPED,
+ ("streaming stopped, reason %d", result),
+ ("streaming stopped, reason %d", result));
+ gst_pad_push_event (queue->srcpad, gst_event_new (GST_EVENT_EOS));
+ }
gst_pad_pause_task (queue->srcpad);
}
} else {
if (GST_EVENT_TYPE (data) == GST_EVENT_EOS) {
+ /* all incomming data is now unexpected */
+ queue->srcresult = GST_FLOW_UNEXPECTED;
+ /* and we don't need to process anymore */
gst_pad_pause_task (queue->srcpad);
restart = FALSE;
}
GST_CAT_LOG_OBJECT (queue_dataflow, queue, "signalling item_del");
g_cond_signal (queue->item_del);
GST_QUEUE_MUTEX_UNLOCK;
+
return;
out_flushing:
- GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because of flush");
- gst_pad_pause_task (pad);
- GST_QUEUE_MUTEX_UNLOCK;
- return;
+ {
+ GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+ "exit because task paused, reason: %d", queue->srcresult);
+ GST_QUEUE_MUTEX_UNLOCK;
+
+ return;
+ }
}
gst_queue_handle_src_event (GstPad * pad, GstEvent * event)
{
gboolean res = TRUE;
-
-#ifndef GST_DISABLE_GST_DEBUG
GstQueue *queue = GST_QUEUE (GST_PAD_PARENT (pad));
+#ifndef GST_DISABLE_GST_DEBUG
GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "got event %p (%d)",
event, GST_EVENT_TYPE (event));
#endif
- res = gst_pad_event_default (pad, event);
+ res = gst_pad_push_event (queue->sinkpad, event);
return res;
}
gst_queue_handle_src_query (GstPad * pad, GstQuery * query)
{
GstQueue *queue = GST_QUEUE (GST_PAD_PARENT (pad));
+ GstPad *peer;
+ gboolean res;
- if (!GST_PAD_PEER (queue->sinkpad))
+ if (!(peer = gst_pad_get_peer (queue->sinkpad)))
return FALSE;
- if (!gst_pad_query (GST_PAD_PEER (queue->sinkpad), query))
+
+ res = gst_pad_query (peer, query);
+ gst_object_unref (peer);
+ if (!res)
return FALSE;
switch (GST_QUERY_TYPE (query)) {
gboolean result = FALSE;
GstQueue *queue;
- queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
+ queue = GST_QUEUE (gst_pad_get_parent (pad));
if (active) {
- queue->flushing = FALSE;
+ queue->srcresult = GST_FLOW_OK;
result = TRUE;
} else {
/* step 1, unblock chain and loop functions */
GST_QUEUE_MUTEX_LOCK;
- queue->flushing = TRUE;
+ queue->srcresult = GST_FLOW_WRONG_STATE;
gst_queue_locked_flush (queue);
g_cond_signal (queue->item_del);
GST_QUEUE_MUTEX_UNLOCK;
/* step 2, make sure streaming finishes */
result = gst_pad_stop_task (pad);
}
+ gst_object_unref (queue);
return result;
}
gboolean result = FALSE;
GstQueue *queue;
- queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
+ queue = GST_QUEUE (gst_pad_get_parent (pad));
if (active) {
GST_QUEUE_MUTEX_LOCK;
- queue->flushing = FALSE;
+ queue->srcresult = GST_FLOW_OK;
result = gst_pad_start_task (pad, (GstTaskFunction) gst_queue_loop, pad);
GST_QUEUE_MUTEX_UNLOCK;
} else {
/* step 1, unblock chain and loop functions */
GST_QUEUE_MUTEX_LOCK;
- queue->flushing = TRUE;
+ queue->srcresult = GST_FLOW_WRONG_STATE;
g_cond_signal (queue->item_add);
GST_QUEUE_MUTEX_UNLOCK;
result = gst_pad_stop_task (pad);
}
+ gst_object_unref (queue);
+
return result;
}
GST_CAT_LOG_OBJECT (GST_CAT_STATES, element, "starting state change");
- /* lock the queue so another thread (not in sync with this thread's state)
- * can't call this queue's _loop (or whatever) */
-
switch (GST_STATE_TRANSITION (element)) {
case GST_STATE_NULL_TO_READY:
break;
break;
}
- GST_CAT_LOG_OBJECT (GST_CAT_STATES, element, "done with state change");
-
return ret;
}
{
GstQueue *queue = GST_QUEUE (object);
+ GST_QUEUE_MUTEX_LOCK;
+
switch (prop_id) {
case ARG_CUR_LEVEL_BYTES:
g_value_set_uint (value, queue->cur_level.bytes);
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
+
+ GST_QUEUE_MUTEX_UNLOCK;
}
GstPad *sinkpad;
GstPad *srcpad;
+ /* flowreturn when srcpad is paused */
+ GstFlowReturn srcresult;
+
/* the queue of data we're keeping our grubby hands on */
GQueue *queue;
/* it the queue should fail on possible deadlocks */
gboolean may_deadlock;
- gboolean flushing;
GMutex *qlock; /* lock for queue (vs object lock) */
GCond *item_add; /* signals buffers now available for reading */