return written;
}
-static gsize
-fill_vectors (struct iovec *vecs, GstMapInfo * maps, guint n, GstBuffer * buf)
+static GstFlowReturn
+gst_writev_iovecs (GstObject * sink, gint fd, GstPoll * fdset,
+ struct iovec *vecs, guint n_vecs,
+ guint64 * bytes_written, gint max_transient_error_timeout,
+ guint64 current_position, gboolean * flushing)
{
- GstMemory *mem;
- gsize size = 0;
- guint i;
-
- g_assert (gst_buffer_n_memory (buf) == n);
-
- for (i = 0; i < n; ++i) {
- mem = gst_buffer_peek_memory (buf, i);
- if (gst_memory_map (mem, &maps[i], GST_MAP_READ)) {
- vecs[i].iov_base = maps[i].data;
- vecs[i].iov_len = maps[i].size;
- } else {
- GST_WARNING ("Failed to map memory %p for reading", mem);
- vecs[i].iov_base = (void *) "";
- vecs[i].iov_len = 0;
- }
- size += vecs[i].iov_len;
- }
-
- return size;
-}
-
-GstFlowReturn
-gst_writev_buffers (GstObject * sink, gint fd, GstPoll * fdset,
- GstBuffer ** buffers, guint num_buffers, guint8 * mem_nums,
- guint total_mem_num, guint64 * bytes_written, guint64 skip,
- gint max_transient_error_timeout, guint64 current_position,
- gboolean * flushing)
-{
- struct iovec *vecs;
- GstMapInfo *map_infos;
GstFlowReturn flow_ret;
gsize size = 0;
- guint i, j;
gint64 start_time = 0;
+ *bytes_written = 0;
max_transient_error_timeout *= 1000;
if (max_transient_error_timeout)
start_time = g_get_monotonic_time ();
- GST_LOG_OBJECT (sink, "%u buffers, %u memories", num_buffers, total_mem_num);
-
- vecs = g_newa (struct iovec, total_mem_num);
- map_infos = g_newa (GstMapInfo, total_mem_num);
-
- /* populate output vectors */
- for (i = 0, j = 0; i < num_buffers; ++i) {
- size += fill_vectors (&vecs[j], &map_infos[j], mem_nums[i], buffers[i]);
- j += mem_nums[i];
- }
+ GST_LOG_OBJECT (sink, "%u iovecs", n_vecs);
/* now write it all out! */
{
gssize ret, left;
- guint n_vecs = total_mem_num;
left = size;
- if (skip) {
- ret = skip;
- errno = 0;
- goto skip_first;
- }
-
do {
if (flushing != NULL && g_atomic_int_get (flushing)) {
GST_DEBUG_OBJECT (sink, "Flushing, exiting loop");
ret = gst_writev (fd, vecs, n_vecs, left);
if (ret > 0) {
- if (bytes_written)
- *bytes_written += ret;
- }
-
- skip_first:
-
- if (ret == left)
+ /* Wrote something, allow the caller to update the vecs passed here */
+ *bytes_written = ret;
break;
+ }
- if (ret < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
+ if (errno == EAGAIN || errno == EWOULDBLOCK || ret == 0) {
/* do nothing, try again */
if (max_transient_error_timeout)
start_time = g_get_monotonic_time ();
- } else if (ret < 0 && errno == EACCES && max_transient_error_timeout > 0) {
+ } else if (errno == EACCES && max_transient_error_timeout > 0) {
/* seek back to where we started writing and try again after sleeping
* for 10ms.
*
* happened and we have no idea how much and if what was written
* is actually correct (it sometimes isn't)
*/
- ret = lseek (fd, current_position + *bytes_written, SEEK_SET);
- if (ret < 0 || ret != current_position + *bytes_written) {
+ ret = lseek (fd, current_position, SEEK_SET);
+ if (ret < 0 || ret != current_position) {
GST_ERROR_OBJECT (sink,
"failed to seek back to current write position");
goto write_error;
}
- } else if (ret < 0) {
+ } else {
goto write_error;
- } else { /* if (ret < left) */
- if (max_transient_error_timeout)
- start_time = g_get_monotonic_time ();
- /* skip vectors that have been written in full */
- while (ret >= vecs[0].iov_len) {
- ret -= vecs[0].iov_len;
- left -= vecs[0].iov_len;
- ++vecs;
- --n_vecs;
- }
- g_assert (n_vecs > 0);
- /* skip partially written vector data */
- if (ret > 0) {
- vecs[0].iov_len -= ret;
- vecs[0].iov_base = ((guint8 *) vecs[0].iov_base) + ret;
- left -= ret;
- }
}
#ifdef HAVE_WIN32
/* do short sleep on windows where we don't use gst_poll(),
if (fdset != NULL)
g_usleep (1000);
#endif
-
}
while (left > 0);
}
out:
- for (i = 0; i < total_mem_num; ++i)
- gst_memory_unmap (map_infos[i].memory, &map_infos[i]);
-
return flow_ret;
/* ERRORS */
goto out;
}
}
+
+GstFlowReturn
+gst_writev_buffer (GstObject * sink, gint fd, GstPoll * fdset,
+ GstBuffer * buffer,
+ guint64 * bytes_written, guint64 skip,
+ gint max_transient_error_timeout, guint64 current_position,
+ gboolean * flushing)
+{
+ GstFlowReturn flow_ret = GST_FLOW_OK;
+ struct iovec *vecs;
+ GstMapInfo *maps;
+ guint i, num_mem, num_vecs;
+ gsize left;
+
+ /* Buffers can contain up to 16 memories, so we can safely directly call
+ * writev() here without splitting up */
+ g_assert (gst_buffer_get_max_memory () <= GST_IOV_MAX);
+
+ num_mem = num_vecs = gst_buffer_n_memory (buffer);
+
+ GST_DEBUG ("Writing buffer %p with %u memories and %" G_GSIZE_FORMAT " bytes",
+ buffer, num_mem, gst_buffer_get_size (buffer));
+
+ vecs = g_newa (struct iovec, num_mem);
+ maps = g_newa (GstMapInfo, num_mem);
+
+ /* Map all memories */
+ {
+ GstMemory *mem;
+ guint i;
+
+ left = 0;
+ for (i = 0; i < num_mem; ++i) {
+ mem = gst_buffer_peek_memory (buffer, i);
+ if (gst_memory_map (mem, &maps[i], GST_MAP_READ)) {
+ vecs[i].iov_base = maps[i].data;
+ vecs[i].iov_len = maps[i].size;
+ } else {
+ GST_WARNING ("Failed to map memory %p for reading", mem);
+ vecs[i].iov_base = (void *) "";
+ vecs[i].iov_len = 0;
+ }
+ left += vecs[i].iov_len;
+ }
+ }
+
+ do {
+ guint64 bytes_written_local = 0;
+
+ flow_ret =
+ gst_writev_iovecs (sink, fd, fdset, vecs, num_vecs,
+ &bytes_written_local, max_transient_error_timeout, current_position,
+ flushing);
+
+ GST_DEBUG ("Wrote %" G_GUINT64_FORMAT " bytes of %" G_GSIZE_FORMAT ": %s",
+ bytes_written_local, left, gst_flow_get_name (flow_ret));
+
+ if (flow_ret != GST_FLOW_OK) {
+ g_assert (bytes_written_local == 0);
+ break;
+ }
+
+ if (bytes_written)
+ *bytes_written += bytes_written_local;
+
+ /* Done, no need to do bookkeeping */
+ if (bytes_written_local == left)
+ break;
+
+ /* skip vectors that have been written in full */
+ while (bytes_written_local >= vecs[0].iov_len) {
+ bytes_written_local -= vecs[0].iov_len;
+ left -= vecs[0].iov_len;
+ ++vecs;
+ --num_vecs;
+ }
+ g_assert (num_vecs > 0);
+ /* skip partially written vector data */
+ if (bytes_written_local > 0) {
+ vecs[0].iov_len -= bytes_written_local;
+ vecs[0].iov_base = ((guint8 *) vecs[0].iov_base) + bytes_written_local;
+ left -= bytes_written_local;
+ }
+ } while (left > 0);
+
+ for (i = 0; i < num_mem; i++)
+ gst_memory_unmap (maps[i].memory, &maps[i]);
+
+ return flow_ret;
+}
+
+GstFlowReturn
+gst_write_mem (GstObject * sink, gint fd, GstPoll * fdset,
+ const guint8 * data, guint size,
+ guint64 * bytes_written, guint64 skip,
+ gint max_transient_error_timeout, guint64 current_position,
+ gboolean * flushing)
+{
+ GstFlowReturn flow_ret = GST_FLOW_OK;
+ struct iovec vec;
+ gsize left;
+
+ GST_DEBUG ("Writing memory %p with %u bytes", data, size);
+
+ vec.iov_len = size;
+ vec.iov_base = (guint8 *) data;
+ left = size;
+
+ do {
+ guint64 bytes_written_local = 0;
+
+ flow_ret =
+ gst_writev_iovecs (sink, fd, fdset, &vec, 1,
+ &bytes_written_local, max_transient_error_timeout, current_position,
+ flushing);
+
+ GST_DEBUG ("Wrote %" G_GUINT64_FORMAT " bytes of %" G_GSIZE_FORMAT ": %s",
+ bytes_written_local, left, gst_flow_get_name (flow_ret));
+
+ if (flow_ret != GST_FLOW_OK) {
+ g_assert (bytes_written_local == 0);
+ break;
+ }
+
+ if (bytes_written)
+ *bytes_written += bytes_written_local;
+
+ /* skip partially written vector data */
+ if (bytes_written_local < left) {
+ vec.iov_len -= bytes_written_local;
+ vec.iov_base = ((guint8 *) vec.iov_base) + bytes_written_local;
+ left -= bytes_written_local;
+ }
+ } while (left > 0);
+
+ return flow_ret;
+}
+
+GstFlowReturn
+gst_writev_buffer_list (GstObject * sink, gint fd, GstPoll * fdset,
+ GstBufferList * buffer_list,
+ guint64 * bytes_written, guint64 skip,
+ gint max_transient_error_timeout, guint64 current_position,
+ gboolean * flushing)
+{
+ GstFlowReturn flow_ret = GST_FLOW_OK;
+ struct iovec *vecs;
+ GstMapInfo *maps;
+ guint num_bufs, current_buf_idx = 0, current_buf_mem_idx = 0;
+ guint i, num_vecs;
+ gsize left;
+
+ num_bufs = gst_buffer_list_length (buffer_list);
+ num_vecs = 0;
+
+ GST_DEBUG ("Writing buffer list %p with %u buffers", buffer_list, num_bufs);
+
+ vecs = g_newa (struct iovec, GST_IOV_MAX);
+ maps = g_newa (GstMapInfo, GST_IOV_MAX);
+
+ /* Map the first GST_IOV_MAX memories */
+ {
+ GstBuffer *buf;
+ GstMemory *mem;
+ guint j = 0;
+
+ left = 0;
+ for (i = 0; i < num_bufs && num_vecs < GST_IOV_MAX; i++) {
+ guint num_mem;
+
+ buf = gst_buffer_list_get (buffer_list, i);
+ num_mem = gst_buffer_n_memory (buf);
+
+ for (j = 0; j < num_mem && num_vecs < GST_IOV_MAX; j++) {
+ mem = gst_buffer_peek_memory (buf, j);
+ if (gst_memory_map (mem, &maps[num_vecs], GST_MAP_READ)) {
+ vecs[num_vecs].iov_base = maps[num_vecs].data;
+ vecs[num_vecs].iov_len = maps[num_vecs].size;
+ } else {
+ GST_WARNING ("Failed to map memory %p for reading", mem);
+ vecs[num_vecs].iov_base = (void *) "";
+ vecs[num_vecs].iov_len = 0;
+ }
+ left += vecs[num_vecs].iov_len;
+ num_vecs++;
+ }
+ }
+ current_buf_idx = i;
+ current_buf_mem_idx = j;
+ }
+
+ do {
+ guint64 bytes_written_local = 0;
+ guint vecs_written = 0;
+
+ flow_ret =
+ gst_writev_iovecs (sink, fd, fdset, vecs, num_vecs,
+ &bytes_written_local, max_transient_error_timeout, current_position,
+ flushing);
+
+ GST_DEBUG ("Wrote %" G_GUINT64_FORMAT " bytes of %" G_GSIZE_FORMAT ": %s",
+ bytes_written_local, left, gst_flow_get_name (flow_ret));
+
+ if (flow_ret != GST_FLOW_OK) {
+ g_assert (bytes_written_local == 0);
+ break;
+ }
+
+ if (flow_ret != GST_FLOW_OK) {
+ g_assert (bytes_written_local == 0);
+ break;
+ }
+
+ if (bytes_written)
+ *bytes_written += bytes_written_local;
+
+ /* All done, no need for bookkeeping */
+ if (bytes_written_local == left && current_buf_idx == num_bufs)
+ break;
+
+ /* skip vectors that have been written in full */
+ while (vecs_written < num_vecs
+ && bytes_written_local >= vecs[vecs_written].iov_len) {
+ bytes_written_local -= vecs[vecs_written].iov_len;
+ left -= vecs[vecs_written].iov_len;
+ vecs_written++;
+ }
+ g_assert (vecs_written < num_vecs || bytes_written_local == 0);
+ /* skip partially written vector data */
+ if (bytes_written_local > 0) {
+ vecs[vecs_written].iov_len -= bytes_written_local;
+ vecs[vecs_written].iov_base =
+ ((guint8 *) vecs[0].iov_base) + bytes_written_local;
+ left -= bytes_written_local;
+ }
+
+ /* If we have buffers left, fill them in now */
+ if (current_buf_idx < num_bufs) {
+ GstBuffer *buf;
+ GstMemory *mem;
+ guint j = current_buf_mem_idx;
+
+ /* Unmap the first vecs_written memories now */
+ for (i = 0; i < vecs_written; i++)
+ gst_memory_unmap (maps[i].memory, &maps[i]);
+ /* Move upper remaining vecs and maps back to the beginning */
+ memmove (vecs, &vecs[vecs_written],
+ (num_vecs - vecs_written) * sizeof (vecs[0]));
+ memmove (maps, &maps[vecs_written],
+ (num_vecs - vecs_written) * sizeof (maps[0]));
+ num_vecs -= vecs_written;
+
+ /* And finally refill */
+ for (i = current_buf_idx; i < num_bufs && num_vecs < GST_IOV_MAX; i++) {
+ guint num_mem;
+
+ buf = gst_buffer_list_get (buffer_list, i);
+ num_mem = gst_buffer_n_memory (buf);
+
+ for (j = current_buf_mem_idx; j < num_mem && num_vecs < GST_IOV_MAX;
+ j++) {
+ mem = gst_buffer_peek_memory (buf, j);
+ if (gst_memory_map (mem, &maps[num_vecs], GST_MAP_READ)) {
+ vecs[num_vecs].iov_base = maps[num_vecs].data;
+ vecs[num_vecs].iov_len = maps[num_vecs].size;
+ } else {
+ GST_WARNING ("Failed to map memory %p for reading", mem);
+ vecs[num_vecs].iov_base = (void *) "";
+ vecs[num_vecs].iov_len = 0;
+ }
+ left += vecs[num_vecs].iov_len;
+ num_vecs++;
+ }
+ }
+ current_buf_idx = i;
+ current_buf_mem_idx = j;
+ }
+ } while (left > 0);
+
+ for (i = 0; i < num_vecs; i++)
+ gst_memory_unmap (maps[i].memory, &maps[i]);
+
+ return flow_ret;
+}
}
static GstFlowReturn
-gst_fd_sink_render_buffers (GstFdSink * sink, GstBuffer ** buffers,
- guint num_buffers, guint8 * mem_nums, guint total_mems)
+gst_fd_sink_render_list (GstBaseSink * bsink, GstBufferList * buffer_list)
{
+ GstFdSink *sink;
GstFlowReturn ret;
guint64 skip = 0;
+ guint num_buffers;
+
+ sink = GST_FD_SINK_CAST (bsink);
+
+ num_buffers = gst_buffer_list_length (buffer_list);
+ if (num_buffers == 0)
+ goto no_data;
for (;;) {
guint64 bytes_written = 0;
- ret = gst_writev_buffers (GST_OBJECT_CAST (sink), sink->fd, sink->fdset,
- buffers, num_buffers, mem_nums, total_mems, &bytes_written, skip,
- 0, -1, NULL);
+ ret = gst_writev_buffer_list (GST_OBJECT_CAST (sink), sink->fd, sink->fdset,
+ buffer_list, &bytes_written, skip, 0, -1, NULL);
sink->bytes_written += bytes_written;
sink->current_pos += bytes_written;
}
return ret;
-}
-
-static GstFlowReturn
-gst_fd_sink_render_list (GstBaseSink * bsink, GstBufferList * buffer_list)
-{
- GstFlowReturn flow;
- GstBuffer **buffers;
- GstFdSink *sink;
- guint8 *mem_nums;
- guint total_mems;
- guint i, num_buffers;
-
- sink = GST_FD_SINK_CAST (bsink);
-
- num_buffers = gst_buffer_list_length (buffer_list);
- if (num_buffers == 0)
- goto no_data;
-
- /* extract buffers from list and count memories */
- buffers = g_newa (GstBuffer *, num_buffers);
- mem_nums = g_newa (guint8, num_buffers);
- for (i = 0, total_mems = 0; i < num_buffers; ++i) {
- buffers[i] = gst_buffer_list_get (buffer_list, i);
- mem_nums[i] = gst_buffer_n_memory (buffers[i]);
- total_mems += mem_nums[i];
- }
-
- flow =
- gst_fd_sink_render_buffers (sink, buffers, num_buffers, mem_nums,
- total_mems);
-
- return flow;
no_data:
{
static GstFlowReturn
gst_fd_sink_render (GstBaseSink * bsink, GstBuffer * buffer)
{
- GstFlowReturn flow;
GstFdSink *sink;
- guint8 n_mem;
+ GstFlowReturn ret;
+ guint64 skip = 0;
sink = GST_FD_SINK_CAST (bsink);
- n_mem = gst_buffer_n_memory (buffer);
+ for (;;) {
+ guint64 bytes_written = 0;
+
+ ret = gst_writev_buffer (GST_OBJECT_CAST (sink), sink->fd, sink->fdset,
+ buffer, &bytes_written, skip, 0, -1, NULL);
+
+ sink->bytes_written += bytes_written;
+ sink->current_pos += bytes_written;
+ skip += bytes_written;
+
+ if (!sink->unlock)
+ break;
- if (n_mem > 0)
- flow = gst_fd_sink_render_buffers (sink, &buffer, 1, &n_mem, n_mem);
- else
- flow = GST_FLOW_OK;
+ ret = gst_base_sink_wait_preroll (GST_BASE_SINK (sink));
+ if (ret != GST_FLOW_OK)
+ return ret;
+ }
- return flow;
+ return ret;
}
static gboolean
return (ret != (off_t) - 1);
}
-static GstFlowReturn
-gst_file_sink_render_buffers (GstFileSink * sink, GstBuffer ** buffers,
- guint num_buffers, guint8 * mem_nums, guint total_mems, gsize size)
-{
- GstFlowReturn ret;
- guint64 bytes_written = 0;
-
- GST_DEBUG_OBJECT (sink,
- "writing %u buffers (%u memories, %" G_GSIZE_FORMAT
- " bytes) at position %" G_GUINT64_FORMAT, num_buffers, total_mems, size,
- sink->current_pos);
-
- ret = gst_writev_buffers (GST_OBJECT_CAST (sink), fileno (sink->file), NULL,
- buffers, num_buffers, mem_nums, total_mems, &bytes_written, 0,
- sink->max_transient_error_timeout, sink->current_pos, &sink->flushing);
-
- sink->current_pos += bytes_written;
-
- return ret;
-}
-
static GstFlowReturn
gst_file_sink_render_list_internal (GstFileSink * sink,
GstBufferList * buffer_list)
{
GstFlowReturn flow;
- GstBuffer **buffers;
- guint8 *mem_nums;
- guint total_mems;
- gsize total_size = 0;
- guint i, num_buffers;
+ guint64 bytes_written = 0;
+ guint num_buffers;
num_buffers = gst_buffer_list_length (buffer_list);
if (num_buffers == 0)
goto no_data;
- /* extract buffers from list and count memories */
- buffers = g_newa (GstBuffer *, num_buffers);
- mem_nums = g_newa (guint8, num_buffers);
- for (i = 0, total_mems = 0; i < num_buffers; ++i) {
- buffers[i] = gst_buffer_list_get (buffer_list, i);
- mem_nums[i] = gst_buffer_n_memory (buffers[i]);
- total_mems += mem_nums[i];
- total_size += gst_buffer_get_size (buffers[i]);
- }
+ GST_DEBUG_OBJECT (sink,
+ "writing %u buffers at position %" G_GUINT64_FORMAT, num_buffers,
+ sink->current_pos);
flow =
- gst_file_sink_render_buffers (sink, buffers, num_buffers, mem_nums,
- total_mems, total_size);
+ gst_writev_buffer_list (GST_OBJECT_CAST (sink), fileno (sink->file), NULL,
+ buffer_list, &bytes_written, 0, sink->max_transient_error_timeout,
+ sink->current_pos, &sink->flushing);
+
+ sink->current_pos += bytes_written;
return flow;
if (n_mem > 0 && (sync_after || !filesink->buffer)) {
flow = gst_file_sink_flush_buffer (filesink);
- if (flow == GST_FLOW_OK)
+ if (flow == GST_FLOW_OK) {
+ guint64 bytes_written = 0;
+
+ GST_DEBUG_OBJECT (sink,
+ "writing buffer ( %" G_GSIZE_FORMAT
+ " bytes) at position %" G_GUINT64_FORMAT,
+ gst_buffer_get_size (buffer), filesink->current_pos);
+
flow =
- gst_file_sink_render_buffers (filesink, &buffer, 1, &n_mem, n_mem,
- gst_buffer_get_size (buffer));
+ gst_writev_buffer (GST_OBJECT_CAST (filesink),
+ fileno (filesink->file), NULL, buffer, &bytes_written, 0,
+ filesink->max_transient_error_timeout, filesink->current_pos,
+ &filesink->flushing);
+
+ filesink->current_pos += bytes_written;
+ }
} else if (n_mem > 0) {
GST_DEBUG_OBJECT (filesink,
"Queueing buffer of %" G_GSIZE_FORMAT " bytes at offset %"