libs/gst/base/gstbasesrc.c: Update docs.
[platform/upstream/gstreamer.git] / libs / gst / base / gstbasesrc.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *               2000,2005 Wim Taymans <wim@fluendo.com>
4  *
5  * gstbasesrc.c:
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20  * Boston, MA 02111-1307, USA.
21  */
22
23 /**
24  * SECTION:gstbasesrc
25  * @short_description: Base class for getrange based source elements
26  * @see_also: #GstPushSrc, #GstBaseTransform, #GstBaseSink
27  *
28  * This is a generice base class for source elements. The following
29  * types of sources are supported:
30  * <itemizedlist>
31  *   <listitem><para>random access sources like files</para></listitem>
32  *   <listitem><para>seekable sources</para></listitem>
33  *   <listitem><para>live sources</para></listitem>
34  * </itemizedlist>
35  *
36  * <refsect2>
37  * <para>
38  * The source can be configured to operate in any #GstFormat with the
39  * gst_base_src_set_format() method. The currently set format determines 
40  * the format of the internal #GstSegment and any #GST_EVENT_NEWSEGMENT 
41  * events. The default format for #GstBaseSrc is #GST_FORMAT_BYTES.
42  * </para>
43  * <para>
44  * #GstBaseSrc always supports push mode scheduling. If the following
45  * conditions are met, it also supports pull mode scheduling:
46  * <itemizedlist>
47  *   <listitem><para>The format is set to #GST_FORMAT_BYTES (default).</para>
48  *   </listitem>
49  *   <listitem><para>#GstBaseSrc::is_seekable returns %TRUE.</para>
50  *   </listitem>
51  * </itemizedlist>
52  * </para>
53  * <para>
54  * Since 0.10.9, any #GstBaseSrc can enable pull based scheduling at any 
55  * time by overriding #GstBaseSrc::check_get_range so that it returns %TRUE. 
56  * </para>
57  * <para>
58  * If all the conditions are met for operating in pull mode, #GstBaseSrc is
59  * automatically seekable in push mode as well. The following conditions must 
60  * be met to make the element seekable in push mode when the format is not
61  * #GST_FORMAT_BYTES:
62  * <itemizedlist>
63  *   <listitem><para>
64  *     #GstBaseSrc::is_seekable returns %TRUE.
65  *   </para></listitem>
66  *   <listitem><para>
67  *     #GstBaseSrc::query can convert all supported seek formats to the
68  *     internal format as set with gst_base_src_set_format().
69  *   </para></listitem>
70  *   <listitem><para>
71  *     #GstBaseSrc::do_seek is implemented, performs the seek and returns %TRUE.
72  *   </para></listitem>
73  * </itemizedlist>
74  * </para>
75  * <para>
76  * When the element does not meet the requirements to operate in pull mode,
77  * the offset and length in the #GstBaseSrc::create method should be ignored.
78  * It is recommended to subclass #GstPushSrc instead, in this situation. If the
79  * element can operate in pull mode but only with specific offsets and
80  * lengths, it is allowed to generate an error when the wrong values are passed
81  * to the #GstBaseSrc::create function.
82  * </para>
83  * <para>
84  * #GstBaseSrc has support for live sources. Live sources are sources that when
85  * paused discard data, such as audio or video capture devices. A typical live
86  * source also produces data at a fixed rate and thus provides a clock to publish
87  * this rate.
88  * Use gst_base_src_set_live() to activate the live source mode.
89  * </para>
90  * <para>
91  * A live source does not produce data in the PAUSED state. This means that the 
92  * #GstBaseSrc::create method will not be called in PAUSED but only in PLAYING.
93  * To signal the pipeline that the element will not produce data, the return
94  * value from the READY to PAUSED state will be #GST_STATE_CHANGE_NO_PREROLL.
95  * </para>
96  * <para>
97  * A typical live source will timestamp the buffers it creates with the 
98  * current running time of the pipeline. This is one reason why a live source
99  * can only produce data in the PLAYING state, when the clock is actually 
100  * distributed and running. 
101  * </para>
102  * <para>
103  * Live sources that synchronize and block on the clock (an audio source, for
104  * example) can since 0.10.12 use gst_base_src_wait_playing() when the ::create
105  * function was interrupted by a state change to PAUSED.
106  * </para>
107  * <para>
108  * The #GstBaseSrc::get_times method can be used to implement pseudo-live 
109  * sources.
110  * It only makes sense to implement the ::get_times function if the source is 
111  * a live source. The ::get_times function should return timestamps starting
112  * from 0, as if it were a non-live source. The base class will make sure that
113  * the timestamps are transformed into the current running_time.
114  * The base source will then wait for the calculated running_time before pushing
115  * out the buffer.
116  * </para>
117  * <para>
118  * For live sources, the base class will by default report a latency of 0.
119  * For pseudo live sources, the base class will by default measure the difference
120  * between the first buffer timestamp and the start time of get_times and will
121  * report this value as the latency. 
122  * Subclasses should override the query function when this behaviour is not
123  * acceptable.
124  * </para>
125  * <para>
126  * There is only support in #GstBaseSrc for exactly one source pad, which 
127  * should be named "src". A source implementation (subclass of #GstBaseSrc) 
128  * should install a pad template in its base_init function, like so:
129  * </para>
130  * <para>
131  * <programlisting>
132  * static void
133  * my_element_base_init (gpointer g_class)
134  * {
135  *   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (g_class);
136  *   // srctemplate should be a #GstStaticPadTemplate with direction
137  *   // #GST_PAD_SRC and name "src"
138  *   gst_element_class_add_pad_template (gstelement_class,
139  *       gst_static_pad_template_get (&amp;srctemplate));
140  *   // see #GstElementDetails
141  *   gst_element_class_set_details (gstelement_class, &amp;details);
142  * }
143  * </programlisting>
144  * </para>
145  * <title>Controlled shutdown of live sources in applications</title>
146  * <para>
147  * Applications that record from a live source may want to stop recording
148  * in a controlled way, so that the recording is stopped, but the data
149  * already in the pipeline is processed to the end (remember that many live
150  * sources would go on recording forever otherwise). For that to happen the
151  * application needs to make the source stop recording and send an EOS
152  * event down the pipeline. The application would then wait for an
153  * EOS message posted on the pipeline's bus to know when all data has
154  * been processed and the pipeline can safely be stopped.
155  * </para>
156  * <para>
157  * Since GStreamer 0.10.3 an application may simply set the source
158  * element to NULL or READY state to make it send an EOS event downstream.
159  * The application should lock the state of the source afterwards, so that
160  * shutting down the pipeline from PLAYING doesn't temporarily start up the
161  * source element for a second time:
162  * <programlisting>
163  * ...
164  * // stop recording
165  * gst_element_set_state (audio_source, #GST_STATE_NULL);
166  * gst_element_set_locked_state (audio_source, %TRUE);
167  * ...
168  * </programlisting>
169  * Now the application should wait for an EOS message
170  * to be posted on the pipeline's bus. Once it has received
171  * an EOS message, it may safely shut down the entire pipeline:
172  * <programlisting>
173  * ...
174  * // everything done - shut down pipeline
175  * gst_element_set_state (pipeline, #GST_STATE_NULL);
176  * gst_element_set_locked_state (audio_source, %FALSE);
177  * ...
178  * </programlisting>
179  * </para>
180  * <para>
181  * Note that setting the source element to NULL or READY when the 
182  * pipeline is in the PAUSED state may cause a deadlock since the streaming
183  * thread might be blocked in PREROLL.
184  * </para>
185  * <para>
186  * Last reviewed on 2007-09-13 (0.10.15)
187  * </para>
188  * </refsect2>
189  */
190
191 #ifdef HAVE_CONFIG_H
192 #  include "config.h"
193 #endif
194
195 #include <stdlib.h>
196 #include <string.h>
197
198 #include "gstbasesrc.h"
199 #include "gsttypefindhelper.h"
200 #include <gst/gstmarshal.h>
201 #include <gst/gst-i18n-lib.h>
202
203 GST_DEBUG_CATEGORY_STATIC (gst_base_src_debug);
204 #define GST_CAT_DEFAULT gst_base_src_debug
205
206 #define GST_LIVE_GET_LOCK(elem)               (GST_BASE_SRC_CAST(elem)->live_lock)
207 #define GST_LIVE_LOCK(elem)                   g_mutex_lock(GST_LIVE_GET_LOCK(elem))
208 #define GST_LIVE_TRYLOCK(elem)                g_mutex_trylock(GST_LIVE_GET_LOCK(elem))
209 #define GST_LIVE_UNLOCK(elem)                 g_mutex_unlock(GST_LIVE_GET_LOCK(elem))
210 #define GST_LIVE_GET_COND(elem)               (GST_BASE_SRC_CAST(elem)->live_cond)
211 #define GST_LIVE_WAIT(elem)                   g_cond_wait (GST_LIVE_GET_COND (elem), GST_LIVE_GET_LOCK (elem))
212 #define GST_LIVE_TIMED_WAIT(elem, timeval)    g_cond_timed_wait (GST_LIVE_GET_COND (elem), GST_LIVE_GET_LOCK (elem),\
213                                                                                 timeval)
214 #define GST_LIVE_SIGNAL(elem)                 g_cond_signal (GST_LIVE_GET_COND (elem));
215 #define GST_LIVE_BROADCAST(elem)              g_cond_broadcast (GST_LIVE_GET_COND (elem));
216
217 /* BaseSrc signals and args */
218 enum
219 {
220   /* FILL ME */
221   LAST_SIGNAL
222 };
223
224 #define DEFAULT_BLOCKSIZE       4096
225 #define DEFAULT_NUM_BUFFERS     -1
226 #define DEFAULT_TYPEFIND        FALSE
227 #define DEFAULT_DO_TIMESTAMP    FALSE
228
229 enum
230 {
231   PROP_0,
232   PROP_BLOCKSIZE,
233   PROP_NUM_BUFFERS,
234   PROP_TYPEFIND,
235   PROP_DO_TIMESTAMP
236 };
237
238 #define GST_BASE_SRC_GET_PRIVATE(obj)  \
239    (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_BASE_SRC, GstBaseSrcPrivate))
240
241 struct _GstBaseSrcPrivate
242 {
243   gboolean last_sent_eos;       /* last thing we did was send an EOS (we set this
244                                  * to avoid the sending of two EOS in some cases) */
245   gboolean discont;
246
247   /* two segments to be sent in the streaming thread with STREAM_LOCK */
248   GstEvent *close_segment;
249   GstEvent *start_segment;
250
251   /* startup latency is the time it takes between going to PLAYING and producing
252    * the first BUFFER with running_time 0. This value is included in the latency
253    * reporting. */
254   GstClockTime latency;
255   /* timestamp offset, this is the offset add to the values of gst_times for
256    * pseudo live sources */
257   GstClockTimeDiff ts_offset;
258
259   gboolean do_timestamp;
260 };
261
262 static GstElementClass *parent_class = NULL;
263
264 static void gst_base_src_base_init (gpointer g_class);
265 static void gst_base_src_class_init (GstBaseSrcClass * klass);
266 static void gst_base_src_init (GstBaseSrc * src, gpointer g_class);
267 static void gst_base_src_finalize (GObject * object);
268
269
270 GType
271 gst_base_src_get_type (void)
272 {
273   static GType base_src_type = 0;
274
275   if (G_UNLIKELY (base_src_type == 0)) {
276     static const GTypeInfo base_src_info = {
277       sizeof (GstBaseSrcClass),
278       (GBaseInitFunc) gst_base_src_base_init,
279       NULL,
280       (GClassInitFunc) gst_base_src_class_init,
281       NULL,
282       NULL,
283       sizeof (GstBaseSrc),
284       0,
285       (GInstanceInitFunc) gst_base_src_init,
286     };
287
288     base_src_type = g_type_register_static (GST_TYPE_ELEMENT,
289         "GstBaseSrc", &base_src_info, G_TYPE_FLAG_ABSTRACT);
290   }
291   return base_src_type;
292 }
293 static GstCaps *gst_base_src_getcaps (GstPad * pad);
294 static gboolean gst_base_src_setcaps (GstPad * pad, GstCaps * caps);
295 static void gst_base_src_fixate (GstPad * pad, GstCaps * caps);
296
297 static gboolean gst_base_src_activate_push (GstPad * pad, gboolean active);
298 static gboolean gst_base_src_activate_pull (GstPad * pad, gboolean active);
299 static void gst_base_src_set_property (GObject * object, guint prop_id,
300     const GValue * value, GParamSpec * pspec);
301 static void gst_base_src_get_property (GObject * object, guint prop_id,
302     GValue * value, GParamSpec * pspec);
303 static gboolean gst_base_src_event_handler (GstPad * pad, GstEvent * event);
304 static gboolean gst_base_src_send_event (GstElement * elem, GstEvent * event);
305 static gboolean gst_base_src_default_event (GstBaseSrc * src, GstEvent * event);
306 static const GstQueryType *gst_base_src_get_query_types (GstElement * element);
307
308 static gboolean gst_base_src_query (GstPad * pad, GstQuery * query);
309
310 static gboolean gst_base_src_default_negotiate (GstBaseSrc * basesrc);
311 static gboolean gst_base_src_default_do_seek (GstBaseSrc * src,
312     GstSegment * segment);
313 static gboolean gst_base_src_default_query (GstBaseSrc * src, GstQuery * query);
314 static gboolean gst_base_src_default_prepare_seek_segment (GstBaseSrc * src,
315     GstEvent * event, GstSegment * segment);
316
317 static gboolean gst_base_src_unlock (GstBaseSrc * basesrc);
318 static gboolean gst_base_src_unlock_stop (GstBaseSrc * basesrc);
319 static gboolean gst_base_src_start (GstBaseSrc * basesrc);
320 static gboolean gst_base_src_stop (GstBaseSrc * basesrc);
321
322 static GstStateChangeReturn gst_base_src_change_state (GstElement * element,
323     GstStateChange transition);
324
325 static void gst_base_src_loop (GstPad * pad);
326 static gboolean gst_base_src_pad_check_get_range (GstPad * pad);
327 static gboolean gst_base_src_default_check_get_range (GstBaseSrc * bsrc);
328 static GstFlowReturn gst_base_src_pad_get_range (GstPad * pad, guint64 offset,
329     guint length, GstBuffer ** buf);
330 static GstFlowReturn gst_base_src_get_range (GstBaseSrc * src, guint64 offset,
331     guint length, GstBuffer ** buf);
332
333 static void
334 gst_base_src_base_init (gpointer g_class)
335 {
336   GST_DEBUG_CATEGORY_INIT (gst_base_src_debug, "basesrc", 0, "basesrc element");
337 }
338
339 static void
340 gst_base_src_class_init (GstBaseSrcClass * klass)
341 {
342   GObjectClass *gobject_class;
343   GstElementClass *gstelement_class;
344
345   gobject_class = G_OBJECT_CLASS (klass);
346   gstelement_class = GST_ELEMENT_CLASS (klass);
347
348   g_type_class_add_private (klass, sizeof (GstBaseSrcPrivate));
349
350   parent_class = g_type_class_peek_parent (klass);
351
352   gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_base_src_finalize);
353   gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_base_src_set_property);
354   gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_base_src_get_property);
355
356   g_object_class_install_property (gobject_class, PROP_BLOCKSIZE,
357       g_param_spec_ulong ("blocksize", "Block size",
358           "Size in bytes to read per buffer (0 = default)", 0, G_MAXULONG,
359           DEFAULT_BLOCKSIZE, G_PARAM_READWRITE));
360   g_object_class_install_property (gobject_class, PROP_NUM_BUFFERS,
361       g_param_spec_int ("num-buffers", "num-buffers",
362           "Number of buffers to output before sending EOS", -1, G_MAXINT,
363           DEFAULT_NUM_BUFFERS, G_PARAM_READWRITE));
364   g_object_class_install_property (gobject_class, PROP_TYPEFIND,
365       g_param_spec_boolean ("typefind", "Typefind",
366           "Run typefind before negotiating", DEFAULT_TYPEFIND,
367           G_PARAM_READWRITE));
368   g_object_class_install_property (gobject_class, PROP_DO_TIMESTAMP,
369       g_param_spec_boolean ("do-timestamp", "Do timestamp",
370           "Apply current stream time to buffers", DEFAULT_DO_TIMESTAMP,
371           G_PARAM_READWRITE));
372
373   gstelement_class->change_state =
374       GST_DEBUG_FUNCPTR (gst_base_src_change_state);
375   gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_base_src_send_event);
376   gstelement_class->get_query_types =
377       GST_DEBUG_FUNCPTR (gst_base_src_get_query_types);
378
379   klass->negotiate = GST_DEBUG_FUNCPTR (gst_base_src_default_negotiate);
380   klass->event = GST_DEBUG_FUNCPTR (gst_base_src_default_event);
381   klass->do_seek = GST_DEBUG_FUNCPTR (gst_base_src_default_do_seek);
382   klass->query = GST_DEBUG_FUNCPTR (gst_base_src_default_query);
383   klass->check_get_range =
384       GST_DEBUG_FUNCPTR (gst_base_src_default_check_get_range);
385   klass->prepare_seek_segment =
386       GST_DEBUG_FUNCPTR (gst_base_src_default_prepare_seek_segment);
387 }
388
389 static void
390 gst_base_src_init (GstBaseSrc * basesrc, gpointer g_class)
391 {
392   GstPad *pad;
393   GstPadTemplate *pad_template;
394
395   basesrc->priv = GST_BASE_SRC_GET_PRIVATE (basesrc);
396
397   basesrc->is_live = FALSE;
398   basesrc->live_lock = g_mutex_new ();
399   basesrc->live_cond = g_cond_new ();
400   basesrc->num_buffers = DEFAULT_NUM_BUFFERS;
401   basesrc->num_buffers_left = -1;
402
403   basesrc->can_activate_push = TRUE;
404   basesrc->pad_mode = GST_ACTIVATE_NONE;
405
406   pad_template =
407       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (g_class), "src");
408   g_return_if_fail (pad_template != NULL);
409
410   GST_DEBUG_OBJECT (basesrc, "creating src pad");
411   pad = gst_pad_new_from_template (pad_template, "src");
412
413   GST_DEBUG_OBJECT (basesrc, "setting functions on src pad");
414   gst_pad_set_activatepush_function (pad,
415       GST_DEBUG_FUNCPTR (gst_base_src_activate_push));
416   gst_pad_set_activatepull_function (pad,
417       GST_DEBUG_FUNCPTR (gst_base_src_activate_pull));
418   gst_pad_set_event_function (pad,
419       GST_DEBUG_FUNCPTR (gst_base_src_event_handler));
420   gst_pad_set_query_function (pad, GST_DEBUG_FUNCPTR (gst_base_src_query));
421   gst_pad_set_checkgetrange_function (pad,
422       GST_DEBUG_FUNCPTR (gst_base_src_pad_check_get_range));
423   gst_pad_set_getrange_function (pad,
424       GST_DEBUG_FUNCPTR (gst_base_src_pad_get_range));
425   gst_pad_set_getcaps_function (pad, GST_DEBUG_FUNCPTR (gst_base_src_getcaps));
426   gst_pad_set_setcaps_function (pad, GST_DEBUG_FUNCPTR (gst_base_src_setcaps));
427   gst_pad_set_fixatecaps_function (pad,
428       GST_DEBUG_FUNCPTR (gst_base_src_fixate));
429
430   /* hold pointer to pad */
431   basesrc->srcpad = pad;
432   GST_DEBUG_OBJECT (basesrc, "adding src pad");
433   gst_element_add_pad (GST_ELEMENT (basesrc), pad);
434
435   basesrc->blocksize = DEFAULT_BLOCKSIZE;
436   basesrc->clock_id = NULL;
437   /* we operate in BYTES by default */
438   gst_base_src_set_format (basesrc, GST_FORMAT_BYTES);
439   basesrc->data.ABI.typefind = DEFAULT_TYPEFIND;
440   basesrc->priv->do_timestamp = DEFAULT_DO_TIMESTAMP;
441
442   GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_STARTED);
443
444   GST_DEBUG_OBJECT (basesrc, "init done");
445 }
446
447 static void
448 gst_base_src_finalize (GObject * object)
449 {
450   GstBaseSrc *basesrc;
451   GstEvent **event_p;
452
453   basesrc = GST_BASE_SRC (object);
454
455   g_mutex_free (basesrc->live_lock);
456   g_cond_free (basesrc->live_cond);
457
458   event_p = &basesrc->data.ABI.pending_seek;
459   gst_event_replace ((GstEvent **) event_p, NULL);
460
461   G_OBJECT_CLASS (parent_class)->finalize (object);
462 }
463
464 /**
465  * gst_base_src_wait_playing:
466  * @src: the src
467  *
468  * If the #GstBaseSrcClass::create method performs its own synchronisation against
469  * the clock it must unblock when going from PLAYING to the PAUSED state and call
470  * this method before continuing to produce the remaining data.
471  *
472  * This function will block until a state change to PLAYING happens (in which
473  * case this function returns #GST_FLOW_OK) or the processing must be stopped due
474  * to a state change to READY or a FLUSH event (in which case this function
475  * returns #GST_FLOW_WRONG_STATE).
476  *
477  * Since: 0.10.12
478  *
479  * Returns: #GST_FLOW_OK if @src is PLAYING and processing can
480  * continue. Any other return value should be returned from the create vmethod.
481  */
482 GstFlowReturn
483 gst_base_src_wait_playing (GstBaseSrc * src)
484 {
485   /* block until the state changes, or we get a flush, or something */
486   GST_LIVE_LOCK (src);
487   if (src->is_live) {
488     while (G_UNLIKELY (!src->live_running)) {
489       GST_DEBUG ("live source signal waiting");
490       GST_LIVE_SIGNAL (src);
491       GST_DEBUG ("live source waiting for running state");
492       GST_LIVE_WAIT (src);
493       GST_DEBUG ("live source unlocked");
494     }
495     /* FIXME, use another variable to signal stopping so that we don't
496      * have to grab another lock. */
497     GST_OBJECT_LOCK (src->srcpad);
498     if (G_UNLIKELY (GST_PAD_IS_FLUSHING (src->srcpad)))
499       goto flushing;
500     GST_OBJECT_UNLOCK (src->srcpad);
501   }
502   GST_LIVE_UNLOCK (src);
503
504   return GST_FLOW_OK;
505
506   /* ERRORS */
507 flushing:
508   {
509     GST_DEBUG_OBJECT (src, "pad is flushing");
510     GST_OBJECT_UNLOCK (src->srcpad);
511     GST_LIVE_UNLOCK (src);
512     return GST_FLOW_WRONG_STATE;
513   }
514 }
515
516 /**
517  * gst_base_src_set_live:
518  * @src: base source instance
519  * @live: new live-mode
520  *
521  * If the element listens to a live source, @live should
522  * be set to %TRUE. 
523  *
524  * A live source will not produce data in the PAUSED state and
525  * will therefore not be able to participate in the PREROLL phase
526  * of a pipeline. To signal this fact to the application and the 
527  * pipeline, the state change return value of the live source will
528  * be GST_STATE_CHANGE_NO_PREROLL.
529  */
530 void
531 gst_base_src_set_live (GstBaseSrc * src, gboolean live)
532 {
533   GST_LIVE_LOCK (src);
534   src->is_live = live;
535   GST_LIVE_UNLOCK (src);
536 }
537
538 /**
539  * gst_base_src_is_live:
540  * @src: base source instance
541  *
542  * Check if an element is in live mode.
543  *
544  * Returns: %TRUE if element is in live mode.
545  */
546 gboolean
547 gst_base_src_is_live (GstBaseSrc * src)
548 {
549   gboolean result;
550
551   GST_LIVE_LOCK (src);
552   result = src->is_live;
553   GST_LIVE_UNLOCK (src);
554
555   return result;
556 }
557
558 /**
559  * gst_base_src_set_format:
560  * @src: base source instance
561  * @format: the format to use
562  *
563  * Sets the default format of the source. This will be the format used
564  * for sending NEW_SEGMENT events and for performing seeks.
565  *
566  * If a format of GST_FORMAT_BYTES is set, the element will be able to
567  * operate in pull mode if the #GstBaseSrc::is_seekable returns TRUE.
568  *
569  * @Since: 0.10.1
570  */
571 void
572 gst_base_src_set_format (GstBaseSrc * src, GstFormat format)
573 {
574   gst_segment_init (&src->segment, format);
575 }
576
577 /**
578  * gst_base_src_query_latency:
579  * @src: the source
580  * @live: if the source is live
581  * @min_latency: the min latency of the source
582  * @max_latency: the max latency of the source
583  *
584  * Query the source for the latency parameters. @live will be TRUE when @src is
585  * configured as a live source. @min_latency will be set to the difference
586  * between the running time and the timestamp of the first buffer.
587  * @max_latency is always the undefined value of -1.
588  *
589  * This function is mostly used by subclasses. 
590  *
591  * Returns: TRUE if the query succeeded.
592  *
593  * Since: 0.10.13
594  */
595 gboolean
596 gst_base_src_query_latency (GstBaseSrc * src, gboolean * live,
597     GstClockTime * min_latency, GstClockTime * max_latency)
598 {
599   GstClockTime min;
600
601   GST_LIVE_LOCK (src);
602   if (live)
603     *live = src->is_live;
604
605   /* if we have a startup latency, report this one, else report 0. Subclasses
606    * are supposed to override the query function if they want something
607    * else. */
608   if (src->priv->latency != -1)
609     min = src->priv->latency;
610   else
611     min = 0;
612
613   if (min_latency)
614     *min_latency = min;
615   if (max_latency)
616     *max_latency = -1;
617
618   GST_LOG_OBJECT (src, "latency: live %d, min %" GST_TIME_FORMAT
619       ", max %" GST_TIME_FORMAT, src->is_live, GST_TIME_ARGS (min),
620       GST_TIME_ARGS (-1));
621   GST_LIVE_UNLOCK (src);
622
623   return TRUE;
624 }
625
626 /**
627  * gst_base_src_set_do_timestamp:
628  * @src: the source
629  * @timestamp: enable or disable timestamping
630  *
631  * Configure @src to automatically timestamp outgoing buffers based on the
632  * current running_time of the pipeline. This property is mostly useful for live
633  * sources.
634  *
635  * Since: 0.10.15
636  */
637 void
638 gst_base_src_set_do_timestamp (GstBaseSrc * src, gboolean timestamp)
639 {
640   GST_OBJECT_LOCK (src);
641   src->priv->do_timestamp = timestamp;
642   GST_OBJECT_UNLOCK (src);
643 }
644
645 /**
646  * gst_base_src_get_do_timestamp:
647  * @src: the source
648  *
649  * Query if @src timestamps outgoing buffers based on the current running_time.
650  *
651  * Returns: %TRUE if the base class will automatically timestamp outgoing buffers.
652  *
653  * Since: 0.10.15
654  */
655 gboolean
656 gst_base_src_get_do_timestamp (GstBaseSrc * src)
657 {
658   gboolean res;
659
660   GST_OBJECT_LOCK (src);
661   res = src->priv->do_timestamp;
662   GST_OBJECT_UNLOCK (src);
663
664   return res;
665 }
666
667 static gboolean
668 gst_base_src_setcaps (GstPad * pad, GstCaps * caps)
669 {
670   GstBaseSrcClass *bclass;
671   GstBaseSrc *bsrc;
672   gboolean res = TRUE;
673
674   bsrc = GST_BASE_SRC (GST_PAD_PARENT (pad));
675   bclass = GST_BASE_SRC_GET_CLASS (bsrc);
676
677   if (bclass->set_caps)
678     res = bclass->set_caps (bsrc, caps);
679
680   return res;
681 }
682
683 static GstCaps *
684 gst_base_src_getcaps (GstPad * pad)
685 {
686   GstBaseSrcClass *bclass;
687   GstBaseSrc *bsrc;
688   GstCaps *caps = NULL;
689
690   bsrc = GST_BASE_SRC (GST_PAD_PARENT (pad));
691   bclass = GST_BASE_SRC_GET_CLASS (bsrc);
692   if (bclass->get_caps)
693     caps = bclass->get_caps (bsrc);
694
695   if (caps == NULL) {
696     GstPadTemplate *pad_template;
697
698     pad_template =
699         gst_element_class_get_pad_template (GST_ELEMENT_CLASS (bclass), "src");
700     if (pad_template != NULL) {
701       caps = gst_caps_ref (gst_pad_template_get_caps (pad_template));
702     }
703   }
704   return caps;
705 }
706
707 static void
708 gst_base_src_fixate (GstPad * pad, GstCaps * caps)
709 {
710   GstBaseSrcClass *bclass;
711   GstBaseSrc *bsrc;
712
713   bsrc = GST_BASE_SRC (gst_pad_get_parent (pad));
714   bclass = GST_BASE_SRC_GET_CLASS (bsrc);
715
716   if (bclass->fixate)
717     bclass->fixate (bsrc, caps);
718
719   gst_object_unref (bsrc);
720 }
721
722 static gboolean
723 gst_base_src_default_query (GstBaseSrc * src, GstQuery * query)
724 {
725   gboolean res;
726
727   switch (GST_QUERY_TYPE (query)) {
728     case GST_QUERY_POSITION:
729     {
730       GstFormat format;
731
732       gst_query_parse_position (query, &format, NULL);
733       switch (format) {
734         case GST_FORMAT_PERCENT:
735         {
736           gint64 percent;
737           gint64 position;
738           gint64 duration;
739
740           position = src->segment.last_stop;
741           duration = src->segment.duration;
742
743           if (position != -1 && duration != -1) {
744             if (position < duration)
745               percent = gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX, position,
746                   duration);
747             else
748               percent = GST_FORMAT_PERCENT_MAX;
749           } else
750             percent = -1;
751
752           gst_query_set_position (query, GST_FORMAT_PERCENT, percent);
753           res = TRUE;
754           break;
755         }
756         default:
757         {
758           gint64 position;
759
760           position = src->segment.last_stop;
761
762           if (position != -1) {
763             /* convert to requested format */
764             res =
765                 gst_pad_query_convert (src->srcpad, src->segment.format,
766                 position, &format, &position);
767           } else
768             res = TRUE;
769
770           gst_query_set_position (query, format, position);
771           break;
772         }
773       }
774       break;
775     }
776     case GST_QUERY_DURATION:
777     {
778       GstFormat format;
779
780       gst_query_parse_duration (query, &format, NULL);
781
782       GST_DEBUG_OBJECT (src, "duration query in format %s",
783           gst_format_get_name (format));
784       switch (format) {
785         case GST_FORMAT_PERCENT:
786           gst_query_set_duration (query, GST_FORMAT_PERCENT,
787               GST_FORMAT_PERCENT_MAX);
788           res = TRUE;
789           break;
790         default:
791         {
792           gint64 duration;
793
794           duration = src->segment.duration;
795
796           if (duration != -1) {
797             /* convert to requested format */
798             res =
799                 gst_pad_query_convert (src->srcpad, src->segment.format,
800                 duration, &format, &duration);
801           } else {
802             res = TRUE;
803           }
804           gst_query_set_duration (query, format, duration);
805           break;
806         }
807       }
808       break;
809     }
810
811     case GST_QUERY_SEEKING:
812     {
813       gst_query_set_seeking (query, src->segment.format,
814           src->seekable, 0, src->segment.duration);
815       res = TRUE;
816       break;
817     }
818     case GST_QUERY_SEGMENT:
819     {
820       gint64 start, stop;
821
822       /* no end segment configured, current duration then */
823       if ((stop = src->segment.stop) == -1)
824         stop = src->segment.duration;
825       start = src->segment.start;
826
827       /* adjust to stream time */
828       if (src->segment.time != -1) {
829         start -= src->segment.time;
830         if (stop != -1)
831           stop -= src->segment.time;
832       }
833       gst_query_set_segment (query, src->segment.rate, src->segment.format,
834           start, stop);
835       res = TRUE;
836       break;
837     }
838
839     case GST_QUERY_FORMATS:
840     {
841       gst_query_set_formats (query, 3, GST_FORMAT_DEFAULT,
842           GST_FORMAT_BYTES, GST_FORMAT_PERCENT);
843       res = TRUE;
844       break;
845     }
846     case GST_QUERY_CONVERT:
847     {
848       GstFormat src_fmt, dest_fmt;
849       gint64 src_val, dest_val;
850
851       gst_query_parse_convert (query, &src_fmt, &src_val, &dest_fmt, &dest_val);
852
853       /* we can only convert between equal formats... */
854       if (src_fmt == dest_fmt) {
855         dest_val = src_val;
856         res = TRUE;
857       } else
858         res = FALSE;
859
860       gst_query_set_convert (query, src_fmt, src_val, dest_fmt, dest_val);
861       break;
862     }
863     case GST_QUERY_LATENCY:
864     {
865       GstClockTime min, max;
866       gboolean live;
867
868       /* Subclasses should override and implement something usefull */
869       res = gst_base_src_query_latency (src, &live, &min, &max);
870
871       GST_LOG_OBJECT (src, "report latency: live %d, min %" GST_TIME_FORMAT
872           ", max %" GST_TIME_FORMAT, live, GST_TIME_ARGS (min),
873           GST_TIME_ARGS (max));
874
875       gst_query_set_latency (query, live, min, max);
876       break;
877     }
878     case GST_QUERY_JITTER:
879     case GST_QUERY_RATE:
880     default:
881       res = FALSE;
882       break;
883   }
884   GST_DEBUG_OBJECT (src, "query %s returns %d", GST_QUERY_TYPE_NAME (query),
885       res);
886   return res;
887 }
888
889 static gboolean
890 gst_base_src_query (GstPad * pad, GstQuery * query)
891 {
892   GstBaseSrc *src;
893   GstBaseSrcClass *bclass;
894   gboolean result = FALSE;
895
896   src = GST_BASE_SRC (gst_pad_get_parent (pad));
897
898   bclass = GST_BASE_SRC_GET_CLASS (src);
899
900   if (bclass->query)
901     result = bclass->query (src, query);
902   else
903     result = gst_pad_query_default (pad, query);
904
905   gst_object_unref (src);
906
907   return result;
908 }
909
910 static gboolean
911 gst_base_src_default_do_seek (GstBaseSrc * src, GstSegment * segment)
912 {
913   gboolean res = TRUE;
914
915   /* update our offset if the start/stop position was updated */
916   if (segment->format == GST_FORMAT_BYTES) {
917     segment->last_stop = segment->start;
918     segment->time = segment->start;
919   } else if (segment->start == 0) {
920     /* seek to start, we can implement a default for this. */
921     segment->last_stop = 0;
922     segment->time = 0;
923     res = TRUE;
924   } else
925     res = FALSE;
926
927   return res;
928 }
929
930 static gboolean
931 gst_base_src_do_seek (GstBaseSrc * src, GstSegment * segment)
932 {
933   GstBaseSrcClass *bclass;
934   gboolean result = FALSE;
935
936   bclass = GST_BASE_SRC_GET_CLASS (src);
937
938   if (bclass->do_seek)
939     result = bclass->do_seek (src, segment);
940
941   return result;
942 }
943
944 #define SEEK_TYPE_IS_RELATIVE(t) (((t) != GST_SEEK_TYPE_NONE) && ((t) != GST_SEEK_TYPE_SET))
945
946 static gboolean
947 gst_base_src_default_prepare_seek_segment (GstBaseSrc * src, GstEvent * event,
948     GstSegment * segment)
949 {
950   /* By default, we try one of 2 things:
951    *   - For absolute seek positions, convert the requested position to our 
952    *     configured processing format and place it in the output segment \
953    *   - For relative seek positions, convert our current (input) values to the
954    *     seek format, adjust by the relative seek offset and then convert back to
955    *     the processing format
956    */
957   GstSeekType cur_type, stop_type;
958   gint64 cur, stop;
959   GstSeekFlags flags;
960   GstFormat seek_format, dest_format;
961   gdouble rate;
962   gboolean update;
963   gboolean res = TRUE;
964
965   gst_event_parse_seek (event, &rate, &seek_format, &flags,
966       &cur_type, &cur, &stop_type, &stop);
967   dest_format = segment->format;
968
969   if (seek_format == dest_format) {
970     gst_segment_set_seek (segment, rate, seek_format, flags,
971         cur_type, cur, stop_type, stop, &update);
972     return TRUE;
973   }
974
975   if (cur_type != GST_SEEK_TYPE_NONE) {
976     /* FIXME: Handle seek_cur & seek_end by converting the input segment vals */
977     res =
978         gst_pad_query_convert (src->srcpad, seek_format, cur, &dest_format,
979         &cur);
980     cur_type = GST_SEEK_TYPE_SET;
981   }
982
983   if (res && stop_type != GST_SEEK_TYPE_NONE) {
984     /* FIXME: Handle seek_cur & seek_end by converting the input segment vals */
985     res =
986         gst_pad_query_convert (src->srcpad, seek_format, stop, &dest_format,
987         &stop);
988     stop_type = GST_SEEK_TYPE_SET;
989   }
990
991   /* And finally, configure our output segment in the desired format */
992   gst_segment_set_seek (segment, rate, dest_format, flags, cur_type, cur,
993       stop_type, stop, &update);
994
995   if (!res)
996     goto no_format;
997
998   return res;
999
1000 no_format:
1001   {
1002     GST_DEBUG_OBJECT (src, "undefined format given, seek aborted.");
1003     return FALSE;
1004   }
1005 }
1006
1007 static gboolean
1008 gst_base_src_prepare_seek_segment (GstBaseSrc * src, GstEvent * event,
1009     GstSegment * seeksegment)
1010 {
1011   GstBaseSrcClass *bclass;
1012   gboolean result = FALSE;
1013
1014   bclass = GST_BASE_SRC_GET_CLASS (src);
1015
1016   if (bclass->prepare_seek_segment)
1017     result = bclass->prepare_seek_segment (src, event, seeksegment);
1018
1019   return result;
1020 }
1021
1022 /* this code implements the seeking. It is a good example
1023  * handling all cases.
1024  *
1025  * A seek updates the currently configured segment.start
1026  * and segment.stop values based on the SEEK_TYPE. If the
1027  * segment.start value is updated, a seek to this new position
1028  * should be performed.
1029  *
1030  * The seek can only be executed when we are not currently
1031  * streaming any data, to make sure that this is the case, we
1032  * acquire the STREAM_LOCK which is taken when we are in the
1033  * _loop() function or when a getrange() is called. Normally
1034  * we will not receive a seek if we are operating in pull mode
1035  * though.
1036  *
1037  * When we are in the loop() function, we might be in the middle
1038  * of pushing a buffer, which might block in a sink. To make sure
1039  * that the push gets unblocked we push out a FLUSH_START event.
1040  * Our loop function will get a WRONG_STATE return value from
1041  * the push and will pause, effectively releasing the STREAM_LOCK.
1042  *
1043  * For a non-flushing seek, we pause the task, which might eventually
1044  * release the STREAM_LOCK. We say eventually because when the sink
1045  * blocks on the sample we might wait a very long time until the sink
1046  * unblocks the sample. In any case we acquire the STREAM_LOCK and
1047  * can continue the seek. A non-flushing seek is normally done in a 
1048  * running pipeline to perform seamless playback.
1049  * In the case of a non-flushing seek we need to make sure that the
1050  * data we output after the seek is continuous with the previous data,
1051  * this is because a non-flushing seek does not reset the stream-time
1052  * to 0. We do this by closing the currently running segment, ie. sending
1053  * a new_segment event with the stop position set to the last processed 
1054  * position.
1055  *
1056  * After updating the segment.start/stop values, we prepare for
1057  * streaming again. We push out a FLUSH_STOP to make the peer pad
1058  * accept data again and we start our task again.
1059  *
1060  * A segment seek posts a message on the bus saying that the playback
1061  * of the segment started. We store the segment flag internally because
1062  * when we reach the segment.stop we have to post a segment.done
1063  * instead of EOS when doing a segment seek.
1064  */
1065 /* FIXME (0.11), we have the unlock gboolean here because most current 
1066  * implementations (fdsrc, -base/gst/tcp/, ...) unconditionally unlock, even when
1067  * the streaming thread isn't running, resulting in bogus unlocks later when it 
1068  * starts. This is fixed by adding unlock_stop, but we should still avoid unlocking
1069  * unnecessarily for backwards compatibility. Ergo, the unlock variable stays
1070  * until 0.11
1071  */
1072 static gboolean
1073 gst_base_src_perform_seek (GstBaseSrc * src, GstEvent * event, gboolean unlock)
1074 {
1075   gboolean res = TRUE;
1076   gdouble rate;
1077   GstFormat seek_format, dest_format;
1078   GstSeekFlags flags;
1079   GstSeekType cur_type, stop_type;
1080   gint64 cur, stop;
1081   gboolean flush;
1082   gboolean update;
1083   gboolean relative_seek = FALSE;
1084   gboolean seekseg_configured = FALSE;
1085   GstSegment seeksegment;
1086
1087   GST_DEBUG_OBJECT (src, "doing seek");
1088
1089   dest_format = src->segment.format;
1090
1091   if (event) {
1092     gst_event_parse_seek (event, &rate, &seek_format, &flags,
1093         &cur_type, &cur, &stop_type, &stop);
1094
1095     relative_seek = SEEK_TYPE_IS_RELATIVE (cur_type) ||
1096         SEEK_TYPE_IS_RELATIVE (stop_type);
1097
1098     if (dest_format != seek_format && !relative_seek) {
1099       /* If we have an ABSOLUTE position (SEEK_SET only), we can convert it
1100        * here before taking the stream lock, otherwise we must convert it later,
1101        * once we have the stream lock and can read the current position */
1102       gst_segment_init (&seeksegment, dest_format);
1103
1104       if (!gst_base_src_prepare_seek_segment (src, event, &seeksegment))
1105         goto prepare_failed;
1106
1107       seekseg_configured = TRUE;
1108     }
1109
1110     flush = flags & GST_SEEK_FLAG_FLUSH;
1111   } else {
1112     flush = FALSE;
1113   }
1114
1115   /* send flush start */
1116   if (flush)
1117     gst_pad_push_event (src->srcpad, gst_event_new_flush_start ());
1118   else
1119     gst_pad_pause_task (src->srcpad);
1120
1121   /* unblock streaming thread */
1122   if (unlock)
1123     gst_base_src_unlock (src);
1124
1125   /* grab streaming lock, this should eventually be possible, either
1126    * because the task is paused or our streaming thread stopped 
1127    * because our peer is flushing. */
1128   GST_PAD_STREAM_LOCK (src->srcpad);
1129
1130   if (unlock)
1131     gst_base_src_unlock_stop (src);
1132
1133   /* If we configured the seeksegment above, don't overwrite it now. Otherwise
1134    * copy the current segment info into the temp segment that we can actually
1135    * attempt the seek with. We only update the real segment if the seek suceeds. */
1136   if (!seekseg_configured) {
1137     memcpy (&seeksegment, &src->segment, sizeof (GstSegment));
1138
1139     /* now configure the final seek segment */
1140     if (event) {
1141       if (src->segment.format != seek_format) {
1142         /* OK, here's where we give the subclass a chance to convert the relative
1143          * seek into an absolute one in the processing format. We set up any
1144          * absolute seek above, before taking the stream lock. */
1145         if (!gst_base_src_prepare_seek_segment (src, event, &seeksegment)) {
1146           GST_DEBUG_OBJECT (src, "Preparing the seek failed after flushing. "
1147               "Aborting seek");
1148           res = FALSE;
1149         }
1150       } else {
1151         /* The seek format matches our processing format, no need to ask the
1152          * the subclass to configure the segment. */
1153         gst_segment_set_seek (&seeksegment, rate, seek_format, flags,
1154             cur_type, cur, stop_type, stop, &update);
1155       }
1156     }
1157     /* Else, no seek event passed, so we're just (re)starting the 
1158        current segment. */
1159   }
1160
1161   if (res) {
1162     GST_DEBUG_OBJECT (src, "segment configured from %" G_GINT64_FORMAT
1163         " to %" G_GINT64_FORMAT ", position %" G_GINT64_FORMAT,
1164         seeksegment.start, seeksegment.stop, seeksegment.last_stop);
1165
1166     /* do the seek, segment.last_stop contains the new position. */
1167     res = gst_base_src_do_seek (src, &seeksegment);
1168   }
1169
1170   /* and prepare to continue streaming */
1171   if (flush) {
1172     /* send flush stop, peer will accept data and events again. We
1173      * are not yet providing data as we still have the STREAM_LOCK. */
1174     gst_pad_push_event (src->srcpad, gst_event_new_flush_stop ());
1175   } else if (res && src->data.ABI.running) {
1176     /* we are running the current segment and doing a non-flushing seek, 
1177      * close the segment first based on the last_stop. */
1178     GST_DEBUG_OBJECT (src, "closing running segment %" G_GINT64_FORMAT
1179         " to %" G_GINT64_FORMAT, src->segment.start, src->segment.last_stop);
1180
1181     /* queue the segment for sending in the stream thread */
1182     if (src->priv->close_segment)
1183       gst_event_unref (src->priv->close_segment);
1184     src->priv->close_segment =
1185         gst_event_new_new_segment_full (TRUE,
1186         src->segment.rate, src->segment.applied_rate, src->segment.format,
1187         src->segment.start, src->segment.last_stop, src->segment.time);
1188   }
1189
1190   /* The subclass must have converted the segment to the processing format 
1191    * by now */
1192   if (res && seeksegment.format != dest_format) {
1193     GST_DEBUG_OBJECT (src, "Subclass failed to prepare a seek segment "
1194         "in the correct format. Aborting seek.");
1195     res = FALSE;
1196   }
1197
1198   /* if successfull seek, we update our real segment and push
1199    * out the new segment. */
1200   if (res) {
1201     memcpy (&src->segment, &seeksegment, sizeof (GstSegment));
1202
1203     if (src->segment.flags & GST_SEEK_FLAG_SEGMENT) {
1204       gst_element_post_message (GST_ELEMENT (src),
1205           gst_message_new_segment_start (GST_OBJECT (src),
1206               src->segment.format, src->segment.last_stop));
1207     }
1208
1209     /* for deriving a stop position for the playback segment form the seek
1210      * segment, we must take the duration when the stop is not set */
1211     if ((stop = src->segment.stop) == -1)
1212       stop = src->segment.duration;
1213
1214     GST_DEBUG_OBJECT (src, "Sending newsegment from %" G_GINT64_FORMAT
1215         " to %" G_GINT64_FORMAT, src->segment.start, stop);
1216
1217     /* now replace the old segment so that we send it in the stream thread the
1218      * next time it is scheduled. */
1219     if (src->priv->start_segment)
1220       gst_event_unref (src->priv->start_segment);
1221     src->priv->start_segment =
1222         gst_event_new_new_segment_full (FALSE,
1223         src->segment.rate, src->segment.applied_rate, src->segment.format,
1224         src->segment.last_stop, stop, src->segment.time);
1225   }
1226
1227   src->priv->discont = TRUE;
1228   src->data.ABI.running = TRUE;
1229   /* and restart the task in case it got paused explicitely or by
1230    * the FLUSH_START event we pushed out. */
1231   gst_pad_start_task (src->srcpad, (GstTaskFunction) gst_base_src_loop,
1232       src->srcpad);
1233
1234   /* and release the lock again so we can continue streaming */
1235   GST_PAD_STREAM_UNLOCK (src->srcpad);
1236
1237   return res;
1238
1239   /* ERROR */
1240 prepare_failed:
1241   GST_DEBUG_OBJECT (src, "Preparing the seek failed before flushing. "
1242       "Aborting seek");
1243   return FALSE;
1244 }
1245
1246 static const GstQueryType *
1247 gst_base_src_get_query_types (GstElement * element)
1248 {
1249   static const GstQueryType query_types[] = {
1250     GST_QUERY_DURATION,
1251     GST_QUERY_POSITION,
1252     GST_QUERY_SEEKING,
1253     GST_QUERY_SEGMENT,
1254     GST_QUERY_FORMATS,
1255     GST_QUERY_LATENCY,
1256     GST_QUERY_JITTER,
1257     GST_QUERY_RATE,
1258     GST_QUERY_CONVERT,
1259     0
1260   };
1261
1262   return query_types;
1263 }
1264
1265 /* all events send to this element directly
1266  */
1267 static gboolean
1268 gst_base_src_send_event (GstElement * element, GstEvent * event)
1269 {
1270   GstBaseSrc *src;
1271   gboolean result = FALSE;
1272
1273   src = GST_BASE_SRC (element);
1274
1275   switch (GST_EVENT_TYPE (event)) {
1276     case GST_EVENT_FLUSH_START:
1277     case GST_EVENT_FLUSH_STOP:
1278       /* sending random flushes downstream can break stuff,
1279        * especially sync since all segment info will get flushed */
1280       break;
1281     case GST_EVENT_EOS:
1282       /* FIXME, queue EOS and make sure the task or pull function 
1283        * perform the EOS actions. */
1284       break;
1285     case GST_EVENT_NEWSEGMENT:
1286       /* sending random NEWSEGMENT downstream can break sync. */
1287       break;
1288     case GST_EVENT_TAG:
1289     case GST_EVENT_BUFFERSIZE:
1290       break;
1291     case GST_EVENT_QOS:
1292       break;
1293     case GST_EVENT_SEEK:
1294     {
1295       gboolean started;
1296
1297       GST_OBJECT_LOCK (src->srcpad);
1298       if (GST_PAD_ACTIVATE_MODE (src->srcpad) == GST_ACTIVATE_PULL)
1299         goto wrong_mode;
1300       started = GST_PAD_ACTIVATE_MODE (src->srcpad) == GST_ACTIVATE_PUSH;
1301       GST_OBJECT_UNLOCK (src->srcpad);
1302
1303       if (started) {
1304         /* when we are running in push mode, we can execute the
1305          * seek right now, we need to unlock. */
1306         result = gst_base_src_perform_seek (src, event, TRUE);
1307       } else {
1308         GstEvent **event_p;
1309
1310         /* else we store the event and execute the seek when we
1311          * get activated */
1312         GST_OBJECT_LOCK (src);
1313         event_p = &src->data.ABI.pending_seek;
1314         gst_event_replace ((GstEvent **) event_p, event);
1315         GST_OBJECT_UNLOCK (src);
1316         /* assume the seek will work */
1317         result = TRUE;
1318       }
1319       break;
1320     }
1321     case GST_EVENT_NAVIGATION:
1322       break;
1323     default:
1324       break;
1325   }
1326 done:
1327   gst_event_unref (event);
1328
1329   return result;
1330
1331   /* ERRORS */
1332 wrong_mode:
1333   {
1334     GST_DEBUG_OBJECT (src, "cannot perform seek when operating in pull mode");
1335     GST_OBJECT_UNLOCK (src->srcpad);
1336     result = FALSE;
1337     goto done;
1338   }
1339 }
1340
1341 static gboolean
1342 gst_base_src_default_event (GstBaseSrc * src, GstEvent * event)
1343 {
1344   gboolean result;
1345
1346   switch (GST_EVENT_TYPE (event)) {
1347     case GST_EVENT_SEEK:
1348       /* is normally called when in push mode */
1349       if (!src->seekable)
1350         goto not_seekable;
1351
1352       result = gst_base_src_perform_seek (src, event, TRUE);
1353       break;
1354     case GST_EVENT_FLUSH_START:
1355       /* cancel any blocking getrange, is normally called
1356        * when in pull mode. */
1357       result = gst_base_src_unlock (src);
1358       break;
1359     case GST_EVENT_FLUSH_STOP:
1360       result = gst_base_src_unlock_stop (src);
1361       break;
1362     default:
1363       result = TRUE;
1364       break;
1365   }
1366   return result;
1367
1368   /* ERRORS */
1369 not_seekable:
1370   {
1371     GST_DEBUG_OBJECT (src, "is not seekable");
1372     return FALSE;
1373   }
1374 }
1375
1376 static gboolean
1377 gst_base_src_event_handler (GstPad * pad, GstEvent * event)
1378 {
1379   GstBaseSrc *src;
1380   GstBaseSrcClass *bclass;
1381   gboolean result = FALSE;
1382
1383   src = GST_BASE_SRC (gst_pad_get_parent (pad));
1384   bclass = GST_BASE_SRC_GET_CLASS (src);
1385
1386   if (bclass->event) {
1387     if (!(result = bclass->event (src, event)))
1388       goto subclass_failed;
1389   }
1390
1391 done:
1392   gst_event_unref (event);
1393   gst_object_unref (src);
1394
1395   return result;
1396
1397   /* ERRORS */
1398 subclass_failed:
1399   {
1400     GST_DEBUG_OBJECT (src, "subclass refused event");
1401     goto done;
1402   }
1403 }
1404
1405 static void
1406 gst_base_src_set_property (GObject * object, guint prop_id,
1407     const GValue * value, GParamSpec * pspec)
1408 {
1409   GstBaseSrc *src;
1410
1411   src = GST_BASE_SRC (object);
1412
1413   switch (prop_id) {
1414     case PROP_BLOCKSIZE:
1415       src->blocksize = g_value_get_ulong (value);
1416       break;
1417     case PROP_NUM_BUFFERS:
1418       src->num_buffers = g_value_get_int (value);
1419       break;
1420     case PROP_TYPEFIND:
1421       src->data.ABI.typefind = g_value_get_boolean (value);
1422       break;
1423     case PROP_DO_TIMESTAMP:
1424       src->priv->do_timestamp = g_value_get_boolean (value);
1425       break;
1426     default:
1427       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1428       break;
1429   }
1430 }
1431
1432 static void
1433 gst_base_src_get_property (GObject * object, guint prop_id, GValue * value,
1434     GParamSpec * pspec)
1435 {
1436   GstBaseSrc *src;
1437
1438   src = GST_BASE_SRC (object);
1439
1440   switch (prop_id) {
1441     case PROP_BLOCKSIZE:
1442       g_value_set_ulong (value, src->blocksize);
1443       break;
1444     case PROP_NUM_BUFFERS:
1445       g_value_set_int (value, src->num_buffers);
1446       break;
1447     case PROP_TYPEFIND:
1448       g_value_set_boolean (value, src->data.ABI.typefind);
1449       break;
1450     case PROP_DO_TIMESTAMP:
1451       g_value_set_boolean (value, src->priv->do_timestamp);
1452       break;
1453     default:
1454       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1455       break;
1456   }
1457 }
1458
1459 /* with STREAM_LOCK and LOCK */
1460 static GstClockReturn
1461 gst_base_src_wait (GstBaseSrc * basesrc, GstClock * clock, GstClockTime time)
1462 {
1463   GstClockReturn ret;
1464   GstClockID id;
1465
1466   id = gst_clock_new_single_shot_id (clock, time);
1467
1468   basesrc->clock_id = id;
1469   /* release the object lock while waiting */
1470   GST_OBJECT_UNLOCK (basesrc);
1471
1472   ret = gst_clock_id_wait (id, NULL);
1473
1474   GST_OBJECT_LOCK (basesrc);
1475   gst_clock_id_unref (id);
1476   basesrc->clock_id = NULL;
1477
1478   return ret;
1479 }
1480
1481 /* perform synchronisation on a buffer. 
1482  * with STREAM_LOCK.
1483  */
1484 static GstClockReturn
1485 gst_base_src_do_sync (GstBaseSrc * basesrc, GstBuffer * buffer)
1486 {
1487   GstClockReturn result;
1488   GstClockTime start, end;
1489   GstBaseSrcClass *bclass;
1490   GstClockTime base_time;
1491   GstClock *clock;
1492   GstClockTime now = -1, timestamp;
1493   gboolean do_timestamp, first, pseudo_live;
1494
1495   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
1496
1497   start = end = -1;
1498   if (bclass->get_times)
1499     bclass->get_times (basesrc, buffer, &start, &end);
1500
1501   /* get buffer timestamp */
1502   timestamp = GST_BUFFER_TIMESTAMP (buffer);
1503
1504   /* grab the lock to prepare for clocking and calculate the startup 
1505    * latency. */
1506   GST_OBJECT_LOCK (basesrc);
1507
1508   /* if we are asked to sync against the clock we are a pseudo live element */
1509   pseudo_live = (start != -1 && basesrc->is_live);
1510   /* check for the first buffer */
1511   first = (basesrc->priv->latency == -1);
1512
1513   if (timestamp != -1 && pseudo_live) {
1514     GstClockTime latency;
1515
1516     /* we have a timestamp and a sync time, latency is the diff */
1517     if (timestamp <= start)
1518       latency = start - timestamp;
1519     else
1520       latency = 0;
1521
1522     if (first) {
1523       GST_DEBUG_OBJECT (basesrc, "pseudo_live with latency %" GST_TIME_FORMAT,
1524           GST_TIME_ARGS (latency));
1525       /* first time we calculate latency, just configure */
1526       basesrc->priv->latency = latency;
1527     } else {
1528       if (basesrc->priv->latency != latency) {
1529         /* we have a new latency, FIXME post latency message */
1530         basesrc->priv->latency = latency;
1531         GST_DEBUG_OBJECT (basesrc, "latency changed to %" GST_TIME_FORMAT,
1532             GST_TIME_ARGS (latency));
1533       }
1534     }
1535   } else if (first) {
1536     GST_DEBUG_OBJECT (basesrc, "no latency needed, live %d, sync %d",
1537         basesrc->is_live, start != -1);
1538     basesrc->priv->latency = 0;
1539   }
1540
1541   /* get clock, if no clock, we can't sync or do timestamps */
1542   if ((clock = GST_ELEMENT_CLOCK (basesrc)) == NULL)
1543     goto no_clock;
1544
1545   base_time = GST_ELEMENT_CAST (basesrc)->base_time;
1546
1547   do_timestamp = basesrc->priv->do_timestamp;
1548
1549   /* first buffer, calculate the timestamp offset */
1550   if (first) {
1551     GstClockTime running_time;
1552
1553     now = gst_clock_get_time (clock);
1554     running_time = now - base_time;
1555
1556     GST_LOG_OBJECT (basesrc,
1557         "startup timestamp: %" GST_TIME_FORMAT ", running_time %"
1558         GST_TIME_FORMAT, GST_TIME_ARGS (timestamp),
1559         GST_TIME_ARGS (running_time));
1560
1561     if (pseudo_live && timestamp != -1) {
1562       /* live source and we need to sync, add startup latency to all timestamps
1563        * to get the real running_time. Live sources should always timestamp
1564        * according to the current running time. */
1565       basesrc->priv->ts_offset = GST_CLOCK_DIFF (timestamp, running_time);
1566
1567       GST_LOG_OBJECT (basesrc, "live with sync, ts_offset %" GST_TIME_FORMAT,
1568           GST_TIME_ARGS (basesrc->priv->ts_offset));
1569     } else {
1570       basesrc->priv->ts_offset = 0;
1571       GST_LOG_OBJECT (basesrc, "no timestamp offset needed");
1572     }
1573
1574     if (!GST_CLOCK_TIME_IS_VALID (timestamp)) {
1575       if (do_timestamp)
1576         timestamp = running_time;
1577       else
1578         timestamp = 0;
1579
1580       GST_BUFFER_TIMESTAMP (buffer) = timestamp;
1581
1582       GST_LOG_OBJECT (basesrc, "created timestamp: %" GST_TIME_FORMAT,
1583           GST_TIME_ARGS (timestamp));
1584     }
1585
1586     /* add the timestamp offset we need for sync */
1587     timestamp += basesrc->priv->ts_offset;
1588   } else {
1589     /* not the first buffer, the timestamp is the diff between the clock and
1590      * base_time */
1591     if (do_timestamp && !GST_CLOCK_TIME_IS_VALID (timestamp)) {
1592       now = gst_clock_get_time (clock);
1593
1594       GST_BUFFER_TIMESTAMP (buffer) = now - base_time;
1595     }
1596   }
1597
1598   /* if we don't have a buffer timestamp, we don't sync */
1599   if (!GST_CLOCK_TIME_IS_VALID (start))
1600     goto no_sync;
1601
1602   if (basesrc->is_live && GST_CLOCK_TIME_IS_VALID (timestamp)) {
1603     /* for pseudo live sources, add our ts_offset to the timestamp */
1604     GST_BUFFER_TIMESTAMP (buffer) += basesrc->priv->ts_offset;
1605     start += basesrc->priv->ts_offset;
1606   }
1607
1608   GST_LOG_OBJECT (basesrc,
1609       "waiting for clock, base time %" GST_TIME_FORMAT
1610       ", stream_start %" GST_TIME_FORMAT,
1611       GST_TIME_ARGS (base_time), GST_TIME_ARGS (start));
1612
1613   result = gst_base_src_wait (basesrc, clock, start + base_time);
1614   GST_OBJECT_UNLOCK (basesrc);
1615
1616   GST_LOG_OBJECT (basesrc, "clock entry done: %d", result);
1617
1618   return result;
1619
1620   /* special cases */
1621 no_clock:
1622   {
1623     GST_DEBUG_OBJECT (basesrc, "we have no clock");
1624     GST_OBJECT_UNLOCK (basesrc);
1625     return GST_CLOCK_OK;
1626   }
1627 no_sync:
1628   {
1629     GST_DEBUG_OBJECT (basesrc, "no sync needed");
1630     GST_OBJECT_UNLOCK (basesrc);
1631     return GST_CLOCK_OK;
1632   }
1633 }
1634
1635 static gboolean
1636 gst_base_src_update_length (GstBaseSrc * src, guint64 offset, guint * length)
1637 {
1638   guint64 size, maxsize;
1639   GstBaseSrcClass *bclass;
1640
1641   bclass = GST_BASE_SRC_GET_CLASS (src);
1642
1643   /* only operate if we are working with bytes */
1644   if (src->segment.format != GST_FORMAT_BYTES)
1645     return TRUE;
1646
1647   /* get total file size */
1648   size = (guint64) src->segment.duration;
1649
1650   /* the max amount of bytes to read is the total size or
1651    * up to the segment.stop if present. */
1652   if (src->segment.stop != -1)
1653     maxsize = MIN (size, src->segment.stop);
1654   else
1655     maxsize = size;
1656
1657   GST_DEBUG_OBJECT (src,
1658       "reading offset %" G_GUINT64_FORMAT ", length %u, size %" G_GINT64_FORMAT
1659       ", segment.stop %" G_GINT64_FORMAT ", maxsize %" G_GINT64_FORMAT, offset,
1660       *length, size, src->segment.stop, maxsize);
1661
1662   /* check size if we have one */
1663   if (maxsize != -1) {
1664     /* if we run past the end, check if the file became bigger and 
1665      * retry. */
1666     if (G_UNLIKELY (offset + *length >= maxsize)) {
1667       /* see if length of the file changed */
1668       if (bclass->get_size)
1669         if (!bclass->get_size (src, &size))
1670           size = -1;
1671
1672       gst_segment_set_duration (&src->segment, GST_FORMAT_BYTES, size);
1673
1674       /* make sure we don't exceed the configured segment stop
1675        * if it was set */
1676       if (src->segment.stop != -1)
1677         maxsize = MIN (size, src->segment.stop);
1678       else
1679         maxsize = size;
1680
1681       /* if we are at or past the end, EOS */
1682       if (G_UNLIKELY (offset >= maxsize))
1683         goto unexpected_length;
1684
1685       /* else we can clip to the end */
1686       if (G_UNLIKELY (offset + *length >= maxsize))
1687         *length = maxsize - offset;
1688
1689     }
1690   }
1691
1692   /* keep track of current position. segment is in bytes, we checked 
1693    * that above. */
1694   gst_segment_set_last_stop (&src->segment, GST_FORMAT_BYTES, offset);
1695
1696   return TRUE;
1697
1698   /* ERRORS */
1699 unexpected_length:
1700   {
1701     return FALSE;
1702   }
1703 }
1704
1705 static GstFlowReturn
1706 gst_base_src_get_range (GstBaseSrc * src, guint64 offset, guint length,
1707     GstBuffer ** buf)
1708 {
1709   GstFlowReturn ret;
1710   GstBaseSrcClass *bclass;
1711   GstClockReturn status;
1712
1713   bclass = GST_BASE_SRC_GET_CLASS (src);
1714
1715   ret = gst_base_src_wait_playing (src);
1716   if (ret != GST_FLOW_OK)
1717     goto stopped;
1718
1719   if (G_UNLIKELY (!GST_OBJECT_FLAG_IS_SET (src, GST_BASE_SRC_STARTED)))
1720     goto not_started;
1721
1722   if (G_UNLIKELY (!bclass->create))
1723     goto no_function;
1724
1725   if (G_UNLIKELY (!gst_base_src_update_length (src, offset, &length)))
1726     goto unexpected_length;
1727
1728   /* normally we don't count buffers */
1729   if (G_UNLIKELY (src->num_buffers_left >= 0)) {
1730     if (src->num_buffers_left == 0)
1731       goto reached_num_buffers;
1732     else
1733       src->num_buffers_left--;
1734   }
1735
1736   GST_DEBUG_OBJECT (src,
1737       "calling create offset %" G_GUINT64_FORMAT " length %u, time %"
1738       G_GINT64_FORMAT, offset, length, src->segment.time);
1739
1740   ret = bclass->create (src, offset, length, buf);
1741   if (G_UNLIKELY (ret != GST_FLOW_OK))
1742     goto done;
1743
1744   /* no timestamp set and we are at offset 0, we can timestamp with 0 */
1745   if (offset == 0 && src->segment.time == 0
1746       && GST_BUFFER_TIMESTAMP (*buf) == -1)
1747     GST_BUFFER_TIMESTAMP (*buf) = 0;
1748
1749   /* now sync before pushing the buffer */
1750   status = gst_base_src_do_sync (src, *buf);
1751   switch (status) {
1752     case GST_CLOCK_EARLY:
1753       /* the buffer is too late. We currently don't drop the buffer. */
1754       GST_DEBUG_OBJECT (src, "buffer too late!, returning anyway");
1755       break;
1756     case GST_CLOCK_OK:
1757       /* buffer synchronised properly */
1758       GST_DEBUG_OBJECT (src, "buffer ok");
1759       break;
1760     case GST_CLOCK_UNSCHEDULED:
1761       /* this case is triggered when we were waiting for the clock and
1762        * it got unlocked because we did a state change. We return 
1763        * WRONG_STATE in this case to stop the dataflow also get rid of the
1764        * produced buffer. */
1765       GST_DEBUG_OBJECT (src,
1766           "clock was unscheduled (%d), returning WRONG_STATE", status);
1767       gst_buffer_unref (*buf);
1768       *buf = NULL;
1769       ret = GST_FLOW_WRONG_STATE;
1770       break;
1771     default:
1772       /* all other result values are unexpected and errors */
1773       GST_ELEMENT_ERROR (src, CORE, CLOCK,
1774           (_("Internal clock error.")),
1775           ("clock returned unexpected return value %d", status));
1776       gst_buffer_unref (*buf);
1777       *buf = NULL;
1778       ret = GST_FLOW_ERROR;
1779       break;
1780   }
1781 done:
1782   return ret;
1783
1784   /* ERROR */
1785 stopped:
1786   {
1787     GST_DEBUG_OBJECT (src, "wait_playing returned %d", ret);
1788     return ret;
1789   }
1790 not_started:
1791   {
1792     GST_DEBUG_OBJECT (src, "getrange but not started");
1793     return GST_FLOW_WRONG_STATE;
1794   }
1795 no_function:
1796   {
1797     GST_DEBUG_OBJECT (src, "no create function");
1798     return GST_FLOW_ERROR;
1799   }
1800 unexpected_length:
1801   {
1802     GST_DEBUG_OBJECT (src, "unexpected length %u (offset=%" G_GUINT64_FORMAT
1803         ", size=%" G_GINT64_FORMAT ")", length, offset, src->segment.duration);
1804     return GST_FLOW_UNEXPECTED;
1805   }
1806 reached_num_buffers:
1807   {
1808     GST_DEBUG_OBJECT (src, "sent all buffers");
1809     return GST_FLOW_UNEXPECTED;
1810   }
1811 }
1812
1813 static GstFlowReturn
1814 gst_base_src_pad_get_range (GstPad * pad, guint64 offset, guint length,
1815     GstBuffer ** buf)
1816 {
1817   GstBaseSrc *src;
1818   GstFlowReturn res;
1819
1820   src = GST_BASE_SRC (gst_pad_get_parent (pad));
1821
1822   res = gst_base_src_get_range (src, offset, length, buf);
1823
1824   gst_object_unref (src);
1825
1826   return res;
1827 }
1828
1829 static gboolean
1830 gst_base_src_default_check_get_range (GstBaseSrc * src)
1831 {
1832   gboolean res;
1833
1834   if (!GST_OBJECT_FLAG_IS_SET (src, GST_BASE_SRC_STARTED)) {
1835     GST_LOG_OBJECT (src, "doing start/stop to check get_range support");
1836     if (G_LIKELY (gst_base_src_start (src)))
1837       gst_base_src_stop (src);
1838   }
1839
1840   /* we can operate in getrange mode if the native format is bytes
1841    * and we are seekable, this condition is set in the random_access
1842    * flag and is set in the _start() method. */
1843   res = src->random_access;
1844
1845   return res;
1846 }
1847
1848 static gboolean
1849 gst_base_src_check_get_range (GstBaseSrc * src)
1850 {
1851   GstBaseSrcClass *bclass;
1852   gboolean res;
1853
1854   bclass = GST_BASE_SRC_GET_CLASS (src);
1855
1856   if (bclass->check_get_range == NULL)
1857     goto no_function;
1858
1859   res = bclass->check_get_range (src);
1860   GST_LOG_OBJECT (src, "%s() returned %d",
1861       GST_DEBUG_FUNCPTR_NAME (bclass->check_get_range), (gint) res);
1862
1863   return res;
1864
1865   /* ERRORS */
1866 no_function:
1867   {
1868     GST_WARNING_OBJECT (src, "no check_get_range function set");
1869     return FALSE;
1870   }
1871 }
1872
1873 static gboolean
1874 gst_base_src_pad_check_get_range (GstPad * pad)
1875 {
1876   GstBaseSrc *src;
1877   gboolean res;
1878
1879   src = GST_BASE_SRC (gst_pad_get_parent (pad));
1880
1881   res = gst_base_src_check_get_range (src);
1882
1883   gst_object_unref (src);
1884
1885   return res;
1886 }
1887
1888 static void
1889 gst_base_src_loop (GstPad * pad)
1890 {
1891   GstBaseSrc *src;
1892   GstBuffer *buf = NULL;
1893   GstFlowReturn ret;
1894   gint64 position;
1895   gboolean eos;
1896
1897   eos = FALSE;
1898
1899   src = GST_BASE_SRC (gst_pad_get_parent (pad));
1900
1901   src->priv->last_sent_eos = FALSE;
1902
1903   /* if we operate in bytes, we can calculate an offset */
1904   if (src->segment.format == GST_FORMAT_BYTES)
1905     position = src->segment.last_stop;
1906   else
1907     position = -1;
1908
1909   ret = gst_base_src_get_range (src, position, src->blocksize, &buf);
1910   if (G_UNLIKELY (ret != GST_FLOW_OK)) {
1911     GST_INFO_OBJECT (src, "pausing after gst_base_src_get_range() = %s",
1912         gst_flow_get_name (ret));
1913     goto pause;
1914   }
1915   /* this should not happen */
1916   if (G_UNLIKELY (buf == NULL))
1917     goto null_buffer;
1918
1919   /* push events to close/start our segment before we push the buffer. */
1920   if (src->priv->close_segment) {
1921     gst_pad_push_event (pad, src->priv->close_segment);
1922     src->priv->close_segment = NULL;
1923   }
1924   if (src->priv->start_segment) {
1925     gst_pad_push_event (pad, src->priv->start_segment);
1926     src->priv->start_segment = NULL;
1927   }
1928
1929   /* figure out the new position */
1930   switch (src->segment.format) {
1931     case GST_FORMAT_BYTES:
1932       position += GST_BUFFER_SIZE (buf);
1933       break;
1934     case GST_FORMAT_TIME:
1935     {
1936       GstClockTime start, duration;
1937
1938       start = GST_BUFFER_TIMESTAMP (buf);
1939       duration = GST_BUFFER_DURATION (buf);
1940
1941       if (GST_CLOCK_TIME_IS_VALID (start))
1942         position = start;
1943       else
1944         position = src->segment.last_stop;
1945
1946       if (GST_CLOCK_TIME_IS_VALID (duration))
1947         position += duration;
1948       break;
1949     }
1950     case GST_FORMAT_DEFAULT:
1951       position = GST_BUFFER_OFFSET_END (buf);
1952       break;
1953     default:
1954       position = -1;
1955       break;
1956   }
1957   if (position != -1) {
1958     if (src->segment.stop != -1) {
1959       if (position >= src->segment.stop) {
1960         eos = TRUE;
1961         position = src->segment.stop;
1962       }
1963     }
1964     gst_segment_set_last_stop (&src->segment, src->segment.format, position);
1965   }
1966
1967   if (G_UNLIKELY (src->priv->discont)) {
1968     buf = gst_buffer_make_metadata_writable (buf);
1969     GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_DISCONT);
1970     src->priv->discont = FALSE;
1971   }
1972
1973   ret = gst_pad_push (pad, buf);
1974   if (G_UNLIKELY (ret != GST_FLOW_OK)) {
1975     GST_INFO_OBJECT (src, "pausing after gst_pad_push() = %s",
1976         gst_flow_get_name (ret));
1977     goto pause;
1978   }
1979
1980   if (eos) {
1981     GST_INFO_OBJECT (src, "pausing after EOS");
1982     ret = GST_FLOW_UNEXPECTED;
1983     goto pause;
1984   }
1985
1986 done:
1987   gst_object_unref (src);
1988   return;
1989
1990   /* special cases */
1991 pause:
1992   {
1993     const gchar *reason = gst_flow_get_name (ret);
1994
1995     GST_DEBUG_OBJECT (src, "pausing task, reason %s", reason);
1996     src->data.ABI.running = FALSE;
1997     gst_pad_pause_task (pad);
1998     if (GST_FLOW_IS_FATAL (ret) || ret == GST_FLOW_NOT_LINKED) {
1999       if (ret == GST_FLOW_UNEXPECTED) {
2000         /* perform EOS logic */
2001         if (src->segment.flags & GST_SEEK_FLAG_SEGMENT) {
2002           gst_element_post_message (GST_ELEMENT_CAST (src),
2003               gst_message_new_segment_done (GST_OBJECT_CAST (src),
2004                   src->segment.format, src->segment.last_stop));
2005         } else {
2006           gst_pad_push_event (pad, gst_event_new_eos ());
2007           src->priv->last_sent_eos = TRUE;
2008         }
2009       } else {
2010         /* for fatal errors we post an error message, post the error
2011          * first so the app knows about the error first. */
2012         GST_ELEMENT_ERROR (src, STREAM, FAILED,
2013             (_("Internal data flow error.")),
2014             ("streaming task paused, reason %s (%d)", reason, ret));
2015         gst_pad_push_event (pad, gst_event_new_eos ());
2016         src->priv->last_sent_eos = TRUE;
2017       }
2018     }
2019     goto done;
2020   }
2021 null_buffer:
2022   {
2023     GST_ELEMENT_ERROR (src, STREAM, FAILED,
2024         (_("Internal data flow error.")), ("element returned NULL buffer"));
2025     /* we finished the segment on error */
2026     src->data.ABI.running = FALSE;
2027     gst_pad_pause_task (pad);
2028     gst_pad_push_event (pad, gst_event_new_eos ());
2029     src->priv->last_sent_eos = TRUE;
2030     goto done;
2031   }
2032 }
2033
2034 /* this will always be called between start() and stop(). So you can rely on
2035  * resources allocated by start() and freed from stop(). This needs to be added
2036  * to the docs at some point. */
2037 static gboolean
2038 gst_base_src_unlock (GstBaseSrc * basesrc)
2039 {
2040   GstBaseSrcClass *bclass;
2041   gboolean result = TRUE;
2042
2043   GST_DEBUG ("unlock");
2044   /* unblock whatever the subclass is doing */
2045   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2046   if (bclass->unlock)
2047     result = bclass->unlock (basesrc);
2048
2049   GST_DEBUG ("unschedule clock");
2050   /* and unblock the clock as well, if any */
2051   GST_OBJECT_LOCK (basesrc);
2052   if (basesrc->clock_id) {
2053     gst_clock_id_unschedule (basesrc->clock_id);
2054   }
2055   GST_OBJECT_UNLOCK (basesrc);
2056
2057   GST_DEBUG ("unlock done");
2058
2059   return result;
2060 }
2061
2062 /* this will always be called between start() and stop(). So you can rely on
2063  * resources allocated by start() and freed from stop(). This needs to be added
2064  * to the docs at some point. */
2065 static gboolean
2066 gst_base_src_unlock_stop (GstBaseSrc * basesrc)
2067 {
2068   GstBaseSrcClass *bclass;
2069   gboolean result = TRUE;
2070
2071   GST_DEBUG_OBJECT (basesrc, "unlock stop");
2072
2073   /* Finish a previous unblock request, allowing subclasses to flush command
2074    * queues or whatever they need to do */
2075   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2076   if (bclass->unlock_stop)
2077     result = bclass->unlock_stop (basesrc);
2078
2079   GST_DEBUG_OBJECT (basesrc, "unlock stop done");
2080
2081   return result;
2082 }
2083
2084 /* default negotiation code. 
2085  *
2086  * Take intersection between src and sink pads, take first
2087  * caps and fixate. 
2088  */
2089 static gboolean
2090 gst_base_src_default_negotiate (GstBaseSrc * basesrc)
2091 {
2092   GstCaps *thiscaps;
2093   GstCaps *caps = NULL;
2094   GstCaps *peercaps = NULL;
2095   gboolean result = FALSE;
2096
2097   /* first see what is possible on our source pad */
2098   thiscaps = gst_pad_get_caps (GST_BASE_SRC_PAD (basesrc));
2099   GST_DEBUG_OBJECT (basesrc, "caps of src: %" GST_PTR_FORMAT, thiscaps);
2100   /* nothing or anything is allowed, we're done */
2101   if (thiscaps == NULL || gst_caps_is_any (thiscaps))
2102     goto no_nego_needed;
2103
2104   /* get the peer caps */
2105   peercaps = gst_pad_peer_get_caps (GST_BASE_SRC_PAD (basesrc));
2106   GST_DEBUG_OBJECT (basesrc, "caps of peer: %" GST_PTR_FORMAT, peercaps);
2107   if (peercaps) {
2108     GstCaps *icaps;
2109
2110     /* get intersection */
2111     icaps = gst_caps_intersect (thiscaps, peercaps);
2112     GST_DEBUG_OBJECT (basesrc, "intersect: %" GST_PTR_FORMAT, icaps);
2113     gst_caps_unref (thiscaps);
2114     gst_caps_unref (peercaps);
2115     if (icaps) {
2116       /* take first (and best, since they are sorted) possibility */
2117       caps = gst_caps_copy_nth (icaps, 0);
2118       gst_caps_unref (icaps);
2119     }
2120   } else {
2121     /* no peer, work with our own caps then */
2122     caps = thiscaps;
2123   }
2124   if (caps) {
2125     caps = gst_caps_make_writable (caps);
2126     gst_caps_truncate (caps);
2127
2128     /* now fixate */
2129     if (!gst_caps_is_empty (caps)) {
2130       gst_pad_fixate_caps (GST_BASE_SRC_PAD (basesrc), caps);
2131       GST_DEBUG_OBJECT (basesrc, "fixated to: %" GST_PTR_FORMAT, caps);
2132
2133       if (gst_caps_is_any (caps)) {
2134         /* hmm, still anything, so element can do anything and
2135          * nego is not needed */
2136         result = TRUE;
2137       } else if (gst_caps_is_fixed (caps)) {
2138         /* yay, fixed caps, use those then */
2139         gst_pad_set_caps (GST_BASE_SRC_PAD (basesrc), caps);
2140         result = TRUE;
2141       }
2142     }
2143     gst_caps_unref (caps);
2144   }
2145   return result;
2146
2147 no_nego_needed:
2148   {
2149     GST_DEBUG_OBJECT (basesrc, "no negotiation needed");
2150     if (thiscaps)
2151       gst_caps_unref (thiscaps);
2152     return TRUE;
2153   }
2154 }
2155
2156 static gboolean
2157 gst_base_src_negotiate (GstBaseSrc * basesrc)
2158 {
2159   GstBaseSrcClass *bclass;
2160   gboolean result = TRUE;
2161
2162   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2163
2164   if (bclass->negotiate)
2165     result = bclass->negotiate (basesrc);
2166
2167   return result;
2168 }
2169
2170 static gboolean
2171 gst_base_src_start (GstBaseSrc * basesrc)
2172 {
2173   GstBaseSrcClass *bclass;
2174   gboolean result;
2175   guint64 size;
2176
2177   if (GST_OBJECT_FLAG_IS_SET (basesrc, GST_BASE_SRC_STARTED))
2178     return TRUE;
2179
2180   GST_DEBUG_OBJECT (basesrc, "starting source");
2181
2182   basesrc->num_buffers_left = basesrc->num_buffers;
2183
2184   gst_segment_init (&basesrc->segment, basesrc->segment.format);
2185   basesrc->data.ABI.running = FALSE;
2186
2187   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2188   if (bclass->start)
2189     result = bclass->start (basesrc);
2190   else
2191     result = TRUE;
2192
2193   if (!result)
2194     goto could_not_start;
2195
2196   GST_OBJECT_FLAG_SET (basesrc, GST_BASE_SRC_STARTED);
2197
2198   /* figure out the size */
2199   if (basesrc->segment.format == GST_FORMAT_BYTES) {
2200     if (bclass->get_size) {
2201       if (!(result = bclass->get_size (basesrc, &size)))
2202         size = -1;
2203     } else {
2204       result = FALSE;
2205       size = -1;
2206     }
2207     GST_DEBUG_OBJECT (basesrc, "setting size %" G_GUINT64_FORMAT, size);
2208     /* only update the size when operating in bytes, subclass is supposed
2209      * to set duration in the start method for other formats */
2210     gst_segment_set_duration (&basesrc->segment, GST_FORMAT_BYTES, size);
2211   } else {
2212     size = -1;
2213   }
2214
2215   GST_DEBUG_OBJECT (basesrc,
2216       "format: %d, have size: %d, size: %" G_GUINT64_FORMAT ", duration: %"
2217       G_GINT64_FORMAT, basesrc->segment.format, result, size,
2218       basesrc->segment.duration);
2219
2220   /* check if we can seek */
2221   if (bclass->is_seekable)
2222     basesrc->seekable = bclass->is_seekable (basesrc);
2223   else
2224     basesrc->seekable = FALSE;
2225
2226   GST_DEBUG_OBJECT (basesrc, "is seekable: %d", basesrc->seekable);
2227
2228   /* update for random access flag */
2229   basesrc->random_access = basesrc->seekable &&
2230       basesrc->segment.format == GST_FORMAT_BYTES;
2231
2232   GST_DEBUG_OBJECT (basesrc, "is random_access: %d", basesrc->random_access);
2233
2234   /* run typefind if we are random_access and the typefinding is enabled. */
2235   if (basesrc->random_access && basesrc->data.ABI.typefind && size != -1) {
2236     GstCaps *caps;
2237
2238     caps = gst_type_find_helper (basesrc->srcpad, size);
2239     gst_pad_set_caps (basesrc->srcpad, caps);
2240     gst_caps_unref (caps);
2241   } else {
2242     /* use class or default negotiate function */
2243     if (!gst_base_src_negotiate (basesrc))
2244       goto could_not_negotiate;
2245   }
2246
2247   return TRUE;
2248
2249   /* ERROR */
2250 could_not_start:
2251   {
2252     GST_DEBUG_OBJECT (basesrc, "could not start");
2253     /* subclass is supposed to post a message. We don't have to call _stop. */
2254     return FALSE;
2255   }
2256 could_not_negotiate:
2257   {
2258     GST_DEBUG_OBJECT (basesrc, "could not negotiate, stopping");
2259     GST_ELEMENT_ERROR (basesrc, STREAM, FORMAT,
2260         ("Could not negotiate format"), ("Check your filtered caps, if any"));
2261     /* we must call stop */
2262     gst_base_src_stop (basesrc);
2263     return FALSE;
2264   }
2265 }
2266
2267 static gboolean
2268 gst_base_src_stop (GstBaseSrc * basesrc)
2269 {
2270   GstBaseSrcClass *bclass;
2271   gboolean result = TRUE;
2272
2273   if (!GST_OBJECT_FLAG_IS_SET (basesrc, GST_BASE_SRC_STARTED))
2274     return TRUE;
2275
2276   GST_DEBUG_OBJECT (basesrc, "stopping source");
2277
2278   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2279   if (bclass->stop)
2280     result = bclass->stop (basesrc);
2281
2282   if (result)
2283     GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_STARTED);
2284
2285   return result;
2286 }
2287
2288 static gboolean
2289 gst_base_src_deactivate (GstBaseSrc * basesrc, GstPad * pad)
2290 {
2291   gboolean result;
2292
2293   GST_LIVE_LOCK (basesrc);
2294   basesrc->live_running = TRUE;
2295   GST_LIVE_SIGNAL (basesrc);
2296   GST_LIVE_UNLOCK (basesrc);
2297
2298   /* step 1, unblock clock sync (if any) */
2299   result = gst_base_src_unlock (basesrc);
2300
2301   /* step 2, make sure streaming finishes */
2302   result &= gst_pad_stop_task (pad);
2303
2304   /* step 3, clear the unblock condition */
2305   result &= gst_base_src_unlock_stop (basesrc);
2306
2307   return result;
2308 }
2309
2310 static gboolean
2311 gst_base_src_activate_push (GstPad * pad, gboolean active)
2312 {
2313   GstBaseSrc *basesrc;
2314   GstEvent *event;
2315
2316   basesrc = GST_BASE_SRC (GST_OBJECT_PARENT (pad));
2317
2318   /* prepare subclass first */
2319   if (active) {
2320     GST_DEBUG_OBJECT (basesrc, "Activating in push mode");
2321
2322     if (G_UNLIKELY (!basesrc->can_activate_push))
2323       goto no_push_activation;
2324
2325     if (G_UNLIKELY (!gst_base_src_start (basesrc)))
2326       goto error_start;
2327
2328     basesrc->priv->last_sent_eos = FALSE;
2329
2330     /* do initial seek, which will start the task */
2331     GST_OBJECT_LOCK (basesrc);
2332     event = basesrc->data.ABI.pending_seek;
2333     basesrc->data.ABI.pending_seek = NULL;
2334     GST_OBJECT_UNLOCK (basesrc);
2335
2336     /* no need to unlock anything, the task is certainly
2337      * not running here. The perform seek code will start the task when
2338      * finished. */
2339     if (G_UNLIKELY (!gst_base_src_perform_seek (basesrc, event, FALSE)))
2340       goto seek_failed;
2341
2342     if (event)
2343       gst_event_unref (event);
2344   } else {
2345     GST_DEBUG_OBJECT (basesrc, "Deactivating in push mode");
2346     /* call the unlock function and stop the task */
2347     if (G_UNLIKELY (!gst_base_src_deactivate (basesrc, pad)))
2348       goto deactivate_failed;
2349
2350     /* now we can stop the source */
2351     if (G_UNLIKELY (!gst_base_src_stop (basesrc)))
2352       goto error_stop;
2353   }
2354   return TRUE;
2355
2356   /* ERRORS */
2357 no_push_activation:
2358   {
2359     GST_WARNING_OBJECT (basesrc, "Subclass disabled push-mode activation");
2360     return FALSE;
2361   }
2362 error_start:
2363   {
2364     GST_WARNING_OBJECT (basesrc, "Failed to start in push mode");
2365     return FALSE;
2366   }
2367 seek_failed:
2368   {
2369     GST_ERROR_OBJECT (basesrc, "Failed to perform initial seek");
2370     gst_base_src_stop (basesrc);
2371     if (event)
2372       gst_event_unref (event);
2373     return FALSE;
2374   }
2375 deactivate_failed:
2376   {
2377     GST_ERROR_OBJECT (basesrc, "Failed to deactivate in push mode");
2378     return FALSE;
2379   }
2380 error_stop:
2381   {
2382     GST_DEBUG_OBJECT (basesrc, "Failed to stop in push mode");
2383     return FALSE;
2384   }
2385 }
2386
2387 static gboolean
2388 gst_base_src_activate_pull (GstPad * pad, gboolean active)
2389 {
2390   GstBaseSrc *basesrc;
2391
2392   basesrc = GST_BASE_SRC (GST_OBJECT_PARENT (pad));
2393
2394   /* prepare subclass first */
2395   if (active) {
2396     GST_DEBUG_OBJECT (basesrc, "Activating in pull mode");
2397     if (G_UNLIKELY (!gst_base_src_start (basesrc)))
2398       goto error_start;
2399
2400     /* if not random_access, we cannot operate in pull mode for now */
2401     if (G_UNLIKELY (!gst_base_src_check_get_range (basesrc)))
2402       goto no_get_range;
2403   } else {
2404     GST_DEBUG_OBJECT (basesrc, "Deactivating in pull mode");
2405     /* call the unlock function. We have no task to stop. */
2406     if (G_UNLIKELY (!gst_base_src_deactivate (basesrc, pad)))
2407       goto deactivate_failed;
2408
2409     /* don't send EOS when going from PAUSED => READY when in pull mode */
2410     basesrc->priv->last_sent_eos = TRUE;
2411
2412     if (G_UNLIKELY (!gst_base_src_stop (basesrc)))
2413       goto error_stop;
2414   }
2415   return TRUE;
2416
2417   /* ERRORS */
2418 error_start:
2419   {
2420     GST_ERROR_OBJECT (basesrc, "Failed to start in pull mode");
2421     return FALSE;
2422   }
2423 no_get_range:
2424   {
2425     GST_ERROR_OBJECT (basesrc, "Cannot operate in pull mode, stopping");
2426     gst_base_src_stop (basesrc);
2427     return FALSE;
2428   }
2429 deactivate_failed:
2430   {
2431     GST_ERROR_OBJECT (basesrc, "Failed to deactivate in pull mode");
2432     return FALSE;
2433   }
2434 error_stop:
2435   {
2436     GST_ERROR_OBJECT (basesrc, "Failed to stop in pull mode");
2437     return FALSE;
2438   }
2439 }
2440
2441 static GstStateChangeReturn
2442 gst_base_src_change_state (GstElement * element, GstStateChange transition)
2443 {
2444   GstBaseSrc *basesrc;
2445   GstStateChangeReturn result;
2446   gboolean no_preroll = FALSE;
2447
2448   basesrc = GST_BASE_SRC (element);
2449
2450   switch (transition) {
2451     case GST_STATE_CHANGE_NULL_TO_READY:
2452       break;
2453     case GST_STATE_CHANGE_READY_TO_PAUSED:
2454       GST_LIVE_LOCK (element);
2455       if (basesrc->is_live) {
2456         no_preroll = TRUE;
2457         basesrc->live_running = FALSE;
2458       }
2459       basesrc->priv->last_sent_eos = FALSE;
2460       basesrc->priv->discont = TRUE;
2461       GST_LIVE_UNLOCK (element);
2462       break;
2463     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
2464       GST_LIVE_LOCK (element);
2465       basesrc->priv->latency = -1;
2466       if (basesrc->is_live) {
2467         basesrc->live_running = TRUE;
2468         GST_LIVE_SIGNAL (element);
2469       }
2470       GST_LIVE_UNLOCK (element);
2471       break;
2472     default:
2473       break;
2474   }
2475
2476   if ((result =
2477           GST_ELEMENT_CLASS (parent_class)->change_state (element,
2478               transition)) == GST_STATE_CHANGE_FAILURE)
2479     goto failure;
2480
2481   switch (transition) {
2482     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
2483       GST_LIVE_LOCK (element);
2484       if (basesrc->is_live) {
2485         no_preroll = TRUE;
2486         basesrc->live_running = FALSE;
2487       }
2488       GST_LIVE_UNLOCK (element);
2489       break;
2490     case GST_STATE_CHANGE_PAUSED_TO_READY:
2491     {
2492       GstEvent **event_p;
2493
2494       /* FIXME, deprecate this behaviour, it is very dangerous.
2495        * the prefered way of sending EOS downstream is by sending
2496        * the EOS event to the element */
2497       if (!basesrc->priv->last_sent_eos) {
2498         GST_DEBUG_OBJECT (basesrc, "Sending EOS event");
2499         gst_pad_push_event (basesrc->srcpad, gst_event_new_eos ());
2500         basesrc->priv->last_sent_eos = TRUE;
2501       }
2502       event_p = &basesrc->data.ABI.pending_seek;
2503       gst_event_replace (event_p, NULL);
2504       event_p = &basesrc->priv->close_segment;
2505       gst_event_replace (event_p, NULL);
2506       event_p = &basesrc->priv->start_segment;
2507       gst_event_replace (event_p, NULL);
2508       break;
2509     }
2510     case GST_STATE_CHANGE_READY_TO_NULL:
2511       break;
2512     default:
2513       break;
2514   }
2515
2516   if (no_preroll && result == GST_STATE_CHANGE_SUCCESS)
2517     result = GST_STATE_CHANGE_NO_PREROLL;
2518
2519   return result;
2520
2521   /* ERRORS */
2522 failure:
2523   {
2524     GST_DEBUG_OBJECT (basesrc, "parent failed state change");
2525     return result;
2526   }
2527 }