gst/playback/gstqueue2.c: Add pull based scheduling and fix some deadlocks. Fixes...
authorThiago Sousa Santos <thiagossantos@gmail.com>
Wed, 6 Jun 2007 13:36:26 +0000 (13:36 +0000)
committerSebastian Dröge <sebastian.droege@collabora.co.uk>
Thu, 29 Oct 2009 10:17:12 +0000 (11:17 +0100)
Original commit message from CVS:
Patch by: Thiago Sousa Santos <thiagossantos at gmail dot com>
* gst/playback/gstqueue2.c: (gst_queue_init),
(gst_queue_handle_sink_event), (gst_queue_chain),
(gst_queue_get_range), (gst_queue_src_checkgetrange_function),
(gst_queue_sink_activate_push), (gst_queue_src_activate_push),
(gst_queue_src_activate_pull):
Add pull based scheduling and fix some deadlocks. Fixes #444523.
Does not yet completely work because duration queries upstream won't
block yet.

gst/playback/gstqueue2.c

index 10f4b29..bd0da0a 100644 (file)
@@ -2,7 +2,7 @@
  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
  *                    2003 Colin Walters <cwalters@gnome.org>
  *                    2000,2005,2007 Wim Taymans <wim@fluendo.com>
- *                    2007 Thiago Sousa Santos <thiagossantos at gmail dot com>
+ *                    2007 Thiago Sousa Santos <thiagoss@lcc.ufcg.edu.br>
  *
  * gstqueue2.c:
  *
@@ -40,7 +40,7 @@
  *
  * The default queue size limits are 100 buffers, 2MB of data, or
  * two seconds worth of data, whichever is reached first.
- * 
+ *
  * If you set temp-location, the element will buffer data on the file
  * specified by it. By using this, it will buffer the entire 
  * stream data on the file independently of the queue size limits, they
@@ -52,7 +52,6 @@
 #include "config.h"
 #endif
 
-#include <stdio.h>
 #include <glib/gstdio.h>
 
 #include <gst/gst.h>
@@ -121,6 +120,8 @@ enum
   (G_TYPE_CHECK_INSTANCE_TYPE((obj),GST_TYPE_QUEUE))
 #define GST_IS_QUEUE_CLASS(klass) \
   (G_TYPE_CHECK_CLASS_TYPE((klass),GST_TYPE_QUEUE))
+#define GST_QUEUE_CAST(obj) \
+  ((GstQueue *)(obj))
 
 typedef struct _GstQueue GstQueue;
 typedef struct _GstQueueSize GstQueueSize;
@@ -195,6 +196,7 @@ struct _GstQueue
    * because we can't save it on the file */
   gboolean segment_event_received;
   GstEvent *starting_segment;
+
 };
 
 struct _GstQueueClass
@@ -325,6 +327,11 @@ static gboolean gst_queue_handle_src_query (GstPad * pad, GstQuery * query);
 
 static GstCaps *gst_queue_getcaps (GstPad * pad);
 
+static GstFlowReturn gst_queue_get_range (GstPad * pad, guint64 offset,
+    guint length, GstBuffer ** buffer);
+static gboolean gst_queue_src_checkgetrange_function (GstPad * pad);
+
+static gboolean gst_queue_src_activate_pull (GstPad * pad, gboolean active);
 static gboolean gst_queue_src_activate_push (GstPad * pad, gboolean active);
 static gboolean gst_queue_sink_activate_push (GstPad * pad, gboolean active);
 static GstStateChangeReturn gst_queue_change_state (GstElement * element,
@@ -427,8 +434,14 @@ gst_queue_init (GstQueue * queue, GstQueueClass * g_class)
 
   queue->srcpad = gst_pad_new_from_static_template (&srctemplate, "src");
 
+  gst_pad_set_activatepull_function (queue->srcpad,
+      GST_DEBUG_FUNCPTR (gst_queue_src_activate_pull));
   gst_pad_set_activatepush_function (queue->srcpad,
       GST_DEBUG_FUNCPTR (gst_queue_src_activate_push));
+  gst_pad_set_getrange_function (queue->srcpad,
+      GST_DEBUG_FUNCPTR (gst_queue_get_range));
+  gst_pad_set_checkgetrange_function (queue->srcpad,
+      GST_DEBUG_FUNCPTR (gst_queue_src_checkgetrange_function));
   gst_pad_set_acceptcaps_function (queue->srcpad,
       GST_DEBUG_FUNCPTR (gst_queue_acceptcaps));
   gst_pad_set_getcaps_function (queue->srcpad,
@@ -857,6 +870,7 @@ eos:
   }
 }
 
+/* should be called with QUEUE_LOCK */
 static GstMiniObject *
 gst_queue_read_item_from_file (GstQueue * queue)
 {
@@ -1173,6 +1187,9 @@ done:
   /* ERRORS */
 out_flushing:
   {
+    GST_DEBUG_OBJECT (queue, "we are flushing");
+    GST_QUEUE_MUTEX_UNLOCK (queue);
+
     gst_buffer_unref (event);
     return FALSE;
   }
@@ -1262,6 +1279,7 @@ out_flushing:
     GST_CAT_LOG_OBJECT (queue_dataflow, queue,
         "exit because task paused, reason: %s", gst_flow_get_name (ret));
     GST_QUEUE_MUTEX_UNLOCK (queue);
+
     gst_buffer_unref (buffer);
 
     return ret;
@@ -1318,6 +1336,8 @@ out_flushing:
   }
 }
 
+/* called repeadedly with @pad as the source pad. This function should push out
+ * data to the peer element. */
 static void
 gst_queue_loop (GstPad * pad)
 {
@@ -1419,6 +1439,51 @@ gst_queue_handle_src_query (GstPad * pad, GstQuery * query)
   return TRUE;
 }
 
+static GstFlowReturn
+gst_queue_get_range (GstPad * pad, guint64 offset, guint length,
+    GstBuffer ** buffer)
+{
+  GstQueue *queue;
+  GstFlowReturn ret;
+
+  queue = GST_QUEUE_CAST (gst_pad_get_parent (pad));
+
+  GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing);
+  length = (length == -1) ? DEFAULT_BUFFER_SIZE : length;
+  offset = (offset == -1) ? queue->reading_pos : offset;
+
+  /* function will block when the range is not yet available */
+  ret = gst_queue_create_read (queue, offset, length, buffer);
+  GST_QUEUE_MUTEX_UNLOCK (queue);
+
+  gst_object_unref (queue);
+
+  return ret;
+
+  /* ERRORS */
+out_flushing:
+  {
+    GST_DEBUG_OBJECT (queue, "we are flushing");
+    GST_QUEUE_MUTEX_UNLOCK (queue);
+    return GST_FLOW_WRONG_STATE;
+  }
+}
+
+static gboolean
+gst_queue_src_checkgetrange_function (GstPad * pad)
+{
+  GstQueue *queue;
+  gboolean ret;
+
+  queue = GST_QUEUE (gst_pad_get_parent (pad));
+  /* we can operate in pull mode when we are using a tempfile */
+  ret = QUEUE_IS_USING_TEMP_FILE (queue);
+  gst_object_unref (GST_OBJECT (queue));
+
+  return ret;
+}
+
+/* sink currently only operates in push mode */
 static gboolean
 gst_queue_sink_activate_push (GstPad * pad, gboolean active)
 {
@@ -1429,12 +1494,14 @@ gst_queue_sink_activate_push (GstPad * pad, gboolean active)
 
   if (active) {
     GST_QUEUE_MUTEX_LOCK (queue);
+    GST_DEBUG_OBJECT (queue, "activating push mode");
     queue->srcresult = GST_FLOW_OK;
     reset_rate_timer (queue);
     GST_QUEUE_MUTEX_UNLOCK (queue);
   } else {
     /* unblock chain function */
     GST_QUEUE_MUTEX_LOCK (queue);
+    GST_DEBUG_OBJECT (queue, "deactivating push mode");
     queue->srcresult = GST_FLOW_WRONG_STATE;
     gst_queue_locked_flush (queue);
     GST_QUEUE_MUTEX_UNLOCK (queue);
@@ -1445,6 +1512,8 @@ gst_queue_sink_activate_push (GstPad * pad, gboolean active)
   return result;
 }
 
+/* src operating in push mode, we start a task on the source pad that pushes out
+ * buffers from the queue */
 static gboolean
 gst_queue_src_activate_push (GstPad * pad, gboolean active)
 {
@@ -1455,12 +1524,14 @@ gst_queue_src_activate_push (GstPad * pad, gboolean active)
 
   if (active) {
     GST_QUEUE_MUTEX_LOCK (queue);
+    GST_DEBUG_OBJECT (queue, "activating push mode");
     queue->srcresult = GST_FLOW_OK;
     result = gst_pad_start_task (pad, (GstTaskFunction) gst_queue_loop, pad);
     GST_QUEUE_MUTEX_UNLOCK (queue);
   } else {
     /* unblock loop function */
     GST_QUEUE_MUTEX_LOCK (queue);
+    GST_DEBUG_OBJECT (queue, "deactivating push mode");
     queue->srcresult = GST_FLOW_WRONG_STATE;
     /* the item add signal will unblock */
     g_cond_signal (queue->item_add);
@@ -1475,6 +1546,45 @@ gst_queue_src_activate_push (GstPad * pad, gboolean active)
   return result;
 }
 
+/* pull mode, downstream will call our getrange function */
+static gboolean
+gst_queue_src_activate_pull (GstPad * pad, gboolean active)
+{
+  gboolean result;
+  GstQueue *queue;
+
+  queue = GST_QUEUE (gst_pad_get_parent (pad));
+
+  if (active) {
+    if (QUEUE_IS_USING_TEMP_FILE (queue)) {
+      GST_QUEUE_MUTEX_LOCK (queue);
+      GST_DEBUG_OBJECT (queue, "activating pull mode");
+      queue->srcresult = GST_FLOW_OK;
+      result = TRUE;
+      GST_QUEUE_MUTEX_UNLOCK (queue);
+    } else {
+      GST_QUEUE_MUTEX_LOCK (queue);
+      GST_DEBUG_OBJECT (queue, "no temp file, cannot activate pull mode");
+      /* this is not allowed, we cannot operate in pull mode without a temp
+       * file. */
+      queue->srcresult = GST_FLOW_WRONG_STATE;
+      result = FALSE;
+      GST_QUEUE_MUTEX_UNLOCK (queue);
+    }
+  } else {
+    GST_QUEUE_MUTEX_LOCK (queue);
+    GST_DEBUG_OBJECT (queue, "deactivating pull mode");
+    queue->srcresult = GST_FLOW_WRONG_STATE;
+    /* this will unlock getrange */
+    g_cond_signal (queue->item_add);
+    result = TRUE;
+    GST_QUEUE_MUTEX_UNLOCK (queue);
+  }
+  gst_object_unref (queue);
+
+  return result;
+}
+
 static GstStateChangeReturn
 gst_queue_change_state (GstElement * element, GstStateChange transition)
 {