basesrc: handle flush events on the element as well
[platform/upstream/gstreamer.git] / libs / gst / base / gstbasesrc.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *               2000,2005 Wim Taymans <wim@fluendo.com>
4  *
5  * gstbasesrc.c:
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20  * Boston, MA 02111-1307, USA.
21  */
22
23 /**
24  * SECTION:gstbasesrc
25  * @short_description: Base class for getrange based source elements
26  * @see_also: #GstPushSrc, #GstBaseTransform, #GstBaseSink
27  *
28  * This is a generice base class for source elements. The following
29  * types of sources are supported:
30  * <itemizedlist>
31  *   <listitem><para>random access sources like files</para></listitem>
32  *   <listitem><para>seekable sources</para></listitem>
33  *   <listitem><para>live sources</para></listitem>
34  * </itemizedlist>
35  *
36  * The source can be configured to operate in any #GstFormat with the
37  * gst_base_src_set_format() method. The currently set format determines
38  * the format of the internal #GstSegment and any #GST_EVENT_NEWSEGMENT
39  * events. The default format for #GstBaseSrc is #GST_FORMAT_BYTES.
40  *
41  * #GstBaseSrc always supports push mode scheduling. If the following
42  * conditions are met, it also supports pull mode scheduling:
43  * <itemizedlist>
44  *   <listitem><para>The format is set to #GST_FORMAT_BYTES (default).</para>
45  *   </listitem>
46  *   <listitem><para>#GstBaseSrcClass.is_seekable() returns %TRUE.</para>
47  *   </listitem>
48  * </itemizedlist>
49  *
50  * If all the conditions are met for operating in pull mode, #GstBaseSrc is
51  * automatically seekable in push mode as well. The following conditions must
52  * be met to make the element seekable in push mode when the format is not
53  * #GST_FORMAT_BYTES:
54  * <itemizedlist>
55  *   <listitem><para>
56  *     #GstBaseSrcClass.is_seekable() returns %TRUE.
57  *   </para></listitem>
58  *   <listitem><para>
59  *     #GstBaseSrcClass.query() can convert all supported seek formats to the
60  *     internal format as set with gst_base_src_set_format().
61  *   </para></listitem>
62  *   <listitem><para>
63  *     #GstBaseSrcClass.do_seek() is implemented, performs the seek and returns
64  *      %TRUE.
65  *   </para></listitem>
66  * </itemizedlist>
67  *
68  * When the element does not meet the requirements to operate in pull mode, the
69  * offset and length in the #GstBaseSrcClass.create() method should be ignored.
70  * It is recommended to subclass #GstPushSrc instead, in this situation. If the
71  * element can operate in pull mode but only with specific offsets and
72  * lengths, it is allowed to generate an error when the wrong values are passed
73  * to the #GstBaseSrcClass.create() function.
74  *
75  * #GstBaseSrc has support for live sources. Live sources are sources that when
76  * paused discard data, such as audio or video capture devices. A typical live
77  * source also produces data at a fixed rate and thus provides a clock to publish
78  * this rate.
79  * Use gst_base_src_set_live() to activate the live source mode.
80  *
81  * A live source does not produce data in the PAUSED state. This means that the
82  * #GstBaseSrcClass.create() method will not be called in PAUSED but only in
83  * PLAYING. To signal the pipeline that the element will not produce data, the
84  * return value from the READY to PAUSED state will be
85  * #GST_STATE_CHANGE_NO_PREROLL.
86  *
87  * A typical live source will timestamp the buffers it creates with the
88  * current running time of the pipeline. This is one reason why a live source
89  * can only produce data in the PLAYING state, when the clock is actually
90  * distributed and running.
91  *
92  * Live sources that synchronize and block on the clock (an audio source, for
93  * example) can since 0.10.12 use gst_base_src_wait_playing() when the
94  * #GstBaseSrcClass.create() function was interrupted by a state change to
95  * PAUSED.
96  *
97  * The #GstBaseSrcClass.get_times() method can be used to implement pseudo-live
98  * sources. It only makes sense to implement the #GstBaseSrcClass.get_times()
99  * function if the source is a live source. The #GstBaseSrcClass.get_times()
100  * function should return timestamps starting from 0, as if it were a non-live
101  * source. The base class will make sure that the timestamps are transformed
102  * into the current running_time. The base source will then wait for the
103  * calculated running_time before pushing out the buffer.
104  *
105  * For live sources, the base class will by default report a latency of 0.
106  * For pseudo live sources, the base class will by default measure the difference
107  * between the first buffer timestamp and the start time of get_times and will
108  * report this value as the latency.
109  * Subclasses should override the query function when this behaviour is not
110  * acceptable.
111  *
112  * There is only support in #GstBaseSrc for exactly one source pad, which
113  * should be named "src". A source implementation (subclass of #GstBaseSrc)
114  * should install a pad template in its class_init function, like so:
115  * |[
116  * static void
117  * my_element_class_init (GstMyElementClass *klass)
118  * {
119  *   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
120  *   // srctemplate should be a #GstStaticPadTemplate with direction
121  *   // #GST_PAD_SRC and name "src"
122  *   gst_element_class_add_pad_template (gstelement_class,
123  *       gst_static_pad_template_get (&amp;srctemplate));
124  *   // see #GstElementDetails
125  *   gst_element_class_set_details (gstelement_class, &amp;details);
126  * }
127  * ]|
128  *
129  * <refsect2>
130  * <title>Controlled shutdown of live sources in applications</title>
131  * <para>
132  * Applications that record from a live source may want to stop recording
133  * in a controlled way, so that the recording is stopped, but the data
134  * already in the pipeline is processed to the end (remember that many live
135  * sources would go on recording forever otherwise). For that to happen the
136  * application needs to make the source stop recording and send an EOS
137  * event down the pipeline. The application would then wait for an
138  * EOS message posted on the pipeline's bus to know when all data has
139  * been processed and the pipeline can safely be stopped.
140  *
141  * Since GStreamer 0.10.16 an application may send an EOS event to a source
142  * element to make it perform the EOS logic (send EOS event downstream or post a
143  * #GST_MESSAGE_SEGMENT_DONE on the bus). This can typically be done
144  * with the gst_element_send_event() function on the element or its parent bin.
145  *
146  * After the EOS has been sent to the element, the application should wait for
147  * an EOS message to be posted on the pipeline's bus. Once this EOS message is
148  * received, it may safely shut down the entire pipeline.
149  *
150  * Last reviewed on 2007-12-19 (0.10.16)
151  * </para>
152  * </refsect2>
153  */
154
155 #ifdef HAVE_CONFIG_H
156 #  include "config.h"
157 #endif
158
159 #include <stdlib.h>
160 #include <string.h>
161
162 #include <gst/gst_private.h>
163 #include <gst/glib-compat-private.h>
164
165 #include "gstbasesrc.h"
166 #include "gsttypefindhelper.h"
167 #include <gst/gst-i18n-lib.h>
168
169 GST_DEBUG_CATEGORY_STATIC (gst_base_src_debug);
170 #define GST_CAT_DEFAULT gst_base_src_debug
171
172 #define GST_LIVE_GET_LOCK(elem)               (&GST_BASE_SRC_CAST(elem)->live_lock)
173 #define GST_LIVE_LOCK(elem)                   g_mutex_lock(GST_LIVE_GET_LOCK(elem))
174 #define GST_LIVE_TRYLOCK(elem)                g_mutex_trylock(GST_LIVE_GET_LOCK(elem))
175 #define GST_LIVE_UNLOCK(elem)                 g_mutex_unlock(GST_LIVE_GET_LOCK(elem))
176 #define GST_LIVE_GET_COND(elem)               (&GST_BASE_SRC_CAST(elem)->live_cond)
177 #define GST_LIVE_WAIT(elem)                   g_cond_wait (GST_LIVE_GET_COND (elem), GST_LIVE_GET_LOCK (elem))
178 #define GST_LIVE_WAIT_UNTIL(elem, end_time)   g_cond_timed_wait (GST_LIVE_GET_COND (elem), GST_LIVE_GET_LOCK (elem), end_time)
179 #define GST_LIVE_SIGNAL(elem)                 g_cond_signal (GST_LIVE_GET_COND (elem));
180 #define GST_LIVE_BROADCAST(elem)              g_cond_broadcast (GST_LIVE_GET_COND (elem));
181
182 /* BaseSrc signals and args */
183 enum
184 {
185   /* FILL ME */
186   LAST_SIGNAL
187 };
188
189 #define DEFAULT_BLOCKSIZE       4096
190 #define DEFAULT_NUM_BUFFERS     -1
191 #define DEFAULT_TYPEFIND        FALSE
192 #define DEFAULT_DO_TIMESTAMP    FALSE
193
194 enum
195 {
196   PROP_0,
197   PROP_BLOCKSIZE,
198   PROP_NUM_BUFFERS,
199   PROP_TYPEFIND,
200   PROP_DO_TIMESTAMP
201 };
202
203 #define GST_BASE_SRC_GET_PRIVATE(obj)  \
204    (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_BASE_SRC, GstBaseSrcPrivate))
205
206 struct _GstBaseSrcPrivate
207 {
208   gboolean discont;
209   gboolean flushing;
210
211   GstFlowReturn start_result;
212   gboolean async;
213
214   /* if a stream-start event should be sent */
215   gboolean stream_start_pending;
216
217   /* if segment should be sent */
218   gboolean segment_pending;
219
220   /* if EOS is pending (atomic) */
221   gint pending_eos;
222
223   /* startup latency is the time it takes between going to PLAYING and producing
224    * the first BUFFER with running_time 0. This value is included in the latency
225    * reporting. */
226   GstClockTime latency;
227   /* timestamp offset, this is the offset add to the values of gst_times for
228    * pseudo live sources */
229   GstClockTimeDiff ts_offset;
230
231   gboolean do_timestamp;
232   volatile gint dynamic_size;
233
234   /* stream sequence number */
235   guint32 seqnum;
236
237   /* pending events (TAG, CUSTOM_BOTH, CUSTOM_DOWNSTREAM) to be
238    * pushed in the data stream */
239   GList *pending_events;
240   volatile gint have_events;
241
242   /* QoS *//* with LOCK */
243   gboolean qos_enabled;
244   gdouble proportion;
245   GstClockTime earliest_time;
246
247   GstBufferPool *pool;
248   GstAllocator *allocator;
249   GstAllocationParams params;
250 };
251
252 static GstElementClass *parent_class = NULL;
253
254 static void gst_base_src_class_init (GstBaseSrcClass * klass);
255 static void gst_base_src_init (GstBaseSrc * src, gpointer g_class);
256 static void gst_base_src_finalize (GObject * object);
257
258
259 GType
260 gst_base_src_get_type (void)
261 {
262   static volatile gsize base_src_type = 0;
263
264   if (g_once_init_enter (&base_src_type)) {
265     GType _type;
266     static const GTypeInfo base_src_info = {
267       sizeof (GstBaseSrcClass),
268       NULL,
269       NULL,
270       (GClassInitFunc) gst_base_src_class_init,
271       NULL,
272       NULL,
273       sizeof (GstBaseSrc),
274       0,
275       (GInstanceInitFunc) gst_base_src_init,
276     };
277
278     _type = g_type_register_static (GST_TYPE_ELEMENT,
279         "GstBaseSrc", &base_src_info, G_TYPE_FLAG_ABSTRACT);
280     g_once_init_leave (&base_src_type, _type);
281   }
282   return base_src_type;
283 }
284
285 static GstCaps *gst_base_src_default_get_caps (GstBaseSrc * bsrc,
286     GstCaps * filter);
287 static GstCaps *gst_base_src_default_fixate (GstBaseSrc * src, GstCaps * caps);
288 static GstCaps *gst_base_src_fixate (GstBaseSrc * src, GstCaps * caps);
289
290 static gboolean gst_base_src_is_random_access (GstBaseSrc * src);
291 static gboolean gst_base_src_activate_mode (GstPad * pad, GstObject * parent,
292     GstPadMode mode, gboolean active);
293 static void gst_base_src_set_property (GObject * object, guint prop_id,
294     const GValue * value, GParamSpec * pspec);
295 static void gst_base_src_get_property (GObject * object, guint prop_id,
296     GValue * value, GParamSpec * pspec);
297 static gboolean gst_base_src_event (GstPad * pad, GstObject * parent,
298     GstEvent * event);
299 static gboolean gst_base_src_send_event (GstElement * elem, GstEvent * event);
300 static gboolean gst_base_src_default_event (GstBaseSrc * src, GstEvent * event);
301
302 static gboolean gst_base_src_query (GstPad * pad, GstObject * parent,
303     GstQuery * query);
304
305 static gboolean gst_base_src_activate_pool (GstBaseSrc * basesrc,
306     gboolean active);
307 static gboolean gst_base_src_default_negotiate (GstBaseSrc * basesrc);
308 static gboolean gst_base_src_default_do_seek (GstBaseSrc * src,
309     GstSegment * segment);
310 static gboolean gst_base_src_default_query (GstBaseSrc * src, GstQuery * query);
311 static gboolean gst_base_src_default_prepare_seek_segment (GstBaseSrc * src,
312     GstEvent * event, GstSegment * segment);
313 static GstFlowReturn gst_base_src_default_create (GstBaseSrc * basesrc,
314     guint64 offset, guint size, GstBuffer ** buf);
315 static GstFlowReturn gst_base_src_default_alloc (GstBaseSrc * basesrc,
316     guint64 offset, guint size, GstBuffer ** buf);
317 static gboolean gst_base_src_decide_allocation_default (GstBaseSrc * basesrc,
318     GstQuery * query);
319
320 static gboolean gst_base_src_set_flushing (GstBaseSrc * basesrc,
321     gboolean flushing, gboolean live_play, gboolean * playing);
322
323 static gboolean gst_base_src_start (GstBaseSrc * basesrc);
324 static gboolean gst_base_src_stop (GstBaseSrc * basesrc);
325
326 static GstStateChangeReturn gst_base_src_change_state (GstElement * element,
327     GstStateChange transition);
328
329 static void gst_base_src_loop (GstPad * pad);
330 static GstFlowReturn gst_base_src_getrange (GstPad * pad, GstObject * parent,
331     guint64 offset, guint length, GstBuffer ** buf);
332 static GstFlowReturn gst_base_src_get_range (GstBaseSrc * src, guint64 offset,
333     guint length, GstBuffer ** buf);
334 static gboolean gst_base_src_seekable (GstBaseSrc * src);
335 static gboolean gst_base_src_negotiate (GstBaseSrc * basesrc);
336 static gboolean gst_base_src_update_length (GstBaseSrc * src, guint64 offset,
337     guint * length);
338
339 static void
340 gst_base_src_class_init (GstBaseSrcClass * klass)
341 {
342   GObjectClass *gobject_class;
343   GstElementClass *gstelement_class;
344
345   gobject_class = G_OBJECT_CLASS (klass);
346   gstelement_class = GST_ELEMENT_CLASS (klass);
347
348   GST_DEBUG_CATEGORY_INIT (gst_base_src_debug, "basesrc", 0, "basesrc element");
349
350   g_type_class_add_private (klass, sizeof (GstBaseSrcPrivate));
351
352   parent_class = g_type_class_peek_parent (klass);
353
354   gobject_class->finalize = gst_base_src_finalize;
355   gobject_class->set_property = gst_base_src_set_property;
356   gobject_class->get_property = gst_base_src_get_property;
357
358   g_object_class_install_property (gobject_class, PROP_BLOCKSIZE,
359       g_param_spec_uint ("blocksize", "Block size",
360           "Size in bytes to read per buffer (-1 = default)", 0, G_MAXUINT,
361           DEFAULT_BLOCKSIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
362   g_object_class_install_property (gobject_class, PROP_NUM_BUFFERS,
363       g_param_spec_int ("num-buffers", "num-buffers",
364           "Number of buffers to output before sending EOS (-1 = unlimited)",
365           -1, G_MAXINT, DEFAULT_NUM_BUFFERS, G_PARAM_READWRITE |
366           G_PARAM_STATIC_STRINGS));
367   g_object_class_install_property (gobject_class, PROP_TYPEFIND,
368       g_param_spec_boolean ("typefind", "Typefind",
369           "Run typefind before negotiating", DEFAULT_TYPEFIND,
370           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
371   g_object_class_install_property (gobject_class, PROP_DO_TIMESTAMP,
372       g_param_spec_boolean ("do-timestamp", "Do timestamp",
373           "Apply current stream time to buffers", DEFAULT_DO_TIMESTAMP,
374           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
375
376   gstelement_class->change_state =
377       GST_DEBUG_FUNCPTR (gst_base_src_change_state);
378   gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_base_src_send_event);
379
380   klass->get_caps = GST_DEBUG_FUNCPTR (gst_base_src_default_get_caps);
381   klass->negotiate = GST_DEBUG_FUNCPTR (gst_base_src_default_negotiate);
382   klass->fixate = GST_DEBUG_FUNCPTR (gst_base_src_default_fixate);
383   klass->prepare_seek_segment =
384       GST_DEBUG_FUNCPTR (gst_base_src_default_prepare_seek_segment);
385   klass->do_seek = GST_DEBUG_FUNCPTR (gst_base_src_default_do_seek);
386   klass->query = GST_DEBUG_FUNCPTR (gst_base_src_default_query);
387   klass->event = GST_DEBUG_FUNCPTR (gst_base_src_default_event);
388   klass->create = GST_DEBUG_FUNCPTR (gst_base_src_default_create);
389   klass->alloc = GST_DEBUG_FUNCPTR (gst_base_src_default_alloc);
390   klass->decide_allocation =
391       GST_DEBUG_FUNCPTR (gst_base_src_decide_allocation_default);
392
393   /* Registering debug symbols for function pointers */
394   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_activate_mode);
395   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_event);
396   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_query);
397   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_getrange);
398   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_fixate);
399 }
400
401 static void
402 gst_base_src_init (GstBaseSrc * basesrc, gpointer g_class)
403 {
404   GstPad *pad;
405   GstPadTemplate *pad_template;
406
407   basesrc->priv = GST_BASE_SRC_GET_PRIVATE (basesrc);
408
409   basesrc->is_live = FALSE;
410   g_mutex_init (&basesrc->live_lock);
411   g_cond_init (&basesrc->live_cond);
412   basesrc->num_buffers = DEFAULT_NUM_BUFFERS;
413   basesrc->num_buffers_left = -1;
414
415   basesrc->can_activate_push = TRUE;
416
417   pad_template =
418       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (g_class), "src");
419   g_return_if_fail (pad_template != NULL);
420
421   GST_DEBUG_OBJECT (basesrc, "creating src pad");
422   pad = gst_pad_new_from_template (pad_template, "src");
423
424   GST_DEBUG_OBJECT (basesrc, "setting functions on src pad");
425   gst_pad_set_activatemode_function (pad, gst_base_src_activate_mode);
426   gst_pad_set_event_function (pad, gst_base_src_event);
427   gst_pad_set_query_function (pad, gst_base_src_query);
428   gst_pad_set_getrange_function (pad, gst_base_src_getrange);
429
430   /* hold pointer to pad */
431   basesrc->srcpad = pad;
432   GST_DEBUG_OBJECT (basesrc, "adding src pad");
433   gst_element_add_pad (GST_ELEMENT (basesrc), pad);
434
435   basesrc->blocksize = DEFAULT_BLOCKSIZE;
436   basesrc->clock_id = NULL;
437   /* we operate in BYTES by default */
438   gst_base_src_set_format (basesrc, GST_FORMAT_BYTES);
439   basesrc->typefind = DEFAULT_TYPEFIND;
440   basesrc->priv->do_timestamp = DEFAULT_DO_TIMESTAMP;
441   g_atomic_int_set (&basesrc->priv->have_events, FALSE);
442
443   basesrc->priv->start_result = GST_FLOW_FLUSHING;
444   GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTED);
445   GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTING);
446   GST_OBJECT_FLAG_SET (basesrc, GST_ELEMENT_FLAG_SOURCE);
447
448   GST_DEBUG_OBJECT (basesrc, "init done");
449 }
450
451 static void
452 gst_base_src_finalize (GObject * object)
453 {
454   GstBaseSrc *basesrc;
455   GstEvent **event_p;
456
457   basesrc = GST_BASE_SRC (object);
458
459   g_mutex_clear (&basesrc->live_lock);
460   g_cond_clear (&basesrc->live_cond);
461
462   event_p = &basesrc->pending_seek;
463   gst_event_replace (event_p, NULL);
464
465   if (basesrc->priv->pending_events) {
466     g_list_foreach (basesrc->priv->pending_events, (GFunc) gst_event_unref,
467         NULL);
468     g_list_free (basesrc->priv->pending_events);
469   }
470
471   G_OBJECT_CLASS (parent_class)->finalize (object);
472 }
473
474 /**
475  * gst_base_src_wait_playing:
476  * @src: the src
477  *
478  * If the #GstBaseSrcClass.create() method performs its own synchronisation
479  * against the clock it must unblock when going from PLAYING to the PAUSED state
480  * and call this method before continuing to produce the remaining data.
481  *
482  * This function will block until a state change to PLAYING happens (in which
483  * case this function returns #GST_FLOW_OK) or the processing must be stopped due
484  * to a state change to READY or a FLUSH event (in which case this function
485  * returns #GST_FLOW_FLUSHING).
486  *
487  * Since: 0.10.12
488  *
489  * Returns: #GST_FLOW_OK if @src is PLAYING and processing can
490  * continue. Any other return value should be returned from the create vmethod.
491  */
492 GstFlowReturn
493 gst_base_src_wait_playing (GstBaseSrc * src)
494 {
495   g_return_val_if_fail (GST_IS_BASE_SRC (src), GST_FLOW_ERROR);
496
497   do {
498     /* block until the state changes, or we get a flush, or something */
499     GST_DEBUG_OBJECT (src, "live source waiting for running state");
500     GST_LIVE_WAIT (src);
501     GST_DEBUG_OBJECT (src, "live source unlocked");
502     if (src->priv->flushing)
503       goto flushing;
504   } while (G_UNLIKELY (!src->live_running));
505
506   return GST_FLOW_OK;
507
508   /* ERRORS */
509 flushing:
510   {
511     GST_DEBUG_OBJECT (src, "we are flushing");
512     return GST_FLOW_FLUSHING;
513   }
514 }
515
516 /**
517  * gst_base_src_set_live:
518  * @src: base source instance
519  * @live: new live-mode
520  *
521  * If the element listens to a live source, @live should
522  * be set to %TRUE.
523  *
524  * A live source will not produce data in the PAUSED state and
525  * will therefore not be able to participate in the PREROLL phase
526  * of a pipeline. To signal this fact to the application and the
527  * pipeline, the state change return value of the live source will
528  * be GST_STATE_CHANGE_NO_PREROLL.
529  */
530 void
531 gst_base_src_set_live (GstBaseSrc * src, gboolean live)
532 {
533   g_return_if_fail (GST_IS_BASE_SRC (src));
534
535   GST_OBJECT_LOCK (src);
536   src->is_live = live;
537   GST_OBJECT_UNLOCK (src);
538 }
539
540 /**
541  * gst_base_src_is_live:
542  * @src: base source instance
543  *
544  * Check if an element is in live mode.
545  *
546  * Returns: %TRUE if element is in live mode.
547  */
548 gboolean
549 gst_base_src_is_live (GstBaseSrc * src)
550 {
551   gboolean result;
552
553   g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
554
555   GST_OBJECT_LOCK (src);
556   result = src->is_live;
557   GST_OBJECT_UNLOCK (src);
558
559   return result;
560 }
561
562 /**
563  * gst_base_src_set_format:
564  * @src: base source instance
565  * @format: the format to use
566  *
567  * Sets the default format of the source. This will be the format used
568  * for sending NEW_SEGMENT events and for performing seeks.
569  *
570  * If a format of GST_FORMAT_BYTES is set, the element will be able to
571  * operate in pull mode if the #GstBaseSrcClass.is_seekable() returns TRUE.
572  *
573  * This function must only be called in states < %GST_STATE_PAUSED.
574  *
575  * Since: 0.10.1
576  */
577 void
578 gst_base_src_set_format (GstBaseSrc * src, GstFormat format)
579 {
580   g_return_if_fail (GST_IS_BASE_SRC (src));
581   g_return_if_fail (GST_STATE (src) <= GST_STATE_READY);
582
583   GST_OBJECT_LOCK (src);
584   gst_segment_init (&src->segment, format);
585   GST_OBJECT_UNLOCK (src);
586 }
587
588 /**
589  * gst_base_src_set_dynamic_size:
590  * @src: base source instance
591  * @dynamic: new dynamic size mode
592  *
593  * If not @dynamic, size is only updated when needed, such as when trying to
594  * read past current tracked size.  Otherwise, size is checked for upon each
595  * read.
596  *
597  * Since: 0.10.36
598  */
599 void
600 gst_base_src_set_dynamic_size (GstBaseSrc * src, gboolean dynamic)
601 {
602   g_return_if_fail (GST_IS_BASE_SRC (src));
603
604   g_atomic_int_set (&src->priv->dynamic_size, dynamic);
605 }
606
607 /**
608  * gst_base_src_set_async:
609  * @src: base source instance
610  * @async: new async mode
611  *
612  * Configure async behaviour in @src, no state change will block. The open,
613  * close, start, stop, play and pause virtual methods will be executed in a
614  * different thread and are thus allowed to perform blocking operations. Any
615  * blocking operation should be unblocked with the unlock vmethod.
616  */
617 void
618 gst_base_src_set_async (GstBaseSrc * src, gboolean async)
619 {
620   g_return_if_fail (GST_IS_BASE_SRC (src));
621
622   GST_OBJECT_LOCK (src);
623   src->priv->async = async;
624   GST_OBJECT_UNLOCK (src);
625 }
626
627 /**
628  * gst_base_src_is_async:
629  * @src: base source instance
630  *
631  * Get the current async behaviour of @src. See also gst_base_src_set_async().
632  *
633  * Returns: %TRUE if @src is operating in async mode.
634  */
635 gboolean
636 gst_base_src_is_async (GstBaseSrc * src)
637 {
638   gboolean res;
639
640   g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
641
642   GST_OBJECT_LOCK (src);
643   res = src->priv->async;
644   GST_OBJECT_UNLOCK (src);
645
646   return res;
647 }
648
649
650 /**
651  * gst_base_src_query_latency:
652  * @src: the source
653  * @live: (out) (allow-none): if the source is live
654  * @min_latency: (out) (allow-none): the min latency of the source
655  * @max_latency: (out) (allow-none): the max latency of the source
656  *
657  * Query the source for the latency parameters. @live will be TRUE when @src is
658  * configured as a live source. @min_latency will be set to the difference
659  * between the running time and the timestamp of the first buffer.
660  * @max_latency is always the undefined value of -1.
661  *
662  * This function is mostly used by subclasses.
663  *
664  * Returns: TRUE if the query succeeded.
665  *
666  * Since: 0.10.13
667  */
668 gboolean
669 gst_base_src_query_latency (GstBaseSrc * src, gboolean * live,
670     GstClockTime * min_latency, GstClockTime * max_latency)
671 {
672   GstClockTime min;
673
674   g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
675
676   GST_OBJECT_LOCK (src);
677   if (live)
678     *live = src->is_live;
679
680   /* if we have a startup latency, report this one, else report 0. Subclasses
681    * are supposed to override the query function if they want something
682    * else. */
683   if (src->priv->latency != -1)
684     min = src->priv->latency;
685   else
686     min = 0;
687
688   if (min_latency)
689     *min_latency = min;
690   if (max_latency)
691     *max_latency = -1;
692
693   GST_LOG_OBJECT (src, "latency: live %d, min %" GST_TIME_FORMAT
694       ", max %" GST_TIME_FORMAT, src->is_live, GST_TIME_ARGS (min),
695       GST_TIME_ARGS (-1));
696   GST_OBJECT_UNLOCK (src);
697
698   return TRUE;
699 }
700
701 /**
702  * gst_base_src_set_blocksize:
703  * @src: the source
704  * @blocksize: the new blocksize in bytes
705  *
706  * Set the number of bytes that @src will push out with each buffer. When
707  * @blocksize is set to -1, a default length will be used.
708  *
709  * Since: 0.10.22
710  */
711 void
712 gst_base_src_set_blocksize (GstBaseSrc * src, guint blocksize)
713 {
714   g_return_if_fail (GST_IS_BASE_SRC (src));
715
716   GST_OBJECT_LOCK (src);
717   src->blocksize = blocksize;
718   GST_OBJECT_UNLOCK (src);
719 }
720
721 /**
722  * gst_base_src_get_blocksize:
723  * @src: the source
724  *
725  * Get the number of bytes that @src will push out with each buffer.
726  *
727  * Returns: the number of bytes pushed with each buffer.
728  *
729  * Since: 0.10.22
730  */
731 guint
732 gst_base_src_get_blocksize (GstBaseSrc * src)
733 {
734   gint res;
735
736   g_return_val_if_fail (GST_IS_BASE_SRC (src), 0);
737
738   GST_OBJECT_LOCK (src);
739   res = src->blocksize;
740   GST_OBJECT_UNLOCK (src);
741
742   return res;
743 }
744
745
746 /**
747  * gst_base_src_set_do_timestamp:
748  * @src: the source
749  * @timestamp: enable or disable timestamping
750  *
751  * Configure @src to automatically timestamp outgoing buffers based on the
752  * current running_time of the pipeline. This property is mostly useful for live
753  * sources.
754  *
755  * Since: 0.10.15
756  */
757 void
758 gst_base_src_set_do_timestamp (GstBaseSrc * src, gboolean timestamp)
759 {
760   g_return_if_fail (GST_IS_BASE_SRC (src));
761
762   GST_OBJECT_LOCK (src);
763   src->priv->do_timestamp = timestamp;
764   GST_OBJECT_UNLOCK (src);
765 }
766
767 /**
768  * gst_base_src_get_do_timestamp:
769  * @src: the source
770  *
771  * Query if @src timestamps outgoing buffers based on the current running_time.
772  *
773  * Returns: %TRUE if the base class will automatically timestamp outgoing buffers.
774  *
775  * Since: 0.10.15
776  */
777 gboolean
778 gst_base_src_get_do_timestamp (GstBaseSrc * src)
779 {
780   gboolean res;
781
782   g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
783
784   GST_OBJECT_LOCK (src);
785   res = src->priv->do_timestamp;
786   GST_OBJECT_UNLOCK (src);
787
788   return res;
789 }
790
791 /**
792  * gst_base_src_new_seamless_segment:
793  * @src: The source
794  * @start: The new start value for the segment
795  * @stop: Stop value for the new segment
796  * @position: The position value for the new segent
797  *
798  * Prepare a new seamless segment for emission downstream. This function must
799  * only be called by derived sub-classes, and only from the create() function,
800  * as the stream-lock needs to be held.
801  *
802  * The format for the new segment will be the current format of the source, as
803  * configured with gst_base_src_set_format()
804  *
805  * Returns: %TRUE if preparation of the seamless segment succeeded.
806  *
807  * Since: 0.10.26
808  */
809 gboolean
810 gst_base_src_new_seamless_segment (GstBaseSrc * src, gint64 start, gint64 stop,
811     gint64 position)
812 {
813   gboolean res = TRUE;
814
815   GST_DEBUG_OBJECT (src,
816       "Starting new seamless segment. Start %" GST_TIME_FORMAT " stop %"
817       GST_TIME_FORMAT " position %" GST_TIME_FORMAT, GST_TIME_ARGS (start),
818       GST_TIME_ARGS (stop), GST_TIME_ARGS (position));
819
820   GST_OBJECT_LOCK (src);
821
822   src->segment.base = gst_segment_to_running_time (&src->segment,
823       src->segment.format, src->segment.position);
824   src->segment.start = start;
825   src->segment.stop = stop;
826   src->segment.position = position;
827
828   /* forward, we send data from position to stop */
829   src->priv->segment_pending = TRUE;
830   GST_OBJECT_UNLOCK (src);
831
832   src->priv->discont = TRUE;
833   src->running = TRUE;
834
835   return res;
836 }
837
838 static gboolean
839 gst_base_src_send_stream_start (GstBaseSrc * src)
840 {
841   gboolean ret = TRUE;
842
843   if (src->priv->stream_start_pending) {
844     ret = gst_pad_push_event (src->srcpad, gst_event_new_stream_start ());
845     src->priv->stream_start_pending = FALSE;
846   }
847
848   return ret;
849 }
850
851 /**
852  * gst_base_src_set_caps:
853  * @src: a #GstBaseSrc
854  * @caps: a #GstCaps
855  *
856  * Set new caps on the basesrc source pad.
857  *
858  * Returns: %TRUE if the caps could be set
859  */
860 gboolean
861 gst_base_src_set_caps (GstBaseSrc * src, GstCaps * caps)
862 {
863   GstBaseSrcClass *bclass;
864   gboolean res = TRUE;
865
866   bclass = GST_BASE_SRC_GET_CLASS (src);
867
868   gst_base_src_send_stream_start (src);
869
870   if (bclass->set_caps)
871     res = bclass->set_caps (src, caps);
872   if (res)
873     res = gst_pad_set_caps (src->srcpad, caps);
874
875   return res;
876 }
877
878 static GstCaps *
879 gst_base_src_default_get_caps (GstBaseSrc * bsrc, GstCaps * filter)
880 {
881   GstCaps *caps = NULL;
882   GstPadTemplate *pad_template;
883   GstBaseSrcClass *bclass;
884
885   bclass = GST_BASE_SRC_GET_CLASS (bsrc);
886
887   pad_template =
888       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (bclass), "src");
889
890   if (pad_template != NULL) {
891     caps = gst_pad_template_get_caps (pad_template);
892
893     if (filter) {
894       GstCaps *intersection;
895
896       intersection =
897           gst_caps_intersect_full (filter, caps, GST_CAPS_INTERSECT_FIRST);
898       gst_caps_unref (caps);
899       caps = intersection;
900     }
901   }
902   return caps;
903 }
904
905 static GstCaps *
906 gst_base_src_default_fixate (GstBaseSrc * bsrc, GstCaps * caps)
907 {
908   GST_DEBUG_OBJECT (bsrc, "using default caps fixate function");
909   return gst_caps_fixate (caps);
910 }
911
912 static GstCaps *
913 gst_base_src_fixate (GstBaseSrc * bsrc, GstCaps * caps)
914 {
915   GstBaseSrcClass *bclass;
916
917   bclass = GST_BASE_SRC_GET_CLASS (bsrc);
918
919   if (bclass->fixate)
920     caps = bclass->fixate (bsrc, caps);
921
922   return caps;
923 }
924
925 static gboolean
926 gst_base_src_default_query (GstBaseSrc * src, GstQuery * query)
927 {
928   gboolean res;
929
930   switch (GST_QUERY_TYPE (query)) {
931     case GST_QUERY_POSITION:
932     {
933       GstFormat format;
934
935       gst_query_parse_position (query, &format, NULL);
936
937       GST_DEBUG_OBJECT (src, "position query in format %s",
938           gst_format_get_name (format));
939
940       switch (format) {
941         case GST_FORMAT_PERCENT:
942         {
943           gint64 percent;
944           gint64 position;
945           gint64 duration;
946
947           GST_OBJECT_LOCK (src);
948           position = src->segment.position;
949           duration = src->segment.duration;
950           GST_OBJECT_UNLOCK (src);
951
952           if (position != -1 && duration != -1) {
953             if (position < duration)
954               percent = gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX, position,
955                   duration);
956             else
957               percent = GST_FORMAT_PERCENT_MAX;
958           } else
959             percent = -1;
960
961           gst_query_set_position (query, GST_FORMAT_PERCENT, percent);
962           res = TRUE;
963           break;
964         }
965         default:
966         {
967           gint64 position;
968           GstFormat seg_format;
969
970           GST_OBJECT_LOCK (src);
971           position =
972               gst_segment_to_stream_time (&src->segment, src->segment.format,
973               src->segment.position);
974           seg_format = src->segment.format;
975           GST_OBJECT_UNLOCK (src);
976
977           if (position != -1) {
978             /* convert to requested format */
979             res =
980                 gst_pad_query_convert (src->srcpad, seg_format,
981                 position, format, &position);
982           } else
983             res = TRUE;
984
985           gst_query_set_position (query, format, position);
986           break;
987         }
988       }
989       break;
990     }
991     case GST_QUERY_DURATION:
992     {
993       GstFormat format;
994
995       gst_query_parse_duration (query, &format, NULL);
996
997       GST_DEBUG_OBJECT (src, "duration query in format %s",
998           gst_format_get_name (format));
999
1000       switch (format) {
1001         case GST_FORMAT_PERCENT:
1002           gst_query_set_duration (query, GST_FORMAT_PERCENT,
1003               GST_FORMAT_PERCENT_MAX);
1004           res = TRUE;
1005           break;
1006         default:
1007         {
1008           gint64 duration;
1009           GstFormat seg_format;
1010           guint length = 0;
1011
1012           /* may have to refresh duration */
1013           if (g_atomic_int_get (&src->priv->dynamic_size))
1014             gst_base_src_update_length (src, 0, &length);
1015
1016           /* this is the duration as configured by the subclass. */
1017           GST_OBJECT_LOCK (src);
1018           duration = src->segment.duration;
1019           seg_format = src->segment.format;
1020           GST_OBJECT_UNLOCK (src);
1021
1022           GST_LOG_OBJECT (src, "duration %" G_GINT64_FORMAT ", format %s",
1023               duration, gst_format_get_name (seg_format));
1024
1025           if (duration != -1) {
1026             /* convert to requested format, if this fails, we have a duration
1027              * but we cannot answer the query, we must return FALSE. */
1028             res =
1029                 gst_pad_query_convert (src->srcpad, seg_format,
1030                 duration, format, &duration);
1031           } else {
1032             /* The subclass did not configure a duration, we assume that the
1033              * media has an unknown duration then and we return TRUE to report
1034              * this. Note that this is not the same as returning FALSE, which
1035              * means that we cannot report the duration at all. */
1036             res = TRUE;
1037           }
1038           gst_query_set_duration (query, format, duration);
1039           break;
1040         }
1041       }
1042       break;
1043     }
1044
1045     case GST_QUERY_SEEKING:
1046     {
1047       GstFormat format, seg_format;
1048       gint64 duration;
1049
1050       GST_OBJECT_LOCK (src);
1051       duration = src->segment.duration;
1052       seg_format = src->segment.format;
1053       GST_OBJECT_UNLOCK (src);
1054
1055       gst_query_parse_seeking (query, &format, NULL, NULL, NULL);
1056       if (format == seg_format) {
1057         gst_query_set_seeking (query, seg_format,
1058             gst_base_src_seekable (src), 0, duration);
1059         res = TRUE;
1060       } else {
1061         /* FIXME 0.11: return TRUE + seekable=FALSE for SEEKING query here */
1062         /* Don't reply to the query to make up for demuxers which don't
1063          * handle the SEEKING query yet. Players like Totem will fall back
1064          * to the duration when the SEEKING query isn't answered. */
1065         res = FALSE;
1066       }
1067       break;
1068     }
1069     case GST_QUERY_SEGMENT:
1070     {
1071       gint64 start, stop;
1072
1073       GST_OBJECT_LOCK (src);
1074       /* no end segment configured, current duration then */
1075       if ((stop = src->segment.stop) == -1)
1076         stop = src->segment.duration;
1077       start = src->segment.start;
1078
1079       /* adjust to stream time */
1080       if (src->segment.time != -1) {
1081         start -= src->segment.time;
1082         if (stop != -1)
1083           stop -= src->segment.time;
1084       }
1085
1086       gst_query_set_segment (query, src->segment.rate, src->segment.format,
1087           start, stop);
1088       GST_OBJECT_UNLOCK (src);
1089       res = TRUE;
1090       break;
1091     }
1092
1093     case GST_QUERY_FORMATS:
1094     {
1095       gst_query_set_formats (query, 3, GST_FORMAT_DEFAULT,
1096           GST_FORMAT_BYTES, GST_FORMAT_PERCENT);
1097       res = TRUE;
1098       break;
1099     }
1100     case GST_QUERY_CONVERT:
1101     {
1102       GstFormat src_fmt, dest_fmt;
1103       gint64 src_val, dest_val;
1104
1105       gst_query_parse_convert (query, &src_fmt, &src_val, &dest_fmt, &dest_val);
1106
1107       /* we can only convert between equal formats... */
1108       if (src_fmt == dest_fmt) {
1109         dest_val = src_val;
1110         res = TRUE;
1111       } else
1112         res = FALSE;
1113
1114       gst_query_set_convert (query, src_fmt, src_val, dest_fmt, dest_val);
1115       break;
1116     }
1117     case GST_QUERY_LATENCY:
1118     {
1119       GstClockTime min, max;
1120       gboolean live;
1121
1122       /* Subclasses should override and implement something useful */
1123       res = gst_base_src_query_latency (src, &live, &min, &max);
1124
1125       GST_LOG_OBJECT (src, "report latency: live %d, min %" GST_TIME_FORMAT
1126           ", max %" GST_TIME_FORMAT, live, GST_TIME_ARGS (min),
1127           GST_TIME_ARGS (max));
1128
1129       gst_query_set_latency (query, live, min, max);
1130       break;
1131     }
1132     case GST_QUERY_JITTER:
1133     case GST_QUERY_RATE:
1134       res = FALSE;
1135       break;
1136     case GST_QUERY_BUFFERING:
1137     {
1138       GstFormat format, seg_format;
1139       gint64 start, stop, estimated;
1140
1141       gst_query_parse_buffering_range (query, &format, NULL, NULL, NULL);
1142
1143       GST_DEBUG_OBJECT (src, "buffering query in format %s",
1144           gst_format_get_name (format));
1145
1146       GST_OBJECT_LOCK (src);
1147       if (src->random_access) {
1148         estimated = 0;
1149         start = 0;
1150         if (format == GST_FORMAT_PERCENT)
1151           stop = GST_FORMAT_PERCENT_MAX;
1152         else
1153           stop = src->segment.duration;
1154       } else {
1155         estimated = -1;
1156         start = -1;
1157         stop = -1;
1158       }
1159       seg_format = src->segment.format;
1160       GST_OBJECT_UNLOCK (src);
1161
1162       /* convert to required format. When the conversion fails, we can't answer
1163        * the query. When the value is unknown, we can don't perform conversion
1164        * but report TRUE. */
1165       if (format != GST_FORMAT_PERCENT && stop != -1) {
1166         res = gst_pad_query_convert (src->srcpad, seg_format,
1167             stop, format, &stop);
1168       } else {
1169         res = TRUE;
1170       }
1171       if (res && format != GST_FORMAT_PERCENT && start != -1)
1172         res = gst_pad_query_convert (src->srcpad, seg_format,
1173             start, format, &start);
1174
1175       gst_query_set_buffering_range (query, format, start, stop, estimated);
1176       break;
1177     }
1178     case GST_QUERY_SCHEDULING:
1179     {
1180       gboolean random_access;
1181
1182       random_access = gst_base_src_is_random_access (src);
1183
1184       /* we can operate in getrange mode if the native format is bytes
1185        * and we are seekable, this condition is set in the random_access
1186        * flag and is set in the _start() method. */
1187       gst_query_set_scheduling (query, GST_SCHEDULING_FLAG_SEEKABLE, 1, -1, 0);
1188       if (random_access)
1189         gst_query_add_scheduling_mode (query, GST_PAD_MODE_PULL);
1190       gst_query_add_scheduling_mode (query, GST_PAD_MODE_PUSH);
1191
1192       res = TRUE;
1193       break;
1194     }
1195     case GST_QUERY_CAPS:
1196     {
1197       GstBaseSrcClass *bclass;
1198       GstCaps *caps, *filter;
1199
1200       bclass = GST_BASE_SRC_GET_CLASS (src);
1201       if (bclass->get_caps) {
1202         gst_query_parse_caps (query, &filter);
1203         if ((caps = bclass->get_caps (src, filter))) {
1204           gst_query_set_caps_result (query, caps);
1205           gst_caps_unref (caps);
1206           res = TRUE;
1207         } else {
1208           res = FALSE;
1209         }
1210       } else
1211         res = FALSE;
1212       break;
1213     }
1214     default:
1215       res = FALSE;
1216       break;
1217   }
1218   GST_DEBUG_OBJECT (src, "query %s returns %d", GST_QUERY_TYPE_NAME (query),
1219       res);
1220
1221   return res;
1222 }
1223
1224 static gboolean
1225 gst_base_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
1226 {
1227   GstBaseSrc *src;
1228   GstBaseSrcClass *bclass;
1229   gboolean result = FALSE;
1230
1231   src = GST_BASE_SRC (parent);
1232   bclass = GST_BASE_SRC_GET_CLASS (src);
1233
1234   if (bclass->query)
1235     result = bclass->query (src, query);
1236
1237   return result;
1238 }
1239
1240 static gboolean
1241 gst_base_src_default_do_seek (GstBaseSrc * src, GstSegment * segment)
1242 {
1243   gboolean res = TRUE;
1244
1245   /* update our offset if the start/stop position was updated */
1246   if (segment->format == GST_FORMAT_BYTES) {
1247     segment->time = segment->start;
1248   } else if (segment->start == 0) {
1249     /* seek to start, we can implement a default for this. */
1250     segment->time = 0;
1251   } else {
1252     res = FALSE;
1253     GST_INFO_OBJECT (src, "Can't do a default seek");
1254   }
1255
1256   return res;
1257 }
1258
1259 static gboolean
1260 gst_base_src_do_seek (GstBaseSrc * src, GstSegment * segment)
1261 {
1262   GstBaseSrcClass *bclass;
1263   gboolean result = FALSE;
1264
1265   bclass = GST_BASE_SRC_GET_CLASS (src);
1266
1267   if (bclass->do_seek)
1268     result = bclass->do_seek (src, segment);
1269
1270   return result;
1271 }
1272
1273 #define SEEK_TYPE_IS_RELATIVE(t) (((t) != GST_SEEK_TYPE_NONE) && ((t) != GST_SEEK_TYPE_SET))
1274
1275 static gboolean
1276 gst_base_src_default_prepare_seek_segment (GstBaseSrc * src, GstEvent * event,
1277     GstSegment * segment)
1278 {
1279   /* By default, we try one of 2 things:
1280    *   - For absolute seek positions, convert the requested position to our
1281    *     configured processing format and place it in the output segment \
1282    *   - For relative seek positions, convert our current (input) values to the
1283    *     seek format, adjust by the relative seek offset and then convert back to
1284    *     the processing format
1285    */
1286   GstSeekType cur_type, stop_type;
1287   gint64 cur, stop;
1288   GstSeekFlags flags;
1289   GstFormat seek_format, dest_format;
1290   gdouble rate;
1291   gboolean update;
1292   gboolean res = TRUE;
1293
1294   gst_event_parse_seek (event, &rate, &seek_format, &flags,
1295       &cur_type, &cur, &stop_type, &stop);
1296   dest_format = segment->format;
1297
1298   if (seek_format == dest_format) {
1299     gst_segment_do_seek (segment, rate, seek_format, flags,
1300         cur_type, cur, stop_type, stop, &update);
1301     return TRUE;
1302   }
1303
1304   if (cur_type != GST_SEEK_TYPE_NONE) {
1305     /* FIXME: Handle seek_cur & seek_end by converting the input segment vals */
1306     res =
1307         gst_pad_query_convert (src->srcpad, seek_format, cur, dest_format,
1308         &cur);
1309     cur_type = GST_SEEK_TYPE_SET;
1310   }
1311
1312   if (res && stop_type != GST_SEEK_TYPE_NONE) {
1313     /* FIXME: Handle seek_cur & seek_end by converting the input segment vals */
1314     res =
1315         gst_pad_query_convert (src->srcpad, seek_format, stop, dest_format,
1316         &stop);
1317     stop_type = GST_SEEK_TYPE_SET;
1318   }
1319
1320   /* And finally, configure our output segment in the desired format */
1321   gst_segment_do_seek (segment, rate, dest_format, flags, cur_type, cur,
1322       stop_type, stop, &update);
1323
1324   if (!res)
1325     goto no_format;
1326
1327   return res;
1328
1329 no_format:
1330   {
1331     GST_DEBUG_OBJECT (src, "undefined format given, seek aborted.");
1332     return FALSE;
1333   }
1334 }
1335
1336 static gboolean
1337 gst_base_src_prepare_seek_segment (GstBaseSrc * src, GstEvent * event,
1338     GstSegment * seeksegment)
1339 {
1340   GstBaseSrcClass *bclass;
1341   gboolean result = FALSE;
1342
1343   bclass = GST_BASE_SRC_GET_CLASS (src);
1344
1345   if (bclass->prepare_seek_segment)
1346     result = bclass->prepare_seek_segment (src, event, seeksegment);
1347
1348   return result;
1349 }
1350
1351 static GstFlowReturn
1352 gst_base_src_default_alloc (GstBaseSrc * src, guint64 offset,
1353     guint size, GstBuffer ** buffer)
1354 {
1355   GstFlowReturn ret;
1356   GstBaseSrcPrivate *priv = src->priv;
1357
1358   if (priv->pool) {
1359     ret = gst_buffer_pool_acquire_buffer (priv->pool, buffer, NULL);
1360   } else if (size != -1) {
1361     *buffer = gst_buffer_new_allocate (priv->allocator, size, &priv->params);
1362     if (G_UNLIKELY (*buffer == NULL))
1363       goto alloc_failed;
1364
1365     ret = GST_FLOW_OK;
1366   } else {
1367     GST_WARNING_OBJECT (src, "Not trying to alloc %u bytes. Blocksize not set?",
1368         size);
1369     goto alloc_failed;
1370   }
1371   return ret;
1372
1373   /* ERRORS */
1374 alloc_failed:
1375   {
1376     GST_ERROR_OBJECT (src, "Failed to allocate %u bytes", size);
1377     return GST_FLOW_ERROR;
1378   }
1379 }
1380
1381 static GstFlowReturn
1382 gst_base_src_default_create (GstBaseSrc * src, guint64 offset,
1383     guint size, GstBuffer ** buffer)
1384 {
1385   GstBaseSrcClass *bclass;
1386   GstFlowReturn ret;
1387   GstBuffer *res_buf;
1388
1389   bclass = GST_BASE_SRC_GET_CLASS (src);
1390
1391   if (G_UNLIKELY (!bclass->alloc))
1392     goto no_function;
1393   if (G_UNLIKELY (!bclass->fill))
1394     goto no_function;
1395
1396   if (*buffer == NULL) {
1397     /* downstream did not provide us with a buffer to fill, allocate one
1398      * ourselves */
1399     ret = bclass->alloc (src, offset, size, &res_buf);
1400     if (G_UNLIKELY (ret != GST_FLOW_OK))
1401       goto alloc_failed;
1402   } else {
1403     res_buf = *buffer;
1404   }
1405
1406   if (G_LIKELY (size > 0)) {
1407     /* only call fill when there is a size */
1408     ret = bclass->fill (src, offset, size, res_buf);
1409     if (G_UNLIKELY (ret != GST_FLOW_OK))
1410       goto not_ok;
1411   }
1412
1413   *buffer = res_buf;
1414
1415   return GST_FLOW_OK;
1416
1417   /* ERRORS */
1418 no_function:
1419   {
1420     GST_DEBUG_OBJECT (src, "no fill or alloc function");
1421     return GST_FLOW_NOT_SUPPORTED;
1422   }
1423 alloc_failed:
1424   {
1425     GST_DEBUG_OBJECT (src, "Failed to allocate buffer of %u bytes", size);
1426     return ret;
1427   }
1428 not_ok:
1429   {
1430     GST_DEBUG_OBJECT (src, "fill returned %d (%s)", ret,
1431         gst_flow_get_name (ret));
1432     if (*buffer == NULL)
1433       gst_buffer_unref (res_buf);
1434     return ret;
1435   }
1436 }
1437
1438 /* this code implements the seeking. It is a good example
1439  * handling all cases.
1440  *
1441  * A seek updates the currently configured segment.start
1442  * and segment.stop values based on the SEEK_TYPE. If the
1443  * segment.start value is updated, a seek to this new position
1444  * should be performed.
1445  *
1446  * The seek can only be executed when we are not currently
1447  * streaming any data, to make sure that this is the case, we
1448  * acquire the STREAM_LOCK which is taken when we are in the
1449  * _loop() function or when a getrange() is called. Normally
1450  * we will not receive a seek if we are operating in pull mode
1451  * though. When we operate as a live source we might block on the live
1452  * cond, which does not release the STREAM_LOCK. Therefore we will try
1453  * to grab the LIVE_LOCK instead of the STREAM_LOCK to make sure it is
1454  * safe to perform the seek.
1455  *
1456  * When we are in the loop() function, we might be in the middle
1457  * of pushing a buffer, which might block in a sink. To make sure
1458  * that the push gets unblocked we push out a FLUSH_START event.
1459  * Our loop function will get a FLUSHING return value from
1460  * the push and will pause, effectively releasing the STREAM_LOCK.
1461  *
1462  * For a non-flushing seek, we pause the task, which might eventually
1463  * release the STREAM_LOCK. We say eventually because when the sink
1464  * blocks on the sample we might wait a very long time until the sink
1465  * unblocks the sample. In any case we acquire the STREAM_LOCK and
1466  * can continue the seek. A non-flushing seek is normally done in a
1467  * running pipeline to perform seamless playback, this means that the sink is
1468  * PLAYING and will return from its chain function.
1469  * In the case of a non-flushing seek we need to make sure that the
1470  * data we output after the seek is continuous with the previous data,
1471  * this is because a non-flushing seek does not reset the running-time
1472  * to 0. We do this by closing the currently running segment, ie. sending
1473  * a new_segment event with the stop position set to the last processed
1474  * position.
1475  *
1476  * After updating the segment.start/stop values, we prepare for
1477  * streaming again. We push out a FLUSH_STOP to make the peer pad
1478  * accept data again and we start our task again.
1479  *
1480  * A segment seek posts a message on the bus saying that the playback
1481  * of the segment started. We store the segment flag internally because
1482  * when we reach the segment.stop we have to post a segment.done
1483  * instead of EOS when doing a segment seek.
1484  */
1485 static gboolean
1486 gst_base_src_perform_seek (GstBaseSrc * src, GstEvent * event)
1487 {
1488   gboolean res = TRUE, tres;
1489   gdouble rate;
1490   GstFormat seek_format, dest_format;
1491   GstSeekFlags flags;
1492   GstSeekType cur_type, stop_type;
1493   gint64 cur, stop;
1494   gboolean flush, playing;
1495   gboolean update;
1496   gboolean relative_seek = FALSE;
1497   gboolean seekseg_configured = FALSE;
1498   GstSegment seeksegment;
1499   guint32 seqnum;
1500   GstEvent *tevent;
1501
1502   GST_DEBUG_OBJECT (src, "doing seek: %" GST_PTR_FORMAT, event);
1503
1504   GST_OBJECT_LOCK (src);
1505   dest_format = src->segment.format;
1506   GST_OBJECT_UNLOCK (src);
1507
1508   if (event) {
1509     gst_event_parse_seek (event, &rate, &seek_format, &flags,
1510         &cur_type, &cur, &stop_type, &stop);
1511
1512     relative_seek = SEEK_TYPE_IS_RELATIVE (cur_type) ||
1513         SEEK_TYPE_IS_RELATIVE (stop_type);
1514
1515     if (dest_format != seek_format && !relative_seek) {
1516       /* If we have an ABSOLUTE position (SEEK_SET only), we can convert it
1517        * here before taking the stream lock, otherwise we must convert it later,
1518        * once we have the stream lock and can read the last configures segment
1519        * start and stop positions */
1520       gst_segment_init (&seeksegment, dest_format);
1521
1522       if (!gst_base_src_prepare_seek_segment (src, event, &seeksegment))
1523         goto prepare_failed;
1524
1525       seekseg_configured = TRUE;
1526     }
1527
1528     flush = flags & GST_SEEK_FLAG_FLUSH;
1529     seqnum = gst_event_get_seqnum (event);
1530   } else {
1531     flush = FALSE;
1532     /* get next seqnum */
1533     seqnum = gst_util_seqnum_next ();
1534   }
1535
1536   /* send flush start */
1537   if (flush) {
1538     tevent = gst_event_new_flush_start ();
1539     gst_event_set_seqnum (tevent, seqnum);
1540     gst_pad_push_event (src->srcpad, tevent);
1541   } else
1542     gst_pad_pause_task (src->srcpad);
1543
1544   /* unblock streaming thread. */
1545   gst_base_src_set_flushing (src, TRUE, FALSE, &playing);
1546
1547   /* grab streaming lock, this should eventually be possible, either
1548    * because the task is paused, our streaming thread stopped
1549    * or because our peer is flushing. */
1550   GST_PAD_STREAM_LOCK (src->srcpad);
1551   if (G_UNLIKELY (src->priv->seqnum == seqnum)) {
1552     /* we have seen this event before, issue a warning for now */
1553     GST_WARNING_OBJECT (src, "duplicate event found %" G_GUINT32_FORMAT,
1554         seqnum);
1555   } else {
1556     src->priv->seqnum = seqnum;
1557     GST_DEBUG_OBJECT (src, "seek with seqnum %" G_GUINT32_FORMAT, seqnum);
1558   }
1559
1560   gst_base_src_set_flushing (src, FALSE, playing, NULL);
1561
1562   /* If we configured the seeksegment above, don't overwrite it now. Otherwise
1563    * copy the current segment info into the temp segment that we can actually
1564    * attempt the seek with. We only update the real segment if the seek succeeds. */
1565   if (!seekseg_configured) {
1566     memcpy (&seeksegment, &src->segment, sizeof (GstSegment));
1567
1568     /* now configure the final seek segment */
1569     if (event) {
1570       if (seeksegment.format != seek_format) {
1571         /* OK, here's where we give the subclass a chance to convert the relative
1572          * seek into an absolute one in the processing format. We set up any
1573          * absolute seek above, before taking the stream lock. */
1574         if (!gst_base_src_prepare_seek_segment (src, event, &seeksegment)) {
1575           GST_DEBUG_OBJECT (src, "Preparing the seek failed after flushing. "
1576               "Aborting seek");
1577           res = FALSE;
1578         }
1579       } else {
1580         /* The seek format matches our processing format, no need to ask the
1581          * the subclass to configure the segment. */
1582         gst_segment_do_seek (&seeksegment, rate, seek_format, flags,
1583             cur_type, cur, stop_type, stop, &update);
1584       }
1585     }
1586     /* Else, no seek event passed, so we're just (re)starting the
1587        current segment. */
1588   }
1589
1590   if (res) {
1591     GST_DEBUG_OBJECT (src, "segment configured from %" G_GINT64_FORMAT
1592         " to %" G_GINT64_FORMAT ", position %" G_GINT64_FORMAT,
1593         seeksegment.start, seeksegment.stop, seeksegment.position);
1594
1595     /* do the seek, segment.position contains the new position. */
1596     res = gst_base_src_do_seek (src, &seeksegment);
1597   }
1598
1599   /* and prepare to continue streaming */
1600   if (flush) {
1601     tevent = gst_event_new_flush_stop (TRUE);
1602     gst_event_set_seqnum (tevent, seqnum);
1603     /* send flush stop, peer will accept data and events again. We
1604      * are not yet providing data as we still have the STREAM_LOCK. */
1605     gst_pad_push_event (src->srcpad, tevent);
1606   }
1607
1608   /* The subclass must have converted the segment to the processing format
1609    * by now */
1610   if (res && seeksegment.format != dest_format) {
1611     GST_DEBUG_OBJECT (src, "Subclass failed to prepare a seek segment "
1612         "in the correct format. Aborting seek.");
1613     res = FALSE;
1614   }
1615
1616   /* if the seek was successful, we update our real segment and push
1617    * out the new segment. */
1618   if (res) {
1619     GST_OBJECT_LOCK (src);
1620     memcpy (&src->segment, &seeksegment, sizeof (GstSegment));
1621     GST_OBJECT_UNLOCK (src);
1622
1623     if (seeksegment.flags & GST_SEEK_FLAG_SEGMENT) {
1624       GstMessage *message;
1625
1626       message = gst_message_new_segment_start (GST_OBJECT (src),
1627           seeksegment.format, seeksegment.position);
1628       gst_message_set_seqnum (message, seqnum);
1629
1630       gst_element_post_message (GST_ELEMENT (src), message);
1631     }
1632
1633     /* for deriving a stop position for the playback segment from the seek
1634      * segment, we must take the duration when the stop is not set */
1635     if ((stop = seeksegment.stop) == -1)
1636       stop = seeksegment.duration;
1637
1638     src->priv->segment_pending = TRUE;
1639   }
1640
1641   src->priv->discont = TRUE;
1642   src->running = TRUE;
1643   /* and restart the task in case it got paused explicitly or by
1644    * the FLUSH_START event we pushed out. */
1645   tres = gst_pad_start_task (src->srcpad, (GstTaskFunction) gst_base_src_loop,
1646       src->srcpad);
1647   if (res && !tres)
1648     res = FALSE;
1649
1650   /* and release the lock again so we can continue streaming */
1651   GST_PAD_STREAM_UNLOCK (src->srcpad);
1652
1653   return res;
1654
1655   /* ERROR */
1656 prepare_failed:
1657   GST_DEBUG_OBJECT (src, "Preparing the seek failed before flushing. "
1658       "Aborting seek");
1659   return FALSE;
1660 }
1661
1662 /* all events send to this element directly. This is mainly done from the
1663  * application.
1664  */
1665 static gboolean
1666 gst_base_src_send_event (GstElement * element, GstEvent * event)
1667 {
1668   GstBaseSrc *src;
1669   gboolean result = FALSE;
1670
1671   src = GST_BASE_SRC (element);
1672
1673   GST_DEBUG_OBJECT (src, "handling event %p %" GST_PTR_FORMAT, event, event);
1674
1675   switch (GST_EVENT_TYPE (event)) {
1676       /* bidirectional events */
1677     case GST_EVENT_FLUSH_START:
1678       GST_DEBUG_OBJECT (src, "pushing flush-start event downstream");
1679       result = gst_pad_push_event (src->srcpad, event);
1680       event = NULL;
1681       break;
1682     case GST_EVENT_FLUSH_STOP:
1683       GST_LIVE_LOCK (src->srcpad);
1684       src->priv->segment_pending = TRUE;
1685       /* sending random flushes downstream can break stuff,
1686        * especially sync since all segment info will get flushed */
1687       GST_DEBUG_OBJECT (src, "pushing flush-stop event downstream");
1688       result = gst_pad_push_event (src->srcpad, event);
1689       GST_LIVE_UNLOCK (src->srcpad);
1690       event = NULL;
1691       break;
1692
1693       /* downstream serialized events */
1694     case GST_EVENT_EOS:
1695     {
1696       GstBaseSrcClass *bclass;
1697
1698       bclass = GST_BASE_SRC_GET_CLASS (src);
1699
1700       /* queue EOS and make sure the task or pull function performs the EOS
1701        * actions.
1702        *
1703        * We have two possibilities:
1704        *
1705        *  - Before we are to enter the _create function, we check the pending_eos
1706        *    first and do EOS instead of entering it.
1707        *  - If we are in the _create function or we did not manage to set the
1708        *    flag fast enough and we are about to enter the _create function,
1709        *    we unlock it so that we exit with FLUSHING immediately. We then
1710        *    check the EOS flag and do the EOS logic.
1711        */
1712       g_atomic_int_set (&src->priv->pending_eos, TRUE);
1713       GST_DEBUG_OBJECT (src, "EOS marked, calling unlock");
1714
1715
1716       /* unlock the _create function so that we can check the pending_eos flag
1717        * and we can do EOS. This will eventually release the LIVE_LOCK again so
1718        * that we can grab it and stop the unlock again. We don't take the stream
1719        * lock so that this operation is guaranteed to never block. */
1720       gst_base_src_activate_pool (src, FALSE);
1721       if (bclass->unlock)
1722         bclass->unlock (src);
1723
1724       GST_DEBUG_OBJECT (src, "unlock called, waiting for LIVE_LOCK");
1725
1726       GST_LIVE_LOCK (src);
1727       GST_DEBUG_OBJECT (src, "LIVE_LOCK acquired, calling unlock_stop");
1728       /* now stop the unlock of the streaming thread again. Grabbing the live
1729        * lock is enough because that protects the create function. */
1730       if (bclass->unlock_stop)
1731         bclass->unlock_stop (src);
1732       gst_base_src_activate_pool (src, TRUE);
1733       GST_LIVE_UNLOCK (src);
1734
1735       result = TRUE;
1736       break;
1737     }
1738     case GST_EVENT_SEGMENT:
1739       /* sending random SEGMENT downstream can break sync. */
1740       break;
1741     case GST_EVENT_TAG:
1742     case GST_EVENT_CUSTOM_DOWNSTREAM:
1743     case GST_EVENT_CUSTOM_BOTH:
1744       /* Insert TAG, CUSTOM_DOWNSTREAM, CUSTOM_BOTH in the dataflow */
1745       GST_OBJECT_LOCK (src);
1746       src->priv->pending_events =
1747           g_list_append (src->priv->pending_events, event);
1748       g_atomic_int_set (&src->priv->have_events, TRUE);
1749       GST_OBJECT_UNLOCK (src);
1750       event = NULL;
1751       result = TRUE;
1752       break;
1753     case GST_EVENT_BUFFERSIZE:
1754       /* does not seem to make much sense currently */
1755       break;
1756
1757       /* upstream events */
1758     case GST_EVENT_QOS:
1759       /* elements should override send_event and do something */
1760       break;
1761     case GST_EVENT_SEEK:
1762     {
1763       gboolean started;
1764
1765       GST_OBJECT_LOCK (src->srcpad);
1766       if (GST_PAD_MODE (src->srcpad) == GST_PAD_MODE_PULL)
1767         goto wrong_mode;
1768       started = GST_PAD_MODE (src->srcpad) == GST_PAD_MODE_PUSH;
1769       GST_OBJECT_UNLOCK (src->srcpad);
1770
1771       if (started) {
1772         GST_DEBUG_OBJECT (src, "performing seek");
1773         /* when we are running in push mode, we can execute the
1774          * seek right now. */
1775         result = gst_base_src_perform_seek (src, event);
1776       } else {
1777         GstEvent **event_p;
1778
1779         /* else we store the event and execute the seek when we
1780          * get activated */
1781         GST_OBJECT_LOCK (src);
1782         GST_DEBUG_OBJECT (src, "queueing seek");
1783         event_p = &src->pending_seek;
1784         gst_event_replace ((GstEvent **) event_p, event);
1785         GST_OBJECT_UNLOCK (src);
1786         /* assume the seek will work */
1787         result = TRUE;
1788       }
1789       break;
1790     }
1791     case GST_EVENT_NAVIGATION:
1792       /* could make sense for elements that do something with navigation events
1793        * but then they would need to override the send_event function */
1794       break;
1795     case GST_EVENT_LATENCY:
1796       /* does not seem to make sense currently */
1797       break;
1798
1799       /* custom events */
1800     case GST_EVENT_CUSTOM_UPSTREAM:
1801       /* override send_event if you want this */
1802       break;
1803     case GST_EVENT_CUSTOM_DOWNSTREAM_OOB:
1804     case GST_EVENT_CUSTOM_BOTH_OOB:
1805       /* insert a random custom event into the pipeline */
1806       GST_DEBUG_OBJECT (src, "pushing custom OOB event downstream");
1807       result = gst_pad_push_event (src->srcpad, event);
1808       /* we gave away the ref to the event in the push */
1809       event = NULL;
1810       break;
1811     default:
1812       break;
1813   }
1814 done:
1815   /* if we still have a ref to the event, unref it now */
1816   if (event)
1817     gst_event_unref (event);
1818
1819   return result;
1820
1821   /* ERRORS */
1822 wrong_mode:
1823   {
1824     GST_DEBUG_OBJECT (src, "cannot perform seek when operating in pull mode");
1825     GST_OBJECT_UNLOCK (src->srcpad);
1826     result = FALSE;
1827     goto done;
1828   }
1829 }
1830
1831 static gboolean
1832 gst_base_src_seekable (GstBaseSrc * src)
1833 {
1834   GstBaseSrcClass *bclass;
1835   bclass = GST_BASE_SRC_GET_CLASS (src);
1836   if (bclass->is_seekable)
1837     return bclass->is_seekable (src);
1838   else
1839     return FALSE;
1840 }
1841
1842 static void
1843 gst_base_src_update_qos (GstBaseSrc * src,
1844     gdouble proportion, GstClockTimeDiff diff, GstClockTime timestamp)
1845 {
1846   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, src,
1847       "qos: proportion: %lf, diff %" G_GINT64_FORMAT ", timestamp %"
1848       GST_TIME_FORMAT, proportion, diff, GST_TIME_ARGS (timestamp));
1849
1850   GST_OBJECT_LOCK (src);
1851   src->priv->proportion = proportion;
1852   src->priv->earliest_time = timestamp + diff;
1853   GST_OBJECT_UNLOCK (src);
1854 }
1855
1856
1857 static gboolean
1858 gst_base_src_default_event (GstBaseSrc * src, GstEvent * event)
1859 {
1860   gboolean result;
1861
1862   GST_DEBUG_OBJECT (src, "handle event %" GST_PTR_FORMAT, event);
1863
1864   switch (GST_EVENT_TYPE (event)) {
1865     case GST_EVENT_SEEK:
1866       /* is normally called when in push mode */
1867       if (!gst_base_src_seekable (src))
1868         goto not_seekable;
1869
1870       result = gst_base_src_perform_seek (src, event);
1871       break;
1872     case GST_EVENT_FLUSH_START:
1873       /* cancel any blocking getrange, is normally called
1874        * when in pull mode. */
1875       result = gst_base_src_set_flushing (src, TRUE, FALSE, NULL);
1876       break;
1877     case GST_EVENT_FLUSH_STOP:
1878       result = gst_base_src_set_flushing (src, FALSE, TRUE, NULL);
1879       break;
1880     case GST_EVENT_QOS:
1881     {
1882       gdouble proportion;
1883       GstClockTimeDiff diff;
1884       GstClockTime timestamp;
1885
1886       gst_event_parse_qos (event, NULL, &proportion, &diff, &timestamp);
1887       gst_base_src_update_qos (src, proportion, diff, timestamp);
1888       result = TRUE;
1889       break;
1890     }
1891     case GST_EVENT_RECONFIGURE:
1892       result = TRUE;
1893       break;
1894     case GST_EVENT_LATENCY:
1895       result = TRUE;
1896       break;
1897     default:
1898       result = FALSE;
1899       break;
1900   }
1901   return result;
1902
1903   /* ERRORS */
1904 not_seekable:
1905   {
1906     GST_DEBUG_OBJECT (src, "is not seekable");
1907     return FALSE;
1908   }
1909 }
1910
1911 static gboolean
1912 gst_base_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
1913 {
1914   GstBaseSrc *src;
1915   GstBaseSrcClass *bclass;
1916   gboolean result = FALSE;
1917
1918   src = GST_BASE_SRC (parent);
1919   bclass = GST_BASE_SRC_GET_CLASS (src);
1920
1921   if (bclass->event) {
1922     if (!(result = bclass->event (src, event)))
1923       goto subclass_failed;
1924   }
1925
1926 done:
1927   gst_event_unref (event);
1928
1929   return result;
1930
1931   /* ERRORS */
1932 subclass_failed:
1933   {
1934     GST_DEBUG_OBJECT (src, "subclass refused event");
1935     goto done;
1936   }
1937 }
1938
1939 static void
1940 gst_base_src_set_property (GObject * object, guint prop_id,
1941     const GValue * value, GParamSpec * pspec)
1942 {
1943   GstBaseSrc *src;
1944
1945   src = GST_BASE_SRC (object);
1946
1947   switch (prop_id) {
1948     case PROP_BLOCKSIZE:
1949       gst_base_src_set_blocksize (src, g_value_get_uint (value));
1950       break;
1951     case PROP_NUM_BUFFERS:
1952       src->num_buffers = g_value_get_int (value);
1953       break;
1954     case PROP_TYPEFIND:
1955       src->typefind = g_value_get_boolean (value);
1956       break;
1957     case PROP_DO_TIMESTAMP:
1958       gst_base_src_set_do_timestamp (src, g_value_get_boolean (value));
1959       break;
1960     default:
1961       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1962       break;
1963   }
1964 }
1965
1966 static void
1967 gst_base_src_get_property (GObject * object, guint prop_id, GValue * value,
1968     GParamSpec * pspec)
1969 {
1970   GstBaseSrc *src;
1971
1972   src = GST_BASE_SRC (object);
1973
1974   switch (prop_id) {
1975     case PROP_BLOCKSIZE:
1976       g_value_set_uint (value, gst_base_src_get_blocksize (src));
1977       break;
1978     case PROP_NUM_BUFFERS:
1979       g_value_set_int (value, src->num_buffers);
1980       break;
1981     case PROP_TYPEFIND:
1982       g_value_set_boolean (value, src->typefind);
1983       break;
1984     case PROP_DO_TIMESTAMP:
1985       g_value_set_boolean (value, gst_base_src_get_do_timestamp (src));
1986       break;
1987     default:
1988       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1989       break;
1990   }
1991 }
1992
1993 /* with STREAM_LOCK and LOCK */
1994 static GstClockReturn
1995 gst_base_src_wait (GstBaseSrc * basesrc, GstClock * clock, GstClockTime time)
1996 {
1997   GstClockReturn ret;
1998   GstClockID id;
1999
2000   id = gst_clock_new_single_shot_id (clock, time);
2001
2002   basesrc->clock_id = id;
2003   /* release the live lock while waiting */
2004   GST_LIVE_UNLOCK (basesrc);
2005
2006   ret = gst_clock_id_wait (id, NULL);
2007
2008   GST_LIVE_LOCK (basesrc);
2009   gst_clock_id_unref (id);
2010   basesrc->clock_id = NULL;
2011
2012   return ret;
2013 }
2014
2015 /* perform synchronisation on a buffer.
2016  * with STREAM_LOCK.
2017  */
2018 static GstClockReturn
2019 gst_base_src_do_sync (GstBaseSrc * basesrc, GstBuffer * buffer)
2020 {
2021   GstClockReturn result;
2022   GstClockTime start, end;
2023   GstBaseSrcClass *bclass;
2024   GstClockTime base_time;
2025   GstClock *clock;
2026   GstClockTime now = GST_CLOCK_TIME_NONE, timestamp;
2027   gboolean do_timestamp, first, pseudo_live, is_live;
2028
2029   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2030
2031   start = end = -1;
2032   if (bclass->get_times)
2033     bclass->get_times (basesrc, buffer, &start, &end);
2034
2035   /* get buffer timestamp */
2036   timestamp = GST_BUFFER_TIMESTAMP (buffer);
2037
2038   /* grab the lock to prepare for clocking and calculate the startup
2039    * latency. */
2040   GST_OBJECT_LOCK (basesrc);
2041
2042   is_live = basesrc->is_live;
2043   /* if we are asked to sync against the clock we are a pseudo live element */
2044   pseudo_live = (start != -1 && is_live);
2045   /* check for the first buffer */
2046   first = (basesrc->priv->latency == -1);
2047
2048   if (timestamp != -1 && pseudo_live) {
2049     GstClockTime latency;
2050
2051     /* we have a timestamp and a sync time, latency is the diff */
2052     if (timestamp <= start)
2053       latency = start - timestamp;
2054     else
2055       latency = 0;
2056
2057     if (first) {
2058       GST_DEBUG_OBJECT (basesrc, "pseudo_live with latency %" GST_TIME_FORMAT,
2059           GST_TIME_ARGS (latency));
2060       /* first time we calculate latency, just configure */
2061       basesrc->priv->latency = latency;
2062     } else {
2063       if (basesrc->priv->latency != latency) {
2064         /* we have a new latency, FIXME post latency message */
2065         basesrc->priv->latency = latency;
2066         GST_DEBUG_OBJECT (basesrc, "latency changed to %" GST_TIME_FORMAT,
2067             GST_TIME_ARGS (latency));
2068       }
2069     }
2070   } else if (first) {
2071     GST_DEBUG_OBJECT (basesrc, "no latency needed, live %d, sync %d",
2072         is_live, start != -1);
2073     basesrc->priv->latency = 0;
2074   }
2075
2076   /* get clock, if no clock, we can't sync or do timestamps */
2077   if ((clock = GST_ELEMENT_CLOCK (basesrc)) == NULL)
2078     goto no_clock;
2079   else
2080     gst_object_ref (clock);
2081
2082   base_time = GST_ELEMENT_CAST (basesrc)->base_time;
2083
2084   do_timestamp = basesrc->priv->do_timestamp;
2085   GST_OBJECT_UNLOCK (basesrc);
2086
2087   /* first buffer, calculate the timestamp offset */
2088   if (first) {
2089     GstClockTime running_time;
2090
2091     now = gst_clock_get_time (clock);
2092     running_time = now - base_time;
2093
2094     GST_LOG_OBJECT (basesrc,
2095         "startup timestamp: %" GST_TIME_FORMAT ", running_time %"
2096         GST_TIME_FORMAT, GST_TIME_ARGS (timestamp),
2097         GST_TIME_ARGS (running_time));
2098
2099     if (pseudo_live && timestamp != -1) {
2100       /* live source and we need to sync, add startup latency to all timestamps
2101        * to get the real running_time. Live sources should always timestamp
2102        * according to the current running time. */
2103       basesrc->priv->ts_offset = GST_CLOCK_DIFF (timestamp, running_time);
2104
2105       GST_LOG_OBJECT (basesrc, "live with sync, ts_offset %" GST_TIME_FORMAT,
2106           GST_TIME_ARGS (basesrc->priv->ts_offset));
2107     } else {
2108       basesrc->priv->ts_offset = 0;
2109       GST_LOG_OBJECT (basesrc, "no timestamp offset needed");
2110     }
2111
2112     if (!GST_CLOCK_TIME_IS_VALID (timestamp)) {
2113       if (do_timestamp)
2114         timestamp = running_time;
2115       else
2116         timestamp = 0;
2117
2118       GST_BUFFER_TIMESTAMP (buffer) = timestamp;
2119
2120       GST_LOG_OBJECT (basesrc, "created timestamp: %" GST_TIME_FORMAT,
2121           GST_TIME_ARGS (timestamp));
2122     }
2123
2124     /* add the timestamp offset we need for sync */
2125     timestamp += basesrc->priv->ts_offset;
2126   } else {
2127     /* not the first buffer, the timestamp is the diff between the clock and
2128      * base_time */
2129     if (do_timestamp && !GST_CLOCK_TIME_IS_VALID (timestamp)) {
2130       now = gst_clock_get_time (clock);
2131
2132       GST_BUFFER_TIMESTAMP (buffer) = now - base_time;
2133
2134       GST_LOG_OBJECT (basesrc, "created timestamp: %" GST_TIME_FORMAT,
2135           GST_TIME_ARGS (now - base_time));
2136     }
2137   }
2138
2139   /* if we don't have a buffer timestamp, we don't sync */
2140   if (!GST_CLOCK_TIME_IS_VALID (start))
2141     goto no_sync;
2142
2143   if (is_live && GST_CLOCK_TIME_IS_VALID (timestamp)) {
2144     /* for pseudo live sources, add our ts_offset to the timestamp */
2145     GST_BUFFER_TIMESTAMP (buffer) += basesrc->priv->ts_offset;
2146     start += basesrc->priv->ts_offset;
2147   }
2148
2149   GST_LOG_OBJECT (basesrc,
2150       "waiting for clock, base time %" GST_TIME_FORMAT
2151       ", stream_start %" GST_TIME_FORMAT,
2152       GST_TIME_ARGS (base_time), GST_TIME_ARGS (start));
2153
2154   result = gst_base_src_wait (basesrc, clock, start + base_time);
2155
2156   gst_object_unref (clock);
2157
2158   GST_LOG_OBJECT (basesrc, "clock entry done: %d", result);
2159
2160   return result;
2161
2162   /* special cases */
2163 no_clock:
2164   {
2165     GST_DEBUG_OBJECT (basesrc, "we have no clock");
2166     GST_OBJECT_UNLOCK (basesrc);
2167     return GST_CLOCK_OK;
2168   }
2169 no_sync:
2170   {
2171     GST_DEBUG_OBJECT (basesrc, "no sync needed");
2172     gst_object_unref (clock);
2173     return GST_CLOCK_OK;
2174   }
2175 }
2176
2177 /* Called with STREAM_LOCK and LIVE_LOCK */
2178 static gboolean
2179 gst_base_src_update_length (GstBaseSrc * src, guint64 offset, guint * length)
2180 {
2181   guint64 size, maxsize;
2182   GstBaseSrcClass *bclass;
2183   GstFormat format;
2184   gint64 stop;
2185   gboolean dynamic;
2186
2187   bclass = GST_BASE_SRC_GET_CLASS (src);
2188
2189   format = src->segment.format;
2190   stop = src->segment.stop;
2191   /* get total file size */
2192   size = src->segment.duration;
2193
2194   /* only operate if we are working with bytes */
2195   if (format != GST_FORMAT_BYTES)
2196     return TRUE;
2197
2198   /* the max amount of bytes to read is the total size or
2199    * up to the segment.stop if present. */
2200   if (stop != -1)
2201     maxsize = MIN (size, stop);
2202   else
2203     maxsize = size;
2204
2205   GST_DEBUG_OBJECT (src,
2206       "reading offset %" G_GUINT64_FORMAT ", length %u, size %" G_GINT64_FORMAT
2207       ", segment.stop %" G_GINT64_FORMAT ", maxsize %" G_GINT64_FORMAT, offset,
2208       *length, size, stop, maxsize);
2209
2210   dynamic = g_atomic_int_get (&src->priv->dynamic_size);
2211   GST_DEBUG_OBJECT (src, "dynamic size: %d", dynamic);
2212
2213   /* check size if we have one */
2214   if (maxsize != -1) {
2215     /* if we run past the end, check if the file became bigger and
2216      * retry. */
2217     if (G_UNLIKELY (offset + *length >= maxsize || dynamic)) {
2218       /* see if length of the file changed */
2219       if (bclass->get_size)
2220         if (!bclass->get_size (src, &size))
2221           size = -1;
2222
2223       /* make sure we don't exceed the configured segment stop
2224        * if it was set */
2225       if (stop != -1)
2226         maxsize = MIN (size, stop);
2227       else
2228         maxsize = size;
2229
2230       /* if we are at or past the end, EOS */
2231       if (G_UNLIKELY (offset >= maxsize))
2232         goto unexpected_length;
2233
2234       /* else we can clip to the end */
2235       if (G_UNLIKELY (offset + *length >= maxsize))
2236         *length = maxsize - offset;
2237
2238     }
2239   }
2240
2241   /* keep track of current duration.
2242    * segment is in bytes, we checked that above. */
2243   GST_OBJECT_LOCK (src);
2244   src->segment.duration = size;
2245   GST_OBJECT_UNLOCK (src);
2246
2247   return TRUE;
2248
2249   /* ERRORS */
2250 unexpected_length:
2251   {
2252     return FALSE;
2253   }
2254 }
2255
2256 /* must be called with LIVE_LOCK */
2257 static GstFlowReturn
2258 gst_base_src_get_range (GstBaseSrc * src, guint64 offset, guint length,
2259     GstBuffer ** buf)
2260 {
2261   GstFlowReturn ret;
2262   GstBaseSrcClass *bclass;
2263   GstClockReturn status;
2264   GstBuffer *res_buf;
2265
2266   bclass = GST_BASE_SRC_GET_CLASS (src);
2267
2268 again:
2269   if (src->is_live) {
2270     if (G_UNLIKELY (!src->live_running)) {
2271       ret = gst_base_src_wait_playing (src);
2272       if (ret != GST_FLOW_OK)
2273         goto stopped;
2274     }
2275   }
2276
2277   if (G_UNLIKELY (!GST_BASE_SRC_IS_STARTED (src)
2278           && !GST_BASE_SRC_IS_STARTING (src)))
2279     goto not_started;
2280
2281   if (G_UNLIKELY (!bclass->create))
2282     goto no_function;
2283
2284   if (G_UNLIKELY (!gst_base_src_update_length (src, offset, &length)))
2285     goto unexpected_length;
2286
2287   /* track position */
2288   GST_OBJECT_LOCK (src);
2289   if (src->segment.format == GST_FORMAT_BYTES)
2290     src->segment.position = offset;
2291   GST_OBJECT_UNLOCK (src);
2292
2293   /* normally we don't count buffers */
2294   if (G_UNLIKELY (src->num_buffers_left >= 0)) {
2295     if (src->num_buffers_left == 0)
2296       goto reached_num_buffers;
2297     else
2298       src->num_buffers_left--;
2299   }
2300
2301   /* don't enter the create function if a pending EOS event was set. For the
2302    * logic of the pending_eos, check the event function of this class. */
2303   if (G_UNLIKELY (g_atomic_int_get (&src->priv->pending_eos)))
2304     goto eos;
2305
2306   GST_DEBUG_OBJECT (src,
2307       "calling create offset %" G_GUINT64_FORMAT " length %u, time %"
2308       G_GINT64_FORMAT, offset, length, src->segment.time);
2309
2310   res_buf = *buf;
2311
2312   ret = bclass->create (src, offset, length, &res_buf);
2313
2314   /* The create function could be unlocked because we have a pending EOS. It's
2315    * possible that we have a valid buffer from create that we need to
2316    * discard when the create function returned _OK. */
2317   if (G_UNLIKELY (g_atomic_int_get (&src->priv->pending_eos))) {
2318     if (ret == GST_FLOW_OK) {
2319       if (*buf == NULL)
2320         gst_buffer_unref (res_buf);
2321     }
2322     goto eos;
2323   }
2324
2325   if (G_UNLIKELY (ret != GST_FLOW_OK))
2326     goto not_ok;
2327
2328   /* no timestamp set and we are at offset 0, we can timestamp with 0 */
2329   if (offset == 0 && src->segment.time == 0
2330       && GST_BUFFER_TIMESTAMP (res_buf) == -1 && !src->is_live) {
2331     GST_DEBUG_OBJECT (src, "setting first timestamp to 0");
2332     res_buf = gst_buffer_make_writable (res_buf);
2333     GST_BUFFER_TIMESTAMP (res_buf) = 0;
2334   }
2335
2336   /* now sync before pushing the buffer */
2337   status = gst_base_src_do_sync (src, res_buf);
2338
2339   /* waiting for the clock could have made us flushing */
2340   if (G_UNLIKELY (src->priv->flushing))
2341     goto flushing;
2342
2343   switch (status) {
2344     case GST_CLOCK_EARLY:
2345       /* the buffer is too late. We currently don't drop the buffer. */
2346       GST_DEBUG_OBJECT (src, "buffer too late!, returning anyway");
2347       break;
2348     case GST_CLOCK_OK:
2349       /* buffer synchronised properly */
2350       GST_DEBUG_OBJECT (src, "buffer ok");
2351       break;
2352     case GST_CLOCK_UNSCHEDULED:
2353       /* this case is triggered when we were waiting for the clock and
2354        * it got unlocked because we did a state change. In any case, get rid of
2355        * the buffer. */
2356       if (*buf == NULL)
2357         gst_buffer_unref (res_buf);
2358
2359       if (!src->live_running) {
2360         /* We return FLUSHING when we are not running to stop the dataflow also
2361          * get rid of the produced buffer. */
2362         GST_DEBUG_OBJECT (src,
2363             "clock was unscheduled (%d), returning FLUSHING", status);
2364         ret = GST_FLOW_FLUSHING;
2365       } else {
2366         /* If we are running when this happens, we quickly switched between
2367          * pause and playing. We try to produce a new buffer */
2368         GST_DEBUG_OBJECT (src,
2369             "clock was unscheduled (%d), but we are running", status);
2370         goto again;
2371       }
2372       break;
2373     default:
2374       /* all other result values are unexpected and errors */
2375       GST_ELEMENT_ERROR (src, CORE, CLOCK,
2376           (_("Internal clock error.")),
2377           ("clock returned unexpected return value %d", status));
2378       if (*buf == NULL)
2379         gst_buffer_unref (res_buf);
2380       ret = GST_FLOW_ERROR;
2381       break;
2382   }
2383   if (G_LIKELY (ret == GST_FLOW_OK))
2384     *buf = res_buf;
2385
2386   return ret;
2387
2388   /* ERROR */
2389 stopped:
2390   {
2391     GST_DEBUG_OBJECT (src, "wait_playing returned %d (%s)", ret,
2392         gst_flow_get_name (ret));
2393     return ret;
2394   }
2395 not_ok:
2396   {
2397     GST_DEBUG_OBJECT (src, "create returned %d (%s)", ret,
2398         gst_flow_get_name (ret));
2399     return ret;
2400   }
2401 not_started:
2402   {
2403     GST_DEBUG_OBJECT (src, "getrange but not started");
2404     return GST_FLOW_FLUSHING;
2405   }
2406 no_function:
2407   {
2408     GST_DEBUG_OBJECT (src, "no create function");
2409     return GST_FLOW_NOT_SUPPORTED;
2410   }
2411 unexpected_length:
2412   {
2413     GST_DEBUG_OBJECT (src, "unexpected length %u (offset=%" G_GUINT64_FORMAT
2414         ", size=%" G_GINT64_FORMAT ")", length, offset, src->segment.duration);
2415     return GST_FLOW_EOS;
2416   }
2417 reached_num_buffers:
2418   {
2419     GST_DEBUG_OBJECT (src, "sent all buffers");
2420     return GST_FLOW_EOS;
2421   }
2422 flushing:
2423   {
2424     GST_DEBUG_OBJECT (src, "we are flushing");
2425     if (*buf == NULL)
2426       gst_buffer_unref (res_buf);
2427     return GST_FLOW_FLUSHING;
2428   }
2429 eos:
2430   {
2431     GST_DEBUG_OBJECT (src, "we are EOS");
2432     return GST_FLOW_EOS;
2433   }
2434 }
2435
2436 static GstFlowReturn
2437 gst_base_src_getrange (GstPad * pad, GstObject * parent, guint64 offset,
2438     guint length, GstBuffer ** buf)
2439 {
2440   GstBaseSrc *src;
2441   GstFlowReturn res;
2442
2443   src = GST_BASE_SRC_CAST (parent);
2444
2445   GST_LIVE_LOCK (src);
2446   if (G_UNLIKELY (src->priv->flushing))
2447     goto flushing;
2448
2449   res = gst_base_src_get_range (src, offset, length, buf);
2450
2451 done:
2452   GST_LIVE_UNLOCK (src);
2453
2454   return res;
2455
2456   /* ERRORS */
2457 flushing:
2458   {
2459     GST_DEBUG_OBJECT (src, "we are flushing");
2460     res = GST_FLOW_FLUSHING;
2461     goto done;
2462   }
2463 }
2464
2465 static gboolean
2466 gst_base_src_is_random_access (GstBaseSrc * src)
2467 {
2468   /* we need to start the basesrc to check random access */
2469   if (!GST_BASE_SRC_IS_STARTED (src)) {
2470     GST_LOG_OBJECT (src, "doing start/stop to check get_range support");
2471     if (G_LIKELY (gst_base_src_start (src))) {
2472       if (gst_base_src_start_wait (src) != GST_FLOW_OK)
2473         goto start_failed;
2474       gst_base_src_stop (src);
2475     }
2476   }
2477
2478   return src->random_access;
2479
2480   /* ERRORS */
2481 start_failed:
2482   {
2483     GST_DEBUG_OBJECT (src, "failed to start");
2484     return FALSE;
2485   }
2486 }
2487
2488 static void
2489 gst_base_src_loop (GstPad * pad)
2490 {
2491   GstBaseSrc *src;
2492   GstBuffer *buf = NULL;
2493   GstFlowReturn ret;
2494   gint64 position;
2495   gboolean eos;
2496   guint blocksize;
2497   GList *pending_events = NULL, *tmp;
2498
2499   eos = FALSE;
2500
2501   src = GST_BASE_SRC (GST_OBJECT_PARENT (pad));
2502
2503   gst_base_src_send_stream_start (src);
2504
2505   /* check if we need to renegotiate */
2506   if (gst_pad_check_reconfigure (pad)) {
2507     if (!gst_base_src_negotiate (src))
2508       goto not_negotiated;
2509   }
2510
2511   GST_LIVE_LOCK (src);
2512
2513   if (G_UNLIKELY (src->priv->flushing))
2514     goto flushing;
2515
2516   blocksize = src->blocksize;
2517
2518   /* if we operate in bytes, we can calculate an offset */
2519   if (src->segment.format == GST_FORMAT_BYTES) {
2520     position = src->segment.position;
2521     /* for negative rates, start with subtracting the blocksize */
2522     if (src->segment.rate < 0.0) {
2523       /* we cannot go below segment.start */
2524       if (position > src->segment.start + blocksize)
2525         position -= blocksize;
2526       else {
2527         /* last block, remainder up to segment.start */
2528         blocksize = position - src->segment.start;
2529         position = src->segment.start;
2530       }
2531     }
2532   } else
2533     position = -1;
2534
2535   GST_LOG_OBJECT (src, "next_ts %" GST_TIME_FORMAT " size %u",
2536       GST_TIME_ARGS (position), blocksize);
2537
2538   ret = gst_base_src_get_range (src, position, blocksize, &buf);
2539   if (G_UNLIKELY (ret != GST_FLOW_OK)) {
2540     GST_INFO_OBJECT (src, "pausing after gst_base_src_get_range() = %s",
2541         gst_flow_get_name (ret));
2542     GST_LIVE_UNLOCK (src);
2543     goto pause;
2544   }
2545   /* this should not happen */
2546   if (G_UNLIKELY (buf == NULL))
2547     goto null_buffer;
2548
2549   /* push events to close/start our segment before we push the buffer. */
2550   if (G_UNLIKELY (src->priv->segment_pending)) {
2551     gst_pad_push_event (pad, gst_event_new_segment (&src->segment));
2552     src->priv->segment_pending = FALSE;
2553   }
2554
2555   if (g_atomic_int_get (&src->priv->have_events)) {
2556     GST_OBJECT_LOCK (src);
2557     /* take the events */
2558     pending_events = src->priv->pending_events;
2559     src->priv->pending_events = NULL;
2560     g_atomic_int_set (&src->priv->have_events, FALSE);
2561     GST_OBJECT_UNLOCK (src);
2562   }
2563
2564   /* Push out pending events if any */
2565   if (G_UNLIKELY (pending_events != NULL)) {
2566     for (tmp = pending_events; tmp; tmp = g_list_next (tmp)) {
2567       GstEvent *ev = (GstEvent *) tmp->data;
2568       gst_pad_push_event (pad, ev);
2569     }
2570     g_list_free (pending_events);
2571   }
2572
2573   /* figure out the new position */
2574   switch (src->segment.format) {
2575     case GST_FORMAT_BYTES:
2576     {
2577       guint bufsize = gst_buffer_get_size (buf);
2578
2579       /* we subtracted above for negative rates */
2580       if (src->segment.rate >= 0.0)
2581         position += bufsize;
2582       break;
2583     }
2584     case GST_FORMAT_TIME:
2585     {
2586       GstClockTime start, duration;
2587
2588       start = GST_BUFFER_TIMESTAMP (buf);
2589       duration = GST_BUFFER_DURATION (buf);
2590
2591       if (GST_CLOCK_TIME_IS_VALID (start))
2592         position = start;
2593       else
2594         position = src->segment.position;
2595
2596       if (GST_CLOCK_TIME_IS_VALID (duration)) {
2597         if (src->segment.rate >= 0.0)
2598           position += duration;
2599         else if (position > duration)
2600           position -= duration;
2601         else
2602           position = 0;
2603       }
2604       break;
2605     }
2606     case GST_FORMAT_DEFAULT:
2607       if (src->segment.rate >= 0.0)
2608         position = GST_BUFFER_OFFSET_END (buf);
2609       else
2610         position = GST_BUFFER_OFFSET (buf);
2611       break;
2612     default:
2613       position = -1;
2614       break;
2615   }
2616   if (position != -1) {
2617     if (src->segment.rate >= 0.0) {
2618       /* positive rate, check if we reached the stop */
2619       if (src->segment.stop != -1) {
2620         if (position >= src->segment.stop) {
2621           eos = TRUE;
2622           position = src->segment.stop;
2623         }
2624       }
2625     } else {
2626       /* negative rate, check if we reached the start. start is always set to
2627        * something different from -1 */
2628       if (position <= src->segment.start) {
2629         eos = TRUE;
2630         position = src->segment.start;
2631       }
2632       /* when going reverse, all buffers are DISCONT */
2633       src->priv->discont = TRUE;
2634     }
2635     GST_OBJECT_LOCK (src);
2636     src->segment.position = position;
2637     GST_OBJECT_UNLOCK (src);
2638   }
2639
2640   if (G_UNLIKELY (src->priv->discont)) {
2641     GST_INFO_OBJECT (src, "marking pending DISCONT");
2642     buf = gst_buffer_make_writable (buf);
2643     GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_DISCONT);
2644     src->priv->discont = FALSE;
2645   }
2646   GST_LIVE_UNLOCK (src);
2647
2648   ret = gst_pad_push (pad, buf);
2649   if (G_UNLIKELY (ret != GST_FLOW_OK)) {
2650     GST_INFO_OBJECT (src, "pausing after gst_pad_push() = %s",
2651         gst_flow_get_name (ret));
2652     goto pause;
2653   }
2654
2655   if (G_UNLIKELY (eos)) {
2656     GST_INFO_OBJECT (src, "pausing after end of segment");
2657     ret = GST_FLOW_EOS;
2658     goto pause;
2659   }
2660
2661 done:
2662   return;
2663
2664   /* special cases */
2665 not_negotiated:
2666   {
2667     GST_DEBUG_OBJECT (src, "Failed to renegotiate");
2668     ret = GST_FLOW_NOT_NEGOTIATED;
2669     goto pause;
2670   }
2671 flushing:
2672   {
2673     GST_DEBUG_OBJECT (src, "we are flushing");
2674     GST_LIVE_UNLOCK (src);
2675     ret = GST_FLOW_FLUSHING;
2676     goto pause;
2677   }
2678 pause:
2679   {
2680     const gchar *reason = gst_flow_get_name (ret);
2681     GstEvent *event;
2682
2683     GST_DEBUG_OBJECT (src, "pausing task, reason %s", reason);
2684     src->running = FALSE;
2685     gst_pad_pause_task (pad);
2686     if (ret == GST_FLOW_EOS) {
2687       gboolean flag_segment;
2688       GstFormat format;
2689       gint64 position;
2690
2691       /* perform EOS logic */
2692       flag_segment = (src->segment.flags & GST_SEEK_FLAG_SEGMENT) != 0;
2693       format = src->segment.format;
2694       position = src->segment.position;
2695
2696       if (flag_segment) {
2697         GstMessage *message;
2698
2699         message = gst_message_new_segment_done (GST_OBJECT_CAST (src),
2700             format, position);
2701         gst_message_set_seqnum (message, src->priv->seqnum);
2702         gst_element_post_message (GST_ELEMENT_CAST (src), message);
2703       } else {
2704         event = gst_event_new_eos ();
2705         gst_event_set_seqnum (event, src->priv->seqnum);
2706         gst_pad_push_event (pad, event);
2707       }
2708     } else if (ret == GST_FLOW_NOT_LINKED || ret <= GST_FLOW_EOS) {
2709       event = gst_event_new_eos ();
2710       gst_event_set_seqnum (event, src->priv->seqnum);
2711       /* for fatal errors we post an error message, post the error
2712        * first so the app knows about the error first.
2713        * Also don't do this for FLUSHING because it happens
2714        * due to flushing and posting an error message because of
2715        * that is the wrong thing to do, e.g. when we're doing
2716        * a flushing seek. */
2717       GST_ELEMENT_ERROR (src, STREAM, FAILED,
2718           (_("Internal data flow error.")),
2719           ("streaming task paused, reason %s (%d)", reason, ret));
2720       gst_pad_push_event (pad, event);
2721     }
2722     goto done;
2723   }
2724 null_buffer:
2725   {
2726     GST_ELEMENT_ERROR (src, STREAM, FAILED,
2727         (_("Internal data flow error.")), ("element returned NULL buffer"));
2728     GST_LIVE_UNLOCK (src);
2729     goto done;
2730   }
2731 }
2732
2733 static gboolean
2734 gst_base_src_set_allocation (GstBaseSrc * basesrc, GstBufferPool * pool,
2735     GstAllocator * allocator, GstAllocationParams * params)
2736 {
2737   GstAllocator *oldalloc;
2738   GstBufferPool *oldpool;
2739   GstBaseSrcPrivate *priv = basesrc->priv;
2740
2741   if (pool) {
2742     GST_DEBUG_OBJECT (basesrc, "activate pool");
2743     if (!gst_buffer_pool_set_active (pool, TRUE))
2744       goto activate_failed;
2745   }
2746
2747   GST_OBJECT_LOCK (basesrc);
2748   oldpool = priv->pool;
2749   priv->pool = pool;
2750
2751   oldalloc = priv->allocator;
2752   priv->allocator = allocator;
2753
2754   if (params)
2755     priv->params = *params;
2756   else
2757     gst_allocation_params_init (&priv->params);
2758   GST_OBJECT_UNLOCK (basesrc);
2759
2760   if (oldpool) {
2761     /* only deactivate if the pool is not the one we're using */
2762     if (oldpool != pool) {
2763       GST_DEBUG_OBJECT (basesrc, "deactivate old pool");
2764       gst_buffer_pool_set_active (oldpool, FALSE);
2765     }
2766     gst_object_unref (oldpool);
2767   }
2768   if (oldalloc) {
2769     gst_allocator_unref (oldalloc);
2770   }
2771   return TRUE;
2772
2773   /* ERRORS */
2774 activate_failed:
2775   {
2776     GST_ERROR_OBJECT (basesrc, "failed to activate bufferpool.");
2777     return FALSE;
2778   }
2779 }
2780
2781 static gboolean
2782 gst_base_src_activate_pool (GstBaseSrc * basesrc, gboolean active)
2783 {
2784   GstBaseSrcPrivate *priv = basesrc->priv;
2785   GstBufferPool *pool;
2786   gboolean res = TRUE;
2787
2788   GST_OBJECT_LOCK (basesrc);
2789   if ((pool = priv->pool))
2790     pool = gst_object_ref (pool);
2791   GST_OBJECT_UNLOCK (basesrc);
2792
2793   if (pool) {
2794     res = gst_buffer_pool_set_active (pool, active);
2795     gst_object_unref (pool);
2796   }
2797   return res;
2798 }
2799
2800
2801 static gboolean
2802 gst_base_src_decide_allocation_default (GstBaseSrc * basesrc, GstQuery * query)
2803 {
2804   GstCaps *outcaps;
2805   GstBufferPool *pool;
2806   guint size, min, max;
2807   GstAllocator *allocator;
2808   GstAllocationParams params;
2809   GstStructure *config;
2810   gboolean update_allocator;
2811
2812   gst_query_parse_allocation (query, &outcaps, NULL);
2813
2814   /* we got configuration from our peer or the decide_allocation method,
2815    * parse them */
2816   if (gst_query_get_n_allocation_params (query) > 0) {
2817     /* try the allocator */
2818     gst_query_parse_nth_allocation_param (query, 0, &allocator, &params);
2819     update_allocator = TRUE;
2820   } else {
2821     allocator = NULL;
2822     gst_allocation_params_init (&params);
2823     update_allocator = FALSE;
2824   }
2825
2826   if (gst_query_get_n_allocation_pools (query) > 0) {
2827     gst_query_parse_nth_allocation_pool (query, 0, &pool, &size, &min, &max);
2828
2829     if (pool == NULL) {
2830       /* no pool, we can make our own */
2831       GST_DEBUG_OBJECT (basesrc, "no pool, making new pool");
2832       pool = gst_buffer_pool_new ();
2833     }
2834   } else {
2835     pool = NULL;
2836     size = min = max = 0;
2837   }
2838
2839   /* now configure */
2840   if (pool) {
2841     config = gst_buffer_pool_get_config (pool);
2842     gst_buffer_pool_config_set_params (config, outcaps, size, min, max);
2843     gst_buffer_pool_config_set_allocator (config, allocator, &params);
2844     gst_buffer_pool_set_config (pool, config);
2845   }
2846
2847   if (update_allocator)
2848     gst_query_set_nth_allocation_param (query, 0, allocator, &params);
2849   else
2850     gst_query_add_allocation_param (query, allocator, &params);
2851   if (allocator)
2852     gst_allocator_unref (allocator);
2853
2854   if (pool) {
2855     gst_query_set_nth_allocation_pool (query, 0, pool, size, min, max);
2856     gst_object_unref (pool);
2857   }
2858
2859   return TRUE;
2860 }
2861
2862 static gboolean
2863 gst_base_src_prepare_allocation (GstBaseSrc * basesrc, GstCaps * caps)
2864 {
2865   GstBaseSrcClass *bclass;
2866   gboolean result = TRUE;
2867   GstQuery *query;
2868   GstBufferPool *pool = NULL;
2869   GstAllocator *allocator = NULL;
2870   GstAllocationParams params;
2871
2872   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2873
2874   /* make query and let peer pad answer, we don't really care if it worked or
2875    * not, if it failed, the allocation query would contain defaults and the
2876    * subclass would then set better values if needed */
2877   query = gst_query_new_allocation (caps, TRUE);
2878   if (!gst_pad_peer_query (basesrc->srcpad, query)) {
2879     /* not a problem, just debug a little */
2880     GST_DEBUG_OBJECT (basesrc, "peer ALLOCATION query failed");
2881   }
2882
2883   g_assert (bclass->decide_allocation != NULL);
2884   result = bclass->decide_allocation (basesrc, query);
2885
2886   GST_DEBUG_OBJECT (basesrc, "ALLOCATION (%d) params: %" GST_PTR_FORMAT, result,
2887       query);
2888
2889   if (!result)
2890     goto no_decide_allocation;
2891
2892   /* we got configuration from our peer or the decide_allocation method,
2893    * parse them */
2894   if (gst_query_get_n_allocation_params (query) > 0) {
2895     gst_query_parse_nth_allocation_param (query, 0, &allocator, &params);
2896   } else {
2897     allocator = NULL;
2898     gst_allocation_params_init (&params);
2899   }
2900
2901   if (gst_query_get_n_allocation_pools (query) > 0)
2902     gst_query_parse_nth_allocation_pool (query, 0, &pool, NULL, NULL, NULL);
2903
2904   result = gst_base_src_set_allocation (basesrc, pool, allocator, &params);
2905
2906   gst_query_unref (query);
2907
2908   return result;
2909
2910   /* Errors */
2911 no_decide_allocation:
2912   {
2913     GST_WARNING_OBJECT (basesrc, "Subclass failed to decide allocation");
2914     gst_query_unref (query);
2915
2916     return result;
2917   }
2918 }
2919
2920 /* default negotiation code.
2921  *
2922  * Take intersection between src and sink pads, take first
2923  * caps and fixate.
2924  */
2925 static gboolean
2926 gst_base_src_default_negotiate (GstBaseSrc * basesrc)
2927 {
2928   GstCaps *thiscaps;
2929   GstCaps *caps = NULL;
2930   GstCaps *peercaps = NULL;
2931   gboolean result = FALSE;
2932
2933   /* first see what is possible on our source pad */
2934   thiscaps = gst_pad_query_caps (GST_BASE_SRC_PAD (basesrc), NULL);
2935   GST_DEBUG_OBJECT (basesrc, "caps of src: %" GST_PTR_FORMAT, thiscaps);
2936   /* nothing or anything is allowed, we're done */
2937   if (thiscaps == NULL || gst_caps_is_any (thiscaps))
2938     goto no_nego_needed;
2939
2940   if (G_UNLIKELY (gst_caps_is_empty (thiscaps)))
2941     goto no_caps;
2942
2943   /* get the peer caps */
2944   peercaps = gst_pad_peer_query_caps (GST_BASE_SRC_PAD (basesrc), thiscaps);
2945   GST_DEBUG_OBJECT (basesrc, "caps of peer: %" GST_PTR_FORMAT, peercaps);
2946   if (peercaps) {
2947     /* The result is already a subset of our caps */
2948     caps = peercaps;
2949     gst_caps_unref (thiscaps);
2950   } else {
2951     /* no peer, work with our own caps then */
2952     caps = thiscaps;
2953   }
2954   if (caps && !gst_caps_is_empty (caps)) {
2955     /* now fixate */
2956     GST_DEBUG_OBJECT (basesrc, "have caps: %" GST_PTR_FORMAT, caps);
2957     if (gst_caps_is_any (caps)) {
2958       GST_DEBUG_OBJECT (basesrc, "any caps, we stop");
2959       /* hmm, still anything, so element can do anything and
2960        * nego is not needed */
2961       result = TRUE;
2962     } else {
2963       caps = gst_base_src_fixate (basesrc, caps);
2964       GST_DEBUG_OBJECT (basesrc, "fixated to: %" GST_PTR_FORMAT, caps);
2965       if (gst_caps_is_fixed (caps)) {
2966         /* yay, fixed caps, use those then, it's possible that the subclass does
2967          * not accept this caps after all and we have to fail. */
2968         result = gst_base_src_set_caps (basesrc, caps);
2969       }
2970     }
2971     gst_caps_unref (caps);
2972   } else {
2973     if (caps)
2974       gst_caps_unref (caps);
2975     GST_DEBUG_OBJECT (basesrc, "no common caps");
2976   }
2977   return result;
2978
2979 no_nego_needed:
2980   {
2981     GST_DEBUG_OBJECT (basesrc, "no negotiation needed");
2982     if (thiscaps)
2983       gst_caps_unref (thiscaps);
2984     return TRUE;
2985   }
2986 no_caps:
2987   {
2988     GST_ELEMENT_ERROR (basesrc, STREAM, FORMAT,
2989         ("No supported formats found"),
2990         ("This element did not produce valid caps"));
2991     if (thiscaps)
2992       gst_caps_unref (thiscaps);
2993     return TRUE;
2994   }
2995 }
2996
2997 static gboolean
2998 gst_base_src_negotiate (GstBaseSrc * basesrc)
2999 {
3000   GstBaseSrcClass *bclass;
3001   gboolean result;
3002
3003   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
3004
3005   GST_DEBUG_OBJECT (basesrc, "starting negotiation");
3006
3007   if (G_LIKELY (bclass->negotiate))
3008     result = bclass->negotiate (basesrc);
3009   else
3010     result = TRUE;
3011
3012   if (G_LIKELY (result)) {
3013     GstCaps *caps;
3014
3015     caps = gst_pad_get_current_caps (basesrc->srcpad);
3016
3017     result = gst_base_src_prepare_allocation (basesrc, caps);
3018
3019     if (caps)
3020       gst_caps_unref (caps);
3021   }
3022   return result;
3023 }
3024
3025 static gboolean
3026 gst_base_src_start (GstBaseSrc * basesrc)
3027 {
3028   GstBaseSrcClass *bclass;
3029   gboolean result;
3030
3031   GST_LIVE_LOCK (basesrc);
3032   if (GST_BASE_SRC_IS_STARTING (basesrc))
3033     goto was_starting;
3034   if (GST_BASE_SRC_IS_STARTED (basesrc))
3035     goto was_started;
3036
3037   basesrc->priv->start_result = GST_FLOW_FLUSHING;
3038   GST_OBJECT_FLAG_SET (basesrc, GST_BASE_SRC_FLAG_STARTING);
3039   basesrc->num_buffers_left = basesrc->num_buffers;
3040   basesrc->running = FALSE;
3041   basesrc->priv->segment_pending = FALSE;
3042   GST_OBJECT_LOCK (basesrc);
3043   gst_segment_init (&basesrc->segment, basesrc->segment.format);
3044   GST_OBJECT_UNLOCK (basesrc);
3045   GST_LIVE_UNLOCK (basesrc);
3046
3047   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
3048   if (bclass->start)
3049     result = bclass->start (basesrc);
3050   else
3051     result = TRUE;
3052
3053   if (!result)
3054     goto could_not_start;
3055
3056   if (!gst_base_src_is_async (basesrc))
3057     gst_base_src_start_complete (basesrc, GST_FLOW_OK);
3058
3059   return result;
3060
3061   /* ERROR */
3062 was_starting:
3063   {
3064     GST_DEBUG_OBJECT (basesrc, "was starting");
3065     GST_LIVE_UNLOCK (basesrc);
3066     return TRUE;
3067   }
3068 was_started:
3069   {
3070     GST_DEBUG_OBJECT (basesrc, "was started");
3071     GST_LIVE_UNLOCK (basesrc);
3072     return TRUE;
3073   }
3074 could_not_start:
3075   {
3076     GST_DEBUG_OBJECT (basesrc, "could not start");
3077     /* subclass is supposed to post a message. We don't have to call _stop. */
3078     gst_base_src_start_complete (basesrc, GST_FLOW_ERROR);
3079     return FALSE;
3080   }
3081 }
3082
3083 /**
3084  * gst_base_src_start_complete:
3085  * @src: base source instance
3086  * @ret: a #GstFlowReturn
3087  *
3088  * Complete an asynchronous start operation. When the subclass overrides the
3089  * start method, it should call gst_base_src_start_complete() when the start
3090  * operation completes either from the same thread or from an asynchronous
3091  * helper thread.
3092  */
3093 void
3094 gst_base_src_start_complete (GstBaseSrc * basesrc, GstFlowReturn ret)
3095 {
3096   gboolean have_size;
3097   guint64 size;
3098   gboolean seekable;
3099   GstFormat format;
3100   GstPadMode mode;
3101   GstEvent *event;
3102
3103   if (ret != GST_FLOW_OK)
3104     goto error;
3105
3106   GST_DEBUG_OBJECT (basesrc, "starting source");
3107   format = basesrc->segment.format;
3108
3109   /* figure out the size */
3110   have_size = FALSE;
3111   size = -1;
3112   if (format == GST_FORMAT_BYTES) {
3113     GstBaseSrcClass *bclass = GST_BASE_SRC_GET_CLASS (basesrc);
3114
3115     if (bclass->get_size) {
3116       if (!(have_size = bclass->get_size (basesrc, &size)))
3117         size = -1;
3118     }
3119     GST_DEBUG_OBJECT (basesrc, "setting size %" G_GUINT64_FORMAT, size);
3120     /* only update the size when operating in bytes, subclass is supposed
3121      * to set duration in the start method for other formats */
3122     GST_OBJECT_LOCK (basesrc);
3123     basesrc->segment.duration = size;
3124     GST_OBJECT_UNLOCK (basesrc);
3125   }
3126
3127   GST_DEBUG_OBJECT (basesrc,
3128       "format: %s, have size: %d, size: %" G_GUINT64_FORMAT ", duration: %"
3129       G_GINT64_FORMAT, gst_format_get_name (format), have_size, size,
3130       basesrc->segment.duration);
3131
3132   seekable = gst_base_src_seekable (basesrc);
3133   GST_DEBUG_OBJECT (basesrc, "is seekable: %d", seekable);
3134
3135   /* update for random access flag */
3136   basesrc->random_access = seekable && format == GST_FORMAT_BYTES;
3137
3138   GST_DEBUG_OBJECT (basesrc, "is random_access: %d", basesrc->random_access);
3139
3140   /* stop flushing now but for live sources, still block in the LIVE lock when
3141    * we are not yet PLAYING */
3142   gst_base_src_set_flushing (basesrc, FALSE, FALSE, NULL);
3143
3144   gst_pad_mark_reconfigure (GST_BASE_SRC_PAD (basesrc));
3145
3146   GST_OBJECT_LOCK (basesrc->srcpad);
3147   mode = GST_PAD_MODE (basesrc->srcpad);
3148   GST_OBJECT_UNLOCK (basesrc->srcpad);
3149
3150   /* take the stream lock here, we only want to let the task run when we have
3151    * set the STARTED flag */
3152   GST_PAD_STREAM_LOCK (basesrc->srcpad);
3153   if (mode == GST_PAD_MODE_PUSH) {
3154     /* do initial seek, which will start the task */
3155     GST_OBJECT_LOCK (basesrc);
3156     event = basesrc->pending_seek;
3157     basesrc->pending_seek = NULL;
3158     GST_OBJECT_UNLOCK (basesrc);
3159
3160     /* The perform seek code will start the task when finished. */
3161     if (G_UNLIKELY (!gst_base_src_perform_seek (basesrc, event)))
3162       goto seek_failed;
3163
3164     if (event)
3165       gst_event_unref (event);
3166   } else {
3167     /* if not random_access, we cannot operate in pull mode for now */
3168     if (G_UNLIKELY (!basesrc->random_access))
3169       goto no_get_range;
3170   }
3171
3172   GST_LIVE_LOCK (basesrc);
3173   GST_OBJECT_FLAG_SET (basesrc, GST_BASE_SRC_FLAG_STARTED);
3174   GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTING);
3175   basesrc->priv->start_result = ret;
3176   GST_LIVE_SIGNAL (basesrc);
3177   GST_LIVE_UNLOCK (basesrc);
3178
3179   GST_PAD_STREAM_UNLOCK (basesrc->srcpad);
3180
3181   return;
3182
3183 seek_failed:
3184   {
3185     GST_PAD_STREAM_UNLOCK (basesrc->srcpad);
3186     GST_ERROR_OBJECT (basesrc, "Failed to perform initial seek");
3187     gst_base_src_set_flushing (basesrc, TRUE, FALSE, NULL);
3188     if (event)
3189       gst_event_unref (event);
3190     ret = GST_FLOW_ERROR;
3191     goto error;
3192   }
3193 no_get_range:
3194   {
3195     GST_PAD_STREAM_UNLOCK (basesrc->srcpad);
3196     gst_base_src_set_flushing (basesrc, TRUE, FALSE, NULL);
3197     GST_ERROR_OBJECT (basesrc, "Cannot operate in pull mode, stopping");
3198     ret = GST_FLOW_ERROR;
3199     goto error;
3200   }
3201 error:
3202   {
3203     GST_LIVE_LOCK (basesrc);
3204     basesrc->priv->start_result = ret;
3205     GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTING);
3206     GST_LIVE_SIGNAL (basesrc);
3207     GST_LIVE_UNLOCK (basesrc);
3208     return;
3209   }
3210 }
3211
3212 /**
3213  * gst_base_src_start_wait:
3214  * @src: base source instance
3215  * @ret: a #GstFlowReturn
3216  *
3217  * Wait until the start operation completes.
3218  *
3219  * Returns: a #GstFlowReturn.
3220  */
3221 GstFlowReturn
3222 gst_base_src_start_wait (GstBaseSrc * basesrc)
3223 {
3224   GstFlowReturn result;
3225
3226   GST_LIVE_LOCK (basesrc);
3227   if (G_UNLIKELY (basesrc->priv->flushing))
3228     goto flushing;
3229
3230   while (GST_BASE_SRC_IS_STARTING (basesrc)) {
3231     GST_LIVE_WAIT (basesrc);
3232     if (G_UNLIKELY (basesrc->priv->flushing))
3233       goto flushing;
3234   }
3235   result = basesrc->priv->start_result;
3236   GST_LIVE_UNLOCK (basesrc);
3237
3238   return result;
3239
3240   /* ERRORS */
3241 flushing:
3242   {
3243     GST_DEBUG_OBJECT (basesrc, "we are flushing");
3244     GST_LIVE_UNLOCK (basesrc);
3245     return GST_FLOW_FLUSHING;
3246   }
3247 }
3248
3249 static gboolean
3250 gst_base_src_stop (GstBaseSrc * basesrc)
3251 {
3252   GstBaseSrcClass *bclass;
3253   gboolean result = TRUE;
3254
3255   GST_DEBUG_OBJECT (basesrc, "stopping source");
3256
3257   /* flush all */
3258   gst_base_src_set_flushing (basesrc, TRUE, FALSE, NULL);
3259   /* stop the task */
3260   gst_pad_stop_task (basesrc->srcpad);
3261
3262   GST_LIVE_LOCK (basesrc);
3263   if (!GST_BASE_SRC_IS_STARTED (basesrc) && !GST_BASE_SRC_IS_STARTING (basesrc))
3264     goto was_stopped;
3265
3266   GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTING);
3267   GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTED);
3268   basesrc->priv->start_result = GST_FLOW_FLUSHING;
3269   GST_LIVE_SIGNAL (basesrc);
3270   GST_LIVE_UNLOCK (basesrc);
3271
3272   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
3273   if (bclass->stop)
3274     result = bclass->stop (basesrc);
3275
3276   gst_base_src_set_allocation (basesrc, NULL, NULL, NULL);
3277
3278   return result;
3279
3280 was_stopped:
3281   {
3282     GST_DEBUG_OBJECT (basesrc, "was started");
3283     GST_LIVE_UNLOCK (basesrc);
3284     return TRUE;
3285   }
3286 }
3287
3288 /* start or stop flushing dataprocessing
3289  */
3290 static gboolean
3291 gst_base_src_set_flushing (GstBaseSrc * basesrc,
3292     gboolean flushing, gboolean live_play, gboolean * playing)
3293 {
3294   GstBaseSrcClass *bclass;
3295
3296   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
3297
3298   if (flushing) {
3299     gst_base_src_activate_pool (basesrc, FALSE);
3300     /* unlock any subclasses, we need to do this before grabbing the
3301      * LIVE_LOCK since we hold this lock before going into ::create. We pass an
3302      * unlock to the params because of backwards compat (see seek handler)*/
3303     if (bclass->unlock)
3304       bclass->unlock (basesrc);
3305   }
3306
3307   /* the live lock is released when we are blocked, waiting for playing or
3308    * when we sync to the clock. */
3309   GST_LIVE_LOCK (basesrc);
3310   if (playing)
3311     *playing = basesrc->live_running;
3312   basesrc->priv->flushing = flushing;
3313   if (flushing) {
3314     /* if we are locked in the live lock, signal it to make it flush */
3315     basesrc->live_running = TRUE;
3316
3317     /* clear pending EOS if any */
3318     g_atomic_int_set (&basesrc->priv->pending_eos, FALSE);
3319
3320     /* step 1, now that we have the LIVE lock, clear our unlock request */
3321     if (bclass->unlock_stop)
3322       bclass->unlock_stop (basesrc);
3323
3324     /* step 2, unblock clock sync (if any) or any other blocking thing */
3325     if (basesrc->clock_id)
3326       gst_clock_id_unschedule (basesrc->clock_id);
3327   } else {
3328     /* signal the live source that it can start playing */
3329     basesrc->live_running = live_play;
3330
3331     gst_base_src_activate_pool (basesrc, TRUE);
3332
3333     /* Drop all delayed events */
3334     GST_OBJECT_LOCK (basesrc);
3335     if (basesrc->priv->pending_events) {
3336       g_list_foreach (basesrc->priv->pending_events, (GFunc) gst_event_unref,
3337           NULL);
3338       g_list_free (basesrc->priv->pending_events);
3339       basesrc->priv->pending_events = NULL;
3340       g_atomic_int_set (&basesrc->priv->have_events, FALSE);
3341     }
3342     GST_OBJECT_UNLOCK (basesrc);
3343   }
3344   GST_LIVE_SIGNAL (basesrc);
3345   GST_LIVE_UNLOCK (basesrc);
3346
3347   return TRUE;
3348 }
3349
3350 /* the purpose of this function is to make sure that a live source blocks in the
3351  * LIVE lock or leaves the LIVE lock and continues playing. */
3352 static gboolean
3353 gst_base_src_set_playing (GstBaseSrc * basesrc, gboolean live_play)
3354 {
3355   GstBaseSrcClass *bclass;
3356
3357   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
3358
3359   /* unlock subclasses locked in ::create, we only do this when we stop playing. */
3360   if (!live_play) {
3361     GST_DEBUG_OBJECT (basesrc, "unlock");
3362     if (bclass->unlock)
3363       bclass->unlock (basesrc);
3364   }
3365
3366   /* we are now able to grab the LIVE lock, when we get it, we can be
3367    * waiting for PLAYING while blocked in the LIVE cond or we can be waiting
3368    * for the clock. */
3369   GST_LIVE_LOCK (basesrc);
3370   GST_DEBUG_OBJECT (basesrc, "unschedule clock");
3371
3372   /* unblock clock sync (if any) */
3373   if (basesrc->clock_id)
3374     gst_clock_id_unschedule (basesrc->clock_id);
3375
3376   /* configure what to do when we get to the LIVE lock. */
3377   GST_DEBUG_OBJECT (basesrc, "live running %d", live_play);
3378   basesrc->live_running = live_play;
3379
3380   if (live_play) {
3381     gboolean start;
3382
3383     /* clear our unlock request when going to PLAYING */
3384     GST_DEBUG_OBJECT (basesrc, "unlock stop");
3385     if (bclass->unlock_stop)
3386       bclass->unlock_stop (basesrc);
3387
3388     /* for live sources we restart the timestamp correction */
3389     basesrc->priv->latency = -1;
3390     /* have to restart the task in case it stopped because of the unlock when
3391      * we went to PAUSED. Only do this if we operating in push mode. */
3392     GST_OBJECT_LOCK (basesrc->srcpad);
3393     start = (GST_PAD_MODE (basesrc->srcpad) == GST_PAD_MODE_PUSH);
3394     GST_OBJECT_UNLOCK (basesrc->srcpad);
3395     if (start)
3396       gst_pad_start_task (basesrc->srcpad, (GstTaskFunction) gst_base_src_loop,
3397           basesrc->srcpad);
3398     GST_DEBUG_OBJECT (basesrc, "signal");
3399     GST_LIVE_SIGNAL (basesrc);
3400   }
3401   GST_LIVE_UNLOCK (basesrc);
3402
3403   return TRUE;
3404 }
3405
3406 static gboolean
3407 gst_base_src_activate_push (GstPad * pad, GstObject * parent, gboolean active)
3408 {
3409   GstBaseSrc *basesrc;
3410
3411   basesrc = GST_BASE_SRC (parent);
3412
3413   /* prepare subclass first */
3414   if (active) {
3415     GST_DEBUG_OBJECT (basesrc, "Activating in push mode");
3416
3417     if (G_UNLIKELY (!basesrc->can_activate_push))
3418       goto no_push_activation;
3419
3420     if (G_UNLIKELY (!gst_base_src_start (basesrc)))
3421       goto error_start;
3422   } else {
3423     GST_DEBUG_OBJECT (basesrc, "Deactivating in push mode");
3424     /* now we can stop the source */
3425     if (G_UNLIKELY (!gst_base_src_stop (basesrc)))
3426       goto error_stop;
3427   }
3428   return TRUE;
3429
3430   /* ERRORS */
3431 no_push_activation:
3432   {
3433     GST_WARNING_OBJECT (basesrc, "Subclass disabled push-mode activation");
3434     return FALSE;
3435   }
3436 error_start:
3437   {
3438     GST_WARNING_OBJECT (basesrc, "Failed to start in push mode");
3439     return FALSE;
3440   }
3441 error_stop:
3442   {
3443     GST_DEBUG_OBJECT (basesrc, "Failed to stop in push mode");
3444     return FALSE;
3445   }
3446 }
3447
3448 static gboolean
3449 gst_base_src_activate_pull (GstPad * pad, GstObject * parent, gboolean active)
3450 {
3451   GstBaseSrc *basesrc;
3452
3453   basesrc = GST_BASE_SRC (parent);
3454
3455   /* prepare subclass first */
3456   if (active) {
3457     GST_DEBUG_OBJECT (basesrc, "Activating in pull mode");
3458     if (G_UNLIKELY (!gst_base_src_start (basesrc)))
3459       goto error_start;
3460   } else {
3461     GST_DEBUG_OBJECT (basesrc, "Deactivating in pull mode");
3462     if (G_UNLIKELY (!gst_base_src_stop (basesrc)))
3463       goto error_stop;
3464   }
3465   return TRUE;
3466
3467   /* ERRORS */
3468 error_start:
3469   {
3470     GST_ERROR_OBJECT (basesrc, "Failed to start in pull mode");
3471     return FALSE;
3472   }
3473 error_stop:
3474   {
3475     GST_ERROR_OBJECT (basesrc, "Failed to stop in pull mode");
3476     return FALSE;
3477   }
3478 }
3479
3480 static gboolean
3481 gst_base_src_activate_mode (GstPad * pad, GstObject * parent,
3482     GstPadMode mode, gboolean active)
3483 {
3484   gboolean res;
3485
3486   switch (mode) {
3487     case GST_PAD_MODE_PULL:
3488       res = gst_base_src_activate_pull (pad, parent, active);
3489       break;
3490     case GST_PAD_MODE_PUSH:
3491       res = gst_base_src_activate_push (pad, parent, active);
3492       break;
3493     default:
3494       GST_LOG_OBJECT (pad, "unknown activation mode %d", mode);
3495       res = FALSE;
3496       break;
3497   }
3498   return res;
3499 }
3500
3501
3502 static GstStateChangeReturn
3503 gst_base_src_change_state (GstElement * element, GstStateChange transition)
3504 {
3505   GstBaseSrc *basesrc;
3506   GstStateChangeReturn result;
3507   gboolean no_preroll = FALSE;
3508
3509   basesrc = GST_BASE_SRC (element);
3510
3511   switch (transition) {
3512     case GST_STATE_CHANGE_NULL_TO_READY:
3513       break;
3514     case GST_STATE_CHANGE_READY_TO_PAUSED:
3515       basesrc->priv->stream_start_pending = TRUE;
3516       no_preroll = gst_base_src_is_live (basesrc);
3517       break;
3518     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
3519       GST_DEBUG_OBJECT (basesrc, "PAUSED->PLAYING");
3520       if (gst_base_src_is_live (basesrc)) {
3521         /* now we can start playback */
3522         gst_base_src_set_playing (basesrc, TRUE);
3523       }
3524       break;
3525     default:
3526       break;
3527   }
3528
3529   if ((result =
3530           GST_ELEMENT_CLASS (parent_class)->change_state (element,
3531               transition)) == GST_STATE_CHANGE_FAILURE)
3532     goto failure;
3533
3534   switch (transition) {
3535     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
3536       GST_DEBUG_OBJECT (basesrc, "PLAYING->PAUSED");
3537       if (gst_base_src_is_live (basesrc)) {
3538         /* make sure we block in the live lock in PAUSED */
3539         gst_base_src_set_playing (basesrc, FALSE);
3540         no_preroll = TRUE;
3541       }
3542       break;
3543     case GST_STATE_CHANGE_PAUSED_TO_READY:
3544     {
3545       /* we don't need to unblock anything here, the pad deactivation code
3546        * already did this */
3547       g_atomic_int_set (&basesrc->priv->pending_eos, FALSE);
3548       gst_event_replace (&basesrc->pending_seek, NULL);
3549       basesrc->priv->stream_start_pending = FALSE;
3550       break;
3551     }
3552     case GST_STATE_CHANGE_READY_TO_NULL:
3553       break;
3554     default:
3555       break;
3556   }
3557
3558   if (no_preroll && result == GST_STATE_CHANGE_SUCCESS)
3559     result = GST_STATE_CHANGE_NO_PREROLL;
3560
3561   return result;
3562
3563   /* ERRORS */
3564 failure:
3565   {
3566     GST_DEBUG_OBJECT (basesrc, "parent failed state change");
3567     return result;
3568   }
3569 }