bedb88e211697b01984c109c3a8dfefc42377362
[platform/upstream/gstreamer.git] / libs / gst / base / gstbasesrc.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *               2000,2005 Wim Taymans <wim@fluendo.com>
4  *
5  * gstbasesrc.c:
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20  * Boston, MA 02111-1307, USA.
21  */
22
23 /**
24  * SECTION:gstbasesrc
25  * @short_description: Base class for getrange based source elements
26  * @see_also: #GstPushSrc, #GstBaseTransform, #GstBaseSink
27  *
28  * This is a generice base class for source elements. The following
29  * types of sources are supported:
30  * <itemizedlist>
31  *   <listitem><para>random access sources like files</para></listitem>
32  *   <listitem><para>seekable sources</para></listitem>
33  *   <listitem><para>live sources</para></listitem>
34  * </itemizedlist>
35  *
36  * The source can be configured to operate in any #GstFormat with the
37  * gst_base_src_set_format() method. The currently set format determines
38  * the format of the internal #GstSegment and any #GST_EVENT_NEWSEGMENT
39  * events. The default format for #GstBaseSrc is #GST_FORMAT_BYTES.
40  *
41  * #GstBaseSrc always supports push mode scheduling. If the following
42  * conditions are met, it also supports pull mode scheduling:
43  * <itemizedlist>
44  *   <listitem><para>The format is set to #GST_FORMAT_BYTES (default).</para>
45  *   </listitem>
46  *   <listitem><para>#GstBaseSrcClass.is_seekable() returns %TRUE.</para>
47  *   </listitem>
48  * </itemizedlist>
49  *
50  * If all the conditions are met for operating in pull mode, #GstBaseSrc is
51  * automatically seekable in push mode as well. The following conditions must
52  * be met to make the element seekable in push mode when the format is not
53  * #GST_FORMAT_BYTES:
54  * <itemizedlist>
55  *   <listitem><para>
56  *     #GstBaseSrcClass.is_seekable() returns %TRUE.
57  *   </para></listitem>
58  *   <listitem><para>
59  *     #GstBaseSrcClass.query() can convert all supported seek formats to the
60  *     internal format as set with gst_base_src_set_format().
61  *   </para></listitem>
62  *   <listitem><para>
63  *     #GstBaseSrcClass.do_seek() is implemented, performs the seek and returns
64  *      %TRUE.
65  *   </para></listitem>
66  * </itemizedlist>
67  *
68  * When the element does not meet the requirements to operate in pull mode, the
69  * offset and length in the #GstBaseSrcClass.create() method should be ignored.
70  * It is recommended to subclass #GstPushSrc instead, in this situation. If the
71  * element can operate in pull mode but only with specific offsets and
72  * lengths, it is allowed to generate an error when the wrong values are passed
73  * to the #GstBaseSrcClass.create() function.
74  *
75  * #GstBaseSrc has support for live sources. Live sources are sources that when
76  * paused discard data, such as audio or video capture devices. A typical live
77  * source also produces data at a fixed rate and thus provides a clock to publish
78  * this rate.
79  * Use gst_base_src_set_live() to activate the live source mode.
80  *
81  * A live source does not produce data in the PAUSED state. This means that the
82  * #GstBaseSrcClass.create() method will not be called in PAUSED but only in
83  * PLAYING. To signal the pipeline that the element will not produce data, the
84  * return value from the READY to PAUSED state will be
85  * #GST_STATE_CHANGE_NO_PREROLL.
86  *
87  * A typical live source will timestamp the buffers it creates with the
88  * current running time of the pipeline. This is one reason why a live source
89  * can only produce data in the PLAYING state, when the clock is actually
90  * distributed and running.
91  *
92  * Live sources that synchronize and block on the clock (an audio source, for
93  * example) can use gst_base_src_wait_playing() when the
94  * #GstBaseSrcClass.create() function was interrupted by a state change to
95  * PAUSED.
96  *
97  * The #GstBaseSrcClass.get_times() method can be used to implement pseudo-live
98  * sources. It only makes sense to implement the #GstBaseSrcClass.get_times()
99  * function if the source is a live source. The #GstBaseSrcClass.get_times()
100  * function should return timestamps starting from 0, as if it were a non-live
101  * source. The base class will make sure that the timestamps are transformed
102  * into the current running_time. The base source will then wait for the
103  * calculated running_time before pushing out the buffer.
104  *
105  * For live sources, the base class will by default report a latency of 0.
106  * For pseudo live sources, the base class will by default measure the difference
107  * between the first buffer timestamp and the start time of get_times and will
108  * report this value as the latency.
109  * Subclasses should override the query function when this behaviour is not
110  * acceptable.
111  *
112  * There is only support in #GstBaseSrc for exactly one source pad, which
113  * should be named "src". A source implementation (subclass of #GstBaseSrc)
114  * should install a pad template in its class_init function, like so:
115  * |[
116  * static void
117  * my_element_class_init (GstMyElementClass *klass)
118  * {
119  *   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
120  *   // srctemplate should be a #GstStaticPadTemplate with direction
121  *   // #GST_PAD_SRC and name "src"
122  *   gst_element_class_add_pad_template (gstelement_class,
123  *       gst_static_pad_template_get (&amp;srctemplate));
124  *   // see #GstElementDetails
125  *   gst_element_class_set_details (gstelement_class, &amp;details);
126  * }
127  * ]|
128  *
129  * <refsect2>
130  * <title>Controlled shutdown of live sources in applications</title>
131  * <para>
132  * Applications that record from a live source may want to stop recording
133  * in a controlled way, so that the recording is stopped, but the data
134  * already in the pipeline is processed to the end (remember that many live
135  * sources would go on recording forever otherwise). For that to happen the
136  * application needs to make the source stop recording and send an EOS
137  * event down the pipeline. The application would then wait for an
138  * EOS message posted on the pipeline's bus to know when all data has
139  * been processed and the pipeline can safely be stopped.
140  *
141  * An application may send an EOS event to a source element to make it
142  * perform the EOS logic (send EOS event downstream or post a
143  * #GST_MESSAGE_SEGMENT_DONE on the bus). This can typically be done
144  * with the gst_element_send_event() function on the element or its parent bin.
145  *
146  * After the EOS has been sent to the element, the application should wait for
147  * an EOS message to be posted on the pipeline's bus. Once this EOS message is
148  * received, it may safely shut down the entire pipeline.
149  *
150  * Last reviewed on 2007-12-19 (0.10.16)
151  * </para>
152  * </refsect2>
153  */
154
155 #ifdef HAVE_CONFIG_H
156 #  include "config.h"
157 #endif
158
159 #include <stdlib.h>
160 #include <string.h>
161
162 #include <gst/gst_private.h>
163 #include <gst/glib-compat-private.h>
164
165 #include "gstbasesrc.h"
166 #include "gsttypefindhelper.h"
167 #include <gst/gst-i18n-lib.h>
168
169 GST_DEBUG_CATEGORY_STATIC (gst_base_src_debug);
170 #define GST_CAT_DEFAULT gst_base_src_debug
171
172 #define GST_LIVE_GET_LOCK(elem)               (&GST_BASE_SRC_CAST(elem)->live_lock)
173 #define GST_LIVE_LOCK(elem)                   g_mutex_lock(GST_LIVE_GET_LOCK(elem))
174 #define GST_LIVE_TRYLOCK(elem)                g_mutex_trylock(GST_LIVE_GET_LOCK(elem))
175 #define GST_LIVE_UNLOCK(elem)                 g_mutex_unlock(GST_LIVE_GET_LOCK(elem))
176 #define GST_LIVE_GET_COND(elem)               (&GST_BASE_SRC_CAST(elem)->live_cond)
177 #define GST_LIVE_WAIT(elem)                   g_cond_wait (GST_LIVE_GET_COND (elem), GST_LIVE_GET_LOCK (elem))
178 #define GST_LIVE_WAIT_UNTIL(elem, end_time)   g_cond_timed_wait (GST_LIVE_GET_COND (elem), GST_LIVE_GET_LOCK (elem), end_time)
179 #define GST_LIVE_SIGNAL(elem)                 g_cond_signal (GST_LIVE_GET_COND (elem));
180 #define GST_LIVE_BROADCAST(elem)              g_cond_broadcast (GST_LIVE_GET_COND (elem));
181
182 /* BaseSrc signals and args */
183 enum
184 {
185   /* FILL ME */
186   LAST_SIGNAL
187 };
188
189 #define DEFAULT_BLOCKSIZE       4096
190 #define DEFAULT_NUM_BUFFERS     -1
191 #define DEFAULT_TYPEFIND        FALSE
192 #define DEFAULT_DO_TIMESTAMP    FALSE
193
194 enum
195 {
196   PROP_0,
197   PROP_BLOCKSIZE,
198   PROP_NUM_BUFFERS,
199   PROP_TYPEFIND,
200   PROP_DO_TIMESTAMP
201 };
202
203 #define GST_BASE_SRC_GET_PRIVATE(obj)  \
204    (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_BASE_SRC, GstBaseSrcPrivate))
205
206 struct _GstBaseSrcPrivate
207 {
208   gboolean discont;
209   gboolean flushing;
210
211   GstFlowReturn start_result;
212   gboolean async;
213
214   /* if a stream-start event should be sent */
215   gboolean stream_start_pending;
216
217   /* if segment should be sent */
218   gboolean segment_pending;
219
220   /* if EOS is pending (atomic) */
221   gint pending_eos;
222
223   /* startup latency is the time it takes between going to PLAYING and producing
224    * the first BUFFER with running_time 0. This value is included in the latency
225    * reporting. */
226   GstClockTime latency;
227   /* timestamp offset, this is the offset add to the values of gst_times for
228    * pseudo live sources */
229   GstClockTimeDiff ts_offset;
230
231   gboolean do_timestamp;
232   volatile gint dynamic_size;
233
234   /* stream sequence number */
235   guint32 seqnum;
236
237   /* pending events (TAG, CUSTOM_BOTH, CUSTOM_DOWNSTREAM) to be
238    * pushed in the data stream */
239   GList *pending_events;
240   volatile gint have_events;
241
242   /* QoS *//* with LOCK */
243   gboolean qos_enabled;
244   gdouble proportion;
245   GstClockTime earliest_time;
246
247   GstBufferPool *pool;
248   GstAllocator *allocator;
249   GstAllocationParams params;
250 };
251
252 static GstElementClass *parent_class = NULL;
253
254 static void gst_base_src_class_init (GstBaseSrcClass * klass);
255 static void gst_base_src_init (GstBaseSrc * src, gpointer g_class);
256 static void gst_base_src_finalize (GObject * object);
257
258
259 GType
260 gst_base_src_get_type (void)
261 {
262   static volatile gsize base_src_type = 0;
263
264   if (g_once_init_enter (&base_src_type)) {
265     GType _type;
266     static const GTypeInfo base_src_info = {
267       sizeof (GstBaseSrcClass),
268       NULL,
269       NULL,
270       (GClassInitFunc) gst_base_src_class_init,
271       NULL,
272       NULL,
273       sizeof (GstBaseSrc),
274       0,
275       (GInstanceInitFunc) gst_base_src_init,
276     };
277
278     _type = g_type_register_static (GST_TYPE_ELEMENT,
279         "GstBaseSrc", &base_src_info, G_TYPE_FLAG_ABSTRACT);
280     g_once_init_leave (&base_src_type, _type);
281   }
282   return base_src_type;
283 }
284
285 static GstCaps *gst_base_src_default_get_caps (GstBaseSrc * bsrc,
286     GstCaps * filter);
287 static GstCaps *gst_base_src_default_fixate (GstBaseSrc * src, GstCaps * caps);
288 static GstCaps *gst_base_src_fixate (GstBaseSrc * src, GstCaps * caps);
289
290 static gboolean gst_base_src_is_random_access (GstBaseSrc * src);
291 static gboolean gst_base_src_activate_mode (GstPad * pad, GstObject * parent,
292     GstPadMode mode, gboolean active);
293 static void gst_base_src_set_property (GObject * object, guint prop_id,
294     const GValue * value, GParamSpec * pspec);
295 static void gst_base_src_get_property (GObject * object, guint prop_id,
296     GValue * value, GParamSpec * pspec);
297 static gboolean gst_base_src_event (GstPad * pad, GstObject * parent,
298     GstEvent * event);
299 static gboolean gst_base_src_send_event (GstElement * elem, GstEvent * event);
300 static gboolean gst_base_src_default_event (GstBaseSrc * src, GstEvent * event);
301
302 static gboolean gst_base_src_query (GstPad * pad, GstObject * parent,
303     GstQuery * query);
304
305 static gboolean gst_base_src_activate_pool (GstBaseSrc * basesrc,
306     gboolean active);
307 static gboolean gst_base_src_default_negotiate (GstBaseSrc * basesrc);
308 static gboolean gst_base_src_default_do_seek (GstBaseSrc * src,
309     GstSegment * segment);
310 static gboolean gst_base_src_default_query (GstBaseSrc * src, GstQuery * query);
311 static gboolean gst_base_src_default_prepare_seek_segment (GstBaseSrc * src,
312     GstEvent * event, GstSegment * segment);
313 static GstFlowReturn gst_base_src_default_create (GstBaseSrc * basesrc,
314     guint64 offset, guint size, GstBuffer ** buf);
315 static GstFlowReturn gst_base_src_default_alloc (GstBaseSrc * basesrc,
316     guint64 offset, guint size, GstBuffer ** buf);
317 static gboolean gst_base_src_decide_allocation_default (GstBaseSrc * basesrc,
318     GstQuery * query);
319
320 static gboolean gst_base_src_set_flushing (GstBaseSrc * basesrc,
321     gboolean flushing, gboolean live_play, gboolean * playing);
322
323 static gboolean gst_base_src_start (GstBaseSrc * basesrc);
324 static gboolean gst_base_src_stop (GstBaseSrc * basesrc);
325
326 static GstStateChangeReturn gst_base_src_change_state (GstElement * element,
327     GstStateChange transition);
328
329 static void gst_base_src_loop (GstPad * pad);
330 static GstFlowReturn gst_base_src_getrange (GstPad * pad, GstObject * parent,
331     guint64 offset, guint length, GstBuffer ** buf);
332 static GstFlowReturn gst_base_src_get_range (GstBaseSrc * src, guint64 offset,
333     guint length, GstBuffer ** buf);
334 static gboolean gst_base_src_seekable (GstBaseSrc * src);
335 static gboolean gst_base_src_negotiate (GstBaseSrc * basesrc);
336 static gboolean gst_base_src_update_length (GstBaseSrc * src, guint64 offset,
337     guint * length);
338
339 static void
340 gst_base_src_class_init (GstBaseSrcClass * klass)
341 {
342   GObjectClass *gobject_class;
343   GstElementClass *gstelement_class;
344
345   gobject_class = G_OBJECT_CLASS (klass);
346   gstelement_class = GST_ELEMENT_CLASS (klass);
347
348   GST_DEBUG_CATEGORY_INIT (gst_base_src_debug, "basesrc", 0, "basesrc element");
349
350   g_type_class_add_private (klass, sizeof (GstBaseSrcPrivate));
351
352   parent_class = g_type_class_peek_parent (klass);
353
354   gobject_class->finalize = gst_base_src_finalize;
355   gobject_class->set_property = gst_base_src_set_property;
356   gobject_class->get_property = gst_base_src_get_property;
357
358   g_object_class_install_property (gobject_class, PROP_BLOCKSIZE,
359       g_param_spec_uint ("blocksize", "Block size",
360           "Size in bytes to read per buffer (-1 = default)", 0, G_MAXUINT,
361           DEFAULT_BLOCKSIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
362   g_object_class_install_property (gobject_class, PROP_NUM_BUFFERS,
363       g_param_spec_int ("num-buffers", "num-buffers",
364           "Number of buffers to output before sending EOS (-1 = unlimited)",
365           -1, G_MAXINT, DEFAULT_NUM_BUFFERS, G_PARAM_READWRITE |
366           G_PARAM_STATIC_STRINGS));
367   g_object_class_install_property (gobject_class, PROP_TYPEFIND,
368       g_param_spec_boolean ("typefind", "Typefind",
369           "Run typefind before negotiating", DEFAULT_TYPEFIND,
370           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
371   g_object_class_install_property (gobject_class, PROP_DO_TIMESTAMP,
372       g_param_spec_boolean ("do-timestamp", "Do timestamp",
373           "Apply current stream time to buffers", DEFAULT_DO_TIMESTAMP,
374           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
375
376   gstelement_class->change_state =
377       GST_DEBUG_FUNCPTR (gst_base_src_change_state);
378   gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_base_src_send_event);
379
380   klass->get_caps = GST_DEBUG_FUNCPTR (gst_base_src_default_get_caps);
381   klass->negotiate = GST_DEBUG_FUNCPTR (gst_base_src_default_negotiate);
382   klass->fixate = GST_DEBUG_FUNCPTR (gst_base_src_default_fixate);
383   klass->prepare_seek_segment =
384       GST_DEBUG_FUNCPTR (gst_base_src_default_prepare_seek_segment);
385   klass->do_seek = GST_DEBUG_FUNCPTR (gst_base_src_default_do_seek);
386   klass->query = GST_DEBUG_FUNCPTR (gst_base_src_default_query);
387   klass->event = GST_DEBUG_FUNCPTR (gst_base_src_default_event);
388   klass->create = GST_DEBUG_FUNCPTR (gst_base_src_default_create);
389   klass->alloc = GST_DEBUG_FUNCPTR (gst_base_src_default_alloc);
390   klass->decide_allocation =
391       GST_DEBUG_FUNCPTR (gst_base_src_decide_allocation_default);
392
393   /* Registering debug symbols for function pointers */
394   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_activate_mode);
395   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_event);
396   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_query);
397   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_getrange);
398   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_fixate);
399 }
400
401 static void
402 gst_base_src_init (GstBaseSrc * basesrc, gpointer g_class)
403 {
404   GstPad *pad;
405   GstPadTemplate *pad_template;
406
407   basesrc->priv = GST_BASE_SRC_GET_PRIVATE (basesrc);
408
409   basesrc->is_live = FALSE;
410   g_mutex_init (&basesrc->live_lock);
411   g_cond_init (&basesrc->live_cond);
412   basesrc->num_buffers = DEFAULT_NUM_BUFFERS;
413   basesrc->num_buffers_left = -1;
414
415   basesrc->can_activate_push = TRUE;
416
417   pad_template =
418       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (g_class), "src");
419   g_return_if_fail (pad_template != NULL);
420
421   GST_DEBUG_OBJECT (basesrc, "creating src pad");
422   pad = gst_pad_new_from_template (pad_template, "src");
423
424   GST_DEBUG_OBJECT (basesrc, "setting functions on src pad");
425   gst_pad_set_activatemode_function (pad, gst_base_src_activate_mode);
426   gst_pad_set_event_function (pad, gst_base_src_event);
427   gst_pad_set_query_function (pad, gst_base_src_query);
428   gst_pad_set_getrange_function (pad, gst_base_src_getrange);
429
430   /* hold pointer to pad */
431   basesrc->srcpad = pad;
432   GST_DEBUG_OBJECT (basesrc, "adding src pad");
433   gst_element_add_pad (GST_ELEMENT (basesrc), pad);
434
435   basesrc->blocksize = DEFAULT_BLOCKSIZE;
436   basesrc->clock_id = NULL;
437   /* we operate in BYTES by default */
438   gst_base_src_set_format (basesrc, GST_FORMAT_BYTES);
439   basesrc->typefind = DEFAULT_TYPEFIND;
440   basesrc->priv->do_timestamp = DEFAULT_DO_TIMESTAMP;
441   g_atomic_int_set (&basesrc->priv->have_events, FALSE);
442
443   basesrc->priv->start_result = GST_FLOW_FLUSHING;
444   GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTED);
445   GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTING);
446   GST_OBJECT_FLAG_SET (basesrc, GST_ELEMENT_FLAG_SOURCE);
447
448   GST_DEBUG_OBJECT (basesrc, "init done");
449 }
450
451 static void
452 gst_base_src_finalize (GObject * object)
453 {
454   GstBaseSrc *basesrc;
455   GstEvent **event_p;
456
457   basesrc = GST_BASE_SRC (object);
458
459   g_mutex_clear (&basesrc->live_lock);
460   g_cond_clear (&basesrc->live_cond);
461
462   event_p = &basesrc->pending_seek;
463   gst_event_replace (event_p, NULL);
464
465   if (basesrc->priv->pending_events) {
466     g_list_foreach (basesrc->priv->pending_events, (GFunc) gst_event_unref,
467         NULL);
468     g_list_free (basesrc->priv->pending_events);
469   }
470
471   G_OBJECT_CLASS (parent_class)->finalize (object);
472 }
473
474 /**
475  * gst_base_src_wait_playing:
476  * @src: the src
477  *
478  * If the #GstBaseSrcClass.create() method performs its own synchronisation
479  * against the clock it must unblock when going from PLAYING to the PAUSED state
480  * and call this method before continuing to produce the remaining data.
481  *
482  * This function will block until a state change to PLAYING happens (in which
483  * case this function returns #GST_FLOW_OK) or the processing must be stopped due
484  * to a state change to READY or a FLUSH event (in which case this function
485  * returns #GST_FLOW_FLUSHING).
486  *
487  * Returns: #GST_FLOW_OK if @src is PLAYING and processing can
488  * continue. Any other return value should be returned from the create vmethod.
489  */
490 GstFlowReturn
491 gst_base_src_wait_playing (GstBaseSrc * src)
492 {
493   g_return_val_if_fail (GST_IS_BASE_SRC (src), GST_FLOW_ERROR);
494
495   do {
496     /* block until the state changes, or we get a flush, or something */
497     GST_DEBUG_OBJECT (src, "live source waiting for running state");
498     GST_LIVE_WAIT (src);
499     GST_DEBUG_OBJECT (src, "live source unlocked");
500     if (src->priv->flushing)
501       goto flushing;
502   } while (G_UNLIKELY (!src->live_running));
503
504   return GST_FLOW_OK;
505
506   /* ERRORS */
507 flushing:
508   {
509     GST_DEBUG_OBJECT (src, "we are flushing");
510     return GST_FLOW_FLUSHING;
511   }
512 }
513
514 /**
515  * gst_base_src_set_live:
516  * @src: base source instance
517  * @live: new live-mode
518  *
519  * If the element listens to a live source, @live should
520  * be set to %TRUE.
521  *
522  * A live source will not produce data in the PAUSED state and
523  * will therefore not be able to participate in the PREROLL phase
524  * of a pipeline. To signal this fact to the application and the
525  * pipeline, the state change return value of the live source will
526  * be GST_STATE_CHANGE_NO_PREROLL.
527  */
528 void
529 gst_base_src_set_live (GstBaseSrc * src, gboolean live)
530 {
531   g_return_if_fail (GST_IS_BASE_SRC (src));
532
533   GST_OBJECT_LOCK (src);
534   src->is_live = live;
535   GST_OBJECT_UNLOCK (src);
536 }
537
538 /**
539  * gst_base_src_is_live:
540  * @src: base source instance
541  *
542  * Check if an element is in live mode.
543  *
544  * Returns: %TRUE if element is in live mode.
545  */
546 gboolean
547 gst_base_src_is_live (GstBaseSrc * src)
548 {
549   gboolean result;
550
551   g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
552
553   GST_OBJECT_LOCK (src);
554   result = src->is_live;
555   GST_OBJECT_UNLOCK (src);
556
557   return result;
558 }
559
560 /**
561  * gst_base_src_set_format:
562  * @src: base source instance
563  * @format: the format to use
564  *
565  * Sets the default format of the source. This will be the format used
566  * for sending NEW_SEGMENT events and for performing seeks.
567  *
568  * If a format of GST_FORMAT_BYTES is set, the element will be able to
569  * operate in pull mode if the #GstBaseSrcClass.is_seekable() returns TRUE.
570  *
571  * This function must only be called in states < %GST_STATE_PAUSED.
572  */
573 void
574 gst_base_src_set_format (GstBaseSrc * src, GstFormat format)
575 {
576   g_return_if_fail (GST_IS_BASE_SRC (src));
577   g_return_if_fail (GST_STATE (src) <= GST_STATE_READY);
578
579   GST_OBJECT_LOCK (src);
580   gst_segment_init (&src->segment, format);
581   GST_OBJECT_UNLOCK (src);
582 }
583
584 /**
585  * gst_base_src_set_dynamic_size:
586  * @src: base source instance
587  * @dynamic: new dynamic size mode
588  *
589  * If not @dynamic, size is only updated when needed, such as when trying to
590  * read past current tracked size.  Otherwise, size is checked for upon each
591  * read.
592  */
593 void
594 gst_base_src_set_dynamic_size (GstBaseSrc * src, gboolean dynamic)
595 {
596   g_return_if_fail (GST_IS_BASE_SRC (src));
597
598   g_atomic_int_set (&src->priv->dynamic_size, dynamic);
599 }
600
601 /**
602  * gst_base_src_set_async:
603  * @src: base source instance
604  * @async: new async mode
605  *
606  * Configure async behaviour in @src, no state change will block. The open,
607  * close, start, stop, play and pause virtual methods will be executed in a
608  * different thread and are thus allowed to perform blocking operations. Any
609  * blocking operation should be unblocked with the unlock vmethod.
610  */
611 void
612 gst_base_src_set_async (GstBaseSrc * src, gboolean async)
613 {
614   g_return_if_fail (GST_IS_BASE_SRC (src));
615
616   GST_OBJECT_LOCK (src);
617   src->priv->async = async;
618   GST_OBJECT_UNLOCK (src);
619 }
620
621 /**
622  * gst_base_src_is_async:
623  * @src: base source instance
624  *
625  * Get the current async behaviour of @src. See also gst_base_src_set_async().
626  *
627  * Returns: %TRUE if @src is operating in async mode.
628  */
629 gboolean
630 gst_base_src_is_async (GstBaseSrc * src)
631 {
632   gboolean res;
633
634   g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
635
636   GST_OBJECT_LOCK (src);
637   res = src->priv->async;
638   GST_OBJECT_UNLOCK (src);
639
640   return res;
641 }
642
643
644 /**
645  * gst_base_src_query_latency:
646  * @src: the source
647  * @live: (out) (allow-none): if the source is live
648  * @min_latency: (out) (allow-none): the min latency of the source
649  * @max_latency: (out) (allow-none): the max latency of the source
650  *
651  * Query the source for the latency parameters. @live will be TRUE when @src is
652  * configured as a live source. @min_latency will be set to the difference
653  * between the running time and the timestamp of the first buffer.
654  * @max_latency is always the undefined value of -1.
655  *
656  * This function is mostly used by subclasses.
657  *
658  * Returns: TRUE if the query succeeded.
659  */
660 gboolean
661 gst_base_src_query_latency (GstBaseSrc * src, gboolean * live,
662     GstClockTime * min_latency, GstClockTime * max_latency)
663 {
664   GstClockTime min;
665
666   g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
667
668   GST_OBJECT_LOCK (src);
669   if (live)
670     *live = src->is_live;
671
672   /* if we have a startup latency, report this one, else report 0. Subclasses
673    * are supposed to override the query function if they want something
674    * else. */
675   if (src->priv->latency != -1)
676     min = src->priv->latency;
677   else
678     min = 0;
679
680   if (min_latency)
681     *min_latency = min;
682   if (max_latency)
683     *max_latency = -1;
684
685   GST_LOG_OBJECT (src, "latency: live %d, min %" GST_TIME_FORMAT
686       ", max %" GST_TIME_FORMAT, src->is_live, GST_TIME_ARGS (min),
687       GST_TIME_ARGS (-1));
688   GST_OBJECT_UNLOCK (src);
689
690   return TRUE;
691 }
692
693 /**
694  * gst_base_src_set_blocksize:
695  * @src: the source
696  * @blocksize: the new blocksize in bytes
697  *
698  * Set the number of bytes that @src will push out with each buffer. When
699  * @blocksize is set to -1, a default length will be used.
700  */
701 void
702 gst_base_src_set_blocksize (GstBaseSrc * src, guint blocksize)
703 {
704   g_return_if_fail (GST_IS_BASE_SRC (src));
705
706   GST_OBJECT_LOCK (src);
707   src->blocksize = blocksize;
708   GST_OBJECT_UNLOCK (src);
709 }
710
711 /**
712  * gst_base_src_get_blocksize:
713  * @src: the source
714  *
715  * Get the number of bytes that @src will push out with each buffer.
716  *
717  * Returns: the number of bytes pushed with each buffer.
718  */
719 guint
720 gst_base_src_get_blocksize (GstBaseSrc * src)
721 {
722   gint res;
723
724   g_return_val_if_fail (GST_IS_BASE_SRC (src), 0);
725
726   GST_OBJECT_LOCK (src);
727   res = src->blocksize;
728   GST_OBJECT_UNLOCK (src);
729
730   return res;
731 }
732
733
734 /**
735  * gst_base_src_set_do_timestamp:
736  * @src: the source
737  * @timestamp: enable or disable timestamping
738  *
739  * Configure @src to automatically timestamp outgoing buffers based on the
740  * current running_time of the pipeline. This property is mostly useful for live
741  * sources.
742  */
743 void
744 gst_base_src_set_do_timestamp (GstBaseSrc * src, gboolean timestamp)
745 {
746   g_return_if_fail (GST_IS_BASE_SRC (src));
747
748   GST_OBJECT_LOCK (src);
749   src->priv->do_timestamp = timestamp;
750   GST_OBJECT_UNLOCK (src);
751 }
752
753 /**
754  * gst_base_src_get_do_timestamp:
755  * @src: the source
756  *
757  * Query if @src timestamps outgoing buffers based on the current running_time.
758  *
759  * Returns: %TRUE if the base class will automatically timestamp outgoing buffers.
760  */
761 gboolean
762 gst_base_src_get_do_timestamp (GstBaseSrc * src)
763 {
764   gboolean res;
765
766   g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
767
768   GST_OBJECT_LOCK (src);
769   res = src->priv->do_timestamp;
770   GST_OBJECT_UNLOCK (src);
771
772   return res;
773 }
774
775 /**
776  * gst_base_src_new_seamless_segment:
777  * @src: The source
778  * @start: The new start value for the segment
779  * @stop: Stop value for the new segment
780  * @position: The position value for the new segent
781  *
782  * Prepare a new seamless segment for emission downstream. This function must
783  * only be called by derived sub-classes, and only from the create() function,
784  * as the stream-lock needs to be held.
785  *
786  * The format for the new segment will be the current format of the source, as
787  * configured with gst_base_src_set_format()
788  *
789  * Returns: %TRUE if preparation of the seamless segment succeeded.
790  */
791 gboolean
792 gst_base_src_new_seamless_segment (GstBaseSrc * src, gint64 start, gint64 stop,
793     gint64 position)
794 {
795   gboolean res = TRUE;
796
797   GST_DEBUG_OBJECT (src,
798       "Starting new seamless segment. Start %" GST_TIME_FORMAT " stop %"
799       GST_TIME_FORMAT " position %" GST_TIME_FORMAT, GST_TIME_ARGS (start),
800       GST_TIME_ARGS (stop), GST_TIME_ARGS (position));
801
802   GST_OBJECT_LOCK (src);
803
804   src->segment.base = gst_segment_to_running_time (&src->segment,
805       src->segment.format, src->segment.position);
806   src->segment.start = start;
807   src->segment.stop = stop;
808   src->segment.position = position;
809
810   /* forward, we send data from position to stop */
811   src->priv->segment_pending = TRUE;
812   GST_OBJECT_UNLOCK (src);
813
814   src->priv->discont = TRUE;
815   src->running = TRUE;
816
817   return res;
818 }
819
820 static gboolean
821 gst_base_src_send_stream_start (GstBaseSrc * src)
822 {
823   gboolean ret = TRUE;
824
825   if (src->priv->stream_start_pending) {
826     ret = gst_pad_push_event (src->srcpad, gst_event_new_stream_start ());
827     src->priv->stream_start_pending = FALSE;
828   }
829
830   return ret;
831 }
832
833 /**
834  * gst_base_src_set_caps:
835  * @src: a #GstBaseSrc
836  * @caps: a #GstCaps
837  *
838  * Set new caps on the basesrc source pad.
839  *
840  * Returns: %TRUE if the caps could be set
841  */
842 gboolean
843 gst_base_src_set_caps (GstBaseSrc * src, GstCaps * caps)
844 {
845   GstBaseSrcClass *bclass;
846   gboolean res = TRUE;
847
848   bclass = GST_BASE_SRC_GET_CLASS (src);
849
850   gst_base_src_send_stream_start (src);
851
852   if (bclass->set_caps)
853     res = bclass->set_caps (src, caps);
854   if (res)
855     res = gst_pad_set_caps (src->srcpad, caps);
856
857   return res;
858 }
859
860 static GstCaps *
861 gst_base_src_default_get_caps (GstBaseSrc * bsrc, GstCaps * filter)
862 {
863   GstCaps *caps = NULL;
864   GstPadTemplate *pad_template;
865   GstBaseSrcClass *bclass;
866
867   bclass = GST_BASE_SRC_GET_CLASS (bsrc);
868
869   pad_template =
870       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (bclass), "src");
871
872   if (pad_template != NULL) {
873     caps = gst_pad_template_get_caps (pad_template);
874
875     if (filter) {
876       GstCaps *intersection;
877
878       intersection =
879           gst_caps_intersect_full (filter, caps, GST_CAPS_INTERSECT_FIRST);
880       gst_caps_unref (caps);
881       caps = intersection;
882     }
883   }
884   return caps;
885 }
886
887 static GstCaps *
888 gst_base_src_default_fixate (GstBaseSrc * bsrc, GstCaps * caps)
889 {
890   GST_DEBUG_OBJECT (bsrc, "using default caps fixate function");
891   return gst_caps_fixate (caps);
892 }
893
894 static GstCaps *
895 gst_base_src_fixate (GstBaseSrc * bsrc, GstCaps * caps)
896 {
897   GstBaseSrcClass *bclass;
898
899   bclass = GST_BASE_SRC_GET_CLASS (bsrc);
900
901   if (bclass->fixate)
902     caps = bclass->fixate (bsrc, caps);
903
904   return caps;
905 }
906
907 static gboolean
908 gst_base_src_default_query (GstBaseSrc * src, GstQuery * query)
909 {
910   gboolean res;
911
912   switch (GST_QUERY_TYPE (query)) {
913     case GST_QUERY_POSITION:
914     {
915       GstFormat format;
916
917       gst_query_parse_position (query, &format, NULL);
918
919       GST_DEBUG_OBJECT (src, "position query in format %s",
920           gst_format_get_name (format));
921
922       switch (format) {
923         case GST_FORMAT_PERCENT:
924         {
925           gint64 percent;
926           gint64 position;
927           gint64 duration;
928
929           GST_OBJECT_LOCK (src);
930           position = src->segment.position;
931           duration = src->segment.duration;
932           GST_OBJECT_UNLOCK (src);
933
934           if (position != -1 && duration != -1) {
935             if (position < duration)
936               percent = gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX, position,
937                   duration);
938             else
939               percent = GST_FORMAT_PERCENT_MAX;
940           } else
941             percent = -1;
942
943           gst_query_set_position (query, GST_FORMAT_PERCENT, percent);
944           res = TRUE;
945           break;
946         }
947         default:
948         {
949           gint64 position;
950           GstFormat seg_format;
951
952           GST_OBJECT_LOCK (src);
953           position =
954               gst_segment_to_stream_time (&src->segment, src->segment.format,
955               src->segment.position);
956           seg_format = src->segment.format;
957           GST_OBJECT_UNLOCK (src);
958
959           if (position != -1) {
960             /* convert to requested format */
961             res =
962                 gst_pad_query_convert (src->srcpad, seg_format,
963                 position, format, &position);
964           } else
965             res = TRUE;
966
967           gst_query_set_position (query, format, position);
968           break;
969         }
970       }
971       break;
972     }
973     case GST_QUERY_DURATION:
974     {
975       GstFormat format;
976
977       gst_query_parse_duration (query, &format, NULL);
978
979       GST_DEBUG_OBJECT (src, "duration query in format %s",
980           gst_format_get_name (format));
981
982       switch (format) {
983         case GST_FORMAT_PERCENT:
984           gst_query_set_duration (query, GST_FORMAT_PERCENT,
985               GST_FORMAT_PERCENT_MAX);
986           res = TRUE;
987           break;
988         default:
989         {
990           gint64 duration;
991           GstFormat seg_format;
992           guint length = 0;
993
994           /* may have to refresh duration */
995           if (g_atomic_int_get (&src->priv->dynamic_size))
996             gst_base_src_update_length (src, 0, &length);
997
998           /* this is the duration as configured by the subclass. */
999           GST_OBJECT_LOCK (src);
1000           duration = src->segment.duration;
1001           seg_format = src->segment.format;
1002           GST_OBJECT_UNLOCK (src);
1003
1004           GST_LOG_OBJECT (src, "duration %" G_GINT64_FORMAT ", format %s",
1005               duration, gst_format_get_name (seg_format));
1006
1007           if (duration != -1) {
1008             /* convert to requested format, if this fails, we have a duration
1009              * but we cannot answer the query, we must return FALSE. */
1010             res =
1011                 gst_pad_query_convert (src->srcpad, seg_format,
1012                 duration, format, &duration);
1013           } else {
1014             /* The subclass did not configure a duration, we assume that the
1015              * media has an unknown duration then and we return TRUE to report
1016              * this. Note that this is not the same as returning FALSE, which
1017              * means that we cannot report the duration at all. */
1018             res = TRUE;
1019           }
1020           gst_query_set_duration (query, format, duration);
1021           break;
1022         }
1023       }
1024       break;
1025     }
1026
1027     case GST_QUERY_SEEKING:
1028     {
1029       GstFormat format, seg_format;
1030       gint64 duration;
1031
1032       GST_OBJECT_LOCK (src);
1033       duration = src->segment.duration;
1034       seg_format = src->segment.format;
1035       GST_OBJECT_UNLOCK (src);
1036
1037       gst_query_parse_seeking (query, &format, NULL, NULL, NULL);
1038       if (format == seg_format) {
1039         gst_query_set_seeking (query, seg_format,
1040             gst_base_src_seekable (src), 0, duration);
1041         res = TRUE;
1042       } else {
1043         /* FIXME 0.11: return TRUE + seekable=FALSE for SEEKING query here */
1044         /* Don't reply to the query to make up for demuxers which don't
1045          * handle the SEEKING query yet. Players like Totem will fall back
1046          * to the duration when the SEEKING query isn't answered. */
1047         res = FALSE;
1048       }
1049       break;
1050     }
1051     case GST_QUERY_SEGMENT:
1052     {
1053       gint64 start, stop;
1054
1055       GST_OBJECT_LOCK (src);
1056       /* no end segment configured, current duration then */
1057       if ((stop = src->segment.stop) == -1)
1058         stop = src->segment.duration;
1059       start = src->segment.start;
1060
1061       /* adjust to stream time */
1062       if (src->segment.time != -1) {
1063         start -= src->segment.time;
1064         if (stop != -1)
1065           stop -= src->segment.time;
1066       }
1067
1068       gst_query_set_segment (query, src->segment.rate, src->segment.format,
1069           start, stop);
1070       GST_OBJECT_UNLOCK (src);
1071       res = TRUE;
1072       break;
1073     }
1074
1075     case GST_QUERY_FORMATS:
1076     {
1077       gst_query_set_formats (query, 3, GST_FORMAT_DEFAULT,
1078           GST_FORMAT_BYTES, GST_FORMAT_PERCENT);
1079       res = TRUE;
1080       break;
1081     }
1082     case GST_QUERY_CONVERT:
1083     {
1084       GstFormat src_fmt, dest_fmt;
1085       gint64 src_val, dest_val;
1086
1087       gst_query_parse_convert (query, &src_fmt, &src_val, &dest_fmt, &dest_val);
1088
1089       /* we can only convert between equal formats... */
1090       if (src_fmt == dest_fmt) {
1091         dest_val = src_val;
1092         res = TRUE;
1093       } else
1094         res = FALSE;
1095
1096       gst_query_set_convert (query, src_fmt, src_val, dest_fmt, dest_val);
1097       break;
1098     }
1099     case GST_QUERY_LATENCY:
1100     {
1101       GstClockTime min, max;
1102       gboolean live;
1103
1104       /* Subclasses should override and implement something useful */
1105       res = gst_base_src_query_latency (src, &live, &min, &max);
1106
1107       GST_LOG_OBJECT (src, "report latency: live %d, min %" GST_TIME_FORMAT
1108           ", max %" GST_TIME_FORMAT, live, GST_TIME_ARGS (min),
1109           GST_TIME_ARGS (max));
1110
1111       gst_query_set_latency (query, live, min, max);
1112       break;
1113     }
1114     case GST_QUERY_JITTER:
1115     case GST_QUERY_RATE:
1116       res = FALSE;
1117       break;
1118     case GST_QUERY_BUFFERING:
1119     {
1120       GstFormat format, seg_format;
1121       gint64 start, stop, estimated;
1122
1123       gst_query_parse_buffering_range (query, &format, NULL, NULL, NULL);
1124
1125       GST_DEBUG_OBJECT (src, "buffering query in format %s",
1126           gst_format_get_name (format));
1127
1128       GST_OBJECT_LOCK (src);
1129       if (src->random_access) {
1130         estimated = 0;
1131         start = 0;
1132         if (format == GST_FORMAT_PERCENT)
1133           stop = GST_FORMAT_PERCENT_MAX;
1134         else
1135           stop = src->segment.duration;
1136       } else {
1137         estimated = -1;
1138         start = -1;
1139         stop = -1;
1140       }
1141       seg_format = src->segment.format;
1142       GST_OBJECT_UNLOCK (src);
1143
1144       /* convert to required format. When the conversion fails, we can't answer
1145        * the query. When the value is unknown, we can don't perform conversion
1146        * but report TRUE. */
1147       if (format != GST_FORMAT_PERCENT && stop != -1) {
1148         res = gst_pad_query_convert (src->srcpad, seg_format,
1149             stop, format, &stop);
1150       } else {
1151         res = TRUE;
1152       }
1153       if (res && format != GST_FORMAT_PERCENT && start != -1)
1154         res = gst_pad_query_convert (src->srcpad, seg_format,
1155             start, format, &start);
1156
1157       gst_query_set_buffering_range (query, format, start, stop, estimated);
1158       break;
1159     }
1160     case GST_QUERY_SCHEDULING:
1161     {
1162       gboolean random_access;
1163
1164       random_access = gst_base_src_is_random_access (src);
1165
1166       /* we can operate in getrange mode if the native format is bytes
1167        * and we are seekable, this condition is set in the random_access
1168        * flag and is set in the _start() method. */
1169       gst_query_set_scheduling (query, GST_SCHEDULING_FLAG_SEEKABLE, 1, -1, 0);
1170       if (random_access)
1171         gst_query_add_scheduling_mode (query, GST_PAD_MODE_PULL);
1172       gst_query_add_scheduling_mode (query, GST_PAD_MODE_PUSH);
1173
1174       res = TRUE;
1175       break;
1176     }
1177     case GST_QUERY_CAPS:
1178     {
1179       GstBaseSrcClass *bclass;
1180       GstCaps *caps, *filter;
1181
1182       bclass = GST_BASE_SRC_GET_CLASS (src);
1183       if (bclass->get_caps) {
1184         gst_query_parse_caps (query, &filter);
1185         if ((caps = bclass->get_caps (src, filter))) {
1186           gst_query_set_caps_result (query, caps);
1187           gst_caps_unref (caps);
1188           res = TRUE;
1189         } else {
1190           res = FALSE;
1191         }
1192       } else
1193         res = FALSE;
1194       break;
1195     }
1196     default:
1197       res = FALSE;
1198       break;
1199   }
1200   GST_DEBUG_OBJECT (src, "query %s returns %d", GST_QUERY_TYPE_NAME (query),
1201       res);
1202
1203   return res;
1204 }
1205
1206 static gboolean
1207 gst_base_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
1208 {
1209   GstBaseSrc *src;
1210   GstBaseSrcClass *bclass;
1211   gboolean result = FALSE;
1212
1213   src = GST_BASE_SRC (parent);
1214   bclass = GST_BASE_SRC_GET_CLASS (src);
1215
1216   if (bclass->query)
1217     result = bclass->query (src, query);
1218
1219   return result;
1220 }
1221
1222 static gboolean
1223 gst_base_src_default_do_seek (GstBaseSrc * src, GstSegment * segment)
1224 {
1225   gboolean res = TRUE;
1226
1227   /* update our offset if the start/stop position was updated */
1228   if (segment->format == GST_FORMAT_BYTES) {
1229     segment->time = segment->start;
1230   } else if (segment->start == 0) {
1231     /* seek to start, we can implement a default for this. */
1232     segment->time = 0;
1233   } else {
1234     res = FALSE;
1235     GST_INFO_OBJECT (src, "Can't do a default seek");
1236   }
1237
1238   return res;
1239 }
1240
1241 static gboolean
1242 gst_base_src_do_seek (GstBaseSrc * src, GstSegment * segment)
1243 {
1244   GstBaseSrcClass *bclass;
1245   gboolean result = FALSE;
1246
1247   bclass = GST_BASE_SRC_GET_CLASS (src);
1248
1249   if (bclass->do_seek)
1250     result = bclass->do_seek (src, segment);
1251
1252   return result;
1253 }
1254
1255 #define SEEK_TYPE_IS_RELATIVE(t) (((t) != GST_SEEK_TYPE_NONE) && ((t) != GST_SEEK_TYPE_SET))
1256
1257 static gboolean
1258 gst_base_src_default_prepare_seek_segment (GstBaseSrc * src, GstEvent * event,
1259     GstSegment * segment)
1260 {
1261   /* By default, we try one of 2 things:
1262    *   - For absolute seek positions, convert the requested position to our
1263    *     configured processing format and place it in the output segment \
1264    *   - For relative seek positions, convert our current (input) values to the
1265    *     seek format, adjust by the relative seek offset and then convert back to
1266    *     the processing format
1267    */
1268   GstSeekType cur_type, stop_type;
1269   gint64 cur, stop;
1270   GstSeekFlags flags;
1271   GstFormat seek_format, dest_format;
1272   gdouble rate;
1273   gboolean update;
1274   gboolean res = TRUE;
1275
1276   gst_event_parse_seek (event, &rate, &seek_format, &flags,
1277       &cur_type, &cur, &stop_type, &stop);
1278   dest_format = segment->format;
1279
1280   if (seek_format == dest_format) {
1281     gst_segment_do_seek (segment, rate, seek_format, flags,
1282         cur_type, cur, stop_type, stop, &update);
1283     return TRUE;
1284   }
1285
1286   if (cur_type != GST_SEEK_TYPE_NONE) {
1287     /* FIXME: Handle seek_cur & seek_end by converting the input segment vals */
1288     res =
1289         gst_pad_query_convert (src->srcpad, seek_format, cur, dest_format,
1290         &cur);
1291     cur_type = GST_SEEK_TYPE_SET;
1292   }
1293
1294   if (res && stop_type != GST_SEEK_TYPE_NONE) {
1295     /* FIXME: Handle seek_cur & seek_end by converting the input segment vals */
1296     res =
1297         gst_pad_query_convert (src->srcpad, seek_format, stop, dest_format,
1298         &stop);
1299     stop_type = GST_SEEK_TYPE_SET;
1300   }
1301
1302   /* And finally, configure our output segment in the desired format */
1303   gst_segment_do_seek (segment, rate, dest_format, flags, cur_type, cur,
1304       stop_type, stop, &update);
1305
1306   if (!res)
1307     goto no_format;
1308
1309   return res;
1310
1311 no_format:
1312   {
1313     GST_DEBUG_OBJECT (src, "undefined format given, seek aborted.");
1314     return FALSE;
1315   }
1316 }
1317
1318 static gboolean
1319 gst_base_src_prepare_seek_segment (GstBaseSrc * src, GstEvent * event,
1320     GstSegment * seeksegment)
1321 {
1322   GstBaseSrcClass *bclass;
1323   gboolean result = FALSE;
1324
1325   bclass = GST_BASE_SRC_GET_CLASS (src);
1326
1327   if (bclass->prepare_seek_segment)
1328     result = bclass->prepare_seek_segment (src, event, seeksegment);
1329
1330   return result;
1331 }
1332
1333 static GstFlowReturn
1334 gst_base_src_default_alloc (GstBaseSrc * src, guint64 offset,
1335     guint size, GstBuffer ** buffer)
1336 {
1337   GstFlowReturn ret;
1338   GstBaseSrcPrivate *priv = src->priv;
1339
1340   if (priv->pool) {
1341     ret = gst_buffer_pool_acquire_buffer (priv->pool, buffer, NULL);
1342   } else if (size != -1) {
1343     *buffer = gst_buffer_new_allocate (priv->allocator, size, &priv->params);
1344     if (G_UNLIKELY (*buffer == NULL))
1345       goto alloc_failed;
1346
1347     ret = GST_FLOW_OK;
1348   } else {
1349     GST_WARNING_OBJECT (src, "Not trying to alloc %u bytes. Blocksize not set?",
1350         size);
1351     goto alloc_failed;
1352   }
1353   return ret;
1354
1355   /* ERRORS */
1356 alloc_failed:
1357   {
1358     GST_ERROR_OBJECT (src, "Failed to allocate %u bytes", size);
1359     return GST_FLOW_ERROR;
1360   }
1361 }
1362
1363 static GstFlowReturn
1364 gst_base_src_default_create (GstBaseSrc * src, guint64 offset,
1365     guint size, GstBuffer ** buffer)
1366 {
1367   GstBaseSrcClass *bclass;
1368   GstFlowReturn ret;
1369   GstBuffer *res_buf;
1370
1371   bclass = GST_BASE_SRC_GET_CLASS (src);
1372
1373   if (G_UNLIKELY (!bclass->alloc))
1374     goto no_function;
1375   if (G_UNLIKELY (!bclass->fill))
1376     goto no_function;
1377
1378   if (*buffer == NULL) {
1379     /* downstream did not provide us with a buffer to fill, allocate one
1380      * ourselves */
1381     ret = bclass->alloc (src, offset, size, &res_buf);
1382     if (G_UNLIKELY (ret != GST_FLOW_OK))
1383       goto alloc_failed;
1384   } else {
1385     res_buf = *buffer;
1386   }
1387
1388   if (G_LIKELY (size > 0)) {
1389     /* only call fill when there is a size */
1390     ret = bclass->fill (src, offset, size, res_buf);
1391     if (G_UNLIKELY (ret != GST_FLOW_OK))
1392       goto not_ok;
1393   }
1394
1395   *buffer = res_buf;
1396
1397   return GST_FLOW_OK;
1398
1399   /* ERRORS */
1400 no_function:
1401   {
1402     GST_DEBUG_OBJECT (src, "no fill or alloc function");
1403     return GST_FLOW_NOT_SUPPORTED;
1404   }
1405 alloc_failed:
1406   {
1407     GST_DEBUG_OBJECT (src, "Failed to allocate buffer of %u bytes", size);
1408     return ret;
1409   }
1410 not_ok:
1411   {
1412     GST_DEBUG_OBJECT (src, "fill returned %d (%s)", ret,
1413         gst_flow_get_name (ret));
1414     if (*buffer == NULL)
1415       gst_buffer_unref (res_buf);
1416     return ret;
1417   }
1418 }
1419
1420 /* this code implements the seeking. It is a good example
1421  * handling all cases.
1422  *
1423  * A seek updates the currently configured segment.start
1424  * and segment.stop values based on the SEEK_TYPE. If the
1425  * segment.start value is updated, a seek to this new position
1426  * should be performed.
1427  *
1428  * The seek can only be executed when we are not currently
1429  * streaming any data, to make sure that this is the case, we
1430  * acquire the STREAM_LOCK which is taken when we are in the
1431  * _loop() function or when a getrange() is called. Normally
1432  * we will not receive a seek if we are operating in pull mode
1433  * though. When we operate as a live source we might block on the live
1434  * cond, which does not release the STREAM_LOCK. Therefore we will try
1435  * to grab the LIVE_LOCK instead of the STREAM_LOCK to make sure it is
1436  * safe to perform the seek.
1437  *
1438  * When we are in the loop() function, we might be in the middle
1439  * of pushing a buffer, which might block in a sink. To make sure
1440  * that the push gets unblocked we push out a FLUSH_START event.
1441  * Our loop function will get a FLUSHING return value from
1442  * the push and will pause, effectively releasing the STREAM_LOCK.
1443  *
1444  * For a non-flushing seek, we pause the task, which might eventually
1445  * release the STREAM_LOCK. We say eventually because when the sink
1446  * blocks on the sample we might wait a very long time until the sink
1447  * unblocks the sample. In any case we acquire the STREAM_LOCK and
1448  * can continue the seek. A non-flushing seek is normally done in a
1449  * running pipeline to perform seamless playback, this means that the sink is
1450  * PLAYING and will return from its chain function.
1451  * In the case of a non-flushing seek we need to make sure that the
1452  * data we output after the seek is continuous with the previous data,
1453  * this is because a non-flushing seek does not reset the running-time
1454  * to 0. We do this by closing the currently running segment, ie. sending
1455  * a new_segment event with the stop position set to the last processed
1456  * position.
1457  *
1458  * After updating the segment.start/stop values, we prepare for
1459  * streaming again. We push out a FLUSH_STOP to make the peer pad
1460  * accept data again and we start our task again.
1461  *
1462  * A segment seek posts a message on the bus saying that the playback
1463  * of the segment started. We store the segment flag internally because
1464  * when we reach the segment.stop we have to post a segment.done
1465  * instead of EOS when doing a segment seek.
1466  */
1467 static gboolean
1468 gst_base_src_perform_seek (GstBaseSrc * src, GstEvent * event, gboolean unlock)
1469 {
1470   gboolean res = TRUE, tres;
1471   gdouble rate;
1472   GstFormat seek_format, dest_format;
1473   GstSeekFlags flags;
1474   GstSeekType cur_type, stop_type;
1475   gint64 cur, stop;
1476   gboolean flush, playing;
1477   gboolean update;
1478   gboolean relative_seek = FALSE;
1479   gboolean seekseg_configured = FALSE;
1480   GstSegment seeksegment;
1481   guint32 seqnum;
1482   GstEvent *tevent;
1483
1484   GST_DEBUG_OBJECT (src, "doing seek: %" GST_PTR_FORMAT, event);
1485
1486   GST_OBJECT_LOCK (src);
1487   dest_format = src->segment.format;
1488   GST_OBJECT_UNLOCK (src);
1489
1490   if (event) {
1491     gst_event_parse_seek (event, &rate, &seek_format, &flags,
1492         &cur_type, &cur, &stop_type, &stop);
1493
1494     relative_seek = SEEK_TYPE_IS_RELATIVE (cur_type) ||
1495         SEEK_TYPE_IS_RELATIVE (stop_type);
1496
1497     if (dest_format != seek_format && !relative_seek) {
1498       /* If we have an ABSOLUTE position (SEEK_SET only), we can convert it
1499        * here before taking the stream lock, otherwise we must convert it later,
1500        * once we have the stream lock and can read the last configures segment
1501        * start and stop positions */
1502       gst_segment_init (&seeksegment, dest_format);
1503
1504       if (!gst_base_src_prepare_seek_segment (src, event, &seeksegment))
1505         goto prepare_failed;
1506
1507       seekseg_configured = TRUE;
1508     }
1509
1510     flush = flags & GST_SEEK_FLAG_FLUSH;
1511     seqnum = gst_event_get_seqnum (event);
1512   } else {
1513     flush = FALSE;
1514     /* get next seqnum */
1515     seqnum = gst_util_seqnum_next ();
1516   }
1517
1518   /* send flush start */
1519   if (flush) {
1520     tevent = gst_event_new_flush_start ();
1521     gst_event_set_seqnum (tevent, seqnum);
1522     gst_pad_push_event (src->srcpad, tevent);
1523   } else
1524     gst_pad_pause_task (src->srcpad);
1525
1526   /* unblock streaming thread. */
1527   if (unlock)
1528     gst_base_src_set_flushing (src, TRUE, FALSE, &playing);
1529
1530   /* grab streaming lock, this should eventually be possible, either
1531    * because the task is paused, our streaming thread stopped
1532    * or because our peer is flushing. */
1533   GST_PAD_STREAM_LOCK (src->srcpad);
1534   if (G_UNLIKELY (src->priv->seqnum == seqnum)) {
1535     /* we have seen this event before, issue a warning for now */
1536     GST_WARNING_OBJECT (src, "duplicate event found %" G_GUINT32_FORMAT,
1537         seqnum);
1538   } else {
1539     src->priv->seqnum = seqnum;
1540     GST_DEBUG_OBJECT (src, "seek with seqnum %" G_GUINT32_FORMAT, seqnum);
1541   }
1542
1543   if (unlock)
1544     gst_base_src_set_flushing (src, FALSE, playing, NULL);
1545
1546   /* If we configured the seeksegment above, don't overwrite it now. Otherwise
1547    * copy the current segment info into the temp segment that we can actually
1548    * attempt the seek with. We only update the real segment if the seek succeeds. */
1549   if (!seekseg_configured) {
1550     memcpy (&seeksegment, &src->segment, sizeof (GstSegment));
1551
1552     /* now configure the final seek segment */
1553     if (event) {
1554       if (seeksegment.format != seek_format) {
1555         /* OK, here's where we give the subclass a chance to convert the relative
1556          * seek into an absolute one in the processing format. We set up any
1557          * absolute seek above, before taking the stream lock. */
1558         if (!gst_base_src_prepare_seek_segment (src, event, &seeksegment)) {
1559           GST_DEBUG_OBJECT (src, "Preparing the seek failed after flushing. "
1560               "Aborting seek");
1561           res = FALSE;
1562         }
1563       } else {
1564         /* The seek format matches our processing format, no need to ask the
1565          * the subclass to configure the segment. */
1566         gst_segment_do_seek (&seeksegment, rate, seek_format, flags,
1567             cur_type, cur, stop_type, stop, &update);
1568       }
1569     }
1570     /* Else, no seek event passed, so we're just (re)starting the
1571        current segment. */
1572   }
1573
1574   if (res) {
1575     GST_DEBUG_OBJECT (src, "segment configured from %" G_GINT64_FORMAT
1576         " to %" G_GINT64_FORMAT ", position %" G_GINT64_FORMAT,
1577         seeksegment.start, seeksegment.stop, seeksegment.position);
1578
1579     /* do the seek, segment.position contains the new position. */
1580     res = gst_base_src_do_seek (src, &seeksegment);
1581   }
1582
1583   /* and prepare to continue streaming */
1584   if (flush) {
1585     tevent = gst_event_new_flush_stop (TRUE);
1586     gst_event_set_seqnum (tevent, seqnum);
1587     /* send flush stop, peer will accept data and events again. We
1588      * are not yet providing data as we still have the STREAM_LOCK. */
1589     gst_pad_push_event (src->srcpad, tevent);
1590   }
1591
1592   /* The subclass must have converted the segment to the processing format
1593    * by now */
1594   if (res && seeksegment.format != dest_format) {
1595     GST_DEBUG_OBJECT (src, "Subclass failed to prepare a seek segment "
1596         "in the correct format. Aborting seek.");
1597     res = FALSE;
1598   }
1599
1600   /* if the seek was successful, we update our real segment and push
1601    * out the new segment. */
1602   if (res) {
1603     GST_OBJECT_LOCK (src);
1604     memcpy (&src->segment, &seeksegment, sizeof (GstSegment));
1605     GST_OBJECT_UNLOCK (src);
1606
1607     if (seeksegment.flags & GST_SEGMENT_FLAG_SEGMENT) {
1608       GstMessage *message;
1609
1610       message = gst_message_new_segment_start (GST_OBJECT (src),
1611           seeksegment.format, seeksegment.position);
1612       gst_message_set_seqnum (message, seqnum);
1613
1614       gst_element_post_message (GST_ELEMENT (src), message);
1615     }
1616
1617     /* for deriving a stop position for the playback segment from the seek
1618      * segment, we must take the duration when the stop is not set */
1619     if ((stop = seeksegment.stop) == -1)
1620       stop = seeksegment.duration;
1621
1622     src->priv->segment_pending = TRUE;
1623   }
1624
1625   src->priv->discont = TRUE;
1626   src->running = TRUE;
1627   /* and restart the task in case it got paused explicitly or by
1628    * the FLUSH_START event we pushed out. */
1629   tres = gst_pad_start_task (src->srcpad, (GstTaskFunction) gst_base_src_loop,
1630       src->srcpad, NULL);
1631   if (res && !tres)
1632     res = FALSE;
1633
1634   /* and release the lock again so we can continue streaming */
1635   GST_PAD_STREAM_UNLOCK (src->srcpad);
1636
1637   return res;
1638
1639   /* ERROR */
1640 prepare_failed:
1641   GST_DEBUG_OBJECT (src, "Preparing the seek failed before flushing. "
1642       "Aborting seek");
1643   return FALSE;
1644 }
1645
1646 /* all events send to this element directly. This is mainly done from the
1647  * application.
1648  */
1649 static gboolean
1650 gst_base_src_send_event (GstElement * element, GstEvent * event)
1651 {
1652   GstBaseSrc *src;
1653   gboolean result = FALSE;
1654
1655   src = GST_BASE_SRC (element);
1656
1657   GST_DEBUG_OBJECT (src, "handling event %p %" GST_PTR_FORMAT, event, event);
1658
1659   switch (GST_EVENT_TYPE (event)) {
1660       /* bidirectional events */
1661     case GST_EVENT_FLUSH_START:
1662       GST_DEBUG_OBJECT (src, "pushing flush-start event downstream");
1663       result = gst_pad_push_event (src->srcpad, event);
1664       event = NULL;
1665       break;
1666     case GST_EVENT_FLUSH_STOP:
1667       GST_LIVE_LOCK (src->srcpad);
1668       src->priv->segment_pending = TRUE;
1669       /* sending random flushes downstream can break stuff,
1670        * especially sync since all segment info will get flushed */
1671       GST_DEBUG_OBJECT (src, "pushing flush-stop event downstream");
1672       result = gst_pad_push_event (src->srcpad, event);
1673       GST_LIVE_UNLOCK (src->srcpad);
1674       event = NULL;
1675       break;
1676
1677       /* downstream serialized events */
1678     case GST_EVENT_EOS:
1679     {
1680       GstBaseSrcClass *bclass;
1681
1682       bclass = GST_BASE_SRC_GET_CLASS (src);
1683
1684       /* queue EOS and make sure the task or pull function performs the EOS
1685        * actions.
1686        *
1687        * We have two possibilities:
1688        *
1689        *  - Before we are to enter the _create function, we check the pending_eos
1690        *    first and do EOS instead of entering it.
1691        *  - If we are in the _create function or we did not manage to set the
1692        *    flag fast enough and we are about to enter the _create function,
1693        *    we unlock it so that we exit with FLUSHING immediately. We then
1694        *    check the EOS flag and do the EOS logic.
1695        */
1696       g_atomic_int_set (&src->priv->pending_eos, TRUE);
1697       GST_DEBUG_OBJECT (src, "EOS marked, calling unlock");
1698
1699
1700       /* unlock the _create function so that we can check the pending_eos flag
1701        * and we can do EOS. This will eventually release the LIVE_LOCK again so
1702        * that we can grab it and stop the unlock again. We don't take the stream
1703        * lock so that this operation is guaranteed to never block. */
1704       gst_base_src_activate_pool (src, FALSE);
1705       if (bclass->unlock)
1706         bclass->unlock (src);
1707
1708       GST_DEBUG_OBJECT (src, "unlock called, waiting for LIVE_LOCK");
1709
1710       GST_LIVE_LOCK (src);
1711       GST_DEBUG_OBJECT (src, "LIVE_LOCK acquired, calling unlock_stop");
1712       /* now stop the unlock of the streaming thread again. Grabbing the live
1713        * lock is enough because that protects the create function. */
1714       if (bclass->unlock_stop)
1715         bclass->unlock_stop (src);
1716       gst_base_src_activate_pool (src, TRUE);
1717       GST_LIVE_UNLOCK (src);
1718
1719       result = TRUE;
1720       break;
1721     }
1722     case GST_EVENT_SEGMENT:
1723       /* sending random SEGMENT downstream can break sync. */
1724       break;
1725     case GST_EVENT_TAG:
1726     case GST_EVENT_CUSTOM_DOWNSTREAM:
1727     case GST_EVENT_CUSTOM_BOTH:
1728       /* Insert TAG, CUSTOM_DOWNSTREAM, CUSTOM_BOTH in the dataflow */
1729       GST_OBJECT_LOCK (src);
1730       src->priv->pending_events =
1731           g_list_append (src->priv->pending_events, event);
1732       g_atomic_int_set (&src->priv->have_events, TRUE);
1733       GST_OBJECT_UNLOCK (src);
1734       event = NULL;
1735       result = TRUE;
1736       break;
1737     case GST_EVENT_BUFFERSIZE:
1738       /* does not seem to make much sense currently */
1739       break;
1740
1741       /* upstream events */
1742     case GST_EVENT_QOS:
1743       /* elements should override send_event and do something */
1744       break;
1745     case GST_EVENT_SEEK:
1746     {
1747       gboolean started;
1748
1749       GST_OBJECT_LOCK (src->srcpad);
1750       if (GST_PAD_MODE (src->srcpad) == GST_PAD_MODE_PULL)
1751         goto wrong_mode;
1752       started = GST_PAD_MODE (src->srcpad) == GST_PAD_MODE_PUSH;
1753       GST_OBJECT_UNLOCK (src->srcpad);
1754
1755       if (started) {
1756         GST_DEBUG_OBJECT (src, "performing seek");
1757         /* when we are running in push mode, we can execute the
1758          * seek right now. */
1759         result = gst_base_src_perform_seek (src, event, TRUE);
1760       } else {
1761         GstEvent **event_p;
1762
1763         /* else we store the event and execute the seek when we
1764          * get activated */
1765         GST_OBJECT_LOCK (src);
1766         GST_DEBUG_OBJECT (src, "queueing seek");
1767         event_p = &src->pending_seek;
1768         gst_event_replace ((GstEvent **) event_p, event);
1769         GST_OBJECT_UNLOCK (src);
1770         /* assume the seek will work */
1771         result = TRUE;
1772       }
1773       break;
1774     }
1775     case GST_EVENT_NAVIGATION:
1776       /* could make sense for elements that do something with navigation events
1777        * but then they would need to override the send_event function */
1778       break;
1779     case GST_EVENT_LATENCY:
1780       /* does not seem to make sense currently */
1781       break;
1782
1783       /* custom events */
1784     case GST_EVENT_CUSTOM_UPSTREAM:
1785       /* override send_event if you want this */
1786       break;
1787     case GST_EVENT_CUSTOM_DOWNSTREAM_OOB:
1788     case GST_EVENT_CUSTOM_BOTH_OOB:
1789       /* insert a random custom event into the pipeline */
1790       GST_DEBUG_OBJECT (src, "pushing custom OOB event downstream");
1791       result = gst_pad_push_event (src->srcpad, event);
1792       /* we gave away the ref to the event in the push */
1793       event = NULL;
1794       break;
1795     default:
1796       break;
1797   }
1798 done:
1799   /* if we still have a ref to the event, unref it now */
1800   if (event)
1801     gst_event_unref (event);
1802
1803   return result;
1804
1805   /* ERRORS */
1806 wrong_mode:
1807   {
1808     GST_DEBUG_OBJECT (src, "cannot perform seek when operating in pull mode");
1809     GST_OBJECT_UNLOCK (src->srcpad);
1810     result = FALSE;
1811     goto done;
1812   }
1813 }
1814
1815 static gboolean
1816 gst_base_src_seekable (GstBaseSrc * src)
1817 {
1818   GstBaseSrcClass *bclass;
1819   bclass = GST_BASE_SRC_GET_CLASS (src);
1820   if (bclass->is_seekable)
1821     return bclass->is_seekable (src);
1822   else
1823     return FALSE;
1824 }
1825
1826 static void
1827 gst_base_src_update_qos (GstBaseSrc * src,
1828     gdouble proportion, GstClockTimeDiff diff, GstClockTime timestamp)
1829 {
1830   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, src,
1831       "qos: proportion: %lf, diff %" G_GINT64_FORMAT ", timestamp %"
1832       GST_TIME_FORMAT, proportion, diff, GST_TIME_ARGS (timestamp));
1833
1834   GST_OBJECT_LOCK (src);
1835   src->priv->proportion = proportion;
1836   src->priv->earliest_time = timestamp + diff;
1837   GST_OBJECT_UNLOCK (src);
1838 }
1839
1840
1841 static gboolean
1842 gst_base_src_default_event (GstBaseSrc * src, GstEvent * event)
1843 {
1844   gboolean result;
1845
1846   GST_DEBUG_OBJECT (src, "handle event %" GST_PTR_FORMAT, event);
1847
1848   switch (GST_EVENT_TYPE (event)) {
1849     case GST_EVENT_SEEK:
1850       /* is normally called when in push mode */
1851       if (!gst_base_src_seekable (src))
1852         goto not_seekable;
1853
1854       result = gst_base_src_perform_seek (src, event, TRUE);
1855       break;
1856     case GST_EVENT_FLUSH_START:
1857       /* cancel any blocking getrange, is normally called
1858        * when in pull mode. */
1859       result = gst_base_src_set_flushing (src, TRUE, FALSE, NULL);
1860       break;
1861     case GST_EVENT_FLUSH_STOP:
1862       result = gst_base_src_set_flushing (src, FALSE, TRUE, NULL);
1863       break;
1864     case GST_EVENT_QOS:
1865     {
1866       gdouble proportion;
1867       GstClockTimeDiff diff;
1868       GstClockTime timestamp;
1869
1870       gst_event_parse_qos (event, NULL, &proportion, &diff, &timestamp);
1871       gst_base_src_update_qos (src, proportion, diff, timestamp);
1872       result = TRUE;
1873       break;
1874     }
1875     case GST_EVENT_RECONFIGURE:
1876       result = TRUE;
1877       break;
1878     case GST_EVENT_LATENCY:
1879       result = TRUE;
1880       break;
1881     default:
1882       result = FALSE;
1883       break;
1884   }
1885   return result;
1886
1887   /* ERRORS */
1888 not_seekable:
1889   {
1890     GST_DEBUG_OBJECT (src, "is not seekable");
1891     return FALSE;
1892   }
1893 }
1894
1895 static gboolean
1896 gst_base_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
1897 {
1898   GstBaseSrc *src;
1899   GstBaseSrcClass *bclass;
1900   gboolean result = FALSE;
1901
1902   src = GST_BASE_SRC (parent);
1903   bclass = GST_BASE_SRC_GET_CLASS (src);
1904
1905   if (bclass->event) {
1906     if (!(result = bclass->event (src, event)))
1907       goto subclass_failed;
1908   }
1909
1910 done:
1911   gst_event_unref (event);
1912
1913   return result;
1914
1915   /* ERRORS */
1916 subclass_failed:
1917   {
1918     GST_DEBUG_OBJECT (src, "subclass refused event");
1919     goto done;
1920   }
1921 }
1922
1923 static void
1924 gst_base_src_set_property (GObject * object, guint prop_id,
1925     const GValue * value, GParamSpec * pspec)
1926 {
1927   GstBaseSrc *src;
1928
1929   src = GST_BASE_SRC (object);
1930
1931   switch (prop_id) {
1932     case PROP_BLOCKSIZE:
1933       gst_base_src_set_blocksize (src, g_value_get_uint (value));
1934       break;
1935     case PROP_NUM_BUFFERS:
1936       src->num_buffers = g_value_get_int (value);
1937       break;
1938     case PROP_TYPEFIND:
1939       src->typefind = g_value_get_boolean (value);
1940       break;
1941     case PROP_DO_TIMESTAMP:
1942       gst_base_src_set_do_timestamp (src, g_value_get_boolean (value));
1943       break;
1944     default:
1945       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1946       break;
1947   }
1948 }
1949
1950 static void
1951 gst_base_src_get_property (GObject * object, guint prop_id, GValue * value,
1952     GParamSpec * pspec)
1953 {
1954   GstBaseSrc *src;
1955
1956   src = GST_BASE_SRC (object);
1957
1958   switch (prop_id) {
1959     case PROP_BLOCKSIZE:
1960       g_value_set_uint (value, gst_base_src_get_blocksize (src));
1961       break;
1962     case PROP_NUM_BUFFERS:
1963       g_value_set_int (value, src->num_buffers);
1964       break;
1965     case PROP_TYPEFIND:
1966       g_value_set_boolean (value, src->typefind);
1967       break;
1968     case PROP_DO_TIMESTAMP:
1969       g_value_set_boolean (value, gst_base_src_get_do_timestamp (src));
1970       break;
1971     default:
1972       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1973       break;
1974   }
1975 }
1976
1977 /* with STREAM_LOCK and LOCK */
1978 static GstClockReturn
1979 gst_base_src_wait (GstBaseSrc * basesrc, GstClock * clock, GstClockTime time)
1980 {
1981   GstClockReturn ret;
1982   GstClockID id;
1983
1984   id = gst_clock_new_single_shot_id (clock, time);
1985
1986   basesrc->clock_id = id;
1987   /* release the live lock while waiting */
1988   GST_LIVE_UNLOCK (basesrc);
1989
1990   ret = gst_clock_id_wait (id, NULL);
1991
1992   GST_LIVE_LOCK (basesrc);
1993   gst_clock_id_unref (id);
1994   basesrc->clock_id = NULL;
1995
1996   return ret;
1997 }
1998
1999 /* perform synchronisation on a buffer.
2000  * with STREAM_LOCK.
2001  */
2002 static GstClockReturn
2003 gst_base_src_do_sync (GstBaseSrc * basesrc, GstBuffer * buffer)
2004 {
2005   GstClockReturn result;
2006   GstClockTime start, end;
2007   GstBaseSrcClass *bclass;
2008   GstClockTime base_time;
2009   GstClock *clock;
2010   GstClockTime now = GST_CLOCK_TIME_NONE, pts, dts, timestamp;
2011   gboolean do_timestamp, first, pseudo_live, is_live;
2012
2013   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2014
2015   start = end = -1;
2016   if (bclass->get_times)
2017     bclass->get_times (basesrc, buffer, &start, &end);
2018
2019   /* get buffer timestamp */
2020   dts = GST_BUFFER_DTS (buffer);
2021   pts = GST_BUFFER_PTS (buffer);
2022
2023   if (GST_CLOCK_TIME_IS_VALID (dts))
2024     timestamp = dts;
2025   else
2026     timestamp = pts;
2027
2028   /* grab the lock to prepare for clocking and calculate the startup
2029    * latency. */
2030   GST_OBJECT_LOCK (basesrc);
2031
2032   is_live = basesrc->is_live;
2033   /* if we are asked to sync against the clock we are a pseudo live element */
2034   pseudo_live = (start != -1 && is_live);
2035   /* check for the first buffer */
2036   first = (basesrc->priv->latency == -1);
2037
2038   if (timestamp != -1 && pseudo_live) {
2039     GstClockTime latency;
2040
2041     /* we have a timestamp and a sync time, latency is the diff */
2042     if (timestamp <= start)
2043       latency = start - timestamp;
2044     else
2045       latency = 0;
2046
2047     if (first) {
2048       GST_DEBUG_OBJECT (basesrc, "pseudo_live with latency %" GST_TIME_FORMAT,
2049           GST_TIME_ARGS (latency));
2050       /* first time we calculate latency, just configure */
2051       basesrc->priv->latency = latency;
2052     } else {
2053       if (basesrc->priv->latency != latency) {
2054         /* we have a new latency, FIXME post latency message */
2055         basesrc->priv->latency = latency;
2056         GST_DEBUG_OBJECT (basesrc, "latency changed to %" GST_TIME_FORMAT,
2057             GST_TIME_ARGS (latency));
2058       }
2059     }
2060   } else if (first) {
2061     GST_DEBUG_OBJECT (basesrc, "no latency needed, live %d, sync %d",
2062         is_live, start != -1);
2063     basesrc->priv->latency = 0;
2064   }
2065
2066   /* get clock, if no clock, we can't sync or do timestamps */
2067   if ((clock = GST_ELEMENT_CLOCK (basesrc)) == NULL)
2068     goto no_clock;
2069   else
2070     gst_object_ref (clock);
2071
2072   base_time = GST_ELEMENT_CAST (basesrc)->base_time;
2073
2074   do_timestamp = basesrc->priv->do_timestamp;
2075   GST_OBJECT_UNLOCK (basesrc);
2076
2077   /* first buffer, calculate the timestamp offset */
2078   if (first) {
2079     GstClockTime running_time;
2080
2081     now = gst_clock_get_time (clock);
2082     running_time = now - base_time;
2083
2084     GST_LOG_OBJECT (basesrc,
2085         "startup PTS: %" GST_TIME_FORMAT ", DTS %" GST_TIME_FORMAT
2086         ", running_time %" GST_TIME_FORMAT, GST_TIME_ARGS (pts),
2087         GST_TIME_ARGS (dts), GST_TIME_ARGS (running_time));
2088
2089     if (pseudo_live && timestamp != -1) {
2090       /* live source and we need to sync, add startup latency to all timestamps
2091        * to get the real running_time. Live sources should always timestamp
2092        * according to the current running time. */
2093       basesrc->priv->ts_offset = GST_CLOCK_DIFF (timestamp, running_time);
2094
2095       GST_LOG_OBJECT (basesrc, "live with sync, ts_offset %" GST_TIME_FORMAT,
2096           GST_TIME_ARGS (basesrc->priv->ts_offset));
2097     } else {
2098       basesrc->priv->ts_offset = 0;
2099       GST_LOG_OBJECT (basesrc, "no timestamp offset needed");
2100     }
2101
2102     if (!GST_CLOCK_TIME_IS_VALID (dts)) {
2103       if (do_timestamp) {
2104         dts = running_time;
2105       } else {
2106         dts = 0;
2107       }
2108       GST_BUFFER_DTS (buffer) = dts;
2109
2110       GST_LOG_OBJECT (basesrc, "created DTS %" GST_TIME_FORMAT,
2111           GST_TIME_ARGS (dts));
2112     }
2113   } else {
2114     /* not the first buffer, the timestamp is the diff between the clock and
2115      * base_time */
2116     if (do_timestamp && !GST_CLOCK_TIME_IS_VALID (dts)) {
2117       now = gst_clock_get_time (clock);
2118
2119       dts = now - base_time;
2120       GST_BUFFER_DTS (buffer) = dts;
2121
2122       GST_LOG_OBJECT (basesrc, "created DTS %" GST_TIME_FORMAT,
2123           GST_TIME_ARGS (dts));
2124     }
2125   }
2126   if (!GST_CLOCK_TIME_IS_VALID (pts)) {
2127     if (!GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DELTA_UNIT))
2128       pts = dts;
2129
2130     GST_BUFFER_PTS (buffer) = dts;
2131
2132     GST_LOG_OBJECT (basesrc, "created PTS %" GST_TIME_FORMAT,
2133         GST_TIME_ARGS (pts));
2134   }
2135
2136   /* if we don't have a buffer timestamp, we don't sync */
2137   if (!GST_CLOCK_TIME_IS_VALID (start))
2138     goto no_sync;
2139
2140   if (is_live) {
2141     /* for pseudo live sources, add our ts_offset to the timestamp */
2142     if (GST_CLOCK_TIME_IS_VALID (pts))
2143       GST_BUFFER_PTS (buffer) += basesrc->priv->ts_offset;
2144     if (GST_CLOCK_TIME_IS_VALID (dts))
2145       GST_BUFFER_DTS (buffer) += basesrc->priv->ts_offset;
2146     start += basesrc->priv->ts_offset;
2147   }
2148
2149   GST_LOG_OBJECT (basesrc,
2150       "waiting for clock, base time %" GST_TIME_FORMAT
2151       ", stream_start %" GST_TIME_FORMAT,
2152       GST_TIME_ARGS (base_time), GST_TIME_ARGS (start));
2153
2154   result = gst_base_src_wait (basesrc, clock, start + base_time);
2155
2156   gst_object_unref (clock);
2157
2158   GST_LOG_OBJECT (basesrc, "clock entry done: %d", result);
2159
2160   return result;
2161
2162   /* special cases */
2163 no_clock:
2164   {
2165     GST_DEBUG_OBJECT (basesrc, "we have no clock");
2166     GST_OBJECT_UNLOCK (basesrc);
2167     return GST_CLOCK_OK;
2168   }
2169 no_sync:
2170   {
2171     GST_DEBUG_OBJECT (basesrc, "no sync needed");
2172     gst_object_unref (clock);
2173     return GST_CLOCK_OK;
2174   }
2175 }
2176
2177 /* Called with STREAM_LOCK and LIVE_LOCK */
2178 static gboolean
2179 gst_base_src_update_length (GstBaseSrc * src, guint64 offset, guint * length)
2180 {
2181   guint64 size, maxsize;
2182   GstBaseSrcClass *bclass;
2183   GstFormat format;
2184   gint64 stop;
2185   gboolean dynamic;
2186
2187   bclass = GST_BASE_SRC_GET_CLASS (src);
2188
2189   format = src->segment.format;
2190   stop = src->segment.stop;
2191   /* get total file size */
2192   size = src->segment.duration;
2193
2194   /* only operate if we are working with bytes */
2195   if (format != GST_FORMAT_BYTES)
2196     return TRUE;
2197
2198   /* the max amount of bytes to read is the total size or
2199    * up to the segment.stop if present. */
2200   if (stop != -1)
2201     maxsize = MIN (size, stop);
2202   else
2203     maxsize = size;
2204
2205   GST_DEBUG_OBJECT (src,
2206       "reading offset %" G_GUINT64_FORMAT ", length %u, size %" G_GINT64_FORMAT
2207       ", segment.stop %" G_GINT64_FORMAT ", maxsize %" G_GINT64_FORMAT, offset,
2208       *length, size, stop, maxsize);
2209
2210   dynamic = g_atomic_int_get (&src->priv->dynamic_size);
2211   GST_DEBUG_OBJECT (src, "dynamic size: %d", dynamic);
2212
2213   /* check size if we have one */
2214   if (maxsize != -1) {
2215     /* if we run past the end, check if the file became bigger and
2216      * retry. */
2217     if (G_UNLIKELY (offset + *length >= maxsize || dynamic)) {
2218       /* see if length of the file changed */
2219       if (bclass->get_size)
2220         if (!bclass->get_size (src, &size))
2221           size = -1;
2222
2223       /* make sure we don't exceed the configured segment stop
2224        * if it was set */
2225       if (stop != -1)
2226         maxsize = MIN (size, stop);
2227       else
2228         maxsize = size;
2229
2230       /* if we are at or past the end, EOS */
2231       if (G_UNLIKELY (offset >= maxsize))
2232         goto unexpected_length;
2233
2234       /* else we can clip to the end */
2235       if (G_UNLIKELY (offset + *length >= maxsize))
2236         *length = maxsize - offset;
2237
2238     }
2239   }
2240
2241   /* keep track of current duration.
2242    * segment is in bytes, we checked that above. */
2243   GST_OBJECT_LOCK (src);
2244   src->segment.duration = size;
2245   GST_OBJECT_UNLOCK (src);
2246
2247   return TRUE;
2248
2249   /* ERRORS */
2250 unexpected_length:
2251   {
2252     return FALSE;
2253   }
2254 }
2255
2256 /* must be called with LIVE_LOCK */
2257 static GstFlowReturn
2258 gst_base_src_get_range (GstBaseSrc * src, guint64 offset, guint length,
2259     GstBuffer ** buf)
2260 {
2261   GstFlowReturn ret;
2262   GstBaseSrcClass *bclass;
2263   GstClockReturn status;
2264   GstBuffer *res_buf;
2265   GstBuffer *in_buf;
2266
2267   bclass = GST_BASE_SRC_GET_CLASS (src);
2268
2269 again:
2270   if (src->is_live) {
2271     if (G_UNLIKELY (!src->live_running)) {
2272       ret = gst_base_src_wait_playing (src);
2273       if (ret != GST_FLOW_OK)
2274         goto stopped;
2275     }
2276   }
2277
2278   if (G_UNLIKELY (!GST_BASE_SRC_IS_STARTED (src)
2279           && !GST_BASE_SRC_IS_STARTING (src)))
2280     goto not_started;
2281
2282   if (G_UNLIKELY (!bclass->create))
2283     goto no_function;
2284
2285   if (G_UNLIKELY (!gst_base_src_update_length (src, offset, &length)))
2286     goto unexpected_length;
2287
2288   /* track position */
2289   GST_OBJECT_LOCK (src);
2290   if (src->segment.format == GST_FORMAT_BYTES)
2291     src->segment.position = offset;
2292   GST_OBJECT_UNLOCK (src);
2293
2294   /* normally we don't count buffers */
2295   if (G_UNLIKELY (src->num_buffers_left >= 0)) {
2296     if (src->num_buffers_left == 0)
2297       goto reached_num_buffers;
2298     else
2299       src->num_buffers_left--;
2300   }
2301
2302   /* don't enter the create function if a pending EOS event was set. For the
2303    * logic of the pending_eos, check the event function of this class. */
2304   if (G_UNLIKELY (g_atomic_int_get (&src->priv->pending_eos)))
2305     goto eos;
2306
2307   GST_DEBUG_OBJECT (src,
2308       "calling create offset %" G_GUINT64_FORMAT " length %u, time %"
2309       G_GINT64_FORMAT, offset, length, src->segment.time);
2310
2311   res_buf = in_buf = *buf;
2312
2313   ret = bclass->create (src, offset, length, &res_buf);
2314
2315   /* The create function could be unlocked because we have a pending EOS. It's
2316    * possible that we have a valid buffer from create that we need to
2317    * discard when the create function returned _OK. */
2318   if (G_UNLIKELY (g_atomic_int_get (&src->priv->pending_eos))) {
2319     if (ret == GST_FLOW_OK) {
2320       if (*buf == NULL)
2321         gst_buffer_unref (res_buf);
2322     }
2323     goto eos;
2324   }
2325
2326   if (G_UNLIKELY (ret != GST_FLOW_OK))
2327     goto not_ok;
2328
2329   /* fallback in case the create function didn't fill a provided buffer */
2330   if (in_buf != NULL && res_buf != in_buf) {
2331     GstMapInfo info;
2332     gsize copied_size;
2333
2334     GST_CAT_DEBUG_OBJECT (GST_CAT_PERFORMANCE, src, "create function didn't "
2335         "fill the provided buffer, copying");
2336
2337     gst_buffer_map (in_buf, &info, GST_MAP_WRITE);
2338     copied_size = gst_buffer_extract (res_buf, 0, info.data, info.size);
2339     gst_buffer_unmap (in_buf, &info);
2340     gst_buffer_set_size (in_buf, copied_size);
2341
2342     gst_buffer_copy_into (in_buf, res_buf, GST_BUFFER_COPY_METADATA, 0, -1);
2343
2344     gst_buffer_unref (res_buf);
2345     res_buf = in_buf;
2346   }
2347
2348   /* no timestamp set and we are at offset 0, we can timestamp with 0 */
2349   if (offset == 0 && src->segment.time == 0
2350       && GST_BUFFER_DTS (res_buf) == -1 && !src->is_live) {
2351     GST_DEBUG_OBJECT (src, "setting first timestamp to 0");
2352     res_buf = gst_buffer_make_writable (res_buf);
2353     GST_BUFFER_DTS (res_buf) = 0;
2354   }
2355
2356   /* now sync before pushing the buffer */
2357   status = gst_base_src_do_sync (src, res_buf);
2358
2359   /* waiting for the clock could have made us flushing */
2360   if (G_UNLIKELY (src->priv->flushing))
2361     goto flushing;
2362
2363   switch (status) {
2364     case GST_CLOCK_EARLY:
2365       /* the buffer is too late. We currently don't drop the buffer. */
2366       GST_DEBUG_OBJECT (src, "buffer too late!, returning anyway");
2367       break;
2368     case GST_CLOCK_OK:
2369       /* buffer synchronised properly */
2370       GST_DEBUG_OBJECT (src, "buffer ok");
2371       break;
2372     case GST_CLOCK_UNSCHEDULED:
2373       /* this case is triggered when we were waiting for the clock and
2374        * it got unlocked because we did a state change. In any case, get rid of
2375        * the buffer. */
2376       if (*buf == NULL)
2377         gst_buffer_unref (res_buf);
2378
2379       if (!src->live_running) {
2380         /* We return FLUSHING when we are not running to stop the dataflow also
2381          * get rid of the produced buffer. */
2382         GST_DEBUG_OBJECT (src,
2383             "clock was unscheduled (%d), returning FLUSHING", status);
2384         ret = GST_FLOW_FLUSHING;
2385       } else {
2386         /* If we are running when this happens, we quickly switched between
2387          * pause and playing. We try to produce a new buffer */
2388         GST_DEBUG_OBJECT (src,
2389             "clock was unscheduled (%d), but we are running", status);
2390         goto again;
2391       }
2392       break;
2393     default:
2394       /* all other result values are unexpected and errors */
2395       GST_ELEMENT_ERROR (src, CORE, CLOCK,
2396           (_("Internal clock error.")),
2397           ("clock returned unexpected return value %d", status));
2398       if (*buf == NULL)
2399         gst_buffer_unref (res_buf);
2400       ret = GST_FLOW_ERROR;
2401       break;
2402   }
2403   if (G_LIKELY (ret == GST_FLOW_OK))
2404     *buf = res_buf;
2405
2406   return ret;
2407
2408   /* ERROR */
2409 stopped:
2410   {
2411     GST_DEBUG_OBJECT (src, "wait_playing returned %d (%s)", ret,
2412         gst_flow_get_name (ret));
2413     return ret;
2414   }
2415 not_ok:
2416   {
2417     GST_DEBUG_OBJECT (src, "create returned %d (%s)", ret,
2418         gst_flow_get_name (ret));
2419     return ret;
2420   }
2421 not_started:
2422   {
2423     GST_DEBUG_OBJECT (src, "getrange but not started");
2424     return GST_FLOW_FLUSHING;
2425   }
2426 no_function:
2427   {
2428     GST_DEBUG_OBJECT (src, "no create function");
2429     return GST_FLOW_NOT_SUPPORTED;
2430   }
2431 unexpected_length:
2432   {
2433     GST_DEBUG_OBJECT (src, "unexpected length %u (offset=%" G_GUINT64_FORMAT
2434         ", size=%" G_GINT64_FORMAT ")", length, offset, src->segment.duration);
2435     return GST_FLOW_EOS;
2436   }
2437 reached_num_buffers:
2438   {
2439     GST_DEBUG_OBJECT (src, "sent all buffers");
2440     return GST_FLOW_EOS;
2441   }
2442 flushing:
2443   {
2444     GST_DEBUG_OBJECT (src, "we are flushing");
2445     if (*buf == NULL)
2446       gst_buffer_unref (res_buf);
2447     return GST_FLOW_FLUSHING;
2448   }
2449 eos:
2450   {
2451     GST_DEBUG_OBJECT (src, "we are EOS");
2452     return GST_FLOW_EOS;
2453   }
2454 }
2455
2456 static GstFlowReturn
2457 gst_base_src_getrange (GstPad * pad, GstObject * parent, guint64 offset,
2458     guint length, GstBuffer ** buf)
2459 {
2460   GstBaseSrc *src;
2461   GstFlowReturn res;
2462
2463   src = GST_BASE_SRC_CAST (parent);
2464
2465   GST_LIVE_LOCK (src);
2466   if (G_UNLIKELY (src->priv->flushing))
2467     goto flushing;
2468
2469   res = gst_base_src_get_range (src, offset, length, buf);
2470
2471 done:
2472   GST_LIVE_UNLOCK (src);
2473
2474   return res;
2475
2476   /* ERRORS */
2477 flushing:
2478   {
2479     GST_DEBUG_OBJECT (src, "we are flushing");
2480     res = GST_FLOW_FLUSHING;
2481     goto done;
2482   }
2483 }
2484
2485 static gboolean
2486 gst_base_src_is_random_access (GstBaseSrc * src)
2487 {
2488   /* we need to start the basesrc to check random access */
2489   if (!GST_BASE_SRC_IS_STARTED (src)) {
2490     GST_LOG_OBJECT (src, "doing start/stop to check get_range support");
2491     if (G_LIKELY (gst_base_src_start (src))) {
2492       if (gst_base_src_start_wait (src) != GST_FLOW_OK)
2493         goto start_failed;
2494       gst_base_src_stop (src);
2495     }
2496   }
2497
2498   return src->random_access;
2499
2500   /* ERRORS */
2501 start_failed:
2502   {
2503     GST_DEBUG_OBJECT (src, "failed to start");
2504     return FALSE;
2505   }
2506 }
2507
2508 static void
2509 gst_base_src_loop (GstPad * pad)
2510 {
2511   GstBaseSrc *src;
2512   GstBuffer *buf = NULL;
2513   GstFlowReturn ret;
2514   gint64 position;
2515   gboolean eos;
2516   guint blocksize;
2517   GList *pending_events = NULL, *tmp;
2518
2519   eos = FALSE;
2520
2521   src = GST_BASE_SRC (GST_OBJECT_PARENT (pad));
2522
2523   gst_base_src_send_stream_start (src);
2524
2525   /* check if we need to renegotiate */
2526   if (gst_pad_check_reconfigure (pad)) {
2527     if (!gst_base_src_negotiate (src))
2528       goto not_negotiated;
2529   }
2530
2531   GST_LIVE_LOCK (src);
2532
2533   if (G_UNLIKELY (src->priv->flushing))
2534     goto flushing;
2535
2536   blocksize = src->blocksize;
2537
2538   /* if we operate in bytes, we can calculate an offset */
2539   if (src->segment.format == GST_FORMAT_BYTES) {
2540     position = src->segment.position;
2541     /* for negative rates, start with subtracting the blocksize */
2542     if (src->segment.rate < 0.0) {
2543       /* we cannot go below segment.start */
2544       if (position > src->segment.start + blocksize)
2545         position -= blocksize;
2546       else {
2547         /* last block, remainder up to segment.start */
2548         blocksize = position - src->segment.start;
2549         position = src->segment.start;
2550       }
2551     }
2552   } else
2553     position = -1;
2554
2555   GST_LOG_OBJECT (src, "next_ts %" GST_TIME_FORMAT " size %u",
2556       GST_TIME_ARGS (position), blocksize);
2557
2558   ret = gst_base_src_get_range (src, position, blocksize, &buf);
2559   if (G_UNLIKELY (ret != GST_FLOW_OK)) {
2560     GST_INFO_OBJECT (src, "pausing after gst_base_src_get_range() = %s",
2561         gst_flow_get_name (ret));
2562     GST_LIVE_UNLOCK (src);
2563     goto pause;
2564   }
2565   /* this should not happen */
2566   if (G_UNLIKELY (buf == NULL))
2567     goto null_buffer;
2568
2569   /* push events to close/start our segment before we push the buffer. */
2570   if (G_UNLIKELY (src->priv->segment_pending)) {
2571     gst_pad_push_event (pad, gst_event_new_segment (&src->segment));
2572     src->priv->segment_pending = FALSE;
2573   }
2574
2575   if (g_atomic_int_get (&src->priv->have_events)) {
2576     GST_OBJECT_LOCK (src);
2577     /* take the events */
2578     pending_events = src->priv->pending_events;
2579     src->priv->pending_events = NULL;
2580     g_atomic_int_set (&src->priv->have_events, FALSE);
2581     GST_OBJECT_UNLOCK (src);
2582   }
2583
2584   /* Push out pending events if any */
2585   if (G_UNLIKELY (pending_events != NULL)) {
2586     for (tmp = pending_events; tmp; tmp = g_list_next (tmp)) {
2587       GstEvent *ev = (GstEvent *) tmp->data;
2588       gst_pad_push_event (pad, ev);
2589     }
2590     g_list_free (pending_events);
2591   }
2592
2593   /* figure out the new position */
2594   switch (src->segment.format) {
2595     case GST_FORMAT_BYTES:
2596     {
2597       guint bufsize = gst_buffer_get_size (buf);
2598
2599       /* we subtracted above for negative rates */
2600       if (src->segment.rate >= 0.0)
2601         position += bufsize;
2602       break;
2603     }
2604     case GST_FORMAT_TIME:
2605     {
2606       GstClockTime start, duration;
2607
2608       start = GST_BUFFER_TIMESTAMP (buf);
2609       duration = GST_BUFFER_DURATION (buf);
2610
2611       if (GST_CLOCK_TIME_IS_VALID (start))
2612         position = start;
2613       else
2614         position = src->segment.position;
2615
2616       if (GST_CLOCK_TIME_IS_VALID (duration)) {
2617         if (src->segment.rate >= 0.0)
2618           position += duration;
2619         else if (position > duration)
2620           position -= duration;
2621         else
2622           position = 0;
2623       }
2624       break;
2625     }
2626     case GST_FORMAT_DEFAULT:
2627       if (src->segment.rate >= 0.0)
2628         position = GST_BUFFER_OFFSET_END (buf);
2629       else
2630         position = GST_BUFFER_OFFSET (buf);
2631       break;
2632     default:
2633       position = -1;
2634       break;
2635   }
2636   if (position != -1) {
2637     if (src->segment.rate >= 0.0) {
2638       /* positive rate, check if we reached the stop */
2639       if (src->segment.stop != -1) {
2640         if (position >= src->segment.stop) {
2641           eos = TRUE;
2642           position = src->segment.stop;
2643         }
2644       }
2645     } else {
2646       /* negative rate, check if we reached the start. start is always set to
2647        * something different from -1 */
2648       if (position <= src->segment.start) {
2649         eos = TRUE;
2650         position = src->segment.start;
2651       }
2652       /* when going reverse, all buffers are DISCONT */
2653       src->priv->discont = TRUE;
2654     }
2655     GST_OBJECT_LOCK (src);
2656     src->segment.position = position;
2657     GST_OBJECT_UNLOCK (src);
2658   }
2659
2660   if (G_UNLIKELY (src->priv->discont)) {
2661     GST_INFO_OBJECT (src, "marking pending DISCONT");
2662     buf = gst_buffer_make_writable (buf);
2663     GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_DISCONT);
2664     src->priv->discont = FALSE;
2665   }
2666   GST_LIVE_UNLOCK (src);
2667
2668   ret = gst_pad_push (pad, buf);
2669   if (G_UNLIKELY (ret != GST_FLOW_OK)) {
2670     GST_INFO_OBJECT (src, "pausing after gst_pad_push() = %s",
2671         gst_flow_get_name (ret));
2672     goto pause;
2673   }
2674
2675   if (G_UNLIKELY (eos)) {
2676     GST_INFO_OBJECT (src, "pausing after end of segment");
2677     ret = GST_FLOW_EOS;
2678     goto pause;
2679   }
2680
2681 done:
2682   return;
2683
2684   /* special cases */
2685 not_negotiated:
2686   {
2687     GST_DEBUG_OBJECT (src, "Failed to renegotiate");
2688     ret = GST_FLOW_NOT_NEGOTIATED;
2689     goto pause;
2690   }
2691 flushing:
2692   {
2693     GST_DEBUG_OBJECT (src, "we are flushing");
2694     GST_LIVE_UNLOCK (src);
2695     ret = GST_FLOW_FLUSHING;
2696     goto pause;
2697   }
2698 pause:
2699   {
2700     const gchar *reason = gst_flow_get_name (ret);
2701     GstEvent *event;
2702
2703     GST_DEBUG_OBJECT (src, "pausing task, reason %s", reason);
2704     src->running = FALSE;
2705     gst_pad_pause_task (pad);
2706     if (ret == GST_FLOW_EOS) {
2707       gboolean flag_segment;
2708       GstFormat format;
2709       gint64 position;
2710
2711       /* perform EOS logic */
2712       flag_segment = (src->segment.flags & GST_SEGMENT_FLAG_SEGMENT) != 0;
2713       format = src->segment.format;
2714       position = src->segment.position;
2715
2716       if (flag_segment) {
2717         GstMessage *message;
2718
2719         message = gst_message_new_segment_done (GST_OBJECT_CAST (src),
2720             format, position);
2721         gst_message_set_seqnum (message, src->priv->seqnum);
2722         gst_element_post_message (GST_ELEMENT_CAST (src), message);
2723         event = gst_event_new_segment_done (format, position);
2724         gst_event_set_seqnum (event, src->priv->seqnum);
2725         gst_pad_push_event (pad, event);
2726       } else {
2727         event = gst_event_new_eos ();
2728         gst_event_set_seqnum (event, src->priv->seqnum);
2729         gst_pad_push_event (pad, event);
2730       }
2731     } else if (ret == GST_FLOW_NOT_LINKED || ret <= GST_FLOW_EOS) {
2732       event = gst_event_new_eos ();
2733       gst_event_set_seqnum (event, src->priv->seqnum);
2734       /* for fatal errors we post an error message, post the error
2735        * first so the app knows about the error first.
2736        * Also don't do this for FLUSHING because it happens
2737        * due to flushing and posting an error message because of
2738        * that is the wrong thing to do, e.g. when we're doing
2739        * a flushing seek. */
2740       GST_ELEMENT_ERROR (src, STREAM, FAILED,
2741           (_("Internal data flow error.")),
2742           ("streaming task paused, reason %s (%d)", reason, ret));
2743       gst_pad_push_event (pad, event);
2744     }
2745     goto done;
2746   }
2747 null_buffer:
2748   {
2749     GST_ELEMENT_ERROR (src, STREAM, FAILED,
2750         (_("Internal data flow error.")), ("element returned NULL buffer"));
2751     GST_LIVE_UNLOCK (src);
2752     goto done;
2753   }
2754 }
2755
2756 static gboolean
2757 gst_base_src_set_allocation (GstBaseSrc * basesrc, GstBufferPool * pool,
2758     GstAllocator * allocator, GstAllocationParams * params)
2759 {
2760   GstAllocator *oldalloc;
2761   GstBufferPool *oldpool;
2762   GstBaseSrcPrivate *priv = basesrc->priv;
2763
2764   if (pool) {
2765     GST_DEBUG_OBJECT (basesrc, "activate pool");
2766     if (!gst_buffer_pool_set_active (pool, TRUE))
2767       goto activate_failed;
2768   }
2769
2770   GST_OBJECT_LOCK (basesrc);
2771   oldpool = priv->pool;
2772   priv->pool = pool;
2773
2774   oldalloc = priv->allocator;
2775   priv->allocator = allocator;
2776
2777   if (params)
2778     priv->params = *params;
2779   else
2780     gst_allocation_params_init (&priv->params);
2781   GST_OBJECT_UNLOCK (basesrc);
2782
2783   if (oldpool) {
2784     /* only deactivate if the pool is not the one we're using */
2785     if (oldpool != pool) {
2786       GST_DEBUG_OBJECT (basesrc, "deactivate old pool");
2787       gst_buffer_pool_set_active (oldpool, FALSE);
2788     }
2789     gst_object_unref (oldpool);
2790   }
2791   if (oldalloc) {
2792     gst_object_unref (oldalloc);
2793   }
2794   return TRUE;
2795
2796   /* ERRORS */
2797 activate_failed:
2798   {
2799     GST_ERROR_OBJECT (basesrc, "failed to activate bufferpool.");
2800     return FALSE;
2801   }
2802 }
2803
2804 static gboolean
2805 gst_base_src_activate_pool (GstBaseSrc * basesrc, gboolean active)
2806 {
2807   GstBaseSrcPrivate *priv = basesrc->priv;
2808   GstBufferPool *pool;
2809   gboolean res = TRUE;
2810
2811   GST_OBJECT_LOCK (basesrc);
2812   if ((pool = priv->pool))
2813     pool = gst_object_ref (pool);
2814   GST_OBJECT_UNLOCK (basesrc);
2815
2816   if (pool) {
2817     res = gst_buffer_pool_set_active (pool, active);
2818     gst_object_unref (pool);
2819   }
2820   return res;
2821 }
2822
2823
2824 static gboolean
2825 gst_base_src_decide_allocation_default (GstBaseSrc * basesrc, GstQuery * query)
2826 {
2827   GstCaps *outcaps;
2828   GstBufferPool *pool;
2829   guint size, min, max;
2830   GstAllocator *allocator;
2831   GstAllocationParams params;
2832   GstStructure *config;
2833   gboolean update_allocator;
2834
2835   gst_query_parse_allocation (query, &outcaps, NULL);
2836
2837   /* we got configuration from our peer or the decide_allocation method,
2838    * parse them */
2839   if (gst_query_get_n_allocation_params (query) > 0) {
2840     /* try the allocator */
2841     gst_query_parse_nth_allocation_param (query, 0, &allocator, &params);
2842     update_allocator = TRUE;
2843   } else {
2844     allocator = NULL;
2845     gst_allocation_params_init (&params);
2846     update_allocator = FALSE;
2847   }
2848
2849   if (gst_query_get_n_allocation_pools (query) > 0) {
2850     gst_query_parse_nth_allocation_pool (query, 0, &pool, &size, &min, &max);
2851
2852     if (pool == NULL) {
2853       /* no pool, we can make our own */
2854       GST_DEBUG_OBJECT (basesrc, "no pool, making new pool");
2855       pool = gst_buffer_pool_new ();
2856     }
2857   } else {
2858     pool = NULL;
2859     size = min = max = 0;
2860   }
2861
2862   /* now configure */
2863   if (pool) {
2864     config = gst_buffer_pool_get_config (pool);
2865     gst_buffer_pool_config_set_params (config, outcaps, size, min, max);
2866     gst_buffer_pool_config_set_allocator (config, allocator, &params);
2867     gst_buffer_pool_set_config (pool, config);
2868   }
2869
2870   if (update_allocator)
2871     gst_query_set_nth_allocation_param (query, 0, allocator, &params);
2872   else
2873     gst_query_add_allocation_param (query, allocator, &params);
2874   if (allocator)
2875     gst_object_unref (allocator);
2876
2877   if (pool) {
2878     gst_query_set_nth_allocation_pool (query, 0, pool, size, min, max);
2879     gst_object_unref (pool);
2880   }
2881
2882   return TRUE;
2883 }
2884
2885 static gboolean
2886 gst_base_src_prepare_allocation (GstBaseSrc * basesrc, GstCaps * caps)
2887 {
2888   GstBaseSrcClass *bclass;
2889   gboolean result = TRUE;
2890   GstQuery *query;
2891   GstBufferPool *pool = NULL;
2892   GstAllocator *allocator = NULL;
2893   GstAllocationParams params;
2894
2895   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2896
2897   /* make query and let peer pad answer, we don't really care if it worked or
2898    * not, if it failed, the allocation query would contain defaults and the
2899    * subclass would then set better values if needed */
2900   query = gst_query_new_allocation (caps, TRUE);
2901   if (!gst_pad_peer_query (basesrc->srcpad, query)) {
2902     /* not a problem, just debug a little */
2903     GST_DEBUG_OBJECT (basesrc, "peer ALLOCATION query failed");
2904   }
2905
2906   g_assert (bclass->decide_allocation != NULL);
2907   result = bclass->decide_allocation (basesrc, query);
2908
2909   GST_DEBUG_OBJECT (basesrc, "ALLOCATION (%d) params: %" GST_PTR_FORMAT, result,
2910       query);
2911
2912   if (!result)
2913     goto no_decide_allocation;
2914
2915   /* we got configuration from our peer or the decide_allocation method,
2916    * parse them */
2917   if (gst_query_get_n_allocation_params (query) > 0) {
2918     gst_query_parse_nth_allocation_param (query, 0, &allocator, &params);
2919   } else {
2920     allocator = NULL;
2921     gst_allocation_params_init (&params);
2922   }
2923
2924   if (gst_query_get_n_allocation_pools (query) > 0)
2925     gst_query_parse_nth_allocation_pool (query, 0, &pool, NULL, NULL, NULL);
2926
2927   result = gst_base_src_set_allocation (basesrc, pool, allocator, &params);
2928
2929   gst_query_unref (query);
2930
2931   return result;
2932
2933   /* Errors */
2934 no_decide_allocation:
2935   {
2936     GST_WARNING_OBJECT (basesrc, "Subclass failed to decide allocation");
2937     gst_query_unref (query);
2938
2939     return result;
2940   }
2941 }
2942
2943 /* default negotiation code.
2944  *
2945  * Take intersection between src and sink pads, take first
2946  * caps and fixate.
2947  */
2948 static gboolean
2949 gst_base_src_default_negotiate (GstBaseSrc * basesrc)
2950 {
2951   GstCaps *thiscaps;
2952   GstCaps *caps = NULL;
2953   GstCaps *peercaps = NULL;
2954   gboolean result = FALSE;
2955
2956   /* first see what is possible on our source pad */
2957   thiscaps = gst_pad_query_caps (GST_BASE_SRC_PAD (basesrc), NULL);
2958   GST_DEBUG_OBJECT (basesrc, "caps of src: %" GST_PTR_FORMAT, thiscaps);
2959   /* nothing or anything is allowed, we're done */
2960   if (thiscaps == NULL || gst_caps_is_any (thiscaps))
2961     goto no_nego_needed;
2962
2963   if (G_UNLIKELY (gst_caps_is_empty (thiscaps)))
2964     goto no_caps;
2965
2966   /* get the peer caps */
2967   peercaps = gst_pad_peer_query_caps (GST_BASE_SRC_PAD (basesrc), thiscaps);
2968   GST_DEBUG_OBJECT (basesrc, "caps of peer: %" GST_PTR_FORMAT, peercaps);
2969   if (peercaps) {
2970     /* The result is already a subset of our caps */
2971     caps = peercaps;
2972     gst_caps_unref (thiscaps);
2973   } else {
2974     /* no peer, work with our own caps then */
2975     caps = thiscaps;
2976   }
2977   if (caps && !gst_caps_is_empty (caps)) {
2978     /* now fixate */
2979     GST_DEBUG_OBJECT (basesrc, "have caps: %" GST_PTR_FORMAT, caps);
2980     if (gst_caps_is_any (caps)) {
2981       GST_DEBUG_OBJECT (basesrc, "any caps, we stop");
2982       /* hmm, still anything, so element can do anything and
2983        * nego is not needed */
2984       result = TRUE;
2985     } else {
2986       caps = gst_base_src_fixate (basesrc, caps);
2987       GST_DEBUG_OBJECT (basesrc, "fixated to: %" GST_PTR_FORMAT, caps);
2988       if (gst_caps_is_fixed (caps)) {
2989         /* yay, fixed caps, use those then, it's possible that the subclass does
2990          * not accept this caps after all and we have to fail. */
2991         result = gst_base_src_set_caps (basesrc, caps);
2992       }
2993     }
2994     gst_caps_unref (caps);
2995   } else {
2996     if (caps)
2997       gst_caps_unref (caps);
2998     GST_DEBUG_OBJECT (basesrc, "no common caps");
2999   }
3000   return result;
3001
3002 no_nego_needed:
3003   {
3004     GST_DEBUG_OBJECT (basesrc, "no negotiation needed");
3005     if (thiscaps)
3006       gst_caps_unref (thiscaps);
3007     return TRUE;
3008   }
3009 no_caps:
3010   {
3011     GST_ELEMENT_ERROR (basesrc, STREAM, FORMAT,
3012         ("No supported formats found"),
3013         ("This element did not produce valid caps"));
3014     if (thiscaps)
3015       gst_caps_unref (thiscaps);
3016     return TRUE;
3017   }
3018 }
3019
3020 static gboolean
3021 gst_base_src_negotiate (GstBaseSrc * basesrc)
3022 {
3023   GstBaseSrcClass *bclass;
3024   gboolean result;
3025
3026   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
3027
3028   GST_DEBUG_OBJECT (basesrc, "starting negotiation");
3029
3030   if (G_LIKELY (bclass->negotiate))
3031     result = bclass->negotiate (basesrc);
3032   else
3033     result = TRUE;
3034
3035   if (G_LIKELY (result)) {
3036     GstCaps *caps;
3037
3038     caps = gst_pad_get_current_caps (basesrc->srcpad);
3039
3040     result = gst_base_src_prepare_allocation (basesrc, caps);
3041
3042     if (caps)
3043       gst_caps_unref (caps);
3044   }
3045   return result;
3046 }
3047
3048 static gboolean
3049 gst_base_src_start (GstBaseSrc * basesrc)
3050 {
3051   GstBaseSrcClass *bclass;
3052   gboolean result;
3053
3054   GST_LIVE_LOCK (basesrc);
3055   if (GST_BASE_SRC_IS_STARTING (basesrc))
3056     goto was_starting;
3057   if (GST_BASE_SRC_IS_STARTED (basesrc))
3058     goto was_started;
3059
3060   basesrc->priv->start_result = GST_FLOW_FLUSHING;
3061   GST_OBJECT_FLAG_SET (basesrc, GST_BASE_SRC_FLAG_STARTING);
3062   basesrc->num_buffers_left = basesrc->num_buffers;
3063   basesrc->running = FALSE;
3064   basesrc->priv->segment_pending = FALSE;
3065   GST_OBJECT_LOCK (basesrc);
3066   gst_segment_init (&basesrc->segment, basesrc->segment.format);
3067   GST_OBJECT_UNLOCK (basesrc);
3068   GST_LIVE_UNLOCK (basesrc);
3069
3070   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
3071   if (bclass->start)
3072     result = bclass->start (basesrc);
3073   else
3074     result = TRUE;
3075
3076   if (!result)
3077     goto could_not_start;
3078
3079   if (!gst_base_src_is_async (basesrc))
3080     gst_base_src_start_complete (basesrc, GST_FLOW_OK);
3081
3082   return result;
3083
3084   /* ERROR */
3085 was_starting:
3086   {
3087     GST_DEBUG_OBJECT (basesrc, "was starting");
3088     GST_LIVE_UNLOCK (basesrc);
3089     return TRUE;
3090   }
3091 was_started:
3092   {
3093     GST_DEBUG_OBJECT (basesrc, "was started");
3094     GST_LIVE_UNLOCK (basesrc);
3095     return TRUE;
3096   }
3097 could_not_start:
3098   {
3099     GST_DEBUG_OBJECT (basesrc, "could not start");
3100     /* subclass is supposed to post a message. We don't have to call _stop. */
3101     gst_base_src_start_complete (basesrc, GST_FLOW_ERROR);
3102     return FALSE;
3103   }
3104 }
3105
3106 /**
3107  * gst_base_src_start_complete:
3108  * @basesrc: base source instance
3109  * @ret: a #GstFlowReturn
3110  *
3111  * Complete an asynchronous start operation. When the subclass overrides the
3112  * start method, it should call gst_base_src_start_complete() when the start
3113  * operation completes either from the same thread or from an asynchronous
3114  * helper thread.
3115  */
3116 void
3117 gst_base_src_start_complete (GstBaseSrc * basesrc, GstFlowReturn ret)
3118 {
3119   gboolean have_size;
3120   guint64 size;
3121   gboolean seekable;
3122   GstFormat format;
3123   GstPadMode mode;
3124   GstEvent *event;
3125
3126   if (ret != GST_FLOW_OK)
3127     goto error;
3128
3129   GST_DEBUG_OBJECT (basesrc, "starting source");
3130   format = basesrc->segment.format;
3131
3132   /* figure out the size */
3133   have_size = FALSE;
3134   size = -1;
3135   if (format == GST_FORMAT_BYTES) {
3136     GstBaseSrcClass *bclass = GST_BASE_SRC_GET_CLASS (basesrc);
3137
3138     if (bclass->get_size) {
3139       if (!(have_size = bclass->get_size (basesrc, &size)))
3140         size = -1;
3141     }
3142     GST_DEBUG_OBJECT (basesrc, "setting size %" G_GUINT64_FORMAT, size);
3143     /* only update the size when operating in bytes, subclass is supposed
3144      * to set duration in the start method for other formats */
3145     GST_OBJECT_LOCK (basesrc);
3146     basesrc->segment.duration = size;
3147     GST_OBJECT_UNLOCK (basesrc);
3148   }
3149
3150   GST_DEBUG_OBJECT (basesrc,
3151       "format: %s, have size: %d, size: %" G_GUINT64_FORMAT ", duration: %"
3152       G_GINT64_FORMAT, gst_format_get_name (format), have_size, size,
3153       basesrc->segment.duration);
3154
3155   seekable = gst_base_src_seekable (basesrc);
3156   GST_DEBUG_OBJECT (basesrc, "is seekable: %d", seekable);
3157
3158   /* update for random access flag */
3159   basesrc->random_access = seekable && format == GST_FORMAT_BYTES;
3160
3161   GST_DEBUG_OBJECT (basesrc, "is random_access: %d", basesrc->random_access);
3162
3163   /* stop flushing now but for live sources, still block in the LIVE lock when
3164    * we are not yet PLAYING */
3165   gst_base_src_set_flushing (basesrc, FALSE, FALSE, NULL);
3166
3167   gst_pad_mark_reconfigure (GST_BASE_SRC_PAD (basesrc));
3168
3169   GST_OBJECT_LOCK (basesrc->srcpad);
3170   mode = GST_PAD_MODE (basesrc->srcpad);
3171   GST_OBJECT_UNLOCK (basesrc->srcpad);
3172
3173   /* take the stream lock here, we only want to let the task run when we have
3174    * set the STARTED flag */
3175   GST_PAD_STREAM_LOCK (basesrc->srcpad);
3176   if (mode == GST_PAD_MODE_PUSH) {
3177     /* do initial seek, which will start the task */
3178     GST_OBJECT_LOCK (basesrc);
3179     event = basesrc->pending_seek;
3180     basesrc->pending_seek = NULL;
3181     GST_OBJECT_UNLOCK (basesrc);
3182
3183     /* The perform seek code will start the task when finished. We don't have to
3184      * unlock the streaming thread because it is not running yet */
3185     if (G_UNLIKELY (!gst_base_src_perform_seek (basesrc, event, FALSE)))
3186       goto seek_failed;
3187
3188     if (event)
3189       gst_event_unref (event);
3190   } else {
3191     /* if not random_access, we cannot operate in pull mode for now */
3192     if (G_UNLIKELY (!basesrc->random_access))
3193       goto no_get_range;
3194   }
3195
3196   GST_LIVE_LOCK (basesrc);
3197   GST_OBJECT_FLAG_SET (basesrc, GST_BASE_SRC_FLAG_STARTED);
3198   GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTING);
3199   basesrc->priv->start_result = ret;
3200   GST_LIVE_SIGNAL (basesrc);
3201   GST_LIVE_UNLOCK (basesrc);
3202
3203   GST_PAD_STREAM_UNLOCK (basesrc->srcpad);
3204
3205   return;
3206
3207 seek_failed:
3208   {
3209     GST_PAD_STREAM_UNLOCK (basesrc->srcpad);
3210     GST_ERROR_OBJECT (basesrc, "Failed to perform initial seek");
3211     gst_base_src_set_flushing (basesrc, TRUE, FALSE, NULL);
3212     if (event)
3213       gst_event_unref (event);
3214     ret = GST_FLOW_ERROR;
3215     goto error;
3216   }
3217 no_get_range:
3218   {
3219     GST_PAD_STREAM_UNLOCK (basesrc->srcpad);
3220     gst_base_src_set_flushing (basesrc, TRUE, FALSE, NULL);
3221     GST_ERROR_OBJECT (basesrc, "Cannot operate in pull mode, stopping");
3222     ret = GST_FLOW_ERROR;
3223     goto error;
3224   }
3225 error:
3226   {
3227     GST_LIVE_LOCK (basesrc);
3228     basesrc->priv->start_result = ret;
3229     GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTING);
3230     GST_LIVE_SIGNAL (basesrc);
3231     GST_LIVE_UNLOCK (basesrc);
3232     return;
3233   }
3234 }
3235
3236 /**
3237  * gst_base_src_start_wait:
3238  * @basesrc: base source instance
3239  *
3240  * Wait until the start operation completes.
3241  *
3242  * Returns: a #GstFlowReturn.
3243  */
3244 GstFlowReturn
3245 gst_base_src_start_wait (GstBaseSrc * basesrc)
3246 {
3247   GstFlowReturn result;
3248
3249   GST_LIVE_LOCK (basesrc);
3250   if (G_UNLIKELY (basesrc->priv->flushing))
3251     goto flushing;
3252
3253   while (GST_BASE_SRC_IS_STARTING (basesrc)) {
3254     GST_LIVE_WAIT (basesrc);
3255     if (G_UNLIKELY (basesrc->priv->flushing))
3256       goto flushing;
3257   }
3258   result = basesrc->priv->start_result;
3259   GST_LIVE_UNLOCK (basesrc);
3260
3261   return result;
3262
3263   /* ERRORS */
3264 flushing:
3265   {
3266     GST_DEBUG_OBJECT (basesrc, "we are flushing");
3267     GST_LIVE_UNLOCK (basesrc);
3268     return GST_FLOW_FLUSHING;
3269   }
3270 }
3271
3272 static gboolean
3273 gst_base_src_stop (GstBaseSrc * basesrc)
3274 {
3275   GstBaseSrcClass *bclass;
3276   gboolean result = TRUE;
3277
3278   GST_DEBUG_OBJECT (basesrc, "stopping source");
3279
3280   /* flush all */
3281   gst_base_src_set_flushing (basesrc, TRUE, FALSE, NULL);
3282   /* stop the task */
3283   gst_pad_stop_task (basesrc->srcpad);
3284
3285   GST_LIVE_LOCK (basesrc);
3286   if (!GST_BASE_SRC_IS_STARTED (basesrc) && !GST_BASE_SRC_IS_STARTING (basesrc))
3287     goto was_stopped;
3288
3289   GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTING);
3290   GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTED);
3291   basesrc->priv->start_result = GST_FLOW_FLUSHING;
3292   GST_LIVE_SIGNAL (basesrc);
3293   GST_LIVE_UNLOCK (basesrc);
3294
3295   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
3296   if (bclass->stop)
3297     result = bclass->stop (basesrc);
3298
3299   gst_base_src_set_allocation (basesrc, NULL, NULL, NULL);
3300
3301   return result;
3302
3303 was_stopped:
3304   {
3305     GST_DEBUG_OBJECT (basesrc, "was started");
3306     GST_LIVE_UNLOCK (basesrc);
3307     return TRUE;
3308   }
3309 }
3310
3311 /* start or stop flushing dataprocessing
3312  */
3313 static gboolean
3314 gst_base_src_set_flushing (GstBaseSrc * basesrc,
3315     gboolean flushing, gboolean live_play, gboolean * playing)
3316 {
3317   GstBaseSrcClass *bclass;
3318
3319   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
3320
3321   GST_DEBUG_OBJECT (basesrc, "flushing %d, live_play %d", flushing, live_play);
3322
3323   if (flushing) {
3324     gst_base_src_activate_pool (basesrc, FALSE);
3325     /* unlock any subclasses, we need to do this before grabbing the
3326      * LIVE_LOCK since we hold this lock before going into ::create. We pass an
3327      * unlock to the params because of backwards compat (see seek handler)*/
3328     if (bclass->unlock)
3329       bclass->unlock (basesrc);
3330   }
3331
3332   /* the live lock is released when we are blocked, waiting for playing or
3333    * when we sync to the clock. */
3334   GST_LIVE_LOCK (basesrc);
3335   if (playing)
3336     *playing = basesrc->live_running;
3337   basesrc->priv->flushing = flushing;
3338   if (flushing) {
3339     /* if we are locked in the live lock, signal it to make it flush */
3340     basesrc->live_running = TRUE;
3341
3342     /* clear pending EOS if any */
3343     g_atomic_int_set (&basesrc->priv->pending_eos, FALSE);
3344
3345     /* step 1, now that we have the LIVE lock, clear our unlock request */
3346     if (bclass->unlock_stop)
3347       bclass->unlock_stop (basesrc);
3348
3349     /* step 2, unblock clock sync (if any) or any other blocking thing */
3350     if (basesrc->clock_id)
3351       gst_clock_id_unschedule (basesrc->clock_id);
3352   } else {
3353     /* signal the live source that it can start playing */
3354     basesrc->live_running = live_play;
3355
3356     gst_base_src_activate_pool (basesrc, TRUE);
3357
3358     /* Drop all delayed events */
3359     GST_OBJECT_LOCK (basesrc);
3360     if (basesrc->priv->pending_events) {
3361       g_list_foreach (basesrc->priv->pending_events, (GFunc) gst_event_unref,
3362           NULL);
3363       g_list_free (basesrc->priv->pending_events);
3364       basesrc->priv->pending_events = NULL;
3365       g_atomic_int_set (&basesrc->priv->have_events, FALSE);
3366     }
3367     GST_OBJECT_UNLOCK (basesrc);
3368   }
3369   GST_LIVE_SIGNAL (basesrc);
3370   GST_LIVE_UNLOCK (basesrc);
3371
3372   return TRUE;
3373 }
3374
3375 /* the purpose of this function is to make sure that a live source blocks in the
3376  * LIVE lock or leaves the LIVE lock and continues playing. */
3377 static gboolean
3378 gst_base_src_set_playing (GstBaseSrc * basesrc, gboolean live_play)
3379 {
3380   GstBaseSrcClass *bclass;
3381
3382   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
3383
3384   /* unlock subclasses locked in ::create, we only do this when we stop playing. */
3385   if (!live_play) {
3386     GST_DEBUG_OBJECT (basesrc, "unlock");
3387     if (bclass->unlock)
3388       bclass->unlock (basesrc);
3389   }
3390
3391   /* we are now able to grab the LIVE lock, when we get it, we can be
3392    * waiting for PLAYING while blocked in the LIVE cond or we can be waiting
3393    * for the clock. */
3394   GST_LIVE_LOCK (basesrc);
3395   GST_DEBUG_OBJECT (basesrc, "unschedule clock");
3396
3397   /* unblock clock sync (if any) */
3398   if (basesrc->clock_id)
3399     gst_clock_id_unschedule (basesrc->clock_id);
3400
3401   /* configure what to do when we get to the LIVE lock. */
3402   GST_DEBUG_OBJECT (basesrc, "live running %d", live_play);
3403   basesrc->live_running = live_play;
3404
3405   if (live_play) {
3406     gboolean start;
3407
3408     /* clear our unlock request when going to PLAYING */
3409     GST_DEBUG_OBJECT (basesrc, "unlock stop");
3410     if (bclass->unlock_stop)
3411       bclass->unlock_stop (basesrc);
3412
3413     /* for live sources we restart the timestamp correction */
3414     basesrc->priv->latency = -1;
3415     /* have to restart the task in case it stopped because of the unlock when
3416      * we went to PAUSED. Only do this if we operating in push mode. */
3417     GST_OBJECT_LOCK (basesrc->srcpad);
3418     start = (GST_PAD_MODE (basesrc->srcpad) == GST_PAD_MODE_PUSH);
3419     GST_OBJECT_UNLOCK (basesrc->srcpad);
3420     if (start)
3421       gst_pad_start_task (basesrc->srcpad, (GstTaskFunction) gst_base_src_loop,
3422           basesrc->srcpad, NULL);
3423     GST_DEBUG_OBJECT (basesrc, "signal");
3424     GST_LIVE_SIGNAL (basesrc);
3425   }
3426   GST_LIVE_UNLOCK (basesrc);
3427
3428   return TRUE;
3429 }
3430
3431 static gboolean
3432 gst_base_src_activate_push (GstPad * pad, GstObject * parent, gboolean active)
3433 {
3434   GstBaseSrc *basesrc;
3435
3436   basesrc = GST_BASE_SRC (parent);
3437
3438   /* prepare subclass first */
3439   if (active) {
3440     GST_DEBUG_OBJECT (basesrc, "Activating in push mode");
3441
3442     if (G_UNLIKELY (!basesrc->can_activate_push))
3443       goto no_push_activation;
3444
3445     if (G_UNLIKELY (!gst_base_src_start (basesrc)))
3446       goto error_start;
3447   } else {
3448     GST_DEBUG_OBJECT (basesrc, "Deactivating in push mode");
3449     /* now we can stop the source */
3450     if (G_UNLIKELY (!gst_base_src_stop (basesrc)))
3451       goto error_stop;
3452   }
3453   return TRUE;
3454
3455   /* ERRORS */
3456 no_push_activation:
3457   {
3458     GST_WARNING_OBJECT (basesrc, "Subclass disabled push-mode activation");
3459     return FALSE;
3460   }
3461 error_start:
3462   {
3463     GST_WARNING_OBJECT (basesrc, "Failed to start in push mode");
3464     return FALSE;
3465   }
3466 error_stop:
3467   {
3468     GST_DEBUG_OBJECT (basesrc, "Failed to stop in push mode");
3469     return FALSE;
3470   }
3471 }
3472
3473 static gboolean
3474 gst_base_src_activate_pull (GstPad * pad, GstObject * parent, gboolean active)
3475 {
3476   GstBaseSrc *basesrc;
3477
3478   basesrc = GST_BASE_SRC (parent);
3479
3480   /* prepare subclass first */
3481   if (active) {
3482     GST_DEBUG_OBJECT (basesrc, "Activating in pull mode");
3483     if (G_UNLIKELY (!gst_base_src_start (basesrc)))
3484       goto error_start;
3485   } else {
3486     GST_DEBUG_OBJECT (basesrc, "Deactivating in pull mode");
3487     if (G_UNLIKELY (!gst_base_src_stop (basesrc)))
3488       goto error_stop;
3489   }
3490   return TRUE;
3491
3492   /* ERRORS */
3493 error_start:
3494   {
3495     GST_ERROR_OBJECT (basesrc, "Failed to start in pull mode");
3496     return FALSE;
3497   }
3498 error_stop:
3499   {
3500     GST_ERROR_OBJECT (basesrc, "Failed to stop in pull mode");
3501     return FALSE;
3502   }
3503 }
3504
3505 static gboolean
3506 gst_base_src_activate_mode (GstPad * pad, GstObject * parent,
3507     GstPadMode mode, gboolean active)
3508 {
3509   gboolean res;
3510
3511   switch (mode) {
3512     case GST_PAD_MODE_PULL:
3513       res = gst_base_src_activate_pull (pad, parent, active);
3514       break;
3515     case GST_PAD_MODE_PUSH:
3516       res = gst_base_src_activate_push (pad, parent, active);
3517       break;
3518     default:
3519       GST_LOG_OBJECT (pad, "unknown activation mode %d", mode);
3520       res = FALSE;
3521       break;
3522   }
3523   return res;
3524 }
3525
3526
3527 static GstStateChangeReturn
3528 gst_base_src_change_state (GstElement * element, GstStateChange transition)
3529 {
3530   GstBaseSrc *basesrc;
3531   GstStateChangeReturn result;
3532   gboolean no_preroll = FALSE;
3533
3534   basesrc = GST_BASE_SRC (element);
3535
3536   switch (transition) {
3537     case GST_STATE_CHANGE_NULL_TO_READY:
3538       break;
3539     case GST_STATE_CHANGE_READY_TO_PAUSED:
3540       basesrc->priv->stream_start_pending = TRUE;
3541       no_preroll = gst_base_src_is_live (basesrc);
3542       break;
3543     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
3544       GST_DEBUG_OBJECT (basesrc, "PAUSED->PLAYING");
3545       if (gst_base_src_is_live (basesrc)) {
3546         /* now we can start playback */
3547         gst_base_src_set_playing (basesrc, TRUE);
3548       }
3549       break;
3550     default:
3551       break;
3552   }
3553
3554   if ((result =
3555           GST_ELEMENT_CLASS (parent_class)->change_state (element,
3556               transition)) == GST_STATE_CHANGE_FAILURE)
3557     goto failure;
3558
3559   switch (transition) {
3560     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
3561       GST_DEBUG_OBJECT (basesrc, "PLAYING->PAUSED");
3562       if (gst_base_src_is_live (basesrc)) {
3563         /* make sure we block in the live lock in PAUSED */
3564         gst_base_src_set_playing (basesrc, FALSE);
3565         no_preroll = TRUE;
3566       }
3567       break;
3568     case GST_STATE_CHANGE_PAUSED_TO_READY:
3569     {
3570       /* we don't need to unblock anything here, the pad deactivation code
3571        * already did this */
3572       g_atomic_int_set (&basesrc->priv->pending_eos, FALSE);
3573       gst_event_replace (&basesrc->pending_seek, NULL);
3574       basesrc->priv->stream_start_pending = FALSE;
3575       break;
3576     }
3577     case GST_STATE_CHANGE_READY_TO_NULL:
3578       break;
3579     default:
3580       break;
3581   }
3582
3583   if (no_preroll && result == GST_STATE_CHANGE_SUCCESS)
3584     result = GST_STATE_CHANGE_NO_PREROLL;
3585
3586   return result;
3587
3588   /* ERRORS */
3589 failure:
3590   {
3591     GST_DEBUG_OBJECT (basesrc, "parent failed state change");
3592     return result;
3593   }
3594 }