Don't use base_init where not absolutely necessary. For example it's not necessary...
[platform/upstream/gstreamer.git] / plugins / elements / gstfdsrc.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *                    2000 Wim Taymans <wtay@chello.be>
4  *                    2005 Philippe Khalaf <burger@speedy.org>
5  *
6  * gstfdsrc.c:
7  *
8  * This library is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Library General Public
10  * License as published by the Free Software Foundation; either
11  * version 2 of the License, or (at your option) any later version.
12  *
13  * This library is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16  * Library General Public License for more details.
17  *
18  * You should have received a copy of the GNU Library General Public
19  * License along with this library; if not, write to the
20  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
21  * Boston, MA 02111-1307, USA.
22  */
23 /**
24  * SECTION:element-fdsrc
25  * @short_description: read from a unix file descriptor
26  * @see_also: #GstFdSink
27  *
28  * Read data from a unix file descriptor.
29  */
30
31
32 #ifdef HAVE_CONFIG_H
33 #  include "config.h"
34 #endif
35 #include "gst/gst_private.h"
36
37 #include <sys/types.h>
38 #include <sys/stat.h>
39 #include <sys/socket.h>
40 #include <fcntl.h>
41 #include <stdio.h>
42 #ifdef HAVE_UNISTD_H
43 #include <unistd.h>
44 #endif
45 #ifdef _MSC_VER
46 #include <io.h>
47 #endif
48 #include <stdlib.h>
49 #include <errno.h>
50
51 #include "gstfdsrc.h"
52
53 /* the select call is also performed on the control sockets, that way
54  * we can send special commands to unblock the select call */
55 #define CONTROL_STOP            'S'     /* stop the select call */
56 #define CONTROL_SOCKETS(src)   src->control_sock
57 #define WRITE_SOCKET(src)      src->control_sock[1]
58 #define READ_SOCKET(src)       src->control_sock[0]
59
60 #define SEND_COMMAND(src, command)          \
61 G_STMT_START {                              \
62   unsigned char c; c = command;             \
63   write (WRITE_SOCKET(src), &c, 1);         \
64 } G_STMT_END
65
66 #define READ_COMMAND(src, command, res)        \
67 G_STMT_START {                                 \
68   res = read(READ_SOCKET(src), &command, 1);   \
69 } G_STMT_END
70
71 #define DEFAULT_BLOCKSIZE       4096
72
73 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
74     GST_PAD_SRC,
75     GST_PAD_ALWAYS,
76     GST_STATIC_CAPS_ANY);
77
78 GST_DEBUG_CATEGORY_STATIC (gst_fd_src_debug);
79 #define GST_CAT_DEFAULT gst_fd_src_debug
80
81 enum
82 {
83   PROP_0,
84   PROP_FD,
85 };
86
87 static void gst_fd_src_uri_handler_init (gpointer g_iface, gpointer iface_data);
88
89 static void
90 _do_init (GType fd_src_type)
91 {
92   static const GInterfaceInfo urihandler_info = {
93     gst_fd_src_uri_handler_init,
94     NULL,
95     NULL
96   };
97
98   g_type_add_interface_static (fd_src_type, GST_TYPE_URI_HANDLER,
99       &urihandler_info);
100
101   GST_DEBUG_CATEGORY_INIT (gst_fd_src_debug, "fdsrc", 0, "fdsrc element");
102 }
103
104 GST_BOILERPLATE_FULL (GstFdSrc, gst_fd_src, GstPushSrc, GST_TYPE_PUSH_SRC,
105     _do_init);
106
107 static void gst_fd_src_set_property (GObject * object, guint prop_id,
108     const GValue * value, GParamSpec * pspec);
109 static void gst_fd_src_get_property (GObject * object, guint prop_id,
110     GValue * value, GParamSpec * pspec);
111 static void gst_fd_src_dispose (GObject * obj);
112
113 static gboolean gst_fd_src_start (GstBaseSrc * bsrc);
114 static gboolean gst_fd_src_stop (GstBaseSrc * bsrc);
115 static gboolean gst_fd_src_unlock (GstBaseSrc * bsrc);
116 static gboolean gst_fd_src_unlock_stop (GstBaseSrc * bsrc);
117 static gboolean gst_fd_src_is_seekable (GstBaseSrc * bsrc);
118 static gboolean gst_fd_src_get_size (GstBaseSrc * src, guint64 * size);
119 static gboolean gst_fd_src_do_seek (GstBaseSrc * src, GstSegment * segment);
120
121 static GstFlowReturn gst_fd_src_create (GstPushSrc * psrc, GstBuffer ** outbuf);
122
123 static void
124 gst_fd_src_base_init (gpointer g_class)
125 {
126 }
127
128 static void
129 gst_fd_src_class_init (GstFdSrcClass * klass)
130 {
131   GObjectClass *gobject_class;
132   GstBaseSrcClass *gstbasesrc_class;
133   GstElementClass *gstelement_class;
134   GstPushSrcClass *gstpush_src_class;
135
136   gobject_class = G_OBJECT_CLASS (klass);
137   gstelement_class = GST_ELEMENT_CLASS (klass);
138   gstbasesrc_class = GST_BASE_SRC_CLASS (klass);
139   gstpush_src_class = GST_PUSH_SRC_CLASS (klass);
140
141   parent_class = g_type_class_peek_parent (klass);
142
143   gobject_class->set_property = gst_fd_src_set_property;
144   gobject_class->get_property = gst_fd_src_get_property;
145   gobject_class->dispose = gst_fd_src_dispose;
146
147   g_object_class_install_property (gobject_class, PROP_FD,
148       g_param_spec_int ("fd", "fd", "An open file descriptor to read from",
149           0, G_MAXINT, 0, G_PARAM_READWRITE));
150
151   gst_element_class_set_details_simple (gstelement_class,
152       "Filedescriptor Source",
153       "Source/File",
154       "Read from a file descriptor", "Erik Walthinsen <omega@cse.ogi.edu>");
155   gst_element_class_add_pad_template (gstelement_class,
156       gst_static_pad_template_get (&srctemplate));
157
158   gstbasesrc_class->start = GST_DEBUG_FUNCPTR (gst_fd_src_start);
159   gstbasesrc_class->stop = GST_DEBUG_FUNCPTR (gst_fd_src_stop);
160   gstbasesrc_class->unlock = GST_DEBUG_FUNCPTR (gst_fd_src_unlock);
161   gstbasesrc_class->unlock_stop = GST_DEBUG_FUNCPTR (gst_fd_src_unlock_stop);
162   gstbasesrc_class->is_seekable = GST_DEBUG_FUNCPTR (gst_fd_src_is_seekable);
163   gstbasesrc_class->get_size = GST_DEBUG_FUNCPTR (gst_fd_src_get_size);
164   gstbasesrc_class->do_seek = GST_DEBUG_FUNCPTR (gst_fd_src_do_seek);
165
166   gstpush_src_class->create = GST_DEBUG_FUNCPTR (gst_fd_src_create);
167 }
168
169 static void
170 gst_fd_src_init (GstFdSrc * fdsrc, GstFdSrcClass * klass)
171 {
172   fdsrc->fd = -1;
173   fdsrc->new_fd = 0;
174   fdsrc->seekable_fd = FALSE;
175   fdsrc->uri = g_strdup_printf ("fd://0");
176   fdsrc->curoffset = 0;
177 }
178
179 static void
180 gst_fd_src_dispose (GObject * obj)
181 {
182   GstFdSrc *src = GST_FD_SRC (obj);
183
184   g_free (src->uri);
185   src->uri = NULL;
186
187   G_OBJECT_CLASS (parent_class)->dispose (obj);
188 }
189
190 static void
191 gst_fd_src_update_fd (GstFdSrc * src)
192 {
193   struct stat stat_results;
194
195   if (src->fd != src->new_fd) {
196     GST_INFO_OBJECT (src, "Updating to fd %d", src->new_fd);
197
198     src->fd = src->new_fd;
199
200     g_free (src->uri);
201     src->uri = g_strdup_printf ("fd://%d", src->fd);
202
203     if (fstat (src->fd, &stat_results) < 0)
204       goto not_seekable;
205
206     if (!S_ISREG (stat_results.st_mode))
207       goto not_seekable;
208
209     /* Try a seek of 0 bytes offset to check for seekability */
210     if (lseek (src->fd, 0, SEEK_CUR) < 0)
211       goto not_seekable;
212
213     GST_INFO_OBJECT (src, "marking fd %d as seekable", src->fd);
214     src->seekable_fd = TRUE;
215   }
216   return;
217
218 not_seekable:
219   {
220     GST_INFO_OBJECT (src, "marking fd %d as NOT seekable", src->fd);
221     src->seekable_fd = FALSE;
222   }
223 }
224
225 static gboolean
226 gst_fd_src_start (GstBaseSrc * bsrc)
227 {
228   GstFdSrc *src = GST_FD_SRC (bsrc);
229   gint control_sock[2];
230
231   src->curoffset = 0;
232
233   gst_fd_src_update_fd (src);
234
235   if (socketpair (PF_UNIX, SOCK_STREAM, 0, control_sock) < 0)
236     goto socket_pair;
237
238   READ_SOCKET (src) = control_sock[0];
239   WRITE_SOCKET (src) = control_sock[1];
240
241   fcntl (READ_SOCKET (src), F_SETFL, O_NONBLOCK);
242   fcntl (WRITE_SOCKET (src), F_SETFL, O_NONBLOCK);
243
244   return TRUE;
245
246   /* ERRORS */
247 socket_pair:
248   {
249     GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ_WRITE, (NULL),
250         GST_ERROR_SYSTEM);
251     return FALSE;
252   }
253 }
254
255 static gboolean
256 gst_fd_src_stop (GstBaseSrc * bsrc)
257 {
258   GstFdSrc *src = GST_FD_SRC (bsrc);
259
260   close (READ_SOCKET (src));
261   close (WRITE_SOCKET (src));
262
263   return TRUE;
264 }
265
266 static gboolean
267 gst_fd_src_unlock (GstBaseSrc * bsrc)
268 {
269   GstFdSrc *src = GST_FD_SRC (bsrc);
270
271   GST_LOG_OBJECT (src, "sending unlock command");
272   SEND_COMMAND (src, CONTROL_STOP);
273
274   return TRUE;
275 }
276
277 static gboolean
278 gst_fd_src_unlock_stop (GstBaseSrc * bsrc)
279 {
280   GstFdSrc *src = GST_FD_SRC (bsrc);
281
282   GST_LOG_OBJECT (src, "clearing unlock command queue");
283
284   /* read all stop commands */
285   while (TRUE) {
286     gchar command;
287     int res;
288
289     GST_LOG_OBJECT (src, "reading command");
290
291     READ_COMMAND (src, command, res);
292     if (res < 0) {
293       GST_LOG_OBJECT (src, "no more commands");
294       /* no more commands */
295       break;
296     }
297   }
298
299   return TRUE;
300 }
301
302 static void
303 gst_fd_src_set_property (GObject * object, guint prop_id, const GValue * value,
304     GParamSpec * pspec)
305 {
306   GstFdSrc *src = GST_FD_SRC (object);
307
308   switch (prop_id) {
309     case PROP_FD:
310       src->new_fd = g_value_get_int (value);
311
312       /* If state is ready or below, update the current fd immediately
313        * so it is reflected in get_properties and uri */
314       GST_OBJECT_LOCK (object);
315       if (GST_STATE (GST_ELEMENT (src)) <= GST_STATE_READY) {
316         GST_DEBUG_OBJECT (src, "state ready or lower, updating to use new fd");
317         gst_fd_src_update_fd (src);
318       } else {
319         GST_DEBUG_OBJECT (src, "state above ready, not updating to new fd yet");
320       }
321       GST_OBJECT_UNLOCK (object);
322       break;
323     default:
324       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
325       break;
326   }
327 }
328
329 static void
330 gst_fd_src_get_property (GObject * object, guint prop_id, GValue * value,
331     GParamSpec * pspec)
332 {
333   GstFdSrc *src = GST_FD_SRC (object);
334
335   switch (prop_id) {
336     case PROP_FD:
337       g_value_set_int (value, src->fd);
338       break;
339     default:
340       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
341       break;
342   }
343 }
344
345 static GstFlowReturn
346 gst_fd_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
347 {
348   GstFdSrc *src;
349   GstBuffer *buf;
350   gssize readbytes;
351   guint blocksize;
352
353 #ifndef HAVE_WIN32
354   fd_set readfds;
355   gint retval;
356 #endif
357
358   src = GST_FD_SRC (psrc);
359
360 #ifndef HAVE_WIN32
361   FD_ZERO (&readfds);
362   FD_SET (src->fd, &readfds);
363   FD_SET (READ_SOCKET (src), &readfds);
364
365   do {
366     retval = select (FD_SETSIZE, &readfds, NULL, NULL, NULL);
367   } while ((retval == -1 && errno == EINTR));
368
369   if (retval == -1)
370     goto select_error;
371
372   if (FD_ISSET (READ_SOCKET (src), &readfds))
373     goto stopped;
374 #endif
375
376   blocksize = GST_BASE_SRC (src)->blocksize;
377
378   /* create the buffer */
379   buf = gst_buffer_new_and_alloc (blocksize);
380
381   do {
382     readbytes = read (src->fd, GST_BUFFER_DATA (buf), blocksize);
383     GST_LOG_OBJECT (src, "read %" G_GSSIZE_FORMAT, readbytes);
384   } while (readbytes == -1 && errno == EINTR);  /* retry if interrupted */
385
386   if (readbytes < 0)
387     goto read_error;
388
389   if (readbytes == 0)
390     goto eos;
391
392   GST_BUFFER_OFFSET (buf) = src->curoffset;
393   GST_BUFFER_SIZE (buf) = readbytes;
394   GST_BUFFER_TIMESTAMP (buf) = GST_CLOCK_TIME_NONE;
395   src->curoffset += readbytes;
396
397   GST_LOG_OBJECT (psrc, "Read buffer of size %" G_GSSIZE_FORMAT, readbytes);
398
399   /* we're done, return the buffer */
400   *outbuf = buf;
401
402   return GST_FLOW_OK;
403
404   /* ERRORS */
405 #ifndef HAVE_WIN32
406 select_error:
407   {
408     GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
409         ("select on file descriptor: %s.", g_strerror (errno)));
410     GST_DEBUG_OBJECT (psrc, "Error during select");
411     return GST_FLOW_ERROR;
412   }
413 stopped:
414   {
415     GST_DEBUG_OBJECT (psrc, "Select stopped");
416     return GST_FLOW_WRONG_STATE;
417   }
418 #endif
419 eos:
420   {
421     GST_DEBUG_OBJECT (psrc, "Read 0 bytes. EOS.");
422     gst_buffer_unref (buf);
423     return GST_FLOW_UNEXPECTED;
424   }
425 read_error:
426   {
427     GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
428         ("read on file descriptor: %s.", g_strerror (errno)));
429     GST_DEBUG_OBJECT (psrc, "Error reading from fd");
430     gst_buffer_unref (buf);
431     return GST_FLOW_ERROR;
432   }
433 }
434
435 gboolean
436 gst_fd_src_is_seekable (GstBaseSrc * bsrc)
437 {
438   GstFdSrc *src = GST_FD_SRC (bsrc);
439
440   return src->seekable_fd;
441 }
442
443 gboolean
444 gst_fd_src_get_size (GstBaseSrc * bsrc, guint64 * size)
445 {
446   GstFdSrc *src = GST_FD_SRC (bsrc);
447   struct stat stat_results;
448
449   if (!src->seekable_fd) {
450     /* If it isn't seekable, we won't know the length (but fstat will still
451      * succeed, and wrongly say our length is zero. */
452     return FALSE;
453   }
454
455   if (fstat (src->fd, &stat_results) < 0)
456     goto could_not_stat;
457
458   *size = stat_results.st_size;
459
460   return TRUE;
461
462   /* ERROR */
463 could_not_stat:
464   {
465     return FALSE;
466   }
467 }
468
469 gboolean
470 gst_fd_src_do_seek (GstBaseSrc * bsrc, GstSegment * segment)
471 {
472   gint res;
473   gint64 offset;
474   GstFdSrc *src = GST_FD_SRC (bsrc);
475
476   offset = segment->start;
477
478   /* No need to seek to the current position */
479   if (offset == src->curoffset)
480     return TRUE;
481
482   res = lseek (src->fd, offset, SEEK_SET);
483   if (G_UNLIKELY (res < 0 || res != offset))
484     goto seek_failed;
485
486   segment->last_stop = segment->start;
487   segment->time = segment->start;
488
489   return TRUE;
490
491 seek_failed:
492   GST_DEBUG_OBJECT (src, "lseek returned %" G_GINT64_FORMAT, offset);
493   return FALSE;
494 }
495
496 /*** GSTURIHANDLER INTERFACE *************************************************/
497
498 static GstURIType
499 gst_fd_src_uri_get_type (void)
500 {
501   return GST_URI_SRC;
502 }
503 static gchar **
504 gst_fd_src_uri_get_protocols (void)
505 {
506   static gchar *protocols[] = { "fd", NULL };
507
508   return protocols;
509 }
510 static const gchar *
511 gst_fd_src_uri_get_uri (GstURIHandler * handler)
512 {
513   GstFdSrc *src = GST_FD_SRC (handler);
514
515   return src->uri;
516 }
517
518 static gboolean
519 gst_fd_src_uri_set_uri (GstURIHandler * handler, const gchar * uri)
520 {
521   gchar *protocol;
522   GstFdSrc *src = GST_FD_SRC (handler);
523   gint fd;
524
525   protocol = gst_uri_get_protocol (uri);
526   if (strcmp (protocol, "fd") != 0) {
527     g_free (protocol);
528     return FALSE;
529   }
530   g_free (protocol);
531
532   if (sscanf (uri, "fd://%d", &fd) != 1)
533     return FALSE;
534
535   src->new_fd = fd;
536
537   GST_OBJECT_LOCK (src);
538   if (GST_STATE (GST_ELEMENT (src)) <= GST_STATE_READY) {
539     gst_fd_src_update_fd (src);
540   }
541   GST_OBJECT_UNLOCK (src);
542
543   return TRUE;
544 }
545
546 static void
547 gst_fd_src_uri_handler_init (gpointer g_iface, gpointer iface_data)
548 {
549   GstURIHandlerInterface *iface = (GstURIHandlerInterface *) g_iface;
550
551   iface->get_type = gst_fd_src_uri_get_type;
552   iface->get_protocols = gst_fd_src_uri_get_protocols;
553   iface->get_uri = gst_fd_src_uri_get_uri;
554   iface->set_uri = gst_fd_src_uri_set_uri;
555 }