basesrc: Shut down the pad task when the initial seek fails.
[platform/upstream/gstreamer.git] / libs / gst / base / gstbasesrc.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *               2000,2005 Wim Taymans <wim@fluendo.com>
4  *
5  * gstbasesrc.c:
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20  * Boston, MA 02111-1307, USA.
21  */
22
23 /**
24  * SECTION:gstbasesrc
25  * @short_description: Base class for getrange based source elements
26  * @see_also: #GstPushSrc, #GstBaseTransform, #GstBaseSink
27  *
28  * This is a generice base class for source elements. The following
29  * types of sources are supported:
30  * <itemizedlist>
31  *   <listitem><para>random access sources like files</para></listitem>
32  *   <listitem><para>seekable sources</para></listitem>
33  *   <listitem><para>live sources</para></listitem>
34  * </itemizedlist>
35  *
36  * The source can be configured to operate in any #GstFormat with the
37  * gst_base_src_set_format() method. The currently set format determines
38  * the format of the internal #GstSegment and any #GST_EVENT_NEWSEGMENT
39  * events. The default format for #GstBaseSrc is #GST_FORMAT_BYTES.
40  *
41  * #GstBaseSrc always supports push mode scheduling. If the following
42  * conditions are met, it also supports pull mode scheduling:
43  * <itemizedlist>
44  *   <listitem><para>The format is set to #GST_FORMAT_BYTES (default).</para>
45  *   </listitem>
46  *   <listitem><para>#GstBaseSrcClass.is_seekable() returns %TRUE.</para>
47  *   </listitem>
48  * </itemizedlist>
49  *
50  * Since 0.10.9, any #GstBaseSrc can enable pull based scheduling at any time
51  * by overriding #GstBaseSrcClass.check_get_range() so that it returns %TRUE.
52  *
53  * If all the conditions are met for operating in pull mode, #GstBaseSrc is
54  * automatically seekable in push mode as well. The following conditions must
55  * be met to make the element seekable in push mode when the format is not
56  * #GST_FORMAT_BYTES:
57  * <itemizedlist>
58  *   <listitem><para>
59  *     #GstBaseSrcClass.is_seekable() returns %TRUE.
60  *   </para></listitem>
61  *   <listitem><para>
62  *     #GstBaseSrc:Class.query() can convert all supported seek formats to the
63  *     internal format as set with gst_base_src_set_format().
64  *   </para></listitem>
65  *   <listitem><para>
66  *     #GstBaseSrcClass.do_seek() is implemented, performs the seek and returns
67  *      %TRUE.
68  *   </para></listitem>
69  * </itemizedlist>
70  *
71  * When the element does not meet the requirements to operate in pull mode, the
72  * offset and length in the #GstBaseSrcClass.create() method should be ignored.
73  * It is recommended to subclass #GstPushSrc instead, in this situation. If the
74  * element can operate in pull mode but only with specific offsets and
75  * lengths, it is allowed to generate an error when the wrong values are passed
76  * to the #GstBaseSrcClass.create() function.
77  *
78  * #GstBaseSrc has support for live sources. Live sources are sources that when
79  * paused discard data, such as audio or video capture devices. A typical live
80  * source also produces data at a fixed rate and thus provides a clock to publish
81  * this rate.
82  * Use gst_base_src_set_live() to activate the live source mode.
83  *
84  * A live source does not produce data in the PAUSED state. This means that the
85  * #GstBaseSrcClass.create() method will not be called in PAUSED but only in
86  * PLAYING. To signal the pipeline that the element will not produce data, the
87  * return value from the READY to PAUSED state will be
88  * #GST_STATE_CHANGE_NO_PREROLL.
89  *
90  * A typical live source will timestamp the buffers it creates with the
91  * current running time of the pipeline. This is one reason why a live source
92  * can only produce data in the PLAYING state, when the clock is actually
93  * distributed and running.
94  *
95  * Live sources that synchronize and block on the clock (an audio source, for
96  * example) can since 0.10.12 use gst_base_src_wait_playing() when the
97  * #GstBaseSrcClass.create() function was interrupted by a state change to
98  * PAUSED.
99  *
100  * The #GstBaseSrcClass.get_times() method can be used to implement pseudo-live
101  * sources. It only makes sense to implement the #GstBaseSrcClass.get_times()
102  * function if the source is a live source. The #GstBaseSrcClass.get_times()
103  * function should return timestamps starting from 0, as if it were a non-live
104  * source. The base class will make sure that the timestamps are transformed
105  * into the current running_time. The base source will then wait for the
106  * calculated running_time before pushing out the buffer.
107  *
108  * For live sources, the base class will by default report a latency of 0.
109  * For pseudo live sources, the base class will by default measure the difference
110  * between the first buffer timestamp and the start time of get_times and will
111  * report this value as the latency.
112  * Subclasses should override the query function when this behaviour is not
113  * acceptable.
114  *
115  * There is only support in #GstBaseSrc for exactly one source pad, which
116  * should be named "src". A source implementation (subclass of #GstBaseSrc)
117  * should install a pad template in its class_init function, like so:
118  * |[
119  * static void
120  * my_element_class_init (GstMyElementClass *klass)
121  * {
122  *   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
123  *   // srctemplate should be a #GstStaticPadTemplate with direction
124  *   // #GST_PAD_SRC and name "src"
125  *   gst_element_class_add_pad_template (gstelement_class,
126  *       gst_static_pad_template_get (&amp;srctemplate));
127  *   // see #GstElementDetails
128  *   gst_element_class_set_details (gstelement_class, &amp;details);
129  * }
130  * ]|
131  *
132  * <refsect2>
133  * <title>Controlled shutdown of live sources in applications</title>
134  * <para>
135  * Applications that record from a live source may want to stop recording
136  * in a controlled way, so that the recording is stopped, but the data
137  * already in the pipeline is processed to the end (remember that many live
138  * sources would go on recording forever otherwise). For that to happen the
139  * application needs to make the source stop recording and send an EOS
140  * event down the pipeline. The application would then wait for an
141  * EOS message posted on the pipeline's bus to know when all data has
142  * been processed and the pipeline can safely be stopped.
143  *
144  * Since GStreamer 0.10.16 an application may send an EOS event to a source
145  * element to make it perform the EOS logic (send EOS event downstream or post a
146  * #GST_MESSAGE_SEGMENT_DONE on the bus). This can typically be done
147  * with the gst_element_send_event() function on the element or its parent bin.
148  *
149  * After the EOS has been sent to the element, the application should wait for
150  * an EOS message to be posted on the pipeline's bus. Once this EOS message is
151  * received, it may safely shut down the entire pipeline.
152  *
153  * The old behaviour for controlled shutdown introduced since GStreamer 0.10.3
154  * is still available but deprecated as it is dangerous and less flexible.
155  *
156  * Last reviewed on 2007-12-19 (0.10.16)
157  * </para>
158  * </refsect2>
159  */
160
161 #ifdef HAVE_CONFIG_H
162 #  include "config.h"
163 #endif
164
165 #include <stdlib.h>
166 #include <string.h>
167
168 #include "gstbasesrc.h"
169 #include "gsttypefindhelper.h"
170 #include <gst/gstmarshal.h>
171 #include <gst/gst-i18n-lib.h>
172
173 GST_DEBUG_CATEGORY_STATIC (gst_base_src_debug);
174 #define GST_CAT_DEFAULT gst_base_src_debug
175
176 #define GST_LIVE_GET_LOCK(elem)               (GST_BASE_SRC_CAST(elem)->live_lock)
177 #define GST_LIVE_LOCK(elem)                   g_mutex_lock(GST_LIVE_GET_LOCK(elem))
178 #define GST_LIVE_TRYLOCK(elem)                g_mutex_trylock(GST_LIVE_GET_LOCK(elem))
179 #define GST_LIVE_UNLOCK(elem)                 g_mutex_unlock(GST_LIVE_GET_LOCK(elem))
180 #define GST_LIVE_GET_COND(elem)               (GST_BASE_SRC_CAST(elem)->live_cond)
181 #define GST_LIVE_WAIT(elem)                   g_cond_wait (GST_LIVE_GET_COND (elem), GST_LIVE_GET_LOCK (elem))
182 #define GST_LIVE_TIMED_WAIT(elem, timeval)    g_cond_timed_wait (GST_LIVE_GET_COND (elem), GST_LIVE_GET_LOCK (elem),\
183                                                                                 timeval)
184 #define GST_LIVE_SIGNAL(elem)                 g_cond_signal (GST_LIVE_GET_COND (elem));
185 #define GST_LIVE_BROADCAST(elem)              g_cond_broadcast (GST_LIVE_GET_COND (elem));
186
187 /* BaseSrc signals and args */
188 enum
189 {
190   /* FILL ME */
191   LAST_SIGNAL
192 };
193
194 #define DEFAULT_BLOCKSIZE       4096
195 #define DEFAULT_NUM_BUFFERS     -1
196 #define DEFAULT_TYPEFIND        FALSE
197 #define DEFAULT_DO_TIMESTAMP    FALSE
198
199 enum
200 {
201   PROP_0,
202   PROP_BLOCKSIZE,
203   PROP_NUM_BUFFERS,
204   PROP_TYPEFIND,
205   PROP_DO_TIMESTAMP
206 };
207
208 #define GST_BASE_SRC_GET_PRIVATE(obj)  \
209    (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_BASE_SRC, GstBaseSrcPrivate))
210
211 struct _GstBaseSrcPrivate
212 {
213   gboolean last_sent_eos;       /* last thing we did was send an EOS (we set this
214                                  * to avoid the sending of two EOS in some cases) */
215   gboolean discont;
216   gboolean flushing;
217
218   /* two segments to be sent in the streaming thread with STREAM_LOCK */
219   GstEvent *close_segment;
220   GstEvent *start_segment;
221
222   /* if EOS is pending (atomic) */
223   gint pending_eos;
224
225   /* startup latency is the time it takes between going to PLAYING and producing
226    * the first BUFFER with running_time 0. This value is included in the latency
227    * reporting. */
228   GstClockTime latency;
229   /* timestamp offset, this is the offset add to the values of gst_times for
230    * pseudo live sources */
231   GstClockTimeDiff ts_offset;
232
233   gboolean do_timestamp;
234
235   /* stream sequence number */
236   guint32 seqnum;
237
238   /* pending tags to be pushed in the data stream */
239   GList *pending_tags;
240 };
241
242 static GstElementClass *parent_class = NULL;
243
244 static void gst_base_src_base_init (gpointer g_class);
245 static void gst_base_src_class_init (GstBaseSrcClass * klass);
246 static void gst_base_src_init (GstBaseSrc * src, gpointer g_class);
247 static void gst_base_src_finalize (GObject * object);
248
249
250 GType
251 gst_base_src_get_type (void)
252 {
253   static volatile gsize base_src_type = 0;
254
255   if (g_once_init_enter (&base_src_type)) {
256     GType _type;
257     static const GTypeInfo base_src_info = {
258       sizeof (GstBaseSrcClass),
259       (GBaseInitFunc) gst_base_src_base_init,
260       NULL,
261       (GClassInitFunc) gst_base_src_class_init,
262       NULL,
263       NULL,
264       sizeof (GstBaseSrc),
265       0,
266       (GInstanceInitFunc) gst_base_src_init,
267     };
268
269     _type = g_type_register_static (GST_TYPE_ELEMENT,
270         "GstBaseSrc", &base_src_info, G_TYPE_FLAG_ABSTRACT);
271     g_once_init_leave (&base_src_type, _type);
272   }
273   return base_src_type;
274 }
275
276 static GstCaps *gst_base_src_getcaps (GstPad * pad);
277 static gboolean gst_base_src_setcaps (GstPad * pad, GstCaps * caps);
278 static void gst_base_src_fixate (GstPad * pad, GstCaps * caps);
279
280 static gboolean gst_base_src_activate_push (GstPad * pad, gboolean active);
281 static gboolean gst_base_src_activate_pull (GstPad * pad, gboolean active);
282 static void gst_base_src_set_property (GObject * object, guint prop_id,
283     const GValue * value, GParamSpec * pspec);
284 static void gst_base_src_get_property (GObject * object, guint prop_id,
285     GValue * value, GParamSpec * pspec);
286 static gboolean gst_base_src_event_handler (GstPad * pad, GstEvent * event);
287 static gboolean gst_base_src_send_event (GstElement * elem, GstEvent * event);
288 static gboolean gst_base_src_default_event (GstBaseSrc * src, GstEvent * event);
289 static const GstQueryType *gst_base_src_get_query_types (GstElement * element);
290
291 static gboolean gst_base_src_query (GstPad * pad, GstQuery * query);
292
293 static gboolean gst_base_src_default_negotiate (GstBaseSrc * basesrc);
294 static gboolean gst_base_src_default_do_seek (GstBaseSrc * src,
295     GstSegment * segment);
296 static gboolean gst_base_src_default_query (GstBaseSrc * src, GstQuery * query);
297 static gboolean gst_base_src_default_prepare_seek_segment (GstBaseSrc * src,
298     GstEvent * event, GstSegment * segment);
299
300 static gboolean gst_base_src_set_flushing (GstBaseSrc * basesrc,
301     gboolean flushing, gboolean live_play, gboolean unlock, gboolean * playing);
302 static gboolean gst_base_src_start (GstBaseSrc * basesrc);
303 static gboolean gst_base_src_stop (GstBaseSrc * basesrc);
304
305 static GstStateChangeReturn gst_base_src_change_state (GstElement * element,
306     GstStateChange transition);
307
308 static void gst_base_src_loop (GstPad * pad);
309 static gboolean gst_base_src_pad_check_get_range (GstPad * pad);
310 static gboolean gst_base_src_default_check_get_range (GstBaseSrc * bsrc);
311 static GstFlowReturn gst_base_src_pad_get_range (GstPad * pad, guint64 offset,
312     guint length, GstBuffer ** buf);
313 static GstFlowReturn gst_base_src_get_range (GstBaseSrc * src, guint64 offset,
314     guint length, GstBuffer ** buf);
315 static gboolean gst_base_src_seekable (GstBaseSrc * src);
316
317 static void
318 gst_base_src_base_init (gpointer g_class)
319 {
320   GST_DEBUG_CATEGORY_INIT (gst_base_src_debug, "basesrc", 0, "basesrc element");
321 }
322
323 static void
324 gst_base_src_class_init (GstBaseSrcClass * klass)
325 {
326   GObjectClass *gobject_class;
327   GstElementClass *gstelement_class;
328
329   gobject_class = G_OBJECT_CLASS (klass);
330   gstelement_class = GST_ELEMENT_CLASS (klass);
331
332   g_type_class_add_private (klass, sizeof (GstBaseSrcPrivate));
333
334   parent_class = g_type_class_peek_parent (klass);
335
336   gobject_class->finalize = gst_base_src_finalize;
337   gobject_class->set_property = gst_base_src_set_property;
338   gobject_class->get_property = gst_base_src_get_property;
339
340   g_object_class_install_property (gobject_class, PROP_BLOCKSIZE,
341       g_param_spec_ulong ("blocksize", "Block size",
342           "Size in bytes to read per buffer (-1 = default)", 0, G_MAXULONG,
343           DEFAULT_BLOCKSIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
344   g_object_class_install_property (gobject_class, PROP_NUM_BUFFERS,
345       g_param_spec_int ("num-buffers", "num-buffers",
346           "Number of buffers to output before sending EOS (-1 = unlimited)",
347           -1, G_MAXINT, DEFAULT_NUM_BUFFERS, G_PARAM_READWRITE |
348           G_PARAM_STATIC_STRINGS));
349   g_object_class_install_property (gobject_class, PROP_TYPEFIND,
350       g_param_spec_boolean ("typefind", "Typefind",
351           "Run typefind before negotiating", DEFAULT_TYPEFIND,
352           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
353   g_object_class_install_property (gobject_class, PROP_DO_TIMESTAMP,
354       g_param_spec_boolean ("do-timestamp", "Do timestamp",
355           "Apply current stream time to buffers", DEFAULT_DO_TIMESTAMP,
356           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
357
358   gstelement_class->change_state =
359       GST_DEBUG_FUNCPTR (gst_base_src_change_state);
360   gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_base_src_send_event);
361   gstelement_class->get_query_types =
362       GST_DEBUG_FUNCPTR (gst_base_src_get_query_types);
363
364   klass->negotiate = GST_DEBUG_FUNCPTR (gst_base_src_default_negotiate);
365   klass->event = GST_DEBUG_FUNCPTR (gst_base_src_default_event);
366   klass->do_seek = GST_DEBUG_FUNCPTR (gst_base_src_default_do_seek);
367   klass->query = GST_DEBUG_FUNCPTR (gst_base_src_default_query);
368   klass->check_get_range =
369       GST_DEBUG_FUNCPTR (gst_base_src_default_check_get_range);
370   klass->prepare_seek_segment =
371       GST_DEBUG_FUNCPTR (gst_base_src_default_prepare_seek_segment);
372
373   /* Registering debug symbols for function pointers */
374   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_activate_push);
375   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_activate_pull);
376   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_event_handler);
377   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_query);
378   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_pad_get_range);
379   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_pad_check_get_range);
380   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_getcaps);
381   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_setcaps);
382   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_fixate);
383 }
384
385 static void
386 gst_base_src_init (GstBaseSrc * basesrc, gpointer g_class)
387 {
388   GstPad *pad;
389   GstPadTemplate *pad_template;
390
391   basesrc->priv = GST_BASE_SRC_GET_PRIVATE (basesrc);
392
393   basesrc->is_live = FALSE;
394   basesrc->live_lock = g_mutex_new ();
395   basesrc->live_cond = g_cond_new ();
396   basesrc->num_buffers = DEFAULT_NUM_BUFFERS;
397   basesrc->num_buffers_left = -1;
398
399   basesrc->can_activate_push = TRUE;
400   basesrc->pad_mode = GST_ACTIVATE_NONE;
401
402   pad_template =
403       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (g_class), "src");
404   g_return_if_fail (pad_template != NULL);
405
406   GST_DEBUG_OBJECT (basesrc, "creating src pad");
407   pad = gst_pad_new_from_template (pad_template, "src");
408
409   GST_DEBUG_OBJECT (basesrc, "setting functions on src pad");
410   gst_pad_set_activatepush_function (pad, gst_base_src_activate_push);
411   gst_pad_set_activatepull_function (pad, gst_base_src_activate_pull);
412   gst_pad_set_event_function (pad, gst_base_src_event_handler);
413   gst_pad_set_query_function (pad, gst_base_src_query);
414   gst_pad_set_checkgetrange_function (pad, gst_base_src_pad_check_get_range);
415   gst_pad_set_getrange_function (pad, gst_base_src_pad_get_range);
416   gst_pad_set_getcaps_function (pad, gst_base_src_getcaps);
417   gst_pad_set_setcaps_function (pad, gst_base_src_setcaps);
418   gst_pad_set_fixatecaps_function (pad, gst_base_src_fixate);
419
420   /* hold pointer to pad */
421   basesrc->srcpad = pad;
422   GST_DEBUG_OBJECT (basesrc, "adding src pad");
423   gst_element_add_pad (GST_ELEMENT (basesrc), pad);
424
425   basesrc->blocksize = DEFAULT_BLOCKSIZE;
426   basesrc->clock_id = NULL;
427   /* we operate in BYTES by default */
428   gst_base_src_set_format (basesrc, GST_FORMAT_BYTES);
429   basesrc->data.ABI.typefind = DEFAULT_TYPEFIND;
430   basesrc->priv->do_timestamp = DEFAULT_DO_TIMESTAMP;
431
432   GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_STARTED);
433
434   GST_DEBUG_OBJECT (basesrc, "init done");
435 }
436
437 static void
438 gst_base_src_finalize (GObject * object)
439 {
440   GstBaseSrc *basesrc;
441   GstEvent **event_p;
442
443   basesrc = GST_BASE_SRC (object);
444
445   g_mutex_free (basesrc->live_lock);
446   g_cond_free (basesrc->live_cond);
447
448   event_p = &basesrc->data.ABI.pending_seek;
449   gst_event_replace (event_p, NULL);
450
451   if (basesrc->priv->pending_tags) {
452     g_list_foreach (basesrc->priv->pending_tags, (GFunc) gst_event_unref, NULL);
453     g_list_free (basesrc->priv->pending_tags);
454   }
455
456   G_OBJECT_CLASS (parent_class)->finalize (object);
457 }
458
459 /**
460  * gst_base_src_wait_playing:
461  * @src: the src
462  *
463  * If the #GstBaseSrcClass.create() method performs its own synchronisation
464  * against the clock it must unblock when going from PLAYING to the PAUSED state
465  * and call this method before continuing to produce the remaining data.
466  *
467  * This function will block until a state change to PLAYING happens (in which
468  * case this function returns #GST_FLOW_OK) or the processing must be stopped due
469  * to a state change to READY or a FLUSH event (in which case this function
470  * returns #GST_FLOW_WRONG_STATE).
471  *
472  * Since: 0.10.12
473  *
474  * Returns: #GST_FLOW_OK if @src is PLAYING and processing can
475  * continue. Any other return value should be returned from the create vmethod.
476  */
477 GstFlowReturn
478 gst_base_src_wait_playing (GstBaseSrc * src)
479 {
480   g_return_val_if_fail (GST_IS_BASE_SRC (src), GST_FLOW_ERROR);
481
482   /* block until the state changes, or we get a flush, or something */
483   GST_DEBUG_OBJECT (src, "live source waiting for running state");
484   GST_LIVE_WAIT (src);
485   if (src->priv->flushing)
486     goto flushing;
487   GST_DEBUG_OBJECT (src, "live source unlocked");
488
489   return GST_FLOW_OK;
490
491   /* ERRORS */
492 flushing:
493   {
494     GST_DEBUG_OBJECT (src, "we are flushing");
495     return GST_FLOW_WRONG_STATE;
496   }
497 }
498
499 /**
500  * gst_base_src_set_live:
501  * @src: base source instance
502  * @live: new live-mode
503  *
504  * If the element listens to a live source, @live should
505  * be set to %TRUE.
506  *
507  * A live source will not produce data in the PAUSED state and
508  * will therefore not be able to participate in the PREROLL phase
509  * of a pipeline. To signal this fact to the application and the
510  * pipeline, the state change return value of the live source will
511  * be GST_STATE_CHANGE_NO_PREROLL.
512  */
513 void
514 gst_base_src_set_live (GstBaseSrc * src, gboolean live)
515 {
516   g_return_if_fail (GST_IS_BASE_SRC (src));
517
518   GST_OBJECT_LOCK (src);
519   src->is_live = live;
520   GST_OBJECT_UNLOCK (src);
521 }
522
523 /**
524  * gst_base_src_is_live:
525  * @src: base source instance
526  *
527  * Check if an element is in live mode.
528  *
529  * Returns: %TRUE if element is in live mode.
530  */
531 gboolean
532 gst_base_src_is_live (GstBaseSrc * src)
533 {
534   gboolean result;
535
536   g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
537
538   GST_OBJECT_LOCK (src);
539   result = src->is_live;
540   GST_OBJECT_UNLOCK (src);
541
542   return result;
543 }
544
545 /**
546  * gst_base_src_set_format:
547  * @src: base source instance
548  * @format: the format to use
549  *
550  * Sets the default format of the source. This will be the format used
551  * for sending NEW_SEGMENT events and for performing seeks.
552  *
553  * If a format of GST_FORMAT_BYTES is set, the element will be able to
554  * operate in pull mode if the #GstBaseSrc.is_seekable() returns TRUE.
555  *
556  * Since: 0.10.1
557  */
558 void
559 gst_base_src_set_format (GstBaseSrc * src, GstFormat format)
560 {
561   g_return_if_fail (GST_IS_BASE_SRC (src));
562
563   gst_segment_init (&src->segment, format);
564 }
565
566 /**
567  * gst_base_src_query_latency:
568  * @src: the source
569  * @live: if the source is live
570  * @min_latency: the min latency of the source
571  * @max_latency: the max latency of the source
572  *
573  * Query the source for the latency parameters. @live will be TRUE when @src is
574  * configured as a live source. @min_latency will be set to the difference
575  * between the running time and the timestamp of the first buffer.
576  * @max_latency is always the undefined value of -1.
577  *
578  * This function is mostly used by subclasses.
579  *
580  * Returns: TRUE if the query succeeded.
581  *
582  * Since: 0.10.13
583  */
584 gboolean
585 gst_base_src_query_latency (GstBaseSrc * src, gboolean * live,
586     GstClockTime * min_latency, GstClockTime * max_latency)
587 {
588   GstClockTime min;
589
590   g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
591
592   GST_OBJECT_LOCK (src);
593   if (live)
594     *live = src->is_live;
595
596   /* if we have a startup latency, report this one, else report 0. Subclasses
597    * are supposed to override the query function if they want something
598    * else. */
599   if (src->priv->latency != -1)
600     min = src->priv->latency;
601   else
602     min = 0;
603
604   if (min_latency)
605     *min_latency = min;
606   if (max_latency)
607     *max_latency = -1;
608
609   GST_LOG_OBJECT (src, "latency: live %d, min %" GST_TIME_FORMAT
610       ", max %" GST_TIME_FORMAT, src->is_live, GST_TIME_ARGS (min),
611       GST_TIME_ARGS (-1));
612   GST_OBJECT_UNLOCK (src);
613
614   return TRUE;
615 }
616
617 /**
618  * gst_base_src_set_blocksize:
619  * @src: the source
620  * @blocksize: the new blocksize in bytes
621  *
622  * Set the number of bytes that @src will push out with each buffer. When
623  * @blocksize is set to -1, a default length will be used.
624  *
625  * Since: 0.10.22
626  */
627 void
628 gst_base_src_set_blocksize (GstBaseSrc * src, gulong blocksize)
629 {
630   g_return_if_fail (GST_IS_BASE_SRC (src));
631
632   GST_OBJECT_LOCK (src);
633   src->blocksize = blocksize;
634   GST_OBJECT_UNLOCK (src);
635 }
636
637 /**
638  * gst_base_src_get_blocksize:
639  * @src: the source
640  *
641  * Get the number of bytes that @src will push out with each buffer.
642  *
643  * Returns: the number of bytes pushed with each buffer.
644  *
645  * Since: 0.10.22
646  */
647 gulong
648 gst_base_src_get_blocksize (GstBaseSrc * src)
649 {
650   gulong res;
651
652   g_return_val_if_fail (GST_IS_BASE_SRC (src), 0);
653
654   GST_OBJECT_LOCK (src);
655   res = src->blocksize;
656   GST_OBJECT_UNLOCK (src);
657
658   return res;
659 }
660
661
662 /**
663  * gst_base_src_set_do_timestamp:
664  * @src: the source
665  * @timestamp: enable or disable timestamping
666  *
667  * Configure @src to automatically timestamp outgoing buffers based on the
668  * current running_time of the pipeline. This property is mostly useful for live
669  * sources.
670  *
671  * Since: 0.10.15
672  */
673 void
674 gst_base_src_set_do_timestamp (GstBaseSrc * src, gboolean timestamp)
675 {
676   g_return_if_fail (GST_IS_BASE_SRC (src));
677
678   GST_OBJECT_LOCK (src);
679   src->priv->do_timestamp = timestamp;
680   GST_OBJECT_UNLOCK (src);
681 }
682
683 /**
684  * gst_base_src_get_do_timestamp:
685  * @src: the source
686  *
687  * Query if @src timestamps outgoing buffers based on the current running_time.
688  *
689  * Returns: %TRUE if the base class will automatically timestamp outgoing buffers.
690  *
691  * Since: 0.10.15
692  */
693 gboolean
694 gst_base_src_get_do_timestamp (GstBaseSrc * src)
695 {
696   gboolean res;
697
698   g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
699
700   GST_OBJECT_LOCK (src);
701   res = src->priv->do_timestamp;
702   GST_OBJECT_UNLOCK (src);
703
704   return res;
705 }
706
707 /**
708  * gst_base_src_new_seamless_segment:
709  * @src: The source
710  * @start: The new start value for the segment
711  * @stop: Stop value for the new segment
712  * @position: The position value for the new segent
713  *
714  * Prepare a new seamless segment for emission downstream. This function must
715  * only be called by derived sub-classes, and only from the create() function,
716  * as the stream-lock needs to be held.
717  *
718  * The format for the new segment will be the current format of the source, as
719  * configured with gst_base_src_set_format()
720  *
721  * Returns: %TRUE if preparation of the seamless segment succeeded.
722  *
723  * Since: 0.10.26
724  */
725 gboolean
726 gst_base_src_new_seamless_segment (GstBaseSrc * src, gint64 start, gint64 stop,
727     gint64 position)
728 {
729   gboolean res = TRUE;
730
731   GST_DEBUG_OBJECT (src,
732       "Starting new seamless segment. Start %" GST_TIME_FORMAT " stop %"
733       GST_TIME_FORMAT " position %" GST_TIME_FORMAT, GST_TIME_ARGS (start),
734       GST_TIME_ARGS (stop), GST_TIME_ARGS (position));
735
736   if (src->data.ABI.running) {
737     if (src->priv->close_segment)
738       gst_event_unref (src->priv->close_segment);
739     src->priv->close_segment =
740         gst_event_new_new_segment_full (TRUE,
741         src->segment.rate, src->segment.applied_rate, src->segment.format,
742         src->segment.start, src->segment.last_stop, src->segment.time);
743   }
744
745   gst_segment_set_newsegment_full (&src->segment, FALSE, src->segment.rate,
746       src->segment.applied_rate, src->segment.format, start, stop, position);
747
748   if (src->priv->start_segment)
749     gst_event_unref (src->priv->start_segment);
750   if (src->segment.rate >= 0.0) {
751     /* forward, we send data from last_stop to stop */
752     src->priv->start_segment =
753         gst_event_new_new_segment_full (FALSE,
754         src->segment.rate, src->segment.applied_rate, src->segment.format,
755         src->segment.last_stop, stop, src->segment.time);
756   } else {
757     /* reverse, we send data from last_stop to start */
758     src->priv->start_segment =
759         gst_event_new_new_segment_full (FALSE,
760         src->segment.rate, src->segment.applied_rate, src->segment.format,
761         src->segment.start, src->segment.last_stop, src->segment.time);
762   }
763
764   src->priv->discont = TRUE;
765   src->data.ABI.running = TRUE;
766
767   return res;
768 }
769
770 static gboolean
771 gst_base_src_setcaps (GstPad * pad, GstCaps * caps)
772 {
773   GstBaseSrcClass *bclass;
774   GstBaseSrc *bsrc;
775   gboolean res = TRUE;
776
777   bsrc = GST_BASE_SRC (GST_PAD_PARENT (pad));
778   bclass = GST_BASE_SRC_GET_CLASS (bsrc);
779
780   if (bclass->set_caps)
781     res = bclass->set_caps (bsrc, caps);
782
783   return res;
784 }
785
786 static GstCaps *
787 gst_base_src_getcaps (GstPad * pad)
788 {
789   GstBaseSrcClass *bclass;
790   GstBaseSrc *bsrc;
791   GstCaps *caps = NULL;
792
793   bsrc = GST_BASE_SRC (GST_PAD_PARENT (pad));
794   bclass = GST_BASE_SRC_GET_CLASS (bsrc);
795   if (bclass->get_caps)
796     caps = bclass->get_caps (bsrc);
797
798   if (caps == NULL) {
799     GstPadTemplate *pad_template;
800
801     pad_template =
802         gst_element_class_get_pad_template (GST_ELEMENT_CLASS (bclass), "src");
803     if (pad_template != NULL) {
804       caps = gst_caps_ref (gst_pad_template_get_caps (pad_template));
805     }
806   }
807   return caps;
808 }
809
810 static void
811 gst_base_src_fixate (GstPad * pad, GstCaps * caps)
812 {
813   GstBaseSrcClass *bclass;
814   GstBaseSrc *bsrc;
815
816   bsrc = GST_BASE_SRC (gst_pad_get_parent (pad));
817   bclass = GST_BASE_SRC_GET_CLASS (bsrc);
818
819   if (bclass->fixate)
820     bclass->fixate (bsrc, caps);
821
822   gst_object_unref (bsrc);
823 }
824
825 static gboolean
826 gst_base_src_default_query (GstBaseSrc * src, GstQuery * query)
827 {
828   gboolean res;
829
830   switch (GST_QUERY_TYPE (query)) {
831     case GST_QUERY_POSITION:
832     {
833       GstFormat format;
834
835       gst_query_parse_position (query, &format, NULL);
836       switch (format) {
837         case GST_FORMAT_PERCENT:
838         {
839           gint64 percent;
840           gint64 position;
841           gint64 duration;
842
843           position = src->segment.last_stop;
844           duration = src->segment.duration;
845
846           if (position != -1 && duration != -1) {
847             if (position < duration)
848               percent = gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX, position,
849                   duration);
850             else
851               percent = GST_FORMAT_PERCENT_MAX;
852           } else
853             percent = -1;
854
855           gst_query_set_position (query, GST_FORMAT_PERCENT, percent);
856           res = TRUE;
857           break;
858         }
859         default:
860         {
861           gint64 position;
862
863           position = src->segment.last_stop;
864
865           if (position != -1) {
866             /* convert to requested format */
867             res =
868                 gst_pad_query_convert (src->srcpad, src->segment.format,
869                 position, &format, &position);
870           } else
871             res = TRUE;
872
873           gst_query_set_position (query, format, position);
874           break;
875         }
876       }
877       break;
878     }
879     case GST_QUERY_DURATION:
880     {
881       GstFormat format;
882
883       gst_query_parse_duration (query, &format, NULL);
884
885       GST_DEBUG_OBJECT (src, "duration query in format %s",
886           gst_format_get_name (format));
887
888       switch (format) {
889         case GST_FORMAT_PERCENT:
890           gst_query_set_duration (query, GST_FORMAT_PERCENT,
891               GST_FORMAT_PERCENT_MAX);
892           res = TRUE;
893           break;
894         default:
895         {
896           gint64 duration;
897
898           /* this is the duration as configured by the subclass. */
899           duration = src->segment.duration;
900
901           if (duration != -1) {
902             /* convert to requested format, if this fails, we have a duration
903              * but we cannot answer the query, we must return FALSE. */
904             res =
905                 gst_pad_query_convert (src->srcpad, src->segment.format,
906                 duration, &format, &duration);
907           } else {
908             /* The subclass did not configure a duration, we assume that the
909              * media has an unknown duration then and we return TRUE to report
910              * this. Note that this is not the same as returning FALSE, which
911              * means that we cannot report the duration at all. */
912             res = TRUE;
913           }
914           gst_query_set_duration (query, format, duration);
915           break;
916         }
917       }
918       break;
919     }
920
921     case GST_QUERY_SEEKING:
922     {
923       GstFormat format;
924
925       gst_query_parse_seeking (query, &format, NULL, NULL, NULL);
926       if (format == src->segment.format) {
927         gst_query_set_seeking (query, src->segment.format,
928             gst_base_src_seekable (src), 0, src->segment.duration);
929         res = TRUE;
930       } else {
931         /* FIXME 0.11: return TRUE + seekable=FALSE for SEEKING query here */
932         /* Don't reply to the query to make up for demuxers which don't
933          * handle the SEEKING query yet. Players like Totem will fall back
934          * to the duration when the SEEKING query isn't answered. */
935         res = FALSE;
936       }
937       break;
938     }
939     case GST_QUERY_SEGMENT:
940     {
941       gint64 start, stop;
942
943       /* no end segment configured, current duration then */
944       if ((stop = src->segment.stop) == -1)
945         stop = src->segment.duration;
946       start = src->segment.start;
947
948       /* adjust to stream time */
949       if (src->segment.time != -1) {
950         start -= src->segment.time;
951         if (stop != -1)
952           stop -= src->segment.time;
953       }
954       gst_query_set_segment (query, src->segment.rate, src->segment.format,
955           start, stop);
956       res = TRUE;
957       break;
958     }
959
960     case GST_QUERY_FORMATS:
961     {
962       gst_query_set_formats (query, 3, GST_FORMAT_DEFAULT,
963           GST_FORMAT_BYTES, GST_FORMAT_PERCENT);
964       res = TRUE;
965       break;
966     }
967     case GST_QUERY_CONVERT:
968     {
969       GstFormat src_fmt, dest_fmt;
970       gint64 src_val, dest_val;
971
972       gst_query_parse_convert (query, &src_fmt, &src_val, &dest_fmt, &dest_val);
973
974       /* we can only convert between equal formats... */
975       if (src_fmt == dest_fmt) {
976         dest_val = src_val;
977         res = TRUE;
978       } else
979         res = FALSE;
980
981       gst_query_set_convert (query, src_fmt, src_val, dest_fmt, dest_val);
982       break;
983     }
984     case GST_QUERY_LATENCY:
985     {
986       GstClockTime min, max;
987       gboolean live;
988
989       /* Subclasses should override and implement something usefull */
990       res = gst_base_src_query_latency (src, &live, &min, &max);
991
992       GST_LOG_OBJECT (src, "report latency: live %d, min %" GST_TIME_FORMAT
993           ", max %" GST_TIME_FORMAT, live, GST_TIME_ARGS (min),
994           GST_TIME_ARGS (max));
995
996       gst_query_set_latency (query, live, min, max);
997       break;
998     }
999     case GST_QUERY_JITTER:
1000     case GST_QUERY_RATE:
1001       res = FALSE;
1002       break;
1003     case GST_QUERY_BUFFERING:
1004     {
1005       GstFormat format;
1006       gint64 start, stop, estimated;
1007
1008       gst_query_parse_buffering_range (query, &format, NULL, NULL, NULL);
1009
1010       GST_DEBUG_OBJECT (src, "buffering query in format %s",
1011           gst_format_get_name (format));
1012
1013       if (src->random_access) {
1014         estimated = 0;
1015         start = 0;
1016         if (format == GST_FORMAT_PERCENT)
1017           stop = GST_FORMAT_PERCENT_MAX;
1018         else
1019           stop = src->segment.duration;
1020       } else {
1021         estimated = -1;
1022         start = -1;
1023         stop = -1;
1024       }
1025       /* convert to required format. When the conversion fails, we can't answer
1026        * the query. When the value is unknown, we can don't perform conversion
1027        * but report TRUE. */
1028       if (format != GST_FORMAT_PERCENT && stop != -1) {
1029         res = gst_pad_query_convert (src->srcpad, src->segment.format,
1030             stop, &format, &stop);
1031       } else {
1032         res = TRUE;
1033       }
1034       if (res && format != GST_FORMAT_PERCENT && start != -1)
1035         res = gst_pad_query_convert (src->srcpad, src->segment.format,
1036             start, &format, &start);
1037
1038       gst_query_set_buffering_range (query, format, start, stop, estimated);
1039       break;
1040     }
1041     default:
1042       res = FALSE;
1043       break;
1044   }
1045   GST_DEBUG_OBJECT (src, "query %s returns %d", GST_QUERY_TYPE_NAME (query),
1046       res);
1047   return res;
1048 }
1049
1050 static gboolean
1051 gst_base_src_query (GstPad * pad, GstQuery * query)
1052 {
1053   GstBaseSrc *src;
1054   GstBaseSrcClass *bclass;
1055   gboolean result = FALSE;
1056
1057   src = GST_BASE_SRC (gst_pad_get_parent (pad));
1058
1059   bclass = GST_BASE_SRC_GET_CLASS (src);
1060
1061   if (bclass->query)
1062     result = bclass->query (src, query);
1063   else
1064     result = gst_pad_query_default (pad, query);
1065
1066   gst_object_unref (src);
1067
1068   return result;
1069 }
1070
1071 static gboolean
1072 gst_base_src_default_do_seek (GstBaseSrc * src, GstSegment * segment)
1073 {
1074   gboolean res = TRUE;
1075
1076   /* update our offset if the start/stop position was updated */
1077   if (segment->format == GST_FORMAT_BYTES) {
1078     segment->time = segment->start;
1079   } else if (segment->start == 0) {
1080     /* seek to start, we can implement a default for this. */
1081     segment->time = 0;
1082   } else {
1083     res = FALSE;
1084     GST_INFO_OBJECT (src, "Can't do a default seek");
1085   }
1086
1087   return res;
1088 }
1089
1090 static gboolean
1091 gst_base_src_do_seek (GstBaseSrc * src, GstSegment * segment)
1092 {
1093   GstBaseSrcClass *bclass;
1094   gboolean result = FALSE;
1095
1096   bclass = GST_BASE_SRC_GET_CLASS (src);
1097
1098   if (bclass->do_seek)
1099     result = bclass->do_seek (src, segment);
1100
1101   return result;
1102 }
1103
1104 #define SEEK_TYPE_IS_RELATIVE(t) (((t) != GST_SEEK_TYPE_NONE) && ((t) != GST_SEEK_TYPE_SET))
1105
1106 static gboolean
1107 gst_base_src_default_prepare_seek_segment (GstBaseSrc * src, GstEvent * event,
1108     GstSegment * segment)
1109 {
1110   /* By default, we try one of 2 things:
1111    *   - For absolute seek positions, convert the requested position to our
1112    *     configured processing format and place it in the output segment \
1113    *   - For relative seek positions, convert our current (input) values to the
1114    *     seek format, adjust by the relative seek offset and then convert back to
1115    *     the processing format
1116    */
1117   GstSeekType cur_type, stop_type;
1118   gint64 cur, stop;
1119   GstSeekFlags flags;
1120   GstFormat seek_format, dest_format;
1121   gdouble rate;
1122   gboolean update;
1123   gboolean res = TRUE;
1124
1125   gst_event_parse_seek (event, &rate, &seek_format, &flags,
1126       &cur_type, &cur, &stop_type, &stop);
1127   dest_format = segment->format;
1128
1129   if (seek_format == dest_format) {
1130     gst_segment_set_seek (segment, rate, seek_format, flags,
1131         cur_type, cur, stop_type, stop, &update);
1132     return TRUE;
1133   }
1134
1135   if (cur_type != GST_SEEK_TYPE_NONE) {
1136     /* FIXME: Handle seek_cur & seek_end by converting the input segment vals */
1137     res =
1138         gst_pad_query_convert (src->srcpad, seek_format, cur, &dest_format,
1139         &cur);
1140     cur_type = GST_SEEK_TYPE_SET;
1141   }
1142
1143   if (res && stop_type != GST_SEEK_TYPE_NONE) {
1144     /* FIXME: Handle seek_cur & seek_end by converting the input segment vals */
1145     res =
1146         gst_pad_query_convert (src->srcpad, seek_format, stop, &dest_format,
1147         &stop);
1148     stop_type = GST_SEEK_TYPE_SET;
1149   }
1150
1151   /* And finally, configure our output segment in the desired format */
1152   gst_segment_set_seek (segment, rate, dest_format, flags, cur_type, cur,
1153       stop_type, stop, &update);
1154
1155   if (!res)
1156     goto no_format;
1157
1158   return res;
1159
1160 no_format:
1161   {
1162     GST_DEBUG_OBJECT (src, "undefined format given, seek aborted.");
1163     return FALSE;
1164   }
1165 }
1166
1167 static gboolean
1168 gst_base_src_prepare_seek_segment (GstBaseSrc * src, GstEvent * event,
1169     GstSegment * seeksegment)
1170 {
1171   GstBaseSrcClass *bclass;
1172   gboolean result = FALSE;
1173
1174   bclass = GST_BASE_SRC_GET_CLASS (src);
1175
1176   if (bclass->prepare_seek_segment)
1177     result = bclass->prepare_seek_segment (src, event, seeksegment);
1178
1179   return result;
1180 }
1181
1182 /* this code implements the seeking. It is a good example
1183  * handling all cases.
1184  *
1185  * A seek updates the currently configured segment.start
1186  * and segment.stop values based on the SEEK_TYPE. If the
1187  * segment.start value is updated, a seek to this new position
1188  * should be performed.
1189  *
1190  * The seek can only be executed when we are not currently
1191  * streaming any data, to make sure that this is the case, we
1192  * acquire the STREAM_LOCK which is taken when we are in the
1193  * _loop() function or when a getrange() is called. Normally
1194  * we will not receive a seek if we are operating in pull mode
1195  * though. When we operate as a live source we might block on the live
1196  * cond, which does not release the STREAM_LOCK. Therefore we will try
1197  * to grab the LIVE_LOCK instead of the STREAM_LOCK to make sure it is
1198  * safe to perform the seek.
1199  *
1200  * When we are in the loop() function, we might be in the middle
1201  * of pushing a buffer, which might block in a sink. To make sure
1202  * that the push gets unblocked we push out a FLUSH_START event.
1203  * Our loop function will get a WRONG_STATE return value from
1204  * the push and will pause, effectively releasing the STREAM_LOCK.
1205  *
1206  * For a non-flushing seek, we pause the task, which might eventually
1207  * release the STREAM_LOCK. We say eventually because when the sink
1208  * blocks on the sample we might wait a very long time until the sink
1209  * unblocks the sample. In any case we acquire the STREAM_LOCK and
1210  * can continue the seek. A non-flushing seek is normally done in a
1211  * running pipeline to perform seamless playback, this means that the sink is
1212  * PLAYING and will return from its chain function.
1213  * In the case of a non-flushing seek we need to make sure that the
1214  * data we output after the seek is continuous with the previous data,
1215  * this is because a non-flushing seek does not reset the running-time
1216  * to 0. We do this by closing the currently running segment, ie. sending
1217  * a new_segment event with the stop position set to the last processed
1218  * position.
1219  *
1220  * After updating the segment.start/stop values, we prepare for
1221  * streaming again. We push out a FLUSH_STOP to make the peer pad
1222  * accept data again and we start our task again.
1223  *
1224  * A segment seek posts a message on the bus saying that the playback
1225  * of the segment started. We store the segment flag internally because
1226  * when we reach the segment.stop we have to post a segment.done
1227  * instead of EOS when doing a segment seek.
1228  */
1229 /* FIXME (0.11), we have the unlock gboolean here because most current
1230  * implementations (fdsrc, -base/gst/tcp/, ...) unconditionally unlock, even when
1231  * the streaming thread isn't running, resulting in bogus unlocks later when it
1232  * starts. This is fixed by adding unlock_stop, but we should still avoid unlocking
1233  * unnecessarily for backwards compatibility. Ergo, the unlock variable stays
1234  * until 0.11
1235  */
1236 static gboolean
1237 gst_base_src_perform_seek (GstBaseSrc * src, GstEvent * event, gboolean unlock)
1238 {
1239   gboolean res = TRUE, tres;
1240   gdouble rate;
1241   GstFormat seek_format, dest_format;
1242   GstSeekFlags flags;
1243   GstSeekType cur_type, stop_type;
1244   gint64 cur, stop;
1245   gboolean flush, playing;
1246   gboolean update;
1247   gboolean relative_seek = FALSE;
1248   gboolean seekseg_configured = FALSE;
1249   GstSegment seeksegment;
1250   guint32 seqnum;
1251   GstEvent *tevent;
1252
1253   GST_DEBUG_OBJECT (src, "doing seek");
1254
1255   dest_format = src->segment.format;
1256
1257   if (event) {
1258     gst_event_parse_seek (event, &rate, &seek_format, &flags,
1259         &cur_type, &cur, &stop_type, &stop);
1260
1261     relative_seek = SEEK_TYPE_IS_RELATIVE (cur_type) ||
1262         SEEK_TYPE_IS_RELATIVE (stop_type);
1263
1264     if (dest_format != seek_format && !relative_seek) {
1265       /* If we have an ABSOLUTE position (SEEK_SET only), we can convert it
1266        * here before taking the stream lock, otherwise we must convert it later,
1267        * once we have the stream lock and can read the last configures segment
1268        * start and stop positions */
1269       gst_segment_init (&seeksegment, dest_format);
1270
1271       if (!gst_base_src_prepare_seek_segment (src, event, &seeksegment))
1272         goto prepare_failed;
1273
1274       seekseg_configured = TRUE;
1275     }
1276
1277     flush = flags & GST_SEEK_FLAG_FLUSH;
1278     seqnum = gst_event_get_seqnum (event);
1279   } else {
1280     flush = FALSE;
1281     /* get next seqnum */
1282     seqnum = gst_util_seqnum_next ();
1283   }
1284
1285   /* send flush start */
1286   if (flush) {
1287     tevent = gst_event_new_flush_start ();
1288     gst_event_set_seqnum (tevent, seqnum);
1289     gst_pad_push_event (src->srcpad, tevent);
1290   } else
1291     gst_pad_pause_task (src->srcpad);
1292
1293   /* unblock streaming thread. */
1294   gst_base_src_set_flushing (src, TRUE, FALSE, unlock, &playing);
1295
1296   /* grab streaming lock, this should eventually be possible, either
1297    * because the task is paused, our streaming thread stopped
1298    * or because our peer is flushing. */
1299   GST_PAD_STREAM_LOCK (src->srcpad);
1300   if (G_UNLIKELY (src->priv->seqnum == seqnum)) {
1301     /* we have seen this event before, issue a warning for now */
1302     GST_WARNING_OBJECT (src, "duplicate event found %" G_GUINT32_FORMAT,
1303         seqnum);
1304   } else {
1305     src->priv->seqnum = seqnum;
1306     GST_DEBUG_OBJECT (src, "seek with seqnum %" G_GUINT32_FORMAT, seqnum);
1307   }
1308
1309   gst_base_src_set_flushing (src, FALSE, playing, unlock, NULL);
1310
1311   /* If we configured the seeksegment above, don't overwrite it now. Otherwise
1312    * copy the current segment info into the temp segment that we can actually
1313    * attempt the seek with. We only update the real segment if the seek suceeds. */
1314   if (!seekseg_configured) {
1315     memcpy (&seeksegment, &src->segment, sizeof (GstSegment));
1316
1317     /* now configure the final seek segment */
1318     if (event) {
1319       if (src->segment.format != seek_format) {
1320         /* OK, here's where we give the subclass a chance to convert the relative
1321          * seek into an absolute one in the processing format. We set up any
1322          * absolute seek above, before taking the stream lock. */
1323         if (!gst_base_src_prepare_seek_segment (src, event, &seeksegment)) {
1324           GST_DEBUG_OBJECT (src, "Preparing the seek failed after flushing. "
1325               "Aborting seek");
1326           res = FALSE;
1327         }
1328       } else {
1329         /* The seek format matches our processing format, no need to ask the
1330          * the subclass to configure the segment. */
1331         gst_segment_set_seek (&seeksegment, rate, seek_format, flags,
1332             cur_type, cur, stop_type, stop, &update);
1333       }
1334     }
1335     /* Else, no seek event passed, so we're just (re)starting the
1336        current segment. */
1337   }
1338
1339   if (res) {
1340     GST_DEBUG_OBJECT (src, "segment configured from %" G_GINT64_FORMAT
1341         " to %" G_GINT64_FORMAT ", position %" G_GINT64_FORMAT,
1342         seeksegment.start, seeksegment.stop, seeksegment.last_stop);
1343
1344     /* do the seek, segment.last_stop contains the new position. */
1345     res = gst_base_src_do_seek (src, &seeksegment);
1346   }
1347
1348   /* and prepare to continue streaming */
1349   if (flush) {
1350     tevent = gst_event_new_flush_stop ();
1351     gst_event_set_seqnum (tevent, seqnum);
1352     /* send flush stop, peer will accept data and events again. We
1353      * are not yet providing data as we still have the STREAM_LOCK. */
1354     gst_pad_push_event (src->srcpad, tevent);
1355   } else if (res && src->data.ABI.running) {
1356     /* we are running the current segment and doing a non-flushing seek,
1357      * close the segment first based on the last_stop. */
1358     GST_DEBUG_OBJECT (src, "closing running segment %" G_GINT64_FORMAT
1359         " to %" G_GINT64_FORMAT, src->segment.start, src->segment.last_stop);
1360
1361     /* queue the segment for sending in the stream thread */
1362     if (src->priv->close_segment)
1363       gst_event_unref (src->priv->close_segment);
1364     src->priv->close_segment =
1365         gst_event_new_new_segment_full (TRUE,
1366         src->segment.rate, src->segment.applied_rate, src->segment.format,
1367         src->segment.start, src->segment.last_stop, src->segment.time);
1368     gst_event_set_seqnum (src->priv->close_segment, seqnum);
1369   }
1370
1371   /* The subclass must have converted the segment to the processing format
1372    * by now */
1373   if (res && seeksegment.format != dest_format) {
1374     GST_DEBUG_OBJECT (src, "Subclass failed to prepare a seek segment "
1375         "in the correct format. Aborting seek.");
1376     res = FALSE;
1377   }
1378
1379   /* if the seek was successful, we update our real segment and push
1380    * out the new segment. */
1381   if (res) {
1382     memcpy (&src->segment, &seeksegment, sizeof (GstSegment));
1383
1384     if (src->segment.flags & GST_SEEK_FLAG_SEGMENT) {
1385       GstMessage *message;
1386
1387       message = gst_message_new_segment_start (GST_OBJECT (src),
1388           src->segment.format, src->segment.last_stop);
1389       gst_message_set_seqnum (message, seqnum);
1390
1391       gst_element_post_message (GST_ELEMENT (src), message);
1392     }
1393
1394     /* for deriving a stop position for the playback segment from the seek
1395      * segment, we must take the duration when the stop is not set */
1396     if ((stop = src->segment.stop) == -1)
1397       stop = src->segment.duration;
1398
1399     GST_DEBUG_OBJECT (src, "Sending newsegment from %" G_GINT64_FORMAT
1400         " to %" G_GINT64_FORMAT, src->segment.start, stop);
1401
1402     /* now replace the old segment so that we send it in the stream thread the
1403      * next time it is scheduled. */
1404     if (src->priv->start_segment)
1405       gst_event_unref (src->priv->start_segment);
1406     if (src->segment.rate >= 0.0) {
1407       /* forward, we send data from last_stop to stop */
1408       src->priv->start_segment =
1409           gst_event_new_new_segment_full (FALSE,
1410           src->segment.rate, src->segment.applied_rate, src->segment.format,
1411           src->segment.last_stop, stop, src->segment.time);
1412     } else {
1413       /* reverse, we send data from last_stop to start */
1414       src->priv->start_segment =
1415           gst_event_new_new_segment_full (FALSE,
1416           src->segment.rate, src->segment.applied_rate, src->segment.format,
1417           src->segment.start, src->segment.last_stop, src->segment.time);
1418     }
1419     gst_event_set_seqnum (src->priv->start_segment, seqnum);
1420   }
1421
1422   src->priv->discont = TRUE;
1423   src->data.ABI.running = TRUE;
1424   /* and restart the task in case it got paused explicitly or by
1425    * the FLUSH_START event we pushed out. */
1426   tres = gst_pad_start_task (src->srcpad, (GstTaskFunction) gst_base_src_loop,
1427       src->srcpad);
1428   if (res && !tres)
1429     res = FALSE;
1430
1431   /* and release the lock again so we can continue streaming */
1432   GST_PAD_STREAM_UNLOCK (src->srcpad);
1433
1434   return res;
1435
1436   /* ERROR */
1437 prepare_failed:
1438   GST_DEBUG_OBJECT (src, "Preparing the seek failed before flushing. "
1439       "Aborting seek");
1440   return FALSE;
1441 }
1442
1443 static const GstQueryType *
1444 gst_base_src_get_query_types (GstElement * element)
1445 {
1446   static const GstQueryType query_types[] = {
1447     GST_QUERY_DURATION,
1448     GST_QUERY_POSITION,
1449     GST_QUERY_SEEKING,
1450     GST_QUERY_SEGMENT,
1451     GST_QUERY_FORMATS,
1452     GST_QUERY_LATENCY,
1453     GST_QUERY_JITTER,
1454     GST_QUERY_RATE,
1455     GST_QUERY_CONVERT,
1456     0
1457   };
1458
1459   return query_types;
1460 }
1461
1462 /* all events send to this element directly. This is mainly done from the
1463  * application.
1464  */
1465 static gboolean
1466 gst_base_src_send_event (GstElement * element, GstEvent * event)
1467 {
1468   GstBaseSrc *src;
1469   gboolean result = FALSE;
1470
1471   src = GST_BASE_SRC (element);
1472
1473   GST_DEBUG_OBJECT (src, "reveived %s event", GST_EVENT_TYPE_NAME (event));
1474
1475   switch (GST_EVENT_TYPE (event)) {
1476       /* bidirectional events */
1477     case GST_EVENT_FLUSH_START:
1478     case GST_EVENT_FLUSH_STOP:
1479       /* sending random flushes downstream can break stuff,
1480        * especially sync since all segment info will get flushed */
1481       break;
1482
1483       /* downstream serialized events */
1484     case GST_EVENT_EOS:
1485     {
1486       GstBaseSrcClass *bclass;
1487
1488       bclass = GST_BASE_SRC_GET_CLASS (src);
1489
1490       /* queue EOS and make sure the task or pull function performs the EOS
1491        * actions.
1492        *
1493        * We have two possibilities:
1494        *
1495        *  - Before we are to enter the _create function, we check the pending_eos
1496        *    first and do EOS instead of entering it.
1497        *  - If we are in the _create function or we did not manage to set the
1498        *    flag fast enough and we are about to enter the _create function,
1499        *    we unlock it so that we exit with WRONG_STATE immediatly. We then
1500        *    check the EOS flag and do the EOS logic.
1501        */
1502       g_atomic_int_set (&src->priv->pending_eos, TRUE);
1503       GST_DEBUG_OBJECT (src, "EOS marked, calling unlock");
1504
1505       /* unlock the _create function so that we can check the pending_eos flag
1506        * and we can do EOS. This will eventually release the LIVE_LOCK again so
1507        * that we can grab it and stop the unlock again. We don't take the stream
1508        * lock so that this operation is guaranteed to never block. */
1509       if (bclass->unlock)
1510         bclass->unlock (src);
1511
1512       GST_DEBUG_OBJECT (src, "unlock called, waiting for LIVE_LOCK");
1513
1514       GST_LIVE_LOCK (src);
1515       GST_DEBUG_OBJECT (src, "LIVE_LOCK acquired, calling unlock_stop");
1516       /* now stop the unlock of the streaming thread again. Grabbing the live
1517        * lock is enough because that protects the create function. */
1518       if (bclass->unlock_stop)
1519         bclass->unlock_stop (src);
1520       GST_LIVE_UNLOCK (src);
1521
1522       result = TRUE;
1523       break;
1524     }
1525     case GST_EVENT_NEWSEGMENT:
1526       /* sending random NEWSEGMENT downstream can break sync. */
1527       break;
1528     case GST_EVENT_TAG:
1529       /* Insert tag in the dataflow */
1530       GST_OBJECT_LOCK (src);
1531       src->priv->pending_tags = g_list_append (src->priv->pending_tags, event);
1532       GST_OBJECT_UNLOCK (src);
1533       event = NULL;
1534       result = TRUE;
1535       break;
1536     case GST_EVENT_BUFFERSIZE:
1537       /* does not seem to make much sense currently */
1538       break;
1539
1540       /* upstream events */
1541     case GST_EVENT_QOS:
1542       /* elements should override send_event and do something */
1543       break;
1544     case GST_EVENT_SEEK:
1545     {
1546       gboolean started;
1547
1548       GST_OBJECT_LOCK (src->srcpad);
1549       if (GST_PAD_ACTIVATE_MODE (src->srcpad) == GST_ACTIVATE_PULL)
1550         goto wrong_mode;
1551       started = GST_PAD_ACTIVATE_MODE (src->srcpad) == GST_ACTIVATE_PUSH;
1552       GST_OBJECT_UNLOCK (src->srcpad);
1553
1554       if (started) {
1555         GST_DEBUG_OBJECT (src, "performing seek");
1556         /* when we are running in push mode, we can execute the
1557          * seek right now, we need to unlock. */
1558         result = gst_base_src_perform_seek (src, event, TRUE);
1559       } else {
1560         GstEvent **event_p;
1561
1562         /* else we store the event and execute the seek when we
1563          * get activated */
1564         GST_OBJECT_LOCK (src);
1565         GST_DEBUG_OBJECT (src, "queueing seek");
1566         event_p = &src->data.ABI.pending_seek;
1567         gst_event_replace ((GstEvent **) event_p, event);
1568         GST_OBJECT_UNLOCK (src);
1569         /* assume the seek will work */
1570         result = TRUE;
1571       }
1572       break;
1573     }
1574     case GST_EVENT_NAVIGATION:
1575       /* could make sense for elements that do something with navigation events
1576        * but then they would need to override the send_event function */
1577       break;
1578     case GST_EVENT_LATENCY:
1579       /* does not seem to make sense currently */
1580       break;
1581
1582       /* custom events */
1583     case GST_EVENT_CUSTOM_UPSTREAM:
1584       /* override send_event if you want this */
1585       break;
1586     case GST_EVENT_CUSTOM_DOWNSTREAM:
1587     case GST_EVENT_CUSTOM_BOTH:
1588       /* FIXME, insert event in the dataflow */
1589       break;
1590     case GST_EVENT_CUSTOM_DOWNSTREAM_OOB:
1591     case GST_EVENT_CUSTOM_BOTH_OOB:
1592       /* insert a random custom event into the pipeline */
1593       GST_DEBUG_OBJECT (src, "pushing custom OOB event downstream");
1594       result = gst_pad_push_event (src->srcpad, event);
1595       /* we gave away the ref to the event in the push */
1596       event = NULL;
1597       break;
1598     default:
1599       break;
1600   }
1601 done:
1602   /* if we still have a ref to the event, unref it now */
1603   if (event)
1604     gst_event_unref (event);
1605
1606   return result;
1607
1608   /* ERRORS */
1609 wrong_mode:
1610   {
1611     GST_DEBUG_OBJECT (src, "cannot perform seek when operating in pull mode");
1612     GST_OBJECT_UNLOCK (src->srcpad);
1613     result = FALSE;
1614     goto done;
1615   }
1616 }
1617
1618 static gboolean
1619 gst_base_src_seekable (GstBaseSrc * src)
1620 {
1621   GstBaseSrcClass *bclass;
1622   bclass = GST_BASE_SRC_GET_CLASS (src);
1623   if (bclass->is_seekable)
1624     return bclass->is_seekable (src);
1625   else
1626     return FALSE;
1627 }
1628
1629 static gboolean
1630 gst_base_src_default_event (GstBaseSrc * src, GstEvent * event)
1631 {
1632   gboolean result;
1633
1634   switch (GST_EVENT_TYPE (event)) {
1635     case GST_EVENT_SEEK:
1636       /* is normally called when in push mode */
1637       if (!gst_base_src_seekable (src))
1638         goto not_seekable;
1639
1640       result = gst_base_src_perform_seek (src, event, TRUE);
1641       break;
1642     case GST_EVENT_FLUSH_START:
1643       /* cancel any blocking getrange, is normally called
1644        * when in pull mode. */
1645       result = gst_base_src_set_flushing (src, TRUE, FALSE, TRUE, NULL);
1646       break;
1647     case GST_EVENT_FLUSH_STOP:
1648       result = gst_base_src_set_flushing (src, FALSE, TRUE, TRUE, NULL);
1649       break;
1650     default:
1651       result = TRUE;
1652       break;
1653   }
1654   return result;
1655
1656   /* ERRORS */
1657 not_seekable:
1658   {
1659     GST_DEBUG_OBJECT (src, "is not seekable");
1660     return FALSE;
1661   }
1662 }
1663
1664 static gboolean
1665 gst_base_src_event_handler (GstPad * pad, GstEvent * event)
1666 {
1667   GstBaseSrc *src;
1668   GstBaseSrcClass *bclass;
1669   gboolean result = FALSE;
1670
1671   src = GST_BASE_SRC (gst_pad_get_parent (pad));
1672   bclass = GST_BASE_SRC_GET_CLASS (src);
1673
1674   if (bclass->event) {
1675     if (!(result = bclass->event (src, event)))
1676       goto subclass_failed;
1677   }
1678
1679 done:
1680   gst_event_unref (event);
1681   gst_object_unref (src);
1682
1683   return result;
1684
1685   /* ERRORS */
1686 subclass_failed:
1687   {
1688     GST_DEBUG_OBJECT (src, "subclass refused event");
1689     goto done;
1690   }
1691 }
1692
1693 static void
1694 gst_base_src_set_property (GObject * object, guint prop_id,
1695     const GValue * value, GParamSpec * pspec)
1696 {
1697   GstBaseSrc *src;
1698
1699   src = GST_BASE_SRC (object);
1700
1701   switch (prop_id) {
1702     case PROP_BLOCKSIZE:
1703       gst_base_src_set_blocksize (src, g_value_get_ulong (value));
1704       break;
1705     case PROP_NUM_BUFFERS:
1706       src->num_buffers = g_value_get_int (value);
1707       break;
1708     case PROP_TYPEFIND:
1709       src->data.ABI.typefind = g_value_get_boolean (value);
1710       break;
1711     case PROP_DO_TIMESTAMP:
1712       gst_base_src_set_do_timestamp (src, g_value_get_boolean (value));
1713       break;
1714     default:
1715       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1716       break;
1717   }
1718 }
1719
1720 static void
1721 gst_base_src_get_property (GObject * object, guint prop_id, GValue * value,
1722     GParamSpec * pspec)
1723 {
1724   GstBaseSrc *src;
1725
1726   src = GST_BASE_SRC (object);
1727
1728   switch (prop_id) {
1729     case PROP_BLOCKSIZE:
1730       g_value_set_ulong (value, gst_base_src_get_blocksize (src));
1731       break;
1732     case PROP_NUM_BUFFERS:
1733       g_value_set_int (value, src->num_buffers);
1734       break;
1735     case PROP_TYPEFIND:
1736       g_value_set_boolean (value, src->data.ABI.typefind);
1737       break;
1738     case PROP_DO_TIMESTAMP:
1739       g_value_set_boolean (value, gst_base_src_get_do_timestamp (src));
1740       break;
1741     default:
1742       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1743       break;
1744   }
1745 }
1746
1747 /* with STREAM_LOCK and LOCK */
1748 static GstClockReturn
1749 gst_base_src_wait (GstBaseSrc * basesrc, GstClock * clock, GstClockTime time)
1750 {
1751   GstClockReturn ret;
1752   GstClockID id;
1753
1754   id = gst_clock_new_single_shot_id (clock, time);
1755
1756   basesrc->clock_id = id;
1757   /* release the live lock while waiting */
1758   GST_LIVE_UNLOCK (basesrc);
1759
1760   ret = gst_clock_id_wait (id, NULL);
1761
1762   GST_LIVE_LOCK (basesrc);
1763   gst_clock_id_unref (id);
1764   basesrc->clock_id = NULL;
1765
1766   return ret;
1767 }
1768
1769 /* perform synchronisation on a buffer.
1770  * with STREAM_LOCK.
1771  */
1772 static GstClockReturn
1773 gst_base_src_do_sync (GstBaseSrc * basesrc, GstBuffer * buffer)
1774 {
1775   GstClockReturn result;
1776   GstClockTime start, end;
1777   GstBaseSrcClass *bclass;
1778   GstClockTime base_time;
1779   GstClock *clock;
1780   GstClockTime now = GST_CLOCK_TIME_NONE, timestamp;
1781   gboolean do_timestamp, first, pseudo_live;
1782
1783   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
1784
1785   start = end = -1;
1786   if (bclass->get_times)
1787     bclass->get_times (basesrc, buffer, &start, &end);
1788
1789   /* get buffer timestamp */
1790   timestamp = GST_BUFFER_TIMESTAMP (buffer);
1791
1792   /* grab the lock to prepare for clocking and calculate the startup
1793    * latency. */
1794   GST_OBJECT_LOCK (basesrc);
1795
1796   /* if we are asked to sync against the clock we are a pseudo live element */
1797   pseudo_live = (start != -1 && basesrc->is_live);
1798   /* check for the first buffer */
1799   first = (basesrc->priv->latency == -1);
1800
1801   if (timestamp != -1 && pseudo_live) {
1802     GstClockTime latency;
1803
1804     /* we have a timestamp and a sync time, latency is the diff */
1805     if (timestamp <= start)
1806       latency = start - timestamp;
1807     else
1808       latency = 0;
1809
1810     if (first) {
1811       GST_DEBUG_OBJECT (basesrc, "pseudo_live with latency %" GST_TIME_FORMAT,
1812           GST_TIME_ARGS (latency));
1813       /* first time we calculate latency, just configure */
1814       basesrc->priv->latency = latency;
1815     } else {
1816       if (basesrc->priv->latency != latency) {
1817         /* we have a new latency, FIXME post latency message */
1818         basesrc->priv->latency = latency;
1819         GST_DEBUG_OBJECT (basesrc, "latency changed to %" GST_TIME_FORMAT,
1820             GST_TIME_ARGS (latency));
1821       }
1822     }
1823   } else if (first) {
1824     GST_DEBUG_OBJECT (basesrc, "no latency needed, live %d, sync %d",
1825         basesrc->is_live, start != -1);
1826     basesrc->priv->latency = 0;
1827   }
1828
1829   /* get clock, if no clock, we can't sync or do timestamps */
1830   if ((clock = GST_ELEMENT_CLOCK (basesrc)) == NULL)
1831     goto no_clock;
1832
1833   base_time = GST_ELEMENT_CAST (basesrc)->base_time;
1834
1835   do_timestamp = basesrc->priv->do_timestamp;
1836
1837   /* first buffer, calculate the timestamp offset */
1838   if (first) {
1839     GstClockTime running_time;
1840
1841     now = gst_clock_get_time (clock);
1842     running_time = now - base_time;
1843
1844     GST_LOG_OBJECT (basesrc,
1845         "startup timestamp: %" GST_TIME_FORMAT ", running_time %"
1846         GST_TIME_FORMAT, GST_TIME_ARGS (timestamp),
1847         GST_TIME_ARGS (running_time));
1848
1849     if (pseudo_live && timestamp != -1) {
1850       /* live source and we need to sync, add startup latency to all timestamps
1851        * to get the real running_time. Live sources should always timestamp
1852        * according to the current running time. */
1853       basesrc->priv->ts_offset = GST_CLOCK_DIFF (timestamp, running_time);
1854
1855       GST_LOG_OBJECT (basesrc, "live with sync, ts_offset %" GST_TIME_FORMAT,
1856           GST_TIME_ARGS (basesrc->priv->ts_offset));
1857     } else {
1858       basesrc->priv->ts_offset = 0;
1859       GST_LOG_OBJECT (basesrc, "no timestamp offset needed");
1860     }
1861
1862     if (!GST_CLOCK_TIME_IS_VALID (timestamp)) {
1863       if (do_timestamp)
1864         timestamp = running_time;
1865       else
1866         timestamp = 0;
1867
1868       GST_BUFFER_TIMESTAMP (buffer) = timestamp;
1869
1870       GST_LOG_OBJECT (basesrc, "created timestamp: %" GST_TIME_FORMAT,
1871           GST_TIME_ARGS (timestamp));
1872     }
1873
1874     /* add the timestamp offset we need for sync */
1875     timestamp += basesrc->priv->ts_offset;
1876   } else {
1877     /* not the first buffer, the timestamp is the diff between the clock and
1878      * base_time */
1879     if (do_timestamp && !GST_CLOCK_TIME_IS_VALID (timestamp)) {
1880       now = gst_clock_get_time (clock);
1881
1882       GST_BUFFER_TIMESTAMP (buffer) = now - base_time;
1883
1884       GST_LOG_OBJECT (basesrc, "created timestamp: %" GST_TIME_FORMAT,
1885           GST_TIME_ARGS (now - base_time));
1886     }
1887   }
1888
1889   /* if we don't have a buffer timestamp, we don't sync */
1890   if (!GST_CLOCK_TIME_IS_VALID (start))
1891     goto no_sync;
1892
1893   if (basesrc->is_live && GST_CLOCK_TIME_IS_VALID (timestamp)) {
1894     /* for pseudo live sources, add our ts_offset to the timestamp */
1895     GST_BUFFER_TIMESTAMP (buffer) += basesrc->priv->ts_offset;
1896     start += basesrc->priv->ts_offset;
1897   }
1898
1899   GST_LOG_OBJECT (basesrc,
1900       "waiting for clock, base time %" GST_TIME_FORMAT
1901       ", stream_start %" GST_TIME_FORMAT,
1902       GST_TIME_ARGS (base_time), GST_TIME_ARGS (start));
1903   GST_OBJECT_UNLOCK (basesrc);
1904
1905   result = gst_base_src_wait (basesrc, clock, start + base_time);
1906
1907   GST_LOG_OBJECT (basesrc, "clock entry done: %d", result);
1908
1909   return result;
1910
1911   /* special cases */
1912 no_clock:
1913   {
1914     GST_DEBUG_OBJECT (basesrc, "we have no clock");
1915     GST_OBJECT_UNLOCK (basesrc);
1916     return GST_CLOCK_OK;
1917   }
1918 no_sync:
1919   {
1920     GST_DEBUG_OBJECT (basesrc, "no sync needed");
1921     GST_OBJECT_UNLOCK (basesrc);
1922     return GST_CLOCK_OK;
1923   }
1924 }
1925
1926 static gboolean
1927 gst_base_src_update_length (GstBaseSrc * src, guint64 offset, guint * length)
1928 {
1929   guint64 size, maxsize;
1930   GstBaseSrcClass *bclass;
1931
1932   bclass = GST_BASE_SRC_GET_CLASS (src);
1933
1934   /* only operate if we are working with bytes */
1935   if (src->segment.format != GST_FORMAT_BYTES)
1936     return TRUE;
1937
1938   /* get total file size */
1939   size = (guint64) src->segment.duration;
1940
1941   /* the max amount of bytes to read is the total size or
1942    * up to the segment.stop if present. */
1943   if (src->segment.stop != -1)
1944     maxsize = MIN (size, src->segment.stop);
1945   else
1946     maxsize = size;
1947
1948   GST_DEBUG_OBJECT (src,
1949       "reading offset %" G_GUINT64_FORMAT ", length %u, size %" G_GINT64_FORMAT
1950       ", segment.stop %" G_GINT64_FORMAT ", maxsize %" G_GINT64_FORMAT, offset,
1951       *length, size, src->segment.stop, maxsize);
1952
1953   /* check size if we have one */
1954   if (maxsize != -1) {
1955     /* if we run past the end, check if the file became bigger and
1956      * retry. */
1957     if (G_UNLIKELY (offset + *length >= maxsize)) {
1958       /* see if length of the file changed */
1959       if (bclass->get_size)
1960         if (!bclass->get_size (src, &size))
1961           size = -1;
1962
1963       gst_segment_set_duration (&src->segment, GST_FORMAT_BYTES, size);
1964
1965       /* make sure we don't exceed the configured segment stop
1966        * if it was set */
1967       if (src->segment.stop != -1)
1968         maxsize = MIN (size, src->segment.stop);
1969       else
1970         maxsize = size;
1971
1972       /* if we are at or past the end, EOS */
1973       if (G_UNLIKELY (offset >= maxsize))
1974         goto unexpected_length;
1975
1976       /* else we can clip to the end */
1977       if (G_UNLIKELY (offset + *length >= maxsize))
1978         *length = maxsize - offset;
1979
1980     }
1981   }
1982
1983   /* keep track of current position. segment is in bytes, we checked
1984    * that above. */
1985   gst_segment_set_last_stop (&src->segment, GST_FORMAT_BYTES, offset);
1986
1987   return TRUE;
1988
1989   /* ERRORS */
1990 unexpected_length:
1991   {
1992     return FALSE;
1993   }
1994 }
1995
1996 /* must be called with LIVE_LOCK */
1997 static GstFlowReturn
1998 gst_base_src_get_range (GstBaseSrc * src, guint64 offset, guint length,
1999     GstBuffer ** buf)
2000 {
2001   GstFlowReturn ret;
2002   GstBaseSrcClass *bclass;
2003   GstClockReturn status;
2004
2005   bclass = GST_BASE_SRC_GET_CLASS (src);
2006
2007 again:
2008   if (src->is_live) {
2009     while (G_UNLIKELY (!src->live_running)) {
2010       ret = gst_base_src_wait_playing (src);
2011       if (ret != GST_FLOW_OK)
2012         goto stopped;
2013     }
2014   }
2015
2016   if (G_UNLIKELY (!GST_OBJECT_FLAG_IS_SET (src, GST_BASE_SRC_STARTED)))
2017     goto not_started;
2018
2019   if (G_UNLIKELY (!bclass->create))
2020     goto no_function;
2021
2022   if (G_UNLIKELY (!gst_base_src_update_length (src, offset, &length)))
2023     goto unexpected_length;
2024
2025   /* normally we don't count buffers */
2026   if (G_UNLIKELY (src->num_buffers_left >= 0)) {
2027     if (src->num_buffers_left == 0)
2028       goto reached_num_buffers;
2029     else
2030       src->num_buffers_left--;
2031   }
2032
2033   /* don't enter the create function if a pending EOS event was set. For the
2034    * logic of the pending_eos, check the event function of this class. */
2035   if (G_UNLIKELY (g_atomic_int_get (&src->priv->pending_eos)))
2036     goto eos;
2037
2038   GST_DEBUG_OBJECT (src,
2039       "calling create offset %" G_GUINT64_FORMAT " length %u, time %"
2040       G_GINT64_FORMAT, offset, length, src->segment.time);
2041
2042   ret = bclass->create (src, offset, length, buf);
2043
2044   /* The create function could be unlocked because we have a pending EOS. It's
2045    * possible that we have a valid buffer from create that we need to
2046    * discard when the create function returned _OK. */
2047   if (G_UNLIKELY (g_atomic_int_get (&src->priv->pending_eos))) {
2048     if (ret == GST_FLOW_OK) {
2049       gst_buffer_unref (*buf);
2050       *buf = NULL;
2051     }
2052     goto eos;
2053   }
2054
2055   if (G_UNLIKELY (ret != GST_FLOW_OK))
2056     goto not_ok;
2057
2058   /* no timestamp set and we are at offset 0, we can timestamp with 0 */
2059   if (offset == 0 && src->segment.time == 0
2060       && GST_BUFFER_TIMESTAMP (*buf) == -1)
2061     GST_BUFFER_TIMESTAMP (*buf) = 0;
2062
2063   /* set pad caps on the buffer if the buffer had no caps */
2064   if (GST_BUFFER_CAPS (*buf) == NULL)
2065     gst_buffer_set_caps (*buf, GST_PAD_CAPS (src->srcpad));
2066
2067   /* now sync before pushing the buffer */
2068   status = gst_base_src_do_sync (src, *buf);
2069
2070   /* waiting for the clock could have made us flushing */
2071   if (G_UNLIKELY (src->priv->flushing))
2072     goto flushing;
2073
2074   switch (status) {
2075     case GST_CLOCK_EARLY:
2076       /* the buffer is too late. We currently don't drop the buffer. */
2077       GST_DEBUG_OBJECT (src, "buffer too late!, returning anyway");
2078       break;
2079     case GST_CLOCK_OK:
2080       /* buffer synchronised properly */
2081       GST_DEBUG_OBJECT (src, "buffer ok");
2082       break;
2083     case GST_CLOCK_UNSCHEDULED:
2084       /* this case is triggered when we were waiting for the clock and
2085        * it got unlocked because we did a state change. In any case, get rid of
2086        * the buffer. */
2087       gst_buffer_unref (*buf);
2088       *buf = NULL;
2089       if (!src->live_running) {
2090         /* We return WRONG_STATE when we are not running to stop the dataflow also
2091          * get rid of the produced buffer. */
2092         GST_DEBUG_OBJECT (src,
2093             "clock was unscheduled (%d), returning WRONG_STATE", status);
2094         ret = GST_FLOW_WRONG_STATE;
2095       } else {
2096         /* If we are running when this happens, we quickly switched between
2097          * pause and playing. We try to produce a new buffer */
2098         GST_DEBUG_OBJECT (src,
2099             "clock was unscheduled (%d), but we are running", status);
2100         goto again;
2101       }
2102       break;
2103     default:
2104       /* all other result values are unexpected and errors */
2105       GST_ELEMENT_ERROR (src, CORE, CLOCK,
2106           (_("Internal clock error.")),
2107           ("clock returned unexpected return value %d", status));
2108       gst_buffer_unref (*buf);
2109       *buf = NULL;
2110       ret = GST_FLOW_ERROR;
2111       break;
2112   }
2113   return ret;
2114
2115   /* ERROR */
2116 stopped:
2117   {
2118     GST_DEBUG_OBJECT (src, "wait_playing returned %d (%s)", ret,
2119         gst_flow_get_name (ret));
2120     return ret;
2121   }
2122 not_ok:
2123   {
2124     GST_DEBUG_OBJECT (src, "create returned %d (%s)", ret,
2125         gst_flow_get_name (ret));
2126     return ret;
2127   }
2128 not_started:
2129   {
2130     GST_DEBUG_OBJECT (src, "getrange but not started");
2131     return GST_FLOW_WRONG_STATE;
2132   }
2133 no_function:
2134   {
2135     GST_DEBUG_OBJECT (src, "no create function");
2136     return GST_FLOW_ERROR;
2137   }
2138 unexpected_length:
2139   {
2140     GST_DEBUG_OBJECT (src, "unexpected length %u (offset=%" G_GUINT64_FORMAT
2141         ", size=%" G_GINT64_FORMAT ")", length, offset, src->segment.duration);
2142     return GST_FLOW_UNEXPECTED;
2143   }
2144 reached_num_buffers:
2145   {
2146     GST_DEBUG_OBJECT (src, "sent all buffers");
2147     return GST_FLOW_UNEXPECTED;
2148   }
2149 flushing:
2150   {
2151     GST_DEBUG_OBJECT (src, "we are flushing");
2152     gst_buffer_unref (*buf);
2153     *buf = NULL;
2154     return GST_FLOW_WRONG_STATE;
2155   }
2156 eos:
2157   {
2158     GST_DEBUG_OBJECT (src, "we are EOS");
2159     return GST_FLOW_UNEXPECTED;
2160   }
2161 }
2162
2163 static GstFlowReturn
2164 gst_base_src_pad_get_range (GstPad * pad, guint64 offset, guint length,
2165     GstBuffer ** buf)
2166 {
2167   GstBaseSrc *src;
2168   GstFlowReturn res;
2169
2170   src = GST_BASE_SRC (gst_pad_get_parent (pad));
2171
2172   GST_LIVE_LOCK (src);
2173   if (G_UNLIKELY (src->priv->flushing))
2174     goto flushing;
2175
2176   res = gst_base_src_get_range (src, offset, length, buf);
2177
2178 done:
2179   GST_LIVE_UNLOCK (src);
2180
2181   gst_object_unref (src);
2182
2183   return res;
2184
2185   /* ERRORS */
2186 flushing:
2187   {
2188     GST_DEBUG_OBJECT (src, "we are flushing");
2189     res = GST_FLOW_WRONG_STATE;
2190     goto done;
2191   }
2192 }
2193
2194 static gboolean
2195 gst_base_src_default_check_get_range (GstBaseSrc * src)
2196 {
2197   gboolean res;
2198
2199   if (!GST_OBJECT_FLAG_IS_SET (src, GST_BASE_SRC_STARTED)) {
2200     GST_LOG_OBJECT (src, "doing start/stop to check get_range support");
2201     if (G_LIKELY (gst_base_src_start (src)))
2202       gst_base_src_stop (src);
2203   }
2204
2205   /* we can operate in getrange mode if the native format is bytes
2206    * and we are seekable, this condition is set in the random_access
2207    * flag and is set in the _start() method. */
2208   res = src->random_access;
2209
2210   return res;
2211 }
2212
2213 static gboolean
2214 gst_base_src_check_get_range (GstBaseSrc * src)
2215 {
2216   GstBaseSrcClass *bclass;
2217   gboolean res;
2218
2219   bclass = GST_BASE_SRC_GET_CLASS (src);
2220
2221   if (bclass->check_get_range == NULL)
2222     goto no_function;
2223
2224   res = bclass->check_get_range (src);
2225   GST_LOG_OBJECT (src, "%s() returned %d",
2226       GST_DEBUG_FUNCPTR_NAME (bclass->check_get_range), (gint) res);
2227
2228   return res;
2229
2230   /* ERRORS */
2231 no_function:
2232   {
2233     GST_WARNING_OBJECT (src, "no check_get_range function set");
2234     return FALSE;
2235   }
2236 }
2237
2238 static gboolean
2239 gst_base_src_pad_check_get_range (GstPad * pad)
2240 {
2241   GstBaseSrc *src;
2242   gboolean res;
2243
2244   src = GST_BASE_SRC (GST_OBJECT_PARENT (pad));
2245
2246   res = gst_base_src_check_get_range (src);
2247
2248   return res;
2249 }
2250
2251 static void
2252 gst_base_src_loop (GstPad * pad)
2253 {
2254   GstBaseSrc *src;
2255   GstBuffer *buf = NULL;
2256   GstFlowReturn ret;
2257   gint64 position;
2258   gboolean eos;
2259   gulong blocksize;
2260   GList *tags, *tmp;
2261
2262   eos = FALSE;
2263
2264   src = GST_BASE_SRC (GST_OBJECT_PARENT (pad));
2265
2266   GST_LIVE_LOCK (src);
2267
2268   if (G_UNLIKELY (src->priv->flushing))
2269     goto flushing;
2270
2271   src->priv->last_sent_eos = FALSE;
2272
2273   blocksize = src->blocksize;
2274
2275   /* if we operate in bytes, we can calculate an offset */
2276   if (src->segment.format == GST_FORMAT_BYTES) {
2277     position = src->segment.last_stop;
2278     /* for negative rates, start with subtracting the blocksize */
2279     if (src->segment.rate < 0.0) {
2280       /* we cannot go below segment.start */
2281       if (position > src->segment.start + blocksize)
2282         position -= blocksize;
2283       else {
2284         /* last block, remainder up to segment.start */
2285         blocksize = position - src->segment.start;
2286         position = src->segment.start;
2287       }
2288     }
2289   } else
2290     position = -1;
2291
2292   GST_LOG_OBJECT (src, "next_ts %" GST_TIME_FORMAT " size %lu",
2293       GST_TIME_ARGS (position), blocksize);
2294
2295   ret = gst_base_src_get_range (src, position, blocksize, &buf);
2296   if (G_UNLIKELY (ret != GST_FLOW_OK)) {
2297     GST_INFO_OBJECT (src, "pausing after gst_base_src_get_range() = %s",
2298         gst_flow_get_name (ret));
2299     GST_LIVE_UNLOCK (src);
2300     goto pause;
2301   }
2302   /* this should not happen */
2303   if (G_UNLIKELY (buf == NULL))
2304     goto null_buffer;
2305
2306   /* push events to close/start our segment before we push the buffer. */
2307   if (G_UNLIKELY (src->priv->close_segment)) {
2308     gst_pad_push_event (pad, src->priv->close_segment);
2309     src->priv->close_segment = NULL;
2310   }
2311   if (G_UNLIKELY (src->priv->start_segment)) {
2312     gst_pad_push_event (pad, src->priv->start_segment);
2313     src->priv->start_segment = NULL;
2314   }
2315
2316   GST_OBJECT_LOCK (src);
2317   /* take the tags */
2318   tags = src->priv->pending_tags;
2319   src->priv->pending_tags = NULL;
2320   GST_OBJECT_UNLOCK (src);
2321
2322   /* Push out pending tags if any */
2323   if (G_UNLIKELY (tags != NULL)) {
2324     for (tmp = tags; tmp; tmp = g_list_next (tmp)) {
2325       GstEvent *ev = (GstEvent *) tmp->data;
2326       gst_pad_push_event (pad, ev);
2327     }
2328     g_list_free (tags);
2329   }
2330
2331   /* figure out the new position */
2332   switch (src->segment.format) {
2333     case GST_FORMAT_BYTES:
2334     {
2335       guint bufsize = GST_BUFFER_SIZE (buf);
2336
2337       /* we subtracted above for negative rates */
2338       if (src->segment.rate >= 0.0)
2339         position += bufsize;
2340       break;
2341     }
2342     case GST_FORMAT_TIME:
2343     {
2344       GstClockTime start, duration;
2345
2346       start = GST_BUFFER_TIMESTAMP (buf);
2347       duration = GST_BUFFER_DURATION (buf);
2348
2349       if (GST_CLOCK_TIME_IS_VALID (start))
2350         position = start;
2351       else
2352         position = src->segment.last_stop;
2353
2354       if (GST_CLOCK_TIME_IS_VALID (duration)) {
2355         if (src->segment.rate >= 0.0)
2356           position += duration;
2357         else if (position > duration)
2358           position -= duration;
2359         else
2360           position = 0;
2361       }
2362       break;
2363     }
2364     case GST_FORMAT_DEFAULT:
2365       if (src->segment.rate >= 0.0)
2366         position = GST_BUFFER_OFFSET_END (buf);
2367       else
2368         position = GST_BUFFER_OFFSET (buf);
2369       break;
2370     default:
2371       position = -1;
2372       break;
2373   }
2374   if (position != -1) {
2375     if (src->segment.rate >= 0.0) {
2376       /* positive rate, check if we reached the stop */
2377       if (src->segment.stop != -1) {
2378         if (position >= src->segment.stop) {
2379           eos = TRUE;
2380           position = src->segment.stop;
2381         }
2382       }
2383     } else {
2384       /* negative rate, check if we reached the start. start is always set to
2385        * something different from -1 */
2386       if (position <= src->segment.start) {
2387         eos = TRUE;
2388         position = src->segment.start;
2389       }
2390       /* when going reverse, all buffers are DISCONT */
2391       src->priv->discont = TRUE;
2392     }
2393     gst_segment_set_last_stop (&src->segment, src->segment.format, position);
2394   }
2395
2396   if (G_UNLIKELY (src->priv->discont)) {
2397     buf = gst_buffer_make_metadata_writable (buf);
2398     GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_DISCONT);
2399     src->priv->discont = FALSE;
2400   }
2401   GST_LIVE_UNLOCK (src);
2402
2403   ret = gst_pad_push (pad, buf);
2404   if (G_UNLIKELY (ret != GST_FLOW_OK)) {
2405     GST_INFO_OBJECT (src, "pausing after gst_pad_push() = %s",
2406         gst_flow_get_name (ret));
2407     goto pause;
2408   }
2409
2410   if (G_UNLIKELY (eos)) {
2411     GST_INFO_OBJECT (src, "pausing after end of segment");
2412     ret = GST_FLOW_UNEXPECTED;
2413     goto pause;
2414   }
2415
2416 done:
2417   return;
2418
2419   /* special cases */
2420 flushing:
2421   {
2422     GST_DEBUG_OBJECT (src, "we are flushing");
2423     GST_LIVE_UNLOCK (src);
2424     ret = GST_FLOW_WRONG_STATE;
2425     goto pause;
2426   }
2427 pause:
2428   {
2429     const gchar *reason = gst_flow_get_name (ret);
2430     GstEvent *event;
2431
2432     GST_DEBUG_OBJECT (src, "pausing task, reason %s", reason);
2433     src->data.ABI.running = FALSE;
2434     gst_pad_pause_task (pad);
2435     if (GST_FLOW_IS_FATAL (ret) || ret == GST_FLOW_NOT_LINKED) {
2436       if (ret == GST_FLOW_UNEXPECTED) {
2437         /* perform EOS logic */
2438         if (src->segment.flags & GST_SEEK_FLAG_SEGMENT) {
2439           GstMessage *message;
2440
2441           message = gst_message_new_segment_done (GST_OBJECT_CAST (src),
2442               src->segment.format, src->segment.last_stop);
2443           gst_message_set_seqnum (message, src->priv->seqnum);
2444           gst_element_post_message (GST_ELEMENT_CAST (src), message);
2445         } else {
2446           event = gst_event_new_eos ();
2447           gst_event_set_seqnum (event, src->priv->seqnum);
2448           gst_pad_push_event (pad, event);
2449           src->priv->last_sent_eos = TRUE;
2450         }
2451       } else {
2452         event = gst_event_new_eos ();
2453         gst_event_set_seqnum (event, src->priv->seqnum);
2454         /* for fatal errors we post an error message, post the error
2455          * first so the app knows about the error first. */
2456         GST_ELEMENT_ERROR (src, STREAM, FAILED,
2457             (_("Internal data flow error.")),
2458             ("streaming task paused, reason %s (%d)", reason, ret));
2459         gst_pad_push_event (pad, event);
2460         src->priv->last_sent_eos = TRUE;
2461       }
2462     }
2463     goto done;
2464   }
2465 null_buffer:
2466   {
2467     GST_ELEMENT_ERROR (src, STREAM, FAILED,
2468         (_("Internal data flow error.")), ("element returned NULL buffer"));
2469     GST_LIVE_UNLOCK (src);
2470     /* we finished the segment on error */
2471     ret = GST_FLOW_ERROR;
2472     goto done;
2473   }
2474 }
2475
2476 /* default negotiation code.
2477  *
2478  * Take intersection between src and sink pads, take first
2479  * caps and fixate.
2480  */
2481 static gboolean
2482 gst_base_src_default_negotiate (GstBaseSrc * basesrc)
2483 {
2484   GstCaps *thiscaps;
2485   GstCaps *caps = NULL;
2486   GstCaps *peercaps = NULL;
2487   gboolean result = FALSE;
2488
2489   /* first see what is possible on our source pad */
2490   thiscaps = gst_pad_get_caps_reffed (GST_BASE_SRC_PAD (basesrc));
2491   GST_DEBUG_OBJECT (basesrc, "caps of src: %" GST_PTR_FORMAT, thiscaps);
2492   /* nothing or anything is allowed, we're done */
2493   if (thiscaps == NULL || gst_caps_is_any (thiscaps))
2494     goto no_nego_needed;
2495
2496   if (G_UNLIKELY (gst_caps_is_empty (thiscaps)))
2497     goto no_caps;
2498
2499   /* get the peer caps */
2500   peercaps = gst_pad_peer_get_caps_reffed (GST_BASE_SRC_PAD (basesrc));
2501   GST_DEBUG_OBJECT (basesrc, "caps of peer: %" GST_PTR_FORMAT, peercaps);
2502   if (peercaps) {
2503     GstCaps *icaps;
2504
2505     /* get intersection */
2506     icaps = gst_caps_intersect (thiscaps, peercaps);
2507     GST_DEBUG_OBJECT (basesrc, "intersect: %" GST_PTR_FORMAT, icaps);
2508     gst_caps_unref (thiscaps);
2509     gst_caps_unref (peercaps);
2510     if (icaps) {
2511       /* take first (and best, since they are sorted) possibility */
2512       caps = gst_caps_copy_nth (icaps, 0);
2513       gst_caps_unref (icaps);
2514     }
2515   } else {
2516     /* no peer, work with our own caps then */
2517     caps = thiscaps;
2518   }
2519   if (caps) {
2520     caps = gst_caps_make_writable (caps);
2521     gst_caps_truncate (caps);
2522
2523     /* now fixate */
2524     if (!gst_caps_is_empty (caps)) {
2525       gst_pad_fixate_caps (GST_BASE_SRC_PAD (basesrc), caps);
2526       GST_DEBUG_OBJECT (basesrc, "fixated to: %" GST_PTR_FORMAT, caps);
2527
2528       if (gst_caps_is_any (caps)) {
2529         /* hmm, still anything, so element can do anything and
2530          * nego is not needed */
2531         result = TRUE;
2532       } else if (gst_caps_is_fixed (caps)) {
2533         /* yay, fixed caps, use those then, it's possible that the subclass does
2534          * not accept this caps after all and we have to fail. */
2535         result = gst_pad_set_caps (GST_BASE_SRC_PAD (basesrc), caps);
2536       }
2537     }
2538     gst_caps_unref (caps);
2539   } else {
2540     GST_DEBUG_OBJECT (basesrc, "no common caps");
2541   }
2542   return result;
2543
2544 no_nego_needed:
2545   {
2546     GST_DEBUG_OBJECT (basesrc, "no negotiation needed");
2547     if (thiscaps)
2548       gst_caps_unref (thiscaps);
2549     return TRUE;
2550   }
2551 no_caps:
2552   {
2553     GST_ELEMENT_ERROR (basesrc, STREAM, FORMAT,
2554         ("No supported formats found"),
2555         ("This element did not produce valid caps"));
2556     if (thiscaps)
2557       gst_caps_unref (thiscaps);
2558     return TRUE;
2559   }
2560 }
2561
2562 static gboolean
2563 gst_base_src_negotiate (GstBaseSrc * basesrc)
2564 {
2565   GstBaseSrcClass *bclass;
2566   gboolean result = TRUE;
2567
2568   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2569
2570   if (bclass->negotiate)
2571     result = bclass->negotiate (basesrc);
2572
2573   return result;
2574 }
2575
2576 static gboolean
2577 gst_base_src_start (GstBaseSrc * basesrc)
2578 {
2579   GstBaseSrcClass *bclass;
2580   gboolean result;
2581   guint64 size;
2582   gboolean seekable;
2583
2584   if (GST_OBJECT_FLAG_IS_SET (basesrc, GST_BASE_SRC_STARTED))
2585     return TRUE;
2586
2587   GST_DEBUG_OBJECT (basesrc, "starting source");
2588
2589   basesrc->num_buffers_left = basesrc->num_buffers;
2590
2591   gst_segment_init (&basesrc->segment, basesrc->segment.format);
2592   basesrc->data.ABI.running = FALSE;
2593
2594   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2595   if (bclass->start)
2596     result = bclass->start (basesrc);
2597   else
2598     result = TRUE;
2599
2600   if (!result)
2601     goto could_not_start;
2602
2603   GST_OBJECT_FLAG_SET (basesrc, GST_BASE_SRC_STARTED);
2604
2605   /* figure out the size */
2606   if (basesrc->segment.format == GST_FORMAT_BYTES) {
2607     if (bclass->get_size) {
2608       if (!(result = bclass->get_size (basesrc, &size)))
2609         size = -1;
2610     } else {
2611       result = FALSE;
2612       size = -1;
2613     }
2614     GST_DEBUG_OBJECT (basesrc, "setting size %" G_GUINT64_FORMAT, size);
2615     /* only update the size when operating in bytes, subclass is supposed
2616      * to set duration in the start method for other formats */
2617     gst_segment_set_duration (&basesrc->segment, GST_FORMAT_BYTES, size);
2618   } else {
2619     size = -1;
2620   }
2621
2622   GST_DEBUG_OBJECT (basesrc,
2623       "format: %d, have size: %d, size: %" G_GUINT64_FORMAT ", duration: %"
2624       G_GINT64_FORMAT, basesrc->segment.format, result, size,
2625       basesrc->segment.duration);
2626
2627   seekable = gst_base_src_seekable (basesrc);
2628   GST_DEBUG_OBJECT (basesrc, "is seekable: %d", seekable);
2629
2630   /* update for random access flag */
2631   basesrc->random_access = seekable &&
2632       basesrc->segment.format == GST_FORMAT_BYTES;
2633
2634   GST_DEBUG_OBJECT (basesrc, "is random_access: %d", basesrc->random_access);
2635
2636   /* run typefind if we are random_access and the typefinding is enabled. */
2637   if (basesrc->random_access && basesrc->data.ABI.typefind && size != -1) {
2638     GstCaps *caps;
2639
2640     if (!(caps = gst_type_find_helper (basesrc->srcpad, size)))
2641       goto typefind_failed;
2642
2643     result = gst_pad_set_caps (basesrc->srcpad, caps);
2644     gst_caps_unref (caps);
2645   } else {
2646     /* use class or default negotiate function */
2647     if (!(result = gst_base_src_negotiate (basesrc)))
2648       goto could_not_negotiate;
2649   }
2650
2651   return result;
2652
2653   /* ERROR */
2654 could_not_start:
2655   {
2656     GST_DEBUG_OBJECT (basesrc, "could not start");
2657     /* subclass is supposed to post a message. We don't have to call _stop. */
2658     return FALSE;
2659   }
2660 could_not_negotiate:
2661   {
2662     GST_DEBUG_OBJECT (basesrc, "could not negotiate, stopping");
2663     GST_ELEMENT_ERROR (basesrc, STREAM, FORMAT,
2664         ("Could not negotiate format"), ("Check your filtered caps, if any"));
2665     /* we must call stop */
2666     gst_base_src_stop (basesrc);
2667     return FALSE;
2668   }
2669 typefind_failed:
2670   {
2671     GST_DEBUG_OBJECT (basesrc, "could not typefind, stopping");
2672     GST_ELEMENT_ERROR (basesrc, STREAM, TYPE_NOT_FOUND, (NULL), (NULL));
2673     /* we must call stop */
2674     gst_base_src_stop (basesrc);
2675     return FALSE;
2676   }
2677 }
2678
2679 static gboolean
2680 gst_base_src_stop (GstBaseSrc * basesrc)
2681 {
2682   GstBaseSrcClass *bclass;
2683   gboolean result = TRUE;
2684
2685   if (!GST_OBJECT_FLAG_IS_SET (basesrc, GST_BASE_SRC_STARTED))
2686     return TRUE;
2687
2688   GST_DEBUG_OBJECT (basesrc, "stopping source");
2689
2690   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2691   if (bclass->stop)
2692     result = bclass->stop (basesrc);
2693
2694   if (result)
2695     GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_STARTED);
2696
2697   return result;
2698 }
2699
2700 /* start or stop flushing dataprocessing
2701  */
2702 static gboolean
2703 gst_base_src_set_flushing (GstBaseSrc * basesrc,
2704     gboolean flushing, gboolean live_play, gboolean unlock, gboolean * playing)
2705 {
2706   GstBaseSrcClass *bclass;
2707
2708   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2709
2710   if (flushing && unlock) {
2711     /* unlock any subclasses, we need to do this before grabbing the
2712      * LIVE_LOCK since we hold this lock before going into ::create. We pass an
2713      * unlock to the params because of backwards compat (see seek handler)*/
2714     if (bclass->unlock)
2715       bclass->unlock (basesrc);
2716   }
2717
2718   /* the live lock is released when we are blocked, waiting for playing or
2719    * when we sync to the clock. */
2720   GST_LIVE_LOCK (basesrc);
2721   if (playing)
2722     *playing = basesrc->live_running;
2723   basesrc->priv->flushing = flushing;
2724   if (flushing) {
2725     /* if we are locked in the live lock, signal it to make it flush */
2726     basesrc->live_running = TRUE;
2727
2728     /* clear pending EOS if any */
2729     g_atomic_int_set (&basesrc->priv->pending_eos, FALSE);
2730
2731     /* step 1, now that we have the LIVE lock, clear our unlock request */
2732     if (bclass->unlock_stop)
2733       bclass->unlock_stop (basesrc);
2734
2735     /* step 2, unblock clock sync (if any) or any other blocking thing */
2736     if (basesrc->clock_id)
2737       gst_clock_id_unschedule (basesrc->clock_id);
2738   } else {
2739     /* signal the live source that it can start playing */
2740     basesrc->live_running = live_play;
2741   }
2742   GST_LIVE_SIGNAL (basesrc);
2743   GST_LIVE_UNLOCK (basesrc);
2744
2745   return TRUE;
2746 }
2747
2748 /* the purpose of this function is to make sure that a live source blocks in the
2749  * LIVE lock or leaves the LIVE lock and continues playing. */
2750 static gboolean
2751 gst_base_src_set_playing (GstBaseSrc * basesrc, gboolean live_play)
2752 {
2753   GstBaseSrcClass *bclass;
2754
2755   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2756
2757   /* unlock subclasses locked in ::create, we only do this when we stop playing. */
2758   if (!live_play) {
2759     GST_DEBUG_OBJECT (basesrc, "unlock");
2760     if (bclass->unlock)
2761       bclass->unlock (basesrc);
2762   }
2763
2764   /* we are now able to grab the LIVE lock, when we get it, we can be
2765    * waiting for PLAYING while blocked in the LIVE cond or we can be waiting
2766    * for the clock. */
2767   GST_LIVE_LOCK (basesrc);
2768   GST_DEBUG_OBJECT (basesrc, "unschedule clock");
2769
2770   /* unblock clock sync (if any) */
2771   if (basesrc->clock_id)
2772     gst_clock_id_unschedule (basesrc->clock_id);
2773
2774   /* configure what to do when we get to the LIVE lock. */
2775   GST_DEBUG_OBJECT (basesrc, "live running %d", live_play);
2776   basesrc->live_running = live_play;
2777
2778   if (live_play) {
2779     gboolean start;
2780
2781     /* clear our unlock request when going to PLAYING */
2782     GST_DEBUG_OBJECT (basesrc, "unlock stop");
2783     if (bclass->unlock_stop)
2784       bclass->unlock_stop (basesrc);
2785
2786     /* for live sources we restart the timestamp correction */
2787     basesrc->priv->latency = -1;
2788     /* have to restart the task in case it stopped because of the unlock when
2789      * we went to PAUSED. Only do this if we operating in push mode. */
2790     GST_OBJECT_LOCK (basesrc->srcpad);
2791     start = (GST_PAD_ACTIVATE_MODE (basesrc->srcpad) == GST_ACTIVATE_PUSH);
2792     GST_OBJECT_UNLOCK (basesrc->srcpad);
2793     if (start)
2794       gst_pad_start_task (basesrc->srcpad, (GstTaskFunction) gst_base_src_loop,
2795           basesrc->srcpad);
2796     GST_DEBUG_OBJECT (basesrc, "signal");
2797     GST_LIVE_SIGNAL (basesrc);
2798   }
2799   GST_LIVE_UNLOCK (basesrc);
2800
2801   return TRUE;
2802 }
2803
2804 static gboolean
2805 gst_base_src_activate_push (GstPad * pad, gboolean active)
2806 {
2807   GstBaseSrc *basesrc;
2808   GstEvent *event;
2809
2810   basesrc = GST_BASE_SRC (GST_OBJECT_PARENT (pad));
2811
2812   /* prepare subclass first */
2813   if (active) {
2814     GST_DEBUG_OBJECT (basesrc, "Activating in push mode");
2815
2816     if (G_UNLIKELY (!basesrc->can_activate_push))
2817       goto no_push_activation;
2818
2819     if (G_UNLIKELY (!gst_base_src_start (basesrc)))
2820       goto error_start;
2821
2822     basesrc->priv->last_sent_eos = FALSE;
2823     basesrc->priv->discont = TRUE;
2824     gst_base_src_set_flushing (basesrc, FALSE, FALSE, FALSE, NULL);
2825
2826     /* do initial seek, which will start the task */
2827     GST_OBJECT_LOCK (basesrc);
2828     event = basesrc->data.ABI.pending_seek;
2829     basesrc->data.ABI.pending_seek = NULL;
2830     GST_OBJECT_UNLOCK (basesrc);
2831
2832     /* no need to unlock anything, the task is certainly
2833      * not running here. The perform seek code will start the task when
2834      * finished. */
2835     if (G_UNLIKELY (!gst_base_src_perform_seek (basesrc, event, FALSE)))
2836       goto seek_failed;
2837
2838     if (event)
2839       gst_event_unref (event);
2840   } else {
2841     GST_DEBUG_OBJECT (basesrc, "Deactivating in push mode");
2842     /* flush all */
2843     gst_base_src_set_flushing (basesrc, TRUE, FALSE, TRUE, NULL);
2844     /* stop the task */
2845     gst_pad_stop_task (pad);
2846     /* now we can stop the source */
2847     if (G_UNLIKELY (!gst_base_src_stop (basesrc)))
2848       goto error_stop;
2849   }
2850   return TRUE;
2851
2852   /* ERRORS */
2853 no_push_activation:
2854   {
2855     GST_WARNING_OBJECT (basesrc, "Subclass disabled push-mode activation");
2856     return FALSE;
2857   }
2858 error_start:
2859   {
2860     GST_WARNING_OBJECT (basesrc, "Failed to start in push mode");
2861     return FALSE;
2862   }
2863 seek_failed:
2864   {
2865     GST_ERROR_OBJECT (basesrc, "Failed to perform initial seek");
2866     /* flush all */
2867     gst_base_src_set_flushing (basesrc, TRUE, FALSE, TRUE, NULL);
2868     /* stop the task */
2869     gst_pad_stop_task (pad);
2870     /* Stop the basesrc */
2871     gst_base_src_stop (basesrc);
2872     if (event)
2873       gst_event_unref (event);
2874     return FALSE;
2875   }
2876 error_stop:
2877   {
2878     GST_DEBUG_OBJECT (basesrc, "Failed to stop in push mode");
2879     return FALSE;
2880   }
2881 }
2882
2883 static gboolean
2884 gst_base_src_activate_pull (GstPad * pad, gboolean active)
2885 {
2886   GstBaseSrc *basesrc;
2887
2888   basesrc = GST_BASE_SRC (GST_OBJECT_PARENT (pad));
2889
2890   /* prepare subclass first */
2891   if (active) {
2892     GST_DEBUG_OBJECT (basesrc, "Activating in pull mode");
2893     if (G_UNLIKELY (!gst_base_src_start (basesrc)))
2894       goto error_start;
2895
2896     /* if not random_access, we cannot operate in pull mode for now */
2897     if (G_UNLIKELY (!gst_base_src_check_get_range (basesrc)))
2898       goto no_get_range;
2899
2900     /* stop flushing now but for live sources, still block in the LIVE lock when
2901      * we are not yet PLAYING */
2902     gst_base_src_set_flushing (basesrc, FALSE, FALSE, FALSE, NULL);
2903   } else {
2904     GST_DEBUG_OBJECT (basesrc, "Deactivating in pull mode");
2905     /* flush all, there is no task to stop */
2906     gst_base_src_set_flushing (basesrc, TRUE, FALSE, TRUE, NULL);
2907
2908     /* don't send EOS when going from PAUSED => READY when in pull mode */
2909     basesrc->priv->last_sent_eos = TRUE;
2910
2911     if (G_UNLIKELY (!gst_base_src_stop (basesrc)))
2912       goto error_stop;
2913   }
2914   return TRUE;
2915
2916   /* ERRORS */
2917 error_start:
2918   {
2919     GST_ERROR_OBJECT (basesrc, "Failed to start in pull mode");
2920     return FALSE;
2921   }
2922 no_get_range:
2923   {
2924     GST_ERROR_OBJECT (basesrc, "Cannot operate in pull mode, stopping");
2925     gst_base_src_stop (basesrc);
2926     return FALSE;
2927   }
2928 error_stop:
2929   {
2930     GST_ERROR_OBJECT (basesrc, "Failed to stop in pull mode");
2931     return FALSE;
2932   }
2933 }
2934
2935 static GstStateChangeReturn
2936 gst_base_src_change_state (GstElement * element, GstStateChange transition)
2937 {
2938   GstBaseSrc *basesrc;
2939   GstStateChangeReturn result;
2940   gboolean no_preroll = FALSE;
2941
2942   basesrc = GST_BASE_SRC (element);
2943
2944   switch (transition) {
2945     case GST_STATE_CHANGE_NULL_TO_READY:
2946       break;
2947     case GST_STATE_CHANGE_READY_TO_PAUSED:
2948       no_preroll = gst_base_src_is_live (basesrc);
2949       break;
2950     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
2951       GST_DEBUG_OBJECT (basesrc, "PAUSED->PLAYING");
2952       if (gst_base_src_is_live (basesrc)) {
2953         /* now we can start playback */
2954         gst_base_src_set_playing (basesrc, TRUE);
2955       }
2956       break;
2957     default:
2958       break;
2959   }
2960
2961   if ((result =
2962           GST_ELEMENT_CLASS (parent_class)->change_state (element,
2963               transition)) == GST_STATE_CHANGE_FAILURE)
2964     goto failure;
2965
2966   switch (transition) {
2967     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
2968       GST_DEBUG_OBJECT (basesrc, "PLAYING->PAUSED");
2969       if (gst_base_src_is_live (basesrc)) {
2970         /* make sure we block in the live lock in PAUSED */
2971         gst_base_src_set_playing (basesrc, FALSE);
2972         no_preroll = TRUE;
2973       }
2974       break;
2975     case GST_STATE_CHANGE_PAUSED_TO_READY:
2976     {
2977       GstEvent **event_p, *event;
2978
2979       /* we don't need to unblock anything here, the pad deactivation code
2980        * already did this */
2981
2982       /* FIXME, deprecate this behaviour, it is very dangerous.
2983        * the prefered way of sending EOS downstream is by sending
2984        * the EOS event to the element */
2985       if (!basesrc->priv->last_sent_eos) {
2986         GST_DEBUG_OBJECT (basesrc, "Sending EOS event");
2987         event = gst_event_new_eos ();
2988         gst_event_set_seqnum (event, basesrc->priv->seqnum);
2989         gst_pad_push_event (basesrc->srcpad, event);
2990         basesrc->priv->last_sent_eos = TRUE;
2991       }
2992       g_atomic_int_set (&basesrc->priv->pending_eos, FALSE);
2993       event_p = &basesrc->data.ABI.pending_seek;
2994       gst_event_replace (event_p, NULL);
2995       event_p = &basesrc->priv->close_segment;
2996       gst_event_replace (event_p, NULL);
2997       event_p = &basesrc->priv->start_segment;
2998       gst_event_replace (event_p, NULL);
2999       break;
3000     }
3001     case GST_STATE_CHANGE_READY_TO_NULL:
3002       break;
3003     default:
3004       break;
3005   }
3006
3007   if (no_preroll && result == GST_STATE_CHANGE_SUCCESS)
3008     result = GST_STATE_CHANGE_NO_PREROLL;
3009
3010   return result;
3011
3012   /* ERRORS */
3013 failure:
3014   {
3015     GST_DEBUG_OBJECT (basesrc, "parent failed state change");
3016     return result;
3017   }
3018 }