2 * Copyright (C) 2011 David Schleef <ds@schleef.org>
3 * Copyright (C) 2011 Tim-Philipp Müller <tim.muller@collabora.co.uk>
4 * Copyright (C) 2014 Tim-Philipp Müller <tim@centricular.com>
5 * Copyright (C) 2014 Vincent Penquerc'h <vincent@collabora.co.uk>
7 * gstelements_private.c: Shared code for core elements
9 * This library is free software; you can redistribute it and/or
10 * modify it under the terms of the GNU Library General Public
11 * License as published by the Free Software Foundation; either
12 * version 2 of the License, or (at your option) any later version.
14 * This library is distributed in the hope that it will be useful,
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
17 * Library General Public License for more details.
19 * You should have received a copy of the GNU Library General Public
20 * License along with this library; if not, write to the
21 * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
22 * Boston, MA 02110-1301, USA.
35 #include <sys/types.h>
40 #include "gstelements_private.h"
43 # include <io.h> /* lseek, open, close, read */
45 # define lseek _lseeki64
47 # define off_t guint64
48 # define WIN32_LEAN_AND_MEAN /* prevents from including too many things */
50 # undef WIN32_LEAN_AND_MEAN
52 # define EWOULDBLOCK EAGAIN
54 #endif /* G_OS_WIN32 */
/* Index of the first flag bit that gst_buffer_get_flags_string() looks at.
 * The static assertion ties the value to GST_MINI_OBJECT_FLAG_LAST, so the
 * build fails if that constant ever stops being (1 << BUFFER_FLAG_SHIFT). */
56 #define BUFFER_FLAG_SHIFT 4
58 G_STATIC_ASSERT ((1 << BUFFER_FLAG_SHIFT) == GST_MINI_OBJECT_FLAG_LAST);
60 /* Returns a newly allocated string describing the flags on this buffer */
/* The result is allocated with g_malloc(). For every flag bit from
 * BUFFER_FLAG_SHIFT upwards that is set in the mini-object flags of @buffer,
 * the matching name from flag_strings is copied into the result. */
62 gst_buffer_get_flags_string (GstBuffer * buffer)
/* All flag names live in one char array, separated by NUL bytes. The four
 * leading NULs are empty names for bits 0-3, which are below
 * BUFFER_FLAG_SHIFT and never looked up by the loop. */
64 static const char flag_strings[] =
65 "\000\000\000\000live\000decode-only\000discont\000resync\000corrupted\000"
66 "marker\000header\000gap\000droppable\000delta-unit\000tag-memory\000"
67 "sync-after\000non-droppable\000FIXME";
/* flag_idx[i] is the byte offset inside flag_strings of the name for flag
 * bit i (4 = live, 9 = decode-only, ... 121 = FIXME). The two tables have to
 * be kept in sync by hand. */
68 static const guint8 flag_idx[] = { 0, 1, 2, 3, 4, 9, 21, 29, 36, 46, 53,
69 60, 64, 74, 85, 96, 107, 121,
74 /* max size is all flag strings plus a space or terminator after each one */
/* sizeof already counts the NUL after each name, which is the byte budget
 * for that separator, so the strcpy() below cannot overrun the allocation. */
75 max_bytes = sizeof (flag_strings);
76 flag_str = g_malloc (max_bytes);
/* One iteration per flag bit that has an entry in flag_idx. */
80 for (i = BUFFER_FLAG_SHIFT; i < G_N_ELEMENTS (flag_idx); i++) {
81 if (GST_MINI_OBJECT_CAST (buffer)->flags & (1 << i)) {
/* Append the NUL-terminated name of bit i at the current end position. */
82 strcpy (end, flag_strings + flag_idx[i]);
93 /* Returns a newly-allocated string describing the metas on this buffer, or NULL */
/* Each meta is described by the GType name of its info->type; the names are
 * joined with a comma and a space. */
95 gst_buffer_get_meta_string (GstBuffer * buffer)
/* Opaque cursor for gst_buffer_iterate_meta(); NULL starts the iteration. */
97 gpointer state = NULL;
101 while ((meta = gst_buffer_iterate_meta (buffer, &state))) {
102 const gchar *desc = g_type_name (meta->info->type);
/* The GString is created inside the loop, so s can still be NULL at the
 * return statement below. */
105 s = g_string_new (NULL);
107 g_string_append (s, ", ");
109 g_string_append (s, desc);
/* g_string_free (s, FALSE) releases the GString wrapper and returns its
 * character data; a NULL s is passed through as a NULL result. */
112 return (s != NULL) ? g_string_free (s, FALSE) : NULL;
115 /* Define our own iovec structure here, so that we can use it unconditionally
116 * in the code below and use almost the same code path for systems where
117 * writev() is supported and those where it's not supported */
118 #ifndef HAVE_SYS_UIO_H
126 /* completely arbitrary thresholds */
/* Used by the merge path of gst_writev(): totals up to
 * FDSINK_MAX_ALLOCA_SIZE are merged into a g_alloca() buffer, larger totals
 * up to FDSINK_MAX_MALLOC_SIZE into a g_malloc() buffer, and anything bigger
 * is not merged at all. */
127 #define FDSINK_MAX_ALLOCA_SIZE (64 * 1024) /* 64k */
128 #define FDSINK_MAX_MALLOC_SIZE ( 8 * 1024 * 1024) /* 8M */
/* GST_IOV_MAX is the largest iovec count gst_writev() passes to a single
 * writev() call; gst_writev_buffer_list() also uses it as the size of its
 * iovec and map arrays. */
130 /* Adapted from GLib (gio/gioprivate.h)
132 * POSIX defines IOV_MAX/UIO_MAXIOV as the maximum number of iovecs that can
133 * be sent in one go. We define our own version of it here as there are two
134 * possible names, and also define a fall-back value if none of the constants
137 #define GST_IOV_MAX IOV_MAX
138 #elif defined(UIO_MAXIOV)
139 #define GST_IOV_MAX UIO_MAXIOV
140 #elif defined(__APPLE__)
141 /* For osx/ios, UIO_MAXIOV is documented in writev(2), but <sys/uio.h>
142 * only declares it if defined(KERNEL) */
143 #define GST_IOV_MAX 512
145 /* 16 is the minimum value required by POSIX */
146 #define GST_IOV_MAX 16
/* Writes the @iovcnt entries of @iov to @fd, using one of three strategies:
 *
 *  1. with HAVE_SYS_UIO_H and at most GST_IOV_MAX entries: one writev() call
 *  2. more than one entry and @total_bytes <= FDSINK_MAX_MALLOC_SIZE: copy
 *     everything into one temporary buffer and issue one write() call
 *  3. otherwise: one write() call per entry
 *
 * Every system call is repeated while it fails with EINTR.
 *
 * @total_bytes is used as the size of the merged buffer and of the merged
 * write(), so it is expected to be the sum of all iov_len values. */
150 gst_writev (gint fd, const struct iovec *iov, gint iovcnt, gsize total_bytes)
154 #ifdef HAVE_SYS_UIO_H
155 if (iovcnt <= GST_IOV_MAX) {
157 written = writev (fd, iov, iovcnt);
158 } while (written < 0 && errno == EINTR);
164 /* We merge the memories here because technically write()/writev() is
165 * supposed to be atomic, which it's not if we do multiple separate
166 * write() calls. It's very doubtful anyone cares though in our use
167 * cases, and it's not clear how that can be reconciled with the
168 * possibility of short writes, so in any case we might want to
169 * simplify this later or just remove it. */
170 if (iovcnt > 1 && total_bytes <= FDSINK_MAX_MALLOC_SIZE) {
/* Small totals are merged on the stack, larger ones on the heap. */
173 if (total_bytes <= FDSINK_MAX_ALLOCA_SIZE)
174 mem = g_alloca (total_bytes);
176 mem = g_malloc (total_bytes);
/* Concatenate all entries into the temporary buffer. */
179 for (i = 0; i < iovcnt; ++i) {
180 memcpy (p, iov[i].iov_base, iov[i].iov_len);
185 written = write (fd, mem, total_bytes);
186 } while (written < 0 && errno == EINTR);
/* Same threshold as the allocation above, inverted: this is the case in
 * which the buffer came from g_malloc(). */
188 if (total_bytes > FDSINK_MAX_ALLOCA_SIZE)
/* Fallback: write the entries one at a time. */
194 for (i = 0; i < iovcnt; ++i) {
196 ret = write (fd, iov[i].iov_base, iov[i].iov_len);
197 } while (ret < 0 && errno == EINTR);
/* Detects a write() that did not transfer the whole entry. */
200 if (ret != iov[i].iov_len)
/* Writes the @n_vecs entries of @vecs (@bytes_to_write bytes in total) to
 * @fd through gst_writev().
 *
 * @flushing: optional; when non-NULL it is read atomically before a write
 *     attempt and a non-zero value ends the call with GST_FLOW_FLUSHING
 * @fdset: waited on with gst_poll_wait() before writing
 * @bytes_written: set to the byte count returned by a successful
 *     gst_writev() call, so that the caller can advance its vectors
 * @max_transient_error_timeout: 0 disables the EACCES retry logic. The value
 *     is multiplied by 1000 on entry and then compared against differences
 *     of g_get_monotonic_time().
 * @current_position: file offset that @fd is seeked back to before an EACCES
 *     retry; asserted to be different from -1 on that path
 *
 * EAGAIN, EWOULDBLOCK and zero-byte writes are retried. EACCES is retried
 * after seeking back, until the timeout has elapsed. Failure paths post an
 * element error on @sink and yield GST_FLOW_ERROR.
 *
 * NOTE(review): the EACCES timeout error message formats
 * max_transient_error_timeout with a ms suffix, but at that point the
 * variable already holds the value multiplied by 1000, so the log shows a
 * number 1000 times larger than what the caller passed. The multiplication
 * can also overflow gint for very large timeouts. Confirm and fix. */
210 gst_writev_iovecs (GstObject * sink, gint fd, GstPoll * fdset,
211 struct iovec *vecs, guint n_vecs, gsize bytes_to_write,
212 guint64 * bytes_written, gint max_transient_error_timeout,
213 guint64 current_position, gboolean * flushing)
215 GstFlowReturn flow_ret;
216 gint64 start_time = 0;
/* Scale the timeout so that it can be added to g_get_monotonic_time()
 * values below; the start time is only sampled when a timeout is set. */
219 max_transient_error_timeout *= 1000;
220 if (max_transient_error_timeout)
221 start_time = g_get_monotonic_time ();
223 GST_LOG_OBJECT (sink, "%u iovecs", n_vecs);
225 /* now write it all out! */
229 left = bytes_to_write;
/* Give up as soon as the caller signals flushing. */
232 if (flushing != NULL && g_atomic_int_get (flushing)) {
233 GST_DEBUG_OBJECT (sink, "Flushing, exiting loop");
234 flow_ret = GST_FLOW_FLUSHING;
/* Wait without timeout; EINTR and EAGAIN from gst_poll_wait() just repeat
 * the wait. */
240 GST_DEBUG_OBJECT (sink, "going into select, have %" G_GSSIZE_FORMAT
241 " bytes to write", left);
242 ret = gst_poll_wait (fdset, GST_CLOCK_TIME_NONE);
243 } while (ret == -1 && (errno == EINTR || errno == EAGAIN));
254 ret = gst_writev (fd, vecs, n_vecs, left);
257 /* Wrote something, allow the caller to update the vecs passed here */
258 *bytes_written = ret;
/* Transient conditions: the start time is sampled again here, so the EACCES
 * timeout below is measured from the most recent retry of this kind. */
262 if (errno == EAGAIN || errno == EWOULDBLOCK || ret == 0) {
263 /* do nothing, try again */
264 if (max_transient_error_timeout)
265 start_time = g_get_monotonic_time ();
266 } else if (errno == EACCES && max_transient_error_timeout > 0) {
267 /* seek back to where we started writing and try again after sleeping
270 * Some network file systems report EACCES spuriously, presumably
271 * because at the same time another client is reading the file.
272 * It happens at least on Linux and macOS on SMB/CIFS and NFS file
275 * Note that NFS does not check access permissions during open()
276 * but only on write()/read() according to open(2), so we would
277 * loop here in case of NFS.
279 if (g_get_monotonic_time () > start_time + max_transient_error_timeout) {
280 GST_ERROR_OBJECT (sink, "Got EACCES for more than %dms, failing",
281 max_transient_error_timeout);
284 GST_DEBUG_OBJECT (sink, "got EACCES, retry after 10ms sleep");
285 g_assert (current_position != -1);
288 /* Seek back to the current position, sometimes a partial write
289 * happened and we have no idea how much and if what was written
290 * is actually correct (it sometimes isn't)
292 ret = lseek (fd, current_position, SEEK_SET);
293 if (ret < 0 || ret != current_position) {
294 GST_ERROR_OBJECT (sink,
295 "failed to seek back to current write position");
302 /* do short sleep on windows where we don't use gst_poll(),
303 * to avoid excessive busy looping */
311 flow_ret = GST_FLOW_OK;
/* Failure of the wait on @fdset: posted as a RESOURCE/READ element error. */
321 GST_ELEMENT_ERROR (sink, RESOURCE, READ, (NULL),
322 ("select on file descriptor: %s", g_strerror (errno)));
323 GST_DEBUG_OBJECT (sink, "Error during select: %s", g_strerror (errno));
324 flow_ret = GST_FLOW_ERROR;
/* The wait was stopped: reported as flushing, not as an error. */
329 GST_DEBUG_OBJECT (sink, "Select stopped");
330 flow_ret = GST_FLOW_FLUSHING;
/* Write failures: a dedicated NO_SPACE_LEFT error and a generic WRITE error
 * that carries the descriptor number and the errno text. */
338 GST_ELEMENT_ERROR (sink, RESOURCE, NO_SPACE_LEFT, (NULL), (NULL));
341 GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, (NULL),
342 ("Error while writing to file descriptor %d: %s",
343 fd, g_strerror (errno)));
346 flow_ret = GST_FLOW_ERROR;
/* Writes all memories of @buffer to @fd.
 *
 * Every memory is mapped for reading and described by one iovec; the iovecs
 * are then handed to gst_writev_iovecs() repeatedly. After each call the
 * fully written iovecs are skipped and a partially written one is advanced
 * by the number of bytes already written.
 *
 * *@bytes_written is increased by the number of bytes written by each call.
 * @sink, @fdset, @max_transient_error_timeout, @current_position and the
 * flushing pointer are forwarded to gst_writev_iovecs() unchanged.
 *
 * A result other than GST_FLOW_OK from gst_writev_iovecs() is expected to
 * come with zero bytes written (asserted below). */
352 gst_writev_buffer (GstObject * sink, gint fd, GstPoll * fdset,
354 guint64 * bytes_written, guint64 skip,
355 gint max_transient_error_timeout, guint64 current_position,
358 GstFlowReturn flow_ret = GST_FLOW_OK;
361 guint i, num_mem, num_vecs;
364 /* Buffers can contain up to 16 memories, so we can safely directly call
365 * writev() here without splitting up */
366 g_assert (gst_buffer_get_max_memory () <= GST_IOV_MAX);
368 num_mem = num_vecs = gst_buffer_n_memory (buffer);
370 GST_DEBUG ("Writing buffer %p with %u memories and %" G_GSIZE_FORMAT " bytes",
371 buffer, num_mem, gst_buffer_get_size (buffer));
/* One iovec and one map info per memory, allocated with g_newa() (the
 * alloca-based GLib allocator), so nothing here is freed explicitly. */
373 vecs = g_newa (struct iovec, num_mem);
374 maps = g_newa (GstMapInfo, num_mem);
376 /* Map all memories */
381 for (i = 0; i < num_mem; ++i) {
382 mem = gst_buffer_peek_memory (buffer, i);
383 if (gst_memory_map (mem, &maps[i], GST_MAP_READ)) {
384 vecs[i].iov_base = maps[i].data;
385 vecs[i].iov_len = maps[i].size;
/* A memory that cannot be mapped still gets an iovec, pointing at an empty
 * string, so that vecs and maps keep the same indexing. */
387 GST_WARNING ("Failed to map memory %p for reading", mem);
388 vecs[i].iov_base = (void *) "";
/* left accumulates the total number of bytes described by the iovecs. */
391 left += vecs[i].iov_len;
396 guint64 bytes_written_local = 0;
399 gst_writev_iovecs (sink, fd, fdset, vecs, num_vecs, left,
400 &bytes_written_local, max_transient_error_timeout, current_position,
403 GST_DEBUG ("Wrote %" G_GUINT64_FORMAT " bytes of %" G_GSIZE_FORMAT ": %s",
404 bytes_written_local, left, gst_flow_get_name (flow_ret));
406 if (flow_ret != GST_FLOW_OK) {
407 g_assert (bytes_written_local == 0);
412 *bytes_written += bytes_written_local;
414 /* Done, no need to do bookkeeping */
415 if (bytes_written_local == left)
418 /* skip vectors that have been written in full */
419 while (bytes_written_local >= vecs[0].iov_len) {
420 bytes_written_local -= vecs[0].iov_len;
421 left -= vecs[0].iov_len;
425 g_assert (num_vecs > 0);
426 /* skip partially written vector data */
427 if (bytes_written_local > 0) {
428 vecs[0].iov_len -= bytes_written_local;
429 vecs[0].iov_base = ((guint8 *) vecs[0].iov_base) + bytes_written_local;
430 left -= bytes_written_local;
/* Unmapping goes through maps with the original count num_mem, not through
 * the iovec bookkeeping above.
 * NOTE(review): this also passes the map info of memories whose
 * gst_memory_map() call failed above to gst_memory_unmap(). Confirm that
 * this is harmless. */
434 for (i = 0; i < num_mem; i++)
435 gst_memory_unmap (maps[i].memory, &maps[i]);
/* Writes the @size bytes at @data to @fd.
 *
 * The memory is wrapped in a single iovec that is handed to
 * gst_writev_iovecs() repeatedly; after a short write the iovec is advanced
 * by the number of bytes already written.
 *
 * *@bytes_written is increased by the number of bytes written by each call.
 * @sink, @fdset, @max_transient_error_timeout, @current_position and the
 * flushing pointer are forwarded to gst_writev_iovecs() unchanged.
 *
 * A result other than GST_FLOW_OK from gst_writev_iovecs() is expected to
 * come with zero bytes written (asserted below). */
441 gst_writev_mem (GstObject * sink, gint fd, GstPoll * fdset,
442 const guint8 * data, guint size,
443 guint64 * bytes_written, guint64 skip,
444 gint max_transient_error_timeout, guint64 current_position,
447 GstFlowReturn flow_ret = GST_FLOW_OK;
451 GST_DEBUG ("Writing memory %p with %u bytes", data, size);
/* The cast only drops the const qualifier because iov_base is not
 * const-qualified; the data is only read by the write path. */
454 vec.iov_base = (guint8 *) data;
458 guint64 bytes_written_local = 0;
461 gst_writev_iovecs (sink, fd, fdset, &vec, 1, left,
462 &bytes_written_local, max_transient_error_timeout, current_position,
465 GST_DEBUG ("Wrote %" G_GUINT64_FORMAT " bytes of %" G_GSIZE_FORMAT ": %s",
466 bytes_written_local, left, gst_flow_get_name (flow_ret));
468 if (flow_ret != GST_FLOW_OK) {
469 g_assert (bytes_written_local == 0);
474 *bytes_written += bytes_written_local;
476 /* All done, no need for bookkeeping */
477 if (bytes_written_local == left)
480 /* skip partially written vector data */
/* Always true at this point, since the equal case was handled just above. */
481 if (bytes_written_local < left) {
482 vec.iov_len -= bytes_written_local;
483 vec.iov_base = ((guint8 *) vec.iov_base) + bytes_written_local;
484 left -= bytes_written_local;
492 gst_writev_buffer_list (GstObject * sink, gint fd, GstPoll * fdset,
493 GstBufferList * buffer_list,
494 guint64 * bytes_written, guint64 skip,
495 gint max_transient_error_timeout, guint64 current_position,
498 GstFlowReturn flow_ret = GST_FLOW_OK;
501 guint num_bufs, current_buf_idx = 0, current_buf_mem_idx = 0;
505 num_bufs = gst_buffer_list_length (buffer_list);
508 GST_DEBUG ("Writing buffer list %p with %u buffers", buffer_list, num_bufs);
510 vecs = g_newa (struct iovec, GST_IOV_MAX);
511 maps = g_newa (GstMapInfo, GST_IOV_MAX);
513 /* Map the first GST_IOV_MAX memories */
519 for (i = 0; i < num_bufs && num_vecs < GST_IOV_MAX; i++) {
522 buf = gst_buffer_list_get (buffer_list, i);
523 num_mem = gst_buffer_n_memory (buf);
525 for (j = 0; j < num_mem && num_vecs < GST_IOV_MAX; j++) {
526 mem = gst_buffer_peek_memory (buf, j);
527 if (gst_memory_map (mem, &maps[num_vecs], GST_MAP_READ)) {
528 vecs[num_vecs].iov_base = maps[num_vecs].data;
529 vecs[num_vecs].iov_len = maps[num_vecs].size;
531 GST_WARNING ("Failed to map memory %p for reading", mem);
532 vecs[num_vecs].iov_base = (void *) "";
533 vecs[num_vecs].iov_len = 0;
535 left += vecs[num_vecs].iov_len;
538 current_buf_mem_idx = j;
540 current_buf_mem_idx = 0;
546 guint64 bytes_written_local = 0;
547 guint vecs_written = 0;
550 gst_writev_iovecs (sink, fd, fdset, vecs, num_vecs, left,
551 &bytes_written_local, max_transient_error_timeout, current_position,
554 GST_DEBUG ("Wrote %" G_GUINT64_FORMAT " bytes of %" G_GSIZE_FORMAT ": %s",
555 bytes_written_local, left, gst_flow_get_name (flow_ret));
557 if (flow_ret != GST_FLOW_OK) {
558 g_assert (bytes_written_local == 0);
562 if (flow_ret != GST_FLOW_OK) {
563 g_assert (bytes_written_local == 0);
568 *bytes_written += bytes_written_local;
570 /* All done, no need for bookkeeping */
571 if (bytes_written_local == left && current_buf_idx == num_bufs)
574 /* skip vectors that have been written in full */
575 while (vecs_written < num_vecs
576 && bytes_written_local >= vecs[vecs_written].iov_len) {
577 bytes_written_local -= vecs[vecs_written].iov_len;
578 left -= vecs[vecs_written].iov_len;
581 g_assert (vecs_written < num_vecs || bytes_written_local == 0);
582 /* skip partially written vector data */
583 if (bytes_written_local > 0) {
584 vecs[vecs_written].iov_len -= bytes_written_local;
585 vecs[vecs_written].iov_base =
586 ((guint8 *) vecs[0].iov_base) + bytes_written_local;
587 left -= bytes_written_local;
590 /* If we have buffers left, fill them in now */
591 if (current_buf_idx < num_bufs) {
594 guint j = current_buf_mem_idx;
596 /* Unmap the first vecs_written memories now */
597 for (i = 0; i < vecs_written; i++)
598 gst_memory_unmap (maps[i].memory, &maps[i]);
599 /* Move upper remaining vecs and maps back to the beginning */
600 memmove (vecs, &vecs[vecs_written],
601 (num_vecs - vecs_written) * sizeof (vecs[0]));
602 memmove (maps, &maps[vecs_written],
603 (num_vecs - vecs_written) * sizeof (maps[0]));
604 num_vecs -= vecs_written;
606 /* And finally refill */
607 for (i = current_buf_idx; i < num_bufs && num_vecs < GST_IOV_MAX; i++) {
610 buf = gst_buffer_list_get (buffer_list, i);
611 num_mem = gst_buffer_n_memory (buf);
613 for (j = current_buf_mem_idx; j < num_mem && num_vecs < GST_IOV_MAX;
615 mem = gst_buffer_peek_memory (buf, j);
616 if (gst_memory_map (mem, &maps[num_vecs], GST_MAP_READ)) {
617 vecs[num_vecs].iov_base = maps[num_vecs].data;
618 vecs[num_vecs].iov_len = maps[num_vecs].size;
620 GST_WARNING ("Failed to map memory %p for reading", mem);
621 vecs[num_vecs].iov_base = (void *) "";
622 vecs[num_vecs].iov_len = 0;
624 left += vecs[num_vecs].iov_len;
627 current_buf_mem_idx = j;
628 if (current_buf_mem_idx == num_mem)
629 current_buf_mem_idx = 0;
635 for (i = 0; i < num_vecs; i++)
636 gst_memory_unmap (maps[i].memory, &maps[i]);