gstpromise: add since 1.14 markers
[platform/upstream/gstreamer.git] / gst / gstpromise.c
1 /* GStreamer
2  * Copyright (C) 2017 Matthew Waters <matthew@centricular.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19
20 #ifdef HAVE_CONFIG_H
21 # include "config.h"
22 #endif
23
24 #include "gst_private.h"
25
26 #include "gstpromise.h"
27
28 #define GST_CAT_DEFAULT gst_promise_debug
29 GST_DEBUG_CATEGORY_STATIC (GST_CAT_DEFAULT);
30
31 /**
32  * SECTION:gstpromise
33  * @title: GstPromise
34  * @short_description: a miniobject for future/promise-like functionality
35  * @see_also:
36  *
37  * The #GstPromise object implements the container for values that may
38  * be available later. i.e. a Future or a Promise in
39  * <ulink url="https://en.wikipedia.org/wiki/Futures_and_promises">https://en.wikipedia.org/wiki/Futures_and_promises</ulink>
40  *
41  * A #GstPromise can be created with gst_promise_new(), replied to
42  * with gst_promise_reply(), interrupted with gst_promise_interrupt() and
43  * expired with gst_promise_expire(). A callback can also be installed at
44  * #GstPromise creation for result changes with gst_promise_new_with_change_func().
45  * The change callback can be used to chain #GstPromises's together as in the
46  * following example.
47  * |[<!-- language="C" -->
48  * const GstStructure *reply;
49  * GstPromise *p;
50  * if (gst_promise_wait (promise) != GST_PROMISE_RESULT_REPLIED)
51  *   return; // interrupted or expired value
52  * reply = gst_promise_get_reply (promise);
53  * if (error in reply)
54  *   return; // propagate error
55  * p = gst_promise_new_with_change_func (another_promise_change_func, user_data, notify);
56  * pass p to promise-using API
57  * ]|
58  *
59  * Each #GstPromise starts out with a #GstPromiseResult of
60  * %GST_PROMISE_RESULT_PENDING and only ever transitions out of that result
61  * into one of the other #GstPromiseResult.
62  *
63  * In order to support multi-threaded code, gst_promise_reply(),
64  * gst_promise_interrupt() and gst_promise_expire() may all be from
65  * different threads with some restrictions, the final result of the promise
66  * is whichever call is made first.  There are two restrictions on ordering:
67  *
68  * 1. That gst_promise_reply() and gst_promise_interrupt() cannot be called
69  * after gst_promise_expire()
70  * 2. That gst_promise_reply() and gst_promise_interrupt()
71  * cannot be called twice.
72  */
73
74 static const int immutable_structure_refcount = 2;
75
76 #define GST_PROMISE_REPLY(p)            (((GstPromiseImpl *)(p))->reply)
77 #define GST_PROMISE_RESULT(p)           (((GstPromiseImpl *)(p))->result)
78 #define GST_PROMISE_LOCK(p)             (&(((GstPromiseImpl *)(p))->lock))
79 #define GST_PROMISE_COND(p)             (&(((GstPromiseImpl *)(p))->cond))
80 #define GST_PROMISE_CHANGE_FUNC(p)      (((GstPromiseImpl *)(p))->change_func)
81 #define GST_PROMISE_CHANGE_DATA(p)      (((GstPromiseImpl *)(p))->user_data)
82 #define GST_PROMISE_CHANGE_NOTIFY(p)    (((GstPromiseImpl *)(p))->notify)
83
84 typedef struct
85 {
86   GstPromise promise;
87
88   GstPromiseResult result;
89   GstStructure *reply;
90
91   GMutex lock;
92   GCond cond;
93   GstPromiseChangeFunc change_func;
94   gpointer user_data;
95   GDestroyNotify notify;
96 } GstPromiseImpl;
97
98 /**
99  * gst_promise_wait:
100  * @promise: a #GstPromise
101  *
102  * Wait for @promise to move out of the %GST_PROMISE_RESULT_PENDING state.
103  * If @promise is not in %GST_PROMISE_RESULT_PENDING then it will return
104  * immediately with the current result.
105  *
106  * Returns: the result of the promise
107  *
108  * Since: 1.14
109  */
110 GstPromiseResult
111 gst_promise_wait (GstPromise * promise)
112 {
113   GstPromiseResult ret;
114
115   g_return_val_if_fail (promise != NULL, GST_PROMISE_RESULT_EXPIRED);
116
117   g_mutex_lock (GST_PROMISE_LOCK (promise));
118   ret = GST_PROMISE_RESULT (promise);
119
120   while (ret == GST_PROMISE_RESULT_PENDING) {
121     GST_LOG ("%p waiting", promise);
122     g_cond_wait (GST_PROMISE_COND (promise), GST_PROMISE_LOCK (promise));
123     ret = GST_PROMISE_RESULT (promise);
124   }
125   GST_LOG ("%p waited", promise);
126
127   g_mutex_unlock (GST_PROMISE_LOCK (promise));
128
129   return ret;
130 }
131
132 /**
133  * gst_promise_reply:
134  * @promise: (allow-none): a #GstPromise
135  * @s: (transfer full): a #GstStructure with the the reply contents
136  *
137  * Set a reply on @promise.  This will wake up any waiters with
138  * %GST_PROMISE_RESULT_REPLIED.
139  *
140  * Since: 1.14
141  */
142 void
143 gst_promise_reply (GstPromise * promise, GstStructure * s)
144 {
145   GstPromiseChangeFunc change_func = NULL;
146   gpointer change_data = NULL;
147
148   /* Caller requested that no reply is necessary */
149   if (promise == NULL)
150     return;
151
152   g_mutex_lock (GST_PROMISE_LOCK (promise));
153   if (GST_PROMISE_RESULT (promise) != GST_PROMISE_RESULT_PENDING &&
154       GST_PROMISE_RESULT (promise) != GST_PROMISE_RESULT_INTERRUPTED) {
155     GstPromiseResult result = GST_PROMISE_RESULT (promise);
156     g_mutex_unlock (GST_PROMISE_LOCK (promise));
157     g_return_if_fail (result == GST_PROMISE_RESULT_PENDING ||
158         result == GST_PROMISE_RESULT_INTERRUPTED);
159   }
160
161   /* XXX: is this necessary and valid? */
162   if (GST_PROMISE_REPLY (promise) && GST_PROMISE_REPLY (promise) != s)
163     gst_structure_free (GST_PROMISE_REPLY (promise));
164
165   /* Only reply iff we are currently in pending */
166   if (GST_PROMISE_RESULT (promise) == GST_PROMISE_RESULT_PENDING) {
167     if (s
168         && !gst_structure_set_parent_refcount (s,
169             (int *) &immutable_structure_refcount)) {
170       g_critical ("Input structure has a parent already!");
171       g_mutex_unlock (GST_PROMISE_LOCK (promise));
172       return;
173     }
174
175     GST_PROMISE_RESULT (promise) = GST_PROMISE_RESULT_REPLIED;
176     GST_LOG ("%p replied", promise);
177
178     GST_PROMISE_REPLY (promise) = s;
179
180     change_func = GST_PROMISE_CHANGE_FUNC (promise);
181     change_data = GST_PROMISE_CHANGE_DATA (promise);
182   } else {
183     /* eat the value */
184     if (s)
185       gst_structure_free (s);
186   }
187
188   g_cond_broadcast (GST_PROMISE_COND (promise));
189   g_mutex_unlock (GST_PROMISE_LOCK (promise));
190
191   if (change_func)
192     change_func (promise, change_data);
193 }
194
195 /**
196  * gst_promise_get_reply:
197  * @promise: a #GstPromise
198  *
199  * Retrieve the reply set on @promise.  @promise must be in
200  * %GST_PROMISE_RESULT_REPLIED and is owned by @promise
201  *
202  * Returns: (transfer none): The reply set on @promise
203  *
204  * Since: 1.14
205  */
206 const GstStructure *
207 gst_promise_get_reply (GstPromise * promise)
208 {
209   g_return_val_if_fail (promise != NULL, NULL);
210
211   g_mutex_lock (GST_PROMISE_LOCK (promise));
212   if (GST_PROMISE_RESULT (promise) != GST_PROMISE_RESULT_REPLIED) {
213     GstPromiseResult result = GST_PROMISE_RESULT (promise);
214     g_mutex_unlock (GST_PROMISE_LOCK (promise));
215     g_return_val_if_fail (result == GST_PROMISE_RESULT_REPLIED, NULL);
216   }
217
218   g_mutex_unlock (GST_PROMISE_LOCK (promise));
219
220   return GST_PROMISE_REPLY (promise);
221 }
222
223 /**
224  * gst_promise_interrupt:
225  * @promise: a #GstPromise
226  *
227  * Interrupt waiting for a @promise.  This will wake up any waiters with
228  * %GST_PROMISE_RESULT_INTERRUPTED
229  *
230  * Since: 1.14
231  */
232 void
233 gst_promise_interrupt (GstPromise * promise)
234 {
235   GstPromiseChangeFunc change_func = NULL;
236   gpointer change_data = NULL;
237
238   g_return_if_fail (promise != NULL);
239
240   g_mutex_lock (GST_PROMISE_LOCK (promise));
241   if (GST_PROMISE_RESULT (promise) != GST_PROMISE_RESULT_PENDING &&
242       GST_PROMISE_RESULT (promise) != GST_PROMISE_RESULT_REPLIED) {
243     GstPromiseResult result = GST_PROMISE_RESULT (promise);
244     g_mutex_unlock (GST_PROMISE_LOCK (promise));
245     g_return_if_fail (result == GST_PROMISE_RESULT_PENDING ||
246         result == GST_PROMISE_RESULT_REPLIED);
247   }
248   /* only interrupt if we are currently in pending */
249   if (GST_PROMISE_RESULT (promise) == GST_PROMISE_RESULT_PENDING) {
250     GST_PROMISE_RESULT (promise) = GST_PROMISE_RESULT_INTERRUPTED;
251     g_cond_broadcast (GST_PROMISE_COND (promise));
252     GST_LOG ("%p interrupted", promise);
253
254     change_func = GST_PROMISE_CHANGE_FUNC (promise);
255     change_data = GST_PROMISE_CHANGE_DATA (promise);
256   }
257   g_mutex_unlock (GST_PROMISE_LOCK (promise));
258
259   if (change_func)
260     change_func (promise, change_data);
261 }
262
263 /**
264  * gst_promise_expire:
265  * @promise: a #GstPromise
266  *
267  * Expire a @promise.  This will wake up any waiters with
268  * %GST_PROMISE_RESULT_EXPIRED
269  *
270  * Since: 1.14
271  */
272 void
273 gst_promise_expire (GstPromise * promise)
274 {
275   GstPromiseChangeFunc change_func = NULL;
276   gpointer change_data = NULL;
277
278   g_return_if_fail (promise != NULL);
279
280   g_mutex_lock (GST_PROMISE_LOCK (promise));
281   if (GST_PROMISE_RESULT (promise) == GST_PROMISE_RESULT_PENDING) {
282     GST_PROMISE_RESULT (promise) = GST_PROMISE_RESULT_EXPIRED;
283     g_cond_broadcast (GST_PROMISE_COND (promise));
284     GST_LOG ("%p expired", promise);
285
286     change_func = GST_PROMISE_CHANGE_FUNC (promise);
287     change_data = GST_PROMISE_CHANGE_DATA (promise);
288     GST_PROMISE_CHANGE_FUNC (promise) = NULL;
289     GST_PROMISE_CHANGE_DATA (promise) = NULL;
290   }
291   g_mutex_unlock (GST_PROMISE_LOCK (promise));
292
293   if (change_func)
294     change_func (promise, change_data);
295 }
296
297 static void
298 gst_promise_free (GstMiniObject * object)
299 {
300   GstPromise *promise = (GstPromise *) object;
301
302   /* the promise *must* be dealt with in some way before destruction */
303   g_warn_if_fail (GST_PROMISE_RESULT (promise) != GST_PROMISE_RESULT_PENDING);
304
305   if (GST_PROMISE_CHANGE_NOTIFY (promise))
306     GST_PROMISE_CHANGE_NOTIFY (promise) (GST_PROMISE_CHANGE_DATA (promise));
307
308   if (GST_PROMISE_REPLY (promise)) {
309     gst_structure_set_parent_refcount (GST_PROMISE_REPLY (promise), NULL);
310     gst_structure_free (GST_PROMISE_REPLY (promise));
311   }
312   g_mutex_clear (GST_PROMISE_LOCK (promise));
313   g_cond_clear (GST_PROMISE_COND (promise));
314   GST_LOG ("%p finalized", promise);
315
316   g_free (promise);
317 }
318
319 static void
320 gst_promise_init (GstPromise * promise)
321 {
322   static volatile gsize _init = 0;
323
324   if (g_once_init_enter (&_init)) {
325     GST_DEBUG_CATEGORY_INIT (gst_promise_debug, "gstpromise", 0, "gstpromise");
326     g_once_init_leave (&_init, 1);
327   }
328
329   gst_mini_object_init (GST_MINI_OBJECT (promise), 0, GST_TYPE_PROMISE, NULL,
330       NULL, gst_promise_free);
331
332   GST_PROMISE_REPLY (promise) = NULL;
333   GST_PROMISE_RESULT (promise) = GST_PROMISE_RESULT_PENDING;
334   g_mutex_init (GST_PROMISE_LOCK (promise));
335   g_cond_init (GST_PROMISE_COND (promise));
336 }
337
338 /**
339  * gst_promise_new:
340  *
341  * Returns: a new #GstPromise
342  *
343  * Since: 1.14
344  */
345 GstPromise *
346 gst_promise_new (void)
347 {
348   GstPromise *promise = GST_PROMISE (g_new0 (GstPromiseImpl, 1));
349
350   gst_promise_init (promise);
351   GST_LOG ("new promise %p", promise);
352
353   return promise;
354 }
355
356 /**
357  * gst_promise_new_with_change_func:
358  * @func: (scope notified): a #GstPromiseChangeFunc to call
359  * @user_data: (closure): argument to call @func with
360  * @notify: notification function that @user_data is no longer needed
361  *
362  * @func will be called exactly once when transitioning out of
363  * %GST_PROMISE_RESULT_PENDING into any of the other #GstPromiseResult
364  * states.
365  *
366  * Returns: a new #GstPromise
367  *
368  * Since: 1.14
369  */
370 GstPromise *
371 gst_promise_new_with_change_func (GstPromiseChangeFunc func, gpointer user_data,
372     GDestroyNotify notify)
373 {
374   GstPromise *promise = gst_promise_new ();
375
376   GST_PROMISE_CHANGE_FUNC (promise) = func;
377   GST_PROMISE_CHANGE_DATA (promise) = user_data;
378   GST_PROMISE_CHANGE_NOTIFY (promise) = notify;
379
380   return promise;
381 }
382
383 GST_DEFINE_MINI_OBJECT_TYPE (GstPromise, gst_promise);