/* GStreamer
* Copyright (C) <1999> Erik Walthinsen <omega@cse.ogi.edu>
* Copyright (C) <2004> Thomas Vander Stichele <thomas at apestaart dot org>
+ * Copyright (C) 2006 Wim Taymans <wim at fluendo dot com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
*
* <refsect2>
* <para>
- * This plugin writes incoming data to a set of filedescriptors. The
- * filedescriptors can be added to multifdsink by emiting the "add" signal. For
- * each descriptor added
- * the "client-added" signal will be called.
+ * This plugin writes incoming data to a set of file descriptors. The
+ * file descriptors can be added to multifdsink by emiting the "add" signal. For
+ * each descriptor added, the "client-added" signal will be called.
* </para>
* <para>
* Clients can be removed from multifdsink by emiting the "remove" signal. For
- * each descriptor removed the "client-removed" signal will be called. The
+ * each descriptor removed, the "client-removed" signal will be called. The
* "client-removed" signal can also be fired when multifdsink decides that a
* client is not active anymore or, depending on the value of the
- * "recover-policy" if the client is reading to slow.
- * In all cases, multifdsink will never ever close a filedescriptor itself, the
- * application has to do that itself in the "client-fd-removed" signal, for
- * example.
+ * "recover-policy," if the client is reading to slow.
+ * In all cases, multifdsink will never ever close a file descriptor itself.
+ * The user of multifdsink is responsible for closing the file descriptor.
+ * This can for example be done in response to the "client-fd-removed" signal.
* Note that multifdsink still has a reference to the file descriptor when the
* "client-removed" signal is emited so that "get-stats" can be performed on
* the descriptor; It is therefore not allowed to close the file descriptor in
* </para>
* <para>
* A client with a lag of at least "buffers-soft-max" enters the recovery
- * procedure which is controled with the "recover-policy" property. A recover
+ * procedure which is controlled with the "recover-policy" property. A recover
* policy of NONE will do nothing, RESYNC_LATEST will send the most recently
* received buffer as the next buffer for the client, RESYNC_SOFT_LIMIT
* positions the client to the soft limit in the buffer queue and
* multifdsink will synchronize on the clock before serving the buffers to the
* clients.
* </para>
- * <para>
- * Example pipeline:
- * <programlisting>
- * gst-launch -v videotestsrc ! multifdsink
- * </programlisting>
- * This pipeline will not do a lot since it is not possible from a gst-launch
- * line to add filedescriptors to multifdsink.
- * </para>
* </refsect2>
*
* Last reviewed on 2006-04-28 (0.10.7)
/* lock again before we remove the client completely */
CLIENTS_LOCK (sink);
- /* fd cannot be reused in the above signal callback so we can safely
+ /* fd cannot be reused in the above signal callback so we can safely
* remove it from the hashtable here */
if (!g_hash_table_remove (sink->fd_hash, &client->fd.fd)) {
GST_WARNING_OBJECT (sink,
return TRUE;
}
+/* decide where in the current buffer queue this new client should start
+ * receiving buffers from
+ * If this returns -1, it means that we haven't found a good point to
+ * start streaming from yet, and this function should be called again later
+ * when more buffers have arrived */
static gint
gst_multi_fd_sink_new_client (GstMultiFdSink * sink, GstTCPClient * client)
{
"[fd %5d] new client, bufpos %d, waiting for keyframe", client->fd.fd,
client->bufpos);
- /* the client is not yet alligned to a buffer */
+ /* the client is not yet aligned to a buffer */
if (client->bufpos < 0) {
result = -1;
} else {
}
}
+static void
+fail_if_can_read (char *msg, int fd)
+{
+ long avail;
+
+ /* verify this hasn't triggered a write yet */
+ /* FIXME: possibly racy, since if it would write, we may not get it
+ * immediately ? */
+ fail_if (ioctl (fd, FIONREAD, &avail) < 0, "%s: could not ioctl", msg);
+ fail_if (avail > 0, "%s: has bytes available to read");
+}
+
+
GST_START_TEST (test_no_clients)
{
GstElement *sink;
GST_END_TEST;
+#define fail_unless_read(msg,fd,size,ref) \
+G_STMT_START { \
+ char data[size + 1]; \
+ int nbytes; \
+\
+ GST_DEBUG ("%s: reading %d bytes", msg, size); \
+ nbytes = read (fd, data, size); \
+ data[size] = 0; \
+ GST_DEBUG ("%s: read %d bytes", msg, nbytes); \
+ fail_if (nbytes < size); \
+ fail_unless (memcmp (data, ref, size) == 0, \
+ "data read '%s' differs from '%s'", data, ref); \
+} G_STMT_END;
+
+/* from the given two data buffers, create two streamheader buffers and
+ * some caps that match it, and store them in the given pointers
+ * returns buffers and caps with a refcount of 1 */
static void
-fail_unless_read (const char *msg, int fd, int size, const char *ref)
+gst_multifdsink_create_streamheader (guint8 * data1,
+ guint8 * data2, GstBuffer ** hbuf1, GstBuffer ** hbuf2, GstCaps ** caps)
{
- char data[size];
- int nbytes;
-
- GST_DEBUG ("%s: reading %d bytes", msg, size);
- nbytes = read (fd, data, size);
- GST_DEBUG ("%s: read %d bytes", msg, nbytes);
- fail_if (nbytes < size);
- fail_unless (memcmp (data, ref, size) == 0, "data read differs from '%s'",
- ref);
+ GValue array = { 0 };
+ GValue value = { 0 };
+ GstStructure *structure;
+ guint size1 = strlen (data1);
+ guint size2 = strlen (data2);
+
+ fail_if (hbuf1 == NULL);
+ fail_if (hbuf2 == NULL);
+ fail_if (caps == NULL);
+
+ /* create caps with streamheader, set the caps, and push the IN_CAPS
+ * buffers */
+ *hbuf1 = gst_buffer_new_and_alloc (size1);
+ GST_BUFFER_FLAG_SET (*hbuf1, GST_BUFFER_FLAG_IN_CAPS);
+ memcpy (GST_BUFFER_DATA (*hbuf1), data1, size1);
+ *hbuf2 = gst_buffer_new_and_alloc (size2);
+ GST_BUFFER_FLAG_SET (*hbuf2, GST_BUFFER_FLAG_IN_CAPS);
+ memcpy (GST_BUFFER_DATA (*hbuf2), data2, size2);
+ /* we want to keep them around for the tests */
+ gst_buffer_ref (*hbuf1);
+ gst_buffer_ref (*hbuf2);
+
+ g_value_init (&array, GST_TYPE_ARRAY);
+
+ g_value_init (&value, GST_TYPE_BUFFER);
+ gst_value_set_buffer (&value, *hbuf1);
+ gst_value_array_append_value (&array, &value);
+ g_value_unset (&value);
+
+ g_value_init (&value, GST_TYPE_BUFFER);
+ gst_value_set_buffer (&value, *hbuf2);
+ gst_value_array_append_value (&array, &value);
+ g_value_unset (&value);
+
+ *caps = gst_caps_from_string ("application/x-gst-check");
+ structure = gst_caps_get_structure (*caps, 0);
+
+ gst_structure_set_value (structure, "streamheader", &array);
+ g_value_unset (&array);
}
+
/* this test:
* - adds a first client
* - sets streamheader caps on the pad
* - pushes a buffer
* - verifies that the client received all the data correctly
* - adds a second client
- * - verifies that this second client receives the streamheader caps too
+ * - verifies that this second client receives the streamheader caps too, plus
+ * - the new buffer
*/
GST_START_TEST (test_streamheader)
{
GstStructure *structure;
int pfd1[2], pfd2[2];
guint8 data[12];
- GValue array = { 0 };
- GValue value = { 0 };
guint64 bytes_served;
int avail;
/* create caps with streamheader, set the caps, and push the IN_CAPS
* buffers */
- hbuf1 = gst_buffer_new_and_alloc (4);
- GST_BUFFER_FLAG_SET (hbuf1, GST_BUFFER_FLAG_IN_CAPS);
- memcpy (GST_BUFFER_DATA (hbuf1), "babe", 4);
- hbuf2 = gst_buffer_new_and_alloc (8);
- GST_BUFFER_FLAG_SET (hbuf2, GST_BUFFER_FLAG_IN_CAPS);
- memcpy (GST_BUFFER_DATA (hbuf2), "deadbeef", 8);
- /* we want to keep them around for the tests */
- gst_buffer_ref (hbuf1);
- gst_buffer_ref (hbuf2);
-
- g_value_init (&array, GST_TYPE_ARRAY);
-
- g_value_init (&value, GST_TYPE_BUFFER);
- gst_value_set_buffer (&value, hbuf1);
- gst_value_array_append_value (&array, &value);
- g_value_unset (&value);
-
- g_value_init (&value, GST_TYPE_BUFFER);
- gst_value_set_buffer (&value, hbuf2);
- gst_value_array_append_value (&array, &value);
- g_value_unset (&value);
-
- caps = gst_caps_from_string ("application/x-gst-check");
- structure = gst_caps_get_structure (caps, 0);
-
- gst_structure_set_value (structure, "streamheader", &array);
- g_value_unset (&array);
+ gst_multifdsink_create_streamheader ("babe", "deadbeef", &hbuf1, &hbuf2,
+ &caps);
fail_unless (gst_pad_set_caps (mysrcpad, caps));
gst_caps_unref (caps);
fail_unless (gst_pad_push (mysrcpad, hbuf1) == GST_FLOW_OK);
fail_unless (gst_pad_push (mysrcpad, hbuf2) == GST_FLOW_OK);
- /* verify this hasn't triggered a write yet */
- /* FIXME: possibly racy, since if it would write, we may not get it
- * immediately ? */
- fail_if (ioctl (pfd1[0], FIONREAD, &avail) < 0);
- fail_if (avail > 0);
+ fail_if_can_read ("first client", pfd1[0]);
/* push a non-IN_CAPS buffer, this should trigger the client receiving the
* first three buffers */
/* now add the second client */
g_signal_emit_by_name (sink, "add", pfd2[1]);
- fail_if (ioctl (pfd2[0], FIONREAD, &avail) < 0);
- fail_if (avail > 0);
+ fail_if_can_read ("second client", pfd2[0]);
/* now push another buffer, which will trigger streamheader for second
* client */
GST_END_TEST;
+/* this tests changing of streamheaders
+ * - set streamheader caps on the pad
+ * - pushes the IN_CAPS buffers
+ * - pushes a buffer
+ * - add a first client
+ * - verifies that this first client receives the first streamheader caps,
+ * plus a new buffer
+ * - change streamheader caps
+ * - verify that the first client receives the new streamheader buffers as well
+ */
+GST_START_TEST (test_change_streamheader)
+{
+ GstElement *sink;
+ GstBuffer *hbuf1, *hbuf2, *buf;
+ GstCaps *caps;
+ GstStructure *structure;
+ int pfd1[2], pfd2[2];
+ guint8 data[12];
+ GValue array = { 0 };
+ GValue value = { 0 };
+ guint64 bytes_served;
+ int avail;
+
+ sink = setup_multifdsink ();
+
+ fail_if (pipe (pfd1) == -1);
+ fail_if (pipe (pfd2) == -1);
+
+ ASSERT_SET_STATE (sink, GST_STATE_PLAYING, GST_STATE_CHANGE_ASYNC);
+
+ /* create caps with streamheader, set the caps, and push the IN_CAPS
+ * buffers */
+ gst_multifdsink_create_streamheader ("babe", "deadbeef", &hbuf1, &hbuf2,
+ &caps);
+ fail_unless (gst_pad_set_caps (mysrcpad, caps));
+ gst_caps_unref (caps);
+
+ fail_unless (gst_pad_push (mysrcpad, hbuf1) == GST_FLOW_OK);
+ fail_unless (gst_pad_push (mysrcpad, hbuf2) == GST_FLOW_OK);
+
+ /* add the first client */
+ g_signal_emit_by_name (sink, "add", pfd1[1]);
+
+ /* verify this hasn't triggered a write yet */
+ /* FIXME: possibly racy, since if it would write, we may not get it
+ * immediately ? */
+ fail_if_can_read ("first client, no buffer", pfd1[0]);
+
+ /* now push a buffer and read */
+ buf = gst_buffer_new_and_alloc (4);
+ memcpy (GST_BUFFER_DATA (buf), "f00d", 4);
+ gst_pad_push (mysrcpad, buf);
+
+ fail_unless_read ("change: first client", pfd1[0], 4, "babe");
+ fail_unless_read ("change: first client", pfd1[0], 8, "deadbeef");
+ fail_unless_read ("change: first client", pfd1[0], 4, "f00d");
+ wait_bytes_served (sink, 16);
+
+ /* now add the second client */
+ g_signal_emit_by_name (sink, "add", pfd2[1]);
+ fail_if_can_read ("second client, no buffer", pfd2[0]);
+
+ /* change the streamheader */
+ gst_buffer_unref (hbuf1);
+ gst_buffer_unref (hbuf2);
+ gst_multifdsink_create_streamheader ("beef", "deadbabe", &hbuf1, &hbuf2,
+ &caps);
+ fail_unless (gst_pad_set_caps (mysrcpad, caps));
+ gst_caps_unref (caps);
+
+ fail_unless (gst_pad_push (mysrcpad, hbuf1) == GST_FLOW_OK);
+ fail_unless (gst_pad_push (mysrcpad, hbuf2) == GST_FLOW_OK);
+
+ /* verify neither client has new data available to read */
+ fail_if_can_read ("first client, changed streamheader", pfd1[0]);
+ fail_if_can_read ("second client, changed streamheader", pfd2[0]);
+
+ /* now push another buffer, which will trigger streamheader for second
+ * client, but should also send new streamheaders to first client */
+ buf = gst_buffer_new_and_alloc (4);
+ memcpy (GST_BUFFER_DATA (buf), "deaf", 4);
+ gst_pad_push (mysrcpad, buf);
+
+ /* FIXME: here's a bug - the first client does not get new streamheaders */
+ fail_unless_read ("first client", pfd1[0], 4, "deaf");
+
+ /* new streamheader data */
+ fail_unless_read ("second client", pfd2[0], 4, "beef");
+ fail_unless_read ("second client", pfd2[0], 8, "deadbabe");
+ /* we missed the f00d buffer */
+ fail_unless_read ("second client", pfd2[0], 4, "deaf");
+ wait_bytes_served (sink, 36);
+
+ gst_buffer_unref (hbuf1);
+ gst_buffer_unref (hbuf2);
+ GST_DEBUG ("cleaning up multifdsink");
+ ASSERT_SET_STATE (sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
+ cleanup_multifdsink (sink);
+}
+
+GST_END_TEST;
+
+/* FIXME: add test simulating chained oggs where:
+ * sync-method is burst-on-connect
+ * (when multifdsink actually does burst-on-connect based on byte size, not
+ "last keyframe" which any frame for audio :))
+ * an old client still needs to read from before the new streamheaders
+ * a new client gets the new streamheaders
+ */
Suite *
multifdsink_suite (void)
tcase_add_test (tc_chain, test_no_clients);
tcase_add_test (tc_chain, test_add_client);
tcase_add_test (tc_chain, test_streamheader);
+ tcase_add_test (tc_chain, test_change_streamheader);
return s;
}