libs/gst/base/gstbasesink.c: Improve debugging.
[platform/upstream/gstreamer.git] / libs / gst / base / gstbasesrc.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *               2000,2005 Wim Taymans <wim@fluendo.com>
4  *
5  * gstbasesrc.c:
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20  * Boston, MA 02111-1307, USA.
21  */
22
23 /**
24  * SECTION:gstbasesrc
25  * @short_description: Base class for getrange based source elements
26  * @see_also: #GstPushSrc, #GstBaseTransform, #GstBaseSink
27  *
28  * This is a generice base class for source elements. The following
29  * types of sources are supported:
30  * <itemizedlist>
31  *   <listitem><para>random access sources like files</para></listitem>
32  *   <listitem><para>seekable sources</para></listitem>
33  *   <listitem><para>live sources</para></listitem>
34  * </itemizedlist>
35  *
36  * <refsect2>
37  * <para>
38  * The source can be configured to operate in any #GstFormat with the
39  * gst_base_src_set_format() method. The currently set format determines 
40  * the format of the internal #GstSegment and any #GST_EVENT_NEWSEGMENT 
41  * events. The default format for #GstBaseSrc is #GST_FORMAT_BYTES.
42  * </para>
43  * <para>
44  * #GstBaseSrc always supports push mode scheduling. If the following
45  * conditions are met, it also supports pull mode scheduling:
46  * <itemizedlist>
47  *   <listitem><para>The format is set to #GST_FORMAT_BYTES (default).</para>
48  *   </listitem>
49  *   <listitem><para>#GstBaseSrc::is_seekable returns %TRUE.</para>
50  *   </listitem>
51  * </itemizedlist>
52  * </para>
53  * <para>
54  * Since 0.10.9, any #GstBaseSrc can enable pull based scheduling at any 
55  * time by overriding #GstBaseSrc::check_get_range so that it returns %TRUE. 
56  * </para>
57  * <para>
58  * If all the conditions are met for operating in pull mode, #GstBaseSrc is
59  * automatically seekable in push mode as well. The following conditions must 
60  * be met to make the element seekable in push mode when the format is not
61  * #GST_FORMAT_BYTES:
62  * <itemizedlist>
63  *   <listitem><para>
64  *     #GstBaseSrc::is_seekable returns %TRUE.
65  *   </para></listitem>
66  *   <listitem><para>
67  *     #GstBaseSrc::query can convert all supported seek formats to the
68  *     internal format as set with gst_base_src_set_format().
69  *   </para></listitem>
70  *   <listitem><para>
71  *     #GstBaseSrc::do_seek is implemented, performs the seek and returns %TRUE.
72  *   </para></listitem>
73  * </itemizedlist>
74  * </para>
75  * <para>
76  * When the element does not meet the requirements to operate in pull mode,
77  * the offset and length in the #GstBaseSrc::create method should be ignored.
78  * It is recommended to subclass #GstPushSrc instead, in this situation. If the
79  * element can operate in pull mode but only with specific offsets and
80  * lengths, it is allowed to generate an error when the wrong values are passed
81  * to the #GstBaseSrc::create function.
82  * </para>
83  * <para>
84  * #GstBaseSrc has support for live sources. Live sources are sources that 
85  * produce data at a fixed rate, such as audio or video capture devices. A 
86  * typical live source also provides a clock to publish the rate at which 
87  * they produce data.
88  * Use gst_base_src_set_live() to activate the live source mode.
89  * </para>
90  * <para>
91  * A live source does not produce data in the PAUSED state. This means that the 
92  * #GstBaseSrc::create method will not be called in PAUSED but only in PLAYING.
93  * To signal the pipeline that the element will not produce data, the return
94  * value from the READY to PAUSED state will be #GST_STATE_CHANGE_NO_PREROLL.
95  * </para>
96  * <para>
97  * A typical live source will timestamp the buffers it creates with the 
98  * current stream time of the pipeline. This is one reason why a live source
99  * can only produce data in the PLAYING state, when the clock is actually 
100  * distributed and running.
101  * </para>
102  * <para>
103  * Live sources that synchronize and block on the clock (an audio source, for
104  * example) can since 0.10.12 use gst_base_src_wait_playing() when the ::create
105  * function was interrupted by a state change to PAUSED.
106  * </para>
107  * <para>
108  * The #GstBaseSrc::get_times method can be used to implement pseudo-live 
109  * sources. The base source will wait for the specified stream time returned in 
110  * #GstBaseSrc::get_times before pushing out the buffer. 
111  * It only makes sense to implement the ::get_times function if the source is 
112  * a live source.
113  * </para>
114  * <para>
115  * For live sources, the base class will by default measure the time it takes to
116  * create the first buffer in the PLAYING state and will report this value as
117  * the latency. Subclasses should override the query function when this
118  * behaviour is not acceptable.
119  * </para>
120  * <para>
121  * There is only support in #GstBaseSrc for exactly one source pad, which 
122  * should be named "src". A source implementation (subclass of #GstBaseSrc) 
123  * should install a pad template in its base_init function, like so:
124  * </para>
125  * <para>
126  * <programlisting>
127  * static void
128  * my_element_base_init (gpointer g_class)
129  * {
130  *   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (g_class);
131  *   // srctemplate should be a #GstStaticPadTemplate with direction
132  *   // #GST_PAD_SRC and name "src"
133  *   gst_element_class_add_pad_template (gstelement_class,
134  *       gst_static_pad_template_get (&amp;srctemplate));
135  *   // see #GstElementDetails
136  *   gst_element_class_set_details (gstelement_class, &amp;details);
137  * }
138  * </programlisting>
139  * </para>
140  * <title>Controlled shutdown of live sources in applications</title>
141  * <para>
142  * Applications that record from a live source may want to stop recording
143  * in a controlled way, so that the recording is stopped, but the data
144  * already in the pipeline is processed to the end (remember that many live
145  * sources would go on recording forever otherwise). For that to happen the
146  * application needs to make the source stop recording and send an EOS
147  * event down the pipeline. The application would then wait for an
148  * EOS message posted on the pipeline's bus to know when all data has
149  * been processed and the pipeline can safely be stopped.
150  * </para>
151  * <para>
152  * Since GStreamer 0.10.3 an application may simply set the source
153  * element to NULL or READY state to make it send an EOS event downstream.
154  * The application should lock the state of the source afterwards, so that
155  * shutting down the pipeline from PLAYING doesn't temporarily start up the
156  * source element for a second time:
157  * <programlisting>
158  * ...
159  * // stop recording
160  * gst_element_set_state (audio_source, #GST_STATE_NULL);
161  * gst_element_set_locked_state (audio_source, %TRUE);
162  * ...
163  * </programlisting>
164  * Now the application should wait for an EOS message
165  * to be posted on the pipeline's bus. Once it has received
166  * an EOS message, it may safely shut down the entire pipeline:
167  * <programlisting>
168  * ...
169  * // everything done - shut down pipeline
170  * gst_element_set_state (pipeline, #GST_STATE_NULL);
171  * gst_element_set_locked_state (audio_source, %FALSE);
172  * ...
173  * </programlisting>
174  * </para>
175  * <para>
176  * Note that setting the source element to NULL or READY when the 
177  * pipeline is in the PAUSED state may cause a deadlock since the streaming
178  * thread might be blocked in PREROLL.
179  * </para>
180  * <para>
181  * Last reviewed on 2006-09-27 (0.10.11)
182  * </para>
183  * </refsect2>
184  */
185
186 #ifdef HAVE_CONFIG_H
187 #  include "config.h"
188 #endif
189
190 #include <stdlib.h>
191 #include <string.h>
192
193 #include "gstbasesrc.h"
194 #include "gsttypefindhelper.h"
195 #include <gst/gstmarshal.h>
196 #include <gst/gst-i18n-lib.h>
197
198 GST_DEBUG_CATEGORY_STATIC (gst_base_src_debug);
199 #define GST_CAT_DEFAULT gst_base_src_debug
200
201 #define GST_LIVE_GET_LOCK(elem)               (GST_BASE_SRC_CAST(elem)->live_lock)
202 #define GST_LIVE_LOCK(elem)                   g_mutex_lock(GST_LIVE_GET_LOCK(elem))
203 #define GST_LIVE_TRYLOCK(elem)                g_mutex_trylock(GST_LIVE_GET_LOCK(elem))
204 #define GST_LIVE_UNLOCK(elem)                 g_mutex_unlock(GST_LIVE_GET_LOCK(elem))
205 #define GST_LIVE_GET_COND(elem)               (GST_BASE_SRC_CAST(elem)->live_cond)
206 #define GST_LIVE_WAIT(elem)                   g_cond_wait (GST_LIVE_GET_COND (elem), GST_LIVE_GET_LOCK (elem))
207 #define GST_LIVE_TIMED_WAIT(elem, timeval)    g_cond_timed_wait (GST_LIVE_GET_COND (elem), GST_LIVE_GET_LOCK (elem),\
208                                                                                 timeval)
209 #define GST_LIVE_SIGNAL(elem)                 g_cond_signal (GST_LIVE_GET_COND (elem));
210 #define GST_LIVE_BROADCAST(elem)              g_cond_broadcast (GST_LIVE_GET_COND (elem));
211
212 /* BaseSrc signals and args */
213 enum
214 {
215   /* FILL ME */
216   LAST_SIGNAL
217 };
218
219 #define DEFAULT_BLOCKSIZE       4096
220 #define DEFAULT_NUM_BUFFERS     -1
221 #define DEFAULT_TYPEFIND        FALSE
222
223 enum
224 {
225   PROP_0,
226   PROP_BLOCKSIZE,
227   PROP_NUM_BUFFERS,
228   PROP_TYPEFIND,
229 };
230
231 #define GST_BASE_SRC_GET_PRIVATE(obj)  \
232    (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_BASE_SRC, GstBaseSrcPrivate))
233
234 struct _GstBaseSrcPrivate
235 {
236   gboolean last_sent_eos;       /* last thing we did was send an EOS (we set this
237                                  * to avoid the sending of two EOS in some cases) */
238   gboolean discont;
239
240   /* two segments to be sent in the streaming thread with STREAM_LOCK */
241   GstEvent *close_segment;
242   GstEvent *start_segment;
243
244   /* startup latency is the time it takes between going to PLAYING and producing
245    * the first BUFFER with running_time 0. This value is included in the latency
246    * reporting. */
247   GstClockTime startup_latency;
248 };
249
250 static GstElementClass *parent_class = NULL;
251
252 static void gst_base_src_base_init (gpointer g_class);
253 static void gst_base_src_class_init (GstBaseSrcClass * klass);
254 static void gst_base_src_init (GstBaseSrc * src, gpointer g_class);
255 static void gst_base_src_finalize (GObject * object);
256
257
258 GType
259 gst_base_src_get_type (void)
260 {
261   static GType base_src_type = 0;
262
263   if (G_UNLIKELY (base_src_type == 0)) {
264     static const GTypeInfo base_src_info = {
265       sizeof (GstBaseSrcClass),
266       (GBaseInitFunc) gst_base_src_base_init,
267       NULL,
268       (GClassInitFunc) gst_base_src_class_init,
269       NULL,
270       NULL,
271       sizeof (GstBaseSrc),
272       0,
273       (GInstanceInitFunc) gst_base_src_init,
274     };
275
276     base_src_type = g_type_register_static (GST_TYPE_ELEMENT,
277         "GstBaseSrc", &base_src_info, G_TYPE_FLAG_ABSTRACT);
278   }
279   return base_src_type;
280 }
281 static GstCaps *gst_base_src_getcaps (GstPad * pad);
282 static gboolean gst_base_src_setcaps (GstPad * pad, GstCaps * caps);
283 static void gst_base_src_fixate (GstPad * pad, GstCaps * caps);
284
285 static gboolean gst_base_src_activate_push (GstPad * pad, gboolean active);
286 static gboolean gst_base_src_activate_pull (GstPad * pad, gboolean active);
287 static void gst_base_src_set_property (GObject * object, guint prop_id,
288     const GValue * value, GParamSpec * pspec);
289 static void gst_base_src_get_property (GObject * object, guint prop_id,
290     GValue * value, GParamSpec * pspec);
291 static gboolean gst_base_src_event_handler (GstPad * pad, GstEvent * event);
292 static gboolean gst_base_src_send_event (GstElement * elem, GstEvent * event);
293 static gboolean gst_base_src_default_event (GstBaseSrc * src, GstEvent * event);
294 static const GstQueryType *gst_base_src_get_query_types (GstElement * element);
295
296 static gboolean gst_base_src_query (GstPad * pad, GstQuery * query);
297
298 static gboolean gst_base_src_default_negotiate (GstBaseSrc * basesrc);
299 static gboolean gst_base_src_default_do_seek (GstBaseSrc * src,
300     GstSegment * segment);
301 static gboolean gst_base_src_default_query (GstBaseSrc * src, GstQuery * query);
302 static gboolean gst_base_src_default_prepare_seek_segment (GstBaseSrc * src,
303     GstEvent * event, GstSegment * segment);
304
305 static gboolean gst_base_src_unlock (GstBaseSrc * basesrc);
306 static gboolean gst_base_src_unlock_stop (GstBaseSrc * basesrc);
307 static gboolean gst_base_src_start (GstBaseSrc * basesrc);
308 static gboolean gst_base_src_stop (GstBaseSrc * basesrc);
309
310 static GstStateChangeReturn gst_base_src_change_state (GstElement * element,
311     GstStateChange transition);
312
313 static void gst_base_src_loop (GstPad * pad);
314 static gboolean gst_base_src_pad_check_get_range (GstPad * pad);
315 static gboolean gst_base_src_default_check_get_range (GstBaseSrc * bsrc);
316 static GstFlowReturn gst_base_src_pad_get_range (GstPad * pad, guint64 offset,
317     guint length, GstBuffer ** buf);
318 static GstFlowReturn gst_base_src_get_range (GstBaseSrc * src, guint64 offset,
319     guint length, GstBuffer ** buf);
320
321 static void
322 gst_base_src_base_init (gpointer g_class)
323 {
324   GST_DEBUG_CATEGORY_INIT (gst_base_src_debug, "basesrc", 0, "basesrc element");
325 }
326
327 static void
328 gst_base_src_class_init (GstBaseSrcClass * klass)
329 {
330   GObjectClass *gobject_class;
331   GstElementClass *gstelement_class;
332
333   gobject_class = G_OBJECT_CLASS (klass);
334   gstelement_class = GST_ELEMENT_CLASS (klass);
335
336   g_type_class_add_private (klass, sizeof (GstBaseSrcPrivate));
337
338   parent_class = g_type_class_peek_parent (klass);
339
340   gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_base_src_finalize);
341   gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_base_src_set_property);
342   gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_base_src_get_property);
343
344   g_object_class_install_property (gobject_class, PROP_BLOCKSIZE,
345       g_param_spec_ulong ("blocksize", "Block size",
346           "Size in bytes to read per buffer (0 = default)", 0, G_MAXULONG,
347           DEFAULT_BLOCKSIZE, G_PARAM_READWRITE));
348   g_object_class_install_property (gobject_class, PROP_NUM_BUFFERS,
349       g_param_spec_int ("num-buffers", "num-buffers",
350           "Number of buffers to output before sending EOS", -1, G_MAXINT,
351           DEFAULT_NUM_BUFFERS, G_PARAM_READWRITE));
352   g_object_class_install_property (gobject_class, PROP_TYPEFIND,
353       g_param_spec_boolean ("typefind", "Typefind",
354           "Run typefind before negotiating", DEFAULT_TYPEFIND,
355           G_PARAM_READWRITE));
356
357   gstelement_class->change_state =
358       GST_DEBUG_FUNCPTR (gst_base_src_change_state);
359   gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_base_src_send_event);
360   gstelement_class->get_query_types =
361       GST_DEBUG_FUNCPTR (gst_base_src_get_query_types);
362
363   klass->negotiate = GST_DEBUG_FUNCPTR (gst_base_src_default_negotiate);
364   klass->event = GST_DEBUG_FUNCPTR (gst_base_src_default_event);
365   klass->do_seek = GST_DEBUG_FUNCPTR (gst_base_src_default_do_seek);
366   klass->query = GST_DEBUG_FUNCPTR (gst_base_src_default_query);
367   klass->check_get_range =
368       GST_DEBUG_FUNCPTR (gst_base_src_default_check_get_range);
369   klass->prepare_seek_segment =
370       GST_DEBUG_FUNCPTR (gst_base_src_default_prepare_seek_segment);
371 }
372
373 static void
374 gst_base_src_init (GstBaseSrc * basesrc, gpointer g_class)
375 {
376   GstPad *pad;
377   GstPadTemplate *pad_template;
378
379   basesrc->priv = GST_BASE_SRC_GET_PRIVATE (basesrc);
380
381   basesrc->is_live = FALSE;
382   basesrc->live_lock = g_mutex_new ();
383   basesrc->live_cond = g_cond_new ();
384   basesrc->num_buffers = DEFAULT_NUM_BUFFERS;
385   basesrc->num_buffers_left = -1;
386
387   basesrc->can_activate_push = TRUE;
388   basesrc->pad_mode = GST_ACTIVATE_NONE;
389
390   pad_template =
391       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (g_class), "src");
392   g_return_if_fail (pad_template != NULL);
393
394   GST_DEBUG_OBJECT (basesrc, "creating src pad");
395   pad = gst_pad_new_from_template (pad_template, "src");
396
397   GST_DEBUG_OBJECT (basesrc, "setting functions on src pad");
398   gst_pad_set_activatepush_function (pad,
399       GST_DEBUG_FUNCPTR (gst_base_src_activate_push));
400   gst_pad_set_activatepull_function (pad,
401       GST_DEBUG_FUNCPTR (gst_base_src_activate_pull));
402   gst_pad_set_event_function (pad,
403       GST_DEBUG_FUNCPTR (gst_base_src_event_handler));
404   gst_pad_set_query_function (pad, GST_DEBUG_FUNCPTR (gst_base_src_query));
405   gst_pad_set_checkgetrange_function (pad,
406       GST_DEBUG_FUNCPTR (gst_base_src_pad_check_get_range));
407   gst_pad_set_getrange_function (pad,
408       GST_DEBUG_FUNCPTR (gst_base_src_pad_get_range));
409   gst_pad_set_getcaps_function (pad, GST_DEBUG_FUNCPTR (gst_base_src_getcaps));
410   gst_pad_set_setcaps_function (pad, GST_DEBUG_FUNCPTR (gst_base_src_setcaps));
411   gst_pad_set_fixatecaps_function (pad,
412       GST_DEBUG_FUNCPTR (gst_base_src_fixate));
413
414   /* hold pointer to pad */
415   basesrc->srcpad = pad;
416   GST_DEBUG_OBJECT (basesrc, "adding src pad");
417   gst_element_add_pad (GST_ELEMENT (basesrc), pad);
418
419   basesrc->blocksize = DEFAULT_BLOCKSIZE;
420   basesrc->clock_id = NULL;
421   /* we operate in BYTES by default */
422   gst_base_src_set_format (basesrc, GST_FORMAT_BYTES);
423   basesrc->data.ABI.typefind = DEFAULT_TYPEFIND;
424
425   GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_STARTED);
426
427   GST_DEBUG_OBJECT (basesrc, "init done");
428 }
429
430 static void
431 gst_base_src_finalize (GObject * object)
432 {
433   GstBaseSrc *basesrc;
434   GstEvent **event_p;
435
436   basesrc = GST_BASE_SRC (object);
437
438   g_mutex_free (basesrc->live_lock);
439   g_cond_free (basesrc->live_cond);
440
441   event_p = &basesrc->data.ABI.pending_seek;
442   gst_event_replace ((GstEvent **) event_p, NULL);
443
444   G_OBJECT_CLASS (parent_class)->finalize (object);
445 }
446
447 /**
448  * gst_base_src_wait_playing:
449  * @src: the src
450  *
451  * If the #GstBaseSrcClass::create method performs its own synchronisation against
452  * the clock it must unblock when going from PLAYING to the PAUSED state and call
453  * this method before continuing to produce the remaining data.
454  *
455  * This function will block until a state change to PLAYING happens (in which
456  * case this function returns #GST_FLOW_OK) or the processing must be stopped due
457  * to a state change to READY or a FLUSH event (in which case this function
458  * returns #GST_FLOW_WRONG_STATE).
459  *
460  * Since: 0.10.12
461  *
462  * Returns: #GST_FLOW_OK if @src is PLAYING and processing can
463  * continue. Any other return value should be returned from the create vmethod.
464  */
465 GstFlowReturn
466 gst_base_src_wait_playing (GstBaseSrc * src)
467 {
468   /* block until the state changes, or we get a flush, or something */
469   GST_LIVE_LOCK (src);
470   if (src->is_live) {
471     while (G_UNLIKELY (!src->live_running)) {
472       GST_DEBUG ("live source signal waiting");
473       GST_LIVE_SIGNAL (src);
474       GST_DEBUG ("live source waiting for running state");
475       GST_LIVE_WAIT (src);
476       GST_DEBUG ("live source unlocked");
477     }
478     /* FIXME, use another variable to signal stopping so that we don't
479      * have to grab another lock. */
480     GST_OBJECT_LOCK (src->srcpad);
481     if (G_UNLIKELY (GST_PAD_IS_FLUSHING (src->srcpad)))
482       goto flushing;
483     GST_OBJECT_UNLOCK (src->srcpad);
484   }
485   GST_LIVE_UNLOCK (src);
486
487   return GST_FLOW_OK;
488
489   /* ERRORS */
490 flushing:
491   {
492     GST_DEBUG_OBJECT (src, "pad is flushing");
493     GST_OBJECT_UNLOCK (src->srcpad);
494     GST_LIVE_UNLOCK (src);
495     return GST_FLOW_WRONG_STATE;
496   }
497 }
498
499 /**
500  * gst_base_src_set_live:
501  * @src: base source instance
502  * @live: new live-mode
503  *
504  * If the element listens to a live source, @live should
505  * be set to %TRUE. 
506  *
507  * A live source will not produce data in the PAUSED state and
508  * will therefore not be able to participate in the PREROLL phase
509  * of a pipeline. To signal this fact to the application and the 
510  * pipeline, the state change return value of the live source will
511  * be GST_STATE_CHANGE_NO_PREROLL.
512  */
513 void
514 gst_base_src_set_live (GstBaseSrc * src, gboolean live)
515 {
516   GST_LIVE_LOCK (src);
517   src->is_live = live;
518   GST_LIVE_UNLOCK (src);
519 }
520
521 /**
522  * gst_base_src_is_live:
523  * @src: base source instance
524  *
525  * Check if an element is in live mode.
526  *
527  * Returns: %TRUE if element is in live mode.
528  */
529 gboolean
530 gst_base_src_is_live (GstBaseSrc * src)
531 {
532   gboolean result;
533
534   GST_LIVE_LOCK (src);
535   result = src->is_live;
536   GST_LIVE_UNLOCK (src);
537
538   return result;
539 }
540
541 /**
542  * gst_base_src_set_format:
543  * @src: base source instance
544  * @format: the format to use
545  *
546  * Sets the default format of the source. This will be the format used
547  * for sending NEW_SEGMENT events and for performing seeks.
548  *
549  * If a format of GST_FORMAT_BYTES is set, the element will be able to
550  * operate in pull mode if the #GstBaseSrc::is_seekable returns TRUE.
551  *
552  * @Since: 0.10.1
553  */
554 void
555 gst_base_src_set_format (GstBaseSrc * src, GstFormat format)
556 {
557   gst_segment_init (&src->segment, format);
558 }
559
560 /**
561  * gst_base_src_query_latency:
562  * @src: the source
563  * @live: if the source is live
564  * @min_latency: the min latency of the source
565  * @max_latency: the max latency of the source
566  *
567  * Query the source for the latency parameters. @live will be TRUE when @src is
568  * configured as a live source. @min_latency will be set to the difference
569  * between the running time and the timestamp of the first buffer.
570  * @max_latency is always the undefined value of -1.
571  *
572  * This function is mostly used by subclasses. 
573  *
574  * Returns: TRUE if the query succeeded.
575  *
576  * Since: 0.10.13
577  */
578 gboolean
579 gst_base_src_query_latency (GstBaseSrc * src, gboolean * live,
580     GstClockTime * min_latency, GstClockTime * max_latency)
581 {
582   GstClockTime min;
583
584   GST_LIVE_LOCK (src);
585   if (live)
586     *live = src->is_live;
587
588   /* if we have a startup latency, report this one, else report 0. Subclasses
589    * are supposed to override the query function if they want something
590    * else. */
591   if (src->priv->startup_latency != -1)
592     min = src->priv->startup_latency;
593   else
594     min = 0;
595
596   if (min_latency)
597     *min_latency = min;
598   if (max_latency)
599     *max_latency = -1;
600
601   GST_LOG_OBJECT (src, "latency: live %d, min %" GST_TIME_FORMAT
602       ", max %" GST_TIME_FORMAT, src->is_live, GST_TIME_ARGS (min),
603       GST_TIME_ARGS (-1));
604   GST_LIVE_UNLOCK (src);
605
606   return TRUE;
607 }
608
609 static gboolean
610 gst_base_src_setcaps (GstPad * pad, GstCaps * caps)
611 {
612   GstBaseSrcClass *bclass;
613   GstBaseSrc *bsrc;
614   gboolean res = TRUE;
615
616   bsrc = GST_BASE_SRC (GST_PAD_PARENT (pad));
617   bclass = GST_BASE_SRC_GET_CLASS (bsrc);
618
619   if (bclass->set_caps)
620     res = bclass->set_caps (bsrc, caps);
621
622   return res;
623 }
624
625 static GstCaps *
626 gst_base_src_getcaps (GstPad * pad)
627 {
628   GstBaseSrcClass *bclass;
629   GstBaseSrc *bsrc;
630   GstCaps *caps = NULL;
631
632   bsrc = GST_BASE_SRC (GST_PAD_PARENT (pad));
633   bclass = GST_BASE_SRC_GET_CLASS (bsrc);
634   if (bclass->get_caps)
635     caps = bclass->get_caps (bsrc);
636
637   if (caps == NULL) {
638     GstPadTemplate *pad_template;
639
640     pad_template =
641         gst_element_class_get_pad_template (GST_ELEMENT_CLASS (bclass), "src");
642     if (pad_template != NULL) {
643       caps = gst_caps_ref (gst_pad_template_get_caps (pad_template));
644     }
645   }
646   return caps;
647 }
648
649 static void
650 gst_base_src_fixate (GstPad * pad, GstCaps * caps)
651 {
652   GstBaseSrcClass *bclass;
653   GstBaseSrc *bsrc;
654
655   bsrc = GST_BASE_SRC (gst_pad_get_parent (pad));
656   bclass = GST_BASE_SRC_GET_CLASS (bsrc);
657
658   if (bclass->fixate)
659     bclass->fixate (bsrc, caps);
660
661   gst_object_unref (bsrc);
662 }
663
664 static gboolean
665 gst_base_src_default_query (GstBaseSrc * src, GstQuery * query)
666 {
667   gboolean res;
668
669   switch (GST_QUERY_TYPE (query)) {
670     case GST_QUERY_POSITION:
671     {
672       GstFormat format;
673
674       gst_query_parse_position (query, &format, NULL);
675       switch (format) {
676         case GST_FORMAT_PERCENT:
677         {
678           gint64 percent;
679           gint64 position;
680           gint64 duration;
681
682           position = src->segment.last_stop;
683           duration = src->segment.duration;
684
685           if (position != -1 && duration != -1) {
686             if (position < duration)
687               percent = gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX, position,
688                   duration);
689             else
690               percent = GST_FORMAT_PERCENT_MAX;
691           } else
692             percent = -1;
693
694           gst_query_set_position (query, GST_FORMAT_PERCENT, percent);
695           res = TRUE;
696           break;
697         }
698         default:
699         {
700           gint64 position;
701
702           position = src->segment.last_stop;
703
704           if (position != -1) {
705             /* convert to requested format */
706             res =
707                 gst_pad_query_convert (src->srcpad, src->segment.format,
708                 position, &format, &position);
709           } else
710             res = TRUE;
711
712           gst_query_set_position (query, format, position);
713           break;
714         }
715       }
716       break;
717     }
718     case GST_QUERY_DURATION:
719     {
720       GstFormat format;
721
722       gst_query_parse_duration (query, &format, NULL);
723
724       GST_DEBUG_OBJECT (src, "duration query in format %s",
725           gst_format_get_name (format));
726       switch (format) {
727         case GST_FORMAT_PERCENT:
728           gst_query_set_duration (query, GST_FORMAT_PERCENT,
729               GST_FORMAT_PERCENT_MAX);
730           res = TRUE;
731           break;
732         default:
733         {
734           gint64 duration;
735
736           duration = src->segment.duration;
737
738           if (duration != -1) {
739             /* convert to requested format */
740             res =
741                 gst_pad_query_convert (src->srcpad, src->segment.format,
742                 duration, &format, &duration);
743           } else {
744             res = TRUE;
745           }
746           gst_query_set_duration (query, format, duration);
747           break;
748         }
749       }
750       break;
751     }
752
753     case GST_QUERY_SEEKING:
754     {
755       gst_query_set_seeking (query, src->segment.format,
756           src->seekable, 0, src->segment.duration);
757       res = TRUE;
758       break;
759     }
760     case GST_QUERY_SEGMENT:
761     {
762       gint64 start, stop;
763
764       /* no end segment configured, current duration then */
765       if ((stop = src->segment.stop) == -1)
766         stop = src->segment.duration;
767       start = src->segment.start;
768
769       /* adjust to stream time */
770       if (src->segment.time != -1) {
771         start -= src->segment.time;
772         if (stop != -1)
773           stop -= src->segment.time;
774       }
775       gst_query_set_segment (query, src->segment.rate, src->segment.format,
776           start, stop);
777       res = TRUE;
778       break;
779     }
780
781     case GST_QUERY_FORMATS:
782     {
783       gst_query_set_formats (query, 3, GST_FORMAT_DEFAULT,
784           GST_FORMAT_BYTES, GST_FORMAT_PERCENT);
785       res = TRUE;
786       break;
787     }
788     case GST_QUERY_CONVERT:
789     {
790       GstFormat src_fmt, dest_fmt;
791       gint64 src_val, dest_val;
792
793       gst_query_parse_convert (query, &src_fmt, &src_val, &dest_fmt, &dest_val);
794
795       /* we can only convert between equal formats... */
796       if (src_fmt == dest_fmt) {
797         dest_val = src_val;
798         res = TRUE;
799       } else
800         res = FALSE;
801
802       gst_query_set_convert (query, src_fmt, src_val, dest_fmt, dest_val);
803       break;
804     }
805     case GST_QUERY_LATENCY:
806     {
807       GstClockTime min, max;
808       gboolean live;
809
810       /* Subclasses should override and implement something usefull */
811       res = gst_base_src_query_latency (src, &live, &min, &max);
812
813       GST_LOG_OBJECT (src, "report latency: live %d, min %" GST_TIME_FORMAT
814           ", max %" GST_TIME_FORMAT, live, GST_TIME_ARGS (min),
815           GST_TIME_ARGS (max));
816
817       gst_query_set_latency (query, live, min, max);
818       break;
819     }
820     case GST_QUERY_JITTER:
821     case GST_QUERY_RATE:
822     default:
823       res = FALSE;
824       break;
825   }
826   GST_DEBUG_OBJECT (src, "query %s returns %d", GST_QUERY_TYPE_NAME (query),
827       res);
828   return res;
829 }
830
831 static gboolean
832 gst_base_src_query (GstPad * pad, GstQuery * query)
833 {
834   GstBaseSrc *src;
835   GstBaseSrcClass *bclass;
836   gboolean result = FALSE;
837
838   src = GST_BASE_SRC (gst_pad_get_parent (pad));
839
840   bclass = GST_BASE_SRC_GET_CLASS (src);
841
842   if (bclass->query)
843     result = bclass->query (src, query);
844   else
845     result = gst_pad_query_default (pad, query);
846
847   gst_object_unref (src);
848
849   return result;
850 }
851
852 static gboolean
853 gst_base_src_default_do_seek (GstBaseSrc * src, GstSegment * segment)
854 {
855   gboolean res = TRUE;
856
857   /* update our offset if the start/stop position was updated */
858   if (segment->format == GST_FORMAT_BYTES) {
859     segment->last_stop = segment->start;
860     segment->time = segment->start;
861   } else if (segment->start == 0) {
862     /* seek to start, we can implement a default for this. */
863     segment->last_stop = 0;
864     segment->time = 0;
865     res = TRUE;
866   } else
867     res = FALSE;
868
869   return res;
870 }
871
872 static gboolean
873 gst_base_src_do_seek (GstBaseSrc * src, GstSegment * segment)
874 {
875   GstBaseSrcClass *bclass;
876   gboolean result = FALSE;
877
878   bclass = GST_BASE_SRC_GET_CLASS (src);
879
880   if (bclass->do_seek)
881     result = bclass->do_seek (src, segment);
882
883   return result;
884 }
885
886 #define SEEK_TYPE_IS_RELATIVE(t) (((t) != GST_SEEK_TYPE_NONE) && ((t) != GST_SEEK_TYPE_SET))
887
888 static gboolean
889 gst_base_src_default_prepare_seek_segment (GstBaseSrc * src, GstEvent * event,
890     GstSegment * segment)
891 {
892   /* By default, we try one of 2 things:
893    *   - For absolute seek positions, convert the requested position to our 
894    *     configured processing format and place it in the output segment \
895    *   - For relative seek positions, convert our current (input) values to the
896    *     seek format, adjust by the relative seek offset and then convert back to
897    *     the processing format
898    */
899   GstSeekType cur_type, stop_type;
900   gint64 cur, stop;
901   GstSeekFlags flags;
902   GstFormat seek_format, dest_format;
903   gdouble rate;
904   gboolean update;
905   gboolean res = TRUE;
906
907   gst_event_parse_seek (event, &rate, &seek_format, &flags,
908       &cur_type, &cur, &stop_type, &stop);
909   dest_format = segment->format;
910
911   if (seek_format == dest_format) {
912     gst_segment_set_seek (segment, rate, seek_format, flags,
913         cur_type, cur, stop_type, stop, &update);
914     return TRUE;
915   }
916
917   if (cur_type != GST_SEEK_TYPE_NONE) {
918     /* FIXME: Handle seek_cur & seek_end by converting the input segment vals */
919     res =
920         gst_pad_query_convert (src->srcpad, seek_format, cur, &dest_format,
921         &cur);
922     cur_type = GST_SEEK_TYPE_SET;
923   }
924
925   if (res && stop_type != GST_SEEK_TYPE_NONE) {
926     /* FIXME: Handle seek_cur & seek_end by converting the input segment vals */
927     res =
928         gst_pad_query_convert (src->srcpad, seek_format, stop, &dest_format,
929         &stop);
930     stop_type = GST_SEEK_TYPE_SET;
931   }
932
933   /* And finally, configure our output segment in the desired format */
934   gst_segment_set_seek (segment, rate, dest_format, flags, cur_type, cur,
935       stop_type, stop, &update);
936
937   if (!res)
938     goto no_format;
939
940   return res;
941
942 no_format:
943   {
944     GST_DEBUG_OBJECT (src, "undefined format given, seek aborted.");
945     return FALSE;
946   }
947 }
948
949 static gboolean
950 gst_base_src_prepare_seek_segment (GstBaseSrc * src, GstEvent * event,
951     GstSegment * seeksegment)
952 {
953   GstBaseSrcClass *bclass;
954   gboolean result = FALSE;
955
956   bclass = GST_BASE_SRC_GET_CLASS (src);
957
958   if (bclass->prepare_seek_segment)
959     result = bclass->prepare_seek_segment (src, event, seeksegment);
960
961   return result;
962 }
963
964 /* this code implements the seeking. It is a good example
965  * handling all cases.
966  *
967  * A seek updates the currently configured segment.start
968  * and segment.stop values based on the SEEK_TYPE. If the
969  * segment.start value is updated, a seek to this new position
970  * should be performed.
971  *
972  * The seek can only be executed when we are not currently
973  * streaming any data, to make sure that this is the case, we
974  * acquire the STREAM_LOCK which is taken when we are in the
975  * _loop() function or when a getrange() is called. Normally
976  * we will not receive a seek if we are operating in pull mode
977  * though.
978  *
979  * When we are in the loop() function, we might be in the middle
980  * of pushing a buffer, which might block in a sink. To make sure
981  * that the push gets unblocked we push out a FLUSH_START event.
982  * Our loop function will get a WRONG_STATE return value from
983  * the push and will pause, effectively releasing the STREAM_LOCK.
984  *
985  * For a non-flushing seek, we pause the task, which might eventually
986  * release the STREAM_LOCK. We say eventually because when the sink
987  * blocks on the sample we might wait a very long time until the sink
988  * unblocks the sample. In any case we acquire the STREAM_LOCK and
989  * can continue the seek. A non-flushing seek is normally done in a 
990  * running pipeline to perform seamless playback.
991  * In the case of a non-flushing seek we need to make sure that the
992  * data we output after the seek is continuous with the previous data,
993  * this is because a non-flushing seek does not reset the stream-time
994  * to 0. We do this by closing the currently running segment, ie. sending
995  * a new_segment event with the stop position set to the last processed 
996  * position.
997  *
998  * After updating the segment.start/stop values, we prepare for
999  * streaming again. We push out a FLUSH_STOP to make the peer pad
1000  * accept data again and we start our task again.
1001  *
1002  * A segment seek posts a message on the bus saying that the playback
1003  * of the segment started. We store the segment flag internally because
1004  * when we reach the segment.stop we have to post a segment.done
1005  * instead of EOS when doing a segment seek.
1006  */
1007 /* FIXME (0.11), we have the unlock gboolean here because most current 
1008  * implementations (fdsrc, -base/gst/tcp/, ...) unconditionally unlock, even when
1009  * the streaming thread isn't running, resulting in bogus unlocks later when it 
1010  * starts. This is fixed by adding unlock_stop, but we should still avoid unlocking
1011  * unnecessarily for backwards compatibility. Ergo, the unlock variable stays
1012  * until 0.11
1013  */
1014 static gboolean
1015 gst_base_src_perform_seek (GstBaseSrc * src, GstEvent * event, gboolean unlock)
1016 {
1017   gboolean res = TRUE;
1018   gdouble rate;
1019   GstFormat seek_format, dest_format;
1020   GstSeekFlags flags;
1021   GstSeekType cur_type, stop_type;
1022   gint64 cur, stop;
1023   gboolean flush;
1024   gboolean update;
1025   gboolean relative_seek = FALSE;
1026   gboolean seekseg_configured = FALSE;
1027   GstSegment seeksegment;
1028
1029   GST_DEBUG_OBJECT (src, "doing seek");
1030
1031   dest_format = src->segment.format;
1032
1033   if (event) {
1034     gst_event_parse_seek (event, &rate, &seek_format, &flags,
1035         &cur_type, &cur, &stop_type, &stop);
1036
1037     relative_seek = SEEK_TYPE_IS_RELATIVE (cur_type) ||
1038         SEEK_TYPE_IS_RELATIVE (stop_type);
1039
1040     if (dest_format != seek_format && !relative_seek) {
1041       /* If we have an ABSOLUTE position (SEEK_SET only), we can convert it
1042        * here before taking the stream lock, otherwise we must convert it later,
1043        * once we have the stream lock and can read the current position */
1044       gst_segment_init (&seeksegment, dest_format);
1045
1046       if (!gst_base_src_prepare_seek_segment (src, event, &seeksegment))
1047         goto prepare_failed;
1048
1049       seekseg_configured = TRUE;
1050     }
1051
1052     flush = flags & GST_SEEK_FLAG_FLUSH;
1053   } else {
1054     flush = FALSE;
1055   }
1056
1057   /* send flush start */
1058   if (flush)
1059     gst_pad_push_event (src->srcpad, gst_event_new_flush_start ());
1060   else
1061     gst_pad_pause_task (src->srcpad);
1062
1063   /* unblock streaming thread */
1064   if (unlock)
1065     gst_base_src_unlock (src);
1066
1067   /* grab streaming lock, this should eventually be possible, either
1068    * because the task is paused or our streaming thread stopped 
1069    * because our peer is flushing. */
1070   GST_PAD_STREAM_LOCK (src->srcpad);
1071
1072   if (unlock)
1073     gst_base_src_unlock_stop (src);
1074
1075   /* If we configured the seeksegment above, don't overwrite it now. Otherwise
1076    * copy the current segment info into the temp segment that we can actually
1077    * attempt the seek with. We only update the real segment if the seek suceeds. */
1078   if (!seekseg_configured) {
1079     memcpy (&seeksegment, &src->segment, sizeof (GstSegment));
1080
1081     /* now configure the final seek segment */
1082     if (event) {
1083       if (src->segment.format != seek_format) {
1084         /* OK, here's where we give the subclass a chance to convert the relative
1085          * seek into an absolute one in the processing format. We set up any
1086          * absolute seek above, before taking the stream lock. */
1087         if (!gst_base_src_prepare_seek_segment (src, event, &seeksegment)) {
1088           GST_DEBUG_OBJECT (src, "Preparing the seek failed after flushing. "
1089               "Aborting seek");
1090           res = FALSE;
1091         }
1092       } else {
1093         /* The seek format matches our processing format, no need to ask the
1094          * the subclass to configure the segment. */
1095         gst_segment_set_seek (&seeksegment, rate, seek_format, flags,
1096             cur_type, cur, stop_type, stop, &update);
1097       }
1098     }
1099     /* Else, no seek event passed, so we're just (re)starting the 
1100        current segment. */
1101   }
1102
1103   if (res) {
1104     GST_DEBUG_OBJECT (src, "segment configured from %" G_GINT64_FORMAT
1105         " to %" G_GINT64_FORMAT ", position %" G_GINT64_FORMAT,
1106         seeksegment.start, seeksegment.stop, seeksegment.last_stop);
1107
1108     /* do the seek, segment.last_stop contains the new position. */
1109     res = gst_base_src_do_seek (src, &seeksegment);
1110   }
1111
1112   /* and prepare to continue streaming */
1113   if (flush) {
1114     /* send flush stop, peer will accept data and events again. We
1115      * are not yet providing data as we still have the STREAM_LOCK. */
1116     gst_pad_push_event (src->srcpad, gst_event_new_flush_stop ());
1117   } else if (res && src->data.ABI.running) {
1118     /* we are running the current segment and doing a non-flushing seek, 
1119      * close the segment first based on the last_stop. */
1120     GST_DEBUG_OBJECT (src, "closing running segment %" G_GINT64_FORMAT
1121         " to %" G_GINT64_FORMAT, src->segment.start, src->segment.last_stop);
1122
1123     /* queue the segment for sending in the stream thread */
1124     if (src->priv->close_segment)
1125       gst_event_unref (src->priv->close_segment);
1126     src->priv->close_segment =
1127         gst_event_new_new_segment_full (TRUE,
1128         src->segment.rate, src->segment.applied_rate, src->segment.format,
1129         src->segment.start, src->segment.last_stop, src->segment.time);
1130   }
1131
1132   /* The subclass must have converted the segment to the processing format 
1133    * by now */
1134   if (res && seeksegment.format != dest_format) {
1135     GST_DEBUG_OBJECT (src, "Subclass failed to prepare a seek segment "
1136         "in the correct format. Aborting seek.");
1137     res = FALSE;
1138   }
1139
1140   /* if successfull seek, we update our real segment and push
1141    * out the new segment. */
1142   if (res) {
1143     memcpy (&src->segment, &seeksegment, sizeof (GstSegment));
1144
1145     if (src->segment.flags & GST_SEEK_FLAG_SEGMENT) {
1146       gst_element_post_message (GST_ELEMENT (src),
1147           gst_message_new_segment_start (GST_OBJECT (src),
1148               src->segment.format, src->segment.last_stop));
1149     }
1150
1151     /* for deriving a stop position for the playback segment form the seek
1152      * segment, we must take the duration when the stop is not set */
1153     if ((stop = src->segment.stop) == -1)
1154       stop = src->segment.duration;
1155
1156     GST_DEBUG_OBJECT (src, "Sending newsegment from %" G_GINT64_FORMAT
1157         " to %" G_GINT64_FORMAT, src->segment.start, stop);
1158
1159     /* now replace the old segment so that we send it in the stream thread the
1160      * next time it is scheduled. */
1161     if (src->priv->start_segment)
1162       gst_event_unref (src->priv->start_segment);
1163     src->priv->start_segment =
1164         gst_event_new_new_segment_full (FALSE,
1165         src->segment.rate, src->segment.applied_rate, src->segment.format,
1166         src->segment.last_stop, stop, src->segment.time);
1167   }
1168
1169   src->priv->discont = TRUE;
1170   src->data.ABI.running = TRUE;
1171   /* and restart the task in case it got paused explicitely or by
1172    * the FLUSH_START event we pushed out. */
1173   gst_pad_start_task (src->srcpad, (GstTaskFunction) gst_base_src_loop,
1174       src->srcpad);
1175
1176   /* and release the lock again so we can continue streaming */
1177   GST_PAD_STREAM_UNLOCK (src->srcpad);
1178
1179   return res;
1180
1181   /* ERROR */
1182 prepare_failed:
1183   GST_DEBUG_OBJECT (src, "Preparing the seek failed before flushing. "
1184       "Aborting seek");
1185   return FALSE;
1186 }
1187
1188 static const GstQueryType *
1189 gst_base_src_get_query_types (GstElement * element)
1190 {
1191   static const GstQueryType query_types[] = {
1192     GST_QUERY_DURATION,
1193     GST_QUERY_POSITION,
1194     GST_QUERY_SEEKING,
1195     GST_QUERY_SEGMENT,
1196     GST_QUERY_FORMATS,
1197     GST_QUERY_LATENCY,
1198     GST_QUERY_JITTER,
1199     GST_QUERY_RATE,
1200     GST_QUERY_CONVERT,
1201     0
1202   };
1203
1204   return query_types;
1205 }
1206
1207 /* all events send to this element directly
1208  */
1209 static gboolean
1210 gst_base_src_send_event (GstElement * element, GstEvent * event)
1211 {
1212   GstBaseSrc *src;
1213   gboolean result = FALSE;
1214
1215   src = GST_BASE_SRC (element);
1216
1217   switch (GST_EVENT_TYPE (event)) {
1218     case GST_EVENT_FLUSH_START:
1219     case GST_EVENT_FLUSH_STOP:
1220       /* sending random flushes downstream can break stuff,
1221        * especially sync since all segment info will get flushed */
1222       break;
1223     case GST_EVENT_EOS:
1224       /* FIXME, queue EOS and make sure the task or pull function 
1225        * perform the EOS actions. */
1226       break;
1227     case GST_EVENT_NEWSEGMENT:
1228       /* sending random NEWSEGMENT downstream can break sync. */
1229       break;
1230     case GST_EVENT_TAG:
1231     case GST_EVENT_BUFFERSIZE:
1232       break;
1233     case GST_EVENT_QOS:
1234       break;
1235     case GST_EVENT_SEEK:
1236     {
1237       gboolean started;
1238
1239       GST_OBJECT_LOCK (src->srcpad);
1240       if (GST_PAD_ACTIVATE_MODE (src->srcpad) == GST_ACTIVATE_PULL)
1241         goto wrong_mode;
1242       started = GST_PAD_ACTIVATE_MODE (src->srcpad) == GST_ACTIVATE_PUSH;
1243       GST_OBJECT_UNLOCK (src->srcpad);
1244
1245       if (started) {
1246         /* when we are running in push mode, we can execute the
1247          * seek right now, we need to unlock. */
1248         result = gst_base_src_perform_seek (src, event, TRUE);
1249       } else {
1250         GstEvent **event_p;
1251
1252         /* else we store the event and execute the seek when we
1253          * get activated */
1254         GST_OBJECT_LOCK (src);
1255         event_p = &src->data.ABI.pending_seek;
1256         gst_event_replace ((GstEvent **) event_p, event);
1257         GST_OBJECT_UNLOCK (src);
1258         /* assume the seek will work */
1259         result = TRUE;
1260       }
1261       break;
1262     }
1263     case GST_EVENT_NAVIGATION:
1264       break;
1265     default:
1266       break;
1267   }
1268 done:
1269   gst_event_unref (event);
1270
1271   return result;
1272
1273   /* ERRORS */
1274 wrong_mode:
1275   {
1276     GST_DEBUG_OBJECT (src, "cannot perform seek when operating in pull mode");
1277     GST_OBJECT_UNLOCK (src->srcpad);
1278     result = FALSE;
1279     goto done;
1280   }
1281 }
1282
1283 static gboolean
1284 gst_base_src_default_event (GstBaseSrc * src, GstEvent * event)
1285 {
1286   gboolean result;
1287
1288   switch (GST_EVENT_TYPE (event)) {
1289     case GST_EVENT_SEEK:
1290       /* is normally called when in push mode */
1291       if (!src->seekable)
1292         goto not_seekable;
1293
1294       result = gst_base_src_perform_seek (src, event, TRUE);
1295       break;
1296     case GST_EVENT_FLUSH_START:
1297       /* cancel any blocking getrange, is normally called
1298        * when in pull mode. */
1299       result = gst_base_src_unlock (src);
1300       break;
1301     case GST_EVENT_FLUSH_STOP:
1302       result = gst_base_src_unlock_stop (src);
1303       break;
1304     default:
1305       result = TRUE;
1306       break;
1307   }
1308   return result;
1309
1310   /* ERRORS */
1311 not_seekable:
1312   {
1313     GST_DEBUG_OBJECT (src, "is not seekable");
1314     return FALSE;
1315   }
1316 }
1317
1318 static gboolean
1319 gst_base_src_event_handler (GstPad * pad, GstEvent * event)
1320 {
1321   GstBaseSrc *src;
1322   GstBaseSrcClass *bclass;
1323   gboolean result = FALSE;
1324
1325   src = GST_BASE_SRC (gst_pad_get_parent (pad));
1326   bclass = GST_BASE_SRC_GET_CLASS (src);
1327
1328   if (bclass->event) {
1329     if (!(result = bclass->event (src, event)))
1330       goto subclass_failed;
1331   }
1332
1333 done:
1334   gst_event_unref (event);
1335   gst_object_unref (src);
1336
1337   return result;
1338
1339   /* ERRORS */
1340 subclass_failed:
1341   {
1342     GST_DEBUG_OBJECT (src, "subclass refused event");
1343     goto done;
1344   }
1345 }
1346
1347 static void
1348 gst_base_src_set_property (GObject * object, guint prop_id,
1349     const GValue * value, GParamSpec * pspec)
1350 {
1351   GstBaseSrc *src;
1352
1353   src = GST_BASE_SRC (object);
1354
1355   switch (prop_id) {
1356     case PROP_BLOCKSIZE:
1357       src->blocksize = g_value_get_ulong (value);
1358       break;
1359     case PROP_NUM_BUFFERS:
1360       src->num_buffers = g_value_get_int (value);
1361       break;
1362     case PROP_TYPEFIND:
1363       src->data.ABI.typefind = g_value_get_boolean (value);
1364       break;
1365     default:
1366       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1367       break;
1368   }
1369 }
1370
1371 static void
1372 gst_base_src_get_property (GObject * object, guint prop_id, GValue * value,
1373     GParamSpec * pspec)
1374 {
1375   GstBaseSrc *src;
1376
1377   src = GST_BASE_SRC (object);
1378
1379   switch (prop_id) {
1380     case PROP_BLOCKSIZE:
1381       g_value_set_ulong (value, src->blocksize);
1382       break;
1383     case PROP_NUM_BUFFERS:
1384       g_value_set_int (value, src->num_buffers);
1385       break;
1386     case PROP_TYPEFIND:
1387       g_value_set_boolean (value, src->data.ABI.typefind);
1388       break;
1389     default:
1390       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1391       break;
1392   }
1393 }
1394
1395 /* with STREAM_LOCK and LOCK */
1396 static GstClockReturn
1397 gst_base_src_wait (GstBaseSrc * basesrc, GstClock * clock, GstClockTime time)
1398 {
1399   GstClockReturn ret;
1400   GstClockID id;
1401
1402   id = gst_clock_new_single_shot_id (clock, time);
1403
1404   basesrc->clock_id = id;
1405   /* release the object lock while waiting */
1406   GST_OBJECT_UNLOCK (basesrc);
1407
1408   ret = gst_clock_id_wait (id, NULL);
1409
1410   GST_OBJECT_LOCK (basesrc);
1411   gst_clock_id_unref (id);
1412   basesrc->clock_id = NULL;
1413
1414   return ret;
1415 }
1416
1417 /* perform synchronisation on a buffer. 
1418  * with STREAM_LOCK.
1419  */
1420 static GstClockReturn
1421 gst_base_src_do_sync (GstBaseSrc * basesrc, GstBuffer * buffer)
1422 {
1423   GstClockReturn result;
1424   GstClockTime start, end;
1425   GstBaseSrcClass *bclass;
1426   GstClockTime base_time;
1427   GstClock *clock;
1428
1429   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
1430
1431   start = end = -1;
1432   if (bclass->get_times)
1433     bclass->get_times (basesrc, buffer, &start, &end);
1434
1435   /* grab the lock to prepare for clocking and calculate the startup 
1436    * latency. */
1437   GST_OBJECT_LOCK (basesrc);
1438
1439   base_time = GST_ELEMENT_CAST (basesrc)->base_time;
1440
1441   /* get clock, if no clock, we can't sync or get the latency */
1442   if ((clock = GST_ELEMENT_CLOCK (basesrc)) == NULL)
1443     goto no_clock;
1444
1445   if (basesrc->priv->startup_latency == -1) {
1446     GstClockTime now = gst_clock_get_time (clock);
1447
1448     /* startup latency is the diff between when we went to PLAYING (base_time)
1449      * and the current clock time */
1450     if (now > base_time)
1451       basesrc->priv->startup_latency = now - base_time;
1452     else
1453       basesrc->priv->startup_latency = 0;
1454
1455     GST_LOG_OBJECT (basesrc, "startup latency: %" GST_TIME_FORMAT,
1456         GST_TIME_ARGS (basesrc->priv->startup_latency));
1457   }
1458
1459   /* if we don't have a buffer timestamp, we don't sync */
1460   if (!GST_CLOCK_TIME_IS_VALID (start))
1461     goto invalid_start;
1462
1463   /* we have a timestamp, we can subtract it from the startup_latency when it is
1464    * smaller. If the timestamp is bigger, there is no startup latency. */
1465   if (start < basesrc->priv->startup_latency)
1466     basesrc->priv->startup_latency -= start;
1467   else
1468     basesrc->priv->startup_latency = 0;
1469
1470   GST_LOG_OBJECT (basesrc,
1471       "waiting for clock, base time %" GST_TIME_FORMAT
1472       ", stream_start %" GST_TIME_FORMAT,
1473       GST_TIME_ARGS (base_time), GST_TIME_ARGS (start));
1474
1475   result = gst_base_src_wait (basesrc, clock, start + base_time);
1476   GST_OBJECT_UNLOCK (basesrc);
1477
1478   GST_LOG_OBJECT (basesrc, "clock entry done: %d", result);
1479
1480   return result;
1481
1482   /* special cases */
1483 no_clock:
1484   {
1485     GST_DEBUG_OBJECT (basesrc, "we have no clock");
1486     GST_OBJECT_UNLOCK (basesrc);
1487     return GST_CLOCK_OK;
1488   }
1489 invalid_start:
1490   {
1491     GST_DEBUG_OBJECT (basesrc, "get_times returned invalid start");
1492     GST_OBJECT_UNLOCK (basesrc);
1493     return GST_CLOCK_OK;
1494   }
1495 }
1496
1497 static gboolean
1498 gst_base_src_update_length (GstBaseSrc * src, guint64 offset, guint * length)
1499 {
1500   guint64 size, maxsize;
1501   GstBaseSrcClass *bclass;
1502
1503   bclass = GST_BASE_SRC_GET_CLASS (src);
1504
1505   /* only operate if we are working with bytes */
1506   if (src->segment.format != GST_FORMAT_BYTES)
1507     return TRUE;
1508
1509   /* get total file size */
1510   size = (guint64) src->segment.duration;
1511
1512   /* the max amount of bytes to read is the total size or
1513    * up to the segment.stop if present. */
1514   if (src->segment.stop != -1)
1515     maxsize = MIN (size, src->segment.stop);
1516   else
1517     maxsize = size;
1518
1519   GST_DEBUG_OBJECT (src,
1520       "reading offset %" G_GUINT64_FORMAT ", length %u, size %" G_GINT64_FORMAT
1521       ", segment.stop %" G_GINT64_FORMAT ", maxsize %" G_GINT64_FORMAT, offset,
1522       *length, size, src->segment.stop, maxsize);
1523
1524   /* check size if we have one */
1525   if (maxsize != -1) {
1526     /* if we run past the end, check if the file became bigger and 
1527      * retry. */
1528     if (G_UNLIKELY (offset + *length >= maxsize)) {
1529       /* see if length of the file changed */
1530       if (bclass->get_size)
1531         if (!bclass->get_size (src, &size))
1532           size = -1;
1533
1534       gst_segment_set_duration (&src->segment, GST_FORMAT_BYTES, size);
1535
1536       /* make sure we don't exceed the configured segment stop
1537        * if it was set */
1538       if (src->segment.stop != -1)
1539         maxsize = MIN (size, src->segment.stop);
1540       else
1541         maxsize = size;
1542
1543       /* if we are at or past the end, EOS */
1544       if (G_UNLIKELY (offset >= maxsize))
1545         goto unexpected_length;
1546
1547       /* else we can clip to the end */
1548       if (G_UNLIKELY (offset + *length >= maxsize))
1549         *length = maxsize - offset;
1550
1551     }
1552   }
1553
1554   /* keep track of current position. segment is in bytes, we checked 
1555    * that above. */
1556   gst_segment_set_last_stop (&src->segment, GST_FORMAT_BYTES, offset);
1557
1558   return TRUE;
1559
1560   /* ERRORS */
1561 unexpected_length:
1562   {
1563     return FALSE;
1564   }
1565 }
1566
1567 static GstFlowReturn
1568 gst_base_src_get_range (GstBaseSrc * src, guint64 offset, guint length,
1569     GstBuffer ** buf)
1570 {
1571   GstFlowReturn ret;
1572   GstBaseSrcClass *bclass;
1573   GstClockReturn status;
1574
1575   bclass = GST_BASE_SRC_GET_CLASS (src);
1576
1577   ret = gst_base_src_wait_playing (src);
1578   if (ret != GST_FLOW_OK)
1579     goto stopped;
1580
1581   if (G_UNLIKELY (!GST_OBJECT_FLAG_IS_SET (src, GST_BASE_SRC_STARTED)))
1582     goto not_started;
1583
1584   if (G_UNLIKELY (!bclass->create))
1585     goto no_function;
1586
1587   if (G_UNLIKELY (!gst_base_src_update_length (src, offset, &length)))
1588     goto unexpected_length;
1589
1590   /* normally we don't count buffers */
1591   if (G_UNLIKELY (src->num_buffers_left >= 0)) {
1592     if (src->num_buffers_left == 0)
1593       goto reached_num_buffers;
1594     else
1595       src->num_buffers_left--;
1596   }
1597
1598   GST_DEBUG_OBJECT (src,
1599       "calling create offset %" G_GUINT64_FORMAT " length %u, time %"
1600       G_GINT64_FORMAT, offset, length, src->segment.time);
1601
1602   ret = bclass->create (src, offset, length, buf);
1603   if (G_UNLIKELY (ret != GST_FLOW_OK))
1604     goto done;
1605
1606   /* no timestamp set and we are at offset 0, we can timestamp with 0 */
1607   if (offset == 0 && src->segment.time == 0
1608       && GST_BUFFER_TIMESTAMP (*buf) == -1)
1609     GST_BUFFER_TIMESTAMP (*buf) = 0;
1610
1611   /* now sync before pushing the buffer */
1612   status = gst_base_src_do_sync (src, *buf);
1613   switch (status) {
1614     case GST_CLOCK_EARLY:
1615       /* the buffer is too late. We currently don't drop the buffer. */
1616       GST_DEBUG_OBJECT (src, "buffer too late!, returning anyway");
1617       break;
1618     case GST_CLOCK_OK:
1619       /* buffer synchronised properly */
1620       GST_DEBUG_OBJECT (src, "buffer ok");
1621       break;
1622     case GST_CLOCK_UNSCHEDULED:
1623       /* this case is triggered when we were waiting for the clock and
1624        * it got unlocked because we did a state change. We return 
1625        * WRONG_STATE in this case to stop the dataflow also get rid of the
1626        * produced buffer. */
1627       GST_DEBUG_OBJECT (src,
1628           "clock was unscheduled (%d), returning WRONG_STATE", status);
1629       gst_buffer_unref (*buf);
1630       *buf = NULL;
1631       ret = GST_FLOW_WRONG_STATE;
1632       break;
1633     default:
1634       /* all other result values are unexpected and errors */
1635       GST_ELEMENT_ERROR (src, CORE, CLOCK,
1636           (_("Internal clock error.")),
1637           ("clock returned unexpected return value %d", status));
1638       gst_buffer_unref (*buf);
1639       *buf = NULL;
1640       ret = GST_FLOW_ERROR;
1641       break;
1642   }
1643 done:
1644   return ret;
1645
1646   /* ERROR */
1647 stopped:
1648   {
1649     GST_DEBUG_OBJECT (src, "wait_playing returned %d", ret);
1650     return ret;
1651   }
1652 not_started:
1653   {
1654     GST_DEBUG_OBJECT (src, "getrange but not started");
1655     return GST_FLOW_WRONG_STATE;
1656   }
1657 no_function:
1658   {
1659     GST_DEBUG_OBJECT (src, "no create function");
1660     return GST_FLOW_ERROR;
1661   }
1662 unexpected_length:
1663   {
1664     GST_DEBUG_OBJECT (src, "unexpected length %u (offset=%" G_GUINT64_FORMAT
1665         ", size=%" G_GINT64_FORMAT ")", length, offset, src->segment.duration);
1666     return GST_FLOW_UNEXPECTED;
1667   }
1668 reached_num_buffers:
1669   {
1670     GST_DEBUG_OBJECT (src, "sent all buffers");
1671     return GST_FLOW_UNEXPECTED;
1672   }
1673 }
1674
1675 static GstFlowReturn
1676 gst_base_src_pad_get_range (GstPad * pad, guint64 offset, guint length,
1677     GstBuffer ** buf)
1678 {
1679   GstBaseSrc *src;
1680   GstFlowReturn res;
1681
1682   src = GST_BASE_SRC (gst_pad_get_parent (pad));
1683
1684   res = gst_base_src_get_range (src, offset, length, buf);
1685
1686   gst_object_unref (src);
1687
1688   return res;
1689 }
1690
1691 static gboolean
1692 gst_base_src_default_check_get_range (GstBaseSrc * src)
1693 {
1694   gboolean res;
1695
1696   if (!GST_OBJECT_FLAG_IS_SET (src, GST_BASE_SRC_STARTED)) {
1697     GST_LOG_OBJECT (src, "doing start/stop to check get_range support");
1698     if (G_LIKELY (gst_base_src_start (src)))
1699       gst_base_src_stop (src);
1700   }
1701
1702   /* we can operate in getrange mode if the native format is bytes
1703    * and we are seekable, this condition is set in the random_access
1704    * flag and is set in the _start() method. */
1705   res = src->random_access;
1706
1707   return res;
1708 }
1709
1710 static gboolean
1711 gst_base_src_check_get_range (GstBaseSrc * src)
1712 {
1713   GstBaseSrcClass *bclass;
1714   gboolean res;
1715
1716   bclass = GST_BASE_SRC_GET_CLASS (src);
1717
1718   if (bclass->check_get_range == NULL)
1719     goto no_function;
1720
1721   res = bclass->check_get_range (src);
1722   GST_LOG_OBJECT (src, "%s() returned %d",
1723       GST_DEBUG_FUNCPTR_NAME (bclass->check_get_range), (gint) res);
1724
1725   return res;
1726
1727   /* ERRORS */
1728 no_function:
1729   {
1730     GST_WARNING_OBJECT (src, "no check_get_range function set");
1731     return FALSE;
1732   }
1733 }
1734
1735 static gboolean
1736 gst_base_src_pad_check_get_range (GstPad * pad)
1737 {
1738   GstBaseSrc *src;
1739   gboolean res;
1740
1741   src = GST_BASE_SRC (gst_pad_get_parent (pad));
1742
1743   res = gst_base_src_check_get_range (src);
1744
1745   gst_object_unref (src);
1746
1747   return res;
1748 }
1749
1750 static void
1751 gst_base_src_loop (GstPad * pad)
1752 {
1753   GstBaseSrc *src;
1754   GstBuffer *buf = NULL;
1755   GstFlowReturn ret;
1756   gint64 position;
1757   gboolean eos;
1758
1759   eos = FALSE;
1760
1761   src = GST_BASE_SRC (gst_pad_get_parent (pad));
1762
1763   src->priv->last_sent_eos = FALSE;
1764
1765   /* if we operate in bytes, we can calculate an offset */
1766   if (src->segment.format == GST_FORMAT_BYTES)
1767     position = src->segment.last_stop;
1768   else
1769     position = -1;
1770
1771   ret = gst_base_src_get_range (src, position, src->blocksize, &buf);
1772   if (G_UNLIKELY (ret != GST_FLOW_OK)) {
1773     GST_INFO_OBJECT (src, "pausing after gst_base_src_get_range() = %s",
1774         gst_flow_get_name (ret));
1775     goto pause;
1776   }
1777   /* this should not happen */
1778   if (G_UNLIKELY (buf == NULL))
1779     goto null_buffer;
1780
1781   /* push events to close/start our segment before we push the buffer. */
1782   if (src->priv->close_segment) {
1783     gst_pad_push_event (pad, src->priv->close_segment);
1784     src->priv->close_segment = NULL;
1785   }
1786   if (src->priv->start_segment) {
1787     gst_pad_push_event (pad, src->priv->start_segment);
1788     src->priv->start_segment = NULL;
1789   }
1790
1791   /* figure out the new position */
1792   switch (src->segment.format) {
1793     case GST_FORMAT_BYTES:
1794       position += GST_BUFFER_SIZE (buf);
1795       break;
1796     case GST_FORMAT_TIME:
1797     {
1798       GstClockTime start, duration;
1799
1800       start = GST_BUFFER_TIMESTAMP (buf);
1801       duration = GST_BUFFER_DURATION (buf);
1802
1803       if (GST_CLOCK_TIME_IS_VALID (start))
1804         position = start;
1805       else
1806         position = src->segment.last_stop;
1807
1808       if (GST_CLOCK_TIME_IS_VALID (duration))
1809         position += duration;
1810       break;
1811     }
1812     case GST_FORMAT_DEFAULT:
1813       position = GST_BUFFER_OFFSET_END (buf);
1814       break;
1815     default:
1816       position = -1;
1817       break;
1818   }
1819   if (position != -1) {
1820     if (src->segment.stop != -1) {
1821       if (position >= src->segment.stop) {
1822         eos = TRUE;
1823         position = src->segment.stop;
1824       }
1825     }
1826     gst_segment_set_last_stop (&src->segment, src->segment.format, position);
1827   }
1828
1829   if (G_UNLIKELY (src->priv->discont)) {
1830     buf = gst_buffer_make_metadata_writable (buf);
1831     GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_DISCONT);
1832     src->priv->discont = FALSE;
1833   }
1834
1835   ret = gst_pad_push (pad, buf);
1836   if (G_UNLIKELY (ret != GST_FLOW_OK)) {
1837     GST_INFO_OBJECT (src, "pausing after gst_pad_push() = %s",
1838         gst_flow_get_name (ret));
1839     goto pause;
1840   }
1841
1842   if (eos) {
1843     GST_INFO_OBJECT (src, "pausing after EOS");
1844     ret = GST_FLOW_UNEXPECTED;
1845     goto pause;
1846   }
1847
1848 done:
1849   gst_object_unref (src);
1850   return;
1851
1852   /* special cases */
1853 pause:
1854   {
1855     const gchar *reason = gst_flow_get_name (ret);
1856
1857     GST_DEBUG_OBJECT (src, "pausing task, reason %s", reason);
1858     src->data.ABI.running = FALSE;
1859     gst_pad_pause_task (pad);
1860     if (GST_FLOW_IS_FATAL (ret) || ret == GST_FLOW_NOT_LINKED) {
1861       if (ret == GST_FLOW_UNEXPECTED) {
1862         /* perform EOS logic */
1863         if (src->segment.flags & GST_SEEK_FLAG_SEGMENT) {
1864           gst_element_post_message (GST_ELEMENT_CAST (src),
1865               gst_message_new_segment_done (GST_OBJECT_CAST (src),
1866                   src->segment.format, src->segment.last_stop));
1867         } else {
1868           gst_pad_push_event (pad, gst_event_new_eos ());
1869           src->priv->last_sent_eos = TRUE;
1870         }
1871       } else {
1872         /* for fatal errors we post an error message, post the error
1873          * first so the app knows about the error first. */
1874         GST_ELEMENT_ERROR (src, STREAM, FAILED,
1875             (_("Internal data flow error.")),
1876             ("streaming task paused, reason %s (%d)", reason, ret));
1877         gst_pad_push_event (pad, gst_event_new_eos ());
1878         src->priv->last_sent_eos = TRUE;
1879       }
1880     }
1881     goto done;
1882   }
1883 null_buffer:
1884   {
1885     GST_ELEMENT_ERROR (src, STREAM, FAILED,
1886         (_("Internal data flow error.")), ("element returned NULL buffer"));
1887     /* we finished the segment on error */
1888     src->data.ABI.running = FALSE;
1889     gst_pad_pause_task (pad);
1890     gst_pad_push_event (pad, gst_event_new_eos ());
1891     src->priv->last_sent_eos = TRUE;
1892     goto done;
1893   }
1894 }
1895
1896 /* this will always be called between start() and stop(). So you can rely on
1897  * resources allocated by start() and freed from stop(). This needs to be added
1898  * to the docs at some point. */
1899 static gboolean
1900 gst_base_src_unlock (GstBaseSrc * basesrc)
1901 {
1902   GstBaseSrcClass *bclass;
1903   gboolean result = TRUE;
1904
1905   GST_DEBUG ("unlock");
1906   /* unblock whatever the subclass is doing */
1907   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
1908   if (bclass->unlock)
1909     result = bclass->unlock (basesrc);
1910
1911   GST_DEBUG ("unschedule clock");
1912   /* and unblock the clock as well, if any */
1913   GST_OBJECT_LOCK (basesrc);
1914   if (basesrc->clock_id) {
1915     gst_clock_id_unschedule (basesrc->clock_id);
1916   }
1917   GST_OBJECT_UNLOCK (basesrc);
1918
1919   GST_DEBUG ("unlock done");
1920
1921   return result;
1922 }
1923
1924 /* this will always be called between start() and stop(). So you can rely on
1925  * resources allocated by start() and freed from stop(). This needs to be added
1926  * to the docs at some point. */
1927 static gboolean
1928 gst_base_src_unlock_stop (GstBaseSrc * basesrc)
1929 {
1930   GstBaseSrcClass *bclass;
1931   gboolean result = TRUE;
1932
1933   GST_DEBUG_OBJECT (basesrc, "unlock stop");
1934
1935   /* Finish a previous unblock request, allowing subclasses to flush command
1936    * queues or whatever they need to do */
1937   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
1938   if (bclass->unlock_stop)
1939     result = bclass->unlock_stop (basesrc);
1940
1941   GST_DEBUG_OBJECT (basesrc, "unlock stop done");
1942
1943   return result;
1944 }
1945
1946 /* default negotiation code. 
1947  *
1948  * Take intersection between src and sink pads, take first
1949  * caps and fixate. 
1950  */
1951 static gboolean
1952 gst_base_src_default_negotiate (GstBaseSrc * basesrc)
1953 {
1954   GstCaps *thiscaps;
1955   GstCaps *caps = NULL;
1956   GstCaps *peercaps = NULL;
1957   gboolean result = FALSE;
1958
1959   /* first see what is possible on our source pad */
1960   thiscaps = gst_pad_get_caps (GST_BASE_SRC_PAD (basesrc));
1961   GST_DEBUG_OBJECT (basesrc, "caps of src: %" GST_PTR_FORMAT, thiscaps);
1962   /* nothing or anything is allowed, we're done */
1963   if (thiscaps == NULL || gst_caps_is_any (thiscaps))
1964     goto no_nego_needed;
1965
1966   /* get the peer caps */
1967   peercaps = gst_pad_peer_get_caps (GST_BASE_SRC_PAD (basesrc));
1968   GST_DEBUG_OBJECT (basesrc, "caps of peer: %" GST_PTR_FORMAT, peercaps);
1969   if (peercaps) {
1970     GstCaps *icaps;
1971
1972     /* get intersection */
1973     icaps = gst_caps_intersect (thiscaps, peercaps);
1974     GST_DEBUG_OBJECT (basesrc, "intersect: %" GST_PTR_FORMAT, icaps);
1975     gst_caps_unref (thiscaps);
1976     gst_caps_unref (peercaps);
1977     if (icaps) {
1978       /* take first (and best, since they are sorted) possibility */
1979       caps = gst_caps_copy_nth (icaps, 0);
1980       gst_caps_unref (icaps);
1981     }
1982   } else {
1983     /* no peer, work with our own caps then */
1984     caps = thiscaps;
1985   }
1986   if (caps) {
1987     caps = gst_caps_make_writable (caps);
1988     gst_caps_truncate (caps);
1989
1990     /* now fixate */
1991     if (!gst_caps_is_empty (caps)) {
1992       gst_pad_fixate_caps (GST_BASE_SRC_PAD (basesrc), caps);
1993       GST_DEBUG_OBJECT (basesrc, "fixated to: %" GST_PTR_FORMAT, caps);
1994
1995       if (gst_caps_is_any (caps)) {
1996         /* hmm, still anything, so element can do anything and
1997          * nego is not needed */
1998         result = TRUE;
1999       } else if (gst_caps_is_fixed (caps)) {
2000         /* yay, fixed caps, use those then */
2001         gst_pad_set_caps (GST_BASE_SRC_PAD (basesrc), caps);
2002         result = TRUE;
2003       }
2004     }
2005     gst_caps_unref (caps);
2006   }
2007   return result;
2008
2009 no_nego_needed:
2010   {
2011     GST_DEBUG_OBJECT (basesrc, "no negotiation needed");
2012     if (thiscaps)
2013       gst_caps_unref (thiscaps);
2014     return TRUE;
2015   }
2016 }
2017
2018 static gboolean
2019 gst_base_src_negotiate (GstBaseSrc * basesrc)
2020 {
2021   GstBaseSrcClass *bclass;
2022   gboolean result = TRUE;
2023
2024   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2025
2026   if (bclass->negotiate)
2027     result = bclass->negotiate (basesrc);
2028
2029   return result;
2030 }
2031
2032 static gboolean
2033 gst_base_src_start (GstBaseSrc * basesrc)
2034 {
2035   GstBaseSrcClass *bclass;
2036   gboolean result;
2037   guint64 size;
2038
2039   if (GST_OBJECT_FLAG_IS_SET (basesrc, GST_BASE_SRC_STARTED))
2040     return TRUE;
2041
2042   GST_DEBUG_OBJECT (basesrc, "starting source");
2043
2044   basesrc->num_buffers_left = basesrc->num_buffers;
2045
2046   gst_segment_init (&basesrc->segment, basesrc->segment.format);
2047   basesrc->data.ABI.running = FALSE;
2048
2049   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2050   if (bclass->start)
2051     result = bclass->start (basesrc);
2052   else
2053     result = TRUE;
2054
2055   if (!result)
2056     goto could_not_start;
2057
2058   GST_OBJECT_FLAG_SET (basesrc, GST_BASE_SRC_STARTED);
2059
2060   /* figure out the size */
2061   if (basesrc->segment.format == GST_FORMAT_BYTES) {
2062     if (bclass->get_size) {
2063       if (!(result = bclass->get_size (basesrc, &size)))
2064         size = -1;
2065     } else {
2066       result = FALSE;
2067       size = -1;
2068     }
2069     GST_DEBUG_OBJECT (basesrc, "setting size %" G_GUINT64_FORMAT, size);
2070     /* only update the size when operating in bytes, subclass is supposed
2071      * to set duration in the start method for other formats */
2072     gst_segment_set_duration (&basesrc->segment, GST_FORMAT_BYTES, size);
2073   } else {
2074     size = -1;
2075   }
2076
2077   GST_DEBUG_OBJECT (basesrc,
2078       "format: %d, have size: %d, size: %" G_GUINT64_FORMAT ", duration: %"
2079       G_GINT64_FORMAT, basesrc->segment.format, result, size,
2080       basesrc->segment.duration);
2081
2082   /* check if we can seek */
2083   if (bclass->is_seekable)
2084     basesrc->seekable = bclass->is_seekable (basesrc);
2085   else
2086     basesrc->seekable = FALSE;
2087
2088   GST_DEBUG_OBJECT (basesrc, "is seekable: %d", basesrc->seekable);
2089
2090   /* update for random access flag */
2091   basesrc->random_access = basesrc->seekable &&
2092       basesrc->segment.format == GST_FORMAT_BYTES;
2093
2094   GST_DEBUG_OBJECT (basesrc, "is random_access: %d", basesrc->random_access);
2095
2096   /* run typefind if we are random_access and the typefinding is enabled. */
2097   if (basesrc->random_access && basesrc->data.ABI.typefind && size != -1) {
2098     GstCaps *caps;
2099
2100     caps = gst_type_find_helper (basesrc->srcpad, size);
2101     gst_pad_set_caps (basesrc->srcpad, caps);
2102     gst_caps_unref (caps);
2103   } else {
2104     /* use class or default negotiate function */
2105     if (!gst_base_src_negotiate (basesrc))
2106       goto could_not_negotiate;
2107   }
2108
2109   return TRUE;
2110
2111   /* ERROR */
2112 could_not_start:
2113   {
2114     GST_DEBUG_OBJECT (basesrc, "could not start");
2115     /* subclass is supposed to post a message. We don't have to call _stop. */
2116     return FALSE;
2117   }
2118 could_not_negotiate:
2119   {
2120     GST_DEBUG_OBJECT (basesrc, "could not negotiate, stopping");
2121     GST_ELEMENT_ERROR (basesrc, STREAM, FORMAT,
2122         ("Could not negotiate format"), ("Check your filtered caps, if any"));
2123     /* we must call stop */
2124     gst_base_src_stop (basesrc);
2125     return FALSE;
2126   }
2127 }
2128
2129 static gboolean
2130 gst_base_src_stop (GstBaseSrc * basesrc)
2131 {
2132   GstBaseSrcClass *bclass;
2133   gboolean result = TRUE;
2134
2135   if (!GST_OBJECT_FLAG_IS_SET (basesrc, GST_BASE_SRC_STARTED))
2136     return TRUE;
2137
2138   GST_DEBUG_OBJECT (basesrc, "stopping source");
2139
2140   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2141   if (bclass->stop)
2142     result = bclass->stop (basesrc);
2143
2144   if (result)
2145     GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_STARTED);
2146
2147   return result;
2148 }
2149
2150 static gboolean
2151 gst_base_src_deactivate (GstBaseSrc * basesrc, GstPad * pad)
2152 {
2153   gboolean result;
2154
2155   GST_LIVE_LOCK (basesrc);
2156   basesrc->live_running = TRUE;
2157   GST_LIVE_SIGNAL (basesrc);
2158   GST_LIVE_UNLOCK (basesrc);
2159
2160   /* step 1, unblock clock sync (if any) */
2161   result = gst_base_src_unlock (basesrc);
2162
2163   /* step 2, make sure streaming finishes */
2164   result &= gst_pad_stop_task (pad);
2165
2166   /* step 3, clear the unblock condition */
2167   result &= gst_base_src_unlock_stop (basesrc);
2168
2169   return result;
2170 }
2171
2172 static gboolean
2173 gst_base_src_activate_push (GstPad * pad, gboolean active)
2174 {
2175   GstBaseSrc *basesrc;
2176   GstEvent *event;
2177
2178   basesrc = GST_BASE_SRC (GST_OBJECT_PARENT (pad));
2179
2180   /* prepare subclass first */
2181   if (active) {
2182     GST_DEBUG_OBJECT (basesrc, "Activating in push mode");
2183
2184     if (G_UNLIKELY (!basesrc->can_activate_push))
2185       goto no_push_activation;
2186
2187     if (G_UNLIKELY (!gst_base_src_start (basesrc)))
2188       goto error_start;
2189
2190     basesrc->priv->last_sent_eos = FALSE;
2191
2192     /* do initial seek, which will start the task */
2193     GST_OBJECT_LOCK (basesrc);
2194     event = basesrc->data.ABI.pending_seek;
2195     basesrc->data.ABI.pending_seek = NULL;
2196     GST_OBJECT_UNLOCK (basesrc);
2197
2198     /* no need to unlock anything, the task is certainly
2199      * not running here. The perform seek code will start the task when
2200      * finished. */
2201     if (G_UNLIKELY (!gst_base_src_perform_seek (basesrc, event, FALSE)))
2202       goto seek_failed;
2203
2204     if (event)
2205       gst_event_unref (event);
2206   } else {
2207     GST_DEBUG_OBJECT (basesrc, "Deactivating in push mode");
2208     /* call the unlock function and stop the task */
2209     if (G_UNLIKELY (!gst_base_src_deactivate (basesrc, pad)))
2210       goto deactivate_failed;
2211
2212     /* now we can stop the source */
2213     if (G_UNLIKELY (!gst_base_src_stop (basesrc)))
2214       goto error_stop;
2215   }
2216   return TRUE;
2217
2218   /* ERRORS */
2219 no_push_activation:
2220   {
2221     GST_WARNING_OBJECT (basesrc, "Subclass disabled push-mode activation");
2222     return FALSE;
2223   }
2224 error_start:
2225   {
2226     GST_WARNING_OBJECT (basesrc, "Failed to start in push mode");
2227     return FALSE;
2228   }
2229 seek_failed:
2230   {
2231     GST_ERROR_OBJECT (basesrc, "Failed to perform initial seek");
2232     gst_base_src_stop (basesrc);
2233     if (event)
2234       gst_event_unref (event);
2235     return FALSE;
2236   }
2237 deactivate_failed:
2238   {
2239     GST_ERROR_OBJECT (basesrc, "Failed to deactivate in push mode");
2240     return FALSE;
2241   }
2242 error_stop:
2243   {
2244     GST_DEBUG_OBJECT (basesrc, "Failed to stop in push mode");
2245     return FALSE;
2246   }
2247 }
2248
2249 static gboolean
2250 gst_base_src_activate_pull (GstPad * pad, gboolean active)
2251 {
2252   GstBaseSrc *basesrc;
2253
2254   basesrc = GST_BASE_SRC (GST_OBJECT_PARENT (pad));
2255
2256   /* prepare subclass first */
2257   if (active) {
2258     GST_DEBUG_OBJECT (basesrc, "Activating in pull mode");
2259     if (G_UNLIKELY (!gst_base_src_start (basesrc)))
2260       goto error_start;
2261
2262     /* if not random_access, we cannot operate in pull mode for now */
2263     if (G_UNLIKELY (!gst_base_src_check_get_range (basesrc)))
2264       goto no_get_range;
2265   } else {
2266     GST_DEBUG_OBJECT (basesrc, "Deactivating in pull mode");
2267     /* call the unlock function. We have no task to stop. */
2268     if (G_UNLIKELY (!gst_base_src_deactivate (basesrc, pad)))
2269       goto deactivate_failed;
2270
2271     /* don't send EOS when going from PAUSED => READY when in pull mode */
2272     basesrc->priv->last_sent_eos = TRUE;
2273
2274     if (G_UNLIKELY (!gst_base_src_stop (basesrc)))
2275       goto error_stop;
2276   }
2277   return TRUE;
2278
2279   /* ERRORS */
2280 error_start:
2281   {
2282     GST_ERROR_OBJECT (basesrc, "Failed to start in pull mode");
2283     return FALSE;
2284   }
2285 no_get_range:
2286   {
2287     GST_ERROR_OBJECT (basesrc, "Cannot operate in pull mode, stopping");
2288     gst_base_src_stop (basesrc);
2289     return FALSE;
2290   }
2291 deactivate_failed:
2292   {
2293     GST_ERROR_OBJECT (basesrc, "Failed to deactivate in pull mode");
2294     return FALSE;
2295   }
2296 error_stop:
2297   {
2298     GST_ERROR_OBJECT (basesrc, "Failed to stop in pull mode");
2299     return FALSE;
2300   }
2301 }
2302
2303 static GstStateChangeReturn
2304 gst_base_src_change_state (GstElement * element, GstStateChange transition)
2305 {
2306   GstBaseSrc *basesrc;
2307   GstStateChangeReturn result;
2308   gboolean no_preroll = FALSE;
2309
2310   basesrc = GST_BASE_SRC (element);
2311
2312   switch (transition) {
2313     case GST_STATE_CHANGE_NULL_TO_READY:
2314       break;
2315     case GST_STATE_CHANGE_READY_TO_PAUSED:
2316       GST_LIVE_LOCK (element);
2317       if (basesrc->is_live) {
2318         no_preroll = TRUE;
2319         basesrc->live_running = FALSE;
2320       }
2321       basesrc->priv->last_sent_eos = FALSE;
2322       basesrc->priv->discont = TRUE;
2323       basesrc->priv->startup_latency = -1;
2324       GST_LIVE_UNLOCK (element);
2325       break;
2326     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
2327       GST_LIVE_LOCK (element);
2328       if (basesrc->is_live) {
2329         basesrc->live_running = TRUE;
2330         GST_LIVE_SIGNAL (element);
2331       }
2332       GST_LIVE_UNLOCK (element);
2333       break;
2334     default:
2335       break;
2336   }
2337
2338   if ((result =
2339           GST_ELEMENT_CLASS (parent_class)->change_state (element,
2340               transition)) == GST_STATE_CHANGE_FAILURE)
2341     goto failure;
2342
2343   switch (transition) {
2344     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
2345       GST_LIVE_LOCK (element);
2346       if (basesrc->is_live) {
2347         no_preroll = TRUE;
2348         basesrc->live_running = FALSE;
2349       }
2350       GST_LIVE_UNLOCK (element);
2351       break;
2352     case GST_STATE_CHANGE_PAUSED_TO_READY:
2353     {
2354       GstEvent **event_p;
2355
2356       /* FIXME, deprecate this behaviour, it is very dangerous.
2357        * the prefered way of sending EOS downstream is by sending
2358        * the EOS event to the element */
2359       if (!basesrc->priv->last_sent_eos) {
2360         GST_DEBUG_OBJECT (basesrc, "Sending EOS event");
2361         gst_pad_push_event (basesrc->srcpad, gst_event_new_eos ());
2362         basesrc->priv->last_sent_eos = TRUE;
2363       }
2364       event_p = &basesrc->data.ABI.pending_seek;
2365       gst_event_replace (event_p, NULL);
2366       event_p = &basesrc->priv->close_segment;
2367       gst_event_replace (event_p, NULL);
2368       event_p = &basesrc->priv->start_segment;
2369       gst_event_replace (event_p, NULL);
2370       break;
2371     }
2372     case GST_STATE_CHANGE_READY_TO_NULL:
2373       break;
2374     default:
2375       break;
2376   }
2377
2378   if (no_preroll && result == GST_STATE_CHANGE_SUCCESS)
2379     result = GST_STATE_CHANGE_NO_PREROLL;
2380
2381   return result;
2382
2383   /* ERRORS */
2384 failure:
2385   {
2386     GST_DEBUG_OBJECT (basesrc, "parent failed state change");
2387     return result;
2388   }
2389 }