Merge branch 'master' into 0.11
[platform/upstream/gstreamer.git] / libs / gst / base / gstbasesrc.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *               2000,2005 Wim Taymans <wim@fluendo.com>
4  *
5  * gstbasesrc.c:
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20  * Boston, MA 02111-1307, USA.
21  */
22
23 /**
24  * SECTION:gstbasesrc
25  * @short_description: Base class for getrange based source elements
26  * @see_also: #GstPushSrc, #GstBaseTransform, #GstBaseSink
27  *
28  * This is a generice base class for source elements. The following
29  * types of sources are supported:
30  * <itemizedlist>
31  *   <listitem><para>random access sources like files</para></listitem>
32  *   <listitem><para>seekable sources</para></listitem>
33  *   <listitem><para>live sources</para></listitem>
34  * </itemizedlist>
35  *
36  * The source can be configured to operate in any #GstFormat with the
37  * gst_base_src_set_format() method. The currently set format determines
38  * the format of the internal #GstSegment and any #GST_EVENT_NEWSEGMENT
39  * events. The default format for #GstBaseSrc is #GST_FORMAT_BYTES.
40  *
41  * #GstBaseSrc always supports push mode scheduling. If the following
42  * conditions are met, it also supports pull mode scheduling:
43  * <itemizedlist>
44  *   <listitem><para>The format is set to #GST_FORMAT_BYTES (default).</para>
45  *   </listitem>
46  *   <listitem><para>#GstBaseSrcClass.is_seekable() returns %TRUE.</para>
47  *   </listitem>
48  * </itemizedlist>
49  *
50  * Since 0.10.9, any #GstBaseSrc can enable pull based scheduling at any time
51  * by overriding #GstBaseSrcClass.check_get_range() so that it returns %TRUE.
52  *
53  * If all the conditions are met for operating in pull mode, #GstBaseSrc is
54  * automatically seekable in push mode as well. The following conditions must
55  * be met to make the element seekable in push mode when the format is not
56  * #GST_FORMAT_BYTES:
57  * <itemizedlist>
58  *   <listitem><para>
59  *     #GstBaseSrcClass.is_seekable() returns %TRUE.
60  *   </para></listitem>
61  *   <listitem><para>
62  *     #GstBaseSrcClass.query() can convert all supported seek formats to the
63  *     internal format as set with gst_base_src_set_format().
64  *   </para></listitem>
65  *   <listitem><para>
66  *     #GstBaseSrcClass.do_seek() is implemented, performs the seek and returns
67  *      %TRUE.
68  *   </para></listitem>
69  * </itemizedlist>
70  *
71  * When the element does not meet the requirements to operate in pull mode, the
72  * offset and length in the #GstBaseSrcClass.create() method should be ignored.
73  * It is recommended to subclass #GstPushSrc instead, in this situation. If the
74  * element can operate in pull mode but only with specific offsets and
75  * lengths, it is allowed to generate an error when the wrong values are passed
76  * to the #GstBaseSrcClass.create() function.
77  *
78  * #GstBaseSrc has support for live sources. Live sources are sources that when
79  * paused discard data, such as audio or video capture devices. A typical live
80  * source also produces data at a fixed rate and thus provides a clock to publish
81  * this rate.
82  * Use gst_base_src_set_live() to activate the live source mode.
83  *
84  * A live source does not produce data in the PAUSED state. This means that the
85  * #GstBaseSrcClass.create() method will not be called in PAUSED but only in
86  * PLAYING. To signal the pipeline that the element will not produce data, the
87  * return value from the READY to PAUSED state will be
88  * #GST_STATE_CHANGE_NO_PREROLL.
89  *
90  * A typical live source will timestamp the buffers it creates with the
91  * current running time of the pipeline. This is one reason why a live source
92  * can only produce data in the PLAYING state, when the clock is actually
93  * distributed and running.
94  *
95  * Live sources that synchronize and block on the clock (an audio source, for
96  * example) can since 0.10.12 use gst_base_src_wait_playing() when the
97  * #GstBaseSrcClass.create() function was interrupted by a state change to
98  * PAUSED.
99  *
100  * The #GstBaseSrcClass.get_times() method can be used to implement pseudo-live
101  * sources. It only makes sense to implement the #GstBaseSrcClass.get_times()
102  * function if the source is a live source. The #GstBaseSrcClass.get_times()
103  * function should return timestamps starting from 0, as if it were a non-live
104  * source. The base class will make sure that the timestamps are transformed
105  * into the current running_time. The base source will then wait for the
106  * calculated running_time before pushing out the buffer.
107  *
108  * For live sources, the base class will by default report a latency of 0.
109  * For pseudo live sources, the base class will by default measure the difference
110  * between the first buffer timestamp and the start time of get_times and will
111  * report this value as the latency.
112  * Subclasses should override the query function when this behaviour is not
113  * acceptable.
114  *
115  * There is only support in #GstBaseSrc for exactly one source pad, which
116  * should be named "src". A source implementation (subclass of #GstBaseSrc)
117  * should install a pad template in its class_init function, like so:
118  * |[
119  * static void
120  * my_element_class_init (GstMyElementClass *klass)
121  * {
122  *   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
123  *   // srctemplate should be a #GstStaticPadTemplate with direction
124  *   // #GST_PAD_SRC and name "src"
125  *   gst_element_class_add_pad_template (gstelement_class,
126  *       gst_static_pad_template_get (&amp;srctemplate));
127  *   // see #GstElementDetails
128  *   gst_element_class_set_details (gstelement_class, &amp;details);
129  * }
130  * ]|
131  *
132  * <refsect2>
133  * <title>Controlled shutdown of live sources in applications</title>
134  * <para>
135  * Applications that record from a live source may want to stop recording
136  * in a controlled way, so that the recording is stopped, but the data
137  * already in the pipeline is processed to the end (remember that many live
138  * sources would go on recording forever otherwise). For that to happen the
139  * application needs to make the source stop recording and send an EOS
140  * event down the pipeline. The application would then wait for an
141  * EOS message posted on the pipeline's bus to know when all data has
142  * been processed and the pipeline can safely be stopped.
143  *
144  * Since GStreamer 0.10.16 an application may send an EOS event to a source
145  * element to make it perform the EOS logic (send EOS event downstream or post a
146  * #GST_MESSAGE_SEGMENT_DONE on the bus). This can typically be done
147  * with the gst_element_send_event() function on the element or its parent bin.
148  *
149  * After the EOS has been sent to the element, the application should wait for
150  * an EOS message to be posted on the pipeline's bus. Once this EOS message is
151  * received, it may safely shut down the entire pipeline.
152  *
153  * The old behaviour for controlled shutdown introduced since GStreamer 0.10.3
154  * is still available but deprecated as it is dangerous and less flexible.
155  *
156  * Last reviewed on 2007-12-19 (0.10.16)
157  * </para>
158  * </refsect2>
159  */
160
161 #ifdef HAVE_CONFIG_H
162 #  include "config.h"
163 #endif
164
165 #include <stdlib.h>
166 #include <string.h>
167
168 #include <gst/gst_private.h>
169
170 #include "gstbasesrc.h"
171 #include "gsttypefindhelper.h"
172 #include <gst/gstmarshal.h>
173 #include <gst/gst-i18n-lib.h>
174
175 GST_DEBUG_CATEGORY_STATIC (gst_base_src_debug);
176 #define GST_CAT_DEFAULT gst_base_src_debug
177
178 #define GST_LIVE_GET_LOCK(elem)               (GST_BASE_SRC_CAST(elem)->live_lock)
179 #define GST_LIVE_LOCK(elem)                   g_mutex_lock(GST_LIVE_GET_LOCK(elem))
180 #define GST_LIVE_TRYLOCK(elem)                g_mutex_trylock(GST_LIVE_GET_LOCK(elem))
181 #define GST_LIVE_UNLOCK(elem)                 g_mutex_unlock(GST_LIVE_GET_LOCK(elem))
182 #define GST_LIVE_GET_COND(elem)               (GST_BASE_SRC_CAST(elem)->live_cond)
183 #define GST_LIVE_WAIT(elem)                   g_cond_wait (GST_LIVE_GET_COND (elem), GST_LIVE_GET_LOCK (elem))
184 #define GST_LIVE_TIMED_WAIT(elem, timeval)    g_cond_timed_wait (GST_LIVE_GET_COND (elem), GST_LIVE_GET_LOCK (elem),\
185                                                                                 timeval)
186 #define GST_LIVE_SIGNAL(elem)                 g_cond_signal (GST_LIVE_GET_COND (elem));
187 #define GST_LIVE_BROADCAST(elem)              g_cond_broadcast (GST_LIVE_GET_COND (elem));
188
189 /* BaseSrc signals and args */
190 enum
191 {
192   /* FILL ME */
193   LAST_SIGNAL
194 };
195
196 #define DEFAULT_BLOCKSIZE       4096
197 #define DEFAULT_NUM_BUFFERS     -1
198 #define DEFAULT_TYPEFIND        FALSE
199 #define DEFAULT_DO_TIMESTAMP    FALSE
200
201 enum
202 {
203   PROP_0,
204   PROP_BLOCKSIZE,
205   PROP_NUM_BUFFERS,
206   PROP_TYPEFIND,
207   PROP_DO_TIMESTAMP
208 };
209
210 #define GST_BASE_SRC_GET_PRIVATE(obj)  \
211    (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_BASE_SRC, GstBaseSrcPrivate))
212
213 struct _GstBaseSrcPrivate
214 {
215   gboolean last_sent_eos;       /* last thing we did was send an EOS (we set this
216                                  * to avoid the sending of two EOS in some cases) */
217   gboolean discont;
218   gboolean flushing;
219
220   /* if segment should be sent */
221   gboolean segment_pending;
222
223   /* if EOS is pending (atomic) */
224   gint pending_eos;
225
226   /* startup latency is the time it takes between going to PLAYING and producing
227    * the first BUFFER with running_time 0. This value is included in the latency
228    * reporting. */
229   GstClockTime latency;
230   /* timestamp offset, this is the offset add to the values of gst_times for
231    * pseudo live sources */
232   GstClockTimeDiff ts_offset;
233
234   gboolean do_timestamp;
235
236   /* stream sequence number */
237   guint32 seqnum;
238
239   /* pending events (TAG, CUSTOM_BOTH, CUSTOM_DOWNSTREAM) to be
240    * pushed in the data stream */
241   GList *pending_events;
242   volatile gint have_events;
243
244   /* QoS *//* with LOCK */
245   gboolean qos_enabled;
246   gdouble proportion;
247   GstClockTime earliest_time;
248 };
249
250 static GstElementClass *parent_class = NULL;
251
252 static void gst_base_src_class_init (GstBaseSrcClass * klass);
253 static void gst_base_src_init (GstBaseSrc * src, gpointer g_class);
254 static void gst_base_src_finalize (GObject * object);
255
256
257 GType
258 gst_base_src_get_type (void)
259 {
260   static volatile gsize base_src_type = 0;
261
262   if (g_once_init_enter (&base_src_type)) {
263     GType _type;
264     static const GTypeInfo base_src_info = {
265       sizeof (GstBaseSrcClass),
266       NULL,
267       NULL,
268       (GClassInitFunc) gst_base_src_class_init,
269       NULL,
270       NULL,
271       sizeof (GstBaseSrc),
272       0,
273       (GInstanceInitFunc) gst_base_src_init,
274     };
275
276     _type = g_type_register_static (GST_TYPE_ELEMENT,
277         "GstBaseSrc", &base_src_info, G_TYPE_FLAG_ABSTRACT);
278     g_once_init_leave (&base_src_type, _type);
279   }
280   return base_src_type;
281 }
282
283 static GstCaps *gst_base_src_getcaps (GstPad * pad, GstCaps * filter);
284 static gboolean gst_base_src_setcaps (GstPad * pad, GstCaps * caps);
285 static void gst_base_src_fixate (GstPad * pad, GstCaps * caps);
286
287 static gboolean gst_base_src_activate_push (GstPad * pad, gboolean active);
288 static gboolean gst_base_src_activate_pull (GstPad * pad, gboolean active);
289 static void gst_base_src_set_property (GObject * object, guint prop_id,
290     const GValue * value, GParamSpec * pspec);
291 static void gst_base_src_get_property (GObject * object, guint prop_id,
292     GValue * value, GParamSpec * pspec);
293 static gboolean gst_base_src_event_handler (GstPad * pad, GstEvent * event);
294 static gboolean gst_base_src_send_event (GstElement * elem, GstEvent * event);
295 static gboolean gst_base_src_default_event (GstBaseSrc * src, GstEvent * event);
296 static const GstQueryType *gst_base_src_get_query_types (GstElement * element);
297
298 static gboolean gst_base_src_query (GstPad * pad, GstQuery ** query);
299
300 static gboolean gst_base_src_default_negotiate (GstBaseSrc * basesrc);
301 static gboolean gst_base_src_default_do_seek (GstBaseSrc * src,
302     GstSegment * segment);
303 static gboolean gst_base_src_default_query (GstBaseSrc * src,
304     GstQuery ** query);
305 static gboolean gst_base_src_default_prepare_seek_segment (GstBaseSrc * src,
306     GstEvent * event, GstSegment * segment);
307
308 static gboolean gst_base_src_set_flushing (GstBaseSrc * basesrc,
309     gboolean flushing, gboolean live_play, gboolean unlock, gboolean * playing);
310 static gboolean gst_base_src_start (GstBaseSrc * basesrc);
311 static gboolean gst_base_src_stop (GstBaseSrc * basesrc);
312
313 static GstStateChangeReturn gst_base_src_change_state (GstElement * element,
314     GstStateChange transition);
315
316 static void gst_base_src_loop (GstPad * pad);
317 static gboolean gst_base_src_pad_check_get_range (GstPad * pad);
318 static gboolean gst_base_src_default_check_get_range (GstBaseSrc * bsrc);
319 static GstFlowReturn gst_base_src_pad_get_range (GstPad * pad, guint64 offset,
320     guint length, GstBuffer ** buf);
321 static GstFlowReturn gst_base_src_get_range (GstBaseSrc * src, guint64 offset,
322     guint length, GstBuffer ** buf);
323 static gboolean gst_base_src_seekable (GstBaseSrc * src);
324 static gboolean gst_base_src_negotiate (GstBaseSrc * basesrc);
325
326 static void
327 gst_base_src_class_init (GstBaseSrcClass * klass)
328 {
329   GObjectClass *gobject_class;
330   GstElementClass *gstelement_class;
331
332   gobject_class = G_OBJECT_CLASS (klass);
333   gstelement_class = GST_ELEMENT_CLASS (klass);
334
335   GST_DEBUG_CATEGORY_INIT (gst_base_src_debug, "basesrc", 0, "basesrc element");
336
337   g_type_class_add_private (klass, sizeof (GstBaseSrcPrivate));
338
339   parent_class = g_type_class_peek_parent (klass);
340
341   gobject_class->finalize = gst_base_src_finalize;
342   gobject_class->set_property = gst_base_src_set_property;
343   gobject_class->get_property = gst_base_src_get_property;
344
345 /* FIXME 0.11: blocksize property should be int, not ulong (min is >max here) */
346   g_object_class_install_property (gobject_class, PROP_BLOCKSIZE,
347       g_param_spec_ulong ("blocksize", "Block size",
348           "Size in bytes to read per buffer (-1 = default)", 0, G_MAXULONG,
349           DEFAULT_BLOCKSIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
350   g_object_class_install_property (gobject_class, PROP_NUM_BUFFERS,
351       g_param_spec_int ("num-buffers", "num-buffers",
352           "Number of buffers to output before sending EOS (-1 = unlimited)",
353           -1, G_MAXINT, DEFAULT_NUM_BUFFERS, G_PARAM_READWRITE |
354           G_PARAM_STATIC_STRINGS));
355   g_object_class_install_property (gobject_class, PROP_TYPEFIND,
356       g_param_spec_boolean ("typefind", "Typefind",
357           "Run typefind before negotiating", DEFAULT_TYPEFIND,
358           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
359   g_object_class_install_property (gobject_class, PROP_DO_TIMESTAMP,
360       g_param_spec_boolean ("do-timestamp", "Do timestamp",
361           "Apply current stream time to buffers", DEFAULT_DO_TIMESTAMP,
362           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
363
364   gstelement_class->change_state =
365       GST_DEBUG_FUNCPTR (gst_base_src_change_state);
366   gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_base_src_send_event);
367   gstelement_class->get_query_types =
368       GST_DEBUG_FUNCPTR (gst_base_src_get_query_types);
369
370   klass->negotiate = GST_DEBUG_FUNCPTR (gst_base_src_default_negotiate);
371   klass->event = GST_DEBUG_FUNCPTR (gst_base_src_default_event);
372   klass->do_seek = GST_DEBUG_FUNCPTR (gst_base_src_default_do_seek);
373   klass->query = GST_DEBUG_FUNCPTR (gst_base_src_default_query);
374   klass->check_get_range =
375       GST_DEBUG_FUNCPTR (gst_base_src_default_check_get_range);
376   klass->prepare_seek_segment =
377       GST_DEBUG_FUNCPTR (gst_base_src_default_prepare_seek_segment);
378
379   /* Registering debug symbols for function pointers */
380   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_activate_push);
381   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_activate_pull);
382   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_event_handler);
383   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_query);
384   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_pad_get_range);
385   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_pad_check_get_range);
386   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_getcaps);
387   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_setcaps);
388   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_fixate);
389 }
390
391 static void
392 gst_base_src_init (GstBaseSrc * basesrc, gpointer g_class)
393 {
394   GstPad *pad;
395   GstPadTemplate *pad_template;
396
397   basesrc->priv = GST_BASE_SRC_GET_PRIVATE (basesrc);
398
399   basesrc->is_live = FALSE;
400   basesrc->live_lock = g_mutex_new ();
401   basesrc->live_cond = g_cond_new ();
402   basesrc->num_buffers = DEFAULT_NUM_BUFFERS;
403   basesrc->num_buffers_left = -1;
404
405   basesrc->can_activate_push = TRUE;
406   basesrc->pad_mode = GST_ACTIVATE_NONE;
407
408   pad_template =
409       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (g_class), "src");
410   g_return_if_fail (pad_template != NULL);
411
412   GST_DEBUG_OBJECT (basesrc, "creating src pad");
413   pad = gst_pad_new_from_template (pad_template, "src");
414
415   GST_DEBUG_OBJECT (basesrc, "setting functions on src pad");
416   gst_pad_set_activatepush_function (pad, gst_base_src_activate_push);
417   gst_pad_set_activatepull_function (pad, gst_base_src_activate_pull);
418   gst_pad_set_event_function (pad, gst_base_src_event_handler);
419   gst_pad_set_query_function (pad, gst_base_src_query);
420   gst_pad_set_checkgetrange_function (pad, gst_base_src_pad_check_get_range);
421   gst_pad_set_getrange_function (pad, gst_base_src_pad_get_range);
422   gst_pad_set_getcaps_function (pad, gst_base_src_getcaps);
423   gst_pad_set_setcaps_function (pad, gst_base_src_setcaps);
424   gst_pad_set_fixatecaps_function (pad, gst_base_src_fixate);
425
426   /* hold pointer to pad */
427   basesrc->srcpad = pad;
428   GST_DEBUG_OBJECT (basesrc, "adding src pad");
429   gst_element_add_pad (GST_ELEMENT (basesrc), pad);
430
431   basesrc->blocksize = DEFAULT_BLOCKSIZE;
432   basesrc->clock_id = NULL;
433   /* we operate in BYTES by default */
434   gst_base_src_set_format (basesrc, GST_FORMAT_BYTES);
435   basesrc->typefind = DEFAULT_TYPEFIND;
436   basesrc->priv->do_timestamp = DEFAULT_DO_TIMESTAMP;
437   g_atomic_int_set (&basesrc->priv->have_events, FALSE);
438
439   GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_STARTED);
440   GST_OBJECT_FLAG_SET (basesrc, GST_ELEMENT_IS_SOURCE);
441
442   GST_DEBUG_OBJECT (basesrc, "init done");
443 }
444
445 static void
446 gst_base_src_finalize (GObject * object)
447 {
448   GstBaseSrc *basesrc;
449   GstEvent **event_p;
450
451   basesrc = GST_BASE_SRC (object);
452
453   g_mutex_free (basesrc->live_lock);
454   g_cond_free (basesrc->live_cond);
455
456   event_p = &basesrc->pending_seek;
457   gst_event_replace (event_p, NULL);
458
459   if (basesrc->priv->pending_events) {
460     g_list_foreach (basesrc->priv->pending_events, (GFunc) gst_event_unref,
461         NULL);
462     g_list_free (basesrc->priv->pending_events);
463   }
464
465   G_OBJECT_CLASS (parent_class)->finalize (object);
466 }
467
468 /**
469  * gst_base_src_wait_playing:
470  * @src: the src
471  *
472  * If the #GstBaseSrcClass.create() method performs its own synchronisation
473  * against the clock it must unblock when going from PLAYING to the PAUSED state
474  * and call this method before continuing to produce the remaining data.
475  *
476  * This function will block until a state change to PLAYING happens (in which
477  * case this function returns #GST_FLOW_OK) or the processing must be stopped due
478  * to a state change to READY or a FLUSH event (in which case this function
479  * returns #GST_FLOW_WRONG_STATE).
480  *
481  * Since: 0.10.12
482  *
483  * Returns: #GST_FLOW_OK if @src is PLAYING and processing can
484  * continue. Any other return value should be returned from the create vmethod.
485  */
486 GstFlowReturn
487 gst_base_src_wait_playing (GstBaseSrc * src)
488 {
489   g_return_val_if_fail (GST_IS_BASE_SRC (src), GST_FLOW_ERROR);
490
491   do {
492     /* block until the state changes, or we get a flush, or something */
493     GST_DEBUG_OBJECT (src, "live source waiting for running state");
494     GST_LIVE_WAIT (src);
495     GST_DEBUG_OBJECT (src, "live source unlocked");
496     if (src->priv->flushing)
497       goto flushing;
498   } while (G_UNLIKELY (!src->live_running));
499
500   return GST_FLOW_OK;
501
502   /* ERRORS */
503 flushing:
504   {
505     GST_DEBUG_OBJECT (src, "we are flushing");
506     return GST_FLOW_WRONG_STATE;
507   }
508 }
509
510 /**
511  * gst_base_src_set_live:
512  * @src: base source instance
513  * @live: new live-mode
514  *
515  * If the element listens to a live source, @live should
516  * be set to %TRUE.
517  *
518  * A live source will not produce data in the PAUSED state and
519  * will therefore not be able to participate in the PREROLL phase
520  * of a pipeline. To signal this fact to the application and the
521  * pipeline, the state change return value of the live source will
522  * be GST_STATE_CHANGE_NO_PREROLL.
523  */
524 void
525 gst_base_src_set_live (GstBaseSrc * src, gboolean live)
526 {
527   g_return_if_fail (GST_IS_BASE_SRC (src));
528
529   GST_OBJECT_LOCK (src);
530   src->is_live = live;
531   GST_OBJECT_UNLOCK (src);
532 }
533
534 /**
535  * gst_base_src_is_live:
536  * @src: base source instance
537  *
538  * Check if an element is in live mode.
539  *
540  * Returns: %TRUE if element is in live mode.
541  */
542 gboolean
543 gst_base_src_is_live (GstBaseSrc * src)
544 {
545   gboolean result;
546
547   g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
548
549   GST_OBJECT_LOCK (src);
550   result = src->is_live;
551   GST_OBJECT_UNLOCK (src);
552
553   return result;
554 }
555
556 /**
557  * gst_base_src_set_format:
558  * @src: base source instance
559  * @format: the format to use
560  *
561  * Sets the default format of the source. This will be the format used
562  * for sending NEW_SEGMENT events and for performing seeks.
563  *
564  * If a format of GST_FORMAT_BYTES is set, the element will be able to
565  * operate in pull mode if the #GstBaseSrcClass.is_seekable() returns TRUE.
566  *
567  * This function must only be called in states < %GST_STATE_PAUSED.
568  *
569  * Since: 0.10.1
570  */
571 void
572 gst_base_src_set_format (GstBaseSrc * src, GstFormat format)
573 {
574   g_return_if_fail (GST_IS_BASE_SRC (src));
575   g_return_if_fail (GST_STATE (src) <= GST_STATE_READY);
576
577   GST_OBJECT_LOCK (src);
578   gst_segment_init (&src->segment, format);
579   GST_OBJECT_UNLOCK (src);
580 }
581
582 /**
583  * gst_base_src_query_latency:
584  * @src: the source
585  * @live: (out) (allow-none): if the source is live
586  * @min_latency: (out) (allow-none): the min latency of the source
587  * @max_latency: (out) (allow-none): the max latency of the source
588  *
589  * Query the source for the latency parameters. @live will be TRUE when @src is
590  * configured as a live source. @min_latency will be set to the difference
591  * between the running time and the timestamp of the first buffer.
592  * @max_latency is always the undefined value of -1.
593  *
594  * This function is mostly used by subclasses.
595  *
596  * Returns: TRUE if the query succeeded.
597  *
598  * Since: 0.10.13
599  */
600 gboolean
601 gst_base_src_query_latency (GstBaseSrc * src, gboolean * live,
602     GstClockTime * min_latency, GstClockTime * max_latency)
603 {
604   GstClockTime min;
605
606   g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
607
608   GST_OBJECT_LOCK (src);
609   if (live)
610     *live = src->is_live;
611
612   /* if we have a startup latency, report this one, else report 0. Subclasses
613    * are supposed to override the query function if they want something
614    * else. */
615   if (src->priv->latency != -1)
616     min = src->priv->latency;
617   else
618     min = 0;
619
620   if (min_latency)
621     *min_latency = min;
622   if (max_latency)
623     *max_latency = -1;
624
625   GST_LOG_OBJECT (src, "latency: live %d, min %" GST_TIME_FORMAT
626       ", max %" GST_TIME_FORMAT, src->is_live, GST_TIME_ARGS (min),
627       GST_TIME_ARGS (-1));
628   GST_OBJECT_UNLOCK (src);
629
630   return TRUE;
631 }
632
633 /**
634  * gst_base_src_set_blocksize:
635  * @src: the source
636  * @blocksize: the new blocksize in bytes
637  *
638  * Set the number of bytes that @src will push out with each buffer. When
639  * @blocksize is set to -1, a default length will be used.
640  *
641  * Since: 0.10.22
642  */
643 /* FIXME 0.11: blocksize property should be int, not ulong */
644 void
645 gst_base_src_set_blocksize (GstBaseSrc * src, gulong blocksize)
646 {
647   g_return_if_fail (GST_IS_BASE_SRC (src));
648
649   GST_OBJECT_LOCK (src);
650   src->blocksize = blocksize;
651   GST_OBJECT_UNLOCK (src);
652 }
653
654 /**
655  * gst_base_src_get_blocksize:
656  * @src: the source
657  *
658  * Get the number of bytes that @src will push out with each buffer.
659  *
660  * Returns: the number of bytes pushed with each buffer.
661  *
662  * Since: 0.10.22
663  */
664 /* FIXME 0.11: blocksize property should be int, not ulong */
665 gulong
666 gst_base_src_get_blocksize (GstBaseSrc * src)
667 {
668   gulong res;
669
670   g_return_val_if_fail (GST_IS_BASE_SRC (src), 0);
671
672   GST_OBJECT_LOCK (src);
673   res = src->blocksize;
674   GST_OBJECT_UNLOCK (src);
675
676   return res;
677 }
678
679
680 /**
681  * gst_base_src_set_do_timestamp:
682  * @src: the source
683  * @timestamp: enable or disable timestamping
684  *
685  * Configure @src to automatically timestamp outgoing buffers based on the
686  * current running_time of the pipeline. This property is mostly useful for live
687  * sources.
688  *
689  * Since: 0.10.15
690  */
691 void
692 gst_base_src_set_do_timestamp (GstBaseSrc * src, gboolean timestamp)
693 {
694   g_return_if_fail (GST_IS_BASE_SRC (src));
695
696   GST_OBJECT_LOCK (src);
697   src->priv->do_timestamp = timestamp;
698   GST_OBJECT_UNLOCK (src);
699 }
700
701 /**
702  * gst_base_src_get_do_timestamp:
703  * @src: the source
704  *
705  * Query if @src timestamps outgoing buffers based on the current running_time.
706  *
707  * Returns: %TRUE if the base class will automatically timestamp outgoing buffers.
708  *
709  * Since: 0.10.15
710  */
711 gboolean
712 gst_base_src_get_do_timestamp (GstBaseSrc * src)
713 {
714   gboolean res;
715
716   g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
717
718   GST_OBJECT_LOCK (src);
719   res = src->priv->do_timestamp;
720   GST_OBJECT_UNLOCK (src);
721
722   return res;
723 }
724
725 /**
726  * gst_base_src_new_seamless_segment:
727  * @src: The source
728  * @start: The new start value for the segment
729  * @stop: Stop value for the new segment
730  * @position: The position value for the new segent
731  *
732  * Prepare a new seamless segment for emission downstream. This function must
733  * only be called by derived sub-classes, and only from the create() function,
734  * as the stream-lock needs to be held.
735  *
736  * The format for the new segment will be the current format of the source, as
737  * configured with gst_base_src_set_format()
738  *
739  * Returns: %TRUE if preparation of the seamless segment succeeded.
740  *
741  * Since: 0.10.26
742  */
743 gboolean
744 gst_base_src_new_seamless_segment (GstBaseSrc * src, gint64 start, gint64 stop,
745     gint64 position)
746 {
747   gboolean res = TRUE;
748
749   GST_DEBUG_OBJECT (src,
750       "Starting new seamless segment. Start %" GST_TIME_FORMAT " stop %"
751       GST_TIME_FORMAT " position %" GST_TIME_FORMAT, GST_TIME_ARGS (start),
752       GST_TIME_ARGS (stop), GST_TIME_ARGS (position));
753
754   GST_OBJECT_LOCK (src);
755
756   src->segment.base = gst_segment_to_running_time (&src->segment,
757       src->segment.format, src->segment.position);
758   src->segment.start = start;
759   src->segment.stop = stop;
760   src->segment.position = position;
761
762   /* forward, we send data from position to stop */
763   src->priv->segment_pending = TRUE;
764   GST_OBJECT_UNLOCK (src);
765
766   src->priv->discont = TRUE;
767   src->running = TRUE;
768
769   return res;
770 }
771
772 static gboolean
773 gst_base_src_setcaps (GstPad * pad, GstCaps * caps)
774 {
775   GstBaseSrcClass *bclass;
776   GstBaseSrc *bsrc;
777   gboolean res = TRUE;
778
779   bsrc = GST_BASE_SRC (GST_PAD_PARENT (pad));
780   bclass = GST_BASE_SRC_GET_CLASS (bsrc);
781
782   if (bclass->set_caps)
783     res = bclass->set_caps (bsrc, caps);
784
785   return res;
786 }
787
788 static GstCaps *
789 gst_base_src_getcaps (GstPad * pad, GstCaps * filter)
790 {
791   GstBaseSrcClass *bclass;
792   GstBaseSrc *bsrc;
793   GstCaps *caps = NULL;
794
795   bsrc = GST_BASE_SRC (GST_PAD_PARENT (pad));
796   bclass = GST_BASE_SRC_GET_CLASS (bsrc);
797   if (bclass->get_caps)
798     caps = bclass->get_caps (bsrc, filter);
799
800   if (caps == NULL) {
801     GstPadTemplate *pad_template;
802
803     pad_template =
804         gst_element_class_get_pad_template (GST_ELEMENT_CLASS (bclass), "src");
805     if (pad_template != NULL) {
806       caps = gst_caps_ref (gst_pad_template_get_caps (pad_template));
807
808       if (filter) {
809         GstCaps *intersection;
810
811         intersection =
812             gst_caps_intersect_full (filter, caps, GST_CAPS_INTERSECT_FIRST);
813         gst_caps_unref (caps);
814         caps = intersection;
815       }
816     }
817   }
818   return caps;
819 }
820
821 static void
822 gst_base_src_fixate (GstPad * pad, GstCaps * caps)
823 {
824   GstBaseSrcClass *bclass;
825   GstBaseSrc *bsrc;
826
827   bsrc = GST_BASE_SRC (gst_pad_get_parent (pad));
828   bclass = GST_BASE_SRC_GET_CLASS (bsrc);
829
830   if (bclass->fixate)
831     bclass->fixate (bsrc, caps);
832
833   gst_object_unref (bsrc);
834 }
835
836 static gboolean
837 gst_base_src_default_query (GstBaseSrc * src, GstQuery ** query)
838 {
839   gboolean res;
840
841   switch (GST_QUERY_TYPE (*query)) {
842     case GST_QUERY_POSITION:
843     {
844       GstFormat format;
845
846       gst_query_parse_position (*query, &format, NULL);
847
848       GST_DEBUG_OBJECT (src, "position query in format %s",
849           gst_format_get_name (format));
850
851       switch (format) {
852         case GST_FORMAT_PERCENT:
853         {
854           gint64 percent;
855           gint64 position;
856           gint64 duration;
857
858           GST_OBJECT_LOCK (src);
859           position = src->segment.position;
860           duration = src->segment.duration;
861           GST_OBJECT_UNLOCK (src);
862
863           if (position != -1 && duration != -1) {
864             if (position < duration)
865               percent = gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX, position,
866                   duration);
867             else
868               percent = GST_FORMAT_PERCENT_MAX;
869           } else
870             percent = -1;
871
872           gst_query_set_position (*query, GST_FORMAT_PERCENT, percent);
873           res = TRUE;
874           break;
875         }
876         default:
877         {
878           gint64 position;
879           GstFormat seg_format;
880
881           GST_OBJECT_LOCK (src);
882           position =
883               gst_segment_to_stream_time (&src->segment, src->segment.format,
884               src->segment.position);
885           seg_format = src->segment.format;
886           GST_OBJECT_UNLOCK (src);
887
888           if (position != -1) {
889             /* convert to requested format */
890             res =
891                 gst_pad_query_convert (src->srcpad, seg_format,
892                 position, &format, &position);
893           } else
894             res = TRUE;
895
896           gst_query_set_position (*query, format, position);
897           break;
898         }
899       }
900       break;
901     }
902     case GST_QUERY_DURATION:
903     {
904       GstFormat format;
905
906       gst_query_parse_duration (*query, &format, NULL);
907
908       GST_DEBUG_OBJECT (src, "duration query in format %s",
909           gst_format_get_name (format));
910
911       switch (format) {
912         case GST_FORMAT_PERCENT:
913           gst_query_set_duration (*query, GST_FORMAT_PERCENT,
914               GST_FORMAT_PERCENT_MAX);
915           res = TRUE;
916           break;
917         default:
918         {
919           gint64 duration;
920           GstFormat seg_format;
921
922           GST_OBJECT_LOCK (src);
923           /* this is the duration as configured by the subclass. */
924           duration = src->segment.duration;
925           seg_format = src->segment.format;
926           GST_OBJECT_UNLOCK (src);
927
928           GST_LOG_OBJECT (src, "duration %" G_GINT64_FORMAT ", format %s",
929               duration, gst_format_get_name (seg_format));
930
931           if (duration != -1) {
932             /* convert to requested format, if this fails, we have a duration
933              * but we cannot answer the query, we must return FALSE. */
934             res =
935                 gst_pad_query_convert (src->srcpad, seg_format,
936                 duration, &format, &duration);
937           } else {
938             /* The subclass did not configure a duration, we assume that the
939              * media has an unknown duration then and we return TRUE to report
940              * this. Note that this is not the same as returning FALSE, which
941              * means that we cannot report the duration at all. */
942             res = TRUE;
943           }
944           gst_query_set_duration (*query, format, duration);
945           break;
946         }
947       }
948       break;
949     }
950
951     case GST_QUERY_SEEKING:
952     {
953       GstFormat format, seg_format;
954       gint64 duration;
955
956       GST_OBJECT_LOCK (src);
957       duration = src->segment.duration;
958       seg_format = src->segment.format;
959       GST_OBJECT_UNLOCK (src);
960
961       gst_query_parse_seeking (*query, &format, NULL, NULL, NULL);
962       if (format == seg_format) {
963         gst_query_set_seeking (*query, seg_format,
964             gst_base_src_seekable (src), 0, duration);
965         res = TRUE;
966       } else {
967         /* FIXME 0.11: return TRUE + seekable=FALSE for SEEKING query here */
968         /* Don't reply to the query to make up for demuxers which don't
969          * handle the SEEKING query yet. Players like Totem will fall back
970          * to the duration when the SEEKING query isn't answered. */
971         res = FALSE;
972       }
973       break;
974     }
975     case GST_QUERY_SEGMENT:
976     {
977       gint64 start, stop;
978
979       GST_OBJECT_LOCK (src);
980       /* no end segment configured, current duration then */
981       if ((stop = src->segment.stop) == -1)
982         stop = src->segment.duration;
983       start = src->segment.start;
984
985       /* adjust to stream time */
986       if (src->segment.time != -1) {
987         start -= src->segment.time;
988         if (stop != -1)
989           stop -= src->segment.time;
990       }
991
992       gst_query_set_segment (*query, src->segment.rate, src->segment.format,
993           start, stop);
994       GST_OBJECT_UNLOCK (src);
995       res = TRUE;
996       break;
997     }
998
999     case GST_QUERY_FORMATS:
1000     {
1001       gst_query_set_formats (*query, 3, GST_FORMAT_DEFAULT,
1002           GST_FORMAT_BYTES, GST_FORMAT_PERCENT);
1003       res = TRUE;
1004       break;
1005     }
1006     case GST_QUERY_CONVERT:
1007     {
1008       GstFormat src_fmt, dest_fmt;
1009       gint64 src_val, dest_val;
1010
1011       gst_query_parse_convert (*query, &src_fmt, &src_val, &dest_fmt,
1012           &dest_val);
1013
1014       /* we can only convert between equal formats... */
1015       if (src_fmt == dest_fmt) {
1016         dest_val = src_val;
1017         res = TRUE;
1018       } else
1019         res = FALSE;
1020
1021       gst_query_set_convert (*query, src_fmt, src_val, dest_fmt, dest_val);
1022       break;
1023     }
1024     case GST_QUERY_LATENCY:
1025     {
1026       GstClockTime min, max;
1027       gboolean live;
1028
1029       /* Subclasses should override and implement something usefull */
1030       res = gst_base_src_query_latency (src, &live, &min, &max);
1031
1032       GST_LOG_OBJECT (src, "report latency: live %d, min %" GST_TIME_FORMAT
1033           ", max %" GST_TIME_FORMAT, live, GST_TIME_ARGS (min),
1034           GST_TIME_ARGS (max));
1035
1036       gst_query_set_latency (*query, live, min, max);
1037       break;
1038     }
1039     case GST_QUERY_JITTER:
1040     case GST_QUERY_RATE:
1041       res = FALSE;
1042       break;
1043     case GST_QUERY_BUFFERING:
1044     {
1045       GstFormat format, seg_format;
1046       gint64 start, stop, estimated;
1047
1048       gst_query_parse_buffering_range (*query, &format, NULL, NULL, NULL);
1049
1050       GST_DEBUG_OBJECT (src, "buffering query in format %s",
1051           gst_format_get_name (format));
1052
1053       GST_OBJECT_LOCK (src);
1054       if (src->random_access) {
1055         estimated = 0;
1056         start = 0;
1057         if (format == GST_FORMAT_PERCENT)
1058           stop = GST_FORMAT_PERCENT_MAX;
1059         else
1060           stop = src->segment.duration;
1061       } else {
1062         estimated = -1;
1063         start = -1;
1064         stop = -1;
1065       }
1066       seg_format = src->segment.format;
1067       GST_OBJECT_UNLOCK (src);
1068
1069       /* convert to required format. When the conversion fails, we can't answer
1070        * the query. When the value is unknown, we can don't perform conversion
1071        * but report TRUE. */
1072       if (format != GST_FORMAT_PERCENT && stop != -1) {
1073         res = gst_pad_query_convert (src->srcpad, seg_format,
1074             stop, &format, &stop);
1075       } else {
1076         res = TRUE;
1077       }
1078       if (res && format != GST_FORMAT_PERCENT && start != -1)
1079         res = gst_pad_query_convert (src->srcpad, seg_format,
1080             start, &format, &start);
1081
1082       gst_query_set_buffering_range (*query, format, start, stop, estimated);
1083       break;
1084     }
1085     default:
1086       res = FALSE;
1087       break;
1088   }
1089   GST_DEBUG_OBJECT (src, "query %s returns %d", GST_QUERY_TYPE_NAME (*query),
1090       res);
1091   return res;
1092 }
1093
1094 static gboolean
1095 gst_base_src_query (GstPad * pad, GstQuery ** query)
1096 {
1097   GstBaseSrc *src;
1098   GstBaseSrcClass *bclass;
1099   gboolean result = FALSE;
1100
1101   src = GST_BASE_SRC (gst_pad_get_parent (pad));
1102   if (G_UNLIKELY (src == NULL))
1103     return FALSE;
1104
1105   bclass = GST_BASE_SRC_GET_CLASS (src);
1106
1107   if (bclass->query)
1108     result = bclass->query (src, query);
1109   else
1110     result = gst_pad_query_default (pad, query);
1111
1112   gst_object_unref (src);
1113
1114   return result;
1115 }
1116
1117 static gboolean
1118 gst_base_src_default_do_seek (GstBaseSrc * src, GstSegment * segment)
1119 {
1120   gboolean res = TRUE;
1121
1122   /* update our offset if the start/stop position was updated */
1123   if (segment->format == GST_FORMAT_BYTES) {
1124     segment->time = segment->start;
1125   } else if (segment->start == 0) {
1126     /* seek to start, we can implement a default for this. */
1127     segment->time = 0;
1128   } else {
1129     res = FALSE;
1130     GST_INFO_OBJECT (src, "Can't do a default seek");
1131   }
1132
1133   return res;
1134 }
1135
1136 static gboolean
1137 gst_base_src_do_seek (GstBaseSrc * src, GstSegment * segment)
1138 {
1139   GstBaseSrcClass *bclass;
1140   gboolean result = FALSE;
1141
1142   bclass = GST_BASE_SRC_GET_CLASS (src);
1143
1144   if (bclass->do_seek)
1145     result = bclass->do_seek (src, segment);
1146
1147   return result;
1148 }
1149
1150 #define SEEK_TYPE_IS_RELATIVE(t) (((t) != GST_SEEK_TYPE_NONE) && ((t) != GST_SEEK_TYPE_SET))
1151
1152 static gboolean
1153 gst_base_src_default_prepare_seek_segment (GstBaseSrc * src, GstEvent * event,
1154     GstSegment * segment)
1155 {
1156   /* By default, we try one of 2 things:
1157    *   - For absolute seek positions, convert the requested position to our
1158    *     configured processing format and place it in the output segment \
1159    *   - For relative seek positions, convert our current (input) values to the
1160    *     seek format, adjust by the relative seek offset and then convert back to
1161    *     the processing format
1162    */
1163   GstSeekType cur_type, stop_type;
1164   gint64 cur, stop;
1165   GstSeekFlags flags;
1166   GstFormat seek_format, dest_format;
1167   gdouble rate;
1168   gboolean update;
1169   gboolean res = TRUE;
1170
1171   gst_event_parse_seek (event, &rate, &seek_format, &flags,
1172       &cur_type, &cur, &stop_type, &stop);
1173   dest_format = segment->format;
1174
1175   if (seek_format == dest_format) {
1176     gst_segment_do_seek (segment, rate, seek_format, flags,
1177         cur_type, cur, stop_type, stop, &update);
1178     return TRUE;
1179   }
1180
1181   if (cur_type != GST_SEEK_TYPE_NONE) {
1182     /* FIXME: Handle seek_cur & seek_end by converting the input segment vals */
1183     res =
1184         gst_pad_query_convert (src->srcpad, seek_format, cur, &dest_format,
1185         &cur);
1186     cur_type = GST_SEEK_TYPE_SET;
1187   }
1188
1189   if (res && stop_type != GST_SEEK_TYPE_NONE) {
1190     /* FIXME: Handle seek_cur & seek_end by converting the input segment vals */
1191     res =
1192         gst_pad_query_convert (src->srcpad, seek_format, stop, &dest_format,
1193         &stop);
1194     stop_type = GST_SEEK_TYPE_SET;
1195   }
1196
1197   /* And finally, configure our output segment in the desired format */
1198   gst_segment_do_seek (segment, rate, dest_format, flags, cur_type, cur,
1199       stop_type, stop, &update);
1200
1201   if (!res)
1202     goto no_format;
1203
1204   return res;
1205
1206 no_format:
1207   {
1208     GST_DEBUG_OBJECT (src, "undefined format given, seek aborted.");
1209     return FALSE;
1210   }
1211 }
1212
1213 static gboolean
1214 gst_base_src_prepare_seek_segment (GstBaseSrc * src, GstEvent * event,
1215     GstSegment * seeksegment)
1216 {
1217   GstBaseSrcClass *bclass;
1218   gboolean result = FALSE;
1219
1220   bclass = GST_BASE_SRC_GET_CLASS (src);
1221
1222   if (bclass->prepare_seek_segment)
1223     result = bclass->prepare_seek_segment (src, event, seeksegment);
1224
1225   return result;
1226 }
1227
1228 /* this code implements the seeking. It is a good example
1229  * handling all cases.
1230  *
1231  * A seek updates the currently configured segment.start
1232  * and segment.stop values based on the SEEK_TYPE. If the
1233  * segment.start value is updated, a seek to this new position
1234  * should be performed.
1235  *
1236  * The seek can only be executed when we are not currently
1237  * streaming any data, to make sure that this is the case, we
1238  * acquire the STREAM_LOCK which is taken when we are in the
1239  * _loop() function or when a getrange() is called. Normally
1240  * we will not receive a seek if we are operating in pull mode
1241  * though. When we operate as a live source we might block on the live
1242  * cond, which does not release the STREAM_LOCK. Therefore we will try
1243  * to grab the LIVE_LOCK instead of the STREAM_LOCK to make sure it is
1244  * safe to perform the seek.
1245  *
1246  * When we are in the loop() function, we might be in the middle
1247  * of pushing a buffer, which might block in a sink. To make sure
1248  * that the push gets unblocked we push out a FLUSH_START event.
1249  * Our loop function will get a WRONG_STATE return value from
1250  * the push and will pause, effectively releasing the STREAM_LOCK.
1251  *
1252  * For a non-flushing seek, we pause the task, which might eventually
1253  * release the STREAM_LOCK. We say eventually because when the sink
1254  * blocks on the sample we might wait a very long time until the sink
1255  * unblocks the sample. In any case we acquire the STREAM_LOCK and
1256  * can continue the seek. A non-flushing seek is normally done in a
1257  * running pipeline to perform seamless playback, this means that the sink is
1258  * PLAYING and will return from its chain function.
1259  * In the case of a non-flushing seek we need to make sure that the
1260  * data we output after the seek is continuous with the previous data,
1261  * this is because a non-flushing seek does not reset the running-time
1262  * to 0. We do this by closing the currently running segment, ie. sending
1263  * a new_segment event with the stop position set to the last processed
1264  * position.
1265  *
1266  * After updating the segment.start/stop values, we prepare for
1267  * streaming again. We push out a FLUSH_STOP to make the peer pad
1268  * accept data again and we start our task again.
1269  *
1270  * A segment seek posts a message on the bus saying that the playback
1271  * of the segment started. We store the segment flag internally because
1272  * when we reach the segment.stop we have to post a segment.done
1273  * instead of EOS when doing a segment seek.
1274  */
1275 /* FIXME (0.11), we have the unlock gboolean here because most current
1276  * implementations (fdsrc, -base/gst/tcp/, ...) unconditionally unlock, even when
1277  * the streaming thread isn't running, resulting in bogus unlocks later when it
1278  * starts. This is fixed by adding unlock_stop, but we should still avoid unlocking
1279  * unnecessarily for backwards compatibility. Ergo, the unlock variable stays
1280  * until 0.11
1281  */
1282 static gboolean
1283 gst_base_src_perform_seek (GstBaseSrc * src, GstEvent * event, gboolean unlock)
1284 {
1285   gboolean res = TRUE, tres;
1286   gdouble rate;
1287   GstFormat seek_format, dest_format;
1288   GstSeekFlags flags;
1289   GstSeekType cur_type, stop_type;
1290   gint64 cur, stop;
1291   gboolean flush, playing;
1292   gboolean update;
1293   gboolean relative_seek = FALSE;
1294   gboolean seekseg_configured = FALSE;
1295   GstSegment seeksegment;
1296   guint32 seqnum;
1297   GstEvent *tevent;
1298
1299   GST_DEBUG_OBJECT (src, "doing seek: %" GST_PTR_FORMAT, event);
1300
1301   GST_OBJECT_LOCK (src);
1302   dest_format = src->segment.format;
1303   GST_OBJECT_UNLOCK (src);
1304
1305   if (event) {
1306     gst_event_parse_seek (event, &rate, &seek_format, &flags,
1307         &cur_type, &cur, &stop_type, &stop);
1308
1309     relative_seek = SEEK_TYPE_IS_RELATIVE (cur_type) ||
1310         SEEK_TYPE_IS_RELATIVE (stop_type);
1311
1312     if (dest_format != seek_format && !relative_seek) {
1313       /* If we have an ABSOLUTE position (SEEK_SET only), we can convert it
1314        * here before taking the stream lock, otherwise we must convert it later,
1315        * once we have the stream lock and can read the last configures segment
1316        * start and stop positions */
1317       gst_segment_init (&seeksegment, dest_format);
1318
1319       if (!gst_base_src_prepare_seek_segment (src, event, &seeksegment))
1320         goto prepare_failed;
1321
1322       seekseg_configured = TRUE;
1323     }
1324
1325     flush = flags & GST_SEEK_FLAG_FLUSH;
1326     seqnum = gst_event_get_seqnum (event);
1327   } else {
1328     flush = FALSE;
1329     /* get next seqnum */
1330     seqnum = gst_util_seqnum_next ();
1331   }
1332
1333   /* send flush start */
1334   if (flush) {
1335     tevent = gst_event_new_flush_start ();
1336     gst_event_set_seqnum (tevent, seqnum);
1337     gst_pad_push_event (src->srcpad, tevent);
1338   } else
1339     gst_pad_pause_task (src->srcpad);
1340
1341   /* unblock streaming thread. */
1342   gst_base_src_set_flushing (src, TRUE, FALSE, unlock, &playing);
1343
1344   /* grab streaming lock, this should eventually be possible, either
1345    * because the task is paused, our streaming thread stopped
1346    * or because our peer is flushing. */
1347   GST_PAD_STREAM_LOCK (src->srcpad);
1348   if (G_UNLIKELY (src->priv->seqnum == seqnum)) {
1349     /* we have seen this event before, issue a warning for now */
1350     GST_WARNING_OBJECT (src, "duplicate event found %" G_GUINT32_FORMAT,
1351         seqnum);
1352   } else {
1353     src->priv->seqnum = seqnum;
1354     GST_DEBUG_OBJECT (src, "seek with seqnum %" G_GUINT32_FORMAT, seqnum);
1355   }
1356
1357   gst_base_src_set_flushing (src, FALSE, playing, unlock, NULL);
1358
1359   /* If we configured the seeksegment above, don't overwrite it now. Otherwise
1360    * copy the current segment info into the temp segment that we can actually
1361    * attempt the seek with. We only update the real segment if the seek suceeds. */
1362   if (!seekseg_configured) {
1363     memcpy (&seeksegment, &src->segment, sizeof (GstSegment));
1364
1365     /* now configure the final seek segment */
1366     if (event) {
1367       if (seeksegment.format != seek_format) {
1368         /* OK, here's where we give the subclass a chance to convert the relative
1369          * seek into an absolute one in the processing format. We set up any
1370          * absolute seek above, before taking the stream lock. */
1371         if (!gst_base_src_prepare_seek_segment (src, event, &seeksegment)) {
1372           GST_DEBUG_OBJECT (src, "Preparing the seek failed after flushing. "
1373               "Aborting seek");
1374           res = FALSE;
1375         }
1376       } else {
1377         /* The seek format matches our processing format, no need to ask the
1378          * the subclass to configure the segment. */
1379         gst_segment_do_seek (&seeksegment, rate, seek_format, flags,
1380             cur_type, cur, stop_type, stop, &update);
1381       }
1382     }
1383     /* Else, no seek event passed, so we're just (re)starting the
1384        current segment. */
1385   }
1386
1387   if (res) {
1388     GST_DEBUG_OBJECT (src, "segment configured from %" G_GINT64_FORMAT
1389         " to %" G_GINT64_FORMAT ", position %" G_GINT64_FORMAT,
1390         seeksegment.start, seeksegment.stop, seeksegment.position);
1391
1392     /* do the seek, segment.position contains the new position. */
1393     res = gst_base_src_do_seek (src, &seeksegment);
1394   }
1395
1396   /* and prepare to continue streaming */
1397   if (flush) {
1398     tevent = gst_event_new_flush_stop ();
1399     gst_event_set_seqnum (tevent, seqnum);
1400     /* send flush stop, peer will accept data and events again. We
1401      * are not yet providing data as we still have the STREAM_LOCK. */
1402     gst_pad_push_event (src->srcpad, tevent);
1403   }
1404
1405   /* The subclass must have converted the segment to the processing format
1406    * by now */
1407   if (res && seeksegment.format != dest_format) {
1408     GST_DEBUG_OBJECT (src, "Subclass failed to prepare a seek segment "
1409         "in the correct format. Aborting seek.");
1410     res = FALSE;
1411   }
1412
1413   /* if the seek was successful, we update our real segment and push
1414    * out the new segment. */
1415   if (res) {
1416     GST_OBJECT_LOCK (src);
1417     memcpy (&src->segment, &seeksegment, sizeof (GstSegment));
1418     GST_OBJECT_UNLOCK (src);
1419
1420     if (seeksegment.flags & GST_SEEK_FLAG_SEGMENT) {
1421       GstMessage *message;
1422
1423       message = gst_message_new_segment_start (GST_OBJECT (src),
1424           seeksegment.format, seeksegment.position);
1425       gst_message_set_seqnum (message, seqnum);
1426
1427       gst_element_post_message (GST_ELEMENT (src), message);
1428     }
1429
1430     /* for deriving a stop position for the playback segment from the seek
1431      * segment, we must take the duration when the stop is not set */
1432     if ((stop = seeksegment.stop) == -1)
1433       stop = seeksegment.duration;
1434
1435     src->priv->segment_pending = TRUE;
1436   }
1437
1438   src->priv->discont = TRUE;
1439   src->running = TRUE;
1440   /* and restart the task in case it got paused explicitly or by
1441    * the FLUSH_START event we pushed out. */
1442   tres = gst_pad_start_task (src->srcpad, (GstTaskFunction) gst_base_src_loop,
1443       src->srcpad);
1444   if (res && !tres)
1445     res = FALSE;
1446
1447   /* and release the lock again so we can continue streaming */
1448   GST_PAD_STREAM_UNLOCK (src->srcpad);
1449
1450   return res;
1451
1452   /* ERROR */
1453 prepare_failed:
1454   GST_DEBUG_OBJECT (src, "Preparing the seek failed before flushing. "
1455       "Aborting seek");
1456   return FALSE;
1457 }
1458
1459 static const GstQueryType *
1460 gst_base_src_get_query_types (GstElement * element)
1461 {
1462   static const GstQueryType query_types[] = {
1463     GST_QUERY_DURATION,
1464     GST_QUERY_POSITION,
1465     GST_QUERY_SEEKING,
1466     GST_QUERY_SEGMENT,
1467     GST_QUERY_FORMATS,
1468     GST_QUERY_LATENCY,
1469     GST_QUERY_JITTER,
1470     GST_QUERY_RATE,
1471     GST_QUERY_CONVERT,
1472     0
1473   };
1474
1475   return query_types;
1476 }
1477
1478 /* all events send to this element directly. This is mainly done from the
1479  * application.
1480  */
1481 static gboolean
1482 gst_base_src_send_event (GstElement * element, GstEvent * event)
1483 {
1484   GstBaseSrc *src;
1485   gboolean result = FALSE;
1486
1487   src = GST_BASE_SRC (element);
1488
1489   GST_DEBUG_OBJECT (src, "handling event %p %" GST_PTR_FORMAT, event, event);
1490
1491   switch (GST_EVENT_TYPE (event)) {
1492       /* bidirectional events */
1493     case GST_EVENT_FLUSH_START:
1494     case GST_EVENT_FLUSH_STOP:
1495       /* sending random flushes downstream can break stuff,
1496        * especially sync since all segment info will get flushed */
1497       break;
1498
1499       /* downstream serialized events */
1500     case GST_EVENT_EOS:
1501     {
1502       GstBaseSrcClass *bclass;
1503
1504       bclass = GST_BASE_SRC_GET_CLASS (src);
1505
1506       /* queue EOS and make sure the task or pull function performs the EOS
1507        * actions.
1508        *
1509        * We have two possibilities:
1510        *
1511        *  - Before we are to enter the _create function, we check the pending_eos
1512        *    first and do EOS instead of entering it.
1513        *  - If we are in the _create function or we did not manage to set the
1514        *    flag fast enough and we are about to enter the _create function,
1515        *    we unlock it so that we exit with WRONG_STATE immediatly. We then
1516        *    check the EOS flag and do the EOS logic.
1517        */
1518       g_atomic_int_set (&src->priv->pending_eos, TRUE);
1519       GST_DEBUG_OBJECT (src, "EOS marked, calling unlock");
1520
1521       /* unlock the _create function so that we can check the pending_eos flag
1522        * and we can do EOS. This will eventually release the LIVE_LOCK again so
1523        * that we can grab it and stop the unlock again. We don't take the stream
1524        * lock so that this operation is guaranteed to never block. */
1525       if (bclass->unlock)
1526         bclass->unlock (src);
1527
1528       GST_DEBUG_OBJECT (src, "unlock called, waiting for LIVE_LOCK");
1529
1530       GST_LIVE_LOCK (src);
1531       GST_DEBUG_OBJECT (src, "LIVE_LOCK acquired, calling unlock_stop");
1532       /* now stop the unlock of the streaming thread again. Grabbing the live
1533        * lock is enough because that protects the create function. */
1534       if (bclass->unlock_stop)
1535         bclass->unlock_stop (src);
1536       GST_LIVE_UNLOCK (src);
1537
1538       result = TRUE;
1539       break;
1540     }
1541     case GST_EVENT_SEGMENT:
1542       /* sending random SEGMENT downstream can break sync. */
1543       break;
1544     case GST_EVENT_TAG:
1545     case GST_EVENT_CUSTOM_DOWNSTREAM:
1546     case GST_EVENT_CUSTOM_BOTH:
1547       /* Insert TAG, CUSTOM_DOWNSTREAM, CUSTOM_BOTH in the dataflow */
1548       GST_OBJECT_LOCK (src);
1549       src->priv->pending_events =
1550           g_list_append (src->priv->pending_events, event);
1551       g_atomic_int_set (&src->priv->have_events, TRUE);
1552       GST_OBJECT_UNLOCK (src);
1553       event = NULL;
1554       result = TRUE;
1555       break;
1556     case GST_EVENT_BUFFERSIZE:
1557       /* does not seem to make much sense currently */
1558       break;
1559
1560       /* upstream events */
1561     case GST_EVENT_QOS:
1562       /* elements should override send_event and do something */
1563       break;
1564     case GST_EVENT_SEEK:
1565     {
1566       gboolean started;
1567
1568       GST_OBJECT_LOCK (src->srcpad);
1569       if (GST_PAD_ACTIVATE_MODE (src->srcpad) == GST_ACTIVATE_PULL)
1570         goto wrong_mode;
1571       started = GST_PAD_ACTIVATE_MODE (src->srcpad) == GST_ACTIVATE_PUSH;
1572       GST_OBJECT_UNLOCK (src->srcpad);
1573
1574       if (started) {
1575         GST_DEBUG_OBJECT (src, "performing seek");
1576         /* when we are running in push mode, we can execute the
1577          * seek right now, we need to unlock. */
1578         result = gst_base_src_perform_seek (src, event, TRUE);
1579       } else {
1580         GstEvent **event_p;
1581
1582         /* else we store the event and execute the seek when we
1583          * get activated */
1584         GST_OBJECT_LOCK (src);
1585         GST_DEBUG_OBJECT (src, "queueing seek");
1586         event_p = &src->pending_seek;
1587         gst_event_replace ((GstEvent **) event_p, event);
1588         GST_OBJECT_UNLOCK (src);
1589         /* assume the seek will work */
1590         result = TRUE;
1591       }
1592       break;
1593     }
1594     case GST_EVENT_NAVIGATION:
1595       /* could make sense for elements that do something with navigation events
1596        * but then they would need to override the send_event function */
1597       break;
1598     case GST_EVENT_LATENCY:
1599       /* does not seem to make sense currently */
1600       break;
1601
1602       /* custom events */
1603     case GST_EVENT_CUSTOM_UPSTREAM:
1604       /* override send_event if you want this */
1605       break;
1606     case GST_EVENT_CUSTOM_DOWNSTREAM_OOB:
1607     case GST_EVENT_CUSTOM_BOTH_OOB:
1608       /* insert a random custom event into the pipeline */
1609       GST_DEBUG_OBJECT (src, "pushing custom OOB event downstream");
1610       result = gst_pad_push_event (src->srcpad, event);
1611       /* we gave away the ref to the event in the push */
1612       event = NULL;
1613       break;
1614     default:
1615       break;
1616   }
1617 done:
1618   /* if we still have a ref to the event, unref it now */
1619   if (event)
1620     gst_event_unref (event);
1621
1622   return result;
1623
1624   /* ERRORS */
1625 wrong_mode:
1626   {
1627     GST_DEBUG_OBJECT (src, "cannot perform seek when operating in pull mode");
1628     GST_OBJECT_UNLOCK (src->srcpad);
1629     result = FALSE;
1630     goto done;
1631   }
1632 }
1633
1634 static gboolean
1635 gst_base_src_seekable (GstBaseSrc * src)
1636 {
1637   GstBaseSrcClass *bclass;
1638   bclass = GST_BASE_SRC_GET_CLASS (src);
1639   if (bclass->is_seekable)
1640     return bclass->is_seekable (src);
1641   else
1642     return FALSE;
1643 }
1644
1645 static void
1646 gst_base_src_update_qos (GstBaseSrc * src,
1647     gdouble proportion, GstClockTimeDiff diff, GstClockTime timestamp)
1648 {
1649   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, src,
1650       "qos: proportion: %lf, diff %" G_GINT64_FORMAT ", timestamp %"
1651       GST_TIME_FORMAT, proportion, diff, GST_TIME_ARGS (timestamp));
1652
1653   GST_OBJECT_LOCK (src);
1654   src->priv->proportion = proportion;
1655   src->priv->earliest_time = timestamp + diff;
1656   GST_OBJECT_UNLOCK (src);
1657 }
1658
1659
1660 static gboolean
1661 gst_base_src_default_event (GstBaseSrc * src, GstEvent * event)
1662 {
1663   gboolean result;
1664
1665   GST_DEBUG_OBJECT (src, "handle event %" GST_PTR_FORMAT, event);
1666
1667   switch (GST_EVENT_TYPE (event)) {
1668     case GST_EVENT_SEEK:
1669       /* is normally called when in push mode */
1670       if (!gst_base_src_seekable (src))
1671         goto not_seekable;
1672
1673       result = gst_base_src_perform_seek (src, event, TRUE);
1674       break;
1675     case GST_EVENT_FLUSH_START:
1676       /* cancel any blocking getrange, is normally called
1677        * when in pull mode. */
1678       result = gst_base_src_set_flushing (src, TRUE, FALSE, TRUE, NULL);
1679       break;
1680     case GST_EVENT_FLUSH_STOP:
1681       result = gst_base_src_set_flushing (src, FALSE, TRUE, TRUE, NULL);
1682       break;
1683     case GST_EVENT_QOS:
1684     {
1685       gdouble proportion;
1686       GstClockTimeDiff diff;
1687       GstClockTime timestamp;
1688
1689       gst_event_parse_qos (event, NULL, &proportion, &diff, &timestamp);
1690       gst_base_src_update_qos (src, proportion, diff, timestamp);
1691       result = TRUE;
1692       break;
1693     }
1694     default:
1695       result = FALSE;
1696       break;
1697   }
1698   return result;
1699
1700   /* ERRORS */
1701 not_seekable:
1702   {
1703     GST_DEBUG_OBJECT (src, "is not seekable");
1704     return FALSE;
1705   }
1706 }
1707
1708 static gboolean
1709 gst_base_src_event_handler (GstPad * pad, GstEvent * event)
1710 {
1711   GstBaseSrc *src;
1712   GstBaseSrcClass *bclass;
1713   gboolean result = FALSE;
1714
1715   src = GST_BASE_SRC (gst_pad_get_parent (pad));
1716   if (G_UNLIKELY (src == NULL)) {
1717     gst_event_unref (event);
1718     return FALSE;
1719   }
1720
1721   bclass = GST_BASE_SRC_GET_CLASS (src);
1722
1723   if (bclass->event) {
1724     if (!(result = bclass->event (src, event)))
1725       goto subclass_failed;
1726   }
1727
1728 done:
1729   gst_event_unref (event);
1730   gst_object_unref (src);
1731
1732   return result;
1733
1734   /* ERRORS */
1735 subclass_failed:
1736   {
1737     GST_DEBUG_OBJECT (src, "subclass refused event");
1738     goto done;
1739   }
1740 }
1741
1742 static void
1743 gst_base_src_set_property (GObject * object, guint prop_id,
1744     const GValue * value, GParamSpec * pspec)
1745 {
1746   GstBaseSrc *src;
1747
1748   src = GST_BASE_SRC (object);
1749
1750   switch (prop_id) {
1751     case PROP_BLOCKSIZE:
1752       gst_base_src_set_blocksize (src, g_value_get_ulong (value));
1753       break;
1754     case PROP_NUM_BUFFERS:
1755       src->num_buffers = g_value_get_int (value);
1756       break;
1757     case PROP_TYPEFIND:
1758       src->typefind = g_value_get_boolean (value);
1759       break;
1760     case PROP_DO_TIMESTAMP:
1761       gst_base_src_set_do_timestamp (src, g_value_get_boolean (value));
1762       break;
1763     default:
1764       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1765       break;
1766   }
1767 }
1768
1769 static void
1770 gst_base_src_get_property (GObject * object, guint prop_id, GValue * value,
1771     GParamSpec * pspec)
1772 {
1773   GstBaseSrc *src;
1774
1775   src = GST_BASE_SRC (object);
1776
1777   switch (prop_id) {
1778     case PROP_BLOCKSIZE:
1779       g_value_set_ulong (value, gst_base_src_get_blocksize (src));
1780       break;
1781     case PROP_NUM_BUFFERS:
1782       g_value_set_int (value, src->num_buffers);
1783       break;
1784     case PROP_TYPEFIND:
1785       g_value_set_boolean (value, src->typefind);
1786       break;
1787     case PROP_DO_TIMESTAMP:
1788       g_value_set_boolean (value, gst_base_src_get_do_timestamp (src));
1789       break;
1790     default:
1791       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1792       break;
1793   }
1794 }
1795
1796 /* with STREAM_LOCK and LOCK */
1797 static GstClockReturn
1798 gst_base_src_wait (GstBaseSrc * basesrc, GstClock * clock, GstClockTime time)
1799 {
1800   GstClockReturn ret;
1801   GstClockID id;
1802
1803   id = gst_clock_new_single_shot_id (clock, time);
1804
1805   basesrc->clock_id = id;
1806   /* release the live lock while waiting */
1807   GST_LIVE_UNLOCK (basesrc);
1808
1809   ret = gst_clock_id_wait (id, NULL);
1810
1811   GST_LIVE_LOCK (basesrc);
1812   gst_clock_id_unref (id);
1813   basesrc->clock_id = NULL;
1814
1815   return ret;
1816 }
1817
1818 /* perform synchronisation on a buffer.
1819  * with STREAM_LOCK.
1820  */
1821 static GstClockReturn
1822 gst_base_src_do_sync (GstBaseSrc * basesrc, GstBuffer * buffer)
1823 {
1824   GstClockReturn result;
1825   GstClockTime start, end;
1826   GstBaseSrcClass *bclass;
1827   GstClockTime base_time;
1828   GstClock *clock;
1829   GstClockTime now = GST_CLOCK_TIME_NONE, timestamp;
1830   gboolean do_timestamp, first, pseudo_live;
1831
1832   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
1833
1834   start = end = -1;
1835   if (bclass->get_times)
1836     bclass->get_times (basesrc, buffer, &start, &end);
1837
1838   /* get buffer timestamp */
1839   timestamp = GST_BUFFER_TIMESTAMP (buffer);
1840
1841   /* grab the lock to prepare for clocking and calculate the startup
1842    * latency. */
1843   GST_OBJECT_LOCK (basesrc);
1844
1845   /* if we are asked to sync against the clock we are a pseudo live element */
1846   pseudo_live = (start != -1 && basesrc->is_live);
1847   /* check for the first buffer */
1848   first = (basesrc->priv->latency == -1);
1849
1850   if (timestamp != -1 && pseudo_live) {
1851     GstClockTime latency;
1852
1853     /* we have a timestamp and a sync time, latency is the diff */
1854     if (timestamp <= start)
1855       latency = start - timestamp;
1856     else
1857       latency = 0;
1858
1859     if (first) {
1860       GST_DEBUG_OBJECT (basesrc, "pseudo_live with latency %" GST_TIME_FORMAT,
1861           GST_TIME_ARGS (latency));
1862       /* first time we calculate latency, just configure */
1863       basesrc->priv->latency = latency;
1864     } else {
1865       if (basesrc->priv->latency != latency) {
1866         /* we have a new latency, FIXME post latency message */
1867         basesrc->priv->latency = latency;
1868         GST_DEBUG_OBJECT (basesrc, "latency changed to %" GST_TIME_FORMAT,
1869             GST_TIME_ARGS (latency));
1870       }
1871     }
1872   } else if (first) {
1873     GST_DEBUG_OBJECT (basesrc, "no latency needed, live %d, sync %d",
1874         basesrc->is_live, start != -1);
1875     basesrc->priv->latency = 0;
1876   }
1877
1878   /* get clock, if no clock, we can't sync or do timestamps */
1879   if ((clock = GST_ELEMENT_CLOCK (basesrc)) == NULL)
1880     goto no_clock;
1881
1882   base_time = GST_ELEMENT_CAST (basesrc)->base_time;
1883
1884   do_timestamp = basesrc->priv->do_timestamp;
1885
1886   /* first buffer, calculate the timestamp offset */
1887   if (first) {
1888     GstClockTime running_time;
1889
1890     now = gst_clock_get_time (clock);
1891     running_time = now - base_time;
1892
1893     GST_LOG_OBJECT (basesrc,
1894         "startup timestamp: %" GST_TIME_FORMAT ", running_time %"
1895         GST_TIME_FORMAT, GST_TIME_ARGS (timestamp),
1896         GST_TIME_ARGS (running_time));
1897
1898     if (pseudo_live && timestamp != -1) {
1899       /* live source and we need to sync, add startup latency to all timestamps
1900        * to get the real running_time. Live sources should always timestamp
1901        * according to the current running time. */
1902       basesrc->priv->ts_offset = GST_CLOCK_DIFF (timestamp, running_time);
1903
1904       GST_LOG_OBJECT (basesrc, "live with sync, ts_offset %" GST_TIME_FORMAT,
1905           GST_TIME_ARGS (basesrc->priv->ts_offset));
1906     } else {
1907       basesrc->priv->ts_offset = 0;
1908       GST_LOG_OBJECT (basesrc, "no timestamp offset needed");
1909     }
1910
1911     if (!GST_CLOCK_TIME_IS_VALID (timestamp)) {
1912       if (do_timestamp)
1913         timestamp = running_time;
1914       else
1915         timestamp = 0;
1916
1917       GST_BUFFER_TIMESTAMP (buffer) = timestamp;
1918
1919       GST_LOG_OBJECT (basesrc, "created timestamp: %" GST_TIME_FORMAT,
1920           GST_TIME_ARGS (timestamp));
1921     }
1922
1923     /* add the timestamp offset we need for sync */
1924     timestamp += basesrc->priv->ts_offset;
1925   } else {
1926     /* not the first buffer, the timestamp is the diff between the clock and
1927      * base_time */
1928     if (do_timestamp && !GST_CLOCK_TIME_IS_VALID (timestamp)) {
1929       now = gst_clock_get_time (clock);
1930
1931       GST_BUFFER_TIMESTAMP (buffer) = now - base_time;
1932
1933       GST_LOG_OBJECT (basesrc, "created timestamp: %" GST_TIME_FORMAT,
1934           GST_TIME_ARGS (now - base_time));
1935     }
1936   }
1937
1938   /* if we don't have a buffer timestamp, we don't sync */
1939   if (!GST_CLOCK_TIME_IS_VALID (start))
1940     goto no_sync;
1941
1942   if (basesrc->is_live && GST_CLOCK_TIME_IS_VALID (timestamp)) {
1943     /* for pseudo live sources, add our ts_offset to the timestamp */
1944     GST_BUFFER_TIMESTAMP (buffer) += basesrc->priv->ts_offset;
1945     start += basesrc->priv->ts_offset;
1946   }
1947
1948   GST_LOG_OBJECT (basesrc,
1949       "waiting for clock, base time %" GST_TIME_FORMAT
1950       ", stream_start %" GST_TIME_FORMAT,
1951       GST_TIME_ARGS (base_time), GST_TIME_ARGS (start));
1952   GST_OBJECT_UNLOCK (basesrc);
1953
1954   result = gst_base_src_wait (basesrc, clock, start + base_time);
1955
1956   GST_LOG_OBJECT (basesrc, "clock entry done: %d", result);
1957
1958   return result;
1959
1960   /* special cases */
1961 no_clock:
1962   {
1963     GST_DEBUG_OBJECT (basesrc, "we have no clock");
1964     GST_OBJECT_UNLOCK (basesrc);
1965     return GST_CLOCK_OK;
1966   }
1967 no_sync:
1968   {
1969     GST_DEBUG_OBJECT (basesrc, "no sync needed");
1970     GST_OBJECT_UNLOCK (basesrc);
1971     return GST_CLOCK_OK;
1972   }
1973 }
1974
1975 /* Called with STREAM_LOCK and LIVE_LOCK */
1976 static gboolean
1977 gst_base_src_update_length (GstBaseSrc * src, guint64 offset, guint * length)
1978 {
1979   guint64 size, maxsize;
1980   GstBaseSrcClass *bclass;
1981   GstFormat format;
1982   gint64 stop;
1983
1984   bclass = GST_BASE_SRC_GET_CLASS (src);
1985
1986   format = src->segment.format;
1987   stop = src->segment.stop;
1988   /* get total file size */
1989   size = (guint64) src->segment.duration;
1990
1991   /* only operate if we are working with bytes */
1992   if (format != GST_FORMAT_BYTES)
1993     return TRUE;
1994
1995   /* the max amount of bytes to read is the total size or
1996    * up to the segment.stop if present. */
1997   if (stop != -1)
1998     maxsize = MIN (size, stop);
1999   else
2000     maxsize = size;
2001
2002   GST_DEBUG_OBJECT (src,
2003       "reading offset %" G_GUINT64_FORMAT ", length %u, size %" G_GINT64_FORMAT
2004       ", segment.stop %" G_GINT64_FORMAT ", maxsize %" G_GINT64_FORMAT, offset,
2005       *length, size, stop, maxsize);
2006
2007   /* check size if we have one */
2008   if (maxsize != -1) {
2009     /* if we run past the end, check if the file became bigger and
2010      * retry. */
2011     if (G_UNLIKELY (offset + *length >= maxsize)) {
2012       /* see if length of the file changed */
2013       if (bclass->get_size)
2014         if (!bclass->get_size (src, &size))
2015           size = -1;
2016
2017       /* make sure we don't exceed the configured segment stop
2018        * if it was set */
2019       if (stop != -1)
2020         maxsize = MIN (size, stop);
2021       else
2022         maxsize = size;
2023
2024       /* if we are at or past the end, EOS */
2025       if (G_UNLIKELY (offset >= maxsize))
2026         goto unexpected_length;
2027
2028       /* else we can clip to the end */
2029       if (G_UNLIKELY (offset + *length >= maxsize))
2030         *length = maxsize - offset;
2031
2032     }
2033   }
2034
2035   /* keep track of current position and update duration.
2036    * segment is in bytes, we checked that above. */
2037   GST_OBJECT_LOCK (src);
2038   src->segment.duration = size;
2039   src->segment.position = offset;
2040   GST_OBJECT_UNLOCK (src);
2041
2042   return TRUE;
2043
2044   /* ERRORS */
2045 unexpected_length:
2046   {
2047     return FALSE;
2048   }
2049 }
2050
2051 /* must be called with LIVE_LOCK */
2052 static GstFlowReturn
2053 gst_base_src_get_range (GstBaseSrc * src, guint64 offset, guint length,
2054     GstBuffer ** buf)
2055 {
2056   GstFlowReturn ret;
2057   GstBaseSrcClass *bclass;
2058   GstClockReturn status;
2059
2060   bclass = GST_BASE_SRC_GET_CLASS (src);
2061
2062 again:
2063   if (src->is_live) {
2064     if (G_UNLIKELY (!src->live_running)) {
2065       ret = gst_base_src_wait_playing (src);
2066       if (ret != GST_FLOW_OK)
2067         goto stopped;
2068     }
2069   }
2070
2071   if (G_UNLIKELY (!GST_OBJECT_FLAG_IS_SET (src, GST_BASE_SRC_STARTED)))
2072     goto not_started;
2073
2074   if (G_UNLIKELY (!bclass->create))
2075     goto no_function;
2076
2077   if (G_UNLIKELY (!gst_base_src_update_length (src, offset, &length)))
2078     goto unexpected_length;
2079
2080   /* normally we don't count buffers */
2081   if (G_UNLIKELY (src->num_buffers_left >= 0)) {
2082     if (src->num_buffers_left == 0)
2083       goto reached_num_buffers;
2084     else
2085       src->num_buffers_left--;
2086   }
2087
2088   /* don't enter the create function if a pending EOS event was set. For the
2089    * logic of the pending_eos, check the event function of this class. */
2090   if (G_UNLIKELY (g_atomic_int_get (&src->priv->pending_eos)))
2091     goto eos;
2092
2093   GST_DEBUG_OBJECT (src,
2094       "calling create offset %" G_GUINT64_FORMAT " length %u, time %"
2095       G_GINT64_FORMAT, offset, length, src->segment.time);
2096
2097   ret = bclass->create (src, offset, length, buf);
2098
2099   /* The create function could be unlocked because we have a pending EOS. It's
2100    * possible that we have a valid buffer from create that we need to
2101    * discard when the create function returned _OK. */
2102   if (G_UNLIKELY (g_atomic_int_get (&src->priv->pending_eos))) {
2103     if (ret == GST_FLOW_OK) {
2104       gst_buffer_unref (*buf);
2105       *buf = NULL;
2106     }
2107     goto eos;
2108   }
2109
2110   if (G_UNLIKELY (ret != GST_FLOW_OK))
2111     goto not_ok;
2112
2113   /* no timestamp set and we are at offset 0, we can timestamp with 0 */
2114   if (offset == 0 && src->segment.time == 0
2115       && GST_BUFFER_TIMESTAMP (*buf) == -1 && !src->is_live) {
2116     *buf = gst_buffer_make_writable (*buf);
2117     GST_BUFFER_TIMESTAMP (*buf) = 0;
2118   }
2119
2120   /* now sync before pushing the buffer */
2121   status = gst_base_src_do_sync (src, *buf);
2122
2123   /* waiting for the clock could have made us flushing */
2124   if (G_UNLIKELY (src->priv->flushing))
2125     goto flushing;
2126
2127   switch (status) {
2128     case GST_CLOCK_EARLY:
2129       /* the buffer is too late. We currently don't drop the buffer. */
2130       GST_DEBUG_OBJECT (src, "buffer too late!, returning anyway");
2131       break;
2132     case GST_CLOCK_OK:
2133       /* buffer synchronised properly */
2134       GST_DEBUG_OBJECT (src, "buffer ok");
2135       break;
2136     case GST_CLOCK_UNSCHEDULED:
2137       /* this case is triggered when we were waiting for the clock and
2138        * it got unlocked because we did a state change. In any case, get rid of
2139        * the buffer. */
2140       gst_buffer_unref (*buf);
2141       *buf = NULL;
2142       if (!src->live_running) {
2143         /* We return WRONG_STATE when we are not running to stop the dataflow also
2144          * get rid of the produced buffer. */
2145         GST_DEBUG_OBJECT (src,
2146             "clock was unscheduled (%d), returning WRONG_STATE", status);
2147         ret = GST_FLOW_WRONG_STATE;
2148       } else {
2149         /* If we are running when this happens, we quickly switched between
2150          * pause and playing. We try to produce a new buffer */
2151         GST_DEBUG_OBJECT (src,
2152             "clock was unscheduled (%d), but we are running", status);
2153         goto again;
2154       }
2155       break;
2156     default:
2157       /* all other result values are unexpected and errors */
2158       GST_ELEMENT_ERROR (src, CORE, CLOCK,
2159           (_("Internal clock error.")),
2160           ("clock returned unexpected return value %d", status));
2161       gst_buffer_unref (*buf);
2162       *buf = NULL;
2163       ret = GST_FLOW_ERROR;
2164       break;
2165   }
2166   return ret;
2167
2168   /* ERROR */
2169 stopped:
2170   {
2171     GST_DEBUG_OBJECT (src, "wait_playing returned %d (%s)", ret,
2172         gst_flow_get_name (ret));
2173     return ret;
2174   }
2175 not_ok:
2176   {
2177     GST_DEBUG_OBJECT (src, "create returned %d (%s)", ret,
2178         gst_flow_get_name (ret));
2179     return ret;
2180   }
2181 not_started:
2182   {
2183     GST_DEBUG_OBJECT (src, "getrange but not started");
2184     return GST_FLOW_WRONG_STATE;
2185   }
2186 no_function:
2187   {
2188     GST_DEBUG_OBJECT (src, "no create function");
2189     return GST_FLOW_ERROR;
2190   }
2191 unexpected_length:
2192   {
2193     GST_DEBUG_OBJECT (src, "unexpected length %u (offset=%" G_GUINT64_FORMAT
2194         ", size=%" G_GINT64_FORMAT ")", length, offset, src->segment.duration);
2195     return GST_FLOW_UNEXPECTED;
2196   }
2197 reached_num_buffers:
2198   {
2199     GST_DEBUG_OBJECT (src, "sent all buffers");
2200     return GST_FLOW_UNEXPECTED;
2201   }
2202 flushing:
2203   {
2204     GST_DEBUG_OBJECT (src, "we are flushing");
2205     gst_buffer_unref (*buf);
2206     *buf = NULL;
2207     return GST_FLOW_WRONG_STATE;
2208   }
2209 eos:
2210   {
2211     GST_DEBUG_OBJECT (src, "we are EOS");
2212     return GST_FLOW_UNEXPECTED;
2213   }
2214 }
2215
2216 static GstFlowReturn
2217 gst_base_src_pad_get_range (GstPad * pad, guint64 offset, guint length,
2218     GstBuffer ** buf)
2219 {
2220   GstBaseSrc *src;
2221   GstFlowReturn res;
2222
2223   src = GST_BASE_SRC_CAST (gst_object_ref (GST_OBJECT_PARENT (pad)));
2224
2225   GST_LIVE_LOCK (src);
2226   if (G_UNLIKELY (src->priv->flushing))
2227     goto flushing;
2228
2229   res = gst_base_src_get_range (src, offset, length, buf);
2230
2231 done:
2232   GST_LIVE_UNLOCK (src);
2233
2234   gst_object_unref (src);
2235
2236   return res;
2237
2238   /* ERRORS */
2239 flushing:
2240   {
2241     GST_DEBUG_OBJECT (src, "we are flushing");
2242     res = GST_FLOW_WRONG_STATE;
2243     goto done;
2244   }
2245 }
2246
2247 static gboolean
2248 gst_base_src_default_check_get_range (GstBaseSrc * src)
2249 {
2250   gboolean res;
2251
2252   if (!GST_OBJECT_FLAG_IS_SET (src, GST_BASE_SRC_STARTED)) {
2253     GST_LOG_OBJECT (src, "doing start/stop to check get_range support");
2254     if (G_LIKELY (gst_base_src_start (src)))
2255       gst_base_src_stop (src);
2256   }
2257
2258   /* we can operate in getrange mode if the native format is bytes
2259    * and we are seekable, this condition is set in the random_access
2260    * flag and is set in the _start() method. */
2261   res = src->random_access;
2262
2263   return res;
2264 }
2265
2266 static gboolean
2267 gst_base_src_check_get_range (GstBaseSrc * src)
2268 {
2269   GstBaseSrcClass *bclass;
2270   gboolean res;
2271
2272   bclass = GST_BASE_SRC_GET_CLASS (src);
2273
2274   if (bclass->check_get_range == NULL)
2275     goto no_function;
2276
2277   res = bclass->check_get_range (src);
2278   GST_LOG_OBJECT (src, "%s() returned %d",
2279       GST_DEBUG_FUNCPTR_NAME (bclass->check_get_range), (gint) res);
2280
2281   return res;
2282
2283   /* ERRORS */
2284 no_function:
2285   {
2286     GST_WARNING_OBJECT (src, "no check_get_range function set");
2287     return FALSE;
2288   }
2289 }
2290
2291 static gboolean
2292 gst_base_src_pad_check_get_range (GstPad * pad)
2293 {
2294   GstBaseSrc *src;
2295   gboolean res;
2296
2297   src = GST_BASE_SRC (GST_OBJECT_PARENT (pad));
2298
2299   res = gst_base_src_check_get_range (src);
2300
2301   return res;
2302 }
2303
2304 static void
2305 gst_base_src_loop (GstPad * pad)
2306 {
2307   GstBaseSrc *src;
2308   GstBuffer *buf = NULL;
2309   GstFlowReturn ret;
2310   gint64 position;
2311   gboolean eos;
2312   gulong blocksize;
2313   GList *pending_events = NULL, *tmp;
2314   gboolean reconfigure;
2315
2316   eos = FALSE;
2317
2318   src = GST_BASE_SRC (GST_OBJECT_PARENT (pad));
2319
2320   GST_OBJECT_LOCK (pad);
2321   reconfigure = GST_PAD_NEEDS_RECONFIGURE (pad);
2322   GST_OBJECT_FLAG_UNSET (pad, GST_PAD_NEED_RECONFIGURE);
2323   GST_OBJECT_UNLOCK (pad);
2324   /* check if we need to renegotiate */
2325   if (reconfigure) {
2326     if (!gst_base_src_negotiate (src))
2327       GST_DEBUG_OBJECT (src, "Failed to renegotiate");
2328   }
2329
2330   GST_LIVE_LOCK (src);
2331
2332   if (G_UNLIKELY (src->priv->flushing))
2333     goto flushing;
2334
2335   src->priv->last_sent_eos = FALSE;
2336
2337   blocksize = src->blocksize;
2338
2339   /* if we operate in bytes, we can calculate an offset */
2340   if (src->segment.format == GST_FORMAT_BYTES) {
2341     position = src->segment.position;
2342     /* for negative rates, start with subtracting the blocksize */
2343     if (src->segment.rate < 0.0) {
2344       /* we cannot go below segment.start */
2345       if (position > src->segment.start + blocksize)
2346         position -= blocksize;
2347       else {
2348         /* last block, remainder up to segment.start */
2349         blocksize = position - src->segment.start;
2350         position = src->segment.start;
2351       }
2352     }
2353   } else
2354     position = -1;
2355
2356   GST_LOG_OBJECT (src, "next_ts %" GST_TIME_FORMAT " size %lu",
2357       GST_TIME_ARGS (position), blocksize);
2358
2359   ret = gst_base_src_get_range (src, position, blocksize, &buf);
2360   if (G_UNLIKELY (ret != GST_FLOW_OK)) {
2361     GST_INFO_OBJECT (src, "pausing after gst_base_src_get_range() = %s",
2362         gst_flow_get_name (ret));
2363     GST_LIVE_UNLOCK (src);
2364     goto pause;
2365   }
2366   /* this should not happen */
2367   if (G_UNLIKELY (buf == NULL))
2368     goto null_buffer;
2369
2370   /* push events to close/start our segment before we push the buffer. */
2371   if (G_UNLIKELY (src->priv->segment_pending)) {
2372     gst_pad_push_event (pad, gst_event_new_segment (&src->segment));
2373     src->priv->segment_pending = FALSE;
2374   }
2375
2376   if (g_atomic_int_get (&src->priv->have_events)) {
2377     GST_OBJECT_LOCK (src);
2378     /* take the events */
2379     pending_events = src->priv->pending_events;
2380     src->priv->pending_events = NULL;
2381     g_atomic_int_set (&src->priv->have_events, FALSE);
2382     GST_OBJECT_UNLOCK (src);
2383   }
2384
2385   /* Push out pending events if any */
2386   if (G_UNLIKELY (pending_events != NULL)) {
2387     for (tmp = pending_events; tmp; tmp = g_list_next (tmp)) {
2388       GstEvent *ev = (GstEvent *) tmp->data;
2389       gst_pad_push_event (pad, ev);
2390     }
2391     g_list_free (pending_events);
2392   }
2393
2394   /* figure out the new position */
2395   switch (src->segment.format) {
2396     case GST_FORMAT_BYTES:
2397     {
2398       guint bufsize = gst_buffer_get_size (buf);
2399
2400       /* we subtracted above for negative rates */
2401       if (src->segment.rate >= 0.0)
2402         position += bufsize;
2403       break;
2404     }
2405     case GST_FORMAT_TIME:
2406     {
2407       GstClockTime start, duration;
2408
2409       start = GST_BUFFER_TIMESTAMP (buf);
2410       duration = GST_BUFFER_DURATION (buf);
2411
2412       if (GST_CLOCK_TIME_IS_VALID (start))
2413         position = start;
2414       else
2415         position = src->segment.position;
2416
2417       if (GST_CLOCK_TIME_IS_VALID (duration)) {
2418         if (src->segment.rate >= 0.0)
2419           position += duration;
2420         else if (position > duration)
2421           position -= duration;
2422         else
2423           position = 0;
2424       }
2425       break;
2426     }
2427     case GST_FORMAT_DEFAULT:
2428       if (src->segment.rate >= 0.0)
2429         position = GST_BUFFER_OFFSET_END (buf);
2430       else
2431         position = GST_BUFFER_OFFSET (buf);
2432       break;
2433     default:
2434       position = -1;
2435       break;
2436   }
2437   if (position != -1) {
2438     if (src->segment.rate >= 0.0) {
2439       /* positive rate, check if we reached the stop */
2440       if (src->segment.stop != -1) {
2441         if (position >= src->segment.stop) {
2442           eos = TRUE;
2443           position = src->segment.stop;
2444         }
2445       }
2446     } else {
2447       /* negative rate, check if we reached the start. start is always set to
2448        * something different from -1 */
2449       if (position <= src->segment.start) {
2450         eos = TRUE;
2451         position = src->segment.start;
2452       }
2453       /* when going reverse, all buffers are DISCONT */
2454       src->priv->discont = TRUE;
2455     }
2456     GST_OBJECT_LOCK (src);
2457     src->segment.position = position;
2458     GST_OBJECT_UNLOCK (src);
2459   }
2460
2461   if (G_UNLIKELY (src->priv->discont)) {
2462     buf = gst_buffer_make_writable (buf);
2463     GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_DISCONT);
2464     src->priv->discont = FALSE;
2465   }
2466   GST_LIVE_UNLOCK (src);
2467
2468   ret = gst_pad_push (pad, buf);
2469   if (G_UNLIKELY (ret != GST_FLOW_OK)) {
2470     GST_INFO_OBJECT (src, "pausing after gst_pad_push() = %s",
2471         gst_flow_get_name (ret));
2472     goto pause;
2473   }
2474
2475   if (G_UNLIKELY (eos)) {
2476     GST_INFO_OBJECT (src, "pausing after end of segment");
2477     ret = GST_FLOW_UNEXPECTED;
2478     goto pause;
2479   }
2480
2481 done:
2482   return;
2483
2484   /* special cases */
2485 flushing:
2486   {
2487     GST_DEBUG_OBJECT (src, "we are flushing");
2488     GST_LIVE_UNLOCK (src);
2489     ret = GST_FLOW_WRONG_STATE;
2490     goto pause;
2491   }
2492 pause:
2493   {
2494     const gchar *reason = gst_flow_get_name (ret);
2495     GstEvent *event;
2496
2497     GST_DEBUG_OBJECT (src, "pausing task, reason %s", reason);
2498     src->running = FALSE;
2499     gst_pad_pause_task (pad);
2500     if (ret == GST_FLOW_UNEXPECTED) {
2501       gboolean flag_segment;
2502       GstFormat format;
2503       gint64 position;
2504
2505       /* perform EOS logic */
2506       flag_segment = (src->segment.flags & GST_SEEK_FLAG_SEGMENT) != 0;
2507       format = src->segment.format;
2508       position = src->segment.position;
2509
2510       if (flag_segment) {
2511         GstMessage *message;
2512
2513         message = gst_message_new_segment_done (GST_OBJECT_CAST (src),
2514             format, position);
2515         gst_message_set_seqnum (message, src->priv->seqnum);
2516         gst_element_post_message (GST_ELEMENT_CAST (src), message);
2517       } else {
2518         event = gst_event_new_eos ();
2519         gst_event_set_seqnum (event, src->priv->seqnum);
2520         gst_pad_push_event (pad, event);
2521         src->priv->last_sent_eos = TRUE;
2522       }
2523     } else if (ret == GST_FLOW_NOT_LINKED || ret <= GST_FLOW_UNEXPECTED) {
2524       event = gst_event_new_eos ();
2525       gst_event_set_seqnum (event, src->priv->seqnum);
2526       /* for fatal errors we post an error message, post the error
2527        * first so the app knows about the error first.
2528        * Also don't do this for WRONG_STATE because it happens
2529        * due to flushing and posting an error message because of
2530        * that is the wrong thing to do, e.g. when we're doing
2531        * a flushing seek. */
2532       GST_ELEMENT_ERROR (src, STREAM, FAILED,
2533           (_("Internal data flow error.")),
2534           ("streaming task paused, reason %s (%d)", reason, ret));
2535       gst_pad_push_event (pad, event);
2536       src->priv->last_sent_eos = TRUE;
2537     }
2538     goto done;
2539   }
2540 null_buffer:
2541   {
2542     GST_ELEMENT_ERROR (src, STREAM, FAILED,
2543         (_("Internal data flow error.")), ("element returned NULL buffer"));
2544     GST_LIVE_UNLOCK (src);
2545     goto done;
2546   }
2547 }
2548
2549 /* default negotiation code.
2550  *
2551  * Take intersection between src and sink pads, take first
2552  * caps and fixate.
2553  */
2554 static gboolean
2555 gst_base_src_default_negotiate (GstBaseSrc * basesrc)
2556 {
2557   GstCaps *thiscaps;
2558   GstCaps *caps = NULL;
2559   GstCaps *peercaps = NULL;
2560   gboolean result = FALSE;
2561
2562   /* first see what is possible on our source pad */
2563   thiscaps = gst_pad_get_caps (GST_BASE_SRC_PAD (basesrc), NULL);
2564   GST_DEBUG_OBJECT (basesrc, "caps of src: %" GST_PTR_FORMAT, thiscaps);
2565   /* nothing or anything is allowed, we're done */
2566   if (thiscaps == NULL || gst_caps_is_any (thiscaps))
2567     goto no_nego_needed;
2568
2569   if (G_UNLIKELY (gst_caps_is_empty (thiscaps)))
2570     goto no_caps;
2571
2572   /* get the peer caps */
2573   peercaps = gst_pad_peer_get_caps (GST_BASE_SRC_PAD (basesrc), thiscaps);
2574   GST_DEBUG_OBJECT (basesrc, "caps of peer: %" GST_PTR_FORMAT, peercaps);
2575   if (peercaps) {
2576     /* The result is already a subset of our caps */
2577     caps = peercaps;
2578     gst_caps_unref (thiscaps);
2579   } else {
2580     /* no peer, work with our own caps then */
2581     caps = thiscaps;
2582   }
2583   if (caps && !gst_caps_is_empty (caps)) {
2584     caps = gst_caps_make_writable (caps);
2585
2586     /* take first (and best, since they are sorted) possibility */
2587     gst_caps_truncate (caps);
2588
2589     /* now fixate */
2590     GST_DEBUG_OBJECT (basesrc, "have caps: %" GST_PTR_FORMAT, caps);
2591     if (gst_caps_is_any (caps)) {
2592       /* hmm, still anything, so element can do anything and
2593        * nego is not needed */
2594       result = TRUE;
2595     } else {
2596       gst_pad_fixate_caps (GST_BASE_SRC_PAD (basesrc), caps);
2597       GST_DEBUG_OBJECT (basesrc, "fixated to: %" GST_PTR_FORMAT, caps);
2598       if (gst_caps_is_fixed (caps)) {
2599         /* yay, fixed caps, use those then, it's possible that the subclass does
2600          * not accept this caps after all and we have to fail. */
2601         result = gst_pad_set_caps (GST_BASE_SRC_PAD (basesrc), caps);
2602       }
2603     }
2604     gst_caps_unref (caps);
2605   } else {
2606     GST_DEBUG_OBJECT (basesrc, "no common caps");
2607   }
2608   return result;
2609
2610 no_nego_needed:
2611   {
2612     GST_DEBUG_OBJECT (basesrc, "no negotiation needed");
2613     if (thiscaps)
2614       gst_caps_unref (thiscaps);
2615     return TRUE;
2616   }
2617 no_caps:
2618   {
2619     GST_ELEMENT_ERROR (basesrc, STREAM, FORMAT,
2620         ("No supported formats found"),
2621         ("This element did not produce valid caps"));
2622     if (thiscaps)
2623       gst_caps_unref (thiscaps);
2624     return TRUE;
2625   }
2626 }
2627
2628 static gboolean
2629 gst_base_src_negotiate (GstBaseSrc * basesrc)
2630 {
2631   GstBaseSrcClass *bclass;
2632   gboolean result = TRUE;
2633
2634   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2635
2636   if (bclass->negotiate)
2637     result = bclass->negotiate (basesrc);
2638
2639   return result;
2640 }
2641
2642 static gboolean
2643 gst_base_src_start (GstBaseSrc * basesrc)
2644 {
2645   GstBaseSrcClass *bclass;
2646   gboolean result;
2647   guint64 size;
2648   gboolean seekable;
2649   GstFormat format;
2650
2651   if (GST_OBJECT_FLAG_IS_SET (basesrc, GST_BASE_SRC_STARTED))
2652     return TRUE;
2653
2654   GST_DEBUG_OBJECT (basesrc, "starting source");
2655
2656   basesrc->num_buffers_left = basesrc->num_buffers;
2657
2658   GST_OBJECT_LOCK (basesrc);
2659   gst_segment_init (&basesrc->segment, basesrc->segment.format);
2660   GST_OBJECT_UNLOCK (basesrc);
2661
2662   basesrc->running = FALSE;
2663   basesrc->priv->segment_pending = FALSE;
2664
2665   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2666   if (bclass->start)
2667     result = bclass->start (basesrc);
2668   else
2669     result = TRUE;
2670
2671   if (!result)
2672     goto could_not_start;
2673
2674   GST_OBJECT_FLAG_SET (basesrc, GST_BASE_SRC_STARTED);
2675
2676   format = basesrc->segment.format;
2677
2678   /* figure out the size */
2679   if (format == GST_FORMAT_BYTES) {
2680     if (bclass->get_size) {
2681       if (!(result = bclass->get_size (basesrc, &size)))
2682         size = -1;
2683     } else {
2684       result = FALSE;
2685       size = -1;
2686     }
2687     GST_DEBUG_OBJECT (basesrc, "setting size %" G_GUINT64_FORMAT, size);
2688     /* only update the size when operating in bytes, subclass is supposed
2689      * to set duration in the start method for other formats */
2690     GST_OBJECT_LOCK (basesrc);
2691     basesrc->segment.duration = size;
2692     GST_OBJECT_UNLOCK (basesrc);
2693   } else {
2694     size = -1;
2695   }
2696
2697   GST_DEBUG_OBJECT (basesrc,
2698       "format: %s, have size: %d, size: %" G_GUINT64_FORMAT ", duration: %"
2699       G_GINT64_FORMAT, gst_format_get_name (format), result, size,
2700       basesrc->segment.duration);
2701
2702   seekable = gst_base_src_seekable (basesrc);
2703   GST_DEBUG_OBJECT (basesrc, "is seekable: %d", seekable);
2704
2705   /* update for random access flag */
2706   basesrc->random_access = seekable && format == GST_FORMAT_BYTES;
2707
2708   GST_DEBUG_OBJECT (basesrc, "is random_access: %d", basesrc->random_access);
2709
2710   /* run typefind if we are random_access and the typefinding is enabled. */
2711   if (basesrc->random_access && basesrc->typefind && size != -1) {
2712     GstCaps *caps;
2713
2714     if (!(caps = gst_type_find_helper (basesrc->srcpad, size)))
2715       goto typefind_failed;
2716
2717     result = gst_pad_set_caps (basesrc->srcpad, caps);
2718     gst_caps_unref (caps);
2719   } else {
2720     /* use class or default negotiate function */
2721     if (!(result = gst_base_src_negotiate (basesrc)))
2722       goto could_not_negotiate;
2723   }
2724
2725   return result;
2726
2727   /* ERROR */
2728 could_not_start:
2729   {
2730     GST_DEBUG_OBJECT (basesrc, "could not start");
2731     /* subclass is supposed to post a message. We don't have to call _stop. */
2732     return FALSE;
2733   }
2734 could_not_negotiate:
2735   {
2736     GST_DEBUG_OBJECT (basesrc, "could not negotiate, stopping");
2737     GST_ELEMENT_ERROR (basesrc, STREAM, FORMAT,
2738         ("Could not negotiate format"), ("Check your filtered caps, if any"));
2739     /* we must call stop */
2740     gst_base_src_stop (basesrc);
2741     return FALSE;
2742   }
2743 typefind_failed:
2744   {
2745     GST_DEBUG_OBJECT (basesrc, "could not typefind, stopping");
2746     GST_ELEMENT_ERROR (basesrc, STREAM, TYPE_NOT_FOUND, (NULL), (NULL));
2747     /* we must call stop */
2748     gst_base_src_stop (basesrc);
2749     return FALSE;
2750   }
2751 }
2752
2753 static gboolean
2754 gst_base_src_stop (GstBaseSrc * basesrc)
2755 {
2756   GstBaseSrcClass *bclass;
2757   gboolean result = TRUE;
2758
2759   if (!GST_OBJECT_FLAG_IS_SET (basesrc, GST_BASE_SRC_STARTED))
2760     return TRUE;
2761
2762   GST_DEBUG_OBJECT (basesrc, "stopping source");
2763
2764   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2765   if (bclass->stop)
2766     result = bclass->stop (basesrc);
2767
2768   if (result)
2769     GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_STARTED);
2770
2771   return result;
2772 }
2773
2774 /* start or stop flushing dataprocessing
2775  */
2776 static gboolean
2777 gst_base_src_set_flushing (GstBaseSrc * basesrc,
2778     gboolean flushing, gboolean live_play, gboolean unlock, gboolean * playing)
2779 {
2780   GstBaseSrcClass *bclass;
2781
2782   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2783
2784   if (flushing && unlock) {
2785     /* unlock any subclasses, we need to do this before grabbing the
2786      * LIVE_LOCK since we hold this lock before going into ::create. We pass an
2787      * unlock to the params because of backwards compat (see seek handler)*/
2788     if (bclass->unlock)
2789       bclass->unlock (basesrc);
2790   }
2791
2792   /* the live lock is released when we are blocked, waiting for playing or
2793    * when we sync to the clock. */
2794   GST_LIVE_LOCK (basesrc);
2795   if (playing)
2796     *playing = basesrc->live_running;
2797   basesrc->priv->flushing = flushing;
2798   if (flushing) {
2799     /* if we are locked in the live lock, signal it to make it flush */
2800     basesrc->live_running = TRUE;
2801
2802     /* clear pending EOS if any */
2803     g_atomic_int_set (&basesrc->priv->pending_eos, FALSE);
2804
2805     /* step 1, now that we have the LIVE lock, clear our unlock request */
2806     if (bclass->unlock_stop)
2807       bclass->unlock_stop (basesrc);
2808
2809     /* step 2, unblock clock sync (if any) or any other blocking thing */
2810     if (basesrc->clock_id)
2811       gst_clock_id_unschedule (basesrc->clock_id);
2812   } else {
2813     /* signal the live source that it can start playing */
2814     basesrc->live_running = live_play;
2815
2816     /* When unlocking drop all delayed events */
2817     if (unlock) {
2818       GST_OBJECT_LOCK (basesrc);
2819       if (basesrc->priv->pending_events) {
2820         g_list_foreach (basesrc->priv->pending_events, (GFunc) gst_event_unref,
2821             NULL);
2822         g_list_free (basesrc->priv->pending_events);
2823         basesrc->priv->pending_events = NULL;
2824         g_atomic_int_set (&basesrc->priv->have_events, FALSE);
2825       }
2826       GST_OBJECT_UNLOCK (basesrc);
2827     }
2828   }
2829   GST_LIVE_SIGNAL (basesrc);
2830   GST_LIVE_UNLOCK (basesrc);
2831
2832   return TRUE;
2833 }
2834
2835 /* the purpose of this function is to make sure that a live source blocks in the
2836  * LIVE lock or leaves the LIVE lock and continues playing. */
2837 static gboolean
2838 gst_base_src_set_playing (GstBaseSrc * basesrc, gboolean live_play)
2839 {
2840   GstBaseSrcClass *bclass;
2841
2842   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2843
2844   /* unlock subclasses locked in ::create, we only do this when we stop playing. */
2845   if (!live_play) {
2846     GST_DEBUG_OBJECT (basesrc, "unlock");
2847     if (bclass->unlock)
2848       bclass->unlock (basesrc);
2849   }
2850
2851   /* we are now able to grab the LIVE lock, when we get it, we can be
2852    * waiting for PLAYING while blocked in the LIVE cond or we can be waiting
2853    * for the clock. */
2854   GST_LIVE_LOCK (basesrc);
2855   GST_DEBUG_OBJECT (basesrc, "unschedule clock");
2856
2857   /* unblock clock sync (if any) */
2858   if (basesrc->clock_id)
2859     gst_clock_id_unschedule (basesrc->clock_id);
2860
2861   /* configure what to do when we get to the LIVE lock. */
2862   GST_DEBUG_OBJECT (basesrc, "live running %d", live_play);
2863   basesrc->live_running = live_play;
2864
2865   if (live_play) {
2866     gboolean start;
2867
2868     /* clear our unlock request when going to PLAYING */
2869     GST_DEBUG_OBJECT (basesrc, "unlock stop");
2870     if (bclass->unlock_stop)
2871       bclass->unlock_stop (basesrc);
2872
2873     /* for live sources we restart the timestamp correction */
2874     basesrc->priv->latency = -1;
2875     /* have to restart the task in case it stopped because of the unlock when
2876      * we went to PAUSED. Only do this if we operating in push mode. */
2877     GST_OBJECT_LOCK (basesrc->srcpad);
2878     start = (GST_PAD_ACTIVATE_MODE (basesrc->srcpad) == GST_ACTIVATE_PUSH);
2879     GST_OBJECT_UNLOCK (basesrc->srcpad);
2880     if (start)
2881       gst_pad_start_task (basesrc->srcpad, (GstTaskFunction) gst_base_src_loop,
2882           basesrc->srcpad);
2883     GST_DEBUG_OBJECT (basesrc, "signal");
2884     GST_LIVE_SIGNAL (basesrc);
2885   }
2886   GST_LIVE_UNLOCK (basesrc);
2887
2888   return TRUE;
2889 }
2890
2891 static gboolean
2892 gst_base_src_activate_push (GstPad * pad, gboolean active)
2893 {
2894   GstBaseSrc *basesrc;
2895   GstEvent *event;
2896
2897   basesrc = GST_BASE_SRC (GST_OBJECT_PARENT (pad));
2898
2899   /* prepare subclass first */
2900   if (active) {
2901     GST_DEBUG_OBJECT (basesrc, "Activating in push mode");
2902
2903     if (G_UNLIKELY (!basesrc->can_activate_push))
2904       goto no_push_activation;
2905
2906     if (G_UNLIKELY (!gst_base_src_start (basesrc)))
2907       goto error_start;
2908
2909     basesrc->priv->last_sent_eos = FALSE;
2910     basesrc->priv->discont = TRUE;
2911     gst_base_src_set_flushing (basesrc, FALSE, FALSE, FALSE, NULL);
2912
2913     /* do initial seek, which will start the task */
2914     GST_OBJECT_LOCK (basesrc);
2915     event = basesrc->pending_seek;
2916     basesrc->pending_seek = NULL;
2917     GST_OBJECT_UNLOCK (basesrc);
2918
2919     /* no need to unlock anything, the task is certainly
2920      * not running here. The perform seek code will start the task when
2921      * finished. */
2922     if (G_UNLIKELY (!gst_base_src_perform_seek (basesrc, event, FALSE)))
2923       goto seek_failed;
2924
2925     if (event)
2926       gst_event_unref (event);
2927   } else {
2928     GST_DEBUG_OBJECT (basesrc, "Deactivating in push mode");
2929     /* flush all */
2930     gst_base_src_set_flushing (basesrc, TRUE, FALSE, TRUE, NULL);
2931     /* stop the task */
2932     gst_pad_stop_task (pad);
2933     /* now we can stop the source */
2934     if (G_UNLIKELY (!gst_base_src_stop (basesrc)))
2935       goto error_stop;
2936   }
2937   return TRUE;
2938
2939   /* ERRORS */
2940 no_push_activation:
2941   {
2942     GST_WARNING_OBJECT (basesrc, "Subclass disabled push-mode activation");
2943     return FALSE;
2944   }
2945 error_start:
2946   {
2947     GST_WARNING_OBJECT (basesrc, "Failed to start in push mode");
2948     return FALSE;
2949   }
2950 seek_failed:
2951   {
2952     GST_ERROR_OBJECT (basesrc, "Failed to perform initial seek");
2953     /* flush all */
2954     gst_base_src_set_flushing (basesrc, TRUE, FALSE, TRUE, NULL);
2955     /* stop the task */
2956     gst_pad_stop_task (pad);
2957     /* Stop the basesrc */
2958     gst_base_src_stop (basesrc);
2959     if (event)
2960       gst_event_unref (event);
2961     return FALSE;
2962   }
2963 error_stop:
2964   {
2965     GST_DEBUG_OBJECT (basesrc, "Failed to stop in push mode");
2966     return FALSE;
2967   }
2968 }
2969
2970 static gboolean
2971 gst_base_src_activate_pull (GstPad * pad, gboolean active)
2972 {
2973   GstBaseSrc *basesrc;
2974
2975   basesrc = GST_BASE_SRC (GST_OBJECT_PARENT (pad));
2976
2977   /* prepare subclass first */
2978   if (active) {
2979     GST_DEBUG_OBJECT (basesrc, "Activating in pull mode");
2980     if (G_UNLIKELY (!gst_base_src_start (basesrc)))
2981       goto error_start;
2982
2983     /* if not random_access, we cannot operate in pull mode for now */
2984     if (G_UNLIKELY (!gst_base_src_check_get_range (basesrc)))
2985       goto no_get_range;
2986
2987     /* stop flushing now but for live sources, still block in the LIVE lock when
2988      * we are not yet PLAYING */
2989     gst_base_src_set_flushing (basesrc, FALSE, FALSE, FALSE, NULL);
2990   } else {
2991     GST_DEBUG_OBJECT (basesrc, "Deactivating in pull mode");
2992     /* flush all, there is no task to stop */
2993     gst_base_src_set_flushing (basesrc, TRUE, FALSE, TRUE, NULL);
2994
2995     /* don't send EOS when going from PAUSED => READY when in pull mode */
2996     basesrc->priv->last_sent_eos = TRUE;
2997
2998     if (G_UNLIKELY (!gst_base_src_stop (basesrc)))
2999       goto error_stop;
3000   }
3001   return TRUE;
3002
3003   /* ERRORS */
3004 error_start:
3005   {
3006     GST_ERROR_OBJECT (basesrc, "Failed to start in pull mode");
3007     return FALSE;
3008   }
3009 no_get_range:
3010   {
3011     GST_ERROR_OBJECT (basesrc, "Cannot operate in pull mode, stopping");
3012     gst_base_src_stop (basesrc);
3013     return FALSE;
3014   }
3015 error_stop:
3016   {
3017     GST_ERROR_OBJECT (basesrc, "Failed to stop in pull mode");
3018     return FALSE;
3019   }
3020 }
3021
3022 static GstStateChangeReturn
3023 gst_base_src_change_state (GstElement * element, GstStateChange transition)
3024 {
3025   GstBaseSrc *basesrc;
3026   GstStateChangeReturn result;
3027   gboolean no_preroll = FALSE;
3028
3029   basesrc = GST_BASE_SRC (element);
3030
3031   switch (transition) {
3032     case GST_STATE_CHANGE_NULL_TO_READY:
3033       break;
3034     case GST_STATE_CHANGE_READY_TO_PAUSED:
3035       no_preroll = gst_base_src_is_live (basesrc);
3036       break;
3037     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
3038       GST_DEBUG_OBJECT (basesrc, "PAUSED->PLAYING");
3039       if (gst_base_src_is_live (basesrc)) {
3040         /* now we can start playback */
3041         gst_base_src_set_playing (basesrc, TRUE);
3042       }
3043       break;
3044     default:
3045       break;
3046   }
3047
3048   if ((result =
3049           GST_ELEMENT_CLASS (parent_class)->change_state (element,
3050               transition)) == GST_STATE_CHANGE_FAILURE)
3051     goto failure;
3052
3053   switch (transition) {
3054     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
3055       GST_DEBUG_OBJECT (basesrc, "PLAYING->PAUSED");
3056       if (gst_base_src_is_live (basesrc)) {
3057         /* make sure we block in the live lock in PAUSED */
3058         gst_base_src_set_playing (basesrc, FALSE);
3059         no_preroll = TRUE;
3060       }
3061       break;
3062     case GST_STATE_CHANGE_PAUSED_TO_READY:
3063     {
3064       GstEvent **event_p, *event;
3065
3066       /* we don't need to unblock anything here, the pad deactivation code
3067        * already did this */
3068
3069       /* FIXME, deprecate this behaviour, it is very dangerous.
3070        * the prefered way of sending EOS downstream is by sending
3071        * the EOS event to the element */
3072       if (!basesrc->priv->last_sent_eos) {
3073         GST_DEBUG_OBJECT (basesrc, "Sending EOS event");
3074         event = gst_event_new_eos ();
3075         gst_event_set_seqnum (event, basesrc->priv->seqnum);
3076         gst_pad_push_event (basesrc->srcpad, event);
3077         basesrc->priv->last_sent_eos = TRUE;
3078       }
3079       g_atomic_int_set (&basesrc->priv->pending_eos, FALSE);
3080       event_p = &basesrc->pending_seek;
3081       gst_event_replace (event_p, NULL);
3082       break;
3083     }
3084     case GST_STATE_CHANGE_READY_TO_NULL:
3085       break;
3086     default:
3087       break;
3088   }
3089
3090   if (no_preroll && result == GST_STATE_CHANGE_SUCCESS)
3091     result = GST_STATE_CHANGE_NO_PREROLL;
3092
3093   return result;
3094
3095   /* ERRORS */
3096 failure:
3097   {
3098     GST_DEBUG_OBJECT (basesrc, "parent failed state change");
3099     return result;
3100   }
3101 }