fdsink: use writev() in ::render() to write out memories without merging them
[platform/upstream/gstreamer.git] / plugins / elements / gstfdsink.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *                    2000 Wim Taymans <wtay@chello.be>
4  *
5  * gstfdsink.c:
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20  * Boston, MA 02110-1301, USA.
21  */
22
23 /**
24  * SECTION:element-fdsink
25  * @see_also: #GstFdSrc
26  *
27  * Write data to a unix file descriptor.
28  *
29  * This element will synchronize on the clock before writing the data on the
30  * socket. For file descriptors where this does not make sense (files, ...) the
31  * #GstBaseSink:sync property can be used to disable synchronisation.
32  */
33
34 #ifdef HAVE_CONFIG_H
35 #  include "config.h"
36 #endif
37
38 #include "../../gst/gst-i18n-lib.h"
39
40 #include <sys/types.h>
41
42 #include <sys/stat.h>
43 #ifdef HAVE_SYS_SOCKET_H
44 #include <sys/socket.h>
45 #endif
46 #include <fcntl.h>
47 #include <stdio.h>
48 #ifdef HAVE_UNISTD_H
49 #include <unistd.h>
50 #endif
51 #ifdef _MSC_VER
52 #undef stat
53 #define stat _stat
54 #define fstat _fstat
55 #define S_ISREG(m)      (((m)&S_IFREG)==S_IFREG)
56 #endif
57 #include <errno.h>
58 #include <string.h>
59
60 #include "gstfdsink.h"
61 #include "gstelements_private.h"
62
63 #ifdef G_OS_WIN32
64 #include <io.h>                 /* lseek, open, close, read */
65 #undef lseek
66 #define lseek _lseeki64
67 #undef off_t
68 #define off_t guint64
69 #endif
70
71 static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
72     GST_PAD_SINK,
73     GST_PAD_ALWAYS,
74     GST_STATIC_CAPS_ANY);
75
76 GST_DEBUG_CATEGORY_STATIC (gst_fd_sink__debug);
77 #define GST_CAT_DEFAULT gst_fd_sink__debug
78
79
80 /* FdSink signals and args */
81 enum
82 {
83   /* FILL ME */
84   LAST_SIGNAL
85 };
86
87 enum
88 {
89   ARG_0,
90   ARG_FD
91 };
92
93 static void gst_fd_sink_uri_handler_init (gpointer g_iface,
94     gpointer iface_data);
95
96 #define _do_init \
97   G_IMPLEMENT_INTERFACE (GST_TYPE_URI_HANDLER, gst_fd_sink_uri_handler_init); \
98   GST_DEBUG_CATEGORY_INIT (gst_fd_sink__debug, "fdsink", 0, "fdsink element");
99 #define gst_fd_sink_parent_class parent_class
100 G_DEFINE_TYPE_WITH_CODE (GstFdSink, gst_fd_sink, GST_TYPE_BASE_SINK, _do_init);
101
102 static void gst_fd_sink_set_property (GObject * object, guint prop_id,
103     const GValue * value, GParamSpec * pspec);
104 static void gst_fd_sink_get_property (GObject * object, guint prop_id,
105     GValue * value, GParamSpec * pspec);
106 static void gst_fd_sink_dispose (GObject * obj);
107
108 static gboolean gst_fd_sink_query (GstBaseSink * bsink, GstQuery * query);
109 static GstFlowReturn gst_fd_sink_render (GstBaseSink * sink,
110     GstBuffer * buffer);
111 static GstFlowReturn gst_fd_sink_render_list (GstBaseSink * bsink,
112     GstBufferList * buffer_list);
113 static gboolean gst_fd_sink_start (GstBaseSink * basesink);
114 static gboolean gst_fd_sink_stop (GstBaseSink * basesink);
115 static gboolean gst_fd_sink_unlock (GstBaseSink * basesink);
116 static gboolean gst_fd_sink_unlock_stop (GstBaseSink * basesink);
117 static gboolean gst_fd_sink_event (GstBaseSink * sink, GstEvent * event);
118
119 static gboolean gst_fd_sink_do_seek (GstFdSink * fdsink, guint64 new_offset);
120
121 static void
122 gst_fd_sink_class_init (GstFdSinkClass * klass)
123 {
124   GObjectClass *gobject_class;
125   GstElementClass *gstelement_class;
126   GstBaseSinkClass *gstbasesink_class;
127
128   gobject_class = G_OBJECT_CLASS (klass);
129   gstelement_class = GST_ELEMENT_CLASS (klass);
130   gstbasesink_class = GST_BASE_SINK_CLASS (klass);
131
132   gobject_class->set_property = gst_fd_sink_set_property;
133   gobject_class->get_property = gst_fd_sink_get_property;
134   gobject_class->dispose = gst_fd_sink_dispose;
135
136   gst_element_class_set_static_metadata (gstelement_class,
137       "Filedescriptor Sink",
138       "Sink/File",
139       "Write data to a file descriptor", "Erik Walthinsen <omega@cse.ogi.edu>");
140   gst_element_class_add_pad_template (gstelement_class,
141       gst_static_pad_template_get (&sinktemplate));
142
143   gstbasesink_class->render = GST_DEBUG_FUNCPTR (gst_fd_sink_render);
144   gstbasesink_class->render_list = GST_DEBUG_FUNCPTR (gst_fd_sink_render_list);
145   gstbasesink_class->start = GST_DEBUG_FUNCPTR (gst_fd_sink_start);
146   gstbasesink_class->stop = GST_DEBUG_FUNCPTR (gst_fd_sink_stop);
147   gstbasesink_class->unlock = GST_DEBUG_FUNCPTR (gst_fd_sink_unlock);
148   gstbasesink_class->unlock_stop = GST_DEBUG_FUNCPTR (gst_fd_sink_unlock_stop);
149   gstbasesink_class->event = GST_DEBUG_FUNCPTR (gst_fd_sink_event);
150   gstbasesink_class->query = GST_DEBUG_FUNCPTR (gst_fd_sink_query);
151
152   g_object_class_install_property (gobject_class, ARG_FD,
153       g_param_spec_int ("fd", "fd", "An open file descriptor to write to",
154           0, G_MAXINT, 1, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
155 }
156
157 static void
158 gst_fd_sink_init (GstFdSink * fdsink)
159 {
160   fdsink->fd = 1;
161   fdsink->uri = g_strdup_printf ("fd://%d", fdsink->fd);
162   fdsink->bytes_written = 0;
163   fdsink->current_pos = 0;
164
165   gst_base_sink_set_sync (GST_BASE_SINK (fdsink), FALSE);
166 }
167
168 static void
169 gst_fd_sink_dispose (GObject * obj)
170 {
171   GstFdSink *fdsink = GST_FD_SINK (obj);
172
173   g_free (fdsink->uri);
174   fdsink->uri = NULL;
175
176   G_OBJECT_CLASS (parent_class)->dispose (obj);
177 }
178
179 static gboolean
180 gst_fd_sink_query (GstBaseSink * bsink, GstQuery * query)
181 {
182   gboolean res = FALSE;
183   GstFdSink *fdsink;
184
185   fdsink = GST_FD_SINK (bsink);
186
187   switch (GST_QUERY_TYPE (query)) {
188     case GST_QUERY_POSITION:
189     {
190       GstFormat format;
191
192       gst_query_parse_position (query, &format, NULL);
193
194       switch (format) {
195         case GST_FORMAT_DEFAULT:
196         case GST_FORMAT_BYTES:
197           gst_query_set_position (query, GST_FORMAT_BYTES, fdsink->current_pos);
198           res = TRUE;
199           break;
200         default:
201           break;
202       }
203       break;
204     }
205     case GST_QUERY_FORMATS:
206       gst_query_set_formats (query, 2, GST_FORMAT_DEFAULT, GST_FORMAT_BYTES);
207       res = TRUE;
208       break;
209     case GST_QUERY_URI:
210       gst_query_set_uri (query, fdsink->uri);
211       res = TRUE;
212       break;
213     case GST_QUERY_SEEKING:{
214       GstFormat format;
215
216       gst_query_parse_seeking (query, &format, NULL, NULL, NULL);
217       if (format == GST_FORMAT_BYTES || format == GST_FORMAT_DEFAULT) {
218         gst_query_set_seeking (query, GST_FORMAT_BYTES, fdsink->seekable, 0,
219             -1);
220       } else {
221         gst_query_set_seeking (query, format, FALSE, 0, -1);
222       }
223       res = TRUE;
224       break;
225     }
226     default:
227       res = GST_BASE_SINK_CLASS (parent_class)->query (bsink, query);
228       break;
229
230   }
231   return res;
232 }
233
234 static GstFlowReturn
235 gst_fd_sink_render_buffers (GstFdSink * sink, GstBuffer ** buffers,
236     guint num_buffers, guint8 * mem_nums, guint total_mems)
237 {
238   return gst_writev_buffers (GST_OBJECT_CAST (sink), sink->fd, sink->fdset,
239       buffers, num_buffers, mem_nums, total_mems, &sink->bytes_written,
240       &sink->current_pos);
241 }
242
243 static GstFlowReturn
244 gst_fd_sink_render_list (GstBaseSink * bsink, GstBufferList * buffer_list)
245 {
246   GstFlowReturn flow;
247   GstBuffer **buffers;
248   GstFdSink *sink;
249   guint8 *mem_nums;
250   guint total_mems;
251   guint i, num_buffers;
252
253   sink = GST_FD_SINK_CAST (bsink);
254
255   num_buffers = gst_buffer_list_length (buffer_list);
256   if (num_buffers == 0)
257     goto no_data;
258
259   /* extract buffers from list and count memories */
260   buffers = g_newa (GstBuffer *, num_buffers);
261   mem_nums = g_newa (guint8, num_buffers);
262   for (i = 0, total_mems = 0; i < num_buffers; ++i) {
263     buffers[i] = gst_buffer_list_get (buffer_list, i);
264     mem_nums[i] = gst_buffer_n_memory (buffers[i]);
265     total_mems += mem_nums[i];
266   }
267
268   flow =
269       gst_fd_sink_render_buffers (sink, buffers, num_buffers, mem_nums,
270       total_mems);
271
272   return flow;
273
274 no_data:
275   {
276     GST_LOG_OBJECT (sink, "empty buffer list");
277     return GST_FLOW_OK;
278   }
279 }
280
281 static GstFlowReturn
282 gst_fd_sink_render (GstBaseSink * bsink, GstBuffer * buffer)
283 {
284   GstFlowReturn flow;
285   GstFdSink *sink;
286   guint8 n_mem;
287
288   sink = GST_FD_SINK_CAST (bsink);
289
290   n_mem = gst_buffer_n_memory (buffer);
291
292   if (n_mem > 0)
293     flow = gst_fd_sink_render_buffers (sink, &buffer, 1, &n_mem, n_mem);
294   else
295     flow = GST_FLOW_OK;
296
297   return flow;
298 }
299
300 static gboolean
301 gst_fd_sink_check_fd (GstFdSink * fdsink, int fd, GError ** error)
302 {
303   struct stat stat_results;
304   off_t result;
305
306   /* see that it is a valid file descriptor */
307   if (fstat (fd, &stat_results) < 0)
308     goto invalid;
309
310   if (!S_ISREG (stat_results.st_mode))
311     goto not_seekable;
312
313   /* see if it is a seekable stream */
314   result = lseek (fd, 0, SEEK_CUR);
315   if (result == -1) {
316     switch (errno) {
317       case EINVAL:
318       case EBADF:
319         goto invalid;
320
321       case ESPIPE:
322         goto not_seekable;
323     }
324   } else
325     GST_DEBUG_OBJECT (fdsink, "File descriptor %d is seekable", fd);
326
327   return TRUE;
328
329 invalid:
330   {
331     GST_ELEMENT_ERROR (fdsink, RESOURCE, WRITE, (NULL),
332         ("File descriptor %d is not valid: %s", fd, g_strerror (errno)));
333     g_set_error (error, GST_URI_ERROR, GST_URI_ERROR_BAD_REFERENCE,
334         "File descriptor %d is not valid: %s", fd, g_strerror (errno));
335     return FALSE;
336   }
337 not_seekable:
338   {
339     GST_DEBUG_OBJECT (fdsink, "File descriptor %d is a pipe", fd);
340     return TRUE;
341   }
342 }
343
344 static gboolean
345 gst_fd_sink_start (GstBaseSink * basesink)
346 {
347   GstFdSink *fdsink;
348   GstPollFD fd = GST_POLL_FD_INIT;
349
350   fdsink = GST_FD_SINK (basesink);
351   if (!gst_fd_sink_check_fd (fdsink, fdsink->fd, NULL))
352     return FALSE;
353
354   if ((fdsink->fdset = gst_poll_new (TRUE)) == NULL)
355     goto socket_pair;
356
357   fd.fd = fdsink->fd;
358   gst_poll_add_fd (fdsink->fdset, &fd);
359   gst_poll_fd_ctl_write (fdsink->fdset, &fd, TRUE);
360
361   fdsink->bytes_written = 0;
362   fdsink->current_pos = 0;
363
364   fdsink->seekable = gst_fd_sink_do_seek (fdsink, 0);
365   GST_INFO_OBJECT (fdsink, "seeking supported: %d", fdsink->seekable);
366
367   return TRUE;
368
369   /* ERRORS */
370 socket_pair:
371   {
372     GST_ELEMENT_ERROR (fdsink, RESOURCE, OPEN_READ_WRITE, (NULL),
373         GST_ERROR_SYSTEM);
374     return FALSE;
375   }
376 }
377
378 static gboolean
379 gst_fd_sink_stop (GstBaseSink * basesink)
380 {
381   GstFdSink *fdsink = GST_FD_SINK (basesink);
382
383   if (fdsink->fdset) {
384     gst_poll_free (fdsink->fdset);
385     fdsink->fdset = NULL;
386   }
387
388   return TRUE;
389 }
390
391 static gboolean
392 gst_fd_sink_unlock (GstBaseSink * basesink)
393 {
394   GstFdSink *fdsink = GST_FD_SINK (basesink);
395
396   GST_LOG_OBJECT (fdsink, "Flushing");
397   GST_OBJECT_LOCK (fdsink);
398   gst_poll_set_flushing (fdsink->fdset, TRUE);
399   GST_OBJECT_UNLOCK (fdsink);
400
401   return TRUE;
402 }
403
404 static gboolean
405 gst_fd_sink_unlock_stop (GstBaseSink * basesink)
406 {
407   GstFdSink *fdsink = GST_FD_SINK (basesink);
408
409   GST_LOG_OBJECT (fdsink, "No longer flushing");
410   GST_OBJECT_LOCK (fdsink);
411   gst_poll_set_flushing (fdsink->fdset, FALSE);
412   GST_OBJECT_UNLOCK (fdsink);
413
414   return TRUE;
415 }
416
417 static gboolean
418 gst_fd_sink_update_fd (GstFdSink * fdsink, int new_fd, GError ** error)
419 {
420   if (new_fd < 0) {
421     g_set_error (error, GST_URI_ERROR, GST_URI_ERROR_BAD_REFERENCE,
422         "File descriptor %d is not valid", new_fd);
423     return FALSE;
424   }
425
426   if (!gst_fd_sink_check_fd (fdsink, new_fd, error))
427     goto invalid;
428
429   /* assign the fd */
430   GST_OBJECT_LOCK (fdsink);
431   if (fdsink->fdset) {
432     GstPollFD fd = GST_POLL_FD_INIT;
433
434     fd.fd = fdsink->fd;
435     gst_poll_remove_fd (fdsink->fdset, &fd);
436
437     fd.fd = new_fd;
438     gst_poll_add_fd (fdsink->fdset, &fd);
439     gst_poll_fd_ctl_write (fdsink->fdset, &fd, TRUE);
440   }
441   fdsink->fd = new_fd;
442   g_free (fdsink->uri);
443   fdsink->uri = g_strdup_printf ("fd://%d", fdsink->fd);
444
445   GST_OBJECT_UNLOCK (fdsink);
446
447   return TRUE;
448
449 invalid:
450   {
451     return FALSE;
452   }
453 }
454
455 static void
456 gst_fd_sink_set_property (GObject * object, guint prop_id,
457     const GValue * value, GParamSpec * pspec)
458 {
459   GstFdSink *fdsink;
460
461   fdsink = GST_FD_SINK (object);
462
463   switch (prop_id) {
464     case ARG_FD:{
465       int fd;
466
467       fd = g_value_get_int (value);
468       gst_fd_sink_update_fd (fdsink, fd, NULL);
469       break;
470     }
471     default:
472       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
473       break;
474   }
475 }
476
477 static void
478 gst_fd_sink_get_property (GObject * object, guint prop_id, GValue * value,
479     GParamSpec * pspec)
480 {
481   GstFdSink *fdsink;
482
483   fdsink = GST_FD_SINK (object);
484
485   switch (prop_id) {
486     case ARG_FD:
487       g_value_set_int (value, fdsink->fd);
488       break;
489     default:
490       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
491       break;
492   }
493 }
494
495 static gboolean
496 gst_fd_sink_do_seek (GstFdSink * fdsink, guint64 new_offset)
497 {
498   off_t result;
499
500   result = lseek (fdsink->fd, new_offset, SEEK_SET);
501
502   if (result == -1)
503     goto seek_failed;
504
505   fdsink->current_pos = new_offset;
506
507   GST_DEBUG_OBJECT (fdsink, "File descriptor %d to seek to position "
508       "%" G_GUINT64_FORMAT, fdsink->fd, fdsink->current_pos);
509
510   return TRUE;
511
512   /* ERRORS */
513 seek_failed:
514   {
515     GST_DEBUG_OBJECT (fdsink, "File descriptor %d failed to seek to position "
516         "%" G_GUINT64_FORMAT, fdsink->fd, new_offset);
517     return FALSE;
518   }
519 }
520
521 static gboolean
522 gst_fd_sink_event (GstBaseSink * sink, GstEvent * event)
523 {
524   GstEventType type;
525   GstFdSink *fdsink;
526
527   fdsink = GST_FD_SINK (sink);
528
529   type = GST_EVENT_TYPE (event);
530
531   switch (type) {
532     case GST_EVENT_SEGMENT:
533     {
534       const GstSegment *segment;
535
536       gst_event_parse_segment (event, &segment);
537
538       if (segment->format == GST_FORMAT_BYTES) {
539         /* only try to seek and fail when we are going to a different
540          * position */
541         if (fdsink->current_pos != segment->start) {
542           /* FIXME, the seek should be performed on the pos field, start/stop are
543            * just boundaries for valid bytes offsets. We should also fill the file
544            * with zeroes if the new position extends the current EOF (sparse streams
545            * and segment accumulation). */
546           if (!gst_fd_sink_do_seek (fdsink, (guint64) segment->start))
547             goto seek_failed;
548         }
549       } else {
550         GST_DEBUG_OBJECT (fdsink,
551             "Ignored SEGMENT event of format %u (%s)", (guint) segment->format,
552             gst_format_get_name (segment->format));
553       }
554       break;
555     }
556     default:
557       break;
558   }
559
560   return GST_BASE_SINK_CLASS (parent_class)->event (sink, event);
561
562 seek_failed:
563   {
564     GST_ELEMENT_ERROR (fdsink, RESOURCE, SEEK, (NULL),
565         ("Error while seeking on file descriptor %d: %s",
566             fdsink->fd, g_strerror (errno)));
567     gst_event_unref (event);
568     return FALSE;
569   }
570
571 }
572
573 /*** GSTURIHANDLER INTERFACE *************************************************/
574
575 static GstURIType
576 gst_fd_sink_uri_get_type (GType type)
577 {
578   return GST_URI_SINK;
579 }
580
581 static const gchar *const *
582 gst_fd_sink_uri_get_protocols (GType type)
583 {
584   static const gchar *protocols[] = { "fd", NULL };
585
586   return protocols;
587 }
588
589 static gchar *
590 gst_fd_sink_uri_get_uri (GstURIHandler * handler)
591 {
592   GstFdSink *sink = GST_FD_SINK (handler);
593
594   /* FIXME: make thread-safe */
595   return g_strdup (sink->uri);
596 }
597
598 static gboolean
599 gst_fd_sink_uri_set_uri (GstURIHandler * handler, const gchar * uri,
600     GError ** error)
601 {
602   GstFdSink *sink = GST_FD_SINK (handler);
603   gint fd;
604
605   if (sscanf (uri, "fd://%d", &fd) != 1) {
606     g_set_error (error, GST_URI_ERROR, GST_URI_ERROR_BAD_URI,
607         "File descriptor URI could not be parsed");
608     return FALSE;
609   }
610
611   return gst_fd_sink_update_fd (sink, fd, error);
612 }
613
614 static void
615 gst_fd_sink_uri_handler_init (gpointer g_iface, gpointer iface_data)
616 {
617   GstURIHandlerInterface *iface = (GstURIHandlerInterface *) g_iface;
618
619   iface->get_type = gst_fd_sink_uri_get_type;
620   iface->get_protocols = gst_fd_sink_uri_get_protocols;
621   iface->get_uri = gst_fd_sink_uri_get_uri;
622   iface->set_uri = gst_fd_sink_uri_set_uri;
623 }