return ret;
}
+/**
+ * map_memory_output_vector_n:
+ * @buf: The #GstBuffer that should be mapped
+ * @offset: Offset into the buffer that should be mapped
+ * @vectors: (out,array length=num_vectors): an array of #GOutputVector structs to write into
+ * @mapinfo: (out,array length=num_vectors): an array of #GstMapInfo structs to write into
+ * @num_vectors: the number of elements in @vectors to prevent buffer overruns
+ *
+ * Maps a buffer into memory, populating a #GOutputVector to use scatter-gather
+ * I/O to send the data over a socket. The whole buffer won't be mapped into
+ * memory if it consists of more than @num_vectors #GstMemory s.
+ *
+ * Use #unmap_n_memorys after you are
+ * finished with the mappings.
+ *
+ * Returns: The number of GstMemorys mapped
+ */
+static int
+map_n_memory_output_vector (GstBuffer * buf, size_t offset,
+ GOutputVector * vectors, GstMapInfo * mapinfo, int num_vectors)
+{
+ guint mem_idx, mem_len;
+ gsize mem_skip;
+ size_t maxsize;
+ int i;
+
+ g_return_val_if_fail (num_vectors > 0, 0);
+ memset (vectors, 0, sizeof (GOutputVector) * num_vectors);
+
+ maxsize = gst_buffer_get_size (buf) - offset;
+ if (!gst_buffer_find_memory (buf, offset, maxsize, &mem_idx, &mem_len,
+ &mem_skip))
+ g_error ("Unable to map memory at offset %" G_GSIZE_FORMAT ", buffer "
+ "length is %" G_GSIZE_FORMAT, offset, gst_buffer_get_size (buf));
+
+ for (i = 0; i < mem_len && i < num_vectors; i++) {
+ GstMapInfo map = { 0 };
+ GstMemory *mem = gst_buffer_peek_memory (buf, mem_idx + i);
+ if (!gst_memory_map (mem, &map, GST_MAP_READ))
+ g_error ("Unable to map memory %p. This should never happen.", mem);
+
+ if (i == 0) {
+ vectors[i].buffer = map.data + mem_skip;
+ vectors[i].size = map.size - mem_skip;
+ } else {
+ vectors[i].buffer = map.data;
+ vectors[i].size = map.size;
+ }
+ mapinfo[i] = map;
+ }
+ return i;
+}
+
+/**
+ * map_n_memory_output_vector:
+ * @buf: The #GstBuffer that should be mapped
+ * @offset: Offset into the buffer that should be mapped
+ * @vectors: (out,array length=num_vectors): an array of #GOutputVector structs to write into
+ * @num_vectors: the number of elements in @vectors to prevent buffer overruns
+ *
+ * Returns: The number of GstMemorys mapped
+ */
+static void
+unmap_n_memorys (GstMapInfo * mapinfo, int num_mappings)
+{
+ int i;
+ g_return_if_fail (num_mappings > 0);
+
+ for (i = 0; i < num_mappings; i++)
+ gst_memory_unmap (mapinfo[i].memory, &mapinfo[i]);
+}
+
+static gssize
+gst_multi_socket_sink_write (GstMultiSocketSink * sink,
+ GSocket * sock, GstBuffer * buffer, gsize bufoffset,
+ GCancellable * cancellable, GError ** err)
+{
+ GstMapInfo maps[8];
+ GOutputVector vec[8];
+ guint mems_mapped;
+ gssize wrote;
+
+ mems_mapped = map_n_memory_output_vector (buffer, bufoffset, vec, maps, 8);
+
+ wrote =
+ g_socket_send_message (sock, NULL, vec, mems_mapped, NULL, 0, 0,
+ cancellable, err);
+
+ unmap_n_memorys (maps, mems_mapped);
+
+ return wrote;
+}
+
+
/* Handle a write on a client,
* which indicates a read request from a client.
*
more = TRUE;
do {
- gint maxsize;
-
if (!mhclient->sending) {
/* client is not working on a buffer */
if (mhclient->bufpos == -1) {
if (mhclient->sending) {
gssize wrote;
GstBuffer *head;
- GstMapInfo map;
/* pick first buffer from list */
head = GST_BUFFER (mhclient->sending->data);
- gst_buffer_map (head, &map, GST_MAP_READ);
- maxsize = map.size - mhclient->bufoffset;
-
- /* FIXME: specific */
- /* try to write the complete buffer */
-
- wrote =
- g_socket_send (mhclient->handle.socket,
- (gchar *) map.data + mhclient->bufoffset, maxsize, sink->cancellable,
- &err);
- gst_buffer_unmap (head, &map);
+ wrote = gst_multi_socket_sink_write (sink, mhclient->handle.socket, head,
+ mhclient->bufoffset, sink->cancellable, &err);
if (wrote < 0) {
/* hmm error.. */
goto write_error;
}
} else {
- if (wrote < maxsize) {
+ if (wrote < (gst_buffer_get_size (head) - mhclient->bufoffset)) {
/* partial write, try again now */
GST_LOG_OBJECT (sink,
"partial write on %p of %" G_GSSIZE_FORMAT " bytes",
gst_check_setup_events (mysrcpad, sink, caps, GST_FORMAT_BYTES);
ASSERT_CAPS_REFCOUNT (caps, "caps", 3);
gst_buffer_fill (buffer, 0, "dead", 4);
+ gst_buffer_append_memory (buffer,
+ gst_memory_new_wrapped (GST_MEMORY_FLAG_READONLY, (gpointer) " good", 5,
+ 0, 5, NULL, NULL));
fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
GST_DEBUG ("reading");
- fail_if (read_handle (srcsocket, data, 4) < 4);
- fail_unless (strncmp (data, "dead", 4) == 0);
- wait_bytes_served (sink, 4);
+ fail_if (read_handle (srcsocket, data, 9) < 9);
+ fail_unless (strncmp (data, "dead good", 9) == 0);
+ wait_bytes_served (sink, 9);
GST_DEBUG ("cleaning up multisocketsink");
ASSERT_SET_STATE (sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
GST_END_TEST;
+typedef struct
+{
+ GSocket *sinksocket, *srcsocket;
+ GstElement *sink;
+} TestSinkAndSocket;
+
+static void
+setup_sink_with_socket (TestSinkAndSocket * tsas)
+{
+ GstCaps *caps = NULL;
+
+ tsas->sink = setup_multisocketsink ();
+ fail_unless (setup_handles (&tsas->sinksocket, &tsas->srcsocket));
+
+ ASSERT_SET_STATE (tsas->sink, GST_STATE_PLAYING, GST_STATE_CHANGE_ASYNC);
+
+ /* add the client */
+ g_signal_emit_by_name (tsas->sink, "add", tsas->sinksocket);
+
+ caps = gst_caps_from_string ("application/x-gst-check");
+ gst_check_setup_events (mysrcpad, tsas->sink, caps, GST_FORMAT_BYTES);
+ gst_caps_unref (caps);
+}
+
+static void
+teardown_sink_with_socket (TestSinkAndSocket * tsas)
+{
+ if (tsas->sink != NULL) {
+ ASSERT_SET_STATE (tsas->sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
+ cleanup_multisocketsink (tsas->sink);
+ tsas->sink = 0;
+ }
+ if (tsas->sinksocket != NULL) {
+ g_object_unref (tsas->sinksocket);
+ tsas->sinksocket = 0;
+ }
+ if (tsas->srcsocket != NULL) {
+ g_object_unref (tsas->srcsocket);
+ tsas->srcsocket = 0;
+ }
+}
+
+GST_START_TEST (test_sending_buffers_with_9_gstmemorys)
+{
+ TestSinkAndSocket tsas = { 0 };
+ GstBuffer *buffer;
+ int i;
+ const char *numbers[9] = { "one", "two", "three", "four", "five", "six",
+ "seven", "eight", "nine"
+ };
+ const char numbers_concat[] = "onetwothreefourfivesixseveneightnine";
+ gchar data[sizeof (numbers_concat)];
+ int len = sizeof (numbers_concat) - 1;
+
+ setup_sink_with_socket (&tsas);
+
+ buffer = gst_buffer_new ();
+ for (i = 0; i < sizeof (numbers) / sizeof (*numbers); i++)
+ gst_buffer_append_memory (buffer,
+ gst_memory_new_wrapped (GST_MEMORY_FLAG_READONLY, (gpointer) numbers[i],
+ strlen (numbers[i]), 0, strlen (numbers[i]), NULL, NULL));
+ fail_unless (gst_pad_push (mysrcpad, buffer) == GST_FLOW_OK);
+
+ GST_DEBUG ("reading");
+ fail_if (read_handle (tsas.srcsocket, data, len) < len);
+ fail_unless (strncmp (data, numbers_concat, len) == 0);
+
+ teardown_sink_with_socket (&tsas);
+}
+
+GST_END_TEST;
+
/* from the given two data buffers, create two streamheader buffers and
* some caps that match it, and store them in the given pointers
* returns one ref to each of the buffers and the caps */
suite_add_tcase (s, tc_chain);
tcase_add_test (tc_chain, test_no_clients);
tcase_add_test (tc_chain, test_add_client);
+ tcase_add_test (tc_chain, test_sending_buffers_with_9_gstmemorys);
tcase_add_test (tc_chain, test_streamheader);
tcase_add_test (tc_chain, test_change_streamheader);
tcase_add_test (tc_chain, test_burst_client_bytes);