Merging gst-plugins-ugly
[platform/upstream/gstreamer.git] / subprojects / gstreamer / plugins / elements / gstelements_private.c
1 /* GStreamer
2  * Copyright (C) 2011 David Schleef <ds@schleef.org>
3  * Copyright (C) 2011 Tim-Philipp Müller <tim.muller@collabora.co.uk>
4  * Copyright (C) 2014 Tim-Philipp Müller <tim@centricular.com>
5  * Copyright (C) 2014 Vincent Penquerc'h <vincent@collabora.co.uk>
6  *
7  * gstelements_private.c: Shared code for core elements
8  *
9  * This library is free software; you can redistribute it and/or
10  * modify it under the terms of the GNU Library General Public
11  * License as published by the Free Software Foundation; either
12  * version 2 of the License, or (at your option) any later version.
13  *
14  * This library is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
17  * Library General Public License for more details.
18  *
19  * You should have received a copy of the GNU Library General Public
20  * License along with this library; if not, write to the
21  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
22  * Boston, MA 02110-1301, USA.
23  */
24
25 #ifdef HAVE_CONFIG_H
26 # include "config.h"
27 #endif
28 #include <stdio.h>
29 #ifdef HAVE_UNISTD_H
30 #include <unistd.h>
31 #endif
32 #ifdef HAVE_SYS_UIO_H
33 #include <sys/uio.h>
34 #endif
35 #include <sys/types.h>
36 #include <errno.h>
37 #include <string.h>
38 #include <string.h>
39 #include "gst/gst.h"
40 #include "gstelements_private.h"
41
42 #ifdef G_OS_WIN32
43 #  include <io.h>               /* lseek, open, close, read */
44 #  undef lseek
45 #  define lseek _lseeki64
46 #  undef off_t
47 #  define off_t guint64
48 #  define WIN32_LEAN_AND_MEAN   /* prevents from including too many things */
49 #  include <windows.h>
50 #  undef WIN32_LEAN_AND_MEAN
51 #  ifndef EWOULDBLOCK
52 #  define EWOULDBLOCK EAGAIN
53 #  endif
54 #endif /* G_OS_WIN32 */
55
56 #define BUFFER_FLAG_SHIFT 4
57
58 G_STATIC_ASSERT ((1 << BUFFER_FLAG_SHIFT) == GST_MINI_OBJECT_FLAG_LAST);
59
60 /* Returns a newly allocated string describing the flags on this buffer */
61 gchar *
62 gst_buffer_get_flags_string (GstBuffer * buffer)
63 {
64   static const char flag_strings[] =
65       "\000\000\000\000live\000decode-only\000discont\000resync\000corrupted\000"
66       "marker\000header\000gap\000droppable\000delta-unit\000tag-memory\000"
67       "sync-after\000non-droppable\000FIXME";
68   static const guint8 flag_idx[] = { 0, 1, 2, 3, 4, 9, 21, 29, 36, 46, 53,
69     60, 64, 74, 85, 96, 107, 121,
70   };
71   int i, max_bytes;
72   char *flag_str, *end;
73
74   /* max size is all flag strings plus a space or terminator after each one */
75   max_bytes = sizeof (flag_strings);
76   flag_str = g_malloc (max_bytes);
77
78   end = flag_str;
79   end[0] = '\0';
80   for (i = BUFFER_FLAG_SHIFT; i < G_N_ELEMENTS (flag_idx); i++) {
81     if (GST_MINI_OBJECT_CAST (buffer)->flags & (1 << i)) {
82       strcpy (end, flag_strings + flag_idx[i]);
83       end += strlen (end);
84       end[0] = ' ';
85       end[1] = '\0';
86       end++;
87     }
88   }
89
90   return flag_str;
91 }
92
93 /* Returns a newly-allocated string describing the metas on this buffer, or NULL */
94 gchar *
95 gst_buffer_get_meta_string (GstBuffer * buffer)
96 {
97   gpointer state = NULL;
98   GstMeta *meta;
99   GString *s = NULL;
100
101   while ((meta = gst_buffer_iterate_meta (buffer, &state))) {
102     const gchar *desc = g_type_name (meta->info->type);
103
104     if (s == NULL)
105       s = g_string_new (NULL);
106     else
107       g_string_append (s, ", ");
108
109     g_string_append (s, desc);
110   }
111
112   return (s != NULL) ? g_string_free (s, FALSE) : NULL;
113 }
114
115 /* Define our own iovec structure here, so that we can use it unconditionally
116  * in the code below and use almost the same code path for systems where
117  * writev() is supported and those were it's not supported */
118 #ifndef HAVE_SYS_UIO_H
119 struct iovec
120 {
121   gpointer iov_base;
122   gsize iov_len;
123 };
124 #endif
125
126 /* completely arbitrary thresholds */
127 #define FDSINK_MAX_ALLOCA_SIZE (64 * 1024)      /* 64k */
128 #define FDSINK_MAX_MALLOC_SIZE ( 8 * 1024 * 1024)       /*  8M */
129
130 /* Adapted from GLib (gio/gioprivate.h)
131  *
132  * POSIX defines IOV_MAX/UIO_MAXIOV as the maximum number of iovecs that can
133  * be sent in one go.  We define our own version of it here as there are two
134  * possible names, and also define a fall-back value if none of the constants
135  * are defined */
136 #if defined(IOV_MAX)
137 #define GST_IOV_MAX IOV_MAX
138 #elif defined(UIO_MAXIOV)
139 #define GST_IOV_MAX UIO_MAXIOV
140 #elif defined(__APPLE__)
141 /* For osx/ios, UIO_MAXIOV is documented in writev(2), but <sys/uio.h>
142  * only declares it if defined(KERNEL) */
143 #define GST_IOV_MAX 512
144 #else
145 /* 16 is the minimum value required by POSIX */
146 #define GST_IOV_MAX 16
147 #endif
148
149 static gssize
150 gst_writev (gint fd, const struct iovec *iov, gint iovcnt, gsize total_bytes)
151 {
152   gssize written;
153
154 #ifdef HAVE_SYS_UIO_H
155   if (iovcnt <= GST_IOV_MAX) {
156     do {
157       written = writev (fd, iov, iovcnt);
158     } while (written < 0 && errno == EINTR);
159   } else
160 #endif
161   {
162     gint i;
163
164     /* We merge the memories here because technically write()/writev() is
165      * supposed to be atomic, which it's not if we do multiple separate
166      * write() calls. It's very doubtful anyone cares though in our use
167      * cases, and it's not clear how that can be reconciled with the
168      * possibility of short writes, so in any case we might want to
169      * simplify this later or just remove it. */
170     if (iovcnt > 1 && total_bytes <= FDSINK_MAX_MALLOC_SIZE) {
171       gchar *mem, *p;
172
173       if (total_bytes <= FDSINK_MAX_ALLOCA_SIZE)
174         mem = g_alloca (total_bytes);
175       else
176         mem = g_malloc (total_bytes);
177
178       p = mem;
179       for (i = 0; i < iovcnt; ++i) {
180         memcpy (p, iov[i].iov_base, iov[i].iov_len);
181         p += iov[i].iov_len;
182       }
183
184       do {
185         written = write (fd, mem, total_bytes);
186       } while (written < 0 && errno == EINTR);
187
188       if (total_bytes > FDSINK_MAX_ALLOCA_SIZE)
189         g_free (mem);
190     } else {
191       gssize ret;
192
193       written = 0;
194       for (i = 0; i < iovcnt; ++i) {
195         do {
196           ret = write (fd, iov[i].iov_base, iov[i].iov_len);
197         } while (ret < 0 && errno == EINTR);
198         if (ret > 0)
199           written += ret;
200         if (ret != iov[i].iov_len)
201           break;
202       }
203     }
204   }
205
206   return written;
207 }
208
209 static GstFlowReturn
210 gst_writev_iovecs (GstObject * sink, gint fd, GstPoll * fdset,
211     struct iovec *vecs, guint n_vecs, gsize bytes_to_write,
212     guint64 * bytes_written, gint max_transient_error_timeout,
213     guint64 current_position, gboolean * flushing)
214 {
215   GstFlowReturn flow_ret;
216   gint64 start_time = 0;
217
218   *bytes_written = 0;
219   max_transient_error_timeout *= 1000;
220   if (max_transient_error_timeout)
221     start_time = g_get_monotonic_time ();
222
223   GST_LOG_OBJECT (sink, "%u iovecs", n_vecs);
224
225   /* now write it all out! */
226   {
227     gssize ret, left;
228
229     left = bytes_to_write;
230
231     do {
232       if (flushing != NULL && g_atomic_int_get (flushing)) {
233         GST_DEBUG_OBJECT (sink, "Flushing, exiting loop");
234         flow_ret = GST_FLOW_FLUSHING;
235         goto out;
236       }
237 #ifndef HAVE_WIN32
238       if (fdset != NULL) {
239         do {
240           GST_DEBUG_OBJECT (sink, "going into select, have %" G_GSSIZE_FORMAT
241               " bytes to write", left);
242           ret = gst_poll_wait (fdset, GST_CLOCK_TIME_NONE);
243         } while (ret == -1 && (errno == EINTR || errno == EAGAIN));
244
245         if (ret == -1) {
246           if (errno == EBUSY)
247             goto stopped;
248           else
249             goto select_error;
250         }
251       }
252 #endif
253
254       ret = gst_writev (fd, vecs, n_vecs, left);
255
256       if (ret > 0) {
257         /* Wrote something, allow the caller to update the vecs passed here */
258         *bytes_written = ret;
259         break;
260       }
261
262       if (errno == EAGAIN || errno == EWOULDBLOCK || ret == 0) {
263         /* do nothing, try again */
264         if (max_transient_error_timeout)
265           start_time = g_get_monotonic_time ();
266       } else if (errno == EACCES && max_transient_error_timeout > 0) {
267         /* seek back to where we started writing and try again after sleeping
268          * for 10ms.
269          *
270          * Some network file systems report EACCES spuriously, presumably
271          * because at the same time another client is reading the file.
272          * It happens at least on Linux and macOS on SMB/CIFS and NFS file
273          * systems.
274          *
275          * Note that NFS does not check access permissions during open()
276          * but only on write()/read() according to open(2), so we would
277          * loop here in case of NFS.
278          */
279         if (g_get_monotonic_time () > start_time + max_transient_error_timeout) {
280           GST_ERROR_OBJECT (sink, "Got EACCES for more than %dms, failing",
281               max_transient_error_timeout);
282           goto write_error;
283         }
284         GST_DEBUG_OBJECT (sink, "got EACCES, retry after 10ms sleep");
285         g_assert (current_position != -1);
286         g_usleep (10000);
287
288         /* Seek back to the current position, sometimes a partial write
289          * happened and we have no idea how much and if what was written
290          * is actually correct (it sometimes isn't)
291          */
292         ret = lseek (fd, current_position, SEEK_SET);
293         if (ret < 0 || ret != current_position) {
294           GST_ERROR_OBJECT (sink,
295               "failed to seek back to current write position");
296           goto write_error;
297         }
298       } else {
299         goto write_error;
300       }
301 #ifdef HAVE_WIN32
302       /* do short sleep on windows where we don't use gst_poll(),
303        * to avoid excessive busy looping */
304       if (fdset != NULL)
305         g_usleep (1000);
306 #endif
307     }
308     while (left > 0);
309   }
310
311   flow_ret = GST_FLOW_OK;
312
313 out:
314
315   return flow_ret;
316
317 /* ERRORS */
318 #ifndef HAVE_WIN32
319 select_error:
320   {
321     GST_ELEMENT_ERROR (sink, RESOURCE, READ, (NULL),
322         ("select on file descriptor: %s", g_strerror (errno)));
323     GST_DEBUG_OBJECT (sink, "Error during select: %s", g_strerror (errno));
324     flow_ret = GST_FLOW_ERROR;
325     goto out;
326   }
327 stopped:
328   {
329     GST_DEBUG_OBJECT (sink, "Select stopped");
330     flow_ret = GST_FLOW_FLUSHING;
331     goto out;
332   }
333 #endif
334 write_error:
335   {
336     switch (errno) {
337       case ENOSPC:
338         GST_ELEMENT_ERROR (sink, RESOURCE, NO_SPACE_LEFT, (NULL), (NULL));
339         break;
340       default:{
341         GST_ELEMENT_ERROR (sink, RESOURCE, WRITE, (NULL),
342             ("Error while writing to file descriptor %d: %s",
343                 fd, g_strerror (errno)));
344       }
345     }
346     flow_ret = GST_FLOW_ERROR;
347     goto out;
348   }
349 }
350
351 GstFlowReturn
352 gst_writev_buffer (GstObject * sink, gint fd, GstPoll * fdset,
353     GstBuffer * buffer,
354     guint64 * bytes_written, guint64 skip,
355     gint max_transient_error_timeout, guint64 current_position,
356     gboolean * flushing)
357 {
358   GstFlowReturn flow_ret = GST_FLOW_OK;
359   struct iovec *vecs;
360   GstMapInfo *maps;
361   guint i, num_mem, num_vecs;
362   gsize left = 0;
363
364   /* Buffers can contain up to 16 memories, so we can safely directly call
365    * writev() here without splitting up */
366   g_assert (gst_buffer_get_max_memory () <= GST_IOV_MAX);
367
368   num_mem = num_vecs = gst_buffer_n_memory (buffer);
369
370   GST_DEBUG ("Writing buffer %p with %u memories and %" G_GSIZE_FORMAT " bytes",
371       buffer, num_mem, gst_buffer_get_size (buffer));
372
373   vecs = g_newa (struct iovec, num_mem);
374   maps = g_newa (GstMapInfo, num_mem);
375
376   /* Map all memories */
377   {
378     GstMemory *mem;
379     guint i;
380
381     for (i = 0; i < num_mem; ++i) {
382       mem = gst_buffer_peek_memory (buffer, i);
383       if (gst_memory_map (mem, &maps[i], GST_MAP_READ)) {
384         vecs[i].iov_base = maps[i].data;
385         vecs[i].iov_len = maps[i].size;
386       } else {
387         GST_WARNING ("Failed to map memory %p for reading", mem);
388         vecs[i].iov_base = (void *) "";
389         vecs[i].iov_len = 0;
390       }
391       left += vecs[i].iov_len;
392     }
393   }
394
395   do {
396     guint64 bytes_written_local = 0;
397
398     flow_ret =
399         gst_writev_iovecs (sink, fd, fdset, vecs, num_vecs, left,
400         &bytes_written_local, max_transient_error_timeout, current_position,
401         flushing);
402
403     GST_DEBUG ("Wrote %" G_GUINT64_FORMAT " bytes of %" G_GSIZE_FORMAT ": %s",
404         bytes_written_local, left, gst_flow_get_name (flow_ret));
405
406     if (flow_ret != GST_FLOW_OK) {
407       g_assert (bytes_written_local == 0);
408       break;
409     }
410
411     if (bytes_written)
412       *bytes_written += bytes_written_local;
413
414     /* Done, no need to do bookkeeping */
415     if (bytes_written_local == left)
416       break;
417
418     /* skip vectors that have been written in full */
419     while (bytes_written_local >= vecs[0].iov_len) {
420       bytes_written_local -= vecs[0].iov_len;
421       left -= vecs[0].iov_len;
422       ++vecs;
423       --num_vecs;
424     }
425     g_assert (num_vecs > 0);
426     /* skip partially written vector data */
427     if (bytes_written_local > 0) {
428       vecs[0].iov_len -= bytes_written_local;
429       vecs[0].iov_base = ((guint8 *) vecs[0].iov_base) + bytes_written_local;
430       left -= bytes_written_local;
431     }
432   } while (left > 0);
433
434   for (i = 0; i < num_mem; i++)
435     gst_memory_unmap (maps[i].memory, &maps[i]);
436
437   return flow_ret;
438 }
439
440 GstFlowReturn
441 gst_writev_mem (GstObject * sink, gint fd, GstPoll * fdset,
442     const guint8 * data, guint size,
443     guint64 * bytes_written, guint64 skip,
444     gint max_transient_error_timeout, guint64 current_position,
445     gboolean * flushing)
446 {
447   GstFlowReturn flow_ret = GST_FLOW_OK;
448   struct iovec vec;
449   gsize left;
450
451   GST_DEBUG ("Writing memory %p with %u bytes", data, size);
452
453   vec.iov_len = size;
454   vec.iov_base = (guint8 *) data;
455   left = size;
456
457   do {
458     guint64 bytes_written_local = 0;
459
460     flow_ret =
461         gst_writev_iovecs (sink, fd, fdset, &vec, 1, left,
462         &bytes_written_local, max_transient_error_timeout, current_position,
463         flushing);
464
465     GST_DEBUG ("Wrote %" G_GUINT64_FORMAT " bytes of %" G_GSIZE_FORMAT ": %s",
466         bytes_written_local, left, gst_flow_get_name (flow_ret));
467
468     if (flow_ret != GST_FLOW_OK) {
469       g_assert (bytes_written_local == 0);
470       break;
471     }
472
473     if (bytes_written)
474       *bytes_written += bytes_written_local;
475
476     /* All done, no need for bookkeeping */
477     if (bytes_written_local == left)
478       break;
479
480     /* skip partially written vector data */
481     if (bytes_written_local < left) {
482       vec.iov_len -= bytes_written_local;
483       vec.iov_base = ((guint8 *) vec.iov_base) + bytes_written_local;
484       left -= bytes_written_local;
485     }
486   } while (left > 0);
487
488   return flow_ret;
489 }
490
491 GstFlowReturn
492 gst_writev_buffer_list (GstObject * sink, gint fd, GstPoll * fdset,
493     GstBufferList * buffer_list,
494     guint64 * bytes_written, guint64 skip,
495     gint max_transient_error_timeout, guint64 current_position,
496     gboolean * flushing)
497 {
498   GstFlowReturn flow_ret = GST_FLOW_OK;
499   struct iovec *vecs;
500   GstMapInfo *maps;
501   guint num_bufs, current_buf_idx = 0, current_buf_mem_idx = 0;
502   guint i, num_vecs;
503   gsize left = 0;
504
505   num_bufs = gst_buffer_list_length (buffer_list);
506   num_vecs = 0;
507
508   GST_DEBUG ("Writing buffer list %p with %u buffers", buffer_list, num_bufs);
509
510   vecs = g_newa (struct iovec, GST_IOV_MAX);
511   maps = g_newa (GstMapInfo, GST_IOV_MAX);
512
513   /* Map the first GST_IOV_MAX memories */
514   {
515     GstBuffer *buf;
516     GstMemory *mem;
517     guint j = 0;
518
519     for (i = 0; i < num_bufs && num_vecs < GST_IOV_MAX; i++) {
520       guint num_mem;
521
522       buf = gst_buffer_list_get (buffer_list, i);
523       num_mem = gst_buffer_n_memory (buf);
524
525       for (j = 0; j < num_mem && num_vecs < GST_IOV_MAX; j++) {
526         mem = gst_buffer_peek_memory (buf, j);
527         if (gst_memory_map (mem, &maps[num_vecs], GST_MAP_READ)) {
528           vecs[num_vecs].iov_base = maps[num_vecs].data;
529           vecs[num_vecs].iov_len = maps[num_vecs].size;
530         } else {
531           GST_WARNING ("Failed to map memory %p for reading", mem);
532           vecs[num_vecs].iov_base = (void *) "";
533           vecs[num_vecs].iov_len = 0;
534         }
535         left += vecs[num_vecs].iov_len;
536         num_vecs++;
537       }
538       current_buf_mem_idx = j;
539       if (j == num_mem)
540         current_buf_mem_idx = 0;
541     }
542     current_buf_idx = i;
543   }
544
545   do {
546     guint64 bytes_written_local = 0;
547     guint vecs_written = 0;
548
549     flow_ret =
550         gst_writev_iovecs (sink, fd, fdset, vecs, num_vecs, left,
551         &bytes_written_local, max_transient_error_timeout, current_position,
552         flushing);
553
554     GST_DEBUG ("Wrote %" G_GUINT64_FORMAT " bytes of %" G_GSIZE_FORMAT ": %s",
555         bytes_written_local, left, gst_flow_get_name (flow_ret));
556
557     if (flow_ret != GST_FLOW_OK) {
558       g_assert (bytes_written_local == 0);
559       break;
560     }
561
562     if (flow_ret != GST_FLOW_OK) {
563       g_assert (bytes_written_local == 0);
564       break;
565     }
566
567     if (bytes_written)
568       *bytes_written += bytes_written_local;
569
570     /* All done, no need for bookkeeping */
571     if (bytes_written_local == left && current_buf_idx == num_bufs)
572       break;
573
574     /* skip vectors that have been written in full */
575     while (vecs_written < num_vecs
576         && bytes_written_local >= vecs[vecs_written].iov_len) {
577       bytes_written_local -= vecs[vecs_written].iov_len;
578       left -= vecs[vecs_written].iov_len;
579       vecs_written++;
580     }
581     g_assert (vecs_written < num_vecs || bytes_written_local == 0);
582     /* skip partially written vector data */
583     if (bytes_written_local > 0) {
584       vecs[vecs_written].iov_len -= bytes_written_local;
585       vecs[vecs_written].iov_base =
586           ((guint8 *) vecs[0].iov_base) + bytes_written_local;
587       left -= bytes_written_local;
588     }
589
590     /* If we have buffers left, fill them in now */
591     if (current_buf_idx < num_bufs) {
592       GstBuffer *buf;
593       GstMemory *mem;
594       guint j = current_buf_mem_idx;
595
596       /* Unmap the first vecs_written memories now */
597       for (i = 0; i < vecs_written; i++)
598         gst_memory_unmap (maps[i].memory, &maps[i]);
599       /* Move upper remaining vecs and maps back to the beginning */
600       memmove (vecs, &vecs[vecs_written],
601           (num_vecs - vecs_written) * sizeof (vecs[0]));
602       memmove (maps, &maps[vecs_written],
603           (num_vecs - vecs_written) * sizeof (maps[0]));
604       num_vecs -= vecs_written;
605
606       /* And finally refill */
607       for (i = current_buf_idx; i < num_bufs && num_vecs < GST_IOV_MAX; i++) {
608         guint num_mem;
609
610         buf = gst_buffer_list_get (buffer_list, i);
611         num_mem = gst_buffer_n_memory (buf);
612
613         for (j = current_buf_mem_idx; j < num_mem && num_vecs < GST_IOV_MAX;
614             j++) {
615           mem = gst_buffer_peek_memory (buf, j);
616           if (gst_memory_map (mem, &maps[num_vecs], GST_MAP_READ)) {
617             vecs[num_vecs].iov_base = maps[num_vecs].data;
618             vecs[num_vecs].iov_len = maps[num_vecs].size;
619           } else {
620             GST_WARNING ("Failed to map memory %p for reading", mem);
621             vecs[num_vecs].iov_base = (void *) "";
622             vecs[num_vecs].iov_len = 0;
623           }
624           left += vecs[num_vecs].iov_len;
625           num_vecs++;
626         }
627         current_buf_mem_idx = j;
628         if (current_buf_mem_idx == num_mem)
629           current_buf_mem_idx = 0;
630       }
631       current_buf_idx = i;
632     }
633   } while (left > 0);
634
635   for (i = 0; i < num_vecs; i++)
636     gst_memory_unmap (maps[i].memory, &maps[i]);
637
638   return flow_ret;
639 }