basesrc: Only force-update the duration for dynamic sources when doing the DURATION...
[platform/upstream/gstreamer.git] / libs / gst / base / gstbasesrc.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *               2000,2005 Wim Taymans <wim@fluendo.com>
4  *
5  * gstbasesrc.c:
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20  * Boston, MA 02110-1301, USA.
21  */
22
23 /**
24  * SECTION:gstbasesrc
25  * @short_description: Base class for getrange based source elements
26  * @see_also: #GstPushSrc, #GstBaseTransform, #GstBaseSink
27  *
28  * This is a generice base class for source elements. The following
29  * types of sources are supported:
30  * <itemizedlist>
31  *   <listitem><para>random access sources like files</para></listitem>
32  *   <listitem><para>seekable sources</para></listitem>
33  *   <listitem><para>live sources</para></listitem>
34  * </itemizedlist>
35  *
36  * The source can be configured to operate in any #GstFormat with the
37  * gst_base_src_set_format() method. The currently set format determines
38  * the format of the internal #GstSegment and any #GST_EVENT_SEGMENT
39  * events. The default format for #GstBaseSrc is #GST_FORMAT_BYTES.
40  *
41  * #GstBaseSrc always supports push mode scheduling. If the following
42  * conditions are met, it also supports pull mode scheduling:
43  * <itemizedlist>
44  *   <listitem><para>The format is set to #GST_FORMAT_BYTES (default).</para>
45  *   </listitem>
46  *   <listitem><para>#GstBaseSrcClass.is_seekable() returns %TRUE.</para>
47  *   </listitem>
48  * </itemizedlist>
49  *
50  * If all the conditions are met for operating in pull mode, #GstBaseSrc is
51  * automatically seekable in push mode as well. The following conditions must
52  * be met to make the element seekable in push mode when the format is not
53  * #GST_FORMAT_BYTES:
54  * <itemizedlist>
55  *   <listitem><para>
56  *     #GstBaseSrcClass.is_seekable() returns %TRUE.
57  *   </para></listitem>
58  *   <listitem><para>
59  *     #GstBaseSrcClass.query() can convert all supported seek formats to the
60  *     internal format as set with gst_base_src_set_format().
61  *   </para></listitem>
62  *   <listitem><para>
63  *     #GstBaseSrcClass.do_seek() is implemented, performs the seek and returns
64  *      %TRUE.
65  *   </para></listitem>
66  * </itemizedlist>
67  *
68  * When the element does not meet the requirements to operate in pull mode, the
69  * offset and length in the #GstBaseSrcClass.create() method should be ignored.
70  * It is recommended to subclass #GstPushSrc instead, in this situation. If the
71  * element can operate in pull mode but only with specific offsets and
72  * lengths, it is allowed to generate an error when the wrong values are passed
73  * to the #GstBaseSrcClass.create() function.
74  *
75  * #GstBaseSrc has support for live sources. Live sources are sources that when
76  * paused discard data, such as audio or video capture devices. A typical live
77  * source also produces data at a fixed rate and thus provides a clock to publish
78  * this rate.
79  * Use gst_base_src_set_live() to activate the live source mode.
80  *
81  * A live source does not produce data in the PAUSED state. This means that the
82  * #GstBaseSrcClass.create() method will not be called in PAUSED but only in
83  * PLAYING. To signal the pipeline that the element will not produce data, the
84  * return value from the READY to PAUSED state will be
85  * #GST_STATE_CHANGE_NO_PREROLL.
86  *
87  * A typical live source will timestamp the buffers it creates with the
88  * current running time of the pipeline. This is one reason why a live source
89  * can only produce data in the PLAYING state, when the clock is actually
90  * distributed and running.
91  *
92  * Live sources that synchronize and block on the clock (an audio source, for
93  * example) can use gst_base_src_wait_playing() when the
94  * #GstBaseSrcClass.create() function was interrupted by a state change to
95  * PAUSED.
96  *
97  * The #GstBaseSrcClass.get_times() method can be used to implement pseudo-live
98  * sources. It only makes sense to implement the #GstBaseSrcClass.get_times()
99  * function if the source is a live source. The #GstBaseSrcClass.get_times()
100  * function should return timestamps starting from 0, as if it were a non-live
101  * source. The base class will make sure that the timestamps are transformed
102  * into the current running_time. The base source will then wait for the
103  * calculated running_time before pushing out the buffer.
104  *
105  * For live sources, the base class will by default report a latency of 0.
106  * For pseudo live sources, the base class will by default measure the difference
107  * between the first buffer timestamp and the start time of get_times and will
108  * report this value as the latency.
109  * Subclasses should override the query function when this behaviour is not
110  * acceptable.
111  *
112  * There is only support in #GstBaseSrc for exactly one source pad, which
113  * should be named "src". A source implementation (subclass of #GstBaseSrc)
114  * should install a pad template in its class_init function, like so:
115  * |[
116  * static void
117  * my_element_class_init (GstMyElementClass *klass)
118  * {
119  *   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
120  *   // srctemplate should be a #GstStaticPadTemplate with direction
121  *   // #GST_PAD_SRC and name "src"
122  *   gst_element_class_add_pad_template (gstelement_class,
123  *       gst_static_pad_template_get (&amp;srctemplate));
124  *
125  *   gst_element_class_set_static_metadata (gstelement_class,
126  *      "Source name",
127  *      "Source",
128  *      "My Source element",
129  *      "The author &lt;my.sink@my.email&gt;");
130  * }
131  * ]|
132  *
133  * <refsect2>
134  * <title>Controlled shutdown of live sources in applications</title>
135  * <para>
136  * Applications that record from a live source may want to stop recording
137  * in a controlled way, so that the recording is stopped, but the data
138  * already in the pipeline is processed to the end (remember that many live
139  * sources would go on recording forever otherwise). For that to happen the
140  * application needs to make the source stop recording and send an EOS
141  * event down the pipeline. The application would then wait for an
142  * EOS message posted on the pipeline's bus to know when all data has
143  * been processed and the pipeline can safely be stopped.
144  *
145  * An application may send an EOS event to a source element to make it
146  * perform the EOS logic (send EOS event downstream or post a
147  * #GST_MESSAGE_SEGMENT_DONE on the bus). This can typically be done
148  * with the gst_element_send_event() function on the element or its parent bin.
149  *
150  * After the EOS has been sent to the element, the application should wait for
151  * an EOS message to be posted on the pipeline's bus. Once this EOS message is
152  * received, it may safely shut down the entire pipeline.
153  *
154  * Last reviewed on 2007-12-19 (0.10.16)
155  * </para>
156  * </refsect2>
157  */
158
159 #ifdef HAVE_CONFIG_H
160 #  include "config.h"
161 #endif
162
163 #include <stdlib.h>
164 #include <string.h>
165
166 #include <gst/gst_private.h>
167 #include <gst/glib-compat-private.h>
168
169 #include "gstbasesrc.h"
170 #include "gsttypefindhelper.h"
171 #include <gst/gst-i18n-lib.h>
172
173 GST_DEBUG_CATEGORY_STATIC (gst_base_src_debug);
174 #define GST_CAT_DEFAULT gst_base_src_debug
175
176 #define GST_LIVE_GET_LOCK(elem)               (&GST_BASE_SRC_CAST(elem)->live_lock)
177 #define GST_LIVE_LOCK(elem)                   g_mutex_lock(GST_LIVE_GET_LOCK(elem))
178 #define GST_LIVE_TRYLOCK(elem)                g_mutex_trylock(GST_LIVE_GET_LOCK(elem))
179 #define GST_LIVE_UNLOCK(elem)                 g_mutex_unlock(GST_LIVE_GET_LOCK(elem))
180 #define GST_LIVE_GET_COND(elem)               (&GST_BASE_SRC_CAST(elem)->live_cond)
181 #define GST_LIVE_WAIT(elem)                   g_cond_wait (GST_LIVE_GET_COND (elem), GST_LIVE_GET_LOCK (elem))
182 #define GST_LIVE_WAIT_UNTIL(elem, end_time)   g_cond_timed_wait (GST_LIVE_GET_COND (elem), GST_LIVE_GET_LOCK (elem), end_time)
183 #define GST_LIVE_SIGNAL(elem)                 g_cond_signal (GST_LIVE_GET_COND (elem));
184 #define GST_LIVE_BROADCAST(elem)              g_cond_broadcast (GST_LIVE_GET_COND (elem));
185
186
187 #define GST_ASYNC_GET_COND(elem)              (&GST_BASE_SRC_CAST(elem)->priv->async_cond)
188 #define GST_ASYNC_WAIT(elem)                  g_cond_wait (GST_ASYNC_GET_COND (elem), GST_OBJECT_GET_LOCK (elem))
189 #define GST_ASYNC_SIGNAL(elem)                g_cond_signal (GST_ASYNC_GET_COND (elem));
190
191
192 /* BaseSrc signals and args */
193 enum
194 {
195   /* FILL ME */
196   LAST_SIGNAL
197 };
198
199 #define DEFAULT_BLOCKSIZE       4096
200 #define DEFAULT_NUM_BUFFERS     -1
201 #define DEFAULT_TYPEFIND        FALSE
202 #define DEFAULT_DO_TIMESTAMP    FALSE
203
204 enum
205 {
206   PROP_0,
207   PROP_BLOCKSIZE,
208   PROP_NUM_BUFFERS,
209   PROP_TYPEFIND,
210   PROP_DO_TIMESTAMP
211 };
212
213 #define GST_BASE_SRC_GET_PRIVATE(obj)  \
214    (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_BASE_SRC, GstBaseSrcPrivate))
215
216 struct _GstBaseSrcPrivate
217 {
218   gboolean discont;
219   gboolean flushing;
220
221   GstFlowReturn start_result;
222   gboolean async;
223
224   /* if a stream-start event should be sent */
225   gboolean stream_start_pending;
226
227   /* if segment should be sent */
228   gboolean segment_pending;
229
230   /* if EOS is pending (atomic) */
231   gint pending_eos;
232
233   /* startup latency is the time it takes between going to PLAYING and producing
234    * the first BUFFER with running_time 0. This value is included in the latency
235    * reporting. */
236   GstClockTime latency;
237   /* timestamp offset, this is the offset add to the values of gst_times for
238    * pseudo live sources */
239   GstClockTimeDiff ts_offset;
240
241   gboolean do_timestamp;
242   volatile gint dynamic_size;
243
244   /* stream sequence number */
245   guint32 seqnum;
246
247   /* pending events (TAG, CUSTOM_BOTH, CUSTOM_DOWNSTREAM) to be
248    * pushed in the data stream */
249   GList *pending_events;
250   volatile gint have_events;
251
252   /* QoS *//* with LOCK */
253   gboolean qos_enabled;
254   gdouble proportion;
255   GstClockTime earliest_time;
256
257   GstBufferPool *pool;
258   GstAllocator *allocator;
259   GstAllocationParams params;
260
261   GCond async_cond;
262 };
263
264 static GstElementClass *parent_class = NULL;
265
266 static void gst_base_src_class_init (GstBaseSrcClass * klass);
267 static void gst_base_src_init (GstBaseSrc * src, gpointer g_class);
268 static void gst_base_src_finalize (GObject * object);
269
270
271 GType
272 gst_base_src_get_type (void)
273 {
274   static volatile gsize base_src_type = 0;
275
276   if (g_once_init_enter (&base_src_type)) {
277     GType _type;
278     static const GTypeInfo base_src_info = {
279       sizeof (GstBaseSrcClass),
280       NULL,
281       NULL,
282       (GClassInitFunc) gst_base_src_class_init,
283       NULL,
284       NULL,
285       sizeof (GstBaseSrc),
286       0,
287       (GInstanceInitFunc) gst_base_src_init,
288     };
289
290     _type = g_type_register_static (GST_TYPE_ELEMENT,
291         "GstBaseSrc", &base_src_info, G_TYPE_FLAG_ABSTRACT);
292     g_once_init_leave (&base_src_type, _type);
293   }
294   return base_src_type;
295 }
296
297 static GstCaps *gst_base_src_default_get_caps (GstBaseSrc * bsrc,
298     GstCaps * filter);
299 static GstCaps *gst_base_src_default_fixate (GstBaseSrc * src, GstCaps * caps);
300 static GstCaps *gst_base_src_fixate (GstBaseSrc * src, GstCaps * caps);
301
302 static gboolean gst_base_src_is_random_access (GstBaseSrc * src);
303 static gboolean gst_base_src_activate_mode (GstPad * pad, GstObject * parent,
304     GstPadMode mode, gboolean active);
305 static void gst_base_src_set_property (GObject * object, guint prop_id,
306     const GValue * value, GParamSpec * pspec);
307 static void gst_base_src_get_property (GObject * object, guint prop_id,
308     GValue * value, GParamSpec * pspec);
309 static gboolean gst_base_src_event (GstPad * pad, GstObject * parent,
310     GstEvent * event);
311 static gboolean gst_base_src_send_event (GstElement * elem, GstEvent * event);
312 static gboolean gst_base_src_default_event (GstBaseSrc * src, GstEvent * event);
313
314 static gboolean gst_base_src_query (GstPad * pad, GstObject * parent,
315     GstQuery * query);
316
317 static gboolean gst_base_src_activate_pool (GstBaseSrc * basesrc,
318     gboolean active);
319 static gboolean gst_base_src_default_negotiate (GstBaseSrc * basesrc);
320 static gboolean gst_base_src_default_do_seek (GstBaseSrc * src,
321     GstSegment * segment);
322 static gboolean gst_base_src_default_query (GstBaseSrc * src, GstQuery * query);
323 static gboolean gst_base_src_default_prepare_seek_segment (GstBaseSrc * src,
324     GstEvent * event, GstSegment * segment);
325 static GstFlowReturn gst_base_src_default_create (GstBaseSrc * basesrc,
326     guint64 offset, guint size, GstBuffer ** buf);
327 static GstFlowReturn gst_base_src_default_alloc (GstBaseSrc * basesrc,
328     guint64 offset, guint size, GstBuffer ** buf);
329 static gboolean gst_base_src_decide_allocation_default (GstBaseSrc * basesrc,
330     GstQuery * query);
331
332 static gboolean gst_base_src_set_flushing (GstBaseSrc * basesrc,
333     gboolean flushing, gboolean live_play, gboolean * playing);
334
335 static gboolean gst_base_src_start (GstBaseSrc * basesrc);
336 static gboolean gst_base_src_stop (GstBaseSrc * basesrc);
337
338 static GstStateChangeReturn gst_base_src_change_state (GstElement * element,
339     GstStateChange transition);
340
341 static void gst_base_src_loop (GstPad * pad);
342 static GstFlowReturn gst_base_src_getrange (GstPad * pad, GstObject * parent,
343     guint64 offset, guint length, GstBuffer ** buf);
344 static GstFlowReturn gst_base_src_get_range (GstBaseSrc * src, guint64 offset,
345     guint length, GstBuffer ** buf);
346 static gboolean gst_base_src_seekable (GstBaseSrc * src);
347 static gboolean gst_base_src_negotiate (GstBaseSrc * basesrc);
348 static gboolean gst_base_src_update_length (GstBaseSrc * src, guint64 offset,
349     guint * length, gboolean force);
350
351 static void
352 gst_base_src_class_init (GstBaseSrcClass * klass)
353 {
354   GObjectClass *gobject_class;
355   GstElementClass *gstelement_class;
356
357   gobject_class = G_OBJECT_CLASS (klass);
358   gstelement_class = GST_ELEMENT_CLASS (klass);
359
360   GST_DEBUG_CATEGORY_INIT (gst_base_src_debug, "basesrc", 0, "basesrc element");
361
362   g_type_class_add_private (klass, sizeof (GstBaseSrcPrivate));
363
364   parent_class = g_type_class_peek_parent (klass);
365
366   gobject_class->finalize = gst_base_src_finalize;
367   gobject_class->set_property = gst_base_src_set_property;
368   gobject_class->get_property = gst_base_src_get_property;
369
370   g_object_class_install_property (gobject_class, PROP_BLOCKSIZE,
371       g_param_spec_uint ("blocksize", "Block size",
372           "Size in bytes to read per buffer (-1 = default)", 0, G_MAXUINT,
373           DEFAULT_BLOCKSIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
374   g_object_class_install_property (gobject_class, PROP_NUM_BUFFERS,
375       g_param_spec_int ("num-buffers", "num-buffers",
376           "Number of buffers to output before sending EOS (-1 = unlimited)",
377           -1, G_MAXINT, DEFAULT_NUM_BUFFERS, G_PARAM_READWRITE |
378           G_PARAM_STATIC_STRINGS));
379   g_object_class_install_property (gobject_class, PROP_TYPEFIND,
380       g_param_spec_boolean ("typefind", "Typefind",
381           "Run typefind before negotiating", DEFAULT_TYPEFIND,
382           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
383   g_object_class_install_property (gobject_class, PROP_DO_TIMESTAMP,
384       g_param_spec_boolean ("do-timestamp", "Do timestamp",
385           "Apply current stream time to buffers", DEFAULT_DO_TIMESTAMP,
386           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
387
388   gstelement_class->change_state =
389       GST_DEBUG_FUNCPTR (gst_base_src_change_state);
390   gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_base_src_send_event);
391
392   klass->get_caps = GST_DEBUG_FUNCPTR (gst_base_src_default_get_caps);
393   klass->negotiate = GST_DEBUG_FUNCPTR (gst_base_src_default_negotiate);
394   klass->fixate = GST_DEBUG_FUNCPTR (gst_base_src_default_fixate);
395   klass->prepare_seek_segment =
396       GST_DEBUG_FUNCPTR (gst_base_src_default_prepare_seek_segment);
397   klass->do_seek = GST_DEBUG_FUNCPTR (gst_base_src_default_do_seek);
398   klass->query = GST_DEBUG_FUNCPTR (gst_base_src_default_query);
399   klass->event = GST_DEBUG_FUNCPTR (gst_base_src_default_event);
400   klass->create = GST_DEBUG_FUNCPTR (gst_base_src_default_create);
401   klass->alloc = GST_DEBUG_FUNCPTR (gst_base_src_default_alloc);
402   klass->decide_allocation =
403       GST_DEBUG_FUNCPTR (gst_base_src_decide_allocation_default);
404
405   /* Registering debug symbols for function pointers */
406   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_activate_mode);
407   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_event);
408   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_query);
409   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_getrange);
410   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_fixate);
411 }
412
413 static void
414 gst_base_src_init (GstBaseSrc * basesrc, gpointer g_class)
415 {
416   GstPad *pad;
417   GstPadTemplate *pad_template;
418
419   basesrc->priv = GST_BASE_SRC_GET_PRIVATE (basesrc);
420
421   basesrc->is_live = FALSE;
422   g_mutex_init (&basesrc->live_lock);
423   g_cond_init (&basesrc->live_cond);
424   basesrc->num_buffers = DEFAULT_NUM_BUFFERS;
425   basesrc->num_buffers_left = -1;
426
427   basesrc->can_activate_push = TRUE;
428
429   pad_template =
430       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (g_class), "src");
431   g_return_if_fail (pad_template != NULL);
432
433   GST_DEBUG_OBJECT (basesrc, "creating src pad");
434   pad = gst_pad_new_from_template (pad_template, "src");
435
436   GST_DEBUG_OBJECT (basesrc, "setting functions on src pad");
437   gst_pad_set_activatemode_function (pad, gst_base_src_activate_mode);
438   gst_pad_set_event_function (pad, gst_base_src_event);
439   gst_pad_set_query_function (pad, gst_base_src_query);
440   gst_pad_set_getrange_function (pad, gst_base_src_getrange);
441
442   /* hold pointer to pad */
443   basesrc->srcpad = pad;
444   GST_DEBUG_OBJECT (basesrc, "adding src pad");
445   gst_element_add_pad (GST_ELEMENT (basesrc), pad);
446
447   basesrc->blocksize = DEFAULT_BLOCKSIZE;
448   basesrc->clock_id = NULL;
449   /* we operate in BYTES by default */
450   gst_base_src_set_format (basesrc, GST_FORMAT_BYTES);
451   basesrc->typefind = DEFAULT_TYPEFIND;
452   basesrc->priv->do_timestamp = DEFAULT_DO_TIMESTAMP;
453   g_atomic_int_set (&basesrc->priv->have_events, FALSE);
454
455   g_cond_init (&basesrc->priv->async_cond);
456   basesrc->priv->start_result = GST_FLOW_FLUSHING;
457   GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTED);
458   GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTING);
459   GST_OBJECT_FLAG_SET (basesrc, GST_ELEMENT_FLAG_SOURCE);
460
461   GST_DEBUG_OBJECT (basesrc, "init done");
462 }
463
464 static void
465 gst_base_src_finalize (GObject * object)
466 {
467   GstBaseSrc *basesrc;
468   GstEvent **event_p;
469
470   basesrc = GST_BASE_SRC (object);
471
472   g_mutex_clear (&basesrc->live_lock);
473   g_cond_clear (&basesrc->live_cond);
474   g_cond_clear (&basesrc->priv->async_cond);
475
476   event_p = &basesrc->pending_seek;
477   gst_event_replace (event_p, NULL);
478
479   if (basesrc->priv->pending_events) {
480     g_list_foreach (basesrc->priv->pending_events, (GFunc) gst_event_unref,
481         NULL);
482     g_list_free (basesrc->priv->pending_events);
483   }
484
485   G_OBJECT_CLASS (parent_class)->finalize (object);
486 }
487
488 /**
489  * gst_base_src_wait_playing:
490  * @src: the src
491  *
492  * If the #GstBaseSrcClass.create() method performs its own synchronisation
493  * against the clock it must unblock when going from PLAYING to the PAUSED state
494  * and call this method before continuing to produce the remaining data.
495  *
496  * This function will block until a state change to PLAYING happens (in which
497  * case this function returns #GST_FLOW_OK) or the processing must be stopped due
498  * to a state change to READY or a FLUSH event (in which case this function
499  * returns #GST_FLOW_FLUSHING).
500  *
501  * Returns: #GST_FLOW_OK if @src is PLAYING and processing can
502  * continue. Any other return value should be returned from the create vmethod.
503  */
504 GstFlowReturn
505 gst_base_src_wait_playing (GstBaseSrc * src)
506 {
507   g_return_val_if_fail (GST_IS_BASE_SRC (src), GST_FLOW_ERROR);
508
509   do {
510     /* block until the state changes, or we get a flush, or something */
511     GST_DEBUG_OBJECT (src, "live source waiting for running state");
512     GST_LIVE_WAIT (src);
513     GST_DEBUG_OBJECT (src, "live source unlocked");
514     if (src->priv->flushing)
515       goto flushing;
516   } while (G_UNLIKELY (!src->live_running));
517
518   return GST_FLOW_OK;
519
520   /* ERRORS */
521 flushing:
522   {
523     GST_DEBUG_OBJECT (src, "we are flushing");
524     return GST_FLOW_FLUSHING;
525   }
526 }
527
528 /**
529  * gst_base_src_set_live:
530  * @src: base source instance
531  * @live: new live-mode
532  *
533  * If the element listens to a live source, @live should
534  * be set to %TRUE.
535  *
536  * A live source will not produce data in the PAUSED state and
537  * will therefore not be able to participate in the PREROLL phase
538  * of a pipeline. To signal this fact to the application and the
539  * pipeline, the state change return value of the live source will
540  * be GST_STATE_CHANGE_NO_PREROLL.
541  */
542 void
543 gst_base_src_set_live (GstBaseSrc * src, gboolean live)
544 {
545   g_return_if_fail (GST_IS_BASE_SRC (src));
546
547   GST_OBJECT_LOCK (src);
548   src->is_live = live;
549   GST_OBJECT_UNLOCK (src);
550 }
551
552 /**
553  * gst_base_src_is_live:
554  * @src: base source instance
555  *
556  * Check if an element is in live mode.
557  *
558  * Returns: %TRUE if element is in live mode.
559  */
560 gboolean
561 gst_base_src_is_live (GstBaseSrc * src)
562 {
563   gboolean result;
564
565   g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
566
567   GST_OBJECT_LOCK (src);
568   result = src->is_live;
569   GST_OBJECT_UNLOCK (src);
570
571   return result;
572 }
573
574 /**
575  * gst_base_src_set_format:
576  * @src: base source instance
577  * @format: the format to use
578  *
579  * Sets the default format of the source. This will be the format used
580  * for sending SEGMENT events and for performing seeks.
581  *
582  * If a format of GST_FORMAT_BYTES is set, the element will be able to
583  * operate in pull mode if the #GstBaseSrcClass.is_seekable() returns TRUE.
584  *
585  * This function must only be called in states < %GST_STATE_PAUSED.
586  */
587 void
588 gst_base_src_set_format (GstBaseSrc * src, GstFormat format)
589 {
590   g_return_if_fail (GST_IS_BASE_SRC (src));
591   g_return_if_fail (GST_STATE (src) <= GST_STATE_READY);
592
593   GST_OBJECT_LOCK (src);
594   gst_segment_init (&src->segment, format);
595   GST_OBJECT_UNLOCK (src);
596 }
597
598 /**
599  * gst_base_src_set_dynamic_size:
600  * @src: base source instance
601  * @dynamic: new dynamic size mode
602  *
603  * If not @dynamic, size is only updated when needed, such as when trying to
604  * read past current tracked size.  Otherwise, size is checked for upon each
605  * read.
606  */
607 void
608 gst_base_src_set_dynamic_size (GstBaseSrc * src, gboolean dynamic)
609 {
610   g_return_if_fail (GST_IS_BASE_SRC (src));
611
612   g_atomic_int_set (&src->priv->dynamic_size, dynamic);
613 }
614
615 /**
616  * gst_base_src_set_async:
617  * @src: base source instance
618  * @async: new async mode
619  *
620  * Configure async behaviour in @src, no state change will block. The open,
621  * close, start, stop, play and pause virtual methods will be executed in a
622  * different thread and are thus allowed to perform blocking operations. Any
623  * blocking operation should be unblocked with the unlock vmethod.
624  */
625 void
626 gst_base_src_set_async (GstBaseSrc * src, gboolean async)
627 {
628   g_return_if_fail (GST_IS_BASE_SRC (src));
629
630   GST_OBJECT_LOCK (src);
631   src->priv->async = async;
632   GST_OBJECT_UNLOCK (src);
633 }
634
635 /**
636  * gst_base_src_is_async:
637  * @src: base source instance
638  *
639  * Get the current async behaviour of @src. See also gst_base_src_set_async().
640  *
641  * Returns: %TRUE if @src is operating in async mode.
642  */
643 gboolean
644 gst_base_src_is_async (GstBaseSrc * src)
645 {
646   gboolean res;
647
648   g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
649
650   GST_OBJECT_LOCK (src);
651   res = src->priv->async;
652   GST_OBJECT_UNLOCK (src);
653
654   return res;
655 }
656
657
658 /**
659  * gst_base_src_query_latency:
660  * @src: the source
661  * @live: (out) (allow-none): if the source is live
662  * @min_latency: (out) (allow-none): the min latency of the source
663  * @max_latency: (out) (allow-none): the max latency of the source
664  *
665  * Query the source for the latency parameters. @live will be TRUE when @src is
666  * configured as a live source. @min_latency will be set to the difference
667  * between the running time and the timestamp of the first buffer.
668  * @max_latency is always the undefined value of -1.
669  *
670  * This function is mostly used by subclasses.
671  *
672  * Returns: TRUE if the query succeeded.
673  */
674 gboolean
675 gst_base_src_query_latency (GstBaseSrc * src, gboolean * live,
676     GstClockTime * min_latency, GstClockTime * max_latency)
677 {
678   GstClockTime min;
679
680   g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
681
682   GST_OBJECT_LOCK (src);
683   if (live)
684     *live = src->is_live;
685
686   /* if we have a startup latency, report this one, else report 0. Subclasses
687    * are supposed to override the query function if they want something
688    * else. */
689   if (src->priv->latency != -1)
690     min = src->priv->latency;
691   else
692     min = 0;
693
694   if (min_latency)
695     *min_latency = min;
696   if (max_latency)
697     *max_latency = -1;
698
699   GST_LOG_OBJECT (src, "latency: live %d, min %" GST_TIME_FORMAT
700       ", max %" GST_TIME_FORMAT, src->is_live, GST_TIME_ARGS (min),
701       GST_TIME_ARGS (-1));
702   GST_OBJECT_UNLOCK (src);
703
704   return TRUE;
705 }
706
707 /**
708  * gst_base_src_set_blocksize:
709  * @src: the source
710  * @blocksize: the new blocksize in bytes
711  *
712  * Set the number of bytes that @src will push out with each buffer. When
713  * @blocksize is set to -1, a default length will be used.
714  */
715 void
716 gst_base_src_set_blocksize (GstBaseSrc * src, guint blocksize)
717 {
718   g_return_if_fail (GST_IS_BASE_SRC (src));
719
720   GST_OBJECT_LOCK (src);
721   src->blocksize = blocksize;
722   GST_OBJECT_UNLOCK (src);
723 }
724
725 /**
726  * gst_base_src_get_blocksize:
727  * @src: the source
728  *
729  * Get the number of bytes that @src will push out with each buffer.
730  *
731  * Returns: the number of bytes pushed with each buffer.
732  */
733 guint
734 gst_base_src_get_blocksize (GstBaseSrc * src)
735 {
736   gint res;
737
738   g_return_val_if_fail (GST_IS_BASE_SRC (src), 0);
739
740   GST_OBJECT_LOCK (src);
741   res = src->blocksize;
742   GST_OBJECT_UNLOCK (src);
743
744   return res;
745 }
746
747
748 /**
749  * gst_base_src_set_do_timestamp:
750  * @src: the source
751  * @timestamp: enable or disable timestamping
752  *
753  * Configure @src to automatically timestamp outgoing buffers based on the
754  * current running_time of the pipeline. This property is mostly useful for live
755  * sources.
756  */
757 void
758 gst_base_src_set_do_timestamp (GstBaseSrc * src, gboolean timestamp)
759 {
760   g_return_if_fail (GST_IS_BASE_SRC (src));
761
762   GST_OBJECT_LOCK (src);
763   src->priv->do_timestamp = timestamp;
764   GST_OBJECT_UNLOCK (src);
765 }
766
767 /**
768  * gst_base_src_get_do_timestamp:
769  * @src: the source
770  *
771  * Query if @src timestamps outgoing buffers based on the current running_time.
772  *
773  * Returns: %TRUE if the base class will automatically timestamp outgoing buffers.
774  */
775 gboolean
776 gst_base_src_get_do_timestamp (GstBaseSrc * src)
777 {
778   gboolean res;
779
780   g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
781
782   GST_OBJECT_LOCK (src);
783   res = src->priv->do_timestamp;
784   GST_OBJECT_UNLOCK (src);
785
786   return res;
787 }
788
789 /**
790  * gst_base_src_new_seamless_segment:
791  * @src: The source
792  * @start: The new start value for the segment
793  * @stop: Stop value for the new segment
794  * @time: The new time value for the start of the new segent
795  *
796  * Prepare a new seamless segment for emission downstream. This function must
797  * only be called by derived sub-classes, and only from the create() function,
798  * as the stream-lock needs to be held.
799  *
800  * The format for the new segment will be the current format of the source, as
801  * configured with gst_base_src_set_format()
802  *
803  * Returns: %TRUE if preparation of the seamless segment succeeded.
804  */
805 gboolean
806 gst_base_src_new_seamless_segment (GstBaseSrc * src, gint64 start, gint64 stop,
807     gint64 time)
808 {
809   gboolean res = TRUE;
810
811   GST_OBJECT_LOCK (src);
812
813   src->segment.base = gst_segment_to_running_time (&src->segment,
814       src->segment.format, src->segment.position);
815   src->segment.position = src->segment.start = start;
816   src->segment.stop = stop;
817   src->segment.time = time;
818
819   /* Mark pending segment. Will be sent before next data */
820   src->priv->segment_pending = TRUE;
821
822   GST_DEBUG_OBJECT (src,
823       "Starting new seamless segment. Start %" GST_TIME_FORMAT " stop %"
824       GST_TIME_FORMAT " time %" GST_TIME_FORMAT " base %" GST_TIME_FORMAT,
825       GST_TIME_ARGS (start), GST_TIME_ARGS (stop), GST_TIME_ARGS (time),
826       GST_TIME_ARGS (src->segment.base));
827
828   GST_OBJECT_UNLOCK (src);
829
830   src->priv->discont = TRUE;
831   src->running = TRUE;
832
833   return res;
834 }
835
836 static gboolean
837 gst_base_src_send_stream_start (GstBaseSrc * src)
838 {
839   gboolean ret = TRUE;
840
841   if (src->priv->stream_start_pending) {
842     gchar *stream_id;
843
844     stream_id =
845         gst_pad_create_stream_id (src->srcpad, GST_ELEMENT_CAST (src), NULL);
846
847     GST_DEBUG_OBJECT (src, "Pushing STREAM_START");
848     ret =
849         gst_pad_push_event (src->srcpad,
850         gst_event_new_stream_start (stream_id));
851     src->priv->stream_start_pending = FALSE;
852     g_free (stream_id);
853   }
854
855   return ret;
856 }
857
858 /**
859  * gst_base_src_set_caps:
860  * @src: a #GstBaseSrc
861  * @caps: a #GstCaps
862  *
863  * Set new caps on the basesrc source pad.
864  *
865  * Returns: %TRUE if the caps could be set
866  */
867 gboolean
868 gst_base_src_set_caps (GstBaseSrc * src, GstCaps * caps)
869 {
870   GstBaseSrcClass *bclass;
871   gboolean res = TRUE;
872
873   bclass = GST_BASE_SRC_GET_CLASS (src);
874
875   gst_base_src_send_stream_start (src);
876
877   if (bclass->set_caps)
878     res = bclass->set_caps (src, caps);
879
880   if (res)
881     res = gst_pad_push_event (src->srcpad, gst_event_new_caps (caps));
882
883   return res;
884 }
885
886 static GstCaps *
887 gst_base_src_default_get_caps (GstBaseSrc * bsrc, GstCaps * filter)
888 {
889   GstCaps *caps = NULL;
890   GstPadTemplate *pad_template;
891   GstBaseSrcClass *bclass;
892
893   bclass = GST_BASE_SRC_GET_CLASS (bsrc);
894
895   pad_template =
896       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (bclass), "src");
897
898   if (pad_template != NULL) {
899     caps = gst_pad_template_get_caps (pad_template);
900
901     if (filter) {
902       GstCaps *intersection;
903
904       intersection =
905           gst_caps_intersect_full (filter, caps, GST_CAPS_INTERSECT_FIRST);
906       gst_caps_unref (caps);
907       caps = intersection;
908     }
909   }
910   return caps;
911 }
912
913 static GstCaps *
914 gst_base_src_default_fixate (GstBaseSrc * bsrc, GstCaps * caps)
915 {
916   GST_DEBUG_OBJECT (bsrc, "using default caps fixate function");
917   return gst_caps_fixate (caps);
918 }
919
920 static GstCaps *
921 gst_base_src_fixate (GstBaseSrc * bsrc, GstCaps * caps)
922 {
923   GstBaseSrcClass *bclass;
924
925   bclass = GST_BASE_SRC_GET_CLASS (bsrc);
926
927   if (bclass->fixate)
928     caps = bclass->fixate (bsrc, caps);
929
930   return caps;
931 }
932
933 static gboolean
934 gst_base_src_default_query (GstBaseSrc * src, GstQuery * query)
935 {
936   gboolean res;
937
938   switch (GST_QUERY_TYPE (query)) {
939     case GST_QUERY_POSITION:
940     {
941       GstFormat format;
942
943       gst_query_parse_position (query, &format, NULL);
944
945       GST_DEBUG_OBJECT (src, "position query in format %s",
946           gst_format_get_name (format));
947
948       switch (format) {
949         case GST_FORMAT_PERCENT:
950         {
951           gint64 percent;
952           gint64 position;
953           gint64 duration;
954
955           GST_OBJECT_LOCK (src);
956           position = src->segment.position;
957           duration = src->segment.duration;
958           GST_OBJECT_UNLOCK (src);
959
960           if (position != -1 && duration != -1) {
961             if (position < duration)
962               percent = gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX, position,
963                   duration);
964             else
965               percent = GST_FORMAT_PERCENT_MAX;
966           } else
967             percent = -1;
968
969           gst_query_set_position (query, GST_FORMAT_PERCENT, percent);
970           res = TRUE;
971           break;
972         }
973         default:
974         {
975           gint64 position;
976           GstFormat seg_format;
977
978           GST_OBJECT_LOCK (src);
979           position =
980               gst_segment_to_stream_time (&src->segment, src->segment.format,
981               src->segment.position);
982           seg_format = src->segment.format;
983           GST_OBJECT_UNLOCK (src);
984
985           if (position != -1) {
986             /* convert to requested format */
987             res =
988                 gst_pad_query_convert (src->srcpad, seg_format,
989                 position, format, &position);
990           } else
991             res = TRUE;
992
993           gst_query_set_position (query, format, position);
994           break;
995         }
996       }
997       break;
998     }
999     case GST_QUERY_DURATION:
1000     {
1001       GstFormat format;
1002
1003       gst_query_parse_duration (query, &format, NULL);
1004
1005       GST_DEBUG_OBJECT (src, "duration query in format %s",
1006           gst_format_get_name (format));
1007
1008       switch (format) {
1009         case GST_FORMAT_PERCENT:
1010           gst_query_set_duration (query, GST_FORMAT_PERCENT,
1011               GST_FORMAT_PERCENT_MAX);
1012           res = TRUE;
1013           break;
1014         default:
1015         {
1016           gint64 duration;
1017           GstFormat seg_format;
1018           guint length = 0;
1019
1020           /* may have to refresh duration */
1021           gst_base_src_update_length (src, 0, &length,
1022               g_atomic_int_get (&src->priv->dynamic_size));
1023
1024           /* this is the duration as configured by the subclass. */
1025           GST_OBJECT_LOCK (src);
1026           duration = src->segment.duration;
1027           seg_format = src->segment.format;
1028           GST_OBJECT_UNLOCK (src);
1029
1030           GST_LOG_OBJECT (src, "duration %" G_GINT64_FORMAT ", format %s",
1031               duration, gst_format_get_name (seg_format));
1032
1033           if (duration != -1) {
1034             /* convert to requested format, if this fails, we have a duration
1035              * but we cannot answer the query, we must return FALSE. */
1036             res =
1037                 gst_pad_query_convert (src->srcpad, seg_format,
1038                 duration, format, &duration);
1039           } else {
1040             /* The subclass did not configure a duration, we assume that the
1041              * media has an unknown duration then and we return TRUE to report
1042              * this. Note that this is not the same as returning FALSE, which
1043              * means that we cannot report the duration at all. */
1044             res = TRUE;
1045           }
1046           gst_query_set_duration (query, format, duration);
1047           break;
1048         }
1049       }
1050       break;
1051     }
1052
1053     case GST_QUERY_SEEKING:
1054     {
1055       GstFormat format, seg_format;
1056       gint64 duration;
1057
1058       GST_OBJECT_LOCK (src);
1059       duration = src->segment.duration;
1060       seg_format = src->segment.format;
1061       GST_OBJECT_UNLOCK (src);
1062
1063       gst_query_parse_seeking (query, &format, NULL, NULL, NULL);
1064       if (format == seg_format) {
1065         gst_query_set_seeking (query, seg_format,
1066             gst_base_src_seekable (src), 0, duration);
1067         res = TRUE;
1068       } else {
1069         /* FIXME 0.11: return TRUE + seekable=FALSE for SEEKING query here */
1070         /* Don't reply to the query to make up for demuxers which don't
1071          * handle the SEEKING query yet. Players like Totem will fall back
1072          * to the duration when the SEEKING query isn't answered. */
1073         res = FALSE;
1074       }
1075       break;
1076     }
1077     case GST_QUERY_SEGMENT:
1078     {
1079       gint64 start, stop;
1080
1081       GST_OBJECT_LOCK (src);
1082       /* no end segment configured, current duration then */
1083       if ((stop = src->segment.stop) == -1)
1084         stop = src->segment.duration;
1085       start = src->segment.start;
1086
1087       /* adjust to stream time */
1088       if (src->segment.time != -1) {
1089         start -= src->segment.time;
1090         if (stop != -1)
1091           stop -= src->segment.time;
1092       }
1093
1094       gst_query_set_segment (query, src->segment.rate, src->segment.format,
1095           start, stop);
1096       GST_OBJECT_UNLOCK (src);
1097       res = TRUE;
1098       break;
1099     }
1100
1101     case GST_QUERY_FORMATS:
1102     {
1103       gst_query_set_formats (query, 3, GST_FORMAT_DEFAULT,
1104           GST_FORMAT_BYTES, GST_FORMAT_PERCENT);
1105       res = TRUE;
1106       break;
1107     }
1108     case GST_QUERY_CONVERT:
1109     {
1110       GstFormat src_fmt, dest_fmt;
1111       gint64 src_val, dest_val;
1112
1113       gst_query_parse_convert (query, &src_fmt, &src_val, &dest_fmt, &dest_val);
1114
1115       /* we can only convert between equal formats... */
1116       if (src_fmt == dest_fmt) {
1117         dest_val = src_val;
1118         res = TRUE;
1119       } else
1120         res = FALSE;
1121
1122       gst_query_set_convert (query, src_fmt, src_val, dest_fmt, dest_val);
1123       break;
1124     }
1125     case GST_QUERY_LATENCY:
1126     {
1127       GstClockTime min, max;
1128       gboolean live;
1129
1130       /* Subclasses should override and implement something useful */
1131       res = gst_base_src_query_latency (src, &live, &min, &max);
1132
1133       GST_LOG_OBJECT (src, "report latency: live %d, min %" GST_TIME_FORMAT
1134           ", max %" GST_TIME_FORMAT, live, GST_TIME_ARGS (min),
1135           GST_TIME_ARGS (max));
1136
1137       gst_query_set_latency (query, live, min, max);
1138       break;
1139     }
1140     case GST_QUERY_JITTER:
1141     case GST_QUERY_RATE:
1142       res = FALSE;
1143       break;
1144     case GST_QUERY_BUFFERING:
1145     {
1146       GstFormat format, seg_format;
1147       gint64 start, stop, estimated;
1148
1149       gst_query_parse_buffering_range (query, &format, NULL, NULL, NULL);
1150
1151       GST_DEBUG_OBJECT (src, "buffering query in format %s",
1152           gst_format_get_name (format));
1153
1154       GST_OBJECT_LOCK (src);
1155       if (src->random_access) {
1156         estimated = 0;
1157         start = 0;
1158         if (format == GST_FORMAT_PERCENT)
1159           stop = GST_FORMAT_PERCENT_MAX;
1160         else
1161           stop = src->segment.duration;
1162       } else {
1163         estimated = -1;
1164         start = -1;
1165         stop = -1;
1166       }
1167       seg_format = src->segment.format;
1168       GST_OBJECT_UNLOCK (src);
1169
1170       /* convert to required format. When the conversion fails, we can't answer
1171        * the query. When the value is unknown, we can don't perform conversion
1172        * but report TRUE. */
1173       if (format != GST_FORMAT_PERCENT && stop != -1) {
1174         res = gst_pad_query_convert (src->srcpad, seg_format,
1175             stop, format, &stop);
1176       } else {
1177         res = TRUE;
1178       }
1179       if (res && format != GST_FORMAT_PERCENT && start != -1)
1180         res = gst_pad_query_convert (src->srcpad, seg_format,
1181             start, format, &start);
1182
1183       gst_query_set_buffering_range (query, format, start, stop, estimated);
1184       break;
1185     }
1186     case GST_QUERY_SCHEDULING:
1187     {
1188       gboolean random_access;
1189
1190       random_access = gst_base_src_is_random_access (src);
1191
1192       /* we can operate in getrange mode if the native format is bytes
1193        * and we are seekable, this condition is set in the random_access
1194        * flag and is set in the _start() method. */
1195       gst_query_set_scheduling (query, GST_SCHEDULING_FLAG_SEEKABLE, 1, -1, 0);
1196       if (random_access)
1197         gst_query_add_scheduling_mode (query, GST_PAD_MODE_PULL);
1198       gst_query_add_scheduling_mode (query, GST_PAD_MODE_PUSH);
1199
1200       res = TRUE;
1201       break;
1202     }
1203     case GST_QUERY_CAPS:
1204     {
1205       GstBaseSrcClass *bclass;
1206       GstCaps *caps, *filter;
1207
1208       bclass = GST_BASE_SRC_GET_CLASS (src);
1209       if (bclass->get_caps) {
1210         gst_query_parse_caps (query, &filter);
1211         if ((caps = bclass->get_caps (src, filter))) {
1212           gst_query_set_caps_result (query, caps);
1213           gst_caps_unref (caps);
1214           res = TRUE;
1215         } else {
1216           res = FALSE;
1217         }
1218       } else
1219         res = FALSE;
1220       break;
1221     }
1222     case GST_QUERY_URI:{
1223       if (GST_IS_URI_HANDLER (src)) {
1224         gchar *uri = gst_uri_handler_get_uri (GST_URI_HANDLER (src));
1225
1226         if (uri != NULL) {
1227           gst_query_set_uri (query, uri);
1228           g_free (uri);
1229           res = TRUE;
1230         } else {
1231           res = FALSE;
1232         }
1233       } else {
1234         res = FALSE;
1235       }
1236       break;
1237     }
1238     default:
1239       res = FALSE;
1240       break;
1241   }
1242   GST_DEBUG_OBJECT (src, "query %s returns %d", GST_QUERY_TYPE_NAME (query),
1243       res);
1244
1245   return res;
1246 }
1247
1248 static gboolean
1249 gst_base_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
1250 {
1251   GstBaseSrc *src;
1252   GstBaseSrcClass *bclass;
1253   gboolean result = FALSE;
1254
1255   src = GST_BASE_SRC (parent);
1256   bclass = GST_BASE_SRC_GET_CLASS (src);
1257
1258   if (bclass->query)
1259     result = bclass->query (src, query);
1260
1261   return result;
1262 }
1263
1264 static gboolean
1265 gst_base_src_default_do_seek (GstBaseSrc * src, GstSegment * segment)
1266 {
1267   gboolean res = TRUE;
1268
1269   /* update our offset if the start/stop position was updated */
1270   if (segment->format == GST_FORMAT_BYTES) {
1271     segment->time = segment->start;
1272   } else if (segment->start == 0) {
1273     /* seek to start, we can implement a default for this. */
1274     segment->time = 0;
1275   } else {
1276     res = FALSE;
1277     GST_INFO_OBJECT (src, "Can't do a default seek");
1278   }
1279
1280   return res;
1281 }
1282
1283 static gboolean
1284 gst_base_src_do_seek (GstBaseSrc * src, GstSegment * segment)
1285 {
1286   GstBaseSrcClass *bclass;
1287   gboolean result = FALSE;
1288
1289   bclass = GST_BASE_SRC_GET_CLASS (src);
1290
1291   GST_INFO_OBJECT (src, "seeking: %" GST_SEGMENT_FORMAT, segment);
1292
1293   if (bclass->do_seek)
1294     result = bclass->do_seek (src, segment);
1295
1296   return result;
1297 }
1298
1299 #define SEEK_TYPE_IS_RELATIVE(t) (((t) != GST_SEEK_TYPE_NONE) && ((t) != GST_SEEK_TYPE_SET))
1300
1301 static gboolean
1302 gst_base_src_default_prepare_seek_segment (GstBaseSrc * src, GstEvent * event,
1303     GstSegment * segment)
1304 {
1305   /* By default, we try one of 2 things:
1306    *   - For absolute seek positions, convert the requested position to our
1307    *     configured processing format and place it in the output segment \
1308    *   - For relative seek positions, convert our current (input) values to the
1309    *     seek format, adjust by the relative seek offset and then convert back to
1310    *     the processing format
1311    */
1312   GstSeekType start_type, stop_type;
1313   gint64 start, stop;
1314   GstSeekFlags flags;
1315   GstFormat seek_format, dest_format;
1316   gdouble rate;
1317   gboolean update;
1318   gboolean res = TRUE;
1319
1320   gst_event_parse_seek (event, &rate, &seek_format, &flags,
1321       &start_type, &start, &stop_type, &stop);
1322   dest_format = segment->format;
1323
1324   if (seek_format == dest_format) {
1325     gst_segment_do_seek (segment, rate, seek_format, flags,
1326         start_type, start, stop_type, stop, &update);
1327     return TRUE;
1328   }
1329
1330   if (start_type != GST_SEEK_TYPE_NONE) {
1331     /* FIXME: Handle seek_end by converting the input segment vals */
1332     res =
1333         gst_pad_query_convert (src->srcpad, seek_format, start, dest_format,
1334         &start);
1335     start_type = GST_SEEK_TYPE_SET;
1336   }
1337
1338   if (res && stop_type != GST_SEEK_TYPE_NONE) {
1339     /* FIXME: Handle seek_end by converting the input segment vals */
1340     res =
1341         gst_pad_query_convert (src->srcpad, seek_format, stop, dest_format,
1342         &stop);
1343     stop_type = GST_SEEK_TYPE_SET;
1344   }
1345
1346   /* And finally, configure our output segment in the desired format */
1347   gst_segment_do_seek (segment, rate, dest_format, flags, start_type, start,
1348       stop_type, stop, &update);
1349
1350   if (!res)
1351     goto no_format;
1352
1353   return res;
1354
1355 no_format:
1356   {
1357     GST_DEBUG_OBJECT (src, "undefined format given, seek aborted.");
1358     return FALSE;
1359   }
1360 }
1361
1362 static gboolean
1363 gst_base_src_prepare_seek_segment (GstBaseSrc * src, GstEvent * event,
1364     GstSegment * seeksegment)
1365 {
1366   GstBaseSrcClass *bclass;
1367   gboolean result = FALSE;
1368
1369   bclass = GST_BASE_SRC_GET_CLASS (src);
1370
1371   if (bclass->prepare_seek_segment)
1372     result = bclass->prepare_seek_segment (src, event, seeksegment);
1373
1374   return result;
1375 }
1376
1377 static GstFlowReturn
1378 gst_base_src_default_alloc (GstBaseSrc * src, guint64 offset,
1379     guint size, GstBuffer ** buffer)
1380 {
1381   GstFlowReturn ret;
1382   GstBaseSrcPrivate *priv = src->priv;
1383
1384   if (priv->pool) {
1385     ret = gst_buffer_pool_acquire_buffer (priv->pool, buffer, NULL);
1386   } else if (size != -1) {
1387     *buffer = gst_buffer_new_allocate (priv->allocator, size, &priv->params);
1388     if (G_UNLIKELY (*buffer == NULL))
1389       goto alloc_failed;
1390
1391     ret = GST_FLOW_OK;
1392   } else {
1393     GST_WARNING_OBJECT (src, "Not trying to alloc %u bytes. Blocksize not set?",
1394         size);
1395     goto alloc_failed;
1396   }
1397   return ret;
1398
1399   /* ERRORS */
1400 alloc_failed:
1401   {
1402     GST_ERROR_OBJECT (src, "Failed to allocate %u bytes", size);
1403     return GST_FLOW_ERROR;
1404   }
1405 }
1406
1407 static GstFlowReturn
1408 gst_base_src_default_create (GstBaseSrc * src, guint64 offset,
1409     guint size, GstBuffer ** buffer)
1410 {
1411   GstBaseSrcClass *bclass;
1412   GstFlowReturn ret;
1413   GstBuffer *res_buf;
1414
1415   bclass = GST_BASE_SRC_GET_CLASS (src);
1416
1417   if (G_UNLIKELY (!bclass->alloc))
1418     goto no_function;
1419   if (G_UNLIKELY (!bclass->fill))
1420     goto no_function;
1421
1422   if (*buffer == NULL) {
1423     /* downstream did not provide us with a buffer to fill, allocate one
1424      * ourselves */
1425     ret = bclass->alloc (src, offset, size, &res_buf);
1426     if (G_UNLIKELY (ret != GST_FLOW_OK))
1427       goto alloc_failed;
1428   } else {
1429     res_buf = *buffer;
1430   }
1431
1432   if (G_LIKELY (size > 0)) {
1433     /* only call fill when there is a size */
1434     ret = bclass->fill (src, offset, size, res_buf);
1435     if (G_UNLIKELY (ret != GST_FLOW_OK))
1436       goto not_ok;
1437   }
1438
1439   *buffer = res_buf;
1440
1441   return GST_FLOW_OK;
1442
1443   /* ERRORS */
1444 no_function:
1445   {
1446     GST_DEBUG_OBJECT (src, "no fill or alloc function");
1447     return GST_FLOW_NOT_SUPPORTED;
1448   }
1449 alloc_failed:
1450   {
1451     GST_DEBUG_OBJECT (src, "Failed to allocate buffer of %u bytes", size);
1452     return ret;
1453   }
1454 not_ok:
1455   {
1456     GST_DEBUG_OBJECT (src, "fill returned %d (%s)", ret,
1457         gst_flow_get_name (ret));
1458     if (*buffer == NULL)
1459       gst_buffer_unref (res_buf);
1460     return ret;
1461   }
1462 }
1463
1464 /* this code implements the seeking. It is a good example
1465  * handling all cases.
1466  *
1467  * A seek updates the currently configured segment.start
1468  * and segment.stop values based on the SEEK_TYPE. If the
1469  * segment.start value is updated, a seek to this new position
1470  * should be performed.
1471  *
1472  * The seek can only be executed when we are not currently
1473  * streaming any data, to make sure that this is the case, we
1474  * acquire the STREAM_LOCK which is taken when we are in the
1475  * _loop() function or when a getrange() is called. Normally
1476  * we will not receive a seek if we are operating in pull mode
1477  * though. When we operate as a live source we might block on the live
1478  * cond, which does not release the STREAM_LOCK. Therefore we will try
1479  * to grab the LIVE_LOCK instead of the STREAM_LOCK to make sure it is
1480  * safe to perform the seek.
1481  *
1482  * When we are in the loop() function, we might be in the middle
1483  * of pushing a buffer, which might block in a sink. To make sure
1484  * that the push gets unblocked we push out a FLUSH_START event.
1485  * Our loop function will get a FLUSHING return value from
1486  * the push and will pause, effectively releasing the STREAM_LOCK.
1487  *
1488  * For a non-flushing seek, we pause the task, which might eventually
1489  * release the STREAM_LOCK. We say eventually because when the sink
1490  * blocks on the sample we might wait a very long time until the sink
1491  * unblocks the sample. In any case we acquire the STREAM_LOCK and
1492  * can continue the seek. A non-flushing seek is normally done in a
1493  * running pipeline to perform seamless playback, this means that the sink is
1494  * PLAYING and will return from its chain function.
1495  * In the case of a non-flushing seek we need to make sure that the
1496  * data we output after the seek is continuous with the previous data,
1497  * this is because a non-flushing seek does not reset the running-time
1498  * to 0. We do this by closing the currently running segment, ie. sending
1499  * a new_segment event with the stop position set to the last processed
1500  * position.
1501  *
1502  * After updating the segment.start/stop values, we prepare for
1503  * streaming again. We push out a FLUSH_STOP to make the peer pad
1504  * accept data again and we start our task again.
1505  *
1506  * A segment seek posts a message on the bus saying that the playback
1507  * of the segment started. We store the segment flag internally because
1508  * when we reach the segment.stop we have to post a segment.done
1509  * instead of EOS when doing a segment seek.
1510  */
1511 static gboolean
1512 gst_base_src_perform_seek (GstBaseSrc * src, GstEvent * event, gboolean unlock)
1513 {
1514   gboolean res = TRUE, tres;
1515   gdouble rate;
1516   GstFormat seek_format, dest_format;
1517   GstSeekFlags flags;
1518   GstSeekType start_type, stop_type;
1519   gint64 start, stop;
1520   gboolean flush, playing;
1521   gboolean update;
1522   gboolean relative_seek = FALSE;
1523   gboolean seekseg_configured = FALSE;
1524   GstSegment seeksegment;
1525   guint32 seqnum;
1526   GstEvent *tevent;
1527
1528   GST_DEBUG_OBJECT (src, "doing seek: %" GST_PTR_FORMAT, event);
1529
1530   GST_OBJECT_LOCK (src);
1531   dest_format = src->segment.format;
1532   GST_OBJECT_UNLOCK (src);
1533
1534   if (event) {
1535     gst_event_parse_seek (event, &rate, &seek_format, &flags,
1536         &start_type, &start, &stop_type, &stop);
1537
1538     relative_seek = SEEK_TYPE_IS_RELATIVE (start_type) ||
1539         SEEK_TYPE_IS_RELATIVE (stop_type);
1540
1541     if (dest_format != seek_format && !relative_seek) {
1542       /* If we have an ABSOLUTE position (SEEK_SET only), we can convert it
1543        * here before taking the stream lock, otherwise we must convert it later,
1544        * once we have the stream lock and can read the last configures segment
1545        * start and stop positions */
1546       gst_segment_init (&seeksegment, dest_format);
1547
1548       if (!gst_base_src_prepare_seek_segment (src, event, &seeksegment))
1549         goto prepare_failed;
1550
1551       seekseg_configured = TRUE;
1552     }
1553
1554     flush = flags & GST_SEEK_FLAG_FLUSH;
1555     seqnum = gst_event_get_seqnum (event);
1556   } else {
1557     flush = FALSE;
1558     /* get next seqnum */
1559     seqnum = gst_util_seqnum_next ();
1560   }
1561
1562   /* send flush start */
1563   if (flush) {
1564     tevent = gst_event_new_flush_start ();
1565     gst_event_set_seqnum (tevent, seqnum);
1566     gst_pad_push_event (src->srcpad, tevent);
1567   } else
1568     gst_pad_pause_task (src->srcpad);
1569
1570   /* unblock streaming thread. */
1571   if (unlock)
1572     gst_base_src_set_flushing (src, TRUE, FALSE, &playing);
1573
1574   /* grab streaming lock, this should eventually be possible, either
1575    * because the task is paused, our streaming thread stopped
1576    * or because our peer is flushing. */
1577   GST_PAD_STREAM_LOCK (src->srcpad);
1578   if (G_UNLIKELY (src->priv->seqnum == seqnum)) {
1579     /* we have seen this event before, issue a warning for now */
1580     GST_WARNING_OBJECT (src, "duplicate event found %" G_GUINT32_FORMAT,
1581         seqnum);
1582   } else {
1583     src->priv->seqnum = seqnum;
1584     GST_DEBUG_OBJECT (src, "seek with seqnum %" G_GUINT32_FORMAT, seqnum);
1585   }
1586
1587   if (unlock)
1588     gst_base_src_set_flushing (src, FALSE, playing, NULL);
1589
1590   /* If we configured the seeksegment above, don't overwrite it now. Otherwise
1591    * copy the current segment info into the temp segment that we can actually
1592    * attempt the seek with. We only update the real segment if the seek succeeds. */
1593   if (!seekseg_configured) {
1594     memcpy (&seeksegment, &src->segment, sizeof (GstSegment));
1595
1596     /* now configure the final seek segment */
1597     if (event) {
1598       if (seeksegment.format != seek_format) {
1599         /* OK, here's where we give the subclass a chance to convert the relative
1600          * seek into an absolute one in the processing format. We set up any
1601          * absolute seek above, before taking the stream lock. */
1602         if (!gst_base_src_prepare_seek_segment (src, event, &seeksegment)) {
1603           GST_DEBUG_OBJECT (src, "Preparing the seek failed after flushing. "
1604               "Aborting seek");
1605           res = FALSE;
1606         }
1607       } else {
1608         /* The seek format matches our processing format, no need to ask the
1609          * the subclass to configure the segment. */
1610         gst_segment_do_seek (&seeksegment, rate, seek_format, flags,
1611             start_type, start, stop_type, stop, &update);
1612       }
1613     }
1614     /* Else, no seek event passed, so we're just (re)starting the
1615        current segment. */
1616   }
1617
1618   if (res) {
1619     GST_DEBUG_OBJECT (src, "segment configured from %" G_GINT64_FORMAT
1620         " to %" G_GINT64_FORMAT ", position %" G_GINT64_FORMAT,
1621         seeksegment.start, seeksegment.stop, seeksegment.position);
1622
1623     /* do the seek, segment.position contains the new position. */
1624     res = gst_base_src_do_seek (src, &seeksegment);
1625   }
1626
1627   /* and prepare to continue streaming */
1628   if (flush) {
1629     tevent = gst_event_new_flush_stop (TRUE);
1630     gst_event_set_seqnum (tevent, seqnum);
1631     /* send flush stop, peer will accept data and events again. We
1632      * are not yet providing data as we still have the STREAM_LOCK. */
1633     gst_pad_push_event (src->srcpad, tevent);
1634   }
1635
1636   /* The subclass must have converted the segment to the processing format
1637    * by now */
1638   if (res && seeksegment.format != dest_format) {
1639     GST_DEBUG_OBJECT (src, "Subclass failed to prepare a seek segment "
1640         "in the correct format. Aborting seek.");
1641     res = FALSE;
1642   }
1643
1644   /* if the seek was successful, we update our real segment and push
1645    * out the new segment. */
1646   if (res) {
1647     GST_OBJECT_LOCK (src);
1648     memcpy (&src->segment, &seeksegment, sizeof (GstSegment));
1649     GST_OBJECT_UNLOCK (src);
1650
1651     if (seeksegment.flags & GST_SEGMENT_FLAG_SEGMENT) {
1652       GstMessage *message;
1653
1654       message = gst_message_new_segment_start (GST_OBJECT (src),
1655           seeksegment.format, seeksegment.position);
1656       gst_message_set_seqnum (message, seqnum);
1657
1658       gst_element_post_message (GST_ELEMENT (src), message);
1659     }
1660
1661     /* for deriving a stop position for the playback segment from the seek
1662      * segment, we must take the duration when the stop is not set */
1663     /* FIXME: This is never used below */
1664     if ((stop = seeksegment.stop) == -1)
1665       stop = seeksegment.duration;
1666
1667     src->priv->segment_pending = TRUE;
1668   }
1669
1670   src->priv->discont = TRUE;
1671   src->running = TRUE;
1672   /* and restart the task in case it got paused explicitly or by
1673    * the FLUSH_START event we pushed out. */
1674   tres = gst_pad_start_task (src->srcpad, (GstTaskFunction) gst_base_src_loop,
1675       src->srcpad, NULL);
1676   if (res && !tres)
1677     res = FALSE;
1678
1679   /* and release the lock again so we can continue streaming */
1680   GST_PAD_STREAM_UNLOCK (src->srcpad);
1681
1682   return res;
1683
1684   /* ERROR */
1685 prepare_failed:
1686   GST_DEBUG_OBJECT (src, "Preparing the seek failed before flushing. "
1687       "Aborting seek");
1688   return FALSE;
1689 }
1690
1691 /* all events send to this element directly. This is mainly done from the
1692  * application.
1693  */
1694 static gboolean
1695 gst_base_src_send_event (GstElement * element, GstEvent * event)
1696 {
1697   GstBaseSrc *src;
1698   gboolean result = FALSE;
1699
1700   src = GST_BASE_SRC (element);
1701
1702   GST_DEBUG_OBJECT (src, "handling event %p %" GST_PTR_FORMAT, event, event);
1703
1704   switch (GST_EVENT_TYPE (event)) {
1705       /* bidirectional events */
1706     case GST_EVENT_FLUSH_START:
1707       GST_DEBUG_OBJECT (src, "pushing flush-start event downstream");
1708       result = gst_pad_push_event (src->srcpad, event);
1709       event = NULL;
1710       break;
1711     case GST_EVENT_FLUSH_STOP:
1712       GST_LIVE_LOCK (src->srcpad);
1713       src->priv->segment_pending = TRUE;
1714       /* sending random flushes downstream can break stuff,
1715        * especially sync since all segment info will get flushed */
1716       GST_DEBUG_OBJECT (src, "pushing flush-stop event downstream");
1717       result = gst_pad_push_event (src->srcpad, event);
1718       GST_LIVE_UNLOCK (src->srcpad);
1719       event = NULL;
1720       break;
1721
1722       /* downstream serialized events */
1723     case GST_EVENT_EOS:
1724     {
1725       GstBaseSrcClass *bclass;
1726
1727       bclass = GST_BASE_SRC_GET_CLASS (src);
1728
1729       /* queue EOS and make sure the task or pull function performs the EOS
1730        * actions.
1731        *
1732        * We have two possibilities:
1733        *
1734        *  - Before we are to enter the _create function, we check the pending_eos
1735        *    first and do EOS instead of entering it.
1736        *  - If we are in the _create function or we did not manage to set the
1737        *    flag fast enough and we are about to enter the _create function,
1738        *    we unlock it so that we exit with FLUSHING immediately. We then
1739        *    check the EOS flag and do the EOS logic.
1740        */
1741       g_atomic_int_set (&src->priv->pending_eos, TRUE);
1742       GST_DEBUG_OBJECT (src, "EOS marked, calling unlock");
1743
1744
1745       /* unlock the _create function so that we can check the pending_eos flag
1746        * and we can do EOS. This will eventually release the LIVE_LOCK again so
1747        * that we can grab it and stop the unlock again. We don't take the stream
1748        * lock so that this operation is guaranteed to never block. */
1749       gst_base_src_activate_pool (src, FALSE);
1750       if (bclass->unlock)
1751         bclass->unlock (src);
1752
1753       GST_DEBUG_OBJECT (src, "unlock called, waiting for LIVE_LOCK");
1754
1755       GST_LIVE_LOCK (src);
1756       GST_DEBUG_OBJECT (src, "LIVE_LOCK acquired, calling unlock_stop");
1757       /* now stop the unlock of the streaming thread again. Grabbing the live
1758        * lock is enough because that protects the create function. */
1759       if (bclass->unlock_stop)
1760         bclass->unlock_stop (src);
1761       gst_base_src_activate_pool (src, TRUE);
1762       GST_LIVE_UNLOCK (src);
1763
1764       result = TRUE;
1765       break;
1766     }
1767     case GST_EVENT_SEGMENT:
1768       /* sending random SEGMENT downstream can break sync. */
1769       break;
1770     case GST_EVENT_TAG:
1771     case GST_EVENT_CUSTOM_DOWNSTREAM:
1772     case GST_EVENT_CUSTOM_BOTH:
1773       /* Insert TAG, CUSTOM_DOWNSTREAM, CUSTOM_BOTH in the dataflow */
1774       GST_OBJECT_LOCK (src);
1775       src->priv->pending_events =
1776           g_list_append (src->priv->pending_events, event);
1777       g_atomic_int_set (&src->priv->have_events, TRUE);
1778       GST_OBJECT_UNLOCK (src);
1779       event = NULL;
1780       result = TRUE;
1781       break;
1782     case GST_EVENT_BUFFERSIZE:
1783       /* does not seem to make much sense currently */
1784       break;
1785
1786       /* upstream events */
1787     case GST_EVENT_QOS:
1788       /* elements should override send_event and do something */
1789       break;
1790     case GST_EVENT_SEEK:
1791     {
1792       gboolean started;
1793
1794       GST_OBJECT_LOCK (src->srcpad);
1795       if (GST_PAD_MODE (src->srcpad) == GST_PAD_MODE_PULL)
1796         goto wrong_mode;
1797       started = GST_PAD_MODE (src->srcpad) == GST_PAD_MODE_PUSH;
1798       GST_OBJECT_UNLOCK (src->srcpad);
1799
1800       if (started) {
1801         GST_DEBUG_OBJECT (src, "performing seek");
1802         /* when we are running in push mode, we can execute the
1803          * seek right now. */
1804         result = gst_base_src_perform_seek (src, event, TRUE);
1805       } else {
1806         GstEvent **event_p;
1807
1808         /* else we store the event and execute the seek when we
1809          * get activated */
1810         GST_OBJECT_LOCK (src);
1811         GST_DEBUG_OBJECT (src, "queueing seek");
1812         event_p = &src->pending_seek;
1813         gst_event_replace ((GstEvent **) event_p, event);
1814         GST_OBJECT_UNLOCK (src);
1815         /* assume the seek will work */
1816         result = TRUE;
1817       }
1818       break;
1819     }
1820     case GST_EVENT_NAVIGATION:
1821       /* could make sense for elements that do something with navigation events
1822        * but then they would need to override the send_event function */
1823       break;
1824     case GST_EVENT_LATENCY:
1825       /* does not seem to make sense currently */
1826       break;
1827
1828       /* custom events */
1829     case GST_EVENT_CUSTOM_UPSTREAM:
1830       /* override send_event if you want this */
1831       break;
1832     case GST_EVENT_CUSTOM_DOWNSTREAM_OOB:
1833     case GST_EVENT_CUSTOM_BOTH_OOB:
1834       /* insert a random custom event into the pipeline */
1835       GST_DEBUG_OBJECT (src, "pushing custom OOB event downstream");
1836       result = gst_pad_push_event (src->srcpad, event);
1837       /* we gave away the ref to the event in the push */
1838       event = NULL;
1839       break;
1840     default:
1841       break;
1842   }
1843 done:
1844   /* if we still have a ref to the event, unref it now */
1845   if (event)
1846     gst_event_unref (event);
1847
1848   return result;
1849
1850   /* ERRORS */
1851 wrong_mode:
1852   {
1853     GST_DEBUG_OBJECT (src, "cannot perform seek when operating in pull mode");
1854     GST_OBJECT_UNLOCK (src->srcpad);
1855     result = FALSE;
1856     goto done;
1857   }
1858 }
1859
1860 static gboolean
1861 gst_base_src_seekable (GstBaseSrc * src)
1862 {
1863   GstBaseSrcClass *bclass;
1864   bclass = GST_BASE_SRC_GET_CLASS (src);
1865   if (bclass->is_seekable)
1866     return bclass->is_seekable (src);
1867   else
1868     return FALSE;
1869 }
1870
1871 static void
1872 gst_base_src_update_qos (GstBaseSrc * src,
1873     gdouble proportion, GstClockTimeDiff diff, GstClockTime timestamp)
1874 {
1875   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, src,
1876       "qos: proportion: %lf, diff %" G_GINT64_FORMAT ", timestamp %"
1877       GST_TIME_FORMAT, proportion, diff, GST_TIME_ARGS (timestamp));
1878
1879   GST_OBJECT_LOCK (src);
1880   src->priv->proportion = proportion;
1881   src->priv->earliest_time = timestamp + diff;
1882   GST_OBJECT_UNLOCK (src);
1883 }
1884
1885
1886 static gboolean
1887 gst_base_src_default_event (GstBaseSrc * src, GstEvent * event)
1888 {
1889   gboolean result;
1890
1891   GST_DEBUG_OBJECT (src, "handle event %" GST_PTR_FORMAT, event);
1892
1893   switch (GST_EVENT_TYPE (event)) {
1894     case GST_EVENT_SEEK:
1895       /* is normally called when in push mode */
1896       if (!gst_base_src_seekable (src))
1897         goto not_seekable;
1898
1899       result = gst_base_src_perform_seek (src, event, TRUE);
1900       break;
1901     case GST_EVENT_FLUSH_START:
1902       /* cancel any blocking getrange, is normally called
1903        * when in pull mode. */
1904       result = gst_base_src_set_flushing (src, TRUE, FALSE, NULL);
1905       break;
1906     case GST_EVENT_FLUSH_STOP:
1907       result = gst_base_src_set_flushing (src, FALSE, TRUE, NULL);
1908       break;
1909     case GST_EVENT_QOS:
1910     {
1911       gdouble proportion;
1912       GstClockTimeDiff diff;
1913       GstClockTime timestamp;
1914
1915       gst_event_parse_qos (event, NULL, &proportion, &diff, &timestamp);
1916       gst_base_src_update_qos (src, proportion, diff, timestamp);
1917       result = TRUE;
1918       break;
1919     }
1920     case GST_EVENT_RECONFIGURE:
1921       result = TRUE;
1922       break;
1923     case GST_EVENT_LATENCY:
1924       result = TRUE;
1925       break;
1926     default:
1927       result = FALSE;
1928       break;
1929   }
1930   return result;
1931
1932   /* ERRORS */
1933 not_seekable:
1934   {
1935     GST_DEBUG_OBJECT (src, "is not seekable");
1936     return FALSE;
1937   }
1938 }
1939
1940 static gboolean
1941 gst_base_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
1942 {
1943   GstBaseSrc *src;
1944   GstBaseSrcClass *bclass;
1945   gboolean result = FALSE;
1946
1947   src = GST_BASE_SRC (parent);
1948   bclass = GST_BASE_SRC_GET_CLASS (src);
1949
1950   if (bclass->event) {
1951     if (!(result = bclass->event (src, event)))
1952       goto subclass_failed;
1953   }
1954
1955 done:
1956   gst_event_unref (event);
1957
1958   return result;
1959
1960   /* ERRORS */
1961 subclass_failed:
1962   {
1963     GST_DEBUG_OBJECT (src, "subclass refused event");
1964     goto done;
1965   }
1966 }
1967
1968 static void
1969 gst_base_src_set_property (GObject * object, guint prop_id,
1970     const GValue * value, GParamSpec * pspec)
1971 {
1972   GstBaseSrc *src;
1973
1974   src = GST_BASE_SRC (object);
1975
1976   switch (prop_id) {
1977     case PROP_BLOCKSIZE:
1978       gst_base_src_set_blocksize (src, g_value_get_uint (value));
1979       break;
1980     case PROP_NUM_BUFFERS:
1981       src->num_buffers = g_value_get_int (value);
1982       break;
1983     case PROP_TYPEFIND:
1984       src->typefind = g_value_get_boolean (value);
1985       break;
1986     case PROP_DO_TIMESTAMP:
1987       gst_base_src_set_do_timestamp (src, g_value_get_boolean (value));
1988       break;
1989     default:
1990       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1991       break;
1992   }
1993 }
1994
1995 static void
1996 gst_base_src_get_property (GObject * object, guint prop_id, GValue * value,
1997     GParamSpec * pspec)
1998 {
1999   GstBaseSrc *src;
2000
2001   src = GST_BASE_SRC (object);
2002
2003   switch (prop_id) {
2004     case PROP_BLOCKSIZE:
2005       g_value_set_uint (value, gst_base_src_get_blocksize (src));
2006       break;
2007     case PROP_NUM_BUFFERS:
2008       g_value_set_int (value, src->num_buffers);
2009       break;
2010     case PROP_TYPEFIND:
2011       g_value_set_boolean (value, src->typefind);
2012       break;
2013     case PROP_DO_TIMESTAMP:
2014       g_value_set_boolean (value, gst_base_src_get_do_timestamp (src));
2015       break;
2016     default:
2017       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2018       break;
2019   }
2020 }
2021
2022 /* with STREAM_LOCK and LOCK */
2023 static GstClockReturn
2024 gst_base_src_wait (GstBaseSrc * basesrc, GstClock * clock, GstClockTime time)
2025 {
2026   GstClockReturn ret;
2027   GstClockID id;
2028
2029   id = gst_clock_new_single_shot_id (clock, time);
2030
2031   basesrc->clock_id = id;
2032   /* release the live lock while waiting */
2033   GST_LIVE_UNLOCK (basesrc);
2034
2035   ret = gst_clock_id_wait (id, NULL);
2036
2037   GST_LIVE_LOCK (basesrc);
2038   gst_clock_id_unref (id);
2039   basesrc->clock_id = NULL;
2040
2041   return ret;
2042 }
2043
2044 /* perform synchronisation on a buffer.
2045  * with STREAM_LOCK.
2046  */
2047 static GstClockReturn
2048 gst_base_src_do_sync (GstBaseSrc * basesrc, GstBuffer * buffer)
2049 {
2050   GstClockReturn result;
2051   GstClockTime start, end;
2052   GstBaseSrcClass *bclass;
2053   GstClockTime base_time;
2054   GstClock *clock;
2055   GstClockTime now = GST_CLOCK_TIME_NONE, pts, dts, timestamp;
2056   gboolean do_timestamp, first, pseudo_live, is_live;
2057
2058   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2059
2060   start = end = -1;
2061   if (bclass->get_times)
2062     bclass->get_times (basesrc, buffer, &start, &end);
2063
2064   /* get buffer timestamp */
2065   dts = GST_BUFFER_DTS (buffer);
2066   pts = GST_BUFFER_PTS (buffer);
2067
2068   if (GST_CLOCK_TIME_IS_VALID (dts))
2069     timestamp = dts;
2070   else
2071     timestamp = pts;
2072
2073   /* grab the lock to prepare for clocking and calculate the startup
2074    * latency. */
2075   GST_OBJECT_LOCK (basesrc);
2076
2077   is_live = basesrc->is_live;
2078   /* if we are asked to sync against the clock we are a pseudo live element */
2079   pseudo_live = (start != -1 && is_live);
2080   /* check for the first buffer */
2081   first = (basesrc->priv->latency == -1);
2082
2083   if (timestamp != -1 && pseudo_live) {
2084     GstClockTime latency;
2085
2086     /* we have a timestamp and a sync time, latency is the diff */
2087     if (timestamp <= start)
2088       latency = start - timestamp;
2089     else
2090       latency = 0;
2091
2092     if (first) {
2093       GST_DEBUG_OBJECT (basesrc, "pseudo_live with latency %" GST_TIME_FORMAT,
2094           GST_TIME_ARGS (latency));
2095       /* first time we calculate latency, just configure */
2096       basesrc->priv->latency = latency;
2097     } else {
2098       if (basesrc->priv->latency != latency) {
2099         /* we have a new latency, FIXME post latency message */
2100         basesrc->priv->latency = latency;
2101         GST_DEBUG_OBJECT (basesrc, "latency changed to %" GST_TIME_FORMAT,
2102             GST_TIME_ARGS (latency));
2103       }
2104     }
2105   } else if (first) {
2106     GST_DEBUG_OBJECT (basesrc, "no latency needed, live %d, sync %d",
2107         is_live, start != -1);
2108     basesrc->priv->latency = 0;
2109   }
2110
2111   /* get clock, if no clock, we can't sync or do timestamps */
2112   if ((clock = GST_ELEMENT_CLOCK (basesrc)) == NULL)
2113     goto no_clock;
2114   else
2115     gst_object_ref (clock);
2116
2117   base_time = GST_ELEMENT_CAST (basesrc)->base_time;
2118
2119   do_timestamp = basesrc->priv->do_timestamp;
2120   GST_OBJECT_UNLOCK (basesrc);
2121
2122   /* first buffer, calculate the timestamp offset */
2123   if (first) {
2124     GstClockTime running_time;
2125
2126     now = gst_clock_get_time (clock);
2127     running_time = now - base_time;
2128
2129     GST_LOG_OBJECT (basesrc,
2130         "startup PTS: %" GST_TIME_FORMAT ", DTS %" GST_TIME_FORMAT
2131         ", running_time %" GST_TIME_FORMAT, GST_TIME_ARGS (pts),
2132         GST_TIME_ARGS (dts), GST_TIME_ARGS (running_time));
2133
2134     if (pseudo_live && timestamp != -1) {
2135       /* live source and we need to sync, add startup latency to all timestamps
2136        * to get the real running_time. Live sources should always timestamp
2137        * according to the current running time. */
2138       basesrc->priv->ts_offset = GST_CLOCK_DIFF (timestamp, running_time);
2139
2140       GST_LOG_OBJECT (basesrc, "live with sync, ts_offset %" GST_TIME_FORMAT,
2141           GST_TIME_ARGS (basesrc->priv->ts_offset));
2142     } else {
2143       basesrc->priv->ts_offset = 0;
2144       GST_LOG_OBJECT (basesrc, "no timestamp offset needed");
2145     }
2146
2147     if (!GST_CLOCK_TIME_IS_VALID (dts)) {
2148       if (do_timestamp) {
2149         dts = running_time;
2150       } else {
2151         dts = 0;
2152       }
2153       GST_BUFFER_DTS (buffer) = dts;
2154
2155       GST_LOG_OBJECT (basesrc, "created DTS %" GST_TIME_FORMAT,
2156           GST_TIME_ARGS (dts));
2157     }
2158   } else {
2159     /* not the first buffer, the timestamp is the diff between the clock and
2160      * base_time */
2161     if (do_timestamp && !GST_CLOCK_TIME_IS_VALID (dts)) {
2162       now = gst_clock_get_time (clock);
2163
2164       dts = now - base_time;
2165       GST_BUFFER_DTS (buffer) = dts;
2166
2167       GST_LOG_OBJECT (basesrc, "created DTS %" GST_TIME_FORMAT,
2168           GST_TIME_ARGS (dts));
2169     }
2170   }
2171   if (!GST_CLOCK_TIME_IS_VALID (pts)) {
2172     if (!GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DELTA_UNIT))
2173       pts = dts;
2174
2175     GST_BUFFER_PTS (buffer) = dts;
2176
2177     GST_LOG_OBJECT (basesrc, "created PTS %" GST_TIME_FORMAT,
2178         GST_TIME_ARGS (pts));
2179   }
2180
2181   /* if we don't have a buffer timestamp, we don't sync */
2182   if (!GST_CLOCK_TIME_IS_VALID (start))
2183     goto no_sync;
2184
2185   if (is_live) {
2186     /* for pseudo live sources, add our ts_offset to the timestamp */
2187     if (GST_CLOCK_TIME_IS_VALID (pts))
2188       GST_BUFFER_PTS (buffer) += basesrc->priv->ts_offset;
2189     if (GST_CLOCK_TIME_IS_VALID (dts))
2190       GST_BUFFER_DTS (buffer) += basesrc->priv->ts_offset;
2191     start += basesrc->priv->ts_offset;
2192   }
2193
2194   GST_LOG_OBJECT (basesrc,
2195       "waiting for clock, base time %" GST_TIME_FORMAT
2196       ", stream_start %" GST_TIME_FORMAT,
2197       GST_TIME_ARGS (base_time), GST_TIME_ARGS (start));
2198
2199   result = gst_base_src_wait (basesrc, clock, start + base_time);
2200
2201   gst_object_unref (clock);
2202
2203   GST_LOG_OBJECT (basesrc, "clock entry done: %d", result);
2204
2205   return result;
2206
2207   /* special cases */
2208 no_clock:
2209   {
2210     GST_DEBUG_OBJECT (basesrc, "we have no clock");
2211     GST_OBJECT_UNLOCK (basesrc);
2212     return GST_CLOCK_OK;
2213   }
2214 no_sync:
2215   {
2216     GST_DEBUG_OBJECT (basesrc, "no sync needed");
2217     gst_object_unref (clock);
2218     return GST_CLOCK_OK;
2219   }
2220 }
2221
2222 /* Called with STREAM_LOCK and LIVE_LOCK */
2223 static gboolean
2224 gst_base_src_update_length (GstBaseSrc * src, guint64 offset, guint * length,
2225     gboolean force)
2226 {
2227   guint64 size, maxsize;
2228   GstBaseSrcClass *bclass;
2229   GstFormat format;
2230   gint64 stop;
2231
2232   bclass = GST_BASE_SRC_GET_CLASS (src);
2233
2234   format = src->segment.format;
2235   stop = src->segment.stop;
2236   /* get total file size */
2237   size = src->segment.duration;
2238
2239   /* only operate if we are working with bytes */
2240   if (format != GST_FORMAT_BYTES)
2241     return TRUE;
2242
2243   /* the max amount of bytes to read is the total size or
2244    * up to the segment.stop if present. */
2245   if (stop != -1)
2246     maxsize = MIN (size, stop);
2247   else
2248     maxsize = size;
2249
2250   GST_DEBUG_OBJECT (src,
2251       "reading offset %" G_GUINT64_FORMAT ", length %u, size %" G_GINT64_FORMAT
2252       ", segment.stop %" G_GINT64_FORMAT ", maxsize %" G_GINT64_FORMAT, offset,
2253       *length, size, stop, maxsize);
2254
2255   /* check size if we have one */
2256   if (maxsize != -1) {
2257     /* if we run past the end, check if the file became bigger and
2258      * retry. */
2259     if (G_UNLIKELY (offset + *length >= maxsize || force)) {
2260       /* see if length of the file changed */
2261       if (bclass->get_size)
2262         if (!bclass->get_size (src, &size))
2263           size = -1;
2264
2265       /* make sure we don't exceed the configured segment stop
2266        * if it was set */
2267       if (stop != -1)
2268         maxsize = MIN (size, stop);
2269       else
2270         maxsize = size;
2271
2272       /* if we are at or past the end, EOS */
2273       if (G_UNLIKELY (offset >= maxsize))
2274         goto unexpected_length;
2275
2276       /* else we can clip to the end */
2277       if (G_UNLIKELY (offset + *length >= maxsize))
2278         *length = maxsize - offset;
2279
2280     }
2281   }
2282
2283   /* keep track of current duration.
2284    * segment is in bytes, we checked that above. */
2285   GST_OBJECT_LOCK (src);
2286   src->segment.duration = size;
2287   GST_OBJECT_UNLOCK (src);
2288
2289   return TRUE;
2290
2291   /* ERRORS */
2292 unexpected_length:
2293   {
2294     return FALSE;
2295   }
2296 }
2297
2298 /* must be called with LIVE_LOCK */
2299 static GstFlowReturn
2300 gst_base_src_get_range (GstBaseSrc * src, guint64 offset, guint length,
2301     GstBuffer ** buf)
2302 {
2303   GstFlowReturn ret;
2304   GstBaseSrcClass *bclass;
2305   GstClockReturn status;
2306   GstBuffer *res_buf;
2307   GstBuffer *in_buf;
2308
2309   bclass = GST_BASE_SRC_GET_CLASS (src);
2310
2311 again:
2312   if (src->is_live) {
2313     if (G_UNLIKELY (!src->live_running)) {
2314       ret = gst_base_src_wait_playing (src);
2315       if (ret != GST_FLOW_OK)
2316         goto stopped;
2317     }
2318   }
2319
2320   if (G_UNLIKELY (!GST_BASE_SRC_IS_STARTED (src)
2321           && !GST_BASE_SRC_IS_STARTING (src)))
2322     goto not_started;
2323
2324   if (G_UNLIKELY (!bclass->create))
2325     goto no_function;
2326
2327   if (G_UNLIKELY (!gst_base_src_update_length (src, offset, &length, FALSE)))
2328     goto unexpected_length;
2329
2330   /* track position */
2331   GST_OBJECT_LOCK (src);
2332   if (src->segment.format == GST_FORMAT_BYTES)
2333     src->segment.position = offset;
2334   GST_OBJECT_UNLOCK (src);
2335
2336   /* normally we don't count buffers */
2337   if (G_UNLIKELY (src->num_buffers_left >= 0)) {
2338     if (src->num_buffers_left == 0)
2339       goto reached_num_buffers;
2340     else
2341       src->num_buffers_left--;
2342   }
2343
2344   /* don't enter the create function if a pending EOS event was set. For the
2345    * logic of the pending_eos, check the event function of this class. */
2346   if (G_UNLIKELY (g_atomic_int_get (&src->priv->pending_eos)))
2347     goto eos;
2348
2349   GST_DEBUG_OBJECT (src,
2350       "calling create offset %" G_GUINT64_FORMAT " length %u, time %"
2351       G_GINT64_FORMAT, offset, length, src->segment.time);
2352
2353   res_buf = in_buf = *buf;
2354
2355   ret = bclass->create (src, offset, length, &res_buf);
2356
2357   /* The create function could be unlocked because we have a pending EOS. It's
2358    * possible that we have a valid buffer from create that we need to
2359    * discard when the create function returned _OK. */
2360   if (G_UNLIKELY (g_atomic_int_get (&src->priv->pending_eos))) {
2361     if (ret == GST_FLOW_OK) {
2362       if (*buf == NULL)
2363         gst_buffer_unref (res_buf);
2364     }
2365     goto eos;
2366   }
2367
2368   if (G_UNLIKELY (ret != GST_FLOW_OK))
2369     goto not_ok;
2370
2371   /* fallback in case the create function didn't fill a provided buffer */
2372   if (in_buf != NULL && res_buf != in_buf) {
2373     GstMapInfo info;
2374     gsize copied_size;
2375
2376     GST_CAT_DEBUG_OBJECT (GST_CAT_PERFORMANCE, src, "create function didn't "
2377         "fill the provided buffer, copying");
2378
2379     if (!gst_buffer_map (in_buf, &info, GST_MAP_WRITE))
2380       goto map_failed;
2381
2382     copied_size = gst_buffer_extract (res_buf, 0, info.data, info.size);
2383     gst_buffer_unmap (in_buf, &info);
2384     gst_buffer_set_size (in_buf, copied_size);
2385
2386     gst_buffer_copy_into (in_buf, res_buf, GST_BUFFER_COPY_METADATA, 0, -1);
2387
2388     gst_buffer_unref (res_buf);
2389     res_buf = in_buf;
2390   }
2391
2392   /* no timestamp set and we are at offset 0, we can timestamp with 0 */
2393   if (offset == 0 && src->segment.time == 0
2394       && GST_BUFFER_DTS (res_buf) == -1 && !src->is_live) {
2395     GST_DEBUG_OBJECT (src, "setting first timestamp to 0");
2396     res_buf = gst_buffer_make_writable (res_buf);
2397     GST_BUFFER_DTS (res_buf) = 0;
2398   }
2399
2400   /* now sync before pushing the buffer */
2401   status = gst_base_src_do_sync (src, res_buf);
2402
2403   /* waiting for the clock could have made us flushing */
2404   if (G_UNLIKELY (src->priv->flushing))
2405     goto flushing;
2406
2407   switch (status) {
2408     case GST_CLOCK_EARLY:
2409       /* the buffer is too late. We currently don't drop the buffer. */
2410       GST_DEBUG_OBJECT (src, "buffer too late!, returning anyway");
2411       break;
2412     case GST_CLOCK_OK:
2413       /* buffer synchronised properly */
2414       GST_DEBUG_OBJECT (src, "buffer ok");
2415       break;
2416     case GST_CLOCK_UNSCHEDULED:
2417       /* this case is triggered when we were waiting for the clock and
2418        * it got unlocked because we did a state change. In any case, get rid of
2419        * the buffer. */
2420       if (*buf == NULL)
2421         gst_buffer_unref (res_buf);
2422
2423       if (!src->live_running) {
2424         /* We return FLUSHING when we are not running to stop the dataflow also
2425          * get rid of the produced buffer. */
2426         GST_DEBUG_OBJECT (src,
2427             "clock was unscheduled (%d), returning FLUSHING", status);
2428         ret = GST_FLOW_FLUSHING;
2429       } else {
2430         /* If we are running when this happens, we quickly switched between
2431          * pause and playing. We try to produce a new buffer */
2432         GST_DEBUG_OBJECT (src,
2433             "clock was unscheduled (%d), but we are running", status);
2434         goto again;
2435       }
2436       break;
2437     default:
2438       /* all other result values are unexpected and errors */
2439       GST_ELEMENT_ERROR (src, CORE, CLOCK,
2440           (_("Internal clock error.")),
2441           ("clock returned unexpected return value %d", status));
2442       if (*buf == NULL)
2443         gst_buffer_unref (res_buf);
2444       ret = GST_FLOW_ERROR;
2445       break;
2446   }
2447   if (G_LIKELY (ret == GST_FLOW_OK))
2448     *buf = res_buf;
2449
2450   return ret;
2451
2452   /* ERROR */
2453 stopped:
2454   {
2455     GST_DEBUG_OBJECT (src, "wait_playing returned %d (%s)", ret,
2456         gst_flow_get_name (ret));
2457     return ret;
2458   }
2459 not_ok:
2460   {
2461     GST_DEBUG_OBJECT (src, "create returned %d (%s)", ret,
2462         gst_flow_get_name (ret));
2463     return ret;
2464   }
2465 map_failed:
2466   {
2467     GST_ELEMENT_ERROR (src, RESOURCE, BUSY,
2468         (_("Failed to map buffer.")),
2469         ("failed to map result buffer in WRITE mode"));
2470     if (*buf == NULL)
2471       gst_buffer_unref (res_buf);
2472     return GST_FLOW_ERROR;
2473   }
2474 not_started:
2475   {
2476     GST_DEBUG_OBJECT (src, "getrange but not started");
2477     return GST_FLOW_FLUSHING;
2478   }
2479 no_function:
2480   {
2481     GST_DEBUG_OBJECT (src, "no create function");
2482     return GST_FLOW_NOT_SUPPORTED;
2483   }
2484 unexpected_length:
2485   {
2486     GST_DEBUG_OBJECT (src, "unexpected length %u (offset=%" G_GUINT64_FORMAT
2487         ", size=%" G_GINT64_FORMAT ")", length, offset, src->segment.duration);
2488     return GST_FLOW_EOS;
2489   }
2490 reached_num_buffers:
2491   {
2492     GST_DEBUG_OBJECT (src, "sent all buffers");
2493     return GST_FLOW_EOS;
2494   }
2495 flushing:
2496   {
2497     GST_DEBUG_OBJECT (src, "we are flushing");
2498     if (*buf == NULL)
2499       gst_buffer_unref (res_buf);
2500     return GST_FLOW_FLUSHING;
2501   }
2502 eos:
2503   {
2504     GST_DEBUG_OBJECT (src, "we are EOS");
2505     return GST_FLOW_EOS;
2506   }
2507 }
2508
2509 static GstFlowReturn
2510 gst_base_src_getrange (GstPad * pad, GstObject * parent, guint64 offset,
2511     guint length, GstBuffer ** buf)
2512 {
2513   GstBaseSrc *src;
2514   GstFlowReturn res;
2515
2516   src = GST_BASE_SRC_CAST (parent);
2517
2518   GST_LIVE_LOCK (src);
2519   if (G_UNLIKELY (src->priv->flushing))
2520     goto flushing;
2521
2522   res = gst_base_src_get_range (src, offset, length, buf);
2523
2524 done:
2525   GST_LIVE_UNLOCK (src);
2526
2527   return res;
2528
2529   /* ERRORS */
2530 flushing:
2531   {
2532     GST_DEBUG_OBJECT (src, "we are flushing");
2533     res = GST_FLOW_FLUSHING;
2534     goto done;
2535   }
2536 }
2537
2538 static gboolean
2539 gst_base_src_is_random_access (GstBaseSrc * src)
2540 {
2541   /* we need to start the basesrc to check random access */
2542   if (!GST_BASE_SRC_IS_STARTED (src)) {
2543     GST_LOG_OBJECT (src, "doing start/stop to check get_range support");
2544     if (G_LIKELY (gst_base_src_start (src))) {
2545       if (gst_base_src_start_wait (src) != GST_FLOW_OK)
2546         goto start_failed;
2547       gst_base_src_stop (src);
2548     }
2549   }
2550
2551   return src->random_access;
2552
2553   /* ERRORS */
2554 start_failed:
2555   {
2556     GST_DEBUG_OBJECT (src, "failed to start");
2557     return FALSE;
2558   }
2559 }
2560
2561 static void
2562 gst_base_src_loop (GstPad * pad)
2563 {
2564   GstBaseSrc *src;
2565   GstBuffer *buf = NULL;
2566   GstFlowReturn ret;
2567   gint64 position;
2568   gboolean eos;
2569   guint blocksize;
2570   GList *pending_events = NULL, *tmp;
2571
2572   eos = FALSE;
2573
2574   src = GST_BASE_SRC (GST_OBJECT_PARENT (pad));
2575
2576   gst_base_src_send_stream_start (src);
2577
2578   /* check if we need to renegotiate */
2579   if (gst_pad_check_reconfigure (pad)) {
2580     if (!gst_base_src_negotiate (src)) {
2581       gst_pad_mark_reconfigure (pad);
2582       if (GST_PAD_IS_FLUSHING (pad))
2583         goto flushing;
2584       else
2585         goto negotiate_failed;
2586     }
2587   }
2588
2589   GST_LIVE_LOCK (src);
2590
2591   if (G_UNLIKELY (src->priv->flushing))
2592     goto flushing;
2593
2594   blocksize = src->blocksize;
2595
2596   /* if we operate in bytes, we can calculate an offset */
2597   if (src->segment.format == GST_FORMAT_BYTES) {
2598     position = src->segment.position;
2599     /* for negative rates, start with subtracting the blocksize */
2600     if (src->segment.rate < 0.0) {
2601       /* we cannot go below segment.start */
2602       if (position > src->segment.start + blocksize)
2603         position -= blocksize;
2604       else {
2605         /* last block, remainder up to segment.start */
2606         blocksize = position - src->segment.start;
2607         position = src->segment.start;
2608       }
2609     }
2610   } else
2611     position = -1;
2612
2613   GST_LOG_OBJECT (src, "next_ts %" GST_TIME_FORMAT " size %u",
2614       GST_TIME_ARGS (position), blocksize);
2615
2616   ret = gst_base_src_get_range (src, position, blocksize, &buf);
2617   if (G_UNLIKELY (ret != GST_FLOW_OK)) {
2618     GST_INFO_OBJECT (src, "pausing after gst_base_src_get_range() = %s",
2619         gst_flow_get_name (ret));
2620     GST_LIVE_UNLOCK (src);
2621     goto pause;
2622   }
2623   /* this should not happen */
2624   if (G_UNLIKELY (buf == NULL))
2625     goto null_buffer;
2626
2627   /* push events to close/start our segment before we push the buffer. */
2628   if (G_UNLIKELY (src->priv->segment_pending)) {
2629     gst_pad_push_event (pad, gst_event_new_segment (&src->segment));
2630     src->priv->segment_pending = FALSE;
2631   }
2632
2633   if (g_atomic_int_get (&src->priv->have_events)) {
2634     GST_OBJECT_LOCK (src);
2635     /* take the events */
2636     pending_events = src->priv->pending_events;
2637     src->priv->pending_events = NULL;
2638     g_atomic_int_set (&src->priv->have_events, FALSE);
2639     GST_OBJECT_UNLOCK (src);
2640   }
2641
2642   /* Push out pending events if any */
2643   if (G_UNLIKELY (pending_events != NULL)) {
2644     for (tmp = pending_events; tmp; tmp = g_list_next (tmp)) {
2645       GstEvent *ev = (GstEvent *) tmp->data;
2646       gst_pad_push_event (pad, ev);
2647     }
2648     g_list_free (pending_events);
2649   }
2650
2651   /* figure out the new position */
2652   switch (src->segment.format) {
2653     case GST_FORMAT_BYTES:
2654     {
2655       guint bufsize = gst_buffer_get_size (buf);
2656
2657       /* we subtracted above for negative rates */
2658       if (src->segment.rate >= 0.0)
2659         position += bufsize;
2660       break;
2661     }
2662     case GST_FORMAT_TIME:
2663     {
2664       GstClockTime start, duration;
2665
2666       start = GST_BUFFER_TIMESTAMP (buf);
2667       duration = GST_BUFFER_DURATION (buf);
2668
2669       if (GST_CLOCK_TIME_IS_VALID (start))
2670         position = start;
2671       else
2672         position = src->segment.position;
2673
2674       if (GST_CLOCK_TIME_IS_VALID (duration)) {
2675         if (src->segment.rate >= 0.0)
2676           position += duration;
2677         else if (position > duration)
2678           position -= duration;
2679         else
2680           position = 0;
2681       }
2682       break;
2683     }
2684     case GST_FORMAT_DEFAULT:
2685       if (src->segment.rate >= 0.0)
2686         position = GST_BUFFER_OFFSET_END (buf);
2687       else
2688         position = GST_BUFFER_OFFSET (buf);
2689       break;
2690     default:
2691       position = -1;
2692       break;
2693   }
2694   if (position != -1) {
2695     if (src->segment.rate >= 0.0) {
2696       /* positive rate, check if we reached the stop */
2697       if (src->segment.stop != -1) {
2698         if (position >= src->segment.stop) {
2699           eos = TRUE;
2700           position = src->segment.stop;
2701         }
2702       }
2703     } else {
2704       /* negative rate, check if we reached the start. start is always set to
2705        * something different from -1 */
2706       if (position <= src->segment.start) {
2707         eos = TRUE;
2708         position = src->segment.start;
2709       }
2710       /* when going reverse, all buffers are DISCONT */
2711       src->priv->discont = TRUE;
2712     }
2713     GST_OBJECT_LOCK (src);
2714     src->segment.position = position;
2715     GST_OBJECT_UNLOCK (src);
2716   }
2717
2718   if (G_UNLIKELY (src->priv->discont)) {
2719     GST_INFO_OBJECT (src, "marking pending DISCONT");
2720     buf = gst_buffer_make_writable (buf);
2721     GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_DISCONT);
2722     src->priv->discont = FALSE;
2723   }
2724   GST_LIVE_UNLOCK (src);
2725
2726   ret = gst_pad_push (pad, buf);
2727   if (G_UNLIKELY (ret != GST_FLOW_OK)) {
2728     if (ret == GST_FLOW_NOT_NEGOTIATED) {
2729       goto not_negotiated;
2730     }
2731     GST_INFO_OBJECT (src, "pausing after gst_pad_push() = %s",
2732         gst_flow_get_name (ret));
2733     goto pause;
2734   }
2735
2736   if (G_UNLIKELY (eos)) {
2737     GST_INFO_OBJECT (src, "pausing after end of segment");
2738     ret = GST_FLOW_EOS;
2739     goto pause;
2740   }
2741
2742 done:
2743   return;
2744
2745   /* special cases */
2746 not_negotiated:
2747   {
2748     if (gst_pad_needs_reconfigure (pad)) {
2749       GST_DEBUG_OBJECT (src, "Retrying to renegotiate");
2750       return;
2751     }
2752     /* fallthrough when push returns NOT_NEGOTIATED and we don't have
2753      * a pending negotiation request on our srcpad */
2754   }
2755 negotiate_failed:
2756   {
2757     GST_DEBUG_OBJECT (src, "Not negotiated");
2758     ret = GST_FLOW_NOT_NEGOTIATED;
2759     goto pause;
2760   }
2761 flushing:
2762   {
2763     GST_DEBUG_OBJECT (src, "we are flushing");
2764     GST_LIVE_UNLOCK (src);
2765     ret = GST_FLOW_FLUSHING;
2766     goto pause;
2767   }
2768 pause:
2769   {
2770     const gchar *reason = gst_flow_get_name (ret);
2771     GstEvent *event;
2772
2773     GST_DEBUG_OBJECT (src, "pausing task, reason %s", reason);
2774     src->running = FALSE;
2775     gst_pad_pause_task (pad);
2776     if (ret == GST_FLOW_EOS) {
2777       gboolean flag_segment;
2778       GstFormat format;
2779       gint64 position;
2780
2781       /* perform EOS logic */
2782       flag_segment = (src->segment.flags & GST_SEGMENT_FLAG_SEGMENT) != 0;
2783       format = src->segment.format;
2784       position = src->segment.position;
2785
2786       if (flag_segment) {
2787         GstMessage *message;
2788
2789         message = gst_message_new_segment_done (GST_OBJECT_CAST (src),
2790             format, position);
2791         gst_message_set_seqnum (message, src->priv->seqnum);
2792         gst_element_post_message (GST_ELEMENT_CAST (src), message);
2793         event = gst_event_new_segment_done (format, position);
2794         gst_event_set_seqnum (event, src->priv->seqnum);
2795         gst_pad_push_event (pad, event);
2796       } else {
2797         event = gst_event_new_eos ();
2798         gst_event_set_seqnum (event, src->priv->seqnum);
2799         gst_pad_push_event (pad, event);
2800       }
2801     } else if (ret == GST_FLOW_NOT_LINKED || ret <= GST_FLOW_EOS) {
2802       event = gst_event_new_eos ();
2803       gst_event_set_seqnum (event, src->priv->seqnum);
2804       /* for fatal errors we post an error message, post the error
2805        * first so the app knows about the error first.
2806        * Also don't do this for FLUSHING because it happens
2807        * due to flushing and posting an error message because of
2808        * that is the wrong thing to do, e.g. when we're doing
2809        * a flushing seek. */
2810       GST_ELEMENT_ERROR (src, STREAM, FAILED,
2811           (_("Internal data flow error.")),
2812           ("streaming task paused, reason %s (%d)", reason, ret));
2813       gst_pad_push_event (pad, event);
2814     }
2815     goto done;
2816   }
2817 null_buffer:
2818   {
2819     GST_ELEMENT_ERROR (src, STREAM, FAILED,
2820         (_("Internal data flow error.")), ("element returned NULL buffer"));
2821     GST_LIVE_UNLOCK (src);
2822     goto done;
2823   }
2824 }
2825
2826 static gboolean
2827 gst_base_src_set_allocation (GstBaseSrc * basesrc, GstBufferPool * pool,
2828     GstAllocator * allocator, GstAllocationParams * params)
2829 {
2830   GstAllocator *oldalloc;
2831   GstBufferPool *oldpool;
2832   GstBaseSrcPrivate *priv = basesrc->priv;
2833
2834   if (pool) {
2835     GST_DEBUG_OBJECT (basesrc, "activate pool");
2836     if (!gst_buffer_pool_set_active (pool, TRUE))
2837       goto activate_failed;
2838   }
2839
2840   GST_OBJECT_LOCK (basesrc);
2841   oldpool = priv->pool;
2842   priv->pool = pool;
2843
2844   oldalloc = priv->allocator;
2845   priv->allocator = allocator;
2846
2847   if (params)
2848     priv->params = *params;
2849   else
2850     gst_allocation_params_init (&priv->params);
2851   GST_OBJECT_UNLOCK (basesrc);
2852
2853   if (oldpool) {
2854     /* only deactivate if the pool is not the one we're using */
2855     if (oldpool != pool) {
2856       GST_DEBUG_OBJECT (basesrc, "deactivate old pool");
2857       gst_buffer_pool_set_active (oldpool, FALSE);
2858     }
2859     gst_object_unref (oldpool);
2860   }
2861   if (oldalloc) {
2862     gst_object_unref (oldalloc);
2863   }
2864   return TRUE;
2865
2866   /* ERRORS */
2867 activate_failed:
2868   {
2869     GST_ERROR_OBJECT (basesrc, "failed to activate bufferpool.");
2870     return FALSE;
2871   }
2872 }
2873
2874 static gboolean
2875 gst_base_src_activate_pool (GstBaseSrc * basesrc, gboolean active)
2876 {
2877   GstBaseSrcPrivate *priv = basesrc->priv;
2878   GstBufferPool *pool;
2879   gboolean res = TRUE;
2880
2881   GST_OBJECT_LOCK (basesrc);
2882   if ((pool = priv->pool))
2883     pool = gst_object_ref (pool);
2884   GST_OBJECT_UNLOCK (basesrc);
2885
2886   if (pool) {
2887     res = gst_buffer_pool_set_active (pool, active);
2888     gst_object_unref (pool);
2889   }
2890   return res;
2891 }
2892
2893
2894 static gboolean
2895 gst_base_src_decide_allocation_default (GstBaseSrc * basesrc, GstQuery * query)
2896 {
2897   GstCaps *outcaps;
2898   GstBufferPool *pool;
2899   guint size, min, max;
2900   GstAllocator *allocator;
2901   GstAllocationParams params;
2902   GstStructure *config;
2903   gboolean update_allocator;
2904
2905   gst_query_parse_allocation (query, &outcaps, NULL);
2906
2907   /* we got configuration from our peer or the decide_allocation method,
2908    * parse them */
2909   if (gst_query_get_n_allocation_params (query) > 0) {
2910     /* try the allocator */
2911     gst_query_parse_nth_allocation_param (query, 0, &allocator, &params);
2912     update_allocator = TRUE;
2913   } else {
2914     allocator = NULL;
2915     gst_allocation_params_init (&params);
2916     update_allocator = FALSE;
2917   }
2918
2919   if (gst_query_get_n_allocation_pools (query) > 0) {
2920     gst_query_parse_nth_allocation_pool (query, 0, &pool, &size, &min, &max);
2921
2922     if (pool == NULL) {
2923       /* no pool, we can make our own */
2924       GST_DEBUG_OBJECT (basesrc, "no pool, making new pool");
2925       pool = gst_buffer_pool_new ();
2926     }
2927   } else {
2928     pool = NULL;
2929     size = min = max = 0;
2930   }
2931
2932   /* now configure */
2933   if (pool) {
2934     config = gst_buffer_pool_get_config (pool);
2935     gst_buffer_pool_config_set_params (config, outcaps, size, min, max);
2936     gst_buffer_pool_config_set_allocator (config, allocator, &params);
2937     gst_buffer_pool_set_config (pool, config);
2938   }
2939
2940   if (update_allocator)
2941     gst_query_set_nth_allocation_param (query, 0, allocator, &params);
2942   else
2943     gst_query_add_allocation_param (query, allocator, &params);
2944   if (allocator)
2945     gst_object_unref (allocator);
2946
2947   if (pool) {
2948     gst_query_set_nth_allocation_pool (query, 0, pool, size, min, max);
2949     gst_object_unref (pool);
2950   }
2951
2952   return TRUE;
2953 }
2954
2955 static gboolean
2956 gst_base_src_prepare_allocation (GstBaseSrc * basesrc, GstCaps * caps)
2957 {
2958   GstBaseSrcClass *bclass;
2959   gboolean result = TRUE;
2960   GstQuery *query;
2961   GstBufferPool *pool = NULL;
2962   GstAllocator *allocator = NULL;
2963   GstAllocationParams params;
2964
2965   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2966
2967   /* make query and let peer pad answer, we don't really care if it worked or
2968    * not, if it failed, the allocation query would contain defaults and the
2969    * subclass would then set better values if needed */
2970   query = gst_query_new_allocation (caps, TRUE);
2971   if (!gst_pad_peer_query (basesrc->srcpad, query)) {
2972     /* not a problem, just debug a little */
2973     GST_DEBUG_OBJECT (basesrc, "peer ALLOCATION query failed");
2974   }
2975
2976   g_assert (bclass->decide_allocation != NULL);
2977   result = bclass->decide_allocation (basesrc, query);
2978
2979   GST_DEBUG_OBJECT (basesrc, "ALLOCATION (%d) params: %" GST_PTR_FORMAT, result,
2980       query);
2981
2982   if (!result)
2983     goto no_decide_allocation;
2984
2985   /* we got configuration from our peer or the decide_allocation method,
2986    * parse them */
2987   if (gst_query_get_n_allocation_params (query) > 0) {
2988     gst_query_parse_nth_allocation_param (query, 0, &allocator, &params);
2989   } else {
2990     allocator = NULL;
2991     gst_allocation_params_init (&params);
2992   }
2993
2994   if (gst_query_get_n_allocation_pools (query) > 0)
2995     gst_query_parse_nth_allocation_pool (query, 0, &pool, NULL, NULL, NULL);
2996
2997   result = gst_base_src_set_allocation (basesrc, pool, allocator, &params);
2998
2999   gst_query_unref (query);
3000
3001   return result;
3002
3003   /* Errors */
3004 no_decide_allocation:
3005   {
3006     GST_WARNING_OBJECT (basesrc, "Subclass failed to decide allocation");
3007     gst_query_unref (query);
3008
3009     return result;
3010   }
3011 }
3012
3013 /* default negotiation code.
3014  *
3015  * Take intersection between src and sink pads, take first
3016  * caps and fixate.
3017  */
3018 static gboolean
3019 gst_base_src_default_negotiate (GstBaseSrc * basesrc)
3020 {
3021   GstCaps *thiscaps;
3022   GstCaps *caps = NULL;
3023   GstCaps *peercaps = NULL;
3024   gboolean result = FALSE;
3025
3026   /* first see what is possible on our source pad */
3027   thiscaps = gst_pad_query_caps (GST_BASE_SRC_PAD (basesrc), NULL);
3028   GST_DEBUG_OBJECT (basesrc, "caps of src: %" GST_PTR_FORMAT, thiscaps);
3029   /* nothing or anything is allowed, we're done */
3030   if (thiscaps == NULL || gst_caps_is_any (thiscaps))
3031     goto no_nego_needed;
3032
3033   if (G_UNLIKELY (gst_caps_is_empty (thiscaps)))
3034     goto no_caps;
3035
3036   /* get the peer caps */
3037   peercaps = gst_pad_peer_query_caps (GST_BASE_SRC_PAD (basesrc), thiscaps);
3038   GST_DEBUG_OBJECT (basesrc, "caps of peer: %" GST_PTR_FORMAT, peercaps);
3039   if (peercaps) {
3040     /* The result is already a subset of our caps */
3041     caps = peercaps;
3042     gst_caps_unref (thiscaps);
3043   } else {
3044     /* no peer, work with our own caps then */
3045     caps = thiscaps;
3046   }
3047   if (caps && !gst_caps_is_empty (caps)) {
3048     /* now fixate */
3049     GST_DEBUG_OBJECT (basesrc, "have caps: %" GST_PTR_FORMAT, caps);
3050     if (gst_caps_is_any (caps)) {
3051       GST_DEBUG_OBJECT (basesrc, "any caps, we stop");
3052       /* hmm, still anything, so element can do anything and
3053        * nego is not needed */
3054       result = TRUE;
3055     } else {
3056       caps = gst_base_src_fixate (basesrc, caps);
3057       GST_DEBUG_OBJECT (basesrc, "fixated to: %" GST_PTR_FORMAT, caps);
3058       if (gst_caps_is_fixed (caps)) {
3059         /* yay, fixed caps, use those then, it's possible that the subclass does
3060          * not accept this caps after all and we have to fail. */
3061         result = gst_base_src_set_caps (basesrc, caps);
3062       }
3063     }
3064     gst_caps_unref (caps);
3065   } else {
3066     if (caps)
3067       gst_caps_unref (caps);
3068     GST_DEBUG_OBJECT (basesrc, "no common caps");
3069   }
3070   return result;
3071
3072 no_nego_needed:
3073   {
3074     GST_DEBUG_OBJECT (basesrc, "no negotiation needed");
3075     if (thiscaps)
3076       gst_caps_unref (thiscaps);
3077     return TRUE;
3078   }
3079 no_caps:
3080   {
3081     GST_ELEMENT_ERROR (basesrc, STREAM, FORMAT,
3082         ("No supported formats found"),
3083         ("This element did not produce valid caps"));
3084     if (thiscaps)
3085       gst_caps_unref (thiscaps);
3086     return TRUE;
3087   }
3088 }
3089
3090 static gboolean
3091 gst_base_src_negotiate (GstBaseSrc * basesrc)
3092 {
3093   GstBaseSrcClass *bclass;
3094   gboolean result;
3095
3096   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
3097
3098   GST_DEBUG_OBJECT (basesrc, "starting negotiation");
3099
3100   if (G_LIKELY (bclass->negotiate))
3101     result = bclass->negotiate (basesrc);
3102   else
3103     result = TRUE;
3104
3105   if (G_LIKELY (result)) {
3106     GstCaps *caps;
3107
3108     caps = gst_pad_get_current_caps (basesrc->srcpad);
3109
3110     result = gst_base_src_prepare_allocation (basesrc, caps);
3111
3112     if (caps)
3113       gst_caps_unref (caps);
3114   }
3115   return result;
3116 }
3117
3118 static gboolean
3119 gst_base_src_start (GstBaseSrc * basesrc)
3120 {
3121   GstBaseSrcClass *bclass;
3122   gboolean result;
3123
3124   GST_LIVE_LOCK (basesrc);
3125
3126   GST_OBJECT_LOCK (basesrc);
3127   if (GST_BASE_SRC_IS_STARTING (basesrc))
3128     goto was_starting;
3129   if (GST_BASE_SRC_IS_STARTED (basesrc))
3130     goto was_started;
3131
3132   basesrc->priv->start_result = GST_FLOW_FLUSHING;
3133   GST_OBJECT_FLAG_SET (basesrc, GST_BASE_SRC_FLAG_STARTING);
3134   gst_segment_init (&basesrc->segment, basesrc->segment.format);
3135   GST_OBJECT_UNLOCK (basesrc);
3136
3137   basesrc->num_buffers_left = basesrc->num_buffers;
3138   basesrc->running = FALSE;
3139   basesrc->priv->segment_pending = FALSE;
3140   GST_LIVE_UNLOCK (basesrc);
3141
3142   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
3143   if (bclass->start)
3144     result = bclass->start (basesrc);
3145   else
3146     result = TRUE;
3147
3148   if (!result)
3149     goto could_not_start;
3150
3151   if (!gst_base_src_is_async (basesrc)) {
3152     gst_base_src_start_complete (basesrc, GST_FLOW_OK);
3153     /* not really waiting here, we call this to get the result
3154      * from the start_complete call */
3155     result = gst_base_src_start_wait (basesrc) == GST_FLOW_OK;
3156   }
3157
3158   return result;
3159
3160   /* ERROR */
3161 was_starting:
3162   {
3163     GST_DEBUG_OBJECT (basesrc, "was starting");
3164     GST_OBJECT_UNLOCK (basesrc);
3165     GST_LIVE_UNLOCK (basesrc);
3166     return TRUE;
3167   }
3168 was_started:
3169   {
3170     GST_DEBUG_OBJECT (basesrc, "was started");
3171     GST_OBJECT_UNLOCK (basesrc);
3172     GST_LIVE_UNLOCK (basesrc);
3173     return TRUE;
3174   }
3175 could_not_start:
3176   {
3177     GST_DEBUG_OBJECT (basesrc, "could not start");
3178     /* subclass is supposed to post a message. We don't have to call _stop. */
3179     gst_base_src_start_complete (basesrc, GST_FLOW_ERROR);
3180     return FALSE;
3181   }
3182 }
3183
3184 /**
3185  * gst_base_src_start_complete:
3186  * @basesrc: base source instance
3187  * @ret: a #GstFlowReturn
3188  *
3189  * Complete an asynchronous start operation. When the subclass overrides the
3190  * start method, it should call gst_base_src_start_complete() when the start
3191  * operation completes either from the same thread or from an asynchronous
3192  * helper thread.
3193  */
3194 void
3195 gst_base_src_start_complete (GstBaseSrc * basesrc, GstFlowReturn ret)
3196 {
3197   gboolean have_size;
3198   guint64 size;
3199   gboolean seekable;
3200   GstFormat format;
3201   GstPadMode mode;
3202   GstEvent *event;
3203
3204   if (ret != GST_FLOW_OK)
3205     goto error;
3206
3207   GST_DEBUG_OBJECT (basesrc, "starting source");
3208   format = basesrc->segment.format;
3209
3210   /* figure out the size */
3211   have_size = FALSE;
3212   size = -1;
3213   if (format == GST_FORMAT_BYTES) {
3214     GstBaseSrcClass *bclass = GST_BASE_SRC_GET_CLASS (basesrc);
3215
3216     if (bclass->get_size) {
3217       if (!(have_size = bclass->get_size (basesrc, &size)))
3218         size = -1;
3219     }
3220     GST_DEBUG_OBJECT (basesrc, "setting size %" G_GUINT64_FORMAT, size);
3221     /* only update the size when operating in bytes, subclass is supposed
3222      * to set duration in the start method for other formats */
3223     GST_OBJECT_LOCK (basesrc);
3224     basesrc->segment.duration = size;
3225     GST_OBJECT_UNLOCK (basesrc);
3226   }
3227
3228   GST_DEBUG_OBJECT (basesrc,
3229       "format: %s, have size: %d, size: %" G_GUINT64_FORMAT ", duration: %"
3230       G_GINT64_FORMAT, gst_format_get_name (format), have_size, size,
3231       basesrc->segment.duration);
3232
3233   seekable = gst_base_src_seekable (basesrc);
3234   GST_DEBUG_OBJECT (basesrc, "is seekable: %d", seekable);
3235
3236   /* update for random access flag */
3237   basesrc->random_access = seekable && format == GST_FORMAT_BYTES;
3238
3239   GST_DEBUG_OBJECT (basesrc, "is random_access: %d", basesrc->random_access);
3240
3241   /* stop flushing now but for live sources, still block in the LIVE lock when
3242    * we are not yet PLAYING */
3243   gst_base_src_set_flushing (basesrc, FALSE, FALSE, NULL);
3244
3245   gst_pad_mark_reconfigure (GST_BASE_SRC_PAD (basesrc));
3246
3247   GST_OBJECT_LOCK (basesrc->srcpad);
3248   mode = GST_PAD_MODE (basesrc->srcpad);
3249   GST_OBJECT_UNLOCK (basesrc->srcpad);
3250
3251   /* take the stream lock here, we only want to let the task run when we have
3252    * set the STARTED flag */
3253   GST_PAD_STREAM_LOCK (basesrc->srcpad);
3254   if (mode == GST_PAD_MODE_PUSH) {
3255     /* do initial seek, which will start the task */
3256     GST_OBJECT_LOCK (basesrc);
3257     event = basesrc->pending_seek;
3258     basesrc->pending_seek = NULL;
3259     GST_OBJECT_UNLOCK (basesrc);
3260
3261     /* The perform seek code will start the task when finished. We don't have to
3262      * unlock the streaming thread because it is not running yet */
3263     if (G_UNLIKELY (!gst_base_src_perform_seek (basesrc, event, FALSE)))
3264       goto seek_failed;
3265
3266     if (event)
3267       gst_event_unref (event);
3268   } else {
3269     /* if not random_access, we cannot operate in pull mode for now */
3270     if (G_UNLIKELY (!basesrc->random_access))
3271       goto no_get_range;
3272   }
3273
3274   GST_OBJECT_LOCK (basesrc);
3275   GST_OBJECT_FLAG_SET (basesrc, GST_BASE_SRC_FLAG_STARTED);
3276   GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTING);
3277   basesrc->priv->start_result = ret;
3278   GST_ASYNC_SIGNAL (basesrc);
3279   GST_OBJECT_UNLOCK (basesrc);
3280
3281   GST_PAD_STREAM_UNLOCK (basesrc->srcpad);
3282
3283   return;
3284
3285 seek_failed:
3286   {
3287     GST_PAD_STREAM_UNLOCK (basesrc->srcpad);
3288     GST_ERROR_OBJECT (basesrc, "Failed to perform initial seek");
3289     gst_base_src_stop (basesrc);
3290     if (event)
3291       gst_event_unref (event);
3292     ret = GST_FLOW_ERROR;
3293     goto error;
3294   }
3295 no_get_range:
3296   {
3297     GST_PAD_STREAM_UNLOCK (basesrc->srcpad);
3298     gst_base_src_stop (basesrc);
3299     GST_ERROR_OBJECT (basesrc, "Cannot operate in pull mode, stopping");
3300     ret = GST_FLOW_ERROR;
3301     goto error;
3302   }
3303 error:
3304   {
3305     GST_OBJECT_LOCK (basesrc);
3306     basesrc->priv->start_result = ret;
3307     GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTING);
3308     GST_ASYNC_SIGNAL (basesrc);
3309     GST_OBJECT_UNLOCK (basesrc);
3310     return;
3311   }
3312 }
3313
3314 /**
3315  * gst_base_src_start_wait:
3316  * @basesrc: base source instance
3317  *
3318  * Wait until the start operation completes.
3319  *
3320  * Returns: a #GstFlowReturn.
3321  */
3322 GstFlowReturn
3323 gst_base_src_start_wait (GstBaseSrc * basesrc)
3324 {
3325   GstFlowReturn result;
3326
3327   GST_OBJECT_LOCK (basesrc);
3328   while (GST_BASE_SRC_IS_STARTING (basesrc)) {
3329     GST_ASYNC_WAIT (basesrc);
3330   }
3331   result = basesrc->priv->start_result;
3332   GST_OBJECT_UNLOCK (basesrc);
3333
3334   GST_DEBUG_OBJECT (basesrc, "got %s", gst_flow_get_name (result));
3335
3336   return result;
3337 }
3338
3339 static gboolean
3340 gst_base_src_stop (GstBaseSrc * basesrc)
3341 {
3342   GstBaseSrcClass *bclass;
3343   gboolean result = TRUE;
3344
3345   GST_DEBUG_OBJECT (basesrc, "stopping source");
3346
3347   /* flush all */
3348   gst_base_src_set_flushing (basesrc, TRUE, FALSE, NULL);
3349   /* stop the task */
3350   gst_pad_stop_task (basesrc->srcpad);
3351
3352   GST_OBJECT_LOCK (basesrc);
3353   if (!GST_BASE_SRC_IS_STARTED (basesrc) && !GST_BASE_SRC_IS_STARTING (basesrc))
3354     goto was_stopped;
3355
3356   GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTING);
3357   GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTED);
3358   basesrc->priv->start_result = GST_FLOW_FLUSHING;
3359   GST_ASYNC_SIGNAL (basesrc);
3360   GST_OBJECT_UNLOCK (basesrc);
3361
3362   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
3363   if (bclass->stop)
3364     result = bclass->stop (basesrc);
3365
3366   gst_base_src_set_allocation (basesrc, NULL, NULL, NULL);
3367
3368   return result;
3369
3370 was_stopped:
3371   {
3372     GST_DEBUG_OBJECT (basesrc, "was stopped");
3373     GST_OBJECT_UNLOCK (basesrc);
3374     return TRUE;
3375   }
3376 }
3377
3378 /* start or stop flushing dataprocessing
3379  */
3380 static gboolean
3381 gst_base_src_set_flushing (GstBaseSrc * basesrc,
3382     gboolean flushing, gboolean live_play, gboolean * playing)
3383 {
3384   GstBaseSrcClass *bclass;
3385
3386   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
3387
3388   GST_DEBUG_OBJECT (basesrc, "flushing %d, live_play %d", flushing, live_play);
3389
3390   if (flushing) {
3391     gst_base_src_activate_pool (basesrc, FALSE);
3392     /* unlock any subclasses, we need to do this before grabbing the
3393      * LIVE_LOCK since we hold this lock before going into ::create. We pass an
3394      * unlock to the params because of backwards compat (see seek handler)*/
3395     if (bclass->unlock)
3396       bclass->unlock (basesrc);
3397   }
3398
3399   /* the live lock is released when we are blocked, waiting for playing or
3400    * when we sync to the clock. */
3401   GST_LIVE_LOCK (basesrc);
3402   if (playing)
3403     *playing = basesrc->live_running;
3404   basesrc->priv->flushing = flushing;
3405   if (flushing) {
3406     /* if we are locked in the live lock, signal it to make it flush */
3407     basesrc->live_running = TRUE;
3408
3409     /* clear pending EOS if any */
3410     g_atomic_int_set (&basesrc->priv->pending_eos, FALSE);
3411
3412     /* step 1, now that we have the LIVE lock, clear our unlock request */
3413     if (bclass->unlock_stop)
3414       bclass->unlock_stop (basesrc);
3415
3416     /* step 2, unblock clock sync (if any) or any other blocking thing */
3417     if (basesrc->clock_id)
3418       gst_clock_id_unschedule (basesrc->clock_id);
3419   } else {
3420     /* signal the live source that it can start playing */
3421     basesrc->live_running = live_play;
3422
3423     gst_base_src_activate_pool (basesrc, TRUE);
3424
3425     /* Drop all delayed events */
3426     GST_OBJECT_LOCK (basesrc);
3427     if (basesrc->priv->pending_events) {
3428       g_list_foreach (basesrc->priv->pending_events, (GFunc) gst_event_unref,
3429           NULL);
3430       g_list_free (basesrc->priv->pending_events);
3431       basesrc->priv->pending_events = NULL;
3432       g_atomic_int_set (&basesrc->priv->have_events, FALSE);
3433     }
3434     GST_OBJECT_UNLOCK (basesrc);
3435   }
3436   GST_LIVE_SIGNAL (basesrc);
3437   GST_LIVE_UNLOCK (basesrc);
3438
3439   return TRUE;
3440 }
3441
3442 /* the purpose of this function is to make sure that a live source blocks in the
3443  * LIVE lock or leaves the LIVE lock and continues playing. */
3444 static gboolean
3445 gst_base_src_set_playing (GstBaseSrc * basesrc, gboolean live_play)
3446 {
3447   GstBaseSrcClass *bclass;
3448
3449   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
3450
3451   /* unlock subclasses locked in ::create, we only do this when we stop playing. */
3452   if (!live_play) {
3453     GST_DEBUG_OBJECT (basesrc, "unlock");
3454     if (bclass->unlock)
3455       bclass->unlock (basesrc);
3456   }
3457
3458   /* we are now able to grab the LIVE lock, when we get it, we can be
3459    * waiting for PLAYING while blocked in the LIVE cond or we can be waiting
3460    * for the clock. */
3461   GST_LIVE_LOCK (basesrc);
3462   GST_DEBUG_OBJECT (basesrc, "unschedule clock");
3463
3464   /* unblock clock sync (if any) */
3465   if (basesrc->clock_id)
3466     gst_clock_id_unschedule (basesrc->clock_id);
3467
3468   /* configure what to do when we get to the LIVE lock. */
3469   GST_DEBUG_OBJECT (basesrc, "live running %d", live_play);
3470   basesrc->live_running = live_play;
3471
3472   if (live_play) {
3473     gboolean start;
3474
3475     /* clear our unlock request when going to PLAYING */
3476     GST_DEBUG_OBJECT (basesrc, "unlock stop");
3477     if (bclass->unlock_stop)
3478       bclass->unlock_stop (basesrc);
3479
3480     /* for live sources we restart the timestamp correction */
3481     basesrc->priv->latency = -1;
3482     /* have to restart the task in case it stopped because of the unlock when
3483      * we went to PAUSED. Only do this if we operating in push mode. */
3484     GST_OBJECT_LOCK (basesrc->srcpad);
3485     start = (GST_PAD_MODE (basesrc->srcpad) == GST_PAD_MODE_PUSH);
3486     GST_OBJECT_UNLOCK (basesrc->srcpad);
3487     if (start)
3488       gst_pad_start_task (basesrc->srcpad, (GstTaskFunction) gst_base_src_loop,
3489           basesrc->srcpad, NULL);
3490     GST_DEBUG_OBJECT (basesrc, "signal");
3491     GST_LIVE_SIGNAL (basesrc);
3492   }
3493   GST_LIVE_UNLOCK (basesrc);
3494
3495   return TRUE;
3496 }
3497
3498 static gboolean
3499 gst_base_src_activate_push (GstPad * pad, GstObject * parent, gboolean active)
3500 {
3501   GstBaseSrc *basesrc;
3502
3503   basesrc = GST_BASE_SRC (parent);
3504
3505   /* prepare subclass first */
3506   if (active) {
3507     GST_DEBUG_OBJECT (basesrc, "Activating in push mode");
3508
3509     if (G_UNLIKELY (!basesrc->can_activate_push))
3510       goto no_push_activation;
3511
3512     if (G_UNLIKELY (!gst_base_src_start (basesrc)))
3513       goto error_start;
3514   } else {
3515     GST_DEBUG_OBJECT (basesrc, "Deactivating in push mode");
3516     /* now we can stop the source */
3517     if (G_UNLIKELY (!gst_base_src_stop (basesrc)))
3518       goto error_stop;
3519   }
3520   return TRUE;
3521
3522   /* ERRORS */
3523 no_push_activation:
3524   {
3525     GST_WARNING_OBJECT (basesrc, "Subclass disabled push-mode activation");
3526     return FALSE;
3527   }
3528 error_start:
3529   {
3530     GST_WARNING_OBJECT (basesrc, "Failed to start in push mode");
3531     return FALSE;
3532   }
3533 error_stop:
3534   {
3535     GST_DEBUG_OBJECT (basesrc, "Failed to stop in push mode");
3536     return FALSE;
3537   }
3538 }
3539
3540 static gboolean
3541 gst_base_src_activate_pull (GstPad * pad, GstObject * parent, gboolean active)
3542 {
3543   GstBaseSrc *basesrc;
3544
3545   basesrc = GST_BASE_SRC (parent);
3546
3547   /* prepare subclass first */
3548   if (active) {
3549     GST_DEBUG_OBJECT (basesrc, "Activating in pull mode");
3550     if (G_UNLIKELY (!gst_base_src_start (basesrc)))
3551       goto error_start;
3552   } else {
3553     GST_DEBUG_OBJECT (basesrc, "Deactivating in pull mode");
3554     if (G_UNLIKELY (!gst_base_src_stop (basesrc)))
3555       goto error_stop;
3556   }
3557   return TRUE;
3558
3559   /* ERRORS */
3560 error_start:
3561   {
3562     GST_ERROR_OBJECT (basesrc, "Failed to start in pull mode");
3563     return FALSE;
3564   }
3565 error_stop:
3566   {
3567     GST_ERROR_OBJECT (basesrc, "Failed to stop in pull mode");
3568     return FALSE;
3569   }
3570 }
3571
3572 static gboolean
3573 gst_base_src_activate_mode (GstPad * pad, GstObject * parent,
3574     GstPadMode mode, gboolean active)
3575 {
3576   gboolean res;
3577   GstBaseSrc *src = GST_BASE_SRC (parent);
3578
3579   src->priv->stream_start_pending = FALSE;
3580
3581   switch (mode) {
3582     case GST_PAD_MODE_PULL:
3583       res = gst_base_src_activate_pull (pad, parent, active);
3584       break;
3585     case GST_PAD_MODE_PUSH:
3586       src->priv->stream_start_pending = active;
3587       res = gst_base_src_activate_push (pad, parent, active);
3588       break;
3589     default:
3590       GST_LOG_OBJECT (pad, "unknown activation mode %d", mode);
3591       res = FALSE;
3592       break;
3593   }
3594   return res;
3595 }
3596
3597
3598 static GstStateChangeReturn
3599 gst_base_src_change_state (GstElement * element, GstStateChange transition)
3600 {
3601   GstBaseSrc *basesrc;
3602   GstStateChangeReturn result;
3603   gboolean no_preroll = FALSE;
3604
3605   basesrc = GST_BASE_SRC (element);
3606
3607   switch (transition) {
3608     case GST_STATE_CHANGE_NULL_TO_READY:
3609       break;
3610     case GST_STATE_CHANGE_READY_TO_PAUSED:
3611       no_preroll = gst_base_src_is_live (basesrc);
3612       break;
3613     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
3614       GST_DEBUG_OBJECT (basesrc, "PAUSED->PLAYING");
3615       if (gst_base_src_is_live (basesrc)) {
3616         /* now we can start playback */
3617         gst_base_src_set_playing (basesrc, TRUE);
3618       }
3619       break;
3620     default:
3621       break;
3622   }
3623
3624   if ((result =
3625           GST_ELEMENT_CLASS (parent_class)->change_state (element,
3626               transition)) == GST_STATE_CHANGE_FAILURE)
3627     goto failure;
3628
3629   switch (transition) {
3630     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
3631       GST_DEBUG_OBJECT (basesrc, "PLAYING->PAUSED");
3632       if (gst_base_src_is_live (basesrc)) {
3633         /* make sure we block in the live lock in PAUSED */
3634         gst_base_src_set_playing (basesrc, FALSE);
3635         no_preroll = TRUE;
3636       }
3637       break;
3638     case GST_STATE_CHANGE_PAUSED_TO_READY:
3639     {
3640       /* we don't need to unblock anything here, the pad deactivation code
3641        * already did this */
3642       g_atomic_int_set (&basesrc->priv->pending_eos, FALSE);
3643       gst_event_replace (&basesrc->pending_seek, NULL);
3644       break;
3645     }
3646     case GST_STATE_CHANGE_READY_TO_NULL:
3647       break;
3648     default:
3649       break;
3650   }
3651
3652   if (no_preroll && result == GST_STATE_CHANGE_SUCCESS)
3653     result = GST_STATE_CHANGE_NO_PREROLL;
3654
3655   return result;
3656
3657   /* ERRORS */
3658 failure:
3659   {
3660     GST_DEBUG_OBJECT (basesrc, "parent failed state change");
3661     return result;
3662   }
3663 }
3664
3665 /**
3666  * gst_base_src_get_buffer_pool:
3667  * @src: a #GstBaseSrc
3668  *
3669  * Returns: (transfer full): the instance of the #GstBufferPool used
3670  * by the src; free it after use it
3671  */
3672 GstBufferPool *
3673 gst_base_src_get_buffer_pool (GstBaseSrc * src)
3674 {
3675   g_return_val_if_fail (GST_IS_BASE_SRC (src), NULL);
3676
3677   if (src->priv->pool)
3678     return gst_object_ref (src->priv->pool);
3679
3680   return NULL;
3681 }
3682
3683 /**
3684  * gst_base_src_get_allocator:
3685  * @src: a #GstBaseSrc
3686  * @allocator: (out) (allow-none) (transfer full): the #GstAllocator
3687  * used
3688  * @params: (out) (allow-none) (transfer full): the
3689  * #GstAllocatorParams of @allocator
3690  *
3691  * Lets #GstBaseSrc sub-classes to know the memory @allocator
3692  * used by the base class and its @params.
3693  *
3694  * Unref the @allocator after use it.
3695  */
3696 void
3697 gst_base_src_get_allocator (GstBaseSrc * src,
3698     GstAllocator ** allocator, GstAllocationParams * params)
3699 {
3700   g_return_if_fail (GST_IS_BASE_SRC (src));
3701
3702   if (allocator)
3703     *allocator = src->priv->allocator ?
3704         gst_object_ref (src->priv->allocator) : NULL;
3705
3706   if (params)
3707     *params = src->priv->params;
3708 }