typefind: Lower debug level of some output related to the URI query
[platform/upstream/gstreamer.git] / plugins / elements / gstqueue.c
index 1f0c09c..a8753ea 100644 (file)
@@ -24,6 +24,7 @@
 
 /**
  * SECTION:element-queue
+ * @title: queue
  *
  * Data is queued until one of the limits specified by the
  * #GstQueue:max-size-buffers, #GstQueue:max-size-bytes and/or
@@ -546,7 +547,8 @@ update_time_level (GstQueue * queue)
   GST_LOG_OBJECT (queue, "sink %" GST_STIME_FORMAT ", src %" GST_STIME_FORMAT,
       GST_STIME_ARGS (sink_time), GST_STIME_ARGS (src_time));
 
-  if (sink_time >= src_time)
+  if (GST_CLOCK_STIME_IS_VALID (src_time)
+      && GST_CLOCK_STIME_IS_VALID (sink_time) && sink_time >= src_time)
     queue->cur_level.time = sink_time - src_time;
   else
     queue->cur_level.time = 0;
@@ -747,26 +749,14 @@ gst_queue_locked_enqueue_buffer (GstQueue * queue, gpointer item)
   GST_QUEUE_SIGNAL_ADD (queue);
 }
 
-static gboolean
-buffer_list_calc_size (GstBuffer ** buf, guint idx, gpointer data)
-{
-  guint *p_size = data;
-  gsize buf_size;
-
-  buf_size = gst_buffer_get_size (*buf);
-  GST_TRACE ("buffer %u in has size %" G_GSIZE_FORMAT, idx, buf_size);
-  *p_size += buf_size;
-  return TRUE;
-}
-
 static inline void
 gst_queue_locked_enqueue_buffer_list (GstQueue * queue, gpointer item)
 {
   GstQueueItem qitem;
   GstBufferList *buffer_list = GST_BUFFER_LIST_CAST (item);
-  gsize bsize = 0;
+  gsize bsize;
 
-  gst_buffer_list_foreach (buffer_list, buffer_list_calc_size, &bsize);
+  bsize = gst_buffer_list_calculate_size (buffer_list);
 
   /* add buffer to the statistics */
   queue->cur_level.buffers += gst_buffer_list_length (buffer_list);
@@ -921,9 +911,11 @@ gst_queue_handle_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
 
   queue = GST_QUEUE (parent);
 
+  GST_CAT_LOG_OBJECT (queue_dataflow, queue, "Received event '%s'",
+      GST_EVENT_TYPE_NAME (event));
+
   switch (GST_EVENT_TYPE (event)) {
     case GST_EVENT_FLUSH_START:
-      STATUS (queue, pad, "received flush start event");
       /* forward event */
       ret = gst_pad_push_event (queue->srcpad, event);
 
@@ -949,7 +941,6 @@ gst_queue_handle_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
       GST_QUEUE_MUTEX_UNLOCK (queue);
       break;
     case GST_EVENT_FLUSH_STOP:
-      STATUS (queue, pad, "received flush stop event");
       /* forward event */
       ret = gst_pad_push_event (queue->srcpad, event);
 
@@ -973,6 +964,14 @@ gst_queue_handle_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
       if (GST_EVENT_IS_SERIALIZED (event)) {
         /* serialized events go in the queue */
         GST_QUEUE_MUTEX_LOCK (queue);
+
+        /* STREAM_START and SEGMENT reset the EOS status of a
+         * pad. Change the cached sinkpad flow result accordingly */
+        if (queue->srcresult == GST_FLOW_EOS
+            && (GST_EVENT_TYPE (event) == GST_EVENT_STREAM_START
+                || GST_EVENT_TYPE (event) == GST_EVENT_SEGMENT))
+          queue->srcresult = GST_FLOW_OK;
+
         if (queue->srcresult != GST_FLOW_OK) {
           /* Errors in sticky event pushing are no problem and ignored here
            * as they will cause more meaningful errors during data flow.
@@ -993,9 +992,30 @@ gst_queue_handle_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
             goto out_flow_error;
           }
         }
-        /* refuse more events on EOS */
-        if (queue->eos)
-          goto out_eos;
+
+        /* refuse more events on EOS unless they unset the EOS status */
+        if (queue->eos) {
+          switch (GST_EVENT_TYPE (event)) {
+            case GST_EVENT_STREAM_START:
+            case GST_EVENT_SEGMENT:
+              /* Restart the loop */
+              if (GST_PAD_MODE (queue->srcpad) == GST_PAD_MODE_PUSH) {
+                queue->srcresult = GST_FLOW_OK;
+                queue->eos = FALSE;
+                queue->unexpected = FALSE;
+                gst_pad_start_task (queue->srcpad,
+                    (GstTaskFunction) gst_queue_loop, queue->srcpad, NULL);
+              } else {
+                queue->eos = FALSE;
+                queue->unexpected = FALSE;
+              }
+
+              break;
+            default:
+              goto out_eos;
+          }
+        }
+
         gst_queue_locked_enqueue_event (queue, event);
         GST_QUEUE_MUTEX_UNLOCK (queue);
       } else {
@@ -1004,8 +1024,10 @@ gst_queue_handle_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
       }
       break;
   }
-  if (ret == FALSE)
+  if (ret == FALSE) {
+    GST_ERROR_OBJECT (queue, "Failed to push event");
     return GST_FLOW_ERROR;
+  }
   return GST_FLOW_OK;
 
   /* ERRORS */
@@ -1045,7 +1067,10 @@ gst_queue_handle_sink_query (GstPad * pad, GstObject * parent, GstQuery * query)
         qitem.size = 0;
         gst_queue_array_push_tail_struct (queue->queue, &qitem);
         GST_QUEUE_SIGNAL_ADD (queue);
-        g_cond_wait (&queue->query_handled, &queue->qlock);
+        while (queue->srcresult == GST_FLOW_OK &&
+            queue->last_handled_query != query)
+          g_cond_wait (&queue->query_handled, &queue->qlock);
+        queue->last_handled_query = NULL;
         if (queue->srcresult != GST_FLOW_OK)
           goto out_flushing;
         res = queue->last_query;
@@ -1069,18 +1094,18 @@ out_flushing:
 static gboolean
 gst_queue_is_empty (GstQueue * queue)
 {
-  GstQueueItem *head;
+  GstQueueItem *tail;
 
-  head = gst_queue_array_peek_head_struct (queue->queue);
+  tail = gst_queue_array_peek_tail_struct (queue->queue);
 
-  if (head == NULL)
+  if (tail == NULL)
     return TRUE;
 
   /* Only consider the queue empty if the minimum thresholds
-   * are not reached and data is at the queue head. Otherwise
+   * are not reached and data is at the queue tail. Otherwise
    * we would block forever on serialized queries.
    */
-  if (!GST_IS_BUFFER (head->item) && !GST_IS_BUFFER_LIST (head->item))
+  if (!GST_IS_BUFFER (tail->item) && !GST_IS_BUFFER_LIST (tail->item))
     return FALSE;
 
   /* It is possible that a max size is reached before all min thresholds are.
@@ -1394,7 +1419,8 @@ next:
           GstEvent *event = GST_EVENT_CAST (data);
           GstEventType type = GST_EVENT_TYPE (event);
 
-          if (type == GST_EVENT_EOS || type == GST_EVENT_SEGMENT) {
+          if (type == GST_EVENT_EOS || type == GST_EVENT_SEGMENT
+              || type == GST_EVENT_STREAM_START) {
             /* we found a pushable item in the queue, push it out */
             GST_CAT_LOG_OBJECT (queue_dataflow, queue,
                 "pushing pushable event %s after EOS",
@@ -1443,6 +1469,7 @@ next:
     ret = gst_pad_peer_query (queue->srcpad, query);
     GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing_query);
     queue->last_query = ret;
+    queue->last_handled_query = query;
     g_cond_signal (&queue->query_handled);
     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
         "did query %p, return %d", query, queue->last_query);
@@ -1452,21 +1479,25 @@ next:
   /* ERRORS */
 no_item:
   {
-    GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+    GST_CAT_ERROR_OBJECT (queue_dataflow, queue,
         "exit because we have no item in the queue");
     return GST_FLOW_ERROR;
   }
 out_flushing:
   {
-    GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we are flushing");
-    return GST_FLOW_FLUSHING;
+    GstFlowReturn ret = queue->srcresult;
+    GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+        "exit because task paused, reason: %s", gst_flow_get_name (ret));
+    return ret;
   }
 out_flushing_query:
   {
+    GstFlowReturn ret = queue->srcresult;
     queue->last_query = FALSE;
     g_cond_signal (&queue->query_handled);
-    GST_CAT_LOG_OBJECT (queue_dataflow, queue, "exit because we are flushing");
-    return GST_FLOW_FLUSHING;
+    GST_CAT_LOG_OBJECT (queue_dataflow, queue,
+        "exit because task paused, reason: %s", gst_flow_get_name (ret));
+    return ret;
   }
 }