From 09982c3c13b3271d1c99973b1f6e6e4cb475f1bb Mon Sep 17 00:00:00 2001 From: =?utf8?q?Sebastian=20Dr=C3=B6ge?= Date: Mon, 22 Oct 2012 10:13:20 +0200 Subject: [PATCH] dataqueue/queuearray: Make public API again These are actually used outside of coreelements nowadays. Also hide lots of internals and add padding and documentation. --- docs/libs/gstreamer-libs-docs.sgml | 2 + docs/libs/gstreamer-libs-sections.txt | 47 ++++ libs/gst/base/Makefile.am | 4 + {plugins/elements => libs/gst/base}/gstdataqueue.c | 259 ++++++++++++--------- {plugins/elements => libs/gst/base}/gstdataqueue.h | 42 +--- .../elements => libs/gst/base}/gstqueuearray.c | 181 +++++++++++--- libs/gst/base/gstqueuearray.h | 50 ++++ plugins/elements/Makefile.am | 4 - plugins/elements/gstmultiqueue.c | 2 +- plugins/elements/gstmultiqueue.h | 2 +- plugins/elements/gstqueue.c | 30 +-- plugins/elements/gstqueue.h | 4 +- plugins/elements/gstqueuearray.h | 61 ----- win32/common/libgstbase.def | 20 ++ 14 files changed, 456 insertions(+), 252 deletions(-) rename {plugins/elements => libs/gst/base}/gstdataqueue.c (73%) rename {plugins/elements => libs/gst/base}/gstdataqueue.h (79%) rename {plugins/elements => libs/gst/base}/gstqueuearray.c (59%) create mode 100644 libs/gst/base/gstqueuearray.h delete mode 100644 plugins/elements/gstqueuearray.h diff --git a/docs/libs/gstreamer-libs-docs.sgml b/docs/libs/gstreamer-libs-docs.sgml index 6ca7e92..0a1b3f8 100644 --- a/docs/libs/gstreamer-libs-docs.sgml +++ b/docs/libs/gstreamer-libs-docs.sgml @@ -45,6 +45,8 @@ + + diff --git a/docs/libs/gstreamer-libs-sections.txt b/docs/libs/gstreamer-libs-sections.txt index 642d7f1..03400ed 100644 --- a/docs/libs/gstreamer-libs-sections.txt +++ b/docs/libs/gstreamer-libs-sections.txt @@ -711,6 +711,53 @@ gst_type_find_helper_get_range_ext +
+gstdataqueue +GstDataQueue +gst/base/gstdataqueue.h +GstDataQueue +GstDataQueueSize +GstDataQueueCheckFullFunction +GstDataQueueItem +GstDataQueueEmptyCallback +GstDataQueueFullCallback +gst_data_queue_new +gst_data_queue_push +gst_data_queue_pop +gst_data_queue_flush +gst_data_queue_set_flushing +gst_data_queue_drop_head +gst_data_queue_is_full +gst_data_queue_is_empty +gst_data_queue_get_level +gst_data_queue_limits_changed + +GstDataQueueClass +GST_DATA_QUEUE +GST_IS_DATA_QUEUE +GST_TYPE_DATA_QUEUE +GST_DATA_QUEUE_CLASS +GST_IS_DATA_QUEUE_CLASS + +gst_data_queue_get_type +
+ +
+gstqueuearray +GstQueueArray +gst/base/gstqueuearray.h +GstQueueArray +gst_queue_array_new +gst_queue_array_free +gst_queue_array_get_length +gst_queue_array_pop_head +gst_queue_array_peek_head +gst_queue_array_push_tail +gst_queue_array_is_empty +gst_queue_array_drop_element +gst_queue_array_find +
+ # net
diff --git a/libs/gst/base/Makefile.am b/libs/gst/base/Makefile.am index ca7d4c8..e591fb2 100644 --- a/libs/gst/base/Makefile.am +++ b/libs/gst/base/Makefile.am @@ -12,7 +12,9 @@ libgstbase_@GST_API_VERSION@_la_SOURCES = \ gstbytereader.c \ gstbytewriter.c \ gstcollectpads.c \ + gstdataqueue.c \ gstpushsrc.c \ + gstqueuearray.c \ gsttypefindhelper.c libgstbase_@GST_API_VERSION@_la_CFLAGS = $(GST_OBJ_CFLAGS) @@ -32,7 +34,9 @@ libgstbase_@GST_API_VERSION@include_HEADERS = \ gstbytereader.h \ gstbytewriter.h \ gstcollectpads.h \ + gstdataqueue.h \ gstpushsrc.h \ + gstqueuearray.h \ gsttypefindhelper.h noinst_HEADERS = \ diff --git a/plugins/elements/gstdataqueue.c b/libs/gst/base/gstdataqueue.c similarity index 73% rename from plugins/elements/gstdataqueue.c rename to libs/gst/base/gstdataqueue.c index dff49c6..ebdc2c8 100644 --- a/plugins/elements/gstdataqueue.c +++ b/libs/gst/base/gstdataqueue.c @@ -31,6 +31,7 @@ #include #include "string.h" #include "gstdataqueue.h" +#include "gstqueuearray.h" #include "gst/glib-compat-private.h" GST_DEBUG_CATEGORY_STATIC (data_queue_debug); @@ -48,18 +49,38 @@ enum enum { - ARG_0, - ARG_CUR_LEVEL_VISIBLE, - ARG_CUR_LEVEL_BYTES, - ARG_CUR_LEVEL_TIME + PROP_0, + PROP_CUR_LEVEL_VISIBLE, + PROP_CUR_LEVEL_BYTES, + PROP_CUR_LEVEL_TIME /* FILL ME */ }; +struct _GstDataQueuePrivate +{ + /* the array of data we're keeping our grubby hands on */ + GstQueueArray *queue; + + GstDataQueueSize cur_level; /* size of the queue */ + GstDataQueueCheckFullFunction checkfull; /* Callback to check if the queue is full */ + gpointer *checkdata; + + GMutex qlock; /* lock for queue (vs object lock) */ + gboolean waiting_add; + GCond item_add; /* signals buffers now available for reading */ + gboolean waiting_del; + GCond item_del; /* signals space now available for writing */ + gboolean flushing; /* indicates whether conditions where signalled because + * of external flushing */ + GstDataQueueFullCallback fullcallback; + GstDataQueueEmptyCallback emptycallback; +}; + #define GST_DATA_QUEUE_MUTEX_LOCK(q) G_STMT_START { \ GST_CAT_LOG (data_queue_dataflow, \ "locking qlock from thread %p", \ g_thread_self ()); \ - g_mutex_lock (&q->qlock); \ + g_mutex_lock (&q->priv->qlock); \ GST_CAT_LOG (data_queue_dataflow, \ "locked qlock from thread %p", \ g_thread_self ()); \ @@ -67,7 +88,7 @@ enum #define GST_DATA_QUEUE_MUTEX_LOCK_CHECK(q, label) G_STMT_START { \ GST_DATA_QUEUE_MUTEX_LOCK (q); \ - if (q->flushing) \ + if (q->priv->flushing) \ goto label; \ } G_STMT_END @@ -75,7 +96,7 @@ enum GST_CAT_LOG (data_queue_dataflow, \ "unlocking qlock from thread %p", \ g_thread_self ()); \ - g_mutex_unlock (&q->qlock); \ + g_mutex_unlock (&q->priv->qlock); \ } G_STMT_END #define STATUS(q, msg) \ @@ -84,10 +105,10 @@ enum "bytes, %"G_GUINT64_FORMAT \ " ns, %u elements", \ queue, \ - q->cur_level.visible, \ - q->cur_level.bytes, \ - q->cur_level.time, \ - q->queue.length) + q->priv->cur_level.visible, \ + q->priv->cur_level.bytes, \ + q->priv->cur_level.time, \ + gst_queue_array_get_length (q->priv->queue)) static void gst_data_queue_finalize (GObject * object); @@ -96,7 +117,6 @@ static void gst_data_queue_set_property (GObject * object, static void gst_data_queue_get_property (GObject * object, guint prop_id, GValue * value, GParamSpec * pspec); -static GObjectClass *parent_class = NULL; static guint gst_data_queue_signals[LAST_SIGNAL] = { 0 }; #define _do_init \ @@ -107,7 +127,7 @@ static guint gst_data_queue_signals[LAST_SIGNAL] = { 0 }; "dataflow inside the data queue object"); \ } - +#define parent_class gst_data_queue_parent_class G_DEFINE_TYPE_WITH_CODE (GstDataQueue, gst_data_queue, G_TYPE_OBJECT, _do_init); static void @@ -115,7 +135,7 @@ gst_data_queue_class_init (GstDataQueueClass * klass) { GObjectClass *gobject_class = G_OBJECT_CLASS (klass); - parent_class = g_type_class_peek_parent (klass); + g_type_class_add_private (klass, sizeof (GstDataQueuePrivate)); gobject_class->set_property = gst_data_queue_set_property; gobject_class->get_property = gst_data_queue_get_property; @@ -150,16 +170,16 @@ gst_data_queue_class_init (GstDataQueueClass * klass) g_cclosure_marshal_VOID__VOID, G_TYPE_NONE, 0); /* properties */ - g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_BYTES, + g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_BYTES, g_param_spec_uint ("current-level-bytes", "Current level (kB)", "Current amount of data in the queue (bytes)", 0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_VISIBLE, + g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_VISIBLE, g_param_spec_uint ("current-level-visible", "Current level (visible items)", "Current number of visible items in the queue", 0, G_MAXUINT, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); - g_object_class_install_property (gobject_class, ARG_CUR_LEVEL_TIME, + g_object_class_install_property (gobject_class, PROP_CUR_LEVEL_TIME, g_param_spec_uint64 ("current-level-time", "Current level (ns)", "Current amount of data in the queue (in ns)", 0, G_MAXUINT64, 0, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); @@ -170,22 +190,26 @@ gst_data_queue_class_init (GstDataQueueClass * klass) static void gst_data_queue_init (GstDataQueue * queue) { - queue->cur_level.visible = 0; /* no content */ - queue->cur_level.bytes = 0; /* no content */ - queue->cur_level.time = 0; /* no content */ + queue->priv = + G_TYPE_INSTANCE_GET_PRIVATE (queue, GST_TYPE_DATA_QUEUE, + GstDataQueuePrivate); + + queue->priv->cur_level.visible = 0; /* no content */ + queue->priv->cur_level.bytes = 0; /* no content */ + queue->priv->cur_level.time = 0; /* no content */ - queue->checkfull = NULL; + queue->priv->checkfull = NULL; - g_mutex_init (&queue->qlock); - g_cond_init (&queue->item_add); - g_cond_init (&queue->item_del); - gst_queue_array_init (&queue->queue, 50); + g_mutex_init (&queue->priv->qlock); + g_cond_init (&queue->priv->item_add); + g_cond_init (&queue->priv->item_del); + queue->priv->queue = gst_queue_array_new (50); GST_DEBUG ("initialized queue's not_empty & not_full conditions"); } /** - * gst_data_queue_new_full: + * gst_data_queue_new: * @checkfull: the callback used to tell if the element considers the queue full * or not. * @fullcallback: the callback which will be called when the queue is considered full. @@ -197,10 +221,11 @@ gst_data_queue_init (GstDataQueue * queue) * or @emptycallback. * * Returns: a new #GstDataQueue. + * + * Since: 1.2.0 */ - GstDataQueue * -gst_data_queue_new_full (GstDataQueueCheckFullFunction checkfull, +gst_data_queue_new (GstDataQueueCheckFullFunction checkfull, GstDataQueueFullCallback fullcallback, GstDataQueueEmptyCallback emptycallback, gpointer checkdata) { @@ -209,41 +234,28 @@ gst_data_queue_new_full (GstDataQueueCheckFullFunction checkfull, g_return_val_if_fail (checkfull != NULL, NULL); ret = g_object_newv (GST_TYPE_DATA_QUEUE, 0, NULL); - ret->checkfull = checkfull; - ret->checkdata = checkdata; - ret->fullcallback = fullcallback; - ret->emptycallback = emptycallback; + ret->priv->checkfull = checkfull; + ret->priv->checkdata = checkdata; + ret->priv->fullcallback = fullcallback; + ret->priv->emptycallback = emptycallback; return ret; } -/** - * gst_data_queue_new: - * @checkfull: the callback used to tell if the element considers the queue full - * or not. - * @checkdata: a #gpointer that will be given in the @checkfull callback. - * - * Returns: a new #GstDataQueue. - */ - -GstDataQueue * -gst_data_queue_new (GstDataQueueCheckFullFunction checkfull, gpointer checkdata) -{ - return gst_data_queue_new_full (checkfull, NULL, NULL, checkdata); -} - static void gst_data_queue_cleanup (GstDataQueue * queue) { - while (!gst_queue_array_is_empty (&queue->queue)) { - GstDataQueueItem *item = gst_queue_array_pop_head (&queue->queue); + GstDataQueuePrivate *priv = queue->priv; + + while (!gst_queue_array_is_empty (priv->queue)) { + GstDataQueueItem *item = gst_queue_array_pop_head (priv->queue); /* Just call the destroy notify on the item */ item->destroy (item); } - queue->cur_level.visible = 0; - queue->cur_level.bytes = 0; - queue->cur_level.time = 0; + priv->cur_level.visible = 0; + priv->cur_level.bytes = 0; + priv->cur_level.time = 0; } /* called only once, as opposed to dispose */ @@ -251,18 +263,19 @@ static void gst_data_queue_finalize (GObject * object) { GstDataQueue *queue = GST_DATA_QUEUE (object); + GstDataQueuePrivate *priv = queue->priv; GST_DEBUG ("finalizing queue"); gst_data_queue_cleanup (queue); - gst_queue_array_clear (&queue->queue); + gst_queue_array_free (priv->queue); GST_DEBUG ("free mutex"); - g_mutex_clear (&queue->qlock); + g_mutex_clear (&priv->qlock); GST_DEBUG ("done free mutex"); - g_cond_clear (&queue->item_add); - g_cond_clear (&queue->item_del); + g_cond_clear (&priv->item_add); + g_cond_clear (&priv->item_del); G_OBJECT_CLASS (parent_class)->finalize (object); } @@ -270,25 +283,31 @@ gst_data_queue_finalize (GObject * object) static inline void gst_data_queue_locked_flush (GstDataQueue * queue) { + GstDataQueuePrivate *priv = queue->priv; + STATUS (queue, "before flushing"); gst_data_queue_cleanup (queue); STATUS (queue, "after flushing"); /* we deleted something... */ - if (queue->waiting_del) - g_cond_signal (&queue->item_del); + if (priv->waiting_del) + g_cond_signal (&priv->item_del); } static inline gboolean gst_data_queue_locked_is_empty (GstDataQueue * queue) { - return (queue->queue.length == 0); + GstDataQueuePrivate *priv = queue->priv; + + return (gst_queue_array_get_length (priv->queue) == 0); } static inline gboolean gst_data_queue_locked_is_full (GstDataQueue * queue) { - return queue->checkfull (queue, queue->cur_level.visible, - queue->cur_level.bytes, queue->cur_level.time, queue->checkdata); + GstDataQueuePrivate *priv = queue->priv; + + return priv->checkfull (queue, priv->cur_level.visible, + priv->cur_level.bytes, priv->cur_level.time, priv->checkdata); } /** @@ -298,6 +317,8 @@ gst_data_queue_locked_is_full (GstDataQueue * queue) * Flushes all the contents of the @queue. Any call to #gst_data_queue_push and * #gst_data_queue_pop will be released. * MT safe. + * + * Since: 1.2.0 */ void gst_data_queue_flush (GstDataQueue * queue) @@ -316,6 +337,8 @@ gst_data_queue_flush (GstDataQueue * queue) * MT safe. * * Returns: #TRUE if @queue is empty. + * + * Since: 1.2.0 */ gboolean gst_data_queue_is_empty (GstDataQueue * queue) @@ -338,6 +361,8 @@ gst_data_queue_is_empty (GstDataQueue * queue) * MT safe. * * Returns: #TRUE if @queue is full. + * + * Since: 1.2.0 */ gboolean gst_data_queue_is_full (GstDataQueue * queue) @@ -363,20 +388,24 @@ gst_data_queue_is_full (GstDataQueue * queue) * all calls to those two functions will return #FALSE. * * MT Safe. + * + * Since: 1.2.0 */ void gst_data_queue_set_flushing (GstDataQueue * queue, gboolean flushing) { + GstDataQueuePrivate *priv = queue->priv; + GST_DEBUG ("queue:%p , flushing:%d", queue, flushing); GST_DATA_QUEUE_MUTEX_LOCK (queue); - queue->flushing = flushing; + priv->flushing = flushing; if (flushing) { /* release push/pop functions */ - if (queue->waiting_add) - g_cond_signal (&queue->item_add); - if (queue->waiting_del) - g_cond_signal (&queue->item_del); + if (priv->waiting_add) + g_cond_signal (&priv->item_add); + if (priv->waiting_del) + g_cond_signal (&priv->item_del); } GST_DATA_QUEUE_MUTEX_UNLOCK (queue); } @@ -397,10 +426,14 @@ gst_data_queue_set_flushing (GstDataQueue * queue, gboolean flushing) * is returned, the caller is responsible for freeing @item and its contents. * * Returns: #TRUE if the @item was successfully pushed on the @queue. + * + * Since: 1.2.0 */ gboolean gst_data_queue_push (GstDataQueue * queue, GstDataQueueItem * item) { + GstDataQueuePrivate *priv = queue->priv; + g_return_val_if_fail (GST_IS_DATA_QUEUE (queue), FALSE); g_return_val_if_fail (item != NULL, FALSE); @@ -411,32 +444,32 @@ gst_data_queue_push (GstDataQueue * queue, GstDataQueueItem * item) /* We ALWAYS need to check for queue fillness */ if (gst_data_queue_locked_is_full (queue)) { GST_DATA_QUEUE_MUTEX_UNLOCK (queue); - if (G_LIKELY (queue->fullcallback)) - queue->fullcallback (queue, queue->checkdata); + if (G_LIKELY (priv->fullcallback)) + priv->fullcallback (queue, priv->checkdata); else g_signal_emit (queue, gst_data_queue_signals[SIGNAL_FULL], 0); GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing); /* signal might have removed some items */ while (gst_data_queue_locked_is_full (queue)) { - queue->waiting_del = TRUE; - g_cond_wait (&queue->item_del, &queue->qlock); - queue->waiting_del = FALSE; - if (queue->flushing) + priv->waiting_del = TRUE; + g_cond_wait (&priv->item_del, &priv->qlock); + priv->waiting_del = FALSE; + if (priv->flushing) goto flushing; } } - gst_queue_array_push_tail (&queue->queue, item); + gst_queue_array_push_tail (priv->queue, item); if (item->visible) - queue->cur_level.visible++; - queue->cur_level.bytes += item->size; - queue->cur_level.time += item->duration; + priv->cur_level.visible++; + priv->cur_level.bytes += item->size; + priv->cur_level.time += item->duration; STATUS (queue, "after pushing"); - if (queue->waiting_add) - g_cond_signal (&queue->item_add); + if (priv->waiting_add) + g_cond_signal (&priv->item_add); GST_DATA_QUEUE_MUTEX_UNLOCK (queue); @@ -462,10 +495,14 @@ flushing: * MT safe. * * Returns: #TRUE if an @item was successfully retrieved from the @queue. + * + * Since: 1.2.0 */ gboolean gst_data_queue_pop (GstDataQueue * queue, GstDataQueueItem ** item) { + GstDataQueuePrivate *priv = queue->priv; + g_return_val_if_fail (GST_IS_DATA_QUEUE (queue), FALSE); g_return_val_if_fail (item != NULL, FALSE); @@ -475,33 +512,33 @@ gst_data_queue_pop (GstDataQueue * queue, GstDataQueueItem ** item) if (gst_data_queue_locked_is_empty (queue)) { GST_DATA_QUEUE_MUTEX_UNLOCK (queue); - if (G_LIKELY (queue->emptycallback)) - queue->emptycallback (queue, queue->checkdata); + if (G_LIKELY (priv->emptycallback)) + priv->emptycallback (queue, priv->checkdata); else g_signal_emit (queue, gst_data_queue_signals[SIGNAL_EMPTY], 0); GST_DATA_QUEUE_MUTEX_LOCK_CHECK (queue, flushing); while (gst_data_queue_locked_is_empty (queue)) { - queue->waiting_add = TRUE; - g_cond_wait (&queue->item_add, &queue->qlock); - queue->waiting_add = FALSE; - if (queue->flushing) + priv->waiting_add = TRUE; + g_cond_wait (&priv->item_add, &priv->qlock); + priv->waiting_add = FALSE; + if (priv->flushing) goto flushing; } } /* Get the item from the GQueue */ - *item = gst_queue_array_pop_head (&queue->queue); + *item = gst_queue_array_pop_head (priv->queue); /* update current level counter */ if ((*item)->visible) - queue->cur_level.visible--; - queue->cur_level.bytes -= (*item)->size; - queue->cur_level.time -= (*item)->duration; + priv->cur_level.visible--; + priv->cur_level.bytes -= (*item)->size; + priv->cur_level.time -= (*item)->duration; STATUS (queue, "after popping"); - if (queue->waiting_del) - g_cond_signal (&queue->item_del); + if (priv->waiting_del) + g_cond_signal (&priv->item_del); GST_DATA_QUEUE_MUTEX_UNLOCK (queue); @@ -530,6 +567,8 @@ is_of_type (gconstpointer a, gconstpointer b) * Pop and unref the head-most #GstMiniObject with the given #GType. * * Returns: TRUE if an element was removed. + * + * Since: 1.2.0 */ gboolean gst_data_queue_drop_head (GstDataQueue * queue, GType type) @@ -537,25 +576,24 @@ gst_data_queue_drop_head (GstDataQueue * queue, GType type) gboolean res = FALSE; GstDataQueueItem *leak = NULL; guint idx; + GstDataQueuePrivate *priv = queue->priv; g_return_val_if_fail (GST_IS_DATA_QUEUE (queue), FALSE); GST_DEBUG ("queue:%p", queue); GST_DATA_QUEUE_MUTEX_LOCK (queue); - idx = - gst_queue_array_find (&queue->queue, is_of_type, GINT_TO_POINTER (type)); + idx = gst_queue_array_find (priv->queue, is_of_type, GINT_TO_POINTER (type)); if (idx == -1) goto done; - leak = queue->queue.array[idx]; - gst_queue_array_drop_element (&queue->queue, idx); + leak = gst_queue_array_drop_element (priv->queue, idx); if (leak->visible) - queue->cur_level.visible--; - queue->cur_level.bytes -= leak->size; - queue->cur_level.time -= leak->duration; + priv->cur_level.visible--; + priv->cur_level.bytes -= leak->size; + priv->cur_level.time -= leak->duration; leak->destroy (leak); @@ -575,16 +613,20 @@ done: * * Inform the queue that the limits for the fullness check have changed and that * any blocking gst_data_queue_push() should be unblocked to recheck the limts. + * + * Since: 1.2.0 */ void gst_data_queue_limits_changed (GstDataQueue * queue) { + GstDataQueuePrivate *priv = queue->priv; + g_return_if_fail (GST_IS_DATA_QUEUE (queue)); GST_DATA_QUEUE_MUTEX_LOCK (queue); - if (queue->waiting_del) { + if (priv->waiting_del) { GST_DEBUG ("signal del"); - g_cond_signal (&queue->item_del); + g_cond_signal (&priv->item_del); } GST_DATA_QUEUE_MUTEX_UNLOCK (queue); } @@ -595,11 +637,15 @@ gst_data_queue_limits_changed (GstDataQueue * queue) * @level: the location to store the result * * Get the current level of the queue. + * + * Since: 1.2.0 */ void gst_data_queue_get_level (GstDataQueue * queue, GstDataQueueSize * level) { - memcpy (level, (&queue->cur_level), sizeof (GstDataQueueSize)); + GstDataQueuePrivate *priv = queue->priv; + + memcpy (level, (&priv->cur_level), sizeof (GstDataQueueSize)); } static void @@ -618,18 +664,19 @@ gst_data_queue_get_property (GObject * object, guint prop_id, GValue * value, GParamSpec * pspec) { GstDataQueue *queue = GST_DATA_QUEUE (object); + GstDataQueuePrivate *priv = queue->priv; GST_DATA_QUEUE_MUTEX_LOCK (queue); switch (prop_id) { - case ARG_CUR_LEVEL_BYTES: - g_value_set_uint (value, queue->cur_level.bytes); + case PROP_CUR_LEVEL_BYTES: + g_value_set_uint (value, priv->cur_level.bytes); break; - case ARG_CUR_LEVEL_VISIBLE: - g_value_set_uint (value, queue->cur_level.visible); + case PROP_CUR_LEVEL_VISIBLE: + g_value_set_uint (value, priv->cur_level.visible); break; - case ARG_CUR_LEVEL_TIME: - g_value_set_uint64 (value, queue->cur_level.time); + case PROP_CUR_LEVEL_TIME: + g_value_set_uint64 (value, priv->cur_level.time); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec); diff --git a/plugins/elements/gstdataqueue.h b/libs/gst/base/gstdataqueue.h similarity index 79% rename from plugins/elements/gstdataqueue.h rename to libs/gst/base/gstdataqueue.h index 52fdeaf..3c21268 100644 --- a/plugins/elements/gstdataqueue.h +++ b/libs/gst/base/gstdataqueue.h @@ -24,7 +24,6 @@ #define __GST_DATA_QUEUE_H__ #include -#include "gstqueuearray.h" G_BEGIN_DECLS #define GST_TYPE_DATA_QUEUE \ @@ -41,6 +40,7 @@ typedef struct _GstDataQueue GstDataQueue; typedef struct _GstDataQueueClass GstDataQueueClass; typedef struct _GstDataQueueSize GstDataQueueSize; typedef struct _GstDataQueueItem GstDataQueueItem; +typedef struct _GstDataQueuePrivate GstDataQueuePrivate; /** * GstDataQueueItem: @@ -66,6 +66,9 @@ struct _GstDataQueueItem /* user supplied destroy function */ GDestroyNotify destroy; + + /* < private > */ + gpointer _gst_reserved[GST_PADDING]; }; /** @@ -113,24 +116,8 @@ struct _GstDataQueue GObject object; /*< private >*/ - /* the array of data we're keeping our grubby hands on */ - GstQueueArray queue; - - GstDataQueueSize cur_level; /* size of the queue */ - GstDataQueueCheckFullFunction checkfull; /* Callback to check if the queue is full */ - gpointer *checkdata; - - GMutex qlock; /* lock for queue (vs object lock) */ - gboolean waiting_add; - GCond item_add; /* signals buffers now available for reading */ - gboolean waiting_del; - GCond item_del; /* signals space now available for writing */ - gboolean flushing; /* indicates whether conditions where signalled because - * of external flushing */ - GstDataQueueFullCallback fullcallback; - GstDataQueueEmptyCallback emptycallback; - - /* gpointer _gst_reserved[GST_PADDING]; */ + GstDataQueuePrivate *priv; + gpointer _gst_reserved[GST_PADDING]; }; struct _GstDataQueueClass @@ -141,47 +128,32 @@ struct _GstDataQueueClass void (*empty) (GstDataQueue * queue); void (*full) (GstDataQueue * queue); - /* gpointer _gst_reserved[GST_PADDING]; */ + gpointer _gst_reserved[GST_PADDING]; }; -G_GNUC_INTERNAL GType gst_data_queue_get_type (void); -G_GNUC_INTERNAL GstDataQueue * gst_data_queue_new (GstDataQueueCheckFullFunction checkfull, - gpointer checkdata) G_GNUC_MALLOC; - -G_GNUC_INTERNAL -GstDataQueue * gst_data_queue_new_full (GstDataQueueCheckFullFunction checkfull, GstDataQueueFullCallback fullcallback, GstDataQueueEmptyCallback emptycallback, gpointer checkdata) G_GNUC_MALLOC; -G_GNUC_INTERNAL gboolean gst_data_queue_push (GstDataQueue * queue, GstDataQueueItem * item); -G_GNUC_INTERNAL gboolean gst_data_queue_pop (GstDataQueue * queue, GstDataQueueItem ** item); -G_GNUC_INTERNAL void gst_data_queue_flush (GstDataQueue * queue); -G_GNUC_INTERNAL void gst_data_queue_set_flushing (GstDataQueue * queue, gboolean flushing); -G_GNUC_INTERNAL gboolean gst_data_queue_drop_head (GstDataQueue * queue, GType type); -G_GNUC_INTERNAL gboolean gst_data_queue_is_full (GstDataQueue * queue); -G_GNUC_INTERNAL gboolean gst_data_queue_is_empty (GstDataQueue * queue); -G_GNUC_INTERNAL void gst_data_queue_get_level (GstDataQueue * queue, GstDataQueueSize *level); -G_GNUC_INTERNAL void gst_data_queue_limits_changed (GstDataQueue * queue); G_END_DECLS diff --git a/plugins/elements/gstqueuearray.c b/libs/gst/base/gstqueuearray.c similarity index 59% rename from plugins/elements/gstqueuearray.c rename to libs/gst/base/gstqueuearray.c index f16f7ae..b71df08 100644 --- a/plugins/elements/gstqueuearray.c +++ b/libs/gst/base/gstqueuearray.c @@ -19,31 +19,82 @@ * Boston, MA 02111-1307, USA. */ +/** + * SECTION:gstqueuearray + * @short_description: Array based queue object + * + * #GstQueueArray is an object that provides standard queue functionality + * based on an array instead of linked lists. This reduces the overhead + * caused by memory managment by a large factor. + */ + + #include #include #include "gstqueuearray.h" -void -gst_queue_array_init (GstQueueArray * array, guint initial_size) +struct _GstQueueArray { + /* < private > */ + gpointer *array; + guint size; + guint head; + guint tail; + guint length; +}; + +/** + * gst_queue_array_new: + * @initial_size: Initial size of the new queue + * + * Allocates a new #GstQueueArray object with an initial + * queue size of @initial_size. + * + * Returns: a new #GstQueueArray object + * + * Since: 1.2.0 + */ +GstQueueArray * +gst_queue_array_new (guint initial_size) +{ + GstQueueArray *array; + + array = g_slice_new (GstQueueArray); array->size = initial_size; array->array = g_new0 (gpointer, initial_size); array->head = 0; array->tail = 0; array->length = 0; - + return array; } -GstQueueArray * -gst_queue_array_new (guint initial_size) -{ - GstQueueArray *array; - array = g_new (GstQueueArray, 1); - gst_queue_array_init (array, initial_size); - return array; +/** + * gst_queue_array_free: + * @array: a #GstQueueArray object + * + * Frees queue @array and all memory associated to it. + * + * Since: 1.2.0 + */ +void +gst_queue_array_free (GstQueueArray * array) +{ + g_free (array->array); + g_slice_free (GstQueueArray, array); } +/** + * gst_queue_array_pop_head: + * @array: a #GstQueueArray object + * + * Returns and head of the queue @array and removes + * it from the queue. + * + * Returns: The head of the queue + * + * Since: 1.2.0 + */ gpointer gst_queue_array_pop_head (GstQueueArray * array) { @@ -59,6 +110,35 @@ gst_queue_array_pop_head (GstQueueArray * array) return ret; } +/** + * gst_queue_array_pop_head: + * @array: a #GstQueueArray object + * + * Returns and head of the queue @array and does not + * remove it from the queue. + * + * Returns: The head of the queue + * + * Since: 1.2.0 + */ +gpointer +gst_queue_array_peek_head (GstQueueArray * array) +{ + /* empty array */ + if (G_UNLIKELY (array->length == 0)) + return NULL; + return array->array[array->head]; +} + +/** + * gst_queue_array_push_tail: + * @array: a #GstQueueArray object + * @data: object to push + * + * Pushes @data to the tail of the queue @array. + * + * Since: 1.2.0 + */ void gst_queue_array_push_tail (GstQueueArray * array, gpointer data) { @@ -102,43 +182,56 @@ gst_queue_array_push_tail (GstQueueArray * array, gpointer data) array->length++; } +/** + * gst_queue_array_is_empty: + * @array: a #GstQueueArray object + * + * Checks if the queue @array is empty. + * + * Returns: %TRUE if the queue @array is empty + * + * Since: 1.2.0 + */ gboolean gst_queue_array_is_empty (GstQueueArray * array) { return (array->length == 0); } -void -gst_queue_array_clear (GstQueueArray * array) -{ - g_free (array->array); -} - -void -gst_queue_array_free (GstQueueArray * array) -{ - gst_queue_array_clear (array); - g_free (array); -} - -void +/** + * gst_queue_array_drop_element: + * @array: a #GstQueueArray object + * @idx: index to drop + * + * Drops the queue element at position @idx from queue @array. + * + * Returns: the dropped element + * + * Since: 1.2.0 + */ +gpointer gst_queue_array_drop_element (GstQueueArray * array, guint idx) { + gpointer element; + if (idx == array->head) { /* just move the head */ + element = array->array[idx]; array->head++; array->head %= array->size; - return; + return element; } if (idx == array->tail - 1) { /* just move the tail */ + element = array->array[idx]; array->tail = (array->tail - 1 + array->size) % array->size; - return; + return element; } /* drop the element #idx... and readjust the array */ if (array->head < array->tail) { /* Make sure it's within the boundaries */ g_assert (array->head < idx && idx <= array->tail); + element = array->array[idx]; /* ends not wrapped */ /* move head-idx to head+1 */ memcpy (&array->array[array->head + 1], @@ -147,21 +240,39 @@ gst_queue_array_drop_element (GstQueueArray * array, guint idx) } else { /* ends are wrapped */ if (idx < array->tail) { + element = array->array[idx]; /* move idx-tail backwards one */ memcpy (&array->array[idx - 1], &array->array[idx], (array->tail - idx) * sizeof (gpointer)); array->tail--; } else if (idx >= array->head) { + element = array->array[idx]; /* move head-idx forwards one */ memcpy (&array->array[array->head], &array->array[array->head + 1], (idx - array->head) * sizeof (gpointer)); array->head++; - } else + } else { g_assert_not_reached (); + element = NULL; + } } + return element; } +/** + * gst_queue_array_find: + * @array: a #GstQueueArray object + * @func: comparison function + * @data: data for comparison function + * + * Finds an element in the queue @array by comparing every element + * with @func and returning the index of the found element. + * + * Returns: Index of the found element or -1 if nothing was found. + * + * Since: 1.2.0 + */ guint gst_queue_array_find (GstQueueArray * array, GCompareFunc func, gpointer data) { @@ -173,3 +284,19 @@ gst_queue_array_find (GstQueueArray * array, GCompareFunc func, gpointer data) return i; return -1; } + +/** + * gst_queue_array_get_length: + * @array: a #GstQueueArray object + * + * Returns the length of the queue @array + * + * Returns: the length of the queue @array. + * + * Since: 1.2.0 + */ +guint +gst_queue_array_get_length (GstQueueArray * array) +{ + return array->length; +} diff --git a/libs/gst/base/gstqueuearray.h b/libs/gst/base/gstqueuearray.h new file mode 100644 index 0000000..6b7ac54 --- /dev/null +++ b/libs/gst/base/gstqueuearray.h @@ -0,0 +1,50 @@ +/* GStreamer + * Copyright (C) 2009-2010 Edward Hervey + * + * gstqueuearray.h: + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ + +#include + +#ifndef __GST_QUEUE_ARRAY_H__ +#define __GST_QUEUE_ARRAY_H__ + +typedef struct _GstQueueArray GstQueueArray; + +GstQueueArray * gst_queue_array_new (guint initial_size); + +void gst_queue_array_free (GstQueueArray * array); + +gpointer gst_queue_array_pop_head (GstQueueArray * array); +gpointer gst_queue_array_peek_head (GstQueueArray * array); + +void gst_queue_array_push_tail (GstQueueArray * array, + gpointer data); + +gboolean gst_queue_array_is_empty (GstQueueArray * array); + +gpointer gst_queue_array_drop_element (GstQueueArray * array, + guint idx); + +guint gst_queue_array_find (GstQueueArray * array, + GCompareFunc func, + gpointer data); + +guint gst_queue_array_get_length (GstQueueArray * array); + +#endif diff --git a/plugins/elements/Makefile.am b/plugins/elements/Makefile.am index 5e0230c..8add0bd 100644 --- a/plugins/elements/Makefile.am +++ b/plugins/elements/Makefile.am @@ -15,9 +15,7 @@ libgstcoreelements_la_SOURCES = \ gstidentity.c \ gstinputselector.c \ gstoutputselector.c \ - gstdataqueue.c \ gstmultiqueue.c \ - gstqueuearray.c \ gstqueue.c \ gstqueue2.c \ gsttee.c \ @@ -43,9 +41,7 @@ noinst_HEADERS = \ gstidentity.h \ gstinputselector.h \ gstoutputselector.h \ - gstdataqueue.h \ gstmultiqueue.h \ - gstqueuearray.h \ gstqueue.h \ gstqueue2.h \ gsttee.h \ diff --git a/plugins/elements/gstmultiqueue.c b/plugins/elements/gstmultiqueue.c index 8afcebb..22686ab 100644 --- a/plugins/elements/gstmultiqueue.c +++ b/plugins/elements/gstmultiqueue.c @@ -1983,7 +1983,7 @@ gst_single_queue_new (GstMultiQueue * mqueue, guint id) sq->mqueue = mqueue; sq->srcresult = GST_FLOW_FLUSHING; sq->pushed = FALSE; - sq->queue = gst_data_queue_new_full ((GstDataQueueCheckFullFunction) + sq->queue = gst_data_queue_new ((GstDataQueueCheckFullFunction) single_queue_check_full, (GstDataQueueFullCallback) single_queue_overrun_cb, (GstDataQueueEmptyCallback) single_queue_underrun_cb, sq); diff --git a/plugins/elements/gstmultiqueue.h b/plugins/elements/gstmultiqueue.h index 986dc22..8053ef9 100644 --- a/plugins/elements/gstmultiqueue.h +++ b/plugins/elements/gstmultiqueue.h @@ -24,7 +24,7 @@ #define __GST_MULTI_QUEUE_H__ #include -#include "gstdataqueue.h" +#include G_BEGIN_DECLS diff --git a/plugins/elements/gstqueue.c b/plugins/elements/gstqueue.c index 2989823..8b06fe1 100644 --- a/plugins/elements/gstqueue.c +++ b/plugins/elements/gstqueue.c @@ -92,7 +92,7 @@ GST_DEBUG_CATEGORY_STATIC (queue_dataflow); queue->cur_level.time, \ queue->min_threshold.time, \ queue->max_size.time, \ - queue->queue.length) + gst_queue_array_get_length (queue->queue)) /* Queue signals and args */ enum @@ -417,7 +417,7 @@ gst_queue_init (GstQueue * queue) g_cond_init (&queue->item_add); g_cond_init (&queue->item_del); - gst_queue_array_init (&queue->queue, DEFAULT_MAX_SIZE_BUFFERS * 3 / 2); + queue->queue = gst_queue_array_new (DEFAULT_MAX_SIZE_BUFFERS * 3 / 2); queue->sinktime = GST_CLOCK_TIME_NONE; queue->srctime = GST_CLOCK_TIME_NONE; @@ -440,13 +440,13 @@ gst_queue_finalize (GObject * object) GST_DEBUG_OBJECT (queue, "finalizing queue"); - while (!gst_queue_array_is_empty (&queue->queue)) { - data = gst_queue_array_pop_head (&queue->queue); + while (!gst_queue_array_is_empty (queue->queue)) { + data = gst_queue_array_pop_head (queue->queue); /* FIXME: if it's a query, shouldn't we unref that too? */ if (!GST_IS_QUERY (data)) gst_mini_object_unref (data); } - gst_queue_array_clear (&queue->queue); + gst_queue_array_free (queue->queue); g_mutex_clear (&queue->qlock); g_cond_clear (&queue->item_add); @@ -556,8 +556,8 @@ gst_queue_locked_flush (GstQueue * queue) { GstMiniObject *data; - while (!gst_queue_array_is_empty (&queue->queue)) { - data = gst_queue_array_pop_head (&queue->queue); + while (!gst_queue_array_is_empty (queue->queue)) { + data = gst_queue_array_pop_head (queue->queue); /* Then lose another reference because we are supposed to destroy that data when flushing */ if (!GST_IS_QUERY (data)) @@ -590,7 +590,7 @@ gst_queue_locked_enqueue_buffer (GstQueue * queue, gpointer item) apply_buffer (queue, buffer, &queue->sink_segment, TRUE, TRUE); if (item) - gst_queue_array_push_tail (&queue->queue, item); + gst_queue_array_push_tail (queue->queue, item); GST_QUEUE_SIGNAL_ADD (queue); } @@ -611,7 +611,7 @@ gst_queue_locked_enqueue_event (GstQueue * queue, gpointer item) case GST_EVENT_SEGMENT: apply_segment (queue, event, &queue->sink_segment, TRUE); /* if the queue is empty, apply sink segment on the source */ - if (queue->queue.length == 0) { + if (gst_queue_array_is_empty (queue->queue)) { GST_CAT_LOG_OBJECT (queue_dataflow, queue, "Apply segment on srcpad"); apply_segment (queue, event, &queue->src_segment, FALSE); queue->newseg_applied_to_src = TRUE; @@ -625,7 +625,7 @@ gst_queue_locked_enqueue_event (GstQueue * queue, gpointer item) } if (item) - gst_queue_array_push_tail (&queue->queue, item); + gst_queue_array_push_tail (queue->queue, item); GST_QUEUE_SIGNAL_ADD (queue); } @@ -635,7 +635,7 @@ gst_queue_locked_dequeue (GstQueue * queue) { GstMiniObject *item; - item = gst_queue_array_pop_head (&queue->queue); + item = gst_queue_array_pop_head (queue->queue); if (item == NULL) goto no_item; @@ -792,9 +792,9 @@ gst_queue_handle_sink_query (GstPad * pad, GstObject * parent, GstQuery * query) GST_QUEUE_MUTEX_LOCK_CHECK (queue, out_flushing); GST_LOG_OBJECT (queue, "queuing query %p (%s)", query, GST_QUERY_TYPE_NAME (query)); - gst_queue_array_push_tail (&queue->queue, query); + gst_queue_array_push_tail (queue->queue, query); GST_QUEUE_SIGNAL_ADD (queue); - while (queue->queue.length != 0) { + while (!gst_queue_array_is_empty (queue->queue)) { /* for as long as the queue has items, we know the query is * not handled yet */ GST_QUEUE_WAIT_DEL_CHECK (queue, out_flushing); @@ -822,14 +822,14 @@ gst_queue_is_empty (GstQueue * queue) { GstMiniObject *head; - if (queue->queue.length == 0) + if (gst_queue_array_is_empty (queue->queue)) return TRUE; /* Only consider the queue empty if the minimum thresholds * are not reached and data is at the queue head. Otherwise * we would block forever on serialized queries. */ - head = queue->queue.array[queue->queue.head]; + head = gst_queue_array_peek_head (queue->queue); if (!GST_IS_BUFFER (head) && !GST_IS_BUFFER_LIST (head)) return FALSE; diff --git a/plugins/elements/gstqueue.h b/plugins/elements/gstqueue.h index 1ed123e..a262d61 100644 --- a/plugins/elements/gstqueue.h +++ b/plugins/elements/gstqueue.h @@ -25,7 +25,7 @@ #define __GST_QUEUE_H__ #include -#include "gstqueuearray.h" +#include G_BEGIN_DECLS @@ -108,7 +108,7 @@ struct _GstQueue { gboolean eos; /* the queue of data we're keeping our grubby hands on */ - GstQueueArray queue; + GstQueueArray *queue; GstQueueSize cur_level, /* currently in the queue */ diff --git a/plugins/elements/gstqueuearray.h b/plugins/elements/gstqueuearray.h deleted file mode 100644 index 510cbf4..0000000 --- a/plugins/elements/gstqueuearray.h +++ /dev/null @@ -1,61 +0,0 @@ -/* GStreamer - * Copyright (C) 2009-2010 Edward Hervey - * - * gstqueuearray.h: - * - * This library is free software; you can redistribute it and/or - * modify it under the terms of the GNU Library General Public - * License as published by the Free Software Foundation; either - * version 2 of the License, or (at your option) any later version. - * - * This library is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU - * Library General Public License for more details. - * - * You should have received a copy of the GNU Library General Public - * License along with this library; if not, write to the - * Free Software Foundation, Inc., 59 Temple Place - Suite 330, - * Boston, MA 02111-1307, USA. - */ - -#include - -#ifndef __GST_QUEUE_ARRAY_H__ -#define __GST_QUEUE_ARRAY_H__ - -typedef struct _GstQueueArray GstQueueArray; - -struct _GstQueueArray -{ - gpointer *array; - guint size; - guint head; - guint tail; - guint length; -}; - -G_GNUC_INTERNAL void gst_queue_array_init (GstQueueArray * array, - guint initial_size); - -G_GNUC_INTERNAL void gst_queue_array_clear (GstQueueArray * array); - -G_GNUC_INTERNAL GstQueueArray * gst_queue_array_new (guint initial_size); - -G_GNUC_INTERNAL gpointer gst_queue_array_pop_head (GstQueueArray * array); - -G_GNUC_INTERNAL void gst_queue_array_push_tail (GstQueueArray * array, - gpointer data); - -G_GNUC_INTERNAL gboolean gst_queue_array_is_empty (GstQueueArray * array); - -G_GNUC_INTERNAL void gst_queue_array_free (GstQueueArray * array); - -G_GNUC_INTERNAL void gst_queue_array_drop_element (GstQueueArray * array, - guint idx); - -G_GNUC_INTERNAL guint gst_queue_array_find (GstQueueArray * array, - GCompareFunc func, - gpointer data); - -#endif diff --git a/win32/common/libgstbase.def b/win32/common/libgstbase.def index bc52be4..cdd36cb 100644 --- a/win32/common/libgstbase.def +++ b/win32/common/libgstbase.def @@ -238,7 +238,27 @@ EXPORTS gst_collect_pads_start gst_collect_pads_stop gst_collect_pads_take_buffer + gst_data_queue_drop_head + gst_data_queue_flush + gst_data_queue_get_level + gst_data_queue_get_type + gst_data_queue_is_empty + gst_data_queue_is_full + gst_data_queue_limits_changed + gst_data_queue_new + gst_data_queue_pop + gst_data_queue_push + gst_data_queue_set_flushing gst_push_src_get_type + gst_queue_array_drop_element + gst_queue_array_find + gst_queue_array_free + gst_queue_array_get_length + gst_queue_array_is_empty + gst_queue_array_new + gst_queue_array_peek_head + gst_queue_array_pop_head + gst_queue_array_push_tail gst_type_find_helper gst_type_find_helper_for_buffer gst_type_find_helper_for_data -- 2.7.4