this->timeout = DEFAULT_TIMEOUT;
this->sync_method = DEFAULT_SYNC_METHOD;
+
+ this->header_flags = 0;
}
static void
g_slist_free (client->sending);
client->sending = NULL;
+ if (client->caps)
+ gst_caps_unref (client->caps);
+ client->caps = NULL;
+
/* unlock the mutex before signaling because the signal handler
* might query some properties */
CLIENTS_UNLOCK (sink);
client->fd.fd, string);
g_free (string);
- if (!gst_dp_packet_from_caps (caps, 0, &length, &header, &payload)) {
+ if (!gst_dp_packet_from_caps (caps, sink->header_flags, &length, &header,
+ &payload)) {
GST_DEBUG_OBJECT (sink, "Could not create GDP packet from caps");
return FALSE;
}
gst_multi_fd_sink_client_queue_buffer (GstMultiFdSink * sink,
GstTCPClient * client, GstBuffer * buffer)
{
+ GstCaps *caps;
+
+ /* TRUE: send them if the new caps have them */
+ gboolean send_streamheader = FALSE;
+ GstStructure *s;
+
+
+ /* before we queue the buffer, we check if we need to queue streamheader
+ * buffers (because it's a new client, or because they changed) */
+ caps = gst_buffer_get_caps (buffer); /* cleaned up after streamheader */
+ if (!client->caps) {
+ GST_LOG_OBJECT (sink,
+ "[fd %5d] no previous caps for this client, send streamheader",
+ client->fd.fd);
+ send_streamheader = TRUE;
+ client->caps = gst_caps_ref (caps);
+ } else {
+ /* there were previous caps recorded, so compare */
+ if (!gst_caps_is_equal (caps, client->caps)) {
+ const GValue *sh1, *sh2;
+
+ /* caps are not equal, but could still have the same streamheader */
+ s = gst_caps_get_structure (caps, 0);
+ if (!gst_structure_has_field (s, "streamheader")) {
+ /* no new streamheader, so nothing new to send */
+ GST_LOG_OBJECT (sink,
+ "[fd %5d] new caps do not have streamheader, not sending",
+ client->fd.fd);
+ } else {
+ /* there is a new streamheader */
+ s = gst_caps_get_structure (client->caps, 0);
+ if (!gst_structure_has_field (s, "streamheader")) {
+ /* no previous streamheader, so send the new one */
+ GST_LOG_OBJECT (sink,
+ "[fd %5d] previous caps did not have streamheader, sending",
+ client->fd.fd);
+ send_streamheader = TRUE;
+ } else {
+ /* both old and new caps have streamheader set */
+ sh1 = gst_structure_get_value (s, "streamheader");
+ s = gst_caps_get_structure (caps, 0);
+ sh2 = gst_structure_get_value (s, "streamheader");
+ if (gst_value_compare (sh1, sh2) != GST_VALUE_EQUAL) {
+ GST_LOG_OBJECT (sink,
+ "[fd %5d] new streamheader different from old, sending",
+ client->fd.fd);
+ send_streamheader = TRUE;
+ }
+ }
+ }
+ }
+ }
+
+ if (G_UNLIKELY (send_streamheader)) {
+ const GValue *sh;
+ GArray *buffers;
+ int i;
+
+ GST_LOG_OBJECT (sink,
+ "[fd %5d] sending streamheader from caps %" GST_PTR_FORMAT,
+ client->fd.fd, caps);
+ s = gst_caps_get_structure (caps, 0);
+ if (!gst_structure_has_field (s, "streamheader")) {
+ GST_LOG_OBJECT (sink,
+ "[fd %5d] no new streamheader, so nothing to send", client->fd.fd);
+ } else {
+ GST_LOG_OBJECT (sink,
+ "[fd %5d] sending streamheader from caps %" GST_PTR_FORMAT,
+ client->fd.fd, caps);
+ sh = gst_structure_get_value (s, "streamheader");
+ g_assert (G_VALUE_TYPE (sh) == GST_TYPE_ARRAY);
+ buffers = g_value_peek_pointer (sh);
+ for (i = 0; i < buffers->len; ++i) {
+ GValue *bufval;
+ GstBuffer *buffer;
+
+ bufval = &g_array_index (buffers, GValue, i);
+ g_assert (G_VALUE_TYPE (bufval) == GST_TYPE_BUFFER);
+ buffer = g_value_peek_pointer (bufval);
+ GST_LOG_OBJECT (sink,
+ "[fd %5d] queueing streamheader buffer of length %d",
+ client->fd.fd, GST_BUFFER_SIZE (buffer));
+ gst_buffer_ref (buffer);
+
+ if (sink->protocol == GST_TCP_PROTOCOL_GDP) {
+ guint8 *header;
+ guint len;
+
+ if (!gst_dp_header_from_buffer (buffer, sink->header_flags, &len,
+ &header)) {
+ GST_DEBUG_OBJECT (sink,
+ "[fd %5d] could not create header, removing client",
+ client->fd.fd);
+ return FALSE;
+ }
+ gst_multi_fd_sink_client_queue_data (sink, client, (gchar *) header,
+ len);
+ }
+
+ client->sending = g_slist_append (client->sending, buffer);
+ }
+ }
+ }
+
+ gst_caps_unref (caps);
+ caps = NULL;
+ /* now we can send the buffer, possibly sending a GDP header first */
if (sink->protocol == GST_TCP_PROTOCOL_GDP) {
guint8 *header;
guint len;
- if (!gst_dp_header_from_buffer (buffer, 0, &len, &header)) {
+ if (!gst_dp_header_from_buffer (buffer, sink->header_flags, &len, &header)) {
GST_DEBUG_OBJECT (sink,
"[fd %5d] could not create header, removing client", client->fd.fd);
return FALSE;
client->caps_sent = TRUE;
}
}
- /* if we have streamheader buffers, and haven't sent them to this client
- * yet, send them out one by one */
- if (!client->streamheader_sent) {
- GST_DEBUG_OBJECT (sink, "[fd %5d] Sending streamheader, %d buffers", fd,
- g_slist_length (sink->streamheader));
- if (sink->streamheader) {
- GSList *l;
-
- for (l = sink->streamheader; l; l = l->next) {
- /* queue stream headers for sending */
- res =
- gst_multi_fd_sink_client_queue_buffer (sink, client,
- GST_BUFFER (l->data));
- if (!res) {
- GST_DEBUG_OBJECT (sink,
- "Failed queueing streamheader, removing client");
- return FALSE;
- }
- }
- }
- client->streamheader_sent = TRUE;
- }
more = TRUE;
do {
gst_multi_fd_sink_render (GstBaseSink * bsink, GstBuffer * buf)
{
GstMultiFdSink *sink;
+ GstCaps *bufcaps, *padcaps;
sink = GST_MULTI_FD_SINK (bsink);
+ /* since we check every buffer for streamheader caps, we need to make
+ * sure every buffer has caps set */
+ bufcaps = gst_buffer_get_caps (buf);
+ padcaps = GST_PAD_CAPS (GST_BASE_SINK_PAD (bsink));
+
+ /* make sure we have caps on the pad */
+ if (!padcaps) {
+ if (!bufcaps) {
+ GST_ELEMENT_ERROR (sink, CORE, NEGOTIATION, (NULL),
+ ("Received first buffer without caps set"));
+ return GST_FLOW_NOT_NEGOTIATED;
+ }
+ }
+
+ /* stamp the buffer with previous caps if no caps set */
+ if (!bufcaps) {
+ buf = gst_buffer_make_writable (buf);
+ gst_buffer_set_caps (buf, padcaps);
+ } else {
+ gst_caps_unref (bufcaps);
+ }
+
/* since we keep this buffer out of the scope of this method */
gst_buffer_ref (buf);
/* if the incoming buffer is marked as IN CAPS, then we assume for now
* it's a streamheader that needs to be sent to each new client, so we
* put it on our internal list of streamheader buffers.
- * After that we return, since we only send these out when we get
- * non IN_CAPS buffers so we properly keep track of clients that got
- * streamheaders. */
+ * FIXME: we could check if the buffer's contents are in fact part of the
+ * current streamheader.
+ *
+ * We don't send the buffer to the client, since streamheaders are sent
+ * separately when necessary. */
if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_IN_CAPS)) {
sink->previous_buffer_in_caps = TRUE;
GST_DEBUG_OBJECT (sink,
static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
GST_PAD_SRC,
GST_PAD_ALWAYS,
- GST_STATIC_CAPS ("application/x-gdp")
+ GST_STATIC_CAPS ("application/x-gst-check")
);
GstElement *
{
GstElement *sink;
GstBuffer *buffer;
+ GstCaps *caps;
sink = setup_multifdsink ();
ASSERT_SET_STATE (sink, GST_STATE_PLAYING, GST_STATE_CHANGE_ASYNC);
+ caps = gst_caps_from_string ("application/x-gst-check");
buffer = gst_buffer_new_and_alloc (4);
+ gst_buffer_set_caps (buffer, caps);
+ gst_caps_unref (caps);
fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
GST_DEBUG ("cleaning up multifdsink");
{
GstElement *sink;
GstBuffer *buffer;
+ GstCaps *caps;
int pfd[2];
gchar data[4];
guint64 bytes_served;
/* add the client */
g_signal_emit_by_name (sink, "add", pfd[1]);
+ caps = gst_caps_from_string ("application/x-gst-check");
+ GST_DEBUG ("Created test caps %p %" GST_PTR_FORMAT, caps, caps);
buffer = gst_buffer_new_and_alloc (4);
+ gst_buffer_set_caps (buffer, caps);
+ ASSERT_CAPS_REFCOUNT (caps, "caps", 2);
memcpy (GST_BUFFER_DATA (buffer), "dead", 4);
fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
GST_DEBUG ("cleaning up multifdsink");
ASSERT_SET_STATE (sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
cleanup_multifdsink (sink);
+
+ ASSERT_CAPS_REFCOUNT (caps, "caps", 1);
+ gst_caps_unref (caps);
}
GST_END_TEST;
/* from the given two data buffers, create two streamheader buffers and
* some caps that match it, and store them in the given pointers
- * returns buffers and caps with a refcount of 1 */
+ * returns one ref to each of the buffers and the caps */
static void
gst_multifdsink_create_streamheader (const gchar * data1,
const gchar * data2, GstBuffer ** hbuf1, GstBuffer ** hbuf2,
GstCaps ** caps)
{
+ GstBuffer *buf;
GValue array = { 0 };
GValue value = { 0 };
GstStructure *structure;
g_value_init (&array, GST_TYPE_ARRAY);
g_value_init (&value, GST_TYPE_BUFFER);
- gst_value_set_buffer (&value, *hbuf1);
+ /* we take a copy, set it on the array (which refs it), then unref our copy */
+ buf = gst_buffer_copy (*hbuf1);
+ gst_value_set_buffer (&value, buf);
+ ASSERT_BUFFER_REFCOUNT (buf, "copied buffer", 2);
+ gst_buffer_unref (buf);
gst_value_array_append_value (&array, &value);
g_value_unset (&value);
g_value_init (&value, GST_TYPE_BUFFER);
- gst_value_set_buffer (&value, *hbuf2);
+ buf = gst_buffer_copy (*hbuf2);
+ gst_value_set_buffer (&value, buf);
+ ASSERT_BUFFER_REFCOUNT (buf, "copied buffer", 2);
+ gst_buffer_unref (buf);
gst_value_array_append_value (&array, &value);
g_value_unset (&value);
gst_structure_set_value (structure, "streamheader", &array);
g_value_unset (&array);
+ ASSERT_CAPS_REFCOUNT (*caps, "streamheader caps", 1);
+
+ /* set our streamheadery caps on the buffers */
+ gst_buffer_set_caps (*hbuf1, *caps);
+ gst_buffer_set_caps (*hbuf2, *caps);
+ ASSERT_CAPS_REFCOUNT (*caps, "streamheader caps", 3);
+
+ GST_DEBUG ("created streamheader caps %p %" GST_PTR_FORMAT, *caps, *caps);
}
* - sets streamheader caps on the pad
* - pushes the IN_CAPS buffers
* - pushes a buffer
- * - verifies that the client received all the data correctly
+ * - verifies that the client received all the data correctly, and did not
+ * get multiple copies of the streamheader
* - adds a second client
* - verifies that this second client receives the streamheader caps too, plus
* - the new buffer
gst_multifdsink_create_streamheader ("babe", "deadbeef", &hbuf1, &hbuf2,
&caps);
fail_unless (gst_pad_set_caps (mysrcpad, caps));
- gst_caps_unref (caps);
+ /* one is ours, two on the buffers, and one now on the pad */
+ ASSERT_CAPS_REFCOUNT (caps, "caps", 4);
fail_unless (gst_pad_push (mysrcpad, hbuf1) == GST_FLOW_OK);
fail_unless (gst_pad_push (mysrcpad, hbuf2) == GST_FLOW_OK);
fail_unless_read ("second client", pfd2[0], 4, "deaf");
wait_bytes_served (sink, 36);
- gst_buffer_unref (hbuf1);
- gst_buffer_unref (hbuf2);
GST_DEBUG ("cleaning up multifdsink");
+
+ g_signal_emit_by_name (sink, "remove", pfd1[1]);
+ g_signal_emit_by_name (sink, "remove", pfd2[1]);
+
ASSERT_SET_STATE (sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
cleanup_multifdsink (sink);
+
+ ASSERT_BUFFER_REFCOUNT (hbuf1, "hbuf1", 1);
+ ASSERT_BUFFER_REFCOUNT (hbuf2, "hbuf2", 1);
+ gst_buffer_unref (hbuf1);
+ gst_buffer_unref (hbuf2);
+
+ ASSERT_CAPS_REFCOUNT (caps, "caps", 1);
+ gst_caps_unref (caps);
}
GST_END_TEST;
/* create caps with streamheader, set the caps, and push the IN_CAPS
* buffers */
- gst_multifdsink_create_streamheader ("babe", "deadbeef", &hbuf1, &hbuf2,
+ gst_multifdsink_create_streamheader ("first", "header", &hbuf1, &hbuf2,
&caps);
fail_unless (gst_pad_set_caps (mysrcpad, caps));
- gst_caps_unref (caps);
+ /* one is ours, two on the buffers, and one now on the pad */
+ ASSERT_CAPS_REFCOUNT (caps, "caps", 4);
+
+ /* one to hold for the test and one to give away */
+ ASSERT_BUFFER_REFCOUNT (hbuf1, "hbuf1", 2);
+ ASSERT_BUFFER_REFCOUNT (hbuf2, "hbuf2", 2);
fail_unless (gst_pad_push (mysrcpad, hbuf1) == GST_FLOW_OK);
fail_unless (gst_pad_push (mysrcpad, hbuf2) == GST_FLOW_OK);
memcpy (GST_BUFFER_DATA (buf), "f00d", 4);
gst_pad_push (mysrcpad, buf);
- fail_unless_read ("change: first client", pfd1[0], 4, "babe");
- fail_unless_read ("change: first client", pfd1[0], 8, "deadbeef");
+ fail_unless_read ("change: first client", pfd1[0], 5, "first");
+ fail_unless_read ("change: first client", pfd1[0], 6, "header");
fail_unless_read ("change: first client", pfd1[0], 4, "f00d");
- wait_bytes_served (sink, 16);
+ //wait_bytes_served (sink, 16);
/* now add the second client */
g_signal_emit_by_name (sink, "add", pfd2[1]);
fail_if_can_read ("second client, no buffer", pfd2[0]);
/* change the streamheader */
+
+ /* before we change, multifdsink still has a list of the old streamheaders */
+ ASSERT_BUFFER_REFCOUNT (hbuf1, "hbuf1", 2);
+ ASSERT_BUFFER_REFCOUNT (hbuf2, "hbuf2", 2);
gst_buffer_unref (hbuf1);
gst_buffer_unref (hbuf2);
- gst_multifdsink_create_streamheader ("beef", "deadbabe", &hbuf1, &hbuf2,
+
+ /* drop our ref to the previous caps */
+ gst_caps_unref (caps);
+
+ gst_multifdsink_create_streamheader ("second", "header", &hbuf1, &hbuf2,
&caps);
fail_unless (gst_pad_set_caps (mysrcpad, caps));
- gst_caps_unref (caps);
+ /* one to hold for the test and one to give away */
+ ASSERT_BUFFER_REFCOUNT (hbuf1, "hbuf1", 2);
+ ASSERT_BUFFER_REFCOUNT (hbuf2, "hbuf2", 2);
fail_unless (gst_pad_push (mysrcpad, hbuf1) == GST_FLOW_OK);
fail_unless (gst_pad_push (mysrcpad, hbuf2) == GST_FLOW_OK);
/* now push another buffer, which will trigger streamheader for second
* client, but should also send new streamheaders to first client */
- buf = gst_buffer_new_and_alloc (4);
- memcpy (GST_BUFFER_DATA (buf), "deaf", 4);
+ buf = gst_buffer_new_and_alloc (8);
+ memcpy (GST_BUFFER_DATA (buf), "deadbabe", 8);
gst_pad_push (mysrcpad, buf);
- /* FIXME: here's a bug - the first client does not get new streamheaders */
- fail_unless_read ("first client", pfd1[0], 4, "deaf");
+ fail_unless_read ("first client", pfd1[0], 6, "second");
+ fail_unless_read ("first client", pfd1[0], 6, "header");
+ fail_unless_read ("first client", pfd1[0], 8, "deadbabe");
/* new streamheader data */
- fail_unless_read ("second client", pfd2[0], 4, "beef");
- fail_unless_read ("second client", pfd2[0], 8, "deadbabe");
+ fail_unless_read ("second client", pfd2[0], 6, "second");
+ fail_unless_read ("second client", pfd2[0], 6, "header");
/* we missed the f00d buffer */
- fail_unless_read ("second client", pfd2[0], 4, "deaf");
- wait_bytes_served (sink, 36);
+ fail_unless_read ("second client", pfd2[0], 8, "deadbabe");
+ //wait_bytes_served (sink, 36);
- gst_buffer_unref (hbuf1);
- gst_buffer_unref (hbuf2);
GST_DEBUG ("cleaning up multifdsink");
+ g_signal_emit_by_name (sink, "remove", pfd1[1]);
+ g_signal_emit_by_name (sink, "remove", pfd2[1]);
ASSERT_SET_STATE (sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
+
+ /* setting to NULL should have cleared the streamheader */
+ ASSERT_BUFFER_REFCOUNT (hbuf1, "hbuf1", 1);
+ ASSERT_BUFFER_REFCOUNT (hbuf2, "hbuf2", 1);
+ gst_buffer_unref (hbuf1);
+ gst_buffer_unref (hbuf2);
cleanup_multifdsink (sink);
+
+ ASSERT_CAPS_REFCOUNT (caps, "caps", 1);
+ gst_caps_unref (caps);
}
GST_END_TEST;