gst/gstqueue.*: Propagate GstFlowReturn more intelligently upstream and output an...
authorWim Taymans <wim.taymans@gmail.com>
Tue, 19 Jul 2005 17:46:37 +0000 (17:46 +0000)
committerWim Taymans <wim.taymans@gmail.com>
Tue, 19 Jul 2005 17:46:37 +0000 (17:46 +0000)
Original commit message from CVS:
* gst/gstqueue.c: (gst_queue_init), (gst_queue_handle_sink_event),
(gst_queue_chain), (gst_queue_loop), (gst_queue_handle_src_event),
(gst_queue_handle_src_query), (gst_queue_sink_activate_push),
(gst_queue_src_activate_push), (gst_queue_change_state),
(gst_queue_get_property):
* gst/gstqueue.h:
Propagate GstFlowReturn more intelligently upstream and output
an ERROR/EOS when streaming stopped due to fatal error.

ChangeLog
gst/gstqueue.c
gst/gstqueue.h
plugins/elements/gstqueue.c
plugins/elements/gstqueue.h

index 42acdcd..8ae0249 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,5 +1,16 @@
 2005-07-19  Wim Taymans  <wim@fluendo.com>
 
+       * gst/gstqueue.c: (gst_queue_init), (gst_queue_handle_sink_event),
+       (gst_queue_chain), (gst_queue_loop), (gst_queue_handle_src_event),
+       (gst_queue_handle_src_query), (gst_queue_sink_activate_push),
+       (gst_queue_src_activate_push), (gst_queue_change_state),
+       (gst_queue_get_property):
+       * gst/gstqueue.h:
+       Propagate GstFlowReturn more intelligently upstream and output
+       an ERROR/EOS when streaming stopped due to fatal error.
+
+2005-07-19  Wim Taymans  <wim@fluendo.com>
+
        * tools/gst-launch.c: (check_intr), (event_loop), (main):
        Don't block forever for the state change to complete, the
        pipeline already did with a sensible timeout.
index 0629763..f8d3708 100644 (file)
@@ -338,7 +338,7 @@ gst_queue_init (GstQueue * queue)
   queue->leaky = GST_QUEUE_NO_LEAK;
   queue->may_deadlock = TRUE;
   queue->block_timeout = GST_CLOCK_TIME_NONE;
-  queue->flushing = FALSE;
+  queue->srcresult = GST_FLOW_WRONG_STATE;
 
   queue->qlock = g_mutex_new ();
   queue->item_add = g_cond_new ();
@@ -473,22 +473,22 @@ gst_queue_handle_sink_event (GstPad * pad, GstEvent * event)
       gst_pad_push_event (queue->srcpad, event);
       if (GST_EVENT_FLUSH_DONE (event)) {
         GST_QUEUE_MUTEX_LOCK;
-        queue->flushing = FALSE;
+        gst_queue_locked_flush (queue);
+        queue->srcresult = GST_FLOW_OK;
         gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue_loop,
             queue->srcpad);
         GST_QUEUE_MUTEX_UNLOCK;
       } else {
         /* now unblock the chain function */
         GST_QUEUE_MUTEX_LOCK;
-        queue->flushing = TRUE;
-        gst_queue_locked_flush (queue);
+        queue->srcresult = GST_FLOW_WRONG_STATE;
         /* unblock the loop function */
         g_cond_signal (queue->item_add);
         GST_QUEUE_MUTEX_UNLOCK;
 
         STATUS (queue, "after flush");
 
-        /* make sure it stops */
+        /* make sure it pauses */
         gst_pad_pause_task (queue->srcpad);
         GST_CAT_LOG_OBJECT (queue_dataflow, queue, "loop stopped");
       }
@@ -548,6 +548,9 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer)
   /* we have to lock the queue since we span threads */
   GST_QUEUE_MUTEX_LOCK;
 
+  if (queue->srcresult != GST_FLOW_OK)
+    goto out_flushing;
+
   GST_CAT_LOG_OBJECT (queue_dataflow, queue,
       "adding buffer %p of size %d", buffer, GST_BUFFER_SIZE (buffer));
 
@@ -559,6 +562,9 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer)
     g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_OVERRUN], 0);
     GST_QUEUE_MUTEX_LOCK;
 
+    if (queue->srcresult != GST_FLOW_OK)
+      goto out_flushing;
+
     /* how are we going to make space for this buffer? */
     switch (queue->leaky) {
         /* leak current buffer */
@@ -621,7 +627,7 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer)
           STATUS (queue, "waiting for item_del signal from thread using qlock");
           g_cond_wait (queue->item_del, queue->qlock);
 
-          if (queue->flushing)
+          if (queue->srcresult != GST_FLOW_OK)
             goto out_flushing;
 
           /* if there's a pending state change for this queue
@@ -634,6 +640,10 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer)
         GST_QUEUE_MUTEX_UNLOCK;
         g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_RUNNING], 0);
         GST_QUEUE_MUTEX_LOCK;
+
+        if (queue->srcresult != GST_FLOW_OK)
+          goto out_flushing;
+
         break;
     }
   }
@@ -654,6 +664,7 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer)
 
   return GST_FLOW_OK;
 
+  /* special conditions */
 out_unref:
   {
     GST_QUEUE_MUTEX_UNLOCK;
@@ -664,13 +675,15 @@ out_unref:
   }
 out_flushing:
   {
-    GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because of flush");
+    GstFlowReturn ret = queue->srcresult;
+
+    GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+        "exit because task paused, reason:  %d", ret);
     GST_QUEUE_MUTEX_UNLOCK;
-    gst_pad_pause_task (queue->srcpad);
 
     gst_buffer_unref (buffer);
 
-    return GST_FLOW_WRONG_STATE;
+    return ret;
   }
 }
 
@@ -687,23 +700,28 @@ gst_queue_loop (GstPad * pad)
   GST_QUEUE_MUTEX_LOCK;
 
 restart:
+  if (queue->srcresult != GST_FLOW_OK)
+    goto out_flushing;
+
   while (gst_queue_is_empty (queue)) {
     GST_QUEUE_MUTEX_UNLOCK;
     g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_UNDERRUN], 0);
     GST_QUEUE_MUTEX_LOCK;
 
+    if (queue->srcresult != GST_FLOW_OK)
+      goto out_flushing;
+
     STATUS (queue, "pre-empty wait");
     while (gst_queue_is_empty (queue)) {
       STATUS (queue, "waiting for item_add");
 
-      if (queue->flushing)
-        goto out_flushing;
-
       GST_LOG_OBJECT (queue, "doing g_cond_wait using qlock from thread %p",
           g_thread_self ());
       g_cond_wait (queue->item_add, queue->qlock);
 
-      if (queue->flushing)
+      /* we released the lock in the g_cond above so we might be 
+       * flushing now */
+      if (queue->srcresult != GST_FLOW_OK)
         goto out_flushing;
 
       GST_LOG_OBJECT (queue, "done g_cond_wait using qlock from thread %p",
@@ -715,6 +733,9 @@ restart:
     GST_QUEUE_MUTEX_UNLOCK;
     g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_RUNNING], 0);
     GST_QUEUE_MUTEX_LOCK;
+
+    if (queue->srcresult != GST_FLOW_OK)
+      goto out_flushing;
   }
 
   /* There's something in the list now, whatever it is */
@@ -734,11 +755,23 @@ restart:
     GST_QUEUE_MUTEX_UNLOCK;
     result = gst_pad_push (pad, GST_BUFFER (data));
     GST_QUEUE_MUTEX_LOCK;
+    /* can opt to check for srcresult here but the push should
+     * return an error value that is more accurate */
     if (result != GST_FLOW_OK) {
+      queue->srcresult = result;
+      if (GST_FLOW_IS_FATAL (result)) {
+        GST_ELEMENT_ERROR (queue, STREAM, STOPPED,
+            ("streaming stopped, reason %d", result),
+            ("streaming stopped, reason %d", result));
+        gst_pad_push_event (queue->srcpad, gst_event_new (GST_EVENT_EOS));
+      }
       gst_pad_pause_task (queue->srcpad);
     }
   } else {
     if (GST_EVENT_TYPE (data) == GST_EVENT_EOS) {
+      /* all incomming data is now unexpected */
+      queue->srcresult = GST_FLOW_UNEXPECTED;
+      /* and we don't need to process anymore */
       gst_pad_pause_task (queue->srcpad);
       restart = FALSE;
     }
@@ -754,13 +787,17 @@ restart:
   GST_CAT_LOG_OBJECT (queue_dataflow, queue, "signalling item_del");
   g_cond_signal (queue->item_del);
   GST_QUEUE_MUTEX_UNLOCK;
+
   return;
 
 out_flushing:
-  GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because of flush");
-  gst_pad_pause_task (pad);
-  GST_QUEUE_MUTEX_UNLOCK;
-  return;
+  {
+    GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+        "exit because task paused, reason:  %d", queue->srcresult);
+    GST_QUEUE_MUTEX_UNLOCK;
+
+    return;
+  }
 }
 
 
@@ -768,15 +805,14 @@ static gboolean
 gst_queue_handle_src_event (GstPad * pad, GstEvent * event)
 {
   gboolean res = TRUE;
-
-#ifndef GST_DISABLE_GST_DEBUG
   GstQueue *queue = GST_QUEUE (GST_PAD_PARENT (pad));
 
+#ifndef GST_DISABLE_GST_DEBUG
   GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "got event %p (%d)",
       event, GST_EVENT_TYPE (event));
 #endif
 
-  res = gst_pad_event_default (pad, event);
+  res = gst_pad_push_event (queue->sinkpad, event);
 
   return res;
 }
@@ -785,10 +821,15 @@ static gboolean
 gst_queue_handle_src_query (GstPad * pad, GstQuery * query)
 {
   GstQueue *queue = GST_QUEUE (GST_PAD_PARENT (pad));
+  GstPad *peer;
+  gboolean res;
 
-  if (!GST_PAD_PEER (queue->sinkpad))
+  if (!(peer = gst_pad_get_peer (queue->sinkpad)))
     return FALSE;
-  if (!gst_pad_query (GST_PAD_PEER (queue->sinkpad), query))
+
+  res = gst_pad_query (peer, query);
+  gst_object_unref (peer);
+  if (!res)
     return FALSE;
 
   switch (GST_QUERY_TYPE (query)) {
@@ -829,15 +870,15 @@ gst_queue_sink_activate_push (GstPad * pad, gboolean active)
   gboolean result = FALSE;
   GstQueue *queue;
 
-  queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
+  queue = GST_QUEUE (gst_pad_get_parent (pad));
 
   if (active) {
-    queue->flushing = FALSE;
+    queue->srcresult = GST_FLOW_OK;
     result = TRUE;
   } else {
     /* step 1, unblock chain and loop functions */
     GST_QUEUE_MUTEX_LOCK;
-    queue->flushing = TRUE;
+    queue->srcresult = GST_FLOW_WRONG_STATE;
     gst_queue_locked_flush (queue);
     g_cond_signal (queue->item_del);
     GST_QUEUE_MUTEX_UNLOCK;
@@ -845,6 +886,7 @@ gst_queue_sink_activate_push (GstPad * pad, gboolean active)
     /* step 2, make sure streaming finishes */
     result = gst_pad_stop_task (pad);
   }
+  gst_object_unref (queue);
 
   return result;
 }
@@ -855,17 +897,17 @@ gst_queue_src_activate_push (GstPad * pad, gboolean active)
   gboolean result = FALSE;
   GstQueue *queue;
 
-  queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
+  queue = GST_QUEUE (gst_pad_get_parent (pad));
 
   if (active) {
     GST_QUEUE_MUTEX_LOCK;
-    queue->flushing = FALSE;
+    queue->srcresult = GST_FLOW_OK;
     result = gst_pad_start_task (pad, (GstTaskFunction) gst_queue_loop, pad);
     GST_QUEUE_MUTEX_UNLOCK;
   } else {
     /* step 1, unblock chain and loop functions */
     GST_QUEUE_MUTEX_LOCK;
-    queue->flushing = TRUE;
+    queue->srcresult = GST_FLOW_WRONG_STATE;
     g_cond_signal (queue->item_add);
     GST_QUEUE_MUTEX_UNLOCK;
 
@@ -873,6 +915,8 @@ gst_queue_src_activate_push (GstPad * pad, gboolean active)
     result = gst_pad_stop_task (pad);
   }
 
+  gst_object_unref (queue);
+
   return result;
 }
 
@@ -886,9 +930,6 @@ gst_queue_change_state (GstElement * element)
 
   GST_CAT_LOG_OBJECT (GST_CAT_STATES, element, "starting state change");
 
-  /* lock the queue so another thread (not in sync with this thread's state)
-   * can't call this queue's _loop (or whatever) */
-
   switch (GST_STATE_TRANSITION (element)) {
     case GST_STATE_NULL_TO_READY:
       break;
@@ -913,8 +954,6 @@ gst_queue_change_state (GstElement * element)
       break;
   }
 
-  GST_CAT_LOG_OBJECT (GST_CAT_STATES, element, "done with state change");
-
   return ret;
 }
 
@@ -970,6 +1009,8 @@ gst_queue_get_property (GObject * object,
 {
   GstQueue *queue = GST_QUEUE (object);
 
+  GST_QUEUE_MUTEX_LOCK;
+
   switch (prop_id) {
     case ARG_CUR_LEVEL_BYTES:
       g_value_set_uint (value, queue->cur_level.bytes);
@@ -1011,4 +1052,6 @@ gst_queue_get_property (GObject * object,
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
   }
+
+  GST_QUEUE_MUTEX_UNLOCK;
 }
index b0a2479..aec0946 100644 (file)
@@ -62,6 +62,9 @@ struct _GstQueue {
   GstPad *sinkpad;
   GstPad *srcpad;
 
+  /* flowreturn when srcpad is paused */
+  GstFlowReturn srcresult;
+
   /* the queue of data we're keeping our grubby hands on */
   GQueue *queue;
 
@@ -79,7 +82,6 @@ struct _GstQueue {
 
   /* it the queue should fail on possible deadlocks */
   gboolean may_deadlock;
-  gboolean flushing;
 
   GMutex *qlock;       /* lock for queue (vs object lock) */
   GCond *item_add;     /* signals buffers now available for reading */
index 0629763..f8d3708 100644 (file)
@@ -338,7 +338,7 @@ gst_queue_init (GstQueue * queue)
   queue->leaky = GST_QUEUE_NO_LEAK;
   queue->may_deadlock = TRUE;
   queue->block_timeout = GST_CLOCK_TIME_NONE;
-  queue->flushing = FALSE;
+  queue->srcresult = GST_FLOW_WRONG_STATE;
 
   queue->qlock = g_mutex_new ();
   queue->item_add = g_cond_new ();
@@ -473,22 +473,22 @@ gst_queue_handle_sink_event (GstPad * pad, GstEvent * event)
       gst_pad_push_event (queue->srcpad, event);
       if (GST_EVENT_FLUSH_DONE (event)) {
         GST_QUEUE_MUTEX_LOCK;
-        queue->flushing = FALSE;
+        gst_queue_locked_flush (queue);
+        queue->srcresult = GST_FLOW_OK;
         gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue_loop,
             queue->srcpad);
         GST_QUEUE_MUTEX_UNLOCK;
       } else {
         /* now unblock the chain function */
         GST_QUEUE_MUTEX_LOCK;
-        queue->flushing = TRUE;
-        gst_queue_locked_flush (queue);
+        queue->srcresult = GST_FLOW_WRONG_STATE;
         /* unblock the loop function */
         g_cond_signal (queue->item_add);
         GST_QUEUE_MUTEX_UNLOCK;
 
         STATUS (queue, "after flush");
 
-        /* make sure it stops */
+        /* make sure it pauses */
         gst_pad_pause_task (queue->srcpad);
         GST_CAT_LOG_OBJECT (queue_dataflow, queue, "loop stopped");
       }
@@ -548,6 +548,9 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer)
   /* we have to lock the queue since we span threads */
   GST_QUEUE_MUTEX_LOCK;
 
+  if (queue->srcresult != GST_FLOW_OK)
+    goto out_flushing;
+
   GST_CAT_LOG_OBJECT (queue_dataflow, queue,
       "adding buffer %p of size %d", buffer, GST_BUFFER_SIZE (buffer));
 
@@ -559,6 +562,9 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer)
     g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_OVERRUN], 0);
     GST_QUEUE_MUTEX_LOCK;
 
+    if (queue->srcresult != GST_FLOW_OK)
+      goto out_flushing;
+
     /* how are we going to make space for this buffer? */
     switch (queue->leaky) {
         /* leak current buffer */
@@ -621,7 +627,7 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer)
           STATUS (queue, "waiting for item_del signal from thread using qlock");
           g_cond_wait (queue->item_del, queue->qlock);
 
-          if (queue->flushing)
+          if (queue->srcresult != GST_FLOW_OK)
             goto out_flushing;
 
           /* if there's a pending state change for this queue
@@ -634,6 +640,10 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer)
         GST_QUEUE_MUTEX_UNLOCK;
         g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_RUNNING], 0);
         GST_QUEUE_MUTEX_LOCK;
+
+        if (queue->srcresult != GST_FLOW_OK)
+          goto out_flushing;
+
         break;
     }
   }
@@ -654,6 +664,7 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer)
 
   return GST_FLOW_OK;
 
+  /* special conditions */
 out_unref:
   {
     GST_QUEUE_MUTEX_UNLOCK;
@@ -664,13 +675,15 @@ out_unref:
   }
 out_flushing:
   {
-    GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because of flush");
+    GstFlowReturn ret = queue->srcresult;
+
+    GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+        "exit because task paused, reason:  %d", ret);
     GST_QUEUE_MUTEX_UNLOCK;
-    gst_pad_pause_task (queue->srcpad);
 
     gst_buffer_unref (buffer);
 
-    return GST_FLOW_WRONG_STATE;
+    return ret;
   }
 }
 
@@ -687,23 +700,28 @@ gst_queue_loop (GstPad * pad)
   GST_QUEUE_MUTEX_LOCK;
 
 restart:
+  if (queue->srcresult != GST_FLOW_OK)
+    goto out_flushing;
+
   while (gst_queue_is_empty (queue)) {
     GST_QUEUE_MUTEX_UNLOCK;
     g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_UNDERRUN], 0);
     GST_QUEUE_MUTEX_LOCK;
 
+    if (queue->srcresult != GST_FLOW_OK)
+      goto out_flushing;
+
     STATUS (queue, "pre-empty wait");
     while (gst_queue_is_empty (queue)) {
       STATUS (queue, "waiting for item_add");
 
-      if (queue->flushing)
-        goto out_flushing;
-
       GST_LOG_OBJECT (queue, "doing g_cond_wait using qlock from thread %p",
           g_thread_self ());
       g_cond_wait (queue->item_add, queue->qlock);
 
-      if (queue->flushing)
+      /* we released the lock in the g_cond above so we might be 
+       * flushing now */
+      if (queue->srcresult != GST_FLOW_OK)
         goto out_flushing;
 
       GST_LOG_OBJECT (queue, "done g_cond_wait using qlock from thread %p",
@@ -715,6 +733,9 @@ restart:
     GST_QUEUE_MUTEX_UNLOCK;
     g_signal_emit (G_OBJECT (queue), gst_queue_signals[SIGNAL_RUNNING], 0);
     GST_QUEUE_MUTEX_LOCK;
+
+    if (queue->srcresult != GST_FLOW_OK)
+      goto out_flushing;
   }
 
   /* There's something in the list now, whatever it is */
@@ -734,11 +755,23 @@ restart:
     GST_QUEUE_MUTEX_UNLOCK;
     result = gst_pad_push (pad, GST_BUFFER (data));
     GST_QUEUE_MUTEX_LOCK;
+    /* can opt to check for srcresult here but the push should
+     * return an error value that is more accurate */
     if (result != GST_FLOW_OK) {
+      queue->srcresult = result;
+      if (GST_FLOW_IS_FATAL (result)) {
+        GST_ELEMENT_ERROR (queue, STREAM, STOPPED,
+            ("streaming stopped, reason %d", result),
+            ("streaming stopped, reason %d", result));
+        gst_pad_push_event (queue->srcpad, gst_event_new (GST_EVENT_EOS));
+      }
       gst_pad_pause_task (queue->srcpad);
     }
   } else {
     if (GST_EVENT_TYPE (data) == GST_EVENT_EOS) {
+      /* all incomming data is now unexpected */
+      queue->srcresult = GST_FLOW_UNEXPECTED;
+      /* and we don't need to process anymore */
       gst_pad_pause_task (queue->srcpad);
       restart = FALSE;
     }
@@ -754,13 +787,17 @@ restart:
   GST_CAT_LOG_OBJECT (queue_dataflow, queue, "signalling item_del");
   g_cond_signal (queue->item_del);
   GST_QUEUE_MUTEX_UNLOCK;
+
   return;
 
 out_flushing:
-  GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because of flush");
-  gst_pad_pause_task (pad);
-  GST_QUEUE_MUTEX_UNLOCK;
-  return;
+  {
+    GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+        "exit because task paused, reason:  %d", queue->srcresult);
+    GST_QUEUE_MUTEX_UNLOCK;
+
+    return;
+  }
 }
 
 
@@ -768,15 +805,14 @@ static gboolean
 gst_queue_handle_src_event (GstPad * pad, GstEvent * event)
 {
   gboolean res = TRUE;
-
-#ifndef GST_DISABLE_GST_DEBUG
   GstQueue *queue = GST_QUEUE (GST_PAD_PARENT (pad));
 
+#ifndef GST_DISABLE_GST_DEBUG
   GST_CAT_DEBUG_OBJECT (queue_dataflow, queue, "got event %p (%d)",
       event, GST_EVENT_TYPE (event));
 #endif
 
-  res = gst_pad_event_default (pad, event);
+  res = gst_pad_push_event (queue->sinkpad, event);
 
   return res;
 }
@@ -785,10 +821,15 @@ static gboolean
 gst_queue_handle_src_query (GstPad * pad, GstQuery * query)
 {
   GstQueue *queue = GST_QUEUE (GST_PAD_PARENT (pad));
+  GstPad *peer;
+  gboolean res;
 
-  if (!GST_PAD_PEER (queue->sinkpad))
+  if (!(peer = gst_pad_get_peer (queue->sinkpad)))
     return FALSE;
-  if (!gst_pad_query (GST_PAD_PEER (queue->sinkpad), query))
+
+  res = gst_pad_query (peer, query);
+  gst_object_unref (peer);
+  if (!res)
     return FALSE;
 
   switch (GST_QUERY_TYPE (query)) {
@@ -829,15 +870,15 @@ gst_queue_sink_activate_push (GstPad * pad, gboolean active)
   gboolean result = FALSE;
   GstQueue *queue;
 
-  queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
+  queue = GST_QUEUE (gst_pad_get_parent (pad));
 
   if (active) {
-    queue->flushing = FALSE;
+    queue->srcresult = GST_FLOW_OK;
     result = TRUE;
   } else {
     /* step 1, unblock chain and loop functions */
     GST_QUEUE_MUTEX_LOCK;
-    queue->flushing = TRUE;
+    queue->srcresult = GST_FLOW_WRONG_STATE;
     gst_queue_locked_flush (queue);
     g_cond_signal (queue->item_del);
     GST_QUEUE_MUTEX_UNLOCK;
@@ -845,6 +886,7 @@ gst_queue_sink_activate_push (GstPad * pad, gboolean active)
     /* step 2, make sure streaming finishes */
     result = gst_pad_stop_task (pad);
   }
+  gst_object_unref (queue);
 
   return result;
 }
@@ -855,17 +897,17 @@ gst_queue_src_activate_push (GstPad * pad, gboolean active)
   gboolean result = FALSE;
   GstQueue *queue;
 
-  queue = GST_QUEUE (GST_OBJECT_PARENT (pad));
+  queue = GST_QUEUE (gst_pad_get_parent (pad));
 
   if (active) {
     GST_QUEUE_MUTEX_LOCK;
-    queue->flushing = FALSE;
+    queue->srcresult = GST_FLOW_OK;
     result = gst_pad_start_task (pad, (GstTaskFunction) gst_queue_loop, pad);
     GST_QUEUE_MUTEX_UNLOCK;
   } else {
     /* step 1, unblock chain and loop functions */
     GST_QUEUE_MUTEX_LOCK;
-    queue->flushing = TRUE;
+    queue->srcresult = GST_FLOW_WRONG_STATE;
     g_cond_signal (queue->item_add);
     GST_QUEUE_MUTEX_UNLOCK;
 
@@ -873,6 +915,8 @@ gst_queue_src_activate_push (GstPad * pad, gboolean active)
     result = gst_pad_stop_task (pad);
   }
 
+  gst_object_unref (queue);
+
   return result;
 }
 
@@ -886,9 +930,6 @@ gst_queue_change_state (GstElement * element)
 
   GST_CAT_LOG_OBJECT (GST_CAT_STATES, element, "starting state change");
 
-  /* lock the queue so another thread (not in sync with this thread's state)
-   * can't call this queue's _loop (or whatever) */
-
   switch (GST_STATE_TRANSITION (element)) {
     case GST_STATE_NULL_TO_READY:
       break;
@@ -913,8 +954,6 @@ gst_queue_change_state (GstElement * element)
       break;
   }
 
-  GST_CAT_LOG_OBJECT (GST_CAT_STATES, element, "done with state change");
-
   return ret;
 }
 
@@ -970,6 +1009,8 @@ gst_queue_get_property (GObject * object,
 {
   GstQueue *queue = GST_QUEUE (object);
 
+  GST_QUEUE_MUTEX_LOCK;
+
   switch (prop_id) {
     case ARG_CUR_LEVEL_BYTES:
       g_value_set_uint (value, queue->cur_level.bytes);
@@ -1011,4 +1052,6 @@ gst_queue_get_property (GObject * object,
       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
       break;
   }
+
+  GST_QUEUE_MUTEX_UNLOCK;
 }
index b0a2479..aec0946 100644 (file)
@@ -62,6 +62,9 @@ struct _GstQueue {
   GstPad *sinkpad;
   GstPad *srcpad;
 
+  /* flowreturn when srcpad is paused */
+  GstFlowReturn srcresult;
+
   /* the queue of data we're keeping our grubby hands on */
   GQueue *queue;
 
@@ -79,7 +82,6 @@ struct _GstQueue {
 
   /* it the queue should fail on possible deadlocks */
   gboolean may_deadlock;
-  gboolean flushing;
 
   GMutex *qlock;       /* lock for queue (vs object lock) */
   GCond *item_add;     /* signals buffers now available for reading */