2 * Copyright (C) 2017 Matthew Waters <matthew@centricular.com>
4 * This library is free software; you can redistribute it and/or
5 * modify it under the terms of the GNU Library General Public
6 * License as published by the Free Software Foundation; either
7 * version 2 of the License, or (at your option) any later version.
9 * This library is distributed in the hope that it will be useful,
10 * but WITHOUT ANY WARRANTY; without even the implied warranty of
11 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
12 * Library General Public License for more details.
14 * You should have received a copy of the GNU Library General Public
15 * License along with this library; if not, write to the
16 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17 * Boston, MA 02110-1301, USA.
24 #include "gst_private.h"
26 #include "gstpromise.h"
28 #define GST_CAT_DEFAULT gst_promise_debug
29 GST_DEBUG_CATEGORY_STATIC (GST_CAT_DEFAULT);
34 * @short_description: a miniobject for future/promise-like functionality
37 * The #GstPromise object implements the container for values that may
38 * be available later. i.e. a Future or a Promise in
39 * <ulink url="https://en.wikipedia.org/wiki/Futures_and_promises">https://en.wikipedia.org/wiki/Futures_and_promises</ulink>
40 * As with all Future/Promise-like functionality, there is the concept of the
41 * producer of the value and the consumer of the value.
43 * A #GstPromise is created with gst_promise_new() by the consumer and passed
44 * to the producer to avoid thread safety issues with the change callback.
45 * A #GstPromise can be replied to with a value (or an error) by the producer
46 * with gst_promise_reply(). gst_promise_interrupt() is for the consumer to
47 * indicate to the producer that the value is not needed anymore and producing
48 * that value can stop. The @GST_PROMISE_RESULT_EXPIRED state set by a call
49 * to gst_promise_expire() indicates to the consumer that a value will never
50 * be produced and is intended to be called by a third party that implements
51 * some notion of message handling such as #GstBus.
52 * A callback can also be installed at #GstPromise creation for
53 * result changes with gst_promise_new_with_change_func().
54 * The change callback can be used to chain #GstPromises's together as in the
56 * |[<!-- language="C" -->
57 * const GstStructure *reply;
59 * if (gst_promise_wait (promise) != GST_PROMISE_RESULT_REPLIED)
60 * return; // interrupted or expired value
61 * reply = gst_promise_get_reply (promise);
63 * return; // propagate error
64 * p = gst_promise_new_with_change_func (another_promise_change_func, user_data, notify);
65 * pass p to promise-using API
68 * Each #GstPromise starts out with a #GstPromiseResult of
69 * %GST_PROMISE_RESULT_PENDING and only ever transitions once
70 * into one of the other #GstPromiseResult's.
72 * In order to support multi-threaded code, gst_promise_reply(),
73 * gst_promise_interrupt() and gst_promise_expire() may all be from
74 * different threads with some restrictions and the final result of the promise
75 * is whichever call is made first. There are two restrictions on ordering:
77 * 1. That gst_promise_reply() and gst_promise_interrupt() cannot be called
78 * after gst_promise_expire()
79 * 2. That gst_promise_reply() and gst_promise_interrupt()
80 * cannot be called twice.
82 * The change function set with gst_promise_new_with_change_func() is
83 * called directly from either the gst_promise_reply(),
84 * gst_promise_interrupt() or gst_promise_expire() and can be called
85 * from an arbitrary thread. #GstPromise using APIs can restrict this to
86 * a single thread or a subset of threads but that is entirely up to the API
87 * that uses #GstPromise.
90 static const int immutable_structure_refcount = 2;
92 #define GST_PROMISE_REPLY(p) (((GstPromiseImpl *)(p))->reply)
93 #define GST_PROMISE_RESULT(p) (((GstPromiseImpl *)(p))->result)
94 #define GST_PROMISE_LOCK(p) (&(((GstPromiseImpl *)(p))->lock))
95 #define GST_PROMISE_COND(p) (&(((GstPromiseImpl *)(p))->cond))
96 #define GST_PROMISE_CHANGE_FUNC(p) (((GstPromiseImpl *)(p))->change_func)
97 #define GST_PROMISE_CHANGE_DATA(p) (((GstPromiseImpl *)(p))->user_data)
98 #define GST_PROMISE_CHANGE_NOTIFY(p) (((GstPromiseImpl *)(p))->notify)
104 GstPromiseResult result;
109 GstPromiseChangeFunc change_func;
111 GDestroyNotify notify;
116 * @promise: a #GstPromise
118 * Wait for @promise to move out of the %GST_PROMISE_RESULT_PENDING state.
119 * If @promise is not in %GST_PROMISE_RESULT_PENDING then it will return
120 * immediately with the current result.
122 * Returns: the result of the promise
127 gst_promise_wait (GstPromise * promise)
129 GstPromiseResult ret;
131 g_return_val_if_fail (promise != NULL, GST_PROMISE_RESULT_EXPIRED);
133 g_mutex_lock (GST_PROMISE_LOCK (promise));
134 ret = GST_PROMISE_RESULT (promise);
136 while (ret == GST_PROMISE_RESULT_PENDING) {
137 GST_LOG ("%p waiting", promise);
138 g_cond_wait (GST_PROMISE_COND (promise), GST_PROMISE_LOCK (promise));
139 ret = GST_PROMISE_RESULT (promise);
141 GST_LOG ("%p waited", promise);
143 g_mutex_unlock (GST_PROMISE_LOCK (promise));
150 * @promise: (allow-none): a #GstPromise
151 * @s: (transfer full): a #GstStructure with the the reply contents
153 * Set a reply on @promise. This will wake up any waiters with
154 * %GST_PROMISE_RESULT_REPLIED. Called by the producer of the value to
155 * indicate success (or failure).
157 * If @promise has already been interrupted by the consumer, then this reply
158 * is not visible to the consumer.
163 gst_promise_reply (GstPromise * promise, GstStructure * s)
165 GstPromiseChangeFunc change_func = NULL;
166 gpointer change_data = NULL;
168 /* Caller requested that no reply is necessary */
172 g_mutex_lock (GST_PROMISE_LOCK (promise));
173 if (GST_PROMISE_RESULT (promise) != GST_PROMISE_RESULT_PENDING &&
174 GST_PROMISE_RESULT (promise) != GST_PROMISE_RESULT_INTERRUPTED) {
175 GstPromiseResult result = GST_PROMISE_RESULT (promise);
176 g_mutex_unlock (GST_PROMISE_LOCK (promise));
177 g_return_if_fail (result == GST_PROMISE_RESULT_PENDING ||
178 result == GST_PROMISE_RESULT_INTERRUPTED);
181 /* XXX: is this necessary and valid? */
182 if (GST_PROMISE_REPLY (promise) && GST_PROMISE_REPLY (promise) != s)
183 gst_structure_free (GST_PROMISE_REPLY (promise));
185 /* Only reply iff we are currently in pending */
186 if (GST_PROMISE_RESULT (promise) == GST_PROMISE_RESULT_PENDING) {
188 && !gst_structure_set_parent_refcount (s,
189 (int *) &immutable_structure_refcount)) {
190 g_critical ("Input structure has a parent already!");
191 g_mutex_unlock (GST_PROMISE_LOCK (promise));
195 GST_PROMISE_RESULT (promise) = GST_PROMISE_RESULT_REPLIED;
196 GST_LOG ("%p replied", promise);
198 GST_PROMISE_REPLY (promise) = s;
200 change_func = GST_PROMISE_CHANGE_FUNC (promise);
201 change_data = GST_PROMISE_CHANGE_DATA (promise);
205 gst_structure_free (s);
208 g_cond_broadcast (GST_PROMISE_COND (promise));
209 g_mutex_unlock (GST_PROMISE_LOCK (promise));
212 change_func (promise, change_data);
216 * gst_promise_get_reply:
217 * @promise: a #GstPromise
219 * Retrieve the reply set on @promise. @promise must be in
220 * %GST_PROMISE_RESULT_REPLIED and the returned structure is owned by @promise
222 * Returns: (transfer none): The reply set on @promise
227 gst_promise_get_reply (GstPromise * promise)
229 g_return_val_if_fail (promise != NULL, NULL);
231 g_mutex_lock (GST_PROMISE_LOCK (promise));
232 if (GST_PROMISE_RESULT (promise) != GST_PROMISE_RESULT_REPLIED) {
233 GstPromiseResult result = GST_PROMISE_RESULT (promise);
234 g_mutex_unlock (GST_PROMISE_LOCK (promise));
235 g_return_val_if_fail (result == GST_PROMISE_RESULT_REPLIED, NULL);
238 g_mutex_unlock (GST_PROMISE_LOCK (promise));
240 return GST_PROMISE_REPLY (promise);
244 * gst_promise_interrupt:
245 * @promise: a #GstPromise
247 * Interrupt waiting for a @promise. This will wake up any waiters with
248 * %GST_PROMISE_RESULT_INTERRUPTED. Called when the consumer does not want
249 * the value produced anymore.
254 gst_promise_interrupt (GstPromise * promise)
256 GstPromiseChangeFunc change_func = NULL;
257 gpointer change_data = NULL;
259 g_return_if_fail (promise != NULL);
261 g_mutex_lock (GST_PROMISE_LOCK (promise));
262 if (GST_PROMISE_RESULT (promise) != GST_PROMISE_RESULT_PENDING &&
263 GST_PROMISE_RESULT (promise) != GST_PROMISE_RESULT_REPLIED) {
264 GstPromiseResult result = GST_PROMISE_RESULT (promise);
265 g_mutex_unlock (GST_PROMISE_LOCK (promise));
266 g_return_if_fail (result == GST_PROMISE_RESULT_PENDING ||
267 result == GST_PROMISE_RESULT_REPLIED);
269 /* only interrupt if we are currently in pending */
270 if (GST_PROMISE_RESULT (promise) == GST_PROMISE_RESULT_PENDING) {
271 GST_PROMISE_RESULT (promise) = GST_PROMISE_RESULT_INTERRUPTED;
272 g_cond_broadcast (GST_PROMISE_COND (promise));
273 GST_LOG ("%p interrupted", promise);
275 change_func = GST_PROMISE_CHANGE_FUNC (promise);
276 change_data = GST_PROMISE_CHANGE_DATA (promise);
278 g_mutex_unlock (GST_PROMISE_LOCK (promise));
281 change_func (promise, change_data);
285 * gst_promise_expire:
286 * @promise: a #GstPromise
288 * Expire a @promise. This will wake up any waiters with
289 * %GST_PROMISE_RESULT_EXPIRED. Called by a message loop when the parent
290 * message is handled and/or destroyed (possibly unanswered).
295 gst_promise_expire (GstPromise * promise)
297 GstPromiseChangeFunc change_func = NULL;
298 gpointer change_data = NULL;
300 g_return_if_fail (promise != NULL);
302 g_mutex_lock (GST_PROMISE_LOCK (promise));
303 if (GST_PROMISE_RESULT (promise) == GST_PROMISE_RESULT_PENDING) {
304 GST_PROMISE_RESULT (promise) = GST_PROMISE_RESULT_EXPIRED;
305 g_cond_broadcast (GST_PROMISE_COND (promise));
306 GST_LOG ("%p expired", promise);
308 change_func = GST_PROMISE_CHANGE_FUNC (promise);
309 change_data = GST_PROMISE_CHANGE_DATA (promise);
310 GST_PROMISE_CHANGE_FUNC (promise) = NULL;
311 GST_PROMISE_CHANGE_DATA (promise) = NULL;
313 g_mutex_unlock (GST_PROMISE_LOCK (promise));
316 change_func (promise, change_data);
320 gst_promise_free (GstMiniObject * object)
322 GstPromise *promise = (GstPromise *) object;
324 /* the promise *must* be dealt with in some way before destruction */
325 g_warn_if_fail (GST_PROMISE_RESULT (promise) != GST_PROMISE_RESULT_PENDING);
327 if (GST_PROMISE_CHANGE_NOTIFY (promise))
328 GST_PROMISE_CHANGE_NOTIFY (promise) (GST_PROMISE_CHANGE_DATA (promise));
330 if (GST_PROMISE_REPLY (promise)) {
331 gst_structure_set_parent_refcount (GST_PROMISE_REPLY (promise), NULL);
332 gst_structure_free (GST_PROMISE_REPLY (promise));
334 g_mutex_clear (GST_PROMISE_LOCK (promise));
335 g_cond_clear (GST_PROMISE_COND (promise));
336 GST_LOG ("%p finalized", promise);
339 memset (promise, 0xff, sizeof (GstPromiseImpl));
346 gst_promise_init (GstPromise * promise)
348 static volatile gsize _init = 0;
350 if (g_once_init_enter (&_init)) {
351 GST_DEBUG_CATEGORY_INIT (gst_promise_debug, "gstpromise", 0, "gstpromise");
352 g_once_init_leave (&_init, 1);
355 gst_mini_object_init (GST_MINI_OBJECT (promise), 0, GST_TYPE_PROMISE, NULL,
356 NULL, gst_promise_free);
358 GST_PROMISE_REPLY (promise) = NULL;
359 GST_PROMISE_RESULT (promise) = GST_PROMISE_RESULT_PENDING;
360 g_mutex_init (GST_PROMISE_LOCK (promise));
361 g_cond_init (GST_PROMISE_COND (promise));
367 * Returns: a new #GstPromise
372 gst_promise_new (void)
374 GstPromise *promise = GST_PROMISE (g_new0 (GstPromiseImpl, 1));
376 gst_promise_init (promise);
377 GST_LOG ("new promise %p", promise);
383 * gst_promise_new_with_change_func:
384 * @func: (scope notified): a #GstPromiseChangeFunc to call
385 * @user_data: (closure): argument to call @func with
386 * @notify: notification function that @user_data is no longer needed
388 * @func will be called exactly once when transitioning out of
389 * %GST_PROMISE_RESULT_PENDING into any of the other #GstPromiseResult
392 * Returns: a new #GstPromise
397 gst_promise_new_with_change_func (GstPromiseChangeFunc func, gpointer user_data,
398 GDestroyNotify notify)
400 GstPromise *promise = gst_promise_new ();
402 GST_PROMISE_CHANGE_FUNC (promise) = func;
403 GST_PROMISE_CHANGE_DATA (promise) = user_data;
404 GST_PROMISE_CHANGE_NOTIFY (promise) = notify;
409 GST_DEFINE_MINI_OBJECT_TYPE (GstPromise, gst_promise);