af06f24ef782654db56de130604f031744b34fbd
[platform/upstream/gstreamer.git] / tests / check / gst / gstpromise.c
1 /* GStreamer
2  *
3  * unit test for GstPromise
4  *
5  * Copyright (C) 2017 Matthew Waters <matthew@centricular.com>
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20  * Boston, MA 02110-1301, USA.
21  */
22
23 #include <gst/check/gstcheck.h>
24
25 struct event_queue
26 {
27   GMutex lock;
28   GCond cond;
29   GThread *thread;
30   GMainContext *main_context;
31   GMainLoop *main_loop;
32   gpointer user_data;
33 };
34
35 static gboolean
36 _unlock_thread (GMutex * lock)
37 {
38   g_mutex_unlock (lock);
39   return G_SOURCE_REMOVE;
40 }
41
42 static gpointer
43 _promise_thread (struct event_queue *q)
44 {
45   g_mutex_lock (&q->lock);
46   q->main_context = g_main_context_new ();
47   q->main_loop = g_main_loop_new (q->main_context, FALSE);
48
49   g_cond_broadcast (&q->cond);
50   g_main_context_invoke (q->main_context, (GSourceFunc) _unlock_thread,
51       &q->lock);
52
53   g_main_loop_run (q->main_loop);
54
55   g_mutex_lock (&q->lock);
56   g_main_context_unref (q->main_context);
57   q->main_context = NULL;
58   g_main_loop_unref (q->main_loop);
59   q->main_loop = NULL;
60   g_cond_broadcast (&q->cond);
61   g_mutex_unlock (&q->lock);
62
63   return NULL;
64 }
65
66 static void
67 event_queue_start (struct event_queue *q)
68 {
69   g_mutex_lock (&q->lock);
70   q->thread = g_thread_new ("promise-thread", (GThreadFunc) _promise_thread, q);
71
72   while (!q->main_loop)
73     g_cond_wait (&q->cond, &q->lock);
74   g_mutex_unlock (&q->lock);
75 }
76
77 static void
78 event_queue_stop (struct event_queue *q)
79 {
80   g_mutex_lock (&q->lock);
81   if (q->main_loop)
82     g_main_loop_quit (q->main_loop);
83   g_mutex_unlock (&q->lock);
84 }
85
86 static void
87 event_queue_stop_wait (struct event_queue *q)
88 {
89   g_mutex_lock (&q->lock);
90   while (q->main_loop) {
91     g_main_loop_quit (q->main_loop);
92     g_cond_wait (&q->cond, &q->lock);
93   }
94   g_mutex_unlock (&q->lock);
95
96   g_thread_unref (q->thread);
97 }
98
99 static struct event_queue *
100 event_queue_new (void)
101 {
102   struct event_queue *q = g_new0 (struct event_queue, 1);
103
104   GST_LOG ("starting event queue %p", q);
105
106   g_mutex_init (&q->lock);
107   g_cond_init (&q->cond);
108   event_queue_start (q);
109
110   return q;
111 }
112
113 static void
114 event_queue_free (struct event_queue *q)
115 {
116   event_queue_stop_wait (q);
117
118   g_mutex_clear (&q->lock);
119   g_cond_clear (&q->cond);
120
121   GST_LOG ("stopped event queue %p", q);
122
123   g_free (q);
124 }
125
126 static void
127 _enqueue_task (struct event_queue *q, GSourceFunc func, gpointer data,
128     GDestroyNotify notify)
129 {
130   GSource *source;
131
132   source = g_idle_source_new ();
133   g_source_set_priority (source, G_PRIORITY_DEFAULT);
134   g_source_set_callback (source, (GSourceFunc) func, data, notify);
135   g_source_attach (source, q->main_context);
136   g_source_unref (source);
137 }
138
139 GST_START_TEST (test_reply)
140 {
141   GstPromise *r;
142
143   r = gst_promise_new ();
144
145   gst_promise_reply (r, NULL);
146   fail_unless (gst_promise_wait (r) == GST_PROMISE_RESULT_REPLIED);
147
148   gst_promise_unref (r);
149 }
150
151 GST_END_TEST;
152
153 GST_START_TEST (test_reply_data)
154 {
155   GstPromise *r;
156   GstStructure *s;
157   const GstStructure *ret;
158
159   r = gst_promise_new ();
160
161   s = gst_structure_new ("promise", "test", G_TYPE_INT, 1, NULL);
162   gst_promise_reply (r, s);
163   fail_unless (gst_promise_wait (r) == GST_PROMISE_RESULT_REPLIED);
164   ret = gst_promise_get_reply (r);
165   fail_unless (gst_structure_is_equal (ret, s));
166
167   gst_promise_unref (r);
168 }
169
170 GST_END_TEST;
171
172 GST_START_TEST (test_reply_immutable)
173 {
174   GstPromise *r;
175   GstStructure *s, *ret;
176
177   r = gst_promise_new ();
178
179   s = gst_structure_new ("promise", "test", G_TYPE_INT, 1, NULL);
180   gst_promise_reply (r, s);
181   ret = (GstStructure *) gst_promise_get_reply (r);
182
183   /* immutable result must not be able to modify the reply */
184   ASSERT_CRITICAL (gst_structure_set (ret, "foo", G_TYPE_STRING, "bar", NULL));
185   fail_unless (gst_structure_get_string (ret, "foo") == NULL);
186
187   gst_promise_unref (r);
188 }
189
190 GST_END_TEST;
191
192 GST_START_TEST (test_interrupt)
193 {
194   GstPromise *r;
195
196   r = gst_promise_new ();
197
198   gst_promise_interrupt (r);
199   fail_unless (gst_promise_wait (r) == GST_PROMISE_RESULT_INTERRUPTED);
200
201   gst_promise_unref (r);
202 }
203
204 GST_END_TEST;
205
206 GST_START_TEST (test_expire)
207 {
208   GstPromise *r;
209
210   r = gst_promise_new ();
211
212   gst_promise_expire (r);
213   fail_unless (gst_promise_wait (r) == GST_PROMISE_RESULT_EXPIRED);
214
215   gst_promise_unref (r);
216 }
217
218 GST_END_TEST;
219
220 struct change_data
221 {
222   int change_count;
223   GstPromiseResult result;
224 };
225
226 static void
227 on_change (GstPromise * promise, gpointer user_data)
228 {
229   struct change_data *res = user_data;
230
231   res->result = gst_promise_wait (promise);
232   res->change_count += 1;
233 }
234
235 GST_START_TEST (test_change_func)
236 {
237   GstPromise *r;
238   struct change_data data = { 0, };
239
240   r = gst_promise_new_with_change_func (on_change, &data, NULL);
241   gst_promise_reply (r, NULL);
242   fail_unless (data.result == GST_PROMISE_RESULT_REPLIED);
243   fail_unless (data.change_count == 1);
244
245   gst_promise_unref (r);
246 }
247
248 GST_END_TEST;
249
250 GST_START_TEST (test_reply_expire)
251 {
252   GstPromise *r;
253   struct change_data data = { 0, };
254
255   r = gst_promise_new_with_change_func (on_change, &data, NULL);
256   gst_promise_reply (r, NULL);
257   fail_unless (data.result == GST_PROMISE_RESULT_REPLIED);
258   fail_unless (data.change_count == 1);
259   gst_promise_expire (r);
260   fail_unless (data.result == GST_PROMISE_RESULT_REPLIED);
261   fail_unless (data.change_count == 1);
262
263   gst_promise_unref (r);
264 }
265
266 GST_END_TEST;
267
268 GST_START_TEST (test_reply_discard)
269 {
270   GstPromise *r;
271
272   /* NULL promise => discard reply */
273   r = NULL;
274
275   /* no-op, we don't want a reply */
276   gst_promise_reply (r, NULL);
277
278   if (r)
279     gst_promise_unref (r);
280 }
281
282 GST_END_TEST;
283
284 GST_START_TEST (test_reply_interrupt)
285 {
286   GstPromise *r;
287   struct change_data data = { 0, };
288
289   r = gst_promise_new_with_change_func (on_change, &data, NULL);
290   gst_promise_reply (r, NULL);
291   fail_unless (data.result == GST_PROMISE_RESULT_REPLIED);
292   fail_unless (data.change_count == 1);
293   gst_promise_interrupt (r);
294   fail_unless (data.result == GST_PROMISE_RESULT_REPLIED);
295   fail_unless (data.change_count == 1);
296
297   gst_promise_unref (r);
298 }
299
300 GST_END_TEST;
301
302 GST_START_TEST (test_reply_reply)
303 {
304   GstPromise *r;
305   GstStructure *s;
306   struct change_data data = { 0, };
307   const GstStructure *ret;
308
309   r = gst_promise_new_with_change_func (on_change, &data, NULL);
310   s = gst_structure_new ("promise", "test", G_TYPE_INT, 1, NULL);
311   gst_promise_reply (r, s);
312   fail_unless (data.result == GST_PROMISE_RESULT_REPLIED);
313   fail_unless (data.change_count == 1);
314   ASSERT_CRITICAL (gst_promise_reply (r, NULL));
315   fail_unless (gst_promise_wait (r) == GST_PROMISE_RESULT_REPLIED);
316   ret = gst_promise_get_reply (r);
317   fail_unless (gst_structure_is_equal (ret, s));
318   fail_unless (data.result == GST_PROMISE_RESULT_REPLIED);
319   fail_unless (data.change_count == 1);
320
321   gst_promise_unref (r);
322 }
323
324 GST_END_TEST;
325
326 GST_START_TEST (test_interrupt_expire)
327 {
328   GstPromise *r;
329   struct change_data data = { 0, };
330
331   r = gst_promise_new_with_change_func (on_change, &data, NULL);
332   gst_promise_interrupt (r);
333   fail_unless (data.result == GST_PROMISE_RESULT_INTERRUPTED);
334   fail_unless (data.change_count == 1);
335   gst_promise_expire (r);
336   fail_unless (data.result == GST_PROMISE_RESULT_INTERRUPTED);
337   fail_unless (data.change_count == 1);
338
339   gst_promise_unref (r);
340 }
341
342 GST_END_TEST;
343
344 GST_START_TEST (test_interrupt_reply)
345 {
346   GstPromise *r;
347   struct change_data data = { 0, };
348
349   r = gst_promise_new_with_change_func (on_change, &data, NULL);
350   gst_promise_interrupt (r);
351   fail_unless (data.result == GST_PROMISE_RESULT_INTERRUPTED);
352   fail_unless (data.change_count == 1);
353   gst_promise_reply (r, NULL);
354   fail_unless (data.result == GST_PROMISE_RESULT_INTERRUPTED);
355   fail_unless (data.change_count == 1);
356
357   gst_promise_unref (r);
358 }
359
360 GST_END_TEST;
361
362 GST_START_TEST (test_interrupt_interrupt)
363 {
364   GstPromise *r;
365   struct change_data data = { 0, };
366
367   r = gst_promise_new_with_change_func (on_change, &data, NULL);
368   gst_promise_interrupt (r);
369   fail_unless (data.result == GST_PROMISE_RESULT_INTERRUPTED);
370   fail_unless (data.change_count == 1);
371   ASSERT_CRITICAL (gst_promise_interrupt (r));
372   fail_unless (data.result == GST_PROMISE_RESULT_INTERRUPTED);
373   fail_unless (data.change_count == 1);
374
375   gst_promise_unref (r);
376 }
377
378 GST_END_TEST;
379
380 GST_START_TEST (test_expire_expire)
381 {
382   GstPromise *r;
383   struct change_data data = { 0, };
384
385   r = gst_promise_new_with_change_func (on_change, &data, NULL);
386   gst_promise_expire (r);
387   fail_unless (data.result == GST_PROMISE_RESULT_EXPIRED);
388   fail_unless (data.change_count == 1);
389   gst_promise_expire (r);
390   fail_unless (data.result == GST_PROMISE_RESULT_EXPIRED);
391   fail_unless (data.change_count == 1);
392
393   gst_promise_unref (r);
394 }
395
396 GST_END_TEST;
397
398 GST_START_TEST (test_expire_interrupt)
399 {
400   GstPromise *r;
401   struct change_data data = { 0, };
402
403   r = gst_promise_new_with_change_func (on_change, &data, NULL);
404   gst_promise_expire (r);
405   fail_unless (data.result == GST_PROMISE_RESULT_EXPIRED);
406   fail_unless (data.change_count == 1);
407   ASSERT_CRITICAL (gst_promise_interrupt (r));
408   fail_unless (data.result == GST_PROMISE_RESULT_EXPIRED);
409   fail_unless (data.change_count == 1);
410
411   gst_promise_unref (r);
412 }
413
414 GST_END_TEST;
415
416 GST_START_TEST (test_expire_reply)
417 {
418   GstPromise *r;
419   struct change_data data = { 0, };
420
421   r = gst_promise_new_with_change_func (on_change, &data, NULL);
422   gst_promise_expire (r);
423   fail_unless (data.result == GST_PROMISE_RESULT_EXPIRED);
424   fail_unless (data.change_count == 1);
425   ASSERT_CRITICAL (gst_promise_reply (r, NULL));
426   fail_unless (data.result == GST_PROMISE_RESULT_EXPIRED);
427   fail_unless (data.change_count == 1);
428
429   gst_promise_unref (r);
430 }
431
432 GST_END_TEST;
433
434 struct stress_item
435 {
436   struct event_queue *q;
437   GstPromise *promise;
438   GstPromiseResult result;
439 };
440
441 static void
442 stress_reply (struct stress_item *item)
443 {
444   switch (item->result) {
445     case GST_PROMISE_RESULT_REPLIED:
446       gst_promise_reply (item->promise, NULL);
447       break;
448     case GST_PROMISE_RESULT_INTERRUPTED:
449       gst_promise_interrupt (item->promise);
450       break;
451     case GST_PROMISE_RESULT_EXPIRED:
452       gst_promise_expire (item->promise);
453       break;
454     default:
455       g_assert_not_reached ();
456   }
457 }
458
459 struct stress_queues
460 {
461   GAsyncQueue *push_queue;
462   GAsyncQueue *wait_queue;
463   guint64 push_count;
464 };
465
466 static gboolean
467 _push_random_promise (struct event_queue *q)
468 {
469   struct stress_queues *s_q = q->user_data;
470   struct stress_item *item;
471
472   item = g_new0 (struct stress_item, 1);
473   item->promise = gst_promise_new ();
474   while (item->result == GST_PROMISE_RESULT_PENDING)
475     item->result = g_random_int () % 4;
476
477   g_async_queue_push (s_q->wait_queue, item);
478   g_async_queue_push (s_q->push_queue, item);
479
480   s_q->push_count++;
481
482   return G_SOURCE_CONTINUE;
483 }
484
485 static void
486 _push_stop_promise (struct event_queue *q)
487 {
488   struct stress_queues *s_q = q->user_data;
489   gpointer item = GINT_TO_POINTER (1);
490
491   g_async_queue_push (s_q->wait_queue, item);
492   g_async_queue_push (s_q->push_queue, item);
493 }
494
495 static gboolean
496 _pop_promise (struct event_queue *q)
497 {
498   struct stress_queues *s_q = q->user_data;
499   struct stress_item *item;
500
501   item = g_async_queue_pop (s_q->push_queue);
502
503   if (item == (void *) 1)
504     return G_SOURCE_REMOVE;
505
506   stress_reply (item);
507
508   return G_SOURCE_CONTINUE;
509 }
510
511 static gboolean
512 _wait_promise (struct event_queue *q)
513 {
514   struct stress_queues *s_q = q->user_data;
515   struct stress_item *item;
516
517   item = g_async_queue_pop (s_q->wait_queue);
518
519   if (item == (void *) 1)
520     return G_SOURCE_REMOVE;
521
522   fail_unless (gst_promise_wait (item->promise) == item->result);
523
524   gst_promise_unref (item->promise);
525   g_free (item);
526
527   return G_SOURCE_CONTINUE;
528 }
529
530 GST_START_TEST (test_stress)
531 {
532 #define N_QUEUES 3
533   struct event_queue *pushers[N_QUEUES];
534   struct event_queue *poppers[N_QUEUES];
535   struct event_queue *waiters[N_QUEUES];
536   struct stress_queues s_q = { 0, };
537   int i;
538
539   s_q.push_queue = g_async_queue_new ();
540   s_q.wait_queue = g_async_queue_new ();
541
542   for (i = 0; i < N_QUEUES; i++) {
543     pushers[i] = event_queue_new ();
544     pushers[i]->user_data = &s_q;
545     _enqueue_task (pushers[i], (GSourceFunc) _push_random_promise, pushers[i],
546         NULL);
547     waiters[i] = event_queue_new ();
548     waiters[i]->user_data = &s_q;
549     _enqueue_task (waiters[i], (GSourceFunc) _wait_promise, waiters[i], NULL);
550     poppers[i] = event_queue_new ();
551     poppers[i]->user_data = &s_q;
552     _enqueue_task (poppers[i], (GSourceFunc) _pop_promise, poppers[i], NULL);
553   }
554
555   GST_INFO ("all set up, waiting.");
556   g_usleep (100000);
557   GST_INFO ("wait done, cleaning up the test.");
558
559   {
560     struct stress_item *item;
561     int push_size;
562
563     for (i = 0; i < N_QUEUES; i++) {
564       event_queue_stop (pushers[i]);
565       event_queue_stop (poppers[i]);
566       event_queue_stop (waiters[i]);
567       _push_stop_promise (pushers[i]);
568     }
569
570     for (i = 0; i < N_QUEUES; i++) {
571       event_queue_free (pushers[i]);
572       event_queue_free (poppers[i]);
573     }
574
575     push_size = g_async_queue_length (s_q.push_queue);
576
577     /* push through all the promises so all the waits will complete */
578     while ((item = g_async_queue_try_pop (s_q.push_queue))) {
579       if (item == (void *) 1)
580         continue;
581       stress_reply (item);
582     }
583
584     for (i = 0; i < N_QUEUES; i++)
585       event_queue_free (waiters[i]);
586
587     GST_INFO ("pushed %" G_GUINT64_FORMAT ", %d leftover in push queue, "
588         "%d leftover in wait queue", s_q.push_count, push_size,
589         g_async_queue_length (s_q.wait_queue));
590
591     while ((item = g_async_queue_try_pop (s_q.wait_queue))) {
592       if (item == (void *) 1)
593         continue;
594
595       fail_unless (gst_promise_wait (item->promise) == item->result);
596
597       gst_promise_unref (item->promise);
598       g_free (item);
599     }
600   }
601
602   g_async_queue_unref (s_q.push_queue);
603   g_async_queue_unref (s_q.wait_queue);
604 }
605
606 GST_END_TEST;
607
608 static Suite *
609 gst_promise_suite (void)
610 {
611   Suite *s = suite_create ("GstPromise");
612   TCase *tc_chain = tcase_create ("general");
613
614   suite_add_tcase (s, tc_chain);
615   tcase_add_test (tc_chain, test_reply);
616   tcase_add_test (tc_chain, test_reply_data);
617   tcase_add_test (tc_chain, test_reply_immutable);
618   tcase_add_test (tc_chain, test_interrupt);
619   tcase_add_test (tc_chain, test_expire);
620   tcase_add_test (tc_chain, test_change_func);
621   tcase_add_test (tc_chain, test_reply_expire);
622   tcase_add_test (tc_chain, test_reply_discard);
623   tcase_add_test (tc_chain, test_reply_interrupt);
624   tcase_add_test (tc_chain, test_reply_reply);
625   tcase_add_test (tc_chain, test_interrupt_reply);
626   tcase_add_test (tc_chain, test_interrupt_expire);
627   tcase_add_test (tc_chain, test_interrupt_interrupt);
628   tcase_add_test (tc_chain, test_expire_expire);
629   tcase_add_test (tc_chain, test_expire_interrupt);
630   tcase_add_test (tc_chain, test_expire_reply);
631   tcase_add_test (tc_chain, test_stress);
632
633   return s;
634 }
635
636 GST_CHECK_MAIN (gst_promise);