984542d8cace0cff1ad71d5e9addc218ff082465
[platform/upstream/gstreamer.git] / libs / gst / base / gstbasesrc.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *               2000,2005 Wim Taymans <wim@fluendo.com>
4  *
5  * gstbasesrc.c:
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20  * Boston, MA 02110-1301, USA.
21  */
22
23 /**
24  * SECTION:gstbasesrc
25  * @short_description: Base class for getrange based source elements
26  * @see_also: #GstPushSrc, #GstBaseTransform, #GstBaseSink
27  *
28  * This is a generice base class for source elements. The following
29  * types of sources are supported:
30  * <itemizedlist>
31  *   <listitem><para>random access sources like files</para></listitem>
32  *   <listitem><para>seekable sources</para></listitem>
33  *   <listitem><para>live sources</para></listitem>
34  * </itemizedlist>
35  *
36  * The source can be configured to operate in any #GstFormat with the
37  * gst_base_src_set_format() method. The currently set format determines
38  * the format of the internal #GstSegment and any #GST_EVENT_SEGMENT
39  * events. The default format for #GstBaseSrc is #GST_FORMAT_BYTES.
40  *
41  * #GstBaseSrc always supports push mode scheduling. If the following
42  * conditions are met, it also supports pull mode scheduling:
43  * <itemizedlist>
44  *   <listitem><para>The format is set to #GST_FORMAT_BYTES (default).</para>
45  *   </listitem>
46  *   <listitem><para>#GstBaseSrcClass.is_seekable() returns %TRUE.</para>
47  *   </listitem>
48  * </itemizedlist>
49  *
50  * If all the conditions are met for operating in pull mode, #GstBaseSrc is
51  * automatically seekable in push mode as well. The following conditions must
52  * be met to make the element seekable in push mode when the format is not
53  * #GST_FORMAT_BYTES:
54  * <itemizedlist>
55  *   <listitem><para>
56  *     #GstBaseSrcClass.is_seekable() returns %TRUE.
57  *   </para></listitem>
58  *   <listitem><para>
59  *     #GstBaseSrcClass.query() can convert all supported seek formats to the
60  *     internal format as set with gst_base_src_set_format().
61  *   </para></listitem>
62  *   <listitem><para>
63  *     #GstBaseSrcClass.do_seek() is implemented, performs the seek and returns
64  *      %TRUE.
65  *   </para></listitem>
66  * </itemizedlist>
67  *
68  * When the element does not meet the requirements to operate in pull mode, the
69  * offset and length in the #GstBaseSrcClass.create() method should be ignored.
70  * It is recommended to subclass #GstPushSrc instead, in this situation. If the
71  * element can operate in pull mode but only with specific offsets and
72  * lengths, it is allowed to generate an error when the wrong values are passed
73  * to the #GstBaseSrcClass.create() function.
74  *
75  * #GstBaseSrc has support for live sources. Live sources are sources that when
76  * paused discard data, such as audio or video capture devices. A typical live
77  * source also produces data at a fixed rate and thus provides a clock to publish
78  * this rate.
79  * Use gst_base_src_set_live() to activate the live source mode.
80  *
81  * A live source does not produce data in the PAUSED state. This means that the
82  * #GstBaseSrcClass.create() method will not be called in PAUSED but only in
83  * PLAYING. To signal the pipeline that the element will not produce data, the
84  * return value from the READY to PAUSED state will be
85  * #GST_STATE_CHANGE_NO_PREROLL.
86  *
87  * A typical live source will timestamp the buffers it creates with the
88  * current running time of the pipeline. This is one reason why a live source
89  * can only produce data in the PLAYING state, when the clock is actually
90  * distributed and running.
91  *
92  * Live sources that synchronize and block on the clock (an audio source, for
93  * example) can use gst_base_src_wait_playing() when the
94  * #GstBaseSrcClass.create() function was interrupted by a state change to
95  * PAUSED.
96  *
97  * The #GstBaseSrcClass.get_times() method can be used to implement pseudo-live
98  * sources. It only makes sense to implement the #GstBaseSrcClass.get_times()
99  * function if the source is a live source. The #GstBaseSrcClass.get_times()
100  * function should return timestamps starting from 0, as if it were a non-live
101  * source. The base class will make sure that the timestamps are transformed
102  * into the current running_time. The base source will then wait for the
103  * calculated running_time before pushing out the buffer.
104  *
105  * For live sources, the base class will by default report a latency of 0.
106  * For pseudo live sources, the base class will by default measure the difference
107  * between the first buffer timestamp and the start time of get_times and will
108  * report this value as the latency.
109  * Subclasses should override the query function when this behaviour is not
110  * acceptable.
111  *
112  * There is only support in #GstBaseSrc for exactly one source pad, which
113  * should be named "src". A source implementation (subclass of #GstBaseSrc)
114  * should install a pad template in its class_init function, like so:
115  * |[
116  * static void
117  * my_element_class_init (GstMyElementClass *klass)
118  * {
119  *   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
120  *   // srctemplate should be a #GstStaticPadTemplate with direction
121  *   // #GST_PAD_SRC and name "src"
122  *   gst_element_class_add_pad_template (gstelement_class,
123  *       gst_static_pad_template_get (&amp;srctemplate));
124  *
125  *   gst_element_class_set_static_metadata (gstelement_class,
126  *      "Source name",
127  *      "Source",
128  *      "My Source element",
129  *      "The author &lt;my.sink@my.email&gt;");
130  * }
131  * ]|
132  *
133  * <refsect2>
134  * <title>Controlled shutdown of live sources in applications</title>
135  * <para>
136  * Applications that record from a live source may want to stop recording
137  * in a controlled way, so that the recording is stopped, but the data
138  * already in the pipeline is processed to the end (remember that many live
139  * sources would go on recording forever otherwise). For that to happen the
140  * application needs to make the source stop recording and send an EOS
141  * event down the pipeline. The application would then wait for an
142  * EOS message posted on the pipeline's bus to know when all data has
143  * been processed and the pipeline can safely be stopped.
144  *
145  * An application may send an EOS event to a source element to make it
146  * perform the EOS logic (send EOS event downstream or post a
147  * #GST_MESSAGE_SEGMENT_DONE on the bus). This can typically be done
148  * with the gst_element_send_event() function on the element or its parent bin.
149  *
150  * After the EOS has been sent to the element, the application should wait for
151  * an EOS message to be posted on the pipeline's bus. Once this EOS message is
152  * received, it may safely shut down the entire pipeline.
153  *
154  * Last reviewed on 2007-12-19 (0.10.16)
155  * </para>
156  * </refsect2>
157  */
158
159 #ifdef HAVE_CONFIG_H
160 #  include "config.h"
161 #endif
162
163 #include <stdlib.h>
164 #include <string.h>
165
166 #include <gst/gst_private.h>
167 #include <gst/glib-compat-private.h>
168
169 #include "gstbasesrc.h"
170 #include "gsttypefindhelper.h"
171 #include <gst/gst-i18n-lib.h>
172
173 GST_DEBUG_CATEGORY_STATIC (gst_base_src_debug);
174 #define GST_CAT_DEFAULT gst_base_src_debug
175
176 #define GST_LIVE_GET_LOCK(elem)               (&GST_BASE_SRC_CAST(elem)->live_lock)
177 #define GST_LIVE_LOCK(elem)                   g_mutex_lock(GST_LIVE_GET_LOCK(elem))
178 #define GST_LIVE_TRYLOCK(elem)                g_mutex_trylock(GST_LIVE_GET_LOCK(elem))
179 #define GST_LIVE_UNLOCK(elem)                 g_mutex_unlock(GST_LIVE_GET_LOCK(elem))
180 #define GST_LIVE_GET_COND(elem)               (&GST_BASE_SRC_CAST(elem)->live_cond)
181 #define GST_LIVE_WAIT(elem)                   g_cond_wait (GST_LIVE_GET_COND (elem), GST_LIVE_GET_LOCK (elem))
182 #define GST_LIVE_WAIT_UNTIL(elem, end_time)   g_cond_timed_wait (GST_LIVE_GET_COND (elem), GST_LIVE_GET_LOCK (elem), end_time)
183 #define GST_LIVE_SIGNAL(elem)                 g_cond_signal (GST_LIVE_GET_COND (elem));
184 #define GST_LIVE_BROADCAST(elem)              g_cond_broadcast (GST_LIVE_GET_COND (elem));
185
186
187 #define GST_ASYNC_GET_COND(elem)              (&GST_BASE_SRC_CAST(elem)->priv->async_cond)
188 #define GST_ASYNC_WAIT(elem)                  g_cond_wait (GST_ASYNC_GET_COND (elem), GST_OBJECT_GET_LOCK (elem))
189 #define GST_ASYNC_SIGNAL(elem)                g_cond_signal (GST_ASYNC_GET_COND (elem));
190
191
192 /* BaseSrc signals and args */
193 enum
194 {
195   /* FILL ME */
196   LAST_SIGNAL
197 };
198
199 #define DEFAULT_BLOCKSIZE       4096
200 #define DEFAULT_NUM_BUFFERS     -1
201 #define DEFAULT_TYPEFIND        FALSE
202 #define DEFAULT_DO_TIMESTAMP    FALSE
203
204 enum
205 {
206   PROP_0,
207   PROP_BLOCKSIZE,
208   PROP_NUM_BUFFERS,
209   PROP_TYPEFIND,
210   PROP_DO_TIMESTAMP
211 };
212
213 #define GST_BASE_SRC_GET_PRIVATE(obj)  \
214    (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_BASE_SRC, GstBaseSrcPrivate))
215
216 struct _GstBaseSrcPrivate
217 {
218   gboolean discont;
219   gboolean flushing;
220
221   GstFlowReturn start_result;
222   gboolean async;
223
224   /* if a stream-start event should be sent */
225   gboolean stream_start_pending;
226
227   /* if segment should be sent */
228   gboolean segment_pending;
229
230   /* if EOS is pending (atomic) */
231   gint pending_eos;
232
233   /* startup latency is the time it takes between going to PLAYING and producing
234    * the first BUFFER with running_time 0. This value is included in the latency
235    * reporting. */
236   GstClockTime latency;
237   /* timestamp offset, this is the offset add to the values of gst_times for
238    * pseudo live sources */
239   GstClockTimeDiff ts_offset;
240
241   gboolean do_timestamp;
242   volatile gint dynamic_size;
243
244   /* stream sequence number */
245   guint32 seqnum;
246
247   /* pending events (TAG, CUSTOM_BOTH, CUSTOM_DOWNSTREAM) to be
248    * pushed in the data stream */
249   GList *pending_events;
250   volatile gint have_events;
251
252   /* QoS *//* with LOCK */
253   gboolean qos_enabled;
254   gdouble proportion;
255   GstClockTime earliest_time;
256
257   GstBufferPool *pool;
258   GstAllocator *allocator;
259   GstAllocationParams params;
260
261   GCond async_cond;
262 };
263
264 static GstElementClass *parent_class = NULL;
265
266 static void gst_base_src_class_init (GstBaseSrcClass * klass);
267 static void gst_base_src_init (GstBaseSrc * src, gpointer g_class);
268 static void gst_base_src_finalize (GObject * object);
269
270
271 GType
272 gst_base_src_get_type (void)
273 {
274   static volatile gsize base_src_type = 0;
275
276   if (g_once_init_enter (&base_src_type)) {
277     GType _type;
278     static const GTypeInfo base_src_info = {
279       sizeof (GstBaseSrcClass),
280       NULL,
281       NULL,
282       (GClassInitFunc) gst_base_src_class_init,
283       NULL,
284       NULL,
285       sizeof (GstBaseSrc),
286       0,
287       (GInstanceInitFunc) gst_base_src_init,
288     };
289
290     _type = g_type_register_static (GST_TYPE_ELEMENT,
291         "GstBaseSrc", &base_src_info, G_TYPE_FLAG_ABSTRACT);
292     g_once_init_leave (&base_src_type, _type);
293   }
294   return base_src_type;
295 }
296
297 static GstCaps *gst_base_src_default_get_caps (GstBaseSrc * bsrc,
298     GstCaps * filter);
299 static GstCaps *gst_base_src_default_fixate (GstBaseSrc * src, GstCaps * caps);
300 static GstCaps *gst_base_src_fixate (GstBaseSrc * src, GstCaps * caps);
301
302 static gboolean gst_base_src_is_random_access (GstBaseSrc * src);
303 static gboolean gst_base_src_activate_mode (GstPad * pad, GstObject * parent,
304     GstPadMode mode, gboolean active);
305 static void gst_base_src_set_property (GObject * object, guint prop_id,
306     const GValue * value, GParamSpec * pspec);
307 static void gst_base_src_get_property (GObject * object, guint prop_id,
308     GValue * value, GParamSpec * pspec);
309 static gboolean gst_base_src_event (GstPad * pad, GstObject * parent,
310     GstEvent * event);
311 static gboolean gst_base_src_send_event (GstElement * elem, GstEvent * event);
312 static gboolean gst_base_src_default_event (GstBaseSrc * src, GstEvent * event);
313
314 static gboolean gst_base_src_query (GstPad * pad, GstObject * parent,
315     GstQuery * query);
316
317 static gboolean gst_base_src_activate_pool (GstBaseSrc * basesrc,
318     gboolean active);
319 static gboolean gst_base_src_default_negotiate (GstBaseSrc * basesrc);
320 static gboolean gst_base_src_default_do_seek (GstBaseSrc * src,
321     GstSegment * segment);
322 static gboolean gst_base_src_default_query (GstBaseSrc * src, GstQuery * query);
323 static gboolean gst_base_src_default_prepare_seek_segment (GstBaseSrc * src,
324     GstEvent * event, GstSegment * segment);
325 static GstFlowReturn gst_base_src_default_create (GstBaseSrc * basesrc,
326     guint64 offset, guint size, GstBuffer ** buf);
327 static GstFlowReturn gst_base_src_default_alloc (GstBaseSrc * basesrc,
328     guint64 offset, guint size, GstBuffer ** buf);
329 static gboolean gst_base_src_decide_allocation_default (GstBaseSrc * basesrc,
330     GstQuery * query);
331
332 static gboolean gst_base_src_set_flushing (GstBaseSrc * basesrc,
333     gboolean flushing, gboolean live_play, gboolean * playing);
334
335 static gboolean gst_base_src_start (GstBaseSrc * basesrc);
336 static gboolean gst_base_src_stop (GstBaseSrc * basesrc);
337
338 static GstStateChangeReturn gst_base_src_change_state (GstElement * element,
339     GstStateChange transition);
340
341 static void gst_base_src_loop (GstPad * pad);
342 static GstFlowReturn gst_base_src_getrange (GstPad * pad, GstObject * parent,
343     guint64 offset, guint length, GstBuffer ** buf);
344 static GstFlowReturn gst_base_src_get_range (GstBaseSrc * src, guint64 offset,
345     guint length, GstBuffer ** buf);
346 static gboolean gst_base_src_seekable (GstBaseSrc * src);
347 static gboolean gst_base_src_negotiate (GstBaseSrc * basesrc);
348 static gboolean gst_base_src_update_length (GstBaseSrc * src, guint64 offset,
349     guint * length);
350
351 static void
352 gst_base_src_class_init (GstBaseSrcClass * klass)
353 {
354   GObjectClass *gobject_class;
355   GstElementClass *gstelement_class;
356
357   gobject_class = G_OBJECT_CLASS (klass);
358   gstelement_class = GST_ELEMENT_CLASS (klass);
359
360   GST_DEBUG_CATEGORY_INIT (gst_base_src_debug, "basesrc", 0, "basesrc element");
361
362   g_type_class_add_private (klass, sizeof (GstBaseSrcPrivate));
363
364   parent_class = g_type_class_peek_parent (klass);
365
366   gobject_class->finalize = gst_base_src_finalize;
367   gobject_class->set_property = gst_base_src_set_property;
368   gobject_class->get_property = gst_base_src_get_property;
369
370   g_object_class_install_property (gobject_class, PROP_BLOCKSIZE,
371       g_param_spec_uint ("blocksize", "Block size",
372           "Size in bytes to read per buffer (-1 = default)", 0, G_MAXUINT,
373           DEFAULT_BLOCKSIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
374   g_object_class_install_property (gobject_class, PROP_NUM_BUFFERS,
375       g_param_spec_int ("num-buffers", "num-buffers",
376           "Number of buffers to output before sending EOS (-1 = unlimited)",
377           -1, G_MAXINT, DEFAULT_NUM_BUFFERS, G_PARAM_READWRITE |
378           G_PARAM_STATIC_STRINGS));
379   g_object_class_install_property (gobject_class, PROP_TYPEFIND,
380       g_param_spec_boolean ("typefind", "Typefind",
381           "Run typefind before negotiating", DEFAULT_TYPEFIND,
382           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
383   g_object_class_install_property (gobject_class, PROP_DO_TIMESTAMP,
384       g_param_spec_boolean ("do-timestamp", "Do timestamp",
385           "Apply current stream time to buffers", DEFAULT_DO_TIMESTAMP,
386           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
387
388   gstelement_class->change_state =
389       GST_DEBUG_FUNCPTR (gst_base_src_change_state);
390   gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_base_src_send_event);
391
392   klass->get_caps = GST_DEBUG_FUNCPTR (gst_base_src_default_get_caps);
393   klass->negotiate = GST_DEBUG_FUNCPTR (gst_base_src_default_negotiate);
394   klass->fixate = GST_DEBUG_FUNCPTR (gst_base_src_default_fixate);
395   klass->prepare_seek_segment =
396       GST_DEBUG_FUNCPTR (gst_base_src_default_prepare_seek_segment);
397   klass->do_seek = GST_DEBUG_FUNCPTR (gst_base_src_default_do_seek);
398   klass->query = GST_DEBUG_FUNCPTR (gst_base_src_default_query);
399   klass->event = GST_DEBUG_FUNCPTR (gst_base_src_default_event);
400   klass->create = GST_DEBUG_FUNCPTR (gst_base_src_default_create);
401   klass->alloc = GST_DEBUG_FUNCPTR (gst_base_src_default_alloc);
402   klass->decide_allocation =
403       GST_DEBUG_FUNCPTR (gst_base_src_decide_allocation_default);
404
405   /* Registering debug symbols for function pointers */
406   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_activate_mode);
407   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_event);
408   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_query);
409   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_getrange);
410   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_fixate);
411 }
412
413 static void
414 gst_base_src_init (GstBaseSrc * basesrc, gpointer g_class)
415 {
416   GstPad *pad;
417   GstPadTemplate *pad_template;
418
419   basesrc->priv = GST_BASE_SRC_GET_PRIVATE (basesrc);
420
421   basesrc->is_live = FALSE;
422   g_mutex_init (&basesrc->live_lock);
423   g_cond_init (&basesrc->live_cond);
424   basesrc->num_buffers = DEFAULT_NUM_BUFFERS;
425   basesrc->num_buffers_left = -1;
426
427   basesrc->can_activate_push = TRUE;
428
429   pad_template =
430       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (g_class), "src");
431   g_return_if_fail (pad_template != NULL);
432
433   GST_DEBUG_OBJECT (basesrc, "creating src pad");
434   pad = gst_pad_new_from_template (pad_template, "src");
435
436   GST_DEBUG_OBJECT (basesrc, "setting functions on src pad");
437   gst_pad_set_activatemode_function (pad, gst_base_src_activate_mode);
438   gst_pad_set_event_function (pad, gst_base_src_event);
439   gst_pad_set_query_function (pad, gst_base_src_query);
440   gst_pad_set_getrange_function (pad, gst_base_src_getrange);
441
442   /* hold pointer to pad */
443   basesrc->srcpad = pad;
444   GST_DEBUG_OBJECT (basesrc, "adding src pad");
445   gst_element_add_pad (GST_ELEMENT (basesrc), pad);
446
447   basesrc->blocksize = DEFAULT_BLOCKSIZE;
448   basesrc->clock_id = NULL;
449   /* we operate in BYTES by default */
450   gst_base_src_set_format (basesrc, GST_FORMAT_BYTES);
451   basesrc->typefind = DEFAULT_TYPEFIND;
452   basesrc->priv->do_timestamp = DEFAULT_DO_TIMESTAMP;
453   g_atomic_int_set (&basesrc->priv->have_events, FALSE);
454
455   g_cond_init (&basesrc->priv->async_cond);
456   basesrc->priv->start_result = GST_FLOW_FLUSHING;
457   GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTED);
458   GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTING);
459   GST_OBJECT_FLAG_SET (basesrc, GST_ELEMENT_FLAG_SOURCE);
460
461   GST_DEBUG_OBJECT (basesrc, "init done");
462 }
463
464 static void
465 gst_base_src_finalize (GObject * object)
466 {
467   GstBaseSrc *basesrc;
468   GstEvent **event_p;
469
470   basesrc = GST_BASE_SRC (object);
471
472   g_mutex_clear (&basesrc->live_lock);
473   g_cond_clear (&basesrc->live_cond);
474   g_cond_clear (&basesrc->priv->async_cond);
475
476   event_p = &basesrc->pending_seek;
477   gst_event_replace (event_p, NULL);
478
479   if (basesrc->priv->pending_events) {
480     g_list_foreach (basesrc->priv->pending_events, (GFunc) gst_event_unref,
481         NULL);
482     g_list_free (basesrc->priv->pending_events);
483   }
484
485   G_OBJECT_CLASS (parent_class)->finalize (object);
486 }
487
488 /**
489  * gst_base_src_wait_playing:
490  * @src: the src
491  *
492  * If the #GstBaseSrcClass.create() method performs its own synchronisation
493  * against the clock it must unblock when going from PLAYING to the PAUSED state
494  * and call this method before continuing to produce the remaining data.
495  *
496  * This function will block until a state change to PLAYING happens (in which
497  * case this function returns #GST_FLOW_OK) or the processing must be stopped due
498  * to a state change to READY or a FLUSH event (in which case this function
499  * returns #GST_FLOW_FLUSHING).
500  *
501  * Returns: #GST_FLOW_OK if @src is PLAYING and processing can
502  * continue. Any other return value should be returned from the create vmethod.
503  */
504 GstFlowReturn
505 gst_base_src_wait_playing (GstBaseSrc * src)
506 {
507   g_return_val_if_fail (GST_IS_BASE_SRC (src), GST_FLOW_ERROR);
508
509   do {
510     /* block until the state changes, or we get a flush, or something */
511     GST_DEBUG_OBJECT (src, "live source waiting for running state");
512     GST_LIVE_WAIT (src);
513     GST_DEBUG_OBJECT (src, "live source unlocked");
514     if (src->priv->flushing)
515       goto flushing;
516   } while (G_UNLIKELY (!src->live_running));
517
518   return GST_FLOW_OK;
519
520   /* ERRORS */
521 flushing:
522   {
523     GST_DEBUG_OBJECT (src, "we are flushing");
524     return GST_FLOW_FLUSHING;
525   }
526 }
527
528 /**
529  * gst_base_src_set_live:
530  * @src: base source instance
531  * @live: new live-mode
532  *
533  * If the element listens to a live source, @live should
534  * be set to %TRUE.
535  *
536  * A live source will not produce data in the PAUSED state and
537  * will therefore not be able to participate in the PREROLL phase
538  * of a pipeline. To signal this fact to the application and the
539  * pipeline, the state change return value of the live source will
540  * be GST_STATE_CHANGE_NO_PREROLL.
541  */
542 void
543 gst_base_src_set_live (GstBaseSrc * src, gboolean live)
544 {
545   g_return_if_fail (GST_IS_BASE_SRC (src));
546
547   GST_OBJECT_LOCK (src);
548   src->is_live = live;
549   GST_OBJECT_UNLOCK (src);
550 }
551
552 /**
553  * gst_base_src_is_live:
554  * @src: base source instance
555  *
556  * Check if an element is in live mode.
557  *
558  * Returns: %TRUE if element is in live mode.
559  */
560 gboolean
561 gst_base_src_is_live (GstBaseSrc * src)
562 {
563   gboolean result;
564
565   g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
566
567   GST_OBJECT_LOCK (src);
568   result = src->is_live;
569   GST_OBJECT_UNLOCK (src);
570
571   return result;
572 }
573
574 /**
575  * gst_base_src_set_format:
576  * @src: base source instance
577  * @format: the format to use
578  *
579  * Sets the default format of the source. This will be the format used
580  * for sending SEGMENT events and for performing seeks.
581  *
582  * If a format of GST_FORMAT_BYTES is set, the element will be able to
583  * operate in pull mode if the #GstBaseSrcClass.is_seekable() returns TRUE.
584  *
585  * This function must only be called in states < %GST_STATE_PAUSED.
586  */
587 void
588 gst_base_src_set_format (GstBaseSrc * src, GstFormat format)
589 {
590   g_return_if_fail (GST_IS_BASE_SRC (src));
591   g_return_if_fail (GST_STATE (src) <= GST_STATE_READY);
592
593   GST_OBJECT_LOCK (src);
594   gst_segment_init (&src->segment, format);
595   GST_OBJECT_UNLOCK (src);
596 }
597
598 /**
599  * gst_base_src_set_dynamic_size:
600  * @src: base source instance
601  * @dynamic: new dynamic size mode
602  *
603  * If not @dynamic, size is only updated when needed, such as when trying to
604  * read past current tracked size.  Otherwise, size is checked for upon each
605  * read.
606  */
607 void
608 gst_base_src_set_dynamic_size (GstBaseSrc * src, gboolean dynamic)
609 {
610   g_return_if_fail (GST_IS_BASE_SRC (src));
611
612   g_atomic_int_set (&src->priv->dynamic_size, dynamic);
613 }
614
615 /**
616  * gst_base_src_set_async:
617  * @src: base source instance
618  * @async: new async mode
619  *
620  * Configure async behaviour in @src, no state change will block. The open,
621  * close, start, stop, play and pause virtual methods will be executed in a
622  * different thread and are thus allowed to perform blocking operations. Any
623  * blocking operation should be unblocked with the unlock vmethod.
624  */
625 void
626 gst_base_src_set_async (GstBaseSrc * src, gboolean async)
627 {
628   g_return_if_fail (GST_IS_BASE_SRC (src));
629
630   GST_OBJECT_LOCK (src);
631   src->priv->async = async;
632   GST_OBJECT_UNLOCK (src);
633 }
634
635 /**
636  * gst_base_src_is_async:
637  * @src: base source instance
638  *
639  * Get the current async behaviour of @src. See also gst_base_src_set_async().
640  *
641  * Returns: %TRUE if @src is operating in async mode.
642  */
643 gboolean
644 gst_base_src_is_async (GstBaseSrc * src)
645 {
646   gboolean res;
647
648   g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
649
650   GST_OBJECT_LOCK (src);
651   res = src->priv->async;
652   GST_OBJECT_UNLOCK (src);
653
654   return res;
655 }
656
657
658 /**
659  * gst_base_src_query_latency:
660  * @src: the source
661  * @live: (out) (allow-none): if the source is live
662  * @min_latency: (out) (allow-none): the min latency of the source
663  * @max_latency: (out) (allow-none): the max latency of the source
664  *
665  * Query the source for the latency parameters. @live will be TRUE when @src is
666  * configured as a live source. @min_latency will be set to the difference
667  * between the running time and the timestamp of the first buffer.
668  * @max_latency is always the undefined value of -1.
669  *
670  * This function is mostly used by subclasses.
671  *
672  * Returns: TRUE if the query succeeded.
673  */
674 gboolean
675 gst_base_src_query_latency (GstBaseSrc * src, gboolean * live,
676     GstClockTime * min_latency, GstClockTime * max_latency)
677 {
678   GstClockTime min;
679
680   g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
681
682   GST_OBJECT_LOCK (src);
683   if (live)
684     *live = src->is_live;
685
686   /* if we have a startup latency, report this one, else report 0. Subclasses
687    * are supposed to override the query function if they want something
688    * else. */
689   if (src->priv->latency != -1)
690     min = src->priv->latency;
691   else
692     min = 0;
693
694   if (min_latency)
695     *min_latency = min;
696   if (max_latency)
697     *max_latency = -1;
698
699   GST_LOG_OBJECT (src, "latency: live %d, min %" GST_TIME_FORMAT
700       ", max %" GST_TIME_FORMAT, src->is_live, GST_TIME_ARGS (min),
701       GST_TIME_ARGS (-1));
702   GST_OBJECT_UNLOCK (src);
703
704   return TRUE;
705 }
706
707 /**
708  * gst_base_src_set_blocksize:
709  * @src: the source
710  * @blocksize: the new blocksize in bytes
711  *
712  * Set the number of bytes that @src will push out with each buffer. When
713  * @blocksize is set to -1, a default length will be used.
714  */
715 void
716 gst_base_src_set_blocksize (GstBaseSrc * src, guint blocksize)
717 {
718   g_return_if_fail (GST_IS_BASE_SRC (src));
719
720   GST_OBJECT_LOCK (src);
721   src->blocksize = blocksize;
722   GST_OBJECT_UNLOCK (src);
723 }
724
725 /**
726  * gst_base_src_get_blocksize:
727  * @src: the source
728  *
729  * Get the number of bytes that @src will push out with each buffer.
730  *
731  * Returns: the number of bytes pushed with each buffer.
732  */
733 guint
734 gst_base_src_get_blocksize (GstBaseSrc * src)
735 {
736   gint res;
737
738   g_return_val_if_fail (GST_IS_BASE_SRC (src), 0);
739
740   GST_OBJECT_LOCK (src);
741   res = src->blocksize;
742   GST_OBJECT_UNLOCK (src);
743
744   return res;
745 }
746
747
748 /**
749  * gst_base_src_set_do_timestamp:
750  * @src: the source
751  * @timestamp: enable or disable timestamping
752  *
753  * Configure @src to automatically timestamp outgoing buffers based on the
754  * current running_time of the pipeline. This property is mostly useful for live
755  * sources.
756  */
757 void
758 gst_base_src_set_do_timestamp (GstBaseSrc * src, gboolean timestamp)
759 {
760   g_return_if_fail (GST_IS_BASE_SRC (src));
761
762   GST_OBJECT_LOCK (src);
763   src->priv->do_timestamp = timestamp;
764   GST_OBJECT_UNLOCK (src);
765 }
766
767 /**
768  * gst_base_src_get_do_timestamp:
769  * @src: the source
770  *
771  * Query if @src timestamps outgoing buffers based on the current running_time.
772  *
773  * Returns: %TRUE if the base class will automatically timestamp outgoing buffers.
774  */
775 gboolean
776 gst_base_src_get_do_timestamp (GstBaseSrc * src)
777 {
778   gboolean res;
779
780   g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
781
782   GST_OBJECT_LOCK (src);
783   res = src->priv->do_timestamp;
784   GST_OBJECT_UNLOCK (src);
785
786   return res;
787 }
788
789 /**
790  * gst_base_src_new_seamless_segment:
791  * @src: The source
792  * @start: The new start value for the segment
793  * @stop: Stop value for the new segment
794  * @time: The new time value for the start of the new segent
795  *
796  * Prepare a new seamless segment for emission downstream. This function must
797  * only be called by derived sub-classes, and only from the create() function,
798  * as the stream-lock needs to be held.
799  *
800  * The format for the new segment will be the current format of the source, as
801  * configured with gst_base_src_set_format()
802  *
803  * Returns: %TRUE if preparation of the seamless segment succeeded.
804  */
805 gboolean
806 gst_base_src_new_seamless_segment (GstBaseSrc * src, gint64 start, gint64 stop,
807     gint64 time)
808 {
809   gboolean res = TRUE;
810
811   GST_OBJECT_LOCK (src);
812
813   src->segment.base = gst_segment_to_running_time (&src->segment,
814       src->segment.format, src->segment.position);
815   src->segment.position = src->segment.start = start;
816   src->segment.stop = stop;
817   src->segment.time = time;
818
819   /* Mark pending segment. Will be sent before next data */
820   src->priv->segment_pending = TRUE;
821
822   GST_DEBUG_OBJECT (src,
823       "Starting new seamless segment. Start %" GST_TIME_FORMAT " stop %"
824       GST_TIME_FORMAT " time %" GST_TIME_FORMAT " base %" GST_TIME_FORMAT,
825       GST_TIME_ARGS (start), GST_TIME_ARGS (stop), GST_TIME_ARGS (time),
826       GST_TIME_ARGS (src->segment.base));
827
828   GST_OBJECT_UNLOCK (src);
829
830   src->priv->discont = TRUE;
831   src->running = TRUE;
832
833   return res;
834 }
835
836 static gboolean
837 gst_base_src_send_stream_start (GstBaseSrc * src)
838 {
839   gboolean ret = TRUE;
840
841   if (src->priv->stream_start_pending) {
842     gchar *stream_id;
843
844     stream_id =
845         gst_pad_create_stream_id (src->srcpad, GST_ELEMENT_CAST (src), NULL);
846
847     GST_DEBUG_OBJECT (src, "Pushing STREAM_START");
848     ret =
849         gst_pad_push_event (src->srcpad,
850         gst_event_new_stream_start (stream_id));
851     src->priv->stream_start_pending = FALSE;
852     g_free (stream_id);
853   }
854
855   return ret;
856 }
857
858 /**
859  * gst_base_src_set_caps:
860  * @src: a #GstBaseSrc
861  * @caps: a #GstCaps
862  *
863  * Set new caps on the basesrc source pad.
864  *
865  * Returns: %TRUE if the caps could be set
866  */
867 gboolean
868 gst_base_src_set_caps (GstBaseSrc * src, GstCaps * caps)
869 {
870   GstBaseSrcClass *bclass;
871   gboolean res = TRUE;
872
873   bclass = GST_BASE_SRC_GET_CLASS (src);
874
875   gst_base_src_send_stream_start (src);
876
877   if (bclass->set_caps)
878     res = bclass->set_caps (src, caps);
879
880   if (res)
881     res = gst_pad_push_event (src->srcpad, gst_event_new_caps (caps));
882
883   return res;
884 }
885
886 static GstCaps *
887 gst_base_src_default_get_caps (GstBaseSrc * bsrc, GstCaps * filter)
888 {
889   GstCaps *caps = NULL;
890   GstPadTemplate *pad_template;
891   GstBaseSrcClass *bclass;
892
893   bclass = GST_BASE_SRC_GET_CLASS (bsrc);
894
895   pad_template =
896       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (bclass), "src");
897
898   if (pad_template != NULL) {
899     caps = gst_pad_template_get_caps (pad_template);
900
901     if (filter) {
902       GstCaps *intersection;
903
904       intersection =
905           gst_caps_intersect_full (filter, caps, GST_CAPS_INTERSECT_FIRST);
906       gst_caps_unref (caps);
907       caps = intersection;
908     }
909   }
910   return caps;
911 }
912
913 static GstCaps *
914 gst_base_src_default_fixate (GstBaseSrc * bsrc, GstCaps * caps)
915 {
916   GST_DEBUG_OBJECT (bsrc, "using default caps fixate function");
917   return gst_caps_fixate (caps);
918 }
919
920 static GstCaps *
921 gst_base_src_fixate (GstBaseSrc * bsrc, GstCaps * caps)
922 {
923   GstBaseSrcClass *bclass;
924
925   bclass = GST_BASE_SRC_GET_CLASS (bsrc);
926
927   if (bclass->fixate)
928     caps = bclass->fixate (bsrc, caps);
929
930   return caps;
931 }
932
933 static gboolean
934 gst_base_src_default_query (GstBaseSrc * src, GstQuery * query)
935 {
936   gboolean res;
937
938   switch (GST_QUERY_TYPE (query)) {
939     case GST_QUERY_POSITION:
940     {
941       GstFormat format;
942
943       gst_query_parse_position (query, &format, NULL);
944
945       GST_DEBUG_OBJECT (src, "position query in format %s",
946           gst_format_get_name (format));
947
948       switch (format) {
949         case GST_FORMAT_PERCENT:
950         {
951           gint64 percent;
952           gint64 position;
953           gint64 duration;
954
955           GST_OBJECT_LOCK (src);
956           position = src->segment.position;
957           duration = src->segment.duration;
958           GST_OBJECT_UNLOCK (src);
959
960           if (position != -1 && duration != -1) {
961             if (position < duration)
962               percent = gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX, position,
963                   duration);
964             else
965               percent = GST_FORMAT_PERCENT_MAX;
966           } else
967             percent = -1;
968
969           gst_query_set_position (query, GST_FORMAT_PERCENT, percent);
970           res = TRUE;
971           break;
972         }
973         default:
974         {
975           gint64 position;
976           GstFormat seg_format;
977
978           GST_OBJECT_LOCK (src);
979           position =
980               gst_segment_to_stream_time (&src->segment, src->segment.format,
981               src->segment.position);
982           seg_format = src->segment.format;
983           GST_OBJECT_UNLOCK (src);
984
985           if (position != -1) {
986             /* convert to requested format */
987             res =
988                 gst_pad_query_convert (src->srcpad, seg_format,
989                 position, format, &position);
990           } else
991             res = TRUE;
992
993           gst_query_set_position (query, format, position);
994           break;
995         }
996       }
997       break;
998     }
999     case GST_QUERY_DURATION:
1000     {
1001       GstFormat format;
1002
1003       gst_query_parse_duration (query, &format, NULL);
1004
1005       GST_DEBUG_OBJECT (src, "duration query in format %s",
1006           gst_format_get_name (format));
1007
1008       switch (format) {
1009         case GST_FORMAT_PERCENT:
1010           gst_query_set_duration (query, GST_FORMAT_PERCENT,
1011               GST_FORMAT_PERCENT_MAX);
1012           res = TRUE;
1013           break;
1014         default:
1015         {
1016           gint64 duration;
1017           GstFormat seg_format;
1018           guint length = 0;
1019
1020           /* may have to refresh duration */
1021           if (g_atomic_int_get (&src->priv->dynamic_size))
1022             gst_base_src_update_length (src, 0, &length);
1023
1024           /* this is the duration as configured by the subclass. */
1025           GST_OBJECT_LOCK (src);
1026           duration = src->segment.duration;
1027           seg_format = src->segment.format;
1028           GST_OBJECT_UNLOCK (src);
1029
1030           GST_LOG_OBJECT (src, "duration %" G_GINT64_FORMAT ", format %s",
1031               duration, gst_format_get_name (seg_format));
1032
1033           if (duration != -1) {
1034             /* convert to requested format, if this fails, we have a duration
1035              * but we cannot answer the query, we must return FALSE. */
1036             res =
1037                 gst_pad_query_convert (src->srcpad, seg_format,
1038                 duration, format, &duration);
1039           } else {
1040             /* The subclass did not configure a duration, we assume that the
1041              * media has an unknown duration then and we return TRUE to report
1042              * this. Note that this is not the same as returning FALSE, which
1043              * means that we cannot report the duration at all. */
1044             res = TRUE;
1045           }
1046           gst_query_set_duration (query, format, duration);
1047           break;
1048         }
1049       }
1050       break;
1051     }
1052
1053     case GST_QUERY_SEEKING:
1054     {
1055       GstFormat format, seg_format;
1056       gint64 duration;
1057
1058       GST_OBJECT_LOCK (src);
1059       duration = src->segment.duration;
1060       seg_format = src->segment.format;
1061       GST_OBJECT_UNLOCK (src);
1062
1063       gst_query_parse_seeking (query, &format, NULL, NULL, NULL);
1064       if (format == seg_format) {
1065         gst_query_set_seeking (query, seg_format,
1066             gst_base_src_seekable (src), 0, duration);
1067         res = TRUE;
1068       } else {
1069         /* FIXME 0.11: return TRUE + seekable=FALSE for SEEKING query here */
1070         /* Don't reply to the query to make up for demuxers which don't
1071          * handle the SEEKING query yet. Players like Totem will fall back
1072          * to the duration when the SEEKING query isn't answered. */
1073         res = FALSE;
1074       }
1075       break;
1076     }
1077     case GST_QUERY_SEGMENT:
1078     {
1079       gint64 start, stop;
1080
1081       GST_OBJECT_LOCK (src);
1082       /* no end segment configured, current duration then */
1083       if ((stop = src->segment.stop) == -1)
1084         stop = src->segment.duration;
1085       start = src->segment.start;
1086
1087       /* adjust to stream time */
1088       if (src->segment.time != -1) {
1089         start -= src->segment.time;
1090         if (stop != -1)
1091           stop -= src->segment.time;
1092       }
1093
1094       gst_query_set_segment (query, src->segment.rate, src->segment.format,
1095           start, stop);
1096       GST_OBJECT_UNLOCK (src);
1097       res = TRUE;
1098       break;
1099     }
1100
1101     case GST_QUERY_FORMATS:
1102     {
1103       gst_query_set_formats (query, 3, GST_FORMAT_DEFAULT,
1104           GST_FORMAT_BYTES, GST_FORMAT_PERCENT);
1105       res = TRUE;
1106       break;
1107     }
1108     case GST_QUERY_CONVERT:
1109     {
1110       GstFormat src_fmt, dest_fmt;
1111       gint64 src_val, dest_val;
1112
1113       gst_query_parse_convert (query, &src_fmt, &src_val, &dest_fmt, &dest_val);
1114
1115       /* we can only convert between equal formats... */
1116       if (src_fmt == dest_fmt) {
1117         dest_val = src_val;
1118         res = TRUE;
1119       } else
1120         res = FALSE;
1121
1122       gst_query_set_convert (query, src_fmt, src_val, dest_fmt, dest_val);
1123       break;
1124     }
1125     case GST_QUERY_LATENCY:
1126     {
1127       GstClockTime min, max;
1128       gboolean live;
1129
1130       /* Subclasses should override and implement something useful */
1131       res = gst_base_src_query_latency (src, &live, &min, &max);
1132
1133       GST_LOG_OBJECT (src, "report latency: live %d, min %" GST_TIME_FORMAT
1134           ", max %" GST_TIME_FORMAT, live, GST_TIME_ARGS (min),
1135           GST_TIME_ARGS (max));
1136
1137       gst_query_set_latency (query, live, min, max);
1138       break;
1139     }
1140     case GST_QUERY_JITTER:
1141     case GST_QUERY_RATE:
1142       res = FALSE;
1143       break;
1144     case GST_QUERY_BUFFERING:
1145     {
1146       GstFormat format, seg_format;
1147       gint64 start, stop, estimated;
1148
1149       gst_query_parse_buffering_range (query, &format, NULL, NULL, NULL);
1150
1151       GST_DEBUG_OBJECT (src, "buffering query in format %s",
1152           gst_format_get_name (format));
1153
1154       GST_OBJECT_LOCK (src);
1155       if (src->random_access) {
1156         estimated = 0;
1157         start = 0;
1158         if (format == GST_FORMAT_PERCENT)
1159           stop = GST_FORMAT_PERCENT_MAX;
1160         else
1161           stop = src->segment.duration;
1162       } else {
1163         estimated = -1;
1164         start = -1;
1165         stop = -1;
1166       }
1167       seg_format = src->segment.format;
1168       GST_OBJECT_UNLOCK (src);
1169
1170       /* convert to required format. When the conversion fails, we can't answer
1171        * the query. When the value is unknown, we can don't perform conversion
1172        * but report TRUE. */
1173       if (format != GST_FORMAT_PERCENT && stop != -1) {
1174         res = gst_pad_query_convert (src->srcpad, seg_format,
1175             stop, format, &stop);
1176       } else {
1177         res = TRUE;
1178       }
1179       if (res && format != GST_FORMAT_PERCENT && start != -1)
1180         res = gst_pad_query_convert (src->srcpad, seg_format,
1181             start, format, &start);
1182
1183       gst_query_set_buffering_range (query, format, start, stop, estimated);
1184       break;
1185     }
1186     case GST_QUERY_SCHEDULING:
1187     {
1188       gboolean random_access;
1189
1190       random_access = gst_base_src_is_random_access (src);
1191
1192       /* we can operate in getrange mode if the native format is bytes
1193        * and we are seekable, this condition is set in the random_access
1194        * flag and is set in the _start() method. */
1195       gst_query_set_scheduling (query, GST_SCHEDULING_FLAG_SEEKABLE, 1, -1, 0);
1196       if (random_access)
1197         gst_query_add_scheduling_mode (query, GST_PAD_MODE_PULL);
1198       gst_query_add_scheduling_mode (query, GST_PAD_MODE_PUSH);
1199
1200       res = TRUE;
1201       break;
1202     }
1203     case GST_QUERY_CAPS:
1204     {
1205       GstBaseSrcClass *bclass;
1206       GstCaps *caps, *filter;
1207
1208       bclass = GST_BASE_SRC_GET_CLASS (src);
1209       if (bclass->get_caps) {
1210         gst_query_parse_caps (query, &filter);
1211         if ((caps = bclass->get_caps (src, filter))) {
1212           gst_query_set_caps_result (query, caps);
1213           gst_caps_unref (caps);
1214           res = TRUE;
1215         } else {
1216           res = FALSE;
1217         }
1218       } else
1219         res = FALSE;
1220       break;
1221     }
1222     case GST_QUERY_URI:{
1223       if (GST_IS_URI_HANDLER (src)) {
1224         gchar *uri = gst_uri_handler_get_uri (GST_URI_HANDLER (src));
1225
1226         if (uri != NULL) {
1227           gst_query_set_uri (query, uri);
1228           g_free (uri);
1229           res = TRUE;
1230         } else {
1231           res = FALSE;
1232         }
1233       } else {
1234         res = FALSE;
1235       }
1236       break;
1237     }
1238     default:
1239       res = FALSE;
1240       break;
1241   }
1242   GST_DEBUG_OBJECT (src, "query %s returns %d", GST_QUERY_TYPE_NAME (query),
1243       res);
1244
1245   return res;
1246 }
1247
1248 static gboolean
1249 gst_base_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
1250 {
1251   GstBaseSrc *src;
1252   GstBaseSrcClass *bclass;
1253   gboolean result = FALSE;
1254
1255   src = GST_BASE_SRC (parent);
1256   bclass = GST_BASE_SRC_GET_CLASS (src);
1257
1258   if (bclass->query)
1259     result = bclass->query (src, query);
1260
1261   return result;
1262 }
1263
1264 static gboolean
1265 gst_base_src_default_do_seek (GstBaseSrc * src, GstSegment * segment)
1266 {
1267   gboolean res = TRUE;
1268
1269   /* update our offset if the start/stop position was updated */
1270   if (segment->format == GST_FORMAT_BYTES) {
1271     segment->time = segment->start;
1272   } else if (segment->start == 0) {
1273     /* seek to start, we can implement a default for this. */
1274     segment->time = 0;
1275   } else {
1276     res = FALSE;
1277     GST_INFO_OBJECT (src, "Can't do a default seek");
1278   }
1279
1280   return res;
1281 }
1282
1283 static gboolean
1284 gst_base_src_do_seek (GstBaseSrc * src, GstSegment * segment)
1285 {
1286   GstBaseSrcClass *bclass;
1287   gboolean result = FALSE;
1288
1289   bclass = GST_BASE_SRC_GET_CLASS (src);
1290
1291   GST_INFO_OBJECT (src, "seeking: %" GST_SEGMENT_FORMAT, segment);
1292
1293   if (bclass->do_seek)
1294     result = bclass->do_seek (src, segment);
1295
1296   return result;
1297 }
1298
1299 #define SEEK_TYPE_IS_RELATIVE(t) (((t) != GST_SEEK_TYPE_NONE) && ((t) != GST_SEEK_TYPE_SET))
1300
1301 static gboolean
1302 gst_base_src_default_prepare_seek_segment (GstBaseSrc * src, GstEvent * event,
1303     GstSegment * segment)
1304 {
1305   /* By default, we try one of 2 things:
1306    *   - For absolute seek positions, convert the requested position to our
1307    *     configured processing format and place it in the output segment \
1308    *   - For relative seek positions, convert our current (input) values to the
1309    *     seek format, adjust by the relative seek offset and then convert back to
1310    *     the processing format
1311    */
1312   GstSeekType start_type, stop_type;
1313   gint64 start, stop;
1314   GstSeekFlags flags;
1315   GstFormat seek_format, dest_format;
1316   gdouble rate;
1317   gboolean update;
1318   gboolean res = TRUE;
1319
1320   gst_event_parse_seek (event, &rate, &seek_format, &flags,
1321       &start_type, &start, &stop_type, &stop);
1322   dest_format = segment->format;
1323
1324   if (seek_format == dest_format) {
1325     gst_segment_do_seek (segment, rate, seek_format, flags,
1326         start_type, start, stop_type, stop, &update);
1327     return TRUE;
1328   }
1329
1330   if (start_type != GST_SEEK_TYPE_NONE) {
1331     /* FIXME: Handle seek_end by converting the input segment vals */
1332     res =
1333         gst_pad_query_convert (src->srcpad, seek_format, start, dest_format,
1334         &start);
1335     start_type = GST_SEEK_TYPE_SET;
1336   }
1337
1338   if (res && stop_type != GST_SEEK_TYPE_NONE) {
1339     /* FIXME: Handle seek_end by converting the input segment vals */
1340     res =
1341         gst_pad_query_convert (src->srcpad, seek_format, stop, dest_format,
1342         &stop);
1343     stop_type = GST_SEEK_TYPE_SET;
1344   }
1345
1346   /* And finally, configure our output segment in the desired format */
1347   gst_segment_do_seek (segment, rate, dest_format, flags, start_type, start,
1348       stop_type, stop, &update);
1349
1350   if (!res)
1351     goto no_format;
1352
1353   return res;
1354
1355 no_format:
1356   {
1357     GST_DEBUG_OBJECT (src, "undefined format given, seek aborted.");
1358     return FALSE;
1359   }
1360 }
1361
1362 static gboolean
1363 gst_base_src_prepare_seek_segment (GstBaseSrc * src, GstEvent * event,
1364     GstSegment * seeksegment)
1365 {
1366   GstBaseSrcClass *bclass;
1367   gboolean result = FALSE;
1368
1369   bclass = GST_BASE_SRC_GET_CLASS (src);
1370
1371   if (bclass->prepare_seek_segment)
1372     result = bclass->prepare_seek_segment (src, event, seeksegment);
1373
1374   return result;
1375 }
1376
1377 static GstFlowReturn
1378 gst_base_src_default_alloc (GstBaseSrc * src, guint64 offset,
1379     guint size, GstBuffer ** buffer)
1380 {
1381   GstFlowReturn ret;
1382   GstBaseSrcPrivate *priv = src->priv;
1383
1384   if (priv->pool) {
1385     ret = gst_buffer_pool_acquire_buffer (priv->pool, buffer, NULL);
1386   } else if (size != -1) {
1387     *buffer = gst_buffer_new_allocate (priv->allocator, size, &priv->params);
1388     if (G_UNLIKELY (*buffer == NULL))
1389       goto alloc_failed;
1390
1391     ret = GST_FLOW_OK;
1392   } else {
1393     GST_WARNING_OBJECT (src, "Not trying to alloc %u bytes. Blocksize not set?",
1394         size);
1395     goto alloc_failed;
1396   }
1397   return ret;
1398
1399   /* ERRORS */
1400 alloc_failed:
1401   {
1402     GST_ERROR_OBJECT (src, "Failed to allocate %u bytes", size);
1403     return GST_FLOW_ERROR;
1404   }
1405 }
1406
1407 static GstFlowReturn
1408 gst_base_src_default_create (GstBaseSrc * src, guint64 offset,
1409     guint size, GstBuffer ** buffer)
1410 {
1411   GstBaseSrcClass *bclass;
1412   GstFlowReturn ret;
1413   GstBuffer *res_buf;
1414
1415   bclass = GST_BASE_SRC_GET_CLASS (src);
1416
1417   if (G_UNLIKELY (!bclass->alloc))
1418     goto no_function;
1419   if (G_UNLIKELY (!bclass->fill))
1420     goto no_function;
1421
1422   if (*buffer == NULL) {
1423     /* downstream did not provide us with a buffer to fill, allocate one
1424      * ourselves */
1425     ret = bclass->alloc (src, offset, size, &res_buf);
1426     if (G_UNLIKELY (ret != GST_FLOW_OK))
1427       goto alloc_failed;
1428   } else {
1429     res_buf = *buffer;
1430   }
1431
1432   if (G_LIKELY (size > 0)) {
1433     /* only call fill when there is a size */
1434     ret = bclass->fill (src, offset, size, res_buf);
1435     if (G_UNLIKELY (ret != GST_FLOW_OK))
1436       goto not_ok;
1437   }
1438
1439   *buffer = res_buf;
1440
1441   return GST_FLOW_OK;
1442
1443   /* ERRORS */
1444 no_function:
1445   {
1446     GST_DEBUG_OBJECT (src, "no fill or alloc function");
1447     return GST_FLOW_NOT_SUPPORTED;
1448   }
1449 alloc_failed:
1450   {
1451     GST_DEBUG_OBJECT (src, "Failed to allocate buffer of %u bytes", size);
1452     return ret;
1453   }
1454 not_ok:
1455   {
1456     GST_DEBUG_OBJECT (src, "fill returned %d (%s)", ret,
1457         gst_flow_get_name (ret));
1458     if (*buffer == NULL)
1459       gst_buffer_unref (res_buf);
1460     return ret;
1461   }
1462 }
1463
1464 /* this code implements the seeking. It is a good example
1465  * handling all cases.
1466  *
1467  * A seek updates the currently configured segment.start
1468  * and segment.stop values based on the SEEK_TYPE. If the
1469  * segment.start value is updated, a seek to this new position
1470  * should be performed.
1471  *
1472  * The seek can only be executed when we are not currently
1473  * streaming any data, to make sure that this is the case, we
1474  * acquire the STREAM_LOCK which is taken when we are in the
1475  * _loop() function or when a getrange() is called. Normally
1476  * we will not receive a seek if we are operating in pull mode
1477  * though. When we operate as a live source we might block on the live
1478  * cond, which does not release the STREAM_LOCK. Therefore we will try
1479  * to grab the LIVE_LOCK instead of the STREAM_LOCK to make sure it is
1480  * safe to perform the seek.
1481  *
1482  * When we are in the loop() function, we might be in the middle
1483  * of pushing a buffer, which might block in a sink. To make sure
1484  * that the push gets unblocked we push out a FLUSH_START event.
1485  * Our loop function will get a FLUSHING return value from
1486  * the push and will pause, effectively releasing the STREAM_LOCK.
1487  *
1488  * For a non-flushing seek, we pause the task, which might eventually
1489  * release the STREAM_LOCK. We say eventually because when the sink
1490  * blocks on the sample we might wait a very long time until the sink
1491  * unblocks the sample. In any case we acquire the STREAM_LOCK and
1492  * can continue the seek. A non-flushing seek is normally done in a
1493  * running pipeline to perform seamless playback, this means that the sink is
1494  * PLAYING and will return from its chain function.
1495  * In the case of a non-flushing seek we need to make sure that the
1496  * data we output after the seek is continuous with the previous data,
1497  * this is because a non-flushing seek does not reset the running-time
1498  * to 0. We do this by closing the currently running segment, ie. sending
1499  * a new_segment event with the stop position set to the last processed
1500  * position.
1501  *
1502  * After updating the segment.start/stop values, we prepare for
1503  * streaming again. We push out a FLUSH_STOP to make the peer pad
1504  * accept data again and we start our task again.
1505  *
1506  * A segment seek posts a message on the bus saying that the playback
1507  * of the segment started. We store the segment flag internally because
1508  * when we reach the segment.stop we have to post a segment.done
1509  * instead of EOS when doing a segment seek.
1510  */
1511 static gboolean
1512 gst_base_src_perform_seek (GstBaseSrc * src, GstEvent * event, gboolean unlock)
1513 {
1514   gboolean res = TRUE, tres;
1515   gdouble rate;
1516   GstFormat seek_format, dest_format;
1517   GstSeekFlags flags;
1518   GstSeekType start_type, stop_type;
1519   gint64 start, stop;
1520   gboolean flush, playing;
1521   gboolean update;
1522   gboolean relative_seek = FALSE;
1523   gboolean seekseg_configured = FALSE;
1524   GstSegment seeksegment;
1525   guint32 seqnum;
1526   GstEvent *tevent;
1527
1528   GST_DEBUG_OBJECT (src, "doing seek: %" GST_PTR_FORMAT, event);
1529
1530   GST_OBJECT_LOCK (src);
1531   dest_format = src->segment.format;
1532   GST_OBJECT_UNLOCK (src);
1533
1534   if (event) {
1535     gst_event_parse_seek (event, &rate, &seek_format, &flags,
1536         &start_type, &start, &stop_type, &stop);
1537
1538     relative_seek = SEEK_TYPE_IS_RELATIVE (start_type) ||
1539         SEEK_TYPE_IS_RELATIVE (stop_type);
1540
1541     if (dest_format != seek_format && !relative_seek) {
1542       /* If we have an ABSOLUTE position (SEEK_SET only), we can convert it
1543        * here before taking the stream lock, otherwise we must convert it later,
1544        * once we have the stream lock and can read the last configures segment
1545        * start and stop positions */
1546       gst_segment_init (&seeksegment, dest_format);
1547
1548       if (!gst_base_src_prepare_seek_segment (src, event, &seeksegment))
1549         goto prepare_failed;
1550
1551       seekseg_configured = TRUE;
1552     }
1553
1554     flush = flags & GST_SEEK_FLAG_FLUSH;
1555     seqnum = gst_event_get_seqnum (event);
1556   } else {
1557     flush = FALSE;
1558     /* get next seqnum */
1559     seqnum = gst_util_seqnum_next ();
1560   }
1561
1562   /* send flush start */
1563   if (flush) {
1564     tevent = gst_event_new_flush_start ();
1565     gst_event_set_seqnum (tevent, seqnum);
1566     gst_pad_push_event (src->srcpad, tevent);
1567   } else
1568     gst_pad_pause_task (src->srcpad);
1569
1570   /* unblock streaming thread. */
1571   if (unlock)
1572     gst_base_src_set_flushing (src, TRUE, FALSE, &playing);
1573
1574   /* grab streaming lock, this should eventually be possible, either
1575    * because the task is paused, our streaming thread stopped
1576    * or because our peer is flushing. */
1577   GST_PAD_STREAM_LOCK (src->srcpad);
1578   if (G_UNLIKELY (src->priv->seqnum == seqnum)) {
1579     /* we have seen this event before, issue a warning for now */
1580     GST_WARNING_OBJECT (src, "duplicate event found %" G_GUINT32_FORMAT,
1581         seqnum);
1582   } else {
1583     src->priv->seqnum = seqnum;
1584     GST_DEBUG_OBJECT (src, "seek with seqnum %" G_GUINT32_FORMAT, seqnum);
1585   }
1586
1587   if (unlock)
1588     gst_base_src_set_flushing (src, FALSE, playing, NULL);
1589
1590   /* If we configured the seeksegment above, don't overwrite it now. Otherwise
1591    * copy the current segment info into the temp segment that we can actually
1592    * attempt the seek with. We only update the real segment if the seek succeeds. */
1593   if (!seekseg_configured) {
1594     memcpy (&seeksegment, &src->segment, sizeof (GstSegment));
1595
1596     /* now configure the final seek segment */
1597     if (event) {
1598       if (seeksegment.format != seek_format) {
1599         /* OK, here's where we give the subclass a chance to convert the relative
1600          * seek into an absolute one in the processing format. We set up any
1601          * absolute seek above, before taking the stream lock. */
1602         if (!gst_base_src_prepare_seek_segment (src, event, &seeksegment)) {
1603           GST_DEBUG_OBJECT (src, "Preparing the seek failed after flushing. "
1604               "Aborting seek");
1605           res = FALSE;
1606         }
1607       } else {
1608         /* The seek format matches our processing format, no need to ask the
1609          * the subclass to configure the segment. */
1610         gst_segment_do_seek (&seeksegment, rate, seek_format, flags,
1611             start_type, start, stop_type, stop, &update);
1612       }
1613     }
1614     /* Else, no seek event passed, so we're just (re)starting the
1615        current segment. */
1616   }
1617
1618   if (res) {
1619     GST_DEBUG_OBJECT (src, "segment configured from %" G_GINT64_FORMAT
1620         " to %" G_GINT64_FORMAT ", position %" G_GINT64_FORMAT,
1621         seeksegment.start, seeksegment.stop, seeksegment.position);
1622
1623     /* do the seek, segment.position contains the new position. */
1624     res = gst_base_src_do_seek (src, &seeksegment);
1625   }
1626
1627   /* and prepare to continue streaming */
1628   if (flush) {
1629     tevent = gst_event_new_flush_stop (TRUE);
1630     gst_event_set_seqnum (tevent, seqnum);
1631     /* send flush stop, peer will accept data and events again. We
1632      * are not yet providing data as we still have the STREAM_LOCK. */
1633     gst_pad_push_event (src->srcpad, tevent);
1634   }
1635
1636   /* The subclass must have converted the segment to the processing format
1637    * by now */
1638   if (res && seeksegment.format != dest_format) {
1639     GST_DEBUG_OBJECT (src, "Subclass failed to prepare a seek segment "
1640         "in the correct format. Aborting seek.");
1641     res = FALSE;
1642   }
1643
1644   /* if the seek was successful, we update our real segment and push
1645    * out the new segment. */
1646   if (res) {
1647     GST_OBJECT_LOCK (src);
1648     memcpy (&src->segment, &seeksegment, sizeof (GstSegment));
1649     GST_OBJECT_UNLOCK (src);
1650
1651     if (seeksegment.flags & GST_SEGMENT_FLAG_SEGMENT) {
1652       GstMessage *message;
1653
1654       message = gst_message_new_segment_start (GST_OBJECT (src),
1655           seeksegment.format, seeksegment.position);
1656       gst_message_set_seqnum (message, seqnum);
1657
1658       gst_element_post_message (GST_ELEMENT (src), message);
1659     }
1660
1661     /* for deriving a stop position for the playback segment from the seek
1662      * segment, we must take the duration when the stop is not set */
1663     /* FIXME: This is never used below */
1664     if ((stop = seeksegment.stop) == -1)
1665       stop = seeksegment.duration;
1666
1667     src->priv->segment_pending = TRUE;
1668   }
1669
1670   src->priv->discont = TRUE;
1671   src->running = TRUE;
1672   /* and restart the task in case it got paused explicitly or by
1673    * the FLUSH_START event we pushed out. */
1674   tres = gst_pad_start_task (src->srcpad, (GstTaskFunction) gst_base_src_loop,
1675       src->srcpad, NULL);
1676   if (res && !tres)
1677     res = FALSE;
1678
1679   /* and release the lock again so we can continue streaming */
1680   GST_PAD_STREAM_UNLOCK (src->srcpad);
1681
1682   return res;
1683
1684   /* ERROR */
1685 prepare_failed:
1686   GST_DEBUG_OBJECT (src, "Preparing the seek failed before flushing. "
1687       "Aborting seek");
1688   return FALSE;
1689 }
1690
1691 /* all events send to this element directly. This is mainly done from the
1692  * application.
1693  */
1694 static gboolean
1695 gst_base_src_send_event (GstElement * element, GstEvent * event)
1696 {
1697   GstBaseSrc *src;
1698   gboolean result = FALSE;
1699
1700   src = GST_BASE_SRC (element);
1701
1702   GST_DEBUG_OBJECT (src, "handling event %p %" GST_PTR_FORMAT, event, event);
1703
1704   switch (GST_EVENT_TYPE (event)) {
1705       /* bidirectional events */
1706     case GST_EVENT_FLUSH_START:
1707       GST_DEBUG_OBJECT (src, "pushing flush-start event downstream");
1708       result = gst_pad_push_event (src->srcpad, event);
1709       event = NULL;
1710       break;
1711     case GST_EVENT_FLUSH_STOP:
1712       GST_LIVE_LOCK (src->srcpad);
1713       src->priv->segment_pending = TRUE;
1714       /* sending random flushes downstream can break stuff,
1715        * especially sync since all segment info will get flushed */
1716       GST_DEBUG_OBJECT (src, "pushing flush-stop event downstream");
1717       result = gst_pad_push_event (src->srcpad, event);
1718       GST_LIVE_UNLOCK (src->srcpad);
1719       event = NULL;
1720       break;
1721
1722       /* downstream serialized events */
1723     case GST_EVENT_EOS:
1724     {
1725       GstBaseSrcClass *bclass;
1726
1727       bclass = GST_BASE_SRC_GET_CLASS (src);
1728
1729       /* queue EOS and make sure the task or pull function performs the EOS
1730        * actions.
1731        *
1732        * We have two possibilities:
1733        *
1734        *  - Before we are to enter the _create function, we check the pending_eos
1735        *    first and do EOS instead of entering it.
1736        *  - If we are in the _create function or we did not manage to set the
1737        *    flag fast enough and we are about to enter the _create function,
1738        *    we unlock it so that we exit with FLUSHING immediately. We then
1739        *    check the EOS flag and do the EOS logic.
1740        */
1741       g_atomic_int_set (&src->priv->pending_eos, TRUE);
1742       GST_DEBUG_OBJECT (src, "EOS marked, calling unlock");
1743
1744
1745       /* unlock the _create function so that we can check the pending_eos flag
1746        * and we can do EOS. This will eventually release the LIVE_LOCK again so
1747        * that we can grab it and stop the unlock again. We don't take the stream
1748        * lock so that this operation is guaranteed to never block. */
1749       gst_base_src_activate_pool (src, FALSE);
1750       if (bclass->unlock)
1751         bclass->unlock (src);
1752
1753       GST_DEBUG_OBJECT (src, "unlock called, waiting for LIVE_LOCK");
1754
1755       GST_LIVE_LOCK (src);
1756       GST_DEBUG_OBJECT (src, "LIVE_LOCK acquired, calling unlock_stop");
1757       /* now stop the unlock of the streaming thread again. Grabbing the live
1758        * lock is enough because that protects the create function. */
1759       if (bclass->unlock_stop)
1760         bclass->unlock_stop (src);
1761       gst_base_src_activate_pool (src, TRUE);
1762       GST_LIVE_UNLOCK (src);
1763
1764       result = TRUE;
1765       break;
1766     }
1767     case GST_EVENT_SEGMENT:
1768       /* sending random SEGMENT downstream can break sync. */
1769       break;
1770     case GST_EVENT_TAG:
1771     case GST_EVENT_CUSTOM_DOWNSTREAM:
1772     case GST_EVENT_CUSTOM_BOTH:
1773       /* Insert TAG, CUSTOM_DOWNSTREAM, CUSTOM_BOTH in the dataflow */
1774       GST_OBJECT_LOCK (src);
1775       src->priv->pending_events =
1776           g_list_append (src->priv->pending_events, event);
1777       g_atomic_int_set (&src->priv->have_events, TRUE);
1778       GST_OBJECT_UNLOCK (src);
1779       event = NULL;
1780       result = TRUE;
1781       break;
1782     case GST_EVENT_BUFFERSIZE:
1783       /* does not seem to make much sense currently */
1784       break;
1785
1786       /* upstream events */
1787     case GST_EVENT_QOS:
1788       /* elements should override send_event and do something */
1789       break;
1790     case GST_EVENT_SEEK:
1791     {
1792       gboolean started;
1793
1794       GST_OBJECT_LOCK (src->srcpad);
1795       if (GST_PAD_MODE (src->srcpad) == GST_PAD_MODE_PULL)
1796         goto wrong_mode;
1797       started = GST_PAD_MODE (src->srcpad) == GST_PAD_MODE_PUSH;
1798       GST_OBJECT_UNLOCK (src->srcpad);
1799
1800       if (started) {
1801         GST_DEBUG_OBJECT (src, "performing seek");
1802         /* when we are running in push mode, we can execute the
1803          * seek right now. */
1804         result = gst_base_src_perform_seek (src, event, TRUE);
1805       } else {
1806         GstEvent **event_p;
1807
1808         /* else we store the event and execute the seek when we
1809          * get activated */
1810         GST_OBJECT_LOCK (src);
1811         GST_DEBUG_OBJECT (src, "queueing seek");
1812         event_p = &src->pending_seek;
1813         gst_event_replace ((GstEvent **) event_p, event);
1814         GST_OBJECT_UNLOCK (src);
1815         /* assume the seek will work */
1816         result = TRUE;
1817       }
1818       break;
1819     }
1820     case GST_EVENT_NAVIGATION:
1821       /* could make sense for elements that do something with navigation events
1822        * but then they would need to override the send_event function */
1823       break;
1824     case GST_EVENT_LATENCY:
1825       /* does not seem to make sense currently */
1826       break;
1827
1828       /* custom events */
1829     case GST_EVENT_CUSTOM_UPSTREAM:
1830       /* override send_event if you want this */
1831       break;
1832     case GST_EVENT_CUSTOM_DOWNSTREAM_OOB:
1833     case GST_EVENT_CUSTOM_BOTH_OOB:
1834       /* insert a random custom event into the pipeline */
1835       GST_DEBUG_OBJECT (src, "pushing custom OOB event downstream");
1836       result = gst_pad_push_event (src->srcpad, event);
1837       /* we gave away the ref to the event in the push */
1838       event = NULL;
1839       break;
1840     default:
1841       break;
1842   }
1843 done:
1844   /* if we still have a ref to the event, unref it now */
1845   if (event)
1846     gst_event_unref (event);
1847
1848   return result;
1849
1850   /* ERRORS */
1851 wrong_mode:
1852   {
1853     GST_DEBUG_OBJECT (src, "cannot perform seek when operating in pull mode");
1854     GST_OBJECT_UNLOCK (src->srcpad);
1855     result = FALSE;
1856     goto done;
1857   }
1858 }
1859
1860 static gboolean
1861 gst_base_src_seekable (GstBaseSrc * src)
1862 {
1863   GstBaseSrcClass *bclass;
1864   bclass = GST_BASE_SRC_GET_CLASS (src);
1865   if (bclass->is_seekable)
1866     return bclass->is_seekable (src);
1867   else
1868     return FALSE;
1869 }
1870
1871 static void
1872 gst_base_src_update_qos (GstBaseSrc * src,
1873     gdouble proportion, GstClockTimeDiff diff, GstClockTime timestamp)
1874 {
1875   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, src,
1876       "qos: proportion: %lf, diff %" G_GINT64_FORMAT ", timestamp %"
1877       GST_TIME_FORMAT, proportion, diff, GST_TIME_ARGS (timestamp));
1878
1879   GST_OBJECT_LOCK (src);
1880   src->priv->proportion = proportion;
1881   src->priv->earliest_time = timestamp + diff;
1882   GST_OBJECT_UNLOCK (src);
1883 }
1884
1885
1886 static gboolean
1887 gst_base_src_default_event (GstBaseSrc * src, GstEvent * event)
1888 {
1889   gboolean result;
1890
1891   GST_DEBUG_OBJECT (src, "handle event %" GST_PTR_FORMAT, event);
1892
1893   switch (GST_EVENT_TYPE (event)) {
1894     case GST_EVENT_SEEK:
1895       /* is normally called when in push mode */
1896       if (!gst_base_src_seekable (src))
1897         goto not_seekable;
1898
1899       result = gst_base_src_perform_seek (src, event, TRUE);
1900       break;
1901     case GST_EVENT_FLUSH_START:
1902       /* cancel any blocking getrange, is normally called
1903        * when in pull mode. */
1904       result = gst_base_src_set_flushing (src, TRUE, FALSE, NULL);
1905       break;
1906     case GST_EVENT_FLUSH_STOP:
1907       result = gst_base_src_set_flushing (src, FALSE, TRUE, NULL);
1908       break;
1909     case GST_EVENT_QOS:
1910     {
1911       gdouble proportion;
1912       GstClockTimeDiff diff;
1913       GstClockTime timestamp;
1914
1915       gst_event_parse_qos (event, NULL, &proportion, &diff, &timestamp);
1916       gst_base_src_update_qos (src, proportion, diff, timestamp);
1917       result = TRUE;
1918       break;
1919     }
1920     case GST_EVENT_RECONFIGURE:
1921       result = TRUE;
1922       break;
1923     case GST_EVENT_LATENCY:
1924       result = TRUE;
1925       break;
1926     default:
1927       result = FALSE;
1928       break;
1929   }
1930   return result;
1931
1932   /* ERRORS */
1933 not_seekable:
1934   {
1935     GST_DEBUG_OBJECT (src, "is not seekable");
1936     return FALSE;
1937   }
1938 }
1939
1940 static gboolean
1941 gst_base_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
1942 {
1943   GstBaseSrc *src;
1944   GstBaseSrcClass *bclass;
1945   gboolean result = FALSE;
1946
1947   src = GST_BASE_SRC (parent);
1948   bclass = GST_BASE_SRC_GET_CLASS (src);
1949
1950   if (bclass->event) {
1951     if (!(result = bclass->event (src, event)))
1952       goto subclass_failed;
1953   }
1954
1955 done:
1956   gst_event_unref (event);
1957
1958   return result;
1959
1960   /* ERRORS */
1961 subclass_failed:
1962   {
1963     GST_DEBUG_OBJECT (src, "subclass refused event");
1964     goto done;
1965   }
1966 }
1967
1968 static void
1969 gst_base_src_set_property (GObject * object, guint prop_id,
1970     const GValue * value, GParamSpec * pspec)
1971 {
1972   GstBaseSrc *src;
1973
1974   src = GST_BASE_SRC (object);
1975
1976   switch (prop_id) {
1977     case PROP_BLOCKSIZE:
1978       gst_base_src_set_blocksize (src, g_value_get_uint (value));
1979       break;
1980     case PROP_NUM_BUFFERS:
1981       src->num_buffers = g_value_get_int (value);
1982       break;
1983     case PROP_TYPEFIND:
1984       src->typefind = g_value_get_boolean (value);
1985       break;
1986     case PROP_DO_TIMESTAMP:
1987       gst_base_src_set_do_timestamp (src, g_value_get_boolean (value));
1988       break;
1989     default:
1990       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1991       break;
1992   }
1993 }
1994
1995 static void
1996 gst_base_src_get_property (GObject * object, guint prop_id, GValue * value,
1997     GParamSpec * pspec)
1998 {
1999   GstBaseSrc *src;
2000
2001   src = GST_BASE_SRC (object);
2002
2003   switch (prop_id) {
2004     case PROP_BLOCKSIZE:
2005       g_value_set_uint (value, gst_base_src_get_blocksize (src));
2006       break;
2007     case PROP_NUM_BUFFERS:
2008       g_value_set_int (value, src->num_buffers);
2009       break;
2010     case PROP_TYPEFIND:
2011       g_value_set_boolean (value, src->typefind);
2012       break;
2013     case PROP_DO_TIMESTAMP:
2014       g_value_set_boolean (value, gst_base_src_get_do_timestamp (src));
2015       break;
2016     default:
2017       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2018       break;
2019   }
2020 }
2021
2022 /* with STREAM_LOCK and LOCK */
2023 static GstClockReturn
2024 gst_base_src_wait (GstBaseSrc * basesrc, GstClock * clock, GstClockTime time)
2025 {
2026   GstClockReturn ret;
2027   GstClockID id;
2028
2029   id = gst_clock_new_single_shot_id (clock, time);
2030
2031   basesrc->clock_id = id;
2032   /* release the live lock while waiting */
2033   GST_LIVE_UNLOCK (basesrc);
2034
2035   ret = gst_clock_id_wait (id, NULL);
2036
2037   GST_LIVE_LOCK (basesrc);
2038   gst_clock_id_unref (id);
2039   basesrc->clock_id = NULL;
2040
2041   return ret;
2042 }
2043
2044 /* perform synchronisation on a buffer.
2045  * with STREAM_LOCK.
2046  */
2047 static GstClockReturn
2048 gst_base_src_do_sync (GstBaseSrc * basesrc, GstBuffer * buffer)
2049 {
2050   GstClockReturn result;
2051   GstClockTime start, end;
2052   GstBaseSrcClass *bclass;
2053   GstClockTime base_time;
2054   GstClock *clock;
2055   GstClockTime now = GST_CLOCK_TIME_NONE, pts, dts, timestamp;
2056   gboolean do_timestamp, first, pseudo_live, is_live;
2057
2058   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2059
2060   start = end = -1;
2061   if (bclass->get_times)
2062     bclass->get_times (basesrc, buffer, &start, &end);
2063
2064   /* get buffer timestamp */
2065   dts = GST_BUFFER_DTS (buffer);
2066   pts = GST_BUFFER_PTS (buffer);
2067
2068   if (GST_CLOCK_TIME_IS_VALID (dts))
2069     timestamp = dts;
2070   else
2071     timestamp = pts;
2072
2073   /* grab the lock to prepare for clocking and calculate the startup
2074    * latency. */
2075   GST_OBJECT_LOCK (basesrc);
2076
2077   is_live = basesrc->is_live;
2078   /* if we are asked to sync against the clock we are a pseudo live element */
2079   pseudo_live = (start != -1 && is_live);
2080   /* check for the first buffer */
2081   first = (basesrc->priv->latency == -1);
2082
2083   if (timestamp != -1 && pseudo_live) {
2084     GstClockTime latency;
2085
2086     /* we have a timestamp and a sync time, latency is the diff */
2087     if (timestamp <= start)
2088       latency = start - timestamp;
2089     else
2090       latency = 0;
2091
2092     if (first) {
2093       GST_DEBUG_OBJECT (basesrc, "pseudo_live with latency %" GST_TIME_FORMAT,
2094           GST_TIME_ARGS (latency));
2095       /* first time we calculate latency, just configure */
2096       basesrc->priv->latency = latency;
2097     } else {
2098       if (basesrc->priv->latency != latency) {
2099         /* we have a new latency, FIXME post latency message */
2100         basesrc->priv->latency = latency;
2101         GST_DEBUG_OBJECT (basesrc, "latency changed to %" GST_TIME_FORMAT,
2102             GST_TIME_ARGS (latency));
2103       }
2104     }
2105   } else if (first) {
2106     GST_DEBUG_OBJECT (basesrc, "no latency needed, live %d, sync %d",
2107         is_live, start != -1);
2108     basesrc->priv->latency = 0;
2109   }
2110
2111   /* get clock, if no clock, we can't sync or do timestamps */
2112   if ((clock = GST_ELEMENT_CLOCK (basesrc)) == NULL)
2113     goto no_clock;
2114   else
2115     gst_object_ref (clock);
2116
2117   base_time = GST_ELEMENT_CAST (basesrc)->base_time;
2118
2119   do_timestamp = basesrc->priv->do_timestamp;
2120   GST_OBJECT_UNLOCK (basesrc);
2121
2122   /* first buffer, calculate the timestamp offset */
2123   if (first) {
2124     GstClockTime running_time;
2125
2126     now = gst_clock_get_time (clock);
2127     running_time = now - base_time;
2128
2129     GST_LOG_OBJECT (basesrc,
2130         "startup PTS: %" GST_TIME_FORMAT ", DTS %" GST_TIME_FORMAT
2131         ", running_time %" GST_TIME_FORMAT, GST_TIME_ARGS (pts),
2132         GST_TIME_ARGS (dts), GST_TIME_ARGS (running_time));
2133
2134     if (pseudo_live && timestamp != -1) {
2135       /* live source and we need to sync, add startup latency to all timestamps
2136        * to get the real running_time. Live sources should always timestamp
2137        * according to the current running time. */
2138       basesrc->priv->ts_offset = GST_CLOCK_DIFF (timestamp, running_time);
2139
2140       GST_LOG_OBJECT (basesrc, "live with sync, ts_offset %" GST_TIME_FORMAT,
2141           GST_TIME_ARGS (basesrc->priv->ts_offset));
2142     } else {
2143       basesrc->priv->ts_offset = 0;
2144       GST_LOG_OBJECT (basesrc, "no timestamp offset needed");
2145     }
2146
2147     if (!GST_CLOCK_TIME_IS_VALID (dts)) {
2148       if (do_timestamp) {
2149         dts = running_time;
2150       } else {
2151         dts = 0;
2152       }
2153       GST_BUFFER_DTS (buffer) = dts;
2154
2155       GST_LOG_OBJECT (basesrc, "created DTS %" GST_TIME_FORMAT,
2156           GST_TIME_ARGS (dts));
2157     }
2158   } else {
2159     /* not the first buffer, the timestamp is the diff between the clock and
2160      * base_time */
2161     if (do_timestamp && !GST_CLOCK_TIME_IS_VALID (dts)) {
2162       now = gst_clock_get_time (clock);
2163
2164       dts = now - base_time;
2165       GST_BUFFER_DTS (buffer) = dts;
2166
2167       GST_LOG_OBJECT (basesrc, "created DTS %" GST_TIME_FORMAT,
2168           GST_TIME_ARGS (dts));
2169     }
2170   }
2171   if (!GST_CLOCK_TIME_IS_VALID (pts)) {
2172     if (!GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DELTA_UNIT))
2173       pts = dts;
2174
2175     GST_BUFFER_PTS (buffer) = dts;
2176
2177     GST_LOG_OBJECT (basesrc, "created PTS %" GST_TIME_FORMAT,
2178         GST_TIME_ARGS (pts));
2179   }
2180
2181   /* if we don't have a buffer timestamp, we don't sync */
2182   if (!GST_CLOCK_TIME_IS_VALID (start))
2183     goto no_sync;
2184
2185   if (is_live) {
2186     /* for pseudo live sources, add our ts_offset to the timestamp */
2187     if (GST_CLOCK_TIME_IS_VALID (pts))
2188       GST_BUFFER_PTS (buffer) += basesrc->priv->ts_offset;
2189     if (GST_CLOCK_TIME_IS_VALID (dts))
2190       GST_BUFFER_DTS (buffer) += basesrc->priv->ts_offset;
2191     start += basesrc->priv->ts_offset;
2192   }
2193
2194   GST_LOG_OBJECT (basesrc,
2195       "waiting for clock, base time %" GST_TIME_FORMAT
2196       ", stream_start %" GST_TIME_FORMAT,
2197       GST_TIME_ARGS (base_time), GST_TIME_ARGS (start));
2198
2199   result = gst_base_src_wait (basesrc, clock, start + base_time);
2200
2201   gst_object_unref (clock);
2202
2203   GST_LOG_OBJECT (basesrc, "clock entry done: %d", result);
2204
2205   return result;
2206
2207   /* special cases */
2208 no_clock:
2209   {
2210     GST_DEBUG_OBJECT (basesrc, "we have no clock");
2211     GST_OBJECT_UNLOCK (basesrc);
2212     return GST_CLOCK_OK;
2213   }
2214 no_sync:
2215   {
2216     GST_DEBUG_OBJECT (basesrc, "no sync needed");
2217     gst_object_unref (clock);
2218     return GST_CLOCK_OK;
2219   }
2220 }
2221
2222 /* Called with STREAM_LOCK and LIVE_LOCK */
2223 static gboolean
2224 gst_base_src_update_length (GstBaseSrc * src, guint64 offset, guint * length)
2225 {
2226   guint64 size, maxsize;
2227   GstBaseSrcClass *bclass;
2228   GstFormat format;
2229   gint64 stop;
2230   gboolean dynamic;
2231
2232   bclass = GST_BASE_SRC_GET_CLASS (src);
2233
2234   format = src->segment.format;
2235   stop = src->segment.stop;
2236   /* get total file size */
2237   size = src->segment.duration;
2238
2239   /* only operate if we are working with bytes */
2240   if (format != GST_FORMAT_BYTES)
2241     return TRUE;
2242
2243   /* the max amount of bytes to read is the total size or
2244    * up to the segment.stop if present. */
2245   if (stop != -1)
2246     maxsize = MIN (size, stop);
2247   else
2248     maxsize = size;
2249
2250   GST_DEBUG_OBJECT (src,
2251       "reading offset %" G_GUINT64_FORMAT ", length %u, size %" G_GINT64_FORMAT
2252       ", segment.stop %" G_GINT64_FORMAT ", maxsize %" G_GINT64_FORMAT, offset,
2253       *length, size, stop, maxsize);
2254
2255   dynamic = g_atomic_int_get (&src->priv->dynamic_size);
2256   GST_DEBUG_OBJECT (src, "dynamic size: %d", dynamic);
2257
2258   /* check size if we have one */
2259   if (maxsize != -1) {
2260     /* if we run past the end, check if the file became bigger and
2261      * retry. */
2262     if (G_UNLIKELY (offset + *length >= maxsize || dynamic)) {
2263       /* see if length of the file changed */
2264       if (bclass->get_size)
2265         if (!bclass->get_size (src, &size))
2266           size = -1;
2267
2268       /* make sure we don't exceed the configured segment stop
2269        * if it was set */
2270       if (stop != -1)
2271         maxsize = MIN (size, stop);
2272       else
2273         maxsize = size;
2274
2275       /* if we are at or past the end, EOS */
2276       if (G_UNLIKELY (offset >= maxsize))
2277         goto unexpected_length;
2278
2279       /* else we can clip to the end */
2280       if (G_UNLIKELY (offset + *length >= maxsize))
2281         *length = maxsize - offset;
2282
2283     }
2284   }
2285
2286   /* keep track of current duration.
2287    * segment is in bytes, we checked that above. */
2288   GST_OBJECT_LOCK (src);
2289   src->segment.duration = size;
2290   GST_OBJECT_UNLOCK (src);
2291
2292   return TRUE;
2293
2294   /* ERRORS */
2295 unexpected_length:
2296   {
2297     return FALSE;
2298   }
2299 }
2300
2301 /* must be called with LIVE_LOCK */
2302 static GstFlowReturn
2303 gst_base_src_get_range (GstBaseSrc * src, guint64 offset, guint length,
2304     GstBuffer ** buf)
2305 {
2306   GstFlowReturn ret;
2307   GstBaseSrcClass *bclass;
2308   GstClockReturn status;
2309   GstBuffer *res_buf;
2310   GstBuffer *in_buf;
2311
2312   bclass = GST_BASE_SRC_GET_CLASS (src);
2313
2314 again:
2315   if (src->is_live) {
2316     if (G_UNLIKELY (!src->live_running)) {
2317       ret = gst_base_src_wait_playing (src);
2318       if (ret != GST_FLOW_OK)
2319         goto stopped;
2320     }
2321   }
2322
2323   if (G_UNLIKELY (!GST_BASE_SRC_IS_STARTED (src)
2324           && !GST_BASE_SRC_IS_STARTING (src)))
2325     goto not_started;
2326
2327   if (G_UNLIKELY (!bclass->create))
2328     goto no_function;
2329
2330   if (G_UNLIKELY (!gst_base_src_update_length (src, offset, &length)))
2331     goto unexpected_length;
2332
2333   /* track position */
2334   GST_OBJECT_LOCK (src);
2335   if (src->segment.format == GST_FORMAT_BYTES)
2336     src->segment.position = offset;
2337   GST_OBJECT_UNLOCK (src);
2338
2339   /* normally we don't count buffers */
2340   if (G_UNLIKELY (src->num_buffers_left >= 0)) {
2341     if (src->num_buffers_left == 0)
2342       goto reached_num_buffers;
2343     else
2344       src->num_buffers_left--;
2345   }
2346
2347   /* don't enter the create function if a pending EOS event was set. For the
2348    * logic of the pending_eos, check the event function of this class. */
2349   if (G_UNLIKELY (g_atomic_int_get (&src->priv->pending_eos)))
2350     goto eos;
2351
2352   GST_DEBUG_OBJECT (src,
2353       "calling create offset %" G_GUINT64_FORMAT " length %u, time %"
2354       G_GINT64_FORMAT, offset, length, src->segment.time);
2355
2356   res_buf = in_buf = *buf;
2357
2358   ret = bclass->create (src, offset, length, &res_buf);
2359
2360   /* The create function could be unlocked because we have a pending EOS. It's
2361    * possible that we have a valid buffer from create that we need to
2362    * discard when the create function returned _OK. */
2363   if (G_UNLIKELY (g_atomic_int_get (&src->priv->pending_eos))) {
2364     if (ret == GST_FLOW_OK) {
2365       if (*buf == NULL)
2366         gst_buffer_unref (res_buf);
2367     }
2368     goto eos;
2369   }
2370
2371   if (G_UNLIKELY (ret != GST_FLOW_OK))
2372     goto not_ok;
2373
2374   /* fallback in case the create function didn't fill a provided buffer */
2375   if (in_buf != NULL && res_buf != in_buf) {
2376     GstMapInfo info;
2377     gsize copied_size;
2378
2379     GST_CAT_DEBUG_OBJECT (GST_CAT_PERFORMANCE, src, "create function didn't "
2380         "fill the provided buffer, copying");
2381
2382     if (!gst_buffer_map (in_buf, &info, GST_MAP_WRITE))
2383       goto map_failed;
2384
2385     copied_size = gst_buffer_extract (res_buf, 0, info.data, info.size);
2386     gst_buffer_unmap (in_buf, &info);
2387     gst_buffer_set_size (in_buf, copied_size);
2388
2389     gst_buffer_copy_into (in_buf, res_buf, GST_BUFFER_COPY_METADATA, 0, -1);
2390
2391     gst_buffer_unref (res_buf);
2392     res_buf = in_buf;
2393   }
2394
2395   /* no timestamp set and we are at offset 0, we can timestamp with 0 */
2396   if (offset == 0 && src->segment.time == 0
2397       && GST_BUFFER_DTS (res_buf) == -1 && !src->is_live) {
2398     GST_DEBUG_OBJECT (src, "setting first timestamp to 0");
2399     res_buf = gst_buffer_make_writable (res_buf);
2400     GST_BUFFER_DTS (res_buf) = 0;
2401   }
2402
2403   /* now sync before pushing the buffer */
2404   status = gst_base_src_do_sync (src, res_buf);
2405
2406   /* waiting for the clock could have made us flushing */
2407   if (G_UNLIKELY (src->priv->flushing))
2408     goto flushing;
2409
2410   switch (status) {
2411     case GST_CLOCK_EARLY:
2412       /* the buffer is too late. We currently don't drop the buffer. */
2413       GST_DEBUG_OBJECT (src, "buffer too late!, returning anyway");
2414       break;
2415     case GST_CLOCK_OK:
2416       /* buffer synchronised properly */
2417       GST_DEBUG_OBJECT (src, "buffer ok");
2418       break;
2419     case GST_CLOCK_UNSCHEDULED:
2420       /* this case is triggered when we were waiting for the clock and
2421        * it got unlocked because we did a state change. In any case, get rid of
2422        * the buffer. */
2423       if (*buf == NULL)
2424         gst_buffer_unref (res_buf);
2425
2426       if (!src->live_running) {
2427         /* We return FLUSHING when we are not running to stop the dataflow also
2428          * get rid of the produced buffer. */
2429         GST_DEBUG_OBJECT (src,
2430             "clock was unscheduled (%d), returning FLUSHING", status);
2431         ret = GST_FLOW_FLUSHING;
2432       } else {
2433         /* If we are running when this happens, we quickly switched between
2434          * pause and playing. We try to produce a new buffer */
2435         GST_DEBUG_OBJECT (src,
2436             "clock was unscheduled (%d), but we are running", status);
2437         goto again;
2438       }
2439       break;
2440     default:
2441       /* all other result values are unexpected and errors */
2442       GST_ELEMENT_ERROR (src, CORE, CLOCK,
2443           (_("Internal clock error.")),
2444           ("clock returned unexpected return value %d", status));
2445       if (*buf == NULL)
2446         gst_buffer_unref (res_buf);
2447       ret = GST_FLOW_ERROR;
2448       break;
2449   }
2450   if (G_LIKELY (ret == GST_FLOW_OK))
2451     *buf = res_buf;
2452
2453   return ret;
2454
2455   /* ERROR */
2456 stopped:
2457   {
2458     GST_DEBUG_OBJECT (src, "wait_playing returned %d (%s)", ret,
2459         gst_flow_get_name (ret));
2460     return ret;
2461   }
2462 not_ok:
2463   {
2464     GST_DEBUG_OBJECT (src, "create returned %d (%s)", ret,
2465         gst_flow_get_name (ret));
2466     return ret;
2467   }
2468 map_failed:
2469   {
2470     GST_ELEMENT_ERROR (src, RESOURCE, BUSY,
2471         (_("Failed to map buffer.")),
2472         ("failed to map result buffer in WRITE mode"));
2473     if (*buf == NULL)
2474       gst_buffer_unref (res_buf);
2475     return GST_FLOW_ERROR;
2476   }
2477 not_started:
2478   {
2479     GST_DEBUG_OBJECT (src, "getrange but not started");
2480     return GST_FLOW_FLUSHING;
2481   }
2482 no_function:
2483   {
2484     GST_DEBUG_OBJECT (src, "no create function");
2485     return GST_FLOW_NOT_SUPPORTED;
2486   }
2487 unexpected_length:
2488   {
2489     GST_DEBUG_OBJECT (src, "unexpected length %u (offset=%" G_GUINT64_FORMAT
2490         ", size=%" G_GINT64_FORMAT ")", length, offset, src->segment.duration);
2491     return GST_FLOW_EOS;
2492   }
2493 reached_num_buffers:
2494   {
2495     GST_DEBUG_OBJECT (src, "sent all buffers");
2496     return GST_FLOW_EOS;
2497   }
2498 flushing:
2499   {
2500     GST_DEBUG_OBJECT (src, "we are flushing");
2501     if (*buf == NULL)
2502       gst_buffer_unref (res_buf);
2503     return GST_FLOW_FLUSHING;
2504   }
2505 eos:
2506   {
2507     GST_DEBUG_OBJECT (src, "we are EOS");
2508     return GST_FLOW_EOS;
2509   }
2510 }
2511
2512 static GstFlowReturn
2513 gst_base_src_getrange (GstPad * pad, GstObject * parent, guint64 offset,
2514     guint length, GstBuffer ** buf)
2515 {
2516   GstBaseSrc *src;
2517   GstFlowReturn res;
2518
2519   src = GST_BASE_SRC_CAST (parent);
2520
2521   GST_LIVE_LOCK (src);
2522   if (G_UNLIKELY (src->priv->flushing))
2523     goto flushing;
2524
2525   res = gst_base_src_get_range (src, offset, length, buf);
2526
2527 done:
2528   GST_LIVE_UNLOCK (src);
2529
2530   return res;
2531
2532   /* ERRORS */
2533 flushing:
2534   {
2535     GST_DEBUG_OBJECT (src, "we are flushing");
2536     res = GST_FLOW_FLUSHING;
2537     goto done;
2538   }
2539 }
2540
2541 static gboolean
2542 gst_base_src_is_random_access (GstBaseSrc * src)
2543 {
2544   /* we need to start the basesrc to check random access */
2545   if (!GST_BASE_SRC_IS_STARTED (src)) {
2546     GST_LOG_OBJECT (src, "doing start/stop to check get_range support");
2547     if (G_LIKELY (gst_base_src_start (src))) {
2548       if (gst_base_src_start_wait (src) != GST_FLOW_OK)
2549         goto start_failed;
2550       gst_base_src_stop (src);
2551     }
2552   }
2553
2554   return src->random_access;
2555
2556   /* ERRORS */
2557 start_failed:
2558   {
2559     GST_DEBUG_OBJECT (src, "failed to start");
2560     return FALSE;
2561   }
2562 }
2563
2564 static void
2565 gst_base_src_loop (GstPad * pad)
2566 {
2567   GstBaseSrc *src;
2568   GstBuffer *buf = NULL;
2569   GstFlowReturn ret;
2570   gint64 position;
2571   gboolean eos;
2572   guint blocksize;
2573   GList *pending_events = NULL, *tmp;
2574
2575   eos = FALSE;
2576
2577   src = GST_BASE_SRC (GST_OBJECT_PARENT (pad));
2578
2579   gst_base_src_send_stream_start (src);
2580
2581   /* check if we need to renegotiate */
2582   if (gst_pad_check_reconfigure (pad)) {
2583     if (!gst_base_src_negotiate (src)) {
2584       gst_pad_mark_reconfigure (pad);
2585       if (GST_PAD_IS_FLUSHING (pad))
2586         goto flushing;
2587       else
2588         goto negotiate_failed;
2589     }
2590   }
2591
2592   GST_LIVE_LOCK (src);
2593
2594   if (G_UNLIKELY (src->priv->flushing))
2595     goto flushing;
2596
2597   blocksize = src->blocksize;
2598
2599   /* if we operate in bytes, we can calculate an offset */
2600   if (src->segment.format == GST_FORMAT_BYTES) {
2601     position = src->segment.position;
2602     /* for negative rates, start with subtracting the blocksize */
2603     if (src->segment.rate < 0.0) {
2604       /* we cannot go below segment.start */
2605       if (position > src->segment.start + blocksize)
2606         position -= blocksize;
2607       else {
2608         /* last block, remainder up to segment.start */
2609         blocksize = position - src->segment.start;
2610         position = src->segment.start;
2611       }
2612     }
2613   } else
2614     position = -1;
2615
2616   GST_LOG_OBJECT (src, "next_ts %" GST_TIME_FORMAT " size %u",
2617       GST_TIME_ARGS (position), blocksize);
2618
2619   ret = gst_base_src_get_range (src, position, blocksize, &buf);
2620   if (G_UNLIKELY (ret != GST_FLOW_OK)) {
2621     GST_INFO_OBJECT (src, "pausing after gst_base_src_get_range() = %s",
2622         gst_flow_get_name (ret));
2623     GST_LIVE_UNLOCK (src);
2624     goto pause;
2625   }
2626   /* this should not happen */
2627   if (G_UNLIKELY (buf == NULL))
2628     goto null_buffer;
2629
2630   /* push events to close/start our segment before we push the buffer. */
2631   if (G_UNLIKELY (src->priv->segment_pending)) {
2632     gst_pad_push_event (pad, gst_event_new_segment (&src->segment));
2633     src->priv->segment_pending = FALSE;
2634   }
2635
2636   if (g_atomic_int_get (&src->priv->have_events)) {
2637     GST_OBJECT_LOCK (src);
2638     /* take the events */
2639     pending_events = src->priv->pending_events;
2640     src->priv->pending_events = NULL;
2641     g_atomic_int_set (&src->priv->have_events, FALSE);
2642     GST_OBJECT_UNLOCK (src);
2643   }
2644
2645   /* Push out pending events if any */
2646   if (G_UNLIKELY (pending_events != NULL)) {
2647     for (tmp = pending_events; tmp; tmp = g_list_next (tmp)) {
2648       GstEvent *ev = (GstEvent *) tmp->data;
2649       gst_pad_push_event (pad, ev);
2650     }
2651     g_list_free (pending_events);
2652   }
2653
2654   /* figure out the new position */
2655   switch (src->segment.format) {
2656     case GST_FORMAT_BYTES:
2657     {
2658       guint bufsize = gst_buffer_get_size (buf);
2659
2660       /* we subtracted above for negative rates */
2661       if (src->segment.rate >= 0.0)
2662         position += bufsize;
2663       break;
2664     }
2665     case GST_FORMAT_TIME:
2666     {
2667       GstClockTime start, duration;
2668
2669       start = GST_BUFFER_TIMESTAMP (buf);
2670       duration = GST_BUFFER_DURATION (buf);
2671
2672       if (GST_CLOCK_TIME_IS_VALID (start))
2673         position = start;
2674       else
2675         position = src->segment.position;
2676
2677       if (GST_CLOCK_TIME_IS_VALID (duration)) {
2678         if (src->segment.rate >= 0.0)
2679           position += duration;
2680         else if (position > duration)
2681           position -= duration;
2682         else
2683           position = 0;
2684       }
2685       break;
2686     }
2687     case GST_FORMAT_DEFAULT:
2688       if (src->segment.rate >= 0.0)
2689         position = GST_BUFFER_OFFSET_END (buf);
2690       else
2691         position = GST_BUFFER_OFFSET (buf);
2692       break;
2693     default:
2694       position = -1;
2695       break;
2696   }
2697   if (position != -1) {
2698     if (src->segment.rate >= 0.0) {
2699       /* positive rate, check if we reached the stop */
2700       if (src->segment.stop != -1) {
2701         if (position >= src->segment.stop) {
2702           eos = TRUE;
2703           position = src->segment.stop;
2704         }
2705       }
2706     } else {
2707       /* negative rate, check if we reached the start. start is always set to
2708        * something different from -1 */
2709       if (position <= src->segment.start) {
2710         eos = TRUE;
2711         position = src->segment.start;
2712       }
2713       /* when going reverse, all buffers are DISCONT */
2714       src->priv->discont = TRUE;
2715     }
2716     GST_OBJECT_LOCK (src);
2717     src->segment.position = position;
2718     GST_OBJECT_UNLOCK (src);
2719   }
2720
2721   if (G_UNLIKELY (src->priv->discont)) {
2722     GST_INFO_OBJECT (src, "marking pending DISCONT");
2723     buf = gst_buffer_make_writable (buf);
2724     GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_DISCONT);
2725     src->priv->discont = FALSE;
2726   }
2727   GST_LIVE_UNLOCK (src);
2728
2729   ret = gst_pad_push (pad, buf);
2730   if (G_UNLIKELY (ret != GST_FLOW_OK)) {
2731     if (ret == GST_FLOW_NOT_NEGOTIATED) {
2732       goto not_negotiated;
2733     }
2734     GST_INFO_OBJECT (src, "pausing after gst_pad_push() = %s",
2735         gst_flow_get_name (ret));
2736     goto pause;
2737   }
2738
2739   if (G_UNLIKELY (eos)) {
2740     GST_INFO_OBJECT (src, "pausing after end of segment");
2741     ret = GST_FLOW_EOS;
2742     goto pause;
2743   }
2744
2745 done:
2746   return;
2747
2748   /* special cases */
2749 not_negotiated:
2750   {
2751     if (gst_pad_needs_reconfigure (pad)) {
2752       GST_DEBUG_OBJECT (src, "Retrying to renegotiate");
2753       return;
2754     }
2755     /* fallthrough when push returns NOT_NEGOTIATED and we don't have
2756      * a pending negotiation request on our srcpad */
2757   }
2758 negotiate_failed:
2759   {
2760     GST_DEBUG_OBJECT (src, "Not negotiated");
2761     ret = GST_FLOW_NOT_NEGOTIATED;
2762     goto pause;
2763   }
2764 flushing:
2765   {
2766     GST_DEBUG_OBJECT (src, "we are flushing");
2767     GST_LIVE_UNLOCK (src);
2768     ret = GST_FLOW_FLUSHING;
2769     goto pause;
2770   }
2771 pause:
2772   {
2773     const gchar *reason = gst_flow_get_name (ret);
2774     GstEvent *event;
2775
2776     GST_DEBUG_OBJECT (src, "pausing task, reason %s", reason);
2777     src->running = FALSE;
2778     gst_pad_pause_task (pad);
2779     if (ret == GST_FLOW_EOS) {
2780       gboolean flag_segment;
2781       GstFormat format;
2782       gint64 position;
2783
2784       /* perform EOS logic */
2785       flag_segment = (src->segment.flags & GST_SEGMENT_FLAG_SEGMENT) != 0;
2786       format = src->segment.format;
2787       position = src->segment.position;
2788
2789       if (flag_segment) {
2790         GstMessage *message;
2791
2792         message = gst_message_new_segment_done (GST_OBJECT_CAST (src),
2793             format, position);
2794         gst_message_set_seqnum (message, src->priv->seqnum);
2795         gst_element_post_message (GST_ELEMENT_CAST (src), message);
2796         event = gst_event_new_segment_done (format, position);
2797         gst_event_set_seqnum (event, src->priv->seqnum);
2798         gst_pad_push_event (pad, event);
2799       } else {
2800         event = gst_event_new_eos ();
2801         gst_event_set_seqnum (event, src->priv->seqnum);
2802         gst_pad_push_event (pad, event);
2803       }
2804     } else if (ret == GST_FLOW_NOT_LINKED || ret <= GST_FLOW_EOS) {
2805       event = gst_event_new_eos ();
2806       gst_event_set_seqnum (event, src->priv->seqnum);
2807       /* for fatal errors we post an error message, post the error
2808        * first so the app knows about the error first.
2809        * Also don't do this for FLUSHING because it happens
2810        * due to flushing and posting an error message because of
2811        * that is the wrong thing to do, e.g. when we're doing
2812        * a flushing seek. */
2813       GST_ELEMENT_ERROR (src, STREAM, FAILED,
2814           (_("Internal data flow error.")),
2815           ("streaming task paused, reason %s (%d)", reason, ret));
2816       gst_pad_push_event (pad, event);
2817     }
2818     goto done;
2819   }
2820 null_buffer:
2821   {
2822     GST_ELEMENT_ERROR (src, STREAM, FAILED,
2823         (_("Internal data flow error.")), ("element returned NULL buffer"));
2824     GST_LIVE_UNLOCK (src);
2825     goto done;
2826   }
2827 }
2828
2829 static gboolean
2830 gst_base_src_set_allocation (GstBaseSrc * basesrc, GstBufferPool * pool,
2831     GstAllocator * allocator, GstAllocationParams * params)
2832 {
2833   GstAllocator *oldalloc;
2834   GstBufferPool *oldpool;
2835   GstBaseSrcPrivate *priv = basesrc->priv;
2836
2837   if (pool) {
2838     GST_DEBUG_OBJECT (basesrc, "activate pool");
2839     if (!gst_buffer_pool_set_active (pool, TRUE))
2840       goto activate_failed;
2841   }
2842
2843   GST_OBJECT_LOCK (basesrc);
2844   oldpool = priv->pool;
2845   priv->pool = pool;
2846
2847   oldalloc = priv->allocator;
2848   priv->allocator = allocator;
2849
2850   if (params)
2851     priv->params = *params;
2852   else
2853     gst_allocation_params_init (&priv->params);
2854   GST_OBJECT_UNLOCK (basesrc);
2855
2856   if (oldpool) {
2857     /* only deactivate if the pool is not the one we're using */
2858     if (oldpool != pool) {
2859       GST_DEBUG_OBJECT (basesrc, "deactivate old pool");
2860       gst_buffer_pool_set_active (oldpool, FALSE);
2861     }
2862     gst_object_unref (oldpool);
2863   }
2864   if (oldalloc) {
2865     gst_object_unref (oldalloc);
2866   }
2867   return TRUE;
2868
2869   /* ERRORS */
2870 activate_failed:
2871   {
2872     GST_ERROR_OBJECT (basesrc, "failed to activate bufferpool.");
2873     return FALSE;
2874   }
2875 }
2876
2877 static gboolean
2878 gst_base_src_activate_pool (GstBaseSrc * basesrc, gboolean active)
2879 {
2880   GstBaseSrcPrivate *priv = basesrc->priv;
2881   GstBufferPool *pool;
2882   gboolean res = TRUE;
2883
2884   GST_OBJECT_LOCK (basesrc);
2885   if ((pool = priv->pool))
2886     pool = gst_object_ref (pool);
2887   GST_OBJECT_UNLOCK (basesrc);
2888
2889   if (pool) {
2890     res = gst_buffer_pool_set_active (pool, active);
2891     gst_object_unref (pool);
2892   }
2893   return res;
2894 }
2895
2896
2897 static gboolean
2898 gst_base_src_decide_allocation_default (GstBaseSrc * basesrc, GstQuery * query)
2899 {
2900   GstCaps *outcaps;
2901   GstBufferPool *pool;
2902   guint size, min, max;
2903   GstAllocator *allocator;
2904   GstAllocationParams params;
2905   GstStructure *config;
2906   gboolean update_allocator;
2907
2908   gst_query_parse_allocation (query, &outcaps, NULL);
2909
2910   /* we got configuration from our peer or the decide_allocation method,
2911    * parse them */
2912   if (gst_query_get_n_allocation_params (query) > 0) {
2913     /* try the allocator */
2914     gst_query_parse_nth_allocation_param (query, 0, &allocator, &params);
2915     update_allocator = TRUE;
2916   } else {
2917     allocator = NULL;
2918     gst_allocation_params_init (&params);
2919     update_allocator = FALSE;
2920   }
2921
2922   if (gst_query_get_n_allocation_pools (query) > 0) {
2923     gst_query_parse_nth_allocation_pool (query, 0, &pool, &size, &min, &max);
2924
2925     if (pool == NULL) {
2926       /* no pool, we can make our own */
2927       GST_DEBUG_OBJECT (basesrc, "no pool, making new pool");
2928       pool = gst_buffer_pool_new ();
2929     }
2930   } else {
2931     pool = NULL;
2932     size = min = max = 0;
2933   }
2934
2935   /* now configure */
2936   if (pool) {
2937     config = gst_buffer_pool_get_config (pool);
2938     gst_buffer_pool_config_set_params (config, outcaps, size, min, max);
2939     gst_buffer_pool_config_set_allocator (config, allocator, &params);
2940     gst_buffer_pool_set_config (pool, config);
2941   }
2942
2943   if (update_allocator)
2944     gst_query_set_nth_allocation_param (query, 0, allocator, &params);
2945   else
2946     gst_query_add_allocation_param (query, allocator, &params);
2947   if (allocator)
2948     gst_object_unref (allocator);
2949
2950   if (pool) {
2951     gst_query_set_nth_allocation_pool (query, 0, pool, size, min, max);
2952     gst_object_unref (pool);
2953   }
2954
2955   return TRUE;
2956 }
2957
2958 static gboolean
2959 gst_base_src_prepare_allocation (GstBaseSrc * basesrc, GstCaps * caps)
2960 {
2961   GstBaseSrcClass *bclass;
2962   gboolean result = TRUE;
2963   GstQuery *query;
2964   GstBufferPool *pool = NULL;
2965   GstAllocator *allocator = NULL;
2966   GstAllocationParams params;
2967
2968   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2969
2970   /* make query and let peer pad answer, we don't really care if it worked or
2971    * not, if it failed, the allocation query would contain defaults and the
2972    * subclass would then set better values if needed */
2973   query = gst_query_new_allocation (caps, TRUE);
2974   if (!gst_pad_peer_query (basesrc->srcpad, query)) {
2975     /* not a problem, just debug a little */
2976     GST_DEBUG_OBJECT (basesrc, "peer ALLOCATION query failed");
2977   }
2978
2979   g_assert (bclass->decide_allocation != NULL);
2980   result = bclass->decide_allocation (basesrc, query);
2981
2982   GST_DEBUG_OBJECT (basesrc, "ALLOCATION (%d) params: %" GST_PTR_FORMAT, result,
2983       query);
2984
2985   if (!result)
2986     goto no_decide_allocation;
2987
2988   /* we got configuration from our peer or the decide_allocation method,
2989    * parse them */
2990   if (gst_query_get_n_allocation_params (query) > 0) {
2991     gst_query_parse_nth_allocation_param (query, 0, &allocator, &params);
2992   } else {
2993     allocator = NULL;
2994     gst_allocation_params_init (&params);
2995   }
2996
2997   if (gst_query_get_n_allocation_pools (query) > 0)
2998     gst_query_parse_nth_allocation_pool (query, 0, &pool, NULL, NULL, NULL);
2999
3000   result = gst_base_src_set_allocation (basesrc, pool, allocator, &params);
3001
3002   gst_query_unref (query);
3003
3004   return result;
3005
3006   /* Errors */
3007 no_decide_allocation:
3008   {
3009     GST_WARNING_OBJECT (basesrc, "Subclass failed to decide allocation");
3010     gst_query_unref (query);
3011
3012     return result;
3013   }
3014 }
3015
3016 /* default negotiation code.
3017  *
3018  * Take intersection between src and sink pads, take first
3019  * caps and fixate.
3020  */
3021 static gboolean
3022 gst_base_src_default_negotiate (GstBaseSrc * basesrc)
3023 {
3024   GstCaps *thiscaps;
3025   GstCaps *caps = NULL;
3026   GstCaps *peercaps = NULL;
3027   gboolean result = FALSE;
3028
3029   /* first see what is possible on our source pad */
3030   thiscaps = gst_pad_query_caps (GST_BASE_SRC_PAD (basesrc), NULL);
3031   GST_DEBUG_OBJECT (basesrc, "caps of src: %" GST_PTR_FORMAT, thiscaps);
3032   /* nothing or anything is allowed, we're done */
3033   if (thiscaps == NULL || gst_caps_is_any (thiscaps))
3034     goto no_nego_needed;
3035
3036   if (G_UNLIKELY (gst_caps_is_empty (thiscaps)))
3037     goto no_caps;
3038
3039   /* get the peer caps */
3040   peercaps = gst_pad_peer_query_caps (GST_BASE_SRC_PAD (basesrc), thiscaps);
3041   GST_DEBUG_OBJECT (basesrc, "caps of peer: %" GST_PTR_FORMAT, peercaps);
3042   if (peercaps) {
3043     /* The result is already a subset of our caps */
3044     caps = peercaps;
3045     gst_caps_unref (thiscaps);
3046   } else {
3047     /* no peer, work with our own caps then */
3048     caps = thiscaps;
3049   }
3050   if (caps && !gst_caps_is_empty (caps)) {
3051     /* now fixate */
3052     GST_DEBUG_OBJECT (basesrc, "have caps: %" GST_PTR_FORMAT, caps);
3053     if (gst_caps_is_any (caps)) {
3054       GST_DEBUG_OBJECT (basesrc, "any caps, we stop");
3055       /* hmm, still anything, so element can do anything and
3056        * nego is not needed */
3057       result = TRUE;
3058     } else {
3059       caps = gst_base_src_fixate (basesrc, caps);
3060       GST_DEBUG_OBJECT (basesrc, "fixated to: %" GST_PTR_FORMAT, caps);
3061       if (gst_caps_is_fixed (caps)) {
3062         /* yay, fixed caps, use those then, it's possible that the subclass does
3063          * not accept this caps after all and we have to fail. */
3064         result = gst_base_src_set_caps (basesrc, caps);
3065       }
3066     }
3067     gst_caps_unref (caps);
3068   } else {
3069     if (caps)
3070       gst_caps_unref (caps);
3071     GST_DEBUG_OBJECT (basesrc, "no common caps");
3072   }
3073   return result;
3074
3075 no_nego_needed:
3076   {
3077     GST_DEBUG_OBJECT (basesrc, "no negotiation needed");
3078     if (thiscaps)
3079       gst_caps_unref (thiscaps);
3080     return TRUE;
3081   }
3082 no_caps:
3083   {
3084     GST_ELEMENT_ERROR (basesrc, STREAM, FORMAT,
3085         ("No supported formats found"),
3086         ("This element did not produce valid caps"));
3087     if (thiscaps)
3088       gst_caps_unref (thiscaps);
3089     return TRUE;
3090   }
3091 }
3092
3093 static gboolean
3094 gst_base_src_negotiate (GstBaseSrc * basesrc)
3095 {
3096   GstBaseSrcClass *bclass;
3097   gboolean result;
3098
3099   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
3100
3101   GST_DEBUG_OBJECT (basesrc, "starting negotiation");
3102
3103   if (G_LIKELY (bclass->negotiate))
3104     result = bclass->negotiate (basesrc);
3105   else
3106     result = TRUE;
3107
3108   if (G_LIKELY (result)) {
3109     GstCaps *caps;
3110
3111     caps = gst_pad_get_current_caps (basesrc->srcpad);
3112
3113     result = gst_base_src_prepare_allocation (basesrc, caps);
3114
3115     if (caps)
3116       gst_caps_unref (caps);
3117   }
3118   return result;
3119 }
3120
3121 static gboolean
3122 gst_base_src_start (GstBaseSrc * basesrc)
3123 {
3124   GstBaseSrcClass *bclass;
3125   gboolean result;
3126
3127   GST_LIVE_LOCK (basesrc);
3128
3129   GST_OBJECT_LOCK (basesrc);
3130   if (GST_BASE_SRC_IS_STARTING (basesrc))
3131     goto was_starting;
3132   if (GST_BASE_SRC_IS_STARTED (basesrc))
3133     goto was_started;
3134
3135   basesrc->priv->start_result = GST_FLOW_FLUSHING;
3136   GST_OBJECT_FLAG_SET (basesrc, GST_BASE_SRC_FLAG_STARTING);
3137   gst_segment_init (&basesrc->segment, basesrc->segment.format);
3138   GST_OBJECT_UNLOCK (basesrc);
3139
3140   basesrc->num_buffers_left = basesrc->num_buffers;
3141   basesrc->running = FALSE;
3142   basesrc->priv->segment_pending = FALSE;
3143   GST_LIVE_UNLOCK (basesrc);
3144
3145   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
3146   if (bclass->start)
3147     result = bclass->start (basesrc);
3148   else
3149     result = TRUE;
3150
3151   if (!result)
3152     goto could_not_start;
3153
3154   if (!gst_base_src_is_async (basesrc)) {
3155     gst_base_src_start_complete (basesrc, GST_FLOW_OK);
3156     /* not really waiting here, we call this to get the result
3157      * from the start_complete call */
3158     result = gst_base_src_start_wait (basesrc) == GST_FLOW_OK;
3159   }
3160
3161   return result;
3162
3163   /* ERROR */
3164 was_starting:
3165   {
3166     GST_DEBUG_OBJECT (basesrc, "was starting");
3167     GST_OBJECT_UNLOCK (basesrc);
3168     GST_LIVE_UNLOCK (basesrc);
3169     return TRUE;
3170   }
3171 was_started:
3172   {
3173     GST_DEBUG_OBJECT (basesrc, "was started");
3174     GST_OBJECT_UNLOCK (basesrc);
3175     GST_LIVE_UNLOCK (basesrc);
3176     return TRUE;
3177   }
3178 could_not_start:
3179   {
3180     GST_DEBUG_OBJECT (basesrc, "could not start");
3181     /* subclass is supposed to post a message. We don't have to call _stop. */
3182     gst_base_src_start_complete (basesrc, GST_FLOW_ERROR);
3183     return FALSE;
3184   }
3185 }
3186
3187 /**
3188  * gst_base_src_start_complete:
3189  * @basesrc: base source instance
3190  * @ret: a #GstFlowReturn
3191  *
3192  * Complete an asynchronous start operation. When the subclass overrides the
3193  * start method, it should call gst_base_src_start_complete() when the start
3194  * operation completes either from the same thread or from an asynchronous
3195  * helper thread.
3196  */
3197 void
3198 gst_base_src_start_complete (GstBaseSrc * basesrc, GstFlowReturn ret)
3199 {
3200   gboolean have_size;
3201   guint64 size;
3202   gboolean seekable;
3203   GstFormat format;
3204   GstPadMode mode;
3205   GstEvent *event;
3206
3207   if (ret != GST_FLOW_OK)
3208     goto error;
3209
3210   GST_DEBUG_OBJECT (basesrc, "starting source");
3211   format = basesrc->segment.format;
3212
3213   /* figure out the size */
3214   have_size = FALSE;
3215   size = -1;
3216   if (format == GST_FORMAT_BYTES) {
3217     GstBaseSrcClass *bclass = GST_BASE_SRC_GET_CLASS (basesrc);
3218
3219     if (bclass->get_size) {
3220       if (!(have_size = bclass->get_size (basesrc, &size)))
3221         size = -1;
3222     }
3223     GST_DEBUG_OBJECT (basesrc, "setting size %" G_GUINT64_FORMAT, size);
3224     /* only update the size when operating in bytes, subclass is supposed
3225      * to set duration in the start method for other formats */
3226     GST_OBJECT_LOCK (basesrc);
3227     basesrc->segment.duration = size;
3228     GST_OBJECT_UNLOCK (basesrc);
3229   }
3230
3231   GST_DEBUG_OBJECT (basesrc,
3232       "format: %s, have size: %d, size: %" G_GUINT64_FORMAT ", duration: %"
3233       G_GINT64_FORMAT, gst_format_get_name (format), have_size, size,
3234       basesrc->segment.duration);
3235
3236   seekable = gst_base_src_seekable (basesrc);
3237   GST_DEBUG_OBJECT (basesrc, "is seekable: %d", seekable);
3238
3239   /* update for random access flag */
3240   basesrc->random_access = seekable && format == GST_FORMAT_BYTES;
3241
3242   GST_DEBUG_OBJECT (basesrc, "is random_access: %d", basesrc->random_access);
3243
3244   /* stop flushing now but for live sources, still block in the LIVE lock when
3245    * we are not yet PLAYING */
3246   gst_base_src_set_flushing (basesrc, FALSE, FALSE, NULL);
3247
3248   gst_pad_mark_reconfigure (GST_BASE_SRC_PAD (basesrc));
3249
3250   GST_OBJECT_LOCK (basesrc->srcpad);
3251   mode = GST_PAD_MODE (basesrc->srcpad);
3252   GST_OBJECT_UNLOCK (basesrc->srcpad);
3253
3254   /* take the stream lock here, we only want to let the task run when we have
3255    * set the STARTED flag */
3256   GST_PAD_STREAM_LOCK (basesrc->srcpad);
3257   if (mode == GST_PAD_MODE_PUSH) {
3258     /* do initial seek, which will start the task */
3259     GST_OBJECT_LOCK (basesrc);
3260     event = basesrc->pending_seek;
3261     basesrc->pending_seek = NULL;
3262     GST_OBJECT_UNLOCK (basesrc);
3263
3264     /* The perform seek code will start the task when finished. We don't have to
3265      * unlock the streaming thread because it is not running yet */
3266     if (G_UNLIKELY (!gst_base_src_perform_seek (basesrc, event, FALSE)))
3267       goto seek_failed;
3268
3269     if (event)
3270       gst_event_unref (event);
3271   } else {
3272     /* if not random_access, we cannot operate in pull mode for now */
3273     if (G_UNLIKELY (!basesrc->random_access))
3274       goto no_get_range;
3275   }
3276
3277   GST_OBJECT_LOCK (basesrc);
3278   GST_OBJECT_FLAG_SET (basesrc, GST_BASE_SRC_FLAG_STARTED);
3279   GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTING);
3280   basesrc->priv->start_result = ret;
3281   GST_ASYNC_SIGNAL (basesrc);
3282   GST_OBJECT_UNLOCK (basesrc);
3283
3284   GST_PAD_STREAM_UNLOCK (basesrc->srcpad);
3285
3286   return;
3287
3288 seek_failed:
3289   {
3290     GST_PAD_STREAM_UNLOCK (basesrc->srcpad);
3291     GST_ERROR_OBJECT (basesrc, "Failed to perform initial seek");
3292     gst_base_src_stop (basesrc);
3293     if (event)
3294       gst_event_unref (event);
3295     ret = GST_FLOW_ERROR;
3296     goto error;
3297   }
3298 no_get_range:
3299   {
3300     GST_PAD_STREAM_UNLOCK (basesrc->srcpad);
3301     gst_base_src_stop (basesrc);
3302     GST_ERROR_OBJECT (basesrc, "Cannot operate in pull mode, stopping");
3303     ret = GST_FLOW_ERROR;
3304     goto error;
3305   }
3306 error:
3307   {
3308     GST_OBJECT_LOCK (basesrc);
3309     basesrc->priv->start_result = ret;
3310     GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTING);
3311     GST_ASYNC_SIGNAL (basesrc);
3312     GST_OBJECT_UNLOCK (basesrc);
3313     return;
3314   }
3315 }
3316
3317 /**
3318  * gst_base_src_start_wait:
3319  * @basesrc: base source instance
3320  *
3321  * Wait until the start operation completes.
3322  *
3323  * Returns: a #GstFlowReturn.
3324  */
3325 GstFlowReturn
3326 gst_base_src_start_wait (GstBaseSrc * basesrc)
3327 {
3328   GstFlowReturn result;
3329
3330   GST_OBJECT_LOCK (basesrc);
3331   while (GST_BASE_SRC_IS_STARTING (basesrc)) {
3332     GST_ASYNC_WAIT (basesrc);
3333   }
3334   result = basesrc->priv->start_result;
3335   GST_OBJECT_UNLOCK (basesrc);
3336
3337   GST_DEBUG_OBJECT (basesrc, "got %s", gst_flow_get_name (result));
3338
3339   return result;
3340 }
3341
3342 static gboolean
3343 gst_base_src_stop (GstBaseSrc * basesrc)
3344 {
3345   GstBaseSrcClass *bclass;
3346   gboolean result = TRUE;
3347
3348   GST_DEBUG_OBJECT (basesrc, "stopping source");
3349
3350   /* flush all */
3351   gst_base_src_set_flushing (basesrc, TRUE, FALSE, NULL);
3352   /* stop the task */
3353   gst_pad_stop_task (basesrc->srcpad);
3354
3355   GST_OBJECT_LOCK (basesrc);
3356   if (!GST_BASE_SRC_IS_STARTED (basesrc) && !GST_BASE_SRC_IS_STARTING (basesrc))
3357     goto was_stopped;
3358
3359   GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTING);
3360   GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTED);
3361   basesrc->priv->start_result = GST_FLOW_FLUSHING;
3362   GST_ASYNC_SIGNAL (basesrc);
3363   GST_OBJECT_UNLOCK (basesrc);
3364
3365   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
3366   if (bclass->stop)
3367     result = bclass->stop (basesrc);
3368
3369   gst_base_src_set_allocation (basesrc, NULL, NULL, NULL);
3370
3371   return result;
3372
3373 was_stopped:
3374   {
3375     GST_DEBUG_OBJECT (basesrc, "was stopped");
3376     GST_OBJECT_UNLOCK (basesrc);
3377     return TRUE;
3378   }
3379 }
3380
3381 /* start or stop flushing dataprocessing
3382  */
3383 static gboolean
3384 gst_base_src_set_flushing (GstBaseSrc * basesrc,
3385     gboolean flushing, gboolean live_play, gboolean * playing)
3386 {
3387   GstBaseSrcClass *bclass;
3388
3389   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
3390
3391   GST_DEBUG_OBJECT (basesrc, "flushing %d, live_play %d", flushing, live_play);
3392
3393   if (flushing) {
3394     gst_base_src_activate_pool (basesrc, FALSE);
3395     /* unlock any subclasses, we need to do this before grabbing the
3396      * LIVE_LOCK since we hold this lock before going into ::create. We pass an
3397      * unlock to the params because of backwards compat (see seek handler)*/
3398     if (bclass->unlock)
3399       bclass->unlock (basesrc);
3400   }
3401
3402   /* the live lock is released when we are blocked, waiting for playing or
3403    * when we sync to the clock. */
3404   GST_LIVE_LOCK (basesrc);
3405   if (playing)
3406     *playing = basesrc->live_running;
3407   basesrc->priv->flushing = flushing;
3408   if (flushing) {
3409     /* if we are locked in the live lock, signal it to make it flush */
3410     basesrc->live_running = TRUE;
3411
3412     /* clear pending EOS if any */
3413     g_atomic_int_set (&basesrc->priv->pending_eos, FALSE);
3414
3415     /* step 1, now that we have the LIVE lock, clear our unlock request */
3416     if (bclass->unlock_stop)
3417       bclass->unlock_stop (basesrc);
3418
3419     /* step 2, unblock clock sync (if any) or any other blocking thing */
3420     if (basesrc->clock_id)
3421       gst_clock_id_unschedule (basesrc->clock_id);
3422   } else {
3423     /* signal the live source that it can start playing */
3424     basesrc->live_running = live_play;
3425
3426     gst_base_src_activate_pool (basesrc, TRUE);
3427
3428     /* Drop all delayed events */
3429     GST_OBJECT_LOCK (basesrc);
3430     if (basesrc->priv->pending_events) {
3431       g_list_foreach (basesrc->priv->pending_events, (GFunc) gst_event_unref,
3432           NULL);
3433       g_list_free (basesrc->priv->pending_events);
3434       basesrc->priv->pending_events = NULL;
3435       g_atomic_int_set (&basesrc->priv->have_events, FALSE);
3436     }
3437     GST_OBJECT_UNLOCK (basesrc);
3438   }
3439   GST_LIVE_SIGNAL (basesrc);
3440   GST_LIVE_UNLOCK (basesrc);
3441
3442   return TRUE;
3443 }
3444
3445 /* the purpose of this function is to make sure that a live source blocks in the
3446  * LIVE lock or leaves the LIVE lock and continues playing. */
3447 static gboolean
3448 gst_base_src_set_playing (GstBaseSrc * basesrc, gboolean live_play)
3449 {
3450   GstBaseSrcClass *bclass;
3451
3452   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
3453
3454   /* unlock subclasses locked in ::create, we only do this when we stop playing. */
3455   if (!live_play) {
3456     GST_DEBUG_OBJECT (basesrc, "unlock");
3457     if (bclass->unlock)
3458       bclass->unlock (basesrc);
3459   }
3460
3461   /* we are now able to grab the LIVE lock, when we get it, we can be
3462    * waiting for PLAYING while blocked in the LIVE cond or we can be waiting
3463    * for the clock. */
3464   GST_LIVE_LOCK (basesrc);
3465   GST_DEBUG_OBJECT (basesrc, "unschedule clock");
3466
3467   /* unblock clock sync (if any) */
3468   if (basesrc->clock_id)
3469     gst_clock_id_unschedule (basesrc->clock_id);
3470
3471   /* configure what to do when we get to the LIVE lock. */
3472   GST_DEBUG_OBJECT (basesrc, "live running %d", live_play);
3473   basesrc->live_running = live_play;
3474
3475   if (live_play) {
3476     gboolean start;
3477
3478     /* clear our unlock request when going to PLAYING */
3479     GST_DEBUG_OBJECT (basesrc, "unlock stop");
3480     if (bclass->unlock_stop)
3481       bclass->unlock_stop (basesrc);
3482
3483     /* for live sources we restart the timestamp correction */
3484     basesrc->priv->latency = -1;
3485     /* have to restart the task in case it stopped because of the unlock when
3486      * we went to PAUSED. Only do this if we operating in push mode. */
3487     GST_OBJECT_LOCK (basesrc->srcpad);
3488     start = (GST_PAD_MODE (basesrc->srcpad) == GST_PAD_MODE_PUSH);
3489     GST_OBJECT_UNLOCK (basesrc->srcpad);
3490     if (start)
3491       gst_pad_start_task (basesrc->srcpad, (GstTaskFunction) gst_base_src_loop,
3492           basesrc->srcpad, NULL);
3493     GST_DEBUG_OBJECT (basesrc, "signal");
3494     GST_LIVE_SIGNAL (basesrc);
3495   }
3496   GST_LIVE_UNLOCK (basesrc);
3497
3498   return TRUE;
3499 }
3500
3501 static gboolean
3502 gst_base_src_activate_push (GstPad * pad, GstObject * parent, gboolean active)
3503 {
3504   GstBaseSrc *basesrc;
3505
3506   basesrc = GST_BASE_SRC (parent);
3507
3508   /* prepare subclass first */
3509   if (active) {
3510     GST_DEBUG_OBJECT (basesrc, "Activating in push mode");
3511
3512     if (G_UNLIKELY (!basesrc->can_activate_push))
3513       goto no_push_activation;
3514
3515     if (G_UNLIKELY (!gst_base_src_start (basesrc)))
3516       goto error_start;
3517   } else {
3518     GST_DEBUG_OBJECT (basesrc, "Deactivating in push mode");
3519     /* now we can stop the source */
3520     if (G_UNLIKELY (!gst_base_src_stop (basesrc)))
3521       goto error_stop;
3522   }
3523   return TRUE;
3524
3525   /* ERRORS */
3526 no_push_activation:
3527   {
3528     GST_WARNING_OBJECT (basesrc, "Subclass disabled push-mode activation");
3529     return FALSE;
3530   }
3531 error_start:
3532   {
3533     GST_WARNING_OBJECT (basesrc, "Failed to start in push mode");
3534     return FALSE;
3535   }
3536 error_stop:
3537   {
3538     GST_DEBUG_OBJECT (basesrc, "Failed to stop in push mode");
3539     return FALSE;
3540   }
3541 }
3542
3543 static gboolean
3544 gst_base_src_activate_pull (GstPad * pad, GstObject * parent, gboolean active)
3545 {
3546   GstBaseSrc *basesrc;
3547
3548   basesrc = GST_BASE_SRC (parent);
3549
3550   /* prepare subclass first */
3551   if (active) {
3552     GST_DEBUG_OBJECT (basesrc, "Activating in pull mode");
3553     if (G_UNLIKELY (!gst_base_src_start (basesrc)))
3554       goto error_start;
3555   } else {
3556     GST_DEBUG_OBJECT (basesrc, "Deactivating in pull mode");
3557     if (G_UNLIKELY (!gst_base_src_stop (basesrc)))
3558       goto error_stop;
3559   }
3560   return TRUE;
3561
3562   /* ERRORS */
3563 error_start:
3564   {
3565     GST_ERROR_OBJECT (basesrc, "Failed to start in pull mode");
3566     return FALSE;
3567   }
3568 error_stop:
3569   {
3570     GST_ERROR_OBJECT (basesrc, "Failed to stop in pull mode");
3571     return FALSE;
3572   }
3573 }
3574
3575 static gboolean
3576 gst_base_src_activate_mode (GstPad * pad, GstObject * parent,
3577     GstPadMode mode, gboolean active)
3578 {
3579   gboolean res;
3580   GstBaseSrc *src = GST_BASE_SRC (parent);
3581
3582   src->priv->stream_start_pending = FALSE;
3583
3584   switch (mode) {
3585     case GST_PAD_MODE_PULL:
3586       res = gst_base_src_activate_pull (pad, parent, active);
3587       break;
3588     case GST_PAD_MODE_PUSH:
3589       src->priv->stream_start_pending = active;
3590       res = gst_base_src_activate_push (pad, parent, active);
3591       break;
3592     default:
3593       GST_LOG_OBJECT (pad, "unknown activation mode %d", mode);
3594       res = FALSE;
3595       break;
3596   }
3597   return res;
3598 }
3599
3600
3601 static GstStateChangeReturn
3602 gst_base_src_change_state (GstElement * element, GstStateChange transition)
3603 {
3604   GstBaseSrc *basesrc;
3605   GstStateChangeReturn result;
3606   gboolean no_preroll = FALSE;
3607
3608   basesrc = GST_BASE_SRC (element);
3609
3610   switch (transition) {
3611     case GST_STATE_CHANGE_NULL_TO_READY:
3612       break;
3613     case GST_STATE_CHANGE_READY_TO_PAUSED:
3614       no_preroll = gst_base_src_is_live (basesrc);
3615       break;
3616     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
3617       GST_DEBUG_OBJECT (basesrc, "PAUSED->PLAYING");
3618       if (gst_base_src_is_live (basesrc)) {
3619         /* now we can start playback */
3620         gst_base_src_set_playing (basesrc, TRUE);
3621       }
3622       break;
3623     default:
3624       break;
3625   }
3626
3627   if ((result =
3628           GST_ELEMENT_CLASS (parent_class)->change_state (element,
3629               transition)) == GST_STATE_CHANGE_FAILURE)
3630     goto failure;
3631
3632   switch (transition) {
3633     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
3634       GST_DEBUG_OBJECT (basesrc, "PLAYING->PAUSED");
3635       if (gst_base_src_is_live (basesrc)) {
3636         /* make sure we block in the live lock in PAUSED */
3637         gst_base_src_set_playing (basesrc, FALSE);
3638         no_preroll = TRUE;
3639       }
3640       break;
3641     case GST_STATE_CHANGE_PAUSED_TO_READY:
3642     {
3643       /* we don't need to unblock anything here, the pad deactivation code
3644        * already did this */
3645       g_atomic_int_set (&basesrc->priv->pending_eos, FALSE);
3646       gst_event_replace (&basesrc->pending_seek, NULL);
3647       break;
3648     }
3649     case GST_STATE_CHANGE_READY_TO_NULL:
3650       break;
3651     default:
3652       break;
3653   }
3654
3655   if (no_preroll && result == GST_STATE_CHANGE_SUCCESS)
3656     result = GST_STATE_CHANGE_NO_PREROLL;
3657
3658   return result;
3659
3660   /* ERRORS */
3661 failure:
3662   {
3663     GST_DEBUG_OBJECT (basesrc, "parent failed state change");
3664     return result;
3665   }
3666 }
3667
3668 /**
3669  * gst_base_src_get_buffer_pool:
3670  * @src: a #GstBaseSrc
3671  *
3672  * Returns: (transfer full): the instance of the #GstBufferPool used
3673  * by the src; free it after use it
3674  */
3675 GstBufferPool *
3676 gst_base_src_get_buffer_pool (GstBaseSrc * src)
3677 {
3678   g_return_val_if_fail (GST_IS_BASE_SRC (src), NULL);
3679
3680   if (src->priv->pool)
3681     return gst_object_ref (src->priv->pool);
3682
3683   return NULL;
3684 }
3685
3686 /**
3687  * gst_base_src_get_allocator:
3688  * @src: a #GstBaseSrc
3689  * @allocator: (out) (allow-none) (transfer full): the #GstAllocator
3690  * used
3691  * @params: (out) (allow-none) (transfer full): the
3692  * #GstAllocatorParams of @allocator
3693  *
3694  * Lets #GstBaseSrc sub-classes to know the memory @allocator
3695  * used by the base class and its @params.
3696  *
3697  * Unref the @allocator after use it.
3698  */
3699 void
3700 gst_base_src_get_allocator (GstBaseSrc * src,
3701     GstAllocator ** allocator, GstAllocationParams * params)
3702 {
3703   g_return_if_fail (GST_IS_BASE_SRC (src));
3704
3705   if (allocator)
3706     *allocator = src->priv->allocator ?
3707         gst_object_ref (src->priv->allocator) : NULL;
3708
3709   if (params)
3710     *params = src->priv->params;
3711 }