Add G_UNLIKELY in type registration.
[platform/upstream/gstreamer.git] / libs / gst / base / gstbasesrc.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *               2000,2005 Wim Taymans <wim@fluendo.com>
4  *
5  * gstbasesrc.c:
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20  * Boston, MA 02111-1307, USA.
21  */
22
23 /**
24  * SECTION:gstbasesrc
25  * @short_description: Base class for getrange based source elements
26  * @see_also: #GstPushSrc, #GstBaseTransform, #GstBaseSink
27  *
28  * This is a generice base class for source elements. The following
29  * types of sources are supported:
30  * <itemizedlist>
31  *   <listitem><para>random access sources like files</para></listitem>
32  *   <listitem><para>seekable sources</para></listitem>
33  *   <listitem><para>live sources</para></listitem>
34  * </itemizedlist>
35  *
36  * <refsect2>
37  * <para>
38  * The source can be configured to operate in any #GstFormat with the
39  * gst_base_src_set_format() method. The currently set format determines 
40  * the format of the internal #GstSegment and any #GST_EVENT_NEW_SEGMENT 
41  * events. The default format for #GstBaseSrc is #GST_FORMAT_BYTES.
42  * </para>
43  * <para>
44  * #GstBaseSrc always supports push mode scheduling. If the following
45  * conditions are met, it also supports pull mode scheduling:
46  * <itemizedlist>
47  *   <listitem><para>The format is set to GST_FORMAT_BYTES (default).</para>
48  *   </listitem>
49  *   <listitem><para>#GstBaseSrc::is_seekable returns TRUE.</para>
50  *   </listitem>
51  * </itemizedlist>
52  * </para>
53  * <para>
54  * If all the conditions are met for operating in pull mode, #GstBaseSrc is
55  * automatically seekable in push mode as well. The following conditions must 
56  * be met to make the element seekable in push mode when the format is not
57  * GST_FORMAT_BYTES:
58  * <itemizedlist>
59  *   <listitem><para>
60  *     #GstBaseSrc::is_seekable returns TRUE.
61  *   </para></listitem>
62  *   <listitem><para>
63  *     #GstBaseSrc::query can convert all supported seek formats to the
64  *     internal format as set with gst_base_src_set_format().
65  *   </para></listitem>
66  *   <listitem><para>
67  *     #GstBaseSrc::do_seek is implemented, performs the seek and returns TRUE.
68  *   </para></listitem>
69  * </itemizedlist>
70  * </para>
71  * <para>
72  * When the default format is not GST_FORMAT_BYTES, the subclass should ignore 
73  * the offset and length in the #GstBaseSrc::create method. It is recommended 
74  * to subclass #GstPushSrc instead in this situation.
75  * </para>
76  * <para>
77  * #GstBaseSrc has support for live sources. Live sources are sources that 
78  * produce data at a fixed rate, such as audio or video capture devices. A 
79  * typical live source also provides a clock to publish the rate at which 
80  * they produce data.
81  * Use gst_base_src_set_live() to activate the live source mode.
82  * </para>
83  * <para>
84  * A live source does not produce data in the PAUSED state. This means that the 
85  * #GstBaseSrc::create method will not be called in PAUSED but only in PLAYING.
86  * To signal the pipeline that the element will not produce data, the return
87  * value from the READY to PAUSED state will be GST_STATE_CHANGE_NO_PREROLL.
88  * </para>
89  * <para>
90  * A typical live source will timestamp the buffers it creates with the 
91  * current stream time of the pipeline. This is one reason why a live source
92  * can only produce data in the PLAYING state, when the clock is actually 
93  * distributed and running.
94  * </para>
95  * <para>
96  * The #GstBaseSrc::get_times method can be used to implement pseudo-live 
97  * sources. The base source will wait for the specified stream time returned in 
98  * #GstBaseSrc::get_times before pushing out the buffer. 
99  * It only makes sense to implement the ::get_times function if the source is 
100  * a live source.
101  * </para>
102  * <para>
103  * There is only support in GstBaseSrc for exactly one source pad, which 
104  * should be named "src". A source implementation (subclass of GstBaseSrc) 
105  * should install a pad template in its base_init function, like so:
106  * </para>
107  * <para>
108  * <programlisting>
109  * static void
110  * my_element_base_init (gpointer g_class)
111  * {
112  *   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (g_class);
113  *   // srctemplate should be a #GstStaticPadTemplate with direction
114  *   // #GST_PAD_SRC and name "src"
115  *   gst_element_class_add_pad_template (gstelement_class,
116  *       gst_static_pad_template_get (&amp;srctemplate));
117  *   // see #GstElementDetails
118  *   gst_element_class_set_details (gstelement_class, &amp;details);
119  * }
120  * </programlisting>
121  * </para>
122  * <title>Controlled shutdown of live sources in applications</title>
123  * <para>
124  * Applications that record from a live source may want to stop recording
125  * in a controlled way, so that the recording is stopped, but the data
126  * already in the pipeline is processed to the end (remember that many live
127  * sources would go on recording forever otherwise). For that to happen the
128  * application needs to make the source stop recording and send an EOS
129  * event down the pipeline. The application would then wait for an
130  * EOS message posted on the pipeline's bus to know when all data has
131  * been processed and the pipeline can safely be stopped.
132  * </para>
133  * <para>
134  * Since GStreamer 0.10.3 an application may simply set the source
135  * element to NULL or READY state to make it send an EOS event downstream.
136  * The application should lock the state of the source afterwards, so that
137  * shutting down the pipeline from PLAYING doesn't temporarily start up the
138  * source element for a second time:
139  * <programlisting>
140  * ...
141  * // stop recording
142  * gst_element_set_state (audio_source, GST_STATE_NULL);
143  * gst_element_set_locked_state (audio_source, TRUE);
144  * ...
145  * </programlisting>
146  * Now the application should wait for an EOS message
147  * to be posted on the pipeline's bus. Once it has received
148  * an EOS message, it may safely shut down the entire pipeline:
149  * <programlisting>
150  * ...
151  * // everything done - shut down pipeline
152  * gst_element_set_state (pipeline, GST_STATE_NULL);
153  * gst_element_set_locked_state (audio_source, FALSE);
154  * ...
155  * </programlisting>
156  * </para>
157  * <para>
158  * Note that setting the source element to NULL or READY when the 
159  * pipeline is in the PAUSED state may cause a deadlock since the streaming
160  * thread might be blocked in PREROLL.
161  * </para>
162  * <para>
163  * Last reviewed on 2006-04-14 (0.10.6)
164  * </para>
165  * </refsect2>
166  */
167
168 #ifdef HAVE_CONFIG_H
169 #  include "config.h"
170 #endif
171
172 #include <stdlib.h>
173 #include <string.h>
174
175 #include "gstbasesrc.h"
176 #include "gsttypefindhelper.h"
177 #include <gst/gstmarshal.h>
178 #include <gst/gst-i18n-lib.h>
179
180 GST_DEBUG_CATEGORY_STATIC (gst_base_src_debug);
181 #define GST_CAT_DEFAULT gst_base_src_debug
182
183 #define GST_LIVE_GET_LOCK(elem)               (GST_BASE_SRC_CAST(elem)->live_lock)
184 #define GST_LIVE_LOCK(elem)                   g_mutex_lock(GST_LIVE_GET_LOCK(elem))
185 #define GST_LIVE_TRYLOCK(elem)                g_mutex_trylock(GST_LIVE_GET_LOCK(elem))
186 #define GST_LIVE_UNLOCK(elem)                 g_mutex_unlock(GST_LIVE_GET_LOCK(elem))
187 #define GST_LIVE_GET_COND(elem)               (GST_BASE_SRC_CAST(elem)->live_cond)
188 #define GST_LIVE_WAIT(elem)                   g_cond_wait (GST_LIVE_GET_COND (elem), GST_LIVE_GET_LOCK (elem))
189 #define GST_LIVE_TIMED_WAIT(elem, timeval)    g_cond_timed_wait (GST_LIVE_GET_COND (elem), GST_LIVE_GET_LOCK (elem),\
190                                                                                 timeval)
191 #define GST_LIVE_SIGNAL(elem)                 g_cond_signal (GST_LIVE_GET_COND (elem));
192 #define GST_LIVE_BROADCAST(elem)              g_cond_broadcast (GST_LIVE_GET_COND (elem));
193
194 /* BaseSrc signals and args */
195 enum
196 {
197   /* FILL ME */
198   LAST_SIGNAL
199 };
200
201 #define DEFAULT_BLOCKSIZE       4096
202 #define DEFAULT_NUM_BUFFERS     -1
203 #define DEFAULT_TYPEFIND        FALSE
204
205 enum
206 {
207   PROP_0,
208   PROP_BLOCKSIZE,
209   PROP_NUM_BUFFERS,
210   PROP_TYPEFIND,
211 };
212
213 #define GST_BASE_SRC_GET_PRIVATE(obj)  \
214    (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_BASE_SRC, GstBaseSrcPrivate))
215
216 struct _GstBaseSrcPrivate
217 {
218   gboolean last_sent_eos;       /* last thing we did was send an EOS (we set this
219                                  * to avoid the sending of two EOS in some cases) */
220   gboolean discont;
221
222   /* two segments to be sent in the streaming thread with STREAM_LOCK */
223   GstEvent *close_segment;
224   GstEvent *start_segment;
225 };
226
227 static GstElementClass *parent_class = NULL;
228
229 static void gst_base_src_base_init (gpointer g_class);
230 static void gst_base_src_class_init (GstBaseSrcClass * klass);
231 static void gst_base_src_init (GstBaseSrc * src, gpointer g_class);
232 static void gst_base_src_finalize (GObject * object);
233
234
235 GType
236 gst_base_src_get_type (void)
237 {
238   static GType base_src_type = 0;
239
240   if (G_UNLIKELY (base_src_type == 0)) {
241     static const GTypeInfo base_src_info = {
242       sizeof (GstBaseSrcClass),
243       (GBaseInitFunc) gst_base_src_base_init,
244       NULL,
245       (GClassInitFunc) gst_base_src_class_init,
246       NULL,
247       NULL,
248       sizeof (GstBaseSrc),
249       0,
250       (GInstanceInitFunc) gst_base_src_init,
251     };
252
253     base_src_type = g_type_register_static (GST_TYPE_ELEMENT,
254         "GstBaseSrc", &base_src_info, G_TYPE_FLAG_ABSTRACT);
255   }
256   return base_src_type;
257 }
258 static GstCaps *gst_base_src_getcaps (GstPad * pad);
259 static gboolean gst_base_src_setcaps (GstPad * pad, GstCaps * caps);
260
261 static gboolean gst_base_src_activate_push (GstPad * pad, gboolean active);
262 static gboolean gst_base_src_activate_pull (GstPad * pad, gboolean active);
263 static void gst_base_src_set_property (GObject * object, guint prop_id,
264     const GValue * value, GParamSpec * pspec);
265 static void gst_base_src_get_property (GObject * object, guint prop_id,
266     GValue * value, GParamSpec * pspec);
267 static gboolean gst_base_src_event_handler (GstPad * pad, GstEvent * event);
268 static gboolean gst_base_src_send_event (GstElement * elem, GstEvent * event);
269 static gboolean gst_base_src_default_event (GstBaseSrc * src, GstEvent * event);
270 static const GstQueryType *gst_base_src_get_query_types (GstElement * element);
271
272 static gboolean gst_base_src_query (GstPad * pad, GstQuery * query);
273
274 static gboolean gst_base_src_default_negotiate (GstBaseSrc * basesrc);
275 static gboolean gst_base_src_default_do_seek (GstBaseSrc * src,
276     GstSegment * segment);
277 static gboolean gst_base_src_default_query (GstBaseSrc * src, GstQuery * query);
278
279 static gboolean gst_base_src_unlock (GstBaseSrc * basesrc);
280 static gboolean gst_base_src_start (GstBaseSrc * basesrc);
281 static gboolean gst_base_src_stop (GstBaseSrc * basesrc);
282
283 static GstStateChangeReturn gst_base_src_change_state (GstElement * element,
284     GstStateChange transition);
285
286 static void gst_base_src_loop (GstPad * pad);
287 static gboolean gst_base_src_pad_check_get_range (GstPad * pad);
288 static gboolean gst_base_src_default_check_get_range (GstBaseSrc * bsrc);
289 static GstFlowReturn gst_base_src_pad_get_range (GstPad * pad, guint64 offset,
290     guint length, GstBuffer ** buf);
291 static GstFlowReturn gst_base_src_get_range (GstBaseSrc * src, guint64 offset,
292     guint length, GstBuffer ** buf);
293
294 static void
295 gst_base_src_base_init (gpointer g_class)
296 {
297   GST_DEBUG_CATEGORY_INIT (gst_base_src_debug, "basesrc", 0, "basesrc element");
298 }
299
300 static void
301 gst_base_src_class_init (GstBaseSrcClass * klass)
302 {
303   GObjectClass *gobject_class;
304   GstElementClass *gstelement_class;
305
306   gobject_class = G_OBJECT_CLASS (klass);
307   gstelement_class = GST_ELEMENT_CLASS (klass);
308
309   g_type_class_add_private (klass, sizeof (GstBaseSrcPrivate));
310
311   parent_class = g_type_class_peek_parent (klass);
312
313   gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_base_src_finalize);
314   gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_base_src_set_property);
315   gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_base_src_get_property);
316
317   g_object_class_install_property (gobject_class, PROP_BLOCKSIZE,
318       g_param_spec_ulong ("blocksize", "Block size",
319           "Size in bytes to read per buffer", 1, G_MAXULONG, DEFAULT_BLOCKSIZE,
320           G_PARAM_READWRITE));
321   g_object_class_install_property (gobject_class, PROP_NUM_BUFFERS,
322       g_param_spec_int ("num-buffers", "num-buffers",
323           "Number of buffers to output before sending EOS", -1, G_MAXINT,
324           DEFAULT_NUM_BUFFERS, G_PARAM_READWRITE));
325   g_object_class_install_property (gobject_class, PROP_TYPEFIND,
326       g_param_spec_boolean ("typefind", "Typefind",
327           "Run typefind before negotiating", DEFAULT_TYPEFIND,
328           G_PARAM_READWRITE));
329
330   gstelement_class->change_state =
331       GST_DEBUG_FUNCPTR (gst_base_src_change_state);
332   gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_base_src_send_event);
333   gstelement_class->get_query_types =
334       GST_DEBUG_FUNCPTR (gst_base_src_get_query_types);
335
336   klass->negotiate = GST_DEBUG_FUNCPTR (gst_base_src_default_negotiate);
337   klass->event = GST_DEBUG_FUNCPTR (gst_base_src_default_event);
338   klass->do_seek = GST_DEBUG_FUNCPTR (gst_base_src_default_do_seek);
339   klass->query = GST_DEBUG_FUNCPTR (gst_base_src_default_query);
340   klass->check_get_range =
341       GST_DEBUG_FUNCPTR (gst_base_src_default_check_get_range);
342 }
343
344 static void
345 gst_base_src_init (GstBaseSrc * basesrc, gpointer g_class)
346 {
347   GstPad *pad;
348   GstPadTemplate *pad_template;
349
350   basesrc->priv = GST_BASE_SRC_GET_PRIVATE (basesrc);
351
352   basesrc->is_live = FALSE;
353   basesrc->live_lock = g_mutex_new ();
354   basesrc->live_cond = g_cond_new ();
355   basesrc->num_buffers = DEFAULT_NUM_BUFFERS;
356   basesrc->num_buffers_left = -1;
357
358   basesrc->can_activate_push = TRUE;
359   basesrc->pad_mode = GST_ACTIVATE_NONE;
360
361   pad_template =
362       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (g_class), "src");
363   g_return_if_fail (pad_template != NULL);
364
365   GST_DEBUG_OBJECT (basesrc, "creating src pad");
366   pad = gst_pad_new_from_template (pad_template, "src");
367
368   GST_DEBUG_OBJECT (basesrc, "setting functions on src pad");
369   gst_pad_set_activatepush_function (pad,
370       GST_DEBUG_FUNCPTR (gst_base_src_activate_push));
371   gst_pad_set_activatepull_function (pad,
372       GST_DEBUG_FUNCPTR (gst_base_src_activate_pull));
373   gst_pad_set_event_function (pad,
374       GST_DEBUG_FUNCPTR (gst_base_src_event_handler));
375   gst_pad_set_query_function (pad, GST_DEBUG_FUNCPTR (gst_base_src_query));
376   gst_pad_set_checkgetrange_function (pad,
377       GST_DEBUG_FUNCPTR (gst_base_src_pad_check_get_range));
378   gst_pad_set_getrange_function (pad,
379       GST_DEBUG_FUNCPTR (gst_base_src_pad_get_range));
380   gst_pad_set_getcaps_function (pad, GST_DEBUG_FUNCPTR (gst_base_src_getcaps));
381   gst_pad_set_setcaps_function (pad, GST_DEBUG_FUNCPTR (gst_base_src_setcaps));
382
383   /* hold pointer to pad */
384   basesrc->srcpad = pad;
385   GST_DEBUG_OBJECT (basesrc, "adding src pad");
386   gst_element_add_pad (GST_ELEMENT (basesrc), pad);
387
388   basesrc->blocksize = DEFAULT_BLOCKSIZE;
389   basesrc->clock_id = NULL;
390   /* we operate in BYTES by default */
391   gst_base_src_set_format (basesrc, GST_FORMAT_BYTES);
392   basesrc->data.ABI.typefind = DEFAULT_TYPEFIND;
393
394   GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_STARTED);
395
396   GST_DEBUG_OBJECT (basesrc, "init done");
397 }
398
399 static void
400 gst_base_src_finalize (GObject * object)
401 {
402   GstBaseSrc *basesrc;
403   GstEvent **event_p;
404
405   basesrc = GST_BASE_SRC (object);
406
407   g_mutex_free (basesrc->live_lock);
408   g_cond_free (basesrc->live_cond);
409
410   event_p = &basesrc->data.ABI.pending_seek;
411   gst_event_replace ((GstEvent **) event_p, NULL);
412
413   G_OBJECT_CLASS (parent_class)->finalize (object);
414 }
415
416 /**
417  * gst_base_src_set_live:
418  * @src: base source instance
419  * @live: new live-mode
420  *
421  * If the element listens to a live source, @live should
422  * be set to %TRUE. 
423  *
424  * A live source will not produce data in the PAUSED state and
425  * will therefore not be able to participate in the PREROLL phase
426  * of a pipeline. To signal this fact to the application and the 
427  * pipeline, the state change return value of the live source will
428  * be GST_STATE_CHANGE_NO_PREROLL.
429  */
430 void
431 gst_base_src_set_live (GstBaseSrc * src, gboolean live)
432 {
433   GST_LIVE_LOCK (src);
434   src->is_live = live;
435   GST_LIVE_UNLOCK (src);
436 }
437
438 /**
439  * gst_base_src_is_live:
440  * @src: base source instance
441  *
442  * Check if an element is in live mode.
443  *
444  * Returns: %TRUE if element is in live mode.
445  */
446 gboolean
447 gst_base_src_is_live (GstBaseSrc * src)
448 {
449   gboolean result;
450
451   GST_LIVE_LOCK (src);
452   result = src->is_live;
453   GST_LIVE_UNLOCK (src);
454
455   return result;
456 }
457
458 /**
459  * gst_base_src_set_format:
460  * @src: base source instance
461  * @format: the format to use
462  *
463  * Sets the default format of the source. This will be the format used
464  * for sending NEW_SEGMENT events and for performing seeks.
465  *
466  * If a format of GST_FORMAT_BYTES is set, the element will be able to
467  * operate in pull mode if the #GstBaseSrc::is_seekable returns TRUE.
468  *
469  * @Since: 0.10.1
470  */
471 void
472 gst_base_src_set_format (GstBaseSrc * src, GstFormat format)
473 {
474   gst_segment_init (&src->segment, format);
475 }
476
477 static gboolean
478 gst_base_src_setcaps (GstPad * pad, GstCaps * caps)
479 {
480   GstBaseSrcClass *bclass;
481   GstBaseSrc *bsrc;
482   gboolean res = TRUE;
483
484   bsrc = GST_BASE_SRC (GST_PAD_PARENT (pad));
485   bclass = GST_BASE_SRC_GET_CLASS (bsrc);
486
487   if (bclass->set_caps)
488     res = bclass->set_caps (bsrc, caps);
489
490   return res;
491 }
492
493 static GstCaps *
494 gst_base_src_getcaps (GstPad * pad)
495 {
496   GstBaseSrcClass *bclass;
497   GstBaseSrc *bsrc;
498   GstCaps *caps = NULL;
499
500   bsrc = GST_BASE_SRC (GST_PAD_PARENT (pad));
501   bclass = GST_BASE_SRC_GET_CLASS (bsrc);
502   if (bclass->get_caps)
503     caps = bclass->get_caps (bsrc);
504
505   if (caps == NULL) {
506     GstPadTemplate *pad_template;
507
508     pad_template =
509         gst_element_class_get_pad_template (GST_ELEMENT_CLASS (bclass), "src");
510     if (pad_template != NULL) {
511       caps = gst_caps_ref (gst_pad_template_get_caps (pad_template));
512     }
513   }
514   return caps;
515 }
516
517 static gboolean
518 gst_base_src_default_query (GstBaseSrc * src, GstQuery * query)
519 {
520   gboolean res;
521
522   switch (GST_QUERY_TYPE (query)) {
523     case GST_QUERY_POSITION:
524     {
525       GstFormat format;
526
527       gst_query_parse_position (query, &format, NULL);
528       switch (format) {
529         case GST_FORMAT_PERCENT:
530         {
531           gint64 percent;
532           gint64 position;
533           gint64 duration;
534
535           position = src->segment.last_stop;
536           duration = src->segment.duration;
537
538           if (position != -1 && duration != -1) {
539             if (position < duration)
540               percent = gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX, position,
541                   duration);
542             else
543               percent = GST_FORMAT_PERCENT_MAX;
544           } else
545             percent = -1;
546
547           gst_query_set_position (query, GST_FORMAT_PERCENT, percent);
548           res = TRUE;
549           break;
550         }
551         default:
552         {
553           gint64 position;
554
555           position = src->segment.last_stop;
556
557           if (position != -1) {
558             /* convert to requested format */
559             res =
560                 gst_pad_query_convert (src->srcpad, src->segment.format,
561                 position, &format, &position);
562           } else
563             res = TRUE;
564
565           gst_query_set_position (query, format, position);
566           break;
567         }
568       }
569       break;
570     }
571     case GST_QUERY_DURATION:
572     {
573       GstFormat format;
574
575       gst_query_parse_duration (query, &format, NULL);
576       switch (format) {
577         case GST_FORMAT_PERCENT:
578           gst_query_set_duration (query, GST_FORMAT_PERCENT,
579               GST_FORMAT_PERCENT_MAX);
580           res = TRUE;
581           break;
582         default:
583         {
584           gint64 duration;
585
586           duration = src->segment.duration;
587
588           if (duration != -1) {
589             /* convert to requested format */
590             res =
591                 gst_pad_query_convert (src->srcpad, src->segment.format,
592                 duration, &format, &duration);
593           } else {
594             res = TRUE;
595           }
596           gst_query_set_duration (query, format, duration);
597           break;
598         }
599       }
600       break;
601     }
602
603     case GST_QUERY_SEEKING:
604     {
605       gst_query_set_seeking (query, src->segment.format,
606           src->seekable, 0, src->segment.duration);
607       res = TRUE;
608       break;
609     }
610     case GST_QUERY_SEGMENT:
611     {
612       gint64 start, stop;
613
614       /* no end segment configured, current duration then */
615       if ((stop = src->segment.stop) == -1)
616         stop = src->segment.duration;
617       start = src->segment.start;
618
619       /* adjust to stream time */
620       if (src->segment.time != -1) {
621         start -= src->segment.time;
622         if (stop != -1)
623           stop -= src->segment.time;
624       }
625       gst_query_set_segment (query, src->segment.rate, src->segment.format,
626           start, stop);
627       res = TRUE;
628       break;
629     }
630
631     case GST_QUERY_FORMATS:
632     {
633       gst_query_set_formats (query, 3, GST_FORMAT_DEFAULT,
634           GST_FORMAT_BYTES, GST_FORMAT_PERCENT);
635       res = TRUE;
636       break;
637     }
638     case GST_QUERY_LATENCY:
639     case GST_QUERY_JITTER:
640     case GST_QUERY_RATE:
641     case GST_QUERY_CONVERT:
642     {
643       GstFormat src_fmt, dest_fmt;
644       gint64 src_val, dest_val;
645
646       gst_query_parse_convert (query, &src_fmt, &src_val, &dest_fmt, &dest_val);
647
648       /* we can only convert between equal formats... */
649       if (src_fmt == dest_fmt) {
650         dest_val = src_val;
651         res = TRUE;
652       } else
653         res = FALSE;
654
655       gst_query_set_convert (query, src_fmt, src_val, dest_fmt, dest_val);
656       break;
657     }
658     default:
659       res = FALSE;
660       break;
661   }
662   return res;
663 }
664
665 static gboolean
666 gst_base_src_query (GstPad * pad, GstQuery * query)
667 {
668   GstBaseSrc *src;
669   GstBaseSrcClass *bclass;
670   gboolean result = FALSE;
671
672   src = GST_BASE_SRC (gst_pad_get_parent (pad));
673
674   bclass = GST_BASE_SRC_GET_CLASS (src);
675
676   if (bclass->query)
677     result = bclass->query (src, query);
678   else
679     result = gst_pad_query_default (pad, query);
680
681   gst_object_unref (src);
682
683   return result;
684 }
685
686 static gboolean
687 gst_base_src_default_do_seek (GstBaseSrc * src, GstSegment * segment)
688 {
689   gboolean res = TRUE;
690
691   /* update our offset if the start/stop position was updated */
692   if (segment->format == GST_FORMAT_BYTES) {
693     segment->last_stop = segment->start;
694     segment->time = segment->start;
695   } else if (segment->start == 0) {
696     /* seek to start, we can implement a default for this. */
697     segment->last_stop = 0;
698     segment->time = 0;
699     res = TRUE;
700   } else
701     res = FALSE;
702
703   return res;
704 }
705
706 static gboolean
707 gst_base_src_do_seek (GstBaseSrc * src, GstSegment * segment)
708 {
709   GstBaseSrcClass *bclass;
710   gboolean result = FALSE;
711
712   bclass = GST_BASE_SRC_GET_CLASS (src);
713
714   if (bclass->do_seek)
715     result = bclass->do_seek (src, segment);
716
717   return result;
718 }
719
720 /* this code implements the seeking. It is a good example
721  * handling all cases.
722  *
723  * A seek updates the currently configured segment.start
724  * and segment.stop values based on the SEEK_TYPE. If the
725  * segment.start value is updated, a seek to this new position
726  * should be performed.
727  *
728  * The seek can only be executed when we are not currently
729  * streaming any data, to make sure that this is the case, we
730  * acquire the STREAM_LOCK which is taken when we are in the
731  * _loop() function or when a getrange() is called. Normally
732  * we will not receive a seek if we are operating in pull mode
733  * though.
734  *
735  * When we are in the loop() function, we might be in the middle
736  * of pushing a buffer, which might block in a sink. To make sure
737  * that the push gets unblocked we push out a FLUSH_START event.
738  * Our loop function will get a WRONG_STATE return value from
739  * the push and will pause, effectively releasing the STREAM_LOCK.
740  *
741  * For a non-flushing seek, we pause the task, which might eventually
742  * release the STREAM_LOCK. We say eventually because when the sink
743  * blocks on the sample we might wait a very long time until the sink
744  * unblocks the sample. In any case we acquire the STREAM_LOCK and
745  * can continue the seek. A non-flushing seek is normally done in a 
746  * running pipeline to perform seamless playback.
747  * In the case of a non-flushing seek we need to make sure that the
748  * data we output after the seek is continuous with the previous data,
749  * this is because a non-flushing seek does not reset the stream-time
750  * to 0. We do this by closing the currently running segment, ie. sending
751  * a new_segment event with the stop position set to the last processed 
752  * position.
753  *
754  * After updating the segment.start/stop values, we prepare for
755  * streaming again. We push out a FLUSH_STOP to make the peer pad
756  * accept data again and we start our task again.
757  *
758  * A segment seek posts a message on the bus saying that the playback
759  * of the segment started. We store the segment flag internally because
760  * when we reach the segment.stop we have to post a segment.done
761  * instead of EOS when doing a segment seek.
762  */
763 /* FIXME, we have the unlock gboolean here because most current implementations
764  * (fdsrc, -base/gst/tcp/, ...) not only unlock when there is something to
765  * unlock
766  */
767 static gboolean
768 gst_base_src_perform_seek (GstBaseSrc * src, GstEvent * event, gboolean unlock)
769 {
770   gboolean res;
771   gdouble rate;
772   GstFormat format;
773   GstSeekFlags flags;
774   GstSeekType cur_type, stop_type;
775   gint64 cur, stop;
776   gboolean flush;
777   gboolean update;
778   GstSegment seeksegment;
779
780   GST_DEBUG_OBJECT (src, "doing seek");
781
782   if (event) {
783     gst_event_parse_seek (event, &rate, &format, &flags,
784         &cur_type, &cur, &stop_type, &stop);
785
786     /* we have to have a format as the segment format. Try to convert
787      * if not. */
788     if (src->segment.format != format) {
789       GstFormat fmt;
790
791       fmt = src->segment.format;
792       res = TRUE;
793       if (cur_type != GST_SEEK_TYPE_NONE)
794         res = gst_pad_query_convert (src->srcpad, format, cur, &fmt, &cur);
795       if (res && stop_type != GST_SEEK_TYPE_NONE)
796         res = gst_pad_query_convert (src->srcpad, format, stop, &fmt, &stop);
797       if (!res)
798         goto no_format;
799
800       format = fmt;
801     }
802   } else {
803     flags = 0;
804   }
805
806   flush = flags & GST_SEEK_FLAG_FLUSH;
807
808   /* send flush start */
809   if (flush)
810     gst_pad_push_event (src->srcpad, gst_event_new_flush_start ());
811   else
812     gst_pad_pause_task (src->srcpad);
813
814   /* unblock streaming thread */
815   if (unlock)
816     gst_base_src_unlock (src);
817
818   /* grab streaming lock, this should eventually be possible, either
819    * because the task is paused or our streaming thread stopped 
820    * because our peer is flushing. */
821   GST_PAD_STREAM_LOCK (src->srcpad);
822
823   /* make copy into temp structure, we can only update the main one
824    * when the subclass actually could do the seek. */
825   memcpy (&seeksegment, &src->segment, sizeof (GstSegment));
826
827   /* now configure the seek segment */
828   if (event) {
829     gst_segment_set_seek (&seeksegment, rate, format, flags,
830         cur_type, cur, stop_type, stop, &update);
831   }
832
833   GST_DEBUG_OBJECT (src, "segment configured from %" G_GINT64_FORMAT
834       " to %" G_GINT64_FORMAT ", position %" G_GINT64_FORMAT,
835       seeksegment.start, seeksegment.stop, seeksegment.last_stop);
836
837   /* do the seek, segment.last_stop contains new position. */
838   res = gst_base_src_do_seek (src, &seeksegment);
839
840   /* and prepare to continue streaming */
841   if (flush) {
842     /* send flush stop, peer will accept data and events again. We
843      * are not yet providing data as we still have the STREAM_LOCK. */
844     gst_pad_push_event (src->srcpad, gst_event_new_flush_stop ());
845   } else if (res && src->data.ABI.running) {
846     /* we are running the current segment and doing a non-flushing seek, 
847      * close the segment first based on the last_stop. */
848     GST_DEBUG_OBJECT (src, "closing running segment %" G_GINT64_FORMAT
849         " to %" G_GINT64_FORMAT, src->segment.start, src->segment.last_stop);
850
851     if (src->priv->close_segment)
852       gst_event_unref (src->priv->close_segment);
853     src->priv->close_segment =
854         gst_event_new_new_segment_full (TRUE,
855         src->segment.rate, src->segment.applied_rate, src->segment.format,
856         src->segment.start, src->segment.last_stop, src->segment.time);
857   }
858
859   /* if successfull seek, we update our real segment and push
860    * out the new segment. */
861   if (res) {
862     memcpy (&src->segment, &seeksegment, sizeof (GstSegment));
863
864     if (src->segment.flags & GST_SEEK_FLAG_SEGMENT) {
865       gst_element_post_message (GST_ELEMENT (src),
866           gst_message_new_segment_start (GST_OBJECT (src),
867               src->segment.format, src->segment.last_stop));
868     }
869
870     if ((stop = src->segment.stop) == -1)
871       stop = src->segment.duration;
872
873     /* now send the newsegment */
874     GST_DEBUG_OBJECT (src, "Sending newsegment from %" G_GINT64_FORMAT
875         " to %" G_GINT64_FORMAT, src->segment.start, stop);
876
877     if (src->priv->start_segment)
878       gst_event_unref (src->priv->start_segment);
879     src->priv->start_segment =
880         gst_event_new_new_segment_full (FALSE,
881         src->segment.rate, src->segment.applied_rate, src->segment.format,
882         src->segment.last_stop, stop, src->segment.time);
883   }
884
885   src->priv->discont = TRUE;
886   src->data.ABI.running = TRUE;
887   /* and restart the task in case it got paused explicitely or by
888    * the FLUSH_START event we pushed out. */
889   gst_pad_start_task (src->srcpad, (GstTaskFunction) gst_base_src_loop,
890       src->srcpad);
891
892   /* and release the lock again so we can continue streaming */
893   GST_PAD_STREAM_UNLOCK (src->srcpad);
894
895   return res;
896
897   /* ERROR */
898 no_format:
899   {
900     GST_DEBUG_OBJECT (src, "undefined format given, seek aborted.");
901     return FALSE;
902   }
903 }
904
905 static const GstQueryType *
906 gst_base_src_get_query_types (GstElement * element)
907 {
908   static const GstQueryType query_types[] = {
909     GST_QUERY_DURATION,
910     GST_QUERY_POSITION,
911     GST_QUERY_SEEKING,
912     GST_QUERY_SEGMENT,
913     GST_QUERY_FORMATS,
914     GST_QUERY_LATENCY,
915     GST_QUERY_JITTER,
916     GST_QUERY_RATE,
917     GST_QUERY_CONVERT,
918     0
919   };
920
921   return query_types;
922 }
923
924 /* all events send to this element directly
925  */
926 static gboolean
927 gst_base_src_send_event (GstElement * element, GstEvent * event)
928 {
929   GstBaseSrc *src;
930   gboolean result = FALSE;
931
932   src = GST_BASE_SRC (element);
933
934   switch (GST_EVENT_TYPE (event)) {
935     case GST_EVENT_FLUSH_START:
936     case GST_EVENT_FLUSH_STOP:
937       /* sending random flushes downstream can break stuff,
938        * especially sync since all segment info will get flushed */
939       break;
940     case GST_EVENT_EOS:
941       /* FIXME, queue EOS and make sure the task or pull function 
942        * perform the EOS actions. */
943       break;
944     case GST_EVENT_NEWSEGMENT:
945       /* sending random NEWSEGMENT downstream can break sync. */
946       break;
947     case GST_EVENT_TAG:
948     case GST_EVENT_BUFFERSIZE:
949       break;
950     case GST_EVENT_QOS:
951       break;
952     case GST_EVENT_SEEK:
953     {
954       gboolean started;
955
956       GST_OBJECT_LOCK (src->srcpad);
957       if (GST_PAD_ACTIVATE_MODE (src->srcpad) == GST_ACTIVATE_PULL)
958         goto wrong_mode;
959       started = GST_PAD_ACTIVATE_MODE (src->srcpad) == GST_ACTIVATE_PUSH;
960       GST_OBJECT_UNLOCK (src->srcpad);
961
962       if (started) {
963         /* when we are running in push mode, we can execute the
964          * seek right now, we need to unlock. */
965         result = gst_base_src_perform_seek (src, event, TRUE);
966       } else {
967         GstEvent **event_p;
968
969         /* else we store the event and execute the seek when we
970          * get activated */
971         GST_OBJECT_LOCK (src);
972         event_p = &src->data.ABI.pending_seek;
973         gst_event_replace ((GstEvent **) event_p, event);
974         GST_OBJECT_UNLOCK (src);
975         /* assume the seek will work */
976         result = TRUE;
977       }
978       break;
979     }
980     case GST_EVENT_NAVIGATION:
981       break;
982     default:
983       break;
984   }
985 done:
986   gst_event_unref (event);
987
988   return result;
989
990   /* ERRORS */
991 wrong_mode:
992   {
993     GST_DEBUG_OBJECT (src, "cannot perform seek when operating in pull mode");
994     GST_OBJECT_UNLOCK (src->srcpad);
995     result = FALSE;
996     goto done;
997   }
998 }
999
1000 static gboolean
1001 gst_base_src_default_event (GstBaseSrc * src, GstEvent * event)
1002 {
1003   gboolean result;
1004
1005   switch (GST_EVENT_TYPE (event)) {
1006     case GST_EVENT_SEEK:
1007       /* is normally called when in push mode */
1008       if (!src->seekable)
1009         goto not_seekable;
1010
1011       result = gst_base_src_perform_seek (src, event, TRUE);
1012       break;
1013     case GST_EVENT_FLUSH_START:
1014       /* cancel any blocking getrange, is normally called
1015        * when in pull mode. */
1016       result = gst_base_src_unlock (src);
1017       break;
1018     case GST_EVENT_FLUSH_STOP:
1019     default:
1020       result = TRUE;
1021       break;
1022   }
1023   return result;
1024
1025   /* ERRORS */
1026 not_seekable:
1027   {
1028     GST_DEBUG_OBJECT (src, "is not seekable");
1029     return FALSE;
1030   }
1031 }
1032
1033 static gboolean
1034 gst_base_src_event_handler (GstPad * pad, GstEvent * event)
1035 {
1036   GstBaseSrc *src;
1037   GstBaseSrcClass *bclass;
1038   gboolean result = FALSE;
1039
1040   src = GST_BASE_SRC (gst_pad_get_parent (pad));
1041   bclass = GST_BASE_SRC_GET_CLASS (src);
1042
1043   if (bclass->event) {
1044     if (!(result = bclass->event (src, event)))
1045       goto subclass_failed;
1046   }
1047
1048 done:
1049   gst_event_unref (event);
1050   gst_object_unref (src);
1051
1052   return result;
1053
1054   /* ERRORS */
1055 subclass_failed:
1056   {
1057     GST_DEBUG_OBJECT (src, "subclass refused event");
1058     goto done;
1059   }
1060 }
1061
1062 static void
1063 gst_base_src_set_property (GObject * object, guint prop_id,
1064     const GValue * value, GParamSpec * pspec)
1065 {
1066   GstBaseSrc *src;
1067
1068   src = GST_BASE_SRC (object);
1069
1070   switch (prop_id) {
1071     case PROP_BLOCKSIZE:
1072       src->blocksize = g_value_get_ulong (value);
1073       break;
1074     case PROP_NUM_BUFFERS:
1075       src->num_buffers = g_value_get_int (value);
1076       break;
1077     case PROP_TYPEFIND:
1078       src->data.ABI.typefind = g_value_get_boolean (value);
1079       break;
1080     default:
1081       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1082       break;
1083   }
1084 }
1085
1086 static void
1087 gst_base_src_get_property (GObject * object, guint prop_id, GValue * value,
1088     GParamSpec * pspec)
1089 {
1090   GstBaseSrc *src;
1091
1092   src = GST_BASE_SRC (object);
1093
1094   switch (prop_id) {
1095     case PROP_BLOCKSIZE:
1096       g_value_set_ulong (value, src->blocksize);
1097       break;
1098     case PROP_NUM_BUFFERS:
1099       g_value_set_int (value, src->num_buffers);
1100       break;
1101     case PROP_TYPEFIND:
1102       g_value_set_boolean (value, src->data.ABI.typefind);
1103       break;
1104     default:
1105       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1106       break;
1107   }
1108 }
1109
1110 /* with STREAM_LOCK and LOCK*/
1111 static GstClockReturn
1112 gst_base_src_wait (GstBaseSrc * basesrc, GstClockTime time)
1113 {
1114   GstClockReturn ret;
1115   GstClockID id;
1116   GstClock *clock;
1117
1118   /* get clock, if no clock, we don't sync */
1119   if ((clock = GST_ELEMENT_CLOCK (basesrc)) == NULL)
1120     return GST_CLOCK_OK;
1121
1122   /* clock_id should be NULL outside of this function */
1123   g_assert (basesrc->clock_id == NULL);
1124   g_assert (GST_CLOCK_TIME_IS_VALID (time));
1125
1126   id = gst_clock_new_single_shot_id (clock, time);
1127
1128   basesrc->clock_id = id;
1129   /* release the object lock while waiting */
1130   GST_OBJECT_UNLOCK (basesrc);
1131
1132   ret = gst_clock_id_wait (id, NULL);
1133
1134   GST_OBJECT_LOCK (basesrc);
1135   gst_clock_id_unref (id);
1136   basesrc->clock_id = NULL;
1137
1138   return ret;
1139 }
1140
1141 /* perform synchronisation on a buffer. 
1142  * with STREAM_LOCK.
1143  */
1144 static GstClockReturn
1145 gst_base_src_do_sync (GstBaseSrc * basesrc, GstBuffer * buffer)
1146 {
1147   GstClockReturn result;
1148   GstClockTime start, end;
1149   GstBaseSrcClass *bclass;
1150   GstClockTime base_time;
1151
1152   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
1153
1154   start = end = -1;
1155   if (bclass->get_times)
1156     bclass->get_times (basesrc, buffer, &start, &end);
1157
1158   /* if we don't have a timestamp, we don't sync */
1159   if (!GST_CLOCK_TIME_IS_VALID (start))
1160     goto invalid_start;
1161
1162   /* now do clocking */
1163   GST_OBJECT_LOCK (basesrc);
1164   base_time = GST_ELEMENT_CAST (basesrc)->base_time;
1165
1166   GST_LOG_OBJECT (basesrc,
1167       "waiting for clock, base time %" GST_TIME_FORMAT
1168       ", stream_start %" GST_TIME_FORMAT,
1169       GST_TIME_ARGS (base_time), GST_TIME_ARGS (start));
1170
1171   result = gst_base_src_wait (basesrc, start + base_time);
1172   GST_OBJECT_UNLOCK (basesrc);
1173
1174   GST_LOG_OBJECT (basesrc, "clock entry done: %d", result);
1175
1176   return result;
1177
1178   /* special cases */
1179 invalid_start:
1180   {
1181     GST_DEBUG_OBJECT (basesrc, "get_times returned invalid start");
1182     return GST_CLOCK_OK;
1183   }
1184 }
1185
1186 static gboolean
1187 gst_base_src_update_length (GstBaseSrc * src, guint64 offset, guint * length)
1188 {
1189   guint64 size, maxsize;
1190   GstBaseSrcClass *bclass;
1191
1192   bclass = GST_BASE_SRC_GET_CLASS (src);
1193
1194   /* only operate if we are working with bytes */
1195   if (src->segment.format != GST_FORMAT_BYTES)
1196     return TRUE;
1197
1198   size = (guint64) src->segment.duration;
1199
1200   /* the max amount of bytes to read is the total size or
1201    * up to the segment.stop if present. */
1202   if (src->segment.stop != -1)
1203     maxsize = MIN (size, src->segment.stop);
1204   else
1205     maxsize = size;
1206
1207   GST_DEBUG_OBJECT (src,
1208       "reading offset %" G_GUINT64_FORMAT ", length %u, size %" G_GINT64_FORMAT
1209       ", segment.stop %" G_GINT64_FORMAT ", maxsize %" G_GINT64_FORMAT, offset,
1210       *length, size, src->segment.stop, maxsize);
1211
1212   /* check size */
1213   if (maxsize != -1) {
1214     if (offset > maxsize)
1215       goto unexpected_length;
1216
1217     if (offset + *length > maxsize) {
1218       /* see if length of the file changed */
1219       if (bclass->get_size)
1220         bclass->get_size (src, &size);
1221
1222       if (src->segment.stop != -1)
1223         maxsize = MIN (size, src->segment.stop);
1224       else
1225         maxsize = size;
1226
1227       if (offset + *length > maxsize) {
1228         *length = maxsize - offset;
1229       }
1230     }
1231   }
1232   if (*length == 0)
1233     goto unexpected_length;
1234
1235   /* keep track f current position. segment is in bytes, we checked 
1236    * that above. */
1237   gst_segment_set_last_stop (&src->segment, GST_FORMAT_BYTES, offset);
1238
1239   return TRUE;
1240
1241   /* ERRORS */
1242 unexpected_length:
1243   {
1244     return FALSE;
1245   }
1246 }
1247
1248 static GstFlowReturn
1249 gst_base_src_get_range (GstBaseSrc * src, guint64 offset, guint length,
1250     GstBuffer ** buf)
1251 {
1252   GstFlowReturn ret;
1253   GstBaseSrcClass *bclass;
1254   GstClockReturn status;
1255
1256   bclass = GST_BASE_SRC_GET_CLASS (src);
1257
1258   GST_LIVE_LOCK (src);
1259   if (src->is_live) {
1260     while (!src->live_running) {
1261       GST_DEBUG ("live source signal waiting");
1262       GST_LIVE_SIGNAL (src);
1263       GST_DEBUG ("live source waiting for running state");
1264       GST_LIVE_WAIT (src);
1265       GST_DEBUG ("live source unlocked");
1266     }
1267     /* FIXME, use another variable to signal stopping */
1268     GST_OBJECT_LOCK (src->srcpad);
1269     if (GST_PAD_IS_FLUSHING (src->srcpad))
1270       goto flushing;
1271     GST_OBJECT_UNLOCK (src->srcpad);
1272   }
1273   GST_LIVE_UNLOCK (src);
1274
1275   if (!GST_OBJECT_FLAG_IS_SET (src, GST_BASE_SRC_STARTED))
1276     goto not_started;
1277
1278   if (G_UNLIKELY (!bclass->create))
1279     goto no_function;
1280
1281   if (!gst_base_src_update_length (src, offset, &length))
1282     goto unexpected_length;
1283
1284   if (src->num_buffers_left == 0) {
1285     goto reached_num_buffers;
1286   } else {
1287     if (src->num_buffers_left > 0)
1288       src->num_buffers_left--;
1289   }
1290
1291   GST_DEBUG_OBJECT (src,
1292       "calling create offset %" G_GUINT64_FORMAT " %" G_GINT64_FORMAT, offset,
1293       src->segment.time);
1294
1295   ret = bclass->create (src, offset, length, buf);
1296   if (ret != GST_FLOW_OK)
1297     goto done;
1298
1299   /* no timestamp set and we are at offset 0 */
1300   if (offset == 0 && src->segment.time == 0
1301       && GST_BUFFER_TIMESTAMP (*buf) == -1)
1302     GST_BUFFER_TIMESTAMP (*buf) = 0;
1303
1304   /* now sync before pushing the buffer */
1305   status = gst_base_src_do_sync (src, *buf);
1306   switch (status) {
1307     case GST_CLOCK_EARLY:
1308       GST_DEBUG_OBJECT (src, "buffer too late!, returning anyway");
1309       break;
1310     case GST_CLOCK_OK:
1311       GST_DEBUG_OBJECT (src, "buffer ok");
1312       break;
1313     default:
1314       GST_DEBUG_OBJECT (src, "clock returned %d, not returning", status);
1315       gst_buffer_unref (*buf);
1316       *buf = NULL;
1317       ret = GST_FLOW_WRONG_STATE;
1318       break;
1319   }
1320 done:
1321   return ret;
1322
1323   /* ERROR */
1324 flushing:
1325   {
1326     GST_DEBUG_OBJECT (src, "pad is flushing");
1327     GST_OBJECT_UNLOCK (src->srcpad);
1328     GST_LIVE_UNLOCK (src);
1329     return GST_FLOW_WRONG_STATE;
1330   }
1331 not_started:
1332   {
1333     GST_DEBUG_OBJECT (src, "getrange but not started");
1334     return GST_FLOW_WRONG_STATE;
1335   }
1336 no_function:
1337   {
1338     GST_DEBUG_OBJECT (src, "no create function");
1339     return GST_FLOW_ERROR;
1340   }
1341 unexpected_length:
1342   {
1343     GST_DEBUG_OBJECT (src, "unexpected length %u (offset=%" G_GUINT64_FORMAT
1344         ", size=%" G_GINT64_FORMAT ")", length, offset, src->segment.duration);
1345     return GST_FLOW_UNEXPECTED;
1346   }
1347 reached_num_buffers:
1348   {
1349     GST_DEBUG_OBJECT (src, "sent all buffers");
1350     return GST_FLOW_UNEXPECTED;
1351   }
1352 }
1353
1354 static GstFlowReturn
1355 gst_base_src_pad_get_range (GstPad * pad, guint64 offset, guint length,
1356     GstBuffer ** buf)
1357 {
1358   GstBaseSrc *src;
1359   GstFlowReturn res;
1360
1361   src = GST_BASE_SRC (gst_pad_get_parent (pad));
1362
1363   res = gst_base_src_get_range (src, offset, length, buf);
1364
1365   gst_object_unref (src);
1366
1367   return res;
1368 }
1369
1370 static gboolean
1371 gst_base_src_pad_check_get_range (GstPad * pad)
1372 {
1373   GstBaseSrcClass *bclass;
1374   GstBaseSrc *src;
1375   gboolean res;
1376
1377   src = GST_BASE_SRC (gst_pad_get_parent (pad));
1378
1379   bclass = GST_BASE_SRC_GET_CLASS (src);
1380
1381   if (bclass->check_get_range == NULL) {
1382     GST_WARNING_OBJECT (src, "no check_get_range function set");
1383     return FALSE;
1384   }
1385
1386   res = bclass->check_get_range (src);
1387   GST_LOG_OBJECT (src, "%s() returned %d",
1388       GST_DEBUG_FUNCPTR_NAME (bclass->check_get_range), (gint) res);
1389
1390   gst_object_unref (src);
1391
1392   return res;
1393 }
1394
1395 static gboolean
1396 gst_base_src_default_check_get_range (GstBaseSrc * src)
1397 {
1398   gboolean res;
1399
1400   if (!GST_OBJECT_FLAG_IS_SET (src, GST_BASE_SRC_STARTED)) {
1401     GST_LOG_OBJECT (src, "doing start/stop to check get_range support");
1402     gst_base_src_start (src);
1403     gst_base_src_stop (src);
1404   }
1405
1406   /* we can operate in getrange mode if the native format is bytes
1407    * and we are seekable, this condition is set in the random_access
1408    * flag and is set in the _start() method. */
1409   res = src->random_access;
1410
1411   return res;
1412 }
1413
1414 static void
1415 gst_base_src_loop (GstPad * pad)
1416 {
1417   GstBaseSrc *src;
1418   GstBuffer *buf = NULL;
1419   GstFlowReturn ret;
1420   gint64 position;
1421   gboolean eos;
1422
1423   eos = FALSE;
1424
1425   src = GST_BASE_SRC (gst_pad_get_parent (pad));
1426
1427   src->priv->last_sent_eos = FALSE;
1428
1429   /* push events to close/start our segment before we do the real work. */
1430   if (src->priv->close_segment) {
1431     gst_pad_push_event (pad, src->priv->close_segment);
1432     src->priv->close_segment = NULL;
1433   }
1434   if (src->priv->start_segment) {
1435     gst_pad_push_event (pad, src->priv->start_segment);
1436     src->priv->start_segment = NULL;
1437   }
1438
1439   /* if we operate in bytes, we can calculate an offset */
1440   if (src->segment.format == GST_FORMAT_BYTES)
1441     position = src->segment.last_stop;
1442   else
1443     position = -1;
1444
1445   ret = gst_base_src_get_range (src, position, src->blocksize, &buf);
1446   if (G_UNLIKELY (ret != GST_FLOW_OK)) {
1447     GST_INFO_OBJECT (src, "pausing after gst_base_src_get_range() =  %d", ret);
1448     goto pause;
1449   }
1450   if (G_UNLIKELY (buf == NULL))
1451     goto null_buffer;
1452
1453   /* figure out the new position */
1454   switch (src->segment.format) {
1455     case GST_FORMAT_BYTES:
1456       position += GST_BUFFER_SIZE (buf);
1457       break;
1458     case GST_FORMAT_TIME:
1459     {
1460       GstClockTime start, duration;
1461
1462       start = GST_BUFFER_TIMESTAMP (buf);
1463       duration = GST_BUFFER_DURATION (buf);
1464
1465       if (GST_CLOCK_TIME_IS_VALID (start))
1466         position = start;
1467       else
1468         position = src->segment.last_stop;
1469
1470       if (GST_CLOCK_TIME_IS_VALID (duration))
1471         position += duration;
1472       break;
1473     }
1474     case GST_FORMAT_DEFAULT:
1475       position = GST_BUFFER_OFFSET_END (buf);
1476       break;
1477     default:
1478       position = -1;
1479       break;
1480   }
1481   if (position != -1) {
1482     if (src->segment.stop != -1) {
1483       if (position >= src->segment.stop) {
1484         eos = TRUE;
1485         position = src->segment.stop;
1486       }
1487     }
1488     gst_segment_set_last_stop (&src->segment, src->segment.format, position);
1489   }
1490
1491   if (G_UNLIKELY (src->priv->discont)) {
1492     buf = gst_buffer_make_metadata_writable (buf);
1493     GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_DISCONT);
1494     src->priv->discont = FALSE;
1495   }
1496
1497   ret = gst_pad_push (pad, buf);
1498   if (G_UNLIKELY (ret != GST_FLOW_OK)) {
1499     GST_INFO_OBJECT (src, "pausing after gst_pad_push() =  %d", ret);
1500     goto pause;
1501   }
1502
1503   if (eos) {
1504     GST_INFO_OBJECT (src, "pausing after EOS");
1505     ret = GST_FLOW_UNEXPECTED;
1506     goto pause;
1507   }
1508
1509 done:
1510   gst_object_unref (src);
1511   return;
1512
1513   /* special cases */
1514 pause:
1515   {
1516     const gchar *reason = gst_flow_get_name (ret);
1517
1518     GST_DEBUG_OBJECT (src, "pausing task, reason %s", reason);
1519     src->data.ABI.running = FALSE;
1520     gst_pad_pause_task (pad);
1521     if (GST_FLOW_IS_FATAL (ret) || ret == GST_FLOW_NOT_LINKED) {
1522       if (ret == GST_FLOW_UNEXPECTED) {
1523         /* perform EOS logic */
1524         if (src->segment.flags & GST_SEEK_FLAG_SEGMENT) {
1525           gst_element_post_message (GST_ELEMENT (src),
1526               gst_message_new_segment_done (GST_OBJECT (src),
1527                   src->segment.format, src->segment.last_stop));
1528         } else {
1529           gst_pad_push_event (pad, gst_event_new_eos ());
1530           src->priv->last_sent_eos = TRUE;
1531         }
1532       } else {
1533         /* for fatal errors we post an error message */
1534         GST_ELEMENT_ERROR (src, STREAM, FAILED,
1535             (_("Internal data flow error.")),
1536             ("streaming task paused, reason %s", reason));
1537         gst_pad_push_event (pad, gst_event_new_eos ());
1538         src->priv->last_sent_eos = TRUE;
1539       }
1540     }
1541     goto done;
1542   }
1543 null_buffer:
1544   {
1545     GST_ELEMENT_ERROR (src, STREAM, FAILED,
1546         (_("Internal data flow error.")), ("element returned NULL buffer"));
1547     /* we finished the segment on error */
1548     src->data.ABI.running = FALSE;
1549     gst_pad_pause_task (pad);
1550     gst_pad_push_event (pad, gst_event_new_eos ());
1551     src->priv->last_sent_eos = TRUE;
1552     goto done;
1553   }
1554 }
1555
1556 /* this will always be called between start() and stop(). So you can rely on
1557  * resources allocated by start() and freed from stop(). This needs to be added
1558  * to the docs at some point. */
1559 static gboolean
1560 gst_base_src_unlock (GstBaseSrc * basesrc)
1561 {
1562   GstBaseSrcClass *bclass;
1563   gboolean result = TRUE;
1564
1565   GST_DEBUG ("unlock");
1566   /* unblock whatever the subclass is doing */
1567   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
1568   if (bclass->unlock)
1569     result = bclass->unlock (basesrc);
1570
1571   GST_DEBUG ("unschedule clock");
1572   /* and unblock the clock as well, if any */
1573   GST_OBJECT_LOCK (basesrc);
1574   if (basesrc->clock_id) {
1575     gst_clock_id_unschedule (basesrc->clock_id);
1576   }
1577   GST_OBJECT_UNLOCK (basesrc);
1578
1579   GST_DEBUG ("unlock done");
1580
1581   return result;
1582 }
1583
1584 /* default negotiation code. 
1585  *
1586  * Take intersection between src and sink pads, take first
1587  * caps and fixate. 
1588  */
1589 static gboolean
1590 gst_base_src_default_negotiate (GstBaseSrc * basesrc)
1591 {
1592   GstCaps *thiscaps;
1593   GstCaps *caps = NULL;
1594   GstCaps *peercaps = NULL;
1595   gboolean result = FALSE;
1596
1597   /* first see what is possible on our source pad */
1598   thiscaps = gst_pad_get_caps (GST_BASE_SRC_PAD (basesrc));
1599   GST_DEBUG_OBJECT (basesrc, "caps of src: %" GST_PTR_FORMAT, thiscaps);
1600   /* nothing or anything is allowed, we're done */
1601   if (thiscaps == NULL || gst_caps_is_any (thiscaps))
1602     goto no_nego_needed;
1603
1604   /* get the peer caps */
1605   peercaps = gst_pad_peer_get_caps (GST_BASE_SRC_PAD (basesrc));
1606   GST_DEBUG_OBJECT (basesrc, "caps of peer: %" GST_PTR_FORMAT, peercaps);
1607   if (peercaps) {
1608     GstCaps *icaps;
1609
1610     /* get intersection */
1611     icaps = gst_caps_intersect (thiscaps, peercaps);
1612     GST_DEBUG_OBJECT (basesrc, "intersect: %" GST_PTR_FORMAT, icaps);
1613     gst_caps_unref (thiscaps);
1614     gst_caps_unref (peercaps);
1615     if (icaps) {
1616       /* take first (and best, since they are sorted) possibility */
1617       caps = gst_caps_copy_nth (icaps, 0);
1618       gst_caps_unref (icaps);
1619     }
1620   } else {
1621     /* no peer, work with our own caps then */
1622     caps = thiscaps;
1623   }
1624   if (caps) {
1625     caps = gst_caps_make_writable (caps);
1626     gst_caps_truncate (caps);
1627
1628     /* now fixate */
1629     if (!gst_caps_is_empty (caps)) {
1630       gst_pad_fixate_caps (GST_BASE_SRC_PAD (basesrc), caps);
1631       GST_DEBUG_OBJECT (basesrc, "fixated to: %" GST_PTR_FORMAT, caps);
1632
1633       if (gst_caps_is_any (caps)) {
1634         /* hmm, still anything, so element can do anything and
1635          * nego is not needed */
1636         result = TRUE;
1637       } else if (gst_caps_is_fixed (caps)) {
1638         /* yay, fixed caps, use those then */
1639         gst_pad_set_caps (GST_BASE_SRC_PAD (basesrc), caps);
1640         result = TRUE;
1641       }
1642     }
1643     gst_caps_unref (caps);
1644   }
1645   return result;
1646
1647 no_nego_needed:
1648   {
1649     GST_DEBUG_OBJECT (basesrc, "no negotiation needed");
1650     if (thiscaps)
1651       gst_caps_unref (thiscaps);
1652     return TRUE;
1653   }
1654 }
1655
1656 static gboolean
1657 gst_base_src_negotiate (GstBaseSrc * basesrc)
1658 {
1659   GstBaseSrcClass *bclass;
1660   gboolean result = TRUE;
1661
1662   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
1663
1664   if (bclass->negotiate)
1665     result = bclass->negotiate (basesrc);
1666
1667   return result;
1668 }
1669
1670 static gboolean
1671 gst_base_src_start (GstBaseSrc * basesrc)
1672 {
1673   GstBaseSrcClass *bclass;
1674   gboolean result;
1675   guint64 size;
1676
1677   if (GST_OBJECT_FLAG_IS_SET (basesrc, GST_BASE_SRC_STARTED))
1678     return TRUE;
1679
1680   GST_DEBUG_OBJECT (basesrc, "starting source");
1681
1682   basesrc->num_buffers_left = basesrc->num_buffers;
1683
1684   gst_segment_init (&basesrc->segment, basesrc->segment.format);
1685   basesrc->data.ABI.running = FALSE;
1686
1687   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
1688   if (bclass->start)
1689     result = bclass->start (basesrc);
1690   else
1691     result = TRUE;
1692
1693   if (!result)
1694     goto could_not_start;
1695
1696   GST_OBJECT_FLAG_SET (basesrc, GST_BASE_SRC_STARTED);
1697
1698   /* figure out the size */
1699   if (basesrc->segment.format == GST_FORMAT_BYTES) {
1700     if (bclass->get_size) {
1701       if (!(result = bclass->get_size (basesrc, &size)))
1702         size = -1;
1703     } else {
1704       result = FALSE;
1705       size = -1;
1706     }
1707     /* only update the size when operating in bytes, subclass is supposed
1708      * to set duration in the start method for other formats */
1709     gst_segment_set_duration (&basesrc->segment, GST_FORMAT_BYTES, size);
1710   }
1711
1712   GST_DEBUG_OBJECT (basesrc,
1713       "format: %d, have size: %d, size: %" G_GUINT64_FORMAT ", duration: %"
1714       G_GINT64_FORMAT, basesrc->segment.format, result, size,
1715       basesrc->segment.duration);
1716
1717   /* check if we can seek */
1718   if (bclass->is_seekable)
1719     basesrc->seekable = bclass->is_seekable (basesrc);
1720   else
1721     basesrc->seekable = FALSE;
1722
1723   GST_DEBUG_OBJECT (basesrc, "is seekable: %d", basesrc->seekable);
1724
1725   /* update for random access flag */
1726   basesrc->random_access = basesrc->seekable &&
1727       basesrc->segment.format == GST_FORMAT_BYTES;
1728
1729   GST_DEBUG_OBJECT (basesrc, "is random_access: %d", basesrc->random_access);
1730
1731   /* run typefind if we are random_access and the typefinding is enabled. */
1732   if (basesrc->random_access && basesrc->data.ABI.typefind && size != -1) {
1733     GstCaps *caps;
1734
1735     caps = gst_type_find_helper (basesrc->srcpad, size);
1736     gst_pad_set_caps (basesrc->srcpad, caps);
1737     gst_caps_unref (caps);
1738   } else {
1739     /* use class or default negotiate function */
1740     if (!gst_base_src_negotiate (basesrc))
1741       goto could_not_negotiate;
1742   }
1743
1744   return TRUE;
1745
1746   /* ERROR */
1747 could_not_start:
1748   {
1749     GST_DEBUG_OBJECT (basesrc, "could not start");
1750     return FALSE;
1751   }
1752 could_not_negotiate:
1753   {
1754     GST_DEBUG_OBJECT (basesrc, "could not negotiate, stopping");
1755     GST_ELEMENT_ERROR (basesrc, STREAM, FORMAT,
1756         ("Could not negotiate format"), ("Check your filtered caps, if any"));
1757     gst_base_src_stop (basesrc);
1758     return FALSE;
1759   }
1760 }
1761
1762 static gboolean
1763 gst_base_src_stop (GstBaseSrc * basesrc)
1764 {
1765   GstBaseSrcClass *bclass;
1766   gboolean result = TRUE;
1767
1768   if (!GST_OBJECT_FLAG_IS_SET (basesrc, GST_BASE_SRC_STARTED))
1769     return TRUE;
1770
1771   GST_DEBUG_OBJECT (basesrc, "stopping source");
1772
1773   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
1774   if (bclass->stop)
1775     result = bclass->stop (basesrc);
1776
1777   if (result)
1778     GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_STARTED);
1779
1780   return result;
1781 }
1782
1783 static gboolean
1784 gst_base_src_deactivate (GstBaseSrc * basesrc, GstPad * pad)
1785 {
1786   gboolean result;
1787
1788   GST_LIVE_LOCK (basesrc);
1789   basesrc->live_running = TRUE;
1790   GST_LIVE_SIGNAL (basesrc);
1791   GST_LIVE_UNLOCK (basesrc);
1792
1793   /* step 1, unblock clock sync (if any) */
1794   result = gst_base_src_unlock (basesrc);
1795
1796   /* step 2, make sure streaming finishes */
1797   result &= gst_pad_stop_task (pad);
1798
1799   return result;
1800 }
1801
1802 static gboolean
1803 gst_base_src_activate_push (GstPad * pad, gboolean active)
1804 {
1805   GstBaseSrc *basesrc;
1806   gboolean res;
1807
1808   basesrc = GST_BASE_SRC (GST_OBJECT_PARENT (pad));
1809
1810   /* prepare subclass first */
1811   if (active) {
1812     GstEvent *event;
1813
1814     GST_DEBUG_OBJECT (basesrc, "Activating in push mode");
1815
1816     if (!basesrc->can_activate_push)
1817       goto no_push_activation;
1818
1819     if (!gst_base_src_start (basesrc))
1820       goto error_start;
1821
1822     basesrc->priv->last_sent_eos = FALSE;
1823
1824     /* do initial seek, which will start the task */
1825     GST_OBJECT_LOCK (basesrc);
1826     event = basesrc->data.ABI.pending_seek;
1827     basesrc->data.ABI.pending_seek = NULL;
1828     GST_OBJECT_UNLOCK (basesrc);
1829
1830     /* no need to unlock anything, the task is certainly
1831      * not running here. */
1832     res = gst_base_src_perform_seek (basesrc, event, FALSE);
1833
1834     if (event)
1835       gst_event_unref (event);
1836   } else {
1837     GST_DEBUG_OBJECT (basesrc, "Deactivating in push mode");
1838     res = gst_base_src_deactivate (basesrc, pad);
1839   }
1840   return res;
1841
1842   /* ERRORS */
1843 no_push_activation:
1844   {
1845     GST_DEBUG_OBJECT (basesrc, "Subclass disabled push-mode activation");
1846     return FALSE;
1847   }
1848 error_start:
1849   {
1850     gst_base_src_stop (basesrc);
1851     GST_DEBUG_OBJECT (basesrc, "Failed to start in push mode");
1852     return FALSE;
1853   }
1854 }
1855
1856 static gboolean
1857 gst_base_src_activate_pull (GstPad * pad, gboolean active)
1858 {
1859   GstBaseSrc *basesrc;
1860
1861   basesrc = GST_BASE_SRC (GST_OBJECT_PARENT (pad));
1862
1863   /* prepare subclass first */
1864   if (active) {
1865     GST_DEBUG_OBJECT (basesrc, "Activating in pull mode");
1866     if (!gst_base_src_start (basesrc))
1867       goto error_start;
1868
1869     /* if not random_access, we cannot operate in pull mode for now */
1870     if (!basesrc->random_access) {
1871       gst_base_src_stop (basesrc);
1872       return FALSE;
1873     }
1874     return TRUE;
1875   } else {
1876     GST_DEBUG_OBJECT (basesrc, "Deactivating in pull mode");
1877
1878     /* don't send EOS when going from PAUSED => READY when in pull mode */
1879     basesrc->priv->last_sent_eos = TRUE;
1880
1881     if (!gst_base_src_stop (basesrc))
1882       goto error_stop;
1883
1884     return gst_base_src_deactivate (basesrc, pad);
1885   }
1886
1887 error_start:
1888   {
1889     gst_base_src_stop (basesrc);
1890     GST_DEBUG_OBJECT (basesrc, "Failed to start in pull mode");
1891     return FALSE;
1892   }
1893 error_stop:
1894   {
1895     GST_DEBUG_OBJECT (basesrc, "Failed to stop in pull mode");
1896     return FALSE;
1897   }
1898 }
1899
1900 static GstStateChangeReturn
1901 gst_base_src_change_state (GstElement * element, GstStateChange transition)
1902 {
1903   GstBaseSrc *basesrc;
1904   GstStateChangeReturn result;
1905   gboolean no_preroll = FALSE;
1906
1907   basesrc = GST_BASE_SRC (element);
1908
1909   switch (transition) {
1910     case GST_STATE_CHANGE_NULL_TO_READY:
1911       break;
1912     case GST_STATE_CHANGE_READY_TO_PAUSED:
1913       GST_LIVE_LOCK (element);
1914       if (basesrc->is_live) {
1915         no_preroll = TRUE;
1916         basesrc->live_running = FALSE;
1917       }
1918       basesrc->priv->last_sent_eos = FALSE;
1919       basesrc->priv->discont = TRUE;
1920       GST_LIVE_UNLOCK (element);
1921       break;
1922     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
1923       GST_LIVE_LOCK (element);
1924       if (basesrc->is_live) {
1925         basesrc->live_running = TRUE;
1926         GST_LIVE_SIGNAL (element);
1927       }
1928       GST_LIVE_UNLOCK (element);
1929       break;
1930     default:
1931       break;
1932   }
1933
1934   if ((result =
1935           GST_ELEMENT_CLASS (parent_class)->change_state (element,
1936               transition)) == GST_STATE_CHANGE_FAILURE)
1937     goto failure;
1938
1939   switch (transition) {
1940     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
1941       GST_LIVE_LOCK (element);
1942       if (basesrc->is_live) {
1943         no_preroll = TRUE;
1944         basesrc->live_running = FALSE;
1945       }
1946       GST_LIVE_UNLOCK (element);
1947       break;
1948     case GST_STATE_CHANGE_PAUSED_TO_READY:
1949     {
1950       GstEvent **event_p;
1951
1952       if (!gst_base_src_stop (basesrc))
1953         goto error_stop;
1954
1955       /* FIXME, deprecate this behaviour, it is very dangerous.
1956        * the prefered way of sending EOS downstream is by sending
1957        * the EOS event to the element */
1958       if (!basesrc->priv->last_sent_eos) {
1959         GST_DEBUG_OBJECT (basesrc, "Sending EOS event");
1960         gst_pad_push_event (basesrc->srcpad, gst_event_new_eos ());
1961         basesrc->priv->last_sent_eos = TRUE;
1962       }
1963       event_p = &basesrc->data.ABI.pending_seek;
1964       gst_event_replace (event_p, NULL);
1965       event_p = &basesrc->priv->close_segment;
1966       gst_event_replace (event_p, NULL);
1967       event_p = &basesrc->priv->start_segment;
1968       gst_event_replace (event_p, NULL);
1969       break;
1970     }
1971     case GST_STATE_CHANGE_READY_TO_NULL:
1972       break;
1973     default:
1974       break;
1975   }
1976
1977   if (no_preroll && result == GST_STATE_CHANGE_SUCCESS)
1978     result = GST_STATE_CHANGE_NO_PREROLL;
1979
1980   return result;
1981
1982   /* ERRORS */
1983 failure:
1984   {
1985     GST_DEBUG_OBJECT (basesrc, "parent failed state change");
1986     gst_base_src_stop (basesrc);
1987     return result;
1988   }
1989 error_stop:
1990   {
1991     GST_DEBUG_OBJECT (basesrc, "Failed to stop");
1992     return GST_STATE_CHANGE_FAILURE;
1993   }
1994 }