GST_END_TEST;
+static GstElement *selector;
+static GstPad *output_pad;
+static GstPad *stream1_pad;
+static GstPad *stream2_pad;
+
+static gboolean eos_received;
+static gulong eos_probe;
+static GMutex eos_probe_lock;
+static GCond eos_probe_cond;
+
+enum InputSelectorResult
+{
+ INPUT_SELECTOR_FORWARD,
+ INPUT_SELECTOR_DROP
+};
+
+static GstPadProbeReturn
+eos_pushed_probe (GstPad * pad, GstPadProbeInfo * info, gpointer udata)
+{
+ g_mutex_lock (&eos_probe_lock);
+ if (GST_EVENT_TYPE (info->data) == GST_EVENT_EOS) {
+ eos_received = TRUE;
+ g_cond_broadcast (&eos_probe_cond);
+ }
+ g_mutex_unlock (&eos_probe_lock);
+
+ return GST_PAD_PROBE_OK;
+}
+
+static void
+setup_input_selector_with_2_streams (gint active_stream)
+{
+ eos_received = FALSE;
+ g_mutex_init (&eos_probe_lock);
+ g_cond_init (&eos_probe_cond);
+
+ selector = gst_check_setup_element ("input-selector");
+ output_pad = gst_check_setup_sink_pad (selector, &sinktemplate);
+
+ gst_pad_set_active (output_pad, TRUE);
+ stream1_pad = setup_input_pad (selector);
+ stream2_pad = setup_input_pad (selector);
+
+ if (active_stream == 1) {
+ g_object_set (selector, "active-pad", GST_PAD_PEER (stream1_pad), NULL);
+ } else {
+ g_object_set (selector, "active-pad", GST_PAD_PEER (stream2_pad), NULL);
+ }
+
+ eos_probe =
+ gst_pad_add_probe (output_pad, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM,
+ eos_pushed_probe, NULL, NULL);
+
+ fail_unless (gst_element_set_state (selector,
+ GST_STATE_PLAYING) == GST_STATE_CHANGE_SUCCESS,
+ "could not set to playing");
+
+ gst_check_setup_events_with_stream_id (stream1_pad, selector, NULL,
+ GST_FORMAT_TIME, "stream-1-id");
+ gst_check_setup_events_with_stream_id (stream2_pad, selector, NULL,
+ GST_FORMAT_TIME, "stream-2-id");
+}
+
+static void
+teardown_input_selector_with_2_streams (void)
+{
+ fail_unless (gst_element_set_state (selector,
+ GST_STATE_NULL) == GST_STATE_CHANGE_SUCCESS, "could not set to null");
+
+ gst_pad_remove_probe (output_pad, eos_probe);
+
+ gst_pad_set_active (output_pad, FALSE);
+ gst_check_teardown_sink_pad (selector);
+ gst_check_teardown_element (selector);
+
+ g_mutex_clear (&eos_probe_lock);
+ g_cond_clear (&eos_probe_cond);
+}
+
+static void
+input_selector_push_buffer (gint stream, enum InputSelectorResult res)
+{
+ GstBuffer *buf;
+ GstPad *pad = stream == 1 ? stream1_pad : stream2_pad;
+
+ buf = gst_buffer_new ();
+ fail_unless (buffers == NULL);
+ fail_unless (gst_pad_push (pad, buf) == GST_FLOW_OK);
+
+ if (res == INPUT_SELECTOR_DROP) {
+ fail_unless (buffers == NULL);
+ } else {
+ fail_unless (buffers != NULL);
+ fail_unless (buffers->data == buf);
+ g_list_free_full (buffers, (GDestroyNotify) gst_buffer_unref);
+ buffers = NULL;
+ }
+}
+
+static gpointer
+input_selector_do_push_eos (GstPad * pad)
+{
+ gst_pad_push_event (pad, gst_event_new_eos ());
+ return NULL;
+}
+
+static void
+input_selector_check_eos (gint present)
+{
+ GstEvent *eos;
+
+ eos = gst_pad_get_sticky_event (output_pad, GST_EVENT_EOS, 0);
+ if (present) {
+ fail_unless (eos != NULL);
+ gst_event_unref (eos);
+ } else {
+ fail_unless (eos == NULL);
+ }
+}
+
+static void
+input_selector_push_eos (gint stream, gboolean active)
+{
+ GstPad *pad = stream == 1 ? stream1_pad : stream2_pad;
+
+ if (active) {
+ fail_unless (gst_pad_push_event (pad, gst_event_new_eos ()));
+ } else {
+ /* The non-active pads will block when receving eos, so we need to do it
+ * from a separate thread. This makes this test racy, but it should only
+ * cause false positives, not false negatives */
+ GThread *t = g_thread_new ("selector-test-push-eos",
+ (GThreadFunc) input_selector_do_push_eos, pad);
+
+ /* Sleep half a second to allow the other thread to execute, this is not
+ * a definitive solution but there is no way to know when the
+ * EOS has reached input-selector and blocked there, so this is just
+ * to reduce the possibility of this test being racy (false positives)
+ */
+ g_usleep (0.5 * G_USEC_PER_SEC);
+ g_thread_unref (t);
+ }
+
+ input_selector_check_eos (active);
+}
+
+GST_START_TEST (test_input_selector_empty_stream)
+{
+ setup_input_selector_with_2_streams (2);
+
+ /* stream1 is the empty stream, stream2 has data */
+
+ /* empty stream is just an EOS and it should not be forwarded */
+ input_selector_push_eos (1, FALSE);
+
+ input_selector_push_buffer (2, INPUT_SELECTOR_FORWARD);
+ input_selector_push_eos (2, TRUE);
+
+ teardown_input_selector_with_2_streams ();
+}
+
+GST_END_TEST;
+
+
+GST_START_TEST (test_input_selector_shorter_stream)
+{
+ setup_input_selector_with_2_streams (2);
+
+ /* stream1 is shorter than stream2 */
+
+ input_selector_push_buffer (2, INPUT_SELECTOR_FORWARD);
+ input_selector_push_buffer (1, INPUT_SELECTOR_DROP);
+ input_selector_push_buffer (2, INPUT_SELECTOR_FORWARD);
+ input_selector_push_buffer (2, INPUT_SELECTOR_FORWARD);
+
+ /* EOS from inactive stream should not go through */
+ input_selector_push_eos (1, FALSE);
+
+ /* buffers from active stream can still flow */
+ input_selector_push_buffer (2, INPUT_SELECTOR_FORWARD);
+
+ /* EOS from active stream should go through */
+ input_selector_push_eos (2, TRUE);
+
+ teardown_input_selector_with_2_streams ();
+}
+
+GST_END_TEST;
+
+
+GST_START_TEST (test_input_selector_switch_to_eos_stream)
+{
+ setup_input_selector_with_2_streams (2);
+
+ /* stream1 receives eos before stream2 and then we switch to it */
+
+ input_selector_push_buffer (2, INPUT_SELECTOR_FORWARD);
+ input_selector_push_buffer (1, INPUT_SELECTOR_DROP);
+ input_selector_push_buffer (2, INPUT_SELECTOR_FORWARD);
+ input_selector_push_buffer (2, INPUT_SELECTOR_FORWARD);
+ input_selector_push_buffer (1, INPUT_SELECTOR_DROP);
+
+ /* EOS from inactive stream should not go through */
+ input_selector_push_eos (1, FALSE);
+
+ /* buffers from active stream can still flow */
+ input_selector_push_buffer (2, INPUT_SELECTOR_FORWARD);
+ input_selector_push_buffer (2, INPUT_SELECTOR_FORWARD);
+ input_selector_push_buffer (2, INPUT_SELECTOR_FORWARD);
+
+ /* now switch to stream1 */
+ g_object_set (selector, "active-pad", GST_PAD_PEER (stream1_pad), NULL);
+
+ /* wait for eos (it runs from a separate thread) */
+ g_mutex_lock (&eos_probe_lock);
+ while (!eos_received) {
+ g_cond_wait (&eos_probe_cond, &eos_probe_lock);
+ }
+ g_mutex_unlock (&eos_probe_lock);
+
+ teardown_input_selector_with_2_streams ();
+}
+
+GST_END_TEST;
+
GST_START_TEST (test_output_selector_no_srcpad_negotiation)
{
suite_add_tcase (s, tc_chain);
tcase_add_test (tc_chain, test_output_selector_buffer_count);
tcase_add_test (tc_chain, test_input_selector_buffer_count);
+ tcase_add_test (tc_chain, test_input_selector_empty_stream);
+ tcase_add_test (tc_chain, test_input_selector_shorter_stream);
+ tcase_add_test (tc_chain, test_input_selector_switch_to_eos_stream);
tcase_add_test (tc_chain, test_output_selector_no_srcpad_negotiation);
tc_chain = tcase_create ("output-selector-negotiation");