dataqueue: add gst_data_queue_push_force
authorThiago Santos <thiago.sousa.santos@collabora.com>
Thu, 8 Aug 2013 00:26:01 +0000 (21:26 -0300)
committerThiago Santos <thiago.sousa.santos@collabora.com>
Tue, 13 Aug 2013 15:00:48 +0000 (12:00 -0300)
Adds a variant of the _push function that doesn't check the queue limits
before adding the new item. It is useful when pushing an element to the
queue shouldn't lock the thread.

One particular scenario is when the queue is used to serialize buffers
and events that are going to be pushed from another thread. The
dataqueue should have a limit on the amount of buffers to be stored to
avoid large memory consumption, but events can be considered to have
negligible impact on memory compared to buffers. So it is useful to be
used to push items into the queue that contain events, even though the
queue is already full, it shouldn't matter inserting an item that has
no significative size.

This scenario happens on adaptive elements (dashdemux / mssdemux) as
there is a single download thread fetching buffers and putting into the
dataqueues for the streams. This same download thread can als generate
events in some situations as caps changes, eos or a internal control
events. There can be a deadlock at preroll if the first buffer fetched
is large enough to fill the dataqueue and the download thread and the
next iteration of the download thread decides to push an event to this
same dataqueue before fetching buffers to other streams, if this push
locks, the pipeline will be stuck in preroll as no more buffers will be
downloaded.
There is a somewhat common practice in dash streams to have a single
very large buffer for audio and one for video, so this will always
happen as the download thread will have to push an EOS right after
fetching the first buffer for any stream.

API: gst_data_queue_push_force

https://bugzilla.gnome.org/show_bug.cgi?id=705694

docs/libs/gstreamer-libs-sections.txt
libs/gst/base/gstdataqueue.c
libs/gst/base/gstdataqueue.h
win32/common/libgstbase.def

index cdcd21c..bbf7332 100644 (file)
@@ -761,6 +761,7 @@ GstDataQueueEmptyCallback
 GstDataQueueFullCallback
 gst_data_queue_new
 gst_data_queue_push
+gst_data_queue_push_force
 gst_data_queue_pop
 gst_data_queue_flush
 gst_data_queue_set_flushing
index 9a3ca9f..0c46f0c 100644 (file)
@@ -410,6 +410,68 @@ gst_data_queue_set_flushing (GstDataQueue * queue, gboolean flushing)
   GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
 }
 
+static void
+gst_data_queue_push_force_unlocked (GstDataQueue * queue,
+    GstDataQueueItem * item)
+{
+  GstDataQueuePrivate *priv = queue->priv;
+
+  gst_queue_array_push_tail (priv->queue, item);
+
+  if (item->visible)
+    priv->cur_level.visible++;
+  priv->cur_level.bytes += item->size;
+  priv->cur_level.time += item->duration;
+}
+
+/**
+ * gst_data_queue_push_force:
+ * @queue: a #GstDataQueue.
+ * @item: a #GstDataQueueItem.
+ *
+ * Pushes a #GstDataQueueItem (or a structure that begins with the same fields)
+ * on the @queue. It ignores if the @queue is full or not and forces the @item
+ * to be pushed anyway.
+ * MT safe.
+ *
+ * Note that this function has slightly different semantics than gst_pad_push()
+ * and gst_pad_push_event(): this function only takes ownership of @item and
+ * the #GstMiniObject contained in @item if the push was successful. If FALSE
+ * is returned, the caller is responsible for freeing @item and its contents.
+ *
+ * Returns: #TRUE if the @item was successfully pushed on the @queue.
+ *
+ * Since: 1.2.0
+ */
+gboolean
+gst_data_queue_push_force (GstDataQueue * queue, GstDataQueueItem * item)
+{
+  GstDataQueuePrivate *priv = queue->priv;
+
+  g_return_val_if_fail (GST_IS_DATA_QUEUE (queue), FALSE);
+  g_return_val_if_fail (item != NULL, FALSE);
+
+  GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);
+
+  STATUS (queue, "before pushing");
+  gst_data_queue_push_force_unlocked (queue, item);
+  STATUS (queue, "after pushing");
+  if (priv->waiting_add)
+    g_cond_signal (&priv->item_add);
+
+  GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
+
+  return TRUE;
+
+  /* ERRORS */
+flushing:
+  {
+    GST_DEBUG ("queue:%p, we are flushing", queue);
+    GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
+    return FALSE;
+  }
+}
+
 /**
  * gst_data_queue_push:
  * @queue: a #GstDataQueue.
@@ -460,12 +522,7 @@ gst_data_queue_push (GstDataQueue * queue, GstDataQueueItem * item)
     }
   }
 
-  gst_queue_array_push_tail (priv->queue, item);
-
-  if (item->visible)
-    priv->cur_level.visible++;
-  priv->cur_level.bytes += item->size;
-  priv->cur_level.time += item->duration;
+  gst_data_queue_push_force_unlocked (queue, item);
 
   STATUS (queue, "after pushing");
   if (priv->waiting_add)
index 3bd76f5..c707c8b 100644 (file)
@@ -139,6 +139,7 @@ GstDataQueue * gst_data_queue_new            (GstDataQueueCheckFullFunction chec
                                              gpointer checkdata) G_GNUC_MALLOC;
 
 gboolean       gst_data_queue_push           (GstDataQueue * queue, GstDataQueueItem * item);
+gboolean       gst_data_queue_push_force     (GstDataQueue * queue, GstDataQueueItem * item);
 
 gboolean       gst_data_queue_pop            (GstDataQueue * queue, GstDataQueueItem ** item);
 gboolean       gst_data_queue_peek           (GstDataQueue * queue, GstDataQueueItem ** item);
index 09a13ab..1b1db5f 100644 (file)
@@ -256,6 +256,7 @@ EXPORTS
        gst_data_queue_peek
        gst_data_queue_pop
        gst_data_queue_push
+       gst_data_queue_push_force
        gst_data_queue_set_flushing
        gst_push_src_get_type
        gst_queue_array_drop_element