buffer: add memset function
[platform/upstream/gstreamer.git] / libs / gst / base / gstbasesrc.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *               2000,2005 Wim Taymans <wim@fluendo.com>
4  *
5  * gstbasesrc.c:
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20  * Boston, MA 02111-1307, USA.
21  */
22
23 /**
24  * SECTION:gstbasesrc
25  * @short_description: Base class for getrange based source elements
26  * @see_also: #GstPushSrc, #GstBaseTransform, #GstBaseSink
27  *
28  * This is a generice base class for source elements. The following
29  * types of sources are supported:
30  * <itemizedlist>
31  *   <listitem><para>random access sources like files</para></listitem>
32  *   <listitem><para>seekable sources</para></listitem>
33  *   <listitem><para>live sources</para></listitem>
34  * </itemizedlist>
35  *
36  * The source can be configured to operate in any #GstFormat with the
37  * gst_base_src_set_format() method. The currently set format determines
38  * the format of the internal #GstSegment and any #GST_EVENT_NEWSEGMENT
39  * events. The default format for #GstBaseSrc is #GST_FORMAT_BYTES.
40  *
41  * #GstBaseSrc always supports push mode scheduling. If the following
42  * conditions are met, it also supports pull mode scheduling:
43  * <itemizedlist>
44  *   <listitem><para>The format is set to #GST_FORMAT_BYTES (default).</para>
45  *   </listitem>
46  *   <listitem><para>#GstBaseSrcClass.is_seekable() returns %TRUE.</para>
47  *   </listitem>
48  * </itemizedlist>
49  *
50  * If all the conditions are met for operating in pull mode, #GstBaseSrc is
51  * automatically seekable in push mode as well. The following conditions must
52  * be met to make the element seekable in push mode when the format is not
53  * #GST_FORMAT_BYTES:
54  * <itemizedlist>
55  *   <listitem><para>
56  *     #GstBaseSrcClass.is_seekable() returns %TRUE.
57  *   </para></listitem>
58  *   <listitem><para>
59  *     #GstBaseSrcClass.query() can convert all supported seek formats to the
60  *     internal format as set with gst_base_src_set_format().
61  *   </para></listitem>
62  *   <listitem><para>
63  *     #GstBaseSrcClass.do_seek() is implemented, performs the seek and returns
64  *      %TRUE.
65  *   </para></listitem>
66  * </itemizedlist>
67  *
68  * When the element does not meet the requirements to operate in pull mode, the
69  * offset and length in the #GstBaseSrcClass.create() method should be ignored.
70  * It is recommended to subclass #GstPushSrc instead, in this situation. If the
71  * element can operate in pull mode but only with specific offsets and
72  * lengths, it is allowed to generate an error when the wrong values are passed
73  * to the #GstBaseSrcClass.create() function.
74  *
75  * #GstBaseSrc has support for live sources. Live sources are sources that when
76  * paused discard data, such as audio or video capture devices. A typical live
77  * source also produces data at a fixed rate and thus provides a clock to publish
78  * this rate.
79  * Use gst_base_src_set_live() to activate the live source mode.
80  *
81  * A live source does not produce data in the PAUSED state. This means that the
82  * #GstBaseSrcClass.create() method will not be called in PAUSED but only in
83  * PLAYING. To signal the pipeline that the element will not produce data, the
84  * return value from the READY to PAUSED state will be
85  * #GST_STATE_CHANGE_NO_PREROLL.
86  *
87  * A typical live source will timestamp the buffers it creates with the
88  * current running time of the pipeline. This is one reason why a live source
89  * can only produce data in the PLAYING state, when the clock is actually
90  * distributed and running.
91  *
92  * Live sources that synchronize and block on the clock (an audio source, for
93  * example) can since 0.10.12 use gst_base_src_wait_playing() when the
94  * #GstBaseSrcClass.create() function was interrupted by a state change to
95  * PAUSED.
96  *
97  * The #GstBaseSrcClass.get_times() method can be used to implement pseudo-live
98  * sources. It only makes sense to implement the #GstBaseSrcClass.get_times()
99  * function if the source is a live source. The #GstBaseSrcClass.get_times()
100  * function should return timestamps starting from 0, as if it were a non-live
101  * source. The base class will make sure that the timestamps are transformed
102  * into the current running_time. The base source will then wait for the
103  * calculated running_time before pushing out the buffer.
104  *
105  * For live sources, the base class will by default report a latency of 0.
106  * For pseudo live sources, the base class will by default measure the difference
107  * between the first buffer timestamp and the start time of get_times and will
108  * report this value as the latency.
109  * Subclasses should override the query function when this behaviour is not
110  * acceptable.
111  *
112  * There is only support in #GstBaseSrc for exactly one source pad, which
113  * should be named "src". A source implementation (subclass of #GstBaseSrc)
114  * should install a pad template in its class_init function, like so:
115  * |[
116  * static void
117  * my_element_class_init (GstMyElementClass *klass)
118  * {
119  *   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
120  *   // srctemplate should be a #GstStaticPadTemplate with direction
121  *   // #GST_PAD_SRC and name "src"
122  *   gst_element_class_add_pad_template (gstelement_class,
123  *       gst_static_pad_template_get (&amp;srctemplate));
124  *   // see #GstElementDetails
125  *   gst_element_class_set_details (gstelement_class, &amp;details);
126  * }
127  * ]|
128  *
129  * <refsect2>
130  * <title>Controlled shutdown of live sources in applications</title>
131  * <para>
132  * Applications that record from a live source may want to stop recording
133  * in a controlled way, so that the recording is stopped, but the data
134  * already in the pipeline is processed to the end (remember that many live
135  * sources would go on recording forever otherwise). For that to happen the
136  * application needs to make the source stop recording and send an EOS
137  * event down the pipeline. The application would then wait for an
138  * EOS message posted on the pipeline's bus to know when all data has
139  * been processed and the pipeline can safely be stopped.
140  *
141  * Since GStreamer 0.10.16 an application may send an EOS event to a source
142  * element to make it perform the EOS logic (send EOS event downstream or post a
143  * #GST_MESSAGE_SEGMENT_DONE on the bus). This can typically be done
144  * with the gst_element_send_event() function on the element or its parent bin.
145  *
146  * After the EOS has been sent to the element, the application should wait for
147  * an EOS message to be posted on the pipeline's bus. Once this EOS message is
148  * received, it may safely shut down the entire pipeline.
149  *
150  * Last reviewed on 2007-12-19 (0.10.16)
151  * </para>
152  * </refsect2>
153  */
154
155 #ifdef HAVE_CONFIG_H
156 #  include "config.h"
157 #endif
158
159 #include <stdlib.h>
160 #include <string.h>
161
162 #include <gst/gst_private.h>
163
164 #include "gstbasesrc.h"
165 #include "gsttypefindhelper.h"
166 #include <gst/gstmarshal.h>
167 #include <gst/gst-i18n-lib.h>
168
169 GST_DEBUG_CATEGORY_STATIC (gst_base_src_debug);
170 #define GST_CAT_DEFAULT gst_base_src_debug
171
172 #define GST_LIVE_GET_LOCK(elem)               (GST_BASE_SRC_CAST(elem)->live_lock)
173 #define GST_LIVE_LOCK(elem)                   g_mutex_lock(GST_LIVE_GET_LOCK(elem))
174 #define GST_LIVE_TRYLOCK(elem)                g_mutex_trylock(GST_LIVE_GET_LOCK(elem))
175 #define GST_LIVE_UNLOCK(elem)                 g_mutex_unlock(GST_LIVE_GET_LOCK(elem))
176 #define GST_LIVE_GET_COND(elem)               (GST_BASE_SRC_CAST(elem)->live_cond)
177 #define GST_LIVE_WAIT(elem)                   g_cond_wait (GST_LIVE_GET_COND (elem), GST_LIVE_GET_LOCK (elem))
178 #define GST_LIVE_TIMED_WAIT(elem, timeval)    g_cond_timed_wait (GST_LIVE_GET_COND (elem), GST_LIVE_GET_LOCK (elem),\
179                                                                                 timeval)
180 #define GST_LIVE_SIGNAL(elem)                 g_cond_signal (GST_LIVE_GET_COND (elem));
181 #define GST_LIVE_BROADCAST(elem)              g_cond_broadcast (GST_LIVE_GET_COND (elem));
182
183 /* BaseSrc signals and args */
184 enum
185 {
186   /* FILL ME */
187   LAST_SIGNAL
188 };
189
190 #define DEFAULT_BLOCKSIZE       4096
191 #define DEFAULT_NUM_BUFFERS     -1
192 #define DEFAULT_TYPEFIND        FALSE
193 #define DEFAULT_DO_TIMESTAMP    FALSE
194
195 enum
196 {
197   PROP_0,
198   PROP_BLOCKSIZE,
199   PROP_NUM_BUFFERS,
200   PROP_TYPEFIND,
201   PROP_DO_TIMESTAMP
202 };
203
204 #define GST_BASE_SRC_GET_PRIVATE(obj)  \
205    (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_BASE_SRC, GstBaseSrcPrivate))
206
207 struct _GstBaseSrcPrivate
208 {
209   gboolean discont;
210   gboolean flushing;
211
212   /* if segment should be sent */
213   gboolean segment_pending;
214
215   /* if EOS is pending (atomic) */
216   gint pending_eos;
217
218   /* startup latency is the time it takes between going to PLAYING and producing
219    * the first BUFFER with running_time 0. This value is included in the latency
220    * reporting. */
221   GstClockTime latency;
222   /* timestamp offset, this is the offset add to the values of gst_times for
223    * pseudo live sources */
224   GstClockTimeDiff ts_offset;
225
226   gboolean do_timestamp;
227   volatile gint dynamic_size;
228
229   /* stream sequence number */
230   guint32 seqnum;
231
232   /* pending events (TAG, CUSTOM_BOTH, CUSTOM_DOWNSTREAM) to be
233    * pushed in the data stream */
234   GList *pending_events;
235   volatile gint have_events;
236
237   /* QoS *//* with LOCK */
238   gboolean qos_enabled;
239   gdouble proportion;
240   GstClockTime earliest_time;
241
242   GstBufferPool *pool;
243   const GstAllocator *allocator;
244   guint prefix;
245   guint alignment;
246 };
247
248 static GstElementClass *parent_class = NULL;
249
250 static void gst_base_src_class_init (GstBaseSrcClass * klass);
251 static void gst_base_src_init (GstBaseSrc * src, gpointer g_class);
252 static void gst_base_src_finalize (GObject * object);
253
254
255 GType
256 gst_base_src_get_type (void)
257 {
258   static volatile gsize base_src_type = 0;
259
260   if (g_once_init_enter (&base_src_type)) {
261     GType _type;
262     static const GTypeInfo base_src_info = {
263       sizeof (GstBaseSrcClass),
264       NULL,
265       NULL,
266       (GClassInitFunc) gst_base_src_class_init,
267       NULL,
268       NULL,
269       sizeof (GstBaseSrc),
270       0,
271       (GInstanceInitFunc) gst_base_src_init,
272     };
273
274     _type = g_type_register_static (GST_TYPE_ELEMENT,
275         "GstBaseSrc", &base_src_info, G_TYPE_FLAG_ABSTRACT);
276     g_once_init_leave (&base_src_type, _type);
277   }
278   return base_src_type;
279 }
280
281 static GstCaps *gst_base_src_getcaps (GstPad * pad, GstCaps * filter);
282 static void gst_base_src_fixate (GstPad * pad, GstCaps * caps);
283
284 static gboolean gst_base_src_is_random_access (GstBaseSrc * src);
285 static gboolean gst_base_src_activate_push (GstPad * pad, gboolean active);
286 static gboolean gst_base_src_activate_pull (GstPad * pad, gboolean active);
287 static void gst_base_src_set_property (GObject * object, guint prop_id,
288     const GValue * value, GParamSpec * pspec);
289 static void gst_base_src_get_property (GObject * object, guint prop_id,
290     GValue * value, GParamSpec * pspec);
291 static gboolean gst_base_src_event_handler (GstPad * pad, GstEvent * event);
292 static gboolean gst_base_src_send_event (GstElement * elem, GstEvent * event);
293 static gboolean gst_base_src_default_event (GstBaseSrc * src, GstEvent * event);
294 static const GstQueryType *gst_base_src_get_query_types (GstElement * element);
295
296 static gboolean gst_base_src_query (GstPad * pad, GstQuery * query);
297
298 static gboolean gst_base_src_activate_pool (GstBaseSrc * basesrc,
299     gboolean active);
300 static gboolean gst_base_src_default_negotiate (GstBaseSrc * basesrc);
301 static gboolean gst_base_src_default_do_seek (GstBaseSrc * src,
302     GstSegment * segment);
303 static gboolean gst_base_src_default_query (GstBaseSrc * src, GstQuery * query);
304 static gboolean gst_base_src_default_prepare_seek_segment (GstBaseSrc * src,
305     GstEvent * event, GstSegment * segment);
306 static GstFlowReturn gst_base_src_default_create (GstBaseSrc * basesrc,
307     guint64 offset, guint size, GstBuffer ** buf);
308
309 static gboolean gst_base_src_set_flushing (GstBaseSrc * basesrc,
310     gboolean flushing, gboolean live_play, gboolean unlock, gboolean * playing);
311 static gboolean gst_base_src_start (GstBaseSrc * basesrc);
312 static gboolean gst_base_src_stop (GstBaseSrc * basesrc);
313
314 static GstStateChangeReturn gst_base_src_change_state (GstElement * element,
315     GstStateChange transition);
316
317 static void gst_base_src_loop (GstPad * pad);
318 static GstFlowReturn gst_base_src_pad_get_range (GstPad * pad, guint64 offset,
319     guint length, GstBuffer ** buf);
320 static GstFlowReturn gst_base_src_get_range (GstBaseSrc * src, guint64 offset,
321     guint length, GstBuffer ** buf);
322 static gboolean gst_base_src_seekable (GstBaseSrc * src);
323 static gboolean gst_base_src_negotiate (GstBaseSrc * basesrc);
324 static gboolean gst_base_src_update_length (GstBaseSrc * src, guint64 offset,
325     guint * length);
326
327 static void
328 gst_base_src_class_init (GstBaseSrcClass * klass)
329 {
330   GObjectClass *gobject_class;
331   GstElementClass *gstelement_class;
332
333   gobject_class = G_OBJECT_CLASS (klass);
334   gstelement_class = GST_ELEMENT_CLASS (klass);
335
336   GST_DEBUG_CATEGORY_INIT (gst_base_src_debug, "basesrc", 0, "basesrc element");
337
338   g_type_class_add_private (klass, sizeof (GstBaseSrcPrivate));
339
340   parent_class = g_type_class_peek_parent (klass);
341
342   gobject_class->finalize = gst_base_src_finalize;
343   gobject_class->set_property = gst_base_src_set_property;
344   gobject_class->get_property = gst_base_src_get_property;
345
346   g_object_class_install_property (gobject_class, PROP_BLOCKSIZE,
347       g_param_spec_uint ("blocksize", "Block size",
348           "Size in bytes to read per buffer (-1 = default)", 0, G_MAXUINT,
349           DEFAULT_BLOCKSIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
350   g_object_class_install_property (gobject_class, PROP_NUM_BUFFERS,
351       g_param_spec_int ("num-buffers", "num-buffers",
352           "Number of buffers to output before sending EOS (-1 = unlimited)",
353           -1, G_MAXINT, DEFAULT_NUM_BUFFERS, G_PARAM_READWRITE |
354           G_PARAM_STATIC_STRINGS));
355   g_object_class_install_property (gobject_class, PROP_TYPEFIND,
356       g_param_spec_boolean ("typefind", "Typefind",
357           "Run typefind before negotiating", DEFAULT_TYPEFIND,
358           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
359   g_object_class_install_property (gobject_class, PROP_DO_TIMESTAMP,
360       g_param_spec_boolean ("do-timestamp", "Do timestamp",
361           "Apply current stream time to buffers", DEFAULT_DO_TIMESTAMP,
362           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
363
364   gstelement_class->change_state =
365       GST_DEBUG_FUNCPTR (gst_base_src_change_state);
366   gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_base_src_send_event);
367   gstelement_class->get_query_types =
368       GST_DEBUG_FUNCPTR (gst_base_src_get_query_types);
369
370   klass->negotiate = GST_DEBUG_FUNCPTR (gst_base_src_default_negotiate);
371   klass->event = GST_DEBUG_FUNCPTR (gst_base_src_default_event);
372   klass->do_seek = GST_DEBUG_FUNCPTR (gst_base_src_default_do_seek);
373   klass->query = GST_DEBUG_FUNCPTR (gst_base_src_default_query);
374   klass->prepare_seek_segment =
375       GST_DEBUG_FUNCPTR (gst_base_src_default_prepare_seek_segment);
376   klass->create = GST_DEBUG_FUNCPTR (gst_base_src_default_create);
377
378   /* Registering debug symbols for function pointers */
379   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_activate_push);
380   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_activate_pull);
381   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_event_handler);
382   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_query);
383   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_pad_get_range);
384   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_getcaps);
385   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_fixate);
386 }
387
388 static void
389 gst_base_src_init (GstBaseSrc * basesrc, gpointer g_class)
390 {
391   GstPad *pad;
392   GstPadTemplate *pad_template;
393
394   basesrc->priv = GST_BASE_SRC_GET_PRIVATE (basesrc);
395
396   basesrc->is_live = FALSE;
397   basesrc->live_lock = g_mutex_new ();
398   basesrc->live_cond = g_cond_new ();
399   basesrc->num_buffers = DEFAULT_NUM_BUFFERS;
400   basesrc->num_buffers_left = -1;
401
402   basesrc->can_activate_push = TRUE;
403
404   pad_template =
405       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (g_class), "src");
406   g_return_if_fail (pad_template != NULL);
407
408   GST_DEBUG_OBJECT (basesrc, "creating src pad");
409   pad = gst_pad_new_from_template (pad_template, "src");
410
411   GST_DEBUG_OBJECT (basesrc, "setting functions on src pad");
412   gst_pad_set_activatepush_function (pad, gst_base_src_activate_push);
413   gst_pad_set_activatepull_function (pad, gst_base_src_activate_pull);
414   gst_pad_set_event_function (pad, gst_base_src_event_handler);
415   gst_pad_set_query_function (pad, gst_base_src_query);
416   gst_pad_set_getrange_function (pad, gst_base_src_pad_get_range);
417   gst_pad_set_getcaps_function (pad, gst_base_src_getcaps);
418   gst_pad_set_fixatecaps_function (pad, gst_base_src_fixate);
419
420   /* hold pointer to pad */
421   basesrc->srcpad = pad;
422   GST_DEBUG_OBJECT (basesrc, "adding src pad");
423   gst_element_add_pad (GST_ELEMENT (basesrc), pad);
424
425   basesrc->blocksize = DEFAULT_BLOCKSIZE;
426   basesrc->clock_id = NULL;
427   /* we operate in BYTES by default */
428   gst_base_src_set_format (basesrc, GST_FORMAT_BYTES);
429   basesrc->typefind = DEFAULT_TYPEFIND;
430   basesrc->priv->do_timestamp = DEFAULT_DO_TIMESTAMP;
431   g_atomic_int_set (&basesrc->priv->have_events, FALSE);
432
433   GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_STARTED);
434   GST_OBJECT_FLAG_SET (basesrc, GST_ELEMENT_IS_SOURCE);
435
436   GST_DEBUG_OBJECT (basesrc, "init done");
437 }
438
439 static void
440 gst_base_src_finalize (GObject * object)
441 {
442   GstBaseSrc *basesrc;
443   GstEvent **event_p;
444
445   basesrc = GST_BASE_SRC (object);
446
447   g_mutex_free (basesrc->live_lock);
448   g_cond_free (basesrc->live_cond);
449
450   event_p = &basesrc->pending_seek;
451   gst_event_replace (event_p, NULL);
452
453   if (basesrc->priv->pending_events) {
454     g_list_foreach (basesrc->priv->pending_events, (GFunc) gst_event_unref,
455         NULL);
456     g_list_free (basesrc->priv->pending_events);
457   }
458
459   G_OBJECT_CLASS (parent_class)->finalize (object);
460 }
461
462 /**
463  * gst_base_src_wait_playing:
464  * @src: the src
465  *
466  * If the #GstBaseSrcClass.create() method performs its own synchronisation
467  * against the clock it must unblock when going from PLAYING to the PAUSED state
468  * and call this method before continuing to produce the remaining data.
469  *
470  * This function will block until a state change to PLAYING happens (in which
471  * case this function returns #GST_FLOW_OK) or the processing must be stopped due
472  * to a state change to READY or a FLUSH event (in which case this function
473  * returns #GST_FLOW_WRONG_STATE).
474  *
475  * Since: 0.10.12
476  *
477  * Returns: #GST_FLOW_OK if @src is PLAYING and processing can
478  * continue. Any other return value should be returned from the create vmethod.
479  */
480 GstFlowReturn
481 gst_base_src_wait_playing (GstBaseSrc * src)
482 {
483   g_return_val_if_fail (GST_IS_BASE_SRC (src), GST_FLOW_ERROR);
484
485   do {
486     /* block until the state changes, or we get a flush, or something */
487     GST_DEBUG_OBJECT (src, "live source waiting for running state");
488     GST_LIVE_WAIT (src);
489     GST_DEBUG_OBJECT (src, "live source unlocked");
490     if (src->priv->flushing)
491       goto flushing;
492   } while (G_UNLIKELY (!src->live_running));
493
494   return GST_FLOW_OK;
495
496   /* ERRORS */
497 flushing:
498   {
499     GST_DEBUG_OBJECT (src, "we are flushing");
500     return GST_FLOW_WRONG_STATE;
501   }
502 }
503
504 /**
505  * gst_base_src_set_live:
506  * @src: base source instance
507  * @live: new live-mode
508  *
509  * If the element listens to a live source, @live should
510  * be set to %TRUE.
511  *
512  * A live source will not produce data in the PAUSED state and
513  * will therefore not be able to participate in the PREROLL phase
514  * of a pipeline. To signal this fact to the application and the
515  * pipeline, the state change return value of the live source will
516  * be GST_STATE_CHANGE_NO_PREROLL.
517  */
518 void
519 gst_base_src_set_live (GstBaseSrc * src, gboolean live)
520 {
521   g_return_if_fail (GST_IS_BASE_SRC (src));
522
523   GST_OBJECT_LOCK (src);
524   src->is_live = live;
525   GST_OBJECT_UNLOCK (src);
526 }
527
528 /**
529  * gst_base_src_is_live:
530  * @src: base source instance
531  *
532  * Check if an element is in live mode.
533  *
534  * Returns: %TRUE if element is in live mode.
535  */
536 gboolean
537 gst_base_src_is_live (GstBaseSrc * src)
538 {
539   gboolean result;
540
541   g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
542
543   GST_OBJECT_LOCK (src);
544   result = src->is_live;
545   GST_OBJECT_UNLOCK (src);
546
547   return result;
548 }
549
550 /**
551  * gst_base_src_set_format:
552  * @src: base source instance
553  * @format: the format to use
554  *
555  * Sets the default format of the source. This will be the format used
556  * for sending NEW_SEGMENT events and for performing seeks.
557  *
558  * If a format of GST_FORMAT_BYTES is set, the element will be able to
559  * operate in pull mode if the #GstBaseSrcClass.is_seekable() returns TRUE.
560  *
561  * This function must only be called in states < %GST_STATE_PAUSED.
562  *
563  * Since: 0.10.1
564  */
565 void
566 gst_base_src_set_format (GstBaseSrc * src, GstFormat format)
567 {
568   g_return_if_fail (GST_IS_BASE_SRC (src));
569   g_return_if_fail (GST_STATE (src) <= GST_STATE_READY);
570
571   GST_OBJECT_LOCK (src);
572   gst_segment_init (&src->segment, format);
573   GST_OBJECT_UNLOCK (src);
574 }
575
576 /**
577  * gst_base_src_set_dynamic_size:
578  * @src: base source instance
579  * @dynamic: new dynamic size mode
580  *
581  * If not @dynamic, size is only updated when needed, such as when trying to
582  * read past current tracked size.  Otherwise, size is checked for upon each
583  * read.
584  *
585  * Since: 0.10.35
586  */
587 void
588 gst_base_src_set_dynamic_size (GstBaseSrc * src, gboolean dynamic)
589 {
590   g_return_if_fail (GST_IS_BASE_SRC (src));
591
592   g_atomic_int_set (&src->priv->dynamic_size, dynamic);
593 }
594
595 /**
596  * gst_base_src_query_latency:
597  * @src: the source
598  * @live: (out) (allow-none): if the source is live
599  * @min_latency: (out) (allow-none): the min latency of the source
600  * @max_latency: (out) (allow-none): the max latency of the source
601  *
602  * Query the source for the latency parameters. @live will be TRUE when @src is
603  * configured as a live source. @min_latency will be set to the difference
604  * between the running time and the timestamp of the first buffer.
605  * @max_latency is always the undefined value of -1.
606  *
607  * This function is mostly used by subclasses.
608  *
609  * Returns: TRUE if the query succeeded.
610  *
611  * Since: 0.10.13
612  */
613 gboolean
614 gst_base_src_query_latency (GstBaseSrc * src, gboolean * live,
615     GstClockTime * min_latency, GstClockTime * max_latency)
616 {
617   GstClockTime min;
618
619   g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
620
621   GST_OBJECT_LOCK (src);
622   if (live)
623     *live = src->is_live;
624
625   /* if we have a startup latency, report this one, else report 0. Subclasses
626    * are supposed to override the query function if they want something
627    * else. */
628   if (src->priv->latency != -1)
629     min = src->priv->latency;
630   else
631     min = 0;
632
633   if (min_latency)
634     *min_latency = min;
635   if (max_latency)
636     *max_latency = -1;
637
638   GST_LOG_OBJECT (src, "latency: live %d, min %" GST_TIME_FORMAT
639       ", max %" GST_TIME_FORMAT, src->is_live, GST_TIME_ARGS (min),
640       GST_TIME_ARGS (-1));
641   GST_OBJECT_UNLOCK (src);
642
643   return TRUE;
644 }
645
646 /**
647  * gst_base_src_set_blocksize:
648  * @src: the source
649  * @blocksize: the new blocksize in bytes
650  *
651  * Set the number of bytes that @src will push out with each buffer. When
652  * @blocksize is set to -1, a default length will be used.
653  *
654  * Since: 0.10.22
655  */
656 void
657 gst_base_src_set_blocksize (GstBaseSrc * src, guint blocksize)
658 {
659   g_return_if_fail (GST_IS_BASE_SRC (src));
660
661   GST_OBJECT_LOCK (src);
662   src->blocksize = blocksize;
663   GST_OBJECT_UNLOCK (src);
664 }
665
666 /**
667  * gst_base_src_get_blocksize:
668  * @src: the source
669  *
670  * Get the number of bytes that @src will push out with each buffer.
671  *
672  * Returns: the number of bytes pushed with each buffer.
673  *
674  * Since: 0.10.22
675  */
676 guint
677 gst_base_src_get_blocksize (GstBaseSrc * src)
678 {
679   gint res;
680
681   g_return_val_if_fail (GST_IS_BASE_SRC (src), 0);
682
683   GST_OBJECT_LOCK (src);
684   res = src->blocksize;
685   GST_OBJECT_UNLOCK (src);
686
687   return res;
688 }
689
690
691 /**
692  * gst_base_src_set_do_timestamp:
693  * @src: the source
694  * @timestamp: enable or disable timestamping
695  *
696  * Configure @src to automatically timestamp outgoing buffers based on the
697  * current running_time of the pipeline. This property is mostly useful for live
698  * sources.
699  *
700  * Since: 0.10.15
701  */
702 void
703 gst_base_src_set_do_timestamp (GstBaseSrc * src, gboolean timestamp)
704 {
705   g_return_if_fail (GST_IS_BASE_SRC (src));
706
707   GST_OBJECT_LOCK (src);
708   src->priv->do_timestamp = timestamp;
709   GST_OBJECT_UNLOCK (src);
710 }
711
712 /**
713  * gst_base_src_get_do_timestamp:
714  * @src: the source
715  *
716  * Query if @src timestamps outgoing buffers based on the current running_time.
717  *
718  * Returns: %TRUE if the base class will automatically timestamp outgoing buffers.
719  *
720  * Since: 0.10.15
721  */
722 gboolean
723 gst_base_src_get_do_timestamp (GstBaseSrc * src)
724 {
725   gboolean res;
726
727   g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
728
729   GST_OBJECT_LOCK (src);
730   res = src->priv->do_timestamp;
731   GST_OBJECT_UNLOCK (src);
732
733   return res;
734 }
735
736 /**
737  * gst_base_src_new_seamless_segment:
738  * @src: The source
739  * @start: The new start value for the segment
740  * @stop: Stop value for the new segment
741  * @position: The position value for the new segent
742  *
743  * Prepare a new seamless segment for emission downstream. This function must
744  * only be called by derived sub-classes, and only from the create() function,
745  * as the stream-lock needs to be held.
746  *
747  * The format for the new segment will be the current format of the source, as
748  * configured with gst_base_src_set_format()
749  *
750  * Returns: %TRUE if preparation of the seamless segment succeeded.
751  *
752  * Since: 0.10.26
753  */
754 gboolean
755 gst_base_src_new_seamless_segment (GstBaseSrc * src, gint64 start, gint64 stop,
756     gint64 position)
757 {
758   gboolean res = TRUE;
759
760   GST_DEBUG_OBJECT (src,
761       "Starting new seamless segment. Start %" GST_TIME_FORMAT " stop %"
762       GST_TIME_FORMAT " position %" GST_TIME_FORMAT, GST_TIME_ARGS (start),
763       GST_TIME_ARGS (stop), GST_TIME_ARGS (position));
764
765   GST_OBJECT_LOCK (src);
766
767   src->segment.base = gst_segment_to_running_time (&src->segment,
768       src->segment.format, src->segment.position);
769   src->segment.start = start;
770   src->segment.stop = stop;
771   src->segment.position = position;
772
773   /* forward, we send data from position to stop */
774   src->priv->segment_pending = TRUE;
775   GST_OBJECT_UNLOCK (src);
776
777   src->priv->discont = TRUE;
778   src->running = TRUE;
779
780   return res;
781 }
782
783 static gboolean
784 gst_base_src_setcaps (GstBaseSrc * bsrc, GstCaps * caps)
785 {
786   GstBaseSrcClass *bclass;
787   gboolean res = TRUE;
788
789   bclass = GST_BASE_SRC_GET_CLASS (bsrc);
790
791   gst_pad_push_event (bsrc->srcpad, gst_event_new_caps (caps));
792
793   if (bclass->set_caps)
794     res = bclass->set_caps (bsrc, caps);
795
796   return res;
797 }
798
799 static GstCaps *
800 gst_base_src_getcaps (GstPad * pad, GstCaps * filter)
801 {
802   GstBaseSrcClass *bclass;
803   GstBaseSrc *bsrc;
804   GstCaps *caps = NULL;
805
806   bsrc = GST_BASE_SRC (GST_PAD_PARENT (pad));
807   bclass = GST_BASE_SRC_GET_CLASS (bsrc);
808   if (bclass->get_caps)
809     caps = bclass->get_caps (bsrc, filter);
810
811   if (caps == NULL) {
812     GstPadTemplate *pad_template;
813
814     pad_template =
815         gst_element_class_get_pad_template (GST_ELEMENT_CLASS (bclass), "src");
816     if (pad_template != NULL) {
817       caps = gst_pad_template_get_caps (pad_template);
818
819       if (filter) {
820         GstCaps *intersection;
821
822         intersection =
823             gst_caps_intersect_full (filter, caps, GST_CAPS_INTERSECT_FIRST);
824         gst_caps_unref (caps);
825         caps = intersection;
826       }
827     }
828   }
829   return caps;
830 }
831
832 static void
833 gst_base_src_fixate (GstPad * pad, GstCaps * caps)
834 {
835   GstBaseSrcClass *bclass;
836   GstBaseSrc *bsrc;
837
838   bsrc = GST_BASE_SRC (gst_pad_get_parent (pad));
839   bclass = GST_BASE_SRC_GET_CLASS (bsrc);
840
841   if (bclass->fixate)
842     bclass->fixate (bsrc, caps);
843
844   gst_object_unref (bsrc);
845 }
846
847 static gboolean
848 gst_base_src_default_query (GstBaseSrc * src, GstQuery * query)
849 {
850   gboolean res;
851
852   switch (GST_QUERY_TYPE (query)) {
853     case GST_QUERY_POSITION:
854     {
855       GstFormat format;
856
857       gst_query_parse_position (query, &format, NULL);
858
859       GST_DEBUG_OBJECT (src, "position query in format %s",
860           gst_format_get_name (format));
861
862       switch (format) {
863         case GST_FORMAT_PERCENT:
864         {
865           gint64 percent;
866           gint64 position;
867           gint64 duration;
868
869           GST_OBJECT_LOCK (src);
870           position = src->segment.position;
871           duration = src->segment.duration;
872           GST_OBJECT_UNLOCK (src);
873
874           if (position != -1 && duration != -1) {
875             if (position < duration)
876               percent = gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX, position,
877                   duration);
878             else
879               percent = GST_FORMAT_PERCENT_MAX;
880           } else
881             percent = -1;
882
883           gst_query_set_position (query, GST_FORMAT_PERCENT, percent);
884           res = TRUE;
885           break;
886         }
887         default:
888         {
889           gint64 position;
890           GstFormat seg_format;
891
892           GST_OBJECT_LOCK (src);
893           position =
894               gst_segment_to_stream_time (&src->segment, src->segment.format,
895               src->segment.position);
896           seg_format = src->segment.format;
897           GST_OBJECT_UNLOCK (src);
898
899           if (position != -1) {
900             /* convert to requested format */
901             res =
902                 gst_pad_query_convert (src->srcpad, seg_format,
903                 position, &format, &position);
904           } else
905             res = TRUE;
906
907           gst_query_set_position (query, format, position);
908           break;
909         }
910       }
911       break;
912     }
913     case GST_QUERY_DURATION:
914     {
915       GstFormat format;
916
917       gst_query_parse_duration (query, &format, NULL);
918
919       GST_DEBUG_OBJECT (src, "duration query in format %s",
920           gst_format_get_name (format));
921
922       switch (format) {
923         case GST_FORMAT_PERCENT:
924           gst_query_set_duration (query, GST_FORMAT_PERCENT,
925               GST_FORMAT_PERCENT_MAX);
926           res = TRUE;
927           break;
928         default:
929         {
930           gint64 duration;
931           GstFormat seg_format;
932           guint length = 0;
933
934           /* may have to refresh duration */
935           if (g_atomic_int_get (&src->priv->dynamic_size))
936             gst_base_src_update_length (src, 0, &length);
937
938           /* this is the duration as configured by the subclass. */
939           GST_OBJECT_LOCK (src);
940           duration = src->segment.duration;
941           seg_format = src->segment.format;
942           GST_OBJECT_UNLOCK (src);
943
944           GST_LOG_OBJECT (src, "duration %" G_GINT64_FORMAT ", format %s",
945               duration, gst_format_get_name (seg_format));
946
947           if (duration != -1) {
948             /* convert to requested format, if this fails, we have a duration
949              * but we cannot answer the query, we must return FALSE. */
950             res =
951                 gst_pad_query_convert (src->srcpad, seg_format,
952                 duration, &format, &duration);
953           } else {
954             /* The subclass did not configure a duration, we assume that the
955              * media has an unknown duration then and we return TRUE to report
956              * this. Note that this is not the same as returning FALSE, which
957              * means that we cannot report the duration at all. */
958             res = TRUE;
959           }
960           gst_query_set_duration (query, format, duration);
961           break;
962         }
963       }
964       break;
965     }
966
967     case GST_QUERY_SEEKING:
968     {
969       GstFormat format, seg_format;
970       gint64 duration;
971
972       GST_OBJECT_LOCK (src);
973       duration = src->segment.duration;
974       seg_format = src->segment.format;
975       GST_OBJECT_UNLOCK (src);
976
977       gst_query_parse_seeking (query, &format, NULL, NULL, NULL);
978       if (format == seg_format) {
979         gst_query_set_seeking (query, seg_format,
980             gst_base_src_seekable (src), 0, duration);
981         res = TRUE;
982       } else {
983         /* FIXME 0.11: return TRUE + seekable=FALSE for SEEKING query here */
984         /* Don't reply to the query to make up for demuxers which don't
985          * handle the SEEKING query yet. Players like Totem will fall back
986          * to the duration when the SEEKING query isn't answered. */
987         res = FALSE;
988       }
989       break;
990     }
991     case GST_QUERY_SEGMENT:
992     {
993       gint64 start, stop;
994
995       GST_OBJECT_LOCK (src);
996       /* no end segment configured, current duration then */
997       if ((stop = src->segment.stop) == -1)
998         stop = src->segment.duration;
999       start = src->segment.start;
1000
1001       /* adjust to stream time */
1002       if (src->segment.time != -1) {
1003         start -= src->segment.time;
1004         if (stop != -1)
1005           stop -= src->segment.time;
1006       }
1007
1008       gst_query_set_segment (query, src->segment.rate, src->segment.format,
1009           start, stop);
1010       GST_OBJECT_UNLOCK (src);
1011       res = TRUE;
1012       break;
1013     }
1014
1015     case GST_QUERY_FORMATS:
1016     {
1017       gst_query_set_formats (query, 3, GST_FORMAT_DEFAULT,
1018           GST_FORMAT_BYTES, GST_FORMAT_PERCENT);
1019       res = TRUE;
1020       break;
1021     }
1022     case GST_QUERY_CONVERT:
1023     {
1024       GstFormat src_fmt, dest_fmt;
1025       gint64 src_val, dest_val;
1026
1027       gst_query_parse_convert (query, &src_fmt, &src_val, &dest_fmt, &dest_val);
1028
1029       /* we can only convert between equal formats... */
1030       if (src_fmt == dest_fmt) {
1031         dest_val = src_val;
1032         res = TRUE;
1033       } else
1034         res = FALSE;
1035
1036       gst_query_set_convert (query, src_fmt, src_val, dest_fmt, dest_val);
1037       break;
1038     }
1039     case GST_QUERY_LATENCY:
1040     {
1041       GstClockTime min, max;
1042       gboolean live;
1043
1044       /* Subclasses should override and implement something usefull */
1045       res = gst_base_src_query_latency (src, &live, &min, &max);
1046
1047       GST_LOG_OBJECT (src, "report latency: live %d, min %" GST_TIME_FORMAT
1048           ", max %" GST_TIME_FORMAT, live, GST_TIME_ARGS (min),
1049           GST_TIME_ARGS (max));
1050
1051       gst_query_set_latency (query, live, min, max);
1052       break;
1053     }
1054     case GST_QUERY_JITTER:
1055     case GST_QUERY_RATE:
1056       res = FALSE;
1057       break;
1058     case GST_QUERY_BUFFERING:
1059     {
1060       GstFormat format, seg_format;
1061       gint64 start, stop, estimated;
1062
1063       gst_query_parse_buffering_range (query, &format, NULL, NULL, NULL);
1064
1065       GST_DEBUG_OBJECT (src, "buffering query in format %s",
1066           gst_format_get_name (format));
1067
1068       GST_OBJECT_LOCK (src);
1069       if (src->random_access) {
1070         estimated = 0;
1071         start = 0;
1072         if (format == GST_FORMAT_PERCENT)
1073           stop = GST_FORMAT_PERCENT_MAX;
1074         else
1075           stop = src->segment.duration;
1076       } else {
1077         estimated = -1;
1078         start = -1;
1079         stop = -1;
1080       }
1081       seg_format = src->segment.format;
1082       GST_OBJECT_UNLOCK (src);
1083
1084       /* convert to required format. When the conversion fails, we can't answer
1085        * the query. When the value is unknown, we can don't perform conversion
1086        * but report TRUE. */
1087       if (format != GST_FORMAT_PERCENT && stop != -1) {
1088         res = gst_pad_query_convert (src->srcpad, seg_format,
1089             stop, &format, &stop);
1090       } else {
1091         res = TRUE;
1092       }
1093       if (res && format != GST_FORMAT_PERCENT && start != -1)
1094         res = gst_pad_query_convert (src->srcpad, seg_format,
1095             start, &format, &start);
1096
1097       gst_query_set_buffering_range (query, format, start, stop, estimated);
1098       break;
1099     }
1100     case GST_QUERY_SCHEDULING:
1101     {
1102       gboolean random_access;
1103
1104       random_access = gst_base_src_is_random_access (src);
1105
1106       /* we can operate in getrange mode if the native format is bytes
1107        * and we are seekable, this condition is set in the random_access
1108        * flag and is set in the _start() method. */
1109       gst_query_set_scheduling (query, random_access, TRUE, FALSE, 1, -1, 1);
1110
1111       res = TRUE;
1112       break;
1113     }
1114     default:
1115       res = FALSE;
1116       break;
1117   }
1118   GST_DEBUG_OBJECT (src, "query %s returns %d", GST_QUERY_TYPE_NAME (query),
1119       res);
1120   return res;
1121 }
1122
1123 static gboolean
1124 gst_base_src_query (GstPad * pad, GstQuery * query)
1125 {
1126   GstBaseSrc *src;
1127   GstBaseSrcClass *bclass;
1128   gboolean result = FALSE;
1129
1130   src = GST_BASE_SRC (gst_pad_get_parent (pad));
1131   if (G_UNLIKELY (src == NULL))
1132     return FALSE;
1133
1134   bclass = GST_BASE_SRC_GET_CLASS (src);
1135
1136   if (bclass->query)
1137     result = bclass->query (src, query);
1138   else
1139     result = gst_pad_query_default (pad, query);
1140
1141   gst_object_unref (src);
1142
1143   return result;
1144 }
1145
1146 static gboolean
1147 gst_base_src_default_do_seek (GstBaseSrc * src, GstSegment * segment)
1148 {
1149   gboolean res = TRUE;
1150
1151   /* update our offset if the start/stop position was updated */
1152   if (segment->format == GST_FORMAT_BYTES) {
1153     segment->time = segment->start;
1154   } else if (segment->start == 0) {
1155     /* seek to start, we can implement a default for this. */
1156     segment->time = 0;
1157   } else {
1158     res = FALSE;
1159     GST_INFO_OBJECT (src, "Can't do a default seek");
1160   }
1161
1162   return res;
1163 }
1164
1165 static gboolean
1166 gst_base_src_do_seek (GstBaseSrc * src, GstSegment * segment)
1167 {
1168   GstBaseSrcClass *bclass;
1169   gboolean result = FALSE;
1170
1171   bclass = GST_BASE_SRC_GET_CLASS (src);
1172
1173   if (bclass->do_seek)
1174     result = bclass->do_seek (src, segment);
1175
1176   return result;
1177 }
1178
1179 #define SEEK_TYPE_IS_RELATIVE(t) (((t) != GST_SEEK_TYPE_NONE) && ((t) != GST_SEEK_TYPE_SET))
1180
1181 static gboolean
1182 gst_base_src_default_prepare_seek_segment (GstBaseSrc * src, GstEvent * event,
1183     GstSegment * segment)
1184 {
1185   /* By default, we try one of 2 things:
1186    *   - For absolute seek positions, convert the requested position to our
1187    *     configured processing format and place it in the output segment \
1188    *   - For relative seek positions, convert our current (input) values to the
1189    *     seek format, adjust by the relative seek offset and then convert back to
1190    *     the processing format
1191    */
1192   GstSeekType cur_type, stop_type;
1193   gint64 cur, stop;
1194   GstSeekFlags flags;
1195   GstFormat seek_format, dest_format;
1196   gdouble rate;
1197   gboolean update;
1198   gboolean res = TRUE;
1199
1200   gst_event_parse_seek (event, &rate, &seek_format, &flags,
1201       &cur_type, &cur, &stop_type, &stop);
1202   dest_format = segment->format;
1203
1204   if (seek_format == dest_format) {
1205     gst_segment_do_seek (segment, rate, seek_format, flags,
1206         cur_type, cur, stop_type, stop, &update);
1207     return TRUE;
1208   }
1209
1210   if (cur_type != GST_SEEK_TYPE_NONE) {
1211     /* FIXME: Handle seek_cur & seek_end by converting the input segment vals */
1212     res =
1213         gst_pad_query_convert (src->srcpad, seek_format, cur, &dest_format,
1214         &cur);
1215     cur_type = GST_SEEK_TYPE_SET;
1216   }
1217
1218   if (res && stop_type != GST_SEEK_TYPE_NONE) {
1219     /* FIXME: Handle seek_cur & seek_end by converting the input segment vals */
1220     res =
1221         gst_pad_query_convert (src->srcpad, seek_format, stop, &dest_format,
1222         &stop);
1223     stop_type = GST_SEEK_TYPE_SET;
1224   }
1225
1226   /* And finally, configure our output segment in the desired format */
1227   gst_segment_do_seek (segment, rate, dest_format, flags, cur_type, cur,
1228       stop_type, stop, &update);
1229
1230   if (!res)
1231     goto no_format;
1232
1233   return res;
1234
1235 no_format:
1236   {
1237     GST_DEBUG_OBJECT (src, "undefined format given, seek aborted.");
1238     return FALSE;
1239   }
1240 }
1241
1242 static gboolean
1243 gst_base_src_prepare_seek_segment (GstBaseSrc * src, GstEvent * event,
1244     GstSegment * seeksegment)
1245 {
1246   GstBaseSrcClass *bclass;
1247   gboolean result = FALSE;
1248
1249   bclass = GST_BASE_SRC_GET_CLASS (src);
1250
1251   if (bclass->prepare_seek_segment)
1252     result = bclass->prepare_seek_segment (src, event, seeksegment);
1253
1254   return result;
1255 }
1256
1257 static GstFlowReturn
1258 gst_base_src_alloc_buffer (GstBaseSrc * src, guint64 offset,
1259     guint size, GstBuffer ** buffer)
1260 {
1261   GstFlowReturn ret;
1262   GstBaseSrcPrivate *priv = src->priv;
1263
1264   if (priv->pool) {
1265     ret = gst_buffer_pool_acquire_buffer (priv->pool, buffer, NULL);
1266   } else {
1267     *buffer = gst_buffer_new_allocate (priv->allocator, size, priv->alignment);
1268     if (G_UNLIKELY (*buffer == NULL))
1269       goto alloc_failed;
1270
1271     ret = GST_FLOW_OK;
1272   }
1273   return ret;
1274
1275   /* ERRORS */
1276 alloc_failed:
1277   {
1278     GST_ERROR_OBJECT (src, "Failed to allocate %u bytes", size);
1279     return GST_FLOW_ERROR;
1280   }
1281 }
1282
1283 static GstFlowReturn
1284 gst_base_src_default_create (GstBaseSrc * src, guint64 offset,
1285     guint size, GstBuffer ** buffer)
1286 {
1287   GstBaseSrcClass *bclass;
1288   GstFlowReturn ret;
1289
1290   bclass = GST_BASE_SRC_GET_CLASS (src);
1291
1292   if (G_UNLIKELY (!bclass->fill))
1293     goto no_function;
1294
1295   ret = gst_base_src_alloc_buffer (src, offset, size, buffer);
1296   if (G_UNLIKELY (ret != GST_FLOW_OK))
1297     goto alloc_failed;
1298
1299   if (G_LIKELY (size > 0)) {
1300     /* only call fill when there is a size */
1301     ret = bclass->fill (src, offset, size, *buffer);
1302     if (G_UNLIKELY (ret != GST_FLOW_OK))
1303       goto not_ok;
1304   }
1305
1306   return GST_FLOW_OK;
1307
1308   /* ERRORS */
1309 no_function:
1310   {
1311     GST_DEBUG_OBJECT (src, "no fill function");
1312     return GST_FLOW_NOT_SUPPORTED;
1313   }
1314 alloc_failed:
1315   {
1316     GST_ERROR_OBJECT (src, "Failed to allocate %u bytes", size);
1317     return ret;
1318   }
1319 not_ok:
1320   {
1321     GST_DEBUG_OBJECT (src, "fill returned %d (%s)", ret,
1322         gst_flow_get_name (ret));
1323     gst_buffer_unref (*buffer);
1324     return ret;
1325   }
1326 }
1327
1328 /* this code implements the seeking. It is a good example
1329  * handling all cases.
1330  *
1331  * A seek updates the currently configured segment.start
1332  * and segment.stop values based on the SEEK_TYPE. If the
1333  * segment.start value is updated, a seek to this new position
1334  * should be performed.
1335  *
1336  * The seek can only be executed when we are not currently
1337  * streaming any data, to make sure that this is the case, we
1338  * acquire the STREAM_LOCK which is taken when we are in the
1339  * _loop() function or when a getrange() is called. Normally
1340  * we will not receive a seek if we are operating in pull mode
1341  * though. When we operate as a live source we might block on the live
1342  * cond, which does not release the STREAM_LOCK. Therefore we will try
1343  * to grab the LIVE_LOCK instead of the STREAM_LOCK to make sure it is
1344  * safe to perform the seek.
1345  *
1346  * When we are in the loop() function, we might be in the middle
1347  * of pushing a buffer, which might block in a sink. To make sure
1348  * that the push gets unblocked we push out a FLUSH_START event.
1349  * Our loop function will get a WRONG_STATE return value from
1350  * the push and will pause, effectively releasing the STREAM_LOCK.
1351  *
1352  * For a non-flushing seek, we pause the task, which might eventually
1353  * release the STREAM_LOCK. We say eventually because when the sink
1354  * blocks on the sample we might wait a very long time until the sink
1355  * unblocks the sample. In any case we acquire the STREAM_LOCK and
1356  * can continue the seek. A non-flushing seek is normally done in a
1357  * running pipeline to perform seamless playback, this means that the sink is
1358  * PLAYING and will return from its chain function.
1359  * In the case of a non-flushing seek we need to make sure that the
1360  * data we output after the seek is continuous with the previous data,
1361  * this is because a non-flushing seek does not reset the running-time
1362  * to 0. We do this by closing the currently running segment, ie. sending
1363  * a new_segment event with the stop position set to the last processed
1364  * position.
1365  *
1366  * After updating the segment.start/stop values, we prepare for
1367  * streaming again. We push out a FLUSH_STOP to make the peer pad
1368  * accept data again and we start our task again.
1369  *
1370  * A segment seek posts a message on the bus saying that the playback
1371  * of the segment started. We store the segment flag internally because
1372  * when we reach the segment.stop we have to post a segment.done
1373  * instead of EOS when doing a segment seek.
1374  */
1375 /* FIXME (0.11), we have the unlock gboolean here because most current
1376  * implementations (fdsrc, -base/gst/tcp/, ...) unconditionally unlock, even when
1377  * the streaming thread isn't running, resulting in bogus unlocks later when it
1378  * starts. This is fixed by adding unlock_stop, but we should still avoid unlocking
1379  * unnecessarily for backwards compatibility. Ergo, the unlock variable stays
1380  * until 0.11
1381  */
1382 static gboolean
1383 gst_base_src_perform_seek (GstBaseSrc * src, GstEvent * event, gboolean unlock)
1384 {
1385   gboolean res = TRUE, tres;
1386   gdouble rate;
1387   GstFormat seek_format, dest_format;
1388   GstSeekFlags flags;
1389   GstSeekType cur_type, stop_type;
1390   gint64 cur, stop;
1391   gboolean flush, playing;
1392   gboolean update;
1393   gboolean relative_seek = FALSE;
1394   gboolean seekseg_configured = FALSE;
1395   GstSegment seeksegment;
1396   guint32 seqnum;
1397   GstEvent *tevent;
1398
1399   GST_DEBUG_OBJECT (src, "doing seek: %" GST_PTR_FORMAT, event);
1400
1401   GST_OBJECT_LOCK (src);
1402   dest_format = src->segment.format;
1403   GST_OBJECT_UNLOCK (src);
1404
1405   if (event) {
1406     gst_event_parse_seek (event, &rate, &seek_format, &flags,
1407         &cur_type, &cur, &stop_type, &stop);
1408
1409     relative_seek = SEEK_TYPE_IS_RELATIVE (cur_type) ||
1410         SEEK_TYPE_IS_RELATIVE (stop_type);
1411
1412     if (dest_format != seek_format && !relative_seek) {
1413       /* If we have an ABSOLUTE position (SEEK_SET only), we can convert it
1414        * here before taking the stream lock, otherwise we must convert it later,
1415        * once we have the stream lock and can read the last configures segment
1416        * start and stop positions */
1417       gst_segment_init (&seeksegment, dest_format);
1418
1419       if (!gst_base_src_prepare_seek_segment (src, event, &seeksegment))
1420         goto prepare_failed;
1421
1422       seekseg_configured = TRUE;
1423     }
1424
1425     flush = flags & GST_SEEK_FLAG_FLUSH;
1426     seqnum = gst_event_get_seqnum (event);
1427   } else {
1428     flush = FALSE;
1429     /* get next seqnum */
1430     seqnum = gst_util_seqnum_next ();
1431   }
1432
1433   /* send flush start */
1434   if (flush) {
1435     tevent = gst_event_new_flush_start ();
1436     gst_event_set_seqnum (tevent, seqnum);
1437     gst_pad_push_event (src->srcpad, tevent);
1438   } else
1439     gst_pad_pause_task (src->srcpad);
1440
1441   /* unblock streaming thread. */
1442   gst_base_src_set_flushing (src, TRUE, FALSE, unlock, &playing);
1443
1444   /* grab streaming lock, this should eventually be possible, either
1445    * because the task is paused, our streaming thread stopped
1446    * or because our peer is flushing. */
1447   GST_PAD_STREAM_LOCK (src->srcpad);
1448   if (G_UNLIKELY (src->priv->seqnum == seqnum)) {
1449     /* we have seen this event before, issue a warning for now */
1450     GST_WARNING_OBJECT (src, "duplicate event found %" G_GUINT32_FORMAT,
1451         seqnum);
1452   } else {
1453     src->priv->seqnum = seqnum;
1454     GST_DEBUG_OBJECT (src, "seek with seqnum %" G_GUINT32_FORMAT, seqnum);
1455   }
1456
1457   gst_base_src_set_flushing (src, FALSE, playing, unlock, NULL);
1458
1459   /* If we configured the seeksegment above, don't overwrite it now. Otherwise
1460    * copy the current segment info into the temp segment that we can actually
1461    * attempt the seek with. We only update the real segment if the seek suceeds. */
1462   if (!seekseg_configured) {
1463     memcpy (&seeksegment, &src->segment, sizeof (GstSegment));
1464
1465     /* now configure the final seek segment */
1466     if (event) {
1467       if (seeksegment.format != seek_format) {
1468         /* OK, here's where we give the subclass a chance to convert the relative
1469          * seek into an absolute one in the processing format. We set up any
1470          * absolute seek above, before taking the stream lock. */
1471         if (!gst_base_src_prepare_seek_segment (src, event, &seeksegment)) {
1472           GST_DEBUG_OBJECT (src, "Preparing the seek failed after flushing. "
1473               "Aborting seek");
1474           res = FALSE;
1475         }
1476       } else {
1477         /* The seek format matches our processing format, no need to ask the
1478          * the subclass to configure the segment. */
1479         gst_segment_do_seek (&seeksegment, rate, seek_format, flags,
1480             cur_type, cur, stop_type, stop, &update);
1481       }
1482     }
1483     /* Else, no seek event passed, so we're just (re)starting the
1484        current segment. */
1485   }
1486
1487   if (res) {
1488     GST_DEBUG_OBJECT (src, "segment configured from %" G_GINT64_FORMAT
1489         " to %" G_GINT64_FORMAT ", position %" G_GINT64_FORMAT,
1490         seeksegment.start, seeksegment.stop, seeksegment.position);
1491
1492     /* do the seek, segment.position contains the new position. */
1493     res = gst_base_src_do_seek (src, &seeksegment);
1494   }
1495
1496   /* and prepare to continue streaming */
1497   if (flush) {
1498     tevent = gst_event_new_flush_stop (TRUE);
1499     gst_event_set_seqnum (tevent, seqnum);
1500     /* send flush stop, peer will accept data and events again. We
1501      * are not yet providing data as we still have the STREAM_LOCK. */
1502     gst_pad_push_event (src->srcpad, tevent);
1503   }
1504
1505   /* The subclass must have converted the segment to the processing format
1506    * by now */
1507   if (res && seeksegment.format != dest_format) {
1508     GST_DEBUG_OBJECT (src, "Subclass failed to prepare a seek segment "
1509         "in the correct format. Aborting seek.");
1510     res = FALSE;
1511   }
1512
1513   /* if the seek was successful, we update our real segment and push
1514    * out the new segment. */
1515   if (res) {
1516     GST_OBJECT_LOCK (src);
1517     memcpy (&src->segment, &seeksegment, sizeof (GstSegment));
1518     GST_OBJECT_UNLOCK (src);
1519
1520     if (seeksegment.flags & GST_SEEK_FLAG_SEGMENT) {
1521       GstMessage *message;
1522
1523       message = gst_message_new_segment_start (GST_OBJECT (src),
1524           seeksegment.format, seeksegment.position);
1525       gst_message_set_seqnum (message, seqnum);
1526
1527       gst_element_post_message (GST_ELEMENT (src), message);
1528     }
1529
1530     /* for deriving a stop position for the playback segment from the seek
1531      * segment, we must take the duration when the stop is not set */
1532     if ((stop = seeksegment.stop) == -1)
1533       stop = seeksegment.duration;
1534
1535     src->priv->segment_pending = TRUE;
1536   }
1537
1538   src->priv->discont = TRUE;
1539   src->running = TRUE;
1540   /* and restart the task in case it got paused explicitly or by
1541    * the FLUSH_START event we pushed out. */
1542   tres = gst_pad_start_task (src->srcpad, (GstTaskFunction) gst_base_src_loop,
1543       src->srcpad);
1544   if (res && !tres)
1545     res = FALSE;
1546
1547   /* and release the lock again so we can continue streaming */
1548   GST_PAD_STREAM_UNLOCK (src->srcpad);
1549
1550   return res;
1551
1552   /* ERROR */
1553 prepare_failed:
1554   GST_DEBUG_OBJECT (src, "Preparing the seek failed before flushing. "
1555       "Aborting seek");
1556   return FALSE;
1557 }
1558
1559 static const GstQueryType *
1560 gst_base_src_get_query_types (GstElement * element)
1561 {
1562   static const GstQueryType query_types[] = {
1563     GST_QUERY_DURATION,
1564     GST_QUERY_POSITION,
1565     GST_QUERY_SEEKING,
1566     GST_QUERY_SEGMENT,
1567     GST_QUERY_FORMATS,
1568     GST_QUERY_LATENCY,
1569     GST_QUERY_JITTER,
1570     GST_QUERY_RATE,
1571     GST_QUERY_CONVERT,
1572     0
1573   };
1574
1575   return query_types;
1576 }
1577
1578 /* all events send to this element directly. This is mainly done from the
1579  * application.
1580  */
1581 static gboolean
1582 gst_base_src_send_event (GstElement * element, GstEvent * event)
1583 {
1584   GstBaseSrc *src;
1585   gboolean result = FALSE;
1586
1587   src = GST_BASE_SRC (element);
1588
1589   GST_DEBUG_OBJECT (src, "handling event %p %" GST_PTR_FORMAT, event, event);
1590
1591   switch (GST_EVENT_TYPE (event)) {
1592       /* bidirectional events */
1593     case GST_EVENT_FLUSH_START:
1594     case GST_EVENT_FLUSH_STOP:
1595       /* sending random flushes downstream can break stuff,
1596        * especially sync since all segment info will get flushed */
1597       break;
1598
1599       /* downstream serialized events */
1600     case GST_EVENT_EOS:
1601     {
1602       GstBaseSrcClass *bclass;
1603
1604       bclass = GST_BASE_SRC_GET_CLASS (src);
1605
1606       /* queue EOS and make sure the task or pull function performs the EOS
1607        * actions.
1608        *
1609        * We have two possibilities:
1610        *
1611        *  - Before we are to enter the _create function, we check the pending_eos
1612        *    first and do EOS instead of entering it.
1613        *  - If we are in the _create function or we did not manage to set the
1614        *    flag fast enough and we are about to enter the _create function,
1615        *    we unlock it so that we exit with WRONG_STATE immediatly. We then
1616        *    check the EOS flag and do the EOS logic.
1617        */
1618       g_atomic_int_set (&src->priv->pending_eos, TRUE);
1619       GST_DEBUG_OBJECT (src, "EOS marked, calling unlock");
1620
1621
1622       /* unlock the _create function so that we can check the pending_eos flag
1623        * and we can do EOS. This will eventually release the LIVE_LOCK again so
1624        * that we can grab it and stop the unlock again. We don't take the stream
1625        * lock so that this operation is guaranteed to never block. */
1626       gst_base_src_activate_pool (src, FALSE);
1627       if (bclass->unlock)
1628         bclass->unlock (src);
1629
1630       GST_DEBUG_OBJECT (src, "unlock called, waiting for LIVE_LOCK");
1631
1632       GST_LIVE_LOCK (src);
1633       GST_DEBUG_OBJECT (src, "LIVE_LOCK acquired, calling unlock_stop");
1634       /* now stop the unlock of the streaming thread again. Grabbing the live
1635        * lock is enough because that protects the create function. */
1636       if (bclass->unlock_stop)
1637         bclass->unlock_stop (src);
1638       gst_base_src_activate_pool (src, TRUE);
1639       GST_LIVE_UNLOCK (src);
1640
1641       result = TRUE;
1642       break;
1643     }
1644     case GST_EVENT_SEGMENT:
1645       /* sending random SEGMENT downstream can break sync. */
1646       break;
1647     case GST_EVENT_TAG:
1648     case GST_EVENT_CUSTOM_DOWNSTREAM:
1649     case GST_EVENT_CUSTOM_BOTH:
1650       /* Insert TAG, CUSTOM_DOWNSTREAM, CUSTOM_BOTH in the dataflow */
1651       GST_OBJECT_LOCK (src);
1652       src->priv->pending_events =
1653           g_list_append (src->priv->pending_events, event);
1654       g_atomic_int_set (&src->priv->have_events, TRUE);
1655       GST_OBJECT_UNLOCK (src);
1656       event = NULL;
1657       result = TRUE;
1658       break;
1659     case GST_EVENT_BUFFERSIZE:
1660       /* does not seem to make much sense currently */
1661       break;
1662
1663       /* upstream events */
1664     case GST_EVENT_QOS:
1665       /* elements should override send_event and do something */
1666       break;
1667     case GST_EVENT_SEEK:
1668     {
1669       gboolean started;
1670
1671       GST_OBJECT_LOCK (src->srcpad);
1672       if (GST_PAD_ACTIVATE_MODE (src->srcpad) == GST_ACTIVATE_PULL)
1673         goto wrong_mode;
1674       started = GST_PAD_ACTIVATE_MODE (src->srcpad) == GST_ACTIVATE_PUSH;
1675       GST_OBJECT_UNLOCK (src->srcpad);
1676
1677       if (started) {
1678         GST_DEBUG_OBJECT (src, "performing seek");
1679         /* when we are running in push mode, we can execute the
1680          * seek right now, we need to unlock. */
1681         result = gst_base_src_perform_seek (src, event, TRUE);
1682       } else {
1683         GstEvent **event_p;
1684
1685         /* else we store the event and execute the seek when we
1686          * get activated */
1687         GST_OBJECT_LOCK (src);
1688         GST_DEBUG_OBJECT (src, "queueing seek");
1689         event_p = &src->pending_seek;
1690         gst_event_replace ((GstEvent **) event_p, event);
1691         GST_OBJECT_UNLOCK (src);
1692         /* assume the seek will work */
1693         result = TRUE;
1694       }
1695       break;
1696     }
1697     case GST_EVENT_NAVIGATION:
1698       /* could make sense for elements that do something with navigation events
1699        * but then they would need to override the send_event function */
1700       break;
1701     case GST_EVENT_LATENCY:
1702       /* does not seem to make sense currently */
1703       break;
1704
1705       /* custom events */
1706     case GST_EVENT_CUSTOM_UPSTREAM:
1707       /* override send_event if you want this */
1708       break;
1709     case GST_EVENT_CUSTOM_DOWNSTREAM_OOB:
1710     case GST_EVENT_CUSTOM_BOTH_OOB:
1711       /* insert a random custom event into the pipeline */
1712       GST_DEBUG_OBJECT (src, "pushing custom OOB event downstream");
1713       result = gst_pad_push_event (src->srcpad, event);
1714       /* we gave away the ref to the event in the push */
1715       event = NULL;
1716       break;
1717     default:
1718       break;
1719   }
1720 done:
1721   /* if we still have a ref to the event, unref it now */
1722   if (event)
1723     gst_event_unref (event);
1724
1725   return result;
1726
1727   /* ERRORS */
1728 wrong_mode:
1729   {
1730     GST_DEBUG_OBJECT (src, "cannot perform seek when operating in pull mode");
1731     GST_OBJECT_UNLOCK (src->srcpad);
1732     result = FALSE;
1733     goto done;
1734   }
1735 }
1736
1737 static gboolean
1738 gst_base_src_seekable (GstBaseSrc * src)
1739 {
1740   GstBaseSrcClass *bclass;
1741   bclass = GST_BASE_SRC_GET_CLASS (src);
1742   if (bclass->is_seekable)
1743     return bclass->is_seekable (src);
1744   else
1745     return FALSE;
1746 }
1747
1748 static void
1749 gst_base_src_update_qos (GstBaseSrc * src,
1750     gdouble proportion, GstClockTimeDiff diff, GstClockTime timestamp)
1751 {
1752   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, src,
1753       "qos: proportion: %lf, diff %" G_GINT64_FORMAT ", timestamp %"
1754       GST_TIME_FORMAT, proportion, diff, GST_TIME_ARGS (timestamp));
1755
1756   GST_OBJECT_LOCK (src);
1757   src->priv->proportion = proportion;
1758   src->priv->earliest_time = timestamp + diff;
1759   GST_OBJECT_UNLOCK (src);
1760 }
1761
1762
1763 static gboolean
1764 gst_base_src_default_event (GstBaseSrc * src, GstEvent * event)
1765 {
1766   gboolean result;
1767
1768   GST_DEBUG_OBJECT (src, "handle event %" GST_PTR_FORMAT, event);
1769
1770   switch (GST_EVENT_TYPE (event)) {
1771     case GST_EVENT_SEEK:
1772       /* is normally called when in push mode */
1773       if (!gst_base_src_seekable (src))
1774         goto not_seekable;
1775
1776       result = gst_base_src_perform_seek (src, event, TRUE);
1777       break;
1778     case GST_EVENT_FLUSH_START:
1779       /* cancel any blocking getrange, is normally called
1780        * when in pull mode. */
1781       result = gst_base_src_set_flushing (src, TRUE, FALSE, TRUE, NULL);
1782       break;
1783     case GST_EVENT_FLUSH_STOP:
1784       result = gst_base_src_set_flushing (src, FALSE, TRUE, TRUE, NULL);
1785       break;
1786     case GST_EVENT_QOS:
1787     {
1788       gdouble proportion;
1789       GstClockTimeDiff diff;
1790       GstClockTime timestamp;
1791
1792       gst_event_parse_qos (event, NULL, &proportion, &diff, &timestamp);
1793       gst_base_src_update_qos (src, proportion, diff, timestamp);
1794       result = TRUE;
1795       break;
1796     }
1797     default:
1798       result = FALSE;
1799       break;
1800   }
1801   return result;
1802
1803   /* ERRORS */
1804 not_seekable:
1805   {
1806     GST_DEBUG_OBJECT (src, "is not seekable");
1807     return FALSE;
1808   }
1809 }
1810
1811 static gboolean
1812 gst_base_src_event_handler (GstPad * pad, GstEvent * event)
1813 {
1814   GstBaseSrc *src;
1815   GstBaseSrcClass *bclass;
1816   gboolean result = FALSE;
1817
1818   src = GST_BASE_SRC (gst_pad_get_parent (pad));
1819   if (G_UNLIKELY (src == NULL)) {
1820     gst_event_unref (event);
1821     return FALSE;
1822   }
1823
1824   bclass = GST_BASE_SRC_GET_CLASS (src);
1825
1826   if (bclass->event) {
1827     if (!(result = bclass->event (src, event)))
1828       goto subclass_failed;
1829   }
1830
1831 done:
1832   gst_event_unref (event);
1833   gst_object_unref (src);
1834
1835   return result;
1836
1837   /* ERRORS */
1838 subclass_failed:
1839   {
1840     GST_DEBUG_OBJECT (src, "subclass refused event");
1841     goto done;
1842   }
1843 }
1844
1845 static void
1846 gst_base_src_set_property (GObject * object, guint prop_id,
1847     const GValue * value, GParamSpec * pspec)
1848 {
1849   GstBaseSrc *src;
1850
1851   src = GST_BASE_SRC (object);
1852
1853   switch (prop_id) {
1854     case PROP_BLOCKSIZE:
1855       gst_base_src_set_blocksize (src, g_value_get_uint (value));
1856       break;
1857     case PROP_NUM_BUFFERS:
1858       src->num_buffers = g_value_get_int (value);
1859       break;
1860     case PROP_TYPEFIND:
1861       src->typefind = g_value_get_boolean (value);
1862       break;
1863     case PROP_DO_TIMESTAMP:
1864       gst_base_src_set_do_timestamp (src, g_value_get_boolean (value));
1865       break;
1866     default:
1867       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1868       break;
1869   }
1870 }
1871
1872 static void
1873 gst_base_src_get_property (GObject * object, guint prop_id, GValue * value,
1874     GParamSpec * pspec)
1875 {
1876   GstBaseSrc *src;
1877
1878   src = GST_BASE_SRC (object);
1879
1880   switch (prop_id) {
1881     case PROP_BLOCKSIZE:
1882       g_value_set_uint (value, gst_base_src_get_blocksize (src));
1883       break;
1884     case PROP_NUM_BUFFERS:
1885       g_value_set_int (value, src->num_buffers);
1886       break;
1887     case PROP_TYPEFIND:
1888       g_value_set_boolean (value, src->typefind);
1889       break;
1890     case PROP_DO_TIMESTAMP:
1891       g_value_set_boolean (value, gst_base_src_get_do_timestamp (src));
1892       break;
1893     default:
1894       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1895       break;
1896   }
1897 }
1898
1899 /* with STREAM_LOCK and LOCK */
1900 static GstClockReturn
1901 gst_base_src_wait (GstBaseSrc * basesrc, GstClock * clock, GstClockTime time)
1902 {
1903   GstClockReturn ret;
1904   GstClockID id;
1905
1906   id = gst_clock_new_single_shot_id (clock, time);
1907
1908   basesrc->clock_id = id;
1909   /* release the live lock while waiting */
1910   GST_LIVE_UNLOCK (basesrc);
1911
1912   ret = gst_clock_id_wait (id, NULL);
1913
1914   GST_LIVE_LOCK (basesrc);
1915   gst_clock_id_unref (id);
1916   basesrc->clock_id = NULL;
1917
1918   return ret;
1919 }
1920
1921 /* perform synchronisation on a buffer.
1922  * with STREAM_LOCK.
1923  */
1924 static GstClockReturn
1925 gst_base_src_do_sync (GstBaseSrc * basesrc, GstBuffer * buffer)
1926 {
1927   GstClockReturn result;
1928   GstClockTime start, end;
1929   GstBaseSrcClass *bclass;
1930   GstClockTime base_time;
1931   GstClock *clock;
1932   GstClockTime now = GST_CLOCK_TIME_NONE, timestamp;
1933   gboolean do_timestamp, first, pseudo_live;
1934
1935   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
1936
1937   start = end = -1;
1938   if (bclass->get_times)
1939     bclass->get_times (basesrc, buffer, &start, &end);
1940
1941   /* get buffer timestamp */
1942   timestamp = GST_BUFFER_TIMESTAMP (buffer);
1943
1944   /* grab the lock to prepare for clocking and calculate the startup
1945    * latency. */
1946   GST_OBJECT_LOCK (basesrc);
1947
1948   /* if we are asked to sync against the clock we are a pseudo live element */
1949   pseudo_live = (start != -1 && basesrc->is_live);
1950   /* check for the first buffer */
1951   first = (basesrc->priv->latency == -1);
1952
1953   if (timestamp != -1 && pseudo_live) {
1954     GstClockTime latency;
1955
1956     /* we have a timestamp and a sync time, latency is the diff */
1957     if (timestamp <= start)
1958       latency = start - timestamp;
1959     else
1960       latency = 0;
1961
1962     if (first) {
1963       GST_DEBUG_OBJECT (basesrc, "pseudo_live with latency %" GST_TIME_FORMAT,
1964           GST_TIME_ARGS (latency));
1965       /* first time we calculate latency, just configure */
1966       basesrc->priv->latency = latency;
1967     } else {
1968       if (basesrc->priv->latency != latency) {
1969         /* we have a new latency, FIXME post latency message */
1970         basesrc->priv->latency = latency;
1971         GST_DEBUG_OBJECT (basesrc, "latency changed to %" GST_TIME_FORMAT,
1972             GST_TIME_ARGS (latency));
1973       }
1974     }
1975   } else if (first) {
1976     GST_DEBUG_OBJECT (basesrc, "no latency needed, live %d, sync %d",
1977         basesrc->is_live, start != -1);
1978     basesrc->priv->latency = 0;
1979   }
1980
1981   /* get clock, if no clock, we can't sync or do timestamps */
1982   if ((clock = GST_ELEMENT_CLOCK (basesrc)) == NULL)
1983     goto no_clock;
1984
1985   base_time = GST_ELEMENT_CAST (basesrc)->base_time;
1986
1987   do_timestamp = basesrc->priv->do_timestamp;
1988
1989   /* first buffer, calculate the timestamp offset */
1990   if (first) {
1991     GstClockTime running_time;
1992
1993     now = gst_clock_get_time (clock);
1994     running_time = now - base_time;
1995
1996     GST_LOG_OBJECT (basesrc,
1997         "startup timestamp: %" GST_TIME_FORMAT ", running_time %"
1998         GST_TIME_FORMAT, GST_TIME_ARGS (timestamp),
1999         GST_TIME_ARGS (running_time));
2000
2001     if (pseudo_live && timestamp != -1) {
2002       /* live source and we need to sync, add startup latency to all timestamps
2003        * to get the real running_time. Live sources should always timestamp
2004        * according to the current running time. */
2005       basesrc->priv->ts_offset = GST_CLOCK_DIFF (timestamp, running_time);
2006
2007       GST_LOG_OBJECT (basesrc, "live with sync, ts_offset %" GST_TIME_FORMAT,
2008           GST_TIME_ARGS (basesrc->priv->ts_offset));
2009     } else {
2010       basesrc->priv->ts_offset = 0;
2011       GST_LOG_OBJECT (basesrc, "no timestamp offset needed");
2012     }
2013
2014     if (!GST_CLOCK_TIME_IS_VALID (timestamp)) {
2015       if (do_timestamp)
2016         timestamp = running_time;
2017       else
2018         timestamp = 0;
2019
2020       GST_BUFFER_TIMESTAMP (buffer) = timestamp;
2021
2022       GST_LOG_OBJECT (basesrc, "created timestamp: %" GST_TIME_FORMAT,
2023           GST_TIME_ARGS (timestamp));
2024     }
2025
2026     /* add the timestamp offset we need for sync */
2027     timestamp += basesrc->priv->ts_offset;
2028   } else {
2029     /* not the first buffer, the timestamp is the diff between the clock and
2030      * base_time */
2031     if (do_timestamp && !GST_CLOCK_TIME_IS_VALID (timestamp)) {
2032       now = gst_clock_get_time (clock);
2033
2034       GST_BUFFER_TIMESTAMP (buffer) = now - base_time;
2035
2036       GST_LOG_OBJECT (basesrc, "created timestamp: %" GST_TIME_FORMAT,
2037           GST_TIME_ARGS (now - base_time));
2038     }
2039   }
2040
2041   /* if we don't have a buffer timestamp, we don't sync */
2042   if (!GST_CLOCK_TIME_IS_VALID (start))
2043     goto no_sync;
2044
2045   if (basesrc->is_live && GST_CLOCK_TIME_IS_VALID (timestamp)) {
2046     /* for pseudo live sources, add our ts_offset to the timestamp */
2047     GST_BUFFER_TIMESTAMP (buffer) += basesrc->priv->ts_offset;
2048     start += basesrc->priv->ts_offset;
2049   }
2050
2051   GST_LOG_OBJECT (basesrc,
2052       "waiting for clock, base time %" GST_TIME_FORMAT
2053       ", stream_start %" GST_TIME_FORMAT,
2054       GST_TIME_ARGS (base_time), GST_TIME_ARGS (start));
2055   GST_OBJECT_UNLOCK (basesrc);
2056
2057   result = gst_base_src_wait (basesrc, clock, start + base_time);
2058
2059   GST_LOG_OBJECT (basesrc, "clock entry done: %d", result);
2060
2061   return result;
2062
2063   /* special cases */
2064 no_clock:
2065   {
2066     GST_DEBUG_OBJECT (basesrc, "we have no clock");
2067     GST_OBJECT_UNLOCK (basesrc);
2068     return GST_CLOCK_OK;
2069   }
2070 no_sync:
2071   {
2072     GST_DEBUG_OBJECT (basesrc, "no sync needed");
2073     GST_OBJECT_UNLOCK (basesrc);
2074     return GST_CLOCK_OK;
2075   }
2076 }
2077
2078 /* Called with STREAM_LOCK and LIVE_LOCK */
2079 static gboolean
2080 gst_base_src_update_length (GstBaseSrc * src, guint64 offset, guint * length)
2081 {
2082   guint64 size, maxsize;
2083   GstBaseSrcClass *bclass;
2084   GstFormat format;
2085   gint64 stop;
2086   gboolean dynamic;
2087
2088   bclass = GST_BASE_SRC_GET_CLASS (src);
2089
2090   format = src->segment.format;
2091   stop = src->segment.stop;
2092   /* get total file size */
2093   size = src->segment.duration;
2094
2095   /* only operate if we are working with bytes */
2096   if (format != GST_FORMAT_BYTES)
2097     return TRUE;
2098
2099   /* the max amount of bytes to read is the total size or
2100    * up to the segment.stop if present. */
2101   if (stop != -1)
2102     maxsize = MIN (size, stop);
2103   else
2104     maxsize = size;
2105
2106   GST_DEBUG_OBJECT (src,
2107       "reading offset %" G_GUINT64_FORMAT ", length %u, size %" G_GINT64_FORMAT
2108       ", segment.stop %" G_GINT64_FORMAT ", maxsize %" G_GINT64_FORMAT, offset,
2109       *length, size, stop, maxsize);
2110
2111   dynamic = g_atomic_int_get (&src->priv->dynamic_size);
2112   GST_DEBUG_OBJECT (src, "dynamic size: %d", dynamic);
2113
2114   /* check size if we have one */
2115   if (maxsize != -1) {
2116     /* if we run past the end, check if the file became bigger and
2117      * retry. */
2118     if (G_UNLIKELY (offset + *length >= maxsize || dynamic)) {
2119       /* see if length of the file changed */
2120       if (bclass->get_size)
2121         if (!bclass->get_size (src, &size))
2122           size = -1;
2123
2124       /* make sure we don't exceed the configured segment stop
2125        * if it was set */
2126       if (stop != -1)
2127         maxsize = MIN (size, stop);
2128       else
2129         maxsize = size;
2130
2131       /* if we are at or past the end, EOS */
2132       if (G_UNLIKELY (offset >= maxsize))
2133         goto unexpected_length;
2134
2135       /* else we can clip to the end */
2136       if (G_UNLIKELY (offset + *length >= maxsize))
2137         *length = maxsize - offset;
2138
2139     }
2140   }
2141
2142   /* keep track of current position and update duration.
2143    * segment is in bytes, we checked that above. */
2144   GST_OBJECT_LOCK (src);
2145   src->segment.duration = size;
2146   src->segment.position = offset;
2147   GST_OBJECT_UNLOCK (src);
2148
2149   return TRUE;
2150
2151   /* ERRORS */
2152 unexpected_length:
2153   {
2154     return FALSE;
2155   }
2156 }
2157
2158 /* must be called with LIVE_LOCK */
2159 static GstFlowReturn
2160 gst_base_src_get_range (GstBaseSrc * src, guint64 offset, guint length,
2161     GstBuffer ** buf)
2162 {
2163   GstFlowReturn ret;
2164   GstBaseSrcClass *bclass;
2165   GstClockReturn status;
2166
2167   bclass = GST_BASE_SRC_GET_CLASS (src);
2168
2169 again:
2170   if (src->is_live) {
2171     if (G_UNLIKELY (!src->live_running)) {
2172       ret = gst_base_src_wait_playing (src);
2173       if (ret != GST_FLOW_OK)
2174         goto stopped;
2175     }
2176   }
2177
2178   if (G_UNLIKELY (!GST_OBJECT_FLAG_IS_SET (src, GST_BASE_SRC_STARTED)))
2179     goto not_started;
2180
2181   if (G_UNLIKELY (!bclass->create))
2182     goto no_function;
2183
2184   if (G_UNLIKELY (!gst_base_src_update_length (src, offset, &length)))
2185     goto unexpected_length;
2186
2187   /* normally we don't count buffers */
2188   if (G_UNLIKELY (src->num_buffers_left >= 0)) {
2189     if (src->num_buffers_left == 0)
2190       goto reached_num_buffers;
2191     else
2192       src->num_buffers_left--;
2193   }
2194
2195   /* don't enter the create function if a pending EOS event was set. For the
2196    * logic of the pending_eos, check the event function of this class. */
2197   if (G_UNLIKELY (g_atomic_int_get (&src->priv->pending_eos)))
2198     goto eos;
2199
2200   GST_DEBUG_OBJECT (src,
2201       "calling create offset %" G_GUINT64_FORMAT " length %u, time %"
2202       G_GINT64_FORMAT, offset, length, src->segment.time);
2203
2204   ret = bclass->create (src, offset, length, buf);
2205
2206   /* The create function could be unlocked because we have a pending EOS. It's
2207    * possible that we have a valid buffer from create that we need to
2208    * discard when the create function returned _OK. */
2209   if (G_UNLIKELY (g_atomic_int_get (&src->priv->pending_eos))) {
2210     if (ret == GST_FLOW_OK) {
2211       gst_buffer_unref (*buf);
2212       *buf = NULL;
2213     }
2214     goto eos;
2215   }
2216
2217   if (G_UNLIKELY (ret != GST_FLOW_OK))
2218     goto not_ok;
2219
2220   /* no timestamp set and we are at offset 0, we can timestamp with 0 */
2221   if (offset == 0 && src->segment.time == 0
2222       && GST_BUFFER_TIMESTAMP (*buf) == -1 && !src->is_live) {
2223     *buf = gst_buffer_make_writable (*buf);
2224     GST_BUFFER_TIMESTAMP (*buf) = 0;
2225   }
2226
2227   /* now sync before pushing the buffer */
2228   status = gst_base_src_do_sync (src, *buf);
2229
2230   /* waiting for the clock could have made us flushing */
2231   if (G_UNLIKELY (src->priv->flushing))
2232     goto flushing;
2233
2234   switch (status) {
2235     case GST_CLOCK_EARLY:
2236       /* the buffer is too late. We currently don't drop the buffer. */
2237       GST_DEBUG_OBJECT (src, "buffer too late!, returning anyway");
2238       break;
2239     case GST_CLOCK_OK:
2240       /* buffer synchronised properly */
2241       GST_DEBUG_OBJECT (src, "buffer ok");
2242       break;
2243     case GST_CLOCK_UNSCHEDULED:
2244       /* this case is triggered when we were waiting for the clock and
2245        * it got unlocked because we did a state change. In any case, get rid of
2246        * the buffer. */
2247       gst_buffer_unref (*buf);
2248       *buf = NULL;
2249       if (!src->live_running) {
2250         /* We return WRONG_STATE when we are not running to stop the dataflow also
2251          * get rid of the produced buffer. */
2252         GST_DEBUG_OBJECT (src,
2253             "clock was unscheduled (%d), returning WRONG_STATE", status);
2254         ret = GST_FLOW_WRONG_STATE;
2255       } else {
2256         /* If we are running when this happens, we quickly switched between
2257          * pause and playing. We try to produce a new buffer */
2258         GST_DEBUG_OBJECT (src,
2259             "clock was unscheduled (%d), but we are running", status);
2260         goto again;
2261       }
2262       break;
2263     default:
2264       /* all other result values are unexpected and errors */
2265       GST_ELEMENT_ERROR (src, CORE, CLOCK,
2266           (_("Internal clock error.")),
2267           ("clock returned unexpected return value %d", status));
2268       gst_buffer_unref (*buf);
2269       *buf = NULL;
2270       ret = GST_FLOW_ERROR;
2271       break;
2272   }
2273   return ret;
2274
2275   /* ERROR */
2276 stopped:
2277   {
2278     GST_DEBUG_OBJECT (src, "wait_playing returned %d (%s)", ret,
2279         gst_flow_get_name (ret));
2280     return ret;
2281   }
2282 not_ok:
2283   {
2284     GST_DEBUG_OBJECT (src, "create returned %d (%s)", ret,
2285         gst_flow_get_name (ret));
2286     return ret;
2287   }
2288 not_started:
2289   {
2290     GST_DEBUG_OBJECT (src, "getrange but not started");
2291     return GST_FLOW_WRONG_STATE;
2292   }
2293 no_function:
2294   {
2295     GST_DEBUG_OBJECT (src, "no create function");
2296     return GST_FLOW_NOT_SUPPORTED;
2297   }
2298 unexpected_length:
2299   {
2300     GST_DEBUG_OBJECT (src, "unexpected length %u (offset=%" G_GUINT64_FORMAT
2301         ", size=%" G_GINT64_FORMAT ")", length, offset, src->segment.duration);
2302     return GST_FLOW_UNEXPECTED;
2303   }
2304 reached_num_buffers:
2305   {
2306     GST_DEBUG_OBJECT (src, "sent all buffers");
2307     return GST_FLOW_UNEXPECTED;
2308   }
2309 flushing:
2310   {
2311     GST_DEBUG_OBJECT (src, "we are flushing");
2312     gst_buffer_unref (*buf);
2313     *buf = NULL;
2314     return GST_FLOW_WRONG_STATE;
2315   }
2316 eos:
2317   {
2318     GST_DEBUG_OBJECT (src, "we are EOS");
2319     return GST_FLOW_UNEXPECTED;
2320   }
2321 }
2322
2323 static GstFlowReturn
2324 gst_base_src_pad_get_range (GstPad * pad, guint64 offset, guint length,
2325     GstBuffer ** buf)
2326 {
2327   GstBaseSrc *src;
2328   GstFlowReturn res;
2329
2330   src = GST_BASE_SRC_CAST (gst_object_ref (GST_OBJECT_PARENT (pad)));
2331
2332   GST_LIVE_LOCK (src);
2333   if (G_UNLIKELY (src->priv->flushing))
2334     goto flushing;
2335
2336   res = gst_base_src_get_range (src, offset, length, buf);
2337
2338 done:
2339   GST_LIVE_UNLOCK (src);
2340
2341   gst_object_unref (src);
2342
2343   return res;
2344
2345   /* ERRORS */
2346 flushing:
2347   {
2348     GST_DEBUG_OBJECT (src, "we are flushing");
2349     res = GST_FLOW_WRONG_STATE;
2350     goto done;
2351   }
2352 }
2353
2354 static gboolean
2355 gst_base_src_is_random_access (GstBaseSrc * src)
2356 {
2357   /* we need to start the basesrc to check random access */
2358   if (!GST_OBJECT_FLAG_IS_SET (src, GST_BASE_SRC_STARTED)) {
2359     GST_LOG_OBJECT (src, "doing start/stop to check get_range support");
2360     if (G_LIKELY (gst_base_src_start (src)))
2361       gst_base_src_stop (src);
2362   }
2363
2364   return src->random_access;
2365 }
2366
2367 static void
2368 gst_base_src_loop (GstPad * pad)
2369 {
2370   GstBaseSrc *src;
2371   GstBuffer *buf = NULL;
2372   GstFlowReturn ret;
2373   gint64 position;
2374   gboolean eos;
2375   guint blocksize;
2376   GList *pending_events = NULL, *tmp;
2377
2378   eos = FALSE;
2379
2380   src = GST_BASE_SRC (GST_OBJECT_PARENT (pad));
2381
2382   /* check if we need to renegotiate */
2383   if (gst_pad_check_reconfigure (pad)) {
2384     if (!gst_base_src_negotiate (src))
2385       GST_DEBUG_OBJECT (src, "Failed to renegotiate");
2386   }
2387
2388   GST_LIVE_LOCK (src);
2389
2390   if (G_UNLIKELY (src->priv->flushing))
2391     goto flushing;
2392
2393   blocksize = src->blocksize;
2394
2395   /* if we operate in bytes, we can calculate an offset */
2396   if (src->segment.format == GST_FORMAT_BYTES) {
2397     position = src->segment.position;
2398     /* for negative rates, start with subtracting the blocksize */
2399     if (src->segment.rate < 0.0) {
2400       /* we cannot go below segment.start */
2401       if (position > src->segment.start + blocksize)
2402         position -= blocksize;
2403       else {
2404         /* last block, remainder up to segment.start */
2405         blocksize = position - src->segment.start;
2406         position = src->segment.start;
2407       }
2408     }
2409   } else
2410     position = -1;
2411
2412   GST_LOG_OBJECT (src, "next_ts %" GST_TIME_FORMAT " size %u",
2413       GST_TIME_ARGS (position), blocksize);
2414
2415   ret = gst_base_src_get_range (src, position, blocksize, &buf);
2416   if (G_UNLIKELY (ret != GST_FLOW_OK)) {
2417     GST_INFO_OBJECT (src, "pausing after gst_base_src_get_range() = %s",
2418         gst_flow_get_name (ret));
2419     GST_LIVE_UNLOCK (src);
2420     goto pause;
2421   }
2422   /* this should not happen */
2423   if (G_UNLIKELY (buf == NULL))
2424     goto null_buffer;
2425
2426   /* push events to close/start our segment before we push the buffer. */
2427   if (G_UNLIKELY (src->priv->segment_pending)) {
2428     gst_pad_push_event (pad, gst_event_new_segment (&src->segment));
2429     src->priv->segment_pending = FALSE;
2430   }
2431
2432   if (g_atomic_int_get (&src->priv->have_events)) {
2433     GST_OBJECT_LOCK (src);
2434     /* take the events */
2435     pending_events = src->priv->pending_events;
2436     src->priv->pending_events = NULL;
2437     g_atomic_int_set (&src->priv->have_events, FALSE);
2438     GST_OBJECT_UNLOCK (src);
2439   }
2440
2441   /* Push out pending events if any */
2442   if (G_UNLIKELY (pending_events != NULL)) {
2443     for (tmp = pending_events; tmp; tmp = g_list_next (tmp)) {
2444       GstEvent *ev = (GstEvent *) tmp->data;
2445       gst_pad_push_event (pad, ev);
2446     }
2447     g_list_free (pending_events);
2448   }
2449
2450   /* figure out the new position */
2451   switch (src->segment.format) {
2452     case GST_FORMAT_BYTES:
2453     {
2454       guint bufsize = gst_buffer_get_size (buf);
2455
2456       /* we subtracted above for negative rates */
2457       if (src->segment.rate >= 0.0)
2458         position += bufsize;
2459       break;
2460     }
2461     case GST_FORMAT_TIME:
2462     {
2463       GstClockTime start, duration;
2464
2465       start = GST_BUFFER_TIMESTAMP (buf);
2466       duration = GST_BUFFER_DURATION (buf);
2467
2468       if (GST_CLOCK_TIME_IS_VALID (start))
2469         position = start;
2470       else
2471         position = src->segment.position;
2472
2473       if (GST_CLOCK_TIME_IS_VALID (duration)) {
2474         if (src->segment.rate >= 0.0)
2475           position += duration;
2476         else if (position > duration)
2477           position -= duration;
2478         else
2479           position = 0;
2480       }
2481       break;
2482     }
2483     case GST_FORMAT_DEFAULT:
2484       if (src->segment.rate >= 0.0)
2485         position = GST_BUFFER_OFFSET_END (buf);
2486       else
2487         position = GST_BUFFER_OFFSET (buf);
2488       break;
2489     default:
2490       position = -1;
2491       break;
2492   }
2493   if (position != -1) {
2494     if (src->segment.rate >= 0.0) {
2495       /* positive rate, check if we reached the stop */
2496       if (src->segment.stop != -1) {
2497         if (position >= src->segment.stop) {
2498           eos = TRUE;
2499           position = src->segment.stop;
2500         }
2501       }
2502     } else {
2503       /* negative rate, check if we reached the start. start is always set to
2504        * something different from -1 */
2505       if (position <= src->segment.start) {
2506         eos = TRUE;
2507         position = src->segment.start;
2508       }
2509       /* when going reverse, all buffers are DISCONT */
2510       src->priv->discont = TRUE;
2511     }
2512     GST_OBJECT_LOCK (src);
2513     src->segment.position = position;
2514     GST_OBJECT_UNLOCK (src);
2515   }
2516
2517   if (G_UNLIKELY (src->priv->discont)) {
2518     buf = gst_buffer_make_writable (buf);
2519     GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_DISCONT);
2520     src->priv->discont = FALSE;
2521   }
2522   GST_LIVE_UNLOCK (src);
2523
2524   ret = gst_pad_push (pad, buf);
2525   if (G_UNLIKELY (ret != GST_FLOW_OK)) {
2526     GST_INFO_OBJECT (src, "pausing after gst_pad_push() = %s",
2527         gst_flow_get_name (ret));
2528     goto pause;
2529   }
2530
2531   if (G_UNLIKELY (eos)) {
2532     GST_INFO_OBJECT (src, "pausing after end of segment");
2533     ret = GST_FLOW_UNEXPECTED;
2534     goto pause;
2535   }
2536
2537 done:
2538   return;
2539
2540   /* special cases */
2541 flushing:
2542   {
2543     GST_DEBUG_OBJECT (src, "we are flushing");
2544     GST_LIVE_UNLOCK (src);
2545     ret = GST_FLOW_WRONG_STATE;
2546     goto pause;
2547   }
2548 pause:
2549   {
2550     const gchar *reason = gst_flow_get_name (ret);
2551     GstEvent *event;
2552
2553     GST_DEBUG_OBJECT (src, "pausing task, reason %s", reason);
2554     src->running = FALSE;
2555     gst_pad_pause_task (pad);
2556     if (ret == GST_FLOW_UNEXPECTED) {
2557       gboolean flag_segment;
2558       GstFormat format;
2559       gint64 position;
2560
2561       /* perform EOS logic */
2562       flag_segment = (src->segment.flags & GST_SEEK_FLAG_SEGMENT) != 0;
2563       format = src->segment.format;
2564       position = src->segment.position;
2565
2566       if (flag_segment) {
2567         GstMessage *message;
2568
2569         message = gst_message_new_segment_done (GST_OBJECT_CAST (src),
2570             format, position);
2571         gst_message_set_seqnum (message, src->priv->seqnum);
2572         gst_element_post_message (GST_ELEMENT_CAST (src), message);
2573       } else {
2574         event = gst_event_new_eos ();
2575         gst_event_set_seqnum (event, src->priv->seqnum);
2576         gst_pad_push_event (pad, event);
2577       }
2578     } else if (ret == GST_FLOW_NOT_LINKED || ret <= GST_FLOW_UNEXPECTED) {
2579       event = gst_event_new_eos ();
2580       gst_event_set_seqnum (event, src->priv->seqnum);
2581       /* for fatal errors we post an error message, post the error
2582        * first so the app knows about the error first.
2583        * Also don't do this for WRONG_STATE because it happens
2584        * due to flushing and posting an error message because of
2585        * that is the wrong thing to do, e.g. when we're doing
2586        * a flushing seek. */
2587       GST_ELEMENT_ERROR (src, STREAM, FAILED,
2588           (_("Internal data flow error.")),
2589           ("streaming task paused, reason %s (%d)", reason, ret));
2590       gst_pad_push_event (pad, event);
2591     }
2592     goto done;
2593   }
2594 null_buffer:
2595   {
2596     GST_ELEMENT_ERROR (src, STREAM, FAILED,
2597         (_("Internal data flow error.")), ("element returned NULL buffer"));
2598     GST_LIVE_UNLOCK (src);
2599     goto done;
2600   }
2601 }
2602
2603 static gboolean
2604 gst_base_src_set_allocation (GstBaseSrc * basesrc, GstBufferPool * pool,
2605     const GstAllocator * allocator, guint prefix, guint alignment)
2606 {
2607   GstBufferPool *oldpool;
2608   GstBaseSrcPrivate *priv = basesrc->priv;
2609
2610   if (pool) {
2611     if (!gst_buffer_pool_set_active (pool, TRUE))
2612       goto activate_failed;
2613   }
2614
2615   GST_OBJECT_LOCK (basesrc);
2616   oldpool = priv->pool;
2617   priv->pool = pool;
2618
2619   priv->allocator = allocator;
2620
2621   priv->prefix = prefix;
2622   priv->alignment = alignment;
2623   GST_OBJECT_UNLOCK (basesrc);
2624
2625   if (oldpool) {
2626     gst_buffer_pool_set_active (oldpool, FALSE);
2627     gst_object_unref (oldpool);
2628   }
2629   return TRUE;
2630
2631   /* ERRORS */
2632 activate_failed:
2633   {
2634     GST_ERROR_OBJECT (basesrc, "failed to activate bufferpool.");
2635     return FALSE;
2636   }
2637 }
2638
2639 static gboolean
2640 gst_base_src_activate_pool (GstBaseSrc * basesrc, gboolean active)
2641 {
2642   GstBaseSrcPrivate *priv = basesrc->priv;
2643   GstBufferPool *pool;
2644   gboolean res = TRUE;
2645
2646   GST_OBJECT_LOCK (basesrc);
2647   if ((pool = priv->pool))
2648     pool = gst_object_ref (pool);
2649   GST_OBJECT_UNLOCK (basesrc);
2650
2651   if (pool) {
2652     res = gst_buffer_pool_set_active (pool, active);
2653     gst_object_unref (pool);
2654   }
2655   return res;
2656 }
2657
2658 static gboolean
2659 gst_base_src_prepare_allocation (GstBaseSrc * basesrc, GstCaps * caps)
2660 {
2661   GstBaseSrcClass *bclass;
2662   gboolean result = TRUE;
2663   GstQuery *query;
2664   GstBufferPool *pool = NULL;
2665   const GstAllocator *allocator = NULL;
2666   guint size, min, max, prefix, alignment;
2667
2668   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2669
2670   /* make query and let peer pad answer, we don't really care if it worked or
2671    * not, if it failed, the allocation query would contain defaults and the
2672    * subclass would then set better values if needed */
2673   query = gst_query_new_allocation (caps, TRUE);
2674   if (!gst_pad_peer_query (basesrc->srcpad, query)) {
2675     /* not a problem, just debug a little */
2676     GST_DEBUG_OBJECT (basesrc, "peer ALLOCATION query failed");
2677   }
2678
2679   if (G_LIKELY (bclass->setup_allocation))
2680     result = bclass->setup_allocation (basesrc, query);
2681
2682   GST_DEBUG_OBJECT (basesrc, "ALLOCATION params: %" GST_PTR_FORMAT, query);
2683   gst_query_parse_allocation_params (query, &size, &min, &max, &prefix,
2684       &alignment, &pool);
2685
2686   if (size == 0) {
2687     const gchar *mem = NULL;
2688
2689     /* no size, we have variable size buffers */
2690     if (gst_query_get_n_allocation_memories (query) > 0) {
2691       mem = gst_query_parse_nth_allocation_memory (query, 0);
2692     }
2693     allocator = gst_allocator_find (mem);
2694   } else if (pool == NULL) {
2695     /* fixed size, we can use a bufferpool */
2696     GstStructure *config;
2697
2698     /* we did not get a pool, make one ourselves then */
2699     pool = gst_buffer_pool_new ();
2700
2701     config = gst_buffer_pool_get_config (pool);
2702     gst_buffer_pool_config_set (config, caps, size, min, max, prefix,
2703         alignment);
2704     gst_buffer_pool_set_config (pool, config);
2705   }
2706
2707   gst_query_unref (query);
2708
2709   result =
2710       gst_base_src_set_allocation (basesrc, pool, allocator, prefix, alignment);
2711
2712   return result;
2713
2714 }
2715
2716 /* default negotiation code.
2717  *
2718  * Take intersection between src and sink pads, take first
2719  * caps and fixate.
2720  */
2721 static gboolean
2722 gst_base_src_default_negotiate (GstBaseSrc * basesrc)
2723 {
2724   GstCaps *thiscaps;
2725   GstCaps *caps = NULL;
2726   GstCaps *peercaps = NULL;
2727   gboolean result = FALSE;
2728
2729   /* first see what is possible on our source pad */
2730   thiscaps = gst_pad_get_caps (GST_BASE_SRC_PAD (basesrc), NULL);
2731   GST_DEBUG_OBJECT (basesrc, "caps of src: %" GST_PTR_FORMAT, thiscaps);
2732   /* nothing or anything is allowed, we're done */
2733   if (thiscaps == NULL || gst_caps_is_any (thiscaps))
2734     goto no_nego_needed;
2735
2736   if (G_UNLIKELY (gst_caps_is_empty (thiscaps)))
2737     goto no_caps;
2738
2739   /* get the peer caps */
2740   peercaps = gst_pad_peer_get_caps (GST_BASE_SRC_PAD (basesrc), thiscaps);
2741   GST_DEBUG_OBJECT (basesrc, "caps of peer: %" GST_PTR_FORMAT, peercaps);
2742   if (peercaps) {
2743     /* The result is already a subset of our caps */
2744     caps = peercaps;
2745     gst_caps_unref (thiscaps);
2746   } else {
2747     /* no peer, work with our own caps then */
2748     caps = thiscaps;
2749   }
2750   if (caps && !gst_caps_is_empty (caps)) {
2751     caps = gst_caps_make_writable (caps);
2752
2753     /* take first (and best, since they are sorted) possibility */
2754     gst_caps_truncate (caps);
2755
2756     /* now fixate */
2757     GST_DEBUG_OBJECT (basesrc, "have caps: %" GST_PTR_FORMAT, caps);
2758     if (gst_caps_is_any (caps)) {
2759       /* hmm, still anything, so element can do anything and
2760        * nego is not needed */
2761       result = TRUE;
2762     } else {
2763       gst_pad_fixate_caps (GST_BASE_SRC_PAD (basesrc), caps);
2764       GST_DEBUG_OBJECT (basesrc, "fixated to: %" GST_PTR_FORMAT, caps);
2765       if (gst_caps_is_fixed (caps)) {
2766         /* yay, fixed caps, use those then, it's possible that the subclass does
2767          * not accept this caps after all and we have to fail. */
2768         result = gst_base_src_setcaps (basesrc, caps);
2769       }
2770     }
2771     gst_caps_unref (caps);
2772   } else {
2773     GST_DEBUG_OBJECT (basesrc, "no common caps");
2774   }
2775   return result;
2776
2777 no_nego_needed:
2778   {
2779     GST_DEBUG_OBJECT (basesrc, "no negotiation needed");
2780     if (thiscaps)
2781       gst_caps_unref (thiscaps);
2782     return TRUE;
2783   }
2784 no_caps:
2785   {
2786     GST_ELEMENT_ERROR (basesrc, STREAM, FORMAT,
2787         ("No supported formats found"),
2788         ("This element did not produce valid caps"));
2789     if (thiscaps)
2790       gst_caps_unref (thiscaps);
2791     return TRUE;
2792   }
2793 }
2794
2795 static gboolean
2796 gst_base_src_negotiate (GstBaseSrc * basesrc)
2797 {
2798   GstBaseSrcClass *bclass;
2799   gboolean result = TRUE;
2800
2801   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2802
2803   if (G_LIKELY (bclass->negotiate))
2804     result = bclass->negotiate (basesrc);
2805
2806   if (G_LIKELY (result)) {
2807     GstCaps *caps;
2808
2809     caps = gst_pad_get_current_caps (basesrc->srcpad);
2810
2811     result = gst_base_src_prepare_allocation (basesrc, caps);
2812   }
2813   return result;
2814 }
2815
2816 static gboolean
2817 gst_base_src_start (GstBaseSrc * basesrc)
2818 {
2819   GstBaseSrcClass *bclass;
2820   gboolean result;
2821   guint64 size;
2822   gboolean seekable;
2823   GstFormat format;
2824
2825   if (GST_OBJECT_FLAG_IS_SET (basesrc, GST_BASE_SRC_STARTED))
2826     return TRUE;
2827
2828   GST_DEBUG_OBJECT (basesrc, "starting source");
2829
2830   basesrc->num_buffers_left = basesrc->num_buffers;
2831
2832   GST_OBJECT_LOCK (basesrc);
2833   gst_segment_init (&basesrc->segment, basesrc->segment.format);
2834   GST_OBJECT_UNLOCK (basesrc);
2835
2836   basesrc->running = FALSE;
2837   basesrc->priv->segment_pending = FALSE;
2838
2839   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2840   if (bclass->start)
2841     result = bclass->start (basesrc);
2842   else
2843     result = TRUE;
2844
2845   if (!result)
2846     goto could_not_start;
2847
2848   GST_OBJECT_FLAG_SET (basesrc, GST_BASE_SRC_STARTED);
2849
2850   format = basesrc->segment.format;
2851
2852   /* figure out the size */
2853   if (format == GST_FORMAT_BYTES) {
2854     if (bclass->get_size) {
2855       if (!(result = bclass->get_size (basesrc, &size)))
2856         size = -1;
2857     } else {
2858       result = FALSE;
2859       size = -1;
2860     }
2861     GST_DEBUG_OBJECT (basesrc, "setting size %" G_GUINT64_FORMAT, size);
2862     /* only update the size when operating in bytes, subclass is supposed
2863      * to set duration in the start method for other formats */
2864     GST_OBJECT_LOCK (basesrc);
2865     basesrc->segment.duration = size;
2866     GST_OBJECT_UNLOCK (basesrc);
2867   } else {
2868     size = -1;
2869   }
2870
2871   GST_DEBUG_OBJECT (basesrc,
2872       "format: %s, have size: %d, size: %" G_GUINT64_FORMAT ", duration: %"
2873       G_GINT64_FORMAT, gst_format_get_name (format), result, size,
2874       basesrc->segment.duration);
2875
2876   seekable = gst_base_src_seekable (basesrc);
2877   GST_DEBUG_OBJECT (basesrc, "is seekable: %d", seekable);
2878
2879   /* update for random access flag */
2880   basesrc->random_access = seekable && format == GST_FORMAT_BYTES;
2881
2882   GST_DEBUG_OBJECT (basesrc, "is random_access: %d", basesrc->random_access);
2883
2884   /* run typefind if we are random_access and the typefinding is enabled. */
2885   if (basesrc->random_access && basesrc->typefind && size != -1) {
2886     GstCaps *caps;
2887
2888     if (!(caps = gst_type_find_helper (basesrc->srcpad, size)))
2889       goto typefind_failed;
2890
2891     result = gst_base_src_setcaps (basesrc, caps);
2892     gst_caps_unref (caps);
2893   } else {
2894     /* use class or default negotiate function */
2895     if (!(result = gst_base_src_negotiate (basesrc)))
2896       goto could_not_negotiate;
2897   }
2898
2899   return result;
2900
2901   /* ERROR */
2902 could_not_start:
2903   {
2904     GST_DEBUG_OBJECT (basesrc, "could not start");
2905     /* subclass is supposed to post a message. We don't have to call _stop. */
2906     return FALSE;
2907   }
2908 could_not_negotiate:
2909   {
2910     GST_DEBUG_OBJECT (basesrc, "could not negotiate, stopping");
2911     GST_ELEMENT_ERROR (basesrc, STREAM, FORMAT,
2912         ("Could not negotiate format"), ("Check your filtered caps, if any"));
2913     /* we must call stop */
2914     gst_base_src_stop (basesrc);
2915     return FALSE;
2916   }
2917 typefind_failed:
2918   {
2919     GST_DEBUG_OBJECT (basesrc, "could not typefind, stopping");
2920     GST_ELEMENT_ERROR (basesrc, STREAM, TYPE_NOT_FOUND, (NULL), (NULL));
2921     /* we must call stop */
2922     gst_base_src_stop (basesrc);
2923     return FALSE;
2924   }
2925 }
2926
2927 static gboolean
2928 gst_base_src_stop (GstBaseSrc * basesrc)
2929 {
2930   GstBaseSrcClass *bclass;
2931   gboolean result = TRUE;
2932
2933   if (!GST_OBJECT_FLAG_IS_SET (basesrc, GST_BASE_SRC_STARTED))
2934     return TRUE;
2935
2936   GST_DEBUG_OBJECT (basesrc, "stopping source");
2937
2938   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2939   if (bclass->stop)
2940     result = bclass->stop (basesrc);
2941
2942   gst_base_src_set_allocation (basesrc, NULL, NULL, 0, 0);
2943
2944   if (result)
2945     GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_STARTED);
2946
2947   return result;
2948 }
2949
2950 /* start or stop flushing dataprocessing
2951  */
2952 static gboolean
2953 gst_base_src_set_flushing (GstBaseSrc * basesrc,
2954     gboolean flushing, gboolean live_play, gboolean unlock, gboolean * playing)
2955 {
2956   GstBaseSrcClass *bclass;
2957
2958   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2959
2960   if (flushing && unlock) {
2961     gst_base_src_activate_pool (basesrc, FALSE);
2962     /* unlock any subclasses, we need to do this before grabbing the
2963      * LIVE_LOCK since we hold this lock before going into ::create. We pass an
2964      * unlock to the params because of backwards compat (see seek handler)*/
2965     if (bclass->unlock)
2966       bclass->unlock (basesrc);
2967   }
2968
2969   /* the live lock is released when we are blocked, waiting for playing or
2970    * when we sync to the clock. */
2971   GST_LIVE_LOCK (basesrc);
2972   if (playing)
2973     *playing = basesrc->live_running;
2974   basesrc->priv->flushing = flushing;
2975   if (flushing) {
2976     /* if we are locked in the live lock, signal it to make it flush */
2977     basesrc->live_running = TRUE;
2978
2979     /* clear pending EOS if any */
2980     g_atomic_int_set (&basesrc->priv->pending_eos, FALSE);
2981
2982     /* step 1, now that we have the LIVE lock, clear our unlock request */
2983     if (bclass->unlock_stop)
2984       bclass->unlock_stop (basesrc);
2985
2986     /* step 2, unblock clock sync (if any) or any other blocking thing */
2987     if (basesrc->clock_id)
2988       gst_clock_id_unschedule (basesrc->clock_id);
2989   } else {
2990     /* signal the live source that it can start playing */
2991     basesrc->live_running = live_play;
2992
2993     gst_base_src_activate_pool (basesrc, TRUE);
2994
2995     /* When unlocking drop all delayed events */
2996     if (unlock) {
2997       GST_OBJECT_LOCK (basesrc);
2998       if (basesrc->priv->pending_events) {
2999         g_list_foreach (basesrc->priv->pending_events, (GFunc) gst_event_unref,
3000             NULL);
3001         g_list_free (basesrc->priv->pending_events);
3002         basesrc->priv->pending_events = NULL;
3003         g_atomic_int_set (&basesrc->priv->have_events, FALSE);
3004       }
3005       GST_OBJECT_UNLOCK (basesrc);
3006     }
3007   }
3008   GST_LIVE_SIGNAL (basesrc);
3009   GST_LIVE_UNLOCK (basesrc);
3010
3011   return TRUE;
3012 }
3013
3014 /* the purpose of this function is to make sure that a live source blocks in the
3015  * LIVE lock or leaves the LIVE lock and continues playing. */
3016 static gboolean
3017 gst_base_src_set_playing (GstBaseSrc * basesrc, gboolean live_play)
3018 {
3019   GstBaseSrcClass *bclass;
3020
3021   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
3022
3023   /* unlock subclasses locked in ::create, we only do this when we stop playing. */
3024   if (!live_play) {
3025     GST_DEBUG_OBJECT (basesrc, "unlock");
3026     gst_base_src_activate_pool (basesrc, FALSE);
3027     if (bclass->unlock)
3028       bclass->unlock (basesrc);
3029   }
3030
3031   /* we are now able to grab the LIVE lock, when we get it, we can be
3032    * waiting for PLAYING while blocked in the LIVE cond or we can be waiting
3033    * for the clock. */
3034   GST_LIVE_LOCK (basesrc);
3035   GST_DEBUG_OBJECT (basesrc, "unschedule clock");
3036
3037   /* unblock clock sync (if any) */
3038   if (basesrc->clock_id)
3039     gst_clock_id_unschedule (basesrc->clock_id);
3040
3041   /* configure what to do when we get to the LIVE lock. */
3042   GST_DEBUG_OBJECT (basesrc, "live running %d", live_play);
3043   basesrc->live_running = live_play;
3044
3045   if (live_play) {
3046     gboolean start;
3047
3048     /* clear our unlock request when going to PLAYING */
3049     GST_DEBUG_OBJECT (basesrc, "unlock stop");
3050     gst_base_src_activate_pool (basesrc, TRUE);
3051     if (bclass->unlock_stop)
3052       bclass->unlock_stop (basesrc);
3053
3054     /* for live sources we restart the timestamp correction */
3055     basesrc->priv->latency = -1;
3056     /* have to restart the task in case it stopped because of the unlock when
3057      * we went to PAUSED. Only do this if we operating in push mode. */
3058     GST_OBJECT_LOCK (basesrc->srcpad);
3059     start = (GST_PAD_ACTIVATE_MODE (basesrc->srcpad) == GST_ACTIVATE_PUSH);
3060     GST_OBJECT_UNLOCK (basesrc->srcpad);
3061     if (start)
3062       gst_pad_start_task (basesrc->srcpad, (GstTaskFunction) gst_base_src_loop,
3063           basesrc->srcpad);
3064     GST_DEBUG_OBJECT (basesrc, "signal");
3065     GST_LIVE_SIGNAL (basesrc);
3066   }
3067   GST_LIVE_UNLOCK (basesrc);
3068
3069   return TRUE;
3070 }
3071
3072 static gboolean
3073 gst_base_src_activate_push (GstPad * pad, gboolean active)
3074 {
3075   GstBaseSrc *basesrc;
3076   GstEvent *event;
3077
3078   basesrc = GST_BASE_SRC (GST_OBJECT_PARENT (pad));
3079
3080   /* prepare subclass first */
3081   if (active) {
3082     GST_DEBUG_OBJECT (basesrc, "Activating in push mode");
3083
3084     if (G_UNLIKELY (!basesrc->can_activate_push))
3085       goto no_push_activation;
3086
3087     if (G_UNLIKELY (!gst_base_src_start (basesrc)))
3088       goto error_start;
3089
3090     basesrc->priv->discont = TRUE;
3091     gst_base_src_set_flushing (basesrc, FALSE, FALSE, FALSE, NULL);
3092
3093     /* do initial seek, which will start the task */
3094     GST_OBJECT_LOCK (basesrc);
3095     event = basesrc->pending_seek;
3096     basesrc->pending_seek = NULL;
3097     GST_OBJECT_UNLOCK (basesrc);
3098
3099     /* no need to unlock anything, the task is certainly
3100      * not running here. The perform seek code will start the task when
3101      * finished. */
3102     if (G_UNLIKELY (!gst_base_src_perform_seek (basesrc, event, FALSE)))
3103       goto seek_failed;
3104
3105     if (event)
3106       gst_event_unref (event);
3107   } else {
3108     GST_DEBUG_OBJECT (basesrc, "Deactivating in push mode");
3109     /* flush all */
3110     gst_base_src_set_flushing (basesrc, TRUE, FALSE, TRUE, NULL);
3111     /* stop the task */
3112     gst_pad_stop_task (pad);
3113     /* now we can stop the source */
3114     if (G_UNLIKELY (!gst_base_src_stop (basesrc)))
3115       goto error_stop;
3116   }
3117   return TRUE;
3118
3119   /* ERRORS */
3120 no_push_activation:
3121   {
3122     GST_WARNING_OBJECT (basesrc, "Subclass disabled push-mode activation");
3123     return FALSE;
3124   }
3125 error_start:
3126   {
3127     GST_WARNING_OBJECT (basesrc, "Failed to start in push mode");
3128     return FALSE;
3129   }
3130 seek_failed:
3131   {
3132     GST_ERROR_OBJECT (basesrc, "Failed to perform initial seek");
3133     /* flush all */
3134     gst_base_src_set_flushing (basesrc, TRUE, FALSE, TRUE, NULL);
3135     /* stop the task */
3136     gst_pad_stop_task (pad);
3137     /* Stop the basesrc */
3138     gst_base_src_stop (basesrc);
3139     if (event)
3140       gst_event_unref (event);
3141     return FALSE;
3142   }
3143 error_stop:
3144   {
3145     GST_DEBUG_OBJECT (basesrc, "Failed to stop in push mode");
3146     return FALSE;
3147   }
3148 }
3149
3150 static gboolean
3151 gst_base_src_activate_pull (GstPad * pad, gboolean active)
3152 {
3153   GstBaseSrc *basesrc;
3154
3155   basesrc = GST_BASE_SRC (GST_OBJECT_PARENT (pad));
3156
3157   /* prepare subclass first */
3158   if (active) {
3159     GST_DEBUG_OBJECT (basesrc, "Activating in pull mode");
3160     if (G_UNLIKELY (!gst_base_src_start (basesrc)))
3161       goto error_start;
3162
3163     /* if not random_access, we cannot operate in pull mode for now */
3164     if (G_UNLIKELY (!gst_base_src_is_random_access (basesrc)))
3165       goto no_get_range;
3166
3167     /* stop flushing now but for live sources, still block in the LIVE lock when
3168      * we are not yet PLAYING */
3169     gst_base_src_set_flushing (basesrc, FALSE, FALSE, FALSE, NULL);
3170   } else {
3171     GST_DEBUG_OBJECT (basesrc, "Deactivating in pull mode");
3172     /* flush all, there is no task to stop */
3173     gst_base_src_set_flushing (basesrc, TRUE, FALSE, TRUE, NULL);
3174
3175     if (G_UNLIKELY (!gst_base_src_stop (basesrc)))
3176       goto error_stop;
3177   }
3178   return TRUE;
3179
3180   /* ERRORS */
3181 error_start:
3182   {
3183     GST_ERROR_OBJECT (basesrc, "Failed to start in pull mode");
3184     return FALSE;
3185   }
3186 no_get_range:
3187   {
3188     GST_ERROR_OBJECT (basesrc, "Cannot operate in pull mode, stopping");
3189     gst_base_src_stop (basesrc);
3190     return FALSE;
3191   }
3192 error_stop:
3193   {
3194     GST_ERROR_OBJECT (basesrc, "Failed to stop in pull mode");
3195     return FALSE;
3196   }
3197 }
3198
3199 static GstStateChangeReturn
3200 gst_base_src_change_state (GstElement * element, GstStateChange transition)
3201 {
3202   GstBaseSrc *basesrc;
3203   GstStateChangeReturn result;
3204   gboolean no_preroll = FALSE;
3205
3206   basesrc = GST_BASE_SRC (element);
3207
3208   switch (transition) {
3209     case GST_STATE_CHANGE_NULL_TO_READY:
3210       break;
3211     case GST_STATE_CHANGE_READY_TO_PAUSED:
3212       no_preroll = gst_base_src_is_live (basesrc);
3213       break;
3214     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
3215       GST_DEBUG_OBJECT (basesrc, "PAUSED->PLAYING");
3216       if (gst_base_src_is_live (basesrc)) {
3217         /* now we can start playback */
3218         gst_base_src_set_playing (basesrc, TRUE);
3219       }
3220       break;
3221     default:
3222       break;
3223   }
3224
3225   if ((result =
3226           GST_ELEMENT_CLASS (parent_class)->change_state (element,
3227               transition)) == GST_STATE_CHANGE_FAILURE)
3228     goto failure;
3229
3230   switch (transition) {
3231     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
3232       GST_DEBUG_OBJECT (basesrc, "PLAYING->PAUSED");
3233       if (gst_base_src_is_live (basesrc)) {
3234         /* make sure we block in the live lock in PAUSED */
3235         gst_base_src_set_playing (basesrc, FALSE);
3236         no_preroll = TRUE;
3237       }
3238       break;
3239     case GST_STATE_CHANGE_PAUSED_TO_READY:
3240     {
3241       GstEvent **event_p;
3242
3243       /* we don't need to unblock anything here, the pad deactivation code
3244        * already did this */
3245       g_atomic_int_set (&basesrc->priv->pending_eos, FALSE);
3246       event_p = &basesrc->pending_seek;
3247       gst_event_replace (event_p, NULL);
3248       break;
3249     }
3250     case GST_STATE_CHANGE_READY_TO_NULL:
3251       break;
3252     default:
3253       break;
3254   }
3255
3256   if (no_preroll && result == GST_STATE_CHANGE_SUCCESS)
3257     result = GST_STATE_CHANGE_NO_PREROLL;
3258
3259   return result;
3260
3261   /* ERRORS */
3262 failure:
3263   {
3264     GST_DEBUG_OBJECT (basesrc, "parent failed state change");
3265     return result;
3266   }
3267 }