d845517e8476127562b8d6f854e9ba62eecfbd24
[platform/upstream/gstreamer.git] / libs / gst / base / gstbasesrc.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *               2000,2005 Wim Taymans <wim@fluendo.com>
4  *
5  * gstbasesrc.c:
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20  * Boston, MA 02110-1301, USA.
21  */
22
23 /**
24  * SECTION:gstbasesrc
25  * @short_description: Base class for getrange based source elements
26  * @see_also: #GstPushSrc, #GstBaseTransform, #GstBaseSink
27  *
28  * This is a generice base class for source elements. The following
29  * types of sources are supported:
30  * <itemizedlist>
31  *   <listitem><para>random access sources like files</para></listitem>
32  *   <listitem><para>seekable sources</para></listitem>
33  *   <listitem><para>live sources</para></listitem>
34  * </itemizedlist>
35  *
36  * The source can be configured to operate in any #GstFormat with the
37  * gst_base_src_set_format() method. The currently set format determines
38  * the format of the internal #GstSegment and any #GST_EVENT_NEWSEGMENT
39  * events. The default format for #GstBaseSrc is #GST_FORMAT_BYTES.
40  *
41  * #GstBaseSrc always supports push mode scheduling. If the following
42  * conditions are met, it also supports pull mode scheduling:
43  * <itemizedlist>
44  *   <listitem><para>The format is set to #GST_FORMAT_BYTES (default).</para>
45  *   </listitem>
46  *   <listitem><para>#GstBaseSrcClass.is_seekable() returns %TRUE.</para>
47  *   </listitem>
48  * </itemizedlist>
49  *
50  * If all the conditions are met for operating in pull mode, #GstBaseSrc is
51  * automatically seekable in push mode as well. The following conditions must
52  * be met to make the element seekable in push mode when the format is not
53  * #GST_FORMAT_BYTES:
54  * <itemizedlist>
55  *   <listitem><para>
56  *     #GstBaseSrcClass.is_seekable() returns %TRUE.
57  *   </para></listitem>
58  *   <listitem><para>
59  *     #GstBaseSrcClass.query() can convert all supported seek formats to the
60  *     internal format as set with gst_base_src_set_format().
61  *   </para></listitem>
62  *   <listitem><para>
63  *     #GstBaseSrcClass.do_seek() is implemented, performs the seek and returns
64  *      %TRUE.
65  *   </para></listitem>
66  * </itemizedlist>
67  *
68  * When the element does not meet the requirements to operate in pull mode, the
69  * offset and length in the #GstBaseSrcClass.create() method should be ignored.
70  * It is recommended to subclass #GstPushSrc instead, in this situation. If the
71  * element can operate in pull mode but only with specific offsets and
72  * lengths, it is allowed to generate an error when the wrong values are passed
73  * to the #GstBaseSrcClass.create() function.
74  *
75  * #GstBaseSrc has support for live sources. Live sources are sources that when
76  * paused discard data, such as audio or video capture devices. A typical live
77  * source also produces data at a fixed rate and thus provides a clock to publish
78  * this rate.
79  * Use gst_base_src_set_live() to activate the live source mode.
80  *
81  * A live source does not produce data in the PAUSED state. This means that the
82  * #GstBaseSrcClass.create() method will not be called in PAUSED but only in
83  * PLAYING. To signal the pipeline that the element will not produce data, the
84  * return value from the READY to PAUSED state will be
85  * #GST_STATE_CHANGE_NO_PREROLL.
86  *
87  * A typical live source will timestamp the buffers it creates with the
88  * current running time of the pipeline. This is one reason why a live source
89  * can only produce data in the PLAYING state, when the clock is actually
90  * distributed and running.
91  *
92  * Live sources that synchronize and block on the clock (an audio source, for
93  * example) can use gst_base_src_wait_playing() when the
94  * #GstBaseSrcClass.create() function was interrupted by a state change to
95  * PAUSED.
96  *
97  * The #GstBaseSrcClass.get_times() method can be used to implement pseudo-live
98  * sources. It only makes sense to implement the #GstBaseSrcClass.get_times()
99  * function if the source is a live source. The #GstBaseSrcClass.get_times()
100  * function should return timestamps starting from 0, as if it were a non-live
101  * source. The base class will make sure that the timestamps are transformed
102  * into the current running_time. The base source will then wait for the
103  * calculated running_time before pushing out the buffer.
104  *
105  * For live sources, the base class will by default report a latency of 0.
106  * For pseudo live sources, the base class will by default measure the difference
107  * between the first buffer timestamp and the start time of get_times and will
108  * report this value as the latency.
109  * Subclasses should override the query function when this behaviour is not
110  * acceptable.
111  *
112  * There is only support in #GstBaseSrc for exactly one source pad, which
113  * should be named "src". A source implementation (subclass of #GstBaseSrc)
114  * should install a pad template in its class_init function, like so:
115  * |[
116  * static void
117  * my_element_class_init (GstMyElementClass *klass)
118  * {
119  *   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
120  *   // srctemplate should be a #GstStaticPadTemplate with direction
121  *   // #GST_PAD_SRC and name "src"
122  *   gst_element_class_add_pad_template (gstelement_class,
123  *       gst_static_pad_template_get (&amp;srctemplate));
124  *
125  *   gst_element_class_set_static_metadata (gstelement_class,
126  *      "Source name",
127  *      "Source",
128  *      "My Source element",
129  *      "The author &lt;my.sink@my.email&gt;");
130  * }
131  * ]|
132  *
133  * <refsect2>
134  * <title>Controlled shutdown of live sources in applications</title>
135  * <para>
136  * Applications that record from a live source may want to stop recording
137  * in a controlled way, so that the recording is stopped, but the data
138  * already in the pipeline is processed to the end (remember that many live
139  * sources would go on recording forever otherwise). For that to happen the
140  * application needs to make the source stop recording and send an EOS
141  * event down the pipeline. The application would then wait for an
142  * EOS message posted on the pipeline's bus to know when all data has
143  * been processed and the pipeline can safely be stopped.
144  *
145  * An application may send an EOS event to a source element to make it
146  * perform the EOS logic (send EOS event downstream or post a
147  * #GST_MESSAGE_SEGMENT_DONE on the bus). This can typically be done
148  * with the gst_element_send_event() function on the element or its parent bin.
149  *
150  * After the EOS has been sent to the element, the application should wait for
151  * an EOS message to be posted on the pipeline's bus. Once this EOS message is
152  * received, it may safely shut down the entire pipeline.
153  *
154  * Last reviewed on 2007-12-19 (0.10.16)
155  * </para>
156  * </refsect2>
157  */
158
159 #ifdef HAVE_CONFIG_H
160 #  include "config.h"
161 #endif
162
163 #include <stdlib.h>
164 #include <string.h>
165
166 #include <gst/gst_private.h>
167 #include <gst/glib-compat-private.h>
168
169 #include "gstbasesrc.h"
170 #include "gsttypefindhelper.h"
171 #include <gst/gst-i18n-lib.h>
172
173 GST_DEBUG_CATEGORY_STATIC (gst_base_src_debug);
174 #define GST_CAT_DEFAULT gst_base_src_debug
175
176 #define GST_LIVE_GET_LOCK(elem)               (&GST_BASE_SRC_CAST(elem)->live_lock)
177 #define GST_LIVE_LOCK(elem)                   g_mutex_lock(GST_LIVE_GET_LOCK(elem))
178 #define GST_LIVE_TRYLOCK(elem)                g_mutex_trylock(GST_LIVE_GET_LOCK(elem))
179 #define GST_LIVE_UNLOCK(elem)                 g_mutex_unlock(GST_LIVE_GET_LOCK(elem))
180 #define GST_LIVE_GET_COND(elem)               (&GST_BASE_SRC_CAST(elem)->live_cond)
181 #define GST_LIVE_WAIT(elem)                   g_cond_wait (GST_LIVE_GET_COND (elem), GST_LIVE_GET_LOCK (elem))
182 #define GST_LIVE_WAIT_UNTIL(elem, end_time)   g_cond_timed_wait (GST_LIVE_GET_COND (elem), GST_LIVE_GET_LOCK (elem), end_time)
183 #define GST_LIVE_SIGNAL(elem)                 g_cond_signal (GST_LIVE_GET_COND (elem));
184 #define GST_LIVE_BROADCAST(elem)              g_cond_broadcast (GST_LIVE_GET_COND (elem));
185
186
187 #define GST_ASYNC_GET_COND(elem)              (&GST_BASE_SRC_CAST(elem)->priv->async_cond)
188 #define GST_ASYNC_WAIT(elem)                  g_cond_wait (GST_ASYNC_GET_COND (elem), GST_OBJECT_GET_LOCK (elem))
189 #define GST_ASYNC_SIGNAL(elem)                g_cond_signal (GST_ASYNC_GET_COND (elem));
190
191
192 /* BaseSrc signals and args */
193 enum
194 {
195   /* FILL ME */
196   LAST_SIGNAL
197 };
198
199 #define DEFAULT_BLOCKSIZE       4096
200 #define DEFAULT_NUM_BUFFERS     -1
201 #define DEFAULT_TYPEFIND        FALSE
202 #define DEFAULT_DO_TIMESTAMP    FALSE
203
204 enum
205 {
206   PROP_0,
207   PROP_BLOCKSIZE,
208   PROP_NUM_BUFFERS,
209   PROP_TYPEFIND,
210   PROP_DO_TIMESTAMP
211 };
212
213 #define GST_BASE_SRC_GET_PRIVATE(obj)  \
214    (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_BASE_SRC, GstBaseSrcPrivate))
215
216 struct _GstBaseSrcPrivate
217 {
218   gboolean discont;
219   gboolean flushing;
220
221   GstFlowReturn start_result;
222   gboolean async;
223
224   /* if a stream-start event should be sent */
225   gboolean stream_start_pending;
226
227   /* if segment should be sent */
228   gboolean segment_pending;
229
230   /* if EOS is pending (atomic) */
231   gint pending_eos;
232
233   /* startup latency is the time it takes between going to PLAYING and producing
234    * the first BUFFER with running_time 0. This value is included in the latency
235    * reporting. */
236   GstClockTime latency;
237   /* timestamp offset, this is the offset add to the values of gst_times for
238    * pseudo live sources */
239   GstClockTimeDiff ts_offset;
240
241   gboolean do_timestamp;
242   volatile gint dynamic_size;
243
244   /* stream sequence number */
245   guint32 seqnum;
246
247   /* pending events (TAG, CUSTOM_BOTH, CUSTOM_DOWNSTREAM) to be
248    * pushed in the data stream */
249   GList *pending_events;
250   volatile gint have_events;
251
252   /* QoS *//* with LOCK */
253   gboolean qos_enabled;
254   gdouble proportion;
255   GstClockTime earliest_time;
256
257   GstBufferPool *pool;
258   GstAllocator *allocator;
259   GstAllocationParams params;
260
261   GCond async_cond;
262 };
263
264 static GstElementClass *parent_class = NULL;
265
266 static void gst_base_src_class_init (GstBaseSrcClass * klass);
267 static void gst_base_src_init (GstBaseSrc * src, gpointer g_class);
268 static void gst_base_src_finalize (GObject * object);
269
270
271 GType
272 gst_base_src_get_type (void)
273 {
274   static volatile gsize base_src_type = 0;
275
276   if (g_once_init_enter (&base_src_type)) {
277     GType _type;
278     static const GTypeInfo base_src_info = {
279       sizeof (GstBaseSrcClass),
280       NULL,
281       NULL,
282       (GClassInitFunc) gst_base_src_class_init,
283       NULL,
284       NULL,
285       sizeof (GstBaseSrc),
286       0,
287       (GInstanceInitFunc) gst_base_src_init,
288     };
289
290     _type = g_type_register_static (GST_TYPE_ELEMENT,
291         "GstBaseSrc", &base_src_info, G_TYPE_FLAG_ABSTRACT);
292     g_once_init_leave (&base_src_type, _type);
293   }
294   return base_src_type;
295 }
296
297 static GstCaps *gst_base_src_default_get_caps (GstBaseSrc * bsrc,
298     GstCaps * filter);
299 static GstCaps *gst_base_src_default_fixate (GstBaseSrc * src, GstCaps * caps);
300 static GstCaps *gst_base_src_fixate (GstBaseSrc * src, GstCaps * caps);
301
302 static gboolean gst_base_src_is_random_access (GstBaseSrc * src);
303 static gboolean gst_base_src_activate_mode (GstPad * pad, GstObject * parent,
304     GstPadMode mode, gboolean active);
305 static void gst_base_src_set_property (GObject * object, guint prop_id,
306     const GValue * value, GParamSpec * pspec);
307 static void gst_base_src_get_property (GObject * object, guint prop_id,
308     GValue * value, GParamSpec * pspec);
309 static gboolean gst_base_src_event (GstPad * pad, GstObject * parent,
310     GstEvent * event);
311 static gboolean gst_base_src_send_event (GstElement * elem, GstEvent * event);
312 static gboolean gst_base_src_default_event (GstBaseSrc * src, GstEvent * event);
313
314 static gboolean gst_base_src_query (GstPad * pad, GstObject * parent,
315     GstQuery * query);
316
317 static gboolean gst_base_src_activate_pool (GstBaseSrc * basesrc,
318     gboolean active);
319 static gboolean gst_base_src_default_negotiate (GstBaseSrc * basesrc);
320 static gboolean gst_base_src_default_do_seek (GstBaseSrc * src,
321     GstSegment * segment);
322 static gboolean gst_base_src_default_query (GstBaseSrc * src, GstQuery * query);
323 static gboolean gst_base_src_default_prepare_seek_segment (GstBaseSrc * src,
324     GstEvent * event, GstSegment * segment);
325 static GstFlowReturn gst_base_src_default_create (GstBaseSrc * basesrc,
326     guint64 offset, guint size, GstBuffer ** buf);
327 static GstFlowReturn gst_base_src_default_alloc (GstBaseSrc * basesrc,
328     guint64 offset, guint size, GstBuffer ** buf);
329 static gboolean gst_base_src_decide_allocation_default (GstBaseSrc * basesrc,
330     GstQuery * query);
331
332 static gboolean gst_base_src_set_flushing (GstBaseSrc * basesrc,
333     gboolean flushing, gboolean live_play, gboolean * playing);
334
335 static gboolean gst_base_src_start (GstBaseSrc * basesrc);
336 static gboolean gst_base_src_stop (GstBaseSrc * basesrc);
337
338 static GstStateChangeReturn gst_base_src_change_state (GstElement * element,
339     GstStateChange transition);
340
341 static void gst_base_src_loop (GstPad * pad);
342 static GstFlowReturn gst_base_src_getrange (GstPad * pad, GstObject * parent,
343     guint64 offset, guint length, GstBuffer ** buf);
344 static GstFlowReturn gst_base_src_get_range (GstBaseSrc * src, guint64 offset,
345     guint length, GstBuffer ** buf);
346 static gboolean gst_base_src_seekable (GstBaseSrc * src);
347 static gboolean gst_base_src_negotiate (GstBaseSrc * basesrc);
348 static gboolean gst_base_src_update_length (GstBaseSrc * src, guint64 offset,
349     guint * length);
350
351 static void
352 gst_base_src_class_init (GstBaseSrcClass * klass)
353 {
354   GObjectClass *gobject_class;
355   GstElementClass *gstelement_class;
356
357   gobject_class = G_OBJECT_CLASS (klass);
358   gstelement_class = GST_ELEMENT_CLASS (klass);
359
360   GST_DEBUG_CATEGORY_INIT (gst_base_src_debug, "basesrc", 0, "basesrc element");
361
362   g_type_class_add_private (klass, sizeof (GstBaseSrcPrivate));
363
364   parent_class = g_type_class_peek_parent (klass);
365
366   gobject_class->finalize = gst_base_src_finalize;
367   gobject_class->set_property = gst_base_src_set_property;
368   gobject_class->get_property = gst_base_src_get_property;
369
370   g_object_class_install_property (gobject_class, PROP_BLOCKSIZE,
371       g_param_spec_uint ("blocksize", "Block size",
372           "Size in bytes to read per buffer (-1 = default)", 0, G_MAXUINT,
373           DEFAULT_BLOCKSIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
374   g_object_class_install_property (gobject_class, PROP_NUM_BUFFERS,
375       g_param_spec_int ("num-buffers", "num-buffers",
376           "Number of buffers to output before sending EOS (-1 = unlimited)",
377           -1, G_MAXINT, DEFAULT_NUM_BUFFERS, G_PARAM_READWRITE |
378           G_PARAM_STATIC_STRINGS));
379   g_object_class_install_property (gobject_class, PROP_TYPEFIND,
380       g_param_spec_boolean ("typefind", "Typefind",
381           "Run typefind before negotiating", DEFAULT_TYPEFIND,
382           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
383   g_object_class_install_property (gobject_class, PROP_DO_TIMESTAMP,
384       g_param_spec_boolean ("do-timestamp", "Do timestamp",
385           "Apply current stream time to buffers", DEFAULT_DO_TIMESTAMP,
386           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
387
388   gstelement_class->change_state =
389       GST_DEBUG_FUNCPTR (gst_base_src_change_state);
390   gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_base_src_send_event);
391
392   klass->get_caps = GST_DEBUG_FUNCPTR (gst_base_src_default_get_caps);
393   klass->negotiate = GST_DEBUG_FUNCPTR (gst_base_src_default_negotiate);
394   klass->fixate = GST_DEBUG_FUNCPTR (gst_base_src_default_fixate);
395   klass->prepare_seek_segment =
396       GST_DEBUG_FUNCPTR (gst_base_src_default_prepare_seek_segment);
397   klass->do_seek = GST_DEBUG_FUNCPTR (gst_base_src_default_do_seek);
398   klass->query = GST_DEBUG_FUNCPTR (gst_base_src_default_query);
399   klass->event = GST_DEBUG_FUNCPTR (gst_base_src_default_event);
400   klass->create = GST_DEBUG_FUNCPTR (gst_base_src_default_create);
401   klass->alloc = GST_DEBUG_FUNCPTR (gst_base_src_default_alloc);
402   klass->decide_allocation =
403       GST_DEBUG_FUNCPTR (gst_base_src_decide_allocation_default);
404
405   /* Registering debug symbols for function pointers */
406   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_activate_mode);
407   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_event);
408   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_query);
409   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_getrange);
410   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_fixate);
411 }
412
413 static void
414 gst_base_src_init (GstBaseSrc * basesrc, gpointer g_class)
415 {
416   GstPad *pad;
417   GstPadTemplate *pad_template;
418
419   basesrc->priv = GST_BASE_SRC_GET_PRIVATE (basesrc);
420
421   basesrc->is_live = FALSE;
422   g_mutex_init (&basesrc->live_lock);
423   g_cond_init (&basesrc->live_cond);
424   basesrc->num_buffers = DEFAULT_NUM_BUFFERS;
425   basesrc->num_buffers_left = -1;
426
427   basesrc->can_activate_push = TRUE;
428
429   pad_template =
430       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (g_class), "src");
431   g_return_if_fail (pad_template != NULL);
432
433   GST_DEBUG_OBJECT (basesrc, "creating src pad");
434   pad = gst_pad_new_from_template (pad_template, "src");
435
436   GST_DEBUG_OBJECT (basesrc, "setting functions on src pad");
437   gst_pad_set_activatemode_function (pad, gst_base_src_activate_mode);
438   gst_pad_set_event_function (pad, gst_base_src_event);
439   gst_pad_set_query_function (pad, gst_base_src_query);
440   gst_pad_set_getrange_function (pad, gst_base_src_getrange);
441
442   /* hold pointer to pad */
443   basesrc->srcpad = pad;
444   GST_DEBUG_OBJECT (basesrc, "adding src pad");
445   gst_element_add_pad (GST_ELEMENT (basesrc), pad);
446
447   basesrc->blocksize = DEFAULT_BLOCKSIZE;
448   basesrc->clock_id = NULL;
449   /* we operate in BYTES by default */
450   gst_base_src_set_format (basesrc, GST_FORMAT_BYTES);
451   basesrc->typefind = DEFAULT_TYPEFIND;
452   basesrc->priv->do_timestamp = DEFAULT_DO_TIMESTAMP;
453   g_atomic_int_set (&basesrc->priv->have_events, FALSE);
454
455   g_cond_init (&basesrc->priv->async_cond);
456   basesrc->priv->start_result = GST_FLOW_FLUSHING;
457   GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTED);
458   GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTING);
459   GST_OBJECT_FLAG_SET (basesrc, GST_ELEMENT_FLAG_SOURCE);
460
461   GST_DEBUG_OBJECT (basesrc, "init done");
462 }
463
464 static void
465 gst_base_src_finalize (GObject * object)
466 {
467   GstBaseSrc *basesrc;
468   GstEvent **event_p;
469
470   basesrc = GST_BASE_SRC (object);
471
472   g_mutex_clear (&basesrc->live_lock);
473   g_cond_clear (&basesrc->live_cond);
474   g_cond_clear (&basesrc->priv->async_cond);
475
476   event_p = &basesrc->pending_seek;
477   gst_event_replace (event_p, NULL);
478
479   if (basesrc->priv->pending_events) {
480     g_list_foreach (basesrc->priv->pending_events, (GFunc) gst_event_unref,
481         NULL);
482     g_list_free (basesrc->priv->pending_events);
483   }
484
485   G_OBJECT_CLASS (parent_class)->finalize (object);
486 }
487
488 /**
489  * gst_base_src_wait_playing:
490  * @src: the src
491  *
492  * If the #GstBaseSrcClass.create() method performs its own synchronisation
493  * against the clock it must unblock when going from PLAYING to the PAUSED state
494  * and call this method before continuing to produce the remaining data.
495  *
496  * This function will block until a state change to PLAYING happens (in which
497  * case this function returns #GST_FLOW_OK) or the processing must be stopped due
498  * to a state change to READY or a FLUSH event (in which case this function
499  * returns #GST_FLOW_FLUSHING).
500  *
501  * Returns: #GST_FLOW_OK if @src is PLAYING and processing can
502  * continue. Any other return value should be returned from the create vmethod.
503  */
504 GstFlowReturn
505 gst_base_src_wait_playing (GstBaseSrc * src)
506 {
507   g_return_val_if_fail (GST_IS_BASE_SRC (src), GST_FLOW_ERROR);
508
509   do {
510     /* block until the state changes, or we get a flush, or something */
511     GST_DEBUG_OBJECT (src, "live source waiting for running state");
512     GST_LIVE_WAIT (src);
513     GST_DEBUG_OBJECT (src, "live source unlocked");
514     if (src->priv->flushing)
515       goto flushing;
516   } while (G_UNLIKELY (!src->live_running));
517
518   return GST_FLOW_OK;
519
520   /* ERRORS */
521 flushing:
522   {
523     GST_DEBUG_OBJECT (src, "we are flushing");
524     return GST_FLOW_FLUSHING;
525   }
526 }
527
528 /**
529  * gst_base_src_set_live:
530  * @src: base source instance
531  * @live: new live-mode
532  *
533  * If the element listens to a live source, @live should
534  * be set to %TRUE.
535  *
536  * A live source will not produce data in the PAUSED state and
537  * will therefore not be able to participate in the PREROLL phase
538  * of a pipeline. To signal this fact to the application and the
539  * pipeline, the state change return value of the live source will
540  * be GST_STATE_CHANGE_NO_PREROLL.
541  */
542 void
543 gst_base_src_set_live (GstBaseSrc * src, gboolean live)
544 {
545   g_return_if_fail (GST_IS_BASE_SRC (src));
546
547   GST_OBJECT_LOCK (src);
548   src->is_live = live;
549   GST_OBJECT_UNLOCK (src);
550 }
551
552 /**
553  * gst_base_src_is_live:
554  * @src: base source instance
555  *
556  * Check if an element is in live mode.
557  *
558  * Returns: %TRUE if element is in live mode.
559  */
560 gboolean
561 gst_base_src_is_live (GstBaseSrc * src)
562 {
563   gboolean result;
564
565   g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
566
567   GST_OBJECT_LOCK (src);
568   result = src->is_live;
569   GST_OBJECT_UNLOCK (src);
570
571   return result;
572 }
573
574 /**
575  * gst_base_src_set_format:
576  * @src: base source instance
577  * @format: the format to use
578  *
579  * Sets the default format of the source. This will be the format used
580  * for sending NEW_SEGMENT events and for performing seeks.
581  *
582  * If a format of GST_FORMAT_BYTES is set, the element will be able to
583  * operate in pull mode if the #GstBaseSrcClass.is_seekable() returns TRUE.
584  *
585  * This function must only be called in states < %GST_STATE_PAUSED.
586  */
587 void
588 gst_base_src_set_format (GstBaseSrc * src, GstFormat format)
589 {
590   g_return_if_fail (GST_IS_BASE_SRC (src));
591   g_return_if_fail (GST_STATE (src) <= GST_STATE_READY);
592
593   GST_OBJECT_LOCK (src);
594   gst_segment_init (&src->segment, format);
595   GST_OBJECT_UNLOCK (src);
596 }
597
598 /**
599  * gst_base_src_set_dynamic_size:
600  * @src: base source instance
601  * @dynamic: new dynamic size mode
602  *
603  * If not @dynamic, size is only updated when needed, such as when trying to
604  * read past current tracked size.  Otherwise, size is checked for upon each
605  * read.
606  */
607 void
608 gst_base_src_set_dynamic_size (GstBaseSrc * src, gboolean dynamic)
609 {
610   g_return_if_fail (GST_IS_BASE_SRC (src));
611
612   g_atomic_int_set (&src->priv->dynamic_size, dynamic);
613 }
614
615 /**
616  * gst_base_src_set_async:
617  * @src: base source instance
618  * @async: new async mode
619  *
620  * Configure async behaviour in @src, no state change will block. The open,
621  * close, start, stop, play and pause virtual methods will be executed in a
622  * different thread and are thus allowed to perform blocking operations. Any
623  * blocking operation should be unblocked with the unlock vmethod.
624  */
625 void
626 gst_base_src_set_async (GstBaseSrc * src, gboolean async)
627 {
628   g_return_if_fail (GST_IS_BASE_SRC (src));
629
630   GST_OBJECT_LOCK (src);
631   src->priv->async = async;
632   GST_OBJECT_UNLOCK (src);
633 }
634
635 /**
636  * gst_base_src_is_async:
637  * @src: base source instance
638  *
639  * Get the current async behaviour of @src. See also gst_base_src_set_async().
640  *
641  * Returns: %TRUE if @src is operating in async mode.
642  */
643 gboolean
644 gst_base_src_is_async (GstBaseSrc * src)
645 {
646   gboolean res;
647
648   g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
649
650   GST_OBJECT_LOCK (src);
651   res = src->priv->async;
652   GST_OBJECT_UNLOCK (src);
653
654   return res;
655 }
656
657
658 /**
659  * gst_base_src_query_latency:
660  * @src: the source
661  * @live: (out) (allow-none): if the source is live
662  * @min_latency: (out) (allow-none): the min latency of the source
663  * @max_latency: (out) (allow-none): the max latency of the source
664  *
665  * Query the source for the latency parameters. @live will be TRUE when @src is
666  * configured as a live source. @min_latency will be set to the difference
667  * between the running time and the timestamp of the first buffer.
668  * @max_latency is always the undefined value of -1.
669  *
670  * This function is mostly used by subclasses.
671  *
672  * Returns: TRUE if the query succeeded.
673  */
674 gboolean
675 gst_base_src_query_latency (GstBaseSrc * src, gboolean * live,
676     GstClockTime * min_latency, GstClockTime * max_latency)
677 {
678   GstClockTime min;
679
680   g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
681
682   GST_OBJECT_LOCK (src);
683   if (live)
684     *live = src->is_live;
685
686   /* if we have a startup latency, report this one, else report 0. Subclasses
687    * are supposed to override the query function if they want something
688    * else. */
689   if (src->priv->latency != -1)
690     min = src->priv->latency;
691   else
692     min = 0;
693
694   if (min_latency)
695     *min_latency = min;
696   if (max_latency)
697     *max_latency = -1;
698
699   GST_LOG_OBJECT (src, "latency: live %d, min %" GST_TIME_FORMAT
700       ", max %" GST_TIME_FORMAT, src->is_live, GST_TIME_ARGS (min),
701       GST_TIME_ARGS (-1));
702   GST_OBJECT_UNLOCK (src);
703
704   return TRUE;
705 }
706
707 /**
708  * gst_base_src_set_blocksize:
709  * @src: the source
710  * @blocksize: the new blocksize in bytes
711  *
712  * Set the number of bytes that @src will push out with each buffer. When
713  * @blocksize is set to -1, a default length will be used.
714  */
715 void
716 gst_base_src_set_blocksize (GstBaseSrc * src, guint blocksize)
717 {
718   g_return_if_fail (GST_IS_BASE_SRC (src));
719
720   GST_OBJECT_LOCK (src);
721   src->blocksize = blocksize;
722   GST_OBJECT_UNLOCK (src);
723 }
724
725 /**
726  * gst_base_src_get_blocksize:
727  * @src: the source
728  *
729  * Get the number of bytes that @src will push out with each buffer.
730  *
731  * Returns: the number of bytes pushed with each buffer.
732  */
733 guint
734 gst_base_src_get_blocksize (GstBaseSrc * src)
735 {
736   gint res;
737
738   g_return_val_if_fail (GST_IS_BASE_SRC (src), 0);
739
740   GST_OBJECT_LOCK (src);
741   res = src->blocksize;
742   GST_OBJECT_UNLOCK (src);
743
744   return res;
745 }
746
747
748 /**
749  * gst_base_src_set_do_timestamp:
750  * @src: the source
751  * @timestamp: enable or disable timestamping
752  *
753  * Configure @src to automatically timestamp outgoing buffers based on the
754  * current running_time of the pipeline. This property is mostly useful for live
755  * sources.
756  */
757 void
758 gst_base_src_set_do_timestamp (GstBaseSrc * src, gboolean timestamp)
759 {
760   g_return_if_fail (GST_IS_BASE_SRC (src));
761
762   GST_OBJECT_LOCK (src);
763   src->priv->do_timestamp = timestamp;
764   GST_OBJECT_UNLOCK (src);
765 }
766
767 /**
768  * gst_base_src_get_do_timestamp:
769  * @src: the source
770  *
771  * Query if @src timestamps outgoing buffers based on the current running_time.
772  *
773  * Returns: %TRUE if the base class will automatically timestamp outgoing buffers.
774  */
775 gboolean
776 gst_base_src_get_do_timestamp (GstBaseSrc * src)
777 {
778   gboolean res;
779
780   g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
781
782   GST_OBJECT_LOCK (src);
783   res = src->priv->do_timestamp;
784   GST_OBJECT_UNLOCK (src);
785
786   return res;
787 }
788
789 /**
790  * gst_base_src_new_seamless_segment:
791  * @src: The source
792  * @start: The new start value for the segment
793  * @stop: Stop value for the new segment
794  * @time: The new time value for the start of the new segent
795  *
796  * Prepare a new seamless segment for emission downstream. This function must
797  * only be called by derived sub-classes, and only from the create() function,
798  * as the stream-lock needs to be held.
799  *
800  * The format for the new segment will be the current format of the source, as
801  * configured with gst_base_src_set_format()
802  *
803  * Returns: %TRUE if preparation of the seamless segment succeeded.
804  */
805 gboolean
806 gst_base_src_new_seamless_segment (GstBaseSrc * src, gint64 start, gint64 stop,
807     gint64 time)
808 {
809   gboolean res = TRUE;
810
811   GST_OBJECT_LOCK (src);
812
813   src->segment.base = gst_segment_to_running_time (&src->segment,
814       src->segment.format, src->segment.position);
815   src->segment.position = src->segment.start = start;
816   src->segment.stop = stop;
817   src->segment.time = time;
818
819   /* Mark pending segment. Will be sent before next data */
820   src->priv->segment_pending = TRUE;
821
822   GST_DEBUG_OBJECT (src,
823       "Starting new seamless segment. Start %" GST_TIME_FORMAT " stop %"
824       GST_TIME_FORMAT " time %" GST_TIME_FORMAT " base %" GST_TIME_FORMAT,
825       GST_TIME_ARGS (start), GST_TIME_ARGS (stop), GST_TIME_ARGS (time),
826       GST_TIME_ARGS (src->segment.base));
827
828   GST_OBJECT_UNLOCK (src);
829
830   src->priv->discont = TRUE;
831   src->running = TRUE;
832
833   return res;
834 }
835
836 static gboolean
837 gst_base_src_send_stream_start (GstBaseSrc * src)
838 {
839   gboolean ret = TRUE;
840
841   if (src->priv->stream_start_pending) {
842     gchar *stream_id;
843
844     stream_id =
845         gst_pad_create_stream_id (src->srcpad, GST_ELEMENT_CAST (src), NULL);
846
847     GST_DEBUG_OBJECT (src, "Pushing STREAM_START");
848     ret =
849         gst_pad_push_event (src->srcpad,
850         gst_event_new_stream_start (stream_id));
851     src->priv->stream_start_pending = FALSE;
852     g_free (stream_id);
853   }
854
855   return ret;
856 }
857
858 /**
859  * gst_base_src_set_caps:
860  * @src: a #GstBaseSrc
861  * @caps: a #GstCaps
862  *
863  * Set new caps on the basesrc source pad.
864  *
865  * Returns: %TRUE if the caps could be set
866  */
867 gboolean
868 gst_base_src_set_caps (GstBaseSrc * src, GstCaps * caps)
869 {
870   GstBaseSrcClass *bclass;
871   gboolean res = TRUE;
872
873   bclass = GST_BASE_SRC_GET_CLASS (src);
874
875   gst_base_src_send_stream_start (src);
876
877   if (bclass->set_caps)
878     res = bclass->set_caps (src, caps);
879
880   if (res)
881     res = gst_pad_set_caps (src->srcpad, caps);
882
883   return res;
884 }
885
886 static GstCaps *
887 gst_base_src_default_get_caps (GstBaseSrc * bsrc, GstCaps * filter)
888 {
889   GstCaps *caps = NULL;
890   GstPadTemplate *pad_template;
891   GstBaseSrcClass *bclass;
892
893   bclass = GST_BASE_SRC_GET_CLASS (bsrc);
894
895   pad_template =
896       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (bclass), "src");
897
898   if (pad_template != NULL) {
899     caps = gst_pad_template_get_caps (pad_template);
900
901     if (filter) {
902       GstCaps *intersection;
903
904       intersection =
905           gst_caps_intersect_full (filter, caps, GST_CAPS_INTERSECT_FIRST);
906       gst_caps_unref (caps);
907       caps = intersection;
908     }
909   }
910   return caps;
911 }
912
913 static GstCaps *
914 gst_base_src_default_fixate (GstBaseSrc * bsrc, GstCaps * caps)
915 {
916   GST_DEBUG_OBJECT (bsrc, "using default caps fixate function");
917   return gst_caps_fixate (caps);
918 }
919
920 static GstCaps *
921 gst_base_src_fixate (GstBaseSrc * bsrc, GstCaps * caps)
922 {
923   GstBaseSrcClass *bclass;
924
925   bclass = GST_BASE_SRC_GET_CLASS (bsrc);
926
927   if (bclass->fixate)
928     caps = bclass->fixate (bsrc, caps);
929
930   return caps;
931 }
932
933 static gboolean
934 gst_base_src_default_query (GstBaseSrc * src, GstQuery * query)
935 {
936   gboolean res;
937
938   switch (GST_QUERY_TYPE (query)) {
939     case GST_QUERY_POSITION:
940     {
941       GstFormat format;
942
943       gst_query_parse_position (query, &format, NULL);
944
945       GST_DEBUG_OBJECT (src, "position query in format %s",
946           gst_format_get_name (format));
947
948       switch (format) {
949         case GST_FORMAT_PERCENT:
950         {
951           gint64 percent;
952           gint64 position;
953           gint64 duration;
954
955           GST_OBJECT_LOCK (src);
956           position = src->segment.position;
957           duration = src->segment.duration;
958           GST_OBJECT_UNLOCK (src);
959
960           if (position != -1 && duration != -1) {
961             if (position < duration)
962               percent = gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX, position,
963                   duration);
964             else
965               percent = GST_FORMAT_PERCENT_MAX;
966           } else
967             percent = -1;
968
969           gst_query_set_position (query, GST_FORMAT_PERCENT, percent);
970           res = TRUE;
971           break;
972         }
973         default:
974         {
975           gint64 position;
976           GstFormat seg_format;
977
978           GST_OBJECT_LOCK (src);
979           position =
980               gst_segment_to_stream_time (&src->segment, src->segment.format,
981               src->segment.position);
982           seg_format = src->segment.format;
983           GST_OBJECT_UNLOCK (src);
984
985           if (position != -1) {
986             /* convert to requested format */
987             res =
988                 gst_pad_query_convert (src->srcpad, seg_format,
989                 position, format, &position);
990           } else
991             res = TRUE;
992
993           gst_query_set_position (query, format, position);
994           break;
995         }
996       }
997       break;
998     }
999     case GST_QUERY_DURATION:
1000     {
1001       GstFormat format;
1002
1003       gst_query_parse_duration (query, &format, NULL);
1004
1005       GST_DEBUG_OBJECT (src, "duration query in format %s",
1006           gst_format_get_name (format));
1007
1008       switch (format) {
1009         case GST_FORMAT_PERCENT:
1010           gst_query_set_duration (query, GST_FORMAT_PERCENT,
1011               GST_FORMAT_PERCENT_MAX);
1012           res = TRUE;
1013           break;
1014         default:
1015         {
1016           gint64 duration;
1017           GstFormat seg_format;
1018           guint length = 0;
1019
1020           /* may have to refresh duration */
1021           if (g_atomic_int_get (&src->priv->dynamic_size))
1022             gst_base_src_update_length (src, 0, &length);
1023
1024           /* this is the duration as configured by the subclass. */
1025           GST_OBJECT_LOCK (src);
1026           duration = src->segment.duration;
1027           seg_format = src->segment.format;
1028           GST_OBJECT_UNLOCK (src);
1029
1030           GST_LOG_OBJECT (src, "duration %" G_GINT64_FORMAT ", format %s",
1031               duration, gst_format_get_name (seg_format));
1032
1033           if (duration != -1) {
1034             /* convert to requested format, if this fails, we have a duration
1035              * but we cannot answer the query, we must return FALSE. */
1036             res =
1037                 gst_pad_query_convert (src->srcpad, seg_format,
1038                 duration, format, &duration);
1039           } else {
1040             /* The subclass did not configure a duration, we assume that the
1041              * media has an unknown duration then and we return TRUE to report
1042              * this. Note that this is not the same as returning FALSE, which
1043              * means that we cannot report the duration at all. */
1044             res = TRUE;
1045           }
1046           gst_query_set_duration (query, format, duration);
1047           break;
1048         }
1049       }
1050       break;
1051     }
1052
1053     case GST_QUERY_SEEKING:
1054     {
1055       GstFormat format, seg_format;
1056       gint64 duration;
1057
1058       GST_OBJECT_LOCK (src);
1059       duration = src->segment.duration;
1060       seg_format = src->segment.format;
1061       GST_OBJECT_UNLOCK (src);
1062
1063       gst_query_parse_seeking (query, &format, NULL, NULL, NULL);
1064       if (format == seg_format) {
1065         gst_query_set_seeking (query, seg_format,
1066             gst_base_src_seekable (src), 0, duration);
1067         res = TRUE;
1068       } else {
1069         /* FIXME 0.11: return TRUE + seekable=FALSE for SEEKING query here */
1070         /* Don't reply to the query to make up for demuxers which don't
1071          * handle the SEEKING query yet. Players like Totem will fall back
1072          * to the duration when the SEEKING query isn't answered. */
1073         res = FALSE;
1074       }
1075       break;
1076     }
1077     case GST_QUERY_SEGMENT:
1078     {
1079       gint64 start, stop;
1080
1081       GST_OBJECT_LOCK (src);
1082       /* no end segment configured, current duration then */
1083       if ((stop = src->segment.stop) == -1)
1084         stop = src->segment.duration;
1085       start = src->segment.start;
1086
1087       /* adjust to stream time */
1088       if (src->segment.time != -1) {
1089         start -= src->segment.time;
1090         if (stop != -1)
1091           stop -= src->segment.time;
1092       }
1093
1094       gst_query_set_segment (query, src->segment.rate, src->segment.format,
1095           start, stop);
1096       GST_OBJECT_UNLOCK (src);
1097       res = TRUE;
1098       break;
1099     }
1100
1101     case GST_QUERY_FORMATS:
1102     {
1103       gst_query_set_formats (query, 3, GST_FORMAT_DEFAULT,
1104           GST_FORMAT_BYTES, GST_FORMAT_PERCENT);
1105       res = TRUE;
1106       break;
1107     }
1108     case GST_QUERY_CONVERT:
1109     {
1110       GstFormat src_fmt, dest_fmt;
1111       gint64 src_val, dest_val;
1112
1113       gst_query_parse_convert (query, &src_fmt, &src_val, &dest_fmt, &dest_val);
1114
1115       /* we can only convert between equal formats... */
1116       if (src_fmt == dest_fmt) {
1117         dest_val = src_val;
1118         res = TRUE;
1119       } else
1120         res = FALSE;
1121
1122       gst_query_set_convert (query, src_fmt, src_val, dest_fmt, dest_val);
1123       break;
1124     }
1125     case GST_QUERY_LATENCY:
1126     {
1127       GstClockTime min, max;
1128       gboolean live;
1129
1130       /* Subclasses should override and implement something useful */
1131       res = gst_base_src_query_latency (src, &live, &min, &max);
1132
1133       GST_LOG_OBJECT (src, "report latency: live %d, min %" GST_TIME_FORMAT
1134           ", max %" GST_TIME_FORMAT, live, GST_TIME_ARGS (min),
1135           GST_TIME_ARGS (max));
1136
1137       gst_query_set_latency (query, live, min, max);
1138       break;
1139     }
1140     case GST_QUERY_JITTER:
1141     case GST_QUERY_RATE:
1142       res = FALSE;
1143       break;
1144     case GST_QUERY_BUFFERING:
1145     {
1146       GstFormat format, seg_format;
1147       gint64 start, stop, estimated;
1148
1149       gst_query_parse_buffering_range (query, &format, NULL, NULL, NULL);
1150
1151       GST_DEBUG_OBJECT (src, "buffering query in format %s",
1152           gst_format_get_name (format));
1153
1154       GST_OBJECT_LOCK (src);
1155       if (src->random_access) {
1156         estimated = 0;
1157         start = 0;
1158         if (format == GST_FORMAT_PERCENT)
1159           stop = GST_FORMAT_PERCENT_MAX;
1160         else
1161           stop = src->segment.duration;
1162       } else {
1163         estimated = -1;
1164         start = -1;
1165         stop = -1;
1166       }
1167       seg_format = src->segment.format;
1168       GST_OBJECT_UNLOCK (src);
1169
1170       /* convert to required format. When the conversion fails, we can't answer
1171        * the query. When the value is unknown, we can don't perform conversion
1172        * but report TRUE. */
1173       if (format != GST_FORMAT_PERCENT && stop != -1) {
1174         res = gst_pad_query_convert (src->srcpad, seg_format,
1175             stop, format, &stop);
1176       } else {
1177         res = TRUE;
1178       }
1179       if (res && format != GST_FORMAT_PERCENT && start != -1)
1180         res = gst_pad_query_convert (src->srcpad, seg_format,
1181             start, format, &start);
1182
1183       gst_query_set_buffering_range (query, format, start, stop, estimated);
1184       break;
1185     }
1186     case GST_QUERY_SCHEDULING:
1187     {
1188       gboolean random_access;
1189
1190       random_access = gst_base_src_is_random_access (src);
1191
1192       /* we can operate in getrange mode if the native format is bytes
1193        * and we are seekable, this condition is set in the random_access
1194        * flag and is set in the _start() method. */
1195       gst_query_set_scheduling (query, GST_SCHEDULING_FLAG_SEEKABLE, 1, -1, 0);
1196       if (random_access)
1197         gst_query_add_scheduling_mode (query, GST_PAD_MODE_PULL);
1198       gst_query_add_scheduling_mode (query, GST_PAD_MODE_PUSH);
1199
1200       res = TRUE;
1201       break;
1202     }
1203     case GST_QUERY_CAPS:
1204     {
1205       GstBaseSrcClass *bclass;
1206       GstCaps *caps, *filter;
1207
1208       bclass = GST_BASE_SRC_GET_CLASS (src);
1209       if (bclass->get_caps) {
1210         gst_query_parse_caps (query, &filter);
1211         if ((caps = bclass->get_caps (src, filter))) {
1212           gst_query_set_caps_result (query, caps);
1213           gst_caps_unref (caps);
1214           res = TRUE;
1215         } else {
1216           res = FALSE;
1217         }
1218       } else
1219         res = FALSE;
1220       break;
1221     }
1222     case GST_QUERY_URI:{
1223       if (GST_IS_URI_HANDLER (src)) {
1224         gchar *uri = gst_uri_handler_get_uri (GST_URI_HANDLER (src));
1225
1226         if (uri != NULL) {
1227           gst_query_set_uri (query, uri);
1228           g_free (uri);
1229           res = TRUE;
1230         } else {
1231           res = FALSE;
1232         }
1233       } else {
1234         res = FALSE;
1235       }
1236       break;
1237     }
1238     default:
1239       res = FALSE;
1240       break;
1241   }
1242   GST_DEBUG_OBJECT (src, "query %s returns %d", GST_QUERY_TYPE_NAME (query),
1243       res);
1244
1245   return res;
1246 }
1247
1248 static gboolean
1249 gst_base_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
1250 {
1251   GstBaseSrc *src;
1252   GstBaseSrcClass *bclass;
1253   gboolean result = FALSE;
1254
1255   src = GST_BASE_SRC (parent);
1256   bclass = GST_BASE_SRC_GET_CLASS (src);
1257
1258   if (bclass->query)
1259     result = bclass->query (src, query);
1260
1261   return result;
1262 }
1263
1264 static gboolean
1265 gst_base_src_default_do_seek (GstBaseSrc * src, GstSegment * segment)
1266 {
1267   gboolean res = TRUE;
1268
1269   /* update our offset if the start/stop position was updated */
1270   if (segment->format == GST_FORMAT_BYTES) {
1271     segment->time = segment->start;
1272   } else if (segment->start == 0) {
1273     /* seek to start, we can implement a default for this. */
1274     segment->time = 0;
1275   } else {
1276     res = FALSE;
1277     GST_INFO_OBJECT (src, "Can't do a default seek");
1278   }
1279
1280   return res;
1281 }
1282
1283 static gboolean
1284 gst_base_src_do_seek (GstBaseSrc * src, GstSegment * segment)
1285 {
1286   GstBaseSrcClass *bclass;
1287   gboolean result = FALSE;
1288
1289   bclass = GST_BASE_SRC_GET_CLASS (src);
1290
1291   if (bclass->do_seek)
1292     result = bclass->do_seek (src, segment);
1293
1294   return result;
1295 }
1296
1297 #define SEEK_TYPE_IS_RELATIVE(t) (((t) != GST_SEEK_TYPE_NONE) && ((t) != GST_SEEK_TYPE_SET))
1298
1299 static gboolean
1300 gst_base_src_default_prepare_seek_segment (GstBaseSrc * src, GstEvent * event,
1301     GstSegment * segment)
1302 {
1303   /* By default, we try one of 2 things:
1304    *   - For absolute seek positions, convert the requested position to our
1305    *     configured processing format and place it in the output segment \
1306    *   - For relative seek positions, convert our current (input) values to the
1307    *     seek format, adjust by the relative seek offset and then convert back to
1308    *     the processing format
1309    */
1310   GstSeekType start_type, stop_type;
1311   gint64 start, stop;
1312   GstSeekFlags flags;
1313   GstFormat seek_format, dest_format;
1314   gdouble rate;
1315   gboolean update;
1316   gboolean res = TRUE;
1317
1318   gst_event_parse_seek (event, &rate, &seek_format, &flags,
1319       &start_type, &start, &stop_type, &stop);
1320   dest_format = segment->format;
1321
1322   if (seek_format == dest_format) {
1323     gst_segment_do_seek (segment, rate, seek_format, flags,
1324         start_type, start, stop_type, stop, &update);
1325     return TRUE;
1326   }
1327
1328   if (start_type != GST_SEEK_TYPE_NONE) {
1329     /* FIXME: Handle seek_end by converting the input segment vals */
1330     res =
1331         gst_pad_query_convert (src->srcpad, seek_format, start, dest_format,
1332         &start);
1333     start_type = GST_SEEK_TYPE_SET;
1334   }
1335
1336   if (res && stop_type != GST_SEEK_TYPE_NONE) {
1337     /* FIXME: Handle seek_end by converting the input segment vals */
1338     res =
1339         gst_pad_query_convert (src->srcpad, seek_format, stop, dest_format,
1340         &stop);
1341     stop_type = GST_SEEK_TYPE_SET;
1342   }
1343
1344   /* And finally, configure our output segment in the desired format */
1345   gst_segment_do_seek (segment, rate, dest_format, flags, start_type, start,
1346       stop_type, stop, &update);
1347
1348   if (!res)
1349     goto no_format;
1350
1351   return res;
1352
1353 no_format:
1354   {
1355     GST_DEBUG_OBJECT (src, "undefined format given, seek aborted.");
1356     return FALSE;
1357   }
1358 }
1359
1360 static gboolean
1361 gst_base_src_prepare_seek_segment (GstBaseSrc * src, GstEvent * event,
1362     GstSegment * seeksegment)
1363 {
1364   GstBaseSrcClass *bclass;
1365   gboolean result = FALSE;
1366
1367   bclass = GST_BASE_SRC_GET_CLASS (src);
1368
1369   if (bclass->prepare_seek_segment)
1370     result = bclass->prepare_seek_segment (src, event, seeksegment);
1371
1372   return result;
1373 }
1374
1375 static GstFlowReturn
1376 gst_base_src_default_alloc (GstBaseSrc * src, guint64 offset,
1377     guint size, GstBuffer ** buffer)
1378 {
1379   GstFlowReturn ret;
1380   GstBaseSrcPrivate *priv = src->priv;
1381
1382   if (priv->pool) {
1383     ret = gst_buffer_pool_acquire_buffer (priv->pool, buffer, NULL);
1384   } else if (size != -1) {
1385     *buffer = gst_buffer_new_allocate (priv->allocator, size, &priv->params);
1386     if (G_UNLIKELY (*buffer == NULL))
1387       goto alloc_failed;
1388
1389     ret = GST_FLOW_OK;
1390   } else {
1391     GST_WARNING_OBJECT (src, "Not trying to alloc %u bytes. Blocksize not set?",
1392         size);
1393     goto alloc_failed;
1394   }
1395   return ret;
1396
1397   /* ERRORS */
1398 alloc_failed:
1399   {
1400     GST_ERROR_OBJECT (src, "Failed to allocate %u bytes", size);
1401     return GST_FLOW_ERROR;
1402   }
1403 }
1404
1405 static GstFlowReturn
1406 gst_base_src_default_create (GstBaseSrc * src, guint64 offset,
1407     guint size, GstBuffer ** buffer)
1408 {
1409   GstBaseSrcClass *bclass;
1410   GstFlowReturn ret;
1411   GstBuffer *res_buf;
1412
1413   bclass = GST_BASE_SRC_GET_CLASS (src);
1414
1415   if (G_UNLIKELY (!bclass->alloc))
1416     goto no_function;
1417   if (G_UNLIKELY (!bclass->fill))
1418     goto no_function;
1419
1420   if (*buffer == NULL) {
1421     /* downstream did not provide us with a buffer to fill, allocate one
1422      * ourselves */
1423     ret = bclass->alloc (src, offset, size, &res_buf);
1424     if (G_UNLIKELY (ret != GST_FLOW_OK))
1425       goto alloc_failed;
1426   } else {
1427     res_buf = *buffer;
1428   }
1429
1430   if (G_LIKELY (size > 0)) {
1431     /* only call fill when there is a size */
1432     ret = bclass->fill (src, offset, size, res_buf);
1433     if (G_UNLIKELY (ret != GST_FLOW_OK))
1434       goto not_ok;
1435   }
1436
1437   *buffer = res_buf;
1438
1439   return GST_FLOW_OK;
1440
1441   /* ERRORS */
1442 no_function:
1443   {
1444     GST_DEBUG_OBJECT (src, "no fill or alloc function");
1445     return GST_FLOW_NOT_SUPPORTED;
1446   }
1447 alloc_failed:
1448   {
1449     GST_DEBUG_OBJECT (src, "Failed to allocate buffer of %u bytes", size);
1450     return ret;
1451   }
1452 not_ok:
1453   {
1454     GST_DEBUG_OBJECT (src, "fill returned %d (%s)", ret,
1455         gst_flow_get_name (ret));
1456     if (*buffer == NULL)
1457       gst_buffer_unref (res_buf);
1458     return ret;
1459   }
1460 }
1461
1462 /* this code implements the seeking. It is a good example
1463  * handling all cases.
1464  *
1465  * A seek updates the currently configured segment.start
1466  * and segment.stop values based on the SEEK_TYPE. If the
1467  * segment.start value is updated, a seek to this new position
1468  * should be performed.
1469  *
1470  * The seek can only be executed when we are not currently
1471  * streaming any data, to make sure that this is the case, we
1472  * acquire the STREAM_LOCK which is taken when we are in the
1473  * _loop() function or when a getrange() is called. Normally
1474  * we will not receive a seek if we are operating in pull mode
1475  * though. When we operate as a live source we might block on the live
1476  * cond, which does not release the STREAM_LOCK. Therefore we will try
1477  * to grab the LIVE_LOCK instead of the STREAM_LOCK to make sure it is
1478  * safe to perform the seek.
1479  *
1480  * When we are in the loop() function, we might be in the middle
1481  * of pushing a buffer, which might block in a sink. To make sure
1482  * that the push gets unblocked we push out a FLUSH_START event.
1483  * Our loop function will get a FLUSHING return value from
1484  * the push and will pause, effectively releasing the STREAM_LOCK.
1485  *
1486  * For a non-flushing seek, we pause the task, which might eventually
1487  * release the STREAM_LOCK. We say eventually because when the sink
1488  * blocks on the sample we might wait a very long time until the sink
1489  * unblocks the sample. In any case we acquire the STREAM_LOCK and
1490  * can continue the seek. A non-flushing seek is normally done in a
1491  * running pipeline to perform seamless playback, this means that the sink is
1492  * PLAYING and will return from its chain function.
1493  * In the case of a non-flushing seek we need to make sure that the
1494  * data we output after the seek is continuous with the previous data,
1495  * this is because a non-flushing seek does not reset the running-time
1496  * to 0. We do this by closing the currently running segment, ie. sending
1497  * a new_segment event with the stop position set to the last processed
1498  * position.
1499  *
1500  * After updating the segment.start/stop values, we prepare for
1501  * streaming again. We push out a FLUSH_STOP to make the peer pad
1502  * accept data again and we start our task again.
1503  *
1504  * A segment seek posts a message on the bus saying that the playback
1505  * of the segment started. We store the segment flag internally because
1506  * when we reach the segment.stop we have to post a segment.done
1507  * instead of EOS when doing a segment seek.
1508  */
1509 static gboolean
1510 gst_base_src_perform_seek (GstBaseSrc * src, GstEvent * event, gboolean unlock)
1511 {
1512   gboolean res = TRUE, tres;
1513   gdouble rate;
1514   GstFormat seek_format, dest_format;
1515   GstSeekFlags flags;
1516   GstSeekType start_type, stop_type;
1517   gint64 start, stop;
1518   gboolean flush, playing;
1519   gboolean update;
1520   gboolean relative_seek = FALSE;
1521   gboolean seekseg_configured = FALSE;
1522   GstSegment seeksegment;
1523   guint32 seqnum;
1524   GstEvent *tevent;
1525
1526   GST_DEBUG_OBJECT (src, "doing seek: %" GST_PTR_FORMAT, event);
1527
1528   GST_OBJECT_LOCK (src);
1529   dest_format = src->segment.format;
1530   GST_OBJECT_UNLOCK (src);
1531
1532   if (event) {
1533     gst_event_parse_seek (event, &rate, &seek_format, &flags,
1534         &start_type, &start, &stop_type, &stop);
1535
1536     relative_seek = SEEK_TYPE_IS_RELATIVE (start_type) ||
1537         SEEK_TYPE_IS_RELATIVE (stop_type);
1538
1539     if (dest_format != seek_format && !relative_seek) {
1540       /* If we have an ABSOLUTE position (SEEK_SET only), we can convert it
1541        * here before taking the stream lock, otherwise we must convert it later,
1542        * once we have the stream lock and can read the last configures segment
1543        * start and stop positions */
1544       gst_segment_init (&seeksegment, dest_format);
1545
1546       if (!gst_base_src_prepare_seek_segment (src, event, &seeksegment))
1547         goto prepare_failed;
1548
1549       seekseg_configured = TRUE;
1550     }
1551
1552     flush = flags & GST_SEEK_FLAG_FLUSH;
1553     seqnum = gst_event_get_seqnum (event);
1554   } else {
1555     flush = FALSE;
1556     /* get next seqnum */
1557     seqnum = gst_util_seqnum_next ();
1558   }
1559
1560   /* send flush start */
1561   if (flush) {
1562     tevent = gst_event_new_flush_start ();
1563     gst_event_set_seqnum (tevent, seqnum);
1564     gst_pad_push_event (src->srcpad, tevent);
1565   } else
1566     gst_pad_pause_task (src->srcpad);
1567
1568   /* unblock streaming thread. */
1569   if (unlock)
1570     gst_base_src_set_flushing (src, TRUE, FALSE, &playing);
1571
1572   /* grab streaming lock, this should eventually be possible, either
1573    * because the task is paused, our streaming thread stopped
1574    * or because our peer is flushing. */
1575   GST_PAD_STREAM_LOCK (src->srcpad);
1576   if (G_UNLIKELY (src->priv->seqnum == seqnum)) {
1577     /* we have seen this event before, issue a warning for now */
1578     GST_WARNING_OBJECT (src, "duplicate event found %" G_GUINT32_FORMAT,
1579         seqnum);
1580   } else {
1581     src->priv->seqnum = seqnum;
1582     GST_DEBUG_OBJECT (src, "seek with seqnum %" G_GUINT32_FORMAT, seqnum);
1583   }
1584
1585   if (unlock)
1586     gst_base_src_set_flushing (src, FALSE, playing, NULL);
1587
1588   /* If we configured the seeksegment above, don't overwrite it now. Otherwise
1589    * copy the current segment info into the temp segment that we can actually
1590    * attempt the seek with. We only update the real segment if the seek succeeds. */
1591   if (!seekseg_configured) {
1592     memcpy (&seeksegment, &src->segment, sizeof (GstSegment));
1593
1594     /* now configure the final seek segment */
1595     if (event) {
1596       if (seeksegment.format != seek_format) {
1597         /* OK, here's where we give the subclass a chance to convert the relative
1598          * seek into an absolute one in the processing format. We set up any
1599          * absolute seek above, before taking the stream lock. */
1600         if (!gst_base_src_prepare_seek_segment (src, event, &seeksegment)) {
1601           GST_DEBUG_OBJECT (src, "Preparing the seek failed after flushing. "
1602               "Aborting seek");
1603           res = FALSE;
1604         }
1605       } else {
1606         /* The seek format matches our processing format, no need to ask the
1607          * the subclass to configure the segment. */
1608         gst_segment_do_seek (&seeksegment, rate, seek_format, flags,
1609             start_type, start, stop_type, stop, &update);
1610       }
1611     }
1612     /* Else, no seek event passed, so we're just (re)starting the
1613        current segment. */
1614   }
1615
1616   if (res) {
1617     GST_DEBUG_OBJECT (src, "segment configured from %" G_GINT64_FORMAT
1618         " to %" G_GINT64_FORMAT ", position %" G_GINT64_FORMAT,
1619         seeksegment.start, seeksegment.stop, seeksegment.position);
1620
1621     /* do the seek, segment.position contains the new position. */
1622     res = gst_base_src_do_seek (src, &seeksegment);
1623   }
1624
1625   /* and prepare to continue streaming */
1626   if (flush) {
1627     tevent = gst_event_new_flush_stop (TRUE);
1628     gst_event_set_seqnum (tevent, seqnum);
1629     /* send flush stop, peer will accept data and events again. We
1630      * are not yet providing data as we still have the STREAM_LOCK. */
1631     gst_pad_push_event (src->srcpad, tevent);
1632   }
1633
1634   /* The subclass must have converted the segment to the processing format
1635    * by now */
1636   if (res && seeksegment.format != dest_format) {
1637     GST_DEBUG_OBJECT (src, "Subclass failed to prepare a seek segment "
1638         "in the correct format. Aborting seek.");
1639     res = FALSE;
1640   }
1641
1642   /* if the seek was successful, we update our real segment and push
1643    * out the new segment. */
1644   if (res) {
1645     GST_OBJECT_LOCK (src);
1646     memcpy (&src->segment, &seeksegment, sizeof (GstSegment));
1647     GST_OBJECT_UNLOCK (src);
1648
1649     if (seeksegment.flags & GST_SEGMENT_FLAG_SEGMENT) {
1650       GstMessage *message;
1651
1652       message = gst_message_new_segment_start (GST_OBJECT (src),
1653           seeksegment.format, seeksegment.position);
1654       gst_message_set_seqnum (message, seqnum);
1655
1656       gst_element_post_message (GST_ELEMENT (src), message);
1657     }
1658
1659     /* for deriving a stop position for the playback segment from the seek
1660      * segment, we must take the duration when the stop is not set */
1661     if ((stop = seeksegment.stop) == -1)
1662       stop = seeksegment.duration;
1663
1664     src->priv->segment_pending = TRUE;
1665   }
1666
1667   src->priv->discont = TRUE;
1668   src->running = TRUE;
1669   /* and restart the task in case it got paused explicitly or by
1670    * the FLUSH_START event we pushed out. */
1671   tres = gst_pad_start_task (src->srcpad, (GstTaskFunction) gst_base_src_loop,
1672       src->srcpad, NULL);
1673   if (res && !tres)
1674     res = FALSE;
1675
1676   /* and release the lock again so we can continue streaming */
1677   GST_PAD_STREAM_UNLOCK (src->srcpad);
1678
1679   return res;
1680
1681   /* ERROR */
1682 prepare_failed:
1683   GST_DEBUG_OBJECT (src, "Preparing the seek failed before flushing. "
1684       "Aborting seek");
1685   return FALSE;
1686 }
1687
1688 /* all events send to this element directly. This is mainly done from the
1689  * application.
1690  */
1691 static gboolean
1692 gst_base_src_send_event (GstElement * element, GstEvent * event)
1693 {
1694   GstBaseSrc *src;
1695   gboolean result = FALSE;
1696
1697   src = GST_BASE_SRC (element);
1698
1699   GST_DEBUG_OBJECT (src, "handling event %p %" GST_PTR_FORMAT, event, event);
1700
1701   switch (GST_EVENT_TYPE (event)) {
1702       /* bidirectional events */
1703     case GST_EVENT_FLUSH_START:
1704       GST_DEBUG_OBJECT (src, "pushing flush-start event downstream");
1705       result = gst_pad_push_event (src->srcpad, event);
1706       event = NULL;
1707       break;
1708     case GST_EVENT_FLUSH_STOP:
1709       GST_LIVE_LOCK (src->srcpad);
1710       src->priv->segment_pending = TRUE;
1711       /* sending random flushes downstream can break stuff,
1712        * especially sync since all segment info will get flushed */
1713       GST_DEBUG_OBJECT (src, "pushing flush-stop event downstream");
1714       result = gst_pad_push_event (src->srcpad, event);
1715       GST_LIVE_UNLOCK (src->srcpad);
1716       event = NULL;
1717       break;
1718
1719       /* downstream serialized events */
1720     case GST_EVENT_EOS:
1721     {
1722       GstBaseSrcClass *bclass;
1723
1724       bclass = GST_BASE_SRC_GET_CLASS (src);
1725
1726       /* queue EOS and make sure the task or pull function performs the EOS
1727        * actions.
1728        *
1729        * We have two possibilities:
1730        *
1731        *  - Before we are to enter the _create function, we check the pending_eos
1732        *    first and do EOS instead of entering it.
1733        *  - If we are in the _create function or we did not manage to set the
1734        *    flag fast enough and we are about to enter the _create function,
1735        *    we unlock it so that we exit with FLUSHING immediately. We then
1736        *    check the EOS flag and do the EOS logic.
1737        */
1738       g_atomic_int_set (&src->priv->pending_eos, TRUE);
1739       GST_DEBUG_OBJECT (src, "EOS marked, calling unlock");
1740
1741
1742       /* unlock the _create function so that we can check the pending_eos flag
1743        * and we can do EOS. This will eventually release the LIVE_LOCK again so
1744        * that we can grab it and stop the unlock again. We don't take the stream
1745        * lock so that this operation is guaranteed to never block. */
1746       gst_base_src_activate_pool (src, FALSE);
1747       if (bclass->unlock)
1748         bclass->unlock (src);
1749
1750       GST_DEBUG_OBJECT (src, "unlock called, waiting for LIVE_LOCK");
1751
1752       GST_LIVE_LOCK (src);
1753       GST_DEBUG_OBJECT (src, "LIVE_LOCK acquired, calling unlock_stop");
1754       /* now stop the unlock of the streaming thread again. Grabbing the live
1755        * lock is enough because that protects the create function. */
1756       if (bclass->unlock_stop)
1757         bclass->unlock_stop (src);
1758       gst_base_src_activate_pool (src, TRUE);
1759       GST_LIVE_UNLOCK (src);
1760
1761       result = TRUE;
1762       break;
1763     }
1764     case GST_EVENT_SEGMENT:
1765       /* sending random SEGMENT downstream can break sync. */
1766       break;
1767     case GST_EVENT_TAG:
1768     case GST_EVENT_CUSTOM_DOWNSTREAM:
1769     case GST_EVENT_CUSTOM_BOTH:
1770       /* Insert TAG, CUSTOM_DOWNSTREAM, CUSTOM_BOTH in the dataflow */
1771       GST_OBJECT_LOCK (src);
1772       src->priv->pending_events =
1773           g_list_append (src->priv->pending_events, event);
1774       g_atomic_int_set (&src->priv->have_events, TRUE);
1775       GST_OBJECT_UNLOCK (src);
1776       event = NULL;
1777       result = TRUE;
1778       break;
1779     case GST_EVENT_BUFFERSIZE:
1780       /* does not seem to make much sense currently */
1781       break;
1782
1783       /* upstream events */
1784     case GST_EVENT_QOS:
1785       /* elements should override send_event and do something */
1786       break;
1787     case GST_EVENT_SEEK:
1788     {
1789       gboolean started;
1790
1791       GST_OBJECT_LOCK (src->srcpad);
1792       if (GST_PAD_MODE (src->srcpad) == GST_PAD_MODE_PULL)
1793         goto wrong_mode;
1794       started = GST_PAD_MODE (src->srcpad) == GST_PAD_MODE_PUSH;
1795       GST_OBJECT_UNLOCK (src->srcpad);
1796
1797       if (started) {
1798         GST_DEBUG_OBJECT (src, "performing seek");
1799         /* when we are running in push mode, we can execute the
1800          * seek right now. */
1801         result = gst_base_src_perform_seek (src, event, TRUE);
1802       } else {
1803         GstEvent **event_p;
1804
1805         /* else we store the event and execute the seek when we
1806          * get activated */
1807         GST_OBJECT_LOCK (src);
1808         GST_DEBUG_OBJECT (src, "queueing seek");
1809         event_p = &src->pending_seek;
1810         gst_event_replace ((GstEvent **) event_p, event);
1811         GST_OBJECT_UNLOCK (src);
1812         /* assume the seek will work */
1813         result = TRUE;
1814       }
1815       break;
1816     }
1817     case GST_EVENT_NAVIGATION:
1818       /* could make sense for elements that do something with navigation events
1819        * but then they would need to override the send_event function */
1820       break;
1821     case GST_EVENT_LATENCY:
1822       /* does not seem to make sense currently */
1823       break;
1824
1825       /* custom events */
1826     case GST_EVENT_CUSTOM_UPSTREAM:
1827       /* override send_event if you want this */
1828       break;
1829     case GST_EVENT_CUSTOM_DOWNSTREAM_OOB:
1830     case GST_EVENT_CUSTOM_BOTH_OOB:
1831       /* insert a random custom event into the pipeline */
1832       GST_DEBUG_OBJECT (src, "pushing custom OOB event downstream");
1833       result = gst_pad_push_event (src->srcpad, event);
1834       /* we gave away the ref to the event in the push */
1835       event = NULL;
1836       break;
1837     default:
1838       break;
1839   }
1840 done:
1841   /* if we still have a ref to the event, unref it now */
1842   if (event)
1843     gst_event_unref (event);
1844
1845   return result;
1846
1847   /* ERRORS */
1848 wrong_mode:
1849   {
1850     GST_DEBUG_OBJECT (src, "cannot perform seek when operating in pull mode");
1851     GST_OBJECT_UNLOCK (src->srcpad);
1852     result = FALSE;
1853     goto done;
1854   }
1855 }
1856
1857 static gboolean
1858 gst_base_src_seekable (GstBaseSrc * src)
1859 {
1860   GstBaseSrcClass *bclass;
1861   bclass = GST_BASE_SRC_GET_CLASS (src);
1862   if (bclass->is_seekable)
1863     return bclass->is_seekable (src);
1864   else
1865     return FALSE;
1866 }
1867
1868 static void
1869 gst_base_src_update_qos (GstBaseSrc * src,
1870     gdouble proportion, GstClockTimeDiff diff, GstClockTime timestamp)
1871 {
1872   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, src,
1873       "qos: proportion: %lf, diff %" G_GINT64_FORMAT ", timestamp %"
1874       GST_TIME_FORMAT, proportion, diff, GST_TIME_ARGS (timestamp));
1875
1876   GST_OBJECT_LOCK (src);
1877   src->priv->proportion = proportion;
1878   src->priv->earliest_time = timestamp + diff;
1879   GST_OBJECT_UNLOCK (src);
1880 }
1881
1882
1883 static gboolean
1884 gst_base_src_default_event (GstBaseSrc * src, GstEvent * event)
1885 {
1886   gboolean result;
1887
1888   GST_DEBUG_OBJECT (src, "handle event %" GST_PTR_FORMAT, event);
1889
1890   switch (GST_EVENT_TYPE (event)) {
1891     case GST_EVENT_SEEK:
1892       /* is normally called when in push mode */
1893       if (!gst_base_src_seekable (src))
1894         goto not_seekable;
1895
1896       result = gst_base_src_perform_seek (src, event, TRUE);
1897       break;
1898     case GST_EVENT_FLUSH_START:
1899       /* cancel any blocking getrange, is normally called
1900        * when in pull mode. */
1901       result = gst_base_src_set_flushing (src, TRUE, FALSE, NULL);
1902       break;
1903     case GST_EVENT_FLUSH_STOP:
1904       result = gst_base_src_set_flushing (src, FALSE, TRUE, NULL);
1905       break;
1906     case GST_EVENT_QOS:
1907     {
1908       gdouble proportion;
1909       GstClockTimeDiff diff;
1910       GstClockTime timestamp;
1911
1912       gst_event_parse_qos (event, NULL, &proportion, &diff, &timestamp);
1913       gst_base_src_update_qos (src, proportion, diff, timestamp);
1914       result = TRUE;
1915       break;
1916     }
1917     case GST_EVENT_RECONFIGURE:
1918       result = TRUE;
1919       break;
1920     case GST_EVENT_LATENCY:
1921       result = TRUE;
1922       break;
1923     default:
1924       result = FALSE;
1925       break;
1926   }
1927   return result;
1928
1929   /* ERRORS */
1930 not_seekable:
1931   {
1932     GST_DEBUG_OBJECT (src, "is not seekable");
1933     return FALSE;
1934   }
1935 }
1936
1937 static gboolean
1938 gst_base_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
1939 {
1940   GstBaseSrc *src;
1941   GstBaseSrcClass *bclass;
1942   gboolean result = FALSE;
1943
1944   src = GST_BASE_SRC (parent);
1945   bclass = GST_BASE_SRC_GET_CLASS (src);
1946
1947   if (bclass->event) {
1948     if (!(result = bclass->event (src, event)))
1949       goto subclass_failed;
1950   }
1951
1952 done:
1953   gst_event_unref (event);
1954
1955   return result;
1956
1957   /* ERRORS */
1958 subclass_failed:
1959   {
1960     GST_DEBUG_OBJECT (src, "subclass refused event");
1961     goto done;
1962   }
1963 }
1964
1965 static void
1966 gst_base_src_set_property (GObject * object, guint prop_id,
1967     const GValue * value, GParamSpec * pspec)
1968 {
1969   GstBaseSrc *src;
1970
1971   src = GST_BASE_SRC (object);
1972
1973   switch (prop_id) {
1974     case PROP_BLOCKSIZE:
1975       gst_base_src_set_blocksize (src, g_value_get_uint (value));
1976       break;
1977     case PROP_NUM_BUFFERS:
1978       src->num_buffers = g_value_get_int (value);
1979       break;
1980     case PROP_TYPEFIND:
1981       src->typefind = g_value_get_boolean (value);
1982       break;
1983     case PROP_DO_TIMESTAMP:
1984       gst_base_src_set_do_timestamp (src, g_value_get_boolean (value));
1985       break;
1986     default:
1987       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1988       break;
1989   }
1990 }
1991
1992 static void
1993 gst_base_src_get_property (GObject * object, guint prop_id, GValue * value,
1994     GParamSpec * pspec)
1995 {
1996   GstBaseSrc *src;
1997
1998   src = GST_BASE_SRC (object);
1999
2000   switch (prop_id) {
2001     case PROP_BLOCKSIZE:
2002       g_value_set_uint (value, gst_base_src_get_blocksize (src));
2003       break;
2004     case PROP_NUM_BUFFERS:
2005       g_value_set_int (value, src->num_buffers);
2006       break;
2007     case PROP_TYPEFIND:
2008       g_value_set_boolean (value, src->typefind);
2009       break;
2010     case PROP_DO_TIMESTAMP:
2011       g_value_set_boolean (value, gst_base_src_get_do_timestamp (src));
2012       break;
2013     default:
2014       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2015       break;
2016   }
2017 }
2018
2019 /* with STREAM_LOCK and LOCK */
2020 static GstClockReturn
2021 gst_base_src_wait (GstBaseSrc * basesrc, GstClock * clock, GstClockTime time)
2022 {
2023   GstClockReturn ret;
2024   GstClockID id;
2025
2026   id = gst_clock_new_single_shot_id (clock, time);
2027
2028   basesrc->clock_id = id;
2029   /* release the live lock while waiting */
2030   GST_LIVE_UNLOCK (basesrc);
2031
2032   ret = gst_clock_id_wait (id, NULL);
2033
2034   GST_LIVE_LOCK (basesrc);
2035   gst_clock_id_unref (id);
2036   basesrc->clock_id = NULL;
2037
2038   return ret;
2039 }
2040
2041 /* perform synchronisation on a buffer.
2042  * with STREAM_LOCK.
2043  */
2044 static GstClockReturn
2045 gst_base_src_do_sync (GstBaseSrc * basesrc, GstBuffer * buffer)
2046 {
2047   GstClockReturn result;
2048   GstClockTime start, end;
2049   GstBaseSrcClass *bclass;
2050   GstClockTime base_time;
2051   GstClock *clock;
2052   GstClockTime now = GST_CLOCK_TIME_NONE, pts, dts, timestamp;
2053   gboolean do_timestamp, first, pseudo_live, is_live;
2054
2055   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2056
2057   start = end = -1;
2058   if (bclass->get_times)
2059     bclass->get_times (basesrc, buffer, &start, &end);
2060
2061   /* get buffer timestamp */
2062   dts = GST_BUFFER_DTS (buffer);
2063   pts = GST_BUFFER_PTS (buffer);
2064
2065   if (GST_CLOCK_TIME_IS_VALID (dts))
2066     timestamp = dts;
2067   else
2068     timestamp = pts;
2069
2070   /* grab the lock to prepare for clocking and calculate the startup
2071    * latency. */
2072   GST_OBJECT_LOCK (basesrc);
2073
2074   is_live = basesrc->is_live;
2075   /* if we are asked to sync against the clock we are a pseudo live element */
2076   pseudo_live = (start != -1 && is_live);
2077   /* check for the first buffer */
2078   first = (basesrc->priv->latency == -1);
2079
2080   if (timestamp != -1 && pseudo_live) {
2081     GstClockTime latency;
2082
2083     /* we have a timestamp and a sync time, latency is the diff */
2084     if (timestamp <= start)
2085       latency = start - timestamp;
2086     else
2087       latency = 0;
2088
2089     if (first) {
2090       GST_DEBUG_OBJECT (basesrc, "pseudo_live with latency %" GST_TIME_FORMAT,
2091           GST_TIME_ARGS (latency));
2092       /* first time we calculate latency, just configure */
2093       basesrc->priv->latency = latency;
2094     } else {
2095       if (basesrc->priv->latency != latency) {
2096         /* we have a new latency, FIXME post latency message */
2097         basesrc->priv->latency = latency;
2098         GST_DEBUG_OBJECT (basesrc, "latency changed to %" GST_TIME_FORMAT,
2099             GST_TIME_ARGS (latency));
2100       }
2101     }
2102   } else if (first) {
2103     GST_DEBUG_OBJECT (basesrc, "no latency needed, live %d, sync %d",
2104         is_live, start != -1);
2105     basesrc->priv->latency = 0;
2106   }
2107
2108   /* get clock, if no clock, we can't sync or do timestamps */
2109   if ((clock = GST_ELEMENT_CLOCK (basesrc)) == NULL)
2110     goto no_clock;
2111   else
2112     gst_object_ref (clock);
2113
2114   base_time = GST_ELEMENT_CAST (basesrc)->base_time;
2115
2116   do_timestamp = basesrc->priv->do_timestamp;
2117   GST_OBJECT_UNLOCK (basesrc);
2118
2119   /* first buffer, calculate the timestamp offset */
2120   if (first) {
2121     GstClockTime running_time;
2122
2123     now = gst_clock_get_time (clock);
2124     running_time = now - base_time;
2125
2126     GST_LOG_OBJECT (basesrc,
2127         "startup PTS: %" GST_TIME_FORMAT ", DTS %" GST_TIME_FORMAT
2128         ", running_time %" GST_TIME_FORMAT, GST_TIME_ARGS (pts),
2129         GST_TIME_ARGS (dts), GST_TIME_ARGS (running_time));
2130
2131     if (pseudo_live && timestamp != -1) {
2132       /* live source and we need to sync, add startup latency to all timestamps
2133        * to get the real running_time. Live sources should always timestamp
2134        * according to the current running time. */
2135       basesrc->priv->ts_offset = GST_CLOCK_DIFF (timestamp, running_time);
2136
2137       GST_LOG_OBJECT (basesrc, "live with sync, ts_offset %" GST_TIME_FORMAT,
2138           GST_TIME_ARGS (basesrc->priv->ts_offset));
2139     } else {
2140       basesrc->priv->ts_offset = 0;
2141       GST_LOG_OBJECT (basesrc, "no timestamp offset needed");
2142     }
2143
2144     if (!GST_CLOCK_TIME_IS_VALID (dts)) {
2145       if (do_timestamp) {
2146         dts = running_time;
2147       } else {
2148         dts = 0;
2149       }
2150       GST_BUFFER_DTS (buffer) = dts;
2151
2152       GST_LOG_OBJECT (basesrc, "created DTS %" GST_TIME_FORMAT,
2153           GST_TIME_ARGS (dts));
2154     }
2155   } else {
2156     /* not the first buffer, the timestamp is the diff between the clock and
2157      * base_time */
2158     if (do_timestamp && !GST_CLOCK_TIME_IS_VALID (dts)) {
2159       now = gst_clock_get_time (clock);
2160
2161       dts = now - base_time;
2162       GST_BUFFER_DTS (buffer) = dts;
2163
2164       GST_LOG_OBJECT (basesrc, "created DTS %" GST_TIME_FORMAT,
2165           GST_TIME_ARGS (dts));
2166     }
2167   }
2168   if (!GST_CLOCK_TIME_IS_VALID (pts)) {
2169     if (!GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DELTA_UNIT))
2170       pts = dts;
2171
2172     GST_BUFFER_PTS (buffer) = dts;
2173
2174     GST_LOG_OBJECT (basesrc, "created PTS %" GST_TIME_FORMAT,
2175         GST_TIME_ARGS (pts));
2176   }
2177
2178   /* if we don't have a buffer timestamp, we don't sync */
2179   if (!GST_CLOCK_TIME_IS_VALID (start))
2180     goto no_sync;
2181
2182   if (is_live) {
2183     /* for pseudo live sources, add our ts_offset to the timestamp */
2184     if (GST_CLOCK_TIME_IS_VALID (pts))
2185       GST_BUFFER_PTS (buffer) += basesrc->priv->ts_offset;
2186     if (GST_CLOCK_TIME_IS_VALID (dts))
2187       GST_BUFFER_DTS (buffer) += basesrc->priv->ts_offset;
2188     start += basesrc->priv->ts_offset;
2189   }
2190
2191   GST_LOG_OBJECT (basesrc,
2192       "waiting for clock, base time %" GST_TIME_FORMAT
2193       ", stream_start %" GST_TIME_FORMAT,
2194       GST_TIME_ARGS (base_time), GST_TIME_ARGS (start));
2195
2196   result = gst_base_src_wait (basesrc, clock, start + base_time);
2197
2198   gst_object_unref (clock);
2199
2200   GST_LOG_OBJECT (basesrc, "clock entry done: %d", result);
2201
2202   return result;
2203
2204   /* special cases */
2205 no_clock:
2206   {
2207     GST_DEBUG_OBJECT (basesrc, "we have no clock");
2208     GST_OBJECT_UNLOCK (basesrc);
2209     return GST_CLOCK_OK;
2210   }
2211 no_sync:
2212   {
2213     GST_DEBUG_OBJECT (basesrc, "no sync needed");
2214     gst_object_unref (clock);
2215     return GST_CLOCK_OK;
2216   }
2217 }
2218
2219 /* Called with STREAM_LOCK and LIVE_LOCK */
2220 static gboolean
2221 gst_base_src_update_length (GstBaseSrc * src, guint64 offset, guint * length)
2222 {
2223   guint64 size, maxsize;
2224   GstBaseSrcClass *bclass;
2225   GstFormat format;
2226   gint64 stop;
2227   gboolean dynamic;
2228
2229   bclass = GST_BASE_SRC_GET_CLASS (src);
2230
2231   format = src->segment.format;
2232   stop = src->segment.stop;
2233   /* get total file size */
2234   size = src->segment.duration;
2235
2236   /* only operate if we are working with bytes */
2237   if (format != GST_FORMAT_BYTES)
2238     return TRUE;
2239
2240   /* the max amount of bytes to read is the total size or
2241    * up to the segment.stop if present. */
2242   if (stop != -1)
2243     maxsize = MIN (size, stop);
2244   else
2245     maxsize = size;
2246
2247   GST_DEBUG_OBJECT (src,
2248       "reading offset %" G_GUINT64_FORMAT ", length %u, size %" G_GINT64_FORMAT
2249       ", segment.stop %" G_GINT64_FORMAT ", maxsize %" G_GINT64_FORMAT, offset,
2250       *length, size, stop, maxsize);
2251
2252   dynamic = g_atomic_int_get (&src->priv->dynamic_size);
2253   GST_DEBUG_OBJECT (src, "dynamic size: %d", dynamic);
2254
2255   /* check size if we have one */
2256   if (maxsize != -1) {
2257     /* if we run past the end, check if the file became bigger and
2258      * retry. */
2259     if (G_UNLIKELY (offset + *length >= maxsize || dynamic)) {
2260       /* see if length of the file changed */
2261       if (bclass->get_size)
2262         if (!bclass->get_size (src, &size))
2263           size = -1;
2264
2265       /* make sure we don't exceed the configured segment stop
2266        * if it was set */
2267       if (stop != -1)
2268         maxsize = MIN (size, stop);
2269       else
2270         maxsize = size;
2271
2272       /* if we are at or past the end, EOS */
2273       if (G_UNLIKELY (offset >= maxsize))
2274         goto unexpected_length;
2275
2276       /* else we can clip to the end */
2277       if (G_UNLIKELY (offset + *length >= maxsize))
2278         *length = maxsize - offset;
2279
2280     }
2281   }
2282
2283   /* keep track of current duration.
2284    * segment is in bytes, we checked that above. */
2285   GST_OBJECT_LOCK (src);
2286   src->segment.duration = size;
2287   GST_OBJECT_UNLOCK (src);
2288
2289   return TRUE;
2290
2291   /* ERRORS */
2292 unexpected_length:
2293   {
2294     return FALSE;
2295   }
2296 }
2297
2298 /* must be called with LIVE_LOCK */
2299 static GstFlowReturn
2300 gst_base_src_get_range (GstBaseSrc * src, guint64 offset, guint length,
2301     GstBuffer ** buf)
2302 {
2303   GstFlowReturn ret;
2304   GstBaseSrcClass *bclass;
2305   GstClockReturn status;
2306   GstBuffer *res_buf;
2307   GstBuffer *in_buf;
2308
2309   bclass = GST_BASE_SRC_GET_CLASS (src);
2310
2311 again:
2312   if (src->is_live) {
2313     if (G_UNLIKELY (!src->live_running)) {
2314       ret = gst_base_src_wait_playing (src);
2315       if (ret != GST_FLOW_OK)
2316         goto stopped;
2317     }
2318   }
2319
2320   if (G_UNLIKELY (!GST_BASE_SRC_IS_STARTED (src)
2321           && !GST_BASE_SRC_IS_STARTING (src)))
2322     goto not_started;
2323
2324   if (G_UNLIKELY (!bclass->create))
2325     goto no_function;
2326
2327   if (G_UNLIKELY (!gst_base_src_update_length (src, offset, &length)))
2328     goto unexpected_length;
2329
2330   /* track position */
2331   GST_OBJECT_LOCK (src);
2332   if (src->segment.format == GST_FORMAT_BYTES)
2333     src->segment.position = offset;
2334   GST_OBJECT_UNLOCK (src);
2335
2336   /* normally we don't count buffers */
2337   if (G_UNLIKELY (src->num_buffers_left >= 0)) {
2338     if (src->num_buffers_left == 0)
2339       goto reached_num_buffers;
2340     else
2341       src->num_buffers_left--;
2342   }
2343
2344   /* don't enter the create function if a pending EOS event was set. For the
2345    * logic of the pending_eos, check the event function of this class. */
2346   if (G_UNLIKELY (g_atomic_int_get (&src->priv->pending_eos)))
2347     goto eos;
2348
2349   GST_DEBUG_OBJECT (src,
2350       "calling create offset %" G_GUINT64_FORMAT " length %u, time %"
2351       G_GINT64_FORMAT, offset, length, src->segment.time);
2352
2353   res_buf = in_buf = *buf;
2354
2355   ret = bclass->create (src, offset, length, &res_buf);
2356
2357   /* The create function could be unlocked because we have a pending EOS. It's
2358    * possible that we have a valid buffer from create that we need to
2359    * discard when the create function returned _OK. */
2360   if (G_UNLIKELY (g_atomic_int_get (&src->priv->pending_eos))) {
2361     if (ret == GST_FLOW_OK) {
2362       if (*buf == NULL)
2363         gst_buffer_unref (res_buf);
2364     }
2365     goto eos;
2366   }
2367
2368   if (G_UNLIKELY (ret != GST_FLOW_OK))
2369     goto not_ok;
2370
2371   /* fallback in case the create function didn't fill a provided buffer */
2372   if (in_buf != NULL && res_buf != in_buf) {
2373     GstMapInfo info;
2374     gsize copied_size;
2375
2376     GST_CAT_DEBUG_OBJECT (GST_CAT_PERFORMANCE, src, "create function didn't "
2377         "fill the provided buffer, copying");
2378
2379     gst_buffer_map (in_buf, &info, GST_MAP_WRITE);
2380     copied_size = gst_buffer_extract (res_buf, 0, info.data, info.size);
2381     gst_buffer_unmap (in_buf, &info);
2382     gst_buffer_set_size (in_buf, copied_size);
2383
2384     gst_buffer_copy_into (in_buf, res_buf, GST_BUFFER_COPY_METADATA, 0, -1);
2385
2386     gst_buffer_unref (res_buf);
2387     res_buf = in_buf;
2388   }
2389
2390   /* no timestamp set and we are at offset 0, we can timestamp with 0 */
2391   if (offset == 0 && src->segment.time == 0
2392       && GST_BUFFER_DTS (res_buf) == -1 && !src->is_live) {
2393     GST_DEBUG_OBJECT (src, "setting first timestamp to 0");
2394     res_buf = gst_buffer_make_writable (res_buf);
2395     GST_BUFFER_DTS (res_buf) = 0;
2396   }
2397
2398   /* now sync before pushing the buffer */
2399   status = gst_base_src_do_sync (src, res_buf);
2400
2401   /* waiting for the clock could have made us flushing */
2402   if (G_UNLIKELY (src->priv->flushing))
2403     goto flushing;
2404
2405   switch (status) {
2406     case GST_CLOCK_EARLY:
2407       /* the buffer is too late. We currently don't drop the buffer. */
2408       GST_DEBUG_OBJECT (src, "buffer too late!, returning anyway");
2409       break;
2410     case GST_CLOCK_OK:
2411       /* buffer synchronised properly */
2412       GST_DEBUG_OBJECT (src, "buffer ok");
2413       break;
2414     case GST_CLOCK_UNSCHEDULED:
2415       /* this case is triggered when we were waiting for the clock and
2416        * it got unlocked because we did a state change. In any case, get rid of
2417        * the buffer. */
2418       if (*buf == NULL)
2419         gst_buffer_unref (res_buf);
2420
2421       if (!src->live_running) {
2422         /* We return FLUSHING when we are not running to stop the dataflow also
2423          * get rid of the produced buffer. */
2424         GST_DEBUG_OBJECT (src,
2425             "clock was unscheduled (%d), returning FLUSHING", status);
2426         ret = GST_FLOW_FLUSHING;
2427       } else {
2428         /* If we are running when this happens, we quickly switched between
2429          * pause and playing. We try to produce a new buffer */
2430         GST_DEBUG_OBJECT (src,
2431             "clock was unscheduled (%d), but we are running", status);
2432         goto again;
2433       }
2434       break;
2435     default:
2436       /* all other result values are unexpected and errors */
2437       GST_ELEMENT_ERROR (src, CORE, CLOCK,
2438           (_("Internal clock error.")),
2439           ("clock returned unexpected return value %d", status));
2440       if (*buf == NULL)
2441         gst_buffer_unref (res_buf);
2442       ret = GST_FLOW_ERROR;
2443       break;
2444   }
2445   if (G_LIKELY (ret == GST_FLOW_OK))
2446     *buf = res_buf;
2447
2448   return ret;
2449
2450   /* ERROR */
2451 stopped:
2452   {
2453     GST_DEBUG_OBJECT (src, "wait_playing returned %d (%s)", ret,
2454         gst_flow_get_name (ret));
2455     return ret;
2456   }
2457 not_ok:
2458   {
2459     GST_DEBUG_OBJECT (src, "create returned %d (%s)", ret,
2460         gst_flow_get_name (ret));
2461     return ret;
2462   }
2463 not_started:
2464   {
2465     GST_DEBUG_OBJECT (src, "getrange but not started");
2466     return GST_FLOW_FLUSHING;
2467   }
2468 no_function:
2469   {
2470     GST_DEBUG_OBJECT (src, "no create function");
2471     return GST_FLOW_NOT_SUPPORTED;
2472   }
2473 unexpected_length:
2474   {
2475     GST_DEBUG_OBJECT (src, "unexpected length %u (offset=%" G_GUINT64_FORMAT
2476         ", size=%" G_GINT64_FORMAT ")", length, offset, src->segment.duration);
2477     return GST_FLOW_EOS;
2478   }
2479 reached_num_buffers:
2480   {
2481     GST_DEBUG_OBJECT (src, "sent all buffers");
2482     return GST_FLOW_EOS;
2483   }
2484 flushing:
2485   {
2486     GST_DEBUG_OBJECT (src, "we are flushing");
2487     if (*buf == NULL)
2488       gst_buffer_unref (res_buf);
2489     return GST_FLOW_FLUSHING;
2490   }
2491 eos:
2492   {
2493     GST_DEBUG_OBJECT (src, "we are EOS");
2494     return GST_FLOW_EOS;
2495   }
2496 }
2497
2498 static GstFlowReturn
2499 gst_base_src_getrange (GstPad * pad, GstObject * parent, guint64 offset,
2500     guint length, GstBuffer ** buf)
2501 {
2502   GstBaseSrc *src;
2503   GstFlowReturn res;
2504
2505   src = GST_BASE_SRC_CAST (parent);
2506
2507   GST_LIVE_LOCK (src);
2508   if (G_UNLIKELY (src->priv->flushing))
2509     goto flushing;
2510
2511   res = gst_base_src_get_range (src, offset, length, buf);
2512
2513 done:
2514   GST_LIVE_UNLOCK (src);
2515
2516   return res;
2517
2518   /* ERRORS */
2519 flushing:
2520   {
2521     GST_DEBUG_OBJECT (src, "we are flushing");
2522     res = GST_FLOW_FLUSHING;
2523     goto done;
2524   }
2525 }
2526
2527 static gboolean
2528 gst_base_src_is_random_access (GstBaseSrc * src)
2529 {
2530   /* we need to start the basesrc to check random access */
2531   if (!GST_BASE_SRC_IS_STARTED (src)) {
2532     GST_LOG_OBJECT (src, "doing start/stop to check get_range support");
2533     if (G_LIKELY (gst_base_src_start (src))) {
2534       if (gst_base_src_start_wait (src) != GST_FLOW_OK)
2535         goto start_failed;
2536       gst_base_src_stop (src);
2537     }
2538   }
2539
2540   return src->random_access;
2541
2542   /* ERRORS */
2543 start_failed:
2544   {
2545     GST_DEBUG_OBJECT (src, "failed to start");
2546     return FALSE;
2547   }
2548 }
2549
2550 static void
2551 gst_base_src_loop (GstPad * pad)
2552 {
2553   GstBaseSrc *src;
2554   GstBuffer *buf = NULL;
2555   GstFlowReturn ret;
2556   gint64 position;
2557   gboolean eos;
2558   guint blocksize;
2559   GList *pending_events = NULL, *tmp;
2560
2561   eos = FALSE;
2562
2563   src = GST_BASE_SRC (GST_OBJECT_PARENT (pad));
2564
2565   gst_base_src_send_stream_start (src);
2566
2567   /* check if we need to renegotiate */
2568   if (gst_pad_check_reconfigure (pad)) {
2569     if (!gst_base_src_negotiate (src))
2570       goto not_negotiated;
2571   }
2572
2573   GST_LIVE_LOCK (src);
2574
2575   if (G_UNLIKELY (src->priv->flushing))
2576     goto flushing;
2577
2578   blocksize = src->blocksize;
2579
2580   /* if we operate in bytes, we can calculate an offset */
2581   if (src->segment.format == GST_FORMAT_BYTES) {
2582     position = src->segment.position;
2583     /* for negative rates, start with subtracting the blocksize */
2584     if (src->segment.rate < 0.0) {
2585       /* we cannot go below segment.start */
2586       if (position > src->segment.start + blocksize)
2587         position -= blocksize;
2588       else {
2589         /* last block, remainder up to segment.start */
2590         blocksize = position - src->segment.start;
2591         position = src->segment.start;
2592       }
2593     }
2594   } else
2595     position = -1;
2596
2597   GST_LOG_OBJECT (src, "next_ts %" GST_TIME_FORMAT " size %u",
2598       GST_TIME_ARGS (position), blocksize);
2599
2600   ret = gst_base_src_get_range (src, position, blocksize, &buf);
2601   if (G_UNLIKELY (ret != GST_FLOW_OK)) {
2602     GST_INFO_OBJECT (src, "pausing after gst_base_src_get_range() = %s",
2603         gst_flow_get_name (ret));
2604     GST_LIVE_UNLOCK (src);
2605     goto pause;
2606   }
2607   /* this should not happen */
2608   if (G_UNLIKELY (buf == NULL))
2609     goto null_buffer;
2610
2611   /* push events to close/start our segment before we push the buffer. */
2612   if (G_UNLIKELY (src->priv->segment_pending)) {
2613     gst_pad_push_event (pad, gst_event_new_segment (&src->segment));
2614     src->priv->segment_pending = FALSE;
2615   }
2616
2617   if (g_atomic_int_get (&src->priv->have_events)) {
2618     GST_OBJECT_LOCK (src);
2619     /* take the events */
2620     pending_events = src->priv->pending_events;
2621     src->priv->pending_events = NULL;
2622     g_atomic_int_set (&src->priv->have_events, FALSE);
2623     GST_OBJECT_UNLOCK (src);
2624   }
2625
2626   /* Push out pending events if any */
2627   if (G_UNLIKELY (pending_events != NULL)) {
2628     for (tmp = pending_events; tmp; tmp = g_list_next (tmp)) {
2629       GstEvent *ev = (GstEvent *) tmp->data;
2630       gst_pad_push_event (pad, ev);
2631     }
2632     g_list_free (pending_events);
2633   }
2634
2635   /* figure out the new position */
2636   switch (src->segment.format) {
2637     case GST_FORMAT_BYTES:
2638     {
2639       guint bufsize = gst_buffer_get_size (buf);
2640
2641       /* we subtracted above for negative rates */
2642       if (src->segment.rate >= 0.0)
2643         position += bufsize;
2644       break;
2645     }
2646     case GST_FORMAT_TIME:
2647     {
2648       GstClockTime start, duration;
2649
2650       start = GST_BUFFER_TIMESTAMP (buf);
2651       duration = GST_BUFFER_DURATION (buf);
2652
2653       if (GST_CLOCK_TIME_IS_VALID (start))
2654         position = start;
2655       else
2656         position = src->segment.position;
2657
2658       if (GST_CLOCK_TIME_IS_VALID (duration)) {
2659         if (src->segment.rate >= 0.0)
2660           position += duration;
2661         else if (position > duration)
2662           position -= duration;
2663         else
2664           position = 0;
2665       }
2666       break;
2667     }
2668     case GST_FORMAT_DEFAULT:
2669       if (src->segment.rate >= 0.0)
2670         position = GST_BUFFER_OFFSET_END (buf);
2671       else
2672         position = GST_BUFFER_OFFSET (buf);
2673       break;
2674     default:
2675       position = -1;
2676       break;
2677   }
2678   if (position != -1) {
2679     if (src->segment.rate >= 0.0) {
2680       /* positive rate, check if we reached the stop */
2681       if (src->segment.stop != -1) {
2682         if (position >= src->segment.stop) {
2683           eos = TRUE;
2684           position = src->segment.stop;
2685         }
2686       }
2687     } else {
2688       /* negative rate, check if we reached the start. start is always set to
2689        * something different from -1 */
2690       if (position <= src->segment.start) {
2691         eos = TRUE;
2692         position = src->segment.start;
2693       }
2694       /* when going reverse, all buffers are DISCONT */
2695       src->priv->discont = TRUE;
2696     }
2697     GST_OBJECT_LOCK (src);
2698     src->segment.position = position;
2699     GST_OBJECT_UNLOCK (src);
2700   }
2701
2702   if (G_UNLIKELY (src->priv->discont)) {
2703     GST_INFO_OBJECT (src, "marking pending DISCONT");
2704     buf = gst_buffer_make_writable (buf);
2705     GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_DISCONT);
2706     src->priv->discont = FALSE;
2707   }
2708   GST_LIVE_UNLOCK (src);
2709
2710   ret = gst_pad_push (pad, buf);
2711   if (G_UNLIKELY (ret != GST_FLOW_OK)) {
2712     if (ret == GST_FLOW_NOT_NEGOTIATED) {
2713       goto not_negotiated;
2714     }
2715     GST_INFO_OBJECT (src, "pausing after gst_pad_push() = %s",
2716         gst_flow_get_name (ret));
2717     goto pause;
2718   }
2719
2720   if (G_UNLIKELY (eos)) {
2721     GST_INFO_OBJECT (src, "pausing after end of segment");
2722     ret = GST_FLOW_EOS;
2723     goto pause;
2724   }
2725
2726 done:
2727   return;
2728
2729   /* special cases */
2730 not_negotiated:
2731   {
2732     if (gst_pad_needs_reconfigure (pad)) {
2733       GST_DEBUG_OBJECT (src, "Retrying to renegotiate");
2734       return;
2735     }
2736     GST_DEBUG_OBJECT (src, "Failed to renegotiate");
2737     ret = GST_FLOW_NOT_NEGOTIATED;
2738     goto pause;
2739   }
2740 flushing:
2741   {
2742     GST_DEBUG_OBJECT (src, "we are flushing");
2743     GST_LIVE_UNLOCK (src);
2744     ret = GST_FLOW_FLUSHING;
2745     goto pause;
2746   }
2747 pause:
2748   {
2749     const gchar *reason = gst_flow_get_name (ret);
2750     GstEvent *event;
2751
2752     GST_DEBUG_OBJECT (src, "pausing task, reason %s", reason);
2753     src->running = FALSE;
2754     gst_pad_pause_task (pad);
2755     if (ret == GST_FLOW_EOS) {
2756       gboolean flag_segment;
2757       GstFormat format;
2758       gint64 position;
2759
2760       /* perform EOS logic */
2761       flag_segment = (src->segment.flags & GST_SEGMENT_FLAG_SEGMENT) != 0;
2762       format = src->segment.format;
2763       position = src->segment.position;
2764
2765       if (flag_segment) {
2766         GstMessage *message;
2767
2768         message = gst_message_new_segment_done (GST_OBJECT_CAST (src),
2769             format, position);
2770         gst_message_set_seqnum (message, src->priv->seqnum);
2771         gst_element_post_message (GST_ELEMENT_CAST (src), message);
2772         event = gst_event_new_segment_done (format, position);
2773         gst_event_set_seqnum (event, src->priv->seqnum);
2774         gst_pad_push_event (pad, event);
2775       } else {
2776         event = gst_event_new_eos ();
2777         gst_event_set_seqnum (event, src->priv->seqnum);
2778         gst_pad_push_event (pad, event);
2779       }
2780     } else if (ret == GST_FLOW_NOT_LINKED || ret <= GST_FLOW_EOS) {
2781       event = gst_event_new_eos ();
2782       gst_event_set_seqnum (event, src->priv->seqnum);
2783       /* for fatal errors we post an error message, post the error
2784        * first so the app knows about the error first.
2785        * Also don't do this for FLUSHING because it happens
2786        * due to flushing and posting an error message because of
2787        * that is the wrong thing to do, e.g. when we're doing
2788        * a flushing seek. */
2789       GST_ELEMENT_ERROR (src, STREAM, FAILED,
2790           (_("Internal data flow error.")),
2791           ("streaming task paused, reason %s (%d)", reason, ret));
2792       gst_pad_push_event (pad, event);
2793     }
2794     goto done;
2795   }
2796 null_buffer:
2797   {
2798     GST_ELEMENT_ERROR (src, STREAM, FAILED,
2799         (_("Internal data flow error.")), ("element returned NULL buffer"));
2800     GST_LIVE_UNLOCK (src);
2801     goto done;
2802   }
2803 }
2804
2805 static gboolean
2806 gst_base_src_set_allocation (GstBaseSrc * basesrc, GstBufferPool * pool,
2807     GstAllocator * allocator, GstAllocationParams * params)
2808 {
2809   GstAllocator *oldalloc;
2810   GstBufferPool *oldpool;
2811   GstBaseSrcPrivate *priv = basesrc->priv;
2812
2813   if (pool) {
2814     GST_DEBUG_OBJECT (basesrc, "activate pool");
2815     if (!gst_buffer_pool_set_active (pool, TRUE))
2816       goto activate_failed;
2817   }
2818
2819   GST_OBJECT_LOCK (basesrc);
2820   oldpool = priv->pool;
2821   priv->pool = pool;
2822
2823   oldalloc = priv->allocator;
2824   priv->allocator = allocator;
2825
2826   if (params)
2827     priv->params = *params;
2828   else
2829     gst_allocation_params_init (&priv->params);
2830   GST_OBJECT_UNLOCK (basesrc);
2831
2832   if (oldpool) {
2833     /* only deactivate if the pool is not the one we're using */
2834     if (oldpool != pool) {
2835       GST_DEBUG_OBJECT (basesrc, "deactivate old pool");
2836       gst_buffer_pool_set_active (oldpool, FALSE);
2837     }
2838     gst_object_unref (oldpool);
2839   }
2840   if (oldalloc) {
2841     gst_object_unref (oldalloc);
2842   }
2843   return TRUE;
2844
2845   /* ERRORS */
2846 activate_failed:
2847   {
2848     GST_ERROR_OBJECT (basesrc, "failed to activate bufferpool.");
2849     return FALSE;
2850   }
2851 }
2852
2853 static gboolean
2854 gst_base_src_activate_pool (GstBaseSrc * basesrc, gboolean active)
2855 {
2856   GstBaseSrcPrivate *priv = basesrc->priv;
2857   GstBufferPool *pool;
2858   gboolean res = TRUE;
2859
2860   GST_OBJECT_LOCK (basesrc);
2861   if ((pool = priv->pool))
2862     pool = gst_object_ref (pool);
2863   GST_OBJECT_UNLOCK (basesrc);
2864
2865   if (pool) {
2866     res = gst_buffer_pool_set_active (pool, active);
2867     gst_object_unref (pool);
2868   }
2869   return res;
2870 }
2871
2872
2873 static gboolean
2874 gst_base_src_decide_allocation_default (GstBaseSrc * basesrc, GstQuery * query)
2875 {
2876   GstCaps *outcaps;
2877   GstBufferPool *pool;
2878   guint size, min, max;
2879   GstAllocator *allocator;
2880   GstAllocationParams params;
2881   GstStructure *config;
2882   gboolean update_allocator;
2883
2884   gst_query_parse_allocation (query, &outcaps, NULL);
2885
2886   /* we got configuration from our peer or the decide_allocation method,
2887    * parse them */
2888   if (gst_query_get_n_allocation_params (query) > 0) {
2889     /* try the allocator */
2890     gst_query_parse_nth_allocation_param (query, 0, &allocator, &params);
2891     update_allocator = TRUE;
2892   } else {
2893     allocator = NULL;
2894     gst_allocation_params_init (&params);
2895     update_allocator = FALSE;
2896   }
2897
2898   if (gst_query_get_n_allocation_pools (query) > 0) {
2899     gst_query_parse_nth_allocation_pool (query, 0, &pool, &size, &min, &max);
2900
2901     if (pool == NULL) {
2902       /* no pool, we can make our own */
2903       GST_DEBUG_OBJECT (basesrc, "no pool, making new pool");
2904       pool = gst_buffer_pool_new ();
2905     }
2906   } else {
2907     pool = NULL;
2908     size = min = max = 0;
2909   }
2910
2911   /* now configure */
2912   if (pool) {
2913     config = gst_buffer_pool_get_config (pool);
2914     gst_buffer_pool_config_set_params (config, outcaps, size, min, max);
2915     gst_buffer_pool_config_set_allocator (config, allocator, &params);
2916     gst_buffer_pool_set_config (pool, config);
2917   }
2918
2919   if (update_allocator)
2920     gst_query_set_nth_allocation_param (query, 0, allocator, &params);
2921   else
2922     gst_query_add_allocation_param (query, allocator, &params);
2923   if (allocator)
2924     gst_object_unref (allocator);
2925
2926   if (pool) {
2927     gst_query_set_nth_allocation_pool (query, 0, pool, size, min, max);
2928     gst_object_unref (pool);
2929   }
2930
2931   return TRUE;
2932 }
2933
2934 static gboolean
2935 gst_base_src_prepare_allocation (GstBaseSrc * basesrc, GstCaps * caps)
2936 {
2937   GstBaseSrcClass *bclass;
2938   gboolean result = TRUE;
2939   GstQuery *query;
2940   GstBufferPool *pool = NULL;
2941   GstAllocator *allocator = NULL;
2942   GstAllocationParams params;
2943
2944   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2945
2946   /* make query and let peer pad answer, we don't really care if it worked or
2947    * not, if it failed, the allocation query would contain defaults and the
2948    * subclass would then set better values if needed */
2949   query = gst_query_new_allocation (caps, TRUE);
2950   if (!gst_pad_peer_query (basesrc->srcpad, query)) {
2951     /* not a problem, just debug a little */
2952     GST_DEBUG_OBJECT (basesrc, "peer ALLOCATION query failed");
2953   }
2954
2955   g_assert (bclass->decide_allocation != NULL);
2956   result = bclass->decide_allocation (basesrc, query);
2957
2958   GST_DEBUG_OBJECT (basesrc, "ALLOCATION (%d) params: %" GST_PTR_FORMAT, result,
2959       query);
2960
2961   if (!result)
2962     goto no_decide_allocation;
2963
2964   /* we got configuration from our peer or the decide_allocation method,
2965    * parse them */
2966   if (gst_query_get_n_allocation_params (query) > 0) {
2967     gst_query_parse_nth_allocation_param (query, 0, &allocator, &params);
2968   } else {
2969     allocator = NULL;
2970     gst_allocation_params_init (&params);
2971   }
2972
2973   if (gst_query_get_n_allocation_pools (query) > 0)
2974     gst_query_parse_nth_allocation_pool (query, 0, &pool, NULL, NULL, NULL);
2975
2976   result = gst_base_src_set_allocation (basesrc, pool, allocator, &params);
2977
2978   gst_query_unref (query);
2979
2980   return result;
2981
2982   /* Errors */
2983 no_decide_allocation:
2984   {
2985     GST_WARNING_OBJECT (basesrc, "Subclass failed to decide allocation");
2986     gst_query_unref (query);
2987
2988     return result;
2989   }
2990 }
2991
2992 /* default negotiation code.
2993  *
2994  * Take intersection between src and sink pads, take first
2995  * caps and fixate.
2996  */
2997 static gboolean
2998 gst_base_src_default_negotiate (GstBaseSrc * basesrc)
2999 {
3000   GstCaps *thiscaps;
3001   GstCaps *caps = NULL;
3002   GstCaps *peercaps = NULL;
3003   gboolean result = FALSE;
3004
3005   /* first see what is possible on our source pad */
3006   thiscaps = gst_pad_query_caps (GST_BASE_SRC_PAD (basesrc), NULL);
3007   GST_DEBUG_OBJECT (basesrc, "caps of src: %" GST_PTR_FORMAT, thiscaps);
3008   /* nothing or anything is allowed, we're done */
3009   if (thiscaps == NULL || gst_caps_is_any (thiscaps))
3010     goto no_nego_needed;
3011
3012   if (G_UNLIKELY (gst_caps_is_empty (thiscaps)))
3013     goto no_caps;
3014
3015   /* get the peer caps */
3016   peercaps = gst_pad_peer_query_caps (GST_BASE_SRC_PAD (basesrc), thiscaps);
3017   GST_DEBUG_OBJECT (basesrc, "caps of peer: %" GST_PTR_FORMAT, peercaps);
3018   if (peercaps) {
3019     /* The result is already a subset of our caps */
3020     caps = peercaps;
3021     gst_caps_unref (thiscaps);
3022   } else {
3023     /* no peer, work with our own caps then */
3024     caps = thiscaps;
3025   }
3026   if (caps && !gst_caps_is_empty (caps)) {
3027     /* now fixate */
3028     GST_DEBUG_OBJECT (basesrc, "have caps: %" GST_PTR_FORMAT, caps);
3029     if (gst_caps_is_any (caps)) {
3030       GST_DEBUG_OBJECT (basesrc, "any caps, we stop");
3031       /* hmm, still anything, so element can do anything and
3032        * nego is not needed */
3033       result = TRUE;
3034     } else {
3035       caps = gst_base_src_fixate (basesrc, caps);
3036       GST_DEBUG_OBJECT (basesrc, "fixated to: %" GST_PTR_FORMAT, caps);
3037       if (gst_caps_is_fixed (caps)) {
3038         /* yay, fixed caps, use those then, it's possible that the subclass does
3039          * not accept this caps after all and we have to fail. */
3040         result = gst_base_src_set_caps (basesrc, caps);
3041       }
3042     }
3043     gst_caps_unref (caps);
3044   } else {
3045     if (caps)
3046       gst_caps_unref (caps);
3047     GST_DEBUG_OBJECT (basesrc, "no common caps");
3048   }
3049   return result;
3050
3051 no_nego_needed:
3052   {
3053     GST_DEBUG_OBJECT (basesrc, "no negotiation needed");
3054     if (thiscaps)
3055       gst_caps_unref (thiscaps);
3056     return TRUE;
3057   }
3058 no_caps:
3059   {
3060     GST_ELEMENT_ERROR (basesrc, STREAM, FORMAT,
3061         ("No supported formats found"),
3062         ("This element did not produce valid caps"));
3063     if (thiscaps)
3064       gst_caps_unref (thiscaps);
3065     return TRUE;
3066   }
3067 }
3068
3069 static gboolean
3070 gst_base_src_negotiate (GstBaseSrc * basesrc)
3071 {
3072   GstBaseSrcClass *bclass;
3073   gboolean result;
3074
3075   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
3076
3077   GST_DEBUG_OBJECT (basesrc, "starting negotiation");
3078
3079   if (G_LIKELY (bclass->negotiate))
3080     result = bclass->negotiate (basesrc);
3081   else
3082     result = TRUE;
3083
3084   if (G_LIKELY (result)) {
3085     GstCaps *caps;
3086
3087     caps = gst_pad_get_current_caps (basesrc->srcpad);
3088
3089     result = gst_base_src_prepare_allocation (basesrc, caps);
3090
3091     if (caps)
3092       gst_caps_unref (caps);
3093   }
3094   return result;
3095 }
3096
3097 static gboolean
3098 gst_base_src_start (GstBaseSrc * basesrc)
3099 {
3100   GstBaseSrcClass *bclass;
3101   gboolean result;
3102
3103   GST_LIVE_LOCK (basesrc);
3104
3105   GST_OBJECT_LOCK (basesrc);
3106   if (GST_BASE_SRC_IS_STARTING (basesrc))
3107     goto was_starting;
3108   if (GST_BASE_SRC_IS_STARTED (basesrc))
3109     goto was_started;
3110
3111   basesrc->priv->start_result = GST_FLOW_FLUSHING;
3112   GST_OBJECT_FLAG_SET (basesrc, GST_BASE_SRC_FLAG_STARTING);
3113   gst_segment_init (&basesrc->segment, basesrc->segment.format);
3114   GST_OBJECT_UNLOCK (basesrc);
3115
3116   basesrc->num_buffers_left = basesrc->num_buffers;
3117   basesrc->running = FALSE;
3118   basesrc->priv->segment_pending = FALSE;
3119   GST_LIVE_UNLOCK (basesrc);
3120
3121   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
3122   if (bclass->start)
3123     result = bclass->start (basesrc);
3124   else
3125     result = TRUE;
3126
3127   if (!result)
3128     goto could_not_start;
3129
3130   if (!gst_base_src_is_async (basesrc)) {
3131     gst_base_src_start_complete (basesrc, GST_FLOW_OK);
3132     /* not really waiting here, we call this to get the result
3133      * from the start_complete call */
3134     result = gst_base_src_start_wait (basesrc) == GST_FLOW_OK;
3135   }
3136
3137   return result;
3138
3139   /* ERROR */
3140 was_starting:
3141   {
3142     GST_DEBUG_OBJECT (basesrc, "was starting");
3143     GST_OBJECT_UNLOCK (basesrc);
3144     GST_LIVE_UNLOCK (basesrc);
3145     return TRUE;
3146   }
3147 was_started:
3148   {
3149     GST_DEBUG_OBJECT (basesrc, "was started");
3150     GST_OBJECT_UNLOCK (basesrc);
3151     GST_LIVE_UNLOCK (basesrc);
3152     return TRUE;
3153   }
3154 could_not_start:
3155   {
3156     GST_DEBUG_OBJECT (basesrc, "could not start");
3157     /* subclass is supposed to post a message. We don't have to call _stop. */
3158     gst_base_src_start_complete (basesrc, GST_FLOW_ERROR);
3159     return FALSE;
3160   }
3161 }
3162
3163 /**
3164  * gst_base_src_start_complete:
3165  * @basesrc: base source instance
3166  * @ret: a #GstFlowReturn
3167  *
3168  * Complete an asynchronous start operation. When the subclass overrides the
3169  * start method, it should call gst_base_src_start_complete() when the start
3170  * operation completes either from the same thread or from an asynchronous
3171  * helper thread.
3172  */
3173 void
3174 gst_base_src_start_complete (GstBaseSrc * basesrc, GstFlowReturn ret)
3175 {
3176   gboolean have_size;
3177   guint64 size;
3178   gboolean seekable;
3179   GstFormat format;
3180   GstPadMode mode;
3181   GstEvent *event;
3182
3183   if (ret != GST_FLOW_OK)
3184     goto error;
3185
3186   GST_DEBUG_OBJECT (basesrc, "starting source");
3187   format = basesrc->segment.format;
3188
3189   /* figure out the size */
3190   have_size = FALSE;
3191   size = -1;
3192   if (format == GST_FORMAT_BYTES) {
3193     GstBaseSrcClass *bclass = GST_BASE_SRC_GET_CLASS (basesrc);
3194
3195     if (bclass->get_size) {
3196       if (!(have_size = bclass->get_size (basesrc, &size)))
3197         size = -1;
3198     }
3199     GST_DEBUG_OBJECT (basesrc, "setting size %" G_GUINT64_FORMAT, size);
3200     /* only update the size when operating in bytes, subclass is supposed
3201      * to set duration in the start method for other formats */
3202     GST_OBJECT_LOCK (basesrc);
3203     basesrc->segment.duration = size;
3204     GST_OBJECT_UNLOCK (basesrc);
3205   }
3206
3207   GST_DEBUG_OBJECT (basesrc,
3208       "format: %s, have size: %d, size: %" G_GUINT64_FORMAT ", duration: %"
3209       G_GINT64_FORMAT, gst_format_get_name (format), have_size, size,
3210       basesrc->segment.duration);
3211
3212   seekable = gst_base_src_seekable (basesrc);
3213   GST_DEBUG_OBJECT (basesrc, "is seekable: %d", seekable);
3214
3215   /* update for random access flag */
3216   basesrc->random_access = seekable && format == GST_FORMAT_BYTES;
3217
3218   GST_DEBUG_OBJECT (basesrc, "is random_access: %d", basesrc->random_access);
3219
3220   /* stop flushing now but for live sources, still block in the LIVE lock when
3221    * we are not yet PLAYING */
3222   gst_base_src_set_flushing (basesrc, FALSE, FALSE, NULL);
3223
3224   gst_pad_mark_reconfigure (GST_BASE_SRC_PAD (basesrc));
3225
3226   GST_OBJECT_LOCK (basesrc->srcpad);
3227   mode = GST_PAD_MODE (basesrc->srcpad);
3228   GST_OBJECT_UNLOCK (basesrc->srcpad);
3229
3230   /* take the stream lock here, we only want to let the task run when we have
3231    * set the STARTED flag */
3232   GST_PAD_STREAM_LOCK (basesrc->srcpad);
3233   if (mode == GST_PAD_MODE_PUSH) {
3234     /* do initial seek, which will start the task */
3235     GST_OBJECT_LOCK (basesrc);
3236     event = basesrc->pending_seek;
3237     basesrc->pending_seek = NULL;
3238     GST_OBJECT_UNLOCK (basesrc);
3239
3240     /* The perform seek code will start the task when finished. We don't have to
3241      * unlock the streaming thread because it is not running yet */
3242     if (G_UNLIKELY (!gst_base_src_perform_seek (basesrc, event, FALSE)))
3243       goto seek_failed;
3244
3245     if (event)
3246       gst_event_unref (event);
3247   } else {
3248     /* if not random_access, we cannot operate in pull mode for now */
3249     if (G_UNLIKELY (!basesrc->random_access))
3250       goto no_get_range;
3251   }
3252
3253   GST_OBJECT_LOCK (basesrc);
3254   GST_OBJECT_FLAG_SET (basesrc, GST_BASE_SRC_FLAG_STARTED);
3255   GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTING);
3256   basesrc->priv->start_result = ret;
3257   GST_ASYNC_SIGNAL (basesrc);
3258   GST_OBJECT_UNLOCK (basesrc);
3259
3260   GST_PAD_STREAM_UNLOCK (basesrc->srcpad);
3261
3262   return;
3263
3264 seek_failed:
3265   {
3266     GST_PAD_STREAM_UNLOCK (basesrc->srcpad);
3267     GST_ERROR_OBJECT (basesrc, "Failed to perform initial seek");
3268     gst_base_src_stop (basesrc);
3269     if (event)
3270       gst_event_unref (event);
3271     ret = GST_FLOW_ERROR;
3272     goto error;
3273   }
3274 no_get_range:
3275   {
3276     GST_PAD_STREAM_UNLOCK (basesrc->srcpad);
3277     gst_base_src_stop (basesrc);
3278     GST_ERROR_OBJECT (basesrc, "Cannot operate in pull mode, stopping");
3279     ret = GST_FLOW_ERROR;
3280     goto error;
3281   }
3282 error:
3283   {
3284     GST_OBJECT_LOCK (basesrc);
3285     basesrc->priv->start_result = ret;
3286     GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTING);
3287     GST_ASYNC_SIGNAL (basesrc);
3288     GST_OBJECT_UNLOCK (basesrc);
3289     return;
3290   }
3291 }
3292
3293 /**
3294  * gst_base_src_start_wait:
3295  * @basesrc: base source instance
3296  *
3297  * Wait until the start operation completes.
3298  *
3299  * Returns: a #GstFlowReturn.
3300  */
3301 GstFlowReturn
3302 gst_base_src_start_wait (GstBaseSrc * basesrc)
3303 {
3304   GstFlowReturn result;
3305
3306   GST_OBJECT_LOCK (basesrc);
3307   while (GST_BASE_SRC_IS_STARTING (basesrc)) {
3308     GST_ASYNC_WAIT (basesrc);
3309   }
3310   result = basesrc->priv->start_result;
3311   GST_OBJECT_UNLOCK (basesrc);
3312
3313   GST_DEBUG_OBJECT (basesrc, "got %s", gst_flow_get_name (result));
3314
3315   return result;
3316 }
3317
3318 static gboolean
3319 gst_base_src_stop (GstBaseSrc * basesrc)
3320 {
3321   GstBaseSrcClass *bclass;
3322   gboolean result = TRUE;
3323
3324   GST_DEBUG_OBJECT (basesrc, "stopping source");
3325
3326   /* flush all */
3327   gst_base_src_set_flushing (basesrc, TRUE, FALSE, NULL);
3328   /* stop the task */
3329   gst_pad_stop_task (basesrc->srcpad);
3330
3331   GST_OBJECT_LOCK (basesrc);
3332   if (!GST_BASE_SRC_IS_STARTED (basesrc) && !GST_BASE_SRC_IS_STARTING (basesrc))
3333     goto was_stopped;
3334
3335   GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTING);
3336   GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTED);
3337   basesrc->priv->start_result = GST_FLOW_FLUSHING;
3338   GST_ASYNC_SIGNAL (basesrc);
3339   GST_OBJECT_UNLOCK (basesrc);
3340
3341   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
3342   if (bclass->stop)
3343     result = bclass->stop (basesrc);
3344
3345   gst_base_src_set_allocation (basesrc, NULL, NULL, NULL);
3346
3347   return result;
3348
3349 was_stopped:
3350   {
3351     GST_DEBUG_OBJECT (basesrc, "was stopped");
3352     GST_OBJECT_UNLOCK (basesrc);
3353     return TRUE;
3354   }
3355 }
3356
3357 /* start or stop flushing dataprocessing
3358  */
3359 static gboolean
3360 gst_base_src_set_flushing (GstBaseSrc * basesrc,
3361     gboolean flushing, gboolean live_play, gboolean * playing)
3362 {
3363   GstBaseSrcClass *bclass;
3364
3365   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
3366
3367   GST_DEBUG_OBJECT (basesrc, "flushing %d, live_play %d", flushing, live_play);
3368
3369   if (flushing) {
3370     gst_base_src_activate_pool (basesrc, FALSE);
3371     /* unlock any subclasses, we need to do this before grabbing the
3372      * LIVE_LOCK since we hold this lock before going into ::create. We pass an
3373      * unlock to the params because of backwards compat (see seek handler)*/
3374     if (bclass->unlock)
3375       bclass->unlock (basesrc);
3376   }
3377
3378   /* the live lock is released when we are blocked, waiting for playing or
3379    * when we sync to the clock. */
3380   GST_LIVE_LOCK (basesrc);
3381   if (playing)
3382     *playing = basesrc->live_running;
3383   basesrc->priv->flushing = flushing;
3384   if (flushing) {
3385     /* if we are locked in the live lock, signal it to make it flush */
3386     basesrc->live_running = TRUE;
3387
3388     /* clear pending EOS if any */
3389     g_atomic_int_set (&basesrc->priv->pending_eos, FALSE);
3390
3391     /* step 1, now that we have the LIVE lock, clear our unlock request */
3392     if (bclass->unlock_stop)
3393       bclass->unlock_stop (basesrc);
3394
3395     /* step 2, unblock clock sync (if any) or any other blocking thing */
3396     if (basesrc->clock_id)
3397       gst_clock_id_unschedule (basesrc->clock_id);
3398   } else {
3399     /* signal the live source that it can start playing */
3400     basesrc->live_running = live_play;
3401
3402     gst_base_src_activate_pool (basesrc, TRUE);
3403
3404     /* Drop all delayed events */
3405     GST_OBJECT_LOCK (basesrc);
3406     if (basesrc->priv->pending_events) {
3407       g_list_foreach (basesrc->priv->pending_events, (GFunc) gst_event_unref,
3408           NULL);
3409       g_list_free (basesrc->priv->pending_events);
3410       basesrc->priv->pending_events = NULL;
3411       g_atomic_int_set (&basesrc->priv->have_events, FALSE);
3412     }
3413     GST_OBJECT_UNLOCK (basesrc);
3414   }
3415   GST_LIVE_SIGNAL (basesrc);
3416   GST_LIVE_UNLOCK (basesrc);
3417
3418   return TRUE;
3419 }
3420
3421 /* the purpose of this function is to make sure that a live source blocks in the
3422  * LIVE lock or leaves the LIVE lock and continues playing. */
3423 static gboolean
3424 gst_base_src_set_playing (GstBaseSrc * basesrc, gboolean live_play)
3425 {
3426   GstBaseSrcClass *bclass;
3427
3428   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
3429
3430   /* unlock subclasses locked in ::create, we only do this when we stop playing. */
3431   if (!live_play) {
3432     GST_DEBUG_OBJECT (basesrc, "unlock");
3433     if (bclass->unlock)
3434       bclass->unlock (basesrc);
3435   }
3436
3437   /* we are now able to grab the LIVE lock, when we get it, we can be
3438    * waiting for PLAYING while blocked in the LIVE cond or we can be waiting
3439    * for the clock. */
3440   GST_LIVE_LOCK (basesrc);
3441   GST_DEBUG_OBJECT (basesrc, "unschedule clock");
3442
3443   /* unblock clock sync (if any) */
3444   if (basesrc->clock_id)
3445     gst_clock_id_unschedule (basesrc->clock_id);
3446
3447   /* configure what to do when we get to the LIVE lock. */
3448   GST_DEBUG_OBJECT (basesrc, "live running %d", live_play);
3449   basesrc->live_running = live_play;
3450
3451   if (live_play) {
3452     gboolean start;
3453
3454     /* clear our unlock request when going to PLAYING */
3455     GST_DEBUG_OBJECT (basesrc, "unlock stop");
3456     if (bclass->unlock_stop)
3457       bclass->unlock_stop (basesrc);
3458
3459     /* for live sources we restart the timestamp correction */
3460     basesrc->priv->latency = -1;
3461     /* have to restart the task in case it stopped because of the unlock when
3462      * we went to PAUSED. Only do this if we operating in push mode. */
3463     GST_OBJECT_LOCK (basesrc->srcpad);
3464     start = (GST_PAD_MODE (basesrc->srcpad) == GST_PAD_MODE_PUSH);
3465     GST_OBJECT_UNLOCK (basesrc->srcpad);
3466     if (start)
3467       gst_pad_start_task (basesrc->srcpad, (GstTaskFunction) gst_base_src_loop,
3468           basesrc->srcpad, NULL);
3469     GST_DEBUG_OBJECT (basesrc, "signal");
3470     GST_LIVE_SIGNAL (basesrc);
3471   }
3472   GST_LIVE_UNLOCK (basesrc);
3473
3474   return TRUE;
3475 }
3476
3477 static gboolean
3478 gst_base_src_activate_push (GstPad * pad, GstObject * parent, gboolean active)
3479 {
3480   GstBaseSrc *basesrc;
3481
3482   basesrc = GST_BASE_SRC (parent);
3483
3484   /* prepare subclass first */
3485   if (active) {
3486     GST_DEBUG_OBJECT (basesrc, "Activating in push mode");
3487
3488     if (G_UNLIKELY (!basesrc->can_activate_push))
3489       goto no_push_activation;
3490
3491     if (G_UNLIKELY (!gst_base_src_start (basesrc)))
3492       goto error_start;
3493   } else {
3494     GST_DEBUG_OBJECT (basesrc, "Deactivating in push mode");
3495     /* now we can stop the source */
3496     if (G_UNLIKELY (!gst_base_src_stop (basesrc)))
3497       goto error_stop;
3498   }
3499   return TRUE;
3500
3501   /* ERRORS */
3502 no_push_activation:
3503   {
3504     GST_WARNING_OBJECT (basesrc, "Subclass disabled push-mode activation");
3505     return FALSE;
3506   }
3507 error_start:
3508   {
3509     GST_WARNING_OBJECT (basesrc, "Failed to start in push mode");
3510     return FALSE;
3511   }
3512 error_stop:
3513   {
3514     GST_DEBUG_OBJECT (basesrc, "Failed to stop in push mode");
3515     return FALSE;
3516   }
3517 }
3518
3519 static gboolean
3520 gst_base_src_activate_pull (GstPad * pad, GstObject * parent, gboolean active)
3521 {
3522   GstBaseSrc *basesrc;
3523
3524   basesrc = GST_BASE_SRC (parent);
3525
3526   /* prepare subclass first */
3527   if (active) {
3528     GST_DEBUG_OBJECT (basesrc, "Activating in pull mode");
3529     if (G_UNLIKELY (!gst_base_src_start (basesrc)))
3530       goto error_start;
3531   } else {
3532     GST_DEBUG_OBJECT (basesrc, "Deactivating in pull mode");
3533     if (G_UNLIKELY (!gst_base_src_stop (basesrc)))
3534       goto error_stop;
3535   }
3536   return TRUE;
3537
3538   /* ERRORS */
3539 error_start:
3540   {
3541     GST_ERROR_OBJECT (basesrc, "Failed to start in pull mode");
3542     return FALSE;
3543   }
3544 error_stop:
3545   {
3546     GST_ERROR_OBJECT (basesrc, "Failed to stop in pull mode");
3547     return FALSE;
3548   }
3549 }
3550
3551 static gboolean
3552 gst_base_src_activate_mode (GstPad * pad, GstObject * parent,
3553     GstPadMode mode, gboolean active)
3554 {
3555   gboolean res;
3556   GstBaseSrc *src = GST_BASE_SRC (parent);
3557
3558   src->priv->stream_start_pending = FALSE;
3559
3560   switch (mode) {
3561     case GST_PAD_MODE_PULL:
3562       res = gst_base_src_activate_pull (pad, parent, active);
3563       break;
3564     case GST_PAD_MODE_PUSH:
3565       src->priv->stream_start_pending = active;
3566       res = gst_base_src_activate_push (pad, parent, active);
3567       break;
3568     default:
3569       GST_LOG_OBJECT (pad, "unknown activation mode %d", mode);
3570       res = FALSE;
3571       break;
3572   }
3573   return res;
3574 }
3575
3576
3577 static GstStateChangeReturn
3578 gst_base_src_change_state (GstElement * element, GstStateChange transition)
3579 {
3580   GstBaseSrc *basesrc;
3581   GstStateChangeReturn result;
3582   gboolean no_preroll = FALSE;
3583
3584   basesrc = GST_BASE_SRC (element);
3585
3586   switch (transition) {
3587     case GST_STATE_CHANGE_NULL_TO_READY:
3588       break;
3589     case GST_STATE_CHANGE_READY_TO_PAUSED:
3590       no_preroll = gst_base_src_is_live (basesrc);
3591       break;
3592     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
3593       GST_DEBUG_OBJECT (basesrc, "PAUSED->PLAYING");
3594       if (gst_base_src_is_live (basesrc)) {
3595         /* now we can start playback */
3596         gst_base_src_set_playing (basesrc, TRUE);
3597       }
3598       break;
3599     default:
3600       break;
3601   }
3602
3603   if ((result =
3604           GST_ELEMENT_CLASS (parent_class)->change_state (element,
3605               transition)) == GST_STATE_CHANGE_FAILURE)
3606     goto failure;
3607
3608   switch (transition) {
3609     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
3610       GST_DEBUG_OBJECT (basesrc, "PLAYING->PAUSED");
3611       if (gst_base_src_is_live (basesrc)) {
3612         /* make sure we block in the live lock in PAUSED */
3613         gst_base_src_set_playing (basesrc, FALSE);
3614         no_preroll = TRUE;
3615       }
3616       break;
3617     case GST_STATE_CHANGE_PAUSED_TO_READY:
3618     {
3619       /* we don't need to unblock anything here, the pad deactivation code
3620        * already did this */
3621       g_atomic_int_set (&basesrc->priv->pending_eos, FALSE);
3622       gst_event_replace (&basesrc->pending_seek, NULL);
3623       break;
3624     }
3625     case GST_STATE_CHANGE_READY_TO_NULL:
3626       break;
3627     default:
3628       break;
3629   }
3630
3631   if (no_preroll && result == GST_STATE_CHANGE_SUCCESS)
3632     result = GST_STATE_CHANGE_NO_PREROLL;
3633
3634   return result;
3635
3636   /* ERRORS */
3637 failure:
3638   {
3639     GST_DEBUG_OBJECT (basesrc, "parent failed state change");
3640     return result;
3641   }
3642 }
3643
3644 /**
3645  * gst_base_src_get_buffer_pool:
3646  * @src: a #GstBaseSrc
3647  *
3648  * Returns: (transfer full): the instance of the #GstBufferPool used
3649  * by the src; free it after use it
3650  */
3651 GstBufferPool *
3652 gst_base_src_get_buffer_pool (GstBaseSrc * src)
3653 {
3654   g_return_val_if_fail (GST_IS_BASE_SRC (src), NULL);
3655
3656   if (src->priv->pool)
3657     return gst_object_ref (src->priv->pool);
3658
3659   return NULL;
3660 }
3661
3662 /**
3663  * gst_base_src_get_allocator:
3664  * @src: a #GstBaseSrc
3665  * @allocator: (out) (allow-none) (transfer full): the #GstAllocator
3666  * used
3667  * @params: (out) (allow-none) (transfer full): the
3668  * #GstAllocatorParams of @allocator
3669  *
3670  * Lets #GstBaseSrc sub-classes to know the memory @allocator
3671  * used by the base class and its @params.
3672  *
3673  * Unref the @allocator after use it.
3674  */
3675 void
3676 gst_base_src_get_allocator (GstBaseSrc * src,
3677     GstAllocator ** allocator, GstAllocationParams * params)
3678 {
3679   g_return_if_fail (GST_IS_BASE_SRC (src));
3680
3681   if (allocator)
3682     *allocator = src->priv->allocator ?
3683         gst_object_ref (src->priv->allocator) : NULL;
3684
3685   if (params)
3686     *params = src->priv->params;
3687 }