basesrc: improve flush-start handling
[platform/upstream/gstreamer.git] / libs / gst / base / gstbasesrc.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *               2000,2005 Wim Taymans <wim@fluendo.com>
4  *
5  * gstbasesrc.c:
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20  * Boston, MA 02110-1301, USA.
21  */
22
23 /**
24  * SECTION:gstbasesrc
25  * @short_description: Base class for getrange based source elements
26  * @see_also: #GstPushSrc, #GstBaseTransform, #GstBaseSink
27  *
28  * This is a generice base class for source elements. The following
29  * types of sources are supported:
30  * <itemizedlist>
31  *   <listitem><para>random access sources like files</para></listitem>
32  *   <listitem><para>seekable sources</para></listitem>
33  *   <listitem><para>live sources</para></listitem>
34  * </itemizedlist>
35  *
36  * The source can be configured to operate in any #GstFormat with the
37  * gst_base_src_set_format() method. The currently set format determines
38  * the format of the internal #GstSegment and any #GST_EVENT_SEGMENT
39  * events. The default format for #GstBaseSrc is #GST_FORMAT_BYTES.
40  *
41  * #GstBaseSrc always supports push mode scheduling. If the following
42  * conditions are met, it also supports pull mode scheduling:
43  * <itemizedlist>
44  *   <listitem><para>The format is set to #GST_FORMAT_BYTES (default).</para>
45  *   </listitem>
46  *   <listitem><para>#GstBaseSrcClass.is_seekable() returns %TRUE.</para>
47  *   </listitem>
48  * </itemizedlist>
49  *
50  * If all the conditions are met for operating in pull mode, #GstBaseSrc is
51  * automatically seekable in push mode as well. The following conditions must
52  * be met to make the element seekable in push mode when the format is not
53  * #GST_FORMAT_BYTES:
54  * <itemizedlist>
55  *   <listitem><para>
56  *     #GstBaseSrcClass.is_seekable() returns %TRUE.
57  *   </para></listitem>
58  *   <listitem><para>
59  *     #GstBaseSrcClass.query() can convert all supported seek formats to the
60  *     internal format as set with gst_base_src_set_format().
61  *   </para></listitem>
62  *   <listitem><para>
63  *     #GstBaseSrcClass.do_seek() is implemented, performs the seek and returns
64  *      %TRUE.
65  *   </para></listitem>
66  * </itemizedlist>
67  *
68  * When the element does not meet the requirements to operate in pull mode, the
69  * offset and length in the #GstBaseSrcClass.create() method should be ignored.
70  * It is recommended to subclass #GstPushSrc instead, in this situation. If the
71  * element can operate in pull mode but only with specific offsets and
72  * lengths, it is allowed to generate an error when the wrong values are passed
73  * to the #GstBaseSrcClass.create() function.
74  *
75  * #GstBaseSrc has support for live sources. Live sources are sources that when
76  * paused discard data, such as audio or video capture devices. A typical live
77  * source also produces data at a fixed rate and thus provides a clock to publish
78  * this rate.
79  * Use gst_base_src_set_live() to activate the live source mode.
80  *
81  * A live source does not produce data in the PAUSED state. This means that the
82  * #GstBaseSrcClass.create() method will not be called in PAUSED but only in
83  * PLAYING. To signal the pipeline that the element will not produce data, the
84  * return value from the READY to PAUSED state will be
85  * #GST_STATE_CHANGE_NO_PREROLL.
86  *
87  * A typical live source will timestamp the buffers it creates with the
88  * current running time of the pipeline. This is one reason why a live source
89  * can only produce data in the PLAYING state, when the clock is actually
90  * distributed and running.
91  *
92  * Live sources that synchronize and block on the clock (an audio source, for
93  * example) can use gst_base_src_wait_playing() when the
94  * #GstBaseSrcClass.create() function was interrupted by a state change to
95  * PAUSED.
96  *
97  * The #GstBaseSrcClass.get_times() method can be used to implement pseudo-live
98  * sources. It only makes sense to implement the #GstBaseSrcClass.get_times()
99  * function if the source is a live source. The #GstBaseSrcClass.get_times()
100  * function should return timestamps starting from 0, as if it were a non-live
101  * source. The base class will make sure that the timestamps are transformed
102  * into the current running_time. The base source will then wait for the
103  * calculated running_time before pushing out the buffer.
104  *
105  * For live sources, the base class will by default report a latency of 0.
106  * For pseudo live sources, the base class will by default measure the difference
107  * between the first buffer timestamp and the start time of get_times and will
108  * report this value as the latency.
109  * Subclasses should override the query function when this behaviour is not
110  * acceptable.
111  *
112  * There is only support in #GstBaseSrc for exactly one source pad, which
113  * should be named "src". A source implementation (subclass of #GstBaseSrc)
114  * should install a pad template in its class_init function, like so:
115  * |[
116  * static void
117  * my_element_class_init (GstMyElementClass *klass)
118  * {
119  *   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
120  *   // srctemplate should be a #GstStaticPadTemplate with direction
121  *   // #GST_PAD_SRC and name "src"
122  *   gst_element_class_add_pad_template (gstelement_class,
123  *       gst_static_pad_template_get (&amp;srctemplate));
124  *
125  *   gst_element_class_set_static_metadata (gstelement_class,
126  *      "Source name",
127  *      "Source",
128  *      "My Source element",
129  *      "The author &lt;my.sink@my.email&gt;");
130  * }
131  * ]|
132  *
133  * <refsect2>
134  * <title>Controlled shutdown of live sources in applications</title>
135  * <para>
136  * Applications that record from a live source may want to stop recording
137  * in a controlled way, so that the recording is stopped, but the data
138  * already in the pipeline is processed to the end (remember that many live
139  * sources would go on recording forever otherwise). For that to happen the
140  * application needs to make the source stop recording and send an EOS
141  * event down the pipeline. The application would then wait for an
142  * EOS message posted on the pipeline's bus to know when all data has
143  * been processed and the pipeline can safely be stopped.
144  *
145  * An application may send an EOS event to a source element to make it
146  * perform the EOS logic (send EOS event downstream or post a
147  * #GST_MESSAGE_SEGMENT_DONE on the bus). This can typically be done
148  * with the gst_element_send_event() function on the element or its parent bin.
149  *
150  * After the EOS has been sent to the element, the application should wait for
151  * an EOS message to be posted on the pipeline's bus. Once this EOS message is
152  * received, it may safely shut down the entire pipeline.
153  *
154  * Last reviewed on 2007-12-19 (0.10.16)
155  * </para>
156  * </refsect2>
157  */
158
159 #ifdef HAVE_CONFIG_H
160 #  include "config.h"
161 #endif
162
163 #include <stdlib.h>
164 #include <string.h>
165
166 #include <gst/gst_private.h>
167 #include <gst/glib-compat-private.h>
168
169 #include "gstbasesrc.h"
170 #include "gsttypefindhelper.h"
171 #include <gst/gst-i18n-lib.h>
172
173 GST_DEBUG_CATEGORY_STATIC (gst_base_src_debug);
174 #define GST_CAT_DEFAULT gst_base_src_debug
175
176 #define GST_LIVE_GET_LOCK(elem)               (&GST_BASE_SRC_CAST(elem)->live_lock)
177 #define GST_LIVE_LOCK(elem)                   g_mutex_lock(GST_LIVE_GET_LOCK(elem))
178 #define GST_LIVE_TRYLOCK(elem)                g_mutex_trylock(GST_LIVE_GET_LOCK(elem))
179 #define GST_LIVE_UNLOCK(elem)                 g_mutex_unlock(GST_LIVE_GET_LOCK(elem))
180 #define GST_LIVE_GET_COND(elem)               (&GST_BASE_SRC_CAST(elem)->live_cond)
181 #define GST_LIVE_WAIT(elem)                   g_cond_wait (GST_LIVE_GET_COND (elem), GST_LIVE_GET_LOCK (elem))
182 #define GST_LIVE_WAIT_UNTIL(elem, end_time)   g_cond_timed_wait (GST_LIVE_GET_COND (elem), GST_LIVE_GET_LOCK (elem), end_time)
183 #define GST_LIVE_SIGNAL(elem)                 g_cond_signal (GST_LIVE_GET_COND (elem));
184 #define GST_LIVE_BROADCAST(elem)              g_cond_broadcast (GST_LIVE_GET_COND (elem));
185
186
187 #define GST_ASYNC_GET_COND(elem)              (&GST_BASE_SRC_CAST(elem)->priv->async_cond)
188 #define GST_ASYNC_WAIT(elem)                  g_cond_wait (GST_ASYNC_GET_COND (elem), GST_OBJECT_GET_LOCK (elem))
189 #define GST_ASYNC_SIGNAL(elem)                g_cond_signal (GST_ASYNC_GET_COND (elem));
190
191
192 /* BaseSrc signals and args */
193 enum
194 {
195   /* FILL ME */
196   LAST_SIGNAL
197 };
198
199 #define DEFAULT_BLOCKSIZE       4096
200 #define DEFAULT_NUM_BUFFERS     -1
201 #define DEFAULT_TYPEFIND        FALSE
202 #define DEFAULT_DO_TIMESTAMP    FALSE
203
204 enum
205 {
206   PROP_0,
207   PROP_BLOCKSIZE,
208   PROP_NUM_BUFFERS,
209   PROP_TYPEFIND,
210   PROP_DO_TIMESTAMP
211 };
212
213 #define GST_BASE_SRC_GET_PRIVATE(obj)  \
214    (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_BASE_SRC, GstBaseSrcPrivate))
215
216 struct _GstBaseSrcPrivate
217 {
218   gboolean discont;
219   gboolean flushing;
220
221   GstFlowReturn start_result;
222   gboolean async;
223
224   /* if a stream-start event should be sent */
225   gboolean stream_start_pending;
226
227   /* if segment should be sent */
228   gboolean segment_pending;
229
230   /* if EOS is pending (atomic) */
231   gint pending_eos;
232
233   /* startup latency is the time it takes between going to PLAYING and producing
234    * the first BUFFER with running_time 0. This value is included in the latency
235    * reporting. */
236   GstClockTime latency;
237   /* timestamp offset, this is the offset add to the values of gst_times for
238    * pseudo live sources */
239   GstClockTimeDiff ts_offset;
240
241   gboolean do_timestamp;
242   volatile gint dynamic_size;
243
244   /* stream sequence number */
245   guint32 seqnum;
246
247   /* pending events (TAG, CUSTOM_BOTH, CUSTOM_DOWNSTREAM) to be
248    * pushed in the data stream */
249   GList *pending_events;
250   volatile gint have_events;
251
252   /* QoS *//* with LOCK */
253   gboolean qos_enabled;
254   gdouble proportion;
255   GstClockTime earliest_time;
256
257   GstBufferPool *pool;
258   GstAllocator *allocator;
259   GstAllocationParams params;
260
261   GCond async_cond;
262 };
263
264 static GstElementClass *parent_class = NULL;
265
266 static void gst_base_src_class_init (GstBaseSrcClass * klass);
267 static void gst_base_src_init (GstBaseSrc * src, gpointer g_class);
268 static void gst_base_src_finalize (GObject * object);
269
270
271 GType
272 gst_base_src_get_type (void)
273 {
274   static volatile gsize base_src_type = 0;
275
276   if (g_once_init_enter (&base_src_type)) {
277     GType _type;
278     static const GTypeInfo base_src_info = {
279       sizeof (GstBaseSrcClass),
280       NULL,
281       NULL,
282       (GClassInitFunc) gst_base_src_class_init,
283       NULL,
284       NULL,
285       sizeof (GstBaseSrc),
286       0,
287       (GInstanceInitFunc) gst_base_src_init,
288     };
289
290     _type = g_type_register_static (GST_TYPE_ELEMENT,
291         "GstBaseSrc", &base_src_info, G_TYPE_FLAG_ABSTRACT);
292     g_once_init_leave (&base_src_type, _type);
293   }
294   return base_src_type;
295 }
296
297 static GstCaps *gst_base_src_default_get_caps (GstBaseSrc * bsrc,
298     GstCaps * filter);
299 static GstCaps *gst_base_src_default_fixate (GstBaseSrc * src, GstCaps * caps);
300 static GstCaps *gst_base_src_fixate (GstBaseSrc * src, GstCaps * caps);
301
302 static gboolean gst_base_src_is_random_access (GstBaseSrc * src);
303 static gboolean gst_base_src_activate_mode (GstPad * pad, GstObject * parent,
304     GstPadMode mode, gboolean active);
305 static void gst_base_src_set_property (GObject * object, guint prop_id,
306     const GValue * value, GParamSpec * pspec);
307 static void gst_base_src_get_property (GObject * object, guint prop_id,
308     GValue * value, GParamSpec * pspec);
309 static gboolean gst_base_src_event (GstPad * pad, GstObject * parent,
310     GstEvent * event);
311 static gboolean gst_base_src_send_event (GstElement * elem, GstEvent * event);
312 static gboolean gst_base_src_default_event (GstBaseSrc * src, GstEvent * event);
313
314 static gboolean gst_base_src_query (GstPad * pad, GstObject * parent,
315     GstQuery * query);
316
317 static gboolean gst_base_src_activate_pool (GstBaseSrc * basesrc,
318     gboolean active);
319 static gboolean gst_base_src_default_negotiate (GstBaseSrc * basesrc);
320 static gboolean gst_base_src_default_do_seek (GstBaseSrc * src,
321     GstSegment * segment);
322 static gboolean gst_base_src_default_query (GstBaseSrc * src, GstQuery * query);
323 static gboolean gst_base_src_default_prepare_seek_segment (GstBaseSrc * src,
324     GstEvent * event, GstSegment * segment);
325 static GstFlowReturn gst_base_src_default_create (GstBaseSrc * basesrc,
326     guint64 offset, guint size, GstBuffer ** buf);
327 static GstFlowReturn gst_base_src_default_alloc (GstBaseSrc * basesrc,
328     guint64 offset, guint size, GstBuffer ** buf);
329 static gboolean gst_base_src_decide_allocation_default (GstBaseSrc * basesrc,
330     GstQuery * query);
331
332 static gboolean gst_base_src_set_flushing (GstBaseSrc * basesrc,
333     gboolean flushing, gboolean live_play, gboolean * playing);
334
335 static gboolean gst_base_src_start (GstBaseSrc * basesrc);
336 static gboolean gst_base_src_stop (GstBaseSrc * basesrc);
337
338 static GstStateChangeReturn gst_base_src_change_state (GstElement * element,
339     GstStateChange transition);
340
341 static void gst_base_src_loop (GstPad * pad);
342 static GstFlowReturn gst_base_src_getrange (GstPad * pad, GstObject * parent,
343     guint64 offset, guint length, GstBuffer ** buf);
344 static GstFlowReturn gst_base_src_get_range (GstBaseSrc * src, guint64 offset,
345     guint length, GstBuffer ** buf);
346 static gboolean gst_base_src_seekable (GstBaseSrc * src);
347 static gboolean gst_base_src_negotiate (GstBaseSrc * basesrc);
348 static gboolean gst_base_src_update_length (GstBaseSrc * src, guint64 offset,
349     guint * length, gboolean force);
350
351 static void
352 gst_base_src_class_init (GstBaseSrcClass * klass)
353 {
354   GObjectClass *gobject_class;
355   GstElementClass *gstelement_class;
356
357   gobject_class = G_OBJECT_CLASS (klass);
358   gstelement_class = GST_ELEMENT_CLASS (klass);
359
360   GST_DEBUG_CATEGORY_INIT (gst_base_src_debug, "basesrc", 0, "basesrc element");
361
362   g_type_class_add_private (klass, sizeof (GstBaseSrcPrivate));
363
364   parent_class = g_type_class_peek_parent (klass);
365
366   gobject_class->finalize = gst_base_src_finalize;
367   gobject_class->set_property = gst_base_src_set_property;
368   gobject_class->get_property = gst_base_src_get_property;
369
370   g_object_class_install_property (gobject_class, PROP_BLOCKSIZE,
371       g_param_spec_uint ("blocksize", "Block size",
372           "Size in bytes to read per buffer (-1 = default)", 0, G_MAXUINT,
373           DEFAULT_BLOCKSIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
374   g_object_class_install_property (gobject_class, PROP_NUM_BUFFERS,
375       g_param_spec_int ("num-buffers", "num-buffers",
376           "Number of buffers to output before sending EOS (-1 = unlimited)",
377           -1, G_MAXINT, DEFAULT_NUM_BUFFERS, G_PARAM_READWRITE |
378           G_PARAM_STATIC_STRINGS));
379   g_object_class_install_property (gobject_class, PROP_TYPEFIND,
380       g_param_spec_boolean ("typefind", "Typefind",
381           "Run typefind before negotiating", DEFAULT_TYPEFIND,
382           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
383   g_object_class_install_property (gobject_class, PROP_DO_TIMESTAMP,
384       g_param_spec_boolean ("do-timestamp", "Do timestamp",
385           "Apply current stream time to buffers", DEFAULT_DO_TIMESTAMP,
386           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
387
388   gstelement_class->change_state =
389       GST_DEBUG_FUNCPTR (gst_base_src_change_state);
390   gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_base_src_send_event);
391
392   klass->get_caps = GST_DEBUG_FUNCPTR (gst_base_src_default_get_caps);
393   klass->negotiate = GST_DEBUG_FUNCPTR (gst_base_src_default_negotiate);
394   klass->fixate = GST_DEBUG_FUNCPTR (gst_base_src_default_fixate);
395   klass->prepare_seek_segment =
396       GST_DEBUG_FUNCPTR (gst_base_src_default_prepare_seek_segment);
397   klass->do_seek = GST_DEBUG_FUNCPTR (gst_base_src_default_do_seek);
398   klass->query = GST_DEBUG_FUNCPTR (gst_base_src_default_query);
399   klass->event = GST_DEBUG_FUNCPTR (gst_base_src_default_event);
400   klass->create = GST_DEBUG_FUNCPTR (gst_base_src_default_create);
401   klass->alloc = GST_DEBUG_FUNCPTR (gst_base_src_default_alloc);
402   klass->decide_allocation =
403       GST_DEBUG_FUNCPTR (gst_base_src_decide_allocation_default);
404
405   /* Registering debug symbols for function pointers */
406   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_activate_mode);
407   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_event);
408   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_query);
409   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_getrange);
410   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_fixate);
411 }
412
413 static void
414 gst_base_src_init (GstBaseSrc * basesrc, gpointer g_class)
415 {
416   GstPad *pad;
417   GstPadTemplate *pad_template;
418
419   basesrc->priv = GST_BASE_SRC_GET_PRIVATE (basesrc);
420
421   basesrc->is_live = FALSE;
422   g_mutex_init (&basesrc->live_lock);
423   g_cond_init (&basesrc->live_cond);
424   basesrc->num_buffers = DEFAULT_NUM_BUFFERS;
425   basesrc->num_buffers_left = -1;
426
427   basesrc->can_activate_push = TRUE;
428
429   pad_template =
430       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (g_class), "src");
431   g_return_if_fail (pad_template != NULL);
432
433   GST_DEBUG_OBJECT (basesrc, "creating src pad");
434   pad = gst_pad_new_from_template (pad_template, "src");
435
436   GST_DEBUG_OBJECT (basesrc, "setting functions on src pad");
437   gst_pad_set_activatemode_function (pad, gst_base_src_activate_mode);
438   gst_pad_set_event_function (pad, gst_base_src_event);
439   gst_pad_set_query_function (pad, gst_base_src_query);
440   gst_pad_set_getrange_function (pad, gst_base_src_getrange);
441
442   /* hold pointer to pad */
443   basesrc->srcpad = pad;
444   GST_DEBUG_OBJECT (basesrc, "adding src pad");
445   gst_element_add_pad (GST_ELEMENT (basesrc), pad);
446
447   basesrc->blocksize = DEFAULT_BLOCKSIZE;
448   basesrc->clock_id = NULL;
449   /* we operate in BYTES by default */
450   gst_base_src_set_format (basesrc, GST_FORMAT_BYTES);
451   basesrc->typefind = DEFAULT_TYPEFIND;
452   basesrc->priv->do_timestamp = DEFAULT_DO_TIMESTAMP;
453   g_atomic_int_set (&basesrc->priv->have_events, FALSE);
454
455   g_cond_init (&basesrc->priv->async_cond);
456   basesrc->priv->start_result = GST_FLOW_FLUSHING;
457   GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTED);
458   GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTING);
459   GST_OBJECT_FLAG_SET (basesrc, GST_ELEMENT_FLAG_SOURCE);
460
461   GST_DEBUG_OBJECT (basesrc, "init done");
462 }
463
464 static void
465 gst_base_src_finalize (GObject * object)
466 {
467   GstBaseSrc *basesrc;
468   GstEvent **event_p;
469
470   basesrc = GST_BASE_SRC (object);
471
472   g_mutex_clear (&basesrc->live_lock);
473   g_cond_clear (&basesrc->live_cond);
474   g_cond_clear (&basesrc->priv->async_cond);
475
476   event_p = &basesrc->pending_seek;
477   gst_event_replace (event_p, NULL);
478
479   if (basesrc->priv->pending_events) {
480     g_list_foreach (basesrc->priv->pending_events, (GFunc) gst_event_unref,
481         NULL);
482     g_list_free (basesrc->priv->pending_events);
483   }
484
485   G_OBJECT_CLASS (parent_class)->finalize (object);
486 }
487
488 /**
489  * gst_base_src_wait_playing:
490  * @src: the src
491  *
492  * If the #GstBaseSrcClass.create() method performs its own synchronisation
493  * against the clock it must unblock when going from PLAYING to the PAUSED state
494  * and call this method before continuing to produce the remaining data.
495  *
496  * This function will block until a state change to PLAYING happens (in which
497  * case this function returns #GST_FLOW_OK) or the processing must be stopped due
498  * to a state change to READY or a FLUSH event (in which case this function
499  * returns #GST_FLOW_FLUSHING).
500  *
501  * Returns: #GST_FLOW_OK if @src is PLAYING and processing can
502  * continue. Any other return value should be returned from the create vmethod.
503  */
504 GstFlowReturn
505 gst_base_src_wait_playing (GstBaseSrc * src)
506 {
507   g_return_val_if_fail (GST_IS_BASE_SRC (src), GST_FLOW_ERROR);
508
509   do {
510     /* block until the state changes, or we get a flush, or something */
511     GST_DEBUG_OBJECT (src, "live source waiting for running state");
512     GST_LIVE_WAIT (src);
513     GST_DEBUG_OBJECT (src, "live source unlocked");
514     if (src->priv->flushing)
515       goto flushing;
516   } while (G_UNLIKELY (!src->live_running));
517
518   return GST_FLOW_OK;
519
520   /* ERRORS */
521 flushing:
522   {
523     GST_DEBUG_OBJECT (src, "we are flushing");
524     return GST_FLOW_FLUSHING;
525   }
526 }
527
528 /**
529  * gst_base_src_set_live:
530  * @src: base source instance
531  * @live: new live-mode
532  *
533  * If the element listens to a live source, @live should
534  * be set to %TRUE.
535  *
536  * A live source will not produce data in the PAUSED state and
537  * will therefore not be able to participate in the PREROLL phase
538  * of a pipeline. To signal this fact to the application and the
539  * pipeline, the state change return value of the live source will
540  * be GST_STATE_CHANGE_NO_PREROLL.
541  */
542 void
543 gst_base_src_set_live (GstBaseSrc * src, gboolean live)
544 {
545   g_return_if_fail (GST_IS_BASE_SRC (src));
546
547   GST_OBJECT_LOCK (src);
548   src->is_live = live;
549   GST_OBJECT_UNLOCK (src);
550 }
551
552 /**
553  * gst_base_src_is_live:
554  * @src: base source instance
555  *
556  * Check if an element is in live mode.
557  *
558  * Returns: %TRUE if element is in live mode.
559  */
560 gboolean
561 gst_base_src_is_live (GstBaseSrc * src)
562 {
563   gboolean result;
564
565   g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
566
567   GST_OBJECT_LOCK (src);
568   result = src->is_live;
569   GST_OBJECT_UNLOCK (src);
570
571   return result;
572 }
573
574 /**
575  * gst_base_src_set_format:
576  * @src: base source instance
577  * @format: the format to use
578  *
579  * Sets the default format of the source. This will be the format used
580  * for sending SEGMENT events and for performing seeks.
581  *
582  * If a format of GST_FORMAT_BYTES is set, the element will be able to
583  * operate in pull mode if the #GstBaseSrcClass.is_seekable() returns TRUE.
584  *
585  * This function must only be called in states < %GST_STATE_PAUSED.
586  */
587 void
588 gst_base_src_set_format (GstBaseSrc * src, GstFormat format)
589 {
590   g_return_if_fail (GST_IS_BASE_SRC (src));
591   g_return_if_fail (GST_STATE (src) <= GST_STATE_READY);
592
593   GST_OBJECT_LOCK (src);
594   gst_segment_init (&src->segment, format);
595   GST_OBJECT_UNLOCK (src);
596 }
597
598 /**
599  * gst_base_src_set_dynamic_size:
600  * @src: base source instance
601  * @dynamic: new dynamic size mode
602  *
603  * If not @dynamic, size is only updated when needed, such as when trying to
604  * read past current tracked size.  Otherwise, size is checked for upon each
605  * read.
606  */
607 void
608 gst_base_src_set_dynamic_size (GstBaseSrc * src, gboolean dynamic)
609 {
610   g_return_if_fail (GST_IS_BASE_SRC (src));
611
612   g_atomic_int_set (&src->priv->dynamic_size, dynamic);
613 }
614
615 /**
616  * gst_base_src_set_async:
617  * @src: base source instance
618  * @async: new async mode
619  *
620  * Configure async behaviour in @src, no state change will block. The open,
621  * close, start, stop, play and pause virtual methods will be executed in a
622  * different thread and are thus allowed to perform blocking operations. Any
623  * blocking operation should be unblocked with the unlock vmethod.
624  */
625 void
626 gst_base_src_set_async (GstBaseSrc * src, gboolean async)
627 {
628   g_return_if_fail (GST_IS_BASE_SRC (src));
629
630   GST_OBJECT_LOCK (src);
631   src->priv->async = async;
632   GST_OBJECT_UNLOCK (src);
633 }
634
635 /**
636  * gst_base_src_is_async:
637  * @src: base source instance
638  *
639  * Get the current async behaviour of @src. See also gst_base_src_set_async().
640  *
641  * Returns: %TRUE if @src is operating in async mode.
642  */
643 gboolean
644 gst_base_src_is_async (GstBaseSrc * src)
645 {
646   gboolean res;
647
648   g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
649
650   GST_OBJECT_LOCK (src);
651   res = src->priv->async;
652   GST_OBJECT_UNLOCK (src);
653
654   return res;
655 }
656
657
658 /**
659  * gst_base_src_query_latency:
660  * @src: the source
661  * @live: (out) (allow-none): if the source is live
662  * @min_latency: (out) (allow-none): the min latency of the source
663  * @max_latency: (out) (allow-none): the max latency of the source
664  *
665  * Query the source for the latency parameters. @live will be TRUE when @src is
666  * configured as a live source. @min_latency will be set to the difference
667  * between the running time and the timestamp of the first buffer.
668  * @max_latency is always the undefined value of -1.
669  *
670  * This function is mostly used by subclasses.
671  *
672  * Returns: TRUE if the query succeeded.
673  */
674 gboolean
675 gst_base_src_query_latency (GstBaseSrc * src, gboolean * live,
676     GstClockTime * min_latency, GstClockTime * max_latency)
677 {
678   GstClockTime min;
679
680   g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
681
682   GST_OBJECT_LOCK (src);
683   if (live)
684     *live = src->is_live;
685
686   /* if we have a startup latency, report this one, else report 0. Subclasses
687    * are supposed to override the query function if they want something
688    * else. */
689   if (src->priv->latency != -1)
690     min = src->priv->latency;
691   else
692     min = 0;
693
694   if (min_latency)
695     *min_latency = min;
696   if (max_latency)
697     *max_latency = -1;
698
699   GST_LOG_OBJECT (src, "latency: live %d, min %" GST_TIME_FORMAT
700       ", max %" GST_TIME_FORMAT, src->is_live, GST_TIME_ARGS (min),
701       GST_TIME_ARGS (-1));
702   GST_OBJECT_UNLOCK (src);
703
704   return TRUE;
705 }
706
707 /**
708  * gst_base_src_set_blocksize:
709  * @src: the source
710  * @blocksize: the new blocksize in bytes
711  *
712  * Set the number of bytes that @src will push out with each buffer. When
713  * @blocksize is set to -1, a default length will be used.
714  */
715 void
716 gst_base_src_set_blocksize (GstBaseSrc * src, guint blocksize)
717 {
718   g_return_if_fail (GST_IS_BASE_SRC (src));
719
720   GST_OBJECT_LOCK (src);
721   src->blocksize = blocksize;
722   GST_OBJECT_UNLOCK (src);
723 }
724
725 /**
726  * gst_base_src_get_blocksize:
727  * @src: the source
728  *
729  * Get the number of bytes that @src will push out with each buffer.
730  *
731  * Returns: the number of bytes pushed with each buffer.
732  */
733 guint
734 gst_base_src_get_blocksize (GstBaseSrc * src)
735 {
736   gint res;
737
738   g_return_val_if_fail (GST_IS_BASE_SRC (src), 0);
739
740   GST_OBJECT_LOCK (src);
741   res = src->blocksize;
742   GST_OBJECT_UNLOCK (src);
743
744   return res;
745 }
746
747
748 /**
749  * gst_base_src_set_do_timestamp:
750  * @src: the source
751  * @timestamp: enable or disable timestamping
752  *
753  * Configure @src to automatically timestamp outgoing buffers based on the
754  * current running_time of the pipeline. This property is mostly useful for live
755  * sources.
756  */
757 void
758 gst_base_src_set_do_timestamp (GstBaseSrc * src, gboolean timestamp)
759 {
760   g_return_if_fail (GST_IS_BASE_SRC (src));
761
762   GST_OBJECT_LOCK (src);
763   src->priv->do_timestamp = timestamp;
764   GST_OBJECT_UNLOCK (src);
765 }
766
767 /**
768  * gst_base_src_get_do_timestamp:
769  * @src: the source
770  *
771  * Query if @src timestamps outgoing buffers based on the current running_time.
772  *
773  * Returns: %TRUE if the base class will automatically timestamp outgoing buffers.
774  */
775 gboolean
776 gst_base_src_get_do_timestamp (GstBaseSrc * src)
777 {
778   gboolean res;
779
780   g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
781
782   GST_OBJECT_LOCK (src);
783   res = src->priv->do_timestamp;
784   GST_OBJECT_UNLOCK (src);
785
786   return res;
787 }
788
789 /**
790  * gst_base_src_new_seamless_segment:
791  * @src: The source
792  * @start: The new start value for the segment
793  * @stop: Stop value for the new segment
794  * @time: The new time value for the start of the new segent
795  *
796  * Prepare a new seamless segment for emission downstream. This function must
797  * only be called by derived sub-classes, and only from the create() function,
798  * as the stream-lock needs to be held.
799  *
800  * The format for the new segment will be the current format of the source, as
801  * configured with gst_base_src_set_format()
802  *
803  * Returns: %TRUE if preparation of the seamless segment succeeded.
804  */
805 gboolean
806 gst_base_src_new_seamless_segment (GstBaseSrc * src, gint64 start, gint64 stop,
807     gint64 time)
808 {
809   gboolean res = TRUE;
810
811   GST_OBJECT_LOCK (src);
812
813   src->segment.base = gst_segment_to_running_time (&src->segment,
814       src->segment.format, src->segment.position);
815   src->segment.position = src->segment.start = start;
816   src->segment.stop = stop;
817   src->segment.time = time;
818
819   /* Mark pending segment. Will be sent before next data */
820   src->priv->segment_pending = TRUE;
821
822   GST_DEBUG_OBJECT (src,
823       "Starting new seamless segment. Start %" GST_TIME_FORMAT " stop %"
824       GST_TIME_FORMAT " time %" GST_TIME_FORMAT " base %" GST_TIME_FORMAT,
825       GST_TIME_ARGS (start), GST_TIME_ARGS (stop), GST_TIME_ARGS (time),
826       GST_TIME_ARGS (src->segment.base));
827
828   GST_OBJECT_UNLOCK (src);
829
830   src->priv->discont = TRUE;
831   src->running = TRUE;
832
833   return res;
834 }
835
836 static gboolean
837 gst_base_src_send_stream_start (GstBaseSrc * src)
838 {
839   gboolean ret = TRUE;
840
841   if (src->priv->stream_start_pending) {
842     gchar *stream_id;
843     GstEvent *event;
844
845     stream_id =
846         gst_pad_create_stream_id (src->srcpad, GST_ELEMENT_CAST (src), NULL);
847
848     GST_DEBUG_OBJECT (src, "Pushing STREAM_START");
849     event = gst_event_new_stream_start (stream_id);
850     gst_event_set_group_id (event, gst_util_group_id_next ());
851
852     ret = gst_pad_push_event (src->srcpad, event);
853     src->priv->stream_start_pending = FALSE;
854     g_free (stream_id);
855   }
856
857   return ret;
858 }
859
860 /**
861  * gst_base_src_set_caps:
862  * @src: a #GstBaseSrc
863  * @caps: a #GstCaps
864  *
865  * Set new caps on the basesrc source pad.
866  *
867  * Returns: %TRUE if the caps could be set
868  */
869 gboolean
870 gst_base_src_set_caps (GstBaseSrc * src, GstCaps * caps)
871 {
872   GstBaseSrcClass *bclass;
873   gboolean res = TRUE;
874
875   bclass = GST_BASE_SRC_GET_CLASS (src);
876
877   gst_base_src_send_stream_start (src);
878
879   if (bclass->set_caps)
880     res = bclass->set_caps (src, caps);
881
882   if (res)
883     res = gst_pad_push_event (src->srcpad, gst_event_new_caps (caps));
884
885   return res;
886 }
887
888 static GstCaps *
889 gst_base_src_default_get_caps (GstBaseSrc * bsrc, GstCaps * filter)
890 {
891   GstCaps *caps = NULL;
892   GstPadTemplate *pad_template;
893   GstBaseSrcClass *bclass;
894
895   bclass = GST_BASE_SRC_GET_CLASS (bsrc);
896
897   pad_template =
898       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (bclass), "src");
899
900   if (pad_template != NULL) {
901     caps = gst_pad_template_get_caps (pad_template);
902
903     if (filter) {
904       GstCaps *intersection;
905
906       intersection =
907           gst_caps_intersect_full (filter, caps, GST_CAPS_INTERSECT_FIRST);
908       gst_caps_unref (caps);
909       caps = intersection;
910     }
911   }
912   return caps;
913 }
914
915 static GstCaps *
916 gst_base_src_default_fixate (GstBaseSrc * bsrc, GstCaps * caps)
917 {
918   GST_DEBUG_OBJECT (bsrc, "using default caps fixate function");
919   return gst_caps_fixate (caps);
920 }
921
922 static GstCaps *
923 gst_base_src_fixate (GstBaseSrc * bsrc, GstCaps * caps)
924 {
925   GstBaseSrcClass *bclass;
926
927   bclass = GST_BASE_SRC_GET_CLASS (bsrc);
928
929   if (bclass->fixate)
930     caps = bclass->fixate (bsrc, caps);
931
932   return caps;
933 }
934
935 static gboolean
936 gst_base_src_default_query (GstBaseSrc * src, GstQuery * query)
937 {
938   gboolean res;
939
940   switch (GST_QUERY_TYPE (query)) {
941     case GST_QUERY_POSITION:
942     {
943       GstFormat format;
944
945       gst_query_parse_position (query, &format, NULL);
946
947       GST_DEBUG_OBJECT (src, "position query in format %s",
948           gst_format_get_name (format));
949
950       switch (format) {
951         case GST_FORMAT_PERCENT:
952         {
953           gint64 percent;
954           gint64 position;
955           gint64 duration;
956
957           GST_OBJECT_LOCK (src);
958           position = src->segment.position;
959           duration = src->segment.duration;
960           GST_OBJECT_UNLOCK (src);
961
962           if (position != -1 && duration != -1) {
963             if (position < duration)
964               percent = gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX, position,
965                   duration);
966             else
967               percent = GST_FORMAT_PERCENT_MAX;
968           } else
969             percent = -1;
970
971           gst_query_set_position (query, GST_FORMAT_PERCENT, percent);
972           res = TRUE;
973           break;
974         }
975         default:
976         {
977           gint64 position;
978           GstFormat seg_format;
979
980           GST_OBJECT_LOCK (src);
981           position =
982               gst_segment_to_stream_time (&src->segment, src->segment.format,
983               src->segment.position);
984           seg_format = src->segment.format;
985           GST_OBJECT_UNLOCK (src);
986
987           if (position != -1) {
988             /* convert to requested format */
989             res =
990                 gst_pad_query_convert (src->srcpad, seg_format,
991                 position, format, &position);
992           } else
993             res = TRUE;
994
995           gst_query_set_position (query, format, position);
996           break;
997         }
998       }
999       break;
1000     }
1001     case GST_QUERY_DURATION:
1002     {
1003       GstFormat format;
1004
1005       gst_query_parse_duration (query, &format, NULL);
1006
1007       GST_DEBUG_OBJECT (src, "duration query in format %s",
1008           gst_format_get_name (format));
1009
1010       switch (format) {
1011         case GST_FORMAT_PERCENT:
1012           gst_query_set_duration (query, GST_FORMAT_PERCENT,
1013               GST_FORMAT_PERCENT_MAX);
1014           res = TRUE;
1015           break;
1016         default:
1017         {
1018           gint64 duration;
1019           GstFormat seg_format;
1020           guint length = 0;
1021
1022           /* may have to refresh duration */
1023           gst_base_src_update_length (src, 0, &length,
1024               g_atomic_int_get (&src->priv->dynamic_size));
1025
1026           /* this is the duration as configured by the subclass. */
1027           GST_OBJECT_LOCK (src);
1028           duration = src->segment.duration;
1029           seg_format = src->segment.format;
1030           GST_OBJECT_UNLOCK (src);
1031
1032           GST_LOG_OBJECT (src, "duration %" G_GINT64_FORMAT ", format %s",
1033               duration, gst_format_get_name (seg_format));
1034
1035           if (duration != -1) {
1036             /* convert to requested format, if this fails, we have a duration
1037              * but we cannot answer the query, we must return FALSE. */
1038             res =
1039                 gst_pad_query_convert (src->srcpad, seg_format,
1040                 duration, format, &duration);
1041           } else {
1042             /* The subclass did not configure a duration, we assume that the
1043              * media has an unknown duration then and we return TRUE to report
1044              * this. Note that this is not the same as returning FALSE, which
1045              * means that we cannot report the duration at all. */
1046             res = TRUE;
1047           }
1048           gst_query_set_duration (query, format, duration);
1049           break;
1050         }
1051       }
1052       break;
1053     }
1054
1055     case GST_QUERY_SEEKING:
1056     {
1057       GstFormat format, seg_format;
1058       gint64 duration;
1059
1060       GST_OBJECT_LOCK (src);
1061       duration = src->segment.duration;
1062       seg_format = src->segment.format;
1063       GST_OBJECT_UNLOCK (src);
1064
1065       gst_query_parse_seeking (query, &format, NULL, NULL, NULL);
1066       if (format == seg_format) {
1067         gst_query_set_seeking (query, seg_format,
1068             gst_base_src_seekable (src), 0, duration);
1069         res = TRUE;
1070       } else {
1071         /* FIXME 0.11: return TRUE + seekable=FALSE for SEEKING query here */
1072         /* Don't reply to the query to make up for demuxers which don't
1073          * handle the SEEKING query yet. Players like Totem will fall back
1074          * to the duration when the SEEKING query isn't answered. */
1075         res = FALSE;
1076       }
1077       break;
1078     }
1079     case GST_QUERY_SEGMENT:
1080     {
1081       GstFormat format;
1082       gint64 start, stop;
1083
1084       GST_OBJECT_LOCK (src);
1085
1086       format = src->segment.format;
1087
1088       start =
1089           gst_segment_to_stream_time (&src->segment, format,
1090           src->segment.start);
1091       if ((stop = src->segment.stop) == -1)
1092         stop = src->segment.duration;
1093       else
1094         stop = gst_segment_to_stream_time (&src->segment, format, stop);
1095
1096       gst_query_set_segment (query, src->segment.rate, format, start, stop);
1097
1098       GST_OBJECT_UNLOCK (src);
1099       res = TRUE;
1100       break;
1101     }
1102
1103     case GST_QUERY_FORMATS:
1104     {
1105       gst_query_set_formats (query, 3, GST_FORMAT_DEFAULT,
1106           GST_FORMAT_BYTES, GST_FORMAT_PERCENT);
1107       res = TRUE;
1108       break;
1109     }
1110     case GST_QUERY_CONVERT:
1111     {
1112       GstFormat src_fmt, dest_fmt;
1113       gint64 src_val, dest_val;
1114
1115       gst_query_parse_convert (query, &src_fmt, &src_val, &dest_fmt, &dest_val);
1116
1117       /* we can only convert between equal formats... */
1118       if (src_fmt == dest_fmt) {
1119         dest_val = src_val;
1120         res = TRUE;
1121       } else
1122         res = FALSE;
1123
1124       gst_query_set_convert (query, src_fmt, src_val, dest_fmt, dest_val);
1125       break;
1126     }
1127     case GST_QUERY_LATENCY:
1128     {
1129       GstClockTime min, max;
1130       gboolean live;
1131
1132       /* Subclasses should override and implement something useful */
1133       res = gst_base_src_query_latency (src, &live, &min, &max);
1134
1135       GST_LOG_OBJECT (src, "report latency: live %d, min %" GST_TIME_FORMAT
1136           ", max %" GST_TIME_FORMAT, live, GST_TIME_ARGS (min),
1137           GST_TIME_ARGS (max));
1138
1139       gst_query_set_latency (query, live, min, max);
1140       break;
1141     }
1142     case GST_QUERY_JITTER:
1143     case GST_QUERY_RATE:
1144       res = FALSE;
1145       break;
1146     case GST_QUERY_BUFFERING:
1147     {
1148       GstFormat format, seg_format;
1149       gint64 start, stop, estimated;
1150
1151       gst_query_parse_buffering_range (query, &format, NULL, NULL, NULL);
1152
1153       GST_DEBUG_OBJECT (src, "buffering query in format %s",
1154           gst_format_get_name (format));
1155
1156       GST_OBJECT_LOCK (src);
1157       if (src->random_access) {
1158         estimated = 0;
1159         start = 0;
1160         if (format == GST_FORMAT_PERCENT)
1161           stop = GST_FORMAT_PERCENT_MAX;
1162         else
1163           stop = src->segment.duration;
1164       } else {
1165         estimated = -1;
1166         start = -1;
1167         stop = -1;
1168       }
1169       seg_format = src->segment.format;
1170       GST_OBJECT_UNLOCK (src);
1171
1172       /* convert to required format. When the conversion fails, we can't answer
1173        * the query. When the value is unknown, we can don't perform conversion
1174        * but report TRUE. */
1175       if (format != GST_FORMAT_PERCENT && stop != -1) {
1176         res = gst_pad_query_convert (src->srcpad, seg_format,
1177             stop, format, &stop);
1178       } else {
1179         res = TRUE;
1180       }
1181       if (res && format != GST_FORMAT_PERCENT && start != -1)
1182         res = gst_pad_query_convert (src->srcpad, seg_format,
1183             start, format, &start);
1184
1185       gst_query_set_buffering_range (query, format, start, stop, estimated);
1186       break;
1187     }
1188     case GST_QUERY_SCHEDULING:
1189     {
1190       gboolean random_access;
1191
1192       random_access = gst_base_src_is_random_access (src);
1193
1194       /* we can operate in getrange mode if the native format is bytes
1195        * and we are seekable, this condition is set in the random_access
1196        * flag and is set in the _start() method. */
1197       gst_query_set_scheduling (query, GST_SCHEDULING_FLAG_SEEKABLE, 1, -1, 0);
1198       if (random_access)
1199         gst_query_add_scheduling_mode (query, GST_PAD_MODE_PULL);
1200       gst_query_add_scheduling_mode (query, GST_PAD_MODE_PUSH);
1201
1202       res = TRUE;
1203       break;
1204     }
1205     case GST_QUERY_CAPS:
1206     {
1207       GstBaseSrcClass *bclass;
1208       GstCaps *caps, *filter;
1209
1210       bclass = GST_BASE_SRC_GET_CLASS (src);
1211       if (bclass->get_caps) {
1212         gst_query_parse_caps (query, &filter);
1213         if ((caps = bclass->get_caps (src, filter))) {
1214           gst_query_set_caps_result (query, caps);
1215           gst_caps_unref (caps);
1216           res = TRUE;
1217         } else {
1218           res = FALSE;
1219         }
1220       } else
1221         res = FALSE;
1222       break;
1223     }
1224     case GST_QUERY_URI:{
1225       if (GST_IS_URI_HANDLER (src)) {
1226         gchar *uri = gst_uri_handler_get_uri (GST_URI_HANDLER (src));
1227
1228         if (uri != NULL) {
1229           gst_query_set_uri (query, uri);
1230           g_free (uri);
1231           res = TRUE;
1232         } else {
1233           res = FALSE;
1234         }
1235       } else {
1236         res = FALSE;
1237       }
1238       break;
1239     }
1240     default:
1241       res = FALSE;
1242       break;
1243   }
1244   GST_DEBUG_OBJECT (src, "query %s returns %d", GST_QUERY_TYPE_NAME (query),
1245       res);
1246
1247   return res;
1248 }
1249
1250 static gboolean
1251 gst_base_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
1252 {
1253   GstBaseSrc *src;
1254   GstBaseSrcClass *bclass;
1255   gboolean result = FALSE;
1256
1257   src = GST_BASE_SRC (parent);
1258   bclass = GST_BASE_SRC_GET_CLASS (src);
1259
1260   if (bclass->query)
1261     result = bclass->query (src, query);
1262
1263   return result;
1264 }
1265
1266 static gboolean
1267 gst_base_src_default_do_seek (GstBaseSrc * src, GstSegment * segment)
1268 {
1269   gboolean res = TRUE;
1270
1271   /* update our offset if the start/stop position was updated */
1272   if (segment->format == GST_FORMAT_BYTES) {
1273     segment->time = segment->start;
1274   } else if (segment->start == 0) {
1275     /* seek to start, we can implement a default for this. */
1276     segment->time = 0;
1277   } else {
1278     res = FALSE;
1279     GST_INFO_OBJECT (src, "Can't do a default seek");
1280   }
1281
1282   return res;
1283 }
1284
1285 static gboolean
1286 gst_base_src_do_seek (GstBaseSrc * src, GstSegment * segment)
1287 {
1288   GstBaseSrcClass *bclass;
1289   gboolean result = FALSE;
1290
1291   bclass = GST_BASE_SRC_GET_CLASS (src);
1292
1293   GST_INFO_OBJECT (src, "seeking: %" GST_SEGMENT_FORMAT, segment);
1294
1295   if (bclass->do_seek)
1296     result = bclass->do_seek (src, segment);
1297
1298   return result;
1299 }
1300
1301 #define SEEK_TYPE_IS_RELATIVE(t) (((t) != GST_SEEK_TYPE_NONE) && ((t) != GST_SEEK_TYPE_SET))
1302
1303 static gboolean
1304 gst_base_src_default_prepare_seek_segment (GstBaseSrc * src, GstEvent * event,
1305     GstSegment * segment)
1306 {
1307   /* By default, we try one of 2 things:
1308    *   - For absolute seek positions, convert the requested position to our
1309    *     configured processing format and place it in the output segment \
1310    *   - For relative seek positions, convert our current (input) values to the
1311    *     seek format, adjust by the relative seek offset and then convert back to
1312    *     the processing format
1313    */
1314   GstSeekType start_type, stop_type;
1315   gint64 start, stop;
1316   GstSeekFlags flags;
1317   GstFormat seek_format, dest_format;
1318   gdouble rate;
1319   gboolean update;
1320   gboolean res = TRUE;
1321
1322   gst_event_parse_seek (event, &rate, &seek_format, &flags,
1323       &start_type, &start, &stop_type, &stop);
1324   dest_format = segment->format;
1325
1326   if (seek_format == dest_format) {
1327     gst_segment_do_seek (segment, rate, seek_format, flags,
1328         start_type, start, stop_type, stop, &update);
1329     return TRUE;
1330   }
1331
1332   if (start_type != GST_SEEK_TYPE_NONE) {
1333     /* FIXME: Handle seek_end by converting the input segment vals */
1334     res =
1335         gst_pad_query_convert (src->srcpad, seek_format, start, dest_format,
1336         &start);
1337     start_type = GST_SEEK_TYPE_SET;
1338   }
1339
1340   if (res && stop_type != GST_SEEK_TYPE_NONE) {
1341     /* FIXME: Handle seek_end by converting the input segment vals */
1342     res =
1343         gst_pad_query_convert (src->srcpad, seek_format, stop, dest_format,
1344         &stop);
1345     stop_type = GST_SEEK_TYPE_SET;
1346   }
1347
1348   /* And finally, configure our output segment in the desired format */
1349   gst_segment_do_seek (segment, rate, dest_format, flags, start_type, start,
1350       stop_type, stop, &update);
1351
1352   if (!res)
1353     goto no_format;
1354
1355   return res;
1356
1357 no_format:
1358   {
1359     GST_DEBUG_OBJECT (src, "undefined format given, seek aborted.");
1360     return FALSE;
1361   }
1362 }
1363
1364 static gboolean
1365 gst_base_src_prepare_seek_segment (GstBaseSrc * src, GstEvent * event,
1366     GstSegment * seeksegment)
1367 {
1368   GstBaseSrcClass *bclass;
1369   gboolean result = FALSE;
1370
1371   bclass = GST_BASE_SRC_GET_CLASS (src);
1372
1373   if (bclass->prepare_seek_segment)
1374     result = bclass->prepare_seek_segment (src, event, seeksegment);
1375
1376   return result;
1377 }
1378
1379 static GstFlowReturn
1380 gst_base_src_default_alloc (GstBaseSrc * src, guint64 offset,
1381     guint size, GstBuffer ** buffer)
1382 {
1383   GstFlowReturn ret;
1384   GstBaseSrcPrivate *priv = src->priv;
1385
1386   if (priv->pool) {
1387     ret = gst_buffer_pool_acquire_buffer (priv->pool, buffer, NULL);
1388   } else if (size != -1) {
1389     *buffer = gst_buffer_new_allocate (priv->allocator, size, &priv->params);
1390     if (G_UNLIKELY (*buffer == NULL))
1391       goto alloc_failed;
1392
1393     ret = GST_FLOW_OK;
1394   } else {
1395     GST_WARNING_OBJECT (src, "Not trying to alloc %u bytes. Blocksize not set?",
1396         size);
1397     goto alloc_failed;
1398   }
1399   return ret;
1400
1401   /* ERRORS */
1402 alloc_failed:
1403   {
1404     GST_ERROR_OBJECT (src, "Failed to allocate %u bytes", size);
1405     return GST_FLOW_ERROR;
1406   }
1407 }
1408
1409 static GstFlowReturn
1410 gst_base_src_default_create (GstBaseSrc * src, guint64 offset,
1411     guint size, GstBuffer ** buffer)
1412 {
1413   GstBaseSrcClass *bclass;
1414   GstFlowReturn ret;
1415   GstBuffer *res_buf;
1416
1417   bclass = GST_BASE_SRC_GET_CLASS (src);
1418
1419   if (G_UNLIKELY (!bclass->alloc))
1420     goto no_function;
1421   if (G_UNLIKELY (!bclass->fill))
1422     goto no_function;
1423
1424   if (*buffer == NULL) {
1425     /* downstream did not provide us with a buffer to fill, allocate one
1426      * ourselves */
1427     ret = bclass->alloc (src, offset, size, &res_buf);
1428     if (G_UNLIKELY (ret != GST_FLOW_OK))
1429       goto alloc_failed;
1430   } else {
1431     res_buf = *buffer;
1432   }
1433
1434   if (G_LIKELY (size > 0)) {
1435     /* only call fill when there is a size */
1436     ret = bclass->fill (src, offset, size, res_buf);
1437     if (G_UNLIKELY (ret != GST_FLOW_OK))
1438       goto not_ok;
1439   }
1440
1441   *buffer = res_buf;
1442
1443   return GST_FLOW_OK;
1444
1445   /* ERRORS */
1446 no_function:
1447   {
1448     GST_DEBUG_OBJECT (src, "no fill or alloc function");
1449     return GST_FLOW_NOT_SUPPORTED;
1450   }
1451 alloc_failed:
1452   {
1453     GST_DEBUG_OBJECT (src, "Failed to allocate buffer of %u bytes", size);
1454     return ret;
1455   }
1456 not_ok:
1457   {
1458     GST_DEBUG_OBJECT (src, "fill returned %d (%s)", ret,
1459         gst_flow_get_name (ret));
1460     if (*buffer == NULL)
1461       gst_buffer_unref (res_buf);
1462     return ret;
1463   }
1464 }
1465
1466 /* this code implements the seeking. It is a good example
1467  * handling all cases.
1468  *
1469  * A seek updates the currently configured segment.start
1470  * and segment.stop values based on the SEEK_TYPE. If the
1471  * segment.start value is updated, a seek to this new position
1472  * should be performed.
1473  *
1474  * The seek can only be executed when we are not currently
1475  * streaming any data, to make sure that this is the case, we
1476  * acquire the STREAM_LOCK which is taken when we are in the
1477  * _loop() function or when a getrange() is called. Normally
1478  * we will not receive a seek if we are operating in pull mode
1479  * though. When we operate as a live source we might block on the live
1480  * cond, which does not release the STREAM_LOCK. Therefore we will try
1481  * to grab the LIVE_LOCK instead of the STREAM_LOCK to make sure it is
1482  * safe to perform the seek.
1483  *
1484  * When we are in the loop() function, we might be in the middle
1485  * of pushing a buffer, which might block in a sink. To make sure
1486  * that the push gets unblocked we push out a FLUSH_START event.
1487  * Our loop function will get a FLUSHING return value from
1488  * the push and will pause, effectively releasing the STREAM_LOCK.
1489  *
1490  * For a non-flushing seek, we pause the task, which might eventually
1491  * release the STREAM_LOCK. We say eventually because when the sink
1492  * blocks on the sample we might wait a very long time until the sink
1493  * unblocks the sample. In any case we acquire the STREAM_LOCK and
1494  * can continue the seek. A non-flushing seek is normally done in a
1495  * running pipeline to perform seamless playback, this means that the sink is
1496  * PLAYING and will return from its chain function.
1497  * In the case of a non-flushing seek we need to make sure that the
1498  * data we output after the seek is continuous with the previous data,
1499  * this is because a non-flushing seek does not reset the running-time
1500  * to 0. We do this by closing the currently running segment, ie. sending
1501  * a new_segment event with the stop position set to the last processed
1502  * position.
1503  *
1504  * After updating the segment.start/stop values, we prepare for
1505  * streaming again. We push out a FLUSH_STOP to make the peer pad
1506  * accept data again and we start our task again.
1507  *
1508  * A segment seek posts a message on the bus saying that the playback
1509  * of the segment started. We store the segment flag internally because
1510  * when we reach the segment.stop we have to post a segment.done
1511  * instead of EOS when doing a segment seek.
1512  */
1513 static gboolean
1514 gst_base_src_perform_seek (GstBaseSrc * src, GstEvent * event, gboolean unlock)
1515 {
1516   gboolean res = TRUE, tres;
1517   gdouble rate;
1518   GstFormat seek_format, dest_format;
1519   GstSeekFlags flags;
1520   GstSeekType start_type, stop_type;
1521   gint64 start, stop;
1522   gboolean flush, playing;
1523   gboolean update;
1524   gboolean relative_seek = FALSE;
1525   gboolean seekseg_configured = FALSE;
1526   GstSegment seeksegment;
1527   guint32 seqnum;
1528   GstEvent *tevent;
1529
1530   GST_DEBUG_OBJECT (src, "doing seek: %" GST_PTR_FORMAT, event);
1531
1532   GST_OBJECT_LOCK (src);
1533   dest_format = src->segment.format;
1534   GST_OBJECT_UNLOCK (src);
1535
1536   if (event) {
1537     gst_event_parse_seek (event, &rate, &seek_format, &flags,
1538         &start_type, &start, &stop_type, &stop);
1539
1540     relative_seek = SEEK_TYPE_IS_RELATIVE (start_type) ||
1541         SEEK_TYPE_IS_RELATIVE (stop_type);
1542
1543     if (dest_format != seek_format && !relative_seek) {
1544       /* If we have an ABSOLUTE position (SEEK_SET only), we can convert it
1545        * here before taking the stream lock, otherwise we must convert it later,
1546        * once we have the stream lock and can read the last configures segment
1547        * start and stop positions */
1548       gst_segment_init (&seeksegment, dest_format);
1549
1550       if (!gst_base_src_prepare_seek_segment (src, event, &seeksegment))
1551         goto prepare_failed;
1552
1553       seekseg_configured = TRUE;
1554     }
1555
1556     flush = flags & GST_SEEK_FLAG_FLUSH;
1557     seqnum = gst_event_get_seqnum (event);
1558   } else {
1559     flush = FALSE;
1560     /* get next seqnum */
1561     seqnum = gst_util_seqnum_next ();
1562   }
1563
1564   /* send flush start */
1565   if (flush) {
1566     tevent = gst_event_new_flush_start ();
1567     gst_event_set_seqnum (tevent, seqnum);
1568     gst_pad_push_event (src->srcpad, tevent);
1569   } else
1570     gst_pad_pause_task (src->srcpad);
1571
1572   /* unblock streaming thread. */
1573   if (unlock)
1574     gst_base_src_set_flushing (src, TRUE, FALSE, &playing);
1575
1576   /* grab streaming lock, this should eventually be possible, either
1577    * because the task is paused, our streaming thread stopped
1578    * or because our peer is flushing. */
1579   GST_PAD_STREAM_LOCK (src->srcpad);
1580   if (G_UNLIKELY (src->priv->seqnum == seqnum)) {
1581     /* we have seen this event before, issue a warning for now */
1582     GST_WARNING_OBJECT (src, "duplicate event found %" G_GUINT32_FORMAT,
1583         seqnum);
1584   } else {
1585     src->priv->seqnum = seqnum;
1586     GST_DEBUG_OBJECT (src, "seek with seqnum %" G_GUINT32_FORMAT, seqnum);
1587   }
1588
1589   if (unlock)
1590     gst_base_src_set_flushing (src, FALSE, playing, NULL);
1591
1592   /* If we configured the seeksegment above, don't overwrite it now. Otherwise
1593    * copy the current segment info into the temp segment that we can actually
1594    * attempt the seek with. We only update the real segment if the seek succeeds. */
1595   if (!seekseg_configured) {
1596     memcpy (&seeksegment, &src->segment, sizeof (GstSegment));
1597
1598     /* now configure the final seek segment */
1599     if (event) {
1600       if (seeksegment.format != seek_format) {
1601         /* OK, here's where we give the subclass a chance to convert the relative
1602          * seek into an absolute one in the processing format. We set up any
1603          * absolute seek above, before taking the stream lock. */
1604         if (!gst_base_src_prepare_seek_segment (src, event, &seeksegment)) {
1605           GST_DEBUG_OBJECT (src, "Preparing the seek failed after flushing. "
1606               "Aborting seek");
1607           res = FALSE;
1608         }
1609       } else {
1610         /* The seek format matches our processing format, no need to ask the
1611          * the subclass to configure the segment. */
1612         gst_segment_do_seek (&seeksegment, rate, seek_format, flags,
1613             start_type, start, stop_type, stop, &update);
1614       }
1615     }
1616     /* Else, no seek event passed, so we're just (re)starting the
1617        current segment. */
1618   }
1619
1620   if (res) {
1621     GST_DEBUG_OBJECT (src, "segment configured from %" G_GINT64_FORMAT
1622         " to %" G_GINT64_FORMAT ", position %" G_GINT64_FORMAT,
1623         seeksegment.start, seeksegment.stop, seeksegment.position);
1624
1625     /* do the seek, segment.position contains the new position. */
1626     res = gst_base_src_do_seek (src, &seeksegment);
1627   }
1628
1629   /* and prepare to continue streaming */
1630   if (flush) {
1631     tevent = gst_event_new_flush_stop (TRUE);
1632     gst_event_set_seqnum (tevent, seqnum);
1633     /* send flush stop, peer will accept data and events again. We
1634      * are not yet providing data as we still have the STREAM_LOCK. */
1635     gst_pad_push_event (src->srcpad, tevent);
1636   }
1637
1638   /* The subclass must have converted the segment to the processing format
1639    * by now */
1640   if (res && seeksegment.format != dest_format) {
1641     GST_DEBUG_OBJECT (src, "Subclass failed to prepare a seek segment "
1642         "in the correct format. Aborting seek.");
1643     res = FALSE;
1644   }
1645
1646   /* if the seek was successful, we update our real segment and push
1647    * out the new segment. */
1648   if (res) {
1649     GST_OBJECT_LOCK (src);
1650     memcpy (&src->segment, &seeksegment, sizeof (GstSegment));
1651     GST_OBJECT_UNLOCK (src);
1652
1653     if (seeksegment.flags & GST_SEGMENT_FLAG_SEGMENT) {
1654       GstMessage *message;
1655
1656       message = gst_message_new_segment_start (GST_OBJECT (src),
1657           seeksegment.format, seeksegment.position);
1658       gst_message_set_seqnum (message, seqnum);
1659
1660       gst_element_post_message (GST_ELEMENT (src), message);
1661     }
1662
1663     /* for deriving a stop position for the playback segment from the seek
1664      * segment, we must take the duration when the stop is not set */
1665     /* FIXME: This is never used below */
1666     if ((stop = seeksegment.stop) == -1)
1667       stop = seeksegment.duration;
1668
1669     src->priv->segment_pending = TRUE;
1670   }
1671
1672   src->priv->discont = TRUE;
1673   src->running = TRUE;
1674   /* and restart the task in case it got paused explicitly or by
1675    * the FLUSH_START event we pushed out. */
1676   tres = gst_pad_start_task (src->srcpad, (GstTaskFunction) gst_base_src_loop,
1677       src->srcpad, NULL);
1678   if (res && !tres)
1679     res = FALSE;
1680
1681   /* and release the lock again so we can continue streaming */
1682   GST_PAD_STREAM_UNLOCK (src->srcpad);
1683
1684   return res;
1685
1686   /* ERROR */
1687 prepare_failed:
1688   GST_DEBUG_OBJECT (src, "Preparing the seek failed before flushing. "
1689       "Aborting seek");
1690   return FALSE;
1691 }
1692
1693 /* all events send to this element directly. This is mainly done from the
1694  * application.
1695  */
1696 static gboolean
1697 gst_base_src_send_event (GstElement * element, GstEvent * event)
1698 {
1699   GstBaseSrc *src;
1700   gboolean result = FALSE;
1701   GstBaseSrcClass *bclass;
1702
1703   src = GST_BASE_SRC (element);
1704   bclass = GST_BASE_SRC_GET_CLASS (src);
1705
1706   GST_DEBUG_OBJECT (src, "handling event %p %" GST_PTR_FORMAT, event, event);
1707
1708   switch (GST_EVENT_TYPE (event)) {
1709       /* bidirectional events */
1710     case GST_EVENT_FLUSH_START:
1711       GST_DEBUG_OBJECT (src, "pushing flush-start event downstream");
1712       result = gst_pad_push_event (src->srcpad, event);
1713       /* also unblock the create function */
1714       gst_base_src_activate_pool (src, FALSE);
1715       /* unlock any subclasses, we need to do this before grabbing the
1716        * LIVE_LOCK since we hold this lock before going into ::create. We pass an
1717        * unlock to the params because of backwards compat (see seek handler)*/
1718       if (bclass->unlock)
1719         bclass->unlock (src);
1720
1721       /* the live lock is released when we are blocked, waiting for playing or
1722        * when we sync to the clock. */
1723       GST_LIVE_LOCK (src);
1724       src->priv->flushing = TRUE;
1725       /* clear pending EOS if any */
1726       g_atomic_int_set (&src->priv->pending_eos, FALSE);
1727       if (bclass->unlock_stop)
1728         bclass->unlock_stop (src);
1729       if (src->clock_id)
1730         gst_clock_id_unschedule (src->clock_id);
1731       GST_DEBUG_OBJECT (src, "signal");
1732       GST_LIVE_SIGNAL (src);
1733       GST_LIVE_UNLOCK (src);
1734       event = NULL;
1735       break;
1736     case GST_EVENT_FLUSH_STOP:
1737     {
1738       gboolean start;
1739
1740       GST_LIVE_LOCK (src);
1741       src->priv->segment_pending = TRUE;
1742       src->priv->flushing = FALSE;
1743       GST_DEBUG_OBJECT (src, "pushing flush-stop event downstream");
1744       result = gst_pad_push_event (src->srcpad, event);
1745
1746       gst_base_src_activate_pool (src, TRUE);
1747
1748       GST_OBJECT_LOCK (src->srcpad);
1749       start = (GST_PAD_MODE (src->srcpad) == GST_PAD_MODE_PUSH);
1750       GST_OBJECT_UNLOCK (src->srcpad);
1751       if (start)
1752         gst_pad_start_task (src->srcpad, (GstTaskFunction) gst_base_src_loop,
1753             src->srcpad, NULL);
1754       GST_LIVE_UNLOCK (src);
1755       event = NULL;
1756       break;
1757     }
1758
1759       /* downstream serialized events */
1760     case GST_EVENT_EOS:
1761     {
1762       /* queue EOS and make sure the task or pull function performs the EOS
1763        * actions.
1764        *
1765        * We have two possibilities:
1766        *
1767        *  - Before we are to enter the _create function, we check the pending_eos
1768        *    first and do EOS instead of entering it.
1769        *  - If we are in the _create function or we did not manage to set the
1770        *    flag fast enough and we are about to enter the _create function,
1771        *    we unlock it so that we exit with FLUSHING immediately. We then
1772        *    check the EOS flag and do the EOS logic.
1773        */
1774       g_atomic_int_set (&src->priv->pending_eos, TRUE);
1775       GST_DEBUG_OBJECT (src, "EOS marked, calling unlock");
1776
1777
1778       /* unlock the _create function so that we can check the pending_eos flag
1779        * and we can do EOS. This will eventually release the LIVE_LOCK again so
1780        * that we can grab it and stop the unlock again. We don't take the stream
1781        * lock so that this operation is guaranteed to never block. */
1782       gst_base_src_activate_pool (src, FALSE);
1783       if (bclass->unlock)
1784         bclass->unlock (src);
1785
1786       GST_DEBUG_OBJECT (src, "unlock called, waiting for LIVE_LOCK");
1787
1788       GST_LIVE_LOCK (src);
1789       GST_DEBUG_OBJECT (src, "LIVE_LOCK acquired, calling unlock_stop");
1790       /* now stop the unlock of the streaming thread again. Grabbing the live
1791        * lock is enough because that protects the create function. */
1792       if (bclass->unlock_stop)
1793         bclass->unlock_stop (src);
1794       gst_base_src_activate_pool (src, TRUE);
1795       GST_LIVE_UNLOCK (src);
1796
1797       result = TRUE;
1798       break;
1799     }
1800     case GST_EVENT_SEGMENT:
1801       /* sending random SEGMENT downstream can break sync. */
1802       break;
1803     case GST_EVENT_TAG:
1804     case GST_EVENT_CUSTOM_DOWNSTREAM:
1805     case GST_EVENT_CUSTOM_BOTH:
1806       /* Insert TAG, CUSTOM_DOWNSTREAM, CUSTOM_BOTH in the dataflow */
1807       GST_OBJECT_LOCK (src);
1808       src->priv->pending_events =
1809           g_list_append (src->priv->pending_events, event);
1810       g_atomic_int_set (&src->priv->have_events, TRUE);
1811       GST_OBJECT_UNLOCK (src);
1812       event = NULL;
1813       result = TRUE;
1814       break;
1815     case GST_EVENT_BUFFERSIZE:
1816       /* does not seem to make much sense currently */
1817       break;
1818
1819       /* upstream events */
1820     case GST_EVENT_QOS:
1821       /* elements should override send_event and do something */
1822       break;
1823     case GST_EVENT_SEEK:
1824     {
1825       gboolean started;
1826
1827       GST_OBJECT_LOCK (src->srcpad);
1828       if (GST_PAD_MODE (src->srcpad) == GST_PAD_MODE_PULL)
1829         goto wrong_mode;
1830       started = GST_PAD_MODE (src->srcpad) == GST_PAD_MODE_PUSH;
1831       GST_OBJECT_UNLOCK (src->srcpad);
1832
1833       if (started) {
1834         GST_DEBUG_OBJECT (src, "performing seek");
1835         /* when we are running in push mode, we can execute the
1836          * seek right now. */
1837         result = gst_base_src_perform_seek (src, event, TRUE);
1838       } else {
1839         GstEvent **event_p;
1840
1841         /* else we store the event and execute the seek when we
1842          * get activated */
1843         GST_OBJECT_LOCK (src);
1844         GST_DEBUG_OBJECT (src, "queueing seek");
1845         event_p = &src->pending_seek;
1846         gst_event_replace ((GstEvent **) event_p, event);
1847         GST_OBJECT_UNLOCK (src);
1848         /* assume the seek will work */
1849         result = TRUE;
1850       }
1851       break;
1852     }
1853     case GST_EVENT_NAVIGATION:
1854       /* could make sense for elements that do something with navigation events
1855        * but then they would need to override the send_event function */
1856       break;
1857     case GST_EVENT_LATENCY:
1858       /* does not seem to make sense currently */
1859       break;
1860
1861       /* custom events */
1862     case GST_EVENT_CUSTOM_UPSTREAM:
1863       /* override send_event if you want this */
1864       break;
1865     case GST_EVENT_CUSTOM_DOWNSTREAM_OOB:
1866     case GST_EVENT_CUSTOM_BOTH_OOB:
1867       /* insert a random custom event into the pipeline */
1868       GST_DEBUG_OBJECT (src, "pushing custom OOB event downstream");
1869       result = gst_pad_push_event (src->srcpad, event);
1870       /* we gave away the ref to the event in the push */
1871       event = NULL;
1872       break;
1873     default:
1874       break;
1875   }
1876 done:
1877   /* if we still have a ref to the event, unref it now */
1878   if (event)
1879     gst_event_unref (event);
1880
1881   return result;
1882
1883   /* ERRORS */
1884 wrong_mode:
1885   {
1886     GST_DEBUG_OBJECT (src, "cannot perform seek when operating in pull mode");
1887     GST_OBJECT_UNLOCK (src->srcpad);
1888     result = FALSE;
1889     goto done;
1890   }
1891 }
1892
1893 static gboolean
1894 gst_base_src_seekable (GstBaseSrc * src)
1895 {
1896   GstBaseSrcClass *bclass;
1897   bclass = GST_BASE_SRC_GET_CLASS (src);
1898   if (bclass->is_seekable)
1899     return bclass->is_seekable (src);
1900   else
1901     return FALSE;
1902 }
1903
1904 static void
1905 gst_base_src_update_qos (GstBaseSrc * src,
1906     gdouble proportion, GstClockTimeDiff diff, GstClockTime timestamp)
1907 {
1908   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, src,
1909       "qos: proportion: %lf, diff %" G_GINT64_FORMAT ", timestamp %"
1910       GST_TIME_FORMAT, proportion, diff, GST_TIME_ARGS (timestamp));
1911
1912   GST_OBJECT_LOCK (src);
1913   src->priv->proportion = proportion;
1914   src->priv->earliest_time = timestamp + diff;
1915   GST_OBJECT_UNLOCK (src);
1916 }
1917
1918
1919 static gboolean
1920 gst_base_src_default_event (GstBaseSrc * src, GstEvent * event)
1921 {
1922   gboolean result;
1923
1924   GST_DEBUG_OBJECT (src, "handle event %" GST_PTR_FORMAT, event);
1925
1926   switch (GST_EVENT_TYPE (event)) {
1927     case GST_EVENT_SEEK:
1928       /* is normally called when in push mode */
1929       if (!gst_base_src_seekable (src))
1930         goto not_seekable;
1931
1932       result = gst_base_src_perform_seek (src, event, TRUE);
1933       break;
1934     case GST_EVENT_FLUSH_START:
1935       /* cancel any blocking getrange, is normally called
1936        * when in pull mode. */
1937       result = gst_base_src_set_flushing (src, TRUE, FALSE, NULL);
1938       break;
1939     case GST_EVENT_FLUSH_STOP:
1940       result = gst_base_src_set_flushing (src, FALSE, TRUE, NULL);
1941       break;
1942     case GST_EVENT_QOS:
1943     {
1944       gdouble proportion;
1945       GstClockTimeDiff diff;
1946       GstClockTime timestamp;
1947
1948       gst_event_parse_qos (event, NULL, &proportion, &diff, &timestamp);
1949       gst_base_src_update_qos (src, proportion, diff, timestamp);
1950       result = TRUE;
1951       break;
1952     }
1953     case GST_EVENT_RECONFIGURE:
1954       result = TRUE;
1955       break;
1956     case GST_EVENT_LATENCY:
1957       result = TRUE;
1958       break;
1959     default:
1960       result = FALSE;
1961       break;
1962   }
1963   return result;
1964
1965   /* ERRORS */
1966 not_seekable:
1967   {
1968     GST_DEBUG_OBJECT (src, "is not seekable");
1969     return FALSE;
1970   }
1971 }
1972
1973 static gboolean
1974 gst_base_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
1975 {
1976   GstBaseSrc *src;
1977   GstBaseSrcClass *bclass;
1978   gboolean result = FALSE;
1979
1980   src = GST_BASE_SRC (parent);
1981   bclass = GST_BASE_SRC_GET_CLASS (src);
1982
1983   if (bclass->event) {
1984     if (!(result = bclass->event (src, event)))
1985       goto subclass_failed;
1986   }
1987
1988 done:
1989   gst_event_unref (event);
1990
1991   return result;
1992
1993   /* ERRORS */
1994 subclass_failed:
1995   {
1996     GST_DEBUG_OBJECT (src, "subclass refused event");
1997     goto done;
1998   }
1999 }
2000
2001 static void
2002 gst_base_src_set_property (GObject * object, guint prop_id,
2003     const GValue * value, GParamSpec * pspec)
2004 {
2005   GstBaseSrc *src;
2006
2007   src = GST_BASE_SRC (object);
2008
2009   switch (prop_id) {
2010     case PROP_BLOCKSIZE:
2011       gst_base_src_set_blocksize (src, g_value_get_uint (value));
2012       break;
2013     case PROP_NUM_BUFFERS:
2014       src->num_buffers = g_value_get_int (value);
2015       break;
2016     case PROP_TYPEFIND:
2017       src->typefind = g_value_get_boolean (value);
2018       break;
2019     case PROP_DO_TIMESTAMP:
2020       gst_base_src_set_do_timestamp (src, g_value_get_boolean (value));
2021       break;
2022     default:
2023       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2024       break;
2025   }
2026 }
2027
2028 static void
2029 gst_base_src_get_property (GObject * object, guint prop_id, GValue * value,
2030     GParamSpec * pspec)
2031 {
2032   GstBaseSrc *src;
2033
2034   src = GST_BASE_SRC (object);
2035
2036   switch (prop_id) {
2037     case PROP_BLOCKSIZE:
2038       g_value_set_uint (value, gst_base_src_get_blocksize (src));
2039       break;
2040     case PROP_NUM_BUFFERS:
2041       g_value_set_int (value, src->num_buffers);
2042       break;
2043     case PROP_TYPEFIND:
2044       g_value_set_boolean (value, src->typefind);
2045       break;
2046     case PROP_DO_TIMESTAMP:
2047       g_value_set_boolean (value, gst_base_src_get_do_timestamp (src));
2048       break;
2049     default:
2050       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2051       break;
2052   }
2053 }
2054
2055 /* with STREAM_LOCK and LOCK */
2056 static GstClockReturn
2057 gst_base_src_wait (GstBaseSrc * basesrc, GstClock * clock, GstClockTime time)
2058 {
2059   GstClockReturn ret;
2060   GstClockID id;
2061
2062   id = gst_clock_new_single_shot_id (clock, time);
2063
2064   basesrc->clock_id = id;
2065   /* release the live lock while waiting */
2066   GST_LIVE_UNLOCK (basesrc);
2067
2068   ret = gst_clock_id_wait (id, NULL);
2069
2070   GST_LIVE_LOCK (basesrc);
2071   gst_clock_id_unref (id);
2072   basesrc->clock_id = NULL;
2073
2074   return ret;
2075 }
2076
2077 /* perform synchronisation on a buffer.
2078  * with STREAM_LOCK.
2079  */
2080 static GstClockReturn
2081 gst_base_src_do_sync (GstBaseSrc * basesrc, GstBuffer * buffer)
2082 {
2083   GstClockReturn result;
2084   GstClockTime start, end;
2085   GstBaseSrcClass *bclass;
2086   GstClockTime base_time;
2087   GstClock *clock;
2088   GstClockTime now = GST_CLOCK_TIME_NONE, pts, dts, timestamp;
2089   gboolean do_timestamp, first, pseudo_live, is_live;
2090
2091   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2092
2093   start = end = -1;
2094   if (bclass->get_times)
2095     bclass->get_times (basesrc, buffer, &start, &end);
2096
2097   /* get buffer timestamp */
2098   dts = GST_BUFFER_DTS (buffer);
2099   pts = GST_BUFFER_PTS (buffer);
2100
2101   if (GST_CLOCK_TIME_IS_VALID (dts))
2102     timestamp = dts;
2103   else
2104     timestamp = pts;
2105
2106   /* grab the lock to prepare for clocking and calculate the startup
2107    * latency. */
2108   GST_OBJECT_LOCK (basesrc);
2109
2110   is_live = basesrc->is_live;
2111   /* if we are asked to sync against the clock we are a pseudo live element */
2112   pseudo_live = (start != -1 && is_live);
2113   /* check for the first buffer */
2114   first = (basesrc->priv->latency == -1);
2115
2116   if (timestamp != -1 && pseudo_live) {
2117     GstClockTime latency;
2118
2119     /* we have a timestamp and a sync time, latency is the diff */
2120     if (timestamp <= start)
2121       latency = start - timestamp;
2122     else
2123       latency = 0;
2124
2125     if (first) {
2126       GST_DEBUG_OBJECT (basesrc, "pseudo_live with latency %" GST_TIME_FORMAT,
2127           GST_TIME_ARGS (latency));
2128       /* first time we calculate latency, just configure */
2129       basesrc->priv->latency = latency;
2130     } else {
2131       if (basesrc->priv->latency != latency) {
2132         /* we have a new latency, FIXME post latency message */
2133         basesrc->priv->latency = latency;
2134         GST_DEBUG_OBJECT (basesrc, "latency changed to %" GST_TIME_FORMAT,
2135             GST_TIME_ARGS (latency));
2136       }
2137     }
2138   } else if (first) {
2139     GST_DEBUG_OBJECT (basesrc, "no latency needed, live %d, sync %d",
2140         is_live, start != -1);
2141     basesrc->priv->latency = 0;
2142   }
2143
2144   /* get clock, if no clock, we can't sync or do timestamps */
2145   if ((clock = GST_ELEMENT_CLOCK (basesrc)) == NULL)
2146     goto no_clock;
2147   else
2148     gst_object_ref (clock);
2149
2150   base_time = GST_ELEMENT_CAST (basesrc)->base_time;
2151
2152   do_timestamp = basesrc->priv->do_timestamp;
2153   GST_OBJECT_UNLOCK (basesrc);
2154
2155   /* first buffer, calculate the timestamp offset */
2156   if (first) {
2157     GstClockTime running_time;
2158
2159     now = gst_clock_get_time (clock);
2160     running_time = now - base_time;
2161
2162     GST_LOG_OBJECT (basesrc,
2163         "startup PTS: %" GST_TIME_FORMAT ", DTS %" GST_TIME_FORMAT
2164         ", running_time %" GST_TIME_FORMAT, GST_TIME_ARGS (pts),
2165         GST_TIME_ARGS (dts), GST_TIME_ARGS (running_time));
2166
2167     if (pseudo_live && timestamp != -1) {
2168       /* live source and we need to sync, add startup latency to all timestamps
2169        * to get the real running_time. Live sources should always timestamp
2170        * according to the current running time. */
2171       basesrc->priv->ts_offset = GST_CLOCK_DIFF (timestamp, running_time);
2172
2173       GST_LOG_OBJECT (basesrc, "live with sync, ts_offset %" GST_TIME_FORMAT,
2174           GST_TIME_ARGS (basesrc->priv->ts_offset));
2175     } else {
2176       basesrc->priv->ts_offset = 0;
2177       GST_LOG_OBJECT (basesrc, "no timestamp offset needed");
2178     }
2179
2180     if (!GST_CLOCK_TIME_IS_VALID (dts)) {
2181       if (do_timestamp) {
2182         dts = running_time;
2183       } else {
2184         dts = 0;
2185       }
2186       GST_BUFFER_DTS (buffer) = dts;
2187
2188       GST_LOG_OBJECT (basesrc, "created DTS %" GST_TIME_FORMAT,
2189           GST_TIME_ARGS (dts));
2190     }
2191   } else {
2192     /* not the first buffer, the timestamp is the diff between the clock and
2193      * base_time */
2194     if (do_timestamp && !GST_CLOCK_TIME_IS_VALID (dts)) {
2195       now = gst_clock_get_time (clock);
2196
2197       dts = now - base_time;
2198       GST_BUFFER_DTS (buffer) = dts;
2199
2200       GST_LOG_OBJECT (basesrc, "created DTS %" GST_TIME_FORMAT,
2201           GST_TIME_ARGS (dts));
2202     }
2203   }
2204   if (!GST_CLOCK_TIME_IS_VALID (pts)) {
2205     if (!GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DELTA_UNIT))
2206       pts = dts;
2207
2208     GST_BUFFER_PTS (buffer) = dts;
2209
2210     GST_LOG_OBJECT (basesrc, "created PTS %" GST_TIME_FORMAT,
2211         GST_TIME_ARGS (pts));
2212   }
2213
2214   /* if we don't have a buffer timestamp, we don't sync */
2215   if (!GST_CLOCK_TIME_IS_VALID (start))
2216     goto no_sync;
2217
2218   if (is_live) {
2219     /* for pseudo live sources, add our ts_offset to the timestamp */
2220     if (GST_CLOCK_TIME_IS_VALID (pts))
2221       GST_BUFFER_PTS (buffer) += basesrc->priv->ts_offset;
2222     if (GST_CLOCK_TIME_IS_VALID (dts))
2223       GST_BUFFER_DTS (buffer) += basesrc->priv->ts_offset;
2224     start += basesrc->priv->ts_offset;
2225   }
2226
2227   GST_LOG_OBJECT (basesrc,
2228       "waiting for clock, base time %" GST_TIME_FORMAT
2229       ", stream_start %" GST_TIME_FORMAT,
2230       GST_TIME_ARGS (base_time), GST_TIME_ARGS (start));
2231
2232   result = gst_base_src_wait (basesrc, clock, start + base_time);
2233
2234   gst_object_unref (clock);
2235
2236   GST_LOG_OBJECT (basesrc, "clock entry done: %d", result);
2237
2238   return result;
2239
2240   /* special cases */
2241 no_clock:
2242   {
2243     GST_DEBUG_OBJECT (basesrc, "we have no clock");
2244     GST_OBJECT_UNLOCK (basesrc);
2245     return GST_CLOCK_OK;
2246   }
2247 no_sync:
2248   {
2249     GST_DEBUG_OBJECT (basesrc, "no sync needed");
2250     gst_object_unref (clock);
2251     return GST_CLOCK_OK;
2252   }
2253 }
2254
2255 /* Called with STREAM_LOCK and LIVE_LOCK */
2256 static gboolean
2257 gst_base_src_update_length (GstBaseSrc * src, guint64 offset, guint * length,
2258     gboolean force)
2259 {
2260   guint64 size, maxsize;
2261   GstBaseSrcClass *bclass;
2262   GstFormat format;
2263   gint64 stop;
2264
2265   bclass = GST_BASE_SRC_GET_CLASS (src);
2266
2267   format = src->segment.format;
2268   stop = src->segment.stop;
2269   /* get total file size */
2270   size = src->segment.duration;
2271
2272   /* only operate if we are working with bytes */
2273   if (format != GST_FORMAT_BYTES)
2274     return TRUE;
2275
2276   /* the max amount of bytes to read is the total size or
2277    * up to the segment.stop if present. */
2278   if (stop != -1)
2279     maxsize = MIN (size, stop);
2280   else
2281     maxsize = size;
2282
2283   GST_DEBUG_OBJECT (src,
2284       "reading offset %" G_GUINT64_FORMAT ", length %u, size %" G_GINT64_FORMAT
2285       ", segment.stop %" G_GINT64_FORMAT ", maxsize %" G_GINT64_FORMAT, offset,
2286       *length, size, stop, maxsize);
2287
2288   /* check size if we have one */
2289   if (maxsize != -1) {
2290     /* if we run past the end, check if the file became bigger and
2291      * retry. */
2292     if (G_UNLIKELY (offset + *length >= maxsize || force)) {
2293       /* see if length of the file changed */
2294       if (bclass->get_size)
2295         if (!bclass->get_size (src, &size))
2296           size = -1;
2297
2298       /* make sure we don't exceed the configured segment stop
2299        * if it was set */
2300       if (stop != -1)
2301         maxsize = MIN (size, stop);
2302       else
2303         maxsize = size;
2304
2305       /* if we are at or past the end, EOS */
2306       if (G_UNLIKELY (offset >= maxsize))
2307         goto unexpected_length;
2308
2309       /* else we can clip to the end */
2310       if (G_UNLIKELY (offset + *length >= maxsize))
2311         *length = maxsize - offset;
2312
2313     }
2314   }
2315
2316   /* keep track of current duration.
2317    * segment is in bytes, we checked that above. */
2318   GST_OBJECT_LOCK (src);
2319   src->segment.duration = size;
2320   GST_OBJECT_UNLOCK (src);
2321
2322   return TRUE;
2323
2324   /* ERRORS */
2325 unexpected_length:
2326   {
2327     return FALSE;
2328   }
2329 }
2330
2331 /* must be called with LIVE_LOCK */
2332 static GstFlowReturn
2333 gst_base_src_get_range (GstBaseSrc * src, guint64 offset, guint length,
2334     GstBuffer ** buf)
2335 {
2336   GstFlowReturn ret;
2337   GstBaseSrcClass *bclass;
2338   GstClockReturn status;
2339   GstBuffer *res_buf;
2340   GstBuffer *in_buf;
2341
2342   bclass = GST_BASE_SRC_GET_CLASS (src);
2343
2344 again:
2345   if (src->is_live) {
2346     if (G_UNLIKELY (!src->live_running)) {
2347       ret = gst_base_src_wait_playing (src);
2348       if (ret != GST_FLOW_OK)
2349         goto stopped;
2350     }
2351   }
2352
2353   if (G_UNLIKELY (!GST_BASE_SRC_IS_STARTED (src)
2354           && !GST_BASE_SRC_IS_STARTING (src)))
2355     goto not_started;
2356
2357   if (G_UNLIKELY (!bclass->create))
2358     goto no_function;
2359
2360   if (G_UNLIKELY (!gst_base_src_update_length (src, offset, &length, FALSE)))
2361     goto unexpected_length;
2362
2363   /* track position */
2364   GST_OBJECT_LOCK (src);
2365   if (src->segment.format == GST_FORMAT_BYTES)
2366     src->segment.position = offset;
2367   GST_OBJECT_UNLOCK (src);
2368
2369   /* normally we don't count buffers */
2370   if (G_UNLIKELY (src->num_buffers_left >= 0)) {
2371     if (src->num_buffers_left == 0)
2372       goto reached_num_buffers;
2373     else
2374       src->num_buffers_left--;
2375   }
2376
2377   /* don't enter the create function if a pending EOS event was set. For the
2378    * logic of the pending_eos, check the event function of this class. */
2379   if (G_UNLIKELY (g_atomic_int_get (&src->priv->pending_eos)))
2380     goto eos;
2381
2382   GST_DEBUG_OBJECT (src,
2383       "calling create offset %" G_GUINT64_FORMAT " length %u, time %"
2384       G_GINT64_FORMAT, offset, length, src->segment.time);
2385
2386   res_buf = in_buf = *buf;
2387
2388   ret = bclass->create (src, offset, length, &res_buf);
2389
2390   /* The create function could be unlocked because we have a pending EOS. It's
2391    * possible that we have a valid buffer from create that we need to
2392    * discard when the create function returned _OK. */
2393   if (G_UNLIKELY (g_atomic_int_get (&src->priv->pending_eos))) {
2394     if (ret == GST_FLOW_OK) {
2395       if (*buf == NULL)
2396         gst_buffer_unref (res_buf);
2397     }
2398     goto eos;
2399   }
2400
2401   if (G_UNLIKELY (ret != GST_FLOW_OK))
2402     goto not_ok;
2403
2404   /* fallback in case the create function didn't fill a provided buffer */
2405   if (in_buf != NULL && res_buf != in_buf) {
2406     GstMapInfo info;
2407     gsize copied_size;
2408
2409     GST_CAT_DEBUG_OBJECT (GST_CAT_PERFORMANCE, src, "create function didn't "
2410         "fill the provided buffer, copying");
2411
2412     if (!gst_buffer_map (in_buf, &info, GST_MAP_WRITE))
2413       goto map_failed;
2414
2415     copied_size = gst_buffer_extract (res_buf, 0, info.data, info.size);
2416     gst_buffer_unmap (in_buf, &info);
2417     gst_buffer_set_size (in_buf, copied_size);
2418
2419     gst_buffer_copy_into (in_buf, res_buf, GST_BUFFER_COPY_METADATA, 0, -1);
2420
2421     gst_buffer_unref (res_buf);
2422     res_buf = in_buf;
2423   }
2424
2425   /* no timestamp set and we are at offset 0, we can timestamp with 0 */
2426   if (offset == 0 && src->segment.time == 0
2427       && GST_BUFFER_DTS (res_buf) == -1 && !src->is_live) {
2428     GST_DEBUG_OBJECT (src, "setting first timestamp to 0");
2429     res_buf = gst_buffer_make_writable (res_buf);
2430     GST_BUFFER_DTS (res_buf) = 0;
2431   }
2432
2433   /* now sync before pushing the buffer */
2434   status = gst_base_src_do_sync (src, res_buf);
2435
2436   /* waiting for the clock could have made us flushing */
2437   if (G_UNLIKELY (src->priv->flushing))
2438     goto flushing;
2439
2440   switch (status) {
2441     case GST_CLOCK_EARLY:
2442       /* the buffer is too late. We currently don't drop the buffer. */
2443       GST_DEBUG_OBJECT (src, "buffer too late!, returning anyway");
2444       break;
2445     case GST_CLOCK_OK:
2446       /* buffer synchronised properly */
2447       GST_DEBUG_OBJECT (src, "buffer ok");
2448       break;
2449     case GST_CLOCK_UNSCHEDULED:
2450       /* this case is triggered when we were waiting for the clock and
2451        * it got unlocked because we did a state change. In any case, get rid of
2452        * the buffer. */
2453       if (*buf == NULL)
2454         gst_buffer_unref (res_buf);
2455
2456       if (!src->live_running) {
2457         /* We return FLUSHING when we are not running to stop the dataflow also
2458          * get rid of the produced buffer. */
2459         GST_DEBUG_OBJECT (src,
2460             "clock was unscheduled (%d), returning FLUSHING", status);
2461         ret = GST_FLOW_FLUSHING;
2462       } else {
2463         /* If we are running when this happens, we quickly switched between
2464          * pause and playing. We try to produce a new buffer */
2465         GST_DEBUG_OBJECT (src,
2466             "clock was unscheduled (%d), but we are running", status);
2467         goto again;
2468       }
2469       break;
2470     default:
2471       /* all other result values are unexpected and errors */
2472       GST_ELEMENT_ERROR (src, CORE, CLOCK,
2473           (_("Internal clock error.")),
2474           ("clock returned unexpected return value %d", status));
2475       if (*buf == NULL)
2476         gst_buffer_unref (res_buf);
2477       ret = GST_FLOW_ERROR;
2478       break;
2479   }
2480   if (G_LIKELY (ret == GST_FLOW_OK))
2481     *buf = res_buf;
2482
2483   return ret;
2484
2485   /* ERROR */
2486 stopped:
2487   {
2488     GST_DEBUG_OBJECT (src, "wait_playing returned %d (%s)", ret,
2489         gst_flow_get_name (ret));
2490     return ret;
2491   }
2492 not_ok:
2493   {
2494     GST_DEBUG_OBJECT (src, "create returned %d (%s)", ret,
2495         gst_flow_get_name (ret));
2496     return ret;
2497   }
2498 map_failed:
2499   {
2500     GST_ELEMENT_ERROR (src, RESOURCE, BUSY,
2501         (_("Failed to map buffer.")),
2502         ("failed to map result buffer in WRITE mode"));
2503     if (*buf == NULL)
2504       gst_buffer_unref (res_buf);
2505     return GST_FLOW_ERROR;
2506   }
2507 not_started:
2508   {
2509     GST_DEBUG_OBJECT (src, "getrange but not started");
2510     return GST_FLOW_FLUSHING;
2511   }
2512 no_function:
2513   {
2514     GST_DEBUG_OBJECT (src, "no create function");
2515     return GST_FLOW_NOT_SUPPORTED;
2516   }
2517 unexpected_length:
2518   {
2519     GST_DEBUG_OBJECT (src, "unexpected length %u (offset=%" G_GUINT64_FORMAT
2520         ", size=%" G_GINT64_FORMAT ")", length, offset, src->segment.duration);
2521     return GST_FLOW_EOS;
2522   }
2523 reached_num_buffers:
2524   {
2525     GST_DEBUG_OBJECT (src, "sent all buffers");
2526     return GST_FLOW_EOS;
2527   }
2528 flushing:
2529   {
2530     GST_DEBUG_OBJECT (src, "we are flushing");
2531     if (*buf == NULL)
2532       gst_buffer_unref (res_buf);
2533     return GST_FLOW_FLUSHING;
2534   }
2535 eos:
2536   {
2537     GST_DEBUG_OBJECT (src, "we are EOS");
2538     return GST_FLOW_EOS;
2539   }
2540 }
2541
2542 static GstFlowReturn
2543 gst_base_src_getrange (GstPad * pad, GstObject * parent, guint64 offset,
2544     guint length, GstBuffer ** buf)
2545 {
2546   GstBaseSrc *src;
2547   GstFlowReturn res;
2548
2549   src = GST_BASE_SRC_CAST (parent);
2550
2551   GST_LIVE_LOCK (src);
2552   if (G_UNLIKELY (src->priv->flushing))
2553     goto flushing;
2554
2555   res = gst_base_src_get_range (src, offset, length, buf);
2556
2557 done:
2558   GST_LIVE_UNLOCK (src);
2559
2560   return res;
2561
2562   /* ERRORS */
2563 flushing:
2564   {
2565     GST_DEBUG_OBJECT (src, "we are flushing");
2566     res = GST_FLOW_FLUSHING;
2567     goto done;
2568   }
2569 }
2570
2571 static gboolean
2572 gst_base_src_is_random_access (GstBaseSrc * src)
2573 {
2574   /* we need to start the basesrc to check random access */
2575   if (!GST_BASE_SRC_IS_STARTED (src)) {
2576     GST_LOG_OBJECT (src, "doing start/stop to check get_range support");
2577     if (G_LIKELY (gst_base_src_start (src))) {
2578       if (gst_base_src_start_wait (src) != GST_FLOW_OK)
2579         goto start_failed;
2580       gst_base_src_stop (src);
2581     }
2582   }
2583
2584   return src->random_access;
2585
2586   /* ERRORS */
2587 start_failed:
2588   {
2589     GST_DEBUG_OBJECT (src, "failed to start");
2590     return FALSE;
2591   }
2592 }
2593
2594 static void
2595 gst_base_src_loop (GstPad * pad)
2596 {
2597   GstBaseSrc *src;
2598   GstBuffer *buf = NULL;
2599   GstFlowReturn ret;
2600   gint64 position;
2601   gboolean eos;
2602   guint blocksize;
2603   GList *pending_events = NULL, *tmp;
2604
2605   eos = FALSE;
2606
2607   src = GST_BASE_SRC (GST_OBJECT_PARENT (pad));
2608
2609   /* Just leave immediately if we're flushing */
2610   GST_LIVE_LOCK (src);
2611   if (G_UNLIKELY (src->priv->flushing || GST_PAD_IS_FLUSHING (pad)))
2612     goto flushing;
2613   GST_LIVE_UNLOCK (src);
2614
2615   gst_base_src_send_stream_start (src);
2616
2617   /* The stream-start event could've caused something to flush us */
2618   GST_LIVE_LOCK (src);
2619   if (G_UNLIKELY (src->priv->flushing || GST_PAD_IS_FLUSHING (pad)))
2620     goto flushing;
2621   GST_LIVE_UNLOCK (src);
2622
2623   /* check if we need to renegotiate */
2624   if (gst_pad_check_reconfigure (pad)) {
2625     if (!gst_base_src_negotiate (src)) {
2626       gst_pad_mark_reconfigure (pad);
2627       if (GST_PAD_IS_FLUSHING (pad))
2628         goto flushing;
2629       else
2630         goto negotiate_failed;
2631     }
2632   }
2633
2634   GST_LIVE_LOCK (src);
2635
2636   if (G_UNLIKELY (src->priv->flushing || GST_PAD_IS_FLUSHING (pad)))
2637     goto flushing;
2638
2639   blocksize = src->blocksize;
2640
2641   /* if we operate in bytes, we can calculate an offset */
2642   if (src->segment.format == GST_FORMAT_BYTES) {
2643     position = src->segment.position;
2644     /* for negative rates, start with subtracting the blocksize */
2645     if (src->segment.rate < 0.0) {
2646       /* we cannot go below segment.start */
2647       if (position > src->segment.start + blocksize)
2648         position -= blocksize;
2649       else {
2650         /* last block, remainder up to segment.start */
2651         blocksize = position - src->segment.start;
2652         position = src->segment.start;
2653       }
2654     }
2655   } else
2656     position = -1;
2657
2658   GST_LOG_OBJECT (src, "next_ts %" GST_TIME_FORMAT " size %u",
2659       GST_TIME_ARGS (position), blocksize);
2660
2661   ret = gst_base_src_get_range (src, position, blocksize, &buf);
2662   if (G_UNLIKELY (ret != GST_FLOW_OK)) {
2663     GST_INFO_OBJECT (src, "pausing after gst_base_src_get_range() = %s",
2664         gst_flow_get_name (ret));
2665     GST_LIVE_UNLOCK (src);
2666     goto pause;
2667   }
2668   /* this should not happen */
2669   if (G_UNLIKELY (buf == NULL))
2670     goto null_buffer;
2671
2672   /* push events to close/start our segment before we push the buffer. */
2673   if (G_UNLIKELY (src->priv->segment_pending)) {
2674     gst_pad_push_event (pad, gst_event_new_segment (&src->segment));
2675     src->priv->segment_pending = FALSE;
2676   }
2677
2678   if (g_atomic_int_get (&src->priv->have_events)) {
2679     GST_OBJECT_LOCK (src);
2680     /* take the events */
2681     pending_events = src->priv->pending_events;
2682     src->priv->pending_events = NULL;
2683     g_atomic_int_set (&src->priv->have_events, FALSE);
2684     GST_OBJECT_UNLOCK (src);
2685   }
2686
2687   /* Push out pending events if any */
2688   if (G_UNLIKELY (pending_events != NULL)) {
2689     for (tmp = pending_events; tmp; tmp = g_list_next (tmp)) {
2690       GstEvent *ev = (GstEvent *) tmp->data;
2691       gst_pad_push_event (pad, ev);
2692     }
2693     g_list_free (pending_events);
2694   }
2695
2696   /* figure out the new position */
2697   switch (src->segment.format) {
2698     case GST_FORMAT_BYTES:
2699     {
2700       guint bufsize = gst_buffer_get_size (buf);
2701
2702       /* we subtracted above for negative rates */
2703       if (src->segment.rate >= 0.0)
2704         position += bufsize;
2705       break;
2706     }
2707     case GST_FORMAT_TIME:
2708     {
2709       GstClockTime start, duration;
2710
2711       start = GST_BUFFER_TIMESTAMP (buf);
2712       duration = GST_BUFFER_DURATION (buf);
2713
2714       if (GST_CLOCK_TIME_IS_VALID (start))
2715         position = start;
2716       else
2717         position = src->segment.position;
2718
2719       if (GST_CLOCK_TIME_IS_VALID (duration)) {
2720         if (src->segment.rate >= 0.0)
2721           position += duration;
2722         else if (position > duration)
2723           position -= duration;
2724         else
2725           position = 0;
2726       }
2727       break;
2728     }
2729     case GST_FORMAT_DEFAULT:
2730       if (src->segment.rate >= 0.0)
2731         position = GST_BUFFER_OFFSET_END (buf);
2732       else
2733         position = GST_BUFFER_OFFSET (buf);
2734       break;
2735     default:
2736       position = -1;
2737       break;
2738   }
2739   if (position != -1) {
2740     if (src->segment.rate >= 0.0) {
2741       /* positive rate, check if we reached the stop */
2742       if (src->segment.stop != -1) {
2743         if (position >= src->segment.stop) {
2744           eos = TRUE;
2745           position = src->segment.stop;
2746         }
2747       }
2748     } else {
2749       /* negative rate, check if we reached the start. start is always set to
2750        * something different from -1 */
2751       if (position <= src->segment.start) {
2752         eos = TRUE;
2753         position = src->segment.start;
2754       }
2755       /* when going reverse, all buffers are DISCONT */
2756       src->priv->discont = TRUE;
2757     }
2758     GST_OBJECT_LOCK (src);
2759     src->segment.position = position;
2760     GST_OBJECT_UNLOCK (src);
2761   }
2762
2763   if (G_UNLIKELY (src->priv->discont)) {
2764     GST_INFO_OBJECT (src, "marking pending DISCONT");
2765     buf = gst_buffer_make_writable (buf);
2766     GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_DISCONT);
2767     src->priv->discont = FALSE;
2768   }
2769   GST_LIVE_UNLOCK (src);
2770
2771   ret = gst_pad_push (pad, buf);
2772   if (G_UNLIKELY (ret != GST_FLOW_OK)) {
2773     if (ret == GST_FLOW_NOT_NEGOTIATED) {
2774       goto not_negotiated;
2775     }
2776     GST_INFO_OBJECT (src, "pausing after gst_pad_push() = %s",
2777         gst_flow_get_name (ret));
2778     goto pause;
2779   }
2780
2781   if (G_UNLIKELY (eos)) {
2782     GST_INFO_OBJECT (src, "pausing after end of segment");
2783     ret = GST_FLOW_EOS;
2784     goto pause;
2785   }
2786
2787 done:
2788   return;
2789
2790   /* special cases */
2791 not_negotiated:
2792   {
2793     if (gst_pad_needs_reconfigure (pad)) {
2794       GST_DEBUG_OBJECT (src, "Retrying to renegotiate");
2795       return;
2796     }
2797     /* fallthrough when push returns NOT_NEGOTIATED and we don't have
2798      * a pending negotiation request on our srcpad */
2799   }
2800 negotiate_failed:
2801   {
2802     GST_DEBUG_OBJECT (src, "Not negotiated");
2803     ret = GST_FLOW_NOT_NEGOTIATED;
2804     goto pause;
2805   }
2806 flushing:
2807   {
2808     GST_DEBUG_OBJECT (src, "we are flushing");
2809     GST_LIVE_UNLOCK (src);
2810     ret = GST_FLOW_FLUSHING;
2811     goto pause;
2812   }
2813 pause:
2814   {
2815     const gchar *reason = gst_flow_get_name (ret);
2816     GstEvent *event;
2817
2818     GST_DEBUG_OBJECT (src, "pausing task, reason %s", reason);
2819     src->running = FALSE;
2820     gst_pad_pause_task (pad);
2821     if (ret == GST_FLOW_EOS) {
2822       gboolean flag_segment;
2823       GstFormat format;
2824       gint64 position;
2825
2826       /* perform EOS logic */
2827       flag_segment = (src->segment.flags & GST_SEGMENT_FLAG_SEGMENT) != 0;
2828       format = src->segment.format;
2829       position = src->segment.position;
2830
2831       if (flag_segment) {
2832         GstMessage *message;
2833
2834         message = gst_message_new_segment_done (GST_OBJECT_CAST (src),
2835             format, position);
2836         gst_message_set_seqnum (message, src->priv->seqnum);
2837         gst_element_post_message (GST_ELEMENT_CAST (src), message);
2838         event = gst_event_new_segment_done (format, position);
2839         gst_event_set_seqnum (event, src->priv->seqnum);
2840         gst_pad_push_event (pad, event);
2841       } else {
2842         event = gst_event_new_eos ();
2843         gst_event_set_seqnum (event, src->priv->seqnum);
2844         gst_pad_push_event (pad, event);
2845       }
2846     } else if (ret == GST_FLOW_NOT_LINKED || ret <= GST_FLOW_EOS) {
2847       event = gst_event_new_eos ();
2848       gst_event_set_seqnum (event, src->priv->seqnum);
2849       /* for fatal errors we post an error message, post the error
2850        * first so the app knows about the error first.
2851        * Also don't do this for FLUSHING because it happens
2852        * due to flushing and posting an error message because of
2853        * that is the wrong thing to do, e.g. when we're doing
2854        * a flushing seek. */
2855       GST_ELEMENT_ERROR (src, STREAM, FAILED,
2856           (_("Internal data flow error.")),
2857           ("streaming task paused, reason %s (%d)", reason, ret));
2858       gst_pad_push_event (pad, event);
2859     }
2860     goto done;
2861   }
2862 null_buffer:
2863   {
2864     GST_ELEMENT_ERROR (src, STREAM, FAILED,
2865         (_("Internal data flow error.")), ("element returned NULL buffer"));
2866     GST_LIVE_UNLOCK (src);
2867     goto done;
2868   }
2869 }
2870
2871 static gboolean
2872 gst_base_src_set_allocation (GstBaseSrc * basesrc, GstBufferPool * pool,
2873     GstAllocator * allocator, GstAllocationParams * params)
2874 {
2875   GstAllocator *oldalloc;
2876   GstBufferPool *oldpool;
2877   GstBaseSrcPrivate *priv = basesrc->priv;
2878
2879   if (pool) {
2880     GST_DEBUG_OBJECT (basesrc, "activate pool");
2881     if (!gst_buffer_pool_set_active (pool, TRUE))
2882       goto activate_failed;
2883   }
2884
2885   GST_OBJECT_LOCK (basesrc);
2886   oldpool = priv->pool;
2887   priv->pool = pool;
2888
2889   oldalloc = priv->allocator;
2890   priv->allocator = allocator;
2891
2892   if (params)
2893     priv->params = *params;
2894   else
2895     gst_allocation_params_init (&priv->params);
2896   GST_OBJECT_UNLOCK (basesrc);
2897
2898   if (oldpool) {
2899     /* only deactivate if the pool is not the one we're using */
2900     if (oldpool != pool) {
2901       GST_DEBUG_OBJECT (basesrc, "deactivate old pool");
2902       gst_buffer_pool_set_active (oldpool, FALSE);
2903     }
2904     gst_object_unref (oldpool);
2905   }
2906   if (oldalloc) {
2907     gst_object_unref (oldalloc);
2908   }
2909   return TRUE;
2910
2911   /* ERRORS */
2912 activate_failed:
2913   {
2914     GST_ERROR_OBJECT (basesrc, "failed to activate bufferpool.");
2915     return FALSE;
2916   }
2917 }
2918
2919 static gboolean
2920 gst_base_src_activate_pool (GstBaseSrc * basesrc, gboolean active)
2921 {
2922   GstBaseSrcPrivate *priv = basesrc->priv;
2923   GstBufferPool *pool;
2924   gboolean res = TRUE;
2925
2926   GST_OBJECT_LOCK (basesrc);
2927   if ((pool = priv->pool))
2928     pool = gst_object_ref (pool);
2929   GST_OBJECT_UNLOCK (basesrc);
2930
2931   if (pool) {
2932     res = gst_buffer_pool_set_active (pool, active);
2933     gst_object_unref (pool);
2934   }
2935   return res;
2936 }
2937
2938
2939 static gboolean
2940 gst_base_src_decide_allocation_default (GstBaseSrc * basesrc, GstQuery * query)
2941 {
2942   GstCaps *outcaps;
2943   GstBufferPool *pool;
2944   guint size, min, max;
2945   GstAllocator *allocator;
2946   GstAllocationParams params;
2947   GstStructure *config;
2948   gboolean update_allocator;
2949
2950   gst_query_parse_allocation (query, &outcaps, NULL);
2951
2952   /* we got configuration from our peer or the decide_allocation method,
2953    * parse them */
2954   if (gst_query_get_n_allocation_params (query) > 0) {
2955     /* try the allocator */
2956     gst_query_parse_nth_allocation_param (query, 0, &allocator, &params);
2957     update_allocator = TRUE;
2958   } else {
2959     allocator = NULL;
2960     gst_allocation_params_init (&params);
2961     update_allocator = FALSE;
2962   }
2963
2964   if (gst_query_get_n_allocation_pools (query) > 0) {
2965     gst_query_parse_nth_allocation_pool (query, 0, &pool, &size, &min, &max);
2966
2967     if (pool == NULL) {
2968       /* no pool, we can make our own */
2969       GST_DEBUG_OBJECT (basesrc, "no pool, making new pool");
2970       pool = gst_buffer_pool_new ();
2971     }
2972   } else {
2973     pool = NULL;
2974     size = min = max = 0;
2975   }
2976
2977   /* now configure */
2978   if (pool) {
2979     config = gst_buffer_pool_get_config (pool);
2980     gst_buffer_pool_config_set_params (config, outcaps, size, min, max);
2981     gst_buffer_pool_config_set_allocator (config, allocator, &params);
2982     gst_buffer_pool_set_config (pool, config);
2983   }
2984
2985   if (update_allocator)
2986     gst_query_set_nth_allocation_param (query, 0, allocator, &params);
2987   else
2988     gst_query_add_allocation_param (query, allocator, &params);
2989   if (allocator)
2990     gst_object_unref (allocator);
2991
2992   if (pool) {
2993     gst_query_set_nth_allocation_pool (query, 0, pool, size, min, max);
2994     gst_object_unref (pool);
2995   }
2996
2997   return TRUE;
2998 }
2999
3000 static gboolean
3001 gst_base_src_prepare_allocation (GstBaseSrc * basesrc, GstCaps * caps)
3002 {
3003   GstBaseSrcClass *bclass;
3004   gboolean result = TRUE;
3005   GstQuery *query;
3006   GstBufferPool *pool = NULL;
3007   GstAllocator *allocator = NULL;
3008   GstAllocationParams params;
3009
3010   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
3011
3012   /* make query and let peer pad answer, we don't really care if it worked or
3013    * not, if it failed, the allocation query would contain defaults and the
3014    * subclass would then set better values if needed */
3015   query = gst_query_new_allocation (caps, TRUE);
3016   if (!gst_pad_peer_query (basesrc->srcpad, query)) {
3017     /* not a problem, just debug a little */
3018     GST_DEBUG_OBJECT (basesrc, "peer ALLOCATION query failed");
3019   }
3020
3021   g_assert (bclass->decide_allocation != NULL);
3022   result = bclass->decide_allocation (basesrc, query);
3023
3024   GST_DEBUG_OBJECT (basesrc, "ALLOCATION (%d) params: %" GST_PTR_FORMAT, result,
3025       query);
3026
3027   if (!result)
3028     goto no_decide_allocation;
3029
3030   /* we got configuration from our peer or the decide_allocation method,
3031    * parse them */
3032   if (gst_query_get_n_allocation_params (query) > 0) {
3033     gst_query_parse_nth_allocation_param (query, 0, &allocator, &params);
3034   } else {
3035     allocator = NULL;
3036     gst_allocation_params_init (&params);
3037   }
3038
3039   if (gst_query_get_n_allocation_pools (query) > 0)
3040     gst_query_parse_nth_allocation_pool (query, 0, &pool, NULL, NULL, NULL);
3041
3042   result = gst_base_src_set_allocation (basesrc, pool, allocator, &params);
3043
3044   gst_query_unref (query);
3045
3046   return result;
3047
3048   /* Errors */
3049 no_decide_allocation:
3050   {
3051     GST_WARNING_OBJECT (basesrc, "Subclass failed to decide allocation");
3052     gst_query_unref (query);
3053
3054     return result;
3055   }
3056 }
3057
3058 /* default negotiation code.
3059  *
3060  * Take intersection between src and sink pads, take first
3061  * caps and fixate.
3062  */
3063 static gboolean
3064 gst_base_src_default_negotiate (GstBaseSrc * basesrc)
3065 {
3066   GstCaps *thiscaps;
3067   GstCaps *caps = NULL;
3068   GstCaps *peercaps = NULL;
3069   gboolean result = FALSE;
3070
3071   /* first see what is possible on our source pad */
3072   thiscaps = gst_pad_query_caps (GST_BASE_SRC_PAD (basesrc), NULL);
3073   GST_DEBUG_OBJECT (basesrc, "caps of src: %" GST_PTR_FORMAT, thiscaps);
3074   /* nothing or anything is allowed, we're done */
3075   if (thiscaps == NULL || gst_caps_is_any (thiscaps))
3076     goto no_nego_needed;
3077
3078   if (G_UNLIKELY (gst_caps_is_empty (thiscaps)))
3079     goto no_caps;
3080
3081   /* get the peer caps */
3082   peercaps = gst_pad_peer_query_caps (GST_BASE_SRC_PAD (basesrc), thiscaps);
3083   GST_DEBUG_OBJECT (basesrc, "caps of peer: %" GST_PTR_FORMAT, peercaps);
3084   if (peercaps) {
3085     /* The result is already a subset of our caps */
3086     caps = peercaps;
3087     gst_caps_unref (thiscaps);
3088   } else {
3089     /* no peer, work with our own caps then */
3090     caps = thiscaps;
3091   }
3092   if (caps && !gst_caps_is_empty (caps)) {
3093     /* now fixate */
3094     GST_DEBUG_OBJECT (basesrc, "have caps: %" GST_PTR_FORMAT, caps);
3095     if (gst_caps_is_any (caps)) {
3096       GST_DEBUG_OBJECT (basesrc, "any caps, we stop");
3097       /* hmm, still anything, so element can do anything and
3098        * nego is not needed */
3099       result = TRUE;
3100     } else {
3101       caps = gst_base_src_fixate (basesrc, caps);
3102       GST_DEBUG_OBJECT (basesrc, "fixated to: %" GST_PTR_FORMAT, caps);
3103       if (gst_caps_is_fixed (caps)) {
3104         /* yay, fixed caps, use those then, it's possible that the subclass does
3105          * not accept this caps after all and we have to fail. */
3106         result = gst_base_src_set_caps (basesrc, caps);
3107       }
3108     }
3109     gst_caps_unref (caps);
3110   } else {
3111     if (caps)
3112       gst_caps_unref (caps);
3113     GST_DEBUG_OBJECT (basesrc, "no common caps");
3114   }
3115   return result;
3116
3117 no_nego_needed:
3118   {
3119     GST_DEBUG_OBJECT (basesrc, "no negotiation needed");
3120     if (thiscaps)
3121       gst_caps_unref (thiscaps);
3122     return TRUE;
3123   }
3124 no_caps:
3125   {
3126     GST_ELEMENT_ERROR (basesrc, STREAM, FORMAT,
3127         ("No supported formats found"),
3128         ("This element did not produce valid caps"));
3129     if (thiscaps)
3130       gst_caps_unref (thiscaps);
3131     return TRUE;
3132   }
3133 }
3134
3135 static gboolean
3136 gst_base_src_negotiate (GstBaseSrc * basesrc)
3137 {
3138   GstBaseSrcClass *bclass;
3139   gboolean result;
3140
3141   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
3142
3143   GST_DEBUG_OBJECT (basesrc, "starting negotiation");
3144
3145   if (G_LIKELY (bclass->negotiate))
3146     result = bclass->negotiate (basesrc);
3147   else
3148     result = TRUE;
3149
3150   if (G_LIKELY (result)) {
3151     GstCaps *caps;
3152
3153     caps = gst_pad_get_current_caps (basesrc->srcpad);
3154
3155     result = gst_base_src_prepare_allocation (basesrc, caps);
3156
3157     if (caps)
3158       gst_caps_unref (caps);
3159   }
3160   return result;
3161 }
3162
3163 static gboolean
3164 gst_base_src_start (GstBaseSrc * basesrc)
3165 {
3166   GstBaseSrcClass *bclass;
3167   gboolean result;
3168
3169   GST_LIVE_LOCK (basesrc);
3170
3171   GST_OBJECT_LOCK (basesrc);
3172   if (GST_BASE_SRC_IS_STARTING (basesrc))
3173     goto was_starting;
3174   if (GST_BASE_SRC_IS_STARTED (basesrc))
3175     goto was_started;
3176
3177   basesrc->priv->start_result = GST_FLOW_FLUSHING;
3178   GST_OBJECT_FLAG_SET (basesrc, GST_BASE_SRC_FLAG_STARTING);
3179   gst_segment_init (&basesrc->segment, basesrc->segment.format);
3180   GST_OBJECT_UNLOCK (basesrc);
3181
3182   basesrc->num_buffers_left = basesrc->num_buffers;
3183   basesrc->running = FALSE;
3184   basesrc->priv->segment_pending = FALSE;
3185   GST_LIVE_UNLOCK (basesrc);
3186
3187   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
3188   if (bclass->start)
3189     result = bclass->start (basesrc);
3190   else
3191     result = TRUE;
3192
3193   if (!result)
3194     goto could_not_start;
3195
3196   if (!gst_base_src_is_async (basesrc)) {
3197     gst_base_src_start_complete (basesrc, GST_FLOW_OK);
3198     /* not really waiting here, we call this to get the result
3199      * from the start_complete call */
3200     result = gst_base_src_start_wait (basesrc) == GST_FLOW_OK;
3201   }
3202
3203   return result;
3204
3205   /* ERROR */
3206 was_starting:
3207   {
3208     GST_DEBUG_OBJECT (basesrc, "was starting");
3209     GST_OBJECT_UNLOCK (basesrc);
3210     GST_LIVE_UNLOCK (basesrc);
3211     return TRUE;
3212   }
3213 was_started:
3214   {
3215     GST_DEBUG_OBJECT (basesrc, "was started");
3216     GST_OBJECT_UNLOCK (basesrc);
3217     GST_LIVE_UNLOCK (basesrc);
3218     return TRUE;
3219   }
3220 could_not_start:
3221   {
3222     GST_DEBUG_OBJECT (basesrc, "could not start");
3223     /* subclass is supposed to post a message. We don't have to call _stop. */
3224     gst_base_src_start_complete (basesrc, GST_FLOW_ERROR);
3225     return FALSE;
3226   }
3227 }
3228
3229 /**
3230  * gst_base_src_start_complete:
3231  * @basesrc: base source instance
3232  * @ret: a #GstFlowReturn
3233  *
3234  * Complete an asynchronous start operation. When the subclass overrides the
3235  * start method, it should call gst_base_src_start_complete() when the start
3236  * operation completes either from the same thread or from an asynchronous
3237  * helper thread.
3238  */
3239 void
3240 gst_base_src_start_complete (GstBaseSrc * basesrc, GstFlowReturn ret)
3241 {
3242   gboolean have_size;
3243   guint64 size;
3244   gboolean seekable;
3245   GstFormat format;
3246   GstPadMode mode;
3247   GstEvent *event;
3248
3249   if (ret != GST_FLOW_OK)
3250     goto error;
3251
3252   GST_DEBUG_OBJECT (basesrc, "starting source");
3253   format = basesrc->segment.format;
3254
3255   /* figure out the size */
3256   have_size = FALSE;
3257   size = -1;
3258   if (format == GST_FORMAT_BYTES) {
3259     GstBaseSrcClass *bclass = GST_BASE_SRC_GET_CLASS (basesrc);
3260
3261     if (bclass->get_size) {
3262       if (!(have_size = bclass->get_size (basesrc, &size)))
3263         size = -1;
3264     }
3265     GST_DEBUG_OBJECT (basesrc, "setting size %" G_GUINT64_FORMAT, size);
3266     /* only update the size when operating in bytes, subclass is supposed
3267      * to set duration in the start method for other formats */
3268     GST_OBJECT_LOCK (basesrc);
3269     basesrc->segment.duration = size;
3270     GST_OBJECT_UNLOCK (basesrc);
3271   }
3272
3273   GST_DEBUG_OBJECT (basesrc,
3274       "format: %s, have size: %d, size: %" G_GUINT64_FORMAT ", duration: %"
3275       G_GINT64_FORMAT, gst_format_get_name (format), have_size, size,
3276       basesrc->segment.duration);
3277
3278   seekable = gst_base_src_seekable (basesrc);
3279   GST_DEBUG_OBJECT (basesrc, "is seekable: %d", seekable);
3280
3281   /* update for random access flag */
3282   basesrc->random_access = seekable && format == GST_FORMAT_BYTES;
3283
3284   GST_DEBUG_OBJECT (basesrc, "is random_access: %d", basesrc->random_access);
3285
3286   /* stop flushing now but for live sources, still block in the LIVE lock when
3287    * we are not yet PLAYING */
3288   gst_base_src_set_flushing (basesrc, FALSE, FALSE, NULL);
3289
3290   gst_pad_mark_reconfigure (GST_BASE_SRC_PAD (basesrc));
3291
3292   GST_OBJECT_LOCK (basesrc->srcpad);
3293   mode = GST_PAD_MODE (basesrc->srcpad);
3294   GST_OBJECT_UNLOCK (basesrc->srcpad);
3295
3296   /* take the stream lock here, we only want to let the task run when we have
3297    * set the STARTED flag */
3298   GST_PAD_STREAM_LOCK (basesrc->srcpad);
3299   if (mode == GST_PAD_MODE_PUSH) {
3300     /* do initial seek, which will start the task */
3301     GST_OBJECT_LOCK (basesrc);
3302     event = basesrc->pending_seek;
3303     basesrc->pending_seek = NULL;
3304     GST_OBJECT_UNLOCK (basesrc);
3305
3306     /* The perform seek code will start the task when finished. We don't have to
3307      * unlock the streaming thread because it is not running yet */
3308     if (G_UNLIKELY (!gst_base_src_perform_seek (basesrc, event, FALSE)))
3309       goto seek_failed;
3310
3311     if (event)
3312       gst_event_unref (event);
3313   } else {
3314     /* if not random_access, we cannot operate in pull mode for now */
3315     if (G_UNLIKELY (!basesrc->random_access))
3316       goto no_get_range;
3317   }
3318
3319   GST_OBJECT_LOCK (basesrc);
3320   GST_OBJECT_FLAG_SET (basesrc, GST_BASE_SRC_FLAG_STARTED);
3321   GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTING);
3322   basesrc->priv->start_result = ret;
3323   GST_ASYNC_SIGNAL (basesrc);
3324   GST_OBJECT_UNLOCK (basesrc);
3325
3326   GST_PAD_STREAM_UNLOCK (basesrc->srcpad);
3327
3328   return;
3329
3330 seek_failed:
3331   {
3332     GST_PAD_STREAM_UNLOCK (basesrc->srcpad);
3333     GST_ERROR_OBJECT (basesrc, "Failed to perform initial seek");
3334     gst_base_src_stop (basesrc);
3335     if (event)
3336       gst_event_unref (event);
3337     ret = GST_FLOW_ERROR;
3338     goto error;
3339   }
3340 no_get_range:
3341   {
3342     GST_PAD_STREAM_UNLOCK (basesrc->srcpad);
3343     gst_base_src_stop (basesrc);
3344     GST_ERROR_OBJECT (basesrc, "Cannot operate in pull mode, stopping");
3345     ret = GST_FLOW_ERROR;
3346     goto error;
3347   }
3348 error:
3349   {
3350     GST_OBJECT_LOCK (basesrc);
3351     basesrc->priv->start_result = ret;
3352     GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTING);
3353     GST_ASYNC_SIGNAL (basesrc);
3354     GST_OBJECT_UNLOCK (basesrc);
3355     return;
3356   }
3357 }
3358
3359 /**
3360  * gst_base_src_start_wait:
3361  * @basesrc: base source instance
3362  *
3363  * Wait until the start operation completes.
3364  *
3365  * Returns: a #GstFlowReturn.
3366  */
3367 GstFlowReturn
3368 gst_base_src_start_wait (GstBaseSrc * basesrc)
3369 {
3370   GstFlowReturn result;
3371
3372   GST_OBJECT_LOCK (basesrc);
3373   while (GST_BASE_SRC_IS_STARTING (basesrc)) {
3374     GST_ASYNC_WAIT (basesrc);
3375   }
3376   result = basesrc->priv->start_result;
3377   GST_OBJECT_UNLOCK (basesrc);
3378
3379   GST_DEBUG_OBJECT (basesrc, "got %s", gst_flow_get_name (result));
3380
3381   return result;
3382 }
3383
3384 static gboolean
3385 gst_base_src_stop (GstBaseSrc * basesrc)
3386 {
3387   GstBaseSrcClass *bclass;
3388   gboolean result = TRUE;
3389
3390   GST_DEBUG_OBJECT (basesrc, "stopping source");
3391
3392   /* flush all */
3393   gst_base_src_set_flushing (basesrc, TRUE, FALSE, NULL);
3394   /* stop the task */
3395   gst_pad_stop_task (basesrc->srcpad);
3396
3397   GST_OBJECT_LOCK (basesrc);
3398   if (!GST_BASE_SRC_IS_STARTED (basesrc) && !GST_BASE_SRC_IS_STARTING (basesrc))
3399     goto was_stopped;
3400
3401   GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTING);
3402   GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTED);
3403   basesrc->priv->start_result = GST_FLOW_FLUSHING;
3404   GST_ASYNC_SIGNAL (basesrc);
3405   GST_OBJECT_UNLOCK (basesrc);
3406
3407   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
3408   if (bclass->stop)
3409     result = bclass->stop (basesrc);
3410
3411   gst_base_src_set_allocation (basesrc, NULL, NULL, NULL);
3412
3413   return result;
3414
3415 was_stopped:
3416   {
3417     GST_DEBUG_OBJECT (basesrc, "was stopped");
3418     GST_OBJECT_UNLOCK (basesrc);
3419     return TRUE;
3420   }
3421 }
3422
3423 /* start or stop flushing dataprocessing
3424  */
3425 static gboolean
3426 gst_base_src_set_flushing (GstBaseSrc * basesrc,
3427     gboolean flushing, gboolean live_play, gboolean * playing)
3428 {
3429   GstBaseSrcClass *bclass;
3430
3431   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
3432
3433   GST_DEBUG_OBJECT (basesrc, "flushing %d, live_play %d", flushing, live_play);
3434
3435   if (flushing) {
3436     gst_base_src_activate_pool (basesrc, FALSE);
3437     /* unlock any subclasses, we need to do this before grabbing the
3438      * LIVE_LOCK since we hold this lock before going into ::create. We pass an
3439      * unlock to the params because of backwards compat (see seek handler)*/
3440     if (bclass->unlock)
3441       bclass->unlock (basesrc);
3442   }
3443
3444   /* the live lock is released when we are blocked, waiting for playing or
3445    * when we sync to the clock. */
3446   GST_LIVE_LOCK (basesrc);
3447   if (playing)
3448     *playing = basesrc->live_running;
3449   basesrc->priv->flushing = flushing;
3450   if (flushing) {
3451     /* if we are locked in the live lock, signal it to make it flush */
3452     basesrc->live_running = TRUE;
3453
3454     /* clear pending EOS if any */
3455     g_atomic_int_set (&basesrc->priv->pending_eos, FALSE);
3456
3457     /* step 1, now that we have the LIVE lock, clear our unlock request */
3458     if (bclass->unlock_stop)
3459       bclass->unlock_stop (basesrc);
3460
3461     /* step 2, unblock clock sync (if any) or any other blocking thing */
3462     if (basesrc->clock_id)
3463       gst_clock_id_unschedule (basesrc->clock_id);
3464   } else {
3465     /* signal the live source that it can start playing */
3466     basesrc->live_running = live_play;
3467
3468     gst_base_src_activate_pool (basesrc, TRUE);
3469
3470     /* Drop all delayed events */
3471     GST_OBJECT_LOCK (basesrc);
3472     if (basesrc->priv->pending_events) {
3473       g_list_foreach (basesrc->priv->pending_events, (GFunc) gst_event_unref,
3474           NULL);
3475       g_list_free (basesrc->priv->pending_events);
3476       basesrc->priv->pending_events = NULL;
3477       g_atomic_int_set (&basesrc->priv->have_events, FALSE);
3478     }
3479     GST_OBJECT_UNLOCK (basesrc);
3480   }
3481   GST_LIVE_SIGNAL (basesrc);
3482   GST_LIVE_UNLOCK (basesrc);
3483
3484   return TRUE;
3485 }
3486
3487 /* the purpose of this function is to make sure that a live source blocks in the
3488  * LIVE lock or leaves the LIVE lock and continues playing. */
3489 static gboolean
3490 gst_base_src_set_playing (GstBaseSrc * basesrc, gboolean live_play)
3491 {
3492   GstBaseSrcClass *bclass;
3493
3494   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
3495
3496   /* unlock subclasses locked in ::create, we only do this when we stop playing. */
3497   if (!live_play) {
3498     GST_DEBUG_OBJECT (basesrc, "unlock");
3499     if (bclass->unlock)
3500       bclass->unlock (basesrc);
3501   }
3502
3503   /* we are now able to grab the LIVE lock, when we get it, we can be
3504    * waiting for PLAYING while blocked in the LIVE cond or we can be waiting
3505    * for the clock. */
3506   GST_LIVE_LOCK (basesrc);
3507   GST_DEBUG_OBJECT (basesrc, "unschedule clock");
3508
3509   /* unblock clock sync (if any) */
3510   if (basesrc->clock_id)
3511     gst_clock_id_unschedule (basesrc->clock_id);
3512
3513   /* configure what to do when we get to the LIVE lock. */
3514   GST_DEBUG_OBJECT (basesrc, "live running %d", live_play);
3515   basesrc->live_running = live_play;
3516
3517   if (live_play) {
3518     gboolean start;
3519
3520     /* clear our unlock request when going to PLAYING */
3521     GST_DEBUG_OBJECT (basesrc, "unlock stop");
3522     if (bclass->unlock_stop)
3523       bclass->unlock_stop (basesrc);
3524
3525     /* for live sources we restart the timestamp correction */
3526     basesrc->priv->latency = -1;
3527     /* have to restart the task in case it stopped because of the unlock when
3528      * we went to PAUSED. Only do this if we operating in push mode. */
3529     GST_OBJECT_LOCK (basesrc->srcpad);
3530     start = (GST_PAD_MODE (basesrc->srcpad) == GST_PAD_MODE_PUSH);
3531     GST_OBJECT_UNLOCK (basesrc->srcpad);
3532     if (start)
3533       gst_pad_start_task (basesrc->srcpad, (GstTaskFunction) gst_base_src_loop,
3534           basesrc->srcpad, NULL);
3535     GST_DEBUG_OBJECT (basesrc, "signal");
3536     GST_LIVE_SIGNAL (basesrc);
3537   }
3538   GST_LIVE_UNLOCK (basesrc);
3539
3540   return TRUE;
3541 }
3542
3543 static gboolean
3544 gst_base_src_activate_push (GstPad * pad, GstObject * parent, gboolean active)
3545 {
3546   GstBaseSrc *basesrc;
3547
3548   basesrc = GST_BASE_SRC (parent);
3549
3550   /* prepare subclass first */
3551   if (active) {
3552     GST_DEBUG_OBJECT (basesrc, "Activating in push mode");
3553
3554     if (G_UNLIKELY (!basesrc->can_activate_push))
3555       goto no_push_activation;
3556
3557     if (G_UNLIKELY (!gst_base_src_start (basesrc)))
3558       goto error_start;
3559   } else {
3560     GST_DEBUG_OBJECT (basesrc, "Deactivating in push mode");
3561     /* now we can stop the source */
3562     if (G_UNLIKELY (!gst_base_src_stop (basesrc)))
3563       goto error_stop;
3564   }
3565   return TRUE;
3566
3567   /* ERRORS */
3568 no_push_activation:
3569   {
3570     GST_WARNING_OBJECT (basesrc, "Subclass disabled push-mode activation");
3571     return FALSE;
3572   }
3573 error_start:
3574   {
3575     GST_WARNING_OBJECT (basesrc, "Failed to start in push mode");
3576     return FALSE;
3577   }
3578 error_stop:
3579   {
3580     GST_DEBUG_OBJECT (basesrc, "Failed to stop in push mode");
3581     return FALSE;
3582   }
3583 }
3584
3585 static gboolean
3586 gst_base_src_activate_pull (GstPad * pad, GstObject * parent, gboolean active)
3587 {
3588   GstBaseSrc *basesrc;
3589
3590   basesrc = GST_BASE_SRC (parent);
3591
3592   /* prepare subclass first */
3593   if (active) {
3594     GST_DEBUG_OBJECT (basesrc, "Activating in pull mode");
3595     if (G_UNLIKELY (!gst_base_src_start (basesrc)))
3596       goto error_start;
3597   } else {
3598     GST_DEBUG_OBJECT (basesrc, "Deactivating in pull mode");
3599     if (G_UNLIKELY (!gst_base_src_stop (basesrc)))
3600       goto error_stop;
3601   }
3602   return TRUE;
3603
3604   /* ERRORS */
3605 error_start:
3606   {
3607     GST_ERROR_OBJECT (basesrc, "Failed to start in pull mode");
3608     return FALSE;
3609   }
3610 error_stop:
3611   {
3612     GST_ERROR_OBJECT (basesrc, "Failed to stop in pull mode");
3613     return FALSE;
3614   }
3615 }
3616
3617 static gboolean
3618 gst_base_src_activate_mode (GstPad * pad, GstObject * parent,
3619     GstPadMode mode, gboolean active)
3620 {
3621   gboolean res;
3622   GstBaseSrc *src = GST_BASE_SRC (parent);
3623
3624   src->priv->stream_start_pending = FALSE;
3625
3626   switch (mode) {
3627     case GST_PAD_MODE_PULL:
3628       res = gst_base_src_activate_pull (pad, parent, active);
3629       break;
3630     case GST_PAD_MODE_PUSH:
3631       src->priv->stream_start_pending = active;
3632       res = gst_base_src_activate_push (pad, parent, active);
3633       break;
3634     default:
3635       GST_LOG_OBJECT (pad, "unknown activation mode %d", mode);
3636       res = FALSE;
3637       break;
3638   }
3639   return res;
3640 }
3641
3642
3643 static GstStateChangeReturn
3644 gst_base_src_change_state (GstElement * element, GstStateChange transition)
3645 {
3646   GstBaseSrc *basesrc;
3647   GstStateChangeReturn result;
3648   gboolean no_preroll = FALSE;
3649
3650   basesrc = GST_BASE_SRC (element);
3651
3652   switch (transition) {
3653     case GST_STATE_CHANGE_NULL_TO_READY:
3654       break;
3655     case GST_STATE_CHANGE_READY_TO_PAUSED:
3656       no_preroll = gst_base_src_is_live (basesrc);
3657       break;
3658     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
3659       GST_DEBUG_OBJECT (basesrc, "PAUSED->PLAYING");
3660       if (gst_base_src_is_live (basesrc)) {
3661         /* now we can start playback */
3662         gst_base_src_set_playing (basesrc, TRUE);
3663       }
3664       break;
3665     default:
3666       break;
3667   }
3668
3669   if ((result =
3670           GST_ELEMENT_CLASS (parent_class)->change_state (element,
3671               transition)) == GST_STATE_CHANGE_FAILURE)
3672     goto failure;
3673
3674   switch (transition) {
3675     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
3676       GST_DEBUG_OBJECT (basesrc, "PLAYING->PAUSED");
3677       if (gst_base_src_is_live (basesrc)) {
3678         /* make sure we block in the live lock in PAUSED */
3679         gst_base_src_set_playing (basesrc, FALSE);
3680         no_preroll = TRUE;
3681       }
3682       break;
3683     case GST_STATE_CHANGE_PAUSED_TO_READY:
3684     {
3685       /* we don't need to unblock anything here, the pad deactivation code
3686        * already did this */
3687       g_atomic_int_set (&basesrc->priv->pending_eos, FALSE);
3688       gst_event_replace (&basesrc->pending_seek, NULL);
3689       break;
3690     }
3691     case GST_STATE_CHANGE_READY_TO_NULL:
3692       break;
3693     default:
3694       break;
3695   }
3696
3697   if (no_preroll && result == GST_STATE_CHANGE_SUCCESS)
3698     result = GST_STATE_CHANGE_NO_PREROLL;
3699
3700   return result;
3701
3702   /* ERRORS */
3703 failure:
3704   {
3705     GST_DEBUG_OBJECT (basesrc, "parent failed state change");
3706     return result;
3707   }
3708 }
3709
3710 /**
3711  * gst_base_src_get_buffer_pool:
3712  * @src: a #GstBaseSrc
3713  *
3714  * Returns: (transfer full): the instance of the #GstBufferPool used
3715  * by the src; free it after use it
3716  */
3717 GstBufferPool *
3718 gst_base_src_get_buffer_pool (GstBaseSrc * src)
3719 {
3720   g_return_val_if_fail (GST_IS_BASE_SRC (src), NULL);
3721
3722   if (src->priv->pool)
3723     return gst_object_ref (src->priv->pool);
3724
3725   return NULL;
3726 }
3727
3728 /**
3729  * gst_base_src_get_allocator:
3730  * @src: a #GstBaseSrc
3731  * @allocator: (out) (allow-none) (transfer full): the #GstAllocator
3732  * used
3733  * @params: (out) (allow-none) (transfer full): the
3734  * #GstAllocatorParams of @allocator
3735  *
3736  * Lets #GstBaseSrc sub-classes to know the memory @allocator
3737  * used by the base class and its @params.
3738  *
3739  * Unref the @allocator after use it.
3740  */
3741 void
3742 gst_base_src_get_allocator (GstBaseSrc * src,
3743     GstAllocator ** allocator, GstAllocationParams * params)
3744 {
3745   g_return_if_fail (GST_IS_BASE_SRC (src));
3746
3747   if (allocator)
3748     *allocator = src->priv->allocator ?
3749         gst_object_ref (src->priv->allocator) : NULL;
3750
3751   if (params)
3752     *params = src->priv->params;
3753 }