G_OBJECT_CLASS macro usage batch cleanup, fixes #337747 for core
[platform/upstream/gstreamer.git] / libs / gst / base / gstbasesrc.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *               2000,2005 Wim Taymans <wim@fluendo.com>
4  *
5  * gstbasesrc.c:
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20  * Boston, MA 02111-1307, USA.
21  */
22
23 /**
24  * SECTION:gstbasesrc
25  * @short_description: Base class for getrange based source elements
26  * @see_also: #GstPushSrc, #GstBaseTransform, #GstBaseSink
27  *
28  * This is a generice base class for source elements. The following
29  * types of sources are supported:
30  * <itemizedlist>
31  *   <listitem><para>random access sources like files</para></listitem>
32  *   <listitem><para>seekable sources</para></listitem>
33  *   <listitem><para>live sources</para></listitem>
34  * </itemizedlist>
35  *
36  * <refsect2>
37  * <para>
38  * The source can be configured to operate in any #GstFormat with the
39  * gst_base_src_set_format() method. The currently set format determines 
40  * the format of the internal #GstSegment and any #GST_EVENT_NEW_SEGMENT 
41  * events. The default format for #GstBaseSrc is #GST_FORMAT_BYTES.
42  * </para>
43  * <para>
44  * #GstBaseSrc always supports push mode scheduling. If the following
45  * conditions are met, it also supports pull mode scheduling:
46  * <itemizedlist>
47  *   <listitem><para>The format is set to GST_FORMAT_BYTES (default).</para>
48  *   </listitem>
49  *   <listitem><para>#GstBaseSrc::is_seekable returns TRUE.</para>
50  *   </listitem>
51  * </itemizedlist>
52  * </para>
53  * <para>
54  * If all the conditions are met for operating in pull mode, #GstBaseSrc is
55  * automatically seekable in push mode as well. The following conditions must 
56  * be met to make the element seekable in push mode when the format is not
57  * GST_FORMAT_BYTES:
58  * <itemizedlist>
59  *   <listitem><para>
60  *     #GstBaseSrc::is_seekable returns TRUE.
61  *   </para></listitem>
62  *   <listitem><para>
63  *     #GstBaseSrc::query can convert all supported seek formats to the
64  *     internal format as set with gst_base_src_set_format().
65  *   </para></listitem>
66  *   <listitem><para>
67  *     #GstBaseSrc::do_seek is implemented, performs the seek and returns TRUE.
68  *   </para></listitem>
69  * </itemizedlist>
70  * </para>
71  * <para>
72  * When the default format is not GST_FORMAT_BYTES, the subclass should ignore 
73  * the offset and length in the #GstBaseSrc::create method. It is recommended 
74  * to subclass #GstPushSrc instead in this situation.
75  * </para>
76  * <para>
77  * #GstBaseSrc has support for live sources. Live sources are sources that 
78  * produce data at a fixed rate, such as audio or video capture devices. A 
79  * typical live source also provides a clock to publish the rate at which 
80  * they produce data.
81  * Use gst_base_src_set_live() to activate the live source mode.
82  * </para>
83  * <para>
84  * A live source does not produce data in the PAUSED state. This means that the 
85  * #GstBaseSrc::create method will not be called in PAUSED but only in PLAYING.
86  * To signal the pipeline that the element will not produce data, the return
87  * value from the READY to PAUSED state will be GST_STATE_CHANGE_NO_PREROLL.
88  * </para>
89  * <para>
90  * A typical live source will timestamp the buffers it creates with the 
91  * current stream time of the pipeline. This is one reason why a live source
92  * can only produce data in the PLAYING state, when the clock is actually 
93  * distributed and running.
94  * </para>
95  * <para>
96  * The #GstBaseSrc::get_times method can be used to implement pseudo-live 
97  * sources. The base source will wait for the specified stream time returned in 
98  * #GstBaseSrc::get_times before pushing out the buffer. 
99  * It only makes sense to implement the ::get_times function if the source is 
100  * a live source.
101  * </para>
102  * <para>
103  * There is only support in GstBaseSrc for exactly one source pad, which 
104  * should be named "src". A source implementation (subclass of GstBaseSrc) 
105  * should install a pad template in its base_init function, like so:
106  * </para>
107  * <para>
108  * <programlisting>
109  * static void
110  * my_element_base_init (gpointer g_class)
111  * {
112  *   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (g_class);
113  *   // srctemplate should be a #GstStaticPadTemplate with direction
114  *   // #GST_PAD_SRC and name "src"
115  *   gst_element_class_add_pad_template (gstelement_class,
116  *       gst_static_pad_template_get (&amp;srctemplate));
117  *   // see #GstElementDetails
118  *   gst_element_class_set_details (gstelement_class, &amp;details);
119  * }
120  * </programlisting>
121  * </para>
122  * <title>Controlled shutdown of live sources in applications</title>
123  * <para>
124  * Applications that record from a live source may want to stop recording
125  * in a controlled way, so that the recording is stopped, but the data
126  * already in the pipeline is processed to the end (remember that many live
127  * sources would go on recording forever otherwise). For that to happen the
128  * application needs to make the source stop recording and send an EOS
129  * event down the pipeline. The application would then wait for an
130  * EOS message posted on the pipeline's bus to know when all data has
131  * been processed and the pipeline can safely be stopped.
132  * </para>
133  * <para>
134  * Since GStreamer 0.10.3 an application may simply set the source
135  * element to NULL or READY state to make it send an EOS event downstream.
136  * The application should lock the state of the source afterwards, so that
137  * shutting down the pipeline from PLAYING doesn't temporarily start up the
138  * source element for a second time:
139  * <programlisting>
140  * ...
141  * // stop recording
142  * gst_element_set_state (audio_source, GST_STATE_NULL);
143  * gst_element_set_locked_state (audio_source, TRUE);
144  * ...
145  * </programlisting>
146  * Now the application should wait for an EOS message
147  * to be posted on the pipeline's bus. Once it has received
148  * an EOS message, it may safely shut down the entire pipeline:
149  * <programlisting>
150  * ...
151  * // everything done - shut down pipeline
152  * gst_element_set_state (pipeline, GST_STATE_NULL);
153  * gst_element_set_locked_state (audio_source, FALSE);
154  * ...
155  * </programlisting>
156  * </para>
157  * <para>
158  * Note that setting the source element to NULL or READY when the 
159  * pipeline is in the PAUSED state may cause a deadlock since the streaming
160  * thread might be blocked in PREROLL.
161  * </para>
162  * <para>
163  * Last reviewed on 2006-04-14 (0.10.6)
164  * </para>
165  * </refsect2>
166  */
167
168 #ifdef HAVE_CONFIG_H
169 #  include "config.h"
170 #endif
171
172 #include <stdlib.h>
173 #include <string.h>
174
175 #include "gstbasesrc.h"
176 #include "gsttypefindhelper.h"
177 #include <gst/gstmarshal.h>
178 #include <gst/gst-i18n-lib.h>
179
180 GST_DEBUG_CATEGORY_STATIC (gst_base_src_debug);
181 #define GST_CAT_DEFAULT gst_base_src_debug
182
183 #define GST_LIVE_GET_LOCK(elem)               (GST_BASE_SRC_CAST(elem)->live_lock)
184 #define GST_LIVE_LOCK(elem)                   g_mutex_lock(GST_LIVE_GET_LOCK(elem))
185 #define GST_LIVE_TRYLOCK(elem)                g_mutex_trylock(GST_LIVE_GET_LOCK(elem))
186 #define GST_LIVE_UNLOCK(elem)                 g_mutex_unlock(GST_LIVE_GET_LOCK(elem))
187 #define GST_LIVE_GET_COND(elem)               (GST_BASE_SRC_CAST(elem)->live_cond)
188 #define GST_LIVE_WAIT(elem)                   g_cond_wait (GST_LIVE_GET_COND (elem), GST_LIVE_GET_LOCK (elem))
189 #define GST_LIVE_TIMED_WAIT(elem, timeval)    g_cond_timed_wait (GST_LIVE_GET_COND (elem), GST_LIVE_GET_LOCK (elem),\
190                                                                                 timeval)
191 #define GST_LIVE_SIGNAL(elem)                 g_cond_signal (GST_LIVE_GET_COND (elem));
192 #define GST_LIVE_BROADCAST(elem)              g_cond_broadcast (GST_LIVE_GET_COND (elem));
193
194 /* BaseSrc signals and args */
195 enum
196 {
197   /* FILL ME */
198   LAST_SIGNAL
199 };
200
201 #define DEFAULT_BLOCKSIZE       4096
202 #define DEFAULT_NUM_BUFFERS     -1
203 #define DEFAULT_TYPEFIND        FALSE
204
205 enum
206 {
207   PROP_0,
208   PROP_BLOCKSIZE,
209   PROP_NUM_BUFFERS,
210   PROP_TYPEFIND,
211 };
212
213 #define GST_BASE_SRC_GET_PRIVATE(obj)  \
214    (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_BASE_SRC, GstBaseSrcPrivate))
215
216 struct _GstBaseSrcPrivate
217 {
218   gboolean last_sent_eos;       /* last thing we did was send an EOS (we set this
219                                  * to avoid the sending of two EOS in some cases) */
220   gboolean discont;
221
222   /* two segments to be sent in the streaming thread with STREAM_LOCK */
223   GstEvent *close_segment;
224   GstEvent *start_segment;
225 };
226
227 static GstElementClass *parent_class = NULL;
228
229 static void gst_base_src_base_init (gpointer g_class);
230 static void gst_base_src_class_init (GstBaseSrcClass * klass);
231 static void gst_base_src_init (GstBaseSrc * src, gpointer g_class);
232 static void gst_base_src_finalize (GObject * object);
233
234
235 GType
236 gst_base_src_get_type (void)
237 {
238   static GType base_src_type = 0;
239
240   if (!base_src_type) {
241     static const GTypeInfo base_src_info = {
242       sizeof (GstBaseSrcClass),
243       (GBaseInitFunc) gst_base_src_base_init,
244       NULL,
245       (GClassInitFunc) gst_base_src_class_init,
246       NULL,
247       NULL,
248       sizeof (GstBaseSrc),
249       0,
250       (GInstanceInitFunc) gst_base_src_init,
251     };
252
253     base_src_type = g_type_register_static (GST_TYPE_ELEMENT,
254         "GstBaseSrc", &base_src_info, G_TYPE_FLAG_ABSTRACT);
255   }
256   return base_src_type;
257 }
258 static GstCaps *gst_base_src_getcaps (GstPad * pad);
259 static gboolean gst_base_src_setcaps (GstPad * pad, GstCaps * caps);
260
261 static gboolean gst_base_src_activate_push (GstPad * pad, gboolean active);
262 static gboolean gst_base_src_activate_pull (GstPad * pad, gboolean active);
263 static void gst_base_src_set_property (GObject * object, guint prop_id,
264     const GValue * value, GParamSpec * pspec);
265 static void gst_base_src_get_property (GObject * object, guint prop_id,
266     GValue * value, GParamSpec * pspec);
267 static gboolean gst_base_src_event_handler (GstPad * pad, GstEvent * event);
268 static gboolean gst_base_src_send_event (GstElement * elem, GstEvent * event);
269 static gboolean gst_base_src_default_event (GstBaseSrc * src, GstEvent * event);
270
271 static gboolean gst_base_src_query (GstPad * pad, GstQuery * query);
272
273 static gboolean gst_base_src_default_negotiate (GstBaseSrc * basesrc);
274 static gboolean gst_base_src_default_do_seek (GstBaseSrc * src,
275     GstSegment * segment);
276 static gboolean gst_base_src_default_query (GstBaseSrc * src, GstQuery * query);
277
278 static gboolean gst_base_src_unlock (GstBaseSrc * basesrc);
279 static gboolean gst_base_src_start (GstBaseSrc * basesrc);
280 static gboolean gst_base_src_stop (GstBaseSrc * basesrc);
281
282 static GstStateChangeReturn gst_base_src_change_state (GstElement * element,
283     GstStateChange transition);
284
285 static void gst_base_src_loop (GstPad * pad);
286 static gboolean gst_base_src_pad_check_get_range (GstPad * pad);
287 static gboolean gst_base_src_default_check_get_range (GstBaseSrc * bsrc);
288 static GstFlowReturn gst_base_src_pad_get_range (GstPad * pad, guint64 offset,
289     guint length, GstBuffer ** buf);
290 static GstFlowReturn gst_base_src_get_range (GstBaseSrc * src, guint64 offset,
291     guint length, GstBuffer ** buf);
292
293 static void
294 gst_base_src_base_init (gpointer g_class)
295 {
296   GST_DEBUG_CATEGORY_INIT (gst_base_src_debug, "basesrc", 0, "basesrc element");
297 }
298
299 static void
300 gst_base_src_class_init (GstBaseSrcClass * klass)
301 {
302   GObjectClass *gobject_class;
303   GstElementClass *gstelement_class;
304
305   gobject_class = G_OBJECT_CLASS (klass);
306   gstelement_class = GST_ELEMENT_CLASS (klass);
307
308   g_type_class_add_private (klass, sizeof (GstBaseSrcPrivate));
309
310   parent_class = g_type_class_peek_parent (klass);
311
312   gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_base_src_finalize);
313   gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_base_src_set_property);
314   gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_base_src_get_property);
315
316   g_object_class_install_property (gobject_class, PROP_BLOCKSIZE,
317       g_param_spec_ulong ("blocksize", "Block size",
318           "Size in bytes to read per buffer", 1, G_MAXULONG, DEFAULT_BLOCKSIZE,
319           G_PARAM_READWRITE));
320   g_object_class_install_property (gobject_class, PROP_NUM_BUFFERS,
321       g_param_spec_int ("num-buffers", "num-buffers",
322           "Number of buffers to output before sending EOS", -1, G_MAXINT,
323           DEFAULT_NUM_BUFFERS, G_PARAM_READWRITE));
324   g_object_class_install_property (gobject_class, PROP_TYPEFIND,
325       g_param_spec_boolean ("typefind", "Typefind",
326           "Run typefind before negotiating", DEFAULT_TYPEFIND,
327           G_PARAM_READWRITE));
328
329   gstelement_class->change_state =
330       GST_DEBUG_FUNCPTR (gst_base_src_change_state);
331   gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_base_src_send_event);
332
333   klass->negotiate = GST_DEBUG_FUNCPTR (gst_base_src_default_negotiate);
334   klass->event = GST_DEBUG_FUNCPTR (gst_base_src_default_event);
335   klass->do_seek = GST_DEBUG_FUNCPTR (gst_base_src_default_do_seek);
336   klass->query = GST_DEBUG_FUNCPTR (gst_base_src_default_query);
337   klass->check_get_range =
338       GST_DEBUG_FUNCPTR (gst_base_src_default_check_get_range);
339 }
340
341 static void
342 gst_base_src_init (GstBaseSrc * basesrc, gpointer g_class)
343 {
344   GstPad *pad;
345   GstPadTemplate *pad_template;
346
347   basesrc->priv = GST_BASE_SRC_GET_PRIVATE (basesrc);
348
349   basesrc->is_live = FALSE;
350   basesrc->live_lock = g_mutex_new ();
351   basesrc->live_cond = g_cond_new ();
352   basesrc->num_buffers = DEFAULT_NUM_BUFFERS;
353   basesrc->num_buffers_left = -1;
354
355   basesrc->can_activate_push = TRUE;
356   basesrc->pad_mode = GST_ACTIVATE_NONE;
357
358   pad_template =
359       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (g_class), "src");
360   g_return_if_fail (pad_template != NULL);
361
362   GST_DEBUG_OBJECT (basesrc, "creating src pad");
363   pad = gst_pad_new_from_template (pad_template, "src");
364
365   GST_DEBUG_OBJECT (basesrc, "setting functions on src pad");
366   gst_pad_set_activatepush_function (pad,
367       GST_DEBUG_FUNCPTR (gst_base_src_activate_push));
368   gst_pad_set_activatepull_function (pad,
369       GST_DEBUG_FUNCPTR (gst_base_src_activate_pull));
370   gst_pad_set_event_function (pad,
371       GST_DEBUG_FUNCPTR (gst_base_src_event_handler));
372   gst_pad_set_query_function (pad, GST_DEBUG_FUNCPTR (gst_base_src_query));
373   gst_pad_set_checkgetrange_function (pad,
374       GST_DEBUG_FUNCPTR (gst_base_src_pad_check_get_range));
375   gst_pad_set_getrange_function (pad,
376       GST_DEBUG_FUNCPTR (gst_base_src_pad_get_range));
377   gst_pad_set_getcaps_function (pad, GST_DEBUG_FUNCPTR (gst_base_src_getcaps));
378   gst_pad_set_setcaps_function (pad, GST_DEBUG_FUNCPTR (gst_base_src_setcaps));
379
380   /* hold pointer to pad */
381   basesrc->srcpad = pad;
382   GST_DEBUG_OBJECT (basesrc, "adding src pad");
383   gst_element_add_pad (GST_ELEMENT (basesrc), pad);
384
385   basesrc->blocksize = DEFAULT_BLOCKSIZE;
386   basesrc->clock_id = NULL;
387   /* we operate in BYTES by default */
388   gst_base_src_set_format (basesrc, GST_FORMAT_BYTES);
389   basesrc->data.ABI.typefind = DEFAULT_TYPEFIND;
390
391   GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_STARTED);
392
393   GST_DEBUG_OBJECT (basesrc, "init done");
394 }
395
396 static void
397 gst_base_src_finalize (GObject * object)
398 {
399   GstBaseSrc *basesrc;
400   GstEvent **event_p;
401
402   basesrc = GST_BASE_SRC (object);
403
404   g_mutex_free (basesrc->live_lock);
405   g_cond_free (basesrc->live_cond);
406
407   event_p = &basesrc->data.ABI.pending_seek;
408   gst_event_replace ((GstEvent **) event_p, NULL);
409
410   G_OBJECT_CLASS (parent_class)->finalize (object);
411 }
412
413 /**
414  * gst_base_src_set_live:
415  * @src: base source instance
416  * @live: new live-mode
417  *
418  * If the element listens to a live source, @live should
419  * be set to %TRUE. 
420  *
421  * A live source will not produce data in the PAUSED state and
422  * will therefore not be able to participate in the PREROLL phase
423  * of a pipeline. To signal this fact to the application and the 
424  * pipeline, the state change return value of the live source will
425  * be GST_STATE_CHANGE_NO_PREROLL.
426  */
427 void
428 gst_base_src_set_live (GstBaseSrc * src, gboolean live)
429 {
430   GST_LIVE_LOCK (src);
431   src->is_live = live;
432   GST_LIVE_UNLOCK (src);
433 }
434
435 /**
436  * gst_base_src_is_live:
437  * @src: base source instance
438  *
439  * Check if an element is in live mode.
440  *
441  * Returns: %TRUE if element is in live mode.
442  */
443 gboolean
444 gst_base_src_is_live (GstBaseSrc * src)
445 {
446   gboolean result;
447
448   GST_LIVE_LOCK (src);
449   result = src->is_live;
450   GST_LIVE_UNLOCK (src);
451
452   return result;
453 }
454
455 /**
456  * gst_base_src_set_format:
457  * @src: base source instance
458  * @format: the format to use
459  *
460  * Sets the default format of the source. This will be the format used
461  * for sending NEW_SEGMENT events and for performing seeks.
462  *
463  * If a format of GST_FORMAT_BYTES is set, the element will be able to
464  * operate in pull mode if the #GstBaseSrc::is_seekable returns TRUE.
465  *
466  * @Since: 0.10.1
467  */
468 void
469 gst_base_src_set_format (GstBaseSrc * src, GstFormat format)
470 {
471   gst_segment_init (&src->segment, format);
472 }
473
474 static gboolean
475 gst_base_src_setcaps (GstPad * pad, GstCaps * caps)
476 {
477   GstBaseSrcClass *bclass;
478   GstBaseSrc *bsrc;
479   gboolean res = TRUE;
480
481   bsrc = GST_BASE_SRC (GST_PAD_PARENT (pad));
482   bclass = GST_BASE_SRC_GET_CLASS (bsrc);
483
484   if (bclass->set_caps)
485     res = bclass->set_caps (bsrc, caps);
486
487   return res;
488 }
489
490 static GstCaps *
491 gst_base_src_getcaps (GstPad * pad)
492 {
493   GstBaseSrcClass *bclass;
494   GstBaseSrc *bsrc;
495   GstCaps *caps = NULL;
496
497   bsrc = GST_BASE_SRC (GST_PAD_PARENT (pad));
498   bclass = GST_BASE_SRC_GET_CLASS (bsrc);
499   if (bclass->get_caps)
500     caps = bclass->get_caps (bsrc);
501
502   if (caps == NULL) {
503     GstPadTemplate *pad_template;
504
505     pad_template =
506         gst_element_class_get_pad_template (GST_ELEMENT_CLASS (bclass), "src");
507     if (pad_template != NULL) {
508       caps = gst_caps_ref (gst_pad_template_get_caps (pad_template));
509     }
510   }
511   return caps;
512 }
513
514 static gboolean
515 gst_base_src_default_query (GstBaseSrc * src, GstQuery * query)
516 {
517   gboolean res;
518
519   switch (GST_QUERY_TYPE (query)) {
520     case GST_QUERY_POSITION:
521     {
522       GstFormat format;
523
524       gst_query_parse_position (query, &format, NULL);
525       switch (format) {
526         case GST_FORMAT_PERCENT:
527         {
528           gint64 percent;
529           gint64 position;
530           gint64 duration;
531
532           position = src->segment.last_stop;
533           duration = src->segment.duration;
534
535           if (position != -1 && duration != -1) {
536             if (position < duration)
537               percent = gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX, position,
538                   duration);
539             else
540               percent = GST_FORMAT_PERCENT_MAX;
541           } else
542             percent = -1;
543
544           gst_query_set_position (query, GST_FORMAT_PERCENT, percent);
545           res = TRUE;
546           break;
547         }
548         default:
549         {
550           gint64 position;
551
552           position = src->segment.last_stop;
553
554           if (position != -1) {
555             /* convert to requested format */
556             res =
557                 gst_pad_query_convert (src->srcpad, src->segment.format,
558                 position, &format, &position);
559           } else
560             res = TRUE;
561
562           gst_query_set_position (query, format, position);
563           break;
564         }
565       }
566       break;
567     }
568     case GST_QUERY_DURATION:
569     {
570       GstFormat format;
571
572       gst_query_parse_duration (query, &format, NULL);
573       switch (format) {
574         case GST_FORMAT_PERCENT:
575           gst_query_set_duration (query, GST_FORMAT_PERCENT,
576               GST_FORMAT_PERCENT_MAX);
577           res = TRUE;
578           break;
579         default:
580         {
581           gint64 duration;
582
583           duration = src->segment.duration;
584
585           if (duration != -1) {
586             /* convert to requested format */
587             res =
588                 gst_pad_query_convert (src->srcpad, src->segment.format,
589                 duration, &format, &duration);
590           } else {
591             res = TRUE;
592           }
593           gst_query_set_duration (query, format, duration);
594           break;
595         }
596       }
597       break;
598     }
599
600     case GST_QUERY_SEEKING:
601     {
602       gst_query_set_seeking (query, src->segment.format,
603           src->seekable, 0, src->segment.duration);
604       res = TRUE;
605       break;
606     }
607     case GST_QUERY_SEGMENT:
608     {
609       gint64 start, stop;
610
611       /* no end segment configured, current duration then */
612       if ((stop = src->segment.stop) == -1)
613         stop = src->segment.duration;
614       start = src->segment.start;
615
616       /* adjust to stream time */
617       if (src->segment.time != -1) {
618         start -= src->segment.time;
619         if (stop != -1)
620           stop -= src->segment.time;
621       }
622       gst_query_set_segment (query, src->segment.rate, src->segment.format,
623           start, stop);
624       res = TRUE;
625       break;
626     }
627
628     case GST_QUERY_FORMATS:
629     {
630       gst_query_set_formats (query, 3, GST_FORMAT_DEFAULT,
631           GST_FORMAT_BYTES, GST_FORMAT_PERCENT);
632       res = TRUE;
633       break;
634     }
635     case GST_QUERY_LATENCY:
636     case GST_QUERY_JITTER:
637     case GST_QUERY_RATE:
638     case GST_QUERY_CONVERT:
639     {
640       GstFormat src_fmt, dest_fmt;
641       gint64 src_val, dest_val;
642
643       gst_query_parse_convert (query, &src_fmt, &src_val, &dest_fmt, &dest_val);
644
645       /* we can only convert between equal formats... */
646       if (src_fmt == dest_fmt) {
647         dest_val = src_val;
648         res = TRUE;
649       } else
650         res = FALSE;
651
652       gst_query_set_convert (query, src_fmt, src_val, dest_fmt, dest_val);
653       break;
654     }
655     default:
656       res = FALSE;
657       break;
658   }
659   return res;
660 }
661
662 static gboolean
663 gst_base_src_query (GstPad * pad, GstQuery * query)
664 {
665   GstBaseSrc *src;
666   GstBaseSrcClass *bclass;
667   gboolean result = FALSE;
668
669   src = GST_BASE_SRC (gst_pad_get_parent (pad));
670
671   bclass = GST_BASE_SRC_GET_CLASS (src);
672
673   if (bclass->query)
674     result = bclass->query (src, query);
675   else
676     result = gst_pad_query_default (pad, query);
677
678   gst_object_unref (src);
679
680   return result;
681 }
682
683 static gboolean
684 gst_base_src_default_do_seek (GstBaseSrc * src, GstSegment * segment)
685 {
686   gboolean res = TRUE;
687
688   /* update our offset if the start/stop position was updated */
689   if (segment->format == GST_FORMAT_BYTES) {
690     segment->last_stop = segment->start;
691     segment->time = segment->start;
692   } else if (segment->start == 0) {
693     /* seek to start, we can implement a default for this. */
694     segment->last_stop = 0;
695     segment->time = 0;
696     res = TRUE;
697   } else
698     res = FALSE;
699
700   return res;
701 }
702
703 static gboolean
704 gst_base_src_do_seek (GstBaseSrc * src, GstSegment * segment)
705 {
706   GstBaseSrcClass *bclass;
707   gboolean result = FALSE;
708
709   bclass = GST_BASE_SRC_GET_CLASS (src);
710
711   if (bclass->do_seek)
712     result = bclass->do_seek (src, segment);
713
714   return result;
715 }
716
717 /* this code implements the seeking. It is a good example
718  * handling all cases.
719  *
720  * A seek updates the currently configured segment.start
721  * and segment.stop values based on the SEEK_TYPE. If the
722  * segment.start value is updated, a seek to this new position
723  * should be performed.
724  *
725  * The seek can only be executed when we are not currently
726  * streaming any data, to make sure that this is the case, we
727  * acquire the STREAM_LOCK which is taken when we are in the
728  * _loop() function or when a getrange() is called. Normally
729  * we will not receive a seek if we are operating in pull mode
730  * though.
731  *
732  * When we are in the loop() function, we might be in the middle
733  * of pushing a buffer, which might block in a sink. To make sure
734  * that the push gets unblocked we push out a FLUSH_START event.
735  * Our loop function will get a WRONG_STATE return value from
736  * the push and will pause, effectively releasing the STREAM_LOCK.
737  *
738  * For a non-flushing seek, we pause the task, which might eventually
739  * release the STREAM_LOCK. We say eventually because when the sink
740  * blocks on the sample we might wait a very long time until the sink
741  * unblocks the sample. In any case we acquire the STREAM_LOCK and
742  * can continue the seek. A non-flushing seek is normally done in a 
743  * running pipeline to perform seamless playback.
744  * In the case of a non-flushing seek we need to make sure that the
745  * data we output after the seek is continuous with the previous data,
746  * this is because a non-flushing seek does not reset the stream-time
747  * to 0. We do this by closing the currently running segment, ie. sending
748  * a new_segment event with the stop position set to the last processed 
749  * position.
750  *
751  * After updating the segment.start/stop values, we prepare for
752  * streaming again. We push out a FLUSH_STOP to make the peer pad
753  * accept data again and we start our task again.
754  *
755  * A segment seek posts a message on the bus saying that the playback
756  * of the segment started. We store the segment flag internally because
757  * when we reach the segment.stop we have to post a segment.done
758  * instead of EOS when doing a segment seek.
759  */
760 /* FIXME, we have the unlock gboolean here because most current implementations
761  * (fdsrc, -base/gst/tcp/, ...) not only unlock when there is something to
762  * unlock
763  */
764 static gboolean
765 gst_base_src_perform_seek (GstBaseSrc * src, GstEvent * event, gboolean unlock)
766 {
767   gboolean res;
768   gdouble rate;
769   GstFormat format;
770   GstSeekFlags flags;
771   GstSeekType cur_type, stop_type;
772   gint64 cur, stop;
773   gboolean flush;
774   gboolean update;
775   GstSegment seeksegment;
776
777   GST_DEBUG_OBJECT (src, "doing seek");
778
779   if (event) {
780     gst_event_parse_seek (event, &rate, &format, &flags,
781         &cur_type, &cur, &stop_type, &stop);
782
783     /* we have to have a format as the segment format. Try to convert
784      * if not. */
785     if (src->segment.format != format) {
786       GstFormat fmt;
787
788       fmt = src->segment.format;
789       res = TRUE;
790       if (cur_type != GST_SEEK_TYPE_NONE)
791         res = gst_pad_query_convert (src->srcpad, format, cur, &fmt, &cur);
792       if (res && stop_type != GST_SEEK_TYPE_NONE)
793         res = gst_pad_query_convert (src->srcpad, format, stop, &fmt, &stop);
794       if (!res)
795         goto no_format;
796
797       format = fmt;
798     }
799   } else {
800     flags = 0;
801   }
802
803   flush = flags & GST_SEEK_FLAG_FLUSH;
804
805   /* send flush start */
806   if (flush)
807     gst_pad_push_event (src->srcpad, gst_event_new_flush_start ());
808   else
809     gst_pad_pause_task (src->srcpad);
810
811   /* unblock streaming thread */
812   if (unlock)
813     gst_base_src_unlock (src);
814
815   /* grab streaming lock, this should eventually be possible, either
816    * because the task is paused or our streaming thread stopped 
817    * because our peer is flushing. */
818   GST_PAD_STREAM_LOCK (src->srcpad);
819
820   /* make copy into temp structure, we can only update the main one
821    * when the subclass actually could do the seek. */
822   memcpy (&seeksegment, &src->segment, sizeof (GstSegment));
823
824   /* now configure the seek segment */
825   if (event) {
826     gst_segment_set_seek (&seeksegment, rate, format, flags,
827         cur_type, cur, stop_type, stop, &update);
828   }
829
830   GST_DEBUG_OBJECT (src, "segment configured from %" G_GINT64_FORMAT
831       " to %" G_GINT64_FORMAT ", position %" G_GINT64_FORMAT,
832       seeksegment.start, seeksegment.stop, seeksegment.last_stop);
833
834   /* do the seek, segment.last_stop contains new position. */
835   res = gst_base_src_do_seek (src, &seeksegment);
836
837   /* and prepare to continue streaming */
838   if (flush) {
839     /* send flush stop, peer will accept data and events again. We
840      * are not yet providing data as we still have the STREAM_LOCK. */
841     gst_pad_push_event (src->srcpad, gst_event_new_flush_stop ());
842   } else if (res && src->data.ABI.running) {
843     /* we are running the current segment and doing a non-flushing seek, 
844      * close the segment first based on the last_stop. */
845     GST_DEBUG_OBJECT (src, "closing running segment %" G_GINT64_FORMAT
846         " to %" G_GINT64_FORMAT, src->segment.start, src->segment.last_stop);
847
848     if (src->priv->close_segment)
849       gst_event_unref (src->priv->close_segment);
850     src->priv->close_segment =
851         gst_event_new_new_segment_full (TRUE,
852         src->segment.rate, src->segment.applied_rate, src->segment.format,
853         src->segment.start, src->segment.last_stop, src->segment.time);
854   }
855
856   /* if successfull seek, we update our real segment and push
857    * out the new segment. */
858   if (res) {
859     memcpy (&src->segment, &seeksegment, sizeof (GstSegment));
860
861     if (src->segment.flags & GST_SEEK_FLAG_SEGMENT) {
862       gst_element_post_message (GST_ELEMENT (src),
863           gst_message_new_segment_start (GST_OBJECT (src),
864               src->segment.format, src->segment.last_stop));
865     }
866
867     if ((stop = src->segment.stop) == -1)
868       stop = src->segment.duration;
869
870     /* now send the newsegment */
871     GST_DEBUG_OBJECT (src, "Sending newsegment from %" G_GINT64_FORMAT
872         " to %" G_GINT64_FORMAT, src->segment.start, stop);
873
874     if (src->priv->start_segment)
875       gst_event_unref (src->priv->start_segment);
876     src->priv->start_segment =
877         gst_event_new_new_segment_full (FALSE,
878         src->segment.rate, src->segment.applied_rate, src->segment.format,
879         src->segment.last_stop, stop, src->segment.time);
880   }
881
882   src->priv->discont = TRUE;
883   src->data.ABI.running = TRUE;
884   /* and restart the task in case it got paused explicitely or by
885    * the FLUSH_START event we pushed out. */
886   gst_pad_start_task (src->srcpad, (GstTaskFunction) gst_base_src_loop,
887       src->srcpad);
888
889   /* and release the lock again so we can continue streaming */
890   GST_PAD_STREAM_UNLOCK (src->srcpad);
891
892   return res;
893
894   /* ERROR */
895 no_format:
896   {
897     GST_DEBUG_OBJECT (src, "undefined format given, seek aborted.");
898     return FALSE;
899   }
900 }
901
902 /* all events send to this element directly
903  */
904 static gboolean
905 gst_base_src_send_event (GstElement * element, GstEvent * event)
906 {
907   GstBaseSrc *src;
908   gboolean result = FALSE;
909
910   src = GST_BASE_SRC (element);
911
912   switch (GST_EVENT_TYPE (event)) {
913     case GST_EVENT_FLUSH_START:
914     case GST_EVENT_FLUSH_STOP:
915       /* sending random flushes downstream can break stuff,
916        * especially sync since all segment info will get flushed */
917       break;
918     case GST_EVENT_EOS:
919       /* FIXME, queue EOS and make sure the task or pull function 
920        * perform the EOS actions. */
921       break;
922     case GST_EVENT_NEWSEGMENT:
923       /* sending random NEWSEGMENT downstream can break sync. */
924       break;
925     case GST_EVENT_TAG:
926     case GST_EVENT_BUFFERSIZE:
927       break;
928     case GST_EVENT_QOS:
929       break;
930     case GST_EVENT_SEEK:
931     {
932       gboolean started;
933
934       GST_OBJECT_LOCK (src->srcpad);
935       if (GST_PAD_ACTIVATE_MODE (src->srcpad) == GST_ACTIVATE_PULL)
936         goto wrong_mode;
937       started = GST_PAD_ACTIVATE_MODE (src->srcpad) == GST_ACTIVATE_PUSH;
938       GST_OBJECT_UNLOCK (src->srcpad);
939
940       if (started) {
941         /* when we are running in push mode, we can execute the
942          * seek right now, we need to unlock. */
943         result = gst_base_src_perform_seek (src, event, TRUE);
944       } else {
945         GstEvent **event_p;
946
947         /* else we store the event and execute the seek when we
948          * get activated */
949         GST_OBJECT_LOCK (src);
950         event_p = &src->data.ABI.pending_seek;
951         gst_event_replace ((GstEvent **) event_p, event);
952         GST_OBJECT_UNLOCK (src);
953         /* assume the seek will work */
954         result = TRUE;
955       }
956       break;
957     }
958     case GST_EVENT_NAVIGATION:
959       break;
960     default:
961       break;
962   }
963 done:
964   gst_event_unref (event);
965
966   return result;
967
968   /* ERRORS */
969 wrong_mode:
970   {
971     GST_DEBUG_OBJECT (src, "cannot perform seek when operating in pull mode");
972     GST_OBJECT_UNLOCK (src->srcpad);
973     result = FALSE;
974     goto done;
975   }
976 }
977
978 static gboolean
979 gst_base_src_default_event (GstBaseSrc * src, GstEvent * event)
980 {
981   gboolean result;
982
983   switch (GST_EVENT_TYPE (event)) {
984     case GST_EVENT_SEEK:
985       /* is normally called when in push mode */
986       if (!src->seekable)
987         goto not_seekable;
988
989       result = gst_base_src_perform_seek (src, event, TRUE);
990       break;
991     case GST_EVENT_FLUSH_START:
992       /* cancel any blocking getrange, is normally called
993        * when in pull mode. */
994       result = gst_base_src_unlock (src);
995       break;
996     case GST_EVENT_FLUSH_STOP:
997     default:
998       result = TRUE;
999       break;
1000   }
1001   return result;
1002
1003   /* ERRORS */
1004 not_seekable:
1005   {
1006     GST_DEBUG_OBJECT (src, "is not seekable");
1007     return FALSE;
1008   }
1009 }
1010
1011 static gboolean
1012 gst_base_src_event_handler (GstPad * pad, GstEvent * event)
1013 {
1014   GstBaseSrc *src;
1015   GstBaseSrcClass *bclass;
1016   gboolean result = FALSE;
1017
1018   src = GST_BASE_SRC (gst_pad_get_parent (pad));
1019   bclass = GST_BASE_SRC_GET_CLASS (src);
1020
1021   if (bclass->event) {
1022     if (!(result = bclass->event (src, event)))
1023       goto subclass_failed;
1024   }
1025
1026 done:
1027   gst_event_unref (event);
1028   gst_object_unref (src);
1029
1030   return result;
1031
1032   /* ERRORS */
1033 subclass_failed:
1034   {
1035     GST_DEBUG_OBJECT (src, "subclass refused event");
1036     goto done;
1037   }
1038 }
1039
1040 static void
1041 gst_base_src_set_property (GObject * object, guint prop_id,
1042     const GValue * value, GParamSpec * pspec)
1043 {
1044   GstBaseSrc *src;
1045
1046   src = GST_BASE_SRC (object);
1047
1048   switch (prop_id) {
1049     case PROP_BLOCKSIZE:
1050       src->blocksize = g_value_get_ulong (value);
1051       break;
1052     case PROP_NUM_BUFFERS:
1053       src->num_buffers = g_value_get_int (value);
1054       break;
1055     case PROP_TYPEFIND:
1056       src->data.ABI.typefind = g_value_get_boolean (value);
1057       break;
1058     default:
1059       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1060       break;
1061   }
1062 }
1063
1064 static void
1065 gst_base_src_get_property (GObject * object, guint prop_id, GValue * value,
1066     GParamSpec * pspec)
1067 {
1068   GstBaseSrc *src;
1069
1070   src = GST_BASE_SRC (object);
1071
1072   switch (prop_id) {
1073     case PROP_BLOCKSIZE:
1074       g_value_set_ulong (value, src->blocksize);
1075       break;
1076     case PROP_NUM_BUFFERS:
1077       g_value_set_int (value, src->num_buffers);
1078       break;
1079     case PROP_TYPEFIND:
1080       g_value_set_boolean (value, src->data.ABI.typefind);
1081       break;
1082     default:
1083       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1084       break;
1085   }
1086 }
1087
1088 /* with STREAM_LOCK and LOCK*/
1089 static GstClockReturn
1090 gst_base_src_wait (GstBaseSrc * basesrc, GstClockTime time)
1091 {
1092   GstClockReturn ret;
1093   GstClockID id;
1094   GstClock *clock;
1095
1096   /* get clock, if no clock, we don't sync */
1097   if ((clock = GST_ELEMENT_CLOCK (basesrc)) == NULL)
1098     return GST_CLOCK_OK;
1099
1100   /* clock_id should be NULL outside of this function */
1101   g_assert (basesrc->clock_id == NULL);
1102   g_assert (GST_CLOCK_TIME_IS_VALID (time));
1103
1104   id = gst_clock_new_single_shot_id (clock, time);
1105
1106   basesrc->clock_id = id;
1107   /* release the object lock while waiting */
1108   GST_OBJECT_UNLOCK (basesrc);
1109
1110   ret = gst_clock_id_wait (id, NULL);
1111
1112   GST_OBJECT_LOCK (basesrc);
1113   gst_clock_id_unref (id);
1114   basesrc->clock_id = NULL;
1115
1116   return ret;
1117 }
1118
1119 /* perform synchronisation on a buffer. 
1120  * with STREAM_LOCK.
1121  */
1122 static GstClockReturn
1123 gst_base_src_do_sync (GstBaseSrc * basesrc, GstBuffer * buffer)
1124 {
1125   GstClockReturn result;
1126   GstClockTime start, end;
1127   GstBaseSrcClass *bclass;
1128   GstClockTime base_time;
1129
1130   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
1131
1132   start = end = -1;
1133   if (bclass->get_times)
1134     bclass->get_times (basesrc, buffer, &start, &end);
1135
1136   /* if we don't have a timestamp, we don't sync */
1137   if (!GST_CLOCK_TIME_IS_VALID (start))
1138     goto invalid_start;
1139
1140   /* now do clocking */
1141   GST_OBJECT_LOCK (basesrc);
1142   base_time = GST_ELEMENT_CAST (basesrc)->base_time;
1143
1144   GST_LOG_OBJECT (basesrc,
1145       "waiting for clock, base time %" GST_TIME_FORMAT
1146       ", stream_start %" GST_TIME_FORMAT,
1147       GST_TIME_ARGS (base_time), GST_TIME_ARGS (start));
1148
1149   result = gst_base_src_wait (basesrc, start + base_time);
1150   GST_OBJECT_UNLOCK (basesrc);
1151
1152   GST_LOG_OBJECT (basesrc, "clock entry done: %d", result);
1153
1154   return result;
1155
1156   /* special cases */
1157 invalid_start:
1158   {
1159     GST_DEBUG_OBJECT (basesrc, "get_times returned invalid start");
1160     return GST_CLOCK_OK;
1161   }
1162 }
1163
1164 static gboolean
1165 gst_base_src_update_length (GstBaseSrc * src, guint64 offset, guint * length)
1166 {
1167   guint64 size, maxsize;
1168   GstBaseSrcClass *bclass;
1169
1170   bclass = GST_BASE_SRC_GET_CLASS (src);
1171
1172   /* only operate if we are working with bytes */
1173   if (src->segment.format != GST_FORMAT_BYTES)
1174     return TRUE;
1175
1176   size = (guint64) src->segment.duration;
1177
1178   /* the max amount of bytes to read is the total size or
1179    * up to the segment.stop if present. */
1180   if (src->segment.stop != -1)
1181     maxsize = MIN (size, src->segment.stop);
1182   else
1183     maxsize = size;
1184
1185   GST_DEBUG_OBJECT (src,
1186       "reading offset %" G_GUINT64_FORMAT ", length %u, size %" G_GINT64_FORMAT
1187       ", segment.stop %" G_GINT64_FORMAT ", maxsize %" G_GINT64_FORMAT, offset,
1188       *length, size, src->segment.stop, maxsize);
1189
1190   /* check size */
1191   if (maxsize != -1) {
1192     if (offset > maxsize)
1193       goto unexpected_length;
1194
1195     if (offset + *length > maxsize) {
1196       /* see if length of the file changed */
1197       if (bclass->get_size)
1198         bclass->get_size (src, &size);
1199
1200       if (src->segment.stop != -1)
1201         maxsize = MIN (size, src->segment.stop);
1202       else
1203         maxsize = size;
1204
1205       if (offset + *length > maxsize) {
1206         *length = maxsize - offset;
1207       }
1208     }
1209   }
1210   if (*length == 0)
1211     goto unexpected_length;
1212
1213   return TRUE;
1214
1215   /* ERRORS */
1216 unexpected_length:
1217   {
1218     return FALSE;
1219   }
1220 }
1221
1222 static GstFlowReturn
1223 gst_base_src_get_range (GstBaseSrc * src, guint64 offset, guint length,
1224     GstBuffer ** buf)
1225 {
1226   GstFlowReturn ret;
1227   GstBaseSrcClass *bclass;
1228   GstClockReturn status;
1229
1230   bclass = GST_BASE_SRC_GET_CLASS (src);
1231
1232   GST_LIVE_LOCK (src);
1233   if (src->is_live) {
1234     while (!src->live_running) {
1235       GST_DEBUG ("live source signal waiting");
1236       GST_LIVE_SIGNAL (src);
1237       GST_DEBUG ("live source waiting for running state");
1238       GST_LIVE_WAIT (src);
1239       GST_DEBUG ("live source unlocked");
1240     }
1241     /* FIXME, use another variable to signal stopping */
1242     GST_OBJECT_LOCK (src->srcpad);
1243     if (GST_PAD_IS_FLUSHING (src->srcpad))
1244       goto flushing;
1245     GST_OBJECT_UNLOCK (src->srcpad);
1246   }
1247   GST_LIVE_UNLOCK (src);
1248
1249   if (!GST_OBJECT_FLAG_IS_SET (src, GST_BASE_SRC_STARTED))
1250     goto not_started;
1251
1252   if (G_UNLIKELY (!bclass->create))
1253     goto no_function;
1254
1255   if (!gst_base_src_update_length (src, offset, &length))
1256     goto unexpected_length;
1257
1258   if (src->num_buffers_left == 0) {
1259     goto reached_num_buffers;
1260   } else {
1261     if (src->num_buffers_left > 0)
1262       src->num_buffers_left--;
1263   }
1264
1265   GST_DEBUG_OBJECT (src,
1266       "calling create offset %" G_GUINT64_FORMAT " %" G_GINT64_FORMAT, offset,
1267       src->segment.time);
1268
1269   ret = bclass->create (src, offset, length, buf);
1270   if (ret != GST_FLOW_OK)
1271     goto done;
1272
1273   /* no timestamp set and we are at offset 0 */
1274   if (offset == 0 && src->segment.time == 0
1275       && GST_BUFFER_TIMESTAMP (*buf) == -1)
1276     GST_BUFFER_TIMESTAMP (*buf) = 0;
1277
1278   /* now sync before pushing the buffer */
1279   status = gst_base_src_do_sync (src, *buf);
1280   switch (status) {
1281     case GST_CLOCK_EARLY:
1282       GST_DEBUG_OBJECT (src, "buffer too late!, returning anyway");
1283       break;
1284     case GST_CLOCK_OK:
1285       GST_DEBUG_OBJECT (src, "buffer ok");
1286       break;
1287     default:
1288       GST_DEBUG_OBJECT (src, "clock returned %d, not returning", status);
1289       gst_buffer_unref (*buf);
1290       *buf = NULL;
1291       ret = GST_FLOW_WRONG_STATE;
1292       break;
1293   }
1294 done:
1295   return ret;
1296
1297   /* ERROR */
1298 flushing:
1299   {
1300     GST_DEBUG_OBJECT (src, "pad is flushing");
1301     GST_OBJECT_UNLOCK (src->srcpad);
1302     GST_LIVE_UNLOCK (src);
1303     return GST_FLOW_WRONG_STATE;
1304   }
1305 not_started:
1306   {
1307     GST_DEBUG_OBJECT (src, "getrange but not started");
1308     return GST_FLOW_WRONG_STATE;
1309   }
1310 no_function:
1311   {
1312     GST_DEBUG_OBJECT (src, "no create function");
1313     return GST_FLOW_ERROR;
1314   }
1315 unexpected_length:
1316   {
1317     GST_DEBUG_OBJECT (src, "unexpected length %u (offset=%" G_GUINT64_FORMAT
1318         ", size=%" G_GINT64_FORMAT ")", length, offset, src->segment.duration);
1319     return GST_FLOW_UNEXPECTED;
1320   }
1321 reached_num_buffers:
1322   {
1323     GST_DEBUG_OBJECT (src, "sent all buffers");
1324     return GST_FLOW_UNEXPECTED;
1325   }
1326 }
1327
1328 static GstFlowReturn
1329 gst_base_src_pad_get_range (GstPad * pad, guint64 offset, guint length,
1330     GstBuffer ** buf)
1331 {
1332   GstBaseSrc *src;
1333   GstFlowReturn res;
1334
1335   src = GST_BASE_SRC (gst_pad_get_parent (pad));
1336
1337   res = gst_base_src_get_range (src, offset, length, buf);
1338
1339   gst_object_unref (src);
1340
1341   return res;
1342 }
1343
1344 static gboolean
1345 gst_base_src_pad_check_get_range (GstPad * pad)
1346 {
1347   GstBaseSrcClass *bclass;
1348   GstBaseSrc *src;
1349   gboolean res;
1350
1351   src = GST_BASE_SRC (gst_pad_get_parent (pad));
1352
1353   bclass = GST_BASE_SRC_GET_CLASS (src);
1354
1355   if (bclass->check_get_range == NULL) {
1356     GST_WARNING_OBJECT (src, "no check_get_range function set");
1357     return FALSE;
1358   }
1359
1360   res = bclass->check_get_range (src);
1361   GST_LOG_OBJECT (src, "%s() returned %d",
1362       GST_DEBUG_FUNCPTR_NAME (bclass->check_get_range), (gint) res);
1363
1364   gst_object_unref (src);
1365
1366   return res;
1367 }
1368
1369 static gboolean
1370 gst_base_src_default_check_get_range (GstBaseSrc * src)
1371 {
1372   gboolean res;
1373
1374   if (!GST_OBJECT_FLAG_IS_SET (src, GST_BASE_SRC_STARTED)) {
1375     GST_LOG_OBJECT (src, "doing start/stop to check get_range support");
1376     gst_base_src_start (src);
1377     gst_base_src_stop (src);
1378   }
1379
1380   /* we can operate in getrange mode if the native format is bytes
1381    * and we are seekable, this condition is set in the random_access
1382    * flag and is set in the _start() method. */
1383   res = src->random_access;
1384
1385   return res;
1386 }
1387
1388 static void
1389 gst_base_src_loop (GstPad * pad)
1390 {
1391   GstBaseSrc *src;
1392   GstBuffer *buf = NULL;
1393   GstFlowReturn ret;
1394   gint64 position;
1395   gboolean eos;
1396
1397   eos = FALSE;
1398
1399   src = GST_BASE_SRC (gst_pad_get_parent (pad));
1400
1401   src->priv->last_sent_eos = FALSE;
1402
1403   /* push events to close/start our segment before we do the real work. */
1404   if (src->priv->close_segment) {
1405     gst_pad_push_event (pad, src->priv->close_segment);
1406     src->priv->close_segment = NULL;
1407   }
1408   if (src->priv->start_segment) {
1409     gst_pad_push_event (pad, src->priv->start_segment);
1410     src->priv->start_segment = NULL;
1411   }
1412
1413   /* if we operate in bytes, we can calculate an offset */
1414   if (src->segment.format == GST_FORMAT_BYTES)
1415     position = src->segment.last_stop;
1416   else
1417     position = -1;
1418
1419   ret = gst_base_src_get_range (src, position, src->blocksize, &buf);
1420   if (G_UNLIKELY (ret != GST_FLOW_OK)) {
1421     GST_INFO_OBJECT (src, "pausing after gst_base_src_get_range() =  %d", ret);
1422     goto pause;
1423   }
1424   if (G_UNLIKELY (buf == NULL))
1425     goto null_buffer;
1426
1427   /* figure out the new position */
1428   switch (src->segment.format) {
1429     case GST_FORMAT_BYTES:
1430       position += GST_BUFFER_SIZE (buf);
1431       break;
1432     case GST_FORMAT_TIME:
1433     {
1434       GstClockTime start, duration;
1435
1436       start = GST_BUFFER_TIMESTAMP (buf);
1437       duration = GST_BUFFER_DURATION (buf);
1438
1439       if (GST_CLOCK_TIME_IS_VALID (start))
1440         position = start;
1441       else
1442         position = src->segment.last_stop;
1443
1444       if (GST_CLOCK_TIME_IS_VALID (duration))
1445         position += duration;
1446       break;
1447     }
1448     case GST_FORMAT_DEFAULT:
1449       position = GST_BUFFER_OFFSET_END (buf);
1450       break;
1451     default:
1452       position = -1;
1453       break;
1454   }
1455   if (position != -1) {
1456     if (src->segment.stop != -1) {
1457       if (position >= src->segment.stop) {
1458         eos = TRUE;
1459         position = src->segment.stop;
1460       }
1461     }
1462     gst_segment_set_last_stop (&src->segment, src->segment.format, position);
1463   }
1464
1465   if (G_UNLIKELY (src->priv->discont)) {
1466     buf = gst_buffer_make_metadata_writable (buf);
1467     GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_DISCONT);
1468     src->priv->discont = FALSE;
1469   }
1470
1471   ret = gst_pad_push (pad, buf);
1472   if (G_UNLIKELY (ret != GST_FLOW_OK)) {
1473     GST_INFO_OBJECT (src, "pausing after gst_pad_push() =  %d", ret);
1474     goto pause;
1475   }
1476
1477   if (eos) {
1478     GST_INFO_OBJECT (src, "pausing after EOS");
1479     ret = GST_FLOW_UNEXPECTED;
1480     goto pause;
1481   }
1482
1483 done:
1484   gst_object_unref (src);
1485   return;
1486
1487   /* special cases */
1488 pause:
1489   {
1490     const gchar *reason = gst_flow_get_name (ret);
1491
1492     GST_DEBUG_OBJECT (src, "pausing task, reason %s", reason);
1493     src->data.ABI.running = FALSE;
1494     gst_pad_pause_task (pad);
1495     if (GST_FLOW_IS_FATAL (ret) || ret == GST_FLOW_NOT_LINKED) {
1496       if (ret == GST_FLOW_UNEXPECTED) {
1497         /* perform EOS logic */
1498         if (src->segment.flags & GST_SEEK_FLAG_SEGMENT) {
1499           gst_element_post_message (GST_ELEMENT (src),
1500               gst_message_new_segment_done (GST_OBJECT (src),
1501                   src->segment.format, src->segment.last_stop));
1502         } else {
1503           gst_pad_push_event (pad, gst_event_new_eos ());
1504           src->priv->last_sent_eos = TRUE;
1505         }
1506       } else {
1507         /* for fatal errors we post an error message */
1508         GST_ELEMENT_ERROR (src, STREAM, FAILED,
1509             (_("Internal data flow error.")),
1510             ("streaming task paused, reason %s", reason));
1511         gst_pad_push_event (pad, gst_event_new_eos ());
1512         src->priv->last_sent_eos = TRUE;
1513       }
1514     }
1515     goto done;
1516   }
1517 null_buffer:
1518   {
1519     GST_ELEMENT_ERROR (src, STREAM, FAILED,
1520         (_("Internal data flow error.")), ("element returned NULL buffer"));
1521     /* we finished the segment on error */
1522     src->data.ABI.running = FALSE;
1523     gst_pad_pause_task (pad);
1524     gst_pad_push_event (pad, gst_event_new_eos ());
1525     src->priv->last_sent_eos = TRUE;
1526     goto done;
1527   }
1528 }
1529
1530 /* this will always be called between start() and stop(). So you can rely on
1531  * resources allocated by start() and freed from stop(). This needs to be added
1532  * to the docs at some point. */
1533 static gboolean
1534 gst_base_src_unlock (GstBaseSrc * basesrc)
1535 {
1536   GstBaseSrcClass *bclass;
1537   gboolean result = TRUE;
1538
1539   GST_DEBUG ("unlock");
1540   /* unblock whatever the subclass is doing */
1541   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
1542   if (bclass->unlock)
1543     result = bclass->unlock (basesrc);
1544
1545   GST_DEBUG ("unschedule clock");
1546   /* and unblock the clock as well, if any */
1547   GST_OBJECT_LOCK (basesrc);
1548   if (basesrc->clock_id) {
1549     gst_clock_id_unschedule (basesrc->clock_id);
1550   }
1551   GST_OBJECT_UNLOCK (basesrc);
1552
1553   GST_DEBUG ("unlock done");
1554
1555   return result;
1556 }
1557
1558 /* default negotiation code. 
1559  *
1560  * Take intersection between src and sink pads, take first
1561  * caps and fixate. 
1562  */
1563 static gboolean
1564 gst_base_src_default_negotiate (GstBaseSrc * basesrc)
1565 {
1566   GstCaps *thiscaps;
1567   GstCaps *caps = NULL;
1568   GstCaps *peercaps = NULL;
1569   gboolean result = FALSE;
1570
1571   /* first see what is possible on our source pad */
1572   thiscaps = gst_pad_get_caps (GST_BASE_SRC_PAD (basesrc));
1573   GST_DEBUG_OBJECT (basesrc, "caps of src: %" GST_PTR_FORMAT, thiscaps);
1574   /* nothing or anything is allowed, we're done */
1575   if (thiscaps == NULL || gst_caps_is_any (thiscaps))
1576     goto no_nego_needed;
1577
1578   /* get the peer caps */
1579   peercaps = gst_pad_peer_get_caps (GST_BASE_SRC_PAD (basesrc));
1580   GST_DEBUG_OBJECT (basesrc, "caps of peer: %" GST_PTR_FORMAT, peercaps);
1581   if (peercaps) {
1582     GstCaps *icaps;
1583
1584     /* get intersection */
1585     icaps = gst_caps_intersect (thiscaps, peercaps);
1586     GST_DEBUG_OBJECT (basesrc, "intersect: %" GST_PTR_FORMAT, icaps);
1587     gst_caps_unref (thiscaps);
1588     gst_caps_unref (peercaps);
1589     if (icaps) {
1590       /* take first (and best, since they are sorted) possibility */
1591       caps = gst_caps_copy_nth (icaps, 0);
1592       gst_caps_unref (icaps);
1593     }
1594   } else {
1595     /* no peer, work with our own caps then */
1596     caps = thiscaps;
1597   }
1598   if (caps) {
1599     caps = gst_caps_make_writable (caps);
1600     gst_caps_truncate (caps);
1601
1602     /* now fixate */
1603     if (!gst_caps_is_empty (caps)) {
1604       gst_pad_fixate_caps (GST_BASE_SRC_PAD (basesrc), caps);
1605       GST_DEBUG_OBJECT (basesrc, "fixated to: %" GST_PTR_FORMAT, caps);
1606
1607       if (gst_caps_is_any (caps)) {
1608         /* hmm, still anything, so element can do anything and
1609          * nego is not needed */
1610         result = TRUE;
1611       } else if (gst_caps_is_fixed (caps)) {
1612         /* yay, fixed caps, use those then */
1613         gst_pad_set_caps (GST_BASE_SRC_PAD (basesrc), caps);
1614         result = TRUE;
1615       }
1616     }
1617     gst_caps_unref (caps);
1618   }
1619   return result;
1620
1621 no_nego_needed:
1622   {
1623     GST_DEBUG_OBJECT (basesrc, "no negotiation needed");
1624     if (thiscaps)
1625       gst_caps_unref (thiscaps);
1626     return TRUE;
1627   }
1628 }
1629
1630 static gboolean
1631 gst_base_src_negotiate (GstBaseSrc * basesrc)
1632 {
1633   GstBaseSrcClass *bclass;
1634   gboolean result = TRUE;
1635
1636   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
1637
1638   if (bclass->negotiate)
1639     result = bclass->negotiate (basesrc);
1640
1641   return result;
1642 }
1643
1644 static gboolean
1645 gst_base_src_start (GstBaseSrc * basesrc)
1646 {
1647   GstBaseSrcClass *bclass;
1648   gboolean result;
1649   guint64 size;
1650
1651   if (GST_OBJECT_FLAG_IS_SET (basesrc, GST_BASE_SRC_STARTED))
1652     return TRUE;
1653
1654   GST_DEBUG_OBJECT (basesrc, "starting source");
1655
1656   basesrc->num_buffers_left = basesrc->num_buffers;
1657
1658   gst_segment_init (&basesrc->segment, basesrc->segment.format);
1659   basesrc->data.ABI.running = FALSE;
1660
1661   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
1662   if (bclass->start)
1663     result = bclass->start (basesrc);
1664   else
1665     result = TRUE;
1666
1667   if (!result)
1668     goto could_not_start;
1669
1670   GST_OBJECT_FLAG_SET (basesrc, GST_BASE_SRC_STARTED);
1671
1672   /* figure out the size */
1673   if (basesrc->segment.format == GST_FORMAT_BYTES) {
1674     if (bclass->get_size) {
1675       if (!(result = bclass->get_size (basesrc, &size)))
1676         size = -1;
1677     } else {
1678       result = FALSE;
1679       size = -1;
1680     }
1681     /* only update the size when operating in bytes, subclass is supposed
1682      * to set duration in the start method for other formats */
1683     gst_segment_set_duration (&basesrc->segment, GST_FORMAT_BYTES, size);
1684   }
1685
1686   GST_DEBUG_OBJECT (basesrc,
1687       "format: %d, have size: %d, size: %" G_GUINT64_FORMAT ", duration: %"
1688       G_GINT64_FORMAT, basesrc->segment.format, result, size,
1689       basesrc->segment.duration);
1690
1691   /* check if we can seek */
1692   if (bclass->is_seekable)
1693     basesrc->seekable = bclass->is_seekable (basesrc);
1694   else
1695     basesrc->seekable = FALSE;
1696
1697   GST_DEBUG_OBJECT (basesrc, "is seekable: %d", basesrc->seekable);
1698
1699   /* update for random access flag */
1700   basesrc->random_access = basesrc->seekable &&
1701       basesrc->segment.format == GST_FORMAT_BYTES;
1702
1703   GST_DEBUG_OBJECT (basesrc, "is random_access: %d", basesrc->random_access);
1704
1705   /* run typefind if we are random_access and the typefinding is enabled. */
1706   if (basesrc->random_access && basesrc->data.ABI.typefind && size != -1) {
1707     GstCaps *caps;
1708
1709     caps = gst_type_find_helper (basesrc->srcpad, size);
1710     gst_pad_set_caps (basesrc->srcpad, caps);
1711     gst_caps_unref (caps);
1712   } else {
1713     /* use class or default negotiate function */
1714     if (!gst_base_src_negotiate (basesrc))
1715       goto could_not_negotiate;
1716   }
1717
1718   return TRUE;
1719
1720   /* ERROR */
1721 could_not_start:
1722   {
1723     GST_DEBUG_OBJECT (basesrc, "could not start");
1724     return FALSE;
1725   }
1726 could_not_negotiate:
1727   {
1728     GST_DEBUG_OBJECT (basesrc, "could not negotiate, stopping");
1729     GST_ELEMENT_ERROR (basesrc, STREAM, FORMAT,
1730         ("Could not negotiate format"), ("Check your filtered caps, if any"));
1731     gst_base_src_stop (basesrc);
1732     return FALSE;
1733   }
1734 }
1735
1736 static gboolean
1737 gst_base_src_stop (GstBaseSrc * basesrc)
1738 {
1739   GstBaseSrcClass *bclass;
1740   gboolean result = TRUE;
1741
1742   if (!GST_OBJECT_FLAG_IS_SET (basesrc, GST_BASE_SRC_STARTED))
1743     return TRUE;
1744
1745   GST_DEBUG_OBJECT (basesrc, "stopping source");
1746
1747   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
1748   if (bclass->stop)
1749     result = bclass->stop (basesrc);
1750
1751   if (result)
1752     GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_STARTED);
1753
1754   return result;
1755 }
1756
1757 static gboolean
1758 gst_base_src_deactivate (GstBaseSrc * basesrc, GstPad * pad)
1759 {
1760   gboolean result;
1761
1762   GST_LIVE_LOCK (basesrc);
1763   basesrc->live_running = TRUE;
1764   GST_LIVE_SIGNAL (basesrc);
1765   GST_LIVE_UNLOCK (basesrc);
1766
1767   /* step 1, unblock clock sync (if any) */
1768   result = gst_base_src_unlock (basesrc);
1769
1770   /* step 2, make sure streaming finishes */
1771   result &= gst_pad_stop_task (pad);
1772
1773   return result;
1774 }
1775
1776 static gboolean
1777 gst_base_src_activate_push (GstPad * pad, gboolean active)
1778 {
1779   GstBaseSrc *basesrc;
1780   gboolean res;
1781
1782   basesrc = GST_BASE_SRC (GST_OBJECT_PARENT (pad));
1783
1784   /* prepare subclass first */
1785   if (active) {
1786     GstEvent *event;
1787
1788     GST_DEBUG_OBJECT (basesrc, "Activating in push mode");
1789
1790     if (!basesrc->can_activate_push)
1791       goto no_push_activation;
1792
1793     if (!gst_base_src_start (basesrc))
1794       goto error_start;
1795
1796     basesrc->priv->last_sent_eos = FALSE;
1797
1798     /* do initial seek, which will start the task */
1799     GST_OBJECT_LOCK (basesrc);
1800     event = basesrc->data.ABI.pending_seek;
1801     basesrc->data.ABI.pending_seek = NULL;
1802     GST_OBJECT_UNLOCK (basesrc);
1803
1804     /* no need to unlock anything, the task is certainly
1805      * not running here. */
1806     res = gst_base_src_perform_seek (basesrc, event, FALSE);
1807
1808     if (event)
1809       gst_event_unref (event);
1810   } else {
1811     GST_DEBUG_OBJECT (basesrc, "Deactivating in push mode");
1812     res = gst_base_src_deactivate (basesrc, pad);
1813   }
1814   return res;
1815
1816   /* ERRORS */
1817 no_push_activation:
1818   {
1819     GST_DEBUG_OBJECT (basesrc, "Subclass disabled push-mode activation");
1820     return FALSE;
1821   }
1822 error_start:
1823   {
1824     gst_base_src_stop (basesrc);
1825     GST_DEBUG_OBJECT (basesrc, "Failed to start in push mode");
1826     return FALSE;
1827   }
1828 }
1829
1830 static gboolean
1831 gst_base_src_activate_pull (GstPad * pad, gboolean active)
1832 {
1833   GstBaseSrc *basesrc;
1834
1835   basesrc = GST_BASE_SRC (GST_OBJECT_PARENT (pad));
1836
1837   /* prepare subclass first */
1838   if (active) {
1839     GST_DEBUG_OBJECT (basesrc, "Activating in pull mode");
1840     if (!gst_base_src_start (basesrc))
1841       goto error_start;
1842
1843     /* if not random_access, we cannot operate in pull mode for now */
1844     if (!basesrc->random_access) {
1845       gst_base_src_stop (basesrc);
1846       return FALSE;
1847     }
1848     return TRUE;
1849   } else {
1850     GST_DEBUG_OBJECT (basesrc, "Deactivating in pull mode");
1851
1852     /* don't send EOS when going from PAUSED => READY when in pull mode */
1853     basesrc->priv->last_sent_eos = TRUE;
1854
1855     if (!gst_base_src_stop (basesrc))
1856       goto error_stop;
1857
1858     return gst_base_src_deactivate (basesrc, pad);
1859   }
1860
1861 error_start:
1862   {
1863     gst_base_src_stop (basesrc);
1864     GST_DEBUG_OBJECT (basesrc, "Failed to start in pull mode");
1865     return FALSE;
1866   }
1867 error_stop:
1868   {
1869     GST_DEBUG_OBJECT (basesrc, "Failed to stop in pull mode");
1870     return FALSE;
1871   }
1872 }
1873
1874 static GstStateChangeReturn
1875 gst_base_src_change_state (GstElement * element, GstStateChange transition)
1876 {
1877   GstBaseSrc *basesrc;
1878   GstStateChangeReturn result;
1879   gboolean no_preroll = FALSE;
1880
1881   basesrc = GST_BASE_SRC (element);
1882
1883   switch (transition) {
1884     case GST_STATE_CHANGE_NULL_TO_READY:
1885       break;
1886     case GST_STATE_CHANGE_READY_TO_PAUSED:
1887       GST_LIVE_LOCK (element);
1888       if (basesrc->is_live) {
1889         no_preroll = TRUE;
1890         basesrc->live_running = FALSE;
1891       }
1892       basesrc->priv->last_sent_eos = FALSE;
1893       basesrc->priv->discont = TRUE;
1894       GST_LIVE_UNLOCK (element);
1895       break;
1896     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
1897       GST_LIVE_LOCK (element);
1898       if (basesrc->is_live) {
1899         basesrc->live_running = TRUE;
1900         GST_LIVE_SIGNAL (element);
1901       }
1902       GST_LIVE_UNLOCK (element);
1903       break;
1904     default:
1905       break;
1906   }
1907
1908   if ((result =
1909           GST_ELEMENT_CLASS (parent_class)->change_state (element,
1910               transition)) == GST_STATE_CHANGE_FAILURE)
1911     goto failure;
1912
1913   switch (transition) {
1914     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
1915       GST_LIVE_LOCK (element);
1916       if (basesrc->is_live) {
1917         no_preroll = TRUE;
1918         basesrc->live_running = FALSE;
1919       }
1920       GST_LIVE_UNLOCK (element);
1921       break;
1922     case GST_STATE_CHANGE_PAUSED_TO_READY:
1923     {
1924       GstEvent **event_p;
1925
1926       if (!gst_base_src_stop (basesrc))
1927         goto error_stop;
1928
1929       /* FIXME, deprecate this behaviour, it is very dangerous.
1930        * the prefered way of sending EOS downstream is by sending
1931        * the EOS event to the element */
1932       if (!basesrc->priv->last_sent_eos) {
1933         GST_DEBUG_OBJECT (basesrc, "Sending EOS event");
1934         gst_pad_push_event (basesrc->srcpad, gst_event_new_eos ());
1935         basesrc->priv->last_sent_eos = TRUE;
1936       }
1937       event_p = &basesrc->data.ABI.pending_seek;
1938       gst_event_replace (event_p, NULL);
1939       event_p = &basesrc->priv->close_segment;
1940       gst_event_replace (event_p, NULL);
1941       event_p = &basesrc->priv->start_segment;
1942       gst_event_replace (event_p, NULL);
1943       break;
1944     }
1945     case GST_STATE_CHANGE_READY_TO_NULL:
1946       break;
1947     default:
1948       break;
1949   }
1950
1951   if (no_preroll && result == GST_STATE_CHANGE_SUCCESS)
1952     result = GST_STATE_CHANGE_NO_PREROLL;
1953
1954   return result;
1955
1956   /* ERRORS */
1957 failure:
1958   {
1959     GST_DEBUG_OBJECT (basesrc, "parent failed state change");
1960     gst_base_src_stop (basesrc);
1961     return result;
1962   }
1963 error_stop:
1964   {
1965     GST_DEBUG_OBJECT (basesrc, "Failed to stop");
1966     return GST_STATE_CHANGE_FAILURE;
1967   }
1968 }