aa445695406a87f68bed43ca5c8af15cd221d3e0
[platform/upstream/gstreamer.git] / gst / gstbufferpool.c
1 /* GStreamer
2  * Copyright (C) 2010 Wim Taymans <wim.taymans@gmail.com>
3  *
4  * gstbufferpool.c: GstBufferPool baseclass
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 02111-1307, USA.
20  */
21
22 /**
23  * SECTION:gstbufferpool
24  * @short_description: Pool for buffers
25  * @see_also: #GstBuffer
26  *
27  */
28
29 #include "gst_private.h"
30
31 #include <errno.h>
32 #ifdef HAVE_UNISTD_H
33 #  include <unistd.h>
34 #endif
35 #include <sys/types.h>
36
37 #include "gstinfo.h"
38 #include "gstquark.h"
39
40 #include "gstbufferpool.h"
41
42
43 #define GST_BUFFER_POOL_GET_PRIVATE(obj)  \
44    (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_BUFFER_POOL, GstBufferPoolPrivate))
45
46 #define GST_BUFFER_POOL_LOCK(pool)   (g_static_rec_mutex_lock(&pool->priv->rec_lock))
47 #define GST_BUFFER_POOL_UNLOCK(pool) (g_static_rec_mutex_unlock(&pool->priv->rec_lock))
48
49 struct _GstBufferPoolPrivate
50 {
51   GStaticRecMutex rec_lock;
52   guint size;
53   guint min_buffers;
54   guint max_buffers;
55   guint prefix;
56   guint postfix;
57   guint align;
58 };
59
60 enum
61 {
62   /* add more above */
63   LAST_SIGNAL
64 };
65
66 static void gst_buffer_pool_finalize (GObject * object);
67
68 G_DEFINE_TYPE (GstBufferPool, gst_buffer_pool, GST_TYPE_OBJECT);
69
70 static gboolean default_start (GstBufferPool * pool);
71 static gboolean default_stop (GstBufferPool * pool);
72 static gboolean default_set_config (GstBufferPool * pool,
73     GstStructure * config);
74 static GstFlowReturn default_alloc_buffer (GstBufferPool * pool,
75     GstBuffer ** buffer, GstBufferPoolParams * params);
76 static GstFlowReturn default_acquire_buffer (GstBufferPool * pool,
77     GstBuffer ** buffer, GstBufferPoolParams * params);
78 static void default_free_buffer (GstBufferPool * pool, GstBuffer * buffer);
79 static void default_release_buffer (GstBufferPool * pool, GstBuffer * buffer);
80
81 static void
82 gst_buffer_pool_class_init (GstBufferPoolClass * klass)
83 {
84   GObjectClass *gobject_class = (GObjectClass *) klass;
85
86   g_type_class_add_private (klass, sizeof (GstBufferPoolPrivate));
87
88   gobject_class->finalize = gst_buffer_pool_finalize;
89
90   klass->start = default_start;
91   klass->stop = default_stop;
92   klass->set_config = default_set_config;
93   klass->acquire_buffer = default_acquire_buffer;
94   klass->alloc_buffer = default_alloc_buffer;
95   klass->release_buffer = default_release_buffer;
96   klass->free_buffer = default_free_buffer;
97 }
98
99 static void
100 gst_buffer_pool_init (GstBufferPool * pool)
101 {
102   pool->priv = GST_BUFFER_POOL_GET_PRIVATE (pool);
103
104   g_static_rec_mutex_init (&pool->priv->rec_lock);
105
106   pool->poll = gst_poll_new_timer ();
107   pool->queue = gst_atomic_queue_new (10);
108   pool->flushing = TRUE;
109   pool->active = FALSE;
110   pool->configured = FALSE;
111   pool->started = FALSE;
112   pool->config = gst_structure_id_empty_new (GST_QUARK (BUFFER_POOL_CONFIG));
113   gst_buffer_pool_config_set (pool->config, NULL, 0, 0, 0, 0, 0, 1);
114
115   GST_DEBUG_OBJECT (pool, "created");
116 }
117
118 static void
119 gst_buffer_pool_finalize (GObject * object)
120 {
121   GstBufferPool *pool;
122
123   pool = GST_BUFFER_POOL_CAST (object);
124
125   GST_DEBUG_OBJECT (pool, "finalize");
126
127   gst_buffer_pool_set_active (pool, FALSE);
128   gst_atomic_queue_unref (pool->queue);
129   gst_poll_free (pool->poll);
130   gst_structure_free (pool->config);
131   g_static_rec_mutex_free (&pool->priv->rec_lock);
132
133   G_OBJECT_CLASS (gst_buffer_pool_parent_class)->finalize (object);
134 }
135
136 /**
137  * gst_buffer_pool_new:
138  *
139  * Creates a new #GstBufferPool instance.
140  *
141  * Returns: a new #GstBufferPool instance
142  */
143 GstBufferPool *
144 gst_buffer_pool_new (void)
145 {
146   GstBufferPool *result;
147
148   result = g_object_newv (GST_TYPE_BUFFER_POOL, 0, NULL);
149   GST_DEBUG_OBJECT (result, "created new buffer pool");
150
151   return result;
152 }
153
154 static GstFlowReturn
155 default_alloc_buffer (GstBufferPool * pool, GstBuffer ** buffer,
156     GstBufferPoolParams * params)
157 {
158   guint size, align;
159   GstBufferPoolPrivate *priv = pool->priv;
160
161   *buffer = gst_buffer_new ();
162
163   align = priv->align - 1;
164   size = priv->prefix + priv->postfix + priv->size + align;
165   if (size > 0) {
166     guint8 *memptr;
167
168     memptr = g_malloc (size);
169     GST_BUFFER_MALLOCDATA (*buffer) = memptr;
170     memptr = (guint8 *) ((guintptr) (memptr + align) & ~align);
171     GST_BUFFER_DATA (*buffer) = memptr + priv->prefix;
172     GST_BUFFER_SIZE (*buffer) = priv->size;
173   }
174
175   return GST_FLOW_OK;
176 }
177
178 /* the default implementation for preallocating the buffers
179  * in the pool */
180 static gboolean
181 default_start (GstBufferPool * pool)
182 {
183   guint i;
184   GstBufferPoolPrivate *priv = pool->priv;
185   GstBufferPoolClass *pclass;
186
187   pclass = GST_BUFFER_POOL_GET_CLASS (pool);
188
189   /* no alloc function, error */
190   if (G_UNLIKELY (pclass->alloc_buffer == NULL))
191     goto no_alloc;
192
193   /* we need to prealloc buffers */
194   for (i = 0; i < priv->min_buffers; i++) {
195     GstBuffer *buffer;
196
197     if (pclass->alloc_buffer (pool, &buffer, NULL) != GST_FLOW_OK)
198       goto alloc_failed;
199
200     GST_LOG_OBJECT (pool, "prealloced buffer %d: %p", i, buffer);
201     /* store in the queue */
202     gst_atomic_queue_push (pool->queue, buffer);
203     gst_poll_write_control (pool->poll);
204   }
205   return TRUE;
206
207   /* ERRORS */
208 no_alloc:
209   {
210     GST_WARNING_OBJECT (pool, "no alloc function");
211     return FALSE;
212   }
213 alloc_failed:
214   {
215     GST_WARNING_OBJECT (pool, "alloc function failed");
216     return FALSE;
217   }
218 }
219
220 /* must be called with the lock */
221 static gboolean
222 do_start (GstBufferPool * pool)
223 {
224   if (!pool->started) {
225     GstBufferPoolClass *pclass;
226
227     pclass = GST_BUFFER_POOL_GET_CLASS (pool);
228
229     GST_LOG_OBJECT (pool, "starting");
230     /* start the pool, subclasses should allocate buffers and put them
231      * in the queue */
232     if (G_LIKELY (pclass->start)) {
233       if (!pclass->start (pool))
234         return FALSE;
235     }
236     pool->started = TRUE;
237   }
238   return TRUE;
239 }
240
241
242 static void
243 default_free_buffer (GstBufferPool * pool, GstBuffer * buffer)
244 {
245   gst_buffer_unref (buffer);
246 }
247
248 /* must be called with the lock */
249 static gboolean
250 default_stop (GstBufferPool * pool)
251 {
252   GstBuffer *buffer;
253   GstBufferPoolClass *pclass;
254
255   pclass = GST_BUFFER_POOL_GET_CLASS (pool);
256
257   /* clear the pool */
258   while ((buffer = gst_atomic_queue_pop (pool->queue))) {
259     gst_poll_read_control (pool->poll);
260
261     if (G_LIKELY (pclass->free_buffer))
262       pclass->free_buffer (pool, buffer);
263   }
264   return TRUE;
265 }
266
267 /* must be called with the lock */
268 static gboolean
269 do_stop (GstBufferPool * pool)
270 {
271   if (pool->started) {
272     GstBufferPoolClass *pclass;
273
274     pclass = GST_BUFFER_POOL_GET_CLASS (pool);
275
276     GST_LOG_OBJECT (pool, "stopping");
277     if (G_LIKELY (pclass->stop)) {
278       if (!pclass->stop (pool))
279         return FALSE;
280     }
281     pool->started = FALSE;
282   }
283   return TRUE;
284 }
285
286 /**
287  * gst_buffer_pool_set_active:
288  * @pool: a #GstBufferPool
289  * @active: the new active state
290  *
291  * Control the active state of @pool. When the pool is active, new calls to
292  * gst_buffer_pool_acquire_buffer() will return with GST_FLOW_WRONG_STATE.
293  *
294  * Returns: %FALSE when the pool was not configured or when preallocation of the
295  * buffers failed.
296  */
297 gboolean
298 gst_buffer_pool_set_active (GstBufferPool * pool, gboolean active)
299 {
300   GstBufferPoolClass *pclass;
301   gboolean res = TRUE;
302
303   g_return_val_if_fail (GST_IS_BUFFER_POOL (pool), FALSE);
304
305   pclass = GST_BUFFER_POOL_GET_CLASS (pool);
306
307   GST_LOG_OBJECT (pool, "active %d", active);
308
309   GST_BUFFER_POOL_LOCK (pool);
310   /* just return if we are already in the right state */
311   if (pool->active == active)
312     goto was_ok;
313
314   /* we need to be configured */
315   if (!pool->configured)
316     goto not_configured;
317
318   if (active) {
319     if (!do_start (pool))
320       goto start_failed;
321
322     /* unset the flushing state now */
323     gst_poll_read_control (pool->poll);
324     g_atomic_int_set (&pool->flushing, FALSE);
325   } else {
326     /* set to flushing first */
327     g_atomic_int_set (&pool->flushing, TRUE);
328     gst_poll_write_control (pool->poll);
329
330     /* when all buffers are in the pool, free them. Else they will be
331      * freed when they are released */
332     if (g_atomic_int_get (&pool->outstanding) == 0) {
333       if (!do_stop (pool))
334         goto stop_failed;
335     }
336   }
337   pool->active = active;
338   GST_BUFFER_POOL_UNLOCK (pool);
339
340   return res;
341
342 was_ok:
343   {
344     GST_DEBUG_OBJECT (pool, "pool was in the right state");
345     GST_BUFFER_POOL_UNLOCK (pool);
346     return TRUE;
347   }
348 not_configured:
349   {
350     GST_ERROR_OBJECT (pool, "pool was not configured");
351     GST_BUFFER_POOL_UNLOCK (pool);
352     return FALSE;
353   }
354 start_failed:
355   {
356     GST_ERROR_OBJECT (pool, "start failed");
357     GST_BUFFER_POOL_UNLOCK (pool);
358     return FALSE;
359   }
360 stop_failed:
361   {
362     GST_WARNING_OBJECT (pool, "stop failed");
363     GST_BUFFER_POOL_UNLOCK (pool);
364     return FALSE;
365   }
366 }
367
368 static gboolean
369 default_set_config (GstBufferPool * pool, GstStructure * config)
370 {
371   GstBufferPoolPrivate *priv = pool->priv;
372   const GstCaps *caps;
373   guint size, min_buffers, max_buffers;
374   guint prefix, postfix, align;
375
376   /* parse the config and keep around */
377   if (!gst_buffer_pool_config_get (config, &caps, &size, &min_buffers,
378           &max_buffers, &prefix, &postfix, &align))
379     goto wrong_config;
380
381   priv->size = size;
382   priv->min_buffers = min_buffers;
383   priv->max_buffers = max_buffers;
384   priv->prefix = prefix;
385   priv->postfix = postfix;
386   priv->align = align;
387
388   return TRUE;
389
390 wrong_config:
391   {
392     GST_WARNING_OBJECT (pool, "invalid config");
393     return FALSE;
394   }
395 }
396
397 /**
398  * gst_buffer_pool_set_config:
399  * @pool: a #GstBufferPool
400  * @config: a #GstStructure
401  *
402  * Set the configuration of the pool. The pool must be inactive and all buffers
403  * allocated form this pool must be returned or else this function will do
404  * nothing and return FALSE.
405  *
406  * @condfig is a #GstStructure that contains the configuration parameters for
407  * the pool. A default and mandatory set of parameters can be configured with
408  * gst_buffer_pool_config_set(). This function takes ownership of @config.
409  *
410  * Returns: TRUE when the configuration could be set.
411  */
412 gboolean
413 gst_buffer_pool_set_config (GstBufferPool * pool, GstStructure * config)
414 {
415   gboolean result;
416   GstBufferPoolClass *pclass;
417
418   g_return_val_if_fail (GST_IS_BUFFER_POOL (pool), FALSE);
419   g_return_val_if_fail (config != NULL, FALSE);
420
421   GST_BUFFER_POOL_LOCK (pool);
422   /* can't change the settings when active */
423   if (pool->active)
424     goto was_active;
425
426   /* we can't change when outstanding buffers */
427   if (g_atomic_int_get (&pool->outstanding) != 0)
428     goto have_outstanding;
429
430   pclass = GST_BUFFER_POOL_GET_CLASS (pool);
431
432   /* set the new config */
433   if (G_LIKELY (pclass->set_config))
434     result = pclass->set_config (pool, config);
435   else
436     result = FALSE;
437
438   if (result) {
439     if (pool->config)
440       gst_structure_free (pool->config);
441     pool->config = config;
442
443     /* now we are configured */
444     pool->configured = TRUE;
445   }
446   GST_BUFFER_POOL_UNLOCK (pool);
447
448   return result;
449
450   /* ERRORS */
451 was_active:
452   {
453     GST_WARNING_OBJECT (pool, "can't change config, we are active");
454     GST_BUFFER_POOL_UNLOCK (pool);
455     return FALSE;
456   }
457 have_outstanding:
458   {
459     GST_WARNING_OBJECT (pool, "can't change config, have outstanding buffers");
460     GST_BUFFER_POOL_UNLOCK (pool);
461     return FALSE;
462   }
463 }
464
465 /**
466  * gst_buffer_pool_get_config:
467  * @pool: a #GstBufferPool
468  *
469  * Get a copy of the current configuration of the pool. This configuration
470  * can either be modified and used for the gst_buffer_pool_set_config() call
471  * or it must be freed after usage.
472  *
473  * Returns: a copy of the current configuration of @pool. use
474  * gst_structure_free() after usage or gst_buffer_pool_set_config().
475  */
476 GstStructure *
477 gst_buffer_pool_get_config (GstBufferPool * pool)
478 {
479   GstStructure *result;
480
481   g_return_val_if_fail (GST_IS_BUFFER_POOL (pool), NULL);
482
483   GST_BUFFER_POOL_UNLOCK (pool);
484   result = gst_structure_copy (pool->config);
485   GST_BUFFER_POOL_UNLOCK (pool);
486
487   return result;
488 }
489
490 /**
491  * gst_buffer_pool_config_set:
492  * @pool: a #GstBufferPool
493  * @size: the size of each buffer, not including pre and post fix
494  * @min_buffers: the minimum amount of buffers to allocate.
495  * @max_buffers: the maximum amount of buffers to allocate or 0 for unlimited.
496  * @prefix: prefix each buffer with this many bytes
497  * @postfix: postfix each buffer with this many bytes
498  * @align: alignment of the buffer data.
499  *
500  * Configure @config with the given parameters.
501  */
502 void
503 gst_buffer_pool_config_set (GstStructure * config, const GstCaps * caps,
504     guint size, guint min_buffers, guint max_buffers, guint prefix,
505     guint postfix, guint align)
506 {
507   g_return_if_fail (config != NULL);
508
509   gst_structure_id_set (config,
510       GST_QUARK (CAPS), GST_TYPE_CAPS, caps,
511       GST_QUARK (SIZE), G_TYPE_UINT, size,
512       GST_QUARK (MIN_BUFFERS), G_TYPE_UINT, min_buffers,
513       GST_QUARK (MAX_BUFFERS), G_TYPE_UINT, max_buffers,
514       GST_QUARK (PREFIX), G_TYPE_UINT, prefix,
515       GST_QUARK (POSTFIX), G_TYPE_UINT, postfix,
516       GST_QUARK (ALIGN), G_TYPE_UINT, align, NULL);
517 }
518
519 /**
520  * gst_buffer_pool_config_get:
521  * @pool: a #GstBufferPool
522  * @size: the size of each buffer, not including pre and post fix
523  * @min_buffers: the minimum amount of buffers to allocate.
524  * @max_buffers: the maximum amount of buffers to allocate or 0 for unlimited.
525  * @prefix: prefix each buffer with this many bytes
526  * @postfix: postfix each buffer with this many bytes
527  * @align: alignment of the buffer data.
528  *
529  * Get the configuration values from @config.
530  */
531 gboolean
532 gst_buffer_pool_config_get (GstStructure * config, const GstCaps ** caps,
533     guint * size, guint * min_buffers, guint * max_buffers, guint * prefix,
534     guint * postfix, guint * align)
535 {
536   g_return_val_if_fail (config != NULL, FALSE);
537
538   return gst_structure_id_get (config,
539       GST_QUARK (CAPS), GST_TYPE_CAPS, caps,
540       GST_QUARK (SIZE), G_TYPE_UINT, size,
541       GST_QUARK (MIN_BUFFERS), G_TYPE_UINT, min_buffers,
542       GST_QUARK (MAX_BUFFERS), G_TYPE_UINT, max_buffers,
543       GST_QUARK (PREFIX), G_TYPE_UINT, prefix,
544       GST_QUARK (POSTFIX), G_TYPE_UINT, postfix,
545       GST_QUARK (ALIGN), G_TYPE_UINT, align, NULL);
546 }
547
548 static GstFlowReturn
549 default_acquire_buffer (GstBufferPool * pool, GstBuffer ** buffer,
550     GstBufferPoolParams * params)
551 {
552   GstFlowReturn result;
553   GstBufferPoolClass *pclass;
554   GstBufferPoolPrivate *priv = pool->priv;
555
556   pclass = GST_BUFFER_POOL_GET_CLASS (pool);
557
558   while (TRUE) {
559     if (G_UNLIKELY (g_atomic_int_get (&pool->flushing)))
560       return GST_FLOW_WRONG_STATE;
561
562     /* try to get a buffer from the queue */
563     *buffer = gst_atomic_queue_pop (pool->queue);
564     if (G_LIKELY (*buffer)) {
565       gst_poll_read_control (pool->poll);
566       result = GST_FLOW_OK;
567       break;
568     }
569
570     /* no buffer */
571     if (priv->max_buffers == 0) {
572       /* no max_buffers, we allocate some more */
573       if (G_LIKELY (pclass->alloc_buffer)) {
574         result = pclass->alloc_buffer (pool, buffer, params);
575       } else
576         result = GST_FLOW_NOT_SUPPORTED;
577       break;
578     }
579
580     /* check if we need to wait */
581     if (params && !(params->flags & GST_BUFFER_POOL_FLAG_WAIT)) {
582       result = GST_FLOW_UNEXPECTED;
583       break;
584     }
585
586     /* now wait */
587     gst_poll_wait (pool->poll, GST_CLOCK_TIME_NONE);
588   }
589
590   return result;
591 }
592
593 static inline void
594 dec_outstanding (GstBufferPool * pool)
595 {
596   if (g_atomic_int_dec_and_test (&pool->outstanding)) {
597     /* all buffers are returned to the pool, see if we need to free them */
598     if (g_atomic_int_get (&pool->flushing)) {
599       /* take the lock so that set_active is not run concurrently */
600       GST_BUFFER_POOL_LOCK (pool);
601       /* recheck the flushing state in the lock, the pool could have been
602        * set to active again */
603       if (g_atomic_int_get (&pool->flushing))
604         do_stop (pool);
605
606       GST_BUFFER_POOL_UNLOCK (pool);
607     }
608   }
609 }
610
611 /**
612  * gst_buffer_pool_acquire_buffer:
613  * @pool: a #GstBufferPool
614  * @buffer: a location for a #GstBuffer
615  * @params: parameters.
616  *
617  * Acquire a buffer from @pool. @buffer should point to a memory location that
618  * can hold a pointer to the new buffer.
619  *
620  * @params can be NULL or contain optional parameters to influence the allocation.
621  *
622  * Returns: a #GstFlowReturn such as GST_FLOW_WRONG_STATE when the pool is
623  * inactive.
624  */
625 GstFlowReturn
626 gst_buffer_pool_acquire_buffer (GstBufferPool * pool, GstBuffer ** buffer,
627     GstBufferPoolParams * params)
628 {
629   GstBufferPoolClass *pclass;
630   GstFlowReturn result;
631
632   g_return_val_if_fail (GST_IS_BUFFER_POOL (pool), GST_FLOW_ERROR);
633   g_return_val_if_fail (buffer != NULL, GST_FLOW_ERROR);
634
635   pclass = GST_BUFFER_POOL_GET_CLASS (pool);
636
637   /* assume we'll have one more outstanding buffer we need to do that so
638    * that concurrent set_active doesn't clear the buffers */
639   g_atomic_int_inc (&pool->outstanding);
640
641   if (G_LIKELY (pclass->acquire_buffer))
642     result = pclass->acquire_buffer (pool, buffer, params);
643   else
644     result = GST_FLOW_NOT_SUPPORTED;
645
646   if (G_LIKELY (result == GST_FLOW_OK)) {
647     /* all buffers from the pool point to the pool and have the refcount of the
648      * pool incremented */
649     (*buffer)->pool = gst_object_ref (pool);
650   } else {
651     dec_outstanding (pool);
652   }
653
654   return result;
655 }
656
657 static void
658 default_release_buffer (GstBufferPool * pool, GstBuffer * buffer)
659 {
660   /* keep it around in our queue */
661   gst_atomic_queue_push (pool->queue, buffer);
662   gst_poll_write_control (pool->poll);
663 }
664
665 /**
666  * gst_buffer_pool_release_buffer:
667  * @pool: a #GstBufferPool
668  * @buffer: a #GstBuffer
669  *
670  * Release @buffer to @pool. @buffer should have previously been allocated from
671  * @pool with gst_buffer_pool_acquire_buffer().
672  *
673  * This function is usually called automatically when the last ref on @buffer
674  * disappears.
675  */
676 void
677 gst_buffer_pool_release_buffer (GstBufferPool * pool, GstBuffer * buffer)
678 {
679   GstBufferPoolClass *pclass;
680
681   g_return_if_fail (GST_IS_BUFFER_POOL (pool));
682   g_return_if_fail (buffer != NULL);
683
684   /* check that the buffer is ours, all buffers returned to the pool have the
685    * pool member set to NULL and the pool refcount decreased */
686   if (!g_atomic_pointer_compare_and_exchange ((gpointer *) & buffer->pool,
687           pool, NULL))
688     return;
689
690   pclass = GST_BUFFER_POOL_GET_CLASS (pool);
691
692   if (G_LIKELY (pclass->release_buffer))
693     pclass->release_buffer (pool, buffer);
694
695   dec_outstanding (pool);
696
697   /* decrease the refcount that the buffer had to us */
698   gst_object_unref (pool);
699 }