basesrc: do not set first buffer timestamp to 0 for live sources
[platform/upstream/gstreamer.git] / libs / gst / base / gstbasesrc.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *               2000,2005 Wim Taymans <wim@fluendo.com>
4  *
5  * gstbasesrc.c:
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20  * Boston, MA 02111-1307, USA.
21  */
22
23 /**
24  * SECTION:gstbasesrc
25  * @short_description: Base class for getrange based source elements
26  * @see_also: #GstPushSrc, #GstBaseTransform, #GstBaseSink
27  *
28  * This is a generice base class for source elements. The following
29  * types of sources are supported:
30  * <itemizedlist>
31  *   <listitem><para>random access sources like files</para></listitem>
32  *   <listitem><para>seekable sources</para></listitem>
33  *   <listitem><para>live sources</para></listitem>
34  * </itemizedlist>
35  *
36  * The source can be configured to operate in any #GstFormat with the
37  * gst_base_src_set_format() method. The currently set format determines
38  * the format of the internal #GstSegment and any #GST_EVENT_NEWSEGMENT
39  * events. The default format for #GstBaseSrc is #GST_FORMAT_BYTES.
40  *
41  * #GstBaseSrc always supports push mode scheduling. If the following
42  * conditions are met, it also supports pull mode scheduling:
43  * <itemizedlist>
44  *   <listitem><para>The format is set to #GST_FORMAT_BYTES (default).</para>
45  *   </listitem>
46  *   <listitem><para>#GstBaseSrcClass.is_seekable() returns %TRUE.</para>
47  *   </listitem>
48  * </itemizedlist>
49  *
50  * Since 0.10.9, any #GstBaseSrc can enable pull based scheduling at any time
51  * by overriding #GstBaseSrcClass.check_get_range() so that it returns %TRUE.
52  *
53  * If all the conditions are met for operating in pull mode, #GstBaseSrc is
54  * automatically seekable in push mode as well. The following conditions must
55  * be met to make the element seekable in push mode when the format is not
56  * #GST_FORMAT_BYTES:
57  * <itemizedlist>
58  *   <listitem><para>
59  *     #GstBaseSrcClass.is_seekable() returns %TRUE.
60  *   </para></listitem>
61  *   <listitem><para>
62  *     #GstBaseSrcClass.query() can convert all supported seek formats to the
63  *     internal format as set with gst_base_src_set_format().
64  *   </para></listitem>
65  *   <listitem><para>
66  *     #GstBaseSrcClass.do_seek() is implemented, performs the seek and returns
67  *      %TRUE.
68  *   </para></listitem>
69  * </itemizedlist>
70  *
71  * When the element does not meet the requirements to operate in pull mode, the
72  * offset and length in the #GstBaseSrcClass.create() method should be ignored.
73  * It is recommended to subclass #GstPushSrc instead, in this situation. If the
74  * element can operate in pull mode but only with specific offsets and
75  * lengths, it is allowed to generate an error when the wrong values are passed
76  * to the #GstBaseSrcClass.create() function.
77  *
78  * #GstBaseSrc has support for live sources. Live sources are sources that when
79  * paused discard data, such as audio or video capture devices. A typical live
80  * source also produces data at a fixed rate and thus provides a clock to publish
81  * this rate.
82  * Use gst_base_src_set_live() to activate the live source mode.
83  *
84  * A live source does not produce data in the PAUSED state. This means that the
85  * #GstBaseSrcClass.create() method will not be called in PAUSED but only in
86  * PLAYING. To signal the pipeline that the element will not produce data, the
87  * return value from the READY to PAUSED state will be
88  * #GST_STATE_CHANGE_NO_PREROLL.
89  *
90  * A typical live source will timestamp the buffers it creates with the
91  * current running time of the pipeline. This is one reason why a live source
92  * can only produce data in the PLAYING state, when the clock is actually
93  * distributed and running.
94  *
95  * Live sources that synchronize and block on the clock (an audio source, for
96  * example) can since 0.10.12 use gst_base_src_wait_playing() when the
97  * #GstBaseSrcClass.create() function was interrupted by a state change to
98  * PAUSED.
99  *
100  * The #GstBaseSrcClass.get_times() method can be used to implement pseudo-live
101  * sources. It only makes sense to implement the #GstBaseSrcClass.get_times()
102  * function if the source is a live source. The #GstBaseSrcClass.get_times()
103  * function should return timestamps starting from 0, as if it were a non-live
104  * source. The base class will make sure that the timestamps are transformed
105  * into the current running_time. The base source will then wait for the
106  * calculated running_time before pushing out the buffer.
107  *
108  * For live sources, the base class will by default report a latency of 0.
109  * For pseudo live sources, the base class will by default measure the difference
110  * between the first buffer timestamp and the start time of get_times and will
111  * report this value as the latency.
112  * Subclasses should override the query function when this behaviour is not
113  * acceptable.
114  *
115  * There is only support in #GstBaseSrc for exactly one source pad, which
116  * should be named "src". A source implementation (subclass of #GstBaseSrc)
117  * should install a pad template in its class_init function, like so:
118  * |[
119  * static void
120  * my_element_class_init (GstMyElementClass *klass)
121  * {
122  *   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
123  *   // srctemplate should be a #GstStaticPadTemplate with direction
124  *   // #GST_PAD_SRC and name "src"
125  *   gst_element_class_add_pad_template (gstelement_class,
126  *       gst_static_pad_template_get (&amp;srctemplate));
127  *   // see #GstElementDetails
128  *   gst_element_class_set_details (gstelement_class, &amp;details);
129  * }
130  * ]|
131  *
132  * <refsect2>
133  * <title>Controlled shutdown of live sources in applications</title>
134  * <para>
135  * Applications that record from a live source may want to stop recording
136  * in a controlled way, so that the recording is stopped, but the data
137  * already in the pipeline is processed to the end (remember that many live
138  * sources would go on recording forever otherwise). For that to happen the
139  * application needs to make the source stop recording and send an EOS
140  * event down the pipeline. The application would then wait for an
141  * EOS message posted on the pipeline's bus to know when all data has
142  * been processed and the pipeline can safely be stopped.
143  *
144  * Since GStreamer 0.10.16 an application may send an EOS event to a source
145  * element to make it perform the EOS logic (send EOS event downstream or post a
146  * #GST_MESSAGE_SEGMENT_DONE on the bus). This can typically be done
147  * with the gst_element_send_event() function on the element or its parent bin.
148  *
149  * After the EOS has been sent to the element, the application should wait for
150  * an EOS message to be posted on the pipeline's bus. Once this EOS message is
151  * received, it may safely shut down the entire pipeline.
152  *
153  * The old behaviour for controlled shutdown introduced since GStreamer 0.10.3
154  * is still available but deprecated as it is dangerous and less flexible.
155  *
156  * Last reviewed on 2007-12-19 (0.10.16)
157  * </para>
158  * </refsect2>
159  */
160
161 #ifdef HAVE_CONFIG_H
162 #  include "config.h"
163 #endif
164
165 #include <stdlib.h>
166 #include <string.h>
167
168 #include <gst/gst_private.h>
169
170 #include "gstbasesrc.h"
171 #include "gsttypefindhelper.h"
172 #include <gst/gstmarshal.h>
173 #include <gst/gst-i18n-lib.h>
174
175 GST_DEBUG_CATEGORY_STATIC (gst_base_src_debug);
176 #define GST_CAT_DEFAULT gst_base_src_debug
177
178 #define GST_LIVE_GET_LOCK(elem)               (GST_BASE_SRC_CAST(elem)->live_lock)
179 #define GST_LIVE_LOCK(elem)                   g_mutex_lock(GST_LIVE_GET_LOCK(elem))
180 #define GST_LIVE_TRYLOCK(elem)                g_mutex_trylock(GST_LIVE_GET_LOCK(elem))
181 #define GST_LIVE_UNLOCK(elem)                 g_mutex_unlock(GST_LIVE_GET_LOCK(elem))
182 #define GST_LIVE_GET_COND(elem)               (GST_BASE_SRC_CAST(elem)->live_cond)
183 #define GST_LIVE_WAIT(elem)                   g_cond_wait (GST_LIVE_GET_COND (elem), GST_LIVE_GET_LOCK (elem))
184 #define GST_LIVE_TIMED_WAIT(elem, timeval)    g_cond_timed_wait (GST_LIVE_GET_COND (elem), GST_LIVE_GET_LOCK (elem),\
185                                                                                 timeval)
186 #define GST_LIVE_SIGNAL(elem)                 g_cond_signal (GST_LIVE_GET_COND (elem));
187 #define GST_LIVE_BROADCAST(elem)              g_cond_broadcast (GST_LIVE_GET_COND (elem));
188
189 /* BaseSrc signals and args */
190 enum
191 {
192   /* FILL ME */
193   LAST_SIGNAL
194 };
195
196 #define DEFAULT_BLOCKSIZE       4096
197 #define DEFAULT_NUM_BUFFERS     -1
198 #define DEFAULT_TYPEFIND        FALSE
199 #define DEFAULT_DO_TIMESTAMP    FALSE
200
201 enum
202 {
203   PROP_0,
204   PROP_BLOCKSIZE,
205   PROP_NUM_BUFFERS,
206   PROP_TYPEFIND,
207   PROP_DO_TIMESTAMP
208 };
209
210 #define GST_BASE_SRC_GET_PRIVATE(obj)  \
211    (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_BASE_SRC, GstBaseSrcPrivate))
212
213 struct _GstBaseSrcPrivate
214 {
215   gboolean last_sent_eos;       /* last thing we did was send an EOS (we set this
216                                  * to avoid the sending of two EOS in some cases) */
217   gboolean discont;
218   gboolean flushing;
219
220   /* two segments to be sent in the streaming thread with STREAM_LOCK */
221   GstEvent *close_segment;
222   GstEvent *start_segment;
223   gboolean newsegment_pending;
224
225   /* if EOS is pending (atomic) */
226   gint pending_eos;
227
228   /* startup latency is the time it takes between going to PLAYING and producing
229    * the first BUFFER with running_time 0. This value is included in the latency
230    * reporting. */
231   GstClockTime latency;
232   /* timestamp offset, this is the offset add to the values of gst_times for
233    * pseudo live sources */
234   GstClockTimeDiff ts_offset;
235
236   gboolean do_timestamp;
237
238   /* stream sequence number */
239   guint32 seqnum;
240
241   /* pending events (TAG, CUSTOM_BOTH, CUSTOM_DOWNSTREAM) to be
242    * pushed in the data stream */
243   GList *pending_events;
244   volatile gint have_events;
245
246   /* QoS *//* with LOCK */
247   gboolean qos_enabled;
248   gdouble proportion;
249   GstClockTime earliest_time;
250 };
251
252 static GstElementClass *parent_class = NULL;
253
254 static void gst_base_src_base_init (gpointer g_class);
255 static void gst_base_src_class_init (GstBaseSrcClass * klass);
256 static void gst_base_src_init (GstBaseSrc * src, gpointer g_class);
257 static void gst_base_src_finalize (GObject * object);
258
259
260 GType
261 gst_base_src_get_type (void)
262 {
263   static volatile gsize base_src_type = 0;
264
265   if (g_once_init_enter (&base_src_type)) {
266     GType _type;
267     static const GTypeInfo base_src_info = {
268       sizeof (GstBaseSrcClass),
269       (GBaseInitFunc) gst_base_src_base_init,
270       NULL,
271       (GClassInitFunc) gst_base_src_class_init,
272       NULL,
273       NULL,
274       sizeof (GstBaseSrc),
275       0,
276       (GInstanceInitFunc) gst_base_src_init,
277     };
278
279     _type = g_type_register_static (GST_TYPE_ELEMENT,
280         "GstBaseSrc", &base_src_info, G_TYPE_FLAG_ABSTRACT);
281     g_once_init_leave (&base_src_type, _type);
282   }
283   return base_src_type;
284 }
285
286 static GstCaps *gst_base_src_getcaps (GstPad * pad);
287 static gboolean gst_base_src_setcaps (GstPad * pad, GstCaps * caps);
288 static void gst_base_src_fixate (GstPad * pad, GstCaps * caps);
289
290 static gboolean gst_base_src_activate_push (GstPad * pad, gboolean active);
291 static gboolean gst_base_src_activate_pull (GstPad * pad, gboolean active);
292 static void gst_base_src_set_property (GObject * object, guint prop_id,
293     const GValue * value, GParamSpec * pspec);
294 static void gst_base_src_get_property (GObject * object, guint prop_id,
295     GValue * value, GParamSpec * pspec);
296 static gboolean gst_base_src_event_handler (GstPad * pad, GstEvent * event);
297 static gboolean gst_base_src_send_event (GstElement * elem, GstEvent * event);
298 static gboolean gst_base_src_default_event (GstBaseSrc * src, GstEvent * event);
299 static const GstQueryType *gst_base_src_get_query_types (GstElement * element);
300
301 static gboolean gst_base_src_query (GstPad * pad, GstQuery * query);
302
303 static gboolean gst_base_src_default_negotiate (GstBaseSrc * basesrc);
304 static gboolean gst_base_src_default_do_seek (GstBaseSrc * src,
305     GstSegment * segment);
306 static gboolean gst_base_src_default_query (GstBaseSrc * src, GstQuery * query);
307 static gboolean gst_base_src_default_prepare_seek_segment (GstBaseSrc * src,
308     GstEvent * event, GstSegment * segment);
309
310 static gboolean gst_base_src_set_flushing (GstBaseSrc * basesrc,
311     gboolean flushing, gboolean live_play, gboolean unlock, gboolean * playing);
312 static gboolean gst_base_src_start (GstBaseSrc * basesrc);
313 static gboolean gst_base_src_stop (GstBaseSrc * basesrc);
314
315 static GstStateChangeReturn gst_base_src_change_state (GstElement * element,
316     GstStateChange transition);
317
318 static void gst_base_src_loop (GstPad * pad);
319 static gboolean gst_base_src_pad_check_get_range (GstPad * pad);
320 static gboolean gst_base_src_default_check_get_range (GstBaseSrc * bsrc);
321 static GstFlowReturn gst_base_src_pad_get_range (GstPad * pad, guint64 offset,
322     guint length, GstBuffer ** buf);
323 static GstFlowReturn gst_base_src_get_range (GstBaseSrc * src, guint64 offset,
324     guint length, GstBuffer ** buf);
325 static gboolean gst_base_src_seekable (GstBaseSrc * src);
326
327 static void
328 gst_base_src_base_init (gpointer g_class)
329 {
330   GST_DEBUG_CATEGORY_INIT (gst_base_src_debug, "basesrc", 0, "basesrc element");
331 }
332
333 static void
334 gst_base_src_class_init (GstBaseSrcClass * klass)
335 {
336   GObjectClass *gobject_class;
337   GstElementClass *gstelement_class;
338
339   gobject_class = G_OBJECT_CLASS (klass);
340   gstelement_class = GST_ELEMENT_CLASS (klass);
341
342   g_type_class_add_private (klass, sizeof (GstBaseSrcPrivate));
343
344   parent_class = g_type_class_peek_parent (klass);
345
346   gobject_class->finalize = gst_base_src_finalize;
347   gobject_class->set_property = gst_base_src_set_property;
348   gobject_class->get_property = gst_base_src_get_property;
349
350 /* FIXME 0.11: blocksize property should be int, not ulong (min is >max here) */
351   g_object_class_install_property (gobject_class, PROP_BLOCKSIZE,
352       g_param_spec_ulong ("blocksize", "Block size",
353           "Size in bytes to read per buffer (-1 = default)", 0, G_MAXULONG,
354           DEFAULT_BLOCKSIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
355   g_object_class_install_property (gobject_class, PROP_NUM_BUFFERS,
356       g_param_spec_int ("num-buffers", "num-buffers",
357           "Number of buffers to output before sending EOS (-1 = unlimited)",
358           -1, G_MAXINT, DEFAULT_NUM_BUFFERS, G_PARAM_READWRITE |
359           G_PARAM_STATIC_STRINGS));
360   g_object_class_install_property (gobject_class, PROP_TYPEFIND,
361       g_param_spec_boolean ("typefind", "Typefind",
362           "Run typefind before negotiating", DEFAULT_TYPEFIND,
363           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
364   g_object_class_install_property (gobject_class, PROP_DO_TIMESTAMP,
365       g_param_spec_boolean ("do-timestamp", "Do timestamp",
366           "Apply current stream time to buffers", DEFAULT_DO_TIMESTAMP,
367           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
368
369   gstelement_class->change_state =
370       GST_DEBUG_FUNCPTR (gst_base_src_change_state);
371   gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_base_src_send_event);
372   gstelement_class->get_query_types =
373       GST_DEBUG_FUNCPTR (gst_base_src_get_query_types);
374
375   klass->negotiate = GST_DEBUG_FUNCPTR (gst_base_src_default_negotiate);
376   klass->event = GST_DEBUG_FUNCPTR (gst_base_src_default_event);
377   klass->do_seek = GST_DEBUG_FUNCPTR (gst_base_src_default_do_seek);
378   klass->query = GST_DEBUG_FUNCPTR (gst_base_src_default_query);
379   klass->check_get_range =
380       GST_DEBUG_FUNCPTR (gst_base_src_default_check_get_range);
381   klass->prepare_seek_segment =
382       GST_DEBUG_FUNCPTR (gst_base_src_default_prepare_seek_segment);
383
384   /* Registering debug symbols for function pointers */
385   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_activate_push);
386   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_activate_pull);
387   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_event_handler);
388   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_query);
389   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_pad_get_range);
390   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_pad_check_get_range);
391   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_getcaps);
392   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_setcaps);
393   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_fixate);
394 }
395
396 static void
397 gst_base_src_init (GstBaseSrc * basesrc, gpointer g_class)
398 {
399   GstPad *pad;
400   GstPadTemplate *pad_template;
401
402   basesrc->priv = GST_BASE_SRC_GET_PRIVATE (basesrc);
403
404   basesrc->is_live = FALSE;
405   basesrc->live_lock = g_mutex_new ();
406   basesrc->live_cond = g_cond_new ();
407   basesrc->num_buffers = DEFAULT_NUM_BUFFERS;
408   basesrc->num_buffers_left = -1;
409
410   basesrc->can_activate_push = TRUE;
411   basesrc->pad_mode = GST_ACTIVATE_NONE;
412
413   pad_template =
414       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (g_class), "src");
415   g_return_if_fail (pad_template != NULL);
416
417   GST_DEBUG_OBJECT (basesrc, "creating src pad");
418   pad = gst_pad_new_from_template (pad_template, "src");
419
420   GST_DEBUG_OBJECT (basesrc, "setting functions on src pad");
421   gst_pad_set_activatepush_function (pad, gst_base_src_activate_push);
422   gst_pad_set_activatepull_function (pad, gst_base_src_activate_pull);
423   gst_pad_set_event_function (pad, gst_base_src_event_handler);
424   gst_pad_set_query_function (pad, gst_base_src_query);
425   gst_pad_set_checkgetrange_function (pad, gst_base_src_pad_check_get_range);
426   gst_pad_set_getrange_function (pad, gst_base_src_pad_get_range);
427   gst_pad_set_getcaps_function (pad, gst_base_src_getcaps);
428   gst_pad_set_setcaps_function (pad, gst_base_src_setcaps);
429   gst_pad_set_fixatecaps_function (pad, gst_base_src_fixate);
430
431   /* hold pointer to pad */
432   basesrc->srcpad = pad;
433   GST_DEBUG_OBJECT (basesrc, "adding src pad");
434   gst_element_add_pad (GST_ELEMENT (basesrc), pad);
435
436   basesrc->blocksize = DEFAULT_BLOCKSIZE;
437   basesrc->clock_id = NULL;
438   /* we operate in BYTES by default */
439   gst_base_src_set_format (basesrc, GST_FORMAT_BYTES);
440   basesrc->data.ABI.typefind = DEFAULT_TYPEFIND;
441   basesrc->priv->do_timestamp = DEFAULT_DO_TIMESTAMP;
442   g_atomic_int_set (&basesrc->priv->have_events, FALSE);
443
444   GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_STARTED);
445   GST_OBJECT_FLAG_SET (basesrc, GST_ELEMENT_IS_SOURCE);
446
447   GST_DEBUG_OBJECT (basesrc, "init done");
448 }
449
450 static void
451 gst_base_src_finalize (GObject * object)
452 {
453   GstBaseSrc *basesrc;
454   GstEvent **event_p;
455
456   basesrc = GST_BASE_SRC (object);
457
458   g_mutex_free (basesrc->live_lock);
459   g_cond_free (basesrc->live_cond);
460
461   event_p = &basesrc->data.ABI.pending_seek;
462   gst_event_replace (event_p, NULL);
463
464   if (basesrc->priv->pending_events) {
465     g_list_foreach (basesrc->priv->pending_events, (GFunc) gst_event_unref,
466         NULL);
467     g_list_free (basesrc->priv->pending_events);
468   }
469
470   G_OBJECT_CLASS (parent_class)->finalize (object);
471 }
472
473 /**
474  * gst_base_src_wait_playing:
475  * @src: the src
476  *
477  * If the #GstBaseSrcClass.create() method performs its own synchronisation
478  * against the clock it must unblock when going from PLAYING to the PAUSED state
479  * and call this method before continuing to produce the remaining data.
480  *
481  * This function will block until a state change to PLAYING happens (in which
482  * case this function returns #GST_FLOW_OK) or the processing must be stopped due
483  * to a state change to READY or a FLUSH event (in which case this function
484  * returns #GST_FLOW_WRONG_STATE).
485  *
486  * Since: 0.10.12
487  *
488  * Returns: #GST_FLOW_OK if @src is PLAYING and processing can
489  * continue. Any other return value should be returned from the create vmethod.
490  */
491 GstFlowReturn
492 gst_base_src_wait_playing (GstBaseSrc * src)
493 {
494   g_return_val_if_fail (GST_IS_BASE_SRC (src), GST_FLOW_ERROR);
495
496   do {
497     /* block until the state changes, or we get a flush, or something */
498     GST_DEBUG_OBJECT (src, "live source waiting for running state");
499     GST_LIVE_WAIT (src);
500     GST_DEBUG_OBJECT (src, "live source unlocked");
501     if (src->priv->flushing)
502       goto flushing;
503   } while (G_UNLIKELY (!src->live_running));
504
505   return GST_FLOW_OK;
506
507   /* ERRORS */
508 flushing:
509   {
510     GST_DEBUG_OBJECT (src, "we are flushing");
511     return GST_FLOW_WRONG_STATE;
512   }
513 }
514
515 /**
516  * gst_base_src_set_live:
517  * @src: base source instance
518  * @live: new live-mode
519  *
520  * If the element listens to a live source, @live should
521  * be set to %TRUE.
522  *
523  * A live source will not produce data in the PAUSED state and
524  * will therefore not be able to participate in the PREROLL phase
525  * of a pipeline. To signal this fact to the application and the
526  * pipeline, the state change return value of the live source will
527  * be GST_STATE_CHANGE_NO_PREROLL.
528  */
529 void
530 gst_base_src_set_live (GstBaseSrc * src, gboolean live)
531 {
532   g_return_if_fail (GST_IS_BASE_SRC (src));
533
534   GST_OBJECT_LOCK (src);
535   src->is_live = live;
536   GST_OBJECT_UNLOCK (src);
537 }
538
539 /**
540  * gst_base_src_is_live:
541  * @src: base source instance
542  *
543  * Check if an element is in live mode.
544  *
545  * Returns: %TRUE if element is in live mode.
546  */
547 gboolean
548 gst_base_src_is_live (GstBaseSrc * src)
549 {
550   gboolean result;
551
552   g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
553
554   GST_OBJECT_LOCK (src);
555   result = src->is_live;
556   GST_OBJECT_UNLOCK (src);
557
558   return result;
559 }
560
561 /**
562  * gst_base_src_set_format:
563  * @src: base source instance
564  * @format: the format to use
565  *
566  * Sets the default format of the source. This will be the format used
567  * for sending NEW_SEGMENT events and for performing seeks.
568  *
569  * If a format of GST_FORMAT_BYTES is set, the element will be able to
570  * operate in pull mode if the #GstBaseSrcClass.is_seekable() returns TRUE.
571  *
572  * This function must only be called in states < %GST_STATE_PAUSED.
573  *
574  * Since: 0.10.1
575  */
576 void
577 gst_base_src_set_format (GstBaseSrc * src, GstFormat format)
578 {
579   g_return_if_fail (GST_IS_BASE_SRC (src));
580   g_return_if_fail (GST_STATE (src) <= GST_STATE_READY);
581
582   GST_OBJECT_LOCK (src);
583   gst_segment_init (&src->segment, format);
584   GST_OBJECT_UNLOCK (src);
585 }
586
587 /**
588  * gst_base_src_query_latency:
589  * @src: the source
590  * @live: (out) (allow-none): if the source is live
591  * @min_latency: (out) (allow-none): the min latency of the source
592  * @max_latency: (out) (allow-none): the max latency of the source
593  *
594  * Query the source for the latency parameters. @live will be TRUE when @src is
595  * configured as a live source. @min_latency will be set to the difference
596  * between the running time and the timestamp of the first buffer.
597  * @max_latency is always the undefined value of -1.
598  *
599  * This function is mostly used by subclasses.
600  *
601  * Returns: TRUE if the query succeeded.
602  *
603  * Since: 0.10.13
604  */
605 gboolean
606 gst_base_src_query_latency (GstBaseSrc * src, gboolean * live,
607     GstClockTime * min_latency, GstClockTime * max_latency)
608 {
609   GstClockTime min;
610
611   g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
612
613   GST_OBJECT_LOCK (src);
614   if (live)
615     *live = src->is_live;
616
617   /* if we have a startup latency, report this one, else report 0. Subclasses
618    * are supposed to override the query function if they want something
619    * else. */
620   if (src->priv->latency != -1)
621     min = src->priv->latency;
622   else
623     min = 0;
624
625   if (min_latency)
626     *min_latency = min;
627   if (max_latency)
628     *max_latency = -1;
629
630   GST_LOG_OBJECT (src, "latency: live %d, min %" GST_TIME_FORMAT
631       ", max %" GST_TIME_FORMAT, src->is_live, GST_TIME_ARGS (min),
632       GST_TIME_ARGS (-1));
633   GST_OBJECT_UNLOCK (src);
634
635   return TRUE;
636 }
637
638 /**
639  * gst_base_src_set_blocksize:
640  * @src: the source
641  * @blocksize: the new blocksize in bytes
642  *
643  * Set the number of bytes that @src will push out with each buffer. When
644  * @blocksize is set to -1, a default length will be used.
645  *
646  * Since: 0.10.22
647  */
648 /* FIXME 0.11: blocksize property should be int, not ulong */
649 void
650 gst_base_src_set_blocksize (GstBaseSrc * src, gulong blocksize)
651 {
652   g_return_if_fail (GST_IS_BASE_SRC (src));
653
654   GST_OBJECT_LOCK (src);
655   src->blocksize = blocksize;
656   GST_OBJECT_UNLOCK (src);
657 }
658
659 /**
660  * gst_base_src_get_blocksize:
661  * @src: the source
662  *
663  * Get the number of bytes that @src will push out with each buffer.
664  *
665  * Returns: the number of bytes pushed with each buffer.
666  *
667  * Since: 0.10.22
668  */
669 /* FIXME 0.11: blocksize property should be int, not ulong */
670 gulong
671 gst_base_src_get_blocksize (GstBaseSrc * src)
672 {
673   gulong res;
674
675   g_return_val_if_fail (GST_IS_BASE_SRC (src), 0);
676
677   GST_OBJECT_LOCK (src);
678   res = src->blocksize;
679   GST_OBJECT_UNLOCK (src);
680
681   return res;
682 }
683
684
685 /**
686  * gst_base_src_set_do_timestamp:
687  * @src: the source
688  * @timestamp: enable or disable timestamping
689  *
690  * Configure @src to automatically timestamp outgoing buffers based on the
691  * current running_time of the pipeline. This property is mostly useful for live
692  * sources.
693  *
694  * Since: 0.10.15
695  */
696 void
697 gst_base_src_set_do_timestamp (GstBaseSrc * src, gboolean timestamp)
698 {
699   g_return_if_fail (GST_IS_BASE_SRC (src));
700
701   GST_OBJECT_LOCK (src);
702   src->priv->do_timestamp = timestamp;
703   GST_OBJECT_UNLOCK (src);
704 }
705
706 /**
707  * gst_base_src_get_do_timestamp:
708  * @src: the source
709  *
710  * Query if @src timestamps outgoing buffers based on the current running_time.
711  *
712  * Returns: %TRUE if the base class will automatically timestamp outgoing buffers.
713  *
714  * Since: 0.10.15
715  */
716 gboolean
717 gst_base_src_get_do_timestamp (GstBaseSrc * src)
718 {
719   gboolean res;
720
721   g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
722
723   GST_OBJECT_LOCK (src);
724   res = src->priv->do_timestamp;
725   GST_OBJECT_UNLOCK (src);
726
727   return res;
728 }
729
730 /**
731  * gst_base_src_new_seamless_segment:
732  * @src: The source
733  * @start: The new start value for the segment
734  * @stop: Stop value for the new segment
735  * @position: The position value for the new segent
736  *
737  * Prepare a new seamless segment for emission downstream. This function must
738  * only be called by derived sub-classes, and only from the create() function,
739  * as the stream-lock needs to be held.
740  *
741  * The format for the new segment will be the current format of the source, as
742  * configured with gst_base_src_set_format()
743  *
744  * Returns: %TRUE if preparation of the seamless segment succeeded.
745  *
746  * Since: 0.10.26
747  */
748 gboolean
749 gst_base_src_new_seamless_segment (GstBaseSrc * src, gint64 start, gint64 stop,
750     gint64 position)
751 {
752   gboolean res = TRUE;
753
754   GST_DEBUG_OBJECT (src,
755       "Starting new seamless segment. Start %" GST_TIME_FORMAT " stop %"
756       GST_TIME_FORMAT " position %" GST_TIME_FORMAT, GST_TIME_ARGS (start),
757       GST_TIME_ARGS (stop), GST_TIME_ARGS (position));
758
759   GST_OBJECT_LOCK (src);
760   if (src->data.ABI.running && !src->priv->newsegment_pending) {
761     if (src->priv->close_segment)
762       gst_event_unref (src->priv->close_segment);
763     src->priv->close_segment =
764         gst_event_new_new_segment_full (TRUE,
765         src->segment.rate, src->segment.applied_rate, src->segment.format,
766         src->segment.start, src->segment.last_stop, src->segment.time);
767   }
768
769   gst_segment_set_newsegment_full (&src->segment, FALSE, src->segment.rate,
770       src->segment.applied_rate, src->segment.format, start, stop, position);
771
772   if (src->priv->start_segment)
773     gst_event_unref (src->priv->start_segment);
774   if (src->segment.rate >= 0.0) {
775     /* forward, we send data from last_stop to stop */
776     src->priv->start_segment =
777         gst_event_new_new_segment_full (FALSE,
778         src->segment.rate, src->segment.applied_rate, src->segment.format,
779         src->segment.last_stop, stop, src->segment.time);
780   } else {
781     /* reverse, we send data from last_stop to start */
782     src->priv->start_segment =
783         gst_event_new_new_segment_full (FALSE,
784         src->segment.rate, src->segment.applied_rate, src->segment.format,
785         src->segment.start, src->segment.last_stop, src->segment.time);
786   }
787   GST_OBJECT_UNLOCK (src);
788
789   src->priv->discont = TRUE;
790   src->data.ABI.running = TRUE;
791
792   return res;
793 }
794
795 static gboolean
796 gst_base_src_setcaps (GstPad * pad, GstCaps * caps)
797 {
798   GstBaseSrcClass *bclass;
799   GstBaseSrc *bsrc;
800   gboolean res = TRUE;
801
802   bsrc = GST_BASE_SRC (GST_PAD_PARENT (pad));
803   bclass = GST_BASE_SRC_GET_CLASS (bsrc);
804
805   if (bclass->set_caps)
806     res = bclass->set_caps (bsrc, caps);
807
808   return res;
809 }
810
811 static GstCaps *
812 gst_base_src_getcaps (GstPad * pad)
813 {
814   GstBaseSrcClass *bclass;
815   GstBaseSrc *bsrc;
816   GstCaps *caps = NULL;
817
818   bsrc = GST_BASE_SRC (GST_PAD_PARENT (pad));
819   bclass = GST_BASE_SRC_GET_CLASS (bsrc);
820   if (bclass->get_caps)
821     caps = bclass->get_caps (bsrc);
822
823   if (caps == NULL) {
824     GstPadTemplate *pad_template;
825
826     pad_template =
827         gst_element_class_get_pad_template (GST_ELEMENT_CLASS (bclass), "src");
828     if (pad_template != NULL) {
829       caps = gst_caps_ref (gst_pad_template_get_caps (pad_template));
830     }
831   }
832   return caps;
833 }
834
835 static void
836 gst_base_src_fixate (GstPad * pad, GstCaps * caps)
837 {
838   GstBaseSrcClass *bclass;
839   GstBaseSrc *bsrc;
840
841   bsrc = GST_BASE_SRC (gst_pad_get_parent (pad));
842   bclass = GST_BASE_SRC_GET_CLASS (bsrc);
843
844   if (bclass->fixate)
845     bclass->fixate (bsrc, caps);
846
847   gst_object_unref (bsrc);
848 }
849
850 static gboolean
851 gst_base_src_default_query (GstBaseSrc * src, GstQuery * query)
852 {
853   gboolean res;
854
855   switch (GST_QUERY_TYPE (query)) {
856     case GST_QUERY_POSITION:
857     {
858       GstFormat format;
859
860       gst_query_parse_position (query, &format, NULL);
861
862       GST_DEBUG_OBJECT (src, "position query in format %s",
863           gst_format_get_name (format));
864
865       switch (format) {
866         case GST_FORMAT_PERCENT:
867         {
868           gint64 percent;
869           gint64 position;
870           gint64 duration;
871
872           GST_OBJECT_LOCK (src);
873           position = src->segment.last_stop;
874           duration = src->segment.duration;
875           GST_OBJECT_UNLOCK (src);
876
877           if (position != -1 && duration != -1) {
878             if (position < duration)
879               percent = gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX, position,
880                   duration);
881             else
882               percent = GST_FORMAT_PERCENT_MAX;
883           } else
884             percent = -1;
885
886           gst_query_set_position (query, GST_FORMAT_PERCENT, percent);
887           res = TRUE;
888           break;
889         }
890         default:
891         {
892           gint64 position;
893           GstFormat seg_format;
894
895           GST_OBJECT_LOCK (src);
896           position =
897               gst_segment_to_stream_time (&src->segment, src->segment.format,
898               src->segment.last_stop);
899           seg_format = src->segment.format;
900           GST_OBJECT_UNLOCK (src);
901
902           if (position != -1) {
903             /* convert to requested format */
904             res =
905                 gst_pad_query_convert (src->srcpad, seg_format,
906                 position, &format, &position);
907           } else
908             res = TRUE;
909
910           gst_query_set_position (query, format, position);
911           break;
912         }
913       }
914       break;
915     }
916     case GST_QUERY_DURATION:
917     {
918       GstFormat format;
919
920       gst_query_parse_duration (query, &format, NULL);
921
922       GST_DEBUG_OBJECT (src, "duration query in format %s",
923           gst_format_get_name (format));
924
925       switch (format) {
926         case GST_FORMAT_PERCENT:
927           gst_query_set_duration (query, GST_FORMAT_PERCENT,
928               GST_FORMAT_PERCENT_MAX);
929           res = TRUE;
930           break;
931         default:
932         {
933           gint64 duration;
934           GstFormat seg_format;
935
936           GST_OBJECT_LOCK (src);
937           /* this is the duration as configured by the subclass. */
938           duration = src->segment.duration;
939           seg_format = src->segment.format;
940           GST_OBJECT_UNLOCK (src);
941
942           GST_LOG_OBJECT (src, "duration %" G_GINT64_FORMAT ", format %s",
943               duration, gst_format_get_name (seg_format));
944
945           if (duration != -1) {
946             /* convert to requested format, if this fails, we have a duration
947              * but we cannot answer the query, we must return FALSE. */
948             res =
949                 gst_pad_query_convert (src->srcpad, seg_format,
950                 duration, &format, &duration);
951           } else {
952             /* The subclass did not configure a duration, we assume that the
953              * media has an unknown duration then and we return TRUE to report
954              * this. Note that this is not the same as returning FALSE, which
955              * means that we cannot report the duration at all. */
956             res = TRUE;
957           }
958           gst_query_set_duration (query, format, duration);
959           break;
960         }
961       }
962       break;
963     }
964
965     case GST_QUERY_SEEKING:
966     {
967       GstFormat format, seg_format;
968       gint64 duration;
969
970       GST_OBJECT_LOCK (src);
971       duration = src->segment.duration;
972       seg_format = src->segment.format;
973       GST_OBJECT_UNLOCK (src);
974
975       gst_query_parse_seeking (query, &format, NULL, NULL, NULL);
976       if (format == seg_format) {
977         gst_query_set_seeking (query, seg_format,
978             gst_base_src_seekable (src), 0, duration);
979         res = TRUE;
980       } else {
981         /* FIXME 0.11: return TRUE + seekable=FALSE for SEEKING query here */
982         /* Don't reply to the query to make up for demuxers which don't
983          * handle the SEEKING query yet. Players like Totem will fall back
984          * to the duration when the SEEKING query isn't answered. */
985         res = FALSE;
986       }
987       break;
988     }
989     case GST_QUERY_SEGMENT:
990     {
991       gint64 start, stop;
992
993       GST_OBJECT_LOCK (src);
994       /* no end segment configured, current duration then */
995       if ((stop = src->segment.stop) == -1)
996         stop = src->segment.duration;
997       start = src->segment.start;
998
999       /* adjust to stream time */
1000       if (src->segment.time != -1) {
1001         start -= src->segment.time;
1002         if (stop != -1)
1003           stop -= src->segment.time;
1004       }
1005
1006       gst_query_set_segment (query, src->segment.rate, src->segment.format,
1007           start, stop);
1008       GST_OBJECT_UNLOCK (src);
1009       res = TRUE;
1010       break;
1011     }
1012
1013     case GST_QUERY_FORMATS:
1014     {
1015       gst_query_set_formats (query, 3, GST_FORMAT_DEFAULT,
1016           GST_FORMAT_BYTES, GST_FORMAT_PERCENT);
1017       res = TRUE;
1018       break;
1019     }
1020     case GST_QUERY_CONVERT:
1021     {
1022       GstFormat src_fmt, dest_fmt;
1023       gint64 src_val, dest_val;
1024
1025       gst_query_parse_convert (query, &src_fmt, &src_val, &dest_fmt, &dest_val);
1026
1027       /* we can only convert between equal formats... */
1028       if (src_fmt == dest_fmt) {
1029         dest_val = src_val;
1030         res = TRUE;
1031       } else
1032         res = FALSE;
1033
1034       gst_query_set_convert (query, src_fmt, src_val, dest_fmt, dest_val);
1035       break;
1036     }
1037     case GST_QUERY_LATENCY:
1038     {
1039       GstClockTime min, max;
1040       gboolean live;
1041
1042       /* Subclasses should override and implement something usefull */
1043       res = gst_base_src_query_latency (src, &live, &min, &max);
1044
1045       GST_LOG_OBJECT (src, "report latency: live %d, min %" GST_TIME_FORMAT
1046           ", max %" GST_TIME_FORMAT, live, GST_TIME_ARGS (min),
1047           GST_TIME_ARGS (max));
1048
1049       gst_query_set_latency (query, live, min, max);
1050       break;
1051     }
1052     case GST_QUERY_JITTER:
1053     case GST_QUERY_RATE:
1054       res = FALSE;
1055       break;
1056     case GST_QUERY_BUFFERING:
1057     {
1058       GstFormat format, seg_format;
1059       gint64 start, stop, estimated;
1060
1061       gst_query_parse_buffering_range (query, &format, NULL, NULL, NULL);
1062
1063       GST_DEBUG_OBJECT (src, "buffering query in format %s",
1064           gst_format_get_name (format));
1065
1066       GST_OBJECT_LOCK (src);
1067       if (src->random_access) {
1068         estimated = 0;
1069         start = 0;
1070         if (format == GST_FORMAT_PERCENT)
1071           stop = GST_FORMAT_PERCENT_MAX;
1072         else
1073           stop = src->segment.duration;
1074       } else {
1075         estimated = -1;
1076         start = -1;
1077         stop = -1;
1078       }
1079       seg_format = src->segment.format;
1080       GST_OBJECT_UNLOCK (src);
1081
1082       /* convert to required format. When the conversion fails, we can't answer
1083        * the query. When the value is unknown, we can don't perform conversion
1084        * but report TRUE. */
1085       if (format != GST_FORMAT_PERCENT && stop != -1) {
1086         res = gst_pad_query_convert (src->srcpad, seg_format,
1087             stop, &format, &stop);
1088       } else {
1089         res = TRUE;
1090       }
1091       if (res && format != GST_FORMAT_PERCENT && start != -1)
1092         res = gst_pad_query_convert (src->srcpad, seg_format,
1093             start, &format, &start);
1094
1095       gst_query_set_buffering_range (query, format, start, stop, estimated);
1096       break;
1097     }
1098     default:
1099       res = FALSE;
1100       break;
1101   }
1102   GST_DEBUG_OBJECT (src, "query %s returns %d", GST_QUERY_TYPE_NAME (query),
1103       res);
1104   return res;
1105 }
1106
1107 static gboolean
1108 gst_base_src_query (GstPad * pad, GstQuery * query)
1109 {
1110   GstBaseSrc *src;
1111   GstBaseSrcClass *bclass;
1112   gboolean result = FALSE;
1113
1114   src = GST_BASE_SRC (gst_pad_get_parent (pad));
1115   if (G_UNLIKELY (src == NULL))
1116     return FALSE;
1117
1118   bclass = GST_BASE_SRC_GET_CLASS (src);
1119
1120   if (bclass->query)
1121     result = bclass->query (src, query);
1122   else
1123     result = gst_pad_query_default (pad, query);
1124
1125   gst_object_unref (src);
1126
1127   return result;
1128 }
1129
1130 static gboolean
1131 gst_base_src_default_do_seek (GstBaseSrc * src, GstSegment * segment)
1132 {
1133   gboolean res = TRUE;
1134
1135   /* update our offset if the start/stop position was updated */
1136   if (segment->format == GST_FORMAT_BYTES) {
1137     segment->time = segment->start;
1138   } else if (segment->start == 0) {
1139     /* seek to start, we can implement a default for this. */
1140     segment->time = 0;
1141   } else {
1142     res = FALSE;
1143     GST_INFO_OBJECT (src, "Can't do a default seek");
1144   }
1145
1146   return res;
1147 }
1148
1149 static gboolean
1150 gst_base_src_do_seek (GstBaseSrc * src, GstSegment * segment)
1151 {
1152   GstBaseSrcClass *bclass;
1153   gboolean result = FALSE;
1154
1155   bclass = GST_BASE_SRC_GET_CLASS (src);
1156
1157   if (bclass->do_seek)
1158     result = bclass->do_seek (src, segment);
1159
1160   return result;
1161 }
1162
1163 #define SEEK_TYPE_IS_RELATIVE(t) (((t) != GST_SEEK_TYPE_NONE) && ((t) != GST_SEEK_TYPE_SET))
1164
1165 static gboolean
1166 gst_base_src_default_prepare_seek_segment (GstBaseSrc * src, GstEvent * event,
1167     GstSegment * segment)
1168 {
1169   /* By default, we try one of 2 things:
1170    *   - For absolute seek positions, convert the requested position to our
1171    *     configured processing format and place it in the output segment \
1172    *   - For relative seek positions, convert our current (input) values to the
1173    *     seek format, adjust by the relative seek offset and then convert back to
1174    *     the processing format
1175    */
1176   GstSeekType cur_type, stop_type;
1177   gint64 cur, stop;
1178   GstSeekFlags flags;
1179   GstFormat seek_format, dest_format;
1180   gdouble rate;
1181   gboolean update;
1182   gboolean res = TRUE;
1183
1184   gst_event_parse_seek (event, &rate, &seek_format, &flags,
1185       &cur_type, &cur, &stop_type, &stop);
1186   dest_format = segment->format;
1187
1188   if (seek_format == dest_format) {
1189     gst_segment_set_seek (segment, rate, seek_format, flags,
1190         cur_type, cur, stop_type, stop, &update);
1191     return TRUE;
1192   }
1193
1194   if (cur_type != GST_SEEK_TYPE_NONE) {
1195     /* FIXME: Handle seek_cur & seek_end by converting the input segment vals */
1196     res =
1197         gst_pad_query_convert (src->srcpad, seek_format, cur, &dest_format,
1198         &cur);
1199     cur_type = GST_SEEK_TYPE_SET;
1200   }
1201
1202   if (res && stop_type != GST_SEEK_TYPE_NONE) {
1203     /* FIXME: Handle seek_cur & seek_end by converting the input segment vals */
1204     res =
1205         gst_pad_query_convert (src->srcpad, seek_format, stop, &dest_format,
1206         &stop);
1207     stop_type = GST_SEEK_TYPE_SET;
1208   }
1209
1210   /* And finally, configure our output segment in the desired format */
1211   gst_segment_set_seek (segment, rate, dest_format, flags, cur_type, cur,
1212       stop_type, stop, &update);
1213
1214   if (!res)
1215     goto no_format;
1216
1217   return res;
1218
1219 no_format:
1220   {
1221     GST_DEBUG_OBJECT (src, "undefined format given, seek aborted.");
1222     return FALSE;
1223   }
1224 }
1225
1226 static gboolean
1227 gst_base_src_prepare_seek_segment (GstBaseSrc * src, GstEvent * event,
1228     GstSegment * seeksegment)
1229 {
1230   GstBaseSrcClass *bclass;
1231   gboolean result = FALSE;
1232
1233   bclass = GST_BASE_SRC_GET_CLASS (src);
1234
1235   if (bclass->prepare_seek_segment)
1236     result = bclass->prepare_seek_segment (src, event, seeksegment);
1237
1238   return result;
1239 }
1240
1241 /* this code implements the seeking. It is a good example
1242  * handling all cases.
1243  *
1244  * A seek updates the currently configured segment.start
1245  * and segment.stop values based on the SEEK_TYPE. If the
1246  * segment.start value is updated, a seek to this new position
1247  * should be performed.
1248  *
1249  * The seek can only be executed when we are not currently
1250  * streaming any data, to make sure that this is the case, we
1251  * acquire the STREAM_LOCK which is taken when we are in the
1252  * _loop() function or when a getrange() is called. Normally
1253  * we will not receive a seek if we are operating in pull mode
1254  * though. When we operate as a live source we might block on the live
1255  * cond, which does not release the STREAM_LOCK. Therefore we will try
1256  * to grab the LIVE_LOCK instead of the STREAM_LOCK to make sure it is
1257  * safe to perform the seek.
1258  *
1259  * When we are in the loop() function, we might be in the middle
1260  * of pushing a buffer, which might block in a sink. To make sure
1261  * that the push gets unblocked we push out a FLUSH_START event.
1262  * Our loop function will get a WRONG_STATE return value from
1263  * the push and will pause, effectively releasing the STREAM_LOCK.
1264  *
1265  * For a non-flushing seek, we pause the task, which might eventually
1266  * release the STREAM_LOCK. We say eventually because when the sink
1267  * blocks on the sample we might wait a very long time until the sink
1268  * unblocks the sample. In any case we acquire the STREAM_LOCK and
1269  * can continue the seek. A non-flushing seek is normally done in a
1270  * running pipeline to perform seamless playback, this means that the sink is
1271  * PLAYING and will return from its chain function.
1272  * In the case of a non-flushing seek we need to make sure that the
1273  * data we output after the seek is continuous with the previous data,
1274  * this is because a non-flushing seek does not reset the running-time
1275  * to 0. We do this by closing the currently running segment, ie. sending
1276  * a new_segment event with the stop position set to the last processed
1277  * position.
1278  *
1279  * After updating the segment.start/stop values, we prepare for
1280  * streaming again. We push out a FLUSH_STOP to make the peer pad
1281  * accept data again and we start our task again.
1282  *
1283  * A segment seek posts a message on the bus saying that the playback
1284  * of the segment started. We store the segment flag internally because
1285  * when we reach the segment.stop we have to post a segment.done
1286  * instead of EOS when doing a segment seek.
1287  */
1288 /* FIXME (0.11), we have the unlock gboolean here because most current
1289  * implementations (fdsrc, -base/gst/tcp/, ...) unconditionally unlock, even when
1290  * the streaming thread isn't running, resulting in bogus unlocks later when it
1291  * starts. This is fixed by adding unlock_stop, but we should still avoid unlocking
1292  * unnecessarily for backwards compatibility. Ergo, the unlock variable stays
1293  * until 0.11
1294  */
1295 static gboolean
1296 gst_base_src_perform_seek (GstBaseSrc * src, GstEvent * event, gboolean unlock)
1297 {
1298   gboolean res = TRUE, tres;
1299   gdouble rate;
1300   GstFormat seek_format, dest_format;
1301   GstSeekFlags flags;
1302   GstSeekType cur_type, stop_type;
1303   gint64 cur, stop;
1304   gboolean flush, playing;
1305   gboolean update;
1306   gboolean relative_seek = FALSE;
1307   gboolean seekseg_configured = FALSE;
1308   GstSegment seeksegment;
1309   guint32 seqnum;
1310   GstEvent *tevent;
1311
1312   GST_DEBUG_OBJECT (src, "doing seek: %" GST_PTR_FORMAT, event);
1313
1314   GST_OBJECT_LOCK (src);
1315   dest_format = src->segment.format;
1316   GST_OBJECT_UNLOCK (src);
1317
1318   if (event) {
1319     gst_event_parse_seek (event, &rate, &seek_format, &flags,
1320         &cur_type, &cur, &stop_type, &stop);
1321
1322     relative_seek = SEEK_TYPE_IS_RELATIVE (cur_type) ||
1323         SEEK_TYPE_IS_RELATIVE (stop_type);
1324
1325     if (dest_format != seek_format && !relative_seek) {
1326       /* If we have an ABSOLUTE position (SEEK_SET only), we can convert it
1327        * here before taking the stream lock, otherwise we must convert it later,
1328        * once we have the stream lock and can read the last configures segment
1329        * start and stop positions */
1330       gst_segment_init (&seeksegment, dest_format);
1331
1332       if (!gst_base_src_prepare_seek_segment (src, event, &seeksegment))
1333         goto prepare_failed;
1334
1335       seekseg_configured = TRUE;
1336     }
1337
1338     flush = flags & GST_SEEK_FLAG_FLUSH;
1339     seqnum = gst_event_get_seqnum (event);
1340   } else {
1341     flush = FALSE;
1342     /* get next seqnum */
1343     seqnum = gst_util_seqnum_next ();
1344   }
1345
1346   /* send flush start */
1347   if (flush) {
1348     tevent = gst_event_new_flush_start ();
1349     gst_event_set_seqnum (tevent, seqnum);
1350     gst_pad_push_event (src->srcpad, tevent);
1351   } else
1352     gst_pad_pause_task (src->srcpad);
1353
1354   /* unblock streaming thread. */
1355   gst_base_src_set_flushing (src, TRUE, FALSE, unlock, &playing);
1356
1357   /* grab streaming lock, this should eventually be possible, either
1358    * because the task is paused, our streaming thread stopped
1359    * or because our peer is flushing. */
1360   GST_PAD_STREAM_LOCK (src->srcpad);
1361   if (G_UNLIKELY (src->priv->seqnum == seqnum)) {
1362     /* we have seen this event before, issue a warning for now */
1363     GST_WARNING_OBJECT (src, "duplicate event found %" G_GUINT32_FORMAT,
1364         seqnum);
1365   } else {
1366     src->priv->seqnum = seqnum;
1367     GST_DEBUG_OBJECT (src, "seek with seqnum %" G_GUINT32_FORMAT, seqnum);
1368   }
1369
1370   gst_base_src_set_flushing (src, FALSE, playing, unlock, NULL);
1371
1372   /* If we configured the seeksegment above, don't overwrite it now. Otherwise
1373    * copy the current segment info into the temp segment that we can actually
1374    * attempt the seek with. We only update the real segment if the seek suceeds. */
1375   if (!seekseg_configured) {
1376     memcpy (&seeksegment, &src->segment, sizeof (GstSegment));
1377
1378     /* now configure the final seek segment */
1379     if (event) {
1380       if (seeksegment.format != seek_format) {
1381         /* OK, here's where we give the subclass a chance to convert the relative
1382          * seek into an absolute one in the processing format. We set up any
1383          * absolute seek above, before taking the stream lock. */
1384         if (!gst_base_src_prepare_seek_segment (src, event, &seeksegment)) {
1385           GST_DEBUG_OBJECT (src, "Preparing the seek failed after flushing. "
1386               "Aborting seek");
1387           res = FALSE;
1388         }
1389       } else {
1390         /* The seek format matches our processing format, no need to ask the
1391          * the subclass to configure the segment. */
1392         gst_segment_set_seek (&seeksegment, rate, seek_format, flags,
1393             cur_type, cur, stop_type, stop, &update);
1394       }
1395     }
1396     /* Else, no seek event passed, so we're just (re)starting the
1397        current segment. */
1398   }
1399
1400   if (res) {
1401     GST_DEBUG_OBJECT (src, "segment configured from %" G_GINT64_FORMAT
1402         " to %" G_GINT64_FORMAT ", position %" G_GINT64_FORMAT,
1403         seeksegment.start, seeksegment.stop, seeksegment.last_stop);
1404
1405     /* do the seek, segment.last_stop contains the new position. */
1406     res = gst_base_src_do_seek (src, &seeksegment);
1407   }
1408
1409   /* and prepare to continue streaming */
1410   if (flush) {
1411     tevent = gst_event_new_flush_stop ();
1412     gst_event_set_seqnum (tevent, seqnum);
1413     /* send flush stop, peer will accept data and events again. We
1414      * are not yet providing data as we still have the STREAM_LOCK. */
1415     gst_pad_push_event (src->srcpad, tevent);
1416   } else if (res && src->data.ABI.running) {
1417     /* we are running the current segment and doing a non-flushing seek,
1418      * close the segment first based on the last_stop. */
1419     GST_DEBUG_OBJECT (src, "closing running segment %" G_GINT64_FORMAT
1420         " to %" G_GINT64_FORMAT, src->segment.start, src->segment.last_stop);
1421
1422     /* queue the segment for sending in the stream thread */
1423     if (src->priv->close_segment)
1424       gst_event_unref (src->priv->close_segment);
1425     src->priv->close_segment =
1426         gst_event_new_new_segment_full (TRUE,
1427         src->segment.rate, src->segment.applied_rate, src->segment.format,
1428         src->segment.start, src->segment.last_stop, src->segment.time);
1429     gst_event_set_seqnum (src->priv->close_segment, seqnum);
1430   }
1431
1432   /* The subclass must have converted the segment to the processing format
1433    * by now */
1434   if (res && seeksegment.format != dest_format) {
1435     GST_DEBUG_OBJECT (src, "Subclass failed to prepare a seek segment "
1436         "in the correct format. Aborting seek.");
1437     res = FALSE;
1438   }
1439
1440   /* if the seek was successful, we update our real segment and push
1441    * out the new segment. */
1442   if (res) {
1443     GST_OBJECT_LOCK (src);
1444     memcpy (&src->segment, &seeksegment, sizeof (GstSegment));
1445     GST_OBJECT_UNLOCK (src);
1446
1447     if (seeksegment.flags & GST_SEEK_FLAG_SEGMENT) {
1448       GstMessage *message;
1449
1450       message = gst_message_new_segment_start (GST_OBJECT (src),
1451           seeksegment.format, seeksegment.last_stop);
1452       gst_message_set_seqnum (message, seqnum);
1453
1454       gst_element_post_message (GST_ELEMENT (src), message);
1455     }
1456
1457     /* for deriving a stop position for the playback segment from the seek
1458      * segment, we must take the duration when the stop is not set */
1459     if ((stop = seeksegment.stop) == -1)
1460       stop = seeksegment.duration;
1461
1462     GST_DEBUG_OBJECT (src, "Sending newsegment from %" G_GINT64_FORMAT
1463         " to %" G_GINT64_FORMAT, seeksegment.start, stop);
1464
1465     /* now replace the old segment so that we send it in the stream thread the
1466      * next time it is scheduled. */
1467     if (src->priv->start_segment)
1468       gst_event_unref (src->priv->start_segment);
1469     if (seeksegment.rate >= 0.0) {
1470       /* forward, we send data from last_stop to stop */
1471       src->priv->start_segment =
1472           gst_event_new_new_segment_full (FALSE,
1473           seeksegment.rate, seeksegment.applied_rate, seeksegment.format,
1474           seeksegment.last_stop, stop, seeksegment.time);
1475     } else {
1476       /* reverse, we send data from last_stop to start */
1477       src->priv->start_segment =
1478           gst_event_new_new_segment_full (FALSE,
1479           seeksegment.rate, seeksegment.applied_rate, seeksegment.format,
1480           seeksegment.start, seeksegment.last_stop, seeksegment.time);
1481     }
1482     gst_event_set_seqnum (src->priv->start_segment, seqnum);
1483     src->priv->newsegment_pending = TRUE;
1484   }
1485
1486   src->priv->discont = TRUE;
1487   src->data.ABI.running = TRUE;
1488   /* and restart the task in case it got paused explicitly or by
1489    * the FLUSH_START event we pushed out. */
1490   tres = gst_pad_start_task (src->srcpad, (GstTaskFunction) gst_base_src_loop,
1491       src->srcpad);
1492   if (res && !tres)
1493     res = FALSE;
1494
1495   /* and release the lock again so we can continue streaming */
1496   GST_PAD_STREAM_UNLOCK (src->srcpad);
1497
1498   return res;
1499
1500   /* ERROR */
1501 prepare_failed:
1502   GST_DEBUG_OBJECT (src, "Preparing the seek failed before flushing. "
1503       "Aborting seek");
1504   return FALSE;
1505 }
1506
1507 static const GstQueryType *
1508 gst_base_src_get_query_types (GstElement * element)
1509 {
1510   static const GstQueryType query_types[] = {
1511     GST_QUERY_DURATION,
1512     GST_QUERY_POSITION,
1513     GST_QUERY_SEEKING,
1514     GST_QUERY_SEGMENT,
1515     GST_QUERY_FORMATS,
1516     GST_QUERY_LATENCY,
1517     GST_QUERY_JITTER,
1518     GST_QUERY_RATE,
1519     GST_QUERY_CONVERT,
1520     0
1521   };
1522
1523   return query_types;
1524 }
1525
1526 /* all events send to this element directly. This is mainly done from the
1527  * application.
1528  */
1529 static gboolean
1530 gst_base_src_send_event (GstElement * element, GstEvent * event)
1531 {
1532   GstBaseSrc *src;
1533   gboolean result = FALSE;
1534
1535   src = GST_BASE_SRC (element);
1536
1537   GST_DEBUG_OBJECT (src, "handling event %p %" GST_PTR_FORMAT, event, event);
1538
1539   switch (GST_EVENT_TYPE (event)) {
1540       /* bidirectional events */
1541     case GST_EVENT_FLUSH_START:
1542     case GST_EVENT_FLUSH_STOP:
1543       /* sending random flushes downstream can break stuff,
1544        * especially sync since all segment info will get flushed */
1545       break;
1546
1547       /* downstream serialized events */
1548     case GST_EVENT_EOS:
1549     {
1550       GstBaseSrcClass *bclass;
1551
1552       bclass = GST_BASE_SRC_GET_CLASS (src);
1553
1554       /* queue EOS and make sure the task or pull function performs the EOS
1555        * actions.
1556        *
1557        * We have two possibilities:
1558        *
1559        *  - Before we are to enter the _create function, we check the pending_eos
1560        *    first and do EOS instead of entering it.
1561        *  - If we are in the _create function or we did not manage to set the
1562        *    flag fast enough and we are about to enter the _create function,
1563        *    we unlock it so that we exit with WRONG_STATE immediatly. We then
1564        *    check the EOS flag and do the EOS logic.
1565        */
1566       g_atomic_int_set (&src->priv->pending_eos, TRUE);
1567       GST_DEBUG_OBJECT (src, "EOS marked, calling unlock");
1568
1569       /* unlock the _create function so that we can check the pending_eos flag
1570        * and we can do EOS. This will eventually release the LIVE_LOCK again so
1571        * that we can grab it and stop the unlock again. We don't take the stream
1572        * lock so that this operation is guaranteed to never block. */
1573       if (bclass->unlock)
1574         bclass->unlock (src);
1575
1576       GST_DEBUG_OBJECT (src, "unlock called, waiting for LIVE_LOCK");
1577
1578       GST_LIVE_LOCK (src);
1579       GST_DEBUG_OBJECT (src, "LIVE_LOCK acquired, calling unlock_stop");
1580       /* now stop the unlock of the streaming thread again. Grabbing the live
1581        * lock is enough because that protects the create function. */
1582       if (bclass->unlock_stop)
1583         bclass->unlock_stop (src);
1584       GST_LIVE_UNLOCK (src);
1585
1586       result = TRUE;
1587       break;
1588     }
1589     case GST_EVENT_NEWSEGMENT:
1590       /* sending random NEWSEGMENT downstream can break sync. */
1591       break;
1592     case GST_EVENT_TAG:
1593     case GST_EVENT_CUSTOM_DOWNSTREAM:
1594     case GST_EVENT_CUSTOM_BOTH:
1595       /* Insert TAG, CUSTOM_DOWNSTREAM, CUSTOM_BOTH in the dataflow */
1596       GST_OBJECT_LOCK (src);
1597       src->priv->pending_events =
1598           g_list_append (src->priv->pending_events, event);
1599       g_atomic_int_set (&src->priv->have_events, TRUE);
1600       GST_OBJECT_UNLOCK (src);
1601       event = NULL;
1602       result = TRUE;
1603       break;
1604     case GST_EVENT_BUFFERSIZE:
1605       /* does not seem to make much sense currently */
1606       break;
1607
1608       /* upstream events */
1609     case GST_EVENT_QOS:
1610       /* elements should override send_event and do something */
1611       break;
1612     case GST_EVENT_SEEK:
1613     {
1614       gboolean started;
1615
1616       GST_OBJECT_LOCK (src->srcpad);
1617       if (GST_PAD_ACTIVATE_MODE (src->srcpad) == GST_ACTIVATE_PULL)
1618         goto wrong_mode;
1619       started = GST_PAD_ACTIVATE_MODE (src->srcpad) == GST_ACTIVATE_PUSH;
1620       GST_OBJECT_UNLOCK (src->srcpad);
1621
1622       if (started) {
1623         GST_DEBUG_OBJECT (src, "performing seek");
1624         /* when we are running in push mode, we can execute the
1625          * seek right now, we need to unlock. */
1626         result = gst_base_src_perform_seek (src, event, TRUE);
1627       } else {
1628         GstEvent **event_p;
1629
1630         /* else we store the event and execute the seek when we
1631          * get activated */
1632         GST_OBJECT_LOCK (src);
1633         GST_DEBUG_OBJECT (src, "queueing seek");
1634         event_p = &src->data.ABI.pending_seek;
1635         gst_event_replace ((GstEvent **) event_p, event);
1636         GST_OBJECT_UNLOCK (src);
1637         /* assume the seek will work */
1638         result = TRUE;
1639       }
1640       break;
1641     }
1642     case GST_EVENT_NAVIGATION:
1643       /* could make sense for elements that do something with navigation events
1644        * but then they would need to override the send_event function */
1645       break;
1646     case GST_EVENT_LATENCY:
1647       /* does not seem to make sense currently */
1648       break;
1649
1650       /* custom events */
1651     case GST_EVENT_CUSTOM_UPSTREAM:
1652       /* override send_event if you want this */
1653       break;
1654     case GST_EVENT_CUSTOM_DOWNSTREAM_OOB:
1655     case GST_EVENT_CUSTOM_BOTH_OOB:
1656       /* insert a random custom event into the pipeline */
1657       GST_DEBUG_OBJECT (src, "pushing custom OOB event downstream");
1658       result = gst_pad_push_event (src->srcpad, event);
1659       /* we gave away the ref to the event in the push */
1660       event = NULL;
1661       break;
1662     default:
1663       break;
1664   }
1665 done:
1666   /* if we still have a ref to the event, unref it now */
1667   if (event)
1668     gst_event_unref (event);
1669
1670   return result;
1671
1672   /* ERRORS */
1673 wrong_mode:
1674   {
1675     GST_DEBUG_OBJECT (src, "cannot perform seek when operating in pull mode");
1676     GST_OBJECT_UNLOCK (src->srcpad);
1677     result = FALSE;
1678     goto done;
1679   }
1680 }
1681
1682 static gboolean
1683 gst_base_src_seekable (GstBaseSrc * src)
1684 {
1685   GstBaseSrcClass *bclass;
1686   bclass = GST_BASE_SRC_GET_CLASS (src);
1687   if (bclass->is_seekable)
1688     return bclass->is_seekable (src);
1689   else
1690     return FALSE;
1691 }
1692
1693 static void
1694 gst_base_src_update_qos (GstBaseSrc * src,
1695     gdouble proportion, GstClockTimeDiff diff, GstClockTime timestamp)
1696 {
1697   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, src,
1698       "qos: proportion: %lf, diff %" G_GINT64_FORMAT ", timestamp %"
1699       GST_TIME_FORMAT, proportion, diff, GST_TIME_ARGS (timestamp));
1700
1701   GST_OBJECT_LOCK (src);
1702   src->priv->proportion = proportion;
1703   src->priv->earliest_time = timestamp + diff;
1704   GST_OBJECT_UNLOCK (src);
1705 }
1706
1707
1708 static gboolean
1709 gst_base_src_default_event (GstBaseSrc * src, GstEvent * event)
1710 {
1711   gboolean result;
1712
1713   GST_DEBUG_OBJECT (src, "handle event %" GST_PTR_FORMAT, event);
1714
1715   switch (GST_EVENT_TYPE (event)) {
1716     case GST_EVENT_SEEK:
1717       /* is normally called when in push mode */
1718       if (!gst_base_src_seekable (src))
1719         goto not_seekable;
1720
1721       result = gst_base_src_perform_seek (src, event, TRUE);
1722       break;
1723     case GST_EVENT_FLUSH_START:
1724       /* cancel any blocking getrange, is normally called
1725        * when in pull mode. */
1726       result = gst_base_src_set_flushing (src, TRUE, FALSE, TRUE, NULL);
1727       break;
1728     case GST_EVENT_FLUSH_STOP:
1729       result = gst_base_src_set_flushing (src, FALSE, TRUE, TRUE, NULL);
1730       break;
1731     case GST_EVENT_QOS:
1732     {
1733       gdouble proportion;
1734       GstClockTimeDiff diff;
1735       GstClockTime timestamp;
1736
1737       gst_event_parse_qos (event, &proportion, &diff, &timestamp);
1738       gst_base_src_update_qos (src, proportion, diff, timestamp);
1739       result = TRUE;
1740       break;
1741     }
1742     default:
1743       result = FALSE;
1744       break;
1745   }
1746   return result;
1747
1748   /* ERRORS */
1749 not_seekable:
1750   {
1751     GST_DEBUG_OBJECT (src, "is not seekable");
1752     return FALSE;
1753   }
1754 }
1755
1756 static gboolean
1757 gst_base_src_event_handler (GstPad * pad, GstEvent * event)
1758 {
1759   GstBaseSrc *src;
1760   GstBaseSrcClass *bclass;
1761   gboolean result = FALSE;
1762
1763   src = GST_BASE_SRC (gst_pad_get_parent (pad));
1764   if (G_UNLIKELY (src == NULL)) {
1765     gst_event_unref (event);
1766     return FALSE;
1767   }
1768
1769   bclass = GST_BASE_SRC_GET_CLASS (src);
1770
1771   if (bclass->event) {
1772     if (!(result = bclass->event (src, event)))
1773       goto subclass_failed;
1774   }
1775
1776 done:
1777   gst_event_unref (event);
1778   gst_object_unref (src);
1779
1780   return result;
1781
1782   /* ERRORS */
1783 subclass_failed:
1784   {
1785     GST_DEBUG_OBJECT (src, "subclass refused event");
1786     goto done;
1787   }
1788 }
1789
1790 static void
1791 gst_base_src_set_property (GObject * object, guint prop_id,
1792     const GValue * value, GParamSpec * pspec)
1793 {
1794   GstBaseSrc *src;
1795
1796   src = GST_BASE_SRC (object);
1797
1798   switch (prop_id) {
1799     case PROP_BLOCKSIZE:
1800       gst_base_src_set_blocksize (src, g_value_get_ulong (value));
1801       break;
1802     case PROP_NUM_BUFFERS:
1803       src->num_buffers = g_value_get_int (value);
1804       break;
1805     case PROP_TYPEFIND:
1806       src->data.ABI.typefind = g_value_get_boolean (value);
1807       break;
1808     case PROP_DO_TIMESTAMP:
1809       gst_base_src_set_do_timestamp (src, g_value_get_boolean (value));
1810       break;
1811     default:
1812       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1813       break;
1814   }
1815 }
1816
1817 static void
1818 gst_base_src_get_property (GObject * object, guint prop_id, GValue * value,
1819     GParamSpec * pspec)
1820 {
1821   GstBaseSrc *src;
1822
1823   src = GST_BASE_SRC (object);
1824
1825   switch (prop_id) {
1826     case PROP_BLOCKSIZE:
1827       g_value_set_ulong (value, gst_base_src_get_blocksize (src));
1828       break;
1829     case PROP_NUM_BUFFERS:
1830       g_value_set_int (value, src->num_buffers);
1831       break;
1832     case PROP_TYPEFIND:
1833       g_value_set_boolean (value, src->data.ABI.typefind);
1834       break;
1835     case PROP_DO_TIMESTAMP:
1836       g_value_set_boolean (value, gst_base_src_get_do_timestamp (src));
1837       break;
1838     default:
1839       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1840       break;
1841   }
1842 }
1843
1844 /* with STREAM_LOCK and LOCK */
1845 static GstClockReturn
1846 gst_base_src_wait (GstBaseSrc * basesrc, GstClock * clock, GstClockTime time)
1847 {
1848   GstClockReturn ret;
1849   GstClockID id;
1850
1851   id = gst_clock_new_single_shot_id (clock, time);
1852
1853   basesrc->clock_id = id;
1854   /* release the live lock while waiting */
1855   GST_LIVE_UNLOCK (basesrc);
1856
1857   ret = gst_clock_id_wait (id, NULL);
1858
1859   GST_LIVE_LOCK (basesrc);
1860   gst_clock_id_unref (id);
1861   basesrc->clock_id = NULL;
1862
1863   return ret;
1864 }
1865
1866 /* perform synchronisation on a buffer.
1867  * with STREAM_LOCK.
1868  */
1869 static GstClockReturn
1870 gst_base_src_do_sync (GstBaseSrc * basesrc, GstBuffer * buffer)
1871 {
1872   GstClockReturn result;
1873   GstClockTime start, end;
1874   GstBaseSrcClass *bclass;
1875   GstClockTime base_time;
1876   GstClock *clock;
1877   GstClockTime now = GST_CLOCK_TIME_NONE, timestamp;
1878   gboolean do_timestamp, first, pseudo_live;
1879
1880   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
1881
1882   start = end = -1;
1883   if (bclass->get_times)
1884     bclass->get_times (basesrc, buffer, &start, &end);
1885
1886   /* get buffer timestamp */
1887   timestamp = GST_BUFFER_TIMESTAMP (buffer);
1888
1889   /* grab the lock to prepare for clocking and calculate the startup
1890    * latency. */
1891   GST_OBJECT_LOCK (basesrc);
1892
1893   /* if we are asked to sync against the clock we are a pseudo live element */
1894   pseudo_live = (start != -1 && basesrc->is_live);
1895   /* check for the first buffer */
1896   first = (basesrc->priv->latency == -1);
1897
1898   if (timestamp != -1 && pseudo_live) {
1899     GstClockTime latency;
1900
1901     /* we have a timestamp and a sync time, latency is the diff */
1902     if (timestamp <= start)
1903       latency = start - timestamp;
1904     else
1905       latency = 0;
1906
1907     if (first) {
1908       GST_DEBUG_OBJECT (basesrc, "pseudo_live with latency %" GST_TIME_FORMAT,
1909           GST_TIME_ARGS (latency));
1910       /* first time we calculate latency, just configure */
1911       basesrc->priv->latency = latency;
1912     } else {
1913       if (basesrc->priv->latency != latency) {
1914         /* we have a new latency, FIXME post latency message */
1915         basesrc->priv->latency = latency;
1916         GST_DEBUG_OBJECT (basesrc, "latency changed to %" GST_TIME_FORMAT,
1917             GST_TIME_ARGS (latency));
1918       }
1919     }
1920   } else if (first) {
1921     GST_DEBUG_OBJECT (basesrc, "no latency needed, live %d, sync %d",
1922         basesrc->is_live, start != -1);
1923     basesrc->priv->latency = 0;
1924   }
1925
1926   /* get clock, if no clock, we can't sync or do timestamps */
1927   if ((clock = GST_ELEMENT_CLOCK (basesrc)) == NULL)
1928     goto no_clock;
1929
1930   base_time = GST_ELEMENT_CAST (basesrc)->base_time;
1931
1932   do_timestamp = basesrc->priv->do_timestamp;
1933
1934   /* first buffer, calculate the timestamp offset */
1935   if (first) {
1936     GstClockTime running_time;
1937
1938     now = gst_clock_get_time (clock);
1939     running_time = now - base_time;
1940
1941     GST_LOG_OBJECT (basesrc,
1942         "startup timestamp: %" GST_TIME_FORMAT ", running_time %"
1943         GST_TIME_FORMAT, GST_TIME_ARGS (timestamp),
1944         GST_TIME_ARGS (running_time));
1945
1946     if (pseudo_live && timestamp != -1) {
1947       /* live source and we need to sync, add startup latency to all timestamps
1948        * to get the real running_time. Live sources should always timestamp
1949        * according to the current running time. */
1950       basesrc->priv->ts_offset = GST_CLOCK_DIFF (timestamp, running_time);
1951
1952       GST_LOG_OBJECT (basesrc, "live with sync, ts_offset %" GST_TIME_FORMAT,
1953           GST_TIME_ARGS (basesrc->priv->ts_offset));
1954     } else {
1955       basesrc->priv->ts_offset = 0;
1956       GST_LOG_OBJECT (basesrc, "no timestamp offset needed");
1957     }
1958
1959     if (!GST_CLOCK_TIME_IS_VALID (timestamp)) {
1960       if (do_timestamp)
1961         timestamp = running_time;
1962       else
1963         timestamp = 0;
1964
1965       GST_BUFFER_TIMESTAMP (buffer) = timestamp;
1966
1967       GST_LOG_OBJECT (basesrc, "created timestamp: %" GST_TIME_FORMAT,
1968           GST_TIME_ARGS (timestamp));
1969     }
1970
1971     /* add the timestamp offset we need for sync */
1972     timestamp += basesrc->priv->ts_offset;
1973   } else {
1974     /* not the first buffer, the timestamp is the diff between the clock and
1975      * base_time */
1976     if (do_timestamp && !GST_CLOCK_TIME_IS_VALID (timestamp)) {
1977       now = gst_clock_get_time (clock);
1978
1979       GST_BUFFER_TIMESTAMP (buffer) = now - base_time;
1980
1981       GST_LOG_OBJECT (basesrc, "created timestamp: %" GST_TIME_FORMAT,
1982           GST_TIME_ARGS (now - base_time));
1983     }
1984   }
1985
1986   /* if we don't have a buffer timestamp, we don't sync */
1987   if (!GST_CLOCK_TIME_IS_VALID (start))
1988     goto no_sync;
1989
1990   if (basesrc->is_live && GST_CLOCK_TIME_IS_VALID (timestamp)) {
1991     /* for pseudo live sources, add our ts_offset to the timestamp */
1992     GST_BUFFER_TIMESTAMP (buffer) += basesrc->priv->ts_offset;
1993     start += basesrc->priv->ts_offset;
1994   }
1995
1996   GST_LOG_OBJECT (basesrc,
1997       "waiting for clock, base time %" GST_TIME_FORMAT
1998       ", stream_start %" GST_TIME_FORMAT,
1999       GST_TIME_ARGS (base_time), GST_TIME_ARGS (start));
2000   GST_OBJECT_UNLOCK (basesrc);
2001
2002   result = gst_base_src_wait (basesrc, clock, start + base_time);
2003
2004   GST_LOG_OBJECT (basesrc, "clock entry done: %d", result);
2005
2006   return result;
2007
2008   /* special cases */
2009 no_clock:
2010   {
2011     GST_DEBUG_OBJECT (basesrc, "we have no clock");
2012     GST_OBJECT_UNLOCK (basesrc);
2013     return GST_CLOCK_OK;
2014   }
2015 no_sync:
2016   {
2017     GST_DEBUG_OBJECT (basesrc, "no sync needed");
2018     GST_OBJECT_UNLOCK (basesrc);
2019     return GST_CLOCK_OK;
2020   }
2021 }
2022
2023 /* Called with STREAM_LOCK and LIVE_LOCK */
2024 static gboolean
2025 gst_base_src_update_length (GstBaseSrc * src, guint64 offset, guint * length)
2026 {
2027   guint64 size, maxsize;
2028   GstBaseSrcClass *bclass;
2029   GstFormat format;
2030   gint64 stop;
2031
2032   bclass = GST_BASE_SRC_GET_CLASS (src);
2033
2034   format = src->segment.format;
2035   stop = src->segment.stop;
2036   /* get total file size */
2037   size = (guint64) src->segment.duration;
2038
2039   /* only operate if we are working with bytes */
2040   if (format != GST_FORMAT_BYTES)
2041     return TRUE;
2042
2043   /* the max amount of bytes to read is the total size or
2044    * up to the segment.stop if present. */
2045   if (stop != -1)
2046     maxsize = MIN (size, stop);
2047   else
2048     maxsize = size;
2049
2050   GST_DEBUG_OBJECT (src,
2051       "reading offset %" G_GUINT64_FORMAT ", length %u, size %" G_GINT64_FORMAT
2052       ", segment.stop %" G_GINT64_FORMAT ", maxsize %" G_GINT64_FORMAT, offset,
2053       *length, size, stop, maxsize);
2054
2055   /* check size if we have one */
2056   if (maxsize != -1) {
2057     /* if we run past the end, check if the file became bigger and
2058      * retry. */
2059     if (G_UNLIKELY (offset + *length >= maxsize)) {
2060       /* see if length of the file changed */
2061       if (bclass->get_size)
2062         if (!bclass->get_size (src, &size))
2063           size = -1;
2064
2065       /* make sure we don't exceed the configured segment stop
2066        * if it was set */
2067       if (stop != -1)
2068         maxsize = MIN (size, stop);
2069       else
2070         maxsize = size;
2071
2072       /* if we are at or past the end, EOS */
2073       if (G_UNLIKELY (offset >= maxsize))
2074         goto unexpected_length;
2075
2076       /* else we can clip to the end */
2077       if (G_UNLIKELY (offset + *length >= maxsize))
2078         *length = maxsize - offset;
2079
2080     }
2081   }
2082
2083   /* keep track of current position and update duration.
2084    * segment is in bytes, we checked that above. */
2085   GST_OBJECT_LOCK (src);
2086   gst_segment_set_duration (&src->segment, GST_FORMAT_BYTES, size);
2087   gst_segment_set_last_stop (&src->segment, GST_FORMAT_BYTES, offset);
2088   GST_OBJECT_UNLOCK (src);
2089
2090   return TRUE;
2091
2092   /* ERRORS */
2093 unexpected_length:
2094   {
2095     return FALSE;
2096   }
2097 }
2098
2099 /* must be called with LIVE_LOCK */
2100 static GstFlowReturn
2101 gst_base_src_get_range (GstBaseSrc * src, guint64 offset, guint length,
2102     GstBuffer ** buf)
2103 {
2104   GstFlowReturn ret;
2105   GstBaseSrcClass *bclass;
2106   GstClockReturn status;
2107
2108   bclass = GST_BASE_SRC_GET_CLASS (src);
2109
2110 again:
2111   if (src->is_live) {
2112     if (G_UNLIKELY (!src->live_running)) {
2113       ret = gst_base_src_wait_playing (src);
2114       if (ret != GST_FLOW_OK)
2115         goto stopped;
2116     }
2117   }
2118
2119   if (G_UNLIKELY (!GST_OBJECT_FLAG_IS_SET (src, GST_BASE_SRC_STARTED)))
2120     goto not_started;
2121
2122   if (G_UNLIKELY (!bclass->create))
2123     goto no_function;
2124
2125   if (G_UNLIKELY (!gst_base_src_update_length (src, offset, &length)))
2126     goto unexpected_length;
2127
2128   /* normally we don't count buffers */
2129   if (G_UNLIKELY (src->num_buffers_left >= 0)) {
2130     if (src->num_buffers_left == 0)
2131       goto reached_num_buffers;
2132     else
2133       src->num_buffers_left--;
2134   }
2135
2136   /* don't enter the create function if a pending EOS event was set. For the
2137    * logic of the pending_eos, check the event function of this class. */
2138   if (G_UNLIKELY (g_atomic_int_get (&src->priv->pending_eos)))
2139     goto eos;
2140
2141   GST_DEBUG_OBJECT (src,
2142       "calling create offset %" G_GUINT64_FORMAT " length %u, time %"
2143       G_GINT64_FORMAT, offset, length, src->segment.time);
2144
2145   ret = bclass->create (src, offset, length, buf);
2146
2147   /* The create function could be unlocked because we have a pending EOS. It's
2148    * possible that we have a valid buffer from create that we need to
2149    * discard when the create function returned _OK. */
2150   if (G_UNLIKELY (g_atomic_int_get (&src->priv->pending_eos))) {
2151     if (ret == GST_FLOW_OK) {
2152       gst_buffer_unref (*buf);
2153       *buf = NULL;
2154     }
2155     goto eos;
2156   }
2157
2158   if (G_UNLIKELY (ret != GST_FLOW_OK))
2159     goto not_ok;
2160
2161   /* no timestamp set and we are at offset 0, we can timestamp with 0 */
2162   if (offset == 0 && src->segment.time == 0
2163       && GST_BUFFER_TIMESTAMP (*buf) == -1 && !src->is_live) {
2164     *buf = gst_buffer_make_metadata_writable (*buf);
2165     GST_BUFFER_TIMESTAMP (*buf) = 0;
2166   }
2167
2168   /* set pad caps on the buffer if the buffer had no caps */
2169   if (GST_BUFFER_CAPS (*buf) == NULL) {
2170     *buf = gst_buffer_make_metadata_writable (*buf);
2171     gst_buffer_set_caps (*buf, GST_PAD_CAPS (src->srcpad));
2172   }
2173
2174   /* now sync before pushing the buffer */
2175   status = gst_base_src_do_sync (src, *buf);
2176
2177   /* waiting for the clock could have made us flushing */
2178   if (G_UNLIKELY (src->priv->flushing))
2179     goto flushing;
2180
2181   switch (status) {
2182     case GST_CLOCK_EARLY:
2183       /* the buffer is too late. We currently don't drop the buffer. */
2184       GST_DEBUG_OBJECT (src, "buffer too late!, returning anyway");
2185       break;
2186     case GST_CLOCK_OK:
2187       /* buffer synchronised properly */
2188       GST_DEBUG_OBJECT (src, "buffer ok");
2189       break;
2190     case GST_CLOCK_UNSCHEDULED:
2191       /* this case is triggered when we were waiting for the clock and
2192        * it got unlocked because we did a state change. In any case, get rid of
2193        * the buffer. */
2194       gst_buffer_unref (*buf);
2195       *buf = NULL;
2196       if (!src->live_running) {
2197         /* We return WRONG_STATE when we are not running to stop the dataflow also
2198          * get rid of the produced buffer. */
2199         GST_DEBUG_OBJECT (src,
2200             "clock was unscheduled (%d), returning WRONG_STATE", status);
2201         ret = GST_FLOW_WRONG_STATE;
2202       } else {
2203         /* If we are running when this happens, we quickly switched between
2204          * pause and playing. We try to produce a new buffer */
2205         GST_DEBUG_OBJECT (src,
2206             "clock was unscheduled (%d), but we are running", status);
2207         goto again;
2208       }
2209       break;
2210     default:
2211       /* all other result values are unexpected and errors */
2212       GST_ELEMENT_ERROR (src, CORE, CLOCK,
2213           (_("Internal clock error.")),
2214           ("clock returned unexpected return value %d", status));
2215       gst_buffer_unref (*buf);
2216       *buf = NULL;
2217       ret = GST_FLOW_ERROR;
2218       break;
2219   }
2220   return ret;
2221
2222   /* ERROR */
2223 stopped:
2224   {
2225     GST_DEBUG_OBJECT (src, "wait_playing returned %d (%s)", ret,
2226         gst_flow_get_name (ret));
2227     return ret;
2228   }
2229 not_ok:
2230   {
2231     GST_DEBUG_OBJECT (src, "create returned %d (%s)", ret,
2232         gst_flow_get_name (ret));
2233     return ret;
2234   }
2235 not_started:
2236   {
2237     GST_DEBUG_OBJECT (src, "getrange but not started");
2238     return GST_FLOW_WRONG_STATE;
2239   }
2240 no_function:
2241   {
2242     GST_DEBUG_OBJECT (src, "no create function");
2243     return GST_FLOW_ERROR;
2244   }
2245 unexpected_length:
2246   {
2247     GST_DEBUG_OBJECT (src, "unexpected length %u (offset=%" G_GUINT64_FORMAT
2248         ", size=%" G_GINT64_FORMAT ")", length, offset, src->segment.duration);
2249     return GST_FLOW_UNEXPECTED;
2250   }
2251 reached_num_buffers:
2252   {
2253     GST_DEBUG_OBJECT (src, "sent all buffers");
2254     return GST_FLOW_UNEXPECTED;
2255   }
2256 flushing:
2257   {
2258     GST_DEBUG_OBJECT (src, "we are flushing");
2259     gst_buffer_unref (*buf);
2260     *buf = NULL;
2261     return GST_FLOW_WRONG_STATE;
2262   }
2263 eos:
2264   {
2265     GST_DEBUG_OBJECT (src, "we are EOS");
2266     return GST_FLOW_UNEXPECTED;
2267   }
2268 }
2269
2270 static GstFlowReturn
2271 gst_base_src_pad_get_range (GstPad * pad, guint64 offset, guint length,
2272     GstBuffer ** buf)
2273 {
2274   GstBaseSrc *src;
2275   GstFlowReturn res;
2276
2277   src = GST_BASE_SRC_CAST (gst_object_ref (GST_OBJECT_PARENT (pad)));
2278
2279   GST_LIVE_LOCK (src);
2280   if (G_UNLIKELY (src->priv->flushing))
2281     goto flushing;
2282
2283   res = gst_base_src_get_range (src, offset, length, buf);
2284
2285 done:
2286   GST_LIVE_UNLOCK (src);
2287
2288   gst_object_unref (src);
2289
2290   return res;
2291
2292   /* ERRORS */
2293 flushing:
2294   {
2295     GST_DEBUG_OBJECT (src, "we are flushing");
2296     res = GST_FLOW_WRONG_STATE;
2297     goto done;
2298   }
2299 }
2300
2301 static gboolean
2302 gst_base_src_default_check_get_range (GstBaseSrc * src)
2303 {
2304   gboolean res;
2305
2306   if (!GST_OBJECT_FLAG_IS_SET (src, GST_BASE_SRC_STARTED)) {
2307     GST_LOG_OBJECT (src, "doing start/stop to check get_range support");
2308     if (G_LIKELY (gst_base_src_start (src)))
2309       gst_base_src_stop (src);
2310   }
2311
2312   /* we can operate in getrange mode if the native format is bytes
2313    * and we are seekable, this condition is set in the random_access
2314    * flag and is set in the _start() method. */
2315   res = src->random_access;
2316
2317   return res;
2318 }
2319
2320 static gboolean
2321 gst_base_src_check_get_range (GstBaseSrc * src)
2322 {
2323   GstBaseSrcClass *bclass;
2324   gboolean res;
2325
2326   bclass = GST_BASE_SRC_GET_CLASS (src);
2327
2328   if (bclass->check_get_range == NULL)
2329     goto no_function;
2330
2331   res = bclass->check_get_range (src);
2332   GST_LOG_OBJECT (src, "%s() returned %d",
2333       GST_DEBUG_FUNCPTR_NAME (bclass->check_get_range), (gint) res);
2334
2335   return res;
2336
2337   /* ERRORS */
2338 no_function:
2339   {
2340     GST_WARNING_OBJECT (src, "no check_get_range function set");
2341     return FALSE;
2342   }
2343 }
2344
2345 static gboolean
2346 gst_base_src_pad_check_get_range (GstPad * pad)
2347 {
2348   GstBaseSrc *src;
2349   gboolean res;
2350
2351   src = GST_BASE_SRC (GST_OBJECT_PARENT (pad));
2352
2353   res = gst_base_src_check_get_range (src);
2354
2355   return res;
2356 }
2357
2358 static void
2359 gst_base_src_loop (GstPad * pad)
2360 {
2361   GstBaseSrc *src;
2362   GstBuffer *buf = NULL;
2363   GstFlowReturn ret;
2364   gint64 position;
2365   gboolean eos;
2366   gulong blocksize;
2367   GList *pending_events = NULL, *tmp;
2368
2369   eos = FALSE;
2370
2371   src = GST_BASE_SRC (GST_OBJECT_PARENT (pad));
2372
2373   GST_LIVE_LOCK (src);
2374
2375   if (G_UNLIKELY (src->priv->flushing))
2376     goto flushing;
2377
2378   src->priv->last_sent_eos = FALSE;
2379
2380   blocksize = src->blocksize;
2381
2382   /* if we operate in bytes, we can calculate an offset */
2383   if (src->segment.format == GST_FORMAT_BYTES) {
2384     position = src->segment.last_stop;
2385     /* for negative rates, start with subtracting the blocksize */
2386     if (src->segment.rate < 0.0) {
2387       /* we cannot go below segment.start */
2388       if (position > src->segment.start + blocksize)
2389         position -= blocksize;
2390       else {
2391         /* last block, remainder up to segment.start */
2392         blocksize = position - src->segment.start;
2393         position = src->segment.start;
2394       }
2395     }
2396   } else
2397     position = -1;
2398
2399   GST_LOG_OBJECT (src, "next_ts %" GST_TIME_FORMAT " size %lu",
2400       GST_TIME_ARGS (position), blocksize);
2401
2402   ret = gst_base_src_get_range (src, position, blocksize, &buf);
2403   if (G_UNLIKELY (ret != GST_FLOW_OK)) {
2404     GST_INFO_OBJECT (src, "pausing after gst_base_src_get_range() = %s",
2405         gst_flow_get_name (ret));
2406     GST_LIVE_UNLOCK (src);
2407     goto pause;
2408   }
2409   /* this should not happen */
2410   if (G_UNLIKELY (buf == NULL))
2411     goto null_buffer;
2412
2413   /* push events to close/start our segment before we push the buffer. */
2414   if (G_UNLIKELY (src->priv->close_segment)) {
2415     gst_pad_push_event (pad, src->priv->close_segment);
2416     src->priv->close_segment = NULL;
2417   }
2418   if (G_UNLIKELY (src->priv->start_segment)) {
2419     gst_pad_push_event (pad, src->priv->start_segment);
2420     src->priv->start_segment = NULL;
2421   }
2422   src->priv->newsegment_pending = FALSE;
2423
2424   if (g_atomic_int_get (&src->priv->have_events)) {
2425     GST_OBJECT_LOCK (src);
2426     /* take the events */
2427     pending_events = src->priv->pending_events;
2428     src->priv->pending_events = NULL;
2429     g_atomic_int_set (&src->priv->have_events, FALSE);
2430     GST_OBJECT_UNLOCK (src);
2431   }
2432
2433   /* Push out pending events if any */
2434   if (G_UNLIKELY (pending_events != NULL)) {
2435     for (tmp = pending_events; tmp; tmp = g_list_next (tmp)) {
2436       GstEvent *ev = (GstEvent *) tmp->data;
2437       gst_pad_push_event (pad, ev);
2438     }
2439     g_list_free (pending_events);
2440   }
2441
2442   /* figure out the new position */
2443   switch (src->segment.format) {
2444     case GST_FORMAT_BYTES:
2445     {
2446       guint bufsize = GST_BUFFER_SIZE (buf);
2447
2448       /* we subtracted above for negative rates */
2449       if (src->segment.rate >= 0.0)
2450         position += bufsize;
2451       break;
2452     }
2453     case GST_FORMAT_TIME:
2454     {
2455       GstClockTime start, duration;
2456
2457       start = GST_BUFFER_TIMESTAMP (buf);
2458       duration = GST_BUFFER_DURATION (buf);
2459
2460       if (GST_CLOCK_TIME_IS_VALID (start))
2461         position = start;
2462       else
2463         position = src->segment.last_stop;
2464
2465       if (GST_CLOCK_TIME_IS_VALID (duration)) {
2466         if (src->segment.rate >= 0.0)
2467           position += duration;
2468         else if (position > duration)
2469           position -= duration;
2470         else
2471           position = 0;
2472       }
2473       break;
2474     }
2475     case GST_FORMAT_DEFAULT:
2476       if (src->segment.rate >= 0.0)
2477         position = GST_BUFFER_OFFSET_END (buf);
2478       else
2479         position = GST_BUFFER_OFFSET (buf);
2480       break;
2481     default:
2482       position = -1;
2483       break;
2484   }
2485   if (position != -1) {
2486     if (src->segment.rate >= 0.0) {
2487       /* positive rate, check if we reached the stop */
2488       if (src->segment.stop != -1) {
2489         if (position >= src->segment.stop) {
2490           eos = TRUE;
2491           position = src->segment.stop;
2492         }
2493       }
2494     } else {
2495       /* negative rate, check if we reached the start. start is always set to
2496        * something different from -1 */
2497       if (position <= src->segment.start) {
2498         eos = TRUE;
2499         position = src->segment.start;
2500       }
2501       /* when going reverse, all buffers are DISCONT */
2502       src->priv->discont = TRUE;
2503     }
2504     GST_OBJECT_LOCK (src);
2505     gst_segment_set_last_stop (&src->segment, src->segment.format, position);
2506     GST_OBJECT_UNLOCK (src);
2507   }
2508
2509   if (G_UNLIKELY (src->priv->discont)) {
2510     buf = gst_buffer_make_metadata_writable (buf);
2511     GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_DISCONT);
2512     src->priv->discont = FALSE;
2513   }
2514   GST_LIVE_UNLOCK (src);
2515
2516   ret = gst_pad_push (pad, buf);
2517   if (G_UNLIKELY (ret != GST_FLOW_OK)) {
2518     GST_INFO_OBJECT (src, "pausing after gst_pad_push() = %s",
2519         gst_flow_get_name (ret));
2520     goto pause;
2521   }
2522
2523   if (G_UNLIKELY (eos)) {
2524     GST_INFO_OBJECT (src, "pausing after end of segment");
2525     ret = GST_FLOW_UNEXPECTED;
2526     goto pause;
2527   }
2528
2529 done:
2530   return;
2531
2532   /* special cases */
2533 flushing:
2534   {
2535     GST_DEBUG_OBJECT (src, "we are flushing");
2536     GST_LIVE_UNLOCK (src);
2537     ret = GST_FLOW_WRONG_STATE;
2538     goto pause;
2539   }
2540 pause:
2541   {
2542     const gchar *reason = gst_flow_get_name (ret);
2543     GstEvent *event;
2544
2545     GST_DEBUG_OBJECT (src, "pausing task, reason %s", reason);
2546     src->data.ABI.running = FALSE;
2547     gst_pad_pause_task (pad);
2548     if (ret == GST_FLOW_UNEXPECTED) {
2549       gboolean flag_segment;
2550       GstFormat format;
2551       gint64 last_stop;
2552
2553       /* perform EOS logic */
2554       flag_segment = (src->segment.flags & GST_SEEK_FLAG_SEGMENT) != 0;
2555       format = src->segment.format;
2556       last_stop = src->segment.last_stop;
2557
2558       if (flag_segment) {
2559         GstMessage *message;
2560
2561         message = gst_message_new_segment_done (GST_OBJECT_CAST (src),
2562             format, last_stop);
2563         gst_message_set_seqnum (message, src->priv->seqnum);
2564         gst_element_post_message (GST_ELEMENT_CAST (src), message);
2565       } else {
2566         event = gst_event_new_eos ();
2567         gst_event_set_seqnum (event, src->priv->seqnum);
2568         gst_pad_push_event (pad, event);
2569         src->priv->last_sent_eos = TRUE;
2570       }
2571     } else if (ret == GST_FLOW_NOT_LINKED || ret <= GST_FLOW_UNEXPECTED) {
2572       event = gst_event_new_eos ();
2573       gst_event_set_seqnum (event, src->priv->seqnum);
2574       /* for fatal errors we post an error message, post the error
2575        * first so the app knows about the error first.
2576        * Also don't do this for WRONG_STATE because it happens
2577        * due to flushing and posting an error message because of
2578        * that is the wrong thing to do, e.g. when we're doing
2579        * a flushing seek. */
2580       GST_ELEMENT_ERROR (src, STREAM, FAILED,
2581           (_("Internal data flow error.")),
2582           ("streaming task paused, reason %s (%d)", reason, ret));
2583       gst_pad_push_event (pad, event);
2584       src->priv->last_sent_eos = TRUE;
2585     }
2586     goto done;
2587   }
2588 null_buffer:
2589   {
2590     GST_ELEMENT_ERROR (src, STREAM, FAILED,
2591         (_("Internal data flow error.")), ("element returned NULL buffer"));
2592     GST_LIVE_UNLOCK (src);
2593     goto done;
2594   }
2595 }
2596
2597 /* default negotiation code.
2598  *
2599  * Take intersection between src and sink pads, take first
2600  * caps and fixate.
2601  */
2602 static gboolean
2603 gst_base_src_default_negotiate (GstBaseSrc * basesrc)
2604 {
2605   GstCaps *thiscaps;
2606   GstCaps *caps = NULL;
2607   GstCaps *peercaps = NULL;
2608   gboolean result = FALSE;
2609
2610   /* first see what is possible on our source pad */
2611   thiscaps = gst_pad_get_caps_reffed (GST_BASE_SRC_PAD (basesrc));
2612   GST_DEBUG_OBJECT (basesrc, "caps of src: %" GST_PTR_FORMAT, thiscaps);
2613   /* nothing or anything is allowed, we're done */
2614   if (thiscaps == NULL || gst_caps_is_any (thiscaps))
2615     goto no_nego_needed;
2616
2617   if (G_UNLIKELY (gst_caps_is_empty (thiscaps)))
2618     goto no_caps;
2619
2620   /* get the peer caps */
2621   peercaps = gst_pad_peer_get_caps_reffed (GST_BASE_SRC_PAD (basesrc));
2622   GST_DEBUG_OBJECT (basesrc, "caps of peer: %" GST_PTR_FORMAT, peercaps);
2623   if (peercaps) {
2624     /* get intersection */
2625     caps =
2626         gst_caps_intersect_full (peercaps, thiscaps, GST_CAPS_INTERSECT_FIRST);
2627     GST_DEBUG_OBJECT (basesrc, "intersect: %" GST_PTR_FORMAT, caps);
2628     gst_caps_unref (peercaps);
2629   } else {
2630     /* no peer, work with our own caps then */
2631     caps = gst_caps_copy (thiscaps);
2632   }
2633   gst_caps_unref (thiscaps);
2634   if (caps) {
2635     /* take first (and best, since they are sorted) possibility */
2636     gst_caps_truncate (caps);
2637
2638     /* now fixate */
2639     if (!gst_caps_is_empty (caps)) {
2640       gst_pad_fixate_caps (GST_BASE_SRC_PAD (basesrc), caps);
2641       GST_DEBUG_OBJECT (basesrc, "fixated to: %" GST_PTR_FORMAT, caps);
2642
2643       if (gst_caps_is_any (caps)) {
2644         /* hmm, still anything, so element can do anything and
2645          * nego is not needed */
2646         result = TRUE;
2647       } else if (gst_caps_is_fixed (caps)) {
2648         /* yay, fixed caps, use those then, it's possible that the subclass does
2649          * not accept this caps after all and we have to fail. */
2650         result = gst_pad_set_caps (GST_BASE_SRC_PAD (basesrc), caps);
2651       }
2652     }
2653     gst_caps_unref (caps);
2654   } else {
2655     GST_DEBUG_OBJECT (basesrc, "no common caps");
2656   }
2657   return result;
2658
2659 no_nego_needed:
2660   {
2661     GST_DEBUG_OBJECT (basesrc, "no negotiation needed");
2662     if (thiscaps)
2663       gst_caps_unref (thiscaps);
2664     return TRUE;
2665   }
2666 no_caps:
2667   {
2668     GST_ELEMENT_ERROR (basesrc, STREAM, FORMAT,
2669         ("No supported formats found"),
2670         ("This element did not produce valid caps"));
2671     if (thiscaps)
2672       gst_caps_unref (thiscaps);
2673     return TRUE;
2674   }
2675 }
2676
2677 static gboolean
2678 gst_base_src_negotiate (GstBaseSrc * basesrc)
2679 {
2680   GstBaseSrcClass *bclass;
2681   gboolean result = TRUE;
2682
2683   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2684
2685   if (bclass->negotiate)
2686     result = bclass->negotiate (basesrc);
2687
2688   return result;
2689 }
2690
2691 static gboolean
2692 gst_base_src_start (GstBaseSrc * basesrc)
2693 {
2694   GstBaseSrcClass *bclass;
2695   gboolean result;
2696   guint64 size;
2697   gboolean seekable;
2698   GstFormat format;
2699
2700   if (GST_OBJECT_FLAG_IS_SET (basesrc, GST_BASE_SRC_STARTED))
2701     return TRUE;
2702
2703   GST_DEBUG_OBJECT (basesrc, "starting source");
2704
2705   basesrc->num_buffers_left = basesrc->num_buffers;
2706
2707   GST_OBJECT_LOCK (basesrc);
2708   gst_segment_init (&basesrc->segment, basesrc->segment.format);
2709   GST_OBJECT_UNLOCK (basesrc);
2710
2711   basesrc->data.ABI.running = FALSE;
2712   basesrc->priv->newsegment_pending = FALSE;
2713
2714   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2715   if (bclass->start)
2716     result = bclass->start (basesrc);
2717   else
2718     result = TRUE;
2719
2720   if (!result)
2721     goto could_not_start;
2722
2723   GST_OBJECT_FLAG_SET (basesrc, GST_BASE_SRC_STARTED);
2724
2725   format = basesrc->segment.format;
2726
2727   /* figure out the size */
2728   if (format == GST_FORMAT_BYTES) {
2729     if (bclass->get_size) {
2730       if (!(result = bclass->get_size (basesrc, &size)))
2731         size = -1;
2732     } else {
2733       result = FALSE;
2734       size = -1;
2735     }
2736     GST_DEBUG_OBJECT (basesrc, "setting size %" G_GUINT64_FORMAT, size);
2737     /* only update the size when operating in bytes, subclass is supposed
2738      * to set duration in the start method for other formats */
2739     GST_OBJECT_LOCK (basesrc);
2740     gst_segment_set_duration (&basesrc->segment, GST_FORMAT_BYTES, size);
2741     GST_OBJECT_UNLOCK (basesrc);
2742   } else {
2743     size = -1;
2744   }
2745
2746   GST_DEBUG_OBJECT (basesrc,
2747       "format: %s, have size: %d, size: %" G_GUINT64_FORMAT ", duration: %"
2748       G_GINT64_FORMAT, gst_format_get_name (format), result, size,
2749       basesrc->segment.duration);
2750
2751   seekable = gst_base_src_seekable (basesrc);
2752   GST_DEBUG_OBJECT (basesrc, "is seekable: %d", seekable);
2753
2754   /* update for random access flag */
2755   basesrc->random_access = seekable && format == GST_FORMAT_BYTES;
2756
2757   GST_DEBUG_OBJECT (basesrc, "is random_access: %d", basesrc->random_access);
2758
2759   /* run typefind if we are random_access and the typefinding is enabled. */
2760   if (basesrc->random_access && basesrc->data.ABI.typefind && size != -1) {
2761     GstCaps *caps;
2762
2763     if (!(caps = gst_type_find_helper (basesrc->srcpad, size)))
2764       goto typefind_failed;
2765
2766     result = gst_pad_set_caps (basesrc->srcpad, caps);
2767     gst_caps_unref (caps);
2768   } else {
2769     /* use class or default negotiate function */
2770     if (!(result = gst_base_src_negotiate (basesrc)))
2771       goto could_not_negotiate;
2772   }
2773
2774   return result;
2775
2776   /* ERROR */
2777 could_not_start:
2778   {
2779     GST_DEBUG_OBJECT (basesrc, "could not start");
2780     /* subclass is supposed to post a message. We don't have to call _stop. */
2781     return FALSE;
2782   }
2783 could_not_negotiate:
2784   {
2785     GST_DEBUG_OBJECT (basesrc, "could not negotiate, stopping");
2786     GST_ELEMENT_ERROR (basesrc, STREAM, FORMAT,
2787         ("Could not negotiate format"), ("Check your filtered caps, if any"));
2788     /* we must call stop */
2789     gst_base_src_stop (basesrc);
2790     return FALSE;
2791   }
2792 typefind_failed:
2793   {
2794     GST_DEBUG_OBJECT (basesrc, "could not typefind, stopping");
2795     GST_ELEMENT_ERROR (basesrc, STREAM, TYPE_NOT_FOUND, (NULL), (NULL));
2796     /* we must call stop */
2797     gst_base_src_stop (basesrc);
2798     return FALSE;
2799   }
2800 }
2801
2802 static gboolean
2803 gst_base_src_stop (GstBaseSrc * basesrc)
2804 {
2805   GstBaseSrcClass *bclass;
2806   gboolean result = TRUE;
2807
2808   if (!GST_OBJECT_FLAG_IS_SET (basesrc, GST_BASE_SRC_STARTED))
2809     return TRUE;
2810
2811   GST_DEBUG_OBJECT (basesrc, "stopping source");
2812
2813   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2814   if (bclass->stop)
2815     result = bclass->stop (basesrc);
2816
2817   if (result)
2818     GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_STARTED);
2819
2820   return result;
2821 }
2822
2823 /* start or stop flushing dataprocessing
2824  */
2825 static gboolean
2826 gst_base_src_set_flushing (GstBaseSrc * basesrc,
2827     gboolean flushing, gboolean live_play, gboolean unlock, gboolean * playing)
2828 {
2829   GstBaseSrcClass *bclass;
2830
2831   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2832
2833   if (flushing && unlock) {
2834     /* unlock any subclasses, we need to do this before grabbing the
2835      * LIVE_LOCK since we hold this lock before going into ::create. We pass an
2836      * unlock to the params because of backwards compat (see seek handler)*/
2837     if (bclass->unlock)
2838       bclass->unlock (basesrc);
2839   }
2840
2841   /* the live lock is released when we are blocked, waiting for playing or
2842    * when we sync to the clock. */
2843   GST_LIVE_LOCK (basesrc);
2844   if (playing)
2845     *playing = basesrc->live_running;
2846   basesrc->priv->flushing = flushing;
2847   if (flushing) {
2848     /* if we are locked in the live lock, signal it to make it flush */
2849     basesrc->live_running = TRUE;
2850
2851     /* clear pending EOS if any */
2852     g_atomic_int_set (&basesrc->priv->pending_eos, FALSE);
2853
2854     /* step 1, now that we have the LIVE lock, clear our unlock request */
2855     if (bclass->unlock_stop)
2856       bclass->unlock_stop (basesrc);
2857
2858     /* step 2, unblock clock sync (if any) or any other blocking thing */
2859     if (basesrc->clock_id)
2860       gst_clock_id_unschedule (basesrc->clock_id);
2861   } else {
2862     /* signal the live source that it can start playing */
2863     basesrc->live_running = live_play;
2864
2865     /* When unlocking drop all delayed events */
2866     if (unlock) {
2867       GST_OBJECT_LOCK (basesrc);
2868       if (basesrc->priv->pending_events) {
2869         g_list_foreach (basesrc->priv->pending_events, (GFunc) gst_event_unref,
2870             NULL);
2871         g_list_free (basesrc->priv->pending_events);
2872         basesrc->priv->pending_events = NULL;
2873         g_atomic_int_set (&basesrc->priv->have_events, FALSE);
2874       }
2875       GST_OBJECT_UNLOCK (basesrc);
2876     }
2877   }
2878   GST_LIVE_SIGNAL (basesrc);
2879   GST_LIVE_UNLOCK (basesrc);
2880
2881   return TRUE;
2882 }
2883
2884 /* the purpose of this function is to make sure that a live source blocks in the
2885  * LIVE lock or leaves the LIVE lock and continues playing. */
2886 static gboolean
2887 gst_base_src_set_playing (GstBaseSrc * basesrc, gboolean live_play)
2888 {
2889   GstBaseSrcClass *bclass;
2890
2891   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2892
2893   /* unlock subclasses locked in ::create, we only do this when we stop playing. */
2894   if (!live_play) {
2895     GST_DEBUG_OBJECT (basesrc, "unlock");
2896     if (bclass->unlock)
2897       bclass->unlock (basesrc);
2898   }
2899
2900   /* we are now able to grab the LIVE lock, when we get it, we can be
2901    * waiting for PLAYING while blocked in the LIVE cond or we can be waiting
2902    * for the clock. */
2903   GST_LIVE_LOCK (basesrc);
2904   GST_DEBUG_OBJECT (basesrc, "unschedule clock");
2905
2906   /* unblock clock sync (if any) */
2907   if (basesrc->clock_id)
2908     gst_clock_id_unschedule (basesrc->clock_id);
2909
2910   /* configure what to do when we get to the LIVE lock. */
2911   GST_DEBUG_OBJECT (basesrc, "live running %d", live_play);
2912   basesrc->live_running = live_play;
2913
2914   if (live_play) {
2915     gboolean start;
2916
2917     /* clear our unlock request when going to PLAYING */
2918     GST_DEBUG_OBJECT (basesrc, "unlock stop");
2919     if (bclass->unlock_stop)
2920       bclass->unlock_stop (basesrc);
2921
2922     /* for live sources we restart the timestamp correction */
2923     basesrc->priv->latency = -1;
2924     /* have to restart the task in case it stopped because of the unlock when
2925      * we went to PAUSED. Only do this if we operating in push mode. */
2926     GST_OBJECT_LOCK (basesrc->srcpad);
2927     start = (GST_PAD_ACTIVATE_MODE (basesrc->srcpad) == GST_ACTIVATE_PUSH);
2928     GST_OBJECT_UNLOCK (basesrc->srcpad);
2929     if (start)
2930       gst_pad_start_task (basesrc->srcpad, (GstTaskFunction) gst_base_src_loop,
2931           basesrc->srcpad);
2932     GST_DEBUG_OBJECT (basesrc, "signal");
2933     GST_LIVE_SIGNAL (basesrc);
2934   }
2935   GST_LIVE_UNLOCK (basesrc);
2936
2937   return TRUE;
2938 }
2939
2940 static gboolean
2941 gst_base_src_activate_push (GstPad * pad, gboolean active)
2942 {
2943   GstBaseSrc *basesrc;
2944   GstEvent *event;
2945
2946   basesrc = GST_BASE_SRC (GST_OBJECT_PARENT (pad));
2947
2948   /* prepare subclass first */
2949   if (active) {
2950     GST_DEBUG_OBJECT (basesrc, "Activating in push mode");
2951
2952     if (G_UNLIKELY (!basesrc->can_activate_push))
2953       goto no_push_activation;
2954
2955     if (G_UNLIKELY (!gst_base_src_start (basesrc)))
2956       goto error_start;
2957
2958     basesrc->priv->last_sent_eos = FALSE;
2959     basesrc->priv->discont = TRUE;
2960     gst_base_src_set_flushing (basesrc, FALSE, FALSE, FALSE, NULL);
2961
2962     /* do initial seek, which will start the task */
2963     GST_OBJECT_LOCK (basesrc);
2964     event = basesrc->data.ABI.pending_seek;
2965     basesrc->data.ABI.pending_seek = NULL;
2966     GST_OBJECT_UNLOCK (basesrc);
2967
2968     /* no need to unlock anything, the task is certainly
2969      * not running here. The perform seek code will start the task when
2970      * finished. */
2971     if (G_UNLIKELY (!gst_base_src_perform_seek (basesrc, event, FALSE)))
2972       goto seek_failed;
2973
2974     if (event)
2975       gst_event_unref (event);
2976   } else {
2977     GST_DEBUG_OBJECT (basesrc, "Deactivating in push mode");
2978     /* flush all */
2979     gst_base_src_set_flushing (basesrc, TRUE, FALSE, TRUE, NULL);
2980     /* stop the task */
2981     gst_pad_stop_task (pad);
2982     /* now we can stop the source */
2983     if (G_UNLIKELY (!gst_base_src_stop (basesrc)))
2984       goto error_stop;
2985   }
2986   return TRUE;
2987
2988   /* ERRORS */
2989 no_push_activation:
2990   {
2991     GST_WARNING_OBJECT (basesrc, "Subclass disabled push-mode activation");
2992     return FALSE;
2993   }
2994 error_start:
2995   {
2996     GST_WARNING_OBJECT (basesrc, "Failed to start in push mode");
2997     return FALSE;
2998   }
2999 seek_failed:
3000   {
3001     GST_ERROR_OBJECT (basesrc, "Failed to perform initial seek");
3002     /* flush all */
3003     gst_base_src_set_flushing (basesrc, TRUE, FALSE, TRUE, NULL);
3004     /* stop the task */
3005     gst_pad_stop_task (pad);
3006     /* Stop the basesrc */
3007     gst_base_src_stop (basesrc);
3008     if (event)
3009       gst_event_unref (event);
3010     return FALSE;
3011   }
3012 error_stop:
3013   {
3014     GST_DEBUG_OBJECT (basesrc, "Failed to stop in push mode");
3015     return FALSE;
3016   }
3017 }
3018
3019 static gboolean
3020 gst_base_src_activate_pull (GstPad * pad, gboolean active)
3021 {
3022   GstBaseSrc *basesrc;
3023
3024   basesrc = GST_BASE_SRC (GST_OBJECT_PARENT (pad));
3025
3026   /* prepare subclass first */
3027   if (active) {
3028     GST_DEBUG_OBJECT (basesrc, "Activating in pull mode");
3029     if (G_UNLIKELY (!gst_base_src_start (basesrc)))
3030       goto error_start;
3031
3032     /* if not random_access, we cannot operate in pull mode for now */
3033     if (G_UNLIKELY (!gst_base_src_check_get_range (basesrc)))
3034       goto no_get_range;
3035
3036     /* stop flushing now but for live sources, still block in the LIVE lock when
3037      * we are not yet PLAYING */
3038     gst_base_src_set_flushing (basesrc, FALSE, FALSE, FALSE, NULL);
3039   } else {
3040     GST_DEBUG_OBJECT (basesrc, "Deactivating in pull mode");
3041     /* flush all, there is no task to stop */
3042     gst_base_src_set_flushing (basesrc, TRUE, FALSE, TRUE, NULL);
3043
3044     /* don't send EOS when going from PAUSED => READY when in pull mode */
3045     basesrc->priv->last_sent_eos = TRUE;
3046
3047     if (G_UNLIKELY (!gst_base_src_stop (basesrc)))
3048       goto error_stop;
3049   }
3050   return TRUE;
3051
3052   /* ERRORS */
3053 error_start:
3054   {
3055     GST_ERROR_OBJECT (basesrc, "Failed to start in pull mode");
3056     return FALSE;
3057   }
3058 no_get_range:
3059   {
3060     GST_ERROR_OBJECT (basesrc, "Cannot operate in pull mode, stopping");
3061     gst_base_src_stop (basesrc);
3062     return FALSE;
3063   }
3064 error_stop:
3065   {
3066     GST_ERROR_OBJECT (basesrc, "Failed to stop in pull mode");
3067     return FALSE;
3068   }
3069 }
3070
3071 static GstStateChangeReturn
3072 gst_base_src_change_state (GstElement * element, GstStateChange transition)
3073 {
3074   GstBaseSrc *basesrc;
3075   GstStateChangeReturn result;
3076   gboolean no_preroll = FALSE;
3077
3078   basesrc = GST_BASE_SRC (element);
3079
3080   switch (transition) {
3081     case GST_STATE_CHANGE_NULL_TO_READY:
3082       break;
3083     case GST_STATE_CHANGE_READY_TO_PAUSED:
3084       no_preroll = gst_base_src_is_live (basesrc);
3085       break;
3086     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
3087       GST_DEBUG_OBJECT (basesrc, "PAUSED->PLAYING");
3088       if (gst_base_src_is_live (basesrc)) {
3089         /* now we can start playback */
3090         gst_base_src_set_playing (basesrc, TRUE);
3091       }
3092       break;
3093     default:
3094       break;
3095   }
3096
3097   if ((result =
3098           GST_ELEMENT_CLASS (parent_class)->change_state (element,
3099               transition)) == GST_STATE_CHANGE_FAILURE)
3100     goto failure;
3101
3102   switch (transition) {
3103     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
3104       GST_DEBUG_OBJECT (basesrc, "PLAYING->PAUSED");
3105       if (gst_base_src_is_live (basesrc)) {
3106         /* make sure we block in the live lock in PAUSED */
3107         gst_base_src_set_playing (basesrc, FALSE);
3108         no_preroll = TRUE;
3109       }
3110       break;
3111     case GST_STATE_CHANGE_PAUSED_TO_READY:
3112     {
3113       GstEvent **event_p, *event;
3114
3115       /* we don't need to unblock anything here, the pad deactivation code
3116        * already did this */
3117
3118       /* FIXME, deprecate this behaviour, it is very dangerous.
3119        * the prefered way of sending EOS downstream is by sending
3120        * the EOS event to the element */
3121       if (!basesrc->priv->last_sent_eos) {
3122         GST_DEBUG_OBJECT (basesrc, "Sending EOS event");
3123         event = gst_event_new_eos ();
3124         gst_event_set_seqnum (event, basesrc->priv->seqnum);
3125         gst_pad_push_event (basesrc->srcpad, event);
3126         basesrc->priv->last_sent_eos = TRUE;
3127       }
3128       g_atomic_int_set (&basesrc->priv->pending_eos, FALSE);
3129       event_p = &basesrc->data.ABI.pending_seek;
3130       gst_event_replace (event_p, NULL);
3131       event_p = &basesrc->priv->close_segment;
3132       gst_event_replace (event_p, NULL);
3133       event_p = &basesrc->priv->start_segment;
3134       gst_event_replace (event_p, NULL);
3135       break;
3136     }
3137     case GST_STATE_CHANGE_READY_TO_NULL:
3138       break;
3139     default:
3140       break;
3141   }
3142
3143   if (no_preroll && result == GST_STATE_CHANGE_SUCCESS)
3144     result = GST_STATE_CHANGE_NO_PREROLL;
3145
3146   return result;
3147
3148   /* ERRORS */
3149 failure:
3150   {
3151     GST_DEBUG_OBJECT (basesrc, "parent failed state change");
3152     return result;
3153   }
3154 }