d53eda31fa83ec993861b5309abded891f573c99
[platform/upstream/gstreamer.git] / libs / gst / base / gstbasesrc.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *               2000,2005 Wim Taymans <wim@fluendo.com>
4  *
5  * gstbasesrc.c:
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20  * Boston, MA 02111-1307, USA.
21  */
22
23 /**
24  * SECTION:gstbasesrc
25  * @short_description: Base class for getrange based source elements
26  * @see_also: #GstPushSrc, #GstBaseTransform, #GstBaseSink
27  *
28  * This is a generice base class for source elements. The following
29  * types of sources are supported:
30  * <itemizedlist>
31  *   <listitem><para>random access sources like files</para></listitem>
32  *   <listitem><para>seekable sources</para></listitem>
33  *   <listitem><para>live sources</para></listitem>
34  * </itemizedlist>
35  *
36  * The source can be configured to operate in any #GstFormat with the
37  * gst_base_src_set_format() method. The currently set format determines
38  * the format of the internal #GstSegment and any #GST_EVENT_NEWSEGMENT
39  * events. The default format for #GstBaseSrc is #GST_FORMAT_BYTES.
40  *
41  * #GstBaseSrc always supports push mode scheduling. If the following
42  * conditions are met, it also supports pull mode scheduling:
43  * <itemizedlist>
44  *   <listitem><para>The format is set to #GST_FORMAT_BYTES (default).</para>
45  *   </listitem>
46  *   <listitem><para>#GstBaseSrcClass.is_seekable() returns %TRUE.</para>
47  *   </listitem>
48  * </itemizedlist>
49  *
50  * If all the conditions are met for operating in pull mode, #GstBaseSrc is
51  * automatically seekable in push mode as well. The following conditions must
52  * be met to make the element seekable in push mode when the format is not
53  * #GST_FORMAT_BYTES:
54  * <itemizedlist>
55  *   <listitem><para>
56  *     #GstBaseSrcClass.is_seekable() returns %TRUE.
57  *   </para></listitem>
58  *   <listitem><para>
59  *     #GstBaseSrcClass.query() can convert all supported seek formats to the
60  *     internal format as set with gst_base_src_set_format().
61  *   </para></listitem>
62  *   <listitem><para>
63  *     #GstBaseSrcClass.do_seek() is implemented, performs the seek and returns
64  *      %TRUE.
65  *   </para></listitem>
66  * </itemizedlist>
67  *
68  * When the element does not meet the requirements to operate in pull mode, the
69  * offset and length in the #GstBaseSrcClass.create() method should be ignored.
70  * It is recommended to subclass #GstPushSrc instead, in this situation. If the
71  * element can operate in pull mode but only with specific offsets and
72  * lengths, it is allowed to generate an error when the wrong values are passed
73  * to the #GstBaseSrcClass.create() function.
74  *
75  * #GstBaseSrc has support for live sources. Live sources are sources that when
76  * paused discard data, such as audio or video capture devices. A typical live
77  * source also produces data at a fixed rate and thus provides a clock to publish
78  * this rate.
79  * Use gst_base_src_set_live() to activate the live source mode.
80  *
81  * A live source does not produce data in the PAUSED state. This means that the
82  * #GstBaseSrcClass.create() method will not be called in PAUSED but only in
83  * PLAYING. To signal the pipeline that the element will not produce data, the
84  * return value from the READY to PAUSED state will be
85  * #GST_STATE_CHANGE_NO_PREROLL.
86  *
87  * A typical live source will timestamp the buffers it creates with the
88  * current running time of the pipeline. This is one reason why a live source
89  * can only produce data in the PLAYING state, when the clock is actually
90  * distributed and running.
91  *
92  * Live sources that synchronize and block on the clock (an audio source, for
93  * example) can use gst_base_src_wait_playing() when the
94  * #GstBaseSrcClass.create() function was interrupted by a state change to
95  * PAUSED.
96  *
97  * The #GstBaseSrcClass.get_times() method can be used to implement pseudo-live
98  * sources. It only makes sense to implement the #GstBaseSrcClass.get_times()
99  * function if the source is a live source. The #GstBaseSrcClass.get_times()
100  * function should return timestamps starting from 0, as if it were a non-live
101  * source. The base class will make sure that the timestamps are transformed
102  * into the current running_time. The base source will then wait for the
103  * calculated running_time before pushing out the buffer.
104  *
105  * For live sources, the base class will by default report a latency of 0.
106  * For pseudo live sources, the base class will by default measure the difference
107  * between the first buffer timestamp and the start time of get_times and will
108  * report this value as the latency.
109  * Subclasses should override the query function when this behaviour is not
110  * acceptable.
111  *
112  * There is only support in #GstBaseSrc for exactly one source pad, which
113  * should be named "src". A source implementation (subclass of #GstBaseSrc)
114  * should install a pad template in its class_init function, like so:
115  * |[
116  * static void
117  * my_element_class_init (GstMyElementClass *klass)
118  * {
119  *   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
120  *   // srctemplate should be a #GstStaticPadTemplate with direction
121  *   // #GST_PAD_SRC and name "src"
122  *   gst_element_class_add_pad_template (gstelement_class,
123  *       gst_static_pad_template_get (&amp;srctemplate));
124  *   // see #GstElementDetails
125  *   gst_element_class_set_details (gstelement_class, &amp;details);
126  * }
127  * ]|
128  *
129  * <refsect2>
130  * <title>Controlled shutdown of live sources in applications</title>
131  * <para>
132  * Applications that record from a live source may want to stop recording
133  * in a controlled way, so that the recording is stopped, but the data
134  * already in the pipeline is processed to the end (remember that many live
135  * sources would go on recording forever otherwise). For that to happen the
136  * application needs to make the source stop recording and send an EOS
137  * event down the pipeline. The application would then wait for an
138  * EOS message posted on the pipeline's bus to know when all data has
139  * been processed and the pipeline can safely be stopped.
140  *
141  * An application may send an EOS event to a source element to make it
142  * perform the EOS logic (send EOS event downstream or post a
143  * #GST_MESSAGE_SEGMENT_DONE on the bus). This can typically be done
144  * with the gst_element_send_event() function on the element or its parent bin.
145  *
146  * After the EOS has been sent to the element, the application should wait for
147  * an EOS message to be posted on the pipeline's bus. Once this EOS message is
148  * received, it may safely shut down the entire pipeline.
149  *
150  * Last reviewed on 2007-12-19 (0.10.16)
151  * </para>
152  * </refsect2>
153  */
154
155 #ifdef HAVE_CONFIG_H
156 #  include "config.h"
157 #endif
158
159 #include <stdlib.h>
160 #include <string.h>
161
162 #include <gst/gst_private.h>
163 #include <gst/glib-compat-private.h>
164
165 #include "gstbasesrc.h"
166 #include "gsttypefindhelper.h"
167 #include <gst/gst-i18n-lib.h>
168
169 GST_DEBUG_CATEGORY_STATIC (gst_base_src_debug);
170 #define GST_CAT_DEFAULT gst_base_src_debug
171
172 #define GST_LIVE_GET_LOCK(elem)               (&GST_BASE_SRC_CAST(elem)->live_lock)
173 #define GST_LIVE_LOCK(elem)                   g_mutex_lock(GST_LIVE_GET_LOCK(elem))
174 #define GST_LIVE_TRYLOCK(elem)                g_mutex_trylock(GST_LIVE_GET_LOCK(elem))
175 #define GST_LIVE_UNLOCK(elem)                 g_mutex_unlock(GST_LIVE_GET_LOCK(elem))
176 #define GST_LIVE_GET_COND(elem)               (&GST_BASE_SRC_CAST(elem)->live_cond)
177 #define GST_LIVE_WAIT(elem)                   g_cond_wait (GST_LIVE_GET_COND (elem), GST_LIVE_GET_LOCK (elem))
178 #define GST_LIVE_WAIT_UNTIL(elem, end_time)   g_cond_timed_wait (GST_LIVE_GET_COND (elem), GST_LIVE_GET_LOCK (elem), end_time)
179 #define GST_LIVE_SIGNAL(elem)                 g_cond_signal (GST_LIVE_GET_COND (elem));
180 #define GST_LIVE_BROADCAST(elem)              g_cond_broadcast (GST_LIVE_GET_COND (elem));
181
182 /* BaseSrc signals and args */
183 enum
184 {
185   /* FILL ME */
186   LAST_SIGNAL
187 };
188
189 #define DEFAULT_BLOCKSIZE       4096
190 #define DEFAULT_NUM_BUFFERS     -1
191 #define DEFAULT_TYPEFIND        FALSE
192 #define DEFAULT_DO_TIMESTAMP    FALSE
193
194 enum
195 {
196   PROP_0,
197   PROP_BLOCKSIZE,
198   PROP_NUM_BUFFERS,
199   PROP_TYPEFIND,
200   PROP_DO_TIMESTAMP
201 };
202
203 #define GST_BASE_SRC_GET_PRIVATE(obj)  \
204    (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_BASE_SRC, GstBaseSrcPrivate))
205
206 struct _GstBaseSrcPrivate
207 {
208   gboolean discont;
209   gboolean flushing;
210
211   GstFlowReturn start_result;
212   gboolean async;
213
214   /* if a stream-start event should be sent */
215   gboolean stream_start_pending;
216
217   /* if segment should be sent */
218   gboolean segment_pending;
219
220   /* if EOS is pending (atomic) */
221   gint pending_eos;
222
223   /* startup latency is the time it takes between going to PLAYING and producing
224    * the first BUFFER with running_time 0. This value is included in the latency
225    * reporting. */
226   GstClockTime latency;
227   /* timestamp offset, this is the offset add to the values of gst_times for
228    * pseudo live sources */
229   GstClockTimeDiff ts_offset;
230
231   gboolean do_timestamp;
232   volatile gint dynamic_size;
233
234   /* stream sequence number */
235   guint32 seqnum;
236
237   /* pending events (TAG, CUSTOM_BOTH, CUSTOM_DOWNSTREAM) to be
238    * pushed in the data stream */
239   GList *pending_events;
240   volatile gint have_events;
241
242   /* QoS *//* with LOCK */
243   gboolean qos_enabled;
244   gdouble proportion;
245   GstClockTime earliest_time;
246
247   GstBufferPool *pool;
248   GstAllocator *allocator;
249   GstAllocationParams params;
250 };
251
252 static GstElementClass *parent_class = NULL;
253
254 static void gst_base_src_class_init (GstBaseSrcClass * klass);
255 static void gst_base_src_init (GstBaseSrc * src, gpointer g_class);
256 static void gst_base_src_finalize (GObject * object);
257
258
259 GType
260 gst_base_src_get_type (void)
261 {
262   static volatile gsize base_src_type = 0;
263
264   if (g_once_init_enter (&base_src_type)) {
265     GType _type;
266     static const GTypeInfo base_src_info = {
267       sizeof (GstBaseSrcClass),
268       NULL,
269       NULL,
270       (GClassInitFunc) gst_base_src_class_init,
271       NULL,
272       NULL,
273       sizeof (GstBaseSrc),
274       0,
275       (GInstanceInitFunc) gst_base_src_init,
276     };
277
278     _type = g_type_register_static (GST_TYPE_ELEMENT,
279         "GstBaseSrc", &base_src_info, G_TYPE_FLAG_ABSTRACT);
280     g_once_init_leave (&base_src_type, _type);
281   }
282   return base_src_type;
283 }
284
285 static GstCaps *gst_base_src_default_get_caps (GstBaseSrc * bsrc,
286     GstCaps * filter);
287 static GstCaps *gst_base_src_default_fixate (GstBaseSrc * src, GstCaps * caps);
288 static GstCaps *gst_base_src_fixate (GstBaseSrc * src, GstCaps * caps);
289
290 static gboolean gst_base_src_is_random_access (GstBaseSrc * src);
291 static gboolean gst_base_src_activate_mode (GstPad * pad, GstObject * parent,
292     GstPadMode mode, gboolean active);
293 static void gst_base_src_set_property (GObject * object, guint prop_id,
294     const GValue * value, GParamSpec * pspec);
295 static void gst_base_src_get_property (GObject * object, guint prop_id,
296     GValue * value, GParamSpec * pspec);
297 static gboolean gst_base_src_event (GstPad * pad, GstObject * parent,
298     GstEvent * event);
299 static gboolean gst_base_src_send_event (GstElement * elem, GstEvent * event);
300 static gboolean gst_base_src_default_event (GstBaseSrc * src, GstEvent * event);
301
302 static gboolean gst_base_src_query (GstPad * pad, GstObject * parent,
303     GstQuery * query);
304
305 static gboolean gst_base_src_activate_pool (GstBaseSrc * basesrc,
306     gboolean active);
307 static gboolean gst_base_src_default_negotiate (GstBaseSrc * basesrc);
308 static gboolean gst_base_src_default_do_seek (GstBaseSrc * src,
309     GstSegment * segment);
310 static gboolean gst_base_src_default_query (GstBaseSrc * src, GstQuery * query);
311 static gboolean gst_base_src_default_prepare_seek_segment (GstBaseSrc * src,
312     GstEvent * event, GstSegment * segment);
313 static GstFlowReturn gst_base_src_default_create (GstBaseSrc * basesrc,
314     guint64 offset, guint size, GstBuffer ** buf);
315 static GstFlowReturn gst_base_src_default_alloc (GstBaseSrc * basesrc,
316     guint64 offset, guint size, GstBuffer ** buf);
317 static gboolean gst_base_src_decide_allocation_default (GstBaseSrc * basesrc,
318     GstQuery * query);
319
320 static gboolean gst_base_src_set_flushing (GstBaseSrc * basesrc,
321     gboolean flushing, gboolean live_play, gboolean * playing);
322
323 static gboolean gst_base_src_start (GstBaseSrc * basesrc);
324 static gboolean gst_base_src_stop (GstBaseSrc * basesrc);
325
326 static GstStateChangeReturn gst_base_src_change_state (GstElement * element,
327     GstStateChange transition);
328
329 static void gst_base_src_loop (GstPad * pad);
330 static GstFlowReturn gst_base_src_getrange (GstPad * pad, GstObject * parent,
331     guint64 offset, guint length, GstBuffer ** buf);
332 static GstFlowReturn gst_base_src_get_range (GstBaseSrc * src, guint64 offset,
333     guint length, GstBuffer ** buf);
334 static gboolean gst_base_src_seekable (GstBaseSrc * src);
335 static gboolean gst_base_src_negotiate (GstBaseSrc * basesrc);
336 static gboolean gst_base_src_update_length (GstBaseSrc * src, guint64 offset,
337     guint * length);
338
339 static void
340 gst_base_src_class_init (GstBaseSrcClass * klass)
341 {
342   GObjectClass *gobject_class;
343   GstElementClass *gstelement_class;
344
345   gobject_class = G_OBJECT_CLASS (klass);
346   gstelement_class = GST_ELEMENT_CLASS (klass);
347
348   GST_DEBUG_CATEGORY_INIT (gst_base_src_debug, "basesrc", 0, "basesrc element");
349
350   g_type_class_add_private (klass, sizeof (GstBaseSrcPrivate));
351
352   parent_class = g_type_class_peek_parent (klass);
353
354   gobject_class->finalize = gst_base_src_finalize;
355   gobject_class->set_property = gst_base_src_set_property;
356   gobject_class->get_property = gst_base_src_get_property;
357
358   g_object_class_install_property (gobject_class, PROP_BLOCKSIZE,
359       g_param_spec_uint ("blocksize", "Block size",
360           "Size in bytes to read per buffer (-1 = default)", 0, G_MAXUINT,
361           DEFAULT_BLOCKSIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
362   g_object_class_install_property (gobject_class, PROP_NUM_BUFFERS,
363       g_param_spec_int ("num-buffers", "num-buffers",
364           "Number of buffers to output before sending EOS (-1 = unlimited)",
365           -1, G_MAXINT, DEFAULT_NUM_BUFFERS, G_PARAM_READWRITE |
366           G_PARAM_STATIC_STRINGS));
367   g_object_class_install_property (gobject_class, PROP_TYPEFIND,
368       g_param_spec_boolean ("typefind", "Typefind",
369           "Run typefind before negotiating", DEFAULT_TYPEFIND,
370           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
371   g_object_class_install_property (gobject_class, PROP_DO_TIMESTAMP,
372       g_param_spec_boolean ("do-timestamp", "Do timestamp",
373           "Apply current stream time to buffers", DEFAULT_DO_TIMESTAMP,
374           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
375
376   gstelement_class->change_state =
377       GST_DEBUG_FUNCPTR (gst_base_src_change_state);
378   gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_base_src_send_event);
379
380   klass->get_caps = GST_DEBUG_FUNCPTR (gst_base_src_default_get_caps);
381   klass->negotiate = GST_DEBUG_FUNCPTR (gst_base_src_default_negotiate);
382   klass->fixate = GST_DEBUG_FUNCPTR (gst_base_src_default_fixate);
383   klass->prepare_seek_segment =
384       GST_DEBUG_FUNCPTR (gst_base_src_default_prepare_seek_segment);
385   klass->do_seek = GST_DEBUG_FUNCPTR (gst_base_src_default_do_seek);
386   klass->query = GST_DEBUG_FUNCPTR (gst_base_src_default_query);
387   klass->event = GST_DEBUG_FUNCPTR (gst_base_src_default_event);
388   klass->create = GST_DEBUG_FUNCPTR (gst_base_src_default_create);
389   klass->alloc = GST_DEBUG_FUNCPTR (gst_base_src_default_alloc);
390   klass->decide_allocation =
391       GST_DEBUG_FUNCPTR (gst_base_src_decide_allocation_default);
392
393   /* Registering debug symbols for function pointers */
394   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_activate_mode);
395   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_event);
396   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_query);
397   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_getrange);
398   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_fixate);
399 }
400
401 static void
402 gst_base_src_init (GstBaseSrc * basesrc, gpointer g_class)
403 {
404   GstPad *pad;
405   GstPadTemplate *pad_template;
406
407   basesrc->priv = GST_BASE_SRC_GET_PRIVATE (basesrc);
408
409   basesrc->is_live = FALSE;
410   g_mutex_init (&basesrc->live_lock);
411   g_cond_init (&basesrc->live_cond);
412   basesrc->num_buffers = DEFAULT_NUM_BUFFERS;
413   basesrc->num_buffers_left = -1;
414
415   basesrc->can_activate_push = TRUE;
416
417   pad_template =
418       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (g_class), "src");
419   g_return_if_fail (pad_template != NULL);
420
421   GST_DEBUG_OBJECT (basesrc, "creating src pad");
422   pad = gst_pad_new_from_template (pad_template, "src");
423
424   GST_DEBUG_OBJECT (basesrc, "setting functions on src pad");
425   gst_pad_set_activatemode_function (pad, gst_base_src_activate_mode);
426   gst_pad_set_event_function (pad, gst_base_src_event);
427   gst_pad_set_query_function (pad, gst_base_src_query);
428   gst_pad_set_getrange_function (pad, gst_base_src_getrange);
429
430   /* hold pointer to pad */
431   basesrc->srcpad = pad;
432   GST_DEBUG_OBJECT (basesrc, "adding src pad");
433   gst_element_add_pad (GST_ELEMENT (basesrc), pad);
434
435   basesrc->blocksize = DEFAULT_BLOCKSIZE;
436   basesrc->clock_id = NULL;
437   /* we operate in BYTES by default */
438   gst_base_src_set_format (basesrc, GST_FORMAT_BYTES);
439   basesrc->typefind = DEFAULT_TYPEFIND;
440   basesrc->priv->do_timestamp = DEFAULT_DO_TIMESTAMP;
441   g_atomic_int_set (&basesrc->priv->have_events, FALSE);
442
443   basesrc->priv->start_result = GST_FLOW_FLUSHING;
444   GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTED);
445   GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTING);
446   GST_OBJECT_FLAG_SET (basesrc, GST_ELEMENT_FLAG_SOURCE);
447
448   GST_DEBUG_OBJECT (basesrc, "init done");
449 }
450
451 static void
452 gst_base_src_finalize (GObject * object)
453 {
454   GstBaseSrc *basesrc;
455   GstEvent **event_p;
456
457   basesrc = GST_BASE_SRC (object);
458
459   g_mutex_clear (&basesrc->live_lock);
460   g_cond_clear (&basesrc->live_cond);
461
462   event_p = &basesrc->pending_seek;
463   gst_event_replace (event_p, NULL);
464
465   if (basesrc->priv->pending_events) {
466     g_list_foreach (basesrc->priv->pending_events, (GFunc) gst_event_unref,
467         NULL);
468     g_list_free (basesrc->priv->pending_events);
469   }
470
471   G_OBJECT_CLASS (parent_class)->finalize (object);
472 }
473
474 /**
475  * gst_base_src_wait_playing:
476  * @src: the src
477  *
478  * If the #GstBaseSrcClass.create() method performs its own synchronisation
479  * against the clock it must unblock when going from PLAYING to the PAUSED state
480  * and call this method before continuing to produce the remaining data.
481  *
482  * This function will block until a state change to PLAYING happens (in which
483  * case this function returns #GST_FLOW_OK) or the processing must be stopped due
484  * to a state change to READY or a FLUSH event (in which case this function
485  * returns #GST_FLOW_FLUSHING).
486  *
487  * Returns: #GST_FLOW_OK if @src is PLAYING and processing can
488  * continue. Any other return value should be returned from the create vmethod.
489  */
490 GstFlowReturn
491 gst_base_src_wait_playing (GstBaseSrc * src)
492 {
493   g_return_val_if_fail (GST_IS_BASE_SRC (src), GST_FLOW_ERROR);
494
495   do {
496     /* block until the state changes, or we get a flush, or something */
497     GST_DEBUG_OBJECT (src, "live source waiting for running state");
498     GST_LIVE_WAIT (src);
499     GST_DEBUG_OBJECT (src, "live source unlocked");
500     if (src->priv->flushing)
501       goto flushing;
502   } while (G_UNLIKELY (!src->live_running));
503
504   return GST_FLOW_OK;
505
506   /* ERRORS */
507 flushing:
508   {
509     GST_DEBUG_OBJECT (src, "we are flushing");
510     return GST_FLOW_FLUSHING;
511   }
512 }
513
514 /**
515  * gst_base_src_set_live:
516  * @src: base source instance
517  * @live: new live-mode
518  *
519  * If the element listens to a live source, @live should
520  * be set to %TRUE.
521  *
522  * A live source will not produce data in the PAUSED state and
523  * will therefore not be able to participate in the PREROLL phase
524  * of a pipeline. To signal this fact to the application and the
525  * pipeline, the state change return value of the live source will
526  * be GST_STATE_CHANGE_NO_PREROLL.
527  */
528 void
529 gst_base_src_set_live (GstBaseSrc * src, gboolean live)
530 {
531   g_return_if_fail (GST_IS_BASE_SRC (src));
532
533   GST_OBJECT_LOCK (src);
534   src->is_live = live;
535   GST_OBJECT_UNLOCK (src);
536 }
537
538 /**
539  * gst_base_src_is_live:
540  * @src: base source instance
541  *
542  * Check if an element is in live mode.
543  *
544  * Returns: %TRUE if element is in live mode.
545  */
546 gboolean
547 gst_base_src_is_live (GstBaseSrc * src)
548 {
549   gboolean result;
550
551   g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
552
553   GST_OBJECT_LOCK (src);
554   result = src->is_live;
555   GST_OBJECT_UNLOCK (src);
556
557   return result;
558 }
559
560 /**
561  * gst_base_src_set_format:
562  * @src: base source instance
563  * @format: the format to use
564  *
565  * Sets the default format of the source. This will be the format used
566  * for sending NEW_SEGMENT events and for performing seeks.
567  *
568  * If a format of GST_FORMAT_BYTES is set, the element will be able to
569  * operate in pull mode if the #GstBaseSrcClass.is_seekable() returns TRUE.
570  *
571  * This function must only be called in states < %GST_STATE_PAUSED.
572  */
573 void
574 gst_base_src_set_format (GstBaseSrc * src, GstFormat format)
575 {
576   g_return_if_fail (GST_IS_BASE_SRC (src));
577   g_return_if_fail (GST_STATE (src) <= GST_STATE_READY);
578
579   GST_OBJECT_LOCK (src);
580   gst_segment_init (&src->segment, format);
581   GST_OBJECT_UNLOCK (src);
582 }
583
584 /**
585  * gst_base_src_set_dynamic_size:
586  * @src: base source instance
587  * @dynamic: new dynamic size mode
588  *
589  * If not @dynamic, size is only updated when needed, such as when trying to
590  * read past current tracked size.  Otherwise, size is checked for upon each
591  * read.
592  */
593 void
594 gst_base_src_set_dynamic_size (GstBaseSrc * src, gboolean dynamic)
595 {
596   g_return_if_fail (GST_IS_BASE_SRC (src));
597
598   g_atomic_int_set (&src->priv->dynamic_size, dynamic);
599 }
600
601 /**
602  * gst_base_src_set_async:
603  * @src: base source instance
604  * @async: new async mode
605  *
606  * Configure async behaviour in @src, no state change will block. The open,
607  * close, start, stop, play and pause virtual methods will be executed in a
608  * different thread and are thus allowed to perform blocking operations. Any
609  * blocking operation should be unblocked with the unlock vmethod.
610  */
611 void
612 gst_base_src_set_async (GstBaseSrc * src, gboolean async)
613 {
614   g_return_if_fail (GST_IS_BASE_SRC (src));
615
616   GST_OBJECT_LOCK (src);
617   src->priv->async = async;
618   GST_OBJECT_UNLOCK (src);
619 }
620
621 /**
622  * gst_base_src_is_async:
623  * @src: base source instance
624  *
625  * Get the current async behaviour of @src. See also gst_base_src_set_async().
626  *
627  * Returns: %TRUE if @src is operating in async mode.
628  */
629 gboolean
630 gst_base_src_is_async (GstBaseSrc * src)
631 {
632   gboolean res;
633
634   g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
635
636   GST_OBJECT_LOCK (src);
637   res = src->priv->async;
638   GST_OBJECT_UNLOCK (src);
639
640   return res;
641 }
642
643
644 /**
645  * gst_base_src_query_latency:
646  * @src: the source
647  * @live: (out) (allow-none): if the source is live
648  * @min_latency: (out) (allow-none): the min latency of the source
649  * @max_latency: (out) (allow-none): the max latency of the source
650  *
651  * Query the source for the latency parameters. @live will be TRUE when @src is
652  * configured as a live source. @min_latency will be set to the difference
653  * between the running time and the timestamp of the first buffer.
654  * @max_latency is always the undefined value of -1.
655  *
656  * This function is mostly used by subclasses.
657  *
658  * Returns: TRUE if the query succeeded.
659  */
660 gboolean
661 gst_base_src_query_latency (GstBaseSrc * src, gboolean * live,
662     GstClockTime * min_latency, GstClockTime * max_latency)
663 {
664   GstClockTime min;
665
666   g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
667
668   GST_OBJECT_LOCK (src);
669   if (live)
670     *live = src->is_live;
671
672   /* if we have a startup latency, report this one, else report 0. Subclasses
673    * are supposed to override the query function if they want something
674    * else. */
675   if (src->priv->latency != -1)
676     min = src->priv->latency;
677   else
678     min = 0;
679
680   if (min_latency)
681     *min_latency = min;
682   if (max_latency)
683     *max_latency = -1;
684
685   GST_LOG_OBJECT (src, "latency: live %d, min %" GST_TIME_FORMAT
686       ", max %" GST_TIME_FORMAT, src->is_live, GST_TIME_ARGS (min),
687       GST_TIME_ARGS (-1));
688   GST_OBJECT_UNLOCK (src);
689
690   return TRUE;
691 }
692
693 /**
694  * gst_base_src_set_blocksize:
695  * @src: the source
696  * @blocksize: the new blocksize in bytes
697  *
698  * Set the number of bytes that @src will push out with each buffer. When
699  * @blocksize is set to -1, a default length will be used.
700  */
701 void
702 gst_base_src_set_blocksize (GstBaseSrc * src, guint blocksize)
703 {
704   g_return_if_fail (GST_IS_BASE_SRC (src));
705
706   GST_OBJECT_LOCK (src);
707   src->blocksize = blocksize;
708   GST_OBJECT_UNLOCK (src);
709 }
710
711 /**
712  * gst_base_src_get_blocksize:
713  * @src: the source
714  *
715  * Get the number of bytes that @src will push out with each buffer.
716  *
717  * Returns: the number of bytes pushed with each buffer.
718  */
719 guint
720 gst_base_src_get_blocksize (GstBaseSrc * src)
721 {
722   gint res;
723
724   g_return_val_if_fail (GST_IS_BASE_SRC (src), 0);
725
726   GST_OBJECT_LOCK (src);
727   res = src->blocksize;
728   GST_OBJECT_UNLOCK (src);
729
730   return res;
731 }
732
733
734 /**
735  * gst_base_src_set_do_timestamp:
736  * @src: the source
737  * @timestamp: enable or disable timestamping
738  *
739  * Configure @src to automatically timestamp outgoing buffers based on the
740  * current running_time of the pipeline. This property is mostly useful for live
741  * sources.
742  */
743 void
744 gst_base_src_set_do_timestamp (GstBaseSrc * src, gboolean timestamp)
745 {
746   g_return_if_fail (GST_IS_BASE_SRC (src));
747
748   GST_OBJECT_LOCK (src);
749   src->priv->do_timestamp = timestamp;
750   GST_OBJECT_UNLOCK (src);
751 }
752
753 /**
754  * gst_base_src_get_do_timestamp:
755  * @src: the source
756  *
757  * Query if @src timestamps outgoing buffers based on the current running_time.
758  *
759  * Returns: %TRUE if the base class will automatically timestamp outgoing buffers.
760  */
761 gboolean
762 gst_base_src_get_do_timestamp (GstBaseSrc * src)
763 {
764   gboolean res;
765
766   g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
767
768   GST_OBJECT_LOCK (src);
769   res = src->priv->do_timestamp;
770   GST_OBJECT_UNLOCK (src);
771
772   return res;
773 }
774
775 /**
776  * gst_base_src_new_seamless_segment:
777  * @src: The source
778  * @start: The new start value for the segment
779  * @stop: Stop value for the new segment
780  * @position: The position value for the new segent
781  *
782  * Prepare a new seamless segment for emission downstream. This function must
783  * only be called by derived sub-classes, and only from the create() function,
784  * as the stream-lock needs to be held.
785  *
786  * The format for the new segment will be the current format of the source, as
787  * configured with gst_base_src_set_format()
788  *
789  * Returns: %TRUE if preparation of the seamless segment succeeded.
790  */
791 gboolean
792 gst_base_src_new_seamless_segment (GstBaseSrc * src, gint64 start, gint64 stop,
793     gint64 position)
794 {
795   gboolean res = TRUE;
796
797   GST_DEBUG_OBJECT (src,
798       "Starting new seamless segment. Start %" GST_TIME_FORMAT " stop %"
799       GST_TIME_FORMAT " position %" GST_TIME_FORMAT, GST_TIME_ARGS (start),
800       GST_TIME_ARGS (stop), GST_TIME_ARGS (position));
801
802   GST_OBJECT_LOCK (src);
803
804   src->segment.base = gst_segment_to_running_time (&src->segment,
805       src->segment.format, src->segment.position);
806   src->segment.start = start;
807   src->segment.stop = stop;
808   src->segment.position = position;
809
810   /* forward, we send data from position to stop */
811   src->priv->segment_pending = TRUE;
812   GST_OBJECT_UNLOCK (src);
813
814   src->priv->discont = TRUE;
815   src->running = TRUE;
816
817   return res;
818 }
819
820 static gboolean
821 gst_base_src_send_stream_start (GstBaseSrc * src)
822 {
823   gboolean ret = TRUE;
824
825   if (src->priv->stream_start_pending) {
826     ret = gst_pad_push_event (src->srcpad, gst_event_new_stream_start ());
827     src->priv->stream_start_pending = FALSE;
828   }
829
830   return ret;
831 }
832
833 /**
834  * gst_base_src_set_caps:
835  * @src: a #GstBaseSrc
836  * @caps: a #GstCaps
837  *
838  * Set new caps on the basesrc source pad.
839  *
840  * Returns: %TRUE if the caps could be set
841  */
842 gboolean
843 gst_base_src_set_caps (GstBaseSrc * src, GstCaps * caps)
844 {
845   GstBaseSrcClass *bclass;
846   gboolean res = TRUE;
847
848   bclass = GST_BASE_SRC_GET_CLASS (src);
849
850   gst_base_src_send_stream_start (src);
851
852   if (bclass->set_caps)
853     res = bclass->set_caps (src, caps);
854   if (res)
855     res = gst_pad_set_caps (src->srcpad, caps);
856
857   return res;
858 }
859
860 static GstCaps *
861 gst_base_src_default_get_caps (GstBaseSrc * bsrc, GstCaps * filter)
862 {
863   GstCaps *caps = NULL;
864   GstPadTemplate *pad_template;
865   GstBaseSrcClass *bclass;
866
867   bclass = GST_BASE_SRC_GET_CLASS (bsrc);
868
869   pad_template =
870       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (bclass), "src");
871
872   if (pad_template != NULL) {
873     caps = gst_pad_template_get_caps (pad_template);
874
875     if (filter) {
876       GstCaps *intersection;
877
878       intersection =
879           gst_caps_intersect_full (filter, caps, GST_CAPS_INTERSECT_FIRST);
880       gst_caps_unref (caps);
881       caps = intersection;
882     }
883   }
884   return caps;
885 }
886
887 static GstCaps *
888 gst_base_src_default_fixate (GstBaseSrc * bsrc, GstCaps * caps)
889 {
890   GST_DEBUG_OBJECT (bsrc, "using default caps fixate function");
891   return gst_caps_fixate (caps);
892 }
893
894 static GstCaps *
895 gst_base_src_fixate (GstBaseSrc * bsrc, GstCaps * caps)
896 {
897   GstBaseSrcClass *bclass;
898
899   bclass = GST_BASE_SRC_GET_CLASS (bsrc);
900
901   if (bclass->fixate)
902     caps = bclass->fixate (bsrc, caps);
903
904   return caps;
905 }
906
907 static gboolean
908 gst_base_src_default_query (GstBaseSrc * src, GstQuery * query)
909 {
910   gboolean res;
911
912   switch (GST_QUERY_TYPE (query)) {
913     case GST_QUERY_POSITION:
914     {
915       GstFormat format;
916
917       gst_query_parse_position (query, &format, NULL);
918
919       GST_DEBUG_OBJECT (src, "position query in format %s",
920           gst_format_get_name (format));
921
922       switch (format) {
923         case GST_FORMAT_PERCENT:
924         {
925           gint64 percent;
926           gint64 position;
927           gint64 duration;
928
929           GST_OBJECT_LOCK (src);
930           position = src->segment.position;
931           duration = src->segment.duration;
932           GST_OBJECT_UNLOCK (src);
933
934           if (position != -1 && duration != -1) {
935             if (position < duration)
936               percent = gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX, position,
937                   duration);
938             else
939               percent = GST_FORMAT_PERCENT_MAX;
940           } else
941             percent = -1;
942
943           gst_query_set_position (query, GST_FORMAT_PERCENT, percent);
944           res = TRUE;
945           break;
946         }
947         default:
948         {
949           gint64 position;
950           GstFormat seg_format;
951
952           GST_OBJECT_LOCK (src);
953           position =
954               gst_segment_to_stream_time (&src->segment, src->segment.format,
955               src->segment.position);
956           seg_format = src->segment.format;
957           GST_OBJECT_UNLOCK (src);
958
959           if (position != -1) {
960             /* convert to requested format */
961             res =
962                 gst_pad_query_convert (src->srcpad, seg_format,
963                 position, format, &position);
964           } else
965             res = TRUE;
966
967           gst_query_set_position (query, format, position);
968           break;
969         }
970       }
971       break;
972     }
973     case GST_QUERY_DURATION:
974     {
975       GstFormat format;
976
977       gst_query_parse_duration (query, &format, NULL);
978
979       GST_DEBUG_OBJECT (src, "duration query in format %s",
980           gst_format_get_name (format));
981
982       switch (format) {
983         case GST_FORMAT_PERCENT:
984           gst_query_set_duration (query, GST_FORMAT_PERCENT,
985               GST_FORMAT_PERCENT_MAX);
986           res = TRUE;
987           break;
988         default:
989         {
990           gint64 duration;
991           GstFormat seg_format;
992           guint length = 0;
993
994           /* may have to refresh duration */
995           if (g_atomic_int_get (&src->priv->dynamic_size))
996             gst_base_src_update_length (src, 0, &length);
997
998           /* this is the duration as configured by the subclass. */
999           GST_OBJECT_LOCK (src);
1000           duration = src->segment.duration;
1001           seg_format = src->segment.format;
1002           GST_OBJECT_UNLOCK (src);
1003
1004           GST_LOG_OBJECT (src, "duration %" G_GINT64_FORMAT ", format %s",
1005               duration, gst_format_get_name (seg_format));
1006
1007           if (duration != -1) {
1008             /* convert to requested format, if this fails, we have a duration
1009              * but we cannot answer the query, we must return FALSE. */
1010             res =
1011                 gst_pad_query_convert (src->srcpad, seg_format,
1012                 duration, format, &duration);
1013           } else {
1014             /* The subclass did not configure a duration, we assume that the
1015              * media has an unknown duration then and we return TRUE to report
1016              * this. Note that this is not the same as returning FALSE, which
1017              * means that we cannot report the duration at all. */
1018             res = TRUE;
1019           }
1020           gst_query_set_duration (query, format, duration);
1021           break;
1022         }
1023       }
1024       break;
1025     }
1026
1027     case GST_QUERY_SEEKING:
1028     {
1029       GstFormat format, seg_format;
1030       gint64 duration;
1031
1032       GST_OBJECT_LOCK (src);
1033       duration = src->segment.duration;
1034       seg_format = src->segment.format;
1035       GST_OBJECT_UNLOCK (src);
1036
1037       gst_query_parse_seeking (query, &format, NULL, NULL, NULL);
1038       if (format == seg_format) {
1039         gst_query_set_seeking (query, seg_format,
1040             gst_base_src_seekable (src), 0, duration);
1041         res = TRUE;
1042       } else {
1043         /* FIXME 0.11: return TRUE + seekable=FALSE for SEEKING query here */
1044         /* Don't reply to the query to make up for demuxers which don't
1045          * handle the SEEKING query yet. Players like Totem will fall back
1046          * to the duration when the SEEKING query isn't answered. */
1047         res = FALSE;
1048       }
1049       break;
1050     }
1051     case GST_QUERY_SEGMENT:
1052     {
1053       gint64 start, stop;
1054
1055       GST_OBJECT_LOCK (src);
1056       /* no end segment configured, current duration then */
1057       if ((stop = src->segment.stop) == -1)
1058         stop = src->segment.duration;
1059       start = src->segment.start;
1060
1061       /* adjust to stream time */
1062       if (src->segment.time != -1) {
1063         start -= src->segment.time;
1064         if (stop != -1)
1065           stop -= src->segment.time;
1066       }
1067
1068       gst_query_set_segment (query, src->segment.rate, src->segment.format,
1069           start, stop);
1070       GST_OBJECT_UNLOCK (src);
1071       res = TRUE;
1072       break;
1073     }
1074
1075     case GST_QUERY_FORMATS:
1076     {
1077       gst_query_set_formats (query, 3, GST_FORMAT_DEFAULT,
1078           GST_FORMAT_BYTES, GST_FORMAT_PERCENT);
1079       res = TRUE;
1080       break;
1081     }
1082     case GST_QUERY_CONVERT:
1083     {
1084       GstFormat src_fmt, dest_fmt;
1085       gint64 src_val, dest_val;
1086
1087       gst_query_parse_convert (query, &src_fmt, &src_val, &dest_fmt, &dest_val);
1088
1089       /* we can only convert between equal formats... */
1090       if (src_fmt == dest_fmt) {
1091         dest_val = src_val;
1092         res = TRUE;
1093       } else
1094         res = FALSE;
1095
1096       gst_query_set_convert (query, src_fmt, src_val, dest_fmt, dest_val);
1097       break;
1098     }
1099     case GST_QUERY_LATENCY:
1100     {
1101       GstClockTime min, max;
1102       gboolean live;
1103
1104       /* Subclasses should override and implement something useful */
1105       res = gst_base_src_query_latency (src, &live, &min, &max);
1106
1107       GST_LOG_OBJECT (src, "report latency: live %d, min %" GST_TIME_FORMAT
1108           ", max %" GST_TIME_FORMAT, live, GST_TIME_ARGS (min),
1109           GST_TIME_ARGS (max));
1110
1111       gst_query_set_latency (query, live, min, max);
1112       break;
1113     }
1114     case GST_QUERY_JITTER:
1115     case GST_QUERY_RATE:
1116       res = FALSE;
1117       break;
1118     case GST_QUERY_BUFFERING:
1119     {
1120       GstFormat format, seg_format;
1121       gint64 start, stop, estimated;
1122
1123       gst_query_parse_buffering_range (query, &format, NULL, NULL, NULL);
1124
1125       GST_DEBUG_OBJECT (src, "buffering query in format %s",
1126           gst_format_get_name (format));
1127
1128       GST_OBJECT_LOCK (src);
1129       if (src->random_access) {
1130         estimated = 0;
1131         start = 0;
1132         if (format == GST_FORMAT_PERCENT)
1133           stop = GST_FORMAT_PERCENT_MAX;
1134         else
1135           stop = src->segment.duration;
1136       } else {
1137         estimated = -1;
1138         start = -1;
1139         stop = -1;
1140       }
1141       seg_format = src->segment.format;
1142       GST_OBJECT_UNLOCK (src);
1143
1144       /* convert to required format. When the conversion fails, we can't answer
1145        * the query. When the value is unknown, we can don't perform conversion
1146        * but report TRUE. */
1147       if (format != GST_FORMAT_PERCENT && stop != -1) {
1148         res = gst_pad_query_convert (src->srcpad, seg_format,
1149             stop, format, &stop);
1150       } else {
1151         res = TRUE;
1152       }
1153       if (res && format != GST_FORMAT_PERCENT && start != -1)
1154         res = gst_pad_query_convert (src->srcpad, seg_format,
1155             start, format, &start);
1156
1157       gst_query_set_buffering_range (query, format, start, stop, estimated);
1158       break;
1159     }
1160     case GST_QUERY_SCHEDULING:
1161     {
1162       gboolean random_access;
1163
1164       random_access = gst_base_src_is_random_access (src);
1165
1166       /* we can operate in getrange mode if the native format is bytes
1167        * and we are seekable, this condition is set in the random_access
1168        * flag and is set in the _start() method. */
1169       gst_query_set_scheduling (query, GST_SCHEDULING_FLAG_SEEKABLE, 1, -1, 0);
1170       if (random_access)
1171         gst_query_add_scheduling_mode (query, GST_PAD_MODE_PULL);
1172       gst_query_add_scheduling_mode (query, GST_PAD_MODE_PUSH);
1173
1174       res = TRUE;
1175       break;
1176     }
1177     case GST_QUERY_CAPS:
1178     {
1179       GstBaseSrcClass *bclass;
1180       GstCaps *caps, *filter;
1181
1182       bclass = GST_BASE_SRC_GET_CLASS (src);
1183       if (bclass->get_caps) {
1184         gst_query_parse_caps (query, &filter);
1185         if ((caps = bclass->get_caps (src, filter))) {
1186           gst_query_set_caps_result (query, caps);
1187           gst_caps_unref (caps);
1188           res = TRUE;
1189         } else {
1190           res = FALSE;
1191         }
1192       } else
1193         res = FALSE;
1194       break;
1195     }
1196     case GST_QUERY_URI:{
1197       if (GST_IS_URI_HANDLER (src)) {
1198         gchar *uri = gst_uri_handler_get_uri (GST_URI_HANDLER (src));
1199
1200         gst_query_set_uri (query, uri);
1201         g_free (uri);
1202         res = TRUE;
1203       } else {
1204         res = FALSE;
1205       }
1206       break;
1207     }
1208     default:
1209       res = FALSE;
1210       break;
1211   }
1212   GST_DEBUG_OBJECT (src, "query %s returns %d", GST_QUERY_TYPE_NAME (query),
1213       res);
1214
1215   return res;
1216 }
1217
1218 static gboolean
1219 gst_base_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
1220 {
1221   GstBaseSrc *src;
1222   GstBaseSrcClass *bclass;
1223   gboolean result = FALSE;
1224
1225   src = GST_BASE_SRC (parent);
1226   bclass = GST_BASE_SRC_GET_CLASS (src);
1227
1228   if (bclass->query)
1229     result = bclass->query (src, query);
1230
1231   return result;
1232 }
1233
1234 static gboolean
1235 gst_base_src_default_do_seek (GstBaseSrc * src, GstSegment * segment)
1236 {
1237   gboolean res = TRUE;
1238
1239   /* update our offset if the start/stop position was updated */
1240   if (segment->format == GST_FORMAT_BYTES) {
1241     segment->time = segment->start;
1242   } else if (segment->start == 0) {
1243     /* seek to start, we can implement a default for this. */
1244     segment->time = 0;
1245   } else {
1246     res = FALSE;
1247     GST_INFO_OBJECT (src, "Can't do a default seek");
1248   }
1249
1250   return res;
1251 }
1252
1253 static gboolean
1254 gst_base_src_do_seek (GstBaseSrc * src, GstSegment * segment)
1255 {
1256   GstBaseSrcClass *bclass;
1257   gboolean result = FALSE;
1258
1259   bclass = GST_BASE_SRC_GET_CLASS (src);
1260
1261   if (bclass->do_seek)
1262     result = bclass->do_seek (src, segment);
1263
1264   return result;
1265 }
1266
1267 #define SEEK_TYPE_IS_RELATIVE(t) (((t) != GST_SEEK_TYPE_NONE) && ((t) != GST_SEEK_TYPE_SET))
1268
1269 static gboolean
1270 gst_base_src_default_prepare_seek_segment (GstBaseSrc * src, GstEvent * event,
1271     GstSegment * segment)
1272 {
1273   /* By default, we try one of 2 things:
1274    *   - For absolute seek positions, convert the requested position to our
1275    *     configured processing format and place it in the output segment \
1276    *   - For relative seek positions, convert our current (input) values to the
1277    *     seek format, adjust by the relative seek offset and then convert back to
1278    *     the processing format
1279    */
1280   GstSeekType start_type, stop_type;
1281   gint64 start, stop;
1282   GstSeekFlags flags;
1283   GstFormat seek_format, dest_format;
1284   gdouble rate;
1285   gboolean update;
1286   gboolean res = TRUE;
1287
1288   gst_event_parse_seek (event, &rate, &seek_format, &flags,
1289       &start_type, &start, &stop_type, &stop);
1290   dest_format = segment->format;
1291
1292   if (seek_format == dest_format) {
1293     gst_segment_do_seek (segment, rate, seek_format, flags,
1294         start_type, start, stop_type, stop, &update);
1295     return TRUE;
1296   }
1297
1298   if (start_type != GST_SEEK_TYPE_NONE) {
1299     /* FIXME: Handle seek_end by converting the input segment vals */
1300     res =
1301         gst_pad_query_convert (src->srcpad, seek_format, start, dest_format,
1302         &start);
1303     start_type = GST_SEEK_TYPE_SET;
1304   }
1305
1306   if (res && stop_type != GST_SEEK_TYPE_NONE) {
1307     /* FIXME: Handle seek_end by converting the input segment vals */
1308     res =
1309         gst_pad_query_convert (src->srcpad, seek_format, stop, dest_format,
1310         &stop);
1311     stop_type = GST_SEEK_TYPE_SET;
1312   }
1313
1314   /* And finally, configure our output segment in the desired format */
1315   gst_segment_do_seek (segment, rate, dest_format, flags, start_type, start,
1316       stop_type, stop, &update);
1317
1318   if (!res)
1319     goto no_format;
1320
1321   return res;
1322
1323 no_format:
1324   {
1325     GST_DEBUG_OBJECT (src, "undefined format given, seek aborted.");
1326     return FALSE;
1327   }
1328 }
1329
1330 static gboolean
1331 gst_base_src_prepare_seek_segment (GstBaseSrc * src, GstEvent * event,
1332     GstSegment * seeksegment)
1333 {
1334   GstBaseSrcClass *bclass;
1335   gboolean result = FALSE;
1336
1337   bclass = GST_BASE_SRC_GET_CLASS (src);
1338
1339   if (bclass->prepare_seek_segment)
1340     result = bclass->prepare_seek_segment (src, event, seeksegment);
1341
1342   return result;
1343 }
1344
1345 static GstFlowReturn
1346 gst_base_src_default_alloc (GstBaseSrc * src, guint64 offset,
1347     guint size, GstBuffer ** buffer)
1348 {
1349   GstFlowReturn ret;
1350   GstBaseSrcPrivate *priv = src->priv;
1351
1352   if (priv->pool) {
1353     ret = gst_buffer_pool_acquire_buffer (priv->pool, buffer, NULL);
1354   } else if (size != -1) {
1355     *buffer = gst_buffer_new_allocate (priv->allocator, size, &priv->params);
1356     if (G_UNLIKELY (*buffer == NULL))
1357       goto alloc_failed;
1358
1359     ret = GST_FLOW_OK;
1360   } else {
1361     GST_WARNING_OBJECT (src, "Not trying to alloc %u bytes. Blocksize not set?",
1362         size);
1363     goto alloc_failed;
1364   }
1365   return ret;
1366
1367   /* ERRORS */
1368 alloc_failed:
1369   {
1370     GST_ERROR_OBJECT (src, "Failed to allocate %u bytes", size);
1371     return GST_FLOW_ERROR;
1372   }
1373 }
1374
1375 static GstFlowReturn
1376 gst_base_src_default_create (GstBaseSrc * src, guint64 offset,
1377     guint size, GstBuffer ** buffer)
1378 {
1379   GstBaseSrcClass *bclass;
1380   GstFlowReturn ret;
1381   GstBuffer *res_buf;
1382
1383   bclass = GST_BASE_SRC_GET_CLASS (src);
1384
1385   if (G_UNLIKELY (!bclass->alloc))
1386     goto no_function;
1387   if (G_UNLIKELY (!bclass->fill))
1388     goto no_function;
1389
1390   if (*buffer == NULL) {
1391     /* downstream did not provide us with a buffer to fill, allocate one
1392      * ourselves */
1393     ret = bclass->alloc (src, offset, size, &res_buf);
1394     if (G_UNLIKELY (ret != GST_FLOW_OK))
1395       goto alloc_failed;
1396   } else {
1397     res_buf = *buffer;
1398   }
1399
1400   if (G_LIKELY (size > 0)) {
1401     /* only call fill when there is a size */
1402     ret = bclass->fill (src, offset, size, res_buf);
1403     if (G_UNLIKELY (ret != GST_FLOW_OK))
1404       goto not_ok;
1405   }
1406
1407   *buffer = res_buf;
1408
1409   return GST_FLOW_OK;
1410
1411   /* ERRORS */
1412 no_function:
1413   {
1414     GST_DEBUG_OBJECT (src, "no fill or alloc function");
1415     return GST_FLOW_NOT_SUPPORTED;
1416   }
1417 alloc_failed:
1418   {
1419     GST_DEBUG_OBJECT (src, "Failed to allocate buffer of %u bytes", size);
1420     return ret;
1421   }
1422 not_ok:
1423   {
1424     GST_DEBUG_OBJECT (src, "fill returned %d (%s)", ret,
1425         gst_flow_get_name (ret));
1426     if (*buffer == NULL)
1427       gst_buffer_unref (res_buf);
1428     return ret;
1429   }
1430 }
1431
1432 /* this code implements the seeking. It is a good example
1433  * handling all cases.
1434  *
1435  * A seek updates the currently configured segment.start
1436  * and segment.stop values based on the SEEK_TYPE. If the
1437  * segment.start value is updated, a seek to this new position
1438  * should be performed.
1439  *
1440  * The seek can only be executed when we are not currently
1441  * streaming any data, to make sure that this is the case, we
1442  * acquire the STREAM_LOCK which is taken when we are in the
1443  * _loop() function or when a getrange() is called. Normally
1444  * we will not receive a seek if we are operating in pull mode
1445  * though. When we operate as a live source we might block on the live
1446  * cond, which does not release the STREAM_LOCK. Therefore we will try
1447  * to grab the LIVE_LOCK instead of the STREAM_LOCK to make sure it is
1448  * safe to perform the seek.
1449  *
1450  * When we are in the loop() function, we might be in the middle
1451  * of pushing a buffer, which might block in a sink. To make sure
1452  * that the push gets unblocked we push out a FLUSH_START event.
1453  * Our loop function will get a FLUSHING return value from
1454  * the push and will pause, effectively releasing the STREAM_LOCK.
1455  *
1456  * For a non-flushing seek, we pause the task, which might eventually
1457  * release the STREAM_LOCK. We say eventually because when the sink
1458  * blocks on the sample we might wait a very long time until the sink
1459  * unblocks the sample. In any case we acquire the STREAM_LOCK and
1460  * can continue the seek. A non-flushing seek is normally done in a
1461  * running pipeline to perform seamless playback, this means that the sink is
1462  * PLAYING and will return from its chain function.
1463  * In the case of a non-flushing seek we need to make sure that the
1464  * data we output after the seek is continuous with the previous data,
1465  * this is because a non-flushing seek does not reset the running-time
1466  * to 0. We do this by closing the currently running segment, ie. sending
1467  * a new_segment event with the stop position set to the last processed
1468  * position.
1469  *
1470  * After updating the segment.start/stop values, we prepare for
1471  * streaming again. We push out a FLUSH_STOP to make the peer pad
1472  * accept data again and we start our task again.
1473  *
1474  * A segment seek posts a message on the bus saying that the playback
1475  * of the segment started. We store the segment flag internally because
1476  * when we reach the segment.stop we have to post a segment.done
1477  * instead of EOS when doing a segment seek.
1478  */
1479 static gboolean
1480 gst_base_src_perform_seek (GstBaseSrc * src, GstEvent * event, gboolean unlock)
1481 {
1482   gboolean res = TRUE, tres;
1483   gdouble rate;
1484   GstFormat seek_format, dest_format;
1485   GstSeekFlags flags;
1486   GstSeekType start_type, stop_type;
1487   gint64 start, stop;
1488   gboolean flush, playing;
1489   gboolean update;
1490   gboolean relative_seek = FALSE;
1491   gboolean seekseg_configured = FALSE;
1492   GstSegment seeksegment;
1493   guint32 seqnum;
1494   GstEvent *tevent;
1495
1496   GST_DEBUG_OBJECT (src, "doing seek: %" GST_PTR_FORMAT, event);
1497
1498   GST_OBJECT_LOCK (src);
1499   dest_format = src->segment.format;
1500   GST_OBJECT_UNLOCK (src);
1501
1502   if (event) {
1503     gst_event_parse_seek (event, &rate, &seek_format, &flags,
1504         &start_type, &start, &stop_type, &stop);
1505
1506     relative_seek = SEEK_TYPE_IS_RELATIVE (start_type) ||
1507         SEEK_TYPE_IS_RELATIVE (stop_type);
1508
1509     if (dest_format != seek_format && !relative_seek) {
1510       /* If we have an ABSOLUTE position (SEEK_SET only), we can convert it
1511        * here before taking the stream lock, otherwise we must convert it later,
1512        * once we have the stream lock and can read the last configures segment
1513        * start and stop positions */
1514       gst_segment_init (&seeksegment, dest_format);
1515
1516       if (!gst_base_src_prepare_seek_segment (src, event, &seeksegment))
1517         goto prepare_failed;
1518
1519       seekseg_configured = TRUE;
1520     }
1521
1522     flush = flags & GST_SEEK_FLAG_FLUSH;
1523     seqnum = gst_event_get_seqnum (event);
1524   } else {
1525     flush = FALSE;
1526     /* get next seqnum */
1527     seqnum = gst_util_seqnum_next ();
1528   }
1529
1530   /* send flush start */
1531   if (flush) {
1532     tevent = gst_event_new_flush_start ();
1533     gst_event_set_seqnum (tevent, seqnum);
1534     gst_pad_push_event (src->srcpad, tevent);
1535   } else
1536     gst_pad_pause_task (src->srcpad);
1537
1538   /* unblock streaming thread. */
1539   if (unlock)
1540     gst_base_src_set_flushing (src, TRUE, FALSE, &playing);
1541
1542   /* grab streaming lock, this should eventually be possible, either
1543    * because the task is paused, our streaming thread stopped
1544    * or because our peer is flushing. */
1545   GST_PAD_STREAM_LOCK (src->srcpad);
1546   if (G_UNLIKELY (src->priv->seqnum == seqnum)) {
1547     /* we have seen this event before, issue a warning for now */
1548     GST_WARNING_OBJECT (src, "duplicate event found %" G_GUINT32_FORMAT,
1549         seqnum);
1550   } else {
1551     src->priv->seqnum = seqnum;
1552     GST_DEBUG_OBJECT (src, "seek with seqnum %" G_GUINT32_FORMAT, seqnum);
1553   }
1554
1555   if (unlock)
1556     gst_base_src_set_flushing (src, FALSE, playing, NULL);
1557
1558   /* If we configured the seeksegment above, don't overwrite it now. Otherwise
1559    * copy the current segment info into the temp segment that we can actually
1560    * attempt the seek with. We only update the real segment if the seek succeeds. */
1561   if (!seekseg_configured) {
1562     memcpy (&seeksegment, &src->segment, sizeof (GstSegment));
1563
1564     /* now configure the final seek segment */
1565     if (event) {
1566       if (seeksegment.format != seek_format) {
1567         /* OK, here's where we give the subclass a chance to convert the relative
1568          * seek into an absolute one in the processing format. We set up any
1569          * absolute seek above, before taking the stream lock. */
1570         if (!gst_base_src_prepare_seek_segment (src, event, &seeksegment)) {
1571           GST_DEBUG_OBJECT (src, "Preparing the seek failed after flushing. "
1572               "Aborting seek");
1573           res = FALSE;
1574         }
1575       } else {
1576         /* The seek format matches our processing format, no need to ask the
1577          * the subclass to configure the segment. */
1578         gst_segment_do_seek (&seeksegment, rate, seek_format, flags,
1579             start_type, start, stop_type, stop, &update);
1580       }
1581     }
1582     /* Else, no seek event passed, so we're just (re)starting the
1583        current segment. */
1584   }
1585
1586   if (res) {
1587     GST_DEBUG_OBJECT (src, "segment configured from %" G_GINT64_FORMAT
1588         " to %" G_GINT64_FORMAT ", position %" G_GINT64_FORMAT,
1589         seeksegment.start, seeksegment.stop, seeksegment.position);
1590
1591     /* do the seek, segment.position contains the new position. */
1592     res = gst_base_src_do_seek (src, &seeksegment);
1593   }
1594
1595   /* and prepare to continue streaming */
1596   if (flush) {
1597     tevent = gst_event_new_flush_stop (TRUE);
1598     gst_event_set_seqnum (tevent, seqnum);
1599     /* send flush stop, peer will accept data and events again. We
1600      * are not yet providing data as we still have the STREAM_LOCK. */
1601     gst_pad_push_event (src->srcpad, tevent);
1602   }
1603
1604   /* The subclass must have converted the segment to the processing format
1605    * by now */
1606   if (res && seeksegment.format != dest_format) {
1607     GST_DEBUG_OBJECT (src, "Subclass failed to prepare a seek segment "
1608         "in the correct format. Aborting seek.");
1609     res = FALSE;
1610   }
1611
1612   /* if the seek was successful, we update our real segment and push
1613    * out the new segment. */
1614   if (res) {
1615     GST_OBJECT_LOCK (src);
1616     memcpy (&src->segment, &seeksegment, sizeof (GstSegment));
1617     GST_OBJECT_UNLOCK (src);
1618
1619     if (seeksegment.flags & GST_SEGMENT_FLAG_SEGMENT) {
1620       GstMessage *message;
1621
1622       message = gst_message_new_segment_start (GST_OBJECT (src),
1623           seeksegment.format, seeksegment.position);
1624       gst_message_set_seqnum (message, seqnum);
1625
1626       gst_element_post_message (GST_ELEMENT (src), message);
1627     }
1628
1629     /* for deriving a stop position for the playback segment from the seek
1630      * segment, we must take the duration when the stop is not set */
1631     if ((stop = seeksegment.stop) == -1)
1632       stop = seeksegment.duration;
1633
1634     src->priv->segment_pending = TRUE;
1635   }
1636
1637   src->priv->discont = TRUE;
1638   src->running = TRUE;
1639   /* and restart the task in case it got paused explicitly or by
1640    * the FLUSH_START event we pushed out. */
1641   tres = gst_pad_start_task (src->srcpad, (GstTaskFunction) gst_base_src_loop,
1642       src->srcpad, NULL);
1643   if (res && !tres)
1644     res = FALSE;
1645
1646   /* and release the lock again so we can continue streaming */
1647   GST_PAD_STREAM_UNLOCK (src->srcpad);
1648
1649   return res;
1650
1651   /* ERROR */
1652 prepare_failed:
1653   GST_DEBUG_OBJECT (src, "Preparing the seek failed before flushing. "
1654       "Aborting seek");
1655   return FALSE;
1656 }
1657
1658 /* all events send to this element directly. This is mainly done from the
1659  * application.
1660  */
1661 static gboolean
1662 gst_base_src_send_event (GstElement * element, GstEvent * event)
1663 {
1664   GstBaseSrc *src;
1665   gboolean result = FALSE;
1666
1667   src = GST_BASE_SRC (element);
1668
1669   GST_DEBUG_OBJECT (src, "handling event %p %" GST_PTR_FORMAT, event, event);
1670
1671   switch (GST_EVENT_TYPE (event)) {
1672       /* bidirectional events */
1673     case GST_EVENT_FLUSH_START:
1674       GST_DEBUG_OBJECT (src, "pushing flush-start event downstream");
1675       result = gst_pad_push_event (src->srcpad, event);
1676       event = NULL;
1677       break;
1678     case GST_EVENT_FLUSH_STOP:
1679       GST_LIVE_LOCK (src->srcpad);
1680       src->priv->segment_pending = TRUE;
1681       /* sending random flushes downstream can break stuff,
1682        * especially sync since all segment info will get flushed */
1683       GST_DEBUG_OBJECT (src, "pushing flush-stop event downstream");
1684       result = gst_pad_push_event (src->srcpad, event);
1685       GST_LIVE_UNLOCK (src->srcpad);
1686       event = NULL;
1687       break;
1688
1689       /* downstream serialized events */
1690     case GST_EVENT_EOS:
1691     {
1692       GstBaseSrcClass *bclass;
1693
1694       bclass = GST_BASE_SRC_GET_CLASS (src);
1695
1696       /* queue EOS and make sure the task or pull function performs the EOS
1697        * actions.
1698        *
1699        * We have two possibilities:
1700        *
1701        *  - Before we are to enter the _create function, we check the pending_eos
1702        *    first and do EOS instead of entering it.
1703        *  - If we are in the _create function or we did not manage to set the
1704        *    flag fast enough and we are about to enter the _create function,
1705        *    we unlock it so that we exit with FLUSHING immediately. We then
1706        *    check the EOS flag and do the EOS logic.
1707        */
1708       g_atomic_int_set (&src->priv->pending_eos, TRUE);
1709       GST_DEBUG_OBJECT (src, "EOS marked, calling unlock");
1710
1711
1712       /* unlock the _create function so that we can check the pending_eos flag
1713        * and we can do EOS. This will eventually release the LIVE_LOCK again so
1714        * that we can grab it and stop the unlock again. We don't take the stream
1715        * lock so that this operation is guaranteed to never block. */
1716       gst_base_src_activate_pool (src, FALSE);
1717       if (bclass->unlock)
1718         bclass->unlock (src);
1719
1720       GST_DEBUG_OBJECT (src, "unlock called, waiting for LIVE_LOCK");
1721
1722       GST_LIVE_LOCK (src);
1723       GST_DEBUG_OBJECT (src, "LIVE_LOCK acquired, calling unlock_stop");
1724       /* now stop the unlock of the streaming thread again. Grabbing the live
1725        * lock is enough because that protects the create function. */
1726       if (bclass->unlock_stop)
1727         bclass->unlock_stop (src);
1728       gst_base_src_activate_pool (src, TRUE);
1729       GST_LIVE_UNLOCK (src);
1730
1731       result = TRUE;
1732       break;
1733     }
1734     case GST_EVENT_SEGMENT:
1735       /* sending random SEGMENT downstream can break sync. */
1736       break;
1737     case GST_EVENT_TAG:
1738     case GST_EVENT_CUSTOM_DOWNSTREAM:
1739     case GST_EVENT_CUSTOM_BOTH:
1740       /* Insert TAG, CUSTOM_DOWNSTREAM, CUSTOM_BOTH in the dataflow */
1741       GST_OBJECT_LOCK (src);
1742       src->priv->pending_events =
1743           g_list_append (src->priv->pending_events, event);
1744       g_atomic_int_set (&src->priv->have_events, TRUE);
1745       GST_OBJECT_UNLOCK (src);
1746       event = NULL;
1747       result = TRUE;
1748       break;
1749     case GST_EVENT_BUFFERSIZE:
1750       /* does not seem to make much sense currently */
1751       break;
1752
1753       /* upstream events */
1754     case GST_EVENT_QOS:
1755       /* elements should override send_event and do something */
1756       break;
1757     case GST_EVENT_SEEK:
1758     {
1759       gboolean started;
1760
1761       GST_OBJECT_LOCK (src->srcpad);
1762       if (GST_PAD_MODE (src->srcpad) == GST_PAD_MODE_PULL)
1763         goto wrong_mode;
1764       started = GST_PAD_MODE (src->srcpad) == GST_PAD_MODE_PUSH;
1765       GST_OBJECT_UNLOCK (src->srcpad);
1766
1767       if (started) {
1768         GST_DEBUG_OBJECT (src, "performing seek");
1769         /* when we are running in push mode, we can execute the
1770          * seek right now. */
1771         result = gst_base_src_perform_seek (src, event, TRUE);
1772       } else {
1773         GstEvent **event_p;
1774
1775         /* else we store the event and execute the seek when we
1776          * get activated */
1777         GST_OBJECT_LOCK (src);
1778         GST_DEBUG_OBJECT (src, "queueing seek");
1779         event_p = &src->pending_seek;
1780         gst_event_replace ((GstEvent **) event_p, event);
1781         GST_OBJECT_UNLOCK (src);
1782         /* assume the seek will work */
1783         result = TRUE;
1784       }
1785       break;
1786     }
1787     case GST_EVENT_NAVIGATION:
1788       /* could make sense for elements that do something with navigation events
1789        * but then they would need to override the send_event function */
1790       break;
1791     case GST_EVENT_LATENCY:
1792       /* does not seem to make sense currently */
1793       break;
1794
1795       /* custom events */
1796     case GST_EVENT_CUSTOM_UPSTREAM:
1797       /* override send_event if you want this */
1798       break;
1799     case GST_EVENT_CUSTOM_DOWNSTREAM_OOB:
1800     case GST_EVENT_CUSTOM_BOTH_OOB:
1801       /* insert a random custom event into the pipeline */
1802       GST_DEBUG_OBJECT (src, "pushing custom OOB event downstream");
1803       result = gst_pad_push_event (src->srcpad, event);
1804       /* we gave away the ref to the event in the push */
1805       event = NULL;
1806       break;
1807     default:
1808       break;
1809   }
1810 done:
1811   /* if we still have a ref to the event, unref it now */
1812   if (event)
1813     gst_event_unref (event);
1814
1815   return result;
1816
1817   /* ERRORS */
1818 wrong_mode:
1819   {
1820     GST_DEBUG_OBJECT (src, "cannot perform seek when operating in pull mode");
1821     GST_OBJECT_UNLOCK (src->srcpad);
1822     result = FALSE;
1823     goto done;
1824   }
1825 }
1826
1827 static gboolean
1828 gst_base_src_seekable (GstBaseSrc * src)
1829 {
1830   GstBaseSrcClass *bclass;
1831   bclass = GST_BASE_SRC_GET_CLASS (src);
1832   if (bclass->is_seekable)
1833     return bclass->is_seekable (src);
1834   else
1835     return FALSE;
1836 }
1837
1838 static void
1839 gst_base_src_update_qos (GstBaseSrc * src,
1840     gdouble proportion, GstClockTimeDiff diff, GstClockTime timestamp)
1841 {
1842   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, src,
1843       "qos: proportion: %lf, diff %" G_GINT64_FORMAT ", timestamp %"
1844       GST_TIME_FORMAT, proportion, diff, GST_TIME_ARGS (timestamp));
1845
1846   GST_OBJECT_LOCK (src);
1847   src->priv->proportion = proportion;
1848   src->priv->earliest_time = timestamp + diff;
1849   GST_OBJECT_UNLOCK (src);
1850 }
1851
1852
1853 static gboolean
1854 gst_base_src_default_event (GstBaseSrc * src, GstEvent * event)
1855 {
1856   gboolean result;
1857
1858   GST_DEBUG_OBJECT (src, "handle event %" GST_PTR_FORMAT, event);
1859
1860   switch (GST_EVENT_TYPE (event)) {
1861     case GST_EVENT_SEEK:
1862       /* is normally called when in push mode */
1863       if (!gst_base_src_seekable (src))
1864         goto not_seekable;
1865
1866       result = gst_base_src_perform_seek (src, event, TRUE);
1867       break;
1868     case GST_EVENT_FLUSH_START:
1869       /* cancel any blocking getrange, is normally called
1870        * when in pull mode. */
1871       result = gst_base_src_set_flushing (src, TRUE, FALSE, NULL);
1872       break;
1873     case GST_EVENT_FLUSH_STOP:
1874       result = gst_base_src_set_flushing (src, FALSE, TRUE, NULL);
1875       break;
1876     case GST_EVENT_QOS:
1877     {
1878       gdouble proportion;
1879       GstClockTimeDiff diff;
1880       GstClockTime timestamp;
1881
1882       gst_event_parse_qos (event, NULL, &proportion, &diff, &timestamp);
1883       gst_base_src_update_qos (src, proportion, diff, timestamp);
1884       result = TRUE;
1885       break;
1886     }
1887     case GST_EVENT_RECONFIGURE:
1888       result = TRUE;
1889       break;
1890     case GST_EVENT_LATENCY:
1891       result = TRUE;
1892       break;
1893     default:
1894       result = FALSE;
1895       break;
1896   }
1897   return result;
1898
1899   /* ERRORS */
1900 not_seekable:
1901   {
1902     GST_DEBUG_OBJECT (src, "is not seekable");
1903     return FALSE;
1904   }
1905 }
1906
1907 static gboolean
1908 gst_base_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
1909 {
1910   GstBaseSrc *src;
1911   GstBaseSrcClass *bclass;
1912   gboolean result = FALSE;
1913
1914   src = GST_BASE_SRC (parent);
1915   bclass = GST_BASE_SRC_GET_CLASS (src);
1916
1917   if (bclass->event) {
1918     if (!(result = bclass->event (src, event)))
1919       goto subclass_failed;
1920   }
1921
1922 done:
1923   gst_event_unref (event);
1924
1925   return result;
1926
1927   /* ERRORS */
1928 subclass_failed:
1929   {
1930     GST_DEBUG_OBJECT (src, "subclass refused event");
1931     goto done;
1932   }
1933 }
1934
1935 static void
1936 gst_base_src_set_property (GObject * object, guint prop_id,
1937     const GValue * value, GParamSpec * pspec)
1938 {
1939   GstBaseSrc *src;
1940
1941   src = GST_BASE_SRC (object);
1942
1943   switch (prop_id) {
1944     case PROP_BLOCKSIZE:
1945       gst_base_src_set_blocksize (src, g_value_get_uint (value));
1946       break;
1947     case PROP_NUM_BUFFERS:
1948       src->num_buffers = g_value_get_int (value);
1949       break;
1950     case PROP_TYPEFIND:
1951       src->typefind = g_value_get_boolean (value);
1952       break;
1953     case PROP_DO_TIMESTAMP:
1954       gst_base_src_set_do_timestamp (src, g_value_get_boolean (value));
1955       break;
1956     default:
1957       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1958       break;
1959   }
1960 }
1961
1962 static void
1963 gst_base_src_get_property (GObject * object, guint prop_id, GValue * value,
1964     GParamSpec * pspec)
1965 {
1966   GstBaseSrc *src;
1967
1968   src = GST_BASE_SRC (object);
1969
1970   switch (prop_id) {
1971     case PROP_BLOCKSIZE:
1972       g_value_set_uint (value, gst_base_src_get_blocksize (src));
1973       break;
1974     case PROP_NUM_BUFFERS:
1975       g_value_set_int (value, src->num_buffers);
1976       break;
1977     case PROP_TYPEFIND:
1978       g_value_set_boolean (value, src->typefind);
1979       break;
1980     case PROP_DO_TIMESTAMP:
1981       g_value_set_boolean (value, gst_base_src_get_do_timestamp (src));
1982       break;
1983     default:
1984       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1985       break;
1986   }
1987 }
1988
1989 /* with STREAM_LOCK and LOCK */
1990 static GstClockReturn
1991 gst_base_src_wait (GstBaseSrc * basesrc, GstClock * clock, GstClockTime time)
1992 {
1993   GstClockReturn ret;
1994   GstClockID id;
1995
1996   id = gst_clock_new_single_shot_id (clock, time);
1997
1998   basesrc->clock_id = id;
1999   /* release the live lock while waiting */
2000   GST_LIVE_UNLOCK (basesrc);
2001
2002   ret = gst_clock_id_wait (id, NULL);
2003
2004   GST_LIVE_LOCK (basesrc);
2005   gst_clock_id_unref (id);
2006   basesrc->clock_id = NULL;
2007
2008   return ret;
2009 }
2010
2011 /* perform synchronisation on a buffer.
2012  * with STREAM_LOCK.
2013  */
2014 static GstClockReturn
2015 gst_base_src_do_sync (GstBaseSrc * basesrc, GstBuffer * buffer)
2016 {
2017   GstClockReturn result;
2018   GstClockTime start, end;
2019   GstBaseSrcClass *bclass;
2020   GstClockTime base_time;
2021   GstClock *clock;
2022   GstClockTime now = GST_CLOCK_TIME_NONE, pts, dts, timestamp;
2023   gboolean do_timestamp, first, pseudo_live, is_live;
2024
2025   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2026
2027   start = end = -1;
2028   if (bclass->get_times)
2029     bclass->get_times (basesrc, buffer, &start, &end);
2030
2031   /* get buffer timestamp */
2032   dts = GST_BUFFER_DTS (buffer);
2033   pts = GST_BUFFER_PTS (buffer);
2034
2035   if (GST_CLOCK_TIME_IS_VALID (dts))
2036     timestamp = dts;
2037   else
2038     timestamp = pts;
2039
2040   /* grab the lock to prepare for clocking and calculate the startup
2041    * latency. */
2042   GST_OBJECT_LOCK (basesrc);
2043
2044   is_live = basesrc->is_live;
2045   /* if we are asked to sync against the clock we are a pseudo live element */
2046   pseudo_live = (start != -1 && is_live);
2047   /* check for the first buffer */
2048   first = (basesrc->priv->latency == -1);
2049
2050   if (timestamp != -1 && pseudo_live) {
2051     GstClockTime latency;
2052
2053     /* we have a timestamp and a sync time, latency is the diff */
2054     if (timestamp <= start)
2055       latency = start - timestamp;
2056     else
2057       latency = 0;
2058
2059     if (first) {
2060       GST_DEBUG_OBJECT (basesrc, "pseudo_live with latency %" GST_TIME_FORMAT,
2061           GST_TIME_ARGS (latency));
2062       /* first time we calculate latency, just configure */
2063       basesrc->priv->latency = latency;
2064     } else {
2065       if (basesrc->priv->latency != latency) {
2066         /* we have a new latency, FIXME post latency message */
2067         basesrc->priv->latency = latency;
2068         GST_DEBUG_OBJECT (basesrc, "latency changed to %" GST_TIME_FORMAT,
2069             GST_TIME_ARGS (latency));
2070       }
2071     }
2072   } else if (first) {
2073     GST_DEBUG_OBJECT (basesrc, "no latency needed, live %d, sync %d",
2074         is_live, start != -1);
2075     basesrc->priv->latency = 0;
2076   }
2077
2078   /* get clock, if no clock, we can't sync or do timestamps */
2079   if ((clock = GST_ELEMENT_CLOCK (basesrc)) == NULL)
2080     goto no_clock;
2081   else
2082     gst_object_ref (clock);
2083
2084   base_time = GST_ELEMENT_CAST (basesrc)->base_time;
2085
2086   do_timestamp = basesrc->priv->do_timestamp;
2087   GST_OBJECT_UNLOCK (basesrc);
2088
2089   /* first buffer, calculate the timestamp offset */
2090   if (first) {
2091     GstClockTime running_time;
2092
2093     now = gst_clock_get_time (clock);
2094     running_time = now - base_time;
2095
2096     GST_LOG_OBJECT (basesrc,
2097         "startup PTS: %" GST_TIME_FORMAT ", DTS %" GST_TIME_FORMAT
2098         ", running_time %" GST_TIME_FORMAT, GST_TIME_ARGS (pts),
2099         GST_TIME_ARGS (dts), GST_TIME_ARGS (running_time));
2100
2101     if (pseudo_live && timestamp != -1) {
2102       /* live source and we need to sync, add startup latency to all timestamps
2103        * to get the real running_time. Live sources should always timestamp
2104        * according to the current running time. */
2105       basesrc->priv->ts_offset = GST_CLOCK_DIFF (timestamp, running_time);
2106
2107       GST_LOG_OBJECT (basesrc, "live with sync, ts_offset %" GST_TIME_FORMAT,
2108           GST_TIME_ARGS (basesrc->priv->ts_offset));
2109     } else {
2110       basesrc->priv->ts_offset = 0;
2111       GST_LOG_OBJECT (basesrc, "no timestamp offset needed");
2112     }
2113
2114     if (!GST_CLOCK_TIME_IS_VALID (dts)) {
2115       if (do_timestamp) {
2116         dts = running_time;
2117       } else {
2118         dts = 0;
2119       }
2120       GST_BUFFER_DTS (buffer) = dts;
2121
2122       GST_LOG_OBJECT (basesrc, "created DTS %" GST_TIME_FORMAT,
2123           GST_TIME_ARGS (dts));
2124     }
2125   } else {
2126     /* not the first buffer, the timestamp is the diff between the clock and
2127      * base_time */
2128     if (do_timestamp && !GST_CLOCK_TIME_IS_VALID (dts)) {
2129       now = gst_clock_get_time (clock);
2130
2131       dts = now - base_time;
2132       GST_BUFFER_DTS (buffer) = dts;
2133
2134       GST_LOG_OBJECT (basesrc, "created DTS %" GST_TIME_FORMAT,
2135           GST_TIME_ARGS (dts));
2136     }
2137   }
2138   if (!GST_CLOCK_TIME_IS_VALID (pts)) {
2139     if (!GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DELTA_UNIT))
2140       pts = dts;
2141
2142     GST_BUFFER_PTS (buffer) = dts;
2143
2144     GST_LOG_OBJECT (basesrc, "created PTS %" GST_TIME_FORMAT,
2145         GST_TIME_ARGS (pts));
2146   }
2147
2148   /* if we don't have a buffer timestamp, we don't sync */
2149   if (!GST_CLOCK_TIME_IS_VALID (start))
2150     goto no_sync;
2151
2152   if (is_live) {
2153     /* for pseudo live sources, add our ts_offset to the timestamp */
2154     if (GST_CLOCK_TIME_IS_VALID (pts))
2155       GST_BUFFER_PTS (buffer) += basesrc->priv->ts_offset;
2156     if (GST_CLOCK_TIME_IS_VALID (dts))
2157       GST_BUFFER_DTS (buffer) += basesrc->priv->ts_offset;
2158     start += basesrc->priv->ts_offset;
2159   }
2160
2161   GST_LOG_OBJECT (basesrc,
2162       "waiting for clock, base time %" GST_TIME_FORMAT
2163       ", stream_start %" GST_TIME_FORMAT,
2164       GST_TIME_ARGS (base_time), GST_TIME_ARGS (start));
2165
2166   result = gst_base_src_wait (basesrc, clock, start + base_time);
2167
2168   gst_object_unref (clock);
2169
2170   GST_LOG_OBJECT (basesrc, "clock entry done: %d", result);
2171
2172   return result;
2173
2174   /* special cases */
2175 no_clock:
2176   {
2177     GST_DEBUG_OBJECT (basesrc, "we have no clock");
2178     GST_OBJECT_UNLOCK (basesrc);
2179     return GST_CLOCK_OK;
2180   }
2181 no_sync:
2182   {
2183     GST_DEBUG_OBJECT (basesrc, "no sync needed");
2184     gst_object_unref (clock);
2185     return GST_CLOCK_OK;
2186   }
2187 }
2188
2189 /* Called with STREAM_LOCK and LIVE_LOCK */
2190 static gboolean
2191 gst_base_src_update_length (GstBaseSrc * src, guint64 offset, guint * length)
2192 {
2193   guint64 size, maxsize;
2194   GstBaseSrcClass *bclass;
2195   GstFormat format;
2196   gint64 stop;
2197   gboolean dynamic;
2198
2199   bclass = GST_BASE_SRC_GET_CLASS (src);
2200
2201   format = src->segment.format;
2202   stop = src->segment.stop;
2203   /* get total file size */
2204   size = src->segment.duration;
2205
2206   /* only operate if we are working with bytes */
2207   if (format != GST_FORMAT_BYTES)
2208     return TRUE;
2209
2210   /* the max amount of bytes to read is the total size or
2211    * up to the segment.stop if present. */
2212   if (stop != -1)
2213     maxsize = MIN (size, stop);
2214   else
2215     maxsize = size;
2216
2217   GST_DEBUG_OBJECT (src,
2218       "reading offset %" G_GUINT64_FORMAT ", length %u, size %" G_GINT64_FORMAT
2219       ", segment.stop %" G_GINT64_FORMAT ", maxsize %" G_GINT64_FORMAT, offset,
2220       *length, size, stop, maxsize);
2221
2222   dynamic = g_atomic_int_get (&src->priv->dynamic_size);
2223   GST_DEBUG_OBJECT (src, "dynamic size: %d", dynamic);
2224
2225   /* check size if we have one */
2226   if (maxsize != -1) {
2227     /* if we run past the end, check if the file became bigger and
2228      * retry. */
2229     if (G_UNLIKELY (offset + *length >= maxsize || dynamic)) {
2230       /* see if length of the file changed */
2231       if (bclass->get_size)
2232         if (!bclass->get_size (src, &size))
2233           size = -1;
2234
2235       /* make sure we don't exceed the configured segment stop
2236        * if it was set */
2237       if (stop != -1)
2238         maxsize = MIN (size, stop);
2239       else
2240         maxsize = size;
2241
2242       /* if we are at or past the end, EOS */
2243       if (G_UNLIKELY (offset >= maxsize))
2244         goto unexpected_length;
2245
2246       /* else we can clip to the end */
2247       if (G_UNLIKELY (offset + *length >= maxsize))
2248         *length = maxsize - offset;
2249
2250     }
2251   }
2252
2253   /* keep track of current duration.
2254    * segment is in bytes, we checked that above. */
2255   GST_OBJECT_LOCK (src);
2256   src->segment.duration = size;
2257   GST_OBJECT_UNLOCK (src);
2258
2259   return TRUE;
2260
2261   /* ERRORS */
2262 unexpected_length:
2263   {
2264     return FALSE;
2265   }
2266 }
2267
2268 /* must be called with LIVE_LOCK */
2269 static GstFlowReturn
2270 gst_base_src_get_range (GstBaseSrc * src, guint64 offset, guint length,
2271     GstBuffer ** buf)
2272 {
2273   GstFlowReturn ret;
2274   GstBaseSrcClass *bclass;
2275   GstClockReturn status;
2276   GstBuffer *res_buf;
2277   GstBuffer *in_buf;
2278
2279   bclass = GST_BASE_SRC_GET_CLASS (src);
2280
2281 again:
2282   if (src->is_live) {
2283     if (G_UNLIKELY (!src->live_running)) {
2284       ret = gst_base_src_wait_playing (src);
2285       if (ret != GST_FLOW_OK)
2286         goto stopped;
2287     }
2288   }
2289
2290   if (G_UNLIKELY (!GST_BASE_SRC_IS_STARTED (src)
2291           && !GST_BASE_SRC_IS_STARTING (src)))
2292     goto not_started;
2293
2294   if (G_UNLIKELY (!bclass->create))
2295     goto no_function;
2296
2297   if (G_UNLIKELY (!gst_base_src_update_length (src, offset, &length)))
2298     goto unexpected_length;
2299
2300   /* track position */
2301   GST_OBJECT_LOCK (src);
2302   if (src->segment.format == GST_FORMAT_BYTES)
2303     src->segment.position = offset;
2304   GST_OBJECT_UNLOCK (src);
2305
2306   /* normally we don't count buffers */
2307   if (G_UNLIKELY (src->num_buffers_left >= 0)) {
2308     if (src->num_buffers_left == 0)
2309       goto reached_num_buffers;
2310     else
2311       src->num_buffers_left--;
2312   }
2313
2314   /* don't enter the create function if a pending EOS event was set. For the
2315    * logic of the pending_eos, check the event function of this class. */
2316   if (G_UNLIKELY (g_atomic_int_get (&src->priv->pending_eos)))
2317     goto eos;
2318
2319   GST_DEBUG_OBJECT (src,
2320       "calling create offset %" G_GUINT64_FORMAT " length %u, time %"
2321       G_GINT64_FORMAT, offset, length, src->segment.time);
2322
2323   res_buf = in_buf = *buf;
2324
2325   ret = bclass->create (src, offset, length, &res_buf);
2326
2327   /* The create function could be unlocked because we have a pending EOS. It's
2328    * possible that we have a valid buffer from create that we need to
2329    * discard when the create function returned _OK. */
2330   if (G_UNLIKELY (g_atomic_int_get (&src->priv->pending_eos))) {
2331     if (ret == GST_FLOW_OK) {
2332       if (*buf == NULL)
2333         gst_buffer_unref (res_buf);
2334     }
2335     goto eos;
2336   }
2337
2338   if (G_UNLIKELY (ret != GST_FLOW_OK))
2339     goto not_ok;
2340
2341   /* fallback in case the create function didn't fill a provided buffer */
2342   if (in_buf != NULL && res_buf != in_buf) {
2343     GstMapInfo info;
2344     gsize copied_size;
2345
2346     GST_CAT_DEBUG_OBJECT (GST_CAT_PERFORMANCE, src, "create function didn't "
2347         "fill the provided buffer, copying");
2348
2349     gst_buffer_map (in_buf, &info, GST_MAP_WRITE);
2350     copied_size = gst_buffer_extract (res_buf, 0, info.data, info.size);
2351     gst_buffer_unmap (in_buf, &info);
2352     gst_buffer_set_size (in_buf, copied_size);
2353
2354     gst_buffer_copy_into (in_buf, res_buf, GST_BUFFER_COPY_METADATA, 0, -1);
2355
2356     gst_buffer_unref (res_buf);
2357     res_buf = in_buf;
2358   }
2359
2360   /* no timestamp set and we are at offset 0, we can timestamp with 0 */
2361   if (offset == 0 && src->segment.time == 0
2362       && GST_BUFFER_DTS (res_buf) == -1 && !src->is_live) {
2363     GST_DEBUG_OBJECT (src, "setting first timestamp to 0");
2364     res_buf = gst_buffer_make_writable (res_buf);
2365     GST_BUFFER_DTS (res_buf) = 0;
2366   }
2367
2368   /* now sync before pushing the buffer */
2369   status = gst_base_src_do_sync (src, res_buf);
2370
2371   /* waiting for the clock could have made us flushing */
2372   if (G_UNLIKELY (src->priv->flushing))
2373     goto flushing;
2374
2375   switch (status) {
2376     case GST_CLOCK_EARLY:
2377       /* the buffer is too late. We currently don't drop the buffer. */
2378       GST_DEBUG_OBJECT (src, "buffer too late!, returning anyway");
2379       break;
2380     case GST_CLOCK_OK:
2381       /* buffer synchronised properly */
2382       GST_DEBUG_OBJECT (src, "buffer ok");
2383       break;
2384     case GST_CLOCK_UNSCHEDULED:
2385       /* this case is triggered when we were waiting for the clock and
2386        * it got unlocked because we did a state change. In any case, get rid of
2387        * the buffer. */
2388       if (*buf == NULL)
2389         gst_buffer_unref (res_buf);
2390
2391       if (!src->live_running) {
2392         /* We return FLUSHING when we are not running to stop the dataflow also
2393          * get rid of the produced buffer. */
2394         GST_DEBUG_OBJECT (src,
2395             "clock was unscheduled (%d), returning FLUSHING", status);
2396         ret = GST_FLOW_FLUSHING;
2397       } else {
2398         /* If we are running when this happens, we quickly switched between
2399          * pause and playing. We try to produce a new buffer */
2400         GST_DEBUG_OBJECT (src,
2401             "clock was unscheduled (%d), but we are running", status);
2402         goto again;
2403       }
2404       break;
2405     default:
2406       /* all other result values are unexpected and errors */
2407       GST_ELEMENT_ERROR (src, CORE, CLOCK,
2408           (_("Internal clock error.")),
2409           ("clock returned unexpected return value %d", status));
2410       if (*buf == NULL)
2411         gst_buffer_unref (res_buf);
2412       ret = GST_FLOW_ERROR;
2413       break;
2414   }
2415   if (G_LIKELY (ret == GST_FLOW_OK))
2416     *buf = res_buf;
2417
2418   return ret;
2419
2420   /* ERROR */
2421 stopped:
2422   {
2423     GST_DEBUG_OBJECT (src, "wait_playing returned %d (%s)", ret,
2424         gst_flow_get_name (ret));
2425     return ret;
2426   }
2427 not_ok:
2428   {
2429     GST_DEBUG_OBJECT (src, "create returned %d (%s)", ret,
2430         gst_flow_get_name (ret));
2431     return ret;
2432   }
2433 not_started:
2434   {
2435     GST_DEBUG_OBJECT (src, "getrange but not started");
2436     return GST_FLOW_FLUSHING;
2437   }
2438 no_function:
2439   {
2440     GST_DEBUG_OBJECT (src, "no create function");
2441     return GST_FLOW_NOT_SUPPORTED;
2442   }
2443 unexpected_length:
2444   {
2445     GST_DEBUG_OBJECT (src, "unexpected length %u (offset=%" G_GUINT64_FORMAT
2446         ", size=%" G_GINT64_FORMAT ")", length, offset, src->segment.duration);
2447     return GST_FLOW_EOS;
2448   }
2449 reached_num_buffers:
2450   {
2451     GST_DEBUG_OBJECT (src, "sent all buffers");
2452     return GST_FLOW_EOS;
2453   }
2454 flushing:
2455   {
2456     GST_DEBUG_OBJECT (src, "we are flushing");
2457     if (*buf == NULL)
2458       gst_buffer_unref (res_buf);
2459     return GST_FLOW_FLUSHING;
2460   }
2461 eos:
2462   {
2463     GST_DEBUG_OBJECT (src, "we are EOS");
2464     return GST_FLOW_EOS;
2465   }
2466 }
2467
2468 static GstFlowReturn
2469 gst_base_src_getrange (GstPad * pad, GstObject * parent, guint64 offset,
2470     guint length, GstBuffer ** buf)
2471 {
2472   GstBaseSrc *src;
2473   GstFlowReturn res;
2474
2475   src = GST_BASE_SRC_CAST (parent);
2476
2477   GST_LIVE_LOCK (src);
2478   if (G_UNLIKELY (src->priv->flushing))
2479     goto flushing;
2480
2481   res = gst_base_src_get_range (src, offset, length, buf);
2482
2483 done:
2484   GST_LIVE_UNLOCK (src);
2485
2486   return res;
2487
2488   /* ERRORS */
2489 flushing:
2490   {
2491     GST_DEBUG_OBJECT (src, "we are flushing");
2492     res = GST_FLOW_FLUSHING;
2493     goto done;
2494   }
2495 }
2496
2497 static gboolean
2498 gst_base_src_is_random_access (GstBaseSrc * src)
2499 {
2500   /* we need to start the basesrc to check random access */
2501   if (!GST_BASE_SRC_IS_STARTED (src)) {
2502     GST_LOG_OBJECT (src, "doing start/stop to check get_range support");
2503     if (G_LIKELY (gst_base_src_start (src))) {
2504       if (gst_base_src_start_wait (src) != GST_FLOW_OK)
2505         goto start_failed;
2506       gst_base_src_stop (src);
2507     }
2508   }
2509
2510   return src->random_access;
2511
2512   /* ERRORS */
2513 start_failed:
2514   {
2515     GST_DEBUG_OBJECT (src, "failed to start");
2516     return FALSE;
2517   }
2518 }
2519
2520 static void
2521 gst_base_src_loop (GstPad * pad)
2522 {
2523   GstBaseSrc *src;
2524   GstBuffer *buf = NULL;
2525   GstFlowReturn ret;
2526   gint64 position;
2527   gboolean eos;
2528   guint blocksize;
2529   GList *pending_events = NULL, *tmp;
2530
2531   eos = FALSE;
2532
2533   src = GST_BASE_SRC (GST_OBJECT_PARENT (pad));
2534
2535   gst_base_src_send_stream_start (src);
2536
2537   /* check if we need to renegotiate */
2538   if (gst_pad_check_reconfigure (pad)) {
2539     if (!gst_base_src_negotiate (src))
2540       goto not_negotiated;
2541   }
2542
2543   GST_LIVE_LOCK (src);
2544
2545   if (G_UNLIKELY (src->priv->flushing))
2546     goto flushing;
2547
2548   blocksize = src->blocksize;
2549
2550   /* if we operate in bytes, we can calculate an offset */
2551   if (src->segment.format == GST_FORMAT_BYTES) {
2552     position = src->segment.position;
2553     /* for negative rates, start with subtracting the blocksize */
2554     if (src->segment.rate < 0.0) {
2555       /* we cannot go below segment.start */
2556       if (position > src->segment.start + blocksize)
2557         position -= blocksize;
2558       else {
2559         /* last block, remainder up to segment.start */
2560         blocksize = position - src->segment.start;
2561         position = src->segment.start;
2562       }
2563     }
2564   } else
2565     position = -1;
2566
2567   GST_LOG_OBJECT (src, "next_ts %" GST_TIME_FORMAT " size %u",
2568       GST_TIME_ARGS (position), blocksize);
2569
2570   ret = gst_base_src_get_range (src, position, blocksize, &buf);
2571   if (G_UNLIKELY (ret != GST_FLOW_OK)) {
2572     GST_INFO_OBJECT (src, "pausing after gst_base_src_get_range() = %s",
2573         gst_flow_get_name (ret));
2574     GST_LIVE_UNLOCK (src);
2575     goto pause;
2576   }
2577   /* this should not happen */
2578   if (G_UNLIKELY (buf == NULL))
2579     goto null_buffer;
2580
2581   /* push events to close/start our segment before we push the buffer. */
2582   if (G_UNLIKELY (src->priv->segment_pending)) {
2583     gst_pad_push_event (pad, gst_event_new_segment (&src->segment));
2584     src->priv->segment_pending = FALSE;
2585   }
2586
2587   if (g_atomic_int_get (&src->priv->have_events)) {
2588     GST_OBJECT_LOCK (src);
2589     /* take the events */
2590     pending_events = src->priv->pending_events;
2591     src->priv->pending_events = NULL;
2592     g_atomic_int_set (&src->priv->have_events, FALSE);
2593     GST_OBJECT_UNLOCK (src);
2594   }
2595
2596   /* Push out pending events if any */
2597   if (G_UNLIKELY (pending_events != NULL)) {
2598     for (tmp = pending_events; tmp; tmp = g_list_next (tmp)) {
2599       GstEvent *ev = (GstEvent *) tmp->data;
2600       gst_pad_push_event (pad, ev);
2601     }
2602     g_list_free (pending_events);
2603   }
2604
2605   /* figure out the new position */
2606   switch (src->segment.format) {
2607     case GST_FORMAT_BYTES:
2608     {
2609       guint bufsize = gst_buffer_get_size (buf);
2610
2611       /* we subtracted above for negative rates */
2612       if (src->segment.rate >= 0.0)
2613         position += bufsize;
2614       break;
2615     }
2616     case GST_FORMAT_TIME:
2617     {
2618       GstClockTime start, duration;
2619
2620       start = GST_BUFFER_TIMESTAMP (buf);
2621       duration = GST_BUFFER_DURATION (buf);
2622
2623       if (GST_CLOCK_TIME_IS_VALID (start))
2624         position = start;
2625       else
2626         position = src->segment.position;
2627
2628       if (GST_CLOCK_TIME_IS_VALID (duration)) {
2629         if (src->segment.rate >= 0.0)
2630           position += duration;
2631         else if (position > duration)
2632           position -= duration;
2633         else
2634           position = 0;
2635       }
2636       break;
2637     }
2638     case GST_FORMAT_DEFAULT:
2639       if (src->segment.rate >= 0.0)
2640         position = GST_BUFFER_OFFSET_END (buf);
2641       else
2642         position = GST_BUFFER_OFFSET (buf);
2643       break;
2644     default:
2645       position = -1;
2646       break;
2647   }
2648   if (position != -1) {
2649     if (src->segment.rate >= 0.0) {
2650       /* positive rate, check if we reached the stop */
2651       if (src->segment.stop != -1) {
2652         if (position >= src->segment.stop) {
2653           eos = TRUE;
2654           position = src->segment.stop;
2655         }
2656       }
2657     } else {
2658       /* negative rate, check if we reached the start. start is always set to
2659        * something different from -1 */
2660       if (position <= src->segment.start) {
2661         eos = TRUE;
2662         position = src->segment.start;
2663       }
2664       /* when going reverse, all buffers are DISCONT */
2665       src->priv->discont = TRUE;
2666     }
2667     GST_OBJECT_LOCK (src);
2668     src->segment.position = position;
2669     GST_OBJECT_UNLOCK (src);
2670   }
2671
2672   if (G_UNLIKELY (src->priv->discont)) {
2673     GST_INFO_OBJECT (src, "marking pending DISCONT");
2674     buf = gst_buffer_make_writable (buf);
2675     GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_DISCONT);
2676     src->priv->discont = FALSE;
2677   }
2678   GST_LIVE_UNLOCK (src);
2679
2680   ret = gst_pad_push (pad, buf);
2681   if (G_UNLIKELY (ret != GST_FLOW_OK)) {
2682     GST_INFO_OBJECT (src, "pausing after gst_pad_push() = %s",
2683         gst_flow_get_name (ret));
2684     goto pause;
2685   }
2686
2687   if (G_UNLIKELY (eos)) {
2688     GST_INFO_OBJECT (src, "pausing after end of segment");
2689     ret = GST_FLOW_EOS;
2690     goto pause;
2691   }
2692
2693 done:
2694   return;
2695
2696   /* special cases */
2697 not_negotiated:
2698   {
2699     GST_DEBUG_OBJECT (src, "Failed to renegotiate");
2700     ret = GST_FLOW_NOT_NEGOTIATED;
2701     goto pause;
2702   }
2703 flushing:
2704   {
2705     GST_DEBUG_OBJECT (src, "we are flushing");
2706     GST_LIVE_UNLOCK (src);
2707     ret = GST_FLOW_FLUSHING;
2708     goto pause;
2709   }
2710 pause:
2711   {
2712     const gchar *reason = gst_flow_get_name (ret);
2713     GstEvent *event;
2714
2715     GST_DEBUG_OBJECT (src, "pausing task, reason %s", reason);
2716     src->running = FALSE;
2717     gst_pad_pause_task (pad);
2718     if (ret == GST_FLOW_EOS) {
2719       gboolean flag_segment;
2720       GstFormat format;
2721       gint64 position;
2722
2723       /* perform EOS logic */
2724       flag_segment = (src->segment.flags & GST_SEGMENT_FLAG_SEGMENT) != 0;
2725       format = src->segment.format;
2726       position = src->segment.position;
2727
2728       if (flag_segment) {
2729         GstMessage *message;
2730
2731         message = gst_message_new_segment_done (GST_OBJECT_CAST (src),
2732             format, position);
2733         gst_message_set_seqnum (message, src->priv->seqnum);
2734         gst_element_post_message (GST_ELEMENT_CAST (src), message);
2735         event = gst_event_new_segment_done (format, position);
2736         gst_event_set_seqnum (event, src->priv->seqnum);
2737         gst_pad_push_event (pad, event);
2738       } else {
2739         event = gst_event_new_eos ();
2740         gst_event_set_seqnum (event, src->priv->seqnum);
2741         gst_pad_push_event (pad, event);
2742       }
2743     } else if (ret == GST_FLOW_NOT_LINKED || ret <= GST_FLOW_EOS) {
2744       event = gst_event_new_eos ();
2745       gst_event_set_seqnum (event, src->priv->seqnum);
2746       /* for fatal errors we post an error message, post the error
2747        * first so the app knows about the error first.
2748        * Also don't do this for FLUSHING because it happens
2749        * due to flushing and posting an error message because of
2750        * that is the wrong thing to do, e.g. when we're doing
2751        * a flushing seek. */
2752       GST_ELEMENT_ERROR (src, STREAM, FAILED,
2753           (_("Internal data flow error.")),
2754           ("streaming task paused, reason %s (%d)", reason, ret));
2755       gst_pad_push_event (pad, event);
2756     }
2757     goto done;
2758   }
2759 null_buffer:
2760   {
2761     GST_ELEMENT_ERROR (src, STREAM, FAILED,
2762         (_("Internal data flow error.")), ("element returned NULL buffer"));
2763     GST_LIVE_UNLOCK (src);
2764     goto done;
2765   }
2766 }
2767
2768 static gboolean
2769 gst_base_src_set_allocation (GstBaseSrc * basesrc, GstBufferPool * pool,
2770     GstAllocator * allocator, GstAllocationParams * params)
2771 {
2772   GstAllocator *oldalloc;
2773   GstBufferPool *oldpool;
2774   GstBaseSrcPrivate *priv = basesrc->priv;
2775
2776   if (pool) {
2777     GST_DEBUG_OBJECT (basesrc, "activate pool");
2778     if (!gst_buffer_pool_set_active (pool, TRUE))
2779       goto activate_failed;
2780   }
2781
2782   GST_OBJECT_LOCK (basesrc);
2783   oldpool = priv->pool;
2784   priv->pool = pool;
2785
2786   oldalloc = priv->allocator;
2787   priv->allocator = allocator;
2788
2789   if (params)
2790     priv->params = *params;
2791   else
2792     gst_allocation_params_init (&priv->params);
2793   GST_OBJECT_UNLOCK (basesrc);
2794
2795   if (oldpool) {
2796     /* only deactivate if the pool is not the one we're using */
2797     if (oldpool != pool) {
2798       GST_DEBUG_OBJECT (basesrc, "deactivate old pool");
2799       gst_buffer_pool_set_active (oldpool, FALSE);
2800     }
2801     gst_object_unref (oldpool);
2802   }
2803   if (oldalloc) {
2804     gst_object_unref (oldalloc);
2805   }
2806   return TRUE;
2807
2808   /* ERRORS */
2809 activate_failed:
2810   {
2811     GST_ERROR_OBJECT (basesrc, "failed to activate bufferpool.");
2812     return FALSE;
2813   }
2814 }
2815
2816 static gboolean
2817 gst_base_src_activate_pool (GstBaseSrc * basesrc, gboolean active)
2818 {
2819   GstBaseSrcPrivate *priv = basesrc->priv;
2820   GstBufferPool *pool;
2821   gboolean res = TRUE;
2822
2823   GST_OBJECT_LOCK (basesrc);
2824   if ((pool = priv->pool))
2825     pool = gst_object_ref (pool);
2826   GST_OBJECT_UNLOCK (basesrc);
2827
2828   if (pool) {
2829     res = gst_buffer_pool_set_active (pool, active);
2830     gst_object_unref (pool);
2831   }
2832   return res;
2833 }
2834
2835
2836 static gboolean
2837 gst_base_src_decide_allocation_default (GstBaseSrc * basesrc, GstQuery * query)
2838 {
2839   GstCaps *outcaps;
2840   GstBufferPool *pool;
2841   guint size, min, max;
2842   GstAllocator *allocator;
2843   GstAllocationParams params;
2844   GstStructure *config;
2845   gboolean update_allocator;
2846
2847   gst_query_parse_allocation (query, &outcaps, NULL);
2848
2849   /* we got configuration from our peer or the decide_allocation method,
2850    * parse them */
2851   if (gst_query_get_n_allocation_params (query) > 0) {
2852     /* try the allocator */
2853     gst_query_parse_nth_allocation_param (query, 0, &allocator, &params);
2854     update_allocator = TRUE;
2855   } else {
2856     allocator = NULL;
2857     gst_allocation_params_init (&params);
2858     update_allocator = FALSE;
2859   }
2860
2861   if (gst_query_get_n_allocation_pools (query) > 0) {
2862     gst_query_parse_nth_allocation_pool (query, 0, &pool, &size, &min, &max);
2863
2864     if (pool == NULL) {
2865       /* no pool, we can make our own */
2866       GST_DEBUG_OBJECT (basesrc, "no pool, making new pool");
2867       pool = gst_buffer_pool_new ();
2868     }
2869   } else {
2870     pool = NULL;
2871     size = min = max = 0;
2872   }
2873
2874   /* now configure */
2875   if (pool) {
2876     config = gst_buffer_pool_get_config (pool);
2877     gst_buffer_pool_config_set_params (config, outcaps, size, min, max);
2878     gst_buffer_pool_config_set_allocator (config, allocator, &params);
2879     gst_buffer_pool_set_config (pool, config);
2880   }
2881
2882   if (update_allocator)
2883     gst_query_set_nth_allocation_param (query, 0, allocator, &params);
2884   else
2885     gst_query_add_allocation_param (query, allocator, &params);
2886   if (allocator)
2887     gst_object_unref (allocator);
2888
2889   if (pool) {
2890     gst_query_set_nth_allocation_pool (query, 0, pool, size, min, max);
2891     gst_object_unref (pool);
2892   }
2893
2894   return TRUE;
2895 }
2896
2897 static gboolean
2898 gst_base_src_prepare_allocation (GstBaseSrc * basesrc, GstCaps * caps)
2899 {
2900   GstBaseSrcClass *bclass;
2901   gboolean result = TRUE;
2902   GstQuery *query;
2903   GstBufferPool *pool = NULL;
2904   GstAllocator *allocator = NULL;
2905   GstAllocationParams params;
2906
2907   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2908
2909   /* make query and let peer pad answer, we don't really care if it worked or
2910    * not, if it failed, the allocation query would contain defaults and the
2911    * subclass would then set better values if needed */
2912   query = gst_query_new_allocation (caps, TRUE);
2913   if (!gst_pad_peer_query (basesrc->srcpad, query)) {
2914     /* not a problem, just debug a little */
2915     GST_DEBUG_OBJECT (basesrc, "peer ALLOCATION query failed");
2916   }
2917
2918   g_assert (bclass->decide_allocation != NULL);
2919   result = bclass->decide_allocation (basesrc, query);
2920
2921   GST_DEBUG_OBJECT (basesrc, "ALLOCATION (%d) params: %" GST_PTR_FORMAT, result,
2922       query);
2923
2924   if (!result)
2925     goto no_decide_allocation;
2926
2927   /* we got configuration from our peer or the decide_allocation method,
2928    * parse them */
2929   if (gst_query_get_n_allocation_params (query) > 0) {
2930     gst_query_parse_nth_allocation_param (query, 0, &allocator, &params);
2931   } else {
2932     allocator = NULL;
2933     gst_allocation_params_init (&params);
2934   }
2935
2936   if (gst_query_get_n_allocation_pools (query) > 0)
2937     gst_query_parse_nth_allocation_pool (query, 0, &pool, NULL, NULL, NULL);
2938
2939   result = gst_base_src_set_allocation (basesrc, pool, allocator, &params);
2940
2941   gst_query_unref (query);
2942
2943   return result;
2944
2945   /* Errors */
2946 no_decide_allocation:
2947   {
2948     GST_WARNING_OBJECT (basesrc, "Subclass failed to decide allocation");
2949     gst_query_unref (query);
2950
2951     return result;
2952   }
2953 }
2954
2955 /* default negotiation code.
2956  *
2957  * Take intersection between src and sink pads, take first
2958  * caps and fixate.
2959  */
2960 static gboolean
2961 gst_base_src_default_negotiate (GstBaseSrc * basesrc)
2962 {
2963   GstCaps *thiscaps;
2964   GstCaps *caps = NULL;
2965   GstCaps *peercaps = NULL;
2966   gboolean result = FALSE;
2967
2968   /* first see what is possible on our source pad */
2969   thiscaps = gst_pad_query_caps (GST_BASE_SRC_PAD (basesrc), NULL);
2970   GST_DEBUG_OBJECT (basesrc, "caps of src: %" GST_PTR_FORMAT, thiscaps);
2971   /* nothing or anything is allowed, we're done */
2972   if (thiscaps == NULL || gst_caps_is_any (thiscaps))
2973     goto no_nego_needed;
2974
2975   if (G_UNLIKELY (gst_caps_is_empty (thiscaps)))
2976     goto no_caps;
2977
2978   /* get the peer caps */
2979   peercaps = gst_pad_peer_query_caps (GST_BASE_SRC_PAD (basesrc), thiscaps);
2980   GST_DEBUG_OBJECT (basesrc, "caps of peer: %" GST_PTR_FORMAT, peercaps);
2981   if (peercaps) {
2982     /* The result is already a subset of our caps */
2983     caps = peercaps;
2984     gst_caps_unref (thiscaps);
2985   } else {
2986     /* no peer, work with our own caps then */
2987     caps = thiscaps;
2988   }
2989   if (caps && !gst_caps_is_empty (caps)) {
2990     /* now fixate */
2991     GST_DEBUG_OBJECT (basesrc, "have caps: %" GST_PTR_FORMAT, caps);
2992     if (gst_caps_is_any (caps)) {
2993       GST_DEBUG_OBJECT (basesrc, "any caps, we stop");
2994       /* hmm, still anything, so element can do anything and
2995        * nego is not needed */
2996       result = TRUE;
2997     } else {
2998       caps = gst_base_src_fixate (basesrc, caps);
2999       GST_DEBUG_OBJECT (basesrc, "fixated to: %" GST_PTR_FORMAT, caps);
3000       if (gst_caps_is_fixed (caps)) {
3001         /* yay, fixed caps, use those then, it's possible that the subclass does
3002          * not accept this caps after all and we have to fail. */
3003         result = gst_base_src_set_caps (basesrc, caps);
3004       }
3005     }
3006     gst_caps_unref (caps);
3007   } else {
3008     if (caps)
3009       gst_caps_unref (caps);
3010     GST_DEBUG_OBJECT (basesrc, "no common caps");
3011   }
3012   return result;
3013
3014 no_nego_needed:
3015   {
3016     GST_DEBUG_OBJECT (basesrc, "no negotiation needed");
3017     if (thiscaps)
3018       gst_caps_unref (thiscaps);
3019     return TRUE;
3020   }
3021 no_caps:
3022   {
3023     GST_ELEMENT_ERROR (basesrc, STREAM, FORMAT,
3024         ("No supported formats found"),
3025         ("This element did not produce valid caps"));
3026     if (thiscaps)
3027       gst_caps_unref (thiscaps);
3028     return TRUE;
3029   }
3030 }
3031
3032 static gboolean
3033 gst_base_src_negotiate (GstBaseSrc * basesrc)
3034 {
3035   GstBaseSrcClass *bclass;
3036   gboolean result;
3037
3038   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
3039
3040   GST_DEBUG_OBJECT (basesrc, "starting negotiation");
3041
3042   if (G_LIKELY (bclass->negotiate))
3043     result = bclass->negotiate (basesrc);
3044   else
3045     result = TRUE;
3046
3047   if (G_LIKELY (result)) {
3048     GstCaps *caps;
3049
3050     caps = gst_pad_get_current_caps (basesrc->srcpad);
3051
3052     result = gst_base_src_prepare_allocation (basesrc, caps);
3053
3054     if (caps)
3055       gst_caps_unref (caps);
3056   }
3057   return result;
3058 }
3059
3060 static gboolean
3061 gst_base_src_start (GstBaseSrc * basesrc)
3062 {
3063   GstBaseSrcClass *bclass;
3064   gboolean result;
3065
3066   GST_LIVE_LOCK (basesrc);
3067   if (GST_BASE_SRC_IS_STARTING (basesrc))
3068     goto was_starting;
3069   if (GST_BASE_SRC_IS_STARTED (basesrc))
3070     goto was_started;
3071
3072   basesrc->priv->start_result = GST_FLOW_FLUSHING;
3073   GST_OBJECT_FLAG_SET (basesrc, GST_BASE_SRC_FLAG_STARTING);
3074   basesrc->num_buffers_left = basesrc->num_buffers;
3075   basesrc->running = FALSE;
3076   basesrc->priv->segment_pending = FALSE;
3077   GST_OBJECT_LOCK (basesrc);
3078   gst_segment_init (&basesrc->segment, basesrc->segment.format);
3079   GST_OBJECT_UNLOCK (basesrc);
3080   GST_LIVE_UNLOCK (basesrc);
3081
3082   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
3083   if (bclass->start)
3084     result = bclass->start (basesrc);
3085   else
3086     result = TRUE;
3087
3088   if (!result)
3089     goto could_not_start;
3090
3091   if (!gst_base_src_is_async (basesrc))
3092     gst_base_src_start_complete (basesrc, GST_FLOW_OK);
3093
3094   return result;
3095
3096   /* ERROR */
3097 was_starting:
3098   {
3099     GST_DEBUG_OBJECT (basesrc, "was starting");
3100     GST_LIVE_UNLOCK (basesrc);
3101     return TRUE;
3102   }
3103 was_started:
3104   {
3105     GST_DEBUG_OBJECT (basesrc, "was started");
3106     GST_LIVE_UNLOCK (basesrc);
3107     return TRUE;
3108   }
3109 could_not_start:
3110   {
3111     GST_DEBUG_OBJECT (basesrc, "could not start");
3112     /* subclass is supposed to post a message. We don't have to call _stop. */
3113     gst_base_src_start_complete (basesrc, GST_FLOW_ERROR);
3114     return FALSE;
3115   }
3116 }
3117
3118 /**
3119  * gst_base_src_start_complete:
3120  * @basesrc: base source instance
3121  * @ret: a #GstFlowReturn
3122  *
3123  * Complete an asynchronous start operation. When the subclass overrides the
3124  * start method, it should call gst_base_src_start_complete() when the start
3125  * operation completes either from the same thread or from an asynchronous
3126  * helper thread.
3127  */
3128 void
3129 gst_base_src_start_complete (GstBaseSrc * basesrc, GstFlowReturn ret)
3130 {
3131   gboolean have_size;
3132   guint64 size;
3133   gboolean seekable;
3134   GstFormat format;
3135   GstPadMode mode;
3136   GstEvent *event;
3137
3138   if (ret != GST_FLOW_OK)
3139     goto error;
3140
3141   GST_DEBUG_OBJECT (basesrc, "starting source");
3142   format = basesrc->segment.format;
3143
3144   /* figure out the size */
3145   have_size = FALSE;
3146   size = -1;
3147   if (format == GST_FORMAT_BYTES) {
3148     GstBaseSrcClass *bclass = GST_BASE_SRC_GET_CLASS (basesrc);
3149
3150     if (bclass->get_size) {
3151       if (!(have_size = bclass->get_size (basesrc, &size)))
3152         size = -1;
3153     }
3154     GST_DEBUG_OBJECT (basesrc, "setting size %" G_GUINT64_FORMAT, size);
3155     /* only update the size when operating in bytes, subclass is supposed
3156      * to set duration in the start method for other formats */
3157     GST_OBJECT_LOCK (basesrc);
3158     basesrc->segment.duration = size;
3159     GST_OBJECT_UNLOCK (basesrc);
3160   }
3161
3162   GST_DEBUG_OBJECT (basesrc,
3163       "format: %s, have size: %d, size: %" G_GUINT64_FORMAT ", duration: %"
3164       G_GINT64_FORMAT, gst_format_get_name (format), have_size, size,
3165       basesrc->segment.duration);
3166
3167   seekable = gst_base_src_seekable (basesrc);
3168   GST_DEBUG_OBJECT (basesrc, "is seekable: %d", seekable);
3169
3170   /* update for random access flag */
3171   basesrc->random_access = seekable && format == GST_FORMAT_BYTES;
3172
3173   GST_DEBUG_OBJECT (basesrc, "is random_access: %d", basesrc->random_access);
3174
3175   /* stop flushing now but for live sources, still block in the LIVE lock when
3176    * we are not yet PLAYING */
3177   gst_base_src_set_flushing (basesrc, FALSE, FALSE, NULL);
3178
3179   gst_pad_mark_reconfigure (GST_BASE_SRC_PAD (basesrc));
3180
3181   GST_OBJECT_LOCK (basesrc->srcpad);
3182   mode = GST_PAD_MODE (basesrc->srcpad);
3183   GST_OBJECT_UNLOCK (basesrc->srcpad);
3184
3185   /* take the stream lock here, we only want to let the task run when we have
3186    * set the STARTED flag */
3187   GST_PAD_STREAM_LOCK (basesrc->srcpad);
3188   if (mode == GST_PAD_MODE_PUSH) {
3189     /* do initial seek, which will start the task */
3190     GST_OBJECT_LOCK (basesrc);
3191     event = basesrc->pending_seek;
3192     basesrc->pending_seek = NULL;
3193     GST_OBJECT_UNLOCK (basesrc);
3194
3195     /* The perform seek code will start the task when finished. We don't have to
3196      * unlock the streaming thread because it is not running yet */
3197     if (G_UNLIKELY (!gst_base_src_perform_seek (basesrc, event, FALSE)))
3198       goto seek_failed;
3199
3200     if (event)
3201       gst_event_unref (event);
3202   } else {
3203     /* if not random_access, we cannot operate in pull mode for now */
3204     if (G_UNLIKELY (!basesrc->random_access))
3205       goto no_get_range;
3206   }
3207
3208   GST_LIVE_LOCK (basesrc);
3209   GST_OBJECT_FLAG_SET (basesrc, GST_BASE_SRC_FLAG_STARTED);
3210   GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTING);
3211   basesrc->priv->start_result = ret;
3212   GST_LIVE_SIGNAL (basesrc);
3213   GST_LIVE_UNLOCK (basesrc);
3214
3215   GST_PAD_STREAM_UNLOCK (basesrc->srcpad);
3216
3217   return;
3218
3219 seek_failed:
3220   {
3221     GST_PAD_STREAM_UNLOCK (basesrc->srcpad);
3222     GST_ERROR_OBJECT (basesrc, "Failed to perform initial seek");
3223     gst_base_src_set_flushing (basesrc, TRUE, FALSE, NULL);
3224     if (event)
3225       gst_event_unref (event);
3226     ret = GST_FLOW_ERROR;
3227     goto error;
3228   }
3229 no_get_range:
3230   {
3231     GST_PAD_STREAM_UNLOCK (basesrc->srcpad);
3232     gst_base_src_set_flushing (basesrc, TRUE, FALSE, NULL);
3233     GST_ERROR_OBJECT (basesrc, "Cannot operate in pull mode, stopping");
3234     ret = GST_FLOW_ERROR;
3235     goto error;
3236   }
3237 error:
3238   {
3239     GST_LIVE_LOCK (basesrc);
3240     basesrc->priv->start_result = ret;
3241     GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTING);
3242     GST_LIVE_SIGNAL (basesrc);
3243     GST_LIVE_UNLOCK (basesrc);
3244     return;
3245   }
3246 }
3247
3248 /**
3249  * gst_base_src_start_wait:
3250  * @basesrc: base source instance
3251  *
3252  * Wait until the start operation completes.
3253  *
3254  * Returns: a #GstFlowReturn.
3255  */
3256 GstFlowReturn
3257 gst_base_src_start_wait (GstBaseSrc * basesrc)
3258 {
3259   GstFlowReturn result;
3260
3261   GST_LIVE_LOCK (basesrc);
3262   if (G_UNLIKELY (basesrc->priv->flushing))
3263     goto flushing;
3264
3265   while (GST_BASE_SRC_IS_STARTING (basesrc)) {
3266     GST_LIVE_WAIT (basesrc);
3267     if (G_UNLIKELY (basesrc->priv->flushing))
3268       goto flushing;
3269   }
3270   result = basesrc->priv->start_result;
3271   GST_LIVE_UNLOCK (basesrc);
3272
3273   return result;
3274
3275   /* ERRORS */
3276 flushing:
3277   {
3278     GST_DEBUG_OBJECT (basesrc, "we are flushing");
3279     GST_LIVE_UNLOCK (basesrc);
3280     return GST_FLOW_FLUSHING;
3281   }
3282 }
3283
3284 static gboolean
3285 gst_base_src_stop (GstBaseSrc * basesrc)
3286 {
3287   GstBaseSrcClass *bclass;
3288   gboolean result = TRUE;
3289
3290   GST_DEBUG_OBJECT (basesrc, "stopping source");
3291
3292   /* flush all */
3293   gst_base_src_set_flushing (basesrc, TRUE, FALSE, NULL);
3294   /* stop the task */
3295   gst_pad_stop_task (basesrc->srcpad);
3296
3297   GST_LIVE_LOCK (basesrc);
3298   if (!GST_BASE_SRC_IS_STARTED (basesrc) && !GST_BASE_SRC_IS_STARTING (basesrc))
3299     goto was_stopped;
3300
3301   GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTING);
3302   GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTED);
3303   basesrc->priv->start_result = GST_FLOW_FLUSHING;
3304   GST_LIVE_SIGNAL (basesrc);
3305   GST_LIVE_UNLOCK (basesrc);
3306
3307   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
3308   if (bclass->stop)
3309     result = bclass->stop (basesrc);
3310
3311   gst_base_src_set_allocation (basesrc, NULL, NULL, NULL);
3312
3313   return result;
3314
3315 was_stopped:
3316   {
3317     GST_DEBUG_OBJECT (basesrc, "was started");
3318     GST_LIVE_UNLOCK (basesrc);
3319     return TRUE;
3320   }
3321 }
3322
3323 /* start or stop flushing dataprocessing
3324  */
3325 static gboolean
3326 gst_base_src_set_flushing (GstBaseSrc * basesrc,
3327     gboolean flushing, gboolean live_play, gboolean * playing)
3328 {
3329   GstBaseSrcClass *bclass;
3330
3331   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
3332
3333   GST_DEBUG_OBJECT (basesrc, "flushing %d, live_play %d", flushing, live_play);
3334
3335   if (flushing) {
3336     gst_base_src_activate_pool (basesrc, FALSE);
3337     /* unlock any subclasses, we need to do this before grabbing the
3338      * LIVE_LOCK since we hold this lock before going into ::create. We pass an
3339      * unlock to the params because of backwards compat (see seek handler)*/
3340     if (bclass->unlock)
3341       bclass->unlock (basesrc);
3342   }
3343
3344   /* the live lock is released when we are blocked, waiting for playing or
3345    * when we sync to the clock. */
3346   GST_LIVE_LOCK (basesrc);
3347   if (playing)
3348     *playing = basesrc->live_running;
3349   basesrc->priv->flushing = flushing;
3350   if (flushing) {
3351     /* if we are locked in the live lock, signal it to make it flush */
3352     basesrc->live_running = TRUE;
3353
3354     /* clear pending EOS if any */
3355     g_atomic_int_set (&basesrc->priv->pending_eos, FALSE);
3356
3357     /* step 1, now that we have the LIVE lock, clear our unlock request */
3358     if (bclass->unlock_stop)
3359       bclass->unlock_stop (basesrc);
3360
3361     /* step 2, unblock clock sync (if any) or any other blocking thing */
3362     if (basesrc->clock_id)
3363       gst_clock_id_unschedule (basesrc->clock_id);
3364   } else {
3365     /* signal the live source that it can start playing */
3366     basesrc->live_running = live_play;
3367
3368     gst_base_src_activate_pool (basesrc, TRUE);
3369
3370     /* Drop all delayed events */
3371     GST_OBJECT_LOCK (basesrc);
3372     if (basesrc->priv->pending_events) {
3373       g_list_foreach (basesrc->priv->pending_events, (GFunc) gst_event_unref,
3374           NULL);
3375       g_list_free (basesrc->priv->pending_events);
3376       basesrc->priv->pending_events = NULL;
3377       g_atomic_int_set (&basesrc->priv->have_events, FALSE);
3378     }
3379     GST_OBJECT_UNLOCK (basesrc);
3380   }
3381   GST_LIVE_SIGNAL (basesrc);
3382   GST_LIVE_UNLOCK (basesrc);
3383
3384   return TRUE;
3385 }
3386
3387 /* the purpose of this function is to make sure that a live source blocks in the
3388  * LIVE lock or leaves the LIVE lock and continues playing. */
3389 static gboolean
3390 gst_base_src_set_playing (GstBaseSrc * basesrc, gboolean live_play)
3391 {
3392   GstBaseSrcClass *bclass;
3393
3394   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
3395
3396   /* unlock subclasses locked in ::create, we only do this when we stop playing. */
3397   if (!live_play) {
3398     GST_DEBUG_OBJECT (basesrc, "unlock");
3399     if (bclass->unlock)
3400       bclass->unlock (basesrc);
3401   }
3402
3403   /* we are now able to grab the LIVE lock, when we get it, we can be
3404    * waiting for PLAYING while blocked in the LIVE cond or we can be waiting
3405    * for the clock. */
3406   GST_LIVE_LOCK (basesrc);
3407   GST_DEBUG_OBJECT (basesrc, "unschedule clock");
3408
3409   /* unblock clock sync (if any) */
3410   if (basesrc->clock_id)
3411     gst_clock_id_unschedule (basesrc->clock_id);
3412
3413   /* configure what to do when we get to the LIVE lock. */
3414   GST_DEBUG_OBJECT (basesrc, "live running %d", live_play);
3415   basesrc->live_running = live_play;
3416
3417   if (live_play) {
3418     gboolean start;
3419
3420     /* clear our unlock request when going to PLAYING */
3421     GST_DEBUG_OBJECT (basesrc, "unlock stop");
3422     if (bclass->unlock_stop)
3423       bclass->unlock_stop (basesrc);
3424
3425     /* for live sources we restart the timestamp correction */
3426     basesrc->priv->latency = -1;
3427     /* have to restart the task in case it stopped because of the unlock when
3428      * we went to PAUSED. Only do this if we operating in push mode. */
3429     GST_OBJECT_LOCK (basesrc->srcpad);
3430     start = (GST_PAD_MODE (basesrc->srcpad) == GST_PAD_MODE_PUSH);
3431     GST_OBJECT_UNLOCK (basesrc->srcpad);
3432     if (start)
3433       gst_pad_start_task (basesrc->srcpad, (GstTaskFunction) gst_base_src_loop,
3434           basesrc->srcpad, NULL);
3435     GST_DEBUG_OBJECT (basesrc, "signal");
3436     GST_LIVE_SIGNAL (basesrc);
3437   }
3438   GST_LIVE_UNLOCK (basesrc);
3439
3440   return TRUE;
3441 }
3442
3443 static gboolean
3444 gst_base_src_activate_push (GstPad * pad, GstObject * parent, gboolean active)
3445 {
3446   GstBaseSrc *basesrc;
3447
3448   basesrc = GST_BASE_SRC (parent);
3449
3450   /* prepare subclass first */
3451   if (active) {
3452     GST_DEBUG_OBJECT (basesrc, "Activating in push mode");
3453
3454     if (G_UNLIKELY (!basesrc->can_activate_push))
3455       goto no_push_activation;
3456
3457     if (G_UNLIKELY (!gst_base_src_start (basesrc)))
3458       goto error_start;
3459   } else {
3460     GST_DEBUG_OBJECT (basesrc, "Deactivating in push mode");
3461     /* now we can stop the source */
3462     if (G_UNLIKELY (!gst_base_src_stop (basesrc)))
3463       goto error_stop;
3464   }
3465   return TRUE;
3466
3467   /* ERRORS */
3468 no_push_activation:
3469   {
3470     GST_WARNING_OBJECT (basesrc, "Subclass disabled push-mode activation");
3471     return FALSE;
3472   }
3473 error_start:
3474   {
3475     GST_WARNING_OBJECT (basesrc, "Failed to start in push mode");
3476     return FALSE;
3477   }
3478 error_stop:
3479   {
3480     GST_DEBUG_OBJECT (basesrc, "Failed to stop in push mode");
3481     return FALSE;
3482   }
3483 }
3484
3485 static gboolean
3486 gst_base_src_activate_pull (GstPad * pad, GstObject * parent, gboolean active)
3487 {
3488   GstBaseSrc *basesrc;
3489
3490   basesrc = GST_BASE_SRC (parent);
3491
3492   /* prepare subclass first */
3493   if (active) {
3494     GST_DEBUG_OBJECT (basesrc, "Activating in pull mode");
3495     if (G_UNLIKELY (!gst_base_src_start (basesrc)))
3496       goto error_start;
3497   } else {
3498     GST_DEBUG_OBJECT (basesrc, "Deactivating in pull mode");
3499     if (G_UNLIKELY (!gst_base_src_stop (basesrc)))
3500       goto error_stop;
3501   }
3502   return TRUE;
3503
3504   /* ERRORS */
3505 error_start:
3506   {
3507     GST_ERROR_OBJECT (basesrc, "Failed to start in pull mode");
3508     return FALSE;
3509   }
3510 error_stop:
3511   {
3512     GST_ERROR_OBJECT (basesrc, "Failed to stop in pull mode");
3513     return FALSE;
3514   }
3515 }
3516
3517 static gboolean
3518 gst_base_src_activate_mode (GstPad * pad, GstObject * parent,
3519     GstPadMode mode, gboolean active)
3520 {
3521   gboolean res;
3522
3523   switch (mode) {
3524     case GST_PAD_MODE_PULL:
3525       res = gst_base_src_activate_pull (pad, parent, active);
3526       break;
3527     case GST_PAD_MODE_PUSH:
3528       res = gst_base_src_activate_push (pad, parent, active);
3529       break;
3530     default:
3531       GST_LOG_OBJECT (pad, "unknown activation mode %d", mode);
3532       res = FALSE;
3533       break;
3534   }
3535   return res;
3536 }
3537
3538
3539 static GstStateChangeReturn
3540 gst_base_src_change_state (GstElement * element, GstStateChange transition)
3541 {
3542   GstBaseSrc *basesrc;
3543   GstStateChangeReturn result;
3544   gboolean no_preroll = FALSE;
3545
3546   basesrc = GST_BASE_SRC (element);
3547
3548   switch (transition) {
3549     case GST_STATE_CHANGE_NULL_TO_READY:
3550       break;
3551     case GST_STATE_CHANGE_READY_TO_PAUSED:
3552       basesrc->priv->stream_start_pending = TRUE;
3553       no_preroll = gst_base_src_is_live (basesrc);
3554       break;
3555     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
3556       GST_DEBUG_OBJECT (basesrc, "PAUSED->PLAYING");
3557       if (gst_base_src_is_live (basesrc)) {
3558         /* now we can start playback */
3559         gst_base_src_set_playing (basesrc, TRUE);
3560       }
3561       break;
3562     default:
3563       break;
3564   }
3565
3566   if ((result =
3567           GST_ELEMENT_CLASS (parent_class)->change_state (element,
3568               transition)) == GST_STATE_CHANGE_FAILURE)
3569     goto failure;
3570
3571   switch (transition) {
3572     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
3573       GST_DEBUG_OBJECT (basesrc, "PLAYING->PAUSED");
3574       if (gst_base_src_is_live (basesrc)) {
3575         /* make sure we block in the live lock in PAUSED */
3576         gst_base_src_set_playing (basesrc, FALSE);
3577         no_preroll = TRUE;
3578       }
3579       break;
3580     case GST_STATE_CHANGE_PAUSED_TO_READY:
3581     {
3582       /* we don't need to unblock anything here, the pad deactivation code
3583        * already did this */
3584       g_atomic_int_set (&basesrc->priv->pending_eos, FALSE);
3585       gst_event_replace (&basesrc->pending_seek, NULL);
3586       basesrc->priv->stream_start_pending = FALSE;
3587       break;
3588     }
3589     case GST_STATE_CHANGE_READY_TO_NULL:
3590       break;
3591     default:
3592       break;
3593   }
3594
3595   if (no_preroll && result == GST_STATE_CHANGE_SUCCESS)
3596     result = GST_STATE_CHANGE_NO_PREROLL;
3597
3598   return result;
3599
3600   /* ERRORS */
3601 failure:
3602   {
3603     GST_DEBUG_OBJECT (basesrc, "parent failed state change");
3604     return result;
3605   }
3606 }