basesrc: Make locking of the segment a bit more strict and update documentation
[platform/upstream/gstreamer.git] / libs / gst / base / gstbasesrc.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *               2000,2005 Wim Taymans <wim@fluendo.com>
4  *
5  * gstbasesrc.c:
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20  * Boston, MA 02111-1307, USA.
21  */
22
23 /**
24  * SECTION:gstbasesrc
25  * @short_description: Base class for getrange based source elements
26  * @see_also: #GstPushSrc, #GstBaseTransform, #GstBaseSink
27  *
28  * This is a generice base class for source elements. The following
29  * types of sources are supported:
30  * <itemizedlist>
31  *   <listitem><para>random access sources like files</para></listitem>
32  *   <listitem><para>seekable sources</para></listitem>
33  *   <listitem><para>live sources</para></listitem>
34  * </itemizedlist>
35  *
36  * The source can be configured to operate in any #GstFormat with the
37  * gst_base_src_set_format() method. The currently set format determines
38  * the format of the internal #GstSegment and any #GST_EVENT_NEWSEGMENT
39  * events. The default format for #GstBaseSrc is #GST_FORMAT_BYTES.
40  *
41  * #GstBaseSrc always supports push mode scheduling. If the following
42  * conditions are met, it also supports pull mode scheduling:
43  * <itemizedlist>
44  *   <listitem><para>The format is set to #GST_FORMAT_BYTES (default).</para>
45  *   </listitem>
46  *   <listitem><para>#GstBaseSrcClass.is_seekable() returns %TRUE.</para>
47  *   </listitem>
48  * </itemizedlist>
49  *
50  * Since 0.10.9, any #GstBaseSrc can enable pull based scheduling at any time
51  * by overriding #GstBaseSrcClass.check_get_range() so that it returns %TRUE.
52  *
53  * If all the conditions are met for operating in pull mode, #GstBaseSrc is
54  * automatically seekable in push mode as well. The following conditions must
55  * be met to make the element seekable in push mode when the format is not
56  * #GST_FORMAT_BYTES:
57  * <itemizedlist>
58  *   <listitem><para>
59  *     #GstBaseSrcClass.is_seekable() returns %TRUE.
60  *   </para></listitem>
61  *   <listitem><para>
62  *     #GstBaseSrc:Class.query() can convert all supported seek formats to the
63  *     internal format as set with gst_base_src_set_format().
64  *   </para></listitem>
65  *   <listitem><para>
66  *     #GstBaseSrcClass.do_seek() is implemented, performs the seek and returns
67  *      %TRUE.
68  *   </para></listitem>
69  * </itemizedlist>
70  *
71  * When the element does not meet the requirements to operate in pull mode, the
72  * offset and length in the #GstBaseSrcClass.create() method should be ignored.
73  * It is recommended to subclass #GstPushSrc instead, in this situation. If the
74  * element can operate in pull mode but only with specific offsets and
75  * lengths, it is allowed to generate an error when the wrong values are passed
76  * to the #GstBaseSrcClass.create() function.
77  *
78  * #GstBaseSrc has support for live sources. Live sources are sources that when
79  * paused discard data, such as audio or video capture devices. A typical live
80  * source also produces data at a fixed rate and thus provides a clock to publish
81  * this rate.
82  * Use gst_base_src_set_live() to activate the live source mode.
83  *
84  * A live source does not produce data in the PAUSED state. This means that the
85  * #GstBaseSrcClass.create() method will not be called in PAUSED but only in
86  * PLAYING. To signal the pipeline that the element will not produce data, the
87  * return value from the READY to PAUSED state will be
88  * #GST_STATE_CHANGE_NO_PREROLL.
89  *
90  * A typical live source will timestamp the buffers it creates with the
91  * current running time of the pipeline. This is one reason why a live source
92  * can only produce data in the PLAYING state, when the clock is actually
93  * distributed and running.
94  *
95  * Live sources that synchronize and block on the clock (an audio source, for
96  * example) can since 0.10.12 use gst_base_src_wait_playing() when the
97  * #GstBaseSrcClass.create() function was interrupted by a state change to
98  * PAUSED.
99  *
100  * The #GstBaseSrcClass.get_times() method can be used to implement pseudo-live
101  * sources. It only makes sense to implement the #GstBaseSrcClass.get_times()
102  * function if the source is a live source. The #GstBaseSrcClass.get_times()
103  * function should return timestamps starting from 0, as if it were a non-live
104  * source. The base class will make sure that the timestamps are transformed
105  * into the current running_time. The base source will then wait for the
106  * calculated running_time before pushing out the buffer.
107  *
108  * For live sources, the base class will by default report a latency of 0.
109  * For pseudo live sources, the base class will by default measure the difference
110  * between the first buffer timestamp and the start time of get_times and will
111  * report this value as the latency.
112  * Subclasses should override the query function when this behaviour is not
113  * acceptable.
114  *
115  * There is only support in #GstBaseSrc for exactly one source pad, which
116  * should be named "src". A source implementation (subclass of #GstBaseSrc)
117  * should install a pad template in its class_init function, like so:
118  * |[
119  * static void
120  * my_element_class_init (GstMyElementClass *klass)
121  * {
122  *   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
123  *   // srctemplate should be a #GstStaticPadTemplate with direction
124  *   // #GST_PAD_SRC and name "src"
125  *   gst_element_class_add_pad_template (gstelement_class,
126  *       gst_static_pad_template_get (&amp;srctemplate));
127  *   // see #GstElementDetails
128  *   gst_element_class_set_details (gstelement_class, &amp;details);
129  * }
130  * ]|
131  *
132  * <refsect2>
133  * <title>Controlled shutdown of live sources in applications</title>
134  * <para>
135  * Applications that record from a live source may want to stop recording
136  * in a controlled way, so that the recording is stopped, but the data
137  * already in the pipeline is processed to the end (remember that many live
138  * sources would go on recording forever otherwise). For that to happen the
139  * application needs to make the source stop recording and send an EOS
140  * event down the pipeline. The application would then wait for an
141  * EOS message posted on the pipeline's bus to know when all data has
142  * been processed and the pipeline can safely be stopped.
143  *
144  * Since GStreamer 0.10.16 an application may send an EOS event to a source
145  * element to make it perform the EOS logic (send EOS event downstream or post a
146  * #GST_MESSAGE_SEGMENT_DONE on the bus). This can typically be done
147  * with the gst_element_send_event() function on the element or its parent bin.
148  *
149  * After the EOS has been sent to the element, the application should wait for
150  * an EOS message to be posted on the pipeline's bus. Once this EOS message is
151  * received, it may safely shut down the entire pipeline.
152  *
153  * The old behaviour for controlled shutdown introduced since GStreamer 0.10.3
154  * is still available but deprecated as it is dangerous and less flexible.
155  *
156  * Last reviewed on 2007-12-19 (0.10.16)
157  * </para>
158  * </refsect2>
159  */
160
161 #ifdef HAVE_CONFIG_H
162 #  include "config.h"
163 #endif
164
165 #include <stdlib.h>
166 #include <string.h>
167
168 #include "gstbasesrc.h"
169 #include "gsttypefindhelper.h"
170 #include <gst/gstmarshal.h>
171 #include <gst/gst-i18n-lib.h>
172
173 GST_DEBUG_CATEGORY_STATIC (gst_base_src_debug);
174 #define GST_CAT_DEFAULT gst_base_src_debug
175
176 #define GST_LIVE_GET_LOCK(elem)               (GST_BASE_SRC_CAST(elem)->live_lock)
177 #define GST_LIVE_LOCK(elem)                   g_mutex_lock(GST_LIVE_GET_LOCK(elem))
178 #define GST_LIVE_TRYLOCK(elem)                g_mutex_trylock(GST_LIVE_GET_LOCK(elem))
179 #define GST_LIVE_UNLOCK(elem)                 g_mutex_unlock(GST_LIVE_GET_LOCK(elem))
180 #define GST_LIVE_GET_COND(elem)               (GST_BASE_SRC_CAST(elem)->live_cond)
181 #define GST_LIVE_WAIT(elem)                   g_cond_wait (GST_LIVE_GET_COND (elem), GST_LIVE_GET_LOCK (elem))
182 #define GST_LIVE_TIMED_WAIT(elem, timeval)    g_cond_timed_wait (GST_LIVE_GET_COND (elem), GST_LIVE_GET_LOCK (elem),\
183                                                                                 timeval)
184 #define GST_LIVE_SIGNAL(elem)                 g_cond_signal (GST_LIVE_GET_COND (elem));
185 #define GST_LIVE_BROADCAST(elem)              g_cond_broadcast (GST_LIVE_GET_COND (elem));
186
187 /* BaseSrc signals and args */
188 enum
189 {
190   /* FILL ME */
191   LAST_SIGNAL
192 };
193
194 #define DEFAULT_BLOCKSIZE       4096
195 #define DEFAULT_NUM_BUFFERS     -1
196 #define DEFAULT_TYPEFIND        FALSE
197 #define DEFAULT_DO_TIMESTAMP    FALSE
198
199 enum
200 {
201   PROP_0,
202   PROP_BLOCKSIZE,
203   PROP_NUM_BUFFERS,
204   PROP_TYPEFIND,
205   PROP_DO_TIMESTAMP
206 };
207
208 #define GST_BASE_SRC_GET_PRIVATE(obj)  \
209    (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_BASE_SRC, GstBaseSrcPrivate))
210
211 struct _GstBaseSrcPrivate
212 {
213   gboolean last_sent_eos;       /* last thing we did was send an EOS (we set this
214                                  * to avoid the sending of two EOS in some cases) */
215   gboolean discont;
216   gboolean flushing;
217
218   /* two segments to be sent in the streaming thread with STREAM_LOCK */
219   GstEvent *close_segment;
220   GstEvent *start_segment;
221
222   /* if EOS is pending (atomic) */
223   gint pending_eos;
224
225   /* startup latency is the time it takes between going to PLAYING and producing
226    * the first BUFFER with running_time 0. This value is included in the latency
227    * reporting. */
228   GstClockTime latency;
229   /* timestamp offset, this is the offset add to the values of gst_times for
230    * pseudo live sources */
231   GstClockTimeDiff ts_offset;
232
233   gboolean do_timestamp;
234
235   /* stream sequence number */
236   guint32 seqnum;
237
238   /* pending tags to be pushed in the data stream */
239   GList *pending_tags;
240 };
241
242 static GstElementClass *parent_class = NULL;
243
244 static void gst_base_src_base_init (gpointer g_class);
245 static void gst_base_src_class_init (GstBaseSrcClass * klass);
246 static void gst_base_src_init (GstBaseSrc * src, gpointer g_class);
247 static void gst_base_src_finalize (GObject * object);
248
249
250 GType
251 gst_base_src_get_type (void)
252 {
253   static volatile gsize base_src_type = 0;
254
255   if (g_once_init_enter (&base_src_type)) {
256     GType _type;
257     static const GTypeInfo base_src_info = {
258       sizeof (GstBaseSrcClass),
259       (GBaseInitFunc) gst_base_src_base_init,
260       NULL,
261       (GClassInitFunc) gst_base_src_class_init,
262       NULL,
263       NULL,
264       sizeof (GstBaseSrc),
265       0,
266       (GInstanceInitFunc) gst_base_src_init,
267     };
268
269     _type = g_type_register_static (GST_TYPE_ELEMENT,
270         "GstBaseSrc", &base_src_info, G_TYPE_FLAG_ABSTRACT);
271     g_once_init_leave (&base_src_type, _type);
272   }
273   return base_src_type;
274 }
275
276 static GstCaps *gst_base_src_getcaps (GstPad * pad);
277 static gboolean gst_base_src_setcaps (GstPad * pad, GstCaps * caps);
278 static void gst_base_src_fixate (GstPad * pad, GstCaps * caps);
279
280 static gboolean gst_base_src_activate_push (GstPad * pad, gboolean active);
281 static gboolean gst_base_src_activate_pull (GstPad * pad, gboolean active);
282 static void gst_base_src_set_property (GObject * object, guint prop_id,
283     const GValue * value, GParamSpec * pspec);
284 static void gst_base_src_get_property (GObject * object, guint prop_id,
285     GValue * value, GParamSpec * pspec);
286 static gboolean gst_base_src_event_handler (GstPad * pad, GstEvent * event);
287 static gboolean gst_base_src_send_event (GstElement * elem, GstEvent * event);
288 static gboolean gst_base_src_default_event (GstBaseSrc * src, GstEvent * event);
289 static const GstQueryType *gst_base_src_get_query_types (GstElement * element);
290
291 static gboolean gst_base_src_query (GstPad * pad, GstQuery * query);
292
293 static gboolean gst_base_src_default_negotiate (GstBaseSrc * basesrc);
294 static gboolean gst_base_src_default_do_seek (GstBaseSrc * src,
295     GstSegment * segment);
296 static gboolean gst_base_src_default_query (GstBaseSrc * src, GstQuery * query);
297 static gboolean gst_base_src_default_prepare_seek_segment (GstBaseSrc * src,
298     GstEvent * event, GstSegment * segment);
299
300 static gboolean gst_base_src_set_flushing (GstBaseSrc * basesrc,
301     gboolean flushing, gboolean live_play, gboolean unlock, gboolean * playing);
302 static gboolean gst_base_src_start (GstBaseSrc * basesrc);
303 static gboolean gst_base_src_stop (GstBaseSrc * basesrc);
304
305 static GstStateChangeReturn gst_base_src_change_state (GstElement * element,
306     GstStateChange transition);
307
308 static void gst_base_src_loop (GstPad * pad);
309 static gboolean gst_base_src_pad_check_get_range (GstPad * pad);
310 static gboolean gst_base_src_default_check_get_range (GstBaseSrc * bsrc);
311 static GstFlowReturn gst_base_src_pad_get_range (GstPad * pad, guint64 offset,
312     guint length, GstBuffer ** buf);
313 static GstFlowReturn gst_base_src_get_range (GstBaseSrc * src, guint64 offset,
314     guint length, GstBuffer ** buf);
315 static gboolean gst_base_src_seekable (GstBaseSrc * src);
316
317 static void
318 gst_base_src_base_init (gpointer g_class)
319 {
320   GST_DEBUG_CATEGORY_INIT (gst_base_src_debug, "basesrc", 0, "basesrc element");
321 }
322
323 static void
324 gst_base_src_class_init (GstBaseSrcClass * klass)
325 {
326   GObjectClass *gobject_class;
327   GstElementClass *gstelement_class;
328
329   gobject_class = G_OBJECT_CLASS (klass);
330   gstelement_class = GST_ELEMENT_CLASS (klass);
331
332   g_type_class_add_private (klass, sizeof (GstBaseSrcPrivate));
333
334   parent_class = g_type_class_peek_parent (klass);
335
336   gobject_class->finalize = gst_base_src_finalize;
337   gobject_class->set_property = gst_base_src_set_property;
338   gobject_class->get_property = gst_base_src_get_property;
339
340   g_object_class_install_property (gobject_class, PROP_BLOCKSIZE,
341       g_param_spec_ulong ("blocksize", "Block size",
342           "Size in bytes to read per buffer (-1 = default)", 0, G_MAXULONG,
343           DEFAULT_BLOCKSIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
344   g_object_class_install_property (gobject_class, PROP_NUM_BUFFERS,
345       g_param_spec_int ("num-buffers", "num-buffers",
346           "Number of buffers to output before sending EOS (-1 = unlimited)",
347           -1, G_MAXINT, DEFAULT_NUM_BUFFERS, G_PARAM_READWRITE |
348           G_PARAM_STATIC_STRINGS));
349   g_object_class_install_property (gobject_class, PROP_TYPEFIND,
350       g_param_spec_boolean ("typefind", "Typefind",
351           "Run typefind before negotiating", DEFAULT_TYPEFIND,
352           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
353   g_object_class_install_property (gobject_class, PROP_DO_TIMESTAMP,
354       g_param_spec_boolean ("do-timestamp", "Do timestamp",
355           "Apply current stream time to buffers", DEFAULT_DO_TIMESTAMP,
356           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
357
358   gstelement_class->change_state =
359       GST_DEBUG_FUNCPTR (gst_base_src_change_state);
360   gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_base_src_send_event);
361   gstelement_class->get_query_types =
362       GST_DEBUG_FUNCPTR (gst_base_src_get_query_types);
363
364   klass->negotiate = GST_DEBUG_FUNCPTR (gst_base_src_default_negotiate);
365   klass->event = GST_DEBUG_FUNCPTR (gst_base_src_default_event);
366   klass->do_seek = GST_DEBUG_FUNCPTR (gst_base_src_default_do_seek);
367   klass->query = GST_DEBUG_FUNCPTR (gst_base_src_default_query);
368   klass->check_get_range =
369       GST_DEBUG_FUNCPTR (gst_base_src_default_check_get_range);
370   klass->prepare_seek_segment =
371       GST_DEBUG_FUNCPTR (gst_base_src_default_prepare_seek_segment);
372
373   /* Registering debug symbols for function pointers */
374   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_activate_push);
375   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_activate_pull);
376   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_event_handler);
377   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_query);
378   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_pad_get_range);
379   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_pad_check_get_range);
380   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_getcaps);
381   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_setcaps);
382   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_fixate);
383 }
384
385 static void
386 gst_base_src_init (GstBaseSrc * basesrc, gpointer g_class)
387 {
388   GstPad *pad;
389   GstPadTemplate *pad_template;
390
391   basesrc->priv = GST_BASE_SRC_GET_PRIVATE (basesrc);
392
393   basesrc->is_live = FALSE;
394   basesrc->live_lock = g_mutex_new ();
395   basesrc->live_cond = g_cond_new ();
396   basesrc->num_buffers = DEFAULT_NUM_BUFFERS;
397   basesrc->num_buffers_left = -1;
398
399   basesrc->can_activate_push = TRUE;
400   basesrc->pad_mode = GST_ACTIVATE_NONE;
401
402   pad_template =
403       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (g_class), "src");
404   g_return_if_fail (pad_template != NULL);
405
406   GST_DEBUG_OBJECT (basesrc, "creating src pad");
407   pad = gst_pad_new_from_template (pad_template, "src");
408
409   GST_DEBUG_OBJECT (basesrc, "setting functions on src pad");
410   gst_pad_set_activatepush_function (pad, gst_base_src_activate_push);
411   gst_pad_set_activatepull_function (pad, gst_base_src_activate_pull);
412   gst_pad_set_event_function (pad, gst_base_src_event_handler);
413   gst_pad_set_query_function (pad, gst_base_src_query);
414   gst_pad_set_checkgetrange_function (pad, gst_base_src_pad_check_get_range);
415   gst_pad_set_getrange_function (pad, gst_base_src_pad_get_range);
416   gst_pad_set_getcaps_function (pad, gst_base_src_getcaps);
417   gst_pad_set_setcaps_function (pad, gst_base_src_setcaps);
418   gst_pad_set_fixatecaps_function (pad, gst_base_src_fixate);
419
420   /* hold pointer to pad */
421   basesrc->srcpad = pad;
422   GST_DEBUG_OBJECT (basesrc, "adding src pad");
423   gst_element_add_pad (GST_ELEMENT (basesrc), pad);
424
425   basesrc->blocksize = DEFAULT_BLOCKSIZE;
426   basesrc->clock_id = NULL;
427   /* we operate in BYTES by default */
428   gst_base_src_set_format (basesrc, GST_FORMAT_BYTES);
429   basesrc->data.ABI.typefind = DEFAULT_TYPEFIND;
430   basesrc->priv->do_timestamp = DEFAULT_DO_TIMESTAMP;
431
432   GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_STARTED);
433
434   GST_DEBUG_OBJECT (basesrc, "init done");
435 }
436
437 static void
438 gst_base_src_finalize (GObject * object)
439 {
440   GstBaseSrc *basesrc;
441   GstEvent **event_p;
442
443   basesrc = GST_BASE_SRC (object);
444
445   g_mutex_free (basesrc->live_lock);
446   g_cond_free (basesrc->live_cond);
447
448   event_p = &basesrc->data.ABI.pending_seek;
449   gst_event_replace (event_p, NULL);
450
451   if (basesrc->priv->pending_tags) {
452     g_list_foreach (basesrc->priv->pending_tags, (GFunc) gst_event_unref, NULL);
453     g_list_free (basesrc->priv->pending_tags);
454   }
455
456   G_OBJECT_CLASS (parent_class)->finalize (object);
457 }
458
459 /**
460  * gst_base_src_wait_playing:
461  * @src: the src
462  *
463  * If the #GstBaseSrcClass.create() method performs its own synchronisation
464  * against the clock it must unblock when going from PLAYING to the PAUSED state
465  * and call this method before continuing to produce the remaining data.
466  *
467  * This function will block until a state change to PLAYING happens (in which
468  * case this function returns #GST_FLOW_OK) or the processing must be stopped due
469  * to a state change to READY or a FLUSH event (in which case this function
470  * returns #GST_FLOW_WRONG_STATE).
471  *
472  * Since: 0.10.12
473  *
474  * Returns: #GST_FLOW_OK if @src is PLAYING and processing can
475  * continue. Any other return value should be returned from the create vmethod.
476  */
477 GstFlowReturn
478 gst_base_src_wait_playing (GstBaseSrc * src)
479 {
480   g_return_val_if_fail (GST_IS_BASE_SRC (src), GST_FLOW_ERROR);
481
482   /* block until the state changes, or we get a flush, or something */
483   GST_DEBUG_OBJECT (src, "live source waiting for running state");
484   GST_LIVE_WAIT (src);
485   if (src->priv->flushing)
486     goto flushing;
487   GST_DEBUG_OBJECT (src, "live source unlocked");
488
489   return GST_FLOW_OK;
490
491   /* ERRORS */
492 flushing:
493   {
494     GST_DEBUG_OBJECT (src, "we are flushing");
495     return GST_FLOW_WRONG_STATE;
496   }
497 }
498
499 /**
500  * gst_base_src_set_live:
501  * @src: base source instance
502  * @live: new live-mode
503  *
504  * If the element listens to a live source, @live should
505  * be set to %TRUE.
506  *
507  * A live source will not produce data in the PAUSED state and
508  * will therefore not be able to participate in the PREROLL phase
509  * of a pipeline. To signal this fact to the application and the
510  * pipeline, the state change return value of the live source will
511  * be GST_STATE_CHANGE_NO_PREROLL.
512  */
513 void
514 gst_base_src_set_live (GstBaseSrc * src, gboolean live)
515 {
516   g_return_if_fail (GST_IS_BASE_SRC (src));
517
518   GST_OBJECT_LOCK (src);
519   src->is_live = live;
520   GST_OBJECT_UNLOCK (src);
521 }
522
523 /**
524  * gst_base_src_is_live:
525  * @src: base source instance
526  *
527  * Check if an element is in live mode.
528  *
529  * Returns: %TRUE if element is in live mode.
530  */
531 gboolean
532 gst_base_src_is_live (GstBaseSrc * src)
533 {
534   gboolean result;
535
536   g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
537
538   GST_OBJECT_LOCK (src);
539   result = src->is_live;
540   GST_OBJECT_UNLOCK (src);
541
542   return result;
543 }
544
545 /**
546  * gst_base_src_set_format:
547  * @src: base source instance
548  * @format: the format to use
549  *
550  * Sets the default format of the source. This will be the format used
551  * for sending NEW_SEGMENT events and for performing seeks.
552  *
553  * If a format of GST_FORMAT_BYTES is set, the element will be able to
554  * operate in pull mode if the #GstBaseSrc.is_seekable() returns TRUE.
555  *
556  * This function must only be called in states < %GST_STATE_PAUSED.
557  *
558  * Since: 0.10.1
559  */
560 void
561 gst_base_src_set_format (GstBaseSrc * src, GstFormat format)
562 {
563   g_return_if_fail (GST_IS_BASE_SRC (src));
564   g_return_if_fail (GST_STATE (src) <= GST_STATE_READY);
565
566   GST_OBJECT_LOCK (src);
567   gst_segment_init (&src->segment, format);
568   GST_OBJECT_UNLOCK (src);
569 }
570
571 /**
572  * gst_base_src_query_latency:
573  * @src: the source
574  * @live: if the source is live
575  * @min_latency: the min latency of the source
576  * @max_latency: the max latency of the source
577  *
578  * Query the source for the latency parameters. @live will be TRUE when @src is
579  * configured as a live source. @min_latency will be set to the difference
580  * between the running time and the timestamp of the first buffer.
581  * @max_latency is always the undefined value of -1.
582  *
583  * This function is mostly used by subclasses.
584  *
585  * Returns: TRUE if the query succeeded.
586  *
587  * Since: 0.10.13
588  */
589 gboolean
590 gst_base_src_query_latency (GstBaseSrc * src, gboolean * live,
591     GstClockTime * min_latency, GstClockTime * max_latency)
592 {
593   GstClockTime min;
594
595   g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
596
597   GST_OBJECT_LOCK (src);
598   if (live)
599     *live = src->is_live;
600
601   /* if we have a startup latency, report this one, else report 0. Subclasses
602    * are supposed to override the query function if they want something
603    * else. */
604   if (src->priv->latency != -1)
605     min = src->priv->latency;
606   else
607     min = 0;
608
609   if (min_latency)
610     *min_latency = min;
611   if (max_latency)
612     *max_latency = -1;
613
614   GST_LOG_OBJECT (src, "latency: live %d, min %" GST_TIME_FORMAT
615       ", max %" GST_TIME_FORMAT, src->is_live, GST_TIME_ARGS (min),
616       GST_TIME_ARGS (-1));
617   GST_OBJECT_UNLOCK (src);
618
619   return TRUE;
620 }
621
622 /**
623  * gst_base_src_set_blocksize:
624  * @src: the source
625  * @blocksize: the new blocksize in bytes
626  *
627  * Set the number of bytes that @src will push out with each buffer. When
628  * @blocksize is set to -1, a default length will be used.
629  *
630  * Since: 0.10.22
631  */
632 void
633 gst_base_src_set_blocksize (GstBaseSrc * src, gulong blocksize)
634 {
635   g_return_if_fail (GST_IS_BASE_SRC (src));
636
637   GST_OBJECT_LOCK (src);
638   src->blocksize = blocksize;
639   GST_OBJECT_UNLOCK (src);
640 }
641
642 /**
643  * gst_base_src_get_blocksize:
644  * @src: the source
645  *
646  * Get the number of bytes that @src will push out with each buffer.
647  *
648  * Returns: the number of bytes pushed with each buffer.
649  *
650  * Since: 0.10.22
651  */
652 gulong
653 gst_base_src_get_blocksize (GstBaseSrc * src)
654 {
655   gulong res;
656
657   g_return_val_if_fail (GST_IS_BASE_SRC (src), 0);
658
659   GST_OBJECT_LOCK (src);
660   res = src->blocksize;
661   GST_OBJECT_UNLOCK (src);
662
663   return res;
664 }
665
666
667 /**
668  * gst_base_src_set_do_timestamp:
669  * @src: the source
670  * @timestamp: enable or disable timestamping
671  *
672  * Configure @src to automatically timestamp outgoing buffers based on the
673  * current running_time of the pipeline. This property is mostly useful for live
674  * sources.
675  *
676  * Since: 0.10.15
677  */
678 void
679 gst_base_src_set_do_timestamp (GstBaseSrc * src, gboolean timestamp)
680 {
681   g_return_if_fail (GST_IS_BASE_SRC (src));
682
683   GST_OBJECT_LOCK (src);
684   src->priv->do_timestamp = timestamp;
685   GST_OBJECT_UNLOCK (src);
686 }
687
688 /**
689  * gst_base_src_get_do_timestamp:
690  * @src: the source
691  *
692  * Query if @src timestamps outgoing buffers based on the current running_time.
693  *
694  * Returns: %TRUE if the base class will automatically timestamp outgoing buffers.
695  *
696  * Since: 0.10.15
697  */
698 gboolean
699 gst_base_src_get_do_timestamp (GstBaseSrc * src)
700 {
701   gboolean res;
702
703   g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
704
705   GST_OBJECT_LOCK (src);
706   res = src->priv->do_timestamp;
707   GST_OBJECT_UNLOCK (src);
708
709   return res;
710 }
711
712 /**
713  * gst_base_src_new_seamless_segment:
714  * @src: The source
715  * @start: The new start value for the segment
716  * @stop: Stop value for the new segment
717  * @position: The position value for the new segent
718  *
719  * Prepare a new seamless segment for emission downstream. This function must
720  * only be called by derived sub-classes, and only from the create() function,
721  * as the stream-lock needs to be held.
722  *
723  * The format for the new segment will be the current format of the source, as
724  * configured with gst_base_src_set_format()
725  *
726  * Returns: %TRUE if preparation of the seamless segment succeeded.
727  *
728  * Since: 0.10.26
729  */
730 gboolean
731 gst_base_src_new_seamless_segment (GstBaseSrc * src, gint64 start, gint64 stop,
732     gint64 position)
733 {
734   gboolean res = TRUE;
735
736   GST_DEBUG_OBJECT (src,
737       "Starting new seamless segment. Start %" GST_TIME_FORMAT " stop %"
738       GST_TIME_FORMAT " position %" GST_TIME_FORMAT, GST_TIME_ARGS (start),
739       GST_TIME_ARGS (stop), GST_TIME_ARGS (position));
740
741   GST_OBJECT_LOCK (src);
742   if (src->data.ABI.running) {
743     if (src->priv->close_segment)
744       gst_event_unref (src->priv->close_segment);
745     src->priv->close_segment =
746         gst_event_new_new_segment_full (TRUE,
747         src->segment.rate, src->segment.applied_rate, src->segment.format,
748         src->segment.start, src->segment.last_stop, src->segment.time);
749   }
750
751   gst_segment_set_newsegment_full (&src->segment, FALSE, src->segment.rate,
752       src->segment.applied_rate, src->segment.format, start, stop, position);
753
754   if (src->priv->start_segment)
755     gst_event_unref (src->priv->start_segment);
756   if (src->segment.rate >= 0.0) {
757     /* forward, we send data from last_stop to stop */
758     src->priv->start_segment =
759         gst_event_new_new_segment_full (FALSE,
760         src->segment.rate, src->segment.applied_rate, src->segment.format,
761         src->segment.last_stop, stop, src->segment.time);
762   } else {
763     /* reverse, we send data from last_stop to start */
764     src->priv->start_segment =
765         gst_event_new_new_segment_full (FALSE,
766         src->segment.rate, src->segment.applied_rate, src->segment.format,
767         src->segment.start, src->segment.last_stop, src->segment.time);
768   }
769   GST_OBJECT_UNLOCK (src);
770
771   src->priv->discont = TRUE;
772   src->data.ABI.running = TRUE;
773
774   return res;
775 }
776
777 static gboolean
778 gst_base_src_setcaps (GstPad * pad, GstCaps * caps)
779 {
780   GstBaseSrcClass *bclass;
781   GstBaseSrc *bsrc;
782   gboolean res = TRUE;
783
784   bsrc = GST_BASE_SRC (GST_PAD_PARENT (pad));
785   bclass = GST_BASE_SRC_GET_CLASS (bsrc);
786
787   if (bclass->set_caps)
788     res = bclass->set_caps (bsrc, caps);
789
790   return res;
791 }
792
793 static GstCaps *
794 gst_base_src_getcaps (GstPad * pad)
795 {
796   GstBaseSrcClass *bclass;
797   GstBaseSrc *bsrc;
798   GstCaps *caps = NULL;
799
800   bsrc = GST_BASE_SRC (GST_PAD_PARENT (pad));
801   bclass = GST_BASE_SRC_GET_CLASS (bsrc);
802   if (bclass->get_caps)
803     caps = bclass->get_caps (bsrc);
804
805   if (caps == NULL) {
806     GstPadTemplate *pad_template;
807
808     pad_template =
809         gst_element_class_get_pad_template (GST_ELEMENT_CLASS (bclass), "src");
810     if (pad_template != NULL) {
811       caps = gst_caps_ref (gst_pad_template_get_caps (pad_template));
812     }
813   }
814   return caps;
815 }
816
817 static void
818 gst_base_src_fixate (GstPad * pad, GstCaps * caps)
819 {
820   GstBaseSrcClass *bclass;
821   GstBaseSrc *bsrc;
822
823   bsrc = GST_BASE_SRC (gst_pad_get_parent (pad));
824   bclass = GST_BASE_SRC_GET_CLASS (bsrc);
825
826   if (bclass->fixate)
827     bclass->fixate (bsrc, caps);
828
829   gst_object_unref (bsrc);
830 }
831
832 static gboolean
833 gst_base_src_default_query (GstBaseSrc * src, GstQuery * query)
834 {
835   gboolean res;
836
837   switch (GST_QUERY_TYPE (query)) {
838     case GST_QUERY_POSITION:
839     {
840       GstFormat format;
841
842       gst_query_parse_position (query, &format, NULL);
843       switch (format) {
844         case GST_FORMAT_PERCENT:
845         {
846           gint64 percent;
847           gint64 position;
848           gint64 duration;
849
850           GST_OBJECT_LOCK (src);
851           position = src->segment.last_stop;
852           duration = src->segment.duration;
853           GST_OBJECT_UNLOCK (src);
854
855           if (position != -1 && duration != -1) {
856             if (position < duration)
857               percent = gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX, position,
858                   duration);
859             else
860               percent = GST_FORMAT_PERCENT_MAX;
861           } else
862             percent = -1;
863
864           gst_query_set_position (query, GST_FORMAT_PERCENT, percent);
865           res = TRUE;
866           break;
867         }
868         default:
869         {
870           gint64 position;
871           GstFormat seg_format;
872
873           GST_OBJECT_LOCK (src);
874           position = src->segment.last_stop;
875           seg_format = src->segment.format;
876           GST_OBJECT_UNLOCK (src);
877
878           if (position != -1) {
879             /* convert to requested format */
880             res =
881                 gst_pad_query_convert (src->srcpad, seg_format,
882                 position, &format, &position);
883           } else
884             res = TRUE;
885
886           gst_query_set_position (query, format, position);
887           break;
888         }
889       }
890       break;
891     }
892     case GST_QUERY_DURATION:
893     {
894       GstFormat format;
895
896       gst_query_parse_duration (query, &format, NULL);
897
898       GST_DEBUG_OBJECT (src, "duration query in format %s",
899           gst_format_get_name (format));
900
901       switch (format) {
902         case GST_FORMAT_PERCENT:
903           gst_query_set_duration (query, GST_FORMAT_PERCENT,
904               GST_FORMAT_PERCENT_MAX);
905           res = TRUE;
906           break;
907         default:
908         {
909           gint64 duration;
910           GstFormat seg_format;
911
912           GST_OBJECT_LOCK (src);
913           /* this is the duration as configured by the subclass. */
914           duration = src->segment.duration;
915           seg_format = src->segment.format;
916           GST_OBJECT_UNLOCK (src);
917
918           if (duration != -1) {
919             /* convert to requested format, if this fails, we have a duration
920              * but we cannot answer the query, we must return FALSE. */
921             res =
922                 gst_pad_query_convert (src->srcpad, seg_format,
923                 duration, &format, &duration);
924           } else {
925             /* The subclass did not configure a duration, we assume that the
926              * media has an unknown duration then and we return TRUE to report
927              * this. Note that this is not the same as returning FALSE, which
928              * means that we cannot report the duration at all. */
929             res = TRUE;
930           }
931           gst_query_set_duration (query, format, duration);
932           break;
933         }
934       }
935       break;
936     }
937
938     case GST_QUERY_SEEKING:
939     {
940       GstFormat format, seg_format;
941       gint64 duration;
942
943       GST_OBJECT_LOCK (src);
944       duration = src->segment.duration;
945       seg_format = src->segment.format;
946       GST_OBJECT_UNLOCK (src);
947
948       gst_query_parse_seeking (query, &format, NULL, NULL, NULL);
949       if (format == seg_format) {
950         gst_query_set_seeking (query, seg_format,
951             gst_base_src_seekable (src), 0, duration);
952         res = TRUE;
953       } else {
954         /* FIXME 0.11: return TRUE + seekable=FALSE for SEEKING query here */
955         /* Don't reply to the query to make up for demuxers which don't
956          * handle the SEEKING query yet. Players like Totem will fall back
957          * to the duration when the SEEKING query isn't answered. */
958         res = FALSE;
959       }
960       break;
961     }
962     case GST_QUERY_SEGMENT:
963     {
964       gint64 start, stop;
965
966       GST_OBJECT_LOCK (src);
967       /* no end segment configured, current duration then */
968       if ((stop = src->segment.stop) == -1)
969         stop = src->segment.duration;
970       start = src->segment.start;
971
972       /* adjust to stream time */
973       if (src->segment.time != -1) {
974         start -= src->segment.time;
975         if (stop != -1)
976           stop -= src->segment.time;
977       }
978       gst_query_set_segment (query, src->segment.rate, src->segment.format,
979           start, stop);
980       GST_OBJECT_UNLOCK (src);
981       res = TRUE;
982       break;
983     }
984
985     case GST_QUERY_FORMATS:
986     {
987       gst_query_set_formats (query, 3, GST_FORMAT_DEFAULT,
988           GST_FORMAT_BYTES, GST_FORMAT_PERCENT);
989       res = TRUE;
990       break;
991     }
992     case GST_QUERY_CONVERT:
993     {
994       GstFormat src_fmt, dest_fmt;
995       gint64 src_val, dest_val;
996
997       gst_query_parse_convert (query, &src_fmt, &src_val, &dest_fmt, &dest_val);
998
999       /* we can only convert between equal formats... */
1000       if (src_fmt == dest_fmt) {
1001         dest_val = src_val;
1002         res = TRUE;
1003       } else
1004         res = FALSE;
1005
1006       gst_query_set_convert (query, src_fmt, src_val, dest_fmt, dest_val);
1007       break;
1008     }
1009     case GST_QUERY_LATENCY:
1010     {
1011       GstClockTime min, max;
1012       gboolean live;
1013
1014       /* Subclasses should override and implement something usefull */
1015       res = gst_base_src_query_latency (src, &live, &min, &max);
1016
1017       GST_LOG_OBJECT (src, "report latency: live %d, min %" GST_TIME_FORMAT
1018           ", max %" GST_TIME_FORMAT, live, GST_TIME_ARGS (min),
1019           GST_TIME_ARGS (max));
1020
1021       gst_query_set_latency (query, live, min, max);
1022       break;
1023     }
1024     case GST_QUERY_JITTER:
1025     case GST_QUERY_RATE:
1026       res = FALSE;
1027       break;
1028     case GST_QUERY_BUFFERING:
1029     {
1030       GstFormat format, seg_format;
1031       gint64 start, stop, estimated;
1032
1033       gst_query_parse_buffering_range (query, &format, NULL, NULL, NULL);
1034
1035       GST_DEBUG_OBJECT (src, "buffering query in format %s",
1036           gst_format_get_name (format));
1037
1038       GST_OBJECT_LOCK (src);
1039       if (src->random_access) {
1040         estimated = 0;
1041         start = 0;
1042         if (format == GST_FORMAT_PERCENT)
1043           stop = GST_FORMAT_PERCENT_MAX;
1044         else
1045           stop = src->segment.duration;
1046       } else {
1047         estimated = -1;
1048         start = -1;
1049         stop = -1;
1050       }
1051       seg_format = src->segment.format;
1052       GST_OBJECT_UNLOCK (src);
1053
1054       /* convert to required format. When the conversion fails, we can't answer
1055        * the query. When the value is unknown, we can don't perform conversion
1056        * but report TRUE. */
1057       if (format != GST_FORMAT_PERCENT && stop != -1) {
1058         res = gst_pad_query_convert (src->srcpad, seg_format,
1059             stop, &format, &stop);
1060       } else {
1061         res = TRUE;
1062       }
1063       if (res && format != GST_FORMAT_PERCENT && start != -1)
1064         res = gst_pad_query_convert (src->srcpad, seg_format,
1065             start, &format, &start);
1066
1067       gst_query_set_buffering_range (query, format, start, stop, estimated);
1068       break;
1069     }
1070     default:
1071       res = FALSE;
1072       break;
1073   }
1074   GST_DEBUG_OBJECT (src, "query %s returns %d", GST_QUERY_TYPE_NAME (query),
1075       res);
1076   return res;
1077 }
1078
1079 static gboolean
1080 gst_base_src_query (GstPad * pad, GstQuery * query)
1081 {
1082   GstBaseSrc *src;
1083   GstBaseSrcClass *bclass;
1084   gboolean result = FALSE;
1085
1086   src = GST_BASE_SRC (gst_pad_get_parent (pad));
1087
1088   bclass = GST_BASE_SRC_GET_CLASS (src);
1089
1090   if (bclass->query)
1091     result = bclass->query (src, query);
1092   else
1093     result = gst_pad_query_default (pad, query);
1094
1095   gst_object_unref (src);
1096
1097   return result;
1098 }
1099
1100 static gboolean
1101 gst_base_src_default_do_seek (GstBaseSrc * src, GstSegment * segment)
1102 {
1103   gboolean res = TRUE;
1104
1105   /* update our offset if the start/stop position was updated */
1106   if (segment->format == GST_FORMAT_BYTES) {
1107     segment->time = segment->start;
1108   } else if (segment->start == 0) {
1109     /* seek to start, we can implement a default for this. */
1110     segment->time = 0;
1111   } else {
1112     res = FALSE;
1113     GST_INFO_OBJECT (src, "Can't do a default seek");
1114   }
1115
1116   return res;
1117 }
1118
1119 static gboolean
1120 gst_base_src_do_seek (GstBaseSrc * src, GstSegment * segment)
1121 {
1122   GstBaseSrcClass *bclass;
1123   gboolean result = FALSE;
1124
1125   bclass = GST_BASE_SRC_GET_CLASS (src);
1126
1127   if (bclass->do_seek)
1128     result = bclass->do_seek (src, segment);
1129
1130   return result;
1131 }
1132
1133 #define SEEK_TYPE_IS_RELATIVE(t) (((t) != GST_SEEK_TYPE_NONE) && ((t) != GST_SEEK_TYPE_SET))
1134
1135 static gboolean
1136 gst_base_src_default_prepare_seek_segment (GstBaseSrc * src, GstEvent * event,
1137     GstSegment * segment)
1138 {
1139   /* By default, we try one of 2 things:
1140    *   - For absolute seek positions, convert the requested position to our
1141    *     configured processing format and place it in the output segment \
1142    *   - For relative seek positions, convert our current (input) values to the
1143    *     seek format, adjust by the relative seek offset and then convert back to
1144    *     the processing format
1145    */
1146   GstSeekType cur_type, stop_type;
1147   gint64 cur, stop;
1148   GstSeekFlags flags;
1149   GstFormat seek_format, dest_format;
1150   gdouble rate;
1151   gboolean update;
1152   gboolean res = TRUE;
1153
1154   gst_event_parse_seek (event, &rate, &seek_format, &flags,
1155       &cur_type, &cur, &stop_type, &stop);
1156   dest_format = segment->format;
1157
1158   if (seek_format == dest_format) {
1159     gst_segment_set_seek (segment, rate, seek_format, flags,
1160         cur_type, cur, stop_type, stop, &update);
1161     return TRUE;
1162   }
1163
1164   if (cur_type != GST_SEEK_TYPE_NONE) {
1165     /* FIXME: Handle seek_cur & seek_end by converting the input segment vals */
1166     res =
1167         gst_pad_query_convert (src->srcpad, seek_format, cur, &dest_format,
1168         &cur);
1169     cur_type = GST_SEEK_TYPE_SET;
1170   }
1171
1172   if (res && stop_type != GST_SEEK_TYPE_NONE) {
1173     /* FIXME: Handle seek_cur & seek_end by converting the input segment vals */
1174     res =
1175         gst_pad_query_convert (src->srcpad, seek_format, stop, &dest_format,
1176         &stop);
1177     stop_type = GST_SEEK_TYPE_SET;
1178   }
1179
1180   /* And finally, configure our output segment in the desired format */
1181   gst_segment_set_seek (segment, rate, dest_format, flags, cur_type, cur,
1182       stop_type, stop, &update);
1183
1184   if (!res)
1185     goto no_format;
1186
1187   return res;
1188
1189 no_format:
1190   {
1191     GST_DEBUG_OBJECT (src, "undefined format given, seek aborted.");
1192     return FALSE;
1193   }
1194 }
1195
1196 static gboolean
1197 gst_base_src_prepare_seek_segment (GstBaseSrc * src, GstEvent * event,
1198     GstSegment * seeksegment)
1199 {
1200   GstBaseSrcClass *bclass;
1201   gboolean result = FALSE;
1202
1203   bclass = GST_BASE_SRC_GET_CLASS (src);
1204
1205   if (bclass->prepare_seek_segment)
1206     result = bclass->prepare_seek_segment (src, event, seeksegment);
1207
1208   return result;
1209 }
1210
1211 /* this code implements the seeking. It is a good example
1212  * handling all cases.
1213  *
1214  * A seek updates the currently configured segment.start
1215  * and segment.stop values based on the SEEK_TYPE. If the
1216  * segment.start value is updated, a seek to this new position
1217  * should be performed.
1218  *
1219  * The seek can only be executed when we are not currently
1220  * streaming any data, to make sure that this is the case, we
1221  * acquire the STREAM_LOCK which is taken when we are in the
1222  * _loop() function or when a getrange() is called. Normally
1223  * we will not receive a seek if we are operating in pull mode
1224  * though. When we operate as a live source we might block on the live
1225  * cond, which does not release the STREAM_LOCK. Therefore we will try
1226  * to grab the LIVE_LOCK instead of the STREAM_LOCK to make sure it is
1227  * safe to perform the seek.
1228  *
1229  * When we are in the loop() function, we might be in the middle
1230  * of pushing a buffer, which might block in a sink. To make sure
1231  * that the push gets unblocked we push out a FLUSH_START event.
1232  * Our loop function will get a WRONG_STATE return value from
1233  * the push and will pause, effectively releasing the STREAM_LOCK.
1234  *
1235  * For a non-flushing seek, we pause the task, which might eventually
1236  * release the STREAM_LOCK. We say eventually because when the sink
1237  * blocks on the sample we might wait a very long time until the sink
1238  * unblocks the sample. In any case we acquire the STREAM_LOCK and
1239  * can continue the seek. A non-flushing seek is normally done in a
1240  * running pipeline to perform seamless playback, this means that the sink is
1241  * PLAYING and will return from its chain function.
1242  * In the case of a non-flushing seek we need to make sure that the
1243  * data we output after the seek is continuous with the previous data,
1244  * this is because a non-flushing seek does not reset the running-time
1245  * to 0. We do this by closing the currently running segment, ie. sending
1246  * a new_segment event with the stop position set to the last processed
1247  * position.
1248  *
1249  * After updating the segment.start/stop values, we prepare for
1250  * streaming again. We push out a FLUSH_STOP to make the peer pad
1251  * accept data again and we start our task again.
1252  *
1253  * A segment seek posts a message on the bus saying that the playback
1254  * of the segment started. We store the segment flag internally because
1255  * when we reach the segment.stop we have to post a segment.done
1256  * instead of EOS when doing a segment seek.
1257  */
1258 /* FIXME (0.11), we have the unlock gboolean here because most current
1259  * implementations (fdsrc, -base/gst/tcp/, ...) unconditionally unlock, even when
1260  * the streaming thread isn't running, resulting in bogus unlocks later when it
1261  * starts. This is fixed by adding unlock_stop, but we should still avoid unlocking
1262  * unnecessarily for backwards compatibility. Ergo, the unlock variable stays
1263  * until 0.11
1264  */
1265 static gboolean
1266 gst_base_src_perform_seek (GstBaseSrc * src, GstEvent * event, gboolean unlock)
1267 {
1268   gboolean res = TRUE, tres;
1269   gdouble rate;
1270   GstFormat seek_format, dest_format;
1271   GstSeekFlags flags;
1272   GstSeekType cur_type, stop_type;
1273   gint64 cur, stop;
1274   gboolean flush, playing;
1275   gboolean update;
1276   gboolean relative_seek = FALSE;
1277   gboolean seekseg_configured = FALSE;
1278   GstSegment seeksegment;
1279   guint32 seqnum;
1280   GstEvent *tevent;
1281
1282   GST_DEBUG_OBJECT (src, "doing seek");
1283
1284   GST_OBJECT_LOCK (src);
1285   dest_format = src->segment.format;
1286   GST_OBJECT_UNLOCK (src);
1287
1288   if (event) {
1289     gst_event_parse_seek (event, &rate, &seek_format, &flags,
1290         &cur_type, &cur, &stop_type, &stop);
1291
1292     relative_seek = SEEK_TYPE_IS_RELATIVE (cur_type) ||
1293         SEEK_TYPE_IS_RELATIVE (stop_type);
1294
1295     if (dest_format != seek_format && !relative_seek) {
1296       /* If we have an ABSOLUTE position (SEEK_SET only), we can convert it
1297        * here before taking the stream lock, otherwise we must convert it later,
1298        * once we have the stream lock and can read the last configures segment
1299        * start and stop positions */
1300       gst_segment_init (&seeksegment, dest_format);
1301
1302       if (!gst_base_src_prepare_seek_segment (src, event, &seeksegment))
1303         goto prepare_failed;
1304
1305       seekseg_configured = TRUE;
1306     }
1307
1308     flush = flags & GST_SEEK_FLAG_FLUSH;
1309     seqnum = gst_event_get_seqnum (event);
1310   } else {
1311     flush = FALSE;
1312     /* get next seqnum */
1313     seqnum = gst_util_seqnum_next ();
1314   }
1315
1316   /* send flush start */
1317   if (flush) {
1318     tevent = gst_event_new_flush_start ();
1319     gst_event_set_seqnum (tevent, seqnum);
1320     gst_pad_push_event (src->srcpad, tevent);
1321   } else
1322     gst_pad_pause_task (src->srcpad);
1323
1324   /* unblock streaming thread. */
1325   gst_base_src_set_flushing (src, TRUE, FALSE, unlock, &playing);
1326
1327   /* grab streaming lock, this should eventually be possible, either
1328    * because the task is paused, our streaming thread stopped
1329    * or because our peer is flushing. */
1330   GST_PAD_STREAM_LOCK (src->srcpad);
1331   if (G_UNLIKELY (src->priv->seqnum == seqnum)) {
1332     /* we have seen this event before, issue a warning for now */
1333     GST_WARNING_OBJECT (src, "duplicate event found %" G_GUINT32_FORMAT,
1334         seqnum);
1335   } else {
1336     src->priv->seqnum = seqnum;
1337     GST_DEBUG_OBJECT (src, "seek with seqnum %" G_GUINT32_FORMAT, seqnum);
1338   }
1339
1340   gst_base_src_set_flushing (src, FALSE, playing, unlock, NULL);
1341
1342   /* If we configured the seeksegment above, don't overwrite it now. Otherwise
1343    * copy the current segment info into the temp segment that we can actually
1344    * attempt the seek with. We only update the real segment if the seek suceeds. */
1345   if (!seekseg_configured) {
1346     memcpy (&seeksegment, &src->segment, sizeof (GstSegment));
1347
1348     /* now configure the final seek segment */
1349     if (event) {
1350       if (seeksegment.format != seek_format) {
1351         /* OK, here's where we give the subclass a chance to convert the relative
1352          * seek into an absolute one in the processing format. We set up any
1353          * absolute seek above, before taking the stream lock. */
1354         if (!gst_base_src_prepare_seek_segment (src, event, &seeksegment)) {
1355           GST_DEBUG_OBJECT (src, "Preparing the seek failed after flushing. "
1356               "Aborting seek");
1357           res = FALSE;
1358         }
1359       } else {
1360         /* The seek format matches our processing format, no need to ask the
1361          * the subclass to configure the segment. */
1362         gst_segment_set_seek (&seeksegment, rate, seek_format, flags,
1363             cur_type, cur, stop_type, stop, &update);
1364       }
1365     }
1366     /* Else, no seek event passed, so we're just (re)starting the
1367        current segment. */
1368   }
1369
1370   if (res) {
1371     GST_DEBUG_OBJECT (src, "segment configured from %" G_GINT64_FORMAT
1372         " to %" G_GINT64_FORMAT ", position %" G_GINT64_FORMAT,
1373         seeksegment.start, seeksegment.stop, seeksegment.last_stop);
1374
1375     /* do the seek, segment.last_stop contains the new position. */
1376     res = gst_base_src_do_seek (src, &seeksegment);
1377   }
1378
1379   /* and prepare to continue streaming */
1380   if (flush) {
1381     tevent = gst_event_new_flush_stop ();
1382     gst_event_set_seqnum (tevent, seqnum);
1383     /* send flush stop, peer will accept data and events again. We
1384      * are not yet providing data as we still have the STREAM_LOCK. */
1385     gst_pad_push_event (src->srcpad, tevent);
1386   } else if (res && src->data.ABI.running) {
1387     /* we are running the current segment and doing a non-flushing seek,
1388      * close the segment first based on the last_stop. */
1389     GST_DEBUG_OBJECT (src, "closing running segment %" G_GINT64_FORMAT
1390         " to %" G_GINT64_FORMAT, src->segment.start, src->segment.last_stop);
1391
1392     /* queue the segment for sending in the stream thread */
1393     if (src->priv->close_segment)
1394       gst_event_unref (src->priv->close_segment);
1395     src->priv->close_segment =
1396         gst_event_new_new_segment_full (TRUE,
1397         src->segment.rate, src->segment.applied_rate, src->segment.format,
1398         src->segment.start, src->segment.last_stop, src->segment.time);
1399     gst_event_set_seqnum (src->priv->close_segment, seqnum);
1400   }
1401
1402   /* The subclass must have converted the segment to the processing format
1403    * by now */
1404   if (res && seeksegment.format != dest_format) {
1405     GST_DEBUG_OBJECT (src, "Subclass failed to prepare a seek segment "
1406         "in the correct format. Aborting seek.");
1407     res = FALSE;
1408   }
1409
1410   /* if the seek was successful, we update our real segment and push
1411    * out the new segment. */
1412   if (res) {
1413     GST_OBJECT_LOCK (src);
1414     memcpy (&src->segment, &seeksegment, sizeof (GstSegment));
1415     GST_OBJECT_UNLOCK (src);
1416
1417     if (seeksegment.flags & GST_SEEK_FLAG_SEGMENT) {
1418       GstMessage *message;
1419
1420       message = gst_message_new_segment_start (GST_OBJECT (src),
1421           seeksegment.format, seeksegment.last_stop);
1422       gst_message_set_seqnum (message, seqnum);
1423
1424       gst_element_post_message (GST_ELEMENT (src), message);
1425     }
1426
1427     /* for deriving a stop position for the playback segment from the seek
1428      * segment, we must take the duration when the stop is not set */
1429     if ((stop = seeksegment.stop) == -1)
1430       stop = seeksegment.duration;
1431
1432     GST_DEBUG_OBJECT (src, "Sending newsegment from %" G_GINT64_FORMAT
1433         " to %" G_GINT64_FORMAT, seeksegment.start, stop);
1434
1435     /* now replace the old segment so that we send it in the stream thread the
1436      * next time it is scheduled. */
1437     if (src->priv->start_segment)
1438       gst_event_unref (src->priv->start_segment);
1439     if (seeksegment.rate >= 0.0) {
1440       /* forward, we send data from last_stop to stop */
1441       src->priv->start_segment =
1442           gst_event_new_new_segment_full (FALSE,
1443           seeksegment.rate, seeksegment.applied_rate, seeksegment.format,
1444           seeksegment.last_stop, stop, seeksegment.time);
1445     } else {
1446       /* reverse, we send data from last_stop to start */
1447       src->priv->start_segment =
1448           gst_event_new_new_segment_full (FALSE,
1449           seeksegment.rate, seeksegment.applied_rate, seeksegment.format,
1450           seeksegment.start, seeksegment.last_stop, seeksegment.time);
1451     }
1452     gst_event_set_seqnum (src->priv->start_segment, seqnum);
1453   }
1454
1455   src->priv->discont = TRUE;
1456   src->data.ABI.running = TRUE;
1457   /* and restart the task in case it got paused explicitly or by
1458    * the FLUSH_START event we pushed out. */
1459   tres = gst_pad_start_task (src->srcpad, (GstTaskFunction) gst_base_src_loop,
1460       src->srcpad);
1461   if (res && !tres)
1462     res = FALSE;
1463
1464   /* and release the lock again so we can continue streaming */
1465   GST_PAD_STREAM_UNLOCK (src->srcpad);
1466
1467   return res;
1468
1469   /* ERROR */
1470 prepare_failed:
1471   GST_DEBUG_OBJECT (src, "Preparing the seek failed before flushing. "
1472       "Aborting seek");
1473   return FALSE;
1474 }
1475
1476 static const GstQueryType *
1477 gst_base_src_get_query_types (GstElement * element)
1478 {
1479   static const GstQueryType query_types[] = {
1480     GST_QUERY_DURATION,
1481     GST_QUERY_POSITION,
1482     GST_QUERY_SEEKING,
1483     GST_QUERY_SEGMENT,
1484     GST_QUERY_FORMATS,
1485     GST_QUERY_LATENCY,
1486     GST_QUERY_JITTER,
1487     GST_QUERY_RATE,
1488     GST_QUERY_CONVERT,
1489     0
1490   };
1491
1492   return query_types;
1493 }
1494
1495 /* all events send to this element directly. This is mainly done from the
1496  * application.
1497  */
1498 static gboolean
1499 gst_base_src_send_event (GstElement * element, GstEvent * event)
1500 {
1501   GstBaseSrc *src;
1502   gboolean result = FALSE;
1503
1504   src = GST_BASE_SRC (element);
1505
1506   GST_DEBUG_OBJECT (src, "reveived %s event", GST_EVENT_TYPE_NAME (event));
1507
1508   switch (GST_EVENT_TYPE (event)) {
1509       /* bidirectional events */
1510     case GST_EVENT_FLUSH_START:
1511     case GST_EVENT_FLUSH_STOP:
1512       /* sending random flushes downstream can break stuff,
1513        * especially sync since all segment info will get flushed */
1514       break;
1515
1516       /* downstream serialized events */
1517     case GST_EVENT_EOS:
1518     {
1519       GstBaseSrcClass *bclass;
1520
1521       bclass = GST_BASE_SRC_GET_CLASS (src);
1522
1523       /* queue EOS and make sure the task or pull function performs the EOS
1524        * actions.
1525        *
1526        * We have two possibilities:
1527        *
1528        *  - Before we are to enter the _create function, we check the pending_eos
1529        *    first and do EOS instead of entering it.
1530        *  - If we are in the _create function or we did not manage to set the
1531        *    flag fast enough and we are about to enter the _create function,
1532        *    we unlock it so that we exit with WRONG_STATE immediatly. We then
1533        *    check the EOS flag and do the EOS logic.
1534        */
1535       g_atomic_int_set (&src->priv->pending_eos, TRUE);
1536       GST_DEBUG_OBJECT (src, "EOS marked, calling unlock");
1537
1538       /* unlock the _create function so that we can check the pending_eos flag
1539        * and we can do EOS. This will eventually release the LIVE_LOCK again so
1540        * that we can grab it and stop the unlock again. We don't take the stream
1541        * lock so that this operation is guaranteed to never block. */
1542       if (bclass->unlock)
1543         bclass->unlock (src);
1544
1545       GST_DEBUG_OBJECT (src, "unlock called, waiting for LIVE_LOCK");
1546
1547       GST_LIVE_LOCK (src);
1548       GST_DEBUG_OBJECT (src, "LIVE_LOCK acquired, calling unlock_stop");
1549       /* now stop the unlock of the streaming thread again. Grabbing the live
1550        * lock is enough because that protects the create function. */
1551       if (bclass->unlock_stop)
1552         bclass->unlock_stop (src);
1553       GST_LIVE_UNLOCK (src);
1554
1555       result = TRUE;
1556       break;
1557     }
1558     case GST_EVENT_NEWSEGMENT:
1559       /* sending random NEWSEGMENT downstream can break sync. */
1560       break;
1561     case GST_EVENT_TAG:
1562       /* Insert tag in the dataflow */
1563       GST_OBJECT_LOCK (src);
1564       src->priv->pending_tags = g_list_append (src->priv->pending_tags, event);
1565       GST_OBJECT_UNLOCK (src);
1566       event = NULL;
1567       result = TRUE;
1568       break;
1569     case GST_EVENT_BUFFERSIZE:
1570       /* does not seem to make much sense currently */
1571       break;
1572
1573       /* upstream events */
1574     case GST_EVENT_QOS:
1575       /* elements should override send_event and do something */
1576       break;
1577     case GST_EVENT_SEEK:
1578     {
1579       gboolean started;
1580
1581       GST_OBJECT_LOCK (src->srcpad);
1582       if (GST_PAD_ACTIVATE_MODE (src->srcpad) == GST_ACTIVATE_PULL)
1583         goto wrong_mode;
1584       started = GST_PAD_ACTIVATE_MODE (src->srcpad) == GST_ACTIVATE_PUSH;
1585       GST_OBJECT_UNLOCK (src->srcpad);
1586
1587       if (started) {
1588         GST_DEBUG_OBJECT (src, "performing seek");
1589         /* when we are running in push mode, we can execute the
1590          * seek right now, we need to unlock. */
1591         result = gst_base_src_perform_seek (src, event, TRUE);
1592       } else {
1593         GstEvent **event_p;
1594
1595         /* else we store the event and execute the seek when we
1596          * get activated */
1597         GST_OBJECT_LOCK (src);
1598         GST_DEBUG_OBJECT (src, "queueing seek");
1599         event_p = &src->data.ABI.pending_seek;
1600         gst_event_replace ((GstEvent **) event_p, event);
1601         GST_OBJECT_UNLOCK (src);
1602         /* assume the seek will work */
1603         result = TRUE;
1604       }
1605       break;
1606     }
1607     case GST_EVENT_NAVIGATION:
1608       /* could make sense for elements that do something with navigation events
1609        * but then they would need to override the send_event function */
1610       break;
1611     case GST_EVENT_LATENCY:
1612       /* does not seem to make sense currently */
1613       break;
1614
1615       /* custom events */
1616     case GST_EVENT_CUSTOM_UPSTREAM:
1617       /* override send_event if you want this */
1618       break;
1619     case GST_EVENT_CUSTOM_DOWNSTREAM:
1620     case GST_EVENT_CUSTOM_BOTH:
1621       /* FIXME, insert event in the dataflow */
1622       break;
1623     case GST_EVENT_CUSTOM_DOWNSTREAM_OOB:
1624     case GST_EVENT_CUSTOM_BOTH_OOB:
1625       /* insert a random custom event into the pipeline */
1626       GST_DEBUG_OBJECT (src, "pushing custom OOB event downstream");
1627       result = gst_pad_push_event (src->srcpad, event);
1628       /* we gave away the ref to the event in the push */
1629       event = NULL;
1630       break;
1631     default:
1632       break;
1633   }
1634 done:
1635   /* if we still have a ref to the event, unref it now */
1636   if (event)
1637     gst_event_unref (event);
1638
1639   return result;
1640
1641   /* ERRORS */
1642 wrong_mode:
1643   {
1644     GST_DEBUG_OBJECT (src, "cannot perform seek when operating in pull mode");
1645     GST_OBJECT_UNLOCK (src->srcpad);
1646     result = FALSE;
1647     goto done;
1648   }
1649 }
1650
1651 static gboolean
1652 gst_base_src_seekable (GstBaseSrc * src)
1653 {
1654   GstBaseSrcClass *bclass;
1655   bclass = GST_BASE_SRC_GET_CLASS (src);
1656   if (bclass->is_seekable)
1657     return bclass->is_seekable (src);
1658   else
1659     return FALSE;
1660 }
1661
1662 static gboolean
1663 gst_base_src_default_event (GstBaseSrc * src, GstEvent * event)
1664 {
1665   gboolean result;
1666
1667   switch (GST_EVENT_TYPE (event)) {
1668     case GST_EVENT_SEEK:
1669       /* is normally called when in push mode */
1670       if (!gst_base_src_seekable (src))
1671         goto not_seekable;
1672
1673       result = gst_base_src_perform_seek (src, event, TRUE);
1674       break;
1675     case GST_EVENT_FLUSH_START:
1676       /* cancel any blocking getrange, is normally called
1677        * when in pull mode. */
1678       result = gst_base_src_set_flushing (src, TRUE, FALSE, TRUE, NULL);
1679       break;
1680     case GST_EVENT_FLUSH_STOP:
1681       result = gst_base_src_set_flushing (src, FALSE, TRUE, TRUE, NULL);
1682       break;
1683     default:
1684       result = TRUE;
1685       break;
1686   }
1687   return result;
1688
1689   /* ERRORS */
1690 not_seekable:
1691   {
1692     GST_DEBUG_OBJECT (src, "is not seekable");
1693     return FALSE;
1694   }
1695 }
1696
1697 static gboolean
1698 gst_base_src_event_handler (GstPad * pad, GstEvent * event)
1699 {
1700   GstBaseSrc *src;
1701   GstBaseSrcClass *bclass;
1702   gboolean result = FALSE;
1703
1704   src = GST_BASE_SRC (gst_pad_get_parent (pad));
1705   bclass = GST_BASE_SRC_GET_CLASS (src);
1706
1707   if (bclass->event) {
1708     if (!(result = bclass->event (src, event)))
1709       goto subclass_failed;
1710   }
1711
1712 done:
1713   gst_event_unref (event);
1714   gst_object_unref (src);
1715
1716   return result;
1717
1718   /* ERRORS */
1719 subclass_failed:
1720   {
1721     GST_DEBUG_OBJECT (src, "subclass refused event");
1722     goto done;
1723   }
1724 }
1725
1726 static void
1727 gst_base_src_set_property (GObject * object, guint prop_id,
1728     const GValue * value, GParamSpec * pspec)
1729 {
1730   GstBaseSrc *src;
1731
1732   src = GST_BASE_SRC (object);
1733
1734   switch (prop_id) {
1735     case PROP_BLOCKSIZE:
1736       gst_base_src_set_blocksize (src, g_value_get_ulong (value));
1737       break;
1738     case PROP_NUM_BUFFERS:
1739       src->num_buffers = g_value_get_int (value);
1740       break;
1741     case PROP_TYPEFIND:
1742       src->data.ABI.typefind = g_value_get_boolean (value);
1743       break;
1744     case PROP_DO_TIMESTAMP:
1745       gst_base_src_set_do_timestamp (src, g_value_get_boolean (value));
1746       break;
1747     default:
1748       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1749       break;
1750   }
1751 }
1752
1753 static void
1754 gst_base_src_get_property (GObject * object, guint prop_id, GValue * value,
1755     GParamSpec * pspec)
1756 {
1757   GstBaseSrc *src;
1758
1759   src = GST_BASE_SRC (object);
1760
1761   switch (prop_id) {
1762     case PROP_BLOCKSIZE:
1763       g_value_set_ulong (value, gst_base_src_get_blocksize (src));
1764       break;
1765     case PROP_NUM_BUFFERS:
1766       g_value_set_int (value, src->num_buffers);
1767       break;
1768     case PROP_TYPEFIND:
1769       g_value_set_boolean (value, src->data.ABI.typefind);
1770       break;
1771     case PROP_DO_TIMESTAMP:
1772       g_value_set_boolean (value, gst_base_src_get_do_timestamp (src));
1773       break;
1774     default:
1775       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1776       break;
1777   }
1778 }
1779
1780 /* with STREAM_LOCK and LOCK */
1781 static GstClockReturn
1782 gst_base_src_wait (GstBaseSrc * basesrc, GstClock * clock, GstClockTime time)
1783 {
1784   GstClockReturn ret;
1785   GstClockID id;
1786
1787   id = gst_clock_new_single_shot_id (clock, time);
1788
1789   basesrc->clock_id = id;
1790   /* release the live lock while waiting */
1791   GST_LIVE_UNLOCK (basesrc);
1792
1793   ret = gst_clock_id_wait (id, NULL);
1794
1795   GST_LIVE_LOCK (basesrc);
1796   gst_clock_id_unref (id);
1797   basesrc->clock_id = NULL;
1798
1799   return ret;
1800 }
1801
1802 /* perform synchronisation on a buffer.
1803  * with STREAM_LOCK.
1804  */
1805 static GstClockReturn
1806 gst_base_src_do_sync (GstBaseSrc * basesrc, GstBuffer * buffer)
1807 {
1808   GstClockReturn result;
1809   GstClockTime start, end;
1810   GstBaseSrcClass *bclass;
1811   GstClockTime base_time;
1812   GstClock *clock;
1813   GstClockTime now = GST_CLOCK_TIME_NONE, timestamp;
1814   gboolean do_timestamp, first, pseudo_live;
1815
1816   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
1817
1818   start = end = -1;
1819   if (bclass->get_times)
1820     bclass->get_times (basesrc, buffer, &start, &end);
1821
1822   /* get buffer timestamp */
1823   timestamp = GST_BUFFER_TIMESTAMP (buffer);
1824
1825   /* grab the lock to prepare for clocking and calculate the startup
1826    * latency. */
1827   GST_OBJECT_LOCK (basesrc);
1828
1829   /* if we are asked to sync against the clock we are a pseudo live element */
1830   pseudo_live = (start != -1 && basesrc->is_live);
1831   /* check for the first buffer */
1832   first = (basesrc->priv->latency == -1);
1833
1834   if (timestamp != -1 && pseudo_live) {
1835     GstClockTime latency;
1836
1837     /* we have a timestamp and a sync time, latency is the diff */
1838     if (timestamp <= start)
1839       latency = start - timestamp;
1840     else
1841       latency = 0;
1842
1843     if (first) {
1844       GST_DEBUG_OBJECT (basesrc, "pseudo_live with latency %" GST_TIME_FORMAT,
1845           GST_TIME_ARGS (latency));
1846       /* first time we calculate latency, just configure */
1847       basesrc->priv->latency = latency;
1848     } else {
1849       if (basesrc->priv->latency != latency) {
1850         /* we have a new latency, FIXME post latency message */
1851         basesrc->priv->latency = latency;
1852         GST_DEBUG_OBJECT (basesrc, "latency changed to %" GST_TIME_FORMAT,
1853             GST_TIME_ARGS (latency));
1854       }
1855     }
1856   } else if (first) {
1857     GST_DEBUG_OBJECT (basesrc, "no latency needed, live %d, sync %d",
1858         basesrc->is_live, start != -1);
1859     basesrc->priv->latency = 0;
1860   }
1861
1862   /* get clock, if no clock, we can't sync or do timestamps */
1863   if ((clock = GST_ELEMENT_CLOCK (basesrc)) == NULL)
1864     goto no_clock;
1865
1866   base_time = GST_ELEMENT_CAST (basesrc)->base_time;
1867
1868   do_timestamp = basesrc->priv->do_timestamp;
1869
1870   /* first buffer, calculate the timestamp offset */
1871   if (first) {
1872     GstClockTime running_time;
1873
1874     now = gst_clock_get_time (clock);
1875     running_time = now - base_time;
1876
1877     GST_LOG_OBJECT (basesrc,
1878         "startup timestamp: %" GST_TIME_FORMAT ", running_time %"
1879         GST_TIME_FORMAT, GST_TIME_ARGS (timestamp),
1880         GST_TIME_ARGS (running_time));
1881
1882     if (pseudo_live && timestamp != -1) {
1883       /* live source and we need to sync, add startup latency to all timestamps
1884        * to get the real running_time. Live sources should always timestamp
1885        * according to the current running time. */
1886       basesrc->priv->ts_offset = GST_CLOCK_DIFF (timestamp, running_time);
1887
1888       GST_LOG_OBJECT (basesrc, "live with sync, ts_offset %" GST_TIME_FORMAT,
1889           GST_TIME_ARGS (basesrc->priv->ts_offset));
1890     } else {
1891       basesrc->priv->ts_offset = 0;
1892       GST_LOG_OBJECT (basesrc, "no timestamp offset needed");
1893     }
1894
1895     if (!GST_CLOCK_TIME_IS_VALID (timestamp)) {
1896       if (do_timestamp)
1897         timestamp = running_time;
1898       else
1899         timestamp = 0;
1900
1901       GST_BUFFER_TIMESTAMP (buffer) = timestamp;
1902
1903       GST_LOG_OBJECT (basesrc, "created timestamp: %" GST_TIME_FORMAT,
1904           GST_TIME_ARGS (timestamp));
1905     }
1906
1907     /* add the timestamp offset we need for sync */
1908     timestamp += basesrc->priv->ts_offset;
1909   } else {
1910     /* not the first buffer, the timestamp is the diff between the clock and
1911      * base_time */
1912     if (do_timestamp && !GST_CLOCK_TIME_IS_VALID (timestamp)) {
1913       now = gst_clock_get_time (clock);
1914
1915       GST_BUFFER_TIMESTAMP (buffer) = now - base_time;
1916
1917       GST_LOG_OBJECT (basesrc, "created timestamp: %" GST_TIME_FORMAT,
1918           GST_TIME_ARGS (now - base_time));
1919     }
1920   }
1921
1922   /* if we don't have a buffer timestamp, we don't sync */
1923   if (!GST_CLOCK_TIME_IS_VALID (start))
1924     goto no_sync;
1925
1926   if (basesrc->is_live && GST_CLOCK_TIME_IS_VALID (timestamp)) {
1927     /* for pseudo live sources, add our ts_offset to the timestamp */
1928     GST_BUFFER_TIMESTAMP (buffer) += basesrc->priv->ts_offset;
1929     start += basesrc->priv->ts_offset;
1930   }
1931
1932   GST_LOG_OBJECT (basesrc,
1933       "waiting for clock, base time %" GST_TIME_FORMAT
1934       ", stream_start %" GST_TIME_FORMAT,
1935       GST_TIME_ARGS (base_time), GST_TIME_ARGS (start));
1936   GST_OBJECT_UNLOCK (basesrc);
1937
1938   result = gst_base_src_wait (basesrc, clock, start + base_time);
1939
1940   GST_LOG_OBJECT (basesrc, "clock entry done: %d", result);
1941
1942   return result;
1943
1944   /* special cases */
1945 no_clock:
1946   {
1947     GST_DEBUG_OBJECT (basesrc, "we have no clock");
1948     GST_OBJECT_UNLOCK (basesrc);
1949     return GST_CLOCK_OK;
1950   }
1951 no_sync:
1952   {
1953     GST_DEBUG_OBJECT (basesrc, "no sync needed");
1954     GST_OBJECT_UNLOCK (basesrc);
1955     return GST_CLOCK_OK;
1956   }
1957 }
1958
1959 /* Called with STREAM_LOCK and LIVE_LOCK */
1960 static gboolean
1961 gst_base_src_update_length (GstBaseSrc * src, guint64 offset, guint * length)
1962 {
1963   guint64 size, maxsize;
1964   GstBaseSrcClass *bclass;
1965   GstFormat format;
1966   gint64 stop;
1967
1968   bclass = GST_BASE_SRC_GET_CLASS (src);
1969
1970   format = src->segment.format;
1971   stop = src->segment.stop;
1972   /* get total file size */
1973   size = (guint64) src->segment.duration;
1974
1975   /* only operate if we are working with bytes */
1976   if (format != GST_FORMAT_BYTES)
1977     return TRUE;
1978
1979   /* the max amount of bytes to read is the total size or
1980    * up to the segment.stop if present. */
1981   if (stop != -1)
1982     maxsize = MIN (size, stop);
1983   else
1984     maxsize = size;
1985
1986   GST_DEBUG_OBJECT (src,
1987       "reading offset %" G_GUINT64_FORMAT ", length %u, size %" G_GINT64_FORMAT
1988       ", segment.stop %" G_GINT64_FORMAT ", maxsize %" G_GINT64_FORMAT, offset,
1989       *length, size, stop, maxsize);
1990
1991   /* check size if we have one */
1992   if (maxsize != -1) {
1993     /* if we run past the end, check if the file became bigger and
1994      * retry. */
1995     if (G_UNLIKELY (offset + *length >= maxsize)) {
1996       /* see if length of the file changed */
1997       if (bclass->get_size)
1998         if (!bclass->get_size (src, &size))
1999           size = -1;
2000
2001       /* make sure we don't exceed the configured segment stop
2002        * if it was set */
2003       if (stop != -1)
2004         maxsize = MIN (size, stop);
2005       else
2006         maxsize = size;
2007
2008       /* if we are at or past the end, EOS */
2009       if (G_UNLIKELY (offset >= maxsize))
2010         goto unexpected_length;
2011
2012       /* else we can clip to the end */
2013       if (G_UNLIKELY (offset + *length >= maxsize))
2014         *length = maxsize - offset;
2015
2016     }
2017   }
2018
2019   /* keep track of current position and update duration.
2020    * segment is in bytes, we checked that above. */
2021   GST_OBJECT_LOCK (src);
2022   gst_segment_set_duration (&src->segment, GST_FORMAT_BYTES, size);
2023   gst_segment_set_last_stop (&src->segment, GST_FORMAT_BYTES, offset);
2024   GST_OBJECT_UNLOCK (src);
2025
2026   return TRUE;
2027
2028   /* ERRORS */
2029 unexpected_length:
2030   {
2031     return FALSE;
2032   }
2033 }
2034
2035 /* must be called with LIVE_LOCK */
2036 static GstFlowReturn
2037 gst_base_src_get_range (GstBaseSrc * src, guint64 offset, guint length,
2038     GstBuffer ** buf)
2039 {
2040   GstFlowReturn ret;
2041   GstBaseSrcClass *bclass;
2042   GstClockReturn status;
2043
2044   bclass = GST_BASE_SRC_GET_CLASS (src);
2045
2046 again:
2047   if (src->is_live) {
2048     while (G_UNLIKELY (!src->live_running)) {
2049       ret = gst_base_src_wait_playing (src);
2050       if (ret != GST_FLOW_OK)
2051         goto stopped;
2052     }
2053   }
2054
2055   if (G_UNLIKELY (!GST_OBJECT_FLAG_IS_SET (src, GST_BASE_SRC_STARTED)))
2056     goto not_started;
2057
2058   if (G_UNLIKELY (!bclass->create))
2059     goto no_function;
2060
2061   if (G_UNLIKELY (!gst_base_src_update_length (src, offset, &length)))
2062     goto unexpected_length;
2063
2064   /* normally we don't count buffers */
2065   if (G_UNLIKELY (src->num_buffers_left >= 0)) {
2066     if (src->num_buffers_left == 0)
2067       goto reached_num_buffers;
2068     else
2069       src->num_buffers_left--;
2070   }
2071
2072   /* don't enter the create function if a pending EOS event was set. For the
2073    * logic of the pending_eos, check the event function of this class. */
2074   if (G_UNLIKELY (g_atomic_int_get (&src->priv->pending_eos)))
2075     goto eos;
2076
2077   GST_DEBUG_OBJECT (src,
2078       "calling create offset %" G_GUINT64_FORMAT " length %u, time %"
2079       G_GINT64_FORMAT, offset, length, src->segment.time);
2080
2081   ret = bclass->create (src, offset, length, buf);
2082
2083   /* The create function could be unlocked because we have a pending EOS. It's
2084    * possible that we have a valid buffer from create that we need to
2085    * discard when the create function returned _OK. */
2086   if (G_UNLIKELY (g_atomic_int_get (&src->priv->pending_eos))) {
2087     if (ret == GST_FLOW_OK) {
2088       gst_buffer_unref (*buf);
2089       *buf = NULL;
2090     }
2091     goto eos;
2092   }
2093
2094   if (G_UNLIKELY (ret != GST_FLOW_OK))
2095     goto not_ok;
2096
2097   /* no timestamp set and we are at offset 0, we can timestamp with 0 */
2098   if (offset == 0 && src->segment.time == 0
2099       && GST_BUFFER_TIMESTAMP (*buf) == -1)
2100     GST_BUFFER_TIMESTAMP (*buf) = 0;
2101
2102   /* set pad caps on the buffer if the buffer had no caps */
2103   if (GST_BUFFER_CAPS (*buf) == NULL)
2104     gst_buffer_set_caps (*buf, GST_PAD_CAPS (src->srcpad));
2105
2106   /* now sync before pushing the buffer */
2107   status = gst_base_src_do_sync (src, *buf);
2108
2109   /* waiting for the clock could have made us flushing */
2110   if (G_UNLIKELY (src->priv->flushing))
2111     goto flushing;
2112
2113   switch (status) {
2114     case GST_CLOCK_EARLY:
2115       /* the buffer is too late. We currently don't drop the buffer. */
2116       GST_DEBUG_OBJECT (src, "buffer too late!, returning anyway");
2117       break;
2118     case GST_CLOCK_OK:
2119       /* buffer synchronised properly */
2120       GST_DEBUG_OBJECT (src, "buffer ok");
2121       break;
2122     case GST_CLOCK_UNSCHEDULED:
2123       /* this case is triggered when we were waiting for the clock and
2124        * it got unlocked because we did a state change. In any case, get rid of
2125        * the buffer. */
2126       gst_buffer_unref (*buf);
2127       *buf = NULL;
2128       if (!src->live_running) {
2129         /* We return WRONG_STATE when we are not running to stop the dataflow also
2130          * get rid of the produced buffer. */
2131         GST_DEBUG_OBJECT (src,
2132             "clock was unscheduled (%d), returning WRONG_STATE", status);
2133         ret = GST_FLOW_WRONG_STATE;
2134       } else {
2135         /* If we are running when this happens, we quickly switched between
2136          * pause and playing. We try to produce a new buffer */
2137         GST_DEBUG_OBJECT (src,
2138             "clock was unscheduled (%d), but we are running", status);
2139         goto again;
2140       }
2141       break;
2142     default:
2143       /* all other result values are unexpected and errors */
2144       GST_ELEMENT_ERROR (src, CORE, CLOCK,
2145           (_("Internal clock error.")),
2146           ("clock returned unexpected return value %d", status));
2147       gst_buffer_unref (*buf);
2148       *buf = NULL;
2149       ret = GST_FLOW_ERROR;
2150       break;
2151   }
2152   return ret;
2153
2154   /* ERROR */
2155 stopped:
2156   {
2157     GST_DEBUG_OBJECT (src, "wait_playing returned %d (%s)", ret,
2158         gst_flow_get_name (ret));
2159     return ret;
2160   }
2161 not_ok:
2162   {
2163     GST_DEBUG_OBJECT (src, "create returned %d (%s)", ret,
2164         gst_flow_get_name (ret));
2165     return ret;
2166   }
2167 not_started:
2168   {
2169     GST_DEBUG_OBJECT (src, "getrange but not started");
2170     return GST_FLOW_WRONG_STATE;
2171   }
2172 no_function:
2173   {
2174     GST_DEBUG_OBJECT (src, "no create function");
2175     return GST_FLOW_ERROR;
2176   }
2177 unexpected_length:
2178   {
2179     GST_DEBUG_OBJECT (src, "unexpected length %u (offset=%" G_GUINT64_FORMAT
2180         ", size=%" G_GINT64_FORMAT ")", length, offset, src->segment.duration);
2181     return GST_FLOW_UNEXPECTED;
2182   }
2183 reached_num_buffers:
2184   {
2185     GST_DEBUG_OBJECT (src, "sent all buffers");
2186     return GST_FLOW_UNEXPECTED;
2187   }
2188 flushing:
2189   {
2190     GST_DEBUG_OBJECT (src, "we are flushing");
2191     gst_buffer_unref (*buf);
2192     *buf = NULL;
2193     return GST_FLOW_WRONG_STATE;
2194   }
2195 eos:
2196   {
2197     GST_DEBUG_OBJECT (src, "we are EOS");
2198     return GST_FLOW_UNEXPECTED;
2199   }
2200 }
2201
2202 static GstFlowReturn
2203 gst_base_src_pad_get_range (GstPad * pad, guint64 offset, guint length,
2204     GstBuffer ** buf)
2205 {
2206   GstBaseSrc *src;
2207   GstFlowReturn res;
2208
2209   src = GST_BASE_SRC (gst_pad_get_parent (pad));
2210
2211   GST_LIVE_LOCK (src);
2212   if (G_UNLIKELY (src->priv->flushing))
2213     goto flushing;
2214
2215   res = gst_base_src_get_range (src, offset, length, buf);
2216
2217 done:
2218   GST_LIVE_UNLOCK (src);
2219
2220   gst_object_unref (src);
2221
2222   return res;
2223
2224   /* ERRORS */
2225 flushing:
2226   {
2227     GST_DEBUG_OBJECT (src, "we are flushing");
2228     res = GST_FLOW_WRONG_STATE;
2229     goto done;
2230   }
2231 }
2232
2233 static gboolean
2234 gst_base_src_default_check_get_range (GstBaseSrc * src)
2235 {
2236   gboolean res;
2237
2238   if (!GST_OBJECT_FLAG_IS_SET (src, GST_BASE_SRC_STARTED)) {
2239     GST_LOG_OBJECT (src, "doing start/stop to check get_range support");
2240     if (G_LIKELY (gst_base_src_start (src)))
2241       gst_base_src_stop (src);
2242   }
2243
2244   /* we can operate in getrange mode if the native format is bytes
2245    * and we are seekable, this condition is set in the random_access
2246    * flag and is set in the _start() method. */
2247   res = src->random_access;
2248
2249   return res;
2250 }
2251
2252 static gboolean
2253 gst_base_src_check_get_range (GstBaseSrc * src)
2254 {
2255   GstBaseSrcClass *bclass;
2256   gboolean res;
2257
2258   bclass = GST_BASE_SRC_GET_CLASS (src);
2259
2260   if (bclass->check_get_range == NULL)
2261     goto no_function;
2262
2263   res = bclass->check_get_range (src);
2264   GST_LOG_OBJECT (src, "%s() returned %d",
2265       GST_DEBUG_FUNCPTR_NAME (bclass->check_get_range), (gint) res);
2266
2267   return res;
2268
2269   /* ERRORS */
2270 no_function:
2271   {
2272     GST_WARNING_OBJECT (src, "no check_get_range function set");
2273     return FALSE;
2274   }
2275 }
2276
2277 static gboolean
2278 gst_base_src_pad_check_get_range (GstPad * pad)
2279 {
2280   GstBaseSrc *src;
2281   gboolean res;
2282
2283   src = GST_BASE_SRC (GST_OBJECT_PARENT (pad));
2284
2285   res = gst_base_src_check_get_range (src);
2286
2287   return res;
2288 }
2289
2290 static void
2291 gst_base_src_loop (GstPad * pad)
2292 {
2293   GstBaseSrc *src;
2294   GstBuffer *buf = NULL;
2295   GstFlowReturn ret;
2296   gint64 position;
2297   gboolean eos;
2298   gulong blocksize;
2299   GList *tags, *tmp;
2300
2301   eos = FALSE;
2302
2303   src = GST_BASE_SRC (GST_OBJECT_PARENT (pad));
2304
2305   GST_LIVE_LOCK (src);
2306
2307   if (G_UNLIKELY (src->priv->flushing))
2308     goto flushing;
2309
2310   src->priv->last_sent_eos = FALSE;
2311
2312   blocksize = src->blocksize;
2313
2314   /* if we operate in bytes, we can calculate an offset */
2315   if (src->segment.format == GST_FORMAT_BYTES) {
2316     position = src->segment.last_stop;
2317     /* for negative rates, start with subtracting the blocksize */
2318     if (src->segment.rate < 0.0) {
2319       /* we cannot go below segment.start */
2320       if (position > src->segment.start + blocksize)
2321         position -= blocksize;
2322       else {
2323         /* last block, remainder up to segment.start */
2324         blocksize = position - src->segment.start;
2325         position = src->segment.start;
2326       }
2327     }
2328   } else
2329     position = -1;
2330
2331   GST_LOG_OBJECT (src, "next_ts %" GST_TIME_FORMAT " size %lu",
2332       GST_TIME_ARGS (position), blocksize);
2333
2334   ret = gst_base_src_get_range (src, position, blocksize, &buf);
2335   if (G_UNLIKELY (ret != GST_FLOW_OK)) {
2336     GST_INFO_OBJECT (src, "pausing after gst_base_src_get_range() = %s",
2337         gst_flow_get_name (ret));
2338     GST_LIVE_UNLOCK (src);
2339     goto pause;
2340   }
2341   /* this should not happen */
2342   if (G_UNLIKELY (buf == NULL))
2343     goto null_buffer;
2344
2345   /* push events to close/start our segment before we push the buffer. */
2346   if (G_UNLIKELY (src->priv->close_segment)) {
2347     gst_pad_push_event (pad, src->priv->close_segment);
2348     src->priv->close_segment = NULL;
2349   }
2350   if (G_UNLIKELY (src->priv->start_segment)) {
2351     gst_pad_push_event (pad, src->priv->start_segment);
2352     src->priv->start_segment = NULL;
2353   }
2354
2355   GST_OBJECT_LOCK (src);
2356   /* take the tags */
2357   tags = src->priv->pending_tags;
2358   src->priv->pending_tags = NULL;
2359   GST_OBJECT_UNLOCK (src);
2360
2361   /* Push out pending tags if any */
2362   if (G_UNLIKELY (tags != NULL)) {
2363     for (tmp = tags; tmp; tmp = g_list_next (tmp)) {
2364       GstEvent *ev = (GstEvent *) tmp->data;
2365       gst_pad_push_event (pad, ev);
2366     }
2367     g_list_free (tags);
2368   }
2369
2370   /* figure out the new position */
2371   switch (src->segment.format) {
2372     case GST_FORMAT_BYTES:
2373     {
2374       guint bufsize = GST_BUFFER_SIZE (buf);
2375
2376       /* we subtracted above for negative rates */
2377       if (src->segment.rate >= 0.0)
2378         position += bufsize;
2379       break;
2380     }
2381     case GST_FORMAT_TIME:
2382     {
2383       GstClockTime start, duration;
2384
2385       start = GST_BUFFER_TIMESTAMP (buf);
2386       duration = GST_BUFFER_DURATION (buf);
2387
2388       if (GST_CLOCK_TIME_IS_VALID (start))
2389         position = start;
2390       else
2391         position = src->segment.last_stop;
2392
2393       if (GST_CLOCK_TIME_IS_VALID (duration)) {
2394         if (src->segment.rate >= 0.0)
2395           position += duration;
2396         else if (position > duration)
2397           position -= duration;
2398         else
2399           position = 0;
2400       }
2401       break;
2402     }
2403     case GST_FORMAT_DEFAULT:
2404       if (src->segment.rate >= 0.0)
2405         position = GST_BUFFER_OFFSET_END (buf);
2406       else
2407         position = GST_BUFFER_OFFSET (buf);
2408       break;
2409     default:
2410       position = -1;
2411       break;
2412   }
2413   if (position != -1) {
2414     if (src->segment.rate >= 0.0) {
2415       /* positive rate, check if we reached the stop */
2416       if (src->segment.stop != -1) {
2417         if (position >= src->segment.stop) {
2418           eos = TRUE;
2419           position = src->segment.stop;
2420         }
2421       }
2422     } else {
2423       /* negative rate, check if we reached the start. start is always set to
2424        * something different from -1 */
2425       if (position <= src->segment.start) {
2426         eos = TRUE;
2427         position = src->segment.start;
2428       }
2429       /* when going reverse, all buffers are DISCONT */
2430       src->priv->discont = TRUE;
2431     }
2432     GST_OBJECT_LOCK (src);
2433     gst_segment_set_last_stop (&src->segment, src->segment.format, position);
2434     GST_OBJECT_UNLOCK (src);
2435   }
2436
2437   if (G_UNLIKELY (src->priv->discont)) {
2438     buf = gst_buffer_make_metadata_writable (buf);
2439     GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_DISCONT);
2440     src->priv->discont = FALSE;
2441   }
2442   GST_LIVE_UNLOCK (src);
2443
2444   ret = gst_pad_push (pad, buf);
2445   if (G_UNLIKELY (ret != GST_FLOW_OK)) {
2446     GST_INFO_OBJECT (src, "pausing after gst_pad_push() = %s",
2447         gst_flow_get_name (ret));
2448     goto pause;
2449   }
2450
2451   if (G_UNLIKELY (eos)) {
2452     GST_INFO_OBJECT (src, "pausing after end of segment");
2453     ret = GST_FLOW_UNEXPECTED;
2454     goto pause;
2455   }
2456
2457 done:
2458   return;
2459
2460   /* special cases */
2461 flushing:
2462   {
2463     GST_DEBUG_OBJECT (src, "we are flushing");
2464     GST_LIVE_UNLOCK (src);
2465     ret = GST_FLOW_WRONG_STATE;
2466     goto pause;
2467   }
2468 pause:
2469   {
2470     const gchar *reason = gst_flow_get_name (ret);
2471     GstEvent *event;
2472
2473     GST_DEBUG_OBJECT (src, "pausing task, reason %s", reason);
2474     src->data.ABI.running = FALSE;
2475     gst_pad_pause_task (pad);
2476     if (GST_FLOW_IS_FATAL (ret) || ret == GST_FLOW_NOT_LINKED) {
2477       if (ret == GST_FLOW_UNEXPECTED) {
2478         gboolean flag_segment;
2479         GstFormat format;
2480         gint64 last_stop;
2481
2482         /* perform EOS logic */
2483         flag_segment = (src->segment.flags & GST_SEEK_FLAG_SEGMENT) != 0;
2484         format = src->segment.format;
2485         last_stop = src->segment.last_stop;
2486
2487         if (flag_segment) {
2488           GstMessage *message;
2489
2490           message = gst_message_new_segment_done (GST_OBJECT_CAST (src),
2491               format, last_stop);
2492           gst_message_set_seqnum (message, src->priv->seqnum);
2493           gst_element_post_message (GST_ELEMENT_CAST (src), message);
2494         } else {
2495           event = gst_event_new_eos ();
2496           gst_event_set_seqnum (event, src->priv->seqnum);
2497           gst_pad_push_event (pad, event);
2498           src->priv->last_sent_eos = TRUE;
2499         }
2500       } else {
2501         event = gst_event_new_eos ();
2502         gst_event_set_seqnum (event, src->priv->seqnum);
2503         /* for fatal errors we post an error message, post the error
2504          * first so the app knows about the error first. */
2505         GST_ELEMENT_ERROR (src, STREAM, FAILED,
2506             (_("Internal data flow error.")),
2507             ("streaming task paused, reason %s (%d)", reason, ret));
2508         gst_pad_push_event (pad, event);
2509         src->priv->last_sent_eos = TRUE;
2510       }
2511     }
2512     goto done;
2513   }
2514 null_buffer:
2515   {
2516     GST_ELEMENT_ERROR (src, STREAM, FAILED,
2517         (_("Internal data flow error.")), ("element returned NULL buffer"));
2518     GST_LIVE_UNLOCK (src);
2519     /* we finished the segment on error */
2520     ret = GST_FLOW_ERROR;
2521     goto done;
2522   }
2523 }
2524
2525 /* default negotiation code.
2526  *
2527  * Take intersection between src and sink pads, take first
2528  * caps and fixate.
2529  */
2530 static gboolean
2531 gst_base_src_default_negotiate (GstBaseSrc * basesrc)
2532 {
2533   GstCaps *thiscaps;
2534   GstCaps *caps = NULL;
2535   GstCaps *peercaps = NULL;
2536   gboolean result = FALSE;
2537
2538   /* first see what is possible on our source pad */
2539   thiscaps = gst_pad_get_caps_reffed (GST_BASE_SRC_PAD (basesrc));
2540   GST_DEBUG_OBJECT (basesrc, "caps of src: %" GST_PTR_FORMAT, thiscaps);
2541   /* nothing or anything is allowed, we're done */
2542   if (thiscaps == NULL || gst_caps_is_any (thiscaps))
2543     goto no_nego_needed;
2544
2545   if (G_UNLIKELY (gst_caps_is_empty (thiscaps)))
2546     goto no_caps;
2547
2548   /* get the peer caps */
2549   peercaps = gst_pad_peer_get_caps_reffed (GST_BASE_SRC_PAD (basesrc));
2550   GST_DEBUG_OBJECT (basesrc, "caps of peer: %" GST_PTR_FORMAT, peercaps);
2551   if (peercaps) {
2552     GstCaps *icaps;
2553
2554     /* get intersection */
2555     icaps = gst_caps_intersect (thiscaps, peercaps);
2556     GST_DEBUG_OBJECT (basesrc, "intersect: %" GST_PTR_FORMAT, icaps);
2557     gst_caps_unref (thiscaps);
2558     gst_caps_unref (peercaps);
2559     if (icaps) {
2560       /* take first (and best, since they are sorted) possibility */
2561       caps = gst_caps_copy_nth (icaps, 0);
2562       gst_caps_unref (icaps);
2563     }
2564   } else {
2565     /* no peer, work with our own caps then */
2566     caps = thiscaps;
2567   }
2568   if (caps) {
2569     caps = gst_caps_make_writable (caps);
2570     gst_caps_truncate (caps);
2571
2572     /* now fixate */
2573     if (!gst_caps_is_empty (caps)) {
2574       gst_pad_fixate_caps (GST_BASE_SRC_PAD (basesrc), caps);
2575       GST_DEBUG_OBJECT (basesrc, "fixated to: %" GST_PTR_FORMAT, caps);
2576
2577       if (gst_caps_is_any (caps)) {
2578         /* hmm, still anything, so element can do anything and
2579          * nego is not needed */
2580         result = TRUE;
2581       } else if (gst_caps_is_fixed (caps)) {
2582         /* yay, fixed caps, use those then, it's possible that the subclass does
2583          * not accept this caps after all and we have to fail. */
2584         result = gst_pad_set_caps (GST_BASE_SRC_PAD (basesrc), caps);
2585       }
2586     }
2587     gst_caps_unref (caps);
2588   } else {
2589     GST_DEBUG_OBJECT (basesrc, "no common caps");
2590   }
2591   return result;
2592
2593 no_nego_needed:
2594   {
2595     GST_DEBUG_OBJECT (basesrc, "no negotiation needed");
2596     if (thiscaps)
2597       gst_caps_unref (thiscaps);
2598     return TRUE;
2599   }
2600 no_caps:
2601   {
2602     GST_ELEMENT_ERROR (basesrc, STREAM, FORMAT,
2603         ("No supported formats found"),
2604         ("This element did not produce valid caps"));
2605     if (thiscaps)
2606       gst_caps_unref (thiscaps);
2607     return TRUE;
2608   }
2609 }
2610
2611 static gboolean
2612 gst_base_src_negotiate (GstBaseSrc * basesrc)
2613 {
2614   GstBaseSrcClass *bclass;
2615   gboolean result = TRUE;
2616
2617   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2618
2619   if (bclass->negotiate)
2620     result = bclass->negotiate (basesrc);
2621
2622   return result;
2623 }
2624
2625 static gboolean
2626 gst_base_src_start (GstBaseSrc * basesrc)
2627 {
2628   GstBaseSrcClass *bclass;
2629   gboolean result;
2630   guint64 size;
2631   gboolean seekable;
2632   GstFormat format;
2633
2634   if (GST_OBJECT_FLAG_IS_SET (basesrc, GST_BASE_SRC_STARTED))
2635     return TRUE;
2636
2637   GST_DEBUG_OBJECT (basesrc, "starting source");
2638
2639   basesrc->num_buffers_left = basesrc->num_buffers;
2640
2641   GST_OBJECT_LOCK (basesrc);
2642   gst_segment_init (&basesrc->segment, basesrc->segment.format);
2643   GST_OBJECT_UNLOCK (basesrc);
2644
2645   basesrc->data.ABI.running = FALSE;
2646
2647   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2648   if (bclass->start)
2649     result = bclass->start (basesrc);
2650   else
2651     result = TRUE;
2652
2653   if (!result)
2654     goto could_not_start;
2655
2656   GST_OBJECT_FLAG_SET (basesrc, GST_BASE_SRC_STARTED);
2657
2658   format = basesrc->segment.format;
2659
2660   /* figure out the size */
2661   if (format == GST_FORMAT_BYTES) {
2662     if (bclass->get_size) {
2663       if (!(result = bclass->get_size (basesrc, &size)))
2664         size = -1;
2665     } else {
2666       result = FALSE;
2667       size = -1;
2668     }
2669     GST_DEBUG_OBJECT (basesrc, "setting size %" G_GUINT64_FORMAT, size);
2670     /* only update the size when operating in bytes, subclass is supposed
2671      * to set duration in the start method for other formats */
2672     GST_OBJECT_LOCK (basesrc);
2673     gst_segment_set_duration (&basesrc->segment, GST_FORMAT_BYTES, size);
2674     GST_OBJECT_UNLOCK (basesrc);
2675   } else {
2676     size = -1;
2677   }
2678
2679   GST_DEBUG_OBJECT (basesrc,
2680       "format: %d, have size: %d, size: %" G_GUINT64_FORMAT ", duration: %"
2681       G_GINT64_FORMAT, format, result, size, basesrc->segment.duration);
2682
2683   seekable = gst_base_src_seekable (basesrc);
2684   GST_DEBUG_OBJECT (basesrc, "is seekable: %d", seekable);
2685
2686   /* update for random access flag */
2687   basesrc->random_access = seekable && format == GST_FORMAT_BYTES;
2688
2689   GST_DEBUG_OBJECT (basesrc, "is random_access: %d", basesrc->random_access);
2690
2691   /* run typefind if we are random_access and the typefinding is enabled. */
2692   if (basesrc->random_access && basesrc->data.ABI.typefind && size != -1) {
2693     GstCaps *caps;
2694
2695     if (!(caps = gst_type_find_helper (basesrc->srcpad, size)))
2696       goto typefind_failed;
2697
2698     result = gst_pad_set_caps (basesrc->srcpad, caps);
2699     gst_caps_unref (caps);
2700   } else {
2701     /* use class or default negotiate function */
2702     if (!(result = gst_base_src_negotiate (basesrc)))
2703       goto could_not_negotiate;
2704   }
2705
2706   return result;
2707
2708   /* ERROR */
2709 could_not_start:
2710   {
2711     GST_DEBUG_OBJECT (basesrc, "could not start");
2712     /* subclass is supposed to post a message. We don't have to call _stop. */
2713     return FALSE;
2714   }
2715 could_not_negotiate:
2716   {
2717     GST_DEBUG_OBJECT (basesrc, "could not negotiate, stopping");
2718     GST_ELEMENT_ERROR (basesrc, STREAM, FORMAT,
2719         ("Could not negotiate format"), ("Check your filtered caps, if any"));
2720     /* we must call stop */
2721     gst_base_src_stop (basesrc);
2722     return FALSE;
2723   }
2724 typefind_failed:
2725   {
2726     GST_DEBUG_OBJECT (basesrc, "could not typefind, stopping");
2727     GST_ELEMENT_ERROR (basesrc, STREAM, TYPE_NOT_FOUND, (NULL), (NULL));
2728     /* we must call stop */
2729     gst_base_src_stop (basesrc);
2730     return FALSE;
2731   }
2732 }
2733
2734 static gboolean
2735 gst_base_src_stop (GstBaseSrc * basesrc)
2736 {
2737   GstBaseSrcClass *bclass;
2738   gboolean result = TRUE;
2739
2740   if (!GST_OBJECT_FLAG_IS_SET (basesrc, GST_BASE_SRC_STARTED))
2741     return TRUE;
2742
2743   GST_DEBUG_OBJECT (basesrc, "stopping source");
2744
2745   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2746   if (bclass->stop)
2747     result = bclass->stop (basesrc);
2748
2749   if (result)
2750     GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_STARTED);
2751
2752   return result;
2753 }
2754
2755 /* start or stop flushing dataprocessing
2756  */
2757 static gboolean
2758 gst_base_src_set_flushing (GstBaseSrc * basesrc,
2759     gboolean flushing, gboolean live_play, gboolean unlock, gboolean * playing)
2760 {
2761   GstBaseSrcClass *bclass;
2762
2763   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2764
2765   if (flushing && unlock) {
2766     /* unlock any subclasses, we need to do this before grabbing the
2767      * LIVE_LOCK since we hold this lock before going into ::create. We pass an
2768      * unlock to the params because of backwards compat (see seek handler)*/
2769     if (bclass->unlock)
2770       bclass->unlock (basesrc);
2771   }
2772
2773   /* the live lock is released when we are blocked, waiting for playing or
2774    * when we sync to the clock. */
2775   GST_LIVE_LOCK (basesrc);
2776   if (playing)
2777     *playing = basesrc->live_running;
2778   basesrc->priv->flushing = flushing;
2779   if (flushing) {
2780     /* if we are locked in the live lock, signal it to make it flush */
2781     basesrc->live_running = TRUE;
2782
2783     /* clear pending EOS if any */
2784     g_atomic_int_set (&basesrc->priv->pending_eos, FALSE);
2785
2786     /* step 1, now that we have the LIVE lock, clear our unlock request */
2787     if (bclass->unlock_stop)
2788       bclass->unlock_stop (basesrc);
2789
2790     /* step 2, unblock clock sync (if any) or any other blocking thing */
2791     if (basesrc->clock_id)
2792       gst_clock_id_unschedule (basesrc->clock_id);
2793   } else {
2794     /* signal the live source that it can start playing */
2795     basesrc->live_running = live_play;
2796   }
2797   GST_LIVE_SIGNAL (basesrc);
2798   GST_LIVE_UNLOCK (basesrc);
2799
2800   return TRUE;
2801 }
2802
2803 /* the purpose of this function is to make sure that a live source blocks in the
2804  * LIVE lock or leaves the LIVE lock and continues playing. */
2805 static gboolean
2806 gst_base_src_set_playing (GstBaseSrc * basesrc, gboolean live_play)
2807 {
2808   GstBaseSrcClass *bclass;
2809
2810   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2811
2812   /* unlock subclasses locked in ::create, we only do this when we stop playing. */
2813   if (!live_play) {
2814     GST_DEBUG_OBJECT (basesrc, "unlock");
2815     if (bclass->unlock)
2816       bclass->unlock (basesrc);
2817   }
2818
2819   /* we are now able to grab the LIVE lock, when we get it, we can be
2820    * waiting for PLAYING while blocked in the LIVE cond or we can be waiting
2821    * for the clock. */
2822   GST_LIVE_LOCK (basesrc);
2823   GST_DEBUG_OBJECT (basesrc, "unschedule clock");
2824
2825   /* unblock clock sync (if any) */
2826   if (basesrc->clock_id)
2827     gst_clock_id_unschedule (basesrc->clock_id);
2828
2829   /* configure what to do when we get to the LIVE lock. */
2830   GST_DEBUG_OBJECT (basesrc, "live running %d", live_play);
2831   basesrc->live_running = live_play;
2832
2833   if (live_play) {
2834     gboolean start;
2835
2836     /* clear our unlock request when going to PLAYING */
2837     GST_DEBUG_OBJECT (basesrc, "unlock stop");
2838     if (bclass->unlock_stop)
2839       bclass->unlock_stop (basesrc);
2840
2841     /* for live sources we restart the timestamp correction */
2842     basesrc->priv->latency = -1;
2843     /* have to restart the task in case it stopped because of the unlock when
2844      * we went to PAUSED. Only do this if we operating in push mode. */
2845     GST_OBJECT_LOCK (basesrc->srcpad);
2846     start = (GST_PAD_ACTIVATE_MODE (basesrc->srcpad) == GST_ACTIVATE_PUSH);
2847     GST_OBJECT_UNLOCK (basesrc->srcpad);
2848     if (start)
2849       gst_pad_start_task (basesrc->srcpad, (GstTaskFunction) gst_base_src_loop,
2850           basesrc->srcpad);
2851     GST_DEBUG_OBJECT (basesrc, "signal");
2852     GST_LIVE_SIGNAL (basesrc);
2853   }
2854   GST_LIVE_UNLOCK (basesrc);
2855
2856   return TRUE;
2857 }
2858
2859 static gboolean
2860 gst_base_src_activate_push (GstPad * pad, gboolean active)
2861 {
2862   GstBaseSrc *basesrc;
2863   GstEvent *event;
2864
2865   basesrc = GST_BASE_SRC (GST_OBJECT_PARENT (pad));
2866
2867   /* prepare subclass first */
2868   if (active) {
2869     GST_DEBUG_OBJECT (basesrc, "Activating in push mode");
2870
2871     if (G_UNLIKELY (!basesrc->can_activate_push))
2872       goto no_push_activation;
2873
2874     if (G_UNLIKELY (!gst_base_src_start (basesrc)))
2875       goto error_start;
2876
2877     basesrc->priv->last_sent_eos = FALSE;
2878     basesrc->priv->discont = TRUE;
2879     gst_base_src_set_flushing (basesrc, FALSE, FALSE, FALSE, NULL);
2880
2881     /* do initial seek, which will start the task */
2882     GST_OBJECT_LOCK (basesrc);
2883     event = basesrc->data.ABI.pending_seek;
2884     basesrc->data.ABI.pending_seek = NULL;
2885     GST_OBJECT_UNLOCK (basesrc);
2886
2887     /* no need to unlock anything, the task is certainly
2888      * not running here. The perform seek code will start the task when
2889      * finished. */
2890     if (G_UNLIKELY (!gst_base_src_perform_seek (basesrc, event, FALSE)))
2891       goto seek_failed;
2892
2893     if (event)
2894       gst_event_unref (event);
2895   } else {
2896     GST_DEBUG_OBJECT (basesrc, "Deactivating in push mode");
2897     /* flush all */
2898     gst_base_src_set_flushing (basesrc, TRUE, FALSE, TRUE, NULL);
2899     /* stop the task */
2900     gst_pad_stop_task (pad);
2901     /* now we can stop the source */
2902     if (G_UNLIKELY (!gst_base_src_stop (basesrc)))
2903       goto error_stop;
2904   }
2905   return TRUE;
2906
2907   /* ERRORS */
2908 no_push_activation:
2909   {
2910     GST_WARNING_OBJECT (basesrc, "Subclass disabled push-mode activation");
2911     return FALSE;
2912   }
2913 error_start:
2914   {
2915     GST_WARNING_OBJECT (basesrc, "Failed to start in push mode");
2916     return FALSE;
2917   }
2918 seek_failed:
2919   {
2920     GST_ERROR_OBJECT (basesrc, "Failed to perform initial seek");
2921     /* flush all */
2922     gst_base_src_set_flushing (basesrc, TRUE, FALSE, TRUE, NULL);
2923     /* stop the task */
2924     gst_pad_stop_task (pad);
2925     /* Stop the basesrc */
2926     gst_base_src_stop (basesrc);
2927     if (event)
2928       gst_event_unref (event);
2929     return FALSE;
2930   }
2931 error_stop:
2932   {
2933     GST_DEBUG_OBJECT (basesrc, "Failed to stop in push mode");
2934     return FALSE;
2935   }
2936 }
2937
2938 static gboolean
2939 gst_base_src_activate_pull (GstPad * pad, gboolean active)
2940 {
2941   GstBaseSrc *basesrc;
2942
2943   basesrc = GST_BASE_SRC (GST_OBJECT_PARENT (pad));
2944
2945   /* prepare subclass first */
2946   if (active) {
2947     GST_DEBUG_OBJECT (basesrc, "Activating in pull mode");
2948     if (G_UNLIKELY (!gst_base_src_start (basesrc)))
2949       goto error_start;
2950
2951     /* if not random_access, we cannot operate in pull mode for now */
2952     if (G_UNLIKELY (!gst_base_src_check_get_range (basesrc)))
2953       goto no_get_range;
2954
2955     /* stop flushing now but for live sources, still block in the LIVE lock when
2956      * we are not yet PLAYING */
2957     gst_base_src_set_flushing (basesrc, FALSE, FALSE, FALSE, NULL);
2958   } else {
2959     GST_DEBUG_OBJECT (basesrc, "Deactivating in pull mode");
2960     /* flush all, there is no task to stop */
2961     gst_base_src_set_flushing (basesrc, TRUE, FALSE, TRUE, NULL);
2962
2963     /* don't send EOS when going from PAUSED => READY when in pull mode */
2964     basesrc->priv->last_sent_eos = TRUE;
2965
2966     if (G_UNLIKELY (!gst_base_src_stop (basesrc)))
2967       goto error_stop;
2968   }
2969   return TRUE;
2970
2971   /* ERRORS */
2972 error_start:
2973   {
2974     GST_ERROR_OBJECT (basesrc, "Failed to start in pull mode");
2975     return FALSE;
2976   }
2977 no_get_range:
2978   {
2979     GST_ERROR_OBJECT (basesrc, "Cannot operate in pull mode, stopping");
2980     gst_base_src_stop (basesrc);
2981     return FALSE;
2982   }
2983 error_stop:
2984   {
2985     GST_ERROR_OBJECT (basesrc, "Failed to stop in pull mode");
2986     return FALSE;
2987   }
2988 }
2989
2990 static GstStateChangeReturn
2991 gst_base_src_change_state (GstElement * element, GstStateChange transition)
2992 {
2993   GstBaseSrc *basesrc;
2994   GstStateChangeReturn result;
2995   gboolean no_preroll = FALSE;
2996
2997   basesrc = GST_BASE_SRC (element);
2998
2999   switch (transition) {
3000     case GST_STATE_CHANGE_NULL_TO_READY:
3001       break;
3002     case GST_STATE_CHANGE_READY_TO_PAUSED:
3003       no_preroll = gst_base_src_is_live (basesrc);
3004       break;
3005     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
3006       GST_DEBUG_OBJECT (basesrc, "PAUSED->PLAYING");
3007       if (gst_base_src_is_live (basesrc)) {
3008         /* now we can start playback */
3009         gst_base_src_set_playing (basesrc, TRUE);
3010       }
3011       break;
3012     default:
3013       break;
3014   }
3015
3016   if ((result =
3017           GST_ELEMENT_CLASS (parent_class)->change_state (element,
3018               transition)) == GST_STATE_CHANGE_FAILURE)
3019     goto failure;
3020
3021   switch (transition) {
3022     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
3023       GST_DEBUG_OBJECT (basesrc, "PLAYING->PAUSED");
3024       if (gst_base_src_is_live (basesrc)) {
3025         /* make sure we block in the live lock in PAUSED */
3026         gst_base_src_set_playing (basesrc, FALSE);
3027         no_preroll = TRUE;
3028       }
3029       break;
3030     case GST_STATE_CHANGE_PAUSED_TO_READY:
3031     {
3032       GstEvent **event_p, *event;
3033
3034       /* we don't need to unblock anything here, the pad deactivation code
3035        * already did this */
3036
3037       /* FIXME, deprecate this behaviour, it is very dangerous.
3038        * the prefered way of sending EOS downstream is by sending
3039        * the EOS event to the element */
3040       if (!basesrc->priv->last_sent_eos) {
3041         GST_DEBUG_OBJECT (basesrc, "Sending EOS event");
3042         event = gst_event_new_eos ();
3043         gst_event_set_seqnum (event, basesrc->priv->seqnum);
3044         gst_pad_push_event (basesrc->srcpad, event);
3045         basesrc->priv->last_sent_eos = TRUE;
3046       }
3047       g_atomic_int_set (&basesrc->priv->pending_eos, FALSE);
3048       event_p = &basesrc->data.ABI.pending_seek;
3049       gst_event_replace (event_p, NULL);
3050       event_p = &basesrc->priv->close_segment;
3051       gst_event_replace (event_p, NULL);
3052       event_p = &basesrc->priv->start_segment;
3053       gst_event_replace (event_p, NULL);
3054       break;
3055     }
3056     case GST_STATE_CHANGE_READY_TO_NULL:
3057       break;
3058     default:
3059       break;
3060   }
3061
3062   if (no_preroll && result == GST_STATE_CHANGE_SUCCESS)
3063     result = GST_STATE_CHANGE_NO_PREROLL;
3064
3065   return result;
3066
3067   /* ERRORS */
3068 failure:
3069   {
3070     GST_DEBUG_OBJECT (basesrc, "parent failed state change");
3071     return result;
3072   }
3073 }