X-Git-Url: http://review.tizen.org/git/?a=blobdiff_plain;f=gst%2Fgstbufferpool.c;h=619860e63ffa976251329ce89a86bcade364c5d3;hb=dac5966da6a0f53d0443dfa1ac239289028c415d;hp=a54b42ddd513efb92935d31a746d4d77e37d1df8;hpb=b92d639fbf60c172a008cd93e4e35cc7cf9bc447;p=platform%2Fupstream%2Fgstreamer.git diff --git a/gst/gstbufferpool.c b/gst/gstbufferpool.c index a54b42d..619860e 100644 --- a/gst/gstbufferpool.c +++ b/gst/gstbufferpool.c @@ -21,6 +21,7 @@ /** * SECTION:gstbufferpool + * @title: GstBufferPool * @short_description: Pool for buffers * @see_also: #GstBuffer * @@ -80,12 +81,15 @@ #include "gstbufferpool.h" +#ifdef G_OS_WIN32 +# ifndef EWOULDBLOCK +# define EWOULDBLOCK EAGAIN /* This is just to placate gcc */ +# endif +#endif /* G_OS_WIN32 */ + GST_DEBUG_CATEGORY_STATIC (gst_buffer_pool_debug); #define GST_CAT_DEFAULT gst_buffer_pool_debug -#define GST_BUFFER_POOL_GET_PRIVATE(obj) \ - (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_BUFFER_POOL, GstBufferPoolPrivate)) - #define GST_BUFFER_POOL_LOCK(pool) (g_rec_mutex_lock(&pool->priv->rec_lock)) #define GST_BUFFER_POOL_UNLOCK(pool) (g_rec_mutex_unlock(&pool->priv->rec_lock)) @@ -113,7 +117,7 @@ struct _GstBufferPoolPrivate static void gst_buffer_pool_finalize (GObject * object); -G_DEFINE_TYPE (GstBufferPool, gst_buffer_pool, GST_TYPE_OBJECT); +G_DEFINE_TYPE_WITH_PRIVATE (GstBufferPool, gst_buffer_pool, GST_TYPE_OBJECT); static gboolean default_start (GstBufferPool * pool); static gboolean default_stop (GstBufferPool * pool); @@ -132,8 +136,6 @@ gst_buffer_pool_class_init (GstBufferPoolClass * klass) { GObjectClass *gobject_class = (GObjectClass *) klass; - g_type_class_add_private (klass, sizeof (GstBufferPoolPrivate)); - gobject_class->finalize = gst_buffer_pool_finalize; klass->start = default_start; @@ -154,7 +156,7 @@ gst_buffer_pool_init (GstBufferPool * pool) { GstBufferPoolPrivate *priv; - priv = pool->priv = GST_BUFFER_POOL_GET_PRIVATE (pool); + priv = pool->priv = gst_buffer_pool_get_instance_private (pool); g_rec_mutex_init (&priv->rec_lock); @@ -170,9 +172,9 @@ gst_buffer_pool_init (GstBufferPool * pool) gst_allocation_params_init (&priv->params); gst_buffer_pool_config_set_allocator (priv->config, priv->allocator, &priv->params); - /* 1 control write for flushing */ + /* 1 control write for flushing - the flush token */ gst_poll_write_control (priv->poll); - /* 1 control write for marking that we are not waiting for poll */ + /* 1 control write for marking that we are not waiting for poll - the wait token */ gst_poll_write_control (priv->poll); GST_DEBUG_OBJECT (pool, "created"); @@ -187,7 +189,7 @@ gst_buffer_pool_finalize (GObject * object) pool = GST_BUFFER_POOL_CAST (object); priv = pool->priv; - GST_DEBUG_OBJECT (pool, "finalize"); + GST_DEBUG_OBJECT (pool, "%p finalize", pool); gst_buffer_pool_set_active (pool, FALSE); gst_atomic_queue_unref (priv->queue); @@ -205,16 +207,19 @@ gst_buffer_pool_finalize (GObject * object) * * Creates a new #GstBufferPool instance. * - * Returns: (transfer floating): a new #GstBufferPool instance + * Returns: (transfer full): a new #GstBufferPool instance */ GstBufferPool * gst_buffer_pool_new (void) { GstBufferPool *result; - result = g_object_newv (GST_TYPE_BUFFER_POOL, 0, NULL); + result = g_object_new (GST_TYPE_BUFFER_POOL, NULL); GST_DEBUG_OBJECT (result, "created new buffer pool"); + /* Clear floating flag */ + gst_object_ref_sink (result); + return result; } @@ -390,7 +395,17 @@ default_stop (GstBufferPool * pool) /* clear the pool */ while ((buffer = gst_atomic_queue_pop (priv->queue))) { - gst_poll_read_control (priv->poll); + while (!gst_poll_read_control (priv->poll)) { + if (errno == EWOULDBLOCK) { + /* We put the buffer into the queue but did not finish writing control + * yet, let's wait a bit and retry */ + g_thread_yield (); + continue; + } else { + /* Critical error but GstPoll already complained */ + break; + } + } do_free_buffer (pool, buffer); } return priv->cur_buffers == 0; @@ -431,6 +446,7 @@ do_set_flushing (GstBufferPool * pool, gboolean flushing) if (flushing) { g_atomic_int_set (&pool->flushing, 1); + /* Write the flush token to wake up any waiters */ gst_poll_write_control (priv->poll); if (pclass->flush_start) @@ -439,7 +455,19 @@ do_set_flushing (GstBufferPool * pool, gboolean flushing) if (pclass->flush_stop) pclass->flush_stop (pool); - gst_poll_read_control (priv->poll); + while (!gst_poll_read_control (priv->poll)) { + if (errno == EWOULDBLOCK) { + /* This should not really happen unless flushing and unflushing + * happens on different threads. Let's wait a bit to get back flush + * token from the thread that was setting it to flushing */ + g_thread_yield (); + continue; + } else { + /* Critical error but GstPoll already complained */ + break; + } + } + g_atomic_int_set (&pool->flushing, 0); } } @@ -1001,7 +1029,7 @@ gst_buffer_pool_config_get_params (GstStructure * config, GstCaps ** caps, * * Get the @allocator and @params from @config. * - * Returns: %TRUE, if the values are set. + * Returns: %TRUE, if the values are set. */ gboolean gst_buffer_pool_config_get_allocator (GstStructure * config, @@ -1080,7 +1108,17 @@ default_acquire_buffer (GstBufferPool * pool, GstBuffer ** buffer, /* try to get a buffer from the queue */ *buffer = gst_atomic_queue_pop (priv->queue); if (G_LIKELY (*buffer)) { - gst_poll_read_control (priv->poll); + while (!gst_poll_read_control (priv->poll)) { + if (errno == EWOULDBLOCK) { + /* We put the buffer into the queue but did not finish writing control + * yet, let's wait a bit and retry */ + g_thread_yield (); + continue; + } else { + /* Critical error but GstPoll already complained */ + break; + } + } result = GST_FLOW_OK; GST_LOG_OBJECT (pool, "acquired buffer %p", *buffer); break; @@ -1105,10 +1143,33 @@ default_acquire_buffer (GstBufferPool * pool, GstBuffer ** buffer, /* now we release the control socket, we wait for a buffer release or * flushing */ - gst_poll_read_control (pool->priv->poll); - GST_LOG_OBJECT (pool, "waiting for free buffers or flushing"); - gst_poll_wait (priv->poll, GST_CLOCK_TIME_NONE); - gst_poll_write_control (pool->priv->poll); + if (!gst_poll_read_control (pool->priv->poll)) { + if (errno == EWOULDBLOCK) { + /* This means that we have two threads trying to allocate buffers + * already, and the other one already got the wait token. This + * means that we only have to wait for the poll now and not write the + * token afterwards: we will be woken up once the other thread is + * woken up and that one will write the wait token it removed */ + GST_LOG_OBJECT (pool, "waiting for free buffers or flushing"); + gst_poll_wait (priv->poll, GST_CLOCK_TIME_NONE); + } else { + /* This is a critical error, GstPoll already gave a warning */ + result = GST_FLOW_ERROR; + break; + } + } else { + /* We're the first thread waiting, we got the wait token and have to + * write it again later + * OR + * We're a second thread and just consumed the flush token and block all + * other threads, in which case we must not wait and give it back + * immediately */ + if (!GST_BUFFER_POOL_IS_FLUSHING (pool)) { + GST_LOG_OBJECT (pool, "waiting for free buffers or flushing"); + gst_poll_wait (priv->poll, GST_CLOCK_TIME_NONE); + } + gst_poll_write_control (pool->priv->poll); + } } return result; @@ -1160,6 +1221,13 @@ default_reset_buffer (GstBufferPool * pool, GstBuffer * buffer) GST_BUFFER_OFFSET (buffer) = GST_BUFFER_OFFSET_NONE; GST_BUFFER_OFFSET_END (buffer) = GST_BUFFER_OFFSET_NONE; + /* if the memory is intact reset the size to the full size */ + if (!GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_TAG_MEMORY)) { + gsize offset; + gst_buffer_get_sizes (buffer, &offset, NULL); + gst_buffer_resize (buffer, -offset, pool->priv->size); + } + /* remove all metadata without the POOLED flag */ gst_buffer_foreach_meta (buffer, remove_meta_unpooled, pool); }