gst/playback/gstqueue2.c: Also fix #476514 for queue2.
authorWim Taymans <wim.taymans@gmail.com>
Mon, 17 Sep 2007 16:22:17 +0000 (16:22 +0000)
committerSebastian Dröge <sebastian.droege@collabora.co.uk>
Thu, 29 Oct 2009 10:17:14 +0000 (11:17 +0100)
Original commit message from CVS:
* gst/playback/gstqueue2.c: (update_buffering),
(gst_queue_locked_flush), (gst_queue_locked_enqueue),
(gst_queue_handle_sink_event), (gst_queue_chain),
(gst_queue_push_one), (gst_queue_sink_activate_push),
(gst_queue_src_activate_push), (gst_queue_src_activate_pull):
Also fix #476514 for queue2.

gst/playback/gstqueue2.c

index 44627518b7edc114faa0d6af66f7b3c0ca5b814d..d478cda914b352a0ce43b4be15c35e26221b3de3 100644 (file)
@@ -158,6 +158,7 @@ struct _GstQueue
   /* flowreturn when srcpad is paused */
   GstFlowReturn srcresult;
   gboolean is_eos;
+  gboolean unexpected;
 
   /* the queue of data we're keeping our hands on */
   GQueue *queue;
@@ -166,11 +167,13 @@ struct _GstQueue
   GstQueueSize max_level;       /* max. amount of data allowed in the queue */
   gboolean use_buffering;
   gboolean use_rate_estimate;
+  GstClockTime buffering_interval;
   gint low_percent;             /* low/high watermarks for buffering */
   gint high_percent;
 
   /* current buffering state */
   gboolean is_buffering;
+  guint buffering_iteration;
 
   /* for measuring input/output rates */
   guint64 bytes_in;
@@ -682,6 +685,7 @@ update_buffering (GstQueue * queue)
      * below the low threshold */
     if (percent < queue->low_percent) {
       queue->is_buffering = TRUE;
+      queue->buffering_iteration++;
       post = TRUE;
     }
   }
@@ -985,7 +989,6 @@ gst_queue_locked_flush (GstQueue * queue)
   GST_QUEUE_CLEAR_LEVEL (queue->cur_level);
   gst_segment_init (&queue->sink_segment, GST_FORMAT_TIME);
   gst_segment_init (&queue->src_segment, GST_FORMAT_TIME);
-  queue->is_eos = FALSE;
   if (queue->starting_segment != NULL)
     gst_event_unref (queue->starting_segment);
   queue->starting_segment = NULL;
@@ -1041,6 +1044,9 @@ gst_queue_locked_enqueue (GstQueue * queue, gpointer item)
           queue->segment_event_received = TRUE;
           queue->starting_segment = event;
         }
+        /* a new segment allows us to accept more buffers if we got UNEXPECTED
+         * from downstream */
+        queue->unexpected = FALSE;
         break;
       default:
         if (QUEUE_IS_USING_TEMP_FILE (queue))
@@ -1182,6 +1188,8 @@ gst_queue_handle_sink_event (GstPad * pad, GstEvent * event)
       GST_QUEUE_MUTEX_LOCK (queue);
       gst_queue_locked_flush (queue);
       queue->srcresult = GST_FLOW_OK;
+      queue->is_eos = FALSE;
+      queue->unexpected = FALSE;
       /* reset rate counters */
       reset_rate_timer (queue);
       gst_pad_start_task (queue->srcpad, (GstTaskFunction) gst_queue_loop,
@@ -1193,6 +1201,9 @@ gst_queue_handle_sink_event (GstPad * pad, GstEvent * event)
       if (GST_EVENT_IS_SERIALIZED (event)) {
         /* serialized events go in the queue */
         GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
+        /* refuse more events on EOS */
+        if (queue->is_eos)
+          goto out_eos;
         gst_queue_locked_enqueue (queue, event);
         GST_QUEUE_MUTEX_UNLOCK (queue);
       } else {
@@ -1207,10 +1218,16 @@ done:
   /* ERRORS */
 out_flushing:
   {
-    GST_DEBUG_OBJECT (queue, "we are flushing");
+    GST_DEBUG_OBJECT (queue, "refusing event, we are flushing");
     GST_QUEUE_MUTEX_UNLOCK (queue);
-
-    gst_buffer_unref (event);
+    gst_event_unref (event);
+    return FALSE;
+  }
+out_eos:
+  {
+    GST_DEBUG_OBJECT (queue, "refusing event, we are EOS");
+    GST_QUEUE_MUTEX_UNLOCK (queue);
+    gst_event_unref (event);
     return FALSE;
   }
 }
@@ -1275,6 +1292,12 @@ gst_queue_chain (GstPad * pad, GstBuffer * buffer)
 
   /* we have to lock the queue since we span threads */
   GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
+  /* when we received EOS, we refuse more data */
+  if (queue->is_eos)
+    goto out_eos;
+  /* when we received unexpected from downstream, refuse more buffers */
+  if (queue->unexpected)
+    goto out_unexpected;
 
   /* We make space available if we're "full" according to whatever
    * the user defined as "full". */
@@ -1299,11 +1322,27 @@ out_flushing:
     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
         "exit because task paused, reason: %s", gst_flow_get_name (ret));
     GST_QUEUE_MUTEX_UNLOCK (queue);
-
     gst_buffer_unref (buffer);
 
     return ret;
   }
+out_eos:
+  {
+    GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we received EOS");
+    GST_QUEUE_MUTEX_UNLOCK (queue);
+    gst_buffer_unref (buffer);
+
+    return GST_FLOW_UNEXPECTED;
+  }
+out_unexpected:
+  {
+    GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+        "exit because we received UNEXPECTED");
+    GST_QUEUE_MUTEX_UNLOCK (queue);
+    gst_buffer_unref (buffer);
+
+    return GST_FLOW_UNEXPECTED;
+  }
 }
 
 /* dequeue an item from the queue an push it downstream. This functions returns
@@ -1318,6 +1357,7 @@ gst_queue_push_one (GstQueue * queue)
   if (data == NULL)
     goto no_item;
 
+next:
   if (GST_IS_BUFFER (data)) {
     GstBuffer *buffer = GST_BUFFER_CAST (data);
 
@@ -1327,6 +1367,42 @@ gst_queue_push_one (GstQueue * queue)
 
     /* need to check for srcresult here as well */
     GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
+    if (result == GST_FLOW_UNEXPECTED) {
+      GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+          "got UNEXPECTED from downstream");
+      /* stop pushing buffers, we dequeue all items until we see an item that we
+       * can push again, which is EOS or NEWSEGMENT. If there is nothing in the
+       * queue we can push, we set a flag to make the sinkpad refuse more
+       * buffers with an UNEXPECTED return value until we receive something
+       * pushable again or we get flushed. */
+      while ((data = gst_queue_locked_dequeue (queue))) {
+        if (GST_IS_BUFFER (data)) {
+          GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+              "dropping UNEXPECTED buffer %p", data);
+          gst_buffer_unref (GST_BUFFER_CAST (data));
+        } else if (GST_IS_EVENT (data)) {
+          GstEvent *event = GST_EVENT_CAST (data);
+          GstEventType type = GST_EVENT_TYPE (event);
+
+          if (type == GST_EVENT_EOS || type == GST_EVENT_NEWSEGMENT) {
+            /* we found a pushable item in the queue, push it out */
+            GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+                "pushing pushable event %s after UNEXPECTED %p",
+                GST_EVENT_TYPE_NAME (event));
+            goto next;
+          }
+          GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+              "dropping UNEXPECTED event %p", event);
+          gst_event_unref (event);
+        }
+      }
+      /* no more items in the queue. Set the unexpected flag so that upstream
+       * make us refuse any more buffers on the sinkpad. Since we will still
+       * accept EOS and NEWSEGMENT we return _FLOW_OK to the caller so that the
+       * task function does not shut down. */
+      queue->unexpected = TRUE;
+      result = GST_FLOW_OK;
+    }
   } else if (GST_IS_EVENT (data)) {
     GstEvent *event = GST_EVENT_CAST (data);
     GstEventType type = GST_EVENT_TYPE (event);
@@ -1337,8 +1413,11 @@ gst_queue_push_one (GstQueue * queue)
 
     GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
     /* if we're EOS, return UNEXPECTED so that the task pauses. */
-    if (type == GST_EVENT_EOS)
+    if (type == GST_EVENT_EOS) {
+      GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+          "pushed EOS event %p, return UNEXPECTED", event);
       result = GST_FLOW_UNEXPECTED;
+    }
   }
   return result;
 
@@ -1557,6 +1636,8 @@ gst_queue_sink_activate_push (GstPad * pad, gboolean active)
     GST_QUEUE_MUTEX_LOCK (queue);
     GST_DEBUG_OBJECT (queue, "activating push mode");
     queue->srcresult = GST_FLOW_OK;
+    queue->is_eos = FALSE;
+    queue->unexpected = FALSE;
     reset_rate_timer (queue);
     GST_QUEUE_MUTEX_UNLOCK (queue);
   } else {
@@ -1587,6 +1668,8 @@ gst_queue_src_activate_push (GstPad * pad, gboolean active)
     GST_QUEUE_MUTEX_LOCK (queue);
     GST_DEBUG_OBJECT (queue, "activating push mode");
     queue->srcresult = GST_FLOW_OK;
+    queue->is_eos = FALSE;
+    queue->unexpected = FALSE;
     result = gst_pad_start_task (pad, (GstTaskFunction) gst_queue_loop, pad);
     GST_QUEUE_MUTEX_UNLOCK (queue);
   } else {
@@ -1621,6 +1704,8 @@ gst_queue_src_activate_pull (GstPad * pad, gboolean active)
       GST_QUEUE_MUTEX_LOCK (queue);
       GST_DEBUG_OBJECT (queue, "activating pull mode");
       queue->srcresult = GST_FLOW_OK;
+      queue->is_eos = FALSE;
+      queue->unexpected = FALSE;
       result = TRUE;
       GST_QUEUE_MUTEX_UNLOCK (queue);
     } else {