plugins: use new gst_buffer_list_calculate_size()
[platform/upstream/gstreamer.git] / plugins / elements / gstqueue.c
index 6d9810f..9c887c1 100644 (file)
@@ -24,6 +24,7 @@
 
 /**
  * SECTION:element-queue
+ * @title: queue
  *
  * Data is queued until one of the limits specified by the
  * #GstQueue:max-size-buffers, #GstQueue:max-size-bytes and/or
@@ -407,10 +408,8 @@ gst_queue_class_init (GstQueueClass * klass)
   gst_element_class_set_static_metadata (gstelement_class,
       "Queue",
       "Generic", "Simple data queue", "Erik Walthinsen <omega@cse.ogi.edu>");
-  gst_element_class_add_pad_template (gstelement_class,
-      gst_static_pad_template_get (&srctemplate));
-  gst_element_class_add_pad_template (gstelement_class,
-      gst_static_pad_template_get (&sinktemplate));
+  gst_element_class_add_static_pad_template (gstelement_class, &srctemplate);
+  gst_element_class_add_static_pad_template (gstelement_class, &sinktemplate);
 
   /* Registering debug symbols for function pointers */
   GST_DEBUG_REGISTER_FUNCPTR (gst_queue_src_activate_mode);
@@ -467,8 +466,8 @@ gst_queue_init (GstQueue * queue)
       gst_queue_array_new_for_struct (sizeof (GstQueueItem),
       DEFAULT_MAX_SIZE_BUFFERS * 3 / 2);
 
-  queue->sinktime = GST_CLOCK_TIME_NONE;
-  queue->srctime = GST_CLOCK_TIME_NONE;
+  queue->sinktime = GST_CLOCK_STIME_NONE;
+  queue->srctime = GST_CLOCK_STIME_NONE;
 
   queue->sink_tainted = TRUE;
   queue->src_tainted = TRUE;
@@ -503,6 +502,23 @@ gst_queue_finalize (GObject * object)
   G_OBJECT_CLASS (parent_class)->finalize (object);
 }
 
+/* Convenience function */
+static inline GstClockTimeDiff
+my_segment_to_running_time (GstSegment * segment, GstClockTime val)
+{
+  GstClockTimeDiff res = GST_CLOCK_STIME_NONE;
+
+  if (GST_CLOCK_TIME_IS_VALID (val)) {
+    gboolean sign =
+        gst_segment_to_running_time_full (segment, GST_FORMAT_TIME, val, &val);
+    if (sign > 0)
+      res = val;
+    else if (sign < 0)
+      res = -val;
+  }
+  return res;
+}
+
 /* calculate the diff between running time on the sink and src of the queue.
  * This is the total amount of time in the queue. */
 static void
@@ -513,7 +529,7 @@ update_time_level (GstQueue * queue)
   if (queue->sink_tainted) {
     GST_LOG_OBJECT (queue, "update sink time");
     queue->sinktime =
-        gst_segment_to_running_time (&queue->sink_segment, GST_FORMAT_TIME,
+        my_segment_to_running_time (&queue->sink_segment,
         queue->sink_segment.position);
     queue->sink_tainted = FALSE;
   }
@@ -522,14 +538,14 @@ update_time_level (GstQueue * queue)
   if (queue->src_tainted) {
     GST_LOG_OBJECT (queue, "update src time");
     queue->srctime =
-        gst_segment_to_running_time (&queue->src_segment, GST_FORMAT_TIME,
+        my_segment_to_running_time (&queue->src_segment,
         queue->src_segment.position);
     queue->src_tainted = FALSE;
   }
   src_time = queue->srctime;
 
-  GST_LOG_OBJECT (queue, "sink %" GST_TIME_FORMAT ", src %" GST_TIME_FORMAT,
-      GST_TIME_ARGS (sink_time), GST_TIME_ARGS (src_time));
+  GST_LOG_OBJECT (queue, "sink %" GST_STIME_FORMAT ", src %" GST_STIME_FORMAT,
+      GST_STIME_ARGS (sink_time), GST_STIME_ARGS (src_time));
 
   if (sink_time >= src_time)
     queue->cur_level.time = sink_time - src_time;
@@ -613,7 +629,8 @@ apply_buffer (GstQueue * queue, GstBuffer * buffer, GstSegment * segment,
   if (duration != GST_CLOCK_TIME_NONE)
     timestamp += duration;
 
-  GST_LOG_OBJECT (queue, "position updated to %" GST_TIME_FORMAT,
+  GST_LOG_OBJECT (queue, "%s position updated to %" GST_TIME_FORMAT,
+      segment == &queue->sink_segment ? "sink" : "src",
       GST_TIME_ARGS (timestamp));
 
   segment->position = timestamp;
@@ -704,7 +721,7 @@ gst_queue_locked_flush (GstQueue * queue, gboolean full)
   gst_segment_init (&queue->src_segment, GST_FORMAT_TIME);
   queue->head_needs_discont = queue->tail_needs_discont = FALSE;
 
-  queue->sinktime = queue->srctime = GST_CLOCK_TIME_NONE;
+  queue->sinktime = queue->srctime = GST_CLOCK_STIME_NONE;
   queue->sink_tainted = queue->src_tainted = TRUE;
 
   /* we deleted a lot of something */
@@ -731,26 +748,14 @@ gst_queue_locked_enqueue_buffer (GstQueue * queue, gpointer item)
   GST_QUEUE_SIGNAL_ADD (queue);
 }
 
-static gboolean
-buffer_list_calc_size (GstBuffer ** buf, guint idx, gpointer data)
-{
-  guint *p_size = data;
-  gsize buf_size;
-
-  buf_size = gst_buffer_get_size (*buf);
-  GST_TRACE ("buffer %u in has size %" G_GSIZE_FORMAT, idx, buf_size);
-  *p_size += buf_size;
-  return TRUE;
-}
-
 static inline void
 gst_queue_locked_enqueue_buffer_list (GstQueue * queue, gpointer item)
 {
   GstQueueItem qitem;
   GstBufferList *buffer_list = GST_BUFFER_LIST_CAST (item);
-  gsize bsize = 0;
+  gsize bsize;
 
-  gst_buffer_list_foreach (buffer_list, buffer_list_calc_size, &bsize);
+  bsize = gst_buffer_list_calculate_size (buffer_list);
 
   /* add buffer to the statistics */
   queue->cur_level.buffers += gst_buffer_list_length (buffer_list);
@@ -905,9 +910,11 @@ gst_queue_handle_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
 
   queue = GST_QUEUE (parent);
 
+  GST_CAT_LOG_OBJECT (queue_dataflow, queue, "Received event '%s'",
+      GST_EVENT_TYPE_NAME (event));
+
   switch (GST_EVENT_TYPE (event)) {
     case GST_EVENT_FLUSH_START:
-      STATUS (queue, pad, "received flush start event");
       /* forward event */
       ret = gst_pad_push_event (queue->srcpad, event);
 
@@ -917,17 +924,22 @@ gst_queue_handle_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
       /* unblock the loop and chain functions */
       GST_QUEUE_SIGNAL_ADD (queue);
       GST_QUEUE_SIGNAL_DEL (queue);
-      queue->last_query = FALSE;
-      g_cond_signal (&queue->query_handled);
       GST_QUEUE_MUTEX_UNLOCK (queue);
 
       /* make sure it pauses, this should happen since we sent
        * flush_start downstream. */
       gst_pad_pause_task (queue->srcpad);
       GST_CAT_LOG_OBJECT (queue_dataflow, queue, "loop stopped");
+
+      /* unblock query handler after the streaming thread is shut down.
+       * Otherwise downstream might have a query that is already unreffed
+       * upstream */
+      GST_QUEUE_MUTEX_LOCK (queue);
+      queue->last_query = FALSE;
+      g_cond_signal (&queue->query_handled);
+      GST_QUEUE_MUTEX_UNLOCK (queue);
       break;
     case GST_EVENT_FLUSH_STOP:
-      STATUS (queue, pad, "received flush stop event");
       /* forward event */
       ret = gst_pad_push_event (queue->srcpad, event);
 
@@ -951,6 +963,14 @@ gst_queue_handle_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
       if (GST_EVENT_IS_SERIALIZED (event)) {
         /* serialized events go in the queue */
         GST_QUEUE_MUTEX_LOCK (queue);
+
+        /* STREAM_START and SEGMENT reset the EOS status of a
+         * pad. Change the cached sinkpad flow result accordingly */
+        if (queue->srcresult == GST_FLOW_EOS
+            && (GST_EVENT_TYPE (event) == GST_EVENT_STREAM_START
+                || GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT))
+          queue->srcresult = GST_FLOW_OK;
+
         if (queue->srcresult != GST_FLOW_OK) {
           /* Errors in sticky event pushing are no problem and ignored here
            * as they will cause more meaningful errors during data flow.
@@ -964,19 +984,37 @@ gst_queue_handle_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
             if (queue->srcresult == GST_FLOW_NOT_LINKED
                 || queue->srcresult < GST_FLOW_EOS) {
               GST_QUEUE_MUTEX_UNLOCK (queue);
-              GST_ELEMENT_ERROR (queue, STREAM, FAILED,
-                  (_("Internal data flow error.")),
-                  ("streaming task paused, reason %s (%d)",
-                      gst_flow_get_name (queue->srcresult), queue->srcresult));
+              GST_ELEMENT_FLOW_ERROR (queue, queue->srcresult);
             } else {
               GST_QUEUE_MUTEX_UNLOCK (queue);
             }
             goto out_flow_error;
           }
         }
-        /* refuse more events on EOS */
-        if (queue->eos)
-          goto out_eos;
+
+        /* refuse more events on EOS unless they unset the EOS status */
+        if (queue->eos) {
+          switch (GST_EVENT_TYPE (event)) {
+            case GST_EVENT_STREAM_START:
+            case GST_EVENT_SEGMENT:
+              /* Restart the loop */
+              if (GST_PAD_MODE (queue->srcpad) == GST_PAD_MODE_PUSH) {
+                queue->srcresult = GST_FLOW_OK;
+                queue->eos = FALSE;
+                queue->unexpected = FALSE;
+                gst_pad_start_task (queue->srcpad,
+                    (GstTaskFunction) gst_queue_loop, queue->srcpad, NULL);
+              } else {
+                queue->eos = FALSE;
+                queue->unexpected = FALSE;
+              }
+
+              break;
+            default:
+              goto out_eos;
+          }
+        }
+
         gst_queue_locked_enqueue_event (queue, event);
         GST_QUEUE_MUTEX_UNLOCK (queue);
       } else {
@@ -986,8 +1024,7 @@ gst_queue_handle_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
       break;
   }
   if (ret == FALSE) {
-    if (GST_EVENT_TYPE (event) == GST_EVENT_CAPS)
-      return GST_FLOW_NOT_NEGOTIATED;
+    GST_ERROR_OBJECT (queue, "Failed to push event");
     return GST_FLOW_ERROR;
   }
   return GST_FLOW_OK;
@@ -1029,7 +1066,10 @@ gst_queue_handle_sink_query (GstPad * pad, GstObject * parent, GstQuery * query)
         qitem.size = 0;
         gst_queue_array_push_tail_struct (queue->queue, &qitem);
         GST_QUEUE_SIGNAL_ADD (queue);
-        g_cond_wait (&queue->query_handled, &queue->qlock);
+        while (queue->srcresult == GST_FLOW_OK &&
+            queue->last_handled_query != query)
+          g_cond_wait (&queue->query_handled, &queue->qlock);
+        queue->last_handled_query = NULL;
         if (queue->srcresult != GST_FLOW_OK)
           goto out_flushing;
         res = queue->last_query;
@@ -1378,7 +1418,8 @@ next:
           GstEvent *event = GST_EVENT_CAST (data);
           GstEventType type = GST_EVENT_TYPE (event);
 
-          if (type == GST_EVENT_EOS || type == GST_EVENT_SEGMENT) {
+          if (type == GST_EVENT_EOS || type == GST_EVENT_SEGMENT
+              || type == GST_EVENT_STREAM_START) {
             /* we found a pushable item in the queue, push it out */
             GST_CAT_LOG_OBJECT (queue_dataflow, queue,
                 "pushing pushable event %s after EOS",
@@ -1427,6 +1468,7 @@ next:
     ret = gst_pad_peer_query (queue->srcpad, query);
     GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing_query);
     queue->last_query = ret;
+    queue->last_handled_query = query;
     g_cond_signal (&queue->query_handled);
     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
         "did query %p, return %d", query, queue->last_query);
@@ -1436,21 +1478,25 @@ next:
   /* ERRORS */
 no_item:
   {
-    GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+    GST_CAT_ERROR_OBJECT (queue_dataflow, queue,
         "exit because we have no item in the queue");
     return GST_FLOW_ERROR;
   }
 out_flushing:
   {
-    GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we are flushing");
-    return GST_FLOW_FLUSHING;
+    GstFlowReturn ret = queue->srcresult;
+    GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+        "exit because task paused, reason: %s", gst_flow_get_name (ret));
+    return ret;
   }
 out_flushing_query:
   {
+    GstFlowReturn ret = queue->srcresult;
     queue->last_query = FALSE;
     g_cond_signal (&queue->query_handled);
-    GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we are flushing");
-    return GST_FLOW_FLUSHING;
+    GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+        "exit because task paused, reason: %s", gst_flow_get_name (ret));
+    return ret;
   }
 }
 
@@ -1516,10 +1562,7 @@ out_flushing:
     /* let app know about us giving up if upstream is not expected to do so */
     /* EOS is already taken care of elsewhere */
     if (eos && (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_EOS)) {
-      GST_ELEMENT_ERROR (queue, STREAM, FAILED,
-          (_("Internal data flow error.")),
-          ("streaming task paused, reason %s (%d)",
-              gst_flow_get_name (ret), ret));
+      GST_ELEMENT_FLOW_ERROR (queue, ret);
       gst_pad_push_event (queue->srcpad, gst_event_new_eos ());
     }
     return;
@@ -1593,9 +1636,13 @@ gst_queue_handle_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
       switch (format) {
         case GST_FORMAT_BYTES:
           peer_pos -= queue->cur_level.bytes;
+          if (peer_pos < 0)     /* Clamp result to 0 */
+            peer_pos = 0;
           break;
         case GST_FORMAT_TIME:
           peer_pos -= queue->cur_level.time;
+          if (peer_pos < 0)     /* Clamp result to 0 */
+            peer_pos = 0;
           break;
         default:
           GST_DEBUG_OBJECT (queue, "Can't adjust query in %s format, don't "
@@ -1663,10 +1710,7 @@ gst_queue_sink_activate_mode (GstPad * pad, GstObject * parent, GstPadMode mode,
         GST_QUEUE_MUTEX_LOCK (queue);
         queue->srcresult = GST_FLOW_FLUSHING;
         /* the item del signal will unblock */
-        g_cond_signal (&queue->item_del);
-        /* unblock query handler */
-        queue->last_query = FALSE;
-        g_cond_signal (&queue->query_handled);
+        GST_QUEUE_SIGNAL_DEL (queue);
         GST_QUEUE_MUTEX_UNLOCK (queue);
 
         /* step 2, wait until streaming thread stopped and flush queue */
@@ -1715,6 +1759,10 @@ gst_queue_src_activate_mode (GstPad * pad, GstObject * parent, GstPadMode mode,
 
         /* step 2, make sure streaming finishes */
         result = gst_pad_stop_task (pad);
+
+        GST_QUEUE_MUTEX_LOCK (queue);
+        gst_queue_locked_flush (queue, FALSE);
+        GST_QUEUE_MUTEX_UNLOCK (queue);
       }
       break;
     default: