basesrc: don't ignore pad_start return value
[platform/upstream/gstreamer.git] / libs / gst / base / gstbasesrc.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *               2000,2005 Wim Taymans <wim@fluendo.com>
4  *
5  * gstbasesrc.c:
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20  * Boston, MA 02111-1307, USA.
21  */
22
23 /**
24  * SECTION:gstbasesrc
25  * @short_description: Base class for getrange based source elements
26  * @see_also: #GstPushSrc, #GstBaseTransform, #GstBaseSink
27  *
28  * This is a generice base class for source elements. The following
29  * types of sources are supported:
30  * <itemizedlist>
31  *   <listitem><para>random access sources like files</para></listitem>
32  *   <listitem><para>seekable sources</para></listitem>
33  *   <listitem><para>live sources</para></listitem>
34  * </itemizedlist>
35  *
36  * The source can be configured to operate in any #GstFormat with the
37  * gst_base_src_set_format() method. The currently set format determines 
38  * the format of the internal #GstSegment and any #GST_EVENT_NEWSEGMENT 
39  * events. The default format for #GstBaseSrc is #GST_FORMAT_BYTES.
40  *
41  * #GstBaseSrc always supports push mode scheduling. If the following
42  * conditions are met, it also supports pull mode scheduling:
43  * <itemizedlist>
44  *   <listitem><para>The format is set to #GST_FORMAT_BYTES (default).</para>
45  *   </listitem>
46  *   <listitem><para>#GstBaseSrc::is_seekable returns %TRUE.</para>
47  *   </listitem>
48  * </itemizedlist>
49  * 
50  * Since 0.10.9, any #GstBaseSrc can enable pull based scheduling at any 
51  * time by overriding #GstBaseSrc::check_get_range so that it returns %TRUE. 
52  * 
53  * If all the conditions are met for operating in pull mode, #GstBaseSrc is
54  * automatically seekable in push mode as well. The following conditions must 
55  * be met to make the element seekable in push mode when the format is not
56  * #GST_FORMAT_BYTES:
57  * <itemizedlist>
58  *   <listitem><para>
59  *     #GstBaseSrc::is_seekable returns %TRUE.
60  *   </para></listitem>
61  *   <listitem><para>
62  *     #GstBaseSrc::query can convert all supported seek formats to the
63  *     internal format as set with gst_base_src_set_format().
64  *   </para></listitem>
65  *   <listitem><para>
66  *     #GstBaseSrc::do_seek is implemented, performs the seek and returns %TRUE.
67  *   </para></listitem>
68  * </itemizedlist>
69  * 
70  * When the element does not meet the requirements to operate in pull mode,
71  * the offset and length in the #GstBaseSrc::create method should be ignored.
72  * It is recommended to subclass #GstPushSrc instead, in this situation. If the
73  * element can operate in pull mode but only with specific offsets and
74  * lengths, it is allowed to generate an error when the wrong values are passed
75  * to the #GstBaseSrc::create function.
76  * 
77  * #GstBaseSrc has support for live sources. Live sources are sources that when
78  * paused discard data, such as audio or video capture devices. A typical live
79  * source also produces data at a fixed rate and thus provides a clock to publish
80  * this rate.
81  * Use gst_base_src_set_live() to activate the live source mode.
82  * 
83  * A live source does not produce data in the PAUSED state. This means that the 
84  * #GstBaseSrc::create method will not be called in PAUSED but only in PLAYING.
85  * To signal the pipeline that the element will not produce data, the return
86  * value from the READY to PAUSED state will be #GST_STATE_CHANGE_NO_PREROLL.
87  * 
88  * A typical live source will timestamp the buffers it creates with the 
89  * current running time of the pipeline. This is one reason why a live source
90  * can only produce data in the PLAYING state, when the clock is actually 
91  * distributed and running. 
92  * 
93  * Live sources that synchronize and block on the clock (an audio source, for
94  * example) can since 0.10.12 use gst_base_src_wait_playing() when the ::create
95  * function was interrupted by a state change to PAUSED.
96  * 
97  * The #GstBaseSrc::get_times method can be used to implement pseudo-live 
98  * sources.
99  * It only makes sense to implement the ::get_times function if the source is 
100  * a live source. The ::get_times function should return timestamps starting
101  * from 0, as if it were a non-live source. The base class will make sure that
102  * the timestamps are transformed into the current running_time.
103  * The base source will then wait for the calculated running_time before pushing
104  * out the buffer.
105  * 
106  * For live sources, the base class will by default report a latency of 0.
107  * For pseudo live sources, the base class will by default measure the difference
108  * between the first buffer timestamp and the start time of get_times and will
109  * report this value as the latency. 
110  * Subclasses should override the query function when this behaviour is not
111  * acceptable.
112  * 
113  * There is only support in #GstBaseSrc for exactly one source pad, which 
114  * should be named "src". A source implementation (subclass of #GstBaseSrc) 
115  * should install a pad template in its class_init function, like so:
116  * <programlisting>
117  * static void
118  * my_element_class_init (GstMyElementClass *klass)
119  * {
120  *   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
121  *   // srctemplate should be a #GstStaticPadTemplate with direction
122  *   // #GST_PAD_SRC and name "src"
123  *   gst_element_class_add_pad_template (gstelement_class,
124  *       gst_static_pad_template_get (&amp;srctemplate));
125  *   // see #GstElementDetails
126  *   gst_element_class_set_details (gstelement_class, &amp;details);
127  * }
128  * </programlisting>
129  *
130  * <refsect2>
131  * <title>Controlled shutdown of live sources in applications</title>
132  * <para>
133  * Applications that record from a live source may want to stop recording
134  * in a controlled way, so that the recording is stopped, but the data
135  * already in the pipeline is processed to the end (remember that many live
136  * sources would go on recording forever otherwise). For that to happen the
137  * application needs to make the source stop recording and send an EOS
138  * event down the pipeline. The application would then wait for an
139  * EOS message posted on the pipeline's bus to know when all data has
140  * been processed and the pipeline can safely be stopped.
141  * 
142  * Since GStreamer 0.10.16 an application may send an EOS event to a source
143  * element to make it perform the EOS logic (send EOS event downstream or post a
144  * #GST_MESSAGE_SEGMENT_DONE on the bus). This can typically be done
145  * with the gst_element_send_event() function on the element or its parent bin.
146  * 
147  * After the EOS has been sent to the element, the application should wait for
148  * an EOS message to be posted on the pipeline's bus. Once this EOS message is
149  * received, it may safely shut down the entire pipeline.
150  * 
151  * The old behaviour for controlled shutdown introduced since GStreamer 0.10.3
152  * is still available but deprecated as it is dangerous and less flexible.
153  * 
154  * Last reviewed on 2007-12-19 (0.10.16)
155  * </para>
156  * </refsect2>
157  */
158
159 #ifdef HAVE_CONFIG_H
160 #  include "config.h"
161 #endif
162
163 #include <stdlib.h>
164 #include <string.h>
165
166 #include "gstbasesrc.h"
167 #include "gsttypefindhelper.h"
168 #include <gst/gstmarshal.h>
169 #include <gst/gst-i18n-lib.h>
170
171 GST_DEBUG_CATEGORY_STATIC (gst_base_src_debug);
172 #define GST_CAT_DEFAULT gst_base_src_debug
173
174 #define GST_LIVE_GET_LOCK(elem)               (GST_BASE_SRC_CAST(elem)->live_lock)
175 #define GST_LIVE_LOCK(elem)                   g_mutex_lock(GST_LIVE_GET_LOCK(elem))
176 #define GST_LIVE_TRYLOCK(elem)                g_mutex_trylock(GST_LIVE_GET_LOCK(elem))
177 #define GST_LIVE_UNLOCK(elem)                 g_mutex_unlock(GST_LIVE_GET_LOCK(elem))
178 #define GST_LIVE_GET_COND(elem)               (GST_BASE_SRC_CAST(elem)->live_cond)
179 #define GST_LIVE_WAIT(elem)                   g_cond_wait (GST_LIVE_GET_COND (elem), GST_LIVE_GET_LOCK (elem))
180 #define GST_LIVE_TIMED_WAIT(elem, timeval)    g_cond_timed_wait (GST_LIVE_GET_COND (elem), GST_LIVE_GET_LOCK (elem),\
181                                                                                 timeval)
182 #define GST_LIVE_SIGNAL(elem)                 g_cond_signal (GST_LIVE_GET_COND (elem));
183 #define GST_LIVE_BROADCAST(elem)              g_cond_broadcast (GST_LIVE_GET_COND (elem));
184
185 /* BaseSrc signals and args */
186 enum
187 {
188   /* FILL ME */
189   LAST_SIGNAL
190 };
191
192 #define DEFAULT_BLOCKSIZE       4096
193 #define DEFAULT_NUM_BUFFERS     -1
194 #define DEFAULT_TYPEFIND        FALSE
195 #define DEFAULT_DO_TIMESTAMP    FALSE
196
197 enum
198 {
199   PROP_0,
200   PROP_BLOCKSIZE,
201   PROP_NUM_BUFFERS,
202   PROP_TYPEFIND,
203   PROP_DO_TIMESTAMP
204 };
205
206 #define GST_BASE_SRC_GET_PRIVATE(obj)  \
207    (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_BASE_SRC, GstBaseSrcPrivate))
208
209 struct _GstBaseSrcPrivate
210 {
211   gboolean last_sent_eos;       /* last thing we did was send an EOS (we set this
212                                  * to avoid the sending of two EOS in some cases) */
213   gboolean discont;
214   gboolean flushing;
215
216   /* two segments to be sent in the streaming thread with STREAM_LOCK */
217   GstEvent *close_segment;
218   GstEvent *start_segment;
219
220   /* if EOS is pending (atomic) */
221   gint pending_eos;
222
223   /* startup latency is the time it takes between going to PLAYING and producing
224    * the first BUFFER with running_time 0. This value is included in the latency
225    * reporting. */
226   GstClockTime latency;
227   /* timestamp offset, this is the offset add to the values of gst_times for
228    * pseudo live sources */
229   GstClockTimeDiff ts_offset;
230
231   gboolean do_timestamp;
232
233   /* stream sequence number */
234   guint32 seqnum;
235 };
236
237 static GstElementClass *parent_class = NULL;
238
239 static void gst_base_src_base_init (gpointer g_class);
240 static void gst_base_src_class_init (GstBaseSrcClass * klass);
241 static void gst_base_src_init (GstBaseSrc * src, gpointer g_class);
242 static void gst_base_src_finalize (GObject * object);
243
244
245 GType
246 gst_base_src_get_type (void)
247 {
248   static volatile gsize base_src_type = 0;
249
250   if (g_once_init_enter (&base_src_type)) {
251     GType _type;
252     static const GTypeInfo base_src_info = {
253       sizeof (GstBaseSrcClass),
254       (GBaseInitFunc) gst_base_src_base_init,
255       NULL,
256       (GClassInitFunc) gst_base_src_class_init,
257       NULL,
258       NULL,
259       sizeof (GstBaseSrc),
260       0,
261       (GInstanceInitFunc) gst_base_src_init,
262     };
263
264     _type = g_type_register_static (GST_TYPE_ELEMENT,
265         "GstBaseSrc", &base_src_info, G_TYPE_FLAG_ABSTRACT);
266     g_once_init_leave (&base_src_type, _type);
267   }
268   return base_src_type;
269 }
270
271 static GstCaps *gst_base_src_getcaps (GstPad * pad);
272 static gboolean gst_base_src_setcaps (GstPad * pad, GstCaps * caps);
273 static void gst_base_src_fixate (GstPad * pad, GstCaps * caps);
274
275 static gboolean gst_base_src_activate_push (GstPad * pad, gboolean active);
276 static gboolean gst_base_src_activate_pull (GstPad * pad, gboolean active);
277 static void gst_base_src_set_property (GObject * object, guint prop_id,
278     const GValue * value, GParamSpec * pspec);
279 static void gst_base_src_get_property (GObject * object, guint prop_id,
280     GValue * value, GParamSpec * pspec);
281 static gboolean gst_base_src_event_handler (GstPad * pad, GstEvent * event);
282 static gboolean gst_base_src_send_event (GstElement * elem, GstEvent * event);
283 static gboolean gst_base_src_default_event (GstBaseSrc * src, GstEvent * event);
284 static const GstQueryType *gst_base_src_get_query_types (GstElement * element);
285
286 static gboolean gst_base_src_query (GstPad * pad, GstQuery * query);
287
288 static gboolean gst_base_src_default_negotiate (GstBaseSrc * basesrc);
289 static gboolean gst_base_src_default_do_seek (GstBaseSrc * src,
290     GstSegment * segment);
291 static gboolean gst_base_src_default_query (GstBaseSrc * src, GstQuery * query);
292 static gboolean gst_base_src_default_prepare_seek_segment (GstBaseSrc * src,
293     GstEvent * event, GstSegment * segment);
294
295 static gboolean gst_base_src_set_flushing (GstBaseSrc * basesrc,
296     gboolean flushing, gboolean live_play, gboolean unlock, gboolean * playing);
297 static gboolean gst_base_src_start (GstBaseSrc * basesrc);
298 static gboolean gst_base_src_stop (GstBaseSrc * basesrc);
299
300 static GstStateChangeReturn gst_base_src_change_state (GstElement * element,
301     GstStateChange transition);
302
303 static void gst_base_src_loop (GstPad * pad);
304 static gboolean gst_base_src_pad_check_get_range (GstPad * pad);
305 static gboolean gst_base_src_default_check_get_range (GstBaseSrc * bsrc);
306 static GstFlowReturn gst_base_src_pad_get_range (GstPad * pad, guint64 offset,
307     guint length, GstBuffer ** buf);
308 static GstFlowReturn gst_base_src_get_range (GstBaseSrc * src, guint64 offset,
309     guint length, GstBuffer ** buf);
310 static gboolean gst_base_src_seekable (GstBaseSrc * src);
311
312 static void
313 gst_base_src_base_init (gpointer g_class)
314 {
315   GST_DEBUG_CATEGORY_INIT (gst_base_src_debug, "basesrc", 0, "basesrc element");
316 }
317
318 static void
319 gst_base_src_class_init (GstBaseSrcClass * klass)
320 {
321   GObjectClass *gobject_class;
322   GstElementClass *gstelement_class;
323
324   gobject_class = G_OBJECT_CLASS (klass);
325   gstelement_class = GST_ELEMENT_CLASS (klass);
326
327   g_type_class_add_private (klass, sizeof (GstBaseSrcPrivate));
328
329   parent_class = g_type_class_peek_parent (klass);
330
331   gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_base_src_finalize);
332   gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_base_src_set_property);
333   gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_base_src_get_property);
334
335   g_object_class_install_property (gobject_class, PROP_BLOCKSIZE,
336       g_param_spec_ulong ("blocksize", "Block size",
337           "Size in bytes to read per buffer (-1 = default)", 0, G_MAXULONG,
338           DEFAULT_BLOCKSIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
339   g_object_class_install_property (gobject_class, PROP_NUM_BUFFERS,
340       g_param_spec_int ("num-buffers", "num-buffers",
341           "Number of buffers to output before sending EOS (-1 = unlimited)",
342           -1, G_MAXINT, DEFAULT_NUM_BUFFERS, G_PARAM_READWRITE |
343           G_PARAM_STATIC_STRINGS));
344   g_object_class_install_property (gobject_class, PROP_TYPEFIND,
345       g_param_spec_boolean ("typefind", "Typefind",
346           "Run typefind before negotiating", DEFAULT_TYPEFIND,
347           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
348   g_object_class_install_property (gobject_class, PROP_DO_TIMESTAMP,
349       g_param_spec_boolean ("do-timestamp", "Do timestamp",
350           "Apply current stream time to buffers", DEFAULT_DO_TIMESTAMP,
351           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
352
353   gstelement_class->change_state =
354       GST_DEBUG_FUNCPTR (gst_base_src_change_state);
355   gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_base_src_send_event);
356   gstelement_class->get_query_types =
357       GST_DEBUG_FUNCPTR (gst_base_src_get_query_types);
358
359   klass->negotiate = GST_DEBUG_FUNCPTR (gst_base_src_default_negotiate);
360   klass->event = GST_DEBUG_FUNCPTR (gst_base_src_default_event);
361   klass->do_seek = GST_DEBUG_FUNCPTR (gst_base_src_default_do_seek);
362   klass->query = GST_DEBUG_FUNCPTR (gst_base_src_default_query);
363   klass->check_get_range =
364       GST_DEBUG_FUNCPTR (gst_base_src_default_check_get_range);
365   klass->prepare_seek_segment =
366       GST_DEBUG_FUNCPTR (gst_base_src_default_prepare_seek_segment);
367 }
368
369 static void
370 gst_base_src_init (GstBaseSrc * basesrc, gpointer g_class)
371 {
372   GstPad *pad;
373   GstPadTemplate *pad_template;
374
375   basesrc->priv = GST_BASE_SRC_GET_PRIVATE (basesrc);
376
377   basesrc->is_live = FALSE;
378   basesrc->live_lock = g_mutex_new ();
379   basesrc->live_cond = g_cond_new ();
380   basesrc->num_buffers = DEFAULT_NUM_BUFFERS;
381   basesrc->num_buffers_left = -1;
382
383   basesrc->can_activate_push = TRUE;
384   basesrc->pad_mode = GST_ACTIVATE_NONE;
385
386   pad_template =
387       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (g_class), "src");
388   g_return_if_fail (pad_template != NULL);
389
390   GST_DEBUG_OBJECT (basesrc, "creating src pad");
391   pad = gst_pad_new_from_template (pad_template, "src");
392
393   GST_DEBUG_OBJECT (basesrc, "setting functions on src pad");
394   gst_pad_set_activatepush_function (pad,
395       GST_DEBUG_FUNCPTR (gst_base_src_activate_push));
396   gst_pad_set_activatepull_function (pad,
397       GST_DEBUG_FUNCPTR (gst_base_src_activate_pull));
398   gst_pad_set_event_function (pad,
399       GST_DEBUG_FUNCPTR (gst_base_src_event_handler));
400   gst_pad_set_query_function (pad, GST_DEBUG_FUNCPTR (gst_base_src_query));
401   gst_pad_set_checkgetrange_function (pad,
402       GST_DEBUG_FUNCPTR (gst_base_src_pad_check_get_range));
403   gst_pad_set_getrange_function (pad,
404       GST_DEBUG_FUNCPTR (gst_base_src_pad_get_range));
405   gst_pad_set_getcaps_function (pad, GST_DEBUG_FUNCPTR (gst_base_src_getcaps));
406   gst_pad_set_setcaps_function (pad, GST_DEBUG_FUNCPTR (gst_base_src_setcaps));
407   gst_pad_set_fixatecaps_function (pad,
408       GST_DEBUG_FUNCPTR (gst_base_src_fixate));
409
410   /* hold pointer to pad */
411   basesrc->srcpad = pad;
412   GST_DEBUG_OBJECT (basesrc, "adding src pad");
413   gst_element_add_pad (GST_ELEMENT (basesrc), pad);
414
415   basesrc->blocksize = DEFAULT_BLOCKSIZE;
416   basesrc->clock_id = NULL;
417   /* we operate in BYTES by default */
418   gst_base_src_set_format (basesrc, GST_FORMAT_BYTES);
419   basesrc->data.ABI.typefind = DEFAULT_TYPEFIND;
420   basesrc->priv->do_timestamp = DEFAULT_DO_TIMESTAMP;
421
422   GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_STARTED);
423
424   GST_DEBUG_OBJECT (basesrc, "init done");
425 }
426
427 static void
428 gst_base_src_finalize (GObject * object)
429 {
430   GstBaseSrc *basesrc;
431   GstEvent **event_p;
432
433   basesrc = GST_BASE_SRC (object);
434
435   g_mutex_free (basesrc->live_lock);
436   g_cond_free (basesrc->live_cond);
437
438   event_p = &basesrc->data.ABI.pending_seek;
439   gst_event_replace (event_p, NULL);
440
441   G_OBJECT_CLASS (parent_class)->finalize (object);
442 }
443
444 /**
445  * gst_base_src_wait_playing:
446  * @src: the src
447  *
448  * If the #GstBaseSrcClass::create method performs its own synchronisation against
449  * the clock it must unblock when going from PLAYING to the PAUSED state and call
450  * this method before continuing to produce the remaining data.
451  *
452  * This function will block until a state change to PLAYING happens (in which
453  * case this function returns #GST_FLOW_OK) or the processing must be stopped due
454  * to a state change to READY or a FLUSH event (in which case this function
455  * returns #GST_FLOW_WRONG_STATE).
456  *
457  * Since: 0.10.12
458  *
459  * Returns: #GST_FLOW_OK if @src is PLAYING and processing can
460  * continue. Any other return value should be returned from the create vmethod.
461  */
462 GstFlowReturn
463 gst_base_src_wait_playing (GstBaseSrc * src)
464 {
465   g_return_val_if_fail (GST_IS_BASE_SRC (src), GST_FLOW_ERROR);
466
467   /* block until the state changes, or we get a flush, or something */
468   GST_DEBUG_OBJECT (src, "live source waiting for running state");
469   GST_LIVE_WAIT (src);
470   if (src->priv->flushing)
471     goto flushing;
472   GST_DEBUG_OBJECT (src, "live source unlocked");
473
474   return GST_FLOW_OK;
475
476   /* ERRORS */
477 flushing:
478   {
479     GST_DEBUG_OBJECT (src, "we are flushing");
480     return GST_FLOW_WRONG_STATE;
481   }
482 }
483
484 /**
485  * gst_base_src_set_live:
486  * @src: base source instance
487  * @live: new live-mode
488  *
489  * If the element listens to a live source, @live should
490  * be set to %TRUE. 
491  *
492  * A live source will not produce data in the PAUSED state and
493  * will therefore not be able to participate in the PREROLL phase
494  * of a pipeline. To signal this fact to the application and the 
495  * pipeline, the state change return value of the live source will
496  * be GST_STATE_CHANGE_NO_PREROLL.
497  */
498 void
499 gst_base_src_set_live (GstBaseSrc * src, gboolean live)
500 {
501   g_return_if_fail (GST_IS_BASE_SRC (src));
502
503   GST_OBJECT_LOCK (src);
504   src->is_live = live;
505   GST_OBJECT_UNLOCK (src);
506 }
507
508 /**
509  * gst_base_src_is_live:
510  * @src: base source instance
511  *
512  * Check if an element is in live mode.
513  *
514  * Returns: %TRUE if element is in live mode.
515  */
516 gboolean
517 gst_base_src_is_live (GstBaseSrc * src)
518 {
519   gboolean result;
520
521   g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
522
523   GST_OBJECT_LOCK (src);
524   result = src->is_live;
525   GST_OBJECT_UNLOCK (src);
526
527   return result;
528 }
529
530 /**
531  * gst_base_src_set_format:
532  * @src: base source instance
533  * @format: the format to use
534  *
535  * Sets the default format of the source. This will be the format used
536  * for sending NEW_SEGMENT events and for performing seeks.
537  *
538  * If a format of GST_FORMAT_BYTES is set, the element will be able to
539  * operate in pull mode if the #GstBaseSrc::is_seekable returns TRUE.
540  *
541  * Since: 0.10.1
542  */
543 void
544 gst_base_src_set_format (GstBaseSrc * src, GstFormat format)
545 {
546   g_return_if_fail (GST_IS_BASE_SRC (src));
547
548   gst_segment_init (&src->segment, format);
549 }
550
551 /**
552  * gst_base_src_query_latency:
553  * @src: the source
554  * @live: if the source is live
555  * @min_latency: the min latency of the source
556  * @max_latency: the max latency of the source
557  *
558  * Query the source for the latency parameters. @live will be TRUE when @src is
559  * configured as a live source. @min_latency will be set to the difference
560  * between the running time and the timestamp of the first buffer.
561  * @max_latency is always the undefined value of -1.
562  *
563  * This function is mostly used by subclasses. 
564  *
565  * Returns: TRUE if the query succeeded.
566  *
567  * Since: 0.10.13
568  */
569 gboolean
570 gst_base_src_query_latency (GstBaseSrc * src, gboolean * live,
571     GstClockTime * min_latency, GstClockTime * max_latency)
572 {
573   GstClockTime min;
574
575   g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
576
577   GST_OBJECT_LOCK (src);
578   if (live)
579     *live = src->is_live;
580
581   /* if we have a startup latency, report this one, else report 0. Subclasses
582    * are supposed to override the query function if they want something
583    * else. */
584   if (src->priv->latency != -1)
585     min = src->priv->latency;
586   else
587     min = 0;
588
589   if (min_latency)
590     *min_latency = min;
591   if (max_latency)
592     *max_latency = -1;
593
594   GST_LOG_OBJECT (src, "latency: live %d, min %" GST_TIME_FORMAT
595       ", max %" GST_TIME_FORMAT, src->is_live, GST_TIME_ARGS (min),
596       GST_TIME_ARGS (-1));
597   GST_OBJECT_UNLOCK (src);
598
599   return TRUE;
600 }
601
602 /**
603  * gst_base_src_set_blocksize:
604  * @src: the source
605  * @blocksize: the new blocksize in bytes
606  *
607  * Set the number of bytes that @src will push out with each buffer. When
608  * @blocksize is set to -1, a default length will be used.
609  *
610  * Since: 0.10.22
611  */
612 void
613 gst_base_src_set_blocksize (GstBaseSrc * src, gulong blocksize)
614 {
615   g_return_if_fail (GST_IS_BASE_SRC (src));
616
617   GST_OBJECT_LOCK (src);
618   src->blocksize = blocksize;
619   GST_OBJECT_UNLOCK (src);
620 }
621
622 /**
623  * gst_base_src_get_blocksize:
624  * @src: the source
625  *
626  * Get the number of bytes that @src will push out with each buffer.
627  *
628  * Returns: the number of bytes pushed with each buffer.
629  *
630  * Since: 0.10.22
631  */
632 gulong
633 gst_base_src_get_blocksize (GstBaseSrc * src)
634 {
635   gulong res;
636
637   g_return_val_if_fail (GST_IS_BASE_SRC (src), 0);
638
639   GST_OBJECT_LOCK (src);
640   res = src->blocksize;
641   GST_OBJECT_UNLOCK (src);
642
643   return res;
644 }
645
646
647 /**
648  * gst_base_src_set_do_timestamp:
649  * @src: the source
650  * @timestamp: enable or disable timestamping
651  *
652  * Configure @src to automatically timestamp outgoing buffers based on the
653  * current running_time of the pipeline. This property is mostly useful for live
654  * sources.
655  *
656  * Since: 0.10.15
657  */
658 void
659 gst_base_src_set_do_timestamp (GstBaseSrc * src, gboolean timestamp)
660 {
661   g_return_if_fail (GST_IS_BASE_SRC (src));
662
663   GST_OBJECT_LOCK (src);
664   src->priv->do_timestamp = timestamp;
665   GST_OBJECT_UNLOCK (src);
666 }
667
668 /**
669  * gst_base_src_get_do_timestamp:
670  * @src: the source
671  *
672  * Query if @src timestamps outgoing buffers based on the current running_time.
673  *
674  * Returns: %TRUE if the base class will automatically timestamp outgoing buffers.
675  *
676  * Since: 0.10.15
677  */
678 gboolean
679 gst_base_src_get_do_timestamp (GstBaseSrc * src)
680 {
681   gboolean res;
682
683   g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
684
685   GST_OBJECT_LOCK (src);
686   res = src->priv->do_timestamp;
687   GST_OBJECT_UNLOCK (src);
688
689   return res;
690 }
691
692 static gboolean
693 gst_base_src_setcaps (GstPad * pad, GstCaps * caps)
694 {
695   GstBaseSrcClass *bclass;
696   GstBaseSrc *bsrc;
697   gboolean res = TRUE;
698
699   bsrc = GST_BASE_SRC (GST_PAD_PARENT (pad));
700   bclass = GST_BASE_SRC_GET_CLASS (bsrc);
701
702   if (bclass->set_caps)
703     res = bclass->set_caps (bsrc, caps);
704
705   return res;
706 }
707
708 static GstCaps *
709 gst_base_src_getcaps (GstPad * pad)
710 {
711   GstBaseSrcClass *bclass;
712   GstBaseSrc *bsrc;
713   GstCaps *caps = NULL;
714
715   bsrc = GST_BASE_SRC (GST_PAD_PARENT (pad));
716   bclass = GST_BASE_SRC_GET_CLASS (bsrc);
717   if (bclass->get_caps)
718     caps = bclass->get_caps (bsrc);
719
720   if (caps == NULL) {
721     GstPadTemplate *pad_template;
722
723     pad_template =
724         gst_element_class_get_pad_template (GST_ELEMENT_CLASS (bclass), "src");
725     if (pad_template != NULL) {
726       caps = gst_caps_ref (gst_pad_template_get_caps (pad_template));
727     }
728   }
729   return caps;
730 }
731
732 static void
733 gst_base_src_fixate (GstPad * pad, GstCaps * caps)
734 {
735   GstBaseSrcClass *bclass;
736   GstBaseSrc *bsrc;
737
738   bsrc = GST_BASE_SRC (gst_pad_get_parent (pad));
739   bclass = GST_BASE_SRC_GET_CLASS (bsrc);
740
741   if (bclass->fixate)
742     bclass->fixate (bsrc, caps);
743
744   gst_object_unref (bsrc);
745 }
746
747 static gboolean
748 gst_base_src_default_query (GstBaseSrc * src, GstQuery * query)
749 {
750   gboolean res;
751
752   switch (GST_QUERY_TYPE (query)) {
753     case GST_QUERY_POSITION:
754     {
755       GstFormat format;
756
757       gst_query_parse_position (query, &format, NULL);
758       switch (format) {
759         case GST_FORMAT_PERCENT:
760         {
761           gint64 percent;
762           gint64 position;
763           gint64 duration;
764
765           position = src->segment.last_stop;
766           duration = src->segment.duration;
767
768           if (position != -1 && duration != -1) {
769             if (position < duration)
770               percent = gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX, position,
771                   duration);
772             else
773               percent = GST_FORMAT_PERCENT_MAX;
774           } else
775             percent = -1;
776
777           gst_query_set_position (query, GST_FORMAT_PERCENT, percent);
778           res = TRUE;
779           break;
780         }
781         default:
782         {
783           gint64 position;
784
785           position = src->segment.last_stop;
786
787           if (position != -1) {
788             /* convert to requested format */
789             res =
790                 gst_pad_query_convert (src->srcpad, src->segment.format,
791                 position, &format, &position);
792           } else
793             res = TRUE;
794
795           gst_query_set_position (query, format, position);
796           break;
797         }
798       }
799       break;
800     }
801     case GST_QUERY_DURATION:
802     {
803       GstFormat format;
804
805       gst_query_parse_duration (query, &format, NULL);
806
807       GST_DEBUG_OBJECT (src, "duration query in format %s",
808           gst_format_get_name (format));
809
810       switch (format) {
811         case GST_FORMAT_PERCENT:
812           gst_query_set_duration (query, GST_FORMAT_PERCENT,
813               GST_FORMAT_PERCENT_MAX);
814           res = TRUE;
815           break;
816         default:
817         {
818           gint64 duration;
819
820           /* this is the duration as configured by the subclass. */
821           duration = src->segment.duration;
822
823           if (duration != -1) {
824             /* convert to requested format, if this fails, we have a duration
825              * but we cannot answer the query, we must return FALSE. */
826             res =
827                 gst_pad_query_convert (src->srcpad, src->segment.format,
828                 duration, &format, &duration);
829           } else {
830             /* The subclass did not configure a duration, we assume that the
831              * media has an unknown duration then and we return TRUE to report
832              * this. Note that this is not the same as returning FALSE, which
833              * means that we cannot report the duration at all. */
834             res = TRUE;
835           }
836           gst_query_set_duration (query, format, duration);
837           break;
838         }
839       }
840       break;
841     }
842
843     case GST_QUERY_SEEKING:
844     {
845       gst_query_set_seeking (query, src->segment.format,
846           gst_base_src_seekable (src), 0, src->segment.duration);
847       res = TRUE;
848       break;
849     }
850     case GST_QUERY_SEGMENT:
851     {
852       gint64 start, stop;
853
854       /* no end segment configured, current duration then */
855       if ((stop = src->segment.stop) == -1)
856         stop = src->segment.duration;
857       start = src->segment.start;
858
859       /* adjust to stream time */
860       if (src->segment.time != -1) {
861         start -= src->segment.time;
862         if (stop != -1)
863           stop -= src->segment.time;
864       }
865       gst_query_set_segment (query, src->segment.rate, src->segment.format,
866           start, stop);
867       res = TRUE;
868       break;
869     }
870
871     case GST_QUERY_FORMATS:
872     {
873       gst_query_set_formats (query, 3, GST_FORMAT_DEFAULT,
874           GST_FORMAT_BYTES, GST_FORMAT_PERCENT);
875       res = TRUE;
876       break;
877     }
878     case GST_QUERY_CONVERT:
879     {
880       GstFormat src_fmt, dest_fmt;
881       gint64 src_val, dest_val;
882
883       gst_query_parse_convert (query, &src_fmt, &src_val, &dest_fmt, &dest_val);
884
885       /* we can only convert between equal formats... */
886       if (src_fmt == dest_fmt) {
887         dest_val = src_val;
888         res = TRUE;
889       } else
890         res = FALSE;
891
892       gst_query_set_convert (query, src_fmt, src_val, dest_fmt, dest_val);
893       break;
894     }
895     case GST_QUERY_LATENCY:
896     {
897       GstClockTime min, max;
898       gboolean live;
899
900       /* Subclasses should override and implement something usefull */
901       res = gst_base_src_query_latency (src, &live, &min, &max);
902
903       GST_LOG_OBJECT (src, "report latency: live %d, min %" GST_TIME_FORMAT
904           ", max %" GST_TIME_FORMAT, live, GST_TIME_ARGS (min),
905           GST_TIME_ARGS (max));
906
907       gst_query_set_latency (query, live, min, max);
908       break;
909     }
910     case GST_QUERY_JITTER:
911     case GST_QUERY_RATE:
912       res = FALSE;
913       break;
914     case GST_QUERY_BUFFERING:
915     {
916       GstFormat format;
917       gint64 start, stop, estimated;
918
919       gst_query_parse_buffering_range (query, &format, NULL, NULL, NULL);
920
921       GST_DEBUG_OBJECT (src, "buffering query in format %s",
922           gst_format_get_name (format));
923
924       if (src->random_access) {
925         estimated = 0;
926         start = 0;
927         if (format == GST_FORMAT_PERCENT)
928           stop = GST_FORMAT_PERCENT_MAX;
929         else
930           stop = src->segment.duration;
931       } else {
932         estimated = -1;
933         start = -1;
934         stop = -1;
935       }
936       /* convert to required format. When the conversion fails, we can't answer
937        * the query. When the value is unknown, we can don't perform conversion
938        * but report TRUE. */
939       if (format != GST_FORMAT_PERCENT && stop != -1) {
940         res = gst_pad_query_convert (src->srcpad, src->segment.format,
941             stop, &format, &stop);
942       } else {
943         res = TRUE;
944       }
945       if (res && format != GST_FORMAT_PERCENT && start != -1)
946         res = gst_pad_query_convert (src->srcpad, src->segment.format,
947             start, &format, &start);
948
949       gst_query_set_buffering_range (query, format, start, stop, estimated);
950       break;
951     }
952     default:
953       res = FALSE;
954       break;
955   }
956   GST_DEBUG_OBJECT (src, "query %s returns %d", GST_QUERY_TYPE_NAME (query),
957       res);
958   return res;
959 }
960
961 static gboolean
962 gst_base_src_query (GstPad * pad, GstQuery * query)
963 {
964   GstBaseSrc *src;
965   GstBaseSrcClass *bclass;
966   gboolean result = FALSE;
967
968   src = GST_BASE_SRC (gst_pad_get_parent (pad));
969
970   bclass = GST_BASE_SRC_GET_CLASS (src);
971
972   if (bclass->query)
973     result = bclass->query (src, query);
974   else
975     result = gst_pad_query_default (pad, query);
976
977   gst_object_unref (src);
978
979   return result;
980 }
981
982 static gboolean
983 gst_base_src_default_do_seek (GstBaseSrc * src, GstSegment * segment)
984 {
985   gboolean res = TRUE;
986
987   /* update our offset if the start/stop position was updated */
988   if (segment->format == GST_FORMAT_BYTES) {
989     segment->time = segment->start;
990   } else if (segment->start == 0) {
991     /* seek to start, we can implement a default for this. */
992     segment->time = 0;
993   } else {
994     res = FALSE;
995     GST_INFO_OBJECT (src, "Can't do a default seek");
996   }
997
998   return res;
999 }
1000
1001 static gboolean
1002 gst_base_src_do_seek (GstBaseSrc * src, GstSegment * segment)
1003 {
1004   GstBaseSrcClass *bclass;
1005   gboolean result = FALSE;
1006
1007   bclass = GST_BASE_SRC_GET_CLASS (src);
1008
1009   if (bclass->do_seek)
1010     result = bclass->do_seek (src, segment);
1011
1012   return result;
1013 }
1014
1015 #define SEEK_TYPE_IS_RELATIVE(t) (((t) != GST_SEEK_TYPE_NONE) && ((t) != GST_SEEK_TYPE_SET))
1016
1017 static gboolean
1018 gst_base_src_default_prepare_seek_segment (GstBaseSrc * src, GstEvent * event,
1019     GstSegment * segment)
1020 {
1021   /* By default, we try one of 2 things:
1022    *   - For absolute seek positions, convert the requested position to our 
1023    *     configured processing format and place it in the output segment \
1024    *   - For relative seek positions, convert our current (input) values to the
1025    *     seek format, adjust by the relative seek offset and then convert back to
1026    *     the processing format
1027    */
1028   GstSeekType cur_type, stop_type;
1029   gint64 cur, stop;
1030   GstSeekFlags flags;
1031   GstFormat seek_format, dest_format;
1032   gdouble rate;
1033   gboolean update;
1034   gboolean res = TRUE;
1035
1036   gst_event_parse_seek (event, &rate, &seek_format, &flags,
1037       &cur_type, &cur, &stop_type, &stop);
1038   dest_format = segment->format;
1039
1040   if (seek_format == dest_format) {
1041     gst_segment_set_seek (segment, rate, seek_format, flags,
1042         cur_type, cur, stop_type, stop, &update);
1043     return TRUE;
1044   }
1045
1046   if (cur_type != GST_SEEK_TYPE_NONE) {
1047     /* FIXME: Handle seek_cur & seek_end by converting the input segment vals */
1048     res =
1049         gst_pad_query_convert (src->srcpad, seek_format, cur, &dest_format,
1050         &cur);
1051     cur_type = GST_SEEK_TYPE_SET;
1052   }
1053
1054   if (res && stop_type != GST_SEEK_TYPE_NONE) {
1055     /* FIXME: Handle seek_cur & seek_end by converting the input segment vals */
1056     res =
1057         gst_pad_query_convert (src->srcpad, seek_format, stop, &dest_format,
1058         &stop);
1059     stop_type = GST_SEEK_TYPE_SET;
1060   }
1061
1062   /* And finally, configure our output segment in the desired format */
1063   gst_segment_set_seek (segment, rate, dest_format, flags, cur_type, cur,
1064       stop_type, stop, &update);
1065
1066   if (!res)
1067     goto no_format;
1068
1069   return res;
1070
1071 no_format:
1072   {
1073     GST_DEBUG_OBJECT (src, "undefined format given, seek aborted.");
1074     return FALSE;
1075   }
1076 }
1077
1078 static gboolean
1079 gst_base_src_prepare_seek_segment (GstBaseSrc * src, GstEvent * event,
1080     GstSegment * seeksegment)
1081 {
1082   GstBaseSrcClass *bclass;
1083   gboolean result = FALSE;
1084
1085   bclass = GST_BASE_SRC_GET_CLASS (src);
1086
1087   if (bclass->prepare_seek_segment)
1088     result = bclass->prepare_seek_segment (src, event, seeksegment);
1089
1090   return result;
1091 }
1092
1093 /* this code implements the seeking. It is a good example
1094  * handling all cases.
1095  *
1096  * A seek updates the currently configured segment.start
1097  * and segment.stop values based on the SEEK_TYPE. If the
1098  * segment.start value is updated, a seek to this new position
1099  * should be performed.
1100  *
1101  * The seek can only be executed when we are not currently
1102  * streaming any data, to make sure that this is the case, we
1103  * acquire the STREAM_LOCK which is taken when we are in the
1104  * _loop() function or when a getrange() is called. Normally
1105  * we will not receive a seek if we are operating in pull mode
1106  * though. When we operate as a live source we might block on the live
1107  * cond, which does not release the STREAM_LOCK. Therefore we will try
1108  * to grab the LIVE_LOCK instead of the STREAM_LOCK to make sure it is
1109  * safe to perform the seek.
1110  *
1111  * When we are in the loop() function, we might be in the middle
1112  * of pushing a buffer, which might block in a sink. To make sure
1113  * that the push gets unblocked we push out a FLUSH_START event.
1114  * Our loop function will get a WRONG_STATE return value from
1115  * the push and will pause, effectively releasing the STREAM_LOCK.
1116  *
1117  * For a non-flushing seek, we pause the task, which might eventually
1118  * release the STREAM_LOCK. We say eventually because when the sink
1119  * blocks on the sample we might wait a very long time until the sink
1120  * unblocks the sample. In any case we acquire the STREAM_LOCK and
1121  * can continue the seek. A non-flushing seek is normally done in a 
1122  * running pipeline to perform seamless playback, this means that the sink is
1123  * PLAYING and will return from its chain function.
1124  * In the case of a non-flushing seek we need to make sure that the
1125  * data we output after the seek is continuous with the previous data,
1126  * this is because a non-flushing seek does not reset the running-time
1127  * to 0. We do this by closing the currently running segment, ie. sending
1128  * a new_segment event with the stop position set to the last processed 
1129  * position.
1130  *
1131  * After updating the segment.start/stop values, we prepare for
1132  * streaming again. We push out a FLUSH_STOP to make the peer pad
1133  * accept data again and we start our task again.
1134  *
1135  * A segment seek posts a message on the bus saying that the playback
1136  * of the segment started. We store the segment flag internally because
1137  * when we reach the segment.stop we have to post a segment.done
1138  * instead of EOS when doing a segment seek.
1139  */
1140 /* FIXME (0.11), we have the unlock gboolean here because most current 
1141  * implementations (fdsrc, -base/gst/tcp/, ...) unconditionally unlock, even when
1142  * the streaming thread isn't running, resulting in bogus unlocks later when it 
1143  * starts. This is fixed by adding unlock_stop, but we should still avoid unlocking
1144  * unnecessarily for backwards compatibility. Ergo, the unlock variable stays
1145  * until 0.11
1146  */
1147 static gboolean
1148 gst_base_src_perform_seek (GstBaseSrc * src, GstEvent * event, gboolean unlock)
1149 {
1150   gboolean res = TRUE, tres;
1151   gdouble rate;
1152   GstFormat seek_format, dest_format;
1153   GstSeekFlags flags;
1154   GstSeekType cur_type, stop_type;
1155   gint64 cur, stop;
1156   gboolean flush, playing;
1157   gboolean update;
1158   gboolean relative_seek = FALSE;
1159   gboolean seekseg_configured = FALSE;
1160   GstSegment seeksegment;
1161   guint32 seqnum;
1162   GstEvent *tevent;
1163
1164   GST_DEBUG_OBJECT (src, "doing seek");
1165
1166   dest_format = src->segment.format;
1167
1168   if (event) {
1169     gst_event_parse_seek (event, &rate, &seek_format, &flags,
1170         &cur_type, &cur, &stop_type, &stop);
1171
1172     relative_seek = SEEK_TYPE_IS_RELATIVE (cur_type) ||
1173         SEEK_TYPE_IS_RELATIVE (stop_type);
1174
1175     if (dest_format != seek_format && !relative_seek) {
1176       /* If we have an ABSOLUTE position (SEEK_SET only), we can convert it
1177        * here before taking the stream lock, otherwise we must convert it later,
1178        * once we have the stream lock and can read the last configures segment
1179        * start and stop positions */
1180       gst_segment_init (&seeksegment, dest_format);
1181
1182       if (!gst_base_src_prepare_seek_segment (src, event, &seeksegment))
1183         goto prepare_failed;
1184
1185       seekseg_configured = TRUE;
1186     }
1187
1188     flush = flags & GST_SEEK_FLAG_FLUSH;
1189     seqnum = gst_event_get_seqnum (event);
1190   } else {
1191     flush = FALSE;
1192     /* get next seqnum */
1193     seqnum = gst_util_seqnum_next ();
1194   }
1195
1196   /* send flush start */
1197   if (flush) {
1198     tevent = gst_event_new_flush_start ();
1199     gst_event_set_seqnum (tevent, seqnum);
1200     gst_pad_push_event (src->srcpad, tevent);
1201   } else
1202     gst_pad_pause_task (src->srcpad);
1203
1204   /* unblock streaming thread. */
1205   gst_base_src_set_flushing (src, TRUE, FALSE, unlock, &playing);
1206
1207   /* grab streaming lock, this should eventually be possible, either
1208    * because the task is paused, our streaming thread stopped 
1209    * or because our peer is flushing. */
1210   GST_PAD_STREAM_LOCK (src->srcpad);
1211   if (G_UNLIKELY (src->priv->seqnum == seqnum)) {
1212     /* we have seen this event before, issue a warning for now */
1213     GST_WARNING_OBJECT (src, "duplicate event found %" G_GUINT32_FORMAT,
1214         seqnum);
1215   } else {
1216     src->priv->seqnum = seqnum;
1217     GST_DEBUG_OBJECT (src, "seek with seqnum %" G_GUINT32_FORMAT, seqnum);
1218   }
1219
1220   gst_base_src_set_flushing (src, FALSE, playing, unlock, NULL);
1221
1222   /* If we configured the seeksegment above, don't overwrite it now. Otherwise
1223    * copy the current segment info into the temp segment that we can actually
1224    * attempt the seek with. We only update the real segment if the seek suceeds. */
1225   if (!seekseg_configured) {
1226     memcpy (&seeksegment, &src->segment, sizeof (GstSegment));
1227
1228     /* now configure the final seek segment */
1229     if (event) {
1230       if (src->segment.format != seek_format) {
1231         /* OK, here's where we give the subclass a chance to convert the relative
1232          * seek into an absolute one in the processing format. We set up any
1233          * absolute seek above, before taking the stream lock. */
1234         if (!gst_base_src_prepare_seek_segment (src, event, &seeksegment)) {
1235           GST_DEBUG_OBJECT (src, "Preparing the seek failed after flushing. "
1236               "Aborting seek");
1237           res = FALSE;
1238         }
1239       } else {
1240         /* The seek format matches our processing format, no need to ask the
1241          * the subclass to configure the segment. */
1242         gst_segment_set_seek (&seeksegment, rate, seek_format, flags,
1243             cur_type, cur, stop_type, stop, &update);
1244       }
1245     }
1246     /* Else, no seek event passed, so we're just (re)starting the 
1247        current segment. */
1248   }
1249
1250   if (res) {
1251     GST_DEBUG_OBJECT (src, "segment configured from %" G_GINT64_FORMAT
1252         " to %" G_GINT64_FORMAT ", position %" G_GINT64_FORMAT,
1253         seeksegment.start, seeksegment.stop, seeksegment.last_stop);
1254
1255     /* do the seek, segment.last_stop contains the new position. */
1256     res = gst_base_src_do_seek (src, &seeksegment);
1257   }
1258
1259   /* and prepare to continue streaming */
1260   if (flush) {
1261     tevent = gst_event_new_flush_stop ();
1262     gst_event_set_seqnum (tevent, seqnum);
1263     /* send flush stop, peer will accept data and events again. We
1264      * are not yet providing data as we still have the STREAM_LOCK. */
1265     gst_pad_push_event (src->srcpad, tevent);
1266   } else if (res && src->data.ABI.running) {
1267     /* we are running the current segment and doing a non-flushing seek, 
1268      * close the segment first based on the last_stop. */
1269     GST_DEBUG_OBJECT (src, "closing running segment %" G_GINT64_FORMAT
1270         " to %" G_GINT64_FORMAT, src->segment.start, src->segment.last_stop);
1271
1272     /* queue the segment for sending in the stream thread */
1273     if (src->priv->close_segment)
1274       gst_event_unref (src->priv->close_segment);
1275     src->priv->close_segment =
1276         gst_event_new_new_segment_full (TRUE,
1277         src->segment.rate, src->segment.applied_rate, src->segment.format,
1278         src->segment.start, src->segment.last_stop, src->segment.time);
1279     gst_event_set_seqnum (src->priv->close_segment, seqnum);
1280   }
1281
1282   /* The subclass must have converted the segment to the processing format 
1283    * by now */
1284   if (res && seeksegment.format != dest_format) {
1285     GST_DEBUG_OBJECT (src, "Subclass failed to prepare a seek segment "
1286         "in the correct format. Aborting seek.");
1287     res = FALSE;
1288   }
1289
1290   /* if successfull seek, we update our real segment and push
1291    * out the new segment. */
1292   if (res) {
1293     memcpy (&src->segment, &seeksegment, sizeof (GstSegment));
1294
1295     if (src->segment.flags & GST_SEEK_FLAG_SEGMENT) {
1296       GstMessage *message;
1297
1298       message = gst_message_new_segment_start (GST_OBJECT (src),
1299           src->segment.format, src->segment.last_stop);
1300       gst_message_set_seqnum (message, seqnum);
1301
1302       gst_element_post_message (GST_ELEMENT (src), message);
1303     }
1304
1305     /* for deriving a stop position for the playback segment from the seek
1306      * segment, we must take the duration when the stop is not set */
1307     if ((stop = src->segment.stop) == -1)
1308       stop = src->segment.duration;
1309
1310     GST_DEBUG_OBJECT (src, "Sending newsegment from %" G_GINT64_FORMAT
1311         " to %" G_GINT64_FORMAT, src->segment.start, stop);
1312
1313     /* now replace the old segment so that we send it in the stream thread the
1314      * next time it is scheduled. */
1315     if (src->priv->start_segment)
1316       gst_event_unref (src->priv->start_segment);
1317     if (src->segment.rate >= 0.0) {
1318       /* forward, we send data from last_stop to stop */
1319       src->priv->start_segment =
1320           gst_event_new_new_segment_full (FALSE,
1321           src->segment.rate, src->segment.applied_rate, src->segment.format,
1322           src->segment.last_stop, stop, src->segment.time);
1323     } else {
1324       /* reverse, we send data from last_stop to start */
1325       src->priv->start_segment =
1326           gst_event_new_new_segment_full (FALSE,
1327           src->segment.rate, src->segment.applied_rate, src->segment.format,
1328           src->segment.start, src->segment.last_stop, src->segment.time);
1329     }
1330     gst_event_set_seqnum (src->priv->start_segment, seqnum);
1331   }
1332
1333   src->priv->discont = TRUE;
1334   src->data.ABI.running = TRUE;
1335   /* and restart the task in case it got paused explicitely or by
1336    * the FLUSH_START event we pushed out. */
1337   tres = gst_pad_start_task (src->srcpad, (GstTaskFunction) gst_base_src_loop,
1338       src->srcpad);
1339   if (res && !tres)
1340     res = FALSE;
1341
1342   /* and release the lock again so we can continue streaming */
1343   GST_PAD_STREAM_UNLOCK (src->srcpad);
1344
1345   return res;
1346
1347   /* ERROR */
1348 prepare_failed:
1349   GST_DEBUG_OBJECT (src, "Preparing the seek failed before flushing. "
1350       "Aborting seek");
1351   return FALSE;
1352 }
1353
1354 static const GstQueryType *
1355 gst_base_src_get_query_types (GstElement * element)
1356 {
1357   static const GstQueryType query_types[] = {
1358     GST_QUERY_DURATION,
1359     GST_QUERY_POSITION,
1360     GST_QUERY_SEEKING,
1361     GST_QUERY_SEGMENT,
1362     GST_QUERY_FORMATS,
1363     GST_QUERY_LATENCY,
1364     GST_QUERY_JITTER,
1365     GST_QUERY_RATE,
1366     GST_QUERY_CONVERT,
1367     0
1368   };
1369
1370   return query_types;
1371 }
1372
1373 /* all events send to this element directly. This is mainly done from the
1374  * application.
1375  */
1376 static gboolean
1377 gst_base_src_send_event (GstElement * element, GstEvent * event)
1378 {
1379   GstBaseSrc *src;
1380   gboolean result = FALSE;
1381
1382   src = GST_BASE_SRC (element);
1383
1384   GST_DEBUG_OBJECT (src, "reveived %s event", GST_EVENT_TYPE_NAME (event));
1385
1386   switch (GST_EVENT_TYPE (event)) {
1387       /* bidirectional events */
1388     case GST_EVENT_FLUSH_START:
1389     case GST_EVENT_FLUSH_STOP:
1390       /* sending random flushes downstream can break stuff,
1391        * especially sync since all segment info will get flushed */
1392       break;
1393
1394       /* downstream serialized events */
1395     case GST_EVENT_EOS:
1396     {
1397       GstBaseSrcClass *bclass;
1398
1399       bclass = GST_BASE_SRC_GET_CLASS (src);
1400
1401       /* queue EOS and make sure the task or pull function performs the EOS
1402        * actions. 
1403        *
1404        * We have two possibilities:
1405        *
1406        *  - Before we are to enter the _create function, we check the pending_eos
1407        *    first and do EOS instead of entering it.
1408        *  - If we are in the _create function or we did not manage to set the
1409        *    flag fast enough and we are about to enter the _create function,
1410        *    we unlock it so that we exit with WRONG_STATE immediatly. We then
1411        *    check the EOS flag and do the EOS logic.
1412        */
1413       g_atomic_int_set (&src->priv->pending_eos, TRUE);
1414       GST_DEBUG_OBJECT (src, "EOS marked, calling unlock");
1415
1416       /* unlock the _create function so that we can check the pending_eos flag
1417        * and we can do EOS. This will eventually release the LIVE_LOCK again so
1418        * that we can grab it and stop the unlock again. We don't take the stream
1419        * lock so that this operation is guaranteed to never block. */
1420       if (bclass->unlock)
1421         bclass->unlock (src);
1422
1423       GST_DEBUG_OBJECT (src, "unlock called, waiting for LIVE_LOCK");
1424
1425       GST_LIVE_LOCK (src);
1426       GST_DEBUG_OBJECT (src, "LIVE_LOCK acquired, calling unlock_stop");
1427       /* now stop the unlock of the streaming thread again. Grabbing the live
1428        * lock is enough because that protects the create function. */
1429       if (bclass->unlock_stop)
1430         bclass->unlock_stop (src);
1431       GST_LIVE_UNLOCK (src);
1432
1433       result = TRUE;
1434       break;
1435     }
1436     case GST_EVENT_NEWSEGMENT:
1437       /* sending random NEWSEGMENT downstream can break sync. */
1438       break;
1439     case GST_EVENT_TAG:
1440       /* sending tags could be useful, FIXME insert in dataflow */
1441       break;
1442     case GST_EVENT_BUFFERSIZE:
1443       /* does not seem to make much sense currently */
1444       break;
1445
1446       /* upstream events */
1447     case GST_EVENT_QOS:
1448       /* elements should override send_event and do something */
1449       break;
1450     case GST_EVENT_SEEK:
1451     {
1452       gboolean started;
1453
1454       GST_OBJECT_LOCK (src->srcpad);
1455       if (GST_PAD_ACTIVATE_MODE (src->srcpad) == GST_ACTIVATE_PULL)
1456         goto wrong_mode;
1457       started = GST_PAD_ACTIVATE_MODE (src->srcpad) == GST_ACTIVATE_PUSH;
1458       GST_OBJECT_UNLOCK (src->srcpad);
1459
1460       if (started) {
1461         /* when we are running in push mode, we can execute the
1462          * seek right now, we need to unlock. */
1463         result = gst_base_src_perform_seek (src, event, TRUE);
1464       } else {
1465         GstEvent **event_p;
1466
1467         /* else we store the event and execute the seek when we
1468          * get activated */
1469         GST_OBJECT_LOCK (src);
1470         event_p = &src->data.ABI.pending_seek;
1471         gst_event_replace ((GstEvent **) event_p, event);
1472         GST_OBJECT_UNLOCK (src);
1473         /* assume the seek will work */
1474         result = TRUE;
1475       }
1476       break;
1477     }
1478     case GST_EVENT_NAVIGATION:
1479       /* could make sense for elements that do something with navigation events
1480        * but then they would need to override the send_event function */
1481       break;
1482     case GST_EVENT_LATENCY:
1483       /* does not seem to make sense currently */
1484       break;
1485
1486       /* custom events */
1487     case GST_EVENT_CUSTOM_UPSTREAM:
1488       /* override send_event if you want this */
1489       break;
1490     case GST_EVENT_CUSTOM_DOWNSTREAM:
1491     case GST_EVENT_CUSTOM_BOTH:
1492       /* FIXME, insert event in the dataflow */
1493       break;
1494     case GST_EVENT_CUSTOM_DOWNSTREAM_OOB:
1495     case GST_EVENT_CUSTOM_BOTH_OOB:
1496       /* insert a random custom event into the pipeline */
1497       GST_DEBUG_OBJECT (src, "pushing custom OOB event downstream");
1498       result = gst_pad_push_event (src->srcpad, event);
1499       /* we gave away the ref to the event in the push */
1500       event = NULL;
1501       break;
1502     default:
1503       break;
1504   }
1505 done:
1506   /* if we still have a ref to the event, unref it now */
1507   if (event)
1508     gst_event_unref (event);
1509
1510   return result;
1511
1512   /* ERRORS */
1513 wrong_mode:
1514   {
1515     GST_DEBUG_OBJECT (src, "cannot perform seek when operating in pull mode");
1516     GST_OBJECT_UNLOCK (src->srcpad);
1517     result = FALSE;
1518     goto done;
1519   }
1520 }
1521
1522 static gboolean
1523 gst_base_src_seekable (GstBaseSrc * src)
1524 {
1525   GstBaseSrcClass *bclass;
1526   bclass = GST_BASE_SRC_GET_CLASS (src);
1527   if (bclass->is_seekable)
1528     return bclass->is_seekable (src);
1529   else
1530     return FALSE;
1531 }
1532
1533 static gboolean
1534 gst_base_src_default_event (GstBaseSrc * src, GstEvent * event)
1535 {
1536   gboolean result;
1537
1538   switch (GST_EVENT_TYPE (event)) {
1539     case GST_EVENT_SEEK:
1540       /* is normally called when in push mode */
1541       if (!gst_base_src_seekable (src))
1542         goto not_seekable;
1543
1544       result = gst_base_src_perform_seek (src, event, TRUE);
1545       break;
1546     case GST_EVENT_FLUSH_START:
1547       /* cancel any blocking getrange, is normally called
1548        * when in pull mode. */
1549       result = gst_base_src_set_flushing (src, TRUE, FALSE, TRUE, NULL);
1550       break;
1551     case GST_EVENT_FLUSH_STOP:
1552       result = gst_base_src_set_flushing (src, FALSE, TRUE, TRUE, NULL);
1553       break;
1554     default:
1555       result = TRUE;
1556       break;
1557   }
1558   return result;
1559
1560   /* ERRORS */
1561 not_seekable:
1562   {
1563     GST_DEBUG_OBJECT (src, "is not seekable");
1564     return FALSE;
1565   }
1566 }
1567
1568 static gboolean
1569 gst_base_src_event_handler (GstPad * pad, GstEvent * event)
1570 {
1571   GstBaseSrc *src;
1572   GstBaseSrcClass *bclass;
1573   gboolean result = FALSE;
1574
1575   src = GST_BASE_SRC (gst_pad_get_parent (pad));
1576   bclass = GST_BASE_SRC_GET_CLASS (src);
1577
1578   if (bclass->event) {
1579     if (!(result = bclass->event (src, event)))
1580       goto subclass_failed;
1581   }
1582
1583 done:
1584   gst_event_unref (event);
1585   gst_object_unref (src);
1586
1587   return result;
1588
1589   /* ERRORS */
1590 subclass_failed:
1591   {
1592     GST_DEBUG_OBJECT (src, "subclass refused event");
1593     goto done;
1594   }
1595 }
1596
1597 static void
1598 gst_base_src_set_property (GObject * object, guint prop_id,
1599     const GValue * value, GParamSpec * pspec)
1600 {
1601   GstBaseSrc *src;
1602
1603   src = GST_BASE_SRC (object);
1604
1605   switch (prop_id) {
1606     case PROP_BLOCKSIZE:
1607       gst_base_src_set_blocksize (src, g_value_get_ulong (value));
1608       break;
1609     case PROP_NUM_BUFFERS:
1610       src->num_buffers = g_value_get_int (value);
1611       break;
1612     case PROP_TYPEFIND:
1613       src->data.ABI.typefind = g_value_get_boolean (value);
1614       break;
1615     case PROP_DO_TIMESTAMP:
1616       gst_base_src_set_do_timestamp (src, g_value_get_boolean (value));
1617       break;
1618     default:
1619       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1620       break;
1621   }
1622 }
1623
1624 static void
1625 gst_base_src_get_property (GObject * object, guint prop_id, GValue * value,
1626     GParamSpec * pspec)
1627 {
1628   GstBaseSrc *src;
1629
1630   src = GST_BASE_SRC (object);
1631
1632   switch (prop_id) {
1633     case PROP_BLOCKSIZE:
1634       g_value_set_ulong (value, gst_base_src_get_blocksize (src));
1635       break;
1636     case PROP_NUM_BUFFERS:
1637       g_value_set_int (value, src->num_buffers);
1638       break;
1639     case PROP_TYPEFIND:
1640       g_value_set_boolean (value, src->data.ABI.typefind);
1641       break;
1642     case PROP_DO_TIMESTAMP:
1643       g_value_set_boolean (value, gst_base_src_get_do_timestamp (src));
1644       break;
1645     default:
1646       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1647       break;
1648   }
1649 }
1650
1651 /* with STREAM_LOCK and LOCK */
1652 static GstClockReturn
1653 gst_base_src_wait (GstBaseSrc * basesrc, GstClock * clock, GstClockTime time)
1654 {
1655   GstClockReturn ret;
1656   GstClockID id;
1657
1658   id = gst_clock_new_single_shot_id (clock, time);
1659
1660   basesrc->clock_id = id;
1661   /* release the live lock while waiting */
1662   GST_LIVE_UNLOCK (basesrc);
1663
1664   ret = gst_clock_id_wait (id, NULL);
1665
1666   GST_LIVE_LOCK (basesrc);
1667   gst_clock_id_unref (id);
1668   basesrc->clock_id = NULL;
1669
1670   return ret;
1671 }
1672
1673 /* perform synchronisation on a buffer. 
1674  * with STREAM_LOCK.
1675  */
1676 static GstClockReturn
1677 gst_base_src_do_sync (GstBaseSrc * basesrc, GstBuffer * buffer)
1678 {
1679   GstClockReturn result;
1680   GstClockTime start, end;
1681   GstBaseSrcClass *bclass;
1682   GstClockTime base_time;
1683   GstClock *clock;
1684   GstClockTime now = GST_CLOCK_TIME_NONE, timestamp;
1685   gboolean do_timestamp, first, pseudo_live;
1686
1687   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
1688
1689   start = end = -1;
1690   if (bclass->get_times)
1691     bclass->get_times (basesrc, buffer, &start, &end);
1692
1693   /* get buffer timestamp */
1694   timestamp = GST_BUFFER_TIMESTAMP (buffer);
1695
1696   /* grab the lock to prepare for clocking and calculate the startup 
1697    * latency. */
1698   GST_OBJECT_LOCK (basesrc);
1699
1700   /* if we are asked to sync against the clock we are a pseudo live element */
1701   pseudo_live = (start != -1 && basesrc->is_live);
1702   /* check for the first buffer */
1703   first = (basesrc->priv->latency == -1);
1704
1705   if (timestamp != -1 && pseudo_live) {
1706     GstClockTime latency;
1707
1708     /* we have a timestamp and a sync time, latency is the diff */
1709     if (timestamp <= start)
1710       latency = start - timestamp;
1711     else
1712       latency = 0;
1713
1714     if (first) {
1715       GST_DEBUG_OBJECT (basesrc, "pseudo_live with latency %" GST_TIME_FORMAT,
1716           GST_TIME_ARGS (latency));
1717       /* first time we calculate latency, just configure */
1718       basesrc->priv->latency = latency;
1719     } else {
1720       if (basesrc->priv->latency != latency) {
1721         /* we have a new latency, FIXME post latency message */
1722         basesrc->priv->latency = latency;
1723         GST_DEBUG_OBJECT (basesrc, "latency changed to %" GST_TIME_FORMAT,
1724             GST_TIME_ARGS (latency));
1725       }
1726     }
1727   } else if (first) {
1728     GST_DEBUG_OBJECT (basesrc, "no latency needed, live %d, sync %d",
1729         basesrc->is_live, start != -1);
1730     basesrc->priv->latency = 0;
1731   }
1732
1733   /* get clock, if no clock, we can't sync or do timestamps */
1734   if ((clock = GST_ELEMENT_CLOCK (basesrc)) == NULL)
1735     goto no_clock;
1736
1737   base_time = GST_ELEMENT_CAST (basesrc)->base_time;
1738
1739   do_timestamp = basesrc->priv->do_timestamp;
1740
1741   /* first buffer, calculate the timestamp offset */
1742   if (first) {
1743     GstClockTime running_time;
1744
1745     now = gst_clock_get_time (clock);
1746     running_time = now - base_time;
1747
1748     GST_LOG_OBJECT (basesrc,
1749         "startup timestamp: %" GST_TIME_FORMAT ", running_time %"
1750         GST_TIME_FORMAT, GST_TIME_ARGS (timestamp),
1751         GST_TIME_ARGS (running_time));
1752
1753     if (pseudo_live && timestamp != -1) {
1754       /* live source and we need to sync, add startup latency to all timestamps
1755        * to get the real running_time. Live sources should always timestamp
1756        * according to the current running time. */
1757       basesrc->priv->ts_offset = GST_CLOCK_DIFF (timestamp, running_time);
1758
1759       GST_LOG_OBJECT (basesrc, "live with sync, ts_offset %" GST_TIME_FORMAT,
1760           GST_TIME_ARGS (basesrc->priv->ts_offset));
1761     } else {
1762       basesrc->priv->ts_offset = 0;
1763       GST_LOG_OBJECT (basesrc, "no timestamp offset needed");
1764     }
1765
1766     if (!GST_CLOCK_TIME_IS_VALID (timestamp)) {
1767       if (do_timestamp)
1768         timestamp = running_time;
1769       else
1770         timestamp = 0;
1771
1772       GST_BUFFER_TIMESTAMP (buffer) = timestamp;
1773
1774       GST_LOG_OBJECT (basesrc, "created timestamp: %" GST_TIME_FORMAT,
1775           GST_TIME_ARGS (timestamp));
1776     }
1777
1778     /* add the timestamp offset we need for sync */
1779     timestamp += basesrc->priv->ts_offset;
1780   } else {
1781     /* not the first buffer, the timestamp is the diff between the clock and
1782      * base_time */
1783     if (do_timestamp && !GST_CLOCK_TIME_IS_VALID (timestamp)) {
1784       now = gst_clock_get_time (clock);
1785
1786       GST_BUFFER_TIMESTAMP (buffer) = now - base_time;
1787
1788       GST_LOG_OBJECT (basesrc, "created timestamp: %" GST_TIME_FORMAT,
1789           GST_TIME_ARGS (now - base_time));
1790     }
1791   }
1792
1793   /* if we don't have a buffer timestamp, we don't sync */
1794   if (!GST_CLOCK_TIME_IS_VALID (start))
1795     goto no_sync;
1796
1797   if (basesrc->is_live && GST_CLOCK_TIME_IS_VALID (timestamp)) {
1798     /* for pseudo live sources, add our ts_offset to the timestamp */
1799     GST_BUFFER_TIMESTAMP (buffer) += basesrc->priv->ts_offset;
1800     start += basesrc->priv->ts_offset;
1801   }
1802
1803   GST_LOG_OBJECT (basesrc,
1804       "waiting for clock, base time %" GST_TIME_FORMAT
1805       ", stream_start %" GST_TIME_FORMAT,
1806       GST_TIME_ARGS (base_time), GST_TIME_ARGS (start));
1807   GST_OBJECT_UNLOCK (basesrc);
1808
1809   result = gst_base_src_wait (basesrc, clock, start + base_time);
1810
1811   GST_LOG_OBJECT (basesrc, "clock entry done: %d", result);
1812
1813   return result;
1814
1815   /* special cases */
1816 no_clock:
1817   {
1818     GST_DEBUG_OBJECT (basesrc, "we have no clock");
1819     GST_OBJECT_UNLOCK (basesrc);
1820     return GST_CLOCK_OK;
1821   }
1822 no_sync:
1823   {
1824     GST_DEBUG_OBJECT (basesrc, "no sync needed");
1825     GST_OBJECT_UNLOCK (basesrc);
1826     return GST_CLOCK_OK;
1827   }
1828 }
1829
1830 static gboolean
1831 gst_base_src_update_length (GstBaseSrc * src, guint64 offset, guint * length)
1832 {
1833   guint64 size, maxsize;
1834   GstBaseSrcClass *bclass;
1835
1836   bclass = GST_BASE_SRC_GET_CLASS (src);
1837
1838   /* only operate if we are working with bytes */
1839   if (src->segment.format != GST_FORMAT_BYTES)
1840     return TRUE;
1841
1842   /* get total file size */
1843   size = (guint64) src->segment.duration;
1844
1845   /* the max amount of bytes to read is the total size or
1846    * up to the segment.stop if present. */
1847   if (src->segment.stop != -1)
1848     maxsize = MIN (size, src->segment.stop);
1849   else
1850     maxsize = size;
1851
1852   GST_DEBUG_OBJECT (src,
1853       "reading offset %" G_GUINT64_FORMAT ", length %u, size %" G_GINT64_FORMAT
1854       ", segment.stop %" G_GINT64_FORMAT ", maxsize %" G_GINT64_FORMAT, offset,
1855       *length, size, src->segment.stop, maxsize);
1856
1857   /* check size if we have one */
1858   if (maxsize != -1) {
1859     /* if we run past the end, check if the file became bigger and 
1860      * retry. */
1861     if (G_UNLIKELY (offset + *length >= maxsize)) {
1862       /* see if length of the file changed */
1863       if (bclass->get_size)
1864         if (!bclass->get_size (src, &size))
1865           size = -1;
1866
1867       gst_segment_set_duration (&src->segment, GST_FORMAT_BYTES, size);
1868
1869       /* make sure we don't exceed the configured segment stop
1870        * if it was set */
1871       if (src->segment.stop != -1)
1872         maxsize = MIN (size, src->segment.stop);
1873       else
1874         maxsize = size;
1875
1876       /* if we are at or past the end, EOS */
1877       if (G_UNLIKELY (offset >= maxsize))
1878         goto unexpected_length;
1879
1880       /* else we can clip to the end */
1881       if (G_UNLIKELY (offset + *length >= maxsize))
1882         *length = maxsize - offset;
1883
1884     }
1885   }
1886
1887   /* keep track of current position. segment is in bytes, we checked 
1888    * that above. */
1889   gst_segment_set_last_stop (&src->segment, GST_FORMAT_BYTES, offset);
1890
1891   return TRUE;
1892
1893   /* ERRORS */
1894 unexpected_length:
1895   {
1896     return FALSE;
1897   }
1898 }
1899
1900 /* must be called with LIVE_LOCK */
1901 static GstFlowReturn
1902 gst_base_src_get_range (GstBaseSrc * src, guint64 offset, guint length,
1903     GstBuffer ** buf)
1904 {
1905   GstFlowReturn ret;
1906   GstBaseSrcClass *bclass;
1907   GstClockReturn status;
1908
1909   bclass = GST_BASE_SRC_GET_CLASS (src);
1910
1911   if (src->is_live) {
1912     while (G_UNLIKELY (!src->live_running)) {
1913       ret = gst_base_src_wait_playing (src);
1914       if (ret != GST_FLOW_OK)
1915         goto stopped;
1916     }
1917   }
1918
1919   if (G_UNLIKELY (!GST_OBJECT_FLAG_IS_SET (src, GST_BASE_SRC_STARTED)))
1920     goto not_started;
1921
1922   if (G_UNLIKELY (!bclass->create))
1923     goto no_function;
1924
1925   if (G_UNLIKELY (!gst_base_src_update_length (src, offset, &length)))
1926     goto unexpected_length;
1927
1928   /* normally we don't count buffers */
1929   if (G_UNLIKELY (src->num_buffers_left >= 0)) {
1930     if (src->num_buffers_left == 0)
1931       goto reached_num_buffers;
1932     else
1933       src->num_buffers_left--;
1934   }
1935
1936   /* don't enter the create function if a pending EOS event was set. For the
1937    * logic of the pending_eos, check the event function of this class. */
1938   if (G_UNLIKELY (g_atomic_int_get (&src->priv->pending_eos)))
1939     goto eos;
1940
1941   GST_DEBUG_OBJECT (src,
1942       "calling create offset %" G_GUINT64_FORMAT " length %u, time %"
1943       G_GINT64_FORMAT, offset, length, src->segment.time);
1944
1945   ret = bclass->create (src, offset, length, buf);
1946
1947   /* The create function could be unlocked because we have a pending EOS. It's
1948    * possible that we have a valid buffer from create that we need to
1949    * discard when the create function returned _OK. */
1950   if (G_UNLIKELY (g_atomic_int_get (&src->priv->pending_eos))) {
1951     if (ret == GST_FLOW_OK) {
1952       gst_buffer_unref (*buf);
1953       *buf = NULL;
1954     }
1955     goto eos;
1956   }
1957
1958   if (G_UNLIKELY (ret != GST_FLOW_OK))
1959     goto not_ok;
1960
1961   /* no timestamp set and we are at offset 0, we can timestamp with 0 */
1962   if (offset == 0 && src->segment.time == 0
1963       && GST_BUFFER_TIMESTAMP (*buf) == -1)
1964     GST_BUFFER_TIMESTAMP (*buf) = 0;
1965
1966   /* set pad caps on the buffer if the buffer had no caps */
1967   if (GST_BUFFER_CAPS (*buf) == NULL)
1968     gst_buffer_set_caps (*buf, GST_PAD_CAPS (src->srcpad));
1969
1970   /* now sync before pushing the buffer */
1971   status = gst_base_src_do_sync (src, *buf);
1972
1973   /* waiting for the clock could have made us flushing */
1974   if (G_UNLIKELY (src->priv->flushing))
1975     goto flushing;
1976
1977   switch (status) {
1978     case GST_CLOCK_EARLY:
1979       /* the buffer is too late. We currently don't drop the buffer. */
1980       GST_DEBUG_OBJECT (src, "buffer too late!, returning anyway");
1981       break;
1982     case GST_CLOCK_OK:
1983       /* buffer synchronised properly */
1984       GST_DEBUG_OBJECT (src, "buffer ok");
1985       break;
1986     case GST_CLOCK_UNSCHEDULED:
1987       /* this case is triggered when we were waiting for the clock and
1988        * it got unlocked because we did a state change. We return 
1989        * WRONG_STATE in this case to stop the dataflow also get rid of the
1990        * produced buffer. */
1991       GST_DEBUG_OBJECT (src,
1992           "clock was unscheduled (%d), returning WRONG_STATE", status);
1993       gst_buffer_unref (*buf);
1994       *buf = NULL;
1995       ret = GST_FLOW_WRONG_STATE;
1996       break;
1997     default:
1998       /* all other result values are unexpected and errors */
1999       GST_ELEMENT_ERROR (src, CORE, CLOCK,
2000           (_("Internal clock error.")),
2001           ("clock returned unexpected return value %d", status));
2002       gst_buffer_unref (*buf);
2003       *buf = NULL;
2004       ret = GST_FLOW_ERROR;
2005       break;
2006   }
2007   return ret;
2008
2009   /* ERROR */
2010 stopped:
2011   {
2012     GST_DEBUG_OBJECT (src, "wait_playing returned %d (%s)", ret,
2013         gst_flow_get_name (ret));
2014     return ret;
2015   }
2016 not_ok:
2017   {
2018     GST_DEBUG_OBJECT (src, "create returned %d (%s)", ret,
2019         gst_flow_get_name (ret));
2020     return ret;
2021   }
2022 not_started:
2023   {
2024     GST_DEBUG_OBJECT (src, "getrange but not started");
2025     return GST_FLOW_WRONG_STATE;
2026   }
2027 no_function:
2028   {
2029     GST_DEBUG_OBJECT (src, "no create function");
2030     return GST_FLOW_ERROR;
2031   }
2032 unexpected_length:
2033   {
2034     GST_DEBUG_OBJECT (src, "unexpected length %u (offset=%" G_GUINT64_FORMAT
2035         ", size=%" G_GINT64_FORMAT ")", length, offset, src->segment.duration);
2036     return GST_FLOW_UNEXPECTED;
2037   }
2038 reached_num_buffers:
2039   {
2040     GST_DEBUG_OBJECT (src, "sent all buffers");
2041     return GST_FLOW_UNEXPECTED;
2042   }
2043 flushing:
2044   {
2045     GST_DEBUG_OBJECT (src, "we are flushing");
2046     gst_buffer_unref (*buf);
2047     *buf = NULL;
2048     return GST_FLOW_WRONG_STATE;
2049   }
2050 eos:
2051   {
2052     GST_DEBUG_OBJECT (src, "we are EOS");
2053     return GST_FLOW_UNEXPECTED;
2054   }
2055 }
2056
2057 static GstFlowReturn
2058 gst_base_src_pad_get_range (GstPad * pad, guint64 offset, guint length,
2059     GstBuffer ** buf)
2060 {
2061   GstBaseSrc *src;
2062   GstFlowReturn res;
2063
2064   src = GST_BASE_SRC (gst_pad_get_parent (pad));
2065
2066   GST_LIVE_LOCK (src);
2067   if (G_UNLIKELY (src->priv->flushing))
2068     goto flushing;
2069
2070   res = gst_base_src_get_range (src, offset, length, buf);
2071
2072 done:
2073   GST_LIVE_UNLOCK (src);
2074
2075   gst_object_unref (src);
2076
2077   return res;
2078
2079   /* ERRORS */
2080 flushing:
2081   {
2082     GST_DEBUG_OBJECT (src, "we are flushing");
2083     res = GST_FLOW_WRONG_STATE;
2084     goto done;
2085   }
2086 }
2087
2088 static gboolean
2089 gst_base_src_default_check_get_range (GstBaseSrc * src)
2090 {
2091   gboolean res;
2092
2093   if (!GST_OBJECT_FLAG_IS_SET (src, GST_BASE_SRC_STARTED)) {
2094     GST_LOG_OBJECT (src, "doing start/stop to check get_range support");
2095     if (G_LIKELY (gst_base_src_start (src)))
2096       gst_base_src_stop (src);
2097   }
2098
2099   /* we can operate in getrange mode if the native format is bytes
2100    * and we are seekable, this condition is set in the random_access
2101    * flag and is set in the _start() method. */
2102   res = src->random_access;
2103
2104   return res;
2105 }
2106
2107 static gboolean
2108 gst_base_src_check_get_range (GstBaseSrc * src)
2109 {
2110   GstBaseSrcClass *bclass;
2111   gboolean res;
2112
2113   bclass = GST_BASE_SRC_GET_CLASS (src);
2114
2115   if (bclass->check_get_range == NULL)
2116     goto no_function;
2117
2118   res = bclass->check_get_range (src);
2119   GST_LOG_OBJECT (src, "%s() returned %d",
2120       GST_DEBUG_FUNCPTR_NAME (bclass->check_get_range), (gint) res);
2121
2122   return res;
2123
2124   /* ERRORS */
2125 no_function:
2126   {
2127     GST_WARNING_OBJECT (src, "no check_get_range function set");
2128     return FALSE;
2129   }
2130 }
2131
2132 static gboolean
2133 gst_base_src_pad_check_get_range (GstPad * pad)
2134 {
2135   GstBaseSrc *src;
2136   gboolean res;
2137
2138   src = GST_BASE_SRC (GST_OBJECT_PARENT (pad));
2139
2140   res = gst_base_src_check_get_range (src);
2141
2142   return res;
2143 }
2144
2145 static void
2146 gst_base_src_loop (GstPad * pad)
2147 {
2148   GstBaseSrc *src;
2149   GstBuffer *buf = NULL;
2150   GstFlowReturn ret;
2151   gint64 position;
2152   gboolean eos;
2153   gulong blocksize;
2154
2155   eos = FALSE;
2156
2157   src = GST_BASE_SRC (GST_OBJECT_PARENT (pad));
2158
2159   GST_LIVE_LOCK (src);
2160
2161   if (G_UNLIKELY (src->priv->flushing))
2162     goto flushing;
2163
2164   src->priv->last_sent_eos = FALSE;
2165
2166   blocksize = src->blocksize;
2167
2168   /* if we operate in bytes, we can calculate an offset */
2169   if (src->segment.format == GST_FORMAT_BYTES) {
2170     position = src->segment.last_stop;
2171     /* for negative rates, start with subtracting the blocksize */
2172     if (src->segment.rate < 0.0) {
2173       /* we cannot go below segment.start */
2174       if (position > src->segment.start + blocksize)
2175         position -= blocksize;
2176       else {
2177         /* last block, remainder up to segment.start */
2178         blocksize = position - src->segment.start;
2179         position = src->segment.start;
2180       }
2181     }
2182   } else
2183     position = -1;
2184
2185   GST_LOG_OBJECT (src, "next_ts %" GST_TIME_FORMAT " size %lu",
2186       GST_TIME_ARGS (position), blocksize);
2187
2188   ret = gst_base_src_get_range (src, position, blocksize, &buf);
2189   if (G_UNLIKELY (ret != GST_FLOW_OK)) {
2190     GST_INFO_OBJECT (src, "pausing after gst_base_src_get_range() = %s",
2191         gst_flow_get_name (ret));
2192     GST_LIVE_UNLOCK (src);
2193     goto pause;
2194   }
2195   /* this should not happen */
2196   if (G_UNLIKELY (buf == NULL))
2197     goto null_buffer;
2198
2199   /* push events to close/start our segment before we push the buffer. */
2200   if (G_UNLIKELY (src->priv->close_segment)) {
2201     gst_pad_push_event (pad, src->priv->close_segment);
2202     src->priv->close_segment = NULL;
2203   }
2204   if (G_UNLIKELY (src->priv->start_segment)) {
2205     gst_pad_push_event (pad, src->priv->start_segment);
2206     src->priv->start_segment = NULL;
2207   }
2208
2209   /* figure out the new position */
2210   switch (src->segment.format) {
2211     case GST_FORMAT_BYTES:
2212     {
2213       guint bufsize = GST_BUFFER_SIZE (buf);
2214
2215       /* we subtracted above for negative rates */
2216       if (src->segment.rate >= 0.0)
2217         position += bufsize;
2218       break;
2219     }
2220     case GST_FORMAT_TIME:
2221     {
2222       GstClockTime start, duration;
2223
2224       start = GST_BUFFER_TIMESTAMP (buf);
2225       duration = GST_BUFFER_DURATION (buf);
2226
2227       if (GST_CLOCK_TIME_IS_VALID (start))
2228         position = start;
2229       else
2230         position = src->segment.last_stop;
2231
2232       if (GST_CLOCK_TIME_IS_VALID (duration)) {
2233         if (src->segment.rate >= 0.0)
2234           position += duration;
2235         else if (position > duration)
2236           position -= duration;
2237         else
2238           position = 0;
2239       }
2240       break;
2241     }
2242     case GST_FORMAT_DEFAULT:
2243       if (src->segment.rate >= 0.0)
2244         position = GST_BUFFER_OFFSET_END (buf);
2245       else
2246         position = GST_BUFFER_OFFSET (buf);
2247       break;
2248     default:
2249       position = -1;
2250       break;
2251   }
2252   if (position != -1) {
2253     if (src->segment.rate >= 0.0) {
2254       /* positive rate, check if we reached the stop */
2255       if (src->segment.stop != -1) {
2256         if (position >= src->segment.stop) {
2257           eos = TRUE;
2258           position = src->segment.stop;
2259         }
2260       }
2261     } else {
2262       /* negative rate, check if we reached the start. start is always set to
2263        * something different from -1 */
2264       if (position <= src->segment.start) {
2265         eos = TRUE;
2266         position = src->segment.start;
2267       }
2268       /* when going reverse, all buffers are DISCONT */
2269       src->priv->discont = TRUE;
2270     }
2271     gst_segment_set_last_stop (&src->segment, src->segment.format, position);
2272   }
2273
2274   if (G_UNLIKELY (src->priv->discont)) {
2275     buf = gst_buffer_make_metadata_writable (buf);
2276     GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_DISCONT);
2277     src->priv->discont = FALSE;
2278   }
2279   GST_LIVE_UNLOCK (src);
2280
2281   ret = gst_pad_push (pad, buf);
2282   if (G_UNLIKELY (ret != GST_FLOW_OK)) {
2283     GST_INFO_OBJECT (src, "pausing after gst_pad_push() = %s",
2284         gst_flow_get_name (ret));
2285     goto pause;
2286   }
2287
2288   if (G_UNLIKELY (eos)) {
2289     GST_INFO_OBJECT (src, "pausing after end of segment");
2290     ret = GST_FLOW_UNEXPECTED;
2291     goto pause;
2292   }
2293
2294 done:
2295   return;
2296
2297   /* special cases */
2298 flushing:
2299   {
2300     GST_DEBUG_OBJECT (src, "we are flushing");
2301     GST_LIVE_UNLOCK (src);
2302     ret = GST_FLOW_WRONG_STATE;
2303     goto pause;
2304   }
2305 pause:
2306   {
2307     const gchar *reason = gst_flow_get_name (ret);
2308     GstEvent *event;
2309
2310     GST_DEBUG_OBJECT (src, "pausing task, reason %s", reason);
2311     src->data.ABI.running = FALSE;
2312     gst_pad_pause_task (pad);
2313     if (GST_FLOW_IS_FATAL (ret) || ret == GST_FLOW_NOT_LINKED) {
2314       if (ret == GST_FLOW_UNEXPECTED) {
2315         /* perform EOS logic */
2316         if (src->segment.flags & GST_SEEK_FLAG_SEGMENT) {
2317           GstMessage *message;
2318
2319           message = gst_message_new_segment_done (GST_OBJECT_CAST (src),
2320               src->segment.format, src->segment.last_stop);
2321           gst_message_set_seqnum (message, src->priv->seqnum);
2322           gst_element_post_message (GST_ELEMENT_CAST (src), message);
2323         } else {
2324           event = gst_event_new_eos ();
2325           gst_event_set_seqnum (event, src->priv->seqnum);
2326           gst_pad_push_event (pad, event);
2327           src->priv->last_sent_eos = TRUE;
2328         }
2329       } else {
2330         event = gst_event_new_eos ();
2331         gst_event_set_seqnum (event, src->priv->seqnum);
2332         /* for fatal errors we post an error message, post the error
2333          * first so the app knows about the error first. */
2334         GST_ELEMENT_ERROR (src, STREAM, FAILED,
2335             (_("Internal data flow error.")),
2336             ("streaming task paused, reason %s (%d)", reason, ret));
2337         gst_pad_push_event (pad, event);
2338         src->priv->last_sent_eos = TRUE;
2339       }
2340     }
2341     goto done;
2342   }
2343 null_buffer:
2344   {
2345     GST_ELEMENT_ERROR (src, STREAM, FAILED,
2346         (_("Internal data flow error.")), ("element returned NULL buffer"));
2347     GST_LIVE_UNLOCK (src);
2348     /* we finished the segment on error */
2349     ret = GST_FLOW_ERROR;
2350     goto done;
2351   }
2352 }
2353
2354 /* default negotiation code. 
2355  *
2356  * Take intersection between src and sink pads, take first
2357  * caps and fixate. 
2358  */
2359 static gboolean
2360 gst_base_src_default_negotiate (GstBaseSrc * basesrc)
2361 {
2362   GstCaps *thiscaps;
2363   GstCaps *caps = NULL;
2364   GstCaps *peercaps = NULL;
2365   gboolean result = FALSE;
2366
2367   /* first see what is possible on our source pad */
2368   thiscaps = gst_pad_get_caps (GST_BASE_SRC_PAD (basesrc));
2369   GST_DEBUG_OBJECT (basesrc, "caps of src: %" GST_PTR_FORMAT, thiscaps);
2370   /* nothing or anything is allowed, we're done */
2371   if (thiscaps == NULL || gst_caps_is_any (thiscaps))
2372     goto no_nego_needed;
2373
2374   if (G_UNLIKELY (gst_caps_is_empty (thiscaps)))
2375     goto no_caps;
2376
2377   /* get the peer caps */
2378   peercaps = gst_pad_peer_get_caps (GST_BASE_SRC_PAD (basesrc));
2379   GST_DEBUG_OBJECT (basesrc, "caps of peer: %" GST_PTR_FORMAT, peercaps);
2380   if (peercaps) {
2381     GstCaps *icaps;
2382
2383     /* get intersection */
2384     icaps = gst_caps_intersect (thiscaps, peercaps);
2385     GST_DEBUG_OBJECT (basesrc, "intersect: %" GST_PTR_FORMAT, icaps);
2386     gst_caps_unref (thiscaps);
2387     gst_caps_unref (peercaps);
2388     if (icaps) {
2389       /* take first (and best, since they are sorted) possibility */
2390       caps = gst_caps_copy_nth (icaps, 0);
2391       gst_caps_unref (icaps);
2392     }
2393   } else {
2394     /* no peer, work with our own caps then */
2395     caps = thiscaps;
2396   }
2397   if (caps) {
2398     caps = gst_caps_make_writable (caps);
2399     gst_caps_truncate (caps);
2400
2401     /* now fixate */
2402     if (!gst_caps_is_empty (caps)) {
2403       gst_pad_fixate_caps (GST_BASE_SRC_PAD (basesrc), caps);
2404       GST_DEBUG_OBJECT (basesrc, "fixated to: %" GST_PTR_FORMAT, caps);
2405
2406       if (gst_caps_is_any (caps)) {
2407         /* hmm, still anything, so element can do anything and
2408          * nego is not needed */
2409         result = TRUE;
2410       } else if (gst_caps_is_fixed (caps)) {
2411         /* yay, fixed caps, use those then, it's possible that the subclass does
2412          * not accept this caps after all and we have to fail. */
2413         result = gst_pad_set_caps (GST_BASE_SRC_PAD (basesrc), caps);
2414       }
2415     }
2416     gst_caps_unref (caps);
2417   } else {
2418     GST_DEBUG_OBJECT (basesrc, "no common caps");
2419   }
2420   return result;
2421
2422 no_nego_needed:
2423   {
2424     GST_DEBUG_OBJECT (basesrc, "no negotiation needed");
2425     if (thiscaps)
2426       gst_caps_unref (thiscaps);
2427     return TRUE;
2428   }
2429 no_caps:
2430   {
2431     GST_ELEMENT_ERROR (basesrc, STREAM, FORMAT,
2432         ("No supported formats found"),
2433         ("This element did not produce valid caps"));
2434     if (thiscaps)
2435       gst_caps_unref (thiscaps);
2436     return TRUE;
2437   }
2438 }
2439
2440 static gboolean
2441 gst_base_src_negotiate (GstBaseSrc * basesrc)
2442 {
2443   GstBaseSrcClass *bclass;
2444   gboolean result = TRUE;
2445
2446   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2447
2448   if (bclass->negotiate)
2449     result = bclass->negotiate (basesrc);
2450
2451   return result;
2452 }
2453
2454 static gboolean
2455 gst_base_src_start (GstBaseSrc * basesrc)
2456 {
2457   GstBaseSrcClass *bclass;
2458   gboolean result;
2459   guint64 size;
2460   gboolean seekable;
2461
2462   if (GST_OBJECT_FLAG_IS_SET (basesrc, GST_BASE_SRC_STARTED))
2463     return TRUE;
2464
2465   GST_DEBUG_OBJECT (basesrc, "starting source");
2466
2467   basesrc->num_buffers_left = basesrc->num_buffers;
2468
2469   gst_segment_init (&basesrc->segment, basesrc->segment.format);
2470   basesrc->data.ABI.running = FALSE;
2471
2472   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2473   if (bclass->start)
2474     result = bclass->start (basesrc);
2475   else
2476     result = TRUE;
2477
2478   if (!result)
2479     goto could_not_start;
2480
2481   GST_OBJECT_FLAG_SET (basesrc, GST_BASE_SRC_STARTED);
2482
2483   /* figure out the size */
2484   if (basesrc->segment.format == GST_FORMAT_BYTES) {
2485     if (bclass->get_size) {
2486       if (!(result = bclass->get_size (basesrc, &size)))
2487         size = -1;
2488     } else {
2489       result = FALSE;
2490       size = -1;
2491     }
2492     GST_DEBUG_OBJECT (basesrc, "setting size %" G_GUINT64_FORMAT, size);
2493     /* only update the size when operating in bytes, subclass is supposed
2494      * to set duration in the start method for other formats */
2495     gst_segment_set_duration (&basesrc->segment, GST_FORMAT_BYTES, size);
2496   } else {
2497     size = -1;
2498   }
2499
2500   GST_DEBUG_OBJECT (basesrc,
2501       "format: %d, have size: %d, size: %" G_GUINT64_FORMAT ", duration: %"
2502       G_GINT64_FORMAT, basesrc->segment.format, result, size,
2503       basesrc->segment.duration);
2504
2505   seekable = gst_base_src_seekable (basesrc);
2506   GST_DEBUG_OBJECT (basesrc, "is seekable: %d", seekable);
2507
2508   /* update for random access flag */
2509   basesrc->random_access = seekable &&
2510       basesrc->segment.format == GST_FORMAT_BYTES;
2511
2512   GST_DEBUG_OBJECT (basesrc, "is random_access: %d", basesrc->random_access);
2513
2514   /* run typefind if we are random_access and the typefinding is enabled. */
2515   if (basesrc->random_access && basesrc->data.ABI.typefind && size != -1) {
2516     GstCaps *caps;
2517
2518     if (!(caps = gst_type_find_helper (basesrc->srcpad, size)))
2519       goto typefind_failed;
2520
2521     gst_pad_set_caps (basesrc->srcpad, caps);
2522     gst_caps_unref (caps);
2523   } else {
2524     /* use class or default negotiate function */
2525     if (!gst_base_src_negotiate (basesrc))
2526       goto could_not_negotiate;
2527   }
2528
2529   return TRUE;
2530
2531   /* ERROR */
2532 could_not_start:
2533   {
2534     GST_DEBUG_OBJECT (basesrc, "could not start");
2535     /* subclass is supposed to post a message. We don't have to call _stop. */
2536     return FALSE;
2537   }
2538 could_not_negotiate:
2539   {
2540     GST_DEBUG_OBJECT (basesrc, "could not negotiate, stopping");
2541     GST_ELEMENT_ERROR (basesrc, STREAM, FORMAT,
2542         ("Could not negotiate format"), ("Check your filtered caps, if any"));
2543     /* we must call stop */
2544     gst_base_src_stop (basesrc);
2545     return FALSE;
2546   }
2547 typefind_failed:
2548   {
2549     GST_DEBUG_OBJECT (basesrc, "could not typefind, stopping");
2550     GST_ELEMENT_ERROR (basesrc, STREAM, TYPE_NOT_FOUND, (NULL), (NULL));
2551     /* we must call stop */
2552     gst_base_src_stop (basesrc);
2553     return FALSE;
2554   }
2555 }
2556
2557 static gboolean
2558 gst_base_src_stop (GstBaseSrc * basesrc)
2559 {
2560   GstBaseSrcClass *bclass;
2561   gboolean result = TRUE;
2562
2563   if (!GST_OBJECT_FLAG_IS_SET (basesrc, GST_BASE_SRC_STARTED))
2564     return TRUE;
2565
2566   GST_DEBUG_OBJECT (basesrc, "stopping source");
2567
2568   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2569   if (bclass->stop)
2570     result = bclass->stop (basesrc);
2571
2572   if (result)
2573     GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_STARTED);
2574
2575   return result;
2576 }
2577
2578 /* start or stop flushing dataprocessing 
2579  */
2580 static gboolean
2581 gst_base_src_set_flushing (GstBaseSrc * basesrc,
2582     gboolean flushing, gboolean live_play, gboolean unlock, gboolean * playing)
2583 {
2584   GstBaseSrcClass *bclass;
2585
2586   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2587
2588   if (flushing && unlock) {
2589     /* unlock any subclasses, we need to do this before grabbing the
2590      * LIVE_LOCK since we hold this lock before going into ::create. We pass an
2591      * unlock to the params because of backwards compat (see seek handler)*/
2592     if (bclass->unlock)
2593       bclass->unlock (basesrc);
2594   }
2595
2596   /* the live lock is released when we are blocked, waiting for playing or
2597    * when we sync to the clock. */
2598   GST_LIVE_LOCK (basesrc);
2599   if (playing)
2600     *playing = basesrc->live_running;
2601   basesrc->priv->flushing = flushing;
2602   if (flushing) {
2603     /* if we are locked in the live lock, signal it to make it flush */
2604     basesrc->live_running = TRUE;
2605
2606     /* clear pending EOS if any */
2607     g_atomic_int_set (&basesrc->priv->pending_eos, FALSE);
2608
2609     /* step 1, now that we have the LIVE lock, clear our unlock request */
2610     if (bclass->unlock_stop)
2611       bclass->unlock_stop (basesrc);
2612
2613     /* step 2, unblock clock sync (if any) or any other blocking thing */
2614     if (basesrc->clock_id)
2615       gst_clock_id_unschedule (basesrc->clock_id);
2616   } else {
2617     /* signal the live source that it can start playing */
2618     basesrc->live_running = live_play;
2619   }
2620   GST_LIVE_SIGNAL (basesrc);
2621   GST_LIVE_UNLOCK (basesrc);
2622
2623   return TRUE;
2624 }
2625
2626 /* the purpose of this function is to make sure that a live source blocks in the
2627  * LIVE lock or leaves the LIVE lock and continues playing. */
2628 static gboolean
2629 gst_base_src_set_playing (GstBaseSrc * basesrc, gboolean live_play)
2630 {
2631   GstBaseSrcClass *bclass;
2632
2633   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2634
2635   /* unlock subclasses locked in ::create, we only do this when we stop playing. */
2636   if (!live_play) {
2637     GST_DEBUG_OBJECT (basesrc, "unlock");
2638     if (bclass->unlock)
2639       bclass->unlock (basesrc);
2640   }
2641
2642   /* we are now able to grab the LIVE lock, when we get it, we can be
2643    * waiting for PLAYING while blocked in the LIVE cond or we can be waiting
2644    * for the clock. */
2645   GST_LIVE_LOCK (basesrc);
2646   GST_DEBUG_OBJECT (basesrc, "unschedule clock");
2647
2648   /* unblock clock sync (if any) */
2649   if (basesrc->clock_id)
2650     gst_clock_id_unschedule (basesrc->clock_id);
2651
2652   /* configure what to do when we get to the LIVE lock. */
2653   GST_DEBUG_OBJECT (basesrc, "live running %d", live_play);
2654   basesrc->live_running = live_play;
2655
2656   if (live_play) {
2657     gboolean start;
2658
2659     /* clear our unlock request when going to PLAYING */
2660     GST_DEBUG_OBJECT (basesrc, "unlock stop");
2661     if (bclass->unlock_stop)
2662       bclass->unlock_stop (basesrc);
2663
2664     /* for live sources we restart the timestamp correction */
2665     basesrc->priv->latency = -1;
2666     /* have to restart the task in case it stopped because of the unlock when
2667      * we went to PAUSED. Only do this if we operating in push mode. */
2668     GST_OBJECT_LOCK (basesrc->srcpad);
2669     start = (GST_PAD_ACTIVATE_MODE (basesrc->srcpad) == GST_ACTIVATE_PUSH);
2670     GST_OBJECT_UNLOCK (basesrc->srcpad);
2671     if (start)
2672       gst_pad_start_task (basesrc->srcpad, (GstTaskFunction) gst_base_src_loop,
2673           basesrc->srcpad);
2674     GST_DEBUG_OBJECT (basesrc, "signal");
2675     GST_LIVE_SIGNAL (basesrc);
2676   }
2677   GST_LIVE_UNLOCK (basesrc);
2678
2679   return TRUE;
2680 }
2681
2682 static gboolean
2683 gst_base_src_activate_push (GstPad * pad, gboolean active)
2684 {
2685   GstBaseSrc *basesrc;
2686   GstEvent *event;
2687
2688   basesrc = GST_BASE_SRC (GST_OBJECT_PARENT (pad));
2689
2690   /* prepare subclass first */
2691   if (active) {
2692     GST_DEBUG_OBJECT (basesrc, "Activating in push mode");
2693
2694     if (G_UNLIKELY (!basesrc->can_activate_push))
2695       goto no_push_activation;
2696
2697     if (G_UNLIKELY (!gst_base_src_start (basesrc)))
2698       goto error_start;
2699
2700     basesrc->priv->last_sent_eos = FALSE;
2701     basesrc->priv->discont = TRUE;
2702     gst_base_src_set_flushing (basesrc, FALSE, FALSE, FALSE, NULL);
2703
2704     /* do initial seek, which will start the task */
2705     GST_OBJECT_LOCK (basesrc);
2706     event = basesrc->data.ABI.pending_seek;
2707     basesrc->data.ABI.pending_seek = NULL;
2708     GST_OBJECT_UNLOCK (basesrc);
2709
2710     /* no need to unlock anything, the task is certainly
2711      * not running here. The perform seek code will start the task when
2712      * finished. */
2713     if (G_UNLIKELY (!gst_base_src_perform_seek (basesrc, event, FALSE)))
2714       goto seek_failed;
2715
2716     if (event)
2717       gst_event_unref (event);
2718   } else {
2719     GST_DEBUG_OBJECT (basesrc, "Deactivating in push mode");
2720     /* flush all */
2721     gst_base_src_set_flushing (basesrc, TRUE, FALSE, TRUE, NULL);
2722     /* stop the task */
2723     gst_pad_stop_task (pad);
2724     /* now we can stop the source */
2725     if (G_UNLIKELY (!gst_base_src_stop (basesrc)))
2726       goto error_stop;
2727   }
2728   return TRUE;
2729
2730   /* ERRORS */
2731 no_push_activation:
2732   {
2733     GST_WARNING_OBJECT (basesrc, "Subclass disabled push-mode activation");
2734     return FALSE;
2735   }
2736 error_start:
2737   {
2738     GST_WARNING_OBJECT (basesrc, "Failed to start in push mode");
2739     return FALSE;
2740   }
2741 seek_failed:
2742   {
2743     GST_ERROR_OBJECT (basesrc, "Failed to perform initial seek");
2744     gst_base_src_stop (basesrc);
2745     if (event)
2746       gst_event_unref (event);
2747     return FALSE;
2748   }
2749 error_stop:
2750   {
2751     GST_DEBUG_OBJECT (basesrc, "Failed to stop in push mode");
2752     return FALSE;
2753   }
2754 }
2755
2756 static gboolean
2757 gst_base_src_activate_pull (GstPad * pad, gboolean active)
2758 {
2759   GstBaseSrc *basesrc;
2760
2761   basesrc = GST_BASE_SRC (GST_OBJECT_PARENT (pad));
2762
2763   /* prepare subclass first */
2764   if (active) {
2765     GST_DEBUG_OBJECT (basesrc, "Activating in pull mode");
2766     if (G_UNLIKELY (!gst_base_src_start (basesrc)))
2767       goto error_start;
2768
2769     /* if not random_access, we cannot operate in pull mode for now */
2770     if (G_UNLIKELY (!gst_base_src_check_get_range (basesrc)))
2771       goto no_get_range;
2772
2773     /* stop flushing now but for live sources, still block in the LIVE lock when
2774      * we are not yet PLAYING */
2775     gst_base_src_set_flushing (basesrc, FALSE, FALSE, FALSE, NULL);
2776   } else {
2777     GST_DEBUG_OBJECT (basesrc, "Deactivating in pull mode");
2778     /* flush all, there is no task to stop */
2779     gst_base_src_set_flushing (basesrc, TRUE, FALSE, TRUE, NULL);
2780
2781     /* don't send EOS when going from PAUSED => READY when in pull mode */
2782     basesrc->priv->last_sent_eos = TRUE;
2783
2784     if (G_UNLIKELY (!gst_base_src_stop (basesrc)))
2785       goto error_stop;
2786   }
2787   return TRUE;
2788
2789   /* ERRORS */
2790 error_start:
2791   {
2792     GST_ERROR_OBJECT (basesrc, "Failed to start in pull mode");
2793     return FALSE;
2794   }
2795 no_get_range:
2796   {
2797     GST_ERROR_OBJECT (basesrc, "Cannot operate in pull mode, stopping");
2798     gst_base_src_stop (basesrc);
2799     return FALSE;
2800   }
2801 error_stop:
2802   {
2803     GST_ERROR_OBJECT (basesrc, "Failed to stop in pull mode");
2804     return FALSE;
2805   }
2806 }
2807
2808 static GstStateChangeReturn
2809 gst_base_src_change_state (GstElement * element, GstStateChange transition)
2810 {
2811   GstBaseSrc *basesrc;
2812   GstStateChangeReturn result;
2813   gboolean no_preroll = FALSE;
2814
2815   basesrc = GST_BASE_SRC (element);
2816
2817   switch (transition) {
2818     case GST_STATE_CHANGE_NULL_TO_READY:
2819       break;
2820     case GST_STATE_CHANGE_READY_TO_PAUSED:
2821       no_preroll = gst_base_src_is_live (basesrc);
2822       break;
2823     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
2824       GST_DEBUG_OBJECT (basesrc, "PAUSED->PLAYING");
2825       if (gst_base_src_is_live (basesrc)) {
2826         /* now we can start playback */
2827         gst_base_src_set_playing (basesrc, TRUE);
2828       }
2829       break;
2830     default:
2831       break;
2832   }
2833
2834   if ((result =
2835           GST_ELEMENT_CLASS (parent_class)->change_state (element,
2836               transition)) == GST_STATE_CHANGE_FAILURE)
2837     goto failure;
2838
2839   switch (transition) {
2840     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
2841       GST_DEBUG_OBJECT (basesrc, "PLAYING->PAUSED");
2842       if (gst_base_src_is_live (basesrc)) {
2843         /* make sure we block in the live lock in PAUSED */
2844         gst_base_src_set_playing (basesrc, FALSE);
2845         no_preroll = TRUE;
2846       }
2847       break;
2848     case GST_STATE_CHANGE_PAUSED_TO_READY:
2849     {
2850       GstEvent **event_p, *event;
2851
2852       /* we don't need to unblock anything here, the pad deactivation code
2853        * already did this */
2854
2855       /* FIXME, deprecate this behaviour, it is very dangerous.
2856        * the prefered way of sending EOS downstream is by sending
2857        * the EOS event to the element */
2858       if (!basesrc->priv->last_sent_eos) {
2859         GST_DEBUG_OBJECT (basesrc, "Sending EOS event");
2860         event = gst_event_new_eos ();
2861         gst_event_set_seqnum (event, basesrc->priv->seqnum);
2862         gst_pad_push_event (basesrc->srcpad, event);
2863         basesrc->priv->last_sent_eos = TRUE;
2864       }
2865       g_atomic_int_set (&basesrc->priv->pending_eos, FALSE);
2866       event_p = &basesrc->data.ABI.pending_seek;
2867       gst_event_replace (event_p, NULL);
2868       event_p = &basesrc->priv->close_segment;
2869       gst_event_replace (event_p, NULL);
2870       event_p = &basesrc->priv->start_segment;
2871       gst_event_replace (event_p, NULL);
2872       break;
2873     }
2874     case GST_STATE_CHANGE_READY_TO_NULL:
2875       break;
2876     default:
2877       break;
2878   }
2879
2880   if (no_preroll && result == GST_STATE_CHANGE_SUCCESS)
2881     result = GST_STATE_CHANGE_NO_PREROLL;
2882
2883   return result;
2884
2885   /* ERRORS */
2886 failure:
2887   {
2888     GST_DEBUG_OBJECT (basesrc, "parent failed state change");
2889     return result;
2890   }
2891 }