docs/design/part-live-source.txt: Add docs on how live sources should timestamp.
[platform/upstream/gstreamer.git] / libs / gst / base / gstbasesrc.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *               2000,2005 Wim Taymans <wim@fluendo.com>
4  *
5  * gstbasesrc.c:
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20  * Boston, MA 02111-1307, USA.
21  */
22
23 /**
24  * SECTION:gstbasesrc
25  * @short_description: Base class for getrange based source elements
26  * @see_also: #GstPushSrc, #GstBaseTransform, #GstBaseSink
27  *
28  * This is a generice base class for source elements. The following
29  * types of sources are supported:
30  * <itemizedlist>
31  *   <listitem><para>random access sources like files</para></listitem>
32  *   <listitem><para>seekable sources</para></listitem>
33  *   <listitem><para>live sources</para></listitem>
34  * </itemizedlist>
35  *
36  * <refsect2>
37  * <para>
38  * The source can be configured to operate in any #GstFormat with the
39  * gst_base_src_set_format() method. The currently set format determines 
40  * the format of the internal #GstSegment and any #GST_EVENT_NEWSEGMENT 
41  * events. The default format for #GstBaseSrc is #GST_FORMAT_BYTES.
42  * </para>
43  * <para>
44  * #GstBaseSrc always supports push mode scheduling. If the following
45  * conditions are met, it also supports pull mode scheduling:
46  * <itemizedlist>
47  *   <listitem><para>The format is set to #GST_FORMAT_BYTES (default).</para>
48  *   </listitem>
49  *   <listitem><para>#GstBaseSrc::is_seekable returns %TRUE.</para>
50  *   </listitem>
51  * </itemizedlist>
52  * </para>
53  * <para>
54  * Since 0.10.9, any #GstBaseSrc can enable pull based scheduling at any 
55  * time by overriding #GstBaseSrc::check_get_range so that it returns %TRUE. 
56  * </para>
57  * <para>
58  * If all the conditions are met for operating in pull mode, #GstBaseSrc is
59  * automatically seekable in push mode as well. The following conditions must 
60  * be met to make the element seekable in push mode when the format is not
61  * #GST_FORMAT_BYTES:
62  * <itemizedlist>
63  *   <listitem><para>
64  *     #GstBaseSrc::is_seekable returns %TRUE.
65  *   </para></listitem>
66  *   <listitem><para>
67  *     #GstBaseSrc::query can convert all supported seek formats to the
68  *     internal format as set with gst_base_src_set_format().
69  *   </para></listitem>
70  *   <listitem><para>
71  *     #GstBaseSrc::do_seek is implemented, performs the seek and returns %TRUE.
72  *   </para></listitem>
73  * </itemizedlist>
74  * </para>
75  * <para>
76  * When the element does not meet the requirements to operate in pull mode,
77  * the offset and length in the #GstBaseSrc::create method should be ignored.
78  * It is recommended to subclass #GstPushSrc instead, in this situation. If the
79  * element can operate in pull mode but only with specific offsets and
80  * lengths, it is allowed to generate an error when the wrong values are passed
81  * to the #GstBaseSrc::create function.
82  * </para>
83  * <para>
84  * #GstBaseSrc has support for live sources. Live sources are sources that 
85  * produce data at a fixed rate, such as audio or video capture devices. A 
86  * typical live source also provides a clock to publish the rate at which 
87  * they produce data.
88  * Use gst_base_src_set_live() to activate the live source mode.
89  * </para>
90  * <para>
91  * A live source does not produce data in the PAUSED state. This means that the 
92  * #GstBaseSrc::create method will not be called in PAUSED but only in PLAYING.
93  * To signal the pipeline that the element will not produce data, the return
94  * value from the READY to PAUSED state will be #GST_STATE_CHANGE_NO_PREROLL.
95  * </para>
96  * <para>
97  * A typical live source will timestamp the buffers it creates with the 
98  * current stream time of the pipeline. This is one reason why a live source
99  * can only produce data in the PLAYING state, when the clock is actually 
100  * distributed and running.
101  * </para>
102  * <para>
103  * Live sources that synchronize and block on the clock (an audio source, for
104  * example) can since 0.10.12 use gst_base_src_wait_playing() when the ::create
105  * function was interrupted by a state change to PAUSED.
106  * </para>
107  * <para>
108  * The #GstBaseSrc::get_times method can be used to implement pseudo-live 
109  * sources. The base source will wait for the specified stream time returned in 
110  * #GstBaseSrc::get_times before pushing out the buffer. 
111  * It only makes sense to implement the ::get_times function if the source is 
112  * a live source.
113  * </para>
114  * <para>
115  * For live sources, the base class will by default measure the time it takes to
116  * create the first buffer in the PLAYING state and will report this value as
117  * the latency. Subclasses should override the query function when this
118  * behaviour is not acceptable.
119  * </para>
120  * <para>
121  * There is only support in #GstBaseSrc for exactly one source pad, which 
122  * should be named "src". A source implementation (subclass of #GstBaseSrc) 
123  * should install a pad template in its base_init function, like so:
124  * </para>
125  * <para>
126  * <programlisting>
127  * static void
128  * my_element_base_init (gpointer g_class)
129  * {
130  *   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (g_class);
131  *   // srctemplate should be a #GstStaticPadTemplate with direction
132  *   // #GST_PAD_SRC and name "src"
133  *   gst_element_class_add_pad_template (gstelement_class,
134  *       gst_static_pad_template_get (&amp;srctemplate));
135  *   // see #GstElementDetails
136  *   gst_element_class_set_details (gstelement_class, &amp;details);
137  * }
138  * </programlisting>
139  * </para>
140  * <title>Controlled shutdown of live sources in applications</title>
141  * <para>
142  * Applications that record from a live source may want to stop recording
143  * in a controlled way, so that the recording is stopped, but the data
144  * already in the pipeline is processed to the end (remember that many live
145  * sources would go on recording forever otherwise). For that to happen the
146  * application needs to make the source stop recording and send an EOS
147  * event down the pipeline. The application would then wait for an
148  * EOS message posted on the pipeline's bus to know when all data has
149  * been processed and the pipeline can safely be stopped.
150  * </para>
151  * <para>
152  * Since GStreamer 0.10.3 an application may simply set the source
153  * element to NULL or READY state to make it send an EOS event downstream.
154  * The application should lock the state of the source afterwards, so that
155  * shutting down the pipeline from PLAYING doesn't temporarily start up the
156  * source element for a second time:
157  * <programlisting>
158  * ...
159  * // stop recording
160  * gst_element_set_state (audio_source, #GST_STATE_NULL);
161  * gst_element_set_locked_state (audio_source, %TRUE);
162  * ...
163  * </programlisting>
164  * Now the application should wait for an EOS message
165  * to be posted on the pipeline's bus. Once it has received
166  * an EOS message, it may safely shut down the entire pipeline:
167  * <programlisting>
168  * ...
169  * // everything done - shut down pipeline
170  * gst_element_set_state (pipeline, #GST_STATE_NULL);
171  * gst_element_set_locked_state (audio_source, %FALSE);
172  * ...
173  * </programlisting>
174  * </para>
175  * <para>
176  * Note that setting the source element to NULL or READY when the 
177  * pipeline is in the PAUSED state may cause a deadlock since the streaming
178  * thread might be blocked in PREROLL.
179  * </para>
180  * <para>
181  * Last reviewed on 2006-09-27 (0.10.11)
182  * </para>
183  * </refsect2>
184  */
185
186 #ifdef HAVE_CONFIG_H
187 #  include "config.h"
188 #endif
189
190 #include <stdlib.h>
191 #include <string.h>
192
193 #include "gstbasesrc.h"
194 #include "gsttypefindhelper.h"
195 #include <gst/gstmarshal.h>
196 #include <gst/gst-i18n-lib.h>
197
198 GST_DEBUG_CATEGORY_STATIC (gst_base_src_debug);
199 #define GST_CAT_DEFAULT gst_base_src_debug
200
201 #define GST_LIVE_GET_LOCK(elem)               (GST_BASE_SRC_CAST(elem)->live_lock)
202 #define GST_LIVE_LOCK(elem)                   g_mutex_lock(GST_LIVE_GET_LOCK(elem))
203 #define GST_LIVE_TRYLOCK(elem)                g_mutex_trylock(GST_LIVE_GET_LOCK(elem))
204 #define GST_LIVE_UNLOCK(elem)                 g_mutex_unlock(GST_LIVE_GET_LOCK(elem))
205 #define GST_LIVE_GET_COND(elem)               (GST_BASE_SRC_CAST(elem)->live_cond)
206 #define GST_LIVE_WAIT(elem)                   g_cond_wait (GST_LIVE_GET_COND (elem), GST_LIVE_GET_LOCK (elem))
207 #define GST_LIVE_TIMED_WAIT(elem, timeval)    g_cond_timed_wait (GST_LIVE_GET_COND (elem), GST_LIVE_GET_LOCK (elem),\
208                                                                                 timeval)
209 #define GST_LIVE_SIGNAL(elem)                 g_cond_signal (GST_LIVE_GET_COND (elem));
210 #define GST_LIVE_BROADCAST(elem)              g_cond_broadcast (GST_LIVE_GET_COND (elem));
211
212 /* BaseSrc signals and args */
213 enum
214 {
215   /* FILL ME */
216   LAST_SIGNAL
217 };
218
219 #define DEFAULT_BLOCKSIZE       4096
220 #define DEFAULT_NUM_BUFFERS     -1
221 #define DEFAULT_TYPEFIND        FALSE
222 #define DEFAULT_DO_TIMESTAMP    FALSE
223
224 enum
225 {
226   PROP_0,
227   PROP_BLOCKSIZE,
228   PROP_NUM_BUFFERS,
229   PROP_TYPEFIND,
230   PROP_DO_TIMESTAMP
231 };
232
233 #define GST_BASE_SRC_GET_PRIVATE(obj)  \
234    (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_BASE_SRC, GstBaseSrcPrivate))
235
236 struct _GstBaseSrcPrivate
237 {
238   gboolean last_sent_eos;       /* last thing we did was send an EOS (we set this
239                                  * to avoid the sending of two EOS in some cases) */
240   gboolean discont;
241
242   /* two segments to be sent in the streaming thread with STREAM_LOCK */
243   GstEvent *close_segment;
244   GstEvent *start_segment;
245
246   /* startup latency is the time it takes between going to PLAYING and producing
247    * the first BUFFER with running_time 0. This value is included in the latency
248    * reporting. */
249   GstClockTime startup_latency;
250
251   gboolean do_timestamp;
252 };
253
254 static GstElementClass *parent_class = NULL;
255
256 static void gst_base_src_base_init (gpointer g_class);
257 static void gst_base_src_class_init (GstBaseSrcClass * klass);
258 static void gst_base_src_init (GstBaseSrc * src, gpointer g_class);
259 static void gst_base_src_finalize (GObject * object);
260
261
262 GType
263 gst_base_src_get_type (void)
264 {
265   static GType base_src_type = 0;
266
267   if (G_UNLIKELY (base_src_type == 0)) {
268     static const GTypeInfo base_src_info = {
269       sizeof (GstBaseSrcClass),
270       (GBaseInitFunc) gst_base_src_base_init,
271       NULL,
272       (GClassInitFunc) gst_base_src_class_init,
273       NULL,
274       NULL,
275       sizeof (GstBaseSrc),
276       0,
277       (GInstanceInitFunc) gst_base_src_init,
278     };
279
280     base_src_type = g_type_register_static (GST_TYPE_ELEMENT,
281         "GstBaseSrc", &base_src_info, G_TYPE_FLAG_ABSTRACT);
282   }
283   return base_src_type;
284 }
285 static GstCaps *gst_base_src_getcaps (GstPad * pad);
286 static gboolean gst_base_src_setcaps (GstPad * pad, GstCaps * caps);
287 static void gst_base_src_fixate (GstPad * pad, GstCaps * caps);
288
289 static gboolean gst_base_src_activate_push (GstPad * pad, gboolean active);
290 static gboolean gst_base_src_activate_pull (GstPad * pad, gboolean active);
291 static void gst_base_src_set_property (GObject * object, guint prop_id,
292     const GValue * value, GParamSpec * pspec);
293 static void gst_base_src_get_property (GObject * object, guint prop_id,
294     GValue * value, GParamSpec * pspec);
295 static gboolean gst_base_src_event_handler (GstPad * pad, GstEvent * event);
296 static gboolean gst_base_src_send_event (GstElement * elem, GstEvent * event);
297 static gboolean gst_base_src_default_event (GstBaseSrc * src, GstEvent * event);
298 static const GstQueryType *gst_base_src_get_query_types (GstElement * element);
299
300 static gboolean gst_base_src_query (GstPad * pad, GstQuery * query);
301
302 static gboolean gst_base_src_default_negotiate (GstBaseSrc * basesrc);
303 static gboolean gst_base_src_default_do_seek (GstBaseSrc * src,
304     GstSegment * segment);
305 static gboolean gst_base_src_default_query (GstBaseSrc * src, GstQuery * query);
306 static gboolean gst_base_src_default_prepare_seek_segment (GstBaseSrc * src,
307     GstEvent * event, GstSegment * segment);
308
309 static gboolean gst_base_src_unlock (GstBaseSrc * basesrc);
310 static gboolean gst_base_src_unlock_stop (GstBaseSrc * basesrc);
311 static gboolean gst_base_src_start (GstBaseSrc * basesrc);
312 static gboolean gst_base_src_stop (GstBaseSrc * basesrc);
313
314 static GstStateChangeReturn gst_base_src_change_state (GstElement * element,
315     GstStateChange transition);
316
317 static void gst_base_src_loop (GstPad * pad);
318 static gboolean gst_base_src_pad_check_get_range (GstPad * pad);
319 static gboolean gst_base_src_default_check_get_range (GstBaseSrc * bsrc);
320 static GstFlowReturn gst_base_src_pad_get_range (GstPad * pad, guint64 offset,
321     guint length, GstBuffer ** buf);
322 static GstFlowReturn gst_base_src_get_range (GstBaseSrc * src, guint64 offset,
323     guint length, GstBuffer ** buf);
324
325 static void
326 gst_base_src_base_init (gpointer g_class)
327 {
328   GST_DEBUG_CATEGORY_INIT (gst_base_src_debug, "basesrc", 0, "basesrc element");
329 }
330
331 static void
332 gst_base_src_class_init (GstBaseSrcClass * klass)
333 {
334   GObjectClass *gobject_class;
335   GstElementClass *gstelement_class;
336
337   gobject_class = G_OBJECT_CLASS (klass);
338   gstelement_class = GST_ELEMENT_CLASS (klass);
339
340   g_type_class_add_private (klass, sizeof (GstBaseSrcPrivate));
341
342   parent_class = g_type_class_peek_parent (klass);
343
344   gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_base_src_finalize);
345   gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_base_src_set_property);
346   gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_base_src_get_property);
347
348   g_object_class_install_property (gobject_class, PROP_BLOCKSIZE,
349       g_param_spec_ulong ("blocksize", "Block size",
350           "Size in bytes to read per buffer (0 = default)", 0, G_MAXULONG,
351           DEFAULT_BLOCKSIZE, G_PARAM_READWRITE));
352   g_object_class_install_property (gobject_class, PROP_NUM_BUFFERS,
353       g_param_spec_int ("num-buffers", "num-buffers",
354           "Number of buffers to output before sending EOS", -1, G_MAXINT,
355           DEFAULT_NUM_BUFFERS, G_PARAM_READWRITE));
356   g_object_class_install_property (gobject_class, PROP_TYPEFIND,
357       g_param_spec_boolean ("typefind", "Typefind",
358           "Run typefind before negotiating", DEFAULT_TYPEFIND,
359           G_PARAM_READWRITE));
360   g_object_class_install_property (gobject_class, PROP_DO_TIMESTAMP,
361       g_param_spec_boolean ("do-timestamp", "Do timestamp",
362           "Apply current stream time to buffers", DEFAULT_DO_TIMESTAMP,
363           G_PARAM_READWRITE));
364
365   gstelement_class->change_state =
366       GST_DEBUG_FUNCPTR (gst_base_src_change_state);
367   gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_base_src_send_event);
368   gstelement_class->get_query_types =
369       GST_DEBUG_FUNCPTR (gst_base_src_get_query_types);
370
371   klass->negotiate = GST_DEBUG_FUNCPTR (gst_base_src_default_negotiate);
372   klass->event = GST_DEBUG_FUNCPTR (gst_base_src_default_event);
373   klass->do_seek = GST_DEBUG_FUNCPTR (gst_base_src_default_do_seek);
374   klass->query = GST_DEBUG_FUNCPTR (gst_base_src_default_query);
375   klass->check_get_range =
376       GST_DEBUG_FUNCPTR (gst_base_src_default_check_get_range);
377   klass->prepare_seek_segment =
378       GST_DEBUG_FUNCPTR (gst_base_src_default_prepare_seek_segment);
379 }
380
381 static void
382 gst_base_src_init (GstBaseSrc * basesrc, gpointer g_class)
383 {
384   GstPad *pad;
385   GstPadTemplate *pad_template;
386
387   basesrc->priv = GST_BASE_SRC_GET_PRIVATE (basesrc);
388
389   basesrc->is_live = FALSE;
390   basesrc->live_lock = g_mutex_new ();
391   basesrc->live_cond = g_cond_new ();
392   basesrc->num_buffers = DEFAULT_NUM_BUFFERS;
393   basesrc->num_buffers_left = -1;
394
395   basesrc->can_activate_push = TRUE;
396   basesrc->pad_mode = GST_ACTIVATE_NONE;
397
398   pad_template =
399       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (g_class), "src");
400   g_return_if_fail (pad_template != NULL);
401
402   GST_DEBUG_OBJECT (basesrc, "creating src pad");
403   pad = gst_pad_new_from_template (pad_template, "src");
404
405   GST_DEBUG_OBJECT (basesrc, "setting functions on src pad");
406   gst_pad_set_activatepush_function (pad,
407       GST_DEBUG_FUNCPTR (gst_base_src_activate_push));
408   gst_pad_set_activatepull_function (pad,
409       GST_DEBUG_FUNCPTR (gst_base_src_activate_pull));
410   gst_pad_set_event_function (pad,
411       GST_DEBUG_FUNCPTR (gst_base_src_event_handler));
412   gst_pad_set_query_function (pad, GST_DEBUG_FUNCPTR (gst_base_src_query));
413   gst_pad_set_checkgetrange_function (pad,
414       GST_DEBUG_FUNCPTR (gst_base_src_pad_check_get_range));
415   gst_pad_set_getrange_function (pad,
416       GST_DEBUG_FUNCPTR (gst_base_src_pad_get_range));
417   gst_pad_set_getcaps_function (pad, GST_DEBUG_FUNCPTR (gst_base_src_getcaps));
418   gst_pad_set_setcaps_function (pad, GST_DEBUG_FUNCPTR (gst_base_src_setcaps));
419   gst_pad_set_fixatecaps_function (pad,
420       GST_DEBUG_FUNCPTR (gst_base_src_fixate));
421
422   /* hold pointer to pad */
423   basesrc->srcpad = pad;
424   GST_DEBUG_OBJECT (basesrc, "adding src pad");
425   gst_element_add_pad (GST_ELEMENT (basesrc), pad);
426
427   basesrc->blocksize = DEFAULT_BLOCKSIZE;
428   basesrc->clock_id = NULL;
429   /* we operate in BYTES by default */
430   gst_base_src_set_format (basesrc, GST_FORMAT_BYTES);
431   basesrc->data.ABI.typefind = DEFAULT_TYPEFIND;
432   basesrc->priv->do_timestamp = DEFAULT_DO_TIMESTAMP;
433
434   GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_STARTED);
435
436   GST_DEBUG_OBJECT (basesrc, "init done");
437 }
438
439 static void
440 gst_base_src_finalize (GObject * object)
441 {
442   GstBaseSrc *basesrc;
443   GstEvent **event_p;
444
445   basesrc = GST_BASE_SRC (object);
446
447   g_mutex_free (basesrc->live_lock);
448   g_cond_free (basesrc->live_cond);
449
450   event_p = &basesrc->data.ABI.pending_seek;
451   gst_event_replace ((GstEvent **) event_p, NULL);
452
453   G_OBJECT_CLASS (parent_class)->finalize (object);
454 }
455
456 /**
457  * gst_base_src_wait_playing:
458  * @src: the src
459  *
460  * If the #GstBaseSrcClass::create method performs its own synchronisation against
461  * the clock it must unblock when going from PLAYING to the PAUSED state and call
462  * this method before continuing to produce the remaining data.
463  *
464  * This function will block until a state change to PLAYING happens (in which
465  * case this function returns #GST_FLOW_OK) or the processing must be stopped due
466  * to a state change to READY or a FLUSH event (in which case this function
467  * returns #GST_FLOW_WRONG_STATE).
468  *
469  * Since: 0.10.12
470  *
471  * Returns: #GST_FLOW_OK if @src is PLAYING and processing can
472  * continue. Any other return value should be returned from the create vmethod.
473  */
474 GstFlowReturn
475 gst_base_src_wait_playing (GstBaseSrc * src)
476 {
477   /* block until the state changes, or we get a flush, or something */
478   GST_LIVE_LOCK (src);
479   if (src->is_live) {
480     while (G_UNLIKELY (!src->live_running)) {
481       GST_DEBUG ("live source signal waiting");
482       GST_LIVE_SIGNAL (src);
483       GST_DEBUG ("live source waiting for running state");
484       GST_LIVE_WAIT (src);
485       GST_DEBUG ("live source unlocked");
486     }
487     /* FIXME, use another variable to signal stopping so that we don't
488      * have to grab another lock. */
489     GST_OBJECT_LOCK (src->srcpad);
490     if (G_UNLIKELY (GST_PAD_IS_FLUSHING (src->srcpad)))
491       goto flushing;
492     GST_OBJECT_UNLOCK (src->srcpad);
493   }
494   GST_LIVE_UNLOCK (src);
495
496   return GST_FLOW_OK;
497
498   /* ERRORS */
499 flushing:
500   {
501     GST_DEBUG_OBJECT (src, "pad is flushing");
502     GST_OBJECT_UNLOCK (src->srcpad);
503     GST_LIVE_UNLOCK (src);
504     return GST_FLOW_WRONG_STATE;
505   }
506 }
507
508 /**
509  * gst_base_src_set_live:
510  * @src: base source instance
511  * @live: new live-mode
512  *
513  * If the element listens to a live source, @live should
514  * be set to %TRUE. 
515  *
516  * A live source will not produce data in the PAUSED state and
517  * will therefore not be able to participate in the PREROLL phase
518  * of a pipeline. To signal this fact to the application and the 
519  * pipeline, the state change return value of the live source will
520  * be GST_STATE_CHANGE_NO_PREROLL.
521  */
522 void
523 gst_base_src_set_live (GstBaseSrc * src, gboolean live)
524 {
525   GST_LIVE_LOCK (src);
526   src->is_live = live;
527   GST_LIVE_UNLOCK (src);
528 }
529
530 /**
531  * gst_base_src_is_live:
532  * @src: base source instance
533  *
534  * Check if an element is in live mode.
535  *
536  * Returns: %TRUE if element is in live mode.
537  */
538 gboolean
539 gst_base_src_is_live (GstBaseSrc * src)
540 {
541   gboolean result;
542
543   GST_LIVE_LOCK (src);
544   result = src->is_live;
545   GST_LIVE_UNLOCK (src);
546
547   return result;
548 }
549
550 /**
551  * gst_base_src_set_format:
552  * @src: base source instance
553  * @format: the format to use
554  *
555  * Sets the default format of the source. This will be the format used
556  * for sending NEW_SEGMENT events and for performing seeks.
557  *
558  * If a format of GST_FORMAT_BYTES is set, the element will be able to
559  * operate in pull mode if the #GstBaseSrc::is_seekable returns TRUE.
560  *
561  * @Since: 0.10.1
562  */
563 void
564 gst_base_src_set_format (GstBaseSrc * src, GstFormat format)
565 {
566   gst_segment_init (&src->segment, format);
567 }
568
569 /**
570  * gst_base_src_query_latency:
571  * @src: the source
572  * @live: if the source is live
573  * @min_latency: the min latency of the source
574  * @max_latency: the max latency of the source
575  *
576  * Query the source for the latency parameters. @live will be TRUE when @src is
577  * configured as a live source. @min_latency will be set to the difference
578  * between the running time and the timestamp of the first buffer.
579  * @max_latency is always the undefined value of -1.
580  *
581  * This function is mostly used by subclasses. 
582  *
583  * Returns: TRUE if the query succeeded.
584  *
585  * Since: 0.10.13
586  */
587 gboolean
588 gst_base_src_query_latency (GstBaseSrc * src, gboolean * live,
589     GstClockTime * min_latency, GstClockTime * max_latency)
590 {
591   GstClockTime min;
592
593   GST_LIVE_LOCK (src);
594   if (live)
595     *live = src->is_live;
596
597   /* if we have a startup latency, report this one, else report 0. Subclasses
598    * are supposed to override the query function if they want something
599    * else. */
600   if (src->priv->startup_latency != -1)
601     min = src->priv->startup_latency;
602   else
603     min = 0;
604
605   if (min_latency)
606     *min_latency = min;
607   if (max_latency)
608     *max_latency = -1;
609
610   GST_LOG_OBJECT (src, "latency: live %d, min %" GST_TIME_FORMAT
611       ", max %" GST_TIME_FORMAT, src->is_live, GST_TIME_ARGS (min),
612       GST_TIME_ARGS (-1));
613   GST_LIVE_UNLOCK (src);
614
615   return TRUE;
616 }
617
618 /**
619  * gst_base_src_set_do_timestamp:
620  * @src: the source
621  * @timestamp: enable or disable timestamping
622  *
623  * Configure @src to automatically timestamp outgoing buffers based on the
624  * current running_time of the pipeline. This property is mostly useful for live
625  * sources.
626  *
627  * Since: 0.10.15
628  */
629 void
630 gst_base_src_set_do_timestamp (GstBaseSrc * src, gboolean timestamp)
631 {
632   GST_OBJECT_LOCK (src);
633   src->priv->do_timestamp = timestamp;
634   GST_OBJECT_UNLOCK (src);
635 }
636
637 /**
638  * gst_base_src_get_do_timestamp:
639  * @src: the source
640  *
641  * Query if @src timestamps outgoing buffers based on the current running_time.
642  *
643  * Returns: %TRUE if the base class will automatically timestamp outgoing buffers.
644  *
645  * Since: 0.10.15
646  */
647 gboolean
648 gst_base_src_get_do_timestamp (GstBaseSrc * src)
649 {
650   gboolean res;
651
652   GST_OBJECT_LOCK (src);
653   res = src->priv->do_timestamp;
654   GST_OBJECT_UNLOCK (src);
655
656   return res;
657 }
658
659 static gboolean
660 gst_base_src_setcaps (GstPad * pad, GstCaps * caps)
661 {
662   GstBaseSrcClass *bclass;
663   GstBaseSrc *bsrc;
664   gboolean res = TRUE;
665
666   bsrc = GST_BASE_SRC (GST_PAD_PARENT (pad));
667   bclass = GST_BASE_SRC_GET_CLASS (bsrc);
668
669   if (bclass->set_caps)
670     res = bclass->set_caps (bsrc, caps);
671
672   return res;
673 }
674
675 static GstCaps *
676 gst_base_src_getcaps (GstPad * pad)
677 {
678   GstBaseSrcClass *bclass;
679   GstBaseSrc *bsrc;
680   GstCaps *caps = NULL;
681
682   bsrc = GST_BASE_SRC (GST_PAD_PARENT (pad));
683   bclass = GST_BASE_SRC_GET_CLASS (bsrc);
684   if (bclass->get_caps)
685     caps = bclass->get_caps (bsrc);
686
687   if (caps == NULL) {
688     GstPadTemplate *pad_template;
689
690     pad_template =
691         gst_element_class_get_pad_template (GST_ELEMENT_CLASS (bclass), "src");
692     if (pad_template != NULL) {
693       caps = gst_caps_ref (gst_pad_template_get_caps (pad_template));
694     }
695   }
696   return caps;
697 }
698
699 static void
700 gst_base_src_fixate (GstPad * pad, GstCaps * caps)
701 {
702   GstBaseSrcClass *bclass;
703   GstBaseSrc *bsrc;
704
705   bsrc = GST_BASE_SRC (gst_pad_get_parent (pad));
706   bclass = GST_BASE_SRC_GET_CLASS (bsrc);
707
708   if (bclass->fixate)
709     bclass->fixate (bsrc, caps);
710
711   gst_object_unref (bsrc);
712 }
713
714 static gboolean
715 gst_base_src_default_query (GstBaseSrc * src, GstQuery * query)
716 {
717   gboolean res;
718
719   switch (GST_QUERY_TYPE (query)) {
720     case GST_QUERY_POSITION:
721     {
722       GstFormat format;
723
724       gst_query_parse_position (query, &format, NULL);
725       switch (format) {
726         case GST_FORMAT_PERCENT:
727         {
728           gint64 percent;
729           gint64 position;
730           gint64 duration;
731
732           position = src->segment.last_stop;
733           duration = src->segment.duration;
734
735           if (position != -1 && duration != -1) {
736             if (position < duration)
737               percent = gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX, position,
738                   duration);
739             else
740               percent = GST_FORMAT_PERCENT_MAX;
741           } else
742             percent = -1;
743
744           gst_query_set_position (query, GST_FORMAT_PERCENT, percent);
745           res = TRUE;
746           break;
747         }
748         default:
749         {
750           gint64 position;
751
752           position = src->segment.last_stop;
753
754           if (position != -1) {
755             /* convert to requested format */
756             res =
757                 gst_pad_query_convert (src->srcpad, src->segment.format,
758                 position, &format, &position);
759           } else
760             res = TRUE;
761
762           gst_query_set_position (query, format, position);
763           break;
764         }
765       }
766       break;
767     }
768     case GST_QUERY_DURATION:
769     {
770       GstFormat format;
771
772       gst_query_parse_duration (query, &format, NULL);
773
774       GST_DEBUG_OBJECT (src, "duration query in format %s",
775           gst_format_get_name (format));
776       switch (format) {
777         case GST_FORMAT_PERCENT:
778           gst_query_set_duration (query, GST_FORMAT_PERCENT,
779               GST_FORMAT_PERCENT_MAX);
780           res = TRUE;
781           break;
782         default:
783         {
784           gint64 duration;
785
786           duration = src->segment.duration;
787
788           if (duration != -1) {
789             /* convert to requested format */
790             res =
791                 gst_pad_query_convert (src->srcpad, src->segment.format,
792                 duration, &format, &duration);
793           } else {
794             res = TRUE;
795           }
796           gst_query_set_duration (query, format, duration);
797           break;
798         }
799       }
800       break;
801     }
802
803     case GST_QUERY_SEEKING:
804     {
805       gst_query_set_seeking (query, src->segment.format,
806           src->seekable, 0, src->segment.duration);
807       res = TRUE;
808       break;
809     }
810     case GST_QUERY_SEGMENT:
811     {
812       gint64 start, stop;
813
814       /* no end segment configured, current duration then */
815       if ((stop = src->segment.stop) == -1)
816         stop = src->segment.duration;
817       start = src->segment.start;
818
819       /* adjust to stream time */
820       if (src->segment.time != -1) {
821         start -= src->segment.time;
822         if (stop != -1)
823           stop -= src->segment.time;
824       }
825       gst_query_set_segment (query, src->segment.rate, src->segment.format,
826           start, stop);
827       res = TRUE;
828       break;
829     }
830
831     case GST_QUERY_FORMATS:
832     {
833       gst_query_set_formats (query, 3, GST_FORMAT_DEFAULT,
834           GST_FORMAT_BYTES, GST_FORMAT_PERCENT);
835       res = TRUE;
836       break;
837     }
838     case GST_QUERY_CONVERT:
839     {
840       GstFormat src_fmt, dest_fmt;
841       gint64 src_val, dest_val;
842
843       gst_query_parse_convert (query, &src_fmt, &src_val, &dest_fmt, &dest_val);
844
845       /* we can only convert between equal formats... */
846       if (src_fmt == dest_fmt) {
847         dest_val = src_val;
848         res = TRUE;
849       } else
850         res = FALSE;
851
852       gst_query_set_convert (query, src_fmt, src_val, dest_fmt, dest_val);
853       break;
854     }
855     case GST_QUERY_LATENCY:
856     {
857       GstClockTime min, max;
858       gboolean live;
859
860       /* Subclasses should override and implement something usefull */
861       res = gst_base_src_query_latency (src, &live, &min, &max);
862
863       GST_LOG_OBJECT (src, "report latency: live %d, min %" GST_TIME_FORMAT
864           ", max %" GST_TIME_FORMAT, live, GST_TIME_ARGS (min),
865           GST_TIME_ARGS (max));
866
867       gst_query_set_latency (query, live, min, max);
868       break;
869     }
870     case GST_QUERY_JITTER:
871     case GST_QUERY_RATE:
872     default:
873       res = FALSE;
874       break;
875   }
876   GST_DEBUG_OBJECT (src, "query %s returns %d", GST_QUERY_TYPE_NAME (query),
877       res);
878   return res;
879 }
880
881 static gboolean
882 gst_base_src_query (GstPad * pad, GstQuery * query)
883 {
884   GstBaseSrc *src;
885   GstBaseSrcClass *bclass;
886   gboolean result = FALSE;
887
888   src = GST_BASE_SRC (gst_pad_get_parent (pad));
889
890   bclass = GST_BASE_SRC_GET_CLASS (src);
891
892   if (bclass->query)
893     result = bclass->query (src, query);
894   else
895     result = gst_pad_query_default (pad, query);
896
897   gst_object_unref (src);
898
899   return result;
900 }
901
902 static gboolean
903 gst_base_src_default_do_seek (GstBaseSrc * src, GstSegment * segment)
904 {
905   gboolean res = TRUE;
906
907   /* update our offset if the start/stop position was updated */
908   if (segment->format == GST_FORMAT_BYTES) {
909     segment->last_stop = segment->start;
910     segment->time = segment->start;
911   } else if (segment->start == 0) {
912     /* seek to start, we can implement a default for this. */
913     segment->last_stop = 0;
914     segment->time = 0;
915     res = TRUE;
916   } else
917     res = FALSE;
918
919   return res;
920 }
921
922 static gboolean
923 gst_base_src_do_seek (GstBaseSrc * src, GstSegment * segment)
924 {
925   GstBaseSrcClass *bclass;
926   gboolean result = FALSE;
927
928   bclass = GST_BASE_SRC_GET_CLASS (src);
929
930   if (bclass->do_seek)
931     result = bclass->do_seek (src, segment);
932
933   return result;
934 }
935
936 #define SEEK_TYPE_IS_RELATIVE(t) (((t) != GST_SEEK_TYPE_NONE) && ((t) != GST_SEEK_TYPE_SET))
937
938 static gboolean
939 gst_base_src_default_prepare_seek_segment (GstBaseSrc * src, GstEvent * event,
940     GstSegment * segment)
941 {
942   /* By default, we try one of 2 things:
943    *   - For absolute seek positions, convert the requested position to our 
944    *     configured processing format and place it in the output segment \
945    *   - For relative seek positions, convert our current (input) values to the
946    *     seek format, adjust by the relative seek offset and then convert back to
947    *     the processing format
948    */
949   GstSeekType cur_type, stop_type;
950   gint64 cur, stop;
951   GstSeekFlags flags;
952   GstFormat seek_format, dest_format;
953   gdouble rate;
954   gboolean update;
955   gboolean res = TRUE;
956
957   gst_event_parse_seek (event, &rate, &seek_format, &flags,
958       &cur_type, &cur, &stop_type, &stop);
959   dest_format = segment->format;
960
961   if (seek_format == dest_format) {
962     gst_segment_set_seek (segment, rate, seek_format, flags,
963         cur_type, cur, stop_type, stop, &update);
964     return TRUE;
965   }
966
967   if (cur_type != GST_SEEK_TYPE_NONE) {
968     /* FIXME: Handle seek_cur & seek_end by converting the input segment vals */
969     res =
970         gst_pad_query_convert (src->srcpad, seek_format, cur, &dest_format,
971         &cur);
972     cur_type = GST_SEEK_TYPE_SET;
973   }
974
975   if (res && stop_type != GST_SEEK_TYPE_NONE) {
976     /* FIXME: Handle seek_cur & seek_end by converting the input segment vals */
977     res =
978         gst_pad_query_convert (src->srcpad, seek_format, stop, &dest_format,
979         &stop);
980     stop_type = GST_SEEK_TYPE_SET;
981   }
982
983   /* And finally, configure our output segment in the desired format */
984   gst_segment_set_seek (segment, rate, dest_format, flags, cur_type, cur,
985       stop_type, stop, &update);
986
987   if (!res)
988     goto no_format;
989
990   return res;
991
992 no_format:
993   {
994     GST_DEBUG_OBJECT (src, "undefined format given, seek aborted.");
995     return FALSE;
996   }
997 }
998
999 static gboolean
1000 gst_base_src_prepare_seek_segment (GstBaseSrc * src, GstEvent * event,
1001     GstSegment * seeksegment)
1002 {
1003   GstBaseSrcClass *bclass;
1004   gboolean result = FALSE;
1005
1006   bclass = GST_BASE_SRC_GET_CLASS (src);
1007
1008   if (bclass->prepare_seek_segment)
1009     result = bclass->prepare_seek_segment (src, event, seeksegment);
1010
1011   return result;
1012 }
1013
1014 /* this code implements the seeking. It is a good example
1015  * handling all cases.
1016  *
1017  * A seek updates the currently configured segment.start
1018  * and segment.stop values based on the SEEK_TYPE. If the
1019  * segment.start value is updated, a seek to this new position
1020  * should be performed.
1021  *
1022  * The seek can only be executed when we are not currently
1023  * streaming any data, to make sure that this is the case, we
1024  * acquire the STREAM_LOCK which is taken when we are in the
1025  * _loop() function or when a getrange() is called. Normally
1026  * we will not receive a seek if we are operating in pull mode
1027  * though.
1028  *
1029  * When we are in the loop() function, we might be in the middle
1030  * of pushing a buffer, which might block in a sink. To make sure
1031  * that the push gets unblocked we push out a FLUSH_START event.
1032  * Our loop function will get a WRONG_STATE return value from
1033  * the push and will pause, effectively releasing the STREAM_LOCK.
1034  *
1035  * For a non-flushing seek, we pause the task, which might eventually
1036  * release the STREAM_LOCK. We say eventually because when the sink
1037  * blocks on the sample we might wait a very long time until the sink
1038  * unblocks the sample. In any case we acquire the STREAM_LOCK and
1039  * can continue the seek. A non-flushing seek is normally done in a 
1040  * running pipeline to perform seamless playback.
1041  * In the case of a non-flushing seek we need to make sure that the
1042  * data we output after the seek is continuous with the previous data,
1043  * this is because a non-flushing seek does not reset the stream-time
1044  * to 0. We do this by closing the currently running segment, ie. sending
1045  * a new_segment event with the stop position set to the last processed 
1046  * position.
1047  *
1048  * After updating the segment.start/stop values, we prepare for
1049  * streaming again. We push out a FLUSH_STOP to make the peer pad
1050  * accept data again and we start our task again.
1051  *
1052  * A segment seek posts a message on the bus saying that the playback
1053  * of the segment started. We store the segment flag internally because
1054  * when we reach the segment.stop we have to post a segment.done
1055  * instead of EOS when doing a segment seek.
1056  */
1057 /* FIXME (0.11), we have the unlock gboolean here because most current 
1058  * implementations (fdsrc, -base/gst/tcp/, ...) unconditionally unlock, even when
1059  * the streaming thread isn't running, resulting in bogus unlocks later when it 
1060  * starts. This is fixed by adding unlock_stop, but we should still avoid unlocking
1061  * unnecessarily for backwards compatibility. Ergo, the unlock variable stays
1062  * until 0.11
1063  */
1064 static gboolean
1065 gst_base_src_perform_seek (GstBaseSrc * src, GstEvent * event, gboolean unlock)
1066 {
1067   gboolean res = TRUE;
1068   gdouble rate;
1069   GstFormat seek_format, dest_format;
1070   GstSeekFlags flags;
1071   GstSeekType cur_type, stop_type;
1072   gint64 cur, stop;
1073   gboolean flush;
1074   gboolean update;
1075   gboolean relative_seek = FALSE;
1076   gboolean seekseg_configured = FALSE;
1077   GstSegment seeksegment;
1078
1079   GST_DEBUG_OBJECT (src, "doing seek");
1080
1081   dest_format = src->segment.format;
1082
1083   if (event) {
1084     gst_event_parse_seek (event, &rate, &seek_format, &flags,
1085         &cur_type, &cur, &stop_type, &stop);
1086
1087     relative_seek = SEEK_TYPE_IS_RELATIVE (cur_type) ||
1088         SEEK_TYPE_IS_RELATIVE (stop_type);
1089
1090     if (dest_format != seek_format && !relative_seek) {
1091       /* If we have an ABSOLUTE position (SEEK_SET only), we can convert it
1092        * here before taking the stream lock, otherwise we must convert it later,
1093        * once we have the stream lock and can read the current position */
1094       gst_segment_init (&seeksegment, dest_format);
1095
1096       if (!gst_base_src_prepare_seek_segment (src, event, &seeksegment))
1097         goto prepare_failed;
1098
1099       seekseg_configured = TRUE;
1100     }
1101
1102     flush = flags & GST_SEEK_FLAG_FLUSH;
1103   } else {
1104     flush = FALSE;
1105   }
1106
1107   /* send flush start */
1108   if (flush)
1109     gst_pad_push_event (src->srcpad, gst_event_new_flush_start ());
1110   else
1111     gst_pad_pause_task (src->srcpad);
1112
1113   /* unblock streaming thread */
1114   if (unlock)
1115     gst_base_src_unlock (src);
1116
1117   /* grab streaming lock, this should eventually be possible, either
1118    * because the task is paused or our streaming thread stopped 
1119    * because our peer is flushing. */
1120   GST_PAD_STREAM_LOCK (src->srcpad);
1121
1122   if (unlock)
1123     gst_base_src_unlock_stop (src);
1124
1125   /* If we configured the seeksegment above, don't overwrite it now. Otherwise
1126    * copy the current segment info into the temp segment that we can actually
1127    * attempt the seek with. We only update the real segment if the seek suceeds. */
1128   if (!seekseg_configured) {
1129     memcpy (&seeksegment, &src->segment, sizeof (GstSegment));
1130
1131     /* now configure the final seek segment */
1132     if (event) {
1133       if (src->segment.format != seek_format) {
1134         /* OK, here's where we give the subclass a chance to convert the relative
1135          * seek into an absolute one in the processing format. We set up any
1136          * absolute seek above, before taking the stream lock. */
1137         if (!gst_base_src_prepare_seek_segment (src, event, &seeksegment)) {
1138           GST_DEBUG_OBJECT (src, "Preparing the seek failed after flushing. "
1139               "Aborting seek");
1140           res = FALSE;
1141         }
1142       } else {
1143         /* The seek format matches our processing format, no need to ask the
1144          * the subclass to configure the segment. */
1145         gst_segment_set_seek (&seeksegment, rate, seek_format, flags,
1146             cur_type, cur, stop_type, stop, &update);
1147       }
1148     }
1149     /* Else, no seek event passed, so we're just (re)starting the 
1150        current segment. */
1151   }
1152
1153   if (res) {
1154     GST_DEBUG_OBJECT (src, "segment configured from %" G_GINT64_FORMAT
1155         " to %" G_GINT64_FORMAT ", position %" G_GINT64_FORMAT,
1156         seeksegment.start, seeksegment.stop, seeksegment.last_stop);
1157
1158     /* do the seek, segment.last_stop contains the new position. */
1159     res = gst_base_src_do_seek (src, &seeksegment);
1160   }
1161
1162   /* and prepare to continue streaming */
1163   if (flush) {
1164     /* send flush stop, peer will accept data and events again. We
1165      * are not yet providing data as we still have the STREAM_LOCK. */
1166     gst_pad_push_event (src->srcpad, gst_event_new_flush_stop ());
1167   } else if (res && src->data.ABI.running) {
1168     /* we are running the current segment and doing a non-flushing seek, 
1169      * close the segment first based on the last_stop. */
1170     GST_DEBUG_OBJECT (src, "closing running segment %" G_GINT64_FORMAT
1171         " to %" G_GINT64_FORMAT, src->segment.start, src->segment.last_stop);
1172
1173     /* queue the segment for sending in the stream thread */
1174     if (src->priv->close_segment)
1175       gst_event_unref (src->priv->close_segment);
1176     src->priv->close_segment =
1177         gst_event_new_new_segment_full (TRUE,
1178         src->segment.rate, src->segment.applied_rate, src->segment.format,
1179         src->segment.start, src->segment.last_stop, src->segment.time);
1180   }
1181
1182   /* The subclass must have converted the segment to the processing format 
1183    * by now */
1184   if (res && seeksegment.format != dest_format) {
1185     GST_DEBUG_OBJECT (src, "Subclass failed to prepare a seek segment "
1186         "in the correct format. Aborting seek.");
1187     res = FALSE;
1188   }
1189
1190   /* if successfull seek, we update our real segment and push
1191    * out the new segment. */
1192   if (res) {
1193     memcpy (&src->segment, &seeksegment, sizeof (GstSegment));
1194
1195     if (src->segment.flags & GST_SEEK_FLAG_SEGMENT) {
1196       gst_element_post_message (GST_ELEMENT (src),
1197           gst_message_new_segment_start (GST_OBJECT (src),
1198               src->segment.format, src->segment.last_stop));
1199     }
1200
1201     /* for deriving a stop position for the playback segment form the seek
1202      * segment, we must take the duration when the stop is not set */
1203     if ((stop = src->segment.stop) == -1)
1204       stop = src->segment.duration;
1205
1206     GST_DEBUG_OBJECT (src, "Sending newsegment from %" G_GINT64_FORMAT
1207         " to %" G_GINT64_FORMAT, src->segment.start, stop);
1208
1209     /* now replace the old segment so that we send it in the stream thread the
1210      * next time it is scheduled. */
1211     if (src->priv->start_segment)
1212       gst_event_unref (src->priv->start_segment);
1213     src->priv->start_segment =
1214         gst_event_new_new_segment_full (FALSE,
1215         src->segment.rate, src->segment.applied_rate, src->segment.format,
1216         src->segment.last_stop, stop, src->segment.time);
1217   }
1218
1219   src->priv->discont = TRUE;
1220   src->data.ABI.running = TRUE;
1221   /* and restart the task in case it got paused explicitely or by
1222    * the FLUSH_START event we pushed out. */
1223   gst_pad_start_task (src->srcpad, (GstTaskFunction) gst_base_src_loop,
1224       src->srcpad);
1225
1226   /* and release the lock again so we can continue streaming */
1227   GST_PAD_STREAM_UNLOCK (src->srcpad);
1228
1229   return res;
1230
1231   /* ERROR */
1232 prepare_failed:
1233   GST_DEBUG_OBJECT (src, "Preparing the seek failed before flushing. "
1234       "Aborting seek");
1235   return FALSE;
1236 }
1237
1238 static const GstQueryType *
1239 gst_base_src_get_query_types (GstElement * element)
1240 {
1241   static const GstQueryType query_types[] = {
1242     GST_QUERY_DURATION,
1243     GST_QUERY_POSITION,
1244     GST_QUERY_SEEKING,
1245     GST_QUERY_SEGMENT,
1246     GST_QUERY_FORMATS,
1247     GST_QUERY_LATENCY,
1248     GST_QUERY_JITTER,
1249     GST_QUERY_RATE,
1250     GST_QUERY_CONVERT,
1251     0
1252   };
1253
1254   return query_types;
1255 }
1256
1257 /* all events send to this element directly
1258  */
1259 static gboolean
1260 gst_base_src_send_event (GstElement * element, GstEvent * event)
1261 {
1262   GstBaseSrc *src;
1263   gboolean result = FALSE;
1264
1265   src = GST_BASE_SRC (element);
1266
1267   switch (GST_EVENT_TYPE (event)) {
1268     case GST_EVENT_FLUSH_START:
1269     case GST_EVENT_FLUSH_STOP:
1270       /* sending random flushes downstream can break stuff,
1271        * especially sync since all segment info will get flushed */
1272       break;
1273     case GST_EVENT_EOS:
1274       /* FIXME, queue EOS and make sure the task or pull function 
1275        * perform the EOS actions. */
1276       break;
1277     case GST_EVENT_NEWSEGMENT:
1278       /* sending random NEWSEGMENT downstream can break sync. */
1279       break;
1280     case GST_EVENT_TAG:
1281     case GST_EVENT_BUFFERSIZE:
1282       break;
1283     case GST_EVENT_QOS:
1284       break;
1285     case GST_EVENT_SEEK:
1286     {
1287       gboolean started;
1288
1289       GST_OBJECT_LOCK (src->srcpad);
1290       if (GST_PAD_ACTIVATE_MODE (src->srcpad) == GST_ACTIVATE_PULL)
1291         goto wrong_mode;
1292       started = GST_PAD_ACTIVATE_MODE (src->srcpad) == GST_ACTIVATE_PUSH;
1293       GST_OBJECT_UNLOCK (src->srcpad);
1294
1295       if (started) {
1296         /* when we are running in push mode, we can execute the
1297          * seek right now, we need to unlock. */
1298         result = gst_base_src_perform_seek (src, event, TRUE);
1299       } else {
1300         GstEvent **event_p;
1301
1302         /* else we store the event and execute the seek when we
1303          * get activated */
1304         GST_OBJECT_LOCK (src);
1305         event_p = &src->data.ABI.pending_seek;
1306         gst_event_replace ((GstEvent **) event_p, event);
1307         GST_OBJECT_UNLOCK (src);
1308         /* assume the seek will work */
1309         result = TRUE;
1310       }
1311       break;
1312     }
1313     case GST_EVENT_NAVIGATION:
1314       break;
1315     default:
1316       break;
1317   }
1318 done:
1319   gst_event_unref (event);
1320
1321   return result;
1322
1323   /* ERRORS */
1324 wrong_mode:
1325   {
1326     GST_DEBUG_OBJECT (src, "cannot perform seek when operating in pull mode");
1327     GST_OBJECT_UNLOCK (src->srcpad);
1328     result = FALSE;
1329     goto done;
1330   }
1331 }
1332
1333 static gboolean
1334 gst_base_src_default_event (GstBaseSrc * src, GstEvent * event)
1335 {
1336   gboolean result;
1337
1338   switch (GST_EVENT_TYPE (event)) {
1339     case GST_EVENT_SEEK:
1340       /* is normally called when in push mode */
1341       if (!src->seekable)
1342         goto not_seekable;
1343
1344       result = gst_base_src_perform_seek (src, event, TRUE);
1345       break;
1346     case GST_EVENT_FLUSH_START:
1347       /* cancel any blocking getrange, is normally called
1348        * when in pull mode. */
1349       result = gst_base_src_unlock (src);
1350       break;
1351     case GST_EVENT_FLUSH_STOP:
1352       result = gst_base_src_unlock_stop (src);
1353       break;
1354     default:
1355       result = TRUE;
1356       break;
1357   }
1358   return result;
1359
1360   /* ERRORS */
1361 not_seekable:
1362   {
1363     GST_DEBUG_OBJECT (src, "is not seekable");
1364     return FALSE;
1365   }
1366 }
1367
1368 static gboolean
1369 gst_base_src_event_handler (GstPad * pad, GstEvent * event)
1370 {
1371   GstBaseSrc *src;
1372   GstBaseSrcClass *bclass;
1373   gboolean result = FALSE;
1374
1375   src = GST_BASE_SRC (gst_pad_get_parent (pad));
1376   bclass = GST_BASE_SRC_GET_CLASS (src);
1377
1378   if (bclass->event) {
1379     if (!(result = bclass->event (src, event)))
1380       goto subclass_failed;
1381   }
1382
1383 done:
1384   gst_event_unref (event);
1385   gst_object_unref (src);
1386
1387   return result;
1388
1389   /* ERRORS */
1390 subclass_failed:
1391   {
1392     GST_DEBUG_OBJECT (src, "subclass refused event");
1393     goto done;
1394   }
1395 }
1396
1397 static void
1398 gst_base_src_set_property (GObject * object, guint prop_id,
1399     const GValue * value, GParamSpec * pspec)
1400 {
1401   GstBaseSrc *src;
1402
1403   src = GST_BASE_SRC (object);
1404
1405   switch (prop_id) {
1406     case PROP_BLOCKSIZE:
1407       src->blocksize = g_value_get_ulong (value);
1408       break;
1409     case PROP_NUM_BUFFERS:
1410       src->num_buffers = g_value_get_int (value);
1411       break;
1412     case PROP_TYPEFIND:
1413       src->data.ABI.typefind = g_value_get_boolean (value);
1414       break;
1415     case PROP_DO_TIMESTAMP:
1416       src->priv->do_timestamp = g_value_get_boolean (value);
1417       break;
1418     default:
1419       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1420       break;
1421   }
1422 }
1423
1424 static void
1425 gst_base_src_get_property (GObject * object, guint prop_id, GValue * value,
1426     GParamSpec * pspec)
1427 {
1428   GstBaseSrc *src;
1429
1430   src = GST_BASE_SRC (object);
1431
1432   switch (prop_id) {
1433     case PROP_BLOCKSIZE:
1434       g_value_set_ulong (value, src->blocksize);
1435       break;
1436     case PROP_NUM_BUFFERS:
1437       g_value_set_int (value, src->num_buffers);
1438       break;
1439     case PROP_TYPEFIND:
1440       g_value_set_boolean (value, src->data.ABI.typefind);
1441       break;
1442     case PROP_DO_TIMESTAMP:
1443       g_value_set_boolean (value, src->priv->do_timestamp);
1444       break;
1445     default:
1446       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1447       break;
1448   }
1449 }
1450
1451 /* with STREAM_LOCK and LOCK */
1452 static GstClockReturn
1453 gst_base_src_wait (GstBaseSrc * basesrc, GstClock * clock, GstClockTime time)
1454 {
1455   GstClockReturn ret;
1456   GstClockID id;
1457
1458   id = gst_clock_new_single_shot_id (clock, time);
1459
1460   basesrc->clock_id = id;
1461   /* release the object lock while waiting */
1462   GST_OBJECT_UNLOCK (basesrc);
1463
1464   ret = gst_clock_id_wait (id, NULL);
1465
1466   GST_OBJECT_LOCK (basesrc);
1467   gst_clock_id_unref (id);
1468   basesrc->clock_id = NULL;
1469
1470   return ret;
1471 }
1472
1473 /* perform synchronisation on a buffer. 
1474  * with STREAM_LOCK.
1475  */
1476 static GstClockReturn
1477 gst_base_src_do_sync (GstBaseSrc * basesrc, GstBuffer * buffer)
1478 {
1479   GstClockReturn result;
1480   GstClockTime start, end;
1481   GstBaseSrcClass *bclass;
1482   GstClockTime base_time;
1483   GstClock *clock;
1484   GstClockTime now = -1, timestamp;
1485   gboolean do_timestamp;
1486
1487   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
1488
1489   start = end = -1;
1490   if (bclass->get_times)
1491     bclass->get_times (basesrc, buffer, &start, &end);
1492
1493   /* grab the lock to prepare for clocking and calculate the startup 
1494    * latency. */
1495   GST_OBJECT_LOCK (basesrc);
1496
1497   /* get clock, if no clock, we can't sync or get the latency */
1498   if ((clock = GST_ELEMENT_CLOCK (basesrc)) == NULL)
1499     goto no_clock;
1500
1501   base_time = GST_ELEMENT_CAST (basesrc)->base_time;
1502
1503   do_timestamp = basesrc->priv->do_timestamp;
1504   timestamp = GST_BUFFER_TIMESTAMP (buffer);
1505
1506   /* first buffer, calculate the startup latency */
1507   if (basesrc->priv->startup_latency == -1) {
1508     now = gst_clock_get_time (clock);
1509
1510     GST_LOG_OBJECT (basesrc, "startup timestamp: %" GST_TIME_FORMAT,
1511         GST_TIME_ARGS (timestamp));
1512
1513     /* startup latency is the diff between when we went to PLAYING (base_time)
1514      * and the current clock time */
1515     if (now > base_time)
1516       basesrc->priv->startup_latency = now - base_time;
1517     else
1518       basesrc->priv->startup_latency = 0;
1519
1520     GST_LOG_OBJECT (basesrc, "startup running_time: %" GST_TIME_FORMAT,
1521         GST_TIME_ARGS (basesrc->priv->startup_latency));
1522
1523     if (!GST_CLOCK_TIME_IS_VALID (timestamp)) {
1524       if (do_timestamp)
1525         timestamp = now - base_time;
1526       else
1527         timestamp = 0;
1528
1529       GST_BUFFER_TIMESTAMP (buffer) = timestamp;
1530
1531       GST_LOG_OBJECT (basesrc, "created timestamp: %" GST_TIME_FORMAT,
1532           GST_TIME_ARGS (timestamp));
1533     }
1534
1535     /* we have a timestamp, we can subtract it from the startup_latency when it is
1536      * smaller. If the timestamp is bigger, there is no startup latency. */
1537     if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
1538       if (timestamp < basesrc->priv->startup_latency)
1539         basesrc->priv->startup_latency -= timestamp;
1540       else
1541         basesrc->priv->startup_latency = 0;
1542     }
1543
1544     GST_LOG_OBJECT (basesrc, "startup latency: %" GST_TIME_FORMAT,
1545         GST_TIME_ARGS (basesrc->priv->startup_latency));
1546   } else {
1547     /* not the first buffer, the timestamp is the diff between the clock and
1548      * base_time */
1549     if (do_timestamp && !GST_CLOCK_TIME_IS_VALID (timestamp)) {
1550       now = gst_clock_get_time (clock);
1551
1552       GST_BUFFER_TIMESTAMP (buffer) = now - base_time;
1553     }
1554   }
1555
1556   /* if we don't have a buffer timestamp, we don't sync */
1557   if (!GST_CLOCK_TIME_IS_VALID (start))
1558     goto invalid_start;
1559
1560   if (basesrc->is_live) {
1561     /* live source and we need to sync, add startup latency to timestamp to
1562      * get the real running_time */
1563     if (timestamp != -1) {
1564       start += basesrc->priv->startup_latency;
1565       GST_BUFFER_TIMESTAMP (buffer) =
1566           timestamp + basesrc->priv->startup_latency;
1567     }
1568   }
1569
1570   GST_LOG_OBJECT (basesrc,
1571       "waiting for clock, base time %" GST_TIME_FORMAT
1572       ", stream_start %" GST_TIME_FORMAT,
1573       GST_TIME_ARGS (base_time), GST_TIME_ARGS (start));
1574
1575   result = gst_base_src_wait (basesrc, clock, start + base_time);
1576   GST_OBJECT_UNLOCK (basesrc);
1577
1578   GST_LOG_OBJECT (basesrc, "clock entry done: %d", result);
1579
1580   return result;
1581
1582   /* special cases */
1583 no_clock:
1584   {
1585     GST_DEBUG_OBJECT (basesrc, "we have no clock");
1586     GST_OBJECT_UNLOCK (basesrc);
1587     return GST_CLOCK_OK;
1588   }
1589 invalid_start:
1590   {
1591     GST_DEBUG_OBJECT (basesrc, "get_times returned invalid start");
1592     GST_OBJECT_UNLOCK (basesrc);
1593     return GST_CLOCK_OK;
1594   }
1595 }
1596
1597 static gboolean
1598 gst_base_src_update_length (GstBaseSrc * src, guint64 offset, guint * length)
1599 {
1600   guint64 size, maxsize;
1601   GstBaseSrcClass *bclass;
1602
1603   bclass = GST_BASE_SRC_GET_CLASS (src);
1604
1605   /* only operate if we are working with bytes */
1606   if (src->segment.format != GST_FORMAT_BYTES)
1607     return TRUE;
1608
1609   /* get total file size */
1610   size = (guint64) src->segment.duration;
1611
1612   /* the max amount of bytes to read is the total size or
1613    * up to the segment.stop if present. */
1614   if (src->segment.stop != -1)
1615     maxsize = MIN (size, src->segment.stop);
1616   else
1617     maxsize = size;
1618
1619   GST_DEBUG_OBJECT (src,
1620       "reading offset %" G_GUINT64_FORMAT ", length %u, size %" G_GINT64_FORMAT
1621       ", segment.stop %" G_GINT64_FORMAT ", maxsize %" G_GINT64_FORMAT, offset,
1622       *length, size, src->segment.stop, maxsize);
1623
1624   /* check size if we have one */
1625   if (maxsize != -1) {
1626     /* if we run past the end, check if the file became bigger and 
1627      * retry. */
1628     if (G_UNLIKELY (offset + *length >= maxsize)) {
1629       /* see if length of the file changed */
1630       if (bclass->get_size)
1631         if (!bclass->get_size (src, &size))
1632           size = -1;
1633
1634       gst_segment_set_duration (&src->segment, GST_FORMAT_BYTES, size);
1635
1636       /* make sure we don't exceed the configured segment stop
1637        * if it was set */
1638       if (src->segment.stop != -1)
1639         maxsize = MIN (size, src->segment.stop);
1640       else
1641         maxsize = size;
1642
1643       /* if we are at or past the end, EOS */
1644       if (G_UNLIKELY (offset >= maxsize))
1645         goto unexpected_length;
1646
1647       /* else we can clip to the end */
1648       if (G_UNLIKELY (offset + *length >= maxsize))
1649         *length = maxsize - offset;
1650
1651     }
1652   }
1653
1654   /* keep track of current position. segment is in bytes, we checked 
1655    * that above. */
1656   gst_segment_set_last_stop (&src->segment, GST_FORMAT_BYTES, offset);
1657
1658   return TRUE;
1659
1660   /* ERRORS */
1661 unexpected_length:
1662   {
1663     return FALSE;
1664   }
1665 }
1666
1667 static GstFlowReturn
1668 gst_base_src_get_range (GstBaseSrc * src, guint64 offset, guint length,
1669     GstBuffer ** buf)
1670 {
1671   GstFlowReturn ret;
1672   GstBaseSrcClass *bclass;
1673   GstClockReturn status;
1674
1675   bclass = GST_BASE_SRC_GET_CLASS (src);
1676
1677   ret = gst_base_src_wait_playing (src);
1678   if (ret != GST_FLOW_OK)
1679     goto stopped;
1680
1681   if (G_UNLIKELY (!GST_OBJECT_FLAG_IS_SET (src, GST_BASE_SRC_STARTED)))
1682     goto not_started;
1683
1684   if (G_UNLIKELY (!bclass->create))
1685     goto no_function;
1686
1687   if (G_UNLIKELY (!gst_base_src_update_length (src, offset, &length)))
1688     goto unexpected_length;
1689
1690   /* normally we don't count buffers */
1691   if (G_UNLIKELY (src->num_buffers_left >= 0)) {
1692     if (src->num_buffers_left == 0)
1693       goto reached_num_buffers;
1694     else
1695       src->num_buffers_left--;
1696   }
1697
1698   GST_DEBUG_OBJECT (src,
1699       "calling create offset %" G_GUINT64_FORMAT " length %u, time %"
1700       G_GINT64_FORMAT, offset, length, src->segment.time);
1701
1702   ret = bclass->create (src, offset, length, buf);
1703   if (G_UNLIKELY (ret != GST_FLOW_OK))
1704     goto done;
1705
1706   /* no timestamp set and we are at offset 0, we can timestamp with 0 */
1707   if (offset == 0 && src->segment.time == 0
1708       && GST_BUFFER_TIMESTAMP (*buf) == -1)
1709     GST_BUFFER_TIMESTAMP (*buf) = 0;
1710
1711   /* now sync before pushing the buffer */
1712   status = gst_base_src_do_sync (src, *buf);
1713   switch (status) {
1714     case GST_CLOCK_EARLY:
1715       /* the buffer is too late. We currently don't drop the buffer. */
1716       GST_DEBUG_OBJECT (src, "buffer too late!, returning anyway");
1717       break;
1718     case GST_CLOCK_OK:
1719       /* buffer synchronised properly */
1720       GST_DEBUG_OBJECT (src, "buffer ok");
1721       break;
1722     case GST_CLOCK_UNSCHEDULED:
1723       /* this case is triggered when we were waiting for the clock and
1724        * it got unlocked because we did a state change. We return 
1725        * WRONG_STATE in this case to stop the dataflow also get rid of the
1726        * produced buffer. */
1727       GST_DEBUG_OBJECT (src,
1728           "clock was unscheduled (%d), returning WRONG_STATE", status);
1729       gst_buffer_unref (*buf);
1730       *buf = NULL;
1731       ret = GST_FLOW_WRONG_STATE;
1732       break;
1733     default:
1734       /* all other result values are unexpected and errors */
1735       GST_ELEMENT_ERROR (src, CORE, CLOCK,
1736           (_("Internal clock error.")),
1737           ("clock returned unexpected return value %d", status));
1738       gst_buffer_unref (*buf);
1739       *buf = NULL;
1740       ret = GST_FLOW_ERROR;
1741       break;
1742   }
1743 done:
1744   return ret;
1745
1746   /* ERROR */
1747 stopped:
1748   {
1749     GST_DEBUG_OBJECT (src, "wait_playing returned %d", ret);
1750     return ret;
1751   }
1752 not_started:
1753   {
1754     GST_DEBUG_OBJECT (src, "getrange but not started");
1755     return GST_FLOW_WRONG_STATE;
1756   }
1757 no_function:
1758   {
1759     GST_DEBUG_OBJECT (src, "no create function");
1760     return GST_FLOW_ERROR;
1761   }
1762 unexpected_length:
1763   {
1764     GST_DEBUG_OBJECT (src, "unexpected length %u (offset=%" G_GUINT64_FORMAT
1765         ", size=%" G_GINT64_FORMAT ")", length, offset, src->segment.duration);
1766     return GST_FLOW_UNEXPECTED;
1767   }
1768 reached_num_buffers:
1769   {
1770     GST_DEBUG_OBJECT (src, "sent all buffers");
1771     return GST_FLOW_UNEXPECTED;
1772   }
1773 }
1774
1775 static GstFlowReturn
1776 gst_base_src_pad_get_range (GstPad * pad, guint64 offset, guint length,
1777     GstBuffer ** buf)
1778 {
1779   GstBaseSrc *src;
1780   GstFlowReturn res;
1781
1782   src = GST_BASE_SRC (gst_pad_get_parent (pad));
1783
1784   res = gst_base_src_get_range (src, offset, length, buf);
1785
1786   gst_object_unref (src);
1787
1788   return res;
1789 }
1790
1791 static gboolean
1792 gst_base_src_default_check_get_range (GstBaseSrc * src)
1793 {
1794   gboolean res;
1795
1796   if (!GST_OBJECT_FLAG_IS_SET (src, GST_BASE_SRC_STARTED)) {
1797     GST_LOG_OBJECT (src, "doing start/stop to check get_range support");
1798     if (G_LIKELY (gst_base_src_start (src)))
1799       gst_base_src_stop (src);
1800   }
1801
1802   /* we can operate in getrange mode if the native format is bytes
1803    * and we are seekable, this condition is set in the random_access
1804    * flag and is set in the _start() method. */
1805   res = src->random_access;
1806
1807   return res;
1808 }
1809
1810 static gboolean
1811 gst_base_src_check_get_range (GstBaseSrc * src)
1812 {
1813   GstBaseSrcClass *bclass;
1814   gboolean res;
1815
1816   bclass = GST_BASE_SRC_GET_CLASS (src);
1817
1818   if (bclass->check_get_range == NULL)
1819     goto no_function;
1820
1821   res = bclass->check_get_range (src);
1822   GST_LOG_OBJECT (src, "%s() returned %d",
1823       GST_DEBUG_FUNCPTR_NAME (bclass->check_get_range), (gint) res);
1824
1825   return res;
1826
1827   /* ERRORS */
1828 no_function:
1829   {
1830     GST_WARNING_OBJECT (src, "no check_get_range function set");
1831     return FALSE;
1832   }
1833 }
1834
1835 static gboolean
1836 gst_base_src_pad_check_get_range (GstPad * pad)
1837 {
1838   GstBaseSrc *src;
1839   gboolean res;
1840
1841   src = GST_BASE_SRC (gst_pad_get_parent (pad));
1842
1843   res = gst_base_src_check_get_range (src);
1844
1845   gst_object_unref (src);
1846
1847   return res;
1848 }
1849
1850 static void
1851 gst_base_src_loop (GstPad * pad)
1852 {
1853   GstBaseSrc *src;
1854   GstBuffer *buf = NULL;
1855   GstFlowReturn ret;
1856   gint64 position;
1857   gboolean eos;
1858
1859   eos = FALSE;
1860
1861   src = GST_BASE_SRC (gst_pad_get_parent (pad));
1862
1863   src->priv->last_sent_eos = FALSE;
1864
1865   /* if we operate in bytes, we can calculate an offset */
1866   if (src->segment.format == GST_FORMAT_BYTES)
1867     position = src->segment.last_stop;
1868   else
1869     position = -1;
1870
1871   ret = gst_base_src_get_range (src, position, src->blocksize, &buf);
1872   if (G_UNLIKELY (ret != GST_FLOW_OK)) {
1873     GST_INFO_OBJECT (src, "pausing after gst_base_src_get_range() = %s",
1874         gst_flow_get_name (ret));
1875     goto pause;
1876   }
1877   /* this should not happen */
1878   if (G_UNLIKELY (buf == NULL))
1879     goto null_buffer;
1880
1881   /* push events to close/start our segment before we push the buffer. */
1882   if (src->priv->close_segment) {
1883     gst_pad_push_event (pad, src->priv->close_segment);
1884     src->priv->close_segment = NULL;
1885   }
1886   if (src->priv->start_segment) {
1887     gst_pad_push_event (pad, src->priv->start_segment);
1888     src->priv->start_segment = NULL;
1889   }
1890
1891   /* figure out the new position */
1892   switch (src->segment.format) {
1893     case GST_FORMAT_BYTES:
1894       position += GST_BUFFER_SIZE (buf);
1895       break;
1896     case GST_FORMAT_TIME:
1897     {
1898       GstClockTime start, duration;
1899
1900       start = GST_BUFFER_TIMESTAMP (buf);
1901       duration = GST_BUFFER_DURATION (buf);
1902
1903       if (GST_CLOCK_TIME_IS_VALID (start))
1904         position = start;
1905       else
1906         position = src->segment.last_stop;
1907
1908       if (GST_CLOCK_TIME_IS_VALID (duration))
1909         position += duration;
1910       break;
1911     }
1912     case GST_FORMAT_DEFAULT:
1913       position = GST_BUFFER_OFFSET_END (buf);
1914       break;
1915     default:
1916       position = -1;
1917       break;
1918   }
1919   if (position != -1) {
1920     if (src->segment.stop != -1) {
1921       if (position >= src->segment.stop) {
1922         eos = TRUE;
1923         position = src->segment.stop;
1924       }
1925     }
1926     gst_segment_set_last_stop (&src->segment, src->segment.format, position);
1927   }
1928
1929   if (G_UNLIKELY (src->priv->discont)) {
1930     buf = gst_buffer_make_metadata_writable (buf);
1931     GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_DISCONT);
1932     src->priv->discont = FALSE;
1933   }
1934
1935   ret = gst_pad_push (pad, buf);
1936   if (G_UNLIKELY (ret != GST_FLOW_OK)) {
1937     GST_INFO_OBJECT (src, "pausing after gst_pad_push() = %s",
1938         gst_flow_get_name (ret));
1939     goto pause;
1940   }
1941
1942   if (eos) {
1943     GST_INFO_OBJECT (src, "pausing after EOS");
1944     ret = GST_FLOW_UNEXPECTED;
1945     goto pause;
1946   }
1947
1948 done:
1949   gst_object_unref (src);
1950   return;
1951
1952   /* special cases */
1953 pause:
1954   {
1955     const gchar *reason = gst_flow_get_name (ret);
1956
1957     GST_DEBUG_OBJECT (src, "pausing task, reason %s", reason);
1958     src->data.ABI.running = FALSE;
1959     gst_pad_pause_task (pad);
1960     if (GST_FLOW_IS_FATAL (ret) || ret == GST_FLOW_NOT_LINKED) {
1961       if (ret == GST_FLOW_UNEXPECTED) {
1962         /* perform EOS logic */
1963         if (src->segment.flags & GST_SEEK_FLAG_SEGMENT) {
1964           gst_element_post_message (GST_ELEMENT_CAST (src),
1965               gst_message_new_segment_done (GST_OBJECT_CAST (src),
1966                   src->segment.format, src->segment.last_stop));
1967         } else {
1968           gst_pad_push_event (pad, gst_event_new_eos ());
1969           src->priv->last_sent_eos = TRUE;
1970         }
1971       } else {
1972         /* for fatal errors we post an error message, post the error
1973          * first so the app knows about the error first. */
1974         GST_ELEMENT_ERROR (src, STREAM, FAILED,
1975             (_("Internal data flow error.")),
1976             ("streaming task paused, reason %s (%d)", reason, ret));
1977         gst_pad_push_event (pad, gst_event_new_eos ());
1978         src->priv->last_sent_eos = TRUE;
1979       }
1980     }
1981     goto done;
1982   }
1983 null_buffer:
1984   {
1985     GST_ELEMENT_ERROR (src, STREAM, FAILED,
1986         (_("Internal data flow error.")), ("element returned NULL buffer"));
1987     /* we finished the segment on error */
1988     src->data.ABI.running = FALSE;
1989     gst_pad_pause_task (pad);
1990     gst_pad_push_event (pad, gst_event_new_eos ());
1991     src->priv->last_sent_eos = TRUE;
1992     goto done;
1993   }
1994 }
1995
1996 /* this will always be called between start() and stop(). So you can rely on
1997  * resources allocated by start() and freed from stop(). This needs to be added
1998  * to the docs at some point. */
1999 static gboolean
2000 gst_base_src_unlock (GstBaseSrc * basesrc)
2001 {
2002   GstBaseSrcClass *bclass;
2003   gboolean result = TRUE;
2004
2005   GST_DEBUG ("unlock");
2006   /* unblock whatever the subclass is doing */
2007   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2008   if (bclass->unlock)
2009     result = bclass->unlock (basesrc);
2010
2011   GST_DEBUG ("unschedule clock");
2012   /* and unblock the clock as well, if any */
2013   GST_OBJECT_LOCK (basesrc);
2014   if (basesrc->clock_id) {
2015     gst_clock_id_unschedule (basesrc->clock_id);
2016   }
2017   GST_OBJECT_UNLOCK (basesrc);
2018
2019   GST_DEBUG ("unlock done");
2020
2021   return result;
2022 }
2023
2024 /* this will always be called between start() and stop(). So you can rely on
2025  * resources allocated by start() and freed from stop(). This needs to be added
2026  * to the docs at some point. */
2027 static gboolean
2028 gst_base_src_unlock_stop (GstBaseSrc * basesrc)
2029 {
2030   GstBaseSrcClass *bclass;
2031   gboolean result = TRUE;
2032
2033   GST_DEBUG_OBJECT (basesrc, "unlock stop");
2034
2035   /* Finish a previous unblock request, allowing subclasses to flush command
2036    * queues or whatever they need to do */
2037   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2038   if (bclass->unlock_stop)
2039     result = bclass->unlock_stop (basesrc);
2040
2041   GST_DEBUG_OBJECT (basesrc, "unlock stop done");
2042
2043   return result;
2044 }
2045
2046 /* default negotiation code. 
2047  *
2048  * Take intersection between src and sink pads, take first
2049  * caps and fixate. 
2050  */
2051 static gboolean
2052 gst_base_src_default_negotiate (GstBaseSrc * basesrc)
2053 {
2054   GstCaps *thiscaps;
2055   GstCaps *caps = NULL;
2056   GstCaps *peercaps = NULL;
2057   gboolean result = FALSE;
2058
2059   /* first see what is possible on our source pad */
2060   thiscaps = gst_pad_get_caps (GST_BASE_SRC_PAD (basesrc));
2061   GST_DEBUG_OBJECT (basesrc, "caps of src: %" GST_PTR_FORMAT, thiscaps);
2062   /* nothing or anything is allowed, we're done */
2063   if (thiscaps == NULL || gst_caps_is_any (thiscaps))
2064     goto no_nego_needed;
2065
2066   /* get the peer caps */
2067   peercaps = gst_pad_peer_get_caps (GST_BASE_SRC_PAD (basesrc));
2068   GST_DEBUG_OBJECT (basesrc, "caps of peer: %" GST_PTR_FORMAT, peercaps);
2069   if (peercaps) {
2070     GstCaps *icaps;
2071
2072     /* get intersection */
2073     icaps = gst_caps_intersect (thiscaps, peercaps);
2074     GST_DEBUG_OBJECT (basesrc, "intersect: %" GST_PTR_FORMAT, icaps);
2075     gst_caps_unref (thiscaps);
2076     gst_caps_unref (peercaps);
2077     if (icaps) {
2078       /* take first (and best, since they are sorted) possibility */
2079       caps = gst_caps_copy_nth (icaps, 0);
2080       gst_caps_unref (icaps);
2081     }
2082   } else {
2083     /* no peer, work with our own caps then */
2084     caps = thiscaps;
2085   }
2086   if (caps) {
2087     caps = gst_caps_make_writable (caps);
2088     gst_caps_truncate (caps);
2089
2090     /* now fixate */
2091     if (!gst_caps_is_empty (caps)) {
2092       gst_pad_fixate_caps (GST_BASE_SRC_PAD (basesrc), caps);
2093       GST_DEBUG_OBJECT (basesrc, "fixated to: %" GST_PTR_FORMAT, caps);
2094
2095       if (gst_caps_is_any (caps)) {
2096         /* hmm, still anything, so element can do anything and
2097          * nego is not needed */
2098         result = TRUE;
2099       } else if (gst_caps_is_fixed (caps)) {
2100         /* yay, fixed caps, use those then */
2101         gst_pad_set_caps (GST_BASE_SRC_PAD (basesrc), caps);
2102         result = TRUE;
2103       }
2104     }
2105     gst_caps_unref (caps);
2106   }
2107   return result;
2108
2109 no_nego_needed:
2110   {
2111     GST_DEBUG_OBJECT (basesrc, "no negotiation needed");
2112     if (thiscaps)
2113       gst_caps_unref (thiscaps);
2114     return TRUE;
2115   }
2116 }
2117
2118 static gboolean
2119 gst_base_src_negotiate (GstBaseSrc * basesrc)
2120 {
2121   GstBaseSrcClass *bclass;
2122   gboolean result = TRUE;
2123
2124   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2125
2126   if (bclass->negotiate)
2127     result = bclass->negotiate (basesrc);
2128
2129   return result;
2130 }
2131
2132 static gboolean
2133 gst_base_src_start (GstBaseSrc * basesrc)
2134 {
2135   GstBaseSrcClass *bclass;
2136   gboolean result;
2137   guint64 size;
2138
2139   if (GST_OBJECT_FLAG_IS_SET (basesrc, GST_BASE_SRC_STARTED))
2140     return TRUE;
2141
2142   GST_DEBUG_OBJECT (basesrc, "starting source");
2143
2144   basesrc->num_buffers_left = basesrc->num_buffers;
2145
2146   gst_segment_init (&basesrc->segment, basesrc->segment.format);
2147   basesrc->data.ABI.running = FALSE;
2148
2149   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2150   if (bclass->start)
2151     result = bclass->start (basesrc);
2152   else
2153     result = TRUE;
2154
2155   if (!result)
2156     goto could_not_start;
2157
2158   GST_OBJECT_FLAG_SET (basesrc, GST_BASE_SRC_STARTED);
2159
2160   /* figure out the size */
2161   if (basesrc->segment.format == GST_FORMAT_BYTES) {
2162     if (bclass->get_size) {
2163       if (!(result = bclass->get_size (basesrc, &size)))
2164         size = -1;
2165     } else {
2166       result = FALSE;
2167       size = -1;
2168     }
2169     GST_DEBUG_OBJECT (basesrc, "setting size %" G_GUINT64_FORMAT, size);
2170     /* only update the size when operating in bytes, subclass is supposed
2171      * to set duration in the start method for other formats */
2172     gst_segment_set_duration (&basesrc->segment, GST_FORMAT_BYTES, size);
2173   } else {
2174     size = -1;
2175   }
2176
2177   GST_DEBUG_OBJECT (basesrc,
2178       "format: %d, have size: %d, size: %" G_GUINT64_FORMAT ", duration: %"
2179       G_GINT64_FORMAT, basesrc->segment.format, result, size,
2180       basesrc->segment.duration);
2181
2182   /* check if we can seek */
2183   if (bclass->is_seekable)
2184     basesrc->seekable = bclass->is_seekable (basesrc);
2185   else
2186     basesrc->seekable = FALSE;
2187
2188   GST_DEBUG_OBJECT (basesrc, "is seekable: %d", basesrc->seekable);
2189
2190   /* update for random access flag */
2191   basesrc->random_access = basesrc->seekable &&
2192       basesrc->segment.format == GST_FORMAT_BYTES;
2193
2194   GST_DEBUG_OBJECT (basesrc, "is random_access: %d", basesrc->random_access);
2195
2196   /* run typefind if we are random_access and the typefinding is enabled. */
2197   if (basesrc->random_access && basesrc->data.ABI.typefind && size != -1) {
2198     GstCaps *caps;
2199
2200     caps = gst_type_find_helper (basesrc->srcpad, size);
2201     gst_pad_set_caps (basesrc->srcpad, caps);
2202     gst_caps_unref (caps);
2203   } else {
2204     /* use class or default negotiate function */
2205     if (!gst_base_src_negotiate (basesrc))
2206       goto could_not_negotiate;
2207   }
2208
2209   return TRUE;
2210
2211   /* ERROR */
2212 could_not_start:
2213   {
2214     GST_DEBUG_OBJECT (basesrc, "could not start");
2215     /* subclass is supposed to post a message. We don't have to call _stop. */
2216     return FALSE;
2217   }
2218 could_not_negotiate:
2219   {
2220     GST_DEBUG_OBJECT (basesrc, "could not negotiate, stopping");
2221     GST_ELEMENT_ERROR (basesrc, STREAM, FORMAT,
2222         ("Could not negotiate format"), ("Check your filtered caps, if any"));
2223     /* we must call stop */
2224     gst_base_src_stop (basesrc);
2225     return FALSE;
2226   }
2227 }
2228
2229 static gboolean
2230 gst_base_src_stop (GstBaseSrc * basesrc)
2231 {
2232   GstBaseSrcClass *bclass;
2233   gboolean result = TRUE;
2234
2235   if (!GST_OBJECT_FLAG_IS_SET (basesrc, GST_BASE_SRC_STARTED))
2236     return TRUE;
2237
2238   GST_DEBUG_OBJECT (basesrc, "stopping source");
2239
2240   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2241   if (bclass->stop)
2242     result = bclass->stop (basesrc);
2243
2244   if (result)
2245     GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_STARTED);
2246
2247   return result;
2248 }
2249
2250 static gboolean
2251 gst_base_src_deactivate (GstBaseSrc * basesrc, GstPad * pad)
2252 {
2253   gboolean result;
2254
2255   GST_LIVE_LOCK (basesrc);
2256   basesrc->live_running = TRUE;
2257   GST_LIVE_SIGNAL (basesrc);
2258   GST_LIVE_UNLOCK (basesrc);
2259
2260   /* step 1, unblock clock sync (if any) */
2261   result = gst_base_src_unlock (basesrc);
2262
2263   /* step 2, make sure streaming finishes */
2264   result &= gst_pad_stop_task (pad);
2265
2266   /* step 3, clear the unblock condition */
2267   result &= gst_base_src_unlock_stop (basesrc);
2268
2269   return result;
2270 }
2271
2272 static gboolean
2273 gst_base_src_activate_push (GstPad * pad, gboolean active)
2274 {
2275   GstBaseSrc *basesrc;
2276   GstEvent *event;
2277
2278   basesrc = GST_BASE_SRC (GST_OBJECT_PARENT (pad));
2279
2280   /* prepare subclass first */
2281   if (active) {
2282     GST_DEBUG_OBJECT (basesrc, "Activating in push mode");
2283
2284     if (G_UNLIKELY (!basesrc->can_activate_push))
2285       goto no_push_activation;
2286
2287     if (G_UNLIKELY (!gst_base_src_start (basesrc)))
2288       goto error_start;
2289
2290     basesrc->priv->last_sent_eos = FALSE;
2291
2292     /* do initial seek, which will start the task */
2293     GST_OBJECT_LOCK (basesrc);
2294     event = basesrc->data.ABI.pending_seek;
2295     basesrc->data.ABI.pending_seek = NULL;
2296     GST_OBJECT_UNLOCK (basesrc);
2297
2298     /* no need to unlock anything, the task is certainly
2299      * not running here. The perform seek code will start the task when
2300      * finished. */
2301     if (G_UNLIKELY (!gst_base_src_perform_seek (basesrc, event, FALSE)))
2302       goto seek_failed;
2303
2304     if (event)
2305       gst_event_unref (event);
2306   } else {
2307     GST_DEBUG_OBJECT (basesrc, "Deactivating in push mode");
2308     /* call the unlock function and stop the task */
2309     if (G_UNLIKELY (!gst_base_src_deactivate (basesrc, pad)))
2310       goto deactivate_failed;
2311
2312     /* now we can stop the source */
2313     if (G_UNLIKELY (!gst_base_src_stop (basesrc)))
2314       goto error_stop;
2315   }
2316   return TRUE;
2317
2318   /* ERRORS */
2319 no_push_activation:
2320   {
2321     GST_WARNING_OBJECT (basesrc, "Subclass disabled push-mode activation");
2322     return FALSE;
2323   }
2324 error_start:
2325   {
2326     GST_WARNING_OBJECT (basesrc, "Failed to start in push mode");
2327     return FALSE;
2328   }
2329 seek_failed:
2330   {
2331     GST_ERROR_OBJECT (basesrc, "Failed to perform initial seek");
2332     gst_base_src_stop (basesrc);
2333     if (event)
2334       gst_event_unref (event);
2335     return FALSE;
2336   }
2337 deactivate_failed:
2338   {
2339     GST_ERROR_OBJECT (basesrc, "Failed to deactivate in push mode");
2340     return FALSE;
2341   }
2342 error_stop:
2343   {
2344     GST_DEBUG_OBJECT (basesrc, "Failed to stop in push mode");
2345     return FALSE;
2346   }
2347 }
2348
2349 static gboolean
2350 gst_base_src_activate_pull (GstPad * pad, gboolean active)
2351 {
2352   GstBaseSrc *basesrc;
2353
2354   basesrc = GST_BASE_SRC (GST_OBJECT_PARENT (pad));
2355
2356   /* prepare subclass first */
2357   if (active) {
2358     GST_DEBUG_OBJECT (basesrc, "Activating in pull mode");
2359     if (G_UNLIKELY (!gst_base_src_start (basesrc)))
2360       goto error_start;
2361
2362     /* if not random_access, we cannot operate in pull mode for now */
2363     if (G_UNLIKELY (!gst_base_src_check_get_range (basesrc)))
2364       goto no_get_range;
2365   } else {
2366     GST_DEBUG_OBJECT (basesrc, "Deactivating in pull mode");
2367     /* call the unlock function. We have no task to stop. */
2368     if (G_UNLIKELY (!gst_base_src_deactivate (basesrc, pad)))
2369       goto deactivate_failed;
2370
2371     /* don't send EOS when going from PAUSED => READY when in pull mode */
2372     basesrc->priv->last_sent_eos = TRUE;
2373
2374     if (G_UNLIKELY (!gst_base_src_stop (basesrc)))
2375       goto error_stop;
2376   }
2377   return TRUE;
2378
2379   /* ERRORS */
2380 error_start:
2381   {
2382     GST_ERROR_OBJECT (basesrc, "Failed to start in pull mode");
2383     return FALSE;
2384   }
2385 no_get_range:
2386   {
2387     GST_ERROR_OBJECT (basesrc, "Cannot operate in pull mode, stopping");
2388     gst_base_src_stop (basesrc);
2389     return FALSE;
2390   }
2391 deactivate_failed:
2392   {
2393     GST_ERROR_OBJECT (basesrc, "Failed to deactivate in pull mode");
2394     return FALSE;
2395   }
2396 error_stop:
2397   {
2398     GST_ERROR_OBJECT (basesrc, "Failed to stop in pull mode");
2399     return FALSE;
2400   }
2401 }
2402
2403 static GstStateChangeReturn
2404 gst_base_src_change_state (GstElement * element, GstStateChange transition)
2405 {
2406   GstBaseSrc *basesrc;
2407   GstStateChangeReturn result;
2408   gboolean no_preroll = FALSE;
2409
2410   basesrc = GST_BASE_SRC (element);
2411
2412   switch (transition) {
2413     case GST_STATE_CHANGE_NULL_TO_READY:
2414       break;
2415     case GST_STATE_CHANGE_READY_TO_PAUSED:
2416       GST_LIVE_LOCK (element);
2417       if (basesrc->is_live) {
2418         no_preroll = TRUE;
2419         basesrc->live_running = FALSE;
2420       }
2421       basesrc->priv->last_sent_eos = FALSE;
2422       basesrc->priv->discont = TRUE;
2423       basesrc->priv->startup_latency = -1;
2424       GST_LIVE_UNLOCK (element);
2425       break;
2426     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
2427       GST_LIVE_LOCK (element);
2428       if (basesrc->is_live) {
2429         basesrc->live_running = TRUE;
2430         GST_LIVE_SIGNAL (element);
2431       }
2432       GST_LIVE_UNLOCK (element);
2433       break;
2434     default:
2435       break;
2436   }
2437
2438   if ((result =
2439           GST_ELEMENT_CLASS (parent_class)->change_state (element,
2440               transition)) == GST_STATE_CHANGE_FAILURE)
2441     goto failure;
2442
2443   switch (transition) {
2444     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
2445       GST_LIVE_LOCK (element);
2446       if (basesrc->is_live) {
2447         no_preroll = TRUE;
2448         basesrc->live_running = FALSE;
2449       }
2450       GST_LIVE_UNLOCK (element);
2451       break;
2452     case GST_STATE_CHANGE_PAUSED_TO_READY:
2453     {
2454       GstEvent **event_p;
2455
2456       /* FIXME, deprecate this behaviour, it is very dangerous.
2457        * the prefered way of sending EOS downstream is by sending
2458        * the EOS event to the element */
2459       if (!basesrc->priv->last_sent_eos) {
2460         GST_DEBUG_OBJECT (basesrc, "Sending EOS event");
2461         gst_pad_push_event (basesrc->srcpad, gst_event_new_eos ());
2462         basesrc->priv->last_sent_eos = TRUE;
2463       }
2464       event_p = &basesrc->data.ABI.pending_seek;
2465       gst_event_replace (event_p, NULL);
2466       event_p = &basesrc->priv->close_segment;
2467       gst_event_replace (event_p, NULL);
2468       event_p = &basesrc->priv->start_segment;
2469       gst_event_replace (event_p, NULL);
2470       break;
2471     }
2472     case GST_STATE_CHANGE_READY_TO_NULL:
2473       break;
2474     default:
2475       break;
2476   }
2477
2478   if (no_preroll && result == GST_STATE_CHANGE_SUCCESS)
2479     result = GST_STATE_CHANGE_NO_PREROLL;
2480
2481   return result;
2482
2483   /* ERRORS */
2484 failure:
2485   {
2486     GST_DEBUG_OBJECT (basesrc, "parent failed state change");
2487     return result;
2488   }
2489 }