fdd3a4d8d13bdb41e190725b152dea25355a8d1f
[platform/upstream/gstreamer.git] / libs / gst / base / gstbasesrc.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *               2000,2005 Wim Taymans <wim@fluendo.com>
4  *
5  * gstbasesrc.c:
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20  * Boston, MA 02111-1307, USA.
21  */
22
23 /**
24  * SECTION:gstbasesrc
25  * @short_description: Base class for getrange based source elements
26  * @see_also: #GstPushSrc, #GstBaseTransform, #GstBaseSink
27  *
28  * This is a generice base class for source elements. The following
29  * types of sources are supported:
30  * <itemizedlist>
31  *   <listitem><para>random access sources like files</para></listitem>
32  *   <listitem><para>seekable sources</para></listitem>
33  *   <listitem><para>live sources</para></listitem>
34  * </itemizedlist>
35  *
36  * The source can be configured to operate in any #GstFormat with the
37  * gst_base_src_set_format() method. The currently set format determines
38  * the format of the internal #GstSegment and any #GST_EVENT_NEWSEGMENT
39  * events. The default format for #GstBaseSrc is #GST_FORMAT_BYTES.
40  *
41  * #GstBaseSrc always supports push mode scheduling. If the following
42  * conditions are met, it also supports pull mode scheduling:
43  * <itemizedlist>
44  *   <listitem><para>The format is set to #GST_FORMAT_BYTES (default).</para>
45  *   </listitem>
46  *   <listitem><para>#GstBaseSrcClass.is_seekable() returns %TRUE.</para>
47  *   </listitem>
48  * </itemizedlist>
49  *
50  * If all the conditions are met for operating in pull mode, #GstBaseSrc is
51  * automatically seekable in push mode as well. The following conditions must
52  * be met to make the element seekable in push mode when the format is not
53  * #GST_FORMAT_BYTES:
54  * <itemizedlist>
55  *   <listitem><para>
56  *     #GstBaseSrcClass.is_seekable() returns %TRUE.
57  *   </para></listitem>
58  *   <listitem><para>
59  *     #GstBaseSrcClass.query() can convert all supported seek formats to the
60  *     internal format as set with gst_base_src_set_format().
61  *   </para></listitem>
62  *   <listitem><para>
63  *     #GstBaseSrcClass.do_seek() is implemented, performs the seek and returns
64  *      %TRUE.
65  *   </para></listitem>
66  * </itemizedlist>
67  *
68  * When the element does not meet the requirements to operate in pull mode, the
69  * offset and length in the #GstBaseSrcClass.create() method should be ignored.
70  * It is recommended to subclass #GstPushSrc instead, in this situation. If the
71  * element can operate in pull mode but only with specific offsets and
72  * lengths, it is allowed to generate an error when the wrong values are passed
73  * to the #GstBaseSrcClass.create() function.
74  *
75  * #GstBaseSrc has support for live sources. Live sources are sources that when
76  * paused discard data, such as audio or video capture devices. A typical live
77  * source also produces data at a fixed rate and thus provides a clock to publish
78  * this rate.
79  * Use gst_base_src_set_live() to activate the live source mode.
80  *
81  * A live source does not produce data in the PAUSED state. This means that the
82  * #GstBaseSrcClass.create() method will not be called in PAUSED but only in
83  * PLAYING. To signal the pipeline that the element will not produce data, the
84  * return value from the READY to PAUSED state will be
85  * #GST_STATE_CHANGE_NO_PREROLL.
86  *
87  * A typical live source will timestamp the buffers it creates with the
88  * current running time of the pipeline. This is one reason why a live source
89  * can only produce data in the PLAYING state, when the clock is actually
90  * distributed and running.
91  *
92  * Live sources that synchronize and block on the clock (an audio source, for
93  * example) can since 0.10.12 use gst_base_src_wait_playing() when the
94  * #GstBaseSrcClass.create() function was interrupted by a state change to
95  * PAUSED.
96  *
97  * The #GstBaseSrcClass.get_times() method can be used to implement pseudo-live
98  * sources. It only makes sense to implement the #GstBaseSrcClass.get_times()
99  * function if the source is a live source. The #GstBaseSrcClass.get_times()
100  * function should return timestamps starting from 0, as if it were a non-live
101  * source. The base class will make sure that the timestamps are transformed
102  * into the current running_time. The base source will then wait for the
103  * calculated running_time before pushing out the buffer.
104  *
105  * For live sources, the base class will by default report a latency of 0.
106  * For pseudo live sources, the base class will by default measure the difference
107  * between the first buffer timestamp and the start time of get_times and will
108  * report this value as the latency.
109  * Subclasses should override the query function when this behaviour is not
110  * acceptable.
111  *
112  * There is only support in #GstBaseSrc for exactly one source pad, which
113  * should be named "src". A source implementation (subclass of #GstBaseSrc)
114  * should install a pad template in its class_init function, like so:
115  * |[
116  * static void
117  * my_element_class_init (GstMyElementClass *klass)
118  * {
119  *   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
120  *   // srctemplate should be a #GstStaticPadTemplate with direction
121  *   // #GST_PAD_SRC and name "src"
122  *   gst_element_class_add_pad_template (gstelement_class,
123  *       gst_static_pad_template_get (&amp;srctemplate));
124  *   // see #GstElementDetails
125  *   gst_element_class_set_details (gstelement_class, &amp;details);
126  * }
127  * ]|
128  *
129  * <refsect2>
130  * <title>Controlled shutdown of live sources in applications</title>
131  * <para>
132  * Applications that record from a live source may want to stop recording
133  * in a controlled way, so that the recording is stopped, but the data
134  * already in the pipeline is processed to the end (remember that many live
135  * sources would go on recording forever otherwise). For that to happen the
136  * application needs to make the source stop recording and send an EOS
137  * event down the pipeline. The application would then wait for an
138  * EOS message posted on the pipeline's bus to know when all data has
139  * been processed and the pipeline can safely be stopped.
140  *
141  * Since GStreamer 0.10.16 an application may send an EOS event to a source
142  * element to make it perform the EOS logic (send EOS event downstream or post a
143  * #GST_MESSAGE_SEGMENT_DONE on the bus). This can typically be done
144  * with the gst_element_send_event() function on the element or its parent bin.
145  *
146  * After the EOS has been sent to the element, the application should wait for
147  * an EOS message to be posted on the pipeline's bus. Once this EOS message is
148  * received, it may safely shut down the entire pipeline.
149  *
150  * Last reviewed on 2007-12-19 (0.10.16)
151  * </para>
152  * </refsect2>
153  */
154
155 #ifdef HAVE_CONFIG_H
156 #  include "config.h"
157 #endif
158
159 #include <stdlib.h>
160 #include <string.h>
161
162 #include <gst/gst_private.h>
163 #include <gst/glib-compat-private.h>
164
165 #include "gstbasesrc.h"
166 #include "gsttypefindhelper.h"
167 #include <gst/gst-i18n-lib.h>
168
169 GST_DEBUG_CATEGORY_STATIC (gst_base_src_debug);
170 #define GST_CAT_DEFAULT gst_base_src_debug
171
172 #define GST_LIVE_GET_LOCK(elem)               (&GST_BASE_SRC_CAST(elem)->live_lock)
173 #define GST_LIVE_LOCK(elem)                   g_mutex_lock(GST_LIVE_GET_LOCK(elem))
174 #define GST_LIVE_TRYLOCK(elem)                g_mutex_trylock(GST_LIVE_GET_LOCK(elem))
175 #define GST_LIVE_UNLOCK(elem)                 g_mutex_unlock(GST_LIVE_GET_LOCK(elem))
176 #define GST_LIVE_GET_COND(elem)               (&GST_BASE_SRC_CAST(elem)->live_cond)
177 #define GST_LIVE_WAIT(elem)                   g_cond_wait (GST_LIVE_GET_COND (elem), GST_LIVE_GET_LOCK (elem))
178 #define GST_LIVE_WAIT_UNTIL(elem, end_time)   g_cond_timed_wait (GST_LIVE_GET_COND (elem), GST_LIVE_GET_LOCK (elem), end_time)
179 #define GST_LIVE_SIGNAL(elem)                 g_cond_signal (GST_LIVE_GET_COND (elem));
180 #define GST_LIVE_BROADCAST(elem)              g_cond_broadcast (GST_LIVE_GET_COND (elem));
181
182 /* BaseSrc signals and args */
183 enum
184 {
185   /* FILL ME */
186   LAST_SIGNAL
187 };
188
189 #define DEFAULT_BLOCKSIZE       4096
190 #define DEFAULT_NUM_BUFFERS     -1
191 #define DEFAULT_TYPEFIND        FALSE
192 #define DEFAULT_DO_TIMESTAMP    FALSE
193
194 enum
195 {
196   PROP_0,
197   PROP_BLOCKSIZE,
198   PROP_NUM_BUFFERS,
199   PROP_TYPEFIND,
200   PROP_DO_TIMESTAMP
201 };
202
203 #define GST_BASE_SRC_GET_PRIVATE(obj)  \
204    (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_BASE_SRC, GstBaseSrcPrivate))
205
206 struct _GstBaseSrcPrivate
207 {
208   gboolean discont;
209   gboolean flushing;
210
211   GstFlowReturn start_result;
212   gboolean async;
213
214   /* if a stream-start event should be sent */
215   gboolean stream_start_pending;
216
217   /* if segment should be sent */
218   gboolean segment_pending;
219
220   /* if EOS is pending (atomic) */
221   gint pending_eos;
222
223   /* startup latency is the time it takes between going to PLAYING and producing
224    * the first BUFFER with running_time 0. This value is included in the latency
225    * reporting. */
226   GstClockTime latency;
227   /* timestamp offset, this is the offset add to the values of gst_times for
228    * pseudo live sources */
229   GstClockTimeDiff ts_offset;
230
231   gboolean do_timestamp;
232   volatile gint dynamic_size;
233
234   /* stream sequence number */
235   guint32 seqnum;
236
237   /* pending events (TAG, CUSTOM_BOTH, CUSTOM_DOWNSTREAM) to be
238    * pushed in the data stream */
239   GList *pending_events;
240   volatile gint have_events;
241
242   /* QoS *//* with LOCK */
243   gboolean qos_enabled;
244   gdouble proportion;
245   GstClockTime earliest_time;
246
247   GstBufferPool *pool;
248   GstAllocator *allocator;
249   GstAllocationParams params;
250 };
251
252 static GstElementClass *parent_class = NULL;
253
254 static void gst_base_src_class_init (GstBaseSrcClass * klass);
255 static void gst_base_src_init (GstBaseSrc * src, gpointer g_class);
256 static void gst_base_src_finalize (GObject * object);
257
258
259 GType
260 gst_base_src_get_type (void)
261 {
262   static volatile gsize base_src_type = 0;
263
264   if (g_once_init_enter (&base_src_type)) {
265     GType _type;
266     static const GTypeInfo base_src_info = {
267       sizeof (GstBaseSrcClass),
268       NULL,
269       NULL,
270       (GClassInitFunc) gst_base_src_class_init,
271       NULL,
272       NULL,
273       sizeof (GstBaseSrc),
274       0,
275       (GInstanceInitFunc) gst_base_src_init,
276     };
277
278     _type = g_type_register_static (GST_TYPE_ELEMENT,
279         "GstBaseSrc", &base_src_info, G_TYPE_FLAG_ABSTRACT);
280     g_once_init_leave (&base_src_type, _type);
281   }
282   return base_src_type;
283 }
284
285 static GstCaps *gst_base_src_default_get_caps (GstBaseSrc * bsrc,
286     GstCaps * filter);
287 static GstCaps *gst_base_src_default_fixate (GstBaseSrc * src, GstCaps * caps);
288 static GstCaps *gst_base_src_fixate (GstBaseSrc * src, GstCaps * caps);
289
290 static gboolean gst_base_src_is_random_access (GstBaseSrc * src);
291 static gboolean gst_base_src_activate_mode (GstPad * pad, GstObject * parent,
292     GstPadMode mode, gboolean active);
293 static void gst_base_src_set_property (GObject * object, guint prop_id,
294     const GValue * value, GParamSpec * pspec);
295 static void gst_base_src_get_property (GObject * object, guint prop_id,
296     GValue * value, GParamSpec * pspec);
297 static gboolean gst_base_src_event (GstPad * pad, GstObject * parent,
298     GstEvent * event);
299 static gboolean gst_base_src_send_event (GstElement * elem, GstEvent * event);
300 static gboolean gst_base_src_default_event (GstBaseSrc * src, GstEvent * event);
301
302 static gboolean gst_base_src_query (GstPad * pad, GstObject * parent,
303     GstQuery * query);
304
305 static gboolean gst_base_src_activate_pool (GstBaseSrc * basesrc,
306     gboolean active);
307 static gboolean gst_base_src_default_negotiate (GstBaseSrc * basesrc);
308 static gboolean gst_base_src_default_do_seek (GstBaseSrc * src,
309     GstSegment * segment);
310 static gboolean gst_base_src_default_query (GstBaseSrc * src, GstQuery * query);
311 static gboolean gst_base_src_default_prepare_seek_segment (GstBaseSrc * src,
312     GstEvent * event, GstSegment * segment);
313 static GstFlowReturn gst_base_src_default_create (GstBaseSrc * basesrc,
314     guint64 offset, guint size, GstBuffer ** buf);
315 static GstFlowReturn gst_base_src_default_alloc (GstBaseSrc * basesrc,
316     guint64 offset, guint size, GstBuffer ** buf);
317
318 static gboolean gst_base_src_set_flushing (GstBaseSrc * basesrc,
319     gboolean flushing, gboolean live_play, gboolean unlock, gboolean * playing);
320
321 static gboolean gst_base_src_start (GstBaseSrc * basesrc);
322 static gboolean gst_base_src_stop (GstBaseSrc * basesrc);
323
324 static GstStateChangeReturn gst_base_src_change_state (GstElement * element,
325     GstStateChange transition);
326
327 static void gst_base_src_loop (GstPad * pad);
328 static GstFlowReturn gst_base_src_getrange (GstPad * pad, GstObject * parent,
329     guint64 offset, guint length, GstBuffer ** buf);
330 static GstFlowReturn gst_base_src_get_range (GstBaseSrc * src, guint64 offset,
331     guint length, GstBuffer ** buf);
332 static gboolean gst_base_src_seekable (GstBaseSrc * src);
333 static gboolean gst_base_src_negotiate (GstBaseSrc * basesrc);
334 static gboolean gst_base_src_update_length (GstBaseSrc * src, guint64 offset,
335     guint * length);
336
337 static void
338 gst_base_src_class_init (GstBaseSrcClass * klass)
339 {
340   GObjectClass *gobject_class;
341   GstElementClass *gstelement_class;
342
343   gobject_class = G_OBJECT_CLASS (klass);
344   gstelement_class = GST_ELEMENT_CLASS (klass);
345
346   GST_DEBUG_CATEGORY_INIT (gst_base_src_debug, "basesrc", 0, "basesrc element");
347
348   g_type_class_add_private (klass, sizeof (GstBaseSrcPrivate));
349
350   parent_class = g_type_class_peek_parent (klass);
351
352   gobject_class->finalize = gst_base_src_finalize;
353   gobject_class->set_property = gst_base_src_set_property;
354   gobject_class->get_property = gst_base_src_get_property;
355
356   g_object_class_install_property (gobject_class, PROP_BLOCKSIZE,
357       g_param_spec_uint ("blocksize", "Block size",
358           "Size in bytes to read per buffer (-1 = default)", 0, G_MAXUINT,
359           DEFAULT_BLOCKSIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
360   g_object_class_install_property (gobject_class, PROP_NUM_BUFFERS,
361       g_param_spec_int ("num-buffers", "num-buffers",
362           "Number of buffers to output before sending EOS (-1 = unlimited)",
363           -1, G_MAXINT, DEFAULT_NUM_BUFFERS, G_PARAM_READWRITE |
364           G_PARAM_STATIC_STRINGS));
365   g_object_class_install_property (gobject_class, PROP_TYPEFIND,
366       g_param_spec_boolean ("typefind", "Typefind",
367           "Run typefind before negotiating", DEFAULT_TYPEFIND,
368           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
369   g_object_class_install_property (gobject_class, PROP_DO_TIMESTAMP,
370       g_param_spec_boolean ("do-timestamp", "Do timestamp",
371           "Apply current stream time to buffers", DEFAULT_DO_TIMESTAMP,
372           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
373
374   gstelement_class->change_state =
375       GST_DEBUG_FUNCPTR (gst_base_src_change_state);
376   gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_base_src_send_event);
377
378   klass->get_caps = GST_DEBUG_FUNCPTR (gst_base_src_default_get_caps);
379   klass->negotiate = GST_DEBUG_FUNCPTR (gst_base_src_default_negotiate);
380   klass->fixate = GST_DEBUG_FUNCPTR (gst_base_src_default_fixate);
381   klass->prepare_seek_segment =
382       GST_DEBUG_FUNCPTR (gst_base_src_default_prepare_seek_segment);
383   klass->do_seek = GST_DEBUG_FUNCPTR (gst_base_src_default_do_seek);
384   klass->query = GST_DEBUG_FUNCPTR (gst_base_src_default_query);
385   klass->event = GST_DEBUG_FUNCPTR (gst_base_src_default_event);
386   klass->create = GST_DEBUG_FUNCPTR (gst_base_src_default_create);
387   klass->alloc = GST_DEBUG_FUNCPTR (gst_base_src_default_alloc);
388
389   /* Registering debug symbols for function pointers */
390   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_activate_mode);
391   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_event);
392   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_query);
393   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_getrange);
394   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_fixate);
395 }
396
397 static void
398 gst_base_src_init (GstBaseSrc * basesrc, gpointer g_class)
399 {
400   GstPad *pad;
401   GstPadTemplate *pad_template;
402
403   basesrc->priv = GST_BASE_SRC_GET_PRIVATE (basesrc);
404
405   basesrc->is_live = FALSE;
406   g_mutex_init (&basesrc->live_lock);
407   g_cond_init (&basesrc->live_cond);
408   basesrc->num_buffers = DEFAULT_NUM_BUFFERS;
409   basesrc->num_buffers_left = -1;
410
411   basesrc->can_activate_push = TRUE;
412
413   pad_template =
414       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (g_class), "src");
415   g_return_if_fail (pad_template != NULL);
416
417   GST_DEBUG_OBJECT (basesrc, "creating src pad");
418   pad = gst_pad_new_from_template (pad_template, "src");
419
420   GST_DEBUG_OBJECT (basesrc, "setting functions on src pad");
421   gst_pad_set_activatemode_function (pad, gst_base_src_activate_mode);
422   gst_pad_set_event_function (pad, gst_base_src_event);
423   gst_pad_set_query_function (pad, gst_base_src_query);
424   gst_pad_set_getrange_function (pad, gst_base_src_getrange);
425
426   /* hold pointer to pad */
427   basesrc->srcpad = pad;
428   GST_DEBUG_OBJECT (basesrc, "adding src pad");
429   gst_element_add_pad (GST_ELEMENT (basesrc), pad);
430
431   basesrc->blocksize = DEFAULT_BLOCKSIZE;
432   basesrc->clock_id = NULL;
433   /* we operate in BYTES by default */
434   gst_base_src_set_format (basesrc, GST_FORMAT_BYTES);
435   basesrc->typefind = DEFAULT_TYPEFIND;
436   basesrc->priv->do_timestamp = DEFAULT_DO_TIMESTAMP;
437   g_atomic_int_set (&basesrc->priv->have_events, FALSE);
438
439   basesrc->priv->start_result = GST_FLOW_FLUSHING;
440   GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTED);
441   GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTING);
442   GST_OBJECT_FLAG_SET (basesrc, GST_ELEMENT_FLAG_SOURCE);
443
444   GST_DEBUG_OBJECT (basesrc, "init done");
445 }
446
447 static void
448 gst_base_src_finalize (GObject * object)
449 {
450   GstBaseSrc *basesrc;
451   GstEvent **event_p;
452
453   basesrc = GST_BASE_SRC (object);
454
455   g_mutex_clear (&basesrc->live_lock);
456   g_cond_clear (&basesrc->live_cond);
457
458   event_p = &basesrc->pending_seek;
459   gst_event_replace (event_p, NULL);
460
461   if (basesrc->priv->pending_events) {
462     g_list_foreach (basesrc->priv->pending_events, (GFunc) gst_event_unref,
463         NULL);
464     g_list_free (basesrc->priv->pending_events);
465   }
466
467   G_OBJECT_CLASS (parent_class)->finalize (object);
468 }
469
470 /**
471  * gst_base_src_wait_playing:
472  * @src: the src
473  *
474  * If the #GstBaseSrcClass.create() method performs its own synchronisation
475  * against the clock it must unblock when going from PLAYING to the PAUSED state
476  * and call this method before continuing to produce the remaining data.
477  *
478  * This function will block until a state change to PLAYING happens (in which
479  * case this function returns #GST_FLOW_OK) or the processing must be stopped due
480  * to a state change to READY or a FLUSH event (in which case this function
481  * returns #GST_FLOW_FLUSHING).
482  *
483  * Since: 0.10.12
484  *
485  * Returns: #GST_FLOW_OK if @src is PLAYING and processing can
486  * continue. Any other return value should be returned from the create vmethod.
487  */
488 GstFlowReturn
489 gst_base_src_wait_playing (GstBaseSrc * src)
490 {
491   g_return_val_if_fail (GST_IS_BASE_SRC (src), GST_FLOW_ERROR);
492
493   do {
494     /* block until the state changes, or we get a flush, or something */
495     GST_DEBUG_OBJECT (src, "live source waiting for running state");
496     GST_LIVE_WAIT (src);
497     GST_DEBUG_OBJECT (src, "live source unlocked");
498     if (src->priv->flushing)
499       goto flushing;
500   } while (G_UNLIKELY (!src->live_running));
501
502   return GST_FLOW_OK;
503
504   /* ERRORS */
505 flushing:
506   {
507     GST_DEBUG_OBJECT (src, "we are flushing");
508     return GST_FLOW_FLUSHING;
509   }
510 }
511
512 /**
513  * gst_base_src_set_live:
514  * @src: base source instance
515  * @live: new live-mode
516  *
517  * If the element listens to a live source, @live should
518  * be set to %TRUE.
519  *
520  * A live source will not produce data in the PAUSED state and
521  * will therefore not be able to participate in the PREROLL phase
522  * of a pipeline. To signal this fact to the application and the
523  * pipeline, the state change return value of the live source will
524  * be GST_STATE_CHANGE_NO_PREROLL.
525  */
526 void
527 gst_base_src_set_live (GstBaseSrc * src, gboolean live)
528 {
529   g_return_if_fail (GST_IS_BASE_SRC (src));
530
531   GST_OBJECT_LOCK (src);
532   src->is_live = live;
533   GST_OBJECT_UNLOCK (src);
534 }
535
536 /**
537  * gst_base_src_is_live:
538  * @src: base source instance
539  *
540  * Check if an element is in live mode.
541  *
542  * Returns: %TRUE if element is in live mode.
543  */
544 gboolean
545 gst_base_src_is_live (GstBaseSrc * src)
546 {
547   gboolean result;
548
549   g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
550
551   GST_OBJECT_LOCK (src);
552   result = src->is_live;
553   GST_OBJECT_UNLOCK (src);
554
555   return result;
556 }
557
558 /**
559  * gst_base_src_set_format:
560  * @src: base source instance
561  * @format: the format to use
562  *
563  * Sets the default format of the source. This will be the format used
564  * for sending NEW_SEGMENT events and for performing seeks.
565  *
566  * If a format of GST_FORMAT_BYTES is set, the element will be able to
567  * operate in pull mode if the #GstBaseSrcClass.is_seekable() returns TRUE.
568  *
569  * This function must only be called in states < %GST_STATE_PAUSED.
570  *
571  * Since: 0.10.1
572  */
573 void
574 gst_base_src_set_format (GstBaseSrc * src, GstFormat format)
575 {
576   g_return_if_fail (GST_IS_BASE_SRC (src));
577   g_return_if_fail (GST_STATE (src) <= GST_STATE_READY);
578
579   GST_OBJECT_LOCK (src);
580   gst_segment_init (&src->segment, format);
581   GST_OBJECT_UNLOCK (src);
582 }
583
584 /**
585  * gst_base_src_set_dynamic_size:
586  * @src: base source instance
587  * @dynamic: new dynamic size mode
588  *
589  * If not @dynamic, size is only updated when needed, such as when trying to
590  * read past current tracked size.  Otherwise, size is checked for upon each
591  * read.
592  *
593  * Since: 0.10.36
594  */
595 void
596 gst_base_src_set_dynamic_size (GstBaseSrc * src, gboolean dynamic)
597 {
598   g_return_if_fail (GST_IS_BASE_SRC (src));
599
600   g_atomic_int_set (&src->priv->dynamic_size, dynamic);
601 }
602
603 /**
604  * gst_base_src_set_async:
605  * @src: base source instance
606  * @async: new async mode
607  *
608  * Configure async behaviour in @src, no state change will block. The open,
609  * close, start, stop, play and pause virtual methods will be executed in a
610  * different thread and are thus allowed to perform blocking operations. Any
611  * blocking operation should be unblocked with the unlock vmethod.
612  */
613 void
614 gst_base_src_set_async (GstBaseSrc * src, gboolean async)
615 {
616   g_return_if_fail (GST_IS_BASE_SRC (src));
617
618   GST_OBJECT_LOCK (src);
619   src->priv->async = async;
620   GST_OBJECT_UNLOCK (src);
621 }
622
623 /**
624  * gst_base_src_is_async:
625  * @src: base source instance
626  *
627  * Get the current async behaviour of @src. See also gst_base_src_set_async().
628  *
629  * Returns: %TRUE if @src is operating in async mode.
630  */
631 gboolean
632 gst_base_src_is_async (GstBaseSrc * src)
633 {
634   gboolean res;
635
636   g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
637
638   GST_OBJECT_LOCK (src);
639   res = src->priv->async;
640   GST_OBJECT_UNLOCK (src);
641
642   return res;
643 }
644
645
646 /**
647  * gst_base_src_query_latency:
648  * @src: the source
649  * @live: (out) (allow-none): if the source is live
650  * @min_latency: (out) (allow-none): the min latency of the source
651  * @max_latency: (out) (allow-none): the max latency of the source
652  *
653  * Query the source for the latency parameters. @live will be TRUE when @src is
654  * configured as a live source. @min_latency will be set to the difference
655  * between the running time and the timestamp of the first buffer.
656  * @max_latency is always the undefined value of -1.
657  *
658  * This function is mostly used by subclasses.
659  *
660  * Returns: TRUE if the query succeeded.
661  *
662  * Since: 0.10.13
663  */
664 gboolean
665 gst_base_src_query_latency (GstBaseSrc * src, gboolean * live,
666     GstClockTime * min_latency, GstClockTime * max_latency)
667 {
668   GstClockTime min;
669
670   g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
671
672   GST_OBJECT_LOCK (src);
673   if (live)
674     *live = src->is_live;
675
676   /* if we have a startup latency, report this one, else report 0. Subclasses
677    * are supposed to override the query function if they want something
678    * else. */
679   if (src->priv->latency != -1)
680     min = src->priv->latency;
681   else
682     min = 0;
683
684   if (min_latency)
685     *min_latency = min;
686   if (max_latency)
687     *max_latency = -1;
688
689   GST_LOG_OBJECT (src, "latency: live %d, min %" GST_TIME_FORMAT
690       ", max %" GST_TIME_FORMAT, src->is_live, GST_TIME_ARGS (min),
691       GST_TIME_ARGS (-1));
692   GST_OBJECT_UNLOCK (src);
693
694   return TRUE;
695 }
696
697 /**
698  * gst_base_src_set_blocksize:
699  * @src: the source
700  * @blocksize: the new blocksize in bytes
701  *
702  * Set the number of bytes that @src will push out with each buffer. When
703  * @blocksize is set to -1, a default length will be used.
704  *
705  * Since: 0.10.22
706  */
707 void
708 gst_base_src_set_blocksize (GstBaseSrc * src, guint blocksize)
709 {
710   g_return_if_fail (GST_IS_BASE_SRC (src));
711
712   GST_OBJECT_LOCK (src);
713   src->blocksize = blocksize;
714   GST_OBJECT_UNLOCK (src);
715 }
716
717 /**
718  * gst_base_src_get_blocksize:
719  * @src: the source
720  *
721  * Get the number of bytes that @src will push out with each buffer.
722  *
723  * Returns: the number of bytes pushed with each buffer.
724  *
725  * Since: 0.10.22
726  */
727 guint
728 gst_base_src_get_blocksize (GstBaseSrc * src)
729 {
730   gint res;
731
732   g_return_val_if_fail (GST_IS_BASE_SRC (src), 0);
733
734   GST_OBJECT_LOCK (src);
735   res = src->blocksize;
736   GST_OBJECT_UNLOCK (src);
737
738   return res;
739 }
740
741
742 /**
743  * gst_base_src_set_do_timestamp:
744  * @src: the source
745  * @timestamp: enable or disable timestamping
746  *
747  * Configure @src to automatically timestamp outgoing buffers based on the
748  * current running_time of the pipeline. This property is mostly useful for live
749  * sources.
750  *
751  * Since: 0.10.15
752  */
753 void
754 gst_base_src_set_do_timestamp (GstBaseSrc * src, gboolean timestamp)
755 {
756   g_return_if_fail (GST_IS_BASE_SRC (src));
757
758   GST_OBJECT_LOCK (src);
759   src->priv->do_timestamp = timestamp;
760   GST_OBJECT_UNLOCK (src);
761 }
762
763 /**
764  * gst_base_src_get_do_timestamp:
765  * @src: the source
766  *
767  * Query if @src timestamps outgoing buffers based on the current running_time.
768  *
769  * Returns: %TRUE if the base class will automatically timestamp outgoing buffers.
770  *
771  * Since: 0.10.15
772  */
773 gboolean
774 gst_base_src_get_do_timestamp (GstBaseSrc * src)
775 {
776   gboolean res;
777
778   g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
779
780   GST_OBJECT_LOCK (src);
781   res = src->priv->do_timestamp;
782   GST_OBJECT_UNLOCK (src);
783
784   return res;
785 }
786
787 /**
788  * gst_base_src_new_seamless_segment:
789  * @src: The source
790  * @start: The new start value for the segment
791  * @stop: Stop value for the new segment
792  * @position: The position value for the new segent
793  *
794  * Prepare a new seamless segment for emission downstream. This function must
795  * only be called by derived sub-classes, and only from the create() function,
796  * as the stream-lock needs to be held.
797  *
798  * The format for the new segment will be the current format of the source, as
799  * configured with gst_base_src_set_format()
800  *
801  * Returns: %TRUE if preparation of the seamless segment succeeded.
802  *
803  * Since: 0.10.26
804  */
805 gboolean
806 gst_base_src_new_seamless_segment (GstBaseSrc * src, gint64 start, gint64 stop,
807     gint64 position)
808 {
809   gboolean res = TRUE;
810
811   GST_DEBUG_OBJECT (src,
812       "Starting new seamless segment. Start %" GST_TIME_FORMAT " stop %"
813       GST_TIME_FORMAT " position %" GST_TIME_FORMAT, GST_TIME_ARGS (start),
814       GST_TIME_ARGS (stop), GST_TIME_ARGS (position));
815
816   GST_OBJECT_LOCK (src);
817
818   src->segment.base = gst_segment_to_running_time (&src->segment,
819       src->segment.format, src->segment.position);
820   src->segment.start = start;
821   src->segment.stop = stop;
822   src->segment.position = position;
823
824   /* forward, we send data from position to stop */
825   src->priv->segment_pending = TRUE;
826   GST_OBJECT_UNLOCK (src);
827
828   src->priv->discont = TRUE;
829   src->running = TRUE;
830
831   return res;
832 }
833
834 static gboolean
835 gst_base_src_send_stream_start (GstBaseSrc * src)
836 {
837   gboolean ret = TRUE;
838
839   if (src->priv->stream_start_pending) {
840     ret = gst_pad_push_event (src->srcpad, gst_event_new_stream_start ());
841     src->priv->stream_start_pending = FALSE;
842   }
843
844   return ret;
845 }
846
847 /**
848  * gst_base_src_set_caps:
849  * @src: a #GstBaseSrc
850  * @caps: a #GstCaps
851  *
852  * Set new caps on the basesrc source pad.
853  *
854  * Returns: %TRUE if the caps could be set
855  */
856 gboolean
857 gst_base_src_set_caps (GstBaseSrc * src, GstCaps * caps)
858 {
859   GstBaseSrcClass *bclass;
860   gboolean res = TRUE;
861
862   bclass = GST_BASE_SRC_GET_CLASS (src);
863
864   gst_base_src_send_stream_start (src);
865   gst_pad_push_event (src->srcpad, gst_event_new_caps (caps));
866
867   if (bclass->set_caps)
868     res = bclass->set_caps (src, caps);
869
870   return res;
871 }
872
873 static GstCaps *
874 gst_base_src_default_get_caps (GstBaseSrc * bsrc, GstCaps * filter)
875 {
876   GstCaps *caps = NULL;
877   GstPadTemplate *pad_template;
878   GstBaseSrcClass *bclass;
879
880   bclass = GST_BASE_SRC_GET_CLASS (bsrc);
881
882   pad_template =
883       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (bclass), "src");
884
885   if (pad_template != NULL) {
886     caps = gst_pad_template_get_caps (pad_template);
887
888     if (filter) {
889       GstCaps *intersection;
890
891       intersection =
892           gst_caps_intersect_full (filter, caps, GST_CAPS_INTERSECT_FIRST);
893       gst_caps_unref (caps);
894       caps = intersection;
895     }
896   }
897   return caps;
898 }
899
900 static GstCaps *
901 gst_base_src_default_fixate (GstBaseSrc * bsrc, GstCaps * caps)
902 {
903   GST_DEBUG_OBJECT (bsrc, "using default caps fixate function");
904   return gst_caps_fixate (caps);
905 }
906
907 static GstCaps *
908 gst_base_src_fixate (GstBaseSrc * bsrc, GstCaps * caps)
909 {
910   GstBaseSrcClass *bclass;
911
912   bclass = GST_BASE_SRC_GET_CLASS (bsrc);
913
914   if (bclass->fixate)
915     caps = bclass->fixate (bsrc, caps);
916
917   return caps;
918 }
919
920 static gboolean
921 gst_base_src_default_query (GstBaseSrc * src, GstQuery * query)
922 {
923   gboolean res;
924
925   switch (GST_QUERY_TYPE (query)) {
926     case GST_QUERY_POSITION:
927     {
928       GstFormat format;
929
930       gst_query_parse_position (query, &format, NULL);
931
932       GST_DEBUG_OBJECT (src, "position query in format %s",
933           gst_format_get_name (format));
934
935       switch (format) {
936         case GST_FORMAT_PERCENT:
937         {
938           gint64 percent;
939           gint64 position;
940           gint64 duration;
941
942           GST_OBJECT_LOCK (src);
943           position = src->segment.position;
944           duration = src->segment.duration;
945           GST_OBJECT_UNLOCK (src);
946
947           if (position != -1 && duration != -1) {
948             if (position < duration)
949               percent = gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX, position,
950                   duration);
951             else
952               percent = GST_FORMAT_PERCENT_MAX;
953           } else
954             percent = -1;
955
956           gst_query_set_position (query, GST_FORMAT_PERCENT, percent);
957           res = TRUE;
958           break;
959         }
960         default:
961         {
962           gint64 position;
963           GstFormat seg_format;
964
965           GST_OBJECT_LOCK (src);
966           position =
967               gst_segment_to_stream_time (&src->segment, src->segment.format,
968               src->segment.position);
969           seg_format = src->segment.format;
970           GST_OBJECT_UNLOCK (src);
971
972           if (position != -1) {
973             /* convert to requested format */
974             res =
975                 gst_pad_query_convert (src->srcpad, seg_format,
976                 position, format, &position);
977           } else
978             res = TRUE;
979
980           gst_query_set_position (query, format, position);
981           break;
982         }
983       }
984       break;
985     }
986     case GST_QUERY_DURATION:
987     {
988       GstFormat format;
989
990       gst_query_parse_duration (query, &format, NULL);
991
992       GST_DEBUG_OBJECT (src, "duration query in format %s",
993           gst_format_get_name (format));
994
995       switch (format) {
996         case GST_FORMAT_PERCENT:
997           gst_query_set_duration (query, GST_FORMAT_PERCENT,
998               GST_FORMAT_PERCENT_MAX);
999           res = TRUE;
1000           break;
1001         default:
1002         {
1003           gint64 duration;
1004           GstFormat seg_format;
1005           guint length = 0;
1006
1007           /* may have to refresh duration */
1008           if (g_atomic_int_get (&src->priv->dynamic_size))
1009             gst_base_src_update_length (src, 0, &length);
1010
1011           /* this is the duration as configured by the subclass. */
1012           GST_OBJECT_LOCK (src);
1013           duration = src->segment.duration;
1014           seg_format = src->segment.format;
1015           GST_OBJECT_UNLOCK (src);
1016
1017           GST_LOG_OBJECT (src, "duration %" G_GINT64_FORMAT ", format %s",
1018               duration, gst_format_get_name (seg_format));
1019
1020           if (duration != -1) {
1021             /* convert to requested format, if this fails, we have a duration
1022              * but we cannot answer the query, we must return FALSE. */
1023             res =
1024                 gst_pad_query_convert (src->srcpad, seg_format,
1025                 duration, format, &duration);
1026           } else {
1027             /* The subclass did not configure a duration, we assume that the
1028              * media has an unknown duration then and we return TRUE to report
1029              * this. Note that this is not the same as returning FALSE, which
1030              * means that we cannot report the duration at all. */
1031             res = TRUE;
1032           }
1033           gst_query_set_duration (query, format, duration);
1034           break;
1035         }
1036       }
1037       break;
1038     }
1039
1040     case GST_QUERY_SEEKING:
1041     {
1042       GstFormat format, seg_format;
1043       gint64 duration;
1044
1045       GST_OBJECT_LOCK (src);
1046       duration = src->segment.duration;
1047       seg_format = src->segment.format;
1048       GST_OBJECT_UNLOCK (src);
1049
1050       gst_query_parse_seeking (query, &format, NULL, NULL, NULL);
1051       if (format == seg_format) {
1052         gst_query_set_seeking (query, seg_format,
1053             gst_base_src_seekable (src), 0, duration);
1054         res = TRUE;
1055       } else {
1056         /* FIXME 0.11: return TRUE + seekable=FALSE for SEEKING query here */
1057         /* Don't reply to the query to make up for demuxers which don't
1058          * handle the SEEKING query yet. Players like Totem will fall back
1059          * to the duration when the SEEKING query isn't answered. */
1060         res = FALSE;
1061       }
1062       break;
1063     }
1064     case GST_QUERY_SEGMENT:
1065     {
1066       gint64 start, stop;
1067
1068       GST_OBJECT_LOCK (src);
1069       /* no end segment configured, current duration then */
1070       if ((stop = src->segment.stop) == -1)
1071         stop = src->segment.duration;
1072       start = src->segment.start;
1073
1074       /* adjust to stream time */
1075       if (src->segment.time != -1) {
1076         start -= src->segment.time;
1077         if (stop != -1)
1078           stop -= src->segment.time;
1079       }
1080
1081       gst_query_set_segment (query, src->segment.rate, src->segment.format,
1082           start, stop);
1083       GST_OBJECT_UNLOCK (src);
1084       res = TRUE;
1085       break;
1086     }
1087
1088     case GST_QUERY_FORMATS:
1089     {
1090       gst_query_set_formats (query, 3, GST_FORMAT_DEFAULT,
1091           GST_FORMAT_BYTES, GST_FORMAT_PERCENT);
1092       res = TRUE;
1093       break;
1094     }
1095     case GST_QUERY_CONVERT:
1096     {
1097       GstFormat src_fmt, dest_fmt;
1098       gint64 src_val, dest_val;
1099
1100       gst_query_parse_convert (query, &src_fmt, &src_val, &dest_fmt, &dest_val);
1101
1102       /* we can only convert between equal formats... */
1103       if (src_fmt == dest_fmt) {
1104         dest_val = src_val;
1105         res = TRUE;
1106       } else
1107         res = FALSE;
1108
1109       gst_query_set_convert (query, src_fmt, src_val, dest_fmt, dest_val);
1110       break;
1111     }
1112     case GST_QUERY_LATENCY:
1113     {
1114       GstClockTime min, max;
1115       gboolean live;
1116
1117       /* Subclasses should override and implement something useful */
1118       res = gst_base_src_query_latency (src, &live, &min, &max);
1119
1120       GST_LOG_OBJECT (src, "report latency: live %d, min %" GST_TIME_FORMAT
1121           ", max %" GST_TIME_FORMAT, live, GST_TIME_ARGS (min),
1122           GST_TIME_ARGS (max));
1123
1124       gst_query_set_latency (query, live, min, max);
1125       break;
1126     }
1127     case GST_QUERY_JITTER:
1128     case GST_QUERY_RATE:
1129       res = FALSE;
1130       break;
1131     case GST_QUERY_BUFFERING:
1132     {
1133       GstFormat format, seg_format;
1134       gint64 start, stop, estimated;
1135
1136       gst_query_parse_buffering_range (query, &format, NULL, NULL, NULL);
1137
1138       GST_DEBUG_OBJECT (src, "buffering query in format %s",
1139           gst_format_get_name (format));
1140
1141       GST_OBJECT_LOCK (src);
1142       if (src->random_access) {
1143         estimated = 0;
1144         start = 0;
1145         if (format == GST_FORMAT_PERCENT)
1146           stop = GST_FORMAT_PERCENT_MAX;
1147         else
1148           stop = src->segment.duration;
1149       } else {
1150         estimated = -1;
1151         start = -1;
1152         stop = -1;
1153       }
1154       seg_format = src->segment.format;
1155       GST_OBJECT_UNLOCK (src);
1156
1157       /* convert to required format. When the conversion fails, we can't answer
1158        * the query. When the value is unknown, we can don't perform conversion
1159        * but report TRUE. */
1160       if (format != GST_FORMAT_PERCENT && stop != -1) {
1161         res = gst_pad_query_convert (src->srcpad, seg_format,
1162             stop, format, &stop);
1163       } else {
1164         res = TRUE;
1165       }
1166       if (res && format != GST_FORMAT_PERCENT && start != -1)
1167         res = gst_pad_query_convert (src->srcpad, seg_format,
1168             start, format, &start);
1169
1170       gst_query_set_buffering_range (query, format, start, stop, estimated);
1171       break;
1172     }
1173     case GST_QUERY_SCHEDULING:
1174     {
1175       gboolean random_access;
1176
1177       random_access = gst_base_src_is_random_access (src);
1178
1179       /* we can operate in getrange mode if the native format is bytes
1180        * and we are seekable, this condition is set in the random_access
1181        * flag and is set in the _start() method. */
1182       gst_query_set_scheduling (query, GST_SCHEDULING_FLAG_SEEKABLE, 1, -1, 0);
1183       if (random_access)
1184         gst_query_add_scheduling_mode (query, GST_PAD_MODE_PULL);
1185       gst_query_add_scheduling_mode (query, GST_PAD_MODE_PUSH);
1186
1187       res = TRUE;
1188       break;
1189     }
1190     case GST_QUERY_CAPS:
1191     {
1192       GstBaseSrcClass *bclass;
1193       GstCaps *caps, *filter;
1194
1195       bclass = GST_BASE_SRC_GET_CLASS (src);
1196       if (bclass->get_caps) {
1197         gst_query_parse_caps (query, &filter);
1198         if ((caps = bclass->get_caps (src, filter))) {
1199           gst_query_set_caps_result (query, caps);
1200           gst_caps_unref (caps);
1201           res = TRUE;
1202         } else {
1203           res = FALSE;
1204         }
1205       } else
1206         res = FALSE;
1207       break;
1208     }
1209     default:
1210       res = FALSE;
1211       break;
1212   }
1213   GST_DEBUG_OBJECT (src, "query %s returns %d", GST_QUERY_TYPE_NAME (query),
1214       res);
1215
1216   return res;
1217 }
1218
1219 static gboolean
1220 gst_base_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
1221 {
1222   GstBaseSrc *src;
1223   GstBaseSrcClass *bclass;
1224   gboolean result = FALSE;
1225
1226   src = GST_BASE_SRC (parent);
1227   bclass = GST_BASE_SRC_GET_CLASS (src);
1228
1229   if (bclass->query)
1230     result = bclass->query (src, query);
1231
1232   return result;
1233 }
1234
1235 static gboolean
1236 gst_base_src_default_do_seek (GstBaseSrc * src, GstSegment * segment)
1237 {
1238   gboolean res = TRUE;
1239
1240   /* update our offset if the start/stop position was updated */
1241   if (segment->format == GST_FORMAT_BYTES) {
1242     segment->time = segment->start;
1243   } else if (segment->start == 0) {
1244     /* seek to start, we can implement a default for this. */
1245     segment->time = 0;
1246   } else {
1247     res = FALSE;
1248     GST_INFO_OBJECT (src, "Can't do a default seek");
1249   }
1250
1251   return res;
1252 }
1253
1254 static gboolean
1255 gst_base_src_do_seek (GstBaseSrc * src, GstSegment * segment)
1256 {
1257   GstBaseSrcClass *bclass;
1258   gboolean result = FALSE;
1259
1260   bclass = GST_BASE_SRC_GET_CLASS (src);
1261
1262   if (bclass->do_seek)
1263     result = bclass->do_seek (src, segment);
1264
1265   return result;
1266 }
1267
1268 #define SEEK_TYPE_IS_RELATIVE(t) (((t) != GST_SEEK_TYPE_NONE) && ((t) != GST_SEEK_TYPE_SET))
1269
1270 static gboolean
1271 gst_base_src_default_prepare_seek_segment (GstBaseSrc * src, GstEvent * event,
1272     GstSegment * segment)
1273 {
1274   /* By default, we try one of 2 things:
1275    *   - For absolute seek positions, convert the requested position to our
1276    *     configured processing format and place it in the output segment \
1277    *   - For relative seek positions, convert our current (input) values to the
1278    *     seek format, adjust by the relative seek offset and then convert back to
1279    *     the processing format
1280    */
1281   GstSeekType cur_type, stop_type;
1282   gint64 cur, stop;
1283   GstSeekFlags flags;
1284   GstFormat seek_format, dest_format;
1285   gdouble rate;
1286   gboolean update;
1287   gboolean res = TRUE;
1288
1289   gst_event_parse_seek (event, &rate, &seek_format, &flags,
1290       &cur_type, &cur, &stop_type, &stop);
1291   dest_format = segment->format;
1292
1293   if (seek_format == dest_format) {
1294     gst_segment_do_seek (segment, rate, seek_format, flags,
1295         cur_type, cur, stop_type, stop, &update);
1296     return TRUE;
1297   }
1298
1299   if (cur_type != GST_SEEK_TYPE_NONE) {
1300     /* FIXME: Handle seek_cur & seek_end by converting the input segment vals */
1301     res =
1302         gst_pad_query_convert (src->srcpad, seek_format, cur, dest_format,
1303         &cur);
1304     cur_type = GST_SEEK_TYPE_SET;
1305   }
1306
1307   if (res && stop_type != GST_SEEK_TYPE_NONE) {
1308     /* FIXME: Handle seek_cur & seek_end by converting the input segment vals */
1309     res =
1310         gst_pad_query_convert (src->srcpad, seek_format, stop, dest_format,
1311         &stop);
1312     stop_type = GST_SEEK_TYPE_SET;
1313   }
1314
1315   /* And finally, configure our output segment in the desired format */
1316   gst_segment_do_seek (segment, rate, dest_format, flags, cur_type, cur,
1317       stop_type, stop, &update);
1318
1319   if (!res)
1320     goto no_format;
1321
1322   return res;
1323
1324 no_format:
1325   {
1326     GST_DEBUG_OBJECT (src, "undefined format given, seek aborted.");
1327     return FALSE;
1328   }
1329 }
1330
1331 static gboolean
1332 gst_base_src_prepare_seek_segment (GstBaseSrc * src, GstEvent * event,
1333     GstSegment * seeksegment)
1334 {
1335   GstBaseSrcClass *bclass;
1336   gboolean result = FALSE;
1337
1338   bclass = GST_BASE_SRC_GET_CLASS (src);
1339
1340   if (bclass->prepare_seek_segment)
1341     result = bclass->prepare_seek_segment (src, event, seeksegment);
1342
1343   return result;
1344 }
1345
1346 static GstFlowReturn
1347 gst_base_src_default_alloc (GstBaseSrc * src, guint64 offset,
1348     guint size, GstBuffer ** buffer)
1349 {
1350   GstFlowReturn ret;
1351   GstBaseSrcPrivate *priv = src->priv;
1352
1353   if (priv->pool) {
1354     ret = gst_buffer_pool_acquire_buffer (priv->pool, buffer, NULL);
1355   } else if (size != -1) {
1356     *buffer = gst_buffer_new_allocate (priv->allocator, size, &priv->params);
1357     if (G_UNLIKELY (*buffer == NULL))
1358       goto alloc_failed;
1359
1360     ret = GST_FLOW_OK;
1361   } else {
1362     GST_WARNING_OBJECT (src, "Not trying to alloc %u bytes. Blocksize not set?",
1363         size);
1364     goto alloc_failed;
1365   }
1366   return ret;
1367
1368   /* ERRORS */
1369 alloc_failed:
1370   {
1371     GST_ERROR_OBJECT (src, "Failed to allocate %u bytes", size);
1372     return GST_FLOW_ERROR;
1373   }
1374 }
1375
1376 static GstFlowReturn
1377 gst_base_src_default_create (GstBaseSrc * src, guint64 offset,
1378     guint size, GstBuffer ** buffer)
1379 {
1380   GstBaseSrcClass *bclass;
1381   GstFlowReturn ret;
1382   GstBuffer *res_buf;
1383
1384   bclass = GST_BASE_SRC_GET_CLASS (src);
1385
1386   if (G_UNLIKELY (!bclass->alloc))
1387     goto no_function;
1388   if (G_UNLIKELY (!bclass->fill))
1389     goto no_function;
1390
1391   if (*buffer == NULL) {
1392     /* downstream did not provide us with a buffer to fill, allocate one
1393      * ourselves */
1394     ret = bclass->alloc (src, offset, size, &res_buf);
1395     if (G_UNLIKELY (ret != GST_FLOW_OK))
1396       goto alloc_failed;
1397   } else {
1398     res_buf = *buffer;
1399   }
1400
1401   if (G_LIKELY (size > 0)) {
1402     /* only call fill when there is a size */
1403     ret = bclass->fill (src, offset, size, res_buf);
1404     if (G_UNLIKELY (ret != GST_FLOW_OK))
1405       goto not_ok;
1406   }
1407
1408   *buffer = res_buf;
1409
1410   return GST_FLOW_OK;
1411
1412   /* ERRORS */
1413 no_function:
1414   {
1415     GST_DEBUG_OBJECT (src, "no fill or alloc function");
1416     return GST_FLOW_NOT_SUPPORTED;
1417   }
1418 alloc_failed:
1419   {
1420     GST_DEBUG_OBJECT (src, "Failed to allocate buffer of %u bytes", size);
1421     return ret;
1422   }
1423 not_ok:
1424   {
1425     GST_DEBUG_OBJECT (src, "fill returned %d (%s)", ret,
1426         gst_flow_get_name (ret));
1427     if (*buffer == NULL)
1428       gst_buffer_unref (res_buf);
1429     return ret;
1430   }
1431 }
1432
1433 /* this code implements the seeking. It is a good example
1434  * handling all cases.
1435  *
1436  * A seek updates the currently configured segment.start
1437  * and segment.stop values based on the SEEK_TYPE. If the
1438  * segment.start value is updated, a seek to this new position
1439  * should be performed.
1440  *
1441  * The seek can only be executed when we are not currently
1442  * streaming any data, to make sure that this is the case, we
1443  * acquire the STREAM_LOCK which is taken when we are in the
1444  * _loop() function or when a getrange() is called. Normally
1445  * we will not receive a seek if we are operating in pull mode
1446  * though. When we operate as a live source we might block on the live
1447  * cond, which does not release the STREAM_LOCK. Therefore we will try
1448  * to grab the LIVE_LOCK instead of the STREAM_LOCK to make sure it is
1449  * safe to perform the seek.
1450  *
1451  * When we are in the loop() function, we might be in the middle
1452  * of pushing a buffer, which might block in a sink. To make sure
1453  * that the push gets unblocked we push out a FLUSH_START event.
1454  * Our loop function will get a WRONG_STATE return value from
1455  * the push and will pause, effectively releasing the STREAM_LOCK.
1456  *
1457  * For a non-flushing seek, we pause the task, which might eventually
1458  * release the STREAM_LOCK. We say eventually because when the sink
1459  * blocks on the sample we might wait a very long time until the sink
1460  * unblocks the sample. In any case we acquire the STREAM_LOCK and
1461  * can continue the seek. A non-flushing seek is normally done in a
1462  * running pipeline to perform seamless playback, this means that the sink is
1463  * PLAYING and will return from its chain function.
1464  * In the case of a non-flushing seek we need to make sure that the
1465  * data we output after the seek is continuous with the previous data,
1466  * this is because a non-flushing seek does not reset the running-time
1467  * to 0. We do this by closing the currently running segment, ie. sending
1468  * a new_segment event with the stop position set to the last processed
1469  * position.
1470  *
1471  * After updating the segment.start/stop values, we prepare for
1472  * streaming again. We push out a FLUSH_STOP to make the peer pad
1473  * accept data again and we start our task again.
1474  *
1475  * A segment seek posts a message on the bus saying that the playback
1476  * of the segment started. We store the segment flag internally because
1477  * when we reach the segment.stop we have to post a segment.done
1478  * instead of EOS when doing a segment seek.
1479  */
1480 /* FIXME (0.11), we have the unlock gboolean here because most current
1481  * implementations (fdsrc, -base/gst/tcp/, ...) unconditionally unlock, even when
1482  * the streaming thread isn't running, resulting in bogus unlocks later when it
1483  * starts. This is fixed by adding unlock_stop, but we should still avoid unlocking
1484  * unnecessarily for backwards compatibility. Ergo, the unlock variable stays
1485  * until 0.11
1486  */
1487 static gboolean
1488 gst_base_src_perform_seek (GstBaseSrc * src, GstEvent * event, gboolean unlock)
1489 {
1490   gboolean res = TRUE, tres;
1491   gdouble rate;
1492   GstFormat seek_format, dest_format;
1493   GstSeekFlags flags;
1494   GstSeekType cur_type, stop_type;
1495   gint64 cur, stop;
1496   gboolean flush, playing;
1497   gboolean update;
1498   gboolean relative_seek = FALSE;
1499   gboolean seekseg_configured = FALSE;
1500   GstSegment seeksegment;
1501   guint32 seqnum;
1502   GstEvent *tevent;
1503
1504   GST_DEBUG_OBJECT (src, "doing seek: %" GST_PTR_FORMAT, event);
1505
1506   GST_OBJECT_LOCK (src);
1507   dest_format = src->segment.format;
1508   GST_OBJECT_UNLOCK (src);
1509
1510   if (event) {
1511     gst_event_parse_seek (event, &rate, &seek_format, &flags,
1512         &cur_type, &cur, &stop_type, &stop);
1513
1514     relative_seek = SEEK_TYPE_IS_RELATIVE (cur_type) ||
1515         SEEK_TYPE_IS_RELATIVE (stop_type);
1516
1517     if (dest_format != seek_format && !relative_seek) {
1518       /* If we have an ABSOLUTE position (SEEK_SET only), we can convert it
1519        * here before taking the stream lock, otherwise we must convert it later,
1520        * once we have the stream lock and can read the last configures segment
1521        * start and stop positions */
1522       gst_segment_init (&seeksegment, dest_format);
1523
1524       if (!gst_base_src_prepare_seek_segment (src, event, &seeksegment))
1525         goto prepare_failed;
1526
1527       seekseg_configured = TRUE;
1528     }
1529
1530     flush = flags & GST_SEEK_FLAG_FLUSH;
1531     seqnum = gst_event_get_seqnum (event);
1532   } else {
1533     flush = FALSE;
1534     /* get next seqnum */
1535     seqnum = gst_util_seqnum_next ();
1536   }
1537
1538   /* send flush start */
1539   if (flush) {
1540     tevent = gst_event_new_flush_start ();
1541     gst_event_set_seqnum (tevent, seqnum);
1542     gst_pad_push_event (src->srcpad, tevent);
1543   } else
1544     gst_pad_pause_task (src->srcpad);
1545
1546   /* unblock streaming thread. */
1547   gst_base_src_set_flushing (src, TRUE, FALSE, unlock, &playing);
1548
1549   /* grab streaming lock, this should eventually be possible, either
1550    * because the task is paused, our streaming thread stopped
1551    * or because our peer is flushing. */
1552   GST_PAD_STREAM_LOCK (src->srcpad);
1553   if (G_UNLIKELY (src->priv->seqnum == seqnum)) {
1554     /* we have seen this event before, issue a warning for now */
1555     GST_WARNING_OBJECT (src, "duplicate event found %" G_GUINT32_FORMAT,
1556         seqnum);
1557   } else {
1558     src->priv->seqnum = seqnum;
1559     GST_DEBUG_OBJECT (src, "seek with seqnum %" G_GUINT32_FORMAT, seqnum);
1560   }
1561
1562   gst_base_src_set_flushing (src, FALSE, playing, unlock, NULL);
1563
1564   /* If we configured the seeksegment above, don't overwrite it now. Otherwise
1565    * copy the current segment info into the temp segment that we can actually
1566    * attempt the seek with. We only update the real segment if the seek succeeds. */
1567   if (!seekseg_configured) {
1568     memcpy (&seeksegment, &src->segment, sizeof (GstSegment));
1569
1570     /* now configure the final seek segment */
1571     if (event) {
1572       if (seeksegment.format != seek_format) {
1573         /* OK, here's where we give the subclass a chance to convert the relative
1574          * seek into an absolute one in the processing format. We set up any
1575          * absolute seek above, before taking the stream lock. */
1576         if (!gst_base_src_prepare_seek_segment (src, event, &seeksegment)) {
1577           GST_DEBUG_OBJECT (src, "Preparing the seek failed after flushing. "
1578               "Aborting seek");
1579           res = FALSE;
1580         }
1581       } else {
1582         /* The seek format matches our processing format, no need to ask the
1583          * the subclass to configure the segment. */
1584         gst_segment_do_seek (&seeksegment, rate, seek_format, flags,
1585             cur_type, cur, stop_type, stop, &update);
1586       }
1587     }
1588     /* Else, no seek event passed, so we're just (re)starting the
1589        current segment. */
1590   }
1591
1592   if (res) {
1593     GST_DEBUG_OBJECT (src, "segment configured from %" G_GINT64_FORMAT
1594         " to %" G_GINT64_FORMAT ", position %" G_GINT64_FORMAT,
1595         seeksegment.start, seeksegment.stop, seeksegment.position);
1596
1597     /* do the seek, segment.position contains the new position. */
1598     res = gst_base_src_do_seek (src, &seeksegment);
1599   }
1600
1601   /* and prepare to continue streaming */
1602   if (flush) {
1603     tevent = gst_event_new_flush_stop (TRUE);
1604     gst_event_set_seqnum (tevent, seqnum);
1605     /* send flush stop, peer will accept data and events again. We
1606      * are not yet providing data as we still have the STREAM_LOCK. */
1607     gst_pad_push_event (src->srcpad, tevent);
1608   }
1609
1610   /* The subclass must have converted the segment to the processing format
1611    * by now */
1612   if (res && seeksegment.format != dest_format) {
1613     GST_DEBUG_OBJECT (src, "Subclass failed to prepare a seek segment "
1614         "in the correct format. Aborting seek.");
1615     res = FALSE;
1616   }
1617
1618   /* if the seek was successful, we update our real segment and push
1619    * out the new segment. */
1620   if (res) {
1621     GST_OBJECT_LOCK (src);
1622     memcpy (&src->segment, &seeksegment, sizeof (GstSegment));
1623     GST_OBJECT_UNLOCK (src);
1624
1625     if (seeksegment.flags & GST_SEEK_FLAG_SEGMENT) {
1626       GstMessage *message;
1627
1628       message = gst_message_new_segment_start (GST_OBJECT (src),
1629           seeksegment.format, seeksegment.position);
1630       gst_message_set_seqnum (message, seqnum);
1631
1632       gst_element_post_message (GST_ELEMENT (src), message);
1633     }
1634
1635     /* for deriving a stop position for the playback segment from the seek
1636      * segment, we must take the duration when the stop is not set */
1637     if ((stop = seeksegment.stop) == -1)
1638       stop = seeksegment.duration;
1639
1640     src->priv->segment_pending = TRUE;
1641   }
1642
1643   src->priv->discont = TRUE;
1644   src->running = TRUE;
1645   /* and restart the task in case it got paused explicitly or by
1646    * the FLUSH_START event we pushed out. */
1647   tres = gst_pad_start_task (src->srcpad, (GstTaskFunction) gst_base_src_loop,
1648       src->srcpad);
1649   if (res && !tres)
1650     res = FALSE;
1651
1652   /* and release the lock again so we can continue streaming */
1653   GST_PAD_STREAM_UNLOCK (src->srcpad);
1654
1655   return res;
1656
1657   /* ERROR */
1658 prepare_failed:
1659   GST_DEBUG_OBJECT (src, "Preparing the seek failed before flushing. "
1660       "Aborting seek");
1661   return FALSE;
1662 }
1663
1664 /* all events send to this element directly. This is mainly done from the
1665  * application.
1666  */
1667 static gboolean
1668 gst_base_src_send_event (GstElement * element, GstEvent * event)
1669 {
1670   GstBaseSrc *src;
1671   gboolean result = FALSE;
1672
1673   src = GST_BASE_SRC (element);
1674
1675   GST_DEBUG_OBJECT (src, "handling event %p %" GST_PTR_FORMAT, event, event);
1676
1677   switch (GST_EVENT_TYPE (event)) {
1678       /* bidirectional events */
1679     case GST_EVENT_FLUSH_START:
1680     case GST_EVENT_FLUSH_STOP:
1681       /* sending random flushes downstream can break stuff,
1682        * especially sync since all segment info will get flushed */
1683       break;
1684
1685       /* downstream serialized events */
1686     case GST_EVENT_EOS:
1687     {
1688       GstBaseSrcClass *bclass;
1689
1690       bclass = GST_BASE_SRC_GET_CLASS (src);
1691
1692       /* queue EOS and make sure the task or pull function performs the EOS
1693        * actions.
1694        *
1695        * We have two possibilities:
1696        *
1697        *  - Before we are to enter the _create function, we check the pending_eos
1698        *    first and do EOS instead of entering it.
1699        *  - If we are in the _create function or we did not manage to set the
1700        *    flag fast enough and we are about to enter the _create function,
1701        *    we unlock it so that we exit with WRONG_STATE immediately. We then
1702        *    check the EOS flag and do the EOS logic.
1703        */
1704       g_atomic_int_set (&src->priv->pending_eos, TRUE);
1705       GST_DEBUG_OBJECT (src, "EOS marked, calling unlock");
1706
1707
1708       /* unlock the _create function so that we can check the pending_eos flag
1709        * and we can do EOS. This will eventually release the LIVE_LOCK again so
1710        * that we can grab it and stop the unlock again. We don't take the stream
1711        * lock so that this operation is guaranteed to never block. */
1712       gst_base_src_activate_pool (src, FALSE);
1713       if (bclass->unlock)
1714         bclass->unlock (src);
1715
1716       GST_DEBUG_OBJECT (src, "unlock called, waiting for LIVE_LOCK");
1717
1718       GST_LIVE_LOCK (src);
1719       GST_DEBUG_OBJECT (src, "LIVE_LOCK acquired, calling unlock_stop");
1720       /* now stop the unlock of the streaming thread again. Grabbing the live
1721        * lock is enough because that protects the create function. */
1722       if (bclass->unlock_stop)
1723         bclass->unlock_stop (src);
1724       gst_base_src_activate_pool (src, TRUE);
1725       GST_LIVE_UNLOCK (src);
1726
1727       result = TRUE;
1728       break;
1729     }
1730     case GST_EVENT_SEGMENT:
1731       /* sending random SEGMENT downstream can break sync. */
1732       break;
1733     case GST_EVENT_TAG:
1734     case GST_EVENT_CUSTOM_DOWNSTREAM:
1735     case GST_EVENT_CUSTOM_BOTH:
1736       /* Insert TAG, CUSTOM_DOWNSTREAM, CUSTOM_BOTH in the dataflow */
1737       GST_OBJECT_LOCK (src);
1738       src->priv->pending_events =
1739           g_list_append (src->priv->pending_events, event);
1740       g_atomic_int_set (&src->priv->have_events, TRUE);
1741       GST_OBJECT_UNLOCK (src);
1742       event = NULL;
1743       result = TRUE;
1744       break;
1745     case GST_EVENT_BUFFERSIZE:
1746       /* does not seem to make much sense currently */
1747       break;
1748
1749       /* upstream events */
1750     case GST_EVENT_QOS:
1751       /* elements should override send_event and do something */
1752       break;
1753     case GST_EVENT_SEEK:
1754     {
1755       gboolean started;
1756
1757       GST_OBJECT_LOCK (src->srcpad);
1758       if (GST_PAD_MODE (src->srcpad) == GST_PAD_MODE_PULL)
1759         goto wrong_mode;
1760       started = GST_PAD_MODE (src->srcpad) == GST_PAD_MODE_PUSH;
1761       GST_OBJECT_UNLOCK (src->srcpad);
1762
1763       if (started) {
1764         GST_DEBUG_OBJECT (src, "performing seek");
1765         /* when we are running in push mode, we can execute the
1766          * seek right now, we need to unlock. */
1767         result = gst_base_src_perform_seek (src, event, TRUE);
1768       } else {
1769         GstEvent **event_p;
1770
1771         /* else we store the event and execute the seek when we
1772          * get activated */
1773         GST_OBJECT_LOCK (src);
1774         GST_DEBUG_OBJECT (src, "queueing seek");
1775         event_p = &src->pending_seek;
1776         gst_event_replace ((GstEvent **) event_p, event);
1777         GST_OBJECT_UNLOCK (src);
1778         /* assume the seek will work */
1779         result = TRUE;
1780       }
1781       break;
1782     }
1783     case GST_EVENT_NAVIGATION:
1784       /* could make sense for elements that do something with navigation events
1785        * but then they would need to override the send_event function */
1786       break;
1787     case GST_EVENT_LATENCY:
1788       /* does not seem to make sense currently */
1789       break;
1790
1791       /* custom events */
1792     case GST_EVENT_CUSTOM_UPSTREAM:
1793       /* override send_event if you want this */
1794       break;
1795     case GST_EVENT_CUSTOM_DOWNSTREAM_OOB:
1796     case GST_EVENT_CUSTOM_BOTH_OOB:
1797       /* insert a random custom event into the pipeline */
1798       GST_DEBUG_OBJECT (src, "pushing custom OOB event downstream");
1799       result = gst_pad_push_event (src->srcpad, event);
1800       /* we gave away the ref to the event in the push */
1801       event = NULL;
1802       break;
1803     default:
1804       break;
1805   }
1806 done:
1807   /* if we still have a ref to the event, unref it now */
1808   if (event)
1809     gst_event_unref (event);
1810
1811   return result;
1812
1813   /* ERRORS */
1814 wrong_mode:
1815   {
1816     GST_DEBUG_OBJECT (src, "cannot perform seek when operating in pull mode");
1817     GST_OBJECT_UNLOCK (src->srcpad);
1818     result = FALSE;
1819     goto done;
1820   }
1821 }
1822
1823 static gboolean
1824 gst_base_src_seekable (GstBaseSrc * src)
1825 {
1826   GstBaseSrcClass *bclass;
1827   bclass = GST_BASE_SRC_GET_CLASS (src);
1828   if (bclass->is_seekable)
1829     return bclass->is_seekable (src);
1830   else
1831     return FALSE;
1832 }
1833
1834 static void
1835 gst_base_src_update_qos (GstBaseSrc * src,
1836     gdouble proportion, GstClockTimeDiff diff, GstClockTime timestamp)
1837 {
1838   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, src,
1839       "qos: proportion: %lf, diff %" G_GINT64_FORMAT ", timestamp %"
1840       GST_TIME_FORMAT, proportion, diff, GST_TIME_ARGS (timestamp));
1841
1842   GST_OBJECT_LOCK (src);
1843   src->priv->proportion = proportion;
1844   src->priv->earliest_time = timestamp + diff;
1845   GST_OBJECT_UNLOCK (src);
1846 }
1847
1848
1849 static gboolean
1850 gst_base_src_default_event (GstBaseSrc * src, GstEvent * event)
1851 {
1852   gboolean result;
1853
1854   GST_DEBUG_OBJECT (src, "handle event %" GST_PTR_FORMAT, event);
1855
1856   switch (GST_EVENT_TYPE (event)) {
1857     case GST_EVENT_SEEK:
1858       /* is normally called when in push mode */
1859       if (!gst_base_src_seekable (src))
1860         goto not_seekable;
1861
1862       result = gst_base_src_perform_seek (src, event, TRUE);
1863       break;
1864     case GST_EVENT_FLUSH_START:
1865       /* cancel any blocking getrange, is normally called
1866        * when in pull mode. */
1867       result = gst_base_src_set_flushing (src, TRUE, FALSE, TRUE, NULL);
1868       break;
1869     case GST_EVENT_FLUSH_STOP:
1870       result = gst_base_src_set_flushing (src, FALSE, TRUE, TRUE, NULL);
1871       break;
1872     case GST_EVENT_QOS:
1873     {
1874       gdouble proportion;
1875       GstClockTimeDiff diff;
1876       GstClockTime timestamp;
1877
1878       gst_event_parse_qos (event, NULL, &proportion, &diff, &timestamp);
1879       gst_base_src_update_qos (src, proportion, diff, timestamp);
1880       result = TRUE;
1881       break;
1882     }
1883     case GST_EVENT_RECONFIGURE:
1884       result = TRUE;
1885       break;
1886     case GST_EVENT_LATENCY:
1887       result = TRUE;
1888       break;
1889     default:
1890       result = FALSE;
1891       break;
1892   }
1893   return result;
1894
1895   /* ERRORS */
1896 not_seekable:
1897   {
1898     GST_DEBUG_OBJECT (src, "is not seekable");
1899     return FALSE;
1900   }
1901 }
1902
1903 static gboolean
1904 gst_base_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
1905 {
1906   GstBaseSrc *src;
1907   GstBaseSrcClass *bclass;
1908   gboolean result = FALSE;
1909
1910   src = GST_BASE_SRC (parent);
1911   bclass = GST_BASE_SRC_GET_CLASS (src);
1912
1913   if (bclass->event) {
1914     if (!(result = bclass->event (src, event)))
1915       goto subclass_failed;
1916   }
1917
1918 done:
1919   gst_event_unref (event);
1920
1921   return result;
1922
1923   /* ERRORS */
1924 subclass_failed:
1925   {
1926     GST_DEBUG_OBJECT (src, "subclass refused event");
1927     goto done;
1928   }
1929 }
1930
1931 static void
1932 gst_base_src_set_property (GObject * object, guint prop_id,
1933     const GValue * value, GParamSpec * pspec)
1934 {
1935   GstBaseSrc *src;
1936
1937   src = GST_BASE_SRC (object);
1938
1939   switch (prop_id) {
1940     case PROP_BLOCKSIZE:
1941       gst_base_src_set_blocksize (src, g_value_get_uint (value));
1942       break;
1943     case PROP_NUM_BUFFERS:
1944       src->num_buffers = g_value_get_int (value);
1945       break;
1946     case PROP_TYPEFIND:
1947       src->typefind = g_value_get_boolean (value);
1948       break;
1949     case PROP_DO_TIMESTAMP:
1950       gst_base_src_set_do_timestamp (src, g_value_get_boolean (value));
1951       break;
1952     default:
1953       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1954       break;
1955   }
1956 }
1957
1958 static void
1959 gst_base_src_get_property (GObject * object, guint prop_id, GValue * value,
1960     GParamSpec * pspec)
1961 {
1962   GstBaseSrc *src;
1963
1964   src = GST_BASE_SRC (object);
1965
1966   switch (prop_id) {
1967     case PROP_BLOCKSIZE:
1968       g_value_set_uint (value, gst_base_src_get_blocksize (src));
1969       break;
1970     case PROP_NUM_BUFFERS:
1971       g_value_set_int (value, src->num_buffers);
1972       break;
1973     case PROP_TYPEFIND:
1974       g_value_set_boolean (value, src->typefind);
1975       break;
1976     case PROP_DO_TIMESTAMP:
1977       g_value_set_boolean (value, gst_base_src_get_do_timestamp (src));
1978       break;
1979     default:
1980       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1981       break;
1982   }
1983 }
1984
1985 /* with STREAM_LOCK and LOCK */
1986 static GstClockReturn
1987 gst_base_src_wait (GstBaseSrc * basesrc, GstClock * clock, GstClockTime time)
1988 {
1989   GstClockReturn ret;
1990   GstClockID id;
1991
1992   id = gst_clock_new_single_shot_id (clock, time);
1993
1994   basesrc->clock_id = id;
1995   /* release the live lock while waiting */
1996   GST_LIVE_UNLOCK (basesrc);
1997
1998   ret = gst_clock_id_wait (id, NULL);
1999
2000   GST_LIVE_LOCK (basesrc);
2001   gst_clock_id_unref (id);
2002   basesrc->clock_id = NULL;
2003
2004   return ret;
2005 }
2006
2007 /* perform synchronisation on a buffer.
2008  * with STREAM_LOCK.
2009  */
2010 static GstClockReturn
2011 gst_base_src_do_sync (GstBaseSrc * basesrc, GstBuffer * buffer)
2012 {
2013   GstClockReturn result;
2014   GstClockTime start, end;
2015   GstBaseSrcClass *bclass;
2016   GstClockTime base_time;
2017   GstClock *clock;
2018   GstClockTime now = GST_CLOCK_TIME_NONE, timestamp;
2019   gboolean do_timestamp, first, pseudo_live;
2020
2021   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2022
2023   start = end = -1;
2024   if (bclass->get_times)
2025     bclass->get_times (basesrc, buffer, &start, &end);
2026
2027   /* get buffer timestamp */
2028   timestamp = GST_BUFFER_TIMESTAMP (buffer);
2029
2030   /* grab the lock to prepare for clocking and calculate the startup
2031    * latency. */
2032   GST_OBJECT_LOCK (basesrc);
2033
2034   /* if we are asked to sync against the clock we are a pseudo live element */
2035   pseudo_live = (start != -1 && basesrc->is_live);
2036   /* check for the first buffer */
2037   first = (basesrc->priv->latency == -1);
2038
2039   if (timestamp != -1 && pseudo_live) {
2040     GstClockTime latency;
2041
2042     /* we have a timestamp and a sync time, latency is the diff */
2043     if (timestamp <= start)
2044       latency = start - timestamp;
2045     else
2046       latency = 0;
2047
2048     if (first) {
2049       GST_DEBUG_OBJECT (basesrc, "pseudo_live with latency %" GST_TIME_FORMAT,
2050           GST_TIME_ARGS (latency));
2051       /* first time we calculate latency, just configure */
2052       basesrc->priv->latency = latency;
2053     } else {
2054       if (basesrc->priv->latency != latency) {
2055         /* we have a new latency, FIXME post latency message */
2056         basesrc->priv->latency = latency;
2057         GST_DEBUG_OBJECT (basesrc, "latency changed to %" GST_TIME_FORMAT,
2058             GST_TIME_ARGS (latency));
2059       }
2060     }
2061   } else if (first) {
2062     GST_DEBUG_OBJECT (basesrc, "no latency needed, live %d, sync %d",
2063         basesrc->is_live, start != -1);
2064     basesrc->priv->latency = 0;
2065   }
2066
2067   /* get clock, if no clock, we can't sync or do timestamps */
2068   if ((clock = GST_ELEMENT_CLOCK (basesrc)) == NULL)
2069     goto no_clock;
2070
2071   base_time = GST_ELEMENT_CAST (basesrc)->base_time;
2072
2073   do_timestamp = basesrc->priv->do_timestamp;
2074
2075   /* first buffer, calculate the timestamp offset */
2076   if (first) {
2077     GstClockTime running_time;
2078
2079     now = gst_clock_get_time (clock);
2080     running_time = now - base_time;
2081
2082     GST_LOG_OBJECT (basesrc,
2083         "startup timestamp: %" GST_TIME_FORMAT ", running_time %"
2084         GST_TIME_FORMAT, GST_TIME_ARGS (timestamp),
2085         GST_TIME_ARGS (running_time));
2086
2087     if (pseudo_live && timestamp != -1) {
2088       /* live source and we need to sync, add startup latency to all timestamps
2089        * to get the real running_time. Live sources should always timestamp
2090        * according to the current running time. */
2091       basesrc->priv->ts_offset = GST_CLOCK_DIFF (timestamp, running_time);
2092
2093       GST_LOG_OBJECT (basesrc, "live with sync, ts_offset %" GST_TIME_FORMAT,
2094           GST_TIME_ARGS (basesrc->priv->ts_offset));
2095     } else {
2096       basesrc->priv->ts_offset = 0;
2097       GST_LOG_OBJECT (basesrc, "no timestamp offset needed");
2098     }
2099
2100     if (!GST_CLOCK_TIME_IS_VALID (timestamp)) {
2101       if (do_timestamp)
2102         timestamp = running_time;
2103       else
2104         timestamp = 0;
2105
2106       GST_BUFFER_TIMESTAMP (buffer) = timestamp;
2107
2108       GST_LOG_OBJECT (basesrc, "created timestamp: %" GST_TIME_FORMAT,
2109           GST_TIME_ARGS (timestamp));
2110     }
2111
2112     /* add the timestamp offset we need for sync */
2113     timestamp += basesrc->priv->ts_offset;
2114   } else {
2115     /* not the first buffer, the timestamp is the diff between the clock and
2116      * base_time */
2117     if (do_timestamp && !GST_CLOCK_TIME_IS_VALID (timestamp)) {
2118       now = gst_clock_get_time (clock);
2119
2120       GST_BUFFER_TIMESTAMP (buffer) = now - base_time;
2121
2122       GST_LOG_OBJECT (basesrc, "created timestamp: %" GST_TIME_FORMAT,
2123           GST_TIME_ARGS (now - base_time));
2124     }
2125   }
2126
2127   /* if we don't have a buffer timestamp, we don't sync */
2128   if (!GST_CLOCK_TIME_IS_VALID (start))
2129     goto no_sync;
2130
2131   if (basesrc->is_live && GST_CLOCK_TIME_IS_VALID (timestamp)) {
2132     /* for pseudo live sources, add our ts_offset to the timestamp */
2133     GST_BUFFER_TIMESTAMP (buffer) += basesrc->priv->ts_offset;
2134     start += basesrc->priv->ts_offset;
2135   }
2136
2137   GST_LOG_OBJECT (basesrc,
2138       "waiting for clock, base time %" GST_TIME_FORMAT
2139       ", stream_start %" GST_TIME_FORMAT,
2140       GST_TIME_ARGS (base_time), GST_TIME_ARGS (start));
2141   GST_OBJECT_UNLOCK (basesrc);
2142
2143   result = gst_base_src_wait (basesrc, clock, start + base_time);
2144
2145   GST_LOG_OBJECT (basesrc, "clock entry done: %d", result);
2146
2147   return result;
2148
2149   /* special cases */
2150 no_clock:
2151   {
2152     GST_DEBUG_OBJECT (basesrc, "we have no clock");
2153     GST_OBJECT_UNLOCK (basesrc);
2154     return GST_CLOCK_OK;
2155   }
2156 no_sync:
2157   {
2158     GST_DEBUG_OBJECT (basesrc, "no sync needed");
2159     GST_OBJECT_UNLOCK (basesrc);
2160     return GST_CLOCK_OK;
2161   }
2162 }
2163
2164 /* Called with STREAM_LOCK and LIVE_LOCK */
2165 static gboolean
2166 gst_base_src_update_length (GstBaseSrc * src, guint64 offset, guint * length)
2167 {
2168   guint64 size, maxsize;
2169   GstBaseSrcClass *bclass;
2170   GstFormat format;
2171   gint64 stop;
2172   gboolean dynamic;
2173
2174   bclass = GST_BASE_SRC_GET_CLASS (src);
2175
2176   format = src->segment.format;
2177   stop = src->segment.stop;
2178   /* get total file size */
2179   size = src->segment.duration;
2180
2181   /* only operate if we are working with bytes */
2182   if (format != GST_FORMAT_BYTES)
2183     return TRUE;
2184
2185   /* the max amount of bytes to read is the total size or
2186    * up to the segment.stop if present. */
2187   if (stop != -1)
2188     maxsize = MIN (size, stop);
2189   else
2190     maxsize = size;
2191
2192   GST_DEBUG_OBJECT (src,
2193       "reading offset %" G_GUINT64_FORMAT ", length %u, size %" G_GINT64_FORMAT
2194       ", segment.stop %" G_GINT64_FORMAT ", maxsize %" G_GINT64_FORMAT, offset,
2195       *length, size, stop, maxsize);
2196
2197   dynamic = g_atomic_int_get (&src->priv->dynamic_size);
2198   GST_DEBUG_OBJECT (src, "dynamic size: %d", dynamic);
2199
2200   /* check size if we have one */
2201   if (maxsize != -1) {
2202     /* if we run past the end, check if the file became bigger and
2203      * retry. */
2204     if (G_UNLIKELY (offset + *length >= maxsize || dynamic)) {
2205       /* see if length of the file changed */
2206       if (bclass->get_size)
2207         if (!bclass->get_size (src, &size))
2208           size = -1;
2209
2210       /* make sure we don't exceed the configured segment stop
2211        * if it was set */
2212       if (stop != -1)
2213         maxsize = MIN (size, stop);
2214       else
2215         maxsize = size;
2216
2217       /* if we are at or past the end, EOS */
2218       if (G_UNLIKELY (offset >= maxsize))
2219         goto unexpected_length;
2220
2221       /* else we can clip to the end */
2222       if (G_UNLIKELY (offset + *length >= maxsize))
2223         *length = maxsize - offset;
2224
2225     }
2226   }
2227
2228   /* keep track of current duration.
2229    * segment is in bytes, we checked that above. */
2230   GST_OBJECT_LOCK (src);
2231   src->segment.duration = size;
2232   GST_OBJECT_UNLOCK (src);
2233
2234   return TRUE;
2235
2236   /* ERRORS */
2237 unexpected_length:
2238   {
2239     return FALSE;
2240   }
2241 }
2242
2243 /* must be called with LIVE_LOCK */
2244 static GstFlowReturn
2245 gst_base_src_get_range (GstBaseSrc * src, guint64 offset, guint length,
2246     GstBuffer ** buf)
2247 {
2248   GstFlowReturn ret;
2249   GstBaseSrcClass *bclass;
2250   GstClockReturn status;
2251   GstBuffer *res_buf;
2252
2253   bclass = GST_BASE_SRC_GET_CLASS (src);
2254
2255 again:
2256   if (src->is_live) {
2257     if (G_UNLIKELY (!src->live_running)) {
2258       ret = gst_base_src_wait_playing (src);
2259       if (ret != GST_FLOW_OK)
2260         goto stopped;
2261     }
2262   }
2263
2264   if (G_UNLIKELY (!GST_BASE_SRC_IS_STARTED (src)
2265           && !GST_BASE_SRC_IS_STARTING (src)))
2266     goto not_started;
2267
2268   if (G_UNLIKELY (!bclass->create))
2269     goto no_function;
2270
2271   if (G_UNLIKELY (!gst_base_src_update_length (src, offset, &length)))
2272     goto unexpected_length;
2273
2274   /* track position */
2275   GST_OBJECT_LOCK (src);
2276   if (src->segment.format == GST_FORMAT_BYTES)
2277     src->segment.position = offset;
2278   GST_OBJECT_UNLOCK (src);
2279
2280   /* normally we don't count buffers */
2281   if (G_UNLIKELY (src->num_buffers_left >= 0)) {
2282     if (src->num_buffers_left == 0)
2283       goto reached_num_buffers;
2284     else
2285       src->num_buffers_left--;
2286   }
2287
2288   /* don't enter the create function if a pending EOS event was set. For the
2289    * logic of the pending_eos, check the event function of this class. */
2290   if (G_UNLIKELY (g_atomic_int_get (&src->priv->pending_eos)))
2291     goto eos;
2292
2293   GST_DEBUG_OBJECT (src,
2294       "calling create offset %" G_GUINT64_FORMAT " length %u, time %"
2295       G_GINT64_FORMAT, offset, length, src->segment.time);
2296
2297   res_buf = *buf;
2298
2299   ret = bclass->create (src, offset, length, &res_buf);
2300
2301   /* The create function could be unlocked because we have a pending EOS. It's
2302    * possible that we have a valid buffer from create that we need to
2303    * discard when the create function returned _OK. */
2304   if (G_UNLIKELY (g_atomic_int_get (&src->priv->pending_eos))) {
2305     if (ret == GST_FLOW_OK) {
2306       if (*buf == NULL)
2307         gst_buffer_unref (res_buf);
2308     }
2309     goto eos;
2310   }
2311
2312   if (G_UNLIKELY (ret != GST_FLOW_OK))
2313     goto not_ok;
2314
2315   /* no timestamp set and we are at offset 0, we can timestamp with 0 */
2316   if (offset == 0 && src->segment.time == 0
2317       && GST_BUFFER_TIMESTAMP (res_buf) == -1 && !src->is_live) {
2318     GST_DEBUG_OBJECT (src, "setting first timestamp to 0");
2319     res_buf = gst_buffer_make_writable (res_buf);
2320     GST_BUFFER_TIMESTAMP (res_buf) = 0;
2321   }
2322
2323   /* now sync before pushing the buffer */
2324   status = gst_base_src_do_sync (src, res_buf);
2325
2326   /* waiting for the clock could have made us flushing */
2327   if (G_UNLIKELY (src->priv->flushing))
2328     goto flushing;
2329
2330   switch (status) {
2331     case GST_CLOCK_EARLY:
2332       /* the buffer is too late. We currently don't drop the buffer. */
2333       GST_DEBUG_OBJECT (src, "buffer too late!, returning anyway");
2334       break;
2335     case GST_CLOCK_OK:
2336       /* buffer synchronised properly */
2337       GST_DEBUG_OBJECT (src, "buffer ok");
2338       break;
2339     case GST_CLOCK_UNSCHEDULED:
2340       /* this case is triggered when we were waiting for the clock and
2341        * it got unlocked because we did a state change. In any case, get rid of
2342        * the buffer. */
2343       if (*buf == NULL)
2344         gst_buffer_unref (res_buf);
2345
2346       if (!src->live_running) {
2347         /* We return WRONG_STATE when we are not running to stop the dataflow also
2348          * get rid of the produced buffer. */
2349         GST_DEBUG_OBJECT (src,
2350             "clock was unscheduled (%d), returning WRONG_STATE", status);
2351         ret = GST_FLOW_FLUSHING;
2352       } else {
2353         /* If we are running when this happens, we quickly switched between
2354          * pause and playing. We try to produce a new buffer */
2355         GST_DEBUG_OBJECT (src,
2356             "clock was unscheduled (%d), but we are running", status);
2357         goto again;
2358       }
2359       break;
2360     default:
2361       /* all other result values are unexpected and errors */
2362       GST_ELEMENT_ERROR (src, CORE, CLOCK,
2363           (_("Internal clock error.")),
2364           ("clock returned unexpected return value %d", status));
2365       if (*buf == NULL)
2366         gst_buffer_unref (res_buf);
2367       ret = GST_FLOW_ERROR;
2368       break;
2369   }
2370   if (G_LIKELY (ret == GST_FLOW_OK))
2371     *buf = res_buf;
2372
2373   return ret;
2374
2375   /* ERROR */
2376 stopped:
2377   {
2378     GST_DEBUG_OBJECT (src, "wait_playing returned %d (%s)", ret,
2379         gst_flow_get_name (ret));
2380     return ret;
2381   }
2382 not_ok:
2383   {
2384     GST_DEBUG_OBJECT (src, "create returned %d (%s)", ret,
2385         gst_flow_get_name (ret));
2386     return ret;
2387   }
2388 not_started:
2389   {
2390     GST_DEBUG_OBJECT (src, "getrange but not started");
2391     return GST_FLOW_FLUSHING;
2392   }
2393 no_function:
2394   {
2395     GST_DEBUG_OBJECT (src, "no create function");
2396     return GST_FLOW_NOT_SUPPORTED;
2397   }
2398 unexpected_length:
2399   {
2400     GST_DEBUG_OBJECT (src, "unexpected length %u (offset=%" G_GUINT64_FORMAT
2401         ", size=%" G_GINT64_FORMAT ")", length, offset, src->segment.duration);
2402     return GST_FLOW_EOS;
2403   }
2404 reached_num_buffers:
2405   {
2406     GST_DEBUG_OBJECT (src, "sent all buffers");
2407     return GST_FLOW_EOS;
2408   }
2409 flushing:
2410   {
2411     GST_DEBUG_OBJECT (src, "we are flushing");
2412     if (*buf == NULL)
2413       gst_buffer_unref (res_buf);
2414     return GST_FLOW_FLUSHING;
2415   }
2416 eos:
2417   {
2418     GST_DEBUG_OBJECT (src, "we are EOS");
2419     return GST_FLOW_EOS;
2420   }
2421 }
2422
2423 static GstFlowReturn
2424 gst_base_src_getrange (GstPad * pad, GstObject * parent, guint64 offset,
2425     guint length, GstBuffer ** buf)
2426 {
2427   GstBaseSrc *src;
2428   GstFlowReturn res;
2429
2430   src = GST_BASE_SRC_CAST (parent);
2431
2432   GST_LIVE_LOCK (src);
2433   if (G_UNLIKELY (src->priv->flushing))
2434     goto flushing;
2435
2436   res = gst_base_src_get_range (src, offset, length, buf);
2437
2438 done:
2439   GST_LIVE_UNLOCK (src);
2440
2441   return res;
2442
2443   /* ERRORS */
2444 flushing:
2445   {
2446     GST_DEBUG_OBJECT (src, "we are flushing");
2447     res = GST_FLOW_FLUSHING;
2448     goto done;
2449   }
2450 }
2451
2452 static gboolean
2453 gst_base_src_is_random_access (GstBaseSrc * src)
2454 {
2455   /* we need to start the basesrc to check random access */
2456   if (!GST_BASE_SRC_IS_STARTED (src)) {
2457     GST_LOG_OBJECT (src, "doing start/stop to check get_range support");
2458     if (G_LIKELY (gst_base_src_start (src))) {
2459       if (gst_base_src_start_wait (src) != GST_FLOW_OK)
2460         goto start_failed;
2461       gst_base_src_stop (src);
2462     }
2463   }
2464
2465   return src->random_access;
2466
2467   /* ERRORS */
2468 start_failed:
2469   {
2470     GST_DEBUG_OBJECT (src, "failed to start");
2471     return FALSE;
2472   }
2473 }
2474
2475 static void
2476 gst_base_src_loop (GstPad * pad)
2477 {
2478   GstBaseSrc *src;
2479   GstBuffer *buf = NULL;
2480   GstFlowReturn ret;
2481   gint64 position;
2482   gboolean eos;
2483   guint blocksize;
2484   GList *pending_events = NULL, *tmp;
2485
2486   eos = FALSE;
2487
2488   src = GST_BASE_SRC (GST_OBJECT_PARENT (pad));
2489
2490   gst_base_src_send_stream_start (src);
2491
2492   /* check if we need to renegotiate */
2493   if (gst_pad_check_reconfigure (pad)) {
2494     if (!gst_base_src_negotiate (src))
2495       goto not_negotiated;
2496   }
2497
2498   GST_LIVE_LOCK (src);
2499
2500   if (G_UNLIKELY (src->priv->flushing))
2501     goto flushing;
2502
2503   blocksize = src->blocksize;
2504
2505   /* if we operate in bytes, we can calculate an offset */
2506   if (src->segment.format == GST_FORMAT_BYTES) {
2507     position = src->segment.position;
2508     /* for negative rates, start with subtracting the blocksize */
2509     if (src->segment.rate < 0.0) {
2510       /* we cannot go below segment.start */
2511       if (position > src->segment.start + blocksize)
2512         position -= blocksize;
2513       else {
2514         /* last block, remainder up to segment.start */
2515         blocksize = position - src->segment.start;
2516         position = src->segment.start;
2517       }
2518     }
2519   } else
2520     position = -1;
2521
2522   GST_LOG_OBJECT (src, "next_ts %" GST_TIME_FORMAT " size %u",
2523       GST_TIME_ARGS (position), blocksize);
2524
2525   ret = gst_base_src_get_range (src, position, blocksize, &buf);
2526   if (G_UNLIKELY (ret != GST_FLOW_OK)) {
2527     GST_INFO_OBJECT (src, "pausing after gst_base_src_get_range() = %s",
2528         gst_flow_get_name (ret));
2529     GST_LIVE_UNLOCK (src);
2530     goto pause;
2531   }
2532   /* this should not happen */
2533   if (G_UNLIKELY (buf == NULL))
2534     goto null_buffer;
2535
2536   /* push events to close/start our segment before we push the buffer. */
2537   if (G_UNLIKELY (src->priv->segment_pending)) {
2538     gst_pad_push_event (pad, gst_event_new_segment (&src->segment));
2539     src->priv->segment_pending = FALSE;
2540   }
2541
2542   if (g_atomic_int_get (&src->priv->have_events)) {
2543     GST_OBJECT_LOCK (src);
2544     /* take the events */
2545     pending_events = src->priv->pending_events;
2546     src->priv->pending_events = NULL;
2547     g_atomic_int_set (&src->priv->have_events, FALSE);
2548     GST_OBJECT_UNLOCK (src);
2549   }
2550
2551   /* Push out pending events if any */
2552   if (G_UNLIKELY (pending_events != NULL)) {
2553     for (tmp = pending_events; tmp; tmp = g_list_next (tmp)) {
2554       GstEvent *ev = (GstEvent *) tmp->data;
2555       gst_pad_push_event (pad, ev);
2556     }
2557     g_list_free (pending_events);
2558   }
2559
2560   /* figure out the new position */
2561   switch (src->segment.format) {
2562     case GST_FORMAT_BYTES:
2563     {
2564       guint bufsize = gst_buffer_get_size (buf);
2565
2566       /* we subtracted above for negative rates */
2567       if (src->segment.rate >= 0.0)
2568         position += bufsize;
2569       break;
2570     }
2571     case GST_FORMAT_TIME:
2572     {
2573       GstClockTime start, duration;
2574
2575       start = GST_BUFFER_TIMESTAMP (buf);
2576       duration = GST_BUFFER_DURATION (buf);
2577
2578       if (GST_CLOCK_TIME_IS_VALID (start))
2579         position = start;
2580       else
2581         position = src->segment.position;
2582
2583       if (GST_CLOCK_TIME_IS_VALID (duration)) {
2584         if (src->segment.rate >= 0.0)
2585           position += duration;
2586         else if (position > duration)
2587           position -= duration;
2588         else
2589           position = 0;
2590       }
2591       break;
2592     }
2593     case GST_FORMAT_DEFAULT:
2594       if (src->segment.rate >= 0.0)
2595         position = GST_BUFFER_OFFSET_END (buf);
2596       else
2597         position = GST_BUFFER_OFFSET (buf);
2598       break;
2599     default:
2600       position = -1;
2601       break;
2602   }
2603   if (position != -1) {
2604     if (src->segment.rate >= 0.0) {
2605       /* positive rate, check if we reached the stop */
2606       if (src->segment.stop != -1) {
2607         if (position >= src->segment.stop) {
2608           eos = TRUE;
2609           position = src->segment.stop;
2610         }
2611       }
2612     } else {
2613       /* negative rate, check if we reached the start. start is always set to
2614        * something different from -1 */
2615       if (position <= src->segment.start) {
2616         eos = TRUE;
2617         position = src->segment.start;
2618       }
2619       /* when going reverse, all buffers are DISCONT */
2620       src->priv->discont = TRUE;
2621     }
2622     GST_OBJECT_LOCK (src);
2623     src->segment.position = position;
2624     GST_OBJECT_UNLOCK (src);
2625   }
2626
2627   if (G_UNLIKELY (src->priv->discont)) {
2628     GST_INFO_OBJECT (src, "marking pending DISCONT");
2629     buf = gst_buffer_make_writable (buf);
2630     GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_DISCONT);
2631     src->priv->discont = FALSE;
2632   }
2633   GST_LIVE_UNLOCK (src);
2634
2635   ret = gst_pad_push (pad, buf);
2636   if (G_UNLIKELY (ret != GST_FLOW_OK)) {
2637     GST_INFO_OBJECT (src, "pausing after gst_pad_push() = %s",
2638         gst_flow_get_name (ret));
2639     goto pause;
2640   }
2641
2642   if (G_UNLIKELY (eos)) {
2643     GST_INFO_OBJECT (src, "pausing after end of segment");
2644     ret = GST_FLOW_EOS;
2645     goto pause;
2646   }
2647
2648 done:
2649   return;
2650
2651   /* special cases */
2652 not_negotiated:
2653   {
2654     GST_DEBUG_OBJECT (src, "Failed to renegotiate");
2655     ret = GST_FLOW_NOT_NEGOTIATED;
2656     goto pause;
2657   }
2658 flushing:
2659   {
2660     GST_DEBUG_OBJECT (src, "we are flushing");
2661     GST_LIVE_UNLOCK (src);
2662     ret = GST_FLOW_FLUSHING;
2663     goto pause;
2664   }
2665 pause:
2666   {
2667     const gchar *reason = gst_flow_get_name (ret);
2668     GstEvent *event;
2669
2670     GST_DEBUG_OBJECT (src, "pausing task, reason %s", reason);
2671     src->running = FALSE;
2672     gst_pad_pause_task (pad);
2673     if (ret == GST_FLOW_EOS) {
2674       gboolean flag_segment;
2675       GstFormat format;
2676       gint64 position;
2677
2678       /* perform EOS logic */
2679       flag_segment = (src->segment.flags & GST_SEEK_FLAG_SEGMENT) != 0;
2680       format = src->segment.format;
2681       position = src->segment.position;
2682
2683       if (flag_segment) {
2684         GstMessage *message;
2685
2686         message = gst_message_new_segment_done (GST_OBJECT_CAST (src),
2687             format, position);
2688         gst_message_set_seqnum (message, src->priv->seqnum);
2689         gst_element_post_message (GST_ELEMENT_CAST (src), message);
2690       } else {
2691         event = gst_event_new_eos ();
2692         gst_event_set_seqnum (event, src->priv->seqnum);
2693         gst_pad_push_event (pad, event);
2694       }
2695     } else if (ret == GST_FLOW_NOT_LINKED || ret <= GST_FLOW_EOS) {
2696       event = gst_event_new_eos ();
2697       gst_event_set_seqnum (event, src->priv->seqnum);
2698       /* for fatal errors we post an error message, post the error
2699        * first so the app knows about the error first.
2700        * Also don't do this for WRONG_STATE because it happens
2701        * due to flushing and posting an error message because of
2702        * that is the wrong thing to do, e.g. when we're doing
2703        * a flushing seek. */
2704       GST_ELEMENT_ERROR (src, STREAM, FAILED,
2705           (_("Internal data flow error.")),
2706           ("streaming task paused, reason %s (%d)", reason, ret));
2707       gst_pad_push_event (pad, event);
2708     }
2709     goto done;
2710   }
2711 null_buffer:
2712   {
2713     GST_ELEMENT_ERROR (src, STREAM, FAILED,
2714         (_("Internal data flow error.")), ("element returned NULL buffer"));
2715     GST_LIVE_UNLOCK (src);
2716     goto done;
2717   }
2718 }
2719
2720 static gboolean
2721 gst_base_src_set_allocation (GstBaseSrc * basesrc, GstBufferPool * pool,
2722     GstAllocator * allocator, GstAllocationParams * params)
2723 {
2724   GstAllocator *oldalloc;
2725   GstBufferPool *oldpool;
2726   GstBaseSrcPrivate *priv = basesrc->priv;
2727
2728   if (pool) {
2729     GST_DEBUG_OBJECT (basesrc, "activate pool");
2730     if (!gst_buffer_pool_set_active (pool, TRUE))
2731       goto activate_failed;
2732   }
2733
2734   GST_OBJECT_LOCK (basesrc);
2735   oldpool = priv->pool;
2736   priv->pool = pool;
2737
2738   oldalloc = priv->allocator;
2739   priv->allocator = allocator;
2740
2741   if (params)
2742     priv->params = *params;
2743   else
2744     gst_allocation_params_init (&priv->params);
2745   GST_OBJECT_UNLOCK (basesrc);
2746
2747   if (oldpool) {
2748     /* only deactivate if the pool is not the one we're using */
2749     if (oldpool != pool) {
2750       GST_DEBUG_OBJECT (basesrc, "deactivate old pool");
2751       gst_buffer_pool_set_active (oldpool, FALSE);
2752     }
2753     gst_object_unref (oldpool);
2754   }
2755   if (oldalloc) {
2756     gst_allocator_unref (oldalloc);
2757   }
2758   return TRUE;
2759
2760   /* ERRORS */
2761 activate_failed:
2762   {
2763     GST_ERROR_OBJECT (basesrc, "failed to activate bufferpool.");
2764     return FALSE;
2765   }
2766 }
2767
2768 static gboolean
2769 gst_base_src_activate_pool (GstBaseSrc * basesrc, gboolean active)
2770 {
2771   GstBaseSrcPrivate *priv = basesrc->priv;
2772   GstBufferPool *pool;
2773   gboolean res = TRUE;
2774
2775   GST_OBJECT_LOCK (basesrc);
2776   if ((pool = priv->pool))
2777     pool = gst_object_ref (pool);
2778   GST_OBJECT_UNLOCK (basesrc);
2779
2780   if (pool) {
2781     res = gst_buffer_pool_set_active (pool, active);
2782     gst_object_unref (pool);
2783   }
2784   return res;
2785 }
2786
2787 static gboolean
2788 gst_base_src_prepare_allocation (GstBaseSrc * basesrc, GstCaps * caps)
2789 {
2790   GstBaseSrcClass *bclass;
2791   gboolean result = TRUE;
2792   GstQuery *query;
2793   GstBufferPool *pool = NULL;
2794   GstAllocator *allocator = NULL;
2795   guint size, min, max;
2796   GstAllocationParams params;
2797
2798   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2799
2800   /* make query and let peer pad answer, we don't really care if it worked or
2801    * not, if it failed, the allocation query would contain defaults and the
2802    * subclass would then set better values if needed */
2803   query = gst_query_new_allocation (caps, TRUE);
2804   if (!gst_pad_peer_query (basesrc->srcpad, query)) {
2805     /* not a problem, just debug a little */
2806     GST_DEBUG_OBJECT (basesrc, "peer ALLOCATION query failed");
2807   }
2808
2809   if (G_LIKELY (bclass->decide_allocation))
2810     result = bclass->decide_allocation (basesrc, query);
2811
2812   GST_DEBUG_OBJECT (basesrc, "ALLOCATION (%d) params: %" GST_PTR_FORMAT, result,
2813       query);
2814
2815   if (gst_query_get_n_allocation_params (query) > 0) {
2816     /* try the allocator */
2817     gst_query_parse_nth_allocation_param (query, 0, &allocator, &params);
2818   } else {
2819     allocator = NULL;
2820     gst_allocation_params_init (&params);
2821   }
2822
2823   if (gst_query_get_n_allocation_pools (query) > 0) {
2824     gst_query_parse_nth_allocation_pool (query, 0, &pool, &size, &min, &max);
2825
2826     if (pool == NULL) {
2827       /* no pool, just parameters, we can make our own */
2828       GST_DEBUG_OBJECT (basesrc, "no pool, making new pool");
2829       pool = gst_buffer_pool_new ();
2830     }
2831   } else {
2832     pool = NULL;
2833     size = min = max = 0;
2834   }
2835
2836   /* now configure */
2837   if (pool) {
2838     GstStructure *config;
2839
2840     config = gst_buffer_pool_get_config (pool);
2841     gst_buffer_pool_config_set_params (config, caps, size, min, max);
2842     gst_buffer_pool_config_set_allocator (config, allocator, &params);
2843     gst_buffer_pool_set_config (pool, config);
2844   }
2845
2846   result = gst_base_src_set_allocation (basesrc, pool, allocator, &params);
2847
2848   gst_query_unref (query);
2849
2850   return result;
2851
2852 }
2853
2854 /* default negotiation code.
2855  *
2856  * Take intersection between src and sink pads, take first
2857  * caps and fixate.
2858  */
2859 static gboolean
2860 gst_base_src_default_negotiate (GstBaseSrc * basesrc)
2861 {
2862   GstCaps *thiscaps;
2863   GstCaps *caps = NULL;
2864   GstCaps *peercaps = NULL;
2865   gboolean result = FALSE;
2866
2867   /* first see what is possible on our source pad */
2868   thiscaps = gst_pad_query_caps (GST_BASE_SRC_PAD (basesrc), NULL);
2869   GST_DEBUG_OBJECT (basesrc, "caps of src: %" GST_PTR_FORMAT, thiscaps);
2870   /* nothing or anything is allowed, we're done */
2871   if (thiscaps == NULL || gst_caps_is_any (thiscaps))
2872     goto no_nego_needed;
2873
2874   if (G_UNLIKELY (gst_caps_is_empty (thiscaps)))
2875     goto no_caps;
2876
2877   /* get the peer caps */
2878   peercaps = gst_pad_peer_query_caps (GST_BASE_SRC_PAD (basesrc), thiscaps);
2879   GST_DEBUG_OBJECT (basesrc, "caps of peer: %" GST_PTR_FORMAT, peercaps);
2880   if (peercaps) {
2881     /* The result is already a subset of our caps */
2882     caps = peercaps;
2883     gst_caps_unref (thiscaps);
2884   } else {
2885     /* no peer, work with our own caps then */
2886     caps = thiscaps;
2887   }
2888   if (caps && !gst_caps_is_empty (caps)) {
2889     /* now fixate */
2890     GST_DEBUG_OBJECT (basesrc, "have caps: %" GST_PTR_FORMAT, caps);
2891     if (gst_caps_is_any (caps)) {
2892       GST_DEBUG_OBJECT (basesrc, "any caps, we stop");
2893       /* hmm, still anything, so element can do anything and
2894        * nego is not needed */
2895       result = TRUE;
2896     } else {
2897       caps = gst_base_src_fixate (basesrc, caps);
2898       GST_DEBUG_OBJECT (basesrc, "fixated to: %" GST_PTR_FORMAT, caps);
2899       if (gst_caps_is_fixed (caps)) {
2900         /* yay, fixed caps, use those then, it's possible that the subclass does
2901          * not accept this caps after all and we have to fail. */
2902         result = gst_base_src_set_caps (basesrc, caps);
2903       }
2904     }
2905     gst_caps_unref (caps);
2906   } else {
2907     if (caps)
2908       gst_caps_unref (caps);
2909     GST_DEBUG_OBJECT (basesrc, "no common caps");
2910   }
2911   return result;
2912
2913 no_nego_needed:
2914   {
2915     GST_DEBUG_OBJECT (basesrc, "no negotiation needed");
2916     if (thiscaps)
2917       gst_caps_unref (thiscaps);
2918     return TRUE;
2919   }
2920 no_caps:
2921   {
2922     GST_ELEMENT_ERROR (basesrc, STREAM, FORMAT,
2923         ("No supported formats found"),
2924         ("This element did not produce valid caps"));
2925     if (thiscaps)
2926       gst_caps_unref (thiscaps);
2927     return TRUE;
2928   }
2929 }
2930
2931 static gboolean
2932 gst_base_src_negotiate (GstBaseSrc * basesrc)
2933 {
2934   GstBaseSrcClass *bclass;
2935   gboolean result;
2936
2937   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2938
2939   GST_DEBUG_OBJECT (basesrc, "starting negotiation");
2940
2941   if (G_LIKELY (bclass->negotiate))
2942     result = bclass->negotiate (basesrc);
2943   else
2944     result = TRUE;
2945
2946   if (G_LIKELY (result)) {
2947     GstCaps *caps;
2948
2949     caps = gst_pad_get_current_caps (basesrc->srcpad);
2950
2951     result = gst_base_src_prepare_allocation (basesrc, caps);
2952
2953     if (caps)
2954       gst_caps_unref (caps);
2955   }
2956   return result;
2957 }
2958
2959 static gboolean
2960 gst_base_src_start (GstBaseSrc * basesrc)
2961 {
2962   GstBaseSrcClass *bclass;
2963   gboolean result;
2964
2965   GST_LIVE_LOCK (basesrc);
2966   if (GST_BASE_SRC_IS_STARTING (basesrc))
2967     goto was_starting;
2968   if (GST_BASE_SRC_IS_STARTED (basesrc))
2969     goto was_started;
2970
2971   basesrc->priv->start_result = GST_FLOW_FLUSHING;
2972   GST_OBJECT_FLAG_SET (basesrc, GST_BASE_SRC_FLAG_STARTING);
2973   basesrc->num_buffers_left = basesrc->num_buffers;
2974   basesrc->running = FALSE;
2975   basesrc->priv->segment_pending = FALSE;
2976   GST_OBJECT_LOCK (basesrc);
2977   gst_segment_init (&basesrc->segment, basesrc->segment.format);
2978   GST_OBJECT_UNLOCK (basesrc);
2979   GST_LIVE_UNLOCK (basesrc);
2980
2981   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2982   if (bclass->start)
2983     result = bclass->start (basesrc);
2984   else
2985     result = TRUE;
2986
2987   if (!result)
2988     goto could_not_start;
2989
2990   if (!gst_base_src_is_async (basesrc))
2991     gst_base_src_start_complete (basesrc, GST_FLOW_OK);
2992
2993   return result;
2994
2995   /* ERROR */
2996 was_starting:
2997   {
2998     GST_DEBUG_OBJECT (basesrc, "was starting");
2999     GST_LIVE_UNLOCK (basesrc);
3000     return TRUE;
3001   }
3002 was_started:
3003   {
3004     GST_DEBUG_OBJECT (basesrc, "was started");
3005     GST_LIVE_UNLOCK (basesrc);
3006     return TRUE;
3007   }
3008 could_not_start:
3009   {
3010     GST_DEBUG_OBJECT (basesrc, "could not start");
3011     /* subclass is supposed to post a message. We don't have to call _stop. */
3012     gst_base_src_start_complete (basesrc, GST_FLOW_ERROR);
3013     return FALSE;
3014   }
3015 }
3016
3017 /**
3018  * gst_base_src_start_complete:
3019  * @src: base source instance
3020  * @ret: a #GstFlowReturn
3021  *
3022  * Complete an asynchronous start operation. When the subclass overrides the
3023  * start method, it should call gst_base_src_start_complete() when the start
3024  * operation completes either from the same thread or from an asynchronous
3025  * helper thread.
3026  */
3027 void
3028 gst_base_src_start_complete (GstBaseSrc * basesrc, GstFlowReturn ret)
3029 {
3030   gboolean have_size;
3031   guint64 size;
3032   gboolean seekable;
3033   GstFormat format;
3034   GstPadMode mode;
3035   GstEvent *event;
3036
3037   if (ret != GST_FLOW_OK)
3038     goto error;
3039
3040   GST_DEBUG_OBJECT (basesrc, "starting source");
3041   format = basesrc->segment.format;
3042
3043   /* figure out the size */
3044   have_size = FALSE;
3045   size = -1;
3046   if (format == GST_FORMAT_BYTES) {
3047     GstBaseSrcClass *bclass = GST_BASE_SRC_GET_CLASS (basesrc);
3048
3049     if (bclass->get_size) {
3050       if (!(have_size = bclass->get_size (basesrc, &size)))
3051         size = -1;
3052     }
3053     GST_DEBUG_OBJECT (basesrc, "setting size %" G_GUINT64_FORMAT, size);
3054     /* only update the size when operating in bytes, subclass is supposed
3055      * to set duration in the start method for other formats */
3056     GST_OBJECT_LOCK (basesrc);
3057     basesrc->segment.duration = size;
3058     GST_OBJECT_UNLOCK (basesrc);
3059   }
3060
3061   GST_DEBUG_OBJECT (basesrc,
3062       "format: %s, have size: %d, size: %" G_GUINT64_FORMAT ", duration: %"
3063       G_GINT64_FORMAT, gst_format_get_name (format), have_size, size,
3064       basesrc->segment.duration);
3065
3066   seekable = gst_base_src_seekable (basesrc);
3067   GST_DEBUG_OBJECT (basesrc, "is seekable: %d", seekable);
3068
3069   /* update for random access flag */
3070   basesrc->random_access = seekable && format == GST_FORMAT_BYTES;
3071
3072   GST_DEBUG_OBJECT (basesrc, "is random_access: %d", basesrc->random_access);
3073
3074   /* stop flushing now but for live sources, still block in the LIVE lock when
3075    * we are not yet PLAYING */
3076   gst_base_src_set_flushing (basesrc, FALSE, FALSE, FALSE, NULL);
3077
3078   gst_pad_mark_reconfigure (GST_BASE_SRC_PAD (basesrc));
3079
3080   GST_OBJECT_LOCK (basesrc->srcpad);
3081   mode = GST_PAD_MODE (basesrc->srcpad);
3082   GST_OBJECT_UNLOCK (basesrc->srcpad);
3083
3084   if (mode == GST_PAD_MODE_PUSH) {
3085     /* do initial seek, which will start the task */
3086     GST_OBJECT_LOCK (basesrc);
3087     event = basesrc->pending_seek;
3088     basesrc->pending_seek = NULL;
3089     GST_OBJECT_UNLOCK (basesrc);
3090
3091     /* no need to unlock anything, the task is certainly
3092      * not running here. The perform seek code will start the task when
3093      * finished. */
3094     if (G_UNLIKELY (!gst_base_src_perform_seek (basesrc, event, FALSE)))
3095       goto seek_failed;
3096
3097     if (event)
3098       gst_event_unref (event);
3099   } else {
3100     /* if not random_access, we cannot operate in pull mode for now */
3101     if (G_UNLIKELY (!basesrc->random_access))
3102       goto no_get_range;
3103   }
3104
3105   GST_LIVE_LOCK (basesrc);
3106   GST_OBJECT_FLAG_SET (basesrc, GST_BASE_SRC_FLAG_STARTED);
3107   GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTING);
3108   basesrc->priv->start_result = ret;
3109   GST_LIVE_SIGNAL (basesrc);
3110   GST_LIVE_UNLOCK (basesrc);
3111
3112   return;
3113
3114 seek_failed:
3115   {
3116     GST_ERROR_OBJECT (basesrc, "Failed to perform initial seek");
3117     gst_base_src_set_flushing (basesrc, TRUE, FALSE, TRUE, NULL);
3118     if (event)
3119       gst_event_unref (event);
3120     ret = GST_FLOW_ERROR;
3121     goto error;
3122   }
3123 no_get_range:
3124   {
3125     gst_base_src_set_flushing (basesrc, TRUE, FALSE, TRUE, NULL);
3126     GST_ERROR_OBJECT (basesrc, "Cannot operate in pull mode, stopping");
3127     ret = GST_FLOW_ERROR;
3128     goto error;
3129   }
3130 error:
3131   {
3132     GST_LIVE_LOCK (basesrc);
3133     basesrc->priv->start_result = ret;
3134     GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTING);
3135     GST_LIVE_SIGNAL (basesrc);
3136     GST_LIVE_UNLOCK (basesrc);
3137     return;
3138   }
3139 }
3140
3141 /**
3142  * gst_base_src_start_wait:
3143  * @src: base source instance
3144  * @ret: a #GstFlowReturn
3145  *
3146  * Wait until the start operation completes.
3147  *
3148  * Returns: a #GstFlowReturn.
3149  */
3150 GstFlowReturn
3151 gst_base_src_start_wait (GstBaseSrc * basesrc)
3152 {
3153   GstFlowReturn result;
3154
3155   GST_LIVE_LOCK (basesrc);
3156   if (G_UNLIKELY (basesrc->priv->flushing))
3157     goto flushing;
3158
3159   while (GST_BASE_SRC_IS_STARTING (basesrc)) {
3160     GST_LIVE_WAIT (basesrc);
3161     if (G_UNLIKELY (basesrc->priv->flushing))
3162       goto flushing;
3163   }
3164   result = basesrc->priv->start_result;
3165   GST_LIVE_UNLOCK (basesrc);
3166
3167   return result;
3168
3169   /* ERRORS */
3170 flushing:
3171   {
3172     GST_DEBUG_OBJECT (basesrc, "we are flushing");
3173     GST_LIVE_UNLOCK (basesrc);
3174     return GST_FLOW_FLUSHING;
3175   }
3176 }
3177
3178 static gboolean
3179 gst_base_src_stop (GstBaseSrc * basesrc)
3180 {
3181   GstBaseSrcClass *bclass;
3182   gboolean result = TRUE;
3183
3184   GST_DEBUG_OBJECT (basesrc, "stopping source");
3185
3186   /* flush all */
3187   gst_base_src_set_flushing (basesrc, TRUE, FALSE, TRUE, NULL);
3188   /* stop the task */
3189   gst_pad_stop_task (basesrc->srcpad);
3190
3191   GST_LIVE_LOCK (basesrc);
3192   if (!GST_BASE_SRC_IS_STARTED (basesrc) && !GST_BASE_SRC_IS_STARTING (basesrc))
3193     goto was_stopped;
3194
3195   GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTING);
3196   GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTED);
3197   basesrc->priv->start_result = GST_FLOW_FLUSHING;
3198   GST_LIVE_SIGNAL (basesrc);
3199   GST_LIVE_UNLOCK (basesrc);
3200
3201   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
3202   if (bclass->stop)
3203     result = bclass->stop (basesrc);
3204
3205   gst_base_src_set_allocation (basesrc, NULL, NULL, NULL);
3206
3207   return result;
3208
3209 was_stopped:
3210   {
3211     GST_DEBUG_OBJECT (basesrc, "was started");
3212     GST_LIVE_UNLOCK (basesrc);
3213     return TRUE;
3214   }
3215 }
3216
3217 /* start or stop flushing dataprocessing
3218  */
3219 static gboolean
3220 gst_base_src_set_flushing (GstBaseSrc * basesrc,
3221     gboolean flushing, gboolean live_play, gboolean unlock, gboolean * playing)
3222 {
3223   GstBaseSrcClass *bclass;
3224
3225   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
3226
3227   if (flushing && unlock) {
3228     gst_base_src_activate_pool (basesrc, FALSE);
3229     /* unlock any subclasses, we need to do this before grabbing the
3230      * LIVE_LOCK since we hold this lock before going into ::create. We pass an
3231      * unlock to the params because of backwards compat (see seek handler)*/
3232     if (bclass->unlock)
3233       bclass->unlock (basesrc);
3234   }
3235
3236   /* the live lock is released when we are blocked, waiting for playing or
3237    * when we sync to the clock. */
3238   GST_LIVE_LOCK (basesrc);
3239   if (playing)
3240     *playing = basesrc->live_running;
3241   basesrc->priv->flushing = flushing;
3242   if (flushing) {
3243     /* if we are locked in the live lock, signal it to make it flush */
3244     basesrc->live_running = TRUE;
3245
3246     /* clear pending EOS if any */
3247     g_atomic_int_set (&basesrc->priv->pending_eos, FALSE);
3248
3249     /* step 1, now that we have the LIVE lock, clear our unlock request */
3250     if (bclass->unlock_stop)
3251       bclass->unlock_stop (basesrc);
3252
3253     /* step 2, unblock clock sync (if any) or any other blocking thing */
3254     if (basesrc->clock_id)
3255       gst_clock_id_unschedule (basesrc->clock_id);
3256   } else {
3257     /* signal the live source that it can start playing */
3258     basesrc->live_running = live_play;
3259
3260     gst_base_src_activate_pool (basesrc, TRUE);
3261
3262     /* When unlocking drop all delayed events */
3263     if (unlock) {
3264       GST_OBJECT_LOCK (basesrc);
3265       if (basesrc->priv->pending_events) {
3266         g_list_foreach (basesrc->priv->pending_events, (GFunc) gst_event_unref,
3267             NULL);
3268         g_list_free (basesrc->priv->pending_events);
3269         basesrc->priv->pending_events = NULL;
3270         g_atomic_int_set (&basesrc->priv->have_events, FALSE);
3271       }
3272       GST_OBJECT_UNLOCK (basesrc);
3273     }
3274   }
3275   GST_LIVE_SIGNAL (basesrc);
3276   GST_LIVE_UNLOCK (basesrc);
3277
3278   return TRUE;
3279 }
3280
3281 /* the purpose of this function is to make sure that a live source blocks in the
3282  * LIVE lock or leaves the LIVE lock and continues playing. */
3283 static gboolean
3284 gst_base_src_set_playing (GstBaseSrc * basesrc, gboolean live_play)
3285 {
3286   GstBaseSrcClass *bclass;
3287
3288   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
3289
3290   /* unlock subclasses locked in ::create, we only do this when we stop playing. */
3291   if (!live_play) {
3292     GST_DEBUG_OBJECT (basesrc, "unlock");
3293     if (bclass->unlock)
3294       bclass->unlock (basesrc);
3295   }
3296
3297   /* we are now able to grab the LIVE lock, when we get it, we can be
3298    * waiting for PLAYING while blocked in the LIVE cond or we can be waiting
3299    * for the clock. */
3300   GST_LIVE_LOCK (basesrc);
3301   GST_DEBUG_OBJECT (basesrc, "unschedule clock");
3302
3303   /* unblock clock sync (if any) */
3304   if (basesrc->clock_id)
3305     gst_clock_id_unschedule (basesrc->clock_id);
3306
3307   /* configure what to do when we get to the LIVE lock. */
3308   GST_DEBUG_OBJECT (basesrc, "live running %d", live_play);
3309   basesrc->live_running = live_play;
3310
3311   if (live_play) {
3312     gboolean start;
3313
3314     /* clear our unlock request when going to PLAYING */
3315     GST_DEBUG_OBJECT (basesrc, "unlock stop");
3316     if (bclass->unlock_stop)
3317       bclass->unlock_stop (basesrc);
3318
3319     /* for live sources we restart the timestamp correction */
3320     basesrc->priv->latency = -1;
3321     /* have to restart the task in case it stopped because of the unlock when
3322      * we went to PAUSED. Only do this if we operating in push mode. */
3323     GST_OBJECT_LOCK (basesrc->srcpad);
3324     start = (GST_PAD_MODE (basesrc->srcpad) == GST_PAD_MODE_PUSH);
3325     GST_OBJECT_UNLOCK (basesrc->srcpad);
3326     if (start)
3327       gst_pad_start_task (basesrc->srcpad, (GstTaskFunction) gst_base_src_loop,
3328           basesrc->srcpad);
3329     GST_DEBUG_OBJECT (basesrc, "signal");
3330     GST_LIVE_SIGNAL (basesrc);
3331   }
3332   GST_LIVE_UNLOCK (basesrc);
3333
3334   return TRUE;
3335 }
3336
3337 static gboolean
3338 gst_base_src_activate_push (GstPad * pad, GstObject * parent, gboolean active)
3339 {
3340   GstBaseSrc *basesrc;
3341
3342   basesrc = GST_BASE_SRC (parent);
3343
3344   /* prepare subclass first */
3345   if (active) {
3346     GST_DEBUG_OBJECT (basesrc, "Activating in push mode");
3347
3348     if (G_UNLIKELY (!basesrc->can_activate_push))
3349       goto no_push_activation;
3350
3351     if (G_UNLIKELY (!gst_base_src_start (basesrc)))
3352       goto error_start;
3353   } else {
3354     GST_DEBUG_OBJECT (basesrc, "Deactivating in push mode");
3355     /* now we can stop the source */
3356     if (G_UNLIKELY (!gst_base_src_stop (basesrc)))
3357       goto error_stop;
3358   }
3359   return TRUE;
3360
3361   /* ERRORS */
3362 no_push_activation:
3363   {
3364     GST_WARNING_OBJECT (basesrc, "Subclass disabled push-mode activation");
3365     return FALSE;
3366   }
3367 error_start:
3368   {
3369     GST_WARNING_OBJECT (basesrc, "Failed to start in push mode");
3370     return FALSE;
3371   }
3372 error_stop:
3373   {
3374     GST_DEBUG_OBJECT (basesrc, "Failed to stop in push mode");
3375     return FALSE;
3376   }
3377 }
3378
3379 static gboolean
3380 gst_base_src_activate_pull (GstPad * pad, GstObject * parent, gboolean active)
3381 {
3382   GstBaseSrc *basesrc;
3383
3384   basesrc = GST_BASE_SRC (parent);
3385
3386   /* prepare subclass first */
3387   if (active) {
3388     GST_DEBUG_OBJECT (basesrc, "Activating in pull mode");
3389     if (G_UNLIKELY (!gst_base_src_start (basesrc)))
3390       goto error_start;
3391   } else {
3392     GST_DEBUG_OBJECT (basesrc, "Deactivating in pull mode");
3393     if (G_UNLIKELY (!gst_base_src_stop (basesrc)))
3394       goto error_stop;
3395   }
3396   return TRUE;
3397
3398   /* ERRORS */
3399 error_start:
3400   {
3401     GST_ERROR_OBJECT (basesrc, "Failed to start in pull mode");
3402     return FALSE;
3403   }
3404 error_stop:
3405   {
3406     GST_ERROR_OBJECT (basesrc, "Failed to stop in pull mode");
3407     return FALSE;
3408   }
3409 }
3410
3411 static gboolean
3412 gst_base_src_activate_mode (GstPad * pad, GstObject * parent,
3413     GstPadMode mode, gboolean active)
3414 {
3415   gboolean res;
3416
3417   switch (mode) {
3418     case GST_PAD_MODE_PULL:
3419       res = gst_base_src_activate_pull (pad, parent, active);
3420       break;
3421     case GST_PAD_MODE_PUSH:
3422       res = gst_base_src_activate_push (pad, parent, active);
3423       break;
3424     default:
3425       GST_LOG_OBJECT (pad, "unknown activation mode %d", mode);
3426       res = FALSE;
3427       break;
3428   }
3429   return res;
3430 }
3431
3432
3433 static GstStateChangeReturn
3434 gst_base_src_change_state (GstElement * element, GstStateChange transition)
3435 {
3436   GstBaseSrc *basesrc;
3437   GstStateChangeReturn result;
3438   gboolean no_preroll = FALSE;
3439
3440   basesrc = GST_BASE_SRC (element);
3441
3442   switch (transition) {
3443     case GST_STATE_CHANGE_NULL_TO_READY:
3444       break;
3445     case GST_STATE_CHANGE_READY_TO_PAUSED:
3446       basesrc->priv->stream_start_pending = TRUE;
3447       no_preroll = gst_base_src_is_live (basesrc);
3448       break;
3449     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
3450       GST_DEBUG_OBJECT (basesrc, "PAUSED->PLAYING");
3451       if (gst_base_src_is_live (basesrc)) {
3452         /* now we can start playback */
3453         gst_base_src_set_playing (basesrc, TRUE);
3454       }
3455       break;
3456     default:
3457       break;
3458   }
3459
3460   if ((result =
3461           GST_ELEMENT_CLASS (parent_class)->change_state (element,
3462               transition)) == GST_STATE_CHANGE_FAILURE)
3463     goto failure;
3464
3465   switch (transition) {
3466     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
3467       GST_DEBUG_OBJECT (basesrc, "PLAYING->PAUSED");
3468       if (gst_base_src_is_live (basesrc)) {
3469         /* make sure we block in the live lock in PAUSED */
3470         gst_base_src_set_playing (basesrc, FALSE);
3471         no_preroll = TRUE;
3472       }
3473       break;
3474     case GST_STATE_CHANGE_PAUSED_TO_READY:
3475     {
3476       /* we don't need to unblock anything here, the pad deactivation code
3477        * already did this */
3478       g_atomic_int_set (&basesrc->priv->pending_eos, FALSE);
3479       gst_event_replace (&basesrc->pending_seek, NULL);
3480       basesrc->priv->stream_start_pending = FALSE;
3481       break;
3482     }
3483     case GST_STATE_CHANGE_READY_TO_NULL:
3484       break;
3485     default:
3486       break;
3487   }
3488
3489   if (no_preroll && result == GST_STATE_CHANGE_SUCCESS)
3490     result = GST_STATE_CHANGE_NO_PREROLL;
3491
3492   return result;
3493
3494   /* ERRORS */
3495 failure:
3496   {
3497     GST_DEBUG_OBJECT (basesrc, "parent failed state change");
3498     return result;
3499   }
3500 }