libs/gst/base/: Add ::unlock_stop to basesrc and basesink. This allows an opportunity...
[platform/upstream/gstreamer.git] / plugins / elements / gstfdsrc.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *                    2000 Wim Taymans <wtay@chello.be>
4  *                    2005 Philippe Khalaf <burger@speedy.org>
5  *
6  * gstfdsrc.c:
7  *
8  * This library is free software; you can redistribute it and/or
9  * modify it under the terms of the GNU Library General Public
10  * License as published by the Free Software Foundation; either
11  * version 2 of the License, or (at your option) any later version.
12  *
13  * This library is distributed in the hope that it will be useful,
14  * but WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16  * Library General Public License for more details.
17  *
18  * You should have received a copy of the GNU Library General Public
19  * License along with this library; if not, write to the
20  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
21  * Boston, MA 02111-1307, USA.
22  */
23 /**
24  * SECTION:element-fdsrc
25  * @short_description: read from a unix file descriptor
26  * @see_also: #GstFdSink
27  *
28  * Read data from a unix file descriptor.
29  */
30
31
32 #ifdef HAVE_CONFIG_H
33 #  include "config.h"
34 #endif
35 #include "gst/gst_private.h"
36
37 #include <sys/types.h>
38 #include <sys/stat.h>
39 #include <sys/socket.h>
40 #include <fcntl.h>
41 #include <stdio.h>
42 #ifdef HAVE_UNISTD_H
43 #include <unistd.h>
44 #endif
45 #ifdef _MSC_VER
46 #include <io.h>
47 #endif
48 #include <stdlib.h>
49 #include <errno.h>
50
51 #include "gstfdsrc.h"
52
53 /* the select call is also performed on the control sockets, that way
54  * we can send special commands to unblock the select call */
55 #define CONTROL_STOP            'S'     /* stop the select call */
56 #define CONTROL_SOCKETS(src)   src->control_sock
57 #define WRITE_SOCKET(src)      src->control_sock[1]
58 #define READ_SOCKET(src)       src->control_sock[0]
59
60 #define SEND_COMMAND(src, command)          \
61 G_STMT_START {                              \
62   unsigned char c; c = command;             \
63   write (WRITE_SOCKET(src), &c, 1);         \
64 } G_STMT_END
65
66 #define READ_COMMAND(src, command, res)        \
67 G_STMT_START {                                 \
68   res = read(READ_SOCKET(src), &command, 1);   \
69 } G_STMT_END
70
71 #define DEFAULT_BLOCKSIZE       4096
72
73 static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
74     GST_PAD_SRC,
75     GST_PAD_ALWAYS,
76     GST_STATIC_CAPS_ANY);
77
78 GST_DEBUG_CATEGORY_STATIC (gst_fd_src_debug);
79 #define GST_CAT_DEFAULT gst_fd_src_debug
80
81 static const GstElementDetails gst_fd_src_details =
82 GST_ELEMENT_DETAILS ("Disk Source",
83     "Source/File",
84     "Synchronous read from a file",
85     "Erik Walthinsen <omega@cse.ogi.edu>");
86
87 enum
88 {
89   PROP_0,
90   PROP_FD,
91 };
92
93 static void gst_fd_src_uri_handler_init (gpointer g_iface, gpointer iface_data);
94
95 static void
96 _do_init (GType fd_src_type)
97 {
98   static const GInterfaceInfo urihandler_info = {
99     gst_fd_src_uri_handler_init,
100     NULL,
101     NULL
102   };
103
104   g_type_add_interface_static (fd_src_type, GST_TYPE_URI_HANDLER,
105       &urihandler_info);
106
107   GST_DEBUG_CATEGORY_INIT (gst_fd_src_debug, "fdsrc", 0, "fdsrc element");
108 }
109
110 GST_BOILERPLATE_FULL (GstFdSrc, gst_fd_src, GstElement, GST_TYPE_PUSH_SRC,
111     _do_init);
112
113 static void gst_fd_src_set_property (GObject * object, guint prop_id,
114     const GValue * value, GParamSpec * pspec);
115 static void gst_fd_src_get_property (GObject * object, guint prop_id,
116     GValue * value, GParamSpec * pspec);
117 static void gst_fd_src_dispose (GObject * obj);
118
119 static gboolean gst_fd_src_start (GstBaseSrc * bsrc);
120 static gboolean gst_fd_src_stop (GstBaseSrc * bsrc);
121 static gboolean gst_fd_src_unlock (GstBaseSrc * bsrc);
122 static gboolean gst_fd_src_unlock_stop (GstBaseSrc * bsrc);
123 static gboolean gst_fd_src_is_seekable (GstBaseSrc * bsrc);
124 static gboolean gst_fd_src_get_size (GstBaseSrc * src, guint64 * size);
125 static gboolean gst_fd_src_do_seek (GstBaseSrc * src, GstSegment * segment);
126
127 static GstFlowReturn gst_fd_src_create (GstPushSrc * psrc, GstBuffer ** outbuf);
128
129 static void
130 gst_fd_src_base_init (gpointer g_class)
131 {
132   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (g_class);
133
134   gst_element_class_add_pad_template (gstelement_class,
135       gst_static_pad_template_get (&srctemplate));
136   gst_element_class_set_details (gstelement_class, &gst_fd_src_details);
137 }
138
139 static void
140 gst_fd_src_class_init (GstFdSrcClass * klass)
141 {
142   GObjectClass *gobject_class;
143   GstBaseSrcClass *gstbasesrc_class;
144   GstElementClass *gstelement_class;
145   GstPushSrcClass *gstpush_src_class;
146
147   gobject_class = G_OBJECT_CLASS (klass);
148   gstelement_class = GST_ELEMENT_CLASS (klass);
149   gstbasesrc_class = GST_BASE_SRC_CLASS (klass);
150   gstpush_src_class = GST_PUSH_SRC_CLASS (klass);
151
152   parent_class = g_type_class_peek_parent (klass);
153
154   gobject_class->set_property = gst_fd_src_set_property;
155   gobject_class->get_property = gst_fd_src_get_property;
156   gobject_class->dispose = gst_fd_src_dispose;
157
158   g_object_class_install_property (gobject_class, PROP_FD,
159       g_param_spec_int ("fd", "fd", "An open file descriptor to read from",
160           0, G_MAXINT, 0, G_PARAM_READWRITE));
161
162   gstbasesrc_class->start = GST_DEBUG_FUNCPTR (gst_fd_src_start);
163   gstbasesrc_class->stop = GST_DEBUG_FUNCPTR (gst_fd_src_stop);
164   gstbasesrc_class->unlock = GST_DEBUG_FUNCPTR (gst_fd_src_unlock);
165   gstbasesrc_class->unlock_stop = GST_DEBUG_FUNCPTR (gst_fd_src_unlock_stop);
166   gstbasesrc_class->is_seekable = GST_DEBUG_FUNCPTR (gst_fd_src_is_seekable);
167   gstbasesrc_class->get_size = GST_DEBUG_FUNCPTR (gst_fd_src_get_size);
168   gstbasesrc_class->do_seek = GST_DEBUG_FUNCPTR (gst_fd_src_do_seek);
169
170   gstpush_src_class->create = GST_DEBUG_FUNCPTR (gst_fd_src_create);
171 }
172
173 static void
174 gst_fd_src_init (GstFdSrc * fdsrc, GstFdSrcClass * klass)
175 {
176   fdsrc->fd = -1;
177   fdsrc->new_fd = 0;
178   fdsrc->seekable_fd = FALSE;
179   fdsrc->uri = g_strdup_printf ("fd://0");
180   fdsrc->curoffset = 0;
181 }
182
183 static void
184 gst_fd_src_dispose (GObject * obj)
185 {
186   GstFdSrc *src = GST_FD_SRC (obj);
187
188   g_free (src->uri);
189   src->uri = NULL;
190
191   G_OBJECT_CLASS (parent_class)->dispose (obj);
192 }
193
194 static void
195 gst_fd_src_update_fd (GstFdSrc * src)
196 {
197   struct stat stat_results;
198
199   if (src->fd != src->new_fd) {
200     GST_INFO_OBJECT (src, "Updating to fd %d", src->new_fd);
201
202     src->fd = src->new_fd;
203
204     g_free (src->uri);
205     src->uri = g_strdup_printf ("fd://%d", src->fd);
206
207     if (fstat (src->fd, &stat_results) < 0)
208       goto not_seekable;
209
210     if (!S_ISREG (stat_results.st_mode))
211       goto not_seekable;
212
213     /* Try a seek of 0 bytes offset to check for seekability */
214     if (lseek (src->fd, 0, SEEK_CUR) < 0)
215       goto not_seekable;
216
217     GST_INFO_OBJECT (src, "marking fd %d as seekable", src->fd);
218     src->seekable_fd = TRUE;
219   }
220   return;
221
222 not_seekable:
223   {
224     GST_INFO_OBJECT (src, "marking fd %d as NOT seekable", src->fd);
225     src->seekable_fd = FALSE;
226   }
227 }
228
229 static gboolean
230 gst_fd_src_start (GstBaseSrc * bsrc)
231 {
232   GstFdSrc *src = GST_FD_SRC (bsrc);
233   gint control_sock[2];
234
235   src->curoffset = 0;
236
237   gst_fd_src_update_fd (src);
238
239   if (socketpair (PF_UNIX, SOCK_STREAM, 0, control_sock) < 0)
240     goto socket_pair;
241
242   READ_SOCKET (src) = control_sock[0];
243   WRITE_SOCKET (src) = control_sock[1];
244
245   fcntl (READ_SOCKET (src), F_SETFL, O_NONBLOCK);
246   fcntl (WRITE_SOCKET (src), F_SETFL, O_NONBLOCK);
247
248   return TRUE;
249
250   /* ERRORS */
251 socket_pair:
252   {
253     GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ_WRITE, (NULL),
254         GST_ERROR_SYSTEM);
255     return FALSE;
256   }
257 }
258
259 static gboolean
260 gst_fd_src_stop (GstBaseSrc * bsrc)
261 {
262   GstFdSrc *src = GST_FD_SRC (bsrc);
263
264   close (READ_SOCKET (src));
265   close (WRITE_SOCKET (src));
266
267   return TRUE;
268 }
269
270 static gboolean
271 gst_fd_src_unlock (GstBaseSrc * bsrc)
272 {
273   GstFdSrc *src = GST_FD_SRC (bsrc);
274
275   GST_LOG_OBJECT (src, "sending unlock command");
276   SEND_COMMAND (src, CONTROL_STOP);
277
278   return TRUE;
279 }
280
281 static gboolean
282 gst_fd_src_unlock_stop (GstBaseSrc * bsrc)
283 {
284   GstFdSrc *src = GST_FD_SRC (bsrc);
285
286   GST_LOG_OBJECT (src, "clearing unlock command queue");
287
288   /* read all stop commands */
289   while (TRUE) {
290     gchar command;
291     int res;
292
293     GST_LOG_OBJECT (src, "reading command");
294
295     READ_COMMAND (src, command, res);
296     if (res < 0) {
297       GST_LOG_OBJECT (src, "no more commands");
298       /* no more commands */
299       break;
300     }
301   }
302
303   return TRUE;
304 }
305
306 static void
307 gst_fd_src_set_property (GObject * object, guint prop_id, const GValue * value,
308     GParamSpec * pspec)
309 {
310   GstFdSrc *src = GST_FD_SRC (object);
311
312   switch (prop_id) {
313     case PROP_FD:
314       src->new_fd = g_value_get_int (value);
315
316       /* If state is ready or below, update the current fd immediately
317        * so it is reflected in get_properties and uri */
318       GST_OBJECT_LOCK (object);
319       if (GST_STATE (GST_ELEMENT (src)) <= GST_STATE_READY) {
320         GST_DEBUG_OBJECT (src, "state ready or lower, updating to use new fd");
321         gst_fd_src_update_fd (src);
322       } else {
323         GST_DEBUG_OBJECT (src, "state above ready, not updating to new fd yet");
324       }
325       GST_OBJECT_UNLOCK (object);
326       break;
327     default:
328       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
329       break;
330   }
331 }
332
333 static void
334 gst_fd_src_get_property (GObject * object, guint prop_id, GValue * value,
335     GParamSpec * pspec)
336 {
337   GstFdSrc *src = GST_FD_SRC (object);
338
339   switch (prop_id) {
340     case PROP_FD:
341       g_value_set_int (value, src->fd);
342       break;
343     default:
344       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
345       break;
346   }
347 }
348
349 static GstFlowReturn
350 gst_fd_src_create (GstPushSrc * psrc, GstBuffer ** outbuf)
351 {
352   GstFdSrc *src;
353   GstBuffer *buf;
354   gssize readbytes;
355   guint blocksize;
356
357 #ifndef HAVE_WIN32
358   fd_set readfds;
359   gint retval;
360 #endif
361
362   src = GST_FD_SRC (psrc);
363
364 #ifndef HAVE_WIN32
365   FD_ZERO (&readfds);
366   FD_SET (src->fd, &readfds);
367   FD_SET (READ_SOCKET (src), &readfds);
368
369   do {
370     retval = select (FD_SETSIZE, &readfds, NULL, NULL, NULL);
371   } while ((retval == -1 && errno == EINTR));
372
373   if (retval == -1)
374     goto select_error;
375
376   if (FD_ISSET (READ_SOCKET (src), &readfds))
377     goto stopped;
378 #endif
379
380   blocksize = GST_BASE_SRC (src)->blocksize;
381
382   /* create the buffer */
383   buf = gst_buffer_new_and_alloc (blocksize);
384
385   do {
386     readbytes = read (src->fd, GST_BUFFER_DATA (buf), blocksize);
387     GST_LOG_OBJECT (src, "read %" G_GSSIZE_FORMAT, readbytes);
388   } while (readbytes == -1 && errno == EINTR);  /* retry if interrupted */
389
390   if (readbytes < 0)
391     goto read_error;
392
393   if (readbytes == 0)
394     goto eos;
395
396   GST_BUFFER_OFFSET (buf) = src->curoffset;
397   GST_BUFFER_SIZE (buf) = readbytes;
398   GST_BUFFER_TIMESTAMP (buf) = GST_CLOCK_TIME_NONE;
399   src->curoffset += readbytes;
400
401   GST_LOG_OBJECT (psrc, "Read buffer of size %" G_GSSIZE_FORMAT, readbytes);
402
403   /* we're done, return the buffer */
404   *outbuf = buf;
405
406   return GST_FLOW_OK;
407
408   /* ERRORS */
409 #ifndef HAVE_WIN32
410 select_error:
411   {
412     GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
413         ("select on file descriptor: %s.", g_strerror (errno)));
414     GST_DEBUG_OBJECT (psrc, "Error during select");
415     return GST_FLOW_ERROR;
416   }
417 stopped:
418   {
419     GST_DEBUG_OBJECT (psrc, "Select stopped");
420     return GST_FLOW_WRONG_STATE;
421   }
422 #endif
423 eos:
424   {
425     GST_DEBUG_OBJECT (psrc, "Read 0 bytes. EOS.");
426     gst_buffer_unref (buf);
427     return GST_FLOW_UNEXPECTED;
428   }
429 read_error:
430   {
431     GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
432         ("read on file descriptor: %s.", g_strerror (errno)));
433     GST_DEBUG_OBJECT (psrc, "Error reading from fd");
434     gst_buffer_unref (buf);
435     return GST_FLOW_ERROR;
436   }
437 }
438
439 gboolean
440 gst_fd_src_is_seekable (GstBaseSrc * bsrc)
441 {
442   GstFdSrc *src = GST_FD_SRC (bsrc);
443
444   return src->seekable_fd;
445 }
446
447 gboolean
448 gst_fd_src_get_size (GstBaseSrc * bsrc, guint64 * size)
449 {
450   GstFdSrc *src = GST_FD_SRC (bsrc);
451   struct stat stat_results;
452
453   if (!src->seekable_fd) {
454     /* If it isn't seekable, we won't know the length (but fstat will still
455      * succeed, and wrongly say our length is zero. */
456     return FALSE;
457   }
458
459   if (fstat (src->fd, &stat_results) < 0)
460     goto could_not_stat;
461
462   *size = stat_results.st_size;
463
464   return TRUE;
465
466   /* ERROR */
467 could_not_stat:
468   {
469     return FALSE;
470   }
471 }
472
473 gboolean
474 gst_fd_src_do_seek (GstBaseSrc * bsrc, GstSegment * segment)
475 {
476   gint res;
477   gint64 offset;
478   GstFdSrc *src = GST_FD_SRC (bsrc);
479
480   offset = segment->start;
481
482   /* No need to seek to the current position */
483   if (offset == src->curoffset)
484     return TRUE;
485
486   res = lseek (src->fd, offset, SEEK_SET);
487   if (G_UNLIKELY (res < 0 || res != offset))
488     goto seek_failed;
489
490   segment->last_stop = segment->start;
491   segment->time = segment->start;
492
493   return TRUE;
494
495 seek_failed:
496   GST_DEBUG_OBJECT (src, "lseek returned %" G_GINT64_FORMAT, offset);
497   return FALSE;
498 }
499
500 /*** GSTURIHANDLER INTERFACE *************************************************/
501
502 static GstURIType
503 gst_fd_src_uri_get_type (void)
504 {
505   return GST_URI_SRC;
506 }
507 static gchar **
508 gst_fd_src_uri_get_protocols (void)
509 {
510   static gchar *protocols[] = { "fd", NULL };
511
512   return protocols;
513 }
514 static const gchar *
515 gst_fd_src_uri_get_uri (GstURIHandler * handler)
516 {
517   GstFdSrc *src = GST_FD_SRC (handler);
518
519   return src->uri;
520 }
521
522 static gboolean
523 gst_fd_src_uri_set_uri (GstURIHandler * handler, const gchar * uri)
524 {
525   gchar *protocol;
526   GstFdSrc *src = GST_FD_SRC (handler);
527   gint fd;
528
529   protocol = gst_uri_get_protocol (uri);
530   if (strcmp (protocol, "fd") != 0) {
531     g_free (protocol);
532     return FALSE;
533   }
534   g_free (protocol);
535
536   if (sscanf (uri, "fd://%d", &fd) != 1)
537     return FALSE;
538
539   src->new_fd = fd;
540
541   GST_OBJECT_LOCK (src);
542   if (GST_STATE (GST_ELEMENT (src)) <= GST_STATE_READY) {
543     gst_fd_src_update_fd (src);
544   }
545   GST_OBJECT_UNLOCK (src);
546
547   return TRUE;
548 }
549
550 static void
551 gst_fd_src_uri_handler_init (gpointer g_iface, gpointer iface_data)
552 {
553   GstURIHandlerInterface *iface = (GstURIHandlerInterface *) g_iface;
554
555   iface->get_type = gst_fd_src_uri_get_type;
556   iface->get_protocols = gst_fd_src_uri_get_protocols;
557   iface->get_uri = gst_fd_src_uri_get_uri;
558   iface->set_uri = gst_fd_src_uri_set_uri;
559 }