2 * Copyright (C) 2017 Matthew Waters <matthew@centricular.com>
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Library General Public
6 * License as published by the Free Software Foundation; either
7 * version 2 of the License, or (at your option) any later version.
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Library General Public License for more details.
14 * You should have received a copy of the GNU Library General Public
15 * License along with this library; if not, write to the
16 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17 * Boston, MA 02110-1301, USA.
24 #define GST_DISABLE_MINIOBJECT_INLINE_FUNCTIONS
25 #include "gst_private.h"
27 #include "gstpromise.h"
29 #define GST_CAT_DEFAULT gst_promise_debug
30 GST_DEBUG_CATEGORY_STATIC (GST_CAT_DEFAULT);
35 * @short_description: a miniobject for future/promise-like functionality
37 * The #GstPromise object implements the container for values that may
38 * be available later. i.e. a Future or a Promise in
39 * <https://en.wikipedia.org/wiki/Futures_and_promises>.
40 * As with all Future/Promise-like functionality, there is the concept of the
41 * producer of the value and the consumer of the value.
43 * A #GstPromise is created with gst_promise_new() by the consumer and passed
44 * to the producer to avoid thread safety issues with the change callback.
45 * A #GstPromise can be replied to with a value (or an error) by the producer
46 * with gst_promise_reply(). The exact value returned is defined by the API
47 * contract of the producer and %NULL may be a valid reply.
48 * gst_promise_interrupt() is for the consumer to
49 * indicate to the producer that the value is not needed anymore and producing
50 * that value can stop. The @GST_PROMISE_RESULT_EXPIRED state set by a call
51 * to gst_promise_expire() indicates to the consumer that a value will never
52 * be produced and is intended to be called by a third party that implements
53 * some notion of message handling such as #GstBus.
54 * A callback can also be installed at #GstPromise creation for
55 * result changes with gst_promise_new_with_change_func().
56 * The change callback can be used to chain #GstPromises's together as in the
58 * |[<!-- language="C" -->
59 * const GstStructure *reply;
61 * if (gst_promise_wait (promise) != GST_PROMISE_RESULT_REPLIED)
62 * return; // interrupted or expired value
63 * reply = gst_promise_get_reply (promise);
65 * return; // propagate error
66 * p = gst_promise_new_with_change_func (another_promise_change_func, user_data, notify);
67 * pass p to promise-using API
70 * Each #GstPromise starts out with a #GstPromiseResult of
71 * %GST_PROMISE_RESULT_PENDING and only ever transitions once
72 * into one of the other #GstPromiseResult's.
74 * In order to support multi-threaded code, gst_promise_reply(),
75 * gst_promise_interrupt() and gst_promise_expire() may all be from
76 * different threads with some restrictions and the final result of the promise
77 * is whichever call is made first. There are two restrictions on ordering:
79 * 1. That gst_promise_reply() and gst_promise_interrupt() cannot be called
80 * after gst_promise_expire()
81 * 2. That gst_promise_reply() and gst_promise_interrupt()
82 * cannot be called twice.
84 * The change function set with gst_promise_new_with_change_func() is
85 * called directly from either the gst_promise_reply(),
86 * gst_promise_interrupt() or gst_promise_expire() and can be called
87 * from an arbitrary thread. #GstPromise using APIs can restrict this to
88 * a single thread or a subset of threads but that is entirely up to the API
89 * that uses #GstPromise.
92 static const int immutable_structure_refcount = 2;
94 #define GST_PROMISE_REPLY(p) (((GstPromiseImpl *)(p))->reply)
95 #define GST_PROMISE_RESULT(p) (((GstPromiseImpl *)(p))->result)
96 #define GST_PROMISE_LOCK(p) (&(((GstPromiseImpl *)(p))->lock))
97 #define GST_PROMISE_COND(p) (&(((GstPromiseImpl *)(p))->cond))
98 #define GST_PROMISE_CHANGE_FUNC(p) (((GstPromiseImpl *)(p))->change_func)
99 #define GST_PROMISE_CHANGE_DATA(p) (((GstPromiseImpl *)(p))->user_data)
100 #define GST_PROMISE_CHANGE_NOTIFY(p) (((GstPromiseImpl *)(p))->notify)
106 GstPromiseResult result;
111 GstPromiseChangeFunc change_func;
113 GDestroyNotify notify;
118 * @promise: a #GstPromise
120 * Wait for @promise to move out of the %GST_PROMISE_RESULT_PENDING state.
121 * If @promise is not in %GST_PROMISE_RESULT_PENDING then it will return
122 * immediately with the current result.
124 * Returns: the result of the promise
129 gst_promise_wait (GstPromise * promise)
131 GstPromiseResult ret;
133 g_return_val_if_fail (promise != NULL, GST_PROMISE_RESULT_EXPIRED);
135 g_mutex_lock (GST_PROMISE_LOCK (promise));
136 ret = GST_PROMISE_RESULT (promise);
138 while (ret == GST_PROMISE_RESULT_PENDING) {
139 GST_LOG ("%p waiting", promise);
140 g_cond_wait (GST_PROMISE_COND (promise), GST_PROMISE_LOCK (promise));
141 ret = GST_PROMISE_RESULT (promise);
143 GST_LOG ("%p waited", promise);
145 g_mutex_unlock (GST_PROMISE_LOCK (promise));
152 * @promise: (allow-none): a #GstPromise
153 * @s: (transfer full) (nullable): a #GstStructure with the the reply contents
155 * Set a reply on @promise. This will wake up any waiters with
156 * %GST_PROMISE_RESULT_REPLIED. Called by the producer of the value to
157 * indicate success (or failure).
159 * If @promise has already been interrupted by the consumer, then this reply
160 * is not visible to the consumer.
165 gst_promise_reply (GstPromise * promise, GstStructure * s)
167 GstPromiseChangeFunc change_func = NULL;
168 gpointer change_data = NULL;
170 /* Caller requested that no reply is necessary */
174 g_mutex_lock (GST_PROMISE_LOCK (promise));
175 if (GST_PROMISE_RESULT (promise) != GST_PROMISE_RESULT_PENDING &&
176 GST_PROMISE_RESULT (promise) != GST_PROMISE_RESULT_INTERRUPTED) {
177 GstPromiseResult result = GST_PROMISE_RESULT (promise);
178 g_mutex_unlock (GST_PROMISE_LOCK (promise));
179 g_return_if_fail (result == GST_PROMISE_RESULT_PENDING ||
180 result == GST_PROMISE_RESULT_INTERRUPTED);
183 /* XXX: is this necessary and valid? */
184 if (GST_PROMISE_REPLY (promise) && GST_PROMISE_REPLY (promise) != s)
185 gst_structure_free (GST_PROMISE_REPLY (promise));
187 /* Only reply iff we are currently in pending */
188 if (GST_PROMISE_RESULT (promise) == GST_PROMISE_RESULT_PENDING) {
190 && !gst_structure_set_parent_refcount (s,
191 (int *) &immutable_structure_refcount)) {
192 g_critical ("Input structure has a parent already!");
193 g_mutex_unlock (GST_PROMISE_LOCK (promise));
197 GST_PROMISE_RESULT (promise) = GST_PROMISE_RESULT_REPLIED;
198 GST_LOG ("%p replied", promise);
200 GST_PROMISE_REPLY (promise) = s;
202 change_func = GST_PROMISE_CHANGE_FUNC (promise);
203 change_data = GST_PROMISE_CHANGE_DATA (promise);
207 gst_structure_free (s);
210 g_cond_broadcast (GST_PROMISE_COND (promise));
211 g_mutex_unlock (GST_PROMISE_LOCK (promise));
214 change_func (promise, change_data);
218 * gst_promise_get_reply:
219 * @promise: a #GstPromise
221 * Retrieve the reply set on @promise. @promise must be in
222 * %GST_PROMISE_RESULT_REPLIED and the returned structure is owned by @promise
224 * Returns: (transfer none) (nullable): The reply set on @promise
229 gst_promise_get_reply (GstPromise * promise)
231 g_return_val_if_fail (promise != NULL, NULL);
233 g_mutex_lock (GST_PROMISE_LOCK (promise));
234 if (GST_PROMISE_RESULT (promise) != GST_PROMISE_RESULT_REPLIED) {
235 GstPromiseResult result = GST_PROMISE_RESULT (promise);
236 g_mutex_unlock (GST_PROMISE_LOCK (promise));
237 g_return_val_if_fail (result == GST_PROMISE_RESULT_REPLIED, NULL);
240 g_mutex_unlock (GST_PROMISE_LOCK (promise));
242 return GST_PROMISE_REPLY (promise);
246 * gst_promise_interrupt:
247 * @promise: a #GstPromise
249 * Interrupt waiting for a @promise. This will wake up any waiters with
250 * %GST_PROMISE_RESULT_INTERRUPTED. Called when the consumer does not want
251 * the value produced anymore.
256 gst_promise_interrupt (GstPromise * promise)
258 GstPromiseChangeFunc change_func = NULL;
259 gpointer change_data = NULL;
261 g_return_if_fail (promise != NULL);
263 g_mutex_lock (GST_PROMISE_LOCK (promise));
264 if (GST_PROMISE_RESULT (promise) != GST_PROMISE_RESULT_PENDING &&
265 GST_PROMISE_RESULT (promise) != GST_PROMISE_RESULT_REPLIED) {
266 GstPromiseResult result = GST_PROMISE_RESULT (promise);
267 g_mutex_unlock (GST_PROMISE_LOCK (promise));
268 g_return_if_fail (result == GST_PROMISE_RESULT_PENDING ||
269 result == GST_PROMISE_RESULT_REPLIED);
271 /* only interrupt if we are currently in pending */
272 if (GST_PROMISE_RESULT (promise) == GST_PROMISE_RESULT_PENDING) {
273 GST_PROMISE_RESULT (promise) = GST_PROMISE_RESULT_INTERRUPTED;
274 g_cond_broadcast (GST_PROMISE_COND (promise));
275 GST_LOG ("%p interrupted", promise);
277 change_func = GST_PROMISE_CHANGE_FUNC (promise);
278 change_data = GST_PROMISE_CHANGE_DATA (promise);
280 g_mutex_unlock (GST_PROMISE_LOCK (promise));
283 change_func (promise, change_data);
287 * gst_promise_expire:
288 * @promise: a #GstPromise
290 * Expire a @promise. This will wake up any waiters with
291 * %GST_PROMISE_RESULT_EXPIRED. Called by a message loop when the parent
292 * message is handled and/or destroyed (possibly unanswered).
297 gst_promise_expire (GstPromise * promise)
299 GstPromiseChangeFunc change_func = NULL;
300 gpointer change_data = NULL;
302 g_return_if_fail (promise != NULL);
304 g_mutex_lock (GST_PROMISE_LOCK (promise));
305 if (GST_PROMISE_RESULT (promise) == GST_PROMISE_RESULT_PENDING) {
306 GST_PROMISE_RESULT (promise) = GST_PROMISE_RESULT_EXPIRED;
307 g_cond_broadcast (GST_PROMISE_COND (promise));
308 GST_LOG ("%p expired", promise);
310 change_func = GST_PROMISE_CHANGE_FUNC (promise);
311 change_data = GST_PROMISE_CHANGE_DATA (promise);
312 GST_PROMISE_CHANGE_FUNC (promise) = NULL;
313 GST_PROMISE_CHANGE_DATA (promise) = NULL;
315 g_mutex_unlock (GST_PROMISE_LOCK (promise));
318 change_func (promise, change_data);
322 gst_promise_free (GstMiniObject * object)
324 GstPromise *promise = (GstPromise *) object;
326 /* the promise *must* be dealt with in some way before destruction */
327 g_warn_if_fail (GST_PROMISE_RESULT (promise) != GST_PROMISE_RESULT_PENDING);
329 if (GST_PROMISE_CHANGE_NOTIFY (promise))
330 GST_PROMISE_CHANGE_NOTIFY (promise) (GST_PROMISE_CHANGE_DATA (promise));
332 if (GST_PROMISE_REPLY (promise)) {
333 gst_structure_set_parent_refcount (GST_PROMISE_REPLY (promise), NULL);
334 gst_structure_free (GST_PROMISE_REPLY (promise));
336 g_mutex_clear (GST_PROMISE_LOCK (promise));
337 g_cond_clear (GST_PROMISE_COND (promise));
338 GST_LOG ("%p finalized", promise);
341 memset (promise, 0xff, sizeof (GstPromiseImpl));
348 gst_promise_init (GstPromise * promise)
350 static gsize _init = 0;
352 if (g_once_init_enter (&_init)) {
353 GST_DEBUG_CATEGORY_INIT (gst_promise_debug, "gstpromise", 0, "gstpromise");
354 g_once_init_leave (&_init, 1);
357 gst_mini_object_init (GST_MINI_OBJECT (promise), 0, GST_TYPE_PROMISE, NULL,
358 NULL, gst_promise_free);
360 GST_PROMISE_REPLY (promise) = NULL;
361 GST_PROMISE_RESULT (promise) = GST_PROMISE_RESULT_PENDING;
362 g_mutex_init (GST_PROMISE_LOCK (promise));
363 g_cond_init (GST_PROMISE_COND (promise));
369 * Returns: a new #GstPromise
374 gst_promise_new (void)
376 GstPromise *promise = GST_PROMISE (g_new0 (GstPromiseImpl, 1));
378 gst_promise_init (promise);
379 GST_LOG ("new promise %p", promise);
385 * gst_promise_new_with_change_func:
386 * @func: (scope notified): a #GstPromiseChangeFunc to call
387 * @user_data: (closure): argument to call @func with
388 * @notify: notification function that @user_data is no longer needed
390 * @func will be called exactly once when transitioning out of
391 * %GST_PROMISE_RESULT_PENDING into any of the other #GstPromiseResult
394 * Returns: a new #GstPromise
399 gst_promise_new_with_change_func (GstPromiseChangeFunc func, gpointer user_data,
400 GDestroyNotify notify)
402 GstPromise *promise = gst_promise_new ();
404 GST_PROMISE_CHANGE_FUNC (promise) = func;
405 GST_PROMISE_CHANGE_DATA (promise) = user_data;
406 GST_PROMISE_CHANGE_NOTIFY (promise) = notify;
411 GST_DEFINE_MINI_OBJECT_TYPE (GstPromise, gst_promise);
415 * @promise: a #GstPromise.
417 * Increases the refcount of the given @promise by one.
419 * Returns: (transfer full): @promise
424 gst_promise_ref (GstPromise * promise)
426 return (GstPromise *) gst_mini_object_ref (GST_MINI_OBJECT_CAST (promise));
431 * @promise: (transfer full): a #GstPromise.
433 * Decreases the refcount of the promise. If the refcount reaches 0, the
434 * promise will be freed.
439 gst_promise_unref (GstPromise * promise)
441 gst_mini_object_unref (GST_MINI_OBJECT_CAST (promise));