libs/gst/base/gstbasesrc.c: Push OOB events downstream when we get them in send_event...
[platform/upstream/gstreamer.git] / libs / gst / base / gstbasesrc.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *               2000,2005 Wim Taymans <wim@fluendo.com>
4  *
5  * gstbasesrc.c:
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20  * Boston, MA 02111-1307, USA.
21  */
22
23 /**
24  * SECTION:gstbasesrc
25  * @short_description: Base class for getrange based source elements
26  * @see_also: #GstPushSrc, #GstBaseTransform, #GstBaseSink
27  *
28  * This is a generice base class for source elements. The following
29  * types of sources are supported:
30  * <itemizedlist>
31  *   <listitem><para>random access sources like files</para></listitem>
32  *   <listitem><para>seekable sources</para></listitem>
33  *   <listitem><para>live sources</para></listitem>
34  * </itemizedlist>
35  *
36  * <refsect2>
37  * <para>
38  * The source can be configured to operate in any #GstFormat with the
39  * gst_base_src_set_format() method. The currently set format determines 
40  * the format of the internal #GstSegment and any #GST_EVENT_NEWSEGMENT 
41  * events. The default format for #GstBaseSrc is #GST_FORMAT_BYTES.
42  * </para>
43  * <para>
44  * #GstBaseSrc always supports push mode scheduling. If the following
45  * conditions are met, it also supports pull mode scheduling:
46  * <itemizedlist>
47  *   <listitem><para>The format is set to #GST_FORMAT_BYTES (default).</para>
48  *   </listitem>
49  *   <listitem><para>#GstBaseSrc::is_seekable returns %TRUE.</para>
50  *   </listitem>
51  * </itemizedlist>
52  * </para>
53  * <para>
54  * Since 0.10.9, any #GstBaseSrc can enable pull based scheduling at any 
55  * time by overriding #GstBaseSrc::check_get_range so that it returns %TRUE. 
56  * </para>
57  * <para>
58  * If all the conditions are met for operating in pull mode, #GstBaseSrc is
59  * automatically seekable in push mode as well. The following conditions must 
60  * be met to make the element seekable in push mode when the format is not
61  * #GST_FORMAT_BYTES:
62  * <itemizedlist>
63  *   <listitem><para>
64  *     #GstBaseSrc::is_seekable returns %TRUE.
65  *   </para></listitem>
66  *   <listitem><para>
67  *     #GstBaseSrc::query can convert all supported seek formats to the
68  *     internal format as set with gst_base_src_set_format().
69  *   </para></listitem>
70  *   <listitem><para>
71  *     #GstBaseSrc::do_seek is implemented, performs the seek and returns %TRUE.
72  *   </para></listitem>
73  * </itemizedlist>
74  * </para>
75  * <para>
76  * When the element does not meet the requirements to operate in pull mode,
77  * the offset and length in the #GstBaseSrc::create method should be ignored.
78  * It is recommended to subclass #GstPushSrc instead, in this situation. If the
79  * element can operate in pull mode but only with specific offsets and
80  * lengths, it is allowed to generate an error when the wrong values are passed
81  * to the #GstBaseSrc::create function.
82  * </para>
83  * <para>
84  * #GstBaseSrc has support for live sources. Live sources are sources that when
85  * paused discard data, such as audio or video capture devices. A typical live
86  * source also produces data at a fixed rate and thus provides a clock to publish
87  * this rate.
88  * Use gst_base_src_set_live() to activate the live source mode.
89  * </para>
90  * <para>
91  * A live source does not produce data in the PAUSED state. This means that the 
92  * #GstBaseSrc::create method will not be called in PAUSED but only in PLAYING.
93  * To signal the pipeline that the element will not produce data, the return
94  * value from the READY to PAUSED state will be #GST_STATE_CHANGE_NO_PREROLL.
95  * </para>
96  * <para>
97  * A typical live source will timestamp the buffers it creates with the 
98  * current running time of the pipeline. This is one reason why a live source
99  * can only produce data in the PLAYING state, when the clock is actually 
100  * distributed and running. 
101  * </para>
102  * <para>
103  * Live sources that synchronize and block on the clock (an audio source, for
104  * example) can since 0.10.12 use gst_base_src_wait_playing() when the ::create
105  * function was interrupted by a state change to PAUSED.
106  * </para>
107  * <para>
108  * The #GstBaseSrc::get_times method can be used to implement pseudo-live 
109  * sources.
110  * It only makes sense to implement the ::get_times function if the source is 
111  * a live source. The ::get_times function should return timestamps starting
112  * from 0, as if it were a non-live source. The base class will make sure that
113  * the timestamps are transformed into the current running_time.
114  * The base source will then wait for the calculated running_time before pushing
115  * out the buffer.
116  * </para>
117  * <para>
118  * For live sources, the base class will by default report a latency of 0.
119  * For pseudo live sources, the base class will by default measure the difference
120  * between the first buffer timestamp and the start time of get_times and will
121  * report this value as the latency. 
122  * Subclasses should override the query function when this behaviour is not
123  * acceptable.
124  * </para>
125  * <para>
126  * There is only support in #GstBaseSrc for exactly one source pad, which 
127  * should be named "src". A source implementation (subclass of #GstBaseSrc) 
128  * should install a pad template in its base_init function, like so:
129  * </para>
130  * <para>
131  * <programlisting>
132  * static void
133  * my_element_base_init (gpointer g_class)
134  * {
135  *   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (g_class);
136  *   // srctemplate should be a #GstStaticPadTemplate with direction
137  *   // #GST_PAD_SRC and name "src"
138  *   gst_element_class_add_pad_template (gstelement_class,
139  *       gst_static_pad_template_get (&amp;srctemplate));
140  *   // see #GstElementDetails
141  *   gst_element_class_set_details (gstelement_class, &amp;details);
142  * }
143  * </programlisting>
144  * </para>
145  * <title>Controlled shutdown of live sources in applications</title>
146  * <para>
147  * Applications that record from a live source may want to stop recording
148  * in a controlled way, so that the recording is stopped, but the data
149  * already in the pipeline is processed to the end (remember that many live
150  * sources would go on recording forever otherwise). For that to happen the
151  * application needs to make the source stop recording and send an EOS
152  * event down the pipeline. The application would then wait for an
153  * EOS message posted on the pipeline's bus to know when all data has
154  * been processed and the pipeline can safely be stopped.
155  * </para>
156  * <para>
157  * Since GStreamer 0.10.3 an application may simply set the source
158  * element to NULL or READY state to make it send an EOS event downstream.
159  * The application should lock the state of the source afterwards, so that
160  * shutting down the pipeline from PLAYING doesn't temporarily start up the
161  * source element for a second time:
162  * <programlisting>
163  * ...
164  * // stop recording
165  * gst_element_set_state (audio_source, #GST_STATE_NULL);
166  * gst_element_set_locked_state (audio_source, %TRUE);
167  * ...
168  * </programlisting>
169  * Now the application should wait for an EOS message
170  * to be posted on the pipeline's bus. Once it has received
171  * an EOS message, it may safely shut down the entire pipeline:
172  * <programlisting>
173  * ...
174  * // everything done - shut down pipeline
175  * gst_element_set_state (pipeline, #GST_STATE_NULL);
176  * gst_element_set_locked_state (audio_source, %FALSE);
177  * ...
178  * </programlisting>
179  * </para>
180  * <para>
181  * Note that setting the source element to NULL or READY when the 
182  * pipeline is in the PAUSED state may cause a deadlock since the streaming
183  * thread might be blocked in PREROLL.
184  * </para>
185  * <para>
186  * Last reviewed on 2007-09-13 (0.10.15)
187  * </para>
188  * </refsect2>
189  */
190
191 #ifdef HAVE_CONFIG_H
192 #  include "config.h"
193 #endif
194
195 #include <stdlib.h>
196 #include <string.h>
197
198 #include "gstbasesrc.h"
199 #include "gsttypefindhelper.h"
200 #include <gst/gstmarshal.h>
201 #include <gst/gst-i18n-lib.h>
202
203 GST_DEBUG_CATEGORY_STATIC (gst_base_src_debug);
204 #define GST_CAT_DEFAULT gst_base_src_debug
205
206 #define GST_LIVE_GET_LOCK(elem)               (GST_BASE_SRC_CAST(elem)->live_lock)
207 #define GST_LIVE_LOCK(elem)                   g_mutex_lock(GST_LIVE_GET_LOCK(elem))
208 #define GST_LIVE_TRYLOCK(elem)                g_mutex_trylock(GST_LIVE_GET_LOCK(elem))
209 #define GST_LIVE_UNLOCK(elem)                 g_mutex_unlock(GST_LIVE_GET_LOCK(elem))
210 #define GST_LIVE_GET_COND(elem)               (GST_BASE_SRC_CAST(elem)->live_cond)
211 #define GST_LIVE_WAIT(elem)                   g_cond_wait (GST_LIVE_GET_COND (elem), GST_LIVE_GET_LOCK (elem))
212 #define GST_LIVE_TIMED_WAIT(elem, timeval)    g_cond_timed_wait (GST_LIVE_GET_COND (elem), GST_LIVE_GET_LOCK (elem),\
213                                                                                 timeval)
214 #define GST_LIVE_SIGNAL(elem)                 g_cond_signal (GST_LIVE_GET_COND (elem));
215 #define GST_LIVE_BROADCAST(elem)              g_cond_broadcast (GST_LIVE_GET_COND (elem));
216
217 /* BaseSrc signals and args */
218 enum
219 {
220   /* FILL ME */
221   LAST_SIGNAL
222 };
223
224 #define DEFAULT_BLOCKSIZE       4096
225 #define DEFAULT_NUM_BUFFERS     -1
226 #define DEFAULT_TYPEFIND        FALSE
227 #define DEFAULT_DO_TIMESTAMP    FALSE
228
229 enum
230 {
231   PROP_0,
232   PROP_BLOCKSIZE,
233   PROP_NUM_BUFFERS,
234   PROP_TYPEFIND,
235   PROP_DO_TIMESTAMP
236 };
237
238 #define GST_BASE_SRC_GET_PRIVATE(obj)  \
239    (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_BASE_SRC, GstBaseSrcPrivate))
240
241 struct _GstBaseSrcPrivate
242 {
243   gboolean last_sent_eos;       /* last thing we did was send an EOS (we set this
244                                  * to avoid the sending of two EOS in some cases) */
245   gboolean discont;
246
247   /* two segments to be sent in the streaming thread with STREAM_LOCK */
248   GstEvent *close_segment;
249   GstEvent *start_segment;
250
251   /* startup latency is the time it takes between going to PLAYING and producing
252    * the first BUFFER with running_time 0. This value is included in the latency
253    * reporting. */
254   GstClockTime latency;
255   /* timestamp offset, this is the offset add to the values of gst_times for
256    * pseudo live sources */
257   GstClockTimeDiff ts_offset;
258
259   gboolean do_timestamp;
260 };
261
262 static GstElementClass *parent_class = NULL;
263
264 static void gst_base_src_base_init (gpointer g_class);
265 static void gst_base_src_class_init (GstBaseSrcClass * klass);
266 static void gst_base_src_init (GstBaseSrc * src, gpointer g_class);
267 static void gst_base_src_finalize (GObject * object);
268
269
270 GType
271 gst_base_src_get_type (void)
272 {
273   static GType base_src_type = 0;
274
275   if (G_UNLIKELY (base_src_type == 0)) {
276     static const GTypeInfo base_src_info = {
277       sizeof (GstBaseSrcClass),
278       (GBaseInitFunc) gst_base_src_base_init,
279       NULL,
280       (GClassInitFunc) gst_base_src_class_init,
281       NULL,
282       NULL,
283       sizeof (GstBaseSrc),
284       0,
285       (GInstanceInitFunc) gst_base_src_init,
286     };
287
288     base_src_type = g_type_register_static (GST_TYPE_ELEMENT,
289         "GstBaseSrc", &base_src_info, G_TYPE_FLAG_ABSTRACT);
290   }
291   return base_src_type;
292 }
293 static GstCaps *gst_base_src_getcaps (GstPad * pad);
294 static gboolean gst_base_src_setcaps (GstPad * pad, GstCaps * caps);
295 static void gst_base_src_fixate (GstPad * pad, GstCaps * caps);
296
297 static gboolean gst_base_src_activate_push (GstPad * pad, gboolean active);
298 static gboolean gst_base_src_activate_pull (GstPad * pad, gboolean active);
299 static void gst_base_src_set_property (GObject * object, guint prop_id,
300     const GValue * value, GParamSpec * pspec);
301 static void gst_base_src_get_property (GObject * object, guint prop_id,
302     GValue * value, GParamSpec * pspec);
303 static gboolean gst_base_src_event_handler (GstPad * pad, GstEvent * event);
304 static gboolean gst_base_src_send_event (GstElement * elem, GstEvent * event);
305 static gboolean gst_base_src_default_event (GstBaseSrc * src, GstEvent * event);
306 static const GstQueryType *gst_base_src_get_query_types (GstElement * element);
307
308 static gboolean gst_base_src_query (GstPad * pad, GstQuery * query);
309
310 static gboolean gst_base_src_default_negotiate (GstBaseSrc * basesrc);
311 static gboolean gst_base_src_default_do_seek (GstBaseSrc * src,
312     GstSegment * segment);
313 static gboolean gst_base_src_default_query (GstBaseSrc * src, GstQuery * query);
314 static gboolean gst_base_src_default_prepare_seek_segment (GstBaseSrc * src,
315     GstEvent * event, GstSegment * segment);
316
317 static gboolean gst_base_src_unlock (GstBaseSrc * basesrc);
318 static gboolean gst_base_src_unlock_stop (GstBaseSrc * basesrc);
319 static gboolean gst_base_src_start (GstBaseSrc * basesrc);
320 static gboolean gst_base_src_stop (GstBaseSrc * basesrc);
321
322 static GstStateChangeReturn gst_base_src_change_state (GstElement * element,
323     GstStateChange transition);
324
325 static void gst_base_src_loop (GstPad * pad);
326 static gboolean gst_base_src_pad_check_get_range (GstPad * pad);
327 static gboolean gst_base_src_default_check_get_range (GstBaseSrc * bsrc);
328 static GstFlowReturn gst_base_src_pad_get_range (GstPad * pad, guint64 offset,
329     guint length, GstBuffer ** buf);
330 static GstFlowReturn gst_base_src_get_range (GstBaseSrc * src, guint64 offset,
331     guint length, GstBuffer ** buf);
332
333 static void
334 gst_base_src_base_init (gpointer g_class)
335 {
336   GST_DEBUG_CATEGORY_INIT (gst_base_src_debug, "basesrc", 0, "basesrc element");
337 }
338
339 static void
340 gst_base_src_class_init (GstBaseSrcClass * klass)
341 {
342   GObjectClass *gobject_class;
343   GstElementClass *gstelement_class;
344
345   gobject_class = G_OBJECT_CLASS (klass);
346   gstelement_class = GST_ELEMENT_CLASS (klass);
347
348   g_type_class_add_private (klass, sizeof (GstBaseSrcPrivate));
349
350   parent_class = g_type_class_peek_parent (klass);
351
352   gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_base_src_finalize);
353   gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_base_src_set_property);
354   gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_base_src_get_property);
355
356   g_object_class_install_property (gobject_class, PROP_BLOCKSIZE,
357       g_param_spec_ulong ("blocksize", "Block size",
358           "Size in bytes to read per buffer (0 = default)", 0, G_MAXULONG,
359           DEFAULT_BLOCKSIZE, G_PARAM_READWRITE));
360   g_object_class_install_property (gobject_class, PROP_NUM_BUFFERS,
361       g_param_spec_int ("num-buffers", "num-buffers",
362           "Number of buffers to output before sending EOS", -1, G_MAXINT,
363           DEFAULT_NUM_BUFFERS, G_PARAM_READWRITE));
364   g_object_class_install_property (gobject_class, PROP_TYPEFIND,
365       g_param_spec_boolean ("typefind", "Typefind",
366           "Run typefind before negotiating", DEFAULT_TYPEFIND,
367           G_PARAM_READWRITE));
368   g_object_class_install_property (gobject_class, PROP_DO_TIMESTAMP,
369       g_param_spec_boolean ("do-timestamp", "Do timestamp",
370           "Apply current stream time to buffers", DEFAULT_DO_TIMESTAMP,
371           G_PARAM_READWRITE));
372
373   gstelement_class->change_state =
374       GST_DEBUG_FUNCPTR (gst_base_src_change_state);
375   gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_base_src_send_event);
376   gstelement_class->get_query_types =
377       GST_DEBUG_FUNCPTR (gst_base_src_get_query_types);
378
379   klass->negotiate = GST_DEBUG_FUNCPTR (gst_base_src_default_negotiate);
380   klass->event = GST_DEBUG_FUNCPTR (gst_base_src_default_event);
381   klass->do_seek = GST_DEBUG_FUNCPTR (gst_base_src_default_do_seek);
382   klass->query = GST_DEBUG_FUNCPTR (gst_base_src_default_query);
383   klass->check_get_range =
384       GST_DEBUG_FUNCPTR (gst_base_src_default_check_get_range);
385   klass->prepare_seek_segment =
386       GST_DEBUG_FUNCPTR (gst_base_src_default_prepare_seek_segment);
387 }
388
389 static void
390 gst_base_src_init (GstBaseSrc * basesrc, gpointer g_class)
391 {
392   GstPad *pad;
393   GstPadTemplate *pad_template;
394
395   basesrc->priv = GST_BASE_SRC_GET_PRIVATE (basesrc);
396
397   basesrc->is_live = FALSE;
398   basesrc->live_lock = g_mutex_new ();
399   basesrc->live_cond = g_cond_new ();
400   basesrc->num_buffers = DEFAULT_NUM_BUFFERS;
401   basesrc->num_buffers_left = -1;
402
403   basesrc->can_activate_push = TRUE;
404   basesrc->pad_mode = GST_ACTIVATE_NONE;
405
406   pad_template =
407       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (g_class), "src");
408   g_return_if_fail (pad_template != NULL);
409
410   GST_DEBUG_OBJECT (basesrc, "creating src pad");
411   pad = gst_pad_new_from_template (pad_template, "src");
412
413   GST_DEBUG_OBJECT (basesrc, "setting functions on src pad");
414   gst_pad_set_activatepush_function (pad,
415       GST_DEBUG_FUNCPTR (gst_base_src_activate_push));
416   gst_pad_set_activatepull_function (pad,
417       GST_DEBUG_FUNCPTR (gst_base_src_activate_pull));
418   gst_pad_set_event_function (pad,
419       GST_DEBUG_FUNCPTR (gst_base_src_event_handler));
420   gst_pad_set_query_function (pad, GST_DEBUG_FUNCPTR (gst_base_src_query));
421   gst_pad_set_checkgetrange_function (pad,
422       GST_DEBUG_FUNCPTR (gst_base_src_pad_check_get_range));
423   gst_pad_set_getrange_function (pad,
424       GST_DEBUG_FUNCPTR (gst_base_src_pad_get_range));
425   gst_pad_set_getcaps_function (pad, GST_DEBUG_FUNCPTR (gst_base_src_getcaps));
426   gst_pad_set_setcaps_function (pad, GST_DEBUG_FUNCPTR (gst_base_src_setcaps));
427   gst_pad_set_fixatecaps_function (pad,
428       GST_DEBUG_FUNCPTR (gst_base_src_fixate));
429
430   /* hold pointer to pad */
431   basesrc->srcpad = pad;
432   GST_DEBUG_OBJECT (basesrc, "adding src pad");
433   gst_element_add_pad (GST_ELEMENT (basesrc), pad);
434
435   basesrc->blocksize = DEFAULT_BLOCKSIZE;
436   basesrc->clock_id = NULL;
437   /* we operate in BYTES by default */
438   gst_base_src_set_format (basesrc, GST_FORMAT_BYTES);
439   basesrc->data.ABI.typefind = DEFAULT_TYPEFIND;
440   basesrc->priv->do_timestamp = DEFAULT_DO_TIMESTAMP;
441
442   GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_STARTED);
443
444   GST_DEBUG_OBJECT (basesrc, "init done");
445 }
446
447 static void
448 gst_base_src_finalize (GObject * object)
449 {
450   GstBaseSrc *basesrc;
451   GstEvent **event_p;
452
453   basesrc = GST_BASE_SRC (object);
454
455   g_mutex_free (basesrc->live_lock);
456   g_cond_free (basesrc->live_cond);
457
458   event_p = &basesrc->data.ABI.pending_seek;
459   gst_event_replace ((GstEvent **) event_p, NULL);
460
461   G_OBJECT_CLASS (parent_class)->finalize (object);
462 }
463
464 /**
465  * gst_base_src_wait_playing:
466  * @src: the src
467  *
468  * If the #GstBaseSrcClass::create method performs its own synchronisation against
469  * the clock it must unblock when going from PLAYING to the PAUSED state and call
470  * this method before continuing to produce the remaining data.
471  *
472  * This function will block until a state change to PLAYING happens (in which
473  * case this function returns #GST_FLOW_OK) or the processing must be stopped due
474  * to a state change to READY or a FLUSH event (in which case this function
475  * returns #GST_FLOW_WRONG_STATE).
476  *
477  * Since: 0.10.12
478  *
479  * Returns: #GST_FLOW_OK if @src is PLAYING and processing can
480  * continue. Any other return value should be returned from the create vmethod.
481  */
482 GstFlowReturn
483 gst_base_src_wait_playing (GstBaseSrc * src)
484 {
485   /* block until the state changes, or we get a flush, or something */
486   GST_LIVE_LOCK (src);
487   if (src->is_live) {
488     while (G_UNLIKELY (!src->live_running)) {
489       GST_DEBUG ("live source signal waiting");
490       GST_LIVE_SIGNAL (src);
491       GST_DEBUG ("live source waiting for running state");
492       GST_LIVE_WAIT (src);
493       GST_DEBUG ("live source unlocked");
494     }
495     /* FIXME, use another variable to signal stopping so that we don't
496      * have to grab another lock. */
497     GST_OBJECT_LOCK (src->srcpad);
498     if (G_UNLIKELY (GST_PAD_IS_FLUSHING (src->srcpad)))
499       goto flushing;
500     GST_OBJECT_UNLOCK (src->srcpad);
501   }
502   GST_LIVE_UNLOCK (src);
503
504   return GST_FLOW_OK;
505
506   /* ERRORS */
507 flushing:
508   {
509     GST_DEBUG_OBJECT (src, "pad is flushing");
510     GST_OBJECT_UNLOCK (src->srcpad);
511     GST_LIVE_UNLOCK (src);
512     return GST_FLOW_WRONG_STATE;
513   }
514 }
515
516 /**
517  * gst_base_src_set_live:
518  * @src: base source instance
519  * @live: new live-mode
520  *
521  * If the element listens to a live source, @live should
522  * be set to %TRUE. 
523  *
524  * A live source will not produce data in the PAUSED state and
525  * will therefore not be able to participate in the PREROLL phase
526  * of a pipeline. To signal this fact to the application and the 
527  * pipeline, the state change return value of the live source will
528  * be GST_STATE_CHANGE_NO_PREROLL.
529  */
530 void
531 gst_base_src_set_live (GstBaseSrc * src, gboolean live)
532 {
533   GST_LIVE_LOCK (src);
534   src->is_live = live;
535   GST_LIVE_UNLOCK (src);
536 }
537
538 /**
539  * gst_base_src_is_live:
540  * @src: base source instance
541  *
542  * Check if an element is in live mode.
543  *
544  * Returns: %TRUE if element is in live mode.
545  */
546 gboolean
547 gst_base_src_is_live (GstBaseSrc * src)
548 {
549   gboolean result;
550
551   GST_LIVE_LOCK (src);
552   result = src->is_live;
553   GST_LIVE_UNLOCK (src);
554
555   return result;
556 }
557
558 /**
559  * gst_base_src_set_format:
560  * @src: base source instance
561  * @format: the format to use
562  *
563  * Sets the default format of the source. This will be the format used
564  * for sending NEW_SEGMENT events and for performing seeks.
565  *
566  * If a format of GST_FORMAT_BYTES is set, the element will be able to
567  * operate in pull mode if the #GstBaseSrc::is_seekable returns TRUE.
568  *
569  * @Since: 0.10.1
570  */
571 void
572 gst_base_src_set_format (GstBaseSrc * src, GstFormat format)
573 {
574   gst_segment_init (&src->segment, format);
575 }
576
577 /**
578  * gst_base_src_query_latency:
579  * @src: the source
580  * @live: if the source is live
581  * @min_latency: the min latency of the source
582  * @max_latency: the max latency of the source
583  *
584  * Query the source for the latency parameters. @live will be TRUE when @src is
585  * configured as a live source. @min_latency will be set to the difference
586  * between the running time and the timestamp of the first buffer.
587  * @max_latency is always the undefined value of -1.
588  *
589  * This function is mostly used by subclasses. 
590  *
591  * Returns: TRUE if the query succeeded.
592  *
593  * Since: 0.10.13
594  */
595 gboolean
596 gst_base_src_query_latency (GstBaseSrc * src, gboolean * live,
597     GstClockTime * min_latency, GstClockTime * max_latency)
598 {
599   GstClockTime min;
600
601   GST_LIVE_LOCK (src);
602   if (live)
603     *live = src->is_live;
604
605   /* if we have a startup latency, report this one, else report 0. Subclasses
606    * are supposed to override the query function if they want something
607    * else. */
608   if (src->priv->latency != -1)
609     min = src->priv->latency;
610   else
611     min = 0;
612
613   if (min_latency)
614     *min_latency = min;
615   if (max_latency)
616     *max_latency = -1;
617
618   GST_LOG_OBJECT (src, "latency: live %d, min %" GST_TIME_FORMAT
619       ", max %" GST_TIME_FORMAT, src->is_live, GST_TIME_ARGS (min),
620       GST_TIME_ARGS (-1));
621   GST_LIVE_UNLOCK (src);
622
623   return TRUE;
624 }
625
626 /**
627  * gst_base_src_set_do_timestamp:
628  * @src: the source
629  * @timestamp: enable or disable timestamping
630  *
631  * Configure @src to automatically timestamp outgoing buffers based on the
632  * current running_time of the pipeline. This property is mostly useful for live
633  * sources.
634  *
635  * Since: 0.10.15
636  */
637 void
638 gst_base_src_set_do_timestamp (GstBaseSrc * src, gboolean timestamp)
639 {
640   GST_OBJECT_LOCK (src);
641   src->priv->do_timestamp = timestamp;
642   GST_OBJECT_UNLOCK (src);
643 }
644
645 /**
646  * gst_base_src_get_do_timestamp:
647  * @src: the source
648  *
649  * Query if @src timestamps outgoing buffers based on the current running_time.
650  *
651  * Returns: %TRUE if the base class will automatically timestamp outgoing buffers.
652  *
653  * Since: 0.10.15
654  */
655 gboolean
656 gst_base_src_get_do_timestamp (GstBaseSrc * src)
657 {
658   gboolean res;
659
660   GST_OBJECT_LOCK (src);
661   res = src->priv->do_timestamp;
662   GST_OBJECT_UNLOCK (src);
663
664   return res;
665 }
666
667 static gboolean
668 gst_base_src_setcaps (GstPad * pad, GstCaps * caps)
669 {
670   GstBaseSrcClass *bclass;
671   GstBaseSrc *bsrc;
672   gboolean res = TRUE;
673
674   bsrc = GST_BASE_SRC (GST_PAD_PARENT (pad));
675   bclass = GST_BASE_SRC_GET_CLASS (bsrc);
676
677   if (bclass->set_caps)
678     res = bclass->set_caps (bsrc, caps);
679
680   return res;
681 }
682
683 static GstCaps *
684 gst_base_src_getcaps (GstPad * pad)
685 {
686   GstBaseSrcClass *bclass;
687   GstBaseSrc *bsrc;
688   GstCaps *caps = NULL;
689
690   bsrc = GST_BASE_SRC (GST_PAD_PARENT (pad));
691   bclass = GST_BASE_SRC_GET_CLASS (bsrc);
692   if (bclass->get_caps)
693     caps = bclass->get_caps (bsrc);
694
695   if (caps == NULL) {
696     GstPadTemplate *pad_template;
697
698     pad_template =
699         gst_element_class_get_pad_template (GST_ELEMENT_CLASS (bclass), "src");
700     if (pad_template != NULL) {
701       caps = gst_caps_ref (gst_pad_template_get_caps (pad_template));
702     }
703   }
704   return caps;
705 }
706
707 static void
708 gst_base_src_fixate (GstPad * pad, GstCaps * caps)
709 {
710   GstBaseSrcClass *bclass;
711   GstBaseSrc *bsrc;
712
713   bsrc = GST_BASE_SRC (gst_pad_get_parent (pad));
714   bclass = GST_BASE_SRC_GET_CLASS (bsrc);
715
716   if (bclass->fixate)
717     bclass->fixate (bsrc, caps);
718
719   gst_object_unref (bsrc);
720 }
721
722 static gboolean
723 gst_base_src_default_query (GstBaseSrc * src, GstQuery * query)
724 {
725   gboolean res;
726
727   switch (GST_QUERY_TYPE (query)) {
728     case GST_QUERY_POSITION:
729     {
730       GstFormat format;
731
732       gst_query_parse_position (query, &format, NULL);
733       switch (format) {
734         case GST_FORMAT_PERCENT:
735         {
736           gint64 percent;
737           gint64 position;
738           gint64 duration;
739
740           position = src->segment.last_stop;
741           duration = src->segment.duration;
742
743           if (position != -1 && duration != -1) {
744             if (position < duration)
745               percent = gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX, position,
746                   duration);
747             else
748               percent = GST_FORMAT_PERCENT_MAX;
749           } else
750             percent = -1;
751
752           gst_query_set_position (query, GST_FORMAT_PERCENT, percent);
753           res = TRUE;
754           break;
755         }
756         default:
757         {
758           gint64 position;
759
760           position = src->segment.last_stop;
761
762           if (position != -1) {
763             /* convert to requested format */
764             res =
765                 gst_pad_query_convert (src->srcpad, src->segment.format,
766                 position, &format, &position);
767           } else
768             res = TRUE;
769
770           gst_query_set_position (query, format, position);
771           break;
772         }
773       }
774       break;
775     }
776     case GST_QUERY_DURATION:
777     {
778       GstFormat format;
779
780       gst_query_parse_duration (query, &format, NULL);
781
782       GST_DEBUG_OBJECT (src, "duration query in format %s",
783           gst_format_get_name (format));
784       switch (format) {
785         case GST_FORMAT_PERCENT:
786           gst_query_set_duration (query, GST_FORMAT_PERCENT,
787               GST_FORMAT_PERCENT_MAX);
788           res = TRUE;
789           break;
790         default:
791         {
792           gint64 duration;
793
794           duration = src->segment.duration;
795
796           if (duration != -1) {
797             /* convert to requested format */
798             res =
799                 gst_pad_query_convert (src->srcpad, src->segment.format,
800                 duration, &format, &duration);
801           } else {
802             res = TRUE;
803           }
804           gst_query_set_duration (query, format, duration);
805           break;
806         }
807       }
808       break;
809     }
810
811     case GST_QUERY_SEEKING:
812     {
813       gst_query_set_seeking (query, src->segment.format,
814           src->seekable, 0, src->segment.duration);
815       res = TRUE;
816       break;
817     }
818     case GST_QUERY_SEGMENT:
819     {
820       gint64 start, stop;
821
822       /* no end segment configured, current duration then */
823       if ((stop = src->segment.stop) == -1)
824         stop = src->segment.duration;
825       start = src->segment.start;
826
827       /* adjust to stream time */
828       if (src->segment.time != -1) {
829         start -= src->segment.time;
830         if (stop != -1)
831           stop -= src->segment.time;
832       }
833       gst_query_set_segment (query, src->segment.rate, src->segment.format,
834           start, stop);
835       res = TRUE;
836       break;
837     }
838
839     case GST_QUERY_FORMATS:
840     {
841       gst_query_set_formats (query, 3, GST_FORMAT_DEFAULT,
842           GST_FORMAT_BYTES, GST_FORMAT_PERCENT);
843       res = TRUE;
844       break;
845     }
846     case GST_QUERY_CONVERT:
847     {
848       GstFormat src_fmt, dest_fmt;
849       gint64 src_val, dest_val;
850
851       gst_query_parse_convert (query, &src_fmt, &src_val, &dest_fmt, &dest_val);
852
853       /* we can only convert between equal formats... */
854       if (src_fmt == dest_fmt) {
855         dest_val = src_val;
856         res = TRUE;
857       } else
858         res = FALSE;
859
860       gst_query_set_convert (query, src_fmt, src_val, dest_fmt, dest_val);
861       break;
862     }
863     case GST_QUERY_LATENCY:
864     {
865       GstClockTime min, max;
866       gboolean live;
867
868       /* Subclasses should override and implement something usefull */
869       res = gst_base_src_query_latency (src, &live, &min, &max);
870
871       GST_LOG_OBJECT (src, "report latency: live %d, min %" GST_TIME_FORMAT
872           ", max %" GST_TIME_FORMAT, live, GST_TIME_ARGS (min),
873           GST_TIME_ARGS (max));
874
875       gst_query_set_latency (query, live, min, max);
876       break;
877     }
878     case GST_QUERY_JITTER:
879     case GST_QUERY_RATE:
880     default:
881       res = FALSE;
882       break;
883   }
884   GST_DEBUG_OBJECT (src, "query %s returns %d", GST_QUERY_TYPE_NAME (query),
885       res);
886   return res;
887 }
888
889 static gboolean
890 gst_base_src_query (GstPad * pad, GstQuery * query)
891 {
892   GstBaseSrc *src;
893   GstBaseSrcClass *bclass;
894   gboolean result = FALSE;
895
896   src = GST_BASE_SRC (gst_pad_get_parent (pad));
897
898   bclass = GST_BASE_SRC_GET_CLASS (src);
899
900   if (bclass->query)
901     result = bclass->query (src, query);
902   else
903     result = gst_pad_query_default (pad, query);
904
905   gst_object_unref (src);
906
907   return result;
908 }
909
910 static gboolean
911 gst_base_src_default_do_seek (GstBaseSrc * src, GstSegment * segment)
912 {
913   gboolean res = TRUE;
914
915   /* update our offset if the start/stop position was updated */
916   if (segment->format == GST_FORMAT_BYTES) {
917     segment->last_stop = segment->start;
918     segment->time = segment->start;
919   } else if (segment->start == 0) {
920     /* seek to start, we can implement a default for this. */
921     segment->last_stop = 0;
922     segment->time = 0;
923     res = TRUE;
924   } else
925     res = FALSE;
926
927   return res;
928 }
929
930 static gboolean
931 gst_base_src_do_seek (GstBaseSrc * src, GstSegment * segment)
932 {
933   GstBaseSrcClass *bclass;
934   gboolean result = FALSE;
935
936   bclass = GST_BASE_SRC_GET_CLASS (src);
937
938   if (bclass->do_seek)
939     result = bclass->do_seek (src, segment);
940
941   return result;
942 }
943
944 #define SEEK_TYPE_IS_RELATIVE(t) (((t) != GST_SEEK_TYPE_NONE) && ((t) != GST_SEEK_TYPE_SET))
945
946 static gboolean
947 gst_base_src_default_prepare_seek_segment (GstBaseSrc * src, GstEvent * event,
948     GstSegment * segment)
949 {
950   /* By default, we try one of 2 things:
951    *   - For absolute seek positions, convert the requested position to our 
952    *     configured processing format and place it in the output segment \
953    *   - For relative seek positions, convert our current (input) values to the
954    *     seek format, adjust by the relative seek offset and then convert back to
955    *     the processing format
956    */
957   GstSeekType cur_type, stop_type;
958   gint64 cur, stop;
959   GstSeekFlags flags;
960   GstFormat seek_format, dest_format;
961   gdouble rate;
962   gboolean update;
963   gboolean res = TRUE;
964
965   gst_event_parse_seek (event, &rate, &seek_format, &flags,
966       &cur_type, &cur, &stop_type, &stop);
967   dest_format = segment->format;
968
969   if (seek_format == dest_format) {
970     gst_segment_set_seek (segment, rate, seek_format, flags,
971         cur_type, cur, stop_type, stop, &update);
972     return TRUE;
973   }
974
975   if (cur_type != GST_SEEK_TYPE_NONE) {
976     /* FIXME: Handle seek_cur & seek_end by converting the input segment vals */
977     res =
978         gst_pad_query_convert (src->srcpad, seek_format, cur, &dest_format,
979         &cur);
980     cur_type = GST_SEEK_TYPE_SET;
981   }
982
983   if (res && stop_type != GST_SEEK_TYPE_NONE) {
984     /* FIXME: Handle seek_cur & seek_end by converting the input segment vals */
985     res =
986         gst_pad_query_convert (src->srcpad, seek_format, stop, &dest_format,
987         &stop);
988     stop_type = GST_SEEK_TYPE_SET;
989   }
990
991   /* And finally, configure our output segment in the desired format */
992   gst_segment_set_seek (segment, rate, dest_format, flags, cur_type, cur,
993       stop_type, stop, &update);
994
995   if (!res)
996     goto no_format;
997
998   return res;
999
1000 no_format:
1001   {
1002     GST_DEBUG_OBJECT (src, "undefined format given, seek aborted.");
1003     return FALSE;
1004   }
1005 }
1006
1007 static gboolean
1008 gst_base_src_prepare_seek_segment (GstBaseSrc * src, GstEvent * event,
1009     GstSegment * seeksegment)
1010 {
1011   GstBaseSrcClass *bclass;
1012   gboolean result = FALSE;
1013
1014   bclass = GST_BASE_SRC_GET_CLASS (src);
1015
1016   if (bclass->prepare_seek_segment)
1017     result = bclass->prepare_seek_segment (src, event, seeksegment);
1018
1019   return result;
1020 }
1021
1022 /* this code implements the seeking. It is a good example
1023  * handling all cases.
1024  *
1025  * A seek updates the currently configured segment.start
1026  * and segment.stop values based on the SEEK_TYPE. If the
1027  * segment.start value is updated, a seek to this new position
1028  * should be performed.
1029  *
1030  * The seek can only be executed when we are not currently
1031  * streaming any data, to make sure that this is the case, we
1032  * acquire the STREAM_LOCK which is taken when we are in the
1033  * _loop() function or when a getrange() is called. Normally
1034  * we will not receive a seek if we are operating in pull mode
1035  * though.
1036  *
1037  * When we are in the loop() function, we might be in the middle
1038  * of pushing a buffer, which might block in a sink. To make sure
1039  * that the push gets unblocked we push out a FLUSH_START event.
1040  * Our loop function will get a WRONG_STATE return value from
1041  * the push and will pause, effectively releasing the STREAM_LOCK.
1042  *
1043  * For a non-flushing seek, we pause the task, which might eventually
1044  * release the STREAM_LOCK. We say eventually because when the sink
1045  * blocks on the sample we might wait a very long time until the sink
1046  * unblocks the sample. In any case we acquire the STREAM_LOCK and
1047  * can continue the seek. A non-flushing seek is normally done in a 
1048  * running pipeline to perform seamless playback.
1049  * In the case of a non-flushing seek we need to make sure that the
1050  * data we output after the seek is continuous with the previous data,
1051  * this is because a non-flushing seek does not reset the stream-time
1052  * to 0. We do this by closing the currently running segment, ie. sending
1053  * a new_segment event with the stop position set to the last processed 
1054  * position.
1055  *
1056  * After updating the segment.start/stop values, we prepare for
1057  * streaming again. We push out a FLUSH_STOP to make the peer pad
1058  * accept data again and we start our task again.
1059  *
1060  * A segment seek posts a message on the bus saying that the playback
1061  * of the segment started. We store the segment flag internally because
1062  * when we reach the segment.stop we have to post a segment.done
1063  * instead of EOS when doing a segment seek.
1064  */
1065 /* FIXME (0.11), we have the unlock gboolean here because most current 
1066  * implementations (fdsrc, -base/gst/tcp/, ...) unconditionally unlock, even when
1067  * the streaming thread isn't running, resulting in bogus unlocks later when it 
1068  * starts. This is fixed by adding unlock_stop, but we should still avoid unlocking
1069  * unnecessarily for backwards compatibility. Ergo, the unlock variable stays
1070  * until 0.11
1071  */
1072 static gboolean
1073 gst_base_src_perform_seek (GstBaseSrc * src, GstEvent * event, gboolean unlock)
1074 {
1075   gboolean res = TRUE;
1076   gdouble rate;
1077   GstFormat seek_format, dest_format;
1078   GstSeekFlags flags;
1079   GstSeekType cur_type, stop_type;
1080   gint64 cur, stop;
1081   gboolean flush;
1082   gboolean update;
1083   gboolean relative_seek = FALSE;
1084   gboolean seekseg_configured = FALSE;
1085   GstSegment seeksegment;
1086
1087   GST_DEBUG_OBJECT (src, "doing seek");
1088
1089   dest_format = src->segment.format;
1090
1091   if (event) {
1092     gst_event_parse_seek (event, &rate, &seek_format, &flags,
1093         &cur_type, &cur, &stop_type, &stop);
1094
1095     relative_seek = SEEK_TYPE_IS_RELATIVE (cur_type) ||
1096         SEEK_TYPE_IS_RELATIVE (stop_type);
1097
1098     if (dest_format != seek_format && !relative_seek) {
1099       /* If we have an ABSOLUTE position (SEEK_SET only), we can convert it
1100        * here before taking the stream lock, otherwise we must convert it later,
1101        * once we have the stream lock and can read the current position */
1102       gst_segment_init (&seeksegment, dest_format);
1103
1104       if (!gst_base_src_prepare_seek_segment (src, event, &seeksegment))
1105         goto prepare_failed;
1106
1107       seekseg_configured = TRUE;
1108     }
1109
1110     flush = flags & GST_SEEK_FLAG_FLUSH;
1111   } else {
1112     flush = FALSE;
1113   }
1114
1115   /* send flush start */
1116   if (flush)
1117     gst_pad_push_event (src->srcpad, gst_event_new_flush_start ());
1118   else
1119     gst_pad_pause_task (src->srcpad);
1120
1121   /* unblock streaming thread */
1122   if (unlock)
1123     gst_base_src_unlock (src);
1124
1125   /* grab streaming lock, this should eventually be possible, either
1126    * because the task is paused or our streaming thread stopped 
1127    * because our peer is flushing. */
1128   GST_PAD_STREAM_LOCK (src->srcpad);
1129
1130   if (unlock)
1131     gst_base_src_unlock_stop (src);
1132
1133   /* If we configured the seeksegment above, don't overwrite it now. Otherwise
1134    * copy the current segment info into the temp segment that we can actually
1135    * attempt the seek with. We only update the real segment if the seek suceeds. */
1136   if (!seekseg_configured) {
1137     memcpy (&seeksegment, &src->segment, sizeof (GstSegment));
1138
1139     /* now configure the final seek segment */
1140     if (event) {
1141       if (src->segment.format != seek_format) {
1142         /* OK, here's where we give the subclass a chance to convert the relative
1143          * seek into an absolute one in the processing format. We set up any
1144          * absolute seek above, before taking the stream lock. */
1145         if (!gst_base_src_prepare_seek_segment (src, event, &seeksegment)) {
1146           GST_DEBUG_OBJECT (src, "Preparing the seek failed after flushing. "
1147               "Aborting seek");
1148           res = FALSE;
1149         }
1150       } else {
1151         /* The seek format matches our processing format, no need to ask the
1152          * the subclass to configure the segment. */
1153         gst_segment_set_seek (&seeksegment, rate, seek_format, flags,
1154             cur_type, cur, stop_type, stop, &update);
1155       }
1156     }
1157     /* Else, no seek event passed, so we're just (re)starting the 
1158        current segment. */
1159   }
1160
1161   if (res) {
1162     GST_DEBUG_OBJECT (src, "segment configured from %" G_GINT64_FORMAT
1163         " to %" G_GINT64_FORMAT ", position %" G_GINT64_FORMAT,
1164         seeksegment.start, seeksegment.stop, seeksegment.last_stop);
1165
1166     /* do the seek, segment.last_stop contains the new position. */
1167     res = gst_base_src_do_seek (src, &seeksegment);
1168   }
1169
1170   /* and prepare to continue streaming */
1171   if (flush) {
1172     /* send flush stop, peer will accept data and events again. We
1173      * are not yet providing data as we still have the STREAM_LOCK. */
1174     gst_pad_push_event (src->srcpad, gst_event_new_flush_stop ());
1175   } else if (res && src->data.ABI.running) {
1176     /* we are running the current segment and doing a non-flushing seek, 
1177      * close the segment first based on the last_stop. */
1178     GST_DEBUG_OBJECT (src, "closing running segment %" G_GINT64_FORMAT
1179         " to %" G_GINT64_FORMAT, src->segment.start, src->segment.last_stop);
1180
1181     /* queue the segment for sending in the stream thread */
1182     if (src->priv->close_segment)
1183       gst_event_unref (src->priv->close_segment);
1184     src->priv->close_segment =
1185         gst_event_new_new_segment_full (TRUE,
1186         src->segment.rate, src->segment.applied_rate, src->segment.format,
1187         src->segment.start, src->segment.last_stop, src->segment.time);
1188   }
1189
1190   /* The subclass must have converted the segment to the processing format 
1191    * by now */
1192   if (res && seeksegment.format != dest_format) {
1193     GST_DEBUG_OBJECT (src, "Subclass failed to prepare a seek segment "
1194         "in the correct format. Aborting seek.");
1195     res = FALSE;
1196   }
1197
1198   /* if successfull seek, we update our real segment and push
1199    * out the new segment. */
1200   if (res) {
1201     memcpy (&src->segment, &seeksegment, sizeof (GstSegment));
1202
1203     if (src->segment.flags & GST_SEEK_FLAG_SEGMENT) {
1204       gst_element_post_message (GST_ELEMENT (src),
1205           gst_message_new_segment_start (GST_OBJECT (src),
1206               src->segment.format, src->segment.last_stop));
1207     }
1208
1209     /* for deriving a stop position for the playback segment form the seek
1210      * segment, we must take the duration when the stop is not set */
1211     if ((stop = src->segment.stop) == -1)
1212       stop = src->segment.duration;
1213
1214     GST_DEBUG_OBJECT (src, "Sending newsegment from %" G_GINT64_FORMAT
1215         " to %" G_GINT64_FORMAT, src->segment.start, stop);
1216
1217     /* now replace the old segment so that we send it in the stream thread the
1218      * next time it is scheduled. */
1219     if (src->priv->start_segment)
1220       gst_event_unref (src->priv->start_segment);
1221     src->priv->start_segment =
1222         gst_event_new_new_segment_full (FALSE,
1223         src->segment.rate, src->segment.applied_rate, src->segment.format,
1224         src->segment.last_stop, stop, src->segment.time);
1225   }
1226
1227   src->priv->discont = TRUE;
1228   src->data.ABI.running = TRUE;
1229   /* and restart the task in case it got paused explicitely or by
1230    * the FLUSH_START event we pushed out. */
1231   gst_pad_start_task (src->srcpad, (GstTaskFunction) gst_base_src_loop,
1232       src->srcpad);
1233
1234   /* and release the lock again so we can continue streaming */
1235   GST_PAD_STREAM_UNLOCK (src->srcpad);
1236
1237   return res;
1238
1239   /* ERROR */
1240 prepare_failed:
1241   GST_DEBUG_OBJECT (src, "Preparing the seek failed before flushing. "
1242       "Aborting seek");
1243   return FALSE;
1244 }
1245
1246 static const GstQueryType *
1247 gst_base_src_get_query_types (GstElement * element)
1248 {
1249   static const GstQueryType query_types[] = {
1250     GST_QUERY_DURATION,
1251     GST_QUERY_POSITION,
1252     GST_QUERY_SEEKING,
1253     GST_QUERY_SEGMENT,
1254     GST_QUERY_FORMATS,
1255     GST_QUERY_LATENCY,
1256     GST_QUERY_JITTER,
1257     GST_QUERY_RATE,
1258     GST_QUERY_CONVERT,
1259     0
1260   };
1261
1262   return query_types;
1263 }
1264
1265 /* all events send to this element directly. This is mainly done from the
1266  * application.
1267  */
1268 static gboolean
1269 gst_base_src_send_event (GstElement * element, GstEvent * event)
1270 {
1271   GstBaseSrc *src;
1272   gboolean result = FALSE;
1273
1274   src = GST_BASE_SRC (element);
1275
1276   switch (GST_EVENT_TYPE (event)) {
1277       /* bidirectional events */
1278     case GST_EVENT_FLUSH_START:
1279     case GST_EVENT_FLUSH_STOP:
1280       /* sending random flushes downstream can break stuff,
1281        * especially sync since all segment info will get flushed */
1282       break;
1283
1284       /* downstream serialized events */
1285     case GST_EVENT_EOS:
1286       /* FIXME, queue EOS and make sure the task or pull function 
1287        * perform the EOS actions. */
1288       break;
1289     case GST_EVENT_NEWSEGMENT:
1290       /* sending random NEWSEGMENT downstream can break sync. */
1291       break;
1292     case GST_EVENT_TAG:
1293       /* sending tags could be useful, FIXME insert in dataflow */
1294       break;
1295     case GST_EVENT_BUFFERSIZE:
1296       /* does not seem to make much sense currently */
1297       break;
1298
1299       /* upstream events */
1300     case GST_EVENT_QOS:
1301       /* elements should override send_event and do something */
1302       break;
1303     case GST_EVENT_SEEK:
1304     {
1305       gboolean started;
1306
1307       GST_OBJECT_LOCK (src->srcpad);
1308       if (GST_PAD_ACTIVATE_MODE (src->srcpad) == GST_ACTIVATE_PULL)
1309         goto wrong_mode;
1310       started = GST_PAD_ACTIVATE_MODE (src->srcpad) == GST_ACTIVATE_PUSH;
1311       GST_OBJECT_UNLOCK (src->srcpad);
1312
1313       if (started) {
1314         /* when we are running in push mode, we can execute the
1315          * seek right now, we need to unlock. */
1316         result = gst_base_src_perform_seek (src, event, TRUE);
1317       } else {
1318         GstEvent **event_p;
1319
1320         /* else we store the event and execute the seek when we
1321          * get activated */
1322         GST_OBJECT_LOCK (src);
1323         event_p = &src->data.ABI.pending_seek;
1324         gst_event_replace ((GstEvent **) event_p, event);
1325         GST_OBJECT_UNLOCK (src);
1326         /* assume the seek will work */
1327         result = TRUE;
1328       }
1329       break;
1330     }
1331     case GST_EVENT_NAVIGATION:
1332       /* could make sense for elements that do something with navigation events
1333        * but then they would need to override the send_event function */
1334       break;
1335     case GST_EVENT_LATENCY:
1336       /* does not seem to make sense currently */
1337       break;
1338
1339       /* custom events */
1340     case GST_EVENT_CUSTOM_UPSTREAM:
1341       /* override send_event if you want this */
1342       break;
1343     case GST_EVENT_CUSTOM_DOWNSTREAM:
1344     case GST_EVENT_CUSTOM_BOTH:
1345       /* FIXME, insert event in the dataflow */
1346       break;
1347     case GST_EVENT_CUSTOM_DOWNSTREAM_OOB:
1348     case GST_EVENT_CUSTOM_BOTH_OOB:
1349       /* insert a random custom event into the pipeline */
1350       GST_DEBUG_OBJECT (src, "pushing custom OOB event downstream");
1351       result = gst_pad_push_event (src->srcpad, event);
1352       break;
1353     default:
1354       break;
1355   }
1356 done:
1357   gst_event_unref (event);
1358
1359   return result;
1360
1361   /* ERRORS */
1362 wrong_mode:
1363   {
1364     GST_DEBUG_OBJECT (src, "cannot perform seek when operating in pull mode");
1365     GST_OBJECT_UNLOCK (src->srcpad);
1366     result = FALSE;
1367     goto done;
1368   }
1369 }
1370
1371 static gboolean
1372 gst_base_src_default_event (GstBaseSrc * src, GstEvent * event)
1373 {
1374   gboolean result;
1375
1376   switch (GST_EVENT_TYPE (event)) {
1377     case GST_EVENT_SEEK:
1378       /* is normally called when in push mode */
1379       if (!src->seekable)
1380         goto not_seekable;
1381
1382       result = gst_base_src_perform_seek (src, event, TRUE);
1383       break;
1384     case GST_EVENT_FLUSH_START:
1385       /* cancel any blocking getrange, is normally called
1386        * when in pull mode. */
1387       result = gst_base_src_unlock (src);
1388       break;
1389     case GST_EVENT_FLUSH_STOP:
1390       result = gst_base_src_unlock_stop (src);
1391       break;
1392     default:
1393       result = TRUE;
1394       break;
1395   }
1396   return result;
1397
1398   /* ERRORS */
1399 not_seekable:
1400   {
1401     GST_DEBUG_OBJECT (src, "is not seekable");
1402     return FALSE;
1403   }
1404 }
1405
1406 static gboolean
1407 gst_base_src_event_handler (GstPad * pad, GstEvent * event)
1408 {
1409   GstBaseSrc *src;
1410   GstBaseSrcClass *bclass;
1411   gboolean result = FALSE;
1412
1413   src = GST_BASE_SRC (gst_pad_get_parent (pad));
1414   bclass = GST_BASE_SRC_GET_CLASS (src);
1415
1416   if (bclass->event) {
1417     if (!(result = bclass->event (src, event)))
1418       goto subclass_failed;
1419   }
1420
1421 done:
1422   gst_event_unref (event);
1423   gst_object_unref (src);
1424
1425   return result;
1426
1427   /* ERRORS */
1428 subclass_failed:
1429   {
1430     GST_DEBUG_OBJECT (src, "subclass refused event");
1431     goto done;
1432   }
1433 }
1434
1435 static void
1436 gst_base_src_set_property (GObject * object, guint prop_id,
1437     const GValue * value, GParamSpec * pspec)
1438 {
1439   GstBaseSrc *src;
1440
1441   src = GST_BASE_SRC (object);
1442
1443   switch (prop_id) {
1444     case PROP_BLOCKSIZE:
1445       src->blocksize = g_value_get_ulong (value);
1446       break;
1447     case PROP_NUM_BUFFERS:
1448       src->num_buffers = g_value_get_int (value);
1449       break;
1450     case PROP_TYPEFIND:
1451       src->data.ABI.typefind = g_value_get_boolean (value);
1452       break;
1453     case PROP_DO_TIMESTAMP:
1454       src->priv->do_timestamp = g_value_get_boolean (value);
1455       break;
1456     default:
1457       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1458       break;
1459   }
1460 }
1461
1462 static void
1463 gst_base_src_get_property (GObject * object, guint prop_id, GValue * value,
1464     GParamSpec * pspec)
1465 {
1466   GstBaseSrc *src;
1467
1468   src = GST_BASE_SRC (object);
1469
1470   switch (prop_id) {
1471     case PROP_BLOCKSIZE:
1472       g_value_set_ulong (value, src->blocksize);
1473       break;
1474     case PROP_NUM_BUFFERS:
1475       g_value_set_int (value, src->num_buffers);
1476       break;
1477     case PROP_TYPEFIND:
1478       g_value_set_boolean (value, src->data.ABI.typefind);
1479       break;
1480     case PROP_DO_TIMESTAMP:
1481       g_value_set_boolean (value, src->priv->do_timestamp);
1482       break;
1483     default:
1484       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1485       break;
1486   }
1487 }
1488
1489 /* with STREAM_LOCK and LOCK */
1490 static GstClockReturn
1491 gst_base_src_wait (GstBaseSrc * basesrc, GstClock * clock, GstClockTime time)
1492 {
1493   GstClockReturn ret;
1494   GstClockID id;
1495
1496   id = gst_clock_new_single_shot_id (clock, time);
1497
1498   basesrc->clock_id = id;
1499   /* release the object lock while waiting */
1500   GST_OBJECT_UNLOCK (basesrc);
1501
1502   ret = gst_clock_id_wait (id, NULL);
1503
1504   GST_OBJECT_LOCK (basesrc);
1505   gst_clock_id_unref (id);
1506   basesrc->clock_id = NULL;
1507
1508   return ret;
1509 }
1510
1511 /* perform synchronisation on a buffer. 
1512  * with STREAM_LOCK.
1513  */
1514 static GstClockReturn
1515 gst_base_src_do_sync (GstBaseSrc * basesrc, GstBuffer * buffer)
1516 {
1517   GstClockReturn result;
1518   GstClockTime start, end;
1519   GstBaseSrcClass *bclass;
1520   GstClockTime base_time;
1521   GstClock *clock;
1522   GstClockTime now = -1, timestamp;
1523   gboolean do_timestamp, first, pseudo_live;
1524
1525   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
1526
1527   start = end = -1;
1528   if (bclass->get_times)
1529     bclass->get_times (basesrc, buffer, &start, &end);
1530
1531   /* get buffer timestamp */
1532   timestamp = GST_BUFFER_TIMESTAMP (buffer);
1533
1534   /* grab the lock to prepare for clocking and calculate the startup 
1535    * latency. */
1536   GST_OBJECT_LOCK (basesrc);
1537
1538   /* if we are asked to sync against the clock we are a pseudo live element */
1539   pseudo_live = (start != -1 && basesrc->is_live);
1540   /* check for the first buffer */
1541   first = (basesrc->priv->latency == -1);
1542
1543   if (timestamp != -1 && pseudo_live) {
1544     GstClockTime latency;
1545
1546     /* we have a timestamp and a sync time, latency is the diff */
1547     if (timestamp <= start)
1548       latency = start - timestamp;
1549     else
1550       latency = 0;
1551
1552     if (first) {
1553       GST_DEBUG_OBJECT (basesrc, "pseudo_live with latency %" GST_TIME_FORMAT,
1554           GST_TIME_ARGS (latency));
1555       /* first time we calculate latency, just configure */
1556       basesrc->priv->latency = latency;
1557     } else {
1558       if (basesrc->priv->latency != latency) {
1559         /* we have a new latency, FIXME post latency message */
1560         basesrc->priv->latency = latency;
1561         GST_DEBUG_OBJECT (basesrc, "latency changed to %" GST_TIME_FORMAT,
1562             GST_TIME_ARGS (latency));
1563       }
1564     }
1565   } else if (first) {
1566     GST_DEBUG_OBJECT (basesrc, "no latency needed, live %d, sync %d",
1567         basesrc->is_live, start != -1);
1568     basesrc->priv->latency = 0;
1569   }
1570
1571   /* get clock, if no clock, we can't sync or do timestamps */
1572   if ((clock = GST_ELEMENT_CLOCK (basesrc)) == NULL)
1573     goto no_clock;
1574
1575   base_time = GST_ELEMENT_CAST (basesrc)->base_time;
1576
1577   do_timestamp = basesrc->priv->do_timestamp;
1578
1579   /* first buffer, calculate the timestamp offset */
1580   if (first) {
1581     GstClockTime running_time;
1582
1583     now = gst_clock_get_time (clock);
1584     running_time = now - base_time;
1585
1586     GST_LOG_OBJECT (basesrc,
1587         "startup timestamp: %" GST_TIME_FORMAT ", running_time %"
1588         GST_TIME_FORMAT, GST_TIME_ARGS (timestamp),
1589         GST_TIME_ARGS (running_time));
1590
1591     if (pseudo_live && timestamp != -1) {
1592       /* live source and we need to sync, add startup latency to all timestamps
1593        * to get the real running_time. Live sources should always timestamp
1594        * according to the current running time. */
1595       basesrc->priv->ts_offset = GST_CLOCK_DIFF (timestamp, running_time);
1596
1597       GST_LOG_OBJECT (basesrc, "live with sync, ts_offset %" GST_TIME_FORMAT,
1598           GST_TIME_ARGS (basesrc->priv->ts_offset));
1599     } else {
1600       basesrc->priv->ts_offset = 0;
1601       GST_LOG_OBJECT (basesrc, "no timestamp offset needed");
1602     }
1603
1604     if (!GST_CLOCK_TIME_IS_VALID (timestamp)) {
1605       if (do_timestamp)
1606         timestamp = running_time;
1607       else
1608         timestamp = 0;
1609
1610       GST_BUFFER_TIMESTAMP (buffer) = timestamp;
1611
1612       GST_LOG_OBJECT (basesrc, "created timestamp: %" GST_TIME_FORMAT,
1613           GST_TIME_ARGS (timestamp));
1614     }
1615
1616     /* add the timestamp offset we need for sync */
1617     timestamp += basesrc->priv->ts_offset;
1618   } else {
1619     /* not the first buffer, the timestamp is the diff between the clock and
1620      * base_time */
1621     if (do_timestamp && !GST_CLOCK_TIME_IS_VALID (timestamp)) {
1622       now = gst_clock_get_time (clock);
1623
1624       GST_BUFFER_TIMESTAMP (buffer) = now - base_time;
1625     }
1626   }
1627
1628   /* if we don't have a buffer timestamp, we don't sync */
1629   if (!GST_CLOCK_TIME_IS_VALID (start))
1630     goto no_sync;
1631
1632   if (basesrc->is_live && GST_CLOCK_TIME_IS_VALID (timestamp)) {
1633     /* for pseudo live sources, add our ts_offset to the timestamp */
1634     GST_BUFFER_TIMESTAMP (buffer) += basesrc->priv->ts_offset;
1635     start += basesrc->priv->ts_offset;
1636   }
1637
1638   GST_LOG_OBJECT (basesrc,
1639       "waiting for clock, base time %" GST_TIME_FORMAT
1640       ", stream_start %" GST_TIME_FORMAT,
1641       GST_TIME_ARGS (base_time), GST_TIME_ARGS (start));
1642
1643   result = gst_base_src_wait (basesrc, clock, start + base_time);
1644   GST_OBJECT_UNLOCK (basesrc);
1645
1646   GST_LOG_OBJECT (basesrc, "clock entry done: %d", result);
1647
1648   return result;
1649
1650   /* special cases */
1651 no_clock:
1652   {
1653     GST_DEBUG_OBJECT (basesrc, "we have no clock");
1654     GST_OBJECT_UNLOCK (basesrc);
1655     return GST_CLOCK_OK;
1656   }
1657 no_sync:
1658   {
1659     GST_DEBUG_OBJECT (basesrc, "no sync needed");
1660     GST_OBJECT_UNLOCK (basesrc);
1661     return GST_CLOCK_OK;
1662   }
1663 }
1664
1665 static gboolean
1666 gst_base_src_update_length (GstBaseSrc * src, guint64 offset, guint * length)
1667 {
1668   guint64 size, maxsize;
1669   GstBaseSrcClass *bclass;
1670
1671   bclass = GST_BASE_SRC_GET_CLASS (src);
1672
1673   /* only operate if we are working with bytes */
1674   if (src->segment.format != GST_FORMAT_BYTES)
1675     return TRUE;
1676
1677   /* get total file size */
1678   size = (guint64) src->segment.duration;
1679
1680   /* the max amount of bytes to read is the total size or
1681    * up to the segment.stop if present. */
1682   if (src->segment.stop != -1)
1683     maxsize = MIN (size, src->segment.stop);
1684   else
1685     maxsize = size;
1686
1687   GST_DEBUG_OBJECT (src,
1688       "reading offset %" G_GUINT64_FORMAT ", length %u, size %" G_GINT64_FORMAT
1689       ", segment.stop %" G_GINT64_FORMAT ", maxsize %" G_GINT64_FORMAT, offset,
1690       *length, size, src->segment.stop, maxsize);
1691
1692   /* check size if we have one */
1693   if (maxsize != -1) {
1694     /* if we run past the end, check if the file became bigger and 
1695      * retry. */
1696     if (G_UNLIKELY (offset + *length >= maxsize)) {
1697       /* see if length of the file changed */
1698       if (bclass->get_size)
1699         if (!bclass->get_size (src, &size))
1700           size = -1;
1701
1702       gst_segment_set_duration (&src->segment, GST_FORMAT_BYTES, size);
1703
1704       /* make sure we don't exceed the configured segment stop
1705        * if it was set */
1706       if (src->segment.stop != -1)
1707         maxsize = MIN (size, src->segment.stop);
1708       else
1709         maxsize = size;
1710
1711       /* if we are at or past the end, EOS */
1712       if (G_UNLIKELY (offset >= maxsize))
1713         goto unexpected_length;
1714
1715       /* else we can clip to the end */
1716       if (G_UNLIKELY (offset + *length >= maxsize))
1717         *length = maxsize - offset;
1718
1719     }
1720   }
1721
1722   /* keep track of current position. segment is in bytes, we checked 
1723    * that above. */
1724   gst_segment_set_last_stop (&src->segment, GST_FORMAT_BYTES, offset);
1725
1726   return TRUE;
1727
1728   /* ERRORS */
1729 unexpected_length:
1730   {
1731     return FALSE;
1732   }
1733 }
1734
1735 static GstFlowReturn
1736 gst_base_src_get_range (GstBaseSrc * src, guint64 offset, guint length,
1737     GstBuffer ** buf)
1738 {
1739   GstFlowReturn ret;
1740   GstBaseSrcClass *bclass;
1741   GstClockReturn status;
1742
1743   bclass = GST_BASE_SRC_GET_CLASS (src);
1744
1745   ret = gst_base_src_wait_playing (src);
1746   if (ret != GST_FLOW_OK)
1747     goto stopped;
1748
1749   if (G_UNLIKELY (!GST_OBJECT_FLAG_IS_SET (src, GST_BASE_SRC_STARTED)))
1750     goto not_started;
1751
1752   if (G_UNLIKELY (!bclass->create))
1753     goto no_function;
1754
1755   if (G_UNLIKELY (!gst_base_src_update_length (src, offset, &length)))
1756     goto unexpected_length;
1757
1758   /* normally we don't count buffers */
1759   if (G_UNLIKELY (src->num_buffers_left >= 0)) {
1760     if (src->num_buffers_left == 0)
1761       goto reached_num_buffers;
1762     else
1763       src->num_buffers_left--;
1764   }
1765
1766   GST_DEBUG_OBJECT (src,
1767       "calling create offset %" G_GUINT64_FORMAT " length %u, time %"
1768       G_GINT64_FORMAT, offset, length, src->segment.time);
1769
1770   ret = bclass->create (src, offset, length, buf);
1771   if (G_UNLIKELY (ret != GST_FLOW_OK))
1772     goto done;
1773
1774   /* no timestamp set and we are at offset 0, we can timestamp with 0 */
1775   if (offset == 0 && src->segment.time == 0
1776       && GST_BUFFER_TIMESTAMP (*buf) == -1)
1777     GST_BUFFER_TIMESTAMP (*buf) = 0;
1778
1779   /* now sync before pushing the buffer */
1780   status = gst_base_src_do_sync (src, *buf);
1781   switch (status) {
1782     case GST_CLOCK_EARLY:
1783       /* the buffer is too late. We currently don't drop the buffer. */
1784       GST_DEBUG_OBJECT (src, "buffer too late!, returning anyway");
1785       break;
1786     case GST_CLOCK_OK:
1787       /* buffer synchronised properly */
1788       GST_DEBUG_OBJECT (src, "buffer ok");
1789       break;
1790     case GST_CLOCK_UNSCHEDULED:
1791       /* this case is triggered when we were waiting for the clock and
1792        * it got unlocked because we did a state change. We return 
1793        * WRONG_STATE in this case to stop the dataflow also get rid of the
1794        * produced buffer. */
1795       GST_DEBUG_OBJECT (src,
1796           "clock was unscheduled (%d), returning WRONG_STATE", status);
1797       gst_buffer_unref (*buf);
1798       *buf = NULL;
1799       ret = GST_FLOW_WRONG_STATE;
1800       break;
1801     default:
1802       /* all other result values are unexpected and errors */
1803       GST_ELEMENT_ERROR (src, CORE, CLOCK,
1804           (_("Internal clock error.")),
1805           ("clock returned unexpected return value %d", status));
1806       gst_buffer_unref (*buf);
1807       *buf = NULL;
1808       ret = GST_FLOW_ERROR;
1809       break;
1810   }
1811 done:
1812   return ret;
1813
1814   /* ERROR */
1815 stopped:
1816   {
1817     GST_DEBUG_OBJECT (src, "wait_playing returned %d", ret);
1818     return ret;
1819   }
1820 not_started:
1821   {
1822     GST_DEBUG_OBJECT (src, "getrange but not started");
1823     return GST_FLOW_WRONG_STATE;
1824   }
1825 no_function:
1826   {
1827     GST_DEBUG_OBJECT (src, "no create function");
1828     return GST_FLOW_ERROR;
1829   }
1830 unexpected_length:
1831   {
1832     GST_DEBUG_OBJECT (src, "unexpected length %u (offset=%" G_GUINT64_FORMAT
1833         ", size=%" G_GINT64_FORMAT ")", length, offset, src->segment.duration);
1834     return GST_FLOW_UNEXPECTED;
1835   }
1836 reached_num_buffers:
1837   {
1838     GST_DEBUG_OBJECT (src, "sent all buffers");
1839     return GST_FLOW_UNEXPECTED;
1840   }
1841 }
1842
1843 static GstFlowReturn
1844 gst_base_src_pad_get_range (GstPad * pad, guint64 offset, guint length,
1845     GstBuffer ** buf)
1846 {
1847   GstBaseSrc *src;
1848   GstFlowReturn res;
1849
1850   src = GST_BASE_SRC (gst_pad_get_parent (pad));
1851
1852   res = gst_base_src_get_range (src, offset, length, buf);
1853
1854   gst_object_unref (src);
1855
1856   return res;
1857 }
1858
1859 static gboolean
1860 gst_base_src_default_check_get_range (GstBaseSrc * src)
1861 {
1862   gboolean res;
1863
1864   if (!GST_OBJECT_FLAG_IS_SET (src, GST_BASE_SRC_STARTED)) {
1865     GST_LOG_OBJECT (src, "doing start/stop to check get_range support");
1866     if (G_LIKELY (gst_base_src_start (src)))
1867       gst_base_src_stop (src);
1868   }
1869
1870   /* we can operate in getrange mode if the native format is bytes
1871    * and we are seekable, this condition is set in the random_access
1872    * flag and is set in the _start() method. */
1873   res = src->random_access;
1874
1875   return res;
1876 }
1877
1878 static gboolean
1879 gst_base_src_check_get_range (GstBaseSrc * src)
1880 {
1881   GstBaseSrcClass *bclass;
1882   gboolean res;
1883
1884   bclass = GST_BASE_SRC_GET_CLASS (src);
1885
1886   if (bclass->check_get_range == NULL)
1887     goto no_function;
1888
1889   res = bclass->check_get_range (src);
1890   GST_LOG_OBJECT (src, "%s() returned %d",
1891       GST_DEBUG_FUNCPTR_NAME (bclass->check_get_range), (gint) res);
1892
1893   return res;
1894
1895   /* ERRORS */
1896 no_function:
1897   {
1898     GST_WARNING_OBJECT (src, "no check_get_range function set");
1899     return FALSE;
1900   }
1901 }
1902
1903 static gboolean
1904 gst_base_src_pad_check_get_range (GstPad * pad)
1905 {
1906   GstBaseSrc *src;
1907   gboolean res;
1908
1909   src = GST_BASE_SRC (gst_pad_get_parent (pad));
1910
1911   res = gst_base_src_check_get_range (src);
1912
1913   gst_object_unref (src);
1914
1915   return res;
1916 }
1917
1918 static void
1919 gst_base_src_loop (GstPad * pad)
1920 {
1921   GstBaseSrc *src;
1922   GstBuffer *buf = NULL;
1923   GstFlowReturn ret;
1924   gint64 position;
1925   gboolean eos;
1926
1927   eos = FALSE;
1928
1929   src = GST_BASE_SRC (gst_pad_get_parent (pad));
1930
1931   src->priv->last_sent_eos = FALSE;
1932
1933   /* if we operate in bytes, we can calculate an offset */
1934   if (src->segment.format == GST_FORMAT_BYTES)
1935     position = src->segment.last_stop;
1936   else
1937     position = -1;
1938
1939   ret = gst_base_src_get_range (src, position, src->blocksize, &buf);
1940   if (G_UNLIKELY (ret != GST_FLOW_OK)) {
1941     GST_INFO_OBJECT (src, "pausing after gst_base_src_get_range() = %s",
1942         gst_flow_get_name (ret));
1943     goto pause;
1944   }
1945   /* this should not happen */
1946   if (G_UNLIKELY (buf == NULL))
1947     goto null_buffer;
1948
1949   /* push events to close/start our segment before we push the buffer. */
1950   if (src->priv->close_segment) {
1951     gst_pad_push_event (pad, src->priv->close_segment);
1952     src->priv->close_segment = NULL;
1953   }
1954   if (src->priv->start_segment) {
1955     gst_pad_push_event (pad, src->priv->start_segment);
1956     src->priv->start_segment = NULL;
1957   }
1958
1959   /* figure out the new position */
1960   switch (src->segment.format) {
1961     case GST_FORMAT_BYTES:
1962       position += GST_BUFFER_SIZE (buf);
1963       break;
1964     case GST_FORMAT_TIME:
1965     {
1966       GstClockTime start, duration;
1967
1968       start = GST_BUFFER_TIMESTAMP (buf);
1969       duration = GST_BUFFER_DURATION (buf);
1970
1971       if (GST_CLOCK_TIME_IS_VALID (start))
1972         position = start;
1973       else
1974         position = src->segment.last_stop;
1975
1976       if (GST_CLOCK_TIME_IS_VALID (duration))
1977         position += duration;
1978       break;
1979     }
1980     case GST_FORMAT_DEFAULT:
1981       position = GST_BUFFER_OFFSET_END (buf);
1982       break;
1983     default:
1984       position = -1;
1985       break;
1986   }
1987   if (position != -1) {
1988     if (src->segment.stop != -1) {
1989       if (position >= src->segment.stop) {
1990         eos = TRUE;
1991         position = src->segment.stop;
1992       }
1993     }
1994     gst_segment_set_last_stop (&src->segment, src->segment.format, position);
1995   }
1996
1997   if (G_UNLIKELY (src->priv->discont)) {
1998     buf = gst_buffer_make_metadata_writable (buf);
1999     GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_DISCONT);
2000     src->priv->discont = FALSE;
2001   }
2002
2003   ret = gst_pad_push (pad, buf);
2004   if (G_UNLIKELY (ret != GST_FLOW_OK)) {
2005     GST_INFO_OBJECT (src, "pausing after gst_pad_push() = %s",
2006         gst_flow_get_name (ret));
2007     goto pause;
2008   }
2009
2010   if (eos) {
2011     GST_INFO_OBJECT (src, "pausing after EOS");
2012     ret = GST_FLOW_UNEXPECTED;
2013     goto pause;
2014   }
2015
2016 done:
2017   gst_object_unref (src);
2018   return;
2019
2020   /* special cases */
2021 pause:
2022   {
2023     const gchar *reason = gst_flow_get_name (ret);
2024
2025     GST_DEBUG_OBJECT (src, "pausing task, reason %s", reason);
2026     src->data.ABI.running = FALSE;
2027     gst_pad_pause_task (pad);
2028     if (GST_FLOW_IS_FATAL (ret) || ret == GST_FLOW_NOT_LINKED) {
2029       if (ret == GST_FLOW_UNEXPECTED) {
2030         /* perform EOS logic */
2031         if (src->segment.flags & GST_SEEK_FLAG_SEGMENT) {
2032           gst_element_post_message (GST_ELEMENT_CAST (src),
2033               gst_message_new_segment_done (GST_OBJECT_CAST (src),
2034                   src->segment.format, src->segment.last_stop));
2035         } else {
2036           gst_pad_push_event (pad, gst_event_new_eos ());
2037           src->priv->last_sent_eos = TRUE;
2038         }
2039       } else {
2040         /* for fatal errors we post an error message, post the error
2041          * first so the app knows about the error first. */
2042         GST_ELEMENT_ERROR (src, STREAM, FAILED,
2043             (_("Internal data flow error.")),
2044             ("streaming task paused, reason %s (%d)", reason, ret));
2045         gst_pad_push_event (pad, gst_event_new_eos ());
2046         src->priv->last_sent_eos = TRUE;
2047       }
2048     }
2049     goto done;
2050   }
2051 null_buffer:
2052   {
2053     GST_ELEMENT_ERROR (src, STREAM, FAILED,
2054         (_("Internal data flow error.")), ("element returned NULL buffer"));
2055     /* we finished the segment on error */
2056     src->data.ABI.running = FALSE;
2057     gst_pad_pause_task (pad);
2058     gst_pad_push_event (pad, gst_event_new_eos ());
2059     src->priv->last_sent_eos = TRUE;
2060     goto done;
2061   }
2062 }
2063
2064 /* this will always be called between start() and stop(). So you can rely on
2065  * resources allocated by start() and freed from stop(). This needs to be added
2066  * to the docs at some point. */
2067 static gboolean
2068 gst_base_src_unlock (GstBaseSrc * basesrc)
2069 {
2070   GstBaseSrcClass *bclass;
2071   gboolean result = TRUE;
2072
2073   GST_DEBUG ("unlock");
2074   /* unblock whatever the subclass is doing */
2075   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2076   if (bclass->unlock)
2077     result = bclass->unlock (basesrc);
2078
2079   GST_DEBUG ("unschedule clock");
2080   /* and unblock the clock as well, if any */
2081   GST_OBJECT_LOCK (basesrc);
2082   if (basesrc->clock_id) {
2083     gst_clock_id_unschedule (basesrc->clock_id);
2084   }
2085   GST_OBJECT_UNLOCK (basesrc);
2086
2087   GST_DEBUG ("unlock done");
2088
2089   return result;
2090 }
2091
2092 /* this will always be called between start() and stop(). So you can rely on
2093  * resources allocated by start() and freed from stop(). This needs to be added
2094  * to the docs at some point. */
2095 static gboolean
2096 gst_base_src_unlock_stop (GstBaseSrc * basesrc)
2097 {
2098   GstBaseSrcClass *bclass;
2099   gboolean result = TRUE;
2100
2101   GST_DEBUG_OBJECT (basesrc, "unlock stop");
2102
2103   /* Finish a previous unblock request, allowing subclasses to flush command
2104    * queues or whatever they need to do */
2105   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2106   if (bclass->unlock_stop)
2107     result = bclass->unlock_stop (basesrc);
2108
2109   GST_DEBUG_OBJECT (basesrc, "unlock stop done");
2110
2111   return result;
2112 }
2113
2114 /* default negotiation code. 
2115  *
2116  * Take intersection between src and sink pads, take first
2117  * caps and fixate. 
2118  */
2119 static gboolean
2120 gst_base_src_default_negotiate (GstBaseSrc * basesrc)
2121 {
2122   GstCaps *thiscaps;
2123   GstCaps *caps = NULL;
2124   GstCaps *peercaps = NULL;
2125   gboolean result = FALSE;
2126
2127   /* first see what is possible on our source pad */
2128   thiscaps = gst_pad_get_caps (GST_BASE_SRC_PAD (basesrc));
2129   GST_DEBUG_OBJECT (basesrc, "caps of src: %" GST_PTR_FORMAT, thiscaps);
2130   /* nothing or anything is allowed, we're done */
2131   if (thiscaps == NULL || gst_caps_is_any (thiscaps))
2132     goto no_nego_needed;
2133
2134   /* get the peer caps */
2135   peercaps = gst_pad_peer_get_caps (GST_BASE_SRC_PAD (basesrc));
2136   GST_DEBUG_OBJECT (basesrc, "caps of peer: %" GST_PTR_FORMAT, peercaps);
2137   if (peercaps) {
2138     GstCaps *icaps;
2139
2140     /* get intersection */
2141     icaps = gst_caps_intersect (thiscaps, peercaps);
2142     GST_DEBUG_OBJECT (basesrc, "intersect: %" GST_PTR_FORMAT, icaps);
2143     gst_caps_unref (thiscaps);
2144     gst_caps_unref (peercaps);
2145     if (icaps) {
2146       /* take first (and best, since they are sorted) possibility */
2147       caps = gst_caps_copy_nth (icaps, 0);
2148       gst_caps_unref (icaps);
2149     }
2150   } else {
2151     /* no peer, work with our own caps then */
2152     caps = thiscaps;
2153   }
2154   if (caps) {
2155     caps = gst_caps_make_writable (caps);
2156     gst_caps_truncate (caps);
2157
2158     /* now fixate */
2159     if (!gst_caps_is_empty (caps)) {
2160       gst_pad_fixate_caps (GST_BASE_SRC_PAD (basesrc), caps);
2161       GST_DEBUG_OBJECT (basesrc, "fixated to: %" GST_PTR_FORMAT, caps);
2162
2163       if (gst_caps_is_any (caps)) {
2164         /* hmm, still anything, so element can do anything and
2165          * nego is not needed */
2166         result = TRUE;
2167       } else if (gst_caps_is_fixed (caps)) {
2168         /* yay, fixed caps, use those then */
2169         gst_pad_set_caps (GST_BASE_SRC_PAD (basesrc), caps);
2170         result = TRUE;
2171       }
2172     }
2173     gst_caps_unref (caps);
2174   }
2175   return result;
2176
2177 no_nego_needed:
2178   {
2179     GST_DEBUG_OBJECT (basesrc, "no negotiation needed");
2180     if (thiscaps)
2181       gst_caps_unref (thiscaps);
2182     return TRUE;
2183   }
2184 }
2185
2186 static gboolean
2187 gst_base_src_negotiate (GstBaseSrc * basesrc)
2188 {
2189   GstBaseSrcClass *bclass;
2190   gboolean result = TRUE;
2191
2192   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2193
2194   if (bclass->negotiate)
2195     result = bclass->negotiate (basesrc);
2196
2197   return result;
2198 }
2199
2200 static gboolean
2201 gst_base_src_start (GstBaseSrc * basesrc)
2202 {
2203   GstBaseSrcClass *bclass;
2204   gboolean result;
2205   guint64 size;
2206
2207   if (GST_OBJECT_FLAG_IS_SET (basesrc, GST_BASE_SRC_STARTED))
2208     return TRUE;
2209
2210   GST_DEBUG_OBJECT (basesrc, "starting source");
2211
2212   basesrc->num_buffers_left = basesrc->num_buffers;
2213
2214   gst_segment_init (&basesrc->segment, basesrc->segment.format);
2215   basesrc->data.ABI.running = FALSE;
2216
2217   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2218   if (bclass->start)
2219     result = bclass->start (basesrc);
2220   else
2221     result = TRUE;
2222
2223   if (!result)
2224     goto could_not_start;
2225
2226   GST_OBJECT_FLAG_SET (basesrc, GST_BASE_SRC_STARTED);
2227
2228   /* figure out the size */
2229   if (basesrc->segment.format == GST_FORMAT_BYTES) {
2230     if (bclass->get_size) {
2231       if (!(result = bclass->get_size (basesrc, &size)))
2232         size = -1;
2233     } else {
2234       result = FALSE;
2235       size = -1;
2236     }
2237     GST_DEBUG_OBJECT (basesrc, "setting size %" G_GUINT64_FORMAT, size);
2238     /* only update the size when operating in bytes, subclass is supposed
2239      * to set duration in the start method for other formats */
2240     gst_segment_set_duration (&basesrc->segment, GST_FORMAT_BYTES, size);
2241   } else {
2242     size = -1;
2243   }
2244
2245   GST_DEBUG_OBJECT (basesrc,
2246       "format: %d, have size: %d, size: %" G_GUINT64_FORMAT ", duration: %"
2247       G_GINT64_FORMAT, basesrc->segment.format, result, size,
2248       basesrc->segment.duration);
2249
2250   /* check if we can seek */
2251   if (bclass->is_seekable)
2252     basesrc->seekable = bclass->is_seekable (basesrc);
2253   else
2254     basesrc->seekable = FALSE;
2255
2256   GST_DEBUG_OBJECT (basesrc, "is seekable: %d", basesrc->seekable);
2257
2258   /* update for random access flag */
2259   basesrc->random_access = basesrc->seekable &&
2260       basesrc->segment.format == GST_FORMAT_BYTES;
2261
2262   GST_DEBUG_OBJECT (basesrc, "is random_access: %d", basesrc->random_access);
2263
2264   /* run typefind if we are random_access and the typefinding is enabled. */
2265   if (basesrc->random_access && basesrc->data.ABI.typefind && size != -1) {
2266     GstCaps *caps;
2267
2268     caps = gst_type_find_helper (basesrc->srcpad, size);
2269     gst_pad_set_caps (basesrc->srcpad, caps);
2270     gst_caps_unref (caps);
2271   } else {
2272     /* use class or default negotiate function */
2273     if (!gst_base_src_negotiate (basesrc))
2274       goto could_not_negotiate;
2275   }
2276
2277   return TRUE;
2278
2279   /* ERROR */
2280 could_not_start:
2281   {
2282     GST_DEBUG_OBJECT (basesrc, "could not start");
2283     /* subclass is supposed to post a message. We don't have to call _stop. */
2284     return FALSE;
2285   }
2286 could_not_negotiate:
2287   {
2288     GST_DEBUG_OBJECT (basesrc, "could not negotiate, stopping");
2289     GST_ELEMENT_ERROR (basesrc, STREAM, FORMAT,
2290         ("Could not negotiate format"), ("Check your filtered caps, if any"));
2291     /* we must call stop */
2292     gst_base_src_stop (basesrc);
2293     return FALSE;
2294   }
2295 }
2296
2297 static gboolean
2298 gst_base_src_stop (GstBaseSrc * basesrc)
2299 {
2300   GstBaseSrcClass *bclass;
2301   gboolean result = TRUE;
2302
2303   if (!GST_OBJECT_FLAG_IS_SET (basesrc, GST_BASE_SRC_STARTED))
2304     return TRUE;
2305
2306   GST_DEBUG_OBJECT (basesrc, "stopping source");
2307
2308   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2309   if (bclass->stop)
2310     result = bclass->stop (basesrc);
2311
2312   if (result)
2313     GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_STARTED);
2314
2315   return result;
2316 }
2317
2318 static gboolean
2319 gst_base_src_deactivate (GstBaseSrc * basesrc, GstPad * pad)
2320 {
2321   gboolean result;
2322
2323   GST_LIVE_LOCK (basesrc);
2324   basesrc->live_running = TRUE;
2325   GST_LIVE_SIGNAL (basesrc);
2326   GST_LIVE_UNLOCK (basesrc);
2327
2328   /* step 1, unblock clock sync (if any) */
2329   result = gst_base_src_unlock (basesrc);
2330
2331   /* step 2, make sure streaming finishes */
2332   result &= gst_pad_stop_task (pad);
2333
2334   /* step 3, clear the unblock condition */
2335   result &= gst_base_src_unlock_stop (basesrc);
2336
2337   return result;
2338 }
2339
2340 static gboolean
2341 gst_base_src_activate_push (GstPad * pad, gboolean active)
2342 {
2343   GstBaseSrc *basesrc;
2344   GstEvent *event;
2345
2346   basesrc = GST_BASE_SRC (GST_OBJECT_PARENT (pad));
2347
2348   /* prepare subclass first */
2349   if (active) {
2350     GST_DEBUG_OBJECT (basesrc, "Activating in push mode");
2351
2352     if (G_UNLIKELY (!basesrc->can_activate_push))
2353       goto no_push_activation;
2354
2355     if (G_UNLIKELY (!gst_base_src_start (basesrc)))
2356       goto error_start;
2357
2358     basesrc->priv->last_sent_eos = FALSE;
2359
2360     /* do initial seek, which will start the task */
2361     GST_OBJECT_LOCK (basesrc);
2362     event = basesrc->data.ABI.pending_seek;
2363     basesrc->data.ABI.pending_seek = NULL;
2364     GST_OBJECT_UNLOCK (basesrc);
2365
2366     /* no need to unlock anything, the task is certainly
2367      * not running here. The perform seek code will start the task when
2368      * finished. */
2369     if (G_UNLIKELY (!gst_base_src_perform_seek (basesrc, event, FALSE)))
2370       goto seek_failed;
2371
2372     if (event)
2373       gst_event_unref (event);
2374   } else {
2375     GST_DEBUG_OBJECT (basesrc, "Deactivating in push mode");
2376     /* call the unlock function and stop the task */
2377     if (G_UNLIKELY (!gst_base_src_deactivate (basesrc, pad)))
2378       goto deactivate_failed;
2379
2380     /* now we can stop the source */
2381     if (G_UNLIKELY (!gst_base_src_stop (basesrc)))
2382       goto error_stop;
2383   }
2384   return TRUE;
2385
2386   /* ERRORS */
2387 no_push_activation:
2388   {
2389     GST_WARNING_OBJECT (basesrc, "Subclass disabled push-mode activation");
2390     return FALSE;
2391   }
2392 error_start:
2393   {
2394     GST_WARNING_OBJECT (basesrc, "Failed to start in push mode");
2395     return FALSE;
2396   }
2397 seek_failed:
2398   {
2399     GST_ERROR_OBJECT (basesrc, "Failed to perform initial seek");
2400     gst_base_src_stop (basesrc);
2401     if (event)
2402       gst_event_unref (event);
2403     return FALSE;
2404   }
2405 deactivate_failed:
2406   {
2407     GST_ERROR_OBJECT (basesrc, "Failed to deactivate in push mode");
2408     return FALSE;
2409   }
2410 error_stop:
2411   {
2412     GST_DEBUG_OBJECT (basesrc, "Failed to stop in push mode");
2413     return FALSE;
2414   }
2415 }
2416
2417 static gboolean
2418 gst_base_src_activate_pull (GstPad * pad, gboolean active)
2419 {
2420   GstBaseSrc *basesrc;
2421
2422   basesrc = GST_BASE_SRC (GST_OBJECT_PARENT (pad));
2423
2424   /* prepare subclass first */
2425   if (active) {
2426     GST_DEBUG_OBJECT (basesrc, "Activating in pull mode");
2427     if (G_UNLIKELY (!gst_base_src_start (basesrc)))
2428       goto error_start;
2429
2430     /* if not random_access, we cannot operate in pull mode for now */
2431     if (G_UNLIKELY (!gst_base_src_check_get_range (basesrc)))
2432       goto no_get_range;
2433   } else {
2434     GST_DEBUG_OBJECT (basesrc, "Deactivating in pull mode");
2435     /* call the unlock function. We have no task to stop. */
2436     if (G_UNLIKELY (!gst_base_src_deactivate (basesrc, pad)))
2437       goto deactivate_failed;
2438
2439     /* don't send EOS when going from PAUSED => READY when in pull mode */
2440     basesrc->priv->last_sent_eos = TRUE;
2441
2442     if (G_UNLIKELY (!gst_base_src_stop (basesrc)))
2443       goto error_stop;
2444   }
2445   return TRUE;
2446
2447   /* ERRORS */
2448 error_start:
2449   {
2450     GST_ERROR_OBJECT (basesrc, "Failed to start in pull mode");
2451     return FALSE;
2452   }
2453 no_get_range:
2454   {
2455     GST_ERROR_OBJECT (basesrc, "Cannot operate in pull mode, stopping");
2456     gst_base_src_stop (basesrc);
2457     return FALSE;
2458   }
2459 deactivate_failed:
2460   {
2461     GST_ERROR_OBJECT (basesrc, "Failed to deactivate in pull mode");
2462     return FALSE;
2463   }
2464 error_stop:
2465   {
2466     GST_ERROR_OBJECT (basesrc, "Failed to stop in pull mode");
2467     return FALSE;
2468   }
2469 }
2470
2471 static GstStateChangeReturn
2472 gst_base_src_change_state (GstElement * element, GstStateChange transition)
2473 {
2474   GstBaseSrc *basesrc;
2475   GstStateChangeReturn result;
2476   gboolean no_preroll = FALSE;
2477
2478   basesrc = GST_BASE_SRC (element);
2479
2480   switch (transition) {
2481     case GST_STATE_CHANGE_NULL_TO_READY:
2482       break;
2483     case GST_STATE_CHANGE_READY_TO_PAUSED:
2484       GST_LIVE_LOCK (element);
2485       if (basesrc->is_live) {
2486         no_preroll = TRUE;
2487         basesrc->live_running = FALSE;
2488       }
2489       basesrc->priv->last_sent_eos = FALSE;
2490       basesrc->priv->discont = TRUE;
2491       GST_LIVE_UNLOCK (element);
2492       break;
2493     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
2494       GST_LIVE_LOCK (element);
2495       basesrc->priv->latency = -1;
2496       if (basesrc->is_live) {
2497         basesrc->live_running = TRUE;
2498         GST_LIVE_SIGNAL (element);
2499       }
2500       GST_LIVE_UNLOCK (element);
2501       break;
2502     default:
2503       break;
2504   }
2505
2506   if ((result =
2507           GST_ELEMENT_CLASS (parent_class)->change_state (element,
2508               transition)) == GST_STATE_CHANGE_FAILURE)
2509     goto failure;
2510
2511   switch (transition) {
2512     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
2513       GST_LIVE_LOCK (element);
2514       if (basesrc->is_live) {
2515         no_preroll = TRUE;
2516         basesrc->live_running = FALSE;
2517       }
2518       GST_LIVE_UNLOCK (element);
2519       break;
2520     case GST_STATE_CHANGE_PAUSED_TO_READY:
2521     {
2522       GstEvent **event_p;
2523
2524       /* FIXME, deprecate this behaviour, it is very dangerous.
2525        * the prefered way of sending EOS downstream is by sending
2526        * the EOS event to the element */
2527       if (!basesrc->priv->last_sent_eos) {
2528         GST_DEBUG_OBJECT (basesrc, "Sending EOS event");
2529         gst_pad_push_event (basesrc->srcpad, gst_event_new_eos ());
2530         basesrc->priv->last_sent_eos = TRUE;
2531       }
2532       event_p = &basesrc->data.ABI.pending_seek;
2533       gst_event_replace (event_p, NULL);
2534       event_p = &basesrc->priv->close_segment;
2535       gst_event_replace (event_p, NULL);
2536       event_p = &basesrc->priv->start_segment;
2537       gst_event_replace (event_p, NULL);
2538       break;
2539     }
2540     case GST_STATE_CHANGE_READY_TO_NULL:
2541       break;
2542     default:
2543       break;
2544   }
2545
2546   if (no_preroll && result == GST_STATE_CHANGE_SUCCESS)
2547     result = GST_STATE_CHANGE_NO_PREROLL;
2548
2549   return result;
2550
2551   /* ERRORS */
2552 failure:
2553   {
2554     GST_DEBUG_OBJECT (basesrc, "parent failed state change");
2555     return result;
2556   }
2557 }