Remove some dead assignments
[platform/upstream/gstreamer.git] / libs / gst / base / gstbasesrc.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *               2000,2005 Wim Taymans <wim@fluendo.com>
4  *
5  * gstbasesrc.c:
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
20  * Boston, MA 02111-1307, USA.
21  */
22
23 /**
24  * SECTION:gstbasesrc
25  * @short_description: Base class for getrange based source elements
26  * @see_also: #GstPushSrc, #GstBaseTransform, #GstBaseSink
27  *
28  * This is a generice base class for source elements. The following
29  * types of sources are supported:
30  * <itemizedlist>
31  *   <listitem><para>random access sources like files</para></listitem>
32  *   <listitem><para>seekable sources</para></listitem>
33  *   <listitem><para>live sources</para></listitem>
34  * </itemizedlist>
35  *
36  * The source can be configured to operate in any #GstFormat with the
37  * gst_base_src_set_format() method. The currently set format determines
38  * the format of the internal #GstSegment and any #GST_EVENT_NEWSEGMENT
39  * events. The default format for #GstBaseSrc is #GST_FORMAT_BYTES.
40  *
41  * #GstBaseSrc always supports push mode scheduling. If the following
42  * conditions are met, it also supports pull mode scheduling:
43  * <itemizedlist>
44  *   <listitem><para>The format is set to #GST_FORMAT_BYTES (default).</para>
45  *   </listitem>
46  *   <listitem><para>#GstBaseSrcClass.is_seekable() returns %TRUE.</para>
47  *   </listitem>
48  * </itemizedlist>
49  *
50  * Since 0.10.9, any #GstBaseSrc can enable pull based scheduling at any time
51  * by overriding #GstBaseSrcClass.check_get_range() so that it returns %TRUE.
52  *
53  * If all the conditions are met for operating in pull mode, #GstBaseSrc is
54  * automatically seekable in push mode as well. The following conditions must
55  * be met to make the element seekable in push mode when the format is not
56  * #GST_FORMAT_BYTES:
57  * <itemizedlist>
58  *   <listitem><para>
59  *     #GstBaseSrcClass.is_seekable() returns %TRUE.
60  *   </para></listitem>
61  *   <listitem><para>
62  *     #GstBaseSrc:Class.query() can convert all supported seek formats to the
63  *     internal format as set with gst_base_src_set_format().
64  *   </para></listitem>
65  *   <listitem><para>
66  *     #GstBaseSrcClass.do_seek() is implemented, performs the seek and returns
67  *      %TRUE.
68  *   </para></listitem>
69  * </itemizedlist>
70  *
71  * When the element does not meet the requirements to operate in pull mode, the
72  * offset and length in the #GstBaseSrcClass.create() method should be ignored.
73  * It is recommended to subclass #GstPushSrc instead, in this situation. If the
74  * element can operate in pull mode but only with specific offsets and
75  * lengths, it is allowed to generate an error when the wrong values are passed
76  * to the #GstBaseSrcClass.create() function.
77  *
78  * #GstBaseSrc has support for live sources. Live sources are sources that when
79  * paused discard data, such as audio or video capture devices. A typical live
80  * source also produces data at a fixed rate and thus provides a clock to publish
81  * this rate.
82  * Use gst_base_src_set_live() to activate the live source mode.
83  *
84  * A live source does not produce data in the PAUSED state. This means that the
85  * #GstBaseSrcClass.create() method will not be called in PAUSED but only in
86  * PLAYING. To signal the pipeline that the element will not produce data, the
87  * return value from the READY to PAUSED state will be
88  * #GST_STATE_CHANGE_NO_PREROLL.
89  *
90  * A typical live source will timestamp the buffers it creates with the
91  * current running time of the pipeline. This is one reason why a live source
92  * can only produce data in the PLAYING state, when the clock is actually
93  * distributed and running.
94  *
95  * Live sources that synchronize and block on the clock (an audio source, for
96  * example) can since 0.10.12 use gst_base_src_wait_playing() when the
97  * #GstBaseSrcClass.create() function was interrupted by a state change to
98  * PAUSED.
99  *
100  * The #GstBaseSrcClass.get_times() method can be used to implement pseudo-live
101  * sources. It only makes sense to implement the #GstBaseSrcClass.get_times()
102  * function if the source is a live source. The #GstBaseSrcClass.get_times()
103  * function should return timestamps starting from 0, as if it were a non-live
104  * source. The base class will make sure that the timestamps are transformed
105  * into the current running_time. The base source will then wait for the
106  * calculated running_time before pushing out the buffer.
107  *
108  * For live sources, the base class will by default report a latency of 0.
109  * For pseudo live sources, the base class will by default measure the difference
110  * between the first buffer timestamp and the start time of get_times and will
111  * report this value as the latency.
112  * Subclasses should override the query function when this behaviour is not
113  * acceptable.
114  *
115  * There is only support in #GstBaseSrc for exactly one source pad, which
116  * should be named "src". A source implementation (subclass of #GstBaseSrc)
117  * should install a pad template in its class_init function, like so:
118  * |[
119  * static void
120  * my_element_class_init (GstMyElementClass *klass)
121  * {
122  *   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
123  *   // srctemplate should be a #GstStaticPadTemplate with direction
124  *   // #GST_PAD_SRC and name "src"
125  *   gst_element_class_add_pad_template (gstelement_class,
126  *       gst_static_pad_template_get (&amp;srctemplate));
127  *   // see #GstElementDetails
128  *   gst_element_class_set_details (gstelement_class, &amp;details);
129  * }
130  * ]|
131  *
132  * <refsect2>
133  * <title>Controlled shutdown of live sources in applications</title>
134  * <para>
135  * Applications that record from a live source may want to stop recording
136  * in a controlled way, so that the recording is stopped, but the data
137  * already in the pipeline is processed to the end (remember that many live
138  * sources would go on recording forever otherwise). For that to happen the
139  * application needs to make the source stop recording and send an EOS
140  * event down the pipeline. The application would then wait for an
141  * EOS message posted on the pipeline's bus to know when all data has
142  * been processed and the pipeline can safely be stopped.
143  *
144  * Since GStreamer 0.10.16 an application may send an EOS event to a source
145  * element to make it perform the EOS logic (send EOS event downstream or post a
146  * #GST_MESSAGE_SEGMENT_DONE on the bus). This can typically be done
147  * with the gst_element_send_event() function on the element or its parent bin.
148  *
149  * After the EOS has been sent to the element, the application should wait for
150  * an EOS message to be posted on the pipeline's bus. Once this EOS message is
151  * received, it may safely shut down the entire pipeline.
152  *
153  * The old behaviour for controlled shutdown introduced since GStreamer 0.10.3
154  * is still available but deprecated as it is dangerous and less flexible.
155  *
156  * Last reviewed on 2007-12-19 (0.10.16)
157  * </para>
158  * </refsect2>
159  */
160
161 #ifdef HAVE_CONFIG_H
162 #  include "config.h"
163 #endif
164
165 #include <stdlib.h>
166 #include <string.h>
167
168 #include <gst/gst_private.h>
169
170 #include "gstbasesrc.h"
171 #include "gsttypefindhelper.h"
172 #include <gst/gstmarshal.h>
173 #include <gst/gst-i18n-lib.h>
174
175 GST_DEBUG_CATEGORY_STATIC (gst_base_src_debug);
176 #define GST_CAT_DEFAULT gst_base_src_debug
177
178 #define GST_LIVE_GET_LOCK(elem)               (GST_BASE_SRC_CAST(elem)->live_lock)
179 #define GST_LIVE_LOCK(elem)                   g_mutex_lock(GST_LIVE_GET_LOCK(elem))
180 #define GST_LIVE_TRYLOCK(elem)                g_mutex_trylock(GST_LIVE_GET_LOCK(elem))
181 #define GST_LIVE_UNLOCK(elem)                 g_mutex_unlock(GST_LIVE_GET_LOCK(elem))
182 #define GST_LIVE_GET_COND(elem)               (GST_BASE_SRC_CAST(elem)->live_cond)
183 #define GST_LIVE_WAIT(elem)                   g_cond_wait (GST_LIVE_GET_COND (elem), GST_LIVE_GET_LOCK (elem))
184 #define GST_LIVE_TIMED_WAIT(elem, timeval)    g_cond_timed_wait (GST_LIVE_GET_COND (elem), GST_LIVE_GET_LOCK (elem),\
185                                                                                 timeval)
186 #define GST_LIVE_SIGNAL(elem)                 g_cond_signal (GST_LIVE_GET_COND (elem));
187 #define GST_LIVE_BROADCAST(elem)              g_cond_broadcast (GST_LIVE_GET_COND (elem));
188
189 /* BaseSrc signals and args */
190 enum
191 {
192   /* FILL ME */
193   LAST_SIGNAL
194 };
195
196 #define DEFAULT_BLOCKSIZE       4096
197 #define DEFAULT_NUM_BUFFERS     -1
198 #define DEFAULT_TYPEFIND        FALSE
199 #define DEFAULT_DO_TIMESTAMP    FALSE
200
201 enum
202 {
203   PROP_0,
204   PROP_BLOCKSIZE,
205   PROP_NUM_BUFFERS,
206   PROP_TYPEFIND,
207   PROP_DO_TIMESTAMP
208 };
209
210 #define GST_BASE_SRC_GET_PRIVATE(obj)  \
211    (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_BASE_SRC, GstBaseSrcPrivate))
212
213 struct _GstBaseSrcPrivate
214 {
215   gboolean last_sent_eos;       /* last thing we did was send an EOS (we set this
216                                  * to avoid the sending of two EOS in some cases) */
217   gboolean discont;
218   gboolean flushing;
219
220   /* two segments to be sent in the streaming thread with STREAM_LOCK */
221   GstEvent *close_segment;
222   GstEvent *start_segment;
223   gboolean newsegment_pending;
224
225   /* if EOS is pending (atomic) */
226   gint pending_eos;
227
228   /* startup latency is the time it takes between going to PLAYING and producing
229    * the first BUFFER with running_time 0. This value is included in the latency
230    * reporting. */
231   GstClockTime latency;
232   /* timestamp offset, this is the offset add to the values of gst_times for
233    * pseudo live sources */
234   GstClockTimeDiff ts_offset;
235
236   gboolean do_timestamp;
237
238   /* stream sequence number */
239   guint32 seqnum;
240
241   /* pending tags to be pushed in the data stream */
242   GList *pending_tags;
243
244   /* QoS *//* with LOCK */
245   gboolean qos_enabled;
246   gdouble proportion;
247   GstClockTime earliest_time;
248 };
249
250 static GstElementClass *parent_class = NULL;
251
252 static void gst_base_src_base_init (gpointer g_class);
253 static void gst_base_src_class_init (GstBaseSrcClass * klass);
254 static void gst_base_src_init (GstBaseSrc * src, gpointer g_class);
255 static void gst_base_src_finalize (GObject * object);
256
257
258 GType
259 gst_base_src_get_type (void)
260 {
261   static volatile gsize base_src_type = 0;
262
263   if (g_once_init_enter (&base_src_type)) {
264     GType _type;
265     static const GTypeInfo base_src_info = {
266       sizeof (GstBaseSrcClass),
267       (GBaseInitFunc) gst_base_src_base_init,
268       NULL,
269       (GClassInitFunc) gst_base_src_class_init,
270       NULL,
271       NULL,
272       sizeof (GstBaseSrc),
273       0,
274       (GInstanceInitFunc) gst_base_src_init,
275     };
276
277     _type = g_type_register_static (GST_TYPE_ELEMENT,
278         "GstBaseSrc", &base_src_info, G_TYPE_FLAG_ABSTRACT);
279     g_once_init_leave (&base_src_type, _type);
280   }
281   return base_src_type;
282 }
283
284 static GstCaps *gst_base_src_getcaps (GstPad * pad);
285 static gboolean gst_base_src_setcaps (GstPad * pad, GstCaps * caps);
286 static void gst_base_src_fixate (GstPad * pad, GstCaps * caps);
287
288 static gboolean gst_base_src_activate_push (GstPad * pad, gboolean active);
289 static gboolean gst_base_src_activate_pull (GstPad * pad, gboolean active);
290 static void gst_base_src_set_property (GObject * object, guint prop_id,
291     const GValue * value, GParamSpec * pspec);
292 static void gst_base_src_get_property (GObject * object, guint prop_id,
293     GValue * value, GParamSpec * pspec);
294 static gboolean gst_base_src_event_handler (GstPad * pad, GstEvent * event);
295 static gboolean gst_base_src_send_event (GstElement * elem, GstEvent * event);
296 static gboolean gst_base_src_default_event (GstBaseSrc * src, GstEvent * event);
297 static const GstQueryType *gst_base_src_get_query_types (GstElement * element);
298
299 static gboolean gst_base_src_query (GstPad * pad, GstQuery * query);
300
301 static gboolean gst_base_src_default_negotiate (GstBaseSrc * basesrc);
302 static gboolean gst_base_src_default_do_seek (GstBaseSrc * src,
303     GstSegment * segment);
304 static gboolean gst_base_src_default_query (GstBaseSrc * src, GstQuery * query);
305 static gboolean gst_base_src_default_prepare_seek_segment (GstBaseSrc * src,
306     GstEvent * event, GstSegment * segment);
307
308 static gboolean gst_base_src_set_flushing (GstBaseSrc * basesrc,
309     gboolean flushing, gboolean live_play, gboolean unlock, gboolean * playing);
310 static gboolean gst_base_src_start (GstBaseSrc * basesrc);
311 static gboolean gst_base_src_stop (GstBaseSrc * basesrc);
312
313 static GstStateChangeReturn gst_base_src_change_state (GstElement * element,
314     GstStateChange transition);
315
316 static void gst_base_src_loop (GstPad * pad);
317 static gboolean gst_base_src_pad_check_get_range (GstPad * pad);
318 static gboolean gst_base_src_default_check_get_range (GstBaseSrc * bsrc);
319 static GstFlowReturn gst_base_src_pad_get_range (GstPad * pad, guint64 offset,
320     guint length, GstBuffer ** buf);
321 static GstFlowReturn gst_base_src_get_range (GstBaseSrc * src, guint64 offset,
322     guint length, GstBuffer ** buf);
323 static gboolean gst_base_src_seekable (GstBaseSrc * src);
324
325 static void
326 gst_base_src_base_init (gpointer g_class)
327 {
328   GST_DEBUG_CATEGORY_INIT (gst_base_src_debug, "basesrc", 0, "basesrc element");
329 }
330
331 static void
332 gst_base_src_class_init (GstBaseSrcClass * klass)
333 {
334   GObjectClass *gobject_class;
335   GstElementClass *gstelement_class;
336
337   gobject_class = G_OBJECT_CLASS (klass);
338   gstelement_class = GST_ELEMENT_CLASS (klass);
339
340   g_type_class_add_private (klass, sizeof (GstBaseSrcPrivate));
341
342   parent_class = g_type_class_peek_parent (klass);
343
344   gobject_class->finalize = gst_base_src_finalize;
345   gobject_class->set_property = gst_base_src_set_property;
346   gobject_class->get_property = gst_base_src_get_property;
347
348   g_object_class_install_property (gobject_class, PROP_BLOCKSIZE,
349       g_param_spec_ulong ("blocksize", "Block size",
350           "Size in bytes to read per buffer (-1 = default)", 0, G_MAXULONG,
351           DEFAULT_BLOCKSIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
352   g_object_class_install_property (gobject_class, PROP_NUM_BUFFERS,
353       g_param_spec_int ("num-buffers", "num-buffers",
354           "Number of buffers to output before sending EOS (-1 = unlimited)",
355           -1, G_MAXINT, DEFAULT_NUM_BUFFERS, G_PARAM_READWRITE |
356           G_PARAM_STATIC_STRINGS));
357   g_object_class_install_property (gobject_class, PROP_TYPEFIND,
358       g_param_spec_boolean ("typefind", "Typefind",
359           "Run typefind before negotiating", DEFAULT_TYPEFIND,
360           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
361   g_object_class_install_property (gobject_class, PROP_DO_TIMESTAMP,
362       g_param_spec_boolean ("do-timestamp", "Do timestamp",
363           "Apply current stream time to buffers", DEFAULT_DO_TIMESTAMP,
364           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
365
366   gstelement_class->change_state =
367       GST_DEBUG_FUNCPTR (gst_base_src_change_state);
368   gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_base_src_send_event);
369   gstelement_class->get_query_types =
370       GST_DEBUG_FUNCPTR (gst_base_src_get_query_types);
371
372   klass->negotiate = GST_DEBUG_FUNCPTR (gst_base_src_default_negotiate);
373   klass->event = GST_DEBUG_FUNCPTR (gst_base_src_default_event);
374   klass->do_seek = GST_DEBUG_FUNCPTR (gst_base_src_default_do_seek);
375   klass->query = GST_DEBUG_FUNCPTR (gst_base_src_default_query);
376   klass->check_get_range =
377       GST_DEBUG_FUNCPTR (gst_base_src_default_check_get_range);
378   klass->prepare_seek_segment =
379       GST_DEBUG_FUNCPTR (gst_base_src_default_prepare_seek_segment);
380
381   /* Registering debug symbols for function pointers */
382   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_activate_push);
383   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_activate_pull);
384   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_event_handler);
385   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_query);
386   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_pad_get_range);
387   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_pad_check_get_range);
388   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_getcaps);
389   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_setcaps);
390   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_fixate);
391 }
392
393 static void
394 gst_base_src_init (GstBaseSrc * basesrc, gpointer g_class)
395 {
396   GstPad *pad;
397   GstPadTemplate *pad_template;
398
399   basesrc->priv = GST_BASE_SRC_GET_PRIVATE (basesrc);
400
401   basesrc->is_live = FALSE;
402   basesrc->live_lock = g_mutex_new ();
403   basesrc->live_cond = g_cond_new ();
404   basesrc->num_buffers = DEFAULT_NUM_BUFFERS;
405   basesrc->num_buffers_left = -1;
406
407   basesrc->can_activate_push = TRUE;
408   basesrc->pad_mode = GST_ACTIVATE_NONE;
409
410   pad_template =
411       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (g_class), "src");
412   g_return_if_fail (pad_template != NULL);
413
414   GST_DEBUG_OBJECT (basesrc, "creating src pad");
415   pad = gst_pad_new_from_template (pad_template, "src");
416
417   GST_DEBUG_OBJECT (basesrc, "setting functions on src pad");
418   gst_pad_set_activatepush_function (pad, gst_base_src_activate_push);
419   gst_pad_set_activatepull_function (pad, gst_base_src_activate_pull);
420   gst_pad_set_event_function (pad, gst_base_src_event_handler);
421   gst_pad_set_query_function (pad, gst_base_src_query);
422   gst_pad_set_checkgetrange_function (pad, gst_base_src_pad_check_get_range);
423   gst_pad_set_getrange_function (pad, gst_base_src_pad_get_range);
424   gst_pad_set_getcaps_function (pad, gst_base_src_getcaps);
425   gst_pad_set_setcaps_function (pad, gst_base_src_setcaps);
426   gst_pad_set_fixatecaps_function (pad, gst_base_src_fixate);
427
428   /* hold pointer to pad */
429   basesrc->srcpad = pad;
430   GST_DEBUG_OBJECT (basesrc, "adding src pad");
431   gst_element_add_pad (GST_ELEMENT (basesrc), pad);
432
433   basesrc->blocksize = DEFAULT_BLOCKSIZE;
434   basesrc->clock_id = NULL;
435   /* we operate in BYTES by default */
436   gst_base_src_set_format (basesrc, GST_FORMAT_BYTES);
437   basesrc->data.ABI.typefind = DEFAULT_TYPEFIND;
438   basesrc->priv->do_timestamp = DEFAULT_DO_TIMESTAMP;
439
440   GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_STARTED);
441
442   GST_DEBUG_OBJECT (basesrc, "init done");
443 }
444
445 static void
446 gst_base_src_finalize (GObject * object)
447 {
448   GstBaseSrc *basesrc;
449   GstEvent **event_p;
450
451   basesrc = GST_BASE_SRC (object);
452
453   g_mutex_free (basesrc->live_lock);
454   g_cond_free (basesrc->live_cond);
455
456   event_p = &basesrc->data.ABI.pending_seek;
457   gst_event_replace (event_p, NULL);
458
459   if (basesrc->priv->pending_tags) {
460     g_list_foreach (basesrc->priv->pending_tags, (GFunc) gst_event_unref, NULL);
461     g_list_free (basesrc->priv->pending_tags);
462   }
463
464   G_OBJECT_CLASS (parent_class)->finalize (object);
465 }
466
467 /**
468  * gst_base_src_wait_playing:
469  * @src: the src
470  *
471  * If the #GstBaseSrcClass.create() method performs its own synchronisation
472  * against the clock it must unblock when going from PLAYING to the PAUSED state
473  * and call this method before continuing to produce the remaining data.
474  *
475  * This function will block until a state change to PLAYING happens (in which
476  * case this function returns #GST_FLOW_OK) or the processing must be stopped due
477  * to a state change to READY or a FLUSH event (in which case this function
478  * returns #GST_FLOW_WRONG_STATE).
479  *
480  * Since: 0.10.12
481  *
482  * Returns: #GST_FLOW_OK if @src is PLAYING and processing can
483  * continue. Any other return value should be returned from the create vmethod.
484  */
485 GstFlowReturn
486 gst_base_src_wait_playing (GstBaseSrc * src)
487 {
488   g_return_val_if_fail (GST_IS_BASE_SRC (src), GST_FLOW_ERROR);
489
490   /* block until the state changes, or we get a flush, or something */
491   GST_DEBUG_OBJECT (src, "live source waiting for running state");
492   GST_LIVE_WAIT (src);
493   if (src->priv->flushing)
494     goto flushing;
495   GST_DEBUG_OBJECT (src, "live source unlocked");
496
497   return GST_FLOW_OK;
498
499   /* ERRORS */
500 flushing:
501   {
502     GST_DEBUG_OBJECT (src, "we are flushing");
503     return GST_FLOW_WRONG_STATE;
504   }
505 }
506
507 /**
508  * gst_base_src_set_live:
509  * @src: base source instance
510  * @live: new live-mode
511  *
512  * If the element listens to a live source, @live should
513  * be set to %TRUE.
514  *
515  * A live source will not produce data in the PAUSED state and
516  * will therefore not be able to participate in the PREROLL phase
517  * of a pipeline. To signal this fact to the application and the
518  * pipeline, the state change return value of the live source will
519  * be GST_STATE_CHANGE_NO_PREROLL.
520  */
521 void
522 gst_base_src_set_live (GstBaseSrc * src, gboolean live)
523 {
524   g_return_if_fail (GST_IS_BASE_SRC (src));
525
526   GST_OBJECT_LOCK (src);
527   src->is_live = live;
528   GST_OBJECT_UNLOCK (src);
529 }
530
531 /**
532  * gst_base_src_is_live:
533  * @src: base source instance
534  *
535  * Check if an element is in live mode.
536  *
537  * Returns: %TRUE if element is in live mode.
538  */
539 gboolean
540 gst_base_src_is_live (GstBaseSrc * src)
541 {
542   gboolean result;
543
544   g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
545
546   GST_OBJECT_LOCK (src);
547   result = src->is_live;
548   GST_OBJECT_UNLOCK (src);
549
550   return result;
551 }
552
553 /**
554  * gst_base_src_set_format:
555  * @src: base source instance
556  * @format: the format to use
557  *
558  * Sets the default format of the source. This will be the format used
559  * for sending NEW_SEGMENT events and for performing seeks.
560  *
561  * If a format of GST_FORMAT_BYTES is set, the element will be able to
562  * operate in pull mode if the #GstBaseSrc.is_seekable() returns TRUE.
563  *
564  * This function must only be called in states < %GST_STATE_PAUSED.
565  *
566  * Since: 0.10.1
567  */
568 void
569 gst_base_src_set_format (GstBaseSrc * src, GstFormat format)
570 {
571   g_return_if_fail (GST_IS_BASE_SRC (src));
572   g_return_if_fail (GST_STATE (src) <= GST_STATE_READY);
573
574   GST_OBJECT_LOCK (src);
575   gst_segment_init (&src->segment, format);
576   GST_OBJECT_UNLOCK (src);
577 }
578
579 /**
580  * gst_base_src_query_latency:
581  * @src: the source
582  * @live: if the source is live
583  * @min_latency: the min latency of the source
584  * @max_latency: the max latency of the source
585  *
586  * Query the source for the latency parameters. @live will be TRUE when @src is
587  * configured as a live source. @min_latency will be set to the difference
588  * between the running time and the timestamp of the first buffer.
589  * @max_latency is always the undefined value of -1.
590  *
591  * This function is mostly used by subclasses.
592  *
593  * Returns: TRUE if the query succeeded.
594  *
595  * Since: 0.10.13
596  */
597 gboolean
598 gst_base_src_query_latency (GstBaseSrc * src, gboolean * live,
599     GstClockTime * min_latency, GstClockTime * max_latency)
600 {
601   GstClockTime min;
602
603   g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
604
605   GST_OBJECT_LOCK (src);
606   if (live)
607     *live = src->is_live;
608
609   /* if we have a startup latency, report this one, else report 0. Subclasses
610    * are supposed to override the query function if they want something
611    * else. */
612   if (src->priv->latency != -1)
613     min = src->priv->latency;
614   else
615     min = 0;
616
617   if (min_latency)
618     *min_latency = min;
619   if (max_latency)
620     *max_latency = -1;
621
622   GST_LOG_OBJECT (src, "latency: live %d, min %" GST_TIME_FORMAT
623       ", max %" GST_TIME_FORMAT, src->is_live, GST_TIME_ARGS (min),
624       GST_TIME_ARGS (-1));
625   GST_OBJECT_UNLOCK (src);
626
627   return TRUE;
628 }
629
630 /**
631  * gst_base_src_set_blocksize:
632  * @src: the source
633  * @blocksize: the new blocksize in bytes
634  *
635  * Set the number of bytes that @src will push out with each buffer. When
636  * @blocksize is set to -1, a default length will be used.
637  *
638  * Since: 0.10.22
639  */
640 void
641 gst_base_src_set_blocksize (GstBaseSrc * src, gulong blocksize)
642 {
643   g_return_if_fail (GST_IS_BASE_SRC (src));
644
645   GST_OBJECT_LOCK (src);
646   src->blocksize = blocksize;
647   GST_OBJECT_UNLOCK (src);
648 }
649
650 /**
651  * gst_base_src_get_blocksize:
652  * @src: the source
653  *
654  * Get the number of bytes that @src will push out with each buffer.
655  *
656  * Returns: the number of bytes pushed with each buffer.
657  *
658  * Since: 0.10.22
659  */
660 gulong
661 gst_base_src_get_blocksize (GstBaseSrc * src)
662 {
663   gulong res;
664
665   g_return_val_if_fail (GST_IS_BASE_SRC (src), 0);
666
667   GST_OBJECT_LOCK (src);
668   res = src->blocksize;
669   GST_OBJECT_UNLOCK (src);
670
671   return res;
672 }
673
674
675 /**
676  * gst_base_src_set_do_timestamp:
677  * @src: the source
678  * @timestamp: enable or disable timestamping
679  *
680  * Configure @src to automatically timestamp outgoing buffers based on the
681  * current running_time of the pipeline. This property is mostly useful for live
682  * sources.
683  *
684  * Since: 0.10.15
685  */
686 void
687 gst_base_src_set_do_timestamp (GstBaseSrc * src, gboolean timestamp)
688 {
689   g_return_if_fail (GST_IS_BASE_SRC (src));
690
691   GST_OBJECT_LOCK (src);
692   src->priv->do_timestamp = timestamp;
693   GST_OBJECT_UNLOCK (src);
694 }
695
696 /**
697  * gst_base_src_get_do_timestamp:
698  * @src: the source
699  *
700  * Query if @src timestamps outgoing buffers based on the current running_time.
701  *
702  * Returns: %TRUE if the base class will automatically timestamp outgoing buffers.
703  *
704  * Since: 0.10.15
705  */
706 gboolean
707 gst_base_src_get_do_timestamp (GstBaseSrc * src)
708 {
709   gboolean res;
710
711   g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
712
713   GST_OBJECT_LOCK (src);
714   res = src->priv->do_timestamp;
715   GST_OBJECT_UNLOCK (src);
716
717   return res;
718 }
719
720 /**
721  * gst_base_src_new_seamless_segment:
722  * @src: The source
723  * @start: The new start value for the segment
724  * @stop: Stop value for the new segment
725  * @position: The position value for the new segent
726  *
727  * Prepare a new seamless segment for emission downstream. This function must
728  * only be called by derived sub-classes, and only from the create() function,
729  * as the stream-lock needs to be held.
730  *
731  * The format for the new segment will be the current format of the source, as
732  * configured with gst_base_src_set_format()
733  *
734  * Returns: %TRUE if preparation of the seamless segment succeeded.
735  *
736  * Since: 0.10.26
737  */
738 gboolean
739 gst_base_src_new_seamless_segment (GstBaseSrc * src, gint64 start, gint64 stop,
740     gint64 position)
741 {
742   gboolean res = TRUE;
743
744   GST_DEBUG_OBJECT (src,
745       "Starting new seamless segment. Start %" GST_TIME_FORMAT " stop %"
746       GST_TIME_FORMAT " position %" GST_TIME_FORMAT, GST_TIME_ARGS (start),
747       GST_TIME_ARGS (stop), GST_TIME_ARGS (position));
748
749   GST_OBJECT_LOCK (src);
750   if (src->data.ABI.running && !src->priv->newsegment_pending) {
751     if (src->priv->close_segment)
752       gst_event_unref (src->priv->close_segment);
753     src->priv->close_segment =
754         gst_event_new_new_segment_full (TRUE,
755         src->segment.rate, src->segment.applied_rate, src->segment.format,
756         src->segment.start, src->segment.last_stop, src->segment.time);
757   }
758
759   gst_segment_set_newsegment_full (&src->segment, FALSE, src->segment.rate,
760       src->segment.applied_rate, src->segment.format, start, stop, position);
761
762   if (src->priv->start_segment)
763     gst_event_unref (src->priv->start_segment);
764   if (src->segment.rate >= 0.0) {
765     /* forward, we send data from last_stop to stop */
766     src->priv->start_segment =
767         gst_event_new_new_segment_full (FALSE,
768         src->segment.rate, src->segment.applied_rate, src->segment.format,
769         src->segment.last_stop, stop, src->segment.time);
770   } else {
771     /* reverse, we send data from last_stop to start */
772     src->priv->start_segment =
773         gst_event_new_new_segment_full (FALSE,
774         src->segment.rate, src->segment.applied_rate, src->segment.format,
775         src->segment.start, src->segment.last_stop, src->segment.time);
776   }
777   GST_OBJECT_UNLOCK (src);
778
779   src->priv->discont = TRUE;
780   src->data.ABI.running = TRUE;
781
782   return res;
783 }
784
785 static gboolean
786 gst_base_src_setcaps (GstPad * pad, GstCaps * caps)
787 {
788   GstBaseSrcClass *bclass;
789   GstBaseSrc *bsrc;
790   gboolean res = TRUE;
791
792   bsrc = GST_BASE_SRC (GST_PAD_PARENT (pad));
793   bclass = GST_BASE_SRC_GET_CLASS (bsrc);
794
795   if (bclass->set_caps)
796     res = bclass->set_caps (bsrc, caps);
797
798   return res;
799 }
800
801 static GstCaps *
802 gst_base_src_getcaps (GstPad * pad)
803 {
804   GstBaseSrcClass *bclass;
805   GstBaseSrc *bsrc;
806   GstCaps *caps = NULL;
807
808   bsrc = GST_BASE_SRC (GST_PAD_PARENT (pad));
809   bclass = GST_BASE_SRC_GET_CLASS (bsrc);
810   if (bclass->get_caps)
811     caps = bclass->get_caps (bsrc);
812
813   if (caps == NULL) {
814     GstPadTemplate *pad_template;
815
816     pad_template =
817         gst_element_class_get_pad_template (GST_ELEMENT_CLASS (bclass), "src");
818     if (pad_template != NULL) {
819       caps = gst_caps_ref (gst_pad_template_get_caps (pad_template));
820     }
821   }
822   return caps;
823 }
824
825 static void
826 gst_base_src_fixate (GstPad * pad, GstCaps * caps)
827 {
828   GstBaseSrcClass *bclass;
829   GstBaseSrc *bsrc;
830
831   bsrc = GST_BASE_SRC (gst_pad_get_parent (pad));
832   bclass = GST_BASE_SRC_GET_CLASS (bsrc);
833
834   if (bclass->fixate)
835     bclass->fixate (bsrc, caps);
836
837   gst_object_unref (bsrc);
838 }
839
840 static gboolean
841 gst_base_src_default_query (GstBaseSrc * src, GstQuery * query)
842 {
843   gboolean res;
844
845   switch (GST_QUERY_TYPE (query)) {
846     case GST_QUERY_POSITION:
847     {
848       GstFormat format;
849
850       gst_query_parse_position (query, &format, NULL);
851
852       GST_DEBUG_OBJECT (src, "position query in format %s",
853           gst_format_get_name (format));
854
855       switch (format) {
856         case GST_FORMAT_PERCENT:
857         {
858           gint64 percent;
859           gint64 position;
860           gint64 duration;
861
862           GST_OBJECT_LOCK (src);
863           position = src->segment.last_stop;
864           duration = src->segment.duration;
865           GST_OBJECT_UNLOCK (src);
866
867           if (position != -1 && duration != -1) {
868             if (position < duration)
869               percent = gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX, position,
870                   duration);
871             else
872               percent = GST_FORMAT_PERCENT_MAX;
873           } else
874             percent = -1;
875
876           gst_query_set_position (query, GST_FORMAT_PERCENT, percent);
877           res = TRUE;
878           break;
879         }
880         default:
881         {
882           gint64 position;
883           GstFormat seg_format;
884
885           GST_OBJECT_LOCK (src);
886           position = src->segment.last_stop;
887           seg_format = src->segment.format;
888           GST_OBJECT_UNLOCK (src);
889
890           if (position != -1) {
891             /* convert to requested format */
892             res =
893                 gst_pad_query_convert (src->srcpad, seg_format,
894                 position, &format, &position);
895           } else
896             res = TRUE;
897
898           gst_query_set_position (query, format, position);
899           break;
900         }
901       }
902       break;
903     }
904     case GST_QUERY_DURATION:
905     {
906       GstFormat format;
907
908       gst_query_parse_duration (query, &format, NULL);
909
910       GST_DEBUG_OBJECT (src, "duration query in format %s",
911           gst_format_get_name (format));
912
913       switch (format) {
914         case GST_FORMAT_PERCENT:
915           gst_query_set_duration (query, GST_FORMAT_PERCENT,
916               GST_FORMAT_PERCENT_MAX);
917           res = TRUE;
918           break;
919         default:
920         {
921           gint64 duration;
922           GstFormat seg_format;
923
924           GST_OBJECT_LOCK (src);
925           /* this is the duration as configured by the subclass. */
926           duration = src->segment.duration;
927           seg_format = src->segment.format;
928           GST_OBJECT_UNLOCK (src);
929
930           GST_LOG_OBJECT (src, "duration %" G_GINT64_FORMAT ", format %s",
931               duration, gst_format_get_name (seg_format));
932
933           if (duration != -1) {
934             /* convert to requested format, if this fails, we have a duration
935              * but we cannot answer the query, we must return FALSE. */
936             res =
937                 gst_pad_query_convert (src->srcpad, seg_format,
938                 duration, &format, &duration);
939           } else {
940             /* The subclass did not configure a duration, we assume that the
941              * media has an unknown duration then and we return TRUE to report
942              * this. Note that this is not the same as returning FALSE, which
943              * means that we cannot report the duration at all. */
944             res = TRUE;
945           }
946           gst_query_set_duration (query, format, duration);
947           break;
948         }
949       }
950       break;
951     }
952
953     case GST_QUERY_SEEKING:
954     {
955       GstFormat format, seg_format;
956       gint64 duration;
957
958       GST_OBJECT_LOCK (src);
959       duration = src->segment.duration;
960       seg_format = src->segment.format;
961       GST_OBJECT_UNLOCK (src);
962
963       gst_query_parse_seeking (query, &format, NULL, NULL, NULL);
964       if (format == seg_format) {
965         gst_query_set_seeking (query, seg_format,
966             gst_base_src_seekable (src), 0, duration);
967         res = TRUE;
968       } else {
969         /* FIXME 0.11: return TRUE + seekable=FALSE for SEEKING query here */
970         /* Don't reply to the query to make up for demuxers which don't
971          * handle the SEEKING query yet. Players like Totem will fall back
972          * to the duration when the SEEKING query isn't answered. */
973         res = FALSE;
974       }
975       break;
976     }
977     case GST_QUERY_SEGMENT:
978     {
979       gint64 start, stop;
980
981       GST_OBJECT_LOCK (src);
982       /* no end segment configured, current duration then */
983       if ((stop = src->segment.stop) == -1)
984         stop = src->segment.duration;
985       start = src->segment.start;
986
987       /* adjust to stream time */
988       if (src->segment.time != -1) {
989         start -= src->segment.time;
990         if (stop != -1)
991           stop -= src->segment.time;
992       }
993       gst_query_set_segment (query, src->segment.rate, src->segment.format,
994           start, stop);
995       GST_OBJECT_UNLOCK (src);
996       res = TRUE;
997       break;
998     }
999
1000     case GST_QUERY_FORMATS:
1001     {
1002       gst_query_set_formats (query, 3, GST_FORMAT_DEFAULT,
1003           GST_FORMAT_BYTES, GST_FORMAT_PERCENT);
1004       res = TRUE;
1005       break;
1006     }
1007     case GST_QUERY_CONVERT:
1008     {
1009       GstFormat src_fmt, dest_fmt;
1010       gint64 src_val, dest_val;
1011
1012       gst_query_parse_convert (query, &src_fmt, &src_val, &dest_fmt, &dest_val);
1013
1014       /* we can only convert between equal formats... */
1015       if (src_fmt == dest_fmt) {
1016         dest_val = src_val;
1017         res = TRUE;
1018       } else
1019         res = FALSE;
1020
1021       gst_query_set_convert (query, src_fmt, src_val, dest_fmt, dest_val);
1022       break;
1023     }
1024     case GST_QUERY_LATENCY:
1025     {
1026       GstClockTime min, max;
1027       gboolean live;
1028
1029       /* Subclasses should override and implement something usefull */
1030       res = gst_base_src_query_latency (src, &live, &min, &max);
1031
1032       GST_LOG_OBJECT (src, "report latency: live %d, min %" GST_TIME_FORMAT
1033           ", max %" GST_TIME_FORMAT, live, GST_TIME_ARGS (min),
1034           GST_TIME_ARGS (max));
1035
1036       gst_query_set_latency (query, live, min, max);
1037       break;
1038     }
1039     case GST_QUERY_JITTER:
1040     case GST_QUERY_RATE:
1041       res = FALSE;
1042       break;
1043     case GST_QUERY_BUFFERING:
1044     {
1045       GstFormat format, seg_format;
1046       gint64 start, stop, estimated;
1047
1048       gst_query_parse_buffering_range (query, &format, NULL, NULL, NULL);
1049
1050       GST_DEBUG_OBJECT (src, "buffering query in format %s",
1051           gst_format_get_name (format));
1052
1053       GST_OBJECT_LOCK (src);
1054       if (src->random_access) {
1055         estimated = 0;
1056         start = 0;
1057         if (format == GST_FORMAT_PERCENT)
1058           stop = GST_FORMAT_PERCENT_MAX;
1059         else
1060           stop = src->segment.duration;
1061       } else {
1062         estimated = -1;
1063         start = -1;
1064         stop = -1;
1065       }
1066       seg_format = src->segment.format;
1067       GST_OBJECT_UNLOCK (src);
1068
1069       /* convert to required format. When the conversion fails, we can't answer
1070        * the query. When the value is unknown, we can don't perform conversion
1071        * but report TRUE. */
1072       if (format != GST_FORMAT_PERCENT && stop != -1) {
1073         res = gst_pad_query_convert (src->srcpad, seg_format,
1074             stop, &format, &stop);
1075       } else {
1076         res = TRUE;
1077       }
1078       if (res && format != GST_FORMAT_PERCENT && start != -1)
1079         res = gst_pad_query_convert (src->srcpad, seg_format,
1080             start, &format, &start);
1081
1082       gst_query_set_buffering_range (query, format, start, stop, estimated);
1083       break;
1084     }
1085     default:
1086       res = FALSE;
1087       break;
1088   }
1089   GST_DEBUG_OBJECT (src, "query %s returns %d", GST_QUERY_TYPE_NAME (query),
1090       res);
1091   return res;
1092 }
1093
1094 static gboolean
1095 gst_base_src_query (GstPad * pad, GstQuery * query)
1096 {
1097   GstBaseSrc *src;
1098   GstBaseSrcClass *bclass;
1099   gboolean result = FALSE;
1100
1101   src = GST_BASE_SRC (gst_pad_get_parent (pad));
1102
1103   bclass = GST_BASE_SRC_GET_CLASS (src);
1104
1105   if (bclass->query)
1106     result = bclass->query (src, query);
1107   else
1108     result = gst_pad_query_default (pad, query);
1109
1110   gst_object_unref (src);
1111
1112   return result;
1113 }
1114
1115 static gboolean
1116 gst_base_src_default_do_seek (GstBaseSrc * src, GstSegment * segment)
1117 {
1118   gboolean res = TRUE;
1119
1120   /* update our offset if the start/stop position was updated */
1121   if (segment->format == GST_FORMAT_BYTES) {
1122     segment->time = segment->start;
1123   } else if (segment->start == 0) {
1124     /* seek to start, we can implement a default for this. */
1125     segment->time = 0;
1126   } else {
1127     res = FALSE;
1128     GST_INFO_OBJECT (src, "Can't do a default seek");
1129   }
1130
1131   return res;
1132 }
1133
1134 static gboolean
1135 gst_base_src_do_seek (GstBaseSrc * src, GstSegment * segment)
1136 {
1137   GstBaseSrcClass *bclass;
1138   gboolean result = FALSE;
1139
1140   bclass = GST_BASE_SRC_GET_CLASS (src);
1141
1142   if (bclass->do_seek)
1143     result = bclass->do_seek (src, segment);
1144
1145   return result;
1146 }
1147
1148 #define SEEK_TYPE_IS_RELATIVE(t) (((t) != GST_SEEK_TYPE_NONE) && ((t) != GST_SEEK_TYPE_SET))
1149
1150 static gboolean
1151 gst_base_src_default_prepare_seek_segment (GstBaseSrc * src, GstEvent * event,
1152     GstSegment * segment)
1153 {
1154   /* By default, we try one of 2 things:
1155    *   - For absolute seek positions, convert the requested position to our
1156    *     configured processing format and place it in the output segment \
1157    *   - For relative seek positions, convert our current (input) values to the
1158    *     seek format, adjust by the relative seek offset and then convert back to
1159    *     the processing format
1160    */
1161   GstSeekType cur_type, stop_type;
1162   gint64 cur, stop;
1163   GstSeekFlags flags;
1164   GstFormat seek_format, dest_format;
1165   gdouble rate;
1166   gboolean update;
1167   gboolean res = TRUE;
1168
1169   gst_event_parse_seek (event, &rate, &seek_format, &flags,
1170       &cur_type, &cur, &stop_type, &stop);
1171   dest_format = segment->format;
1172
1173   if (seek_format == dest_format) {
1174     gst_segment_set_seek (segment, rate, seek_format, flags,
1175         cur_type, cur, stop_type, stop, &update);
1176     return TRUE;
1177   }
1178
1179   if (cur_type != GST_SEEK_TYPE_NONE) {
1180     /* FIXME: Handle seek_cur & seek_end by converting the input segment vals */
1181     res =
1182         gst_pad_query_convert (src->srcpad, seek_format, cur, &dest_format,
1183         &cur);
1184     cur_type = GST_SEEK_TYPE_SET;
1185   }
1186
1187   if (res && stop_type != GST_SEEK_TYPE_NONE) {
1188     /* FIXME: Handle seek_cur & seek_end by converting the input segment vals */
1189     res =
1190         gst_pad_query_convert (src->srcpad, seek_format, stop, &dest_format,
1191         &stop);
1192     stop_type = GST_SEEK_TYPE_SET;
1193   }
1194
1195   /* And finally, configure our output segment in the desired format */
1196   gst_segment_set_seek (segment, rate, dest_format, flags, cur_type, cur,
1197       stop_type, stop, &update);
1198
1199   if (!res)
1200     goto no_format;
1201
1202   return res;
1203
1204 no_format:
1205   {
1206     GST_DEBUG_OBJECT (src, "undefined format given, seek aborted.");
1207     return FALSE;
1208   }
1209 }
1210
1211 static gboolean
1212 gst_base_src_prepare_seek_segment (GstBaseSrc * src, GstEvent * event,
1213     GstSegment * seeksegment)
1214 {
1215   GstBaseSrcClass *bclass;
1216   gboolean result = FALSE;
1217
1218   bclass = GST_BASE_SRC_GET_CLASS (src);
1219
1220   if (bclass->prepare_seek_segment)
1221     result = bclass->prepare_seek_segment (src, event, seeksegment);
1222
1223   return result;
1224 }
1225
1226 /* this code implements the seeking. It is a good example
1227  * handling all cases.
1228  *
1229  * A seek updates the currently configured segment.start
1230  * and segment.stop values based on the SEEK_TYPE. If the
1231  * segment.start value is updated, a seek to this new position
1232  * should be performed.
1233  *
1234  * The seek can only be executed when we are not currently
1235  * streaming any data, to make sure that this is the case, we
1236  * acquire the STREAM_LOCK which is taken when we are in the
1237  * _loop() function or when a getrange() is called. Normally
1238  * we will not receive a seek if we are operating in pull mode
1239  * though. When we operate as a live source we might block on the live
1240  * cond, which does not release the STREAM_LOCK. Therefore we will try
1241  * to grab the LIVE_LOCK instead of the STREAM_LOCK to make sure it is
1242  * safe to perform the seek.
1243  *
1244  * When we are in the loop() function, we might be in the middle
1245  * of pushing a buffer, which might block in a sink. To make sure
1246  * that the push gets unblocked we push out a FLUSH_START event.
1247  * Our loop function will get a WRONG_STATE return value from
1248  * the push and will pause, effectively releasing the STREAM_LOCK.
1249  *
1250  * For a non-flushing seek, we pause the task, which might eventually
1251  * release the STREAM_LOCK. We say eventually because when the sink
1252  * blocks on the sample we might wait a very long time until the sink
1253  * unblocks the sample. In any case we acquire the STREAM_LOCK and
1254  * can continue the seek. A non-flushing seek is normally done in a
1255  * running pipeline to perform seamless playback, this means that the sink is
1256  * PLAYING and will return from its chain function.
1257  * In the case of a non-flushing seek we need to make sure that the
1258  * data we output after the seek is continuous with the previous data,
1259  * this is because a non-flushing seek does not reset the running-time
1260  * to 0. We do this by closing the currently running segment, ie. sending
1261  * a new_segment event with the stop position set to the last processed
1262  * position.
1263  *
1264  * After updating the segment.start/stop values, we prepare for
1265  * streaming again. We push out a FLUSH_STOP to make the peer pad
1266  * accept data again and we start our task again.
1267  *
1268  * A segment seek posts a message on the bus saying that the playback
1269  * of the segment started. We store the segment flag internally because
1270  * when we reach the segment.stop we have to post a segment.done
1271  * instead of EOS when doing a segment seek.
1272  */
1273 /* FIXME (0.11), we have the unlock gboolean here because most current
1274  * implementations (fdsrc, -base/gst/tcp/, ...) unconditionally unlock, even when
1275  * the streaming thread isn't running, resulting in bogus unlocks later when it
1276  * starts. This is fixed by adding unlock_stop, but we should still avoid unlocking
1277  * unnecessarily for backwards compatibility. Ergo, the unlock variable stays
1278  * until 0.11
1279  */
1280 static gboolean
1281 gst_base_src_perform_seek (GstBaseSrc * src, GstEvent * event, gboolean unlock)
1282 {
1283   gboolean res = TRUE, tres;
1284   gdouble rate;
1285   GstFormat seek_format, dest_format;
1286   GstSeekFlags flags;
1287   GstSeekType cur_type, stop_type;
1288   gint64 cur, stop;
1289   gboolean flush, playing;
1290   gboolean update;
1291   gboolean relative_seek = FALSE;
1292   gboolean seekseg_configured = FALSE;
1293   GstSegment seeksegment;
1294   guint32 seqnum;
1295   GstEvent *tevent;
1296
1297   GST_DEBUG_OBJECT (src, "doing seek");
1298
1299   GST_OBJECT_LOCK (src);
1300   dest_format = src->segment.format;
1301   GST_OBJECT_UNLOCK (src);
1302
1303   if (event) {
1304     gst_event_parse_seek (event, &rate, &seek_format, &flags,
1305         &cur_type, &cur, &stop_type, &stop);
1306
1307     relative_seek = SEEK_TYPE_IS_RELATIVE (cur_type) ||
1308         SEEK_TYPE_IS_RELATIVE (stop_type);
1309
1310     if (dest_format != seek_format && !relative_seek) {
1311       /* If we have an ABSOLUTE position (SEEK_SET only), we can convert it
1312        * here before taking the stream lock, otherwise we must convert it later,
1313        * once we have the stream lock and can read the last configures segment
1314        * start and stop positions */
1315       gst_segment_init (&seeksegment, dest_format);
1316
1317       if (!gst_base_src_prepare_seek_segment (src, event, &seeksegment))
1318         goto prepare_failed;
1319
1320       seekseg_configured = TRUE;
1321     }
1322
1323     flush = flags & GST_SEEK_FLAG_FLUSH;
1324     seqnum = gst_event_get_seqnum (event);
1325   } else {
1326     flush = FALSE;
1327     /* get next seqnum */
1328     seqnum = gst_util_seqnum_next ();
1329   }
1330
1331   /* send flush start */
1332   if (flush) {
1333     tevent = gst_event_new_flush_start ();
1334     gst_event_set_seqnum (tevent, seqnum);
1335     gst_pad_push_event (src->srcpad, tevent);
1336   } else
1337     gst_pad_pause_task (src->srcpad);
1338
1339   /* unblock streaming thread. */
1340   gst_base_src_set_flushing (src, TRUE, FALSE, unlock, &playing);
1341
1342   /* grab streaming lock, this should eventually be possible, either
1343    * because the task is paused, our streaming thread stopped
1344    * or because our peer is flushing. */
1345   GST_PAD_STREAM_LOCK (src->srcpad);
1346   if (G_UNLIKELY (src->priv->seqnum == seqnum)) {
1347     /* we have seen this event before, issue a warning for now */
1348     GST_WARNING_OBJECT (src, "duplicate event found %" G_GUINT32_FORMAT,
1349         seqnum);
1350   } else {
1351     src->priv->seqnum = seqnum;
1352     GST_DEBUG_OBJECT (src, "seek with seqnum %" G_GUINT32_FORMAT, seqnum);
1353   }
1354
1355   gst_base_src_set_flushing (src, FALSE, playing, unlock, NULL);
1356
1357   /* If we configured the seeksegment above, don't overwrite it now. Otherwise
1358    * copy the current segment info into the temp segment that we can actually
1359    * attempt the seek with. We only update the real segment if the seek suceeds. */
1360   if (!seekseg_configured) {
1361     memcpy (&seeksegment, &src->segment, sizeof (GstSegment));
1362
1363     /* now configure the final seek segment */
1364     if (event) {
1365       if (seeksegment.format != seek_format) {
1366         /* OK, here's where we give the subclass a chance to convert the relative
1367          * seek into an absolute one in the processing format. We set up any
1368          * absolute seek above, before taking the stream lock. */
1369         if (!gst_base_src_prepare_seek_segment (src, event, &seeksegment)) {
1370           GST_DEBUG_OBJECT (src, "Preparing the seek failed after flushing. "
1371               "Aborting seek");
1372           res = FALSE;
1373         }
1374       } else {
1375         /* The seek format matches our processing format, no need to ask the
1376          * the subclass to configure the segment. */
1377         gst_segment_set_seek (&seeksegment, rate, seek_format, flags,
1378             cur_type, cur, stop_type, stop, &update);
1379       }
1380     }
1381     /* Else, no seek event passed, so we're just (re)starting the
1382        current segment. */
1383   }
1384
1385   if (res) {
1386     GST_DEBUG_OBJECT (src, "segment configured from %" G_GINT64_FORMAT
1387         " to %" G_GINT64_FORMAT ", position %" G_GINT64_FORMAT,
1388         seeksegment.start, seeksegment.stop, seeksegment.last_stop);
1389
1390     /* do the seek, segment.last_stop contains the new position. */
1391     res = gst_base_src_do_seek (src, &seeksegment);
1392   }
1393
1394   /* and prepare to continue streaming */
1395   if (flush) {
1396     tevent = gst_event_new_flush_stop ();
1397     gst_event_set_seqnum (tevent, seqnum);
1398     /* send flush stop, peer will accept data and events again. We
1399      * are not yet providing data as we still have the STREAM_LOCK. */
1400     gst_pad_push_event (src->srcpad, tevent);
1401   } else if (res && src->data.ABI.running) {
1402     /* we are running the current segment and doing a non-flushing seek,
1403      * close the segment first based on the last_stop. */
1404     GST_DEBUG_OBJECT (src, "closing running segment %" G_GINT64_FORMAT
1405         " to %" G_GINT64_FORMAT, src->segment.start, src->segment.last_stop);
1406
1407     /* queue the segment for sending in the stream thread */
1408     if (src->priv->close_segment)
1409       gst_event_unref (src->priv->close_segment);
1410     src->priv->close_segment =
1411         gst_event_new_new_segment_full (TRUE,
1412         src->segment.rate, src->segment.applied_rate, src->segment.format,
1413         src->segment.start, src->segment.last_stop, src->segment.time);
1414     gst_event_set_seqnum (src->priv->close_segment, seqnum);
1415   }
1416
1417   /* The subclass must have converted the segment to the processing format
1418    * by now */
1419   if (res && seeksegment.format != dest_format) {
1420     GST_DEBUG_OBJECT (src, "Subclass failed to prepare a seek segment "
1421         "in the correct format. Aborting seek.");
1422     res = FALSE;
1423   }
1424
1425   /* if the seek was successful, we update our real segment and push
1426    * out the new segment. */
1427   if (res) {
1428     GST_OBJECT_LOCK (src);
1429     memcpy (&src->segment, &seeksegment, sizeof (GstSegment));
1430     GST_OBJECT_UNLOCK (src);
1431
1432     if (seeksegment.flags & GST_SEEK_FLAG_SEGMENT) {
1433       GstMessage *message;
1434
1435       message = gst_message_new_segment_start (GST_OBJECT (src),
1436           seeksegment.format, seeksegment.last_stop);
1437       gst_message_set_seqnum (message, seqnum);
1438
1439       gst_element_post_message (GST_ELEMENT (src), message);
1440     }
1441
1442     /* for deriving a stop position for the playback segment from the seek
1443      * segment, we must take the duration when the stop is not set */
1444     if ((stop = seeksegment.stop) == -1)
1445       stop = seeksegment.duration;
1446
1447     GST_DEBUG_OBJECT (src, "Sending newsegment from %" G_GINT64_FORMAT
1448         " to %" G_GINT64_FORMAT, seeksegment.start, stop);
1449
1450     /* now replace the old segment so that we send it in the stream thread the
1451      * next time it is scheduled. */
1452     if (src->priv->start_segment)
1453       gst_event_unref (src->priv->start_segment);
1454     if (seeksegment.rate >= 0.0) {
1455       /* forward, we send data from last_stop to stop */
1456       src->priv->start_segment =
1457           gst_event_new_new_segment_full (FALSE,
1458           seeksegment.rate, seeksegment.applied_rate, seeksegment.format,
1459           seeksegment.last_stop, stop, seeksegment.time);
1460     } else {
1461       /* reverse, we send data from last_stop to start */
1462       src->priv->start_segment =
1463           gst_event_new_new_segment_full (FALSE,
1464           seeksegment.rate, seeksegment.applied_rate, seeksegment.format,
1465           seeksegment.start, seeksegment.last_stop, seeksegment.time);
1466     }
1467     gst_event_set_seqnum (src->priv->start_segment, seqnum);
1468     src->priv->newsegment_pending = TRUE;
1469   }
1470
1471   src->priv->discont = TRUE;
1472   src->data.ABI.running = TRUE;
1473   /* and restart the task in case it got paused explicitly or by
1474    * the FLUSH_START event we pushed out. */
1475   tres = gst_pad_start_task (src->srcpad, (GstTaskFunction) gst_base_src_loop,
1476       src->srcpad);
1477   if (res && !tres)
1478     res = FALSE;
1479
1480   /* and release the lock again so we can continue streaming */
1481   GST_PAD_STREAM_UNLOCK (src->srcpad);
1482
1483   return res;
1484
1485   /* ERROR */
1486 prepare_failed:
1487   GST_DEBUG_OBJECT (src, "Preparing the seek failed before flushing. "
1488       "Aborting seek");
1489   return FALSE;
1490 }
1491
1492 static const GstQueryType *
1493 gst_base_src_get_query_types (GstElement * element)
1494 {
1495   static const GstQueryType query_types[] = {
1496     GST_QUERY_DURATION,
1497     GST_QUERY_POSITION,
1498     GST_QUERY_SEEKING,
1499     GST_QUERY_SEGMENT,
1500     GST_QUERY_FORMATS,
1501     GST_QUERY_LATENCY,
1502     GST_QUERY_JITTER,
1503     GST_QUERY_RATE,
1504     GST_QUERY_CONVERT,
1505     0
1506   };
1507
1508   return query_types;
1509 }
1510
1511 /* all events send to this element directly. This is mainly done from the
1512  * application.
1513  */
1514 static gboolean
1515 gst_base_src_send_event (GstElement * element, GstEvent * event)
1516 {
1517   GstBaseSrc *src;
1518   gboolean result = FALSE;
1519
1520   src = GST_BASE_SRC (element);
1521
1522   GST_DEBUG_OBJECT (src, "reveived %s event", GST_EVENT_TYPE_NAME (event));
1523
1524   switch (GST_EVENT_TYPE (event)) {
1525       /* bidirectional events */
1526     case GST_EVENT_FLUSH_START:
1527     case GST_EVENT_FLUSH_STOP:
1528       /* sending random flushes downstream can break stuff,
1529        * especially sync since all segment info will get flushed */
1530       break;
1531
1532       /* downstream serialized events */
1533     case GST_EVENT_EOS:
1534     {
1535       GstBaseSrcClass *bclass;
1536
1537       bclass = GST_BASE_SRC_GET_CLASS (src);
1538
1539       /* queue EOS and make sure the task or pull function performs the EOS
1540        * actions.
1541        *
1542        * We have two possibilities:
1543        *
1544        *  - Before we are to enter the _create function, we check the pending_eos
1545        *    first and do EOS instead of entering it.
1546        *  - If we are in the _create function or we did not manage to set the
1547        *    flag fast enough and we are about to enter the _create function,
1548        *    we unlock it so that we exit with WRONG_STATE immediatly. We then
1549        *    check the EOS flag and do the EOS logic.
1550        */
1551       g_atomic_int_set (&src->priv->pending_eos, TRUE);
1552       GST_DEBUG_OBJECT (src, "EOS marked, calling unlock");
1553
1554       /* unlock the _create function so that we can check the pending_eos flag
1555        * and we can do EOS. This will eventually release the LIVE_LOCK again so
1556        * that we can grab it and stop the unlock again. We don't take the stream
1557        * lock so that this operation is guaranteed to never block. */
1558       if (bclass->unlock)
1559         bclass->unlock (src);
1560
1561       GST_DEBUG_OBJECT (src, "unlock called, waiting for LIVE_LOCK");
1562
1563       GST_LIVE_LOCK (src);
1564       GST_DEBUG_OBJECT (src, "LIVE_LOCK acquired, calling unlock_stop");
1565       /* now stop the unlock of the streaming thread again. Grabbing the live
1566        * lock is enough because that protects the create function. */
1567       if (bclass->unlock_stop)
1568         bclass->unlock_stop (src);
1569       GST_LIVE_UNLOCK (src);
1570
1571       result = TRUE;
1572       break;
1573     }
1574     case GST_EVENT_NEWSEGMENT:
1575       /* sending random NEWSEGMENT downstream can break sync. */
1576       break;
1577     case GST_EVENT_TAG:
1578       /* Insert tag in the dataflow */
1579       GST_OBJECT_LOCK (src);
1580       src->priv->pending_tags = g_list_append (src->priv->pending_tags, event);
1581       GST_OBJECT_UNLOCK (src);
1582       event = NULL;
1583       result = TRUE;
1584       break;
1585     case GST_EVENT_BUFFERSIZE:
1586       /* does not seem to make much sense currently */
1587       break;
1588
1589       /* upstream events */
1590     case GST_EVENT_QOS:
1591       /* elements should override send_event and do something */
1592       break;
1593     case GST_EVENT_SEEK:
1594     {
1595       gboolean started;
1596
1597       GST_OBJECT_LOCK (src->srcpad);
1598       if (GST_PAD_ACTIVATE_MODE (src->srcpad) == GST_ACTIVATE_PULL)
1599         goto wrong_mode;
1600       started = GST_PAD_ACTIVATE_MODE (src->srcpad) == GST_ACTIVATE_PUSH;
1601       GST_OBJECT_UNLOCK (src->srcpad);
1602
1603       if (started) {
1604         GST_DEBUG_OBJECT (src, "performing seek");
1605         /* when we are running in push mode, we can execute the
1606          * seek right now, we need to unlock. */
1607         result = gst_base_src_perform_seek (src, event, TRUE);
1608       } else {
1609         GstEvent **event_p;
1610
1611         /* else we store the event and execute the seek when we
1612          * get activated */
1613         GST_OBJECT_LOCK (src);
1614         GST_DEBUG_OBJECT (src, "queueing seek");
1615         event_p = &src->data.ABI.pending_seek;
1616         gst_event_replace ((GstEvent **) event_p, event);
1617         GST_OBJECT_UNLOCK (src);
1618         /* assume the seek will work */
1619         result = TRUE;
1620       }
1621       break;
1622     }
1623     case GST_EVENT_NAVIGATION:
1624       /* could make sense for elements that do something with navigation events
1625        * but then they would need to override the send_event function */
1626       break;
1627     case GST_EVENT_LATENCY:
1628       /* does not seem to make sense currently */
1629       break;
1630
1631       /* custom events */
1632     case GST_EVENT_CUSTOM_UPSTREAM:
1633       /* override send_event if you want this */
1634       break;
1635     case GST_EVENT_CUSTOM_DOWNSTREAM:
1636     case GST_EVENT_CUSTOM_BOTH:
1637       /* FIXME, insert event in the dataflow */
1638       break;
1639     case GST_EVENT_CUSTOM_DOWNSTREAM_OOB:
1640     case GST_EVENT_CUSTOM_BOTH_OOB:
1641       /* insert a random custom event into the pipeline */
1642       GST_DEBUG_OBJECT (src, "pushing custom OOB event downstream");
1643       result = gst_pad_push_event (src->srcpad, event);
1644       /* we gave away the ref to the event in the push */
1645       event = NULL;
1646       break;
1647     default:
1648       break;
1649   }
1650 done:
1651   /* if we still have a ref to the event, unref it now */
1652   if (event)
1653     gst_event_unref (event);
1654
1655   return result;
1656
1657   /* ERRORS */
1658 wrong_mode:
1659   {
1660     GST_DEBUG_OBJECT (src, "cannot perform seek when operating in pull mode");
1661     GST_OBJECT_UNLOCK (src->srcpad);
1662     result = FALSE;
1663     goto done;
1664   }
1665 }
1666
1667 static gboolean
1668 gst_base_src_seekable (GstBaseSrc * src)
1669 {
1670   GstBaseSrcClass *bclass;
1671   bclass = GST_BASE_SRC_GET_CLASS (src);
1672   if (bclass->is_seekable)
1673     return bclass->is_seekable (src);
1674   else
1675     return FALSE;
1676 }
1677
1678 static void
1679 gst_base_src_update_qos (GstBaseSrc * src,
1680     gdouble proportion, GstClockTimeDiff diff, GstClockTime timestamp)
1681 {
1682   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, src,
1683       "qos: proportion: %lf, diff %" G_GINT64_FORMAT ", timestamp %"
1684       GST_TIME_FORMAT, proportion, diff, GST_TIME_ARGS (timestamp));
1685
1686   GST_OBJECT_LOCK (src);
1687   src->priv->proportion = proportion;
1688   src->priv->earliest_time = timestamp + diff;
1689   GST_OBJECT_UNLOCK (src);
1690 }
1691
1692
1693 static gboolean
1694 gst_base_src_default_event (GstBaseSrc * src, GstEvent * event)
1695 {
1696   gboolean result;
1697
1698   switch (GST_EVENT_TYPE (event)) {
1699     case GST_EVENT_SEEK:
1700       /* is normally called when in push mode */
1701       if (!gst_base_src_seekable (src))
1702         goto not_seekable;
1703
1704       result = gst_base_src_perform_seek (src, event, TRUE);
1705       break;
1706     case GST_EVENT_FLUSH_START:
1707       /* cancel any blocking getrange, is normally called
1708        * when in pull mode. */
1709       result = gst_base_src_set_flushing (src, TRUE, FALSE, TRUE, NULL);
1710       break;
1711     case GST_EVENT_FLUSH_STOP:
1712       result = gst_base_src_set_flushing (src, FALSE, TRUE, TRUE, NULL);
1713       break;
1714     case GST_EVENT_QOS:
1715     {
1716       gdouble proportion;
1717       GstClockTimeDiff diff;
1718       GstClockTime timestamp;
1719
1720       gst_event_parse_qos (event, &proportion, &diff, &timestamp);
1721       gst_base_src_update_qos (src, proportion, diff, timestamp);
1722       result = TRUE;
1723       break;
1724     }
1725     default:
1726       result = TRUE;
1727       break;
1728   }
1729   return result;
1730
1731   /* ERRORS */
1732 not_seekable:
1733   {
1734     GST_DEBUG_OBJECT (src, "is not seekable");
1735     return FALSE;
1736   }
1737 }
1738
1739 static gboolean
1740 gst_base_src_event_handler (GstPad * pad, GstEvent * event)
1741 {
1742   GstBaseSrc *src;
1743   GstBaseSrcClass *bclass;
1744   gboolean result = FALSE;
1745
1746   src = GST_BASE_SRC (gst_pad_get_parent (pad));
1747   bclass = GST_BASE_SRC_GET_CLASS (src);
1748
1749   if (bclass->event) {
1750     if (!(result = bclass->event (src, event)))
1751       goto subclass_failed;
1752   }
1753
1754 done:
1755   gst_event_unref (event);
1756   gst_object_unref (src);
1757
1758   return result;
1759
1760   /* ERRORS */
1761 subclass_failed:
1762   {
1763     GST_DEBUG_OBJECT (src, "subclass refused event");
1764     goto done;
1765   }
1766 }
1767
1768 static void
1769 gst_base_src_set_property (GObject * object, guint prop_id,
1770     const GValue * value, GParamSpec * pspec)
1771 {
1772   GstBaseSrc *src;
1773
1774   src = GST_BASE_SRC (object);
1775
1776   switch (prop_id) {
1777     case PROP_BLOCKSIZE:
1778       gst_base_src_set_blocksize (src, g_value_get_ulong (value));
1779       break;
1780     case PROP_NUM_BUFFERS:
1781       src->num_buffers = g_value_get_int (value);
1782       break;
1783     case PROP_TYPEFIND:
1784       src->data.ABI.typefind = g_value_get_boolean (value);
1785       break;
1786     case PROP_DO_TIMESTAMP:
1787       gst_base_src_set_do_timestamp (src, g_value_get_boolean (value));
1788       break;
1789     default:
1790       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1791       break;
1792   }
1793 }
1794
1795 static void
1796 gst_base_src_get_property (GObject * object, guint prop_id, GValue * value,
1797     GParamSpec * pspec)
1798 {
1799   GstBaseSrc *src;
1800
1801   src = GST_BASE_SRC (object);
1802
1803   switch (prop_id) {
1804     case PROP_BLOCKSIZE:
1805       g_value_set_ulong (value, gst_base_src_get_blocksize (src));
1806       break;
1807     case PROP_NUM_BUFFERS:
1808       g_value_set_int (value, src->num_buffers);
1809       break;
1810     case PROP_TYPEFIND:
1811       g_value_set_boolean (value, src->data.ABI.typefind);
1812       break;
1813     case PROP_DO_TIMESTAMP:
1814       g_value_set_boolean (value, gst_base_src_get_do_timestamp (src));
1815       break;
1816     default:
1817       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1818       break;
1819   }
1820 }
1821
1822 /* with STREAM_LOCK and LOCK */
1823 static GstClockReturn
1824 gst_base_src_wait (GstBaseSrc * basesrc, GstClock * clock, GstClockTime time)
1825 {
1826   GstClockReturn ret;
1827   GstClockID id;
1828
1829   id = gst_clock_new_single_shot_id (clock, time);
1830
1831   basesrc->clock_id = id;
1832   /* release the live lock while waiting */
1833   GST_LIVE_UNLOCK (basesrc);
1834
1835   ret = gst_clock_id_wait (id, NULL);
1836
1837   GST_LIVE_LOCK (basesrc);
1838   gst_clock_id_unref (id);
1839   basesrc->clock_id = NULL;
1840
1841   return ret;
1842 }
1843
1844 /* perform synchronisation on a buffer.
1845  * with STREAM_LOCK.
1846  */
1847 static GstClockReturn
1848 gst_base_src_do_sync (GstBaseSrc * basesrc, GstBuffer * buffer)
1849 {
1850   GstClockReturn result;
1851   GstClockTime start, end;
1852   GstBaseSrcClass *bclass;
1853   GstClockTime base_time;
1854   GstClock *clock;
1855   GstClockTime now = GST_CLOCK_TIME_NONE, timestamp;
1856   gboolean do_timestamp, first, pseudo_live;
1857
1858   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
1859
1860   start = end = -1;
1861   if (bclass->get_times)
1862     bclass->get_times (basesrc, buffer, &start, &end);
1863
1864   /* get buffer timestamp */
1865   timestamp = GST_BUFFER_TIMESTAMP (buffer);
1866
1867   /* grab the lock to prepare for clocking and calculate the startup
1868    * latency. */
1869   GST_OBJECT_LOCK (basesrc);
1870
1871   /* if we are asked to sync against the clock we are a pseudo live element */
1872   pseudo_live = (start != -1 && basesrc->is_live);
1873   /* check for the first buffer */
1874   first = (basesrc->priv->latency == -1);
1875
1876   if (timestamp != -1 && pseudo_live) {
1877     GstClockTime latency;
1878
1879     /* we have a timestamp and a sync time, latency is the diff */
1880     if (timestamp <= start)
1881       latency = start - timestamp;
1882     else
1883       latency = 0;
1884
1885     if (first) {
1886       GST_DEBUG_OBJECT (basesrc, "pseudo_live with latency %" GST_TIME_FORMAT,
1887           GST_TIME_ARGS (latency));
1888       /* first time we calculate latency, just configure */
1889       basesrc->priv->latency = latency;
1890     } else {
1891       if (basesrc->priv->latency != latency) {
1892         /* we have a new latency, FIXME post latency message */
1893         basesrc->priv->latency = latency;
1894         GST_DEBUG_OBJECT (basesrc, "latency changed to %" GST_TIME_FORMAT,
1895             GST_TIME_ARGS (latency));
1896       }
1897     }
1898   } else if (first) {
1899     GST_DEBUG_OBJECT (basesrc, "no latency needed, live %d, sync %d",
1900         basesrc->is_live, start != -1);
1901     basesrc->priv->latency = 0;
1902   }
1903
1904   /* get clock, if no clock, we can't sync or do timestamps */
1905   if ((clock = GST_ELEMENT_CLOCK (basesrc)) == NULL)
1906     goto no_clock;
1907
1908   base_time = GST_ELEMENT_CAST (basesrc)->base_time;
1909
1910   do_timestamp = basesrc->priv->do_timestamp;
1911
1912   /* first buffer, calculate the timestamp offset */
1913   if (first) {
1914     GstClockTime running_time;
1915
1916     now = gst_clock_get_time (clock);
1917     running_time = now - base_time;
1918
1919     GST_LOG_OBJECT (basesrc,
1920         "startup timestamp: %" GST_TIME_FORMAT ", running_time %"
1921         GST_TIME_FORMAT, GST_TIME_ARGS (timestamp),
1922         GST_TIME_ARGS (running_time));
1923
1924     if (pseudo_live && timestamp != -1) {
1925       /* live source and we need to sync, add startup latency to all timestamps
1926        * to get the real running_time. Live sources should always timestamp
1927        * according to the current running time. */
1928       basesrc->priv->ts_offset = GST_CLOCK_DIFF (timestamp, running_time);
1929
1930       GST_LOG_OBJECT (basesrc, "live with sync, ts_offset %" GST_TIME_FORMAT,
1931           GST_TIME_ARGS (basesrc->priv->ts_offset));
1932     } else {
1933       basesrc->priv->ts_offset = 0;
1934       GST_LOG_OBJECT (basesrc, "no timestamp offset needed");
1935     }
1936
1937     if (!GST_CLOCK_TIME_IS_VALID (timestamp)) {
1938       if (do_timestamp)
1939         timestamp = running_time;
1940       else
1941         timestamp = 0;
1942
1943       GST_BUFFER_TIMESTAMP (buffer) = timestamp;
1944
1945       GST_LOG_OBJECT (basesrc, "created timestamp: %" GST_TIME_FORMAT,
1946           GST_TIME_ARGS (timestamp));
1947     }
1948
1949     /* add the timestamp offset we need for sync */
1950     timestamp += basesrc->priv->ts_offset;
1951   } else {
1952     /* not the first buffer, the timestamp is the diff between the clock and
1953      * base_time */
1954     if (do_timestamp && !GST_CLOCK_TIME_IS_VALID (timestamp)) {
1955       now = gst_clock_get_time (clock);
1956
1957       GST_BUFFER_TIMESTAMP (buffer) = now - base_time;
1958
1959       GST_LOG_OBJECT (basesrc, "created timestamp: %" GST_TIME_FORMAT,
1960           GST_TIME_ARGS (now - base_time));
1961     }
1962   }
1963
1964   /* if we don't have a buffer timestamp, we don't sync */
1965   if (!GST_CLOCK_TIME_IS_VALID (start))
1966     goto no_sync;
1967
1968   if (basesrc->is_live && GST_CLOCK_TIME_IS_VALID (timestamp)) {
1969     /* for pseudo live sources, add our ts_offset to the timestamp */
1970     GST_BUFFER_TIMESTAMP (buffer) += basesrc->priv->ts_offset;
1971     start += basesrc->priv->ts_offset;
1972   }
1973
1974   GST_LOG_OBJECT (basesrc,
1975       "waiting for clock, base time %" GST_TIME_FORMAT
1976       ", stream_start %" GST_TIME_FORMAT,
1977       GST_TIME_ARGS (base_time), GST_TIME_ARGS (start));
1978   GST_OBJECT_UNLOCK (basesrc);
1979
1980   result = gst_base_src_wait (basesrc, clock, start + base_time);
1981
1982   GST_LOG_OBJECT (basesrc, "clock entry done: %d", result);
1983
1984   return result;
1985
1986   /* special cases */
1987 no_clock:
1988   {
1989     GST_DEBUG_OBJECT (basesrc, "we have no clock");
1990     GST_OBJECT_UNLOCK (basesrc);
1991     return GST_CLOCK_OK;
1992   }
1993 no_sync:
1994   {
1995     GST_DEBUG_OBJECT (basesrc, "no sync needed");
1996     GST_OBJECT_UNLOCK (basesrc);
1997     return GST_CLOCK_OK;
1998   }
1999 }
2000
2001 /* Called with STREAM_LOCK and LIVE_LOCK */
2002 static gboolean
2003 gst_base_src_update_length (GstBaseSrc * src, guint64 offset, guint * length)
2004 {
2005   guint64 size, maxsize;
2006   GstBaseSrcClass *bclass;
2007   GstFormat format;
2008   gint64 stop;
2009
2010   bclass = GST_BASE_SRC_GET_CLASS (src);
2011
2012   format = src->segment.format;
2013   stop = src->segment.stop;
2014   /* get total file size */
2015   size = (guint64) src->segment.duration;
2016
2017   /* only operate if we are working with bytes */
2018   if (format != GST_FORMAT_BYTES)
2019     return TRUE;
2020
2021   /* the max amount of bytes to read is the total size or
2022    * up to the segment.stop if present. */
2023   if (stop != -1)
2024     maxsize = MIN (size, stop);
2025   else
2026     maxsize = size;
2027
2028   GST_DEBUG_OBJECT (src,
2029       "reading offset %" G_GUINT64_FORMAT ", length %u, size %" G_GINT64_FORMAT
2030       ", segment.stop %" G_GINT64_FORMAT ", maxsize %" G_GINT64_FORMAT, offset,
2031       *length, size, stop, maxsize);
2032
2033   /* check size if we have one */
2034   if (maxsize != -1) {
2035     /* if we run past the end, check if the file became bigger and
2036      * retry. */
2037     if (G_UNLIKELY (offset + *length >= maxsize)) {
2038       /* see if length of the file changed */
2039       if (bclass->get_size)
2040         if (!bclass->get_size (src, &size))
2041           size = -1;
2042
2043       /* make sure we don't exceed the configured segment stop
2044        * if it was set */
2045       if (stop != -1)
2046         maxsize = MIN (size, stop);
2047       else
2048         maxsize = size;
2049
2050       /* if we are at or past the end, EOS */
2051       if (G_UNLIKELY (offset >= maxsize))
2052         goto unexpected_length;
2053
2054       /* else we can clip to the end */
2055       if (G_UNLIKELY (offset + *length >= maxsize))
2056         *length = maxsize - offset;
2057
2058     }
2059   }
2060
2061   /* keep track of current position and update duration.
2062    * segment is in bytes, we checked that above. */
2063   GST_OBJECT_LOCK (src);
2064   gst_segment_set_duration (&src->segment, GST_FORMAT_BYTES, size);
2065   gst_segment_set_last_stop (&src->segment, GST_FORMAT_BYTES, offset);
2066   GST_OBJECT_UNLOCK (src);
2067
2068   return TRUE;
2069
2070   /* ERRORS */
2071 unexpected_length:
2072   {
2073     return FALSE;
2074   }
2075 }
2076
2077 /* must be called with LIVE_LOCK */
2078 static GstFlowReturn
2079 gst_base_src_get_range (GstBaseSrc * src, guint64 offset, guint length,
2080     GstBuffer ** buf)
2081 {
2082   GstFlowReturn ret;
2083   GstBaseSrcClass *bclass;
2084   GstClockReturn status;
2085
2086   bclass = GST_BASE_SRC_GET_CLASS (src);
2087
2088 again:
2089   if (src->is_live) {
2090     while (G_UNLIKELY (!src->live_running)) {
2091       ret = gst_base_src_wait_playing (src);
2092       if (ret != GST_FLOW_OK)
2093         goto stopped;
2094     }
2095   }
2096
2097   if (G_UNLIKELY (!GST_OBJECT_FLAG_IS_SET (src, GST_BASE_SRC_STARTED)))
2098     goto not_started;
2099
2100   if (G_UNLIKELY (!bclass->create))
2101     goto no_function;
2102
2103   if (G_UNLIKELY (!gst_base_src_update_length (src, offset, &length)))
2104     goto unexpected_length;
2105
2106   /* normally we don't count buffers */
2107   if (G_UNLIKELY (src->num_buffers_left >= 0)) {
2108     if (src->num_buffers_left == 0)
2109       goto reached_num_buffers;
2110     else
2111       src->num_buffers_left--;
2112   }
2113
2114   /* don't enter the create function if a pending EOS event was set. For the
2115    * logic of the pending_eos, check the event function of this class. */
2116   if (G_UNLIKELY (g_atomic_int_get (&src->priv->pending_eos)))
2117     goto eos;
2118
2119   GST_DEBUG_OBJECT (src,
2120       "calling create offset %" G_GUINT64_FORMAT " length %u, time %"
2121       G_GINT64_FORMAT, offset, length, src->segment.time);
2122
2123   ret = bclass->create (src, offset, length, buf);
2124
2125   /* The create function could be unlocked because we have a pending EOS. It's
2126    * possible that we have a valid buffer from create that we need to
2127    * discard when the create function returned _OK. */
2128   if (G_UNLIKELY (g_atomic_int_get (&src->priv->pending_eos))) {
2129     if (ret == GST_FLOW_OK) {
2130       gst_buffer_unref (*buf);
2131       *buf = NULL;
2132     }
2133     goto eos;
2134   }
2135
2136   if (G_UNLIKELY (ret != GST_FLOW_OK))
2137     goto not_ok;
2138
2139   /* no timestamp set and we are at offset 0, we can timestamp with 0 */
2140   if (offset == 0 && src->segment.time == 0
2141       && GST_BUFFER_TIMESTAMP (*buf) == -1)
2142     GST_BUFFER_TIMESTAMP (*buf) = 0;
2143
2144   /* set pad caps on the buffer if the buffer had no caps */
2145   if (GST_BUFFER_CAPS (*buf) == NULL)
2146     gst_buffer_set_caps (*buf, GST_PAD_CAPS (src->srcpad));
2147
2148   /* now sync before pushing the buffer */
2149   status = gst_base_src_do_sync (src, *buf);
2150
2151   /* waiting for the clock could have made us flushing */
2152   if (G_UNLIKELY (src->priv->flushing))
2153     goto flushing;
2154
2155   switch (status) {
2156     case GST_CLOCK_EARLY:
2157       /* the buffer is too late. We currently don't drop the buffer. */
2158       GST_DEBUG_OBJECT (src, "buffer too late!, returning anyway");
2159       break;
2160     case GST_CLOCK_OK:
2161       /* buffer synchronised properly */
2162       GST_DEBUG_OBJECT (src, "buffer ok");
2163       break;
2164     case GST_CLOCK_UNSCHEDULED:
2165       /* this case is triggered when we were waiting for the clock and
2166        * it got unlocked because we did a state change. In any case, get rid of
2167        * the buffer. */
2168       gst_buffer_unref (*buf);
2169       *buf = NULL;
2170       if (!src->live_running) {
2171         /* We return WRONG_STATE when we are not running to stop the dataflow also
2172          * get rid of the produced buffer. */
2173         GST_DEBUG_OBJECT (src,
2174             "clock was unscheduled (%d), returning WRONG_STATE", status);
2175         ret = GST_FLOW_WRONG_STATE;
2176       } else {
2177         /* If we are running when this happens, we quickly switched between
2178          * pause and playing. We try to produce a new buffer */
2179         GST_DEBUG_OBJECT (src,
2180             "clock was unscheduled (%d), but we are running", status);
2181         goto again;
2182       }
2183       break;
2184     default:
2185       /* all other result values are unexpected and errors */
2186       GST_ELEMENT_ERROR (src, CORE, CLOCK,
2187           (_("Internal clock error.")),
2188           ("clock returned unexpected return value %d", status));
2189       gst_buffer_unref (*buf);
2190       *buf = NULL;
2191       ret = GST_FLOW_ERROR;
2192       break;
2193   }
2194   return ret;
2195
2196   /* ERROR */
2197 stopped:
2198   {
2199     GST_DEBUG_OBJECT (src, "wait_playing returned %d (%s)", ret,
2200         gst_flow_get_name (ret));
2201     return ret;
2202   }
2203 not_ok:
2204   {
2205     GST_DEBUG_OBJECT (src, "create returned %d (%s)", ret,
2206         gst_flow_get_name (ret));
2207     return ret;
2208   }
2209 not_started:
2210   {
2211     GST_DEBUG_OBJECT (src, "getrange but not started");
2212     return GST_FLOW_WRONG_STATE;
2213   }
2214 no_function:
2215   {
2216     GST_DEBUG_OBJECT (src, "no create function");
2217     return GST_FLOW_ERROR;
2218   }
2219 unexpected_length:
2220   {
2221     GST_DEBUG_OBJECT (src, "unexpected length %u (offset=%" G_GUINT64_FORMAT
2222         ", size=%" G_GINT64_FORMAT ")", length, offset, src->segment.duration);
2223     return GST_FLOW_UNEXPECTED;
2224   }
2225 reached_num_buffers:
2226   {
2227     GST_DEBUG_OBJECT (src, "sent all buffers");
2228     return GST_FLOW_UNEXPECTED;
2229   }
2230 flushing:
2231   {
2232     GST_DEBUG_OBJECT (src, "we are flushing");
2233     gst_buffer_unref (*buf);
2234     *buf = NULL;
2235     return GST_FLOW_WRONG_STATE;
2236   }
2237 eos:
2238   {
2239     GST_DEBUG_OBJECT (src, "we are EOS");
2240     return GST_FLOW_UNEXPECTED;
2241   }
2242 }
2243
2244 static GstFlowReturn
2245 gst_base_src_pad_get_range (GstPad * pad, guint64 offset, guint length,
2246     GstBuffer ** buf)
2247 {
2248   GstBaseSrc *src;
2249   GstFlowReturn res;
2250
2251   src = GST_BASE_SRC_CAST (gst_object_ref (GST_OBJECT_PARENT (pad)));
2252
2253   GST_LIVE_LOCK (src);
2254   if (G_UNLIKELY (src->priv->flushing))
2255     goto flushing;
2256
2257   res = gst_base_src_get_range (src, offset, length, buf);
2258
2259 done:
2260   GST_LIVE_UNLOCK (src);
2261
2262   gst_object_unref (src);
2263
2264   return res;
2265
2266   /* ERRORS */
2267 flushing:
2268   {
2269     GST_DEBUG_OBJECT (src, "we are flushing");
2270     res = GST_FLOW_WRONG_STATE;
2271     goto done;
2272   }
2273 }
2274
2275 static gboolean
2276 gst_base_src_default_check_get_range (GstBaseSrc * src)
2277 {
2278   gboolean res;
2279
2280   if (!GST_OBJECT_FLAG_IS_SET (src, GST_BASE_SRC_STARTED)) {
2281     GST_LOG_OBJECT (src, "doing start/stop to check get_range support");
2282     if (G_LIKELY (gst_base_src_start (src)))
2283       gst_base_src_stop (src);
2284   }
2285
2286   /* we can operate in getrange mode if the native format is bytes
2287    * and we are seekable, this condition is set in the random_access
2288    * flag and is set in the _start() method. */
2289   res = src->random_access;
2290
2291   return res;
2292 }
2293
2294 static gboolean
2295 gst_base_src_check_get_range (GstBaseSrc * src)
2296 {
2297   GstBaseSrcClass *bclass;
2298   gboolean res;
2299
2300   bclass = GST_BASE_SRC_GET_CLASS (src);
2301
2302   if (bclass->check_get_range == NULL)
2303     goto no_function;
2304
2305   res = bclass->check_get_range (src);
2306   GST_LOG_OBJECT (src, "%s() returned %d",
2307       GST_DEBUG_FUNCPTR_NAME (bclass->check_get_range), (gint) res);
2308
2309   return res;
2310
2311   /* ERRORS */
2312 no_function:
2313   {
2314     GST_WARNING_OBJECT (src, "no check_get_range function set");
2315     return FALSE;
2316   }
2317 }
2318
2319 static gboolean
2320 gst_base_src_pad_check_get_range (GstPad * pad)
2321 {
2322   GstBaseSrc *src;
2323   gboolean res;
2324
2325   src = GST_BASE_SRC (GST_OBJECT_PARENT (pad));
2326
2327   res = gst_base_src_check_get_range (src);
2328
2329   return res;
2330 }
2331
2332 static void
2333 gst_base_src_loop (GstPad * pad)
2334 {
2335   GstBaseSrc *src;
2336   GstBuffer *buf = NULL;
2337   GstFlowReturn ret;
2338   gint64 position;
2339   gboolean eos;
2340   gulong blocksize;
2341   GList *tags, *tmp;
2342
2343   eos = FALSE;
2344
2345   src = GST_BASE_SRC (GST_OBJECT_PARENT (pad));
2346
2347   GST_LIVE_LOCK (src);
2348
2349   if (G_UNLIKELY (src->priv->flushing))
2350     goto flushing;
2351
2352   src->priv->last_sent_eos = FALSE;
2353
2354   blocksize = src->blocksize;
2355
2356   /* if we operate in bytes, we can calculate an offset */
2357   if (src->segment.format == GST_FORMAT_BYTES) {
2358     position = src->segment.last_stop;
2359     /* for negative rates, start with subtracting the blocksize */
2360     if (src->segment.rate < 0.0) {
2361       /* we cannot go below segment.start */
2362       if (position > src->segment.start + blocksize)
2363         position -= blocksize;
2364       else {
2365         /* last block, remainder up to segment.start */
2366         blocksize = position - src->segment.start;
2367         position = src->segment.start;
2368       }
2369     }
2370   } else
2371     position = -1;
2372
2373   GST_LOG_OBJECT (src, "next_ts %" GST_TIME_FORMAT " size %lu",
2374       GST_TIME_ARGS (position), blocksize);
2375
2376   ret = gst_base_src_get_range (src, position, blocksize, &buf);
2377   if (G_UNLIKELY (ret != GST_FLOW_OK)) {
2378     GST_INFO_OBJECT (src, "pausing after gst_base_src_get_range() = %s",
2379         gst_flow_get_name (ret));
2380     GST_LIVE_UNLOCK (src);
2381     goto pause;
2382   }
2383   /* this should not happen */
2384   if (G_UNLIKELY (buf == NULL))
2385     goto null_buffer;
2386
2387   /* push events to close/start our segment before we push the buffer. */
2388   if (G_UNLIKELY (src->priv->close_segment)) {
2389     gst_pad_push_event (pad, src->priv->close_segment);
2390     src->priv->close_segment = NULL;
2391   }
2392   if (G_UNLIKELY (src->priv->start_segment)) {
2393     gst_pad_push_event (pad, src->priv->start_segment);
2394     src->priv->start_segment = NULL;
2395   }
2396   src->priv->newsegment_pending = FALSE;
2397
2398   GST_OBJECT_LOCK (src);
2399   /* take the tags */
2400   tags = src->priv->pending_tags;
2401   src->priv->pending_tags = NULL;
2402   GST_OBJECT_UNLOCK (src);
2403
2404   /* Push out pending tags if any */
2405   if (G_UNLIKELY (tags != NULL)) {
2406     for (tmp = tags; tmp; tmp = g_list_next (tmp)) {
2407       GstEvent *ev = (GstEvent *) tmp->data;
2408       gst_pad_push_event (pad, ev);
2409     }
2410     g_list_free (tags);
2411   }
2412
2413   /* figure out the new position */
2414   switch (src->segment.format) {
2415     case GST_FORMAT_BYTES:
2416     {
2417       guint bufsize = GST_BUFFER_SIZE (buf);
2418
2419       /* we subtracted above for negative rates */
2420       if (src->segment.rate >= 0.0)
2421         position += bufsize;
2422       break;
2423     }
2424     case GST_FORMAT_TIME:
2425     {
2426       GstClockTime start, duration;
2427
2428       start = GST_BUFFER_TIMESTAMP (buf);
2429       duration = GST_BUFFER_DURATION (buf);
2430
2431       if (GST_CLOCK_TIME_IS_VALID (start))
2432         position = start;
2433       else
2434         position = src->segment.last_stop;
2435
2436       if (GST_CLOCK_TIME_IS_VALID (duration)) {
2437         if (src->segment.rate >= 0.0)
2438           position += duration;
2439         else if (position > duration)
2440           position -= duration;
2441         else
2442           position = 0;
2443       }
2444       break;
2445     }
2446     case GST_FORMAT_DEFAULT:
2447       if (src->segment.rate >= 0.0)
2448         position = GST_BUFFER_OFFSET_END (buf);
2449       else
2450         position = GST_BUFFER_OFFSET (buf);
2451       break;
2452     default:
2453       position = -1;
2454       break;
2455   }
2456   if (position != -1) {
2457     if (src->segment.rate >= 0.0) {
2458       /* positive rate, check if we reached the stop */
2459       if (src->segment.stop != -1) {
2460         if (position >= src->segment.stop) {
2461           eos = TRUE;
2462           position = src->segment.stop;
2463         }
2464       }
2465     } else {
2466       /* negative rate, check if we reached the start. start is always set to
2467        * something different from -1 */
2468       if (position <= src->segment.start) {
2469         eos = TRUE;
2470         position = src->segment.start;
2471       }
2472       /* when going reverse, all buffers are DISCONT */
2473       src->priv->discont = TRUE;
2474     }
2475     GST_OBJECT_LOCK (src);
2476     gst_segment_set_last_stop (&src->segment, src->segment.format, position);
2477     GST_OBJECT_UNLOCK (src);
2478   }
2479
2480   if (G_UNLIKELY (src->priv->discont)) {
2481     buf = gst_buffer_make_metadata_writable (buf);
2482     GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_DISCONT);
2483     src->priv->discont = FALSE;
2484   }
2485   GST_LIVE_UNLOCK (src);
2486
2487   ret = gst_pad_push (pad, buf);
2488   if (G_UNLIKELY (ret != GST_FLOW_OK)) {
2489     GST_INFO_OBJECT (src, "pausing after gst_pad_push() = %s",
2490         gst_flow_get_name (ret));
2491     goto pause;
2492   }
2493
2494   if (G_UNLIKELY (eos)) {
2495     GST_INFO_OBJECT (src, "pausing after end of segment");
2496     ret = GST_FLOW_UNEXPECTED;
2497     goto pause;
2498   }
2499
2500 done:
2501   return;
2502
2503   /* special cases */
2504 flushing:
2505   {
2506     GST_DEBUG_OBJECT (src, "we are flushing");
2507     GST_LIVE_UNLOCK (src);
2508     ret = GST_FLOW_WRONG_STATE;
2509     goto pause;
2510   }
2511 pause:
2512   {
2513     const gchar *reason = gst_flow_get_name (ret);
2514     GstEvent *event;
2515
2516     GST_DEBUG_OBJECT (src, "pausing task, reason %s", reason);
2517     src->data.ABI.running = FALSE;
2518     gst_pad_pause_task (pad);
2519     if (GST_FLOW_IS_FATAL (ret) || ret == GST_FLOW_NOT_LINKED) {
2520       if (ret == GST_FLOW_UNEXPECTED) {
2521         gboolean flag_segment;
2522         GstFormat format;
2523         gint64 last_stop;
2524
2525         /* perform EOS logic */
2526         flag_segment = (src->segment.flags & GST_SEEK_FLAG_SEGMENT) != 0;
2527         format = src->segment.format;
2528         last_stop = src->segment.last_stop;
2529
2530         if (flag_segment) {
2531           GstMessage *message;
2532
2533           message = gst_message_new_segment_done (GST_OBJECT_CAST (src),
2534               format, last_stop);
2535           gst_message_set_seqnum (message, src->priv->seqnum);
2536           gst_element_post_message (GST_ELEMENT_CAST (src), message);
2537         } else {
2538           event = gst_event_new_eos ();
2539           gst_event_set_seqnum (event, src->priv->seqnum);
2540           gst_pad_push_event (pad, event);
2541           src->priv->last_sent_eos = TRUE;
2542         }
2543       } else {
2544         event = gst_event_new_eos ();
2545         gst_event_set_seqnum (event, src->priv->seqnum);
2546         /* for fatal errors we post an error message, post the error
2547          * first so the app knows about the error first. */
2548         GST_ELEMENT_ERROR (src, STREAM, FAILED,
2549             (_("Internal data flow error.")),
2550             ("streaming task paused, reason %s (%d)", reason, ret));
2551         gst_pad_push_event (pad, event);
2552         src->priv->last_sent_eos = TRUE;
2553       }
2554     }
2555     goto done;
2556   }
2557 null_buffer:
2558   {
2559     GST_ELEMENT_ERROR (src, STREAM, FAILED,
2560         (_("Internal data flow error.")), ("element returned NULL buffer"));
2561     GST_LIVE_UNLOCK (src);
2562     goto done;
2563   }
2564 }
2565
2566 /* default negotiation code.
2567  *
2568  * Take intersection between src and sink pads, take first
2569  * caps and fixate.
2570  */
2571 static gboolean
2572 gst_base_src_default_negotiate (GstBaseSrc * basesrc)
2573 {
2574   GstCaps *thiscaps;
2575   GstCaps *caps = NULL;
2576   GstCaps *peercaps = NULL;
2577   gboolean result = FALSE;
2578
2579   /* first see what is possible on our source pad */
2580   thiscaps = gst_pad_get_caps_reffed (GST_BASE_SRC_PAD (basesrc));
2581   GST_DEBUG_OBJECT (basesrc, "caps of src: %" GST_PTR_FORMAT, thiscaps);
2582   /* nothing or anything is allowed, we're done */
2583   if (thiscaps == NULL || gst_caps_is_any (thiscaps))
2584     goto no_nego_needed;
2585
2586   if (G_UNLIKELY (gst_caps_is_empty (thiscaps)))
2587     goto no_caps;
2588
2589   /* get the peer caps */
2590   peercaps = gst_pad_peer_get_caps_reffed (GST_BASE_SRC_PAD (basesrc));
2591   GST_DEBUG_OBJECT (basesrc, "caps of peer: %" GST_PTR_FORMAT, peercaps);
2592   if (peercaps) {
2593     /* get intersection */
2594     caps = gst_caps_intersect (thiscaps, peercaps);
2595     GST_DEBUG_OBJECT (basesrc, "intersect: %" GST_PTR_FORMAT, caps);
2596     gst_caps_unref (thiscaps);
2597     gst_caps_unref (peercaps);
2598   } else {
2599     /* no peer, work with our own caps then */
2600     caps = thiscaps;
2601   }
2602   if (caps) {
2603     /* take first (and best, since they are sorted) possibility */
2604     caps = gst_caps_make_writable (caps);
2605     gst_caps_truncate (caps);
2606
2607     /* now fixate */
2608     if (!gst_caps_is_empty (caps)) {
2609       gst_pad_fixate_caps (GST_BASE_SRC_PAD (basesrc), caps);
2610       GST_DEBUG_OBJECT (basesrc, "fixated to: %" GST_PTR_FORMAT, caps);
2611
2612       if (gst_caps_is_any (caps)) {
2613         /* hmm, still anything, so element can do anything and
2614          * nego is not needed */
2615         result = TRUE;
2616       } else if (gst_caps_is_fixed (caps)) {
2617         /* yay, fixed caps, use those then, it's possible that the subclass does
2618          * not accept this caps after all and we have to fail. */
2619         result = gst_pad_set_caps (GST_BASE_SRC_PAD (basesrc), caps);
2620       }
2621     }
2622     gst_caps_unref (caps);
2623   } else {
2624     GST_DEBUG_OBJECT (basesrc, "no common caps");
2625   }
2626   return result;
2627
2628 no_nego_needed:
2629   {
2630     GST_DEBUG_OBJECT (basesrc, "no negotiation needed");
2631     if (thiscaps)
2632       gst_caps_unref (thiscaps);
2633     return TRUE;
2634   }
2635 no_caps:
2636   {
2637     GST_ELEMENT_ERROR (basesrc, STREAM, FORMAT,
2638         ("No supported formats found"),
2639         ("This element did not produce valid caps"));
2640     if (thiscaps)
2641       gst_caps_unref (thiscaps);
2642     return TRUE;
2643   }
2644 }
2645
2646 static gboolean
2647 gst_base_src_negotiate (GstBaseSrc * basesrc)
2648 {
2649   GstBaseSrcClass *bclass;
2650   gboolean result = TRUE;
2651
2652   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2653
2654   if (bclass->negotiate)
2655     result = bclass->negotiate (basesrc);
2656
2657   return result;
2658 }
2659
2660 static gboolean
2661 gst_base_src_start (GstBaseSrc * basesrc)
2662 {
2663   GstBaseSrcClass *bclass;
2664   gboolean result;
2665   guint64 size;
2666   gboolean seekable;
2667   GstFormat format;
2668
2669   if (GST_OBJECT_FLAG_IS_SET (basesrc, GST_BASE_SRC_STARTED))
2670     return TRUE;
2671
2672   GST_DEBUG_OBJECT (basesrc, "starting source");
2673
2674   basesrc->num_buffers_left = basesrc->num_buffers;
2675
2676   GST_OBJECT_LOCK (basesrc);
2677   gst_segment_init (&basesrc->segment, basesrc->segment.format);
2678   GST_OBJECT_UNLOCK (basesrc);
2679
2680   basesrc->data.ABI.running = FALSE;
2681   basesrc->priv->newsegment_pending = FALSE;
2682
2683   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2684   if (bclass->start)
2685     result = bclass->start (basesrc);
2686   else
2687     result = TRUE;
2688
2689   if (!result)
2690     goto could_not_start;
2691
2692   GST_OBJECT_FLAG_SET (basesrc, GST_BASE_SRC_STARTED);
2693
2694   format = basesrc->segment.format;
2695
2696   /* figure out the size */
2697   if (format == GST_FORMAT_BYTES) {
2698     if (bclass->get_size) {
2699       if (!(result = bclass->get_size (basesrc, &size)))
2700         size = -1;
2701     } else {
2702       result = FALSE;
2703       size = -1;
2704     }
2705     GST_DEBUG_OBJECT (basesrc, "setting size %" G_GUINT64_FORMAT, size);
2706     /* only update the size when operating in bytes, subclass is supposed
2707      * to set duration in the start method for other formats */
2708     GST_OBJECT_LOCK (basesrc);
2709     gst_segment_set_duration (&basesrc->segment, GST_FORMAT_BYTES, size);
2710     GST_OBJECT_UNLOCK (basesrc);
2711   } else {
2712     size = -1;
2713   }
2714
2715   GST_DEBUG_OBJECT (basesrc,
2716       "format: %d, have size: %d, size: %" G_GUINT64_FORMAT ", duration: %"
2717       G_GINT64_FORMAT, format, result, size, basesrc->segment.duration);
2718
2719   seekable = gst_base_src_seekable (basesrc);
2720   GST_DEBUG_OBJECT (basesrc, "is seekable: %d", seekable);
2721
2722   /* update for random access flag */
2723   basesrc->random_access = seekable && format == GST_FORMAT_BYTES;
2724
2725   GST_DEBUG_OBJECT (basesrc, "is random_access: %d", basesrc->random_access);
2726
2727   /* run typefind if we are random_access and the typefinding is enabled. */
2728   if (basesrc->random_access && basesrc->data.ABI.typefind && size != -1) {
2729     GstCaps *caps;
2730
2731     if (!(caps = gst_type_find_helper (basesrc->srcpad, size)))
2732       goto typefind_failed;
2733
2734     result = gst_pad_set_caps (basesrc->srcpad, caps);
2735     gst_caps_unref (caps);
2736   } else {
2737     /* use class or default negotiate function */
2738     if (!(result = gst_base_src_negotiate (basesrc)))
2739       goto could_not_negotiate;
2740   }
2741
2742   return result;
2743
2744   /* ERROR */
2745 could_not_start:
2746   {
2747     GST_DEBUG_OBJECT (basesrc, "could not start");
2748     /* subclass is supposed to post a message. We don't have to call _stop. */
2749     return FALSE;
2750   }
2751 could_not_negotiate:
2752   {
2753     GST_DEBUG_OBJECT (basesrc, "could not negotiate, stopping");
2754     GST_ELEMENT_ERROR (basesrc, STREAM, FORMAT,
2755         ("Could not negotiate format"), ("Check your filtered caps, if any"));
2756     /* we must call stop */
2757     gst_base_src_stop (basesrc);
2758     return FALSE;
2759   }
2760 typefind_failed:
2761   {
2762     GST_DEBUG_OBJECT (basesrc, "could not typefind, stopping");
2763     GST_ELEMENT_ERROR (basesrc, STREAM, TYPE_NOT_FOUND, (NULL), (NULL));
2764     /* we must call stop */
2765     gst_base_src_stop (basesrc);
2766     return FALSE;
2767   }
2768 }
2769
2770 static gboolean
2771 gst_base_src_stop (GstBaseSrc * basesrc)
2772 {
2773   GstBaseSrcClass *bclass;
2774   gboolean result = TRUE;
2775
2776   if (!GST_OBJECT_FLAG_IS_SET (basesrc, GST_BASE_SRC_STARTED))
2777     return TRUE;
2778
2779   GST_DEBUG_OBJECT (basesrc, "stopping source");
2780
2781   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2782   if (bclass->stop)
2783     result = bclass->stop (basesrc);
2784
2785   if (result)
2786     GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_STARTED);
2787
2788   return result;
2789 }
2790
2791 /* start or stop flushing dataprocessing
2792  */
2793 static gboolean
2794 gst_base_src_set_flushing (GstBaseSrc * basesrc,
2795     gboolean flushing, gboolean live_play, gboolean unlock, gboolean * playing)
2796 {
2797   GstBaseSrcClass *bclass;
2798
2799   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2800
2801   if (flushing && unlock) {
2802     /* unlock any subclasses, we need to do this before grabbing the
2803      * LIVE_LOCK since we hold this lock before going into ::create. We pass an
2804      * unlock to the params because of backwards compat (see seek handler)*/
2805     if (bclass->unlock)
2806       bclass->unlock (basesrc);
2807   }
2808
2809   /* the live lock is released when we are blocked, waiting for playing or
2810    * when we sync to the clock. */
2811   GST_LIVE_LOCK (basesrc);
2812   if (playing)
2813     *playing = basesrc->live_running;
2814   basesrc->priv->flushing = flushing;
2815   if (flushing) {
2816     /* if we are locked in the live lock, signal it to make it flush */
2817     basesrc->live_running = TRUE;
2818
2819     /* clear pending EOS if any */
2820     g_atomic_int_set (&basesrc->priv->pending_eos, FALSE);
2821
2822     /* step 1, now that we have the LIVE lock, clear our unlock request */
2823     if (bclass->unlock_stop)
2824       bclass->unlock_stop (basesrc);
2825
2826     /* step 2, unblock clock sync (if any) or any other blocking thing */
2827     if (basesrc->clock_id)
2828       gst_clock_id_unschedule (basesrc->clock_id);
2829   } else {
2830     /* signal the live source that it can start playing */
2831     basesrc->live_running = live_play;
2832   }
2833   GST_LIVE_SIGNAL (basesrc);
2834   GST_LIVE_UNLOCK (basesrc);
2835
2836   return TRUE;
2837 }
2838
2839 /* the purpose of this function is to make sure that a live source blocks in the
2840  * LIVE lock or leaves the LIVE lock and continues playing. */
2841 static gboolean
2842 gst_base_src_set_playing (GstBaseSrc * basesrc, gboolean live_play)
2843 {
2844   GstBaseSrcClass *bclass;
2845
2846   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2847
2848   /* unlock subclasses locked in ::create, we only do this when we stop playing. */
2849   if (!live_play) {
2850     GST_DEBUG_OBJECT (basesrc, "unlock");
2851     if (bclass->unlock)
2852       bclass->unlock (basesrc);
2853   }
2854
2855   /* we are now able to grab the LIVE lock, when we get it, we can be
2856    * waiting for PLAYING while blocked in the LIVE cond or we can be waiting
2857    * for the clock. */
2858   GST_LIVE_LOCK (basesrc);
2859   GST_DEBUG_OBJECT (basesrc, "unschedule clock");
2860
2861   /* unblock clock sync (if any) */
2862   if (basesrc->clock_id)
2863     gst_clock_id_unschedule (basesrc->clock_id);
2864
2865   /* configure what to do when we get to the LIVE lock. */
2866   GST_DEBUG_OBJECT (basesrc, "live running %d", live_play);
2867   basesrc->live_running = live_play;
2868
2869   if (live_play) {
2870     gboolean start;
2871
2872     /* clear our unlock request when going to PLAYING */
2873     GST_DEBUG_OBJECT (basesrc, "unlock stop");
2874     if (bclass->unlock_stop)
2875       bclass->unlock_stop (basesrc);
2876
2877     /* for live sources we restart the timestamp correction */
2878     basesrc->priv->latency = -1;
2879     /* have to restart the task in case it stopped because of the unlock when
2880      * we went to PAUSED. Only do this if we operating in push mode. */
2881     GST_OBJECT_LOCK (basesrc->srcpad);
2882     start = (GST_PAD_ACTIVATE_MODE (basesrc->srcpad) == GST_ACTIVATE_PUSH);
2883     GST_OBJECT_UNLOCK (basesrc->srcpad);
2884     if (start)
2885       gst_pad_start_task (basesrc->srcpad, (GstTaskFunction) gst_base_src_loop,
2886           basesrc->srcpad);
2887     GST_DEBUG_OBJECT (basesrc, "signal");
2888     GST_LIVE_SIGNAL (basesrc);
2889   }
2890   GST_LIVE_UNLOCK (basesrc);
2891
2892   return TRUE;
2893 }
2894
2895 static gboolean
2896 gst_base_src_activate_push (GstPad * pad, gboolean active)
2897 {
2898   GstBaseSrc *basesrc;
2899   GstEvent *event;
2900
2901   basesrc = GST_BASE_SRC (GST_OBJECT_PARENT (pad));
2902
2903   /* prepare subclass first */
2904   if (active) {
2905     GST_DEBUG_OBJECT (basesrc, "Activating in push mode");
2906
2907     if (G_UNLIKELY (!basesrc->can_activate_push))
2908       goto no_push_activation;
2909
2910     if (G_UNLIKELY (!gst_base_src_start (basesrc)))
2911       goto error_start;
2912
2913     basesrc->priv->last_sent_eos = FALSE;
2914     basesrc->priv->discont = TRUE;
2915     gst_base_src_set_flushing (basesrc, FALSE, FALSE, FALSE, NULL);
2916
2917     /* do initial seek, which will start the task */
2918     GST_OBJECT_LOCK (basesrc);
2919     event = basesrc->data.ABI.pending_seek;
2920     basesrc->data.ABI.pending_seek = NULL;
2921     GST_OBJECT_UNLOCK (basesrc);
2922
2923     /* no need to unlock anything, the task is certainly
2924      * not running here. The perform seek code will start the task when
2925      * finished. */
2926     if (G_UNLIKELY (!gst_base_src_perform_seek (basesrc, event, FALSE)))
2927       goto seek_failed;
2928
2929     if (event)
2930       gst_event_unref (event);
2931   } else {
2932     GST_DEBUG_OBJECT (basesrc, "Deactivating in push mode");
2933     /* flush all */
2934     gst_base_src_set_flushing (basesrc, TRUE, FALSE, TRUE, NULL);
2935     /* stop the task */
2936     gst_pad_stop_task (pad);
2937     /* now we can stop the source */
2938     if (G_UNLIKELY (!gst_base_src_stop (basesrc)))
2939       goto error_stop;
2940   }
2941   return TRUE;
2942
2943   /* ERRORS */
2944 no_push_activation:
2945   {
2946     GST_WARNING_OBJECT (basesrc, "Subclass disabled push-mode activation");
2947     return FALSE;
2948   }
2949 error_start:
2950   {
2951     GST_WARNING_OBJECT (basesrc, "Failed to start in push mode");
2952     return FALSE;
2953   }
2954 seek_failed:
2955   {
2956     GST_ERROR_OBJECT (basesrc, "Failed to perform initial seek");
2957     /* flush all */
2958     gst_base_src_set_flushing (basesrc, TRUE, FALSE, TRUE, NULL);
2959     /* stop the task */
2960     gst_pad_stop_task (pad);
2961     /* Stop the basesrc */
2962     gst_base_src_stop (basesrc);
2963     if (event)
2964       gst_event_unref (event);
2965     return FALSE;
2966   }
2967 error_stop:
2968   {
2969     GST_DEBUG_OBJECT (basesrc, "Failed to stop in push mode");
2970     return FALSE;
2971   }
2972 }
2973
2974 static gboolean
2975 gst_base_src_activate_pull (GstPad * pad, gboolean active)
2976 {
2977   GstBaseSrc *basesrc;
2978
2979   basesrc = GST_BASE_SRC (GST_OBJECT_PARENT (pad));
2980
2981   /* prepare subclass first */
2982   if (active) {
2983     GST_DEBUG_OBJECT (basesrc, "Activating in pull mode");
2984     if (G_UNLIKELY (!gst_base_src_start (basesrc)))
2985       goto error_start;
2986
2987     /* if not random_access, we cannot operate in pull mode for now */
2988     if (G_UNLIKELY (!gst_base_src_check_get_range (basesrc)))
2989       goto no_get_range;
2990
2991     /* stop flushing now but for live sources, still block in the LIVE lock when
2992      * we are not yet PLAYING */
2993     gst_base_src_set_flushing (basesrc, FALSE, FALSE, FALSE, NULL);
2994   } else {
2995     GST_DEBUG_OBJECT (basesrc, "Deactivating in pull mode");
2996     /* flush all, there is no task to stop */
2997     gst_base_src_set_flushing (basesrc, TRUE, FALSE, TRUE, NULL);
2998
2999     /* don't send EOS when going from PAUSED => READY when in pull mode */
3000     basesrc->priv->last_sent_eos = TRUE;
3001
3002     if (G_UNLIKELY (!gst_base_src_stop (basesrc)))
3003       goto error_stop;
3004   }
3005   return TRUE;
3006
3007   /* ERRORS */
3008 error_start:
3009   {
3010     GST_ERROR_OBJECT (basesrc, "Failed to start in pull mode");
3011     return FALSE;
3012   }
3013 no_get_range:
3014   {
3015     GST_ERROR_OBJECT (basesrc, "Cannot operate in pull mode, stopping");
3016     gst_base_src_stop (basesrc);
3017     return FALSE;
3018   }
3019 error_stop:
3020   {
3021     GST_ERROR_OBJECT (basesrc, "Failed to stop in pull mode");
3022     return FALSE;
3023   }
3024 }
3025
3026 static GstStateChangeReturn
3027 gst_base_src_change_state (GstElement * element, GstStateChange transition)
3028 {
3029   GstBaseSrc *basesrc;
3030   GstStateChangeReturn result;
3031   gboolean no_preroll = FALSE;
3032
3033   basesrc = GST_BASE_SRC (element);
3034
3035   switch (transition) {
3036     case GST_STATE_CHANGE_NULL_TO_READY:
3037       break;
3038     case GST_STATE_CHANGE_READY_TO_PAUSED:
3039       no_preroll = gst_base_src_is_live (basesrc);
3040       break;
3041     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
3042       GST_DEBUG_OBJECT (basesrc, "PAUSED->PLAYING");
3043       if (gst_base_src_is_live (basesrc)) {
3044         /* now we can start playback */
3045         gst_base_src_set_playing (basesrc, TRUE);
3046       }
3047       break;
3048     default:
3049       break;
3050   }
3051
3052   if ((result =
3053           GST_ELEMENT_CLASS (parent_class)->change_state (element,
3054               transition)) == GST_STATE_CHANGE_FAILURE)
3055     goto failure;
3056
3057   switch (transition) {
3058     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
3059       GST_DEBUG_OBJECT (basesrc, "PLAYING->PAUSED");
3060       if (gst_base_src_is_live (basesrc)) {
3061         /* make sure we block in the live lock in PAUSED */
3062         gst_base_src_set_playing (basesrc, FALSE);
3063         no_preroll = TRUE;
3064       }
3065       break;
3066     case GST_STATE_CHANGE_PAUSED_TO_READY:
3067     {
3068       GstEvent **event_p, *event;
3069
3070       /* we don't need to unblock anything here, the pad deactivation code
3071        * already did this */
3072
3073       /* FIXME, deprecate this behaviour, it is very dangerous.
3074        * the prefered way of sending EOS downstream is by sending
3075        * the EOS event to the element */
3076       if (!basesrc->priv->last_sent_eos) {
3077         GST_DEBUG_OBJECT (basesrc, "Sending EOS event");
3078         event = gst_event_new_eos ();
3079         gst_event_set_seqnum (event, basesrc->priv->seqnum);
3080         gst_pad_push_event (basesrc->srcpad, event);
3081         basesrc->priv->last_sent_eos = TRUE;
3082       }
3083       g_atomic_int_set (&basesrc->priv->pending_eos, FALSE);
3084       event_p = &basesrc->data.ABI.pending_seek;
3085       gst_event_replace (event_p, NULL);
3086       event_p = &basesrc->priv->close_segment;
3087       gst_event_replace (event_p, NULL);
3088       event_p = &basesrc->priv->start_segment;
3089       gst_event_replace (event_p, NULL);
3090       break;
3091     }
3092     case GST_STATE_CHANGE_READY_TO_NULL:
3093       break;
3094     default:
3095       break;
3096   }
3097
3098   if (no_preroll && result == GST_STATE_CHANGE_SUCCESS)
3099     result = GST_STATE_CHANGE_NO_PREROLL;
3100
3101   return result;
3102
3103   /* ERRORS */
3104 failure:
3105   {
3106     GST_DEBUG_OBJECT (basesrc, "parent failed state change");
3107     return result;
3108   }
3109 }