element: Enforce that elements created by gst_element_factory_create/make() are floating
[platform/upstream/gstreamer.git] / gst / gstpromise.c
1 /* GStreamer
2  * Copyright (C) 2017 Matthew Waters <matthew@centricular.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19
20 #ifdef HAVE_CONFIG_H
21 # include "config.h"
22 #endif
23
24 #include "gst_private.h"
25
26 #include "gstpromise.h"
27
28 #define GST_CAT_DEFAULT gst_promise_debug
29 GST_DEBUG_CATEGORY_STATIC (GST_CAT_DEFAULT);
30
31 /**
32  * SECTION:gstpromise
33  * @title: GstPromise
34  * @short_description: a miniobject for future/promise-like functionality
35  * @see_also:
36  *
37  * The #GstPromise object implements the container for values that may
38  * be available later. i.e. a Future or a Promise in
39  * <ulink url="https://en.wikipedia.org/wiki/Futures_and_promises">https://en.wikipedia.org/wiki/Futures_and_promises</ulink>
40  * As with all Future/Promise-like functionality, there is the concept of the
41  * producer of the value and the consumer of the value.
42  *
43  * A #GstPromise is created with gst_promise_new() by the consumer and passed
44  * to the producer to avoid thread safety issues with the change callback.
45  * A #GstPromise can be replied to with a value (or an error) by the producer
46  * with gst_promise_reply(). gst_promise_interrupt() is for the consumer to
47  * indicate to the producer that the value is not needed anymore and producing
48  * that value can stop.  The @GST_PROMISE_RESULT_EXPIRED state set by a call
49  * to gst_promise_expire() indicates to the consumer that a value will never
50  * be produced and is intended to be called by a third party that implements
51  * some notion of message handling such as #GstBus.
52  * A callback can also be installed at #GstPromise creation for
53  * result changes with gst_promise_new_with_change_func().
54  * The change callback can be used to chain #GstPromises's together as in the
55  * following example.
56  * |[<!-- language="C" -->
57  * const GstStructure *reply;
58  * GstPromise *p;
59  * if (gst_promise_wait (promise) != GST_PROMISE_RESULT_REPLIED)
60  *   return; // interrupted or expired value
61  * reply = gst_promise_get_reply (promise);
62  * if (error in reply)
63  *   return; // propagate error
64  * p = gst_promise_new_with_change_func (another_promise_change_func, user_data, notify);
65  * pass p to promise-using API
66  * ]|
67  *
68  * Each #GstPromise starts out with a #GstPromiseResult of
69  * %GST_PROMISE_RESULT_PENDING and only ever transitions once
70  * into one of the other #GstPromiseResult's.
71  *
72  * In order to support multi-threaded code, gst_promise_reply(),
73  * gst_promise_interrupt() and gst_promise_expire() may all be from
74  * different threads with some restrictions and the final result of the promise
75  * is whichever call is made first.  There are two restrictions on ordering:
76  *
77  * 1. That gst_promise_reply() and gst_promise_interrupt() cannot be called
78  * after gst_promise_expire()
79  * 2. That gst_promise_reply() and gst_promise_interrupt()
80  * cannot be called twice.
81  *
82  * The change function set with gst_promise_new_with_change_func() is
83  * called directly from either the gst_promise_reply(),
84  * gst_promise_interrupt() or gst_promise_expire() and can be called
85  * from an arbitrary thread.  #GstPromise using APIs can restrict this to
86  * a single thread or a subset of threads but that is entirely up to the API
87  * that uses #GstPromise.
88  */
89
90 static const int immutable_structure_refcount = 2;
91
92 #define GST_PROMISE_REPLY(p)            (((GstPromiseImpl *)(p))->reply)
93 #define GST_PROMISE_RESULT(p)           (((GstPromiseImpl *)(p))->result)
94 #define GST_PROMISE_LOCK(p)             (&(((GstPromiseImpl *)(p))->lock))
95 #define GST_PROMISE_COND(p)             (&(((GstPromiseImpl *)(p))->cond))
96 #define GST_PROMISE_CHANGE_FUNC(p)      (((GstPromiseImpl *)(p))->change_func)
97 #define GST_PROMISE_CHANGE_DATA(p)      (((GstPromiseImpl *)(p))->user_data)
98 #define GST_PROMISE_CHANGE_NOTIFY(p)    (((GstPromiseImpl *)(p))->notify)
99
100 typedef struct
101 {
102   GstPromise promise;
103
104   GstPromiseResult result;
105   GstStructure *reply;
106
107   GMutex lock;
108   GCond cond;
109   GstPromiseChangeFunc change_func;
110   gpointer user_data;
111   GDestroyNotify notify;
112 } GstPromiseImpl;
113
114 /**
115  * gst_promise_wait:
116  * @promise: a #GstPromise
117  *
118  * Wait for @promise to move out of the %GST_PROMISE_RESULT_PENDING state.
119  * If @promise is not in %GST_PROMISE_RESULT_PENDING then it will return
120  * immediately with the current result.
121  *
122  * Returns: the result of the promise
123  *
124  * Since: 1.14
125  */
126 GstPromiseResult
127 gst_promise_wait (GstPromise * promise)
128 {
129   GstPromiseResult ret;
130
131   g_return_val_if_fail (promise != NULL, GST_PROMISE_RESULT_EXPIRED);
132
133   g_mutex_lock (GST_PROMISE_LOCK (promise));
134   ret = GST_PROMISE_RESULT (promise);
135
136   while (ret == GST_PROMISE_RESULT_PENDING) {
137     GST_LOG ("%p waiting", promise);
138     g_cond_wait (GST_PROMISE_COND (promise), GST_PROMISE_LOCK (promise));
139     ret = GST_PROMISE_RESULT (promise);
140   }
141   GST_LOG ("%p waited", promise);
142
143   g_mutex_unlock (GST_PROMISE_LOCK (promise));
144
145   return ret;
146 }
147
148 /**
149  * gst_promise_reply:
150  * @promise: (allow-none): a #GstPromise
151  * @s: (transfer full): a #GstStructure with the the reply contents
152  *
153  * Set a reply on @promise.  This will wake up any waiters with
154  * %GST_PROMISE_RESULT_REPLIED.  Called by the producer of the value to
155  * indicate success (or failure).
156  *
157  * If @promise has already been interrupted by the consumer, then this reply
158  * is not visible to the consumer.
159  *
160  * Since: 1.14
161  */
162 void
163 gst_promise_reply (GstPromise * promise, GstStructure * s)
164 {
165   GstPromiseChangeFunc change_func = NULL;
166   gpointer change_data = NULL;
167
168   /* Caller requested that no reply is necessary */
169   if (promise == NULL)
170     return;
171
172   g_mutex_lock (GST_PROMISE_LOCK (promise));
173   if (GST_PROMISE_RESULT (promise) != GST_PROMISE_RESULT_PENDING &&
174       GST_PROMISE_RESULT (promise) != GST_PROMISE_RESULT_INTERRUPTED) {
175     GstPromiseResult result = GST_PROMISE_RESULT (promise);
176     g_mutex_unlock (GST_PROMISE_LOCK (promise));
177     g_return_if_fail (result == GST_PROMISE_RESULT_PENDING ||
178         result == GST_PROMISE_RESULT_INTERRUPTED);
179   }
180
181   /* XXX: is this necessary and valid? */
182   if (GST_PROMISE_REPLY (promise) && GST_PROMISE_REPLY (promise) != s)
183     gst_structure_free (GST_PROMISE_REPLY (promise));
184
185   /* Only reply iff we are currently in pending */
186   if (GST_PROMISE_RESULT (promise) == GST_PROMISE_RESULT_PENDING) {
187     if (s
188         && !gst_structure_set_parent_refcount (s,
189             (int *) &immutable_structure_refcount)) {
190       g_critical ("Input structure has a parent already!");
191       g_mutex_unlock (GST_PROMISE_LOCK (promise));
192       return;
193     }
194
195     GST_PROMISE_RESULT (promise) = GST_PROMISE_RESULT_REPLIED;
196     GST_LOG ("%p replied", promise);
197
198     GST_PROMISE_REPLY (promise) = s;
199
200     change_func = GST_PROMISE_CHANGE_FUNC (promise);
201     change_data = GST_PROMISE_CHANGE_DATA (promise);
202   } else {
203     /* eat the value */
204     if (s)
205       gst_structure_free (s);
206   }
207
208   g_cond_broadcast (GST_PROMISE_COND (promise));
209   g_mutex_unlock (GST_PROMISE_LOCK (promise));
210
211   if (change_func)
212     change_func (promise, change_data);
213 }
214
215 /**
216  * gst_promise_get_reply:
217  * @promise: a #GstPromise
218  *
219  * Retrieve the reply set on @promise.  @promise must be in
220  * %GST_PROMISE_RESULT_REPLIED and the returned structure is owned by @promise
221  *
222  * Returns: (transfer none): The reply set on @promise
223  *
224  * Since: 1.14
225  */
226 const GstStructure *
227 gst_promise_get_reply (GstPromise * promise)
228 {
229   g_return_val_if_fail (promise != NULL, NULL);
230
231   g_mutex_lock (GST_PROMISE_LOCK (promise));
232   if (GST_PROMISE_RESULT (promise) != GST_PROMISE_RESULT_REPLIED) {
233     GstPromiseResult result = GST_PROMISE_RESULT (promise);
234     g_mutex_unlock (GST_PROMISE_LOCK (promise));
235     g_return_val_if_fail (result == GST_PROMISE_RESULT_REPLIED, NULL);
236   }
237
238   g_mutex_unlock (GST_PROMISE_LOCK (promise));
239
240   return GST_PROMISE_REPLY (promise);
241 }
242
243 /**
244  * gst_promise_interrupt:
245  * @promise: a #GstPromise
246  *
247  * Interrupt waiting for a @promise.  This will wake up any waiters with
248  * %GST_PROMISE_RESULT_INTERRUPTED.  Called when the consumer does not want
249  * the value produced anymore.
250  *
251  * Since: 1.14
252  */
253 void
254 gst_promise_interrupt (GstPromise * promise)
255 {
256   GstPromiseChangeFunc change_func = NULL;
257   gpointer change_data = NULL;
258
259   g_return_if_fail (promise != NULL);
260
261   g_mutex_lock (GST_PROMISE_LOCK (promise));
262   if (GST_PROMISE_RESULT (promise) != GST_PROMISE_RESULT_PENDING &&
263       GST_PROMISE_RESULT (promise) != GST_PROMISE_RESULT_REPLIED) {
264     GstPromiseResult result = GST_PROMISE_RESULT (promise);
265     g_mutex_unlock (GST_PROMISE_LOCK (promise));
266     g_return_if_fail (result == GST_PROMISE_RESULT_PENDING ||
267         result == GST_PROMISE_RESULT_REPLIED);
268   }
269   /* only interrupt if we are currently in pending */
270   if (GST_PROMISE_RESULT (promise) == GST_PROMISE_RESULT_PENDING) {
271     GST_PROMISE_RESULT (promise) = GST_PROMISE_RESULT_INTERRUPTED;
272     g_cond_broadcast (GST_PROMISE_COND (promise));
273     GST_LOG ("%p interrupted", promise);
274
275     change_func = GST_PROMISE_CHANGE_FUNC (promise);
276     change_data = GST_PROMISE_CHANGE_DATA (promise);
277   }
278   g_mutex_unlock (GST_PROMISE_LOCK (promise));
279
280   if (change_func)
281     change_func (promise, change_data);
282 }
283
284 /**
285  * gst_promise_expire:
286  * @promise: a #GstPromise
287  *
288  * Expire a @promise.  This will wake up any waiters with
289  * %GST_PROMISE_RESULT_EXPIRED.  Called by a message loop when the parent
290  * message is handled and/or destroyed (possibly unanswered).
291  *
292  * Since: 1.14
293  */
294 void
295 gst_promise_expire (GstPromise * promise)
296 {
297   GstPromiseChangeFunc change_func = NULL;
298   gpointer change_data = NULL;
299
300   g_return_if_fail (promise != NULL);
301
302   g_mutex_lock (GST_PROMISE_LOCK (promise));
303   if (GST_PROMISE_RESULT (promise) == GST_PROMISE_RESULT_PENDING) {
304     GST_PROMISE_RESULT (promise) = GST_PROMISE_RESULT_EXPIRED;
305     g_cond_broadcast (GST_PROMISE_COND (promise));
306     GST_LOG ("%p expired", promise);
307
308     change_func = GST_PROMISE_CHANGE_FUNC (promise);
309     change_data = GST_PROMISE_CHANGE_DATA (promise);
310     GST_PROMISE_CHANGE_FUNC (promise) = NULL;
311     GST_PROMISE_CHANGE_DATA (promise) = NULL;
312   }
313   g_mutex_unlock (GST_PROMISE_LOCK (promise));
314
315   if (change_func)
316     change_func (promise, change_data);
317 }
318
319 static void
320 gst_promise_free (GstMiniObject * object)
321 {
322   GstPromise *promise = (GstPromise *) object;
323
324   /* the promise *must* be dealt with in some way before destruction */
325   g_warn_if_fail (GST_PROMISE_RESULT (promise) != GST_PROMISE_RESULT_PENDING);
326
327   if (GST_PROMISE_CHANGE_NOTIFY (promise))
328     GST_PROMISE_CHANGE_NOTIFY (promise) (GST_PROMISE_CHANGE_DATA (promise));
329
330   if (GST_PROMISE_REPLY (promise)) {
331     gst_structure_set_parent_refcount (GST_PROMISE_REPLY (promise), NULL);
332     gst_structure_free (GST_PROMISE_REPLY (promise));
333   }
334   g_mutex_clear (GST_PROMISE_LOCK (promise));
335   g_cond_clear (GST_PROMISE_COND (promise));
336   GST_LOG ("%p finalized", promise);
337
338 #ifdef USE_POISONING
339   memset (promise, 0xff, sizeof (GstPromiseImpl));
340 #endif
341
342   g_free (promise);
343 }
344
345 static void
346 gst_promise_init (GstPromise * promise)
347 {
348   static volatile gsize _init = 0;
349
350   if (g_once_init_enter (&_init)) {
351     GST_DEBUG_CATEGORY_INIT (gst_promise_debug, "gstpromise", 0, "gstpromise");
352     g_once_init_leave (&_init, 1);
353   }
354
355   gst_mini_object_init (GST_MINI_OBJECT (promise), 0, GST_TYPE_PROMISE, NULL,
356       NULL, gst_promise_free);
357
358   GST_PROMISE_REPLY (promise) = NULL;
359   GST_PROMISE_RESULT (promise) = GST_PROMISE_RESULT_PENDING;
360   g_mutex_init (GST_PROMISE_LOCK (promise));
361   g_cond_init (GST_PROMISE_COND (promise));
362 }
363
364 /**
365  * gst_promise_new:
366  *
367  * Returns: a new #GstPromise
368  *
369  * Since: 1.14
370  */
371 GstPromise *
372 gst_promise_new (void)
373 {
374   GstPromise *promise = GST_PROMISE (g_new0 (GstPromiseImpl, 1));
375
376   gst_promise_init (promise);
377   GST_LOG ("new promise %p", promise);
378
379   return promise;
380 }
381
382 /**
383  * gst_promise_new_with_change_func:
384  * @func: (scope notified): a #GstPromiseChangeFunc to call
385  * @user_data: (closure): argument to call @func with
386  * @notify: notification function that @user_data is no longer needed
387  *
388  * @func will be called exactly once when transitioning out of
389  * %GST_PROMISE_RESULT_PENDING into any of the other #GstPromiseResult
390  * states.
391  *
392  * Returns: a new #GstPromise
393  *
394  * Since: 1.14
395  */
396 GstPromise *
397 gst_promise_new_with_change_func (GstPromiseChangeFunc func, gpointer user_data,
398     GDestroyNotify notify)
399 {
400   GstPromise *promise = gst_promise_new ();
401
402   GST_PROMISE_CHANGE_FUNC (promise) = func;
403   GST_PROMISE_CHANGE_DATA (promise) = user_data;
404   GST_PROMISE_CHANGE_NOTIFY (promise) = notify;
405
406   return promise;
407 }
408
409 GST_DEFINE_MINI_OBJECT_TYPE (GstPromise, gst_promise);