f2c82e3c019acc22ae555fda7928a0dc210e5708
[platform/upstream/gstreamer.git] / plugins / elements / gstfdsrc.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *                    2000 Wim Taymans <wtay@chello.be>
4  *                    2005 Philippe Khalaf <burger@speedy.org>
5  *
6  * gstfdsrc.c:
7  *
8  * This library is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Library General Public
10  * License as published by the Free Software Foundation; either
11  * version 2 of the License, or (at your option) any later version.
12  *
13  * This library is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16  * Library General Public License for more details.
17  *
18  * You should have received a copy of the GNU Library General Public
19  * License along with this library; if not, write to the
20  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
21  * Boston, MA 02111-1307, USA.
22  */
23
24 #ifdef HAVE_CONFIG_H
25 #  include "config.h"
26 #endif
27 #include "gst/gst_private.h"
28
29 #include <sys/types.h>
30 #include <sys/stat.h>
31 #include <sys/socket.h>
32 #include <fcntl.h>
33 #include <stdio.h>
34 #ifdef HAVE_UNISTD_H
35 #include <unistd.h>
36 #endif
37 #ifdef _MSC_VER
38 #include <io.h>
39 #endif
40 #include <stdlib.h>
41 #include <errno.h>
42
43 #include "gstfdsrc.h"
44
45 /* the select call is also performed on the control sockets, that way
46  * we can send special commands to unblock the select call */
47 #define CONTROL_STOP            'S'     /* stop the select call */
48 #define CONTROL_SOCKETS(src)   src->control_sock
49 #define WRITE_SOCKET(src)      src->control_sock[1]
50 #define READ_SOCKET(src)       src->control_sock[0]
51
52 #define SEND_COMMAND(src, command)          \
53 G_STMT_START {                              \
54   unsigned char c; c = command;             \
55   write (WRITE_SOCKET(src), &c, 1);         \
56 } G_STMT_END
57
58 #define READ_COMMAND(src, command, res)        \
59 G_STMT_START {                                 \
60   res = read(READ_SOCKET(src), &command, 1);   \
61 } G_STMT_END
62
63 #define DEFAULT_BLOCKSIZE       4096
64
65 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
66     GST_PAD_SRC,
67     GST_PAD_ALWAYS,
68     GST_STATIC_CAPS_ANY);
69
70 GST_DEBUG_CATEGORY_STATIC (gst_fd_src_debug);
71 #define GST_CAT_DEFAULT gst_fd_src_debug
72
73 static GstElementDetails gst_fd_src_details =
74 GST_ELEMENT_DETAILS ("Disk Source",
75     "Source/File",
76     "Synchronous read from a file",
77     "Erik Walthinsen <omega@cse.ogi.edu>");
78
79 enum
80 {
81   PROP_0,
82   PROP_FD,
83 };
84
85 static void gst_fd_src_uri_handler_init (gpointer g_iface, gpointer iface_data);
86
87 static void
88 _do_init (GType fd_src_type)
89 {
90   static const GInterfaceInfo urihandler_info = {
91     gst_fd_src_uri_handler_init,
92     NULL,
93     NULL
94   };
95
96   g_type_add_interface_static (fd_src_type, GST_TYPE_URI_HANDLER,
97       &urihandler_info);
98
99   GST_DEBUG_CATEGORY_INIT (gst_fd_src_debug, "fdsrc", 0, "fdsrc element");
100 }
101
102 GST_BOILERPLATE_FULL (GstFdSrc, gst_fd_src, GstElement, GST_TYPE_PUSH_SRC,
103     _do_init);
104
105 static void gst_fd_src_set_property (GObject * object, guint prop_id,
106     const GValue * value, GParamSpec * pspec);
107 static void gst_fd_src_get_property (GObject * object, guint prop_id,
108     GValue * value, GParamSpec * pspec);
109 static void gst_fd_src_dispose (GObject * obj);
110
111 static gboolean gst_fd_src_start (GstBaseSrc * bsrc);
112 static gboolean gst_fd_src_stop (GstBaseSrc * bsrc);
113 static gboolean gst_fd_src_unlock (GstBaseSrc * bsrc);
114 static gboolean gst_fd_src_is_seekable (GstBaseSrc * bsrc);
115 static gboolean gst_fd_src_get_size (GstBaseSrc * src, guint64 * size);
116
117 static GstFlowReturn gst_fd_src_create (GstPushSrc * psrc, GstBuffer ** outbuf);
118
119 static void
120 gst_fd_src_base_init (gpointer g_class)
121 {
122   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (g_class);
123
124   gst_element_class_add_pad_template (gstelement_class,
125       gst_static_pad_template_get (&srctemplate));
126   gst_element_class_set_details (gstelement_class, &gst_fd_src_details);
127 }
128
129 static void
130 gst_fd_src_class_init (GstFdSrcClass * klass)
131 {
132   GObjectClass *gobject_class;
133   GstBaseSrcClass *gstbasesrc_class;
134   GstElementClass *gstelement_class;
135   GstPushSrcClass *gstpush_src_class;
136
137   gobject_class = G_OBJECT_CLASS (klass);
138   gstelement_class = GST_ELEMENT_CLASS (klass);
139   gstbasesrc_class = (GstBaseSrcClass *) klass;
140   gstpush_src_class = (GstPushSrcClass *) klass;
141
142   parent_class = g_type_class_ref (GST_TYPE_PUSH_SRC);
143
144   gobject_class->set_property = gst_fd_src_set_property;
145   gobject_class->get_property = gst_fd_src_get_property;
146   gobject_class->dispose = gst_fd_src_dispose;
147
148   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_FD,
149       g_param_spec_int ("fd", "fd", "An open file descriptor to read from",
150           0, G_MAXINT, 0, G_PARAM_READWRITE));
151
152   gstbasesrc_class->start = GST_DEBUG_FUNCPTR (gst_fd_src_start);
153   gstbasesrc_class->stop = GST_DEBUG_FUNCPTR (gst_fd_src_stop);
154   gstbasesrc_class->unlock = GST_DEBUG_FUNCPTR (gst_fd_src_unlock);
155   gstbasesrc_class->is_seekable = GST_DEBUG_FUNCPTR (gst_fd_src_is_seekable);
156   gstbasesrc_class->get_size = GST_DEBUG_FUNCPTR (gst_fd_src_get_size);
157
158   gstpush_src_class->create = GST_DEBUG_FUNCPTR (gst_fd_src_create);
159 }
160
161 static void
162 gst_fd_src_init (GstFdSrc * fdsrc, GstFdSrcClass * klass)
163 {
164   fdsrc->fd = 0;
165   fdsrc->new_fd = 0;
166   fdsrc->seekable_fd = FALSE;
167   fdsrc->uri = g_strdup_printf ("fd://%d", fdsrc->fd);
168   fdsrc->curoffset = 0;
169 }
170
171 static void
172 gst_fd_src_dispose (GObject * obj)
173 {
174   GstFdSrc *src = GST_FD_SRC (obj);
175
176   g_free (src->uri);
177   src->uri = NULL;
178
179   G_OBJECT_CLASS (parent_class)->dispose (obj);
180 }
181
182 static void
183 gst_fd_src_update_fd (GstFdSrc * src)
184 {
185   struct stat stat_results;
186
187   src->fd = src->new_fd;
188   g_free (src->uri);
189   src->uri = g_strdup_printf ("fd://%d", src->fd);
190
191   if (fstat (src->fd, &stat_results) < 0)
192     goto not_seekable;
193
194   if (!S_ISREG (stat_results.st_mode))
195     goto not_seekable;
196
197   /* Try a seek of 0 bytes offset to check for seekability */
198   if (lseek (src->fd, SEEK_CUR, 0) < 0)
199     goto not_seekable;
200
201   src->seekable_fd = TRUE;
202   return;
203
204 not_seekable:
205   src->seekable_fd = FALSE;
206 }
207
208 static gboolean
209 gst_fd_src_start (GstBaseSrc * bsrc)
210 {
211   GstFdSrc *src = GST_FD_SRC (bsrc);
212   gint control_sock[2];
213
214   src->curoffset = 0;
215
216   gst_fd_src_update_fd (src);
217
218   if (socketpair (PF_UNIX, SOCK_STREAM, 0, control_sock) < 0)
219     goto socket_pair;
220
221   READ_SOCKET (src) = control_sock[0];
222   WRITE_SOCKET (src) = control_sock[1];
223
224   fcntl (READ_SOCKET (src), F_SETFL, O_NONBLOCK);
225   fcntl (WRITE_SOCKET (src), F_SETFL, O_NONBLOCK);
226
227   return TRUE;
228
229   /* ERRORS */
230 socket_pair:
231   {
232     GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ_WRITE, (NULL),
233         GST_ERROR_SYSTEM);
234     return FALSE;
235   }
236 }
237
238 static gboolean
239 gst_fd_src_stop (GstBaseSrc * bsrc)
240 {
241   GstFdSrc *src = GST_FD_SRC (bsrc);
242
243   close (READ_SOCKET (src));
244   close (WRITE_SOCKET (src));
245
246   return TRUE;
247 }
248
249 static gboolean
250 gst_fd_src_unlock (GstBaseSrc * bsrc)
251 {
252   GstFdSrc *src = GST_FD_SRC (bsrc);
253
254   SEND_COMMAND (src, CONTROL_STOP);
255
256   return TRUE;
257 }
258
259 static void
260 gst_fd_src_set_property (GObject * object, guint prop_id, const GValue * value,
261     GParamSpec * pspec)
262 {
263   GstFdSrc *src = GST_FD_SRC (object);
264
265   switch (prop_id) {
266     case PROP_FD:
267       src->new_fd = g_value_get_int (value);
268
269       /* If state is ready or below, update the current fd immediately
270        * so it is reflected in get_properties and uri */
271       GST_OBJECT_LOCK (object);
272       if (GST_STATE (GST_ELEMENT (src)) <= GST_STATE_READY) {
273         gst_fd_src_update_fd (src);
274       }
275       GST_OBJECT_UNLOCK (object);
276       break;
277     default:
278       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
279       break;
280   }
281 }
282
283 static void
284 gst_fd_src_get_property (GObject * object, guint prop_id, GValue * value,
285     GParamSpec * pspec)
286 {
287   GstFdSrc *src = GST_FD_SRC (object);
288
289   switch (prop_id) {
290     case PROP_FD:
291       g_value_set_int (value, src->fd);
292       break;
293     default:
294       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
295       break;
296   }
297 }
298
299 static GstFlowReturn
300 gst_fd_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
301 {
302   GstFdSrc *src;
303   GstBuffer *buf;
304   glong readbytes;
305   guint blocksize;
306
307 #ifndef HAVE_WIN32
308   fd_set readfds;
309   gint retval;
310 #endif
311
312   src = GST_FD_SRC (psrc);
313
314 #ifndef HAVE_WIN32
315   FD_ZERO (&readfds);
316   FD_SET (src->fd, &readfds);
317   FD_SET (READ_SOCKET (src), &readfds);
318
319   do {
320     retval = select (FD_SETSIZE, &readfds, NULL, NULL, NULL);
321   } while ((retval == -1 && errno == EINTR));
322
323   if (retval == -1)
324     goto select_error;
325
326   if (FD_ISSET (READ_SOCKET (src), &readfds)) {
327     /* read all stop commands */
328     while (TRUE) {
329       gchar command;
330       int res;
331
332       READ_COMMAND (src, command, res);
333       if (res < 0) {
334         GST_LOG_OBJECT (src, "no more commands");
335         /* no more commands */
336         break;
337       }
338     }
339     goto stopped;
340   }
341 #endif
342
343   blocksize = GST_BASE_SRC (src)->blocksize;
344
345   /* create the buffer */
346   buf = gst_buffer_new_and_alloc (blocksize);
347
348   do {
349     readbytes = read (src->fd, GST_BUFFER_DATA (buf), blocksize);
350   } while (readbytes == -1 && errno == EINTR);  /* retry if interrupted */
351
352   if (readbytes < 0)
353     goto read_error;
354
355   if (readbytes == 0)
356     goto eos;
357
358   GST_BUFFER_OFFSET (buf) = src->curoffset;
359   GST_BUFFER_SIZE (buf) = readbytes;
360   GST_BUFFER_TIMESTAMP (buf) = GST_CLOCK_TIME_NONE;
361   src->curoffset += readbytes;
362
363   GST_DEBUG_OBJECT (psrc, "Read buffer of size %u.", readbytes);
364
365   /* we're done, return the buffer */
366   *outbuf = buf;
367
368   return GST_FLOW_OK;
369
370   /* ERRORS */
371 #ifndef HAVE_WIN32
372 select_error:
373   {
374     GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
375         ("select on file descriptor: %s.", g_strerror (errno)));
376     GST_DEBUG_OBJECT (psrc, "Error during select");
377     return GST_FLOW_ERROR;
378   }
379 stopped:
380   {
381     GST_DEBUG_OBJECT (psrc, "Select stopped");
382     return GST_FLOW_WRONG_STATE;
383   }
384 #endif
385 eos:
386   {
387     GST_DEBUG_OBJECT (psrc, "Read 0 bytes. EOS.");
388     gst_buffer_unref (buf);
389     return GST_FLOW_UNEXPECTED;
390   }
391 read_error:
392   {
393     GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
394         ("read on file descriptor: %s.", g_strerror (errno)));
395     GST_DEBUG_OBJECT (psrc, "Error reading from fd");
396     gst_buffer_unref (buf);
397     return GST_FLOW_ERROR;
398   }
399 }
400
401 gboolean
402 gst_fd_src_is_seekable (GstBaseSrc * bsrc)
403 {
404   GstFdSrc *src = GST_FD_SRC (bsrc);
405
406   return src->seekable_fd;
407 }
408
409 gboolean
410 gst_fd_src_get_size (GstBaseSrc * bsrc, guint64 * size)
411 {
412   GstFdSrc *src = GST_FD_SRC (bsrc);
413   struct stat stat_results;
414
415   if (!src->seekable_fd) {
416     /* If it isn't seekable, we won't know the length (but fstat will still
417      * succeed, and wrongly say our length is zero. */
418     return FALSE;
419   }
420
421   if (fstat (src->fd, &stat_results) < 0)
422     goto could_not_stat;
423
424   *size = stat_results.st_size;
425
426   return TRUE;
427
428   /* ERROR */
429 could_not_stat:
430   {
431     return FALSE;
432   }
433
434 }
435
436 /*** GSTURIHANDLER INTERFACE *************************************************/
437
438 static guint
439 gst_fd_src_uri_get_type (void)
440 {
441   return GST_URI_SRC;
442 }
443 static gchar **
444 gst_fd_src_uri_get_protocols (void)
445 {
446   static gchar *protocols[] = { "fd", NULL };
447
448   return protocols;
449 }
450 static const gchar *
451 gst_fd_src_uri_get_uri (GstURIHandler * handler)
452 {
453   GstFdSrc *src = GST_FD_SRC (handler);
454
455   return src->uri;
456 }
457
458 static gboolean
459 gst_fd_src_uri_set_uri (GstURIHandler * handler, const gchar * uri)
460 {
461   gchar *protocol;
462   GstFdSrc *src = GST_FD_SRC (handler);
463   gint fd;
464
465   protocol = gst_uri_get_protocol (uri);
466   if (strcmp (protocol, "fd") != 0) {
467     g_free (protocol);
468     return FALSE;
469   }
470   g_free (protocol);
471
472   if (sscanf (uri, "fd://%d", &fd) != 1)
473     return FALSE;
474
475   src->new_fd = fd;
476
477   GST_OBJECT_LOCK (src);
478   if (GST_STATE (GST_ELEMENT (src)) <= GST_STATE_READY) {
479     gst_fd_src_update_fd (src);
480   }
481   GST_OBJECT_UNLOCK (src);
482
483   return TRUE;
484 }
485
486 static void
487 gst_fd_src_uri_handler_init (gpointer g_iface, gpointer iface_data)
488 {
489   GstURIHandlerInterface *iface = (GstURIHandlerInterface *) g_iface;
490
491   iface->get_type = gst_fd_src_uri_get_type;
492   iface->get_protocols = gst_fd_src_uri_get_protocols;
493   iface->get_uri = gst_fd_src_uri_get_uri;
494   iface->set_uri = gst_fd_src_uri_set_uri;
495 }