GST_END_TEST;
+static GstPadProbeReturn
+_drop_buffer_probe_cb (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
+{
+ gint wait;
+
+ if (GST_IS_BUFFER (info->data)) {
+ wait = GPOINTER_TO_INT (user_data);
+ if (wait > 0)
+ g_usleep (wait / 1000);
+ return GST_PAD_PROBE_DROP;
+ }
+
+ return GST_PAD_PROBE_PASS;
+}
+
+#define TIMEOUT_NUM_BUFFERS 20
+static void
+_test_timeout (gint buffer_wait)
+{
+ GstBus *bus;
+ GstMessage *msg;
+ GstElement *pipeline, *src, *src1, *agg, *sink;
+ GstPad *src1pad;
+
+ gint count = 0;
+
+ pipeline = gst_pipeline_new ("pipeline");
+ src = gst_element_factory_make ("fakesrc", NULL);
+ g_object_set (src, "num-buffers", TIMEOUT_NUM_BUFFERS, "sizetype", 2,
+ "sizemax", 4, NULL);
+
+ src1 = gst_element_factory_make ("fakesrc", NULL);
+ g_object_set (src1, "num-buffers", TIMEOUT_NUM_BUFFERS, "sizetype", 2,
+ "sizemax", 4, NULL);
+
+ agg = gst_check_setup_element ("testaggregator");
+ g_object_set (agg, "timeout", 1000 /* 1 us */ , NULL);
+ sink = gst_check_setup_element ("fakesink");
+ g_object_set (sink, "signal-handoffs", TRUE, NULL);
+ g_signal_connect (sink, "handoff", (GCallback) handoff, &count);
+
+ fail_unless (gst_bin_add (GST_BIN (pipeline), src));
+ fail_unless (gst_bin_add (GST_BIN (pipeline), src1));
+ fail_unless (gst_bin_add (GST_BIN (pipeline), agg));
+ fail_unless (gst_bin_add (GST_BIN (pipeline), sink));
+
+ src1pad = gst_element_get_static_pad (src1, "src");
+ fail_if (src1pad == NULL);
+ gst_pad_add_probe (src1pad, GST_PAD_PROBE_TYPE_BLOCK_DOWNSTREAM,
+ (GstPadProbeCallback) _drop_buffer_probe_cb,
+ GINT_TO_POINTER (buffer_wait), NULL);
+
+ fail_unless (gst_element_link (src, agg));
+ fail_unless (gst_element_link (src1, agg));
+ fail_unless (gst_element_link (agg, sink));
+
+ bus = gst_element_get_bus (pipeline);
+ fail_if (bus == NULL);
+ gst_element_set_state (pipeline, GST_STATE_PLAYING);
+
+ msg = gst_bus_poll (bus, GST_MESSAGE_EOS | GST_MESSAGE_ERROR, -1);
+ fail_if (GST_MESSAGE_TYPE (msg) != GST_MESSAGE_EOS);
+ gst_message_unref (msg);
+
+ /* cannot rely on the exact number of buffers as the timeout may produce
+ * more buffers with the unsynchronized _aggregate() implementation in
+ * testaggregator */
+ fail_if (count < TIMEOUT_NUM_BUFFERS);
+
+ gst_element_set_state (pipeline, GST_STATE_NULL);
+ gst_object_unref (src1pad);
+ gst_object_unref (bus);
+ gst_object_unref (pipeline);
+}
+
+GST_START_TEST (test_timeout_pipeline)
+{
+ _test_timeout (0);
+}
+
+GST_END_TEST;
+
+GST_START_TEST (test_timeout_pipeline_with_wait)
+{
+ _test_timeout (1000000 /* 1 ms */ );
+}
+
+GST_END_TEST;
+
GST_START_TEST (test_flushing_seek)
{
GstEvent *event;
tcase_add_test (general, test_infinite_seek_50_src);
tcase_add_test (general, test_linear_pipeline);
tcase_add_test (general, test_two_src_pipeline);
+ tcase_add_test (general, test_timeout_pipeline);
+ tcase_add_test (general, test_timeout_pipeline_with_wait);
tcase_add_test (general, test_add_remove);
tcase_add_test (general, test_change_state_intensive);