libs/gst/base/gstbasesrc.c: Make sending an EOS event to the basesrc non-blocking...
[platform/upstream/gstreamer.git] / libs / gst / base / gstbasesrc.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *               2000,2005 Wim Taymans <wim@fluendo.com>
4  *
5  * gstbasesrc.c:
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20  * Boston, MA 02111-1307, USA.
21  */
22
23 /**
24  * SECTION:gstbasesrc
25  * @short_description: Base class for getrange based source elements
26  * @see_also: #GstPushSrc, #GstBaseTransform, #GstBaseSink
27  *
28  * This is a generice base class for source elements. The following
29  * types of sources are supported:
30  * <itemizedlist>
31  *   <listitem><para>random access sources like files</para></listitem>
32  *   <listitem><para>seekable sources</para></listitem>
33  *   <listitem><para>live sources</para></listitem>
34  * </itemizedlist>
35  *
36  * <refsect2>
37  * <para>
38  * The source can be configured to operate in any #GstFormat with the
39  * gst_base_src_set_format() method. The currently set format determines 
40  * the format of the internal #GstSegment and any #GST_EVENT_NEWSEGMENT 
41  * events. The default format for #GstBaseSrc is #GST_FORMAT_BYTES.
42  * </para>
43  * <para>
44  * #GstBaseSrc always supports push mode scheduling. If the following
45  * conditions are met, it also supports pull mode scheduling:
46  * <itemizedlist>
47  *   <listitem><para>The format is set to #GST_FORMAT_BYTES (default).</para>
48  *   </listitem>
49  *   <listitem><para>#GstBaseSrc::is_seekable returns %TRUE.</para>
50  *   </listitem>
51  * </itemizedlist>
52  * </para>
53  * <para>
54  * Since 0.10.9, any #GstBaseSrc can enable pull based scheduling at any 
55  * time by overriding #GstBaseSrc::check_get_range so that it returns %TRUE. 
56  * </para>
57  * <para>
58  * If all the conditions are met for operating in pull mode, #GstBaseSrc is
59  * automatically seekable in push mode as well. The following conditions must 
60  * be met to make the element seekable in push mode when the format is not
61  * #GST_FORMAT_BYTES:
62  * <itemizedlist>
63  *   <listitem><para>
64  *     #GstBaseSrc::is_seekable returns %TRUE.
65  *   </para></listitem>
66  *   <listitem><para>
67  *     #GstBaseSrc::query can convert all supported seek formats to the
68  *     internal format as set with gst_base_src_set_format().
69  *   </para></listitem>
70  *   <listitem><para>
71  *     #GstBaseSrc::do_seek is implemented, performs the seek and returns %TRUE.
72  *   </para></listitem>
73  * </itemizedlist>
74  * </para>
75  * <para>
76  * When the element does not meet the requirements to operate in pull mode,
77  * the offset and length in the #GstBaseSrc::create method should be ignored.
78  * It is recommended to subclass #GstPushSrc instead, in this situation. If the
79  * element can operate in pull mode but only with specific offsets and
80  * lengths, it is allowed to generate an error when the wrong values are passed
81  * to the #GstBaseSrc::create function.
82  * </para>
83  * <para>
84  * #GstBaseSrc has support for live sources. Live sources are sources that when
85  * paused discard data, such as audio or video capture devices. A typical live
86  * source also produces data at a fixed rate and thus provides a clock to publish
87  * this rate.
88  * Use gst_base_src_set_live() to activate the live source mode.
89  * </para>
90  * <para>
91  * A live source does not produce data in the PAUSED state. This means that the 
92  * #GstBaseSrc::create method will not be called in PAUSED but only in PLAYING.
93  * To signal the pipeline that the element will not produce data, the return
94  * value from the READY to PAUSED state will be #GST_STATE_CHANGE_NO_PREROLL.
95  * </para>
96  * <para>
97  * A typical live source will timestamp the buffers it creates with the 
98  * current running time of the pipeline. This is one reason why a live source
99  * can only produce data in the PLAYING state, when the clock is actually 
100  * distributed and running. 
101  * </para>
102  * <para>
103  * Live sources that synchronize and block on the clock (an audio source, for
104  * example) can since 0.10.12 use gst_base_src_wait_playing() when the ::create
105  * function was interrupted by a state change to PAUSED.
106  * </para>
107  * <para>
108  * The #GstBaseSrc::get_times method can be used to implement pseudo-live 
109  * sources.
110  * It only makes sense to implement the ::get_times function if the source is 
111  * a live source. The ::get_times function should return timestamps starting
112  * from 0, as if it were a non-live source. The base class will make sure that
113  * the timestamps are transformed into the current running_time.
114  * The base source will then wait for the calculated running_time before pushing
115  * out the buffer.
116  * </para>
117  * <para>
118  * For live sources, the base class will by default report a latency of 0.
119  * For pseudo live sources, the base class will by default measure the difference
120  * between the first buffer timestamp and the start time of get_times and will
121  * report this value as the latency. 
122  * Subclasses should override the query function when this behaviour is not
123  * acceptable.
124  * </para>
125  * <para>
126  * There is only support in #GstBaseSrc for exactly one source pad, which 
127  * should be named "src". A source implementation (subclass of #GstBaseSrc) 
128  * should install a pad template in its class_init function, like so:
129  * </para>
130  * <para>
131  * <programlisting>
132  * static void
133  * my_element_class_init (GstMyElementClass *klass)
134  * {
135  *   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
136  *   // srctemplate should be a #GstStaticPadTemplate with direction
137  *   // #GST_PAD_SRC and name "src"
138  *   gst_element_class_add_pad_template (gstelement_class,
139  *       gst_static_pad_template_get (&amp;srctemplate));
140  *   // see #GstElementDetails
141  *   gst_element_class_set_details (gstelement_class, &amp;details);
142  * }
143  * </programlisting>
144  * </para>
145  * <title>Controlled shutdown of live sources in applications</title>
146  * <para>
147  * Applications that record from a live source may want to stop recording
148  * in a controlled way, so that the recording is stopped, but the data
149  * already in the pipeline is processed to the end (remember that many live
150  * sources would go on recording forever otherwise). For that to happen the
151  * application needs to make the source stop recording and send an EOS
152  * event down the pipeline. The application would then wait for an
153  * EOS message posted on the pipeline's bus to know when all data has
154  * been processed and the pipeline can safely be stopped.
155  * </para>
156  * <para>
157  * Since GStreamer 0.10.16 an application may send an EOS event to a source
158  * element to make it perform the EOS logic (send EOS event downstream or post a
159  * #GST_MESSAGE_SEGMENT_DONE on the bus). This can typically be done
160  * with the gst_element_send_event() function on the element or its parent bin.
161  * </para>
162  * <para>
163  * After the EOS has been sent to the element, the application should wait for
164  * an EOS message to be posted on the pipeline's bus. Once this EOS message is
165  * received, it may safely shut down the entire pipeline.
166  * </para>
167  * <para>
168  * The old behaviour for controlled shutdown introduced since GStreamer 0.10.3
169  * is still available but deprecated as it is dangerous and less flexible.
170  * </para>
171  * <para>
172  * Last reviewed on 2007-12-19 (0.10.16)
173  * </para>
174  * </refsect2>
175  */
176
177 #ifdef HAVE_CONFIG_H
178 #  include "config.h"
179 #endif
180
181 #include <stdlib.h>
182 #include <string.h>
183
184 #include "gstbasesrc.h"
185 #include "gsttypefindhelper.h"
186 #include <gst/gstmarshal.h>
187 #include <gst/gst-i18n-lib.h>
188
189 GST_DEBUG_CATEGORY_STATIC (gst_base_src_debug);
190 #define GST_CAT_DEFAULT gst_base_src_debug
191
192 #define GST_LIVE_GET_LOCK(elem)               (GST_BASE_SRC_CAST(elem)->live_lock)
193 #define GST_LIVE_LOCK(elem)                   g_mutex_lock(GST_LIVE_GET_LOCK(elem))
194 #define GST_LIVE_TRYLOCK(elem)                g_mutex_trylock(GST_LIVE_GET_LOCK(elem))
195 #define GST_LIVE_UNLOCK(elem)                 g_mutex_unlock(GST_LIVE_GET_LOCK(elem))
196 #define GST_LIVE_GET_COND(elem)               (GST_BASE_SRC_CAST(elem)->live_cond)
197 #define GST_LIVE_WAIT(elem)                   g_cond_wait (GST_LIVE_GET_COND (elem), GST_LIVE_GET_LOCK (elem))
198 #define GST_LIVE_TIMED_WAIT(elem, timeval)    g_cond_timed_wait (GST_LIVE_GET_COND (elem), GST_LIVE_GET_LOCK (elem),\
199                                                                                 timeval)
200 #define GST_LIVE_SIGNAL(elem)                 g_cond_signal (GST_LIVE_GET_COND (elem));
201 #define GST_LIVE_BROADCAST(elem)              g_cond_broadcast (GST_LIVE_GET_COND (elem));
202
203 /* BaseSrc signals and args */
204 enum
205 {
206   /* FILL ME */
207   LAST_SIGNAL
208 };
209
210 #define DEFAULT_BLOCKSIZE       4096
211 #define DEFAULT_NUM_BUFFERS     -1
212 #define DEFAULT_TYPEFIND        FALSE
213 #define DEFAULT_DO_TIMESTAMP    FALSE
214
215 enum
216 {
217   PROP_0,
218   PROP_BLOCKSIZE,
219   PROP_NUM_BUFFERS,
220   PROP_TYPEFIND,
221   PROP_DO_TIMESTAMP
222 };
223
224 #define GST_BASE_SRC_GET_PRIVATE(obj)  \
225    (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_BASE_SRC, GstBaseSrcPrivate))
226
227 struct _GstBaseSrcPrivate
228 {
229   gboolean last_sent_eos;       /* last thing we did was send an EOS (we set this
230                                  * to avoid the sending of two EOS in some cases) */
231   gboolean discont;
232   gboolean flushing;
233
234   /* two segments to be sent in the streaming thread with STREAM_LOCK */
235   GstEvent *close_segment;
236   GstEvent *start_segment;
237
238   /* if EOS is pending (atomic) */
239   gint pending_eos;
240
241   /* startup latency is the time it takes between going to PLAYING and producing
242    * the first BUFFER with running_time 0. This value is included in the latency
243    * reporting. */
244   GstClockTime latency;
245   /* timestamp offset, this is the offset add to the values of gst_times for
246    * pseudo live sources */
247   GstClockTimeDiff ts_offset;
248
249   gboolean do_timestamp;
250 };
251
252 static GstElementClass *parent_class = NULL;
253
254 static void gst_base_src_base_init (gpointer g_class);
255 static void gst_base_src_class_init (GstBaseSrcClass * klass);
256 static void gst_base_src_init (GstBaseSrc * src, gpointer g_class);
257 static void gst_base_src_finalize (GObject * object);
258
259
260 GType
261 gst_base_src_get_type (void)
262 {
263   static GType base_src_type = 0;
264
265   if (G_UNLIKELY (base_src_type == 0)) {
266     static const GTypeInfo base_src_info = {
267       sizeof (GstBaseSrcClass),
268       (GBaseInitFunc) gst_base_src_base_init,
269       NULL,
270       (GClassInitFunc) gst_base_src_class_init,
271       NULL,
272       NULL,
273       sizeof (GstBaseSrc),
274       0,
275       (GInstanceInitFunc) gst_base_src_init,
276     };
277
278     base_src_type = g_type_register_static (GST_TYPE_ELEMENT,
279         "GstBaseSrc", &base_src_info, G_TYPE_FLAG_ABSTRACT);
280   }
281   return base_src_type;
282 }
283 static GstCaps *gst_base_src_getcaps (GstPad * pad);
284 static gboolean gst_base_src_setcaps (GstPad * pad, GstCaps * caps);
285 static void gst_base_src_fixate (GstPad * pad, GstCaps * caps);
286
287 static gboolean gst_base_src_activate_push (GstPad * pad, gboolean active);
288 static gboolean gst_base_src_activate_pull (GstPad * pad, gboolean active);
289 static void gst_base_src_set_property (GObject * object, guint prop_id,
290     const GValue * value, GParamSpec * pspec);
291 static void gst_base_src_get_property (GObject * object, guint prop_id,
292     GValue * value, GParamSpec * pspec);
293 static gboolean gst_base_src_event_handler (GstPad * pad, GstEvent * event);
294 static gboolean gst_base_src_send_event (GstElement * elem, GstEvent * event);
295 static gboolean gst_base_src_default_event (GstBaseSrc * src, GstEvent * event);
296 static const GstQueryType *gst_base_src_get_query_types (GstElement * element);
297
298 static gboolean gst_base_src_query (GstPad * pad, GstQuery * query);
299
300 static gboolean gst_base_src_default_negotiate (GstBaseSrc * basesrc);
301 static gboolean gst_base_src_default_do_seek (GstBaseSrc * src,
302     GstSegment * segment);
303 static gboolean gst_base_src_default_query (GstBaseSrc * src, GstQuery * query);
304 static gboolean gst_base_src_default_prepare_seek_segment (GstBaseSrc * src,
305     GstEvent * event, GstSegment * segment);
306
307 static gboolean gst_base_src_set_flushing (GstBaseSrc * basesrc,
308     gboolean flushing, gboolean live_play, gboolean unlock, gboolean * playing);
309 static gboolean gst_base_src_start (GstBaseSrc * basesrc);
310 static gboolean gst_base_src_stop (GstBaseSrc * basesrc);
311
312 static GstStateChangeReturn gst_base_src_change_state (GstElement * element,
313     GstStateChange transition);
314
315 static void gst_base_src_loop (GstPad * pad);
316 static gboolean gst_base_src_pad_check_get_range (GstPad * pad);
317 static gboolean gst_base_src_default_check_get_range (GstBaseSrc * bsrc);
318 static GstFlowReturn gst_base_src_pad_get_range (GstPad * pad, guint64 offset,
319     guint length, GstBuffer ** buf);
320 static GstFlowReturn gst_base_src_get_range (GstBaseSrc * src, guint64 offset,
321     guint length, GstBuffer ** buf);
322
323 static void
324 gst_base_src_base_init (gpointer g_class)
325 {
326   GST_DEBUG_CATEGORY_INIT (gst_base_src_debug, "basesrc", 0, "basesrc element");
327 }
328
329 static void
330 gst_base_src_class_init (GstBaseSrcClass * klass)
331 {
332   GObjectClass *gobject_class;
333   GstElementClass *gstelement_class;
334
335   gobject_class = G_OBJECT_CLASS (klass);
336   gstelement_class = GST_ELEMENT_CLASS (klass);
337
338   g_type_class_add_private (klass, sizeof (GstBaseSrcPrivate));
339
340   parent_class = g_type_class_peek_parent (klass);
341
342   gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_base_src_finalize);
343   gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_base_src_set_property);
344   gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_base_src_get_property);
345
346   g_object_class_install_property (gobject_class, PROP_BLOCKSIZE,
347       g_param_spec_ulong ("blocksize", "Block size",
348           "Size in bytes to read per buffer (0 = default)", 0, G_MAXULONG,
349           DEFAULT_BLOCKSIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
350   g_object_class_install_property (gobject_class, PROP_NUM_BUFFERS,
351       g_param_spec_int ("num-buffers", "num-buffers",
352           "Number of buffers to output before sending EOS", -1, G_MAXINT,
353           DEFAULT_NUM_BUFFERS, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
354   g_object_class_install_property (gobject_class, PROP_TYPEFIND,
355       g_param_spec_boolean ("typefind", "Typefind",
356           "Run typefind before negotiating", DEFAULT_TYPEFIND,
357           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
358   g_object_class_install_property (gobject_class, PROP_DO_TIMESTAMP,
359       g_param_spec_boolean ("do-timestamp", "Do timestamp",
360           "Apply current stream time to buffers", DEFAULT_DO_TIMESTAMP,
361           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
362
363   gstelement_class->change_state =
364       GST_DEBUG_FUNCPTR (gst_base_src_change_state);
365   gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_base_src_send_event);
366   gstelement_class->get_query_types =
367       GST_DEBUG_FUNCPTR (gst_base_src_get_query_types);
368
369   klass->negotiate = GST_DEBUG_FUNCPTR (gst_base_src_default_negotiate);
370   klass->event = GST_DEBUG_FUNCPTR (gst_base_src_default_event);
371   klass->do_seek = GST_DEBUG_FUNCPTR (gst_base_src_default_do_seek);
372   klass->query = GST_DEBUG_FUNCPTR (gst_base_src_default_query);
373   klass->check_get_range =
374       GST_DEBUG_FUNCPTR (gst_base_src_default_check_get_range);
375   klass->prepare_seek_segment =
376       GST_DEBUG_FUNCPTR (gst_base_src_default_prepare_seek_segment);
377 }
378
379 static void
380 gst_base_src_init (GstBaseSrc * basesrc, gpointer g_class)
381 {
382   GstPad *pad;
383   GstPadTemplate *pad_template;
384
385   basesrc->priv = GST_BASE_SRC_GET_PRIVATE (basesrc);
386
387   basesrc->is_live = FALSE;
388   basesrc->live_lock = g_mutex_new ();
389   basesrc->live_cond = g_cond_new ();
390   basesrc->num_buffers = DEFAULT_NUM_BUFFERS;
391   basesrc->num_buffers_left = -1;
392
393   basesrc->can_activate_push = TRUE;
394   basesrc->pad_mode = GST_ACTIVATE_NONE;
395
396   pad_template =
397       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (g_class), "src");
398   g_return_if_fail (pad_template != NULL);
399
400   GST_DEBUG_OBJECT (basesrc, "creating src pad");
401   pad = gst_pad_new_from_template (pad_template, "src");
402
403   GST_DEBUG_OBJECT (basesrc, "setting functions on src pad");
404   gst_pad_set_activatepush_function (pad,
405       GST_DEBUG_FUNCPTR (gst_base_src_activate_push));
406   gst_pad_set_activatepull_function (pad,
407       GST_DEBUG_FUNCPTR (gst_base_src_activate_pull));
408   gst_pad_set_event_function (pad,
409       GST_DEBUG_FUNCPTR (gst_base_src_event_handler));
410   gst_pad_set_query_function (pad, GST_DEBUG_FUNCPTR (gst_base_src_query));
411   gst_pad_set_checkgetrange_function (pad,
412       GST_DEBUG_FUNCPTR (gst_base_src_pad_check_get_range));
413   gst_pad_set_getrange_function (pad,
414       GST_DEBUG_FUNCPTR (gst_base_src_pad_get_range));
415   gst_pad_set_getcaps_function (pad, GST_DEBUG_FUNCPTR (gst_base_src_getcaps));
416   gst_pad_set_setcaps_function (pad, GST_DEBUG_FUNCPTR (gst_base_src_setcaps));
417   gst_pad_set_fixatecaps_function (pad,
418       GST_DEBUG_FUNCPTR (gst_base_src_fixate));
419
420   /* hold pointer to pad */
421   basesrc->srcpad = pad;
422   GST_DEBUG_OBJECT (basesrc, "adding src pad");
423   gst_element_add_pad (GST_ELEMENT (basesrc), pad);
424
425   basesrc->blocksize = DEFAULT_BLOCKSIZE;
426   basesrc->clock_id = NULL;
427   /* we operate in BYTES by default */
428   gst_base_src_set_format (basesrc, GST_FORMAT_BYTES);
429   basesrc->data.ABI.typefind = DEFAULT_TYPEFIND;
430   basesrc->priv->do_timestamp = DEFAULT_DO_TIMESTAMP;
431
432   GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_STARTED);
433
434   GST_DEBUG_OBJECT (basesrc, "init done");
435 }
436
437 static void
438 gst_base_src_finalize (GObject * object)
439 {
440   GstBaseSrc *basesrc;
441   GstEvent **event_p;
442
443   basesrc = GST_BASE_SRC (object);
444
445   g_mutex_free (basesrc->live_lock);
446   g_cond_free (basesrc->live_cond);
447
448   event_p = &basesrc->data.ABI.pending_seek;
449   gst_event_replace (event_p, NULL);
450
451   G_OBJECT_CLASS (parent_class)->finalize (object);
452 }
453
454 /**
455  * gst_base_src_wait_playing:
456  * @src: the src
457  *
458  * If the #GstBaseSrcClass::create method performs its own synchronisation against
459  * the clock it must unblock when going from PLAYING to the PAUSED state and call
460  * this method before continuing to produce the remaining data.
461  *
462  * This function will block until a state change to PLAYING happens (in which
463  * case this function returns #GST_FLOW_OK) or the processing must be stopped due
464  * to a state change to READY or a FLUSH event (in which case this function
465  * returns #GST_FLOW_WRONG_STATE).
466  *
467  * Since: 0.10.12
468  *
469  * Returns: #GST_FLOW_OK if @src is PLAYING and processing can
470  * continue. Any other return value should be returned from the create vmethod.
471  */
472 GstFlowReturn
473 gst_base_src_wait_playing (GstBaseSrc * src)
474 {
475   /* block until the state changes, or we get a flush, or something */
476   GST_DEBUG_OBJECT (src, "live source waiting for running state");
477   GST_LIVE_WAIT (src);
478   if (src->priv->flushing)
479     goto flushing;
480   GST_DEBUG_OBJECT (src, "live source unlocked");
481
482   return GST_FLOW_OK;
483
484   /* ERRORS */
485 flushing:
486   {
487     GST_DEBUG_OBJECT (src, "we are flushing");
488     return GST_FLOW_WRONG_STATE;
489   }
490 }
491
492 /**
493  * gst_base_src_set_live:
494  * @src: base source instance
495  * @live: new live-mode
496  *
497  * If the element listens to a live source, @live should
498  * be set to %TRUE. 
499  *
500  * A live source will not produce data in the PAUSED state and
501  * will therefore not be able to participate in the PREROLL phase
502  * of a pipeline. To signal this fact to the application and the 
503  * pipeline, the state change return value of the live source will
504  * be GST_STATE_CHANGE_NO_PREROLL.
505  */
506 void
507 gst_base_src_set_live (GstBaseSrc * src, gboolean live)
508 {
509   GST_OBJECT_LOCK (src);
510   src->is_live = live;
511   GST_OBJECT_UNLOCK (src);
512 }
513
514 /**
515  * gst_base_src_is_live:
516  * @src: base source instance
517  *
518  * Check if an element is in live mode.
519  *
520  * Returns: %TRUE if element is in live mode.
521  */
522 gboolean
523 gst_base_src_is_live (GstBaseSrc * src)
524 {
525   gboolean result;
526
527   GST_OBJECT_LOCK (src);
528   result = src->is_live;
529   GST_OBJECT_UNLOCK (src);
530
531   return result;
532 }
533
534 /**
535  * gst_base_src_set_format:
536  * @src: base source instance
537  * @format: the format to use
538  *
539  * Sets the default format of the source. This will be the format used
540  * for sending NEW_SEGMENT events and for performing seeks.
541  *
542  * If a format of GST_FORMAT_BYTES is set, the element will be able to
543  * operate in pull mode if the #GstBaseSrc::is_seekable returns TRUE.
544  *
545  * Since: 0.10.1
546  */
547 void
548 gst_base_src_set_format (GstBaseSrc * src, GstFormat format)
549 {
550   gst_segment_init (&src->segment, format);
551 }
552
553 /**
554  * gst_base_src_query_latency:
555  * @src: the source
556  * @live: if the source is live
557  * @min_latency: the min latency of the source
558  * @max_latency: the max latency of the source
559  *
560  * Query the source for the latency parameters. @live will be TRUE when @src is
561  * configured as a live source. @min_latency will be set to the difference
562  * between the running time and the timestamp of the first buffer.
563  * @max_latency is always the undefined value of -1.
564  *
565  * This function is mostly used by subclasses. 
566  *
567  * Returns: TRUE if the query succeeded.
568  *
569  * Since: 0.10.13
570  */
571 gboolean
572 gst_base_src_query_latency (GstBaseSrc * src, gboolean * live,
573     GstClockTime * min_latency, GstClockTime * max_latency)
574 {
575   GstClockTime min;
576
577   GST_OBJECT_LOCK (src);
578   if (live)
579     *live = src->is_live;
580
581   /* if we have a startup latency, report this one, else report 0. Subclasses
582    * are supposed to override the query function if they want something
583    * else. */
584   if (src->priv->latency != -1)
585     min = src->priv->latency;
586   else
587     min = 0;
588
589   if (min_latency)
590     *min_latency = min;
591   if (max_latency)
592     *max_latency = -1;
593
594   GST_LOG_OBJECT (src, "latency: live %d, min %" GST_TIME_FORMAT
595       ", max %" GST_TIME_FORMAT, src->is_live, GST_TIME_ARGS (min),
596       GST_TIME_ARGS (-1));
597   GST_OBJECT_UNLOCK (src);
598
599   return TRUE;
600 }
601
602 /**
603  * gst_base_src_set_do_timestamp:
604  * @src: the source
605  * @timestamp: enable or disable timestamping
606  *
607  * Configure @src to automatically timestamp outgoing buffers based on the
608  * current running_time of the pipeline. This property is mostly useful for live
609  * sources.
610  *
611  * Since: 0.10.15
612  */
613 void
614 gst_base_src_set_do_timestamp (GstBaseSrc * src, gboolean timestamp)
615 {
616   GST_OBJECT_LOCK (src);
617   src->priv->do_timestamp = timestamp;
618   GST_OBJECT_UNLOCK (src);
619 }
620
621 /**
622  * gst_base_src_get_do_timestamp:
623  * @src: the source
624  *
625  * Query if @src timestamps outgoing buffers based on the current running_time.
626  *
627  * Returns: %TRUE if the base class will automatically timestamp outgoing buffers.
628  *
629  * Since: 0.10.15
630  */
631 gboolean
632 gst_base_src_get_do_timestamp (GstBaseSrc * src)
633 {
634   gboolean res;
635
636   GST_OBJECT_LOCK (src);
637   res = src->priv->do_timestamp;
638   GST_OBJECT_UNLOCK (src);
639
640   return res;
641 }
642
643 static gboolean
644 gst_base_src_setcaps (GstPad * pad, GstCaps * caps)
645 {
646   GstBaseSrcClass *bclass;
647   GstBaseSrc *bsrc;
648   gboolean res = TRUE;
649
650   bsrc = GST_BASE_SRC (GST_PAD_PARENT (pad));
651   bclass = GST_BASE_SRC_GET_CLASS (bsrc);
652
653   if (bclass->set_caps)
654     res = bclass->set_caps (bsrc, caps);
655
656   return res;
657 }
658
659 static GstCaps *
660 gst_base_src_getcaps (GstPad * pad)
661 {
662   GstBaseSrcClass *bclass;
663   GstBaseSrc *bsrc;
664   GstCaps *caps = NULL;
665
666   bsrc = GST_BASE_SRC (GST_PAD_PARENT (pad));
667   bclass = GST_BASE_SRC_GET_CLASS (bsrc);
668   if (bclass->get_caps)
669     caps = bclass->get_caps (bsrc);
670
671   if (caps == NULL) {
672     GstPadTemplate *pad_template;
673
674     pad_template =
675         gst_element_class_get_pad_template (GST_ELEMENT_CLASS (bclass), "src");
676     if (pad_template != NULL) {
677       caps = gst_caps_ref (gst_pad_template_get_caps (pad_template));
678     }
679   }
680   return caps;
681 }
682
683 static void
684 gst_base_src_fixate (GstPad * pad, GstCaps * caps)
685 {
686   GstBaseSrcClass *bclass;
687   GstBaseSrc *bsrc;
688
689   bsrc = GST_BASE_SRC (gst_pad_get_parent (pad));
690   bclass = GST_BASE_SRC_GET_CLASS (bsrc);
691
692   if (bclass->fixate)
693     bclass->fixate (bsrc, caps);
694
695   gst_object_unref (bsrc);
696 }
697
698 static gboolean
699 gst_base_src_default_query (GstBaseSrc * src, GstQuery * query)
700 {
701   gboolean res;
702
703   switch (GST_QUERY_TYPE (query)) {
704     case GST_QUERY_POSITION:
705     {
706       GstFormat format;
707
708       gst_query_parse_position (query, &format, NULL);
709       switch (format) {
710         case GST_FORMAT_PERCENT:
711         {
712           gint64 percent;
713           gint64 position;
714           gint64 duration;
715
716           position = src->segment.last_stop;
717           duration = src->segment.duration;
718
719           if (position != -1 && duration != -1) {
720             if (position < duration)
721               percent = gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX, position,
722                   duration);
723             else
724               percent = GST_FORMAT_PERCENT_MAX;
725           } else
726             percent = -1;
727
728           gst_query_set_position (query, GST_FORMAT_PERCENT, percent);
729           res = TRUE;
730           break;
731         }
732         default:
733         {
734           gint64 position;
735
736           position = src->segment.last_stop;
737
738           if (position != -1) {
739             /* convert to requested format */
740             res =
741                 gst_pad_query_convert (src->srcpad, src->segment.format,
742                 position, &format, &position);
743           } else
744             res = TRUE;
745
746           gst_query_set_position (query, format, position);
747           break;
748         }
749       }
750       break;
751     }
752     case GST_QUERY_DURATION:
753     {
754       GstFormat format;
755
756       gst_query_parse_duration (query, &format, NULL);
757
758       GST_DEBUG_OBJECT (src, "duration query in format %s",
759           gst_format_get_name (format));
760
761       switch (format) {
762         case GST_FORMAT_PERCENT:
763           gst_query_set_duration (query, GST_FORMAT_PERCENT,
764               GST_FORMAT_PERCENT_MAX);
765           res = TRUE;
766           break;
767         default:
768         {
769           gint64 duration;
770
771           /* this is the duration as configured by the subclass. */
772           duration = src->segment.duration;
773
774           if (duration != -1) {
775             /* convert to requested format, if this fails, we have a duration
776              * but we cannot answer the query, we must return FALSE. */
777             res =
778                 gst_pad_query_convert (src->srcpad, src->segment.format,
779                 duration, &format, &duration);
780           } else {
781             /* The subclass did not configure a duration, we assume that the
782              * media has an unknown duration then and we return TRUE to report
783              * this. Note that this is not the same as returning FALSE, which
784              * means that we cannot report the duration at all. */
785             res = TRUE;
786           }
787           gst_query_set_duration (query, format, duration);
788           break;
789         }
790       }
791       break;
792     }
793
794     case GST_QUERY_SEEKING:
795     {
796       gst_query_set_seeking (query, src->segment.format,
797           src->seekable, 0, src->segment.duration);
798       res = TRUE;
799       break;
800     }
801     case GST_QUERY_SEGMENT:
802     {
803       gint64 start, stop;
804
805       /* no end segment configured, current duration then */
806       if ((stop = src->segment.stop) == -1)
807         stop = src->segment.duration;
808       start = src->segment.start;
809
810       /* adjust to stream time */
811       if (src->segment.time != -1) {
812         start -= src->segment.time;
813         if (stop != -1)
814           stop -= src->segment.time;
815       }
816       gst_query_set_segment (query, src->segment.rate, src->segment.format,
817           start, stop);
818       res = TRUE;
819       break;
820     }
821
822     case GST_QUERY_FORMATS:
823     {
824       gst_query_set_formats (query, 3, GST_FORMAT_DEFAULT,
825           GST_FORMAT_BYTES, GST_FORMAT_PERCENT);
826       res = TRUE;
827       break;
828     }
829     case GST_QUERY_CONVERT:
830     {
831       GstFormat src_fmt, dest_fmt;
832       gint64 src_val, dest_val;
833
834       gst_query_parse_convert (query, &src_fmt, &src_val, &dest_fmt, &dest_val);
835
836       /* we can only convert between equal formats... */
837       if (src_fmt == dest_fmt) {
838         dest_val = src_val;
839         res = TRUE;
840       } else
841         res = FALSE;
842
843       gst_query_set_convert (query, src_fmt, src_val, dest_fmt, dest_val);
844       break;
845     }
846     case GST_QUERY_LATENCY:
847     {
848       GstClockTime min, max;
849       gboolean live;
850
851       /* Subclasses should override and implement something usefull */
852       res = gst_base_src_query_latency (src, &live, &min, &max);
853
854       GST_LOG_OBJECT (src, "report latency: live %d, min %" GST_TIME_FORMAT
855           ", max %" GST_TIME_FORMAT, live, GST_TIME_ARGS (min),
856           GST_TIME_ARGS (max));
857
858       gst_query_set_latency (query, live, min, max);
859       break;
860     }
861     case GST_QUERY_JITTER:
862     case GST_QUERY_RATE:
863       res = FALSE;
864       break;
865     case GST_QUERY_BUFFERING:
866     {
867       GstFormat format;
868       gint64 start, stop, estimated;
869
870       res = FALSE;
871
872       gst_query_parse_buffering_range (query, &format, NULL, NULL, NULL);
873
874       GST_DEBUG_OBJECT (src, "buffering query in format %s",
875           gst_format_get_name (format));
876
877       if (src->random_access) {
878         estimated = 0;
879         start = 0;
880         if (format == GST_FORMAT_PERCENT)
881           stop = GST_FORMAT_PERCENT_MAX;
882         else
883           stop = src->segment.duration;
884       } else {
885         estimated = -1;
886         start = -1;
887         stop = -1;
888       }
889       /* convert to required format. When the conversion fails, we can't answer
890        * the query. When the value is unknown, we can don't perform conversion
891        * but report TRUE. */
892       if (format != GST_FORMAT_PERCENT && stop != -1) {
893         res = gst_pad_query_convert (src->srcpad, src->segment.format,
894             stop, &format, &stop);
895       } else {
896         res = TRUE;
897       }
898       if (res && format != GST_FORMAT_PERCENT && start != -1)
899         res = gst_pad_query_convert (src->srcpad, src->segment.format,
900             start, &format, &start);
901
902       gst_query_set_buffering_range (query, format, start, stop, estimated);
903       break;
904     }
905     default:
906       res = FALSE;
907       break;
908   }
909   GST_DEBUG_OBJECT (src, "query %s returns %d", GST_QUERY_TYPE_NAME (query),
910       res);
911   return res;
912 }
913
914 static gboolean
915 gst_base_src_query (GstPad * pad, GstQuery * query)
916 {
917   GstBaseSrc *src;
918   GstBaseSrcClass *bclass;
919   gboolean result = FALSE;
920
921   src = GST_BASE_SRC (gst_pad_get_parent (pad));
922
923   bclass = GST_BASE_SRC_GET_CLASS (src);
924
925   if (bclass->query)
926     result = bclass->query (src, query);
927   else
928     result = gst_pad_query_default (pad, query);
929
930   gst_object_unref (src);
931
932   return result;
933 }
934
935 static gboolean
936 gst_base_src_default_do_seek (GstBaseSrc * src, GstSegment * segment)
937 {
938   gboolean res = TRUE;
939
940   /* update our offset if the start/stop position was updated */
941   if (segment->format == GST_FORMAT_BYTES) {
942     segment->time = segment->start;
943   } else if (segment->start == 0) {
944     /* seek to start, we can implement a default for this. */
945     segment->time = 0;
946     res = TRUE;
947   } else
948     res = FALSE;
949
950   return res;
951 }
952
953 static gboolean
954 gst_base_src_do_seek (GstBaseSrc * src, GstSegment * segment)
955 {
956   GstBaseSrcClass *bclass;
957   gboolean result = FALSE;
958
959   bclass = GST_BASE_SRC_GET_CLASS (src);
960
961   if (bclass->do_seek)
962     result = bclass->do_seek (src, segment);
963
964   return result;
965 }
966
967 #define SEEK_TYPE_IS_RELATIVE(t) (((t) != GST_SEEK_TYPE_NONE) && ((t) != GST_SEEK_TYPE_SET))
968
969 static gboolean
970 gst_base_src_default_prepare_seek_segment (GstBaseSrc * src, GstEvent * event,
971     GstSegment * segment)
972 {
973   /* By default, we try one of 2 things:
974    *   - For absolute seek positions, convert the requested position to our 
975    *     configured processing format and place it in the output segment \
976    *   - For relative seek positions, convert our current (input) values to the
977    *     seek format, adjust by the relative seek offset and then convert back to
978    *     the processing format
979    */
980   GstSeekType cur_type, stop_type;
981   gint64 cur, stop;
982   GstSeekFlags flags;
983   GstFormat seek_format, dest_format;
984   gdouble rate;
985   gboolean update;
986   gboolean res = TRUE;
987
988   gst_event_parse_seek (event, &rate, &seek_format, &flags,
989       &cur_type, &cur, &stop_type, &stop);
990   dest_format = segment->format;
991
992   if (seek_format == dest_format) {
993     gst_segment_set_seek (segment, rate, seek_format, flags,
994         cur_type, cur, stop_type, stop, &update);
995     return TRUE;
996   }
997
998   if (cur_type != GST_SEEK_TYPE_NONE) {
999     /* FIXME: Handle seek_cur & seek_end by converting the input segment vals */
1000     res =
1001         gst_pad_query_convert (src->srcpad, seek_format, cur, &dest_format,
1002         &cur);
1003     cur_type = GST_SEEK_TYPE_SET;
1004   }
1005
1006   if (res && stop_type != GST_SEEK_TYPE_NONE) {
1007     /* FIXME: Handle seek_cur & seek_end by converting the input segment vals */
1008     res =
1009         gst_pad_query_convert (src->srcpad, seek_format, stop, &dest_format,
1010         &stop);
1011     stop_type = GST_SEEK_TYPE_SET;
1012   }
1013
1014   /* And finally, configure our output segment in the desired format */
1015   gst_segment_set_seek (segment, rate, dest_format, flags, cur_type, cur,
1016       stop_type, stop, &update);
1017
1018   if (!res)
1019     goto no_format;
1020
1021   return res;
1022
1023 no_format:
1024   {
1025     GST_DEBUG_OBJECT (src, "undefined format given, seek aborted.");
1026     return FALSE;
1027   }
1028 }
1029
1030 static gboolean
1031 gst_base_src_prepare_seek_segment (GstBaseSrc * src, GstEvent * event,
1032     GstSegment * seeksegment)
1033 {
1034   GstBaseSrcClass *bclass;
1035   gboolean result = FALSE;
1036
1037   bclass = GST_BASE_SRC_GET_CLASS (src);
1038
1039   if (bclass->prepare_seek_segment)
1040     result = bclass->prepare_seek_segment (src, event, seeksegment);
1041
1042   return result;
1043 }
1044
1045 /* this code implements the seeking. It is a good example
1046  * handling all cases.
1047  *
1048  * A seek updates the currently configured segment.start
1049  * and segment.stop values based on the SEEK_TYPE. If the
1050  * segment.start value is updated, a seek to this new position
1051  * should be performed.
1052  *
1053  * The seek can only be executed when we are not currently
1054  * streaming any data, to make sure that this is the case, we
1055  * acquire the STREAM_LOCK which is taken when we are in the
1056  * _loop() function or when a getrange() is called. Normally
1057  * we will not receive a seek if we are operating in pull mode
1058  * though. When we operate as a live source we might block on the live
1059  * cond, which does not release the STREAM_LOCK. Therefore we will try
1060  * to grab the LIVE_LOCK instead of the STREAM_LOCK to make sure it is
1061  * safe to perform the seek.
1062  *
1063  * When we are in the loop() function, we might be in the middle
1064  * of pushing a buffer, which might block in a sink. To make sure
1065  * that the push gets unblocked we push out a FLUSH_START event.
1066  * Our loop function will get a WRONG_STATE return value from
1067  * the push and will pause, effectively releasing the STREAM_LOCK.
1068  *
1069  * For a non-flushing seek, we pause the task, which might eventually
1070  * release the STREAM_LOCK. We say eventually because when the sink
1071  * blocks on the sample we might wait a very long time until the sink
1072  * unblocks the sample. In any case we acquire the STREAM_LOCK and
1073  * can continue the seek. A non-flushing seek is normally done in a 
1074  * running pipeline to perform seamless playback, this means that the sink is
1075  * PLAYING and will return from its chain function.
1076  * In the case of a non-flushing seek we need to make sure that the
1077  * data we output after the seek is continuous with the previous data,
1078  * this is because a non-flushing seek does not reset the running-time
1079  * to 0. We do this by closing the currently running segment, ie. sending
1080  * a new_segment event with the stop position set to the last processed 
1081  * position.
1082  *
1083  * After updating the segment.start/stop values, we prepare for
1084  * streaming again. We push out a FLUSH_STOP to make the peer pad
1085  * accept data again and we start our task again.
1086  *
1087  * A segment seek posts a message on the bus saying that the playback
1088  * of the segment started. We store the segment flag internally because
1089  * when we reach the segment.stop we have to post a segment.done
1090  * instead of EOS when doing a segment seek.
1091  */
1092 /* FIXME (0.11), we have the unlock gboolean here because most current 
1093  * implementations (fdsrc, -base/gst/tcp/, ...) unconditionally unlock, even when
1094  * the streaming thread isn't running, resulting in bogus unlocks later when it 
1095  * starts. This is fixed by adding unlock_stop, but we should still avoid unlocking
1096  * unnecessarily for backwards compatibility. Ergo, the unlock variable stays
1097  * until 0.11
1098  */
1099 static gboolean
1100 gst_base_src_perform_seek (GstBaseSrc * src, GstEvent * event, gboolean unlock)
1101 {
1102   gboolean res = TRUE;
1103   gdouble rate;
1104   GstFormat seek_format, dest_format;
1105   GstSeekFlags flags;
1106   GstSeekType cur_type, stop_type;
1107   gint64 cur, stop;
1108   gboolean flush, playing;
1109   gboolean update;
1110   gboolean relative_seek = FALSE;
1111   gboolean seekseg_configured = FALSE;
1112   GstSegment seeksegment;
1113
1114   GST_DEBUG_OBJECT (src, "doing seek");
1115
1116   dest_format = src->segment.format;
1117
1118   if (event) {
1119     gst_event_parse_seek (event, &rate, &seek_format, &flags,
1120         &cur_type, &cur, &stop_type, &stop);
1121
1122     relative_seek = SEEK_TYPE_IS_RELATIVE (cur_type) ||
1123         SEEK_TYPE_IS_RELATIVE (stop_type);
1124
1125     if (dest_format != seek_format && !relative_seek) {
1126       /* If we have an ABSOLUTE position (SEEK_SET only), we can convert it
1127        * here before taking the stream lock, otherwise we must convert it later,
1128        * once we have the stream lock and can read the current position */
1129       gst_segment_init (&seeksegment, dest_format);
1130
1131       if (!gst_base_src_prepare_seek_segment (src, event, &seeksegment))
1132         goto prepare_failed;
1133
1134       seekseg_configured = TRUE;
1135     }
1136
1137     flush = flags & GST_SEEK_FLAG_FLUSH;
1138   } else {
1139     flush = FALSE;
1140   }
1141
1142   /* send flush start */
1143   if (flush)
1144     gst_pad_push_event (src->srcpad, gst_event_new_flush_start ());
1145   else
1146     gst_pad_pause_task (src->srcpad);
1147
1148   /* unblock streaming thread. */
1149   gst_base_src_set_flushing (src, TRUE, FALSE, unlock, &playing);
1150
1151   /* grab streaming lock, this should eventually be possible, either
1152    * because the task is paused, our streaming thread stopped 
1153    * or because our peer is flushing. */
1154   GST_PAD_STREAM_LOCK (src->srcpad);
1155
1156   gst_base_src_set_flushing (src, FALSE, playing, unlock, NULL);
1157
1158   /* If we configured the seeksegment above, don't overwrite it now. Otherwise
1159    * copy the current segment info into the temp segment that we can actually
1160    * attempt the seek with. We only update the real segment if the seek suceeds. */
1161   if (!seekseg_configured) {
1162     memcpy (&seeksegment, &src->segment, sizeof (GstSegment));
1163
1164     /* now configure the final seek segment */
1165     if (event) {
1166       if (src->segment.format != seek_format) {
1167         /* OK, here's where we give the subclass a chance to convert the relative
1168          * seek into an absolute one in the processing format. We set up any
1169          * absolute seek above, before taking the stream lock. */
1170         if (!gst_base_src_prepare_seek_segment (src, event, &seeksegment)) {
1171           GST_DEBUG_OBJECT (src, "Preparing the seek failed after flushing. "
1172               "Aborting seek");
1173           res = FALSE;
1174         }
1175       } else {
1176         /* The seek format matches our processing format, no need to ask the
1177          * the subclass to configure the segment. */
1178         gst_segment_set_seek (&seeksegment, rate, seek_format, flags,
1179             cur_type, cur, stop_type, stop, &update);
1180       }
1181     }
1182     /* Else, no seek event passed, so we're just (re)starting the 
1183        current segment. */
1184   }
1185
1186   if (res) {
1187     GST_DEBUG_OBJECT (src, "segment configured from %" G_GINT64_FORMAT
1188         " to %" G_GINT64_FORMAT ", position %" G_GINT64_FORMAT,
1189         seeksegment.start, seeksegment.stop, seeksegment.last_stop);
1190
1191     /* do the seek, segment.last_stop contains the new position. */
1192     res = gst_base_src_do_seek (src, &seeksegment);
1193   }
1194
1195   /* and prepare to continue streaming */
1196   if (flush) {
1197     /* send flush stop, peer will accept data and events again. We
1198      * are not yet providing data as we still have the STREAM_LOCK. */
1199     gst_pad_push_event (src->srcpad, gst_event_new_flush_stop ());
1200   } else if (res && src->data.ABI.running) {
1201     /* we are running the current segment and doing a non-flushing seek, 
1202      * close the segment first based on the last_stop. */
1203     GST_DEBUG_OBJECT (src, "closing running segment %" G_GINT64_FORMAT
1204         " to %" G_GINT64_FORMAT, src->segment.start, src->segment.last_stop);
1205
1206     /* queue the segment for sending in the stream thread */
1207     if (src->priv->close_segment)
1208       gst_event_unref (src->priv->close_segment);
1209     src->priv->close_segment =
1210         gst_event_new_new_segment_full (TRUE,
1211         src->segment.rate, src->segment.applied_rate, src->segment.format,
1212         src->segment.start, src->segment.last_stop, src->segment.time);
1213   }
1214
1215   /* The subclass must have converted the segment to the processing format 
1216    * by now */
1217   if (res && seeksegment.format != dest_format) {
1218     GST_DEBUG_OBJECT (src, "Subclass failed to prepare a seek segment "
1219         "in the correct format. Aborting seek.");
1220     res = FALSE;
1221   }
1222
1223   /* if successfull seek, we update our real segment and push
1224    * out the new segment. */
1225   if (res) {
1226     memcpy (&src->segment, &seeksegment, sizeof (GstSegment));
1227
1228     if (src->segment.flags & GST_SEEK_FLAG_SEGMENT) {
1229       gst_element_post_message (GST_ELEMENT (src),
1230           gst_message_new_segment_start (GST_OBJECT (src),
1231               src->segment.format, src->segment.last_stop));
1232     }
1233
1234     /* for deriving a stop position for the playback segment from the seek
1235      * segment, we must take the duration when the stop is not set */
1236     if ((stop = src->segment.stop) == -1)
1237       stop = src->segment.duration;
1238
1239     GST_DEBUG_OBJECT (src, "Sending newsegment from %" G_GINT64_FORMAT
1240         " to %" G_GINT64_FORMAT, src->segment.start, stop);
1241
1242     /* now replace the old segment so that we send it in the stream thread the
1243      * next time it is scheduled. */
1244     if (src->priv->start_segment)
1245       gst_event_unref (src->priv->start_segment);
1246     if (src->segment.rate >= 0.0) {
1247       /* forward, we send data from last_stop to stop */
1248       src->priv->start_segment =
1249           gst_event_new_new_segment_full (FALSE,
1250           src->segment.rate, src->segment.applied_rate, src->segment.format,
1251           src->segment.last_stop, stop, src->segment.time);
1252     } else {
1253       /* reverse, we send data from last_stop to start */
1254       src->priv->start_segment =
1255           gst_event_new_new_segment_full (FALSE,
1256           src->segment.rate, src->segment.applied_rate, src->segment.format,
1257           src->segment.start, src->segment.last_stop, src->segment.time);
1258     }
1259   }
1260
1261   src->priv->discont = TRUE;
1262   src->data.ABI.running = TRUE;
1263   /* and restart the task in case it got paused explicitely or by
1264    * the FLUSH_START event we pushed out. */
1265   gst_pad_start_task (src->srcpad, (GstTaskFunction) gst_base_src_loop,
1266       src->srcpad);
1267
1268   /* and release the lock again so we can continue streaming */
1269   GST_PAD_STREAM_UNLOCK (src->srcpad);
1270
1271   return res;
1272
1273   /* ERROR */
1274 prepare_failed:
1275   GST_DEBUG_OBJECT (src, "Preparing the seek failed before flushing. "
1276       "Aborting seek");
1277   return FALSE;
1278 }
1279
1280 static const GstQueryType *
1281 gst_base_src_get_query_types (GstElement * element)
1282 {
1283   static const GstQueryType query_types[] = {
1284     GST_QUERY_DURATION,
1285     GST_QUERY_POSITION,
1286     GST_QUERY_SEEKING,
1287     GST_QUERY_SEGMENT,
1288     GST_QUERY_FORMATS,
1289     GST_QUERY_LATENCY,
1290     GST_QUERY_JITTER,
1291     GST_QUERY_RATE,
1292     GST_QUERY_CONVERT,
1293     0
1294   };
1295
1296   return query_types;
1297 }
1298
1299 /* all events send to this element directly. This is mainly done from the
1300  * application.
1301  */
1302 static gboolean
1303 gst_base_src_send_event (GstElement * element, GstEvent * event)
1304 {
1305   GstBaseSrc *src;
1306   gboolean result = FALSE;
1307
1308   src = GST_BASE_SRC (element);
1309
1310   GST_DEBUG_OBJECT (src, "reveived %s event", GST_EVENT_TYPE_NAME (event));
1311
1312   switch (GST_EVENT_TYPE (event)) {
1313       /* bidirectional events */
1314     case GST_EVENT_FLUSH_START:
1315     case GST_EVENT_FLUSH_STOP:
1316       /* sending random flushes downstream can break stuff,
1317        * especially sync since all segment info will get flushed */
1318       break;
1319
1320       /* downstream serialized events */
1321     case GST_EVENT_EOS:
1322     {
1323       GstBaseSrcClass *bclass;
1324
1325       bclass = GST_BASE_SRC_GET_CLASS (src);
1326
1327       /* queue EOS and make sure the task or pull function performs the EOS
1328        * actions. 
1329        *
1330        * We have two possibilities:
1331        *
1332        *  - Before we are to enter the _create function, we check the pending_eos
1333        *    first and do EOS instead of entering it.
1334        *  - If we are in the _create function or we did not manage to set the
1335        *    flag fast enough and we are about to enter the _create function,
1336        *    we unlock it so that we exit with WRONG_STATE immediatly. We then
1337        *    check the EOS flag and do the EOS logic.
1338        */
1339       g_atomic_int_set (&src->priv->pending_eos, TRUE);
1340       GST_DEBUG_OBJECT (src, "EOS marked, calling unlock");
1341
1342       /* unlock the _create function so that we can check the pending_eos flag
1343        * and we can do EOS. This will eventually release the LIVE_LOCK again so
1344        * that we can grab it and stop the unlock again. We don't take the stream
1345        * lock so that this operation is guaranteed to never block. */
1346       if (bclass->unlock)
1347         bclass->unlock (src);
1348
1349       GST_DEBUG_OBJECT (src, "unlock called, waiting for LIVE_LOCK");
1350
1351       GST_LIVE_LOCK (src);
1352       GST_DEBUG_OBJECT (src, "LIVE_LOCK acquired, calling unlock_stop");
1353       /* now stop the unlock of the streaming thread again. Grabbing the live
1354        * lock is enough because that protects the create function. */
1355       if (bclass->unlock_stop)
1356         bclass->unlock_stop (src);
1357       GST_LIVE_UNLOCK (src);
1358
1359       result = TRUE;
1360       break;
1361     }
1362     case GST_EVENT_NEWSEGMENT:
1363       /* sending random NEWSEGMENT downstream can break sync. */
1364       break;
1365     case GST_EVENT_TAG:
1366       /* sending tags could be useful, FIXME insert in dataflow */
1367       break;
1368     case GST_EVENT_BUFFERSIZE:
1369       /* does not seem to make much sense currently */
1370       break;
1371
1372       /* upstream events */
1373     case GST_EVENT_QOS:
1374       /* elements should override send_event and do something */
1375       break;
1376     case GST_EVENT_SEEK:
1377     {
1378       gboolean started;
1379
1380       GST_OBJECT_LOCK (src->srcpad);
1381       if (GST_PAD_ACTIVATE_MODE (src->srcpad) == GST_ACTIVATE_PULL)
1382         goto wrong_mode;
1383       started = GST_PAD_ACTIVATE_MODE (src->srcpad) == GST_ACTIVATE_PUSH;
1384       GST_OBJECT_UNLOCK (src->srcpad);
1385
1386       if (started) {
1387         /* when we are running in push mode, we can execute the
1388          * seek right now, we need to unlock. */
1389         result = gst_base_src_perform_seek (src, event, TRUE);
1390       } else {
1391         GstEvent **event_p;
1392
1393         /* else we store the event and execute the seek when we
1394          * get activated */
1395         GST_OBJECT_LOCK (src);
1396         event_p = &src->data.ABI.pending_seek;
1397         gst_event_replace ((GstEvent **) event_p, event);
1398         GST_OBJECT_UNLOCK (src);
1399         /* assume the seek will work */
1400         result = TRUE;
1401       }
1402       break;
1403     }
1404     case GST_EVENT_NAVIGATION:
1405       /* could make sense for elements that do something with navigation events
1406        * but then they would need to override the send_event function */
1407       break;
1408     case GST_EVENT_LATENCY:
1409       /* does not seem to make sense currently */
1410       break;
1411
1412       /* custom events */
1413     case GST_EVENT_CUSTOM_UPSTREAM:
1414       /* override send_event if you want this */
1415       break;
1416     case GST_EVENT_CUSTOM_DOWNSTREAM:
1417     case GST_EVENT_CUSTOM_BOTH:
1418       /* FIXME, insert event in the dataflow */
1419       break;
1420     case GST_EVENT_CUSTOM_DOWNSTREAM_OOB:
1421     case GST_EVENT_CUSTOM_BOTH_OOB:
1422       /* insert a random custom event into the pipeline */
1423       GST_DEBUG_OBJECT (src, "pushing custom OOB event downstream");
1424       result = gst_pad_push_event (src->srcpad, event);
1425       /* we gave away the ref to the event in the push */
1426       event = NULL;
1427       break;
1428     default:
1429       break;
1430   }
1431 done:
1432   /* if we still have a ref to the event, unref it now */
1433   if (event)
1434     gst_event_unref (event);
1435
1436   return result;
1437
1438   /* ERRORS */
1439 wrong_mode:
1440   {
1441     GST_DEBUG_OBJECT (src, "cannot perform seek when operating in pull mode");
1442     GST_OBJECT_UNLOCK (src->srcpad);
1443     result = FALSE;
1444     goto done;
1445   }
1446 }
1447
1448 static gboolean
1449 gst_base_src_default_event (GstBaseSrc * src, GstEvent * event)
1450 {
1451   gboolean result;
1452
1453   switch (GST_EVENT_TYPE (event)) {
1454     case GST_EVENT_SEEK:
1455       /* is normally called when in push mode */
1456       if (!src->seekable)
1457         goto not_seekable;
1458
1459       result = gst_base_src_perform_seek (src, event, TRUE);
1460       break;
1461     case GST_EVENT_FLUSH_START:
1462       /* cancel any blocking getrange, is normally called
1463        * when in pull mode. */
1464       result = gst_base_src_set_flushing (src, TRUE, FALSE, TRUE, NULL);
1465       break;
1466     case GST_EVENT_FLUSH_STOP:
1467       result = gst_base_src_set_flushing (src, FALSE, TRUE, TRUE, NULL);
1468       break;
1469     default:
1470       result = TRUE;
1471       break;
1472   }
1473   return result;
1474
1475   /* ERRORS */
1476 not_seekable:
1477   {
1478     GST_DEBUG_OBJECT (src, "is not seekable");
1479     return FALSE;
1480   }
1481 }
1482
1483 static gboolean
1484 gst_base_src_event_handler (GstPad * pad, GstEvent * event)
1485 {
1486   GstBaseSrc *src;
1487   GstBaseSrcClass *bclass;
1488   gboolean result = FALSE;
1489
1490   src = GST_BASE_SRC (gst_pad_get_parent (pad));
1491   bclass = GST_BASE_SRC_GET_CLASS (src);
1492
1493   if (bclass->event) {
1494     if (!(result = bclass->event (src, event)))
1495       goto subclass_failed;
1496   }
1497
1498 done:
1499   gst_event_unref (event);
1500   gst_object_unref (src);
1501
1502   return result;
1503
1504   /* ERRORS */
1505 subclass_failed:
1506   {
1507     GST_DEBUG_OBJECT (src, "subclass refused event");
1508     goto done;
1509   }
1510 }
1511
1512 static void
1513 gst_base_src_set_property (GObject * object, guint prop_id,
1514     const GValue * value, GParamSpec * pspec)
1515 {
1516   GstBaseSrc *src;
1517
1518   src = GST_BASE_SRC (object);
1519
1520   switch (prop_id) {
1521     case PROP_BLOCKSIZE:
1522       src->blocksize = g_value_get_ulong (value);
1523       break;
1524     case PROP_NUM_BUFFERS:
1525       src->num_buffers = g_value_get_int (value);
1526       break;
1527     case PROP_TYPEFIND:
1528       src->data.ABI.typefind = g_value_get_boolean (value);
1529       break;
1530     case PROP_DO_TIMESTAMP:
1531       src->priv->do_timestamp = g_value_get_boolean (value);
1532       break;
1533     default:
1534       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1535       break;
1536   }
1537 }
1538
1539 static void
1540 gst_base_src_get_property (GObject * object, guint prop_id, GValue * value,
1541     GParamSpec * pspec)
1542 {
1543   GstBaseSrc *src;
1544
1545   src = GST_BASE_SRC (object);
1546
1547   switch (prop_id) {
1548     case PROP_BLOCKSIZE:
1549       g_value_set_ulong (value, src->blocksize);
1550       break;
1551     case PROP_NUM_BUFFERS:
1552       g_value_set_int (value, src->num_buffers);
1553       break;
1554     case PROP_TYPEFIND:
1555       g_value_set_boolean (value, src->data.ABI.typefind);
1556       break;
1557     case PROP_DO_TIMESTAMP:
1558       g_value_set_boolean (value, src->priv->do_timestamp);
1559       break;
1560     default:
1561       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1562       break;
1563   }
1564 }
1565
1566 /* with STREAM_LOCK and LOCK */
1567 static GstClockReturn
1568 gst_base_src_wait (GstBaseSrc * basesrc, GstClock * clock, GstClockTime time)
1569 {
1570   GstClockReturn ret;
1571   GstClockID id;
1572
1573   id = gst_clock_new_single_shot_id (clock, time);
1574
1575   basesrc->clock_id = id;
1576   /* release the live lock while waiting */
1577   GST_LIVE_UNLOCK (basesrc);
1578
1579   ret = gst_clock_id_wait (id, NULL);
1580
1581   GST_LIVE_LOCK (basesrc);
1582   gst_clock_id_unref (id);
1583   basesrc->clock_id = NULL;
1584
1585   return ret;
1586 }
1587
1588 /* perform synchronisation on a buffer. 
1589  * with STREAM_LOCK.
1590  */
1591 static GstClockReturn
1592 gst_base_src_do_sync (GstBaseSrc * basesrc, GstBuffer * buffer)
1593 {
1594   GstClockReturn result;
1595   GstClockTime start, end;
1596   GstBaseSrcClass *bclass;
1597   GstClockTime base_time;
1598   GstClock *clock;
1599   GstClockTime now = GST_CLOCK_TIME_NONE, timestamp;
1600   gboolean do_timestamp, first, pseudo_live;
1601
1602   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
1603
1604   start = end = -1;
1605   if (bclass->get_times)
1606     bclass->get_times (basesrc, buffer, &start, &end);
1607
1608   /* get buffer timestamp */
1609   timestamp = GST_BUFFER_TIMESTAMP (buffer);
1610
1611   /* grab the lock to prepare for clocking and calculate the startup 
1612    * latency. */
1613   GST_OBJECT_LOCK (basesrc);
1614
1615   /* if we are asked to sync against the clock we are a pseudo live element */
1616   pseudo_live = (start != -1 && basesrc->is_live);
1617   /* check for the first buffer */
1618   first = (basesrc->priv->latency == -1);
1619
1620   if (timestamp != -1 && pseudo_live) {
1621     GstClockTime latency;
1622
1623     /* we have a timestamp and a sync time, latency is the diff */
1624     if (timestamp <= start)
1625       latency = start - timestamp;
1626     else
1627       latency = 0;
1628
1629     if (first) {
1630       GST_DEBUG_OBJECT (basesrc, "pseudo_live with latency %" GST_TIME_FORMAT,
1631           GST_TIME_ARGS (latency));
1632       /* first time we calculate latency, just configure */
1633       basesrc->priv->latency = latency;
1634     } else {
1635       if (basesrc->priv->latency != latency) {
1636         /* we have a new latency, FIXME post latency message */
1637         basesrc->priv->latency = latency;
1638         GST_DEBUG_OBJECT (basesrc, "latency changed to %" GST_TIME_FORMAT,
1639             GST_TIME_ARGS (latency));
1640       }
1641     }
1642   } else if (first) {
1643     GST_DEBUG_OBJECT (basesrc, "no latency needed, live %d, sync %d",
1644         basesrc->is_live, start != -1);
1645     basesrc->priv->latency = 0;
1646   }
1647
1648   /* get clock, if no clock, we can't sync or do timestamps */
1649   if ((clock = GST_ELEMENT_CLOCK (basesrc)) == NULL)
1650     goto no_clock;
1651
1652   base_time = GST_ELEMENT_CAST (basesrc)->base_time;
1653
1654   do_timestamp = basesrc->priv->do_timestamp;
1655
1656   /* first buffer, calculate the timestamp offset */
1657   if (first) {
1658     GstClockTime running_time;
1659
1660     now = gst_clock_get_time (clock);
1661     running_time = now - base_time;
1662
1663     GST_LOG_OBJECT (basesrc,
1664         "startup timestamp: %" GST_TIME_FORMAT ", running_time %"
1665         GST_TIME_FORMAT, GST_TIME_ARGS (timestamp),
1666         GST_TIME_ARGS (running_time));
1667
1668     if (pseudo_live && timestamp != -1) {
1669       /* live source and we need to sync, add startup latency to all timestamps
1670        * to get the real running_time. Live sources should always timestamp
1671        * according to the current running time. */
1672       basesrc->priv->ts_offset = GST_CLOCK_DIFF (timestamp, running_time);
1673
1674       GST_LOG_OBJECT (basesrc, "live with sync, ts_offset %" GST_TIME_FORMAT,
1675           GST_TIME_ARGS (basesrc->priv->ts_offset));
1676     } else {
1677       basesrc->priv->ts_offset = 0;
1678       GST_LOG_OBJECT (basesrc, "no timestamp offset needed");
1679     }
1680
1681     if (!GST_CLOCK_TIME_IS_VALID (timestamp)) {
1682       if (do_timestamp)
1683         timestamp = running_time;
1684       else
1685         timestamp = 0;
1686
1687       GST_BUFFER_TIMESTAMP (buffer) = timestamp;
1688
1689       GST_LOG_OBJECT (basesrc, "created timestamp: %" GST_TIME_FORMAT,
1690           GST_TIME_ARGS (timestamp));
1691     }
1692
1693     /* add the timestamp offset we need for sync */
1694     timestamp += basesrc->priv->ts_offset;
1695   } else {
1696     /* not the first buffer, the timestamp is the diff between the clock and
1697      * base_time */
1698     if (do_timestamp && !GST_CLOCK_TIME_IS_VALID (timestamp)) {
1699       now = gst_clock_get_time (clock);
1700
1701       GST_BUFFER_TIMESTAMP (buffer) = now - base_time;
1702
1703       GST_LOG_OBJECT (basesrc, "created timestamp: %" GST_TIME_FORMAT,
1704           GST_TIME_ARGS (now - base_time));
1705     }
1706   }
1707
1708   /* if we don't have a buffer timestamp, we don't sync */
1709   if (!GST_CLOCK_TIME_IS_VALID (start))
1710     goto no_sync;
1711
1712   if (basesrc->is_live && GST_CLOCK_TIME_IS_VALID (timestamp)) {
1713     /* for pseudo live sources, add our ts_offset to the timestamp */
1714     GST_BUFFER_TIMESTAMP (buffer) += basesrc->priv->ts_offset;
1715     start += basesrc->priv->ts_offset;
1716   }
1717
1718   GST_LOG_OBJECT (basesrc,
1719       "waiting for clock, base time %" GST_TIME_FORMAT
1720       ", stream_start %" GST_TIME_FORMAT,
1721       GST_TIME_ARGS (base_time), GST_TIME_ARGS (start));
1722   GST_OBJECT_UNLOCK (basesrc);
1723
1724   result = gst_base_src_wait (basesrc, clock, start + base_time);
1725
1726   GST_LOG_OBJECT (basesrc, "clock entry done: %d", result);
1727
1728   return result;
1729
1730   /* special cases */
1731 no_clock:
1732   {
1733     GST_DEBUG_OBJECT (basesrc, "we have no clock");
1734     GST_OBJECT_UNLOCK (basesrc);
1735     return GST_CLOCK_OK;
1736   }
1737 no_sync:
1738   {
1739     GST_DEBUG_OBJECT (basesrc, "no sync needed");
1740     GST_OBJECT_UNLOCK (basesrc);
1741     return GST_CLOCK_OK;
1742   }
1743 }
1744
1745 static gboolean
1746 gst_base_src_update_length (GstBaseSrc * src, guint64 offset, guint * length)
1747 {
1748   guint64 size, maxsize;
1749   GstBaseSrcClass *bclass;
1750
1751   bclass = GST_BASE_SRC_GET_CLASS (src);
1752
1753   /* only operate if we are working with bytes */
1754   if (src->segment.format != GST_FORMAT_BYTES)
1755     return TRUE;
1756
1757   /* get total file size */
1758   size = (guint64) src->segment.duration;
1759
1760   /* the max amount of bytes to read is the total size or
1761    * up to the segment.stop if present. */
1762   if (src->segment.stop != -1)
1763     maxsize = MIN (size, src->segment.stop);
1764   else
1765     maxsize = size;
1766
1767   GST_DEBUG_OBJECT (src,
1768       "reading offset %" G_GUINT64_FORMAT ", length %u, size %" G_GINT64_FORMAT
1769       ", segment.stop %" G_GINT64_FORMAT ", maxsize %" G_GINT64_FORMAT, offset,
1770       *length, size, src->segment.stop, maxsize);
1771
1772   /* check size if we have one */
1773   if (maxsize != -1) {
1774     /* if we run past the end, check if the file became bigger and 
1775      * retry. */
1776     if (G_UNLIKELY (offset + *length >= maxsize)) {
1777       /* see if length of the file changed */
1778       if (bclass->get_size)
1779         if (!bclass->get_size (src, &size))
1780           size = -1;
1781
1782       gst_segment_set_duration (&src->segment, GST_FORMAT_BYTES, size);
1783
1784       /* make sure we don't exceed the configured segment stop
1785        * if it was set */
1786       if (src->segment.stop != -1)
1787         maxsize = MIN (size, src->segment.stop);
1788       else
1789         maxsize = size;
1790
1791       /* if we are at or past the end, EOS */
1792       if (G_UNLIKELY (offset >= maxsize))
1793         goto unexpected_length;
1794
1795       /* else we can clip to the end */
1796       if (G_UNLIKELY (offset + *length >= maxsize))
1797         *length = maxsize - offset;
1798
1799     }
1800   }
1801
1802   /* keep track of current position. segment is in bytes, we checked 
1803    * that above. */
1804   gst_segment_set_last_stop (&src->segment, GST_FORMAT_BYTES, offset);
1805
1806   return TRUE;
1807
1808   /* ERRORS */
1809 unexpected_length:
1810   {
1811     return FALSE;
1812   }
1813 }
1814
1815 /* must be called with LIVE_LOCK */
1816 static GstFlowReturn
1817 gst_base_src_get_range (GstBaseSrc * src, guint64 offset, guint length,
1818     GstBuffer ** buf)
1819 {
1820   GstFlowReturn ret;
1821   GstBaseSrcClass *bclass;
1822   GstClockReturn status;
1823
1824   bclass = GST_BASE_SRC_GET_CLASS (src);
1825
1826   if (src->is_live) {
1827     while (G_UNLIKELY (!src->live_running)) {
1828       ret = gst_base_src_wait_playing (src);
1829       if (ret != GST_FLOW_OK)
1830         goto stopped;
1831     }
1832   }
1833
1834   if (G_UNLIKELY (!GST_OBJECT_FLAG_IS_SET (src, GST_BASE_SRC_STARTED)))
1835     goto not_started;
1836
1837   if (G_UNLIKELY (!bclass->create))
1838     goto no_function;
1839
1840   if (G_UNLIKELY (!gst_base_src_update_length (src, offset, &length)))
1841     goto unexpected_length;
1842
1843   /* normally we don't count buffers */
1844   if (G_UNLIKELY (src->num_buffers_left >= 0)) {
1845     if (src->num_buffers_left == 0)
1846       goto reached_num_buffers;
1847     else
1848       src->num_buffers_left--;
1849   }
1850
1851   /* don't enter the create function if a pending EOS event was set. For the
1852    * logic of the pending_eos, check the event function of this class. */
1853   if (G_UNLIKELY (g_atomic_int_get (&src->priv->pending_eos)))
1854     goto eos;
1855
1856   GST_DEBUG_OBJECT (src,
1857       "calling create offset %" G_GUINT64_FORMAT " length %u, time %"
1858       G_GINT64_FORMAT, offset, length, src->segment.time);
1859
1860   ret = bclass->create (src, offset, length, buf);
1861
1862   /* The create function could be unlocked because we have a pending EOS. It's
1863    * possible that we have a valid buffer from create that we need to
1864    * discard when the create function returned _OK. */
1865   if (G_UNLIKELY (g_atomic_int_get (&src->priv->pending_eos))) {
1866     if (ret == GST_FLOW_OK) {
1867       gst_buffer_unref (*buf);
1868       *buf = NULL;
1869     }
1870     goto eos;
1871   }
1872
1873   if (G_UNLIKELY (ret != GST_FLOW_OK))
1874     goto not_ok;
1875
1876   /* no timestamp set and we are at offset 0, we can timestamp with 0 */
1877   if (offset == 0 && src->segment.time == 0
1878       && GST_BUFFER_TIMESTAMP (*buf) == -1)
1879     GST_BUFFER_TIMESTAMP (*buf) = 0;
1880
1881   /* set pad caps on the buffer if the buffer had no caps */
1882   if (GST_BUFFER_CAPS (*buf) == NULL)
1883     gst_buffer_set_caps (*buf, GST_PAD_CAPS (src->srcpad));
1884
1885   /* now sync before pushing the buffer */
1886   status = gst_base_src_do_sync (src, *buf);
1887
1888   /* waiting for the clock could have made us flushing */
1889   if (G_UNLIKELY (src->priv->flushing))
1890     goto flushing;
1891
1892   switch (status) {
1893     case GST_CLOCK_EARLY:
1894       /* the buffer is too late. We currently don't drop the buffer. */
1895       GST_DEBUG_OBJECT (src, "buffer too late!, returning anyway");
1896       break;
1897     case GST_CLOCK_OK:
1898       /* buffer synchronised properly */
1899       GST_DEBUG_OBJECT (src, "buffer ok");
1900       break;
1901     case GST_CLOCK_UNSCHEDULED:
1902       /* this case is triggered when we were waiting for the clock and
1903        * it got unlocked because we did a state change. We return 
1904        * WRONG_STATE in this case to stop the dataflow also get rid of the
1905        * produced buffer. */
1906       GST_DEBUG_OBJECT (src,
1907           "clock was unscheduled (%d), returning WRONG_STATE", status);
1908       gst_buffer_unref (*buf);
1909       *buf = NULL;
1910       ret = GST_FLOW_WRONG_STATE;
1911       break;
1912     default:
1913       /* all other result values are unexpected and errors */
1914       GST_ELEMENT_ERROR (src, CORE, CLOCK,
1915           (_("Internal clock error.")),
1916           ("clock returned unexpected return value %d", status));
1917       gst_buffer_unref (*buf);
1918       *buf = NULL;
1919       ret = GST_FLOW_ERROR;
1920       break;
1921   }
1922   return ret;
1923
1924   /* ERROR */
1925 stopped:
1926   {
1927     GST_DEBUG_OBJECT (src, "wait_playing returned %d (%s)", ret,
1928         gst_flow_get_name (ret));
1929     return ret;
1930   }
1931 not_ok:
1932   {
1933     GST_DEBUG_OBJECT (src, "create returned %d (%s)", ret,
1934         gst_flow_get_name (ret));
1935     return ret;
1936   }
1937 not_started:
1938   {
1939     GST_DEBUG_OBJECT (src, "getrange but not started");
1940     return GST_FLOW_WRONG_STATE;
1941   }
1942 no_function:
1943   {
1944     GST_DEBUG_OBJECT (src, "no create function");
1945     return GST_FLOW_ERROR;
1946   }
1947 unexpected_length:
1948   {
1949     GST_DEBUG_OBJECT (src, "unexpected length %u (offset=%" G_GUINT64_FORMAT
1950         ", size=%" G_GINT64_FORMAT ")", length, offset, src->segment.duration);
1951     return GST_FLOW_UNEXPECTED;
1952   }
1953 reached_num_buffers:
1954   {
1955     GST_DEBUG_OBJECT (src, "sent all buffers");
1956     return GST_FLOW_UNEXPECTED;
1957   }
1958 flushing:
1959   {
1960     GST_DEBUG_OBJECT (src, "we are flushing");
1961     gst_buffer_unref (*buf);
1962     *buf = NULL;
1963     return GST_FLOW_WRONG_STATE;
1964   }
1965 eos:
1966   {
1967     GST_DEBUG_OBJECT (src, "we are EOS");
1968     return GST_FLOW_UNEXPECTED;
1969   }
1970 }
1971
1972 static GstFlowReturn
1973 gst_base_src_pad_get_range (GstPad * pad, guint64 offset, guint length,
1974     GstBuffer ** buf)
1975 {
1976   GstBaseSrc *src;
1977   GstFlowReturn res;
1978
1979   src = GST_BASE_SRC (gst_pad_get_parent (pad));
1980
1981   GST_LIVE_LOCK (src);
1982   if (G_UNLIKELY (src->priv->flushing))
1983     goto flushing;
1984
1985   res = gst_base_src_get_range (src, offset, length, buf);
1986
1987 done:
1988   GST_LIVE_UNLOCK (src);
1989
1990   gst_object_unref (src);
1991
1992   return res;
1993
1994   /* ERRORS */
1995 flushing:
1996   {
1997     GST_DEBUG_OBJECT (src, "we are flushing");
1998     res = GST_FLOW_WRONG_STATE;
1999     goto done;
2000   }
2001 }
2002
2003 static gboolean
2004 gst_base_src_default_check_get_range (GstBaseSrc * src)
2005 {
2006   gboolean res;
2007
2008   if (!GST_OBJECT_FLAG_IS_SET (src, GST_BASE_SRC_STARTED)) {
2009     GST_LOG_OBJECT (src, "doing start/stop to check get_range support");
2010     if (G_LIKELY (gst_base_src_start (src)))
2011       gst_base_src_stop (src);
2012   }
2013
2014   /* we can operate in getrange mode if the native format is bytes
2015    * and we are seekable, this condition is set in the random_access
2016    * flag and is set in the _start() method. */
2017   res = src->random_access;
2018
2019   return res;
2020 }
2021
2022 static gboolean
2023 gst_base_src_check_get_range (GstBaseSrc * src)
2024 {
2025   GstBaseSrcClass *bclass;
2026   gboolean res;
2027
2028   bclass = GST_BASE_SRC_GET_CLASS (src);
2029
2030   if (bclass->check_get_range == NULL)
2031     goto no_function;
2032
2033   res = bclass->check_get_range (src);
2034   GST_LOG_OBJECT (src, "%s() returned %d",
2035       GST_DEBUG_FUNCPTR_NAME (bclass->check_get_range), (gint) res);
2036
2037   return res;
2038
2039   /* ERRORS */
2040 no_function:
2041   {
2042     GST_WARNING_OBJECT (src, "no check_get_range function set");
2043     return FALSE;
2044   }
2045 }
2046
2047 static gboolean
2048 gst_base_src_pad_check_get_range (GstPad * pad)
2049 {
2050   GstBaseSrc *src;
2051   gboolean res;
2052
2053   src = GST_BASE_SRC (GST_OBJECT_PARENT (pad));
2054
2055   res = gst_base_src_check_get_range (src);
2056
2057   return res;
2058 }
2059
2060 static void
2061 gst_base_src_loop (GstPad * pad)
2062 {
2063   GstBaseSrc *src;
2064   GstBuffer *buf = NULL;
2065   GstFlowReturn ret;
2066   gint64 position;
2067   gboolean eos;
2068   gulong blocksize;
2069
2070   eos = FALSE;
2071
2072   src = GST_BASE_SRC (GST_OBJECT_PARENT (pad));
2073
2074   GST_LIVE_LOCK (src);
2075
2076   if (G_UNLIKELY (src->priv->flushing))
2077     goto flushing;
2078
2079   src->priv->last_sent_eos = FALSE;
2080
2081   blocksize = src->blocksize;
2082
2083   /* if we operate in bytes, we can calculate an offset */
2084   if (src->segment.format == GST_FORMAT_BYTES) {
2085     position = src->segment.last_stop;
2086     /* for negative rates, start with subtracting the blocksize */
2087     if (src->segment.rate < 0.0) {
2088       /* we cannot go below segment.start */
2089       if (position > src->segment.start + blocksize)
2090         position -= blocksize;
2091       else {
2092         /* last block, remainder up to segment.start */
2093         blocksize = position - src->segment.start;
2094         position = src->segment.start;
2095       }
2096     }
2097   } else
2098     position = -1;
2099
2100   ret = gst_base_src_get_range (src, position, blocksize, &buf);
2101   if (G_UNLIKELY (ret != GST_FLOW_OK)) {
2102     GST_INFO_OBJECT (src, "pausing after gst_base_src_get_range() = %s",
2103         gst_flow_get_name (ret));
2104     GST_LIVE_UNLOCK (src);
2105     goto pause;
2106   }
2107   /* this should not happen */
2108   if (G_UNLIKELY (buf == NULL))
2109     goto null_buffer;
2110
2111   /* push events to close/start our segment before we push the buffer. */
2112   if (G_UNLIKELY (src->priv->close_segment)) {
2113     gst_pad_push_event (pad, src->priv->close_segment);
2114     src->priv->close_segment = NULL;
2115   }
2116   if (G_UNLIKELY (src->priv->start_segment)) {
2117     gst_pad_push_event (pad, src->priv->start_segment);
2118     src->priv->start_segment = NULL;
2119   }
2120
2121   /* figure out the new position */
2122   switch (src->segment.format) {
2123     case GST_FORMAT_BYTES:
2124     {
2125       guint bufsize = GST_BUFFER_SIZE (buf);
2126
2127       /* we subtracted above for negative rates */
2128       if (src->segment.rate >= 0.0)
2129         position += bufsize;
2130       break;
2131     }
2132     case GST_FORMAT_TIME:
2133     {
2134       GstClockTime start, duration;
2135
2136       start = GST_BUFFER_TIMESTAMP (buf);
2137       duration = GST_BUFFER_DURATION (buf);
2138
2139       if (GST_CLOCK_TIME_IS_VALID (start))
2140         position = start;
2141       else
2142         position = src->segment.last_stop;
2143
2144       if (GST_CLOCK_TIME_IS_VALID (duration)) {
2145         if (src->segment.rate >= 0.0)
2146           position += duration;
2147         else if (position > duration)
2148           position -= duration;
2149         else
2150           position = 0;
2151       }
2152       break;
2153     }
2154     case GST_FORMAT_DEFAULT:
2155       if (src->segment.rate >= 0.0)
2156         position = GST_BUFFER_OFFSET_END (buf);
2157       else
2158         position = GST_BUFFER_OFFSET (buf);
2159       break;
2160     default:
2161       position = -1;
2162       break;
2163   }
2164   if (position != -1) {
2165     if (src->segment.rate >= 0.0) {
2166       /* positive rate, check if we reached the stop */
2167       if (src->segment.stop != -1) {
2168         if (position >= src->segment.stop) {
2169           eos = TRUE;
2170           position = src->segment.stop;
2171         }
2172       }
2173     } else {
2174       /* negative rate, check if we reached the start. start is always set to
2175        * something different from -1 */
2176       if (position <= src->segment.start) {
2177         eos = TRUE;
2178         position = src->segment.start;
2179       }
2180       /* when going reverse, all buffers are DISCONT */
2181       src->priv->discont = TRUE;
2182     }
2183     gst_segment_set_last_stop (&src->segment, src->segment.format, position);
2184   }
2185
2186   if (G_UNLIKELY (src->priv->discont)) {
2187     buf = gst_buffer_make_metadata_writable (buf);
2188     GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_DISCONT);
2189     src->priv->discont = FALSE;
2190   }
2191   GST_LIVE_UNLOCK (src);
2192
2193   ret = gst_pad_push (pad, buf);
2194   if (G_UNLIKELY (ret != GST_FLOW_OK)) {
2195     GST_INFO_OBJECT (src, "pausing after gst_pad_push() = %s",
2196         gst_flow_get_name (ret));
2197     goto pause;
2198   }
2199
2200   if (G_UNLIKELY (eos)) {
2201     GST_INFO_OBJECT (src, "pausing after end of segment");
2202     ret = GST_FLOW_UNEXPECTED;
2203     goto pause;
2204   }
2205
2206 done:
2207   return;
2208
2209   /* special cases */
2210 flushing:
2211   {
2212     GST_DEBUG_OBJECT (src, "we are flushing");
2213     GST_LIVE_UNLOCK (src);
2214     ret = GST_FLOW_WRONG_STATE;
2215     goto pause;
2216   }
2217 pause:
2218   {
2219     const gchar *reason = gst_flow_get_name (ret);
2220
2221     GST_DEBUG_OBJECT (src, "pausing task, reason %s", reason);
2222     src->data.ABI.running = FALSE;
2223     gst_pad_pause_task (pad);
2224     if (GST_FLOW_IS_FATAL (ret) || ret == GST_FLOW_NOT_LINKED) {
2225       if (ret == GST_FLOW_UNEXPECTED) {
2226         /* perform EOS logic */
2227         if (src->segment.flags & GST_SEEK_FLAG_SEGMENT) {
2228           gst_element_post_message (GST_ELEMENT_CAST (src),
2229               gst_message_new_segment_done (GST_OBJECT_CAST (src),
2230                   src->segment.format, src->segment.last_stop));
2231         } else {
2232           gst_pad_push_event (pad, gst_event_new_eos ());
2233           src->priv->last_sent_eos = TRUE;
2234         }
2235       } else {
2236         /* for fatal errors we post an error message, post the error
2237          * first so the app knows about the error first. */
2238         GST_ELEMENT_ERROR (src, STREAM, FAILED,
2239             (_("Internal data flow error.")),
2240             ("streaming task paused, reason %s (%d)", reason, ret));
2241         gst_pad_push_event (pad, gst_event_new_eos ());
2242         src->priv->last_sent_eos = TRUE;
2243       }
2244     }
2245     goto done;
2246   }
2247 null_buffer:
2248   {
2249     GST_ELEMENT_ERROR (src, STREAM, FAILED,
2250         (_("Internal data flow error.")), ("element returned NULL buffer"));
2251     GST_LIVE_UNLOCK (src);
2252     /* we finished the segment on error */
2253     ret = GST_FLOW_ERROR;
2254     goto done;
2255   }
2256 }
2257
2258 /* default negotiation code. 
2259  *
2260  * Take intersection between src and sink pads, take first
2261  * caps and fixate. 
2262  */
2263 static gboolean
2264 gst_base_src_default_negotiate (GstBaseSrc * basesrc)
2265 {
2266   GstCaps *thiscaps;
2267   GstCaps *caps = NULL;
2268   GstCaps *peercaps = NULL;
2269   gboolean result = FALSE;
2270
2271   /* first see what is possible on our source pad */
2272   thiscaps = gst_pad_get_caps (GST_BASE_SRC_PAD (basesrc));
2273   GST_DEBUG_OBJECT (basesrc, "caps of src: %" GST_PTR_FORMAT, thiscaps);
2274   /* nothing or anything is allowed, we're done */
2275   if (thiscaps == NULL || gst_caps_is_any (thiscaps))
2276     goto no_nego_needed;
2277
2278   /* get the peer caps */
2279   peercaps = gst_pad_peer_get_caps (GST_BASE_SRC_PAD (basesrc));
2280   GST_DEBUG_OBJECT (basesrc, "caps of peer: %" GST_PTR_FORMAT, peercaps);
2281   if (peercaps) {
2282     GstCaps *icaps;
2283
2284     /* get intersection */
2285     icaps = gst_caps_intersect (thiscaps, peercaps);
2286     GST_DEBUG_OBJECT (basesrc, "intersect: %" GST_PTR_FORMAT, icaps);
2287     gst_caps_unref (thiscaps);
2288     gst_caps_unref (peercaps);
2289     if (icaps) {
2290       /* take first (and best, since they are sorted) possibility */
2291       caps = gst_caps_copy_nth (icaps, 0);
2292       gst_caps_unref (icaps);
2293     }
2294   } else {
2295     /* no peer, work with our own caps then */
2296     caps = thiscaps;
2297   }
2298   if (caps) {
2299     caps = gst_caps_make_writable (caps);
2300     gst_caps_truncate (caps);
2301
2302     /* now fixate */
2303     if (!gst_caps_is_empty (caps)) {
2304       gst_pad_fixate_caps (GST_BASE_SRC_PAD (basesrc), caps);
2305       GST_DEBUG_OBJECT (basesrc, "fixated to: %" GST_PTR_FORMAT, caps);
2306
2307       if (gst_caps_is_any (caps)) {
2308         /* hmm, still anything, so element can do anything and
2309          * nego is not needed */
2310         result = TRUE;
2311       } else if (gst_caps_is_fixed (caps)) {
2312         /* yay, fixed caps, use those then */
2313         gst_pad_set_caps (GST_BASE_SRC_PAD (basesrc), caps);
2314         result = TRUE;
2315       }
2316     }
2317     gst_caps_unref (caps);
2318   }
2319   return result;
2320
2321 no_nego_needed:
2322   {
2323     GST_DEBUG_OBJECT (basesrc, "no negotiation needed");
2324     if (thiscaps)
2325       gst_caps_unref (thiscaps);
2326     return TRUE;
2327   }
2328 }
2329
2330 static gboolean
2331 gst_base_src_negotiate (GstBaseSrc * basesrc)
2332 {
2333   GstBaseSrcClass *bclass;
2334   gboolean result = TRUE;
2335
2336   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2337
2338   if (bclass->negotiate)
2339     result = bclass->negotiate (basesrc);
2340
2341   return result;
2342 }
2343
2344 static gboolean
2345 gst_base_src_start (GstBaseSrc * basesrc)
2346 {
2347   GstBaseSrcClass *bclass;
2348   gboolean result;
2349   guint64 size;
2350
2351   if (GST_OBJECT_FLAG_IS_SET (basesrc, GST_BASE_SRC_STARTED))
2352     return TRUE;
2353
2354   GST_DEBUG_OBJECT (basesrc, "starting source");
2355
2356   basesrc->num_buffers_left = basesrc->num_buffers;
2357
2358   gst_segment_init (&basesrc->segment, basesrc->segment.format);
2359   basesrc->data.ABI.running = FALSE;
2360
2361   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2362   if (bclass->start)
2363     result = bclass->start (basesrc);
2364   else
2365     result = TRUE;
2366
2367   if (!result)
2368     goto could_not_start;
2369
2370   GST_OBJECT_FLAG_SET (basesrc, GST_BASE_SRC_STARTED);
2371
2372   /* figure out the size */
2373   if (basesrc->segment.format == GST_FORMAT_BYTES) {
2374     if (bclass->get_size) {
2375       if (!(result = bclass->get_size (basesrc, &size)))
2376         size = -1;
2377     } else {
2378       result = FALSE;
2379       size = -1;
2380     }
2381     GST_DEBUG_OBJECT (basesrc, "setting size %" G_GUINT64_FORMAT, size);
2382     /* only update the size when operating in bytes, subclass is supposed
2383      * to set duration in the start method for other formats */
2384     gst_segment_set_duration (&basesrc->segment, GST_FORMAT_BYTES, size);
2385   } else {
2386     size = -1;
2387   }
2388
2389   GST_DEBUG_OBJECT (basesrc,
2390       "format: %d, have size: %d, size: %" G_GUINT64_FORMAT ", duration: %"
2391       G_GINT64_FORMAT, basesrc->segment.format, result, size,
2392       basesrc->segment.duration);
2393
2394   /* check if we can seek */
2395   if (bclass->is_seekable)
2396     basesrc->seekable = bclass->is_seekable (basesrc);
2397   else
2398     basesrc->seekable = FALSE;
2399
2400   GST_DEBUG_OBJECT (basesrc, "is seekable: %d", basesrc->seekable);
2401
2402   /* update for random access flag */
2403   basesrc->random_access = basesrc->seekable &&
2404       basesrc->segment.format == GST_FORMAT_BYTES;
2405
2406   GST_DEBUG_OBJECT (basesrc, "is random_access: %d", basesrc->random_access);
2407
2408   /* run typefind if we are random_access and the typefinding is enabled. */
2409   if (basesrc->random_access && basesrc->data.ABI.typefind && size != -1) {
2410     GstCaps *caps;
2411
2412     if (!(caps = gst_type_find_helper (basesrc->srcpad, size)))
2413       goto typefind_failed;
2414
2415     gst_pad_set_caps (basesrc->srcpad, caps);
2416     gst_caps_unref (caps);
2417   } else {
2418     /* use class or default negotiate function */
2419     if (!gst_base_src_negotiate (basesrc))
2420       goto could_not_negotiate;
2421   }
2422
2423   return TRUE;
2424
2425   /* ERROR */
2426 could_not_start:
2427   {
2428     GST_DEBUG_OBJECT (basesrc, "could not start");
2429     /* subclass is supposed to post a message. We don't have to call _stop. */
2430     return FALSE;
2431   }
2432 could_not_negotiate:
2433   {
2434     GST_DEBUG_OBJECT (basesrc, "could not negotiate, stopping");
2435     GST_ELEMENT_ERROR (basesrc, STREAM, FORMAT,
2436         ("Could not negotiate format"), ("Check your filtered caps, if any"));
2437     /* we must call stop */
2438     gst_base_src_stop (basesrc);
2439     return FALSE;
2440   }
2441 typefind_failed:
2442   {
2443     GST_DEBUG_OBJECT (basesrc, "could not typefind, stopping");
2444     GST_ELEMENT_ERROR (basesrc, STREAM, TYPE_NOT_FOUND, (NULL), (NULL));
2445     /* we must call stop */
2446     gst_base_src_stop (basesrc);
2447     return FALSE;
2448   }
2449 }
2450
2451 static gboolean
2452 gst_base_src_stop (GstBaseSrc * basesrc)
2453 {
2454   GstBaseSrcClass *bclass;
2455   gboolean result = TRUE;
2456
2457   if (!GST_OBJECT_FLAG_IS_SET (basesrc, GST_BASE_SRC_STARTED))
2458     return TRUE;
2459
2460   GST_DEBUG_OBJECT (basesrc, "stopping source");
2461
2462   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2463   if (bclass->stop)
2464     result = bclass->stop (basesrc);
2465
2466   if (result)
2467     GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_STARTED);
2468
2469   return result;
2470 }
2471
2472 /* start or stop flushing dataprocessing 
2473  */
2474 static gboolean
2475 gst_base_src_set_flushing (GstBaseSrc * basesrc,
2476     gboolean flushing, gboolean live_play, gboolean unlock, gboolean * playing)
2477 {
2478   GstBaseSrcClass *bclass;
2479
2480   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2481
2482   if (flushing && unlock) {
2483     /* unlock any subclasses, we need to do this before grabbing the
2484      * LIVE_LOCK since we hold this lock before going into ::create. We pass an
2485      * unlock to the params because of backwards compat (see seek handler)*/
2486     if (bclass->unlock)
2487       bclass->unlock (basesrc);
2488   }
2489
2490   /* the live lock is released when we are blocked, waiting for playing or
2491    * when we sync to the clock. */
2492   GST_LIVE_LOCK (basesrc);
2493   if (playing)
2494     *playing = basesrc->live_running;
2495   basesrc->priv->flushing = flushing;
2496   if (flushing) {
2497     /* if we are locked in the live lock, signal it to make it flush */
2498     basesrc->live_running = TRUE;
2499
2500     /* clear pending EOS if any */
2501     g_atomic_int_set (&basesrc->priv->pending_eos, FALSE);
2502
2503     /* step 1, now that we have the LIVE lock, clear our unlock request */
2504     if (bclass->unlock_stop)
2505       bclass->unlock_stop (basesrc);
2506
2507     /* step 2, unblock clock sync (if any) or any other blocking thing */
2508     if (basesrc->clock_id)
2509       gst_clock_id_unschedule (basesrc->clock_id);
2510   } else {
2511     /* signal the live source that it can start playing */
2512     basesrc->live_running = live_play;
2513   }
2514   GST_LIVE_SIGNAL (basesrc);
2515   GST_LIVE_UNLOCK (basesrc);
2516
2517   return TRUE;
2518 }
2519
2520 /* the purpose of this function is to make sure that a live source blocks in the
2521  * LIVE lock or leaves the LIVE lock and continues playing. */
2522 static gboolean
2523 gst_base_src_set_playing (GstBaseSrc * basesrc, gboolean live_play)
2524 {
2525   GstBaseSrcClass *bclass;
2526
2527   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2528
2529   /* unlock subclasses locked in ::create, we only do this when we stop playing. */
2530   if (!live_play) {
2531     GST_DEBUG_OBJECT (basesrc, "unlock");
2532     if (bclass->unlock)
2533       bclass->unlock (basesrc);
2534   }
2535
2536   /* we are now able to grab the LIVE lock, when we get it, we can be
2537    * waiting for PLAYING while blocked in the LIVE cond or we can be waiting
2538    * for the clock. */
2539   GST_LIVE_LOCK (basesrc);
2540   GST_DEBUG_OBJECT (basesrc, "unschedule clock");
2541
2542   /* unblock clock sync (if any) */
2543   if (basesrc->clock_id)
2544     gst_clock_id_unschedule (basesrc->clock_id);
2545
2546   /* configure what to do when we get to the LIVE lock. */
2547   GST_DEBUG_OBJECT (basesrc, "live running %d", live_play);
2548   basesrc->live_running = live_play;
2549
2550   if (live_play) {
2551     gboolean start;
2552
2553     /* clear our unlock request when going to PLAYING */
2554     GST_DEBUG_OBJECT (basesrc, "unlock stop");
2555     if (bclass->unlock_stop)
2556       bclass->unlock_stop (basesrc);
2557
2558     /* for live sources we restart the timestamp correction */
2559     basesrc->priv->latency = -1;
2560     /* have to restart the task in case it stopped because of the unlock when
2561      * we went to PAUSED. Only do this if we operating in push mode. */
2562     GST_OBJECT_LOCK (basesrc->srcpad);
2563     start = (GST_PAD_ACTIVATE_MODE (basesrc->srcpad) == GST_ACTIVATE_PUSH);
2564     GST_OBJECT_UNLOCK (basesrc->srcpad);
2565     if (start)
2566       gst_pad_start_task (basesrc->srcpad, (GstTaskFunction) gst_base_src_loop,
2567           basesrc->srcpad);
2568     GST_DEBUG_OBJECT (basesrc, "signal");
2569     GST_LIVE_SIGNAL (basesrc);
2570   }
2571   GST_LIVE_UNLOCK (basesrc);
2572
2573   return TRUE;
2574 }
2575
2576 static gboolean
2577 gst_base_src_activate_push (GstPad * pad, gboolean active)
2578 {
2579   GstBaseSrc *basesrc;
2580   GstEvent *event;
2581
2582   basesrc = GST_BASE_SRC (GST_OBJECT_PARENT (pad));
2583
2584   /* prepare subclass first */
2585   if (active) {
2586     GST_DEBUG_OBJECT (basesrc, "Activating in push mode");
2587
2588     if (G_UNLIKELY (!basesrc->can_activate_push))
2589       goto no_push_activation;
2590
2591     if (G_UNLIKELY (!gst_base_src_start (basesrc)))
2592       goto error_start;
2593
2594     basesrc->priv->last_sent_eos = FALSE;
2595     basesrc->priv->discont = TRUE;
2596     gst_base_src_set_flushing (basesrc, FALSE, FALSE, FALSE, NULL);
2597
2598     /* do initial seek, which will start the task */
2599     GST_OBJECT_LOCK (basesrc);
2600     event = basesrc->data.ABI.pending_seek;
2601     basesrc->data.ABI.pending_seek = NULL;
2602     GST_OBJECT_UNLOCK (basesrc);
2603
2604     /* no need to unlock anything, the task is certainly
2605      * not running here. The perform seek code will start the task when
2606      * finished. */
2607     if (G_UNLIKELY (!gst_base_src_perform_seek (basesrc, event, FALSE)))
2608       goto seek_failed;
2609
2610     if (event)
2611       gst_event_unref (event);
2612   } else {
2613     GST_DEBUG_OBJECT (basesrc, "Deactivating in push mode");
2614     /* flush all */
2615     gst_base_src_set_flushing (basesrc, TRUE, FALSE, TRUE, NULL);
2616     /* stop the task */
2617     gst_pad_stop_task (pad);
2618     /* now we can stop the source */
2619     if (G_UNLIKELY (!gst_base_src_stop (basesrc)))
2620       goto error_stop;
2621   }
2622   return TRUE;
2623
2624   /* ERRORS */
2625 no_push_activation:
2626   {
2627     GST_WARNING_OBJECT (basesrc, "Subclass disabled push-mode activation");
2628     return FALSE;
2629   }
2630 error_start:
2631   {
2632     GST_WARNING_OBJECT (basesrc, "Failed to start in push mode");
2633     return FALSE;
2634   }
2635 seek_failed:
2636   {
2637     GST_ERROR_OBJECT (basesrc, "Failed to perform initial seek");
2638     gst_base_src_stop (basesrc);
2639     if (event)
2640       gst_event_unref (event);
2641     return FALSE;
2642   }
2643 error_stop:
2644   {
2645     GST_DEBUG_OBJECT (basesrc, "Failed to stop in push mode");
2646     return FALSE;
2647   }
2648 }
2649
2650 static gboolean
2651 gst_base_src_activate_pull (GstPad * pad, gboolean active)
2652 {
2653   GstBaseSrc *basesrc;
2654
2655   basesrc = GST_BASE_SRC (GST_OBJECT_PARENT (pad));
2656
2657   /* prepare subclass first */
2658   if (active) {
2659     GST_DEBUG_OBJECT (basesrc, "Activating in pull mode");
2660     if (G_UNLIKELY (!gst_base_src_start (basesrc)))
2661       goto error_start;
2662
2663     /* if not random_access, we cannot operate in pull mode for now */
2664     if (G_UNLIKELY (!gst_base_src_check_get_range (basesrc)))
2665       goto no_get_range;
2666
2667     /* stop flushing now but for live sources, still block in the LIVE lock when
2668      * we are not yet PLAYING */
2669     gst_base_src_set_flushing (basesrc, FALSE, FALSE, FALSE, NULL);
2670   } else {
2671     GST_DEBUG_OBJECT (basesrc, "Deactivating in pull mode");
2672     /* flush all, there is no task to stop */
2673     gst_base_src_set_flushing (basesrc, TRUE, FALSE, TRUE, NULL);
2674
2675     /* don't send EOS when going from PAUSED => READY when in pull mode */
2676     basesrc->priv->last_sent_eos = TRUE;
2677
2678     if (G_UNLIKELY (!gst_base_src_stop (basesrc)))
2679       goto error_stop;
2680   }
2681   return TRUE;
2682
2683   /* ERRORS */
2684 error_start:
2685   {
2686     GST_ERROR_OBJECT (basesrc, "Failed to start in pull mode");
2687     return FALSE;
2688   }
2689 no_get_range:
2690   {
2691     GST_ERROR_OBJECT (basesrc, "Cannot operate in pull mode, stopping");
2692     gst_base_src_stop (basesrc);
2693     return FALSE;
2694   }
2695 error_stop:
2696   {
2697     GST_ERROR_OBJECT (basesrc, "Failed to stop in pull mode");
2698     return FALSE;
2699   }
2700 }
2701
2702 static GstStateChangeReturn
2703 gst_base_src_change_state (GstElement * element, GstStateChange transition)
2704 {
2705   GstBaseSrc *basesrc;
2706   GstStateChangeReturn result;
2707   gboolean no_preroll = FALSE;
2708
2709   basesrc = GST_BASE_SRC (element);
2710
2711   switch (transition) {
2712     case GST_STATE_CHANGE_NULL_TO_READY:
2713       break;
2714     case GST_STATE_CHANGE_READY_TO_PAUSED:
2715       no_preroll = gst_base_src_is_live (basesrc);
2716       break;
2717     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
2718       GST_DEBUG_OBJECT (basesrc, "PAUSED->PLAYING");
2719       if (gst_base_src_is_live (basesrc)) {
2720         /* now we can start playback */
2721         gst_base_src_set_playing (basesrc, TRUE);
2722       }
2723       break;
2724     default:
2725       break;
2726   }
2727
2728   if ((result =
2729           GST_ELEMENT_CLASS (parent_class)->change_state (element,
2730               transition)) == GST_STATE_CHANGE_FAILURE)
2731     goto failure;
2732
2733   switch (transition) {
2734     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
2735       GST_DEBUG_OBJECT (basesrc, "PLAYING->PAUSED");
2736       if (gst_base_src_is_live (basesrc)) {
2737         /* make sure we block in the live lock in PAUSED */
2738         gst_base_src_set_playing (basesrc, FALSE);
2739         no_preroll = TRUE;
2740       }
2741       break;
2742     case GST_STATE_CHANGE_PAUSED_TO_READY:
2743     {
2744       GstEvent **event_p;
2745
2746       /* we don't need to unblock anything here, the pad deactivation code
2747        * already did this */
2748
2749       /* FIXME, deprecate this behaviour, it is very dangerous.
2750        * the prefered way of sending EOS downstream is by sending
2751        * the EOS event to the element */
2752       if (!basesrc->priv->last_sent_eos) {
2753         GST_DEBUG_OBJECT (basesrc, "Sending EOS event");
2754         gst_pad_push_event (basesrc->srcpad, gst_event_new_eos ());
2755         basesrc->priv->last_sent_eos = TRUE;
2756       }
2757       g_atomic_int_set (&basesrc->priv->pending_eos, FALSE);
2758       event_p = &basesrc->data.ABI.pending_seek;
2759       gst_event_replace (event_p, NULL);
2760       event_p = &basesrc->priv->close_segment;
2761       gst_event_replace (event_p, NULL);
2762       event_p = &basesrc->priv->start_segment;
2763       gst_event_replace (event_p, NULL);
2764       break;
2765     }
2766     case GST_STATE_CHANGE_READY_TO_NULL:
2767       break;
2768     default:
2769       break;
2770   }
2771
2772   if (no_preroll && result == GST_STATE_CHANGE_SUCCESS)
2773     result = GST_STATE_CHANGE_NO_PREROLL;
2774
2775   return result;
2776
2777   /* ERRORS */
2778 failure:
2779   {
2780     GST_DEBUG_OBJECT (basesrc, "parent failed state change");
2781     return result;
2782   }
2783 }