da724fa484b9955fed1b8ed8156bcfe7f641c045
[platform/upstream/gstreamer.git] / libs / gst / base / gstbasesrc.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *               2000,2005 Wim Taymans <wim@fluendo.com>
4  *
5  * gstbasesrc.c:
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20  * Boston, MA 02111-1307, USA.
21  */
22
23 /**
24  * SECTION:gstbasesrc
25  * @short_description: Base class for getrange based source elements
26  * @see_also: #GstPushSrc, #GstBaseTransform, #GstBaseSink
27  *
28  * This is a generice base class for source elements. The following
29  * types of sources are supported:
30  * <itemizedlist>
31  *   <listitem><para>random access sources like files</para></listitem>
32  *   <listitem><para>seekable sources</para></listitem>
33  *   <listitem><para>live sources</para></listitem>
34  * </itemizedlist>
35  *
36  * The source can be configured to operate in any #GstFormat with the
37  * gst_base_src_set_format() method. The currently set format determines
38  * the format of the internal #GstSegment and any #GST_EVENT_NEWSEGMENT
39  * events. The default format for #GstBaseSrc is #GST_FORMAT_BYTES.
40  *
41  * #GstBaseSrc always supports push mode scheduling. If the following
42  * conditions are met, it also supports pull mode scheduling:
43  * <itemizedlist>
44  *   <listitem><para>The format is set to #GST_FORMAT_BYTES (default).</para>
45  *   </listitem>
46  *   <listitem><para>#GstBaseSrcClass.is_seekable() returns %TRUE.</para>
47  *   </listitem>
48  * </itemizedlist>
49  *
50  * Since 0.10.9, any #GstBaseSrc can enable pull based scheduling at any time
51  * by overriding #GstBaseSrcClass.check_get_range() so that it returns %TRUE.
52  *
53  * If all the conditions are met for operating in pull mode, #GstBaseSrc is
54  * automatically seekable in push mode as well. The following conditions must
55  * be met to make the element seekable in push mode when the format is not
56  * #GST_FORMAT_BYTES:
57  * <itemizedlist>
58  *   <listitem><para>
59  *     #GstBaseSrcClass.is_seekable() returns %TRUE.
60  *   </para></listitem>
61  *   <listitem><para>
62  *     #GstBaseSrc:Class.query() can convert all supported seek formats to the
63  *     internal format as set with gst_base_src_set_format().
64  *   </para></listitem>
65  *   <listitem><para>
66  *     #GstBaseSrcClass.do_seek() is implemented, performs the seek and returns
67  *      %TRUE.
68  *   </para></listitem>
69  * </itemizedlist>
70  *
71  * When the element does not meet the requirements to operate in pull mode, the
72  * offset and length in the #GstBaseSrcClass.create() method should be ignored.
73  * It is recommended to subclass #GstPushSrc instead, in this situation. If the
74  * element can operate in pull mode but only with specific offsets and
75  * lengths, it is allowed to generate an error when the wrong values are passed
76  * to the #GstBaseSrcClass.create() function.
77  *
78  * #GstBaseSrc has support for live sources. Live sources are sources that when
79  * paused discard data, such as audio or video capture devices. A typical live
80  * source also produces data at a fixed rate and thus provides a clock to publish
81  * this rate.
82  * Use gst_base_src_set_live() to activate the live source mode.
83  *
84  * A live source does not produce data in the PAUSED state. This means that the
85  * #GstBaseSrcClass.create() method will not be called in PAUSED but only in
86  * PLAYING. To signal the pipeline that the element will not produce data, the
87  * return value from the READY to PAUSED state will be
88  * #GST_STATE_CHANGE_NO_PREROLL.
89  *
90  * A typical live source will timestamp the buffers it creates with the
91  * current running time of the pipeline. This is one reason why a live source
92  * can only produce data in the PLAYING state, when the clock is actually
93  * distributed and running.
94  *
95  * Live sources that synchronize and block on the clock (an audio source, for
96  * example) can since 0.10.12 use gst_base_src_wait_playing() when the
97  * #GstBaseSrcClass.create() function was interrupted by a state change to
98  * PAUSED.
99  *
100  * The #GstBaseSrcClass.get_times() method can be used to implement pseudo-live
101  * sources. It only makes sense to implement the #GstBaseSrcClass.get_times()
102  * function if the source is a live source. The #GstBaseSrcClass.get_times()
103  * function should return timestamps starting from 0, as if it were a non-live
104  * source. The base class will make sure that the timestamps are transformed
105  * into the current running_time. The base source will then wait for the
106  * calculated running_time before pushing out the buffer.
107  *
108  * For live sources, the base class will by default report a latency of 0.
109  * For pseudo live sources, the base class will by default measure the difference
110  * between the first buffer timestamp and the start time of get_times and will
111  * report this value as the latency.
112  * Subclasses should override the query function when this behaviour is not
113  * acceptable.
114  *
115  * There is only support in #GstBaseSrc for exactly one source pad, which
116  * should be named "src". A source implementation (subclass of #GstBaseSrc)
117  * should install a pad template in its class_init function, like so:
118  * |[
119  * static void
120  * my_element_class_init (GstMyElementClass *klass)
121  * {
122  *   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
123  *   // srctemplate should be a #GstStaticPadTemplate with direction
124  *   // #GST_PAD_SRC and name "src"
125  *   gst_element_class_add_pad_template (gstelement_class,
126  *       gst_static_pad_template_get (&amp;srctemplate));
127  *   // see #GstElementDetails
128  *   gst_element_class_set_details (gstelement_class, &amp;details);
129  * }
130  * ]|
131  *
132  * <refsect2>
133  * <title>Controlled shutdown of live sources in applications</title>
134  * <para>
135  * Applications that record from a live source may want to stop recording
136  * in a controlled way, so that the recording is stopped, but the data
137  * already in the pipeline is processed to the end (remember that many live
138  * sources would go on recording forever otherwise). For that to happen the
139  * application needs to make the source stop recording and send an EOS
140  * event down the pipeline. The application would then wait for an
141  * EOS message posted on the pipeline's bus to know when all data has
142  * been processed and the pipeline can safely be stopped.
143  *
144  * Since GStreamer 0.10.16 an application may send an EOS event to a source
145  * element to make it perform the EOS logic (send EOS event downstream or post a
146  * #GST_MESSAGE_SEGMENT_DONE on the bus). This can typically be done
147  * with the gst_element_send_event() function on the element or its parent bin.
148  *
149  * After the EOS has been sent to the element, the application should wait for
150  * an EOS message to be posted on the pipeline's bus. Once this EOS message is
151  * received, it may safely shut down the entire pipeline.
152  *
153  * The old behaviour for controlled shutdown introduced since GStreamer 0.10.3
154  * is still available but deprecated as it is dangerous and less flexible.
155  *
156  * Last reviewed on 2007-12-19 (0.10.16)
157  * </para>
158  * </refsect2>
159  */
160
161 #ifdef HAVE_CONFIG_H
162 #  include "config.h"
163 #endif
164
165 #include <stdlib.h>
166 #include <string.h>
167
168 #include "gstbasesrc.h"
169 #include "gsttypefindhelper.h"
170 #include <gst/gstmarshal.h>
171 #include <gst/gst-i18n-lib.h>
172
173 GST_DEBUG_CATEGORY_STATIC (gst_base_src_debug);
174 #define GST_CAT_DEFAULT gst_base_src_debug
175
176 #define GST_LIVE_GET_LOCK(elem)               (GST_BASE_SRC_CAST(elem)->live_lock)
177 #define GST_LIVE_LOCK(elem)                   g_mutex_lock(GST_LIVE_GET_LOCK(elem))
178 #define GST_LIVE_TRYLOCK(elem)                g_mutex_trylock(GST_LIVE_GET_LOCK(elem))
179 #define GST_LIVE_UNLOCK(elem)                 g_mutex_unlock(GST_LIVE_GET_LOCK(elem))
180 #define GST_LIVE_GET_COND(elem)               (GST_BASE_SRC_CAST(elem)->live_cond)
181 #define GST_LIVE_WAIT(elem)                   g_cond_wait (GST_LIVE_GET_COND (elem), GST_LIVE_GET_LOCK (elem))
182 #define GST_LIVE_TIMED_WAIT(elem, timeval)    g_cond_timed_wait (GST_LIVE_GET_COND (elem), GST_LIVE_GET_LOCK (elem),\
183                                                                                 timeval)
184 #define GST_LIVE_SIGNAL(elem)                 g_cond_signal (GST_LIVE_GET_COND (elem));
185 #define GST_LIVE_BROADCAST(elem)              g_cond_broadcast (GST_LIVE_GET_COND (elem));
186
187 /* BaseSrc signals and args */
188 enum
189 {
190   /* FILL ME */
191   LAST_SIGNAL
192 };
193
194 #define DEFAULT_BLOCKSIZE       4096
195 #define DEFAULT_NUM_BUFFERS     -1
196 #define DEFAULT_TYPEFIND        FALSE
197 #define DEFAULT_DO_TIMESTAMP    FALSE
198
199 enum
200 {
201   PROP_0,
202   PROP_BLOCKSIZE,
203   PROP_NUM_BUFFERS,
204   PROP_TYPEFIND,
205   PROP_DO_TIMESTAMP
206 };
207
208 #define GST_BASE_SRC_GET_PRIVATE(obj)  \
209    (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_BASE_SRC, GstBaseSrcPrivate))
210
211 struct _GstBaseSrcPrivate
212 {
213   gboolean last_sent_eos;       /* last thing we did was send an EOS (we set this
214                                  * to avoid the sending of two EOS in some cases) */
215   gboolean discont;
216   gboolean flushing;
217
218   /* two segments to be sent in the streaming thread with STREAM_LOCK */
219   GstEvent *close_segment;
220   GstEvent *start_segment;
221
222   /* if EOS is pending (atomic) */
223   gint pending_eos;
224
225   /* startup latency is the time it takes between going to PLAYING and producing
226    * the first BUFFER with running_time 0. This value is included in the latency
227    * reporting. */
228   GstClockTime latency;
229   /* timestamp offset, this is the offset add to the values of gst_times for
230    * pseudo live sources */
231   GstClockTimeDiff ts_offset;
232
233   gboolean do_timestamp;
234
235   /* stream sequence number */
236   guint32 seqnum;
237
238   /* pending tags to be pushed in the data stream */
239   GList *pending_tags;
240 };
241
242 static GstElementClass *parent_class = NULL;
243
244 static void gst_base_src_base_init (gpointer g_class);
245 static void gst_base_src_class_init (GstBaseSrcClass * klass);
246 static void gst_base_src_init (GstBaseSrc * src, gpointer g_class);
247 static void gst_base_src_finalize (GObject * object);
248
249
250 GType
251 gst_base_src_get_type (void)
252 {
253   static volatile gsize base_src_type = 0;
254
255   if (g_once_init_enter (&base_src_type)) {
256     GType _type;
257     static const GTypeInfo base_src_info = {
258       sizeof (GstBaseSrcClass),
259       (GBaseInitFunc) gst_base_src_base_init,
260       NULL,
261       (GClassInitFunc) gst_base_src_class_init,
262       NULL,
263       NULL,
264       sizeof (GstBaseSrc),
265       0,
266       (GInstanceInitFunc) gst_base_src_init,
267     };
268
269     _type = g_type_register_static (GST_TYPE_ELEMENT,
270         "GstBaseSrc", &base_src_info, G_TYPE_FLAG_ABSTRACT);
271     g_once_init_leave (&base_src_type, _type);
272   }
273   return base_src_type;
274 }
275
276 static GstCaps *gst_base_src_getcaps (GstPad * pad);
277 static gboolean gst_base_src_setcaps (GstPad * pad, GstCaps * caps);
278 static void gst_base_src_fixate (GstPad * pad, GstCaps * caps);
279
280 static gboolean gst_base_src_activate_push (GstPad * pad, gboolean active);
281 static gboolean gst_base_src_activate_pull (GstPad * pad, gboolean active);
282 static void gst_base_src_set_property (GObject * object, guint prop_id,
283     const GValue * value, GParamSpec * pspec);
284 static void gst_base_src_get_property (GObject * object, guint prop_id,
285     GValue * value, GParamSpec * pspec);
286 static gboolean gst_base_src_event_handler (GstPad * pad, GstEvent * event);
287 static gboolean gst_base_src_send_event (GstElement * elem, GstEvent * event);
288 static gboolean gst_base_src_default_event (GstBaseSrc * src, GstEvent * event);
289 static const GstQueryType *gst_base_src_get_query_types (GstElement * element);
290
291 static gboolean gst_base_src_query (GstPad * pad, GstQuery * query);
292
293 static gboolean gst_base_src_default_negotiate (GstBaseSrc * basesrc);
294 static gboolean gst_base_src_default_do_seek (GstBaseSrc * src,
295     GstSegment * segment);
296 static gboolean gst_base_src_default_query (GstBaseSrc * src, GstQuery * query);
297 static gboolean gst_base_src_default_prepare_seek_segment (GstBaseSrc * src,
298     GstEvent * event, GstSegment * segment);
299
300 static gboolean gst_base_src_set_flushing (GstBaseSrc * basesrc,
301     gboolean flushing, gboolean live_play, gboolean unlock, gboolean * playing);
302 static gboolean gst_base_src_start (GstBaseSrc * basesrc);
303 static gboolean gst_base_src_stop (GstBaseSrc * basesrc);
304
305 static GstStateChangeReturn gst_base_src_change_state (GstElement * element,
306     GstStateChange transition);
307
308 static void gst_base_src_loop (GstPad * pad);
309 static gboolean gst_base_src_pad_check_get_range (GstPad * pad);
310 static gboolean gst_base_src_default_check_get_range (GstBaseSrc * bsrc);
311 static GstFlowReturn gst_base_src_pad_get_range (GstPad * pad, guint64 offset,
312     guint length, GstBuffer ** buf);
313 static GstFlowReturn gst_base_src_get_range (GstBaseSrc * src, guint64 offset,
314     guint length, GstBuffer ** buf);
315 static gboolean gst_base_src_seekable (GstBaseSrc * src);
316
317 static void
318 gst_base_src_base_init (gpointer g_class)
319 {
320   GST_DEBUG_CATEGORY_INIT (gst_base_src_debug, "basesrc", 0, "basesrc element");
321 }
322
323 static void
324 gst_base_src_class_init (GstBaseSrcClass * klass)
325 {
326   GObjectClass *gobject_class;
327   GstElementClass *gstelement_class;
328
329   gobject_class = G_OBJECT_CLASS (klass);
330   gstelement_class = GST_ELEMENT_CLASS (klass);
331
332   g_type_class_add_private (klass, sizeof (GstBaseSrcPrivate));
333
334   parent_class = g_type_class_peek_parent (klass);
335
336   gobject_class->finalize = gst_base_src_finalize;
337   gobject_class->set_property = gst_base_src_set_property;
338   gobject_class->get_property = gst_base_src_get_property;
339
340   g_object_class_install_property (gobject_class, PROP_BLOCKSIZE,
341       g_param_spec_ulong ("blocksize", "Block size",
342           "Size in bytes to read per buffer (-1 = default)", 0, G_MAXULONG,
343           DEFAULT_BLOCKSIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
344   g_object_class_install_property (gobject_class, PROP_NUM_BUFFERS,
345       g_param_spec_int ("num-buffers", "num-buffers",
346           "Number of buffers to output before sending EOS (-1 = unlimited)",
347           -1, G_MAXINT, DEFAULT_NUM_BUFFERS, G_PARAM_READWRITE |
348           G_PARAM_STATIC_STRINGS));
349   g_object_class_install_property (gobject_class, PROP_TYPEFIND,
350       g_param_spec_boolean ("typefind", "Typefind",
351           "Run typefind before negotiating", DEFAULT_TYPEFIND,
352           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
353   g_object_class_install_property (gobject_class, PROP_DO_TIMESTAMP,
354       g_param_spec_boolean ("do-timestamp", "Do timestamp",
355           "Apply current stream time to buffers", DEFAULT_DO_TIMESTAMP,
356           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
357
358   gstelement_class->change_state =
359       GST_DEBUG_FUNCPTR (gst_base_src_change_state);
360   gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_base_src_send_event);
361   gstelement_class->get_query_types =
362       GST_DEBUG_FUNCPTR (gst_base_src_get_query_types);
363
364   klass->negotiate = GST_DEBUG_FUNCPTR (gst_base_src_default_negotiate);
365   klass->event = GST_DEBUG_FUNCPTR (gst_base_src_default_event);
366   klass->do_seek = GST_DEBUG_FUNCPTR (gst_base_src_default_do_seek);
367   klass->query = GST_DEBUG_FUNCPTR (gst_base_src_default_query);
368   klass->check_get_range =
369       GST_DEBUG_FUNCPTR (gst_base_src_default_check_get_range);
370   klass->prepare_seek_segment =
371       GST_DEBUG_FUNCPTR (gst_base_src_default_prepare_seek_segment);
372
373   /* Registering debug symbols for function pointers */
374   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_activate_push);
375   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_activate_pull);
376   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_event_handler);
377   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_query);
378   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_pad_get_range);
379   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_pad_check_get_range);
380   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_getcaps);
381   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_setcaps);
382   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_fixate);
383 }
384
385 static void
386 gst_base_src_init (GstBaseSrc * basesrc, gpointer g_class)
387 {
388   GstPad *pad;
389   GstPadTemplate *pad_template;
390
391   basesrc->priv = GST_BASE_SRC_GET_PRIVATE (basesrc);
392
393   basesrc->is_live = FALSE;
394   basesrc->live_lock = g_mutex_new ();
395   basesrc->live_cond = g_cond_new ();
396   basesrc->num_buffers = DEFAULT_NUM_BUFFERS;
397   basesrc->num_buffers_left = -1;
398
399   basesrc->can_activate_push = TRUE;
400   basesrc->pad_mode = GST_ACTIVATE_NONE;
401
402   pad_template =
403       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (g_class), "src");
404   g_return_if_fail (pad_template != NULL);
405
406   GST_DEBUG_OBJECT (basesrc, "creating src pad");
407   pad = gst_pad_new_from_template (pad_template, "src");
408
409   GST_DEBUG_OBJECT (basesrc, "setting functions on src pad");
410   gst_pad_set_activatepush_function (pad, gst_base_src_activate_push);
411   gst_pad_set_activatepull_function (pad, gst_base_src_activate_pull);
412   gst_pad_set_event_function (pad, gst_base_src_event_handler);
413   gst_pad_set_query_function (pad, gst_base_src_query);
414   gst_pad_set_checkgetrange_function (pad, gst_base_src_pad_check_get_range);
415   gst_pad_set_getrange_function (pad, gst_base_src_pad_get_range);
416   gst_pad_set_getcaps_function (pad, gst_base_src_getcaps);
417   gst_pad_set_setcaps_function (pad, gst_base_src_setcaps);
418   gst_pad_set_fixatecaps_function (pad, gst_base_src_fixate);
419
420   /* hold pointer to pad */
421   basesrc->srcpad = pad;
422   GST_DEBUG_OBJECT (basesrc, "adding src pad");
423   gst_element_add_pad (GST_ELEMENT (basesrc), pad);
424
425   basesrc->blocksize = DEFAULT_BLOCKSIZE;
426   basesrc->clock_id = NULL;
427   /* we operate in BYTES by default */
428   gst_base_src_set_format (basesrc, GST_FORMAT_BYTES);
429   basesrc->data.ABI.typefind = DEFAULT_TYPEFIND;
430   basesrc->priv->do_timestamp = DEFAULT_DO_TIMESTAMP;
431
432   GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_STARTED);
433
434   GST_DEBUG_OBJECT (basesrc, "init done");
435 }
436
437 static void
438 gst_base_src_finalize (GObject * object)
439 {
440   GstBaseSrc *basesrc;
441   GstEvent **event_p;
442
443   basesrc = GST_BASE_SRC (object);
444
445   g_mutex_free (basesrc->live_lock);
446   g_cond_free (basesrc->live_cond);
447
448   event_p = &basesrc->data.ABI.pending_seek;
449   gst_event_replace (event_p, NULL);
450
451   if (basesrc->priv->pending_tags) {
452     g_list_foreach (basesrc->priv->pending_tags, (GFunc) gst_event_unref, NULL);
453     g_list_free (basesrc->priv->pending_tags);
454   }
455
456   G_OBJECT_CLASS (parent_class)->finalize (object);
457 }
458
459 /**
460  * gst_base_src_wait_playing:
461  * @src: the src
462  *
463  * If the #GstBaseSrcClass.create() method performs its own synchronisation
464  * against the clock it must unblock when going from PLAYING to the PAUSED state
465  * and call this method before continuing to produce the remaining data.
466  *
467  * This function will block until a state change to PLAYING happens (in which
468  * case this function returns #GST_FLOW_OK) or the processing must be stopped due
469  * to a state change to READY or a FLUSH event (in which case this function
470  * returns #GST_FLOW_WRONG_STATE).
471  *
472  * Since: 0.10.12
473  *
474  * Returns: #GST_FLOW_OK if @src is PLAYING and processing can
475  * continue. Any other return value should be returned from the create vmethod.
476  */
477 GstFlowReturn
478 gst_base_src_wait_playing (GstBaseSrc * src)
479 {
480   g_return_val_if_fail (GST_IS_BASE_SRC (src), GST_FLOW_ERROR);
481
482   /* block until the state changes, or we get a flush, or something */
483   GST_DEBUG_OBJECT (src, "live source waiting for running state");
484   GST_LIVE_WAIT (src);
485   if (src->priv->flushing)
486     goto flushing;
487   GST_DEBUG_OBJECT (src, "live source unlocked");
488
489   return GST_FLOW_OK;
490
491   /* ERRORS */
492 flushing:
493   {
494     GST_DEBUG_OBJECT (src, "we are flushing");
495     return GST_FLOW_WRONG_STATE;
496   }
497 }
498
499 /**
500  * gst_base_src_set_live:
501  * @src: base source instance
502  * @live: new live-mode
503  *
504  * If the element listens to a live source, @live should
505  * be set to %TRUE.
506  *
507  * A live source will not produce data in the PAUSED state and
508  * will therefore not be able to participate in the PREROLL phase
509  * of a pipeline. To signal this fact to the application and the
510  * pipeline, the state change return value of the live source will
511  * be GST_STATE_CHANGE_NO_PREROLL.
512  */
513 void
514 gst_base_src_set_live (GstBaseSrc * src, gboolean live)
515 {
516   g_return_if_fail (GST_IS_BASE_SRC (src));
517
518   GST_OBJECT_LOCK (src);
519   src->is_live = live;
520   GST_OBJECT_UNLOCK (src);
521 }
522
523 /**
524  * gst_base_src_is_live:
525  * @src: base source instance
526  *
527  * Check if an element is in live mode.
528  *
529  * Returns: %TRUE if element is in live mode.
530  */
531 gboolean
532 gst_base_src_is_live (GstBaseSrc * src)
533 {
534   gboolean result;
535
536   g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
537
538   GST_OBJECT_LOCK (src);
539   result = src->is_live;
540   GST_OBJECT_UNLOCK (src);
541
542   return result;
543 }
544
545 /**
546  * gst_base_src_set_format:
547  * @src: base source instance
548  * @format: the format to use
549  *
550  * Sets the default format of the source. This will be the format used
551  * for sending NEW_SEGMENT events and for performing seeks.
552  *
553  * If a format of GST_FORMAT_BYTES is set, the element will be able to
554  * operate in pull mode if the #GstBaseSrc.is_seekable() returns TRUE.
555  *
556  * Since: 0.10.1
557  */
558 void
559 gst_base_src_set_format (GstBaseSrc * src, GstFormat format)
560 {
561   g_return_if_fail (GST_IS_BASE_SRC (src));
562
563   GST_OBJECT_LOCK (src);
564   gst_segment_init (&src->segment, format);
565   GST_OBJECT_UNLOCK (src);
566 }
567
568 /**
569  * gst_base_src_query_latency:
570  * @src: the source
571  * @live: if the source is live
572  * @min_latency: the min latency of the source
573  * @max_latency: the max latency of the source
574  *
575  * Query the source for the latency parameters. @live will be TRUE when @src is
576  * configured as a live source. @min_latency will be set to the difference
577  * between the running time and the timestamp of the first buffer.
578  * @max_latency is always the undefined value of -1.
579  *
580  * This function is mostly used by subclasses.
581  *
582  * Returns: TRUE if the query succeeded.
583  *
584  * Since: 0.10.13
585  */
586 gboolean
587 gst_base_src_query_latency (GstBaseSrc * src, gboolean * live,
588     GstClockTime * min_latency, GstClockTime * max_latency)
589 {
590   GstClockTime min;
591
592   g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
593
594   GST_OBJECT_LOCK (src);
595   if (live)
596     *live = src->is_live;
597
598   /* if we have a startup latency, report this one, else report 0. Subclasses
599    * are supposed to override the query function if they want something
600    * else. */
601   if (src->priv->latency != -1)
602     min = src->priv->latency;
603   else
604     min = 0;
605
606   if (min_latency)
607     *min_latency = min;
608   if (max_latency)
609     *max_latency = -1;
610
611   GST_LOG_OBJECT (src, "latency: live %d, min %" GST_TIME_FORMAT
612       ", max %" GST_TIME_FORMAT, src->is_live, GST_TIME_ARGS (min),
613       GST_TIME_ARGS (-1));
614   GST_OBJECT_UNLOCK (src);
615
616   return TRUE;
617 }
618
619 /**
620  * gst_base_src_set_blocksize:
621  * @src: the source
622  * @blocksize: the new blocksize in bytes
623  *
624  * Set the number of bytes that @src will push out with each buffer. When
625  * @blocksize is set to -1, a default length will be used.
626  *
627  * Since: 0.10.22
628  */
629 void
630 gst_base_src_set_blocksize (GstBaseSrc * src, gulong blocksize)
631 {
632   g_return_if_fail (GST_IS_BASE_SRC (src));
633
634   GST_OBJECT_LOCK (src);
635   src->blocksize = blocksize;
636   GST_OBJECT_UNLOCK (src);
637 }
638
639 /**
640  * gst_base_src_get_blocksize:
641  * @src: the source
642  *
643  * Get the number of bytes that @src will push out with each buffer.
644  *
645  * Returns: the number of bytes pushed with each buffer.
646  *
647  * Since: 0.10.22
648  */
649 gulong
650 gst_base_src_get_blocksize (GstBaseSrc * src)
651 {
652   gulong res;
653
654   g_return_val_if_fail (GST_IS_BASE_SRC (src), 0);
655
656   GST_OBJECT_LOCK (src);
657   res = src->blocksize;
658   GST_OBJECT_UNLOCK (src);
659
660   return res;
661 }
662
663
664 /**
665  * gst_base_src_set_do_timestamp:
666  * @src: the source
667  * @timestamp: enable or disable timestamping
668  *
669  * Configure @src to automatically timestamp outgoing buffers based on the
670  * current running_time of the pipeline. This property is mostly useful for live
671  * sources.
672  *
673  * Since: 0.10.15
674  */
675 void
676 gst_base_src_set_do_timestamp (GstBaseSrc * src, gboolean timestamp)
677 {
678   g_return_if_fail (GST_IS_BASE_SRC (src));
679
680   GST_OBJECT_LOCK (src);
681   src->priv->do_timestamp = timestamp;
682   GST_OBJECT_UNLOCK (src);
683 }
684
685 /**
686  * gst_base_src_get_do_timestamp:
687  * @src: the source
688  *
689  * Query if @src timestamps outgoing buffers based on the current running_time.
690  *
691  * Returns: %TRUE if the base class will automatically timestamp outgoing buffers.
692  *
693  * Since: 0.10.15
694  */
695 gboolean
696 gst_base_src_get_do_timestamp (GstBaseSrc * src)
697 {
698   gboolean res;
699
700   g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
701
702   GST_OBJECT_LOCK (src);
703   res = src->priv->do_timestamp;
704   GST_OBJECT_UNLOCK (src);
705
706   return res;
707 }
708
709 /**
710  * gst_base_src_new_seamless_segment:
711  * @src: The source
712  * @start: The new start value for the segment
713  * @stop: Stop value for the new segment
714  * @position: The position value for the new segent
715  *
716  * Prepare a new seamless segment for emission downstream. This function must
717  * only be called by derived sub-classes, and only from the create() function,
718  * as the stream-lock needs to be held.
719  *
720  * The format for the new segment will be the current format of the source, as
721  * configured with gst_base_src_set_format()
722  *
723  * Returns: %TRUE if preparation of the seamless segment succeeded.
724  *
725  * Since: 0.10.26
726  */
727 gboolean
728 gst_base_src_new_seamless_segment (GstBaseSrc * src, gint64 start, gint64 stop,
729     gint64 position)
730 {
731   gboolean res = TRUE;
732
733   GST_DEBUG_OBJECT (src,
734       "Starting new seamless segment. Start %" GST_TIME_FORMAT " stop %"
735       GST_TIME_FORMAT " position %" GST_TIME_FORMAT, GST_TIME_ARGS (start),
736       GST_TIME_ARGS (stop), GST_TIME_ARGS (position));
737
738   GST_OBJECT_LOCK (src);
739   if (src->data.ABI.running) {
740     if (src->priv->close_segment)
741       gst_event_unref (src->priv->close_segment);
742     src->priv->close_segment =
743         gst_event_new_new_segment_full (TRUE,
744         src->segment.rate, src->segment.applied_rate, src->segment.format,
745         src->segment.start, src->segment.last_stop, src->segment.time);
746   }
747
748   gst_segment_set_newsegment_full (&src->segment, FALSE, src->segment.rate,
749       src->segment.applied_rate, src->segment.format, start, stop, position);
750
751   if (src->priv->start_segment)
752     gst_event_unref (src->priv->start_segment);
753   if (src->segment.rate >= 0.0) {
754     /* forward, we send data from last_stop to stop */
755     src->priv->start_segment =
756         gst_event_new_new_segment_full (FALSE,
757         src->segment.rate, src->segment.applied_rate, src->segment.format,
758         src->segment.last_stop, stop, src->segment.time);
759   } else {
760     /* reverse, we send data from last_stop to start */
761     src->priv->start_segment =
762         gst_event_new_new_segment_full (FALSE,
763         src->segment.rate, src->segment.applied_rate, src->segment.format,
764         src->segment.start, src->segment.last_stop, src->segment.time);
765   }
766   GST_OBJECT_UNLOCK (src);
767
768   src->priv->discont = TRUE;
769   src->data.ABI.running = TRUE;
770
771   return res;
772 }
773
774 static gboolean
775 gst_base_src_setcaps (GstPad * pad, GstCaps * caps)
776 {
777   GstBaseSrcClass *bclass;
778   GstBaseSrc *bsrc;
779   gboolean res = TRUE;
780
781   bsrc = GST_BASE_SRC (GST_PAD_PARENT (pad));
782   bclass = GST_BASE_SRC_GET_CLASS (bsrc);
783
784   if (bclass->set_caps)
785     res = bclass->set_caps (bsrc, caps);
786
787   return res;
788 }
789
790 static GstCaps *
791 gst_base_src_getcaps (GstPad * pad)
792 {
793   GstBaseSrcClass *bclass;
794   GstBaseSrc *bsrc;
795   GstCaps *caps = NULL;
796
797   bsrc = GST_BASE_SRC (GST_PAD_PARENT (pad));
798   bclass = GST_BASE_SRC_GET_CLASS (bsrc);
799   if (bclass->get_caps)
800     caps = bclass->get_caps (bsrc);
801
802   if (caps == NULL) {
803     GstPadTemplate *pad_template;
804
805     pad_template =
806         gst_element_class_get_pad_template (GST_ELEMENT_CLASS (bclass), "src");
807     if (pad_template != NULL) {
808       caps = gst_caps_ref (gst_pad_template_get_caps (pad_template));
809     }
810   }
811   return caps;
812 }
813
814 static void
815 gst_base_src_fixate (GstPad * pad, GstCaps * caps)
816 {
817   GstBaseSrcClass *bclass;
818   GstBaseSrc *bsrc;
819
820   bsrc = GST_BASE_SRC (gst_pad_get_parent (pad));
821   bclass = GST_BASE_SRC_GET_CLASS (bsrc);
822
823   if (bclass->fixate)
824     bclass->fixate (bsrc, caps);
825
826   gst_object_unref (bsrc);
827 }
828
829 static gboolean
830 gst_base_src_default_query (GstBaseSrc * src, GstQuery * query)
831 {
832   gboolean res;
833
834   switch (GST_QUERY_TYPE (query)) {
835     case GST_QUERY_POSITION:
836     {
837       GstFormat format;
838
839       gst_query_parse_position (query, &format, NULL);
840       switch (format) {
841         case GST_FORMAT_PERCENT:
842         {
843           gint64 percent;
844           gint64 position;
845           gint64 duration;
846
847           GST_OBJECT_LOCK (src);
848           position = src->segment.last_stop;
849           duration = src->segment.duration;
850           GST_OBJECT_UNLOCK (src);
851
852           if (position != -1 && duration != -1) {
853             if (position < duration)
854               percent = gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX, position,
855                   duration);
856             else
857               percent = GST_FORMAT_PERCENT_MAX;
858           } else
859             percent = -1;
860
861           gst_query_set_position (query, GST_FORMAT_PERCENT, percent);
862           res = TRUE;
863           break;
864         }
865         default:
866         {
867           gint64 position;
868           GstFormat seg_format;
869
870           GST_OBJECT_LOCK (src);
871           position = src->segment.last_stop;
872           seg_format = src->segment.format;
873           GST_OBJECT_UNLOCK (src);
874
875           if (position != -1) {
876             /* convert to requested format */
877             res =
878                 gst_pad_query_convert (src->srcpad, seg_format,
879                 position, &format, &position);
880           } else
881             res = TRUE;
882
883           gst_query_set_position (query, format, position);
884           break;
885         }
886       }
887       break;
888     }
889     case GST_QUERY_DURATION:
890     {
891       GstFormat format;
892
893       gst_query_parse_duration (query, &format, NULL);
894
895       GST_DEBUG_OBJECT (src, "duration query in format %s",
896           gst_format_get_name (format));
897
898       switch (format) {
899         case GST_FORMAT_PERCENT:
900           gst_query_set_duration (query, GST_FORMAT_PERCENT,
901               GST_FORMAT_PERCENT_MAX);
902           res = TRUE;
903           break;
904         default:
905         {
906           gint64 duration;
907           GstFormat seg_format;
908
909           GST_OBJECT_LOCK (src);
910           /* this is the duration as configured by the subclass. */
911           duration = src->segment.duration;
912           seg_format = src->segment.format;
913           GST_OBJECT_UNLOCK (src);
914
915           if (duration != -1) {
916             /* convert to requested format, if this fails, we have a duration
917              * but we cannot answer the query, we must return FALSE. */
918             res =
919                 gst_pad_query_convert (src->srcpad, seg_format,
920                 duration, &format, &duration);
921           } else {
922             /* The subclass did not configure a duration, we assume that the
923              * media has an unknown duration then and we return TRUE to report
924              * this. Note that this is not the same as returning FALSE, which
925              * means that we cannot report the duration at all. */
926             res = TRUE;
927           }
928           gst_query_set_duration (query, format, duration);
929           break;
930         }
931       }
932       break;
933     }
934
935     case GST_QUERY_SEEKING:
936     {
937       GstFormat format, seg_format;
938       gint64 duration;
939
940       GST_OBJECT_LOCK (src);
941       duration = src->segment.duration;
942       seg_format = src->segment.format;
943       GST_OBJECT_UNLOCK (src);
944
945       gst_query_parse_seeking (query, &format, NULL, NULL, NULL);
946       if (format == seg_format) {
947         gst_query_set_seeking (query, seg_format,
948             gst_base_src_seekable (src), 0, duration);
949         res = TRUE;
950       } else {
951         /* FIXME 0.11: return TRUE + seekable=FALSE for SEEKING query here */
952         /* Don't reply to the query to make up for demuxers which don't
953          * handle the SEEKING query yet. Players like Totem will fall back
954          * to the duration when the SEEKING query isn't answered. */
955         res = FALSE;
956       }
957       break;
958     }
959     case GST_QUERY_SEGMENT:
960     {
961       gint64 start, stop;
962
963       GST_OBJECT_LOCK (src);
964       /* no end segment configured, current duration then */
965       if ((stop = src->segment.stop) == -1)
966         stop = src->segment.duration;
967       start = src->segment.start;
968
969       /* adjust to stream time */
970       if (src->segment.time != -1) {
971         start -= src->segment.time;
972         if (stop != -1)
973           stop -= src->segment.time;
974       }
975       gst_query_set_segment (query, src->segment.rate, src->segment.format,
976           start, stop);
977       GST_OBJECT_UNLOCK (src);
978       res = TRUE;
979       break;
980     }
981
982     case GST_QUERY_FORMATS:
983     {
984       gst_query_set_formats (query, 3, GST_FORMAT_DEFAULT,
985           GST_FORMAT_BYTES, GST_FORMAT_PERCENT);
986       res = TRUE;
987       break;
988     }
989     case GST_QUERY_CONVERT:
990     {
991       GstFormat src_fmt, dest_fmt;
992       gint64 src_val, dest_val;
993
994       gst_query_parse_convert (query, &src_fmt, &src_val, &dest_fmt, &dest_val);
995
996       /* we can only convert between equal formats... */
997       if (src_fmt == dest_fmt) {
998         dest_val = src_val;
999         res = TRUE;
1000       } else
1001         res = FALSE;
1002
1003       gst_query_set_convert (query, src_fmt, src_val, dest_fmt, dest_val);
1004       break;
1005     }
1006     case GST_QUERY_LATENCY:
1007     {
1008       GstClockTime min, max;
1009       gboolean live;
1010
1011       /* Subclasses should override and implement something usefull */
1012       res = gst_base_src_query_latency (src, &live, &min, &max);
1013
1014       GST_LOG_OBJECT (src, "report latency: live %d, min %" GST_TIME_FORMAT
1015           ", max %" GST_TIME_FORMAT, live, GST_TIME_ARGS (min),
1016           GST_TIME_ARGS (max));
1017
1018       gst_query_set_latency (query, live, min, max);
1019       break;
1020     }
1021     case GST_QUERY_JITTER:
1022     case GST_QUERY_RATE:
1023       res = FALSE;
1024       break;
1025     case GST_QUERY_BUFFERING:
1026     {
1027       GstFormat format, seg_format;
1028       gint64 start, stop, estimated;
1029
1030       gst_query_parse_buffering_range (query, &format, NULL, NULL, NULL);
1031
1032       GST_DEBUG_OBJECT (src, "buffering query in format %s",
1033           gst_format_get_name (format));
1034
1035       GST_OBJECT_LOCK (src);
1036       if (src->random_access) {
1037         estimated = 0;
1038         start = 0;
1039         if (format == GST_FORMAT_PERCENT)
1040           stop = GST_FORMAT_PERCENT_MAX;
1041         else
1042           stop = src->segment.duration;
1043       } else {
1044         estimated = -1;
1045         start = -1;
1046         stop = -1;
1047       }
1048       seg_format = src->segment.format;
1049       GST_OBJECT_UNLOCK (src);
1050
1051       /* convert to required format. When the conversion fails, we can't answer
1052        * the query. When the value is unknown, we can don't perform conversion
1053        * but report TRUE. */
1054       if (format != GST_FORMAT_PERCENT && stop != -1) {
1055         res = gst_pad_query_convert (src->srcpad, seg_format,
1056             stop, &format, &stop);
1057       } else {
1058         res = TRUE;
1059       }
1060       if (res && format != GST_FORMAT_PERCENT && start != -1)
1061         res = gst_pad_query_convert (src->srcpad, seg_format,
1062             start, &format, &start);
1063
1064       gst_query_set_buffering_range (query, format, start, stop, estimated);
1065       break;
1066     }
1067     default:
1068       res = FALSE;
1069       break;
1070   }
1071   GST_DEBUG_OBJECT (src, "query %s returns %d", GST_QUERY_TYPE_NAME (query),
1072       res);
1073   return res;
1074 }
1075
1076 static gboolean
1077 gst_base_src_query (GstPad * pad, GstQuery * query)
1078 {
1079   GstBaseSrc *src;
1080   GstBaseSrcClass *bclass;
1081   gboolean result = FALSE;
1082
1083   src = GST_BASE_SRC (gst_pad_get_parent (pad));
1084
1085   bclass = GST_BASE_SRC_GET_CLASS (src);
1086
1087   if (bclass->query)
1088     result = bclass->query (src, query);
1089   else
1090     result = gst_pad_query_default (pad, query);
1091
1092   gst_object_unref (src);
1093
1094   return result;
1095 }
1096
1097 static gboolean
1098 gst_base_src_default_do_seek (GstBaseSrc * src, GstSegment * segment)
1099 {
1100   gboolean res = TRUE;
1101
1102   /* update our offset if the start/stop position was updated */
1103   if (segment->format == GST_FORMAT_BYTES) {
1104     segment->time = segment->start;
1105   } else if (segment->start == 0) {
1106     /* seek to start, we can implement a default for this. */
1107     segment->time = 0;
1108   } else {
1109     res = FALSE;
1110     GST_INFO_OBJECT (src, "Can't do a default seek");
1111   }
1112
1113   return res;
1114 }
1115
1116 static gboolean
1117 gst_base_src_do_seek (GstBaseSrc * src, GstSegment * segment)
1118 {
1119   GstBaseSrcClass *bclass;
1120   gboolean result = FALSE;
1121
1122   bclass = GST_BASE_SRC_GET_CLASS (src);
1123
1124   if (bclass->do_seek)
1125     result = bclass->do_seek (src, segment);
1126
1127   return result;
1128 }
1129
1130 #define SEEK_TYPE_IS_RELATIVE(t) (((t) != GST_SEEK_TYPE_NONE) && ((t) != GST_SEEK_TYPE_SET))
1131
1132 static gboolean
1133 gst_base_src_default_prepare_seek_segment (GstBaseSrc * src, GstEvent * event,
1134     GstSegment * segment)
1135 {
1136   /* By default, we try one of 2 things:
1137    *   - For absolute seek positions, convert the requested position to our
1138    *     configured processing format and place it in the output segment \
1139    *   - For relative seek positions, convert our current (input) values to the
1140    *     seek format, adjust by the relative seek offset and then convert back to
1141    *     the processing format
1142    */
1143   GstSeekType cur_type, stop_type;
1144   gint64 cur, stop;
1145   GstSeekFlags flags;
1146   GstFormat seek_format, dest_format;
1147   gdouble rate;
1148   gboolean update;
1149   gboolean res = TRUE;
1150
1151   gst_event_parse_seek (event, &rate, &seek_format, &flags,
1152       &cur_type, &cur, &stop_type, &stop);
1153   dest_format = segment->format;
1154
1155   if (seek_format == dest_format) {
1156     gst_segment_set_seek (segment, rate, seek_format, flags,
1157         cur_type, cur, stop_type, stop, &update);
1158     return TRUE;
1159   }
1160
1161   if (cur_type != GST_SEEK_TYPE_NONE) {
1162     /* FIXME: Handle seek_cur & seek_end by converting the input segment vals */
1163     res =
1164         gst_pad_query_convert (src->srcpad, seek_format, cur, &dest_format,
1165         &cur);
1166     cur_type = GST_SEEK_TYPE_SET;
1167   }
1168
1169   if (res && stop_type != GST_SEEK_TYPE_NONE) {
1170     /* FIXME: Handle seek_cur & seek_end by converting the input segment vals */
1171     res =
1172         gst_pad_query_convert (src->srcpad, seek_format, stop, &dest_format,
1173         &stop);
1174     stop_type = GST_SEEK_TYPE_SET;
1175   }
1176
1177   /* And finally, configure our output segment in the desired format */
1178   gst_segment_set_seek (segment, rate, dest_format, flags, cur_type, cur,
1179       stop_type, stop, &update);
1180
1181   if (!res)
1182     goto no_format;
1183
1184   return res;
1185
1186 no_format:
1187   {
1188     GST_DEBUG_OBJECT (src, "undefined format given, seek aborted.");
1189     return FALSE;
1190   }
1191 }
1192
1193 static gboolean
1194 gst_base_src_prepare_seek_segment (GstBaseSrc * src, GstEvent * event,
1195     GstSegment * seeksegment)
1196 {
1197   GstBaseSrcClass *bclass;
1198   gboolean result = FALSE;
1199
1200   bclass = GST_BASE_SRC_GET_CLASS (src);
1201
1202   if (bclass->prepare_seek_segment)
1203     result = bclass->prepare_seek_segment (src, event, seeksegment);
1204
1205   return result;
1206 }
1207
1208 /* this code implements the seeking. It is a good example
1209  * handling all cases.
1210  *
1211  * A seek updates the currently configured segment.start
1212  * and segment.stop values based on the SEEK_TYPE. If the
1213  * segment.start value is updated, a seek to this new position
1214  * should be performed.
1215  *
1216  * The seek can only be executed when we are not currently
1217  * streaming any data, to make sure that this is the case, we
1218  * acquire the STREAM_LOCK which is taken when we are in the
1219  * _loop() function or when a getrange() is called. Normally
1220  * we will not receive a seek if we are operating in pull mode
1221  * though. When we operate as a live source we might block on the live
1222  * cond, which does not release the STREAM_LOCK. Therefore we will try
1223  * to grab the LIVE_LOCK instead of the STREAM_LOCK to make sure it is
1224  * safe to perform the seek.
1225  *
1226  * When we are in the loop() function, we might be in the middle
1227  * of pushing a buffer, which might block in a sink. To make sure
1228  * that the push gets unblocked we push out a FLUSH_START event.
1229  * Our loop function will get a WRONG_STATE return value from
1230  * the push and will pause, effectively releasing the STREAM_LOCK.
1231  *
1232  * For a non-flushing seek, we pause the task, which might eventually
1233  * release the STREAM_LOCK. We say eventually because when the sink
1234  * blocks on the sample we might wait a very long time until the sink
1235  * unblocks the sample. In any case we acquire the STREAM_LOCK and
1236  * can continue the seek. A non-flushing seek is normally done in a
1237  * running pipeline to perform seamless playback, this means that the sink is
1238  * PLAYING and will return from its chain function.
1239  * In the case of a non-flushing seek we need to make sure that the
1240  * data we output after the seek is continuous with the previous data,
1241  * this is because a non-flushing seek does not reset the running-time
1242  * to 0. We do this by closing the currently running segment, ie. sending
1243  * a new_segment event with the stop position set to the last processed
1244  * position.
1245  *
1246  * After updating the segment.start/stop values, we prepare for
1247  * streaming again. We push out a FLUSH_STOP to make the peer pad
1248  * accept data again and we start our task again.
1249  *
1250  * A segment seek posts a message on the bus saying that the playback
1251  * of the segment started. We store the segment flag internally because
1252  * when we reach the segment.stop we have to post a segment.done
1253  * instead of EOS when doing a segment seek.
1254  */
1255 /* FIXME (0.11), we have the unlock gboolean here because most current
1256  * implementations (fdsrc, -base/gst/tcp/, ...) unconditionally unlock, even when
1257  * the streaming thread isn't running, resulting in bogus unlocks later when it
1258  * starts. This is fixed by adding unlock_stop, but we should still avoid unlocking
1259  * unnecessarily for backwards compatibility. Ergo, the unlock variable stays
1260  * until 0.11
1261  */
1262 static gboolean
1263 gst_base_src_perform_seek (GstBaseSrc * src, GstEvent * event, gboolean unlock)
1264 {
1265   gboolean res = TRUE, tres;
1266   gdouble rate;
1267   GstFormat seek_format, dest_format;
1268   GstSeekFlags flags;
1269   GstSeekType cur_type, stop_type;
1270   gint64 cur, stop;
1271   gboolean flush, playing;
1272   gboolean update;
1273   gboolean relative_seek = FALSE;
1274   gboolean seekseg_configured = FALSE;
1275   GstSegment seeksegment;
1276   guint32 seqnum;
1277   GstEvent *tevent;
1278
1279   GST_DEBUG_OBJECT (src, "doing seek");
1280
1281   GST_OBJECT_LOCK (src);
1282   dest_format = src->segment.format;
1283   GST_OBJECT_UNLOCK (src);
1284
1285   if (event) {
1286     gst_event_parse_seek (event, &rate, &seek_format, &flags,
1287         &cur_type, &cur, &stop_type, &stop);
1288
1289     relative_seek = SEEK_TYPE_IS_RELATIVE (cur_type) ||
1290         SEEK_TYPE_IS_RELATIVE (stop_type);
1291
1292     if (dest_format != seek_format && !relative_seek) {
1293       /* If we have an ABSOLUTE position (SEEK_SET only), we can convert it
1294        * here before taking the stream lock, otherwise we must convert it later,
1295        * once we have the stream lock and can read the last configures segment
1296        * start and stop positions */
1297       gst_segment_init (&seeksegment, dest_format);
1298
1299       if (!gst_base_src_prepare_seek_segment (src, event, &seeksegment))
1300         goto prepare_failed;
1301
1302       seekseg_configured = TRUE;
1303     }
1304
1305     flush = flags & GST_SEEK_FLAG_FLUSH;
1306     seqnum = gst_event_get_seqnum (event);
1307   } else {
1308     flush = FALSE;
1309     /* get next seqnum */
1310     seqnum = gst_util_seqnum_next ();
1311   }
1312
1313   /* send flush start */
1314   if (flush) {
1315     tevent = gst_event_new_flush_start ();
1316     gst_event_set_seqnum (tevent, seqnum);
1317     gst_pad_push_event (src->srcpad, tevent);
1318   } else
1319     gst_pad_pause_task (src->srcpad);
1320
1321   /* unblock streaming thread. */
1322   gst_base_src_set_flushing (src, TRUE, FALSE, unlock, &playing);
1323
1324   /* grab streaming lock, this should eventually be possible, either
1325    * because the task is paused, our streaming thread stopped
1326    * or because our peer is flushing. */
1327   GST_PAD_STREAM_LOCK (src->srcpad);
1328   if (G_UNLIKELY (src->priv->seqnum == seqnum)) {
1329     /* we have seen this event before, issue a warning for now */
1330     GST_WARNING_OBJECT (src, "duplicate event found %" G_GUINT32_FORMAT,
1331         seqnum);
1332   } else {
1333     src->priv->seqnum = seqnum;
1334     GST_DEBUG_OBJECT (src, "seek with seqnum %" G_GUINT32_FORMAT, seqnum);
1335   }
1336
1337   gst_base_src_set_flushing (src, FALSE, playing, unlock, NULL);
1338
1339   /* If we configured the seeksegment above, don't overwrite it now. Otherwise
1340    * copy the current segment info into the temp segment that we can actually
1341    * attempt the seek with. We only update the real segment if the seek suceeds. */
1342   if (!seekseg_configured) {
1343     GST_OBJECT_LOCK (src);
1344     memcpy (&seeksegment, &src->segment, sizeof (GstSegment));
1345     GST_OBJECT_UNLOCK (src);
1346
1347     /* now configure the final seek segment */
1348     if (event) {
1349       if (seeksegment.format != seek_format) {
1350         /* OK, here's where we give the subclass a chance to convert the relative
1351          * seek into an absolute one in the processing format. We set up any
1352          * absolute seek above, before taking the stream lock. */
1353         if (!gst_base_src_prepare_seek_segment (src, event, &seeksegment)) {
1354           GST_DEBUG_OBJECT (src, "Preparing the seek failed after flushing. "
1355               "Aborting seek");
1356           res = FALSE;
1357         }
1358       } else {
1359         /* The seek format matches our processing format, no need to ask the
1360          * the subclass to configure the segment. */
1361         gst_segment_set_seek (&seeksegment, rate, seek_format, flags,
1362             cur_type, cur, stop_type, stop, &update);
1363       }
1364     }
1365     /* Else, no seek event passed, so we're just (re)starting the
1366        current segment. */
1367   }
1368
1369   if (res) {
1370     GST_DEBUG_OBJECT (src, "segment configured from %" G_GINT64_FORMAT
1371         " to %" G_GINT64_FORMAT ", position %" G_GINT64_FORMAT,
1372         seeksegment.start, seeksegment.stop, seeksegment.last_stop);
1373
1374     /* do the seek, segment.last_stop contains the new position. */
1375     res = gst_base_src_do_seek (src, &seeksegment);
1376   }
1377
1378   /* and prepare to continue streaming */
1379   if (flush) {
1380     tevent = gst_event_new_flush_stop ();
1381     gst_event_set_seqnum (tevent, seqnum);
1382     /* send flush stop, peer will accept data and events again. We
1383      * are not yet providing data as we still have the STREAM_LOCK. */
1384     gst_pad_push_event (src->srcpad, tevent);
1385   } else if (res && src->data.ABI.running) {
1386     GST_OBJECT_LOCK (src);
1387     /* we are running the current segment and doing a non-flushing seek,
1388      * close the segment first based on the last_stop. */
1389     GST_DEBUG_OBJECT (src, "closing running segment %" G_GINT64_FORMAT
1390         " to %" G_GINT64_FORMAT, src->segment.start, src->segment.last_stop);
1391
1392     /* queue the segment for sending in the stream thread */
1393     if (src->priv->close_segment)
1394       gst_event_unref (src->priv->close_segment);
1395     src->priv->close_segment =
1396         gst_event_new_new_segment_full (TRUE,
1397         src->segment.rate, src->segment.applied_rate, src->segment.format,
1398         src->segment.start, src->segment.last_stop, src->segment.time);
1399     gst_event_set_seqnum (src->priv->close_segment, seqnum);
1400     GST_OBJECT_UNLOCK (src);
1401   }
1402
1403   /* The subclass must have converted the segment to the processing format
1404    * by now */
1405   if (res && seeksegment.format != dest_format) {
1406     GST_DEBUG_OBJECT (src, "Subclass failed to prepare a seek segment "
1407         "in the correct format. Aborting seek.");
1408     res = FALSE;
1409   }
1410
1411   /* if the seek was successful, we update our real segment and push
1412    * out the new segment. */
1413   if (res) {
1414     GST_OBJECT_LOCK (src);
1415     memcpy (&src->segment, &seeksegment, sizeof (GstSegment));
1416     GST_OBJECT_UNLOCK (src);
1417
1418     if (seeksegment.flags & GST_SEEK_FLAG_SEGMENT) {
1419       GstMessage *message;
1420
1421       message = gst_message_new_segment_start (GST_OBJECT (src),
1422           seeksegment.format, seeksegment.last_stop);
1423       gst_message_set_seqnum (message, seqnum);
1424
1425       gst_element_post_message (GST_ELEMENT (src), message);
1426     }
1427
1428     /* for deriving a stop position for the playback segment from the seek
1429      * segment, we must take the duration when the stop is not set */
1430     if ((stop = seeksegment.stop) == -1)
1431       stop = seeksegment.duration;
1432
1433     GST_DEBUG_OBJECT (src, "Sending newsegment from %" G_GINT64_FORMAT
1434         " to %" G_GINT64_FORMAT, seeksegment.start, stop);
1435
1436     /* now replace the old segment so that we send it in the stream thread the
1437      * next time it is scheduled. */
1438     if (src->priv->start_segment)
1439       gst_event_unref (src->priv->start_segment);
1440     if (seeksegment.rate >= 0.0) {
1441       /* forward, we send data from last_stop to stop */
1442       src->priv->start_segment =
1443           gst_event_new_new_segment_full (FALSE,
1444           seeksegment.rate, seeksegment.applied_rate, seeksegment.format,
1445           seeksegment.last_stop, stop, seeksegment.time);
1446     } else {
1447       /* reverse, we send data from last_stop to start */
1448       src->priv->start_segment =
1449           gst_event_new_new_segment_full (FALSE,
1450           seeksegment.rate, seeksegment.applied_rate, seeksegment.format,
1451           seeksegment.start, seeksegment.last_stop, seeksegment.time);
1452     }
1453     gst_event_set_seqnum (src->priv->start_segment, seqnum);
1454   }
1455
1456   src->priv->discont = TRUE;
1457   src->data.ABI.running = TRUE;
1458   /* and restart the task in case it got paused explicitly or by
1459    * the FLUSH_START event we pushed out. */
1460   tres = gst_pad_start_task (src->srcpad, (GstTaskFunction) gst_base_src_loop,
1461       src->srcpad);
1462   if (res && !tres)
1463     res = FALSE;
1464
1465   /* and release the lock again so we can continue streaming */
1466   GST_PAD_STREAM_UNLOCK (src->srcpad);
1467
1468   return res;
1469
1470   /* ERROR */
1471 prepare_failed:
1472   GST_DEBUG_OBJECT (src, "Preparing the seek failed before flushing. "
1473       "Aborting seek");
1474   return FALSE;
1475 }
1476
1477 static const GstQueryType *
1478 gst_base_src_get_query_types (GstElement * element)
1479 {
1480   static const GstQueryType query_types[] = {
1481     GST_QUERY_DURATION,
1482     GST_QUERY_POSITION,
1483     GST_QUERY_SEEKING,
1484     GST_QUERY_SEGMENT,
1485     GST_QUERY_FORMATS,
1486     GST_QUERY_LATENCY,
1487     GST_QUERY_JITTER,
1488     GST_QUERY_RATE,
1489     GST_QUERY_CONVERT,
1490     0
1491   };
1492
1493   return query_types;
1494 }
1495
1496 /* all events send to this element directly. This is mainly done from the
1497  * application.
1498  */
1499 static gboolean
1500 gst_base_src_send_event (GstElement * element, GstEvent * event)
1501 {
1502   GstBaseSrc *src;
1503   gboolean result = FALSE;
1504
1505   src = GST_BASE_SRC (element);
1506
1507   GST_DEBUG_OBJECT (src, "reveived %s event", GST_EVENT_TYPE_NAME (event));
1508
1509   switch (GST_EVENT_TYPE (event)) {
1510       /* bidirectional events */
1511     case GST_EVENT_FLUSH_START:
1512     case GST_EVENT_FLUSH_STOP:
1513       /* sending random flushes downstream can break stuff,
1514        * especially sync since all segment info will get flushed */
1515       break;
1516
1517       /* downstream serialized events */
1518     case GST_EVENT_EOS:
1519     {
1520       GstBaseSrcClass *bclass;
1521
1522       bclass = GST_BASE_SRC_GET_CLASS (src);
1523
1524       /* queue EOS and make sure the task or pull function performs the EOS
1525        * actions.
1526        *
1527        * We have two possibilities:
1528        *
1529        *  - Before we are to enter the _create function, we check the pending_eos
1530        *    first and do EOS instead of entering it.
1531        *  - If we are in the _create function or we did not manage to set the
1532        *    flag fast enough and we are about to enter the _create function,
1533        *    we unlock it so that we exit with WRONG_STATE immediatly. We then
1534        *    check the EOS flag and do the EOS logic.
1535        */
1536       g_atomic_int_set (&src->priv->pending_eos, TRUE);
1537       GST_DEBUG_OBJECT (src, "EOS marked, calling unlock");
1538
1539       /* unlock the _create function so that we can check the pending_eos flag
1540        * and we can do EOS. This will eventually release the LIVE_LOCK again so
1541        * that we can grab it and stop the unlock again. We don't take the stream
1542        * lock so that this operation is guaranteed to never block. */
1543       if (bclass->unlock)
1544         bclass->unlock (src);
1545
1546       GST_DEBUG_OBJECT (src, "unlock called, waiting for LIVE_LOCK");
1547
1548       GST_LIVE_LOCK (src);
1549       GST_DEBUG_OBJECT (src, "LIVE_LOCK acquired, calling unlock_stop");
1550       /* now stop the unlock of the streaming thread again. Grabbing the live
1551        * lock is enough because that protects the create function. */
1552       if (bclass->unlock_stop)
1553         bclass->unlock_stop (src);
1554       GST_LIVE_UNLOCK (src);
1555
1556       result = TRUE;
1557       break;
1558     }
1559     case GST_EVENT_NEWSEGMENT:
1560       /* sending random NEWSEGMENT downstream can break sync. */
1561       break;
1562     case GST_EVENT_TAG:
1563       /* Insert tag in the dataflow */
1564       GST_OBJECT_LOCK (src);
1565       src->priv->pending_tags = g_list_append (src->priv->pending_tags, event);
1566       GST_OBJECT_UNLOCK (src);
1567       event = NULL;
1568       result = TRUE;
1569       break;
1570     case GST_EVENT_BUFFERSIZE:
1571       /* does not seem to make much sense currently */
1572       break;
1573
1574       /* upstream events */
1575     case GST_EVENT_QOS:
1576       /* elements should override send_event and do something */
1577       break;
1578     case GST_EVENT_SEEK:
1579     {
1580       gboolean started;
1581
1582       GST_OBJECT_LOCK (src->srcpad);
1583       if (GST_PAD_ACTIVATE_MODE (src->srcpad) == GST_ACTIVATE_PULL)
1584         goto wrong_mode;
1585       started = GST_PAD_ACTIVATE_MODE (src->srcpad) == GST_ACTIVATE_PUSH;
1586       GST_OBJECT_UNLOCK (src->srcpad);
1587
1588       if (started) {
1589         GST_DEBUG_OBJECT (src, "performing seek");
1590         /* when we are running in push mode, we can execute the
1591          * seek right now, we need to unlock. */
1592         result = gst_base_src_perform_seek (src, event, TRUE);
1593       } else {
1594         GstEvent **event_p;
1595
1596         /* else we store the event and execute the seek when we
1597          * get activated */
1598         GST_OBJECT_LOCK (src);
1599         GST_DEBUG_OBJECT (src, "queueing seek");
1600         event_p = &src->data.ABI.pending_seek;
1601         gst_event_replace ((GstEvent **) event_p, event);
1602         GST_OBJECT_UNLOCK (src);
1603         /* assume the seek will work */
1604         result = TRUE;
1605       }
1606       break;
1607     }
1608     case GST_EVENT_NAVIGATION:
1609       /* could make sense for elements that do something with navigation events
1610        * but then they would need to override the send_event function */
1611       break;
1612     case GST_EVENT_LATENCY:
1613       /* does not seem to make sense currently */
1614       break;
1615
1616       /* custom events */
1617     case GST_EVENT_CUSTOM_UPSTREAM:
1618       /* override send_event if you want this */
1619       break;
1620     case GST_EVENT_CUSTOM_DOWNSTREAM:
1621     case GST_EVENT_CUSTOM_BOTH:
1622       /* FIXME, insert event in the dataflow */
1623       break;
1624     case GST_EVENT_CUSTOM_DOWNSTREAM_OOB:
1625     case GST_EVENT_CUSTOM_BOTH_OOB:
1626       /* insert a random custom event into the pipeline */
1627       GST_DEBUG_OBJECT (src, "pushing custom OOB event downstream");
1628       result = gst_pad_push_event (src->srcpad, event);
1629       /* we gave away the ref to the event in the push */
1630       event = NULL;
1631       break;
1632     default:
1633       break;
1634   }
1635 done:
1636   /* if we still have a ref to the event, unref it now */
1637   if (event)
1638     gst_event_unref (event);
1639
1640   return result;
1641
1642   /* ERRORS */
1643 wrong_mode:
1644   {
1645     GST_DEBUG_OBJECT (src, "cannot perform seek when operating in pull mode");
1646     GST_OBJECT_UNLOCK (src->srcpad);
1647     result = FALSE;
1648     goto done;
1649   }
1650 }
1651
1652 static gboolean
1653 gst_base_src_seekable (GstBaseSrc * src)
1654 {
1655   GstBaseSrcClass *bclass;
1656   bclass = GST_BASE_SRC_GET_CLASS (src);
1657   if (bclass->is_seekable)
1658     return bclass->is_seekable (src);
1659   else
1660     return FALSE;
1661 }
1662
1663 static gboolean
1664 gst_base_src_default_event (GstBaseSrc * src, GstEvent * event)
1665 {
1666   gboolean result;
1667
1668   switch (GST_EVENT_TYPE (event)) {
1669     case GST_EVENT_SEEK:
1670       /* is normally called when in push mode */
1671       if (!gst_base_src_seekable (src))
1672         goto not_seekable;
1673
1674       result = gst_base_src_perform_seek (src, event, TRUE);
1675       break;
1676     case GST_EVENT_FLUSH_START:
1677       /* cancel any blocking getrange, is normally called
1678        * when in pull mode. */
1679       result = gst_base_src_set_flushing (src, TRUE, FALSE, TRUE, NULL);
1680       break;
1681     case GST_EVENT_FLUSH_STOP:
1682       result = gst_base_src_set_flushing (src, FALSE, TRUE, TRUE, NULL);
1683       break;
1684     default:
1685       result = TRUE;
1686       break;
1687   }
1688   return result;
1689
1690   /* ERRORS */
1691 not_seekable:
1692   {
1693     GST_DEBUG_OBJECT (src, "is not seekable");
1694     return FALSE;
1695   }
1696 }
1697
1698 static gboolean
1699 gst_base_src_event_handler (GstPad * pad, GstEvent * event)
1700 {
1701   GstBaseSrc *src;
1702   GstBaseSrcClass *bclass;
1703   gboolean result = FALSE;
1704
1705   src = GST_BASE_SRC (gst_pad_get_parent (pad));
1706   bclass = GST_BASE_SRC_GET_CLASS (src);
1707
1708   if (bclass->event) {
1709     if (!(result = bclass->event (src, event)))
1710       goto subclass_failed;
1711   }
1712
1713 done:
1714   gst_event_unref (event);
1715   gst_object_unref (src);
1716
1717   return result;
1718
1719   /* ERRORS */
1720 subclass_failed:
1721   {
1722     GST_DEBUG_OBJECT (src, "subclass refused event");
1723     goto done;
1724   }
1725 }
1726
1727 static void
1728 gst_base_src_set_property (GObject * object, guint prop_id,
1729     const GValue * value, GParamSpec * pspec)
1730 {
1731   GstBaseSrc *src;
1732
1733   src = GST_BASE_SRC (object);
1734
1735   switch (prop_id) {
1736     case PROP_BLOCKSIZE:
1737       gst_base_src_set_blocksize (src, g_value_get_ulong (value));
1738       break;
1739     case PROP_NUM_BUFFERS:
1740       src->num_buffers = g_value_get_int (value);
1741       break;
1742     case PROP_TYPEFIND:
1743       src->data.ABI.typefind = g_value_get_boolean (value);
1744       break;
1745     case PROP_DO_TIMESTAMP:
1746       gst_base_src_set_do_timestamp (src, g_value_get_boolean (value));
1747       break;
1748     default:
1749       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1750       break;
1751   }
1752 }
1753
1754 static void
1755 gst_base_src_get_property (GObject * object, guint prop_id, GValue * value,
1756     GParamSpec * pspec)
1757 {
1758   GstBaseSrc *src;
1759
1760   src = GST_BASE_SRC (object);
1761
1762   switch (prop_id) {
1763     case PROP_BLOCKSIZE:
1764       g_value_set_ulong (value, gst_base_src_get_blocksize (src));
1765       break;
1766     case PROP_NUM_BUFFERS:
1767       g_value_set_int (value, src->num_buffers);
1768       break;
1769     case PROP_TYPEFIND:
1770       g_value_set_boolean (value, src->data.ABI.typefind);
1771       break;
1772     case PROP_DO_TIMESTAMP:
1773       g_value_set_boolean (value, gst_base_src_get_do_timestamp (src));
1774       break;
1775     default:
1776       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1777       break;
1778   }
1779 }
1780
1781 /* with STREAM_LOCK and LOCK */
1782 static GstClockReturn
1783 gst_base_src_wait (GstBaseSrc * basesrc, GstClock * clock, GstClockTime time)
1784 {
1785   GstClockReturn ret;
1786   GstClockID id;
1787
1788   id = gst_clock_new_single_shot_id (clock, time);
1789
1790   basesrc->clock_id = id;
1791   /* release the live lock while waiting */
1792   GST_LIVE_UNLOCK (basesrc);
1793
1794   ret = gst_clock_id_wait (id, NULL);
1795
1796   GST_LIVE_LOCK (basesrc);
1797   gst_clock_id_unref (id);
1798   basesrc->clock_id = NULL;
1799
1800   return ret;
1801 }
1802
1803 /* perform synchronisation on a buffer.
1804  * with STREAM_LOCK.
1805  */
1806 static GstClockReturn
1807 gst_base_src_do_sync (GstBaseSrc * basesrc, GstBuffer * buffer)
1808 {
1809   GstClockReturn result;
1810   GstClockTime start, end;
1811   GstBaseSrcClass *bclass;
1812   GstClockTime base_time;
1813   GstClock *clock;
1814   GstClockTime now = GST_CLOCK_TIME_NONE, timestamp;
1815   gboolean do_timestamp, first, pseudo_live;
1816
1817   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
1818
1819   start = end = -1;
1820   if (bclass->get_times)
1821     bclass->get_times (basesrc, buffer, &start, &end);
1822
1823   /* get buffer timestamp */
1824   timestamp = GST_BUFFER_TIMESTAMP (buffer);
1825
1826   /* grab the lock to prepare for clocking and calculate the startup
1827    * latency. */
1828   GST_OBJECT_LOCK (basesrc);
1829
1830   /* if we are asked to sync against the clock we are a pseudo live element */
1831   pseudo_live = (start != -1 && basesrc->is_live);
1832   /* check for the first buffer */
1833   first = (basesrc->priv->latency == -1);
1834
1835   if (timestamp != -1 && pseudo_live) {
1836     GstClockTime latency;
1837
1838     /* we have a timestamp and a sync time, latency is the diff */
1839     if (timestamp <= start)
1840       latency = start - timestamp;
1841     else
1842       latency = 0;
1843
1844     if (first) {
1845       GST_DEBUG_OBJECT (basesrc, "pseudo_live with latency %" GST_TIME_FORMAT,
1846           GST_TIME_ARGS (latency));
1847       /* first time we calculate latency, just configure */
1848       basesrc->priv->latency = latency;
1849     } else {
1850       if (basesrc->priv->latency != latency) {
1851         /* we have a new latency, FIXME post latency message */
1852         basesrc->priv->latency = latency;
1853         GST_DEBUG_OBJECT (basesrc, "latency changed to %" GST_TIME_FORMAT,
1854             GST_TIME_ARGS (latency));
1855       }
1856     }
1857   } else if (first) {
1858     GST_DEBUG_OBJECT (basesrc, "no latency needed, live %d, sync %d",
1859         basesrc->is_live, start != -1);
1860     basesrc->priv->latency = 0;
1861   }
1862
1863   /* get clock, if no clock, we can't sync or do timestamps */
1864   if ((clock = GST_ELEMENT_CLOCK (basesrc)) == NULL)
1865     goto no_clock;
1866
1867   base_time = GST_ELEMENT_CAST (basesrc)->base_time;
1868
1869   do_timestamp = basesrc->priv->do_timestamp;
1870
1871   /* first buffer, calculate the timestamp offset */
1872   if (first) {
1873     GstClockTime running_time;
1874
1875     now = gst_clock_get_time (clock);
1876     running_time = now - base_time;
1877
1878     GST_LOG_OBJECT (basesrc,
1879         "startup timestamp: %" GST_TIME_FORMAT ", running_time %"
1880         GST_TIME_FORMAT, GST_TIME_ARGS (timestamp),
1881         GST_TIME_ARGS (running_time));
1882
1883     if (pseudo_live && timestamp != -1) {
1884       /* live source and we need to sync, add startup latency to all timestamps
1885        * to get the real running_time. Live sources should always timestamp
1886        * according to the current running time. */
1887       basesrc->priv->ts_offset = GST_CLOCK_DIFF (timestamp, running_time);
1888
1889       GST_LOG_OBJECT (basesrc, "live with sync, ts_offset %" GST_TIME_FORMAT,
1890           GST_TIME_ARGS (basesrc->priv->ts_offset));
1891     } else {
1892       basesrc->priv->ts_offset = 0;
1893       GST_LOG_OBJECT (basesrc, "no timestamp offset needed");
1894     }
1895
1896     if (!GST_CLOCK_TIME_IS_VALID (timestamp)) {
1897       if (do_timestamp)
1898         timestamp = running_time;
1899       else
1900         timestamp = 0;
1901
1902       GST_BUFFER_TIMESTAMP (buffer) = timestamp;
1903
1904       GST_LOG_OBJECT (basesrc, "created timestamp: %" GST_TIME_FORMAT,
1905           GST_TIME_ARGS (timestamp));
1906     }
1907
1908     /* add the timestamp offset we need for sync */
1909     timestamp += basesrc->priv->ts_offset;
1910   } else {
1911     /* not the first buffer, the timestamp is the diff between the clock and
1912      * base_time */
1913     if (do_timestamp && !GST_CLOCK_TIME_IS_VALID (timestamp)) {
1914       now = gst_clock_get_time (clock);
1915
1916       GST_BUFFER_TIMESTAMP (buffer) = now - base_time;
1917
1918       GST_LOG_OBJECT (basesrc, "created timestamp: %" GST_TIME_FORMAT,
1919           GST_TIME_ARGS (now - base_time));
1920     }
1921   }
1922
1923   /* if we don't have a buffer timestamp, we don't sync */
1924   if (!GST_CLOCK_TIME_IS_VALID (start))
1925     goto no_sync;
1926
1927   if (basesrc->is_live && GST_CLOCK_TIME_IS_VALID (timestamp)) {
1928     /* for pseudo live sources, add our ts_offset to the timestamp */
1929     GST_BUFFER_TIMESTAMP (buffer) += basesrc->priv->ts_offset;
1930     start += basesrc->priv->ts_offset;
1931   }
1932
1933   GST_LOG_OBJECT (basesrc,
1934       "waiting for clock, base time %" GST_TIME_FORMAT
1935       ", stream_start %" GST_TIME_FORMAT,
1936       GST_TIME_ARGS (base_time), GST_TIME_ARGS (start));
1937   GST_OBJECT_UNLOCK (basesrc);
1938
1939   result = gst_base_src_wait (basesrc, clock, start + base_time);
1940
1941   GST_LOG_OBJECT (basesrc, "clock entry done: %d", result);
1942
1943   return result;
1944
1945   /* special cases */
1946 no_clock:
1947   {
1948     GST_DEBUG_OBJECT (basesrc, "we have no clock");
1949     GST_OBJECT_UNLOCK (basesrc);
1950     return GST_CLOCK_OK;
1951   }
1952 no_sync:
1953   {
1954     GST_DEBUG_OBJECT (basesrc, "no sync needed");
1955     GST_OBJECT_UNLOCK (basesrc);
1956     return GST_CLOCK_OK;
1957   }
1958 }
1959
1960 static gboolean
1961 gst_base_src_update_length (GstBaseSrc * src, guint64 offset, guint * length)
1962 {
1963   guint64 size, maxsize;
1964   GstBaseSrcClass *bclass;
1965   GstFormat format;
1966   gint64 stop;
1967
1968   bclass = GST_BASE_SRC_GET_CLASS (src);
1969
1970   GST_OBJECT_LOCK (src);
1971   format = src->segment.format;
1972   stop = src->segment.stop;
1973   /* get total file size */
1974   size = (guint64) src->segment.duration;
1975   GST_OBJECT_UNLOCK (src);
1976
1977   /* only operate if we are working with bytes */
1978   if (format != GST_FORMAT_BYTES)
1979     return TRUE;
1980
1981   /* the max amount of bytes to read is the total size or
1982    * up to the segment.stop if present. */
1983   if (stop != -1)
1984     maxsize = MIN (size, stop);
1985   else
1986     maxsize = size;
1987
1988   GST_DEBUG_OBJECT (src,
1989       "reading offset %" G_GUINT64_FORMAT ", length %u, size %" G_GINT64_FORMAT
1990       ", segment.stop %" G_GINT64_FORMAT ", maxsize %" G_GINT64_FORMAT, offset,
1991       *length, size, stop, maxsize);
1992
1993   /* check size if we have one */
1994   if (maxsize != -1) {
1995     /* if we run past the end, check if the file became bigger and
1996      * retry. */
1997     if (G_UNLIKELY (offset + *length >= maxsize)) {
1998       /* see if length of the file changed */
1999       if (bclass->get_size)
2000         if (!bclass->get_size (src, &size))
2001           size = -1;
2002
2003       GST_OBJECT_LOCK (src);
2004       gst_segment_set_duration (&src->segment, GST_FORMAT_BYTES, size);
2005       GST_OBJECT_UNLOCK (src);
2006
2007       /* make sure we don't exceed the configured segment stop
2008        * if it was set */
2009       if (stop != -1)
2010         maxsize = MIN (size, stop);
2011       else
2012         maxsize = size;
2013
2014       /* if we are at or past the end, EOS */
2015       if (G_UNLIKELY (offset >= maxsize))
2016         goto unexpected_length;
2017
2018       /* else we can clip to the end */
2019       if (G_UNLIKELY (offset + *length >= maxsize))
2020         *length = maxsize - offset;
2021
2022     }
2023   }
2024
2025   /* keep track of current position. segment is in bytes, we checked
2026    * that above. */
2027   GST_OBJECT_LOCK (src);
2028   gst_segment_set_last_stop (&src->segment, GST_FORMAT_BYTES, offset);
2029   GST_OBJECT_UNLOCK (src);
2030
2031   return TRUE;
2032
2033   /* ERRORS */
2034 unexpected_length:
2035   {
2036     return FALSE;
2037   }
2038 }
2039
2040 /* must be called with LIVE_LOCK */
2041 static GstFlowReturn
2042 gst_base_src_get_range (GstBaseSrc * src, guint64 offset, guint length,
2043     GstBuffer ** buf)
2044 {
2045   GstFlowReturn ret;
2046   GstBaseSrcClass *bclass;
2047   GstClockReturn status;
2048
2049   bclass = GST_BASE_SRC_GET_CLASS (src);
2050
2051 again:
2052   if (src->is_live) {
2053     while (G_UNLIKELY (!src->live_running)) {
2054       ret = gst_base_src_wait_playing (src);
2055       if (ret != GST_FLOW_OK)
2056         goto stopped;
2057     }
2058   }
2059
2060   if (G_UNLIKELY (!GST_OBJECT_FLAG_IS_SET (src, GST_BASE_SRC_STARTED)))
2061     goto not_started;
2062
2063   if (G_UNLIKELY (!bclass->create))
2064     goto no_function;
2065
2066   if (G_UNLIKELY (!gst_base_src_update_length (src, offset, &length)))
2067     goto unexpected_length;
2068
2069   /* normally we don't count buffers */
2070   if (G_UNLIKELY (src->num_buffers_left >= 0)) {
2071     if (src->num_buffers_left == 0)
2072       goto reached_num_buffers;
2073     else
2074       src->num_buffers_left--;
2075   }
2076
2077   /* don't enter the create function if a pending EOS event was set. For the
2078    * logic of the pending_eos, check the event function of this class. */
2079   if (G_UNLIKELY (g_atomic_int_get (&src->priv->pending_eos)))
2080     goto eos;
2081
2082   GST_DEBUG_OBJECT (src,
2083       "calling create offset %" G_GUINT64_FORMAT " length %u, time %"
2084       G_GINT64_FORMAT, offset, length, src->segment.time);
2085
2086   ret = bclass->create (src, offset, length, buf);
2087
2088   /* The create function could be unlocked because we have a pending EOS. It's
2089    * possible that we have a valid buffer from create that we need to
2090    * discard when the create function returned _OK. */
2091   if (G_UNLIKELY (g_atomic_int_get (&src->priv->pending_eos))) {
2092     if (ret == GST_FLOW_OK) {
2093       gst_buffer_unref (*buf);
2094       *buf = NULL;
2095     }
2096     goto eos;
2097   }
2098
2099   if (G_UNLIKELY (ret != GST_FLOW_OK))
2100     goto not_ok;
2101
2102   /* no timestamp set and we are at offset 0, we can timestamp with 0 */
2103   if (offset == 0 && src->segment.time == 0
2104       && GST_BUFFER_TIMESTAMP (*buf) == -1)
2105     GST_BUFFER_TIMESTAMP (*buf) = 0;
2106
2107   /* set pad caps on the buffer if the buffer had no caps */
2108   if (GST_BUFFER_CAPS (*buf) == NULL)
2109     gst_buffer_set_caps (*buf, GST_PAD_CAPS (src->srcpad));
2110
2111   /* now sync before pushing the buffer */
2112   status = gst_base_src_do_sync (src, *buf);
2113
2114   /* waiting for the clock could have made us flushing */
2115   if (G_UNLIKELY (src->priv->flushing))
2116     goto flushing;
2117
2118   switch (status) {
2119     case GST_CLOCK_EARLY:
2120       /* the buffer is too late. We currently don't drop the buffer. */
2121       GST_DEBUG_OBJECT (src, "buffer too late!, returning anyway");
2122       break;
2123     case GST_CLOCK_OK:
2124       /* buffer synchronised properly */
2125       GST_DEBUG_OBJECT (src, "buffer ok");
2126       break;
2127     case GST_CLOCK_UNSCHEDULED:
2128       /* this case is triggered when we were waiting for the clock and
2129        * it got unlocked because we did a state change. In any case, get rid of
2130        * the buffer. */
2131       gst_buffer_unref (*buf);
2132       *buf = NULL;
2133       if (!src->live_running) {
2134         /* We return WRONG_STATE when we are not running to stop the dataflow also
2135          * get rid of the produced buffer. */
2136         GST_DEBUG_OBJECT (src,
2137             "clock was unscheduled (%d), returning WRONG_STATE", status);
2138         ret = GST_FLOW_WRONG_STATE;
2139       } else {
2140         /* If we are running when this happens, we quickly switched between
2141          * pause and playing. We try to produce a new buffer */
2142         GST_DEBUG_OBJECT (src,
2143             "clock was unscheduled (%d), but we are running", status);
2144         goto again;
2145       }
2146       break;
2147     default:
2148       /* all other result values are unexpected and errors */
2149       GST_ELEMENT_ERROR (src, CORE, CLOCK,
2150           (_("Internal clock error.")),
2151           ("clock returned unexpected return value %d", status));
2152       gst_buffer_unref (*buf);
2153       *buf = NULL;
2154       ret = GST_FLOW_ERROR;
2155       break;
2156   }
2157   return ret;
2158
2159   /* ERROR */
2160 stopped:
2161   {
2162     GST_DEBUG_OBJECT (src, "wait_playing returned %d (%s)", ret,
2163         gst_flow_get_name (ret));
2164     return ret;
2165   }
2166 not_ok:
2167   {
2168     GST_DEBUG_OBJECT (src, "create returned %d (%s)", ret,
2169         gst_flow_get_name (ret));
2170     return ret;
2171   }
2172 not_started:
2173   {
2174     GST_DEBUG_OBJECT (src, "getrange but not started");
2175     return GST_FLOW_WRONG_STATE;
2176   }
2177 no_function:
2178   {
2179     GST_DEBUG_OBJECT (src, "no create function");
2180     return GST_FLOW_ERROR;
2181   }
2182 unexpected_length:
2183   {
2184     GST_DEBUG_OBJECT (src, "unexpected length %u (offset=%" G_GUINT64_FORMAT
2185         ", size=%" G_GINT64_FORMAT ")", length, offset, src->segment.duration);
2186     return GST_FLOW_UNEXPECTED;
2187   }
2188 reached_num_buffers:
2189   {
2190     GST_DEBUG_OBJECT (src, "sent all buffers");
2191     return GST_FLOW_UNEXPECTED;
2192   }
2193 flushing:
2194   {
2195     GST_DEBUG_OBJECT (src, "we are flushing");
2196     gst_buffer_unref (*buf);
2197     *buf = NULL;
2198     return GST_FLOW_WRONG_STATE;
2199   }
2200 eos:
2201   {
2202     GST_DEBUG_OBJECT (src, "we are EOS");
2203     return GST_FLOW_UNEXPECTED;
2204   }
2205 }
2206
2207 static GstFlowReturn
2208 gst_base_src_pad_get_range (GstPad * pad, guint64 offset, guint length,
2209     GstBuffer ** buf)
2210 {
2211   GstBaseSrc *src;
2212   GstFlowReturn res;
2213
2214   src = GST_BASE_SRC (gst_pad_get_parent (pad));
2215
2216   GST_LIVE_LOCK (src);
2217   if (G_UNLIKELY (src->priv->flushing))
2218     goto flushing;
2219
2220   res = gst_base_src_get_range (src, offset, length, buf);
2221
2222 done:
2223   GST_LIVE_UNLOCK (src);
2224
2225   gst_object_unref (src);
2226
2227   return res;
2228
2229   /* ERRORS */
2230 flushing:
2231   {
2232     GST_DEBUG_OBJECT (src, "we are flushing");
2233     res = GST_FLOW_WRONG_STATE;
2234     goto done;
2235   }
2236 }
2237
2238 static gboolean
2239 gst_base_src_default_check_get_range (GstBaseSrc * src)
2240 {
2241   gboolean res;
2242
2243   if (!GST_OBJECT_FLAG_IS_SET (src, GST_BASE_SRC_STARTED)) {
2244     GST_LOG_OBJECT (src, "doing start/stop to check get_range support");
2245     if (G_LIKELY (gst_base_src_start (src)))
2246       gst_base_src_stop (src);
2247   }
2248
2249   /* we can operate in getrange mode if the native format is bytes
2250    * and we are seekable, this condition is set in the random_access
2251    * flag and is set in the _start() method. */
2252   res = src->random_access;
2253
2254   return res;
2255 }
2256
2257 static gboolean
2258 gst_base_src_check_get_range (GstBaseSrc * src)
2259 {
2260   GstBaseSrcClass *bclass;
2261   gboolean res;
2262
2263   bclass = GST_BASE_SRC_GET_CLASS (src);
2264
2265   if (bclass->check_get_range == NULL)
2266     goto no_function;
2267
2268   res = bclass->check_get_range (src);
2269   GST_LOG_OBJECT (src, "%s() returned %d",
2270       GST_DEBUG_FUNCPTR_NAME (bclass->check_get_range), (gint) res);
2271
2272   return res;
2273
2274   /* ERRORS */
2275 no_function:
2276   {
2277     GST_WARNING_OBJECT (src, "no check_get_range function set");
2278     return FALSE;
2279   }
2280 }
2281
2282 static gboolean
2283 gst_base_src_pad_check_get_range (GstPad * pad)
2284 {
2285   GstBaseSrc *src;
2286   gboolean res;
2287
2288   src = GST_BASE_SRC (GST_OBJECT_PARENT (pad));
2289
2290   res = gst_base_src_check_get_range (src);
2291
2292   return res;
2293 }
2294
2295 static void
2296 gst_base_src_loop (GstPad * pad)
2297 {
2298   GstBaseSrc *src;
2299   GstBuffer *buf = NULL;
2300   GstFlowReturn ret;
2301   gint64 position;
2302   gboolean eos;
2303   gulong blocksize;
2304   GList *tags, *tmp;
2305
2306   eos = FALSE;
2307
2308   src = GST_BASE_SRC (GST_OBJECT_PARENT (pad));
2309
2310   GST_LIVE_LOCK (src);
2311
2312   if (G_UNLIKELY (src->priv->flushing))
2313     goto flushing;
2314
2315   src->priv->last_sent_eos = FALSE;
2316
2317   blocksize = src->blocksize;
2318
2319   GST_OBJECT_LOCK (src);
2320   /* if we operate in bytes, we can calculate an offset */
2321   if (src->segment.format == GST_FORMAT_BYTES) {
2322     position = src->segment.last_stop;
2323     /* for negative rates, start with subtracting the blocksize */
2324     if (src->segment.rate < 0.0) {
2325       /* we cannot go below segment.start */
2326       if (position > src->segment.start + blocksize)
2327         position -= blocksize;
2328       else {
2329         /* last block, remainder up to segment.start */
2330         blocksize = position - src->segment.start;
2331         position = src->segment.start;
2332       }
2333     }
2334   } else
2335     position = -1;
2336   GST_OBJECT_UNLOCK (src);
2337
2338   GST_LOG_OBJECT (src, "next_ts %" GST_TIME_FORMAT " size %lu",
2339       GST_TIME_ARGS (position), blocksize);
2340
2341   ret = gst_base_src_get_range (src, position, blocksize, &buf);
2342   if (G_UNLIKELY (ret != GST_FLOW_OK)) {
2343     GST_INFO_OBJECT (src, "pausing after gst_base_src_get_range() = %s",
2344         gst_flow_get_name (ret));
2345     GST_LIVE_UNLOCK (src);
2346     goto pause;
2347   }
2348   /* this should not happen */
2349   if (G_UNLIKELY (buf == NULL))
2350     goto null_buffer;
2351
2352   /* push events to close/start our segment before we push the buffer. */
2353   if (G_UNLIKELY (src->priv->close_segment)) {
2354     gst_pad_push_event (pad, src->priv->close_segment);
2355     src->priv->close_segment = NULL;
2356   }
2357   if (G_UNLIKELY (src->priv->start_segment)) {
2358     gst_pad_push_event (pad, src->priv->start_segment);
2359     src->priv->start_segment = NULL;
2360   }
2361
2362   GST_OBJECT_LOCK (src);
2363   /* take the tags */
2364   tags = src->priv->pending_tags;
2365   src->priv->pending_tags = NULL;
2366   GST_OBJECT_UNLOCK (src);
2367
2368   /* Push out pending tags if any */
2369   if (G_UNLIKELY (tags != NULL)) {
2370     for (tmp = tags; tmp; tmp = g_list_next (tmp)) {
2371       GstEvent *ev = (GstEvent *) tmp->data;
2372       gst_pad_push_event (pad, ev);
2373     }
2374     g_list_free (tags);
2375   }
2376
2377   GST_OBJECT_LOCK (src);
2378   /* figure out the new position */
2379   switch (src->segment.format) {
2380     case GST_FORMAT_BYTES:
2381     {
2382       guint bufsize = GST_BUFFER_SIZE (buf);
2383
2384       /* we subtracted above for negative rates */
2385       if (src->segment.rate >= 0.0)
2386         position += bufsize;
2387       break;
2388     }
2389     case GST_FORMAT_TIME:
2390     {
2391       GstClockTime start, duration;
2392
2393       start = GST_BUFFER_TIMESTAMP (buf);
2394       duration = GST_BUFFER_DURATION (buf);
2395
2396       if (GST_CLOCK_TIME_IS_VALID (start))
2397         position = start;
2398       else
2399         position = src->segment.last_stop;
2400
2401       if (GST_CLOCK_TIME_IS_VALID (duration)) {
2402         if (src->segment.rate >= 0.0)
2403           position += duration;
2404         else if (position > duration)
2405           position -= duration;
2406         else
2407           position = 0;
2408       }
2409       break;
2410     }
2411     case GST_FORMAT_DEFAULT:
2412       if (src->segment.rate >= 0.0)
2413         position = GST_BUFFER_OFFSET_END (buf);
2414       else
2415         position = GST_BUFFER_OFFSET (buf);
2416       break;
2417     default:
2418       position = -1;
2419       break;
2420   }
2421   if (position != -1) {
2422     if (src->segment.rate >= 0.0) {
2423       /* positive rate, check if we reached the stop */
2424       if (src->segment.stop != -1) {
2425         if (position >= src->segment.stop) {
2426           eos = TRUE;
2427           position = src->segment.stop;
2428         }
2429       }
2430     } else {
2431       /* negative rate, check if we reached the start. start is always set to
2432        * something different from -1 */
2433       if (position <= src->segment.start) {
2434         eos = TRUE;
2435         position = src->segment.start;
2436       }
2437       /* when going reverse, all buffers are DISCONT */
2438       src->priv->discont = TRUE;
2439     }
2440     gst_segment_set_last_stop (&src->segment, src->segment.format, position);
2441   }
2442   GST_OBJECT_UNLOCK (src);
2443
2444   if (G_UNLIKELY (src->priv->discont)) {
2445     buf = gst_buffer_make_metadata_writable (buf);
2446     GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_DISCONT);
2447     src->priv->discont = FALSE;
2448   }
2449   GST_LIVE_UNLOCK (src);
2450
2451   ret = gst_pad_push (pad, buf);
2452   if (G_UNLIKELY (ret != GST_FLOW_OK)) {
2453     GST_INFO_OBJECT (src, "pausing after gst_pad_push() = %s",
2454         gst_flow_get_name (ret));
2455     goto pause;
2456   }
2457
2458   if (G_UNLIKELY (eos)) {
2459     GST_INFO_OBJECT (src, "pausing after end of segment");
2460     ret = GST_FLOW_UNEXPECTED;
2461     goto pause;
2462   }
2463
2464 done:
2465   return;
2466
2467   /* special cases */
2468 flushing:
2469   {
2470     GST_DEBUG_OBJECT (src, "we are flushing");
2471     GST_LIVE_UNLOCK (src);
2472     ret = GST_FLOW_WRONG_STATE;
2473     goto pause;
2474   }
2475 pause:
2476   {
2477     const gchar *reason = gst_flow_get_name (ret);
2478     GstEvent *event;
2479
2480     GST_DEBUG_OBJECT (src, "pausing task, reason %s", reason);
2481     src->data.ABI.running = FALSE;
2482     gst_pad_pause_task (pad);
2483     if (GST_FLOW_IS_FATAL (ret) || ret == GST_FLOW_NOT_LINKED) {
2484       if (ret == GST_FLOW_UNEXPECTED) {
2485         gboolean flag_segment;
2486         GstFormat format;
2487         gint64 last_stop;
2488
2489         /* perform EOS logic */
2490         GST_OBJECT_LOCK (src);
2491         flag_segment = (src->segment.flags & GST_SEEK_FLAG_SEGMENT) != 0;
2492         format = src->segment.format;
2493         last_stop = src->segment.last_stop;
2494         GST_OBJECT_UNLOCK (src);
2495
2496         if (flag_segment) {
2497           GstMessage *message;
2498
2499           message = gst_message_new_segment_done (GST_OBJECT_CAST (src),
2500               format, last_stop);
2501           gst_message_set_seqnum (message, src->priv->seqnum);
2502           gst_element_post_message (GST_ELEMENT_CAST (src), message);
2503         } else {
2504           event = gst_event_new_eos ();
2505           gst_event_set_seqnum (event, src->priv->seqnum);
2506           gst_pad_push_event (pad, event);
2507           src->priv->last_sent_eos = TRUE;
2508         }
2509       } else {
2510         event = gst_event_new_eos ();
2511         gst_event_set_seqnum (event, src->priv->seqnum);
2512         /* for fatal errors we post an error message, post the error
2513          * first so the app knows about the error first. */
2514         GST_ELEMENT_ERROR (src, STREAM, FAILED,
2515             (_("Internal data flow error.")),
2516             ("streaming task paused, reason %s (%d)", reason, ret));
2517         gst_pad_push_event (pad, event);
2518         src->priv->last_sent_eos = TRUE;
2519       }
2520     }
2521     goto done;
2522   }
2523 null_buffer:
2524   {
2525     GST_ELEMENT_ERROR (src, STREAM, FAILED,
2526         (_("Internal data flow error.")), ("element returned NULL buffer"));
2527     GST_LIVE_UNLOCK (src);
2528     /* we finished the segment on error */
2529     ret = GST_FLOW_ERROR;
2530     goto done;
2531   }
2532 }
2533
2534 /* default negotiation code.
2535  *
2536  * Take intersection between src and sink pads, take first
2537  * caps and fixate.
2538  */
2539 static gboolean
2540 gst_base_src_default_negotiate (GstBaseSrc * basesrc)
2541 {
2542   GstCaps *thiscaps;
2543   GstCaps *caps = NULL;
2544   GstCaps *peercaps = NULL;
2545   gboolean result = FALSE;
2546
2547   /* first see what is possible on our source pad */
2548   thiscaps = gst_pad_get_caps_reffed (GST_BASE_SRC_PAD (basesrc));
2549   GST_DEBUG_OBJECT (basesrc, "caps of src: %" GST_PTR_FORMAT, thiscaps);
2550   /* nothing or anything is allowed, we're done */
2551   if (thiscaps == NULL || gst_caps_is_any (thiscaps))
2552     goto no_nego_needed;
2553
2554   if (G_UNLIKELY (gst_caps_is_empty (thiscaps)))
2555     goto no_caps;
2556
2557   /* get the peer caps */
2558   peercaps = gst_pad_peer_get_caps_reffed (GST_BASE_SRC_PAD (basesrc));
2559   GST_DEBUG_OBJECT (basesrc, "caps of peer: %" GST_PTR_FORMAT, peercaps);
2560   if (peercaps) {
2561     GstCaps *icaps;
2562
2563     /* get intersection */
2564     icaps = gst_caps_intersect (thiscaps, peercaps);
2565     GST_DEBUG_OBJECT (basesrc, "intersect: %" GST_PTR_FORMAT, icaps);
2566     gst_caps_unref (thiscaps);
2567     gst_caps_unref (peercaps);
2568     if (icaps) {
2569       /* take first (and best, since they are sorted) possibility */
2570       caps = gst_caps_copy_nth (icaps, 0);
2571       gst_caps_unref (icaps);
2572     }
2573   } else {
2574     /* no peer, work with our own caps then */
2575     caps = thiscaps;
2576   }
2577   if (caps) {
2578     caps = gst_caps_make_writable (caps);
2579     gst_caps_truncate (caps);
2580
2581     /* now fixate */
2582     if (!gst_caps_is_empty (caps)) {
2583       gst_pad_fixate_caps (GST_BASE_SRC_PAD (basesrc), caps);
2584       GST_DEBUG_OBJECT (basesrc, "fixated to: %" GST_PTR_FORMAT, caps);
2585
2586       if (gst_caps_is_any (caps)) {
2587         /* hmm, still anything, so element can do anything and
2588          * nego is not needed */
2589         result = TRUE;
2590       } else if (gst_caps_is_fixed (caps)) {
2591         /* yay, fixed caps, use those then, it's possible that the subclass does
2592          * not accept this caps after all and we have to fail. */
2593         result = gst_pad_set_caps (GST_BASE_SRC_PAD (basesrc), caps);
2594       }
2595     }
2596     gst_caps_unref (caps);
2597   } else {
2598     GST_DEBUG_OBJECT (basesrc, "no common caps");
2599   }
2600   return result;
2601
2602 no_nego_needed:
2603   {
2604     GST_DEBUG_OBJECT (basesrc, "no negotiation needed");
2605     if (thiscaps)
2606       gst_caps_unref (thiscaps);
2607     return TRUE;
2608   }
2609 no_caps:
2610   {
2611     GST_ELEMENT_ERROR (basesrc, STREAM, FORMAT,
2612         ("No supported formats found"),
2613         ("This element did not produce valid caps"));
2614     if (thiscaps)
2615       gst_caps_unref (thiscaps);
2616     return TRUE;
2617   }
2618 }
2619
2620 static gboolean
2621 gst_base_src_negotiate (GstBaseSrc * basesrc)
2622 {
2623   GstBaseSrcClass *bclass;
2624   gboolean result = TRUE;
2625
2626   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2627
2628   if (bclass->negotiate)
2629     result = bclass->negotiate (basesrc);
2630
2631   return result;
2632 }
2633
2634 static gboolean
2635 gst_base_src_start (GstBaseSrc * basesrc)
2636 {
2637   GstBaseSrcClass *bclass;
2638   gboolean result;
2639   guint64 size;
2640   gboolean seekable;
2641   GstFormat format;
2642
2643   if (GST_OBJECT_FLAG_IS_SET (basesrc, GST_BASE_SRC_STARTED))
2644     return TRUE;
2645
2646   GST_DEBUG_OBJECT (basesrc, "starting source");
2647
2648   basesrc->num_buffers_left = basesrc->num_buffers;
2649
2650   GST_OBJECT_LOCK (basesrc);
2651   gst_segment_init (&basesrc->segment, basesrc->segment.format);
2652   GST_OBJECT_UNLOCK (basesrc);
2653
2654   basesrc->data.ABI.running = FALSE;
2655
2656   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2657   if (bclass->start)
2658     result = bclass->start (basesrc);
2659   else
2660     result = TRUE;
2661
2662   if (!result)
2663     goto could_not_start;
2664
2665   GST_OBJECT_FLAG_SET (basesrc, GST_BASE_SRC_STARTED);
2666
2667   GST_OBJECT_LOCK (basesrc);
2668   format = basesrc->segment.format;
2669   GST_OBJECT_UNLOCK (basesrc);
2670
2671   /* figure out the size */
2672   if (format == GST_FORMAT_BYTES) {
2673     if (bclass->get_size) {
2674       if (!(result = bclass->get_size (basesrc, &size)))
2675         size = -1;
2676     } else {
2677       result = FALSE;
2678       size = -1;
2679     }
2680     GST_DEBUG_OBJECT (basesrc, "setting size %" G_GUINT64_FORMAT, size);
2681     /* only update the size when operating in bytes, subclass is supposed
2682      * to set duration in the start method for other formats */
2683     GST_OBJECT_LOCK (basesrc);
2684     gst_segment_set_duration (&basesrc->segment, GST_FORMAT_BYTES, size);
2685     GST_OBJECT_UNLOCK (basesrc);
2686   } else {
2687     size = -1;
2688   }
2689
2690   GST_DEBUG_OBJECT (basesrc,
2691       "format: %d, have size: %d, size: %" G_GUINT64_FORMAT ", duration: %"
2692       G_GINT64_FORMAT, format, result, size, basesrc->segment.duration);
2693
2694   seekable = gst_base_src_seekable (basesrc);
2695   GST_DEBUG_OBJECT (basesrc, "is seekable: %d", seekable);
2696
2697   /* update for random access flag */
2698   basesrc->random_access = seekable && format == GST_FORMAT_BYTES;
2699
2700   GST_DEBUG_OBJECT (basesrc, "is random_access: %d", basesrc->random_access);
2701
2702   /* run typefind if we are random_access and the typefinding is enabled. */
2703   if (basesrc->random_access && basesrc->data.ABI.typefind && size != -1) {
2704     GstCaps *caps;
2705
2706     if (!(caps = gst_type_find_helper (basesrc->srcpad, size)))
2707       goto typefind_failed;
2708
2709     result = gst_pad_set_caps (basesrc->srcpad, caps);
2710     gst_caps_unref (caps);
2711   } else {
2712     /* use class or default negotiate function */
2713     if (!(result = gst_base_src_negotiate (basesrc)))
2714       goto could_not_negotiate;
2715   }
2716
2717   return result;
2718
2719   /* ERROR */
2720 could_not_start:
2721   {
2722     GST_DEBUG_OBJECT (basesrc, "could not start");
2723     /* subclass is supposed to post a message. We don't have to call _stop. */
2724     return FALSE;
2725   }
2726 could_not_negotiate:
2727   {
2728     GST_DEBUG_OBJECT (basesrc, "could not negotiate, stopping");
2729     GST_ELEMENT_ERROR (basesrc, STREAM, FORMAT,
2730         ("Could not negotiate format"), ("Check your filtered caps, if any"));
2731     /* we must call stop */
2732     gst_base_src_stop (basesrc);
2733     return FALSE;
2734   }
2735 typefind_failed:
2736   {
2737     GST_DEBUG_OBJECT (basesrc, "could not typefind, stopping");
2738     GST_ELEMENT_ERROR (basesrc, STREAM, TYPE_NOT_FOUND, (NULL), (NULL));
2739     /* we must call stop */
2740     gst_base_src_stop (basesrc);
2741     return FALSE;
2742   }
2743 }
2744
2745 static gboolean
2746 gst_base_src_stop (GstBaseSrc * basesrc)
2747 {
2748   GstBaseSrcClass *bclass;
2749   gboolean result = TRUE;
2750
2751   if (!GST_OBJECT_FLAG_IS_SET (basesrc, GST_BASE_SRC_STARTED))
2752     return TRUE;
2753
2754   GST_DEBUG_OBJECT (basesrc, "stopping source");
2755
2756   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2757   if (bclass->stop)
2758     result = bclass->stop (basesrc);
2759
2760   if (result)
2761     GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_STARTED);
2762
2763   return result;
2764 }
2765
2766 /* start or stop flushing dataprocessing
2767  */
2768 static gboolean
2769 gst_base_src_set_flushing (GstBaseSrc * basesrc,
2770     gboolean flushing, gboolean live_play, gboolean unlock, gboolean * playing)
2771 {
2772   GstBaseSrcClass *bclass;
2773
2774   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2775
2776   if (flushing && unlock) {
2777     /* unlock any subclasses, we need to do this before grabbing the
2778      * LIVE_LOCK since we hold this lock before going into ::create. We pass an
2779      * unlock to the params because of backwards compat (see seek handler)*/
2780     if (bclass->unlock)
2781       bclass->unlock (basesrc);
2782   }
2783
2784   /* the live lock is released when we are blocked, waiting for playing or
2785    * when we sync to the clock. */
2786   GST_LIVE_LOCK (basesrc);
2787   if (playing)
2788     *playing = basesrc->live_running;
2789   basesrc->priv->flushing = flushing;
2790   if (flushing) {
2791     /* if we are locked in the live lock, signal it to make it flush */
2792     basesrc->live_running = TRUE;
2793
2794     /* clear pending EOS if any */
2795     g_atomic_int_set (&basesrc->priv->pending_eos, FALSE);
2796
2797     /* step 1, now that we have the LIVE lock, clear our unlock request */
2798     if (bclass->unlock_stop)
2799       bclass->unlock_stop (basesrc);
2800
2801     /* step 2, unblock clock sync (if any) or any other blocking thing */
2802     if (basesrc->clock_id)
2803       gst_clock_id_unschedule (basesrc->clock_id);
2804   } else {
2805     /* signal the live source that it can start playing */
2806     basesrc->live_running = live_play;
2807   }
2808   GST_LIVE_SIGNAL (basesrc);
2809   GST_LIVE_UNLOCK (basesrc);
2810
2811   return TRUE;
2812 }
2813
2814 /* the purpose of this function is to make sure that a live source blocks in the
2815  * LIVE lock or leaves the LIVE lock and continues playing. */
2816 static gboolean
2817 gst_base_src_set_playing (GstBaseSrc * basesrc, gboolean live_play)
2818 {
2819   GstBaseSrcClass *bclass;
2820
2821   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2822
2823   /* unlock subclasses locked in ::create, we only do this when we stop playing. */
2824   if (!live_play) {
2825     GST_DEBUG_OBJECT (basesrc, "unlock");
2826     if (bclass->unlock)
2827       bclass->unlock (basesrc);
2828   }
2829
2830   /* we are now able to grab the LIVE lock, when we get it, we can be
2831    * waiting for PLAYING while blocked in the LIVE cond or we can be waiting
2832    * for the clock. */
2833   GST_LIVE_LOCK (basesrc);
2834   GST_DEBUG_OBJECT (basesrc, "unschedule clock");
2835
2836   /* unblock clock sync (if any) */
2837   if (basesrc->clock_id)
2838     gst_clock_id_unschedule (basesrc->clock_id);
2839
2840   /* configure what to do when we get to the LIVE lock. */
2841   GST_DEBUG_OBJECT (basesrc, "live running %d", live_play);
2842   basesrc->live_running = live_play;
2843
2844   if (live_play) {
2845     gboolean start;
2846
2847     /* clear our unlock request when going to PLAYING */
2848     GST_DEBUG_OBJECT (basesrc, "unlock stop");
2849     if (bclass->unlock_stop)
2850       bclass->unlock_stop (basesrc);
2851
2852     /* for live sources we restart the timestamp correction */
2853     basesrc->priv->latency = -1;
2854     /* have to restart the task in case it stopped because of the unlock when
2855      * we went to PAUSED. Only do this if we operating in push mode. */
2856     GST_OBJECT_LOCK (basesrc->srcpad);
2857     start = (GST_PAD_ACTIVATE_MODE (basesrc->srcpad) == GST_ACTIVATE_PUSH);
2858     GST_OBJECT_UNLOCK (basesrc->srcpad);
2859     if (start)
2860       gst_pad_start_task (basesrc->srcpad, (GstTaskFunction) gst_base_src_loop,
2861           basesrc->srcpad);
2862     GST_DEBUG_OBJECT (basesrc, "signal");
2863     GST_LIVE_SIGNAL (basesrc);
2864   }
2865   GST_LIVE_UNLOCK (basesrc);
2866
2867   return TRUE;
2868 }
2869
2870 static gboolean
2871 gst_base_src_activate_push (GstPad * pad, gboolean active)
2872 {
2873   GstBaseSrc *basesrc;
2874   GstEvent *event;
2875
2876   basesrc = GST_BASE_SRC (GST_OBJECT_PARENT (pad));
2877
2878   /* prepare subclass first */
2879   if (active) {
2880     GST_DEBUG_OBJECT (basesrc, "Activating in push mode");
2881
2882     if (G_UNLIKELY (!basesrc->can_activate_push))
2883       goto no_push_activation;
2884
2885     if (G_UNLIKELY (!gst_base_src_start (basesrc)))
2886       goto error_start;
2887
2888     basesrc->priv->last_sent_eos = FALSE;
2889     basesrc->priv->discont = TRUE;
2890     gst_base_src_set_flushing (basesrc, FALSE, FALSE, FALSE, NULL);
2891
2892     /* do initial seek, which will start the task */
2893     GST_OBJECT_LOCK (basesrc);
2894     event = basesrc->data.ABI.pending_seek;
2895     basesrc->data.ABI.pending_seek = NULL;
2896     GST_OBJECT_UNLOCK (basesrc);
2897
2898     /* no need to unlock anything, the task is certainly
2899      * not running here. The perform seek code will start the task when
2900      * finished. */
2901     if (G_UNLIKELY (!gst_base_src_perform_seek (basesrc, event, FALSE)))
2902       goto seek_failed;
2903
2904     if (event)
2905       gst_event_unref (event);
2906   } else {
2907     GST_DEBUG_OBJECT (basesrc, "Deactivating in push mode");
2908     /* flush all */
2909     gst_base_src_set_flushing (basesrc, TRUE, FALSE, TRUE, NULL);
2910     /* stop the task */
2911     gst_pad_stop_task (pad);
2912     /* now we can stop the source */
2913     if (G_UNLIKELY (!gst_base_src_stop (basesrc)))
2914       goto error_stop;
2915   }
2916   return TRUE;
2917
2918   /* ERRORS */
2919 no_push_activation:
2920   {
2921     GST_WARNING_OBJECT (basesrc, "Subclass disabled push-mode activation");
2922     return FALSE;
2923   }
2924 error_start:
2925   {
2926     GST_WARNING_OBJECT (basesrc, "Failed to start in push mode");
2927     return FALSE;
2928   }
2929 seek_failed:
2930   {
2931     GST_ERROR_OBJECT (basesrc, "Failed to perform initial seek");
2932     /* flush all */
2933     gst_base_src_set_flushing (basesrc, TRUE, FALSE, TRUE, NULL);
2934     /* stop the task */
2935     gst_pad_stop_task (pad);
2936     /* Stop the basesrc */
2937     gst_base_src_stop (basesrc);
2938     if (event)
2939       gst_event_unref (event);
2940     return FALSE;
2941   }
2942 error_stop:
2943   {
2944     GST_DEBUG_OBJECT (basesrc, "Failed to stop in push mode");
2945     return FALSE;
2946   }
2947 }
2948
2949 static gboolean
2950 gst_base_src_activate_pull (GstPad * pad, gboolean active)
2951 {
2952   GstBaseSrc *basesrc;
2953
2954   basesrc = GST_BASE_SRC (GST_OBJECT_PARENT (pad));
2955
2956   /* prepare subclass first */
2957   if (active) {
2958     GST_DEBUG_OBJECT (basesrc, "Activating in pull mode");
2959     if (G_UNLIKELY (!gst_base_src_start (basesrc)))
2960       goto error_start;
2961
2962     /* if not random_access, we cannot operate in pull mode for now */
2963     if (G_UNLIKELY (!gst_base_src_check_get_range (basesrc)))
2964       goto no_get_range;
2965
2966     /* stop flushing now but for live sources, still block in the LIVE lock when
2967      * we are not yet PLAYING */
2968     gst_base_src_set_flushing (basesrc, FALSE, FALSE, FALSE, NULL);
2969   } else {
2970     GST_DEBUG_OBJECT (basesrc, "Deactivating in pull mode");
2971     /* flush all, there is no task to stop */
2972     gst_base_src_set_flushing (basesrc, TRUE, FALSE, TRUE, NULL);
2973
2974     /* don't send EOS when going from PAUSED => READY when in pull mode */
2975     basesrc->priv->last_sent_eos = TRUE;
2976
2977     if (G_UNLIKELY (!gst_base_src_stop (basesrc)))
2978       goto error_stop;
2979   }
2980   return TRUE;
2981
2982   /* ERRORS */
2983 error_start:
2984   {
2985     GST_ERROR_OBJECT (basesrc, "Failed to start in pull mode");
2986     return FALSE;
2987   }
2988 no_get_range:
2989   {
2990     GST_ERROR_OBJECT (basesrc, "Cannot operate in pull mode, stopping");
2991     gst_base_src_stop (basesrc);
2992     return FALSE;
2993   }
2994 error_stop:
2995   {
2996     GST_ERROR_OBJECT (basesrc, "Failed to stop in pull mode");
2997     return FALSE;
2998   }
2999 }
3000
3001 static GstStateChangeReturn
3002 gst_base_src_change_state (GstElement * element, GstStateChange transition)
3003 {
3004   GstBaseSrc *basesrc;
3005   GstStateChangeReturn result;
3006   gboolean no_preroll = FALSE;
3007
3008   basesrc = GST_BASE_SRC (element);
3009
3010   switch (transition) {
3011     case GST_STATE_CHANGE_NULL_TO_READY:
3012       break;
3013     case GST_STATE_CHANGE_READY_TO_PAUSED:
3014       no_preroll = gst_base_src_is_live (basesrc);
3015       break;
3016     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
3017       GST_DEBUG_OBJECT (basesrc, "PAUSED->PLAYING");
3018       if (gst_base_src_is_live (basesrc)) {
3019         /* now we can start playback */
3020         gst_base_src_set_playing (basesrc, TRUE);
3021       }
3022       break;
3023     default:
3024       break;
3025   }
3026
3027   if ((result =
3028           GST_ELEMENT_CLASS (parent_class)->change_state (element,
3029               transition)) == GST_STATE_CHANGE_FAILURE)
3030     goto failure;
3031
3032   switch (transition) {
3033     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
3034       GST_DEBUG_OBJECT (basesrc, "PLAYING->PAUSED");
3035       if (gst_base_src_is_live (basesrc)) {
3036         /* make sure we block in the live lock in PAUSED */
3037         gst_base_src_set_playing (basesrc, FALSE);
3038         no_preroll = TRUE;
3039       }
3040       break;
3041     case GST_STATE_CHANGE_PAUSED_TO_READY:
3042     {
3043       GstEvent **event_p, *event;
3044
3045       /* we don't need to unblock anything here, the pad deactivation code
3046        * already did this */
3047
3048       /* FIXME, deprecate this behaviour, it is very dangerous.
3049        * the prefered way of sending EOS downstream is by sending
3050        * the EOS event to the element */
3051       if (!basesrc->priv->last_sent_eos) {
3052         GST_DEBUG_OBJECT (basesrc, "Sending EOS event");
3053         event = gst_event_new_eos ();
3054         gst_event_set_seqnum (event, basesrc->priv->seqnum);
3055         gst_pad_push_event (basesrc->srcpad, event);
3056         basesrc->priv->last_sent_eos = TRUE;
3057       }
3058       g_atomic_int_set (&basesrc->priv->pending_eos, FALSE);
3059       event_p = &basesrc->data.ABI.pending_seek;
3060       gst_event_replace (event_p, NULL);
3061       event_p = &basesrc->priv->close_segment;
3062       gst_event_replace (event_p, NULL);
3063       event_p = &basesrc->priv->start_segment;
3064       gst_event_replace (event_p, NULL);
3065       break;
3066     }
3067     case GST_STATE_CHANGE_READY_TO_NULL:
3068       break;
3069     default:
3070       break;
3071   }
3072
3073   if (no_preroll && result == GST_STATE_CHANGE_SUCCESS)
3074     result = GST_STATE_CHANGE_NO_PREROLL;
3075
3076   return result;
3077
3078   /* ERRORS */
3079 failure:
3080   {
3081     GST_DEBUG_OBJECT (basesrc, "parent failed state change");
3082     return result;
3083   }
3084 }