c5493b0ad8d6dab4a94d993e667899f5b9c5656d
[platform/upstream/gstreamer.git] / plugins / elements / gstelements_private.c
1 /* GStreamer
2  * Copyright (C) 2011 David Schleef <ds@schleef.org>
3  * Copyright (C) 2011 Tim-Philipp Müller <tim.muller@collabora.co.uk>
4  * Copyright (C) 2014 Tim-Philipp Müller <tim@centricular.com>
5  * Copyright (C) 2014 Vincent Penquerc'h <vincent@collabora.co.uk>
6  *
7  * gstelements_private.c: Shared code for core elements
8  *
9  * This library is free software; you can redistribute it and/or
10  * modify it under the terms of the GNU Library General Public
11  * License as published by the Free Software Foundation; either
12  * version 2 of the License, or (at your option) any later version.
13  *
14  * This library is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17  * Library General Public License for more details.
18  *
19  * You should have received a copy of the GNU Library General Public
20  * License along with this library; if not, write to the
21  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
22  * Boston, MA 02110-1301, USA.
23  */
24
25 #ifdef HAVE_CONFIG_H
26 # include "config.h"
27 #endif
28 #include <stdio.h>
29 #ifdef HAVE_UNISTD_H
30 #include <unistd.h>
31 #endif
32 #ifdef HAVE_SYS_UIO_H
33 #include <sys/uio.h>
34 #endif
35 #include <sys/types.h>
36 #include <errno.h>
37 #include <string.h>
38 #include <string.h>
39 #include "gst/gst.h"
40 #include "gstelements_private.h"
41
42 #ifdef G_OS_WIN32
43 #  include <io.h>               /* lseek, open, close, read */
44 #  undef lseek
45 #  define lseek _lseeki64
46 #  undef off_t
47 #  define off_t guint64
48 #  define WIN32_LEAN_AND_MEAN   /* prevents from including too many things */
49 #  include <windows.h>
50 #  undef WIN32_LEAN_AND_MEAN
51 #  ifndef EWOULDBLOCK
52 #  define EWOULDBLOCK EAGAIN
53 #  endif
54 #endif /* G_OS_WIN32 */
55
56 #define BUFFER_FLAG_SHIFT 4
57
58 G_STATIC_ASSERT ((1 << BUFFER_FLAG_SHIFT) == GST_MINI_OBJECT_FLAG_LAST);
59
60 /* Returns a newly allocated string describing the flags on this buffer */
61 gchar *
62 gst_buffer_get_flags_string (GstBuffer * buffer)
63 {
64   static const char flag_strings[] =
65       "\000\000\000\000live\000decode-only\000discont\000resync\000corrupted\000"
66       "marker\000header\000gap\000droppable\000delta-unit\000tag-memory\000"
67       "sync-after\000non-droppable\000FIXME";
68   static const guint8 flag_idx[] = { 0, 1, 2, 3, 4, 9, 21, 29, 36, 46, 53,
69     60, 64, 74, 85, 96, 107, 121,
70   };
71   int i, max_bytes;
72   char *flag_str, *end;
73
74   /* max size is all flag strings plus a space or terminator after each one */
75   max_bytes = sizeof (flag_strings);
76   flag_str = g_malloc (max_bytes);
77
78   end = flag_str;
79   end[0] = '\0';
80   for (i = BUFFER_FLAG_SHIFT; i < G_N_ELEMENTS (flag_idx); i++) {
81     if (GST_MINI_OBJECT_CAST (buffer)->flags & (1 << i)) {
82       strcpy (end, flag_strings + flag_idx[i]);
83       end += strlen (end);
84       end[0] = ' ';
85       end[1] = '\0';
86       end++;
87     }
88   }
89
90   return flag_str;
91 }
92
93 /* Returns a newly-allocated string describing the metas on this buffer, or NULL */
94 gchar *
95 gst_buffer_get_meta_string (GstBuffer * buffer)
96 {
97   gpointer state = NULL;
98   GstMeta *meta;
99   GString *s = NULL;
100
101   while ((meta = gst_buffer_iterate_meta (buffer, &state))) {
102     const gchar *desc = g_type_name (meta->info->type);
103
104     if (s == NULL)
105       s = g_string_new (NULL);
106     else
107       g_string_append (s, ", ");
108
109     g_string_append (s, desc);
110   }
111
112   return (s != NULL) ? g_string_free (s, FALSE) : NULL;
113 }
114
115 /* Define our own iovec structure here, so that we can use it unconditionally
116  * in the code below and use almost the same code path for systems where
117  * writev() is supported and those were it's not supported */
118 #ifndef HAVE_SYS_UIO_H
119 struct iovec
120 {
121   gpointer iov_base;
122   gsize iov_len;
123 };
124 #endif
125
126 /* completely arbitrary thresholds */
127 #define FDSINK_MAX_ALLOCA_SIZE (64 * 1024)      /* 64k */
128 #define FDSINK_MAX_MALLOC_SIZE ( 8 * 1024 * 1024)       /*  8M */
129
130 /* UIO_MAXIOV is documented in writev(2) on osx/ios, but <sys/uio.h>
131  * only declares it if defined(KERNEL) */
132 #ifndef UIO_MAXIOV
133 #define UIO_MAXIOV 512
134 #endif
135
136 /*
137  * POSIX writev(2) documents IOV_MAX as the max length of the iov array.
138  * If IOV_MAX is undefined, fall back to the legacy UIO_MAXIOV.
139  */
140 #ifndef IOV_MAX
141 #define IOV_MAX UIO_MAXIOV
142 #endif
143
144 static gssize
145 gst_writev (gint fd, const struct iovec *iov, gint iovcnt, gsize total_bytes)
146 {
147   gssize written;
148
149 #ifdef HAVE_SYS_UIO_H
150   if (iovcnt <= IOV_MAX) {
151     do {
152       written = writev (fd, iov, iovcnt);
153     } while (written < 0 && errno == EINTR);
154   } else
155 #endif
156   {
157     gint i;
158
159     /* We merge the memories here because technically write()/writev() is
160      * supposed to be atomic, which it's not if we do multiple separate
161      * write() calls. It's very doubtful anyone cares though in our use
162      * cases, and it's not clear how that can be reconciled with the
163      * possibility of short writes, so in any case we might want to
164      * simplify this later or just remove it. */
165     if (total_bytes <= FDSINK_MAX_MALLOC_SIZE) {
166       gchar *mem, *p;
167
168       if (total_bytes <= FDSINK_MAX_ALLOCA_SIZE)
169         mem = g_alloca (total_bytes);
170       else
171         mem = g_malloc (total_bytes);
172
173       p = mem;
174       for (i = 0; i < iovcnt; ++i) {
175         memcpy (p, iov[i].iov_base, iov[i].iov_len);
176         p += iov[i].iov_len;
177       }
178
179       do {
180         written = write (fd, mem, total_bytes);
181       } while (written < 0 && errno == EINTR);
182
183       if (total_bytes > FDSINK_MAX_ALLOCA_SIZE)
184         g_free (mem);
185     } else {
186       gssize ret;
187
188       written = 0;
189       for (i = 0; i < iovcnt; ++i) {
190         do {
191           ret = write (fd, iov[i].iov_base, iov[i].iov_len);
192         } while (ret < 0 && errno == EINTR);
193         if (ret > 0)
194           written += ret;
195         if (ret != iov[i].iov_len)
196           break;
197       }
198     }
199   }
200
201   return written;
202 }
203
204 static gsize
205 fill_vectors (struct iovec *vecs, GstMapInfo * maps, guint n, GstBuffer * buf)
206 {
207   GstMemory *mem;
208   gsize size = 0;
209   guint i;
210
211   g_assert (gst_buffer_n_memory (buf) == n);
212
213   for (i = 0; i < n; ++i) {
214     mem = gst_buffer_peek_memory (buf, i);
215     if (gst_memory_map (mem, &maps[i], GST_MAP_READ)) {
216       vecs[i].iov_base = maps[i].data;
217       vecs[i].iov_len = maps[i].size;
218     } else {
219       GST_WARNING ("Failed to map memory %p for reading", mem);
220       vecs[i].iov_base = (void *) "";
221       vecs[i].iov_len = 0;
222     }
223     size += vecs[i].iov_len;
224   }
225
226   return size;
227 }
228
229 GstFlowReturn
230 gst_writev_buffers (GstObject * sink, gint fd, GstPoll * fdset,
231     GstBuffer ** buffers, guint num_buffers, guint8 * mem_nums,
232     guint total_mem_num, guint64 * bytes_written, guint64 skip,
233     gint max_transient_error_timeout, guint64 current_position,
234     gboolean * flushing)
235 {
236   struct iovec *vecs;
237   GstMapInfo *map_infos;
238   GstFlowReturn flow_ret;
239   gsize size = 0;
240   guint i, j;
241   gint64 start_time = 0;
242
243   max_transient_error_timeout *= 1000;
244   if (max_transient_error_timeout)
245     start_time = g_get_monotonic_time ();
246
247   GST_LOG_OBJECT (sink, "%u buffers, %u memories", num_buffers, total_mem_num);
248
249   vecs = g_newa (struct iovec, total_mem_num);
250   map_infos = g_newa (GstMapInfo, total_mem_num);
251
252   /* populate output vectors */
253   for (i = 0, j = 0; i < num_buffers; ++i) {
254     size += fill_vectors (&vecs[j], &map_infos[j], mem_nums[i], buffers[i]);
255     j += mem_nums[i];
256   }
257
258   /* now write it all out! */
259   {
260     gssize ret, left;
261     guint n_vecs = total_mem_num;
262
263     left = size;
264
265     if (skip) {
266       ret = skip;
267       errno = 0;
268       goto skip_first;
269     }
270
271     do {
272       if (flushing != NULL && g_atomic_int_get (flushing)) {
273         GST_DEBUG_OBJECT (sink, "Flushing, exiting loop");
274         flow_ret = GST_FLOW_FLUSHING;
275         goto out;
276       }
277 #ifndef HAVE_WIN32
278       if (fdset != NULL) {
279         do {
280           GST_DEBUG_OBJECT (sink, "going into select, have %" G_GSSIZE_FORMAT
281               " bytes to write", left);
282           ret = gst_poll_wait (fdset, GST_CLOCK_TIME_NONE);
283         } while (ret == -1 && (errno == EINTR || errno == EAGAIN));
284
285         if (ret == -1) {
286           if (errno == EBUSY)
287             goto stopped;
288           else
289             goto select_error;
290         }
291       }
292 #endif
293
294       ret = gst_writev (fd, vecs, n_vecs, left);
295
296       if (ret > 0) {
297         if (bytes_written)
298           *bytes_written += ret;
299       }
300
301     skip_first:
302
303       if (ret == left)
304         break;
305
306       if (ret < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) {
307         /* do nothing, try again */
308         if (max_transient_error_timeout)
309           start_time = g_get_monotonic_time ();
310       } else if (ret < 0 && errno == EACCES && max_transient_error_timeout > 0) {
311         /* seek back to where we started writing and try again after sleeping
312          * for 10ms.
313          *
314          * Some network file systems report EACCES spuriously, presumably
315          * because at the same time another client is reading the file.
316          * It happens at least on Linux and macOS on SMB/CIFS and NFS file
317          * systems.
318          *
319          * Note that NFS does not check access permissions during open()
320          * but only on write()/read() according to open(2), so we would
321          * loop here in case of NFS.
322          */
323         if (g_get_monotonic_time () > start_time + max_transient_error_timeout) {
324           GST_ERROR_OBJECT (sink, "Got EACCES for more than %dms, failing",
325               max_transient_error_timeout);
326           goto write_error;
327         }
328         GST_DEBUG_OBJECT (sink, "got EACCES, retry after 10ms sleep");
329         g_assert (current_position != -1);
330         g_usleep (10000);
331
332         /* Seek back to the current position, sometimes a partial write
333          * happened and we have no idea how much and if what was written
334          * is actually correct (it sometimes isn't)
335          */
336         ret = lseek (fd, current_position + *bytes_written, SEEK_SET);
337         if (ret < 0 || ret != current_position + *bytes_written) {
338           GST_ERROR_OBJECT (sink,
339               "failed to seek back to current write position");
340           goto write_error;
341         }
342       } else if (ret < 0) {
343         goto write_error;
344       } else {                  /* if (ret < left) */
345         if (max_transient_error_timeout)
346           start_time = g_get_monotonic_time ();
347         /* skip vectors that have been written in full */
348         while (ret >= vecs[0].iov_len) {
349           ret -= vecs[0].iov_len;
350           left -= vecs[0].iov_len;
351           ++vecs;
352           --n_vecs;
353         }
354         g_assert (n_vecs > 0);
355         /* skip partially written vector data */
356         if (ret > 0) {
357           vecs[0].iov_len -= ret;
358           vecs[0].iov_base = ((guint8 *) vecs[0].iov_base) + ret;
359           left -= ret;
360         }
361       }
362 #ifdef HAVE_WIN32
363       /* do short sleep on windows where we don't use gst_poll(),
364        * to avoid excessive busy looping */
365       if (fdset != NULL)
366         g_usleep (1000);
367 #endif
368
369     }
370     while (left > 0);
371   }
372
373   flow_ret = GST_FLOW_OK;
374
375 out:
376
377   for (i = 0; i < total_mem_num; ++i)
378     gst_memory_unmap (map_infos[i].memory, &map_infos[i]);
379
380   return flow_ret;
381
382 /* ERRORS */
383 #ifndef HAVE_WIN32
384 select_error:
385   {
386     GST_ELEMENT_ERROR (sink, RESOURCE, READ, (NULL),
387         ("select on file descriptor: %s", g_strerror (errno)));
388     GST_DEBUG_OBJECT (sink, "Error during select: %s", g_strerror (errno));
389     flow_ret = GST_FLOW_ERROR;
390     goto out;
391   }
392 stopped:
393   {
394     GST_DEBUG_OBJECT (sink, "Select stopped");
395     flow_ret = GST_FLOW_FLUSHING;
396     goto out;
397   }
398 #endif
399 write_error:
400   {
401     switch (errno) {
402       case ENOSPC:
403         GST_ELEMENT_ERROR (sink, RESOURCE, NO_SPACE_LEFT, (NULL), (NULL));
404         break;
405       default:{
406         GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, (NULL),
407             ("Error while writing to file descriptor %d: %s",
408                 fd, g_strerror (errno)));
409       }
410     }
411     flow_ret = GST_FLOW_ERROR;
412     goto out;
413   }
414 }