libs/gst/base/gstbasesrc.*: Don't unconditionally send EOS when going from PAUSED to
[platform/upstream/gstreamer.git] / libs / gst / base / gstbasesrc.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *               2000,2005 Wim Taymans <wim@fluendo.com>
4  *
5  * gstbasesrc.c:
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20  * Boston, MA 02111-1307, USA.
21  */
22
23 /**
24  * SECTION:gstbasesrc
25  * @short_description: Base class for getrange based source elements
26  * @see_also: #GstPushSrc, #GstBaseTransform, #GstBaseSink
27  *
28  * This is a generice base class for source elements. The following
29  * types of sources are supported:
30  * <itemizedlist>
31  *   <listitem><para>random access sources like files</para></listitem>
32  *   <listitem><para>seekable sources</para></listitem>
33  *   <listitem><para>live sources</para></listitem>
34  * </itemizedlist>
35  *
36  * The source can be configured to operate in a any #GstFormat with the
37  * gst_base_src_set_format(). This format determines the format of the
38  * internal #GstSegment and the #GST_EVENT_NEW_SEGMENT. The default format for
39  * #GstBaseSrc is GST_FORMAT_BYTES.
40  *
41  * #GstBaseSrc always supports the push mode scheduling. If the following
42  * conditions are met, it also supports pull mode scheduling:
43  * <itemizedlist>
44  *   <listitem><para>the format is set to GST_FORMAT_BYTES (default).</para></listitem>
45  *   <listitem><para>#GstBaseSrc::is_seekable returns TRUE.</para></listitem>
46  * </itemizedlist>
47  *
48  * If all the conditions are met for operating in pull mode, #GstBaseSrc is
49  * automatically seekable in push mode as well. The following conditions must be
50  * met to make the element seekable in push mode when the format is not
51  * GST_FORMAT_BYTES:
52  * <itemizedlist>
53  *   <listitem><para>
54  *     #GstBaseSrc::is_seekable returns TRUE.
55  *   </para></listitem>
56  *   <listitem><para>
57  *     #GstBaseSrc::query can convert all supported seek formats to the
58  *     internal format as set with gst_base_src_set_format().
59  *   </para></listitem>
60  *   <listitem><para>
61  *     #GstBaseSrc::do_seek is implemented, performs the seek and returns TRUE.
62  *   </para></listitem>
63  * </itemizedlist>
64  *
65  * When the default format is not GST_FORMAT_BYTES, the element should ignore the
66  * offset and length in the #GstBaseSrc::create method. It is recommended to subclass
67  * #GstPushSrc instead in this situation.
68  *
69  * #GstBaseSrc has support for live sources. Live sources are sources that produce
70  * data at a fixed rate, such as audio or video capture. A typical live source also
71  * provides a clock to export the rate at which they produce data.
72  * Use gst_base_src_set_live() to activate the live source mode.
73  *
74  * A live source does not produce data in the PAUSED state, this means that the 
75  * #GstBaseSrc::create method will not be called in PAUSED but only in PLAYING.
76  * To signal the pipeline that the element will not produce data, its return
77  * value from the READY to PAUSED state will be GST_STATE_CHANGE_NO_PREROLL.
78  * 
79  * A typical live source will timestamp the buffers they create with the current
80  * stream time of the pipeline. This is why they can only produce data in PLAYING,
81  * when the clock is actually distributed and running.
82  *
83  * The #GstBaseSrc::get_times method can be used to implement pseudo-live sources.
84  * The base source will wait for the specified stream time returned in 
85  * #GstBaseSrc::get_times before pushing out the buffer. It only makes sense to implement
86  * the ::get_times function if the source is a live source.
87  *
88  * There is only support in GstBaseSrc for one source pad, which should be named
89  * "src". A source implementation (subclass of GstBaseSrc) should install a pad
90  * template in its base_init function, like so:
91  *
92  * <programlisting>
93  * static void
94  * my_element_base_init (gpointer g_class)
95  * {
96  *   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (g_class);
97  *   // srctemplate should be a #GstStaticPadTemplate with direction
98  *   // #GST_PAD_SRC and name "src"
99  *   gst_element_class_add_pad_template (gstelement_class,
100  *       gst_static_pad_template_get (&amp;srctemplate));
101  *   // see #GstElementDetails
102  *   gst_element_class_set_details (gstelement_class, &amp;details);
103  * }
104  * </programlisting>
105  *
106  * Last reviewed on 2005-12-18 (0.10.0)
107  */
108
109 #include <stdlib.h>
110 #include <string.h>
111
112 #ifdef HAVE_CONFIG_H
113 #  include "config.h"
114 #endif
115
116 #include "gstbasesrc.h"
117 #include "gsttypefindhelper.h"
118 #include <gst/gstmarshal.h>
119 #include <gst/gst-i18n-lib.h>
120
121 GST_DEBUG_CATEGORY_STATIC (gst_base_src_debug);
122 #define GST_CAT_DEFAULT gst_base_src_debug
123
124 #define GST_LIVE_GET_LOCK(elem)               (GST_BASE_SRC_CAST(elem)->live_lock)
125 #define GST_LIVE_LOCK(elem)                   g_mutex_lock(GST_LIVE_GET_LOCK(elem))
126 #define GST_LIVE_TRYLOCK(elem)                g_mutex_trylock(GST_LIVE_GET_LOCK(elem))
127 #define GST_LIVE_UNLOCK(elem)                 g_mutex_unlock(GST_LIVE_GET_LOCK(elem))
128 #define GST_LIVE_GET_COND(elem)               (GST_BASE_SRC_CAST(elem)->live_cond)
129 #define GST_LIVE_WAIT(elem)                   g_cond_wait (GST_LIVE_GET_COND (elem), GST_LIVE_GET_LOCK (elem))
130 #define GST_LIVE_TIMED_WAIT(elem, timeval)    g_cond_timed_wait (GST_LIVE_GET_COND (elem), GST_LIVE_GET_LOCK (elem),\
131                                                                                 timeval)
132 #define GST_LIVE_SIGNAL(elem)                 g_cond_signal (GST_LIVE_GET_COND (elem));
133 #define GST_LIVE_BROADCAST(elem)              g_cond_broadcast (GST_LIVE_GET_COND (elem));
134
135 /* BaseSrc signals and args */
136 enum
137 {
138   /* FILL ME */
139   LAST_SIGNAL
140 };
141
142 #define DEFAULT_BLOCKSIZE       4096
143 #define DEFAULT_NUM_BUFFERS     -1
144 #define DEFAULT_TYPEFIND        FALSE
145
146 enum
147 {
148   PROP_0,
149   PROP_BLOCKSIZE,
150   PROP_NUM_BUFFERS,
151   PROP_TYPEFIND,
152 };
153
154 #define GST_BASE_SRC_GET_PRIVATE(obj)  \
155    (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_BASE_SRC, GstBaseSrcPrivate))
156
157 struct _GstBaseSrcPrivate
158 {
159   gboolean last_sent_eos;       /* last thing we did was send an EOS (we set this
160                                  * to avoid the sending of two EOS in some cases) */
161 };
162
163 static GstElementClass *parent_class = NULL;
164
165 static void gst_base_src_base_init (gpointer g_class);
166 static void gst_base_src_class_init (GstBaseSrcClass * klass);
167 static void gst_base_src_init (GstBaseSrc * src, gpointer g_class);
168 static void gst_base_src_finalize (GObject * object);
169
170
171 GType
172 gst_base_src_get_type (void)
173 {
174   static GType base_src_type = 0;
175
176   if (!base_src_type) {
177     static const GTypeInfo base_src_info = {
178       sizeof (GstBaseSrcClass),
179       (GBaseInitFunc) gst_base_src_base_init,
180       NULL,
181       (GClassInitFunc) gst_base_src_class_init,
182       NULL,
183       NULL,
184       sizeof (GstBaseSrc),
185       0,
186       (GInstanceInitFunc) gst_base_src_init,
187     };
188
189     base_src_type = g_type_register_static (GST_TYPE_ELEMENT,
190         "GstBaseSrc", &base_src_info, G_TYPE_FLAG_ABSTRACT);
191   }
192   return base_src_type;
193 }
194 static GstCaps *gst_base_src_getcaps (GstPad * pad);
195 static gboolean gst_base_src_setcaps (GstPad * pad, GstCaps * caps);
196
197 static gboolean gst_base_src_activate_push (GstPad * pad, gboolean active);
198 static gboolean gst_base_src_activate_pull (GstPad * pad, gboolean active);
199 static void gst_base_src_set_property (GObject * object, guint prop_id,
200     const GValue * value, GParamSpec * pspec);
201 static void gst_base_src_get_property (GObject * object, guint prop_id,
202     GValue * value, GParamSpec * pspec);
203 static gboolean gst_base_src_event_handler (GstPad * pad, GstEvent * event);
204 static gboolean gst_base_src_send_event (GstElement * elem, GstEvent * event);
205 static gboolean gst_base_src_default_event (GstBaseSrc * src, GstEvent * event);
206
207 static gboolean gst_base_src_query (GstPad * pad, GstQuery * query);
208
209 static gboolean gst_base_src_default_negotiate (GstBaseSrc * basesrc);
210 static gboolean gst_base_src_default_do_seek (GstBaseSrc * src,
211     GstSegment * segment);
212 static gboolean gst_base_src_default_query (GstBaseSrc * src, GstQuery * query);
213
214 static gboolean gst_base_src_unlock (GstBaseSrc * basesrc);
215 static gboolean gst_base_src_start (GstBaseSrc * basesrc);
216 static gboolean gst_base_src_stop (GstBaseSrc * basesrc);
217
218 static GstStateChangeReturn gst_base_src_change_state (GstElement * element,
219     GstStateChange transition);
220
221 static void gst_base_src_loop (GstPad * pad);
222 static gboolean gst_base_src_check_get_range (GstPad * pad);
223 static GstFlowReturn gst_base_src_pad_get_range (GstPad * pad, guint64 offset,
224     guint length, GstBuffer ** buf);
225 static GstFlowReturn gst_base_src_get_range (GstBaseSrc * src, guint64 offset,
226     guint length, GstBuffer ** buf);
227
228 static void
229 gst_base_src_base_init (gpointer g_class)
230 {
231   GST_DEBUG_CATEGORY_INIT (gst_base_src_debug, "basesrc", 0, "basesrc element");
232 }
233
234 static void
235 gst_base_src_class_init (GstBaseSrcClass * klass)
236 {
237   GObjectClass *gobject_class;
238   GstElementClass *gstelement_class;
239
240   gobject_class = (GObjectClass *) klass;
241   gstelement_class = (GstElementClass *) klass;
242
243   g_type_class_add_private (klass, sizeof (GstBaseSrcPrivate));
244
245   parent_class = g_type_class_ref (GST_TYPE_ELEMENT);
246
247   gobject_class->finalize = GST_DEBUG_FUNCPTR (gst_base_src_finalize);
248   gobject_class->set_property = GST_DEBUG_FUNCPTR (gst_base_src_set_property);
249   gobject_class->get_property = GST_DEBUG_FUNCPTR (gst_base_src_get_property);
250
251   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_BLOCKSIZE,
252       g_param_spec_ulong ("blocksize", "Block size",
253           "Size in bytes to read per buffer", 1, G_MAXULONG, DEFAULT_BLOCKSIZE,
254           G_PARAM_READWRITE));
255   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_NUM_BUFFERS,
256       g_param_spec_int ("num-buffers", "num-buffers",
257           "Number of buffers to output before sending EOS", -1, G_MAXINT,
258           DEFAULT_NUM_BUFFERS, G_PARAM_READWRITE));
259   g_object_class_install_property (G_OBJECT_CLASS (klass), PROP_TYPEFIND,
260       g_param_spec_boolean ("typefind", "Typefind",
261           "Run typefind before negotiating", DEFAULT_TYPEFIND,
262           G_PARAM_READWRITE));
263
264   gstelement_class->change_state =
265       GST_DEBUG_FUNCPTR (gst_base_src_change_state);
266   gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_base_src_send_event);
267
268   klass->negotiate = GST_DEBUG_FUNCPTR (gst_base_src_default_negotiate);
269   klass->event = GST_DEBUG_FUNCPTR (gst_base_src_default_event);
270   klass->do_seek = GST_DEBUG_FUNCPTR (gst_base_src_default_do_seek);
271   klass->query = GST_DEBUG_FUNCPTR (gst_base_src_default_query);
272 }
273
274 static void
275 gst_base_src_init (GstBaseSrc * basesrc, gpointer g_class)
276 {
277   GstPad *pad;
278   GstPadTemplate *pad_template;
279
280   basesrc->priv = GST_BASE_SRC_GET_PRIVATE (basesrc);
281
282   basesrc->is_live = FALSE;
283   basesrc->live_lock = g_mutex_new ();
284   basesrc->live_cond = g_cond_new ();
285   basesrc->num_buffers = DEFAULT_NUM_BUFFERS;
286   basesrc->num_buffers_left = -1;
287
288   basesrc->can_activate_push = TRUE;
289   basesrc->pad_mode = GST_ACTIVATE_NONE;
290
291   pad_template =
292       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (g_class), "src");
293   g_return_if_fail (pad_template != NULL);
294
295   GST_DEBUG_OBJECT (basesrc, "creating src pad");
296   pad = gst_pad_new_from_template (pad_template, "src");
297
298   GST_DEBUG_OBJECT (basesrc, "setting functions on src pad");
299   gst_pad_set_activatepush_function (pad,
300       GST_DEBUG_FUNCPTR (gst_base_src_activate_push));
301   gst_pad_set_activatepull_function (pad,
302       GST_DEBUG_FUNCPTR (gst_base_src_activate_pull));
303   gst_pad_set_event_function (pad,
304       GST_DEBUG_FUNCPTR (gst_base_src_event_handler));
305   gst_pad_set_query_function (pad, GST_DEBUG_FUNCPTR (gst_base_src_query));
306   gst_pad_set_checkgetrange_function (pad,
307       GST_DEBUG_FUNCPTR (gst_base_src_check_get_range));
308   gst_pad_set_getrange_function (pad,
309       GST_DEBUG_FUNCPTR (gst_base_src_pad_get_range));
310   gst_pad_set_getcaps_function (pad, GST_DEBUG_FUNCPTR (gst_base_src_getcaps));
311   gst_pad_set_setcaps_function (pad, GST_DEBUG_FUNCPTR (gst_base_src_setcaps));
312
313   /* hold pointer to pad */
314   basesrc->srcpad = pad;
315   GST_DEBUG_OBJECT (basesrc, "adding src pad");
316   gst_element_add_pad (GST_ELEMENT (basesrc), pad);
317
318   basesrc->blocksize = DEFAULT_BLOCKSIZE;
319   basesrc->clock_id = NULL;
320   /* we operate in BYTES by default */
321   gst_base_src_set_format (basesrc, GST_FORMAT_BYTES);
322   basesrc->data.ABI.typefind = DEFAULT_TYPEFIND;
323
324   GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_STARTED);
325
326   GST_DEBUG_OBJECT (basesrc, "init done");
327 }
328
329 static void
330 gst_base_src_finalize (GObject * object)
331 {
332   GstBaseSrc *basesrc;
333
334   basesrc = GST_BASE_SRC (object);
335
336   g_mutex_free (basesrc->live_lock);
337   g_cond_free (basesrc->live_cond);
338
339   G_OBJECT_CLASS (parent_class)->finalize (object);
340 }
341
342 /**
343  * gst_base_src_set_live:
344  * @src: base source instance
345  * @live: new live-mode
346  *
347  * If the element listens to a live source, the @livemode should
348  * be set to %TRUE. This declares that this source can't seek.
349  */
350 void
351 gst_base_src_set_live (GstBaseSrc * src, gboolean live)
352 {
353   GST_LIVE_LOCK (src);
354   src->is_live = live;
355   GST_LIVE_UNLOCK (src);
356 }
357
358 /**
359  * gst_base_src_is_live:
360  * @src: base source instance
361  *
362  * Check if an element is in live mode.
363  *
364  * Returns: %TRUE if element is in live mode.
365  */
366 gboolean
367 gst_base_src_is_live (GstBaseSrc * src)
368 {
369   gboolean result;
370
371   GST_LIVE_LOCK (src);
372   result = src->is_live;
373   GST_LIVE_UNLOCK (src);
374
375   return result;
376 }
377
378 /**
379  * gst_base_src_set_format:
380  * @src: base source instance
381  * @format: the format to use
382  *
383  * Sets the default format of the source. This will be the format used
384  * for sending NEW_SEGMENT events and for performing seeks.
385  *
386  * If a format of GST_FORMAT_BYTES is set, the element will be able to
387  * operate in pull mode if the #GstBaseSrc::is_seekable returns TRUE.
388  *
389  * @Since: 0.10.1
390  */
391 void
392 gst_base_src_set_format (GstBaseSrc * src, GstFormat format)
393 {
394   gst_segment_init (&src->segment, format);
395 }
396
397 static gboolean
398 gst_base_src_setcaps (GstPad * pad, GstCaps * caps)
399 {
400   GstBaseSrcClass *bclass;
401   GstBaseSrc *bsrc;
402   gboolean res = TRUE;
403
404   bsrc = GST_BASE_SRC (GST_PAD_PARENT (pad));
405   bclass = GST_BASE_SRC_GET_CLASS (bsrc);
406
407   if (bclass->set_caps)
408     res = bclass->set_caps (bsrc, caps);
409
410   return res;
411 }
412
413 static GstCaps *
414 gst_base_src_getcaps (GstPad * pad)
415 {
416   GstBaseSrcClass *bclass;
417   GstBaseSrc *bsrc;
418   GstCaps *caps = NULL;
419
420   bsrc = GST_BASE_SRC (GST_PAD_PARENT (pad));
421   bclass = GST_BASE_SRC_GET_CLASS (bsrc);
422   if (bclass->get_caps)
423     caps = bclass->get_caps (bsrc);
424
425   if (caps == NULL) {
426     GstPadTemplate *pad_template;
427
428     pad_template =
429         gst_element_class_get_pad_template (GST_ELEMENT_CLASS (bclass), "src");
430     if (pad_template != NULL) {
431       caps = gst_caps_ref (gst_pad_template_get_caps (pad_template));
432     }
433   }
434   return caps;
435 }
436
437 static gboolean
438 gst_base_src_default_query (GstBaseSrc * src, GstQuery * query)
439 {
440   gboolean res;
441
442   switch (GST_QUERY_TYPE (query)) {
443     case GST_QUERY_POSITION:
444     {
445       GstFormat format;
446
447       gst_query_parse_position (query, &format, NULL);
448       switch (format) {
449         case GST_FORMAT_PERCENT:
450         {
451           gint64 percent;
452           gint64 position;
453           gint64 duration;
454
455           position = src->segment.last_stop;
456           duration = src->segment.duration;
457
458           if (position != -1 && duration != -1) {
459             if (position < duration)
460               percent = gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX, position,
461                   duration);
462             else
463               percent = GST_FORMAT_PERCENT_MAX;
464           } else
465             percent = -1;
466
467           gst_query_set_position (query, GST_FORMAT_PERCENT, percent);
468           res = TRUE;
469           break;
470         }
471         default:
472         {
473           gint64 position;
474
475           position = src->segment.last_stop;
476
477           if (position != -1) {
478             /* convert to requested format */
479             res =
480                 gst_pad_query_convert (src->srcpad, src->segment.format,
481                 position, &format, &position);
482           } else
483             res = TRUE;
484
485           gst_query_set_position (query, format, position);
486           break;
487         }
488       }
489       break;
490     }
491     case GST_QUERY_DURATION:
492     {
493       GstFormat format;
494
495       gst_query_parse_duration (query, &format, NULL);
496       switch (format) {
497         case GST_FORMAT_PERCENT:
498           gst_query_set_duration (query, GST_FORMAT_PERCENT,
499               GST_FORMAT_PERCENT_MAX);
500           res = TRUE;
501           break;
502         default:
503         {
504           gint64 duration;
505
506           duration = src->segment.duration;
507
508           if (duration != -1) {
509             /* convert to requested format */
510             res =
511                 gst_pad_query_convert (src->srcpad, src->segment.format,
512                 duration, &format, &duration);
513           } else {
514             res = TRUE;
515           }
516           gst_query_set_duration (query, format, duration);
517           break;
518         }
519       }
520       break;
521     }
522
523     case GST_QUERY_SEEKING:
524     {
525       gst_query_set_seeking (query, src->segment.format,
526           src->seekable, 0, src->segment.duration);
527       res = TRUE;
528       break;
529     }
530     case GST_QUERY_SEGMENT:
531     {
532       gint64 start, stop;
533
534       /* no end segment configured, current duration then */
535       if ((stop = src->segment.stop) == -1)
536         stop = src->segment.duration;
537       start = src->segment.start;
538
539       /* adjust to stream time */
540       if (src->segment.time != -1) {
541         start -= src->segment.time;
542         if (stop != -1)
543           stop -= src->segment.time;
544       }
545       gst_query_set_segment (query, src->segment.rate, src->segment.format,
546           start, stop);
547       res = TRUE;
548       break;
549     }
550
551     case GST_QUERY_FORMATS:
552     {
553       gst_query_set_formats (query, 3, GST_FORMAT_DEFAULT,
554           GST_FORMAT_BYTES, GST_FORMAT_PERCENT);
555       res = TRUE;
556       break;
557     }
558     case GST_QUERY_LATENCY:
559     case GST_QUERY_JITTER:
560     case GST_QUERY_RATE:
561     case GST_QUERY_CONVERT:
562     {
563       GstFormat src_fmt, dest_fmt;
564       gint64 src_val, dest_val;
565
566       gst_query_parse_convert (query, &src_fmt, &src_val, &dest_fmt, &dest_val);
567
568       /* we can only convert between equal formats... */
569       if (src_fmt == dest_fmt) {
570         dest_val = src_val;
571         res = TRUE;
572       } else
573         res = FALSE;
574
575       gst_query_set_convert (query, src_fmt, src_val, dest_fmt, dest_val);
576       break;
577     }
578     default:
579       res = FALSE;
580       break;
581   }
582   return res;
583 }
584
585 static gboolean
586 gst_base_src_query (GstPad * pad, GstQuery * query)
587 {
588   GstBaseSrc *src;
589   GstBaseSrcClass *bclass;
590   gboolean result = FALSE;
591
592   src = GST_BASE_SRC (gst_pad_get_parent (pad));
593
594   bclass = GST_BASE_SRC_GET_CLASS (src);
595
596   if (bclass->query)
597     result = bclass->query (src, query);
598   else
599     result = gst_pad_query_default (pad, query);
600
601   gst_object_unref (src);
602
603   return result;
604 }
605
606 static gboolean
607 gst_base_src_default_do_seek (GstBaseSrc * src, GstSegment * segment)
608 {
609   gboolean res = TRUE;
610
611   /* update our offset if the start/stop position was updated */
612   if (segment->format == GST_FORMAT_BYTES) {
613     segment->last_stop = segment->start;
614     segment->time = segment->start;
615   } else if (segment->start == 0) {
616     /* seek to start, we can implement a default for this. */
617     segment->last_stop = 0;
618     segment->time = 0;
619     res = TRUE;
620   } else
621     res = FALSE;
622
623   return res;
624 }
625
626 static gboolean
627 gst_base_src_do_seek (GstBaseSrc * src, GstSegment * segment)
628 {
629   GstBaseSrcClass *bclass;
630   gboolean result = FALSE;
631
632   bclass = GST_BASE_SRC_GET_CLASS (src);
633
634   if (bclass->do_seek)
635     result = bclass->do_seek (src, segment);
636
637   return result;
638 }
639
640 /* this code implements the seeking. It is a good example
641  * handling all cases.
642  *
643  * A seek updates the currently configured segment.start
644  * and segment.stop values based on the SEEK_TYPE. If the
645  * segment.start value is updated, a seek to this new position
646  * should be performed.
647  *
648  * The seek can only be executed when we are not currently
649  * streaming any data, to make sure that this is the case, we
650  * acquire the STREAM_LOCK which is taken when we are in the
651  * _loop() function or when a getrange() is called. Normally
652  * we will not receive a seek if we are operating in pull mode
653  * though.
654  *
655  * When we are in the loop() function, we might be in the middle
656  * of pushing a buffer, which might block in a sink. To make sure
657  * that the push gets unblocked we push out a FLUSH_START event.
658  * Our loop function will get a WRONG_STATE return value from
659  * the push and will pause, effectively releasing the STREAM_LOCK.
660  *
661  * For a non-flushing seek, we pause the task, which might eventually
662  * release the STREAM_LOCK. We say eventually because when the sink
663  * blocks on the sample we might wait a very long time until the sink
664  * unblocks the sample. In any case we acquire the STREAM_LOCK and
665  * can continue the seek. A non-flushing seek is normally done in a 
666  * running pipeline to perform seamless playback.
667  * In the case of a non-flushing seek we need to make sure that the
668  * data we output after the seek is continuous with the previous data,
669  * this is because a non-flushing seek does not reset the stream-time
670  * to 0. We do this by closing the currently running segment, ie. sending
671  * a new_segment event with the stop position set to the last processed 
672  * position.
673  *
674  * After updating the segment.start/stop values, we prepare for
675  * streaming again. We push out a FLUSH_STOP to make the peer pad
676  * accept data again and we start our task again.
677  *
678  * A segment seek posts a message on the bus saying that the playback
679  * of the segment started. We store the segment flag internally because
680  * when we reach the segment.stop we have to post a segment.done
681  * instead of EOS when doing a segment seek.
682  */
683 /* FIXME, we have the unlock gboolean here because most current implementations
684  * (fdsrc, -base/gst/tcp/, ...) not only unlock when there is something to
685  * unlock
686  */
687 static gboolean
688 gst_base_src_perform_seek (GstBaseSrc * src, GstEvent * event, gboolean unlock)
689 {
690   gboolean res;
691   gdouble rate;
692   GstFormat format;
693   GstSeekFlags flags;
694   GstSeekType cur_type, stop_type;
695   gint64 cur, stop;
696   gboolean flush;
697   gboolean update;
698   GstSegment seeksegment;
699
700   GST_DEBUG_OBJECT (src, "doing seek");
701
702   if (event) {
703     gst_event_parse_seek (event, &rate, &format, &flags,
704         &cur_type, &cur, &stop_type, &stop);
705
706     /* we have to have a format as the segment format. Try to convert
707      * if not. */
708     if (src->segment.format != format) {
709       GstFormat fmt;
710
711       fmt = src->segment.format;
712       res = TRUE;
713       if (cur_type != GST_SEEK_TYPE_NONE)
714         res = gst_pad_query_convert (src->srcpad, format, cur, &fmt, &cur);
715       if (res && stop_type != GST_SEEK_TYPE_NONE)
716         res = gst_pad_query_convert (src->srcpad, format, stop, &fmt, &stop);
717       if (!res)
718         goto no_format;
719
720       format = fmt;
721     }
722   } else {
723     flags = 0;
724   }
725
726   flush = flags & GST_SEEK_FLAG_FLUSH;
727
728   /* send flush start */
729   if (flush)
730     gst_pad_push_event (src->srcpad, gst_event_new_flush_start ());
731   else
732     gst_pad_pause_task (src->srcpad);
733
734   /* unblock streaming thread */
735   if (unlock)
736     gst_base_src_unlock (src);
737
738   /* grab streaming lock, this should eventually be possible, either
739    * because the task is paused or out streaming thread stopped 
740    * because our peer is flushing. */
741   GST_PAD_STREAM_LOCK (src->srcpad);
742
743   /* make copy into temp structure, we can only update the main one
744    * when the subclass actually could to the seek. */
745   memcpy (&seeksegment, &src->segment, sizeof (GstSegment));
746
747   /* now configure the seek segment */
748   if (event) {
749     gst_segment_set_seek (&seeksegment, rate, format, flags,
750         cur_type, cur, stop_type, stop, &update);
751   }
752
753   GST_DEBUG_OBJECT (src, "segment configured from %" G_GINT64_FORMAT
754       " to %" G_GINT64_FORMAT ", position %" G_GINT64_FORMAT,
755       seeksegment.start, seeksegment.stop, seeksegment.last_stop);
756
757   /* do the seek, segment.last_stop contains new position. */
758   res = gst_base_src_do_seek (src, &seeksegment);
759
760   /* and prepare to continue streaming */
761   if (flush) {
762     /* send flush stop, peer will accept data and events again. We
763      * are not yet providing data as we still have the STREAM_LOCK. */
764     gst_pad_push_event (src->srcpad, gst_event_new_flush_stop ());
765   } else if (res && src->data.ABI.running) {
766     /* we are running the current segment and doing a non-flushing seek, 
767      * close the segment first based on the last_stop. */
768     GST_DEBUG_OBJECT (src, "closing running segment %" G_GINT64_FORMAT
769         " to %" G_GINT64_FORMAT, src->segment.start, src->segment.last_stop);
770
771     gst_pad_push_event (src->srcpad,
772         gst_event_new_new_segment (TRUE,
773             src->segment.rate, src->segment.format,
774             src->segment.start, src->segment.last_stop, src->segment.time));
775   }
776
777   /* if successfull seek, we update our real segment and push
778    * out the new segment. */
779   if (res) {
780     memcpy (&src->segment, &seeksegment, sizeof (GstSegment));
781
782     if (src->segment.flags & GST_SEEK_FLAG_SEGMENT) {
783       gst_element_post_message (GST_ELEMENT (src),
784           gst_message_new_segment_start (GST_OBJECT (src),
785               src->segment.format, src->segment.last_stop));
786     }
787
788     if ((stop = src->segment.stop) == -1)
789       stop = src->segment.duration;
790
791     /* now send the newsegment */
792     GST_DEBUG_OBJECT (src, "Sending newsegment from %" G_GINT64_FORMAT
793         " to %" G_GINT64_FORMAT, src->segment.start, stop);
794
795     gst_pad_push_event (src->srcpad,
796         gst_event_new_new_segment (FALSE,
797             src->segment.rate, src->segment.format,
798             src->segment.last_stop, stop, src->segment.time));
799   }
800
801   src->data.ABI.running = TRUE;
802   /* and restart the task in case it got paused explicitely or by
803    * the FLUSH_START event we pushed out. */
804   gst_pad_start_task (src->srcpad, (GstTaskFunction) gst_base_src_loop,
805       src->srcpad);
806
807   /* and release the lock again so we can continue streaming */
808   GST_PAD_STREAM_UNLOCK (src->srcpad);
809
810   return res;
811
812   /* ERROR */
813 no_format:
814   {
815     GST_DEBUG_OBJECT (src, "undefined format given, seek aborted.");
816     return FALSE;
817   }
818 }
819
820 /* all events send to this element directly
821  */
822 static gboolean
823 gst_base_src_send_event (GstElement * element, GstEvent * event)
824 {
825   GstBaseSrc *src;
826   gboolean result;
827
828   src = GST_BASE_SRC (element);
829
830   switch (GST_EVENT_TYPE (event)) {
831     case GST_EVENT_SEEK:
832     {
833       GST_OBJECT_LOCK (src);
834       /* gst_event_replace? */
835       if (src->data.ABI.pending_seek)
836         gst_event_unref (src->data.ABI.pending_seek);
837       gst_event_ref (event);
838       src->data.ABI.pending_seek = event;
839       GST_OBJECT_UNLOCK (src);
840       result = TRUE;
841       break;
842     }
843     default:
844       result = FALSE;
845       break;
846   }
847   gst_event_unref (event);
848
849   return result;
850 }
851
852 static gboolean
853 gst_base_src_default_event (GstBaseSrc * src, GstEvent * event)
854 {
855   gboolean result;
856
857   switch (GST_EVENT_TYPE (event)) {
858     case GST_EVENT_SEEK:
859       /* is normally called when in push mode */
860       if (!src->seekable)
861         goto not_seekable;
862
863       result = gst_base_src_perform_seek (src, event, TRUE);
864       break;
865     case GST_EVENT_FLUSH_START:
866       /* cancel any blocking getrange, is normally called
867        * when in pull mode. */
868       result = gst_base_src_unlock (src);
869       break;
870     case GST_EVENT_FLUSH_STOP:
871     default:
872       result = TRUE;
873       break;
874   }
875   return result;
876
877   /* ERRORS */
878 not_seekable:
879   {
880     GST_DEBUG_OBJECT (src, "is not seekable");
881     return FALSE;
882   }
883 }
884
885 static gboolean
886 gst_base_src_event_handler (GstPad * pad, GstEvent * event)
887 {
888   GstBaseSrc *src;
889   GstBaseSrcClass *bclass;
890   gboolean result = FALSE;
891
892   src = GST_BASE_SRC (gst_pad_get_parent (pad));
893   bclass = GST_BASE_SRC_GET_CLASS (src);
894
895   if (bclass->event) {
896     if (!(result = bclass->event (src, event)))
897       goto subclass_failed;
898   }
899
900 done:
901   gst_event_unref (event);
902   gst_object_unref (src);
903
904   return result;
905
906   /* ERRORS */
907 subclass_failed:
908   {
909     GST_DEBUG_OBJECT (src, "subclass refused event");
910     goto done;
911   }
912 }
913
914 static void
915 gst_base_src_set_property (GObject * object, guint prop_id,
916     const GValue * value, GParamSpec * pspec)
917 {
918   GstBaseSrc *src;
919
920   src = GST_BASE_SRC (object);
921
922   switch (prop_id) {
923     case PROP_BLOCKSIZE:
924       src->blocksize = g_value_get_ulong (value);
925       break;
926     case PROP_NUM_BUFFERS:
927       src->num_buffers = g_value_get_int (value);
928       break;
929     case PROP_TYPEFIND:
930       src->data.ABI.typefind = g_value_get_boolean (value);
931       break;
932     default:
933       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
934       break;
935   }
936 }
937
938 static void
939 gst_base_src_get_property (GObject * object, guint prop_id, GValue * value,
940     GParamSpec * pspec)
941 {
942   GstBaseSrc *src;
943
944   src = GST_BASE_SRC (object);
945
946   switch (prop_id) {
947     case PROP_BLOCKSIZE:
948       g_value_set_ulong (value, src->blocksize);
949       break;
950     case PROP_NUM_BUFFERS:
951       g_value_set_int (value, src->num_buffers);
952       break;
953     case PROP_TYPEFIND:
954       g_value_set_boolean (value, src->data.ABI.typefind);
955       break;
956     default:
957       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
958       break;
959   }
960 }
961
962 /* with STREAM_LOCK and LOCK*/
963 static GstClockReturn
964 gst_base_src_wait (GstBaseSrc * basesrc, GstClockTime time)
965 {
966   GstClockReturn ret;
967   GstClockID id;
968   GstClock *clock;
969
970   /* get clock, if no clock, we don't sync */
971   if ((clock = GST_ELEMENT_CLOCK (basesrc)) == NULL)
972     return GST_CLOCK_OK;
973
974   /* clock_id should be NULL outside of this function */
975   g_assert (basesrc->clock_id == NULL);
976   g_assert (GST_CLOCK_TIME_IS_VALID (time));
977
978   id = gst_clock_new_single_shot_id (clock, time);
979
980   basesrc->clock_id = id;
981   /* release the object lock while waiting */
982   GST_OBJECT_UNLOCK (basesrc);
983
984   ret = gst_clock_id_wait (id, NULL);
985
986   GST_OBJECT_LOCK (basesrc);
987   gst_clock_id_unref (id);
988   basesrc->clock_id = NULL;
989
990   return ret;
991 }
992
993 /* perform synchronisation on a buffer. 
994  * with STREAM_LOCK.
995  */
996 static GstClockReturn
997 gst_base_src_do_sync (GstBaseSrc * basesrc, GstBuffer * buffer)
998 {
999   GstClockReturn result;
1000   GstClockTime start, end;
1001   GstBaseSrcClass *bclass;
1002   GstClockTime base_time;
1003
1004   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
1005
1006   start = end = -1;
1007   if (bclass->get_times)
1008     bclass->get_times (basesrc, buffer, &start, &end);
1009
1010   /* if we don't have a timestamp, we don't sync */
1011   if (!GST_CLOCK_TIME_IS_VALID (start))
1012     goto invalid_start;
1013
1014   /* now do clocking */
1015   GST_OBJECT_LOCK (basesrc);
1016   base_time = GST_ELEMENT_CAST (basesrc)->base_time;
1017
1018   GST_LOG_OBJECT (basesrc,
1019       "waiting for clock, base time %" GST_TIME_FORMAT
1020       ", stream_start %" GST_TIME_FORMAT,
1021       GST_TIME_ARGS (base_time), GST_TIME_ARGS (start));
1022
1023   result = gst_base_src_wait (basesrc, start + base_time);
1024   GST_OBJECT_UNLOCK (basesrc);
1025
1026   GST_LOG_OBJECT (basesrc, "clock entry done: %d", result);
1027
1028   return result;
1029
1030   /* special cases */
1031 invalid_start:
1032   {
1033     GST_DEBUG_OBJECT (basesrc, "get_times returned invalid start");
1034     return GST_CLOCK_OK;
1035   }
1036 }
1037
1038 static gboolean
1039 gst_base_src_update_length (GstBaseSrc * src, guint64 offset, guint * length)
1040 {
1041   guint64 size, maxsize;
1042   GstBaseSrcClass *bclass;
1043
1044   bclass = GST_BASE_SRC_GET_CLASS (src);
1045
1046   /* only operate if we are working with bytes */
1047   if (src->segment.format != GST_FORMAT_BYTES)
1048     return TRUE;
1049
1050   size = (guint64) src->segment.duration;
1051
1052   /* the max amount of bytes to read is the total size or
1053    * up to the segment.stop if present. */
1054   if (src->segment.stop != -1)
1055     maxsize = MIN (size, src->segment.stop);
1056   else
1057     maxsize = size;
1058
1059   GST_DEBUG_OBJECT (src,
1060       "reading offset %" G_GUINT64_FORMAT ", length %u, size %" G_GINT64_FORMAT
1061       ", segment.stop %" G_GINT64_FORMAT ", maxsize %" G_GINT64_FORMAT, offset,
1062       *length, size, src->segment.stop, maxsize);
1063
1064   /* check size */
1065   if (maxsize != -1) {
1066     if (offset > maxsize)
1067       goto unexpected_length;
1068
1069     if (offset + *length > maxsize) {
1070       /* see if length of the file changed */
1071       if (bclass->get_size)
1072         bclass->get_size (src, &size);
1073
1074       if (src->segment.stop != -1)
1075         maxsize = MIN (size, src->segment.stop);
1076       else
1077         maxsize = size;
1078
1079       if (offset + *length > maxsize) {
1080         *length = maxsize - offset;
1081       }
1082     }
1083   }
1084   if (*length == 0)
1085     goto unexpected_length;
1086
1087   return TRUE;
1088
1089   /* ERRORS */
1090 unexpected_length:
1091   {
1092     return FALSE;
1093   }
1094 }
1095
1096 static GstFlowReturn
1097 gst_base_src_get_range (GstBaseSrc * src, guint64 offset, guint length,
1098     GstBuffer ** buf)
1099 {
1100   GstFlowReturn ret;
1101   GstBaseSrcClass *bclass;
1102   GstClockReturn status;
1103
1104   bclass = GST_BASE_SRC_GET_CLASS (src);
1105
1106   GST_LIVE_LOCK (src);
1107   if (src->is_live) {
1108     while (!src->live_running) {
1109       GST_DEBUG ("live source signal waiting");
1110       GST_LIVE_SIGNAL (src);
1111       GST_DEBUG ("live source waiting for running state");
1112       GST_LIVE_WAIT (src);
1113       GST_DEBUG ("live source unlocked");
1114     }
1115     /* FIXME, use another variable to signal stopping */
1116     GST_OBJECT_LOCK (src->srcpad);
1117     if (GST_PAD_IS_FLUSHING (src->srcpad))
1118       goto flushing;
1119     GST_OBJECT_UNLOCK (src->srcpad);
1120   }
1121   GST_LIVE_UNLOCK (src);
1122
1123   if (!GST_OBJECT_FLAG_IS_SET (src, GST_BASE_SRC_STARTED))
1124     goto not_started;
1125
1126   if (G_UNLIKELY (!bclass->create))
1127     goto no_function;
1128
1129   if (!gst_base_src_update_length (src, offset, &length))
1130     goto unexpected_length;
1131
1132   if (src->num_buffers_left == 0) {
1133     goto reached_num_buffers;
1134   } else {
1135     if (src->num_buffers_left > 0)
1136       src->num_buffers_left--;
1137   }
1138
1139   GST_DEBUG_OBJECT (src,
1140       "calling create offset %" G_GUINT64_FORMAT " %" G_GINT64_FORMAT, offset,
1141       src->segment.time);
1142
1143   ret = bclass->create (src, offset, length, buf);
1144   if (ret != GST_FLOW_OK)
1145     goto done;
1146
1147   /* no timestamp set and we are at offset 0 */
1148   if (offset == 0 && src->segment.time == 0
1149       && GST_BUFFER_TIMESTAMP (*buf) == -1)
1150     GST_BUFFER_TIMESTAMP (*buf) = 0;
1151
1152   /* now sync before pushing the buffer */
1153   status = gst_base_src_do_sync (src, *buf);
1154   switch (status) {
1155     case GST_CLOCK_EARLY:
1156       GST_DEBUG_OBJECT (src, "buffer too late!, returning anyway");
1157       break;
1158     case GST_CLOCK_OK:
1159       GST_DEBUG_OBJECT (src, "buffer ok");
1160       break;
1161     default:
1162       GST_DEBUG_OBJECT (src, "clock returned %d, not returning", status);
1163       gst_buffer_unref (*buf);
1164       *buf = NULL;
1165       ret = GST_FLOW_WRONG_STATE;
1166       break;
1167   }
1168 done:
1169   return ret;
1170
1171   /* ERROR */
1172 flushing:
1173   {
1174     GST_DEBUG_OBJECT (src, "pad is flushing");
1175     GST_OBJECT_UNLOCK (src->srcpad);
1176     GST_LIVE_UNLOCK (src);
1177     return GST_FLOW_WRONG_STATE;
1178   }
1179 not_started:
1180   {
1181     GST_DEBUG_OBJECT (src, "getrange but not started");
1182     return GST_FLOW_WRONG_STATE;
1183   }
1184 no_function:
1185   {
1186     GST_DEBUG_OBJECT (src, "no create function");
1187     return GST_FLOW_ERROR;
1188   }
1189 unexpected_length:
1190   {
1191     GST_DEBUG_OBJECT (src, "unexpected length %u (offset=%" G_GUINT64_FORMAT
1192         ", size=%" G_GINT64_FORMAT ")", length, offset, src->segment.duration);
1193     return GST_FLOW_UNEXPECTED;
1194   }
1195 reached_num_buffers:
1196   {
1197     GST_DEBUG_OBJECT (src, "sent all buffers");
1198     return GST_FLOW_UNEXPECTED;
1199   }
1200 }
1201
1202 static GstFlowReturn
1203 gst_base_src_pad_get_range (GstPad * pad, guint64 offset, guint length,
1204     GstBuffer ** buf)
1205 {
1206   GstBaseSrc *src;
1207   GstFlowReturn res;
1208
1209   src = GST_BASE_SRC (gst_pad_get_parent (pad));
1210
1211   res = gst_base_src_get_range (src, offset, length, buf);
1212
1213   gst_object_unref (src);
1214
1215   return res;
1216 }
1217
1218 static gboolean
1219 gst_base_src_check_get_range (GstPad * pad)
1220 {
1221   GstBaseSrc *src;
1222   gboolean res;
1223
1224   src = GST_BASE_SRC (gst_pad_get_parent (pad));
1225
1226   if (!GST_OBJECT_FLAG_IS_SET (src, GST_BASE_SRC_STARTED)) {
1227     gst_base_src_start (src);
1228     gst_base_src_stop (src);
1229   }
1230
1231   /* we can operate in getrange mode if the native format is bytes
1232    * and we are seekable, this condition is set in the random_access
1233    * flag and is set in the _start() method. */
1234   res = src->random_access;
1235
1236   gst_object_unref (src);
1237
1238   return res;
1239 }
1240
1241 static void
1242 gst_base_src_loop (GstPad * pad)
1243 {
1244   GstBaseSrc *src;
1245   GstBuffer *buf = NULL;
1246   GstFlowReturn ret;
1247   gint64 position;
1248
1249   src = GST_BASE_SRC (gst_pad_get_parent (pad));
1250
1251   src->priv->last_sent_eos = FALSE;
1252
1253   /* if we operate in bytes, we can calculate an offset */
1254   if (src->segment.format == GST_FORMAT_BYTES)
1255     position = src->segment.last_stop;
1256   else
1257     position = -1;
1258
1259   ret = gst_base_src_get_range (src, position, src->blocksize, &buf);
1260   if (G_UNLIKELY (ret != GST_FLOW_OK)) {
1261     if (ret == GST_FLOW_UNEXPECTED)
1262       goto eos;
1263     else
1264       goto pause;
1265   }
1266   if (G_UNLIKELY (buf == NULL))
1267     goto error;
1268
1269   /* figure out the new position */
1270   switch (src->segment.format) {
1271     case GST_FORMAT_BYTES:
1272       position += GST_BUFFER_SIZE (buf);
1273       break;
1274     case GST_FORMAT_TIME:
1275     {
1276       GstClockTime start, duration;
1277
1278       start = GST_BUFFER_TIMESTAMP (buf);
1279       duration = GST_BUFFER_DURATION (buf);
1280
1281       if (GST_CLOCK_TIME_IS_VALID (start))
1282         position = start;
1283       else
1284         position = src->segment.last_stop;
1285
1286       if (GST_CLOCK_TIME_IS_VALID (duration))
1287         position += duration;
1288       break;
1289     }
1290     case GST_FORMAT_DEFAULT:
1291       position = GST_BUFFER_OFFSET_END (buf);
1292       break;
1293     default:
1294       position = -1;
1295       break;
1296   }
1297   if (position != -1)
1298     gst_segment_set_last_stop (&src->segment, src->segment.format, position);
1299
1300   ret = gst_pad_push (pad, buf);
1301   if (G_UNLIKELY (ret != GST_FLOW_OK))
1302     goto pause;
1303
1304 done:
1305   gst_object_unref (src);
1306   return;
1307
1308   /* special cases */
1309 eos:
1310   {
1311     GST_DEBUG_OBJECT (src, "going to EOS, getrange returned UNEXPECTED");
1312     /* we finished the segment */
1313     src->data.ABI.running = FALSE;
1314     gst_pad_pause_task (pad);
1315     if (src->segment.flags & GST_SEEK_FLAG_SEGMENT) {
1316       gst_element_post_message (GST_ELEMENT (src),
1317           gst_message_new_segment_done (GST_OBJECT (src),
1318               src->segment.format, src->segment.last_stop));
1319     } else {
1320       gst_pad_push_event (pad, gst_event_new_eos ());
1321       src->priv->last_sent_eos = TRUE;
1322     }
1323     goto done;
1324   }
1325 pause:
1326   {
1327     const gchar *reason = gst_flow_get_name (ret);
1328
1329     GST_DEBUG_OBJECT (src, "pausing task, reason %s", reason);
1330     gst_pad_pause_task (pad);
1331     if (GST_FLOW_IS_FATAL (ret) || ret == GST_FLOW_NOT_LINKED) {
1332       /* for fatal errors we post an error message */
1333       GST_ELEMENT_ERROR (src, STREAM, FAILED,
1334           (_("Internal data flow error.")),
1335           ("streaming task paused, reason %s", reason));
1336       gst_pad_push_event (pad, gst_event_new_eos ());
1337       src->priv->last_sent_eos = TRUE;
1338     }
1339     goto done;
1340   }
1341 error:
1342   {
1343     GST_ELEMENT_ERROR (src, STREAM, FAILED,
1344         (_("Internal data flow error.")), ("element returned NULL buffer"));
1345     /* we finished the segment on error */
1346     src->data.ABI.running = FALSE;
1347     gst_pad_pause_task (pad);
1348     gst_pad_push_event (pad, gst_event_new_eos ());
1349     src->priv->last_sent_eos = TRUE;
1350     goto done;
1351   }
1352 }
1353
1354 /* this will always be called between start() and stop(). So you can rely on
1355  * resources allocated by start() and freed from stop(). This needs to be added
1356  * to the docs at some point. */
1357 static gboolean
1358 gst_base_src_unlock (GstBaseSrc * basesrc)
1359 {
1360   GstBaseSrcClass *bclass;
1361   gboolean result = TRUE;
1362
1363   GST_DEBUG ("unlock");
1364   /* unblock whatever the subclass is doing */
1365   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
1366   if (bclass->unlock)
1367     result = bclass->unlock (basesrc);
1368
1369   GST_DEBUG ("unschedule clock");
1370   /* and unblock the clock as well, if any */
1371   GST_OBJECT_LOCK (basesrc);
1372   if (basesrc->clock_id) {
1373     gst_clock_id_unschedule (basesrc->clock_id);
1374   }
1375   GST_OBJECT_UNLOCK (basesrc);
1376
1377   GST_DEBUG ("unlock done");
1378
1379   return result;
1380 }
1381
1382 /* default negotiation code. 
1383  *
1384  * Take intersection between src and sink pads, take first
1385  * caps and fixate. 
1386  */
1387 static gboolean
1388 gst_base_src_default_negotiate (GstBaseSrc * basesrc)
1389 {
1390   GstCaps *thiscaps;
1391   GstCaps *caps = NULL;
1392   GstCaps *peercaps = NULL;
1393   gboolean result = FALSE;
1394
1395   /* first see what is possible on our source pad */
1396   thiscaps = gst_pad_get_caps (GST_BASE_SRC_PAD (basesrc));
1397   GST_DEBUG_OBJECT (basesrc, "caps of src: %" GST_PTR_FORMAT, thiscaps);
1398   /* nothing or anything is allowed, we're done */
1399   if (thiscaps == NULL || gst_caps_is_any (thiscaps))
1400     goto no_nego_needed;
1401
1402   /* get the peer caps */
1403   peercaps = gst_pad_peer_get_caps (GST_BASE_SRC_PAD (basesrc));
1404   GST_DEBUG_OBJECT (basesrc, "caps of peer: %" GST_PTR_FORMAT, peercaps);
1405   if (peercaps) {
1406     GstCaps *icaps;
1407
1408     /* get intersection */
1409     icaps = gst_caps_intersect (thiscaps, peercaps);
1410     GST_DEBUG_OBJECT (basesrc, "intersect: %" GST_PTR_FORMAT, icaps);
1411     gst_caps_unref (thiscaps);
1412     gst_caps_unref (peercaps);
1413     if (icaps) {
1414       /* take first (and best, since they are sorted) possibility */
1415       caps = gst_caps_copy_nth (icaps, 0);
1416       gst_caps_unref (icaps);
1417     }
1418   } else {
1419     /* no peer, work with our own caps then */
1420     caps = thiscaps;
1421   }
1422   if (caps) {
1423     caps = gst_caps_make_writable (caps);
1424     gst_caps_truncate (caps);
1425
1426     /* now fixate */
1427     if (!gst_caps_is_empty (caps)) {
1428       gst_pad_fixate_caps (GST_BASE_SRC_PAD (basesrc), caps);
1429       GST_DEBUG_OBJECT (basesrc, "fixated to: %" GST_PTR_FORMAT, caps);
1430
1431       if (gst_caps_is_any (caps)) {
1432         /* hmm, still anything, so element can do anything and
1433          * nego is not needed */
1434         result = TRUE;
1435       } else if (gst_caps_is_fixed (caps)) {
1436         /* yay, fixed caps, use those then */
1437         gst_pad_set_caps (GST_BASE_SRC_PAD (basesrc), caps);
1438         result = TRUE;
1439       }
1440     }
1441     gst_caps_unref (caps);
1442   }
1443   return result;
1444
1445 no_nego_needed:
1446   {
1447     GST_DEBUG_OBJECT (basesrc, "no negotiation needed");
1448     if (thiscaps)
1449       gst_caps_unref (thiscaps);
1450     return TRUE;
1451   }
1452 }
1453
1454 static gboolean
1455 gst_base_src_negotiate (GstBaseSrc * basesrc)
1456 {
1457   GstBaseSrcClass *bclass;
1458   gboolean result = TRUE;
1459
1460   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
1461
1462   if (bclass->negotiate)
1463     result = bclass->negotiate (basesrc);
1464
1465   return result;
1466 }
1467
1468 static gboolean
1469 gst_base_src_start (GstBaseSrc * basesrc)
1470 {
1471   GstBaseSrcClass *bclass;
1472   gboolean result;
1473   guint64 size;
1474
1475   if (GST_OBJECT_FLAG_IS_SET (basesrc, GST_BASE_SRC_STARTED))
1476     return TRUE;
1477
1478   GST_DEBUG_OBJECT (basesrc, "starting source");
1479
1480   basesrc->num_buffers_left = basesrc->num_buffers;
1481
1482   gst_segment_init (&basesrc->segment, basesrc->segment.format);
1483   basesrc->data.ABI.running = FALSE;
1484
1485   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
1486   if (bclass->start)
1487     result = bclass->start (basesrc);
1488   else
1489     result = TRUE;
1490
1491   if (!result)
1492     goto could_not_start;
1493
1494   GST_OBJECT_FLAG_SET (basesrc, GST_BASE_SRC_STARTED);
1495
1496   /* figure out the size */
1497   if (basesrc->segment.format == GST_FORMAT_BYTES) {
1498     if (bclass->get_size) {
1499       if (!(result = bclass->get_size (basesrc, &size)))
1500         size = -1;
1501     } else {
1502       result = FALSE;
1503       size = -1;
1504     }
1505     /* only update the size when operating in bytes, subclass is supposed
1506      * to set duration in the start method for other formats */
1507     gst_segment_set_duration (&basesrc->segment, GST_FORMAT_BYTES, size);
1508   }
1509
1510   GST_DEBUG_OBJECT (basesrc,
1511       "format: %d, have size: %d, size: %" G_GUINT64_FORMAT ", duration: %"
1512       G_GINT64_FORMAT, basesrc->segment.format, result, size,
1513       basesrc->segment.duration);
1514
1515   /* check if we can seek */
1516   if (bclass->is_seekable)
1517     basesrc->seekable = bclass->is_seekable (basesrc);
1518   else
1519     basesrc->seekable = FALSE;
1520
1521   GST_DEBUG_OBJECT (basesrc, "is seekable: %d", basesrc->seekable);
1522
1523   /* update for random access flag */
1524   basesrc->random_access = basesrc->seekable &&
1525       basesrc->segment.format == GST_FORMAT_BYTES;
1526
1527   GST_DEBUG_OBJECT (basesrc, "is random_access: %d", basesrc->random_access);
1528
1529   /* run typefind if we are random_access and the typefinding is enabled. */
1530   if (basesrc->random_access && basesrc->data.ABI.typefind && size != -1) {
1531     GstCaps *caps;
1532
1533     caps = gst_type_find_helper (basesrc->srcpad, size);
1534     gst_pad_set_caps (basesrc->srcpad, caps);
1535     gst_caps_unref (caps);
1536   } else {
1537     /* use class or default negotiate function */
1538     if (!gst_base_src_negotiate (basesrc))
1539       goto could_not_negotiate;
1540   }
1541
1542   return TRUE;
1543
1544   /* ERROR */
1545 could_not_start:
1546   {
1547     GST_DEBUG_OBJECT (basesrc, "could not start");
1548     return FALSE;
1549   }
1550 could_not_negotiate:
1551   {
1552     GST_DEBUG_OBJECT (basesrc, "could not negotiate, stopping");
1553     GST_ELEMENT_ERROR (basesrc, STREAM, FORMAT,
1554         ("Could not negotiate format"), ("Check your filtered caps, if any"));
1555     gst_base_src_stop (basesrc);
1556     return FALSE;
1557   }
1558 }
1559
1560 static gboolean
1561 gst_base_src_stop (GstBaseSrc * basesrc)
1562 {
1563   GstBaseSrcClass *bclass;
1564   gboolean result = TRUE;
1565
1566   if (!GST_OBJECT_FLAG_IS_SET (basesrc, GST_BASE_SRC_STARTED))
1567     return TRUE;
1568
1569   GST_DEBUG_OBJECT (basesrc, "stopping source");
1570
1571   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
1572   if (bclass->stop)
1573     result = bclass->stop (basesrc);
1574
1575   if (result)
1576     GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_STARTED);
1577
1578   return result;
1579 }
1580
1581 static gboolean
1582 gst_base_src_deactivate (GstBaseSrc * basesrc, GstPad * pad)
1583 {
1584   gboolean result;
1585
1586   GST_LIVE_LOCK (basesrc);
1587   basesrc->live_running = TRUE;
1588   GST_LIVE_SIGNAL (basesrc);
1589   GST_LIVE_UNLOCK (basesrc);
1590
1591   /* step 1, unblock clock sync (if any) */
1592   result = gst_base_src_unlock (basesrc);
1593
1594   /* step 2, make sure streaming finishes */
1595   result &= gst_pad_stop_task (pad);
1596
1597   return result;
1598 }
1599
1600 static gboolean
1601 gst_base_src_activate_push (GstPad * pad, gboolean active)
1602 {
1603   GstBaseSrc *basesrc;
1604   gboolean res;
1605
1606   basesrc = GST_BASE_SRC (GST_OBJECT_PARENT (pad));
1607
1608   /* prepare subclass first */
1609   if (active) {
1610     GstEvent *event;
1611
1612     GST_DEBUG_OBJECT (basesrc, "Activating in push mode");
1613
1614     if (!basesrc->can_activate_push)
1615       goto no_push_activation;
1616
1617     if (!gst_base_src_start (basesrc))
1618       goto error_start;
1619
1620     basesrc->priv->last_sent_eos = FALSE;
1621
1622     /* do initial seek, which will start the task */
1623     GST_OBJECT_LOCK (basesrc);
1624     event = basesrc->data.ABI.pending_seek;
1625     basesrc->data.ABI.pending_seek = NULL;
1626     GST_OBJECT_UNLOCK (basesrc);
1627
1628     /* no need to unlock anything, the task is certainly
1629      * not running here. */
1630     res = gst_base_src_perform_seek (basesrc, event, FALSE);
1631
1632     if (event)
1633       gst_event_unref (event);
1634   } else {
1635     GST_DEBUG_OBJECT (basesrc, "Deactivating in push mode");
1636     res = gst_base_src_deactivate (basesrc, pad);
1637   }
1638   return res;
1639
1640   /* ERRORS */
1641 no_push_activation:
1642   {
1643     GST_DEBUG_OBJECT (basesrc, "Subclass disabled push-mode activation");
1644     return FALSE;
1645   }
1646 error_start:
1647   {
1648     gst_base_src_stop (basesrc);
1649     GST_DEBUG_OBJECT (basesrc, "Failed to start in push mode");
1650     return FALSE;
1651   }
1652 }
1653
1654 static gboolean
1655 gst_base_src_activate_pull (GstPad * pad, gboolean active)
1656 {
1657   GstBaseSrc *basesrc;
1658
1659   basesrc = GST_BASE_SRC (GST_OBJECT_PARENT (pad));
1660
1661   /* prepare subclass first */
1662   if (active) {
1663     GST_DEBUG_OBJECT (basesrc, "Activating in pull mode");
1664     if (!gst_base_src_start (basesrc))
1665       goto error_start;
1666
1667     /* if not random_access, we cannot operate in pull mode for now */
1668     if (!basesrc->random_access) {
1669       gst_base_src_stop (basesrc);
1670       return FALSE;
1671     }
1672     return TRUE;
1673   } else {
1674     GST_DEBUG_OBJECT (basesrc, "Deactivating in pull mode");
1675
1676     /* don't send EOS when going from PAUSED => READY when in pull mode */
1677     basesrc->priv->last_sent_eos = TRUE;
1678
1679     if (!gst_base_src_stop (basesrc))
1680       goto error_stop;
1681
1682     return gst_base_src_deactivate (basesrc, pad);
1683   }
1684
1685 error_start:
1686   {
1687     gst_base_src_stop (basesrc);
1688     GST_DEBUG_OBJECT (basesrc, "Failed to start in pull mode");
1689     return FALSE;
1690   }
1691 error_stop:
1692   {
1693     GST_DEBUG_OBJECT (basesrc, "Failed to stop in pull mode");
1694     return FALSE;
1695   }
1696 }
1697
1698 static GstStateChangeReturn
1699 gst_base_src_change_state (GstElement * element, GstStateChange transition)
1700 {
1701   GstBaseSrc *basesrc;
1702   GstStateChangeReturn result;
1703   gboolean no_preroll = FALSE;
1704
1705   basesrc = GST_BASE_SRC (element);
1706
1707   switch (transition) {
1708     case GST_STATE_CHANGE_NULL_TO_READY:
1709       break;
1710     case GST_STATE_CHANGE_READY_TO_PAUSED:
1711       GST_LIVE_LOCK (element);
1712       if (basesrc->is_live) {
1713         no_preroll = TRUE;
1714         basesrc->live_running = FALSE;
1715       }
1716       basesrc->priv->last_sent_eos = FALSE;
1717       GST_LIVE_UNLOCK (element);
1718       break;
1719     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
1720       GST_LIVE_LOCK (element);
1721       if (basesrc->is_live) {
1722         basesrc->live_running = TRUE;
1723         GST_LIVE_SIGNAL (element);
1724       }
1725       GST_LIVE_UNLOCK (element);
1726       break;
1727     default:
1728       break;
1729   }
1730
1731   if ((result =
1732           GST_ELEMENT_CLASS (parent_class)->change_state (element,
1733               transition)) == GST_STATE_CHANGE_FAILURE)
1734     goto failure;
1735
1736   switch (transition) {
1737     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
1738       GST_LIVE_LOCK (element);
1739       if (basesrc->is_live) {
1740         no_preroll = TRUE;
1741         basesrc->live_running = FALSE;
1742       }
1743       GST_LIVE_UNLOCK (element);
1744       break;
1745     case GST_STATE_CHANGE_PAUSED_TO_READY:
1746       if (!gst_base_src_stop (basesrc))
1747         goto error_stop;
1748
1749       if (!basesrc->priv->last_sent_eos) {
1750         GST_DEBUG_OBJECT (basesrc, "Sending EOS event");
1751         gst_pad_push_event (basesrc->srcpad, gst_event_new_eos ());
1752         basesrc->priv->last_sent_eos = TRUE;
1753       }
1754       break;
1755     case GST_STATE_CHANGE_READY_TO_NULL:
1756       break;
1757     default:
1758       break;
1759   }
1760
1761   if (no_preroll && result == GST_STATE_CHANGE_SUCCESS)
1762     result = GST_STATE_CHANGE_NO_PREROLL;
1763
1764   return result;
1765
1766   /* ERRORS */
1767 failure:
1768   {
1769     GST_DEBUG_OBJECT (basesrc, "parent failed state change");
1770     gst_base_src_stop (basesrc);
1771     return result;
1772   }
1773 error_stop:
1774   {
1775     GST_DEBUG_OBJECT (basesrc, "Failed to stop");
1776     return GST_STATE_CHANGE_FAILURE;
1777   }
1778 }