queue/queue2: Ensure that the streaming thread is unlocked after deactivating the...
[platform/upstream/gstreamer.git] / plugins / elements / gstqueue.c
index c787e2b..1f0c09c 100644 (file)
@@ -407,10 +407,8 @@ gst_queue_class_init (GstQueueClass * klass)
   gst_element_class_set_static_metadata (gstelement_class,
       "Queue",
       "Generic", "Simple data queue", "Erik Walthinsen <omega@cse.ogi.edu>");
-  gst_element_class_add_pad_template (gstelement_class,
-      gst_static_pad_template_get (&srctemplate));
-  gst_element_class_add_pad_template (gstelement_class,
-      gst_static_pad_template_get (&sinktemplate));
+  gst_element_class_add_static_pad_template (gstelement_class, &srctemplate);
+  gst_element_class_add_static_pad_template (gstelement_class, &sinktemplate);
 
   /* Registering debug symbols for function pointers */
   GST_DEBUG_REGISTER_FUNCPTR (gst_queue_src_activate_mode);
@@ -467,8 +465,8 @@ gst_queue_init (GstQueue * queue)
       gst_queue_array_new_for_struct (sizeof (GstQueueItem),
       DEFAULT_MAX_SIZE_BUFFERS * 3 / 2);
 
-  queue->sinktime = GST_CLOCK_TIME_NONE;
-  queue->srctime = GST_CLOCK_TIME_NONE;
+  queue->sinktime = GST_CLOCK_STIME_NONE;
+  queue->srctime = GST_CLOCK_STIME_NONE;
 
   queue->sink_tainted = TRUE;
   queue->src_tainted = TRUE;
@@ -503,6 +501,23 @@ gst_queue_finalize (GObject * object)
   G_OBJECT_CLASS (parent_class)->finalize (object);
 }
 
+/* Convenience function */
+static inline GstClockTimeDiff
+my_segment_to_running_time (GstSegment * segment, GstClockTime val)
+{
+  GstClockTimeDiff res = GST_CLOCK_STIME_NONE;
+
+  if (GST_CLOCK_TIME_IS_VALID (val)) {
+    gboolean sign =
+        gst_segment_to_running_time_full (segment, GST_FORMAT_TIME, val, &val);
+    if (sign > 0)
+      res = val;
+    else if (sign < 0)
+      res = -val;
+  }
+  return res;
+}
+
 /* calculate the diff between running time on the sink and src of the queue.
  * This is the total amount of time in the queue. */
 static void
@@ -513,7 +528,7 @@ update_time_level (GstQueue * queue)
   if (queue->sink_tainted) {
     GST_LOG_OBJECT (queue, "update sink time");
     queue->sinktime =
-        gst_segment_to_running_time (&queue->sink_segment, GST_FORMAT_TIME,
+        my_segment_to_running_time (&queue->sink_segment,
         queue->sink_segment.position);
     queue->sink_tainted = FALSE;
   }
@@ -522,14 +537,14 @@ update_time_level (GstQueue * queue)
   if (queue->src_tainted) {
     GST_LOG_OBJECT (queue, "update src time");
     queue->srctime =
-        gst_segment_to_running_time (&queue->src_segment, GST_FORMAT_TIME,
+        my_segment_to_running_time (&queue->src_segment,
         queue->src_segment.position);
     queue->src_tainted = FALSE;
   }
   src_time = queue->srctime;
 
-  GST_LOG_OBJECT (queue, "sink %" GST_TIME_FORMAT ", src %" GST_TIME_FORMAT,
-      GST_TIME_ARGS (sink_time), GST_TIME_ARGS (src_time));
+  GST_LOG_OBJECT (queue, "sink %" GST_STIME_FORMAT ", src %" GST_STIME_FORMAT,
+      GST_STIME_ARGS (sink_time), GST_STIME_ARGS (src_time));
 
   if (sink_time >= src_time)
     queue->cur_level.time = sink_time - src_time;
@@ -613,7 +628,8 @@ apply_buffer (GstQueue * queue, GstBuffer * buffer, GstSegment * segment,
   if (duration != GST_CLOCK_TIME_NONE)
     timestamp += duration;
 
-  GST_LOG_OBJECT (queue, "position updated to %" GST_TIME_FORMAT,
+  GST_LOG_OBJECT (queue, "%s position updated to %" GST_TIME_FORMAT,
+      segment == &queue->sink_segment ? "sink" : "src",
       GST_TIME_ARGS (timestamp));
 
   segment->position = timestamp;
@@ -704,7 +720,7 @@ gst_queue_locked_flush (GstQueue * queue, gboolean full)
   gst_segment_init (&queue->src_segment, GST_FORMAT_TIME);
   queue->head_needs_discont = queue->tail_needs_discont = FALSE;
 
-  queue->sinktime = queue->srctime = GST_CLOCK_TIME_NONE;
+  queue->sinktime = queue->srctime = GST_CLOCK_STIME_NONE;
   queue->sink_tainted = queue->src_tainted = TRUE;
 
   /* we deleted a lot of something */
@@ -902,11 +918,10 @@ gst_queue_handle_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
 {
   gboolean ret = TRUE;
   GstQueue *queue;
-  GstEventType event_type = GST_EVENT_TYPE (event);
 
   queue = GST_QUEUE (parent);
 
-  switch (event_type) {
+  switch (GST_EVENT_TYPE (event)) {
     case GST_EVENT_FLUSH_START:
       STATUS (queue, pad, "received flush start event");
       /* forward event */
@@ -918,14 +933,20 @@ gst_queue_handle_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
       /* unblock the loop and chain functions */
       GST_QUEUE_SIGNAL_ADD (queue);
       GST_QUEUE_SIGNAL_DEL (queue);
-      queue->last_query = FALSE;
-      g_cond_signal (&queue->query_handled);
       GST_QUEUE_MUTEX_UNLOCK (queue);
 
       /* make sure it pauses, this should happen since we sent
        * flush_start downstream. */
       gst_pad_pause_task (queue->srcpad);
       GST_CAT_LOG_OBJECT (queue_dataflow, queue, "loop stopped");
+
+      /* unblock query handler after the streaming thread is shut down.
+       * Otherwise downstream might have a query that is already unreffed
+       * upstream */
+      GST_QUEUE_MUTEX_LOCK (queue);
+      queue->last_query = FALSE;
+      g_cond_signal (&queue->query_handled);
+      GST_QUEUE_MUTEX_UNLOCK (queue);
       break;
     case GST_EVENT_FLUSH_STOP:
       STATUS (queue, pad, "received flush stop event");
@@ -961,14 +982,11 @@ gst_queue_handle_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
           if (!GST_EVENT_IS_STICKY (event)) {
             GST_QUEUE_MUTEX_UNLOCK (queue);
             goto out_flow_error;
-          } else if (event_type == GST_EVENT_EOS) {
+          } else if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
             if (queue->srcresult == GST_FLOW_NOT_LINKED
                 || queue->srcresult < GST_FLOW_EOS) {
               GST_QUEUE_MUTEX_UNLOCK (queue);
-              GST_ELEMENT_ERROR (queue, STREAM, FAILED,
-                  (_("Internal data flow error.")),
-                  ("streaming task paused, reason %s (%d)",
-                      gst_flow_get_name (queue->srcresult), queue->srcresult));
+              GST_ELEMENT_FLOW_ERROR (queue, queue->srcresult);
             } else {
               GST_QUEUE_MUTEX_UNLOCK (queue);
             }
@@ -986,11 +1004,8 @@ gst_queue_handle_sink_event (GstPad * pad, GstObject * parent, GstEvent * event)
       }
       break;
   }
-  if (ret == FALSE) {
-    if (event_type == GST_EVENT_CAPS)
-      return GST_FLOW_NOT_NEGOTIATED;
+  if (ret == FALSE)
     return GST_FLOW_ERROR;
-  }
   return GST_FLOW_OK;
 
   /* ERRORS */
@@ -1517,10 +1532,7 @@ out_flushing:
     /* let app know about us giving up if upstream is not expected to do so */
     /* EOS is already taken care of elsewhere */
     if (eos && (ret == GST_FLOW_NOT_LINKED || ret < GST_FLOW_EOS)) {
-      GST_ELEMENT_ERROR (queue, STREAM, FAILED,
-          (_("Internal data flow error.")),
-          ("streaming task paused, reason %s (%d)",
-              gst_flow_get_name (ret), ret));
+      GST_ELEMENT_FLOW_ERROR (queue, ret);
       gst_pad_push_event (queue->srcpad, gst_event_new_eos ());
     }
     return;
@@ -1594,9 +1606,13 @@ gst_queue_handle_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
       switch (format) {
         case GST_FORMAT_BYTES:
           peer_pos -= queue->cur_level.bytes;
+          if (peer_pos < 0)     /* Clamp result to 0 */
+            peer_pos = 0;
           break;
         case GST_FORMAT_TIME:
           peer_pos -= queue->cur_level.time;
+          if (peer_pos < 0)     /* Clamp result to 0 */
+            peer_pos = 0;
           break;
         default:
           GST_DEBUG_OBJECT (queue, "Can't adjust query in %s format, don't "
@@ -1664,10 +1680,7 @@ gst_queue_sink_activate_mode (GstPad * pad, GstObject * parent, GstPadMode mode,
         GST_QUEUE_MUTEX_LOCK (queue);
         queue->srcresult = GST_FLOW_FLUSHING;
         /* the item del signal will unblock */
-        g_cond_signal (&queue->item_del);
-        /* unblock query handler */
-        queue->last_query = FALSE;
-        g_cond_signal (&queue->query_handled);
+        GST_QUEUE_SIGNAL_DEL (queue);
         GST_QUEUE_MUTEX_UNLOCK (queue);
 
         /* step 2, wait until streaming thread stopped and flush queue */
@@ -1716,6 +1729,10 @@ gst_queue_src_activate_mode (GstPad * pad, GstObject * parent, GstPadMode mode,
 
         /* step 2, make sure streaming finishes */
         result = gst_pad_stop_task (pad);
+
+        GST_QUEUE_MUTEX_LOCK (queue);
+        gst_queue_locked_flush (queue, FALSE);
+        GST_QUEUE_MUTEX_UNLOCK (queue);
       }
       break;
     default: