gst/tcp/: make multifdsink properly deal with streamheader:
authorThomas Vander Stichele <thomas@apestaart.org>
Fri, 2 Jun 2006 16:26:54 +0000 (16:26 +0000)
committerThomas Vander Stichele <thomas@apestaart.org>
Fri, 2 Jun 2006 16:26:54 +0000 (16:26 +0000)
Original commit message from CVS:
* gst/tcp/README:
* gst/tcp/gstmultifdsink.c: (gst_multi_fd_sink_init),
(gst_multi_fd_sink_remove_client_link),
(gst_multi_fd_sink_client_queue_caps),
(gst_multi_fd_sink_client_queue_buffer),
(gst_multi_fd_sink_handle_client_write),
(gst_multi_fd_sink_render):
* gst/tcp/gstmultifdsink.h:
make multifdsink properly deal with streamheader:
- streamheader is taken from caps
- buffers marked with IN_CAPS are not sent
- streamheaders are sent, on connection, from the caps of the
buffer where the client gets positioned to
- further streamheader changes are done every time the client
will receive a buffer with different caps
* tests/check/elements/multifdsink.c: (GST_START_TEST),
(gst_multifdsink_create_streamheader):
add tests for this

ChangeLog
gst/tcp/README
gst/tcp/gstmultifdsink.c
gst/tcp/gstmultifdsink.h
tests/check/elements/multifdsink.c

index 04b761bdf5be2e839c26f78f5b11a1c17a39ac45..2aaeb6580dc9f60cac3957de42ae0222205e82b8 100644 (file)
--- a/ChangeLog
+++ b/ChangeLog
@@ -1,3 +1,24 @@
+2006-06-02  Thomas Vander Stichele  <thomas at apestaart dot org>
+
+       * gst/tcp/README:
+       * gst/tcp/gstmultifdsink.c: (gst_multi_fd_sink_init),
+       (gst_multi_fd_sink_remove_client_link),
+       (gst_multi_fd_sink_client_queue_caps),
+       (gst_multi_fd_sink_client_queue_buffer),
+       (gst_multi_fd_sink_handle_client_write),
+       (gst_multi_fd_sink_render):
+       * gst/tcp/gstmultifdsink.h:
+         make multifdsink properly deal with streamheader:
+         - streamheader is taken from caps
+         - buffers marked with IN_CAPS are not sent
+         - streamheaders are sent, on connection, from the caps of the
+           buffer where the client gets positioned to
+         - further streamheader changes are done every time the client
+           will receive a buffer with different caps
+       * tests/check/elements/multifdsink.c: (GST_START_TEST),
+       (gst_multifdsink_create_streamheader):
+         add tests for this
+
 2006-06-02  Michael Smith  <msmith@fluendo.com>
 
        * ext/vorbis/vorbisdec.c: (vorbis_handle_identification_packet):
 2006-06-02  Michael Smith  <msmith@fluendo.com>
 
        * ext/vorbis/vorbisdec.c: (vorbis_handle_identification_packet):
index 589d534091acac3fbb8b98128f36c59bb7aa07cf..0e3af6a614fd4ffc025d601873b1f896f93ecb56 100644 (file)
@@ -29,3 +29,22 @@ TODO
 ----
 - implement DNS resolution
 
 ----
 - implement DNS resolution
 
+multifdsink
+-----------
+- operation:
+  - client fd gets added when "add" signal gets emitted on multifdsink
+  - signal handler creates a GstTCPClient structure, adds it to ->clients,
+    and adds the fd to ->fd_hash, then emits client-added
+  - client 
+
+  - when a buffer comes in:
+    - the _render vmethod puts the buffer on the global queue
+    - and increases bytes_to_serve
+    - (currently it sets streamheaders, but since this is treated globally
+       this is wrong - clients can be at different positions in the stream)
+
+  - when a client issues a write (ie requests data):
+    - when using GDP, if no caps sent yet, send caps first, then set caps_sent
+    - if streamheader buffers, and we haven't sent yet to this client,
+      send current streamheader buffers, then set streamheader_sent
+    - send out buffers
index 5ded7c4f02d9a74e58064c207998f3965c7307b8..7dfce3e766c4b5418841a2c038b536b6a627026b 100644 (file)
@@ -537,6 +537,8 @@ gst_multi_fd_sink_init (GstMultiFdSink * this, GstMultiFdSinkClass * klass)
 
   this->timeout = DEFAULT_TIMEOUT;
   this->sync_method = DEFAULT_SYNC_METHOD;
 
   this->timeout = DEFAULT_TIMEOUT;
   this->sync_method = DEFAULT_SYNC_METHOD;
+
+  this->header_flags = 0;
 }
 
 static void
 }
 
 static void
@@ -792,6 +794,10 @@ gst_multi_fd_sink_remove_client_link (GstMultiFdSink * sink, GList * link)
   g_slist_free (client->sending);
   client->sending = NULL;
 
   g_slist_free (client->sending);
   client->sending = NULL;
 
+  if (client->caps)
+    gst_caps_unref (client->caps);
+  client->caps = NULL;
+
   /* unlock the mutex before signaling because the signal handler
    * might query some properties */
   CLIENTS_UNLOCK (sink);
   /* unlock the mutex before signaling because the signal handler
    * might query some properties */
   CLIENTS_UNLOCK (sink);
@@ -936,7 +942,8 @@ gst_multi_fd_sink_client_queue_caps (GstMultiFdSink * sink,
       client->fd.fd, string);
   g_free (string);
 
       client->fd.fd, string);
   g_free (string);
 
-  if (!gst_dp_packet_from_caps (caps, 0, &length, &header, &payload)) {
+  if (!gst_dp_packet_from_caps (caps, sink->header_flags, &length, &header,
+          &payload)) {
     GST_DEBUG_OBJECT (sink, "Could not create GDP packet from caps");
     return FALSE;
   }
     GST_DEBUG_OBJECT (sink, "Could not create GDP packet from caps");
     return FALSE;
   }
@@ -965,11 +972,118 @@ static gboolean
 gst_multi_fd_sink_client_queue_buffer (GstMultiFdSink * sink,
     GstTCPClient * client, GstBuffer * buffer)
 {
 gst_multi_fd_sink_client_queue_buffer (GstMultiFdSink * sink,
     GstTCPClient * client, GstBuffer * buffer)
 {
+  GstCaps *caps;
+
+  /* TRUE: send them if the new caps have them */
+  gboolean send_streamheader = FALSE;
+  GstStructure *s;
+
+
+  /* before we queue the buffer, we check if we need to queue streamheader
+   * buffers (because it's a new client, or because they changed) */
+  caps = gst_buffer_get_caps (buffer);  /* cleaned up after streamheader */
+  if (!client->caps) {
+    GST_LOG_OBJECT (sink,
+        "[fd %5d] no previous caps for this client, send streamheader",
+        client->fd.fd);
+    send_streamheader = TRUE;
+    client->caps = gst_caps_ref (caps);
+  } else {
+    /* there were previous caps recorded, so compare */
+    if (!gst_caps_is_equal (caps, client->caps)) {
+      const GValue *sh1, *sh2;
+
+      /* caps are not equal, but could still have the same streamheader */
+      s = gst_caps_get_structure (caps, 0);
+      if (!gst_structure_has_field (s, "streamheader")) {
+        /* no new streamheader, so nothing new to send */
+        GST_LOG_OBJECT (sink,
+            "[fd %5d] new caps do not have streamheader, not sending",
+            client->fd.fd);
+      } else {
+        /* there is a new streamheader */
+        s = gst_caps_get_structure (client->caps, 0);
+        if (!gst_structure_has_field (s, "streamheader")) {
+          /* no previous streamheader, so send the new one */
+          GST_LOG_OBJECT (sink,
+              "[fd %5d] previous caps did not have streamheader, sending",
+              client->fd.fd);
+          send_streamheader = TRUE;
+        } else {
+          /* both old and new caps have streamheader set */
+          sh1 = gst_structure_get_value (s, "streamheader");
+          s = gst_caps_get_structure (caps, 0);
+          sh2 = gst_structure_get_value (s, "streamheader");
+          if (gst_value_compare (sh1, sh2) != GST_VALUE_EQUAL) {
+            GST_LOG_OBJECT (sink,
+                "[fd %5d] new streamheader different from old, sending",
+                client->fd.fd);
+            send_streamheader = TRUE;
+          }
+        }
+      }
+    }
+  }
+
+  if (G_UNLIKELY (send_streamheader)) {
+    const GValue *sh;
+    GArray *buffers;
+    int i;
+
+    GST_LOG_OBJECT (sink,
+        "[fd %5d] sending streamheader from caps %" GST_PTR_FORMAT,
+        client->fd.fd, caps);
+    s = gst_caps_get_structure (caps, 0);
+    if (!gst_structure_has_field (s, "streamheader")) {
+      GST_LOG_OBJECT (sink,
+          "[fd %5d] no new streamheader, so nothing to send", client->fd.fd);
+    } else {
+      GST_LOG_OBJECT (sink,
+          "[fd %5d] sending streamheader from caps %" GST_PTR_FORMAT,
+          client->fd.fd, caps);
+      sh = gst_structure_get_value (s, "streamheader");
+      g_assert (G_VALUE_TYPE (sh) == GST_TYPE_ARRAY);
+      buffers = g_value_peek_pointer (sh);
+      for (i = 0; i < buffers->len; ++i) {
+        GValue *bufval;
+        GstBuffer *buffer;
+
+        bufval = &g_array_index (buffers, GValue, i);
+        g_assert (G_VALUE_TYPE (bufval) == GST_TYPE_BUFFER);
+        buffer = g_value_peek_pointer (bufval);
+        GST_LOG_OBJECT (sink,
+            "[fd %5d] queueing streamheader buffer of length %d",
+            client->fd.fd, GST_BUFFER_SIZE (buffer));
+        gst_buffer_ref (buffer);
+
+        if (sink->protocol == GST_TCP_PROTOCOL_GDP) {
+          guint8 *header;
+          guint len;
+
+          if (!gst_dp_header_from_buffer (buffer, sink->header_flags, &len,
+                  &header)) {
+            GST_DEBUG_OBJECT (sink,
+                "[fd %5d] could not create header, removing client",
+                client->fd.fd);
+            return FALSE;
+          }
+          gst_multi_fd_sink_client_queue_data (sink, client, (gchar *) header,
+              len);
+        }
+
+        client->sending = g_slist_append (client->sending, buffer);
+      }
+    }
+  }
+
+  gst_caps_unref (caps);
+  caps = NULL;
+  /* now we can send the buffer, possibly sending a GDP header first */
   if (sink->protocol == GST_TCP_PROTOCOL_GDP) {
     guint8 *header;
     guint len;
 
   if (sink->protocol == GST_TCP_PROTOCOL_GDP) {
     guint8 *header;
     guint len;
 
-    if (!gst_dp_header_from_buffer (buffer, 0, &len, &header)) {
+    if (!gst_dp_header_from_buffer (buffer, sink->header_flags, &len, &header)) {
       GST_DEBUG_OBJECT (sink,
           "[fd %5d] could not create header, removing client", client->fd.fd);
       return FALSE;
       GST_DEBUG_OBJECT (sink,
           "[fd %5d] could not create header, removing client", client->fd.fd);
       return FALSE;
@@ -1143,28 +1257,6 @@ gst_multi_fd_sink_handle_client_write (GstMultiFdSink * sink,
       client->caps_sent = TRUE;
     }
   }
       client->caps_sent = TRUE;
     }
   }
-  /* if we have streamheader buffers, and haven't sent them to this client
-   * yet, send them out one by one */
-  if (!client->streamheader_sent) {
-    GST_DEBUG_OBJECT (sink, "[fd %5d] Sending streamheader, %d buffers", fd,
-        g_slist_length (sink->streamheader));
-    if (sink->streamheader) {
-      GSList *l;
-
-      for (l = sink->streamheader; l; l = l->next) {
-        /* queue stream headers for sending */
-        res =
-            gst_multi_fd_sink_client_queue_buffer (sink, client,
-            GST_BUFFER (l->data));
-        if (!res) {
-          GST_DEBUG_OBJECT (sink,
-              "Failed queueing streamheader, removing client");
-          return FALSE;
-        }
-      }
-    }
-    client->streamheader_sent = TRUE;
-  }
 
   more = TRUE;
   do {
 
   more = TRUE;
   do {
@@ -1645,9 +1737,32 @@ static GstFlowReturn
 gst_multi_fd_sink_render (GstBaseSink * bsink, GstBuffer * buf)
 {
   GstMultiFdSink *sink;
 gst_multi_fd_sink_render (GstBaseSink * bsink, GstBuffer * buf)
 {
   GstMultiFdSink *sink;
+  GstCaps *bufcaps, *padcaps;
 
   sink = GST_MULTI_FD_SINK (bsink);
 
 
   sink = GST_MULTI_FD_SINK (bsink);
 
+  /* since we check every buffer for streamheader caps, we need to make
+   * sure every buffer has caps set */
+  bufcaps = gst_buffer_get_caps (buf);
+  padcaps = GST_PAD_CAPS (GST_BASE_SINK_PAD (bsink));
+
+  /* make sure we have caps on the pad */
+  if (!padcaps) {
+    if (!bufcaps) {
+      GST_ELEMENT_ERROR (sink, CORE, NEGOTIATION, (NULL),
+          ("Received first buffer without caps set"));
+      return GST_FLOW_NOT_NEGOTIATED;
+    }
+  }
+
+  /* stamp the buffer with previous caps if no caps set */
+  if (!bufcaps) {
+    buf = gst_buffer_make_writable (buf);
+    gst_buffer_set_caps (buf, padcaps);
+  } else {
+    gst_caps_unref (bufcaps);
+  }
+
   /* since we keep this buffer out of the scope of this method */
   gst_buffer_ref (buf);
 
   /* since we keep this buffer out of the scope of this method */
   gst_buffer_ref (buf);
 
@@ -1670,9 +1785,11 @@ gst_multi_fd_sink_render (GstBaseSink * bsink, GstBuffer * buf)
   /* if the incoming buffer is marked as IN CAPS, then we assume for now
    * it's a streamheader that needs to be sent to each new client, so we
    * put it on our internal list of streamheader buffers.
   /* if the incoming buffer is marked as IN CAPS, then we assume for now
    * it's a streamheader that needs to be sent to each new client, so we
    * put it on our internal list of streamheader buffers.
-   * After that we return, since we only send these out when we get
-   * non IN_CAPS buffers so we properly keep track of clients that got
-   * streamheaders. */
+   * FIXME: we could check if the buffer's contents are in fact part of the
+   * current streamheader.
+   *
+   * We don't send the buffer to the client, since streamheaders are sent
+   * separately when necessary. */
   if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_IN_CAPS)) {
     sink->previous_buffer_in_caps = TRUE;
     GST_DEBUG_OBJECT (sink,
   if (GST_BUFFER_FLAG_IS_SET (buf, GST_BUFFER_FLAG_IN_CAPS)) {
     sink->previous_buffer_in_caps = TRUE;
     GST_DEBUG_OBJECT (sink,
index 7848903801e671b6e7ccd833d829d052c8603306..7e615736f66c9fbbebe4bdabf6b4e4cafbd49362 100644 (file)
@@ -142,9 +142,10 @@ typedef struct {
   GstTCPProtocol protocol;
 
   gboolean caps_sent;
   GstTCPProtocol protocol;
 
   gboolean caps_sent;
-  gboolean streamheader_sent;
   gboolean new_connection;
 
   gboolean new_connection;
 
+  GstCaps *caps;                /* caps of last queued buffer */
+
   /* stats */
   guint64 bytes_sent;
   guint64 connect_time;
   /* stats */
   guint64 bytes_sent;
   guint64 connect_time;
@@ -152,7 +153,6 @@ typedef struct {
   guint64 last_activity_time;
   guint64 dropped_buffers;
   guint64 avg_queue_size;
   guint64 last_activity_time;
   guint64 dropped_buffers;
   guint64 avg_queue_size;
-
 } GstTCPClient;
 
 #define CLIENTS_LOCK_INIT(fdsink)       (g_static_rec_mutex_init(&fdsink->clientslock))
 } GstTCPClient;
 
 #define CLIENTS_LOCK_INIT(fdsink)       (g_static_rec_mutex_init(&fdsink->clientslock))
@@ -203,6 +203,8 @@ struct _GstMultiFdSink {
   gint buffers_queued;  /* number of queued buffers */
   gint bytes_queued;    /* number of queued bytes */
   gint time_queued;     /* number of queued time */
   gint buffers_queued;  /* number of queued buffers */
   gint bytes_queued;    /* number of queued bytes */
   gint time_queued;     /* number of queued time */
+
+  guint8 header_flags;
 };
 
 struct _GstMultiFdSinkClass {
 };
 
 struct _GstMultiFdSinkClass {
index 67c8c4758e5008f9dc8744c39b14cb133a65e3e8..7b822cae388c51fad1ae2398ff5299188322d475 100644 (file)
@@ -31,7 +31,7 @@ GstPad *mysrcpad;
 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
     GST_PAD_SRC,
     GST_PAD_ALWAYS,
 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
     GST_PAD_SRC,
     GST_PAD_ALWAYS,
-    GST_STATIC_CAPS ("application/x-gdp")
+    GST_STATIC_CAPS ("application/x-gst-check")
     );
 
 GstElement *
     );
 
 GstElement *
@@ -80,12 +80,16 @@ GST_START_TEST (test_no_clients)
 {
   GstElement *sink;
   GstBuffer *buffer;
 {
   GstElement *sink;
   GstBuffer *buffer;
+  GstCaps *caps;
 
   sink = setup_multifdsink ();
 
   ASSERT_SET_STATE (sink, GST_STATE_PLAYING, GST_STATE_CHANGE_ASYNC);
 
 
   sink = setup_multifdsink ();
 
   ASSERT_SET_STATE (sink, GST_STATE_PLAYING, GST_STATE_CHANGE_ASYNC);
 
+  caps = gst_caps_from_string ("application/x-gst-check");
   buffer = gst_buffer_new_and_alloc (4);
   buffer = gst_buffer_new_and_alloc (4);
+  gst_buffer_set_caps (buffer, caps);
+  gst_caps_unref (caps);
   fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
 
   GST_DEBUG ("cleaning up multifdsink");
   fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
 
   GST_DEBUG ("cleaning up multifdsink");
@@ -99,6 +103,7 @@ GST_START_TEST (test_add_client)
 {
   GstElement *sink;
   GstBuffer *buffer;
 {
   GstElement *sink;
   GstBuffer *buffer;
+  GstCaps *caps;
   int pfd[2];
   gchar data[4];
   guint64 bytes_served;
   int pfd[2];
   gchar data[4];
   guint64 bytes_served;
@@ -112,7 +117,11 @@ GST_START_TEST (test_add_client)
   /* add the client */
   g_signal_emit_by_name (sink, "add", pfd[1]);
 
   /* add the client */
   g_signal_emit_by_name (sink, "add", pfd[1]);
 
+  caps = gst_caps_from_string ("application/x-gst-check");
+  GST_DEBUG ("Created test caps %p %" GST_PTR_FORMAT, caps, caps);
   buffer = gst_buffer_new_and_alloc (4);
   buffer = gst_buffer_new_and_alloc (4);
+  gst_buffer_set_caps (buffer, caps);
+  ASSERT_CAPS_REFCOUNT (caps, "caps", 2);
   memcpy (GST_BUFFER_DATA (buffer), "dead", 4);
   fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
 
   memcpy (GST_BUFFER_DATA (buffer), "dead", 4);
   fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
 
@@ -123,6 +132,9 @@ GST_START_TEST (test_add_client)
   GST_DEBUG ("cleaning up multifdsink");
   ASSERT_SET_STATE (sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
   cleanup_multifdsink (sink);
   GST_DEBUG ("cleaning up multifdsink");
   ASSERT_SET_STATE (sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
   cleanup_multifdsink (sink);
+
+  ASSERT_CAPS_REFCOUNT (caps, "caps", 1);
+  gst_caps_unref (caps);
 }
 
 GST_END_TEST;
 }
 
 GST_END_TEST;
@@ -143,12 +155,13 @@ G_STMT_START { \
 
 /* from the given two data buffers, create two streamheader buffers and
  * some caps that match it, and store them in the given pointers
 
 /* from the given two data buffers, create two streamheader buffers and
  * some caps that match it, and store them in the given pointers
- * returns buffers and caps with a refcount of 1 */
+ * returns  one ref to each of the buffers and the caps */
 static void
 gst_multifdsink_create_streamheader (const gchar * data1,
     const gchar * data2, GstBuffer ** hbuf1, GstBuffer ** hbuf2,
     GstCaps ** caps)
 {
 static void
 gst_multifdsink_create_streamheader (const gchar * data1,
     const gchar * data2, GstBuffer ** hbuf1, GstBuffer ** hbuf2,
     GstCaps ** caps)
 {
+  GstBuffer *buf;
   GValue array = { 0 };
   GValue value = { 0 };
   GstStructure *structure;
   GValue array = { 0 };
   GValue value = { 0 };
   GstStructure *structure;
@@ -174,12 +187,19 @@ gst_multifdsink_create_streamheader (const gchar * data1,
   g_value_init (&array, GST_TYPE_ARRAY);
 
   g_value_init (&value, GST_TYPE_BUFFER);
   g_value_init (&array, GST_TYPE_ARRAY);
 
   g_value_init (&value, GST_TYPE_BUFFER);
-  gst_value_set_buffer (&value, *hbuf1);
+  /* we take a copy, set it on the array (which refs it), then unref our copy */
+  buf = gst_buffer_copy (*hbuf1);
+  gst_value_set_buffer (&value, buf);
+  ASSERT_BUFFER_REFCOUNT (buf, "copied buffer", 2);
+  gst_buffer_unref (buf);
   gst_value_array_append_value (&array, &value);
   g_value_unset (&value);
 
   g_value_init (&value, GST_TYPE_BUFFER);
   gst_value_array_append_value (&array, &value);
   g_value_unset (&value);
 
   g_value_init (&value, GST_TYPE_BUFFER);
-  gst_value_set_buffer (&value, *hbuf2);
+  buf = gst_buffer_copy (*hbuf2);
+  gst_value_set_buffer (&value, buf);
+  ASSERT_BUFFER_REFCOUNT (buf, "copied buffer", 2);
+  gst_buffer_unref (buf);
   gst_value_array_append_value (&array, &value);
   g_value_unset (&value);
 
   gst_value_array_append_value (&array, &value);
   g_value_unset (&value);
 
@@ -188,6 +208,14 @@ gst_multifdsink_create_streamheader (const gchar * data1,
 
   gst_structure_set_value (structure, "streamheader", &array);
   g_value_unset (&array);
 
   gst_structure_set_value (structure, "streamheader", &array);
   g_value_unset (&array);
+  ASSERT_CAPS_REFCOUNT (*caps, "streamheader caps", 1);
+
+  /* set our streamheadery caps on the buffers */
+  gst_buffer_set_caps (*hbuf1, *caps);
+  gst_buffer_set_caps (*hbuf2, *caps);
+  ASSERT_CAPS_REFCOUNT (*caps, "streamheader caps", 3);
+
+  GST_DEBUG ("created streamheader caps %p %" GST_PTR_FORMAT, *caps, *caps);
 }
 
 
 }
 
 
@@ -196,7 +224,8 @@ gst_multifdsink_create_streamheader (const gchar * data1,
  * - sets streamheader caps on the pad
  * - pushes the IN_CAPS buffers
  * - pushes a buffer
  * - sets streamheader caps on the pad
  * - pushes the IN_CAPS buffers
  * - pushes a buffer
- * - verifies that the client received all the data correctly
+ * - verifies that the client received all the data correctly, and did not
+ *   get multiple copies of the streamheader
  * - adds a second client
  * - verifies that this second client receives the streamheader caps too, plus
  * - the new buffer
  * - adds a second client
  * - verifies that this second client receives the streamheader caps too, plus
  * - the new buffer
@@ -227,7 +256,8 @@ GST_START_TEST (test_streamheader)
   gst_multifdsink_create_streamheader ("babe", "deadbeef", &hbuf1, &hbuf2,
       &caps);
   fail_unless (gst_pad_set_caps (mysrcpad, caps));
   gst_multifdsink_create_streamheader ("babe", "deadbeef", &hbuf1, &hbuf2,
       &caps);
   fail_unless (gst_pad_set_caps (mysrcpad, caps));
-  gst_caps_unref (caps);
+  /* one is ours, two on the buffers, and one now on the pad */
+  ASSERT_CAPS_REFCOUNT (caps, "caps", 4);
 
   fail_unless (gst_pad_push (mysrcpad, hbuf1) == GST_FLOW_OK);
   fail_unless (gst_pad_push (mysrcpad, hbuf2) == GST_FLOW_OK);
 
   fail_unless (gst_pad_push (mysrcpad, hbuf1) == GST_FLOW_OK);
   fail_unless (gst_pad_push (mysrcpad, hbuf2) == GST_FLOW_OK);
@@ -265,11 +295,21 @@ GST_START_TEST (test_streamheader)
   fail_unless_read ("second client", pfd2[0], 4, "deaf");
   wait_bytes_served (sink, 36);
 
   fail_unless_read ("second client", pfd2[0], 4, "deaf");
   wait_bytes_served (sink, 36);
 
-  gst_buffer_unref (hbuf1);
-  gst_buffer_unref (hbuf2);
   GST_DEBUG ("cleaning up multifdsink");
   GST_DEBUG ("cleaning up multifdsink");
+
+  g_signal_emit_by_name (sink, "remove", pfd1[1]);
+  g_signal_emit_by_name (sink, "remove", pfd2[1]);
+
   ASSERT_SET_STATE (sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
   cleanup_multifdsink (sink);
   ASSERT_SET_STATE (sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
   cleanup_multifdsink (sink);
+
+  ASSERT_BUFFER_REFCOUNT (hbuf1, "hbuf1", 1);
+  ASSERT_BUFFER_REFCOUNT (hbuf2, "hbuf2", 1);
+  gst_buffer_unref (hbuf1);
+  gst_buffer_unref (hbuf2);
+
+  ASSERT_CAPS_REFCOUNT (caps, "caps", 1);
+  gst_caps_unref (caps);
 }
 
 GST_END_TEST;
 }
 
 GST_END_TEST;
@@ -306,10 +346,15 @@ GST_START_TEST (test_change_streamheader)
 
   /* create caps with streamheader, set the caps, and push the IN_CAPS
    * buffers */
 
   /* create caps with streamheader, set the caps, and push the IN_CAPS
    * buffers */
-  gst_multifdsink_create_streamheader ("babe", "deadbeef", &hbuf1, &hbuf2,
+  gst_multifdsink_create_streamheader ("first", "header", &hbuf1, &hbuf2,
       &caps);
   fail_unless (gst_pad_set_caps (mysrcpad, caps));
       &caps);
   fail_unless (gst_pad_set_caps (mysrcpad, caps));
-  gst_caps_unref (caps);
+  /* one is ours, two on the buffers, and one now on the pad */
+  ASSERT_CAPS_REFCOUNT (caps, "caps", 4);
+
+  /* one to hold for the test and one to give away */
+  ASSERT_BUFFER_REFCOUNT (hbuf1, "hbuf1", 2);
+  ASSERT_BUFFER_REFCOUNT (hbuf2, "hbuf2", 2);
 
   fail_unless (gst_pad_push (mysrcpad, hbuf1) == GST_FLOW_OK);
   fail_unless (gst_pad_push (mysrcpad, hbuf2) == GST_FLOW_OK);
 
   fail_unless (gst_pad_push (mysrcpad, hbuf1) == GST_FLOW_OK);
   fail_unless (gst_pad_push (mysrcpad, hbuf2) == GST_FLOW_OK);
@@ -327,22 +372,32 @@ GST_START_TEST (test_change_streamheader)
   memcpy (GST_BUFFER_DATA (buf), "f00d", 4);
   gst_pad_push (mysrcpad, buf);
 
   memcpy (GST_BUFFER_DATA (buf), "f00d", 4);
   gst_pad_push (mysrcpad, buf);
 
-  fail_unless_read ("change: first client", pfd1[0], 4, "babe");
-  fail_unless_read ("change: first client", pfd1[0], 8, "deadbeef");
+  fail_unless_read ("change: first client", pfd1[0], 5, "first");
+  fail_unless_read ("change: first client", pfd1[0], 6, "header");
   fail_unless_read ("change: first client", pfd1[0], 4, "f00d");
   fail_unless_read ("change: first client", pfd1[0], 4, "f00d");
-  wait_bytes_served (sink, 16);
+  //wait_bytes_served (sink, 16);
 
   /* now add the second client */
   g_signal_emit_by_name (sink, "add", pfd2[1]);
   fail_if_can_read ("second client, no buffer", pfd2[0]);
 
   /* change the streamheader */
 
   /* now add the second client */
   g_signal_emit_by_name (sink, "add", pfd2[1]);
   fail_if_can_read ("second client, no buffer", pfd2[0]);
 
   /* change the streamheader */
+
+  /* before we change, multifdsink still has a list of the old streamheaders */
+  ASSERT_BUFFER_REFCOUNT (hbuf1, "hbuf1", 2);
+  ASSERT_BUFFER_REFCOUNT (hbuf2, "hbuf2", 2);
   gst_buffer_unref (hbuf1);
   gst_buffer_unref (hbuf2);
   gst_buffer_unref (hbuf1);
   gst_buffer_unref (hbuf2);
-  gst_multifdsink_create_streamheader ("beef", "deadbabe", &hbuf1, &hbuf2,
+
+  /* drop our ref to the previous caps */
+  gst_caps_unref (caps);
+
+  gst_multifdsink_create_streamheader ("second", "header", &hbuf1, &hbuf2,
       &caps);
   fail_unless (gst_pad_set_caps (mysrcpad, caps));
       &caps);
   fail_unless (gst_pad_set_caps (mysrcpad, caps));
-  gst_caps_unref (caps);
+  /* one to hold for the test and one to give away */
+  ASSERT_BUFFER_REFCOUNT (hbuf1, "hbuf1", 2);
+  ASSERT_BUFFER_REFCOUNT (hbuf2, "hbuf2", 2);
 
   fail_unless (gst_pad_push (mysrcpad, hbuf1) == GST_FLOW_OK);
   fail_unless (gst_pad_push (mysrcpad, hbuf2) == GST_FLOW_OK);
 
   fail_unless (gst_pad_push (mysrcpad, hbuf1) == GST_FLOW_OK);
   fail_unless (gst_pad_push (mysrcpad, hbuf2) == GST_FLOW_OK);
@@ -353,25 +408,35 @@ GST_START_TEST (test_change_streamheader)
 
   /* now push another buffer, which will trigger streamheader for second
    * client, but should also send new streamheaders to first client */
 
   /* now push another buffer, which will trigger streamheader for second
    * client, but should also send new streamheaders to first client */
-  buf = gst_buffer_new_and_alloc (4);
-  memcpy (GST_BUFFER_DATA (buf), "deaf", 4);
+  buf = gst_buffer_new_and_alloc (8);
+  memcpy (GST_BUFFER_DATA (buf), "deadbabe", 8);
   gst_pad_push (mysrcpad, buf);
 
   gst_pad_push (mysrcpad, buf);
 
-  /* FIXME: here's a bug - the first client does not get new streamheaders */
-  fail_unless_read ("first client", pfd1[0], 4, "deaf");
+  fail_unless_read ("first client", pfd1[0], 6, "second");
+  fail_unless_read ("first client", pfd1[0], 6, "header");
+  fail_unless_read ("first client", pfd1[0], 8, "deadbabe");
 
   /* new streamheader data */
 
   /* new streamheader data */
-  fail_unless_read ("second client", pfd2[0], 4, "beef");
-  fail_unless_read ("second client", pfd2[0], 8, "deadbabe");
+  fail_unless_read ("second client", pfd2[0], 6, "second");
+  fail_unless_read ("second client", pfd2[0], 6, "header");
   /* we missed the f00d buffer */
   /* we missed the f00d buffer */
-  fail_unless_read ("second client", pfd2[0], 4, "deaf");
-  wait_bytes_served (sink, 36);
+  fail_unless_read ("second client", pfd2[0], 8, "deadbabe");
+  //wait_bytes_served (sink, 36);
 
 
-  gst_buffer_unref (hbuf1);
-  gst_buffer_unref (hbuf2);
   GST_DEBUG ("cleaning up multifdsink");
   GST_DEBUG ("cleaning up multifdsink");
+  g_signal_emit_by_name (sink, "remove", pfd1[1]);
+  g_signal_emit_by_name (sink, "remove", pfd2[1]);
   ASSERT_SET_STATE (sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
   ASSERT_SET_STATE (sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
+
+  /* setting to NULL should have cleared the streamheader */
+  ASSERT_BUFFER_REFCOUNT (hbuf1, "hbuf1", 1);
+  ASSERT_BUFFER_REFCOUNT (hbuf2, "hbuf2", 1);
+  gst_buffer_unref (hbuf1);
+  gst_buffer_unref (hbuf2);
   cleanup_multifdsink (sink);
   cleanup_multifdsink (sink);
+
+  ASSERT_CAPS_REFCOUNT (caps, "caps", 1);
+  gst_caps_unref (caps);
 }
 
 GST_END_TEST;
 }
 
 GST_END_TEST;