56fd861503d512daa3ec6e934ffe01876d75a025
[platform/upstream/gstreamer.git] / libs / gst / base / gstbasesrc.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *               2000,2005 Wim Taymans <wim@fluendo.com>
4  *
5  * gstbasesrc.c:
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20  * Boston, MA 02111-1307, USA.
21  */
22
23 /**
24  * SECTION:gstbasesrc
25  * @short_description: Base class for getrange based source elements
26  * @see_also: #GstPushSrc, #GstBaseTransform, #GstBaseSink
27  *
28  * This is a generice base class for source elements. The following
29  * types of sources are supported:
30  * <itemizedlist>
31  *   <listitem><para>random access sources like files</para></listitem>
32  *   <listitem><para>seekable sources</para></listitem>
33  *   <listitem><para>live sources</para></listitem>
34  * </itemizedlist>
35  *
36  * The source can be configured to operate in any #GstFormat with the
37  * gst_base_src_set_format() method. The currently set format determines
38  * the format of the internal #GstSegment and any #GST_EVENT_NEWSEGMENT
39  * events. The default format for #GstBaseSrc is #GST_FORMAT_BYTES.
40  *
41  * #GstBaseSrc always supports push mode scheduling. If the following
42  * conditions are met, it also supports pull mode scheduling:
43  * <itemizedlist>
44  *   <listitem><para>The format is set to #GST_FORMAT_BYTES (default).</para>
45  *   </listitem>
46  *   <listitem><para>#GstBaseSrcClass.is_seekable() returns %TRUE.</para>
47  *   </listitem>
48  * </itemizedlist>
49  *
50  * If all the conditions are met for operating in pull mode, #GstBaseSrc is
51  * automatically seekable in push mode as well. The following conditions must
52  * be met to make the element seekable in push mode when the format is not
53  * #GST_FORMAT_BYTES:
54  * <itemizedlist>
55  *   <listitem><para>
56  *     #GstBaseSrcClass.is_seekable() returns %TRUE.
57  *   </para></listitem>
58  *   <listitem><para>
59  *     #GstBaseSrcClass.query() can convert all supported seek formats to the
60  *     internal format as set with gst_base_src_set_format().
61  *   </para></listitem>
62  *   <listitem><para>
63  *     #GstBaseSrcClass.do_seek() is implemented, performs the seek and returns
64  *      %TRUE.
65  *   </para></listitem>
66  * </itemizedlist>
67  *
68  * When the element does not meet the requirements to operate in pull mode, the
69  * offset and length in the #GstBaseSrcClass.create() method should be ignored.
70  * It is recommended to subclass #GstPushSrc instead, in this situation. If the
71  * element can operate in pull mode but only with specific offsets and
72  * lengths, it is allowed to generate an error when the wrong values are passed
73  * to the #GstBaseSrcClass.create() function.
74  *
75  * #GstBaseSrc has support for live sources. Live sources are sources that when
76  * paused discard data, such as audio or video capture devices. A typical live
77  * source also produces data at a fixed rate and thus provides a clock to publish
78  * this rate.
79  * Use gst_base_src_set_live() to activate the live source mode.
80  *
81  * A live source does not produce data in the PAUSED state. This means that the
82  * #GstBaseSrcClass.create() method will not be called in PAUSED but only in
83  * PLAYING. To signal the pipeline that the element will not produce data, the
84  * return value from the READY to PAUSED state will be
85  * #GST_STATE_CHANGE_NO_PREROLL.
86  *
87  * A typical live source will timestamp the buffers it creates with the
88  * current running time of the pipeline. This is one reason why a live source
89  * can only produce data in the PLAYING state, when the clock is actually
90  * distributed and running.
91  *
92  * Live sources that synchronize and block on the clock (an audio source, for
93  * example) can use gst_base_src_wait_playing() when the
94  * #GstBaseSrcClass.create() function was interrupted by a state change to
95  * PAUSED.
96  *
97  * The #GstBaseSrcClass.get_times() method can be used to implement pseudo-live
98  * sources. It only makes sense to implement the #GstBaseSrcClass.get_times()
99  * function if the source is a live source. The #GstBaseSrcClass.get_times()
100  * function should return timestamps starting from 0, as if it were a non-live
101  * source. The base class will make sure that the timestamps are transformed
102  * into the current running_time. The base source will then wait for the
103  * calculated running_time before pushing out the buffer.
104  *
105  * For live sources, the base class will by default report a latency of 0.
106  * For pseudo live sources, the base class will by default measure the difference
107  * between the first buffer timestamp and the start time of get_times and will
108  * report this value as the latency.
109  * Subclasses should override the query function when this behaviour is not
110  * acceptable.
111  *
112  * There is only support in #GstBaseSrc for exactly one source pad, which
113  * should be named "src". A source implementation (subclass of #GstBaseSrc)
114  * should install a pad template in its class_init function, like so:
115  * |[
116  * static void
117  * my_element_class_init (GstMyElementClass *klass)
118  * {
119  *   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
120  *   // srctemplate should be a #GstStaticPadTemplate with direction
121  *   // #GST_PAD_SRC and name "src"
122  *   gst_element_class_add_pad_template (gstelement_class,
123  *       gst_static_pad_template_get (&amp;srctemplate));
124  *   // see #GstElementDetails
125  *   gst_element_class_set_details (gstelement_class, &amp;details);
126  * }
127  * ]|
128  *
129  * <refsect2>
130  * <title>Controlled shutdown of live sources in applications</title>
131  * <para>
132  * Applications that record from a live source may want to stop recording
133  * in a controlled way, so that the recording is stopped, but the data
134  * already in the pipeline is processed to the end (remember that many live
135  * sources would go on recording forever otherwise). For that to happen the
136  * application needs to make the source stop recording and send an EOS
137  * event down the pipeline. The application would then wait for an
138  * EOS message posted on the pipeline's bus to know when all data has
139  * been processed and the pipeline can safely be stopped.
140  *
141  * An application may send an EOS event to a source element to make it
142  * perform the EOS logic (send EOS event downstream or post a
143  * #GST_MESSAGE_SEGMENT_DONE on the bus). This can typically be done
144  * with the gst_element_send_event() function on the element or its parent bin.
145  *
146  * After the EOS has been sent to the element, the application should wait for
147  * an EOS message to be posted on the pipeline's bus. Once this EOS message is
148  * received, it may safely shut down the entire pipeline.
149  *
150  * Last reviewed on 2007-12-19 (0.10.16)
151  * </para>
152  * </refsect2>
153  */
154
155 #ifdef HAVE_CONFIG_H
156 #  include "config.h"
157 #endif
158
159 #include <stdlib.h>
160 #include <string.h>
161
162 #include <gst/gst_private.h>
163 #include <gst/glib-compat-private.h>
164
165 #include "gstbasesrc.h"
166 #include "gsttypefindhelper.h"
167 #include <gst/gst-i18n-lib.h>
168
169 GST_DEBUG_CATEGORY_STATIC (gst_base_src_debug);
170 #define GST_CAT_DEFAULT gst_base_src_debug
171
172 #define GST_LIVE_GET_LOCK(elem)               (&GST_BASE_SRC_CAST(elem)->live_lock)
173 #define GST_LIVE_LOCK(elem)                   g_mutex_lock(GST_LIVE_GET_LOCK(elem))
174 #define GST_LIVE_TRYLOCK(elem)                g_mutex_trylock(GST_LIVE_GET_LOCK(elem))
175 #define GST_LIVE_UNLOCK(elem)                 g_mutex_unlock(GST_LIVE_GET_LOCK(elem))
176 #define GST_LIVE_GET_COND(elem)               (&GST_BASE_SRC_CAST(elem)->live_cond)
177 #define GST_LIVE_WAIT(elem)                   g_cond_wait (GST_LIVE_GET_COND (elem), GST_LIVE_GET_LOCK (elem))
178 #define GST_LIVE_WAIT_UNTIL(elem, end_time)   g_cond_timed_wait (GST_LIVE_GET_COND (elem), GST_LIVE_GET_LOCK (elem), end_time)
179 #define GST_LIVE_SIGNAL(elem)                 g_cond_signal (GST_LIVE_GET_COND (elem));
180 #define GST_LIVE_BROADCAST(elem)              g_cond_broadcast (GST_LIVE_GET_COND (elem));
181
182 /* BaseSrc signals and args */
183 enum
184 {
185   /* FILL ME */
186   LAST_SIGNAL
187 };
188
189 #define DEFAULT_BLOCKSIZE       4096
190 #define DEFAULT_NUM_BUFFERS     -1
191 #define DEFAULT_TYPEFIND        FALSE
192 #define DEFAULT_DO_TIMESTAMP    FALSE
193
194 enum
195 {
196   PROP_0,
197   PROP_BLOCKSIZE,
198   PROP_NUM_BUFFERS,
199   PROP_TYPEFIND,
200   PROP_DO_TIMESTAMP
201 };
202
203 #define GST_BASE_SRC_GET_PRIVATE(obj)  \
204    (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_BASE_SRC, GstBaseSrcPrivate))
205
206 struct _GstBaseSrcPrivate
207 {
208   gboolean discont;
209   gboolean flushing;
210
211   GstFlowReturn start_result;
212   gboolean async;
213
214   /* if a stream-start event should be sent */
215   gboolean stream_start_pending;
216
217   /* if segment should be sent */
218   gboolean segment_pending;
219
220   /* if EOS is pending (atomic) */
221   gint pending_eos;
222
223   /* startup latency is the time it takes between going to PLAYING and producing
224    * the first BUFFER with running_time 0. This value is included in the latency
225    * reporting. */
226   GstClockTime latency;
227   /* timestamp offset, this is the offset add to the values of gst_times for
228    * pseudo live sources */
229   GstClockTimeDiff ts_offset;
230
231   gboolean do_timestamp;
232   volatile gint dynamic_size;
233
234   /* stream sequence number */
235   guint32 seqnum;
236
237   /* pending events (TAG, CUSTOM_BOTH, CUSTOM_DOWNSTREAM) to be
238    * pushed in the data stream */
239   GList *pending_events;
240   volatile gint have_events;
241
242   /* QoS *//* with LOCK */
243   gboolean qos_enabled;
244   gdouble proportion;
245   GstClockTime earliest_time;
246
247   GstBufferPool *pool;
248   GstAllocator *allocator;
249   GstAllocationParams params;
250 };
251
252 static GstElementClass *parent_class = NULL;
253
254 static void gst_base_src_class_init (GstBaseSrcClass * klass);
255 static void gst_base_src_init (GstBaseSrc * src, gpointer g_class);
256 static void gst_base_src_finalize (GObject * object);
257
258
259 GType
260 gst_base_src_get_type (void)
261 {
262   static volatile gsize base_src_type = 0;
263
264   if (g_once_init_enter (&base_src_type)) {
265     GType _type;
266     static const GTypeInfo base_src_info = {
267       sizeof (GstBaseSrcClass),
268       NULL,
269       NULL,
270       (GClassInitFunc) gst_base_src_class_init,
271       NULL,
272       NULL,
273       sizeof (GstBaseSrc),
274       0,
275       (GInstanceInitFunc) gst_base_src_init,
276     };
277
278     _type = g_type_register_static (GST_TYPE_ELEMENT,
279         "GstBaseSrc", &base_src_info, G_TYPE_FLAG_ABSTRACT);
280     g_once_init_leave (&base_src_type, _type);
281   }
282   return base_src_type;
283 }
284
285 static GstCaps *gst_base_src_default_get_caps (GstBaseSrc * bsrc,
286     GstCaps * filter);
287 static GstCaps *gst_base_src_default_fixate (GstBaseSrc * src, GstCaps * caps);
288 static GstCaps *gst_base_src_fixate (GstBaseSrc * src, GstCaps * caps);
289
290 static gboolean gst_base_src_is_random_access (GstBaseSrc * src);
291 static gboolean gst_base_src_activate_mode (GstPad * pad, GstObject * parent,
292     GstPadMode mode, gboolean active);
293 static void gst_base_src_set_property (GObject * object, guint prop_id,
294     const GValue * value, GParamSpec * pspec);
295 static void gst_base_src_get_property (GObject * object, guint prop_id,
296     GValue * value, GParamSpec * pspec);
297 static gboolean gst_base_src_event (GstPad * pad, GstObject * parent,
298     GstEvent * event);
299 static gboolean gst_base_src_send_event (GstElement * elem, GstEvent * event);
300 static gboolean gst_base_src_default_event (GstBaseSrc * src, GstEvent * event);
301
302 static gboolean gst_base_src_query (GstPad * pad, GstObject * parent,
303     GstQuery * query);
304
305 static gboolean gst_base_src_activate_pool (GstBaseSrc * basesrc,
306     gboolean active);
307 static gboolean gst_base_src_default_negotiate (GstBaseSrc * basesrc);
308 static gboolean gst_base_src_default_do_seek (GstBaseSrc * src,
309     GstSegment * segment);
310 static gboolean gst_base_src_default_query (GstBaseSrc * src, GstQuery * query);
311 static gboolean gst_base_src_default_prepare_seek_segment (GstBaseSrc * src,
312     GstEvent * event, GstSegment * segment);
313 static GstFlowReturn gst_base_src_default_create (GstBaseSrc * basesrc,
314     guint64 offset, guint size, GstBuffer ** buf);
315 static GstFlowReturn gst_base_src_default_alloc (GstBaseSrc * basesrc,
316     guint64 offset, guint size, GstBuffer ** buf);
317 static gboolean gst_base_src_decide_allocation_default (GstBaseSrc * basesrc,
318     GstQuery * query);
319
320 static gboolean gst_base_src_set_flushing (GstBaseSrc * basesrc,
321     gboolean flushing, gboolean live_play, gboolean * playing);
322
323 static gboolean gst_base_src_start (GstBaseSrc * basesrc);
324 static gboolean gst_base_src_stop (GstBaseSrc * basesrc);
325
326 static GstStateChangeReturn gst_base_src_change_state (GstElement * element,
327     GstStateChange transition);
328
329 static void gst_base_src_loop (GstPad * pad);
330 static GstFlowReturn gst_base_src_getrange (GstPad * pad, GstObject * parent,
331     guint64 offset, guint length, GstBuffer ** buf);
332 static GstFlowReturn gst_base_src_get_range (GstBaseSrc * src, guint64 offset,
333     guint length, GstBuffer ** buf);
334 static gboolean gst_base_src_seekable (GstBaseSrc * src);
335 static gboolean gst_base_src_negotiate (GstBaseSrc * basesrc);
336 static gboolean gst_base_src_update_length (GstBaseSrc * src, guint64 offset,
337     guint * length);
338
339 static void
340 gst_base_src_class_init (GstBaseSrcClass * klass)
341 {
342   GObjectClass *gobject_class;
343   GstElementClass *gstelement_class;
344
345   gobject_class = G_OBJECT_CLASS (klass);
346   gstelement_class = GST_ELEMENT_CLASS (klass);
347
348   GST_DEBUG_CATEGORY_INIT (gst_base_src_debug, "basesrc", 0, "basesrc element");
349
350   g_type_class_add_private (klass, sizeof (GstBaseSrcPrivate));
351
352   parent_class = g_type_class_peek_parent (klass);
353
354   gobject_class->finalize = gst_base_src_finalize;
355   gobject_class->set_property = gst_base_src_set_property;
356   gobject_class->get_property = gst_base_src_get_property;
357
358   g_object_class_install_property (gobject_class, PROP_BLOCKSIZE,
359       g_param_spec_uint ("blocksize", "Block size",
360           "Size in bytes to read per buffer (-1 = default)", 0, G_MAXUINT,
361           DEFAULT_BLOCKSIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
362   g_object_class_install_property (gobject_class, PROP_NUM_BUFFERS,
363       g_param_spec_int ("num-buffers", "num-buffers",
364           "Number of buffers to output before sending EOS (-1 = unlimited)",
365           -1, G_MAXINT, DEFAULT_NUM_BUFFERS, G_PARAM_READWRITE |
366           G_PARAM_STATIC_STRINGS));
367   g_object_class_install_property (gobject_class, PROP_TYPEFIND,
368       g_param_spec_boolean ("typefind", "Typefind",
369           "Run typefind before negotiating", DEFAULT_TYPEFIND,
370           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
371   g_object_class_install_property (gobject_class, PROP_DO_TIMESTAMP,
372       g_param_spec_boolean ("do-timestamp", "Do timestamp",
373           "Apply current stream time to buffers", DEFAULT_DO_TIMESTAMP,
374           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
375
376   gstelement_class->change_state =
377       GST_DEBUG_FUNCPTR (gst_base_src_change_state);
378   gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_base_src_send_event);
379
380   klass->get_caps = GST_DEBUG_FUNCPTR (gst_base_src_default_get_caps);
381   klass->negotiate = GST_DEBUG_FUNCPTR (gst_base_src_default_negotiate);
382   klass->fixate = GST_DEBUG_FUNCPTR (gst_base_src_default_fixate);
383   klass->prepare_seek_segment =
384       GST_DEBUG_FUNCPTR (gst_base_src_default_prepare_seek_segment);
385   klass->do_seek = GST_DEBUG_FUNCPTR (gst_base_src_default_do_seek);
386   klass->query = GST_DEBUG_FUNCPTR (gst_base_src_default_query);
387   klass->event = GST_DEBUG_FUNCPTR (gst_base_src_default_event);
388   klass->create = GST_DEBUG_FUNCPTR (gst_base_src_default_create);
389   klass->alloc = GST_DEBUG_FUNCPTR (gst_base_src_default_alloc);
390   klass->decide_allocation =
391       GST_DEBUG_FUNCPTR (gst_base_src_decide_allocation_default);
392
393   /* Registering debug symbols for function pointers */
394   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_activate_mode);
395   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_event);
396   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_query);
397   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_getrange);
398   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_fixate);
399 }
400
401 static void
402 gst_base_src_init (GstBaseSrc * basesrc, gpointer g_class)
403 {
404   GstPad *pad;
405   GstPadTemplate *pad_template;
406
407   basesrc->priv = GST_BASE_SRC_GET_PRIVATE (basesrc);
408
409   basesrc->is_live = FALSE;
410   g_mutex_init (&basesrc->live_lock);
411   g_cond_init (&basesrc->live_cond);
412   basesrc->num_buffers = DEFAULT_NUM_BUFFERS;
413   basesrc->num_buffers_left = -1;
414
415   basesrc->can_activate_push = TRUE;
416
417   pad_template =
418       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (g_class), "src");
419   g_return_if_fail (pad_template != NULL);
420
421   GST_DEBUG_OBJECT (basesrc, "creating src pad");
422   pad = gst_pad_new_from_template (pad_template, "src");
423
424   GST_DEBUG_OBJECT (basesrc, "setting functions on src pad");
425   gst_pad_set_activatemode_function (pad, gst_base_src_activate_mode);
426   gst_pad_set_event_function (pad, gst_base_src_event);
427   gst_pad_set_query_function (pad, gst_base_src_query);
428   gst_pad_set_getrange_function (pad, gst_base_src_getrange);
429
430   /* hold pointer to pad */
431   basesrc->srcpad = pad;
432   GST_DEBUG_OBJECT (basesrc, "adding src pad");
433   gst_element_add_pad (GST_ELEMENT (basesrc), pad);
434
435   basesrc->blocksize = DEFAULT_BLOCKSIZE;
436   basesrc->clock_id = NULL;
437   /* we operate in BYTES by default */
438   gst_base_src_set_format (basesrc, GST_FORMAT_BYTES);
439   basesrc->typefind = DEFAULT_TYPEFIND;
440   basesrc->priv->do_timestamp = DEFAULT_DO_TIMESTAMP;
441   g_atomic_int_set (&basesrc->priv->have_events, FALSE);
442
443   basesrc->priv->start_result = GST_FLOW_FLUSHING;
444   GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTED);
445   GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTING);
446   GST_OBJECT_FLAG_SET (basesrc, GST_ELEMENT_FLAG_SOURCE);
447
448   GST_DEBUG_OBJECT (basesrc, "init done");
449 }
450
451 static void
452 gst_base_src_finalize (GObject * object)
453 {
454   GstBaseSrc *basesrc;
455   GstEvent **event_p;
456
457   basesrc = GST_BASE_SRC (object);
458
459   g_mutex_clear (&basesrc->live_lock);
460   g_cond_clear (&basesrc->live_cond);
461
462   event_p = &basesrc->pending_seek;
463   gst_event_replace (event_p, NULL);
464
465   if (basesrc->priv->pending_events) {
466     g_list_foreach (basesrc->priv->pending_events, (GFunc) gst_event_unref,
467         NULL);
468     g_list_free (basesrc->priv->pending_events);
469   }
470
471   G_OBJECT_CLASS (parent_class)->finalize (object);
472 }
473
474 /**
475  * gst_base_src_wait_playing:
476  * @src: the src
477  *
478  * If the #GstBaseSrcClass.create() method performs its own synchronisation
479  * against the clock it must unblock when going from PLAYING to the PAUSED state
480  * and call this method before continuing to produce the remaining data.
481  *
482  * This function will block until a state change to PLAYING happens (in which
483  * case this function returns #GST_FLOW_OK) or the processing must be stopped due
484  * to a state change to READY or a FLUSH event (in which case this function
485  * returns #GST_FLOW_FLUSHING).
486  *
487  * Returns: #GST_FLOW_OK if @src is PLAYING and processing can
488  * continue. Any other return value should be returned from the create vmethod.
489  */
490 GstFlowReturn
491 gst_base_src_wait_playing (GstBaseSrc * src)
492 {
493   g_return_val_if_fail (GST_IS_BASE_SRC (src), GST_FLOW_ERROR);
494
495   do {
496     /* block until the state changes, or we get a flush, or something */
497     GST_DEBUG_OBJECT (src, "live source waiting for running state");
498     GST_LIVE_WAIT (src);
499     GST_DEBUG_OBJECT (src, "live source unlocked");
500     if (src->priv->flushing)
501       goto flushing;
502   } while (G_UNLIKELY (!src->live_running));
503
504   return GST_FLOW_OK;
505
506   /* ERRORS */
507 flushing:
508   {
509     GST_DEBUG_OBJECT (src, "we are flushing");
510     return GST_FLOW_FLUSHING;
511   }
512 }
513
514 /**
515  * gst_base_src_set_live:
516  * @src: base source instance
517  * @live: new live-mode
518  *
519  * If the element listens to a live source, @live should
520  * be set to %TRUE.
521  *
522  * A live source will not produce data in the PAUSED state and
523  * will therefore not be able to participate in the PREROLL phase
524  * of a pipeline. To signal this fact to the application and the
525  * pipeline, the state change return value of the live source will
526  * be GST_STATE_CHANGE_NO_PREROLL.
527  */
528 void
529 gst_base_src_set_live (GstBaseSrc * src, gboolean live)
530 {
531   g_return_if_fail (GST_IS_BASE_SRC (src));
532
533   GST_OBJECT_LOCK (src);
534   src->is_live = live;
535   GST_OBJECT_UNLOCK (src);
536 }
537
538 /**
539  * gst_base_src_is_live:
540  * @src: base source instance
541  *
542  * Check if an element is in live mode.
543  *
544  * Returns: %TRUE if element is in live mode.
545  */
546 gboolean
547 gst_base_src_is_live (GstBaseSrc * src)
548 {
549   gboolean result;
550
551   g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
552
553   GST_OBJECT_LOCK (src);
554   result = src->is_live;
555   GST_OBJECT_UNLOCK (src);
556
557   return result;
558 }
559
560 /**
561  * gst_base_src_set_format:
562  * @src: base source instance
563  * @format: the format to use
564  *
565  * Sets the default format of the source. This will be the format used
566  * for sending NEW_SEGMENT events and for performing seeks.
567  *
568  * If a format of GST_FORMAT_BYTES is set, the element will be able to
569  * operate in pull mode if the #GstBaseSrcClass.is_seekable() returns TRUE.
570  *
571  * This function must only be called in states < %GST_STATE_PAUSED.
572  */
573 void
574 gst_base_src_set_format (GstBaseSrc * src, GstFormat format)
575 {
576   g_return_if_fail (GST_IS_BASE_SRC (src));
577   g_return_if_fail (GST_STATE (src) <= GST_STATE_READY);
578
579   GST_OBJECT_LOCK (src);
580   gst_segment_init (&src->segment, format);
581   GST_OBJECT_UNLOCK (src);
582 }
583
584 /**
585  * gst_base_src_set_dynamic_size:
586  * @src: base source instance
587  * @dynamic: new dynamic size mode
588  *
589  * If not @dynamic, size is only updated when needed, such as when trying to
590  * read past current tracked size.  Otherwise, size is checked for upon each
591  * read.
592  */
593 void
594 gst_base_src_set_dynamic_size (GstBaseSrc * src, gboolean dynamic)
595 {
596   g_return_if_fail (GST_IS_BASE_SRC (src));
597
598   g_atomic_int_set (&src->priv->dynamic_size, dynamic);
599 }
600
601 /**
602  * gst_base_src_set_async:
603  * @src: base source instance
604  * @async: new async mode
605  *
606  * Configure async behaviour in @src, no state change will block. The open,
607  * close, start, stop, play and pause virtual methods will be executed in a
608  * different thread and are thus allowed to perform blocking operations. Any
609  * blocking operation should be unblocked with the unlock vmethod.
610  */
611 void
612 gst_base_src_set_async (GstBaseSrc * src, gboolean async)
613 {
614   g_return_if_fail (GST_IS_BASE_SRC (src));
615
616   GST_OBJECT_LOCK (src);
617   src->priv->async = async;
618   GST_OBJECT_UNLOCK (src);
619 }
620
621 /**
622  * gst_base_src_is_async:
623  * @src: base source instance
624  *
625  * Get the current async behaviour of @src. See also gst_base_src_set_async().
626  *
627  * Returns: %TRUE if @src is operating in async mode.
628  */
629 gboolean
630 gst_base_src_is_async (GstBaseSrc * src)
631 {
632   gboolean res;
633
634   g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
635
636   GST_OBJECT_LOCK (src);
637   res = src->priv->async;
638   GST_OBJECT_UNLOCK (src);
639
640   return res;
641 }
642
643
644 /**
645  * gst_base_src_query_latency:
646  * @src: the source
647  * @live: (out) (allow-none): if the source is live
648  * @min_latency: (out) (allow-none): the min latency of the source
649  * @max_latency: (out) (allow-none): the max latency of the source
650  *
651  * Query the source for the latency parameters. @live will be TRUE when @src is
652  * configured as a live source. @min_latency will be set to the difference
653  * between the running time and the timestamp of the first buffer.
654  * @max_latency is always the undefined value of -1.
655  *
656  * This function is mostly used by subclasses.
657  *
658  * Returns: TRUE if the query succeeded.
659  */
660 gboolean
661 gst_base_src_query_latency (GstBaseSrc * src, gboolean * live,
662     GstClockTime * min_latency, GstClockTime * max_latency)
663 {
664   GstClockTime min;
665
666   g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
667
668   GST_OBJECT_LOCK (src);
669   if (live)
670     *live = src->is_live;
671
672   /* if we have a startup latency, report this one, else report 0. Subclasses
673    * are supposed to override the query function if they want something
674    * else. */
675   if (src->priv->latency != -1)
676     min = src->priv->latency;
677   else
678     min = 0;
679
680   if (min_latency)
681     *min_latency = min;
682   if (max_latency)
683     *max_latency = -1;
684
685   GST_LOG_OBJECT (src, "latency: live %d, min %" GST_TIME_FORMAT
686       ", max %" GST_TIME_FORMAT, src->is_live, GST_TIME_ARGS (min),
687       GST_TIME_ARGS (-1));
688   GST_OBJECT_UNLOCK (src);
689
690   return TRUE;
691 }
692
693 /**
694  * gst_base_src_set_blocksize:
695  * @src: the source
696  * @blocksize: the new blocksize in bytes
697  *
698  * Set the number of bytes that @src will push out with each buffer. When
699  * @blocksize is set to -1, a default length will be used.
700  */
701 void
702 gst_base_src_set_blocksize (GstBaseSrc * src, guint blocksize)
703 {
704   g_return_if_fail (GST_IS_BASE_SRC (src));
705
706   GST_OBJECT_LOCK (src);
707   src->blocksize = blocksize;
708   GST_OBJECT_UNLOCK (src);
709 }
710
711 /**
712  * gst_base_src_get_blocksize:
713  * @src: the source
714  *
715  * Get the number of bytes that @src will push out with each buffer.
716  *
717  * Returns: the number of bytes pushed with each buffer.
718  */
719 guint
720 gst_base_src_get_blocksize (GstBaseSrc * src)
721 {
722   gint res;
723
724   g_return_val_if_fail (GST_IS_BASE_SRC (src), 0);
725
726   GST_OBJECT_LOCK (src);
727   res = src->blocksize;
728   GST_OBJECT_UNLOCK (src);
729
730   return res;
731 }
732
733
734 /**
735  * gst_base_src_set_do_timestamp:
736  * @src: the source
737  * @timestamp: enable or disable timestamping
738  *
739  * Configure @src to automatically timestamp outgoing buffers based on the
740  * current running_time of the pipeline. This property is mostly useful for live
741  * sources.
742  */
743 void
744 gst_base_src_set_do_timestamp (GstBaseSrc * src, gboolean timestamp)
745 {
746   g_return_if_fail (GST_IS_BASE_SRC (src));
747
748   GST_OBJECT_LOCK (src);
749   src->priv->do_timestamp = timestamp;
750   GST_OBJECT_UNLOCK (src);
751 }
752
753 /**
754  * gst_base_src_get_do_timestamp:
755  * @src: the source
756  *
757  * Query if @src timestamps outgoing buffers based on the current running_time.
758  *
759  * Returns: %TRUE if the base class will automatically timestamp outgoing buffers.
760  */
761 gboolean
762 gst_base_src_get_do_timestamp (GstBaseSrc * src)
763 {
764   gboolean res;
765
766   g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
767
768   GST_OBJECT_LOCK (src);
769   res = src->priv->do_timestamp;
770   GST_OBJECT_UNLOCK (src);
771
772   return res;
773 }
774
775 /**
776  * gst_base_src_new_seamless_segment:
777  * @src: The source
778  * @start: The new start value for the segment
779  * @stop: Stop value for the new segment
780  * @position: The position value for the new segent
781  *
782  * Prepare a new seamless segment for emission downstream. This function must
783  * only be called by derived sub-classes, and only from the create() function,
784  * as the stream-lock needs to be held.
785  *
786  * The format for the new segment will be the current format of the source, as
787  * configured with gst_base_src_set_format()
788  *
789  * Returns: %TRUE if preparation of the seamless segment succeeded.
790  */
791 gboolean
792 gst_base_src_new_seamless_segment (GstBaseSrc * src, gint64 start, gint64 stop,
793     gint64 position)
794 {
795   gboolean res = TRUE;
796
797   GST_DEBUG_OBJECT (src,
798       "Starting new seamless segment. Start %" GST_TIME_FORMAT " stop %"
799       GST_TIME_FORMAT " position %" GST_TIME_FORMAT, GST_TIME_ARGS (start),
800       GST_TIME_ARGS (stop), GST_TIME_ARGS (position));
801
802   GST_OBJECT_LOCK (src);
803
804   src->segment.base = gst_segment_to_running_time (&src->segment,
805       src->segment.format, src->segment.position);
806   src->segment.start = start;
807   src->segment.stop = stop;
808   src->segment.position = position;
809
810   /* forward, we send data from position to stop */
811   src->priv->segment_pending = TRUE;
812   GST_OBJECT_UNLOCK (src);
813
814   src->priv->discont = TRUE;
815   src->running = TRUE;
816
817   return res;
818 }
819
820 static gboolean
821 gst_base_src_send_stream_start (GstBaseSrc * src)
822 {
823   gboolean ret = TRUE;
824
825   if (src->priv->stream_start_pending) {
826     gchar *stream_id;
827
828     stream_id =
829         gst_pad_create_stream_id (src->srcpad, GST_ELEMENT_CAST (src), NULL);
830
831     GST_DEBUG_OBJECT (src, "Pushing STREAM_START");
832     ret =
833         gst_pad_push_event (src->srcpad,
834         gst_event_new_stream_start (stream_id));
835     src->priv->stream_start_pending = FALSE;
836     g_free (stream_id);
837   }
838
839   return ret;
840 }
841
842 /**
843  * gst_base_src_set_caps:
844  * @src: a #GstBaseSrc
845  * @caps: a #GstCaps
846  *
847  * Set new caps on the basesrc source pad.
848  *
849  * Returns: %TRUE if the caps could be set
850  */
851 gboolean
852 gst_base_src_set_caps (GstBaseSrc * src, GstCaps * caps)
853 {
854   GstBaseSrcClass *bclass;
855   gboolean res = TRUE;
856
857   bclass = GST_BASE_SRC_GET_CLASS (src);
858
859   gst_base_src_send_stream_start (src);
860
861   if (bclass->set_caps)
862     res = bclass->set_caps (src, caps);
863   if (res)
864     res = gst_pad_set_caps (src->srcpad, caps);
865
866   return res;
867 }
868
869 static GstCaps *
870 gst_base_src_default_get_caps (GstBaseSrc * bsrc, GstCaps * filter)
871 {
872   GstCaps *caps = NULL;
873   GstPadTemplate *pad_template;
874   GstBaseSrcClass *bclass;
875
876   bclass = GST_BASE_SRC_GET_CLASS (bsrc);
877
878   pad_template =
879       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (bclass), "src");
880
881   if (pad_template != NULL) {
882     caps = gst_pad_template_get_caps (pad_template);
883
884     if (filter) {
885       GstCaps *intersection;
886
887       intersection =
888           gst_caps_intersect_full (filter, caps, GST_CAPS_INTERSECT_FIRST);
889       gst_caps_unref (caps);
890       caps = intersection;
891     }
892   }
893   return caps;
894 }
895
896 static GstCaps *
897 gst_base_src_default_fixate (GstBaseSrc * bsrc, GstCaps * caps)
898 {
899   GST_DEBUG_OBJECT (bsrc, "using default caps fixate function");
900   return gst_caps_fixate (caps);
901 }
902
903 static GstCaps *
904 gst_base_src_fixate (GstBaseSrc * bsrc, GstCaps * caps)
905 {
906   GstBaseSrcClass *bclass;
907
908   bclass = GST_BASE_SRC_GET_CLASS (bsrc);
909
910   if (bclass->fixate)
911     caps = bclass->fixate (bsrc, caps);
912
913   return caps;
914 }
915
916 static gboolean
917 gst_base_src_default_query (GstBaseSrc * src, GstQuery * query)
918 {
919   gboolean res;
920
921   switch (GST_QUERY_TYPE (query)) {
922     case GST_QUERY_POSITION:
923     {
924       GstFormat format;
925
926       gst_query_parse_position (query, &format, NULL);
927
928       GST_DEBUG_OBJECT (src, "position query in format %s",
929           gst_format_get_name (format));
930
931       switch (format) {
932         case GST_FORMAT_PERCENT:
933         {
934           gint64 percent;
935           gint64 position;
936           gint64 duration;
937
938           GST_OBJECT_LOCK (src);
939           position = src->segment.position;
940           duration = src->segment.duration;
941           GST_OBJECT_UNLOCK (src);
942
943           if (position != -1 && duration != -1) {
944             if (position < duration)
945               percent = gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX, position,
946                   duration);
947             else
948               percent = GST_FORMAT_PERCENT_MAX;
949           } else
950             percent = -1;
951
952           gst_query_set_position (query, GST_FORMAT_PERCENT, percent);
953           res = TRUE;
954           break;
955         }
956         default:
957         {
958           gint64 position;
959           GstFormat seg_format;
960
961           GST_OBJECT_LOCK (src);
962           position =
963               gst_segment_to_stream_time (&src->segment, src->segment.format,
964               src->segment.position);
965           seg_format = src->segment.format;
966           GST_OBJECT_UNLOCK (src);
967
968           if (position != -1) {
969             /* convert to requested format */
970             res =
971                 gst_pad_query_convert (src->srcpad, seg_format,
972                 position, format, &position);
973           } else
974             res = TRUE;
975
976           gst_query_set_position (query, format, position);
977           break;
978         }
979       }
980       break;
981     }
982     case GST_QUERY_DURATION:
983     {
984       GstFormat format;
985
986       gst_query_parse_duration (query, &format, NULL);
987
988       GST_DEBUG_OBJECT (src, "duration query in format %s",
989           gst_format_get_name (format));
990
991       switch (format) {
992         case GST_FORMAT_PERCENT:
993           gst_query_set_duration (query, GST_FORMAT_PERCENT,
994               GST_FORMAT_PERCENT_MAX);
995           res = TRUE;
996           break;
997         default:
998         {
999           gint64 duration;
1000           GstFormat seg_format;
1001           guint length = 0;
1002
1003           /* may have to refresh duration */
1004           if (g_atomic_int_get (&src->priv->dynamic_size))
1005             gst_base_src_update_length (src, 0, &length);
1006
1007           /* this is the duration as configured by the subclass. */
1008           GST_OBJECT_LOCK (src);
1009           duration = src->segment.duration;
1010           seg_format = src->segment.format;
1011           GST_OBJECT_UNLOCK (src);
1012
1013           GST_LOG_OBJECT (src, "duration %" G_GINT64_FORMAT ", format %s",
1014               duration, gst_format_get_name (seg_format));
1015
1016           if (duration != -1) {
1017             /* convert to requested format, if this fails, we have a duration
1018              * but we cannot answer the query, we must return FALSE. */
1019             res =
1020                 gst_pad_query_convert (src->srcpad, seg_format,
1021                 duration, format, &duration);
1022           } else {
1023             /* The subclass did not configure a duration, we assume that the
1024              * media has an unknown duration then and we return TRUE to report
1025              * this. Note that this is not the same as returning FALSE, which
1026              * means that we cannot report the duration at all. */
1027             res = TRUE;
1028           }
1029           gst_query_set_duration (query, format, duration);
1030           break;
1031         }
1032       }
1033       break;
1034     }
1035
1036     case GST_QUERY_SEEKING:
1037     {
1038       GstFormat format, seg_format;
1039       gint64 duration;
1040
1041       GST_OBJECT_LOCK (src);
1042       duration = src->segment.duration;
1043       seg_format = src->segment.format;
1044       GST_OBJECT_UNLOCK (src);
1045
1046       gst_query_parse_seeking (query, &format, NULL, NULL, NULL);
1047       if (format == seg_format) {
1048         gst_query_set_seeking (query, seg_format,
1049             gst_base_src_seekable (src), 0, duration);
1050         res = TRUE;
1051       } else {
1052         /* FIXME 0.11: return TRUE + seekable=FALSE for SEEKING query here */
1053         /* Don't reply to the query to make up for demuxers which don't
1054          * handle the SEEKING query yet. Players like Totem will fall back
1055          * to the duration when the SEEKING query isn't answered. */
1056         res = FALSE;
1057       }
1058       break;
1059     }
1060     case GST_QUERY_SEGMENT:
1061     {
1062       gint64 start, stop;
1063
1064       GST_OBJECT_LOCK (src);
1065       /* no end segment configured, current duration then */
1066       if ((stop = src->segment.stop) == -1)
1067         stop = src->segment.duration;
1068       start = src->segment.start;
1069
1070       /* adjust to stream time */
1071       if (src->segment.time != -1) {
1072         start -= src->segment.time;
1073         if (stop != -1)
1074           stop -= src->segment.time;
1075       }
1076
1077       gst_query_set_segment (query, src->segment.rate, src->segment.format,
1078           start, stop);
1079       GST_OBJECT_UNLOCK (src);
1080       res = TRUE;
1081       break;
1082     }
1083
1084     case GST_QUERY_FORMATS:
1085     {
1086       gst_query_set_formats (query, 3, GST_FORMAT_DEFAULT,
1087           GST_FORMAT_BYTES, GST_FORMAT_PERCENT);
1088       res = TRUE;
1089       break;
1090     }
1091     case GST_QUERY_CONVERT:
1092     {
1093       GstFormat src_fmt, dest_fmt;
1094       gint64 src_val, dest_val;
1095
1096       gst_query_parse_convert (query, &src_fmt, &src_val, &dest_fmt, &dest_val);
1097
1098       /* we can only convert between equal formats... */
1099       if (src_fmt == dest_fmt) {
1100         dest_val = src_val;
1101         res = TRUE;
1102       } else
1103         res = FALSE;
1104
1105       gst_query_set_convert (query, src_fmt, src_val, dest_fmt, dest_val);
1106       break;
1107     }
1108     case GST_QUERY_LATENCY:
1109     {
1110       GstClockTime min, max;
1111       gboolean live;
1112
1113       /* Subclasses should override and implement something useful */
1114       res = gst_base_src_query_latency (src, &live, &min, &max);
1115
1116       GST_LOG_OBJECT (src, "report latency: live %d, min %" GST_TIME_FORMAT
1117           ", max %" GST_TIME_FORMAT, live, GST_TIME_ARGS (min),
1118           GST_TIME_ARGS (max));
1119
1120       gst_query_set_latency (query, live, min, max);
1121       break;
1122     }
1123     case GST_QUERY_JITTER:
1124     case GST_QUERY_RATE:
1125       res = FALSE;
1126       break;
1127     case GST_QUERY_BUFFERING:
1128     {
1129       GstFormat format, seg_format;
1130       gint64 start, stop, estimated;
1131
1132       gst_query_parse_buffering_range (query, &format, NULL, NULL, NULL);
1133
1134       GST_DEBUG_OBJECT (src, "buffering query in format %s",
1135           gst_format_get_name (format));
1136
1137       GST_OBJECT_LOCK (src);
1138       if (src->random_access) {
1139         estimated = 0;
1140         start = 0;
1141         if (format == GST_FORMAT_PERCENT)
1142           stop = GST_FORMAT_PERCENT_MAX;
1143         else
1144           stop = src->segment.duration;
1145       } else {
1146         estimated = -1;
1147         start = -1;
1148         stop = -1;
1149       }
1150       seg_format = src->segment.format;
1151       GST_OBJECT_UNLOCK (src);
1152
1153       /* convert to required format. When the conversion fails, we can't answer
1154        * the query. When the value is unknown, we can don't perform conversion
1155        * but report TRUE. */
1156       if (format != GST_FORMAT_PERCENT && stop != -1) {
1157         res = gst_pad_query_convert (src->srcpad, seg_format,
1158             stop, format, &stop);
1159       } else {
1160         res = TRUE;
1161       }
1162       if (res && format != GST_FORMAT_PERCENT && start != -1)
1163         res = gst_pad_query_convert (src->srcpad, seg_format,
1164             start, format, &start);
1165
1166       gst_query_set_buffering_range (query, format, start, stop, estimated);
1167       break;
1168     }
1169     case GST_QUERY_SCHEDULING:
1170     {
1171       gboolean random_access;
1172
1173       random_access = gst_base_src_is_random_access (src);
1174
1175       /* we can operate in getrange mode if the native format is bytes
1176        * and we are seekable, this condition is set in the random_access
1177        * flag and is set in the _start() method. */
1178       gst_query_set_scheduling (query, GST_SCHEDULING_FLAG_SEEKABLE, 1, -1, 0);
1179       if (random_access)
1180         gst_query_add_scheduling_mode (query, GST_PAD_MODE_PULL);
1181       gst_query_add_scheduling_mode (query, GST_PAD_MODE_PUSH);
1182
1183       res = TRUE;
1184       break;
1185     }
1186     case GST_QUERY_CAPS:
1187     {
1188       GstBaseSrcClass *bclass;
1189       GstCaps *caps, *filter;
1190
1191       bclass = GST_BASE_SRC_GET_CLASS (src);
1192       if (bclass->get_caps) {
1193         gst_query_parse_caps (query, &filter);
1194         if ((caps = bclass->get_caps (src, filter))) {
1195           gst_query_set_caps_result (query, caps);
1196           gst_caps_unref (caps);
1197           res = TRUE;
1198         } else {
1199           res = FALSE;
1200         }
1201       } else
1202         res = FALSE;
1203       break;
1204     }
1205     case GST_QUERY_URI:{
1206       if (GST_IS_URI_HANDLER (src)) {
1207         gchar *uri = gst_uri_handler_get_uri (GST_URI_HANDLER (src));
1208
1209         if (uri != NULL) {
1210           gst_query_set_uri (query, uri);
1211           g_free (uri);
1212           res = TRUE;
1213         } else {
1214           res = FALSE;
1215         }
1216       } else {
1217         res = FALSE;
1218       }
1219       break;
1220     }
1221     default:
1222       res = FALSE;
1223       break;
1224   }
1225   GST_DEBUG_OBJECT (src, "query %s returns %d", GST_QUERY_TYPE_NAME (query),
1226       res);
1227
1228   return res;
1229 }
1230
1231 static gboolean
1232 gst_base_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
1233 {
1234   GstBaseSrc *src;
1235   GstBaseSrcClass *bclass;
1236   gboolean result = FALSE;
1237
1238   src = GST_BASE_SRC (parent);
1239   bclass = GST_BASE_SRC_GET_CLASS (src);
1240
1241   if (bclass->query)
1242     result = bclass->query (src, query);
1243
1244   return result;
1245 }
1246
1247 static gboolean
1248 gst_base_src_default_do_seek (GstBaseSrc * src, GstSegment * segment)
1249 {
1250   gboolean res = TRUE;
1251
1252   /* update our offset if the start/stop position was updated */
1253   if (segment->format == GST_FORMAT_BYTES) {
1254     segment->time = segment->start;
1255   } else if (segment->start == 0) {
1256     /* seek to start, we can implement a default for this. */
1257     segment->time = 0;
1258   } else {
1259     res = FALSE;
1260     GST_INFO_OBJECT (src, "Can't do a default seek");
1261   }
1262
1263   return res;
1264 }
1265
1266 static gboolean
1267 gst_base_src_do_seek (GstBaseSrc * src, GstSegment * segment)
1268 {
1269   GstBaseSrcClass *bclass;
1270   gboolean result = FALSE;
1271
1272   bclass = GST_BASE_SRC_GET_CLASS (src);
1273
1274   if (bclass->do_seek)
1275     result = bclass->do_seek (src, segment);
1276
1277   return result;
1278 }
1279
1280 #define SEEK_TYPE_IS_RELATIVE(t) (((t) != GST_SEEK_TYPE_NONE) && ((t) != GST_SEEK_TYPE_SET))
1281
1282 static gboolean
1283 gst_base_src_default_prepare_seek_segment (GstBaseSrc * src, GstEvent * event,
1284     GstSegment * segment)
1285 {
1286   /* By default, we try one of 2 things:
1287    *   - For absolute seek positions, convert the requested position to our
1288    *     configured processing format and place it in the output segment \
1289    *   - For relative seek positions, convert our current (input) values to the
1290    *     seek format, adjust by the relative seek offset and then convert back to
1291    *     the processing format
1292    */
1293   GstSeekType start_type, stop_type;
1294   gint64 start, stop;
1295   GstSeekFlags flags;
1296   GstFormat seek_format, dest_format;
1297   gdouble rate;
1298   gboolean update;
1299   gboolean res = TRUE;
1300
1301   gst_event_parse_seek (event, &rate, &seek_format, &flags,
1302       &start_type, &start, &stop_type, &stop);
1303   dest_format = segment->format;
1304
1305   if (seek_format == dest_format) {
1306     gst_segment_do_seek (segment, rate, seek_format, flags,
1307         start_type, start, stop_type, stop, &update);
1308     return TRUE;
1309   }
1310
1311   if (start_type != GST_SEEK_TYPE_NONE) {
1312     /* FIXME: Handle seek_end by converting the input segment vals */
1313     res =
1314         gst_pad_query_convert (src->srcpad, seek_format, start, dest_format,
1315         &start);
1316     start_type = GST_SEEK_TYPE_SET;
1317   }
1318
1319   if (res && stop_type != GST_SEEK_TYPE_NONE) {
1320     /* FIXME: Handle seek_end by converting the input segment vals */
1321     res =
1322         gst_pad_query_convert (src->srcpad, seek_format, stop, dest_format,
1323         &stop);
1324     stop_type = GST_SEEK_TYPE_SET;
1325   }
1326
1327   /* And finally, configure our output segment in the desired format */
1328   gst_segment_do_seek (segment, rate, dest_format, flags, start_type, start,
1329       stop_type, stop, &update);
1330
1331   if (!res)
1332     goto no_format;
1333
1334   return res;
1335
1336 no_format:
1337   {
1338     GST_DEBUG_OBJECT (src, "undefined format given, seek aborted.");
1339     return FALSE;
1340   }
1341 }
1342
1343 static gboolean
1344 gst_base_src_prepare_seek_segment (GstBaseSrc * src, GstEvent * event,
1345     GstSegment * seeksegment)
1346 {
1347   GstBaseSrcClass *bclass;
1348   gboolean result = FALSE;
1349
1350   bclass = GST_BASE_SRC_GET_CLASS (src);
1351
1352   if (bclass->prepare_seek_segment)
1353     result = bclass->prepare_seek_segment (src, event, seeksegment);
1354
1355   return result;
1356 }
1357
1358 static GstFlowReturn
1359 gst_base_src_default_alloc (GstBaseSrc * src, guint64 offset,
1360     guint size, GstBuffer ** buffer)
1361 {
1362   GstFlowReturn ret;
1363   GstBaseSrcPrivate *priv = src->priv;
1364
1365   if (priv->pool) {
1366     ret = gst_buffer_pool_acquire_buffer (priv->pool, buffer, NULL);
1367   } else if (size != -1) {
1368     *buffer = gst_buffer_new_allocate (priv->allocator, size, &priv->params);
1369     if (G_UNLIKELY (*buffer == NULL))
1370       goto alloc_failed;
1371
1372     ret = GST_FLOW_OK;
1373   } else {
1374     GST_WARNING_OBJECT (src, "Not trying to alloc %u bytes. Blocksize not set?",
1375         size);
1376     goto alloc_failed;
1377   }
1378   return ret;
1379
1380   /* ERRORS */
1381 alloc_failed:
1382   {
1383     GST_ERROR_OBJECT (src, "Failed to allocate %u bytes", size);
1384     return GST_FLOW_ERROR;
1385   }
1386 }
1387
1388 static GstFlowReturn
1389 gst_base_src_default_create (GstBaseSrc * src, guint64 offset,
1390     guint size, GstBuffer ** buffer)
1391 {
1392   GstBaseSrcClass *bclass;
1393   GstFlowReturn ret;
1394   GstBuffer *res_buf;
1395
1396   bclass = GST_BASE_SRC_GET_CLASS (src);
1397
1398   if (G_UNLIKELY (!bclass->alloc))
1399     goto no_function;
1400   if (G_UNLIKELY (!bclass->fill))
1401     goto no_function;
1402
1403   if (*buffer == NULL) {
1404     /* downstream did not provide us with a buffer to fill, allocate one
1405      * ourselves */
1406     ret = bclass->alloc (src, offset, size, &res_buf);
1407     if (G_UNLIKELY (ret != GST_FLOW_OK))
1408       goto alloc_failed;
1409   } else {
1410     res_buf = *buffer;
1411   }
1412
1413   if (G_LIKELY (size > 0)) {
1414     /* only call fill when there is a size */
1415     ret = bclass->fill (src, offset, size, res_buf);
1416     if (G_UNLIKELY (ret != GST_FLOW_OK))
1417       goto not_ok;
1418   }
1419
1420   *buffer = res_buf;
1421
1422   return GST_FLOW_OK;
1423
1424   /* ERRORS */
1425 no_function:
1426   {
1427     GST_DEBUG_OBJECT (src, "no fill or alloc function");
1428     return GST_FLOW_NOT_SUPPORTED;
1429   }
1430 alloc_failed:
1431   {
1432     GST_DEBUG_OBJECT (src, "Failed to allocate buffer of %u bytes", size);
1433     return ret;
1434   }
1435 not_ok:
1436   {
1437     GST_DEBUG_OBJECT (src, "fill returned %d (%s)", ret,
1438         gst_flow_get_name (ret));
1439     if (*buffer == NULL)
1440       gst_buffer_unref (res_buf);
1441     return ret;
1442   }
1443 }
1444
1445 /* this code implements the seeking. It is a good example
1446  * handling all cases.
1447  *
1448  * A seek updates the currently configured segment.start
1449  * and segment.stop values based on the SEEK_TYPE. If the
1450  * segment.start value is updated, a seek to this new position
1451  * should be performed.
1452  *
1453  * The seek can only be executed when we are not currently
1454  * streaming any data, to make sure that this is the case, we
1455  * acquire the STREAM_LOCK which is taken when we are in the
1456  * _loop() function or when a getrange() is called. Normally
1457  * we will not receive a seek if we are operating in pull mode
1458  * though. When we operate as a live source we might block on the live
1459  * cond, which does not release the STREAM_LOCK. Therefore we will try
1460  * to grab the LIVE_LOCK instead of the STREAM_LOCK to make sure it is
1461  * safe to perform the seek.
1462  *
1463  * When we are in the loop() function, we might be in the middle
1464  * of pushing a buffer, which might block in a sink. To make sure
1465  * that the push gets unblocked we push out a FLUSH_START event.
1466  * Our loop function will get a FLUSHING return value from
1467  * the push and will pause, effectively releasing the STREAM_LOCK.
1468  *
1469  * For a non-flushing seek, we pause the task, which might eventually
1470  * release the STREAM_LOCK. We say eventually because when the sink
1471  * blocks on the sample we might wait a very long time until the sink
1472  * unblocks the sample. In any case we acquire the STREAM_LOCK and
1473  * can continue the seek. A non-flushing seek is normally done in a
1474  * running pipeline to perform seamless playback, this means that the sink is
1475  * PLAYING and will return from its chain function.
1476  * In the case of a non-flushing seek we need to make sure that the
1477  * data we output after the seek is continuous with the previous data,
1478  * this is because a non-flushing seek does not reset the running-time
1479  * to 0. We do this by closing the currently running segment, ie. sending
1480  * a new_segment event with the stop position set to the last processed
1481  * position.
1482  *
1483  * After updating the segment.start/stop values, we prepare for
1484  * streaming again. We push out a FLUSH_STOP to make the peer pad
1485  * accept data again and we start our task again.
1486  *
1487  * A segment seek posts a message on the bus saying that the playback
1488  * of the segment started. We store the segment flag internally because
1489  * when we reach the segment.stop we have to post a segment.done
1490  * instead of EOS when doing a segment seek.
1491  */
1492 static gboolean
1493 gst_base_src_perform_seek (GstBaseSrc * src, GstEvent * event, gboolean unlock)
1494 {
1495   gboolean res = TRUE, tres;
1496   gdouble rate;
1497   GstFormat seek_format, dest_format;
1498   GstSeekFlags flags;
1499   GstSeekType start_type, stop_type;
1500   gint64 start, stop;
1501   gboolean flush, playing;
1502   gboolean update;
1503   gboolean relative_seek = FALSE;
1504   gboolean seekseg_configured = FALSE;
1505   GstSegment seeksegment;
1506   guint32 seqnum;
1507   GstEvent *tevent;
1508
1509   GST_DEBUG_OBJECT (src, "doing seek: %" GST_PTR_FORMAT, event);
1510
1511   GST_OBJECT_LOCK (src);
1512   dest_format = src->segment.format;
1513   GST_OBJECT_UNLOCK (src);
1514
1515   if (event) {
1516     gst_event_parse_seek (event, &rate, &seek_format, &flags,
1517         &start_type, &start, &stop_type, &stop);
1518
1519     relative_seek = SEEK_TYPE_IS_RELATIVE (start_type) ||
1520         SEEK_TYPE_IS_RELATIVE (stop_type);
1521
1522     if (dest_format != seek_format && !relative_seek) {
1523       /* If we have an ABSOLUTE position (SEEK_SET only), we can convert it
1524        * here before taking the stream lock, otherwise we must convert it later,
1525        * once we have the stream lock and can read the last configures segment
1526        * start and stop positions */
1527       gst_segment_init (&seeksegment, dest_format);
1528
1529       if (!gst_base_src_prepare_seek_segment (src, event, &seeksegment))
1530         goto prepare_failed;
1531
1532       seekseg_configured = TRUE;
1533     }
1534
1535     flush = flags & GST_SEEK_FLAG_FLUSH;
1536     seqnum = gst_event_get_seqnum (event);
1537   } else {
1538     flush = FALSE;
1539     /* get next seqnum */
1540     seqnum = gst_util_seqnum_next ();
1541   }
1542
1543   /* send flush start */
1544   if (flush) {
1545     tevent = gst_event_new_flush_start ();
1546     gst_event_set_seqnum (tevent, seqnum);
1547     gst_pad_push_event (src->srcpad, tevent);
1548   } else
1549     gst_pad_pause_task (src->srcpad);
1550
1551   /* unblock streaming thread. */
1552   if (unlock)
1553     gst_base_src_set_flushing (src, TRUE, FALSE, &playing);
1554
1555   /* grab streaming lock, this should eventually be possible, either
1556    * because the task is paused, our streaming thread stopped
1557    * or because our peer is flushing. */
1558   GST_PAD_STREAM_LOCK (src->srcpad);
1559   if (G_UNLIKELY (src->priv->seqnum == seqnum)) {
1560     /* we have seen this event before, issue a warning for now */
1561     GST_WARNING_OBJECT (src, "duplicate event found %" G_GUINT32_FORMAT,
1562         seqnum);
1563   } else {
1564     src->priv->seqnum = seqnum;
1565     GST_DEBUG_OBJECT (src, "seek with seqnum %" G_GUINT32_FORMAT, seqnum);
1566   }
1567
1568   if (unlock)
1569     gst_base_src_set_flushing (src, FALSE, playing, NULL);
1570
1571   /* If we configured the seeksegment above, don't overwrite it now. Otherwise
1572    * copy the current segment info into the temp segment that we can actually
1573    * attempt the seek with. We only update the real segment if the seek succeeds. */
1574   if (!seekseg_configured) {
1575     memcpy (&seeksegment, &src->segment, sizeof (GstSegment));
1576
1577     /* now configure the final seek segment */
1578     if (event) {
1579       if (seeksegment.format != seek_format) {
1580         /* OK, here's where we give the subclass a chance to convert the relative
1581          * seek into an absolute one in the processing format. We set up any
1582          * absolute seek above, before taking the stream lock. */
1583         if (!gst_base_src_prepare_seek_segment (src, event, &seeksegment)) {
1584           GST_DEBUG_OBJECT (src, "Preparing the seek failed after flushing. "
1585               "Aborting seek");
1586           res = FALSE;
1587         }
1588       } else {
1589         /* The seek format matches our processing format, no need to ask the
1590          * the subclass to configure the segment. */
1591         gst_segment_do_seek (&seeksegment, rate, seek_format, flags,
1592             start_type, start, stop_type, stop, &update);
1593       }
1594     }
1595     /* Else, no seek event passed, so we're just (re)starting the
1596        current segment. */
1597   }
1598
1599   if (res) {
1600     GST_DEBUG_OBJECT (src, "segment configured from %" G_GINT64_FORMAT
1601         " to %" G_GINT64_FORMAT ", position %" G_GINT64_FORMAT,
1602         seeksegment.start, seeksegment.stop, seeksegment.position);
1603
1604     /* do the seek, segment.position contains the new position. */
1605     res = gst_base_src_do_seek (src, &seeksegment);
1606   }
1607
1608   /* and prepare to continue streaming */
1609   if (flush) {
1610     tevent = gst_event_new_flush_stop (TRUE);
1611     gst_event_set_seqnum (tevent, seqnum);
1612     /* send flush stop, peer will accept data and events again. We
1613      * are not yet providing data as we still have the STREAM_LOCK. */
1614     gst_pad_push_event (src->srcpad, tevent);
1615   }
1616
1617   /* The subclass must have converted the segment to the processing format
1618    * by now */
1619   if (res && seeksegment.format != dest_format) {
1620     GST_DEBUG_OBJECT (src, "Subclass failed to prepare a seek segment "
1621         "in the correct format. Aborting seek.");
1622     res = FALSE;
1623   }
1624
1625   /* if the seek was successful, we update our real segment and push
1626    * out the new segment. */
1627   if (res) {
1628     GST_OBJECT_LOCK (src);
1629     memcpy (&src->segment, &seeksegment, sizeof (GstSegment));
1630     GST_OBJECT_UNLOCK (src);
1631
1632     if (seeksegment.flags & GST_SEGMENT_FLAG_SEGMENT) {
1633       GstMessage *message;
1634
1635       message = gst_message_new_segment_start (GST_OBJECT (src),
1636           seeksegment.format, seeksegment.position);
1637       gst_message_set_seqnum (message, seqnum);
1638
1639       gst_element_post_message (GST_ELEMENT (src), message);
1640     }
1641
1642     /* for deriving a stop position for the playback segment from the seek
1643      * segment, we must take the duration when the stop is not set */
1644     if ((stop = seeksegment.stop) == -1)
1645       stop = seeksegment.duration;
1646
1647     src->priv->segment_pending = TRUE;
1648   }
1649
1650   src->priv->discont = TRUE;
1651   src->running = TRUE;
1652   /* and restart the task in case it got paused explicitly or by
1653    * the FLUSH_START event we pushed out. */
1654   tres = gst_pad_start_task (src->srcpad, (GstTaskFunction) gst_base_src_loop,
1655       src->srcpad, NULL);
1656   if (res && !tres)
1657     res = FALSE;
1658
1659   /* and release the lock again so we can continue streaming */
1660   GST_PAD_STREAM_UNLOCK (src->srcpad);
1661
1662   return res;
1663
1664   /* ERROR */
1665 prepare_failed:
1666   GST_DEBUG_OBJECT (src, "Preparing the seek failed before flushing. "
1667       "Aborting seek");
1668   return FALSE;
1669 }
1670
1671 /* all events send to this element directly. This is mainly done from the
1672  * application.
1673  */
1674 static gboolean
1675 gst_base_src_send_event (GstElement * element, GstEvent * event)
1676 {
1677   GstBaseSrc *src;
1678   gboolean result = FALSE;
1679
1680   src = GST_BASE_SRC (element);
1681
1682   GST_DEBUG_OBJECT (src, "handling event %p %" GST_PTR_FORMAT, event, event);
1683
1684   switch (GST_EVENT_TYPE (event)) {
1685       /* bidirectional events */
1686     case GST_EVENT_FLUSH_START:
1687       GST_DEBUG_OBJECT (src, "pushing flush-start event downstream");
1688       result = gst_pad_push_event (src->srcpad, event);
1689       event = NULL;
1690       break;
1691     case GST_EVENT_FLUSH_STOP:
1692       GST_LIVE_LOCK (src->srcpad);
1693       src->priv->segment_pending = TRUE;
1694       /* sending random flushes downstream can break stuff,
1695        * especially sync since all segment info will get flushed */
1696       GST_DEBUG_OBJECT (src, "pushing flush-stop event downstream");
1697       result = gst_pad_push_event (src->srcpad, event);
1698       GST_LIVE_UNLOCK (src->srcpad);
1699       event = NULL;
1700       break;
1701
1702       /* downstream serialized events */
1703     case GST_EVENT_EOS:
1704     {
1705       GstBaseSrcClass *bclass;
1706
1707       bclass = GST_BASE_SRC_GET_CLASS (src);
1708
1709       /* queue EOS and make sure the task or pull function performs the EOS
1710        * actions.
1711        *
1712        * We have two possibilities:
1713        *
1714        *  - Before we are to enter the _create function, we check the pending_eos
1715        *    first and do EOS instead of entering it.
1716        *  - If we are in the _create function or we did not manage to set the
1717        *    flag fast enough and we are about to enter the _create function,
1718        *    we unlock it so that we exit with FLUSHING immediately. We then
1719        *    check the EOS flag and do the EOS logic.
1720        */
1721       g_atomic_int_set (&src->priv->pending_eos, TRUE);
1722       GST_DEBUG_OBJECT (src, "EOS marked, calling unlock");
1723
1724
1725       /* unlock the _create function so that we can check the pending_eos flag
1726        * and we can do EOS. This will eventually release the LIVE_LOCK again so
1727        * that we can grab it and stop the unlock again. We don't take the stream
1728        * lock so that this operation is guaranteed to never block. */
1729       gst_base_src_activate_pool (src, FALSE);
1730       if (bclass->unlock)
1731         bclass->unlock (src);
1732
1733       GST_DEBUG_OBJECT (src, "unlock called, waiting for LIVE_LOCK");
1734
1735       GST_LIVE_LOCK (src);
1736       GST_DEBUG_OBJECT (src, "LIVE_LOCK acquired, calling unlock_stop");
1737       /* now stop the unlock of the streaming thread again. Grabbing the live
1738        * lock is enough because that protects the create function. */
1739       if (bclass->unlock_stop)
1740         bclass->unlock_stop (src);
1741       gst_base_src_activate_pool (src, TRUE);
1742       GST_LIVE_UNLOCK (src);
1743
1744       result = TRUE;
1745       break;
1746     }
1747     case GST_EVENT_SEGMENT:
1748       /* sending random SEGMENT downstream can break sync. */
1749       break;
1750     case GST_EVENT_TAG:
1751     case GST_EVENT_CUSTOM_DOWNSTREAM:
1752     case GST_EVENT_CUSTOM_BOTH:
1753       /* Insert TAG, CUSTOM_DOWNSTREAM, CUSTOM_BOTH in the dataflow */
1754       GST_OBJECT_LOCK (src);
1755       src->priv->pending_events =
1756           g_list_append (src->priv->pending_events, event);
1757       g_atomic_int_set (&src->priv->have_events, TRUE);
1758       GST_OBJECT_UNLOCK (src);
1759       event = NULL;
1760       result = TRUE;
1761       break;
1762     case GST_EVENT_BUFFERSIZE:
1763       /* does not seem to make much sense currently */
1764       break;
1765
1766       /* upstream events */
1767     case GST_EVENT_QOS:
1768       /* elements should override send_event and do something */
1769       break;
1770     case GST_EVENT_SEEK:
1771     {
1772       gboolean started;
1773
1774       GST_OBJECT_LOCK (src->srcpad);
1775       if (GST_PAD_MODE (src->srcpad) == GST_PAD_MODE_PULL)
1776         goto wrong_mode;
1777       started = GST_PAD_MODE (src->srcpad) == GST_PAD_MODE_PUSH;
1778       GST_OBJECT_UNLOCK (src->srcpad);
1779
1780       if (started) {
1781         GST_DEBUG_OBJECT (src, "performing seek");
1782         /* when we are running in push mode, we can execute the
1783          * seek right now. */
1784         result = gst_base_src_perform_seek (src, event, TRUE);
1785       } else {
1786         GstEvent **event_p;
1787
1788         /* else we store the event and execute the seek when we
1789          * get activated */
1790         GST_OBJECT_LOCK (src);
1791         GST_DEBUG_OBJECT (src, "queueing seek");
1792         event_p = &src->pending_seek;
1793         gst_event_replace ((GstEvent **) event_p, event);
1794         GST_OBJECT_UNLOCK (src);
1795         /* assume the seek will work */
1796         result = TRUE;
1797       }
1798       break;
1799     }
1800     case GST_EVENT_NAVIGATION:
1801       /* could make sense for elements that do something with navigation events
1802        * but then they would need to override the send_event function */
1803       break;
1804     case GST_EVENT_LATENCY:
1805       /* does not seem to make sense currently */
1806       break;
1807
1808       /* custom events */
1809     case GST_EVENT_CUSTOM_UPSTREAM:
1810       /* override send_event if you want this */
1811       break;
1812     case GST_EVENT_CUSTOM_DOWNSTREAM_OOB:
1813     case GST_EVENT_CUSTOM_BOTH_OOB:
1814       /* insert a random custom event into the pipeline */
1815       GST_DEBUG_OBJECT (src, "pushing custom OOB event downstream");
1816       result = gst_pad_push_event (src->srcpad, event);
1817       /* we gave away the ref to the event in the push */
1818       event = NULL;
1819       break;
1820     default:
1821       break;
1822   }
1823 done:
1824   /* if we still have a ref to the event, unref it now */
1825   if (event)
1826     gst_event_unref (event);
1827
1828   return result;
1829
1830   /* ERRORS */
1831 wrong_mode:
1832   {
1833     GST_DEBUG_OBJECT (src, "cannot perform seek when operating in pull mode");
1834     GST_OBJECT_UNLOCK (src->srcpad);
1835     result = FALSE;
1836     goto done;
1837   }
1838 }
1839
1840 static gboolean
1841 gst_base_src_seekable (GstBaseSrc * src)
1842 {
1843   GstBaseSrcClass *bclass;
1844   bclass = GST_BASE_SRC_GET_CLASS (src);
1845   if (bclass->is_seekable)
1846     return bclass->is_seekable (src);
1847   else
1848     return FALSE;
1849 }
1850
1851 static void
1852 gst_base_src_update_qos (GstBaseSrc * src,
1853     gdouble proportion, GstClockTimeDiff diff, GstClockTime timestamp)
1854 {
1855   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, src,
1856       "qos: proportion: %lf, diff %" G_GINT64_FORMAT ", timestamp %"
1857       GST_TIME_FORMAT, proportion, diff, GST_TIME_ARGS (timestamp));
1858
1859   GST_OBJECT_LOCK (src);
1860   src->priv->proportion = proportion;
1861   src->priv->earliest_time = timestamp + diff;
1862   GST_OBJECT_UNLOCK (src);
1863 }
1864
1865
1866 static gboolean
1867 gst_base_src_default_event (GstBaseSrc * src, GstEvent * event)
1868 {
1869   gboolean result;
1870
1871   GST_DEBUG_OBJECT (src, "handle event %" GST_PTR_FORMAT, event);
1872
1873   switch (GST_EVENT_TYPE (event)) {
1874     case GST_EVENT_SEEK:
1875       /* is normally called when in push mode */
1876       if (!gst_base_src_seekable (src))
1877         goto not_seekable;
1878
1879       result = gst_base_src_perform_seek (src, event, TRUE);
1880       break;
1881     case GST_EVENT_FLUSH_START:
1882       /* cancel any blocking getrange, is normally called
1883        * when in pull mode. */
1884       result = gst_base_src_set_flushing (src, TRUE, FALSE, NULL);
1885       break;
1886     case GST_EVENT_FLUSH_STOP:
1887       result = gst_base_src_set_flushing (src, FALSE, TRUE, NULL);
1888       break;
1889     case GST_EVENT_QOS:
1890     {
1891       gdouble proportion;
1892       GstClockTimeDiff diff;
1893       GstClockTime timestamp;
1894
1895       gst_event_parse_qos (event, NULL, &proportion, &diff, &timestamp);
1896       gst_base_src_update_qos (src, proportion, diff, timestamp);
1897       result = TRUE;
1898       break;
1899     }
1900     case GST_EVENT_RECONFIGURE:
1901       result = TRUE;
1902       break;
1903     case GST_EVENT_LATENCY:
1904       result = TRUE;
1905       break;
1906     default:
1907       result = FALSE;
1908       break;
1909   }
1910   return result;
1911
1912   /* ERRORS */
1913 not_seekable:
1914   {
1915     GST_DEBUG_OBJECT (src, "is not seekable");
1916     return FALSE;
1917   }
1918 }
1919
1920 static gboolean
1921 gst_base_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
1922 {
1923   GstBaseSrc *src;
1924   GstBaseSrcClass *bclass;
1925   gboolean result = FALSE;
1926
1927   src = GST_BASE_SRC (parent);
1928   bclass = GST_BASE_SRC_GET_CLASS (src);
1929
1930   if (bclass->event) {
1931     if (!(result = bclass->event (src, event)))
1932       goto subclass_failed;
1933   }
1934
1935 done:
1936   gst_event_unref (event);
1937
1938   return result;
1939
1940   /* ERRORS */
1941 subclass_failed:
1942   {
1943     GST_DEBUG_OBJECT (src, "subclass refused event");
1944     goto done;
1945   }
1946 }
1947
1948 static void
1949 gst_base_src_set_property (GObject * object, guint prop_id,
1950     const GValue * value, GParamSpec * pspec)
1951 {
1952   GstBaseSrc *src;
1953
1954   src = GST_BASE_SRC (object);
1955
1956   switch (prop_id) {
1957     case PROP_BLOCKSIZE:
1958       gst_base_src_set_blocksize (src, g_value_get_uint (value));
1959       break;
1960     case PROP_NUM_BUFFERS:
1961       src->num_buffers = g_value_get_int (value);
1962       break;
1963     case PROP_TYPEFIND:
1964       src->typefind = g_value_get_boolean (value);
1965       break;
1966     case PROP_DO_TIMESTAMP:
1967       gst_base_src_set_do_timestamp (src, g_value_get_boolean (value));
1968       break;
1969     default:
1970       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1971       break;
1972   }
1973 }
1974
1975 static void
1976 gst_base_src_get_property (GObject * object, guint prop_id, GValue * value,
1977     GParamSpec * pspec)
1978 {
1979   GstBaseSrc *src;
1980
1981   src = GST_BASE_SRC (object);
1982
1983   switch (prop_id) {
1984     case PROP_BLOCKSIZE:
1985       g_value_set_uint (value, gst_base_src_get_blocksize (src));
1986       break;
1987     case PROP_NUM_BUFFERS:
1988       g_value_set_int (value, src->num_buffers);
1989       break;
1990     case PROP_TYPEFIND:
1991       g_value_set_boolean (value, src->typefind);
1992       break;
1993     case PROP_DO_TIMESTAMP:
1994       g_value_set_boolean (value, gst_base_src_get_do_timestamp (src));
1995       break;
1996     default:
1997       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1998       break;
1999   }
2000 }
2001
2002 /* with STREAM_LOCK and LOCK */
2003 static GstClockReturn
2004 gst_base_src_wait (GstBaseSrc * basesrc, GstClock * clock, GstClockTime time)
2005 {
2006   GstClockReturn ret;
2007   GstClockID id;
2008
2009   id = gst_clock_new_single_shot_id (clock, time);
2010
2011   basesrc->clock_id = id;
2012   /* release the live lock while waiting */
2013   GST_LIVE_UNLOCK (basesrc);
2014
2015   ret = gst_clock_id_wait (id, NULL);
2016
2017   GST_LIVE_LOCK (basesrc);
2018   gst_clock_id_unref (id);
2019   basesrc->clock_id = NULL;
2020
2021   return ret;
2022 }
2023
2024 /* perform synchronisation on a buffer.
2025  * with STREAM_LOCK.
2026  */
2027 static GstClockReturn
2028 gst_base_src_do_sync (GstBaseSrc * basesrc, GstBuffer * buffer)
2029 {
2030   GstClockReturn result;
2031   GstClockTime start, end;
2032   GstBaseSrcClass *bclass;
2033   GstClockTime base_time;
2034   GstClock *clock;
2035   GstClockTime now = GST_CLOCK_TIME_NONE, pts, dts, timestamp;
2036   gboolean do_timestamp, first, pseudo_live, is_live;
2037
2038   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2039
2040   start = end = -1;
2041   if (bclass->get_times)
2042     bclass->get_times (basesrc, buffer, &start, &end);
2043
2044   /* get buffer timestamp */
2045   dts = GST_BUFFER_DTS (buffer);
2046   pts = GST_BUFFER_PTS (buffer);
2047
2048   if (GST_CLOCK_TIME_IS_VALID (dts))
2049     timestamp = dts;
2050   else
2051     timestamp = pts;
2052
2053   /* grab the lock to prepare for clocking and calculate the startup
2054    * latency. */
2055   GST_OBJECT_LOCK (basesrc);
2056
2057   is_live = basesrc->is_live;
2058   /* if we are asked to sync against the clock we are a pseudo live element */
2059   pseudo_live = (start != -1 && is_live);
2060   /* check for the first buffer */
2061   first = (basesrc->priv->latency == -1);
2062
2063   if (timestamp != -1 && pseudo_live) {
2064     GstClockTime latency;
2065
2066     /* we have a timestamp and a sync time, latency is the diff */
2067     if (timestamp <= start)
2068       latency = start - timestamp;
2069     else
2070       latency = 0;
2071
2072     if (first) {
2073       GST_DEBUG_OBJECT (basesrc, "pseudo_live with latency %" GST_TIME_FORMAT,
2074           GST_TIME_ARGS (latency));
2075       /* first time we calculate latency, just configure */
2076       basesrc->priv->latency = latency;
2077     } else {
2078       if (basesrc->priv->latency != latency) {
2079         /* we have a new latency, FIXME post latency message */
2080         basesrc->priv->latency = latency;
2081         GST_DEBUG_OBJECT (basesrc, "latency changed to %" GST_TIME_FORMAT,
2082             GST_TIME_ARGS (latency));
2083       }
2084     }
2085   } else if (first) {
2086     GST_DEBUG_OBJECT (basesrc, "no latency needed, live %d, sync %d",
2087         is_live, start != -1);
2088     basesrc->priv->latency = 0;
2089   }
2090
2091   /* get clock, if no clock, we can't sync or do timestamps */
2092   if ((clock = GST_ELEMENT_CLOCK (basesrc)) == NULL)
2093     goto no_clock;
2094   else
2095     gst_object_ref (clock);
2096
2097   base_time = GST_ELEMENT_CAST (basesrc)->base_time;
2098
2099   do_timestamp = basesrc->priv->do_timestamp;
2100   GST_OBJECT_UNLOCK (basesrc);
2101
2102   /* first buffer, calculate the timestamp offset */
2103   if (first) {
2104     GstClockTime running_time;
2105
2106     now = gst_clock_get_time (clock);
2107     running_time = now - base_time;
2108
2109     GST_LOG_OBJECT (basesrc,
2110         "startup PTS: %" GST_TIME_FORMAT ", DTS %" GST_TIME_FORMAT
2111         ", running_time %" GST_TIME_FORMAT, GST_TIME_ARGS (pts),
2112         GST_TIME_ARGS (dts), GST_TIME_ARGS (running_time));
2113
2114     if (pseudo_live && timestamp != -1) {
2115       /* live source and we need to sync, add startup latency to all timestamps
2116        * to get the real running_time. Live sources should always timestamp
2117        * according to the current running time. */
2118       basesrc->priv->ts_offset = GST_CLOCK_DIFF (timestamp, running_time);
2119
2120       GST_LOG_OBJECT (basesrc, "live with sync, ts_offset %" GST_TIME_FORMAT,
2121           GST_TIME_ARGS (basesrc->priv->ts_offset));
2122     } else {
2123       basesrc->priv->ts_offset = 0;
2124       GST_LOG_OBJECT (basesrc, "no timestamp offset needed");
2125     }
2126
2127     if (!GST_CLOCK_TIME_IS_VALID (dts)) {
2128       if (do_timestamp) {
2129         dts = running_time;
2130       } else {
2131         dts = 0;
2132       }
2133       GST_BUFFER_DTS (buffer) = dts;
2134
2135       GST_LOG_OBJECT (basesrc, "created DTS %" GST_TIME_FORMAT,
2136           GST_TIME_ARGS (dts));
2137     }
2138   } else {
2139     /* not the first buffer, the timestamp is the diff between the clock and
2140      * base_time */
2141     if (do_timestamp && !GST_CLOCK_TIME_IS_VALID (dts)) {
2142       now = gst_clock_get_time (clock);
2143
2144       dts = now - base_time;
2145       GST_BUFFER_DTS (buffer) = dts;
2146
2147       GST_LOG_OBJECT (basesrc, "created DTS %" GST_TIME_FORMAT,
2148           GST_TIME_ARGS (dts));
2149     }
2150   }
2151   if (!GST_CLOCK_TIME_IS_VALID (pts)) {
2152     if (!GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DELTA_UNIT))
2153       pts = dts;
2154
2155     GST_BUFFER_PTS (buffer) = dts;
2156
2157     GST_LOG_OBJECT (basesrc, "created PTS %" GST_TIME_FORMAT,
2158         GST_TIME_ARGS (pts));
2159   }
2160
2161   /* if we don't have a buffer timestamp, we don't sync */
2162   if (!GST_CLOCK_TIME_IS_VALID (start))
2163     goto no_sync;
2164
2165   if (is_live) {
2166     /* for pseudo live sources, add our ts_offset to the timestamp */
2167     if (GST_CLOCK_TIME_IS_VALID (pts))
2168       GST_BUFFER_PTS (buffer) += basesrc->priv->ts_offset;
2169     if (GST_CLOCK_TIME_IS_VALID (dts))
2170       GST_BUFFER_DTS (buffer) += basesrc->priv->ts_offset;
2171     start += basesrc->priv->ts_offset;
2172   }
2173
2174   GST_LOG_OBJECT (basesrc,
2175       "waiting for clock, base time %" GST_TIME_FORMAT
2176       ", stream_start %" GST_TIME_FORMAT,
2177       GST_TIME_ARGS (base_time), GST_TIME_ARGS (start));
2178
2179   result = gst_base_src_wait (basesrc, clock, start + base_time);
2180
2181   gst_object_unref (clock);
2182
2183   GST_LOG_OBJECT (basesrc, "clock entry done: %d", result);
2184
2185   return result;
2186
2187   /* special cases */
2188 no_clock:
2189   {
2190     GST_DEBUG_OBJECT (basesrc, "we have no clock");
2191     GST_OBJECT_UNLOCK (basesrc);
2192     return GST_CLOCK_OK;
2193   }
2194 no_sync:
2195   {
2196     GST_DEBUG_OBJECT (basesrc, "no sync needed");
2197     gst_object_unref (clock);
2198     return GST_CLOCK_OK;
2199   }
2200 }
2201
2202 /* Called with STREAM_LOCK and LIVE_LOCK */
2203 static gboolean
2204 gst_base_src_update_length (GstBaseSrc * src, guint64 offset, guint * length)
2205 {
2206   guint64 size, maxsize;
2207   GstBaseSrcClass *bclass;
2208   GstFormat format;
2209   gint64 stop;
2210   gboolean dynamic;
2211
2212   bclass = GST_BASE_SRC_GET_CLASS (src);
2213
2214   format = src->segment.format;
2215   stop = src->segment.stop;
2216   /* get total file size */
2217   size = src->segment.duration;
2218
2219   /* only operate if we are working with bytes */
2220   if (format != GST_FORMAT_BYTES)
2221     return TRUE;
2222
2223   /* the max amount of bytes to read is the total size or
2224    * up to the segment.stop if present. */
2225   if (stop != -1)
2226     maxsize = MIN (size, stop);
2227   else
2228     maxsize = size;
2229
2230   GST_DEBUG_OBJECT (src,
2231       "reading offset %" G_GUINT64_FORMAT ", length %u, size %" G_GINT64_FORMAT
2232       ", segment.stop %" G_GINT64_FORMAT ", maxsize %" G_GINT64_FORMAT, offset,
2233       *length, size, stop, maxsize);
2234
2235   dynamic = g_atomic_int_get (&src->priv->dynamic_size);
2236   GST_DEBUG_OBJECT (src, "dynamic size: %d", dynamic);
2237
2238   /* check size if we have one */
2239   if (maxsize != -1) {
2240     /* if we run past the end, check if the file became bigger and
2241      * retry. */
2242     if (G_UNLIKELY (offset + *length >= maxsize || dynamic)) {
2243       /* see if length of the file changed */
2244       if (bclass->get_size)
2245         if (!bclass->get_size (src, &size))
2246           size = -1;
2247
2248       /* make sure we don't exceed the configured segment stop
2249        * if it was set */
2250       if (stop != -1)
2251         maxsize = MIN (size, stop);
2252       else
2253         maxsize = size;
2254
2255       /* if we are at or past the end, EOS */
2256       if (G_UNLIKELY (offset >= maxsize))
2257         goto unexpected_length;
2258
2259       /* else we can clip to the end */
2260       if (G_UNLIKELY (offset + *length >= maxsize))
2261         *length = maxsize - offset;
2262
2263     }
2264   }
2265
2266   /* keep track of current duration.
2267    * segment is in bytes, we checked that above. */
2268   GST_OBJECT_LOCK (src);
2269   src->segment.duration = size;
2270   GST_OBJECT_UNLOCK (src);
2271
2272   return TRUE;
2273
2274   /* ERRORS */
2275 unexpected_length:
2276   {
2277     return FALSE;
2278   }
2279 }
2280
2281 /* must be called with LIVE_LOCK */
2282 static GstFlowReturn
2283 gst_base_src_get_range (GstBaseSrc * src, guint64 offset, guint length,
2284     GstBuffer ** buf)
2285 {
2286   GstFlowReturn ret;
2287   GstBaseSrcClass *bclass;
2288   GstClockReturn status;
2289   GstBuffer *res_buf;
2290   GstBuffer *in_buf;
2291
2292   bclass = GST_BASE_SRC_GET_CLASS (src);
2293
2294 again:
2295   if (src->is_live) {
2296     if (G_UNLIKELY (!src->live_running)) {
2297       ret = gst_base_src_wait_playing (src);
2298       if (ret != GST_FLOW_OK)
2299         goto stopped;
2300     }
2301   }
2302
2303   if (G_UNLIKELY (!GST_BASE_SRC_IS_STARTED (src)
2304           && !GST_BASE_SRC_IS_STARTING (src)))
2305     goto not_started;
2306
2307   if (G_UNLIKELY (!bclass->create))
2308     goto no_function;
2309
2310   if (G_UNLIKELY (!gst_base_src_update_length (src, offset, &length)))
2311     goto unexpected_length;
2312
2313   /* track position */
2314   GST_OBJECT_LOCK (src);
2315   if (src->segment.format == GST_FORMAT_BYTES)
2316     src->segment.position = offset;
2317   GST_OBJECT_UNLOCK (src);
2318
2319   /* normally we don't count buffers */
2320   if (G_UNLIKELY (src->num_buffers_left >= 0)) {
2321     if (src->num_buffers_left == 0)
2322       goto reached_num_buffers;
2323     else
2324       src->num_buffers_left--;
2325   }
2326
2327   /* don't enter the create function if a pending EOS event was set. For the
2328    * logic of the pending_eos, check the event function of this class. */
2329   if (G_UNLIKELY (g_atomic_int_get (&src->priv->pending_eos)))
2330     goto eos;
2331
2332   GST_DEBUG_OBJECT (src,
2333       "calling create offset %" G_GUINT64_FORMAT " length %u, time %"
2334       G_GINT64_FORMAT, offset, length, src->segment.time);
2335
2336   res_buf = in_buf = *buf;
2337
2338   ret = bclass->create (src, offset, length, &res_buf);
2339
2340   /* The create function could be unlocked because we have a pending EOS. It's
2341    * possible that we have a valid buffer from create that we need to
2342    * discard when the create function returned _OK. */
2343   if (G_UNLIKELY (g_atomic_int_get (&src->priv->pending_eos))) {
2344     if (ret == GST_FLOW_OK) {
2345       if (*buf == NULL)
2346         gst_buffer_unref (res_buf);
2347     }
2348     goto eos;
2349   }
2350
2351   if (G_UNLIKELY (ret != GST_FLOW_OK))
2352     goto not_ok;
2353
2354   /* fallback in case the create function didn't fill a provided buffer */
2355   if (in_buf != NULL && res_buf != in_buf) {
2356     GstMapInfo info;
2357     gsize copied_size;
2358
2359     GST_CAT_DEBUG_OBJECT (GST_CAT_PERFORMANCE, src, "create function didn't "
2360         "fill the provided buffer, copying");
2361
2362     gst_buffer_map (in_buf, &info, GST_MAP_WRITE);
2363     copied_size = gst_buffer_extract (res_buf, 0, info.data, info.size);
2364     gst_buffer_unmap (in_buf, &info);
2365     gst_buffer_set_size (in_buf, copied_size);
2366
2367     gst_buffer_copy_into (in_buf, res_buf, GST_BUFFER_COPY_METADATA, 0, -1);
2368
2369     gst_buffer_unref (res_buf);
2370     res_buf = in_buf;
2371   }
2372
2373   /* no timestamp set and we are at offset 0, we can timestamp with 0 */
2374   if (offset == 0 && src->segment.time == 0
2375       && GST_BUFFER_DTS (res_buf) == -1 && !src->is_live) {
2376     GST_DEBUG_OBJECT (src, "setting first timestamp to 0");
2377     res_buf = gst_buffer_make_writable (res_buf);
2378     GST_BUFFER_DTS (res_buf) = 0;
2379   }
2380
2381   /* now sync before pushing the buffer */
2382   status = gst_base_src_do_sync (src, res_buf);
2383
2384   /* waiting for the clock could have made us flushing */
2385   if (G_UNLIKELY (src->priv->flushing))
2386     goto flushing;
2387
2388   switch (status) {
2389     case GST_CLOCK_EARLY:
2390       /* the buffer is too late. We currently don't drop the buffer. */
2391       GST_DEBUG_OBJECT (src, "buffer too late!, returning anyway");
2392       break;
2393     case GST_CLOCK_OK:
2394       /* buffer synchronised properly */
2395       GST_DEBUG_OBJECT (src, "buffer ok");
2396       break;
2397     case GST_CLOCK_UNSCHEDULED:
2398       /* this case is triggered when we were waiting for the clock and
2399        * it got unlocked because we did a state change. In any case, get rid of
2400        * the buffer. */
2401       if (*buf == NULL)
2402         gst_buffer_unref (res_buf);
2403
2404       if (!src->live_running) {
2405         /* We return FLUSHING when we are not running to stop the dataflow also
2406          * get rid of the produced buffer. */
2407         GST_DEBUG_OBJECT (src,
2408             "clock was unscheduled (%d), returning FLUSHING", status);
2409         ret = GST_FLOW_FLUSHING;
2410       } else {
2411         /* If we are running when this happens, we quickly switched between
2412          * pause and playing. We try to produce a new buffer */
2413         GST_DEBUG_OBJECT (src,
2414             "clock was unscheduled (%d), but we are running", status);
2415         goto again;
2416       }
2417       break;
2418     default:
2419       /* all other result values are unexpected and errors */
2420       GST_ELEMENT_ERROR (src, CORE, CLOCK,
2421           (_("Internal clock error.")),
2422           ("clock returned unexpected return value %d", status));
2423       if (*buf == NULL)
2424         gst_buffer_unref (res_buf);
2425       ret = GST_FLOW_ERROR;
2426       break;
2427   }
2428   if (G_LIKELY (ret == GST_FLOW_OK))
2429     *buf = res_buf;
2430
2431   return ret;
2432
2433   /* ERROR */
2434 stopped:
2435   {
2436     GST_DEBUG_OBJECT (src, "wait_playing returned %d (%s)", ret,
2437         gst_flow_get_name (ret));
2438     return ret;
2439   }
2440 not_ok:
2441   {
2442     GST_DEBUG_OBJECT (src, "create returned %d (%s)", ret,
2443         gst_flow_get_name (ret));
2444     return ret;
2445   }
2446 not_started:
2447   {
2448     GST_DEBUG_OBJECT (src, "getrange but not started");
2449     return GST_FLOW_FLUSHING;
2450   }
2451 no_function:
2452   {
2453     GST_DEBUG_OBJECT (src, "no create function");
2454     return GST_FLOW_NOT_SUPPORTED;
2455   }
2456 unexpected_length:
2457   {
2458     GST_DEBUG_OBJECT (src, "unexpected length %u (offset=%" G_GUINT64_FORMAT
2459         ", size=%" G_GINT64_FORMAT ")", length, offset, src->segment.duration);
2460     return GST_FLOW_EOS;
2461   }
2462 reached_num_buffers:
2463   {
2464     GST_DEBUG_OBJECT (src, "sent all buffers");
2465     return GST_FLOW_EOS;
2466   }
2467 flushing:
2468   {
2469     GST_DEBUG_OBJECT (src, "we are flushing");
2470     if (*buf == NULL)
2471       gst_buffer_unref (res_buf);
2472     return GST_FLOW_FLUSHING;
2473   }
2474 eos:
2475   {
2476     GST_DEBUG_OBJECT (src, "we are EOS");
2477     return GST_FLOW_EOS;
2478   }
2479 }
2480
2481 static GstFlowReturn
2482 gst_base_src_getrange (GstPad * pad, GstObject * parent, guint64 offset,
2483     guint length, GstBuffer ** buf)
2484 {
2485   GstBaseSrc *src;
2486   GstFlowReturn res;
2487
2488   src = GST_BASE_SRC_CAST (parent);
2489
2490   GST_LIVE_LOCK (src);
2491   if (G_UNLIKELY (src->priv->flushing))
2492     goto flushing;
2493
2494   res = gst_base_src_get_range (src, offset, length, buf);
2495
2496 done:
2497   GST_LIVE_UNLOCK (src);
2498
2499   return res;
2500
2501   /* ERRORS */
2502 flushing:
2503   {
2504     GST_DEBUG_OBJECT (src, "we are flushing");
2505     res = GST_FLOW_FLUSHING;
2506     goto done;
2507   }
2508 }
2509
2510 static gboolean
2511 gst_base_src_is_random_access (GstBaseSrc * src)
2512 {
2513   /* we need to start the basesrc to check random access */
2514   if (!GST_BASE_SRC_IS_STARTED (src)) {
2515     GST_LOG_OBJECT (src, "doing start/stop to check get_range support");
2516     if (G_LIKELY (gst_base_src_start (src))) {
2517       if (gst_base_src_start_wait (src) != GST_FLOW_OK)
2518         goto start_failed;
2519       gst_base_src_stop (src);
2520     }
2521   }
2522
2523   return src->random_access;
2524
2525   /* ERRORS */
2526 start_failed:
2527   {
2528     GST_DEBUG_OBJECT (src, "failed to start");
2529     return FALSE;
2530   }
2531 }
2532
2533 static void
2534 gst_base_src_loop (GstPad * pad)
2535 {
2536   GstBaseSrc *src;
2537   GstBuffer *buf = NULL;
2538   GstFlowReturn ret;
2539   gint64 position;
2540   gboolean eos;
2541   guint blocksize;
2542   GList *pending_events = NULL, *tmp;
2543
2544   eos = FALSE;
2545
2546   src = GST_BASE_SRC (GST_OBJECT_PARENT (pad));
2547
2548   gst_base_src_send_stream_start (src);
2549
2550   /* check if we need to renegotiate */
2551   if (gst_pad_check_reconfigure (pad)) {
2552     if (!gst_base_src_negotiate (src))
2553       goto not_negotiated;
2554   }
2555
2556   GST_LIVE_LOCK (src);
2557
2558   if (G_UNLIKELY (src->priv->flushing))
2559     goto flushing;
2560
2561   blocksize = src->blocksize;
2562
2563   /* if we operate in bytes, we can calculate an offset */
2564   if (src->segment.format == GST_FORMAT_BYTES) {
2565     position = src->segment.position;
2566     /* for negative rates, start with subtracting the blocksize */
2567     if (src->segment.rate < 0.0) {
2568       /* we cannot go below segment.start */
2569       if (position > src->segment.start + blocksize)
2570         position -= blocksize;
2571       else {
2572         /* last block, remainder up to segment.start */
2573         blocksize = position - src->segment.start;
2574         position = src->segment.start;
2575       }
2576     }
2577   } else
2578     position = -1;
2579
2580   GST_LOG_OBJECT (src, "next_ts %" GST_TIME_FORMAT " size %u",
2581       GST_TIME_ARGS (position), blocksize);
2582
2583   ret = gst_base_src_get_range (src, position, blocksize, &buf);
2584   if (G_UNLIKELY (ret != GST_FLOW_OK)) {
2585     GST_INFO_OBJECT (src, "pausing after gst_base_src_get_range() = %s",
2586         gst_flow_get_name (ret));
2587     GST_LIVE_UNLOCK (src);
2588     goto pause;
2589   }
2590   /* this should not happen */
2591   if (G_UNLIKELY (buf == NULL))
2592     goto null_buffer;
2593
2594   /* push events to close/start our segment before we push the buffer. */
2595   if (G_UNLIKELY (src->priv->segment_pending)) {
2596     gst_pad_push_event (pad, gst_event_new_segment (&src->segment));
2597     src->priv->segment_pending = FALSE;
2598   }
2599
2600   if (g_atomic_int_get (&src->priv->have_events)) {
2601     GST_OBJECT_LOCK (src);
2602     /* take the events */
2603     pending_events = src->priv->pending_events;
2604     src->priv->pending_events = NULL;
2605     g_atomic_int_set (&src->priv->have_events, FALSE);
2606     GST_OBJECT_UNLOCK (src);
2607   }
2608
2609   /* Push out pending events if any */
2610   if (G_UNLIKELY (pending_events != NULL)) {
2611     for (tmp = pending_events; tmp; tmp = g_list_next (tmp)) {
2612       GstEvent *ev = (GstEvent *) tmp->data;
2613       gst_pad_push_event (pad, ev);
2614     }
2615     g_list_free (pending_events);
2616   }
2617
2618   /* figure out the new position */
2619   switch (src->segment.format) {
2620     case GST_FORMAT_BYTES:
2621     {
2622       guint bufsize = gst_buffer_get_size (buf);
2623
2624       /* we subtracted above for negative rates */
2625       if (src->segment.rate >= 0.0)
2626         position += bufsize;
2627       break;
2628     }
2629     case GST_FORMAT_TIME:
2630     {
2631       GstClockTime start, duration;
2632
2633       start = GST_BUFFER_TIMESTAMP (buf);
2634       duration = GST_BUFFER_DURATION (buf);
2635
2636       if (GST_CLOCK_TIME_IS_VALID (start))
2637         position = start;
2638       else
2639         position = src->segment.position;
2640
2641       if (GST_CLOCK_TIME_IS_VALID (duration)) {
2642         if (src->segment.rate >= 0.0)
2643           position += duration;
2644         else if (position > duration)
2645           position -= duration;
2646         else
2647           position = 0;
2648       }
2649       break;
2650     }
2651     case GST_FORMAT_DEFAULT:
2652       if (src->segment.rate >= 0.0)
2653         position = GST_BUFFER_OFFSET_END (buf);
2654       else
2655         position = GST_BUFFER_OFFSET (buf);
2656       break;
2657     default:
2658       position = -1;
2659       break;
2660   }
2661   if (position != -1) {
2662     if (src->segment.rate >= 0.0) {
2663       /* positive rate, check if we reached the stop */
2664       if (src->segment.stop != -1) {
2665         if (position >= src->segment.stop) {
2666           eos = TRUE;
2667           position = src->segment.stop;
2668         }
2669       }
2670     } else {
2671       /* negative rate, check if we reached the start. start is always set to
2672        * something different from -1 */
2673       if (position <= src->segment.start) {
2674         eos = TRUE;
2675         position = src->segment.start;
2676       }
2677       /* when going reverse, all buffers are DISCONT */
2678       src->priv->discont = TRUE;
2679     }
2680     GST_OBJECT_LOCK (src);
2681     src->segment.position = position;
2682     GST_OBJECT_UNLOCK (src);
2683   }
2684
2685   if (G_UNLIKELY (src->priv->discont)) {
2686     GST_INFO_OBJECT (src, "marking pending DISCONT");
2687     buf = gst_buffer_make_writable (buf);
2688     GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_DISCONT);
2689     src->priv->discont = FALSE;
2690   }
2691   GST_LIVE_UNLOCK (src);
2692
2693   ret = gst_pad_push (pad, buf);
2694   if (G_UNLIKELY (ret != GST_FLOW_OK)) {
2695     if (ret == GST_FLOW_NOT_NEGOTIATED) {
2696       goto not_negotiated;
2697     }
2698     GST_INFO_OBJECT (src, "pausing after gst_pad_push() = %s",
2699         gst_flow_get_name (ret));
2700     goto pause;
2701   }
2702
2703   if (G_UNLIKELY (eos)) {
2704     GST_INFO_OBJECT (src, "pausing after end of segment");
2705     ret = GST_FLOW_EOS;
2706     goto pause;
2707   }
2708
2709 done:
2710   return;
2711
2712   /* special cases */
2713 not_negotiated:
2714   {
2715     if (gst_pad_needs_reconfigure (pad)) {
2716       GST_DEBUG_OBJECT (src, "Retrying to renegotiate");
2717       return;
2718     }
2719     GST_DEBUG_OBJECT (src, "Failed to renegotiate");
2720     ret = GST_FLOW_NOT_NEGOTIATED;
2721     goto pause;
2722   }
2723 flushing:
2724   {
2725     GST_DEBUG_OBJECT (src, "we are flushing");
2726     GST_LIVE_UNLOCK (src);
2727     ret = GST_FLOW_FLUSHING;
2728     goto pause;
2729   }
2730 pause:
2731   {
2732     const gchar *reason = gst_flow_get_name (ret);
2733     GstEvent *event;
2734
2735     GST_DEBUG_OBJECT (src, "pausing task, reason %s", reason);
2736     src->running = FALSE;
2737     gst_pad_pause_task (pad);
2738     if (ret == GST_FLOW_EOS) {
2739       gboolean flag_segment;
2740       GstFormat format;
2741       gint64 position;
2742
2743       /* perform EOS logic */
2744       flag_segment = (src->segment.flags & GST_SEGMENT_FLAG_SEGMENT) != 0;
2745       format = src->segment.format;
2746       position = src->segment.position;
2747
2748       if (flag_segment) {
2749         GstMessage *message;
2750
2751         message = gst_message_new_segment_done (GST_OBJECT_CAST (src),
2752             format, position);
2753         gst_message_set_seqnum (message, src->priv->seqnum);
2754         gst_element_post_message (GST_ELEMENT_CAST (src), message);
2755         event = gst_event_new_segment_done (format, position);
2756         gst_event_set_seqnum (event, src->priv->seqnum);
2757         gst_pad_push_event (pad, event);
2758       } else {
2759         event = gst_event_new_eos ();
2760         gst_event_set_seqnum (event, src->priv->seqnum);
2761         gst_pad_push_event (pad, event);
2762       }
2763     } else if (ret == GST_FLOW_NOT_LINKED || ret <= GST_FLOW_EOS) {
2764       event = gst_event_new_eos ();
2765       gst_event_set_seqnum (event, src->priv->seqnum);
2766       /* for fatal errors we post an error message, post the error
2767        * first so the app knows about the error first.
2768        * Also don't do this for FLUSHING because it happens
2769        * due to flushing and posting an error message because of
2770        * that is the wrong thing to do, e.g. when we're doing
2771        * a flushing seek. */
2772       GST_ELEMENT_ERROR (src, STREAM, FAILED,
2773           (_("Internal data flow error.")),
2774           ("streaming task paused, reason %s (%d)", reason, ret));
2775       gst_pad_push_event (pad, event);
2776     }
2777     goto done;
2778   }
2779 null_buffer:
2780   {
2781     GST_ELEMENT_ERROR (src, STREAM, FAILED,
2782         (_("Internal data flow error.")), ("element returned NULL buffer"));
2783     GST_LIVE_UNLOCK (src);
2784     goto done;
2785   }
2786 }
2787
2788 static gboolean
2789 gst_base_src_set_allocation (GstBaseSrc * basesrc, GstBufferPool * pool,
2790     GstAllocator * allocator, GstAllocationParams * params)
2791 {
2792   GstAllocator *oldalloc;
2793   GstBufferPool *oldpool;
2794   GstBaseSrcPrivate *priv = basesrc->priv;
2795
2796   if (pool) {
2797     GST_DEBUG_OBJECT (basesrc, "activate pool");
2798     if (!gst_buffer_pool_set_active (pool, TRUE))
2799       goto activate_failed;
2800   }
2801
2802   GST_OBJECT_LOCK (basesrc);
2803   oldpool = priv->pool;
2804   priv->pool = pool;
2805
2806   oldalloc = priv->allocator;
2807   priv->allocator = allocator;
2808
2809   if (params)
2810     priv->params = *params;
2811   else
2812     gst_allocation_params_init (&priv->params);
2813   GST_OBJECT_UNLOCK (basesrc);
2814
2815   if (oldpool) {
2816     /* only deactivate if the pool is not the one we're using */
2817     if (oldpool != pool) {
2818       GST_DEBUG_OBJECT (basesrc, "deactivate old pool");
2819       gst_buffer_pool_set_active (oldpool, FALSE);
2820     }
2821     gst_object_unref (oldpool);
2822   }
2823   if (oldalloc) {
2824     gst_object_unref (oldalloc);
2825   }
2826   return TRUE;
2827
2828   /* ERRORS */
2829 activate_failed:
2830   {
2831     GST_ERROR_OBJECT (basesrc, "failed to activate bufferpool.");
2832     return FALSE;
2833   }
2834 }
2835
2836 static gboolean
2837 gst_base_src_activate_pool (GstBaseSrc * basesrc, gboolean active)
2838 {
2839   GstBaseSrcPrivate *priv = basesrc->priv;
2840   GstBufferPool *pool;
2841   gboolean res = TRUE;
2842
2843   GST_OBJECT_LOCK (basesrc);
2844   if ((pool = priv->pool))
2845     pool = gst_object_ref (pool);
2846   GST_OBJECT_UNLOCK (basesrc);
2847
2848   if (pool) {
2849     res = gst_buffer_pool_set_active (pool, active);
2850     gst_object_unref (pool);
2851   }
2852   return res;
2853 }
2854
2855
2856 static gboolean
2857 gst_base_src_decide_allocation_default (GstBaseSrc * basesrc, GstQuery * query)
2858 {
2859   GstCaps *outcaps;
2860   GstBufferPool *pool;
2861   guint size, min, max;
2862   GstAllocator *allocator;
2863   GstAllocationParams params;
2864   GstStructure *config;
2865   gboolean update_allocator;
2866
2867   gst_query_parse_allocation (query, &outcaps, NULL);
2868
2869   /* we got configuration from our peer or the decide_allocation method,
2870    * parse them */
2871   if (gst_query_get_n_allocation_params (query) > 0) {
2872     /* try the allocator */
2873     gst_query_parse_nth_allocation_param (query, 0, &allocator, &params);
2874     update_allocator = TRUE;
2875   } else {
2876     allocator = NULL;
2877     gst_allocation_params_init (&params);
2878     update_allocator = FALSE;
2879   }
2880
2881   if (gst_query_get_n_allocation_pools (query) > 0) {
2882     gst_query_parse_nth_allocation_pool (query, 0, &pool, &size, &min, &max);
2883
2884     if (pool == NULL) {
2885       /* no pool, we can make our own */
2886       GST_DEBUG_OBJECT (basesrc, "no pool, making new pool");
2887       pool = gst_buffer_pool_new ();
2888     }
2889   } else {
2890     pool = NULL;
2891     size = min = max = 0;
2892   }
2893
2894   /* now configure */
2895   if (pool) {
2896     config = gst_buffer_pool_get_config (pool);
2897     gst_buffer_pool_config_set_params (config, outcaps, size, min, max);
2898     gst_buffer_pool_config_set_allocator (config, allocator, &params);
2899     gst_buffer_pool_set_config (pool, config);
2900   }
2901
2902   if (update_allocator)
2903     gst_query_set_nth_allocation_param (query, 0, allocator, &params);
2904   else
2905     gst_query_add_allocation_param (query, allocator, &params);
2906   if (allocator)
2907     gst_object_unref (allocator);
2908
2909   if (pool) {
2910     gst_query_set_nth_allocation_pool (query, 0, pool, size, min, max);
2911     gst_object_unref (pool);
2912   }
2913
2914   return TRUE;
2915 }
2916
2917 static gboolean
2918 gst_base_src_prepare_allocation (GstBaseSrc * basesrc, GstCaps * caps)
2919 {
2920   GstBaseSrcClass *bclass;
2921   gboolean result = TRUE;
2922   GstQuery *query;
2923   GstBufferPool *pool = NULL;
2924   GstAllocator *allocator = NULL;
2925   GstAllocationParams params;
2926
2927   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2928
2929   /* make query and let peer pad answer, we don't really care if it worked or
2930    * not, if it failed, the allocation query would contain defaults and the
2931    * subclass would then set better values if needed */
2932   query = gst_query_new_allocation (caps, TRUE);
2933   if (!gst_pad_peer_query (basesrc->srcpad, query)) {
2934     /* not a problem, just debug a little */
2935     GST_DEBUG_OBJECT (basesrc, "peer ALLOCATION query failed");
2936   }
2937
2938   g_assert (bclass->decide_allocation != NULL);
2939   result = bclass->decide_allocation (basesrc, query);
2940
2941   GST_DEBUG_OBJECT (basesrc, "ALLOCATION (%d) params: %" GST_PTR_FORMAT, result,
2942       query);
2943
2944   if (!result)
2945     goto no_decide_allocation;
2946
2947   /* we got configuration from our peer or the decide_allocation method,
2948    * parse them */
2949   if (gst_query_get_n_allocation_params (query) > 0) {
2950     gst_query_parse_nth_allocation_param (query, 0, &allocator, &params);
2951   } else {
2952     allocator = NULL;
2953     gst_allocation_params_init (&params);
2954   }
2955
2956   if (gst_query_get_n_allocation_pools (query) > 0)
2957     gst_query_parse_nth_allocation_pool (query, 0, &pool, NULL, NULL, NULL);
2958
2959   result = gst_base_src_set_allocation (basesrc, pool, allocator, &params);
2960
2961   gst_query_unref (query);
2962
2963   return result;
2964
2965   /* Errors */
2966 no_decide_allocation:
2967   {
2968     GST_WARNING_OBJECT (basesrc, "Subclass failed to decide allocation");
2969     gst_query_unref (query);
2970
2971     return result;
2972   }
2973 }
2974
2975 /* default negotiation code.
2976  *
2977  * Take intersection between src and sink pads, take first
2978  * caps and fixate.
2979  */
2980 static gboolean
2981 gst_base_src_default_negotiate (GstBaseSrc * basesrc)
2982 {
2983   GstCaps *thiscaps;
2984   GstCaps *caps = NULL;
2985   GstCaps *peercaps = NULL;
2986   gboolean result = FALSE;
2987
2988   /* first see what is possible on our source pad */
2989   thiscaps = gst_pad_query_caps (GST_BASE_SRC_PAD (basesrc), NULL);
2990   GST_DEBUG_OBJECT (basesrc, "caps of src: %" GST_PTR_FORMAT, thiscaps);
2991   /* nothing or anything is allowed, we're done */
2992   if (thiscaps == NULL || gst_caps_is_any (thiscaps))
2993     goto no_nego_needed;
2994
2995   if (G_UNLIKELY (gst_caps_is_empty (thiscaps)))
2996     goto no_caps;
2997
2998   /* get the peer caps */
2999   peercaps = gst_pad_peer_query_caps (GST_BASE_SRC_PAD (basesrc), thiscaps);
3000   GST_DEBUG_OBJECT (basesrc, "caps of peer: %" GST_PTR_FORMAT, peercaps);
3001   if (peercaps) {
3002     /* The result is already a subset of our caps */
3003     caps = peercaps;
3004     gst_caps_unref (thiscaps);
3005   } else {
3006     /* no peer, work with our own caps then */
3007     caps = thiscaps;
3008   }
3009   if (caps && !gst_caps_is_empty (caps)) {
3010     /* now fixate */
3011     GST_DEBUG_OBJECT (basesrc, "have caps: %" GST_PTR_FORMAT, caps);
3012     if (gst_caps_is_any (caps)) {
3013       GST_DEBUG_OBJECT (basesrc, "any caps, we stop");
3014       /* hmm, still anything, so element can do anything and
3015        * nego is not needed */
3016       result = TRUE;
3017     } else {
3018       caps = gst_base_src_fixate (basesrc, caps);
3019       GST_DEBUG_OBJECT (basesrc, "fixated to: %" GST_PTR_FORMAT, caps);
3020       if (gst_caps_is_fixed (caps)) {
3021         /* yay, fixed caps, use those then, it's possible that the subclass does
3022          * not accept this caps after all and we have to fail. */
3023         result = gst_base_src_set_caps (basesrc, caps);
3024       }
3025     }
3026     gst_caps_unref (caps);
3027   } else {
3028     if (caps)
3029       gst_caps_unref (caps);
3030     GST_DEBUG_OBJECT (basesrc, "no common caps");
3031   }
3032   return result;
3033
3034 no_nego_needed:
3035   {
3036     GST_DEBUG_OBJECT (basesrc, "no negotiation needed");
3037     if (thiscaps)
3038       gst_caps_unref (thiscaps);
3039     return TRUE;
3040   }
3041 no_caps:
3042   {
3043     GST_ELEMENT_ERROR (basesrc, STREAM, FORMAT,
3044         ("No supported formats found"),
3045         ("This element did not produce valid caps"));
3046     if (thiscaps)
3047       gst_caps_unref (thiscaps);
3048     return TRUE;
3049   }
3050 }
3051
3052 static gboolean
3053 gst_base_src_negotiate (GstBaseSrc * basesrc)
3054 {
3055   GstBaseSrcClass *bclass;
3056   gboolean result;
3057
3058   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
3059
3060   GST_DEBUG_OBJECT (basesrc, "starting negotiation");
3061
3062   if (G_LIKELY (bclass->negotiate))
3063     result = bclass->negotiate (basesrc);
3064   else
3065     result = TRUE;
3066
3067   if (G_LIKELY (result)) {
3068     GstCaps *caps;
3069
3070     caps = gst_pad_get_current_caps (basesrc->srcpad);
3071
3072     result = gst_base_src_prepare_allocation (basesrc, caps);
3073
3074     if (caps)
3075       gst_caps_unref (caps);
3076   }
3077   return result;
3078 }
3079
3080 static gboolean
3081 gst_base_src_start (GstBaseSrc * basesrc)
3082 {
3083   GstBaseSrcClass *bclass;
3084   gboolean result;
3085
3086   GST_LIVE_LOCK (basesrc);
3087   if (GST_BASE_SRC_IS_STARTING (basesrc))
3088     goto was_starting;
3089   if (GST_BASE_SRC_IS_STARTED (basesrc))
3090     goto was_started;
3091
3092   basesrc->priv->start_result = GST_FLOW_FLUSHING;
3093   GST_OBJECT_FLAG_SET (basesrc, GST_BASE_SRC_FLAG_STARTING);
3094   basesrc->num_buffers_left = basesrc->num_buffers;
3095   basesrc->running = FALSE;
3096   basesrc->priv->segment_pending = FALSE;
3097   GST_OBJECT_LOCK (basesrc);
3098   gst_segment_init (&basesrc->segment, basesrc->segment.format);
3099   GST_OBJECT_UNLOCK (basesrc);
3100   GST_LIVE_UNLOCK (basesrc);
3101
3102   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
3103   if (bclass->start)
3104     result = bclass->start (basesrc);
3105   else
3106     result = TRUE;
3107
3108   if (!result)
3109     goto could_not_start;
3110
3111   if (!gst_base_src_is_async (basesrc))
3112     gst_base_src_start_complete (basesrc, GST_FLOW_OK);
3113
3114   return result;
3115
3116   /* ERROR */
3117 was_starting:
3118   {
3119     GST_DEBUG_OBJECT (basesrc, "was starting");
3120     GST_LIVE_UNLOCK (basesrc);
3121     return TRUE;
3122   }
3123 was_started:
3124   {
3125     GST_DEBUG_OBJECT (basesrc, "was started");
3126     GST_LIVE_UNLOCK (basesrc);
3127     return TRUE;
3128   }
3129 could_not_start:
3130   {
3131     GST_DEBUG_OBJECT (basesrc, "could not start");
3132     /* subclass is supposed to post a message. We don't have to call _stop. */
3133     gst_base_src_start_complete (basesrc, GST_FLOW_ERROR);
3134     return FALSE;
3135   }
3136 }
3137
3138 /**
3139  * gst_base_src_start_complete:
3140  * @basesrc: base source instance
3141  * @ret: a #GstFlowReturn
3142  *
3143  * Complete an asynchronous start operation. When the subclass overrides the
3144  * start method, it should call gst_base_src_start_complete() when the start
3145  * operation completes either from the same thread or from an asynchronous
3146  * helper thread.
3147  */
3148 void
3149 gst_base_src_start_complete (GstBaseSrc * basesrc, GstFlowReturn ret)
3150 {
3151   gboolean have_size;
3152   guint64 size;
3153   gboolean seekable;
3154   GstFormat format;
3155   GstPadMode mode;
3156   GstEvent *event;
3157
3158   if (ret != GST_FLOW_OK)
3159     goto error;
3160
3161   GST_DEBUG_OBJECT (basesrc, "starting source");
3162   format = basesrc->segment.format;
3163
3164   /* figure out the size */
3165   have_size = FALSE;
3166   size = -1;
3167   if (format == GST_FORMAT_BYTES) {
3168     GstBaseSrcClass *bclass = GST_BASE_SRC_GET_CLASS (basesrc);
3169
3170     if (bclass->get_size) {
3171       if (!(have_size = bclass->get_size (basesrc, &size)))
3172         size = -1;
3173     }
3174     GST_DEBUG_OBJECT (basesrc, "setting size %" G_GUINT64_FORMAT, size);
3175     /* only update the size when operating in bytes, subclass is supposed
3176      * to set duration in the start method for other formats */
3177     GST_OBJECT_LOCK (basesrc);
3178     basesrc->segment.duration = size;
3179     GST_OBJECT_UNLOCK (basesrc);
3180   }
3181
3182   GST_DEBUG_OBJECT (basesrc,
3183       "format: %s, have size: %d, size: %" G_GUINT64_FORMAT ", duration: %"
3184       G_GINT64_FORMAT, gst_format_get_name (format), have_size, size,
3185       basesrc->segment.duration);
3186
3187   seekable = gst_base_src_seekable (basesrc);
3188   GST_DEBUG_OBJECT (basesrc, "is seekable: %d", seekable);
3189
3190   /* update for random access flag */
3191   basesrc->random_access = seekable && format == GST_FORMAT_BYTES;
3192
3193   GST_DEBUG_OBJECT (basesrc, "is random_access: %d", basesrc->random_access);
3194
3195   /* stop flushing now but for live sources, still block in the LIVE lock when
3196    * we are not yet PLAYING */
3197   gst_base_src_set_flushing (basesrc, FALSE, FALSE, NULL);
3198
3199   gst_pad_mark_reconfigure (GST_BASE_SRC_PAD (basesrc));
3200
3201   GST_OBJECT_LOCK (basesrc->srcpad);
3202   mode = GST_PAD_MODE (basesrc->srcpad);
3203   GST_OBJECT_UNLOCK (basesrc->srcpad);
3204
3205   /* take the stream lock here, we only want to let the task run when we have
3206    * set the STARTED flag */
3207   GST_PAD_STREAM_LOCK (basesrc->srcpad);
3208   if (mode == GST_PAD_MODE_PUSH) {
3209     /* do initial seek, which will start the task */
3210     GST_OBJECT_LOCK (basesrc);
3211     event = basesrc->pending_seek;
3212     basesrc->pending_seek = NULL;
3213     GST_OBJECT_UNLOCK (basesrc);
3214
3215     /* The perform seek code will start the task when finished. We don't have to
3216      * unlock the streaming thread because it is not running yet */
3217     if (G_UNLIKELY (!gst_base_src_perform_seek (basesrc, event, FALSE)))
3218       goto seek_failed;
3219
3220     if (event)
3221       gst_event_unref (event);
3222   } else {
3223     /* if not random_access, we cannot operate in pull mode for now */
3224     if (G_UNLIKELY (!basesrc->random_access))
3225       goto no_get_range;
3226   }
3227
3228   GST_LIVE_LOCK (basesrc);
3229   GST_OBJECT_FLAG_SET (basesrc, GST_BASE_SRC_FLAG_STARTED);
3230   GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTING);
3231   basesrc->priv->start_result = ret;
3232   GST_LIVE_SIGNAL (basesrc);
3233   GST_LIVE_UNLOCK (basesrc);
3234
3235   GST_PAD_STREAM_UNLOCK (basesrc->srcpad);
3236
3237   return;
3238
3239 seek_failed:
3240   {
3241     GST_PAD_STREAM_UNLOCK (basesrc->srcpad);
3242     GST_ERROR_OBJECT (basesrc, "Failed to perform initial seek");
3243     gst_base_src_set_flushing (basesrc, TRUE, FALSE, NULL);
3244     if (event)
3245       gst_event_unref (event);
3246     ret = GST_FLOW_ERROR;
3247     goto error;
3248   }
3249 no_get_range:
3250   {
3251     GST_PAD_STREAM_UNLOCK (basesrc->srcpad);
3252     gst_base_src_set_flushing (basesrc, TRUE, FALSE, NULL);
3253     GST_ERROR_OBJECT (basesrc, "Cannot operate in pull mode, stopping");
3254     ret = GST_FLOW_ERROR;
3255     goto error;
3256   }
3257 error:
3258   {
3259     GST_LIVE_LOCK (basesrc);
3260     basesrc->priv->start_result = ret;
3261     GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTING);
3262     GST_LIVE_SIGNAL (basesrc);
3263     GST_LIVE_UNLOCK (basesrc);
3264     return;
3265   }
3266 }
3267
3268 /**
3269  * gst_base_src_start_wait:
3270  * @basesrc: base source instance
3271  *
3272  * Wait until the start operation completes.
3273  *
3274  * Returns: a #GstFlowReturn.
3275  */
3276 GstFlowReturn
3277 gst_base_src_start_wait (GstBaseSrc * basesrc)
3278 {
3279   GstFlowReturn result;
3280
3281   GST_LIVE_LOCK (basesrc);
3282   if (G_UNLIKELY (basesrc->priv->flushing))
3283     goto flushing;
3284
3285   while (GST_BASE_SRC_IS_STARTING (basesrc)) {
3286     GST_LIVE_WAIT (basesrc);
3287     if (G_UNLIKELY (basesrc->priv->flushing))
3288       goto flushing;
3289   }
3290   result = basesrc->priv->start_result;
3291   GST_LIVE_UNLOCK (basesrc);
3292
3293   return result;
3294
3295   /* ERRORS */
3296 flushing:
3297   {
3298     GST_DEBUG_OBJECT (basesrc, "we are flushing");
3299     GST_LIVE_UNLOCK (basesrc);
3300     return GST_FLOW_FLUSHING;
3301   }
3302 }
3303
3304 static gboolean
3305 gst_base_src_stop (GstBaseSrc * basesrc)
3306 {
3307   GstBaseSrcClass *bclass;
3308   gboolean result = TRUE;
3309
3310   GST_DEBUG_OBJECT (basesrc, "stopping source");
3311
3312   /* flush all */
3313   gst_base_src_set_flushing (basesrc, TRUE, FALSE, NULL);
3314   /* stop the task */
3315   gst_pad_stop_task (basesrc->srcpad);
3316
3317   GST_LIVE_LOCK (basesrc);
3318   if (!GST_BASE_SRC_IS_STARTED (basesrc) && !GST_BASE_SRC_IS_STARTING (basesrc))
3319     goto was_stopped;
3320
3321   GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTING);
3322   GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTED);
3323   basesrc->priv->start_result = GST_FLOW_FLUSHING;
3324   GST_LIVE_SIGNAL (basesrc);
3325   GST_LIVE_UNLOCK (basesrc);
3326
3327   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
3328   if (bclass->stop)
3329     result = bclass->stop (basesrc);
3330
3331   gst_base_src_set_allocation (basesrc, NULL, NULL, NULL);
3332
3333   return result;
3334
3335 was_stopped:
3336   {
3337     GST_DEBUG_OBJECT (basesrc, "was started");
3338     GST_LIVE_UNLOCK (basesrc);
3339     return TRUE;
3340   }
3341 }
3342
3343 /* start or stop flushing dataprocessing
3344  */
3345 static gboolean
3346 gst_base_src_set_flushing (GstBaseSrc * basesrc,
3347     gboolean flushing, gboolean live_play, gboolean * playing)
3348 {
3349   GstBaseSrcClass *bclass;
3350
3351   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
3352
3353   GST_DEBUG_OBJECT (basesrc, "flushing %d, live_play %d", flushing, live_play);
3354
3355   if (flushing) {
3356     gst_base_src_activate_pool (basesrc, FALSE);
3357     /* unlock any subclasses, we need to do this before grabbing the
3358      * LIVE_LOCK since we hold this lock before going into ::create. We pass an
3359      * unlock to the params because of backwards compat (see seek handler)*/
3360     if (bclass->unlock)
3361       bclass->unlock (basesrc);
3362   }
3363
3364   /* the live lock is released when we are blocked, waiting for playing or
3365    * when we sync to the clock. */
3366   GST_LIVE_LOCK (basesrc);
3367   if (playing)
3368     *playing = basesrc->live_running;
3369   basesrc->priv->flushing = flushing;
3370   if (flushing) {
3371     /* if we are locked in the live lock, signal it to make it flush */
3372     basesrc->live_running = TRUE;
3373
3374     /* clear pending EOS if any */
3375     g_atomic_int_set (&basesrc->priv->pending_eos, FALSE);
3376
3377     /* step 1, now that we have the LIVE lock, clear our unlock request */
3378     if (bclass->unlock_stop)
3379       bclass->unlock_stop (basesrc);
3380
3381     /* step 2, unblock clock sync (if any) or any other blocking thing */
3382     if (basesrc->clock_id)
3383       gst_clock_id_unschedule (basesrc->clock_id);
3384   } else {
3385     /* signal the live source that it can start playing */
3386     basesrc->live_running = live_play;
3387
3388     gst_base_src_activate_pool (basesrc, TRUE);
3389
3390     /* Drop all delayed events */
3391     GST_OBJECT_LOCK (basesrc);
3392     if (basesrc->priv->pending_events) {
3393       g_list_foreach (basesrc->priv->pending_events, (GFunc) gst_event_unref,
3394           NULL);
3395       g_list_free (basesrc->priv->pending_events);
3396       basesrc->priv->pending_events = NULL;
3397       g_atomic_int_set (&basesrc->priv->have_events, FALSE);
3398     }
3399     GST_OBJECT_UNLOCK (basesrc);
3400   }
3401   GST_LIVE_SIGNAL (basesrc);
3402   GST_LIVE_UNLOCK (basesrc);
3403
3404   return TRUE;
3405 }
3406
3407 /* the purpose of this function is to make sure that a live source blocks in the
3408  * LIVE lock or leaves the LIVE lock and continues playing. */
3409 static gboolean
3410 gst_base_src_set_playing (GstBaseSrc * basesrc, gboolean live_play)
3411 {
3412   GstBaseSrcClass *bclass;
3413
3414   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
3415
3416   /* unlock subclasses locked in ::create, we only do this when we stop playing. */
3417   if (!live_play) {
3418     GST_DEBUG_OBJECT (basesrc, "unlock");
3419     if (bclass->unlock)
3420       bclass->unlock (basesrc);
3421   }
3422
3423   /* we are now able to grab the LIVE lock, when we get it, we can be
3424    * waiting for PLAYING while blocked in the LIVE cond or we can be waiting
3425    * for the clock. */
3426   GST_LIVE_LOCK (basesrc);
3427   GST_DEBUG_OBJECT (basesrc, "unschedule clock");
3428
3429   /* unblock clock sync (if any) */
3430   if (basesrc->clock_id)
3431     gst_clock_id_unschedule (basesrc->clock_id);
3432
3433   /* configure what to do when we get to the LIVE lock. */
3434   GST_DEBUG_OBJECT (basesrc, "live running %d", live_play);
3435   basesrc->live_running = live_play;
3436
3437   if (live_play) {
3438     gboolean start;
3439
3440     /* clear our unlock request when going to PLAYING */
3441     GST_DEBUG_OBJECT (basesrc, "unlock stop");
3442     if (bclass->unlock_stop)
3443       bclass->unlock_stop (basesrc);
3444
3445     /* for live sources we restart the timestamp correction */
3446     basesrc->priv->latency = -1;
3447     /* have to restart the task in case it stopped because of the unlock when
3448      * we went to PAUSED. Only do this if we operating in push mode. */
3449     GST_OBJECT_LOCK (basesrc->srcpad);
3450     start = (GST_PAD_MODE (basesrc->srcpad) == GST_PAD_MODE_PUSH);
3451     GST_OBJECT_UNLOCK (basesrc->srcpad);
3452     if (start)
3453       gst_pad_start_task (basesrc->srcpad, (GstTaskFunction) gst_base_src_loop,
3454           basesrc->srcpad, NULL);
3455     GST_DEBUG_OBJECT (basesrc, "signal");
3456     GST_LIVE_SIGNAL (basesrc);
3457   }
3458   GST_LIVE_UNLOCK (basesrc);
3459
3460   return TRUE;
3461 }
3462
3463 static gboolean
3464 gst_base_src_activate_push (GstPad * pad, GstObject * parent, gboolean active)
3465 {
3466   GstBaseSrc *basesrc;
3467
3468   basesrc = GST_BASE_SRC (parent);
3469
3470   /* prepare subclass first */
3471   if (active) {
3472     GST_DEBUG_OBJECT (basesrc, "Activating in push mode");
3473
3474     if (G_UNLIKELY (!basesrc->can_activate_push))
3475       goto no_push_activation;
3476
3477     if (G_UNLIKELY (!gst_base_src_start (basesrc)))
3478       goto error_start;
3479   } else {
3480     GST_DEBUG_OBJECT (basesrc, "Deactivating in push mode");
3481     /* now we can stop the source */
3482     if (G_UNLIKELY (!gst_base_src_stop (basesrc)))
3483       goto error_stop;
3484   }
3485   return TRUE;
3486
3487   /* ERRORS */
3488 no_push_activation:
3489   {
3490     GST_WARNING_OBJECT (basesrc, "Subclass disabled push-mode activation");
3491     return FALSE;
3492   }
3493 error_start:
3494   {
3495     GST_WARNING_OBJECT (basesrc, "Failed to start in push mode");
3496     return FALSE;
3497   }
3498 error_stop:
3499   {
3500     GST_DEBUG_OBJECT (basesrc, "Failed to stop in push mode");
3501     return FALSE;
3502   }
3503 }
3504
3505 static gboolean
3506 gst_base_src_activate_pull (GstPad * pad, GstObject * parent, gboolean active)
3507 {
3508   GstBaseSrc *basesrc;
3509
3510   basesrc = GST_BASE_SRC (parent);
3511
3512   /* prepare subclass first */
3513   if (active) {
3514     GST_DEBUG_OBJECT (basesrc, "Activating in pull mode");
3515     if (G_UNLIKELY (!gst_base_src_start (basesrc)))
3516       goto error_start;
3517   } else {
3518     GST_DEBUG_OBJECT (basesrc, "Deactivating in pull mode");
3519     if (G_UNLIKELY (!gst_base_src_stop (basesrc)))
3520       goto error_stop;
3521   }
3522   return TRUE;
3523
3524   /* ERRORS */
3525 error_start:
3526   {
3527     GST_ERROR_OBJECT (basesrc, "Failed to start in pull mode");
3528     return FALSE;
3529   }
3530 error_stop:
3531   {
3532     GST_ERROR_OBJECT (basesrc, "Failed to stop in pull mode");
3533     return FALSE;
3534   }
3535 }
3536
3537 static gboolean
3538 gst_base_src_activate_mode (GstPad * pad, GstObject * parent,
3539     GstPadMode mode, gboolean active)
3540 {
3541   gboolean res;
3542   GstBaseSrc *src = GST_BASE_SRC (parent);
3543
3544   src->priv->stream_start_pending = FALSE;
3545
3546   switch (mode) {
3547     case GST_PAD_MODE_PULL:
3548       res = gst_base_src_activate_pull (pad, parent, active);
3549       break;
3550     case GST_PAD_MODE_PUSH:
3551       src->priv->stream_start_pending = active;
3552       res = gst_base_src_activate_push (pad, parent, active);
3553       break;
3554     default:
3555       GST_LOG_OBJECT (pad, "unknown activation mode %d", mode);
3556       res = FALSE;
3557       break;
3558   }
3559   return res;
3560 }
3561
3562
3563 static GstStateChangeReturn
3564 gst_base_src_change_state (GstElement * element, GstStateChange transition)
3565 {
3566   GstBaseSrc *basesrc;
3567   GstStateChangeReturn result;
3568   gboolean no_preroll = FALSE;
3569
3570   basesrc = GST_BASE_SRC (element);
3571
3572   switch (transition) {
3573     case GST_STATE_CHANGE_NULL_TO_READY:
3574       break;
3575     case GST_STATE_CHANGE_READY_TO_PAUSED:
3576       no_preroll = gst_base_src_is_live (basesrc);
3577       break;
3578     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
3579       GST_DEBUG_OBJECT (basesrc, "PAUSED->PLAYING");
3580       if (gst_base_src_is_live (basesrc)) {
3581         /* now we can start playback */
3582         gst_base_src_set_playing (basesrc, TRUE);
3583       }
3584       break;
3585     default:
3586       break;
3587   }
3588
3589   if ((result =
3590           GST_ELEMENT_CLASS (parent_class)->change_state (element,
3591               transition)) == GST_STATE_CHANGE_FAILURE)
3592     goto failure;
3593
3594   switch (transition) {
3595     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
3596       GST_DEBUG_OBJECT (basesrc, "PLAYING->PAUSED");
3597       if (gst_base_src_is_live (basesrc)) {
3598         /* make sure we block in the live lock in PAUSED */
3599         gst_base_src_set_playing (basesrc, FALSE);
3600         no_preroll = TRUE;
3601       }
3602       break;
3603     case GST_STATE_CHANGE_PAUSED_TO_READY:
3604     {
3605       /* we don't need to unblock anything here, the pad deactivation code
3606        * already did this */
3607       g_atomic_int_set (&basesrc->priv->pending_eos, FALSE);
3608       gst_event_replace (&basesrc->pending_seek, NULL);
3609       break;
3610     }
3611     case GST_STATE_CHANGE_READY_TO_NULL:
3612       break;
3613     default:
3614       break;
3615   }
3616
3617   if (no_preroll && result == GST_STATE_CHANGE_SUCCESS)
3618     result = GST_STATE_CHANGE_NO_PREROLL;
3619
3620   return result;
3621
3622   /* ERRORS */
3623 failure:
3624   {
3625     GST_DEBUG_OBJECT (basesrc, "parent failed state change");
3626     return result;
3627   }
3628 }