Update for g_type_class_add_private() deprecation in recent GLib
[platform/upstream/gstreamer.git] / libs / gst / base / gstbasesrc.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *               2000,2005 Wim Taymans <wim@fluendo.com>
4  *
5  * gstbasesrc.c:
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20  * Boston, MA 02110-1301, USA.
21  */
22
23 /**
24  * SECTION:gstbasesrc
25  * @title: GstBaseSrc
26  * @short_description: Base class for getrange based source elements
27  * @see_also: #GstPushSrc, #GstBaseTransform, #GstBaseSink
28  *
29  * This is a generic base class for source elements. The following
30  * types of sources are supported:
31  *
32  *   * random access sources like files
33  *   * seekable sources
34  *   * live sources
35  *
36  * The source can be configured to operate in any #GstFormat with the
37  * gst_base_src_set_format() method. The currently set format determines
38  * the format of the internal #GstSegment and any %GST_EVENT_SEGMENT
39  * events. The default format for #GstBaseSrc is %GST_FORMAT_BYTES.
40  *
41  * #GstBaseSrc always supports push mode scheduling. If the following
42  * conditions are met, it also supports pull mode scheduling:
43  *
44  *   * The format is set to %GST_FORMAT_BYTES (default).
45  *   * #GstBaseSrcClass.is_seekable() returns %TRUE.
46  *
47  * If all the conditions are met for operating in pull mode, #GstBaseSrc is
48  * automatically seekable in push mode as well. The following conditions must
49  * be met to make the element seekable in push mode when the format is not
50  * %GST_FORMAT_BYTES:
51  *
52  * * #GstBaseSrcClass.is_seekable() returns %TRUE.
53  * * #GstBaseSrcClass.query() can convert all supported seek formats to the
54  *   internal format as set with gst_base_src_set_format().
55  * * #GstBaseSrcClass.do_seek() is implemented, performs the seek and returns
56  *    %TRUE.
57  *
58  * When the element does not meet the requirements to operate in pull mode, the
59  * offset and length in the #GstBaseSrcClass.create() method should be ignored.
60  * It is recommended to subclass #GstPushSrc instead, in this situation. If the
61  * element can operate in pull mode but only with specific offsets and
62  * lengths, it is allowed to generate an error when the wrong values are passed
63  * to the #GstBaseSrcClass.create() function.
64  *
65  * #GstBaseSrc has support for live sources. Live sources are sources that when
66  * paused discard data, such as audio or video capture devices. A typical live
67  * source also produces data at a fixed rate and thus provides a clock to publish
68  * this rate.
69  * Use gst_base_src_set_live() to activate the live source mode.
70  *
71  * A live source does not produce data in the PAUSED state. This means that the
72  * #GstBaseSrcClass.create() method will not be called in PAUSED but only in
73  * PLAYING. To signal the pipeline that the element will not produce data, the
74  * return value from the READY to PAUSED state will be
75  * %GST_STATE_CHANGE_NO_PREROLL.
76  *
77  * A typical live source will timestamp the buffers it creates with the
78  * current running time of the pipeline. This is one reason why a live source
79  * can only produce data in the PLAYING state, when the clock is actually
80  * distributed and running.
81  *
82  * Live sources that synchronize and block on the clock (an audio source, for
83  * example) can use gst_base_src_wait_playing() when the
84  * #GstBaseSrcClass.create() function was interrupted by a state change to
85  * PAUSED.
86  *
87  * The #GstBaseSrcClass.get_times() method can be used to implement pseudo-live
88  * sources. It only makes sense to implement the #GstBaseSrcClass.get_times()
89  * function if the source is a live source. The #GstBaseSrcClass.get_times()
90  * function should return timestamps starting from 0, as if it were a non-live
91  * source. The base class will make sure that the timestamps are transformed
92  * into the current running_time. The base source will then wait for the
93  * calculated running_time before pushing out the buffer.
94  *
95  * For live sources, the base class will by default report a latency of 0.
96  * For pseudo live sources, the base class will by default measure the difference
97  * between the first buffer timestamp and the start time of get_times and will
98  * report this value as the latency.
99  * Subclasses should override the query function when this behaviour is not
100  * acceptable.
101  *
102  * There is only support in #GstBaseSrc for exactly one source pad, which
103  * should be named "src". A source implementation (subclass of #GstBaseSrc)
104  * should install a pad template in its class_init function, like so:
105  * |[<!-- language="C" -->
106  * static void
107  * my_element_class_init (GstMyElementClass *klass)
108  * {
109  *   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
110  *   // srctemplate should be a #GstStaticPadTemplate with direction
111  *   // %GST_PAD_SRC and name "src"
112  *   gst_element_class_add_static_pad_template (gstelement_class, &amp;srctemplate);
113  *
114  *   gst_element_class_set_static_metadata (gstelement_class,
115  *      "Source name",
116  *      "Source",
117  *      "My Source element",
118  *      "The author <my.sink@my.email>");
119  * }
120  * ]|
121  *
122  * ## Controlled shutdown of live sources in applications
123  *
124  * Applications that record from a live source may want to stop recording
125  * in a controlled way, so that the recording is stopped, but the data
126  * already in the pipeline is processed to the end (remember that many live
127  * sources would go on recording forever otherwise). For that to happen the
128  * application needs to make the source stop recording and send an EOS
129  * event down the pipeline. The application would then wait for an
130  * EOS message posted on the pipeline's bus to know when all data has
131  * been processed and the pipeline can safely be stopped.
132  *
133  * An application may send an EOS event to a source element to make it
134  * perform the EOS logic (send EOS event downstream or post a
135  * %GST_MESSAGE_SEGMENT_DONE on the bus). This can typically be done
136  * with the gst_element_send_event() function on the element or its parent bin.
137  *
138  * After the EOS has been sent to the element, the application should wait for
139  * an EOS message to be posted on the pipeline's bus. Once this EOS message is
140  * received, it may safely shut down the entire pipeline.
141  *
142  */
143
144 #ifdef HAVE_CONFIG_H
145 #  include "config.h"
146 #endif
147
148 #include <stdlib.h>
149 #include <string.h>
150
151 #include <gst/gst_private.h>
152 #include <gst/glib-compat-private.h>
153
154 #include "gstbasesrc.h"
155 #include <gst/gst-i18n-lib.h>
156
157 GST_DEBUG_CATEGORY_STATIC (gst_base_src_debug);
158 #define GST_CAT_DEFAULT gst_base_src_debug
159
160 #define GST_LIVE_GET_LOCK(elem)               (&GST_BASE_SRC_CAST(elem)->live_lock)
161 #define GST_LIVE_LOCK(elem)                   g_mutex_lock(GST_LIVE_GET_LOCK(elem))
162 #define GST_LIVE_TRYLOCK(elem)                g_mutex_trylock(GST_LIVE_GET_LOCK(elem))
163 #define GST_LIVE_UNLOCK(elem)                 g_mutex_unlock(GST_LIVE_GET_LOCK(elem))
164 #define GST_LIVE_GET_COND(elem)               (&GST_BASE_SRC_CAST(elem)->live_cond)
165 #define GST_LIVE_WAIT(elem)                   g_cond_wait (GST_LIVE_GET_COND (elem), GST_LIVE_GET_LOCK (elem))
166 #define GST_LIVE_WAIT_UNTIL(elem, end_time)   g_cond_timed_wait (GST_LIVE_GET_COND (elem), GST_LIVE_GET_LOCK (elem), end_time)
167 #define GST_LIVE_SIGNAL(elem)                 g_cond_signal (GST_LIVE_GET_COND (elem));
168 #define GST_LIVE_BROADCAST(elem)              g_cond_broadcast (GST_LIVE_GET_COND (elem));
169
170
171 #define GST_ASYNC_GET_COND(elem)              (&GST_BASE_SRC_CAST(elem)->priv->async_cond)
172 #define GST_ASYNC_WAIT(elem)                  g_cond_wait (GST_ASYNC_GET_COND (elem), GST_OBJECT_GET_LOCK (elem))
173 #define GST_ASYNC_SIGNAL(elem)                g_cond_signal (GST_ASYNC_GET_COND (elem));
174
175 #define CLEAR_PENDING_EOS(bsrc) \
176   G_STMT_START { \
177     g_atomic_int_set (&bsrc->priv->has_pending_eos, FALSE); \
178     gst_event_replace (&bsrc->priv->pending_eos, NULL); \
179   } G_STMT_END
180
181
182 /* BaseSrc signals and args */
183 enum
184 {
185   /* FILL ME */
186   LAST_SIGNAL
187 };
188
189 #define DEFAULT_BLOCKSIZE       4096
190 #define DEFAULT_NUM_BUFFERS     -1
191 #define DEFAULT_DO_TIMESTAMP    FALSE
192
193 enum
194 {
195   PROP_0,
196   PROP_BLOCKSIZE,
197   PROP_NUM_BUFFERS,
198 #ifndef GST_REMOVE_DEPRECATED
199   PROP_TYPEFIND,
200 #endif
201   PROP_DO_TIMESTAMP
202 };
203
204 /* The basesrc implementation need to respect the following locking order:
205  *   1. STREAM_LOCK
206  *   2. LIVE_LOCK
207  *   3. OBJECT_LOCK
208  */
209 struct _GstBaseSrcPrivate
210 {
211   gboolean discont;             /* STREAM_LOCK */
212   gboolean flushing;            /* LIVE_LOCK */
213
214   GstFlowReturn start_result;   /* OBJECT_LOCK */
215   gboolean async;               /* OBJECT_LOCK */
216
217   /* if a stream-start event should be sent */
218   gboolean stream_start_pending;        /* STREAM_LOCK */
219
220   /* if segment should be sent and a
221    * seqnum if it was originated by a seek */
222   gboolean segment_pending;     /* OBJECT_LOCK */
223   guint32 segment_seqnum;       /* OBJECT_LOCK */
224
225   /* if EOS is pending (atomic) */
226   GstEvent *pending_eos;        /* OBJECT_LOCK */
227   gint has_pending_eos;         /* atomic */
228
229   /* if the eos was caused by a forced eos from the application */
230   gboolean forced_eos;          /* LIVE_LOCK */
231
232   /* startup latency is the time it takes between going to PLAYING and producing
233    * the first BUFFER with running_time 0. This value is included in the latency
234    * reporting. */
235   GstClockTime latency;         /* OBJECT_LOCK */
236   /* timestamp offset, this is the offset add to the values of gst_times for
237    * pseudo live sources */
238   GstClockTimeDiff ts_offset;   /* OBJECT_LOCK */
239
240   gboolean do_timestamp;        /* OBJECT_LOCK */
241   volatile gint dynamic_size;   /* atomic */
242   volatile gint automatic_eos;  /* atomic */
243
244   /* stream sequence number */
245   guint32 seqnum;               /* STREAM_LOCK */
246
247   /* pending events (TAG, CUSTOM_BOTH, CUSTOM_DOWNSTREAM) to be
248    * pushed in the data stream */
249   GList *pending_events;        /* OBJECT_LOCK */
250   volatile gint have_events;    /* OBJECT_LOCK */
251
252   /* QoS *//* with LOCK */
253   gdouble proportion;           /* OBJECT_LOCK */
254   GstClockTime earliest_time;   /* OBJECT_LOCK */
255
256   GstBufferPool *pool;          /* OBJECT_LOCK */
257   GstAllocator *allocator;      /* OBJECT_LOCK */
258   GstAllocationParams params;   /* OBJECT_LOCK */
259
260   GCond async_cond;             /* OBJECT_LOCK */
261
262   /* for _submit_buffer_list() */
263   GstBufferList *pending_bufferlist;
264 };
265
266 #define BASE_SRC_HAS_PENDING_BUFFER_LIST(src) \
267     ((src)->priv->pending_bufferlist != NULL)
268
269 static GstElementClass *parent_class = NULL;
270 static gint private_offset = 0;
271
272 static void gst_base_src_class_init (GstBaseSrcClass * klass);
273 static void gst_base_src_init (GstBaseSrc * src, gpointer g_class);
274 static void gst_base_src_finalize (GObject * object);
275
276
277 GType
278 gst_base_src_get_type (void)
279 {
280   static volatile gsize base_src_type = 0;
281
282   if (g_once_init_enter (&base_src_type)) {
283     GType _type;
284     static const GTypeInfo base_src_info = {
285       sizeof (GstBaseSrcClass),
286       NULL,
287       NULL,
288       (GClassInitFunc) gst_base_src_class_init,
289       NULL,
290       NULL,
291       sizeof (GstBaseSrc),
292       0,
293       (GInstanceInitFunc) gst_base_src_init,
294     };
295
296     _type = g_type_register_static (GST_TYPE_ELEMENT,
297         "GstBaseSrc", &base_src_info, G_TYPE_FLAG_ABSTRACT);
298
299     private_offset =
300         g_type_add_instance_private (_type, sizeof (GstBaseSrcPrivate));
301
302     g_once_init_leave (&base_src_type, _type);
303   }
304   return base_src_type;
305 }
306
307 static inline GstBaseSrcPrivate *
308 gst_base_src_get_instance_private (GstBaseSrc * self)
309 {
310   return (G_STRUCT_MEMBER_P (self, private_offset));
311 }
312
313 static GstCaps *gst_base_src_default_get_caps (GstBaseSrc * bsrc,
314     GstCaps * filter);
315 static GstCaps *gst_base_src_default_fixate (GstBaseSrc * src, GstCaps * caps);
316 static GstCaps *gst_base_src_fixate (GstBaseSrc * src, GstCaps * caps);
317
318 static gboolean gst_base_src_is_random_access (GstBaseSrc * src);
319 static gboolean gst_base_src_activate_mode (GstPad * pad, GstObject * parent,
320     GstPadMode mode, gboolean active);
321 static void gst_base_src_set_property (GObject * object, guint prop_id,
322     const GValue * value, GParamSpec * pspec);
323 static void gst_base_src_get_property (GObject * object, guint prop_id,
324     GValue * value, GParamSpec * pspec);
325 static gboolean gst_base_src_event (GstPad * pad, GstObject * parent,
326     GstEvent * event);
327 static gboolean gst_base_src_send_event (GstElement * elem, GstEvent * event);
328 static gboolean gst_base_src_default_event (GstBaseSrc * src, GstEvent * event);
329
330 static gboolean gst_base_src_query (GstPad * pad, GstObject * parent,
331     GstQuery * query);
332
333 static void gst_base_src_set_pool_flushing (GstBaseSrc * basesrc,
334     gboolean flushing);
335 static gboolean gst_base_src_default_negotiate (GstBaseSrc * basesrc);
336 static gboolean gst_base_src_default_do_seek (GstBaseSrc * src,
337     GstSegment * segment);
338 static gboolean gst_base_src_default_query (GstBaseSrc * src, GstQuery * query);
339 static gboolean gst_base_src_default_prepare_seek_segment (GstBaseSrc * src,
340     GstEvent * event, GstSegment * segment);
341 static GstFlowReturn gst_base_src_default_create (GstBaseSrc * basesrc,
342     guint64 offset, guint size, GstBuffer ** buf);
343 static GstFlowReturn gst_base_src_default_alloc (GstBaseSrc * basesrc,
344     guint64 offset, guint size, GstBuffer ** buf);
345 static gboolean gst_base_src_decide_allocation_default (GstBaseSrc * basesrc,
346     GstQuery * query);
347
348 static gboolean gst_base_src_set_flushing (GstBaseSrc * basesrc,
349     gboolean flushing);
350
351 static gboolean gst_base_src_start (GstBaseSrc * basesrc);
352 static gboolean gst_base_src_stop (GstBaseSrc * basesrc);
353
354 static GstStateChangeReturn gst_base_src_change_state (GstElement * element,
355     GstStateChange transition);
356
357 static void gst_base_src_loop (GstPad * pad);
358 static GstFlowReturn gst_base_src_getrange (GstPad * pad, GstObject * parent,
359     guint64 offset, guint length, GstBuffer ** buf);
360 static GstFlowReturn gst_base_src_get_range (GstBaseSrc * src, guint64 offset,
361     guint length, GstBuffer ** buf);
362 static gboolean gst_base_src_seekable (GstBaseSrc * src);
363 static gboolean gst_base_src_negotiate (GstBaseSrc * basesrc);
364 static gboolean gst_base_src_update_length (GstBaseSrc * src, guint64 offset,
365     guint * length, gboolean force);
366
367 static void
368 gst_base_src_class_init (GstBaseSrcClass * klass)
369 {
370   GObjectClass *gobject_class;
371   GstElementClass *gstelement_class;
372
373   gobject_class = G_OBJECT_CLASS (klass);
374   gstelement_class = GST_ELEMENT_CLASS (klass);
375
376   if (private_offset != 0)
377     g_type_class_adjust_private_offset (klass, &private_offset);
378
379   GST_DEBUG_CATEGORY_INIT (gst_base_src_debug, "basesrc", 0, "basesrc element");
380
381   parent_class = g_type_class_peek_parent (klass);
382
383   gobject_class->finalize = gst_base_src_finalize;
384   gobject_class->set_property = gst_base_src_set_property;
385   gobject_class->get_property = gst_base_src_get_property;
386
387   g_object_class_install_property (gobject_class, PROP_BLOCKSIZE,
388       g_param_spec_uint ("blocksize", "Block size",
389           "Size in bytes to read per buffer (-1 = default)", 0, G_MAXUINT,
390           DEFAULT_BLOCKSIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
391   g_object_class_install_property (gobject_class, PROP_NUM_BUFFERS,
392       g_param_spec_int ("num-buffers", "num-buffers",
393           "Number of buffers to output before sending EOS (-1 = unlimited)",
394           -1, G_MAXINT, DEFAULT_NUM_BUFFERS, G_PARAM_READWRITE |
395           G_PARAM_STATIC_STRINGS));
396 #ifndef GST_REMOVE_DEPRECATED
397   g_object_class_install_property (gobject_class, PROP_TYPEFIND,
398       g_param_spec_boolean ("typefind", "Typefind",
399           "Run typefind before negotiating (deprecated, non-functional)", FALSE,
400           G_PARAM_READWRITE | G_PARAM_DEPRECATED | G_PARAM_STATIC_STRINGS));
401 #endif
402   g_object_class_install_property (gobject_class, PROP_DO_TIMESTAMP,
403       g_param_spec_boolean ("do-timestamp", "Do timestamp",
404           "Apply current stream time to buffers", DEFAULT_DO_TIMESTAMP,
405           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
406
407   gstelement_class->change_state =
408       GST_DEBUG_FUNCPTR (gst_base_src_change_state);
409   gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_base_src_send_event);
410
411   klass->get_caps = GST_DEBUG_FUNCPTR (gst_base_src_default_get_caps);
412   klass->negotiate = GST_DEBUG_FUNCPTR (gst_base_src_default_negotiate);
413   klass->fixate = GST_DEBUG_FUNCPTR (gst_base_src_default_fixate);
414   klass->prepare_seek_segment =
415       GST_DEBUG_FUNCPTR (gst_base_src_default_prepare_seek_segment);
416   klass->do_seek = GST_DEBUG_FUNCPTR (gst_base_src_default_do_seek);
417   klass->query = GST_DEBUG_FUNCPTR (gst_base_src_default_query);
418   klass->event = GST_DEBUG_FUNCPTR (gst_base_src_default_event);
419   klass->create = GST_DEBUG_FUNCPTR (gst_base_src_default_create);
420   klass->alloc = GST_DEBUG_FUNCPTR (gst_base_src_default_alloc);
421   klass->decide_allocation =
422       GST_DEBUG_FUNCPTR (gst_base_src_decide_allocation_default);
423
424   /* Registering debug symbols for function pointers */
425   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_activate_mode);
426   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_event);
427   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_query);
428   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_getrange);
429   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_fixate);
430 }
431
432 static void
433 gst_base_src_init (GstBaseSrc * basesrc, gpointer g_class)
434 {
435   GstPad *pad;
436   GstPadTemplate *pad_template;
437
438   basesrc->priv = gst_base_src_get_instance_private (basesrc);
439
440   basesrc->is_live = FALSE;
441   g_mutex_init (&basesrc->live_lock);
442   g_cond_init (&basesrc->live_cond);
443   basesrc->num_buffers = DEFAULT_NUM_BUFFERS;
444   basesrc->num_buffers_left = -1;
445   basesrc->priv->automatic_eos = TRUE;
446
447   basesrc->can_activate_push = TRUE;
448
449   pad_template =
450       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (g_class), "src");
451   g_return_if_fail (pad_template != NULL);
452
453   GST_DEBUG_OBJECT (basesrc, "creating src pad");
454   pad = gst_pad_new_from_template (pad_template, "src");
455
456   GST_DEBUG_OBJECT (basesrc, "setting functions on src pad");
457   gst_pad_set_activatemode_function (pad, gst_base_src_activate_mode);
458   gst_pad_set_event_function (pad, gst_base_src_event);
459   gst_pad_set_query_function (pad, gst_base_src_query);
460   gst_pad_set_getrange_function (pad, gst_base_src_getrange);
461
462   /* hold pointer to pad */
463   basesrc->srcpad = pad;
464   GST_DEBUG_OBJECT (basesrc, "adding src pad");
465   gst_element_add_pad (GST_ELEMENT (basesrc), pad);
466
467   basesrc->blocksize = DEFAULT_BLOCKSIZE;
468   basesrc->clock_id = NULL;
469   /* we operate in BYTES by default */
470   gst_base_src_set_format (basesrc, GST_FORMAT_BYTES);
471   basesrc->priv->do_timestamp = DEFAULT_DO_TIMESTAMP;
472   g_atomic_int_set (&basesrc->priv->have_events, FALSE);
473
474   g_cond_init (&basesrc->priv->async_cond);
475   basesrc->priv->start_result = GST_FLOW_FLUSHING;
476   GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTED);
477   GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTING);
478   GST_OBJECT_FLAG_SET (basesrc, GST_ELEMENT_FLAG_SOURCE);
479
480   GST_DEBUG_OBJECT (basesrc, "init done");
481 }
482
483 static void
484 gst_base_src_finalize (GObject * object)
485 {
486   GstBaseSrc *basesrc;
487   GstEvent **event_p;
488
489   basesrc = GST_BASE_SRC (object);
490
491   g_mutex_clear (&basesrc->live_lock);
492   g_cond_clear (&basesrc->live_cond);
493   g_cond_clear (&basesrc->priv->async_cond);
494
495   event_p = &basesrc->pending_seek;
496   gst_event_replace (event_p, NULL);
497
498   if (basesrc->priv->pending_events) {
499     g_list_foreach (basesrc->priv->pending_events, (GFunc) gst_event_unref,
500         NULL);
501     g_list_free (basesrc->priv->pending_events);
502   }
503
504   G_OBJECT_CLASS (parent_class)->finalize (object);
505 }
506
507 /* Call with LIVE_LOCK held */
508 static GstFlowReturn
509 gst_base_src_wait_playing_unlocked (GstBaseSrc * src)
510 {
511   while (G_UNLIKELY (!src->live_running && !src->priv->flushing)) {
512     /* block until the state changes, or we get a flush, or something */
513     GST_DEBUG_OBJECT (src, "live source waiting for running state");
514     GST_LIVE_WAIT (src);
515     GST_DEBUG_OBJECT (src, "live source unlocked");
516   }
517
518   if (src->priv->flushing)
519     goto flushing;
520
521   return GST_FLOW_OK;
522
523   /* ERRORS */
524 flushing:
525   {
526     GST_DEBUG_OBJECT (src, "we are flushing");
527     return GST_FLOW_FLUSHING;
528   }
529 }
530
531
532 /**
533  * gst_base_src_wait_playing:
534  * @src: the src
535  *
536  * If the #GstBaseSrcClass.create() method performs its own synchronisation
537  * against the clock it must unblock when going from PLAYING to the PAUSED state
538  * and call this method before continuing to produce the remaining data.
539  *
540  * This function will block until a state change to PLAYING happens (in which
541  * case this function returns %GST_FLOW_OK) or the processing must be stopped due
542  * to a state change to READY or a FLUSH event (in which case this function
543  * returns %GST_FLOW_FLUSHING).
544  *
545  * Returns: %GST_FLOW_OK if @src is PLAYING and processing can
546  * continue. Any other return value should be returned from the create vmethod.
547  */
548 GstFlowReturn
549 gst_base_src_wait_playing (GstBaseSrc * src)
550 {
551   GstFlowReturn ret;
552
553   g_return_val_if_fail (GST_IS_BASE_SRC (src), GST_FLOW_ERROR);
554
555   GST_LIVE_LOCK (src);
556   ret = gst_base_src_wait_playing_unlocked (src);
557   GST_LIVE_UNLOCK (src);
558
559   return ret;
560 }
561
562 /**
563  * gst_base_src_set_live:
564  * @src: base source instance
565  * @live: new live-mode
566  *
567  * If the element listens to a live source, @live should
568  * be set to %TRUE.
569  *
570  * A live source will not produce data in the PAUSED state and
571  * will therefore not be able to participate in the PREROLL phase
572  * of a pipeline. To signal this fact to the application and the
573  * pipeline, the state change return value of the live source will
574  * be GST_STATE_CHANGE_NO_PREROLL.
575  */
576 void
577 gst_base_src_set_live (GstBaseSrc * src, gboolean live)
578 {
579   g_return_if_fail (GST_IS_BASE_SRC (src));
580
581   GST_OBJECT_LOCK (src);
582   src->is_live = live;
583   GST_OBJECT_UNLOCK (src);
584 }
585
586 /**
587  * gst_base_src_is_live:
588  * @src: base source instance
589  *
590  * Check if an element is in live mode.
591  *
592  * Returns: %TRUE if element is in live mode.
593  */
594 gboolean
595 gst_base_src_is_live (GstBaseSrc * src)
596 {
597   gboolean result;
598
599   g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
600
601   GST_OBJECT_LOCK (src);
602   result = src->is_live;
603   GST_OBJECT_UNLOCK (src);
604
605   return result;
606 }
607
608 /**
609  * gst_base_src_set_format:
610  * @src: base source instance
611  * @format: the format to use
612  *
613  * Sets the default format of the source. This will be the format used
614  * for sending SEGMENT events and for performing seeks.
615  *
616  * If a format of GST_FORMAT_BYTES is set, the element will be able to
617  * operate in pull mode if the #GstBaseSrcClass.is_seekable() returns %TRUE.
618  *
619  * This function must only be called in states < %GST_STATE_PAUSED.
620  */
621 void
622 gst_base_src_set_format (GstBaseSrc * src, GstFormat format)
623 {
624   g_return_if_fail (GST_IS_BASE_SRC (src));
625   g_return_if_fail (GST_STATE (src) <= GST_STATE_READY);
626
627   GST_OBJECT_LOCK (src);
628   gst_segment_init (&src->segment, format);
629   GST_OBJECT_UNLOCK (src);
630 }
631
632 /**
633  * gst_base_src_set_dynamic_size:
634  * @src: base source instance
635  * @dynamic: new dynamic size mode
636  *
637  * If not @dynamic, size is only updated when needed, such as when trying to
638  * read past current tracked size.  Otherwise, size is checked for upon each
639  * read.
640  */
641 void
642 gst_base_src_set_dynamic_size (GstBaseSrc * src, gboolean dynamic)
643 {
644   g_return_if_fail (GST_IS_BASE_SRC (src));
645
646   g_atomic_int_set (&src->priv->dynamic_size, dynamic);
647 }
648
649 /**
650  * gst_base_src_set_automatic_eos:
651  * @src: base source instance
652  * @automatic_eos: automatic eos
653  *
654  * If @automatic_eos is %TRUE, @src will automatically go EOS if a buffer
655  * after the total size is returned. By default this is %TRUE but sources
656  * that can't return an authoritative size and only know that they're EOS
657  * when trying to read more should set this to %FALSE.
658  *
659  * Since: 1.4
660  */
661 void
662 gst_base_src_set_automatic_eos (GstBaseSrc * src, gboolean automatic_eos)
663 {
664   g_return_if_fail (GST_IS_BASE_SRC (src));
665
666   g_atomic_int_set (&src->priv->automatic_eos, automatic_eos);
667 }
668
669 /**
670  * gst_base_src_set_async:
671  * @src: base source instance
672  * @async: new async mode
673  *
674  * Configure async behaviour in @src, no state change will block. The open,
675  * close, start, stop, play and pause virtual methods will be executed in a
676  * different thread and are thus allowed to perform blocking operations. Any
677  * blocking operation should be unblocked with the unlock vmethod.
678  */
679 void
680 gst_base_src_set_async (GstBaseSrc * src, gboolean async)
681 {
682   g_return_if_fail (GST_IS_BASE_SRC (src));
683
684   GST_OBJECT_LOCK (src);
685   src->priv->async = async;
686   GST_OBJECT_UNLOCK (src);
687 }
688
689 /**
690  * gst_base_src_is_async:
691  * @src: base source instance
692  *
693  * Get the current async behaviour of @src. See also gst_base_src_set_async().
694  *
695  * Returns: %TRUE if @src is operating in async mode.
696  */
697 gboolean
698 gst_base_src_is_async (GstBaseSrc * src)
699 {
700   gboolean res;
701
702   g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
703
704   GST_OBJECT_LOCK (src);
705   res = src->priv->async;
706   GST_OBJECT_UNLOCK (src);
707
708   return res;
709 }
710
711
712 /**
713  * gst_base_src_query_latency:
714  * @src: the source
715  * @live: (out) (allow-none): if the source is live
716  * @min_latency: (out) (allow-none): the min latency of the source
717  * @max_latency: (out) (allow-none): the max latency of the source
718  *
719  * Query the source for the latency parameters. @live will be %TRUE when @src is
720  * configured as a live source. @min_latency and @max_latency will be set
721  * to the difference between the running time and the timestamp of the first
722  * buffer.
723  *
724  * This function is mostly used by subclasses.
725  *
726  * Returns: %TRUE if the query succeeded.
727  */
728 gboolean
729 gst_base_src_query_latency (GstBaseSrc * src, gboolean * live,
730     GstClockTime * min_latency, GstClockTime * max_latency)
731 {
732   GstClockTime min;
733
734   g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
735
736   GST_OBJECT_LOCK (src);
737   if (live)
738     *live = src->is_live;
739
740   /* if we have a startup latency, report this one, else report 0. Subclasses
741    * are supposed to override the query function if they want something
742    * else. */
743   if (src->priv->latency != -1)
744     min = src->priv->latency;
745   else
746     min = 0;
747
748   if (min_latency)
749     *min_latency = min;
750   if (max_latency)
751     *max_latency = min;
752
753   GST_LOG_OBJECT (src, "latency: live %d, min %" GST_TIME_FORMAT
754       ", max %" GST_TIME_FORMAT, src->is_live, GST_TIME_ARGS (min),
755       GST_TIME_ARGS (min));
756   GST_OBJECT_UNLOCK (src);
757
758   return TRUE;
759 }
760
761 /**
762  * gst_base_src_set_blocksize:
763  * @src: the source
764  * @blocksize: the new blocksize in bytes
765  *
766  * Set the number of bytes that @src will push out with each buffer. When
767  * @blocksize is set to -1, a default length will be used.
768  */
769 void
770 gst_base_src_set_blocksize (GstBaseSrc * src, guint blocksize)
771 {
772   g_return_if_fail (GST_IS_BASE_SRC (src));
773
774   GST_OBJECT_LOCK (src);
775   src->blocksize = blocksize;
776   GST_OBJECT_UNLOCK (src);
777 }
778
779 /**
780  * gst_base_src_get_blocksize:
781  * @src: the source
782  *
783  * Get the number of bytes that @src will push out with each buffer.
784  *
785  * Returns: the number of bytes pushed with each buffer.
786  */
787 guint
788 gst_base_src_get_blocksize (GstBaseSrc * src)
789 {
790   gint res;
791
792   g_return_val_if_fail (GST_IS_BASE_SRC (src), 0);
793
794   GST_OBJECT_LOCK (src);
795   res = src->blocksize;
796   GST_OBJECT_UNLOCK (src);
797
798   return res;
799 }
800
801
802 /**
803  * gst_base_src_set_do_timestamp:
804  * @src: the source
805  * @timestamp: enable or disable timestamping
806  *
807  * Configure @src to automatically timestamp outgoing buffers based on the
808  * current running_time of the pipeline. This property is mostly useful for live
809  * sources.
810  */
811 void
812 gst_base_src_set_do_timestamp (GstBaseSrc * src, gboolean timestamp)
813 {
814   g_return_if_fail (GST_IS_BASE_SRC (src));
815
816   GST_OBJECT_LOCK (src);
817   src->priv->do_timestamp = timestamp;
818   if (timestamp && src->segment.format != GST_FORMAT_TIME)
819     gst_segment_init (&src->segment, GST_FORMAT_TIME);
820   GST_OBJECT_UNLOCK (src);
821 }
822
823 /**
824  * gst_base_src_get_do_timestamp:
825  * @src: the source
826  *
827  * Query if @src timestamps outgoing buffers based on the current running_time.
828  *
829  * Returns: %TRUE if the base class will automatically timestamp outgoing buffers.
830  */
831 gboolean
832 gst_base_src_get_do_timestamp (GstBaseSrc * src)
833 {
834   gboolean res;
835
836   g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
837
838   GST_OBJECT_LOCK (src);
839   res = src->priv->do_timestamp;
840   GST_OBJECT_UNLOCK (src);
841
842   return res;
843 }
844
845 /**
846  * gst_base_src_new_seamless_segment:
847  * @src: The source
848  * @start: The new start value for the segment
849  * @stop: Stop value for the new segment
850  * @time: The new time value for the start of the new segment
851  *
852  * Prepare a new seamless segment for emission downstream. This function must
853  * only be called by derived sub-classes, and only from the create() function,
854  * as the stream-lock needs to be held.
855  *
856  * The format for the new segment will be the current format of the source, as
857  * configured with gst_base_src_set_format()
858  *
859  * Returns: %TRUE if preparation of the seamless segment succeeded.
860  */
861 gboolean
862 gst_base_src_new_seamless_segment (GstBaseSrc * src, gint64 start, gint64 stop,
863     gint64 time)
864 {
865   gboolean res = TRUE;
866
867   GST_OBJECT_LOCK (src);
868
869   src->segment.base = gst_segment_to_running_time (&src->segment,
870       src->segment.format, src->segment.position);
871   src->segment.position = src->segment.start = start;
872   src->segment.stop = stop;
873   src->segment.time = time;
874
875   /* Mark pending segment. Will be sent before next data */
876   src->priv->segment_pending = TRUE;
877   src->priv->segment_seqnum = gst_util_seqnum_next ();
878
879   GST_DEBUG_OBJECT (src,
880       "Starting new seamless segment. Start %" GST_TIME_FORMAT " stop %"
881       GST_TIME_FORMAT " time %" GST_TIME_FORMAT " base %" GST_TIME_FORMAT,
882       GST_TIME_ARGS (start), GST_TIME_ARGS (stop), GST_TIME_ARGS (time),
883       GST_TIME_ARGS (src->segment.base));
884
885   GST_OBJECT_UNLOCK (src);
886
887   src->priv->discont = TRUE;
888   src->running = TRUE;
889
890   return res;
891 }
892
893 /* called with STREAM_LOCK */
894 static gboolean
895 gst_base_src_send_stream_start (GstBaseSrc * src)
896 {
897   gboolean ret = TRUE;
898
899   if (src->priv->stream_start_pending) {
900     gchar *stream_id;
901     GstEvent *event;
902
903     stream_id =
904         gst_pad_create_stream_id (src->srcpad, GST_ELEMENT_CAST (src), NULL);
905
906     GST_DEBUG_OBJECT (src, "Pushing STREAM_START");
907     event = gst_event_new_stream_start (stream_id);
908     gst_event_set_group_id (event, gst_util_group_id_next ());
909
910     ret = gst_pad_push_event (src->srcpad, event);
911     src->priv->stream_start_pending = FALSE;
912     g_free (stream_id);
913   }
914
915   return ret;
916 }
917
918 /**
919  * gst_base_src_set_caps:
920  * @src: a #GstBaseSrc
921  * @caps: (transfer none): a #GstCaps
922  *
923  * Set new caps on the basesrc source pad.
924  *
925  * Returns: %TRUE if the caps could be set
926  */
927 gboolean
928 gst_base_src_set_caps (GstBaseSrc * src, GstCaps * caps)
929 {
930   GstBaseSrcClass *bclass;
931   gboolean res = TRUE;
932   GstCaps *current_caps;
933
934   bclass = GST_BASE_SRC_GET_CLASS (src);
935
936   gst_base_src_send_stream_start (src);
937
938   current_caps = gst_pad_get_current_caps (GST_BASE_SRC_PAD (src));
939   if (current_caps && gst_caps_is_equal (current_caps, caps)) {
940     GST_DEBUG_OBJECT (src, "New caps equal to old ones: %" GST_PTR_FORMAT,
941         caps);
942     res = TRUE;
943   } else {
944     if (bclass->set_caps)
945       res = bclass->set_caps (src, caps);
946
947     if (res)
948       res = gst_pad_push_event (src->srcpad, gst_event_new_caps (caps));
949   }
950
951   if (current_caps)
952     gst_caps_unref (current_caps);
953
954   return res;
955 }
956
957 static GstCaps *
958 gst_base_src_default_get_caps (GstBaseSrc * bsrc, GstCaps * filter)
959 {
960   GstCaps *caps = NULL;
961   GstPadTemplate *pad_template;
962   GstBaseSrcClass *bclass;
963
964   bclass = GST_BASE_SRC_GET_CLASS (bsrc);
965
966   pad_template =
967       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (bclass), "src");
968
969   if (pad_template != NULL) {
970     caps = gst_pad_template_get_caps (pad_template);
971
972     if (filter) {
973       GstCaps *intersection;
974
975       intersection =
976           gst_caps_intersect_full (filter, caps, GST_CAPS_INTERSECT_FIRST);
977       gst_caps_unref (caps);
978       caps = intersection;
979     }
980   }
981   return caps;
982 }
983
984 static GstCaps *
985 gst_base_src_default_fixate (GstBaseSrc * bsrc, GstCaps * caps)
986 {
987   GST_DEBUG_OBJECT (bsrc, "using default caps fixate function");
988   return gst_caps_fixate (caps);
989 }
990
991 static GstCaps *
992 gst_base_src_fixate (GstBaseSrc * bsrc, GstCaps * caps)
993 {
994   GstBaseSrcClass *bclass;
995
996   bclass = GST_BASE_SRC_GET_CLASS (bsrc);
997
998   if (bclass->fixate)
999     caps = bclass->fixate (bsrc, caps);
1000
1001   return caps;
1002 }
1003
1004 static gboolean
1005 gst_base_src_default_query (GstBaseSrc * src, GstQuery * query)
1006 {
1007   gboolean res;
1008
1009   switch (GST_QUERY_TYPE (query)) {
1010     case GST_QUERY_POSITION:
1011     {
1012       GstFormat format;
1013
1014       gst_query_parse_position (query, &format, NULL);
1015
1016       GST_DEBUG_OBJECT (src, "position query in format %s",
1017           gst_format_get_name (format));
1018
1019       switch (format) {
1020         case GST_FORMAT_PERCENT:
1021         {
1022           gint64 percent;
1023           gint64 position;
1024           gint64 duration;
1025
1026           GST_OBJECT_LOCK (src);
1027           position = src->segment.position;
1028           duration = src->segment.duration;
1029           GST_OBJECT_UNLOCK (src);
1030
1031           if (position != -1 && duration != -1) {
1032             if (position < duration)
1033               percent = gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX, position,
1034                   duration);
1035             else
1036               percent = GST_FORMAT_PERCENT_MAX;
1037           } else
1038             percent = -1;
1039
1040           gst_query_set_position (query, GST_FORMAT_PERCENT, percent);
1041           res = TRUE;
1042           break;
1043         }
1044         default:
1045         {
1046           gint64 position;
1047           GstFormat seg_format;
1048
1049           GST_OBJECT_LOCK (src);
1050           position =
1051               gst_segment_to_stream_time (&src->segment, src->segment.format,
1052               src->segment.position);
1053           seg_format = src->segment.format;
1054           GST_OBJECT_UNLOCK (src);
1055
1056           if (position != -1) {
1057             /* convert to requested format */
1058             res =
1059                 gst_pad_query_convert (src->srcpad, seg_format,
1060                 position, format, &position);
1061           } else
1062             res = TRUE;
1063
1064           if (res)
1065             gst_query_set_position (query, format, position);
1066
1067           break;
1068         }
1069       }
1070       break;
1071     }
1072     case GST_QUERY_DURATION:
1073     {
1074       GstFormat format;
1075
1076       gst_query_parse_duration (query, &format, NULL);
1077
1078       GST_DEBUG_OBJECT (src, "duration query in format %s",
1079           gst_format_get_name (format));
1080
1081       switch (format) {
1082         case GST_FORMAT_PERCENT:
1083           gst_query_set_duration (query, GST_FORMAT_PERCENT,
1084               GST_FORMAT_PERCENT_MAX);
1085           res = TRUE;
1086           break;
1087         default:
1088         {
1089           gint64 duration;
1090           GstFormat seg_format;
1091           guint length = 0;
1092
1093           /* may have to refresh duration */
1094           gst_base_src_update_length (src, 0, &length,
1095               g_atomic_int_get (&src->priv->dynamic_size));
1096
1097           /* this is the duration as configured by the subclass. */
1098           GST_OBJECT_LOCK (src);
1099           duration = src->segment.duration;
1100           seg_format = src->segment.format;
1101           GST_OBJECT_UNLOCK (src);
1102
1103           GST_LOG_OBJECT (src, "duration %" G_GINT64_FORMAT ", format %s",
1104               duration, gst_format_get_name (seg_format));
1105
1106           if (duration != -1) {
1107             /* convert to requested format, if this fails, we have a duration
1108              * but we cannot answer the query, we must return FALSE. */
1109             res =
1110                 gst_pad_query_convert (src->srcpad, seg_format,
1111                 duration, format, &duration);
1112           } else {
1113             /* The subclass did not configure a duration, we assume that the
1114              * media has an unknown duration then and we return TRUE to report
1115              * this. Note that this is not the same as returning FALSE, which
1116              * means that we cannot report the duration at all. */
1117             res = TRUE;
1118           }
1119
1120           if (res)
1121             gst_query_set_duration (query, format, duration);
1122
1123           break;
1124         }
1125       }
1126       break;
1127     }
1128
1129     case GST_QUERY_SEEKING:
1130     {
1131       GstFormat format, seg_format;
1132       gint64 duration;
1133
1134       GST_OBJECT_LOCK (src);
1135       duration = src->segment.duration;
1136       seg_format = src->segment.format;
1137       GST_OBJECT_UNLOCK (src);
1138
1139       gst_query_parse_seeking (query, &format, NULL, NULL, NULL);
1140       if (format == seg_format) {
1141         gst_query_set_seeking (query, seg_format,
1142             gst_base_src_seekable (src), 0, duration);
1143         res = TRUE;
1144       } else {
1145         /* FIXME 2.0: return TRUE + seekable=FALSE for SEEKING query here */
1146         /* Don't reply to the query to make up for demuxers which don't
1147          * handle the SEEKING query yet. Players like Totem will fall back
1148          * to the duration when the SEEKING query isn't answered. */
1149         res = FALSE;
1150       }
1151       break;
1152     }
1153     case GST_QUERY_SEGMENT:
1154     {
1155       GstFormat format;
1156       gint64 start, stop;
1157
1158       GST_OBJECT_LOCK (src);
1159
1160       format = src->segment.format;
1161
1162       start =
1163           gst_segment_to_stream_time (&src->segment, format,
1164           src->segment.start);
1165       if ((stop = src->segment.stop) == -1)
1166         stop = src->segment.duration;
1167       else
1168         stop = gst_segment_to_stream_time (&src->segment, format, stop);
1169
1170       gst_query_set_segment (query, src->segment.rate, format, start, stop);
1171
1172       GST_OBJECT_UNLOCK (src);
1173       res = TRUE;
1174       break;
1175     }
1176
1177     case GST_QUERY_FORMATS:
1178     {
1179       gst_query_set_formats (query, 3, GST_FORMAT_DEFAULT,
1180           GST_FORMAT_BYTES, GST_FORMAT_PERCENT);
1181       res = TRUE;
1182       break;
1183     }
1184     case GST_QUERY_CONVERT:
1185     {
1186       GstFormat src_fmt, dest_fmt;
1187       gint64 src_val, dest_val;
1188
1189       gst_query_parse_convert (query, &src_fmt, &src_val, &dest_fmt, &dest_val);
1190
1191       /* we can only convert between equal formats... */
1192       if (src_fmt == dest_fmt) {
1193         dest_val = src_val;
1194         res = TRUE;
1195       } else
1196         res = FALSE;
1197
1198       gst_query_set_convert (query, src_fmt, src_val, dest_fmt, dest_val);
1199       break;
1200     }
1201     case GST_QUERY_LATENCY:
1202     {
1203       GstClockTime min, max;
1204       gboolean live;
1205
1206       /* Subclasses should override and implement something useful */
1207       res = gst_base_src_query_latency (src, &live, &min, &max);
1208
1209       GST_LOG_OBJECT (src, "report latency: live %d, min %" GST_TIME_FORMAT
1210           ", max %" GST_TIME_FORMAT, live, GST_TIME_ARGS (min),
1211           GST_TIME_ARGS (max));
1212
1213       gst_query_set_latency (query, live, min, max);
1214       break;
1215     }
1216     case GST_QUERY_JITTER:
1217     case GST_QUERY_RATE:
1218       res = FALSE;
1219       break;
1220     case GST_QUERY_BUFFERING:
1221     {
1222       GstFormat format, seg_format;
1223       gint64 start, stop, estimated;
1224
1225       gst_query_parse_buffering_range (query, &format, NULL, NULL, NULL);
1226
1227       GST_DEBUG_OBJECT (src, "buffering query in format %s",
1228           gst_format_get_name (format));
1229
1230       GST_OBJECT_LOCK (src);
1231       if (src->random_access) {
1232         estimated = 0;
1233         start = 0;
1234         if (format == GST_FORMAT_PERCENT)
1235           stop = GST_FORMAT_PERCENT_MAX;
1236         else
1237           stop = src->segment.duration;
1238       } else {
1239         estimated = -1;
1240         start = -1;
1241         stop = -1;
1242       }
1243       seg_format = src->segment.format;
1244       GST_OBJECT_UNLOCK (src);
1245
1246       /* convert to required format. When the conversion fails, we can't answer
1247        * the query. When the value is unknown, we can don't perform conversion
1248        * but report TRUE. */
1249       if (format != GST_FORMAT_PERCENT && stop != -1) {
1250         res = gst_pad_query_convert (src->srcpad, seg_format,
1251             stop, format, &stop);
1252       } else {
1253         res = TRUE;
1254       }
1255       if (res && format != GST_FORMAT_PERCENT && start != -1)
1256         res = gst_pad_query_convert (src->srcpad, seg_format,
1257             start, format, &start);
1258
1259       gst_query_set_buffering_range (query, format, start, stop, estimated);
1260       break;
1261     }
1262     case GST_QUERY_SCHEDULING:
1263     {
1264       gboolean random_access;
1265
1266       random_access = gst_base_src_is_random_access (src);
1267
1268       /* we can operate in getrange mode if the native format is bytes
1269        * and we are seekable, this condition is set in the random_access
1270        * flag and is set in the _start() method. */
1271       gst_query_set_scheduling (query, GST_SCHEDULING_FLAG_SEEKABLE, 1, -1, 0);
1272       if (random_access)
1273         gst_query_add_scheduling_mode (query, GST_PAD_MODE_PULL);
1274       gst_query_add_scheduling_mode (query, GST_PAD_MODE_PUSH);
1275
1276       res = TRUE;
1277       break;
1278     }
1279     case GST_QUERY_CAPS:
1280     {
1281       GstBaseSrcClass *bclass;
1282       GstCaps *caps, *filter;
1283
1284       bclass = GST_BASE_SRC_GET_CLASS (src);
1285       if (bclass->get_caps) {
1286         gst_query_parse_caps (query, &filter);
1287         if ((caps = bclass->get_caps (src, filter))) {
1288           gst_query_set_caps_result (query, caps);
1289           gst_caps_unref (caps);
1290           res = TRUE;
1291         } else {
1292           res = FALSE;
1293         }
1294       } else
1295         res = FALSE;
1296       break;
1297     }
1298     case GST_QUERY_URI:{
1299       if (GST_IS_URI_HANDLER (src)) {
1300         gchar *uri = gst_uri_handler_get_uri (GST_URI_HANDLER (src));
1301
1302         if (uri != NULL) {
1303           gst_query_set_uri (query, uri);
1304           g_free (uri);
1305           res = TRUE;
1306         } else {
1307           res = FALSE;
1308         }
1309       } else {
1310         res = FALSE;
1311       }
1312       break;
1313     }
1314     default:
1315       res = FALSE;
1316       break;
1317   }
1318   GST_DEBUG_OBJECT (src, "query %s returns %d", GST_QUERY_TYPE_NAME (query),
1319       res);
1320
1321   return res;
1322 }
1323
1324 static gboolean
1325 gst_base_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
1326 {
1327   GstBaseSrc *src;
1328   GstBaseSrcClass *bclass;
1329   gboolean result = FALSE;
1330
1331   src = GST_BASE_SRC (parent);
1332   bclass = GST_BASE_SRC_GET_CLASS (src);
1333
1334   if (bclass->query)
1335     result = bclass->query (src, query);
1336
1337   return result;
1338 }
1339
1340 static gboolean
1341 gst_base_src_default_do_seek (GstBaseSrc * src, GstSegment * segment)
1342 {
1343   gboolean res = TRUE;
1344
1345   /* update our offset if the start/stop position was updated */
1346   if (segment->format == GST_FORMAT_BYTES) {
1347     segment->time = segment->start;
1348   } else if (segment->start == 0) {
1349     /* seek to start, we can implement a default for this. */
1350     segment->time = 0;
1351   } else {
1352     res = FALSE;
1353     GST_INFO_OBJECT (src, "Can't do a default seek");
1354   }
1355
1356   return res;
1357 }
1358
1359 static gboolean
1360 gst_base_src_do_seek (GstBaseSrc * src, GstSegment * segment)
1361 {
1362   GstBaseSrcClass *bclass;
1363   gboolean result = FALSE;
1364
1365   bclass = GST_BASE_SRC_GET_CLASS (src);
1366
1367   GST_INFO_OBJECT (src, "seeking: %" GST_SEGMENT_FORMAT, segment);
1368
1369   if (bclass->do_seek)
1370     result = bclass->do_seek (src, segment);
1371
1372   return result;
1373 }
1374
1375 #define SEEK_TYPE_IS_RELATIVE(t) (((t) != GST_SEEK_TYPE_NONE) && ((t) != GST_SEEK_TYPE_SET))
1376
1377 static gboolean
1378 gst_base_src_default_prepare_seek_segment (GstBaseSrc * src, GstEvent * event,
1379     GstSegment * segment)
1380 {
1381   /* By default, we try one of 2 things:
1382    *   - For absolute seek positions, convert the requested position to our
1383    *     configured processing format and place it in the output segment \
1384    *   - For relative seek positions, convert our current (input) values to the
1385    *     seek format, adjust by the relative seek offset and then convert back to
1386    *     the processing format
1387    */
1388   GstSeekType start_type, stop_type;
1389   gint64 start, stop;
1390   GstSeekFlags flags;
1391   GstFormat seek_format, dest_format;
1392   gdouble rate;
1393   gboolean update;
1394   gboolean res = TRUE;
1395
1396   gst_event_parse_seek (event, &rate, &seek_format, &flags,
1397       &start_type, &start, &stop_type, &stop);
1398   dest_format = segment->format;
1399
1400   if (seek_format == dest_format) {
1401     gst_segment_do_seek (segment, rate, seek_format, flags,
1402         start_type, start, stop_type, stop, &update);
1403     return TRUE;
1404   }
1405
1406   if (start_type != GST_SEEK_TYPE_NONE) {
1407     /* FIXME: Handle seek_end by converting the input segment vals */
1408     res =
1409         gst_pad_query_convert (src->srcpad, seek_format, start, dest_format,
1410         &start);
1411     start_type = GST_SEEK_TYPE_SET;
1412   }
1413
1414   if (res && stop_type != GST_SEEK_TYPE_NONE) {
1415     /* FIXME: Handle seek_end by converting the input segment vals */
1416     res =
1417         gst_pad_query_convert (src->srcpad, seek_format, stop, dest_format,
1418         &stop);
1419     stop_type = GST_SEEK_TYPE_SET;
1420   }
1421
1422   /* And finally, configure our output segment in the desired format */
1423   gst_segment_do_seek (segment, rate, dest_format, flags, start_type, start,
1424       stop_type, stop, &update);
1425
1426   if (!res)
1427     goto no_format;
1428
1429   return res;
1430
1431 no_format:
1432   {
1433     GST_DEBUG_OBJECT (src, "undefined format given, seek aborted.");
1434     return FALSE;
1435   }
1436 }
1437
1438 static gboolean
1439 gst_base_src_prepare_seek_segment (GstBaseSrc * src, GstEvent * event,
1440     GstSegment * seeksegment)
1441 {
1442   GstBaseSrcClass *bclass;
1443   gboolean result = FALSE;
1444
1445   bclass = GST_BASE_SRC_GET_CLASS (src);
1446
1447   if (bclass->prepare_seek_segment)
1448     result = bclass->prepare_seek_segment (src, event, seeksegment);
1449
1450   return result;
1451 }
1452
1453 static GstFlowReturn
1454 gst_base_src_default_alloc (GstBaseSrc * src, guint64 offset,
1455     guint size, GstBuffer ** buffer)
1456 {
1457   GstFlowReturn ret;
1458   GstBaseSrcPrivate *priv = src->priv;
1459   GstBufferPool *pool = NULL;
1460   GstAllocator *allocator = NULL;
1461   GstAllocationParams params;
1462
1463   GST_OBJECT_LOCK (src);
1464   if (priv->pool) {
1465     pool = gst_object_ref (priv->pool);
1466   } else if (priv->allocator) {
1467     allocator = gst_object_ref (priv->allocator);
1468   }
1469   params = priv->params;
1470   GST_OBJECT_UNLOCK (src);
1471
1472   if (pool) {
1473     ret = gst_buffer_pool_acquire_buffer (pool, buffer, NULL);
1474   } else if (size != -1) {
1475     *buffer = gst_buffer_new_allocate (allocator, size, &params);
1476     if (G_UNLIKELY (*buffer == NULL))
1477       goto alloc_failed;
1478
1479     ret = GST_FLOW_OK;
1480   } else {
1481     GST_WARNING_OBJECT (src, "Not trying to alloc %u bytes. Blocksize not set?",
1482         size);
1483     goto alloc_failed;
1484   }
1485
1486 done:
1487   if (pool)
1488     gst_object_unref (pool);
1489   if (allocator)
1490     gst_object_unref (allocator);
1491
1492   return ret;
1493
1494   /* ERRORS */
1495 alloc_failed:
1496   {
1497     GST_ERROR_OBJECT (src, "Failed to allocate %u bytes", size);
1498     ret = GST_FLOW_ERROR;
1499     goto done;
1500   }
1501 }
1502
1503 static GstFlowReturn
1504 gst_base_src_default_create (GstBaseSrc * src, guint64 offset,
1505     guint size, GstBuffer ** buffer)
1506 {
1507   GstBaseSrcClass *bclass;
1508   GstFlowReturn ret;
1509   GstBuffer *res_buf;
1510
1511   bclass = GST_BASE_SRC_GET_CLASS (src);
1512
1513   if (G_UNLIKELY (!bclass->alloc))
1514     goto no_function;
1515   if (G_UNLIKELY (!bclass->fill))
1516     goto no_function;
1517
1518   if (*buffer == NULL) {
1519     /* downstream did not provide us with a buffer to fill, allocate one
1520      * ourselves */
1521     ret = bclass->alloc (src, offset, size, &res_buf);
1522     if (G_UNLIKELY (ret != GST_FLOW_OK))
1523       goto alloc_failed;
1524   } else {
1525     res_buf = *buffer;
1526   }
1527
1528   if (G_LIKELY (size > 0)) {
1529     /* only call fill when there is a size */
1530     ret = bclass->fill (src, offset, size, res_buf);
1531     if (G_UNLIKELY (ret != GST_FLOW_OK))
1532       goto not_ok;
1533   }
1534
1535   *buffer = res_buf;
1536
1537   return GST_FLOW_OK;
1538
1539   /* ERRORS */
1540 no_function:
1541   {
1542     GST_DEBUG_OBJECT (src, "no fill or alloc function");
1543     return GST_FLOW_NOT_SUPPORTED;
1544   }
1545 alloc_failed:
1546   {
1547     GST_DEBUG_OBJECT (src, "Failed to allocate buffer of %u bytes", size);
1548     return ret;
1549   }
1550 not_ok:
1551   {
1552     GST_DEBUG_OBJECT (src, "fill returned %d (%s)", ret,
1553         gst_flow_get_name (ret));
1554     if (*buffer == NULL)
1555       gst_buffer_unref (res_buf);
1556     return ret;
1557   }
1558 }
1559
1560 /* this code implements the seeking. It is a good example
1561  * handling all cases.
1562  *
1563  * A seek updates the currently configured segment.start
1564  * and segment.stop values based on the SEEK_TYPE. If the
1565  * segment.start value is updated, a seek to this new position
1566  * should be performed.
1567  *
1568  * The seek can only be executed when we are not currently
1569  * streaming any data, to make sure that this is the case, we
1570  * acquire the STREAM_LOCK which is taken when we are in the
1571  * _loop() function or when a getrange() is called. Normally
1572  * we will not receive a seek if we are operating in pull mode
1573  * though. When we operate as a live source we might block on the live
1574  * cond, which does not release the STREAM_LOCK. Therefore we will try
1575  * to grab the LIVE_LOCK instead of the STREAM_LOCK to make sure it is
1576  * safe to perform the seek.
1577  *
1578  * When we are in the loop() function, we might be in the middle
1579  * of pushing a buffer, which might block in a sink. To make sure
1580  * that the push gets unblocked we push out a FLUSH_START event.
1581  * Our loop function will get a FLUSHING return value from
1582  * the push and will pause, effectively releasing the STREAM_LOCK.
1583  *
1584  * For a non-flushing seek, we pause the task, which might eventually
1585  * release the STREAM_LOCK. We say eventually because when the sink
1586  * blocks on the sample we might wait a very long time until the sink
1587  * unblocks the sample. In any case we acquire the STREAM_LOCK and
1588  * can continue the seek. A non-flushing seek is normally done in a
1589  * running pipeline to perform seamless playback, this means that the sink is
1590  * PLAYING and will return from its chain function.
1591  * In the case of a non-flushing seek we need to make sure that the
1592  * data we output after the seek is continuous with the previous data,
1593  * this is because a non-flushing seek does not reset the running-time
1594  * to 0. We do this by closing the currently running segment, ie. sending
1595  * a new_segment event with the stop position set to the last processed
1596  * position.
1597  *
1598  * After updating the segment.start/stop values, we prepare for
1599  * streaming again. We push out a FLUSH_STOP to make the peer pad
1600  * accept data again and we start our task again.
1601  *
1602  * A segment seek posts a message on the bus saying that the playback
1603  * of the segment started. We store the segment flag internally because
1604  * when we reach the segment.stop we have to post a segment.done
1605  * instead of EOS when doing a segment seek.
1606  */
1607 static gboolean
1608 gst_base_src_perform_seek (GstBaseSrc * src, GstEvent * event, gboolean unlock)
1609 {
1610   gboolean res = TRUE, tres;
1611   gdouble rate;
1612   GstFormat seek_format, dest_format;
1613   GstSeekFlags flags;
1614   GstSeekType start_type, stop_type;
1615   gint64 start, stop;
1616   gboolean flush;
1617   gboolean update;
1618   gboolean relative_seek = FALSE;
1619   gboolean seekseg_configured = FALSE;
1620   GstSegment seeksegment;
1621   guint32 seqnum;
1622   GstEvent *tevent;
1623
1624   GST_DEBUG_OBJECT (src, "doing seek: %" GST_PTR_FORMAT, event);
1625
1626   GST_OBJECT_LOCK (src);
1627   dest_format = src->segment.format;
1628   GST_OBJECT_UNLOCK (src);
1629
1630   if (event) {
1631     gst_event_parse_seek (event, &rate, &seek_format, &flags,
1632         &start_type, &start, &stop_type, &stop);
1633
1634     relative_seek = SEEK_TYPE_IS_RELATIVE (start_type) ||
1635         SEEK_TYPE_IS_RELATIVE (stop_type);
1636
1637     if (dest_format != seek_format && !relative_seek) {
1638       /* If we have an ABSOLUTE position (SEEK_SET only), we can convert it
1639        * here before taking the stream lock, otherwise we must convert it later,
1640        * once we have the stream lock and can read the last configures segment
1641        * start and stop positions */
1642       gst_segment_init (&seeksegment, dest_format);
1643
1644       if (!gst_base_src_prepare_seek_segment (src, event, &seeksegment))
1645         goto prepare_failed;
1646
1647       seekseg_configured = TRUE;
1648     }
1649
1650     flush = flags & GST_SEEK_FLAG_FLUSH;
1651     seqnum = gst_event_get_seqnum (event);
1652   } else {
1653     flush = FALSE;
1654     /* get next seqnum */
1655     seqnum = gst_util_seqnum_next ();
1656   }
1657
1658   /* send flush start */
1659   if (flush) {
1660     tevent = gst_event_new_flush_start ();
1661     gst_event_set_seqnum (tevent, seqnum);
1662     gst_pad_push_event (src->srcpad, tevent);
1663   } else
1664     gst_pad_pause_task (src->srcpad);
1665
1666   /* unblock streaming thread. */
1667   if (unlock)
1668     gst_base_src_set_flushing (src, TRUE);
1669
1670   /* grab streaming lock, this should eventually be possible, either
1671    * because the task is paused, our streaming thread stopped
1672    * or because our peer is flushing. */
1673   GST_PAD_STREAM_LOCK (src->srcpad);
1674   if (G_UNLIKELY (src->priv->seqnum == seqnum)) {
1675     /* we have seen this event before, issue a warning for now */
1676     GST_WARNING_OBJECT (src, "duplicate event found %" G_GUINT32_FORMAT,
1677         seqnum);
1678   } else {
1679     src->priv->seqnum = seqnum;
1680     GST_DEBUG_OBJECT (src, "seek with seqnum %" G_GUINT32_FORMAT, seqnum);
1681   }
1682
1683   if (unlock)
1684     gst_base_src_set_flushing (src, FALSE);
1685
1686   /* If we configured the seeksegment above, don't overwrite it now. Otherwise
1687    * copy the current segment info into the temp segment that we can actually
1688    * attempt the seek with. We only update the real segment if the seek succeeds. */
1689   if (!seekseg_configured) {
1690     memcpy (&seeksegment, &src->segment, sizeof (GstSegment));
1691
1692     /* now configure the final seek segment */
1693     if (event) {
1694       if (seeksegment.format != seek_format) {
1695         /* OK, here's where we give the subclass a chance to convert the relative
1696          * seek into an absolute one in the processing format. We set up any
1697          * absolute seek above, before taking the stream lock. */
1698         if (!gst_base_src_prepare_seek_segment (src, event, &seeksegment)) {
1699           GST_DEBUG_OBJECT (src, "Preparing the seek failed after flushing. "
1700               "Aborting seek");
1701           res = FALSE;
1702         }
1703       } else {
1704         /* The seek format matches our processing format, no need to ask the
1705          * the subclass to configure the segment. */
1706         gst_segment_do_seek (&seeksegment, rate, seek_format, flags,
1707             start_type, start, stop_type, stop, &update);
1708       }
1709     }
1710     /* Else, no seek event passed, so we're just (re)starting the
1711        current segment. */
1712   }
1713
1714   if (res) {
1715     GST_DEBUG_OBJECT (src, "segment configured from %" G_GINT64_FORMAT
1716         " to %" G_GINT64_FORMAT ", position %" G_GINT64_FORMAT,
1717         seeksegment.start, seeksegment.stop, seeksegment.position);
1718
1719     /* do the seek, segment.position contains the new position. */
1720     res = gst_base_src_do_seek (src, &seeksegment);
1721   }
1722
1723   /* and prepare to continue streaming */
1724   if (flush) {
1725     tevent = gst_event_new_flush_stop (TRUE);
1726     gst_event_set_seqnum (tevent, seqnum);
1727     /* send flush stop, peer will accept data and events again. We
1728      * are not yet providing data as we still have the STREAM_LOCK. */
1729     gst_pad_push_event (src->srcpad, tevent);
1730   }
1731
1732   /* The subclass must have converted the segment to the processing format
1733    * by now */
1734   if (res && seeksegment.format != dest_format) {
1735     GST_DEBUG_OBJECT (src, "Subclass failed to prepare a seek segment "
1736         "in the correct format. Aborting seek.");
1737     res = FALSE;
1738   }
1739
1740   /* if the seek was successful, we update our real segment and push
1741    * out the new segment. */
1742   if (res) {
1743     GST_OBJECT_LOCK (src);
1744     memcpy (&src->segment, &seeksegment, sizeof (GstSegment));
1745     GST_OBJECT_UNLOCK (src);
1746
1747     if (seeksegment.flags & GST_SEGMENT_FLAG_SEGMENT) {
1748       GstMessage *message;
1749
1750       message = gst_message_new_segment_start (GST_OBJECT (src),
1751           seeksegment.format, seeksegment.position);
1752       gst_message_set_seqnum (message, seqnum);
1753
1754       gst_element_post_message (GST_ELEMENT (src), message);
1755     }
1756
1757     src->priv->segment_pending = TRUE;
1758     src->priv->segment_seqnum = seqnum;
1759   }
1760
1761   src->priv->discont = TRUE;
1762   src->running = TRUE;
1763   /* and restart the task in case it got paused explicitly or by
1764    * the FLUSH_START event we pushed out. */
1765   tres = gst_pad_start_task (src->srcpad, (GstTaskFunction) gst_base_src_loop,
1766       src->srcpad, NULL);
1767   if (res && !tres)
1768     res = FALSE;
1769
1770   /* and release the lock again so we can continue streaming */
1771   GST_PAD_STREAM_UNLOCK (src->srcpad);
1772
1773   return res;
1774
1775   /* ERROR */
1776 prepare_failed:
1777   GST_DEBUG_OBJECT (src, "Preparing the seek failed before flushing. "
1778       "Aborting seek");
1779   return FALSE;
1780 }
1781
1782 /* all events send to this element directly. This is mainly done from the
1783  * application.
1784  */
1785 static gboolean
1786 gst_base_src_send_event (GstElement * element, GstEvent * event)
1787 {
1788   GstBaseSrc *src;
1789   gboolean result = FALSE;
1790   GstBaseSrcClass *bclass;
1791
1792   src = GST_BASE_SRC (element);
1793   bclass = GST_BASE_SRC_GET_CLASS (src);
1794
1795   GST_DEBUG_OBJECT (src, "handling event %p %" GST_PTR_FORMAT, event, event);
1796
1797   switch (GST_EVENT_TYPE (event)) {
1798       /* bidirectional events */
1799     case GST_EVENT_FLUSH_START:
1800       GST_DEBUG_OBJECT (src, "pushing flush-start event downstream");
1801
1802       result = gst_pad_push_event (src->srcpad, event);
1803       gst_base_src_set_flushing (src, TRUE);
1804       event = NULL;
1805       break;
1806     case GST_EVENT_FLUSH_STOP:
1807     {
1808       gboolean start;
1809
1810       GST_PAD_STREAM_LOCK (src->srcpad);
1811       gst_base_src_set_flushing (src, FALSE);
1812
1813       GST_DEBUG_OBJECT (src, "pushing flush-stop event downstream");
1814       result = gst_pad_push_event (src->srcpad, event);
1815
1816       /* For external flush, restart the task .. */
1817       GST_LIVE_LOCK (src);
1818       src->priv->segment_pending = TRUE;
1819
1820       GST_OBJECT_LOCK (src->srcpad);
1821       start = (GST_PAD_MODE (src->srcpad) == GST_PAD_MODE_PUSH);
1822       GST_OBJECT_UNLOCK (src->srcpad);
1823
1824       /* ... and for live sources, only if in playing state */
1825       if (src->is_live) {
1826         if (!src->live_running)
1827           start = FALSE;
1828       }
1829
1830       if (start)
1831         gst_pad_start_task (src->srcpad, (GstTaskFunction) gst_base_src_loop,
1832             src->srcpad, NULL);
1833
1834       GST_LIVE_UNLOCK (src);
1835       GST_PAD_STREAM_UNLOCK (src->srcpad);
1836
1837       event = NULL;
1838       break;
1839     }
1840
1841       /* downstream serialized events */
1842     case GST_EVENT_EOS:
1843     {
1844       gboolean push_mode;
1845
1846       /* queue EOS and make sure the task or pull function performs the EOS
1847        * actions.
1848        *
1849        * For push mode, This will be done in 3 steps. It is required to not
1850        * block here as gst_element_send_event() will hold the STATE_LOCK, hence
1851        * blocking would prevent asynchronous state change to complete.
1852        *
1853        * 1. We stop the streaming thread
1854        * 2. We set the pending eos
1855        * 3. We start the streaming thread again, so it is performed
1856        *    asynchronously.
1857        *
1858        * For pull mode, we simply mark the pending EOS without flushing.
1859        */
1860
1861       GST_OBJECT_LOCK (src->srcpad);
1862       push_mode = GST_PAD_MODE (src->srcpad) == GST_PAD_MODE_PUSH;
1863       GST_OBJECT_UNLOCK (src->srcpad);
1864
1865       if (push_mode) {
1866         gst_base_src_set_flushing (src, TRUE);
1867
1868         GST_PAD_STREAM_LOCK (src->srcpad);
1869         gst_base_src_set_flushing (src, FALSE);
1870
1871         GST_OBJECT_LOCK (src);
1872         g_atomic_int_set (&src->priv->has_pending_eos, TRUE);
1873         if (src->priv->pending_eos)
1874           gst_event_unref (src->priv->pending_eos);
1875         src->priv->pending_eos = event;
1876         GST_OBJECT_UNLOCK (src);
1877
1878         GST_DEBUG_OBJECT (src,
1879             "EOS marked, start task for asynchronous handling");
1880         gst_pad_start_task (src->srcpad, (GstTaskFunction) gst_base_src_loop,
1881             src->srcpad, NULL);
1882
1883         GST_PAD_STREAM_UNLOCK (src->srcpad);
1884       } else {
1885         /* In pull mode, we need not to return flushing to downstream, though
1886          * the stream lock is not kept after getrange was unblocked */
1887         GST_OBJECT_LOCK (src);
1888         g_atomic_int_set (&src->priv->has_pending_eos, TRUE);
1889         if (src->priv->pending_eos)
1890           gst_event_unref (src->priv->pending_eos);
1891         src->priv->pending_eos = event;
1892         GST_OBJECT_UNLOCK (src);
1893
1894         gst_base_src_set_pool_flushing (src, TRUE);
1895         if (bclass->unlock)
1896           bclass->unlock (src);
1897
1898         GST_PAD_STREAM_LOCK (src->srcpad);
1899         if (bclass->unlock_stop)
1900           bclass->unlock_stop (src);
1901         gst_base_src_set_pool_flushing (src, TRUE);
1902         GST_PAD_STREAM_UNLOCK (src->srcpad);
1903       }
1904
1905
1906       event = NULL;
1907       result = TRUE;
1908       break;
1909     }
1910     case GST_EVENT_SEGMENT:
1911       /* sending random SEGMENT downstream can break sync. */
1912       break;
1913     case GST_EVENT_TAG:
1914     case GST_EVENT_SINK_MESSAGE:
1915     case GST_EVENT_CUSTOM_DOWNSTREAM:
1916     case GST_EVENT_CUSTOM_BOTH:
1917     case GST_EVENT_PROTECTION:
1918       /* Insert TAG, CUSTOM_DOWNSTREAM, CUSTOM_BOTH, PROTECTION in the dataflow */
1919       GST_OBJECT_LOCK (src);
1920       src->priv->pending_events =
1921           g_list_append (src->priv->pending_events, event);
1922       g_atomic_int_set (&src->priv->have_events, TRUE);
1923       GST_OBJECT_UNLOCK (src);
1924       event = NULL;
1925       result = TRUE;
1926       break;
1927     case GST_EVENT_BUFFERSIZE:
1928       /* does not seem to make much sense currently */
1929       break;
1930
1931       /* upstream events */
1932     case GST_EVENT_QOS:
1933       /* elements should override send_event and do something */
1934       break;
1935     case GST_EVENT_SEEK:
1936     {
1937       gboolean started;
1938
1939       GST_OBJECT_LOCK (src->srcpad);
1940       if (GST_PAD_MODE (src->srcpad) == GST_PAD_MODE_PULL)
1941         goto wrong_mode;
1942       started = GST_PAD_MODE (src->srcpad) == GST_PAD_MODE_PUSH;
1943       GST_OBJECT_UNLOCK (src->srcpad);
1944
1945       if (started) {
1946         GST_DEBUG_OBJECT (src, "performing seek");
1947         /* when we are running in push mode, we can execute the
1948          * seek right now. */
1949         result = gst_base_src_perform_seek (src, event, TRUE);
1950       } else {
1951         GstEvent **event_p;
1952
1953         /* else we store the event and execute the seek when we
1954          * get activated */
1955         GST_OBJECT_LOCK (src);
1956         GST_DEBUG_OBJECT (src, "queueing seek");
1957         event_p = &src->pending_seek;
1958         gst_event_replace ((GstEvent **) event_p, event);
1959         GST_OBJECT_UNLOCK (src);
1960         /* assume the seek will work */
1961         result = TRUE;
1962       }
1963       break;
1964     }
1965     case GST_EVENT_NAVIGATION:
1966       /* could make sense for elements that do something with navigation events
1967        * but then they would need to override the send_event function */
1968       break;
1969     case GST_EVENT_LATENCY:
1970       /* does not seem to make sense currently */
1971       break;
1972
1973       /* custom events */
1974     case GST_EVENT_CUSTOM_UPSTREAM:
1975       /* override send_event if you want this */
1976       break;
1977     case GST_EVENT_CUSTOM_DOWNSTREAM_OOB:
1978     case GST_EVENT_CUSTOM_BOTH_OOB:
1979       /* insert a random custom event into the pipeline */
1980       GST_DEBUG_OBJECT (src, "pushing custom OOB event downstream");
1981       result = gst_pad_push_event (src->srcpad, event);
1982       /* we gave away the ref to the event in the push */
1983       event = NULL;
1984       break;
1985     default:
1986       break;
1987   }
1988 done:
1989   /* if we still have a ref to the event, unref it now */
1990   if (event)
1991     gst_event_unref (event);
1992
1993   return result;
1994
1995   /* ERRORS */
1996 wrong_mode:
1997   {
1998     GST_DEBUG_OBJECT (src, "cannot perform seek when operating in pull mode");
1999     GST_OBJECT_UNLOCK (src->srcpad);
2000     result = FALSE;
2001     goto done;
2002   }
2003 }
2004
2005 static gboolean
2006 gst_base_src_seekable (GstBaseSrc * src)
2007 {
2008   GstBaseSrcClass *bclass;
2009   bclass = GST_BASE_SRC_GET_CLASS (src);
2010   if (bclass->is_seekable)
2011     return bclass->is_seekable (src);
2012   else
2013     return FALSE;
2014 }
2015
2016 static void
2017 gst_base_src_update_qos (GstBaseSrc * src,
2018     gdouble proportion, GstClockTimeDiff diff, GstClockTime timestamp)
2019 {
2020   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, src,
2021       "qos: proportion: %lf, diff %" G_GINT64_FORMAT ", timestamp %"
2022       GST_TIME_FORMAT, proportion, diff, GST_TIME_ARGS (timestamp));
2023
2024   GST_OBJECT_LOCK (src);
2025   src->priv->proportion = proportion;
2026   src->priv->earliest_time = timestamp + diff;
2027   GST_OBJECT_UNLOCK (src);
2028 }
2029
2030
2031 static gboolean
2032 gst_base_src_default_event (GstBaseSrc * src, GstEvent * event)
2033 {
2034   gboolean result;
2035
2036   GST_DEBUG_OBJECT (src, "handle event %" GST_PTR_FORMAT, event);
2037
2038   switch (GST_EVENT_TYPE (event)) {
2039     case GST_EVENT_SEEK:
2040       /* is normally called when in push mode */
2041       if (!gst_base_src_seekable (src))
2042         goto not_seekable;
2043
2044       result = gst_base_src_perform_seek (src, event, TRUE);
2045       break;
2046     case GST_EVENT_FLUSH_START:
2047       /* cancel any blocking getrange, is normally called
2048        * when in pull mode. */
2049       result = gst_base_src_set_flushing (src, TRUE);
2050       break;
2051     case GST_EVENT_FLUSH_STOP:
2052       result = gst_base_src_set_flushing (src, FALSE);
2053       break;
2054     case GST_EVENT_QOS:
2055     {
2056       gdouble proportion;
2057       GstClockTimeDiff diff;
2058       GstClockTime timestamp;
2059
2060       gst_event_parse_qos (event, NULL, &proportion, &diff, &timestamp);
2061       gst_base_src_update_qos (src, proportion, diff, timestamp);
2062       result = TRUE;
2063       break;
2064     }
2065     case GST_EVENT_RECONFIGURE:
2066       result = TRUE;
2067       break;
2068     case GST_EVENT_LATENCY:
2069       result = TRUE;
2070       break;
2071     default:
2072       result = FALSE;
2073       break;
2074   }
2075   return result;
2076
2077   /* ERRORS */
2078 not_seekable:
2079   {
2080     GST_DEBUG_OBJECT (src, "is not seekable");
2081     return FALSE;
2082   }
2083 }
2084
2085 static gboolean
2086 gst_base_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
2087 {
2088   GstBaseSrc *src;
2089   GstBaseSrcClass *bclass;
2090   gboolean result = FALSE;
2091
2092   src = GST_BASE_SRC (parent);
2093   bclass = GST_BASE_SRC_GET_CLASS (src);
2094
2095   if (bclass->event) {
2096     if (!(result = bclass->event (src, event)))
2097       goto subclass_failed;
2098   }
2099
2100 done:
2101   gst_event_unref (event);
2102
2103   return result;
2104
2105   /* ERRORS */
2106 subclass_failed:
2107   {
2108     GST_DEBUG_OBJECT (src, "subclass refused event");
2109     goto done;
2110   }
2111 }
2112
2113 static void
2114 gst_base_src_set_property (GObject * object, guint prop_id,
2115     const GValue * value, GParamSpec * pspec)
2116 {
2117   GstBaseSrc *src;
2118
2119   src = GST_BASE_SRC (object);
2120
2121   switch (prop_id) {
2122     case PROP_BLOCKSIZE:
2123       gst_base_src_set_blocksize (src, g_value_get_uint (value));
2124       break;
2125     case PROP_NUM_BUFFERS:
2126       src->num_buffers = g_value_get_int (value);
2127       break;
2128 #ifndef GST_REMOVE_DEPRECATED
2129     case PROP_TYPEFIND:
2130       src->typefind = g_value_get_boolean (value);
2131       break;
2132 #endif
2133     case PROP_DO_TIMESTAMP:
2134       gst_base_src_set_do_timestamp (src, g_value_get_boolean (value));
2135       break;
2136     default:
2137       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2138       break;
2139   }
2140 }
2141
2142 static void
2143 gst_base_src_get_property (GObject * object, guint prop_id, GValue * value,
2144     GParamSpec * pspec)
2145 {
2146   GstBaseSrc *src;
2147
2148   src = GST_BASE_SRC (object);
2149
2150   switch (prop_id) {
2151     case PROP_BLOCKSIZE:
2152       g_value_set_uint (value, gst_base_src_get_blocksize (src));
2153       break;
2154     case PROP_NUM_BUFFERS:
2155       g_value_set_int (value, src->num_buffers);
2156       break;
2157 #ifndef GST_REMOVE_DEPRECATED
2158     case PROP_TYPEFIND:
2159       g_value_set_boolean (value, src->typefind);
2160       break;
2161 #endif
2162     case PROP_DO_TIMESTAMP:
2163       g_value_set_boolean (value, gst_base_src_get_do_timestamp (src));
2164       break;
2165     default:
2166       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2167       break;
2168   }
2169 }
2170
2171 /* with STREAM_LOCK and LOCK */
2172 static GstClockReturn
2173 gst_base_src_wait (GstBaseSrc * basesrc, GstClock * clock, GstClockTime time)
2174 {
2175   GstClockReturn ret;
2176   GstClockID id;
2177
2178   id = gst_clock_new_single_shot_id (clock, time);
2179
2180   basesrc->clock_id = id;
2181   /* release the live lock while waiting */
2182   GST_LIVE_UNLOCK (basesrc);
2183
2184   ret = gst_clock_id_wait (id, NULL);
2185
2186   GST_LIVE_LOCK (basesrc);
2187   gst_clock_id_unref (id);
2188   basesrc->clock_id = NULL;
2189
2190   return ret;
2191 }
2192
2193 /* perform synchronisation on a buffer.
2194  * with STREAM_LOCK.
2195  */
2196 static GstClockReturn
2197 gst_base_src_do_sync (GstBaseSrc * basesrc, GstBuffer * buffer)
2198 {
2199   GstClockReturn result;
2200   GstClockTime start, end;
2201   GstBaseSrcClass *bclass;
2202   GstClockTime base_time;
2203   GstClock *clock;
2204   GstClockTime now = GST_CLOCK_TIME_NONE, pts, dts, timestamp;
2205   gboolean do_timestamp, first, pseudo_live, is_live;
2206
2207   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2208
2209   start = end = -1;
2210   if (bclass->get_times)
2211     bclass->get_times (basesrc, buffer, &start, &end);
2212
2213   /* get buffer timestamp */
2214   dts = GST_BUFFER_DTS (buffer);
2215   pts = GST_BUFFER_PTS (buffer);
2216
2217   if (GST_CLOCK_TIME_IS_VALID (dts))
2218     timestamp = dts;
2219   else
2220     timestamp = pts;
2221
2222   /* grab the lock to prepare for clocking and calculate the startup
2223    * latency. */
2224   GST_OBJECT_LOCK (basesrc);
2225
2226   is_live = basesrc->is_live;
2227   /* if we are asked to sync against the clock we are a pseudo live element */
2228   pseudo_live = (start != -1 && is_live);
2229   /* check for the first buffer */
2230   first = (basesrc->priv->latency == -1);
2231
2232   if (timestamp != -1 && pseudo_live) {
2233     GstClockTime latency;
2234
2235     /* we have a timestamp and a sync time, latency is the diff */
2236     if (timestamp <= start)
2237       latency = start - timestamp;
2238     else
2239       latency = 0;
2240
2241     if (first) {
2242       GST_DEBUG_OBJECT (basesrc, "pseudo_live with latency %" GST_TIME_FORMAT,
2243           GST_TIME_ARGS (latency));
2244       /* first time we calculate latency, just configure */
2245       basesrc->priv->latency = latency;
2246     } else {
2247       if (basesrc->priv->latency != latency) {
2248         /* we have a new latency, FIXME post latency message */
2249         basesrc->priv->latency = latency;
2250         GST_DEBUG_OBJECT (basesrc, "latency changed to %" GST_TIME_FORMAT,
2251             GST_TIME_ARGS (latency));
2252       }
2253     }
2254   } else if (first) {
2255     GST_DEBUG_OBJECT (basesrc, "no latency needed, live %d, sync %d",
2256         is_live, start != -1);
2257     basesrc->priv->latency = 0;
2258   }
2259
2260   /* get clock, if no clock, we can't sync or do timestamps */
2261   if ((clock = GST_ELEMENT_CLOCK (basesrc)) == NULL)
2262     goto no_clock;
2263   else
2264     gst_object_ref (clock);
2265
2266   base_time = GST_ELEMENT_CAST (basesrc)->base_time;
2267
2268   do_timestamp = basesrc->priv->do_timestamp;
2269   GST_OBJECT_UNLOCK (basesrc);
2270
2271   /* first buffer, calculate the timestamp offset */
2272   if (first) {
2273     GstClockTime running_time;
2274
2275     now = gst_clock_get_time (clock);
2276     running_time = now - base_time;
2277
2278     GST_LOG_OBJECT (basesrc,
2279         "startup PTS: %" GST_TIME_FORMAT ", DTS %" GST_TIME_FORMAT
2280         ", running_time %" GST_TIME_FORMAT, GST_TIME_ARGS (pts),
2281         GST_TIME_ARGS (dts), GST_TIME_ARGS (running_time));
2282
2283     if (pseudo_live && timestamp != -1) {
2284       /* live source and we need to sync, add startup latency to all timestamps
2285        * to get the real running_time. Live sources should always timestamp
2286        * according to the current running time. */
2287       basesrc->priv->ts_offset = GST_CLOCK_DIFF (timestamp, running_time);
2288
2289       GST_LOG_OBJECT (basesrc, "live with sync, ts_offset %" GST_TIME_FORMAT,
2290           GST_TIME_ARGS (basesrc->priv->ts_offset));
2291     } else {
2292       basesrc->priv->ts_offset = 0;
2293       GST_LOG_OBJECT (basesrc, "no timestamp offset needed");
2294     }
2295
2296     if (!GST_CLOCK_TIME_IS_VALID (dts)) {
2297       if (do_timestamp) {
2298         dts = running_time;
2299       } else if (!GST_CLOCK_TIME_IS_VALID (pts)) {
2300         if (GST_CLOCK_TIME_IS_VALID (basesrc->segment.start)) {
2301           dts = basesrc->segment.start;
2302         } else {
2303           dts = 0;
2304         }
2305       }
2306       GST_BUFFER_DTS (buffer) = dts;
2307
2308       GST_LOG_OBJECT (basesrc, "created DTS %" GST_TIME_FORMAT,
2309           GST_TIME_ARGS (dts));
2310     }
2311   } else {
2312     /* not the first buffer, the timestamp is the diff between the clock and
2313      * base_time */
2314     if (do_timestamp && !GST_CLOCK_TIME_IS_VALID (dts)) {
2315       now = gst_clock_get_time (clock);
2316
2317       dts = now - base_time;
2318       GST_BUFFER_DTS (buffer) = dts;
2319
2320       GST_LOG_OBJECT (basesrc, "created DTS %" GST_TIME_FORMAT,
2321           GST_TIME_ARGS (dts));
2322     }
2323   }
2324   if (!GST_CLOCK_TIME_IS_VALID (pts)) {
2325     if (!GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DELTA_UNIT))
2326       pts = dts;
2327
2328     GST_BUFFER_PTS (buffer) = dts;
2329
2330     GST_LOG_OBJECT (basesrc, "created PTS %" GST_TIME_FORMAT,
2331         GST_TIME_ARGS (pts));
2332   }
2333
2334   /* if we don't have a buffer timestamp, we don't sync */
2335   if (!GST_CLOCK_TIME_IS_VALID (start))
2336     goto no_sync;
2337
2338   if (is_live) {
2339     /* for pseudo live sources, add our ts_offset to the timestamp */
2340     if (GST_CLOCK_TIME_IS_VALID (pts))
2341       GST_BUFFER_PTS (buffer) += basesrc->priv->ts_offset;
2342     if (GST_CLOCK_TIME_IS_VALID (dts))
2343       GST_BUFFER_DTS (buffer) += basesrc->priv->ts_offset;
2344     start += basesrc->priv->ts_offset;
2345   }
2346
2347   GST_LOG_OBJECT (basesrc,
2348       "waiting for clock, base time %" GST_TIME_FORMAT
2349       ", stream_start %" GST_TIME_FORMAT,
2350       GST_TIME_ARGS (base_time), GST_TIME_ARGS (start));
2351
2352   result = gst_base_src_wait (basesrc, clock, start + base_time);
2353
2354   gst_object_unref (clock);
2355
2356   GST_LOG_OBJECT (basesrc, "clock entry done: %d", result);
2357
2358   return result;
2359
2360   /* special cases */
2361 no_clock:
2362   {
2363     GST_DEBUG_OBJECT (basesrc, "we have no clock");
2364     GST_OBJECT_UNLOCK (basesrc);
2365     return GST_CLOCK_OK;
2366   }
2367 no_sync:
2368   {
2369     GST_DEBUG_OBJECT (basesrc, "no sync needed");
2370     gst_object_unref (clock);
2371     return GST_CLOCK_OK;
2372   }
2373 }
2374
2375 /* Called with STREAM_LOCK and LIVE_LOCK */
2376 static gboolean
2377 gst_base_src_update_length (GstBaseSrc * src, guint64 offset, guint * length,
2378     gboolean force)
2379 {
2380   guint64 size, maxsize;
2381   GstBaseSrcClass *bclass;
2382   gint64 stop;
2383
2384   /* only operate if we are working with bytes */
2385   if (src->segment.format != GST_FORMAT_BYTES)
2386     return TRUE;
2387
2388   bclass = GST_BASE_SRC_GET_CLASS (src);
2389
2390   stop = src->segment.stop;
2391   /* get total file size */
2392   size = src->segment.duration;
2393
2394   /* when not doing automatic EOS, just use the stop position. We don't use
2395    * the size to check for EOS */
2396   if (!g_atomic_int_get (&src->priv->automatic_eos))
2397     maxsize = stop;
2398   /* Otherwise, the max amount of bytes to read is the total
2399    * size or up to the segment.stop if present. */
2400   else if (stop != -1)
2401     maxsize = size != -1 ? MIN (size, stop) : stop;
2402   else
2403     maxsize = size;
2404
2405   GST_DEBUG_OBJECT (src,
2406       "reading offset %" G_GUINT64_FORMAT ", length %u, size %" G_GINT64_FORMAT
2407       ", segment.stop %" G_GINT64_FORMAT ", maxsize %" G_GINT64_FORMAT, offset,
2408       *length, size, stop, maxsize);
2409
2410   /* check size if we have one */
2411   if (maxsize != -1) {
2412     /* if we run past the end, check if the file became bigger and
2413      * retry.  Mind wrap when checking. */
2414     if (G_UNLIKELY (offset >= maxsize || offset + *length >= maxsize || force)) {
2415       /* see if length of the file changed */
2416       if (bclass->get_size)
2417         if (!bclass->get_size (src, &size))
2418           size = -1;
2419
2420       /* when not doing automatic EOS, just use the stop position. We don't use
2421        * the size to check for EOS */
2422       if (!g_atomic_int_get (&src->priv->automatic_eos))
2423         maxsize = stop;
2424       /* Otherwise, the max amount of bytes to read is the total
2425        * size or up to the segment.stop if present. */
2426       else if (stop != -1)
2427         maxsize = size != -1 ? MIN (size, stop) : stop;
2428       else
2429         maxsize = size;
2430
2431       if (maxsize != -1) {
2432         /* if we are at or past the end, EOS */
2433         if (G_UNLIKELY (offset >= maxsize))
2434           goto unexpected_length;
2435
2436         /* else we can clip to the end */
2437         if (G_UNLIKELY (offset + *length >= maxsize))
2438           *length = maxsize - offset;
2439       }
2440     }
2441   }
2442
2443   /* keep track of current duration. segment is in bytes, we checked
2444    * that above. */
2445   GST_OBJECT_LOCK (src);
2446   src->segment.duration = size;
2447   GST_OBJECT_UNLOCK (src);
2448
2449   return TRUE;
2450
2451   /* ERRORS */
2452 unexpected_length:
2453   {
2454     GST_WARNING_OBJECT (src, "processing at or past EOS");
2455     return FALSE;
2456   }
2457 }
2458
2459 /* must be called with LIVE_LOCK */
2460 static GstFlowReturn
2461 gst_base_src_get_range (GstBaseSrc * src, guint64 offset, guint length,
2462     GstBuffer ** buf)
2463 {
2464   GstFlowReturn ret;
2465   GstBaseSrcClass *bclass;
2466   GstClockReturn status;
2467   GstBuffer *res_buf;
2468   GstBuffer *in_buf;
2469   gboolean own_res_buf;
2470
2471   bclass = GST_BASE_SRC_GET_CLASS (src);
2472
2473 again:
2474   if (src->is_live) {
2475     if (G_UNLIKELY (!src->live_running)) {
2476       ret = gst_base_src_wait_playing_unlocked (src);
2477       if (ret != GST_FLOW_OK)
2478         goto stopped;
2479     }
2480   }
2481
2482   if (G_UNLIKELY (!GST_BASE_SRC_IS_STARTED (src)
2483           && !GST_BASE_SRC_IS_STARTING (src)))
2484     goto not_started;
2485
2486   if (G_UNLIKELY (!bclass->create))
2487     goto no_function;
2488
2489   if (G_UNLIKELY (!gst_base_src_update_length (src, offset, &length, FALSE)))
2490     goto unexpected_length;
2491
2492   /* track position */
2493   GST_OBJECT_LOCK (src);
2494   if (src->segment.format == GST_FORMAT_BYTES)
2495     src->segment.position = offset;
2496   GST_OBJECT_UNLOCK (src);
2497
2498   /* normally we don't count buffers */
2499   if (G_UNLIKELY (src->num_buffers_left >= 0)) {
2500     if (src->num_buffers_left == 0)
2501       goto reached_num_buffers;
2502     else
2503       src->num_buffers_left--;
2504   }
2505
2506   /* don't enter the create function if a pending EOS event was set. For the
2507    * logic of the has_pending_eos, check the event function of this class. */
2508   if (G_UNLIKELY (g_atomic_int_get (&src->priv->has_pending_eos))) {
2509     src->priv->forced_eos = TRUE;
2510     goto eos;
2511   }
2512
2513   GST_DEBUG_OBJECT (src,
2514       "calling create offset %" G_GUINT64_FORMAT " length %u, time %"
2515       G_GINT64_FORMAT, offset, length, src->segment.time);
2516
2517   res_buf = in_buf = *buf;
2518   own_res_buf = (*buf == NULL);
2519
2520   GST_LIVE_UNLOCK (src);
2521   ret = bclass->create (src, offset, length, &res_buf);
2522   GST_LIVE_LOCK (src);
2523
2524   /* As we released the LIVE_LOCK, the state may have changed */
2525   if (src->is_live) {
2526     if (G_UNLIKELY (!src->live_running)) {
2527       GstFlowReturn wait_ret;
2528       wait_ret = gst_base_src_wait_playing_unlocked (src);
2529       if (wait_ret != GST_FLOW_OK) {
2530         if (ret == GST_FLOW_OK && own_res_buf)
2531           gst_buffer_unref (res_buf);
2532         ret = wait_ret;
2533         goto stopped;
2534       }
2535     }
2536   }
2537
2538   /* The create function could be unlocked because we have a pending EOS. It's
2539    * possible that we have a valid buffer from create that we need to
2540    * discard when the create function returned _OK. */
2541   if (G_UNLIKELY (g_atomic_int_get (&src->priv->has_pending_eos))) {
2542     if (ret == GST_FLOW_OK) {
2543       if (own_res_buf)
2544         gst_buffer_unref (res_buf);
2545     }
2546     src->priv->forced_eos = TRUE;
2547     goto eos;
2548   }
2549
2550   if (G_UNLIKELY (ret != GST_FLOW_OK))
2551     goto not_ok;
2552
2553   /* fallback in case the create function didn't fill a provided buffer */
2554   if (in_buf != NULL && res_buf != in_buf) {
2555     GstMapInfo info;
2556     gsize copied_size;
2557
2558     GST_CAT_DEBUG_OBJECT (GST_CAT_PERFORMANCE, src, "create function didn't "
2559         "fill the provided buffer, copying");
2560
2561     if (!gst_buffer_map (in_buf, &info, GST_MAP_WRITE))
2562       goto map_failed;
2563
2564     copied_size = gst_buffer_extract (res_buf, 0, info.data, info.size);
2565     gst_buffer_unmap (in_buf, &info);
2566     gst_buffer_set_size (in_buf, copied_size);
2567
2568     gst_buffer_copy_into (in_buf, res_buf, GST_BUFFER_COPY_METADATA, 0, -1);
2569
2570     gst_buffer_unref (res_buf);
2571     res_buf = in_buf;
2572   }
2573
2574   if (res_buf == NULL) {
2575     GstBufferList *pending_list = src->priv->pending_bufferlist;
2576
2577     if (pending_list == NULL || gst_buffer_list_length (pending_list) == 0)
2578       goto null_buffer;
2579
2580     res_buf = gst_buffer_list_get_writable (pending_list, 0);
2581     own_res_buf = FALSE;
2582   }
2583
2584   /* no timestamp set and we are at offset 0, we can timestamp with 0 */
2585   if (offset == 0 && src->segment.time == 0
2586       && GST_BUFFER_DTS (res_buf) == -1 && !src->is_live) {
2587     GST_DEBUG_OBJECT (src, "setting first timestamp to 0");
2588     res_buf = gst_buffer_make_writable (res_buf);
2589     GST_BUFFER_DTS (res_buf) = 0;
2590   }
2591
2592   /* now sync before pushing the buffer */
2593   status = gst_base_src_do_sync (src, res_buf);
2594
2595   /* waiting for the clock could have made us flushing */
2596   if (G_UNLIKELY (src->priv->flushing))
2597     goto flushing;
2598
2599   switch (status) {
2600     case GST_CLOCK_EARLY:
2601       /* the buffer is too late. We currently don't drop the buffer. */
2602       GST_DEBUG_OBJECT (src, "buffer too late!, returning anyway");
2603       break;
2604     case GST_CLOCK_OK:
2605       /* buffer synchronised properly */
2606       GST_DEBUG_OBJECT (src, "buffer ok");
2607       break;
2608     case GST_CLOCK_UNSCHEDULED:
2609       /* this case is triggered when we were waiting for the clock and
2610        * it got unlocked because we did a state change. In any case, get rid of
2611        * the buffer. */
2612       if (own_res_buf)
2613         gst_buffer_unref (res_buf);
2614
2615       if (!src->live_running) {
2616         /* We return FLUSHING when we are not running to stop the dataflow also
2617          * get rid of the produced buffer. */
2618         GST_DEBUG_OBJECT (src,
2619             "clock was unscheduled (%d), returning FLUSHING", status);
2620         ret = GST_FLOW_FLUSHING;
2621       } else {
2622         /* If we are running when this happens, we quickly switched between
2623          * pause and playing. We try to produce a new buffer */
2624         GST_DEBUG_OBJECT (src,
2625             "clock was unscheduled (%d), but we are running", status);
2626         goto again;
2627       }
2628       break;
2629     default:
2630       /* all other result values are unexpected and errors */
2631       GST_ELEMENT_ERROR (src, CORE, CLOCK,
2632           (_("Internal clock error.")),
2633           ("clock returned unexpected return value %d", status));
2634       if (own_res_buf)
2635         gst_buffer_unref (res_buf);
2636       ret = GST_FLOW_ERROR;
2637       break;
2638   }
2639   if (G_LIKELY (ret == GST_FLOW_OK))
2640     *buf = res_buf;
2641
2642   return ret;
2643
2644   /* ERROR */
2645 stopped:
2646   {
2647     GST_DEBUG_OBJECT (src, "wait_playing returned %d (%s)", ret,
2648         gst_flow_get_name (ret));
2649     return ret;
2650   }
2651 not_ok:
2652   {
2653     GST_DEBUG_OBJECT (src, "create returned %d (%s)", ret,
2654         gst_flow_get_name (ret));
2655     return ret;
2656   }
2657 map_failed:
2658   {
2659     GST_ELEMENT_ERROR (src, RESOURCE, BUSY,
2660         (_("Failed to map buffer.")),
2661         ("failed to map result buffer in WRITE mode"));
2662     if (own_res_buf)
2663       gst_buffer_unref (res_buf);
2664     return GST_FLOW_ERROR;
2665   }
2666 not_started:
2667   {
2668     GST_DEBUG_OBJECT (src, "getrange but not started");
2669     return GST_FLOW_FLUSHING;
2670   }
2671 no_function:
2672   {
2673     GST_DEBUG_OBJECT (src, "no create function");
2674     return GST_FLOW_NOT_SUPPORTED;
2675   }
2676 unexpected_length:
2677   {
2678     GST_DEBUG_OBJECT (src, "unexpected length %u (offset=%" G_GUINT64_FORMAT
2679         ", size=%" G_GINT64_FORMAT ")", length, offset, src->segment.duration);
2680     return GST_FLOW_EOS;
2681   }
2682 reached_num_buffers:
2683   {
2684     GST_DEBUG_OBJECT (src, "sent all buffers");
2685     return GST_FLOW_EOS;
2686   }
2687 flushing:
2688   {
2689     GST_DEBUG_OBJECT (src, "we are flushing");
2690     if (own_res_buf)
2691       gst_buffer_unref (res_buf);
2692     return GST_FLOW_FLUSHING;
2693   }
2694 eos:
2695   {
2696     GST_DEBUG_OBJECT (src, "we are EOS");
2697     return GST_FLOW_EOS;
2698   }
2699 null_buffer:
2700   {
2701     GST_ELEMENT_ERROR (src, STREAM, FAILED,
2702         (_("Internal data flow error.")),
2703         ("Subclass %s neither returned a buffer nor submitted a buffer list "
2704             "from its create function", G_OBJECT_TYPE_NAME (src)));
2705     return GST_FLOW_ERROR;
2706   }
2707 }
2708
2709 static GstFlowReturn
2710 gst_base_src_getrange (GstPad * pad, GstObject * parent, guint64 offset,
2711     guint length, GstBuffer ** buf)
2712 {
2713   GstBaseSrc *src;
2714   GstFlowReturn res;
2715
2716   src = GST_BASE_SRC_CAST (parent);
2717
2718   GST_LIVE_LOCK (src);
2719   if (G_UNLIKELY (src->priv->flushing))
2720     goto flushing;
2721
2722   res = gst_base_src_get_range (src, offset, length, buf);
2723
2724 done:
2725   GST_LIVE_UNLOCK (src);
2726
2727   return res;
2728
2729   /* ERRORS */
2730 flushing:
2731   {
2732     GST_DEBUG_OBJECT (src, "we are flushing");
2733     res = GST_FLOW_FLUSHING;
2734     goto done;
2735   }
2736 }
2737
2738 static gboolean
2739 gst_base_src_is_random_access (GstBaseSrc * src)
2740 {
2741   /* we need to start the basesrc to check random access */
2742   if (!GST_BASE_SRC_IS_STARTED (src)) {
2743     GST_LOG_OBJECT (src, "doing start/stop to check get_range support");
2744     if (G_LIKELY (gst_base_src_start (src))) {
2745       if (gst_base_src_start_wait (src) != GST_FLOW_OK)
2746         goto start_failed;
2747       gst_base_src_stop (src);
2748     }
2749   }
2750
2751   return src->random_access;
2752
2753   /* ERRORS */
2754 start_failed:
2755   {
2756     GST_DEBUG_OBJECT (src, "failed to start");
2757     return FALSE;
2758   }
2759 }
2760
2761 /* Called with STREAM_LOCK */
2762 static void
2763 gst_base_src_loop (GstPad * pad)
2764 {
2765   GstBaseSrc *src;
2766   GstBuffer *buf = NULL;
2767   GstFlowReturn ret;
2768   gint64 position;
2769   gboolean eos;
2770   guint blocksize;
2771   GList *pending_events = NULL, *tmp;
2772
2773   eos = FALSE;
2774
2775   src = GST_BASE_SRC (GST_OBJECT_PARENT (pad));
2776
2777   /* Just leave immediately if we're flushing */
2778   GST_LIVE_LOCK (src);
2779   if (G_UNLIKELY (src->priv->flushing || GST_PAD_IS_FLUSHING (pad)))
2780     goto flushing;
2781   GST_LIVE_UNLOCK (src);
2782
2783   /* Just return if EOS is pushed again, as the app might be unaware that an
2784    * EOS have been sent already */
2785   if (GST_PAD_IS_EOS (pad)) {
2786     GST_DEBUG_OBJECT (src, "Pad is marked as EOS, pause the task");
2787     gst_pad_pause_task (pad);
2788     goto done;
2789   }
2790
2791   gst_base_src_send_stream_start (src);
2792
2793   /* The stream-start event could've caused something to flush us */
2794   GST_LIVE_LOCK (src);
2795   if (G_UNLIKELY (src->priv->flushing || GST_PAD_IS_FLUSHING (pad)))
2796     goto flushing;
2797   GST_LIVE_UNLOCK (src);
2798
2799   /* check if we need to renegotiate */
2800   if (gst_pad_check_reconfigure (pad)) {
2801     if (!gst_base_src_negotiate (src)) {
2802       gst_pad_mark_reconfigure (pad);
2803       if (GST_PAD_IS_FLUSHING (pad)) {
2804         GST_LIVE_LOCK (src);
2805         goto flushing;
2806       } else {
2807         goto negotiate_failed;
2808       }
2809     }
2810   }
2811
2812   GST_LIVE_LOCK (src);
2813
2814   if (G_UNLIKELY (src->priv->flushing || GST_PAD_IS_FLUSHING (pad)))
2815     goto flushing;
2816
2817   blocksize = src->blocksize;
2818
2819   /* if we operate in bytes, we can calculate an offset */
2820   if (src->segment.format == GST_FORMAT_BYTES) {
2821     position = src->segment.position;
2822     /* for negative rates, start with subtracting the blocksize */
2823     if (src->segment.rate < 0.0) {
2824       /* we cannot go below segment.start */
2825       if (position > src->segment.start + blocksize)
2826         position -= blocksize;
2827       else {
2828         /* last block, remainder up to segment.start */
2829         blocksize = position - src->segment.start;
2830         position = src->segment.start;
2831       }
2832     }
2833   } else
2834     position = -1;
2835
2836   GST_LOG_OBJECT (src, "next_ts %" GST_TIME_FORMAT " size %u",
2837       GST_TIME_ARGS (position), blocksize);
2838
2839   /* clean up just in case we got interrupted or so last time round */
2840   if (src->priv->pending_bufferlist != NULL) {
2841     gst_buffer_list_unref (src->priv->pending_bufferlist);
2842     src->priv->pending_bufferlist = NULL;
2843   }
2844
2845   ret = gst_base_src_get_range (src, position, blocksize, &buf);
2846   if (G_UNLIKELY (ret != GST_FLOW_OK)) {
2847     GST_INFO_OBJECT (src, "pausing after gst_base_src_get_range() = %s",
2848         gst_flow_get_name (ret));
2849     GST_LIVE_UNLOCK (src);
2850     goto pause;
2851   }
2852
2853   /* Note: at this point buf might be a single buf returned which we own or
2854    * the first buf of a pending buffer list submitted via submit_buffer_list(),
2855    * in which case the buffer is owned by the pending buffer list and not us. */
2856   g_assert (buf != NULL);
2857
2858   /* push events to close/start our segment before we push the buffer. */
2859   if (G_UNLIKELY (src->priv->segment_pending)) {
2860     GstEvent *seg_event = gst_event_new_segment (&src->segment);
2861
2862     gst_event_set_seqnum (seg_event, src->priv->segment_seqnum);
2863     src->priv->segment_seqnum = gst_util_seqnum_next ();
2864     gst_pad_push_event (pad, seg_event);
2865     src->priv->segment_pending = FALSE;
2866   }
2867
2868   if (g_atomic_int_get (&src->priv->have_events)) {
2869     GST_OBJECT_LOCK (src);
2870     /* take the events */
2871     pending_events = src->priv->pending_events;
2872     src->priv->pending_events = NULL;
2873     g_atomic_int_set (&src->priv->have_events, FALSE);
2874     GST_OBJECT_UNLOCK (src);
2875   }
2876
2877   /* Push out pending events if any */
2878   if (G_UNLIKELY (pending_events != NULL)) {
2879     for (tmp = pending_events; tmp; tmp = g_list_next (tmp)) {
2880       GstEvent *ev = (GstEvent *) tmp->data;
2881       gst_pad_push_event (pad, ev);
2882     }
2883     g_list_free (pending_events);
2884   }
2885
2886   /* figure out the new position */
2887   switch (src->segment.format) {
2888     case GST_FORMAT_BYTES:
2889     {
2890       guint bufsize = gst_buffer_get_size (buf);
2891
2892       /* we subtracted above for negative rates */
2893       if (src->segment.rate >= 0.0)
2894         position += bufsize;
2895       break;
2896     }
2897     case GST_FORMAT_TIME:
2898     {
2899       GstClockTime start, duration;
2900
2901       start = GST_BUFFER_TIMESTAMP (buf);
2902       duration = GST_BUFFER_DURATION (buf);
2903
2904       if (GST_CLOCK_TIME_IS_VALID (start))
2905         position = start;
2906       else
2907         position = src->segment.position;
2908
2909       if (GST_CLOCK_TIME_IS_VALID (duration)) {
2910         if (src->segment.rate >= 0.0)
2911           position += duration;
2912         else if (position > duration)
2913           position -= duration;
2914         else
2915           position = 0;
2916       }
2917       break;
2918     }
2919     case GST_FORMAT_DEFAULT:
2920       if (src->segment.rate >= 0.0)
2921         position = GST_BUFFER_OFFSET_END (buf);
2922       else
2923         position = GST_BUFFER_OFFSET (buf);
2924       break;
2925     default:
2926       position = -1;
2927       break;
2928   }
2929   if (position != -1) {
2930     if (src->segment.rate >= 0.0) {
2931       /* positive rate, check if we reached the stop */
2932       if (src->segment.stop != -1) {
2933         if (position >= src->segment.stop) {
2934           eos = TRUE;
2935           position = src->segment.stop;
2936         }
2937       }
2938     } else {
2939       /* negative rate, check if we reached the start. start is always set to
2940        * something different from -1 */
2941       if (position <= src->segment.start) {
2942         eos = TRUE;
2943         position = src->segment.start;
2944       }
2945       /* when going reverse, all buffers are DISCONT */
2946       src->priv->discont = TRUE;
2947     }
2948     GST_OBJECT_LOCK (src);
2949     src->segment.position = position;
2950     GST_OBJECT_UNLOCK (src);
2951   }
2952
2953   if (G_UNLIKELY (src->priv->discont)) {
2954     GST_INFO_OBJECT (src, "marking pending DISCONT");
2955     buf = gst_buffer_make_writable (buf);
2956     GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_DISCONT);
2957     src->priv->discont = FALSE;
2958   }
2959   GST_LIVE_UNLOCK (src);
2960
2961   /* push buffer or buffer list */
2962   if (src->priv->pending_bufferlist != NULL) {
2963     ret = gst_pad_push_list (pad, src->priv->pending_bufferlist);
2964     src->priv->pending_bufferlist = NULL;
2965   } else {
2966     ret = gst_pad_push (pad, buf);
2967   }
2968
2969   if (G_UNLIKELY (ret != GST_FLOW_OK)) {
2970     if (ret == GST_FLOW_NOT_NEGOTIATED) {
2971       goto not_negotiated;
2972     }
2973     GST_INFO_OBJECT (src, "pausing after gst_pad_push() = %s",
2974         gst_flow_get_name (ret));
2975     goto pause;
2976   }
2977
2978   /* Segment pending means that a new segment was configured
2979    * during this loop run */
2980   if (G_UNLIKELY (eos && !src->priv->segment_pending)) {
2981     GST_INFO_OBJECT (src, "pausing after end of segment");
2982     ret = GST_FLOW_EOS;
2983     goto pause;
2984   }
2985
2986 done:
2987   return;
2988
2989   /* special cases */
2990 not_negotiated:
2991   {
2992     if (gst_pad_needs_reconfigure (pad)) {
2993       GST_DEBUG_OBJECT (src, "Retrying to renegotiate");
2994       return;
2995     }
2996     /* fallthrough when push returns NOT_NEGOTIATED and we don't have
2997      * a pending negotiation request on our srcpad */
2998   }
2999 negotiate_failed:
3000   {
3001     GST_DEBUG_OBJECT (src, "Not negotiated");
3002     ret = GST_FLOW_NOT_NEGOTIATED;
3003     goto pause;
3004   }
3005 flushing:
3006   {
3007     GST_DEBUG_OBJECT (src, "we are flushing");
3008     GST_LIVE_UNLOCK (src);
3009     ret = GST_FLOW_FLUSHING;
3010     goto pause;
3011   }
3012 pause:
3013   {
3014     const gchar *reason = gst_flow_get_name (ret);
3015     GstEvent *event;
3016
3017     GST_DEBUG_OBJECT (src, "pausing task, reason %s", reason);
3018     src->running = FALSE;
3019     gst_pad_pause_task (pad);
3020     if (ret == GST_FLOW_EOS) {
3021       gboolean flag_segment;
3022       GstFormat format;
3023       gint64 position;
3024
3025       flag_segment = (src->segment.flags & GST_SEGMENT_FLAG_SEGMENT) != 0;
3026       format = src->segment.format;
3027       position = src->segment.position;
3028
3029       /* perform EOS logic */
3030       if (src->priv->forced_eos) {
3031         g_assert (g_atomic_int_get (&src->priv->has_pending_eos));
3032         GST_OBJECT_LOCK (src);
3033         event = src->priv->pending_eos;
3034         src->priv->pending_eos = NULL;
3035         GST_OBJECT_UNLOCK (src);
3036
3037       } else if (flag_segment) {
3038         GstMessage *message;
3039
3040         message = gst_message_new_segment_done (GST_OBJECT_CAST (src),
3041             format, position);
3042         gst_message_set_seqnum (message, src->priv->seqnum);
3043         gst_element_post_message (GST_ELEMENT_CAST (src), message);
3044         event = gst_event_new_segment_done (format, position);
3045         gst_event_set_seqnum (event, src->priv->seqnum);
3046
3047       } else {
3048         event = gst_event_new_eos ();
3049         gst_event_set_seqnum (event, src->priv->seqnum);
3050       }
3051
3052       gst_pad_push_event (pad, event);
3053       src->priv->forced_eos = FALSE;
3054
3055     } else if (ret == GST_FLOW_NOT_LINKED || ret <= GST_FLOW_EOS) {
3056       event = gst_event_new_eos ();
3057       gst_event_set_seqnum (event, src->priv->seqnum);
3058       /* for fatal errors we post an error message, post the error
3059        * first so the app knows about the error first.
3060        * Also don't do this for FLUSHING because it happens
3061        * due to flushing and posting an error message because of
3062        * that is the wrong thing to do, e.g. when we're doing
3063        * a flushing seek. */
3064       GST_ELEMENT_FLOW_ERROR (src, ret);
3065       gst_pad_push_event (pad, event);
3066     }
3067     goto done;
3068   }
3069 }
3070
3071 static gboolean
3072 gst_base_src_set_allocation (GstBaseSrc * basesrc, GstBufferPool * pool,
3073     GstAllocator * allocator, GstAllocationParams * params)
3074 {
3075   GstAllocator *oldalloc;
3076   GstBufferPool *oldpool;
3077   GstBaseSrcPrivate *priv = basesrc->priv;
3078
3079   if (pool) {
3080     GST_DEBUG_OBJECT (basesrc, "activate pool");
3081     if (!gst_buffer_pool_set_active (pool, TRUE))
3082       goto activate_failed;
3083   }
3084
3085   GST_OBJECT_LOCK (basesrc);
3086   oldpool = priv->pool;
3087   priv->pool = pool;
3088
3089   oldalloc = priv->allocator;
3090   priv->allocator = allocator;
3091
3092   if (priv->pool)
3093     gst_object_ref (priv->pool);
3094   if (priv->allocator)
3095     gst_object_ref (priv->allocator);
3096
3097   if (params)
3098     priv->params = *params;
3099   else
3100     gst_allocation_params_init (&priv->params);
3101   GST_OBJECT_UNLOCK (basesrc);
3102
3103   if (oldpool) {
3104     /* only deactivate if the pool is not the one we're using */
3105     if (oldpool != pool) {
3106       GST_DEBUG_OBJECT (basesrc, "deactivate old pool");
3107       gst_buffer_pool_set_active (oldpool, FALSE);
3108     }
3109     gst_object_unref (oldpool);
3110   }
3111   if (oldalloc) {
3112     gst_object_unref (oldalloc);
3113   }
3114   return TRUE;
3115
3116   /* ERRORS */
3117 activate_failed:
3118   {
3119     GST_ERROR_OBJECT (basesrc, "failed to activate bufferpool.");
3120     return FALSE;
3121   }
3122 }
3123
3124 static void
3125 gst_base_src_set_pool_flushing (GstBaseSrc * basesrc, gboolean flushing)
3126 {
3127   GstBaseSrcPrivate *priv = basesrc->priv;
3128   GstBufferPool *pool;
3129
3130   GST_OBJECT_LOCK (basesrc);
3131   if ((pool = priv->pool))
3132     pool = gst_object_ref (pool);
3133   GST_OBJECT_UNLOCK (basesrc);
3134
3135   if (pool) {
3136     gst_buffer_pool_set_flushing (pool, flushing);
3137     gst_object_unref (pool);
3138   }
3139 }
3140
3141
3142 static gboolean
3143 gst_base_src_decide_allocation_default (GstBaseSrc * basesrc, GstQuery * query)
3144 {
3145   GstCaps *outcaps;
3146   GstBufferPool *pool;
3147   guint size, min, max;
3148   GstAllocator *allocator;
3149   GstAllocationParams params;
3150   GstStructure *config;
3151   gboolean update_allocator;
3152
3153   gst_query_parse_allocation (query, &outcaps, NULL);
3154
3155   /* we got configuration from our peer or the decide_allocation method,
3156    * parse them */
3157   if (gst_query_get_n_allocation_params (query) > 0) {
3158     /* try the allocator */
3159     gst_query_parse_nth_allocation_param (query, 0, &allocator, &params);
3160     update_allocator = TRUE;
3161   } else {
3162     allocator = NULL;
3163     gst_allocation_params_init (&params);
3164     update_allocator = FALSE;
3165   }
3166
3167   if (gst_query_get_n_allocation_pools (query) > 0) {
3168     gst_query_parse_nth_allocation_pool (query, 0, &pool, &size, &min, &max);
3169
3170     if (pool == NULL) {
3171       /* no pool, we can make our own */
3172       GST_DEBUG_OBJECT (basesrc, "no pool, making new pool");
3173       pool = gst_buffer_pool_new ();
3174     }
3175   } else {
3176     pool = NULL;
3177     size = min = max = 0;
3178   }
3179
3180   /* now configure */
3181   if (pool) {
3182     config = gst_buffer_pool_get_config (pool);
3183     gst_buffer_pool_config_set_params (config, outcaps, size, min, max);
3184     gst_buffer_pool_config_set_allocator (config, allocator, &params);
3185
3186     /* buffer pool may have to do some changes */
3187     if (!gst_buffer_pool_set_config (pool, config)) {
3188       config = gst_buffer_pool_get_config (pool);
3189
3190       /* If change are not acceptable, fallback to generic pool */
3191       if (!gst_buffer_pool_config_validate_params (config, outcaps, size, min,
3192               max)) {
3193         GST_DEBUG_OBJECT (basesrc, "unsupported pool, making new pool");
3194
3195         gst_object_unref (pool);
3196         pool = gst_buffer_pool_new ();
3197         gst_buffer_pool_config_set_params (config, outcaps, size, min, max);
3198         gst_buffer_pool_config_set_allocator (config, allocator, &params);
3199       }
3200
3201       if (!gst_buffer_pool_set_config (pool, config))
3202         goto config_failed;
3203     }
3204   }
3205
3206   if (update_allocator)
3207     gst_query_set_nth_allocation_param (query, 0, allocator, &params);
3208   else
3209     gst_query_add_allocation_param (query, allocator, &params);
3210   if (allocator)
3211     gst_object_unref (allocator);
3212
3213   if (pool) {
3214     gst_query_set_nth_allocation_pool (query, 0, pool, size, min, max);
3215     gst_object_unref (pool);
3216   }
3217
3218   return TRUE;
3219
3220 config_failed:
3221   GST_ELEMENT_ERROR (basesrc, RESOURCE, SETTINGS,
3222       ("Failed to configure the buffer pool"),
3223       ("Configuration is most likely invalid, please report this issue."));
3224   gst_object_unref (pool);
3225   return FALSE;
3226 }
3227
3228 static gboolean
3229 gst_base_src_prepare_allocation (GstBaseSrc * basesrc, GstCaps * caps)
3230 {
3231   GstBaseSrcClass *bclass;
3232   gboolean result = TRUE;
3233   GstQuery *query;
3234   GstBufferPool *pool = NULL;
3235   GstAllocator *allocator = NULL;
3236   GstAllocationParams params;
3237
3238   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
3239
3240   /* make query and let peer pad answer, we don't really care if it worked or
3241    * not, if it failed, the allocation query would contain defaults and the
3242    * subclass would then set better values if needed */
3243   query = gst_query_new_allocation (caps, TRUE);
3244   if (!gst_pad_peer_query (basesrc->srcpad, query)) {
3245     /* not a problem, just debug a little */
3246     GST_DEBUG_OBJECT (basesrc, "peer ALLOCATION query failed");
3247   }
3248
3249   g_assert (bclass->decide_allocation != NULL);
3250   result = bclass->decide_allocation (basesrc, query);
3251
3252   GST_DEBUG_OBJECT (basesrc, "ALLOCATION (%d) params: %" GST_PTR_FORMAT, result,
3253       query);
3254
3255   if (!result)
3256     goto no_decide_allocation;
3257
3258   /* we got configuration from our peer or the decide_allocation method,
3259    * parse them */
3260   if (gst_query_get_n_allocation_params (query) > 0) {
3261     gst_query_parse_nth_allocation_param (query, 0, &allocator, &params);
3262   } else {
3263     allocator = NULL;
3264     gst_allocation_params_init (&params);
3265   }
3266
3267   if (gst_query_get_n_allocation_pools (query) > 0)
3268     gst_query_parse_nth_allocation_pool (query, 0, &pool, NULL, NULL, NULL);
3269
3270   result = gst_base_src_set_allocation (basesrc, pool, allocator, &params);
3271
3272   if (allocator)
3273     gst_object_unref (allocator);
3274   if (pool)
3275     gst_object_unref (pool);
3276
3277   gst_query_unref (query);
3278
3279   return result;
3280
3281   /* Errors */
3282 no_decide_allocation:
3283   {
3284     GST_WARNING_OBJECT (basesrc, "Subclass failed to decide allocation");
3285     gst_query_unref (query);
3286
3287     return result;
3288   }
3289 }
3290
3291 /* default negotiation code.
3292  *
3293  * Take intersection between src and sink pads, take first
3294  * caps and fixate.
3295  */
3296 static gboolean
3297 gst_base_src_default_negotiate (GstBaseSrc * basesrc)
3298 {
3299   GstCaps *thiscaps;
3300   GstCaps *caps = NULL;
3301   GstCaps *peercaps = NULL;
3302   gboolean result = FALSE;
3303
3304   /* first see what is possible on our source pad */
3305   thiscaps = gst_pad_query_caps (GST_BASE_SRC_PAD (basesrc), NULL);
3306   GST_DEBUG_OBJECT (basesrc, "caps of src: %" GST_PTR_FORMAT, thiscaps);
3307   /* nothing or anything is allowed, we're done */
3308   if (thiscaps == NULL || gst_caps_is_any (thiscaps))
3309     goto no_nego_needed;
3310
3311   if (G_UNLIKELY (gst_caps_is_empty (thiscaps)))
3312     goto no_caps;
3313
3314   /* get the peer caps */
3315   peercaps = gst_pad_peer_query_caps (GST_BASE_SRC_PAD (basesrc), thiscaps);
3316   GST_DEBUG_OBJECT (basesrc, "caps of peer: %" GST_PTR_FORMAT, peercaps);
3317   if (peercaps) {
3318     /* The result is already a subset of our caps */
3319     caps = peercaps;
3320     gst_caps_unref (thiscaps);
3321   } else {
3322     /* no peer, work with our own caps then */
3323     caps = thiscaps;
3324   }
3325   if (caps && !gst_caps_is_empty (caps)) {
3326     /* now fixate */
3327     GST_DEBUG_OBJECT (basesrc, "have caps: %" GST_PTR_FORMAT, caps);
3328     if (gst_caps_is_any (caps)) {
3329       GST_DEBUG_OBJECT (basesrc, "any caps, we stop");
3330       /* hmm, still anything, so element can do anything and
3331        * nego is not needed */
3332       result = TRUE;
3333     } else {
3334       caps = gst_base_src_fixate (basesrc, caps);
3335       GST_DEBUG_OBJECT (basesrc, "fixated to: %" GST_PTR_FORMAT, caps);
3336       if (gst_caps_is_fixed (caps)) {
3337         /* yay, fixed caps, use those then, it's possible that the subclass does
3338          * not accept this caps after all and we have to fail. */
3339         result = gst_base_src_set_caps (basesrc, caps);
3340       }
3341     }
3342     gst_caps_unref (caps);
3343   } else {
3344     if (caps)
3345       gst_caps_unref (caps);
3346     GST_DEBUG_OBJECT (basesrc, "no common caps");
3347   }
3348   return result;
3349
3350 no_nego_needed:
3351   {
3352     GST_DEBUG_OBJECT (basesrc, "no negotiation needed");
3353     if (thiscaps)
3354       gst_caps_unref (thiscaps);
3355     return TRUE;
3356   }
3357 no_caps:
3358   {
3359     GST_ELEMENT_ERROR (basesrc, STREAM, FORMAT,
3360         ("No supported formats found"),
3361         ("This element did not produce valid caps"));
3362     if (thiscaps)
3363       gst_caps_unref (thiscaps);
3364     return TRUE;
3365   }
3366 }
3367
3368 static gboolean
3369 gst_base_src_negotiate (GstBaseSrc * basesrc)
3370 {
3371   GstBaseSrcClass *bclass;
3372   gboolean result;
3373
3374   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
3375
3376   GST_DEBUG_OBJECT (basesrc, "starting negotiation");
3377
3378   if (G_LIKELY (bclass->negotiate))
3379     result = bclass->negotiate (basesrc);
3380   else
3381     result = TRUE;
3382
3383   if (G_LIKELY (result)) {
3384     GstCaps *caps;
3385
3386     caps = gst_pad_get_current_caps (basesrc->srcpad);
3387
3388     result = gst_base_src_prepare_allocation (basesrc, caps);
3389
3390     if (caps)
3391       gst_caps_unref (caps);
3392   }
3393   return result;
3394 }
3395
3396 static gboolean
3397 gst_base_src_start (GstBaseSrc * basesrc)
3398 {
3399   GstBaseSrcClass *bclass;
3400   gboolean result;
3401
3402   GST_LIVE_LOCK (basesrc);
3403
3404   GST_OBJECT_LOCK (basesrc);
3405   if (GST_BASE_SRC_IS_STARTING (basesrc))
3406     goto was_starting;
3407   if (GST_BASE_SRC_IS_STARTED (basesrc))
3408     goto was_started;
3409
3410   basesrc->priv->start_result = GST_FLOW_FLUSHING;
3411   GST_OBJECT_FLAG_SET (basesrc, GST_BASE_SRC_FLAG_STARTING);
3412   gst_segment_init (&basesrc->segment, basesrc->segment.format);
3413   GST_OBJECT_UNLOCK (basesrc);
3414
3415   basesrc->num_buffers_left = basesrc->num_buffers;
3416   basesrc->running = FALSE;
3417   basesrc->priv->segment_pending = FALSE;
3418   basesrc->priv->segment_seqnum = gst_util_seqnum_next ();
3419   basesrc->priv->forced_eos = FALSE;
3420   GST_LIVE_UNLOCK (basesrc);
3421
3422   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
3423   if (bclass->start)
3424     result = bclass->start (basesrc);
3425   else
3426     result = TRUE;
3427
3428   if (!result)
3429     goto could_not_start;
3430
3431   if (!gst_base_src_is_async (basesrc)) {
3432     gst_base_src_start_complete (basesrc, GST_FLOW_OK);
3433     /* not really waiting here, we call this to get the result
3434      * from the start_complete call */
3435     result = gst_base_src_start_wait (basesrc) == GST_FLOW_OK;
3436   }
3437
3438   return result;
3439
3440   /* ERROR */
3441 was_starting:
3442   {
3443     GST_DEBUG_OBJECT (basesrc, "was starting");
3444     GST_OBJECT_UNLOCK (basesrc);
3445     GST_LIVE_UNLOCK (basesrc);
3446     return TRUE;
3447   }
3448 was_started:
3449   {
3450     GST_DEBUG_OBJECT (basesrc, "was started");
3451     GST_OBJECT_UNLOCK (basesrc);
3452     GST_LIVE_UNLOCK (basesrc);
3453     return TRUE;
3454   }
3455 could_not_start:
3456   {
3457     GST_DEBUG_OBJECT (basesrc, "could not start");
3458     /* subclass is supposed to post a message but we post one as a fallback
3459      * just in case. We don't have to call _stop. */
3460     GST_ELEMENT_ERROR (basesrc, CORE, STATE_CHANGE, (NULL),
3461         ("Failed to start"));
3462     gst_base_src_start_complete (basesrc, GST_FLOW_ERROR);
3463     return FALSE;
3464   }
3465 }
3466
3467 /**
3468  * gst_base_src_start_complete:
3469  * @basesrc: base source instance
3470  * @ret: a #GstFlowReturn
3471  *
3472  * Complete an asynchronous start operation. When the subclass overrides the
3473  * start method, it should call gst_base_src_start_complete() when the start
3474  * operation completes either from the same thread or from an asynchronous
3475  * helper thread.
3476  */
3477 void
3478 gst_base_src_start_complete (GstBaseSrc * basesrc, GstFlowReturn ret)
3479 {
3480   gboolean have_size;
3481   guint64 size;
3482   gboolean seekable;
3483   GstFormat format;
3484   GstPadMode mode;
3485   GstEvent *event;
3486
3487   if (ret != GST_FLOW_OK)
3488     goto error;
3489
3490   GST_DEBUG_OBJECT (basesrc, "starting source");
3491   format = basesrc->segment.format;
3492
3493   /* figure out the size */
3494   have_size = FALSE;
3495   size = -1;
3496   if (format == GST_FORMAT_BYTES) {
3497     GstBaseSrcClass *bclass = GST_BASE_SRC_GET_CLASS (basesrc);
3498
3499     if (bclass->get_size) {
3500       if (!(have_size = bclass->get_size (basesrc, &size)))
3501         size = -1;
3502     }
3503     GST_DEBUG_OBJECT (basesrc, "setting size %" G_GUINT64_FORMAT, size);
3504     /* only update the size when operating in bytes, subclass is supposed
3505      * to set duration in the start method for other formats */
3506     GST_OBJECT_LOCK (basesrc);
3507     basesrc->segment.duration = size;
3508     GST_OBJECT_UNLOCK (basesrc);
3509   }
3510
3511   GST_DEBUG_OBJECT (basesrc,
3512       "format: %s, have size: %d, size: %" G_GUINT64_FORMAT ", duration: %"
3513       G_GINT64_FORMAT, gst_format_get_name (format), have_size, size,
3514       basesrc->segment.duration);
3515
3516   seekable = gst_base_src_seekable (basesrc);
3517   GST_DEBUG_OBJECT (basesrc, "is seekable: %d", seekable);
3518
3519   /* update for random access flag */
3520   basesrc->random_access = seekable && format == GST_FORMAT_BYTES;
3521
3522   GST_DEBUG_OBJECT (basesrc, "is random_access: %d", basesrc->random_access);
3523
3524   gst_pad_mark_reconfigure (GST_BASE_SRC_PAD (basesrc));
3525
3526   GST_OBJECT_LOCK (basesrc->srcpad);
3527   mode = GST_PAD_MODE (basesrc->srcpad);
3528   GST_OBJECT_UNLOCK (basesrc->srcpad);
3529
3530   /* take the stream lock here, we only want to let the task run when we have
3531    * set the STARTED flag */
3532   GST_PAD_STREAM_LOCK (basesrc->srcpad);
3533   switch (mode) {
3534     case GST_PAD_MODE_PUSH:
3535       /* do initial seek, which will start the task */
3536       GST_OBJECT_LOCK (basesrc);
3537       event = basesrc->pending_seek;
3538       basesrc->pending_seek = NULL;
3539       GST_OBJECT_UNLOCK (basesrc);
3540
3541       /* The perform seek code will start the task when finished. We don't have to
3542        * unlock the streaming thread because it is not running yet */
3543       if (G_UNLIKELY (!gst_base_src_perform_seek (basesrc, event, FALSE)))
3544         goto seek_failed;
3545
3546       if (event)
3547         gst_event_unref (event);
3548       break;
3549     case GST_PAD_MODE_PULL:
3550       /* if not random_access, we cannot operate in pull mode for now */
3551       if (G_UNLIKELY (!basesrc->random_access))
3552         goto no_get_range;
3553       break;
3554     default:
3555       goto not_activated_yet;
3556       break;
3557   }
3558
3559   GST_OBJECT_LOCK (basesrc);
3560   GST_OBJECT_FLAG_SET (basesrc, GST_BASE_SRC_FLAG_STARTED);
3561   GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTING);
3562   basesrc->priv->start_result = ret;
3563   GST_ASYNC_SIGNAL (basesrc);
3564   GST_OBJECT_UNLOCK (basesrc);
3565
3566   GST_PAD_STREAM_UNLOCK (basesrc->srcpad);
3567
3568   return;
3569
3570 seek_failed:
3571   {
3572     GST_PAD_STREAM_UNLOCK (basesrc->srcpad);
3573     GST_ERROR_OBJECT (basesrc, "Failed to perform initial seek");
3574     gst_base_src_stop (basesrc);
3575     if (event)
3576       gst_event_unref (event);
3577     ret = GST_FLOW_ERROR;
3578     goto error;
3579   }
3580 no_get_range:
3581   {
3582     GST_PAD_STREAM_UNLOCK (basesrc->srcpad);
3583     gst_base_src_stop (basesrc);
3584     GST_ERROR_OBJECT (basesrc, "Cannot operate in pull mode, stopping");
3585     ret = GST_FLOW_ERROR;
3586     goto error;
3587   }
3588 not_activated_yet:
3589   {
3590     GST_PAD_STREAM_UNLOCK (basesrc->srcpad);
3591     gst_base_src_stop (basesrc);
3592     GST_WARNING_OBJECT (basesrc, "pad not activated yet");
3593     ret = GST_FLOW_ERROR;
3594     goto error;
3595   }
3596 error:
3597   {
3598     GST_OBJECT_LOCK (basesrc);
3599     basesrc->priv->start_result = ret;
3600     GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTING);
3601     GST_ASYNC_SIGNAL (basesrc);
3602     GST_OBJECT_UNLOCK (basesrc);
3603     return;
3604   }
3605 }
3606
3607 /**
3608  * gst_base_src_start_wait:
3609  * @basesrc: base source instance
3610  *
3611  * Wait until the start operation completes.
3612  *
3613  * Returns: a #GstFlowReturn.
3614  */
3615 GstFlowReturn
3616 gst_base_src_start_wait (GstBaseSrc * basesrc)
3617 {
3618   GstFlowReturn result;
3619
3620   GST_OBJECT_LOCK (basesrc);
3621   while (GST_BASE_SRC_IS_STARTING (basesrc)) {
3622     GST_ASYNC_WAIT (basesrc);
3623   }
3624   result = basesrc->priv->start_result;
3625   GST_OBJECT_UNLOCK (basesrc);
3626
3627   GST_DEBUG_OBJECT (basesrc, "got %s", gst_flow_get_name (result));
3628
3629   return result;
3630 }
3631
3632 static gboolean
3633 gst_base_src_stop (GstBaseSrc * basesrc)
3634 {
3635   GstBaseSrcClass *bclass;
3636   gboolean result = TRUE;
3637
3638   GST_DEBUG_OBJECT (basesrc, "stopping source");
3639
3640   /* flush all */
3641   gst_base_src_set_flushing (basesrc, TRUE);
3642
3643   /* stop the task */
3644   gst_pad_stop_task (basesrc->srcpad);
3645   /* stop flushing, this will balance unlock/unlock_stop calls */
3646   gst_base_src_set_flushing (basesrc, FALSE);
3647
3648   GST_OBJECT_LOCK (basesrc);
3649   if (!GST_BASE_SRC_IS_STARTED (basesrc) && !GST_BASE_SRC_IS_STARTING (basesrc))
3650     goto was_stopped;
3651
3652   GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTING);
3653   GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTED);
3654   basesrc->priv->start_result = GST_FLOW_FLUSHING;
3655   GST_ASYNC_SIGNAL (basesrc);
3656   GST_OBJECT_UNLOCK (basesrc);
3657
3658   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
3659   if (bclass->stop)
3660     result = bclass->stop (basesrc);
3661
3662   if (basesrc->priv->pending_bufferlist != NULL) {
3663     gst_buffer_list_unref (basesrc->priv->pending_bufferlist);
3664     basesrc->priv->pending_bufferlist = NULL;
3665   }
3666
3667   gst_base_src_set_allocation (basesrc, NULL, NULL, NULL);
3668
3669   return result;
3670
3671 was_stopped:
3672   {
3673     GST_DEBUG_OBJECT (basesrc, "was stopped");
3674     GST_OBJECT_UNLOCK (basesrc);
3675     return TRUE;
3676   }
3677 }
3678
3679 /* start or stop flushing dataprocessing
3680  */
3681 static gboolean
3682 gst_base_src_set_flushing (GstBaseSrc * basesrc, gboolean flushing)
3683 {
3684   GstBaseSrcClass *bclass;
3685
3686   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
3687
3688   GST_DEBUG_OBJECT (basesrc, "flushing %d", flushing);
3689
3690   if (flushing) {
3691     gst_base_src_set_pool_flushing (basesrc, TRUE);
3692     /* unlock any subclasses to allow turning off the streaming thread */
3693     if (bclass->unlock)
3694       bclass->unlock (basesrc);
3695   }
3696
3697   /* the live lock is released when we are blocked, waiting for playing,
3698    * when we sync to the clock or creating a buffer */
3699   GST_LIVE_LOCK (basesrc);
3700   basesrc->priv->flushing = flushing;
3701   if (flushing) {
3702     /* clear pending EOS if any */
3703     if (g_atomic_int_get (&basesrc->priv->has_pending_eos)) {
3704       GST_OBJECT_LOCK (basesrc);
3705       CLEAR_PENDING_EOS (basesrc);
3706       basesrc->priv->forced_eos = FALSE;
3707       GST_OBJECT_UNLOCK (basesrc);
3708     }
3709
3710     /* unblock clock sync (if any) or any other blocking thing */
3711     if (basesrc->clock_id)
3712       gst_clock_id_unschedule (basesrc->clock_id);
3713   } else {
3714     gst_base_src_set_pool_flushing (basesrc, FALSE);
3715
3716     /* Drop all delayed events */
3717     GST_OBJECT_LOCK (basesrc);
3718     if (basesrc->priv->pending_events) {
3719       g_list_foreach (basesrc->priv->pending_events, (GFunc) gst_event_unref,
3720           NULL);
3721       g_list_free (basesrc->priv->pending_events);
3722       basesrc->priv->pending_events = NULL;
3723       g_atomic_int_set (&basesrc->priv->have_events, FALSE);
3724     }
3725     GST_OBJECT_UNLOCK (basesrc);
3726   }
3727
3728   GST_LIVE_SIGNAL (basesrc);
3729   GST_LIVE_UNLOCK (basesrc);
3730
3731   if (!flushing) {
3732     /* Now wait for the stream lock to be released and clear our unlock request */
3733     GST_PAD_STREAM_LOCK (basesrc->srcpad);
3734     if (bclass->unlock_stop)
3735       bclass->unlock_stop (basesrc);
3736     GST_PAD_STREAM_UNLOCK (basesrc->srcpad);
3737   }
3738
3739   return TRUE;
3740 }
3741
3742 /* the purpose of this function is to make sure that a live source blocks in the
3743  * LIVE lock or leaves the LIVE lock and continues playing. */
3744 static gboolean
3745 gst_base_src_set_playing (GstBaseSrc * basesrc, gboolean live_play)
3746 {
3747   /* we are now able to grab the LIVE lock, when we get it, we can be
3748    * waiting for PLAYING while blocked in the LIVE cond or we can be waiting
3749    * for the clock. */
3750   GST_LIVE_LOCK (basesrc);
3751   GST_DEBUG_OBJECT (basesrc, "unschedule clock");
3752
3753   /* unblock clock sync (if any) */
3754   if (basesrc->clock_id)
3755     gst_clock_id_unschedule (basesrc->clock_id);
3756
3757   /* configure what to do when we get to the LIVE lock. */
3758   GST_DEBUG_OBJECT (basesrc, "live running %d", live_play);
3759   basesrc->live_running = live_play;
3760
3761   if (live_play) {
3762     gboolean start;
3763
3764     /* for live sources we restart the timestamp correction */
3765     GST_OBJECT_LOCK (basesrc);
3766     basesrc->priv->latency = -1;
3767     GST_OBJECT_UNLOCK (basesrc);
3768     /* have to restart the task in case it stopped because of the unlock when
3769      * we went to PAUSED. Only do this if we operating in push mode. */
3770     GST_OBJECT_LOCK (basesrc->srcpad);
3771     start = (GST_PAD_MODE (basesrc->srcpad) == GST_PAD_MODE_PUSH);
3772     GST_OBJECT_UNLOCK (basesrc->srcpad);
3773     if (start)
3774       gst_pad_start_task (basesrc->srcpad, (GstTaskFunction) gst_base_src_loop,
3775           basesrc->srcpad, NULL);
3776     GST_DEBUG_OBJECT (basesrc, "signal");
3777     GST_LIVE_SIGNAL (basesrc);
3778   }
3779   GST_LIVE_UNLOCK (basesrc);
3780
3781   return TRUE;
3782 }
3783
3784 static gboolean
3785 gst_base_src_activate_push (GstPad * pad, GstObject * parent, gboolean active)
3786 {
3787   GstBaseSrc *basesrc;
3788
3789   basesrc = GST_BASE_SRC (parent);
3790
3791   /* prepare subclass first */
3792   if (active) {
3793     GST_DEBUG_OBJECT (basesrc, "Activating in push mode");
3794
3795     if (G_UNLIKELY (!basesrc->can_activate_push))
3796       goto no_push_activation;
3797
3798     if (G_UNLIKELY (!gst_base_src_start (basesrc)))
3799       goto error_start;
3800   } else {
3801     GST_DEBUG_OBJECT (basesrc, "Deactivating in push mode");
3802     /* now we can stop the source */
3803     if (G_UNLIKELY (!gst_base_src_stop (basesrc)))
3804       goto error_stop;
3805   }
3806   return TRUE;
3807
3808   /* ERRORS */
3809 no_push_activation:
3810   {
3811     GST_WARNING_OBJECT (basesrc, "Subclass disabled push-mode activation");
3812     return FALSE;
3813   }
3814 error_start:
3815   {
3816     GST_WARNING_OBJECT (basesrc, "Failed to start in push mode");
3817     return FALSE;
3818   }
3819 error_stop:
3820   {
3821     GST_DEBUG_OBJECT (basesrc, "Failed to stop in push mode");
3822     return FALSE;
3823   }
3824 }
3825
3826 static gboolean
3827 gst_base_src_activate_pull (GstPad * pad, GstObject * parent, gboolean active)
3828 {
3829   GstBaseSrc *basesrc;
3830
3831   basesrc = GST_BASE_SRC (parent);
3832
3833   /* prepare subclass first */
3834   if (active) {
3835     GST_DEBUG_OBJECT (basesrc, "Activating in pull mode");
3836     if (G_UNLIKELY (!gst_base_src_start (basesrc)))
3837       goto error_start;
3838   } else {
3839     GST_DEBUG_OBJECT (basesrc, "Deactivating in pull mode");
3840     if (G_UNLIKELY (!gst_base_src_stop (basesrc)))
3841       goto error_stop;
3842   }
3843   return TRUE;
3844
3845   /* ERRORS */
3846 error_start:
3847   {
3848     GST_ERROR_OBJECT (basesrc, "Failed to start in pull mode");
3849     return FALSE;
3850   }
3851 error_stop:
3852   {
3853     GST_ERROR_OBJECT (basesrc, "Failed to stop in pull mode");
3854     return FALSE;
3855   }
3856 }
3857
3858 static gboolean
3859 gst_base_src_activate_mode (GstPad * pad, GstObject * parent,
3860     GstPadMode mode, gboolean active)
3861 {
3862   gboolean res;
3863   GstBaseSrc *src = GST_BASE_SRC (parent);
3864
3865   src->priv->stream_start_pending = FALSE;
3866
3867   GST_DEBUG_OBJECT (pad, "activating in mode %d", mode);
3868
3869   switch (mode) {
3870     case GST_PAD_MODE_PULL:
3871       res = gst_base_src_activate_pull (pad, parent, active);
3872       break;
3873     case GST_PAD_MODE_PUSH:
3874       src->priv->stream_start_pending = active;
3875       res = gst_base_src_activate_push (pad, parent, active);
3876       break;
3877     default:
3878       GST_LOG_OBJECT (pad, "unknown activation mode %d", mode);
3879       res = FALSE;
3880       break;
3881   }
3882   return res;
3883 }
3884
3885
3886 static GstStateChangeReturn
3887 gst_base_src_change_state (GstElement * element, GstStateChange transition)
3888 {
3889   GstBaseSrc *basesrc;
3890   GstStateChangeReturn result;
3891   gboolean no_preroll = FALSE;
3892
3893   basesrc = GST_BASE_SRC (element);
3894
3895   switch (transition) {
3896     case GST_STATE_CHANGE_NULL_TO_READY:
3897       break;
3898     case GST_STATE_CHANGE_READY_TO_PAUSED:
3899       no_preroll = gst_base_src_is_live (basesrc);
3900       break;
3901     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
3902       GST_DEBUG_OBJECT (basesrc, "PAUSED->PLAYING");
3903       if (gst_base_src_is_live (basesrc)) {
3904         /* now we can start playback */
3905         gst_base_src_set_playing (basesrc, TRUE);
3906       }
3907       break;
3908     default:
3909       break;
3910   }
3911
3912   if ((result =
3913           GST_ELEMENT_CLASS (parent_class)->change_state (element,
3914               transition)) == GST_STATE_CHANGE_FAILURE)
3915     goto failure;
3916
3917   switch (transition) {
3918     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
3919       GST_DEBUG_OBJECT (basesrc, "PLAYING->PAUSED");
3920       if (gst_base_src_is_live (basesrc)) {
3921         /* make sure we block in the live cond in PAUSED */
3922         gst_base_src_set_playing (basesrc, FALSE);
3923         no_preroll = TRUE;
3924       }
3925       break;
3926     case GST_STATE_CHANGE_PAUSED_TO_READY:
3927     {
3928       /* we don't need to unblock anything here, the pad deactivation code
3929        * already did this */
3930       if (g_atomic_int_get (&basesrc->priv->has_pending_eos)) {
3931         GST_OBJECT_LOCK (basesrc);
3932         CLEAR_PENDING_EOS (basesrc);
3933         GST_OBJECT_UNLOCK (basesrc);
3934       }
3935       gst_event_replace (&basesrc->pending_seek, NULL);
3936       break;
3937     }
3938     case GST_STATE_CHANGE_READY_TO_NULL:
3939       break;
3940     default:
3941       break;
3942   }
3943
3944   if (no_preroll && result == GST_STATE_CHANGE_SUCCESS)
3945     result = GST_STATE_CHANGE_NO_PREROLL;
3946
3947   return result;
3948
3949   /* ERRORS */
3950 failure:
3951   {
3952     GST_DEBUG_OBJECT (basesrc, "parent failed state change");
3953     return result;
3954   }
3955 }
3956
3957 /**
3958  * gst_base_src_get_buffer_pool:
3959  * @src: a #GstBaseSrc
3960  *
3961  * Returns: (transfer full): the instance of the #GstBufferPool used
3962  * by the src; unref it after usage.
3963  */
3964 GstBufferPool *
3965 gst_base_src_get_buffer_pool (GstBaseSrc * src)
3966 {
3967   GstBufferPool *ret = NULL;
3968
3969   g_return_val_if_fail (GST_IS_BASE_SRC (src), NULL);
3970
3971   GST_OBJECT_LOCK (src);
3972   if (src->priv->pool)
3973     ret = gst_object_ref (src->priv->pool);
3974   GST_OBJECT_UNLOCK (src);
3975
3976   return ret;
3977 }
3978
3979 /**
3980  * gst_base_src_get_allocator:
3981  * @src: a #GstBaseSrc
3982  * @allocator: (out) (allow-none) (transfer full): the #GstAllocator
3983  * used
3984  * @params: (out) (allow-none) (transfer full): the
3985  * #GstAllocationParams of @allocator
3986  *
3987  * Lets #GstBaseSrc sub-classes to know the memory @allocator
3988  * used by the base class and its @params.
3989  *
3990  * Unref the @allocator after usage.
3991  */
3992 void
3993 gst_base_src_get_allocator (GstBaseSrc * src,
3994     GstAllocator ** allocator, GstAllocationParams * params)
3995 {
3996   g_return_if_fail (GST_IS_BASE_SRC (src));
3997
3998   GST_OBJECT_LOCK (src);
3999   if (allocator)
4000     *allocator = src->priv->allocator ?
4001         gst_object_ref (src->priv->allocator) : NULL;
4002
4003   if (params)
4004     *params = src->priv->params;
4005   GST_OBJECT_UNLOCK (src);
4006 }
4007
4008 /**
4009  * gst_base_src_submit_buffer_list:
4010  * @src: a #GstBaseSrc
4011  * @buffer_list: (transfer full): a #GstBufferList
4012  *
4013  * Subclasses can call this from their create virtual method implementation
4014  * to submit a buffer list to be pushed out later. This is useful in
4015  * cases where the create function wants to produce multiple buffers to be
4016  * pushed out in one go in form of a #GstBufferList, which can reduce overhead
4017  * drastically, especially for packetised inputs (for data streams where
4018  * the packetisation/chunking is not important it is usually more efficient
4019  * to return larger buffers instead).
4020  *
4021  * Subclasses that use this function from their create function must return
4022  * %GST_FLOW_OK and no buffer from their create virtual method implementation.
4023  * If a buffer is returned after a buffer list has also been submitted via this
4024  * function the behaviour is undefined.
4025  *
4026  * Subclasses must only call this function once per create function call and
4027  * subclasses must only call this function when the source operates in push
4028  * mode.
4029  *
4030  * Since: 1.14
4031  */
4032 void
4033 gst_base_src_submit_buffer_list (GstBaseSrc * src, GstBufferList * buffer_list)
4034 {
4035   g_return_if_fail (GST_IS_BASE_SRC (src));
4036   g_return_if_fail (GST_IS_BUFFER_LIST (buffer_list));
4037   g_return_if_fail (BASE_SRC_HAS_PENDING_BUFFER_LIST (src) == FALSE);
4038
4039   src->priv->pending_bufferlist = buffer_list;
4040
4041   GST_LOG_OBJECT (src, "%u buffers submitted in buffer list",
4042       gst_buffer_list_length (buffer_list));
4043 }