tizen 2.3 release
[framework/multimedia/gst-plugins-base0.10.git] / ext / gio / gstgiobasesrc.c
1 /* GStreamer
2  *
3  * Copyright (C) 2007 Rene Stadler <mail@renestadler.de>
4  * Copyright (C) 2007-2009 Sebastian Dröge <sebastian.droege@collabora.co.uk>
5  * 
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 02111-1307, USA.
20  */
21
22 #ifdef HAVE_CONFIG_H
23 #include <config.h>
24 #endif
25
26 #include "gstgiobasesrc.h"
27
28 #include <gst/base/gsttypefindhelper.h>
29
30 GST_DEBUG_CATEGORY_STATIC (gst_gio_base_src_debug);
31 #define GST_CAT_DEFAULT gst_gio_base_src_debug
32
33 static GstStaticPadTemplate src_factory = GST_STATIC_PAD_TEMPLATE ("src",
34     GST_PAD_SRC,
35     GST_PAD_ALWAYS,
36     GST_STATIC_CAPS_ANY);
37
38 GST_BOILERPLATE (GstGioBaseSrc, gst_gio_base_src, GstBaseSrc,
39     GST_TYPE_BASE_SRC);
40
41 static void gst_gio_base_src_finalize (GObject * object);
42
43 static gboolean gst_gio_base_src_start (GstBaseSrc * base_src);
44 static gboolean gst_gio_base_src_stop (GstBaseSrc * base_src);
45 static gboolean gst_gio_base_src_get_size (GstBaseSrc * base_src,
46     guint64 * size);
47 static gboolean gst_gio_base_src_is_seekable (GstBaseSrc * base_src);
48 static gboolean gst_gio_base_src_unlock (GstBaseSrc * base_src);
49 static gboolean gst_gio_base_src_unlock_stop (GstBaseSrc * base_src);
50 static gboolean gst_gio_base_src_check_get_range (GstBaseSrc * base_src);
51 static GstFlowReturn gst_gio_base_src_create (GstBaseSrc * base_src,
52     guint64 offset, guint size, GstBuffer ** buf);
53 static gboolean gst_gio_base_src_query (GstBaseSrc * base_src,
54     GstQuery * query);
55
56 static void
57 gst_gio_base_src_base_init (gpointer gclass)
58 {
59   GstElementClass *element_class = GST_ELEMENT_CLASS (gclass);
60
61   GST_DEBUG_CATEGORY_INIT (gst_gio_base_src_debug, "gio_base_src", 0,
62       "GIO base source");
63
64   gst_element_class_add_static_pad_template (element_class, &src_factory);
65 }
66
67 static void
68 gst_gio_base_src_class_init (GstGioBaseSrcClass * klass)
69 {
70   GObjectClass *gobject_class = (GObjectClass *) klass;
71   GstBaseSrcClass *gstbasesrc_class = (GstBaseSrcClass *) klass;
72
73   gobject_class->finalize = gst_gio_base_src_finalize;
74
75   gstbasesrc_class->start = GST_DEBUG_FUNCPTR (gst_gio_base_src_start);
76   gstbasesrc_class->stop = GST_DEBUG_FUNCPTR (gst_gio_base_src_stop);
77   gstbasesrc_class->get_size = GST_DEBUG_FUNCPTR (gst_gio_base_src_get_size);
78   gstbasesrc_class->is_seekable =
79       GST_DEBUG_FUNCPTR (gst_gio_base_src_is_seekable);
80   gstbasesrc_class->unlock = GST_DEBUG_FUNCPTR (gst_gio_base_src_unlock);
81   gstbasesrc_class->unlock_stop =
82       GST_DEBUG_FUNCPTR (gst_gio_base_src_unlock_stop);
83   gstbasesrc_class->check_get_range =
84       GST_DEBUG_FUNCPTR (gst_gio_base_src_check_get_range);
85   gstbasesrc_class->create = GST_DEBUG_FUNCPTR (gst_gio_base_src_create);
86   gstbasesrc_class->query = GST_DEBUG_FUNCPTR (gst_gio_base_src_query);
87 }
88
89 static void
90 gst_gio_base_src_init (GstGioBaseSrc * src, GstGioBaseSrcClass * gclass)
91 {
92   src->cancel = g_cancellable_new ();
93 }
94
95 static void
96 gst_gio_base_src_finalize (GObject * object)
97 {
98   GstGioBaseSrc *src = GST_GIO_BASE_SRC (object);
99
100   if (src->cancel) {
101     g_object_unref (src->cancel);
102     src->cancel = NULL;
103   }
104
105   if (src->stream) {
106     g_object_unref (src->stream);
107     src->stream = NULL;
108   }
109
110   if (src->cache) {
111     gst_buffer_unref (src->cache);
112     src->cache = NULL;
113   }
114
115   GST_CALL_PARENT (G_OBJECT_CLASS, finalize, (object));
116 }
117
118 static gboolean
119 gst_gio_base_src_start (GstBaseSrc * base_src)
120 {
121   GstGioBaseSrc *src = GST_GIO_BASE_SRC (base_src);
122   GstGioBaseSrcClass *gbsrc_class = GST_GIO_BASE_SRC_GET_CLASS (src);
123
124   src->position = 0;
125
126   /* FIXME: This will likely block */
127   src->stream = gbsrc_class->get_stream (src);
128   if (G_UNLIKELY (!G_IS_INPUT_STREAM (src->stream))) {
129     GST_ELEMENT_ERROR (src, RESOURCE, OPEN_READ, (NULL),
130         ("No input stream provided by subclass"));
131     return FALSE;
132   } else if (G_UNLIKELY (g_input_stream_is_closed (src->stream))) {
133     GST_ELEMENT_ERROR (src, LIBRARY, FAILED, (NULL),
134         ("Input stream is already closed"));
135     return FALSE;
136   }
137
138   if (G_IS_SEEKABLE (src->stream))
139     src->position = g_seekable_tell (G_SEEKABLE (src->stream));
140
141   GST_DEBUG_OBJECT (src, "started source");
142
143   return TRUE;
144 }
145
146 static gboolean
147 gst_gio_base_src_stop (GstBaseSrc * base_src)
148 {
149   GstGioBaseSrc *src = GST_GIO_BASE_SRC (base_src);
150   GstGioBaseSrcClass *klass = GST_GIO_BASE_SRC_GET_CLASS (src);
151   gboolean success;
152   GError *err = NULL;
153
154   if (klass->close_on_stop && G_IS_INPUT_STREAM (src->stream)) {
155     GST_DEBUG_OBJECT (src, "closing stream");
156
157     /* FIXME: can block but unfortunately we can't use async operations
158      * here because they require a running main loop */
159     success = g_input_stream_close (src->stream, src->cancel, &err);
160
161     if (!success && !gst_gio_error (src, "g_input_stream_close", &err, NULL)) {
162       GST_ELEMENT_WARNING (src, RESOURCE, CLOSE, (NULL),
163           ("g_input_stream_close failed: %s", err->message));
164       g_clear_error (&err);
165     } else if (!success) {
166       GST_ELEMENT_WARNING (src, RESOURCE, CLOSE, (NULL),
167           ("g_input_stream_close failed"));
168     } else {
169       GST_DEBUG_OBJECT (src, "g_input_stream_close succeeded");
170     }
171
172     g_object_unref (src->stream);
173     src->stream = NULL;
174   } else {
175     g_object_unref (src->stream);
176     src->stream = NULL;
177   }
178
179   return TRUE;
180 }
181
182 static gboolean
183 gst_gio_base_src_get_size (GstBaseSrc * base_src, guint64 * size)
184 {
185   GstGioBaseSrc *src = GST_GIO_BASE_SRC (base_src);
186
187   if (G_IS_FILE_INPUT_STREAM (src->stream)) {
188     GFileInfo *info;
189     GError *err = NULL;
190
191     info = g_file_input_stream_query_info (G_FILE_INPUT_STREAM (src->stream),
192         G_FILE_ATTRIBUTE_STANDARD_SIZE, src->cancel, &err);
193
194     if (info != NULL) {
195       *size = g_file_info_get_size (info);
196       g_object_unref (info);
197       GST_DEBUG_OBJECT (src, "found size: %" G_GUINT64_FORMAT, *size);
198       return TRUE;
199     }
200
201     if (!gst_gio_error (src, "g_file_input_stream_query_info", &err, NULL)) {
202
203       if (GST_GIO_ERROR_MATCHES (err, NOT_SUPPORTED))
204         GST_DEBUG_OBJECT (src, "size information not available");
205       else
206         GST_WARNING_OBJECT (src, "size information retrieval failed: %s",
207             err->message);
208
209       g_clear_error (&err);
210     }
211   }
212
213   if (GST_GIO_STREAM_IS_SEEKABLE (src->stream)) {
214     goffset old;
215     goffset stream_size;
216     gboolean ret;
217     GSeekable *seekable = G_SEEKABLE (src->stream);
218     GError *err = NULL;
219
220     old = g_seekable_tell (seekable);
221
222     ret = g_seekable_seek (seekable, 0, G_SEEK_END, src->cancel, &err);
223     if (!ret) {
224       if (!gst_gio_error (src, "g_seekable_seek", &err, NULL)) {
225         if (GST_GIO_ERROR_MATCHES (err, NOT_SUPPORTED))
226           GST_DEBUG_OBJECT (src,
227               "Seeking to the end of stream is not supported");
228         else
229           GST_WARNING_OBJECT (src, "Seeking to end of stream failed: %s",
230               err->message);
231         g_clear_error (&err);
232       } else {
233         GST_WARNING_OBJECT (src, "Seeking to end of stream failed");
234       }
235       return FALSE;
236     }
237
238     stream_size = g_seekable_tell (seekable);
239
240     ret = g_seekable_seek (seekable, old, G_SEEK_SET, src->cancel, &err);
241     if (!ret) {
242       if (!gst_gio_error (src, "g_seekable_seek", &err, NULL)) {
243         if (GST_GIO_ERROR_MATCHES (err, NOT_SUPPORTED))
244           GST_ERROR_OBJECT (src, "Seeking to the old position not supported");
245         else
246           GST_ERROR_OBJECT (src, "Seeking to the old position failed: %s",
247               err->message);
248         g_clear_error (&err);
249       } else {
250         GST_ERROR_OBJECT (src, "Seeking to the old position faile");
251       }
252       return FALSE;
253     }
254
255     if (stream_size >= 0) {
256       *size = stream_size;
257       return TRUE;
258     }
259   }
260
261   return FALSE;
262 }
263
264 static gboolean
265 gst_gio_base_src_is_seekable (GstBaseSrc * base_src)
266 {
267   GstGioBaseSrc *src = GST_GIO_BASE_SRC (base_src);
268   gboolean seekable;
269
270   seekable = GST_GIO_STREAM_IS_SEEKABLE (src->stream);
271
272   GST_DEBUG_OBJECT (src, "can seek: %d", seekable);
273
274   return seekable;
275 }
276
277 static gboolean
278 gst_gio_base_src_unlock (GstBaseSrc * base_src)
279 {
280   GstGioBaseSrc *src = GST_GIO_BASE_SRC (base_src);
281
282   GST_LOG_OBJECT (src, "triggering cancellation");
283
284   g_cancellable_cancel (src->cancel);
285
286   return TRUE;
287 }
288
289 static gboolean
290 gst_gio_base_src_unlock_stop (GstBaseSrc * base_src)
291 {
292   GstGioBaseSrc *src = GST_GIO_BASE_SRC (base_src);
293
294   GST_LOG_OBJECT (src, "resetting cancellable");
295
296   g_cancellable_reset (src->cancel);
297
298   return TRUE;
299 }
300
301 static gboolean
302 gst_gio_base_src_check_get_range (GstBaseSrc * base_src)
303 {
304   return GST_CALL_PARENT_WITH_DEFAULT (GST_BASE_SRC_CLASS,
305       check_get_range, (base_src), FALSE);
306 }
307
308 static GstFlowReturn
309 gst_gio_base_src_create (GstBaseSrc * base_src, guint64 offset, guint size,
310     GstBuffer ** buf_return)
311 {
312   GstGioBaseSrc *src = GST_GIO_BASE_SRC (base_src);
313   GstBuffer *buf;
314   GstFlowReturn ret = GST_FLOW_OK;
315
316   g_return_val_if_fail (G_IS_INPUT_STREAM (src->stream), GST_FLOW_ERROR);
317
318   /* If we have the requested part in our cache take a subbuffer of that,
319    * otherwise fill the cache again with at least 4096 bytes from the
320    * requested offset and return a subbuffer of that.
321    *
322    * We need caching because every read/seek operation will need to go
323    * over DBus if our backend is GVfs and this is painfully slow. */
324   if (src->cache && offset >= GST_BUFFER_OFFSET (src->cache) &&
325       offset + size <= GST_BUFFER_OFFSET_END (src->cache)) {
326     GST_DEBUG_OBJECT (src, "Creating subbuffer from cached buffer: offset %"
327         G_GUINT64_FORMAT " length %u", offset, size);
328
329     buf = gst_buffer_create_sub (src->cache,
330         offset - GST_BUFFER_OFFSET (src->cache), size);
331
332     GST_BUFFER_OFFSET (buf) = offset;
333     GST_BUFFER_OFFSET_END (buf) = offset + size;
334     GST_BUFFER_SIZE (buf) = size;
335   } else {
336     guint cachesize = MAX (4096, size);
337     gssize read, res;
338     gboolean success, eos;
339     GError *err = NULL;
340
341     if (src->cache) {
342       gst_buffer_unref (src->cache);
343       src->cache = NULL;
344     }
345
346     if (G_UNLIKELY (offset != src->position)) {
347       if (!GST_GIO_STREAM_IS_SEEKABLE (src->stream))
348         return GST_FLOW_NOT_SUPPORTED;
349
350       GST_DEBUG_OBJECT (src, "Seeking to position %" G_GUINT64_FORMAT, offset);
351       ret = gst_gio_seek (src, G_SEEKABLE (src->stream), offset, src->cancel);
352
353       if (ret == GST_FLOW_OK)
354         src->position = offset;
355       else
356         return ret;
357     }
358
359     src->cache = gst_buffer_try_new_and_alloc (cachesize);
360     if (G_UNLIKELY (src->cache == NULL)) {
361       GST_ERROR_OBJECT (src, "Failed to allocate %u bytes", cachesize);
362       return GST_FLOW_ERROR;
363     }
364
365     GST_LOG_OBJECT (src, "Reading %u bytes from offset %" G_GUINT64_FORMAT,
366         cachesize, offset);
367
368     /* GIO sometimes gives less bytes than requested although
369      * it's not at the end of file. SMB for example only
370      * supports reads up to 64k. So we loop here until we get at
371      * at least the requested amount of bytes or a read returns
372      * nothing. */
373     read = 0;
374     while (size - read > 0 && (res =
375             g_input_stream_read (G_INPUT_STREAM (src->stream),
376                 GST_BUFFER_DATA (src->cache) + read, cachesize - read,
377                 src->cancel, &err)) > 0) {
378       read += res;
379     }
380
381     success = (read >= 0);
382     eos = (cachesize > 0 && read == 0);
383
384     if (!success && !gst_gio_error (src, "g_input_stream_read", &err, &ret)) {
385       GST_ELEMENT_ERROR (src, RESOURCE, READ, (NULL),
386           ("Could not read from stream: %s", err->message));
387       g_clear_error (&err);
388     }
389
390     if (success && !eos) {
391       src->position += read;
392       GST_BUFFER_SIZE (src->cache) = read;
393
394       GST_BUFFER_OFFSET (src->cache) = offset;
395       GST_BUFFER_OFFSET_END (src->cache) = offset + read;
396
397       GST_DEBUG_OBJECT (src, "Read successful");
398       GST_DEBUG_OBJECT (src, "Creating subbuffer from new "
399           "cached buffer: offset %" G_GUINT64_FORMAT " length %u", offset,
400           size);
401
402       buf = gst_buffer_create_sub (src->cache, 0, MIN (size, read));
403
404       GST_BUFFER_OFFSET (buf) = offset;
405       GST_BUFFER_OFFSET_END (buf) = offset + MIN (size, read);
406       GST_BUFFER_SIZE (buf) = MIN (size, read);
407     } else {
408       GST_DEBUG_OBJECT (src, "Read not successful");
409       gst_buffer_unref (src->cache);
410       src->cache = NULL;
411       buf = NULL;
412     }
413
414     if (eos)
415       ret = GST_FLOW_UNEXPECTED;
416   }
417
418   *buf_return = buf;
419
420   return ret;
421 }
422
423 static gboolean
424 gst_gio_base_src_query (GstBaseSrc * base_src, GstQuery * query)
425 {
426   gboolean ret = FALSE;
427   GstGioBaseSrc *src = GST_GIO_BASE_SRC (base_src);
428
429   switch (GST_QUERY_TYPE (query)) {
430     case GST_QUERY_URI:
431       if (GST_IS_URI_HANDLER (src)) {
432         const gchar *uri = gst_uri_handler_get_uri (GST_URI_HANDLER (src));
433         gst_query_set_uri (query, uri);
434         ret = TRUE;
435       }
436       break;
437     default:
438       ret = FALSE;
439       break;
440   }
441
442   if (!ret)
443     ret = GST_BASE_SRC_CLASS (parent_class)->query (base_src, query);
444
445   return ret;
446 }