Move GstAggregator from -bad to core
[platform/upstream/gstreamer.git] / gst / gstpromise.c
1 /* GStreamer
2  * Copyright (C) 2017 Matthew Waters <matthew@centricular.com>
3  *
4  * This library is free software; you can redistribute it and/or
5  * modify it under the terms of the GNU Library General Public
6  * License as published by the Free Software Foundation; either
7  * version 2 of the License, or (at your option) any later version.
8  *
9  * This library is distributed in the hope that it will be useful,
10  * but WITHOUT ANY WARRANTY; without even the implied warranty of
11  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12  * Library General Public License for more details.
13  *
14  * You should have received a copy of the GNU Library General Public
15  * License along with this library; if not, write to the
16  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
17  * Boston, MA 02110-1301, USA.
18  */
19
20 #ifdef HAVE_CONFIG_H
21 # include "config.h"
22 #endif
23
24 #include "gst_private.h"
25
26 #include "gstpromise.h"
27
28 #define GST_CAT_DEFAULT gst_promise_debug
29 GST_DEBUG_CATEGORY_STATIC (GST_CAT_DEFAULT);
30
31 /**
32  * SECTION:gstpromise
33  * @title: GstRespsone
34  * @short_description: a miniobject for future/promise-like functionality
35  * @see_also:
36  *
37  * The #GstPromise object implements the container for values that may
38  * be available later. i.e. a Future or a Promise in
39  * <ulink url="https://en.wikipedia.org/wiki/Futures_and_promises">https://en.wikipedia.org/wiki/Futures_and_promises</ulink>
40  *
41  * A #GstPromise can be created with gst_promise_new(), replied to
42  * with gst_promise_reply(), interrupted with gst_promise_interrupt() and
43  * expired with gst_promise_expire(). A callback can also be installed at
44  * #GstPromise creation for result changes with gst_promise_new_with_change_func().
45  * The change callback can be used to chain #GstPromises's together as in the
46  * following example.
47  * |[<!-- language="C" -->
48  * const GstStructure *reply;
49  * GstPromise *p;
50  * if (gst_promise_wait (promise) != GST_PROMISE_RESULT_REPLIED)
51  *   return; // interrupted or expired value
52  * reply = gst_promise_get_reply (promise);
53  * if (error in reply)
54  *   return; // propagate error
55  * p = gst_promise_new_with_change_func (another_promise_change_func, user_data, notify);
56  * pass p to promise-using API
57  * ]|
58  *
59  * Each #GstPromise starts out with a #GstPromiseResult of
60  * %GST_PROMISE_RESULT_PENDING and only ever transitions out of that result
61  * into one of the other #GstPromiseResult.
62  *
63  * In order to support multi-threaded code, gst_promise_reply(),
64  * gst_promise_interrupt() and gst_promise_expire() may all be from
65  * different threads with some restrictions, the final result of the promise
66  * is whichever call is made first.  There are two restrictions on ordering:
67  *
68  * 1. That gst_promise_reply() and gst_promise_interrupt() cannot be called
69  * after gst_promise_expire()
70  * 2. That gst_promise_reply() and gst_promise_interrupt()
71  * cannot be called twice.
72  */
73
74 static const int immutable_structure_refcount = 2;
75
76 #define GST_PROMISE_REPLY(p)            (((GstPromiseImpl *)(p))->reply)
77 #define GST_PROMISE_RESULT(p)           (((GstPromiseImpl *)(p))->result)
78 #define GST_PROMISE_LOCK(p)             (&(((GstPromiseImpl *)(p))->lock))
79 #define GST_PROMISE_COND(p)             (&(((GstPromiseImpl *)(p))->cond))
80 #define GST_PROMISE_CHANGE_FUNC(p)      (((GstPromiseImpl *)(p))->change_func)
81 #define GST_PROMISE_CHANGE_DATA(p)      (((GstPromiseImpl *)(p))->user_data)
82 #define GST_PROMISE_CHANGE_NOTIFY(p)    (((GstPromiseImpl *)(p))->notify)
83
84 typedef struct
85 {
86   GstPromise promise;
87
88   GstPromiseResult result;
89   GstStructure *reply;
90
91   GMutex lock;
92   GCond cond;
93   GstPromiseChangeFunc change_func;
94   gpointer user_data;
95   GDestroyNotify notify;
96 } GstPromiseImpl;
97
98 /**
99  * gst_promise_wait:
100  * @promise: a #GstPromise
101  *
102  * Wait for @promise to move out of the %GST_PROMISE_RESULT_PENDING state.
103  * If @promise is not in %GST_PROMISE_RESULT_PENDING then it will return
104  * immediately with the current result.
105  *
106  * Returns: the result of the promise
107  */
108 GstPromiseResult
109 gst_promise_wait (GstPromise * promise)
110 {
111   GstPromiseResult ret;
112
113   g_return_val_if_fail (promise != NULL, GST_PROMISE_RESULT_EXPIRED);
114
115   g_mutex_lock (GST_PROMISE_LOCK (promise));
116   ret = GST_PROMISE_RESULT (promise);
117
118   while (ret == GST_PROMISE_RESULT_PENDING) {
119     GST_LOG ("%p waiting", promise);
120     g_cond_wait (GST_PROMISE_COND (promise), GST_PROMISE_LOCK (promise));
121     ret = GST_PROMISE_RESULT (promise);
122   }
123   GST_LOG ("%p waited", promise);
124
125   g_mutex_unlock (GST_PROMISE_LOCK (promise));
126
127   return ret;
128 }
129
130 /**
131  * gst_promise_reply:
132  * @promise: (allow-none): a #GstPromise
133  * @s: (transfer full): a #GstStructure with the the reply contents
134  *
135  * Set a reply on @promise.  This will wake up any waiters with
136  * %GST_PROMISE_RESULT_REPLIED.
137  */
138 void
139 gst_promise_reply (GstPromise * promise, GstStructure * s)
140 {
141   GstPromiseChangeFunc change_func = NULL;
142   gpointer change_data = NULL;
143
144   /* Caller requested that no reply is necessary */
145   if (promise == NULL)
146     return;
147
148   g_mutex_lock (GST_PROMISE_LOCK (promise));
149   if (GST_PROMISE_RESULT (promise) != GST_PROMISE_RESULT_PENDING &&
150       GST_PROMISE_RESULT (promise) != GST_PROMISE_RESULT_INTERRUPTED) {
151     GstPromiseResult result = GST_PROMISE_RESULT (promise);
152     g_mutex_unlock (GST_PROMISE_LOCK (promise));
153     g_return_if_fail (result == GST_PROMISE_RESULT_PENDING ||
154         result == GST_PROMISE_RESULT_INTERRUPTED);
155   }
156
157   /* XXX: is this necessary and valid? */
158   if (GST_PROMISE_REPLY (promise) && GST_PROMISE_REPLY (promise) != s)
159     gst_structure_free (GST_PROMISE_REPLY (promise));
160
161   /* Only reply iff we are currently in pending */
162   if (GST_PROMISE_RESULT (promise) == GST_PROMISE_RESULT_PENDING) {
163     if (s
164         && !gst_structure_set_parent_refcount (s,
165             (int *) &immutable_structure_refcount)) {
166       g_critical ("Input structure has a parent already!");
167       g_mutex_unlock (GST_PROMISE_LOCK (promise));
168       return;
169     }
170
171     GST_PROMISE_RESULT (promise) = GST_PROMISE_RESULT_REPLIED;
172     GST_LOG ("%p replied", promise);
173
174     GST_PROMISE_REPLY (promise) = s;
175
176     change_func = GST_PROMISE_CHANGE_FUNC (promise);
177     change_data = GST_PROMISE_CHANGE_DATA (promise);
178   } else {
179     /* eat the value */
180     if (s)
181       gst_structure_free (s);
182   }
183
184   g_cond_broadcast (GST_PROMISE_COND (promise));
185   g_mutex_unlock (GST_PROMISE_LOCK (promise));
186
187   if (change_func)
188     change_func (promise, change_data);
189 }
190
191 /**
192  * gst_promise_get_reply:
193  * @promise: a #GstPromise
194  *
195  * Retrieve the reply set on @promise.  @promise must be in
196  * %GST_PROMISE_RESULT_REPLIED and is owned by @promise
197  *
198  * Returns: (transfer none): The reply set on @promise
199  */
200 const GstStructure *
201 gst_promise_get_reply (GstPromise * promise)
202 {
203   g_return_val_if_fail (promise != NULL, NULL);
204
205   g_mutex_lock (GST_PROMISE_LOCK (promise));
206   if (GST_PROMISE_RESULT (promise) != GST_PROMISE_RESULT_REPLIED) {
207     GstPromiseResult result = GST_PROMISE_RESULT (promise);
208     g_mutex_unlock (GST_PROMISE_LOCK (promise));
209     g_return_val_if_fail (result == GST_PROMISE_RESULT_REPLIED, NULL);
210   }
211
212   g_mutex_unlock (GST_PROMISE_LOCK (promise));
213
214   return GST_PROMISE_REPLY (promise);
215 }
216
217 /**
218  * gst_promise_interrupt:
219  * @promise: a #GstPromise
220  *
221  * Interrupt waiting for a @promise.  This will wake up any waiters with
222  * %GST_PROMISE_RESULT_INTERRUPTED
223  */
224 void
225 gst_promise_interrupt (GstPromise * promise)
226 {
227   GstPromiseChangeFunc change_func = NULL;
228   gpointer change_data = NULL;
229
230   g_return_if_fail (promise != NULL);
231
232   g_mutex_lock (GST_PROMISE_LOCK (promise));
233   if (GST_PROMISE_RESULT (promise) != GST_PROMISE_RESULT_PENDING &&
234       GST_PROMISE_RESULT (promise) != GST_PROMISE_RESULT_REPLIED) {
235     GstPromiseResult result = GST_PROMISE_RESULT (promise);
236     g_mutex_unlock (GST_PROMISE_LOCK (promise));
237     g_return_if_fail (result == GST_PROMISE_RESULT_PENDING ||
238         result == GST_PROMISE_RESULT_REPLIED);
239   }
240   /* only interrupt if we are currently in pending */
241   if (GST_PROMISE_RESULT (promise) == GST_PROMISE_RESULT_PENDING) {
242     GST_PROMISE_RESULT (promise) = GST_PROMISE_RESULT_INTERRUPTED;
243     g_cond_broadcast (GST_PROMISE_COND (promise));
244     GST_LOG ("%p interrupted", promise);
245
246     change_func = GST_PROMISE_CHANGE_FUNC (promise);
247     change_data = GST_PROMISE_CHANGE_DATA (promise);
248   }
249   g_mutex_unlock (GST_PROMISE_LOCK (promise));
250
251   if (change_func)
252     change_func (promise, change_data);
253 }
254
255 /**
256  * gst_promise_expire:
257  * @promise: a #GstPromise
258  *
259  * Expire a @promise.  This will wake up any waiters with
260  * %GST_PROMISE_RESULT_EXPIRED
261  */
262 void
263 gst_promise_expire (GstPromise * promise)
264 {
265   GstPromiseChangeFunc change_func = NULL;
266   gpointer change_data = NULL;
267
268   g_return_if_fail (promise != NULL);
269
270   g_mutex_lock (GST_PROMISE_LOCK (promise));
271   if (GST_PROMISE_RESULT (promise) == GST_PROMISE_RESULT_PENDING) {
272     GST_PROMISE_RESULT (promise) = GST_PROMISE_RESULT_EXPIRED;
273     g_cond_broadcast (GST_PROMISE_COND (promise));
274     GST_LOG ("%p expired", promise);
275
276     change_func = GST_PROMISE_CHANGE_FUNC (promise);
277     change_data = GST_PROMISE_CHANGE_DATA (promise);
278     GST_PROMISE_CHANGE_FUNC (promise) = NULL;
279     GST_PROMISE_CHANGE_DATA (promise) = NULL;
280   }
281   g_mutex_unlock (GST_PROMISE_LOCK (promise));
282
283   if (change_func)
284     change_func (promise, change_data);
285 }
286
287 static void
288 gst_promise_free (GstMiniObject * object)
289 {
290   GstPromise *promise = (GstPromise *) object;
291
292   /* the promise *must* be dealt with in some way before destruction */
293   g_warn_if_fail (GST_PROMISE_RESULT (promise) != GST_PROMISE_RESULT_PENDING);
294
295   if (GST_PROMISE_CHANGE_NOTIFY (promise))
296     GST_PROMISE_CHANGE_NOTIFY (promise) (GST_PROMISE_CHANGE_DATA (promise));
297
298   if (GST_PROMISE_REPLY (promise)) {
299     gst_structure_set_parent_refcount (GST_PROMISE_REPLY (promise), NULL);
300     gst_structure_free (GST_PROMISE_REPLY (promise));
301   }
302   g_mutex_clear (GST_PROMISE_LOCK (promise));
303   g_cond_clear (GST_PROMISE_COND (promise));
304   GST_LOG ("%p finalized", promise);
305
306   g_free (promise);
307 }
308
309 static void
310 gst_promise_init (GstPromise * promise)
311 {
312   static volatile gsize _init = 0;
313
314   if (g_once_init_enter (&_init)) {
315     GST_DEBUG_CATEGORY_INIT (gst_promise_debug, "gstpromise", 0, "gstpromise");
316     g_once_init_leave (&_init, 1);
317   }
318
319   gst_mini_object_init (GST_MINI_OBJECT (promise), 0, GST_TYPE_PROMISE, NULL,
320       NULL, gst_promise_free);
321
322   GST_PROMISE_REPLY (promise) = NULL;
323   GST_PROMISE_RESULT (promise) = GST_PROMISE_RESULT_PENDING;
324   g_mutex_init (GST_PROMISE_LOCK (promise));
325   g_cond_init (GST_PROMISE_COND (promise));
326 }
327
328 /**
329  * gst_promise_new:
330  *
331  * Returns: a new #GstPromise
332  */
333 GstPromise *
334 gst_promise_new (void)
335 {
336   GstPromise *promise = GST_PROMISE (g_new0 (GstPromiseImpl, 1));
337
338   gst_promise_init (promise);
339   GST_LOG ("new promise %p", promise);
340
341   return promise;
342 }
343
344 /**
345  * gst_promise_new_with_change_func:
346  * @func: (scope notified): a #GstPromiseChangeFunc to call
347  * @user_data: (closure): argument to call @func with
348  * @notify: notification function that @user_data is no longer needed
349  *
350  * @func will be called exactly once when transitioning out of
351  * %GST_PROMISE_RESULT_PENDING into any of the other #GstPromiseResult
352  * states.
353  *
354  * Returns: a new #GstPromise
355  */
356 GstPromise *
357 gst_promise_new_with_change_func (GstPromiseChangeFunc func, gpointer user_data,
358     GDestroyNotify notify)
359 {
360   GstPromise *promise = gst_promise_new ();
361
362   GST_PROMISE_CHANGE_FUNC (promise) = func;
363   GST_PROMISE_CHANGE_DATA (promise) = user_data;
364   GST_PROMISE_CHANGE_NOTIFY (promise) = notify;
365
366   return promise;
367 }
368
369 GST_DEFINE_MINI_OBJECT_TYPE (GstPromise, gst_promise);