b90b942a308dcdc775d5314249ee275dc00b0831
[platform/upstream/gstreamer.git] / libs / gst / base / gstbasesrc.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *               2000,2005 Wim Taymans <wim@fluendo.com>
4  *
5  * gstbasesrc.c:
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20  * Boston, MA 02110-1301, USA.
21  */
22
23 /**
24  * SECTION:gstbasesrc
25  * @short_description: Base class for getrange based source elements
26  * @see_also: #GstPushSrc, #GstBaseTransform, #GstBaseSink
27  *
28  * This is a generice base class for source elements. The following
29  * types of sources are supported:
30  * <itemizedlist>
31  *   <listitem><para>random access sources like files</para></listitem>
32  *   <listitem><para>seekable sources</para></listitem>
33  *   <listitem><para>live sources</para></listitem>
34  * </itemizedlist>
35  *
36  * The source can be configured to operate in any #GstFormat with the
37  * gst_base_src_set_format() method. The currently set format determines
38  * the format of the internal #GstSegment and any #GST_EVENT_SEGMENT
39  * events. The default format for #GstBaseSrc is #GST_FORMAT_BYTES.
40  *
41  * #GstBaseSrc always supports push mode scheduling. If the following
42  * conditions are met, it also supports pull mode scheduling:
43  * <itemizedlist>
44  *   <listitem><para>The format is set to #GST_FORMAT_BYTES (default).</para>
45  *   </listitem>
46  *   <listitem><para>#GstBaseSrcClass.is_seekable() returns %TRUE.</para>
47  *   </listitem>
48  * </itemizedlist>
49  *
50  * If all the conditions are met for operating in pull mode, #GstBaseSrc is
51  * automatically seekable in push mode as well. The following conditions must
52  * be met to make the element seekable in push mode when the format is not
53  * #GST_FORMAT_BYTES:
54  * <itemizedlist>
55  *   <listitem><para>
56  *     #GstBaseSrcClass.is_seekable() returns %TRUE.
57  *   </para></listitem>
58  *   <listitem><para>
59  *     #GstBaseSrcClass.query() can convert all supported seek formats to the
60  *     internal format as set with gst_base_src_set_format().
61  *   </para></listitem>
62  *   <listitem><para>
63  *     #GstBaseSrcClass.do_seek() is implemented, performs the seek and returns
64  *      %TRUE.
65  *   </para></listitem>
66  * </itemizedlist>
67  *
68  * When the element does not meet the requirements to operate in pull mode, the
69  * offset and length in the #GstBaseSrcClass.create() method should be ignored.
70  * It is recommended to subclass #GstPushSrc instead, in this situation. If the
71  * element can operate in pull mode but only with specific offsets and
72  * lengths, it is allowed to generate an error when the wrong values are passed
73  * to the #GstBaseSrcClass.create() function.
74  *
75  * #GstBaseSrc has support for live sources. Live sources are sources that when
76  * paused discard data, such as audio or video capture devices. A typical live
77  * source also produces data at a fixed rate and thus provides a clock to publish
78  * this rate.
79  * Use gst_base_src_set_live() to activate the live source mode.
80  *
81  * A live source does not produce data in the PAUSED state. This means that the
82  * #GstBaseSrcClass.create() method will not be called in PAUSED but only in
83  * PLAYING. To signal the pipeline that the element will not produce data, the
84  * return value from the READY to PAUSED state will be
85  * #GST_STATE_CHANGE_NO_PREROLL.
86  *
87  * A typical live source will timestamp the buffers it creates with the
88  * current running time of the pipeline. This is one reason why a live source
89  * can only produce data in the PLAYING state, when the clock is actually
90  * distributed and running.
91  *
92  * Live sources that synchronize and block on the clock (an audio source, for
93  * example) can use gst_base_src_wait_playing() when the
94  * #GstBaseSrcClass.create() function was interrupted by a state change to
95  * PAUSED.
96  *
97  * The #GstBaseSrcClass.get_times() method can be used to implement pseudo-live
98  * sources. It only makes sense to implement the #GstBaseSrcClass.get_times()
99  * function if the source is a live source. The #GstBaseSrcClass.get_times()
100  * function should return timestamps starting from 0, as if it were a non-live
101  * source. The base class will make sure that the timestamps are transformed
102  * into the current running_time. The base source will then wait for the
103  * calculated running_time before pushing out the buffer.
104  *
105  * For live sources, the base class will by default report a latency of 0.
106  * For pseudo live sources, the base class will by default measure the difference
107  * between the first buffer timestamp and the start time of get_times and will
108  * report this value as the latency.
109  * Subclasses should override the query function when this behaviour is not
110  * acceptable.
111  *
112  * There is only support in #GstBaseSrc for exactly one source pad, which
113  * should be named "src". A source implementation (subclass of #GstBaseSrc)
114  * should install a pad template in its class_init function, like so:
115  * |[
116  * static void
117  * my_element_class_init (GstMyElementClass *klass)
118  * {
119  *   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
120  *   // srctemplate should be a #GstStaticPadTemplate with direction
121  *   // #GST_PAD_SRC and name "src"
122  *   gst_element_class_add_pad_template (gstelement_class,
123  *       gst_static_pad_template_get (&amp;srctemplate));
124  *
125  *   gst_element_class_set_static_metadata (gstelement_class,
126  *      "Source name",
127  *      "Source",
128  *      "My Source element",
129  *      "The author &lt;my.sink@my.email&gt;");
130  * }
131  * ]|
132  *
133  * <refsect2>
134  * <title>Controlled shutdown of live sources in applications</title>
135  * <para>
136  * Applications that record from a live source may want to stop recording
137  * in a controlled way, so that the recording is stopped, but the data
138  * already in the pipeline is processed to the end (remember that many live
139  * sources would go on recording forever otherwise). For that to happen the
140  * application needs to make the source stop recording and send an EOS
141  * event down the pipeline. The application would then wait for an
142  * EOS message posted on the pipeline's bus to know when all data has
143  * been processed and the pipeline can safely be stopped.
144  *
145  * An application may send an EOS event to a source element to make it
146  * perform the EOS logic (send EOS event downstream or post a
147  * #GST_MESSAGE_SEGMENT_DONE on the bus). This can typically be done
148  * with the gst_element_send_event() function on the element or its parent bin.
149  *
150  * After the EOS has been sent to the element, the application should wait for
151  * an EOS message to be posted on the pipeline's bus. Once this EOS message is
152  * received, it may safely shut down the entire pipeline.
153  *
154  * Last reviewed on 2007-12-19 (0.10.16)
155  * </para>
156  * </refsect2>
157  */
158
159 #ifdef HAVE_CONFIG_H
160 #  include "config.h"
161 #endif
162
163 #include <stdlib.h>
164 #include <string.h>
165
166 #include <gst/gst_private.h>
167 #include <gst/glib-compat-private.h>
168
169 #include "gstbasesrc.h"
170 #include "gsttypefindhelper.h"
171 #include <gst/gst-i18n-lib.h>
172
173 GST_DEBUG_CATEGORY_STATIC (gst_base_src_debug);
174 #define GST_CAT_DEFAULT gst_base_src_debug
175
176 #define GST_LIVE_GET_LOCK(elem)               (&GST_BASE_SRC_CAST(elem)->live_lock)
177 #define GST_LIVE_LOCK(elem)                   g_mutex_lock(GST_LIVE_GET_LOCK(elem))
178 #define GST_LIVE_TRYLOCK(elem)                g_mutex_trylock(GST_LIVE_GET_LOCK(elem))
179 #define GST_LIVE_UNLOCK(elem)                 g_mutex_unlock(GST_LIVE_GET_LOCK(elem))
180 #define GST_LIVE_GET_COND(elem)               (&GST_BASE_SRC_CAST(elem)->live_cond)
181 #define GST_LIVE_WAIT(elem)                   g_cond_wait (GST_LIVE_GET_COND (elem), GST_LIVE_GET_LOCK (elem))
182 #define GST_LIVE_WAIT_UNTIL(elem, end_time)   g_cond_timed_wait (GST_LIVE_GET_COND (elem), GST_LIVE_GET_LOCK (elem), end_time)
183 #define GST_LIVE_SIGNAL(elem)                 g_cond_signal (GST_LIVE_GET_COND (elem));
184 #define GST_LIVE_BROADCAST(elem)              g_cond_broadcast (GST_LIVE_GET_COND (elem));
185
186
187 #define GST_ASYNC_GET_COND(elem)              (&GST_BASE_SRC_CAST(elem)->priv->async_cond)
188 #define GST_ASYNC_WAIT(elem)                  g_cond_wait (GST_ASYNC_GET_COND (elem), GST_OBJECT_GET_LOCK (elem))
189 #define GST_ASYNC_SIGNAL(elem)                g_cond_signal (GST_ASYNC_GET_COND (elem));
190
191
192 /* BaseSrc signals and args */
193 enum
194 {
195   /* FILL ME */
196   LAST_SIGNAL
197 };
198
199 #define DEFAULT_BLOCKSIZE       4096
200 #define DEFAULT_NUM_BUFFERS     -1
201 #define DEFAULT_TYPEFIND        FALSE
202 #define DEFAULT_DO_TIMESTAMP    FALSE
203
204 enum
205 {
206   PROP_0,
207   PROP_BLOCKSIZE,
208   PROP_NUM_BUFFERS,
209   PROP_TYPEFIND,
210   PROP_DO_TIMESTAMP
211 };
212
213 #define GST_BASE_SRC_GET_PRIVATE(obj)  \
214    (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_BASE_SRC, GstBaseSrcPrivate))
215
216 struct _GstBaseSrcPrivate
217 {
218   gboolean discont;
219   gboolean flushing;
220
221   GstFlowReturn start_result;
222   gboolean async;
223
224   /* if a stream-start event should be sent */
225   gboolean stream_start_pending;
226
227   /* if segment should be sent */
228   gboolean segment_pending;
229
230   /* if EOS is pending (atomic) */
231   gint pending_eos;
232
233   /* startup latency is the time it takes between going to PLAYING and producing
234    * the first BUFFER with running_time 0. This value is included in the latency
235    * reporting. */
236   GstClockTime latency;
237   /* timestamp offset, this is the offset add to the values of gst_times for
238    * pseudo live sources */
239   GstClockTimeDiff ts_offset;
240
241   gboolean do_timestamp;
242   volatile gint dynamic_size;
243
244   /* stream sequence number */
245   guint32 seqnum;
246
247   /* pending events (TAG, CUSTOM_BOTH, CUSTOM_DOWNSTREAM) to be
248    * pushed in the data stream */
249   GList *pending_events;
250   volatile gint have_events;
251
252   /* QoS *//* with LOCK */
253   gboolean qos_enabled;
254   gdouble proportion;
255   GstClockTime earliest_time;
256
257   GstBufferPool *pool;
258   GstAllocator *allocator;
259   GstAllocationParams params;
260
261   GCond async_cond;
262 };
263
264 static GstElementClass *parent_class = NULL;
265
266 static void gst_base_src_class_init (GstBaseSrcClass * klass);
267 static void gst_base_src_init (GstBaseSrc * src, gpointer g_class);
268 static void gst_base_src_finalize (GObject * object);
269
270
271 GType
272 gst_base_src_get_type (void)
273 {
274   static volatile gsize base_src_type = 0;
275
276   if (g_once_init_enter (&base_src_type)) {
277     GType _type;
278     static const GTypeInfo base_src_info = {
279       sizeof (GstBaseSrcClass),
280       NULL,
281       NULL,
282       (GClassInitFunc) gst_base_src_class_init,
283       NULL,
284       NULL,
285       sizeof (GstBaseSrc),
286       0,
287       (GInstanceInitFunc) gst_base_src_init,
288     };
289
290     _type = g_type_register_static (GST_TYPE_ELEMENT,
291         "GstBaseSrc", &base_src_info, G_TYPE_FLAG_ABSTRACT);
292     g_once_init_leave (&base_src_type, _type);
293   }
294   return base_src_type;
295 }
296
297 static GstCaps *gst_base_src_default_get_caps (GstBaseSrc * bsrc,
298     GstCaps * filter);
299 static GstCaps *gst_base_src_default_fixate (GstBaseSrc * src, GstCaps * caps);
300 static GstCaps *gst_base_src_fixate (GstBaseSrc * src, GstCaps * caps);
301
302 static gboolean gst_base_src_is_random_access (GstBaseSrc * src);
303 static gboolean gst_base_src_activate_mode (GstPad * pad, GstObject * parent,
304     GstPadMode mode, gboolean active);
305 static void gst_base_src_set_property (GObject * object, guint prop_id,
306     const GValue * value, GParamSpec * pspec);
307 static void gst_base_src_get_property (GObject * object, guint prop_id,
308     GValue * value, GParamSpec * pspec);
309 static gboolean gst_base_src_event (GstPad * pad, GstObject * parent,
310     GstEvent * event);
311 static gboolean gst_base_src_send_event (GstElement * elem, GstEvent * event);
312 static gboolean gst_base_src_default_event (GstBaseSrc * src, GstEvent * event);
313
314 static gboolean gst_base_src_query (GstPad * pad, GstObject * parent,
315     GstQuery * query);
316
317 static gboolean gst_base_src_activate_pool (GstBaseSrc * basesrc,
318     gboolean active);
319 static gboolean gst_base_src_default_negotiate (GstBaseSrc * basesrc);
320 static gboolean gst_base_src_default_do_seek (GstBaseSrc * src,
321     GstSegment * segment);
322 static gboolean gst_base_src_default_query (GstBaseSrc * src, GstQuery * query);
323 static gboolean gst_base_src_default_prepare_seek_segment (GstBaseSrc * src,
324     GstEvent * event, GstSegment * segment);
325 static GstFlowReturn gst_base_src_default_create (GstBaseSrc * basesrc,
326     guint64 offset, guint size, GstBuffer ** buf);
327 static GstFlowReturn gst_base_src_default_alloc (GstBaseSrc * basesrc,
328     guint64 offset, guint size, GstBuffer ** buf);
329 static gboolean gst_base_src_decide_allocation_default (GstBaseSrc * basesrc,
330     GstQuery * query);
331
332 static gboolean gst_base_src_set_flushing (GstBaseSrc * basesrc,
333     gboolean flushing, gboolean live_play, gboolean * playing);
334
335 static gboolean gst_base_src_start (GstBaseSrc * basesrc);
336 static gboolean gst_base_src_stop (GstBaseSrc * basesrc);
337
338 static GstStateChangeReturn gst_base_src_change_state (GstElement * element,
339     GstStateChange transition);
340
341 static void gst_base_src_loop (GstPad * pad);
342 static GstFlowReturn gst_base_src_getrange (GstPad * pad, GstObject * parent,
343     guint64 offset, guint length, GstBuffer ** buf);
344 static GstFlowReturn gst_base_src_get_range (GstBaseSrc * src, guint64 offset,
345     guint length, GstBuffer ** buf);
346 static gboolean gst_base_src_seekable (GstBaseSrc * src);
347 static gboolean gst_base_src_negotiate (GstBaseSrc * basesrc);
348 static gboolean gst_base_src_update_length (GstBaseSrc * src, guint64 offset,
349     guint * length, gboolean force);
350
351 static void
352 gst_base_src_class_init (GstBaseSrcClass * klass)
353 {
354   GObjectClass *gobject_class;
355   GstElementClass *gstelement_class;
356
357   gobject_class = G_OBJECT_CLASS (klass);
358   gstelement_class = GST_ELEMENT_CLASS (klass);
359
360   GST_DEBUG_CATEGORY_INIT (gst_base_src_debug, "basesrc", 0, "basesrc element");
361
362   g_type_class_add_private (klass, sizeof (GstBaseSrcPrivate));
363
364   parent_class = g_type_class_peek_parent (klass);
365
366   gobject_class->finalize = gst_base_src_finalize;
367   gobject_class->set_property = gst_base_src_set_property;
368   gobject_class->get_property = gst_base_src_get_property;
369
370   g_object_class_install_property (gobject_class, PROP_BLOCKSIZE,
371       g_param_spec_uint ("blocksize", "Block size",
372           "Size in bytes to read per buffer (-1 = default)", 0, G_MAXUINT,
373           DEFAULT_BLOCKSIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
374   g_object_class_install_property (gobject_class, PROP_NUM_BUFFERS,
375       g_param_spec_int ("num-buffers", "num-buffers",
376           "Number of buffers to output before sending EOS (-1 = unlimited)",
377           -1, G_MAXINT, DEFAULT_NUM_BUFFERS, G_PARAM_READWRITE |
378           G_PARAM_STATIC_STRINGS));
379   g_object_class_install_property (gobject_class, PROP_TYPEFIND,
380       g_param_spec_boolean ("typefind", "Typefind",
381           "Run typefind before negotiating", DEFAULT_TYPEFIND,
382           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
383   g_object_class_install_property (gobject_class, PROP_DO_TIMESTAMP,
384       g_param_spec_boolean ("do-timestamp", "Do timestamp",
385           "Apply current stream time to buffers", DEFAULT_DO_TIMESTAMP,
386           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
387
388   gstelement_class->change_state =
389       GST_DEBUG_FUNCPTR (gst_base_src_change_state);
390   gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_base_src_send_event);
391
392   klass->get_caps = GST_DEBUG_FUNCPTR (gst_base_src_default_get_caps);
393   klass->negotiate = GST_DEBUG_FUNCPTR (gst_base_src_default_negotiate);
394   klass->fixate = GST_DEBUG_FUNCPTR (gst_base_src_default_fixate);
395   klass->prepare_seek_segment =
396       GST_DEBUG_FUNCPTR (gst_base_src_default_prepare_seek_segment);
397   klass->do_seek = GST_DEBUG_FUNCPTR (gst_base_src_default_do_seek);
398   klass->query = GST_DEBUG_FUNCPTR (gst_base_src_default_query);
399   klass->event = GST_DEBUG_FUNCPTR (gst_base_src_default_event);
400   klass->create = GST_DEBUG_FUNCPTR (gst_base_src_default_create);
401   klass->alloc = GST_DEBUG_FUNCPTR (gst_base_src_default_alloc);
402   klass->decide_allocation =
403       GST_DEBUG_FUNCPTR (gst_base_src_decide_allocation_default);
404
405   /* Registering debug symbols for function pointers */
406   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_activate_mode);
407   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_event);
408   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_query);
409   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_getrange);
410   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_fixate);
411 }
412
413 static void
414 gst_base_src_init (GstBaseSrc * basesrc, gpointer g_class)
415 {
416   GstPad *pad;
417   GstPadTemplate *pad_template;
418
419   basesrc->priv = GST_BASE_SRC_GET_PRIVATE (basesrc);
420
421   basesrc->is_live = FALSE;
422   g_mutex_init (&basesrc->live_lock);
423   g_cond_init (&basesrc->live_cond);
424   basesrc->num_buffers = DEFAULT_NUM_BUFFERS;
425   basesrc->num_buffers_left = -1;
426
427   basesrc->can_activate_push = TRUE;
428
429   pad_template =
430       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (g_class), "src");
431   g_return_if_fail (pad_template != NULL);
432
433   GST_DEBUG_OBJECT (basesrc, "creating src pad");
434   pad = gst_pad_new_from_template (pad_template, "src");
435
436   GST_DEBUG_OBJECT (basesrc, "setting functions on src pad");
437   gst_pad_set_activatemode_function (pad, gst_base_src_activate_mode);
438   gst_pad_set_event_function (pad, gst_base_src_event);
439   gst_pad_set_query_function (pad, gst_base_src_query);
440   gst_pad_set_getrange_function (pad, gst_base_src_getrange);
441
442   /* hold pointer to pad */
443   basesrc->srcpad = pad;
444   GST_DEBUG_OBJECT (basesrc, "adding src pad");
445   gst_element_add_pad (GST_ELEMENT (basesrc), pad);
446
447   basesrc->blocksize = DEFAULT_BLOCKSIZE;
448   basesrc->clock_id = NULL;
449   /* we operate in BYTES by default */
450   gst_base_src_set_format (basesrc, GST_FORMAT_BYTES);
451   basesrc->typefind = DEFAULT_TYPEFIND;
452   basesrc->priv->do_timestamp = DEFAULT_DO_TIMESTAMP;
453   g_atomic_int_set (&basesrc->priv->have_events, FALSE);
454
455   g_cond_init (&basesrc->priv->async_cond);
456   basesrc->priv->start_result = GST_FLOW_FLUSHING;
457   GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTED);
458   GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTING);
459   GST_OBJECT_FLAG_SET (basesrc, GST_ELEMENT_FLAG_SOURCE);
460
461   GST_DEBUG_OBJECT (basesrc, "init done");
462 }
463
464 static void
465 gst_base_src_finalize (GObject * object)
466 {
467   GstBaseSrc *basesrc;
468   GstEvent **event_p;
469
470   basesrc = GST_BASE_SRC (object);
471
472   g_mutex_clear (&basesrc->live_lock);
473   g_cond_clear (&basesrc->live_cond);
474   g_cond_clear (&basesrc->priv->async_cond);
475
476   event_p = &basesrc->pending_seek;
477   gst_event_replace (event_p, NULL);
478
479   if (basesrc->priv->pending_events) {
480     g_list_foreach (basesrc->priv->pending_events, (GFunc) gst_event_unref,
481         NULL);
482     g_list_free (basesrc->priv->pending_events);
483   }
484
485   G_OBJECT_CLASS (parent_class)->finalize (object);
486 }
487
488 /**
489  * gst_base_src_wait_playing:
490  * @src: the src
491  *
492  * If the #GstBaseSrcClass.create() method performs its own synchronisation
493  * against the clock it must unblock when going from PLAYING to the PAUSED state
494  * and call this method before continuing to produce the remaining data.
495  *
496  * This function will block until a state change to PLAYING happens (in which
497  * case this function returns #GST_FLOW_OK) or the processing must be stopped due
498  * to a state change to READY or a FLUSH event (in which case this function
499  * returns #GST_FLOW_FLUSHING).
500  *
501  * Returns: #GST_FLOW_OK if @src is PLAYING and processing can
502  * continue. Any other return value should be returned from the create vmethod.
503  */
504 GstFlowReturn
505 gst_base_src_wait_playing (GstBaseSrc * src)
506 {
507   g_return_val_if_fail (GST_IS_BASE_SRC (src), GST_FLOW_ERROR);
508
509   do {
510     /* block until the state changes, or we get a flush, or something */
511     GST_DEBUG_OBJECT (src, "live source waiting for running state");
512     GST_LIVE_WAIT (src);
513     GST_DEBUG_OBJECT (src, "live source unlocked");
514     if (src->priv->flushing)
515       goto flushing;
516   } while (G_UNLIKELY (!src->live_running));
517
518   return GST_FLOW_OK;
519
520   /* ERRORS */
521 flushing:
522   {
523     GST_DEBUG_OBJECT (src, "we are flushing");
524     return GST_FLOW_FLUSHING;
525   }
526 }
527
528 /**
529  * gst_base_src_set_live:
530  * @src: base source instance
531  * @live: new live-mode
532  *
533  * If the element listens to a live source, @live should
534  * be set to %TRUE.
535  *
536  * A live source will not produce data in the PAUSED state and
537  * will therefore not be able to participate in the PREROLL phase
538  * of a pipeline. To signal this fact to the application and the
539  * pipeline, the state change return value of the live source will
540  * be GST_STATE_CHANGE_NO_PREROLL.
541  */
542 void
543 gst_base_src_set_live (GstBaseSrc * src, gboolean live)
544 {
545   g_return_if_fail (GST_IS_BASE_SRC (src));
546
547   GST_OBJECT_LOCK (src);
548   src->is_live = live;
549   GST_OBJECT_UNLOCK (src);
550 }
551
552 /**
553  * gst_base_src_is_live:
554  * @src: base source instance
555  *
556  * Check if an element is in live mode.
557  *
558  * Returns: %TRUE if element is in live mode.
559  */
560 gboolean
561 gst_base_src_is_live (GstBaseSrc * src)
562 {
563   gboolean result;
564
565   g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
566
567   GST_OBJECT_LOCK (src);
568   result = src->is_live;
569   GST_OBJECT_UNLOCK (src);
570
571   return result;
572 }
573
574 /**
575  * gst_base_src_set_format:
576  * @src: base source instance
577  * @format: the format to use
578  *
579  * Sets the default format of the source. This will be the format used
580  * for sending SEGMENT events and for performing seeks.
581  *
582  * If a format of GST_FORMAT_BYTES is set, the element will be able to
583  * operate in pull mode if the #GstBaseSrcClass.is_seekable() returns TRUE.
584  *
585  * This function must only be called in states < %GST_STATE_PAUSED.
586  */
587 void
588 gst_base_src_set_format (GstBaseSrc * src, GstFormat format)
589 {
590   g_return_if_fail (GST_IS_BASE_SRC (src));
591   g_return_if_fail (GST_STATE (src) <= GST_STATE_READY);
592
593   GST_OBJECT_LOCK (src);
594   gst_segment_init (&src->segment, format);
595   GST_OBJECT_UNLOCK (src);
596 }
597
598 /**
599  * gst_base_src_set_dynamic_size:
600  * @src: base source instance
601  * @dynamic: new dynamic size mode
602  *
603  * If not @dynamic, size is only updated when needed, such as when trying to
604  * read past current tracked size.  Otherwise, size is checked for upon each
605  * read.
606  */
607 void
608 gst_base_src_set_dynamic_size (GstBaseSrc * src, gboolean dynamic)
609 {
610   g_return_if_fail (GST_IS_BASE_SRC (src));
611
612   g_atomic_int_set (&src->priv->dynamic_size, dynamic);
613 }
614
615 /**
616  * gst_base_src_set_async:
617  * @src: base source instance
618  * @async: new async mode
619  *
620  * Configure async behaviour in @src, no state change will block. The open,
621  * close, start, stop, play and pause virtual methods will be executed in a
622  * different thread and are thus allowed to perform blocking operations. Any
623  * blocking operation should be unblocked with the unlock vmethod.
624  */
625 void
626 gst_base_src_set_async (GstBaseSrc * src, gboolean async)
627 {
628   g_return_if_fail (GST_IS_BASE_SRC (src));
629
630   GST_OBJECT_LOCK (src);
631   src->priv->async = async;
632   GST_OBJECT_UNLOCK (src);
633 }
634
635 /**
636  * gst_base_src_is_async:
637  * @src: base source instance
638  *
639  * Get the current async behaviour of @src. See also gst_base_src_set_async().
640  *
641  * Returns: %TRUE if @src is operating in async mode.
642  */
643 gboolean
644 gst_base_src_is_async (GstBaseSrc * src)
645 {
646   gboolean res;
647
648   g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
649
650   GST_OBJECT_LOCK (src);
651   res = src->priv->async;
652   GST_OBJECT_UNLOCK (src);
653
654   return res;
655 }
656
657
658 /**
659  * gst_base_src_query_latency:
660  * @src: the source
661  * @live: (out) (allow-none): if the source is live
662  * @min_latency: (out) (allow-none): the min latency of the source
663  * @max_latency: (out) (allow-none): the max latency of the source
664  *
665  * Query the source for the latency parameters. @live will be TRUE when @src is
666  * configured as a live source. @min_latency will be set to the difference
667  * between the running time and the timestamp of the first buffer.
668  * @max_latency is always the undefined value of -1.
669  *
670  * This function is mostly used by subclasses.
671  *
672  * Returns: TRUE if the query succeeded.
673  */
674 gboolean
675 gst_base_src_query_latency (GstBaseSrc * src, gboolean * live,
676     GstClockTime * min_latency, GstClockTime * max_latency)
677 {
678   GstClockTime min;
679
680   g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
681
682   GST_OBJECT_LOCK (src);
683   if (live)
684     *live = src->is_live;
685
686   /* if we have a startup latency, report this one, else report 0. Subclasses
687    * are supposed to override the query function if they want something
688    * else. */
689   if (src->priv->latency != -1)
690     min = src->priv->latency;
691   else
692     min = 0;
693
694   if (min_latency)
695     *min_latency = min;
696   if (max_latency)
697     *max_latency = -1;
698
699   GST_LOG_OBJECT (src, "latency: live %d, min %" GST_TIME_FORMAT
700       ", max %" GST_TIME_FORMAT, src->is_live, GST_TIME_ARGS (min),
701       GST_TIME_ARGS (-1));
702   GST_OBJECT_UNLOCK (src);
703
704   return TRUE;
705 }
706
707 /**
708  * gst_base_src_set_blocksize:
709  * @src: the source
710  * @blocksize: the new blocksize in bytes
711  *
712  * Set the number of bytes that @src will push out with each buffer. When
713  * @blocksize is set to -1, a default length will be used.
714  */
715 void
716 gst_base_src_set_blocksize (GstBaseSrc * src, guint blocksize)
717 {
718   g_return_if_fail (GST_IS_BASE_SRC (src));
719
720   GST_OBJECT_LOCK (src);
721   src->blocksize = blocksize;
722   GST_OBJECT_UNLOCK (src);
723 }
724
725 /**
726  * gst_base_src_get_blocksize:
727  * @src: the source
728  *
729  * Get the number of bytes that @src will push out with each buffer.
730  *
731  * Returns: the number of bytes pushed with each buffer.
732  */
733 guint
734 gst_base_src_get_blocksize (GstBaseSrc * src)
735 {
736   gint res;
737
738   g_return_val_if_fail (GST_IS_BASE_SRC (src), 0);
739
740   GST_OBJECT_LOCK (src);
741   res = src->blocksize;
742   GST_OBJECT_UNLOCK (src);
743
744   return res;
745 }
746
747
748 /**
749  * gst_base_src_set_do_timestamp:
750  * @src: the source
751  * @timestamp: enable or disable timestamping
752  *
753  * Configure @src to automatically timestamp outgoing buffers based on the
754  * current running_time of the pipeline. This property is mostly useful for live
755  * sources.
756  */
757 void
758 gst_base_src_set_do_timestamp (GstBaseSrc * src, gboolean timestamp)
759 {
760   g_return_if_fail (GST_IS_BASE_SRC (src));
761
762   GST_OBJECT_LOCK (src);
763   src->priv->do_timestamp = timestamp;
764   GST_OBJECT_UNLOCK (src);
765 }
766
767 /**
768  * gst_base_src_get_do_timestamp:
769  * @src: the source
770  *
771  * Query if @src timestamps outgoing buffers based on the current running_time.
772  *
773  * Returns: %TRUE if the base class will automatically timestamp outgoing buffers.
774  */
775 gboolean
776 gst_base_src_get_do_timestamp (GstBaseSrc * src)
777 {
778   gboolean res;
779
780   g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
781
782   GST_OBJECT_LOCK (src);
783   res = src->priv->do_timestamp;
784   GST_OBJECT_UNLOCK (src);
785
786   return res;
787 }
788
789 /**
790  * gst_base_src_new_seamless_segment:
791  * @src: The source
792  * @start: The new start value for the segment
793  * @stop: Stop value for the new segment
794  * @time: The new time value for the start of the new segent
795  *
796  * Prepare a new seamless segment for emission downstream. This function must
797  * only be called by derived sub-classes, and only from the create() function,
798  * as the stream-lock needs to be held.
799  *
800  * The format for the new segment will be the current format of the source, as
801  * configured with gst_base_src_set_format()
802  *
803  * Returns: %TRUE if preparation of the seamless segment succeeded.
804  */
805 gboolean
806 gst_base_src_new_seamless_segment (GstBaseSrc * src, gint64 start, gint64 stop,
807     gint64 time)
808 {
809   gboolean res = TRUE;
810
811   GST_OBJECT_LOCK (src);
812
813   src->segment.base = gst_segment_to_running_time (&src->segment,
814       src->segment.format, src->segment.position);
815   src->segment.position = src->segment.start = start;
816   src->segment.stop = stop;
817   src->segment.time = time;
818
819   /* Mark pending segment. Will be sent before next data */
820   src->priv->segment_pending = TRUE;
821
822   GST_DEBUG_OBJECT (src,
823       "Starting new seamless segment. Start %" GST_TIME_FORMAT " stop %"
824       GST_TIME_FORMAT " time %" GST_TIME_FORMAT " base %" GST_TIME_FORMAT,
825       GST_TIME_ARGS (start), GST_TIME_ARGS (stop), GST_TIME_ARGS (time),
826       GST_TIME_ARGS (src->segment.base));
827
828   GST_OBJECT_UNLOCK (src);
829
830   src->priv->discont = TRUE;
831   src->running = TRUE;
832
833   return res;
834 }
835
836 static gboolean
837 gst_base_src_send_stream_start (GstBaseSrc * src)
838 {
839   gboolean ret = TRUE;
840
841   if (src->priv->stream_start_pending) {
842     gchar *stream_id;
843     GstEvent *event;
844
845     stream_id =
846         gst_pad_create_stream_id (src->srcpad, GST_ELEMENT_CAST (src), NULL);
847
848     GST_DEBUG_OBJECT (src, "Pushing STREAM_START");
849     event = gst_event_new_stream_start (stream_id);
850     gst_event_set_group_id (event, gst_util_group_id_next ());
851
852     ret = gst_pad_push_event (src->srcpad, event);
853     src->priv->stream_start_pending = FALSE;
854     g_free (stream_id);
855   }
856
857   return ret;
858 }
859
860 /**
861  * gst_base_src_set_caps:
862  * @src: a #GstBaseSrc
863  * @caps: a #GstCaps
864  *
865  * Set new caps on the basesrc source pad.
866  *
867  * Returns: %TRUE if the caps could be set
868  */
869 gboolean
870 gst_base_src_set_caps (GstBaseSrc * src, GstCaps * caps)
871 {
872   GstBaseSrcClass *bclass;
873   gboolean res = TRUE;
874
875   bclass = GST_BASE_SRC_GET_CLASS (src);
876
877   gst_base_src_send_stream_start (src);
878
879   if (bclass->set_caps)
880     res = bclass->set_caps (src, caps);
881
882   if (res)
883     res = gst_pad_push_event (src->srcpad, gst_event_new_caps (caps));
884
885   return res;
886 }
887
888 static GstCaps *
889 gst_base_src_default_get_caps (GstBaseSrc * bsrc, GstCaps * filter)
890 {
891   GstCaps *caps = NULL;
892   GstPadTemplate *pad_template;
893   GstBaseSrcClass *bclass;
894
895   bclass = GST_BASE_SRC_GET_CLASS (bsrc);
896
897   pad_template =
898       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (bclass), "src");
899
900   if (pad_template != NULL) {
901     caps = gst_pad_template_get_caps (pad_template);
902
903     if (filter) {
904       GstCaps *intersection;
905
906       intersection =
907           gst_caps_intersect_full (filter, caps, GST_CAPS_INTERSECT_FIRST);
908       gst_caps_unref (caps);
909       caps = intersection;
910     }
911   }
912   return caps;
913 }
914
915 static GstCaps *
916 gst_base_src_default_fixate (GstBaseSrc * bsrc, GstCaps * caps)
917 {
918   GST_DEBUG_OBJECT (bsrc, "using default caps fixate function");
919   return gst_caps_fixate (caps);
920 }
921
922 static GstCaps *
923 gst_base_src_fixate (GstBaseSrc * bsrc, GstCaps * caps)
924 {
925   GstBaseSrcClass *bclass;
926
927   bclass = GST_BASE_SRC_GET_CLASS (bsrc);
928
929   if (bclass->fixate)
930     caps = bclass->fixate (bsrc, caps);
931
932   return caps;
933 }
934
935 static gboolean
936 gst_base_src_default_query (GstBaseSrc * src, GstQuery * query)
937 {
938   gboolean res;
939
940   switch (GST_QUERY_TYPE (query)) {
941     case GST_QUERY_POSITION:
942     {
943       GstFormat format;
944
945       gst_query_parse_position (query, &format, NULL);
946
947       GST_DEBUG_OBJECT (src, "position query in format %s",
948           gst_format_get_name (format));
949
950       switch (format) {
951         case GST_FORMAT_PERCENT:
952         {
953           gint64 percent;
954           gint64 position;
955           gint64 duration;
956
957           GST_OBJECT_LOCK (src);
958           position = src->segment.position;
959           duration = src->segment.duration;
960           GST_OBJECT_UNLOCK (src);
961
962           if (position != -1 && duration != -1) {
963             if (position < duration)
964               percent = gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX, position,
965                   duration);
966             else
967               percent = GST_FORMAT_PERCENT_MAX;
968           } else
969             percent = -1;
970
971           gst_query_set_position (query, GST_FORMAT_PERCENT, percent);
972           res = TRUE;
973           break;
974         }
975         default:
976         {
977           gint64 position;
978           GstFormat seg_format;
979
980           GST_OBJECT_LOCK (src);
981           position =
982               gst_segment_to_stream_time (&src->segment, src->segment.format,
983               src->segment.position);
984           seg_format = src->segment.format;
985           GST_OBJECT_UNLOCK (src);
986
987           if (position != -1) {
988             /* convert to requested format */
989             res =
990                 gst_pad_query_convert (src->srcpad, seg_format,
991                 position, format, &position);
992           } else
993             res = TRUE;
994
995           gst_query_set_position (query, format, position);
996           break;
997         }
998       }
999       break;
1000     }
1001     case GST_QUERY_DURATION:
1002     {
1003       GstFormat format;
1004
1005       gst_query_parse_duration (query, &format, NULL);
1006
1007       GST_DEBUG_OBJECT (src, "duration query in format %s",
1008           gst_format_get_name (format));
1009
1010       switch (format) {
1011         case GST_FORMAT_PERCENT:
1012           gst_query_set_duration (query, GST_FORMAT_PERCENT,
1013               GST_FORMAT_PERCENT_MAX);
1014           res = TRUE;
1015           break;
1016         default:
1017         {
1018           gint64 duration;
1019           GstFormat seg_format;
1020           guint length = 0;
1021
1022           /* may have to refresh duration */
1023           gst_base_src_update_length (src, 0, &length,
1024               g_atomic_int_get (&src->priv->dynamic_size));
1025
1026           /* this is the duration as configured by the subclass. */
1027           GST_OBJECT_LOCK (src);
1028           duration = src->segment.duration;
1029           seg_format = src->segment.format;
1030           GST_OBJECT_UNLOCK (src);
1031
1032           GST_LOG_OBJECT (src, "duration %" G_GINT64_FORMAT ", format %s",
1033               duration, gst_format_get_name (seg_format));
1034
1035           if (duration != -1) {
1036             /* convert to requested format, if this fails, we have a duration
1037              * but we cannot answer the query, we must return FALSE. */
1038             res =
1039                 gst_pad_query_convert (src->srcpad, seg_format,
1040                 duration, format, &duration);
1041           } else {
1042             /* The subclass did not configure a duration, we assume that the
1043              * media has an unknown duration then and we return TRUE to report
1044              * this. Note that this is not the same as returning FALSE, which
1045              * means that we cannot report the duration at all. */
1046             res = TRUE;
1047           }
1048           gst_query_set_duration (query, format, duration);
1049           break;
1050         }
1051       }
1052       break;
1053     }
1054
1055     case GST_QUERY_SEEKING:
1056     {
1057       GstFormat format, seg_format;
1058       gint64 duration;
1059
1060       GST_OBJECT_LOCK (src);
1061       duration = src->segment.duration;
1062       seg_format = src->segment.format;
1063       GST_OBJECT_UNLOCK (src);
1064
1065       gst_query_parse_seeking (query, &format, NULL, NULL, NULL);
1066       if (format == seg_format) {
1067         gst_query_set_seeking (query, seg_format,
1068             gst_base_src_seekable (src), 0, duration);
1069         res = TRUE;
1070       } else {
1071         /* FIXME 0.11: return TRUE + seekable=FALSE for SEEKING query here */
1072         /* Don't reply to the query to make up for demuxers which don't
1073          * handle the SEEKING query yet. Players like Totem will fall back
1074          * to the duration when the SEEKING query isn't answered. */
1075         res = FALSE;
1076       }
1077       break;
1078     }
1079     case GST_QUERY_SEGMENT:
1080     {
1081       gint64 start, stop;
1082
1083       GST_OBJECT_LOCK (src);
1084       /* no end segment configured, current duration then */
1085       if ((stop = src->segment.stop) == -1)
1086         stop = src->segment.duration;
1087       start = src->segment.start;
1088
1089       /* adjust to stream time */
1090       if (src->segment.time != -1) {
1091         start -= src->segment.time;
1092         if (stop != -1)
1093           stop -= src->segment.time;
1094       }
1095
1096       gst_query_set_segment (query, src->segment.rate, src->segment.format,
1097           start, stop);
1098       GST_OBJECT_UNLOCK (src);
1099       res = TRUE;
1100       break;
1101     }
1102
1103     case GST_QUERY_FORMATS:
1104     {
1105       gst_query_set_formats (query, 3, GST_FORMAT_DEFAULT,
1106           GST_FORMAT_BYTES, GST_FORMAT_PERCENT);
1107       res = TRUE;
1108       break;
1109     }
1110     case GST_QUERY_CONVERT:
1111     {
1112       GstFormat src_fmt, dest_fmt;
1113       gint64 src_val, dest_val;
1114
1115       gst_query_parse_convert (query, &src_fmt, &src_val, &dest_fmt, &dest_val);
1116
1117       /* we can only convert between equal formats... */
1118       if (src_fmt == dest_fmt) {
1119         dest_val = src_val;
1120         res = TRUE;
1121       } else
1122         res = FALSE;
1123
1124       gst_query_set_convert (query, src_fmt, src_val, dest_fmt, dest_val);
1125       break;
1126     }
1127     case GST_QUERY_LATENCY:
1128     {
1129       GstClockTime min, max;
1130       gboolean live;
1131
1132       /* Subclasses should override and implement something useful */
1133       res = gst_base_src_query_latency (src, &live, &min, &max);
1134
1135       GST_LOG_OBJECT (src, "report latency: live %d, min %" GST_TIME_FORMAT
1136           ", max %" GST_TIME_FORMAT, live, GST_TIME_ARGS (min),
1137           GST_TIME_ARGS (max));
1138
1139       gst_query_set_latency (query, live, min, max);
1140       break;
1141     }
1142     case GST_QUERY_JITTER:
1143     case GST_QUERY_RATE:
1144       res = FALSE;
1145       break;
1146     case GST_QUERY_BUFFERING:
1147     {
1148       GstFormat format, seg_format;
1149       gint64 start, stop, estimated;
1150
1151       gst_query_parse_buffering_range (query, &format, NULL, NULL, NULL);
1152
1153       GST_DEBUG_OBJECT (src, "buffering query in format %s",
1154           gst_format_get_name (format));
1155
1156       GST_OBJECT_LOCK (src);
1157       if (src->random_access) {
1158         estimated = 0;
1159         start = 0;
1160         if (format == GST_FORMAT_PERCENT)
1161           stop = GST_FORMAT_PERCENT_MAX;
1162         else
1163           stop = src->segment.duration;
1164       } else {
1165         estimated = -1;
1166         start = -1;
1167         stop = -1;
1168       }
1169       seg_format = src->segment.format;
1170       GST_OBJECT_UNLOCK (src);
1171
1172       /* convert to required format. When the conversion fails, we can't answer
1173        * the query. When the value is unknown, we can don't perform conversion
1174        * but report TRUE. */
1175       if (format != GST_FORMAT_PERCENT && stop != -1) {
1176         res = gst_pad_query_convert (src->srcpad, seg_format,
1177             stop, format, &stop);
1178       } else {
1179         res = TRUE;
1180       }
1181       if (res && format != GST_FORMAT_PERCENT && start != -1)
1182         res = gst_pad_query_convert (src->srcpad, seg_format,
1183             start, format, &start);
1184
1185       gst_query_set_buffering_range (query, format, start, stop, estimated);
1186       break;
1187     }
1188     case GST_QUERY_SCHEDULING:
1189     {
1190       gboolean random_access;
1191
1192       random_access = gst_base_src_is_random_access (src);
1193
1194       /* we can operate in getrange mode if the native format is bytes
1195        * and we are seekable, this condition is set in the random_access
1196        * flag and is set in the _start() method. */
1197       gst_query_set_scheduling (query, GST_SCHEDULING_FLAG_SEEKABLE, 1, -1, 0);
1198       if (random_access)
1199         gst_query_add_scheduling_mode (query, GST_PAD_MODE_PULL);
1200       gst_query_add_scheduling_mode (query, GST_PAD_MODE_PUSH);
1201
1202       res = TRUE;
1203       break;
1204     }
1205     case GST_QUERY_CAPS:
1206     {
1207       GstBaseSrcClass *bclass;
1208       GstCaps *caps, *filter;
1209
1210       bclass = GST_BASE_SRC_GET_CLASS (src);
1211       if (bclass->get_caps) {
1212         gst_query_parse_caps (query, &filter);
1213         if ((caps = bclass->get_caps (src, filter))) {
1214           gst_query_set_caps_result (query, caps);
1215           gst_caps_unref (caps);
1216           res = TRUE;
1217         } else {
1218           res = FALSE;
1219         }
1220       } else
1221         res = FALSE;
1222       break;
1223     }
1224     case GST_QUERY_URI:{
1225       if (GST_IS_URI_HANDLER (src)) {
1226         gchar *uri = gst_uri_handler_get_uri (GST_URI_HANDLER (src));
1227
1228         if (uri != NULL) {
1229           gst_query_set_uri (query, uri);
1230           g_free (uri);
1231           res = TRUE;
1232         } else {
1233           res = FALSE;
1234         }
1235       } else {
1236         res = FALSE;
1237       }
1238       break;
1239     }
1240     default:
1241       res = FALSE;
1242       break;
1243   }
1244   GST_DEBUG_OBJECT (src, "query %s returns %d", GST_QUERY_TYPE_NAME (query),
1245       res);
1246
1247   return res;
1248 }
1249
1250 static gboolean
1251 gst_base_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
1252 {
1253   GstBaseSrc *src;
1254   GstBaseSrcClass *bclass;
1255   gboolean result = FALSE;
1256
1257   src = GST_BASE_SRC (parent);
1258   bclass = GST_BASE_SRC_GET_CLASS (src);
1259
1260   if (bclass->query)
1261     result = bclass->query (src, query);
1262
1263   return result;
1264 }
1265
1266 static gboolean
1267 gst_base_src_default_do_seek (GstBaseSrc * src, GstSegment * segment)
1268 {
1269   gboolean res = TRUE;
1270
1271   /* update our offset if the start/stop position was updated */
1272   if (segment->format == GST_FORMAT_BYTES) {
1273     segment->time = segment->start;
1274   } else if (segment->start == 0) {
1275     /* seek to start, we can implement a default for this. */
1276     segment->time = 0;
1277   } else {
1278     res = FALSE;
1279     GST_INFO_OBJECT (src, "Can't do a default seek");
1280   }
1281
1282   return res;
1283 }
1284
1285 static gboolean
1286 gst_base_src_do_seek (GstBaseSrc * src, GstSegment * segment)
1287 {
1288   GstBaseSrcClass *bclass;
1289   gboolean result = FALSE;
1290
1291   bclass = GST_BASE_SRC_GET_CLASS (src);
1292
1293   GST_INFO_OBJECT (src, "seeking: %" GST_SEGMENT_FORMAT, segment);
1294
1295   if (bclass->do_seek)
1296     result = bclass->do_seek (src, segment);
1297
1298   return result;
1299 }
1300
1301 #define SEEK_TYPE_IS_RELATIVE(t) (((t) != GST_SEEK_TYPE_NONE) && ((t) != GST_SEEK_TYPE_SET))
1302
1303 static gboolean
1304 gst_base_src_default_prepare_seek_segment (GstBaseSrc * src, GstEvent * event,
1305     GstSegment * segment)
1306 {
1307   /* By default, we try one of 2 things:
1308    *   - For absolute seek positions, convert the requested position to our
1309    *     configured processing format and place it in the output segment \
1310    *   - For relative seek positions, convert our current (input) values to the
1311    *     seek format, adjust by the relative seek offset and then convert back to
1312    *     the processing format
1313    */
1314   GstSeekType start_type, stop_type;
1315   gint64 start, stop;
1316   GstSeekFlags flags;
1317   GstFormat seek_format, dest_format;
1318   gdouble rate;
1319   gboolean update;
1320   gboolean res = TRUE;
1321
1322   gst_event_parse_seek (event, &rate, &seek_format, &flags,
1323       &start_type, &start, &stop_type, &stop);
1324   dest_format = segment->format;
1325
1326   if (seek_format == dest_format) {
1327     gst_segment_do_seek (segment, rate, seek_format, flags,
1328         start_type, start, stop_type, stop, &update);
1329     return TRUE;
1330   }
1331
1332   if (start_type != GST_SEEK_TYPE_NONE) {
1333     /* FIXME: Handle seek_end by converting the input segment vals */
1334     res =
1335         gst_pad_query_convert (src->srcpad, seek_format, start, dest_format,
1336         &start);
1337     start_type = GST_SEEK_TYPE_SET;
1338   }
1339
1340   if (res && stop_type != GST_SEEK_TYPE_NONE) {
1341     /* FIXME: Handle seek_end by converting the input segment vals */
1342     res =
1343         gst_pad_query_convert (src->srcpad, seek_format, stop, dest_format,
1344         &stop);
1345     stop_type = GST_SEEK_TYPE_SET;
1346   }
1347
1348   /* And finally, configure our output segment in the desired format */
1349   gst_segment_do_seek (segment, rate, dest_format, flags, start_type, start,
1350       stop_type, stop, &update);
1351
1352   if (!res)
1353     goto no_format;
1354
1355   return res;
1356
1357 no_format:
1358   {
1359     GST_DEBUG_OBJECT (src, "undefined format given, seek aborted.");
1360     return FALSE;
1361   }
1362 }
1363
1364 static gboolean
1365 gst_base_src_prepare_seek_segment (GstBaseSrc * src, GstEvent * event,
1366     GstSegment * seeksegment)
1367 {
1368   GstBaseSrcClass *bclass;
1369   gboolean result = FALSE;
1370
1371   bclass = GST_BASE_SRC_GET_CLASS (src);
1372
1373   if (bclass->prepare_seek_segment)
1374     result = bclass->prepare_seek_segment (src, event, seeksegment);
1375
1376   return result;
1377 }
1378
1379 static GstFlowReturn
1380 gst_base_src_default_alloc (GstBaseSrc * src, guint64 offset,
1381     guint size, GstBuffer ** buffer)
1382 {
1383   GstFlowReturn ret;
1384   GstBaseSrcPrivate *priv = src->priv;
1385
1386   if (priv->pool) {
1387     ret = gst_buffer_pool_acquire_buffer (priv->pool, buffer, NULL);
1388   } else if (size != -1) {
1389     *buffer = gst_buffer_new_allocate (priv->allocator, size, &priv->params);
1390     if (G_UNLIKELY (*buffer == NULL))
1391       goto alloc_failed;
1392
1393     ret = GST_FLOW_OK;
1394   } else {
1395     GST_WARNING_OBJECT (src, "Not trying to alloc %u bytes. Blocksize not set?",
1396         size);
1397     goto alloc_failed;
1398   }
1399   return ret;
1400
1401   /* ERRORS */
1402 alloc_failed:
1403   {
1404     GST_ERROR_OBJECT (src, "Failed to allocate %u bytes", size);
1405     return GST_FLOW_ERROR;
1406   }
1407 }
1408
1409 static GstFlowReturn
1410 gst_base_src_default_create (GstBaseSrc * src, guint64 offset,
1411     guint size, GstBuffer ** buffer)
1412 {
1413   GstBaseSrcClass *bclass;
1414   GstFlowReturn ret;
1415   GstBuffer *res_buf;
1416
1417   bclass = GST_BASE_SRC_GET_CLASS (src);
1418
1419   if (G_UNLIKELY (!bclass->alloc))
1420     goto no_function;
1421   if (G_UNLIKELY (!bclass->fill))
1422     goto no_function;
1423
1424   if (*buffer == NULL) {
1425     /* downstream did not provide us with a buffer to fill, allocate one
1426      * ourselves */
1427     ret = bclass->alloc (src, offset, size, &res_buf);
1428     if (G_UNLIKELY (ret != GST_FLOW_OK))
1429       goto alloc_failed;
1430   } else {
1431     res_buf = *buffer;
1432   }
1433
1434   if (G_LIKELY (size > 0)) {
1435     /* only call fill when there is a size */
1436     ret = bclass->fill (src, offset, size, res_buf);
1437     if (G_UNLIKELY (ret != GST_FLOW_OK))
1438       goto not_ok;
1439   }
1440
1441   *buffer = res_buf;
1442
1443   return GST_FLOW_OK;
1444
1445   /* ERRORS */
1446 no_function:
1447   {
1448     GST_DEBUG_OBJECT (src, "no fill or alloc function");
1449     return GST_FLOW_NOT_SUPPORTED;
1450   }
1451 alloc_failed:
1452   {
1453     GST_DEBUG_OBJECT (src, "Failed to allocate buffer of %u bytes", size);
1454     return ret;
1455   }
1456 not_ok:
1457   {
1458     GST_DEBUG_OBJECT (src, "fill returned %d (%s)", ret,
1459         gst_flow_get_name (ret));
1460     if (*buffer == NULL)
1461       gst_buffer_unref (res_buf);
1462     return ret;
1463   }
1464 }
1465
1466 /* this code implements the seeking. It is a good example
1467  * handling all cases.
1468  *
1469  * A seek updates the currently configured segment.start
1470  * and segment.stop values based on the SEEK_TYPE. If the
1471  * segment.start value is updated, a seek to this new position
1472  * should be performed.
1473  *
1474  * The seek can only be executed when we are not currently
1475  * streaming any data, to make sure that this is the case, we
1476  * acquire the STREAM_LOCK which is taken when we are in the
1477  * _loop() function or when a getrange() is called. Normally
1478  * we will not receive a seek if we are operating in pull mode
1479  * though. When we operate as a live source we might block on the live
1480  * cond, which does not release the STREAM_LOCK. Therefore we will try
1481  * to grab the LIVE_LOCK instead of the STREAM_LOCK to make sure it is
1482  * safe to perform the seek.
1483  *
1484  * When we are in the loop() function, we might be in the middle
1485  * of pushing a buffer, which might block in a sink. To make sure
1486  * that the push gets unblocked we push out a FLUSH_START event.
1487  * Our loop function will get a FLUSHING return value from
1488  * the push and will pause, effectively releasing the STREAM_LOCK.
1489  *
1490  * For a non-flushing seek, we pause the task, which might eventually
1491  * release the STREAM_LOCK. We say eventually because when the sink
1492  * blocks on the sample we might wait a very long time until the sink
1493  * unblocks the sample. In any case we acquire the STREAM_LOCK and
1494  * can continue the seek. A non-flushing seek is normally done in a
1495  * running pipeline to perform seamless playback, this means that the sink is
1496  * PLAYING and will return from its chain function.
1497  * In the case of a non-flushing seek we need to make sure that the
1498  * data we output after the seek is continuous with the previous data,
1499  * this is because a non-flushing seek does not reset the running-time
1500  * to 0. We do this by closing the currently running segment, ie. sending
1501  * a new_segment event with the stop position set to the last processed
1502  * position.
1503  *
1504  * After updating the segment.start/stop values, we prepare for
1505  * streaming again. We push out a FLUSH_STOP to make the peer pad
1506  * accept data again and we start our task again.
1507  *
1508  * A segment seek posts a message on the bus saying that the playback
1509  * of the segment started. We store the segment flag internally because
1510  * when we reach the segment.stop we have to post a segment.done
1511  * instead of EOS when doing a segment seek.
1512  */
1513 static gboolean
1514 gst_base_src_perform_seek (GstBaseSrc * src, GstEvent * event, gboolean unlock)
1515 {
1516   gboolean res = TRUE, tres;
1517   gdouble rate;
1518   GstFormat seek_format, dest_format;
1519   GstSeekFlags flags;
1520   GstSeekType start_type, stop_type;
1521   gint64 start, stop;
1522   gboolean flush, playing;
1523   gboolean update;
1524   gboolean relative_seek = FALSE;
1525   gboolean seekseg_configured = FALSE;
1526   GstSegment seeksegment;
1527   guint32 seqnum;
1528   GstEvent *tevent;
1529
1530   GST_DEBUG_OBJECT (src, "doing seek: %" GST_PTR_FORMAT, event);
1531
1532   GST_OBJECT_LOCK (src);
1533   dest_format = src->segment.format;
1534   GST_OBJECT_UNLOCK (src);
1535
1536   if (event) {
1537     gst_event_parse_seek (event, &rate, &seek_format, &flags,
1538         &start_type, &start, &stop_type, &stop);
1539
1540     relative_seek = SEEK_TYPE_IS_RELATIVE (start_type) ||
1541         SEEK_TYPE_IS_RELATIVE (stop_type);
1542
1543     if (dest_format != seek_format && !relative_seek) {
1544       /* If we have an ABSOLUTE position (SEEK_SET only), we can convert it
1545        * here before taking the stream lock, otherwise we must convert it later,
1546        * once we have the stream lock and can read the last configures segment
1547        * start and stop positions */
1548       gst_segment_init (&seeksegment, dest_format);
1549
1550       if (!gst_base_src_prepare_seek_segment (src, event, &seeksegment))
1551         goto prepare_failed;
1552
1553       seekseg_configured = TRUE;
1554     }
1555
1556     flush = flags & GST_SEEK_FLAG_FLUSH;
1557     seqnum = gst_event_get_seqnum (event);
1558   } else {
1559     flush = FALSE;
1560     /* get next seqnum */
1561     seqnum = gst_util_seqnum_next ();
1562   }
1563
1564   /* send flush start */
1565   if (flush) {
1566     tevent = gst_event_new_flush_start ();
1567     gst_event_set_seqnum (tevent, seqnum);
1568     gst_pad_push_event (src->srcpad, tevent);
1569   } else
1570     gst_pad_pause_task (src->srcpad);
1571
1572   /* unblock streaming thread. */
1573   if (unlock)
1574     gst_base_src_set_flushing (src, TRUE, FALSE, &playing);
1575
1576   /* grab streaming lock, this should eventually be possible, either
1577    * because the task is paused, our streaming thread stopped
1578    * or because our peer is flushing. */
1579   GST_PAD_STREAM_LOCK (src->srcpad);
1580   if (G_UNLIKELY (src->priv->seqnum == seqnum)) {
1581     /* we have seen this event before, issue a warning for now */
1582     GST_WARNING_OBJECT (src, "duplicate event found %" G_GUINT32_FORMAT,
1583         seqnum);
1584   } else {
1585     src->priv->seqnum = seqnum;
1586     GST_DEBUG_OBJECT (src, "seek with seqnum %" G_GUINT32_FORMAT, seqnum);
1587   }
1588
1589   if (unlock)
1590     gst_base_src_set_flushing (src, FALSE, playing, NULL);
1591
1592   /* If we configured the seeksegment above, don't overwrite it now. Otherwise
1593    * copy the current segment info into the temp segment that we can actually
1594    * attempt the seek with. We only update the real segment if the seek succeeds. */
1595   if (!seekseg_configured) {
1596     memcpy (&seeksegment, &src->segment, sizeof (GstSegment));
1597
1598     /* now configure the final seek segment */
1599     if (event) {
1600       if (seeksegment.format != seek_format) {
1601         /* OK, here's where we give the subclass a chance to convert the relative
1602          * seek into an absolute one in the processing format. We set up any
1603          * absolute seek above, before taking the stream lock. */
1604         if (!gst_base_src_prepare_seek_segment (src, event, &seeksegment)) {
1605           GST_DEBUG_OBJECT (src, "Preparing the seek failed after flushing. "
1606               "Aborting seek");
1607           res = FALSE;
1608         }
1609       } else {
1610         /* The seek format matches our processing format, no need to ask the
1611          * the subclass to configure the segment. */
1612         gst_segment_do_seek (&seeksegment, rate, seek_format, flags,
1613             start_type, start, stop_type, stop, &update);
1614       }
1615     }
1616     /* Else, no seek event passed, so we're just (re)starting the
1617        current segment. */
1618   }
1619
1620   if (res) {
1621     GST_DEBUG_OBJECT (src, "segment configured from %" G_GINT64_FORMAT
1622         " to %" G_GINT64_FORMAT ", position %" G_GINT64_FORMAT,
1623         seeksegment.start, seeksegment.stop, seeksegment.position);
1624
1625     /* do the seek, segment.position contains the new position. */
1626     res = gst_base_src_do_seek (src, &seeksegment);
1627   }
1628
1629   /* and prepare to continue streaming */
1630   if (flush) {
1631     tevent = gst_event_new_flush_stop (TRUE);
1632     gst_event_set_seqnum (tevent, seqnum);
1633     /* send flush stop, peer will accept data and events again. We
1634      * are not yet providing data as we still have the STREAM_LOCK. */
1635     gst_pad_push_event (src->srcpad, tevent);
1636   }
1637
1638   /* The subclass must have converted the segment to the processing format
1639    * by now */
1640   if (res && seeksegment.format != dest_format) {
1641     GST_DEBUG_OBJECT (src, "Subclass failed to prepare a seek segment "
1642         "in the correct format. Aborting seek.");
1643     res = FALSE;
1644   }
1645
1646   /* if the seek was successful, we update our real segment and push
1647    * out the new segment. */
1648   if (res) {
1649     GST_OBJECT_LOCK (src);
1650     memcpy (&src->segment, &seeksegment, sizeof (GstSegment));
1651     GST_OBJECT_UNLOCK (src);
1652
1653     if (seeksegment.flags & GST_SEGMENT_FLAG_SEGMENT) {
1654       GstMessage *message;
1655
1656       message = gst_message_new_segment_start (GST_OBJECT (src),
1657           seeksegment.format, seeksegment.position);
1658       gst_message_set_seqnum (message, seqnum);
1659
1660       gst_element_post_message (GST_ELEMENT (src), message);
1661     }
1662
1663     /* for deriving a stop position for the playback segment from the seek
1664      * segment, we must take the duration when the stop is not set */
1665     /* FIXME: This is never used below */
1666     if ((stop = seeksegment.stop) == -1)
1667       stop = seeksegment.duration;
1668
1669     src->priv->segment_pending = TRUE;
1670   }
1671
1672   src->priv->discont = TRUE;
1673   src->running = TRUE;
1674   /* and restart the task in case it got paused explicitly or by
1675    * the FLUSH_START event we pushed out. */
1676   tres = gst_pad_start_task (src->srcpad, (GstTaskFunction) gst_base_src_loop,
1677       src->srcpad, NULL);
1678   if (res && !tres)
1679     res = FALSE;
1680
1681   /* and release the lock again so we can continue streaming */
1682   GST_PAD_STREAM_UNLOCK (src->srcpad);
1683
1684   return res;
1685
1686   /* ERROR */
1687 prepare_failed:
1688   GST_DEBUG_OBJECT (src, "Preparing the seek failed before flushing. "
1689       "Aborting seek");
1690   return FALSE;
1691 }
1692
1693 /* all events send to this element directly. This is mainly done from the
1694  * application.
1695  */
1696 static gboolean
1697 gst_base_src_send_event (GstElement * element, GstEvent * event)
1698 {
1699   GstBaseSrc *src;
1700   gboolean result = FALSE;
1701
1702   src = GST_BASE_SRC (element);
1703
1704   GST_DEBUG_OBJECT (src, "handling event %p %" GST_PTR_FORMAT, event, event);
1705
1706   switch (GST_EVENT_TYPE (event)) {
1707       /* bidirectional events */
1708     case GST_EVENT_FLUSH_START:
1709       GST_DEBUG_OBJECT (src, "pushing flush-start event downstream");
1710       result = gst_pad_push_event (src->srcpad, event);
1711       event = NULL;
1712       break;
1713     case GST_EVENT_FLUSH_STOP:
1714       GST_LIVE_LOCK (src);
1715       src->priv->segment_pending = TRUE;
1716       /* sending random flushes downstream can break stuff,
1717        * especially sync since all segment info will get flushed */
1718       GST_DEBUG_OBJECT (src, "pushing flush-stop event downstream");
1719       result = gst_pad_push_event (src->srcpad, event);
1720       GST_LIVE_UNLOCK (src);
1721       event = NULL;
1722       break;
1723
1724       /* downstream serialized events */
1725     case GST_EVENT_EOS:
1726     {
1727       GstBaseSrcClass *bclass;
1728
1729       bclass = GST_BASE_SRC_GET_CLASS (src);
1730
1731       /* queue EOS and make sure the task or pull function performs the EOS
1732        * actions.
1733        *
1734        * We have two possibilities:
1735        *
1736        *  - Before we are to enter the _create function, we check the pending_eos
1737        *    first and do EOS instead of entering it.
1738        *  - If we are in the _create function or we did not manage to set the
1739        *    flag fast enough and we are about to enter the _create function,
1740        *    we unlock it so that we exit with FLUSHING immediately. We then
1741        *    check the EOS flag and do the EOS logic.
1742        */
1743       g_atomic_int_set (&src->priv->pending_eos, TRUE);
1744       GST_DEBUG_OBJECT (src, "EOS marked, calling unlock");
1745
1746
1747       /* unlock the _create function so that we can check the pending_eos flag
1748        * and we can do EOS. This will eventually release the LIVE_LOCK again so
1749        * that we can grab it and stop the unlock again. We don't take the stream
1750        * lock so that this operation is guaranteed to never block. */
1751       gst_base_src_activate_pool (src, FALSE);
1752       if (bclass->unlock)
1753         bclass->unlock (src);
1754
1755       GST_DEBUG_OBJECT (src, "unlock called, waiting for LIVE_LOCK");
1756
1757       GST_LIVE_LOCK (src);
1758       GST_DEBUG_OBJECT (src, "LIVE_LOCK acquired, calling unlock_stop");
1759       /* now stop the unlock of the streaming thread again. Grabbing the live
1760        * lock is enough because that protects the create function. */
1761       if (bclass->unlock_stop)
1762         bclass->unlock_stop (src);
1763       gst_base_src_activate_pool (src, TRUE);
1764       GST_LIVE_UNLOCK (src);
1765
1766       result = TRUE;
1767       break;
1768     }
1769     case GST_EVENT_SEGMENT:
1770       /* sending random SEGMENT downstream can break sync. */
1771       break;
1772     case GST_EVENT_TAG:
1773     case GST_EVENT_CUSTOM_DOWNSTREAM:
1774     case GST_EVENT_CUSTOM_BOTH:
1775       /* Insert TAG, CUSTOM_DOWNSTREAM, CUSTOM_BOTH in the dataflow */
1776       GST_OBJECT_LOCK (src);
1777       src->priv->pending_events =
1778           g_list_append (src->priv->pending_events, event);
1779       g_atomic_int_set (&src->priv->have_events, TRUE);
1780       GST_OBJECT_UNLOCK (src);
1781       event = NULL;
1782       result = TRUE;
1783       break;
1784     case GST_EVENT_BUFFERSIZE:
1785       /* does not seem to make much sense currently */
1786       break;
1787
1788       /* upstream events */
1789     case GST_EVENT_QOS:
1790       /* elements should override send_event and do something */
1791       break;
1792     case GST_EVENT_SEEK:
1793     {
1794       gboolean started;
1795
1796       GST_OBJECT_LOCK (src->srcpad);
1797       if (GST_PAD_MODE (src->srcpad) == GST_PAD_MODE_PULL)
1798         goto wrong_mode;
1799       started = GST_PAD_MODE (src->srcpad) == GST_PAD_MODE_PUSH;
1800       GST_OBJECT_UNLOCK (src->srcpad);
1801
1802       if (started) {
1803         GST_DEBUG_OBJECT (src, "performing seek");
1804         /* when we are running in push mode, we can execute the
1805          * seek right now. */
1806         result = gst_base_src_perform_seek (src, event, TRUE);
1807       } else {
1808         GstEvent **event_p;
1809
1810         /* else we store the event and execute the seek when we
1811          * get activated */
1812         GST_OBJECT_LOCK (src);
1813         GST_DEBUG_OBJECT (src, "queueing seek");
1814         event_p = &src->pending_seek;
1815         gst_event_replace ((GstEvent **) event_p, event);
1816         GST_OBJECT_UNLOCK (src);
1817         /* assume the seek will work */
1818         result = TRUE;
1819       }
1820       break;
1821     }
1822     case GST_EVENT_NAVIGATION:
1823       /* could make sense for elements that do something with navigation events
1824        * but then they would need to override the send_event function */
1825       break;
1826     case GST_EVENT_LATENCY:
1827       /* does not seem to make sense currently */
1828       break;
1829
1830       /* custom events */
1831     case GST_EVENT_CUSTOM_UPSTREAM:
1832       /* override send_event if you want this */
1833       break;
1834     case GST_EVENT_CUSTOM_DOWNSTREAM_OOB:
1835     case GST_EVENT_CUSTOM_BOTH_OOB:
1836       /* insert a random custom event into the pipeline */
1837       GST_DEBUG_OBJECT (src, "pushing custom OOB event downstream");
1838       result = gst_pad_push_event (src->srcpad, event);
1839       /* we gave away the ref to the event in the push */
1840       event = NULL;
1841       break;
1842     default:
1843       break;
1844   }
1845 done:
1846   /* if we still have a ref to the event, unref it now */
1847   if (event)
1848     gst_event_unref (event);
1849
1850   return result;
1851
1852   /* ERRORS */
1853 wrong_mode:
1854   {
1855     GST_DEBUG_OBJECT (src, "cannot perform seek when operating in pull mode");
1856     GST_OBJECT_UNLOCK (src->srcpad);
1857     result = FALSE;
1858     goto done;
1859   }
1860 }
1861
1862 static gboolean
1863 gst_base_src_seekable (GstBaseSrc * src)
1864 {
1865   GstBaseSrcClass *bclass;
1866   bclass = GST_BASE_SRC_GET_CLASS (src);
1867   if (bclass->is_seekable)
1868     return bclass->is_seekable (src);
1869   else
1870     return FALSE;
1871 }
1872
1873 static void
1874 gst_base_src_update_qos (GstBaseSrc * src,
1875     gdouble proportion, GstClockTimeDiff diff, GstClockTime timestamp)
1876 {
1877   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, src,
1878       "qos: proportion: %lf, diff %" G_GINT64_FORMAT ", timestamp %"
1879       GST_TIME_FORMAT, proportion, diff, GST_TIME_ARGS (timestamp));
1880
1881   GST_OBJECT_LOCK (src);
1882   src->priv->proportion = proportion;
1883   src->priv->earliest_time = timestamp + diff;
1884   GST_OBJECT_UNLOCK (src);
1885 }
1886
1887
1888 static gboolean
1889 gst_base_src_default_event (GstBaseSrc * src, GstEvent * event)
1890 {
1891   gboolean result;
1892
1893   GST_DEBUG_OBJECT (src, "handle event %" GST_PTR_FORMAT, event);
1894
1895   switch (GST_EVENT_TYPE (event)) {
1896     case GST_EVENT_SEEK:
1897       /* is normally called when in push mode */
1898       if (!gst_base_src_seekable (src))
1899         goto not_seekable;
1900
1901       result = gst_base_src_perform_seek (src, event, TRUE);
1902       break;
1903     case GST_EVENT_FLUSH_START:
1904       /* cancel any blocking getrange, is normally called
1905        * when in pull mode. */
1906       result = gst_base_src_set_flushing (src, TRUE, FALSE, NULL);
1907       break;
1908     case GST_EVENT_FLUSH_STOP:
1909       result = gst_base_src_set_flushing (src, FALSE, TRUE, NULL);
1910       break;
1911     case GST_EVENT_QOS:
1912     {
1913       gdouble proportion;
1914       GstClockTimeDiff diff;
1915       GstClockTime timestamp;
1916
1917       gst_event_parse_qos (event, NULL, &proportion, &diff, &timestamp);
1918       gst_base_src_update_qos (src, proportion, diff, timestamp);
1919       result = TRUE;
1920       break;
1921     }
1922     case GST_EVENT_RECONFIGURE:
1923       result = TRUE;
1924       break;
1925     case GST_EVENT_LATENCY:
1926       result = TRUE;
1927       break;
1928     default:
1929       result = FALSE;
1930       break;
1931   }
1932   return result;
1933
1934   /* ERRORS */
1935 not_seekable:
1936   {
1937     GST_DEBUG_OBJECT (src, "is not seekable");
1938     return FALSE;
1939   }
1940 }
1941
1942 static gboolean
1943 gst_base_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
1944 {
1945   GstBaseSrc *src;
1946   GstBaseSrcClass *bclass;
1947   gboolean result = FALSE;
1948
1949   src = GST_BASE_SRC (parent);
1950   bclass = GST_BASE_SRC_GET_CLASS (src);
1951
1952   if (bclass->event) {
1953     if (!(result = bclass->event (src, event)))
1954       goto subclass_failed;
1955   }
1956
1957 done:
1958   gst_event_unref (event);
1959
1960   return result;
1961
1962   /* ERRORS */
1963 subclass_failed:
1964   {
1965     GST_DEBUG_OBJECT (src, "subclass refused event");
1966     goto done;
1967   }
1968 }
1969
1970 static void
1971 gst_base_src_set_property (GObject * object, guint prop_id,
1972     const GValue * value, GParamSpec * pspec)
1973 {
1974   GstBaseSrc *src;
1975
1976   src = GST_BASE_SRC (object);
1977
1978   switch (prop_id) {
1979     case PROP_BLOCKSIZE:
1980       gst_base_src_set_blocksize (src, g_value_get_uint (value));
1981       break;
1982     case PROP_NUM_BUFFERS:
1983       src->num_buffers = g_value_get_int (value);
1984       break;
1985     case PROP_TYPEFIND:
1986       src->typefind = g_value_get_boolean (value);
1987       break;
1988     case PROP_DO_TIMESTAMP:
1989       gst_base_src_set_do_timestamp (src, g_value_get_boolean (value));
1990       break;
1991     default:
1992       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1993       break;
1994   }
1995 }
1996
1997 static void
1998 gst_base_src_get_property (GObject * object, guint prop_id, GValue * value,
1999     GParamSpec * pspec)
2000 {
2001   GstBaseSrc *src;
2002
2003   src = GST_BASE_SRC (object);
2004
2005   switch (prop_id) {
2006     case PROP_BLOCKSIZE:
2007       g_value_set_uint (value, gst_base_src_get_blocksize (src));
2008       break;
2009     case PROP_NUM_BUFFERS:
2010       g_value_set_int (value, src->num_buffers);
2011       break;
2012     case PROP_TYPEFIND:
2013       g_value_set_boolean (value, src->typefind);
2014       break;
2015     case PROP_DO_TIMESTAMP:
2016       g_value_set_boolean (value, gst_base_src_get_do_timestamp (src));
2017       break;
2018     default:
2019       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2020       break;
2021   }
2022 }
2023
2024 /* with STREAM_LOCK and LOCK */
2025 static GstClockReturn
2026 gst_base_src_wait (GstBaseSrc * basesrc, GstClock * clock, GstClockTime time)
2027 {
2028   GstClockReturn ret;
2029   GstClockID id;
2030
2031   id = gst_clock_new_single_shot_id (clock, time);
2032
2033   basesrc->clock_id = id;
2034   /* release the live lock while waiting */
2035   GST_LIVE_UNLOCK (basesrc);
2036
2037   ret = gst_clock_id_wait (id, NULL);
2038
2039   GST_LIVE_LOCK (basesrc);
2040   gst_clock_id_unref (id);
2041   basesrc->clock_id = NULL;
2042
2043   return ret;
2044 }
2045
2046 /* perform synchronisation on a buffer.
2047  * with STREAM_LOCK.
2048  */
2049 static GstClockReturn
2050 gst_base_src_do_sync (GstBaseSrc * basesrc, GstBuffer * buffer)
2051 {
2052   GstClockReturn result;
2053   GstClockTime start, end;
2054   GstBaseSrcClass *bclass;
2055   GstClockTime base_time;
2056   GstClock *clock;
2057   GstClockTime now = GST_CLOCK_TIME_NONE, pts, dts, timestamp;
2058   gboolean do_timestamp, first, pseudo_live, is_live;
2059
2060   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2061
2062   start = end = -1;
2063   if (bclass->get_times)
2064     bclass->get_times (basesrc, buffer, &start, &end);
2065
2066   /* get buffer timestamp */
2067   dts = GST_BUFFER_DTS (buffer);
2068   pts = GST_BUFFER_PTS (buffer);
2069
2070   if (GST_CLOCK_TIME_IS_VALID (dts))
2071     timestamp = dts;
2072   else
2073     timestamp = pts;
2074
2075   /* grab the lock to prepare for clocking and calculate the startup
2076    * latency. */
2077   GST_OBJECT_LOCK (basesrc);
2078
2079   is_live = basesrc->is_live;
2080   /* if we are asked to sync against the clock we are a pseudo live element */
2081   pseudo_live = (start != -1 && is_live);
2082   /* check for the first buffer */
2083   first = (basesrc->priv->latency == -1);
2084
2085   if (timestamp != -1 && pseudo_live) {
2086     GstClockTime latency;
2087
2088     /* we have a timestamp and a sync time, latency is the diff */
2089     if (timestamp <= start)
2090       latency = start - timestamp;
2091     else
2092       latency = 0;
2093
2094     if (first) {
2095       GST_DEBUG_OBJECT (basesrc, "pseudo_live with latency %" GST_TIME_FORMAT,
2096           GST_TIME_ARGS (latency));
2097       /* first time we calculate latency, just configure */
2098       basesrc->priv->latency = latency;
2099     } else {
2100       if (basesrc->priv->latency != latency) {
2101         /* we have a new latency, FIXME post latency message */
2102         basesrc->priv->latency = latency;
2103         GST_DEBUG_OBJECT (basesrc, "latency changed to %" GST_TIME_FORMAT,
2104             GST_TIME_ARGS (latency));
2105       }
2106     }
2107   } else if (first) {
2108     GST_DEBUG_OBJECT (basesrc, "no latency needed, live %d, sync %d",
2109         is_live, start != -1);
2110     basesrc->priv->latency = 0;
2111   }
2112
2113   /* get clock, if no clock, we can't sync or do timestamps */
2114   if ((clock = GST_ELEMENT_CLOCK (basesrc)) == NULL)
2115     goto no_clock;
2116   else
2117     gst_object_ref (clock);
2118
2119   base_time = GST_ELEMENT_CAST (basesrc)->base_time;
2120
2121   do_timestamp = basesrc->priv->do_timestamp;
2122   GST_OBJECT_UNLOCK (basesrc);
2123
2124   /* first buffer, calculate the timestamp offset */
2125   if (first) {
2126     GstClockTime running_time;
2127
2128     now = gst_clock_get_time (clock);
2129     running_time = now - base_time;
2130
2131     GST_LOG_OBJECT (basesrc,
2132         "startup PTS: %" GST_TIME_FORMAT ", DTS %" GST_TIME_FORMAT
2133         ", running_time %" GST_TIME_FORMAT, GST_TIME_ARGS (pts),
2134         GST_TIME_ARGS (dts), GST_TIME_ARGS (running_time));
2135
2136     if (pseudo_live && timestamp != -1) {
2137       /* live source and we need to sync, add startup latency to all timestamps
2138        * to get the real running_time. Live sources should always timestamp
2139        * according to the current running time. */
2140       basesrc->priv->ts_offset = GST_CLOCK_DIFF (timestamp, running_time);
2141
2142       GST_LOG_OBJECT (basesrc, "live with sync, ts_offset %" GST_TIME_FORMAT,
2143           GST_TIME_ARGS (basesrc->priv->ts_offset));
2144     } else {
2145       basesrc->priv->ts_offset = 0;
2146       GST_LOG_OBJECT (basesrc, "no timestamp offset needed");
2147     }
2148
2149     if (!GST_CLOCK_TIME_IS_VALID (dts)) {
2150       if (do_timestamp) {
2151         dts = running_time;
2152       } else {
2153         dts = 0;
2154       }
2155       GST_BUFFER_DTS (buffer) = dts;
2156
2157       GST_LOG_OBJECT (basesrc, "created DTS %" GST_TIME_FORMAT,
2158           GST_TIME_ARGS (dts));
2159     }
2160   } else {
2161     /* not the first buffer, the timestamp is the diff between the clock and
2162      * base_time */
2163     if (do_timestamp && !GST_CLOCK_TIME_IS_VALID (dts)) {
2164       now = gst_clock_get_time (clock);
2165
2166       dts = now - base_time;
2167       GST_BUFFER_DTS (buffer) = dts;
2168
2169       GST_LOG_OBJECT (basesrc, "created DTS %" GST_TIME_FORMAT,
2170           GST_TIME_ARGS (dts));
2171     }
2172   }
2173   if (!GST_CLOCK_TIME_IS_VALID (pts)) {
2174     if (!GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DELTA_UNIT))
2175       pts = dts;
2176
2177     GST_BUFFER_PTS (buffer) = dts;
2178
2179     GST_LOG_OBJECT (basesrc, "created PTS %" GST_TIME_FORMAT,
2180         GST_TIME_ARGS (pts));
2181   }
2182
2183   /* if we don't have a buffer timestamp, we don't sync */
2184   if (!GST_CLOCK_TIME_IS_VALID (start))
2185     goto no_sync;
2186
2187   if (is_live) {
2188     /* for pseudo live sources, add our ts_offset to the timestamp */
2189     if (GST_CLOCK_TIME_IS_VALID (pts))
2190       GST_BUFFER_PTS (buffer) += basesrc->priv->ts_offset;
2191     if (GST_CLOCK_TIME_IS_VALID (dts))
2192       GST_BUFFER_DTS (buffer) += basesrc->priv->ts_offset;
2193     start += basesrc->priv->ts_offset;
2194   }
2195
2196   GST_LOG_OBJECT (basesrc,
2197       "waiting for clock, base time %" GST_TIME_FORMAT
2198       ", stream_start %" GST_TIME_FORMAT,
2199       GST_TIME_ARGS (base_time), GST_TIME_ARGS (start));
2200
2201   result = gst_base_src_wait (basesrc, clock, start + base_time);
2202
2203   gst_object_unref (clock);
2204
2205   GST_LOG_OBJECT (basesrc, "clock entry done: %d", result);
2206
2207   return result;
2208
2209   /* special cases */
2210 no_clock:
2211   {
2212     GST_DEBUG_OBJECT (basesrc, "we have no clock");
2213     GST_OBJECT_UNLOCK (basesrc);
2214     return GST_CLOCK_OK;
2215   }
2216 no_sync:
2217   {
2218     GST_DEBUG_OBJECT (basesrc, "no sync needed");
2219     gst_object_unref (clock);
2220     return GST_CLOCK_OK;
2221   }
2222 }
2223
2224 /* Called with STREAM_LOCK and LIVE_LOCK */
2225 static gboolean
2226 gst_base_src_update_length (GstBaseSrc * src, guint64 offset, guint * length,
2227     gboolean force)
2228 {
2229   guint64 size, maxsize;
2230   GstBaseSrcClass *bclass;
2231   GstFormat format;
2232   gint64 stop;
2233
2234   bclass = GST_BASE_SRC_GET_CLASS (src);
2235
2236   format = src->segment.format;
2237   stop = src->segment.stop;
2238   /* get total file size */
2239   size = src->segment.duration;
2240
2241   /* only operate if we are working with bytes */
2242   if (format != GST_FORMAT_BYTES)
2243     return TRUE;
2244
2245   /* the max amount of bytes to read is the total size or
2246    * up to the segment.stop if present. */
2247   if (stop != -1)
2248     maxsize = MIN (size, stop);
2249   else
2250     maxsize = size;
2251
2252   GST_DEBUG_OBJECT (src,
2253       "reading offset %" G_GUINT64_FORMAT ", length %u, size %" G_GINT64_FORMAT
2254       ", segment.stop %" G_GINT64_FORMAT ", maxsize %" G_GINT64_FORMAT, offset,
2255       *length, size, stop, maxsize);
2256
2257   /* check size if we have one */
2258   if (maxsize != -1) {
2259     /* if we run past the end, check if the file became bigger and
2260      * retry. */
2261     if (G_UNLIKELY (offset + *length >= maxsize || force)) {
2262       /* see if length of the file changed */
2263       if (bclass->get_size)
2264         if (!bclass->get_size (src, &size))
2265           size = -1;
2266
2267       /* make sure we don't exceed the configured segment stop
2268        * if it was set */
2269       if (stop != -1)
2270         maxsize = MIN (size, stop);
2271       else
2272         maxsize = size;
2273
2274       /* if we are at or past the end, EOS */
2275       if (G_UNLIKELY (offset >= maxsize))
2276         goto unexpected_length;
2277
2278       /* else we can clip to the end */
2279       if (G_UNLIKELY (offset + *length >= maxsize))
2280         *length = maxsize - offset;
2281
2282     }
2283   }
2284
2285   /* keep track of current duration.
2286    * segment is in bytes, we checked that above. */
2287   GST_OBJECT_LOCK (src);
2288   src->segment.duration = size;
2289   GST_OBJECT_UNLOCK (src);
2290
2291   return TRUE;
2292
2293   /* ERRORS */
2294 unexpected_length:
2295   {
2296     return FALSE;
2297   }
2298 }
2299
2300 /* must be called with LIVE_LOCK */
2301 static GstFlowReturn
2302 gst_base_src_get_range (GstBaseSrc * src, guint64 offset, guint length,
2303     GstBuffer ** buf)
2304 {
2305   GstFlowReturn ret;
2306   GstBaseSrcClass *bclass;
2307   GstClockReturn status;
2308   GstBuffer *res_buf;
2309   GstBuffer *in_buf;
2310
2311   bclass = GST_BASE_SRC_GET_CLASS (src);
2312
2313 again:
2314   if (src->is_live) {
2315     if (G_UNLIKELY (!src->live_running)) {
2316       ret = gst_base_src_wait_playing (src);
2317       if (ret != GST_FLOW_OK)
2318         goto stopped;
2319     }
2320   }
2321
2322   if (G_UNLIKELY (!GST_BASE_SRC_IS_STARTED (src)
2323           && !GST_BASE_SRC_IS_STARTING (src)))
2324     goto not_started;
2325
2326   if (G_UNLIKELY (!bclass->create))
2327     goto no_function;
2328
2329   if (G_UNLIKELY (!gst_base_src_update_length (src, offset, &length, FALSE)))
2330     goto unexpected_length;
2331
2332   /* track position */
2333   GST_OBJECT_LOCK (src);
2334   if (src->segment.format == GST_FORMAT_BYTES)
2335     src->segment.position = offset;
2336   GST_OBJECT_UNLOCK (src);
2337
2338   /* normally we don't count buffers */
2339   if (G_UNLIKELY (src->num_buffers_left >= 0)) {
2340     if (src->num_buffers_left == 0)
2341       goto reached_num_buffers;
2342     else
2343       src->num_buffers_left--;
2344   }
2345
2346   /* don't enter the create function if a pending EOS event was set. For the
2347    * logic of the pending_eos, check the event function of this class. */
2348   if (G_UNLIKELY (g_atomic_int_get (&src->priv->pending_eos)))
2349     goto eos;
2350
2351   GST_DEBUG_OBJECT (src,
2352       "calling create offset %" G_GUINT64_FORMAT " length %u, time %"
2353       G_GINT64_FORMAT, offset, length, src->segment.time);
2354
2355   res_buf = in_buf = *buf;
2356
2357   ret = bclass->create (src, offset, length, &res_buf);
2358
2359   /* The create function could be unlocked because we have a pending EOS. It's
2360    * possible that we have a valid buffer from create that we need to
2361    * discard when the create function returned _OK. */
2362   if (G_UNLIKELY (g_atomic_int_get (&src->priv->pending_eos))) {
2363     if (ret == GST_FLOW_OK) {
2364       if (*buf == NULL)
2365         gst_buffer_unref (res_buf);
2366     }
2367     goto eos;
2368   }
2369
2370   if (G_UNLIKELY (ret != GST_FLOW_OK))
2371     goto not_ok;
2372
2373   /* fallback in case the create function didn't fill a provided buffer */
2374   if (in_buf != NULL && res_buf != in_buf) {
2375     GstMapInfo info;
2376     gsize copied_size;
2377
2378     GST_CAT_DEBUG_OBJECT (GST_CAT_PERFORMANCE, src, "create function didn't "
2379         "fill the provided buffer, copying");
2380
2381     if (!gst_buffer_map (in_buf, &info, GST_MAP_WRITE))
2382       goto map_failed;
2383
2384     copied_size = gst_buffer_extract (res_buf, 0, info.data, info.size);
2385     gst_buffer_unmap (in_buf, &info);
2386     gst_buffer_set_size (in_buf, copied_size);
2387
2388     gst_buffer_copy_into (in_buf, res_buf, GST_BUFFER_COPY_METADATA, 0, -1);
2389
2390     gst_buffer_unref (res_buf);
2391     res_buf = in_buf;
2392   }
2393
2394   /* no timestamp set and we are at offset 0, we can timestamp with 0 */
2395   if (offset == 0 && src->segment.time == 0
2396       && GST_BUFFER_DTS (res_buf) == -1 && !src->is_live) {
2397     GST_DEBUG_OBJECT (src, "setting first timestamp to 0");
2398     res_buf = gst_buffer_make_writable (res_buf);
2399     GST_BUFFER_DTS (res_buf) = 0;
2400   }
2401
2402   /* now sync before pushing the buffer */
2403   status = gst_base_src_do_sync (src, res_buf);
2404
2405   /* waiting for the clock could have made us flushing */
2406   if (G_UNLIKELY (src->priv->flushing))
2407     goto flushing;
2408
2409   switch (status) {
2410     case GST_CLOCK_EARLY:
2411       /* the buffer is too late. We currently don't drop the buffer. */
2412       GST_DEBUG_OBJECT (src, "buffer too late!, returning anyway");
2413       break;
2414     case GST_CLOCK_OK:
2415       /* buffer synchronised properly */
2416       GST_DEBUG_OBJECT (src, "buffer ok");
2417       break;
2418     case GST_CLOCK_UNSCHEDULED:
2419       /* this case is triggered when we were waiting for the clock and
2420        * it got unlocked because we did a state change. In any case, get rid of
2421        * the buffer. */
2422       if (*buf == NULL)
2423         gst_buffer_unref (res_buf);
2424
2425       if (!src->live_running) {
2426         /* We return FLUSHING when we are not running to stop the dataflow also
2427          * get rid of the produced buffer. */
2428         GST_DEBUG_OBJECT (src,
2429             "clock was unscheduled (%d), returning FLUSHING", status);
2430         ret = GST_FLOW_FLUSHING;
2431       } else {
2432         /* If we are running when this happens, we quickly switched between
2433          * pause and playing. We try to produce a new buffer */
2434         GST_DEBUG_OBJECT (src,
2435             "clock was unscheduled (%d), but we are running", status);
2436         goto again;
2437       }
2438       break;
2439     default:
2440       /* all other result values are unexpected and errors */
2441       GST_ELEMENT_ERROR (src, CORE, CLOCK,
2442           (_("Internal clock error.")),
2443           ("clock returned unexpected return value %d", status));
2444       if (*buf == NULL)
2445         gst_buffer_unref (res_buf);
2446       ret = GST_FLOW_ERROR;
2447       break;
2448   }
2449   if (G_LIKELY (ret == GST_FLOW_OK))
2450     *buf = res_buf;
2451
2452   return ret;
2453
2454   /* ERROR */
2455 stopped:
2456   {
2457     GST_DEBUG_OBJECT (src, "wait_playing returned %d (%s)", ret,
2458         gst_flow_get_name (ret));
2459     return ret;
2460   }
2461 not_ok:
2462   {
2463     GST_DEBUG_OBJECT (src, "create returned %d (%s)", ret,
2464         gst_flow_get_name (ret));
2465     return ret;
2466   }
2467 map_failed:
2468   {
2469     GST_ELEMENT_ERROR (src, RESOURCE, BUSY,
2470         (_("Failed to map buffer.")),
2471         ("failed to map result buffer in WRITE mode"));
2472     if (*buf == NULL)
2473       gst_buffer_unref (res_buf);
2474     return GST_FLOW_ERROR;
2475   }
2476 not_started:
2477   {
2478     GST_DEBUG_OBJECT (src, "getrange but not started");
2479     return GST_FLOW_FLUSHING;
2480   }
2481 no_function:
2482   {
2483     GST_DEBUG_OBJECT (src, "no create function");
2484     return GST_FLOW_NOT_SUPPORTED;
2485   }
2486 unexpected_length:
2487   {
2488     GST_DEBUG_OBJECT (src, "unexpected length %u (offset=%" G_GUINT64_FORMAT
2489         ", size=%" G_GINT64_FORMAT ")", length, offset, src->segment.duration);
2490     return GST_FLOW_EOS;
2491   }
2492 reached_num_buffers:
2493   {
2494     GST_DEBUG_OBJECT (src, "sent all buffers");
2495     return GST_FLOW_EOS;
2496   }
2497 flushing:
2498   {
2499     GST_DEBUG_OBJECT (src, "we are flushing");
2500     if (*buf == NULL)
2501       gst_buffer_unref (res_buf);
2502     return GST_FLOW_FLUSHING;
2503   }
2504 eos:
2505   {
2506     GST_DEBUG_OBJECT (src, "we are EOS");
2507     return GST_FLOW_EOS;
2508   }
2509 }
2510
2511 static GstFlowReturn
2512 gst_base_src_getrange (GstPad * pad, GstObject * parent, guint64 offset,
2513     guint length, GstBuffer ** buf)
2514 {
2515   GstBaseSrc *src;
2516   GstFlowReturn res;
2517
2518   src = GST_BASE_SRC_CAST (parent);
2519
2520   GST_LIVE_LOCK (src);
2521   if (G_UNLIKELY (src->priv->flushing))
2522     goto flushing;
2523
2524   res = gst_base_src_get_range (src, offset, length, buf);
2525
2526 done:
2527   GST_LIVE_UNLOCK (src);
2528
2529   return res;
2530
2531   /* ERRORS */
2532 flushing:
2533   {
2534     GST_DEBUG_OBJECT (src, "we are flushing");
2535     res = GST_FLOW_FLUSHING;
2536     goto done;
2537   }
2538 }
2539
2540 static gboolean
2541 gst_base_src_is_random_access (GstBaseSrc * src)
2542 {
2543   /* we need to start the basesrc to check random access */
2544   if (!GST_BASE_SRC_IS_STARTED (src)) {
2545     GST_LOG_OBJECT (src, "doing start/stop to check get_range support");
2546     if (G_LIKELY (gst_base_src_start (src))) {
2547       if (gst_base_src_start_wait (src) != GST_FLOW_OK)
2548         goto start_failed;
2549       gst_base_src_stop (src);
2550     }
2551   }
2552
2553   return src->random_access;
2554
2555   /* ERRORS */
2556 start_failed:
2557   {
2558     GST_DEBUG_OBJECT (src, "failed to start");
2559     return FALSE;
2560   }
2561 }
2562
2563 static void
2564 gst_base_src_loop (GstPad * pad)
2565 {
2566   GstBaseSrc *src;
2567   GstBuffer *buf = NULL;
2568   GstFlowReturn ret;
2569   gint64 position;
2570   gboolean eos;
2571   guint blocksize;
2572   GList *pending_events = NULL, *tmp;
2573
2574   eos = FALSE;
2575
2576   src = GST_BASE_SRC (GST_OBJECT_PARENT (pad));
2577
2578   /* Just leave immediately if we're flushing */
2579   GST_LIVE_LOCK (src);
2580   if (G_UNLIKELY (src->priv->flushing || GST_PAD_IS_FLUSHING (pad)))
2581     goto flushing;
2582   GST_LIVE_UNLOCK (src);
2583
2584   gst_base_src_send_stream_start (src);
2585
2586   /* The stream-start event could've caused something to flush us */
2587   GST_LIVE_LOCK (src);
2588   if (G_UNLIKELY (src->priv->flushing || GST_PAD_IS_FLUSHING (pad)))
2589     goto flushing;
2590   GST_LIVE_UNLOCK (src);
2591
2592   /* check if we need to renegotiate */
2593   if (gst_pad_check_reconfigure (pad)) {
2594     if (!gst_base_src_negotiate (src)) {
2595       gst_pad_mark_reconfigure (pad);
2596       if (GST_PAD_IS_FLUSHING (pad))
2597         goto flushing;
2598       else
2599         goto negotiate_failed;
2600     }
2601   }
2602
2603   GST_LIVE_LOCK (src);
2604
2605   if (G_UNLIKELY (src->priv->flushing || GST_PAD_IS_FLUSHING (pad)))
2606     goto flushing;
2607
2608   blocksize = src->blocksize;
2609
2610   /* if we operate in bytes, we can calculate an offset */
2611   if (src->segment.format == GST_FORMAT_BYTES) {
2612     position = src->segment.position;
2613     /* for negative rates, start with subtracting the blocksize */
2614     if (src->segment.rate < 0.0) {
2615       /* we cannot go below segment.start */
2616       if (position > src->segment.start + blocksize)
2617         position -= blocksize;
2618       else {
2619         /* last block, remainder up to segment.start */
2620         blocksize = position - src->segment.start;
2621         position = src->segment.start;
2622       }
2623     }
2624   } else
2625     position = -1;
2626
2627   GST_LOG_OBJECT (src, "next_ts %" GST_TIME_FORMAT " size %u",
2628       GST_TIME_ARGS (position), blocksize);
2629
2630   ret = gst_base_src_get_range (src, position, blocksize, &buf);
2631   if (G_UNLIKELY (ret != GST_FLOW_OK)) {
2632     GST_INFO_OBJECT (src, "pausing after gst_base_src_get_range() = %s",
2633         gst_flow_get_name (ret));
2634     GST_LIVE_UNLOCK (src);
2635     goto pause;
2636   }
2637   /* this should not happen */
2638   if (G_UNLIKELY (buf == NULL))
2639     goto null_buffer;
2640
2641   /* push events to close/start our segment before we push the buffer. */
2642   if (G_UNLIKELY (src->priv->segment_pending)) {
2643     gst_pad_push_event (pad, gst_event_new_segment (&src->segment));
2644     src->priv->segment_pending = FALSE;
2645   }
2646
2647   if (g_atomic_int_get (&src->priv->have_events)) {
2648     GST_OBJECT_LOCK (src);
2649     /* take the events */
2650     pending_events = src->priv->pending_events;
2651     src->priv->pending_events = NULL;
2652     g_atomic_int_set (&src->priv->have_events, FALSE);
2653     GST_OBJECT_UNLOCK (src);
2654   }
2655
2656   /* Push out pending events if any */
2657   if (G_UNLIKELY (pending_events != NULL)) {
2658     for (tmp = pending_events; tmp; tmp = g_list_next (tmp)) {
2659       GstEvent *ev = (GstEvent *) tmp->data;
2660       gst_pad_push_event (pad, ev);
2661     }
2662     g_list_free (pending_events);
2663   }
2664
2665   /* figure out the new position */
2666   switch (src->segment.format) {
2667     case GST_FORMAT_BYTES:
2668     {
2669       guint bufsize = gst_buffer_get_size (buf);
2670
2671       /* we subtracted above for negative rates */
2672       if (src->segment.rate >= 0.0)
2673         position += bufsize;
2674       break;
2675     }
2676     case GST_FORMAT_TIME:
2677     {
2678       GstClockTime start, duration;
2679
2680       start = GST_BUFFER_TIMESTAMP (buf);
2681       duration = GST_BUFFER_DURATION (buf);
2682
2683       if (GST_CLOCK_TIME_IS_VALID (start))
2684         position = start;
2685       else
2686         position = src->segment.position;
2687
2688       if (GST_CLOCK_TIME_IS_VALID (duration)) {
2689         if (src->segment.rate >= 0.0)
2690           position += duration;
2691         else if (position > duration)
2692           position -= duration;
2693         else
2694           position = 0;
2695       }
2696       break;
2697     }
2698     case GST_FORMAT_DEFAULT:
2699       if (src->segment.rate >= 0.0)
2700         position = GST_BUFFER_OFFSET_END (buf);
2701       else
2702         position = GST_BUFFER_OFFSET (buf);
2703       break;
2704     default:
2705       position = -1;
2706       break;
2707   }
2708   if (position != -1) {
2709     if (src->segment.rate >= 0.0) {
2710       /* positive rate, check if we reached the stop */
2711       if (src->segment.stop != -1) {
2712         if (position >= src->segment.stop) {
2713           eos = TRUE;
2714           position = src->segment.stop;
2715         }
2716       }
2717     } else {
2718       /* negative rate, check if we reached the start. start is always set to
2719        * something different from -1 */
2720       if (position <= src->segment.start) {
2721         eos = TRUE;
2722         position = src->segment.start;
2723       }
2724       /* when going reverse, all buffers are DISCONT */
2725       src->priv->discont = TRUE;
2726     }
2727     GST_OBJECT_LOCK (src);
2728     src->segment.position = position;
2729     GST_OBJECT_UNLOCK (src);
2730   }
2731
2732   if (G_UNLIKELY (src->priv->discont)) {
2733     GST_INFO_OBJECT (src, "marking pending DISCONT");
2734     buf = gst_buffer_make_writable (buf);
2735     GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_DISCONT);
2736     src->priv->discont = FALSE;
2737   }
2738   GST_LIVE_UNLOCK (src);
2739
2740   ret = gst_pad_push (pad, buf);
2741   if (G_UNLIKELY (ret != GST_FLOW_OK)) {
2742     if (ret == GST_FLOW_NOT_NEGOTIATED) {
2743       goto not_negotiated;
2744     }
2745     GST_INFO_OBJECT (src, "pausing after gst_pad_push() = %s",
2746         gst_flow_get_name (ret));
2747     goto pause;
2748   }
2749
2750   if (G_UNLIKELY (eos)) {
2751     GST_INFO_OBJECT (src, "pausing after end of segment");
2752     ret = GST_FLOW_EOS;
2753     goto pause;
2754   }
2755
2756 done:
2757   return;
2758
2759   /* special cases */
2760 not_negotiated:
2761   {
2762     if (gst_pad_needs_reconfigure (pad)) {
2763       GST_DEBUG_OBJECT (src, "Retrying to renegotiate");
2764       return;
2765     }
2766     /* fallthrough when push returns NOT_NEGOTIATED and we don't have
2767      * a pending negotiation request on our srcpad */
2768   }
2769 negotiate_failed:
2770   {
2771     GST_DEBUG_OBJECT (src, "Not negotiated");
2772     ret = GST_FLOW_NOT_NEGOTIATED;
2773     goto pause;
2774   }
2775 flushing:
2776   {
2777     GST_DEBUG_OBJECT (src, "we are flushing");
2778     GST_LIVE_UNLOCK (src);
2779     ret = GST_FLOW_FLUSHING;
2780     goto pause;
2781   }
2782 pause:
2783   {
2784     const gchar *reason = gst_flow_get_name (ret);
2785     GstEvent *event;
2786
2787     GST_DEBUG_OBJECT (src, "pausing task, reason %s", reason);
2788     src->running = FALSE;
2789     gst_pad_pause_task (pad);
2790     if (ret == GST_FLOW_EOS) {
2791       gboolean flag_segment;
2792       GstFormat format;
2793       gint64 position;
2794
2795       /* perform EOS logic */
2796       flag_segment = (src->segment.flags & GST_SEGMENT_FLAG_SEGMENT) != 0;
2797       format = src->segment.format;
2798       position = src->segment.position;
2799
2800       if (flag_segment) {
2801         GstMessage *message;
2802
2803         message = gst_message_new_segment_done (GST_OBJECT_CAST (src),
2804             format, position);
2805         gst_message_set_seqnum (message, src->priv->seqnum);
2806         gst_element_post_message (GST_ELEMENT_CAST (src), message);
2807         event = gst_event_new_segment_done (format, position);
2808         gst_event_set_seqnum (event, src->priv->seqnum);
2809         gst_pad_push_event (pad, event);
2810       } else {
2811         event = gst_event_new_eos ();
2812         gst_event_set_seqnum (event, src->priv->seqnum);
2813         gst_pad_push_event (pad, event);
2814       }
2815     } else if (ret == GST_FLOW_NOT_LINKED || ret <= GST_FLOW_EOS) {
2816       event = gst_event_new_eos ();
2817       gst_event_set_seqnum (event, src->priv->seqnum);
2818       /* for fatal errors we post an error message, post the error
2819        * first so the app knows about the error first.
2820        * Also don't do this for FLUSHING because it happens
2821        * due to flushing and posting an error message because of
2822        * that is the wrong thing to do, e.g. when we're doing
2823        * a flushing seek. */
2824       GST_ELEMENT_ERROR (src, STREAM, FAILED,
2825           (_("Internal data flow error.")),
2826           ("streaming task paused, reason %s (%d)", reason, ret));
2827       gst_pad_push_event (pad, event);
2828     }
2829     goto done;
2830   }
2831 null_buffer:
2832   {
2833     GST_ELEMENT_ERROR (src, STREAM, FAILED,
2834         (_("Internal data flow error.")), ("element returned NULL buffer"));
2835     GST_LIVE_UNLOCK (src);
2836     goto done;
2837   }
2838 }
2839
2840 static gboolean
2841 gst_base_src_set_allocation (GstBaseSrc * basesrc, GstBufferPool * pool,
2842     GstAllocator * allocator, GstAllocationParams * params)
2843 {
2844   GstAllocator *oldalloc;
2845   GstBufferPool *oldpool;
2846   GstBaseSrcPrivate *priv = basesrc->priv;
2847
2848   if (pool) {
2849     GST_DEBUG_OBJECT (basesrc, "activate pool");
2850     if (!gst_buffer_pool_set_active (pool, TRUE))
2851       goto activate_failed;
2852   }
2853
2854   GST_OBJECT_LOCK (basesrc);
2855   oldpool = priv->pool;
2856   priv->pool = pool;
2857
2858   oldalloc = priv->allocator;
2859   priv->allocator = allocator;
2860
2861   if (params)
2862     priv->params = *params;
2863   else
2864     gst_allocation_params_init (&priv->params);
2865   GST_OBJECT_UNLOCK (basesrc);
2866
2867   if (oldpool) {
2868     /* only deactivate if the pool is not the one we're using */
2869     if (oldpool != pool) {
2870       GST_DEBUG_OBJECT (basesrc, "deactivate old pool");
2871       gst_buffer_pool_set_active (oldpool, FALSE);
2872     }
2873     gst_object_unref (oldpool);
2874   }
2875   if (oldalloc) {
2876     gst_object_unref (oldalloc);
2877   }
2878   return TRUE;
2879
2880   /* ERRORS */
2881 activate_failed:
2882   {
2883     GST_ERROR_OBJECT (basesrc, "failed to activate bufferpool.");
2884     return FALSE;
2885   }
2886 }
2887
2888 static gboolean
2889 gst_base_src_activate_pool (GstBaseSrc * basesrc, gboolean active)
2890 {
2891   GstBaseSrcPrivate *priv = basesrc->priv;
2892   GstBufferPool *pool;
2893   gboolean res = TRUE;
2894
2895   GST_OBJECT_LOCK (basesrc);
2896   if ((pool = priv->pool))
2897     pool = gst_object_ref (pool);
2898   GST_OBJECT_UNLOCK (basesrc);
2899
2900   if (pool) {
2901     res = gst_buffer_pool_set_active (pool, active);
2902     gst_object_unref (pool);
2903   }
2904   return res;
2905 }
2906
2907
2908 static gboolean
2909 gst_base_src_decide_allocation_default (GstBaseSrc * basesrc, GstQuery * query)
2910 {
2911   GstCaps *outcaps;
2912   GstBufferPool *pool;
2913   guint size, min, max;
2914   GstAllocator *allocator;
2915   GstAllocationParams params;
2916   GstStructure *config;
2917   gboolean update_allocator;
2918
2919   gst_query_parse_allocation (query, &outcaps, NULL);
2920
2921   /* we got configuration from our peer or the decide_allocation method,
2922    * parse them */
2923   if (gst_query_get_n_allocation_params (query) > 0) {
2924     /* try the allocator */
2925     gst_query_parse_nth_allocation_param (query, 0, &allocator, &params);
2926     update_allocator = TRUE;
2927   } else {
2928     allocator = NULL;
2929     gst_allocation_params_init (&params);
2930     update_allocator = FALSE;
2931   }
2932
2933   if (gst_query_get_n_allocation_pools (query) > 0) {
2934     gst_query_parse_nth_allocation_pool (query, 0, &pool, &size, &min, &max);
2935
2936     if (pool == NULL) {
2937       /* no pool, we can make our own */
2938       GST_DEBUG_OBJECT (basesrc, "no pool, making new pool");
2939       pool = gst_buffer_pool_new ();
2940     }
2941   } else {
2942     pool = NULL;
2943     size = min = max = 0;
2944   }
2945
2946   /* now configure */
2947   if (pool) {
2948     config = gst_buffer_pool_get_config (pool);
2949     gst_buffer_pool_config_set_params (config, outcaps, size, min, max);
2950     gst_buffer_pool_config_set_allocator (config, allocator, &params);
2951     gst_buffer_pool_set_config (pool, config);
2952   }
2953
2954   if (update_allocator)
2955     gst_query_set_nth_allocation_param (query, 0, allocator, &params);
2956   else
2957     gst_query_add_allocation_param (query, allocator, &params);
2958   if (allocator)
2959     gst_object_unref (allocator);
2960
2961   if (pool) {
2962     gst_query_set_nth_allocation_pool (query, 0, pool, size, min, max);
2963     gst_object_unref (pool);
2964   }
2965
2966   return TRUE;
2967 }
2968
2969 static gboolean
2970 gst_base_src_prepare_allocation (GstBaseSrc * basesrc, GstCaps * caps)
2971 {
2972   GstBaseSrcClass *bclass;
2973   gboolean result = TRUE;
2974   GstQuery *query;
2975   GstBufferPool *pool = NULL;
2976   GstAllocator *allocator = NULL;
2977   GstAllocationParams params;
2978
2979   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2980
2981   /* make query and let peer pad answer, we don't really care if it worked or
2982    * not, if it failed, the allocation query would contain defaults and the
2983    * subclass would then set better values if needed */
2984   query = gst_query_new_allocation (caps, TRUE);
2985   if (!gst_pad_peer_query (basesrc->srcpad, query)) {
2986     /* not a problem, just debug a little */
2987     GST_DEBUG_OBJECT (basesrc, "peer ALLOCATION query failed");
2988   }
2989
2990   g_assert (bclass->decide_allocation != NULL);
2991   result = bclass->decide_allocation (basesrc, query);
2992
2993   GST_DEBUG_OBJECT (basesrc, "ALLOCATION (%d) params: %" GST_PTR_FORMAT, result,
2994       query);
2995
2996   if (!result)
2997     goto no_decide_allocation;
2998
2999   /* we got configuration from our peer or the decide_allocation method,
3000    * parse them */
3001   if (gst_query_get_n_allocation_params (query) > 0) {
3002     gst_query_parse_nth_allocation_param (query, 0, &allocator, &params);
3003   } else {
3004     allocator = NULL;
3005     gst_allocation_params_init (&params);
3006   }
3007
3008   if (gst_query_get_n_allocation_pools (query) > 0)
3009     gst_query_parse_nth_allocation_pool (query, 0, &pool, NULL, NULL, NULL);
3010
3011   result = gst_base_src_set_allocation (basesrc, pool, allocator, &params);
3012
3013   gst_query_unref (query);
3014
3015   return result;
3016
3017   /* Errors */
3018 no_decide_allocation:
3019   {
3020     GST_WARNING_OBJECT (basesrc, "Subclass failed to decide allocation");
3021     gst_query_unref (query);
3022
3023     return result;
3024   }
3025 }
3026
3027 /* default negotiation code.
3028  *
3029  * Take intersection between src and sink pads, take first
3030  * caps and fixate.
3031  */
3032 static gboolean
3033 gst_base_src_default_negotiate (GstBaseSrc * basesrc)
3034 {
3035   GstCaps *thiscaps;
3036   GstCaps *caps = NULL;
3037   GstCaps *peercaps = NULL;
3038   gboolean result = FALSE;
3039
3040   /* first see what is possible on our source pad */
3041   thiscaps = gst_pad_query_caps (GST_BASE_SRC_PAD (basesrc), NULL);
3042   GST_DEBUG_OBJECT (basesrc, "caps of src: %" GST_PTR_FORMAT, thiscaps);
3043   /* nothing or anything is allowed, we're done */
3044   if (thiscaps == NULL || gst_caps_is_any (thiscaps))
3045     goto no_nego_needed;
3046
3047   if (G_UNLIKELY (gst_caps_is_empty (thiscaps)))
3048     goto no_caps;
3049
3050   /* get the peer caps */
3051   peercaps = gst_pad_peer_query_caps (GST_BASE_SRC_PAD (basesrc), thiscaps);
3052   GST_DEBUG_OBJECT (basesrc, "caps of peer: %" GST_PTR_FORMAT, peercaps);
3053   if (peercaps) {
3054     /* The result is already a subset of our caps */
3055     caps = peercaps;
3056     gst_caps_unref (thiscaps);
3057   } else {
3058     /* no peer, work with our own caps then */
3059     caps = thiscaps;
3060   }
3061   if (caps && !gst_caps_is_empty (caps)) {
3062     /* now fixate */
3063     GST_DEBUG_OBJECT (basesrc, "have caps: %" GST_PTR_FORMAT, caps);
3064     if (gst_caps_is_any (caps)) {
3065       GST_DEBUG_OBJECT (basesrc, "any caps, we stop");
3066       /* hmm, still anything, so element can do anything and
3067        * nego is not needed */
3068       result = TRUE;
3069     } else {
3070       caps = gst_base_src_fixate (basesrc, caps);
3071       GST_DEBUG_OBJECT (basesrc, "fixated to: %" GST_PTR_FORMAT, caps);
3072       if (gst_caps_is_fixed (caps)) {
3073         /* yay, fixed caps, use those then, it's possible that the subclass does
3074          * not accept this caps after all and we have to fail. */
3075         result = gst_base_src_set_caps (basesrc, caps);
3076       }
3077     }
3078     gst_caps_unref (caps);
3079   } else {
3080     if (caps)
3081       gst_caps_unref (caps);
3082     GST_DEBUG_OBJECT (basesrc, "no common caps");
3083   }
3084   return result;
3085
3086 no_nego_needed:
3087   {
3088     GST_DEBUG_OBJECT (basesrc, "no negotiation needed");
3089     if (thiscaps)
3090       gst_caps_unref (thiscaps);
3091     return TRUE;
3092   }
3093 no_caps:
3094   {
3095     GST_ELEMENT_ERROR (basesrc, STREAM, FORMAT,
3096         ("No supported formats found"),
3097         ("This element did not produce valid caps"));
3098     if (thiscaps)
3099       gst_caps_unref (thiscaps);
3100     return TRUE;
3101   }
3102 }
3103
3104 static gboolean
3105 gst_base_src_negotiate (GstBaseSrc * basesrc)
3106 {
3107   GstBaseSrcClass *bclass;
3108   gboolean result;
3109
3110   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
3111
3112   GST_DEBUG_OBJECT (basesrc, "starting negotiation");
3113
3114   if (G_LIKELY (bclass->negotiate))
3115     result = bclass->negotiate (basesrc);
3116   else
3117     result = TRUE;
3118
3119   if (G_LIKELY (result)) {
3120     GstCaps *caps;
3121
3122     caps = gst_pad_get_current_caps (basesrc->srcpad);
3123
3124     result = gst_base_src_prepare_allocation (basesrc, caps);
3125
3126     if (caps)
3127       gst_caps_unref (caps);
3128   }
3129   return result;
3130 }
3131
3132 static gboolean
3133 gst_base_src_start (GstBaseSrc * basesrc)
3134 {
3135   GstBaseSrcClass *bclass;
3136   gboolean result;
3137
3138   GST_LIVE_LOCK (basesrc);
3139
3140   GST_OBJECT_LOCK (basesrc);
3141   if (GST_BASE_SRC_IS_STARTING (basesrc))
3142     goto was_starting;
3143   if (GST_BASE_SRC_IS_STARTED (basesrc))
3144     goto was_started;
3145
3146   basesrc->priv->start_result = GST_FLOW_FLUSHING;
3147   GST_OBJECT_FLAG_SET (basesrc, GST_BASE_SRC_FLAG_STARTING);
3148   gst_segment_init (&basesrc->segment, basesrc->segment.format);
3149   GST_OBJECT_UNLOCK (basesrc);
3150
3151   basesrc->num_buffers_left = basesrc->num_buffers;
3152   basesrc->running = FALSE;
3153   basesrc->priv->segment_pending = FALSE;
3154   GST_LIVE_UNLOCK (basesrc);
3155
3156   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
3157   if (bclass->start)
3158     result = bclass->start (basesrc);
3159   else
3160     result = TRUE;
3161
3162   if (!result)
3163     goto could_not_start;
3164
3165   if (!gst_base_src_is_async (basesrc)) {
3166     gst_base_src_start_complete (basesrc, GST_FLOW_OK);
3167     /* not really waiting here, we call this to get the result
3168      * from the start_complete call */
3169     result = gst_base_src_start_wait (basesrc) == GST_FLOW_OK;
3170   }
3171
3172   return result;
3173
3174   /* ERROR */
3175 was_starting:
3176   {
3177     GST_DEBUG_OBJECT (basesrc, "was starting");
3178     GST_OBJECT_UNLOCK (basesrc);
3179     GST_LIVE_UNLOCK (basesrc);
3180     return TRUE;
3181   }
3182 was_started:
3183   {
3184     GST_DEBUG_OBJECT (basesrc, "was started");
3185     GST_OBJECT_UNLOCK (basesrc);
3186     GST_LIVE_UNLOCK (basesrc);
3187     return TRUE;
3188   }
3189 could_not_start:
3190   {
3191     GST_DEBUG_OBJECT (basesrc, "could not start");
3192     /* subclass is supposed to post a message. We don't have to call _stop. */
3193     gst_base_src_start_complete (basesrc, GST_FLOW_ERROR);
3194     return FALSE;
3195   }
3196 }
3197
3198 /**
3199  * gst_base_src_start_complete:
3200  * @basesrc: base source instance
3201  * @ret: a #GstFlowReturn
3202  *
3203  * Complete an asynchronous start operation. When the subclass overrides the
3204  * start method, it should call gst_base_src_start_complete() when the start
3205  * operation completes either from the same thread or from an asynchronous
3206  * helper thread.
3207  */
3208 void
3209 gst_base_src_start_complete (GstBaseSrc * basesrc, GstFlowReturn ret)
3210 {
3211   gboolean have_size;
3212   guint64 size;
3213   gboolean seekable;
3214   GstFormat format;
3215   GstPadMode mode;
3216   GstEvent *event;
3217
3218   if (ret != GST_FLOW_OK)
3219     goto error;
3220
3221   GST_DEBUG_OBJECT (basesrc, "starting source");
3222   format = basesrc->segment.format;
3223
3224   /* figure out the size */
3225   have_size = FALSE;
3226   size = -1;
3227   if (format == GST_FORMAT_BYTES) {
3228     GstBaseSrcClass *bclass = GST_BASE_SRC_GET_CLASS (basesrc);
3229
3230     if (bclass->get_size) {
3231       if (!(have_size = bclass->get_size (basesrc, &size)))
3232         size = -1;
3233     }
3234     GST_DEBUG_OBJECT (basesrc, "setting size %" G_GUINT64_FORMAT, size);
3235     /* only update the size when operating in bytes, subclass is supposed
3236      * to set duration in the start method for other formats */
3237     GST_OBJECT_LOCK (basesrc);
3238     basesrc->segment.duration = size;
3239     GST_OBJECT_UNLOCK (basesrc);
3240   }
3241
3242   GST_DEBUG_OBJECT (basesrc,
3243       "format: %s, have size: %d, size: %" G_GUINT64_FORMAT ", duration: %"
3244       G_GINT64_FORMAT, gst_format_get_name (format), have_size, size,
3245       basesrc->segment.duration);
3246
3247   seekable = gst_base_src_seekable (basesrc);
3248   GST_DEBUG_OBJECT (basesrc, "is seekable: %d", seekable);
3249
3250   /* update for random access flag */
3251   basesrc->random_access = seekable && format == GST_FORMAT_BYTES;
3252
3253   GST_DEBUG_OBJECT (basesrc, "is random_access: %d", basesrc->random_access);
3254
3255   /* stop flushing now but for live sources, still block in the LIVE lock when
3256    * we are not yet PLAYING */
3257   gst_base_src_set_flushing (basesrc, FALSE, FALSE, NULL);
3258
3259   gst_pad_mark_reconfigure (GST_BASE_SRC_PAD (basesrc));
3260
3261   GST_OBJECT_LOCK (basesrc->srcpad);
3262   mode = GST_PAD_MODE (basesrc->srcpad);
3263   GST_OBJECT_UNLOCK (basesrc->srcpad);
3264
3265   /* take the stream lock here, we only want to let the task run when we have
3266    * set the STARTED flag */
3267   GST_PAD_STREAM_LOCK (basesrc->srcpad);
3268   if (mode == GST_PAD_MODE_PUSH) {
3269     /* do initial seek, which will start the task */
3270     GST_OBJECT_LOCK (basesrc);
3271     event = basesrc->pending_seek;
3272     basesrc->pending_seek = NULL;
3273     GST_OBJECT_UNLOCK (basesrc);
3274
3275     /* The perform seek code will start the task when finished. We don't have to
3276      * unlock the streaming thread because it is not running yet */
3277     if (G_UNLIKELY (!gst_base_src_perform_seek (basesrc, event, FALSE)))
3278       goto seek_failed;
3279
3280     if (event)
3281       gst_event_unref (event);
3282   } else {
3283     /* if not random_access, we cannot operate in pull mode for now */
3284     if (G_UNLIKELY (!basesrc->random_access))
3285       goto no_get_range;
3286   }
3287
3288   GST_OBJECT_LOCK (basesrc);
3289   GST_OBJECT_FLAG_SET (basesrc, GST_BASE_SRC_FLAG_STARTED);
3290   GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTING);
3291   basesrc->priv->start_result = ret;
3292   GST_ASYNC_SIGNAL (basesrc);
3293   GST_OBJECT_UNLOCK (basesrc);
3294
3295   GST_PAD_STREAM_UNLOCK (basesrc->srcpad);
3296
3297   return;
3298
3299 seek_failed:
3300   {
3301     GST_PAD_STREAM_UNLOCK (basesrc->srcpad);
3302     GST_ERROR_OBJECT (basesrc, "Failed to perform initial seek");
3303     gst_base_src_stop (basesrc);
3304     if (event)
3305       gst_event_unref (event);
3306     ret = GST_FLOW_ERROR;
3307     goto error;
3308   }
3309 no_get_range:
3310   {
3311     GST_PAD_STREAM_UNLOCK (basesrc->srcpad);
3312     gst_base_src_stop (basesrc);
3313     GST_ERROR_OBJECT (basesrc, "Cannot operate in pull mode, stopping");
3314     ret = GST_FLOW_ERROR;
3315     goto error;
3316   }
3317 error:
3318   {
3319     GST_OBJECT_LOCK (basesrc);
3320     basesrc->priv->start_result = ret;
3321     GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTING);
3322     GST_ASYNC_SIGNAL (basesrc);
3323     GST_OBJECT_UNLOCK (basesrc);
3324     return;
3325   }
3326 }
3327
3328 /**
3329  * gst_base_src_start_wait:
3330  * @basesrc: base source instance
3331  *
3332  * Wait until the start operation completes.
3333  *
3334  * Returns: a #GstFlowReturn.
3335  */
3336 GstFlowReturn
3337 gst_base_src_start_wait (GstBaseSrc * basesrc)
3338 {
3339   GstFlowReturn result;
3340
3341   GST_OBJECT_LOCK (basesrc);
3342   while (GST_BASE_SRC_IS_STARTING (basesrc)) {
3343     GST_ASYNC_WAIT (basesrc);
3344   }
3345   result = basesrc->priv->start_result;
3346   GST_OBJECT_UNLOCK (basesrc);
3347
3348   GST_DEBUG_OBJECT (basesrc, "got %s", gst_flow_get_name (result));
3349
3350   return result;
3351 }
3352
3353 static gboolean
3354 gst_base_src_stop (GstBaseSrc * basesrc)
3355 {
3356   GstBaseSrcClass *bclass;
3357   gboolean result = TRUE;
3358
3359   GST_DEBUG_OBJECT (basesrc, "stopping source");
3360
3361   /* flush all */
3362   gst_base_src_set_flushing (basesrc, TRUE, FALSE, NULL);
3363   /* stop the task */
3364   gst_pad_stop_task (basesrc->srcpad);
3365
3366   GST_OBJECT_LOCK (basesrc);
3367   if (!GST_BASE_SRC_IS_STARTED (basesrc) && !GST_BASE_SRC_IS_STARTING (basesrc))
3368     goto was_stopped;
3369
3370   GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTING);
3371   GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTED);
3372   basesrc->priv->start_result = GST_FLOW_FLUSHING;
3373   GST_ASYNC_SIGNAL (basesrc);
3374   GST_OBJECT_UNLOCK (basesrc);
3375
3376   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
3377   if (bclass->stop)
3378     result = bclass->stop (basesrc);
3379
3380   gst_base_src_set_allocation (basesrc, NULL, NULL, NULL);
3381
3382   return result;
3383
3384 was_stopped:
3385   {
3386     GST_DEBUG_OBJECT (basesrc, "was stopped");
3387     GST_OBJECT_UNLOCK (basesrc);
3388     return TRUE;
3389   }
3390 }
3391
3392 /* start or stop flushing dataprocessing
3393  */
3394 static gboolean
3395 gst_base_src_set_flushing (GstBaseSrc * basesrc,
3396     gboolean flushing, gboolean live_play, gboolean * playing)
3397 {
3398   GstBaseSrcClass *bclass;
3399
3400   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
3401
3402   GST_DEBUG_OBJECT (basesrc, "flushing %d, live_play %d", flushing, live_play);
3403
3404   if (flushing) {
3405     gst_base_src_activate_pool (basesrc, FALSE);
3406     /* unlock any subclasses, we need to do this before grabbing the
3407      * LIVE_LOCK since we hold this lock before going into ::create. We pass an
3408      * unlock to the params because of backwards compat (see seek handler)*/
3409     if (bclass->unlock)
3410       bclass->unlock (basesrc);
3411   }
3412
3413   /* the live lock is released when we are blocked, waiting for playing or
3414    * when we sync to the clock. */
3415   GST_LIVE_LOCK (basesrc);
3416   if (playing)
3417     *playing = basesrc->live_running;
3418   basesrc->priv->flushing = flushing;
3419   if (flushing) {
3420     /* if we are locked in the live lock, signal it to make it flush */
3421     basesrc->live_running = TRUE;
3422
3423     /* clear pending EOS if any */
3424     g_atomic_int_set (&basesrc->priv->pending_eos, FALSE);
3425
3426     /* step 1, now that we have the LIVE lock, clear our unlock request */
3427     if (bclass->unlock_stop)
3428       bclass->unlock_stop (basesrc);
3429
3430     /* step 2, unblock clock sync (if any) or any other blocking thing */
3431     if (basesrc->clock_id)
3432       gst_clock_id_unschedule (basesrc->clock_id);
3433   } else {
3434     /* signal the live source that it can start playing */
3435     basesrc->live_running = live_play;
3436
3437     gst_base_src_activate_pool (basesrc, TRUE);
3438
3439     /* Drop all delayed events */
3440     GST_OBJECT_LOCK (basesrc);
3441     if (basesrc->priv->pending_events) {
3442       g_list_foreach (basesrc->priv->pending_events, (GFunc) gst_event_unref,
3443           NULL);
3444       g_list_free (basesrc->priv->pending_events);
3445       basesrc->priv->pending_events = NULL;
3446       g_atomic_int_set (&basesrc->priv->have_events, FALSE);
3447     }
3448     GST_OBJECT_UNLOCK (basesrc);
3449   }
3450   GST_LIVE_SIGNAL (basesrc);
3451   GST_LIVE_UNLOCK (basesrc);
3452
3453   return TRUE;
3454 }
3455
3456 /* the purpose of this function is to make sure that a live source blocks in the
3457  * LIVE lock or leaves the LIVE lock and continues playing. */
3458 static gboolean
3459 gst_base_src_set_playing (GstBaseSrc * basesrc, gboolean live_play)
3460 {
3461   GstBaseSrcClass *bclass;
3462
3463   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
3464
3465   /* unlock subclasses locked in ::create, we only do this when we stop playing. */
3466   if (!live_play) {
3467     GST_DEBUG_OBJECT (basesrc, "unlock");
3468     if (bclass->unlock)
3469       bclass->unlock (basesrc);
3470   }
3471
3472   /* we are now able to grab the LIVE lock, when we get it, we can be
3473    * waiting for PLAYING while blocked in the LIVE cond or we can be waiting
3474    * for the clock. */
3475   GST_LIVE_LOCK (basesrc);
3476   GST_DEBUG_OBJECT (basesrc, "unschedule clock");
3477
3478   /* unblock clock sync (if any) */
3479   if (basesrc->clock_id)
3480     gst_clock_id_unschedule (basesrc->clock_id);
3481
3482   /* configure what to do when we get to the LIVE lock. */
3483   GST_DEBUG_OBJECT (basesrc, "live running %d", live_play);
3484   basesrc->live_running = live_play;
3485
3486   if (live_play) {
3487     gboolean start;
3488
3489     /* clear our unlock request when going to PLAYING */
3490     GST_DEBUG_OBJECT (basesrc, "unlock stop");
3491     if (bclass->unlock_stop)
3492       bclass->unlock_stop (basesrc);
3493
3494     /* for live sources we restart the timestamp correction */
3495     basesrc->priv->latency = -1;
3496     /* have to restart the task in case it stopped because of the unlock when
3497      * we went to PAUSED. Only do this if we operating in push mode. */
3498     GST_OBJECT_LOCK (basesrc->srcpad);
3499     start = (GST_PAD_MODE (basesrc->srcpad) == GST_PAD_MODE_PUSH);
3500     GST_OBJECT_UNLOCK (basesrc->srcpad);
3501     if (start)
3502       gst_pad_start_task (basesrc->srcpad, (GstTaskFunction) gst_base_src_loop,
3503           basesrc->srcpad, NULL);
3504     GST_DEBUG_OBJECT (basesrc, "signal");
3505     GST_LIVE_SIGNAL (basesrc);
3506   }
3507   GST_LIVE_UNLOCK (basesrc);
3508
3509   return TRUE;
3510 }
3511
3512 static gboolean
3513 gst_base_src_activate_push (GstPad * pad, GstObject * parent, gboolean active)
3514 {
3515   GstBaseSrc *basesrc;
3516
3517   basesrc = GST_BASE_SRC (parent);
3518
3519   /* prepare subclass first */
3520   if (active) {
3521     GST_DEBUG_OBJECT (basesrc, "Activating in push mode");
3522
3523     if (G_UNLIKELY (!basesrc->can_activate_push))
3524       goto no_push_activation;
3525
3526     if (G_UNLIKELY (!gst_base_src_start (basesrc)))
3527       goto error_start;
3528   } else {
3529     GST_DEBUG_OBJECT (basesrc, "Deactivating in push mode");
3530     /* now we can stop the source */
3531     if (G_UNLIKELY (!gst_base_src_stop (basesrc)))
3532       goto error_stop;
3533   }
3534   return TRUE;
3535
3536   /* ERRORS */
3537 no_push_activation:
3538   {
3539     GST_WARNING_OBJECT (basesrc, "Subclass disabled push-mode activation");
3540     return FALSE;
3541   }
3542 error_start:
3543   {
3544     GST_WARNING_OBJECT (basesrc, "Failed to start in push mode");
3545     return FALSE;
3546   }
3547 error_stop:
3548   {
3549     GST_DEBUG_OBJECT (basesrc, "Failed to stop in push mode");
3550     return FALSE;
3551   }
3552 }
3553
3554 static gboolean
3555 gst_base_src_activate_pull (GstPad * pad, GstObject * parent, gboolean active)
3556 {
3557   GstBaseSrc *basesrc;
3558
3559   basesrc = GST_BASE_SRC (parent);
3560
3561   /* prepare subclass first */
3562   if (active) {
3563     GST_DEBUG_OBJECT (basesrc, "Activating in pull mode");
3564     if (G_UNLIKELY (!gst_base_src_start (basesrc)))
3565       goto error_start;
3566   } else {
3567     GST_DEBUG_OBJECT (basesrc, "Deactivating in pull mode");
3568     if (G_UNLIKELY (!gst_base_src_stop (basesrc)))
3569       goto error_stop;
3570   }
3571   return TRUE;
3572
3573   /* ERRORS */
3574 error_start:
3575   {
3576     GST_ERROR_OBJECT (basesrc, "Failed to start in pull mode");
3577     return FALSE;
3578   }
3579 error_stop:
3580   {
3581     GST_ERROR_OBJECT (basesrc, "Failed to stop in pull mode");
3582     return FALSE;
3583   }
3584 }
3585
3586 static gboolean
3587 gst_base_src_activate_mode (GstPad * pad, GstObject * parent,
3588     GstPadMode mode, gboolean active)
3589 {
3590   gboolean res;
3591   GstBaseSrc *src = GST_BASE_SRC (parent);
3592
3593   src->priv->stream_start_pending = FALSE;
3594
3595   switch (mode) {
3596     case GST_PAD_MODE_PULL:
3597       res = gst_base_src_activate_pull (pad, parent, active);
3598       break;
3599     case GST_PAD_MODE_PUSH:
3600       src->priv->stream_start_pending = active;
3601       res = gst_base_src_activate_push (pad, parent, active);
3602       break;
3603     default:
3604       GST_LOG_OBJECT (pad, "unknown activation mode %d", mode);
3605       res = FALSE;
3606       break;
3607   }
3608   return res;
3609 }
3610
3611
3612 static GstStateChangeReturn
3613 gst_base_src_change_state (GstElement * element, GstStateChange transition)
3614 {
3615   GstBaseSrc *basesrc;
3616   GstStateChangeReturn result;
3617   gboolean no_preroll = FALSE;
3618
3619   basesrc = GST_BASE_SRC (element);
3620
3621   switch (transition) {
3622     case GST_STATE_CHANGE_NULL_TO_READY:
3623       break;
3624     case GST_STATE_CHANGE_READY_TO_PAUSED:
3625       no_preroll = gst_base_src_is_live (basesrc);
3626       break;
3627     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
3628       GST_DEBUG_OBJECT (basesrc, "PAUSED->PLAYING");
3629       if (gst_base_src_is_live (basesrc)) {
3630         /* now we can start playback */
3631         gst_base_src_set_playing (basesrc, TRUE);
3632       }
3633       break;
3634     default:
3635       break;
3636   }
3637
3638   if ((result =
3639           GST_ELEMENT_CLASS (parent_class)->change_state (element,
3640               transition)) == GST_STATE_CHANGE_FAILURE)
3641     goto failure;
3642
3643   switch (transition) {
3644     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
3645       GST_DEBUG_OBJECT (basesrc, "PLAYING->PAUSED");
3646       if (gst_base_src_is_live (basesrc)) {
3647         /* make sure we block in the live lock in PAUSED */
3648         gst_base_src_set_playing (basesrc, FALSE);
3649         no_preroll = TRUE;
3650       }
3651       break;
3652     case GST_STATE_CHANGE_PAUSED_TO_READY:
3653     {
3654       /* we don't need to unblock anything here, the pad deactivation code
3655        * already did this */
3656       g_atomic_int_set (&basesrc->priv->pending_eos, FALSE);
3657       gst_event_replace (&basesrc->pending_seek, NULL);
3658       break;
3659     }
3660     case GST_STATE_CHANGE_READY_TO_NULL:
3661       break;
3662     default:
3663       break;
3664   }
3665
3666   if (no_preroll && result == GST_STATE_CHANGE_SUCCESS)
3667     result = GST_STATE_CHANGE_NO_PREROLL;
3668
3669   return result;
3670
3671   /* ERRORS */
3672 failure:
3673   {
3674     GST_DEBUG_OBJECT (basesrc, "parent failed state change");
3675     return result;
3676   }
3677 }
3678
3679 /**
3680  * gst_base_src_get_buffer_pool:
3681  * @src: a #GstBaseSrc
3682  *
3683  * Returns: (transfer full): the instance of the #GstBufferPool used
3684  * by the src; free it after use it
3685  */
3686 GstBufferPool *
3687 gst_base_src_get_buffer_pool (GstBaseSrc * src)
3688 {
3689   g_return_val_if_fail (GST_IS_BASE_SRC (src), NULL);
3690
3691   if (src->priv->pool)
3692     return gst_object_ref (src->priv->pool);
3693
3694   return NULL;
3695 }
3696
3697 /**
3698  * gst_base_src_get_allocator:
3699  * @src: a #GstBaseSrc
3700  * @allocator: (out) (allow-none) (transfer full): the #GstAllocator
3701  * used
3702  * @params: (out) (allow-none) (transfer full): the
3703  * #GstAllocatorParams of @allocator
3704  *
3705  * Lets #GstBaseSrc sub-classes to know the memory @allocator
3706  * used by the base class and its @params.
3707  *
3708  * Unref the @allocator after use it.
3709  */
3710 void
3711 gst_base_src_get_allocator (GstBaseSrc * src,
3712     GstAllocator ** allocator, GstAllocationParams * params)
3713 {
3714   g_return_if_fail (GST_IS_BASE_SRC (src));
3715
3716   if (allocator)
3717     *allocator = src->priv->allocator ?
3718         gst_object_ref (src->priv->allocator) : NULL;
3719
3720   if (params)
3721     *params = src->priv->params;
3722 }