GST_END_TEST;
+static GstPadProbeReturn
+appsrc_pad_probe (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
+{
+ GList **expected = (GList **) (user_data);
+ GList *next;
+ GstEvent *exp;
+ GstBuffer *exp_buf;
+
+ if (GST_IS_EVENT (GST_PAD_PROBE_INFO_DATA (info))) {
+ GstEvent *ev = GST_PAD_PROBE_INFO_EVENT (info);
+ GST_DEBUG ("Got event %s", GST_EVENT_TYPE_NAME (ev));
+ switch (GST_EVENT_TYPE (ev)) {
+ case GST_EVENT_SEGMENT:
+ {
+ fail_if (*expected == NULL,
+ "appsrc pushed a SEGMENT event but we didn't expect any");
+ next = (*expected)->next;
+ fail_unless (GST_IS_EVENT ((*expected)->data),
+ "appsrc pushed a SEGMENT event but we expected any others");
+ exp = GST_EVENT ((*expected)->data);
+ fail_unless (GST_EVENT_TYPE (ev) == GST_EVENT_TYPE (exp),
+ "Got event of type %s but expected event was %s",
+ GST_EVENT_TYPE_NAME (ev), GST_EVENT_TYPE_NAME (exp));
+
+ {
+ const GstSegment *recvseg, *expectseg;
+
+ /* Compare segment values */
+ gst_event_parse_segment (ev, &recvseg);
+ gst_event_parse_segment (exp, &expectseg);
+
+ fail_unless_equals_int (recvseg->format, expectseg->format);
+ fail_unless_equals_uint64 (recvseg->offset, expectseg->offset);
+ fail_unless_equals_uint64 (recvseg->start, expectseg->start);
+ fail_unless_equals_uint64 (recvseg->stop, expectseg->stop);
+ fail_unless_equals_uint64 (recvseg->time, expectseg->time);
+ }
+
+ gst_event_unref (exp);
+ g_list_free1 (*expected);
+ *expected = next;
+ }
+ break;
+ case GST_EVENT_EOS:
+ fail_if (*expected == NULL,
+ "appsrc pushed a EOS event but we didn't expect any");
+ next = (*expected)->next;
+ fail_unless (GST_IS_EVENT ((*expected)->data),
+ "appsrc pushed a EOS event but we expected any others");
+ exp = GST_EVENT ((*expected)->data);
+ fail_unless (GST_EVENT_TYPE (ev) == GST_EVENT_TYPE (exp),
+ "Got event of type %s but expected event was %s",
+ GST_EVENT_TYPE_NAME (ev), GST_EVENT_TYPE_NAME (exp));
+
+ gst_event_unref (exp);
+ g_list_free1 (*expected);
+ *expected = next;
+ break;
+ case GST_EVENT_CAPS:
+ {
+ GstCaps *caps;
+ fail_if (*expected == NULL,
+ "appsrc pushed a CAPS event but we didn't expect any");
+ next = (*expected)->next;
+ fail_unless (GST_IS_EVENT ((*expected)->data),
+ "appsrc pushed a CAPS event but we expected any others");
+ exp = GST_EVENT ((*expected)->data);
+ fail_unless (GST_EVENT_TYPE (ev) == GST_EVENT_TYPE (exp),
+ "Got event of type %s but expected event was %s",
+ GST_EVENT_TYPE_NAME (ev), GST_EVENT_TYPE_NAME (exp));
+ gst_event_parse_caps (ev, &caps);
+ GST_DEBUG ("caps set to : %" GST_PTR_FORMAT, caps);
+
+ gst_event_unref (exp);
+ g_list_free1 (*expected);
+ *expected = next;
+ break;
+ }
+
+ default:
+ break;
+ }
+ } else if (GST_IS_BUFFER (GST_PAD_PROBE_INFO_DATA (info))) {
+ GstBuffer *recvbuf = GST_PAD_PROBE_INFO_BUFFER (info);
+ GST_DEBUG ("Got buffer");
+ fail_if (*expected == NULL,
+ "appsrc pushed a buffer but we didn't expect any");
+ next = (*expected)->next;
+ fail_unless (GST_IS_BUFFER ((*expected)->data),
+ "appsrc pushed a buffer but we expected that it's not a event");
+
+ exp_buf = GST_BUFFER ((*expected)->data);
+ fail_unless_equals_uint64 (GST_BUFFER_PTS (recvbuf),
+ GST_BUFFER_PTS (exp_buf));
+ fail_unless_equals_uint64 (GST_BUFFER_DTS (recvbuf),
+ GST_BUFFER_DTS (exp_buf));
+ fail_unless_equals_uint64 (GST_BUFFER_DURATION (recvbuf),
+ GST_BUFFER_DURATION (exp_buf));
+
+ g_list_free1 (*expected);
+ *expected = next;
+ }
+
+ return GST_PAD_PROBE_OK;
+}
+
+/* Assuming application driven streaming with multiple period.
+ * application provides custom segment per each period */
+GST_START_TEST (test_appsrc_period_with_custom_segment)
+{
+ GstElement *pipe, *src, *sink;
+ GstMessage *msg;
+ gint i, j, period;
+ GstAppSrcCallbacks cb = { 0 };
+ GstAppStreamType modes[] = { GST_APP_STREAM_TYPE_STREAM,
+ GST_APP_STREAM_TYPE_SEEKABLE
+ };
+ GstSegment segment;
+ GstSample *sample;
+ GstBuffer *buffer;
+ GstClockTime period_duration = 5 * GST_SECOND;
+ GstEvent *event;
+ gulong probe_id;
+ GstPad *pad;
+ GList *expected = NULL;
+
+ for (i = 0; i < G_N_ELEMENTS (modes); i++) {
+ /* mode 0: stream-type == GST_APP_STREAM_TYPE_STREAM
+ * mode 1: stream-type == GST_APP_STREAM_TYPE_SEEKABLE */
+
+ GST_INFO ("checking mode %d", modes[i]);
+
+ pipe = gst_pipeline_new ("pipeline");
+ src = gst_element_factory_make ("appsrc", NULL);
+ sink = gst_element_factory_make ("fakesink", NULL);
+ gst_bin_add_many (GST_BIN (pipe), src, sink, NULL);
+ fail_unless (gst_element_link (src, sink) == TRUE);
+ pad = gst_element_get_static_pad (sink, "sink");
+
+ probe_id = gst_pad_add_probe (pad,
+ GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM,
+ (GstPadProbeCallback) appsrc_pad_probe, &expected, NULL);
+
+ g_object_set (G_OBJECT (src), "stream-type", modes[i], "format",
+ GST_FORMAT_TIME, "handle-segment-change", TRUE, NULL);
+
+ if (modes[i] != GST_APP_STREAM_TYPE_STREAM) {
+ cb.seek_data = seek_cb;
+ gst_app_src_set_callbacks (GST_APP_SRC (src), &cb, NULL, NULL);
+ }
+
+ ASSERT_SET_STATE (pipe, GST_STATE_PLAYING, GST_STATE_CHANGE_ASYNC);
+
+ /* 2 periods exits */
+ for (period = 0; period < 2; period++) {
+ /* Total presentation timeline is form 0 sec to 10 sec
+ * - Each period's first PTS is 1 sec and last PTS is 5 sec
+ * - First period has presentation timeline with 0 ~ 5
+ * - Last period has presentation timeline with 5 ~ 10
+ */
+
+ /* PREPARE SEGMENT */
+ gst_segment_init (&segment, GST_FORMAT_TIME);
+ segment.start = segment.position = GST_SECOND;
+ segment.time = period * period_duration;
+ segment.base = period * period_duration;
+
+ /* PREPARE BUFFER */
+ buffer = gst_buffer_new_and_alloc (4);
+ GST_BUFFER_DTS (buffer) = GST_BUFFER_PTS (buffer) = GST_SECOND;
+ GST_BUFFER_DURATION (buffer) = GST_SECOND;
+
+ /* PREPARE SAMPLE */
+ sample = gst_sample_new (buffer, NULL, &segment, NULL);
+
+ expected = g_list_append (expected, gst_event_new_segment (&segment));
+ expected = g_list_append (expected, buffer);
+
+ /* 1st sample includes buffer and segment */
+ fail_unless (gst_app_src_push_sample (GST_APP_SRC (src), sample)
+ == GST_FLOW_OK);
+
+ /* CLEAN UP */
+ gst_buffer_unref (buffer);
+ gst_sample_unref (sample);
+
+ /* Push the left buffers in the current period */
+ for (j = 2; j <= 5; j++) {
+ buffer = gst_buffer_new_and_alloc (4);
+ GST_BUFFER_DTS (buffer) = GST_BUFFER_PTS (buffer) = j * GST_SECOND;
+ GST_BUFFER_DURATION (buffer) = GST_SECOND;
+ expected = g_list_append (expected, buffer);
+ fail_unless (gst_app_src_push_buffer (GST_APP_SRC (src), buffer)
+ == GST_FLOW_OK);
+ }
+ }
+
+ if (modes[i] != GST_APP_STREAM_TYPE_STREAM) {
+ /* Client request seek to 7 sec position (which belongs to 2nd period)
+ * Application must provides corresponding buffer (of 2nd period) with
+ * new custom segment */
+
+ GstClockTime requested_pos = 7 * GST_SECOND;
+ /* In this test case, we are checking the serialized order of
+ * events and buffers, so, give some time to the appsrc loop to
+ * push all to sink */
+ g_usleep (G_USEC_PER_SEC * 1);
+
+ GST_DEBUG ("Seek to %" GST_TIME_FORMAT, GST_TIME_ARGS (requested_pos));
+ event = gst_event_new_seek (1.0, GST_FORMAT_TIME, GST_SEEK_FLAG_FLUSH,
+ GST_SEEK_TYPE_SET, requested_pos, GST_SEEK_TYPE_NONE, -1);
+ fail_unless (gst_element_send_event (pipe, event) == TRUE);
+
+ /* PREPARE SEGMENT */
+ gst_segment_init (&segment, GST_FORMAT_TIME);
+ segment.start = segment.position = 3 * GST_SECOND;
+ segment.time = requested_pos;
+
+ /* PREPARE BUFFER */
+ buffer = gst_buffer_new_and_alloc (4);
+ GST_BUFFER_DTS (buffer) = GST_BUFFER_PTS (buffer) = 3 * GST_SECOND;
+
+ /* PREPARE SAMPLE */
+ sample = gst_sample_new (buffer, NULL, &segment, NULL);
+
+ expected = g_list_append (expected, gst_event_new_segment (&segment));
+ expected = g_list_append (expected, buffer);
+
+ /* 1st sample includes buffer and segment */
+ fail_unless (gst_app_src_push_sample (GST_APP_SRC (src), sample)
+ == GST_FLOW_OK);
+
+ /* CLEAN UP */
+ gst_buffer_unref (buffer);
+ gst_sample_unref (sample);
+
+ /* Push the left buffers in the current period */
+ for (j = 4; j <= 5; j++) {
+ buffer = gst_buffer_new_and_alloc (4);
+ GST_BUFFER_DTS (buffer) = GST_BUFFER_PTS (buffer) = j * GST_SECOND;
+ GST_BUFFER_DURATION (buffer) = GST_SECOND;
+ expected = g_list_append (expected, buffer);
+ fail_unless (gst_app_src_push_buffer (GST_APP_SRC (src), buffer)
+ == GST_FLOW_OK);
+ }
+ }
+
+ expected = g_list_append (expected, gst_event_new_eos ());
+ fail_unless (gst_app_src_end_of_stream (GST_APP_SRC (src)) == GST_FLOW_OK);
+
+ msg =
+ gst_bus_timed_pop_filtered (GST_ELEMENT_BUS (pipe), -1,
+ GST_MESSAGE_EOS | GST_MESSAGE_ERROR);
+ fail_unless (msg);
+ fail_unless (GST_MESSAGE_TYPE (msg) == GST_MESSAGE_EOS);
+ gst_message_unref (msg);
+
+ gst_pad_remove_probe (pad, probe_id);
+ gst_object_unref (pad);
+
+ ASSERT_SET_STATE (pipe, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
+ gst_object_unref (pipe);
+ fail_if (expected != NULL);
+ }
+}
+
+GST_END_TEST;
+
+GST_START_TEST (test_appsrc_custom_segment_twice)
+{
+ GstElement *pipe, *src, *sink;
+ GstMessage *msg;
+ gint i, tc;
+ GstAppSrcCallbacks cb = { 0 };
+ GstAppStreamType modes[] = { GST_APP_STREAM_TYPE_STREAM,
+ GST_APP_STREAM_TYPE_SEEKABLE
+ };
+ GstSample *sample;
+ gulong probe_id;
+ GstPad *pad;
+ GList *expected = NULL;
+ GstSegment segment;
+ GstBuffer *buffer;
+
+ for (tc = 0; tc < 4; tc++) {
+ /* Case 0: Push segment1 without buffer,
+ * then push segment1 with buffer again.
+ * Expected behaviour is that pushing segment only once to downstream */
+
+ /* Case 1: Push segment1 with buffer,
+ * then push segment1 with buffer again.
+ * Expected behaviour is that pushing segment only once to downstream */
+
+ /* Case 2: Push segment1 without buffer,
+ * then push segment2 with buffer.
+ * Expected behaviour is that pushing only segment2 with buffer
+ * to downstream */
+
+ /* Case 3: Push segment1 with buffer,
+ * then push segment2 with buffer.
+ * Expected behaviour is that pushing segment1 with buffer,
+ * and then segment2 with buffer */
+
+ GST_INFO ("Test Case #%d", tc);
+
+ for (i = 0; i < G_N_ELEMENTS (modes); i++) {
+ GST_INFO ("checking mode %d", modes[i]);
+
+ pipe = gst_pipeline_new ("pipeline");
+ src = gst_element_factory_make ("appsrc", NULL);
+ sink = gst_element_factory_make ("fakesink", NULL);
+ gst_bin_add_many (GST_BIN (pipe), src, sink, NULL);
+ fail_unless (gst_element_link (src, sink));
+
+ pad = gst_element_get_static_pad (sink, "sink");
+
+ probe_id = gst_pad_add_probe (pad,
+ GST_PAD_PROBE_TYPE_BUFFER | GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM,
+ (GstPadProbeCallback) appsrc_pad_probe, &expected, NULL);
+
+ g_object_set (G_OBJECT (src), "stream-type", modes[i], "format",
+ GST_FORMAT_TIME, "handle-segment-change", TRUE, NULL);
+
+ if (modes[i] != GST_APP_STREAM_TYPE_STREAM) {
+ cb.seek_data = seek_cb;
+ gst_app_src_set_callbacks (GST_APP_SRC (src), &cb, NULL, NULL);
+ }
+
+ ASSERT_SET_STATE (pipe, GST_STATE_PLAYING, GST_STATE_CHANGE_ASYNC);
+
+ GST_DEBUG ("Prepare/Push the first sample");
+ /* PREPARE SEGMENT */
+ gst_segment_init (&segment, GST_FORMAT_TIME);
+ segment.start = segment.position = segment.time = GST_SECOND;
+
+ /* PREPARE BUFFER */
+ buffer = gst_buffer_new_and_alloc (4);
+ GST_BUFFER_DTS (buffer) = GST_BUFFER_PTS (buffer) = GST_SECOND;
+ GST_BUFFER_DURATION (buffer) = GST_SECOND;
+
+ /* PREPARE FIRST SAMPLE */
+ if (tc == 0) {
+ /* Test Case 0: Push a sample without buffer */
+ sample = gst_sample_new (NULL, NULL, &segment, NULL);
+ expected = g_list_append (expected, gst_event_new_segment (&segment));
+ } else if (tc == 2) {
+ /* Test Case 2: Push a sample without buffer.
+ * We don't expect this segment will be used,
+ * because the updated next sample will be actually used */
+ sample = gst_sample_new (NULL, NULL, &segment, NULL);
+ } else {
+ sample = gst_sample_new (buffer, NULL, &segment, NULL);
+ expected = g_list_append (expected, gst_event_new_segment (&segment));
+ expected = g_list_append (expected, buffer);
+ }
+ /* PUSH THE FIRST SAMPLE */
+ fail_unless (gst_app_src_push_sample (GST_APP_SRC (src), sample)
+ == GST_FLOW_OK);
+
+ /* CLEAN UP */
+ gst_buffer_unref (buffer);
+ gst_sample_unref (sample);
+
+ GST_DEBUG ("Prepare/Push the last sample");
+ /* PREPARE SEGMENT */
+ gst_segment_init (&segment, GST_FORMAT_TIME);
+ segment.start = segment.position = segment.time =
+ (tc == 0 || tc == 1) ? 1 * GST_SECOND : 2 * GST_SECOND;
+
+ /* PREPARE BUFFER */
+ buffer = gst_buffer_new_and_alloc (4);
+ GST_BUFFER_DTS (buffer) = GST_BUFFER_PTS (buffer) = 2 * GST_SECOND;
+ GST_BUFFER_DURATION (buffer) = GST_SECOND;
+
+ /* PREPARE THE LAST SAMPLE */
+ if (tc == 0 || tc == 1) {
+ /* Test Case 0 or 1: Push a sample with duplicated segment */
+ sample = gst_sample_new (buffer, NULL, &segment, NULL);
+ expected = g_list_append (expected, buffer);
+ } else {
+ sample = gst_sample_new (buffer, NULL, &segment, NULL);
+ expected = g_list_append (expected, gst_event_new_segment (&segment));
+ expected = g_list_append (expected, buffer);
+ }
+
+ fail_unless (gst_app_src_push_sample (GST_APP_SRC (src), sample)
+ == GST_FLOW_OK);
+
+ /* CLEAN UP */
+ gst_buffer_unref (buffer);
+ gst_sample_unref (sample);
+
+ expected = g_list_append (expected, gst_event_new_eos ());
+ fail_unless (gst_app_src_end_of_stream (GST_APP_SRC (src)) ==
+ GST_FLOW_OK);
+
+ msg =
+ gst_bus_timed_pop_filtered (GST_ELEMENT_BUS (pipe), -1,
+ GST_MESSAGE_EOS | GST_MESSAGE_ERROR);
+ fail_unless (msg);
+ fail_unless (GST_MESSAGE_TYPE (msg) == GST_MESSAGE_EOS);
+ gst_message_unref (msg);
+
+ gst_pad_remove_probe (pad, probe_id);
+ gst_object_unref (pad);
+
+ ASSERT_SET_STATE (pipe, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
+ gst_object_unref (pipe);
+ fail_if (expected != NULL);
+ }
+ }
+}
+
+GST_END_TEST;
+
static Suite *
appsrc_suite (void)
{
tcase_add_test (tc_chain, test_appsrc_caps_in_push_modes);
tcase_add_test (tc_chain, test_appsrc_blocked_on_caps);
tcase_add_test (tc_chain, test_appsrc_push_buffer_list);
+ tcase_add_test (tc_chain, test_appsrc_period_with_custom_segment);
+ tcase_add_test (tc_chain, test_appsrc_custom_segment_twice);
if (RUNNING_ON_VALGRIND)
tcase_add_loop_test (tc_chain, test_appsrc_block_deadlock, 0, 5);