plugins: use new gst_buffer_list_calculate_size()
[platform/upstream/gstreamer.git] / plugins / elements / gstqueue.c
index e8f7f89..9c887c1 100644 (file)
@@ -24,6 +24,7 @@
 
 /**
  * SECTION:element-queue
+ * @title: queue
  *
  * Data is queued until one of the limits specified by the
  * #GstQueue:max-size-buffers, #GstQueue:max-size-bytes and/or
@@ -747,26 +748,14 @@ gst_queue_locked_enqueue_buffer (GstQueue * queue, gpointer item)
   GST_QUEUE_SIGNAL_ADD (queue);
 }
 
-static gboolean
-buffer_list_calc_size (GstBuffer ** buf, guint idx, gpointer data)
-{
-  guint *p_size = data;
-  gsize buf_size;
-
-  buf_size = gst_buffer_get_size (*buf);
-  GST_TRACE ("buffer %u in has size %" G_GSIZE_FORMAT, idx, buf_size);
-  *p_size += buf_size;
-  return TRUE;
-}
-
 static inline void
 gst_queue_locked_enqueue_buffer_list (GstQueue * queue, gpointer item)
 {
   GstQueueItem qitem;
   GstBufferList *buffer_list = GST_BUFFER_LIST_CAST (item);
-  gsize bsize = 0;
+  gsize bsize;
 
-  gst_buffer_list_foreach (buffer_list, buffer_list_calc_size, &bsize);
+  bsize = gst_buffer_list_calculate_size (buffer_list);
 
   /* add buffer to the statistics */
   queue->cur_level.buffers += gst_buffer_list_length (buffer_list);
@@ -921,9 +910,11 @@ gst_queue_handle_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
 
   queue = GST_QUEUE (parent);
 
+  GST_CAT_LOG_OBJECT (queue_dataflow, queue, "Received event '%s'",
+      GST_EVENT_TYPE_NAME (event));
+
   switch (GST_EVENT_TYPE (event)) {
     case GST_EVENT_FLUSH_START:
-      STATUS (queue, pad, "received flush start event");
       /* forward event */
       ret = gst_pad_push_event (queue->srcpad, event);
 
@@ -949,7 +940,6 @@ gst_queue_handle_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
       GST_QUEUE_MUTEX_UNLOCK (queue);
       break;
     case GST_EVENT_FLUSH_STOP:
-      STATUS (queue, pad, "received flush stop event");
       /* forward event */
       ret = gst_pad_push_event (queue->srcpad, event);
 
@@ -973,6 +963,14 @@ gst_queue_handle_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
       if (GST_EVENT_IS_SERIALIZED (event)) {
         /* serialized events go in the queue */
         GST_QUEUE_MUTEX_LOCK (queue);
+
+        /* STREAM_START and SEGMENT reset the EOS status of a
+         * pad. Change the cached sinkpad flow result accordingly */
+        if (queue->srcresult == GST_FLOW_EOS
+            && (GST_EVENT_TYPE (event) == GST_EVENT_STREAM_START
+                || GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT))
+          queue->srcresult = GST_FLOW_OK;
+
         if (queue->srcresult != GST_FLOW_OK) {
           /* Errors in sticky event pushing are no problem and ignored here
            * as they will cause more meaningful errors during data flow.
@@ -993,9 +991,30 @@ gst_queue_handle_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
             goto out_flow_error;
           }
         }
-        /* refuse more events on EOS */
-        if (queue->eos)
-          goto out_eos;
+
+        /* refuse more events on EOS unless they unset the EOS status */
+        if (queue->eos) {
+          switch (GST_EVENT_TYPE (event)) {
+            case GST_EVENT_STREAM_START:
+            case GST_EVENT_SEGMENT:
+              /* Restart the loop */
+              if (GST_PAD_MODE (queue->srcpad) == GST_PAD_MODE_PUSH) {
+                queue->srcresult = GST_FLOW_OK;
+                queue->eos = FALSE;
+                queue->unexpected = FALSE;
+                gst_pad_start_task (queue->srcpad,
+                    (GstTaskFunction) gst_queue_loop, queue->srcpad, NULL);
+              } else {
+                queue->eos = FALSE;
+                queue->unexpected = FALSE;
+              }
+
+              break;
+            default:
+              goto out_eos;
+          }
+        }
+
         gst_queue_locked_enqueue_event (queue, event);
         GST_QUEUE_MUTEX_UNLOCK (queue);
       } else {
@@ -1004,8 +1023,10 @@ gst_queue_handle_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
       }
       break;
   }
-  if (ret == FALSE)
+  if (ret == FALSE) {
+    GST_ERROR_OBJECT (queue, "Failed to push event");
     return GST_FLOW_ERROR;
+  }
   return GST_FLOW_OK;
 
   /* ERRORS */
@@ -1045,7 +1066,10 @@ gst_queue_handle_sink_query (GstPad * pad, GstObject * parent, GstQuery * query)
         qitem.size = 0;
         gst_queue_array_push_tail_struct (queue->queue, &qitem);
         GST_QUEUE_SIGNAL_ADD (queue);
-        g_cond_wait (&queue->query_handled, &queue->qlock);
+        while (queue->srcresult == GST_FLOW_OK &&
+            queue->last_handled_query != query)
+          g_cond_wait (&queue->query_handled, &queue->qlock);
+        queue->last_handled_query = NULL;
         if (queue->srcresult != GST_FLOW_OK)
           goto out_flushing;
         res = queue->last_query;
@@ -1394,7 +1418,8 @@ next:
           GstEvent *event = GST_EVENT_CAST (data);
           GstEventType type = GST_EVENT_TYPE (event);
 
-          if (type == GST_EVENT_EOS || type == GST_EVENT_SEGMENT) {
+          if (type == GST_EVENT_EOS || type == GST_EVENT_SEGMENT
+              || type == GST_EVENT_STREAM_START) {
             /* we found a pushable item in the queue, push it out */
             GST_CAT_LOG_OBJECT (queue_dataflow, queue,
                 "pushing pushable event %s after EOS",
@@ -1443,6 +1468,7 @@ next:
     ret = gst_pad_peer_query (queue->srcpad, query);
     GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing_query);
     queue->last_query = ret;
+    queue->last_handled_query = query;
     g_cond_signal (&queue->query_handled);
     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
         "did query %p, return %d", query, queue->last_query);
@@ -1452,21 +1478,25 @@ next:
   /* ERRORS */
 no_item:
   {
-    GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+    GST_CAT_ERROR_OBJECT (queue_dataflow, queue,
         "exit because we have no item in the queue");
     return GST_FLOW_ERROR;
   }
 out_flushing:
   {
-    GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we are flushing");
-    return GST_FLOW_FLUSHING;
+    GstFlowReturn ret = queue->srcresult;
+    GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+        "exit because task paused, reason: %s", gst_flow_get_name (ret));
+    return ret;
   }
 out_flushing_query:
   {
+    GstFlowReturn ret = queue->srcresult;
     queue->last_query = FALSE;
     g_cond_signal (&queue->query_handled);
-    GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we are flushing");
-    return GST_FLOW_FLUSHING;
+    GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+        "exit because task paused, reason: %s", gst_flow_get_name (ret));
+    return ret;
   }
 }
 
@@ -1606,9 +1636,13 @@ gst_queue_handle_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
       switch (format) {
         case GST_FORMAT_BYTES:
           peer_pos -= queue->cur_level.bytes;
+          if (peer_pos < 0)     /* Clamp result to 0 */
+            peer_pos = 0;
           break;
         case GST_FORMAT_TIME:
           peer_pos -= queue->cur_level.time;
+          if (peer_pos < 0)     /* Clamp result to 0 */
+            peer_pos = 0;
           break;
         default:
           GST_DEBUG_OBJECT (queue, "Can't adjust query in %s format, don't "
@@ -1725,6 +1759,10 @@ gst_queue_src_activate_mode (GstPad * pad, GstObject * parent, GstPadMode mode,
 
         /* step 2, make sure streaming finishes */
         result = gst_pad_stop_task (pad);
+
+        GST_QUEUE_MUTEX_LOCK (queue);
+        gst_queue_locked_flush (queue, FALSE);
+        GST_QUEUE_MUTEX_UNLOCK (queue);
       }
       break;
     default: