Rework GstSegment handling
[platform/upstream/gstreamer.git] / libs / gst / base / gstbasesrc.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *               2000,2005 Wim Taymans <wim@fluendo.com>
4  *
5  * gstbasesrc.c:
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20  * Boston, MA 02111-1307, USA.
21  */
22
23 /**
24  * SECTION:gstbasesrc
25  * @short_description: Base class for getrange based source elements
26  * @see_also: #GstPushSrc, #GstBaseTransform, #GstBaseSink
27  *
28  * This is a generice base class for source elements. The following
29  * types of sources are supported:
30  * <itemizedlist>
31  *   <listitem><para>random access sources like files</para></listitem>
32  *   <listitem><para>seekable sources</para></listitem>
33  *   <listitem><para>live sources</para></listitem>
34  * </itemizedlist>
35  *
36  * The source can be configured to operate in any #GstFormat with the
37  * gst_base_src_set_format() method. The currently set format determines
38  * the format of the internal #GstSegment and any #GST_EVENT_NEWSEGMENT
39  * events. The default format for #GstBaseSrc is #GST_FORMAT_BYTES.
40  *
41  * #GstBaseSrc always supports push mode scheduling. If the following
42  * conditions are met, it also supports pull mode scheduling:
43  * <itemizedlist>
44  *   <listitem><para>The format is set to #GST_FORMAT_BYTES (default).</para>
45  *   </listitem>
46  *   <listitem><para>#GstBaseSrcClass.is_seekable() returns %TRUE.</para>
47  *   </listitem>
48  * </itemizedlist>
49  *
50  * Since 0.10.9, any #GstBaseSrc can enable pull based scheduling at any time
51  * by overriding #GstBaseSrcClass.check_get_range() so that it returns %TRUE.
52  *
53  * If all the conditions are met for operating in pull mode, #GstBaseSrc is
54  * automatically seekable in push mode as well. The following conditions must
55  * be met to make the element seekable in push mode when the format is not
56  * #GST_FORMAT_BYTES:
57  * <itemizedlist>
58  *   <listitem><para>
59  *     #GstBaseSrcClass.is_seekable() returns %TRUE.
60  *   </para></listitem>
61  *   <listitem><para>
62  *     #GstBaseSrcClass.query() can convert all supported seek formats to the
63  *     internal format as set with gst_base_src_set_format().
64  *   </para></listitem>
65  *   <listitem><para>
66  *     #GstBaseSrcClass.do_seek() is implemented, performs the seek and returns
67  *      %TRUE.
68  *   </para></listitem>
69  * </itemizedlist>
70  *
71  * When the element does not meet the requirements to operate in pull mode, the
72  * offset and length in the #GstBaseSrcClass.create() method should be ignored.
73  * It is recommended to subclass #GstPushSrc instead, in this situation. If the
74  * element can operate in pull mode but only with specific offsets and
75  * lengths, it is allowed to generate an error when the wrong values are passed
76  * to the #GstBaseSrcClass.create() function.
77  *
78  * #GstBaseSrc has support for live sources. Live sources are sources that when
79  * paused discard data, such as audio or video capture devices. A typical live
80  * source also produces data at a fixed rate and thus provides a clock to publish
81  * this rate.
82  * Use gst_base_src_set_live() to activate the live source mode.
83  *
84  * A live source does not produce data in the PAUSED state. This means that the
85  * #GstBaseSrcClass.create() method will not be called in PAUSED but only in
86  * PLAYING. To signal the pipeline that the element will not produce data, the
87  * return value from the READY to PAUSED state will be
88  * #GST_STATE_CHANGE_NO_PREROLL.
89  *
90  * A typical live source will timestamp the buffers it creates with the
91  * current running time of the pipeline. This is one reason why a live source
92  * can only produce data in the PLAYING state, when the clock is actually
93  * distributed and running.
94  *
95  * Live sources that synchronize and block on the clock (an audio source, for
96  * example) can since 0.10.12 use gst_base_src_wait_playing() when the
97  * #GstBaseSrcClass.create() function was interrupted by a state change to
98  * PAUSED.
99  *
100  * The #GstBaseSrcClass.get_times() method can be used to implement pseudo-live
101  * sources. It only makes sense to implement the #GstBaseSrcClass.get_times()
102  * function if the source is a live source. The #GstBaseSrcClass.get_times()
103  * function should return timestamps starting from 0, as if it were a non-live
104  * source. The base class will make sure that the timestamps are transformed
105  * into the current running_time. The base source will then wait for the
106  * calculated running_time before pushing out the buffer.
107  *
108  * For live sources, the base class will by default report a latency of 0.
109  * For pseudo live sources, the base class will by default measure the difference
110  * between the first buffer timestamp and the start time of get_times and will
111  * report this value as the latency.
112  * Subclasses should override the query function when this behaviour is not
113  * acceptable.
114  *
115  * There is only support in #GstBaseSrc for exactly one source pad, which
116  * should be named "src". A source implementation (subclass of #GstBaseSrc)
117  * should install a pad template in its class_init function, like so:
118  * |[
119  * static void
120  * my_element_class_init (GstMyElementClass *klass)
121  * {
122  *   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
123  *   // srctemplate should be a #GstStaticPadTemplate with direction
124  *   // #GST_PAD_SRC and name "src"
125  *   gst_element_class_add_pad_template (gstelement_class,
126  *       gst_static_pad_template_get (&amp;srctemplate));
127  *   // see #GstElementDetails
128  *   gst_element_class_set_details (gstelement_class, &amp;details);
129  * }
130  * ]|
131  *
132  * <refsect2>
133  * <title>Controlled shutdown of live sources in applications</title>
134  * <para>
135  * Applications that record from a live source may want to stop recording
136  * in a controlled way, so that the recording is stopped, but the data
137  * already in the pipeline is processed to the end (remember that many live
138  * sources would go on recording forever otherwise). For that to happen the
139  * application needs to make the source stop recording and send an EOS
140  * event down the pipeline. The application would then wait for an
141  * EOS message posted on the pipeline's bus to know when all data has
142  * been processed and the pipeline can safely be stopped.
143  *
144  * Since GStreamer 0.10.16 an application may send an EOS event to a source
145  * element to make it perform the EOS logic (send EOS event downstream or post a
146  * #GST_MESSAGE_SEGMENT_DONE on the bus). This can typically be done
147  * with the gst_element_send_event() function on the element or its parent bin.
148  *
149  * After the EOS has been sent to the element, the application should wait for
150  * an EOS message to be posted on the pipeline's bus. Once this EOS message is
151  * received, it may safely shut down the entire pipeline.
152  *
153  * The old behaviour for controlled shutdown introduced since GStreamer 0.10.3
154  * is still available but deprecated as it is dangerous and less flexible.
155  *
156  * Last reviewed on 2007-12-19 (0.10.16)
157  * </para>
158  * </refsect2>
159  */
160
161 #ifdef HAVE_CONFIG_H
162 #  include "config.h"
163 #endif
164
165 #include <stdlib.h>
166 #include <string.h>
167
168 #include <gst/gst_private.h>
169
170 #include "gstbasesrc.h"
171 #include "gsttypefindhelper.h"
172 #include <gst/gstmarshal.h>
173 #include <gst/gst-i18n-lib.h>
174
175 GST_DEBUG_CATEGORY_STATIC (gst_base_src_debug);
176 #define GST_CAT_DEFAULT gst_base_src_debug
177
178 #define GST_LIVE_GET_LOCK(elem)               (GST_BASE_SRC_CAST(elem)->live_lock)
179 #define GST_LIVE_LOCK(elem)                   g_mutex_lock(GST_LIVE_GET_LOCK(elem))
180 #define GST_LIVE_TRYLOCK(elem)                g_mutex_trylock(GST_LIVE_GET_LOCK(elem))
181 #define GST_LIVE_UNLOCK(elem)                 g_mutex_unlock(GST_LIVE_GET_LOCK(elem))
182 #define GST_LIVE_GET_COND(elem)               (GST_BASE_SRC_CAST(elem)->live_cond)
183 #define GST_LIVE_WAIT(elem)                   g_cond_wait (GST_LIVE_GET_COND (elem), GST_LIVE_GET_LOCK (elem))
184 #define GST_LIVE_TIMED_WAIT(elem, timeval)    g_cond_timed_wait (GST_LIVE_GET_COND (elem), GST_LIVE_GET_LOCK (elem),\
185                                                                                 timeval)
186 #define GST_LIVE_SIGNAL(elem)                 g_cond_signal (GST_LIVE_GET_COND (elem));
187 #define GST_LIVE_BROADCAST(elem)              g_cond_broadcast (GST_LIVE_GET_COND (elem));
188
189 /* BaseSrc signals and args */
190 enum
191 {
192   /* FILL ME */
193   LAST_SIGNAL
194 };
195
196 #define DEFAULT_BLOCKSIZE       4096
197 #define DEFAULT_NUM_BUFFERS     -1
198 #define DEFAULT_TYPEFIND        FALSE
199 #define DEFAULT_DO_TIMESTAMP    FALSE
200
201 enum
202 {
203   PROP_0,
204   PROP_BLOCKSIZE,
205   PROP_NUM_BUFFERS,
206   PROP_TYPEFIND,
207   PROP_DO_TIMESTAMP
208 };
209
210 #define GST_BASE_SRC_GET_PRIVATE(obj)  \
211    (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_BASE_SRC, GstBaseSrcPrivate))
212
213 struct _GstBaseSrcPrivate
214 {
215   gboolean last_sent_eos;       /* last thing we did was send an EOS (we set this
216                                  * to avoid the sending of two EOS in some cases) */
217   gboolean discont;
218   gboolean flushing;
219
220   /* if segment should be sent */
221   gboolean segment_pending;
222
223   /* if EOS is pending (atomic) */
224   gint pending_eos;
225
226   /* startup latency is the time it takes between going to PLAYING and producing
227    * the first BUFFER with running_time 0. This value is included in the latency
228    * reporting. */
229   GstClockTime latency;
230   /* timestamp offset, this is the offset add to the values of gst_times for
231    * pseudo live sources */
232   GstClockTimeDiff ts_offset;
233
234   gboolean do_timestamp;
235
236   /* stream sequence number */
237   guint32 seqnum;
238
239   /* pending events (TAG, CUSTOM_BOTH, CUSTOM_DOWNSTREAM) to be
240    * pushed in the data stream */
241   GList *pending_events;
242   volatile gint have_events;
243
244   /* QoS *//* with LOCK */
245   gboolean qos_enabled;
246   gdouble proportion;
247   GstClockTime earliest_time;
248 };
249
250 static GstElementClass *parent_class = NULL;
251
252 static void gst_base_src_class_init (GstBaseSrcClass * klass);
253 static void gst_base_src_init (GstBaseSrc * src, gpointer g_class);
254 static void gst_base_src_finalize (GObject * object);
255
256
257 GType
258 gst_base_src_get_type (void)
259 {
260   static volatile gsize base_src_type = 0;
261
262   if (g_once_init_enter (&base_src_type)) {
263     GType _type;
264     static const GTypeInfo base_src_info = {
265       sizeof (GstBaseSrcClass),
266       NULL,
267       NULL,
268       (GClassInitFunc) gst_base_src_class_init,
269       NULL,
270       NULL,
271       sizeof (GstBaseSrc),
272       0,
273       (GInstanceInitFunc) gst_base_src_init,
274     };
275
276     _type = g_type_register_static (GST_TYPE_ELEMENT,
277         "GstBaseSrc", &base_src_info, G_TYPE_FLAG_ABSTRACT);
278     g_once_init_leave (&base_src_type, _type);
279   }
280   return base_src_type;
281 }
282
283 static GstCaps *gst_base_src_getcaps (GstPad * pad);
284 static gboolean gst_base_src_setcaps (GstPad * pad, GstCaps * caps);
285 static void gst_base_src_fixate (GstPad * pad, GstCaps * caps);
286
287 static gboolean gst_base_src_activate_push (GstPad * pad, gboolean active);
288 static gboolean gst_base_src_activate_pull (GstPad * pad, gboolean active);
289 static void gst_base_src_set_property (GObject * object, guint prop_id,
290     const GValue * value, GParamSpec * pspec);
291 static void gst_base_src_get_property (GObject * object, guint prop_id,
292     GValue * value, GParamSpec * pspec);
293 static gboolean gst_base_src_event_handler (GstPad * pad, GstEvent * event);
294 static gboolean gst_base_src_send_event (GstElement * elem, GstEvent * event);
295 static gboolean gst_base_src_default_event (GstBaseSrc * src, GstEvent * event);
296 static const GstQueryType *gst_base_src_get_query_types (GstElement * element);
297
298 static gboolean gst_base_src_query (GstPad * pad, GstQuery ** query);
299
300 static gboolean gst_base_src_default_negotiate (GstBaseSrc * basesrc);
301 static gboolean gst_base_src_default_do_seek (GstBaseSrc * src,
302     GstSegment * segment);
303 static gboolean gst_base_src_default_query (GstBaseSrc * src,
304     GstQuery ** query);
305 static gboolean gst_base_src_default_prepare_seek_segment (GstBaseSrc * src,
306     GstEvent * event, GstSegment * segment);
307
308 static gboolean gst_base_src_set_flushing (GstBaseSrc * basesrc,
309     gboolean flushing, gboolean live_play, gboolean unlock, gboolean * playing);
310 static gboolean gst_base_src_start (GstBaseSrc * basesrc);
311 static gboolean gst_base_src_stop (GstBaseSrc * basesrc);
312
313 static GstStateChangeReturn gst_base_src_change_state (GstElement * element,
314     GstStateChange transition);
315
316 static void gst_base_src_loop (GstPad * pad);
317 static gboolean gst_base_src_pad_check_get_range (GstPad * pad);
318 static gboolean gst_base_src_default_check_get_range (GstBaseSrc * bsrc);
319 static GstFlowReturn gst_base_src_pad_get_range (GstPad * pad, guint64 offset,
320     guint length, GstBuffer ** buf);
321 static GstFlowReturn gst_base_src_get_range (GstBaseSrc * src, guint64 offset,
322     guint length, GstBuffer ** buf);
323 static gboolean gst_base_src_seekable (GstBaseSrc * src);
324 static gboolean gst_base_src_negotiate (GstBaseSrc * basesrc);
325
326 static void
327 gst_base_src_class_init (GstBaseSrcClass * klass)
328 {
329   GObjectClass *gobject_class;
330   GstElementClass *gstelement_class;
331
332   gobject_class = G_OBJECT_CLASS (klass);
333   gstelement_class = GST_ELEMENT_CLASS (klass);
334
335   GST_DEBUG_CATEGORY_INIT (gst_base_src_debug, "basesrc", 0, "basesrc element");
336
337   g_type_class_add_private (klass, sizeof (GstBaseSrcPrivate));
338
339   parent_class = g_type_class_peek_parent (klass);
340
341   gobject_class->finalize = gst_base_src_finalize;
342   gobject_class->set_property = gst_base_src_set_property;
343   gobject_class->get_property = gst_base_src_get_property;
344
345 /* FIXME 0.11: blocksize property should be int, not ulong (min is >max here) */
346   g_object_class_install_property (gobject_class, PROP_BLOCKSIZE,
347       g_param_spec_ulong ("blocksize", "Block size",
348           "Size in bytes to read per buffer (-1 = default)", 0, G_MAXULONG,
349           DEFAULT_BLOCKSIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
350   g_object_class_install_property (gobject_class, PROP_NUM_BUFFERS,
351       g_param_spec_int ("num-buffers", "num-buffers",
352           "Number of buffers to output before sending EOS (-1 = unlimited)",
353           -1, G_MAXINT, DEFAULT_NUM_BUFFERS, G_PARAM_READWRITE |
354           G_PARAM_STATIC_STRINGS));
355   g_object_class_install_property (gobject_class, PROP_TYPEFIND,
356       g_param_spec_boolean ("typefind", "Typefind",
357           "Run typefind before negotiating", DEFAULT_TYPEFIND,
358           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
359   g_object_class_install_property (gobject_class, PROP_DO_TIMESTAMP,
360       g_param_spec_boolean ("do-timestamp", "Do timestamp",
361           "Apply current stream time to buffers", DEFAULT_DO_TIMESTAMP,
362           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
363
364   gstelement_class->change_state =
365       GST_DEBUG_FUNCPTR (gst_base_src_change_state);
366   gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_base_src_send_event);
367   gstelement_class->get_query_types =
368       GST_DEBUG_FUNCPTR (gst_base_src_get_query_types);
369
370   klass->negotiate = GST_DEBUG_FUNCPTR (gst_base_src_default_negotiate);
371   klass->event = GST_DEBUG_FUNCPTR (gst_base_src_default_event);
372   klass->do_seek = GST_DEBUG_FUNCPTR (gst_base_src_default_do_seek);
373   klass->query = GST_DEBUG_FUNCPTR (gst_base_src_default_query);
374   klass->check_get_range =
375       GST_DEBUG_FUNCPTR (gst_base_src_default_check_get_range);
376   klass->prepare_seek_segment =
377       GST_DEBUG_FUNCPTR (gst_base_src_default_prepare_seek_segment);
378
379   /* Registering debug symbols for function pointers */
380   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_activate_push);
381   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_activate_pull);
382   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_event_handler);
383   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_query);
384   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_pad_get_range);
385   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_pad_check_get_range);
386   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_getcaps);
387   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_setcaps);
388   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_fixate);
389 }
390
391 static void
392 gst_base_src_init (GstBaseSrc * basesrc, gpointer g_class)
393 {
394   GstPad *pad;
395   GstPadTemplate *pad_template;
396
397   basesrc->priv = GST_BASE_SRC_GET_PRIVATE (basesrc);
398
399   basesrc->is_live = FALSE;
400   basesrc->live_lock = g_mutex_new ();
401   basesrc->live_cond = g_cond_new ();
402   basesrc->num_buffers = DEFAULT_NUM_BUFFERS;
403   basesrc->num_buffers_left = -1;
404
405   basesrc->can_activate_push = TRUE;
406   basesrc->pad_mode = GST_ACTIVATE_NONE;
407
408   pad_template =
409       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (g_class), "src");
410   g_return_if_fail (pad_template != NULL);
411
412   GST_DEBUG_OBJECT (basesrc, "creating src pad");
413   pad = gst_pad_new_from_template (pad_template, "src");
414
415   GST_DEBUG_OBJECT (basesrc, "setting functions on src pad");
416   gst_pad_set_activatepush_function (pad, gst_base_src_activate_push);
417   gst_pad_set_activatepull_function (pad, gst_base_src_activate_pull);
418   gst_pad_set_event_function (pad, gst_base_src_event_handler);
419   gst_pad_set_query_function (pad, gst_base_src_query);
420   gst_pad_set_checkgetrange_function (pad, gst_base_src_pad_check_get_range);
421   gst_pad_set_getrange_function (pad, gst_base_src_pad_get_range);
422   gst_pad_set_getcaps_function (pad, gst_base_src_getcaps);
423   gst_pad_set_setcaps_function (pad, gst_base_src_setcaps);
424   gst_pad_set_fixatecaps_function (pad, gst_base_src_fixate);
425
426   /* hold pointer to pad */
427   basesrc->srcpad = pad;
428   GST_DEBUG_OBJECT (basesrc, "adding src pad");
429   gst_element_add_pad (GST_ELEMENT (basesrc), pad);
430
431   basesrc->blocksize = DEFAULT_BLOCKSIZE;
432   basesrc->clock_id = NULL;
433   /* we operate in BYTES by default */
434   gst_base_src_set_format (basesrc, GST_FORMAT_BYTES);
435   basesrc->typefind = DEFAULT_TYPEFIND;
436   basesrc->priv->do_timestamp = DEFAULT_DO_TIMESTAMP;
437   g_atomic_int_set (&basesrc->priv->have_events, FALSE);
438
439   GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_STARTED);
440   GST_OBJECT_FLAG_SET (basesrc, GST_ELEMENT_IS_SOURCE);
441
442   GST_DEBUG_OBJECT (basesrc, "init done");
443 }
444
445 static void
446 gst_base_src_finalize (GObject * object)
447 {
448   GstBaseSrc *basesrc;
449   GstEvent **event_p;
450
451   basesrc = GST_BASE_SRC (object);
452
453   g_mutex_free (basesrc->live_lock);
454   g_cond_free (basesrc->live_cond);
455
456   event_p = &basesrc->pending_seek;
457   gst_event_replace (event_p, NULL);
458
459   if (basesrc->priv->pending_events) {
460     g_list_foreach (basesrc->priv->pending_events, (GFunc) gst_event_unref,
461         NULL);
462     g_list_free (basesrc->priv->pending_events);
463   }
464
465   G_OBJECT_CLASS (parent_class)->finalize (object);
466 }
467
468 /**
469  * gst_base_src_wait_playing:
470  * @src: the src
471  *
472  * If the #GstBaseSrcClass.create() method performs its own synchronisation
473  * against the clock it must unblock when going from PLAYING to the PAUSED state
474  * and call this method before continuing to produce the remaining data.
475  *
476  * This function will block until a state change to PLAYING happens (in which
477  * case this function returns #GST_FLOW_OK) or the processing must be stopped due
478  * to a state change to READY or a FLUSH event (in which case this function
479  * returns #GST_FLOW_WRONG_STATE).
480  *
481  * Since: 0.10.12
482  *
483  * Returns: #GST_FLOW_OK if @src is PLAYING and processing can
484  * continue. Any other return value should be returned from the create vmethod.
485  */
486 GstFlowReturn
487 gst_base_src_wait_playing (GstBaseSrc * src)
488 {
489   g_return_val_if_fail (GST_IS_BASE_SRC (src), GST_FLOW_ERROR);
490
491   do {
492     /* block until the state changes, or we get a flush, or something */
493     GST_DEBUG_OBJECT (src, "live source waiting for running state");
494     GST_LIVE_WAIT (src);
495     GST_DEBUG_OBJECT (src, "live source unlocked");
496     if (src->priv->flushing)
497       goto flushing;
498   } while (G_UNLIKELY (!src->live_running));
499
500   return GST_FLOW_OK;
501
502   /* ERRORS */
503 flushing:
504   {
505     GST_DEBUG_OBJECT (src, "we are flushing");
506     return GST_FLOW_WRONG_STATE;
507   }
508 }
509
510 /**
511  * gst_base_src_set_live:
512  * @src: base source instance
513  * @live: new live-mode
514  *
515  * If the element listens to a live source, @live should
516  * be set to %TRUE.
517  *
518  * A live source will not produce data in the PAUSED state and
519  * will therefore not be able to participate in the PREROLL phase
520  * of a pipeline. To signal this fact to the application and the
521  * pipeline, the state change return value of the live source will
522  * be GST_STATE_CHANGE_NO_PREROLL.
523  */
524 void
525 gst_base_src_set_live (GstBaseSrc * src, gboolean live)
526 {
527   g_return_if_fail (GST_IS_BASE_SRC (src));
528
529   GST_OBJECT_LOCK (src);
530   src->is_live = live;
531   GST_OBJECT_UNLOCK (src);
532 }
533
534 /**
535  * gst_base_src_is_live:
536  * @src: base source instance
537  *
538  * Check if an element is in live mode.
539  *
540  * Returns: %TRUE if element is in live mode.
541  */
542 gboolean
543 gst_base_src_is_live (GstBaseSrc * src)
544 {
545   gboolean result;
546
547   g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
548
549   GST_OBJECT_LOCK (src);
550   result = src->is_live;
551   GST_OBJECT_UNLOCK (src);
552
553   return result;
554 }
555
556 /**
557  * gst_base_src_set_format:
558  * @src: base source instance
559  * @format: the format to use
560  *
561  * Sets the default format of the source. This will be the format used
562  * for sending NEW_SEGMENT events and for performing seeks.
563  *
564  * If a format of GST_FORMAT_BYTES is set, the element will be able to
565  * operate in pull mode if the #GstBaseSrcClass.is_seekable() returns TRUE.
566  *
567  * This function must only be called in states < %GST_STATE_PAUSED.
568  *
569  * Since: 0.10.1
570  */
571 void
572 gst_base_src_set_format (GstBaseSrc * src, GstFormat format)
573 {
574   g_return_if_fail (GST_IS_BASE_SRC (src));
575   g_return_if_fail (GST_STATE (src) <= GST_STATE_READY);
576
577   GST_OBJECT_LOCK (src);
578   gst_segment_init (&src->segment, format);
579   GST_OBJECT_UNLOCK (src);
580 }
581
582 /**
583  * gst_base_src_query_latency:
584  * @src: the source
585  * @live: (out) (allow-none): if the source is live
586  * @min_latency: (out) (allow-none): the min latency of the source
587  * @max_latency: (out) (allow-none): the max latency of the source
588  *
589  * Query the source for the latency parameters. @live will be TRUE when @src is
590  * configured as a live source. @min_latency will be set to the difference
591  * between the running time and the timestamp of the first buffer.
592  * @max_latency is always the undefined value of -1.
593  *
594  * This function is mostly used by subclasses.
595  *
596  * Returns: TRUE if the query succeeded.
597  *
598  * Since: 0.10.13
599  */
600 gboolean
601 gst_base_src_query_latency (GstBaseSrc * src, gboolean * live,
602     GstClockTime * min_latency, GstClockTime * max_latency)
603 {
604   GstClockTime min;
605
606   g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
607
608   GST_OBJECT_LOCK (src);
609   if (live)
610     *live = src->is_live;
611
612   /* if we have a startup latency, report this one, else report 0. Subclasses
613    * are supposed to override the query function if they want something
614    * else. */
615   if (src->priv->latency != -1)
616     min = src->priv->latency;
617   else
618     min = 0;
619
620   if (min_latency)
621     *min_latency = min;
622   if (max_latency)
623     *max_latency = -1;
624
625   GST_LOG_OBJECT (src, "latency: live %d, min %" GST_TIME_FORMAT
626       ", max %" GST_TIME_FORMAT, src->is_live, GST_TIME_ARGS (min),
627       GST_TIME_ARGS (-1));
628   GST_OBJECT_UNLOCK (src);
629
630   return TRUE;
631 }
632
633 /**
634  * gst_base_src_set_blocksize:
635  * @src: the source
636  * @blocksize: the new blocksize in bytes
637  *
638  * Set the number of bytes that @src will push out with each buffer. When
639  * @blocksize is set to -1, a default length will be used.
640  *
641  * Since: 0.10.22
642  */
643 /* FIXME 0.11: blocksize property should be int, not ulong */
644 void
645 gst_base_src_set_blocksize (GstBaseSrc * src, gulong blocksize)
646 {
647   g_return_if_fail (GST_IS_BASE_SRC (src));
648
649   GST_OBJECT_LOCK (src);
650   src->blocksize = blocksize;
651   GST_OBJECT_UNLOCK (src);
652 }
653
654 /**
655  * gst_base_src_get_blocksize:
656  * @src: the source
657  *
658  * Get the number of bytes that @src will push out with each buffer.
659  *
660  * Returns: the number of bytes pushed with each buffer.
661  *
662  * Since: 0.10.22
663  */
664 /* FIXME 0.11: blocksize property should be int, not ulong */
665 gulong
666 gst_base_src_get_blocksize (GstBaseSrc * src)
667 {
668   gulong res;
669
670   g_return_val_if_fail (GST_IS_BASE_SRC (src), 0);
671
672   GST_OBJECT_LOCK (src);
673   res = src->blocksize;
674   GST_OBJECT_UNLOCK (src);
675
676   return res;
677 }
678
679
680 /**
681  * gst_base_src_set_do_timestamp:
682  * @src: the source
683  * @timestamp: enable or disable timestamping
684  *
685  * Configure @src to automatically timestamp outgoing buffers based on the
686  * current running_time of the pipeline. This property is mostly useful for live
687  * sources.
688  *
689  * Since: 0.10.15
690  */
691 void
692 gst_base_src_set_do_timestamp (GstBaseSrc * src, gboolean timestamp)
693 {
694   g_return_if_fail (GST_IS_BASE_SRC (src));
695
696   GST_OBJECT_LOCK (src);
697   src->priv->do_timestamp = timestamp;
698   GST_OBJECT_UNLOCK (src);
699 }
700
701 /**
702  * gst_base_src_get_do_timestamp:
703  * @src: the source
704  *
705  * Query if @src timestamps outgoing buffers based on the current running_time.
706  *
707  * Returns: %TRUE if the base class will automatically timestamp outgoing buffers.
708  *
709  * Since: 0.10.15
710  */
711 gboolean
712 gst_base_src_get_do_timestamp (GstBaseSrc * src)
713 {
714   gboolean res;
715
716   g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
717
718   GST_OBJECT_LOCK (src);
719   res = src->priv->do_timestamp;
720   GST_OBJECT_UNLOCK (src);
721
722   return res;
723 }
724
725 /**
726  * gst_base_src_new_seamless_segment:
727  * @src: The source
728  * @start: The new start value for the segment
729  * @stop: Stop value for the new segment
730  * @position: The position value for the new segent
731  *
732  * Prepare a new seamless segment for emission downstream. This function must
733  * only be called by derived sub-classes, and only from the create() function,
734  * as the stream-lock needs to be held.
735  *
736  * The format for the new segment will be the current format of the source, as
737  * configured with gst_base_src_set_format()
738  *
739  * Returns: %TRUE if preparation of the seamless segment succeeded.
740  *
741  * Since: 0.10.26
742  */
743 gboolean
744 gst_base_src_new_seamless_segment (GstBaseSrc * src, gint64 start, gint64 stop,
745     gint64 position)
746 {
747   gboolean res = TRUE;
748
749   GST_DEBUG_OBJECT (src,
750       "Starting new seamless segment. Start %" GST_TIME_FORMAT " stop %"
751       GST_TIME_FORMAT " position %" GST_TIME_FORMAT, GST_TIME_ARGS (start),
752       GST_TIME_ARGS (stop), GST_TIME_ARGS (position));
753
754   GST_OBJECT_LOCK (src);
755
756   src->segment.base = gst_segment_to_running_time (&src->segment,
757       src->segment.format, src->segment.position);
758   src->segment.start = start;
759   src->segment.stop = stop;
760   src->segment.position = position;
761
762   /* forward, we send data from position to stop */
763   src->priv->segment_pending = TRUE;
764   GST_OBJECT_UNLOCK (src);
765
766   src->priv->discont = TRUE;
767   src->running = TRUE;
768
769   return res;
770 }
771
772 static gboolean
773 gst_base_src_setcaps (GstPad * pad, GstCaps * caps)
774 {
775   GstBaseSrcClass *bclass;
776   GstBaseSrc *bsrc;
777   gboolean res = TRUE;
778
779   bsrc = GST_BASE_SRC (GST_PAD_PARENT (pad));
780   bclass = GST_BASE_SRC_GET_CLASS (bsrc);
781
782   if (bclass->set_caps)
783     res = bclass->set_caps (bsrc, caps);
784
785   return res;
786 }
787
788 static GstCaps *
789 gst_base_src_getcaps (GstPad * pad)
790 {
791   GstBaseSrcClass *bclass;
792   GstBaseSrc *bsrc;
793   GstCaps *caps = NULL;
794
795   bsrc = GST_BASE_SRC (GST_PAD_PARENT (pad));
796   bclass = GST_BASE_SRC_GET_CLASS (bsrc);
797   if (bclass->get_caps)
798     caps = bclass->get_caps (bsrc);
799
800   if (caps == NULL) {
801     GstPadTemplate *pad_template;
802
803     pad_template =
804         gst_element_class_get_pad_template (GST_ELEMENT_CLASS (bclass), "src");
805     if (pad_template != NULL) {
806       caps = gst_caps_ref (gst_pad_template_get_caps (pad_template));
807     }
808   }
809   return caps;
810 }
811
812 static void
813 gst_base_src_fixate (GstPad * pad, GstCaps * caps)
814 {
815   GstBaseSrcClass *bclass;
816   GstBaseSrc *bsrc;
817
818   bsrc = GST_BASE_SRC (gst_pad_get_parent (pad));
819   bclass = GST_BASE_SRC_GET_CLASS (bsrc);
820
821   if (bclass->fixate)
822     bclass->fixate (bsrc, caps);
823
824   gst_object_unref (bsrc);
825 }
826
827 static gboolean
828 gst_base_src_default_query (GstBaseSrc * src, GstQuery ** query)
829 {
830   gboolean res;
831
832   switch (GST_QUERY_TYPE (*query)) {
833     case GST_QUERY_POSITION:
834     {
835       GstFormat format;
836
837       gst_query_parse_position (*query, &format, NULL);
838
839       GST_DEBUG_OBJECT (src, "position query in format %s",
840           gst_format_get_name (format));
841
842       switch (format) {
843         case GST_FORMAT_PERCENT:
844         {
845           gint64 percent;
846           gint64 position;
847           gint64 duration;
848
849           GST_OBJECT_LOCK (src);
850           position = src->segment.position;
851           duration = src->segment.duration;
852           GST_OBJECT_UNLOCK (src);
853
854           if (position != -1 && duration != -1) {
855             if (position < duration)
856               percent = gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX, position,
857                   duration);
858             else
859               percent = GST_FORMAT_PERCENT_MAX;
860           } else
861             percent = -1;
862
863           gst_query_set_position (*query, GST_FORMAT_PERCENT, percent);
864           res = TRUE;
865           break;
866         }
867         default:
868         {
869           gint64 position;
870           GstFormat seg_format;
871
872           GST_OBJECT_LOCK (src);
873           position =
874               gst_segment_to_stream_time (&src->segment, src->segment.format,
875               src->segment.position);
876           seg_format = src->segment.format;
877           GST_OBJECT_UNLOCK (src);
878
879           if (position != -1) {
880             /* convert to requested format */
881             res =
882                 gst_pad_query_convert (src->srcpad, seg_format,
883                 position, &format, &position);
884           } else
885             res = TRUE;
886
887           gst_query_set_position (*query, format, position);
888           break;
889         }
890       }
891       break;
892     }
893     case GST_QUERY_DURATION:
894     {
895       GstFormat format;
896
897       gst_query_parse_duration (*query, &format, NULL);
898
899       GST_DEBUG_OBJECT (src, "duration query in format %s",
900           gst_format_get_name (format));
901
902       switch (format) {
903         case GST_FORMAT_PERCENT:
904           gst_query_set_duration (*query, GST_FORMAT_PERCENT,
905               GST_FORMAT_PERCENT_MAX);
906           res = TRUE;
907           break;
908         default:
909         {
910           gint64 duration;
911           GstFormat seg_format;
912
913           GST_OBJECT_LOCK (src);
914           /* this is the duration as configured by the subclass. */
915           duration = src->segment.duration;
916           seg_format = src->segment.format;
917           GST_OBJECT_UNLOCK (src);
918
919           GST_LOG_OBJECT (src, "duration %" G_GINT64_FORMAT ", format %s",
920               duration, gst_format_get_name (seg_format));
921
922           if (duration != -1) {
923             /* convert to requested format, if this fails, we have a duration
924              * but we cannot answer the query, we must return FALSE. */
925             res =
926                 gst_pad_query_convert (src->srcpad, seg_format,
927                 duration, &format, &duration);
928           } else {
929             /* The subclass did not configure a duration, we assume that the
930              * media has an unknown duration then and we return TRUE to report
931              * this. Note that this is not the same as returning FALSE, which
932              * means that we cannot report the duration at all. */
933             res = TRUE;
934           }
935           gst_query_set_duration (*query, format, duration);
936           break;
937         }
938       }
939       break;
940     }
941
942     case GST_QUERY_SEEKING:
943     {
944       GstFormat format, seg_format;
945       gint64 duration;
946
947       GST_OBJECT_LOCK (src);
948       duration = src->segment.duration;
949       seg_format = src->segment.format;
950       GST_OBJECT_UNLOCK (src);
951
952       gst_query_parse_seeking (*query, &format, NULL, NULL, NULL);
953       if (format == seg_format) {
954         gst_query_set_seeking (*query, seg_format,
955             gst_base_src_seekable (src), 0, duration);
956         res = TRUE;
957       } else {
958         /* FIXME 0.11: return TRUE + seekable=FALSE for SEEKING query here */
959         /* Don't reply to the query to make up for demuxers which don't
960          * handle the SEEKING query yet. Players like Totem will fall back
961          * to the duration when the SEEKING query isn't answered. */
962         res = FALSE;
963       }
964       break;
965     }
966     case GST_QUERY_SEGMENT:
967     {
968       gint64 start, stop;
969
970       GST_OBJECT_LOCK (src);
971       /* no end segment configured, current duration then */
972       if ((stop = src->segment.stop) == -1)
973         stop = src->segment.duration;
974       start = src->segment.start;
975
976       /* adjust to stream time */
977       if (src->segment.time != -1) {
978         start -= src->segment.time;
979         if (stop != -1)
980           stop -= src->segment.time;
981       }
982
983       gst_query_set_segment (*query, src->segment.rate, src->segment.format,
984           start, stop);
985       GST_OBJECT_UNLOCK (src);
986       res = TRUE;
987       break;
988     }
989
990     case GST_QUERY_FORMATS:
991     {
992       gst_query_set_formats (*query, 3, GST_FORMAT_DEFAULT,
993           GST_FORMAT_BYTES, GST_FORMAT_PERCENT);
994       res = TRUE;
995       break;
996     }
997     case GST_QUERY_CONVERT:
998     {
999       GstFormat src_fmt, dest_fmt;
1000       gint64 src_val, dest_val;
1001
1002       gst_query_parse_convert (*query, &src_fmt, &src_val, &dest_fmt,
1003           &dest_val);
1004
1005       /* we can only convert between equal formats... */
1006       if (src_fmt == dest_fmt) {
1007         dest_val = src_val;
1008         res = TRUE;
1009       } else
1010         res = FALSE;
1011
1012       gst_query_set_convert (*query, src_fmt, src_val, dest_fmt, dest_val);
1013       break;
1014     }
1015     case GST_QUERY_LATENCY:
1016     {
1017       GstClockTime min, max;
1018       gboolean live;
1019
1020       /* Subclasses should override and implement something usefull */
1021       res = gst_base_src_query_latency (src, &live, &min, &max);
1022
1023       GST_LOG_OBJECT (src, "report latency: live %d, min %" GST_TIME_FORMAT
1024           ", max %" GST_TIME_FORMAT, live, GST_TIME_ARGS (min),
1025           GST_TIME_ARGS (max));
1026
1027       gst_query_set_latency (*query, live, min, max);
1028       break;
1029     }
1030     case GST_QUERY_JITTER:
1031     case GST_QUERY_RATE:
1032       res = FALSE;
1033       break;
1034     case GST_QUERY_BUFFERING:
1035     {
1036       GstFormat format, seg_format;
1037       gint64 start, stop, estimated;
1038
1039       gst_query_parse_buffering_range (*query, &format, NULL, NULL, NULL);
1040
1041       GST_DEBUG_OBJECT (src, "buffering query in format %s",
1042           gst_format_get_name (format));
1043
1044       GST_OBJECT_LOCK (src);
1045       if (src->random_access) {
1046         estimated = 0;
1047         start = 0;
1048         if (format == GST_FORMAT_PERCENT)
1049           stop = GST_FORMAT_PERCENT_MAX;
1050         else
1051           stop = src->segment.duration;
1052       } else {
1053         estimated = -1;
1054         start = -1;
1055         stop = -1;
1056       }
1057       seg_format = src->segment.format;
1058       GST_OBJECT_UNLOCK (src);
1059
1060       /* convert to required format. When the conversion fails, we can't answer
1061        * the query. When the value is unknown, we can don't perform conversion
1062        * but report TRUE. */
1063       if (format != GST_FORMAT_PERCENT && stop != -1) {
1064         res = gst_pad_query_convert (src->srcpad, seg_format,
1065             stop, &format, &stop);
1066       } else {
1067         res = TRUE;
1068       }
1069       if (res && format != GST_FORMAT_PERCENT && start != -1)
1070         res = gst_pad_query_convert (src->srcpad, seg_format,
1071             start, &format, &start);
1072
1073       gst_query_set_buffering_range (*query, format, start, stop, estimated);
1074       break;
1075     }
1076     default:
1077       res = FALSE;
1078       break;
1079   }
1080   GST_DEBUG_OBJECT (src, "query %s returns %d", GST_QUERY_TYPE_NAME (*query),
1081       res);
1082   return res;
1083 }
1084
1085 static gboolean
1086 gst_base_src_query (GstPad * pad, GstQuery ** query)
1087 {
1088   GstBaseSrc *src;
1089   GstBaseSrcClass *bclass;
1090   gboolean result = FALSE;
1091
1092   src = GST_BASE_SRC (gst_pad_get_parent (pad));
1093   if (G_UNLIKELY (src == NULL))
1094     return FALSE;
1095
1096   bclass = GST_BASE_SRC_GET_CLASS (src);
1097
1098   if (bclass->query)
1099     result = bclass->query (src, query);
1100   else
1101     result = gst_pad_query_default (pad, query);
1102
1103   gst_object_unref (src);
1104
1105   return result;
1106 }
1107
1108 static gboolean
1109 gst_base_src_default_do_seek (GstBaseSrc * src, GstSegment * segment)
1110 {
1111   gboolean res = TRUE;
1112
1113   /* update our offset if the start/stop position was updated */
1114   if (segment->format == GST_FORMAT_BYTES) {
1115     segment->time = segment->start;
1116   } else if (segment->start == 0) {
1117     /* seek to start, we can implement a default for this. */
1118     segment->time = 0;
1119   } else {
1120     res = FALSE;
1121     GST_INFO_OBJECT (src, "Can't do a default seek");
1122   }
1123
1124   return res;
1125 }
1126
1127 static gboolean
1128 gst_base_src_do_seek (GstBaseSrc * src, GstSegment * segment)
1129 {
1130   GstBaseSrcClass *bclass;
1131   gboolean result = FALSE;
1132
1133   bclass = GST_BASE_SRC_GET_CLASS (src);
1134
1135   if (bclass->do_seek)
1136     result = bclass->do_seek (src, segment);
1137
1138   return result;
1139 }
1140
1141 #define SEEK_TYPE_IS_RELATIVE(t) (((t) != GST_SEEK_TYPE_NONE) && ((t) != GST_SEEK_TYPE_SET))
1142
1143 static gboolean
1144 gst_base_src_default_prepare_seek_segment (GstBaseSrc * src, GstEvent * event,
1145     GstSegment * segment)
1146 {
1147   /* By default, we try one of 2 things:
1148    *   - For absolute seek positions, convert the requested position to our
1149    *     configured processing format and place it in the output segment \
1150    *   - For relative seek positions, convert our current (input) values to the
1151    *     seek format, adjust by the relative seek offset and then convert back to
1152    *     the processing format
1153    */
1154   GstSeekType cur_type, stop_type;
1155   gint64 cur, stop;
1156   GstSeekFlags flags;
1157   GstFormat seek_format, dest_format;
1158   gdouble rate;
1159   gboolean update;
1160   gboolean res = TRUE;
1161
1162   gst_event_parse_seek (event, &rate, &seek_format, &flags,
1163       &cur_type, &cur, &stop_type, &stop);
1164   dest_format = segment->format;
1165
1166   if (seek_format == dest_format) {
1167     gst_segment_do_seek (segment, rate, seek_format, flags,
1168         cur_type, cur, stop_type, stop, &update);
1169     return TRUE;
1170   }
1171
1172   if (cur_type != GST_SEEK_TYPE_NONE) {
1173     /* FIXME: Handle seek_cur & seek_end by converting the input segment vals */
1174     res =
1175         gst_pad_query_convert (src->srcpad, seek_format, cur, &dest_format,
1176         &cur);
1177     cur_type = GST_SEEK_TYPE_SET;
1178   }
1179
1180   if (res && stop_type != GST_SEEK_TYPE_NONE) {
1181     /* FIXME: Handle seek_cur & seek_end by converting the input segment vals */
1182     res =
1183         gst_pad_query_convert (src->srcpad, seek_format, stop, &dest_format,
1184         &stop);
1185     stop_type = GST_SEEK_TYPE_SET;
1186   }
1187
1188   /* And finally, configure our output segment in the desired format */
1189   gst_segment_do_seek (segment, rate, dest_format, flags, cur_type, cur,
1190       stop_type, stop, &update);
1191
1192   if (!res)
1193     goto no_format;
1194
1195   return res;
1196
1197 no_format:
1198   {
1199     GST_DEBUG_OBJECT (src, "undefined format given, seek aborted.");
1200     return FALSE;
1201   }
1202 }
1203
1204 static gboolean
1205 gst_base_src_prepare_seek_segment (GstBaseSrc * src, GstEvent * event,
1206     GstSegment * seeksegment)
1207 {
1208   GstBaseSrcClass *bclass;
1209   gboolean result = FALSE;
1210
1211   bclass = GST_BASE_SRC_GET_CLASS (src);
1212
1213   if (bclass->prepare_seek_segment)
1214     result = bclass->prepare_seek_segment (src, event, seeksegment);
1215
1216   return result;
1217 }
1218
1219 /* this code implements the seeking. It is a good example
1220  * handling all cases.
1221  *
1222  * A seek updates the currently configured segment.start
1223  * and segment.stop values based on the SEEK_TYPE. If the
1224  * segment.start value is updated, a seek to this new position
1225  * should be performed.
1226  *
1227  * The seek can only be executed when we are not currently
1228  * streaming any data, to make sure that this is the case, we
1229  * acquire the STREAM_LOCK which is taken when we are in the
1230  * _loop() function or when a getrange() is called. Normally
1231  * we will not receive a seek if we are operating in pull mode
1232  * though. When we operate as a live source we might block on the live
1233  * cond, which does not release the STREAM_LOCK. Therefore we will try
1234  * to grab the LIVE_LOCK instead of the STREAM_LOCK to make sure it is
1235  * safe to perform the seek.
1236  *
1237  * When we are in the loop() function, we might be in the middle
1238  * of pushing a buffer, which might block in a sink. To make sure
1239  * that the push gets unblocked we push out a FLUSH_START event.
1240  * Our loop function will get a WRONG_STATE return value from
1241  * the push and will pause, effectively releasing the STREAM_LOCK.
1242  *
1243  * For a non-flushing seek, we pause the task, which might eventually
1244  * release the STREAM_LOCK. We say eventually because when the sink
1245  * blocks on the sample we might wait a very long time until the sink
1246  * unblocks the sample. In any case we acquire the STREAM_LOCK and
1247  * can continue the seek. A non-flushing seek is normally done in a
1248  * running pipeline to perform seamless playback, this means that the sink is
1249  * PLAYING and will return from its chain function.
1250  * In the case of a non-flushing seek we need to make sure that the
1251  * data we output after the seek is continuous with the previous data,
1252  * this is because a non-flushing seek does not reset the running-time
1253  * to 0. We do this by closing the currently running segment, ie. sending
1254  * a new_segment event with the stop position set to the last processed
1255  * position.
1256  *
1257  * After updating the segment.start/stop values, we prepare for
1258  * streaming again. We push out a FLUSH_STOP to make the peer pad
1259  * accept data again and we start our task again.
1260  *
1261  * A segment seek posts a message on the bus saying that the playback
1262  * of the segment started. We store the segment flag internally because
1263  * when we reach the segment.stop we have to post a segment.done
1264  * instead of EOS when doing a segment seek.
1265  */
1266 /* FIXME (0.11), we have the unlock gboolean here because most current
1267  * implementations (fdsrc, -base/gst/tcp/, ...) unconditionally unlock, even when
1268  * the streaming thread isn't running, resulting in bogus unlocks later when it
1269  * starts. This is fixed by adding unlock_stop, but we should still avoid unlocking
1270  * unnecessarily for backwards compatibility. Ergo, the unlock variable stays
1271  * until 0.11
1272  */
1273 static gboolean
1274 gst_base_src_perform_seek (GstBaseSrc * src, GstEvent * event, gboolean unlock)
1275 {
1276   gboolean res = TRUE, tres;
1277   gdouble rate;
1278   GstFormat seek_format, dest_format;
1279   GstSeekFlags flags;
1280   GstSeekType cur_type, stop_type;
1281   gint64 cur, stop;
1282   gboolean flush, playing;
1283   gboolean update;
1284   gboolean relative_seek = FALSE;
1285   gboolean seekseg_configured = FALSE;
1286   GstSegment seeksegment;
1287   guint32 seqnum;
1288   GstEvent *tevent;
1289
1290   GST_DEBUG_OBJECT (src, "doing seek: %" GST_PTR_FORMAT, event);
1291
1292   GST_OBJECT_LOCK (src);
1293   dest_format = src->segment.format;
1294   GST_OBJECT_UNLOCK (src);
1295
1296   if (event) {
1297     gst_event_parse_seek (event, &rate, &seek_format, &flags,
1298         &cur_type, &cur, &stop_type, &stop);
1299
1300     relative_seek = SEEK_TYPE_IS_RELATIVE (cur_type) ||
1301         SEEK_TYPE_IS_RELATIVE (stop_type);
1302
1303     if (dest_format != seek_format && !relative_seek) {
1304       /* If we have an ABSOLUTE position (SEEK_SET only), we can convert it
1305        * here before taking the stream lock, otherwise we must convert it later,
1306        * once we have the stream lock and can read the last configures segment
1307        * start and stop positions */
1308       gst_segment_init (&seeksegment, dest_format);
1309
1310       if (!gst_base_src_prepare_seek_segment (src, event, &seeksegment))
1311         goto prepare_failed;
1312
1313       seekseg_configured = TRUE;
1314     }
1315
1316     flush = flags & GST_SEEK_FLAG_FLUSH;
1317     seqnum = gst_event_get_seqnum (event);
1318   } else {
1319     flush = FALSE;
1320     /* get next seqnum */
1321     seqnum = gst_util_seqnum_next ();
1322   }
1323
1324   /* send flush start */
1325   if (flush) {
1326     tevent = gst_event_new_flush_start ();
1327     gst_event_set_seqnum (tevent, seqnum);
1328     gst_pad_push_event (src->srcpad, tevent);
1329   } else
1330     gst_pad_pause_task (src->srcpad);
1331
1332   /* unblock streaming thread. */
1333   gst_base_src_set_flushing (src, TRUE, FALSE, unlock, &playing);
1334
1335   /* grab streaming lock, this should eventually be possible, either
1336    * because the task is paused, our streaming thread stopped
1337    * or because our peer is flushing. */
1338   GST_PAD_STREAM_LOCK (src->srcpad);
1339   if (G_UNLIKELY (src->priv->seqnum == seqnum)) {
1340     /* we have seen this event before, issue a warning for now */
1341     GST_WARNING_OBJECT (src, "duplicate event found %" G_GUINT32_FORMAT,
1342         seqnum);
1343   } else {
1344     src->priv->seqnum = seqnum;
1345     GST_DEBUG_OBJECT (src, "seek with seqnum %" G_GUINT32_FORMAT, seqnum);
1346   }
1347
1348   gst_base_src_set_flushing (src, FALSE, playing, unlock, NULL);
1349
1350   /* If we configured the seeksegment above, don't overwrite it now. Otherwise
1351    * copy the current segment info into the temp segment that we can actually
1352    * attempt the seek with. We only update the real segment if the seek suceeds. */
1353   if (!seekseg_configured) {
1354     memcpy (&seeksegment, &src->segment, sizeof (GstSegment));
1355
1356     /* now configure the final seek segment */
1357     if (event) {
1358       if (seeksegment.format != seek_format) {
1359         /* OK, here's where we give the subclass a chance to convert the relative
1360          * seek into an absolute one in the processing format. We set up any
1361          * absolute seek above, before taking the stream lock. */
1362         if (!gst_base_src_prepare_seek_segment (src, event, &seeksegment)) {
1363           GST_DEBUG_OBJECT (src, "Preparing the seek failed after flushing. "
1364               "Aborting seek");
1365           res = FALSE;
1366         }
1367       } else {
1368         /* The seek format matches our processing format, no need to ask the
1369          * the subclass to configure the segment. */
1370         gst_segment_do_seek (&seeksegment, rate, seek_format, flags,
1371             cur_type, cur, stop_type, stop, &update);
1372       }
1373     }
1374     /* Else, no seek event passed, so we're just (re)starting the
1375        current segment. */
1376   }
1377
1378   if (res) {
1379     GST_DEBUG_OBJECT (src, "segment configured from %" G_GINT64_FORMAT
1380         " to %" G_GINT64_FORMAT ", position %" G_GINT64_FORMAT,
1381         seeksegment.start, seeksegment.stop, seeksegment.position);
1382
1383     /* do the seek, segment.position contains the new position. */
1384     res = gst_base_src_do_seek (src, &seeksegment);
1385   }
1386
1387   /* and prepare to continue streaming */
1388   if (flush) {
1389     tevent = gst_event_new_flush_stop ();
1390     gst_event_set_seqnum (tevent, seqnum);
1391     /* send flush stop, peer will accept data and events again. We
1392      * are not yet providing data as we still have the STREAM_LOCK. */
1393     gst_pad_push_event (src->srcpad, tevent);
1394   }
1395
1396   /* The subclass must have converted the segment to the processing format
1397    * by now */
1398   if (res && seeksegment.format != dest_format) {
1399     GST_DEBUG_OBJECT (src, "Subclass failed to prepare a seek segment "
1400         "in the correct format. Aborting seek.");
1401     res = FALSE;
1402   }
1403
1404   /* if the seek was successful, we update our real segment and push
1405    * out the new segment. */
1406   if (res) {
1407     GST_OBJECT_LOCK (src);
1408     memcpy (&src->segment, &seeksegment, sizeof (GstSegment));
1409     GST_OBJECT_UNLOCK (src);
1410
1411     if (seeksegment.flags & GST_SEEK_FLAG_SEGMENT) {
1412       GstMessage *message;
1413
1414       message = gst_message_new_segment_start (GST_OBJECT (src),
1415           seeksegment.format, seeksegment.position);
1416       gst_message_set_seqnum (message, seqnum);
1417
1418       gst_element_post_message (GST_ELEMENT (src), message);
1419     }
1420
1421     /* for deriving a stop position for the playback segment from the seek
1422      * segment, we must take the duration when the stop is not set */
1423     if ((stop = seeksegment.stop) == -1)
1424       stop = seeksegment.duration;
1425
1426     src->priv->segment_pending = TRUE;
1427   }
1428
1429   src->priv->discont = TRUE;
1430   src->running = TRUE;
1431   /* and restart the task in case it got paused explicitly or by
1432    * the FLUSH_START event we pushed out. */
1433   tres = gst_pad_start_task (src->srcpad, (GstTaskFunction) gst_base_src_loop,
1434       src->srcpad);
1435   if (res && !tres)
1436     res = FALSE;
1437
1438   /* and release the lock again so we can continue streaming */
1439   GST_PAD_STREAM_UNLOCK (src->srcpad);
1440
1441   return res;
1442
1443   /* ERROR */
1444 prepare_failed:
1445   GST_DEBUG_OBJECT (src, "Preparing the seek failed before flushing. "
1446       "Aborting seek");
1447   return FALSE;
1448 }
1449
1450 static const GstQueryType *
1451 gst_base_src_get_query_types (GstElement * element)
1452 {
1453   static const GstQueryType query_types[] = {
1454     GST_QUERY_DURATION,
1455     GST_QUERY_POSITION,
1456     GST_QUERY_SEEKING,
1457     GST_QUERY_SEGMENT,
1458     GST_QUERY_FORMATS,
1459     GST_QUERY_LATENCY,
1460     GST_QUERY_JITTER,
1461     GST_QUERY_RATE,
1462     GST_QUERY_CONVERT,
1463     0
1464   };
1465
1466   return query_types;
1467 }
1468
1469 /* all events send to this element directly. This is mainly done from the
1470  * application.
1471  */
1472 static gboolean
1473 gst_base_src_send_event (GstElement * element, GstEvent * event)
1474 {
1475   GstBaseSrc *src;
1476   gboolean result = FALSE;
1477
1478   src = GST_BASE_SRC (element);
1479
1480   GST_DEBUG_OBJECT (src, "handling event %p %" GST_PTR_FORMAT, event, event);
1481
1482   switch (GST_EVENT_TYPE (event)) {
1483       /* bidirectional events */
1484     case GST_EVENT_FLUSH_START:
1485     case GST_EVENT_FLUSH_STOP:
1486       /* sending random flushes downstream can break stuff,
1487        * especially sync since all segment info will get flushed */
1488       break;
1489
1490       /* downstream serialized events */
1491     case GST_EVENT_EOS:
1492     {
1493       GstBaseSrcClass *bclass;
1494
1495       bclass = GST_BASE_SRC_GET_CLASS (src);
1496
1497       /* queue EOS and make sure the task or pull function performs the EOS
1498        * actions.
1499        *
1500        * We have two possibilities:
1501        *
1502        *  - Before we are to enter the _create function, we check the pending_eos
1503        *    first and do EOS instead of entering it.
1504        *  - If we are in the _create function or we did not manage to set the
1505        *    flag fast enough and we are about to enter the _create function,
1506        *    we unlock it so that we exit with WRONG_STATE immediatly. We then
1507        *    check the EOS flag and do the EOS logic.
1508        */
1509       g_atomic_int_set (&src->priv->pending_eos, TRUE);
1510       GST_DEBUG_OBJECT (src, "EOS marked, calling unlock");
1511
1512       /* unlock the _create function so that we can check the pending_eos flag
1513        * and we can do EOS. This will eventually release the LIVE_LOCK again so
1514        * that we can grab it and stop the unlock again. We don't take the stream
1515        * lock so that this operation is guaranteed to never block. */
1516       if (bclass->unlock)
1517         bclass->unlock (src);
1518
1519       GST_DEBUG_OBJECT (src, "unlock called, waiting for LIVE_LOCK");
1520
1521       GST_LIVE_LOCK (src);
1522       GST_DEBUG_OBJECT (src, "LIVE_LOCK acquired, calling unlock_stop");
1523       /* now stop the unlock of the streaming thread again. Grabbing the live
1524        * lock is enough because that protects the create function. */
1525       if (bclass->unlock_stop)
1526         bclass->unlock_stop (src);
1527       GST_LIVE_UNLOCK (src);
1528
1529       result = TRUE;
1530       break;
1531     }
1532     case GST_EVENT_SEGMENT:
1533       /* sending random SEGMENT downstream can break sync. */
1534       break;
1535     case GST_EVENT_TAG:
1536     case GST_EVENT_CUSTOM_DOWNSTREAM:
1537     case GST_EVENT_CUSTOM_BOTH:
1538       /* Insert TAG, CUSTOM_DOWNSTREAM, CUSTOM_BOTH in the dataflow */
1539       GST_OBJECT_LOCK (src);
1540       src->priv->pending_events =
1541           g_list_append (src->priv->pending_events, event);
1542       g_atomic_int_set (&src->priv->have_events, TRUE);
1543       GST_OBJECT_UNLOCK (src);
1544       event = NULL;
1545       result = TRUE;
1546       break;
1547     case GST_EVENT_BUFFERSIZE:
1548       /* does not seem to make much sense currently */
1549       break;
1550
1551       /* upstream events */
1552     case GST_EVENT_QOS:
1553       /* elements should override send_event and do something */
1554       break;
1555     case GST_EVENT_SEEK:
1556     {
1557       gboolean started;
1558
1559       GST_OBJECT_LOCK (src->srcpad);
1560       if (GST_PAD_ACTIVATE_MODE (src->srcpad) == GST_ACTIVATE_PULL)
1561         goto wrong_mode;
1562       started = GST_PAD_ACTIVATE_MODE (src->srcpad) == GST_ACTIVATE_PUSH;
1563       GST_OBJECT_UNLOCK (src->srcpad);
1564
1565       if (started) {
1566         GST_DEBUG_OBJECT (src, "performing seek");
1567         /* when we are running in push mode, we can execute the
1568          * seek right now, we need to unlock. */
1569         result = gst_base_src_perform_seek (src, event, TRUE);
1570       } else {
1571         GstEvent **event_p;
1572
1573         /* else we store the event and execute the seek when we
1574          * get activated */
1575         GST_OBJECT_LOCK (src);
1576         GST_DEBUG_OBJECT (src, "queueing seek");
1577         event_p = &src->pending_seek;
1578         gst_event_replace ((GstEvent **) event_p, event);
1579         GST_OBJECT_UNLOCK (src);
1580         /* assume the seek will work */
1581         result = TRUE;
1582       }
1583       break;
1584     }
1585     case GST_EVENT_NAVIGATION:
1586       /* could make sense for elements that do something with navigation events
1587        * but then they would need to override the send_event function */
1588       break;
1589     case GST_EVENT_LATENCY:
1590       /* does not seem to make sense currently */
1591       break;
1592
1593       /* custom events */
1594     case GST_EVENT_CUSTOM_UPSTREAM:
1595       /* override send_event if you want this */
1596       break;
1597     case GST_EVENT_CUSTOM_DOWNSTREAM_OOB:
1598     case GST_EVENT_CUSTOM_BOTH_OOB:
1599       /* insert a random custom event into the pipeline */
1600       GST_DEBUG_OBJECT (src, "pushing custom OOB event downstream");
1601       result = gst_pad_push_event (src->srcpad, event);
1602       /* we gave away the ref to the event in the push */
1603       event = NULL;
1604       break;
1605     default:
1606       break;
1607   }
1608 done:
1609   /* if we still have a ref to the event, unref it now */
1610   if (event)
1611     gst_event_unref (event);
1612
1613   return result;
1614
1615   /* ERRORS */
1616 wrong_mode:
1617   {
1618     GST_DEBUG_OBJECT (src, "cannot perform seek when operating in pull mode");
1619     GST_OBJECT_UNLOCK (src->srcpad);
1620     result = FALSE;
1621     goto done;
1622   }
1623 }
1624
1625 static gboolean
1626 gst_base_src_seekable (GstBaseSrc * src)
1627 {
1628   GstBaseSrcClass *bclass;
1629   bclass = GST_BASE_SRC_GET_CLASS (src);
1630   if (bclass->is_seekable)
1631     return bclass->is_seekable (src);
1632   else
1633     return FALSE;
1634 }
1635
1636 static void
1637 gst_base_src_update_qos (GstBaseSrc * src,
1638     gdouble proportion, GstClockTimeDiff diff, GstClockTime timestamp)
1639 {
1640   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, src,
1641       "qos: proportion: %lf, diff %" G_GINT64_FORMAT ", timestamp %"
1642       GST_TIME_FORMAT, proportion, diff, GST_TIME_ARGS (timestamp));
1643
1644   GST_OBJECT_LOCK (src);
1645   src->priv->proportion = proportion;
1646   src->priv->earliest_time = timestamp + diff;
1647   GST_OBJECT_UNLOCK (src);
1648 }
1649
1650
1651 static gboolean
1652 gst_base_src_default_event (GstBaseSrc * src, GstEvent * event)
1653 {
1654   gboolean result;
1655
1656   GST_DEBUG_OBJECT (src, "handle event %" GST_PTR_FORMAT, event);
1657
1658   switch (GST_EVENT_TYPE (event)) {
1659     case GST_EVENT_SEEK:
1660       /* is normally called when in push mode */
1661       if (!gst_base_src_seekable (src))
1662         goto not_seekable;
1663
1664       result = gst_base_src_perform_seek (src, event, TRUE);
1665       break;
1666     case GST_EVENT_FLUSH_START:
1667       /* cancel any blocking getrange, is normally called
1668        * when in pull mode. */
1669       result = gst_base_src_set_flushing (src, TRUE, FALSE, TRUE, NULL);
1670       break;
1671     case GST_EVENT_FLUSH_STOP:
1672       result = gst_base_src_set_flushing (src, FALSE, TRUE, TRUE, NULL);
1673       break;
1674     case GST_EVENT_QOS:
1675     {
1676       gdouble proportion;
1677       GstClockTimeDiff diff;
1678       GstClockTime timestamp;
1679
1680       gst_event_parse_qos (event, NULL, &proportion, &diff, &timestamp);
1681       gst_base_src_update_qos (src, proportion, diff, timestamp);
1682       result = TRUE;
1683       break;
1684     }
1685     default:
1686       result = FALSE;
1687       break;
1688   }
1689   return result;
1690
1691   /* ERRORS */
1692 not_seekable:
1693   {
1694     GST_DEBUG_OBJECT (src, "is not seekable");
1695     return FALSE;
1696   }
1697 }
1698
1699 static gboolean
1700 gst_base_src_event_handler (GstPad * pad, GstEvent * event)
1701 {
1702   GstBaseSrc *src;
1703   GstBaseSrcClass *bclass;
1704   gboolean result = FALSE;
1705
1706   src = GST_BASE_SRC (gst_pad_get_parent (pad));
1707   if (G_UNLIKELY (src == NULL)) {
1708     gst_event_unref (event);
1709     return FALSE;
1710   }
1711
1712   bclass = GST_BASE_SRC_GET_CLASS (src);
1713
1714   if (bclass->event) {
1715     if (!(result = bclass->event (src, event)))
1716       goto subclass_failed;
1717   }
1718
1719 done:
1720   gst_event_unref (event);
1721   gst_object_unref (src);
1722
1723   return result;
1724
1725   /* ERRORS */
1726 subclass_failed:
1727   {
1728     GST_DEBUG_OBJECT (src, "subclass refused event");
1729     goto done;
1730   }
1731 }
1732
1733 static void
1734 gst_base_src_set_property (GObject * object, guint prop_id,
1735     const GValue * value, GParamSpec * pspec)
1736 {
1737   GstBaseSrc *src;
1738
1739   src = GST_BASE_SRC (object);
1740
1741   switch (prop_id) {
1742     case PROP_BLOCKSIZE:
1743       gst_base_src_set_blocksize (src, g_value_get_ulong (value));
1744       break;
1745     case PROP_NUM_BUFFERS:
1746       src->num_buffers = g_value_get_int (value);
1747       break;
1748     case PROP_TYPEFIND:
1749       src->typefind = g_value_get_boolean (value);
1750       break;
1751     case PROP_DO_TIMESTAMP:
1752       gst_base_src_set_do_timestamp (src, g_value_get_boolean (value));
1753       break;
1754     default:
1755       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1756       break;
1757   }
1758 }
1759
1760 static void
1761 gst_base_src_get_property (GObject * object, guint prop_id, GValue * value,
1762     GParamSpec * pspec)
1763 {
1764   GstBaseSrc *src;
1765
1766   src = GST_BASE_SRC (object);
1767
1768   switch (prop_id) {
1769     case PROP_BLOCKSIZE:
1770       g_value_set_ulong (value, gst_base_src_get_blocksize (src));
1771       break;
1772     case PROP_NUM_BUFFERS:
1773       g_value_set_int (value, src->num_buffers);
1774       break;
1775     case PROP_TYPEFIND:
1776       g_value_set_boolean (value, src->typefind);
1777       break;
1778     case PROP_DO_TIMESTAMP:
1779       g_value_set_boolean (value, gst_base_src_get_do_timestamp (src));
1780       break;
1781     default:
1782       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1783       break;
1784   }
1785 }
1786
1787 /* with STREAM_LOCK and LOCK */
1788 static GstClockReturn
1789 gst_base_src_wait (GstBaseSrc * basesrc, GstClock * clock, GstClockTime time)
1790 {
1791   GstClockReturn ret;
1792   GstClockID id;
1793
1794   id = gst_clock_new_single_shot_id (clock, time);
1795
1796   basesrc->clock_id = id;
1797   /* release the live lock while waiting */
1798   GST_LIVE_UNLOCK (basesrc);
1799
1800   ret = gst_clock_id_wait (id, NULL);
1801
1802   GST_LIVE_LOCK (basesrc);
1803   gst_clock_id_unref (id);
1804   basesrc->clock_id = NULL;
1805
1806   return ret;
1807 }
1808
1809 /* perform synchronisation on a buffer.
1810  * with STREAM_LOCK.
1811  */
1812 static GstClockReturn
1813 gst_base_src_do_sync (GstBaseSrc * basesrc, GstBuffer * buffer)
1814 {
1815   GstClockReturn result;
1816   GstClockTime start, end;
1817   GstBaseSrcClass *bclass;
1818   GstClockTime base_time;
1819   GstClock *clock;
1820   GstClockTime now = GST_CLOCK_TIME_NONE, timestamp;
1821   gboolean do_timestamp, first, pseudo_live;
1822
1823   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
1824
1825   start = end = -1;
1826   if (bclass->get_times)
1827     bclass->get_times (basesrc, buffer, &start, &end);
1828
1829   /* get buffer timestamp */
1830   timestamp = GST_BUFFER_TIMESTAMP (buffer);
1831
1832   /* grab the lock to prepare for clocking and calculate the startup
1833    * latency. */
1834   GST_OBJECT_LOCK (basesrc);
1835
1836   /* if we are asked to sync against the clock we are a pseudo live element */
1837   pseudo_live = (start != -1 && basesrc->is_live);
1838   /* check for the first buffer */
1839   first = (basesrc->priv->latency == -1);
1840
1841   if (timestamp != -1 && pseudo_live) {
1842     GstClockTime latency;
1843
1844     /* we have a timestamp and a sync time, latency is the diff */
1845     if (timestamp <= start)
1846       latency = start - timestamp;
1847     else
1848       latency = 0;
1849
1850     if (first) {
1851       GST_DEBUG_OBJECT (basesrc, "pseudo_live with latency %" GST_TIME_FORMAT,
1852           GST_TIME_ARGS (latency));
1853       /* first time we calculate latency, just configure */
1854       basesrc->priv->latency = latency;
1855     } else {
1856       if (basesrc->priv->latency != latency) {
1857         /* we have a new latency, FIXME post latency message */
1858         basesrc->priv->latency = latency;
1859         GST_DEBUG_OBJECT (basesrc, "latency changed to %" GST_TIME_FORMAT,
1860             GST_TIME_ARGS (latency));
1861       }
1862     }
1863   } else if (first) {
1864     GST_DEBUG_OBJECT (basesrc, "no latency needed, live %d, sync %d",
1865         basesrc->is_live, start != -1);
1866     basesrc->priv->latency = 0;
1867   }
1868
1869   /* get clock, if no clock, we can't sync or do timestamps */
1870   if ((clock = GST_ELEMENT_CLOCK (basesrc)) == NULL)
1871     goto no_clock;
1872
1873   base_time = GST_ELEMENT_CAST (basesrc)->base_time;
1874
1875   do_timestamp = basesrc->priv->do_timestamp;
1876
1877   /* first buffer, calculate the timestamp offset */
1878   if (first) {
1879     GstClockTime running_time;
1880
1881     now = gst_clock_get_time (clock);
1882     running_time = now - base_time;
1883
1884     GST_LOG_OBJECT (basesrc,
1885         "startup timestamp: %" GST_TIME_FORMAT ", running_time %"
1886         GST_TIME_FORMAT, GST_TIME_ARGS (timestamp),
1887         GST_TIME_ARGS (running_time));
1888
1889     if (pseudo_live && timestamp != -1) {
1890       /* live source and we need to sync, add startup latency to all timestamps
1891        * to get the real running_time. Live sources should always timestamp
1892        * according to the current running time. */
1893       basesrc->priv->ts_offset = GST_CLOCK_DIFF (timestamp, running_time);
1894
1895       GST_LOG_OBJECT (basesrc, "live with sync, ts_offset %" GST_TIME_FORMAT,
1896           GST_TIME_ARGS (basesrc->priv->ts_offset));
1897     } else {
1898       basesrc->priv->ts_offset = 0;
1899       GST_LOG_OBJECT (basesrc, "no timestamp offset needed");
1900     }
1901
1902     if (!GST_CLOCK_TIME_IS_VALID (timestamp)) {
1903       if (do_timestamp)
1904         timestamp = running_time;
1905       else
1906         timestamp = 0;
1907
1908       GST_BUFFER_TIMESTAMP (buffer) = timestamp;
1909
1910       GST_LOG_OBJECT (basesrc, "created timestamp: %" GST_TIME_FORMAT,
1911           GST_TIME_ARGS (timestamp));
1912     }
1913
1914     /* add the timestamp offset we need for sync */
1915     timestamp += basesrc->priv->ts_offset;
1916   } else {
1917     /* not the first buffer, the timestamp is the diff between the clock and
1918      * base_time */
1919     if (do_timestamp && !GST_CLOCK_TIME_IS_VALID (timestamp)) {
1920       now = gst_clock_get_time (clock);
1921
1922       GST_BUFFER_TIMESTAMP (buffer) = now - base_time;
1923
1924       GST_LOG_OBJECT (basesrc, "created timestamp: %" GST_TIME_FORMAT,
1925           GST_TIME_ARGS (now - base_time));
1926     }
1927   }
1928
1929   /* if we don't have a buffer timestamp, we don't sync */
1930   if (!GST_CLOCK_TIME_IS_VALID (start))
1931     goto no_sync;
1932
1933   if (basesrc->is_live && GST_CLOCK_TIME_IS_VALID (timestamp)) {
1934     /* for pseudo live sources, add our ts_offset to the timestamp */
1935     GST_BUFFER_TIMESTAMP (buffer) += basesrc->priv->ts_offset;
1936     start += basesrc->priv->ts_offset;
1937   }
1938
1939   GST_LOG_OBJECT (basesrc,
1940       "waiting for clock, base time %" GST_TIME_FORMAT
1941       ", stream_start %" GST_TIME_FORMAT,
1942       GST_TIME_ARGS (base_time), GST_TIME_ARGS (start));
1943   GST_OBJECT_UNLOCK (basesrc);
1944
1945   result = gst_base_src_wait (basesrc, clock, start + base_time);
1946
1947   GST_LOG_OBJECT (basesrc, "clock entry done: %d", result);
1948
1949   return result;
1950
1951   /* special cases */
1952 no_clock:
1953   {
1954     GST_DEBUG_OBJECT (basesrc, "we have no clock");
1955     GST_OBJECT_UNLOCK (basesrc);
1956     return GST_CLOCK_OK;
1957   }
1958 no_sync:
1959   {
1960     GST_DEBUG_OBJECT (basesrc, "no sync needed");
1961     GST_OBJECT_UNLOCK (basesrc);
1962     return GST_CLOCK_OK;
1963   }
1964 }
1965
1966 /* Called with STREAM_LOCK and LIVE_LOCK */
1967 static gboolean
1968 gst_base_src_update_length (GstBaseSrc * src, guint64 offset, guint * length)
1969 {
1970   guint64 size, maxsize;
1971   GstBaseSrcClass *bclass;
1972   GstFormat format;
1973   gint64 stop;
1974
1975   bclass = GST_BASE_SRC_GET_CLASS (src);
1976
1977   format = src->segment.format;
1978   stop = src->segment.stop;
1979   /* get total file size */
1980   size = (guint64) src->segment.duration;
1981
1982   /* only operate if we are working with bytes */
1983   if (format != GST_FORMAT_BYTES)
1984     return TRUE;
1985
1986   /* the max amount of bytes to read is the total size or
1987    * up to the segment.stop if present. */
1988   if (stop != -1)
1989     maxsize = MIN (size, stop);
1990   else
1991     maxsize = size;
1992
1993   GST_DEBUG_OBJECT (src,
1994       "reading offset %" G_GUINT64_FORMAT ", length %u, size %" G_GINT64_FORMAT
1995       ", segment.stop %" G_GINT64_FORMAT ", maxsize %" G_GINT64_FORMAT, offset,
1996       *length, size, stop, maxsize);
1997
1998   /* check size if we have one */
1999   if (maxsize != -1) {
2000     /* if we run past the end, check if the file became bigger and
2001      * retry. */
2002     if (G_UNLIKELY (offset + *length >= maxsize)) {
2003       /* see if length of the file changed */
2004       if (bclass->get_size)
2005         if (!bclass->get_size (src, &size))
2006           size = -1;
2007
2008       /* make sure we don't exceed the configured segment stop
2009        * if it was set */
2010       if (stop != -1)
2011         maxsize = MIN (size, stop);
2012       else
2013         maxsize = size;
2014
2015       /* if we are at or past the end, EOS */
2016       if (G_UNLIKELY (offset >= maxsize))
2017         goto unexpected_length;
2018
2019       /* else we can clip to the end */
2020       if (G_UNLIKELY (offset + *length >= maxsize))
2021         *length = maxsize - offset;
2022
2023     }
2024   }
2025
2026   /* keep track of current position and update duration.
2027    * segment is in bytes, we checked that above. */
2028   GST_OBJECT_LOCK (src);
2029   src->segment.duration = size;
2030   src->segment.position = offset;
2031   GST_OBJECT_UNLOCK (src);
2032
2033   return TRUE;
2034
2035   /* ERRORS */
2036 unexpected_length:
2037   {
2038     return FALSE;
2039   }
2040 }
2041
2042 /* must be called with LIVE_LOCK */
2043 static GstFlowReturn
2044 gst_base_src_get_range (GstBaseSrc * src, guint64 offset, guint length,
2045     GstBuffer ** buf)
2046 {
2047   GstFlowReturn ret;
2048   GstBaseSrcClass *bclass;
2049   GstClockReturn status;
2050
2051   bclass = GST_BASE_SRC_GET_CLASS (src);
2052
2053 again:
2054   if (src->is_live) {
2055     if (G_UNLIKELY (!src->live_running)) {
2056       ret = gst_base_src_wait_playing (src);
2057       if (ret != GST_FLOW_OK)
2058         goto stopped;
2059     }
2060   }
2061
2062   if (G_UNLIKELY (!GST_OBJECT_FLAG_IS_SET (src, GST_BASE_SRC_STARTED)))
2063     goto not_started;
2064
2065   if (G_UNLIKELY (!bclass->create))
2066     goto no_function;
2067
2068   if (G_UNLIKELY (!gst_base_src_update_length (src, offset, &length)))
2069     goto unexpected_length;
2070
2071   /* normally we don't count buffers */
2072   if (G_UNLIKELY (src->num_buffers_left >= 0)) {
2073     if (src->num_buffers_left == 0)
2074       goto reached_num_buffers;
2075     else
2076       src->num_buffers_left--;
2077   }
2078
2079   /* don't enter the create function if a pending EOS event was set. For the
2080    * logic of the pending_eos, check the event function of this class. */
2081   if (G_UNLIKELY (g_atomic_int_get (&src->priv->pending_eos)))
2082     goto eos;
2083
2084   GST_DEBUG_OBJECT (src,
2085       "calling create offset %" G_GUINT64_FORMAT " length %u, time %"
2086       G_GINT64_FORMAT, offset, length, src->segment.time);
2087
2088   ret = bclass->create (src, offset, length, buf);
2089
2090   /* The create function could be unlocked because we have a pending EOS. It's
2091    * possible that we have a valid buffer from create that we need to
2092    * discard when the create function returned _OK. */
2093   if (G_UNLIKELY (g_atomic_int_get (&src->priv->pending_eos))) {
2094     if (ret == GST_FLOW_OK) {
2095       gst_buffer_unref (*buf);
2096       *buf = NULL;
2097     }
2098     goto eos;
2099   }
2100
2101   if (G_UNLIKELY (ret != GST_FLOW_OK))
2102     goto not_ok;
2103
2104   /* no timestamp set and we are at offset 0, we can timestamp with 0 */
2105   if (offset == 0 && src->segment.time == 0
2106       && GST_BUFFER_TIMESTAMP (*buf) == -1) {
2107     *buf = gst_buffer_make_writable (*buf);
2108     GST_BUFFER_TIMESTAMP (*buf) = 0;
2109   }
2110
2111   /* now sync before pushing the buffer */
2112   status = gst_base_src_do_sync (src, *buf);
2113
2114   /* waiting for the clock could have made us flushing */
2115   if (G_UNLIKELY (src->priv->flushing))
2116     goto flushing;
2117
2118   switch (status) {
2119     case GST_CLOCK_EARLY:
2120       /* the buffer is too late. We currently don't drop the buffer. */
2121       GST_DEBUG_OBJECT (src, "buffer too late!, returning anyway");
2122       break;
2123     case GST_CLOCK_OK:
2124       /* buffer synchronised properly */
2125       GST_DEBUG_OBJECT (src, "buffer ok");
2126       break;
2127     case GST_CLOCK_UNSCHEDULED:
2128       /* this case is triggered when we were waiting for the clock and
2129        * it got unlocked because we did a state change. In any case, get rid of
2130        * the buffer. */
2131       gst_buffer_unref (*buf);
2132       *buf = NULL;
2133       if (!src->live_running) {
2134         /* We return WRONG_STATE when we are not running to stop the dataflow also
2135          * get rid of the produced buffer. */
2136         GST_DEBUG_OBJECT (src,
2137             "clock was unscheduled (%d), returning WRONG_STATE", status);
2138         ret = GST_FLOW_WRONG_STATE;
2139       } else {
2140         /* If we are running when this happens, we quickly switched between
2141          * pause and playing. We try to produce a new buffer */
2142         GST_DEBUG_OBJECT (src,
2143             "clock was unscheduled (%d), but we are running", status);
2144         goto again;
2145       }
2146       break;
2147     default:
2148       /* all other result values are unexpected and errors */
2149       GST_ELEMENT_ERROR (src, CORE, CLOCK,
2150           (_("Internal clock error.")),
2151           ("clock returned unexpected return value %d", status));
2152       gst_buffer_unref (*buf);
2153       *buf = NULL;
2154       ret = GST_FLOW_ERROR;
2155       break;
2156   }
2157   return ret;
2158
2159   /* ERROR */
2160 stopped:
2161   {
2162     GST_DEBUG_OBJECT (src, "wait_playing returned %d (%s)", ret,
2163         gst_flow_get_name (ret));
2164     return ret;
2165   }
2166 not_ok:
2167   {
2168     GST_DEBUG_OBJECT (src, "create returned %d (%s)", ret,
2169         gst_flow_get_name (ret));
2170     return ret;
2171   }
2172 not_started:
2173   {
2174     GST_DEBUG_OBJECT (src, "getrange but not started");
2175     return GST_FLOW_WRONG_STATE;
2176   }
2177 no_function:
2178   {
2179     GST_DEBUG_OBJECT (src, "no create function");
2180     return GST_FLOW_ERROR;
2181   }
2182 unexpected_length:
2183   {
2184     GST_DEBUG_OBJECT (src, "unexpected length %u (offset=%" G_GUINT64_FORMAT
2185         ", size=%" G_GINT64_FORMAT ")", length, offset, src->segment.duration);
2186     return GST_FLOW_UNEXPECTED;
2187   }
2188 reached_num_buffers:
2189   {
2190     GST_DEBUG_OBJECT (src, "sent all buffers");
2191     return GST_FLOW_UNEXPECTED;
2192   }
2193 flushing:
2194   {
2195     GST_DEBUG_OBJECT (src, "we are flushing");
2196     gst_buffer_unref (*buf);
2197     *buf = NULL;
2198     return GST_FLOW_WRONG_STATE;
2199   }
2200 eos:
2201   {
2202     GST_DEBUG_OBJECT (src, "we are EOS");
2203     return GST_FLOW_UNEXPECTED;
2204   }
2205 }
2206
2207 static GstFlowReturn
2208 gst_base_src_pad_get_range (GstPad * pad, guint64 offset, guint length,
2209     GstBuffer ** buf)
2210 {
2211   GstBaseSrc *src;
2212   GstFlowReturn res;
2213
2214   src = GST_BASE_SRC_CAST (gst_object_ref (GST_OBJECT_PARENT (pad)));
2215
2216   GST_LIVE_LOCK (src);
2217   if (G_UNLIKELY (src->priv->flushing))
2218     goto flushing;
2219
2220   res = gst_base_src_get_range (src, offset, length, buf);
2221
2222 done:
2223   GST_LIVE_UNLOCK (src);
2224
2225   gst_object_unref (src);
2226
2227   return res;
2228
2229   /* ERRORS */
2230 flushing:
2231   {
2232     GST_DEBUG_OBJECT (src, "we are flushing");
2233     res = GST_FLOW_WRONG_STATE;
2234     goto done;
2235   }
2236 }
2237
2238 static gboolean
2239 gst_base_src_default_check_get_range (GstBaseSrc * src)
2240 {
2241   gboolean res;
2242
2243   if (!GST_OBJECT_FLAG_IS_SET (src, GST_BASE_SRC_STARTED)) {
2244     GST_LOG_OBJECT (src, "doing start/stop to check get_range support");
2245     if (G_LIKELY (gst_base_src_start (src)))
2246       gst_base_src_stop (src);
2247   }
2248
2249   /* we can operate in getrange mode if the native format is bytes
2250    * and we are seekable, this condition is set in the random_access
2251    * flag and is set in the _start() method. */
2252   res = src->random_access;
2253
2254   return res;
2255 }
2256
2257 static gboolean
2258 gst_base_src_check_get_range (GstBaseSrc * src)
2259 {
2260   GstBaseSrcClass *bclass;
2261   gboolean res;
2262
2263   bclass = GST_BASE_SRC_GET_CLASS (src);
2264
2265   if (bclass->check_get_range == NULL)
2266     goto no_function;
2267
2268   res = bclass->check_get_range (src);
2269   GST_LOG_OBJECT (src, "%s() returned %d",
2270       GST_DEBUG_FUNCPTR_NAME (bclass->check_get_range), (gint) res);
2271
2272   return res;
2273
2274   /* ERRORS */
2275 no_function:
2276   {
2277     GST_WARNING_OBJECT (src, "no check_get_range function set");
2278     return FALSE;
2279   }
2280 }
2281
2282 static gboolean
2283 gst_base_src_pad_check_get_range (GstPad * pad)
2284 {
2285   GstBaseSrc *src;
2286   gboolean res;
2287
2288   src = GST_BASE_SRC (GST_OBJECT_PARENT (pad));
2289
2290   res = gst_base_src_check_get_range (src);
2291
2292   return res;
2293 }
2294
2295 static void
2296 gst_base_src_loop (GstPad * pad)
2297 {
2298   GstBaseSrc *src;
2299   GstBuffer *buf = NULL;
2300   GstFlowReturn ret;
2301   gint64 position;
2302   gboolean eos;
2303   gulong blocksize;
2304   GList *pending_events = NULL, *tmp;
2305   gboolean reconfigure;
2306
2307   eos = FALSE;
2308
2309   src = GST_BASE_SRC (GST_OBJECT_PARENT (pad));
2310
2311   GST_OBJECT_LOCK (pad);
2312   reconfigure = GST_PAD_NEEDS_RECONFIGURE (pad);
2313   GST_OBJECT_FLAG_UNSET (pad, GST_PAD_NEED_RECONFIGURE);
2314   GST_OBJECT_UNLOCK (pad);
2315   /* check if we need to renegotiate */
2316   if (reconfigure) {
2317     if (!gst_base_src_negotiate (src))
2318       GST_DEBUG_OBJECT (src, "Failed to renegotiate");
2319   }
2320
2321   GST_LIVE_LOCK (src);
2322
2323   if (G_UNLIKELY (src->priv->flushing))
2324     goto flushing;
2325
2326   src->priv->last_sent_eos = FALSE;
2327
2328   blocksize = src->blocksize;
2329
2330   /* if we operate in bytes, we can calculate an offset */
2331   if (src->segment.format == GST_FORMAT_BYTES) {
2332     position = src->segment.position;
2333     /* for negative rates, start with subtracting the blocksize */
2334     if (src->segment.rate < 0.0) {
2335       /* we cannot go below segment.start */
2336       if (position > src->segment.start + blocksize)
2337         position -= blocksize;
2338       else {
2339         /* last block, remainder up to segment.start */
2340         blocksize = position - src->segment.start;
2341         position = src->segment.start;
2342       }
2343     }
2344   } else
2345     position = -1;
2346
2347   GST_LOG_OBJECT (src, "next_ts %" GST_TIME_FORMAT " size %lu",
2348       GST_TIME_ARGS (position), blocksize);
2349
2350   ret = gst_base_src_get_range (src, position, blocksize, &buf);
2351   if (G_UNLIKELY (ret != GST_FLOW_OK)) {
2352     GST_INFO_OBJECT (src, "pausing after gst_base_src_get_range() = %s",
2353         gst_flow_get_name (ret));
2354     GST_LIVE_UNLOCK (src);
2355     goto pause;
2356   }
2357   /* this should not happen */
2358   if (G_UNLIKELY (buf == NULL))
2359     goto null_buffer;
2360
2361   /* push events to close/start our segment before we push the buffer. */
2362   if (G_UNLIKELY (src->priv->segment_pending)) {
2363     gst_pad_push_event (pad, gst_event_new_segment (&src->segment));
2364     src->priv->segment_pending = FALSE;
2365   }
2366
2367   if (g_atomic_int_get (&src->priv->have_events)) {
2368     GST_OBJECT_LOCK (src);
2369     /* take the events */
2370     pending_events = src->priv->pending_events;
2371     src->priv->pending_events = NULL;
2372     g_atomic_int_set (&src->priv->have_events, FALSE);
2373     GST_OBJECT_UNLOCK (src);
2374   }
2375
2376   /* Push out pending events if any */
2377   if (G_UNLIKELY (pending_events != NULL)) {
2378     for (tmp = pending_events; tmp; tmp = g_list_next (tmp)) {
2379       GstEvent *ev = (GstEvent *) tmp->data;
2380       gst_pad_push_event (pad, ev);
2381     }
2382     g_list_free (pending_events);
2383   }
2384
2385   /* figure out the new position */
2386   switch (src->segment.format) {
2387     case GST_FORMAT_BYTES:
2388     {
2389       guint bufsize = gst_buffer_get_size (buf);
2390
2391       /* we subtracted above for negative rates */
2392       if (src->segment.rate >= 0.0)
2393         position += bufsize;
2394       break;
2395     }
2396     case GST_FORMAT_TIME:
2397     {
2398       GstClockTime start, duration;
2399
2400       start = GST_BUFFER_TIMESTAMP (buf);
2401       duration = GST_BUFFER_DURATION (buf);
2402
2403       if (GST_CLOCK_TIME_IS_VALID (start))
2404         position = start;
2405       else
2406         position = src->segment.position;
2407
2408       if (GST_CLOCK_TIME_IS_VALID (duration)) {
2409         if (src->segment.rate >= 0.0)
2410           position += duration;
2411         else if (position > duration)
2412           position -= duration;
2413         else
2414           position = 0;
2415       }
2416       break;
2417     }
2418     case GST_FORMAT_DEFAULT:
2419       if (src->segment.rate >= 0.0)
2420         position = GST_BUFFER_OFFSET_END (buf);
2421       else
2422         position = GST_BUFFER_OFFSET (buf);
2423       break;
2424     default:
2425       position = -1;
2426       break;
2427   }
2428   if (position != -1) {
2429     if (src->segment.rate >= 0.0) {
2430       /* positive rate, check if we reached the stop */
2431       if (src->segment.stop != -1) {
2432         if (position >= src->segment.stop) {
2433           eos = TRUE;
2434           position = src->segment.stop;
2435         }
2436       }
2437     } else {
2438       /* negative rate, check if we reached the start. start is always set to
2439        * something different from -1 */
2440       if (position <= src->segment.start) {
2441         eos = TRUE;
2442         position = src->segment.start;
2443       }
2444       /* when going reverse, all buffers are DISCONT */
2445       src->priv->discont = TRUE;
2446     }
2447     GST_OBJECT_LOCK (src);
2448     src->segment.position = position;
2449     GST_OBJECT_UNLOCK (src);
2450   }
2451
2452   if (G_UNLIKELY (src->priv->discont)) {
2453     buf = gst_buffer_make_writable (buf);
2454     GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_DISCONT);
2455     src->priv->discont = FALSE;
2456   }
2457   GST_LIVE_UNLOCK (src);
2458
2459   ret = gst_pad_push (pad, buf);
2460   if (G_UNLIKELY (ret != GST_FLOW_OK)) {
2461     GST_INFO_OBJECT (src, "pausing after gst_pad_push() = %s",
2462         gst_flow_get_name (ret));
2463     goto pause;
2464   }
2465
2466   if (G_UNLIKELY (eos)) {
2467     GST_INFO_OBJECT (src, "pausing after end of segment");
2468     ret = GST_FLOW_UNEXPECTED;
2469     goto pause;
2470   }
2471
2472 done:
2473   return;
2474
2475   /* special cases */
2476 flushing:
2477   {
2478     GST_DEBUG_OBJECT (src, "we are flushing");
2479     GST_LIVE_UNLOCK (src);
2480     ret = GST_FLOW_WRONG_STATE;
2481     goto pause;
2482   }
2483 pause:
2484   {
2485     const gchar *reason = gst_flow_get_name (ret);
2486     GstEvent *event;
2487
2488     GST_DEBUG_OBJECT (src, "pausing task, reason %s", reason);
2489     src->running = FALSE;
2490     gst_pad_pause_task (pad);
2491     if (ret == GST_FLOW_UNEXPECTED) {
2492       gboolean flag_segment;
2493       GstFormat format;
2494       gint64 position;
2495
2496       /* perform EOS logic */
2497       flag_segment = (src->segment.flags & GST_SEEK_FLAG_SEGMENT) != 0;
2498       format = src->segment.format;
2499       position = src->segment.position;
2500
2501       if (flag_segment) {
2502         GstMessage *message;
2503
2504         message = gst_message_new_segment_done (GST_OBJECT_CAST (src),
2505             format, position);
2506         gst_message_set_seqnum (message, src->priv->seqnum);
2507         gst_element_post_message (GST_ELEMENT_CAST (src), message);
2508       } else {
2509         event = gst_event_new_eos ();
2510         gst_event_set_seqnum (event, src->priv->seqnum);
2511         gst_pad_push_event (pad, event);
2512         src->priv->last_sent_eos = TRUE;
2513       }
2514     } else if (ret == GST_FLOW_NOT_LINKED || ret <= GST_FLOW_UNEXPECTED) {
2515       event = gst_event_new_eos ();
2516       gst_event_set_seqnum (event, src->priv->seqnum);
2517       /* for fatal errors we post an error message, post the error
2518        * first so the app knows about the error first.
2519        * Also don't do this for WRONG_STATE because it happens
2520        * due to flushing and posting an error message because of
2521        * that is the wrong thing to do, e.g. when we're doing
2522        * a flushing seek. */
2523       GST_ELEMENT_ERROR (src, STREAM, FAILED,
2524           (_("Internal data flow error.")),
2525           ("streaming task paused, reason %s (%d)", reason, ret));
2526       gst_pad_push_event (pad, event);
2527       src->priv->last_sent_eos = TRUE;
2528     }
2529     goto done;
2530   }
2531 null_buffer:
2532   {
2533     GST_ELEMENT_ERROR (src, STREAM, FAILED,
2534         (_("Internal data flow error.")), ("element returned NULL buffer"));
2535     GST_LIVE_UNLOCK (src);
2536     goto done;
2537   }
2538 }
2539
2540 /* default negotiation code.
2541  *
2542  * Take intersection between src and sink pads, take first
2543  * caps and fixate.
2544  */
2545 static gboolean
2546 gst_base_src_default_negotiate (GstBaseSrc * basesrc)
2547 {
2548   GstCaps *thiscaps;
2549   GstCaps *caps = NULL;
2550   GstCaps *peercaps = NULL;
2551   gboolean result = FALSE;
2552
2553   /* first see what is possible on our source pad */
2554   thiscaps = gst_pad_get_caps (GST_BASE_SRC_PAD (basesrc));
2555   GST_DEBUG_OBJECT (basesrc, "caps of src: %" GST_PTR_FORMAT, thiscaps);
2556   /* nothing or anything is allowed, we're done */
2557   if (thiscaps == NULL || gst_caps_is_any (thiscaps))
2558     goto no_nego_needed;
2559
2560   if (G_UNLIKELY (gst_caps_is_empty (thiscaps)))
2561     goto no_caps;
2562
2563   /* get the peer caps */
2564   peercaps = gst_pad_peer_get_caps (GST_BASE_SRC_PAD (basesrc));
2565   GST_DEBUG_OBJECT (basesrc, "caps of peer: %" GST_PTR_FORMAT, peercaps);
2566   if (peercaps) {
2567     /* get intersection */
2568     caps =
2569         gst_caps_intersect_full (peercaps, thiscaps, GST_CAPS_INTERSECT_FIRST);
2570     GST_DEBUG_OBJECT (basesrc, "intersect: %" GST_PTR_FORMAT, caps);
2571     gst_caps_unref (peercaps);
2572   } else {
2573     /* no peer, work with our own caps then */
2574     caps = gst_caps_copy (thiscaps);
2575   }
2576   gst_caps_unref (thiscaps);
2577   if (caps) {
2578     /* take first (and best, since they are sorted) possibility */
2579     gst_caps_truncate (caps);
2580
2581     /* now fixate */
2582     if (!gst_caps_is_empty (caps)) {
2583       GST_DEBUG_OBJECT (basesrc, "have caps: %" GST_PTR_FORMAT, caps);
2584       if (gst_caps_is_any (caps)) {
2585         /* hmm, still anything, so element can do anything and
2586          * nego is not needed */
2587         result = TRUE;
2588       } else {
2589         gst_pad_fixate_caps (GST_BASE_SRC_PAD (basesrc), caps);
2590         GST_DEBUG_OBJECT (basesrc, "fixated to: %" GST_PTR_FORMAT, caps);
2591         if (gst_caps_is_fixed (caps)) {
2592           /* yay, fixed caps, use those then, it's possible that the subclass does
2593            * not accept this caps after all and we have to fail. */
2594           result = gst_pad_set_caps (GST_BASE_SRC_PAD (basesrc), caps);
2595         }
2596       }
2597     }
2598     gst_caps_unref (caps);
2599   } else {
2600     GST_DEBUG_OBJECT (basesrc, "no common caps");
2601   }
2602   return result;
2603
2604 no_nego_needed:
2605   {
2606     GST_DEBUG_OBJECT (basesrc, "no negotiation needed");
2607     if (thiscaps)
2608       gst_caps_unref (thiscaps);
2609     return TRUE;
2610   }
2611 no_caps:
2612   {
2613     GST_ELEMENT_ERROR (basesrc, STREAM, FORMAT,
2614         ("No supported formats found"),
2615         ("This element did not produce valid caps"));
2616     if (thiscaps)
2617       gst_caps_unref (thiscaps);
2618     return TRUE;
2619   }
2620 }
2621
2622 static gboolean
2623 gst_base_src_negotiate (GstBaseSrc * basesrc)
2624 {
2625   GstBaseSrcClass *bclass;
2626   gboolean result = TRUE;
2627
2628   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2629
2630   if (bclass->negotiate)
2631     result = bclass->negotiate (basesrc);
2632
2633   return result;
2634 }
2635
2636 static gboolean
2637 gst_base_src_start (GstBaseSrc * basesrc)
2638 {
2639   GstBaseSrcClass *bclass;
2640   gboolean result;
2641   guint64 size;
2642   gboolean seekable;
2643   GstFormat format;
2644
2645   if (GST_OBJECT_FLAG_IS_SET (basesrc, GST_BASE_SRC_STARTED))
2646     return TRUE;
2647
2648   GST_DEBUG_OBJECT (basesrc, "starting source");
2649
2650   basesrc->num_buffers_left = basesrc->num_buffers;
2651
2652   GST_OBJECT_LOCK (basesrc);
2653   gst_segment_init (&basesrc->segment, basesrc->segment.format);
2654   GST_OBJECT_UNLOCK (basesrc);
2655
2656   basesrc->running = FALSE;
2657   basesrc->priv->segment_pending = FALSE;
2658
2659   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2660   if (bclass->start)
2661     result = bclass->start (basesrc);
2662   else
2663     result = TRUE;
2664
2665   if (!result)
2666     goto could_not_start;
2667
2668   GST_OBJECT_FLAG_SET (basesrc, GST_BASE_SRC_STARTED);
2669
2670   format = basesrc->segment.format;
2671
2672   /* figure out the size */
2673   if (format == GST_FORMAT_BYTES) {
2674     if (bclass->get_size) {
2675       if (!(result = bclass->get_size (basesrc, &size)))
2676         size = -1;
2677     } else {
2678       result = FALSE;
2679       size = -1;
2680     }
2681     GST_DEBUG_OBJECT (basesrc, "setting size %" G_GUINT64_FORMAT, size);
2682     /* only update the size when operating in bytes, subclass is supposed
2683      * to set duration in the start method for other formats */
2684     GST_OBJECT_LOCK (basesrc);
2685     basesrc->segment.duration = size;
2686     GST_OBJECT_UNLOCK (basesrc);
2687   } else {
2688     size = -1;
2689   }
2690
2691   GST_DEBUG_OBJECT (basesrc,
2692       "format: %s, have size: %d, size: %" G_GUINT64_FORMAT ", duration: %"
2693       G_GINT64_FORMAT, gst_format_get_name (format), result, size,
2694       basesrc->segment.duration);
2695
2696   seekable = gst_base_src_seekable (basesrc);
2697   GST_DEBUG_OBJECT (basesrc, "is seekable: %d", seekable);
2698
2699   /* update for random access flag */
2700   basesrc->random_access = seekable && format == GST_FORMAT_BYTES;
2701
2702   GST_DEBUG_OBJECT (basesrc, "is random_access: %d", basesrc->random_access);
2703
2704   /* run typefind if we are random_access and the typefinding is enabled. */
2705   if (basesrc->random_access && basesrc->typefind && size != -1) {
2706     GstCaps *caps;
2707
2708     if (!(caps = gst_type_find_helper (basesrc->srcpad, size)))
2709       goto typefind_failed;
2710
2711     result = gst_pad_set_caps (basesrc->srcpad, caps);
2712     gst_caps_unref (caps);
2713   } else {
2714     /* use class or default negotiate function */
2715     if (!(result = gst_base_src_negotiate (basesrc)))
2716       goto could_not_negotiate;
2717   }
2718
2719   return result;
2720
2721   /* ERROR */
2722 could_not_start:
2723   {
2724     GST_DEBUG_OBJECT (basesrc, "could not start");
2725     /* subclass is supposed to post a message. We don't have to call _stop. */
2726     return FALSE;
2727   }
2728 could_not_negotiate:
2729   {
2730     GST_DEBUG_OBJECT (basesrc, "could not negotiate, stopping");
2731     GST_ELEMENT_ERROR (basesrc, STREAM, FORMAT,
2732         ("Could not negotiate format"), ("Check your filtered caps, if any"));
2733     /* we must call stop */
2734     gst_base_src_stop (basesrc);
2735     return FALSE;
2736   }
2737 typefind_failed:
2738   {
2739     GST_DEBUG_OBJECT (basesrc, "could not typefind, stopping");
2740     GST_ELEMENT_ERROR (basesrc, STREAM, TYPE_NOT_FOUND, (NULL), (NULL));
2741     /* we must call stop */
2742     gst_base_src_stop (basesrc);
2743     return FALSE;
2744   }
2745 }
2746
2747 static gboolean
2748 gst_base_src_stop (GstBaseSrc * basesrc)
2749 {
2750   GstBaseSrcClass *bclass;
2751   gboolean result = TRUE;
2752
2753   if (!GST_OBJECT_FLAG_IS_SET (basesrc, GST_BASE_SRC_STARTED))
2754     return TRUE;
2755
2756   GST_DEBUG_OBJECT (basesrc, "stopping source");
2757
2758   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2759   if (bclass->stop)
2760     result = bclass->stop (basesrc);
2761
2762   if (result)
2763     GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_STARTED);
2764
2765   return result;
2766 }
2767
2768 /* start or stop flushing dataprocessing
2769  */
2770 static gboolean
2771 gst_base_src_set_flushing (GstBaseSrc * basesrc,
2772     gboolean flushing, gboolean live_play, gboolean unlock, gboolean * playing)
2773 {
2774   GstBaseSrcClass *bclass;
2775
2776   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2777
2778   if (flushing && unlock) {
2779     /* unlock any subclasses, we need to do this before grabbing the
2780      * LIVE_LOCK since we hold this lock before going into ::create. We pass an
2781      * unlock to the params because of backwards compat (see seek handler)*/
2782     if (bclass->unlock)
2783       bclass->unlock (basesrc);
2784   }
2785
2786   /* the live lock is released when we are blocked, waiting for playing or
2787    * when we sync to the clock. */
2788   GST_LIVE_LOCK (basesrc);
2789   if (playing)
2790     *playing = basesrc->live_running;
2791   basesrc->priv->flushing = flushing;
2792   if (flushing) {
2793     /* if we are locked in the live lock, signal it to make it flush */
2794     basesrc->live_running = TRUE;
2795
2796     /* clear pending EOS if any */
2797     g_atomic_int_set (&basesrc->priv->pending_eos, FALSE);
2798
2799     /* step 1, now that we have the LIVE lock, clear our unlock request */
2800     if (bclass->unlock_stop)
2801       bclass->unlock_stop (basesrc);
2802
2803     /* step 2, unblock clock sync (if any) or any other blocking thing */
2804     if (basesrc->clock_id)
2805       gst_clock_id_unschedule (basesrc->clock_id);
2806   } else {
2807     /* signal the live source that it can start playing */
2808     basesrc->live_running = live_play;
2809
2810     /* When unlocking drop all delayed events */
2811     if (unlock) {
2812       GST_OBJECT_LOCK (basesrc);
2813       if (basesrc->priv->pending_events) {
2814         g_list_foreach (basesrc->priv->pending_events, (GFunc) gst_event_unref,
2815             NULL);
2816         g_list_free (basesrc->priv->pending_events);
2817         basesrc->priv->pending_events = NULL;
2818         g_atomic_int_set (&basesrc->priv->have_events, FALSE);
2819       }
2820       GST_OBJECT_UNLOCK (basesrc);
2821     }
2822   }
2823   GST_LIVE_SIGNAL (basesrc);
2824   GST_LIVE_UNLOCK (basesrc);
2825
2826   return TRUE;
2827 }
2828
2829 /* the purpose of this function is to make sure that a live source blocks in the
2830  * LIVE lock or leaves the LIVE lock and continues playing. */
2831 static gboolean
2832 gst_base_src_set_playing (GstBaseSrc * basesrc, gboolean live_play)
2833 {
2834   GstBaseSrcClass *bclass;
2835
2836   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2837
2838   /* unlock subclasses locked in ::create, we only do this when we stop playing. */
2839   if (!live_play) {
2840     GST_DEBUG_OBJECT (basesrc, "unlock");
2841     if (bclass->unlock)
2842       bclass->unlock (basesrc);
2843   }
2844
2845   /* we are now able to grab the LIVE lock, when we get it, we can be
2846    * waiting for PLAYING while blocked in the LIVE cond or we can be waiting
2847    * for the clock. */
2848   GST_LIVE_LOCK (basesrc);
2849   GST_DEBUG_OBJECT (basesrc, "unschedule clock");
2850
2851   /* unblock clock sync (if any) */
2852   if (basesrc->clock_id)
2853     gst_clock_id_unschedule (basesrc->clock_id);
2854
2855   /* configure what to do when we get to the LIVE lock. */
2856   GST_DEBUG_OBJECT (basesrc, "live running %d", live_play);
2857   basesrc->live_running = live_play;
2858
2859   if (live_play) {
2860     gboolean start;
2861
2862     /* clear our unlock request when going to PLAYING */
2863     GST_DEBUG_OBJECT (basesrc, "unlock stop");
2864     if (bclass->unlock_stop)
2865       bclass->unlock_stop (basesrc);
2866
2867     /* for live sources we restart the timestamp correction */
2868     basesrc->priv->latency = -1;
2869     /* have to restart the task in case it stopped because of the unlock when
2870      * we went to PAUSED. Only do this if we operating in push mode. */
2871     GST_OBJECT_LOCK (basesrc->srcpad);
2872     start = (GST_PAD_ACTIVATE_MODE (basesrc->srcpad) == GST_ACTIVATE_PUSH);
2873     GST_OBJECT_UNLOCK (basesrc->srcpad);
2874     if (start)
2875       gst_pad_start_task (basesrc->srcpad, (GstTaskFunction) gst_base_src_loop,
2876           basesrc->srcpad);
2877     GST_DEBUG_OBJECT (basesrc, "signal");
2878     GST_LIVE_SIGNAL (basesrc);
2879   }
2880   GST_LIVE_UNLOCK (basesrc);
2881
2882   return TRUE;
2883 }
2884
2885 static gboolean
2886 gst_base_src_activate_push (GstPad * pad, gboolean active)
2887 {
2888   GstBaseSrc *basesrc;
2889   GstEvent *event;
2890
2891   basesrc = GST_BASE_SRC (GST_OBJECT_PARENT (pad));
2892
2893   /* prepare subclass first */
2894   if (active) {
2895     GST_DEBUG_OBJECT (basesrc, "Activating in push mode");
2896
2897     if (G_UNLIKELY (!basesrc->can_activate_push))
2898       goto no_push_activation;
2899
2900     if (G_UNLIKELY (!gst_base_src_start (basesrc)))
2901       goto error_start;
2902
2903     basesrc->priv->last_sent_eos = FALSE;
2904     basesrc->priv->discont = TRUE;
2905     gst_base_src_set_flushing (basesrc, FALSE, FALSE, FALSE, NULL);
2906
2907     /* do initial seek, which will start the task */
2908     GST_OBJECT_LOCK (basesrc);
2909     event = basesrc->pending_seek;
2910     basesrc->pending_seek = NULL;
2911     GST_OBJECT_UNLOCK (basesrc);
2912
2913     /* no need to unlock anything, the task is certainly
2914      * not running here. The perform seek code will start the task when
2915      * finished. */
2916     if (G_UNLIKELY (!gst_base_src_perform_seek (basesrc, event, FALSE)))
2917       goto seek_failed;
2918
2919     if (event)
2920       gst_event_unref (event);
2921   } else {
2922     GST_DEBUG_OBJECT (basesrc, "Deactivating in push mode");
2923     /* flush all */
2924     gst_base_src_set_flushing (basesrc, TRUE, FALSE, TRUE, NULL);
2925     /* stop the task */
2926     gst_pad_stop_task (pad);
2927     /* now we can stop the source */
2928     if (G_UNLIKELY (!gst_base_src_stop (basesrc)))
2929       goto error_stop;
2930   }
2931   return TRUE;
2932
2933   /* ERRORS */
2934 no_push_activation:
2935   {
2936     GST_WARNING_OBJECT (basesrc, "Subclass disabled push-mode activation");
2937     return FALSE;
2938   }
2939 error_start:
2940   {
2941     GST_WARNING_OBJECT (basesrc, "Failed to start in push mode");
2942     return FALSE;
2943   }
2944 seek_failed:
2945   {
2946     GST_ERROR_OBJECT (basesrc, "Failed to perform initial seek");
2947     /* flush all */
2948     gst_base_src_set_flushing (basesrc, TRUE, FALSE, TRUE, NULL);
2949     /* stop the task */
2950     gst_pad_stop_task (pad);
2951     /* Stop the basesrc */
2952     gst_base_src_stop (basesrc);
2953     if (event)
2954       gst_event_unref (event);
2955     return FALSE;
2956   }
2957 error_stop:
2958   {
2959     GST_DEBUG_OBJECT (basesrc, "Failed to stop in push mode");
2960     return FALSE;
2961   }
2962 }
2963
2964 static gboolean
2965 gst_base_src_activate_pull (GstPad * pad, gboolean active)
2966 {
2967   GstBaseSrc *basesrc;
2968
2969   basesrc = GST_BASE_SRC (GST_OBJECT_PARENT (pad));
2970
2971   /* prepare subclass first */
2972   if (active) {
2973     GST_DEBUG_OBJECT (basesrc, "Activating in pull mode");
2974     if (G_UNLIKELY (!gst_base_src_start (basesrc)))
2975       goto error_start;
2976
2977     /* if not random_access, we cannot operate in pull mode for now */
2978     if (G_UNLIKELY (!gst_base_src_check_get_range (basesrc)))
2979       goto no_get_range;
2980
2981     /* stop flushing now but for live sources, still block in the LIVE lock when
2982      * we are not yet PLAYING */
2983     gst_base_src_set_flushing (basesrc, FALSE, FALSE, FALSE, NULL);
2984   } else {
2985     GST_DEBUG_OBJECT (basesrc, "Deactivating in pull mode");
2986     /* flush all, there is no task to stop */
2987     gst_base_src_set_flushing (basesrc, TRUE, FALSE, TRUE, NULL);
2988
2989     /* don't send EOS when going from PAUSED => READY when in pull mode */
2990     basesrc->priv->last_sent_eos = TRUE;
2991
2992     if (G_UNLIKELY (!gst_base_src_stop (basesrc)))
2993       goto error_stop;
2994   }
2995   return TRUE;
2996
2997   /* ERRORS */
2998 error_start:
2999   {
3000     GST_ERROR_OBJECT (basesrc, "Failed to start in pull mode");
3001     return FALSE;
3002   }
3003 no_get_range:
3004   {
3005     GST_ERROR_OBJECT (basesrc, "Cannot operate in pull mode, stopping");
3006     gst_base_src_stop (basesrc);
3007     return FALSE;
3008   }
3009 error_stop:
3010   {
3011     GST_ERROR_OBJECT (basesrc, "Failed to stop in pull mode");
3012     return FALSE;
3013   }
3014 }
3015
3016 static GstStateChangeReturn
3017 gst_base_src_change_state (GstElement * element, GstStateChange transition)
3018 {
3019   GstBaseSrc *basesrc;
3020   GstStateChangeReturn result;
3021   gboolean no_preroll = FALSE;
3022
3023   basesrc = GST_BASE_SRC (element);
3024
3025   switch (transition) {
3026     case GST_STATE_CHANGE_NULL_TO_READY:
3027       break;
3028     case GST_STATE_CHANGE_READY_TO_PAUSED:
3029       no_preroll = gst_base_src_is_live (basesrc);
3030       break;
3031     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
3032       GST_DEBUG_OBJECT (basesrc, "PAUSED->PLAYING");
3033       if (gst_base_src_is_live (basesrc)) {
3034         /* now we can start playback */
3035         gst_base_src_set_playing (basesrc, TRUE);
3036       }
3037       break;
3038     default:
3039       break;
3040   }
3041
3042   if ((result =
3043           GST_ELEMENT_CLASS (parent_class)->change_state (element,
3044               transition)) == GST_STATE_CHANGE_FAILURE)
3045     goto failure;
3046
3047   switch (transition) {
3048     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
3049       GST_DEBUG_OBJECT (basesrc, "PLAYING->PAUSED");
3050       if (gst_base_src_is_live (basesrc)) {
3051         /* make sure we block in the live lock in PAUSED */
3052         gst_base_src_set_playing (basesrc, FALSE);
3053         no_preroll = TRUE;
3054       }
3055       break;
3056     case GST_STATE_CHANGE_PAUSED_TO_READY:
3057     {
3058       GstEvent **event_p, *event;
3059
3060       /* we don't need to unblock anything here, the pad deactivation code
3061        * already did this */
3062
3063       /* FIXME, deprecate this behaviour, it is very dangerous.
3064        * the prefered way of sending EOS downstream is by sending
3065        * the EOS event to the element */
3066       if (!basesrc->priv->last_sent_eos) {
3067         GST_DEBUG_OBJECT (basesrc, "Sending EOS event");
3068         event = gst_event_new_eos ();
3069         gst_event_set_seqnum (event, basesrc->priv->seqnum);
3070         gst_pad_push_event (basesrc->srcpad, event);
3071         basesrc->priv->last_sent_eos = TRUE;
3072       }
3073       g_atomic_int_set (&basesrc->priv->pending_eos, FALSE);
3074       event_p = &basesrc->pending_seek;
3075       gst_event_replace (event_p, NULL);
3076       break;
3077     }
3078     case GST_STATE_CHANGE_READY_TO_NULL:
3079       break;
3080     default:
3081       break;
3082   }
3083
3084   if (no_preroll && result == GST_STATE_CHANGE_SUCCESS)
3085     result = GST_STATE_CHANGE_NO_PREROLL;
3086
3087   return result;
3088
3089   /* ERRORS */
3090 failure:
3091   {
3092     GST_DEBUG_OBJECT (basesrc, "parent failed state change");
3093     return result;
3094   }
3095 }