basesrc: Send the stream-start event as first event ever
[platform/upstream/gstreamer.git] / libs / gst / base / gstbasesrc.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *               2000,2005 Wim Taymans <wim@fluendo.com>
4  *
5  * gstbasesrc.c:
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20  * Boston, MA 02111-1307, USA.
21  */
22
23 /**
24  * SECTION:gstbasesrc
25  * @short_description: Base class for getrange based source elements
26  * @see_also: #GstPushSrc, #GstBaseTransform, #GstBaseSink
27  *
28  * This is a generice base class for source elements. The following
29  * types of sources are supported:
30  * <itemizedlist>
31  *   <listitem><para>random access sources like files</para></listitem>
32  *   <listitem><para>seekable sources</para></listitem>
33  *   <listitem><para>live sources</para></listitem>
34  * </itemizedlist>
35  *
36  * The source can be configured to operate in any #GstFormat with the
37  * gst_base_src_set_format() method. The currently set format determines
38  * the format of the internal #GstSegment and any #GST_EVENT_NEWSEGMENT
39  * events. The default format for #GstBaseSrc is #GST_FORMAT_BYTES.
40  *
41  * #GstBaseSrc always supports push mode scheduling. If the following
42  * conditions are met, it also supports pull mode scheduling:
43  * <itemizedlist>
44  *   <listitem><para>The format is set to #GST_FORMAT_BYTES (default).</para>
45  *   </listitem>
46  *   <listitem><para>#GstBaseSrcClass.is_seekable() returns %TRUE.</para>
47  *   </listitem>
48  * </itemizedlist>
49  *
50  * If all the conditions are met for operating in pull mode, #GstBaseSrc is
51  * automatically seekable in push mode as well. The following conditions must
52  * be met to make the element seekable in push mode when the format is not
53  * #GST_FORMAT_BYTES:
54  * <itemizedlist>
55  *   <listitem><para>
56  *     #GstBaseSrcClass.is_seekable() returns %TRUE.
57  *   </para></listitem>
58  *   <listitem><para>
59  *     #GstBaseSrcClass.query() can convert all supported seek formats to the
60  *     internal format as set with gst_base_src_set_format().
61  *   </para></listitem>
62  *   <listitem><para>
63  *     #GstBaseSrcClass.do_seek() is implemented, performs the seek and returns
64  *      %TRUE.
65  *   </para></listitem>
66  * </itemizedlist>
67  *
68  * When the element does not meet the requirements to operate in pull mode, the
69  * offset and length in the #GstBaseSrcClass.create() method should be ignored.
70  * It is recommended to subclass #GstPushSrc instead, in this situation. If the
71  * element can operate in pull mode but only with specific offsets and
72  * lengths, it is allowed to generate an error when the wrong values are passed
73  * to the #GstBaseSrcClass.create() function.
74  *
75  * #GstBaseSrc has support for live sources. Live sources are sources that when
76  * paused discard data, such as audio or video capture devices. A typical live
77  * source also produces data at a fixed rate and thus provides a clock to publish
78  * this rate.
79  * Use gst_base_src_set_live() to activate the live source mode.
80  *
81  * A live source does not produce data in the PAUSED state. This means that the
82  * #GstBaseSrcClass.create() method will not be called in PAUSED but only in
83  * PLAYING. To signal the pipeline that the element will not produce data, the
84  * return value from the READY to PAUSED state will be
85  * #GST_STATE_CHANGE_NO_PREROLL.
86  *
87  * A typical live source will timestamp the buffers it creates with the
88  * current running time of the pipeline. This is one reason why a live source
89  * can only produce data in the PLAYING state, when the clock is actually
90  * distributed and running.
91  *
92  * Live sources that synchronize and block on the clock (an audio source, for
93  * example) can since 0.10.12 use gst_base_src_wait_playing() when the
94  * #GstBaseSrcClass.create() function was interrupted by a state change to
95  * PAUSED.
96  *
97  * The #GstBaseSrcClass.get_times() method can be used to implement pseudo-live
98  * sources. It only makes sense to implement the #GstBaseSrcClass.get_times()
99  * function if the source is a live source. The #GstBaseSrcClass.get_times()
100  * function should return timestamps starting from 0, as if it were a non-live
101  * source. The base class will make sure that the timestamps are transformed
102  * into the current running_time. The base source will then wait for the
103  * calculated running_time before pushing out the buffer.
104  *
105  * For live sources, the base class will by default report a latency of 0.
106  * For pseudo live sources, the base class will by default measure the difference
107  * between the first buffer timestamp and the start time of get_times and will
108  * report this value as the latency.
109  * Subclasses should override the query function when this behaviour is not
110  * acceptable.
111  *
112  * There is only support in #GstBaseSrc for exactly one source pad, which
113  * should be named "src". A source implementation (subclass of #GstBaseSrc)
114  * should install a pad template in its class_init function, like so:
115  * |[
116  * static void
117  * my_element_class_init (GstMyElementClass *klass)
118  * {
119  *   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
120  *   // srctemplate should be a #GstStaticPadTemplate with direction
121  *   // #GST_PAD_SRC and name "src"
122  *   gst_element_class_add_pad_template (gstelement_class,
123  *       gst_static_pad_template_get (&amp;srctemplate));
124  *   // see #GstElementDetails
125  *   gst_element_class_set_details (gstelement_class, &amp;details);
126  * }
127  * ]|
128  *
129  * <refsect2>
130  * <title>Controlled shutdown of live sources in applications</title>
131  * <para>
132  * Applications that record from a live source may want to stop recording
133  * in a controlled way, so that the recording is stopped, but the data
134  * already in the pipeline is processed to the end (remember that many live
135  * sources would go on recording forever otherwise). For that to happen the
136  * application needs to make the source stop recording and send an EOS
137  * event down the pipeline. The application would then wait for an
138  * EOS message posted on the pipeline's bus to know when all data has
139  * been processed and the pipeline can safely be stopped.
140  *
141  * Since GStreamer 0.10.16 an application may send an EOS event to a source
142  * element to make it perform the EOS logic (send EOS event downstream or post a
143  * #GST_MESSAGE_SEGMENT_DONE on the bus). This can typically be done
144  * with the gst_element_send_event() function on the element or its parent bin.
145  *
146  * After the EOS has been sent to the element, the application should wait for
147  * an EOS message to be posted on the pipeline's bus. Once this EOS message is
148  * received, it may safely shut down the entire pipeline.
149  *
150  * Last reviewed on 2007-12-19 (0.10.16)
151  * </para>
152  * </refsect2>
153  */
154
155 #ifdef HAVE_CONFIG_H
156 #  include "config.h"
157 #endif
158
159 #include <stdlib.h>
160 #include <string.h>
161
162 #include <gst/gst_private.h>
163 #include <gst/glib-compat-private.h>
164
165 #include "gstbasesrc.h"
166 #include "gsttypefindhelper.h"
167 #include <gst/gstmarshal.h>
168 #include <gst/gst-i18n-lib.h>
169
170 GST_DEBUG_CATEGORY_STATIC (gst_base_src_debug);
171 #define GST_CAT_DEFAULT gst_base_src_debug
172
173 #define GST_LIVE_GET_LOCK(elem)               (&GST_BASE_SRC_CAST(elem)->live_lock)
174 #define GST_LIVE_LOCK(elem)                   g_mutex_lock(GST_LIVE_GET_LOCK(elem))
175 #define GST_LIVE_TRYLOCK(elem)                g_mutex_trylock(GST_LIVE_GET_LOCK(elem))
176 #define GST_LIVE_UNLOCK(elem)                 g_mutex_unlock(GST_LIVE_GET_LOCK(elem))
177 #define GST_LIVE_GET_COND(elem)               (&GST_BASE_SRC_CAST(elem)->live_cond)
178 #define GST_LIVE_WAIT(elem)                   g_cond_wait (GST_LIVE_GET_COND (elem), GST_LIVE_GET_LOCK (elem))
179 #define GST_LIVE_WAIT_UNTIL(elem, end_time)   g_cond_timed_wait (GST_LIVE_GET_COND (elem), GST_LIVE_GET_LOCK (elem), end_time)
180 #define GST_LIVE_SIGNAL(elem)                 g_cond_signal (GST_LIVE_GET_COND (elem));
181 #define GST_LIVE_BROADCAST(elem)              g_cond_broadcast (GST_LIVE_GET_COND (elem));
182
183 /* BaseSrc signals and args */
184 enum
185 {
186   /* FILL ME */
187   LAST_SIGNAL
188 };
189
190 #define DEFAULT_BLOCKSIZE       4096
191 #define DEFAULT_NUM_BUFFERS     -1
192 #define DEFAULT_TYPEFIND        FALSE
193 #define DEFAULT_DO_TIMESTAMP    FALSE
194
195 enum
196 {
197   PROP_0,
198   PROP_BLOCKSIZE,
199   PROP_NUM_BUFFERS,
200   PROP_TYPEFIND,
201   PROP_DO_TIMESTAMP
202 };
203
204 #define GST_BASE_SRC_GET_PRIVATE(obj)  \
205    (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_BASE_SRC, GstBaseSrcPrivate))
206
207 struct _GstBaseSrcPrivate
208 {
209   gboolean discont;
210   gboolean flushing;
211
212   GstFlowReturn start_result;
213   gboolean async;
214
215   /* if a stream-start event should be sent */
216   gboolean stream_start_pending;
217
218   /* if segment should be sent */
219   gboolean segment_pending;
220
221   /* if EOS is pending (atomic) */
222   gint pending_eos;
223
224   /* startup latency is the time it takes between going to PLAYING and producing
225    * the first BUFFER with running_time 0. This value is included in the latency
226    * reporting. */
227   GstClockTime latency;
228   /* timestamp offset, this is the offset add to the values of gst_times for
229    * pseudo live sources */
230   GstClockTimeDiff ts_offset;
231
232   gboolean do_timestamp;
233   volatile gint dynamic_size;
234
235   /* stream sequence number */
236   guint32 seqnum;
237
238   /* pending events (TAG, CUSTOM_BOTH, CUSTOM_DOWNSTREAM) to be
239    * pushed in the data stream */
240   GList *pending_events;
241   volatile gint have_events;
242
243   /* QoS *//* with LOCK */
244   gboolean qos_enabled;
245   gdouble proportion;
246   GstClockTime earliest_time;
247
248   GstBufferPool *pool;
249   const GstAllocator *allocator;
250   guint prefix;
251   guint alignment;
252 };
253
254 static GstElementClass *parent_class = NULL;
255
256 static void gst_base_src_class_init (GstBaseSrcClass * klass);
257 static void gst_base_src_init (GstBaseSrc * src, gpointer g_class);
258 static void gst_base_src_finalize (GObject * object);
259
260
261 GType
262 gst_base_src_get_type (void)
263 {
264   static volatile gsize base_src_type = 0;
265
266   if (g_once_init_enter (&base_src_type)) {
267     GType _type;
268     static const GTypeInfo base_src_info = {
269       sizeof (GstBaseSrcClass),
270       NULL,
271       NULL,
272       (GClassInitFunc) gst_base_src_class_init,
273       NULL,
274       NULL,
275       sizeof (GstBaseSrc),
276       0,
277       (GInstanceInitFunc) gst_base_src_init,
278     };
279
280     _type = g_type_register_static (GST_TYPE_ELEMENT,
281         "GstBaseSrc", &base_src_info, G_TYPE_FLAG_ABSTRACT);
282     g_once_init_leave (&base_src_type, _type);
283   }
284   return base_src_type;
285 }
286
287 static GstCaps *gst_base_src_default_get_caps (GstBaseSrc * bsrc,
288     GstCaps * filter);
289 static void gst_base_src_default_fixate (GstBaseSrc * src, GstCaps * caps);
290 static void gst_base_src_fixate (GstBaseSrc * src, GstCaps * caps);
291
292 static gboolean gst_base_src_is_random_access (GstBaseSrc * src);
293 static gboolean gst_base_src_activate_mode (GstPad * pad, GstObject * parent,
294     GstPadMode mode, gboolean active);
295 static void gst_base_src_set_property (GObject * object, guint prop_id,
296     const GValue * value, GParamSpec * pspec);
297 static void gst_base_src_get_property (GObject * object, guint prop_id,
298     GValue * value, GParamSpec * pspec);
299 static gboolean gst_base_src_event (GstPad * pad, GstObject * parent,
300     GstEvent * event);
301 static gboolean gst_base_src_send_event (GstElement * elem, GstEvent * event);
302 static gboolean gst_base_src_default_event (GstBaseSrc * src, GstEvent * event);
303
304 static gboolean gst_base_src_query (GstPad * pad, GstObject * parent,
305     GstQuery * query);
306
307 static gboolean gst_base_src_activate_pool (GstBaseSrc * basesrc,
308     gboolean active);
309 static gboolean gst_base_src_default_negotiate (GstBaseSrc * basesrc);
310 static gboolean gst_base_src_default_do_seek (GstBaseSrc * src,
311     GstSegment * segment);
312 static gboolean gst_base_src_default_query (GstBaseSrc * src, GstQuery * query);
313 static gboolean gst_base_src_default_prepare_seek_segment (GstBaseSrc * src,
314     GstEvent * event, GstSegment * segment);
315 static GstFlowReturn gst_base_src_default_create (GstBaseSrc * basesrc,
316     guint64 offset, guint size, GstBuffer ** buf);
317 static GstFlowReturn gst_base_src_default_alloc (GstBaseSrc * basesrc,
318     guint64 offset, guint size, GstBuffer ** buf);
319
320 static gboolean gst_base_src_set_flushing (GstBaseSrc * basesrc,
321     gboolean flushing, gboolean live_play, gboolean unlock, gboolean * playing);
322
323 static gboolean gst_base_src_start (GstBaseSrc * basesrc);
324 static gboolean gst_base_src_stop (GstBaseSrc * basesrc);
325
326 static GstStateChangeReturn gst_base_src_change_state (GstElement * element,
327     GstStateChange transition);
328
329 static void gst_base_src_loop (GstPad * pad);
330 static GstFlowReturn gst_base_src_getrange (GstPad * pad, GstObject * parent,
331     guint64 offset, guint length, GstBuffer ** buf);
332 static GstFlowReturn gst_base_src_get_range (GstBaseSrc * src, guint64 offset,
333     guint length, GstBuffer ** buf);
334 static gboolean gst_base_src_seekable (GstBaseSrc * src);
335 static gboolean gst_base_src_negotiate (GstBaseSrc * basesrc);
336 static gboolean gst_base_src_update_length (GstBaseSrc * src, guint64 offset,
337     guint * length);
338
339 static void
340 gst_base_src_class_init (GstBaseSrcClass * klass)
341 {
342   GObjectClass *gobject_class;
343   GstElementClass *gstelement_class;
344
345   gobject_class = G_OBJECT_CLASS (klass);
346   gstelement_class = GST_ELEMENT_CLASS (klass);
347
348   GST_DEBUG_CATEGORY_INIT (gst_base_src_debug, "basesrc", 0, "basesrc element");
349
350   g_type_class_add_private (klass, sizeof (GstBaseSrcPrivate));
351
352   parent_class = g_type_class_peek_parent (klass);
353
354   gobject_class->finalize = gst_base_src_finalize;
355   gobject_class->set_property = gst_base_src_set_property;
356   gobject_class->get_property = gst_base_src_get_property;
357
358   g_object_class_install_property (gobject_class, PROP_BLOCKSIZE,
359       g_param_spec_uint ("blocksize", "Block size",
360           "Size in bytes to read per buffer (-1 = default)", 0, G_MAXUINT,
361           DEFAULT_BLOCKSIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
362   g_object_class_install_property (gobject_class, PROP_NUM_BUFFERS,
363       g_param_spec_int ("num-buffers", "num-buffers",
364           "Number of buffers to output before sending EOS (-1 = unlimited)",
365           -1, G_MAXINT, DEFAULT_NUM_BUFFERS, G_PARAM_READWRITE |
366           G_PARAM_STATIC_STRINGS));
367   g_object_class_install_property (gobject_class, PROP_TYPEFIND,
368       g_param_spec_boolean ("typefind", "Typefind",
369           "Run typefind before negotiating", DEFAULT_TYPEFIND,
370           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
371   g_object_class_install_property (gobject_class, PROP_DO_TIMESTAMP,
372       g_param_spec_boolean ("do-timestamp", "Do timestamp",
373           "Apply current stream time to buffers", DEFAULT_DO_TIMESTAMP,
374           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
375
376   gstelement_class->change_state =
377       GST_DEBUG_FUNCPTR (gst_base_src_change_state);
378   gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_base_src_send_event);
379
380   klass->get_caps = GST_DEBUG_FUNCPTR (gst_base_src_default_get_caps);
381   klass->negotiate = GST_DEBUG_FUNCPTR (gst_base_src_default_negotiate);
382   klass->fixate = GST_DEBUG_FUNCPTR (gst_base_src_default_fixate);
383   klass->prepare_seek_segment =
384       GST_DEBUG_FUNCPTR (gst_base_src_default_prepare_seek_segment);
385   klass->do_seek = GST_DEBUG_FUNCPTR (gst_base_src_default_do_seek);
386   klass->query = GST_DEBUG_FUNCPTR (gst_base_src_default_query);
387   klass->event = GST_DEBUG_FUNCPTR (gst_base_src_default_event);
388   klass->create = GST_DEBUG_FUNCPTR (gst_base_src_default_create);
389   klass->alloc = GST_DEBUG_FUNCPTR (gst_base_src_default_alloc);
390
391   /* Registering debug symbols for function pointers */
392   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_activate_mode);
393   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_event);
394   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_query);
395   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_getrange);
396   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_fixate);
397 }
398
399 static void
400 gst_base_src_init (GstBaseSrc * basesrc, gpointer g_class)
401 {
402   GstPad *pad;
403   GstPadTemplate *pad_template;
404
405   basesrc->priv = GST_BASE_SRC_GET_PRIVATE (basesrc);
406
407   basesrc->is_live = FALSE;
408   g_mutex_init (&basesrc->live_lock);
409   g_cond_init (&basesrc->live_cond);
410   basesrc->num_buffers = DEFAULT_NUM_BUFFERS;
411   basesrc->num_buffers_left = -1;
412
413   basesrc->can_activate_push = TRUE;
414
415   pad_template =
416       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (g_class), "src");
417   g_return_if_fail (pad_template != NULL);
418
419   GST_DEBUG_OBJECT (basesrc, "creating src pad");
420   pad = gst_pad_new_from_template (pad_template, "src");
421
422   GST_DEBUG_OBJECT (basesrc, "setting functions on src pad");
423   gst_pad_set_activatemode_function (pad, gst_base_src_activate_mode);
424   gst_pad_set_event_function (pad, gst_base_src_event);
425   gst_pad_set_query_function (pad, gst_base_src_query);
426   gst_pad_set_getrange_function (pad, gst_base_src_getrange);
427
428   /* hold pointer to pad */
429   basesrc->srcpad = pad;
430   GST_DEBUG_OBJECT (basesrc, "adding src pad");
431   gst_element_add_pad (GST_ELEMENT (basesrc), pad);
432
433   basesrc->blocksize = DEFAULT_BLOCKSIZE;
434   basesrc->clock_id = NULL;
435   /* we operate in BYTES by default */
436   gst_base_src_set_format (basesrc, GST_FORMAT_BYTES);
437   basesrc->typefind = DEFAULT_TYPEFIND;
438   basesrc->priv->do_timestamp = DEFAULT_DO_TIMESTAMP;
439   g_atomic_int_set (&basesrc->priv->have_events, FALSE);
440
441   basesrc->priv->start_result = GST_FLOW_WRONG_STATE;
442   GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTED);
443   GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTING);
444   GST_OBJECT_FLAG_SET (basesrc, GST_ELEMENT_FLAG_SOURCE);
445
446   GST_DEBUG_OBJECT (basesrc, "init done");
447 }
448
449 static void
450 gst_base_src_finalize (GObject * object)
451 {
452   GstBaseSrc *basesrc;
453   GstEvent **event_p;
454
455   basesrc = GST_BASE_SRC (object);
456
457   g_mutex_clear (&basesrc->live_lock);
458   g_cond_clear (&basesrc->live_cond);
459
460   event_p = &basesrc->pending_seek;
461   gst_event_replace (event_p, NULL);
462
463   if (basesrc->priv->pending_events) {
464     g_list_foreach (basesrc->priv->pending_events, (GFunc) gst_event_unref,
465         NULL);
466     g_list_free (basesrc->priv->pending_events);
467   }
468
469   G_OBJECT_CLASS (parent_class)->finalize (object);
470 }
471
472 /**
473  * gst_base_src_wait_playing:
474  * @src: the src
475  *
476  * If the #GstBaseSrcClass.create() method performs its own synchronisation
477  * against the clock it must unblock when going from PLAYING to the PAUSED state
478  * and call this method before continuing to produce the remaining data.
479  *
480  * This function will block until a state change to PLAYING happens (in which
481  * case this function returns #GST_FLOW_OK) or the processing must be stopped due
482  * to a state change to READY or a FLUSH event (in which case this function
483  * returns #GST_FLOW_WRONG_STATE).
484  *
485  * Since: 0.10.12
486  *
487  * Returns: #GST_FLOW_OK if @src is PLAYING and processing can
488  * continue. Any other return value should be returned from the create vmethod.
489  */
490 GstFlowReturn
491 gst_base_src_wait_playing (GstBaseSrc * src)
492 {
493   g_return_val_if_fail (GST_IS_BASE_SRC (src), GST_FLOW_ERROR);
494
495   do {
496     /* block until the state changes, or we get a flush, or something */
497     GST_DEBUG_OBJECT (src, "live source waiting for running state");
498     GST_LIVE_WAIT (src);
499     GST_DEBUG_OBJECT (src, "live source unlocked");
500     if (src->priv->flushing)
501       goto flushing;
502   } while (G_UNLIKELY (!src->live_running));
503
504   return GST_FLOW_OK;
505
506   /* ERRORS */
507 flushing:
508   {
509     GST_DEBUG_OBJECT (src, "we are flushing");
510     return GST_FLOW_WRONG_STATE;
511   }
512 }
513
514 /**
515  * gst_base_src_set_live:
516  * @src: base source instance
517  * @live: new live-mode
518  *
519  * If the element listens to a live source, @live should
520  * be set to %TRUE.
521  *
522  * A live source will not produce data in the PAUSED state and
523  * will therefore not be able to participate in the PREROLL phase
524  * of a pipeline. To signal this fact to the application and the
525  * pipeline, the state change return value of the live source will
526  * be GST_STATE_CHANGE_NO_PREROLL.
527  */
528 void
529 gst_base_src_set_live (GstBaseSrc * src, gboolean live)
530 {
531   g_return_if_fail (GST_IS_BASE_SRC (src));
532
533   GST_OBJECT_LOCK (src);
534   src->is_live = live;
535   GST_OBJECT_UNLOCK (src);
536 }
537
538 /**
539  * gst_base_src_is_live:
540  * @src: base source instance
541  *
542  * Check if an element is in live mode.
543  *
544  * Returns: %TRUE if element is in live mode.
545  */
546 gboolean
547 gst_base_src_is_live (GstBaseSrc * src)
548 {
549   gboolean result;
550
551   g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
552
553   GST_OBJECT_LOCK (src);
554   result = src->is_live;
555   GST_OBJECT_UNLOCK (src);
556
557   return result;
558 }
559
560 /**
561  * gst_base_src_set_format:
562  * @src: base source instance
563  * @format: the format to use
564  *
565  * Sets the default format of the source. This will be the format used
566  * for sending NEW_SEGMENT events and for performing seeks.
567  *
568  * If a format of GST_FORMAT_BYTES is set, the element will be able to
569  * operate in pull mode if the #GstBaseSrcClass.is_seekable() returns TRUE.
570  *
571  * This function must only be called in states < %GST_STATE_PAUSED.
572  *
573  * Since: 0.10.1
574  */
575 void
576 gst_base_src_set_format (GstBaseSrc * src, GstFormat format)
577 {
578   g_return_if_fail (GST_IS_BASE_SRC (src));
579   g_return_if_fail (GST_STATE (src) <= GST_STATE_READY);
580
581   GST_OBJECT_LOCK (src);
582   gst_segment_init (&src->segment, format);
583   GST_OBJECT_UNLOCK (src);
584 }
585
586 /**
587  * gst_base_src_set_dynamic_size:
588  * @src: base source instance
589  * @dynamic: new dynamic size mode
590  *
591  * If not @dynamic, size is only updated when needed, such as when trying to
592  * read past current tracked size.  Otherwise, size is checked for upon each
593  * read.
594  *
595  * Since: 0.10.36
596  */
597 void
598 gst_base_src_set_dynamic_size (GstBaseSrc * src, gboolean dynamic)
599 {
600   g_return_if_fail (GST_IS_BASE_SRC (src));
601
602   g_atomic_int_set (&src->priv->dynamic_size, dynamic);
603 }
604
605 /**
606  * gst_base_src_set_async:
607  * @src: base source instance
608  * @async: new async mode
609  *
610  * Configure async behaviour in @src, no state change will block. The open,
611  * close, start, stop, play and pause virtual methods will be executed in a
612  * different thread and are thus allowed to perform blocking operations. Any
613  * blocking operation should be unblocked with the unlock vmethod.
614  */
615 void
616 gst_base_src_set_async (GstBaseSrc * src, gboolean async)
617 {
618   g_return_if_fail (GST_IS_BASE_SRC (src));
619
620   GST_OBJECT_LOCK (src);
621   src->priv->async = async;
622   GST_OBJECT_UNLOCK (src);
623 }
624
625 /**
626  * gst_base_src_is_async:
627  * @src: base source instance
628  *
629  * Get the current async behaviour of @src. See also gst_base_src_set_async().
630  *
631  * Returns: %TRUE if @src is operating in async mode.
632  */
633 gboolean
634 gst_base_src_is_async (GstBaseSrc * src)
635 {
636   gboolean res;
637
638   g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
639
640   GST_OBJECT_LOCK (src);
641   res = src->priv->async;
642   GST_OBJECT_UNLOCK (src);
643
644   return res;
645 }
646
647
648 /**
649  * gst_base_src_query_latency:
650  * @src: the source
651  * @live: (out) (allow-none): if the source is live
652  * @min_latency: (out) (allow-none): the min latency of the source
653  * @max_latency: (out) (allow-none): the max latency of the source
654  *
655  * Query the source for the latency parameters. @live will be TRUE when @src is
656  * configured as a live source. @min_latency will be set to the difference
657  * between the running time and the timestamp of the first buffer.
658  * @max_latency is always the undefined value of -1.
659  *
660  * This function is mostly used by subclasses.
661  *
662  * Returns: TRUE if the query succeeded.
663  *
664  * Since: 0.10.13
665  */
666 gboolean
667 gst_base_src_query_latency (GstBaseSrc * src, gboolean * live,
668     GstClockTime * min_latency, GstClockTime * max_latency)
669 {
670   GstClockTime min;
671
672   g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
673
674   GST_OBJECT_LOCK (src);
675   if (live)
676     *live = src->is_live;
677
678   /* if we have a startup latency, report this one, else report 0. Subclasses
679    * are supposed to override the query function if they want something
680    * else. */
681   if (src->priv->latency != -1)
682     min = src->priv->latency;
683   else
684     min = 0;
685
686   if (min_latency)
687     *min_latency = min;
688   if (max_latency)
689     *max_latency = -1;
690
691   GST_LOG_OBJECT (src, "latency: live %d, min %" GST_TIME_FORMAT
692       ", max %" GST_TIME_FORMAT, src->is_live, GST_TIME_ARGS (min),
693       GST_TIME_ARGS (-1));
694   GST_OBJECT_UNLOCK (src);
695
696   return TRUE;
697 }
698
699 /**
700  * gst_base_src_set_blocksize:
701  * @src: the source
702  * @blocksize: the new blocksize in bytes
703  *
704  * Set the number of bytes that @src will push out with each buffer. When
705  * @blocksize is set to -1, a default length will be used.
706  *
707  * Since: 0.10.22
708  */
709 void
710 gst_base_src_set_blocksize (GstBaseSrc * src, guint blocksize)
711 {
712   g_return_if_fail (GST_IS_BASE_SRC (src));
713
714   GST_OBJECT_LOCK (src);
715   src->blocksize = blocksize;
716   GST_OBJECT_UNLOCK (src);
717 }
718
719 /**
720  * gst_base_src_get_blocksize:
721  * @src: the source
722  *
723  * Get the number of bytes that @src will push out with each buffer.
724  *
725  * Returns: the number of bytes pushed with each buffer.
726  *
727  * Since: 0.10.22
728  */
729 guint
730 gst_base_src_get_blocksize (GstBaseSrc * src)
731 {
732   gint res;
733
734   g_return_val_if_fail (GST_IS_BASE_SRC (src), 0);
735
736   GST_OBJECT_LOCK (src);
737   res = src->blocksize;
738   GST_OBJECT_UNLOCK (src);
739
740   return res;
741 }
742
743
744 /**
745  * gst_base_src_set_do_timestamp:
746  * @src: the source
747  * @timestamp: enable or disable timestamping
748  *
749  * Configure @src to automatically timestamp outgoing buffers based on the
750  * current running_time of the pipeline. This property is mostly useful for live
751  * sources.
752  *
753  * Since: 0.10.15
754  */
755 void
756 gst_base_src_set_do_timestamp (GstBaseSrc * src, gboolean timestamp)
757 {
758   g_return_if_fail (GST_IS_BASE_SRC (src));
759
760   GST_OBJECT_LOCK (src);
761   src->priv->do_timestamp = timestamp;
762   GST_OBJECT_UNLOCK (src);
763 }
764
765 /**
766  * gst_base_src_get_do_timestamp:
767  * @src: the source
768  *
769  * Query if @src timestamps outgoing buffers based on the current running_time.
770  *
771  * Returns: %TRUE if the base class will automatically timestamp outgoing buffers.
772  *
773  * Since: 0.10.15
774  */
775 gboolean
776 gst_base_src_get_do_timestamp (GstBaseSrc * src)
777 {
778   gboolean res;
779
780   g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
781
782   GST_OBJECT_LOCK (src);
783   res = src->priv->do_timestamp;
784   GST_OBJECT_UNLOCK (src);
785
786   return res;
787 }
788
789 /**
790  * gst_base_src_new_seamless_segment:
791  * @src: The source
792  * @start: The new start value for the segment
793  * @stop: Stop value for the new segment
794  * @position: The position value for the new segent
795  *
796  * Prepare a new seamless segment for emission downstream. This function must
797  * only be called by derived sub-classes, and only from the create() function,
798  * as the stream-lock needs to be held.
799  *
800  * The format for the new segment will be the current format of the source, as
801  * configured with gst_base_src_set_format()
802  *
803  * Returns: %TRUE if preparation of the seamless segment succeeded.
804  *
805  * Since: 0.10.26
806  */
807 gboolean
808 gst_base_src_new_seamless_segment (GstBaseSrc * src, gint64 start, gint64 stop,
809     gint64 position)
810 {
811   gboolean res = TRUE;
812
813   GST_DEBUG_OBJECT (src,
814       "Starting new seamless segment. Start %" GST_TIME_FORMAT " stop %"
815       GST_TIME_FORMAT " position %" GST_TIME_FORMAT, GST_TIME_ARGS (start),
816       GST_TIME_ARGS (stop), GST_TIME_ARGS (position));
817
818   GST_OBJECT_LOCK (src);
819
820   src->segment.base = gst_segment_to_running_time (&src->segment,
821       src->segment.format, src->segment.position);
822   src->segment.start = start;
823   src->segment.stop = stop;
824   src->segment.position = position;
825
826   /* forward, we send data from position to stop */
827   src->priv->segment_pending = TRUE;
828   GST_OBJECT_UNLOCK (src);
829
830   src->priv->discont = TRUE;
831   src->running = TRUE;
832
833   return res;
834 }
835
836 static gboolean
837 gst_base_src_send_stream_start (GstBaseSrc * src)
838 {
839   gboolean ret = TRUE;
840
841   if (src->priv->stream_start_pending) {
842     ret = gst_pad_push_event (src->srcpad, gst_event_new_stream_start ());
843     src->priv->stream_start_pending = FALSE;
844   }
845
846   return ret;
847 }
848
849 /**
850  * gst_base_src_set_caps:
851  * @src: a #GstBaseSrc
852  * @caps: a #GstCaps
853  *
854  * Set new caps on the basesrc source pad.
855  *
856  * Returns: %TRUE if the caps could be set
857  */
858 gboolean
859 gst_base_src_set_caps (GstBaseSrc * src, GstCaps * caps)
860 {
861   GstBaseSrcClass *bclass;
862   gboolean res = TRUE;
863
864   bclass = GST_BASE_SRC_GET_CLASS (src);
865
866   gst_base_src_send_stream_start (src);
867   gst_pad_push_event (src->srcpad, gst_event_new_caps (caps));
868
869   if (bclass->set_caps)
870     res = bclass->set_caps (src, caps);
871
872   return res;
873 }
874
875 static GstCaps *
876 gst_base_src_default_get_caps (GstBaseSrc * bsrc, GstCaps * filter)
877 {
878   GstCaps *caps = NULL;
879   GstPadTemplate *pad_template;
880   GstBaseSrcClass *bclass;
881
882   bclass = GST_BASE_SRC_GET_CLASS (bsrc);
883
884   pad_template =
885       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (bclass), "src");
886
887   if (pad_template != NULL) {
888     caps = gst_pad_template_get_caps (pad_template);
889
890     if (filter) {
891       GstCaps *intersection;
892
893       intersection =
894           gst_caps_intersect_full (filter, caps, GST_CAPS_INTERSECT_FIRST);
895       gst_caps_unref (caps);
896       caps = intersection;
897     }
898   }
899   return caps;
900 }
901
902 static void
903 gst_base_src_default_fixate (GstBaseSrc * bsrc, GstCaps * caps)
904 {
905   GST_DEBUG_OBJECT (bsrc, "using default caps fixate function");
906   gst_caps_fixate (caps);
907 }
908
909 static void
910 gst_base_src_fixate (GstBaseSrc * bsrc, GstCaps * caps)
911 {
912   GstBaseSrcClass *bclass;
913
914   bclass = GST_BASE_SRC_GET_CLASS (bsrc);
915
916   if (bclass->fixate)
917     bclass->fixate (bsrc, caps);
918 }
919
920 static gboolean
921 gst_base_src_default_query (GstBaseSrc * src, GstQuery * query)
922 {
923   gboolean res;
924
925   switch (GST_QUERY_TYPE (query)) {
926     case GST_QUERY_POSITION:
927     {
928       GstFormat format;
929
930       gst_query_parse_position (query, &format, NULL);
931
932       GST_DEBUG_OBJECT (src, "position query in format %s",
933           gst_format_get_name (format));
934
935       switch (format) {
936         case GST_FORMAT_PERCENT:
937         {
938           gint64 percent;
939           gint64 position;
940           gint64 duration;
941
942           GST_OBJECT_LOCK (src);
943           position = src->segment.position;
944           duration = src->segment.duration;
945           GST_OBJECT_UNLOCK (src);
946
947           if (position != -1 && duration != -1) {
948             if (position < duration)
949               percent = gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX, position,
950                   duration);
951             else
952               percent = GST_FORMAT_PERCENT_MAX;
953           } else
954             percent = -1;
955
956           gst_query_set_position (query, GST_FORMAT_PERCENT, percent);
957           res = TRUE;
958           break;
959         }
960         default:
961         {
962           gint64 position;
963           GstFormat seg_format;
964
965           GST_OBJECT_LOCK (src);
966           position =
967               gst_segment_to_stream_time (&src->segment, src->segment.format,
968               src->segment.position);
969           seg_format = src->segment.format;
970           GST_OBJECT_UNLOCK (src);
971
972           if (position != -1) {
973             /* convert to requested format */
974             res =
975                 gst_pad_query_convert (src->srcpad, seg_format,
976                 position, format, &position);
977           } else
978             res = TRUE;
979
980           gst_query_set_position (query, format, position);
981           break;
982         }
983       }
984       break;
985     }
986     case GST_QUERY_DURATION:
987     {
988       GstFormat format;
989
990       gst_query_parse_duration (query, &format, NULL);
991
992       GST_DEBUG_OBJECT (src, "duration query in format %s",
993           gst_format_get_name (format));
994
995       switch (format) {
996         case GST_FORMAT_PERCENT:
997           gst_query_set_duration (query, GST_FORMAT_PERCENT,
998               GST_FORMAT_PERCENT_MAX);
999           res = TRUE;
1000           break;
1001         default:
1002         {
1003           gint64 duration;
1004           GstFormat seg_format;
1005           guint length = 0;
1006
1007           /* may have to refresh duration */
1008           if (g_atomic_int_get (&src->priv->dynamic_size))
1009             gst_base_src_update_length (src, 0, &length);
1010
1011           /* this is the duration as configured by the subclass. */
1012           GST_OBJECT_LOCK (src);
1013           duration = src->segment.duration;
1014           seg_format = src->segment.format;
1015           GST_OBJECT_UNLOCK (src);
1016
1017           GST_LOG_OBJECT (src, "duration %" G_GINT64_FORMAT ", format %s",
1018               duration, gst_format_get_name (seg_format));
1019
1020           if (duration != -1) {
1021             /* convert to requested format, if this fails, we have a duration
1022              * but we cannot answer the query, we must return FALSE. */
1023             res =
1024                 gst_pad_query_convert (src->srcpad, seg_format,
1025                 duration, format, &duration);
1026           } else {
1027             /* The subclass did not configure a duration, we assume that the
1028              * media has an unknown duration then and we return TRUE to report
1029              * this. Note that this is not the same as returning FALSE, which
1030              * means that we cannot report the duration at all. */
1031             res = TRUE;
1032           }
1033           gst_query_set_duration (query, format, duration);
1034           break;
1035         }
1036       }
1037       break;
1038     }
1039
1040     case GST_QUERY_SEEKING:
1041     {
1042       GstFormat format, seg_format;
1043       gint64 duration;
1044
1045       GST_OBJECT_LOCK (src);
1046       duration = src->segment.duration;
1047       seg_format = src->segment.format;
1048       GST_OBJECT_UNLOCK (src);
1049
1050       gst_query_parse_seeking (query, &format, NULL, NULL, NULL);
1051       if (format == seg_format) {
1052         gst_query_set_seeking (query, seg_format,
1053             gst_base_src_seekable (src), 0, duration);
1054         res = TRUE;
1055       } else {
1056         /* FIXME 0.11: return TRUE + seekable=FALSE for SEEKING query here */
1057         /* Don't reply to the query to make up for demuxers which don't
1058          * handle the SEEKING query yet. Players like Totem will fall back
1059          * to the duration when the SEEKING query isn't answered. */
1060         res = FALSE;
1061       }
1062       break;
1063     }
1064     case GST_QUERY_SEGMENT:
1065     {
1066       gint64 start, stop;
1067
1068       GST_OBJECT_LOCK (src);
1069       /* no end segment configured, current duration then */
1070       if ((stop = src->segment.stop) == -1)
1071         stop = src->segment.duration;
1072       start = src->segment.start;
1073
1074       /* adjust to stream time */
1075       if (src->segment.time != -1) {
1076         start -= src->segment.time;
1077         if (stop != -1)
1078           stop -= src->segment.time;
1079       }
1080
1081       gst_query_set_segment (query, src->segment.rate, src->segment.format,
1082           start, stop);
1083       GST_OBJECT_UNLOCK (src);
1084       res = TRUE;
1085       break;
1086     }
1087
1088     case GST_QUERY_FORMATS:
1089     {
1090       gst_query_set_formats (query, 3, GST_FORMAT_DEFAULT,
1091           GST_FORMAT_BYTES, GST_FORMAT_PERCENT);
1092       res = TRUE;
1093       break;
1094     }
1095     case GST_QUERY_CONVERT:
1096     {
1097       GstFormat src_fmt, dest_fmt;
1098       gint64 src_val, dest_val;
1099
1100       gst_query_parse_convert (query, &src_fmt, &src_val, &dest_fmt, &dest_val);
1101
1102       /* we can only convert between equal formats... */
1103       if (src_fmt == dest_fmt) {
1104         dest_val = src_val;
1105         res = TRUE;
1106       } else
1107         res = FALSE;
1108
1109       gst_query_set_convert (query, src_fmt, src_val, dest_fmt, dest_val);
1110       break;
1111     }
1112     case GST_QUERY_LATENCY:
1113     {
1114       GstClockTime min, max;
1115       gboolean live;
1116
1117       /* Subclasses should override and implement something useful */
1118       res = gst_base_src_query_latency (src, &live, &min, &max);
1119
1120       GST_LOG_OBJECT (src, "report latency: live %d, min %" GST_TIME_FORMAT
1121           ", max %" GST_TIME_FORMAT, live, GST_TIME_ARGS (min),
1122           GST_TIME_ARGS (max));
1123
1124       gst_query_set_latency (query, live, min, max);
1125       break;
1126     }
1127     case GST_QUERY_JITTER:
1128     case GST_QUERY_RATE:
1129       res = FALSE;
1130       break;
1131     case GST_QUERY_BUFFERING:
1132     {
1133       GstFormat format, seg_format;
1134       gint64 start, stop, estimated;
1135
1136       gst_query_parse_buffering_range (query, &format, NULL, NULL, NULL);
1137
1138       GST_DEBUG_OBJECT (src, "buffering query in format %s",
1139           gst_format_get_name (format));
1140
1141       GST_OBJECT_LOCK (src);
1142       if (src->random_access) {
1143         estimated = 0;
1144         start = 0;
1145         if (format == GST_FORMAT_PERCENT)
1146           stop = GST_FORMAT_PERCENT_MAX;
1147         else
1148           stop = src->segment.duration;
1149       } else {
1150         estimated = -1;
1151         start = -1;
1152         stop = -1;
1153       }
1154       seg_format = src->segment.format;
1155       GST_OBJECT_UNLOCK (src);
1156
1157       /* convert to required format. When the conversion fails, we can't answer
1158        * the query. When the value is unknown, we can don't perform conversion
1159        * but report TRUE. */
1160       if (format != GST_FORMAT_PERCENT && stop != -1) {
1161         res = gst_pad_query_convert (src->srcpad, seg_format,
1162             stop, format, &stop);
1163       } else {
1164         res = TRUE;
1165       }
1166       if (res && format != GST_FORMAT_PERCENT && start != -1)
1167         res = gst_pad_query_convert (src->srcpad, seg_format,
1168             start, format, &start);
1169
1170       gst_query_set_buffering_range (query, format, start, stop, estimated);
1171       break;
1172     }
1173     case GST_QUERY_SCHEDULING:
1174     {
1175       gboolean random_access;
1176
1177       random_access = gst_base_src_is_random_access (src);
1178
1179       /* we can operate in getrange mode if the native format is bytes
1180        * and we are seekable, this condition is set in the random_access
1181        * flag and is set in the _start() method. */
1182       gst_query_set_scheduling (query, GST_SCHEDULING_FLAG_SEEKABLE, 1, -1, 0);
1183       if (random_access)
1184         gst_query_add_scheduling_mode (query, GST_PAD_MODE_PULL);
1185       gst_query_add_scheduling_mode (query, GST_PAD_MODE_PUSH);
1186
1187       res = TRUE;
1188       break;
1189     }
1190     case GST_QUERY_CAPS:
1191     {
1192       GstBaseSrcClass *bclass;
1193       GstCaps *caps, *filter;
1194
1195       bclass = GST_BASE_SRC_GET_CLASS (src);
1196       if (bclass->get_caps) {
1197         gst_query_parse_caps (query, &filter);
1198         if ((caps = bclass->get_caps (src, filter))) {
1199           gst_query_set_caps_result (query, caps);
1200           gst_caps_unref (caps);
1201           res = TRUE;
1202         } else {
1203           res = FALSE;
1204         }
1205       } else
1206         res = FALSE;
1207       break;
1208     }
1209     default:
1210       res = FALSE;
1211       break;
1212   }
1213   GST_DEBUG_OBJECT (src, "query %s returns %d", GST_QUERY_TYPE_NAME (query),
1214       res);
1215
1216   return res;
1217 }
1218
1219 static gboolean
1220 gst_base_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
1221 {
1222   GstBaseSrc *src;
1223   GstBaseSrcClass *bclass;
1224   gboolean result = FALSE;
1225
1226   src = GST_BASE_SRC (parent);
1227   bclass = GST_BASE_SRC_GET_CLASS (src);
1228
1229   if (bclass->query)
1230     result = bclass->query (src, query);
1231
1232   return result;
1233 }
1234
1235 static gboolean
1236 gst_base_src_default_do_seek (GstBaseSrc * src, GstSegment * segment)
1237 {
1238   gboolean res = TRUE;
1239
1240   /* update our offset if the start/stop position was updated */
1241   if (segment->format == GST_FORMAT_BYTES) {
1242     segment->time = segment->start;
1243   } else if (segment->start == 0) {
1244     /* seek to start, we can implement a default for this. */
1245     segment->time = 0;
1246   } else {
1247     res = FALSE;
1248     GST_INFO_OBJECT (src, "Can't do a default seek");
1249   }
1250
1251   return res;
1252 }
1253
1254 static gboolean
1255 gst_base_src_do_seek (GstBaseSrc * src, GstSegment * segment)
1256 {
1257   GstBaseSrcClass *bclass;
1258   gboolean result = FALSE;
1259
1260   bclass = GST_BASE_SRC_GET_CLASS (src);
1261
1262   if (bclass->do_seek)
1263     result = bclass->do_seek (src, segment);
1264
1265   return result;
1266 }
1267
1268 #define SEEK_TYPE_IS_RELATIVE(t) (((t) != GST_SEEK_TYPE_NONE) && ((t) != GST_SEEK_TYPE_SET))
1269
1270 static gboolean
1271 gst_base_src_default_prepare_seek_segment (GstBaseSrc * src, GstEvent * event,
1272     GstSegment * segment)
1273 {
1274   /* By default, we try one of 2 things:
1275    *   - For absolute seek positions, convert the requested position to our
1276    *     configured processing format and place it in the output segment \
1277    *   - For relative seek positions, convert our current (input) values to the
1278    *     seek format, adjust by the relative seek offset and then convert back to
1279    *     the processing format
1280    */
1281   GstSeekType cur_type, stop_type;
1282   gint64 cur, stop;
1283   GstSeekFlags flags;
1284   GstFormat seek_format, dest_format;
1285   gdouble rate;
1286   gboolean update;
1287   gboolean res = TRUE;
1288
1289   gst_event_parse_seek (event, &rate, &seek_format, &flags,
1290       &cur_type, &cur, &stop_type, &stop);
1291   dest_format = segment->format;
1292
1293   if (seek_format == dest_format) {
1294     gst_segment_do_seek (segment, rate, seek_format, flags,
1295         cur_type, cur, stop_type, stop, &update);
1296     return TRUE;
1297   }
1298
1299   if (cur_type != GST_SEEK_TYPE_NONE) {
1300     /* FIXME: Handle seek_cur & seek_end by converting the input segment vals */
1301     res =
1302         gst_pad_query_convert (src->srcpad, seek_format, cur, dest_format,
1303         &cur);
1304     cur_type = GST_SEEK_TYPE_SET;
1305   }
1306
1307   if (res && stop_type != GST_SEEK_TYPE_NONE) {
1308     /* FIXME: Handle seek_cur & seek_end by converting the input segment vals */
1309     res =
1310         gst_pad_query_convert (src->srcpad, seek_format, stop, dest_format,
1311         &stop);
1312     stop_type = GST_SEEK_TYPE_SET;
1313   }
1314
1315   /* And finally, configure our output segment in the desired format */
1316   gst_segment_do_seek (segment, rate, dest_format, flags, cur_type, cur,
1317       stop_type, stop, &update);
1318
1319   if (!res)
1320     goto no_format;
1321
1322   return res;
1323
1324 no_format:
1325   {
1326     GST_DEBUG_OBJECT (src, "undefined format given, seek aborted.");
1327     return FALSE;
1328   }
1329 }
1330
1331 static gboolean
1332 gst_base_src_prepare_seek_segment (GstBaseSrc * src, GstEvent * event,
1333     GstSegment * seeksegment)
1334 {
1335   GstBaseSrcClass *bclass;
1336   gboolean result = FALSE;
1337
1338   bclass = GST_BASE_SRC_GET_CLASS (src);
1339
1340   if (bclass->prepare_seek_segment)
1341     result = bclass->prepare_seek_segment (src, event, seeksegment);
1342
1343   return result;
1344 }
1345
1346 static GstFlowReturn
1347 gst_base_src_default_alloc (GstBaseSrc * src, guint64 offset,
1348     guint size, GstBuffer ** buffer)
1349 {
1350   GstFlowReturn ret;
1351   GstBaseSrcPrivate *priv = src->priv;
1352
1353   if (priv->pool) {
1354     ret = gst_buffer_pool_acquire_buffer (priv->pool, buffer, NULL);
1355   } else if (size != -1) {
1356     *buffer = gst_buffer_new_allocate (priv->allocator, size, priv->alignment);
1357     if (G_UNLIKELY (*buffer == NULL))
1358       goto alloc_failed;
1359
1360     ret = GST_FLOW_OK;
1361   } else {
1362     GST_WARNING_OBJECT (src, "Not trying to alloc %u bytes. Blocksize not set?",
1363         size);
1364     goto alloc_failed;
1365   }
1366   return ret;
1367
1368   /* ERRORS */
1369 alloc_failed:
1370   {
1371     GST_ERROR_OBJECT (src, "Failed to allocate %u bytes", size);
1372     return GST_FLOW_ERROR;
1373   }
1374 }
1375
1376 static GstFlowReturn
1377 gst_base_src_default_create (GstBaseSrc * src, guint64 offset,
1378     guint size, GstBuffer ** buffer)
1379 {
1380   GstBaseSrcClass *bclass;
1381   GstFlowReturn ret;
1382
1383   bclass = GST_BASE_SRC_GET_CLASS (src);
1384
1385   if (G_UNLIKELY (!bclass->alloc))
1386     goto no_function;
1387   if (G_UNLIKELY (!bclass->fill))
1388     goto no_function;
1389
1390   ret = bclass->alloc (src, offset, size, buffer);
1391   if (G_UNLIKELY (ret != GST_FLOW_OK))
1392     goto alloc_failed;
1393
1394   if (G_LIKELY (size > 0)) {
1395     /* only call fill when there is a size */
1396     ret = bclass->fill (src, offset, size, *buffer);
1397     if (G_UNLIKELY (ret != GST_FLOW_OK))
1398       goto not_ok;
1399   }
1400
1401   return GST_FLOW_OK;
1402
1403   /* ERRORS */
1404 no_function:
1405   {
1406     GST_DEBUG_OBJECT (src, "no fill or alloc function");
1407     return GST_FLOW_NOT_SUPPORTED;
1408   }
1409 alloc_failed:
1410   {
1411     GST_DEBUG_OBJECT (src, "Failed to allocate buffer of %u bytes", size);
1412     return ret;
1413   }
1414 not_ok:
1415   {
1416     GST_DEBUG_OBJECT (src, "fill returned %d (%s)", ret,
1417         gst_flow_get_name (ret));
1418     gst_buffer_unref (*buffer);
1419     return ret;
1420   }
1421 }
1422
1423 /* this code implements the seeking. It is a good example
1424  * handling all cases.
1425  *
1426  * A seek updates the currently configured segment.start
1427  * and segment.stop values based on the SEEK_TYPE. If the
1428  * segment.start value is updated, a seek to this new position
1429  * should be performed.
1430  *
1431  * The seek can only be executed when we are not currently
1432  * streaming any data, to make sure that this is the case, we
1433  * acquire the STREAM_LOCK which is taken when we are in the
1434  * _loop() function or when a getrange() is called. Normally
1435  * we will not receive a seek if we are operating in pull mode
1436  * though. When we operate as a live source we might block on the live
1437  * cond, which does not release the STREAM_LOCK. Therefore we will try
1438  * to grab the LIVE_LOCK instead of the STREAM_LOCK to make sure it is
1439  * safe to perform the seek.
1440  *
1441  * When we are in the loop() function, we might be in the middle
1442  * of pushing a buffer, which might block in a sink. To make sure
1443  * that the push gets unblocked we push out a FLUSH_START event.
1444  * Our loop function will get a WRONG_STATE return value from
1445  * the push and will pause, effectively releasing the STREAM_LOCK.
1446  *
1447  * For a non-flushing seek, we pause the task, which might eventually
1448  * release the STREAM_LOCK. We say eventually because when the sink
1449  * blocks on the sample we might wait a very long time until the sink
1450  * unblocks the sample. In any case we acquire the STREAM_LOCK and
1451  * can continue the seek. A non-flushing seek is normally done in a
1452  * running pipeline to perform seamless playback, this means that the sink is
1453  * PLAYING and will return from its chain function.
1454  * In the case of a non-flushing seek we need to make sure that the
1455  * data we output after the seek is continuous with the previous data,
1456  * this is because a non-flushing seek does not reset the running-time
1457  * to 0. We do this by closing the currently running segment, ie. sending
1458  * a new_segment event with the stop position set to the last processed
1459  * position.
1460  *
1461  * After updating the segment.start/stop values, we prepare for
1462  * streaming again. We push out a FLUSH_STOP to make the peer pad
1463  * accept data again and we start our task again.
1464  *
1465  * A segment seek posts a message on the bus saying that the playback
1466  * of the segment started. We store the segment flag internally because
1467  * when we reach the segment.stop we have to post a segment.done
1468  * instead of EOS when doing a segment seek.
1469  */
1470 /* FIXME (0.11), we have the unlock gboolean here because most current
1471  * implementations (fdsrc, -base/gst/tcp/, ...) unconditionally unlock, even when
1472  * the streaming thread isn't running, resulting in bogus unlocks later when it
1473  * starts. This is fixed by adding unlock_stop, but we should still avoid unlocking
1474  * unnecessarily for backwards compatibility. Ergo, the unlock variable stays
1475  * until 0.11
1476  */
1477 static gboolean
1478 gst_base_src_perform_seek (GstBaseSrc * src, GstEvent * event, gboolean unlock)
1479 {
1480   gboolean res = TRUE, tres;
1481   gdouble rate;
1482   GstFormat seek_format, dest_format;
1483   GstSeekFlags flags;
1484   GstSeekType cur_type, stop_type;
1485   gint64 cur, stop;
1486   gboolean flush, playing;
1487   gboolean update;
1488   gboolean relative_seek = FALSE;
1489   gboolean seekseg_configured = FALSE;
1490   GstSegment seeksegment;
1491   guint32 seqnum;
1492   GstEvent *tevent;
1493
1494   GST_DEBUG_OBJECT (src, "doing seek: %" GST_PTR_FORMAT, event);
1495
1496   GST_OBJECT_LOCK (src);
1497   dest_format = src->segment.format;
1498   GST_OBJECT_UNLOCK (src);
1499
1500   if (event) {
1501     gst_event_parse_seek (event, &rate, &seek_format, &flags,
1502         &cur_type, &cur, &stop_type, &stop);
1503
1504     relative_seek = SEEK_TYPE_IS_RELATIVE (cur_type) ||
1505         SEEK_TYPE_IS_RELATIVE (stop_type);
1506
1507     if (dest_format != seek_format && !relative_seek) {
1508       /* If we have an ABSOLUTE position (SEEK_SET only), we can convert it
1509        * here before taking the stream lock, otherwise we must convert it later,
1510        * once we have the stream lock and can read the last configures segment
1511        * start and stop positions */
1512       gst_segment_init (&seeksegment, dest_format);
1513
1514       if (!gst_base_src_prepare_seek_segment (src, event, &seeksegment))
1515         goto prepare_failed;
1516
1517       seekseg_configured = TRUE;
1518     }
1519
1520     flush = flags & GST_SEEK_FLAG_FLUSH;
1521     seqnum = gst_event_get_seqnum (event);
1522   } else {
1523     flush = FALSE;
1524     /* get next seqnum */
1525     seqnum = gst_util_seqnum_next ();
1526   }
1527
1528   /* send flush start */
1529   if (flush) {
1530     tevent = gst_event_new_flush_start ();
1531     gst_event_set_seqnum (tevent, seqnum);
1532     gst_pad_push_event (src->srcpad, tevent);
1533   } else
1534     gst_pad_pause_task (src->srcpad);
1535
1536   /* unblock streaming thread. */
1537   gst_base_src_set_flushing (src, TRUE, FALSE, unlock, &playing);
1538
1539   /* grab streaming lock, this should eventually be possible, either
1540    * because the task is paused, our streaming thread stopped
1541    * or because our peer is flushing. */
1542   GST_PAD_STREAM_LOCK (src->srcpad);
1543   if (G_UNLIKELY (src->priv->seqnum == seqnum)) {
1544     /* we have seen this event before, issue a warning for now */
1545     GST_WARNING_OBJECT (src, "duplicate event found %" G_GUINT32_FORMAT,
1546         seqnum);
1547   } else {
1548     src->priv->seqnum = seqnum;
1549     GST_DEBUG_OBJECT (src, "seek with seqnum %" G_GUINT32_FORMAT, seqnum);
1550   }
1551
1552   gst_base_src_set_flushing (src, FALSE, playing, unlock, NULL);
1553
1554   /* If we configured the seeksegment above, don't overwrite it now. Otherwise
1555    * copy the current segment info into the temp segment that we can actually
1556    * attempt the seek with. We only update the real segment if the seek succeeds. */
1557   if (!seekseg_configured) {
1558     memcpy (&seeksegment, &src->segment, sizeof (GstSegment));
1559
1560     /* now configure the final seek segment */
1561     if (event) {
1562       if (seeksegment.format != seek_format) {
1563         /* OK, here's where we give the subclass a chance to convert the relative
1564          * seek into an absolute one in the processing format. We set up any
1565          * absolute seek above, before taking the stream lock. */
1566         if (!gst_base_src_prepare_seek_segment (src, event, &seeksegment)) {
1567           GST_DEBUG_OBJECT (src, "Preparing the seek failed after flushing. "
1568               "Aborting seek");
1569           res = FALSE;
1570         }
1571       } else {
1572         /* The seek format matches our processing format, no need to ask the
1573          * the subclass to configure the segment. */
1574         gst_segment_do_seek (&seeksegment, rate, seek_format, flags,
1575             cur_type, cur, stop_type, stop, &update);
1576       }
1577     }
1578     /* Else, no seek event passed, so we're just (re)starting the
1579        current segment. */
1580   }
1581
1582   if (res) {
1583     GST_DEBUG_OBJECT (src, "segment configured from %" G_GINT64_FORMAT
1584         " to %" G_GINT64_FORMAT ", position %" G_GINT64_FORMAT,
1585         seeksegment.start, seeksegment.stop, seeksegment.position);
1586
1587     /* do the seek, segment.position contains the new position. */
1588     res = gst_base_src_do_seek (src, &seeksegment);
1589   }
1590
1591   /* and prepare to continue streaming */
1592   if (flush) {
1593     tevent = gst_event_new_flush_stop (TRUE);
1594     gst_event_set_seqnum (tevent, seqnum);
1595     /* send flush stop, peer will accept data and events again. We
1596      * are not yet providing data as we still have the STREAM_LOCK. */
1597     gst_pad_push_event (src->srcpad, tevent);
1598   }
1599
1600   /* The subclass must have converted the segment to the processing format
1601    * by now */
1602   if (res && seeksegment.format != dest_format) {
1603     GST_DEBUG_OBJECT (src, "Subclass failed to prepare a seek segment "
1604         "in the correct format. Aborting seek.");
1605     res = FALSE;
1606   }
1607
1608   /* if the seek was successful, we update our real segment and push
1609    * out the new segment. */
1610   if (res) {
1611     GST_OBJECT_LOCK (src);
1612     memcpy (&src->segment, &seeksegment, sizeof (GstSegment));
1613     GST_OBJECT_UNLOCK (src);
1614
1615     if (seeksegment.flags & GST_SEEK_FLAG_SEGMENT) {
1616       GstMessage *message;
1617
1618       message = gst_message_new_segment_start (GST_OBJECT (src),
1619           seeksegment.format, seeksegment.position);
1620       gst_message_set_seqnum (message, seqnum);
1621
1622       gst_element_post_message (GST_ELEMENT (src), message);
1623     }
1624
1625     /* for deriving a stop position for the playback segment from the seek
1626      * segment, we must take the duration when the stop is not set */
1627     if ((stop = seeksegment.stop) == -1)
1628       stop = seeksegment.duration;
1629
1630     src->priv->segment_pending = TRUE;
1631   }
1632
1633   src->priv->discont = TRUE;
1634   src->running = TRUE;
1635   /* and restart the task in case it got paused explicitly or by
1636    * the FLUSH_START event we pushed out. */
1637   tres = gst_pad_start_task (src->srcpad, (GstTaskFunction) gst_base_src_loop,
1638       src->srcpad);
1639   if (res && !tres)
1640     res = FALSE;
1641
1642   /* and release the lock again so we can continue streaming */
1643   GST_PAD_STREAM_UNLOCK (src->srcpad);
1644
1645   return res;
1646
1647   /* ERROR */
1648 prepare_failed:
1649   GST_DEBUG_OBJECT (src, "Preparing the seek failed before flushing. "
1650       "Aborting seek");
1651   return FALSE;
1652 }
1653
1654 /* all events send to this element directly. This is mainly done from the
1655  * application.
1656  */
1657 static gboolean
1658 gst_base_src_send_event (GstElement * element, GstEvent * event)
1659 {
1660   GstBaseSrc *src;
1661   gboolean result = FALSE;
1662
1663   src = GST_BASE_SRC (element);
1664
1665   GST_DEBUG_OBJECT (src, "handling event %p %" GST_PTR_FORMAT, event, event);
1666
1667   switch (GST_EVENT_TYPE (event)) {
1668       /* bidirectional events */
1669     case GST_EVENT_FLUSH_START:
1670     case GST_EVENT_FLUSH_STOP:
1671       /* sending random flushes downstream can break stuff,
1672        * especially sync since all segment info will get flushed */
1673       break;
1674
1675       /* downstream serialized events */
1676     case GST_EVENT_EOS:
1677     {
1678       GstBaseSrcClass *bclass;
1679
1680       bclass = GST_BASE_SRC_GET_CLASS (src);
1681
1682       /* queue EOS and make sure the task or pull function performs the EOS
1683        * actions.
1684        *
1685        * We have two possibilities:
1686        *
1687        *  - Before we are to enter the _create function, we check the pending_eos
1688        *    first and do EOS instead of entering it.
1689        *  - If we are in the _create function or we did not manage to set the
1690        *    flag fast enough and we are about to enter the _create function,
1691        *    we unlock it so that we exit with WRONG_STATE immediately. We then
1692        *    check the EOS flag and do the EOS logic.
1693        */
1694       g_atomic_int_set (&src->priv->pending_eos, TRUE);
1695       GST_DEBUG_OBJECT (src, "EOS marked, calling unlock");
1696
1697
1698       /* unlock the _create function so that we can check the pending_eos flag
1699        * and we can do EOS. This will eventually release the LIVE_LOCK again so
1700        * that we can grab it and stop the unlock again. We don't take the stream
1701        * lock so that this operation is guaranteed to never block. */
1702       gst_base_src_activate_pool (src, FALSE);
1703       if (bclass->unlock)
1704         bclass->unlock (src);
1705
1706       GST_DEBUG_OBJECT (src, "unlock called, waiting for LIVE_LOCK");
1707
1708       GST_LIVE_LOCK (src);
1709       GST_DEBUG_OBJECT (src, "LIVE_LOCK acquired, calling unlock_stop");
1710       /* now stop the unlock of the streaming thread again. Grabbing the live
1711        * lock is enough because that protects the create function. */
1712       if (bclass->unlock_stop)
1713         bclass->unlock_stop (src);
1714       gst_base_src_activate_pool (src, TRUE);
1715       GST_LIVE_UNLOCK (src);
1716
1717       result = TRUE;
1718       break;
1719     }
1720     case GST_EVENT_SEGMENT:
1721       /* sending random SEGMENT downstream can break sync. */
1722       break;
1723     case GST_EVENT_TAG:
1724     case GST_EVENT_CUSTOM_DOWNSTREAM:
1725     case GST_EVENT_CUSTOM_BOTH:
1726       /* Insert TAG, CUSTOM_DOWNSTREAM, CUSTOM_BOTH in the dataflow */
1727       GST_OBJECT_LOCK (src);
1728       src->priv->pending_events =
1729           g_list_append (src->priv->pending_events, event);
1730       g_atomic_int_set (&src->priv->have_events, TRUE);
1731       GST_OBJECT_UNLOCK (src);
1732       event = NULL;
1733       result = TRUE;
1734       break;
1735     case GST_EVENT_BUFFERSIZE:
1736       /* does not seem to make much sense currently */
1737       break;
1738
1739       /* upstream events */
1740     case GST_EVENT_QOS:
1741       /* elements should override send_event and do something */
1742       break;
1743     case GST_EVENT_SEEK:
1744     {
1745       gboolean started;
1746
1747       GST_OBJECT_LOCK (src->srcpad);
1748       if (GST_PAD_MODE (src->srcpad) == GST_PAD_MODE_PULL)
1749         goto wrong_mode;
1750       started = GST_PAD_MODE (src->srcpad) == GST_PAD_MODE_PUSH;
1751       GST_OBJECT_UNLOCK (src->srcpad);
1752
1753       if (started) {
1754         GST_DEBUG_OBJECT (src, "performing seek");
1755         /* when we are running in push mode, we can execute the
1756          * seek right now, we need to unlock. */
1757         result = gst_base_src_perform_seek (src, event, TRUE);
1758       } else {
1759         GstEvent **event_p;
1760
1761         /* else we store the event and execute the seek when we
1762          * get activated */
1763         GST_OBJECT_LOCK (src);
1764         GST_DEBUG_OBJECT (src, "queueing seek");
1765         event_p = &src->pending_seek;
1766         gst_event_replace ((GstEvent **) event_p, event);
1767         GST_OBJECT_UNLOCK (src);
1768         /* assume the seek will work */
1769         result = TRUE;
1770       }
1771       break;
1772     }
1773     case GST_EVENT_NAVIGATION:
1774       /* could make sense for elements that do something with navigation events
1775        * but then they would need to override the send_event function */
1776       break;
1777     case GST_EVENT_LATENCY:
1778       /* does not seem to make sense currently */
1779       break;
1780
1781       /* custom events */
1782     case GST_EVENT_CUSTOM_UPSTREAM:
1783       /* override send_event if you want this */
1784       break;
1785     case GST_EVENT_CUSTOM_DOWNSTREAM_OOB:
1786     case GST_EVENT_CUSTOM_BOTH_OOB:
1787       /* insert a random custom event into the pipeline */
1788       GST_DEBUG_OBJECT (src, "pushing custom OOB event downstream");
1789       result = gst_pad_push_event (src->srcpad, event);
1790       /* we gave away the ref to the event in the push */
1791       event = NULL;
1792       break;
1793     default:
1794       break;
1795   }
1796 done:
1797   /* if we still have a ref to the event, unref it now */
1798   if (event)
1799     gst_event_unref (event);
1800
1801   return result;
1802
1803   /* ERRORS */
1804 wrong_mode:
1805   {
1806     GST_DEBUG_OBJECT (src, "cannot perform seek when operating in pull mode");
1807     GST_OBJECT_UNLOCK (src->srcpad);
1808     result = FALSE;
1809     goto done;
1810   }
1811 }
1812
1813 static gboolean
1814 gst_base_src_seekable (GstBaseSrc * src)
1815 {
1816   GstBaseSrcClass *bclass;
1817   bclass = GST_BASE_SRC_GET_CLASS (src);
1818   if (bclass->is_seekable)
1819     return bclass->is_seekable (src);
1820   else
1821     return FALSE;
1822 }
1823
1824 static void
1825 gst_base_src_update_qos (GstBaseSrc * src,
1826     gdouble proportion, GstClockTimeDiff diff, GstClockTime timestamp)
1827 {
1828   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, src,
1829       "qos: proportion: %lf, diff %" G_GINT64_FORMAT ", timestamp %"
1830       GST_TIME_FORMAT, proportion, diff, GST_TIME_ARGS (timestamp));
1831
1832   GST_OBJECT_LOCK (src);
1833   src->priv->proportion = proportion;
1834   src->priv->earliest_time = timestamp + diff;
1835   GST_OBJECT_UNLOCK (src);
1836 }
1837
1838
1839 static gboolean
1840 gst_base_src_default_event (GstBaseSrc * src, GstEvent * event)
1841 {
1842   gboolean result;
1843
1844   GST_DEBUG_OBJECT (src, "handle event %" GST_PTR_FORMAT, event);
1845
1846   switch (GST_EVENT_TYPE (event)) {
1847     case GST_EVENT_SEEK:
1848       /* is normally called when in push mode */
1849       if (!gst_base_src_seekable (src))
1850         goto not_seekable;
1851
1852       result = gst_base_src_perform_seek (src, event, TRUE);
1853       break;
1854     case GST_EVENT_FLUSH_START:
1855       /* cancel any blocking getrange, is normally called
1856        * when in pull mode. */
1857       result = gst_base_src_set_flushing (src, TRUE, FALSE, TRUE, NULL);
1858       break;
1859     case GST_EVENT_FLUSH_STOP:
1860       result = gst_base_src_set_flushing (src, FALSE, TRUE, TRUE, NULL);
1861       break;
1862     case GST_EVENT_QOS:
1863     {
1864       gdouble proportion;
1865       GstClockTimeDiff diff;
1866       GstClockTime timestamp;
1867
1868       gst_event_parse_qos (event, NULL, &proportion, &diff, &timestamp);
1869       gst_base_src_update_qos (src, proportion, diff, timestamp);
1870       result = TRUE;
1871       break;
1872     }
1873     case GST_EVENT_RECONFIGURE:
1874       result = TRUE;
1875       break;
1876     case GST_EVENT_LATENCY:
1877       result = TRUE;
1878       break;
1879     default:
1880       result = FALSE;
1881       break;
1882   }
1883   return result;
1884
1885   /* ERRORS */
1886 not_seekable:
1887   {
1888     GST_DEBUG_OBJECT (src, "is not seekable");
1889     return FALSE;
1890   }
1891 }
1892
1893 static gboolean
1894 gst_base_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
1895 {
1896   GstBaseSrc *src;
1897   GstBaseSrcClass *bclass;
1898   gboolean result = FALSE;
1899
1900   src = GST_BASE_SRC (parent);
1901   bclass = GST_BASE_SRC_GET_CLASS (src);
1902
1903   if (bclass->event) {
1904     if (!(result = bclass->event (src, event)))
1905       goto subclass_failed;
1906   }
1907
1908 done:
1909   gst_event_unref (event);
1910
1911   return result;
1912
1913   /* ERRORS */
1914 subclass_failed:
1915   {
1916     GST_DEBUG_OBJECT (src, "subclass refused event");
1917     goto done;
1918   }
1919 }
1920
1921 static void
1922 gst_base_src_set_property (GObject * object, guint prop_id,
1923     const GValue * value, GParamSpec * pspec)
1924 {
1925   GstBaseSrc *src;
1926
1927   src = GST_BASE_SRC (object);
1928
1929   switch (prop_id) {
1930     case PROP_BLOCKSIZE:
1931       gst_base_src_set_blocksize (src, g_value_get_uint (value));
1932       break;
1933     case PROP_NUM_BUFFERS:
1934       src->num_buffers = g_value_get_int (value);
1935       break;
1936     case PROP_TYPEFIND:
1937       src->typefind = g_value_get_boolean (value);
1938       break;
1939     case PROP_DO_TIMESTAMP:
1940       gst_base_src_set_do_timestamp (src, g_value_get_boolean (value));
1941       break;
1942     default:
1943       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1944       break;
1945   }
1946 }
1947
1948 static void
1949 gst_base_src_get_property (GObject * object, guint prop_id, GValue * value,
1950     GParamSpec * pspec)
1951 {
1952   GstBaseSrc *src;
1953
1954   src = GST_BASE_SRC (object);
1955
1956   switch (prop_id) {
1957     case PROP_BLOCKSIZE:
1958       g_value_set_uint (value, gst_base_src_get_blocksize (src));
1959       break;
1960     case PROP_NUM_BUFFERS:
1961       g_value_set_int (value, src->num_buffers);
1962       break;
1963     case PROP_TYPEFIND:
1964       g_value_set_boolean (value, src->typefind);
1965       break;
1966     case PROP_DO_TIMESTAMP:
1967       g_value_set_boolean (value, gst_base_src_get_do_timestamp (src));
1968       break;
1969     default:
1970       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1971       break;
1972   }
1973 }
1974
1975 /* with STREAM_LOCK and LOCK */
1976 static GstClockReturn
1977 gst_base_src_wait (GstBaseSrc * basesrc, GstClock * clock, GstClockTime time)
1978 {
1979   GstClockReturn ret;
1980   GstClockID id;
1981
1982   id = gst_clock_new_single_shot_id (clock, time);
1983
1984   basesrc->clock_id = id;
1985   /* release the live lock while waiting */
1986   GST_LIVE_UNLOCK (basesrc);
1987
1988   ret = gst_clock_id_wait (id, NULL);
1989
1990   GST_LIVE_LOCK (basesrc);
1991   gst_clock_id_unref (id);
1992   basesrc->clock_id = NULL;
1993
1994   return ret;
1995 }
1996
1997 /* perform synchronisation on a buffer.
1998  * with STREAM_LOCK.
1999  */
2000 static GstClockReturn
2001 gst_base_src_do_sync (GstBaseSrc * basesrc, GstBuffer * buffer)
2002 {
2003   GstClockReturn result;
2004   GstClockTime start, end;
2005   GstBaseSrcClass *bclass;
2006   GstClockTime base_time;
2007   GstClock *clock;
2008   GstClockTime now = GST_CLOCK_TIME_NONE, timestamp;
2009   gboolean do_timestamp, first, pseudo_live;
2010
2011   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2012
2013   start = end = -1;
2014   if (bclass->get_times)
2015     bclass->get_times (basesrc, buffer, &start, &end);
2016
2017   /* get buffer timestamp */
2018   timestamp = GST_BUFFER_TIMESTAMP (buffer);
2019
2020   /* grab the lock to prepare for clocking and calculate the startup
2021    * latency. */
2022   GST_OBJECT_LOCK (basesrc);
2023
2024   /* if we are asked to sync against the clock we are a pseudo live element */
2025   pseudo_live = (start != -1 && basesrc->is_live);
2026   /* check for the first buffer */
2027   first = (basesrc->priv->latency == -1);
2028
2029   if (timestamp != -1 && pseudo_live) {
2030     GstClockTime latency;
2031
2032     /* we have a timestamp and a sync time, latency is the diff */
2033     if (timestamp <= start)
2034       latency = start - timestamp;
2035     else
2036       latency = 0;
2037
2038     if (first) {
2039       GST_DEBUG_OBJECT (basesrc, "pseudo_live with latency %" GST_TIME_FORMAT,
2040           GST_TIME_ARGS (latency));
2041       /* first time we calculate latency, just configure */
2042       basesrc->priv->latency = latency;
2043     } else {
2044       if (basesrc->priv->latency != latency) {
2045         /* we have a new latency, FIXME post latency message */
2046         basesrc->priv->latency = latency;
2047         GST_DEBUG_OBJECT (basesrc, "latency changed to %" GST_TIME_FORMAT,
2048             GST_TIME_ARGS (latency));
2049       }
2050     }
2051   } else if (first) {
2052     GST_DEBUG_OBJECT (basesrc, "no latency needed, live %d, sync %d",
2053         basesrc->is_live, start != -1);
2054     basesrc->priv->latency = 0;
2055   }
2056
2057   /* get clock, if no clock, we can't sync or do timestamps */
2058   if ((clock = GST_ELEMENT_CLOCK (basesrc)) == NULL)
2059     goto no_clock;
2060
2061   base_time = GST_ELEMENT_CAST (basesrc)->base_time;
2062
2063   do_timestamp = basesrc->priv->do_timestamp;
2064
2065   /* first buffer, calculate the timestamp offset */
2066   if (first) {
2067     GstClockTime running_time;
2068
2069     now = gst_clock_get_time (clock);
2070     running_time = now - base_time;
2071
2072     GST_LOG_OBJECT (basesrc,
2073         "startup timestamp: %" GST_TIME_FORMAT ", running_time %"
2074         GST_TIME_FORMAT, GST_TIME_ARGS (timestamp),
2075         GST_TIME_ARGS (running_time));
2076
2077     if (pseudo_live && timestamp != -1) {
2078       /* live source and we need to sync, add startup latency to all timestamps
2079        * to get the real running_time. Live sources should always timestamp
2080        * according to the current running time. */
2081       basesrc->priv->ts_offset = GST_CLOCK_DIFF (timestamp, running_time);
2082
2083       GST_LOG_OBJECT (basesrc, "live with sync, ts_offset %" GST_TIME_FORMAT,
2084           GST_TIME_ARGS (basesrc->priv->ts_offset));
2085     } else {
2086       basesrc->priv->ts_offset = 0;
2087       GST_LOG_OBJECT (basesrc, "no timestamp offset needed");
2088     }
2089
2090     if (!GST_CLOCK_TIME_IS_VALID (timestamp)) {
2091       if (do_timestamp)
2092         timestamp = running_time;
2093       else
2094         timestamp = 0;
2095
2096       GST_BUFFER_TIMESTAMP (buffer) = timestamp;
2097
2098       GST_LOG_OBJECT (basesrc, "created timestamp: %" GST_TIME_FORMAT,
2099           GST_TIME_ARGS (timestamp));
2100     }
2101
2102     /* add the timestamp offset we need for sync */
2103     timestamp += basesrc->priv->ts_offset;
2104   } else {
2105     /* not the first buffer, the timestamp is the diff between the clock and
2106      * base_time */
2107     if (do_timestamp && !GST_CLOCK_TIME_IS_VALID (timestamp)) {
2108       now = gst_clock_get_time (clock);
2109
2110       GST_BUFFER_TIMESTAMP (buffer) = now - base_time;
2111
2112       GST_LOG_OBJECT (basesrc, "created timestamp: %" GST_TIME_FORMAT,
2113           GST_TIME_ARGS (now - base_time));
2114     }
2115   }
2116
2117   /* if we don't have a buffer timestamp, we don't sync */
2118   if (!GST_CLOCK_TIME_IS_VALID (start))
2119     goto no_sync;
2120
2121   if (basesrc->is_live && GST_CLOCK_TIME_IS_VALID (timestamp)) {
2122     /* for pseudo live sources, add our ts_offset to the timestamp */
2123     GST_BUFFER_TIMESTAMP (buffer) += basesrc->priv->ts_offset;
2124     start += basesrc->priv->ts_offset;
2125   }
2126
2127   GST_LOG_OBJECT (basesrc,
2128       "waiting for clock, base time %" GST_TIME_FORMAT
2129       ", stream_start %" GST_TIME_FORMAT,
2130       GST_TIME_ARGS (base_time), GST_TIME_ARGS (start));
2131   GST_OBJECT_UNLOCK (basesrc);
2132
2133   result = gst_base_src_wait (basesrc, clock, start + base_time);
2134
2135   GST_LOG_OBJECT (basesrc, "clock entry done: %d", result);
2136
2137   return result;
2138
2139   /* special cases */
2140 no_clock:
2141   {
2142     GST_DEBUG_OBJECT (basesrc, "we have no clock");
2143     GST_OBJECT_UNLOCK (basesrc);
2144     return GST_CLOCK_OK;
2145   }
2146 no_sync:
2147   {
2148     GST_DEBUG_OBJECT (basesrc, "no sync needed");
2149     GST_OBJECT_UNLOCK (basesrc);
2150     return GST_CLOCK_OK;
2151   }
2152 }
2153
2154 /* Called with STREAM_LOCK and LIVE_LOCK */
2155 static gboolean
2156 gst_base_src_update_length (GstBaseSrc * src, guint64 offset, guint * length)
2157 {
2158   guint64 size, maxsize;
2159   GstBaseSrcClass *bclass;
2160   GstFormat format;
2161   gint64 stop;
2162   gboolean dynamic;
2163
2164   bclass = GST_BASE_SRC_GET_CLASS (src);
2165
2166   format = src->segment.format;
2167   stop = src->segment.stop;
2168   /* get total file size */
2169   size = src->segment.duration;
2170
2171   /* only operate if we are working with bytes */
2172   if (format != GST_FORMAT_BYTES)
2173     return TRUE;
2174
2175   /* the max amount of bytes to read is the total size or
2176    * up to the segment.stop if present. */
2177   if (stop != -1)
2178     maxsize = MIN (size, stop);
2179   else
2180     maxsize = size;
2181
2182   GST_DEBUG_OBJECT (src,
2183       "reading offset %" G_GUINT64_FORMAT ", length %u, size %" G_GINT64_FORMAT
2184       ", segment.stop %" G_GINT64_FORMAT ", maxsize %" G_GINT64_FORMAT, offset,
2185       *length, size, stop, maxsize);
2186
2187   dynamic = g_atomic_int_get (&src->priv->dynamic_size);
2188   GST_DEBUG_OBJECT (src, "dynamic size: %d", dynamic);
2189
2190   /* check size if we have one */
2191   if (maxsize != -1) {
2192     /* if we run past the end, check if the file became bigger and
2193      * retry. */
2194     if (G_UNLIKELY (offset + *length >= maxsize || dynamic)) {
2195       /* see if length of the file changed */
2196       if (bclass->get_size)
2197         if (!bclass->get_size (src, &size))
2198           size = -1;
2199
2200       /* make sure we don't exceed the configured segment stop
2201        * if it was set */
2202       if (stop != -1)
2203         maxsize = MIN (size, stop);
2204       else
2205         maxsize = size;
2206
2207       /* if we are at or past the end, EOS */
2208       if (G_UNLIKELY (offset >= maxsize))
2209         goto unexpected_length;
2210
2211       /* else we can clip to the end */
2212       if (G_UNLIKELY (offset + *length >= maxsize))
2213         *length = maxsize - offset;
2214
2215     }
2216   }
2217
2218   /* keep track of current duration.
2219    * segment is in bytes, we checked that above. */
2220   GST_OBJECT_LOCK (src);
2221   src->segment.duration = size;
2222   GST_OBJECT_UNLOCK (src);
2223
2224   return TRUE;
2225
2226   /* ERRORS */
2227 unexpected_length:
2228   {
2229     return FALSE;
2230   }
2231 }
2232
2233 /* must be called with LIVE_LOCK */
2234 static GstFlowReturn
2235 gst_base_src_get_range (GstBaseSrc * src, guint64 offset, guint length,
2236     GstBuffer ** buf)
2237 {
2238   GstFlowReturn ret;
2239   GstBaseSrcClass *bclass;
2240   GstClockReturn status;
2241
2242   bclass = GST_BASE_SRC_GET_CLASS (src);
2243
2244 again:
2245   if (src->is_live) {
2246     if (G_UNLIKELY (!src->live_running)) {
2247       ret = gst_base_src_wait_playing (src);
2248       if (ret != GST_FLOW_OK)
2249         goto stopped;
2250     }
2251   }
2252
2253   if (G_UNLIKELY (!GST_BASE_SRC_IS_STARTED (src)))
2254     goto not_started;
2255
2256   if (G_UNLIKELY (!bclass->create))
2257     goto no_function;
2258
2259   if (G_UNLIKELY (!gst_base_src_update_length (src, offset, &length)))
2260     goto unexpected_length;
2261
2262   /* track position */
2263   GST_OBJECT_LOCK (src);
2264   if (src->segment.format == GST_FORMAT_BYTES)
2265     src->segment.position = offset;
2266   GST_OBJECT_UNLOCK (src);
2267
2268   /* normally we don't count buffers */
2269   if (G_UNLIKELY (src->num_buffers_left >= 0)) {
2270     if (src->num_buffers_left == 0)
2271       goto reached_num_buffers;
2272     else
2273       src->num_buffers_left--;
2274   }
2275
2276   /* don't enter the create function if a pending EOS event was set. For the
2277    * logic of the pending_eos, check the event function of this class. */
2278   if (G_UNLIKELY (g_atomic_int_get (&src->priv->pending_eos)))
2279     goto eos;
2280
2281   GST_DEBUG_OBJECT (src,
2282       "calling create offset %" G_GUINT64_FORMAT " length %u, time %"
2283       G_GINT64_FORMAT, offset, length, src->segment.time);
2284
2285   ret = bclass->create (src, offset, length, buf);
2286
2287   /* The create function could be unlocked because we have a pending EOS. It's
2288    * possible that we have a valid buffer from create that we need to
2289    * discard when the create function returned _OK. */
2290   if (G_UNLIKELY (g_atomic_int_get (&src->priv->pending_eos))) {
2291     if (ret == GST_FLOW_OK) {
2292       gst_buffer_unref (*buf);
2293       *buf = NULL;
2294     }
2295     goto eos;
2296   }
2297
2298   if (G_UNLIKELY (ret != GST_FLOW_OK))
2299     goto not_ok;
2300
2301   /* no timestamp set and we are at offset 0, we can timestamp with 0 */
2302   if (offset == 0 && src->segment.time == 0
2303       && GST_BUFFER_TIMESTAMP (*buf) == -1 && !src->is_live) {
2304     GST_DEBUG_OBJECT (src, "setting first timestamp to 0");
2305     *buf = gst_buffer_make_writable (*buf);
2306     GST_BUFFER_TIMESTAMP (*buf) = 0;
2307   }
2308
2309   /* now sync before pushing the buffer */
2310   status = gst_base_src_do_sync (src, *buf);
2311
2312   /* waiting for the clock could have made us flushing */
2313   if (G_UNLIKELY (src->priv->flushing))
2314     goto flushing;
2315
2316   switch (status) {
2317     case GST_CLOCK_EARLY:
2318       /* the buffer is too late. We currently don't drop the buffer. */
2319       GST_DEBUG_OBJECT (src, "buffer too late!, returning anyway");
2320       break;
2321     case GST_CLOCK_OK:
2322       /* buffer synchronised properly */
2323       GST_DEBUG_OBJECT (src, "buffer ok");
2324       break;
2325     case GST_CLOCK_UNSCHEDULED:
2326       /* this case is triggered when we were waiting for the clock and
2327        * it got unlocked because we did a state change. In any case, get rid of
2328        * the buffer. */
2329       gst_buffer_unref (*buf);
2330       *buf = NULL;
2331       if (!src->live_running) {
2332         /* We return WRONG_STATE when we are not running to stop the dataflow also
2333          * get rid of the produced buffer. */
2334         GST_DEBUG_OBJECT (src,
2335             "clock was unscheduled (%d), returning WRONG_STATE", status);
2336         ret = GST_FLOW_WRONG_STATE;
2337       } else {
2338         /* If we are running when this happens, we quickly switched between
2339          * pause and playing. We try to produce a new buffer */
2340         GST_DEBUG_OBJECT (src,
2341             "clock was unscheduled (%d), but we are running", status);
2342         goto again;
2343       }
2344       break;
2345     default:
2346       /* all other result values are unexpected and errors */
2347       GST_ELEMENT_ERROR (src, CORE, CLOCK,
2348           (_("Internal clock error.")),
2349           ("clock returned unexpected return value %d", status));
2350       gst_buffer_unref (*buf);
2351       *buf = NULL;
2352       ret = GST_FLOW_ERROR;
2353       break;
2354   }
2355   return ret;
2356
2357   /* ERROR */
2358 stopped:
2359   {
2360     GST_DEBUG_OBJECT (src, "wait_playing returned %d (%s)", ret,
2361         gst_flow_get_name (ret));
2362     return ret;
2363   }
2364 not_ok:
2365   {
2366     GST_DEBUG_OBJECT (src, "create returned %d (%s)", ret,
2367         gst_flow_get_name (ret));
2368     return ret;
2369   }
2370 not_started:
2371   {
2372     GST_DEBUG_OBJECT (src, "getrange but not started");
2373     return GST_FLOW_WRONG_STATE;
2374   }
2375 no_function:
2376   {
2377     GST_DEBUG_OBJECT (src, "no create function");
2378     return GST_FLOW_NOT_SUPPORTED;
2379   }
2380 unexpected_length:
2381   {
2382     GST_DEBUG_OBJECT (src, "unexpected length %u (offset=%" G_GUINT64_FORMAT
2383         ", size=%" G_GINT64_FORMAT ")", length, offset, src->segment.duration);
2384     return GST_FLOW_EOS;
2385   }
2386 reached_num_buffers:
2387   {
2388     GST_DEBUG_OBJECT (src, "sent all buffers");
2389     return GST_FLOW_EOS;
2390   }
2391 flushing:
2392   {
2393     GST_DEBUG_OBJECT (src, "we are flushing");
2394     gst_buffer_unref (*buf);
2395     *buf = NULL;
2396     return GST_FLOW_WRONG_STATE;
2397   }
2398 eos:
2399   {
2400     GST_DEBUG_OBJECT (src, "we are EOS");
2401     return GST_FLOW_EOS;
2402   }
2403 }
2404
2405 static GstFlowReturn
2406 gst_base_src_getrange (GstPad * pad, GstObject * parent, guint64 offset,
2407     guint length, GstBuffer ** buf)
2408 {
2409   GstBaseSrc *src;
2410   GstFlowReturn res;
2411
2412   src = GST_BASE_SRC_CAST (parent);
2413
2414   GST_LIVE_LOCK (src);
2415   if (G_UNLIKELY (src->priv->flushing))
2416     goto flushing;
2417
2418   res = gst_base_src_get_range (src, offset, length, buf);
2419
2420 done:
2421   GST_LIVE_UNLOCK (src);
2422
2423   return res;
2424
2425   /* ERRORS */
2426 flushing:
2427   {
2428     GST_DEBUG_OBJECT (src, "we are flushing");
2429     res = GST_FLOW_WRONG_STATE;
2430     goto done;
2431   }
2432 }
2433
2434 static gboolean
2435 gst_base_src_is_random_access (GstBaseSrc * src)
2436 {
2437   /* we need to start the basesrc to check random access */
2438   if (!GST_BASE_SRC_IS_STARTED (src)) {
2439     GST_LOG_OBJECT (src, "doing start/stop to check get_range support");
2440     if (G_LIKELY (gst_base_src_start (src))) {
2441       if (gst_base_src_start_wait (src) != GST_FLOW_OK)
2442         goto start_failed;
2443       gst_base_src_stop (src);
2444     }
2445   }
2446
2447   return src->random_access;
2448
2449   /* ERRORS */
2450 start_failed:
2451   {
2452     GST_DEBUG_OBJECT (src, "failed to start");
2453     return FALSE;
2454   }
2455 }
2456
2457 static void
2458 gst_base_src_loop (GstPad * pad)
2459 {
2460   GstBaseSrc *src;
2461   GstBuffer *buf = NULL;
2462   GstFlowReturn ret;
2463   gint64 position;
2464   gboolean eos;
2465   guint blocksize;
2466   GList *pending_events = NULL, *tmp;
2467
2468   eos = FALSE;
2469
2470   src = GST_BASE_SRC (GST_OBJECT_PARENT (pad));
2471
2472   gst_base_src_send_stream_start (src);
2473
2474   /* check if we need to renegotiate */
2475   if (gst_pad_check_reconfigure (pad)) {
2476     if (!gst_base_src_negotiate (src))
2477       goto not_negotiated;
2478   }
2479
2480   GST_LIVE_LOCK (src);
2481
2482   if (G_UNLIKELY (src->priv->flushing))
2483     goto flushing;
2484
2485   blocksize = src->blocksize;
2486
2487   /* if we operate in bytes, we can calculate an offset */
2488   if (src->segment.format == GST_FORMAT_BYTES) {
2489     position = src->segment.position;
2490     /* for negative rates, start with subtracting the blocksize */
2491     if (src->segment.rate < 0.0) {
2492       /* we cannot go below segment.start */
2493       if (position > src->segment.start + blocksize)
2494         position -= blocksize;
2495       else {
2496         /* last block, remainder up to segment.start */
2497         blocksize = position - src->segment.start;
2498         position = src->segment.start;
2499       }
2500     }
2501   } else
2502     position = -1;
2503
2504   GST_LOG_OBJECT (src, "next_ts %" GST_TIME_FORMAT " size %u",
2505       GST_TIME_ARGS (position), blocksize);
2506
2507   ret = gst_base_src_get_range (src, position, blocksize, &buf);
2508   if (G_UNLIKELY (ret != GST_FLOW_OK)) {
2509     GST_INFO_OBJECT (src, "pausing after gst_base_src_get_range() = %s",
2510         gst_flow_get_name (ret));
2511     GST_LIVE_UNLOCK (src);
2512     goto pause;
2513   }
2514   /* this should not happen */
2515   if (G_UNLIKELY (buf == NULL))
2516     goto null_buffer;
2517
2518   /* push events to close/start our segment before we push the buffer. */
2519   if (G_UNLIKELY (src->priv->segment_pending)) {
2520     gst_pad_push_event (pad, gst_event_new_segment (&src->segment));
2521     src->priv->segment_pending = FALSE;
2522   }
2523
2524   if (g_atomic_int_get (&src->priv->have_events)) {
2525     GST_OBJECT_LOCK (src);
2526     /* take the events */
2527     pending_events = src->priv->pending_events;
2528     src->priv->pending_events = NULL;
2529     g_atomic_int_set (&src->priv->have_events, FALSE);
2530     GST_OBJECT_UNLOCK (src);
2531   }
2532
2533   /* Push out pending events if any */
2534   if (G_UNLIKELY (pending_events != NULL)) {
2535     for (tmp = pending_events; tmp; tmp = g_list_next (tmp)) {
2536       GstEvent *ev = (GstEvent *) tmp->data;
2537       gst_pad_push_event (pad, ev);
2538     }
2539     g_list_free (pending_events);
2540   }
2541
2542   /* figure out the new position */
2543   switch (src->segment.format) {
2544     case GST_FORMAT_BYTES:
2545     {
2546       guint bufsize = gst_buffer_get_size (buf);
2547
2548       /* we subtracted above for negative rates */
2549       if (src->segment.rate >= 0.0)
2550         position += bufsize;
2551       break;
2552     }
2553     case GST_FORMAT_TIME:
2554     {
2555       GstClockTime start, duration;
2556
2557       start = GST_BUFFER_TIMESTAMP (buf);
2558       duration = GST_BUFFER_DURATION (buf);
2559
2560       if (GST_CLOCK_TIME_IS_VALID (start))
2561         position = start;
2562       else
2563         position = src->segment.position;
2564
2565       if (GST_CLOCK_TIME_IS_VALID (duration)) {
2566         if (src->segment.rate >= 0.0)
2567           position += duration;
2568         else if (position > duration)
2569           position -= duration;
2570         else
2571           position = 0;
2572       }
2573       break;
2574     }
2575     case GST_FORMAT_DEFAULT:
2576       if (src->segment.rate >= 0.0)
2577         position = GST_BUFFER_OFFSET_END (buf);
2578       else
2579         position = GST_BUFFER_OFFSET (buf);
2580       break;
2581     default:
2582       position = -1;
2583       break;
2584   }
2585   if (position != -1) {
2586     if (src->segment.rate >= 0.0) {
2587       /* positive rate, check if we reached the stop */
2588       if (src->segment.stop != -1) {
2589         if (position >= src->segment.stop) {
2590           eos = TRUE;
2591           position = src->segment.stop;
2592         }
2593       }
2594     } else {
2595       /* negative rate, check if we reached the start. start is always set to
2596        * something different from -1 */
2597       if (position <= src->segment.start) {
2598         eos = TRUE;
2599         position = src->segment.start;
2600       }
2601       /* when going reverse, all buffers are DISCONT */
2602       src->priv->discont = TRUE;
2603     }
2604     GST_OBJECT_LOCK (src);
2605     src->segment.position = position;
2606     GST_OBJECT_UNLOCK (src);
2607   }
2608
2609   if (G_UNLIKELY (src->priv->discont)) {
2610     GST_INFO_OBJECT (src, "marking pending DISCONT");
2611     buf = gst_buffer_make_writable (buf);
2612     GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_DISCONT);
2613     src->priv->discont = FALSE;
2614   }
2615   GST_LIVE_UNLOCK (src);
2616
2617   ret = gst_pad_push (pad, buf);
2618   if (G_UNLIKELY (ret != GST_FLOW_OK)) {
2619     GST_INFO_OBJECT (src, "pausing after gst_pad_push() = %s",
2620         gst_flow_get_name (ret));
2621     goto pause;
2622   }
2623
2624   if (G_UNLIKELY (eos)) {
2625     GST_INFO_OBJECT (src, "pausing after end of segment");
2626     ret = GST_FLOW_EOS;
2627     goto pause;
2628   }
2629
2630 done:
2631   return;
2632
2633   /* special cases */
2634 not_negotiated:
2635   {
2636     GST_DEBUG_OBJECT (src, "Failed to renegotiate");
2637     ret = GST_FLOW_NOT_NEGOTIATED;
2638     goto pause;
2639   }
2640 flushing:
2641   {
2642     GST_DEBUG_OBJECT (src, "we are flushing");
2643     GST_LIVE_UNLOCK (src);
2644     ret = GST_FLOW_WRONG_STATE;
2645     goto pause;
2646   }
2647 pause:
2648   {
2649     const gchar *reason = gst_flow_get_name (ret);
2650     GstEvent *event;
2651
2652     GST_DEBUG_OBJECT (src, "pausing task, reason %s", reason);
2653     src->running = FALSE;
2654     gst_pad_pause_task (pad);
2655     if (ret == GST_FLOW_EOS) {
2656       gboolean flag_segment;
2657       GstFormat format;
2658       gint64 position;
2659
2660       /* perform EOS logic */
2661       flag_segment = (src->segment.flags & GST_SEEK_FLAG_SEGMENT) != 0;
2662       format = src->segment.format;
2663       position = src->segment.position;
2664
2665       if (flag_segment) {
2666         GstMessage *message;
2667
2668         message = gst_message_new_segment_done (GST_OBJECT_CAST (src),
2669             format, position);
2670         gst_message_set_seqnum (message, src->priv->seqnum);
2671         gst_element_post_message (GST_ELEMENT_CAST (src), message);
2672       } else {
2673         event = gst_event_new_eos ();
2674         gst_event_set_seqnum (event, src->priv->seqnum);
2675         gst_pad_push_event (pad, event);
2676       }
2677     } else if (ret == GST_FLOW_NOT_LINKED || ret <= GST_FLOW_EOS) {
2678       event = gst_event_new_eos ();
2679       gst_event_set_seqnum (event, src->priv->seqnum);
2680       /* for fatal errors we post an error message, post the error
2681        * first so the app knows about the error first.
2682        * Also don't do this for WRONG_STATE because it happens
2683        * due to flushing and posting an error message because of
2684        * that is the wrong thing to do, e.g. when we're doing
2685        * a flushing seek. */
2686       GST_ELEMENT_ERROR (src, STREAM, FAILED,
2687           (_("Internal data flow error.")),
2688           ("streaming task paused, reason %s (%d)", reason, ret));
2689       gst_pad_push_event (pad, event);
2690     }
2691     goto done;
2692   }
2693 null_buffer:
2694   {
2695     GST_ELEMENT_ERROR (src, STREAM, FAILED,
2696         (_("Internal data flow error.")), ("element returned NULL buffer"));
2697     GST_LIVE_UNLOCK (src);
2698     goto done;
2699   }
2700 }
2701
2702 static gboolean
2703 gst_base_src_set_allocation (GstBaseSrc * basesrc, GstBufferPool * pool,
2704     const GstAllocator * allocator, guint prefix, guint alignment)
2705 {
2706   GstBufferPool *oldpool;
2707   GstBaseSrcPrivate *priv = basesrc->priv;
2708
2709   if (pool) {
2710     GST_DEBUG_OBJECT (basesrc, "activate pool");
2711     if (!gst_buffer_pool_set_active (pool, TRUE))
2712       goto activate_failed;
2713   }
2714
2715   GST_OBJECT_LOCK (basesrc);
2716   oldpool = priv->pool;
2717   priv->pool = pool;
2718
2719   priv->allocator = allocator;
2720
2721   priv->prefix = prefix;
2722   priv->alignment = alignment;
2723   GST_OBJECT_UNLOCK (basesrc);
2724
2725   if (oldpool) {
2726     /* only deactivate if the pool is not the one we're using */
2727     if (oldpool != pool) {
2728       GST_DEBUG_OBJECT (basesrc, "deactivate old pool");
2729       gst_buffer_pool_set_active (oldpool, FALSE);
2730     }
2731     gst_object_unref (oldpool);
2732   }
2733   return TRUE;
2734
2735   /* ERRORS */
2736 activate_failed:
2737   {
2738     GST_ERROR_OBJECT (basesrc, "failed to activate bufferpool.");
2739     return FALSE;
2740   }
2741 }
2742
2743 static gboolean
2744 gst_base_src_activate_pool (GstBaseSrc * basesrc, gboolean active)
2745 {
2746   GstBaseSrcPrivate *priv = basesrc->priv;
2747   GstBufferPool *pool;
2748   gboolean res = TRUE;
2749
2750   GST_OBJECT_LOCK (basesrc);
2751   if ((pool = priv->pool))
2752     pool = gst_object_ref (pool);
2753   GST_OBJECT_UNLOCK (basesrc);
2754
2755   if (pool) {
2756     res = gst_buffer_pool_set_active (pool, active);
2757     gst_object_unref (pool);
2758   }
2759   return res;
2760 }
2761
2762 static gboolean
2763 gst_base_src_prepare_allocation (GstBaseSrc * basesrc, GstCaps * caps)
2764 {
2765   GstBaseSrcClass *bclass;
2766   gboolean result = TRUE;
2767   GstQuery *query;
2768   GstBufferPool *pool = NULL;
2769   const GstAllocator *allocator = NULL;
2770   guint size, min, max, prefix, alignment;
2771
2772   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2773
2774   /* make query and let peer pad answer, we don't really care if it worked or
2775    * not, if it failed, the allocation query would contain defaults and the
2776    * subclass would then set better values if needed */
2777   query = gst_query_new_allocation (caps, TRUE);
2778   if (!gst_pad_peer_query (basesrc->srcpad, query)) {
2779     /* not a problem, just debug a little */
2780     GST_DEBUG_OBJECT (basesrc, "peer ALLOCATION query failed");
2781   }
2782
2783   if (G_LIKELY (bclass->decide_allocation))
2784     result = bclass->decide_allocation (basesrc, query);
2785
2786   GST_DEBUG_OBJECT (basesrc, "ALLOCATION (%d) params: %" GST_PTR_FORMAT, result,
2787       query);
2788   gst_query_parse_allocation_params (query, &size, &min, &max, &prefix,
2789       &alignment, &pool);
2790
2791   if (size == 0) {
2792     const gchar *mem = NULL;
2793
2794     /* no size, we have variable size buffers */
2795     if (gst_query_get_n_allocation_memories (query) > 0) {
2796       mem = gst_query_parse_nth_allocation_memory (query, 0);
2797     }
2798     GST_DEBUG_OBJECT (basesrc, "0 size, getting allocator %s",
2799         GST_STR_NULL (mem));
2800     allocator = gst_allocator_find (mem);
2801   } else if (pool == NULL) {
2802     /* fixed size, we can use a bufferpool */
2803     GstStructure *config;
2804
2805     /* we did not get a pool, make one ourselves then */
2806     pool = gst_buffer_pool_new ();
2807     GST_DEBUG_OBJECT (basesrc, "no pool, making new pool");
2808
2809     config = gst_buffer_pool_get_config (pool);
2810     gst_buffer_pool_config_set (config, caps, size, min, max, prefix,
2811         alignment);
2812     gst_buffer_pool_set_config (pool, config);
2813   }
2814
2815   gst_query_unref (query);
2816
2817   result =
2818       gst_base_src_set_allocation (basesrc, pool, allocator, prefix, alignment);
2819
2820   return result;
2821
2822 }
2823
2824 /* default negotiation code.
2825  *
2826  * Take intersection between src and sink pads, take first
2827  * caps and fixate.
2828  */
2829 static gboolean
2830 gst_base_src_default_negotiate (GstBaseSrc * basesrc)
2831 {
2832   GstCaps *thiscaps;
2833   GstCaps *caps = NULL;
2834   GstCaps *peercaps = NULL;
2835   gboolean result = FALSE;
2836
2837   /* first see what is possible on our source pad */
2838   thiscaps = gst_pad_query_caps (GST_BASE_SRC_PAD (basesrc), NULL);
2839   GST_DEBUG_OBJECT (basesrc, "caps of src: %" GST_PTR_FORMAT, thiscaps);
2840   /* nothing or anything is allowed, we're done */
2841   if (thiscaps == NULL || gst_caps_is_any (thiscaps))
2842     goto no_nego_needed;
2843
2844   if (G_UNLIKELY (gst_caps_is_empty (thiscaps)))
2845     goto no_caps;
2846
2847   /* get the peer caps */
2848   peercaps = gst_pad_peer_query_caps (GST_BASE_SRC_PAD (basesrc), thiscaps);
2849   GST_DEBUG_OBJECT (basesrc, "caps of peer: %" GST_PTR_FORMAT, peercaps);
2850   if (peercaps) {
2851     /* The result is already a subset of our caps */
2852     caps = peercaps;
2853     gst_caps_unref (thiscaps);
2854   } else {
2855     /* no peer, work with our own caps then */
2856     caps = thiscaps;
2857   }
2858   if (caps && !gst_caps_is_empty (caps)) {
2859     /* now fixate */
2860     GST_DEBUG_OBJECT (basesrc, "have caps: %" GST_PTR_FORMAT, caps);
2861     if (gst_caps_is_any (caps)) {
2862       GST_DEBUG_OBJECT (basesrc, "any caps, we stop");
2863       /* hmm, still anything, so element can do anything and
2864        * nego is not needed */
2865       result = TRUE;
2866     } else {
2867       caps = gst_caps_make_writable (caps);
2868       gst_base_src_fixate (basesrc, caps);
2869       GST_DEBUG_OBJECT (basesrc, "fixated to: %" GST_PTR_FORMAT, caps);
2870       if (gst_caps_is_fixed (caps)) {
2871         /* yay, fixed caps, use those then, it's possible that the subclass does
2872          * not accept this caps after all and we have to fail. */
2873         result = gst_base_src_set_caps (basesrc, caps);
2874       }
2875     }
2876     gst_caps_unref (caps);
2877   } else {
2878     if (caps)
2879       gst_caps_unref (caps);
2880     GST_DEBUG_OBJECT (basesrc, "no common caps");
2881   }
2882   return result;
2883
2884 no_nego_needed:
2885   {
2886     GST_DEBUG_OBJECT (basesrc, "no negotiation needed");
2887     if (thiscaps)
2888       gst_caps_unref (thiscaps);
2889     return TRUE;
2890   }
2891 no_caps:
2892   {
2893     GST_ELEMENT_ERROR (basesrc, STREAM, FORMAT,
2894         ("No supported formats found"),
2895         ("This element did not produce valid caps"));
2896     if (thiscaps)
2897       gst_caps_unref (thiscaps);
2898     return TRUE;
2899   }
2900 }
2901
2902 static gboolean
2903 gst_base_src_negotiate (GstBaseSrc * basesrc)
2904 {
2905   GstBaseSrcClass *bclass;
2906   gboolean result;
2907
2908   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2909
2910   GST_DEBUG_OBJECT (basesrc, "starting negotiation");
2911
2912   if (G_LIKELY (bclass->negotiate))
2913     result = bclass->negotiate (basesrc);
2914   else
2915     result = TRUE;
2916
2917   if (G_LIKELY (result)) {
2918     GstCaps *caps;
2919
2920     caps = gst_pad_get_current_caps (basesrc->srcpad);
2921
2922     result = gst_base_src_prepare_allocation (basesrc, caps);
2923
2924     if (caps)
2925       gst_caps_unref (caps);
2926   }
2927   return result;
2928 }
2929
2930 static gboolean
2931 gst_base_src_start (GstBaseSrc * basesrc)
2932 {
2933   GstBaseSrcClass *bclass;
2934   gboolean result;
2935
2936   GST_LIVE_LOCK (basesrc);
2937   if (GST_BASE_SRC_IS_STARTING (basesrc))
2938     goto was_starting;
2939   if (GST_BASE_SRC_IS_STARTED (basesrc))
2940     goto was_started;
2941
2942   basesrc->priv->start_result = GST_FLOW_WRONG_STATE;
2943   GST_OBJECT_FLAG_SET (basesrc, GST_BASE_SRC_FLAG_STARTING);
2944   basesrc->num_buffers_left = basesrc->num_buffers;
2945   basesrc->running = FALSE;
2946   basesrc->priv->segment_pending = FALSE;
2947   GST_OBJECT_LOCK (basesrc);
2948   gst_segment_init (&basesrc->segment, basesrc->segment.format);
2949   GST_OBJECT_UNLOCK (basesrc);
2950   GST_LIVE_UNLOCK (basesrc);
2951
2952   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2953   if (bclass->start)
2954     result = bclass->start (basesrc);
2955   else
2956     result = TRUE;
2957
2958   if (!result)
2959     goto could_not_start;
2960
2961   if (!gst_base_src_is_async (basesrc))
2962     gst_base_src_start_complete (basesrc, GST_FLOW_OK);
2963
2964   return result;
2965
2966   /* ERROR */
2967 was_starting:
2968   {
2969     GST_DEBUG_OBJECT (basesrc, "was starting");
2970     GST_LIVE_UNLOCK (basesrc);
2971     return TRUE;
2972   }
2973 was_started:
2974   {
2975     GST_DEBUG_OBJECT (basesrc, "was started");
2976     GST_LIVE_UNLOCK (basesrc);
2977     return TRUE;
2978   }
2979 could_not_start:
2980   {
2981     GST_DEBUG_OBJECT (basesrc, "could not start");
2982     /* subclass is supposed to post a message. We don't have to call _stop. */
2983     gst_base_src_start_complete (basesrc, GST_FLOW_ERROR);
2984     return FALSE;
2985   }
2986 }
2987
2988 /**
2989  * gst_base_src_start_complete:
2990  * @src: base source instance
2991  * @ret: a #GstFlowReturn
2992  *
2993  * Complete an asynchronous start operation. When the subclass overrides the
2994  * start method, it should call gst_base_src_start_complete() when the start
2995  * operation completes either from the same thread or from an asynchronous
2996  * helper thread.
2997  */
2998 void
2999 gst_base_src_start_complete (GstBaseSrc * basesrc, GstFlowReturn ret)
3000 {
3001   gboolean have_size;
3002   guint64 size;
3003   gboolean seekable;
3004   GstFormat format;
3005   GstPadMode mode;
3006   GstEvent *event;
3007   GstBaseSrcClass *bclass;
3008
3009   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
3010
3011   if (ret != GST_FLOW_OK)
3012     goto error;
3013
3014   GST_DEBUG_OBJECT (basesrc, "starting source");
3015   format = basesrc->segment.format;
3016
3017   /* figure out the size */
3018   have_size = FALSE;
3019   size = -1;
3020   if (format == GST_FORMAT_BYTES) {
3021     bclass = GST_BASE_SRC_GET_CLASS (basesrc);
3022
3023     if (bclass->get_size) {
3024       if (!(have_size = bclass->get_size (basesrc, &size)))
3025         size = -1;
3026     }
3027     GST_DEBUG_OBJECT (basesrc, "setting size %" G_GUINT64_FORMAT, size);
3028     /* only update the size when operating in bytes, subclass is supposed
3029      * to set duration in the start method for other formats */
3030     GST_OBJECT_LOCK (basesrc);
3031     basesrc->segment.duration = size;
3032     GST_OBJECT_UNLOCK (basesrc);
3033   }
3034
3035   GST_DEBUG_OBJECT (basesrc,
3036       "format: %s, have size: %d, size: %" G_GUINT64_FORMAT ", duration: %"
3037       G_GINT64_FORMAT, gst_format_get_name (format), have_size, size,
3038       basesrc->segment.duration);
3039
3040   seekable = gst_base_src_seekable (basesrc);
3041   GST_DEBUG_OBJECT (basesrc, "is seekable: %d", seekable);
3042
3043   /* update for random access flag */
3044   basesrc->random_access = seekable && format == GST_FORMAT_BYTES;
3045
3046   GST_DEBUG_OBJECT (basesrc, "is random_access: %d", basesrc->random_access);
3047
3048   /* stop flushing now but for live sources, still block in the LIVE lock when
3049    * we are not yet PLAYING */
3050   gst_base_src_set_flushing (basesrc, FALSE, FALSE, FALSE, NULL);
3051
3052   GST_OBJECT_LOCK (basesrc->srcpad);
3053   mode = GST_PAD_MODE (basesrc->srcpad);
3054   GST_OBJECT_UNLOCK (basesrc->srcpad);
3055
3056   if (mode == GST_PAD_MODE_PUSH) {
3057     /* do initial seek, which will start the task */
3058     GST_OBJECT_LOCK (basesrc);
3059     event = basesrc->pending_seek;
3060     basesrc->pending_seek = NULL;
3061     GST_OBJECT_UNLOCK (basesrc);
3062
3063     /* no need to unlock anything, the task is certainly
3064      * not running here. The perform seek code will start the task when
3065      * finished. */
3066     if (G_UNLIKELY (!gst_base_src_perform_seek (basesrc, event, FALSE)))
3067       goto seek_failed;
3068
3069     if (event)
3070       gst_event_unref (event);
3071   } else {
3072     /* if not random_access, we cannot operate in pull mode for now */
3073     if (G_UNLIKELY (!basesrc->random_access))
3074       goto no_get_range;
3075   }
3076
3077   gst_pad_mark_reconfigure (GST_BASE_SRC_PAD (basesrc));
3078
3079   GST_LIVE_LOCK (basesrc);
3080   GST_OBJECT_FLAG_SET (basesrc, GST_BASE_SRC_FLAG_STARTED);
3081   GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTING);
3082   basesrc->priv->start_result = ret;
3083   GST_LIVE_SIGNAL (basesrc);
3084   GST_LIVE_UNLOCK (basesrc);
3085
3086   return;
3087
3088 seek_failed:
3089   {
3090     GST_ERROR_OBJECT (basesrc, "Failed to perform initial seek");
3091     gst_base_src_set_flushing (basesrc, TRUE, FALSE, TRUE, NULL);
3092     if (event)
3093       gst_event_unref (event);
3094     ret = GST_FLOW_ERROR;
3095     goto error;
3096   }
3097 no_get_range:
3098   {
3099     gst_base_src_set_flushing (basesrc, TRUE, FALSE, TRUE, NULL);
3100     GST_ERROR_OBJECT (basesrc, "Cannot operate in pull mode, stopping");
3101     ret = GST_FLOW_ERROR;
3102     goto error;
3103   }
3104 error:
3105   {
3106     GST_LIVE_LOCK (basesrc);
3107     basesrc->priv->start_result = ret;
3108     GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTING);
3109     GST_LIVE_SIGNAL (basesrc);
3110     GST_LIVE_UNLOCK (basesrc);
3111     return;
3112   }
3113 }
3114
3115 /**
3116  * gst_base_src_start_complete:
3117  * @src: base source instance
3118  * @ret: a #GstFlowReturn
3119  *
3120  * Wait until the start operation completes.
3121  *
3122  * Returns: a #GstFlowReturn.
3123  */
3124 GstFlowReturn
3125 gst_base_src_start_wait (GstBaseSrc * basesrc)
3126 {
3127   GstFlowReturn result;
3128
3129   GST_LIVE_LOCK (basesrc);
3130   if (G_UNLIKELY (basesrc->priv->flushing))
3131     goto flushing;
3132
3133   while (GST_BASE_SRC_IS_STARTING (basesrc)) {
3134     GST_LIVE_WAIT (basesrc);
3135     if (G_UNLIKELY (basesrc->priv->flushing))
3136       goto flushing;
3137   }
3138   result = basesrc->priv->start_result;
3139   GST_LIVE_UNLOCK (basesrc);
3140
3141   return result;
3142
3143   /* ERRORS */
3144 flushing:
3145   {
3146     GST_DEBUG_OBJECT (basesrc, "we are flushing");
3147     GST_LIVE_UNLOCK (basesrc);
3148     return GST_FLOW_WRONG_STATE;
3149   }
3150 }
3151
3152 static gboolean
3153 gst_base_src_stop (GstBaseSrc * basesrc)
3154 {
3155   GstBaseSrcClass *bclass;
3156   gboolean result = TRUE;
3157
3158   GST_DEBUG_OBJECT (basesrc, "stopping source");
3159
3160   /* flush all */
3161   gst_base_src_set_flushing (basesrc, TRUE, FALSE, TRUE, NULL);
3162   /* stop the task */
3163   gst_pad_stop_task (basesrc->srcpad);
3164
3165   GST_LIVE_LOCK (basesrc);
3166   if (!GST_BASE_SRC_IS_STARTED (basesrc) && !GST_BASE_SRC_IS_STARTING (basesrc))
3167     goto was_stopped;
3168
3169   GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTING);
3170   GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTED);
3171   basesrc->priv->start_result = GST_FLOW_WRONG_STATE;
3172   GST_LIVE_SIGNAL (basesrc);
3173   GST_LIVE_UNLOCK (basesrc);
3174
3175   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
3176   if (bclass->stop)
3177     result = bclass->stop (basesrc);
3178
3179   gst_base_src_set_allocation (basesrc, NULL, NULL, 0, 0);
3180
3181   return result;
3182
3183 was_stopped:
3184   {
3185     GST_DEBUG_OBJECT (basesrc, "was started");
3186     GST_LIVE_UNLOCK (basesrc);
3187     return TRUE;
3188   }
3189 }
3190
3191 /* start or stop flushing dataprocessing
3192  */
3193 static gboolean
3194 gst_base_src_set_flushing (GstBaseSrc * basesrc,
3195     gboolean flushing, gboolean live_play, gboolean unlock, gboolean * playing)
3196 {
3197   GstBaseSrcClass *bclass;
3198
3199   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
3200
3201   if (flushing && unlock) {
3202     gst_base_src_activate_pool (basesrc, FALSE);
3203     /* unlock any subclasses, we need to do this before grabbing the
3204      * LIVE_LOCK since we hold this lock before going into ::create. We pass an
3205      * unlock to the params because of backwards compat (see seek handler)*/
3206     if (bclass->unlock)
3207       bclass->unlock (basesrc);
3208   }
3209
3210   /* the live lock is released when we are blocked, waiting for playing or
3211    * when we sync to the clock. */
3212   GST_LIVE_LOCK (basesrc);
3213   if (playing)
3214     *playing = basesrc->live_running;
3215   basesrc->priv->flushing = flushing;
3216   if (flushing) {
3217     /* if we are locked in the live lock, signal it to make it flush */
3218     basesrc->live_running = TRUE;
3219
3220     /* clear pending EOS if any */
3221     g_atomic_int_set (&basesrc->priv->pending_eos, FALSE);
3222
3223     /* step 1, now that we have the LIVE lock, clear our unlock request */
3224     if (bclass->unlock_stop)
3225       bclass->unlock_stop (basesrc);
3226
3227     /* step 2, unblock clock sync (if any) or any other blocking thing */
3228     if (basesrc->clock_id)
3229       gst_clock_id_unschedule (basesrc->clock_id);
3230   } else {
3231     /* signal the live source that it can start playing */
3232     basesrc->live_running = live_play;
3233
3234     gst_base_src_activate_pool (basesrc, TRUE);
3235
3236     /* When unlocking drop all delayed events */
3237     if (unlock) {
3238       GST_OBJECT_LOCK (basesrc);
3239       if (basesrc->priv->pending_events) {
3240         g_list_foreach (basesrc->priv->pending_events, (GFunc) gst_event_unref,
3241             NULL);
3242         g_list_free (basesrc->priv->pending_events);
3243         basesrc->priv->pending_events = NULL;
3244         g_atomic_int_set (&basesrc->priv->have_events, FALSE);
3245       }
3246       GST_OBJECT_UNLOCK (basesrc);
3247     }
3248   }
3249   GST_LIVE_SIGNAL (basesrc);
3250   GST_LIVE_UNLOCK (basesrc);
3251
3252   return TRUE;
3253 }
3254
3255 /* the purpose of this function is to make sure that a live source blocks in the
3256  * LIVE lock or leaves the LIVE lock and continues playing. */
3257 static gboolean
3258 gst_base_src_set_playing (GstBaseSrc * basesrc, gboolean live_play)
3259 {
3260   GstBaseSrcClass *bclass;
3261
3262   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
3263
3264   /* unlock subclasses locked in ::create, we only do this when we stop playing. */
3265   if (!live_play) {
3266     GST_DEBUG_OBJECT (basesrc, "unlock");
3267     if (bclass->unlock)
3268       bclass->unlock (basesrc);
3269   }
3270
3271   /* we are now able to grab the LIVE lock, when we get it, we can be
3272    * waiting for PLAYING while blocked in the LIVE cond or we can be waiting
3273    * for the clock. */
3274   GST_LIVE_LOCK (basesrc);
3275   GST_DEBUG_OBJECT (basesrc, "unschedule clock");
3276
3277   /* unblock clock sync (if any) */
3278   if (basesrc->clock_id)
3279     gst_clock_id_unschedule (basesrc->clock_id);
3280
3281   /* configure what to do when we get to the LIVE lock. */
3282   GST_DEBUG_OBJECT (basesrc, "live running %d", live_play);
3283   basesrc->live_running = live_play;
3284
3285   if (live_play) {
3286     gboolean start;
3287
3288     /* clear our unlock request when going to PLAYING */
3289     GST_DEBUG_OBJECT (basesrc, "unlock stop");
3290     if (bclass->unlock_stop)
3291       bclass->unlock_stop (basesrc);
3292
3293     /* for live sources we restart the timestamp correction */
3294     basesrc->priv->latency = -1;
3295     /* have to restart the task in case it stopped because of the unlock when
3296      * we went to PAUSED. Only do this if we operating in push mode. */
3297     GST_OBJECT_LOCK (basesrc->srcpad);
3298     start = (GST_PAD_MODE (basesrc->srcpad) == GST_PAD_MODE_PUSH);
3299     GST_OBJECT_UNLOCK (basesrc->srcpad);
3300     if (start)
3301       gst_pad_start_task (basesrc->srcpad, (GstTaskFunction) gst_base_src_loop,
3302           basesrc->srcpad);
3303     GST_DEBUG_OBJECT (basesrc, "signal");
3304     GST_LIVE_SIGNAL (basesrc);
3305   }
3306   GST_LIVE_UNLOCK (basesrc);
3307
3308   return TRUE;
3309 }
3310
3311 static gboolean
3312 gst_base_src_activate_push (GstPad * pad, GstObject * parent, gboolean active)
3313 {
3314   GstBaseSrc *basesrc;
3315
3316   basesrc = GST_BASE_SRC (parent);
3317
3318   /* prepare subclass first */
3319   if (active) {
3320     GST_DEBUG_OBJECT (basesrc, "Activating in push mode");
3321
3322     if (G_UNLIKELY (!basesrc->can_activate_push))
3323       goto no_push_activation;
3324
3325     if (G_UNLIKELY (!gst_base_src_start (basesrc)))
3326       goto error_start;
3327   } else {
3328     GST_DEBUG_OBJECT (basesrc, "Deactivating in push mode");
3329     /* now we can stop the source */
3330     if (G_UNLIKELY (!gst_base_src_stop (basesrc)))
3331       goto error_stop;
3332   }
3333   return TRUE;
3334
3335   /* ERRORS */
3336 no_push_activation:
3337   {
3338     GST_WARNING_OBJECT (basesrc, "Subclass disabled push-mode activation");
3339     return FALSE;
3340   }
3341 error_start:
3342   {
3343     GST_WARNING_OBJECT (basesrc, "Failed to start in push mode");
3344     return FALSE;
3345   }
3346 error_stop:
3347   {
3348     GST_DEBUG_OBJECT (basesrc, "Failed to stop in push mode");
3349     return FALSE;
3350   }
3351 }
3352
3353 static gboolean
3354 gst_base_src_activate_pull (GstPad * pad, GstObject * parent, gboolean active)
3355 {
3356   GstBaseSrc *basesrc;
3357
3358   basesrc = GST_BASE_SRC (parent);
3359
3360   /* prepare subclass first */
3361   if (active) {
3362     GST_DEBUG_OBJECT (basesrc, "Activating in pull mode");
3363     if (G_UNLIKELY (!gst_base_src_start (basesrc)))
3364       goto error_start;
3365   } else {
3366     GST_DEBUG_OBJECT (basesrc, "Deactivating in pull mode");
3367     if (G_UNLIKELY (!gst_base_src_stop (basesrc)))
3368       goto error_stop;
3369   }
3370   return TRUE;
3371
3372   /* ERRORS */
3373 error_start:
3374   {
3375     GST_ERROR_OBJECT (basesrc, "Failed to start in pull mode");
3376     return FALSE;
3377   }
3378 error_stop:
3379   {
3380     GST_ERROR_OBJECT (basesrc, "Failed to stop in pull mode");
3381     return FALSE;
3382   }
3383 }
3384
3385 static gboolean
3386 gst_base_src_activate_mode (GstPad * pad, GstObject * parent,
3387     GstPadMode mode, gboolean active)
3388 {
3389   gboolean res;
3390
3391   switch (mode) {
3392     case GST_PAD_MODE_PULL:
3393       res = gst_base_src_activate_pull (pad, parent, active);
3394       break;
3395     case GST_PAD_MODE_PUSH:
3396       res = gst_base_src_activate_push (pad, parent, active);
3397       break;
3398     default:
3399       GST_LOG_OBJECT (pad, "unknown activation mode %d", mode);
3400       res = FALSE;
3401       break;
3402   }
3403   return res;
3404 }
3405
3406
3407 static GstStateChangeReturn
3408 gst_base_src_change_state (GstElement * element, GstStateChange transition)
3409 {
3410   GstBaseSrc *basesrc;
3411   GstStateChangeReturn result;
3412   gboolean no_preroll = FALSE;
3413
3414   basesrc = GST_BASE_SRC (element);
3415
3416   switch (transition) {
3417     case GST_STATE_CHANGE_NULL_TO_READY:
3418       break;
3419     case GST_STATE_CHANGE_READY_TO_PAUSED:
3420       basesrc->priv->stream_start_pending = TRUE;
3421       no_preroll = gst_base_src_is_live (basesrc);
3422       break;
3423     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
3424       GST_DEBUG_OBJECT (basesrc, "PAUSED->PLAYING");
3425       if (gst_base_src_is_live (basesrc)) {
3426         /* now we can start playback */
3427         gst_base_src_set_playing (basesrc, TRUE);
3428       }
3429       break;
3430     default:
3431       break;
3432   }
3433
3434   if ((result =
3435           GST_ELEMENT_CLASS (parent_class)->change_state (element,
3436               transition)) == GST_STATE_CHANGE_FAILURE)
3437     goto failure;
3438
3439   switch (transition) {
3440     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
3441       GST_DEBUG_OBJECT (basesrc, "PLAYING->PAUSED");
3442       if (gst_base_src_is_live (basesrc)) {
3443         /* make sure we block in the live lock in PAUSED */
3444         gst_base_src_set_playing (basesrc, FALSE);
3445         no_preroll = TRUE;
3446       }
3447       break;
3448     case GST_STATE_CHANGE_PAUSED_TO_READY:
3449     {
3450       /* we don't need to unblock anything here, the pad deactivation code
3451        * already did this */
3452       g_atomic_int_set (&basesrc->priv->pending_eos, FALSE);
3453       gst_event_replace (&basesrc->pending_seek, NULL);
3454       basesrc->priv->stream_start_pending = FALSE;
3455       break;
3456     }
3457     case GST_STATE_CHANGE_READY_TO_NULL:
3458       break;
3459     default:
3460       break;
3461   }
3462
3463   if (no_preroll && result == GST_STATE_CHANGE_SUCCESS)
3464     result = GST_STATE_CHANGE_NO_PREROLL;
3465
3466   return result;
3467
3468   /* ERRORS */
3469 failure:
3470   {
3471     GST_DEBUG_OBJECT (basesrc, "parent failed state change");
3472     return result;
3473   }
3474 }