*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
- * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
- * Boston, MA 02111-1307, USA.
+ * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
+ * Boston, MA 02110-1301, USA.
*/
/**
* SECTION:gstdataqueue
+ * @title: GstDataQueue
* @short_description: Threadsafe queueing object
*
* #GstDataQueue is an object that handles threadsafe queueing of objects. It
* also provides size-related functionality. This object should be used for
* any #GstElement that wishes to provide some sort of queueing functionality.
*/
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
#include <gst/gst.h>
#include "string.h"
};
#define GST_DATA_QUEUE_MUTEX_LOCK(q) G_STMT_START { \
- GST_CAT_LOG (data_queue_dataflow, \
+ GST_CAT_TRACE (data_queue_dataflow, \
"locking qlock from thread %p", \
g_thread_self ()); \
g_mutex_lock (&q->priv->qlock); \
- GST_CAT_LOG (data_queue_dataflow, \
+ GST_CAT_TRACE (data_queue_dataflow, \
"locked qlock from thread %p", \
g_thread_self ()); \
} G_STMT_END
} G_STMT_END
#define GST_DATA_QUEUE_MUTEX_UNLOCK(q) G_STMT_START { \
- GST_CAT_LOG (data_queue_dataflow, \
+ GST_CAT_TRACE (data_queue_dataflow, \
"unlocking qlock from thread %p", \
g_thread_self ()); \
g_mutex_unlock (&q->priv->qlock); \
}
#define parent_class gst_data_queue_parent_class
-G_DEFINE_TYPE_WITH_CODE (GstDataQueue, gst_data_queue, G_TYPE_OBJECT, _do_init);
+G_DEFINE_TYPE_WITH_CODE (GstDataQueue, gst_data_queue, G_TYPE_OBJECT,
+ G_ADD_PRIVATE (GstDataQueue) _do_init);
static void
gst_data_queue_class_init (GstDataQueueClass * klass)
{
GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
- g_type_class_add_private (klass, sizeof (GstDataQueuePrivate));
-
gobject_class->set_property = gst_data_queue_set_property;
gobject_class->get_property = gst_data_queue_get_property;
/* signals */
/**
- * GstDataQueue::empty:
+ * GstDataQueue::empty: (skip)
* @queue: the queue instance
*
* Reports that the queue became empty (empty).
g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0);
/**
- * GstDataQueue::full:
+ * GstDataQueue::full: (skip)
* @queue: the queue instance
*
* Reports that the queue became full (full).
static void
gst_data_queue_init (GstDataQueue * queue)
{
- queue->priv =
- G_TYPE_INSTANCE_GET_PRIVATE (queue, GST_TYPE_DATA_QUEUE,
- GstDataQueuePrivate);
+ queue->priv = gst_data_queue_get_instance_private (queue);
queue->priv->cur_level.visible = 0; /* no content */
queue->priv->cur_level.bytes = 0; /* no content */
}
/**
- * gst_data_queue_new:
+ * gst_data_queue_new: (skip)
* @checkfull: the callback used to tell if the element considers the queue full
* or not.
* @fullcallback: the callback which will be called when the queue is considered full.
* @emptycallback: the callback which will be called when the queue is considered empty.
- * @checkdata: a #gpointer that will be given in the @checkfull callback.
+ * @checkdata: a #gpointer that will be passed to the @checkfull, @fullcallback,
+ * and @emptycallback callbacks.
*
- * Creates a new #GstDataQueue. The difference with @gst_data_queue_new is that it will
- * not emit the 'full' and 'empty' signals, but instead calling directly @fullcallback
- * or @emptycallback.
+ * Creates a new #GstDataQueue. If @fullcallback or @emptycallback are supplied, then
+ * the #GstDataQueue will call the respective callback to signal full or empty condition.
+ * If the callbacks are NULL the #GstDataQueue will instead emit 'full' and 'empty'
+ * signals.
*
* Returns: a new #GstDataQueue.
*
- * Since: 1.2.0
+ * Since: 1.2
*/
GstDataQueue *
gst_data_queue_new (GstDataQueueCheckFullFunction checkfull,
g_return_val_if_fail (checkfull != NULL, NULL);
- ret = g_object_newv (GST_TYPE_DATA_QUEUE, 0, NULL);
+ ret = g_object_new (GST_TYPE_DATA_QUEUE, NULL);
ret->priv->checkfull = checkfull;
ret->priv->checkdata = checkdata;
ret->priv->fullcallback = fullcallback;
}
/**
- * gst_data_queue_flush:
+ * gst_data_queue_flush: (skip)
* @queue: a #GstDataQueue.
*
* Flushes all the contents of the @queue. Any call to #gst_data_queue_push and
* #gst_data_queue_pop will be released.
* MT safe.
*
- * Since: 1.2.0
+ * Since: 1.2
*/
void
gst_data_queue_flush (GstDataQueue * queue)
}
/**
- * gst_data_queue_is_empty:
+ * gst_data_queue_is_empty: (skip)
* @queue: a #GstDataQueue.
*
* Queries if there are any items in the @queue.
* MT safe.
*
- * Returns: #TRUE if @queue is empty.
+ * Returns: %TRUE if @queue is empty.
*
- * Since: 1.2.0
+ * Since: 1.2
*/
gboolean
gst_data_queue_is_empty (GstDataQueue * queue)
}
/**
- * gst_data_queue_is_full:
+ * gst_data_queue_is_full: (skip)
* @queue: a #GstDataQueue.
*
* Queries if @queue is full. This check will be done using the
* #GstDataQueueCheckFullFunction registered with @queue.
* MT safe.
*
- * Returns: #TRUE if @queue is full.
+ * Returns: %TRUE if @queue is full.
*
- * Since: 1.2.0
+ * Since: 1.2
*/
gboolean
gst_data_queue_is_full (GstDataQueue * queue)
}
/**
- * gst_data_queue_set_flushing:
+ * gst_data_queue_set_flushing: (skip)
* @queue: a #GstDataQueue.
* @flushing: a #gboolean stating if the queue will be flushing or not.
*
- * Sets the queue to flushing state if @flushing is #TRUE. If set to flushing
+ * Sets the queue to flushing state if @flushing is %TRUE. If set to flushing
* state, any incoming data on the @queue will be discarded. Any call currently
* blocking on #gst_data_queue_push or #gst_data_queue_pop will return straight
- * away with a return value of #FALSE. While the @queue is in flushing state,
- * all calls to those two functions will return #FALSE.
+ * away with a return value of %FALSE. While the @queue is in flushing state,
+ * all calls to those two functions will return %FALSE.
*
* MT Safe.
*
- * Since: 1.2.0
+ * Since: 1.2
*/
void
gst_data_queue_set_flushing (GstDataQueue * queue, gboolean flushing)
GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
}
+static void
+gst_data_queue_push_force_unlocked (GstDataQueue * queue,
+ GstDataQueueItem * item)
+{
+ GstDataQueuePrivate *priv = queue->priv;
+
+ gst_queue_array_push_tail (priv->queue, item);
+
+ if (item->visible)
+ priv->cur_level.visible++;
+ priv->cur_level.bytes += item->size;
+ priv->cur_level.time += item->duration;
+}
+
/**
- * gst_data_queue_push:
+ * gst_data_queue_push_force: (skip)
+ * @queue: a #GstDataQueue.
+ * @item: a #GstDataQueueItem.
+ *
+ * Pushes a #GstDataQueueItem (or a structure that begins with the same fields)
+ * on the @queue. It ignores if the @queue is full or not and forces the @item
+ * to be pushed anyway.
+ * MT safe.
+ *
+ * Note that this function has slightly different semantics than gst_pad_push()
+ * and gst_pad_push_event(): this function only takes ownership of @item and
+ * the #GstMiniObject contained in @item if the push was successful. If %FALSE
+ * is returned, the caller is responsible for freeing @item and its contents.
+ *
+ * Returns: %TRUE if the @item was successfully pushed on the @queue.
+ *
+ * Since: 1.2
+ */
+gboolean
+gst_data_queue_push_force (GstDataQueue * queue, GstDataQueueItem * item)
+{
+ GstDataQueuePrivate *priv = queue->priv;
+
+ g_return_val_if_fail (GST_IS_DATA_QUEUE (queue), FALSE);
+ g_return_val_if_fail (item != NULL, FALSE);
+
+ GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);
+
+ STATUS (queue, "before pushing");
+ gst_data_queue_push_force_unlocked (queue, item);
+ STATUS (queue, "after pushing");
+ if (priv->waiting_add)
+ g_cond_signal (&priv->item_add);
+
+ GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
+
+ return TRUE;
+
+ /* ERRORS */
+flushing:
+ {
+ GST_DEBUG ("queue:%p, we are flushing", queue);
+ GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
+ return FALSE;
+ }
+}
+
+/**
+ * gst_data_queue_push: (skip)
* @queue: a #GstDataQueue.
* @item: a #GstDataQueueItem.
*
*
* Note that this function has slightly different semantics than gst_pad_push()
* and gst_pad_push_event(): this function only takes ownership of @item and
- * the #GstMiniObject contained in @item if the push was successful. If FALSE
+ * the #GstMiniObject contained in @item if the push was successful. If %FALSE
* is returned, the caller is responsible for freeing @item and its contents.
*
- * Returns: #TRUE if the @item was successfully pushed on the @queue.
+ * Returns: %TRUE if the @item was successfully pushed on the @queue.
*
- * Since: 1.2.0
+ * Since: 1.2
*/
gboolean
gst_data_queue_push (GstDataQueue * queue, GstDataQueueItem * item)
}
}
- gst_queue_array_push_tail (priv->queue, item);
-
- if (item->visible)
- priv->cur_level.visible++;
- priv->cur_level.bytes += item->size;
- priv->cur_level.time += item->duration;
+ gst_data_queue_push_force_unlocked (queue, item);
STATUS (queue, "after pushing");
if (priv->waiting_add)
}
}
+static gboolean
+_gst_data_queue_wait_non_empty (GstDataQueue * queue)
+{
+ GstDataQueuePrivate *priv = queue->priv;
+
+ while (gst_data_queue_locked_is_empty (queue)) {
+ priv->waiting_add = TRUE;
+ g_cond_wait (&priv->item_add, &priv->qlock);
+ priv->waiting_add = FALSE;
+ if (priv->flushing)
+ return FALSE;
+ }
+ return TRUE;
+}
+
/**
- * gst_data_queue_pop:
+ * gst_data_queue_pop: (skip)
* @queue: a #GstDataQueue.
- * @item: pointer to store the returned #GstDataQueueItem.
+ * @item: (out): pointer to store the returned #GstDataQueueItem.
*
* Retrieves the first @item available on the @queue. If the queue is currently
* empty, the call will block until at least one item is available, OR the
* @queue is set to the flushing state.
* MT safe.
*
- * Returns: #TRUE if an @item was successfully retrieved from the @queue.
+ * Returns: %TRUE if an @item was successfully retrieved from the @queue.
*
- * Since: 1.2.0
+ * Since: 1.2
*/
gboolean
gst_data_queue_pop (GstDataQueue * queue, GstDataQueueItem ** item)
g_signal_emit (queue, gst_data_queue_signals[SIGNAL_EMPTY], 0);
GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);
- while (gst_data_queue_locked_is_empty (queue)) {
- priv->waiting_add = TRUE;
- g_cond_wait (&priv->item_add, &priv->qlock);
- priv->waiting_add = FALSE;
- if (priv->flushing)
- goto flushing;
- }
+ if (!_gst_data_queue_wait_non_empty (queue))
+ goto flushing;
}
/* Get the item from the GQueue */
static gint
is_of_type (gconstpointer a, gconstpointer b)
{
- return !G_TYPE_CHECK_INSTANCE_TYPE (a, GPOINTER_TO_INT (b));
+ return !G_TYPE_CHECK_INSTANCE_TYPE (a, GPOINTER_TO_SIZE (b));
+}
+
+/**
+ * gst_data_queue_peek: (skip)
+ * @queue: a #GstDataQueue.
+ * @item: (out): pointer to store the returned #GstDataQueueItem.
+ *
+ * Retrieves the first @item available on the @queue without removing it.
+ * If the queue is currently empty, the call will block until at least
+ * one item is available, OR the @queue is set to the flushing state.
+ * MT safe.
+ *
+ * Returns: %TRUE if an @item was successfully retrieved from the @queue.
+ *
+ * Since: 1.2
+ */
+gboolean
+gst_data_queue_peek (GstDataQueue * queue, GstDataQueueItem ** item)
+{
+ GstDataQueuePrivate *priv = queue->priv;
+
+ g_return_val_if_fail (GST_IS_DATA_QUEUE (queue), FALSE);
+ g_return_val_if_fail (item != NULL, FALSE);
+
+ GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);
+
+ STATUS (queue, "before peeking");
+
+ if (gst_data_queue_locked_is_empty (queue)) {
+ GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
+ if (G_LIKELY (priv->emptycallback))
+ priv->emptycallback (queue, priv->checkdata);
+ else
+ g_signal_emit (queue, gst_data_queue_signals[SIGNAL_EMPTY], 0);
+ GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing);
+
+ if (!_gst_data_queue_wait_non_empty (queue))
+ goto flushing;
+ }
+
+ /* Get the item from the GQueue */
+ *item = gst_queue_array_peek_head (priv->queue);
+
+ STATUS (queue, "after peeking");
+ GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
+
+ return TRUE;
+
+ /* ERRORS */
+flushing:
+ {
+ GST_DEBUG ("queue:%p, we are flushing", queue);
+ GST_DATA_QUEUE_MUTEX_UNLOCK (queue);
+ return FALSE;
+ }
}
/**
- * gst_data_queue_drop_head:
+ * gst_data_queue_drop_head: (skip)
* @queue: The #GstDataQueue to drop an item from.
* @type: The #GType of the item to drop.
*
* Pop and unref the head-most #GstMiniObject with the given #GType.
*
- * Returns: TRUE if an element was removed.
+ * Returns: %TRUE if an element was removed.
*
- * Since: 1.2.0
+ * Since: 1.2
*/
gboolean
gst_data_queue_drop_head (GstDataQueue * queue, GType type)
GST_DEBUG ("queue:%p", queue);
GST_DATA_QUEUE_MUTEX_LOCK (queue);
- idx = gst_queue_array_find (priv->queue, is_of_type, GINT_TO_POINTER (type));
+ idx = gst_queue_array_find (priv->queue, is_of_type, GSIZE_TO_POINTER (type));
if (idx == -1)
goto done;
}
/**
- * gst_data_queue_limits_changed:
- * @queue: The #GstDataQueue
+ * gst_data_queue_limits_changed: (skip)
+ * @queue: The #GstDataQueue
*
* Inform the queue that the limits for the fullness check have changed and that
- * any blocking gst_data_queue_push() should be unblocked to recheck the limts.
+ * any blocking gst_data_queue_push() should be unblocked to recheck the limits.
*
- * Since: 1.2.0
+ * Since: 1.2
*/
void
gst_data_queue_limits_changed (GstDataQueue * queue)
}
/**
- * gst_data_queue_get_level:
+ * gst_data_queue_get_level: (skip)
* @queue: The #GstDataQueue
- * @level: the location to store the result
+ * @level: (out): the location to store the result
*
* Get the current level of the queue.
*
- * Since: 1.2.0
+ * Since: 1.2
*/
void
gst_data_queue_get_level (GstDataQueue * queue, GstDataQueueSize * level)