libs/gst/base/gstbasesrc.c: Handle element seek correctly when we are streaming.
[platform/upstream/gstreamer.git] / libs / gst / base / gstbasesrc.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *               2000,2005 Wim Taymans <wim@fluendo.com>
4  *
5  * gstbasesrc.c:
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20  * Boston, MA 02111-1307, USA.
21  */
22
23 /**
24  * SECTION:gstbasesrc
25  * @short_description: Base class for getrange based source elements
26  * @see_also: #GstPushSrc, #GstBaseTransform, #GstBaseSink
27  *
28  * This is a generice base class for source elements. The following
29  * types of sources are supported:
30  * <itemizedlist>
31  *   <listitem><para>random access sources like files</para></listitem>
32  *   <listitem><para>seekable sources</para></listitem>
33  *   <listitem><para>live sources</para></listitem>
34  * </itemizedlist>
35  *
36  * <refsect2>
37  * <para>
38  * The source can be configured to operate in a any #GstFormat with the
39  * gst_base_src_set_format(). This format determines the format of the
40  * internal #GstSegment and the #GST_EVENT_NEW_SEGMENT. The default format for
41  * #GstBaseSrc is GST_FORMAT_BYTES.
42  * </para>
43  * <para>
44  * #GstBaseSrc always supports the push mode scheduling. If the following
45  * conditions are met, it also supports pull mode scheduling:
46  * <itemizedlist>
47  *   <listitem><para>the format is set to GST_FORMAT_BYTES (default).</para></listitem>
48  *   <listitem><para>#GstBaseSrc::is_seekable returns TRUE.</para></listitem>
49  * </itemizedlist>
50  * </para>
51  * <para>
52  * If all the conditions are met for operating in pull mode, #GstBaseSrc is
53  * automatically seekable in push mode as well. The following conditions must be
54  * met to make the element seekable in push mode when the format is not
55  * GST_FORMAT_BYTES:
56  * <itemizedlist>
57  *   <listitem><para>
58  *     #GstBaseSrc::is_seekable returns TRUE.
59  *   </para></listitem>
60  *   <listitem><para>
61  *     #GstBaseSrc::query can convert all supported seek formats to the
62  *     internal format as set with gst_base_src_set_format().
63  *   </para></listitem>
64  *   <listitem><para>
65  *     #GstBaseSrc::do_seek is implemented, performs the seek and returns TRUE.
66  *   </para></listitem>
67  * </itemizedlist>
68  * </para>
69  * <para>
70  * When the default format is not GST_FORMAT_BYTES, the element should ignore the
71  * offset and length in the #GstBaseSrc::create method. It is recommended to subclass
72  * #GstPushSrc instead in this situation.
73  * </para>
74  * <para>
75  * #GstBaseSrc has support for live sources. Live sources are sources that produce
76  * data at a fixed rate, such as audio or video capture. A typical live source also
77  * provides a clock to export the rate at which they produce data.
78  * Use gst_base_src_set_live() to activate the live source mode.
79  * </para>
80  * <para>
81  * A live source does not produce data in the PAUSED state, this means that the 
82  * #GstBaseSrc::create method will not be called in PAUSED but only in PLAYING.
83  * To signal the pipeline that the element will not produce data, its return
84  * value from the READY to PAUSED state will be GST_STATE_CHANGE_NO_PREROLL.
85  * </para>
86  * <para>
87  * A typical live source will timestamp the buffers they create with the current
88  * stream time of the pipeline. This is why they can only produce data in PLAYING,
89  * when the clock is actually distributed and running.
90  * </para>
91  * <para>
92  * The #GstBaseSrc::get_times method can be used to implement pseudo-live sources.
93  * The base source will wait for the specified stream time returned in 
94  * #GstBaseSrc::get_times before pushing out the buffer. It only makes sense to implement
95  * the ::get_times function if the source is a live source.
96  * </para>
97  * <para>
98  * There is only support in GstBaseSrc for one source pad, which should be named
99  * "src". A source implementation (subclass of GstBaseSrc) should install a pad
100  * template in its base_init function, like so:
101  * </para>
102  * <para>
103  * <programlisting>
104  * static void
105  * my_element_base_init (gpointer g_class)
106  * {
107  *   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (g_class);
108  *   // srctemplate should be a #GstStaticPadTemplate with direction
109  *   // #GST_PAD_SRC and name "src"
110  *   gst_element_class_add_pad_template (gstelement_class,
111  *       gst_static_pad_template_get (&amp;srctemplate));
112  *   // see #GstElementDetails
113  *   gst_element_class_set_details (gstelement_class, &amp;details);
114  * }
115  * </programlisting>
116  * </para>
117  * <title>Controlled shutdown of live sources in applications</title>
118  * <para>
119  * Applications that record from a live source may want to stop recording
120  * in a controlled way, so that the recording is stopped, but the data
121  * already in the pipeline processed to the end (remember that many live
122  * sources would go on recording forever otherwise). For that to happen the
123  * application needs to make the source stop recording and send an EOS
124  * event down the pipeline. The application would then wait for an
125  * EOS message posted on the pipeline's bus to know when all data has
126  * been processed and the pipeline can safely be stopped.
127  * </para>
128  * <para>
129  * Since GStreamer 0.10.3 an application may simply set the source
130  * element to NULL or READY state to make it send an EOS event downstream.
131  * The application should lock the state of the source afterwards, so that
132  * shutting down the pipeline from PLAYING doesn't temporarily start up the
133  * source element for a second time:
134  * <programlisting>
135  * ...
136  * // stop recording
137  * gst_element_set_state (audio_source, GST_STATE_NULL);
138  * gst_element_set_locked_state (audio_source, TRUE);
139  * ...
140  * </programlisting>
141  * Now the application should wait for an EOS message
142  * to be posted on the pipeline's bus. Once it has received
143  * an EOS message, it may safely shut down the entire pipeline:
144  * <programlisting>
145  * ...
146  * // everything done - shut down pipeline
147  * gst_element_set_state (pipeline, GST_STATE_NULL);
148  * gst_element_set_locked_state (audio_source, FALSE);
149  * ...
150  * </programlisting>
151  * </para>
152  * <para>
153  * Last reviewed on 2006-03-07 (0.10.4)
154  * </para>
155  * </refsect2>
156  */
157
158 #include <stdlib.h>
159 #include <string.h>
160
161 #ifdef HAVE_CONFIG_H
162 #  include "config.h"
163 #endif
164
165 #include "gstbasesrc.h"
166 #include "gsttypefindhelper.h"
167 #include <gst/gstmarshal.h>
168 #include <gst/gst-i18n-lib.h>
169
170 GST_DEBUG_CATEGORY_STATIC (gst_base_src_debug);
171 #define GST_CAT_DEFAULT gst_base_src_debug
172
173 #define GST_LIVE_GET_LOCK(elem)               (GST_BASE_SRC_CAST(elem)->live_lock)
174 #define GST_LIVE_LOCK(elem)                   g_mutex_lock(GST_LIVE_GET_LOCK(elem))
175 #define GST_LIVE_TRYLOCK(elem)                g_mutex_trylock(GST_LIVE_GET_LOCK(elem))
176 #define GST_LIVE_UNLOCK(elem)                 g_mutex_unlock(GST_LIVE_GET_LOCK(elem))
177 #define GST_LIVE_GET_COND(elem)               (GST_BASE_SRC_CAST(elem)->live_cond)
178 #define GST_LIVE_WAIT(elem)                   g_cond_wait (GST_LIVE_GET_COND (elem), GST_LIVE_GET_LOCK (elem))
179 #define GST_LIVE_TIMED_WAIT(elem, timeval)    g_cond_timed_wait (GST_LIVE_GET_COND (elem), GST_LIVE_GET_LOCK (elem),\
180                                                                                 timeval)
181 #define GST_LIVE_SIGNAL(elem)                 g_cond_signal (GST_LIVE_GET_COND (elem));
182 #define GST_LIVE_BROADCAST(elem)              g_cond_broadcast (GST_LIVE_GET_COND (elem));
183
184 /* BaseSrc signals and args */
185 enum
186 {
187   /* FILL ME */
188   LAST_SIGNAL
189 };
190
191 #define DEFAULT_BLOCKSIZE       4096
192 #define DEFAULT_NUM_BUFFERS     -1
193 #define DEFAULT_TYPEFIND        FALSE
194
195 enum
196 {
197   PROP_0,
198   PROP_BLOCKSIZE,
199   PROP_NUM_BUFFERS,
200   PROP_TYPEFIND,
201 };
202
203 #define GST_BASE_SRC_GET_PRIVATE(obj)  \
204    (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_BASE_SRC, GstBaseSrcPrivate))
205
206 struct _GstBaseSrcPrivate
207 {
208   gboolean last_sent_eos;       /* last thing we did was send an EOS (we set this
209                                  * to avoid the sending of two EOS in some cases) */
210   gboolean discont;
211 };
212
213 static GstElementClass *parent_class = NULL;
214
215 static void gst_base_src_base_init (gpointer g_class);
216 static void gst_base_src_class_init (GstBaseSrcClass * klass);
217 static void gst_base_src_init (GstBaseSrc * src, gpointer g_class);
218 static void gst_base_src_finalize (GObject * object);
219
220
221 GType
222 gst_base_src_get_type (void)
223 {
224   static GType base_src_type = 0;
225
226   if (!base_src_type) {
227     static const GTypeInfo base_src_info = {
228       sizeof (GstBaseSrcClass),
229       (GBaseInitFunc) gst_base_src_base_init,
230       NULL,
231       (GClassInitFunc) gst_base_src_class_init,
232       NULL,
233       NULL,
234       sizeof (GstBaseSrc),
235       0,
236       (GInstanceInitFunc) gst_base_src_init,
237     };
238
239     base_src_type = g_type_register_static (GST_TYPE_ELEMENT,
240         "GstBaseSrc", &base_src_info, G_TYPE_FLAG_ABSTRACT);
241   }
242   return base_src_type;
243 }
244 static GstCaps *gst_base_src_getcaps (GstPad * pad);
245 static gboolean gst_base_src_setcaps (GstPad * pad, GstCaps * caps);
246
247 static gboolean gst_base_src_activate_push (GstPad * pad, gboolean active);
248 static gboolean gst_base_src_activate_pull (GstPad * pad, gboolean active);
249 static void gst_base_src_set_property (GObject * object, guint prop_id,
250     const GValue * value, GParamSpec * pspec);
251 static void gst_base_src_get_property (GObject * object, guint prop_id,
252     GValue * value, GParamSpec * pspec);
253 static gboolean gst_base_src_event_handler (GstPad * pad, GstEvent * event);
254 static gboolean gst_base_src_send_event (GstElement * elem, GstEvent * event);
255 static gboolean gst_base_src_default_event (GstBaseSrc * src, GstEvent * event);
256
257 static gboolean gst_base_src_query (GstPad * pad, GstQuery * query);
258
259 static gboolean gst_base_src_default_negotiate (GstBaseSrc * basesrc);
260 static gboolean gst_base_src_default_do_seek (GstBaseSrc * src,
261     GstSegment * segment);
262 static gboolean gst_base_src_default_query (GstBaseSrc * src, GstQuery * query);
263
264 static gboolean gst_base_src_unlock (GstBaseSrc * basesrc);
265 static gboolean gst_base_src_start (GstBaseSrc * basesrc);
266 static gboolean gst_base_src_stop (GstBaseSrc * basesrc);
267
268 static GstStateChangeReturn gst_base_src_change_state (GstElement * element,
269     GstStateChange transition);
270
271 static void gst_base_src_loop (GstPad * pad);
272 static gboolean gst_base_src_pad_check_get_range (GstPad * pad);
273 static gboolean gst_base_src_default_check_get_range (GstBaseSrc * bsrc);
274 static GstFlowReturn gst_base_src_pad_get_range (GstPad * pad, guint64 offset,
275     guint length, GstBuffer ** buf);
276 static GstFlowReturn gst_base_src_get_range (GstBaseSrc * src, guint64 offset,
277     guint length, GstBuffer ** buf);
278
279 static void
280 gst_base_src_base_init (gpointer g_class)
281 {
282   GST_DEBUG_CATEGORY_INIT (gst_base_src_debug, "basesrc", 0, "basesrc element");
283 }
284
285 static void
286 gst_base_src_class_init (GstBaseSrcClass * klass)
287 {
288   GObjectClass *gobject_class;
289   GstElementClass *gstelement_class;
290
291   gobject_class = (GObjectClass *) klass;
292   gstelement_class = (GstElementClass *) klass;
293
294   g_type_class_add_private (klass, sizeof (GstBaseSrcPrivate));
295
296   parent_class = g_type_class_ref (GST_TYPE_ELEMENT);
297
298   gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_base_src_finalize);
299   gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_base_src_set_property);
300   gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_base_src_get_property);
301
302   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_BLOCKSIZE,
303       g_param_spec_ulong ("blocksize", "Block size",
304           "Size in bytes to read per buffer", 1, G_MAXULONG, DEFAULT_BLOCKSIZE,
305           G_PARAM_READWRITE));
306   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_NUM_BUFFERS,
307       g_param_spec_int ("num-buffers", "num-buffers",
308           "Number of buffers to output before sending EOS", -1, G_MAXINT,
309           DEFAULT_NUM_BUFFERS, G_PARAM_READWRITE));
310   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_TYPEFIND,
311       g_param_spec_boolean ("typefind", "Typefind",
312           "Run typefind before negotiating", DEFAULT_TYPEFIND,
313           G_PARAM_READWRITE));
314
315   gstelement_class->change_state =
316       GST_DEBUG_FUNCPTR (gst_base_src_change_state);
317   gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_base_src_send_event);
318
319   klass->negotiate = GST_DEBUG_FUNCPTR (gst_base_src_default_negotiate);
320   klass->event = GST_DEBUG_FUNCPTR (gst_base_src_default_event);
321   klass->do_seek = GST_DEBUG_FUNCPTR (gst_base_src_default_do_seek);
322   klass->query = GST_DEBUG_FUNCPTR (gst_base_src_default_query);
323   klass->check_get_range =
324       GST_DEBUG_FUNCPTR (gst_base_src_default_check_get_range);
325 }
326
327 static void
328 gst_base_src_init (GstBaseSrc * basesrc, gpointer g_class)
329 {
330   GstPad *pad;
331   GstPadTemplate *pad_template;
332
333   basesrc->priv = GST_BASE_SRC_GET_PRIVATE (basesrc);
334
335   basesrc->is_live = FALSE;
336   basesrc->live_lock = g_mutex_new ();
337   basesrc->live_cond = g_cond_new ();
338   basesrc->num_buffers = DEFAULT_NUM_BUFFERS;
339   basesrc->num_buffers_left = -1;
340
341   basesrc->can_activate_push = TRUE;
342   basesrc->pad_mode = GST_ACTIVATE_NONE;
343
344   pad_template =
345       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (g_class), "src");
346   g_return_if_fail (pad_template != NULL);
347
348   GST_DEBUG_OBJECT (basesrc, "creating src pad");
349   pad = gst_pad_new_from_template (pad_template, "src");
350
351   GST_DEBUG_OBJECT (basesrc, "setting functions on src pad");
352   gst_pad_set_activatepush_function (pad,
353       GST_DEBUG_FUNCPTR (gst_base_src_activate_push));
354   gst_pad_set_activatepull_function (pad,
355       GST_DEBUG_FUNCPTR (gst_base_src_activate_pull));
356   gst_pad_set_event_function (pad,
357       GST_DEBUG_FUNCPTR (gst_base_src_event_handler));
358   gst_pad_set_query_function (pad, GST_DEBUG_FUNCPTR (gst_base_src_query));
359   gst_pad_set_checkgetrange_function (pad,
360       GST_DEBUG_FUNCPTR (gst_base_src_pad_check_get_range));
361   gst_pad_set_getrange_function (pad,
362       GST_DEBUG_FUNCPTR (gst_base_src_pad_get_range));
363   gst_pad_set_getcaps_function (pad, GST_DEBUG_FUNCPTR (gst_base_src_getcaps));
364   gst_pad_set_setcaps_function (pad, GST_DEBUG_FUNCPTR (gst_base_src_setcaps));
365
366   /* hold pointer to pad */
367   basesrc->srcpad = pad;
368   GST_DEBUG_OBJECT (basesrc, "adding src pad");
369   gst_element_add_pad (GST_ELEMENT (basesrc), pad);
370
371   basesrc->blocksize = DEFAULT_BLOCKSIZE;
372   basesrc->clock_id = NULL;
373   /* we operate in BYTES by default */
374   gst_base_src_set_format (basesrc, GST_FORMAT_BYTES);
375   basesrc->data.ABI.typefind = DEFAULT_TYPEFIND;
376
377   GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_STARTED);
378
379   GST_DEBUG_OBJECT (basesrc, "init done");
380 }
381
382 static void
383 gst_base_src_finalize (GObject * object)
384 {
385   GstBaseSrc *basesrc;
386
387   basesrc = GST_BASE_SRC (object);
388
389   g_mutex_free (basesrc->live_lock);
390   g_cond_free (basesrc->live_cond);
391   gst_event_replace (&basesrc->data.ABI.pending_seek, NULL);
392
393   G_OBJECT_CLASS (parent_class)->finalize (object);
394 }
395
396 /**
397  * gst_base_src_set_live:
398  * @src: base source instance
399  * @live: new live-mode
400  *
401  * If the element listens to a live source, the @livemode should
402  * be set to %TRUE. This declares that this source can't seek.
403  */
404 void
405 gst_base_src_set_live (GstBaseSrc * src, gboolean live)
406 {
407   GST_LIVE_LOCK (src);
408   src->is_live = live;
409   GST_LIVE_UNLOCK (src);
410 }
411
412 /**
413  * gst_base_src_is_live:
414  * @src: base source instance
415  *
416  * Check if an element is in live mode.
417  *
418  * Returns: %TRUE if element is in live mode.
419  */
420 gboolean
421 gst_base_src_is_live (GstBaseSrc * src)
422 {
423   gboolean result;
424
425   GST_LIVE_LOCK (src);
426   result = src->is_live;
427   GST_LIVE_UNLOCK (src);
428
429   return result;
430 }
431
432 /**
433  * gst_base_src_set_format:
434  * @src: base source instance
435  * @format: the format to use
436  *
437  * Sets the default format of the source. This will be the format used
438  * for sending NEW_SEGMENT events and for performing seeks.
439  *
440  * If a format of GST_FORMAT_BYTES is set, the element will be able to
441  * operate in pull mode if the #GstBaseSrc::is_seekable returns TRUE.
442  *
443  * @Since: 0.10.1
444  */
445 void
446 gst_base_src_set_format (GstBaseSrc * src, GstFormat format)
447 {
448   gst_segment_init (&src->segment, format);
449 }
450
451 static gboolean
452 gst_base_src_setcaps (GstPad * pad, GstCaps * caps)
453 {
454   GstBaseSrcClass *bclass;
455   GstBaseSrc *bsrc;
456   gboolean res = TRUE;
457
458   bsrc = GST_BASE_SRC (GST_PAD_PARENT (pad));
459   bclass = GST_BASE_SRC_GET_CLASS (bsrc);
460
461   if (bclass->set_caps)
462     res = bclass->set_caps (bsrc, caps);
463
464   return res;
465 }
466
467 static GstCaps *
468 gst_base_src_getcaps (GstPad * pad)
469 {
470   GstBaseSrcClass *bclass;
471   GstBaseSrc *bsrc;
472   GstCaps *caps = NULL;
473
474   bsrc = GST_BASE_SRC (GST_PAD_PARENT (pad));
475   bclass = GST_BASE_SRC_GET_CLASS (bsrc);
476   if (bclass->get_caps)
477     caps = bclass->get_caps (bsrc);
478
479   if (caps == NULL) {
480     GstPadTemplate *pad_template;
481
482     pad_template =
483         gst_element_class_get_pad_template (GST_ELEMENT_CLASS (bclass), "src");
484     if (pad_template != NULL) {
485       caps = gst_caps_ref (gst_pad_template_get_caps (pad_template));
486     }
487   }
488   return caps;
489 }
490
491 static gboolean
492 gst_base_src_default_query (GstBaseSrc * src, GstQuery * query)
493 {
494   gboolean res;
495
496   switch (GST_QUERY_TYPE (query)) {
497     case GST_QUERY_POSITION:
498     {
499       GstFormat format;
500
501       gst_query_parse_position (query, &format, NULL);
502       switch (format) {
503         case GST_FORMAT_PERCENT:
504         {
505           gint64 percent;
506           gint64 position;
507           gint64 duration;
508
509           position = src->segment.last_stop;
510           duration = src->segment.duration;
511
512           if (position != -1 && duration != -1) {
513             if (position < duration)
514               percent = gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX, position,
515                   duration);
516             else
517               percent = GST_FORMAT_PERCENT_MAX;
518           } else
519             percent = -1;
520
521           gst_query_set_position (query, GST_FORMAT_PERCENT, percent);
522           res = TRUE;
523           break;
524         }
525         default:
526         {
527           gint64 position;
528
529           position = src->segment.last_stop;
530
531           if (position != -1) {
532             /* convert to requested format */
533             res =
534                 gst_pad_query_convert (src->srcpad, src->segment.format,
535                 position, &format, &position);
536           } else
537             res = TRUE;
538
539           gst_query_set_position (query, format, position);
540           break;
541         }
542       }
543       break;
544     }
545     case GST_QUERY_DURATION:
546     {
547       GstFormat format;
548
549       gst_query_parse_duration (query, &format, NULL);
550       switch (format) {
551         case GST_FORMAT_PERCENT:
552           gst_query_set_duration (query, GST_FORMAT_PERCENT,
553               GST_FORMAT_PERCENT_MAX);
554           res = TRUE;
555           break;
556         default:
557         {
558           gint64 duration;
559
560           duration = src->segment.duration;
561
562           if (duration != -1) {
563             /* convert to requested format */
564             res =
565                 gst_pad_query_convert (src->srcpad, src->segment.format,
566                 duration, &format, &duration);
567           } else {
568             res = TRUE;
569           }
570           gst_query_set_duration (query, format, duration);
571           break;
572         }
573       }
574       break;
575     }
576
577     case GST_QUERY_SEEKING:
578     {
579       gst_query_set_seeking (query, src->segment.format,
580           src->seekable, 0, src->segment.duration);
581       res = TRUE;
582       break;
583     }
584     case GST_QUERY_SEGMENT:
585     {
586       gint64 start, stop;
587
588       /* no end segment configured, current duration then */
589       if ((stop = src->segment.stop) == -1)
590         stop = src->segment.duration;
591       start = src->segment.start;
592
593       /* adjust to stream time */
594       if (src->segment.time != -1) {
595         start -= src->segment.time;
596         if (stop != -1)
597           stop -= src->segment.time;
598       }
599       gst_query_set_segment (query, src->segment.rate, src->segment.format,
600           start, stop);
601       res = TRUE;
602       break;
603     }
604
605     case GST_QUERY_FORMATS:
606     {
607       gst_query_set_formats (query, 3, GST_FORMAT_DEFAULT,
608           GST_FORMAT_BYTES, GST_FORMAT_PERCENT);
609       res = TRUE;
610       break;
611     }
612     case GST_QUERY_LATENCY:
613     case GST_QUERY_JITTER:
614     case GST_QUERY_RATE:
615     case GST_QUERY_CONVERT:
616     {
617       GstFormat src_fmt, dest_fmt;
618       gint64 src_val, dest_val;
619
620       gst_query_parse_convert (query, &src_fmt, &src_val, &dest_fmt, &dest_val);
621
622       /* we can only convert between equal formats... */
623       if (src_fmt == dest_fmt) {
624         dest_val = src_val;
625         res = TRUE;
626       } else
627         res = FALSE;
628
629       gst_query_set_convert (query, src_fmt, src_val, dest_fmt, dest_val);
630       break;
631     }
632     default:
633       res = FALSE;
634       break;
635   }
636   return res;
637 }
638
639 static gboolean
640 gst_base_src_query (GstPad * pad, GstQuery * query)
641 {
642   GstBaseSrc *src;
643   GstBaseSrcClass *bclass;
644   gboolean result = FALSE;
645
646   src = GST_BASE_SRC (gst_pad_get_parent (pad));
647
648   bclass = GST_BASE_SRC_GET_CLASS (src);
649
650   if (bclass->query)
651     result = bclass->query (src, query);
652   else
653     result = gst_pad_query_default (pad, query);
654
655   gst_object_unref (src);
656
657   return result;
658 }
659
660 static gboolean
661 gst_base_src_default_do_seek (GstBaseSrc * src, GstSegment * segment)
662 {
663   gboolean res = TRUE;
664
665   /* update our offset if the start/stop position was updated */
666   if (segment->format == GST_FORMAT_BYTES) {
667     segment->last_stop = segment->start;
668     segment->time = segment->start;
669   } else if (segment->start == 0) {
670     /* seek to start, we can implement a default for this. */
671     segment->last_stop = 0;
672     segment->time = 0;
673     res = TRUE;
674   } else
675     res = FALSE;
676
677   return res;
678 }
679
680 static gboolean
681 gst_base_src_do_seek (GstBaseSrc * src, GstSegment * segment)
682 {
683   GstBaseSrcClass *bclass;
684   gboolean result = FALSE;
685
686   bclass = GST_BASE_SRC_GET_CLASS (src);
687
688   if (bclass->do_seek)
689     result = bclass->do_seek (src, segment);
690
691   return result;
692 }
693
694 /* this code implements the seeking. It is a good example
695  * handling all cases.
696  *
697  * A seek updates the currently configured segment.start
698  * and segment.stop values based on the SEEK_TYPE. If the
699  * segment.start value is updated, a seek to this new position
700  * should be performed.
701  *
702  * The seek can only be executed when we are not currently
703  * streaming any data, to make sure that this is the case, we
704  * acquire the STREAM_LOCK which is taken when we are in the
705  * _loop() function or when a getrange() is called. Normally
706  * we will not receive a seek if we are operating in pull mode
707  * though.
708  *
709  * When we are in the loop() function, we might be in the middle
710  * of pushing a buffer, which might block in a sink. To make sure
711  * that the push gets unblocked we push out a FLUSH_START event.
712  * Our loop function will get a WRONG_STATE return value from
713  * the push and will pause, effectively releasing the STREAM_LOCK.
714  *
715  * For a non-flushing seek, we pause the task, which might eventually
716  * release the STREAM_LOCK. We say eventually because when the sink
717  * blocks on the sample we might wait a very long time until the sink
718  * unblocks the sample. In any case we acquire the STREAM_LOCK and
719  * can continue the seek. A non-flushing seek is normally done in a 
720  * running pipeline to perform seamless playback.
721  * In the case of a non-flushing seek we need to make sure that the
722  * data we output after the seek is continuous with the previous data,
723  * this is because a non-flushing seek does not reset the stream-time
724  * to 0. We do this by closing the currently running segment, ie. sending
725  * a new_segment event with the stop position set to the last processed 
726  * position.
727  *
728  * After updating the segment.start/stop values, we prepare for
729  * streaming again. We push out a FLUSH_STOP to make the peer pad
730  * accept data again and we start our task again.
731  *
732  * A segment seek posts a message on the bus saying that the playback
733  * of the segment started. We store the segment flag internally because
734  * when we reach the segment.stop we have to post a segment.done
735  * instead of EOS when doing a segment seek.
736  */
737 /* FIXME, we have the unlock gboolean here because most current implementations
738  * (fdsrc, -base/gst/tcp/, ...) not only unlock when there is something to
739  * unlock
740  */
741 static gboolean
742 gst_base_src_perform_seek (GstBaseSrc * src, GstEvent * event, gboolean unlock)
743 {
744   gboolean res;
745   gdouble rate;
746   GstFormat format;
747   GstSeekFlags flags;
748   GstSeekType cur_type, stop_type;
749   gint64 cur, stop;
750   gboolean flush;
751   gboolean update;
752   GstSegment seeksegment;
753
754   GST_DEBUG_OBJECT (src, "doing seek");
755
756   if (event) {
757     gst_event_parse_seek (event, &rate, &format, &flags,
758         &cur_type, &cur, &stop_type, &stop);
759
760     /* we have to have a format as the segment format. Try to convert
761      * if not. */
762     if (src->segment.format != format) {
763       GstFormat fmt;
764
765       fmt = src->segment.format;
766       res = TRUE;
767       if (cur_type != GST_SEEK_TYPE_NONE)
768         res = gst_pad_query_convert (src->srcpad, format, cur, &fmt, &cur);
769       if (res && stop_type != GST_SEEK_TYPE_NONE)
770         res = gst_pad_query_convert (src->srcpad, format, stop, &fmt, &stop);
771       if (!res)
772         goto no_format;
773
774       format = fmt;
775     }
776   } else {
777     flags = 0;
778   }
779
780   flush = flags & GST_SEEK_FLAG_FLUSH;
781
782   /* send flush start */
783   if (flush)
784     gst_pad_push_event (src->srcpad, gst_event_new_flush_start ());
785   else
786     gst_pad_pause_task (src->srcpad);
787
788   /* unblock streaming thread */
789   if (unlock)
790     gst_base_src_unlock (src);
791
792   /* grab streaming lock, this should eventually be possible, either
793    * because the task is paused or out streaming thread stopped 
794    * because our peer is flushing. */
795   GST_PAD_STREAM_LOCK (src->srcpad);
796
797   /* make copy into temp structure, we can only update the main one
798    * when the subclass actually could to the seek. */
799   memcpy (&seeksegment, &src->segment, sizeof (GstSegment));
800
801   /* now configure the seek segment */
802   if (event) {
803     gst_segment_set_seek (&seeksegment, rate, format, flags,
804         cur_type, cur, stop_type, stop, &update);
805   }
806
807   GST_DEBUG_OBJECT (src, "segment configured from %" G_GINT64_FORMAT
808       " to %" G_GINT64_FORMAT ", position %" G_GINT64_FORMAT,
809       seeksegment.start, seeksegment.stop, seeksegment.last_stop);
810
811   /* do the seek, segment.last_stop contains new position. */
812   res = gst_base_src_do_seek (src, &seeksegment);
813
814   /* and prepare to continue streaming */
815   if (flush) {
816     /* send flush stop, peer will accept data and events again. We
817      * are not yet providing data as we still have the STREAM_LOCK. */
818     gst_pad_push_event (src->srcpad, gst_event_new_flush_stop ());
819   } else if (res && src->data.ABI.running) {
820     /* we are running the current segment and doing a non-flushing seek, 
821      * close the segment first based on the last_stop. */
822     GST_DEBUG_OBJECT (src, "closing running segment %" G_GINT64_FORMAT
823         " to %" G_GINT64_FORMAT, src->segment.start, src->segment.last_stop);
824
825     gst_pad_push_event (src->srcpad,
826         gst_event_new_new_segment (TRUE,
827             src->segment.rate, src->segment.format,
828             src->segment.start, src->segment.last_stop, src->segment.time));
829   }
830
831   /* if successfull seek, we update our real segment and push
832    * out the new segment. */
833   if (res) {
834     memcpy (&src->segment, &seeksegment, sizeof (GstSegment));
835
836     if (src->segment.flags & GST_SEEK_FLAG_SEGMENT) {
837       gst_element_post_message (GST_ELEMENT (src),
838           gst_message_new_segment_start (GST_OBJECT (src),
839               src->segment.format, src->segment.last_stop));
840     }
841
842     if ((stop = src->segment.stop) == -1)
843       stop = src->segment.duration;
844
845     /* now send the newsegment */
846     GST_DEBUG_OBJECT (src, "Sending newsegment from %" G_GINT64_FORMAT
847         " to %" G_GINT64_FORMAT, src->segment.start, stop);
848
849     gst_pad_push_event (src->srcpad,
850         gst_event_new_new_segment (FALSE,
851             src->segment.rate, src->segment.format,
852             src->segment.last_stop, stop, src->segment.time));
853   }
854
855   src->priv->discont = TRUE;
856   src->data.ABI.running = TRUE;
857   /* and restart the task in case it got paused explicitely or by
858    * the FLUSH_START event we pushed out. */
859   gst_pad_start_task (src->srcpad, (GstTaskFunction) gst_base_src_loop,
860       src->srcpad);
861
862   /* and release the lock again so we can continue streaming */
863   GST_PAD_STREAM_UNLOCK (src->srcpad);
864
865   return res;
866
867   /* ERROR */
868 no_format:
869   {
870     GST_DEBUG_OBJECT (src, "undefined format given, seek aborted.");
871     return FALSE;
872   }
873 }
874
875 /* all events send to this element directly
876  */
877 static gboolean
878 gst_base_src_send_event (GstElement * element, GstEvent * event)
879 {
880   GstBaseSrc *src;
881   gboolean result = FALSE;
882
883   src = GST_BASE_SRC (element);
884
885   switch (GST_EVENT_TYPE (event)) {
886     case GST_EVENT_FLUSH_START:
887     case GST_EVENT_FLUSH_STOP:
888       break;
889     case GST_EVENT_EOS:
890     case GST_EVENT_NEWSEGMENT:
891     case GST_EVENT_TAG:
892     case GST_EVENT_BUFFERSIZE:
893       break;
894     case GST_EVENT_QOS:
895       break;
896     case GST_EVENT_SEEK:
897     {
898       gboolean started;
899
900       GST_OBJECT_LOCK (src->srcpad);
901       if (GST_PAD_ACTIVATE_MODE (src->srcpad) == GST_ACTIVATE_PULL)
902         goto wrong_mode;
903       started = GST_PAD_ACTIVATE_MODE (src->srcpad) == GST_ACTIVATE_PUSH;
904       GST_OBJECT_UNLOCK (src->srcpad);
905
906       if (started) {
907         /* when we are running in push mode, we can execute the
908          * seek right now, we need to unlock. */
909         result = gst_base_src_perform_seek (src, event, TRUE);
910       } else {
911         /* else we store the event and execute the seek when we
912          * get activated */
913         GST_OBJECT_LOCK (src);
914         gst_event_replace (&src->data.ABI.pending_seek, event);
915         GST_OBJECT_UNLOCK (src);
916         /* assume the seek will work */
917         result = TRUE;
918       }
919       break;
920     }
921     case GST_EVENT_NAVIGATION:
922       break;
923     default:
924       break;
925   }
926 done:
927   gst_event_unref (event);
928
929   return result;
930
931   /* ERRORS */
932 wrong_mode:
933   {
934     GST_DEBUG_OBJECT (src, "cannot perform seek when operating in pull mode");
935     GST_OBJECT_UNLOCK (src->srcpad);
936     result = FALSE;
937     goto done;
938   }
939 }
940
941 static gboolean
942 gst_base_src_default_event (GstBaseSrc * src, GstEvent * event)
943 {
944   gboolean result;
945
946   switch (GST_EVENT_TYPE (event)) {
947     case GST_EVENT_SEEK:
948       /* is normally called when in push mode */
949       if (!src->seekable)
950         goto not_seekable;
951
952       result = gst_base_src_perform_seek (src, event, TRUE);
953       break;
954     case GST_EVENT_FLUSH_START:
955       /* cancel any blocking getrange, is normally called
956        * when in pull mode. */
957       result = gst_base_src_unlock (src);
958       break;
959     case GST_EVENT_FLUSH_STOP:
960     default:
961       result = TRUE;
962       break;
963   }
964   return result;
965
966   /* ERRORS */
967 not_seekable:
968   {
969     GST_DEBUG_OBJECT (src, "is not seekable");
970     return FALSE;
971   }
972 }
973
974 static gboolean
975 gst_base_src_event_handler (GstPad * pad, GstEvent * event)
976 {
977   GstBaseSrc *src;
978   GstBaseSrcClass *bclass;
979   gboolean result = FALSE;
980
981   src = GST_BASE_SRC (gst_pad_get_parent (pad));
982   bclass = GST_BASE_SRC_GET_CLASS (src);
983
984   if (bclass->event) {
985     if (!(result = bclass->event (src, event)))
986       goto subclass_failed;
987   }
988
989 done:
990   gst_event_unref (event);
991   gst_object_unref (src);
992
993   return result;
994
995   /* ERRORS */
996 subclass_failed:
997   {
998     GST_DEBUG_OBJECT (src, "subclass refused event");
999     goto done;
1000   }
1001 }
1002
1003 static void
1004 gst_base_src_set_property (GObject * object, guint prop_id,
1005     const GValue * value, GParamSpec * pspec)
1006 {
1007   GstBaseSrc *src;
1008
1009   src = GST_BASE_SRC (object);
1010
1011   switch (prop_id) {
1012     case PROP_BLOCKSIZE:
1013       src->blocksize = g_value_get_ulong (value);
1014       break;
1015     case PROP_NUM_BUFFERS:
1016       src->num_buffers = g_value_get_int (value);
1017       break;
1018     case PROP_TYPEFIND:
1019       src->data.ABI.typefind = g_value_get_boolean (value);
1020       break;
1021     default:
1022       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1023       break;
1024   }
1025 }
1026
1027 static void
1028 gst_base_src_get_property (GObject * object, guint prop_id, GValue * value,
1029     GParamSpec * pspec)
1030 {
1031   GstBaseSrc *src;
1032
1033   src = GST_BASE_SRC (object);
1034
1035   switch (prop_id) {
1036     case PROP_BLOCKSIZE:
1037       g_value_set_ulong (value, src->blocksize);
1038       break;
1039     case PROP_NUM_BUFFERS:
1040       g_value_set_int (value, src->num_buffers);
1041       break;
1042     case PROP_TYPEFIND:
1043       g_value_set_boolean (value, src->data.ABI.typefind);
1044       break;
1045     default:
1046       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1047       break;
1048   }
1049 }
1050
1051 /* with STREAM_LOCK and LOCK*/
1052 static GstClockReturn
1053 gst_base_src_wait (GstBaseSrc * basesrc, GstClockTime time)
1054 {
1055   GstClockReturn ret;
1056   GstClockID id;
1057   GstClock *clock;
1058
1059   /* get clock, if no clock, we don't sync */
1060   if ((clock = GST_ELEMENT_CLOCK (basesrc)) == NULL)
1061     return GST_CLOCK_OK;
1062
1063   /* clock_id should be NULL outside of this function */
1064   g_assert (basesrc->clock_id == NULL);
1065   g_assert (GST_CLOCK_TIME_IS_VALID (time));
1066
1067   id = gst_clock_new_single_shot_id (clock, time);
1068
1069   basesrc->clock_id = id;
1070   /* release the object lock while waiting */
1071   GST_OBJECT_UNLOCK (basesrc);
1072
1073   ret = gst_clock_id_wait (id, NULL);
1074
1075   GST_OBJECT_LOCK (basesrc);
1076   gst_clock_id_unref (id);
1077   basesrc->clock_id = NULL;
1078
1079   return ret;
1080 }
1081
1082 /* perform synchronisation on a buffer. 
1083  * with STREAM_LOCK.
1084  */
1085 static GstClockReturn
1086 gst_base_src_do_sync (GstBaseSrc * basesrc, GstBuffer * buffer)
1087 {
1088   GstClockReturn result;
1089   GstClockTime start, end;
1090   GstBaseSrcClass *bclass;
1091   GstClockTime base_time;
1092
1093   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
1094
1095   start = end = -1;
1096   if (bclass->get_times)
1097     bclass->get_times (basesrc, buffer, &start, &end);
1098
1099   /* if we don't have a timestamp, we don't sync */
1100   if (!GST_CLOCK_TIME_IS_VALID (start))
1101     goto invalid_start;
1102
1103   /* now do clocking */
1104   GST_OBJECT_LOCK (basesrc);
1105   base_time = GST_ELEMENT_CAST (basesrc)->base_time;
1106
1107   GST_LOG_OBJECT (basesrc,
1108       "waiting for clock, base time %" GST_TIME_FORMAT
1109       ", stream_start %" GST_TIME_FORMAT,
1110       GST_TIME_ARGS (base_time), GST_TIME_ARGS (start));
1111
1112   result = gst_base_src_wait (basesrc, start + base_time);
1113   GST_OBJECT_UNLOCK (basesrc);
1114
1115   GST_LOG_OBJECT (basesrc, "clock entry done: %d", result);
1116
1117   return result;
1118
1119   /* special cases */
1120 invalid_start:
1121   {
1122     GST_DEBUG_OBJECT (basesrc, "get_times returned invalid start");
1123     return GST_CLOCK_OK;
1124   }
1125 }
1126
1127 static gboolean
1128 gst_base_src_update_length (GstBaseSrc * src, guint64 offset, guint * length)
1129 {
1130   guint64 size, maxsize;
1131   GstBaseSrcClass *bclass;
1132
1133   bclass = GST_BASE_SRC_GET_CLASS (src);
1134
1135   /* only operate if we are working with bytes */
1136   if (src->segment.format != GST_FORMAT_BYTES)
1137     return TRUE;
1138
1139   size = (guint64) src->segment.duration;
1140
1141   /* the max amount of bytes to read is the total size or
1142    * up to the segment.stop if present. */
1143   if (src->segment.stop != -1)
1144     maxsize = MIN (size, src->segment.stop);
1145   else
1146     maxsize = size;
1147
1148   GST_DEBUG_OBJECT (src,
1149       "reading offset %" G_GUINT64_FORMAT ", length %u, size %" G_GINT64_FORMAT
1150       ", segment.stop %" G_GINT64_FORMAT ", maxsize %" G_GINT64_FORMAT, offset,
1151       *length, size, src->segment.stop, maxsize);
1152
1153   /* check size */
1154   if (maxsize != -1) {
1155     if (offset > maxsize)
1156       goto unexpected_length;
1157
1158     if (offset + *length > maxsize) {
1159       /* see if length of the file changed */
1160       if (bclass->get_size)
1161         bclass->get_size (src, &size);
1162
1163       if (src->segment.stop != -1)
1164         maxsize = MIN (size, src->segment.stop);
1165       else
1166         maxsize = size;
1167
1168       if (offset + *length > maxsize) {
1169         *length = maxsize - offset;
1170       }
1171     }
1172   }
1173   if (*length == 0)
1174     goto unexpected_length;
1175
1176   return TRUE;
1177
1178   /* ERRORS */
1179 unexpected_length:
1180   {
1181     return FALSE;
1182   }
1183 }
1184
1185 static GstFlowReturn
1186 gst_base_src_get_range (GstBaseSrc * src, guint64 offset, guint length,
1187     GstBuffer ** buf)
1188 {
1189   GstFlowReturn ret;
1190   GstBaseSrcClass *bclass;
1191   GstClockReturn status;
1192
1193   bclass = GST_BASE_SRC_GET_CLASS (src);
1194
1195   GST_LIVE_LOCK (src);
1196   if (src->is_live) {
1197     while (!src->live_running) {
1198       GST_DEBUG ("live source signal waiting");
1199       GST_LIVE_SIGNAL (src);
1200       GST_DEBUG ("live source waiting for running state");
1201       GST_LIVE_WAIT (src);
1202       GST_DEBUG ("live source unlocked");
1203     }
1204     /* FIXME, use another variable to signal stopping */
1205     GST_OBJECT_LOCK (src->srcpad);
1206     if (GST_PAD_IS_FLUSHING (src->srcpad))
1207       goto flushing;
1208     GST_OBJECT_UNLOCK (src->srcpad);
1209   }
1210   GST_LIVE_UNLOCK (src);
1211
1212   if (!GST_OBJECT_FLAG_IS_SET (src, GST_BASE_SRC_STARTED))
1213     goto not_started;
1214
1215   if (G_UNLIKELY (!bclass->create))
1216     goto no_function;
1217
1218   if (!gst_base_src_update_length (src, offset, &length))
1219     goto unexpected_length;
1220
1221   if (src->num_buffers_left == 0) {
1222     goto reached_num_buffers;
1223   } else {
1224     if (src->num_buffers_left > 0)
1225       src->num_buffers_left--;
1226   }
1227
1228   GST_DEBUG_OBJECT (src,
1229       "calling create offset %" G_GUINT64_FORMAT " %" G_GINT64_FORMAT, offset,
1230       src->segment.time);
1231
1232   ret = bclass->create (src, offset, length, buf);
1233   if (ret != GST_FLOW_OK)
1234     goto done;
1235
1236   /* no timestamp set and we are at offset 0 */
1237   if (offset == 0 && src->segment.time == 0
1238       && GST_BUFFER_TIMESTAMP (*buf) == -1)
1239     GST_BUFFER_TIMESTAMP (*buf) = 0;
1240
1241   /* now sync before pushing the buffer */
1242   status = gst_base_src_do_sync (src, *buf);
1243   switch (status) {
1244     case GST_CLOCK_EARLY:
1245       GST_DEBUG_OBJECT (src, "buffer too late!, returning anyway");
1246       break;
1247     case GST_CLOCK_OK:
1248       GST_DEBUG_OBJECT (src, "buffer ok");
1249       break;
1250     default:
1251       GST_DEBUG_OBJECT (src, "clock returned %d, not returning", status);
1252       gst_buffer_unref (*buf);
1253       *buf = NULL;
1254       ret = GST_FLOW_WRONG_STATE;
1255       break;
1256   }
1257 done:
1258   return ret;
1259
1260   /* ERROR */
1261 flushing:
1262   {
1263     GST_DEBUG_OBJECT (src, "pad is flushing");
1264     GST_OBJECT_UNLOCK (src->srcpad);
1265     GST_LIVE_UNLOCK (src);
1266     return GST_FLOW_WRONG_STATE;
1267   }
1268 not_started:
1269   {
1270     GST_DEBUG_OBJECT (src, "getrange but not started");
1271     return GST_FLOW_WRONG_STATE;
1272   }
1273 no_function:
1274   {
1275     GST_DEBUG_OBJECT (src, "no create function");
1276     return GST_FLOW_ERROR;
1277   }
1278 unexpected_length:
1279   {
1280     GST_DEBUG_OBJECT (src, "unexpected length %u (offset=%" G_GUINT64_FORMAT
1281         ", size=%" G_GINT64_FORMAT ")", length, offset, src->segment.duration);
1282     return GST_FLOW_UNEXPECTED;
1283   }
1284 reached_num_buffers:
1285   {
1286     GST_DEBUG_OBJECT (src, "sent all buffers");
1287     return GST_FLOW_UNEXPECTED;
1288   }
1289 }
1290
1291 static GstFlowReturn
1292 gst_base_src_pad_get_range (GstPad * pad, guint64 offset, guint length,
1293     GstBuffer ** buf)
1294 {
1295   GstBaseSrc *src;
1296   GstFlowReturn res;
1297
1298   src = GST_BASE_SRC (gst_pad_get_parent (pad));
1299
1300   res = gst_base_src_get_range (src, offset, length, buf);
1301
1302   gst_object_unref (src);
1303
1304   return res;
1305 }
1306
1307 static gboolean
1308 gst_base_src_pad_check_get_range (GstPad * pad)
1309 {
1310   GstBaseSrcClass *bclass;
1311   GstBaseSrc *src;
1312   gboolean res;
1313
1314   src = GST_BASE_SRC (gst_pad_get_parent (pad));
1315
1316   bclass = GST_BASE_SRC_GET_CLASS (src);
1317
1318   if (bclass->check_get_range == NULL) {
1319     GST_WARNING_OBJECT (src, "no check_get_range function set");
1320     return FALSE;
1321   }
1322
1323   res = bclass->check_get_range (src);
1324   GST_LOG_OBJECT (src, "%s() returned %d",
1325       GST_DEBUG_FUNCPTR_NAME (bclass->check_get_range), (gint) res);
1326
1327   gst_object_unref (src);
1328
1329   return res;
1330 }
1331
1332 static gboolean
1333 gst_base_src_default_check_get_range (GstBaseSrc * src)
1334 {
1335   gboolean res;
1336
1337   if (!GST_OBJECT_FLAG_IS_SET (src, GST_BASE_SRC_STARTED)) {
1338     GST_LOG_OBJECT (src, "doing start/stop to check get_range support");
1339     gst_base_src_start (src);
1340     gst_base_src_stop (src);
1341   }
1342
1343   /* we can operate in getrange mode if the native format is bytes
1344    * and we are seekable, this condition is set in the random_access
1345    * flag and is set in the _start() method. */
1346   res = src->random_access;
1347
1348   return res;
1349 }
1350
1351 static void
1352 gst_base_src_loop (GstPad * pad)
1353 {
1354   GstBaseSrc *src;
1355   GstBuffer *buf = NULL;
1356   GstFlowReturn ret;
1357   gint64 position;
1358
1359   src = GST_BASE_SRC (gst_pad_get_parent (pad));
1360
1361   src->priv->last_sent_eos = FALSE;
1362
1363   /* if we operate in bytes, we can calculate an offset */
1364   if (src->segment.format == GST_FORMAT_BYTES)
1365     position = src->segment.last_stop;
1366   else
1367     position = -1;
1368
1369   ret = gst_base_src_get_range (src, position, src->blocksize, &buf);
1370   if (G_UNLIKELY (ret != GST_FLOW_OK)) {
1371     if (ret == GST_FLOW_UNEXPECTED)
1372       goto eos;
1373     else
1374       goto pause;
1375   }
1376   if (G_UNLIKELY (buf == NULL))
1377     goto error;
1378
1379   /* figure out the new position */
1380   switch (src->segment.format) {
1381     case GST_FORMAT_BYTES:
1382       position += GST_BUFFER_SIZE (buf);
1383       break;
1384     case GST_FORMAT_TIME:
1385     {
1386       GstClockTime start, duration;
1387
1388       start = GST_BUFFER_TIMESTAMP (buf);
1389       duration = GST_BUFFER_DURATION (buf);
1390
1391       if (GST_CLOCK_TIME_IS_VALID (start))
1392         position = start;
1393       else
1394         position = src->segment.last_stop;
1395
1396       if (GST_CLOCK_TIME_IS_VALID (duration))
1397         position += duration;
1398       break;
1399     }
1400     case GST_FORMAT_DEFAULT:
1401       position = GST_BUFFER_OFFSET_END (buf);
1402       break;
1403     default:
1404       position = -1;
1405       break;
1406   }
1407   if (position != -1)
1408     gst_segment_set_last_stop (&src->segment, src->segment.format, position);
1409
1410   if (G_UNLIKELY (src->priv->discont)) {
1411     buf = gst_buffer_make_metadata_writable (buf);
1412     GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_DISCONT);
1413     src->priv->discont = FALSE;
1414   }
1415
1416   ret = gst_pad_push (pad, buf);
1417   if (G_UNLIKELY (ret != GST_FLOW_OK))
1418     goto pause;
1419
1420 done:
1421   gst_object_unref (src);
1422   return;
1423
1424   /* special cases */
1425 eos:
1426   {
1427     GST_DEBUG_OBJECT (src, "going to EOS, getrange returned UNEXPECTED");
1428     /* we finished the segment */
1429     src->data.ABI.running = FALSE;
1430     gst_pad_pause_task (pad);
1431     if (src->segment.flags & GST_SEEK_FLAG_SEGMENT) {
1432       gst_element_post_message (GST_ELEMENT (src),
1433           gst_message_new_segment_done (GST_OBJECT (src),
1434               src->segment.format, src->segment.last_stop));
1435     } else {
1436       gst_pad_push_event (pad, gst_event_new_eos ());
1437       src->priv->last_sent_eos = TRUE;
1438     }
1439     goto done;
1440   }
1441 pause:
1442   {
1443     const gchar *reason = gst_flow_get_name (ret);
1444
1445     GST_DEBUG_OBJECT (src, "pausing task, reason %s", reason);
1446     gst_pad_pause_task (pad);
1447     if (GST_FLOW_IS_FATAL (ret) || ret == GST_FLOW_NOT_LINKED) {
1448       /* for fatal errors we post an error message */
1449       GST_ELEMENT_ERROR (src, STREAM, FAILED,
1450           (_("Internal data flow error.")),
1451           ("streaming task paused, reason %s", reason));
1452       gst_pad_push_event (pad, gst_event_new_eos ());
1453       src->priv->last_sent_eos = TRUE;
1454     }
1455     goto done;
1456   }
1457 error:
1458   {
1459     GST_ELEMENT_ERROR (src, STREAM, FAILED,
1460         (_("Internal data flow error.")), ("element returned NULL buffer"));
1461     /* we finished the segment on error */
1462     src->data.ABI.running = FALSE;
1463     gst_pad_pause_task (pad);
1464     gst_pad_push_event (pad, gst_event_new_eos ());
1465     src->priv->last_sent_eos = TRUE;
1466     goto done;
1467   }
1468 }
1469
1470 /* this will always be called between start() and stop(). So you can rely on
1471  * resources allocated by start() and freed from stop(). This needs to be added
1472  * to the docs at some point. */
1473 static gboolean
1474 gst_base_src_unlock (GstBaseSrc * basesrc)
1475 {
1476   GstBaseSrcClass *bclass;
1477   gboolean result = TRUE;
1478
1479   GST_DEBUG ("unlock");
1480   /* unblock whatever the subclass is doing */
1481   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
1482   if (bclass->unlock)
1483     result = bclass->unlock (basesrc);
1484
1485   GST_DEBUG ("unschedule clock");
1486   /* and unblock the clock as well, if any */
1487   GST_OBJECT_LOCK (basesrc);
1488   if (basesrc->clock_id) {
1489     gst_clock_id_unschedule (basesrc->clock_id);
1490   }
1491   GST_OBJECT_UNLOCK (basesrc);
1492
1493   GST_DEBUG ("unlock done");
1494
1495   return result;
1496 }
1497
1498 /* default negotiation code. 
1499  *
1500  * Take intersection between src and sink pads, take first
1501  * caps and fixate. 
1502  */
1503 static gboolean
1504 gst_base_src_default_negotiate (GstBaseSrc * basesrc)
1505 {
1506   GstCaps *thiscaps;
1507   GstCaps *caps = NULL;
1508   GstCaps *peercaps = NULL;
1509   gboolean result = FALSE;
1510
1511   /* first see what is possible on our source pad */
1512   thiscaps = gst_pad_get_caps (GST_BASE_SRC_PAD (basesrc));
1513   GST_DEBUG_OBJECT (basesrc, "caps of src: %" GST_PTR_FORMAT, thiscaps);
1514   /* nothing or anything is allowed, we're done */
1515   if (thiscaps == NULL || gst_caps_is_any (thiscaps))
1516     goto no_nego_needed;
1517
1518   /* get the peer caps */
1519   peercaps = gst_pad_peer_get_caps (GST_BASE_SRC_PAD (basesrc));
1520   GST_DEBUG_OBJECT (basesrc, "caps of peer: %" GST_PTR_FORMAT, peercaps);
1521   if (peercaps) {
1522     GstCaps *icaps;
1523
1524     /* get intersection */
1525     icaps = gst_caps_intersect (thiscaps, peercaps);
1526     GST_DEBUG_OBJECT (basesrc, "intersect: %" GST_PTR_FORMAT, icaps);
1527     gst_caps_unref (thiscaps);
1528     gst_caps_unref (peercaps);
1529     if (icaps) {
1530       /* take first (and best, since they are sorted) possibility */
1531       caps = gst_caps_copy_nth (icaps, 0);
1532       gst_caps_unref (icaps);
1533     }
1534   } else {
1535     /* no peer, work with our own caps then */
1536     caps = thiscaps;
1537   }
1538   if (caps) {
1539     caps = gst_caps_make_writable (caps);
1540     gst_caps_truncate (caps);
1541
1542     /* now fixate */
1543     if (!gst_caps_is_empty (caps)) {
1544       gst_pad_fixate_caps (GST_BASE_SRC_PAD (basesrc), caps);
1545       GST_DEBUG_OBJECT (basesrc, "fixated to: %" GST_PTR_FORMAT, caps);
1546
1547       if (gst_caps_is_any (caps)) {
1548         /* hmm, still anything, so element can do anything and
1549          * nego is not needed */
1550         result = TRUE;
1551       } else if (gst_caps_is_fixed (caps)) {
1552         /* yay, fixed caps, use those then */
1553         gst_pad_set_caps (GST_BASE_SRC_PAD (basesrc), caps);
1554         result = TRUE;
1555       }
1556     }
1557     gst_caps_unref (caps);
1558   }
1559   return result;
1560
1561 no_nego_needed:
1562   {
1563     GST_DEBUG_OBJECT (basesrc, "no negotiation needed");
1564     if (thiscaps)
1565       gst_caps_unref (thiscaps);
1566     return TRUE;
1567   }
1568 }
1569
1570 static gboolean
1571 gst_base_src_negotiate (GstBaseSrc * basesrc)
1572 {
1573   GstBaseSrcClass *bclass;
1574   gboolean result = TRUE;
1575
1576   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
1577
1578   if (bclass->negotiate)
1579     result = bclass->negotiate (basesrc);
1580
1581   return result;
1582 }
1583
1584 static gboolean
1585 gst_base_src_start (GstBaseSrc * basesrc)
1586 {
1587   GstBaseSrcClass *bclass;
1588   gboolean result;
1589   guint64 size;
1590
1591   if (GST_OBJECT_FLAG_IS_SET (basesrc, GST_BASE_SRC_STARTED))
1592     return TRUE;
1593
1594   GST_DEBUG_OBJECT (basesrc, "starting source");
1595
1596   basesrc->num_buffers_left = basesrc->num_buffers;
1597
1598   gst_segment_init (&basesrc->segment, basesrc->segment.format);
1599   basesrc->data.ABI.running = FALSE;
1600
1601   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
1602   if (bclass->start)
1603     result = bclass->start (basesrc);
1604   else
1605     result = TRUE;
1606
1607   if (!result)
1608     goto could_not_start;
1609
1610   GST_OBJECT_FLAG_SET (basesrc, GST_BASE_SRC_STARTED);
1611
1612   /* figure out the size */
1613   if (basesrc->segment.format == GST_FORMAT_BYTES) {
1614     if (bclass->get_size) {
1615       if (!(result = bclass->get_size (basesrc, &size)))
1616         size = -1;
1617     } else {
1618       result = FALSE;
1619       size = -1;
1620     }
1621     /* only update the size when operating in bytes, subclass is supposed
1622      * to set duration in the start method for other formats */
1623     gst_segment_set_duration (&basesrc->segment, GST_FORMAT_BYTES, size);
1624   }
1625
1626   GST_DEBUG_OBJECT (basesrc,
1627       "format: %d, have size: %d, size: %" G_GUINT64_FORMAT ", duration: %"
1628       G_GINT64_FORMAT, basesrc->segment.format, result, size,
1629       basesrc->segment.duration);
1630
1631   /* check if we can seek */
1632   if (bclass->is_seekable)
1633     basesrc->seekable = bclass->is_seekable (basesrc);
1634   else
1635     basesrc->seekable = FALSE;
1636
1637   GST_DEBUG_OBJECT (basesrc, "is seekable: %d", basesrc->seekable);
1638
1639   /* update for random access flag */
1640   basesrc->random_access = basesrc->seekable &&
1641       basesrc->segment.format == GST_FORMAT_BYTES;
1642
1643   GST_DEBUG_OBJECT (basesrc, "is random_access: %d", basesrc->random_access);
1644
1645   /* run typefind if we are random_access and the typefinding is enabled. */
1646   if (basesrc->random_access && basesrc->data.ABI.typefind && size != -1) {
1647     GstCaps *caps;
1648
1649     caps = gst_type_find_helper (basesrc->srcpad, size);
1650     gst_pad_set_caps (basesrc->srcpad, caps);
1651     gst_caps_unref (caps);
1652   } else {
1653     /* use class or default negotiate function */
1654     if (!gst_base_src_negotiate (basesrc))
1655       goto could_not_negotiate;
1656   }
1657
1658   return TRUE;
1659
1660   /* ERROR */
1661 could_not_start:
1662   {
1663     GST_DEBUG_OBJECT (basesrc, "could not start");
1664     return FALSE;
1665   }
1666 could_not_negotiate:
1667   {
1668     GST_DEBUG_OBJECT (basesrc, "could not negotiate, stopping");
1669     GST_ELEMENT_ERROR (basesrc, STREAM, FORMAT,
1670         ("Could not negotiate format"), ("Check your filtered caps, if any"));
1671     gst_base_src_stop (basesrc);
1672     return FALSE;
1673   }
1674 }
1675
1676 static gboolean
1677 gst_base_src_stop (GstBaseSrc * basesrc)
1678 {
1679   GstBaseSrcClass *bclass;
1680   gboolean result = TRUE;
1681
1682   if (!GST_OBJECT_FLAG_IS_SET (basesrc, GST_BASE_SRC_STARTED))
1683     return TRUE;
1684
1685   GST_DEBUG_OBJECT (basesrc, "stopping source");
1686
1687   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
1688   if (bclass->stop)
1689     result = bclass->stop (basesrc);
1690
1691   if (result)
1692     GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_STARTED);
1693
1694   return result;
1695 }
1696
1697 static gboolean
1698 gst_base_src_deactivate (GstBaseSrc * basesrc, GstPad * pad)
1699 {
1700   gboolean result;
1701
1702   GST_LIVE_LOCK (basesrc);
1703   basesrc->live_running = TRUE;
1704   GST_LIVE_SIGNAL (basesrc);
1705   GST_LIVE_UNLOCK (basesrc);
1706
1707   /* step 1, unblock clock sync (if any) */
1708   result = gst_base_src_unlock (basesrc);
1709
1710   /* step 2, make sure streaming finishes */
1711   result &= gst_pad_stop_task (pad);
1712
1713   return result;
1714 }
1715
1716 static gboolean
1717 gst_base_src_activate_push (GstPad * pad, gboolean active)
1718 {
1719   GstBaseSrc *basesrc;
1720   gboolean res;
1721
1722   basesrc = GST_BASE_SRC (GST_OBJECT_PARENT (pad));
1723
1724   /* prepare subclass first */
1725   if (active) {
1726     GstEvent *event;
1727
1728     GST_DEBUG_OBJECT (basesrc, "Activating in push mode");
1729
1730     if (!basesrc->can_activate_push)
1731       goto no_push_activation;
1732
1733     if (!gst_base_src_start (basesrc))
1734       goto error_start;
1735
1736     basesrc->priv->last_sent_eos = FALSE;
1737
1738     /* do initial seek, which will start the task */
1739     GST_OBJECT_LOCK (basesrc);
1740     event = basesrc->data.ABI.pending_seek;
1741     basesrc->data.ABI.pending_seek = NULL;
1742     GST_OBJECT_UNLOCK (basesrc);
1743
1744     /* no need to unlock anything, the task is certainly
1745      * not running here. */
1746     res = gst_base_src_perform_seek (basesrc, event, FALSE);
1747
1748     if (event)
1749       gst_event_unref (event);
1750   } else {
1751     GST_DEBUG_OBJECT (basesrc, "Deactivating in push mode");
1752     res = gst_base_src_deactivate (basesrc, pad);
1753   }
1754   return res;
1755
1756   /* ERRORS */
1757 no_push_activation:
1758   {
1759     GST_DEBUG_OBJECT (basesrc, "Subclass disabled push-mode activation");
1760     return FALSE;
1761   }
1762 error_start:
1763   {
1764     gst_base_src_stop (basesrc);
1765     GST_DEBUG_OBJECT (basesrc, "Failed to start in push mode");
1766     return FALSE;
1767   }
1768 }
1769
1770 static gboolean
1771 gst_base_src_activate_pull (GstPad * pad, gboolean active)
1772 {
1773   GstBaseSrc *basesrc;
1774
1775   basesrc = GST_BASE_SRC (GST_OBJECT_PARENT (pad));
1776
1777   /* prepare subclass first */
1778   if (active) {
1779     GST_DEBUG_OBJECT (basesrc, "Activating in pull mode");
1780     if (!gst_base_src_start (basesrc))
1781       goto error_start;
1782
1783     /* if not random_access, we cannot operate in pull mode for now */
1784     if (!basesrc->random_access) {
1785       gst_base_src_stop (basesrc);
1786       return FALSE;
1787     }
1788     return TRUE;
1789   } else {
1790     GST_DEBUG_OBJECT (basesrc, "Deactivating in pull mode");
1791
1792     /* don't send EOS when going from PAUSED => READY when in pull mode */
1793     basesrc->priv->last_sent_eos = TRUE;
1794
1795     if (!gst_base_src_stop (basesrc))
1796       goto error_stop;
1797
1798     return gst_base_src_deactivate (basesrc, pad);
1799   }
1800
1801 error_start:
1802   {
1803     gst_base_src_stop (basesrc);
1804     GST_DEBUG_OBJECT (basesrc, "Failed to start in pull mode");
1805     return FALSE;
1806   }
1807 error_stop:
1808   {
1809     GST_DEBUG_OBJECT (basesrc, "Failed to stop in pull mode");
1810     return FALSE;
1811   }
1812 }
1813
1814 static GstStateChangeReturn
1815 gst_base_src_change_state (GstElement * element, GstStateChange transition)
1816 {
1817   GstBaseSrc *basesrc;
1818   GstStateChangeReturn result;
1819   gboolean no_preroll = FALSE;
1820
1821   basesrc = GST_BASE_SRC (element);
1822
1823   switch (transition) {
1824     case GST_STATE_CHANGE_NULL_TO_READY:
1825       break;
1826     case GST_STATE_CHANGE_READY_TO_PAUSED:
1827       GST_LIVE_LOCK (element);
1828       if (basesrc->is_live) {
1829         no_preroll = TRUE;
1830         basesrc->live_running = FALSE;
1831       }
1832       basesrc->priv->last_sent_eos = FALSE;
1833       basesrc->priv->discont = TRUE;
1834       GST_LIVE_UNLOCK (element);
1835       break;
1836     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
1837       GST_LIVE_LOCK (element);
1838       if (basesrc->is_live) {
1839         basesrc->live_running = TRUE;
1840         GST_LIVE_SIGNAL (element);
1841       }
1842       GST_LIVE_UNLOCK (element);
1843       break;
1844     default:
1845       break;
1846   }
1847
1848   if ((result =
1849           GST_ELEMENT_CLASS (parent_class)->change_state (element,
1850               transition)) == GST_STATE_CHANGE_FAILURE)
1851     goto failure;
1852
1853   switch (transition) {
1854     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
1855       GST_LIVE_LOCK (element);
1856       if (basesrc->is_live) {
1857         no_preroll = TRUE;
1858         basesrc->live_running = FALSE;
1859       }
1860       GST_LIVE_UNLOCK (element);
1861       break;
1862     case GST_STATE_CHANGE_PAUSED_TO_READY:
1863       if (!gst_base_src_stop (basesrc))
1864         goto error_stop;
1865
1866       if (!basesrc->priv->last_sent_eos) {
1867         GST_DEBUG_OBJECT (basesrc, "Sending EOS event");
1868         gst_pad_push_event (basesrc->srcpad, gst_event_new_eos ());
1869         basesrc->priv->last_sent_eos = TRUE;
1870       }
1871       gst_event_replace (&basesrc->data.ABI.pending_seek, NULL);
1872       break;
1873     case GST_STATE_CHANGE_READY_TO_NULL:
1874       break;
1875     default:
1876       break;
1877   }
1878
1879   if (no_preroll && result == GST_STATE_CHANGE_SUCCESS)
1880     result = GST_STATE_CHANGE_NO_PREROLL;
1881
1882   return result;
1883
1884   /* ERRORS */
1885 failure:
1886   {
1887     GST_DEBUG_OBJECT (basesrc, "parent failed state change");
1888     gst_base_src_stop (basesrc);
1889     return result;
1890   }
1891 error_stop:
1892   {
1893     GST_DEBUG_OBJECT (basesrc, "Failed to stop");
1894     return GST_STATE_CHANGE_FAILURE;
1895   }
1896 }