tests/check: add automatic unit test suite for the ipcpipeline elements
[platform/upstream/gstreamer.git] / tests / check / pipelines / ipcpipeline.c
1 /* GStreamer
2  *
3  * tests for the ipcpipelinesrc/ipcpipelinesink elements
4  *
5  * Copyright (C) 2015-2017 YouView TV Ltd
6  *   Author: Vincent Penquerc'h <vincent.penquerch@collabora.co.uk>
7  *   Author: George Kiagiadakis <george.kiagiadakis@collabora.com>
8  *
9  * This library is free software; you can redistribute it and/or
10  * modify it under the terms of the GNU Library General Public
11  * License as published by the Free Software Foundation; either
12  * version 2 of the License, or (at your option) any later version.
13  *
14  * This library is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17  * Library General Public License for more details.
18  *
19  * You should have received a copy of the GNU Library General Public
20  * License along with this library; if not, write to the
21  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
22  * Boston, MA 02110-1301, USA.
23  */
24
25 #define _GNU_SOURCE             /* See feature_test_macros(7) */
26 #include <unistd.h>
27 #include <stdio.h>
28 #include <fcntl.h>
29 #include <sys/wait.h>
30 #include <sys/file.h>
31 #include <sys/types.h>
32 #include <sys/socket.h>
33 #include <gst/check/gstcheck.h>
34 #include <string.h>
35
36 /* This enum contains flags that are used to configure the setup that
37  * test_base() will do internally */
38 typedef enum
39 {
40   /* Features related to the multi-process setup */
41   TEST_FEATURE_SPLIT_SINKS = 0x1,       /* separate audio and video sink processes */
42   TEST_FEATURE_RECOVERY_SLAVE_PROCESS = 0x2,
43   TEST_FEATURE_RECOVERY_MASTER_PROCESS = 0x4,
44
45   TEST_FEATURE_HAS_VIDEO = 0x10,
46   TEST_FEATURE_LIVE = 0x20,     /* sets is-live=true in {audio,video}testsrc */
47   TEST_FEATURE_ASYNC_SINK = 0x40,       /* sets sync=false in fakesink */
48   TEST_FEATURE_ERROR_SINK = 0x80,       /* generates error message in the slave */
49   TEST_FEATURE_LONG_DURATION = 0x100,   /* bigger num-buffers in {audio,video}testsrc */
50   TEST_FEATURE_FILTER_SINK_CAPS = 0x200,        /* plugs capsfilter before fakesink */
51
52   /* Source selection; Use only one of those, do not combine! */
53   TEST_FEATURE_TEST_SOURCE = 0x400,
54   TEST_FEATURE_WAV_SOURCE = 0x800,
55   TEST_FEATURE_MPEGTS_SOURCE = 0x1000 | TEST_FEATURE_HAS_VIDEO,
56   TEST_FEATURE_LIVE_A_SOURCE =
57       TEST_FEATURE_TEST_SOURCE | TEST_FEATURE_LIVE | TEST_FEATURE_ASYNC_SINK,
58   TEST_FEATURE_LIVE_AV_SOURCE =
59       TEST_FEATURE_LIVE_A_SOURCE | TEST_FEATURE_HAS_VIDEO,
60 } TestFeatures;
61
62 /* This is the data structure that each function of the each test receives
63  * in user_data. It contains pointers to stack-allocated, test-specific
64  * structures that contain the test parameters (input data), the runtime
65  * data of the master (source) process (master data) and the runtime data
66  * of the slave (sink) process (slave data) */
67 typedef struct
68 {
69   gpointer id;                  /* input data struct */
70   gpointer md;                  /* master data struct */
71   gpointer sd;                  /* slave data struct */
72
73   TestFeatures features;        /* the features that this test is running with */
74
75   /* whether there is both an audio and a video stream
76    * in this process'es pipeline */
77   gboolean two_streams;
78
79   /* the pipeline of this process; could be either master or slave */
80   GstElement *p;
81
82   /* this callback will be called in the master process when
83    * the master gets STATE_CHANGED with the new state being state_target */
84   void (*state_changed_cb) (gpointer);
85   GstState state_target;
86
87 } test_data;
88
89 /* All pipelines do not start buffers at exactly zero, so we consider
90    timestamps within a small tolerance to be zero */
91 #define CLOSE_ENOUGH_TO_ZERO (GST_SECOND / 5)
92
93 /* milliseconds */
94 #define STEP_AT  100
95 #define PAUSE_AT 500
96 #define SEEK_AT  700
97 #define QUERY_AT 600
98 #define MESSAGE_AT 600
99 #define CRASH_AT 600
100 #define STOP_AT  600
101
102 /* Rough duration of the sample files we use */
103 #define MPEGTS_SAMPLE_ROUGH_DURATION (GST_SECOND * 64 / 10)
104 #define WAV_SAMPLE_ROUGH_DURATION (GST_SECOND * 65 / 10)
105
106 enum
107 {
108   MSG_ACK = 0,
109   MSG_START = 1
110 };
111
112 static GMainLoop *loop;
113 static gboolean child_dead;
114 static int pipesfa[2], pipesba[2], pipesfv[2], pipesbv[2];
115 static int ctlsock[2];
116 static int recovery_pid = 0;
117 static int check_fd = -1;
118 static GList *weak_refs = NULL;
119
120 /* lock helpers */
121
122 #define FAIL_IF(x) do { lock_check (); fail_if(x); unlock_check (); } while(0)
123 #define FAIL_UNLESS(x) do { lock_check (); fail_unless(x); unlock_check (); } while(0)
124 #define FAIL_UNLESS_EQUALS_INT(x,y) do { lock_check (); fail_unless_equals_int(x,y); unlock_check (); } while(0)
125 #define FAIL() do { lock_check (); fail(); unlock_check (); } while(0)
126
127 static void
128 lock_check (void)
129 {
130   flock (check_fd, LOCK_EX);
131 }
132
133 static void
134 unlock_check (void)
135 {
136   flock (check_fd, LOCK_UN);
137 }
138
139 static void
140 setup_lock (void)
141 {
142   gchar *name = NULL;
143   check_fd = g_file_open_tmp (NULL, &name, NULL);
144   unlink (name);
145   g_free (name);
146 }
147
148 /* tracking for ipcpipeline elements; this is used mainly to detect leaks,
149  * but also to provide a method for calling "disconnect" on all of them
150  * in the tests that require it */
151
152 static void
153 remove_weak_ref (GstElement * element)
154 {
155   weak_refs = g_list_remove (weak_refs, element);
156 }
157
158 static void
159 add_weak_ref (GstElement * element)
160 {
161   weak_refs = g_list_append (weak_refs, element);
162   g_object_weak_ref (G_OBJECT (element), (GWeakNotify) remove_weak_ref,
163       element);
164 }
165
166 static void
167 disconnect_ipcpipeline_elements (void)
168 {
169   GList *l;
170
171   for (l = weak_refs; l; l = l->next) {
172     g_signal_emit_by_name (G_OBJECT (l->data), "disconnect", NULL);
173   }
174 }
175
176 /* helper functions */
177
178 static void
179 cleanup_bus (GstElement * pipeline)
180 {
181   gst_bus_remove_watch (GST_ELEMENT_BUS (pipeline));
182   gst_bus_set_flushing (GST_ELEMENT_BUS (pipeline), TRUE);
183 }
184
185 static void
186 setup_log (const char *logfile, int append)
187 {
188   FILE *f;
189
190   f = fopen (logfile, append ? "a+" : "w");
191   gst_debug_add_log_function (gst_debug_log_default, f, NULL);
192 }
193
194 static GstElement *
195 create_pipeline (const char *type)
196 {
197   GstElement *pipeline;
198
199   pipeline = gst_element_factory_make (type, NULL);
200   FAIL_UNLESS (pipeline);
201
202   return pipeline;
203 }
204
205 static GQuark
206 to_be_removed_quark (void)
207 {
208   static GQuark q = 0;
209   if (!q)
210     q = g_quark_from_static_string ("to_be_removed");
211   return q;
212 }
213
214 static gboolean
215 are_caps_audio (const GstCaps * caps)
216 {
217   GstStructure *structure;
218   const char *name;
219
220   structure = gst_caps_get_structure (caps, 0);
221   name = gst_structure_get_name (structure);
222   return g_str_has_prefix (name, "audio/");
223 }
224
225 static gboolean
226 are_caps_video (const GstCaps * caps)
227 {
228   GstStructure *structure;
229   const char *name;
230
231   structure = gst_caps_get_structure (caps, 0);
232   name = gst_structure_get_name (structure);
233   return (g_str_has_prefix (name, "video/")
234       && strcmp (name, "video/x-dvd-subpicture"));
235 }
236
237 static int
238 caps2idx (GstCaps * caps, gboolean two_streams)
239 {
240   int idx;
241
242   if (!two_streams)
243     return 0;
244
245   if (are_caps_audio (caps)) {
246     idx = 0;
247   } else if (are_caps_video (caps)) {
248     idx = 1;
249   } else {
250     FAIL_IF (1);
251     idx = 0;
252   }
253   return idx;
254 }
255
256 static int
257 pad2idx (GstPad * pad, gboolean two_streams)
258 {
259   GstCaps *caps;
260   int idx;
261
262   if (!two_streams)
263     return 0;
264
265   caps = gst_pad_get_current_caps (pad);
266   if (!caps)
267     caps = gst_pad_get_pad_template_caps (pad);
268   FAIL_UNLESS (caps);
269
270   idx = caps2idx (caps, two_streams);
271
272   gst_caps_unref (caps);
273   return idx;
274 }
275
276 static gboolean
277 stop_pipeline (gpointer user_data)
278 {
279   GstElement *pipeline = user_data;
280   GstStateChangeReturn ret;
281
282   ret = gst_element_set_state (pipeline, GST_STATE_NULL);
283   FAIL_IF (ret == GST_STATE_CHANGE_FAILURE);
284   gst_object_unref (pipeline);
285   g_main_loop_quit (loop);
286   return FALSE;
287 }
288
289 /* the master process'es async GstBus callback */
290 static gboolean
291 master_bus_msg (GstBus * bus, GstMessage * message, gpointer user_data)
292 {
293   test_data *td = user_data;
294
295   switch (GST_MESSAGE_TYPE (message)) {
296     case GST_MESSAGE_ERROR:{
297       GError *err;
298       gchar *dbg;
299
300       /* elements we are removing might error out as they are taken out
301          of the pipeline, and fail to push. We don't care about those. */
302       if (g_object_get_qdata (G_OBJECT (GST_MESSAGE_SRC (message)),
303               to_be_removed_quark ()))
304         break;
305
306       gst_message_parse_error (message, &err, &dbg);
307       g_printerr ("ERROR: %s\n", err->message);
308       if (dbg != NULL)
309         g_printerr ("ERROR debug information: %s\n", dbg);
310       g_error_free (err);
311       g_free (dbg);
312       g_assert_not_reached ();
313       break;
314     }
315     case GST_MESSAGE_WARNING:{
316       GError *err;
317       gchar *dbg;
318
319       gst_message_parse_warning (message, &err, &dbg);
320       g_printerr ("WARNING: %s\n", err->message);
321       if (dbg != NULL)
322         g_printerr ("WARNING debug information: %s\n", dbg);
323       g_error_free (err);
324       g_free (dbg);
325       g_assert_not_reached ();
326       break;
327     }
328     case GST_MESSAGE_EOS:
329       g_main_loop_quit (loop);
330       break;
331     case GST_MESSAGE_STATE_CHANGED:
332       if (GST_MESSAGE_SRC (message) == GST_OBJECT_CAST (td->p)
333           && td->state_changed_cb) {
334         GstState state;
335         gst_message_parse_state_changed (message, NULL, &state, NULL);
336         if (state == td->state_target)
337           td->state_changed_cb (td);
338       }
339       break;
340     default:
341       break;
342   }
343   return TRUE;
344 }
345
346 /* source construction functions */
347
348 static GstElement *
349 create_wavparse_source_loc (const char *loc, int fdina, int fdouta)
350 {
351   GstElement *sbin, *pipeline, *filesrc, *ipcpipelinesink;
352   GError *e = NULL;
353
354   pipeline = create_pipeline ("pipeline");
355   sbin =
356       gst_parse_bin_from_description ("pushfilesrc name=filesrc ! wavparse",
357       TRUE, &e);
358   FAIL_IF (e || !sbin);
359   gst_element_set_name (sbin, "source");
360   filesrc = gst_bin_get_by_name (GST_BIN (sbin), "filesrc");
361   FAIL_UNLESS (filesrc);
362   g_object_set (filesrc, "location", loc, NULL);
363   gst_object_unref (filesrc);
364   ipcpipelinesink =
365       gst_element_factory_make ("ipcpipelinesink", "ipcpipelinesink");
366   add_weak_ref (ipcpipelinesink);
367   g_object_set (ipcpipelinesink, "fdin", fdina, "fdout", fdouta, NULL);
368   gst_bin_add_many (GST_BIN (pipeline), sbin, ipcpipelinesink, NULL);
369   FAIL_UNLESS (gst_element_link_many (sbin, ipcpipelinesink, NULL));
370
371   return pipeline;
372 }
373
374 static void
375 on_pad_added (GstElement * element, GstPad * pad, gpointer data)
376 {
377   GstCaps *caps;
378   GstElement *next;
379   GstBin *pipeline = data;
380   GstPad *sink_pad;
381
382   caps = gst_pad_get_current_caps (pad);
383   if (!caps)
384     caps = gst_pad_get_pad_template_caps (pad);
385
386   if (are_caps_video (caps)) {
387     next = gst_bin_get_by_name (GST_BIN (pipeline), "vqueue");
388   } else if (are_caps_audio (caps)) {
389     next = gst_bin_get_by_name (GST_BIN (pipeline), "aqueue");
390   } else {
391     gst_caps_unref (caps);
392     return;
393   }
394   gst_caps_unref (caps);
395
396   FAIL_UNLESS (next);
397   sink_pad = gst_element_get_static_pad (next, "sink");
398   FAIL_UNLESS (sink_pad);
399   FAIL_UNLESS (gst_pad_link (pad, sink_pad) == GST_PAD_LINK_OK);
400   gst_object_unref (sink_pad);
401
402   gst_object_unref (next);
403 }
404
405 static GstElement *
406 create_mpegts_source_loc (const char *loc, int fdina, int fdouta, int fdinv,
407     int fdoutv)
408 {
409   GstElement *pipeline, *filesrc, *tsdemux, *aqueue, *vqueue, *aipcpipelinesink,
410       *vipcpipelinesink;
411
412   pipeline = create_pipeline ("pipeline");
413   filesrc = gst_element_factory_make ("filesrc", NULL);
414   g_object_set (filesrc, "location", loc, NULL);
415   tsdemux = gst_element_factory_make ("tsdemux", NULL);
416   g_signal_connect (tsdemux, "pad-added", G_CALLBACK (on_pad_added), pipeline);
417   aqueue = gst_element_factory_make ("queue", "aqueue");
418   aipcpipelinesink = gst_element_factory_make ("ipcpipelinesink", NULL);
419   add_weak_ref (aipcpipelinesink);
420   g_object_set (aipcpipelinesink, "fdin", fdina, "fdout", fdouta, NULL);
421   vqueue = gst_element_factory_make ("queue", "vqueue");
422   vipcpipelinesink = gst_element_factory_make ("ipcpipelinesink", NULL);
423   add_weak_ref (vipcpipelinesink);
424   g_object_set (vipcpipelinesink, "fdin", fdinv, "fdout", fdoutv, NULL);
425   gst_bin_add_many (GST_BIN (pipeline), filesrc, tsdemux, aqueue,
426       aipcpipelinesink, vqueue, vipcpipelinesink, NULL);
427   FAIL_UNLESS (gst_element_link_many (filesrc, tsdemux, NULL));
428   FAIL_UNLESS (gst_element_link_many (aqueue, aipcpipelinesink, NULL));
429   FAIL_UNLESS (gst_element_link_many (vqueue, vipcpipelinesink, NULL));
430
431   return pipeline;
432 }
433
434 static GstElement *
435 create_test_source (gboolean live, int fdina, int fdouta, int fdinv, int fdoutv,
436     gboolean audio, gboolean video, gboolean Long)
437 {
438   GstElement *pipeline, *audiotestsrc, *aipcpipelinesink;
439   GstElement *videotestsrc, *vipcpipelinesink;
440   int L = Long ? 2 : 1;
441
442   pipeline = create_pipeline ("pipeline");
443
444   if (audio) {
445     audiotestsrc = gst_element_factory_make ("audiotestsrc", "audiotestsrc");
446     g_object_set (audiotestsrc, "is-live", live, "num-buffers",
447         live ? 270 * L : 600, NULL);
448     aipcpipelinesink = gst_element_factory_make ("ipcpipelinesink",
449         "aipcpipelinesink");
450     add_weak_ref (aipcpipelinesink);
451     g_object_set (aipcpipelinesink, "fdin", fdina, "fdout", fdouta, NULL);
452     gst_bin_add_many (GST_BIN (pipeline), audiotestsrc, aipcpipelinesink, NULL);
453     FAIL_UNLESS (gst_element_link_many (audiotestsrc, aipcpipelinesink, NULL));
454   }
455
456   if (video) {
457     videotestsrc = gst_element_factory_make ("videotestsrc", "videotestsrc");
458     g_object_set (videotestsrc, "is-live", live, "num-buffers",
459         live ? 190 * L : 600, NULL);
460     vipcpipelinesink =
461         gst_element_factory_make ("ipcpipelinesink", "vipcpipelinesink");
462     add_weak_ref (vipcpipelinesink);
463     g_object_set (vipcpipelinesink, "fdin", fdinv, "fdout", fdoutv, NULL);
464     gst_bin_add_many (GST_BIN (pipeline), videotestsrc, vipcpipelinesink, NULL);
465     FAIL_UNLESS (gst_element_link_many (videotestsrc, vipcpipelinesink, NULL));
466   }
467
468   return pipeline;
469 }
470
471 static GstElement *
472 create_source (TestFeatures features, int fdina, int fdouta, int fdinv,
473     int fdoutv, test_data * td)
474 {
475   GstElement *pipeline = NULL;
476   gboolean live = ! !(features & TEST_FEATURE_LIVE);
477   gboolean longdur = ! !(features & TEST_FEATURE_LONG_DURATION);
478   gboolean has_video = ! !(features & TEST_FEATURE_HAS_VIDEO);
479
480   if (features & TEST_FEATURE_TEST_SOURCE) {
481
482     pipeline = create_test_source (live, fdina, fdouta, fdinv, fdoutv, TRUE,
483         has_video, longdur);
484   } else if (features & TEST_FEATURE_WAV_SOURCE) {
485     pipeline = create_wavparse_source_loc ("../../tests/files/sine.wav", fdina,
486         fdouta);
487   } else if (features & TEST_FEATURE_MPEGTS_SOURCE) {
488     pipeline = create_mpegts_source_loc ("../../tests/files/test.ts", fdina,
489         fdouta, fdinv, fdoutv);
490   } else {
491     g_assert_not_reached ();
492   }
493
494   td->two_streams = has_video;
495   td->p = pipeline;
496
497   if (pipeline)
498     gst_bus_add_watch (GST_ELEMENT_BUS (pipeline), master_bus_msg, td);
499
500   return pipeline;
501 }
502
503 /* sink construction */
504
505 static GstElement *
506 create_sink (TestFeatures features, GstElement ** slave_pipeline,
507     int fdin, int fdout, const char *filter_caps)
508 {
509   GstElement *ipcpipelinesrc, *fakesink, *identity, *capsfilter, *endpoint;
510   GstCaps *caps;
511
512   if (!*slave_pipeline)
513     *slave_pipeline = create_pipeline ("ipcslavepipeline");
514   else
515     gst_object_ref (*slave_pipeline);
516   ipcpipelinesrc = gst_element_factory_make ("ipcpipelinesrc", NULL);
517   add_weak_ref (ipcpipelinesrc);
518   g_object_set (ipcpipelinesrc, "fdin", fdin, "fdout", fdout, NULL);
519   fakesink = gst_element_factory_make ("fakesink", NULL);
520   g_object_set (fakesink, "sync", !(features & TEST_FEATURE_ASYNC_SINK), NULL);
521   gst_bin_add_many (GST_BIN (*slave_pipeline), ipcpipelinesrc, fakesink, NULL);
522   endpoint = ipcpipelinesrc;
523
524   if (features & TEST_FEATURE_ERROR_SINK &&
525       !g_strcmp0 (filter_caps, "audio/x-raw")) {
526     identity = gst_element_factory_make ("identity", "error-element");
527     g_object_set (identity, "error-after", 5, NULL);
528     gst_bin_add (GST_BIN (*slave_pipeline), identity);
529     FAIL_UNLESS (gst_element_link_many (endpoint, identity, NULL));
530     endpoint = identity;
531   }
532
533   if ((features & TEST_FEATURE_FILTER_SINK_CAPS) && filter_caps) {
534     capsfilter = gst_element_factory_make ("capsfilter", NULL);
535     caps = gst_caps_from_string (filter_caps);
536     FAIL_UNLESS (caps);
537     g_object_set (capsfilter, "caps", caps, NULL);
538     gst_caps_unref (caps);
539     gst_bin_add (GST_BIN (*slave_pipeline), capsfilter);
540     FAIL_UNLESS (gst_element_link_many (endpoint, capsfilter, NULL));
541     endpoint = capsfilter;
542   }
543   FAIL_UNLESS (gst_element_link_many (endpoint, fakesink, NULL));
544
545   return *slave_pipeline;
546 }
547
548 static void
549 ensure_sink_setup (GstElement * sink, void (*setup_sink) (GstElement *, void *),
550     gpointer user_data)
551 {
552   static GQuark setup_done = 0;
553   test_data *td = user_data;
554
555   if (!setup_done)
556     setup_done = g_quark_from_static_string ("setup_done");
557
558   if (sink)
559     td->p = sink;
560
561   if (sink && setup_sink && !g_object_get_qdata (G_OBJECT (sink), setup_done)) {
562     g_object_set_qdata (G_OBJECT (sink), setup_done, GINT_TO_POINTER (1));
563     setup_sink (sink, user_data);
564   }
565 }
566
567 /* GstCheck multi-process setup helpers */
568
569 static void
570 on_child_exit (int signal)
571 {
572   int status = 0;
573   if (waitpid (-1, &status, 0) > 0 && status) {
574     FAIL ();
575     exit (status);
576   } else {
577     child_dead = TRUE;
578   }
579 }
580
581 static void
582 die_on_child_death (void)
583 {
584   struct sigaction sa;
585
586   memset (&sa, 0, sizeof (sa));
587   sa.sa_handler = on_child_exit;
588   sigaction (SIGCHLD, &sa, NULL);
589 }
590
591 static void
592 wait_for_recovery (void)
593 {
594   int value;
595
596   FAIL_UNLESS (ctlsock[1]);
597   FAIL_UNLESS (read (ctlsock[1], &value, sizeof (int)) == sizeof (int));
598   FAIL_UNLESS (value == MSG_START);
599 }
600
601 static void
602 ack_recovery (void)
603 {
604   int value = MSG_ACK;
605   FAIL_UNLESS (ctlsock[1]);
606   FAIL_UNLESS (write (ctlsock[1], &value, sizeof (int)) == sizeof (int));
607 }
608
609 static void
610 recreate_crashed_slave_process (void)
611 {
612   int value = MSG_START;
613   /* We don't recreate, because there seems to be some subtle issues
614      with forking after gst has started running. So we create a new
615      recovery process at start, and wake it up after the current
616      slave dies, so it can take its place. It's a bit hacky, but it
617      works. The spare process waits for SIGUSR2 to setup a replacement
618      pipeline and connect to the master. */
619   FAIL_UNLESS (recovery_pid);
620   FAIL_UNLESS (ctlsock[0]);
621   FAIL_UNLESS (write (ctlsock[0], &value, sizeof (int)) == sizeof (int));
622   FAIL_UNLESS (read (ctlsock[0], &value, sizeof (int)) == sizeof (int));
623   FAIL_UNLESS (value == MSG_ACK);
624 }
625
626 static gboolean
627 crash (gpointer user_data)
628 {
629   _exit (0);
630 }
631
632 static gboolean
633 unwind (gpointer user_data)
634 {
635   g_main_loop_quit (loop);
636   return FALSE;
637 }
638
639 static void
640 on_unwind (int signal)
641 {
642   g_idle_add (unwind, NULL);
643 }
644
645 static void
646 listen_for_unwind (void)
647 {
648   struct sigaction sa;
649
650   memset (&sa, 0, sizeof (sa));
651   sa.sa_handler = on_unwind;
652   sigaction (SIGUSR1, &sa, NULL);
653 }
654
655 static void
656 stop_listening_for_unwind (void)
657 {
658   struct sigaction sa;
659
660   memset (&sa, 0, sizeof (sa));
661   sa.sa_handler = SIG_DFL;
662   sigaction (SIGUSR1, &sa, NULL);
663 }
664
665 #define TEST_BASE(...) test_base(__FUNCTION__,##__VA_ARGS__)
666
667 /*
668  * This is the main function driving the tests. All tests configure it
669  * by way of all the function pointers it takes as arguments, which have
670  * self-explanatory names.
671  * Most tests are run over a number of different pipelines with the same
672  * configuration (eg, a wavparse based pipeline, a live pipeline with
673  * test audio/video, etc). Those pipelines that have more than one sink
674  * (eg, MPEG-TS source demuxing audio and video) have a version with a
675  * single slave pipeline and process, and a version with the audio and
676  * video sinks in two different processes, each with its slave pipeline.
677  * The master and slave crash tests are also run via this function, and
678  * have specific code (grep for recovery).
679  * There is a fair amount of hairy stuff to do with letting the main
680  * check process when a subprocess has failed. Best not to look at it
681  * and let it do its thing.
682  * To add new tests, duplicate a set of tests, eg the *_end_of_stream
683  * ones, and s/_end_of_stream/new_test_name/g. Then do the same for
684  * the functions they pass as parameters to test_base. Typically, the
685  * source creation sets a message hook to catch things like async-done
686  * messages. Sink creation typically adds a probe to check that events,
687  * buffers, etc, come through as expected. The two success functions
688  * check all went well for the source and sink. Note that since all of
689  * these functions take the same user data structure, and the process
690  * will fork, writing something from one process will not be reflected
691  * in the other, so there is usually a subset of data relevant to the
692  * source, and another to the sink. But some have data relevant to both,
693  * it depends on the test and what you are doing.
694  * New tests do not have to use this framework, it just avoids spending
695  * more time and effort on multi process handling.
696  */
697 static void
698 test_base (const char *name, TestFeatures features,
699     void (*run_source) (GstElement *, void *),
700     void (*setup_sink) (GstElement *, void *),
701     void (*check_success_source) (void *),
702     void (*check_success_sink) (void *),
703     gpointer input_data, gpointer master_data, gpointer slave_data)
704 {
705   GstElement *source = NULL, *asink = NULL, *vsink = NULL;
706   GstElement *slave_pipeline = NULL;
707   GstStateChangeReturn ret;
708   gboolean c_src, c_sink;
709   pid_t pid = 0;
710   unsigned char x;
711   int master_recovery_pid_comm[2] = { -1, -1 };
712   test_data td = { input_data, master_data, slave_data, features, FALSE, NULL,
713     NULL, GST_STATE_NULL
714   };
715
716   g_print ("Testing: %s\n", name);
717
718   weak_refs = NULL;
719
720   FAIL_IF (pipe2 (pipesfa, O_NONBLOCK) < 0);
721   FAIL_IF (pipe2 (pipesba, O_NONBLOCK) < 0);
722   FAIL_IF (pipe2 (pipesfv, O_NONBLOCK) < 0);
723   FAIL_IF (pipe2 (pipesbv, O_NONBLOCK) < 0);
724   FAIL_IF (socketpair (PF_UNIX, SOCK_STREAM, 0, ctlsock) < 0);
725
726   FAIL_IF (pipesfa[0] < 0);
727   FAIL_IF (pipesfa[1] < 0);
728   FAIL_IF (pipesba[0] < 0);
729   FAIL_IF (pipesba[1] < 0);
730   FAIL_IF (pipesfv[0] < 0);
731   FAIL_IF (pipesfv[1] < 0);
732   FAIL_IF (pipesbv[0] < 0);
733   FAIL_IF (pipesbv[1] < 0);
734
735   gst_debug_remove_log_function (gst_debug_log_default);
736
737   listen_for_unwind ();
738   child_dead = FALSE;
739
740   if (features & TEST_FEATURE_RECOVERY_MASTER_PROCESS) {
741     /* the other master will let us know its child's PID so we can unwind
742        it when we're finished */
743     FAIL_IF (pipe2 (master_recovery_pid_comm, O_NONBLOCK) < 0);
744
745     recovery_pid = fork ();
746     if (recovery_pid > 0) {
747       /* we're the main process that libcheck waits for */
748       die_on_child_death ();
749       while (!child_dead)
750         g_usleep (1000);
751       /* leave some time for the slave to timeout (1 second), record error, etc */
752       g_usleep (1500 * 1000);
753
754       /* Discard anything that was sent to the previous process when it died */
755       while (read (pipesba[0], &x, 1) == 1);
756
757       FAIL_UNLESS (read (master_recovery_pid_comm[0], &pid,
758               sizeof (pid)) == sizeof (pid));
759
760       setup_log ("gstsrc.log", TRUE);
761       source = create_source (features, pipesba[0], pipesfa[1], pipesbv[0],
762           pipesfv[1], &td);
763       FAIL_UNLESS (source);
764       if (run_source)
765         run_source (source, &td);
766       goto setup_done;
767     }
768   }
769
770   if (features & TEST_FEATURE_RECOVERY_SLAVE_PROCESS) {
771     recovery_pid = fork ();
772     if (!recovery_pid) {
773       wait_for_recovery ();
774
775       /* Discard anything that was sent to the previous process when it died */
776       while (read (pipesfa[0], &x, 1) == 1);
777
778       setup_log ("gstasink.log", TRUE);
779       asink = create_sink (features, &slave_pipeline, pipesfa[0], pipesba[1],
780           "audio/x-raw");
781       FAIL_UNLESS (asink);
782       ensure_sink_setup (asink, setup_sink, &td);
783       ack_recovery ();
784       goto setup_done;
785     }
786   }
787
788   pid = fork ();
789   FAIL_IF (pid < 0);
790   if (pid) {
791     if (features & TEST_FEATURE_RECOVERY_MASTER_PROCESS) {
792       FAIL_UNLESS (write (master_recovery_pid_comm[1], &pid,
793               sizeof (pid)) == sizeof (pid));
794     }
795     die_on_child_death ();
796     if (features & TEST_FEATURE_SPLIT_SINKS) {
797       pid = fork ();
798       FAIL_IF (pid < 0);
799       if (pid) {
800         die_on_child_death ();
801       }
802       c_src = ! !pid;
803       c_sink = !pid;
804     } else {
805       c_src = TRUE;
806       c_sink = FALSE;
807     }
808     if (c_src) {
809       setup_log ("gstsrc.log", FALSE);
810       source = create_source (features, pipesba[0], pipesfa[1], pipesbv[0],
811           pipesfv[1], &td);
812       FAIL_UNLESS (source);
813       run_source (source, &td);
814     }
815     if (c_sink) {
816       setup_log ("gstasink.log", FALSE);
817       asink = create_sink (features, &slave_pipeline, pipesfa[0], pipesba[1],
818           "audio/x-raw");
819       FAIL_UNLESS (asink);
820     }
821   } else {
822     td.two_streams = (features & TEST_FEATURE_HAS_VIDEO) &&
823         !(features & TEST_FEATURE_SPLIT_SINKS);
824
825     if (features & TEST_FEATURE_HAS_VIDEO) {
826       setup_log ("gstvsink.log", FALSE);
827       vsink = create_sink (features, &slave_pipeline, pipesfv[0], pipesbv[1],
828           "video/x-raw");
829       FAIL_UNLESS (vsink);
830     }
831     if (!(features & TEST_FEATURE_SPLIT_SINKS)) {
832       setup_log ("gstasink.log", FALSE);
833       asink = create_sink (features, &slave_pipeline, pipesfa[0], pipesba[1],
834           "audio/x-raw");
835       FAIL_UNLESS (asink);
836     }
837   }
838
839 setup_done:
840   ensure_sink_setup (asink, setup_sink, &td);
841   ensure_sink_setup (vsink, setup_sink, &td);
842
843   loop = g_main_loop_new (NULL, FALSE);
844   g_main_loop_run (loop);
845
846   /* tell the child process to unwind too */
847   stop_listening_for_unwind ();
848
849   if (source) {
850     ret = gst_element_set_state (source, GST_STATE_NULL);
851     FAIL_UNLESS (ret == GST_STATE_CHANGE_SUCCESS
852         || ret == GST_STATE_CHANGE_ASYNC);
853   }
854
855   if (pid)
856     kill (pid, SIGUSR1);
857
858   g_main_loop_unref (loop);
859
860   if (source) {
861     cleanup_bus (source);
862     if (check_success_source)
863       check_success_source (&td);
864   } else {
865     if (asink)
866       cleanup_bus (asink);
867     if (vsink)
868       cleanup_bus (vsink);
869     if (check_success_sink)
870       check_success_sink (&td);
871   }
872
873   disconnect_ipcpipeline_elements ();
874
875   close (pipesfa[0]);
876   close (pipesfa[1]);
877   close (pipesba[0]);
878   close (pipesba[1]);
879   close (pipesfv[0]);
880   close (pipesfv[1]);
881   close (pipesbv[0]);
882   close (pipesbv[1]);
883
884   /* If we have a child, we must now wait for it to be finished.
885      We can't just waitpid, because this child might be still doing
886      its shutdown, and might assert, and the die_on_child_death
887      function will exit with the right exit code if so. So we wait
888      for the child_dead boolean to be set, which die_on_child_death
889      sets if the child dies normally. */
890   if (pid) {
891     while (!child_dead)
892       g_usleep (1000);
893   }
894
895   if (source) {
896     FAIL_UNLESS_EQUALS_INT (GST_OBJECT_REFCOUNT_VALUE (source), 1);
897     gst_object_unref (source);
898   }
899   /* asink and vsink may be the same object, so refcount is not sure to be 1 */
900   if (asink)
901     gst_object_unref (asink);
902   if (vsink)
903     gst_object_unref (vsink);
904
905   /* cleanup tasks a bit earlier to make sure all weak refs are gone */
906   gst_task_cleanup_all ();
907
908   /* all ipcpipeline elements we created should now be destroyed */
909   if (weak_refs) {
910 #if 1
911     /* to make it easier to see what leaks */
912     GList *l;
913     for (l = weak_refs; l; l = l->next) {
914       g_print ("%s has %u refs\n", GST_ELEMENT_NAME (l->data),
915           GST_OBJECT_REFCOUNT_VALUE (l->data));
916     }
917 #endif
918     FAIL_UNLESS (0);
919   }
920 }
921
922 /**** play-pause test ****/
923
924 typedef struct
925 {
926   gboolean got_state_changed_to_playing[2];
927   gboolean got_state_changed_to_paused;
928 } play_pause_master_data;
929
930 typedef struct
931 {
932   gboolean got_caps[2];
933   gboolean got_segment[2];
934   gboolean got_buffer[2];
935 } play_pause_slave_data;
936
937 static gboolean
938 idlenull (gpointer user_data)
939 {
940   test_data *td = user_data;
941   GstStateChangeReturn ret;
942
943   ret = gst_element_set_state (td->p, GST_STATE_NULL);
944   FAIL_UNLESS (ret == GST_STATE_CHANGE_SUCCESS);
945   gst_object_unref (td->p);
946   g_main_loop_quit (loop);
947   return G_SOURCE_REMOVE;
948 }
949
950 static gboolean idleplay (gpointer user_data);
951 static gboolean
952 idlepause (gpointer user_data)
953 {
954   test_data *td = user_data;
955   play_pause_master_data *d = td->md;
956   GstStateChangeReturn ret;
957
958   ret = gst_element_set_state (td->p, GST_STATE_PAUSED);
959   FAIL_IF (ret == GST_STATE_CHANGE_FAILURE);
960   if (ret == GST_STATE_CHANGE_SUCCESS || ret == GST_STATE_CHANGE_NO_PREROLL) {
961     /* if the state change is not async, we won't get an aync-done, but
962        this is expected, so set the flag here */
963     d->got_state_changed_to_paused = TRUE;
964     td->state_target = GST_STATE_PLAYING;
965     g_timeout_add (STEP_AT, idleplay, user_data);
966     return G_SOURCE_REMOVE;
967   }
968   gst_object_unref (td->p);
969   return G_SOURCE_REMOVE;
970 }
971
972 static gboolean
973 idleplay (gpointer user_data)
974 {
975   test_data *td = user_data;
976   play_pause_master_data *d = td->md;
977   GstStateChangeReturn ret;
978
979   ret = gst_element_set_state (td->p, GST_STATE_PLAYING);
980   FAIL_IF (ret == GST_STATE_CHANGE_FAILURE);
981   if (ret == GST_STATE_CHANGE_SUCCESS || ret == GST_STATE_CHANGE_NO_PREROLL) {
982     /* if the state change is not async, we won't get an aync-done, but
983        this is expected, so set the flag here */
984     d->got_state_changed_to_playing[1] = TRUE;
985     td->state_target = GST_STATE_NULL;
986     g_timeout_add (STEP_AT, idlenull, user_data);
987     return G_SOURCE_REMOVE;
988   }
989   gst_object_unref (td->p);
990   return G_SOURCE_REMOVE;
991 }
992
993 static void
994 play_pause_on_state_changed (gpointer user_data)
995 {
996   test_data *td = user_data;
997   play_pause_master_data *d = td->md;
998   GstStateChangeReturn ret;
999
1000   if (d->got_state_changed_to_paused) {
1001     d->got_state_changed_to_playing[1] = TRUE;
1002     td->state_target = GST_STATE_NULL;
1003     ret = gst_element_set_state (td->p, GST_STATE_NULL);
1004     FAIL_UNLESS (ret == GST_STATE_CHANGE_SUCCESS);
1005     g_main_loop_quit (loop);
1006   } else if (d->got_state_changed_to_playing[0]) {
1007     d->got_state_changed_to_paused = TRUE;
1008     td->state_target = GST_STATE_PLAYING;
1009     gst_object_ref (td->p);
1010     g_timeout_add (STEP_AT, (GSourceFunc) idleplay, td);
1011   } else {
1012     d->got_state_changed_to_playing[0] = TRUE;
1013     td->state_target = GST_STATE_PAUSED;
1014     gst_object_ref (td->p);
1015     g_timeout_add (STEP_AT, (GSourceFunc) idlepause, td);
1016   }
1017 }
1018
1019 static void
1020 play_pause_source (GstElement * source, void *user_data)
1021 {
1022   test_data *td = user_data;
1023   GstStateChangeReturn ret;
1024
1025   td->state_target = GST_STATE_PLAYING;
1026   td->state_changed_cb = play_pause_on_state_changed;
1027   ret = gst_element_set_state (source, GST_STATE_PLAYING);
1028   FAIL_UNLESS (ret == GST_STATE_CHANGE_ASYNC);
1029 }
1030
1031 static GstPadProbeReturn
1032 play_pause_probe (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
1033 {
1034   test_data *td = user_data;
1035   play_pause_slave_data *d = td->sd;
1036   GstCaps *caps;
1037
1038   if (GST_IS_BUFFER (info->data)) {
1039     d->got_buffer[pad2idx (pad, td->two_streams)] = TRUE;
1040   } else if (GST_IS_EVENT (info->data)) {
1041     if (GST_EVENT_TYPE (info->data) == GST_EVENT_CAPS) {
1042       gst_event_parse_caps (info->data, &caps);
1043       d->got_caps[caps2idx (caps, td->two_streams)] = TRUE;
1044     } else if (GST_EVENT_TYPE (info->data) == GST_EVENT_SEGMENT) {
1045       d->got_segment[pad2idx (pad, td->two_streams)] = TRUE;
1046     }
1047   }
1048
1049   return GST_PAD_PROBE_OK;
1050 }
1051
1052 static void
1053 hook_probe_types (const GValue * sinkv, GstPadProbeCallback probe,
1054     unsigned int types, gpointer user_data)
1055 {
1056   GstElement *sink;
1057   GstPad *pad;
1058
1059   sink = g_value_get_object (sinkv);
1060   FAIL_UNLESS (sink);
1061   pad = gst_element_get_static_pad (sink, "sink");
1062   FAIL_UNLESS (pad);
1063   gst_pad_add_probe (pad, types, probe, user_data, NULL);
1064   gst_object_unref (pad);
1065 }
1066
1067 static void
1068 hook_probe (const GValue * sinkv, GstPadProbeCallback probe, gpointer user_data)
1069 {
1070   hook_probe_types (sinkv, probe,
1071       GST_PAD_PROBE_TYPE_DATA_DOWNSTREAM | GST_PAD_PROBE_TYPE_EVENT_FLUSH |
1072       GST_PAD_PROBE_TYPE_QUERY_DOWNSTREAM, user_data);
1073 }
1074
1075 static void
1076 hook_play_pause_probe (const GValue * v, gpointer user_data)
1077 {
1078   hook_probe (v, play_pause_probe, user_data);
1079 }
1080
1081 static void
1082 setup_sink_play_pause (GstElement * sink, void *user_data)
1083 {
1084   GstIterator *it;
1085
1086   it = gst_bin_iterate_sinks (GST_BIN (sink));
1087   while (gst_iterator_foreach (it, hook_play_pause_probe, user_data))
1088     gst_iterator_resync (it);
1089   gst_iterator_free (it);
1090 }
1091
1092 static void
1093 check_success_source_play_pause (void *user_data)
1094 {
1095   test_data *td = user_data;
1096   play_pause_master_data *d = td->md;
1097
1098   FAIL_UNLESS (d->got_state_changed_to_playing[0]);
1099   FAIL_UNLESS (d->got_state_changed_to_playing[1]);
1100   FAIL_UNLESS (d->got_state_changed_to_paused);
1101 }
1102
1103 static void
1104 check_success_sink_play_pause (void *user_data)
1105 {
1106   test_data *td = user_data;
1107   play_pause_slave_data *d = td->sd;
1108   int idx;
1109
1110   for (idx = 0; idx < (td->two_streams ? 2 : 1); idx++) {
1111     FAIL_UNLESS (d->got_caps[idx]);
1112     FAIL_UNLESS (d->got_segment[idx]);
1113     FAIL_UNLESS (d->got_buffer[idx]);
1114   }
1115 }
1116
1117 GST_START_TEST (test_empty_play_pause)
1118 {
1119   play_pause_master_data md = { 0 };
1120   play_pause_slave_data sd = { 0 };
1121
1122   TEST_BASE (TEST_FEATURE_TEST_SOURCE, play_pause_source, setup_sink_play_pause,
1123       check_success_source_play_pause, check_success_sink_play_pause, NULL, &md,
1124       &sd);
1125 }
1126
1127 GST_END_TEST;
1128
1129 GST_START_TEST (test_wavparse_play_pause)
1130 {
1131   play_pause_master_data md = { 0 };
1132   play_pause_slave_data sd = { 0 };
1133
1134   TEST_BASE (TEST_FEATURE_WAV_SOURCE, play_pause_source, setup_sink_play_pause,
1135       check_success_source_play_pause, check_success_sink_play_pause, NULL, &md,
1136       &sd);
1137 }
1138
1139 GST_END_TEST;
1140
1141 GST_START_TEST (test_mpegts_play_pause)
1142 {
1143   play_pause_master_data md = { 0 };
1144   play_pause_slave_data sd = { 0 };
1145
1146   TEST_BASE (TEST_FEATURE_MPEGTS_SOURCE, play_pause_source,
1147       setup_sink_play_pause, check_success_source_play_pause,
1148       check_success_sink_play_pause, NULL, &md, &sd);
1149 }
1150
1151 GST_END_TEST;
1152
1153 GST_START_TEST (test_mpegts_2_play_pause)
1154 {
1155   play_pause_master_data md = { 0 };
1156   play_pause_slave_data sd = { 0 };
1157
1158   TEST_BASE (TEST_FEATURE_MPEGTS_SOURCE | TEST_FEATURE_SPLIT_SINKS,
1159       play_pause_source, setup_sink_play_pause, check_success_source_play_pause,
1160       check_success_sink_play_pause, NULL, &md, &sd);
1161 }
1162
1163 GST_END_TEST;
1164
1165 GST_START_TEST (test_live_a_play_pause)
1166 {
1167   play_pause_master_data md = { 0 };
1168   play_pause_slave_data sd = { 0 };
1169
1170   TEST_BASE (TEST_FEATURE_LIVE_A_SOURCE, play_pause_source,
1171       setup_sink_play_pause, check_success_source_play_pause,
1172       check_success_sink_play_pause, NULL, &md, &sd);
1173 }
1174
1175 GST_END_TEST;
1176
1177 GST_START_TEST (test_live_av_play_pause)
1178 {
1179   play_pause_master_data md = { 0 };
1180   play_pause_slave_data sd = { 0 };
1181
1182   TEST_BASE (TEST_FEATURE_LIVE_AV_SOURCE, play_pause_source,
1183       setup_sink_play_pause, check_success_source_play_pause,
1184       check_success_sink_play_pause, NULL, &md, &sd);
1185 }
1186
1187 GST_END_TEST;
1188
1189 GST_START_TEST (test_live_av_2_play_pause)
1190 {
1191   play_pause_master_data md = { 0 };
1192   play_pause_slave_data sd = { 0 };
1193
1194   TEST_BASE (TEST_FEATURE_LIVE_AV_SOURCE | TEST_FEATURE_SPLIT_SINKS,
1195       play_pause_source, setup_sink_play_pause, check_success_source_play_pause,
1196       check_success_sink_play_pause, NULL, &md, &sd);
1197 }
1198
1199 GST_END_TEST;
1200
1201 /**** flushing seek test ****/
1202
1203 typedef struct
1204 {
1205   gboolean segment_seek;
1206   gboolean pause;
1207 } flushing_seek_input_data;
1208
1209 typedef struct
1210 {
1211   gboolean got_state_changed_to_playing;
1212   gboolean got_segment_done;
1213   gboolean seek_sent;
1214 } flushing_seek_master_data;
1215
1216 typedef struct
1217 {
1218   GstClockTime first_ts[2];
1219   gboolean got_caps[2];
1220   gboolean got_buffer_before_seek[2];
1221   gboolean got_buffer_after_seek[2];
1222   gboolean first_buffer_after_seek_has_timestamp_0[2];
1223   gboolean got_segment_after_seek[2];
1224   gboolean got_flush_start[2];
1225   gboolean got_flush_stop[2];
1226 } flushing_seek_slave_data;
1227
1228 static gboolean
1229 send_flushing_seek (gpointer user_data)
1230 {
1231   test_data *td = user_data;
1232   const flushing_seek_input_data *i = td->id;
1233   flushing_seek_master_data *d = td->md;
1234   GstEvent *seek_event;
1235
1236   if (i->segment_seek) {
1237     GST_INFO_OBJECT (td->p, "Sending segment seek");
1238     seek_event =
1239         gst_event_new_seek (1.0, GST_FORMAT_TIME,
1240         GST_SEEK_FLAG_SEGMENT | GST_SEEK_FLAG_FLUSH, GST_SEEK_TYPE_SET, 0,
1241         GST_SEEK_TYPE_SET, 1 * GST_SECOND);
1242     FAIL_UNLESS (gst_element_send_event (td->p, seek_event));
1243   } else {
1244     GST_INFO_OBJECT (td->p, "Sending flushing seek");
1245     gst_element_seek_simple (td->p, GST_FORMAT_TIME, GST_SEEK_FLAG_FLUSH, 0);
1246     g_timeout_add (STEP_AT, (GSourceFunc) stop_pipeline,
1247         gst_object_ref (td->p));
1248   }
1249   d->seek_sent = TRUE;
1250   return G_SOURCE_REMOVE;
1251 }
1252
1253 static gboolean
1254 pause_before_seek (gpointer user_data)
1255 {
1256   test_data *td = user_data;
1257   GstStateChangeReturn ret;
1258
1259   ret = gst_element_set_state (td->p, GST_STATE_PAUSED);
1260   FAIL_IF (ret == GST_STATE_CHANGE_FAILURE);
1261
1262   return G_SOURCE_REMOVE;
1263 }
1264
1265 static gboolean
1266 flushing_seek_bus_msg (GstBus * bus, GstMessage * message, gpointer user_data)
1267 {
1268   test_data *td = user_data;
1269   flushing_seek_master_data *d = td->md;
1270
1271   if (GST_IS_PIPELINE (GST_MESSAGE_SRC (message))) {
1272     if (GST_MESSAGE_TYPE (message) == GST_MESSAGE_SEGMENT_DONE) {
1273       d->got_segment_done = TRUE;
1274       g_timeout_add (STEP_AT, (GSourceFunc) stop_pipeline,
1275           gst_object_ref (td->p));
1276     }
1277   }
1278   return master_bus_msg (bus, message, user_data);
1279 }
1280
1281 static void
1282 flushing_seek_on_state_changed (gpointer user_data)
1283 {
1284   test_data *td = user_data;
1285   const flushing_seek_input_data *i = td->id;
1286   flushing_seek_master_data *d = td->md;
1287
1288   if (!d->got_state_changed_to_playing) {
1289     d->got_state_changed_to_playing = TRUE;
1290     if (i->pause)
1291       g_timeout_add (PAUSE_AT, (GSourceFunc) pause_before_seek, td);
1292     g_timeout_add (SEEK_AT, (GSourceFunc) send_flushing_seek, td);
1293   }
1294 }
1295
1296 static void
1297 flushing_seek_source (GstElement * source, gpointer user_data)
1298 {
1299   test_data *td = user_data;
1300   GstStateChangeReturn ret;
1301
1302   /* we're on the source, there's already the basic master_bus_msg watch,
1303      and gst doesn't want more than one watch, so we remove the watch and
1304      call it directly when done in the new watch */
1305   gst_bus_remove_watch (GST_ELEMENT_BUS (source));
1306   gst_bus_add_watch (GST_ELEMENT_BUS (source), flushing_seek_bus_msg,
1307       user_data);
1308   td->state_target = GST_STATE_PLAYING;
1309   td->state_changed_cb = flushing_seek_on_state_changed;
1310   ret = gst_element_set_state (source, GST_STATE_PLAYING);
1311   FAIL_UNLESS (ret == GST_STATE_CHANGE_ASYNC);
1312 }
1313
1314 static GstPadProbeReturn
1315 flushing_seek_probe (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
1316 {
1317   test_data *td = user_data;
1318   flushing_seek_slave_data *d = td->sd;
1319   GstClockTime ts;
1320   int idx;
1321   GstCaps *caps;
1322
1323   if (GST_IS_BUFFER (info->data)) {
1324     idx = pad2idx (pad, td->two_streams);
1325     if (d->got_flush_stop[idx]) {
1326       if (!d->got_buffer_after_seek[idx]) {
1327         ts = GST_BUFFER_TIMESTAMP (info->data);
1328         d->first_buffer_after_seek_has_timestamp_0[idx] =
1329             (ts < d->first_ts[idx] + 10 * GST_MSECOND);
1330         d->got_buffer_after_seek[idx] = TRUE;
1331       }
1332     } else if (!d->got_buffer_before_seek[idx]) {
1333       d->got_buffer_before_seek[idx] = TRUE;
1334       d->first_ts[idx] = GST_BUFFER_TIMESTAMP (info->data);
1335     }
1336   } else if (GST_IS_EVENT (info->data)) {
1337     if (GST_EVENT_TYPE (info->data) == GST_EVENT_CAPS) {
1338       gst_event_parse_caps (info->data, &caps);
1339       if (are_caps_audio (caps) || are_caps_video (caps)) {
1340         idx = caps2idx (caps, td->two_streams);
1341         d->got_caps[idx] = TRUE;
1342       }
1343     } else if (GST_EVENT_TYPE (info->data) == GST_EVENT_SEGMENT) {
1344       /* from the sink pipeline, we don't know whether the master issued a seek,
1345          as the seek_sent memory location isn't directly accesible to us, so we
1346          look for a segment after a buffer to mean a seek was sent */
1347       idx = pad2idx (pad, td->two_streams);
1348       if (d->got_buffer_before_seek[idx])
1349         d->got_segment_after_seek[idx] = TRUE;
1350     } else if (GST_EVENT_TYPE (info->data) == GST_EVENT_FLUSH_START) {
1351       idx = pad2idx (pad, td->two_streams);
1352       d->got_flush_start[idx] = TRUE;
1353     } else if (GST_EVENT_TYPE (info->data) == GST_EVENT_FLUSH_STOP) {
1354       idx = pad2idx (pad, td->two_streams);
1355       if (d->got_buffer_before_seek[idx])
1356         d->got_flush_stop[idx] = TRUE;
1357     }
1358   }
1359
1360   return GST_PAD_PROBE_OK;
1361 }
1362
1363 static void
1364 hook_flushing_seek_probe (const GValue * v, gpointer user_data)
1365 {
1366   hook_probe (v, flushing_seek_probe, user_data);
1367 }
1368
1369 static void
1370 setup_sink_flushing_seek (GstElement * sink, gpointer user_data)
1371 {
1372   GstIterator *it;
1373
1374   it = gst_bin_iterate_sinks (GST_BIN (sink));
1375   while (gst_iterator_foreach (it, hook_flushing_seek_probe, user_data))
1376     gst_iterator_resync (it);
1377   gst_iterator_free (it);
1378 }
1379
1380 static void
1381 check_success_source_flushing_seek (gpointer user_data)
1382 {
1383   test_data *td = user_data;
1384   const flushing_seek_input_data *i = td->id;
1385   flushing_seek_master_data *d = td->md;
1386
1387   FAIL_UNLESS (d->got_state_changed_to_playing);
1388   FAIL_UNLESS (d->seek_sent);
1389   FAIL_UNLESS (d->got_segment_done == i->segment_seek);
1390 }
1391
1392 static void
1393 check_success_sink_flushing_seek (gpointer user_data)
1394 {
1395   test_data *td = user_data;
1396   flushing_seek_slave_data *d = td->sd;
1397   gint idx;
1398
1399   for (idx = 0; idx < (td->two_streams ? 2 : 1); idx++) {
1400     FAIL_UNLESS (d->got_caps[idx]);
1401     FAIL_UNLESS (d->got_buffer_before_seek[idx]);
1402     FAIL_UNLESS (d->got_buffer_after_seek[idx]);
1403     FAIL_UNLESS (d->got_segment_after_seek[idx]);
1404     FAIL_UNLESS (d->got_flush_start[idx]);
1405     FAIL_UNLESS (d->got_flush_stop[idx]);
1406     FAIL_UNLESS (d->first_buffer_after_seek_has_timestamp_0[idx]);
1407   }
1408 }
1409
1410 GST_START_TEST (test_empty_flushing_seek)
1411 {
1412   flushing_seek_input_data id = { FALSE, FALSE };
1413   flushing_seek_master_data md = { 0 };
1414   flushing_seek_slave_data sd = { 0 };
1415
1416   TEST_BASE (TEST_FEATURE_TEST_SOURCE, flushing_seek_source,
1417       setup_sink_flushing_seek, check_success_source_flushing_seek,
1418       check_success_sink_flushing_seek, &id, &md, &sd);
1419 }
1420
1421 GST_END_TEST;
1422
1423 GST_START_TEST (test_wavparse_flushing_seek)
1424 {
1425   flushing_seek_input_data id = { FALSE, FALSE };
1426   flushing_seek_master_data md = { 0 };
1427   flushing_seek_slave_data sd = { 0 };
1428
1429   TEST_BASE (TEST_FEATURE_WAV_SOURCE, flushing_seek_source,
1430       setup_sink_flushing_seek, check_success_source_flushing_seek,
1431       check_success_sink_flushing_seek, &id, &md, &sd);
1432 }
1433
1434 GST_END_TEST;
1435
1436 GST_START_TEST (test_mpegts_flushing_seek)
1437 {
1438   flushing_seek_input_data id = { FALSE, FALSE };
1439   flushing_seek_master_data md = { 0 };
1440   flushing_seek_slave_data sd = { 0 };
1441
1442   TEST_BASE (TEST_FEATURE_MPEGTS_SOURCE, flushing_seek_source,
1443       setup_sink_flushing_seek, check_success_source_flushing_seek,
1444       check_success_sink_flushing_seek, &id, &md, &sd);
1445 }
1446
1447 GST_END_TEST;
1448
1449 GST_START_TEST (test_mpegts_2_flushing_seek)
1450 {
1451   flushing_seek_input_data id = { FALSE, FALSE };
1452   flushing_seek_master_data md = { 0 };
1453   flushing_seek_slave_data sd = { 0 };
1454
1455   TEST_BASE (TEST_FEATURE_MPEGTS_SOURCE | TEST_FEATURE_SPLIT_SINKS,
1456       flushing_seek_source, setup_sink_flushing_seek,
1457       check_success_source_flushing_seek, check_success_sink_flushing_seek, &id,
1458       &md, &sd);
1459 }
1460
1461 GST_END_TEST;
1462
1463 GST_START_TEST (test_live_a_flushing_seek)
1464 {
1465   flushing_seek_input_data id = { FALSE, FALSE };
1466   flushing_seek_master_data md = { 0 };
1467   flushing_seek_slave_data sd = { 0 };
1468
1469   TEST_BASE (TEST_FEATURE_LIVE_A_SOURCE, flushing_seek_source,
1470       setup_sink_flushing_seek, check_success_source_flushing_seek,
1471       check_success_sink_flushing_seek, &id, &md, &sd);
1472 }
1473
1474 GST_END_TEST;
1475
1476 GST_START_TEST (test_live_av_flushing_seek)
1477 {
1478   flushing_seek_input_data id = { FALSE, FALSE };
1479   flushing_seek_master_data md = { 0 };
1480   flushing_seek_slave_data sd = { 0 };
1481
1482   TEST_BASE (TEST_FEATURE_LIVE_AV_SOURCE, flushing_seek_source,
1483       setup_sink_flushing_seek, check_success_source_flushing_seek,
1484       check_success_sink_flushing_seek, &id, &md, &sd);
1485 }
1486
1487 GST_END_TEST;
1488
1489 GST_START_TEST (test_live_av_2_flushing_seek)
1490 {
1491   flushing_seek_input_data id = { FALSE, FALSE };
1492   flushing_seek_master_data md = { 0 };
1493   flushing_seek_slave_data sd = { 0 };
1494
1495   TEST_BASE (TEST_FEATURE_LIVE_AV_SOURCE | TEST_FEATURE_SPLIT_SINKS,
1496       flushing_seek_source, setup_sink_flushing_seek,
1497       check_success_source_flushing_seek, check_success_sink_flushing_seek, &id,
1498       &md, &sd);
1499 }
1500
1501 GST_END_TEST;
1502
1503 GST_START_TEST (test_empty_flushing_seek_in_pause)
1504 {
1505   flushing_seek_input_data id = { FALSE, TRUE };
1506   flushing_seek_master_data md = { 0 };
1507   flushing_seek_slave_data sd = { 0 };
1508
1509   TEST_BASE (TEST_FEATURE_TEST_SOURCE, flushing_seek_source,
1510       setup_sink_flushing_seek, check_success_source_flushing_seek,
1511       check_success_sink_flushing_seek, &id, &md, &sd);
1512 }
1513
1514 GST_END_TEST;
1515
1516 GST_START_TEST (test_wavparse_flushing_seek_in_pause)
1517 {
1518   flushing_seek_input_data id = { FALSE, TRUE };
1519   flushing_seek_master_data md = { 0 };
1520   flushing_seek_slave_data sd = { 0 };
1521
1522   TEST_BASE (TEST_FEATURE_WAV_SOURCE, flushing_seek_source,
1523       setup_sink_flushing_seek, check_success_source_flushing_seek,
1524       check_success_sink_flushing_seek, &id, &md, &sd);
1525 }
1526
1527 GST_END_TEST;
1528
1529 GST_START_TEST (test_mpegts_flushing_seek_in_pause)
1530 {
1531   flushing_seek_input_data id = { FALSE, TRUE };
1532   flushing_seek_master_data md = { 0 };
1533   flushing_seek_slave_data sd = { 0 };
1534
1535   TEST_BASE (TEST_FEATURE_MPEGTS_SOURCE, flushing_seek_source,
1536       setup_sink_flushing_seek, check_success_source_flushing_seek,
1537       check_success_sink_flushing_seek, &id, &md, &sd);
1538 }
1539
1540 GST_END_TEST;
1541
1542 GST_START_TEST (test_mpegts_2_flushing_seek_in_pause)
1543 {
1544   flushing_seek_input_data id = { FALSE, TRUE };
1545   flushing_seek_master_data md = { 0 };
1546   flushing_seek_slave_data sd = { 0 };
1547
1548   TEST_BASE (TEST_FEATURE_MPEGTS_SOURCE | TEST_FEATURE_SPLIT_SINKS,
1549       flushing_seek_source, setup_sink_flushing_seek,
1550       check_success_source_flushing_seek,
1551       check_success_sink_flushing_seek, &id, &md, &sd);
1552 }
1553
1554 GST_END_TEST;
1555
1556 GST_START_TEST (test_empty_segment_seek)
1557 {
1558   flushing_seek_input_data id = { TRUE, FALSE };
1559   flushing_seek_master_data md = { 0 };
1560   flushing_seek_slave_data sd = { 0 };
1561
1562   TEST_BASE (TEST_FEATURE_TEST_SOURCE, flushing_seek_source,
1563       setup_sink_flushing_seek, check_success_source_flushing_seek,
1564       check_success_sink_flushing_seek, &id, &md, &sd);
1565 }
1566
1567 GST_END_TEST;
1568
1569 GST_START_TEST (test_wavparse_segment_seek)
1570 {
1571   flushing_seek_input_data id = { TRUE, FALSE };
1572   flushing_seek_master_data md = { 0 };
1573   flushing_seek_slave_data sd = { 0 };
1574
1575   TEST_BASE (TEST_FEATURE_WAV_SOURCE, flushing_seek_source,
1576       setup_sink_flushing_seek, check_success_source_flushing_seek,
1577       check_success_sink_flushing_seek, &id, &md, &sd);
1578 }
1579
1580 GST_END_TEST;
1581
1582 GST_START_TEST (test_live_a_segment_seek)
1583 {
1584   flushing_seek_input_data id = { TRUE, FALSE };
1585   flushing_seek_master_data md = { 0 };
1586   flushing_seek_slave_data sd = { 0 };
1587
1588   TEST_BASE (TEST_FEATURE_LIVE_A_SOURCE,
1589       flushing_seek_source, setup_sink_flushing_seek,
1590       check_success_source_flushing_seek,
1591       check_success_sink_flushing_seek, &id, &md, &sd);
1592 }
1593
1594 GST_END_TEST;
1595
1596 GST_START_TEST (test_live_av_segment_seek)
1597 {
1598   flushing_seek_input_data id = { TRUE, FALSE };
1599   flushing_seek_master_data md = { 0 };
1600   flushing_seek_slave_data sd = { 0 };
1601
1602   TEST_BASE (TEST_FEATURE_LIVE_AV_SOURCE,
1603       flushing_seek_source, setup_sink_flushing_seek,
1604       check_success_source_flushing_seek,
1605       check_success_sink_flushing_seek, &id, &md, &sd);
1606 }
1607
1608 GST_END_TEST;
1609
1610 GST_START_TEST (test_live_av_2_segment_seek)
1611 {
1612   flushing_seek_input_data id = { TRUE, FALSE };
1613   flushing_seek_master_data md = { 0 };
1614   flushing_seek_slave_data sd = { 0 };
1615
1616   TEST_BASE (TEST_FEATURE_LIVE_AV_SOURCE | TEST_FEATURE_SPLIT_SINKS,
1617       flushing_seek_source, setup_sink_flushing_seek,
1618       check_success_source_flushing_seek,
1619       check_success_sink_flushing_seek, &id, &md, &sd);
1620 }
1621
1622 GST_END_TEST;
1623
1624 /**** seek stress test ****/
1625
1626 typedef struct
1627 {
1628   gint n_flushing_seeks;
1629   gint n_paused_seeks;
1630   gint n_segment_seeks;
1631 } seek_stress_input_data;
1632
1633 typedef struct
1634 {
1635   gboolean got_state_changed_to_playing;
1636   gboolean got_eos;
1637   gboolean seek_sent;
1638   guint64 t0;
1639 } seek_stress_master_data;
1640
1641 static gboolean
1642 send_seek_stress (gpointer user_data)
1643 {
1644   test_data *td = user_data;
1645   seek_stress_input_data *i = td->id;
1646   seek_stress_master_data *d = td->md;
1647   GstEvent *seek_event;
1648   unsigned int available, seekidx;
1649   GstClockTime t, base;
1650
1651   /* Live streams don't like to be seeked too far away from the
1652      "current" time, since they're live, so always seek near the
1653      "real" time, so we still exercise seeking to another position
1654      but still land somewhere close enough to "live" position. */
1655   t = (g_get_monotonic_time () - d->t0) * 1000;
1656   base = t > GST_SECOND / 2 ? t - GST_SECOND / 2 : 0;
1657   t = base + g_random_int_range (0, GST_SECOND);
1658
1659   /* pick a random seek type among the ones we have left */
1660   available = i->n_flushing_seeks + i->n_paused_seeks + i->n_segment_seeks;
1661   if (available == 0) {
1662     GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS (GST_BIN (td->p),
1663         GST_DEBUG_GRAPH_SHOW_ALL, "inter.test.toplaying");
1664     FAIL_UNLESS (gst_element_set_state (td->p,
1665             GST_STATE_PLAYING) != GST_STATE_CHANGE_FAILURE);
1666     g_timeout_add (STEP_AT, (GSourceFunc) stop_pipeline,
1667         gst_object_ref (td->p));
1668     gst_object_unref (td->p);
1669     return G_SOURCE_REMOVE;
1670   }
1671
1672   seekidx = rand () % available;
1673   if (seekidx < i->n_flushing_seeks) {
1674     GST_INFO_OBJECT (td->p, "Sending flushing seek to %" GST_TIME_FORMAT,
1675         GST_TIME_ARGS (t));
1676     FAIL_UNLESS (gst_element_set_state (td->p,
1677             GST_STATE_PLAYING) != GST_STATE_CHANGE_FAILURE);
1678     FAIL_UNLESS (gst_element_seek_simple (td->p, GST_FORMAT_TIME,
1679             GST_SEEK_FLAG_FLUSH, t));
1680     --i->n_flushing_seeks;
1681     return G_SOURCE_CONTINUE;
1682   }
1683   seekidx -= i->n_flushing_seeks;
1684
1685   if (seekidx < i->n_paused_seeks) {
1686     GST_INFO_OBJECT (td->p,
1687         "Sending flushing seek in paused to %" GST_TIME_FORMAT,
1688         GST_TIME_ARGS (t));
1689     FAIL_UNLESS (gst_element_set_state (td->p,
1690             GST_STATE_PAUSED) != GST_STATE_CHANGE_FAILURE);
1691     FAIL_UNLESS (gst_element_seek_simple (td->p, GST_FORMAT_TIME,
1692             GST_SEEK_FLAG_FLUSH, t));
1693     --i->n_paused_seeks;
1694     return G_SOURCE_CONTINUE;
1695   }
1696   seekidx -= i->n_paused_seeks;
1697
1698   GST_INFO_OBJECT (td->p, "Sending segment seek to %" GST_TIME_FORMAT,
1699       GST_TIME_ARGS (t));
1700   seek_event =
1701       gst_event_new_seek (1.0, GST_FORMAT_TIME,
1702       GST_SEEK_FLAG_SEGMENT | GST_SEEK_FLAG_FLUSH, GST_SEEK_TYPE_SET, t,
1703       GST_SEEK_TYPE_SET, t + 5 * GST_SECOND);
1704   FAIL_UNLESS (gst_element_send_event (td->p, seek_event));
1705   --i->n_segment_seeks;
1706   return G_SOURCE_CONTINUE;
1707 }
1708
1709 static gboolean
1710 seek_stress_bus_msg (GstBus * bus, GstMessage * message, gpointer user_data)
1711 {
1712   test_data *td = user_data;
1713   seek_stress_master_data *d = td->md;
1714
1715   if (GST_IS_PIPELINE (GST_MESSAGE_SRC (message))) {
1716     if (GST_MESSAGE_TYPE (message) == GST_MESSAGE_EOS ||
1717         GST_MESSAGE_TYPE (message) == GST_MESSAGE_SEGMENT_DONE) {
1718       d->got_eos = TRUE;
1719     }
1720   }
1721   return master_bus_msg (bus, message, user_data);
1722 }
1723
1724 static void
1725 seek_stress_on_state_changed (gpointer user_data)
1726 {
1727   test_data *td = user_data;
1728   seek_stress_master_data *d = td->md;
1729
1730   if (!d->got_state_changed_to_playing) {
1731     d->got_state_changed_to_playing = TRUE;
1732     d->t0 = g_get_monotonic_time ();
1733     gst_object_ref (td->p);
1734     g_timeout_add (10, (GSourceFunc) send_seek_stress, td);
1735   }
1736 }
1737
1738 static void
1739 seek_stress_source (GstElement * source, gpointer user_data)
1740 {
1741   test_data *td = user_data;
1742   GstStateChangeReturn ret;
1743
1744   /* we're on the source, there's already the basic master_bus_msg watch,
1745      and gst doesn't want more than one watch, so we remove the watch and
1746      call it directly when done in the new watch */
1747   gst_bus_remove_watch (GST_ELEMENT_BUS (source));
1748   gst_bus_add_watch (GST_ELEMENT_BUS (source), seek_stress_bus_msg, user_data);
1749   td->state_target = GST_STATE_PLAYING;
1750   td->state_changed_cb = seek_stress_on_state_changed;
1751   ret = gst_element_set_state (source, GST_STATE_PLAYING);
1752   FAIL_UNLESS (ret == GST_STATE_CHANGE_ASYNC);
1753 }
1754
1755 static void
1756 check_success_source_seek_stress (gpointer user_data)
1757 {
1758   test_data *td = user_data;
1759   seek_stress_input_data *i = td->id;
1760   seek_stress_master_data *d = td->md;
1761
1762   FAIL_UNLESS (d->got_state_changed_to_playing);
1763   FAIL_UNLESS_EQUALS_INT (i->n_flushing_seeks, 0);
1764   FAIL_UNLESS_EQUALS_INT (i->n_paused_seeks, 0);
1765   FAIL_UNLESS_EQUALS_INT (i->n_segment_seeks, 0);
1766   FAIL_IF (d->got_eos);
1767 }
1768
1769 GST_START_TEST (test_empty_seek_stress)
1770 {
1771   seek_stress_input_data id = { 100, 100, 100 };
1772   seek_stress_master_data md = { 0 };
1773
1774   TEST_BASE (TEST_FEATURE_TEST_SOURCE, seek_stress_source, NULL,
1775       check_success_source_seek_stress, NULL, &id, &md, NULL);
1776 }
1777
1778 GST_END_TEST;
1779
1780 GST_START_TEST (test_wavparse_seek_stress)
1781 {
1782   seek_stress_input_data id = { 100, 100, 100 };
1783   seek_stress_master_data md = { 0 };
1784
1785   TEST_BASE (TEST_FEATURE_WAV_SOURCE, seek_stress_source, NULL,
1786       check_success_source_seek_stress, NULL, &id, &md, NULL);
1787 }
1788
1789 GST_END_TEST;
1790
1791 GST_START_TEST (test_mpegts_seek_stress)
1792 {
1793   seek_stress_input_data id = { 100, 100, 0 };
1794   seek_stress_master_data md = { 0 };
1795
1796   TEST_BASE (TEST_FEATURE_MPEGTS_SOURCE, seek_stress_source, NULL,
1797       check_success_source_seek_stress, NULL, &id, &md, NULL);
1798 }
1799
1800 GST_END_TEST;
1801
1802 GST_START_TEST (test_mpegts_2_seek_stress)
1803 {
1804   seek_stress_input_data id = { 100, 100, 0 };
1805   seek_stress_master_data md = { 0 };
1806
1807   TEST_BASE (TEST_FEATURE_MPEGTS_SOURCE | TEST_FEATURE_SPLIT_SINKS,
1808       seek_stress_source, NULL, check_success_source_seek_stress, NULL, &id,
1809       &md, NULL);
1810 }
1811
1812 GST_END_TEST;
1813
1814 GST_START_TEST (test_live_a_seek_stress)
1815 {
1816   seek_stress_input_data id = { 100, 0, 100 };
1817   seek_stress_master_data md = { 0 };
1818
1819   TEST_BASE (TEST_FEATURE_LIVE_A_SOURCE | TEST_FEATURE_LONG_DURATION,
1820       seek_stress_source, NULL, check_success_source_seek_stress, NULL, &id,
1821       &md, NULL);
1822 }
1823
1824 GST_END_TEST;
1825
1826 GST_START_TEST (test_live_av_seek_stress)
1827 {
1828   seek_stress_input_data id = { 100, 0, 100 };
1829   seek_stress_master_data md = { 0 };
1830
1831   TEST_BASE (TEST_FEATURE_LIVE_AV_SOURCE | TEST_FEATURE_LONG_DURATION,
1832       seek_stress_source, NULL, check_success_source_seek_stress, NULL, &id,
1833       &md, NULL);
1834 }
1835
1836 GST_END_TEST;
1837
1838 GST_START_TEST (test_live_av_2_seek_stress)
1839 {
1840   seek_stress_input_data id = { 100, 0, 100 };
1841   seek_stress_master_data md = { 0 };
1842
1843   TEST_BASE (TEST_FEATURE_LIVE_AV_SOURCE | TEST_FEATURE_LONG_DURATION |
1844       TEST_FEATURE_SPLIT_SINKS,
1845       seek_stress_source, NULL, check_success_source_seek_stress, NULL, &id,
1846       &md, NULL);
1847 }
1848
1849 GST_END_TEST;
1850
1851 /**** upstream query test ****/
1852
1853 typedef struct
1854 {
1855   GstClockTime expected_duration;
1856
1857   /* In this test, the source does a position query (in the source pipeline
1858      process), and must check its return against the last buffer timestamp
1859      in the sink pipeline process. We open a pipe to let the sink send us
1860      the timestamps it receives so the source can make the comparison. */
1861   gint ts_pipes[2];
1862 } upstream_query_input_data;
1863
1864 typedef struct
1865 {
1866   gboolean got_state_changed_to_playing;
1867   gboolean got_correct_position;
1868   gboolean got_correct_duration;
1869   GstClockTime last_buffer_ts;
1870 } upstream_query_master_data;
1871
1872 typedef struct
1873 {
1874   gboolean got_caps[2];
1875   gboolean got_buffer[2];
1876   GstClockTime last_buffer_ts;
1877 } upstream_query_slave_data;
1878
1879 static gboolean
1880 send_upstream_queries (gpointer user_data)
1881 {
1882   test_data *td = user_data;
1883   upstream_query_input_data *i = td->id;
1884   upstream_query_master_data *d = td->md;
1885   gint64 pos, dur, last;
1886
1887   FAIL_UNLESS (gst_element_query_position (td->p, GST_FORMAT_TIME, &pos));
1888
1889   /* read up the buffer ts sent by the sink process till the last one */
1890   while (read (i->ts_pipes[0], &last, sizeof (last)) == sizeof (last)) {
1891     /* timestamps may not be increasing because we are getting ts from
1892      * both the audio and video streams; the position query will report
1893      * the higher */
1894     if (last > d->last_buffer_ts)
1895       d->last_buffer_ts = last;
1896   }
1897   if (ABS ((gint64) (pos - d->last_buffer_ts)) <= CLOSE_ENOUGH_TO_ZERO)
1898     d->got_correct_position = TRUE;
1899
1900   FAIL_UNLESS (gst_element_query_duration (td->p, GST_FORMAT_TIME, &dur));
1901   if (GST_CLOCK_TIME_IS_VALID (i->expected_duration)) {
1902     GstClockTimeDiff diff = GST_CLOCK_DIFF (dur, i->expected_duration);
1903     if (diff >= -CLOSE_ENOUGH_TO_ZERO && diff <= CLOSE_ENOUGH_TO_ZERO)
1904       d->got_correct_duration = TRUE;
1905   } else {
1906     if (!GST_CLOCK_TIME_IS_VALID (dur))
1907       d->got_correct_duration = TRUE;
1908   }
1909
1910   g_timeout_add (STEP_AT, (GSourceFunc) stop_pipeline, td->p);
1911   return FALSE;
1912 }
1913
1914 static void
1915 upstream_query_on_state_changed (gpointer user_data)
1916 {
1917   test_data *td = user_data;
1918   upstream_query_master_data *d = td->md;
1919
1920   if (!d->got_state_changed_to_playing) {
1921     d->got_state_changed_to_playing = TRUE;
1922     gst_object_ref (td->p);
1923     g_timeout_add (QUERY_AT, (GSourceFunc) send_upstream_queries, td);
1924   }
1925 }
1926
1927 static void
1928 upstream_query_source (GstElement * source, gpointer user_data)
1929 {
1930   test_data *td = user_data;
1931   GstStateChangeReturn ret;
1932
1933   td->state_changed_cb = upstream_query_on_state_changed;
1934   td->state_target = GST_STATE_PLAYING;
1935   ret = gst_element_set_state (source, GST_STATE_PLAYING);
1936   FAIL_UNLESS (ret == GST_STATE_CHANGE_ASYNC);
1937 }
1938
1939 static GstPadProbeReturn
1940 upstream_query_probe (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
1941 {
1942   test_data *td = user_data;
1943   upstream_query_input_data *i = td->id;
1944   upstream_query_slave_data *d = td->sd;
1945   GstCaps *caps;
1946
1947   if (GST_IS_BUFFER (info->data)) {
1948     d->got_buffer[pad2idx (pad, td->two_streams)] = TRUE;
1949     if (GST_BUFFER_TIMESTAMP_IS_VALID (info->data)) {
1950       d->last_buffer_ts = GST_BUFFER_TIMESTAMP (info->data);
1951       FAIL_UNLESS (write (i->ts_pipes[1], &d->last_buffer_ts,
1952               sizeof (d->last_buffer_ts)) == sizeof (d->last_buffer_ts));
1953     }
1954   } else if (GST_IS_EVENT (info->data)) {
1955     if (GST_EVENT_TYPE (info->data) == GST_EVENT_CAPS) {
1956       gst_event_parse_caps (info->data, &caps);
1957       d->got_caps[caps2idx (caps, td->two_streams)] = TRUE;
1958     }
1959   }
1960
1961   return GST_PAD_PROBE_OK;
1962 }
1963
1964 static void
1965 hook_upstream_query_probe (const GValue * v, gpointer user_data)
1966 {
1967   hook_probe (v, upstream_query_probe, user_data);
1968 }
1969
1970 static void
1971 setup_sink_upstream_query (GstElement * sink, gpointer user_data)
1972 {
1973   GstIterator *it;
1974
1975   it = gst_bin_iterate_sinks (GST_BIN (sink));
1976   while (gst_iterator_foreach (it, hook_upstream_query_probe, user_data))
1977     gst_iterator_resync (it);
1978   gst_iterator_free (it);
1979 }
1980
1981 static void
1982 check_success_source_upstream_query (gpointer user_data)
1983 {
1984   test_data *td = user_data;
1985   upstream_query_master_data *d = td->md;
1986
1987   FAIL_UNLESS (d->got_state_changed_to_playing);
1988   FAIL_UNLESS (d->got_correct_position);
1989   FAIL_UNLESS (d->got_correct_duration);
1990 }
1991
1992 static void
1993 check_success_sink_upstream_query (gpointer user_data)
1994 {
1995   test_data *td = user_data;
1996   upstream_query_slave_data *d = td->sd;
1997   int idx;
1998
1999   for (idx = 0; idx < (td->two_streams ? 2 : 1); ++idx) {
2000     FAIL_UNLESS (d->got_caps[idx]);
2001     FAIL_UNLESS (d->got_buffer[idx]);
2002   }
2003 }
2004
2005 GST_START_TEST (test_empty_upstream_query)
2006 {
2007   upstream_query_input_data id = { GST_CLOCK_TIME_NONE, };
2008   upstream_query_master_data md = { 0 };
2009   upstream_query_slave_data sd = { 0 };
2010
2011   FAIL_UNLESS (pipe2 (id.ts_pipes, O_NONBLOCK) == 0);
2012   TEST_BASE (TEST_FEATURE_TEST_SOURCE, upstream_query_source,
2013       setup_sink_upstream_query, check_success_source_upstream_query,
2014       check_success_sink_upstream_query, &id, &md, &sd);
2015   close (id.ts_pipes[0]);
2016   close (id.ts_pipes[1]);
2017 }
2018
2019 GST_END_TEST;
2020
2021 GST_START_TEST (test_wavparse_upstream_query)
2022 {
2023   upstream_query_input_data id = { WAV_SAMPLE_ROUGH_DURATION, };
2024   upstream_query_master_data md = { 0 };
2025   upstream_query_slave_data sd = { 0 };
2026
2027   FAIL_UNLESS (pipe2 (id.ts_pipes, O_NONBLOCK) == 0);
2028   TEST_BASE (TEST_FEATURE_WAV_SOURCE, upstream_query_source,
2029       setup_sink_upstream_query, check_success_source_upstream_query,
2030       check_success_sink_upstream_query, &id, &md, &sd);
2031   close (id.ts_pipes[0]);
2032   close (id.ts_pipes[1]);
2033 }
2034
2035 GST_END_TEST;
2036
2037 GST_START_TEST (test_mpegts_upstream_query)
2038 {
2039   upstream_query_input_data id = { MPEGTS_SAMPLE_ROUGH_DURATION, };
2040   upstream_query_master_data md = { 0 };
2041   upstream_query_slave_data sd = { 0 };
2042
2043   FAIL_UNLESS (pipe2 (id.ts_pipes, O_NONBLOCK) == 0);
2044   TEST_BASE (TEST_FEATURE_MPEGTS_SOURCE, upstream_query_source,
2045       setup_sink_upstream_query, check_success_source_upstream_query,
2046       check_success_sink_upstream_query, &id, &md, &sd);
2047   close (id.ts_pipes[0]);
2048   close (id.ts_pipes[1]);
2049 }
2050
2051 GST_END_TEST;
2052
2053 GST_START_TEST (test_mpegts_2_upstream_query)
2054 {
2055   upstream_query_input_data id = { MPEGTS_SAMPLE_ROUGH_DURATION, };
2056   upstream_query_master_data md = { 0 };
2057   upstream_query_slave_data sd = { 0 };
2058
2059   FAIL_UNLESS (pipe2 (id.ts_pipes, O_NONBLOCK) == 0);
2060   TEST_BASE (TEST_FEATURE_MPEGTS_SOURCE | TEST_FEATURE_SPLIT_SINKS,
2061       upstream_query_source, setup_sink_upstream_query,
2062       check_success_source_upstream_query, check_success_sink_upstream_query,
2063       &id, &md, &sd);
2064   close (id.ts_pipes[0]);
2065   close (id.ts_pipes[1]);
2066 }
2067
2068 GST_END_TEST;
2069
2070 GST_START_TEST (test_live_a_upstream_query)
2071 {
2072   upstream_query_input_data id = { GST_CLOCK_TIME_NONE, };
2073   upstream_query_master_data md = { 0 };
2074   upstream_query_slave_data sd = { 0 };
2075
2076   FAIL_UNLESS (pipe2 (id.ts_pipes, O_NONBLOCK) == 0);
2077   TEST_BASE (TEST_FEATURE_LIVE_A_SOURCE,
2078       upstream_query_source, setup_sink_upstream_query,
2079       check_success_source_upstream_query, check_success_sink_upstream_query,
2080       &id, &md, &sd);
2081   close (id.ts_pipes[0]);
2082   close (id.ts_pipes[1]);
2083 }
2084
2085 GST_END_TEST;
2086
2087 GST_START_TEST (test_live_av_upstream_query)
2088 {
2089   upstream_query_input_data id = { GST_CLOCK_TIME_NONE, };
2090   upstream_query_master_data md = { 0 };
2091   upstream_query_slave_data sd = { 0 };
2092
2093   FAIL_UNLESS (pipe2 (id.ts_pipes, O_NONBLOCK) == 0);
2094   TEST_BASE (TEST_FEATURE_LIVE_AV_SOURCE,
2095       upstream_query_source, setup_sink_upstream_query,
2096       check_success_source_upstream_query, check_success_sink_upstream_query,
2097       &id, &md, &sd);
2098   close (id.ts_pipes[0]);
2099   close (id.ts_pipes[1]);
2100 }
2101
2102 GST_END_TEST;
2103
2104 GST_START_TEST (test_live_av_2_upstream_query)
2105 {
2106   upstream_query_input_data id = { GST_CLOCK_TIME_NONE, };
2107   upstream_query_master_data md = { 0 };
2108   upstream_query_slave_data sd = { 0 };
2109
2110   FAIL_UNLESS (pipe2 (id.ts_pipes, O_NONBLOCK) == 0);
2111   TEST_BASE (TEST_FEATURE_LIVE_AV_SOURCE | TEST_FEATURE_SPLIT_SINKS,
2112       upstream_query_source, setup_sink_upstream_query,
2113       check_success_source_upstream_query, check_success_sink_upstream_query,
2114       &id, &md, &sd);
2115   close (id.ts_pipes[0]);
2116   close (id.ts_pipes[1]);
2117 }
2118
2119 GST_END_TEST;
2120
2121 /**** message test ****/
2122
2123 typedef struct
2124 {
2125   gboolean got_state_changed_to_playing;
2126   guint8 num_got_message;
2127   guint8 num_sent_message;
2128 } message_master_data;
2129
2130 static void
2131 send_ipcpipeline_test_message_event (const GValue * v, gpointer user_data)
2132 {
2133   test_data *td = user_data;
2134   message_master_data *d = td->md;
2135   GstElement *element = g_value_get_object (v);
2136   GstMessage *msg;
2137   gboolean ret;
2138
2139   d->num_sent_message++;
2140
2141   msg = gst_message_new_element (GST_OBJECT (element),
2142       gst_structure_new_empty ("ipcpipeline-test"));
2143   ret = gst_element_send_event (element,
2144       gst_event_new_sink_message ("ipcpipeline-test", msg));
2145   FAIL_UNLESS (ret);
2146   gst_message_unref (msg);
2147 }
2148
2149 static gboolean
2150 send_sink_message (gpointer user_data)
2151 {
2152   test_data *td = user_data;
2153   GstIterator *it;
2154
2155   it = gst_bin_iterate_sources (GST_BIN (td->p));
2156   while (gst_iterator_foreach (it, send_ipcpipeline_test_message_event, td))
2157     gst_iterator_resync (it);
2158   gst_iterator_free (it);
2159
2160   gst_object_unref (td->p);
2161   return G_SOURCE_REMOVE;
2162 }
2163
2164 static gboolean
2165 message_bus_msg (GstBus * bus, GstMessage * message, gpointer user_data)
2166 {
2167   test_data *td = user_data;
2168   message_master_data *d = td->md;
2169   const GstStructure *structure;
2170
2171   if (GST_MESSAGE_TYPE (message) == GST_MESSAGE_ELEMENT) {
2172     structure = gst_message_get_structure (message);
2173     FAIL_UNLESS (structure);
2174     if (gst_structure_has_name (structure, "ipcpipeline-test")) {
2175       d->num_got_message++;
2176       if (d->num_got_message == d->num_sent_message)
2177         g_main_loop_quit (loop);
2178     }
2179   }
2180   return master_bus_msg (bus, message, user_data);
2181 }
2182
2183 static void
2184 message_on_state_changed (gpointer user_data)
2185 {
2186   test_data *td = user_data;
2187   message_master_data *d = td->md;
2188
2189   if (!d->got_state_changed_to_playing) {
2190     d->got_state_changed_to_playing = TRUE;
2191     gst_object_ref (td->p);
2192     g_timeout_add (MESSAGE_AT, (GSourceFunc) send_sink_message, td);
2193   }
2194 }
2195
2196 static void
2197 message_source (GstElement * source, gpointer user_data)
2198 {
2199   test_data *td = user_data;
2200   GstStateChangeReturn ret;
2201
2202   /* we're on the source, there's already the basic master_bus_msg watch,
2203      and gst doesn't want more than one watch, so we remove the watch and
2204      call it directly when done in the new watch */
2205   gst_bus_remove_watch (GST_ELEMENT_BUS (source));
2206   gst_bus_add_watch (GST_ELEMENT_BUS (source), message_bus_msg, user_data);
2207   td->state_target = GST_STATE_PLAYING;
2208   td->state_changed_cb = message_on_state_changed;
2209   ret = gst_element_set_state (source, GST_STATE_PLAYING);
2210   FAIL_UNLESS (ret == GST_STATE_CHANGE_ASYNC);
2211 }
2212
2213 static void
2214 check_success_source_message (gpointer user_data)
2215 {
2216   test_data *td = user_data;
2217   message_master_data *d = td->md;
2218
2219   FAIL_UNLESS (d->got_state_changed_to_playing);
2220   FAIL_UNLESS_EQUALS_INT (d->num_got_message, d->num_sent_message);
2221 }
2222
2223 GST_START_TEST (test_empty_message)
2224 {
2225   message_master_data md = { 0 };
2226   TEST_BASE (TEST_FEATURE_TEST_SOURCE, message_source, NULL,
2227       check_success_source_message, NULL, NULL, &md, NULL);
2228 }
2229
2230 GST_END_TEST;
2231
2232 GST_START_TEST (test_wavparse_message)
2233 {
2234   message_master_data md = { 0 };
2235   TEST_BASE (TEST_FEATURE_WAV_SOURCE, message_source, NULL,
2236       check_success_source_message, NULL, NULL, &md, NULL);
2237 }
2238
2239 GST_END_TEST;
2240
2241 GST_START_TEST (test_live_a_message)
2242 {
2243   message_master_data md = { 0 };
2244   TEST_BASE (TEST_FEATURE_LIVE_A_SOURCE, message_source, NULL,
2245       check_success_source_message, NULL, NULL, &md, NULL);
2246 }
2247
2248 GST_END_TEST;
2249
2250 GST_START_TEST (test_live_av_message)
2251 {
2252   message_master_data md = { 0 };
2253   TEST_BASE (TEST_FEATURE_LIVE_AV_SOURCE, message_source, NULL,
2254       check_success_source_message, NULL, NULL, &md, NULL);
2255 }
2256
2257 GST_END_TEST;
2258
2259 GST_START_TEST (test_live_av_2_message)
2260 {
2261   message_master_data md = { 0 };
2262   TEST_BASE (TEST_FEATURE_LIVE_AV_SOURCE | TEST_FEATURE_SPLIT_SINKS,
2263       message_source, NULL, check_success_source_message, NULL, NULL, &md,
2264       NULL);
2265 }
2266
2267 GST_END_TEST;
2268
2269 /**** end of stream test ****/
2270
2271 typedef struct
2272 {
2273   gboolean got_state_changed_to_playing;
2274 } end_of_stream_master_data;
2275
2276 typedef struct
2277 {
2278   gboolean got_buffer[2];
2279   gboolean got_eos[2];
2280 } end_of_stream_slave_data;
2281
2282 static void
2283 end_of_stream_on_state_changed (gpointer user_data)
2284 {
2285   test_data *td = user_data;
2286   end_of_stream_master_data *d = td->md;
2287
2288   if (!d->got_state_changed_to_playing)
2289     d->got_state_changed_to_playing = TRUE;
2290 }
2291
2292 static void
2293 end_of_stream_source (GstElement * source, gpointer user_data)
2294 {
2295   test_data *td = user_data;
2296   GstStateChangeReturn ret;
2297
2298   td->state_changed_cb = end_of_stream_on_state_changed;
2299   td->state_target = GST_STATE_PLAYING;
2300   ret = gst_element_set_state (source, GST_STATE_PLAYING);
2301   FAIL_UNLESS (ret == GST_STATE_CHANGE_ASYNC);
2302 }
2303
2304 static GstPadProbeReturn
2305 end_of_stream_probe (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
2306 {
2307   test_data *td = user_data;
2308   end_of_stream_slave_data *d = td->sd;
2309
2310   if (GST_IS_BUFFER (info->data)) {
2311     d->got_buffer[pad2idx (pad, td->two_streams)] = TRUE;
2312   } else if (GST_IS_EVENT (info->data)) {
2313     if (GST_EVENT_TYPE (info->data) == GST_EVENT_EOS) {
2314       d->got_eos[pad2idx (pad, td->two_streams)] = TRUE;
2315     }
2316   }
2317
2318   return GST_PAD_PROBE_OK;
2319 }
2320
2321 static void
2322 hook_end_of_stream_probe (const GValue * v, gpointer user_data)
2323 {
2324   hook_probe (v, end_of_stream_probe, user_data);
2325 }
2326
2327 static void
2328 setup_sink_end_of_stream (GstElement * sink, gpointer user_data)
2329 {
2330   GstIterator *it;
2331
2332   it = gst_bin_iterate_sinks (GST_BIN (sink));
2333   while (gst_iterator_foreach (it, hook_end_of_stream_probe, user_data))
2334     gst_iterator_resync (it);
2335   gst_iterator_free (it);
2336 }
2337
2338 static void
2339 check_success_source_end_of_stream (gpointer user_data)
2340 {
2341   test_data *td = user_data;
2342   end_of_stream_master_data *d = td->md;
2343
2344   FAIL_UNLESS (d->got_state_changed_to_playing);
2345 }
2346
2347 static void
2348 check_success_sink_end_of_stream (gpointer user_data)
2349 {
2350   test_data *td = user_data;
2351   end_of_stream_slave_data *d = td->sd;
2352   int idx;
2353
2354   for (idx = 0; idx < (td->two_streams ? 2 : 1); idx++) {
2355     FAIL_UNLESS (d->got_buffer[idx]);
2356     FAIL_UNLESS (d->got_eos[idx]);
2357   }
2358 }
2359
2360 GST_START_TEST (test_empty_end_of_stream)
2361 {
2362   end_of_stream_master_data md = { 0 };
2363   end_of_stream_slave_data sd = { 0 };
2364
2365   TEST_BASE (TEST_FEATURE_TEST_SOURCE | TEST_FEATURE_ASYNC_SINK,
2366       end_of_stream_source, setup_sink_end_of_stream,
2367       check_success_source_end_of_stream, check_success_sink_end_of_stream,
2368       NULL, &md, &sd);
2369 }
2370
2371 GST_END_TEST;
2372
2373 GST_START_TEST (test_wavparse_end_of_stream)
2374 {
2375   end_of_stream_master_data md = { 0 };
2376   end_of_stream_slave_data sd = { 0 };
2377
2378   TEST_BASE (TEST_FEATURE_WAV_SOURCE | TEST_FEATURE_ASYNC_SINK,
2379       end_of_stream_source, setup_sink_end_of_stream,
2380       check_success_source_end_of_stream, check_success_sink_end_of_stream,
2381       NULL, &md, &sd);
2382 }
2383
2384 GST_END_TEST;
2385
2386 GST_START_TEST (test_mpegts_end_of_stream)
2387 {
2388   end_of_stream_master_data md = { 0 };
2389   end_of_stream_slave_data sd = { 0 };
2390
2391   TEST_BASE (TEST_FEATURE_MPEGTS_SOURCE | TEST_FEATURE_ASYNC_SINK,
2392       end_of_stream_source, setup_sink_end_of_stream,
2393       check_success_source_end_of_stream, check_success_sink_end_of_stream,
2394       NULL, &md, &sd);
2395 }
2396
2397 GST_END_TEST;
2398
2399 GST_START_TEST (test_mpegts_2_end_of_stream)
2400 {
2401   end_of_stream_master_data md = { 0 };
2402   end_of_stream_slave_data sd = { 0 };
2403
2404   TEST_BASE (TEST_FEATURE_MPEGTS_SOURCE | TEST_FEATURE_SPLIT_SINKS |
2405       TEST_FEATURE_ASYNC_SINK,
2406       end_of_stream_source, setup_sink_end_of_stream,
2407       check_success_source_end_of_stream, check_success_sink_end_of_stream,
2408       NULL, &md, &sd);
2409 }
2410
2411 GST_END_TEST;
2412
2413 GST_START_TEST (test_live_a_end_of_stream)
2414 {
2415   end_of_stream_master_data md = { 0 };
2416   end_of_stream_slave_data sd = { 0 };
2417
2418   TEST_BASE (TEST_FEATURE_LIVE_A_SOURCE,
2419       end_of_stream_source, setup_sink_end_of_stream,
2420       check_success_source_end_of_stream, check_success_sink_end_of_stream,
2421       NULL, &md, &sd);
2422 }
2423
2424 GST_END_TEST;
2425
2426 GST_START_TEST (test_live_av_end_of_stream)
2427 {
2428   end_of_stream_master_data md = { 0 };
2429   end_of_stream_slave_data sd = { 0 };
2430
2431   TEST_BASE (TEST_FEATURE_LIVE_AV_SOURCE,
2432       end_of_stream_source, setup_sink_end_of_stream,
2433       check_success_source_end_of_stream, check_success_sink_end_of_stream,
2434       NULL, &md, &sd);
2435 }
2436
2437 GST_END_TEST;
2438
2439 GST_START_TEST (test_live_av_2_end_of_stream)
2440 {
2441   end_of_stream_master_data md = { 0 };
2442   end_of_stream_slave_data sd = { 0 };
2443
2444   TEST_BASE (TEST_FEATURE_LIVE_AV_SOURCE | TEST_FEATURE_SPLIT_SINKS,
2445       end_of_stream_source, setup_sink_end_of_stream,
2446       check_success_source_end_of_stream, check_success_sink_end_of_stream,
2447       NULL, &md, &sd);
2448 }
2449
2450 GST_END_TEST;
2451
2452 /**** reverse playback test ****/
2453
2454 typedef struct
2455 {
2456   gboolean got_state_changed_to_playing;
2457   gboolean seek_sent;
2458 } reverse_playback_master_data;
2459
2460 typedef struct
2461 {
2462   gboolean got_segment_with_negative_rate;
2463   gboolean got_buffer_after_segment_with_negative_rate;
2464   GstClockTime first_backward_buffer_timestamp;
2465   gboolean got_buffer_one_second_early;
2466 } reverse_playback_slave_data;
2467
2468 static gboolean
2469 play_backwards (gpointer user_data)
2470 {
2471   test_data *td = user_data;
2472   reverse_playback_master_data *d = td->md;
2473   gint64 pos;
2474   gboolean ret;
2475
2476   FAIL_UNLESS (gst_element_query_position (td->p, GST_FORMAT_TIME, &pos));
2477
2478   ret =
2479       gst_element_seek (td->p, -0.5, GST_FORMAT_TIME, 0, GST_SEEK_TYPE_SET, 0,
2480       GST_SEEK_TYPE_SET, pos);
2481   FAIL_UNLESS (ret);
2482   d->seek_sent = TRUE;
2483
2484   gst_object_unref (td->p);
2485   return G_SOURCE_REMOVE;
2486 }
2487
2488 static void
2489 reverse_playback_on_state_changed (gpointer user_data)
2490 {
2491   test_data *td = user_data;
2492   reverse_playback_master_data *d = td->md;
2493
2494   if (!d->got_state_changed_to_playing) {
2495     d->got_state_changed_to_playing = TRUE;
2496     gst_object_ref (td->p);
2497     g_timeout_add (2000, (GSourceFunc) play_backwards, td);
2498   }
2499 }
2500
2501 static void
2502 reverse_playback_source (GstElement * source, gpointer user_data)
2503 {
2504   test_data *td = user_data;
2505   GstStateChangeReturn ret;
2506
2507   td->state_target = GST_STATE_PLAYING;
2508   td->state_changed_cb = reverse_playback_on_state_changed;
2509   ret = gst_element_set_state (source, GST_STATE_PLAYING);
2510   FAIL_UNLESS (ret == GST_STATE_CHANGE_ASYNC);
2511 }
2512
2513 static GstPadProbeReturn
2514 reverse_playback_probe (GstPad * pad, GstPadProbeInfo * info,
2515     gpointer user_data)
2516 {
2517   test_data *td = user_data;
2518   reverse_playback_slave_data *d = td->sd;
2519
2520   if (GST_IS_EVENT (info->data)) {
2521     if (GST_EVENT_TYPE (info->data) == GST_EVENT_SEGMENT) {
2522       const GstSegment *s;
2523       gst_event_parse_segment (GST_EVENT (info->data), &s);
2524       if (s->rate < 0)
2525         d->got_segment_with_negative_rate = TRUE;
2526     }
2527   } else if (GST_IS_BUFFER (info->data)) {
2528     GstClockTime ts = GST_BUFFER_TIMESTAMP (info->data);
2529     if (GST_CLOCK_TIME_IS_VALID (ts)) {
2530       if (d->got_segment_with_negative_rate) {
2531         if (d->got_buffer_after_segment_with_negative_rate) {
2532           /* We test for 1 second, not just earlier, to make sure we don't
2533              just see B frames, or whatever else */
2534           if (ts < d->first_backward_buffer_timestamp - GST_SECOND) {
2535             d->got_buffer_one_second_early = TRUE;
2536           }
2537         } else {
2538           d->got_buffer_after_segment_with_negative_rate = TRUE;
2539           d->first_backward_buffer_timestamp = ts;
2540         }
2541       }
2542     }
2543   }
2544
2545   return GST_PAD_PROBE_OK;
2546 }
2547
2548 static void
2549 hook_reverse_playback_probe (const GValue * v, gpointer user_data)
2550 {
2551   hook_probe (v, reverse_playback_probe, user_data);
2552 }
2553
2554 static void
2555 setup_sink_reverse_playback (GstElement * sink, gpointer user_data)
2556 {
2557   GstIterator *it;
2558
2559   it = gst_bin_iterate_sinks (GST_BIN (sink));
2560   while (gst_iterator_foreach (it, hook_reverse_playback_probe, user_data))
2561     gst_iterator_resync (it);
2562   gst_iterator_free (it);
2563 }
2564
2565 static void
2566 check_success_source_reverse_playback (gpointer user_data)
2567 {
2568   test_data *td = user_data;
2569   reverse_playback_master_data *d = td->md;
2570
2571   FAIL_UNLESS (d->got_state_changed_to_playing);
2572   FAIL_UNLESS (d->seek_sent);
2573 }
2574
2575 static void
2576 check_success_sink_reverse_playback (gpointer user_data)
2577 {
2578   test_data *td = user_data;
2579   reverse_playback_slave_data *d = td->sd;
2580
2581   FAIL_UNLESS (d->got_segment_with_negative_rate);
2582   FAIL_UNLESS (d->got_buffer_after_segment_with_negative_rate);
2583   FAIL_UNLESS (GST_CLOCK_TIME_IS_VALID (d->first_backward_buffer_timestamp));
2584   FAIL_UNLESS (d->first_backward_buffer_timestamp >= GST_SECOND);
2585   FAIL_UNLESS (d->got_buffer_one_second_early);
2586 }
2587
2588 GST_START_TEST (test_a_reverse_playback)
2589 {
2590   reverse_playback_master_data md = { 0 };
2591   reverse_playback_slave_data sd = { 0 };
2592
2593   TEST_BASE (TEST_FEATURE_TEST_SOURCE,
2594       reverse_playback_source, setup_sink_reverse_playback,
2595       check_success_source_reverse_playback,
2596       check_success_sink_reverse_playback, NULL, &md, &sd);
2597 }
2598
2599 GST_END_TEST;
2600
2601 GST_START_TEST (test_av_reverse_playback)
2602 {
2603   reverse_playback_master_data md = { 0 };
2604   reverse_playback_slave_data sd = { 0 };
2605
2606   TEST_BASE (TEST_FEATURE_TEST_SOURCE | TEST_FEATURE_HAS_VIDEO,
2607       reverse_playback_source, setup_sink_reverse_playback,
2608       check_success_source_reverse_playback,
2609       check_success_sink_reverse_playback, NULL, &md, &sd);
2610 }
2611
2612 GST_END_TEST;
2613
2614 GST_START_TEST (test_av_2_reverse_playback)
2615 {
2616   reverse_playback_master_data md = { 0 };
2617   reverse_playback_slave_data sd = { 0 };
2618
2619   TEST_BASE (TEST_FEATURE_TEST_SOURCE | TEST_FEATURE_HAS_VIDEO |
2620       TEST_FEATURE_SPLIT_SINKS,
2621       reverse_playback_source, setup_sink_reverse_playback,
2622       check_success_source_reverse_playback,
2623       check_success_sink_reverse_playback, NULL, &md, &sd);
2624 }
2625
2626 GST_END_TEST;
2627
2628 /**** tags test ****/
2629
2630 enum
2631 {
2632   TEST_TAG_EMPTY,
2633   TEST_TAG_TWO_TAGS,
2634   N_TEST_TAGS
2635 };
2636
2637 typedef struct
2638 {
2639   gboolean got_state_changed_to_playing;
2640   gboolean tags_sent[N_TEST_TAGS];
2641 } tags_master_data;
2642
2643 typedef struct
2644 {
2645   gboolean tags_received[N_TEST_TAGS];
2646 } tags_slave_data;
2647
2648 static void
2649 send_tags_on_element (const GValue * v, gpointer user_data)
2650 {
2651   test_data *td = user_data;
2652   tags_master_data *d = td->md;
2653   GstElement *sink;
2654   GstPad *pad;
2655   GstEvent *e;
2656
2657   sink = g_value_get_object (v);
2658   FAIL_UNLESS (sink);
2659   pad = gst_element_get_static_pad (sink, "sink");
2660   FAIL_UNLESS (pad);
2661
2662   e = gst_event_new_tag (gst_tag_list_new_empty ());
2663   FAIL_UNLESS (gst_pad_send_event (pad, e));
2664   d->tags_sent[TEST_TAG_EMPTY] = TRUE;
2665
2666   e = gst_event_new_tag (gst_tag_list_new (GST_TAG_TITLE, "title",
2667           GST_TAG_BITRATE, 56000, NULL));
2668   FAIL_UNLESS (gst_pad_send_event (pad, e));
2669   d->tags_sent[TEST_TAG_TWO_TAGS] = TRUE;
2670
2671   gst_object_unref (pad);
2672 }
2673
2674 static gboolean
2675 send_tags (gpointer user_data)
2676 {
2677   test_data *td = user_data;
2678   GstIterator *it;
2679
2680   it = gst_bin_iterate_sinks (GST_BIN (td->p));
2681   while (gst_iterator_foreach (it, send_tags_on_element, user_data))
2682     gst_iterator_resync (it);
2683   gst_iterator_free (it);
2684
2685   g_timeout_add (STEP_AT, (GSourceFunc) stop_pipeline, td->p);
2686   return G_SOURCE_REMOVE;
2687 }
2688
2689 static void
2690 tags_on_state_changed (gpointer user_data)
2691 {
2692   test_data *td = user_data;
2693   tags_master_data *d = td->md;
2694
2695   if (!d->got_state_changed_to_playing) {
2696     d->got_state_changed_to_playing = TRUE;
2697     gst_object_ref (td->p);
2698     g_timeout_add (STEP_AT, (GSourceFunc) send_tags, td);
2699   }
2700 }
2701
2702 static void
2703 tags_source (GstElement * source, gpointer user_data)
2704 {
2705   test_data *td = user_data;
2706   GstStateChangeReturn ret;
2707
2708   td->state_target = GST_STATE_PLAYING;
2709   td->state_changed_cb = tags_on_state_changed;
2710   ret = gst_element_set_state (source, GST_STATE_PLAYING);
2711   FAIL_UNLESS (ret == GST_STATE_CHANGE_ASYNC);
2712 }
2713
2714 static GstPadProbeReturn
2715 tags_probe (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
2716 {
2717   test_data *td = user_data;
2718   tags_slave_data *d = td->sd;
2719   guint funsigned;
2720   gchar *fstring = NULL;
2721
2722   if (GST_IS_EVENT (info->data)) {
2723     if (GST_EVENT_TYPE (info->data) == GST_EVENT_TAG) {
2724       GstTagList *taglist = NULL;
2725       gst_event_parse_tag (GST_EVENT (info->data), &taglist);
2726       FAIL_UNLESS (taglist);
2727       if (gst_tag_list_is_empty (taglist)) {
2728         d->tags_received[TEST_TAG_EMPTY] = TRUE;
2729       } else if (gst_tag_list_get_string (taglist, GST_TAG_TITLE, &fstring)
2730           && !strcmp (fstring, "title")
2731           && gst_tag_list_get_uint (taglist, GST_TAG_BITRATE, &funsigned)
2732           && funsigned == 56000) {
2733         d->tags_received[TEST_TAG_TWO_TAGS] = TRUE;
2734       }
2735     }
2736   }
2737   g_free (fstring);
2738
2739   return GST_PAD_PROBE_OK;
2740 }
2741
2742 static void
2743 hook_tags_probe (const GValue * v, gpointer user_data)
2744 {
2745   hook_probe (v, tags_probe, user_data);
2746 }
2747
2748 static void
2749 setup_sink_tags (GstElement * sink, gpointer user_data)
2750 {
2751   GstIterator *it;
2752
2753   it = gst_bin_iterate_sinks (GST_BIN (sink));
2754   while (gst_iterator_foreach (it, hook_tags_probe, user_data))
2755     gst_iterator_resync (it);
2756   gst_iterator_free (it);
2757 }
2758
2759 static void
2760 check_success_source_tags (gpointer user_data)
2761 {
2762   test_data *td = user_data;
2763   tags_master_data *d = td->md;
2764   gint n;
2765
2766   FAIL_UNLESS (d->got_state_changed_to_playing);
2767   for (n = 0; n < N_TEST_TAGS; ++n) {
2768     FAIL_UNLESS (d->tags_sent[n]);
2769   }
2770 }
2771
2772 static void
2773 check_success_sink_tags (gpointer user_data)
2774 {
2775   test_data *td = user_data;
2776   tags_slave_data *d = td->sd;
2777   gint n;
2778
2779   for (n = 0; n < N_TEST_TAGS; ++n) {
2780     FAIL_UNLESS (d->tags_received[n]);
2781   }
2782 }
2783
2784 GST_START_TEST (test_empty_tags)
2785 {
2786   tags_master_data md = { 0 };
2787   tags_slave_data sd = { 0 };
2788
2789   TEST_BASE (TEST_FEATURE_TEST_SOURCE, tags_source, setup_sink_tags,
2790       check_success_source_tags, check_success_sink_tags, NULL, &md, &sd);
2791 }
2792
2793 GST_END_TEST;
2794
2795 GST_START_TEST (test_wavparse_tags)
2796 {
2797   tags_master_data md = { 0 };
2798   tags_slave_data sd = { 0 };
2799
2800   TEST_BASE (TEST_FEATURE_WAV_SOURCE, tags_source, setup_sink_tags,
2801       check_success_source_tags, check_success_sink_tags, NULL, &md, &sd);
2802 }
2803
2804 GST_END_TEST;
2805
2806 GST_START_TEST (test_mpegts_tags)
2807 {
2808   tags_master_data md = { 0 };
2809   tags_slave_data sd = { 0 };
2810
2811   TEST_BASE (TEST_FEATURE_MPEGTS_SOURCE, tags_source, setup_sink_tags,
2812       check_success_source_tags, check_success_sink_tags, NULL, &md, &sd);
2813 }
2814
2815 GST_END_TEST;
2816
2817 GST_START_TEST (test_mpegts_2_tags)
2818 {
2819   tags_master_data md = { 0 };
2820   tags_slave_data sd = { 0 };
2821
2822   TEST_BASE (TEST_FEATURE_MPEGTS_SOURCE | TEST_FEATURE_SPLIT_SINKS, tags_source,
2823       setup_sink_tags, check_success_source_tags, check_success_sink_tags, NULL,
2824       &md, &sd);
2825 }
2826
2827 GST_END_TEST;
2828
2829 GST_START_TEST (test_live_a_tags)
2830 {
2831   tags_master_data md = { 0 };
2832   tags_slave_data sd = { 0 };
2833
2834   TEST_BASE (TEST_FEATURE_LIVE_A_SOURCE, tags_source, setup_sink_tags,
2835       check_success_source_tags, check_success_sink_tags, NULL, &md, &sd);
2836 }
2837
2838 GST_END_TEST;
2839
2840 GST_START_TEST (test_live_av_tags)
2841 {
2842   tags_master_data md = { 0 };
2843   tags_slave_data sd = { 0 };
2844
2845   TEST_BASE (TEST_FEATURE_LIVE_AV_SOURCE, tags_source, setup_sink_tags,
2846       check_success_source_tags, check_success_sink_tags, NULL, &md, &sd);
2847 }
2848
2849 GST_END_TEST;
2850
2851 GST_START_TEST (test_live_av_2_tags)
2852 {
2853   tags_master_data md = { 0 };
2854   tags_slave_data sd = { 0 };
2855
2856   TEST_BASE (TEST_FEATURE_LIVE_AV_SOURCE | TEST_FEATURE_SPLIT_SINKS,
2857       tags_source, setup_sink_tags, check_success_source_tags,
2858       check_success_sink_tags, NULL, &md, &sd);
2859 }
2860
2861 GST_END_TEST;
2862
2863 /**** nagivation test ****/
2864
2865 enum
2866 {
2867   TEST_NAV_MOUSE_MOVE,
2868   TEST_NAV_KEY_PRESS,
2869   N_NAVIGATION_EVENTS
2870 };
2871
2872 typedef struct
2873 {
2874   gboolean got_state_changed_to_playing;
2875   gboolean navigation_received[N_NAVIGATION_EVENTS];
2876 } navigation_master_data;
2877
2878 typedef struct
2879 {
2880   gboolean started;
2881   gboolean navigation_sent[N_NAVIGATION_EVENTS];
2882   gint step;
2883 } navigation_slave_data;
2884
2885 static GstPadProbeReturn
2886 navigation_probe_source (GstPad * pad, GstPadProbeInfo * info,
2887     gpointer user_data)
2888 {
2889   test_data *td = user_data;
2890   navigation_master_data *d = td->md;
2891   const GstStructure *s;
2892   const gchar *string, *key;
2893   double x, y;
2894
2895   if (GST_IS_EVENT (info->data)) {
2896     if (GST_EVENT_TYPE (info->data) == GST_EVENT_NAVIGATION) {
2897       s = gst_event_get_structure (info->data);
2898       FAIL_UNLESS (s);
2899
2900       /* mouse-move */
2901       string = gst_structure_get_string (s, "event");
2902       if (string && !strcmp (string, "mouse-move")) {
2903         if (gst_structure_get_double (s, "pointer_x", &x) && x == 4.7) {
2904           if (gst_structure_get_double (s, "pointer_y", &y) && y == 0.1) {
2905             d->navigation_received[TEST_NAV_MOUSE_MOVE] = TRUE;
2906           }
2907         }
2908       }
2909
2910       /* key-press */
2911       string = gst_structure_get_string (s, "event");
2912       if (string && !strcmp (string, "key-press")) {
2913         key = gst_structure_get_string (s, "key");
2914         if (key && !strcmp (key, "Left")) {
2915           d->navigation_received[TEST_NAV_KEY_PRESS] = TRUE;
2916         }
2917       }
2918
2919       /* drop at this point to imply successful handling; the upstream filesrc
2920        * does not know how to handle navigation events and returns FALSE,
2921        * which makes the test fail */
2922       return GST_PAD_PROBE_DROP;
2923     }
2924   }
2925   return GST_PAD_PROBE_OK;
2926 }
2927
2928 static void
2929 hook_navigation_probe_source (const GValue * v, gpointer user_data)
2930 {
2931   hook_probe_types (v, navigation_probe_source,
2932       GST_PAD_PROBE_TYPE_EVENT_UPSTREAM, user_data);
2933 }
2934
2935 static void
2936 navigation_on_state_changed (gpointer user_data)
2937 {
2938   test_data *td = user_data;
2939   navigation_master_data *d = td->md;
2940
2941   if (!d->got_state_changed_to_playing)
2942     d->got_state_changed_to_playing = TRUE;
2943 }
2944
2945 static void
2946 navigation_source (GstElement * source, void *user_data)
2947 {
2948   test_data *td = user_data;
2949   GstStateChangeReturn ret;
2950   GstIterator *it;
2951
2952   it = gst_bin_iterate_sinks (GST_BIN (source));
2953   while (gst_iterator_foreach (it, hook_navigation_probe_source, user_data))
2954     gst_iterator_resync (it);
2955   gst_iterator_free (it);
2956
2957   td->state_target = GST_STATE_PLAYING;
2958   td->state_changed_cb = navigation_on_state_changed;
2959   ret = gst_element_set_state (source, GST_STATE_PLAYING);
2960   FAIL_UNLESS (ret == GST_STATE_CHANGE_ASYNC);
2961 }
2962
2963 static void
2964 send_navigation_event (const GValue * v, gpointer user_data)
2965 {
2966   test_data *td = user_data;
2967   navigation_slave_data *d = td->sd;
2968   GstElement *sink;
2969   GstPad *pad, *peer;
2970   GstStructure *s;
2971   GstEvent *e = NULL;
2972
2973   sink = g_value_get_object (v);
2974   FAIL_UNLESS (sink);
2975   pad = gst_element_get_static_pad (sink, "sink");
2976   FAIL_UNLESS (pad);
2977   peer = gst_pad_get_peer (pad);
2978   FAIL_UNLESS (peer);
2979   gst_object_unref (pad);
2980
2981   switch (d->step) {
2982     case TEST_NAV_MOUSE_MOVE:
2983       s = gst_structure_new ("application/x-gst-navigation", "event",
2984           G_TYPE_STRING, "mouse-move", "button", G_TYPE_INT, 0, "pointer_x",
2985           G_TYPE_DOUBLE, 4.7, "pointer_y", G_TYPE_DOUBLE, 0.1, NULL);
2986       e = gst_event_new_navigation (s);
2987       break;
2988     case TEST_NAV_KEY_PRESS:
2989       s = gst_structure_new ("application/x-gst-navigation", "event",
2990           G_TYPE_STRING, "key-press", "key", G_TYPE_STRING, "Left", NULL);
2991       e = gst_event_new_navigation (s);
2992       break;
2993   }
2994
2995   FAIL_UNLESS (e);
2996   FAIL_UNLESS (gst_pad_send_event (peer, e));
2997   d->navigation_sent[d->step] = TRUE;
2998
2999   gst_object_unref (peer);
3000 }
3001
3002 static gboolean
3003 step_navigation (gpointer user_data)
3004 {
3005   test_data *td = user_data;
3006   navigation_slave_data *d = td->sd;
3007   GstIterator *it;
3008
3009   it = gst_bin_iterate_sinks (GST_BIN (td->p));
3010   while (gst_iterator_foreach (it, send_navigation_event, user_data))
3011     gst_iterator_resync (it);
3012   gst_iterator_free (it);
3013
3014   if (++d->step < N_NAVIGATION_EVENTS)
3015     return G_SOURCE_CONTINUE;
3016
3017   /* we are in the slave; send EOS to force the master to stop the pipeline */
3018   gst_element_post_message (GST_ELEMENT (td->p),
3019       gst_message_new_eos (GST_OBJECT (td->p)));
3020
3021   gst_object_unref (td->p);
3022   return G_SOURCE_REMOVE;
3023 }
3024
3025 static GstPadProbeReturn
3026 navigation_probe (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
3027 {
3028   test_data *td = user_data;
3029   navigation_slave_data *d = td->sd;
3030   GstClockTime ts;
3031
3032   if (GST_IS_BUFFER (info->data)) {
3033     ts = GST_BUFFER_TIMESTAMP (info->data);
3034     if (GST_CLOCK_TIME_IS_VALID (ts) && ts > STEP_AT * GST_MSECOND) {
3035       if (!d->started) {
3036         d->started = TRUE;
3037         gst_object_ref (td->p);
3038         g_timeout_add (50, step_navigation, td);
3039       }
3040     }
3041   }
3042
3043   return GST_PAD_PROBE_OK;
3044 }
3045
3046 static void
3047 hook_navigation_probe (const GValue * v, gpointer user_data)
3048 {
3049   hook_probe (v, navigation_probe, user_data);
3050 }
3051
3052 static void
3053 setup_sink_navigation (GstElement * sink, gpointer user_data)
3054 {
3055   GstIterator *it;
3056
3057   it = gst_bin_iterate_sinks (GST_BIN (sink));
3058   while (gst_iterator_foreach (it, hook_navigation_probe, user_data))
3059     gst_iterator_resync (it);
3060   gst_iterator_free (it);
3061 }
3062
3063 static void
3064 check_success_source_navigation (gpointer user_data)
3065 {
3066   test_data *td = user_data;
3067   navigation_master_data *d = td->md;
3068   gint n;
3069
3070   FAIL_UNLESS (d->got_state_changed_to_playing);
3071   for (n = 0; n < N_NAVIGATION_EVENTS; ++n) {
3072     FAIL_UNLESS (d->navigation_received[n]);
3073   }
3074 }
3075
3076 static void
3077 check_success_sink_navigation (gpointer user_data)
3078 {
3079   test_data *td = user_data;
3080   navigation_slave_data *d = td->sd;
3081   gint n;
3082
3083   FAIL_UNLESS (d->started);
3084   for (n = 0; n < N_NAVIGATION_EVENTS; ++n) {
3085     FAIL_UNLESS (d->navigation_sent[n]);
3086   }
3087 }
3088
3089 GST_START_TEST (test_non_live_av_navigation)
3090 {
3091   navigation_master_data md = { 0 };
3092   navigation_slave_data sd = { 0 };
3093
3094   TEST_BASE (TEST_FEATURE_MPEGTS_SOURCE, navigation_source,
3095       setup_sink_navigation, check_success_source_navigation,
3096       check_success_sink_navigation, NULL, &md, &sd);
3097 }
3098
3099 GST_END_TEST;
3100
3101 GST_START_TEST (test_non_live_av_2_navigation)
3102 {
3103   navigation_master_data md = { 0 };
3104   navigation_slave_data sd = { 0 };
3105
3106   TEST_BASE (TEST_FEATURE_MPEGTS_SOURCE | TEST_FEATURE_SPLIT_SINKS,
3107       navigation_source, setup_sink_navigation, check_success_source_navigation,
3108       check_success_sink_navigation, NULL, &md, &sd);
3109 }
3110
3111 GST_END_TEST;
3112
3113 GST_START_TEST (test_live_av_navigation)
3114 {
3115   navigation_master_data md = { 0 };
3116   navigation_slave_data sd = { 0 };
3117
3118   TEST_BASE (TEST_FEATURE_LIVE_AV_SOURCE, navigation_source,
3119       setup_sink_navigation, check_success_source_navigation,
3120       check_success_sink_navigation, NULL, &md, &sd);
3121 }
3122
3123 GST_END_TEST;
3124
3125 GST_START_TEST (test_live_av_2_navigation)
3126 {
3127   navigation_master_data md = { 0 };
3128   navigation_slave_data sd = { 0 };
3129
3130   TEST_BASE (TEST_FEATURE_LIVE_AV_SOURCE | TEST_FEATURE_SPLIT_SINKS,
3131       navigation_source, setup_sink_navigation, check_success_source_navigation,
3132       check_success_sink_navigation, NULL, &md, &sd);
3133 }
3134
3135 GST_END_TEST;
3136
3137 /**** reconfigure test ****/
3138
3139 typedef struct
3140 {
3141   gboolean got_state_changed_to_playing;
3142   gboolean reconfigure_sent[2];
3143 } reconfigure_master_data;
3144
3145 typedef struct
3146 {
3147   gboolean reconfigure_scheduled;
3148   gboolean reconfigure_sent[2];
3149   gboolean got_caps[2][2];
3150 } reconfigure_slave_data;
3151
3152 static GstPadProbeReturn
3153 reconfigure_source_probe (GstPad * pad, GstPadProbeInfo * info,
3154     gpointer user_data)
3155 {
3156   test_data *td = user_data;
3157   reconfigure_master_data *d = td->md;
3158
3159   if (GST_EVENT_TYPE (info->data) == GST_EVENT_RECONFIGURE) {
3160     gint idx = pad2idx (pad, td->two_streams);
3161     d->reconfigure_sent[idx] = TRUE;
3162     if (!td->two_streams || d->reconfigure_sent[idx ? 0 : 1]) {
3163       g_timeout_add (STEP_AT, (GSourceFunc) stop_pipeline,
3164           gst_object_ref (td->p));
3165     }
3166   }
3167
3168   return GST_PAD_PROBE_OK;
3169 }
3170
3171 static void
3172 hook_reconfigure_source_probe (const GValue * v, gpointer user_data)
3173 {
3174   hook_probe_types (v, reconfigure_source_probe,
3175       GST_PAD_PROBE_TYPE_EVENT_UPSTREAM, user_data);
3176 }
3177
3178 static void
3179 reconfigure_on_state_changed (gpointer user_data)
3180 {
3181   test_data *td = user_data;
3182   reconfigure_master_data *d = td->md;
3183
3184   if (!d->got_state_changed_to_playing)
3185     d->got_state_changed_to_playing = TRUE;
3186 }
3187
3188 static void
3189 reconfigure_source (GstElement * source, gpointer user_data)
3190 {
3191   test_data *td = user_data;
3192   GstStateChangeReturn ret;
3193   GstIterator *it;
3194
3195   it = gst_bin_iterate_sinks (GST_BIN (source));
3196   while (gst_iterator_foreach (it, hook_reconfigure_source_probe, user_data))
3197     gst_iterator_resync (it);
3198   gst_iterator_free (it);
3199
3200   td->state_target = GST_STATE_PLAYING;
3201   td->state_changed_cb = reconfigure_on_state_changed;
3202   ret = gst_element_set_state (source, GST_STATE_PLAYING);
3203   FAIL_UNLESS (ret == GST_STATE_CHANGE_ASYNC);
3204 }
3205
3206 static void
3207 send_reconfigure_on_element (const GValue * v, gpointer user_data)
3208 {
3209   test_data *td = user_data;
3210   reconfigure_slave_data *d = td->sd;
3211   GstElement *sink, *capsfilter;
3212   GstPad *pad, *peer;
3213   GstCaps *caps = NULL;
3214
3215   sink = g_value_get_object (v);
3216   FAIL_UNLESS (sink);
3217   pad = gst_element_get_static_pad (sink, "sink");
3218   FAIL_UNLESS (pad);
3219
3220   // look for the previous element, change caps if a capsfilter
3221   peer = gst_pad_get_peer (pad);
3222   FAIL_UNLESS (peer);
3223   capsfilter = GST_ELEMENT (gst_pad_get_parent (peer));
3224   g_object_get (capsfilter, "caps", &caps, NULL);
3225   FAIL_UNLESS (caps);
3226   caps = gst_caps_make_writable (caps);
3227   if (!strcmp (gst_structure_get_name (gst_caps_get_structure (caps, 0)),
3228           "audio/x-raw")) {
3229     gst_caps_set_simple (caps, "rate", G_TYPE_INT, 48000, NULL);
3230   } else {
3231     gst_caps_set_simple (caps, "width", G_TYPE_INT, 320, "height", G_TYPE_INT,
3232         200, NULL);
3233   }
3234   g_object_set (capsfilter, "caps", caps, NULL);
3235   FAIL_UNLESS (capsfilter);
3236
3237   gst_object_unref (capsfilter);
3238   gst_object_unref (peer);
3239
3240   d->reconfigure_sent[caps2idx (caps, td->two_streams)] = TRUE;
3241
3242   gst_caps_unref (caps);
3243   gst_object_unref (pad);
3244 }
3245
3246 static gboolean
3247 send_reconfigure (gpointer user_data)
3248 {
3249   test_data *td = user_data;
3250   GstIterator *it;
3251
3252   it = gst_bin_iterate_sinks (GST_BIN (td->p));
3253   while (gst_iterator_foreach (it, send_reconfigure_on_element, user_data))
3254     gst_iterator_resync (it);
3255   gst_iterator_free (it);
3256
3257   gst_object_unref (td->p);
3258   return G_SOURCE_REMOVE;
3259 }
3260
3261 static GstPadProbeReturn
3262 reconfigure_probe (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
3263 {
3264   test_data *td = user_data;
3265   reconfigure_slave_data *d = td->sd;
3266   GstClockTime ts;
3267   GstCaps *caps;
3268   int idx;
3269
3270   if (GST_IS_BUFFER (info->data)) {
3271     ts = GST_BUFFER_TIMESTAMP (info->data);
3272     if (GST_CLOCK_TIME_IS_VALID (ts) && ts >= STEP_AT * GST_MSECOND) {
3273       if (!d->reconfigure_scheduled) {
3274         d->reconfigure_scheduled = TRUE;
3275         gst_object_ref (td->p);
3276         g_idle_add ((GSourceFunc) send_reconfigure, td);
3277       }
3278     }
3279   } else if (GST_IS_EVENT (info->data)) {
3280     if (GST_EVENT_TYPE (info->data) == GST_EVENT_CAPS) {
3281       gst_event_parse_caps (GST_EVENT (info->data), &caps);
3282       idx = caps2idx (caps, td->two_streams);
3283       if (d->reconfigure_sent[idx]) {
3284         d->got_caps[idx][1] = TRUE;
3285       } else {
3286         d->got_caps[idx][0] = TRUE;
3287       }
3288     }
3289   }
3290
3291   return GST_PAD_PROBE_OK;
3292 }
3293
3294 static void
3295 hook_reconfigure_probe (const GValue * v, gpointer user_data)
3296 {
3297   hook_probe (v, reconfigure_probe, user_data);
3298 }
3299
3300 static void
3301 setup_sink_reconfigure (GstElement * sink, gpointer user_data)
3302 {
3303   GstIterator *it;
3304
3305   it = gst_bin_iterate_sinks (GST_BIN (sink));
3306   while (gst_iterator_foreach (it, hook_reconfigure_probe, user_data))
3307     gst_iterator_resync (it);
3308   gst_iterator_free (it);
3309 }
3310
3311 static void
3312 check_success_source_reconfigure (gpointer user_data)
3313 {
3314   test_data *td = user_data;
3315   reconfigure_master_data *d = td->md;
3316   gint idx;
3317
3318   FAIL_UNLESS (d->got_state_changed_to_playing);
3319   for (idx = 0; idx < (td->two_streams ? 2 : 1); idx++) {
3320     FAIL_UNLESS (d->reconfigure_sent[idx]);
3321   }
3322 }
3323
3324 static void
3325 check_success_sink_reconfigure (gpointer user_data)
3326 {
3327   test_data *td = user_data;
3328   reconfigure_slave_data *d = td->sd;
3329   gint idx;
3330
3331   FAIL_UNLESS (d->reconfigure_scheduled);
3332   for (idx = 0; idx < (td->two_streams ? 2 : 1); idx++) {
3333     FAIL_UNLESS (d->reconfigure_sent[idx]);
3334     FAIL_UNLESS (d->got_caps[idx][0]);
3335     FAIL_UNLESS (d->got_caps[idx][1]);
3336   }
3337 }
3338
3339 GST_START_TEST (test_non_live_a_reconfigure)
3340 {
3341   reconfigure_master_data md = { 0 };
3342   reconfigure_slave_data sd = { 0 };
3343
3344   TEST_BASE (TEST_FEATURE_TEST_SOURCE | TEST_FEATURE_FILTER_SINK_CAPS,
3345       reconfigure_source, setup_sink_reconfigure,
3346       check_success_source_reconfigure, check_success_sink_reconfigure, NULL,
3347       &md, &sd);
3348 }
3349
3350 GST_END_TEST;
3351
3352 GST_START_TEST (test_non_live_av_reconfigure)
3353 {
3354   reconfigure_master_data md = { 0 };
3355   reconfigure_slave_data sd = { 0 };
3356
3357   TEST_BASE (TEST_FEATURE_TEST_SOURCE | TEST_FEATURE_HAS_VIDEO |
3358       TEST_FEATURE_FILTER_SINK_CAPS,
3359       reconfigure_source, setup_sink_reconfigure,
3360       check_success_source_reconfigure, check_success_sink_reconfigure, NULL,
3361       &md, &sd);
3362 }
3363
3364 GST_END_TEST;
3365
3366 GST_START_TEST (test_live_a_reconfigure)
3367 {
3368   reconfigure_master_data md = { 0 };
3369   reconfigure_slave_data sd = { 0 };
3370
3371   TEST_BASE (TEST_FEATURE_LIVE_A_SOURCE | TEST_FEATURE_FILTER_SINK_CAPS,
3372       reconfigure_source, setup_sink_reconfigure,
3373       check_success_source_reconfigure, check_success_sink_reconfigure, NULL,
3374       &md, &sd);
3375 }
3376
3377 GST_END_TEST;
3378
3379 GST_START_TEST (test_live_av_reconfigure)
3380 {
3381   reconfigure_master_data md = { 0 };
3382   reconfigure_slave_data sd = { 0 };
3383
3384   TEST_BASE (TEST_FEATURE_LIVE_AV_SOURCE | TEST_FEATURE_FILTER_SINK_CAPS,
3385       reconfigure_source, setup_sink_reconfigure,
3386       check_success_source_reconfigure, check_success_sink_reconfigure, NULL,
3387       &md, &sd);
3388 }
3389
3390 GST_END_TEST;
3391
3392 /**** state changes test ****/
3393
3394 typedef struct
3395 {
3396   gint step;
3397   GHashTable *fdin, *fdout;
3398   gboolean waiting_state_change;
3399 } state_changes_master_data;
3400
3401 typedef struct
3402 {
3403   gint n_null;
3404   gint n_ready;
3405   gint n_paused;
3406   gint n_playing;
3407   gboolean got_eos;
3408   GThread *thread;
3409   gint refcount;
3410 } state_changes_slave_data;
3411
3412 static void
3413 set_fdin (gpointer key, gpointer value, gpointer user_data)
3414 {
3415   g_object_set (key, "fdin", GPOINTER_TO_INT (value), NULL);
3416 }
3417
3418 static void
3419 set_fdout (gpointer key, gpointer value, gpointer user_data)
3420 {
3421   g_object_set (key, "fdout", GPOINTER_TO_INT (value), NULL);
3422 }
3423
3424 /*
3425  * NULL
3426  * 0: READY NULL READY PAUSED READY PAUSED READY NULL
3427  * 8: READY PAUSED PLAYING PAUSED PLAYING PAUSED READY PAUSED READY NULL
3428  * 18: disconnect
3429  * 19: READY NULL READY PAUSED READY PAUSED READY NULL
3430  * 27: READY PAUSED PLAYING PAUSED PLAYING PAUSED READY PAUSED READY NULL
3431  * 37: reconnect
3432  * 38: READY NULL READY PAUSED READY PAUSED READY NULL
3433  * 46: READY PAUSED PLAYING PAUSED PLAYING
3434  * 51: EOS
3435  */
3436 static gboolean
3437 step_state_changes (gpointer user_data)
3438 {
3439   test_data *td = user_data;
3440   state_changes_master_data *d = td->md;
3441   gboolean ret = G_SOURCE_CONTINUE;
3442   GstStateChangeReturn scret = GST_STATE_CHANGE_FAILURE;
3443   GList *l;
3444   int fdin, fdout;
3445
3446   if (d->waiting_state_change)
3447     goto done;
3448
3449   switch (d->step++) {
3450     case 1:
3451     case 7:
3452     case 17:
3453     case 20:
3454     case 26:
3455     case 36:
3456     case 39:
3457     case 45:
3458       scret = gst_element_set_state (td->p, GST_STATE_NULL);
3459       FAIL_UNLESS_EQUALS_INT (scret, GST_STATE_CHANGE_SUCCESS);
3460       break;
3461     case 0:
3462     case 2:
3463     case 4:
3464     case 6:
3465     case 8:
3466     case 14:
3467     case 16:
3468     case 38:
3469     case 40:
3470     case 42:
3471     case 44:
3472     case 46:
3473       scret = gst_element_set_state (td->p, GST_STATE_READY);
3474       FAIL_UNLESS_EQUALS_INT (scret, GST_STATE_CHANGE_SUCCESS);
3475       break;
3476     case 19:
3477     case 21:
3478     case 23:
3479     case 25:
3480     case 27:
3481     case 33:
3482     case 35:
3483       /* while we are disconnected, we can't do NULL -> READY */
3484       scret = gst_element_set_state (td->p, GST_STATE_READY);
3485       FAIL_UNLESS_EQUALS_INT (scret, GST_STATE_CHANGE_FAILURE);
3486       break;
3487     case 3:
3488     case 5:
3489     case 9:
3490     case 11:
3491     case 13:
3492     case 15:
3493     case 41:
3494     case 43:
3495     case 47:
3496     case 49:
3497       td->state_target = GST_STATE_PAUSED;
3498       scret = gst_element_set_state (td->p, GST_STATE_PAUSED);
3499       FAIL_IF (scret == GST_STATE_CHANGE_FAILURE);
3500       break;
3501     case 22:
3502     case 24:
3503     case 28:
3504     case 30:
3505     case 32:
3506     case 34:
3507       /* while we are disconnected, we can't do NULL -> READY */
3508       scret = gst_element_set_state (td->p, GST_STATE_PAUSED);
3509       FAIL_UNLESS_EQUALS_INT (scret, GST_STATE_CHANGE_FAILURE);
3510       break;
3511     case 10:
3512     case 12:
3513     case 48:
3514     case 50:
3515       td->state_target = GST_STATE_PLAYING;
3516       scret = gst_element_set_state (td->p, GST_STATE_PLAYING);
3517       FAIL_IF (scret == GST_STATE_CHANGE_FAILURE);
3518       break;
3519     case 29:
3520     case 31:
3521       /* while we are disconnected, we can't do NULL -> READY */
3522       scret = gst_element_set_state (td->p, GST_STATE_PLAYING);
3523       FAIL_UNLESS_EQUALS_INT (scret, GST_STATE_CHANGE_FAILURE);
3524       break;
3525     case 18:
3526       d->fdin = g_hash_table_new (g_direct_hash, g_direct_equal);
3527       d->fdout = g_hash_table_new (g_direct_hash, g_direct_equal);
3528       for (l = weak_refs; l; l = l->next) {
3529         g_object_get (l->data, "fdin", &fdin, "fdout", &fdout, NULL);
3530         g_hash_table_insert (d->fdin, (gpointer) l->data,
3531             GINT_TO_POINTER (fdin));
3532         g_hash_table_insert (d->fdout, (gpointer) l->data,
3533             GINT_TO_POINTER (fdout));
3534         g_signal_emit_by_name (G_OBJECT (l->data), "disconnect", NULL);
3535       }
3536       break;
3537     case 37:
3538       g_hash_table_foreach (d->fdin, set_fdin, NULL);
3539       g_hash_table_foreach (d->fdout, set_fdout, NULL);
3540       g_hash_table_destroy (d->fdin);
3541       g_hash_table_destroy (d->fdout);
3542       break;
3543     case 51:
3544       /* send EOS early to avoid waiting for the actual end of the file */
3545       gst_element_send_event (td->p, gst_event_new_eos ());
3546       gst_object_unref (td->p);
3547       ret = G_SOURCE_REMOVE;
3548       break;
3549   }
3550
3551   if (scret == GST_STATE_CHANGE_ASYNC)
3552     d->waiting_state_change = TRUE;
3553
3554 done:
3555   return ret;
3556 }
3557
3558 static void
3559 state_changes_state_changed (gpointer user_data)
3560 {
3561   test_data *td = user_data;
3562   state_changes_master_data *d = td->md;
3563
3564   d->waiting_state_change = FALSE;
3565 }
3566
3567 static void
3568 state_changes_source (GstElement * source, gpointer user_data)
3569 {
3570   test_data *td = user_data;
3571   state_changes_master_data *d = td->md;
3572
3573   gst_object_ref (source);
3574   g_timeout_add (STEP_AT, (GSourceFunc) step_state_changes, td);
3575
3576   d->waiting_state_change = FALSE;
3577   td->state_changed_cb = state_changes_state_changed;
3578 }
3579
3580 static GstPadProbeReturn
3581 state_changes_probe (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
3582 {
3583   test_data *td = user_data;
3584   state_changes_slave_data *d = td->sd;
3585
3586   if (GST_IS_EVENT (info->data)) {
3587     if (GST_EVENT_TYPE (info->data) == GST_EVENT_EOS) {
3588       d->got_eos = TRUE;
3589       if (g_atomic_int_dec_and_test (&d->refcount)) {
3590         g_thread_join (d->thread);
3591         gst_object_unref (td->p);
3592       }
3593       return GST_PAD_PROBE_REMOVE;
3594     }
3595   }
3596
3597   return GST_PAD_PROBE_OK;
3598 }
3599
3600 static void
3601 hook_state_changes_probe (const GValue * v, gpointer user_data)
3602 {
3603   test_data *td = user_data;
3604   state_changes_slave_data *d = td->sd;
3605
3606   d->refcount++;
3607   hook_probe (v, state_changes_probe, user_data);
3608 }
3609
3610 static gpointer
3611 state_changes_watcher (gpointer user_data)
3612 {
3613   test_data *td = user_data;
3614   state_changes_slave_data *d = td->sd;
3615   GstState state = GST_STATE_VOID_PENDING, prev_state = GST_STATE_VOID_PENDING;
3616   GstStateChangeReturn ret;
3617
3618   while (!d->got_eos) {
3619     ret = gst_element_get_state (td->p, &state, NULL, GST_CLOCK_TIME_NONE);
3620     if (ret == GST_STATE_CHANGE_SUCCESS && state != GST_STATE_VOID_PENDING) {
3621       if (state != prev_state) {
3622         switch (state) {
3623           case GST_STATE_NULL:
3624             d->n_null++;
3625             break;
3626           case GST_STATE_READY:
3627             d->n_ready++;
3628             break;
3629           case GST_STATE_PAUSED:
3630             d->n_paused++;
3631             break;
3632           case GST_STATE_PLAYING:
3633             d->n_playing++;
3634             break;
3635           default:
3636             fail_if (1);
3637         }
3638         prev_state = state;
3639       }
3640     }
3641     g_usleep (STEP_AT * 1000 / 4);
3642   }
3643   return NULL;
3644 }
3645
3646 static void
3647 setup_sink_state_changes (GstElement * sink, gpointer user_data)
3648 {
3649   test_data *td = user_data;
3650   state_changes_slave_data *d = td->sd;
3651   GstIterator *it;
3652
3653   gst_object_ref (sink);
3654   d->refcount = 0;
3655   d->thread =
3656       g_thread_new ("state-changes-watcher",
3657       (GThreadFunc) state_changes_watcher, td);
3658   FAIL_UNLESS (d->thread);
3659
3660   it = gst_bin_iterate_sinks (GST_BIN (sink));
3661   while (gst_iterator_foreach (it, hook_state_changes_probe, td))
3662     gst_iterator_resync (it);
3663   gst_iterator_free (it);
3664 }
3665
3666 static void
3667 check_success_source_state_changes (gpointer user_data)
3668 {
3669   test_data *td = user_data;
3670   state_changes_master_data *d = td->md;
3671
3672   FAIL_UNLESS_EQUALS_INT (d->step, 52);
3673 }
3674
3675 static void
3676 check_success_sink_state_changes (gpointer user_data)
3677 {
3678   test_data *td = user_data;
3679   state_changes_slave_data *d = td->sd;
3680
3681   FAIL_UNLESS (d->got_eos);
3682   FAIL_UNLESS_EQUALS_INT (d->n_null, 6);
3683   FAIL_UNLESS_EQUALS_INT (d->n_ready, 12);
3684   FAIL_UNLESS_EQUALS_INT (d->n_paused, 10);
3685   FAIL_UNLESS_EQUALS_INT (d->n_playing, 4);
3686 }
3687
3688 GST_START_TEST (test_empty_state_changes)
3689 {
3690   state_changes_master_data md = { 0 };
3691   state_changes_slave_data sd = { 0 };
3692
3693   TEST_BASE (TEST_FEATURE_TEST_SOURCE, state_changes_source,
3694       setup_sink_state_changes, check_success_source_state_changes,
3695       check_success_sink_state_changes, NULL, &md, &sd);
3696 }
3697
3698 GST_END_TEST;
3699
3700 GST_START_TEST (test_wavparse_state_changes)
3701 {
3702   state_changes_master_data md = { 0 };
3703   state_changes_slave_data sd = { 0 };
3704
3705   TEST_BASE (TEST_FEATURE_WAV_SOURCE, state_changes_source,
3706       setup_sink_state_changes, check_success_source_state_changes,
3707       check_success_sink_state_changes, NULL, &md, &sd);
3708 }
3709
3710 GST_END_TEST;
3711
3712 GST_START_TEST (test_mpegts_state_changes)
3713 {
3714   state_changes_master_data md = { 0 };
3715   state_changes_slave_data sd = { 0 };
3716
3717   TEST_BASE (TEST_FEATURE_MPEGTS_SOURCE, state_changes_source,
3718       setup_sink_state_changes, check_success_source_state_changes,
3719       check_success_sink_state_changes, NULL, &md, &sd);
3720 }
3721
3722 GST_END_TEST;
3723
3724 GST_START_TEST (test_mpegts_2_state_changes)
3725 {
3726   state_changes_master_data md = { 0 };
3727   state_changes_slave_data sd = { 0 };
3728
3729   TEST_BASE (TEST_FEATURE_MPEGTS_SOURCE | TEST_FEATURE_SPLIT_SINKS,
3730       state_changes_source, setup_sink_state_changes,
3731       check_success_source_state_changes, check_success_sink_state_changes,
3732       NULL, &md, &sd);
3733 }
3734
3735 GST_END_TEST;
3736
3737 /**** state changes stress test ****/
3738
3739 typedef struct
3740 {
3741   gint n_state_changes;
3742 } state_changes_stress_input_data;
3743
3744 typedef struct
3745 {
3746   gboolean got_state_changed_to_playing;
3747   gboolean async_state_change_completed;
3748 } state_changes_stress_master_data;
3749
3750 static gboolean
3751 step_state_changes_stress (gpointer user_data)
3752 {
3753   test_data *td = user_data;
3754   state_changes_stress_input_data *i = td->id;
3755   state_changes_stress_master_data *d = td->md;
3756   static const GstState states[] =
3757       { GST_STATE_NULL, GST_STATE_READY, GST_STATE_PAUSED, GST_STATE_PLAYING };
3758   GstState state;
3759   GstStateChangeReturn ret;
3760
3761   /* wait for async state change to complete before continuing */
3762   if (!d->async_state_change_completed)
3763     return G_SOURCE_CONTINUE;
3764
3765   if (i->n_state_changes == 0) {
3766     ret = gst_element_set_state (td->p, GST_STATE_PLAYING);
3767     FAIL_IF (ret == GST_STATE_CHANGE_FAILURE);
3768     g_timeout_add (STEP_AT, (GSourceFunc) stop_pipeline, td->p);
3769     return G_SOURCE_REMOVE;
3770   }
3771   --i->n_state_changes;
3772
3773   state = states[rand () % 4];
3774   ret = gst_element_set_state (td->p, state);
3775   FAIL_IF (ret == GST_STATE_CHANGE_FAILURE);
3776
3777   if (ret == GST_STATE_CHANGE_ASYNC) {
3778     td->state_target = state;
3779     d->async_state_change_completed = FALSE;
3780   }
3781
3782   return G_SOURCE_CONTINUE;
3783 }
3784
3785 static void
3786 state_changes_stress_on_state_changed (gpointer user_data)
3787 {
3788   test_data *td = user_data;
3789   state_changes_stress_master_data *d = td->md;
3790
3791   if (!d->got_state_changed_to_playing) {
3792     d->got_state_changed_to_playing = TRUE;
3793     gst_object_ref (td->p);
3794     g_timeout_add (50, (GSourceFunc) step_state_changes_stress, td);
3795   }
3796   d->async_state_change_completed = TRUE;
3797 }
3798
3799 static void
3800 state_changes_stress_source (GstElement * source, gpointer user_data)
3801 {
3802   test_data *td = user_data;
3803   GstStateChangeReturn ret;
3804
3805   td->state_target = GST_STATE_PLAYING;
3806   td->state_changed_cb = state_changes_stress_on_state_changed;
3807   ret = gst_element_set_state (source, GST_STATE_PLAYING);
3808   FAIL_UNLESS (ret == GST_STATE_CHANGE_ASYNC);
3809 }
3810
3811 static void
3812 check_success_source_state_changes_stress (gpointer user_data)
3813 {
3814   test_data *td = user_data;
3815   state_changes_stress_input_data *i = td->id;
3816   state_changes_stress_master_data *d = td->md;
3817
3818   FAIL_UNLESS (d->got_state_changed_to_playing);
3819   FAIL_UNLESS_EQUALS_INT (i->n_state_changes, 0);
3820 }
3821
3822 GST_START_TEST (test_empty_state_changes_stress)
3823 {
3824   state_changes_stress_input_data id = { 500 };
3825   state_changes_stress_master_data md = { 0 };
3826
3827   TEST_BASE (TEST_FEATURE_TEST_SOURCE, state_changes_stress_source, NULL,
3828       check_success_source_state_changes_stress, NULL, &id, &md, NULL);
3829 }
3830
3831 GST_END_TEST;
3832
3833 GST_START_TEST (test_wavparse_state_changes_stress)
3834 {
3835   state_changes_stress_input_data id = { 500 };
3836   state_changes_stress_master_data md = { 0 };
3837
3838   TEST_BASE (TEST_FEATURE_WAV_SOURCE, state_changes_stress_source, NULL,
3839       check_success_source_state_changes_stress, NULL, &id, &md, NULL);
3840 }
3841
3842 GST_END_TEST;
3843
3844 GST_START_TEST (test_mpegts_state_changes_stress)
3845 {
3846   state_changes_stress_input_data id = { 500 };
3847   state_changes_stress_master_data md = { 0 };
3848
3849   TEST_BASE (TEST_FEATURE_MPEGTS_SOURCE, state_changes_stress_source, NULL,
3850       check_success_source_state_changes_stress, NULL, &id, &md, NULL);
3851 }
3852
3853 GST_END_TEST;
3854
3855 GST_START_TEST (test_mpegts_2_state_changes_stress)
3856 {
3857   state_changes_stress_input_data id = { 500 };
3858   state_changes_stress_master_data md = { 0 };
3859
3860   TEST_BASE (TEST_FEATURE_MPEGTS_SOURCE | TEST_FEATURE_SPLIT_SINKS,
3861       state_changes_stress_source, NULL,
3862       check_success_source_state_changes_stress, NULL, &id, &md, NULL);
3863 }
3864
3865 GST_END_TEST;
3866
3867 GST_START_TEST (test_live_a_state_changes_stress)
3868 {
3869   state_changes_stress_input_data id = { 500 };
3870   state_changes_stress_master_data md = { 0 };
3871
3872   TEST_BASE (TEST_FEATURE_LIVE_A_SOURCE, state_changes_stress_source, NULL,
3873       check_success_source_state_changes_stress, NULL, &id, &md, NULL);
3874 }
3875
3876 GST_END_TEST;
3877
3878 GST_START_TEST (test_live_av_state_changes_stress)
3879 {
3880   state_changes_stress_input_data id = { 500 };
3881   state_changes_stress_master_data md = { 0 };
3882
3883   TEST_BASE (TEST_FEATURE_LIVE_AV_SOURCE, state_changes_stress_source, NULL,
3884       check_success_source_state_changes_stress, NULL, &id, &md, NULL);
3885 }
3886
3887 GST_END_TEST;
3888
3889 GST_START_TEST (test_live_av_2_state_changes_stress)
3890 {
3891   state_changes_stress_input_data id = { 500 };
3892   state_changes_stress_master_data md = { 0 };
3893
3894   TEST_BASE (TEST_FEATURE_LIVE_AV_SOURCE | TEST_FEATURE_SPLIT_SINKS,
3895       state_changes_stress_source, NULL,
3896       check_success_source_state_changes_stress, NULL, &id, &md, NULL);
3897 }
3898
3899 GST_END_TEST;
3900
3901 /**** serialized query test ****/
3902
3903 typedef struct
3904 {
3905   gboolean sent_query[2];
3906   gboolean got_query_reply[2];
3907   GstPad *pad[2];
3908 } serialized_query_master_data;
3909
3910 typedef struct
3911 {
3912   gboolean got_query;
3913 } serialized_query_slave_data;
3914
3915 static gboolean
3916 send_drain (gpointer user_data)
3917 {
3918   test_data *td = user_data;
3919   serialized_query_master_data *d = td->md;
3920   GstQuery *q;
3921   gint idx;
3922
3923   for (idx = 0; idx < (td->two_streams ? 2 : 1); idx++) {
3924     q = gst_query_new_drain ();
3925     FAIL_UNLESS (gst_pad_query (d->pad[idx], q));
3926     d->got_query_reply[idx] = TRUE;
3927     gst_query_unref (q);
3928     gst_object_unref (d->pad[idx]);
3929   }
3930
3931   g_timeout_add (STEP_AT, (GSourceFunc) stop_pipeline, gst_object_ref (td->p));
3932   return G_SOURCE_REMOVE;
3933 }
3934
3935 static GstPadProbeReturn
3936 serialized_query_probe_source (GstPad * pad, GstPadProbeInfo * info,
3937     gpointer user_data)
3938 {
3939   test_data *td = user_data;
3940   serialized_query_master_data *d = td->md;
3941   GstClockTime ts;
3942   gint idx;
3943
3944   if (GST_IS_BUFFER (info->data)) {
3945     ts = GST_BUFFER_TIMESTAMP (info->data);
3946     idx = pad2idx (pad, td->two_streams);
3947     if (!d->sent_query[idx] && GST_CLOCK_TIME_IS_VALID (ts)
3948         && ts > STEP_AT * GST_MSECOND) {
3949       g_atomic_int_set (&d->sent_query[idx], TRUE);
3950       d->pad[idx] = gst_object_ref (pad);
3951       if (!td->two_streams || g_atomic_int_get (&d->sent_query[idx ? 0 : 1]))
3952         g_idle_add (send_drain, td);
3953     }
3954   }
3955   return GST_PAD_PROBE_OK;
3956 }
3957
3958 static void
3959 hook_serialized_query_probe_source (const GValue * v, gpointer user_data)
3960 {
3961   hook_probe (v, serialized_query_probe_source, user_data);
3962 }
3963
3964 static void
3965 serialized_query_source (GstElement * source, gpointer user_data)
3966 {
3967   GstIterator *it;
3968   GstStateChangeReturn ret;
3969
3970   it = gst_bin_iterate_sinks (GST_BIN (source));
3971   while (gst_iterator_foreach (it, hook_serialized_query_probe_source,
3972           user_data))
3973     gst_iterator_resync (it);
3974   gst_iterator_free (it);
3975
3976   ret = gst_element_set_state (source, GST_STATE_PLAYING);
3977   FAIL_UNLESS (ret == GST_STATE_CHANGE_ASYNC
3978       || ret == GST_STATE_CHANGE_SUCCESS);
3979 }
3980
3981 static GstPadProbeReturn
3982 serialized_query_probe (GstPad * pad, GstPadProbeInfo * info,
3983     gpointer user_data)
3984 {
3985   test_data *td = user_data;
3986   serialized_query_slave_data *d = td->sd;
3987
3988   if (GST_IS_QUERY (info->data)) {
3989     if (GST_QUERY_TYPE (info->data) == GST_QUERY_DRAIN) {
3990       d->got_query = TRUE;
3991     }
3992   }
3993   return GST_PAD_PROBE_OK;
3994 }
3995
3996 static void
3997 hook_serialized_query_probe (const GValue * v, gpointer user_data)
3998 {
3999   hook_probe (v, serialized_query_probe, user_data);
4000 }
4001
4002 static void
4003 setup_sink_serialized_query (GstElement * sink, gpointer user_data)
4004 {
4005   GstIterator *it;
4006
4007   it = gst_bin_iterate_sinks (GST_BIN (sink));
4008   while (gst_iterator_foreach (it, hook_serialized_query_probe, user_data))
4009     gst_iterator_resync (it);
4010   gst_iterator_free (it);
4011 }
4012
4013 static void
4014 check_success_source_serialized_query (gpointer user_data)
4015 {
4016   test_data *td = user_data;
4017   serialized_query_master_data *d = td->md;
4018   gint idx;
4019
4020   for (idx = 0; idx < (td->two_streams ? 2 : 1); idx++) {
4021     FAIL_UNLESS (d->sent_query[idx]);
4022     FAIL_UNLESS (d->got_query_reply[idx]);
4023   }
4024 }
4025
4026 static void
4027 check_success_sink_serialized_query (gpointer user_data)
4028 {
4029   test_data *td = user_data;
4030   serialized_query_slave_data *d = td->sd;
4031
4032   FAIL_UNLESS (d->got_query);
4033 }
4034
4035 GST_START_TEST (test_empty_serialized_query)
4036 {
4037   serialized_query_master_data md = { 0 };
4038   serialized_query_slave_data sd = { 0 };
4039
4040   TEST_BASE (TEST_FEATURE_TEST_SOURCE, serialized_query_source,
4041       setup_sink_serialized_query, check_success_source_serialized_query,
4042       check_success_sink_serialized_query, NULL, &md, &sd);
4043 }
4044
4045 GST_END_TEST;
4046
4047 GST_START_TEST (test_wavparse_serialized_query)
4048 {
4049   serialized_query_master_data md = { 0 };
4050   serialized_query_slave_data sd = { 0 };
4051
4052   TEST_BASE (TEST_FEATURE_WAV_SOURCE, serialized_query_source,
4053       setup_sink_serialized_query, check_success_source_serialized_query,
4054       check_success_sink_serialized_query, NULL, &md, &sd);
4055 }
4056
4057 GST_END_TEST;
4058
4059 GST_START_TEST (test_mpegts_serialized_query)
4060 {
4061   serialized_query_master_data md = { 0 };
4062   serialized_query_slave_data sd = { 0 };
4063
4064   TEST_BASE (TEST_FEATURE_MPEGTS_SOURCE, serialized_query_source,
4065       setup_sink_serialized_query, check_success_source_serialized_query,
4066       check_success_sink_serialized_query, NULL, &md, &sd);
4067 }
4068
4069 GST_END_TEST;
4070
4071 GST_START_TEST (test_mpegts_2_serialized_query)
4072 {
4073   serialized_query_master_data md = { 0 };
4074   serialized_query_slave_data sd = { 0 };
4075
4076   TEST_BASE (TEST_FEATURE_MPEGTS_SOURCE | TEST_FEATURE_SPLIT_SINKS,
4077       serialized_query_source, setup_sink_serialized_query,
4078       check_success_source_serialized_query,
4079       check_success_sink_serialized_query, NULL, &md, &sd);
4080 }
4081
4082 GST_END_TEST;
4083
4084 GST_START_TEST (test_live_a_serialized_query)
4085 {
4086   serialized_query_master_data md = { 0 };
4087   serialized_query_slave_data sd = { 0 };
4088
4089   TEST_BASE (TEST_FEATURE_LIVE_A_SOURCE, serialized_query_source,
4090       setup_sink_serialized_query, check_success_source_serialized_query,
4091       check_success_sink_serialized_query, NULL, &md, &sd);
4092 }
4093
4094 GST_END_TEST;
4095
4096 GST_START_TEST (test_live_av_serialized_query)
4097 {
4098   serialized_query_master_data md = { 0 };
4099   serialized_query_slave_data sd = { 0 };
4100
4101   TEST_BASE (TEST_FEATURE_LIVE_AV_SOURCE, serialized_query_source,
4102       setup_sink_serialized_query, check_success_source_serialized_query,
4103       check_success_sink_serialized_query, NULL, &md, &sd);
4104 }
4105
4106 GST_END_TEST;
4107
4108 GST_START_TEST (test_live_av_2_serialized_query)
4109 {
4110   serialized_query_master_data md = { 0 };
4111   serialized_query_slave_data sd = { 0 };
4112
4113   TEST_BASE (TEST_FEATURE_LIVE_AV_SOURCE | TEST_FEATURE_SPLIT_SINKS,
4114       serialized_query_source, setup_sink_serialized_query,
4115       check_success_source_serialized_query,
4116       check_success_sink_serialized_query, NULL, &md, &sd);
4117 }
4118
4119 GST_END_TEST;
4120
4121 /**** non serialized event test ****/
4122
4123 typedef struct
4124 {
4125   gboolean sent_event[2];
4126 } non_serialized_event_master_data;
4127
4128 typedef struct
4129 {
4130   gboolean got_event;
4131 } non_serialized_event_slave_data;
4132
4133 static GstPadProbeReturn
4134 non_serialized_event_probe_source (GstPad * pad, GstPadProbeInfo * info,
4135     gpointer user_data)
4136 {
4137   test_data *td = user_data;
4138   non_serialized_event_master_data *d = td->md;
4139   GstClockTime ts;
4140   GstEvent *e;
4141   gint idx;
4142
4143   if (GST_IS_BUFFER (info->data)) {
4144     ts = GST_BUFFER_TIMESTAMP (info->data);
4145     idx = pad2idx (pad, td->two_streams);
4146     if (!g_atomic_int_get (&d->sent_event[idx])
4147         && GST_CLOCK_TIME_IS_VALID (ts) && ts > STEP_AT * GST_MSECOND) {
4148       e = gst_event_new_custom (GST_EVENT_CUSTOM_DOWNSTREAM_OOB,
4149           gst_structure_new ("name", "field", G_TYPE_INT, 42, NULL));
4150       FAIL_UNLESS (e);
4151       FAIL_UNLESS (gst_pad_send_event (pad, e));
4152
4153       if (!td->two_streams || g_atomic_int_get (&d->sent_event[idx ? 0 : 1])) {
4154         g_timeout_add (STEP_AT, (GSourceFunc) stop_pipeline,
4155             gst_object_ref (td->p));
4156       }
4157       g_atomic_int_set (&d->sent_event[idx], TRUE);
4158     }
4159   }
4160   return GST_PAD_PROBE_OK;
4161 }
4162
4163 static void
4164 hook_non_serialized_event_probe_source (const GValue * v, gpointer user_data)
4165 {
4166   hook_probe (v, non_serialized_event_probe_source, user_data);
4167 }
4168
4169 static void
4170 non_serialized_event_source (GstElement * source, gpointer user_data)
4171 {
4172   GstIterator *it;
4173   GstStateChangeReturn ret;
4174
4175   it = gst_bin_iterate_sinks (GST_BIN (source));
4176   while (gst_iterator_foreach (it, hook_non_serialized_event_probe_source,
4177           user_data))
4178     gst_iterator_resync (it);
4179   gst_iterator_free (it);
4180
4181   ret = gst_element_set_state (source, GST_STATE_PLAYING);
4182   FAIL_UNLESS (ret == GST_STATE_CHANGE_ASYNC
4183       || ret == GST_STATE_CHANGE_SUCCESS);
4184 }
4185
4186 static GstPadProbeReturn
4187 non_serialized_event_probe (GstPad * pad, GstPadProbeInfo * info,
4188     gpointer user_data)
4189 {
4190   test_data *td = user_data;
4191   non_serialized_event_slave_data *d = td->sd;
4192   const GstStructure *s;
4193   gint val;
4194
4195   if (GST_IS_EVENT (info->data)) {
4196     if (GST_EVENT_TYPE (info->data) == GST_EVENT_CUSTOM_DOWNSTREAM_OOB) {
4197       s = gst_event_get_structure (info->data);
4198       FAIL_UNLESS (!strcmp (gst_structure_get_name (s), "name"));
4199       FAIL_UNLESS (gst_structure_get_int (s, "field", &val));
4200       FAIL_UNLESS (val == 42);
4201       d->got_event = TRUE;
4202     }
4203   }
4204   return GST_PAD_PROBE_OK;
4205 }
4206
4207 static void
4208 hook_non_serialized_event_probe (const GValue * v, gpointer user_data)
4209 {
4210   hook_probe (v, non_serialized_event_probe, user_data);
4211 }
4212
4213 static void
4214 setup_sink_non_serialized_event (GstElement * sink, gpointer user_data)
4215 {
4216   GstIterator *it;
4217
4218   it = gst_bin_iterate_sinks (GST_BIN (sink));
4219   while (gst_iterator_foreach (it, hook_non_serialized_event_probe, user_data))
4220     gst_iterator_resync (it);
4221   gst_iterator_free (it);
4222 }
4223
4224 static void
4225 check_success_source_non_serialized_event (gpointer user_data)
4226 {
4227   test_data *td = user_data;
4228   non_serialized_event_master_data *d = td->md;
4229   gint idx;
4230
4231   for (idx = 0; idx < (td->two_streams ? 2 : 1); idx++) {
4232     FAIL_UNLESS (d->sent_event[idx]);
4233   }
4234 }
4235
4236 static void
4237 check_success_sink_non_serialized_event (gpointer user_data)
4238 {
4239   test_data *td = user_data;
4240   non_serialized_event_slave_data *d = td->sd;
4241
4242   FAIL_UNLESS (d->got_event);
4243 }
4244
4245 GST_START_TEST (test_empty_non_serialized_event)
4246 {
4247   non_serialized_event_master_data md = { 0 };
4248   non_serialized_event_slave_data sd = { 0 };
4249
4250   TEST_BASE (TEST_FEATURE_TEST_SOURCE, non_serialized_event_source,
4251       setup_sink_non_serialized_event,
4252       check_success_source_non_serialized_event,
4253       check_success_sink_non_serialized_event, NULL, &md, &sd);
4254 }
4255
4256 GST_END_TEST;
4257
4258 GST_START_TEST (test_wavparse_non_serialized_event)
4259 {
4260   non_serialized_event_master_data md = { 0 };
4261   non_serialized_event_slave_data sd = { 0 };
4262
4263   TEST_BASE (TEST_FEATURE_WAV_SOURCE, non_serialized_event_source,
4264       setup_sink_non_serialized_event,
4265       check_success_source_non_serialized_event,
4266       check_success_sink_non_serialized_event, NULL, &md, &sd);
4267 }
4268
4269 GST_END_TEST;
4270
4271 GST_START_TEST (test_mpegts_non_serialized_event)
4272 {
4273   non_serialized_event_master_data md = { 0 };
4274   non_serialized_event_slave_data sd = { 0 };
4275
4276   TEST_BASE (TEST_FEATURE_MPEGTS_SOURCE, non_serialized_event_source,
4277       setup_sink_non_serialized_event,
4278       check_success_source_non_serialized_event,
4279       check_success_sink_non_serialized_event, NULL, &md, &sd);
4280 }
4281
4282 GST_END_TEST;
4283
4284 GST_START_TEST (test_mpegts_2_non_serialized_event)
4285 {
4286   non_serialized_event_master_data md = { 0 };
4287   non_serialized_event_slave_data sd = { 0 };
4288
4289   TEST_BASE (TEST_FEATURE_MPEGTS_SOURCE | TEST_FEATURE_SPLIT_SINKS,
4290       non_serialized_event_source, setup_sink_non_serialized_event,
4291       check_success_source_non_serialized_event,
4292       check_success_sink_non_serialized_event, NULL, &md, &sd);
4293 }
4294
4295 GST_END_TEST;
4296
4297 GST_START_TEST (test_live_a_non_serialized_event)
4298 {
4299   non_serialized_event_master_data md = { 0 };
4300   non_serialized_event_slave_data sd = { 0 };
4301
4302   TEST_BASE (TEST_FEATURE_LIVE_A_SOURCE, non_serialized_event_source,
4303       setup_sink_non_serialized_event,
4304       check_success_source_non_serialized_event,
4305       check_success_sink_non_serialized_event, NULL, &md, &sd);
4306 }
4307
4308 GST_END_TEST;
4309
4310 GST_START_TEST (test_live_av_non_serialized_event)
4311 {
4312   non_serialized_event_master_data md = { 0 };
4313   non_serialized_event_slave_data sd = { 0 };
4314
4315   TEST_BASE (TEST_FEATURE_LIVE_AV_SOURCE, non_serialized_event_source,
4316       setup_sink_non_serialized_event,
4317       check_success_source_non_serialized_event,
4318       check_success_sink_non_serialized_event, NULL, &md, &sd);
4319 }
4320
4321 GST_END_TEST;
4322
4323 GST_START_TEST (test_live_av_2_non_serialized_event)
4324 {
4325   non_serialized_event_master_data md = { 0 };
4326   non_serialized_event_slave_data sd = { 0 };
4327
4328   TEST_BASE (TEST_FEATURE_LIVE_AV_SOURCE | TEST_FEATURE_SPLIT_SINKS,
4329       non_serialized_event_source, setup_sink_non_serialized_event,
4330       check_success_source_non_serialized_event,
4331       check_success_sink_non_serialized_event, NULL, &md, &sd);
4332 }
4333
4334 GST_END_TEST;
4335
4336 /**** meta test ****/
4337
4338 enum
4339 {
4340   TEST_META_PROTECTION = 0,
4341   N_TEST_META
4342 };
4343
4344 typedef struct
4345 {
4346   gboolean meta_sent[N_TEST_META];
4347 } meta_master_data;
4348
4349 typedef struct
4350 {
4351   gboolean meta_received[N_TEST_META];
4352 } meta_slave_data;
4353
4354 static GstPadProbeReturn
4355 meta_probe_source (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
4356 {
4357   test_data *td = user_data;
4358   meta_master_data *d = td->md;
4359   GstBuffer *buffer;
4360   GstProtectionMeta *meta;
4361
4362   if (GST_IS_BUFFER (info->data)) {
4363     buffer = GST_BUFFER (info->data);
4364     meta =
4365         gst_buffer_add_protection_meta (buffer, gst_structure_new ("name",
4366             "somefield", G_TYPE_INT, 42, NULL));
4367     FAIL_UNLESS (meta);
4368     d->meta_sent[TEST_META_PROTECTION] = TRUE;
4369   }
4370   return GST_PAD_PROBE_OK;
4371 }
4372
4373 static void
4374 hook_meta_probe_source (const GValue * v, gpointer user_data)
4375 {
4376   hook_probe (v, meta_probe_source, user_data);
4377 }
4378
4379 static void
4380 meta_source (GstElement * source, gpointer user_data)
4381 {
4382   GstIterator *it;
4383   GstStateChangeReturn ret;
4384
4385   it = gst_bin_iterate_sinks (GST_BIN (source));
4386   while (gst_iterator_foreach (it, hook_meta_probe_source, user_data))
4387     gst_iterator_resync (it);
4388   gst_iterator_free (it);
4389
4390   ret = gst_element_set_state (source, GST_STATE_PLAYING);
4391   FAIL_UNLESS (ret == GST_STATE_CHANGE_ASYNC
4392       || ret == GST_STATE_CHANGE_SUCCESS);
4393
4394   g_timeout_add (STOP_AT, (GSourceFunc) stop_pipeline, gst_object_ref (source));
4395 }
4396
4397 static gboolean
4398 scan_meta (GstBuffer * buffer, GstMeta ** meta, gpointer user_data)
4399 {
4400   test_data *td = user_data;
4401   meta_slave_data *d = td->sd;
4402   int val;
4403   GstStructure *s;
4404   GstProtectionMeta *pmeta;
4405
4406   if ((*meta)->info->api == GST_PROTECTION_META_API_TYPE) {
4407     pmeta = (GstProtectionMeta *) * meta;
4408     FAIL_UNLESS (GST_IS_STRUCTURE (pmeta->info));
4409     s = GST_STRUCTURE (pmeta->info);
4410     FAIL_UNLESS (!strcmp (gst_structure_get_name (s), "name"));
4411     FAIL_UNLESS (gst_structure_get_int (s, "somefield", &val));
4412     FAIL_UNLESS (val == 42);
4413     d->meta_received[TEST_META_PROTECTION] = TRUE;
4414   }
4415   return TRUE;
4416 }
4417
4418 static GstPadProbeReturn
4419 meta_probe (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
4420 {
4421   if (GST_IS_BUFFER (info->data)) {
4422     gst_buffer_foreach_meta (info->data, scan_meta, user_data);
4423   }
4424   return GST_PAD_PROBE_OK;
4425 }
4426
4427 static void
4428 hook_meta_probe (const GValue * v, gpointer user_data)
4429 {
4430   hook_probe (v, meta_probe, user_data);
4431 }
4432
4433 static void
4434 setup_sink_meta (GstElement * sink, gpointer user_data)
4435 {
4436   GstIterator *it;
4437
4438   it = gst_bin_iterate_sinks (GST_BIN (sink));
4439   while (gst_iterator_foreach (it, hook_meta_probe, user_data))
4440     gst_iterator_resync (it);
4441   gst_iterator_free (it);
4442 }
4443
4444 static void
4445 check_success_source_meta (gpointer user_data)
4446 {
4447   test_data *td = user_data;
4448   meta_master_data *d = td->md;
4449   size_t n;
4450
4451   for (n = 0; n < N_TEST_META; ++n)
4452     FAIL_UNLESS (d->meta_sent[n]);
4453 }
4454
4455 static void
4456 check_success_sink_meta (gpointer user_data)
4457 {
4458   test_data *td = user_data;
4459   meta_slave_data *d = td->sd;
4460   size_t n;
4461
4462   for (n = 0; n < N_TEST_META; ++n)
4463     FAIL_UNLESS (d->meta_received[n]);
4464 }
4465
4466 GST_START_TEST (test_empty_meta)
4467 {
4468   meta_master_data md = { 0 };
4469   meta_slave_data sd = { 0 };
4470
4471   TEST_BASE (TEST_FEATURE_TEST_SOURCE, meta_source, setup_sink_meta,
4472       check_success_source_meta, check_success_sink_meta, NULL, &md, &sd);
4473 }
4474
4475 GST_END_TEST;
4476
4477 GST_START_TEST (test_wavparse_meta)
4478 {
4479   meta_master_data md = { 0 };
4480   meta_slave_data sd = { 0 };
4481
4482   TEST_BASE (TEST_FEATURE_WAV_SOURCE, meta_source, setup_sink_meta,
4483       check_success_source_meta, check_success_sink_meta, NULL, &md, &sd);
4484 }
4485
4486 GST_END_TEST;
4487
4488 GST_START_TEST (test_mpegts_meta)
4489 {
4490   meta_master_data md = { 0 };
4491   meta_slave_data sd = { 0 };
4492
4493   TEST_BASE (TEST_FEATURE_MPEGTS_SOURCE, meta_source, setup_sink_meta,
4494       check_success_source_meta, check_success_sink_meta, NULL, &md, &sd);
4495 }
4496
4497 GST_END_TEST;
4498
4499 GST_START_TEST (test_mpegts_2_meta)
4500 {
4501   meta_master_data md = { 0 };
4502   meta_slave_data sd = { 0 };
4503
4504   TEST_BASE (TEST_FEATURE_MPEGTS_SOURCE | TEST_FEATURE_SPLIT_SINKS, meta_source,
4505       setup_sink_meta, check_success_source_meta, check_success_sink_meta,
4506       NULL, &md, &sd);
4507 }
4508
4509 GST_END_TEST;
4510
4511 GST_START_TEST (test_live_a_meta)
4512 {
4513   meta_master_data md = { 0 };
4514   meta_slave_data sd = { 0 };
4515
4516   TEST_BASE (TEST_FEATURE_LIVE_A_SOURCE, meta_source, setup_sink_meta,
4517       check_success_source_meta, check_success_sink_meta, NULL, &md, &sd);
4518 }
4519
4520 GST_END_TEST;
4521
4522 GST_START_TEST (test_live_av_meta)
4523 {
4524   meta_master_data md = { 0 };
4525   meta_slave_data sd = { 0 };
4526
4527   TEST_BASE (TEST_FEATURE_LIVE_AV_SOURCE, meta_source, setup_sink_meta,
4528       check_success_source_meta, check_success_sink_meta, NULL, &md, &sd);
4529 }
4530
4531 GST_END_TEST;
4532
4533 GST_START_TEST (test_live_av_2_meta)
4534 {
4535   meta_master_data md = { 0 };
4536   meta_slave_data sd = { 0 };
4537
4538   TEST_BASE (TEST_FEATURE_LIVE_AV_SOURCE | TEST_FEATURE_SPLIT_SINKS,
4539       meta_source, setup_sink_meta, check_success_source_meta,
4540       check_success_sink_meta, NULL, &md, &sd);
4541 }
4542
4543 GST_END_TEST;
4544
4545 /**** source change test ****/
4546
4547 typedef struct
4548 {
4549   void (*switcher) (GstElement *, char *name);
4550 } source_change_input_data;
4551
4552 typedef struct
4553 {
4554   gboolean source_change_scheduled;
4555   gboolean source_changed;
4556 } source_change_master_data;
4557
4558 typedef struct
4559 {
4560   gboolean got_caps[2][2];
4561   gboolean got_buffer[2][2];
4562   GstCaps *caps[2];
4563 } source_change_slave_data;
4564
4565 static gboolean
4566 stop_source (gpointer user_data)
4567 {
4568   GstElement *source = user_data;
4569
4570   FAIL_UNLESS (gst_element_set_state (source,
4571           GST_STATE_NULL) == GST_STATE_CHANGE_SUCCESS);
4572   gst_object_unref (source);
4573   return FALSE;
4574 }
4575
4576 static gboolean
4577 remove_source (gpointer user_data)
4578 {
4579   GstElement *source = user_data;
4580
4581   FAIL_UNLESS (gst_element_set_state (source,
4582           GST_STATE_NULL) == GST_STATE_CHANGE_SUCCESS);
4583   gst_bin_remove (GST_BIN (GST_ELEMENT_PARENT (source)), source);
4584   return FALSE;
4585 }
4586
4587 static void
4588 switch_to_aiff (GstElement * pipeline, char *name)
4589 {
4590   GstElement *sbin, *filesrc, *ipcpipelinesink;
4591   GError *e = NULL;
4592
4593   sbin =
4594       gst_parse_bin_from_description ("pushfilesrc name=filesrc ! aiffparse",
4595       TRUE, &e);
4596   FAIL_IF (e || !sbin);
4597   gst_element_set_name (sbin, name);
4598   filesrc = gst_bin_get_by_name (GST_BIN (sbin), "filesrc");
4599   FAIL_UNLESS (filesrc);
4600   g_object_set (filesrc, "location", "../../tests/files/s16be-id3v2.aiff",
4601       NULL);
4602   gst_object_unref (filesrc);
4603   gst_bin_add (GST_BIN (pipeline), sbin);
4604   ipcpipelinesink = gst_bin_get_by_name (GST_BIN (pipeline), "ipcpipelinesink");
4605   FAIL_UNLESS (ipcpipelinesink);
4606   FAIL_UNLESS (gst_element_link (sbin, ipcpipelinesink));
4607   gst_object_unref (ipcpipelinesink);
4608   gst_element_sync_state_with_parent (sbin);
4609   g_free (name);
4610 }
4611
4612 static void
4613 switch_av (GstElement * pipeline, char *name, gboolean live, gboolean Long)
4614 {
4615   GstElement *src, *ipcpipelinesink;
4616   gint L = Long ? 10 : 1;
4617
4618   if (g_str_has_prefix (name, "videotestsrc")) {
4619     /* replace video source with audio source */
4620     src = gst_element_factory_make ("audiotestsrc", NULL);
4621     FAIL_UNLESS (src);
4622     g_object_set (src, "is-live", live, "num-buffers", live ? 27 * L : -1,
4623         NULL);
4624     gst_bin_add (GST_BIN (pipeline), src);
4625     ipcpipelinesink =
4626         gst_bin_get_by_name (GST_BIN (pipeline), "vipcpipelinesink");
4627     FAIL_UNLESS (ipcpipelinesink);
4628     FAIL_UNLESS (gst_element_link (src, ipcpipelinesink));
4629     gst_object_unref (ipcpipelinesink);
4630     gst_element_sync_state_with_parent (src);
4631   }
4632
4633   if (g_str_has_prefix (name, "audiotestsrc")) {
4634     /* replace audio source with video source */
4635     src = gst_element_factory_make ("videotestsrc", NULL);
4636     FAIL_UNLESS (src);
4637     g_object_set (src, "is-live", live, "num-buffers", live ? 19 * L : -1,
4638         NULL);
4639     gst_bin_add (GST_BIN (pipeline), src);
4640     ipcpipelinesink =
4641         gst_bin_get_by_name (GST_BIN (pipeline), "aipcpipelinesink");
4642     FAIL_UNLESS (ipcpipelinesink);
4643     FAIL_UNLESS (gst_element_link (src, ipcpipelinesink));
4644     gst_object_unref (ipcpipelinesink);
4645     gst_element_sync_state_with_parent (src);
4646   }
4647
4648   g_free (name);
4649 }
4650
4651 static void
4652 switch_live_av (GstElement * pipeline, char *name)
4653 {
4654   switch_av (pipeline, name, TRUE, FALSE);
4655 }
4656
4657 static GstPadProbeReturn
4658 change_source_blocked (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
4659 {
4660   test_data *td = user_data;
4661   const source_change_input_data *i = td->id;
4662   source_change_master_data *d = td->md;
4663   GstElement *source;
4664   GstPad *peer;
4665
4666   peer = gst_pad_get_peer (pad);
4667   FAIL_UNLESS (peer);
4668   FAIL_UNLESS (gst_pad_unlink (pad, peer));
4669   gst_object_unref (peer);
4670
4671   source = GST_ELEMENT (gst_element_get_parent (pad));
4672   FAIL_UNLESS (source);
4673   g_object_set_qdata (G_OBJECT (source), to_be_removed_quark (),
4674       GINT_TO_POINTER (1));
4675
4676   gst_bin_remove (GST_BIN (GST_ELEMENT_PARENT (source)), source);
4677   (*i->switcher) (td->p, gst_element_get_name (source));
4678
4679   g_idle_add (stop_source, source);
4680
4681   d->source_changed = TRUE;
4682
4683   gst_object_unref (td->p);
4684   return GST_PAD_PROBE_REMOVE;
4685 }
4686
4687 static gboolean
4688 change_source (gpointer user_data)
4689 {
4690   test_data *td = user_data;
4691   GstElement *source;
4692   GstPad *pad;
4693   static const char *const names[] =
4694       { "source", "audiotestsrc", "videotestsrc" };
4695   gboolean found = FALSE;
4696   size_t n;
4697
4698   for (n = 0; n < G_N_ELEMENTS (names); ++n) {
4699     source = gst_bin_get_by_name (GST_BIN (td->p), names[n]);
4700     if (source) {
4701       found = TRUE;
4702       pad = gst_element_get_static_pad (source, "src");
4703       FAIL_UNLESS (pad);
4704       gst_object_ref (td->p);
4705       gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_IDLE, change_source_blocked,
4706           user_data, NULL);
4707       gst_object_unref (pad);
4708       gst_object_unref (source);
4709     }
4710   }
4711   FAIL_UNLESS (found);
4712
4713   gst_object_unref (td->p);
4714   return G_SOURCE_REMOVE;
4715 }
4716
4717 static void
4718 source_change_on_state_changed (gpointer user_data)
4719 {
4720   test_data *td = user_data;
4721   source_change_master_data *d = td->md;
4722
4723   if (!d->source_change_scheduled) {
4724     d->source_change_scheduled = TRUE;
4725     gst_object_ref (td->p);
4726     g_timeout_add (STEP_AT, change_source, td);
4727   }
4728 }
4729
4730 static void
4731 source_change_source (GstElement * source, gpointer user_data)
4732 {
4733   test_data *td = user_data;
4734   GstStateChangeReturn ret;
4735
4736   td->state_target = GST_STATE_PLAYING;
4737   td->state_changed_cb = source_change_on_state_changed;
4738   ret = gst_element_set_state (source, GST_STATE_PLAYING);
4739   FAIL_UNLESS (ret == GST_STATE_CHANGE_ASYNC
4740       || ret == GST_STATE_CHANGE_SUCCESS);
4741 }
4742
4743 static int
4744 scppad2idx (GstPad * pad, gboolean two_streams, GstCaps * newcaps)
4745 {
4746   static GQuark scpidx = 0;
4747   gpointer p;
4748   int idx;
4749   GstCaps *caps;
4750
4751   if (!scpidx)
4752     scpidx = g_quark_from_static_string ("scpidx");
4753
4754   if (!two_streams)
4755     return 0;
4756
4757   p = g_object_get_qdata (G_OBJECT (pad), scpidx);
4758   if (p)
4759     return GPOINTER_TO_INT (p) - 1;
4760
4761   caps = gst_pad_get_current_caps (pad);
4762   if (!caps)
4763     caps = gst_pad_get_pad_template_caps (pad);
4764   if ((!caps || gst_caps_is_any (caps)) && newcaps)
4765     caps = gst_caps_ref (newcaps);
4766   FAIL_UNLESS (caps);
4767   idx = caps2idx (caps, two_streams);
4768   gst_caps_unref (caps);
4769   g_object_set_qdata (G_OBJECT (pad), scpidx, GINT_TO_POINTER (idx + 1));
4770   return idx;
4771 }
4772
4773 static GstPadProbeReturn
4774 source_change_probe (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
4775 {
4776   test_data *td = user_data;
4777   source_change_slave_data *d = td->sd;
4778   GstCaps *caps;
4779   int idx;
4780
4781   if (GST_IS_BUFFER (info->data)) {
4782     idx = scppad2idx (pad, td->two_streams, NULL);
4783     if (d->got_caps[idx][1])
4784       d->got_buffer[idx][1] = TRUE;
4785     else if (d->got_caps[idx][0])
4786       d->got_buffer[idx][0] = TRUE;
4787   } else if (GST_IS_EVENT (info->data)) {
4788     if (GST_EVENT_TYPE (info->data) == GST_EVENT_CAPS) {
4789       gst_event_parse_caps (info->data, &caps);
4790       idx = scppad2idx (pad, td->two_streams, caps);
4791       if (!d->got_caps[idx][0]) {
4792         FAIL_IF (d->caps[idx]);
4793         d->got_caps[idx][0] = TRUE;
4794         d->caps[idx] = gst_caps_ref (caps);
4795       } else {
4796         FAIL_UNLESS (d->caps);
4797         if (gst_caps_is_equal (caps, d->caps[idx])) {
4798           FAIL ();
4799         } else {
4800           gst_caps_replace (&d->caps[idx], NULL);
4801           d->got_caps[idx][1] = TRUE;
4802         }
4803       }
4804     }
4805   }
4806   return GST_PAD_PROBE_OK;
4807 }
4808
4809 static void
4810 hook_source_change_probe (const GValue * v, gpointer user_data)
4811 {
4812   hook_probe (v, source_change_probe, user_data);
4813 }
4814
4815 static void
4816 setup_sink_source_change (GstElement * sink, gpointer user_data)
4817 {
4818   GstIterator *it;
4819
4820   it = gst_bin_iterate_sinks (GST_BIN (sink));
4821   while (gst_iterator_foreach (it, hook_source_change_probe, user_data))
4822     gst_iterator_resync (it);
4823   gst_iterator_free (it);
4824 }
4825
4826 static void
4827 check_success_source_source_change (gpointer user_data)
4828 {
4829   test_data *td = user_data;
4830   source_change_master_data *d = td->md;
4831
4832   FAIL_UNLESS (d->source_change_scheduled);
4833   FAIL_UNLESS (d->source_changed);
4834 }
4835
4836 static void
4837 check_success_sink_source_change (gpointer user_data)
4838 {
4839   test_data *td = user_data;
4840   source_change_slave_data *d = td->sd;
4841   int idx;
4842
4843   for (idx = 0; idx < (td->two_streams ? 2 : 1); idx++) {
4844     FAIL_UNLESS (d->got_caps[idx][0]);
4845     FAIL_UNLESS (d->got_buffer[idx][0]);
4846     FAIL_UNLESS (d->got_caps[idx][1]);
4847     FAIL_UNLESS (d->got_buffer[idx][1]);
4848   }
4849 }
4850
4851 GST_START_TEST (test_non_live_source_change)
4852 {
4853   source_change_input_data id = { switch_to_aiff };
4854   source_change_master_data md = { 0 };
4855   source_change_slave_data sd = { 0 };
4856
4857   TEST_BASE (TEST_FEATURE_WAV_SOURCE, source_change_source,
4858       setup_sink_source_change, check_success_source_source_change,
4859       check_success_sink_source_change, &id, &md, &sd);
4860 }
4861
4862 GST_END_TEST;
4863
4864 GST_START_TEST (test_live_av_source_change)
4865 {
4866   source_change_input_data id = { switch_live_av };
4867   source_change_master_data md = { 0 };
4868   source_change_slave_data sd = { 0 };
4869
4870   TEST_BASE (TEST_FEATURE_LIVE_AV_SOURCE, source_change_source,
4871       setup_sink_source_change, check_success_source_source_change,
4872       check_success_sink_source_change, &id, &md, &sd);
4873 }
4874
4875 GST_END_TEST;
4876
4877 GST_START_TEST (test_live_av_2_source_change)
4878 {
4879   source_change_input_data id = { switch_live_av };
4880   source_change_master_data md = { 0 };
4881   source_change_slave_data sd = { 0 };
4882
4883   TEST_BASE (TEST_FEATURE_LIVE_AV_SOURCE | TEST_FEATURE_SPLIT_SINKS,
4884       source_change_source, setup_sink_source_change,
4885       check_success_source_source_change, check_success_sink_source_change,
4886       &id, &md, &sd);
4887 }
4888
4889 GST_END_TEST;
4890
4891 /**** dynamic pipeline change stress test ****/
4892
4893 typedef struct
4894 {
4895   guint n_switches_0;
4896   void (*switcher0) (test_data *);
4897   guint n_switches_1;
4898   void (*switcher1) (test_data *);
4899 } dynamic_pipeline_change_stress_input_data;
4900
4901 typedef struct
4902 {
4903   GMutex mutex;
4904   GCond cond;
4905   guint n_blocks_left;
4906   guint n_blocks_done;
4907   gboolean adding_probes;
4908   gboolean dynamic_pipeline_change_stress_scheduled;
4909 } dynamic_pipeline_change_stress_master_data;
4910
4911 static gboolean dynamic_pipeline_change_stress_step (gpointer user_data);
4912
4913 static GstPadProbeReturn
4914 dynamic_pipeline_change_stress_source_blocked_switch_av (GstPad * pad,
4915     GstPadProbeInfo * info, gpointer user_data)
4916 {
4917   test_data *td = user_data;
4918   dynamic_pipeline_change_stress_master_data *d = td->md;
4919   GstElement *source;
4920   GstPad *peer;
4921
4922   /* An idle pad probe could be called directly from the gst_pad_add_probe call
4923      if the pad happens to be idle right now. This would deadlock us though, as
4924      we need all pads to be blocked at the same time, so we need the iteration
4925      over all pads to be done before the pad probes execute. So we keep track of
4926      whether we're iterating to add the probes, and pass if so. */
4927   if (d->adding_probes) {
4928     return GST_PAD_PROBE_PASS;
4929   }
4930
4931   peer = gst_pad_get_peer (pad);
4932   FAIL_UNLESS (peer);
4933   FAIL_UNLESS (gst_pad_unlink (pad, peer));
4934   gst_object_unref (peer);
4935
4936   source = GST_ELEMENT (gst_element_get_parent (pad));
4937   FAIL_UNLESS (source);
4938   g_object_set_qdata (G_OBJECT (source), to_be_removed_quark (),
4939       GINT_TO_POINTER (1));
4940
4941   /* we want all pads to be blocked before we proceed */
4942   g_mutex_lock (&d->mutex);
4943   d->n_blocks_left--;
4944   while (d->n_blocks_left > 0)
4945     g_cond_wait (&d->cond, &d->mutex);
4946   g_mutex_unlock (&d->mutex);
4947   g_cond_broadcast (&d->cond);
4948
4949   g_mutex_lock (&d->mutex);
4950   switch_av (td->p, gst_element_get_name (source),
4951       ! !(td->features & TEST_FEATURE_LIVE), TRUE);
4952   g_mutex_unlock (&d->mutex);
4953
4954   g_idle_add_full (G_PRIORITY_HIGH, remove_source, source, g_object_unref);
4955
4956   if (g_atomic_int_dec_and_test (&d->n_blocks_done))
4957     g_timeout_add (STEP_AT, dynamic_pipeline_change_stress_step, td);
4958
4959   return GST_PAD_PROBE_REMOVE;
4960 }
4961
4962 static void
4963 change_audio_channel (GstElement * pipeline, char *name,
4964     const char *ipcpipelinesink_name, gboolean live)
4965 {
4966   GstElement *src, *ipcpipelinesink;
4967
4968   /* replace audio source with video source */
4969   src = gst_element_factory_make ("audiotestsrc", NULL);
4970   FAIL_UNLESS (src);
4971   g_object_set (src, "is-live", live, "num-buffers", live ? 190 : -1, NULL);
4972
4973   gst_bin_add (GST_BIN (pipeline), src);
4974   ipcpipelinesink =
4975       gst_bin_get_by_name (GST_BIN (pipeline), ipcpipelinesink_name);
4976   FAIL_UNLESS (ipcpipelinesink);
4977   FAIL_UNLESS (gst_element_link (src, ipcpipelinesink));
4978   gst_object_unref (ipcpipelinesink);
4979   gst_element_sync_state_with_parent (src);
4980
4981   g_free (name);
4982 }
4983
4984 static GstPadProbeReturn
4985 dynamic_pipeline_change_stress_source_blocked_change_audio_channel (GstPad *
4986     pad, GstPadProbeInfo * info, gpointer user_data)
4987 {
4988   test_data *td = user_data;
4989   dynamic_pipeline_change_stress_master_data *d = td->md;
4990   GstElement *source;
4991   GstPad *peer;
4992   const char *ipcpipelinesink_name;
4993
4994   /* An idle pad probe could be called directly from the gst_pad_add_probe call
4995      if the pad happens to be idle right now. This would deadlock us though, as
4996      we need all pads to be blocked at the same time, so we need the iteration
4997      over all pads to be done before the pad probes execute. So we keep track of
4998      whether we're iterating to add the probes, and pass if so. */
4999   if (d->adding_probes) {
5000     return GST_PAD_PROBE_PASS;
5001   }
5002
5003   peer = gst_pad_get_peer (pad);
5004   FAIL_UNLESS (peer);
5005   ipcpipelinesink_name = GST_ELEMENT_NAME (GST_PAD_PARENT (peer));
5006   FAIL_UNLESS (gst_pad_unlink (pad, peer));
5007   gst_object_unref (peer);
5008
5009   source = GST_ELEMENT (gst_element_get_parent (pad));
5010   FAIL_UNLESS (source);
5011   g_object_set_qdata (G_OBJECT (source), to_be_removed_quark (),
5012       GINT_TO_POINTER (1));
5013
5014   /* we want all pads to be blocked before we proceed */
5015   g_mutex_lock (&d->mutex);
5016   d->n_blocks_left--;
5017   while (d->n_blocks_left > 0)
5018     g_cond_wait (&d->cond, &d->mutex);
5019   g_cond_broadcast (&d->cond);
5020   g_mutex_unlock (&d->mutex);
5021
5022   g_mutex_lock (&d->mutex);
5023   change_audio_channel (td->p, gst_element_get_name (source),
5024       ipcpipelinesink_name, ! !(td->features & TEST_FEATURE_LIVE));
5025   g_mutex_unlock (&d->mutex);
5026
5027   g_idle_add_full (G_PRIORITY_HIGH, remove_source, source, g_object_unref);
5028
5029   if (g_atomic_int_dec_and_test (&d->n_blocks_done))
5030     g_timeout_add (STEP_AT, dynamic_pipeline_change_stress_step, td);
5031
5032   return GST_PAD_PROBE_REMOVE;
5033 }
5034
5035 typedef struct
5036 {
5037   const char *const *names;
5038   size_t n_names;
5039     GstPadProbeReturn (*f) (GstPad * pad, GstPadProbeInfo * info,
5040       gpointer user_data);
5041   test_data *td;
5042 } block_if_named_data;
5043
5044 static void
5045 block_if_named (const GValue * v, gpointer user_data)
5046 {
5047   block_if_named_data *bind = user_data;
5048   GstElement *e;
5049   GstPad *pad;
5050   size_t n;
5051
5052   e = g_value_get_object (v);
5053   FAIL_UNLESS (e);
5054   for (n = 0; n < bind->n_names; ++n) {
5055     if (g_str_has_prefix (GST_ELEMENT_NAME (e), bind->names[n])) {
5056       pad = gst_element_get_static_pad (e, "src");
5057       FAIL_UNLESS (pad);
5058
5059       if (!g_object_get_qdata (G_OBJECT (e), to_be_removed_quark ()))
5060         gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_IDLE, bind->f, bind->td,
5061             NULL);
5062       gst_object_unref (pad);
5063     }
5064   }
5065 }
5066
5067 static void
5068 count_audio_sources (const GValue * v, gpointer user_data)
5069 {
5070   GstElement *e;
5071
5072   e = g_value_get_object (v);
5073   FAIL_UNLESS (e);
5074
5075   // we don't want to count the sources that are in the process
5076   // of being removed asynchronously
5077   if (g_object_get_qdata (G_OBJECT (e), to_be_removed_quark ()))
5078     return;
5079
5080   if (g_str_has_prefix (GST_ELEMENT_NAME (e), "audiotestsrc"))
5081     ++ * (guint *) user_data;
5082 }
5083
5084 static void
5085 dynamic_pipeline_change_stress_swap_source (test_data * td)
5086 {
5087   dynamic_pipeline_change_stress_master_data *d = td->md;
5088   static const char *const names[] =
5089       { "source", "audiotestsrc", "videotestsrc" };
5090   block_if_named_data bind = { names, sizeof (names) / sizeof (names[0]),
5091     dynamic_pipeline_change_stress_source_blocked_switch_av, td
5092   };
5093   GstIterator *it;
5094
5095   /* we have two sources, we need to wait for both */
5096   d->n_blocks_left = d->n_blocks_done = 2;
5097
5098   it = gst_bin_iterate_sources (GST_BIN (td->p));
5099   d->adding_probes = TRUE;
5100   while (gst_iterator_foreach (it, block_if_named, &bind)) {
5101     GST_INFO_OBJECT (td->p, "Resync");
5102     gst_iterator_resync (it);
5103   }
5104   d->adding_probes = FALSE;
5105   gst_iterator_free (it);
5106 }
5107
5108 static void
5109 dynamic_pipeline_change_stress_change_audio_channel (test_data * td)
5110 {
5111   dynamic_pipeline_change_stress_master_data *d = td->md;
5112   static const char *const names[] = { "audiotestsrc" };
5113   block_if_named_data bind = { names, sizeof (names) / sizeof (names[0]),
5114     dynamic_pipeline_change_stress_source_blocked_change_audio_channel, td
5115   };
5116   GstIterator *it;
5117   guint audio_sources;
5118
5119   /* we have either zero or one audio source */
5120   it = gst_bin_iterate_sources (GST_BIN (td->p));
5121   audio_sources = 0;
5122   while (gst_iterator_foreach (it, count_audio_sources, &audio_sources)) {
5123     GST_INFO_OBJECT (td->p, "Resync");
5124     gst_iterator_resync (it);
5125   }
5126   gst_iterator_free (it);
5127   d->n_blocks_left = d->n_blocks_done = audio_sources;
5128
5129   it = gst_bin_iterate_sources (GST_BIN (td->p));
5130   d->adding_probes = TRUE;
5131   while (gst_iterator_foreach (it, block_if_named, &bind)) {
5132     GST_INFO_OBJECT (td->p, "Resync");
5133     gst_iterator_resync (it);
5134   }
5135   d->adding_probes = FALSE;
5136   gst_iterator_free (it);
5137 }
5138
5139 static gboolean
5140 dynamic_pipeline_change_stress_step (gpointer user_data)
5141 {
5142   test_data *td = user_data;
5143   dynamic_pipeline_change_stress_input_data *i = td->id;
5144   guint available, idx;
5145
5146   /* pick a random action among the ones we have left */
5147   available = i->n_switches_0 + i->n_switches_1;
5148   if (available == 0) {
5149     GST_INFO_OBJECT (td->p, "Destroying pipeline");
5150     FAIL_UNLESS (gst_element_set_state (td->p,
5151             GST_STATE_PLAYING) != GST_STATE_CHANGE_FAILURE);
5152     g_timeout_add (STEP_AT, stop_pipeline, td->p);
5153     return G_SOURCE_REMOVE;
5154   }
5155
5156   idx = rand () % available;
5157   if (idx < i->n_switches_0) {
5158     (*i->switcher0) (td);
5159     --i->n_switches_0;
5160     return G_SOURCE_REMOVE;
5161   }
5162   idx -= i->n_switches_0;
5163
5164   if (idx < i->n_switches_1) {
5165     (*i->switcher1) (td);
5166     --i->n_switches_1;
5167     return G_SOURCE_REMOVE;
5168   }
5169   idx -= i->n_switches_1;
5170
5171   return G_SOURCE_REMOVE;
5172 }
5173
5174 static void
5175 dynamic_pipeline_change_stress_on_state_changed (gpointer user_data)
5176 {
5177   test_data *td = user_data;
5178   dynamic_pipeline_change_stress_master_data *d = td->md;
5179
5180   if (!d->dynamic_pipeline_change_stress_scheduled) {
5181     d->dynamic_pipeline_change_stress_scheduled = TRUE;
5182     gst_object_ref (td->p);
5183     g_timeout_add (STEP_AT, dynamic_pipeline_change_stress_step, td);
5184   }
5185 }
5186
5187 static void
5188 dynamic_pipeline_change_stress (GstElement * source, gpointer user_data)
5189 {
5190   test_data *td = user_data;
5191   dynamic_pipeline_change_stress_master_data *d = td->md;
5192   GstStateChangeReturn ret;
5193
5194   g_mutex_init (&d->mutex);
5195   g_cond_init (&d->cond);
5196
5197   td->state_target = GST_STATE_PLAYING;
5198   td->state_changed_cb = dynamic_pipeline_change_stress_on_state_changed;
5199   ret = gst_element_set_state (source, GST_STATE_PLAYING);
5200   FAIL_UNLESS (ret == GST_STATE_CHANGE_ASYNC
5201       || ret == GST_STATE_CHANGE_SUCCESS);
5202 }
5203
5204 static void
5205 check_success_source_dynamic_pipeline_change_stress (gpointer user_data)
5206 {
5207   test_data *td = user_data;
5208   dynamic_pipeline_change_stress_input_data *i = td->id;
5209   dynamic_pipeline_change_stress_master_data *d = td->md;
5210
5211   FAIL_UNLESS (d->dynamic_pipeline_change_stress_scheduled);
5212   FAIL_UNLESS_EQUALS_INT (i->n_switches_0, 0);
5213   FAIL_UNLESS_EQUALS_INT (i->n_switches_1, 0);
5214
5215   g_cond_clear (&d->cond);
5216   g_mutex_clear (&d->mutex);
5217 }
5218
5219 GST_START_TEST (test_non_live_av_dynamic_pipeline_change_stress)
5220 {
5221   dynamic_pipeline_change_stress_input_data id = { 100,
5222     dynamic_pipeline_change_stress_swap_source, 100,
5223     dynamic_pipeline_change_stress_change_audio_channel
5224   };
5225   dynamic_pipeline_change_stress_master_data md = { 0 };
5226
5227   TEST_BASE (TEST_FEATURE_TEST_SOURCE | TEST_FEATURE_HAS_VIDEO,
5228       dynamic_pipeline_change_stress, NULL,
5229       check_success_source_dynamic_pipeline_change_stress, NULL, &id, &md,
5230       NULL);
5231 }
5232
5233 GST_END_TEST;
5234
5235 GST_START_TEST (test_non_live_av_2_dynamic_pipeline_change_stress)
5236 {
5237   dynamic_pipeline_change_stress_input_data id = { 100,
5238     dynamic_pipeline_change_stress_swap_source, 100,
5239     dynamic_pipeline_change_stress_change_audio_channel
5240   };
5241   dynamic_pipeline_change_stress_master_data md = { 0 };
5242
5243   TEST_BASE (TEST_FEATURE_TEST_SOURCE | TEST_FEATURE_HAS_VIDEO |
5244       TEST_FEATURE_SPLIT_SINKS, dynamic_pipeline_change_stress, NULL,
5245       check_success_source_dynamic_pipeline_change_stress, NULL, &id, &md,
5246       NULL);
5247 }
5248
5249 GST_END_TEST;
5250
5251 GST_START_TEST (test_live_av_dynamic_pipeline_change_stress)
5252 {
5253   dynamic_pipeline_change_stress_input_data id = { 100,
5254     dynamic_pipeline_change_stress_swap_source, 100,
5255     dynamic_pipeline_change_stress_change_audio_channel
5256   };
5257   dynamic_pipeline_change_stress_master_data md = { 0 };
5258
5259   TEST_BASE (TEST_FEATURE_LIVE_AV_SOURCE, dynamic_pipeline_change_stress, NULL,
5260       check_success_source_dynamic_pipeline_change_stress, NULL, &id, &md,
5261       NULL);
5262 }
5263
5264 GST_END_TEST;
5265
5266 GST_START_TEST (test_live_av_2_dynamic_pipeline_change_stress)
5267 {
5268   dynamic_pipeline_change_stress_input_data id = { 100,
5269     dynamic_pipeline_change_stress_swap_source, 100,
5270     dynamic_pipeline_change_stress_change_audio_channel
5271   };
5272   dynamic_pipeline_change_stress_master_data md = { 0 };
5273
5274   TEST_BASE (TEST_FEATURE_LIVE_AV_SOURCE | TEST_FEATURE_SPLIT_SINKS,
5275       dynamic_pipeline_change_stress, NULL,
5276       check_success_source_dynamic_pipeline_change_stress, NULL, &id, &md,
5277       NULL);
5278 }
5279
5280 GST_END_TEST;
5281
5282 /**** error from slave test ****/
5283
5284 typedef struct
5285 {
5286   gboolean crash;
5287 } error_from_slave_input_data;
5288
5289 typedef struct
5290 {
5291   gboolean second_pass;
5292   gboolean got_state_changed_to_playing_on_first_pass;
5293   gboolean got_error_on_first_pass;
5294   gboolean got_state_changed_to_playing_on_second_pass;
5295   gboolean got_error_on_second_pass;
5296 } error_from_slave_master_data;
5297
5298 static gboolean
5299 bump_through_NULL (gpointer user_data)
5300 {
5301   test_data *td = user_data;
5302   error_from_slave_input_data *i = td->id;
5303   error_from_slave_master_data *d = td->md;
5304   GstStateChangeReturn ret;
5305   GstElement *sink;
5306
5307   ret = gst_element_set_state (td->p, GST_STATE_NULL);
5308   if (!i->crash) {
5309     FAIL_UNLESS (ret == GST_STATE_CHANGE_SUCCESS);
5310   }
5311   FAIL_UNLESS (gst_element_get_state (td->p, NULL, NULL,
5312           GST_CLOCK_TIME_NONE) == GST_STATE_CHANGE_SUCCESS);
5313
5314   d->second_pass = TRUE;
5315
5316   if (i->crash) {
5317     recreate_crashed_slave_process ();
5318     /* give the process time to be created in the other process */
5319     g_usleep (500 * 1000);
5320
5321     /* reconnect to to slave process */
5322     sink = gst_bin_get_by_name (GST_BIN (td->p), "ipcpipelinesink");
5323     FAIL_UNLESS (sink);
5324     g_object_set (sink, "fdin", pipesba[0], "fdout", pipesfa[1], NULL);
5325     gst_object_unref (sink);
5326   }
5327
5328   ret = gst_element_set_state (td->p, GST_STATE_PLAYING);
5329   FAIL_UNLESS (ret == GST_STATE_CHANGE_SUCCESS
5330       || ret == GST_STATE_CHANGE_ASYNC);
5331
5332   g_timeout_add (STOP_AT, (GSourceFunc) stop_pipeline, td->p);
5333   return G_SOURCE_REMOVE;
5334 }
5335
5336 static void
5337 disconnect (const GValue * v, gpointer user_data)
5338 {
5339   GstElement *e;
5340
5341   e = g_value_get_object (v);
5342   FAIL_UNLESS (e);
5343   g_signal_emit_by_name (G_OBJECT (e), "disconnect", NULL);
5344 }
5345
5346 static gboolean
5347 error_from_slave_source_bus_msg (GstBus * bus, GstMessage * message,
5348     gpointer user_data)
5349 {
5350   test_data *td = user_data;
5351   error_from_slave_input_data *i = td->id;
5352   error_from_slave_master_data *d = td->md;
5353
5354   if (GST_MESSAGE_TYPE (message) == GST_MESSAGE_ERROR) {
5355     if (!d->second_pass) {
5356       if (!d->got_error_on_first_pass) {
5357         GstIterator *it;
5358
5359         d->got_error_on_first_pass = TRUE;
5360
5361         if (i->crash) {
5362           it = gst_bin_iterate_sinks (GST_BIN (td->p));
5363           while (gst_iterator_foreach (it, disconnect, NULL))
5364             gst_iterator_resync (it);
5365           gst_iterator_free (it);
5366         }
5367
5368         gst_object_ref (td->p);
5369         g_timeout_add (STEP_AT, bump_through_NULL, td);
5370       }
5371
5372       /* don't pass the expected error */
5373       return TRUE;
5374     }
5375   } else if (GST_MESSAGE_TYPE (message) == GST_MESSAGE_EOS) {
5376     if (!d->second_pass) {
5377       /* We'll get an expected EOS as the source reacts to the error */
5378       return TRUE;
5379     }
5380   }
5381   return master_bus_msg (bus, message, user_data);
5382 }
5383
5384 static void
5385 error_from_slave_on_state_changed (gpointer user_data)
5386 {
5387   test_data *td = user_data;
5388   error_from_slave_master_data *d = td->md;
5389
5390   if (d->second_pass)
5391     d->got_state_changed_to_playing_on_second_pass = TRUE;
5392   else
5393     d->got_state_changed_to_playing_on_first_pass = TRUE;
5394 }
5395
5396 static gboolean
5397 error_from_slave_position_getter (GstElement * element)
5398 {
5399   gint64 pos;
5400
5401   /* we do not care about the result */
5402   gst_element_query_position (element, GST_FORMAT_TIME, &pos);
5403
5404   return TRUE;
5405 }
5406
5407 static void
5408 error_from_slave_source (GstElement * source, gpointer user_data)
5409 {
5410   test_data *td = user_data;
5411   GstStateChangeReturn ret;
5412
5413   /* we're on the source, there's already the basic master_bus_msg watch,
5414      and gst doesn't want more than one watch, so we remove the watch and
5415      call it directly when done in the new watch */
5416   gst_bus_remove_watch (GST_ELEMENT_BUS (source));
5417   gst_bus_add_watch (GST_ELEMENT_BUS (source), error_from_slave_source_bus_msg,
5418       user_data);
5419   g_timeout_add (STEP_AT, (GSourceFunc) error_from_slave_position_getter,
5420       source);
5421
5422   td->state_changed_cb = error_from_slave_on_state_changed;
5423   td->state_target = GST_STATE_PLAYING;
5424   ret = gst_element_set_state (source, GST_STATE_PLAYING);
5425   FAIL_UNLESS (ret == GST_STATE_CHANGE_ASYNC);
5426 }
5427
5428 static gboolean
5429 error_from_slave_sink_bus_msg (GstBus * bus, GstMessage * message,
5430     gpointer user_data)
5431 {
5432   test_data *td = user_data;
5433   error_from_slave_input_data *i = td->id;
5434
5435   switch (GST_MESSAGE_TYPE (message)) {
5436     case GST_MESSAGE_ERROR:
5437       if (!strcmp (GST_ELEMENT_NAME (GST_MESSAGE_SRC (message)),
5438               "error-element"))
5439         g_object_set (GST_MESSAGE_SRC (message), "error-after", -1, NULL);
5440       break;
5441     case GST_MESSAGE_ASYNC_DONE:
5442       if (GST_IS_PIPELINE (GST_MESSAGE_SRC (message))) {
5443         /* We have two identical processes, and only one must crash. They can
5444            be distinguished by recovery_pid, however. */
5445         if (i->crash && recovery_pid)
5446           g_timeout_add (CRASH_AT, (GSourceFunc) crash, NULL);
5447       }
5448       break;
5449     default:
5450       break;
5451   }
5452   return TRUE;
5453 }
5454
5455 static void
5456 setup_sink_error_from_slave (GstElement * sink, gpointer user_data)
5457 {
5458   gst_bus_add_watch (GST_ELEMENT_BUS (sink), error_from_slave_sink_bus_msg,
5459       user_data);
5460 }
5461
5462 static void
5463 check_success_source_error_from_slave (gpointer user_data)
5464 {
5465   test_data *td = user_data;
5466   error_from_slave_master_data *d = td->md;
5467
5468   FAIL_UNLESS (d->second_pass);
5469   FAIL_UNLESS (d->got_state_changed_to_playing_on_first_pass);
5470   FAIL_UNLESS (d->got_state_changed_to_playing_on_second_pass);
5471   FAIL_UNLESS (d->got_error_on_first_pass);
5472   FAIL_IF (d->got_error_on_second_pass);
5473 }
5474
5475 GST_START_TEST (test_empty_error_from_slave)
5476 {
5477   error_from_slave_input_data id = { FALSE };
5478   error_from_slave_master_data md = { 0 };
5479
5480   TEST_BASE (TEST_FEATURE_TEST_SOURCE | TEST_FEATURE_ERROR_SINK,
5481       error_from_slave_source, setup_sink_error_from_slave,
5482       check_success_source_error_from_slave, NULL, &id, &md, NULL);
5483 }
5484
5485 GST_END_TEST;
5486
5487 GST_START_TEST (test_wavparse_error_from_slave)
5488 {
5489   error_from_slave_input_data id = { FALSE };
5490   error_from_slave_master_data md = { 0 };
5491
5492   TEST_BASE (TEST_FEATURE_WAV_SOURCE | TEST_FEATURE_ERROR_SINK,
5493       error_from_slave_source, setup_sink_error_from_slave,
5494       check_success_source_error_from_slave, NULL, &id, &md, NULL);
5495 }
5496
5497 GST_END_TEST;
5498
5499 GST_START_TEST (test_mpegts_error_from_slave)
5500 {
5501   error_from_slave_input_data id = { FALSE };
5502   error_from_slave_master_data md = { 0 };
5503
5504   TEST_BASE (TEST_FEATURE_MPEGTS_SOURCE | TEST_FEATURE_ERROR_SINK,
5505       error_from_slave_source, setup_sink_error_from_slave,
5506       check_success_source_error_from_slave, NULL, &id, &md, NULL);
5507 }
5508
5509 GST_END_TEST;
5510
5511 GST_START_TEST (test_mpegts_2_error_from_slave)
5512 {
5513   error_from_slave_input_data id = { FALSE };
5514   error_from_slave_master_data md = { 0 };
5515
5516   TEST_BASE (TEST_FEATURE_MPEGTS_SOURCE | TEST_FEATURE_ERROR_SINK |
5517       TEST_FEATURE_SPLIT_SINKS,
5518       error_from_slave_source, setup_sink_error_from_slave,
5519       check_success_source_error_from_slave, NULL, &id, &md, NULL);
5520 }
5521
5522 GST_END_TEST;
5523
5524 GST_START_TEST (test_live_a_error_from_slave)
5525 {
5526   error_from_slave_input_data id = { FALSE };
5527   error_from_slave_master_data md = { 0 };
5528
5529   TEST_BASE (TEST_FEATURE_LIVE_A_SOURCE | TEST_FEATURE_ERROR_SINK,
5530       error_from_slave_source, setup_sink_error_from_slave,
5531       check_success_source_error_from_slave, NULL, &id, &md, NULL);
5532 }
5533
5534 GST_END_TEST;
5535
5536 GST_START_TEST (test_live_av_error_from_slave)
5537 {
5538   error_from_slave_input_data id = { FALSE };
5539   error_from_slave_master_data md = { 0 };
5540
5541   TEST_BASE (TEST_FEATURE_LIVE_AV_SOURCE | TEST_FEATURE_ERROR_SINK,
5542       error_from_slave_source, setup_sink_error_from_slave,
5543       check_success_source_error_from_slave, NULL, &id, &md, NULL);
5544 }
5545
5546 GST_END_TEST;
5547
5548 GST_START_TEST (test_live_av_2_error_from_slave)
5549 {
5550   error_from_slave_input_data id = { FALSE };
5551   error_from_slave_master_data md = { 0 };
5552
5553   TEST_BASE (TEST_FEATURE_LIVE_AV_SOURCE | TEST_FEATURE_ERROR_SINK |
5554       TEST_FEATURE_SPLIT_SINKS,
5555       error_from_slave_source, setup_sink_error_from_slave,
5556       check_success_source_error_from_slave, NULL, &id, &md, NULL);
5557 }
5558
5559 GST_END_TEST;
5560
5561 GST_START_TEST (test_wavparse_slave_process_crash)
5562 {
5563   error_from_slave_input_data id = { TRUE };
5564   error_from_slave_master_data md = { 0 };
5565
5566   TEST_BASE (TEST_FEATURE_WAV_SOURCE | TEST_FEATURE_RECOVERY_SLAVE_PROCESS,
5567       error_from_slave_source, setup_sink_error_from_slave,
5568       check_success_source_error_from_slave, NULL, &id, &md, NULL);
5569 }
5570
5571 GST_END_TEST;
5572
5573 /**** master process crash test ****/
5574
5575 typedef struct
5576 {
5577   gboolean got_state_changed_to_playing;
5578 } master_process_crash_master_data;
5579
5580 typedef struct
5581 {
5582   gboolean got_error;
5583   gboolean got_eos;
5584 } master_process_crash_slave_data;
5585
5586 static void
5587 master_process_crash_on_state_changed (gpointer user_data)
5588 {
5589   test_data *td = user_data;
5590   master_process_crash_master_data *d = td->md;
5591
5592   if (!d->got_state_changed_to_playing) {
5593     d->got_state_changed_to_playing = TRUE;
5594
5595     /* We have two identical processes, and only one must crash. They can
5596        be distinguished by recovery_pid, however. */
5597     if (!recovery_pid)
5598       g_timeout_add (CRASH_AT, (GSourceFunc) crash, NULL);
5599   }
5600 }
5601
5602 static void
5603 master_process_crash_source (GstElement * source, gpointer user_data)
5604 {
5605   test_data *td = user_data;
5606   GstStateChangeReturn ret;
5607
5608   td->state_target = GST_STATE_PLAYING;
5609   td->state_changed_cb = master_process_crash_on_state_changed;
5610   ret = gst_element_set_state (source, GST_STATE_PLAYING);
5611   FAIL_UNLESS (ret == GST_STATE_CHANGE_ASYNC
5612       || ret == GST_STATE_CHANGE_SUCCESS);
5613 }
5614
5615 static GstPadProbeReturn
5616 master_process_crash_probe (GstPad * pad, GstPadProbeInfo * info,
5617     gpointer user_data)
5618 {
5619   test_data *td = user_data;
5620   master_process_crash_slave_data *d = td->sd;
5621
5622   if (GST_IS_EVENT (info->data)) {
5623     if (GST_EVENT_TYPE (info->data) == GST_EVENT_EOS) {
5624       d->got_eos = TRUE;
5625     }
5626   }
5627
5628   return GST_PAD_PROBE_OK;
5629 }
5630
5631 static void
5632 hook_master_process_crash_probe (const GValue * v, gpointer user_data)
5633 {
5634   hook_probe (v, master_process_crash_probe, user_data);
5635 }
5636
5637 static gboolean
5638 go_to_NULL_and_reconnect (gpointer user_data)
5639 {
5640   GstElement *pipeline = user_data;
5641   GstStateChangeReturn ret;
5642   GstElement *src;
5643
5644   ret = gst_element_set_state (pipeline, GST_STATE_NULL);
5645   FAIL_IF (ret == GST_STATE_CHANGE_FAILURE);
5646
5647   /* reconnect to to master process */
5648   src = gst_bin_get_by_name (GST_BIN (pipeline), "ipcpipelinesrc0");
5649   FAIL_UNLESS (src);
5650   g_object_set (src, "fdin", pipesfa[0], "fdout", pipesba[1], NULL);
5651   gst_object_unref (src);
5652
5653   gst_object_unref (pipeline);
5654   return G_SOURCE_REMOVE;
5655 }
5656
5657 static gboolean
5658 master_process_crash_bus_msg (GstBus * bus, GstMessage * message,
5659     gpointer user_data)
5660 {
5661   test_data *td = user_data;
5662   master_process_crash_slave_data *d = td->sd;
5663   GstIterator *it;
5664
5665   switch (GST_MESSAGE_TYPE (message)) {
5666     case GST_MESSAGE_ERROR:
5667       if (!d->got_error) {
5668         it = gst_bin_iterate_sources (GST_BIN (td->p));
5669         while (gst_iterator_foreach (it, disconnect, NULL))
5670           gst_iterator_resync (it);
5671         gst_iterator_free (it);
5672         g_timeout_add (10, (GSourceFunc) go_to_NULL_and_reconnect,
5673             gst_object_ref (td->p));
5674         d->got_error = TRUE;
5675       }
5676       break;
5677     default:
5678       break;
5679   }
5680   return TRUE;
5681 }
5682
5683 static void
5684 setup_sink_master_process_crash (GstElement * sink, gpointer user_data)
5685 {
5686   GstIterator *it;
5687
5688   it = gst_bin_iterate_sinks (GST_BIN (sink));
5689   while (gst_iterator_foreach (it, hook_master_process_crash_probe, user_data))
5690     gst_iterator_resync (it);
5691   gst_iterator_free (it);
5692
5693   gst_bus_add_watch (GST_ELEMENT_BUS (sink), master_process_crash_bus_msg,
5694       user_data);
5695 }
5696
5697 static void
5698 check_success_source_master_process_crash (gpointer user_data)
5699 {
5700   test_data *td = user_data;
5701   master_process_crash_master_data *d = td->md;
5702
5703   FAIL_UNLESS (d->got_state_changed_to_playing);
5704 }
5705
5706 static void
5707 check_success_sink_master_process_crash (gpointer user_data)
5708 {
5709   test_data *td = user_data;
5710   master_process_crash_slave_data *d = td->sd;
5711
5712   FAIL_UNLESS (d->got_error);
5713   FAIL_UNLESS (d->got_eos);
5714 }
5715
5716 GST_START_TEST (test_wavparse_master_process_crash)
5717 {
5718   master_process_crash_master_data md = { 0 };
5719   master_process_crash_slave_data sd = { 0 };
5720
5721   TEST_BASE (TEST_FEATURE_WAV_SOURCE | TEST_FEATURE_RECOVERY_MASTER_PROCESS,
5722       master_process_crash_source, setup_sink_master_process_crash,
5723       check_success_source_master_process_crash,
5724       check_success_sink_master_process_crash, NULL, &md, &sd);
5725 }
5726
5727 GST_END_TEST;
5728
5729 static Suite *
5730 ipcpipeline_suite (void)
5731 {
5732   Suite *s = suite_create ("ipcpipeline");
5733   TCase *tc_chain = tcase_create ("general");
5734
5735   setup_lock ();
5736
5737   suite_add_tcase (s, tc_chain);
5738   tcase_set_timeout (tc_chain, 180);
5739
5740   /* play_pause tests put the pipeline in PLAYING state, then in
5741      PAUSED state, then in PLAYING state again. The sink expects
5742      async-done messages or state change successes. */
5743   if (1) {
5744     tcase_add_test (tc_chain, test_empty_play_pause);
5745     tcase_add_test (tc_chain, test_wavparse_play_pause);
5746     tcase_add_test (tc_chain, test_mpegts_play_pause);
5747     tcase_add_test (tc_chain, test_mpegts_2_play_pause);
5748     tcase_add_test (tc_chain, test_live_a_play_pause);
5749     tcase_add_test (tc_chain, test_live_av_play_pause);
5750     tcase_add_test (tc_chain, test_live_av_2_play_pause);
5751   }
5752
5753   /* flushing_seek tests perform a flushing seek in PLAYING
5754      state. The sinks check a buffer with the target timestamp
5755      is received after the seek. */
5756   if (1) {
5757     tcase_add_test (tc_chain, test_empty_flushing_seek);
5758     tcase_add_test (tc_chain, test_wavparse_flushing_seek);
5759     tcase_add_test (tc_chain, test_mpegts_flushing_seek);
5760     tcase_add_test (tc_chain, test_mpegts_2_flushing_seek);
5761     tcase_add_test (tc_chain, test_live_a_flushing_seek);
5762     tcase_add_test (tc_chain, test_live_av_flushing_seek);
5763     tcase_add_test (tc_chain, test_live_av_2_flushing_seek);
5764   }
5765
5766   /* flushing_seek_in_pause tests perform a flushing seek in
5767      PAUSED state. These are disabled for live pipelines since
5768      those will not generate data in PAUSED, so we won't get
5769      a buffer. */
5770   if (1) {
5771     tcase_add_test (tc_chain, test_empty_flushing_seek_in_pause);
5772     tcase_add_test (tc_chain, test_wavparse_flushing_seek_in_pause);
5773     tcase_add_test (tc_chain, test_mpegts_flushing_seek_in_pause);
5774     tcase_add_test (tc_chain, test_mpegts_2_flushing_seek_in_pause);
5775     /* live scenarios skipped: live sources do not generate buffers
5776      * when paused */
5777   }
5778
5779   /* segment_seek tests perform a segment seek in PLAYING
5780      state. The sinks check a buffer with the target timestamp
5781      is received after the seek, and that a SEGMENT_DONE is
5782      received at the end of the segment. */
5783   if (1) {
5784     tcase_add_test (tc_chain, test_empty_segment_seek);
5785     tcase_add_test (tc_chain, test_wavparse_segment_seek);
5786     /* mpegts skipped: tsdemux does not support segment seeks */
5787     tcase_add_test (tc_chain, test_live_a_segment_seek);
5788     tcase_add_test (tc_chain, test_live_av_segment_seek);
5789     tcase_add_test (tc_chain, test_live_av_2_segment_seek);
5790   }
5791
5792   /* seek_stress tests perform stress testing on seeks, then waits
5793      in PLAYING for EOS or segment-done. */
5794   if (1) {
5795     tcase_add_test (tc_chain, test_empty_seek_stress);
5796     tcase_add_test (tc_chain, test_wavparse_seek_stress);
5797     tcase_add_test (tc_chain, test_mpegts_seek_stress);
5798     tcase_add_test (tc_chain, test_mpegts_2_seek_stress);
5799     tcase_add_test (tc_chain, test_live_a_seek_stress);
5800     tcase_add_test (tc_chain, test_live_av_seek_stress);
5801     tcase_add_test (tc_chain, test_live_av_2_seek_stress);
5802   }
5803
5804   /* upstream_query tests send position and duration queries, and
5805      checks the results are as expected. */
5806   if (1) {
5807     tcase_add_test (tc_chain, test_empty_upstream_query);
5808     tcase_add_test (tc_chain, test_wavparse_upstream_query);
5809     tcase_add_test (tc_chain, test_mpegts_upstream_query);
5810     tcase_add_test (tc_chain, test_mpegts_2_upstream_query);
5811     tcase_add_test (tc_chain, test_live_a_upstream_query);
5812     tcase_add_test (tc_chain, test_live_av_upstream_query);
5813     tcase_add_test (tc_chain, test_live_av_2_upstream_query);
5814   }
5815
5816   /* message tests send a sink message downstream, which causes
5817      the sinks to reply with the embedded event, which is checked.
5818      This is not possible when elements go into pull mode. */
5819   if (1) {
5820     tcase_add_test (tc_chain, test_empty_message);
5821     tcase_add_test (tc_chain, test_wavparse_message);
5822     /* mpegts skipped because it goes into pull mode:
5823        https://bugzilla.gnome.org/show_bug.cgi?id=751637 */
5824     tcase_add_test (tc_chain, test_live_a_message);
5825     tcase_add_test (tc_chain, test_live_av_message);
5826     tcase_add_test (tc_chain, test_live_av_2_message);
5827   }
5828
5829   /* end_of_stream tests check the EOS event and message are
5830      properly received when the stream reaches its end. */
5831   if (1) {
5832     tcase_add_test (tc_chain, test_empty_end_of_stream);
5833     tcase_add_test (tc_chain, test_wavparse_end_of_stream);
5834     tcase_add_test (tc_chain, test_mpegts_end_of_stream);
5835     tcase_add_test (tc_chain, test_mpegts_2_end_of_stream);
5836     tcase_add_test (tc_chain, test_live_a_end_of_stream);
5837     tcase_add_test (tc_chain, test_live_av_end_of_stream);
5838     tcase_add_test (tc_chain, test_live_av_2_end_of_stream);
5839   }
5840
5841   /* reverse_playback tests issue a seek with negative rate,
5842      and check buffers timestamp are in decreasing order.
5843      This does not work with sources which do not support
5844      negative playback rate (live ones, and some demuxers). */
5845   if (1) {
5846     /* wavparse and tsdemux does not support backward playback */
5847     tcase_add_test (tc_chain, test_a_reverse_playback);
5848     tcase_add_test (tc_chain, test_av_reverse_playback);
5849     tcase_add_test (tc_chain, test_av_2_reverse_playback);
5850   }
5851
5852   /* tags tests check tags are carried to the slave. */
5853   if (1) {
5854     tcase_add_test (tc_chain, test_empty_tags);
5855     tcase_add_test (tc_chain, test_wavparse_tags);
5856     tcase_add_test (tc_chain, test_mpegts_tags);
5857     tcase_add_test (tc_chain, test_mpegts_2_tags);
5858     tcase_add_test (tc_chain, test_live_a_tags);
5859     tcase_add_test (tc_chain, test_live_av_tags);
5860     tcase_add_test (tc_chain, test_live_av_2_tags);
5861   }
5862
5863   /* reconfigure tests that pipeline reconfiguration via
5864      the reconfigure event works */
5865   if (1) {
5866     tcase_add_test (tc_chain, test_non_live_a_reconfigure);
5867     tcase_add_test (tc_chain, test_non_live_av_reconfigure);
5868     tcase_add_test (tc_chain, test_live_a_reconfigure);
5869     tcase_add_test (tc_chain, test_live_av_reconfigure);
5870   }
5871
5872   /* state_change tests issue a number of state changes in
5873      (hopefully) all interesting configurations, and checks
5874      the state changes occured on the slave pipeline. The links
5875      are disconnected and reconnected to check it all still
5876      works after this. */
5877   if (1) {
5878     tcase_add_test (tc_chain, test_empty_state_changes);
5879     tcase_add_test (tc_chain, test_wavparse_state_changes);
5880     tcase_add_test (tc_chain, test_mpegts_state_changes);
5881     tcase_add_test (tc_chain, test_mpegts_2_state_changes);
5882     /* live scenarios skipped: live sources will cause no buffer
5883      * to flow in PAUSED, so the pipeline will only finish READY->PAUSED
5884      * once switching to PLAYING */
5885   }
5886
5887   /* state_changes_stress tests change state randomly and rapidly. */
5888   if (1) {
5889     tcase_add_test (tc_chain, test_empty_state_changes_stress);
5890     tcase_add_test (tc_chain, test_wavparse_state_changes_stress);
5891     tcase_add_test (tc_chain, test_mpegts_state_changes_stress);
5892     tcase_add_test (tc_chain, test_mpegts_2_state_changes_stress);
5893     tcase_add_test (tc_chain, test_live_a_state_changes_stress);
5894     tcase_add_test (tc_chain, test_live_av_state_changes_stress);
5895     tcase_add_test (tc_chain, test_live_av_2_state_changes_stress);
5896   }
5897
5898   /* serialized_query tests checks that a serialized query is
5899      handled by the slave pipeline. */
5900   if (1) {
5901     tcase_add_test (tc_chain, test_empty_serialized_query);
5902     tcase_add_test (tc_chain, test_wavparse_serialized_query);
5903     tcase_add_test (tc_chain, test_mpegts_serialized_query);
5904     tcase_add_test (tc_chain, test_mpegts_2_serialized_query);
5905     tcase_add_test (tc_chain, test_live_a_serialized_query);
5906     tcase_add_test (tc_chain, test_live_av_serialized_query);
5907     tcase_add_test (tc_chain, test_live_av_2_serialized_query);
5908   }
5909
5910   /* non_serialized_event tests checks that a non serialized event
5911      is handled by the slave pipeline. */
5912   if (1) {
5913     tcase_add_test (tc_chain, test_empty_non_serialized_event);
5914     tcase_add_test (tc_chain, test_wavparse_non_serialized_event);
5915     tcase_add_test (tc_chain, test_mpegts_non_serialized_event);
5916     tcase_add_test (tc_chain, test_mpegts_2_non_serialized_event);
5917     tcase_add_test (tc_chain, test_live_a_non_serialized_event);
5918     tcase_add_test (tc_chain, test_live_av_non_serialized_event);
5919     tcase_add_test (tc_chain, test_live_av_2_non_serialized_event);
5920   }
5921
5922   /* meta tests checks that GstMeta on buffers are correctly
5923      received by the slave pipeline. */
5924   if (1) {
5925     tcase_add_test (tc_chain, test_empty_meta);
5926     tcase_add_test (tc_chain, test_wavparse_meta);
5927     tcase_add_test (tc_chain, test_mpegts_meta);
5928     tcase_add_test (tc_chain, test_mpegts_2_meta);
5929     tcase_add_test (tc_chain, test_live_a_meta);
5930     tcase_add_test (tc_chain, test_live_av_meta);
5931     tcase_add_test (tc_chain, test_live_av_2_meta);
5932   }
5933
5934   /* source_change tests checks that the pipelines can handle a
5935      change of source/caps. */
5936   if (1) {
5937     tcase_add_test (tc_chain, test_non_live_source_change);
5938     tcase_add_test (tc_chain, test_live_av_source_change);
5939     tcase_add_test (tc_chain, test_live_av_2_source_change);
5940   }
5941
5942   /* navigation tests checks that navigation events from the slave
5943      are received by the master. */
5944   if (1) {
5945     tcase_add_test (tc_chain, test_non_live_av_navigation);
5946     tcase_add_test (tc_chain, test_non_live_av_2_navigation);
5947     tcase_add_test (tc_chain, test_live_av_navigation);
5948     tcase_add_test (tc_chain, test_live_av_2_navigation);
5949   }
5950
5951   /* dynamic_pipeline_change_stress tests stress tests dynamic
5952      pipeline changes. */
5953   if (1) {
5954     tcase_add_test (tc_chain, test_non_live_av_dynamic_pipeline_change_stress);
5955     tcase_add_test (tc_chain,
5956         test_non_live_av_2_dynamic_pipeline_change_stress);
5957     tcase_add_test (tc_chain, test_live_av_dynamic_pipeline_change_stress);
5958     tcase_add_test (tc_chain, test_live_av_2_dynamic_pipeline_change_stress);
5959   }
5960
5961   /* error_from_slave tests checks an error message issued
5962      by the slave pipeline is received by the master pipeline. */
5963   if (1) {
5964     tcase_add_test (tc_chain, test_empty_error_from_slave);
5965     tcase_add_test (tc_chain, test_wavparse_error_from_slave);
5966     tcase_add_test (tc_chain, test_mpegts_error_from_slave);
5967     tcase_add_test (tc_chain, test_mpegts_2_error_from_slave);
5968     tcase_add_test (tc_chain, test_live_a_error_from_slave);
5969     tcase_add_test (tc_chain, test_live_av_error_from_slave);
5970     tcase_add_test (tc_chain, test_live_av_2_error_from_slave);
5971   }
5972
5973   /* slave_process_crash tests test that a crash of the slave
5974      process can be recovered from by the master, which can
5975      replace the slave process and continue. */
5976   tcase_add_test (tc_chain, test_wavparse_slave_process_crash);
5977
5978   /* master_process_crash tests test that a crash of the master
5979      process can be recovered from by the slave. I don't recall
5980      how the recovery from that works, but it does! A watchdog
5981      process replaces the master process, and the slave will
5982      go to NULL and reconnect after it gets a timeout talking
5983      with the master pipeline. */
5984   tcase_add_test (tc_chain, test_wavparse_master_process_crash);
5985
5986   return s;
5987 }
5988
5989 GST_CHECK_MAIN (ipcpipeline);