Merge branch 'move_subdir' into tizen_gst_1.19.2_mono
[platform/upstream/gstreamer.git] / subprojects / gstreamer / libs / gst / base / gstbasesrc.c
1 /* GStreamer
2  * Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
3  *               2000,2005 Wim Taymans <wim@fluendo.com>
4  *
5  * gstbasesrc.c:
6  *
7  * This library is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU Library General Public
9  * License as published by the Free Software Foundation; either
10  * version 2 of the License, or (at your option) any later version.
11  *
12  * This library is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * Library General Public License for more details.
16  *
17  * You should have received a copy of the GNU Library General Public
18  * License along with this library; if not, write to the
19  * Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
20  * Boston, MA 02110-1301, USA.
21  */
22
23 /**
24  * SECTION:gstbasesrc
25  * @title: GstBaseSrc
26  * @short_description: Base class for getrange based source elements
27  * @see_also: #GstPushSrc, #GstBaseTransform, #GstBaseSink
28  *
29  * This is a generic base class for source elements. The following
30  * types of sources are supported:
31  *
32  *   * random access sources like files
33  *   * seekable sources
34  *   * live sources
35  *
36  * The source can be configured to operate in any #GstFormat with the
37  * gst_base_src_set_format() method. The currently set format determines
38  * the format of the internal #GstSegment and any %GST_EVENT_SEGMENT
39  * events. The default format for #GstBaseSrc is %GST_FORMAT_BYTES.
40  *
41  * #GstBaseSrc always supports push mode scheduling. If the following
42  * conditions are met, it also supports pull mode scheduling:
43  *
44  *   * The format is set to %GST_FORMAT_BYTES (default).
45  *   * #GstBaseSrcClass::is_seekable returns %TRUE.
46  *
47  * If all the conditions are met for operating in pull mode, #GstBaseSrc is
48  * automatically seekable in push mode as well. The following conditions must
49  * be met to make the element seekable in push mode when the format is not
50  * %GST_FORMAT_BYTES:
51  *
52  * * #GstBaseSrcClass::is_seekable returns %TRUE.
53  * * #GstBaseSrcClass::query can convert all supported seek formats to the
54  *   internal format as set with gst_base_src_set_format().
55  * * #GstBaseSrcClass::do_seek is implemented, performs the seek and returns
56  *    %TRUE.
57  *
58  * When the element does not meet the requirements to operate in pull mode, the
59  * offset and length in the #GstBaseSrcClass::create method should be ignored.
60  * It is recommended to subclass #GstPushSrc instead, in this situation. If the
61  * element can operate in pull mode but only with specific offsets and
62  * lengths, it is allowed to generate an error when the wrong values are passed
63  * to the #GstBaseSrcClass::create function.
64  *
65  * #GstBaseSrc has support for live sources. Live sources are sources that when
66  * paused discard data, such as audio or video capture devices. A typical live
67  * source also produces data at a fixed rate and thus provides a clock to publish
68  * this rate.
69  * Use gst_base_src_set_live() to activate the live source mode.
70  *
71  * A live source does not produce data in the PAUSED state. This means that the
72  * #GstBaseSrcClass::create method will not be called in PAUSED but only in
73  * PLAYING. To signal the pipeline that the element will not produce data, the
74  * return value from the READY to PAUSED state will be
75  * %GST_STATE_CHANGE_NO_PREROLL.
76  *
77  * A typical live source will timestamp the buffers it creates with the
78  * current running time of the pipeline. This is one reason why a live source
79  * can only produce data in the PLAYING state, when the clock is actually
80  * distributed and running.
81  *
82  * Live sources that synchronize and block on the clock (an audio source, for
83  * example) can use gst_base_src_wait_playing() when the
84  * #GstBaseSrcClass::create function was interrupted by a state change to
85  * PAUSED.
86  *
87  * The #GstBaseSrcClass::get_times method can be used to implement pseudo-live
88  * sources. It only makes sense to implement the #GstBaseSrcClass::get_times
89  * function if the source is a live source. The #GstBaseSrcClass::get_times
90  * function should return timestamps starting from 0, as if it were a non-live
91  * source. The base class will make sure that the timestamps are transformed
92  * into the current running_time. The base source will then wait for the
93  * calculated running_time before pushing out the buffer.
94  *
95  * For live sources, the base class will by default report a latency of 0.
96  * For pseudo live sources, the base class will by default measure the difference
97  * between the first buffer timestamp and the start time of get_times and will
98  * report this value as the latency.
99  * Subclasses should override the query function when this behaviour is not
100  * acceptable.
101  *
102  * There is only support in #GstBaseSrc for exactly one source pad, which
103  * should be named "src". A source implementation (subclass of #GstBaseSrc)
104  * should install a pad template in its class_init function, like so:
105  * |[<!-- language="C" -->
106  * static void
107  * my_element_class_init (GstMyElementClass *klass)
108  * {
109  *   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
110  *   // srctemplate should be a #GstStaticPadTemplate with direction
111  *   // %GST_PAD_SRC and name "src"
112  *   gst_element_class_add_static_pad_template (gstelement_class, &amp;srctemplate);
113  *
114  *   gst_element_class_set_static_metadata (gstelement_class,
115  *      "Source name",
116  *      "Source",
117  *      "My Source element",
118  *      "The author <my.sink@my.email>");
119  * }
120  * ]|
121  *
122  * ## Controlled shutdown of live sources in applications
123  *
124  * Applications that record from a live source may want to stop recording
125  * in a controlled way, so that the recording is stopped, but the data
126  * already in the pipeline is processed to the end (remember that many live
127  * sources would go on recording forever otherwise). For that to happen the
128  * application needs to make the source stop recording and send an EOS
129  * event down the pipeline. The application would then wait for an
130  * EOS message posted on the pipeline's bus to know when all data has
131  * been processed and the pipeline can safely be stopped.
132  *
133  * An application may send an EOS event to a source element to make it
134  * perform the EOS logic (send EOS event downstream or post a
135  * %GST_MESSAGE_SEGMENT_DONE on the bus). This can typically be done
136  * with the gst_element_send_event() function on the element or its parent bin.
137  *
138  * After the EOS has been sent to the element, the application should wait for
139  * an EOS message to be posted on the pipeline's bus. Once this EOS message is
140  * received, it may safely shut down the entire pipeline.
141  *
142  */
143
144 #ifdef HAVE_CONFIG_H
145 #  include "config.h"
146 #endif
147
148 #include <stdlib.h>
149 #include <string.h>
150
151 #include <gst/gst_private.h>
152 #include <gst/glib-compat-private.h>
153
154 #include "gstbasesrc.h"
155 #include <gst/gst-i18n-lib.h>
156
157 GST_DEBUG_CATEGORY_STATIC (gst_base_src_debug);
158 #define GST_CAT_DEFAULT gst_base_src_debug
159
160 #define GST_LIVE_GET_LOCK(elem)               (&GST_BASE_SRC_CAST(elem)->live_lock)
161 #define GST_LIVE_LOCK(elem)                   g_mutex_lock(GST_LIVE_GET_LOCK(elem))
162 #define GST_LIVE_TRYLOCK(elem)                g_mutex_trylock(GST_LIVE_GET_LOCK(elem))
163 #define GST_LIVE_UNLOCK(elem)                 g_mutex_unlock(GST_LIVE_GET_LOCK(elem))
164 #define GST_LIVE_GET_COND(elem)               (&GST_BASE_SRC_CAST(elem)->live_cond)
165 #define GST_LIVE_WAIT(elem)                   g_cond_wait (GST_LIVE_GET_COND (elem), GST_LIVE_GET_LOCK (elem))
166 #define GST_LIVE_WAIT_UNTIL(elem, end_time)   g_cond_timed_wait (GST_LIVE_GET_COND (elem), GST_LIVE_GET_LOCK (elem), end_time)
167 #define GST_LIVE_SIGNAL(elem)                 g_cond_signal (GST_LIVE_GET_COND (elem));
168 #define GST_LIVE_BROADCAST(elem)              g_cond_broadcast (GST_LIVE_GET_COND (elem));
169
170
171 #define GST_ASYNC_GET_COND(elem)              (&GST_BASE_SRC_CAST(elem)->priv->async_cond)
172 #define GST_ASYNC_WAIT(elem)                  g_cond_wait (GST_ASYNC_GET_COND (elem), GST_OBJECT_GET_LOCK (elem))
173 #define GST_ASYNC_SIGNAL(elem)                g_cond_signal (GST_ASYNC_GET_COND (elem));
174
175 #define CLEAR_PENDING_EOS(bsrc) \
176   G_STMT_START { \
177     g_atomic_int_set (&bsrc->priv->has_pending_eos, FALSE); \
178     gst_event_replace (&bsrc->priv->pending_eos, NULL); \
179   } G_STMT_END
180
181
182 /* BaseSrc signals and args */
183 enum
184 {
185   /* FILL ME */
186   LAST_SIGNAL
187 };
188
189 #define DEFAULT_BLOCKSIZE       4096
190 #define DEFAULT_NUM_BUFFERS     -1
191 #define DEFAULT_DO_TIMESTAMP    FALSE
192 #ifdef TIZEN_PROFILE_TV
193 #define DEFAULT_SEGMENT_TIME    -1
194 #endif
195
196 enum
197 {
198   PROP_0,
199   PROP_BLOCKSIZE,
200   PROP_NUM_BUFFERS,
201 #ifndef GST_REMOVE_DEPRECATED
202   PROP_TYPEFIND,
203 #endif
204   PROP_DO_TIMESTAMP
205 #ifdef TIZEN_PROFILE_TV
206   ,PROP_UPDATE_SEGMENT
207 #endif
208 };
209
210 /* The basesrc implementation need to respect the following locking order:
211  *   1. STREAM_LOCK
212  *   2. LIVE_LOCK
213  *   3. OBJECT_LOCK
214  */
215 struct _GstBaseSrcPrivate
216 {
217   gboolean discont;             /* STREAM_LOCK */
218   gboolean flushing;            /* LIVE_LOCK */
219
220   GstFlowReturn start_result;   /* OBJECT_LOCK */
221   gboolean async;               /* OBJECT_LOCK */
222
223   /* if a stream-start event should be sent */
224   gboolean stream_start_pending;        /* STREAM_LOCK */
225
226   /* if segment should be sent and a
227    * seqnum if it was originated by a seek */
228   gboolean segment_pending;     /* OBJECT_LOCK */
229   guint32 segment_seqnum;       /* OBJECT_LOCK */
230
231   /* if EOS is pending (atomic) */
232   GstEvent *pending_eos;        /* OBJECT_LOCK */
233   gint has_pending_eos;         /* atomic */
234
235   /* if the eos was caused by a forced eos from the application */
236   gboolean forced_eos;          /* LIVE_LOCK */
237
238   /* startup latency is the time it takes between going to PLAYING and producing
239    * the first BUFFER with running_time 0. This value is included in the latency
240    * reporting. */
241   GstClockTime latency;         /* OBJECT_LOCK */
242   /* timestamp offset, this is the offset add to the values of gst_times for
243    * pseudo live sources */
244   GstClockTimeDiff ts_offset;   /* OBJECT_LOCK */
245
246   gboolean do_timestamp;        /* OBJECT_LOCK */
247   gint dynamic_size;            /* atomic */
248   gint automatic_eos;           /* atomic */
249
250   /* stream sequence number */
251   guint32 seqnum;               /* STREAM_LOCK */
252
253   /* pending events (TAG, CUSTOM_BOTH, CUSTOM_DOWNSTREAM) to be
254    * pushed in the data stream */
255   GList *pending_events;        /* OBJECT_LOCK */
256   gint have_events;             /* OBJECT_LOCK */
257
258   /* QoS *//* with LOCK */
259   gdouble proportion;           /* OBJECT_LOCK */
260   GstClockTime earliest_time;   /* OBJECT_LOCK */
261
262   GstBufferPool *pool;          /* OBJECT_LOCK */
263   GstAllocator *allocator;      /* OBJECT_LOCK */
264   GstAllocationParams params;   /* OBJECT_LOCK */
265
266   GCond async_cond;             /* OBJECT_LOCK */
267
268   /* for _submit_buffer_list() */
269   GstBufferList *pending_bufferlist;
270 };
271
272 #define BASE_SRC_HAS_PENDING_BUFFER_LIST(src) \
273     ((src)->priv->pending_bufferlist != NULL)
274
275 static GstElementClass *parent_class = NULL;
276 static gint private_offset = 0;
277
278 static void gst_base_src_class_init (GstBaseSrcClass * klass);
279 static void gst_base_src_init (GstBaseSrc * src, gpointer g_class);
280 static void gst_base_src_finalize (GObject * object);
281
282
283 GType
284 gst_base_src_get_type (void)
285 {
286   static gsize base_src_type = 0;
287
288   if (g_once_init_enter (&base_src_type)) {
289     GType _type;
290     static const GTypeInfo base_src_info = {
291       sizeof (GstBaseSrcClass),
292       NULL,
293       NULL,
294       (GClassInitFunc) gst_base_src_class_init,
295       NULL,
296       NULL,
297       sizeof (GstBaseSrc),
298       0,
299       (GInstanceInitFunc) gst_base_src_init,
300     };
301
302     _type = g_type_register_static (GST_TYPE_ELEMENT,
303         "GstBaseSrc", &base_src_info, G_TYPE_FLAG_ABSTRACT);
304
305     private_offset =
306         g_type_add_instance_private (_type, sizeof (GstBaseSrcPrivate));
307
308     g_once_init_leave (&base_src_type, _type);
309   }
310   return base_src_type;
311 }
312
313 static inline GstBaseSrcPrivate *
314 gst_base_src_get_instance_private (GstBaseSrc * self)
315 {
316   return (G_STRUCT_MEMBER_P (self, private_offset));
317 }
318
319 static GstCaps *gst_base_src_default_get_caps (GstBaseSrc * bsrc,
320     GstCaps * filter);
321 static GstCaps *gst_base_src_default_fixate (GstBaseSrc * src, GstCaps * caps);
322 static GstCaps *gst_base_src_fixate (GstBaseSrc * src, GstCaps * caps);
323
324 static gboolean gst_base_src_is_random_access (GstBaseSrc * src);
325 static gboolean gst_base_src_activate_mode (GstPad * pad, GstObject * parent,
326     GstPadMode mode, gboolean active);
327 static void gst_base_src_set_property (GObject * object, guint prop_id,
328     const GValue * value, GParamSpec * pspec);
329 static void gst_base_src_get_property (GObject * object, guint prop_id,
330     GValue * value, GParamSpec * pspec);
331 static gboolean gst_base_src_event (GstPad * pad, GstObject * parent,
332     GstEvent * event);
333 static gboolean gst_base_src_send_event (GstElement * elem, GstEvent * event);
334 static gboolean gst_base_src_default_event (GstBaseSrc * src, GstEvent * event);
335
336 static gboolean gst_base_src_query (GstPad * pad, GstObject * parent,
337     GstQuery * query);
338
339 static void gst_base_src_set_pool_flushing (GstBaseSrc * basesrc,
340     gboolean flushing);
341 static gboolean gst_base_src_default_negotiate (GstBaseSrc * basesrc);
342 static gboolean gst_base_src_default_do_seek (GstBaseSrc * src,
343     GstSegment * segment);
344 static gboolean gst_base_src_default_query (GstBaseSrc * src, GstQuery * query);
345 static gboolean gst_base_src_default_prepare_seek_segment (GstBaseSrc * src,
346     GstEvent * event, GstSegment * segment);
347 static GstFlowReturn gst_base_src_default_create (GstBaseSrc * basesrc,
348     guint64 offset, guint size, GstBuffer ** buf);
349 static GstFlowReturn gst_base_src_default_alloc (GstBaseSrc * basesrc,
350     guint64 offset, guint size, GstBuffer ** buf);
351 static gboolean gst_base_src_decide_allocation_default (GstBaseSrc * basesrc,
352     GstQuery * query);
353
354 static gboolean gst_base_src_set_flushing (GstBaseSrc * basesrc,
355     gboolean flushing);
356
357 static gboolean gst_base_src_start (GstBaseSrc * basesrc);
358 static gboolean gst_base_src_stop (GstBaseSrc * basesrc);
359
360 static GstStateChangeReturn gst_base_src_change_state (GstElement * element,
361     GstStateChange transition);
362
363 static void gst_base_src_loop (GstPad * pad);
364 static GstFlowReturn gst_base_src_getrange (GstPad * pad, GstObject * parent,
365     guint64 offset, guint length, GstBuffer ** buf);
366 static GstFlowReturn gst_base_src_get_range (GstBaseSrc * src, guint64 offset,
367     guint length, GstBuffer ** buf);
368 static gboolean gst_base_src_seekable (GstBaseSrc * src);
369 static gboolean gst_base_src_negotiate_unlocked (GstBaseSrc * basesrc);
370 static gboolean gst_base_src_update_length (GstBaseSrc * src, guint64 offset,
371     guint * length, gboolean force);
372
373 static void
374 gst_base_src_class_init (GstBaseSrcClass * klass)
375 {
376   GObjectClass *gobject_class;
377   GstElementClass *gstelement_class;
378
379   gobject_class = G_OBJECT_CLASS (klass);
380   gstelement_class = GST_ELEMENT_CLASS (klass);
381
382   if (private_offset != 0)
383     g_type_class_adjust_private_offset (klass, &private_offset);
384
385   GST_DEBUG_CATEGORY_INIT (gst_base_src_debug, "basesrc", 0, "basesrc element");
386
387   parent_class = g_type_class_peek_parent (klass);
388
389   gobject_class->finalize = gst_base_src_finalize;
390   gobject_class->set_property = gst_base_src_set_property;
391   gobject_class->get_property = gst_base_src_get_property;
392
393   g_object_class_install_property (gobject_class, PROP_BLOCKSIZE,
394       g_param_spec_uint ("blocksize", "Block size",
395           "Size in bytes to read per buffer (-1 = default)", 0, G_MAXUINT,
396           DEFAULT_BLOCKSIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
397   g_object_class_install_property (gobject_class, PROP_NUM_BUFFERS,
398       g_param_spec_int ("num-buffers", "num-buffers",
399           "Number of buffers to output before sending EOS (-1 = unlimited)",
400           -1, G_MAXINT, DEFAULT_NUM_BUFFERS, G_PARAM_READWRITE |
401           G_PARAM_STATIC_STRINGS));
402 #ifndef GST_REMOVE_DEPRECATED
403   g_object_class_install_property (gobject_class, PROP_TYPEFIND,
404       g_param_spec_boolean ("typefind", "Typefind",
405           "Run typefind before negotiating (deprecated, non-functional)", FALSE,
406           G_PARAM_READWRITE | G_PARAM_DEPRECATED | G_PARAM_STATIC_STRINGS));
407 #endif
408   g_object_class_install_property (gobject_class, PROP_DO_TIMESTAMP,
409       g_param_spec_boolean ("do-timestamp", "Do timestamp",
410           "Apply current stream time to buffers", DEFAULT_DO_TIMESTAMP,
411           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
412 #ifdef TIZEN_PROFILE_TV
413   g_object_class_install_property (gobject_class, PROP_UPDATE_SEGMENT,
414       g_param_spec_int64 ("update-segment", "Update Segment",
415           "Timestamp that application want to set (-1 unlimited)",
416           -1, G_MAXINT64, DEFAULT_SEGMENT_TIME,
417           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
418 #endif
419
420   gstelement_class->change_state =
421       GST_DEBUG_FUNCPTR (gst_base_src_change_state);
422   gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_base_src_send_event);
423
424   klass->get_caps = GST_DEBUG_FUNCPTR (gst_base_src_default_get_caps);
425   klass->negotiate = GST_DEBUG_FUNCPTR (gst_base_src_default_negotiate);
426   klass->fixate = GST_DEBUG_FUNCPTR (gst_base_src_default_fixate);
427   klass->prepare_seek_segment =
428       GST_DEBUG_FUNCPTR (gst_base_src_default_prepare_seek_segment);
429   klass->do_seek = GST_DEBUG_FUNCPTR (gst_base_src_default_do_seek);
430   klass->query = GST_DEBUG_FUNCPTR (gst_base_src_default_query);
431   klass->event = GST_DEBUG_FUNCPTR (gst_base_src_default_event);
432   klass->create = GST_DEBUG_FUNCPTR (gst_base_src_default_create);
433   klass->alloc = GST_DEBUG_FUNCPTR (gst_base_src_default_alloc);
434   klass->decide_allocation =
435       GST_DEBUG_FUNCPTR (gst_base_src_decide_allocation_default);
436
437   /* Registering debug symbols for function pointers */
438   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_activate_mode);
439   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_event);
440   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_query);
441   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_getrange);
442   GST_DEBUG_REGISTER_FUNCPTR (gst_base_src_fixate);
443 }
444
445 static void
446 gst_base_src_init (GstBaseSrc * basesrc, gpointer g_class)
447 {
448   GstPad *pad;
449   GstPadTemplate *pad_template;
450
451   basesrc->priv = gst_base_src_get_instance_private (basesrc);
452
453   basesrc->is_live = FALSE;
454   g_mutex_init (&basesrc->live_lock);
455   g_cond_init (&basesrc->live_cond);
456   basesrc->num_buffers = DEFAULT_NUM_BUFFERS;
457   basesrc->num_buffers_left = -1;
458   g_atomic_int_set (&basesrc->priv->automatic_eos, TRUE);
459
460   basesrc->can_activate_push = TRUE;
461
462   pad_template =
463       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (g_class), "src");
464   g_return_if_fail (pad_template != NULL);
465
466   GST_DEBUG_OBJECT (basesrc, "creating src pad");
467   pad = gst_pad_new_from_template (pad_template, "src");
468
469   GST_DEBUG_OBJECT (basesrc, "setting functions on src pad");
470   gst_pad_set_activatemode_function (pad, gst_base_src_activate_mode);
471   gst_pad_set_event_function (pad, gst_base_src_event);
472   gst_pad_set_query_function (pad, gst_base_src_query);
473   gst_pad_set_getrange_function (pad, gst_base_src_getrange);
474
475   /* hold pointer to pad */
476   basesrc->srcpad = pad;
477   GST_DEBUG_OBJECT (basesrc, "adding src pad");
478   gst_element_add_pad (GST_ELEMENT (basesrc), pad);
479
480   basesrc->blocksize = DEFAULT_BLOCKSIZE;
481   basesrc->clock_id = NULL;
482   /* we operate in BYTES by default */
483   gst_base_src_set_format (basesrc, GST_FORMAT_BYTES);
484   basesrc->priv->do_timestamp = DEFAULT_DO_TIMESTAMP;
485   g_atomic_int_set (&basesrc->priv->have_events, FALSE);
486
487   g_cond_init (&basesrc->priv->async_cond);
488   basesrc->priv->start_result = GST_FLOW_FLUSHING;
489   GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTED);
490   GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTING);
491   GST_OBJECT_FLAG_SET (basesrc, GST_ELEMENT_FLAG_SOURCE);
492
493   GST_DEBUG_OBJECT (basesrc, "init done");
494 }
495
496 static void
497 gst_base_src_finalize (GObject * object)
498 {
499   GstBaseSrc *basesrc;
500   GstEvent **event_p;
501
502   basesrc = GST_BASE_SRC (object);
503
504   g_mutex_clear (&basesrc->live_lock);
505   g_cond_clear (&basesrc->live_cond);
506   g_cond_clear (&basesrc->priv->async_cond);
507
508   event_p = &basesrc->pending_seek;
509   gst_event_replace (event_p, NULL);
510
511   if (basesrc->priv->pending_events) {
512     g_list_foreach (basesrc->priv->pending_events, (GFunc) gst_event_unref,
513         NULL);
514     g_list_free (basesrc->priv->pending_events);
515   }
516
517   G_OBJECT_CLASS (parent_class)->finalize (object);
518 }
519
520 /* Call with LIVE_LOCK held */
521 static GstFlowReturn
522 gst_base_src_wait_playing_unlocked (GstBaseSrc * src)
523 {
524   while (G_UNLIKELY (!src->live_running && !src->priv->flushing)) {
525     /* block until the state changes, or we get a flush, or something */
526     GST_DEBUG_OBJECT (src, "live source waiting for running state");
527     GST_LIVE_WAIT (src);
528     GST_DEBUG_OBJECT (src, "live source unlocked");
529   }
530
531   if (src->priv->flushing)
532     goto flushing;
533
534   return GST_FLOW_OK;
535
536   /* ERRORS */
537 flushing:
538   {
539     GST_DEBUG_OBJECT (src, "we are flushing");
540     return GST_FLOW_FLUSHING;
541   }
542 }
543
544
545 /**
546  * gst_base_src_wait_playing:
547  * @src: the src
548  *
549  * If the #GstBaseSrcClass::create method performs its own synchronisation
550  * against the clock it must unblock when going from PLAYING to the PAUSED state
551  * and call this method before continuing to produce the remaining data.
552  *
553  * This function will block until a state change to PLAYING happens (in which
554  * case this function returns %GST_FLOW_OK) or the processing must be stopped due
555  * to a state change to READY or a FLUSH event (in which case this function
556  * returns %GST_FLOW_FLUSHING).
557  *
558  * Returns: %GST_FLOW_OK if @src is PLAYING and processing can
559  * continue. Any other return value should be returned from the create vmethod.
560  */
561 GstFlowReturn
562 gst_base_src_wait_playing (GstBaseSrc * src)
563 {
564   GstFlowReturn ret;
565
566   g_return_val_if_fail (GST_IS_BASE_SRC (src), GST_FLOW_ERROR);
567
568   GST_LIVE_LOCK (src);
569   ret = gst_base_src_wait_playing_unlocked (src);
570   GST_LIVE_UNLOCK (src);
571
572   return ret;
573 }
574
575 /**
576  * gst_base_src_set_live:
577  * @src: base source instance
578  * @live: new live-mode
579  *
580  * If the element listens to a live source, @live should
581  * be set to %TRUE.
582  *
583  * A live source will not produce data in the PAUSED state and
584  * will therefore not be able to participate in the PREROLL phase
585  * of a pipeline. To signal this fact to the application and the
586  * pipeline, the state change return value of the live source will
587  * be GST_STATE_CHANGE_NO_PREROLL.
588  */
589 void
590 gst_base_src_set_live (GstBaseSrc * src, gboolean live)
591 {
592   g_return_if_fail (GST_IS_BASE_SRC (src));
593
594   GST_OBJECT_LOCK (src);
595   src->is_live = live;
596   GST_OBJECT_UNLOCK (src);
597 }
598
599 /**
600  * gst_base_src_is_live:
601  * @src: base source instance
602  *
603  * Check if an element is in live mode.
604  *
605  * Returns: %TRUE if element is in live mode.
606  */
607 gboolean
608 gst_base_src_is_live (GstBaseSrc * src)
609 {
610   gboolean result;
611
612   g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
613
614   GST_OBJECT_LOCK (src);
615   result = src->is_live;
616   GST_OBJECT_UNLOCK (src);
617
618   return result;
619 }
620
621 /**
622  * gst_base_src_set_format:
623  * @src: base source instance
624  * @format: the format to use
625  *
626  * Sets the default format of the source. This will be the format used
627  * for sending SEGMENT events and for performing seeks.
628  *
629  * If a format of GST_FORMAT_BYTES is set, the element will be able to
630  * operate in pull mode if the #GstBaseSrcClass::is_seekable returns %TRUE.
631  *
632  * This function must only be called in states < %GST_STATE_PAUSED.
633  */
634 void
635 gst_base_src_set_format (GstBaseSrc * src, GstFormat format)
636 {
637   g_return_if_fail (GST_IS_BASE_SRC (src));
638   g_return_if_fail (GST_STATE (src) <= GST_STATE_READY);
639
640   GST_OBJECT_LOCK (src);
641   gst_segment_init (&src->segment, format);
642   GST_OBJECT_UNLOCK (src);
643 }
644
645 /**
646  * gst_base_src_set_dynamic_size:
647  * @src: base source instance
648  * @dynamic: new dynamic size mode
649  *
650  * If not @dynamic, size is only updated when needed, such as when trying to
651  * read past current tracked size.  Otherwise, size is checked for upon each
652  * read.
653  */
654 void
655 gst_base_src_set_dynamic_size (GstBaseSrc * src, gboolean dynamic)
656 {
657   g_return_if_fail (GST_IS_BASE_SRC (src));
658
659   g_atomic_int_set (&src->priv->dynamic_size, dynamic);
660 }
661
662 /**
663  * gst_base_src_set_automatic_eos:
664  * @src: base source instance
665  * @automatic_eos: automatic eos
666  *
667  * If @automatic_eos is %TRUE, @src will automatically go EOS if a buffer
668  * after the total size is returned. By default this is %TRUE but sources
669  * that can't return an authoritative size and only know that they're EOS
670  * when trying to read more should set this to %FALSE.
671  *
672  * When @src operates in %GST_FORMAT_TIME, #GstBaseSrc will send an EOS
673  * when a buffer outside of the currently configured segment is pushed if
674  * @automatic_eos is %TRUE. Since 1.16, if @automatic_eos is %FALSE an
675  * EOS will be pushed only when the #GstBaseSrcClass::create implementation
676  * returns %GST_FLOW_EOS.
677  *
678  * Since: 1.4
679  */
680 void
681 gst_base_src_set_automatic_eos (GstBaseSrc * src, gboolean automatic_eos)
682 {
683   g_return_if_fail (GST_IS_BASE_SRC (src));
684
685   g_atomic_int_set (&src->priv->automatic_eos, automatic_eos);
686 }
687
688 /**
689  * gst_base_src_set_async:
690  * @src: base source instance
691  * @async: new async mode
692  *
693  * Configure async behaviour in @src, no state change will block. The open,
694  * close, start, stop, play and pause virtual methods will be executed in a
695  * different thread and are thus allowed to perform blocking operations. Any
696  * blocking operation should be unblocked with the unlock vmethod.
697  */
698 void
699 gst_base_src_set_async (GstBaseSrc * src, gboolean async)
700 {
701   g_return_if_fail (GST_IS_BASE_SRC (src));
702
703   GST_OBJECT_LOCK (src);
704   src->priv->async = async;
705   GST_OBJECT_UNLOCK (src);
706 }
707
708 /**
709  * gst_base_src_is_async:
710  * @src: base source instance
711  *
712  * Get the current async behaviour of @src. See also gst_base_src_set_async().
713  *
714  * Returns: %TRUE if @src is operating in async mode.
715  */
716 gboolean
717 gst_base_src_is_async (GstBaseSrc * src)
718 {
719   gboolean res;
720
721   g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
722
723   GST_OBJECT_LOCK (src);
724   res = src->priv->async;
725   GST_OBJECT_UNLOCK (src);
726
727   return res;
728 }
729
730
731 /**
732  * gst_base_src_query_latency:
733  * @src: the source
734  * @live: (out) (allow-none): if the source is live
735  * @min_latency: (out) (allow-none): the min latency of the source
736  * @max_latency: (out) (allow-none): the max latency of the source
737  *
738  * Query the source for the latency parameters. @live will be %TRUE when @src is
739  * configured as a live source. @min_latency and @max_latency will be set
740  * to the difference between the running time and the timestamp of the first
741  * buffer.
742  *
743  * This function is mostly used by subclasses.
744  *
745  * Returns: %TRUE if the query succeeded.
746  */
747 gboolean
748 gst_base_src_query_latency (GstBaseSrc * src, gboolean * live,
749     GstClockTime * min_latency, GstClockTime * max_latency)
750 {
751   GstClockTime min;
752
753   g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
754
755   GST_OBJECT_LOCK (src);
756   if (live)
757     *live = src->is_live;
758
759   /* if we have a startup latency, report this one, else report 0. Subclasses
760    * are supposed to override the query function if they want something
761    * else. */
762   if (src->priv->latency != -1)
763     min = src->priv->latency;
764   else
765     min = 0;
766
767   if (min_latency)
768     *min_latency = min;
769   if (max_latency)
770     *max_latency = min;
771
772   GST_LOG_OBJECT (src, "latency: live %d, min %" GST_TIME_FORMAT
773       ", max %" GST_TIME_FORMAT, src->is_live, GST_TIME_ARGS (min),
774       GST_TIME_ARGS (min));
775   GST_OBJECT_UNLOCK (src);
776
777   return TRUE;
778 }
779
780 /**
781  * gst_base_src_set_blocksize:
782  * @src: the source
783  * @blocksize: the new blocksize in bytes
784  *
785  * Set the number of bytes that @src will push out with each buffer. When
786  * @blocksize is set to -1, a default length will be used.
787  */
788 void
789 gst_base_src_set_blocksize (GstBaseSrc * src, guint blocksize)
790 {
791   g_return_if_fail (GST_IS_BASE_SRC (src));
792
793   GST_OBJECT_LOCK (src);
794   src->blocksize = blocksize;
795   GST_OBJECT_UNLOCK (src);
796 }
797
798 /**
799  * gst_base_src_get_blocksize:
800  * @src: the source
801  *
802  * Get the number of bytes that @src will push out with each buffer.
803  *
804  * Returns: the number of bytes pushed with each buffer.
805  */
806 guint
807 gst_base_src_get_blocksize (GstBaseSrc * src)
808 {
809   gint res;
810
811   g_return_val_if_fail (GST_IS_BASE_SRC (src), 0);
812
813   GST_OBJECT_LOCK (src);
814   res = src->blocksize;
815   GST_OBJECT_UNLOCK (src);
816
817   return res;
818 }
819
820
821 /**
822  * gst_base_src_set_do_timestamp:
823  * @src: the source
824  * @timestamp: enable or disable timestamping
825  *
826  * Configure @src to automatically timestamp outgoing buffers based on the
827  * current running_time of the pipeline. This property is mostly useful for live
828  * sources.
829  */
830 void
831 gst_base_src_set_do_timestamp (GstBaseSrc * src, gboolean timestamp)
832 {
833   g_return_if_fail (GST_IS_BASE_SRC (src));
834
835   GST_OBJECT_LOCK (src);
836   src->priv->do_timestamp = timestamp;
837   if (timestamp && src->segment.format != GST_FORMAT_TIME)
838     gst_segment_init (&src->segment, GST_FORMAT_TIME);
839   GST_OBJECT_UNLOCK (src);
840 }
841
842 #ifdef TIZEN_PROFILE_TV
843 /**
844 *gst_base_src_update_segment:
845 *@src: the source
846 *@timestamp: timestamp to set
847 */
848 void
849 gst_base_src_update_segment (GstBaseSrc * src, gint64 timestamp)
850 {
851   g_return_if_fail (GST_IS_BASE_SRC (src));
852
853   GST_OBJECT_LOCK (src);
854   if (timestamp != -1) {
855     GST_DEBUG_OBJECT (src,
856         "udpate the segment parameter for es player resume playback,%lld",
857         timestamp);
858     src->segment.start = timestamp;
859     src->segment.time = timestamp;
860     src->segment.position = timestamp;
861   }
862   GST_OBJECT_UNLOCK (src);
863   GST_DEBUG_OBJECT (src, "udpate the segment end");
864 }
865 #endif
866
867 /**
868  * gst_base_src_get_do_timestamp:
869  * @src: the source
870  *
871  * Query if @src timestamps outgoing buffers based on the current running_time.
872  *
873  * Returns: %TRUE if the base class will automatically timestamp outgoing buffers.
874  */
875 gboolean
876 gst_base_src_get_do_timestamp (GstBaseSrc * src)
877 {
878   gboolean res;
879
880   g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
881
882   GST_OBJECT_LOCK (src);
883   res = src->priv->do_timestamp;
884   GST_OBJECT_UNLOCK (src);
885
886   return res;
887 }
888
889 /**
890  * gst_base_src_new_seamless_segment:
891  * @src: The source
892  * @start: The new start value for the segment
893  * @stop: Stop value for the new segment
894  * @time: The new time value for the start of the new segment
895  *
896  * Prepare a new seamless segment for emission downstream. This function must
897  * only be called by derived sub-classes, and only from the #GstBaseSrcClass::create function,
898  * as the stream-lock needs to be held.
899  *
900  * The format for the new segment will be the current format of the source, as
901  * configured with gst_base_src_set_format()
902  *
903  * Returns: %TRUE if preparation of the seamless segment succeeded.
904  *
905  * Deprecated: 1.18: Use gst_base_src_new_segment()
906  */
907 gboolean
908 gst_base_src_new_seamless_segment (GstBaseSrc * src, gint64 start, gint64 stop,
909     gint64 time)
910 {
911   gboolean res = TRUE;
912
913   GST_OBJECT_LOCK (src);
914
915   src->segment.base = gst_segment_to_running_time (&src->segment,
916       src->segment.format, src->segment.position);
917   src->segment.position = src->segment.start = start;
918   src->segment.stop = stop;
919   src->segment.time = time;
920
921   /* Mark pending segment. Will be sent before next data */
922   src->priv->segment_pending = TRUE;
923   src->priv->segment_seqnum = gst_util_seqnum_next ();
924
925   GST_DEBUG_OBJECT (src,
926       "Starting new seamless segment. Start %" GST_TIME_FORMAT " stop %"
927       GST_TIME_FORMAT " time %" GST_TIME_FORMAT " base %" GST_TIME_FORMAT,
928       GST_TIME_ARGS (start), GST_TIME_ARGS (stop), GST_TIME_ARGS (time),
929       GST_TIME_ARGS (src->segment.base));
930
931   GST_OBJECT_UNLOCK (src);
932
933   src->priv->discont = TRUE;
934   src->running = TRUE;
935
936   return res;
937 }
938
939 /**
940  * gst_base_src_new_segment:
941  * @src: a #GstBaseSrc
942  * @segment: a pointer to a #GstSegment
943  *
944  * Prepare a new segment for emission downstream. This function must
945  * only be called by derived sub-classes, and only from the #GstBaseSrcClass::create function,
946  * as the stream-lock needs to be held.
947  *
948  * The format for the @segment must be identical with the current format
949  * of the source, as configured with gst_base_src_set_format().
950  *
951  * The format of @src must not be %GST_FORMAT_UNDEFINED and the format
952  * should be configured via gst_base_src_set_format() before calling this method.
953  *
954  * Returns: %TRUE if preparation of new segment succeeded.
955  *
956  * Since: 1.18
957  */
958 gboolean
959 gst_base_src_new_segment (GstBaseSrc * src, const GstSegment * segment)
960 {
961   g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
962   g_return_val_if_fail (segment != NULL, FALSE);
963
964   GST_OBJECT_LOCK (src);
965
966   if (src->segment.format == GST_FORMAT_UNDEFINED) {
967     /* subclass must set valid format before calling this method */
968     GST_WARNING_OBJECT (src, "segment format is not configured yet, ignore");
969     GST_OBJECT_UNLOCK (src);
970     return FALSE;
971   }
972
973   if (src->segment.format != segment->format) {
974     GST_WARNING_OBJECT (src, "segment format mismatched, ignore");
975     GST_OBJECT_UNLOCK (src);
976     return FALSE;
977   }
978
979   gst_segment_copy_into (segment, &src->segment);
980
981   /* Mark pending segment. Will be sent before next data */
982   src->priv->segment_pending = TRUE;
983   src->priv->segment_seqnum = gst_util_seqnum_next ();
984
985   GST_DEBUG_OBJECT (src, "Starting new segment %" GST_SEGMENT_FORMAT, segment);
986
987   GST_OBJECT_UNLOCK (src);
988
989   src->running = TRUE;
990
991   return TRUE;
992 }
993
994 /* called with STREAM_LOCK */
995 static gboolean
996 gst_base_src_send_stream_start (GstBaseSrc * src)
997 {
998   gboolean ret = TRUE;
999
1000   if (src->priv->stream_start_pending) {
1001     gchar *stream_id;
1002     GstEvent *event;
1003
1004     stream_id =
1005         gst_pad_create_stream_id (src->srcpad, GST_ELEMENT_CAST (src), NULL);
1006
1007     GST_DEBUG_OBJECT (src, "Pushing STREAM_START");
1008     event = gst_event_new_stream_start (stream_id);
1009     gst_event_set_group_id (event, gst_util_group_id_next ());
1010
1011     ret = gst_pad_push_event (src->srcpad, event);
1012     src->priv->stream_start_pending = FALSE;
1013     g_free (stream_id);
1014   }
1015
1016   return ret;
1017 }
1018
1019 /**
1020  * gst_base_src_set_caps:
1021  * @src: a #GstBaseSrc
1022  * @caps: (transfer none): a #GstCaps
1023  *
1024  * Set new caps on the basesrc source pad.
1025  *
1026  * Returns: %TRUE if the caps could be set
1027  */
1028 gboolean
1029 gst_base_src_set_caps (GstBaseSrc * src, GstCaps * caps)
1030 {
1031   GstBaseSrcClass *bclass;
1032   gboolean res = TRUE;
1033   GstCaps *current_caps;
1034
1035   bclass = GST_BASE_SRC_GET_CLASS (src);
1036
1037   gst_base_src_send_stream_start (src);
1038
1039   current_caps = gst_pad_get_current_caps (GST_BASE_SRC_PAD (src));
1040   if (current_caps && gst_caps_is_equal (current_caps, caps)) {
1041     GST_DEBUG_OBJECT (src, "New caps equal to old ones: %" GST_PTR_FORMAT,
1042         caps);
1043     res = TRUE;
1044   } else {
1045     if (bclass->set_caps)
1046       res = bclass->set_caps (src, caps);
1047
1048     if (res)
1049       res = gst_pad_push_event (src->srcpad, gst_event_new_caps (caps));
1050   }
1051
1052   if (current_caps)
1053     gst_caps_unref (current_caps);
1054
1055   return res;
1056 }
1057
1058 static GstCaps *
1059 gst_base_src_default_get_caps (GstBaseSrc * bsrc, GstCaps * filter)
1060 {
1061   GstCaps *caps = NULL;
1062   GstPadTemplate *pad_template;
1063   GstBaseSrcClass *bclass;
1064
1065   bclass = GST_BASE_SRC_GET_CLASS (bsrc);
1066
1067   pad_template =
1068       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (bclass), "src");
1069
1070   if (pad_template != NULL) {
1071     caps = gst_pad_template_get_caps (pad_template);
1072
1073     if (filter) {
1074       GstCaps *intersection;
1075
1076       intersection =
1077           gst_caps_intersect_full (filter, caps, GST_CAPS_INTERSECT_FIRST);
1078       gst_caps_unref (caps);
1079       caps = intersection;
1080     }
1081   }
1082   return caps;
1083 }
1084
1085 static GstCaps *
1086 gst_base_src_default_fixate (GstBaseSrc * bsrc, GstCaps * caps)
1087 {
1088   GST_DEBUG_OBJECT (bsrc, "using default caps fixate function");
1089   return gst_caps_fixate (caps);
1090 }
1091
1092 static GstCaps *
1093 gst_base_src_fixate (GstBaseSrc * bsrc, GstCaps * caps)
1094 {
1095   GstBaseSrcClass *bclass;
1096
1097   bclass = GST_BASE_SRC_GET_CLASS (bsrc);
1098
1099   if (bclass->fixate)
1100     caps = bclass->fixate (bsrc, caps);
1101
1102   return caps;
1103 }
1104
1105 static gboolean
1106 gst_base_src_default_query (GstBaseSrc * src, GstQuery * query)
1107 {
1108   gboolean res;
1109
1110   switch (GST_QUERY_TYPE (query)) {
1111     case GST_QUERY_POSITION:
1112     {
1113       GstFormat format;
1114
1115       gst_query_parse_position (query, &format, NULL);
1116
1117       GST_DEBUG_OBJECT (src, "position query in format %s",
1118           gst_format_get_name (format));
1119
1120       switch (format) {
1121         case GST_FORMAT_PERCENT:
1122         {
1123           gint64 percent;
1124           gint64 position;
1125           gint64 duration;
1126
1127           GST_OBJECT_LOCK (src);
1128           position = src->segment.position;
1129           duration = src->segment.duration;
1130           GST_OBJECT_UNLOCK (src);
1131
1132           if (position != -1 && duration != -1) {
1133             if (position < duration)
1134               percent = gst_util_uint64_scale (GST_FORMAT_PERCENT_MAX, position,
1135                   duration);
1136             else
1137               percent = GST_FORMAT_PERCENT_MAX;
1138           } else
1139             percent = -1;
1140
1141           gst_query_set_position (query, GST_FORMAT_PERCENT, percent);
1142           res = TRUE;
1143           break;
1144         }
1145         default:
1146         {
1147           gint64 position;
1148           GstFormat seg_format;
1149
1150           GST_OBJECT_LOCK (src);
1151           position =
1152               gst_segment_to_stream_time (&src->segment, src->segment.format,
1153               src->segment.position);
1154           seg_format = src->segment.format;
1155           GST_OBJECT_UNLOCK (src);
1156
1157           if (position != -1) {
1158             /* convert to requested format */
1159             res =
1160                 gst_pad_query_convert (src->srcpad, seg_format,
1161                 position, format, &position);
1162           } else
1163             res = TRUE;
1164
1165           if (res)
1166             gst_query_set_position (query, format, position);
1167
1168           break;
1169         }
1170       }
1171       break;
1172     }
1173     case GST_QUERY_DURATION:
1174     {
1175       GstFormat format;
1176
1177       gst_query_parse_duration (query, &format, NULL);
1178
1179       GST_DEBUG_OBJECT (src, "duration query in format %s",
1180           gst_format_get_name (format));
1181
1182       switch (format) {
1183         case GST_FORMAT_PERCENT:
1184           gst_query_set_duration (query, GST_FORMAT_PERCENT,
1185               GST_FORMAT_PERCENT_MAX);
1186           res = TRUE;
1187           break;
1188         default:
1189         {
1190           gint64 duration;
1191           GstFormat seg_format;
1192           guint length = 0;
1193
1194           /* may have to refresh duration */
1195           gst_base_src_update_length (src, 0, &length,
1196               g_atomic_int_get (&src->priv->dynamic_size));
1197
1198           /* this is the duration as configured by the subclass. */
1199           GST_OBJECT_LOCK (src);
1200           duration = src->segment.duration;
1201           seg_format = src->segment.format;
1202           GST_OBJECT_UNLOCK (src);
1203
1204           GST_LOG_OBJECT (src, "duration %" G_GINT64_FORMAT ", format %s",
1205               duration, gst_format_get_name (seg_format));
1206
1207           if (duration != -1) {
1208             /* convert to requested format, if this fails, we have a duration
1209              * but we cannot answer the query, we must return FALSE. */
1210             res =
1211                 gst_pad_query_convert (src->srcpad, seg_format,
1212                 duration, format, &duration);
1213           } else {
1214             /* The subclass did not configure a duration, we assume that the
1215              * media has an unknown duration then and we return TRUE to report
1216              * this. Note that this is not the same as returning FALSE, which
1217              * means that we cannot report the duration at all. */
1218             res = TRUE;
1219           }
1220
1221           if (res)
1222             gst_query_set_duration (query, format, duration);
1223
1224           break;
1225         }
1226       }
1227       break;
1228     }
1229
1230     case GST_QUERY_SEEKING:
1231     {
1232       GstFormat format, seg_format;
1233       gint64 duration;
1234
1235       GST_OBJECT_LOCK (src);
1236       duration = src->segment.duration;
1237       seg_format = src->segment.format;
1238       GST_OBJECT_UNLOCK (src);
1239
1240       gst_query_parse_seeking (query, &format, NULL, NULL, NULL);
1241       if (format == seg_format) {
1242         gst_query_set_seeking (query, seg_format,
1243             gst_base_src_seekable (src), 0, duration);
1244         res = TRUE;
1245       } else {
1246         /* FIXME 2.0: return TRUE + seekable=FALSE for SEEKING query here */
1247         /* Don't reply to the query to make up for demuxers which don't
1248          * handle the SEEKING query yet. Players like Totem will fall back
1249          * to the duration when the SEEKING query isn't answered. */
1250         res = FALSE;
1251       }
1252       break;
1253     }
1254     case GST_QUERY_SEGMENT:
1255     {
1256       GstFormat format;
1257       gint64 start, stop;
1258
1259       GST_OBJECT_LOCK (src);
1260
1261       format = src->segment.format;
1262
1263       start =
1264           gst_segment_to_stream_time (&src->segment, format,
1265           src->segment.start);
1266       if ((stop = src->segment.stop) == -1)
1267         stop = src->segment.duration;
1268       else
1269         stop = gst_segment_to_stream_time (&src->segment, format, stop);
1270
1271       gst_query_set_segment (query, src->segment.rate, format, start, stop);
1272
1273       GST_OBJECT_UNLOCK (src);
1274       res = TRUE;
1275       break;
1276     }
1277
1278     case GST_QUERY_FORMATS:
1279     {
1280       gst_query_set_formats (query, 3, GST_FORMAT_DEFAULT,
1281           GST_FORMAT_BYTES, GST_FORMAT_PERCENT);
1282       res = TRUE;
1283       break;
1284     }
1285     case GST_QUERY_CONVERT:
1286     {
1287       GstFormat src_fmt, dest_fmt;
1288       gint64 src_val, dest_val;
1289
1290       gst_query_parse_convert (query, &src_fmt, &src_val, &dest_fmt, &dest_val);
1291
1292       /* we can only convert between equal formats... */
1293       if (src_fmt == dest_fmt) {
1294         dest_val = src_val;
1295         res = TRUE;
1296       } else
1297         res = FALSE;
1298
1299       gst_query_set_convert (query, src_fmt, src_val, dest_fmt, dest_val);
1300       break;
1301     }
1302     case GST_QUERY_LATENCY:
1303     {
1304       GstClockTime min, max;
1305       gboolean live;
1306
1307       /* Subclasses should override and implement something useful */
1308       res = gst_base_src_query_latency (src, &live, &min, &max);
1309
1310       GST_LOG_OBJECT (src, "report latency: live %d, min %" GST_TIME_FORMAT
1311           ", max %" GST_TIME_FORMAT, live, GST_TIME_ARGS (min),
1312           GST_TIME_ARGS (max));
1313
1314       gst_query_set_latency (query, live, min, max);
1315       break;
1316     }
1317     case GST_QUERY_JITTER:
1318     case GST_QUERY_RATE:
1319       res = FALSE;
1320       break;
1321     case GST_QUERY_BUFFERING:
1322     {
1323       GstFormat format, seg_format;
1324       gint64 start, stop, estimated;
1325
1326       gst_query_parse_buffering_range (query, &format, NULL, NULL, NULL);
1327
1328       GST_DEBUG_OBJECT (src, "buffering query in format %s",
1329           gst_format_get_name (format));
1330
1331       GST_OBJECT_LOCK (src);
1332       if (src->random_access) {
1333         estimated = 0;
1334         start = 0;
1335         if (format == GST_FORMAT_PERCENT)
1336           stop = GST_FORMAT_PERCENT_MAX;
1337         else
1338           stop = src->segment.duration;
1339       } else {
1340         estimated = -1;
1341         start = -1;
1342         stop = -1;
1343       }
1344       seg_format = src->segment.format;
1345       GST_OBJECT_UNLOCK (src);
1346
1347       /* convert to required format. When the conversion fails, we can't answer
1348        * the query. When the value is unknown, we can don't perform conversion
1349        * but report TRUE. */
1350       if (format != GST_FORMAT_PERCENT && stop != -1) {
1351         res = gst_pad_query_convert (src->srcpad, seg_format,
1352             stop, format, &stop);
1353       } else {
1354         res = TRUE;
1355       }
1356       if (res && format != GST_FORMAT_PERCENT && start != -1)
1357         res = gst_pad_query_convert (src->srcpad, seg_format,
1358             start, format, &start);
1359
1360       gst_query_set_buffering_range (query, format, start, stop, estimated);
1361       break;
1362     }
1363     case GST_QUERY_SCHEDULING:
1364     {
1365       gboolean random_access;
1366
1367       random_access = gst_base_src_is_random_access (src);
1368
1369       /* we can operate in getrange mode if the native format is bytes
1370        * and we are seekable, this condition is set in the random_access
1371        * flag and is set in the _start() method. */
1372       gst_query_set_scheduling (query, GST_SCHEDULING_FLAG_SEEKABLE, 1, -1, 0);
1373       if (random_access)
1374         gst_query_add_scheduling_mode (query, GST_PAD_MODE_PULL);
1375       gst_query_add_scheduling_mode (query, GST_PAD_MODE_PUSH);
1376
1377       res = TRUE;
1378       break;
1379     }
1380     case GST_QUERY_CAPS:
1381     {
1382       GstBaseSrcClass *bclass;
1383       GstCaps *caps, *filter;
1384
1385       bclass = GST_BASE_SRC_GET_CLASS (src);
1386       if (bclass->get_caps) {
1387         gst_query_parse_caps (query, &filter);
1388         if ((caps = bclass->get_caps (src, filter))) {
1389           gst_query_set_caps_result (query, caps);
1390           gst_caps_unref (caps);
1391           res = TRUE;
1392         } else {
1393           res = FALSE;
1394         }
1395       } else
1396         res = FALSE;
1397       break;
1398     }
1399     case GST_QUERY_URI:{
1400       if (GST_IS_URI_HANDLER (src)) {
1401         gchar *uri = gst_uri_handler_get_uri (GST_URI_HANDLER (src));
1402
1403         if (uri != NULL) {
1404           gst_query_set_uri (query, uri);
1405           g_free (uri);
1406           res = TRUE;
1407         } else {
1408           res = FALSE;
1409         }
1410       } else {
1411         res = FALSE;
1412       }
1413       break;
1414     }
1415     default:
1416       res = FALSE;
1417       break;
1418   }
1419   GST_DEBUG_OBJECT (src, "query %s returns %d", GST_QUERY_TYPE_NAME (query),
1420       res);
1421
1422   return res;
1423 }
1424
1425 static gboolean
1426 gst_base_src_query (GstPad * pad, GstObject * parent, GstQuery * query)
1427 {
1428   GstBaseSrc *src;
1429   GstBaseSrcClass *bclass;
1430   gboolean result = FALSE;
1431
1432   src = GST_BASE_SRC (parent);
1433   bclass = GST_BASE_SRC_GET_CLASS (src);
1434
1435   if (bclass->query)
1436     result = bclass->query (src, query);
1437
1438   return result;
1439 }
1440
1441 static gboolean
1442 gst_base_src_default_do_seek (GstBaseSrc * src, GstSegment * segment)
1443 {
1444   gboolean res = TRUE;
1445
1446   /* update our offset if the start/stop position was updated */
1447   if (segment->format == GST_FORMAT_BYTES) {
1448     segment->time = segment->start;
1449   } else if (segment->start == 0) {
1450     /* seek to start, we can implement a default for this. */
1451     segment->time = 0;
1452   } else {
1453     res = FALSE;
1454     GST_INFO_OBJECT (src, "Can't do a default seek");
1455   }
1456
1457   return res;
1458 }
1459
1460 static gboolean
1461 gst_base_src_do_seek (GstBaseSrc * src, GstSegment * segment)
1462 {
1463   GstBaseSrcClass *bclass;
1464   gboolean result = FALSE;
1465
1466   bclass = GST_BASE_SRC_GET_CLASS (src);
1467
1468   GST_INFO_OBJECT (src, "seeking: %" GST_SEGMENT_FORMAT, segment);
1469
1470   if (bclass->do_seek)
1471     result = bclass->do_seek (src, segment);
1472
1473   return result;
1474 }
1475
1476 #define SEEK_TYPE_IS_RELATIVE(t) (((t) != GST_SEEK_TYPE_NONE) && ((t) != GST_SEEK_TYPE_SET))
1477
1478 static gboolean
1479 gst_base_src_default_prepare_seek_segment (GstBaseSrc * src, GstEvent * event,
1480     GstSegment * segment)
1481 {
1482   /* By default, we try one of 2 things:
1483    *   - For absolute seek positions, convert the requested position to our
1484    *     configured processing format and place it in the output segment \
1485    *   - For relative seek positions, convert our current (input) values to the
1486    *     seek format, adjust by the relative seek offset and then convert back to
1487    *     the processing format
1488    */
1489   GstSeekType start_type, stop_type;
1490   gint64 start, stop;
1491   GstSeekFlags flags;
1492   GstFormat seek_format, dest_format;
1493   gdouble rate;
1494   gboolean update;
1495   gboolean res = TRUE;
1496
1497   gst_event_parse_seek (event, &rate, &seek_format, &flags,
1498       &start_type, &start, &stop_type, &stop);
1499   dest_format = segment->format;
1500
1501   if (seek_format == dest_format) {
1502     gst_segment_do_seek (segment, rate, seek_format, flags,
1503         start_type, start, stop_type, stop, &update);
1504     return TRUE;
1505   }
1506
1507   if (start_type != GST_SEEK_TYPE_NONE) {
1508     /* FIXME: Handle seek_end by converting the input segment vals */
1509     res =
1510         gst_pad_query_convert (src->srcpad, seek_format, start, dest_format,
1511         &start);
1512     start_type = GST_SEEK_TYPE_SET;
1513   }
1514
1515   if (res && stop_type != GST_SEEK_TYPE_NONE) {
1516     /* FIXME: Handle seek_end by converting the input segment vals */
1517     res =
1518         gst_pad_query_convert (src->srcpad, seek_format, stop, dest_format,
1519         &stop);
1520     stop_type = GST_SEEK_TYPE_SET;
1521   }
1522
1523   /* And finally, configure our output segment in the desired format */
1524   if (res) {
1525     res =
1526         gst_segment_do_seek (segment, rate, dest_format, flags, start_type,
1527         start, stop_type, stop, &update);
1528   }
1529
1530   if (!res)
1531     goto no_format;
1532
1533   return res;
1534
1535 no_format:
1536   {
1537     GST_DEBUG_OBJECT (src, "undefined format given, seek aborted.");
1538     return FALSE;
1539   }
1540 }
1541
1542 static gboolean
1543 gst_base_src_prepare_seek_segment (GstBaseSrc * src, GstEvent * event,
1544     GstSegment * seeksegment)
1545 {
1546   GstBaseSrcClass *bclass;
1547   gboolean result = FALSE;
1548
1549   bclass = GST_BASE_SRC_GET_CLASS (src);
1550
1551   if (bclass->prepare_seek_segment)
1552     result = bclass->prepare_seek_segment (src, event, seeksegment);
1553
1554   return result;
1555 }
1556
1557 static GstFlowReturn
1558 gst_base_src_default_alloc (GstBaseSrc * src, guint64 offset,
1559     guint size, GstBuffer ** buffer)
1560 {
1561   GstFlowReturn ret;
1562   GstBaseSrcPrivate *priv = src->priv;
1563   GstBufferPool *pool = NULL;
1564   GstAllocator *allocator = NULL;
1565   GstAllocationParams params;
1566
1567   GST_OBJECT_LOCK (src);
1568   if (priv->pool) {
1569     pool = gst_object_ref (priv->pool);
1570   } else if (priv->allocator) {
1571     allocator = gst_object_ref (priv->allocator);
1572   }
1573   params = priv->params;
1574   GST_OBJECT_UNLOCK (src);
1575
1576   if (pool) {
1577     ret = gst_buffer_pool_acquire_buffer (pool, buffer, NULL);
1578   } else if (size != -1) {
1579     *buffer = gst_buffer_new_allocate (allocator, size, &params);
1580     if (G_UNLIKELY (*buffer == NULL))
1581       goto alloc_failed;
1582
1583     ret = GST_FLOW_OK;
1584   } else {
1585     GST_WARNING_OBJECT (src, "Not trying to alloc %u bytes. Blocksize not set?",
1586         size);
1587     goto alloc_failed;
1588   }
1589
1590 done:
1591   if (pool)
1592     gst_object_unref (pool);
1593   if (allocator)
1594     gst_object_unref (allocator);
1595
1596   return ret;
1597
1598   /* ERRORS */
1599 alloc_failed:
1600   {
1601     GST_ERROR_OBJECT (src, "Failed to allocate %u bytes", size);
1602     ret = GST_FLOW_ERROR;
1603     goto done;
1604   }
1605 }
1606
1607 static GstFlowReturn
1608 gst_base_src_default_create (GstBaseSrc * src, guint64 offset,
1609     guint size, GstBuffer ** buffer)
1610 {
1611   GstBaseSrcClass *bclass;
1612   GstFlowReturn ret;
1613   GstBuffer *res_buf;
1614
1615   bclass = GST_BASE_SRC_GET_CLASS (src);
1616
1617   if (G_UNLIKELY (!bclass->alloc))
1618     goto no_function;
1619   if (G_UNLIKELY (!bclass->fill))
1620     goto no_function;
1621
1622   if (*buffer == NULL) {
1623     /* downstream did not provide us with a buffer to fill, allocate one
1624      * ourselves */
1625     ret = bclass->alloc (src, offset, size, &res_buf);
1626     if (G_UNLIKELY (ret != GST_FLOW_OK))
1627       goto alloc_failed;
1628   } else {
1629     res_buf = *buffer;
1630   }
1631
1632   if (G_LIKELY (size > 0)) {
1633     /* only call fill when there is a size */
1634     ret = bclass->fill (src, offset, size, res_buf);
1635     if (G_UNLIKELY (ret != GST_FLOW_OK))
1636       goto not_ok;
1637   }
1638
1639   *buffer = res_buf;
1640
1641   return GST_FLOW_OK;
1642
1643   /* ERRORS */
1644 no_function:
1645   {
1646     GST_DEBUG_OBJECT (src, "no fill or alloc function");
1647     return GST_FLOW_NOT_SUPPORTED;
1648   }
1649 alloc_failed:
1650   {
1651     GST_DEBUG_OBJECT (src, "Failed to allocate buffer of %u bytes", size);
1652     return ret;
1653   }
1654 not_ok:
1655   {
1656     GST_DEBUG_OBJECT (src, "fill returned %d (%s)", ret,
1657         gst_flow_get_name (ret));
1658     if (*buffer == NULL)
1659       gst_buffer_unref (res_buf);
1660     return ret;
1661   }
1662 }
1663
1664 /* this code implements the seeking. It is a good example
1665  * handling all cases.
1666  *
1667  * A seek updates the currently configured segment.start
1668  * and segment.stop values based on the SEEK_TYPE. If the
1669  * segment.start value is updated, a seek to this new position
1670  * should be performed.
1671  *
1672  * The seek can only be executed when we are not currently
1673  * streaming any data, to make sure that this is the case, we
1674  * acquire the STREAM_LOCK which is taken when we are in the
1675  * _loop() function or when a getrange() is called. Normally
1676  * we will not receive a seek if we are operating in pull mode
1677  * though. When we operate as a live source we might block on the live
1678  * cond, which does not release the STREAM_LOCK. Therefore we will try
1679  * to grab the LIVE_LOCK instead of the STREAM_LOCK to make sure it is
1680  * safe to perform the seek.
1681  *
1682  * When we are in the loop() function, we might be in the middle
1683  * of pushing a buffer, which might block in a sink. To make sure
1684  * that the push gets unblocked we push out a FLUSH_START event.
1685  * Our loop function will get a FLUSHING return value from
1686  * the push and will pause, effectively releasing the STREAM_LOCK.
1687  *
1688  * For a non-flushing seek, we pause the task, which might eventually
1689  * release the STREAM_LOCK. We say eventually because when the sink
1690  * blocks on the sample we might wait a very long time until the sink
1691  * unblocks the sample. In any case we acquire the STREAM_LOCK and
1692  * can continue the seek. A non-flushing seek is normally done in a
1693  * running pipeline to perform seamless playback, this means that the sink is
1694  * PLAYING and will return from its chain function.
1695  * In the case of a non-flushing seek we need to make sure that the
1696  * data we output after the seek is continuous with the previous data,
1697  * this is because a non-flushing seek does not reset the running-time
1698  * to 0. We do this by closing the currently running segment, ie. sending
1699  * a new_segment event with the stop position set to the last processed
1700  * position.
1701  *
1702  * After updating the segment.start/stop values, we prepare for
1703  * streaming again. We push out a FLUSH_STOP to make the peer pad
1704  * accept data again and we start our task again.
1705  *
1706  * A segment seek posts a message on the bus saying that the playback
1707  * of the segment started. We store the segment flag internally because
1708  * when we reach the segment.stop we have to post a segment.done
1709  * instead of EOS when doing a segment seek.
1710  */
1711 static gboolean
1712 gst_base_src_perform_seek (GstBaseSrc * src, GstEvent * event, gboolean unlock)
1713 {
1714   gboolean res = TRUE, tres;
1715   gdouble rate;
1716   GstFormat seek_format, dest_format;
1717   GstSeekFlags flags;
1718   GstSeekType start_type, stop_type;
1719   gint64 start, stop;
1720   gboolean flush;
1721   gboolean update;
1722   gboolean relative_seek = FALSE;
1723   gboolean seekseg_configured = FALSE;
1724   GstSegment seeksegment;
1725   guint32 seqnum;
1726   GstEvent *tevent;
1727
1728   GST_DEBUG_OBJECT (src, "doing seek: %" GST_PTR_FORMAT, event);
1729
1730   GST_OBJECT_LOCK (src);
1731   dest_format = src->segment.format;
1732   GST_OBJECT_UNLOCK (src);
1733
1734   if (event) {
1735     gst_event_parse_seek (event, &rate, &seek_format, &flags,
1736         &start_type, &start, &stop_type, &stop);
1737
1738     relative_seek = SEEK_TYPE_IS_RELATIVE (start_type) ||
1739         SEEK_TYPE_IS_RELATIVE (stop_type);
1740
1741     if (dest_format != seek_format && !relative_seek) {
1742       /* If we have an ABSOLUTE position (SEEK_SET only), we can convert it
1743        * here before taking the stream lock, otherwise we must convert it later,
1744        * once we have the stream lock and can read the last configures segment
1745        * start and stop positions */
1746       gst_segment_init (&seeksegment, dest_format);
1747
1748       if (!gst_base_src_prepare_seek_segment (src, event, &seeksegment))
1749         goto prepare_failed;
1750
1751       seekseg_configured = TRUE;
1752     }
1753
1754     flush = flags & GST_SEEK_FLAG_FLUSH;
1755     seqnum = gst_event_get_seqnum (event);
1756   } else {
1757     flush = FALSE;
1758     /* get next seqnum */
1759     seqnum = gst_util_seqnum_next ();
1760   }
1761
1762   /* send flush start */
1763   if (flush) {
1764     tevent = gst_event_new_flush_start ();
1765     gst_event_set_seqnum (tevent, seqnum);
1766     gst_pad_push_event (src->srcpad, tevent);
1767   } else
1768     gst_pad_pause_task (src->srcpad);
1769
1770   /* unblock streaming thread. */
1771   if (unlock)
1772     gst_base_src_set_flushing (src, TRUE);
1773
1774   /* grab streaming lock, this should eventually be possible, either
1775    * because the task is paused, our streaming thread stopped
1776    * or because our peer is flushing. */
1777   GST_PAD_STREAM_LOCK (src->srcpad);
1778   if (G_UNLIKELY (src->priv->seqnum == seqnum)) {
1779     /* we have seen this event before, issue a warning for now */
1780     GST_WARNING_OBJECT (src, "duplicate event found %" G_GUINT32_FORMAT,
1781         seqnum);
1782   } else {
1783     src->priv->seqnum = seqnum;
1784     GST_DEBUG_OBJECT (src, "seek with seqnum %" G_GUINT32_FORMAT, seqnum);
1785   }
1786
1787   if (unlock)
1788     gst_base_src_set_flushing (src, FALSE);
1789
1790   /* If we configured the seeksegment above, don't overwrite it now. Otherwise
1791    * copy the current segment info into the temp segment that we can actually
1792    * attempt the seek with. We only update the real segment if the seek succeeds. */
1793   if (!seekseg_configured) {
1794     memcpy (&seeksegment, &src->segment, sizeof (GstSegment));
1795
1796     /* now configure the final seek segment */
1797     if (event) {
1798       if (seeksegment.format != seek_format) {
1799         /* OK, here's where we give the subclass a chance to convert the relative
1800          * seek into an absolute one in the processing format. We set up any
1801          * absolute seek above, before taking the stream lock. */
1802         if (!gst_base_src_prepare_seek_segment (src, event, &seeksegment)) {
1803           GST_DEBUG_OBJECT (src, "Preparing the seek failed after flushing. "
1804               "Aborting seek");
1805           res = FALSE;
1806         }
1807       } else {
1808         /* The seek format matches our processing format, no need to ask the
1809          * the subclass to configure the segment. */
1810         gst_segment_do_seek (&seeksegment, rate, seek_format, flags,
1811             start_type, start, stop_type, stop, &update);
1812       }
1813     }
1814     /* Else, no seek event passed, so we're just (re)starting the
1815        current segment. */
1816   }
1817
1818   if (res) {
1819     GST_DEBUG_OBJECT (src, "segment configured from %" G_GINT64_FORMAT
1820         " to %" G_GINT64_FORMAT ", position %" G_GINT64_FORMAT,
1821         seeksegment.start, seeksegment.stop, seeksegment.position);
1822
1823     /* do the seek, segment.position contains the new position. */
1824     res = gst_base_src_do_seek (src, &seeksegment);
1825   }
1826
1827   /* and prepare to continue streaming */
1828   if (flush) {
1829     tevent = gst_event_new_flush_stop (TRUE);
1830     gst_event_set_seqnum (tevent, seqnum);
1831     /* send flush stop, peer will accept data and events again. We
1832      * are not yet providing data as we still have the STREAM_LOCK. */
1833     gst_pad_push_event (src->srcpad, tevent);
1834   }
1835
1836   /* The subclass must have converted the segment to the processing format
1837    * by now */
1838   if (res && seeksegment.format != dest_format) {
1839     GST_DEBUG_OBJECT (src, "Subclass failed to prepare a seek segment "
1840         "in the correct format. Aborting seek.");
1841     res = FALSE;
1842   }
1843
1844   /* if the seek was successful, we update our real segment and push
1845    * out the new segment. */
1846   if (res) {
1847     GST_OBJECT_LOCK (src);
1848     memcpy (&src->segment, &seeksegment, sizeof (GstSegment));
1849     GST_OBJECT_UNLOCK (src);
1850
1851     if (seeksegment.flags & GST_SEGMENT_FLAG_SEGMENT) {
1852       GstMessage *message;
1853
1854       message = gst_message_new_segment_start (GST_OBJECT (src),
1855           seeksegment.format, seeksegment.position);
1856       gst_message_set_seqnum (message, seqnum);
1857
1858       gst_element_post_message (GST_ELEMENT (src), message);
1859     }
1860
1861     src->priv->segment_pending = TRUE;
1862     src->priv->segment_seqnum = seqnum;
1863   }
1864
1865   src->priv->discont = TRUE;
1866   src->running = TRUE;
1867   /* and restart the task in case it got paused explicitly or by
1868    * the FLUSH_START event we pushed out. */
1869   tres = gst_pad_start_task (src->srcpad, (GstTaskFunction) gst_base_src_loop,
1870       src->srcpad, NULL);
1871   if (res && !tres)
1872     res = FALSE;
1873
1874   /* and release the lock again so we can continue streaming */
1875   GST_PAD_STREAM_UNLOCK (src->srcpad);
1876
1877   return res;
1878
1879   /* ERROR */
1880 prepare_failed:
1881   GST_DEBUG_OBJECT (src, "Preparing the seek failed before flushing. "
1882       "Aborting seek");
1883   return FALSE;
1884 }
1885
1886 /* all events send to this element directly. This is mainly done from the
1887  * application.
1888  */
1889 static gboolean
1890 gst_base_src_send_event (GstElement * element, GstEvent * event)
1891 {
1892   GstBaseSrc *src;
1893   gboolean result = FALSE;
1894   GstBaseSrcClass *bclass;
1895
1896   src = GST_BASE_SRC (element);
1897   bclass = GST_BASE_SRC_GET_CLASS (src);
1898
1899   GST_DEBUG_OBJECT (src, "handling event %p %" GST_PTR_FORMAT, event, event);
1900
1901   switch (GST_EVENT_TYPE (event)) {
1902       /* bidirectional events */
1903     case GST_EVENT_FLUSH_START:
1904       GST_DEBUG_OBJECT (src, "pushing flush-start event downstream");
1905
1906       result = gst_pad_push_event (src->srcpad, event);
1907       gst_base_src_set_flushing (src, TRUE);
1908       event = NULL;
1909       break;
1910     case GST_EVENT_FLUSH_STOP:
1911     {
1912       gboolean start;
1913
1914       GST_PAD_STREAM_LOCK (src->srcpad);
1915       gst_base_src_set_flushing (src, FALSE);
1916
1917       GST_DEBUG_OBJECT (src, "pushing flush-stop event downstream");
1918       result = gst_pad_push_event (src->srcpad, event);
1919
1920       /* For external flush, restart the task .. */
1921       GST_LIVE_LOCK (src);
1922       src->priv->segment_pending = TRUE;
1923
1924       GST_OBJECT_LOCK (src->srcpad);
1925       start = (GST_PAD_MODE (src->srcpad) == GST_PAD_MODE_PUSH);
1926       GST_OBJECT_UNLOCK (src->srcpad);
1927
1928       /* ... and for live sources, only if in playing state */
1929       if (src->is_live) {
1930         if (!src->live_running)
1931           start = FALSE;
1932       }
1933
1934       if (start)
1935         gst_pad_start_task (src->srcpad, (GstTaskFunction) gst_base_src_loop,
1936             src->srcpad, NULL);
1937
1938       GST_LIVE_UNLOCK (src);
1939       GST_PAD_STREAM_UNLOCK (src->srcpad);
1940
1941       event = NULL;
1942       break;
1943     }
1944
1945       /* downstream serialized events */
1946     case GST_EVENT_EOS:
1947     {
1948       gboolean push_mode;
1949
1950       /* queue EOS and make sure the task or pull function performs the EOS
1951        * actions.
1952        *
1953        * For push mode, This will be done in 3 steps. It is required to not
1954        * block here as gst_element_send_event() will hold the STATE_LOCK, hence
1955        * blocking would prevent asynchronous state change to complete.
1956        *
1957        * 1. We stop the streaming thread
1958        * 2. We set the pending eos
1959        * 3. We start the streaming thread again, so it is performed
1960        *    asynchronously.
1961        *
1962        * For pull mode, we simply mark the pending EOS without flushing.
1963        */
1964
1965       GST_OBJECT_LOCK (src->srcpad);
1966       push_mode = GST_PAD_MODE (src->srcpad) == GST_PAD_MODE_PUSH;
1967       GST_OBJECT_UNLOCK (src->srcpad);
1968
1969       if (push_mode) {
1970         gst_base_src_set_flushing (src, TRUE);
1971
1972         GST_PAD_STREAM_LOCK (src->srcpad);
1973         gst_base_src_set_flushing (src, FALSE);
1974
1975         GST_OBJECT_LOCK (src);
1976         g_atomic_int_set (&src->priv->has_pending_eos, TRUE);
1977         if (src->priv->pending_eos)
1978           gst_event_unref (src->priv->pending_eos);
1979         src->priv->pending_eos = event;
1980         GST_OBJECT_UNLOCK (src);
1981
1982         GST_DEBUG_OBJECT (src,
1983             "EOS marked, start task for asynchronous handling");
1984         gst_pad_start_task (src->srcpad, (GstTaskFunction) gst_base_src_loop,
1985             src->srcpad, NULL);
1986
1987         GST_PAD_STREAM_UNLOCK (src->srcpad);
1988       } else {
1989         /* In pull mode, we need not to return flushing to downstream, though
1990          * the stream lock is not kept after getrange was unblocked */
1991         GST_OBJECT_LOCK (src);
1992         g_atomic_int_set (&src->priv->has_pending_eos, TRUE);
1993         if (src->priv->pending_eos)
1994           gst_event_unref (src->priv->pending_eos);
1995         src->priv->pending_eos = event;
1996         GST_OBJECT_UNLOCK (src);
1997
1998         gst_base_src_set_pool_flushing (src, TRUE);
1999         if (bclass->unlock)
2000           bclass->unlock (src);
2001
2002         GST_PAD_STREAM_LOCK (src->srcpad);
2003         if (bclass->unlock_stop)
2004           bclass->unlock_stop (src);
2005         gst_base_src_set_pool_flushing (src, TRUE);
2006         GST_PAD_STREAM_UNLOCK (src->srcpad);
2007       }
2008
2009
2010       event = NULL;
2011       result = TRUE;
2012       break;
2013     }
2014     case GST_EVENT_SEGMENT:
2015       /* sending random SEGMENT downstream can break sync. */
2016       break;
2017     case GST_EVENT_TAG:
2018     case GST_EVENT_SINK_MESSAGE:
2019     case GST_EVENT_CUSTOM_DOWNSTREAM:
2020     case GST_EVENT_CUSTOM_BOTH:
2021     case GST_EVENT_PROTECTION:
2022       /* Insert TAG, CUSTOM_DOWNSTREAM, CUSTOM_BOTH, PROTECTION in the dataflow */
2023       GST_OBJECT_LOCK (src);
2024       src->priv->pending_events =
2025           g_list_append (src->priv->pending_events, event);
2026       g_atomic_int_set (&src->priv->have_events, TRUE);
2027       GST_OBJECT_UNLOCK (src);
2028       event = NULL;
2029       result = TRUE;
2030       break;
2031     case GST_EVENT_BUFFERSIZE:
2032       /* does not seem to make much sense currently */
2033       break;
2034
2035       /* upstream events */
2036     case GST_EVENT_QOS:
2037       /* elements should override send_event and do something */
2038       break;
2039     case GST_EVENT_SEEK:
2040     {
2041       gboolean started;
2042
2043       GST_OBJECT_LOCK (src->srcpad);
2044       if (GST_PAD_MODE (src->srcpad) == GST_PAD_MODE_PULL)
2045         goto wrong_mode;
2046       started = GST_PAD_MODE (src->srcpad) == GST_PAD_MODE_PUSH;
2047       GST_OBJECT_UNLOCK (src->srcpad);
2048
2049       if (started) {
2050         GST_DEBUG_OBJECT (src, "performing seek");
2051         /* when we are running in push mode, we can execute the
2052          * seek right now. */
2053         result = gst_base_src_perform_seek (src, event, TRUE);
2054       } else {
2055         GstEvent **event_p;
2056
2057         /* else we store the event and execute the seek when we
2058          * get activated */
2059         GST_OBJECT_LOCK (src);
2060         GST_DEBUG_OBJECT (src, "queueing seek");
2061         event_p = &src->pending_seek;
2062         gst_event_replace ((GstEvent **) event_p, event);
2063         GST_OBJECT_UNLOCK (src);
2064         /* assume the seek will work */
2065         result = TRUE;
2066       }
2067       break;
2068     }
2069     case GST_EVENT_NAVIGATION:
2070       /* could make sense for elements that do something with navigation events
2071        * but then they would need to override the send_event function */
2072       break;
2073     case GST_EVENT_LATENCY:
2074       /* does not seem to make sense currently */
2075       break;
2076
2077       /* custom events */
2078     case GST_EVENT_CUSTOM_UPSTREAM:
2079       /* override send_event if you want this */
2080       break;
2081     case GST_EVENT_CUSTOM_DOWNSTREAM_OOB:
2082     case GST_EVENT_CUSTOM_BOTH_OOB:
2083       /* insert a random custom event into the pipeline */
2084       GST_DEBUG_OBJECT (src, "pushing custom OOB event downstream");
2085       result = gst_pad_push_event (src->srcpad, event);
2086       /* we gave away the ref to the event in the push */
2087       event = NULL;
2088       break;
2089     default:
2090       break;
2091   }
2092 done:
2093   /* if we still have a ref to the event, unref it now */
2094   if (event)
2095     gst_event_unref (event);
2096
2097   return result;
2098
2099   /* ERRORS */
2100 wrong_mode:
2101   {
2102     GST_DEBUG_OBJECT (src, "cannot perform seek when operating in pull mode");
2103     GST_OBJECT_UNLOCK (src->srcpad);
2104     result = FALSE;
2105     goto done;
2106   }
2107 }
2108
2109 static gboolean
2110 gst_base_src_seekable (GstBaseSrc * src)
2111 {
2112   GstBaseSrcClass *bclass;
2113   bclass = GST_BASE_SRC_GET_CLASS (src);
2114   if (bclass->is_seekable)
2115     return bclass->is_seekable (src);
2116   else
2117     return FALSE;
2118 }
2119
2120 static void
2121 gst_base_src_update_qos (GstBaseSrc * src,
2122     gdouble proportion, GstClockTimeDiff diff, GstClockTime timestamp)
2123 {
2124   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, src,
2125       "qos: proportion: %lf, diff %" G_GINT64_FORMAT ", timestamp %"
2126       GST_TIME_FORMAT, proportion, diff, GST_TIME_ARGS (timestamp));
2127
2128   GST_OBJECT_LOCK (src);
2129   src->priv->proportion = proportion;
2130   src->priv->earliest_time = timestamp + diff;
2131   GST_OBJECT_UNLOCK (src);
2132 }
2133
2134
2135 static gboolean
2136 gst_base_src_default_event (GstBaseSrc * src, GstEvent * event)
2137 {
2138   gboolean result;
2139
2140   GST_DEBUG_OBJECT (src, "handle event %" GST_PTR_FORMAT, event);
2141
2142   switch (GST_EVENT_TYPE (event)) {
2143     case GST_EVENT_SEEK:
2144       /* is normally called when in push mode */
2145       if (!gst_base_src_seekable (src))
2146         goto not_seekable;
2147
2148       result = gst_base_src_perform_seek (src, event, TRUE);
2149       break;
2150     case GST_EVENT_FLUSH_START:
2151       /* cancel any blocking getrange, is normally called
2152        * when in pull mode. */
2153       result = gst_base_src_set_flushing (src, TRUE);
2154       break;
2155     case GST_EVENT_FLUSH_STOP:
2156       result = gst_base_src_set_flushing (src, FALSE);
2157       break;
2158     case GST_EVENT_QOS:
2159     {
2160       gdouble proportion;
2161       GstClockTimeDiff diff;
2162       GstClockTime timestamp;
2163
2164       gst_event_parse_qos (event, NULL, &proportion, &diff, &timestamp);
2165       gst_base_src_update_qos (src, proportion, diff, timestamp);
2166       result = TRUE;
2167       break;
2168     }
2169     case GST_EVENT_RECONFIGURE:
2170       result = TRUE;
2171       break;
2172     case GST_EVENT_LATENCY:
2173       result = TRUE;
2174       break;
2175     default:
2176       result = FALSE;
2177       break;
2178   }
2179   return result;
2180
2181   /* ERRORS */
2182 not_seekable:
2183   {
2184     GST_DEBUG_OBJECT (src, "is not seekable");
2185     return FALSE;
2186   }
2187 }
2188
2189 static gboolean
2190 gst_base_src_event (GstPad * pad, GstObject * parent, GstEvent * event)
2191 {
2192   GstBaseSrc *src;
2193   GstBaseSrcClass *bclass;
2194   gboolean result = FALSE;
2195
2196   src = GST_BASE_SRC (parent);
2197   bclass = GST_BASE_SRC_GET_CLASS (src);
2198
2199   if (bclass->event) {
2200     if (!(result = bclass->event (src, event)))
2201       goto subclass_failed;
2202   }
2203
2204 done:
2205   gst_event_unref (event);
2206
2207   return result;
2208
2209   /* ERRORS */
2210 subclass_failed:
2211   {
2212     GST_DEBUG_OBJECT (src, "subclass refused event");
2213     goto done;
2214   }
2215 }
2216
2217 static void
2218 gst_base_src_set_property (GObject * object, guint prop_id,
2219     const GValue * value, GParamSpec * pspec)
2220 {
2221   GstBaseSrc *src;
2222
2223   src = GST_BASE_SRC (object);
2224
2225   switch (prop_id) {
2226     case PROP_BLOCKSIZE:
2227       gst_base_src_set_blocksize (src, g_value_get_uint (value));
2228       break;
2229     case PROP_NUM_BUFFERS:
2230       src->num_buffers = g_value_get_int (value);
2231       break;
2232 #ifndef GST_REMOVE_DEPRECATED
2233     case PROP_TYPEFIND:
2234       src->typefind = g_value_get_boolean (value);
2235       break;
2236 #endif
2237     case PROP_DO_TIMESTAMP:
2238       gst_base_src_set_do_timestamp (src, g_value_get_boolean (value));
2239       break;
2240 #ifdef TIZEN_PROFILE_TV
2241     case PROP_UPDATE_SEGMENT:
2242       gst_base_src_update_segment (src, g_value_get_int64 (value));
2243       break;
2244 #endif
2245     default:
2246       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2247       break;
2248   }
2249 }
2250
2251 static void
2252 gst_base_src_get_property (GObject * object, guint prop_id, GValue * value,
2253     GParamSpec * pspec)
2254 {
2255   GstBaseSrc *src;
2256
2257   src = GST_BASE_SRC (object);
2258
2259   switch (prop_id) {
2260     case PROP_BLOCKSIZE:
2261       g_value_set_uint (value, gst_base_src_get_blocksize (src));
2262       break;
2263     case PROP_NUM_BUFFERS:
2264       g_value_set_int (value, src->num_buffers);
2265       break;
2266 #ifndef GST_REMOVE_DEPRECATED
2267     case PROP_TYPEFIND:
2268       g_value_set_boolean (value, src->typefind);
2269       break;
2270 #endif
2271     case PROP_DO_TIMESTAMP:
2272       g_value_set_boolean (value, gst_base_src_get_do_timestamp (src));
2273       break;
2274     default:
2275       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
2276       break;
2277   }
2278 }
2279
2280 /* with STREAM_LOCK and LOCK */
2281 static GstClockReturn
2282 gst_base_src_wait (GstBaseSrc * basesrc, GstClock * clock, GstClockTime time)
2283 {
2284   GstClockReturn ret;
2285   GstClockID id;
2286
2287   id = gst_clock_new_single_shot_id (clock, time);
2288
2289   basesrc->clock_id = id;
2290   /* release the live lock while waiting */
2291   GST_LIVE_UNLOCK (basesrc);
2292
2293   ret = gst_clock_id_wait (id, NULL);
2294
2295   GST_LIVE_LOCK (basesrc);
2296   gst_clock_id_unref (id);
2297   basesrc->clock_id = NULL;
2298
2299   return ret;
2300 }
2301
2302 /* perform synchronisation on a buffer.
2303  * with STREAM_LOCK.
2304  */
2305 static GstClockReturn
2306 gst_base_src_do_sync (GstBaseSrc * basesrc, GstBuffer * buffer)
2307 {
2308   GstClockReturn result;
2309   GstClockTime start, end;
2310   GstBaseSrcClass *bclass;
2311   GstClockTime base_time;
2312   GstClock *clock;
2313   GstClockTime now = GST_CLOCK_TIME_NONE, pts, dts, timestamp;
2314   gboolean do_timestamp, first, pseudo_live, is_live;
2315
2316   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
2317
2318   start = end = -1;
2319   if (bclass->get_times)
2320     bclass->get_times (basesrc, buffer, &start, &end);
2321
2322   /* get buffer timestamp */
2323   dts = GST_BUFFER_DTS (buffer);
2324   pts = GST_BUFFER_PTS (buffer);
2325
2326   if (GST_CLOCK_TIME_IS_VALID (dts))
2327     timestamp = dts;
2328   else
2329     timestamp = pts;
2330
2331   /* grab the lock to prepare for clocking and calculate the startup
2332    * latency. */
2333   GST_OBJECT_LOCK (basesrc);
2334
2335   is_live = basesrc->is_live;
2336   /* if we are asked to sync against the clock we are a pseudo live element */
2337   pseudo_live = (start != -1 && is_live);
2338   /* check for the first buffer */
2339   first = (basesrc->priv->latency == -1);
2340
2341   if (timestamp != -1 && pseudo_live) {
2342     GstClockTime latency;
2343
2344     /* we have a timestamp and a sync time, latency is the diff */
2345     if (timestamp <= start)
2346       latency = start - timestamp;
2347     else
2348       latency = 0;
2349
2350     if (first) {
2351       GST_DEBUG_OBJECT (basesrc, "pseudo_live with latency %" GST_TIME_FORMAT,
2352           GST_TIME_ARGS (latency));
2353       /* first time we calculate latency, just configure */
2354       basesrc->priv->latency = latency;
2355     } else {
2356       if (basesrc->priv->latency != latency) {
2357         /* we have a new latency, FIXME post latency message */
2358         basesrc->priv->latency = latency;
2359         GST_DEBUG_OBJECT (basesrc, "latency changed to %" GST_TIME_FORMAT,
2360             GST_TIME_ARGS (latency));
2361       }
2362     }
2363   } else if (first) {
2364     GST_DEBUG_OBJECT (basesrc, "no latency needed, live %d, sync %d",
2365         is_live, start != -1);
2366     basesrc->priv->latency = 0;
2367   }
2368
2369   /* get clock, if no clock, we can't sync or do timestamps */
2370   if ((clock = GST_ELEMENT_CLOCK (basesrc)) == NULL)
2371     goto no_clock;
2372   else
2373     gst_object_ref (clock);
2374
2375   base_time = GST_ELEMENT_CAST (basesrc)->base_time;
2376
2377   do_timestamp = basesrc->priv->do_timestamp;
2378   GST_OBJECT_UNLOCK (basesrc);
2379
2380   /* first buffer, calculate the timestamp offset */
2381   if (first) {
2382     GstClockTime running_time;
2383
2384     now = gst_clock_get_time (clock);
2385     running_time = now - base_time;
2386
2387     GST_LOG_OBJECT (basesrc,
2388         "startup PTS: %" GST_TIME_FORMAT ", DTS %" GST_TIME_FORMAT
2389         ", running_time %" GST_TIME_FORMAT, GST_TIME_ARGS (pts),
2390         GST_TIME_ARGS (dts), GST_TIME_ARGS (running_time));
2391
2392     if (pseudo_live && timestamp != -1) {
2393       /* live source and we need to sync, add startup latency to all timestamps
2394        * to get the real running_time. Live sources should always timestamp
2395        * according to the current running time. */
2396       basesrc->priv->ts_offset = GST_CLOCK_DIFF (timestamp, running_time);
2397
2398       GST_LOG_OBJECT (basesrc, "live with sync, ts_offset %" GST_TIME_FORMAT,
2399           GST_TIME_ARGS (basesrc->priv->ts_offset));
2400     } else {
2401       basesrc->priv->ts_offset = 0;
2402       GST_LOG_OBJECT (basesrc, "no timestamp offset needed");
2403     }
2404
2405     if (!GST_CLOCK_TIME_IS_VALID (dts)) {
2406       if (do_timestamp) {
2407         dts = running_time;
2408       } else if (!GST_CLOCK_TIME_IS_VALID (pts)) {
2409         if (GST_CLOCK_TIME_IS_VALID (basesrc->segment.start)) {
2410           dts = basesrc->segment.start;
2411         } else {
2412           dts = 0;
2413         }
2414       }
2415       GST_BUFFER_DTS (buffer) = dts;
2416
2417       GST_LOG_OBJECT (basesrc, "created DTS %" GST_TIME_FORMAT,
2418           GST_TIME_ARGS (dts));
2419     }
2420   } else {
2421     /* not the first buffer, the timestamp is the diff between the clock and
2422      * base_time */
2423     if (do_timestamp && !GST_CLOCK_TIME_IS_VALID (dts)) {
2424       now = gst_clock_get_time (clock);
2425
2426       dts = now - base_time;
2427       GST_BUFFER_DTS (buffer) = dts;
2428
2429       GST_LOG_OBJECT (basesrc, "created DTS %" GST_TIME_FORMAT,
2430           GST_TIME_ARGS (dts));
2431     }
2432   }
2433   if (!GST_CLOCK_TIME_IS_VALID (pts)) {
2434     if (!GST_BUFFER_FLAG_IS_SET (buffer, GST_BUFFER_FLAG_DELTA_UNIT))
2435       pts = dts;
2436
2437     GST_BUFFER_PTS (buffer) = dts;
2438
2439     GST_LOG_OBJECT (basesrc, "created PTS %" GST_TIME_FORMAT,
2440         GST_TIME_ARGS (pts));
2441   }
2442
2443   /* if we don't have a buffer timestamp, we don't sync */
2444   if (!GST_CLOCK_TIME_IS_VALID (start))
2445     goto no_sync;
2446
2447   if (is_live) {
2448     /* for pseudo live sources, add our ts_offset to the timestamp */
2449     if (GST_CLOCK_TIME_IS_VALID (pts))
2450       GST_BUFFER_PTS (buffer) += basesrc->priv->ts_offset;
2451     if (GST_CLOCK_TIME_IS_VALID (dts))
2452       GST_BUFFER_DTS (buffer) += basesrc->priv->ts_offset;
2453     start += basesrc->priv->ts_offset;
2454   }
2455
2456   GST_LOG_OBJECT (basesrc,
2457       "waiting for clock, base time %" GST_TIME_FORMAT
2458       ", stream_start %" GST_TIME_FORMAT,
2459       GST_TIME_ARGS (base_time), GST_TIME_ARGS (start));
2460
2461   result = gst_base_src_wait (basesrc, clock, start + base_time);
2462
2463   gst_object_unref (clock);
2464
2465   GST_LOG_OBJECT (basesrc, "clock entry done: %d", result);
2466
2467   return result;
2468
2469   /* special cases */
2470 no_clock:
2471   {
2472     GST_DEBUG_OBJECT (basesrc, "we have no clock");
2473     GST_OBJECT_UNLOCK (basesrc);
2474     return GST_CLOCK_OK;
2475   }
2476 no_sync:
2477   {
2478     GST_DEBUG_OBJECT (basesrc, "no sync needed");
2479     gst_object_unref (clock);
2480     return GST_CLOCK_OK;
2481   }
2482 }
2483
2484 /* Called with STREAM_LOCK and LIVE_LOCK */
2485 static gboolean
2486 gst_base_src_update_length (GstBaseSrc * src, guint64 offset, guint * length,
2487     gboolean force)
2488 {
2489   guint64 size, maxsize;
2490   GstBaseSrcClass *bclass;
2491   gint64 stop;
2492
2493   /* only operate if we are working with bytes */
2494   if (src->segment.format != GST_FORMAT_BYTES)
2495     return TRUE;
2496
2497   bclass = GST_BASE_SRC_GET_CLASS (src);
2498
2499   stop = src->segment.stop;
2500   /* get total file size */
2501   size = src->segment.duration;
2502
2503   /* when not doing automatic EOS, just use the stop position. We don't use
2504    * the size to check for EOS */
2505   if (!g_atomic_int_get (&src->priv->automatic_eos))
2506     maxsize = stop;
2507   /* Otherwise, the max amount of bytes to read is the total
2508    * size or up to the segment.stop if present. */
2509   else if (stop != -1)
2510     maxsize = size != -1 ? MIN (size, stop) : stop;
2511   else
2512     maxsize = size;
2513
2514   GST_DEBUG_OBJECT (src,
2515       "reading offset %" G_GUINT64_FORMAT ", length %u, size %" G_GINT64_FORMAT
2516       ", segment.stop %" G_GINT64_FORMAT ", maxsize %" G_GINT64_FORMAT, offset,
2517       *length, size, stop, maxsize);
2518
2519   /* check size if we have one */
2520   if (maxsize != -1) {
2521     /* if we run past the end, check if the file became bigger and
2522      * retry.  Mind wrap when checking. */
2523     if (G_UNLIKELY (offset >= maxsize || offset + *length >= maxsize || force)) {
2524       /* see if length of the file changed */
2525       if (bclass->get_size)
2526         if (!bclass->get_size (src, &size))
2527           size = -1;
2528
2529       /* when not doing automatic EOS, just use the stop position. We don't use
2530        * the size to check for EOS */
2531       if (!g_atomic_int_get (&src->priv->automatic_eos))
2532         maxsize = stop;
2533       /* Otherwise, the max amount of bytes to read is the total
2534        * size or up to the segment.stop if present. */
2535       else if (stop != -1)
2536         maxsize = size != -1 ? MIN (size, stop) : stop;
2537       else
2538         maxsize = size;
2539
2540       if (maxsize != -1) {
2541         /* if we are at or past the end, EOS */
2542         if (G_UNLIKELY (offset >= maxsize))
2543           goto unexpected_length;
2544
2545         /* else we can clip to the end */
2546         if (G_UNLIKELY (offset + *length >= maxsize))
2547           *length = maxsize - offset;
2548       }
2549     }
2550   }
2551
2552   /* keep track of current duration. segment is in bytes, we checked
2553    * that above. */
2554   GST_OBJECT_LOCK (src);
2555   src->segment.duration = size;
2556   GST_OBJECT_UNLOCK (src);
2557
2558   return TRUE;
2559
2560   /* ERRORS */
2561 unexpected_length:
2562   {
2563     GST_DEBUG_OBJECT (src, "processing at or past EOS");
2564     return FALSE;
2565   }
2566 }
2567
2568 /* must be called with LIVE_LOCK */
2569 static GstFlowReturn
2570 gst_base_src_get_range (GstBaseSrc * src, guint64 offset, guint length,
2571     GstBuffer ** buf)
2572 {
2573   GstFlowReturn ret;
2574   GstBaseSrcClass *bclass;
2575   GstClockReturn status;
2576   GstBuffer *res_buf;
2577   GstBuffer *in_buf;
2578   gboolean own_res_buf;
2579
2580   bclass = GST_BASE_SRC_GET_CLASS (src);
2581
2582 again:
2583   if (src->is_live) {
2584     if (G_UNLIKELY (!src->live_running)) {
2585       ret = gst_base_src_wait_playing_unlocked (src);
2586       if (ret != GST_FLOW_OK)
2587         goto stopped;
2588     }
2589   }
2590
2591   if (G_UNLIKELY (!GST_BASE_SRC_IS_STARTED (src)
2592           && !GST_BASE_SRC_IS_STARTING (src)))
2593     goto not_started;
2594
2595   if (G_UNLIKELY (!bclass->create))
2596     goto no_function;
2597
2598   if (G_UNLIKELY (!gst_base_src_update_length (src, offset, &length, FALSE)))
2599     goto unexpected_length;
2600
2601   /* track position */
2602   GST_OBJECT_LOCK (src);
2603   if (src->segment.format == GST_FORMAT_BYTES)
2604     src->segment.position = offset;
2605   GST_OBJECT_UNLOCK (src);
2606
2607   /* normally we don't count buffers */
2608   if (G_UNLIKELY (src->num_buffers_left >= 0)) {
2609     if (src->num_buffers_left == 0)
2610       goto reached_num_buffers;
2611     else
2612       src->num_buffers_left--;
2613   }
2614
2615   /* don't enter the create function if a pending EOS event was set. For the
2616    * logic of the has_pending_eos, check the event function of this class. */
2617   if (G_UNLIKELY (g_atomic_int_get (&src->priv->has_pending_eos))) {
2618     src->priv->forced_eos = TRUE;
2619     goto eos;
2620   }
2621
2622   GST_DEBUG_OBJECT (src,
2623       "calling create offset %" G_GUINT64_FORMAT " length %u, time %"
2624       G_GINT64_FORMAT, offset, length, src->segment.time);
2625
2626   res_buf = in_buf = *buf;
2627   own_res_buf = (*buf == NULL);
2628
2629   GST_LIVE_UNLOCK (src);
2630   ret = bclass->create (src, offset, length, &res_buf);
2631   GST_LIVE_LOCK (src);
2632
2633   /* As we released the LIVE_LOCK, the state may have changed */
2634   if (src->is_live) {
2635     if (G_UNLIKELY (!src->live_running)) {
2636       GstFlowReturn wait_ret;
2637       wait_ret = gst_base_src_wait_playing_unlocked (src);
2638       if (wait_ret != GST_FLOW_OK) {
2639         if (ret == GST_FLOW_OK && own_res_buf)
2640           gst_buffer_unref (res_buf);
2641         ret = wait_ret;
2642         goto stopped;
2643       }
2644     }
2645   }
2646
2647   /* The create function could be unlocked because we have a pending EOS. It's
2648    * possible that we have a valid buffer from create that we need to
2649    * discard when the create function returned _OK. */
2650   if (G_UNLIKELY (g_atomic_int_get (&src->priv->has_pending_eos))) {
2651     if (ret == GST_FLOW_OK) {
2652 #ifdef TIZEN_FEATURE_TRUSTZONE
2653       if (own_res_buf) {
2654         /*tzappsrc patch : release handle when unref res_buf*/
2655         if(bclass->tz_src_release_handle)
2656         {
2657           GST_INFO_OBJECT (src, "tzappsrc release the handle");
2658           bclass->tz_src_release_handle(src,res_buf);
2659         }
2660         gst_buffer_unref (res_buf);
2661       }
2662 #else
2663       if (own_res_buf)
2664         gst_buffer_unref (res_buf);
2665 #endif
2666     }
2667     src->priv->forced_eos = TRUE;
2668     goto eos;
2669   }
2670
2671   if (G_UNLIKELY (ret != GST_FLOW_OK))
2672     goto not_ok;
2673
2674   /* fallback in case the create function didn't fill a provided buffer */
2675   if (in_buf != NULL && res_buf != in_buf) {
2676     GstMapInfo info;
2677     gsize copied_size;
2678
2679     GST_CAT_DEBUG_OBJECT (GST_CAT_PERFORMANCE, src, "create function didn't "
2680         "fill the provided buffer, copying");
2681
2682     if (!gst_buffer_map (in_buf, &info, GST_MAP_WRITE))
2683       goto map_failed;
2684
2685     copied_size = gst_buffer_extract (res_buf, 0, info.data, info.size);
2686     gst_buffer_unmap (in_buf, &info);
2687     gst_buffer_set_size (in_buf, copied_size);
2688
2689     gst_buffer_copy_into (in_buf, res_buf, GST_BUFFER_COPY_METADATA, 0, -1);
2690
2691     gst_buffer_unref (res_buf);
2692     res_buf = in_buf;
2693   }
2694
2695   if (res_buf == NULL) {
2696     GstBufferList *pending_list = src->priv->pending_bufferlist;
2697
2698     if (pending_list == NULL || gst_buffer_list_length (pending_list) == 0)
2699       goto null_buffer;
2700
2701     res_buf = gst_buffer_list_get_writable (pending_list, 0);
2702     own_res_buf = FALSE;
2703   }
2704
2705   /* no timestamp set and we are at offset 0, we can timestamp with 0 */
2706   if (offset == 0 && src->segment.time == 0
2707       && GST_BUFFER_DTS (res_buf) == -1 && !src->is_live) {
2708     GST_DEBUG_OBJECT (src, "setting first timestamp to 0");
2709     res_buf = gst_buffer_make_writable (res_buf);
2710     GST_BUFFER_DTS (res_buf) = 0;
2711   }
2712
2713   /* now sync before pushing the buffer */
2714   status = gst_base_src_do_sync (src, res_buf);
2715
2716   /* waiting for the clock could have made us flushing */
2717   if (G_UNLIKELY (src->priv->flushing))
2718     goto flushing;
2719
2720   switch (status) {
2721     case GST_CLOCK_EARLY:
2722       /* the buffer is too late. We currently don't drop the buffer. */
2723       GST_DEBUG_OBJECT (src, "buffer too late!, returning anyway");
2724       break;
2725     case GST_CLOCK_OK:
2726       /* buffer synchronised properly */
2727       GST_DEBUG_OBJECT (src, "buffer ok");
2728       break;
2729     case GST_CLOCK_UNSCHEDULED:
2730       /* this case is triggered when we were waiting for the clock and
2731        * it got unlocked because we did a state change. In any case, get rid of
2732        * the buffer. */
2733 #ifdef TIZEN_FEATURE_TRUSTZONE
2734       if (own_res_buf) {
2735
2736         /*tzappsrc patch : release handle when unref res_buf*/
2737         GstBaseSrcClass *klass = GST_BASE_SRC_GET_CLASS(src);
2738         if(klass->tz_src_release_handle)
2739         {
2740           GST_INFO_OBJECT (src, "tzappsrc release the handle");
2741           klass->tz_src_release_handle(src,res_buf);
2742         }
2743
2744         gst_buffer_unref (res_buf);
2745       }
2746 #else
2747       if (own_res_buf)
2748         gst_buffer_unref (res_buf);
2749 #endif
2750       if (!src->live_running) {
2751         /* We return FLUSHING when we are not running to stop the dataflow also
2752          * get rid of the produced buffer. */
2753         GST_DEBUG_OBJECT (src,
2754             "clock was unscheduled (%d), returning FLUSHING", status);
2755         ret = GST_FLOW_FLUSHING;
2756       } else {
2757         /* If we are running when this happens, we quickly switched between
2758          * pause and playing. We try to produce a new buffer */
2759         GST_DEBUG_OBJECT (src,
2760             "clock was unscheduled (%d), but we are running", status);
2761         goto again;
2762       }
2763       break;
2764     default:
2765       /* all other result values are unexpected and errors */
2766       GST_ELEMENT_ERROR (src, CORE, CLOCK,
2767           (_("Internal clock error.")),
2768           ("clock returned unexpected return value %d", status));
2769 #ifdef TIZEN_FEATURE_TRUSTZONE
2770       if (own_res_buf) {
2771         /*tzappsrc patch : release handle when unref res_buf*/
2772         GstBaseSrcClass *klass = GST_BASE_SRC_GET_CLASS(src);
2773         if(klass->tz_src_release_handle)
2774         {
2775           GST_INFO_OBJECT (src, "tzappsrc release the handle");
2776           klass->tz_src_release_handle(src,res_buf);
2777         }
2778         gst_buffer_unref (res_buf);
2779       }
2780 #else
2781       if (own_res_buf)
2782         gst_buffer_unref (res_buf);
2783 #endif
2784       ret = GST_FLOW_ERROR;
2785       break;
2786   }
2787   if (G_LIKELY (ret == GST_FLOW_OK))
2788     *buf = res_buf;
2789
2790   return ret;
2791
2792   /* ERROR */
2793 stopped:
2794   {
2795     GST_DEBUG_OBJECT (src, "wait_playing returned %d (%s)", ret,
2796         gst_flow_get_name (ret));
2797     return ret;
2798   }
2799 not_ok:
2800   {
2801     GST_DEBUG_OBJECT (src, "create returned %d (%s)", ret,
2802         gst_flow_get_name (ret));
2803     return ret;
2804   }
2805 map_failed:
2806   {
2807     GST_ELEMENT_ERROR (src, RESOURCE, BUSY,
2808         (_("Failed to map buffer.")),
2809         ("failed to map result buffer in WRITE mode"));
2810 #ifdef TIZEN_FEATURE_TRUSTZONE
2811     if (own_res_buf) {
2812       /*tzappsrc patch : release handle when unref res_buf*/
2813       GstBaseSrcClass *klass = GST_BASE_SRC_GET_CLASS(src);
2814       if(klass->tz_src_release_handle)
2815       {
2816         GST_INFO_OBJECT (src, "tzappsrc release the handle");
2817         klass->tz_src_release_handle(src,res_buf);
2818       }
2819       gst_buffer_unref (res_buf);
2820     }
2821 #else
2822     if (own_res_buf)
2823       gst_buffer_unref (res_buf);
2824 #endif
2825     return GST_FLOW_ERROR;
2826   }
2827 not_started:
2828   {
2829     GST_DEBUG_OBJECT (src, "getrange but not started");
2830     return GST_FLOW_FLUSHING;
2831   }
2832 no_function:
2833   {
2834     GST_DEBUG_OBJECT (src, "no create function");
2835     return GST_FLOW_NOT_SUPPORTED;
2836   }
2837 unexpected_length:
2838   {
2839     GST_DEBUG_OBJECT (src, "unexpected length %u (offset=%" G_GUINT64_FORMAT
2840         ", size=%" G_GINT64_FORMAT ")", length, offset, src->segment.duration);
2841     return GST_FLOW_EOS;
2842   }
2843 reached_num_buffers:
2844   {
2845     GST_DEBUG_OBJECT (src, "sent all buffers");
2846     return GST_FLOW_EOS;
2847   }
2848 flushing:
2849   {
2850     GST_DEBUG_OBJECT (src, "we are flushing");
2851 #ifdef TIZEN_FEATURE_TRUSTZONE
2852     if (own_res_buf) {
2853       /*tzappsrc patch : release handle when unref res_buf*/
2854       GstBaseSrcClass *klass = GST_BASE_SRC_GET_CLASS(src);
2855       if(klass->tz_src_release_handle)
2856       {
2857         GST_INFO_OBJECT (src, "tzappsrc release the handle");
2858         klass->tz_src_release_handle(src,res_buf);
2859       }
2860       gst_buffer_unref (res_buf);
2861     }
2862 #else
2863     if (own_res_buf)
2864       gst_buffer_unref (res_buf);
2865 #endif
2866     return GST_FLOW_FLUSHING;
2867   }
2868 eos:
2869   {
2870     GST_DEBUG_OBJECT (src, "we are EOS");
2871     return GST_FLOW_EOS;
2872   }
2873 null_buffer:
2874   {
2875     GST_ELEMENT_ERROR (src, STREAM, FAILED,
2876         (_("Internal data flow error.")),
2877         ("Subclass %s neither returned a buffer nor submitted a buffer list "
2878             "from its create function", G_OBJECT_TYPE_NAME (src)));
2879     return GST_FLOW_ERROR;
2880   }
2881 }
2882
2883 static GstFlowReturn
2884 gst_base_src_getrange (GstPad * pad, GstObject * parent, guint64 offset,
2885     guint length, GstBuffer ** buf)
2886 {
2887   GstBaseSrc *src;
2888   GstFlowReturn res;
2889
2890   src = GST_BASE_SRC_CAST (parent);
2891
2892   GST_LIVE_LOCK (src);
2893   if (G_UNLIKELY (src->priv->flushing))
2894     goto flushing;
2895
2896   res = gst_base_src_get_range (src, offset, length, buf);
2897
2898 done:
2899   GST_LIVE_UNLOCK (src);
2900
2901   return res;
2902
2903   /* ERRORS */
2904 flushing:
2905   {
2906     GST_DEBUG_OBJECT (src, "we are flushing");
2907     res = GST_FLOW_FLUSHING;
2908     goto done;
2909   }
2910 }
2911
2912 static gboolean
2913 gst_base_src_is_random_access (GstBaseSrc * src)
2914 {
2915   /* we need to start the basesrc to check random access */
2916   if (!GST_BASE_SRC_IS_STARTED (src)) {
2917     GST_LOG_OBJECT (src, "doing start/stop to check get_range support");
2918     if (G_LIKELY (gst_base_src_start (src))) {
2919       if (gst_base_src_start_wait (src) != GST_FLOW_OK)
2920         goto start_failed;
2921       gst_base_src_stop (src);
2922     }
2923   }
2924
2925   return src->random_access;
2926
2927   /* ERRORS */
2928 start_failed:
2929   {
2930     GST_DEBUG_OBJECT (src, "failed to start");
2931     return FALSE;
2932   }
2933 }
2934
2935 /* Called with STREAM_LOCK */
2936 static void
2937 gst_base_src_loop (GstPad * pad)
2938 {
2939   GstBaseSrc *src;
2940   GstBuffer *buf = NULL;
2941   GstFlowReturn ret;
2942   gint64 position;
2943   gboolean eos;
2944   guint blocksize;
2945   GList *pending_events = NULL, *tmp;
2946
2947   eos = FALSE;
2948
2949   src = GST_BASE_SRC (GST_OBJECT_PARENT (pad));
2950
2951   /* Just leave immediately if we're flushing */
2952   GST_LIVE_LOCK (src);
2953   if (G_UNLIKELY (src->priv->flushing || GST_PAD_IS_FLUSHING (pad)))
2954     goto flushing;
2955   GST_LIVE_UNLOCK (src);
2956
2957   /* Just return if EOS is pushed again, as the app might be unaware that an
2958    * EOS have been sent already */
2959   if (GST_PAD_IS_EOS (pad)) {
2960     GST_DEBUG_OBJECT (src, "Pad is marked as EOS, pause the task");
2961     gst_pad_pause_task (pad);
2962     goto done;
2963   }
2964
2965   gst_base_src_send_stream_start (src);
2966
2967   /* The stream-start event could've caused something to flush us */
2968   GST_LIVE_LOCK (src);
2969   if (G_UNLIKELY (src->priv->flushing || GST_PAD_IS_FLUSHING (pad)))
2970     goto flushing;
2971   GST_LIVE_UNLOCK (src);
2972
2973   /* check if we need to renegotiate */
2974   if (gst_pad_check_reconfigure (pad)) {
2975     if (!gst_base_src_negotiate_unlocked (src)) {
2976       gst_pad_mark_reconfigure (pad);
2977       if (GST_PAD_IS_FLUSHING (pad)) {
2978         GST_LIVE_LOCK (src);
2979         goto flushing;
2980       } else {
2981         goto negotiate_failed;
2982       }
2983     }
2984   }
2985
2986   GST_LIVE_LOCK (src);
2987
2988   if (G_UNLIKELY (src->priv->flushing || GST_PAD_IS_FLUSHING (pad)))
2989     goto flushing;
2990
2991   blocksize = src->blocksize;
2992
2993   /* if we operate in bytes, we can calculate an offset */
2994   if (src->segment.format == GST_FORMAT_BYTES) {
2995     position = src->segment.position;
2996     /* for negative rates, start with subtracting the blocksize */
2997     if (src->segment.rate < 0.0) {
2998       /* we cannot go below segment.start */
2999       if (position > src->segment.start + blocksize)
3000         position -= blocksize;
3001       else {
3002         /* last block, remainder up to segment.start */
3003         blocksize = position - src->segment.start;
3004         position = src->segment.start;
3005       }
3006     }
3007   } else
3008     position = -1;
3009
3010   GST_LOG_OBJECT (src, "next_ts %" GST_TIME_FORMAT " size %u",
3011       GST_TIME_ARGS (position), blocksize);
3012
3013   /* clean up just in case we got interrupted or so last time round */
3014   if (src->priv->pending_bufferlist != NULL) {
3015     gst_buffer_list_unref (src->priv->pending_bufferlist);
3016     src->priv->pending_bufferlist = NULL;
3017   }
3018
3019   ret = gst_base_src_get_range (src, position, blocksize, &buf);
3020   if (G_UNLIKELY (ret != GST_FLOW_OK)) {
3021     GST_INFO_OBJECT (src, "pausing after gst_base_src_get_range() = %s",
3022         gst_flow_get_name (ret));
3023     GST_LIVE_UNLOCK (src);
3024     goto pause;
3025   }
3026
3027   /* Note: at this point buf might be a single buf returned which we own or
3028    * the first buf of a pending buffer list submitted via submit_buffer_list(),
3029    * in which case the buffer is owned by the pending buffer list and not us. */
3030   g_assert (buf != NULL);
3031
3032   /* push events to close/start our segment before we push the buffer. */
3033   if (G_UNLIKELY (src->priv->segment_pending)) {
3034     GstEvent *seg_event = gst_event_new_segment (&src->segment);
3035
3036     gst_event_set_seqnum (seg_event, src->priv->segment_seqnum);
3037     src->priv->segment_seqnum = gst_util_seqnum_next ();
3038     gst_pad_push_event (pad, seg_event);
3039     src->priv->segment_pending = FALSE;
3040   }
3041
3042   if (g_atomic_int_get (&src->priv->have_events)) {
3043     GST_OBJECT_LOCK (src);
3044     /* take the events */
3045     pending_events = src->priv->pending_events;
3046     src->priv->pending_events = NULL;
3047     g_atomic_int_set (&src->priv->have_events, FALSE);
3048     GST_OBJECT_UNLOCK (src);
3049   }
3050
3051   /* Push out pending events if any */
3052   if (G_UNLIKELY (pending_events != NULL)) {
3053     for (tmp = pending_events; tmp; tmp = g_list_next (tmp)) {
3054       GstEvent *ev = (GstEvent *) tmp->data;
3055       gst_pad_push_event (pad, ev);
3056     }
3057     g_list_free (pending_events);
3058   }
3059
3060   /* figure out the new position */
3061   switch (src->segment.format) {
3062     case GST_FORMAT_BYTES:
3063     {
3064       guint bufsize = gst_buffer_get_size (buf);
3065
3066       /* we subtracted above for negative rates */
3067       if (src->segment.rate >= 0.0)
3068         position += bufsize;
3069       break;
3070     }
3071     case GST_FORMAT_TIME:
3072     {
3073       GstClockTime start, duration;
3074
3075       start = GST_BUFFER_TIMESTAMP (buf);
3076       duration = GST_BUFFER_DURATION (buf);
3077
3078       if (GST_CLOCK_TIME_IS_VALID (start))
3079         position = start;
3080       else
3081         position = src->segment.position;
3082
3083       if (GST_CLOCK_TIME_IS_VALID (duration)) {
3084         if (src->segment.rate >= 0.0)
3085           position += duration;
3086       }
3087       break;
3088     }
3089     case GST_FORMAT_DEFAULT:
3090       if (src->segment.rate >= 0.0)
3091         position = GST_BUFFER_OFFSET_END (buf);
3092       else
3093         position = GST_BUFFER_OFFSET (buf);
3094       break;
3095     default:
3096       position = -1;
3097       break;
3098   }
3099   if (position != -1) {
3100     if (src->segment.rate >= 0.0) {
3101       /* positive rate, check if we reached the stop */
3102       if (src->segment.stop != -1) {
3103         if (position >= src->segment.stop) {
3104           if (g_atomic_int_get (&src->priv->automatic_eos))
3105             eos = TRUE;
3106           position = src->segment.stop;
3107         }
3108       }
3109     } else {
3110       /* negative rate, check if we reached the start. start is always set to
3111        * something different from -1 */
3112       if (position <= src->segment.start) {
3113         if (g_atomic_int_get (&src->priv->automatic_eos))
3114           eos = TRUE;
3115         position = src->segment.start;
3116       }
3117       /* when going reverse, all buffers are DISCONT */
3118       src->priv->discont = TRUE;
3119     }
3120     GST_OBJECT_LOCK (src);
3121     src->segment.position = position;
3122     GST_OBJECT_UNLOCK (src);
3123   }
3124
3125   if (G_UNLIKELY (src->priv->discont)) {
3126     GST_INFO_OBJECT (src, "marking pending DISCONT");
3127     buf = gst_buffer_make_writable (buf);
3128     GST_BUFFER_FLAG_SET (buf, GST_BUFFER_FLAG_DISCONT);
3129     src->priv->discont = FALSE;
3130   }
3131   GST_LIVE_UNLOCK (src);
3132
3133   /* push buffer or buffer list */
3134   if (src->priv->pending_bufferlist != NULL) {
3135     ret = gst_pad_push_list (pad, src->priv->pending_bufferlist);
3136     src->priv->pending_bufferlist = NULL;
3137   } else {
3138     ret = gst_pad_push (pad, buf);
3139   }
3140
3141   if (G_UNLIKELY (ret != GST_FLOW_OK)) {
3142 #ifdef TIZEN_FEATURE_TRUSTZONE
3143     /*tzappsrc patch : release handle when unref res_buf*/
3144     GstBaseSrcClass *klass = GST_BASE_SRC_GET_CLASS (src);
3145     if (klass->tz_src_release_handle) {
3146       GST_INFO_OBJECT (src, "tzappsrc release the handle");
3147       klass->tz_src_release_handle (src,buf);
3148     }
3149 #endif
3150     if (ret == GST_FLOW_NOT_NEGOTIATED) {
3151       goto not_negotiated;
3152     }
3153     GST_INFO_OBJECT (src, "pausing after gst_pad_push() = %s",
3154         gst_flow_get_name (ret));
3155     goto pause;
3156   }
3157
3158   /* Segment pending means that a new segment was configured
3159    * during this loop run */
3160   if (G_UNLIKELY (eos && !src->priv->segment_pending)) {
3161     GST_INFO_OBJECT (src, "pausing after end of segment");
3162     ret = GST_FLOW_EOS;
3163     goto pause;
3164   }
3165
3166 done:
3167   return;
3168
3169   /* special cases */
3170 not_negotiated:
3171   {
3172     if (gst_pad_needs_reconfigure (pad)) {
3173       GST_DEBUG_OBJECT (src, "Retrying to renegotiate");
3174       return;
3175     }
3176     /* fallthrough when push returns NOT_NEGOTIATED and we don't have
3177      * a pending negotiation request on our srcpad */
3178   }
3179 negotiate_failed:
3180   {
3181     GST_DEBUG_OBJECT (src, "Not negotiated");
3182     ret = GST_FLOW_NOT_NEGOTIATED;
3183     goto pause;
3184   }
3185 flushing:
3186   {
3187     GST_DEBUG_OBJECT (src, "we are flushing");
3188     GST_LIVE_UNLOCK (src);
3189     ret = GST_FLOW_FLUSHING;
3190     goto pause;
3191   }
3192 pause:
3193   {
3194     GstEvent *event;
3195
3196     GST_DEBUG_OBJECT (src, "pausing task, reason %s", gst_flow_get_name (ret));
3197     src->running = FALSE;
3198     gst_pad_pause_task (pad);
3199     if (ret == GST_FLOW_EOS) {
3200       gboolean flag_segment;
3201       GstFormat format;
3202       gint64 position;
3203
3204       flag_segment = (src->segment.flags & GST_SEGMENT_FLAG_SEGMENT) != 0;
3205       format = src->segment.format;
3206       position = src->segment.position;
3207
3208       /* perform EOS logic */
3209       if (src->priv->forced_eos) {
3210         g_assert (g_atomic_int_get (&src->priv->has_pending_eos));
3211         GST_OBJECT_LOCK (src);
3212         event = src->priv->pending_eos;
3213         src->priv->pending_eos = NULL;
3214         GST_OBJECT_UNLOCK (src);
3215
3216       } else if (flag_segment) {
3217         GstMessage *message;
3218
3219         message = gst_message_new_segment_done (GST_OBJECT_CAST (src),
3220             format, position);
3221         gst_message_set_seqnum (message, src->priv->seqnum);
3222         gst_element_post_message (GST_ELEMENT_CAST (src), message);
3223         event = gst_event_new_segment_done (format, position);
3224         gst_event_set_seqnum (event, src->priv->seqnum);
3225
3226       } else {
3227         event = gst_event_new_eos ();
3228         gst_event_set_seqnum (event, src->priv->seqnum);
3229       }
3230
3231       gst_pad_push_event (pad, event);
3232       src->priv->forced_eos = FALSE;
3233
3234     } else if (ret == GST_FLOW_NOT_LINKED || ret <= GST_FLOW_EOS) {
3235       event = gst_event_new_eos ();
3236       gst_event_set_seqnum (event, src->priv->seqnum);
3237       /* for fatal errors we post an error message, post the error
3238        * first so the app knows about the error first.
3239        * Also don't do this for FLUSHING because it happens
3240        * due to flushing and posting an error message because of
3241        * that is the wrong thing to do, e.g. when we're doing
3242        * a flushing seek. */
3243       GST_ELEMENT_FLOW_ERROR (src, ret);
3244       gst_pad_push_event (pad, event);
3245     }
3246     goto done;
3247   }
3248 }
3249
3250 static gboolean
3251 gst_base_src_set_allocation (GstBaseSrc * basesrc, GstBufferPool * pool,
3252     GstAllocator * allocator, const GstAllocationParams * params)
3253 {
3254   GstAllocator *oldalloc;
3255   GstBufferPool *oldpool;
3256   GstBaseSrcPrivate *priv = basesrc->priv;
3257
3258   if (pool) {
3259     GST_DEBUG_OBJECT (basesrc, "activate pool");
3260     if (!gst_buffer_pool_set_active (pool, TRUE))
3261       goto activate_failed;
3262   }
3263
3264   GST_OBJECT_LOCK (basesrc);
3265   oldpool = priv->pool;
3266   priv->pool = pool;
3267
3268   oldalloc = priv->allocator;
3269   priv->allocator = allocator;
3270
3271   if (priv->pool)
3272     gst_object_ref (priv->pool);
3273   if (priv->allocator)
3274     gst_object_ref (priv->allocator);
3275
3276   if (params)
3277     priv->params = *params;
3278   else
3279     gst_allocation_params_init (&priv->params);
3280   GST_OBJECT_UNLOCK (basesrc);
3281
3282   if (oldpool) {
3283     /* only deactivate if the pool is not the one we're using */
3284     if (oldpool != pool) {
3285       GST_DEBUG_OBJECT (basesrc, "deactivate old pool");
3286       gst_buffer_pool_set_active (oldpool, FALSE);
3287     }
3288     gst_object_unref (oldpool);
3289   }
3290   if (oldalloc) {
3291     gst_object_unref (oldalloc);
3292   }
3293   return TRUE;
3294
3295   /* ERRORS */
3296 activate_failed:
3297   {
3298     GST_ERROR_OBJECT (basesrc, "failed to activate bufferpool.");
3299     return FALSE;
3300   }
3301 }
3302
3303 static void
3304 gst_base_src_set_pool_flushing (GstBaseSrc * basesrc, gboolean flushing)
3305 {
3306   GstBaseSrcPrivate *priv = basesrc->priv;
3307   GstBufferPool *pool;
3308
3309   GST_OBJECT_LOCK (basesrc);
3310   if ((pool = priv->pool))
3311     pool = gst_object_ref (pool);
3312   GST_OBJECT_UNLOCK (basesrc);
3313
3314   if (pool) {
3315     gst_buffer_pool_set_flushing (pool, flushing);
3316     gst_object_unref (pool);
3317   }
3318 }
3319
3320
3321 static gboolean
3322 gst_base_src_decide_allocation_default (GstBaseSrc * basesrc, GstQuery * query)
3323 {
3324   GstCaps *outcaps;
3325   GstBufferPool *pool;
3326   guint size, min, max;
3327   GstAllocator *allocator;
3328   GstAllocationParams params;
3329   GstStructure *config;
3330   gboolean update_allocator;
3331
3332   gst_query_parse_allocation (query, &outcaps, NULL);
3333
3334   /* we got configuration from our peer or the decide_allocation method,
3335    * parse them */
3336   if (gst_query_get_n_allocation_params (query) > 0) {
3337     /* try the allocator */
3338     gst_query_parse_nth_allocation_param (query, 0, &allocator, &params);
3339     update_allocator = TRUE;
3340   } else {
3341     allocator = NULL;
3342     gst_allocation_params_init (&params);
3343     update_allocator = FALSE;
3344   }
3345
3346   if (gst_query_get_n_allocation_pools (query) > 0) {
3347     gst_query_parse_nth_allocation_pool (query, 0, &pool, &size, &min, &max);
3348
3349     if (pool == NULL) {
3350       /* no pool, we can make our own */
3351       GST_DEBUG_OBJECT (basesrc, "no pool, making new pool");
3352       pool = gst_buffer_pool_new ();
3353     }
3354   } else {
3355     pool = NULL;
3356     size = min = max = 0;
3357   }
3358
3359   /* now configure */
3360   if (pool) {
3361     config = gst_buffer_pool_get_config (pool);
3362     gst_buffer_pool_config_set_params (config, outcaps, size, min, max);
3363     gst_buffer_pool_config_set_allocator (config, allocator, &params);
3364
3365     /* buffer pool may have to do some changes */
3366     if (!gst_buffer_pool_set_config (pool, config)) {
3367       config = gst_buffer_pool_get_config (pool);
3368
3369       /* If change are not acceptable, fallback to generic pool */
3370       if (!gst_buffer_pool_config_validate_params (config, outcaps, size, min,
3371               max)) {
3372         GST_DEBUG_OBJECT (basesrc, "unsupported pool, making new pool");
3373
3374         gst_object_unref (pool);
3375         pool = gst_buffer_pool_new ();
3376         gst_buffer_pool_config_set_params (config, outcaps, size, min, max);
3377         gst_buffer_pool_config_set_allocator (config, allocator, &params);
3378       }
3379
3380       if (!gst_buffer_pool_set_config (pool, config))
3381         goto config_failed;
3382     }
3383   }
3384
3385   if (update_allocator)
3386     gst_query_set_nth_allocation_param (query, 0, allocator, &params);
3387   else
3388     gst_query_add_allocation_param (query, allocator, &params);
3389   if (allocator)
3390     gst_object_unref (allocator);
3391
3392   if (pool) {
3393     gst_query_set_nth_allocation_pool (query, 0, pool, size, min, max);
3394     gst_object_unref (pool);
3395   }
3396
3397   return TRUE;
3398
3399 config_failed:
3400   GST_ELEMENT_ERROR (basesrc, RESOURCE, SETTINGS,
3401       ("Failed to configure the buffer pool"),
3402       ("Configuration is most likely invalid, please report this issue."));
3403   gst_object_unref (pool);
3404   return FALSE;
3405 }
3406
3407 static gboolean
3408 gst_base_src_prepare_allocation (GstBaseSrc * basesrc, GstCaps * caps)
3409 {
3410   GstBaseSrcClass *bclass;
3411   gboolean result = TRUE;
3412   GstQuery *query;
3413   GstBufferPool *pool = NULL;
3414   GstAllocator *allocator = NULL;
3415   GstAllocationParams params;
3416
3417   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
3418
3419   /* make query and let peer pad answer, we don't really care if it worked or
3420    * not, if it failed, the allocation query would contain defaults and the
3421    * subclass would then set better values if needed */
3422   query = gst_query_new_allocation (caps, TRUE);
3423   if (!gst_pad_peer_query (basesrc->srcpad, query)) {
3424     /* not a problem, just debug a little */
3425     GST_DEBUG_OBJECT (basesrc, "peer ALLOCATION query failed");
3426   }
3427
3428   g_assert (bclass->decide_allocation != NULL);
3429   result = bclass->decide_allocation (basesrc, query);
3430
3431   GST_DEBUG_OBJECT (basesrc, "ALLOCATION (%d) params: %" GST_PTR_FORMAT, result,
3432       query);
3433
3434   if (!result)
3435     goto no_decide_allocation;
3436
3437   /* we got configuration from our peer or the decide_allocation method,
3438    * parse them */
3439   if (gst_query_get_n_allocation_params (query) > 0) {
3440     gst_query_parse_nth_allocation_param (query, 0, &allocator, &params);
3441   } else {
3442     allocator = NULL;
3443     gst_allocation_params_init (&params);
3444   }
3445
3446   if (gst_query_get_n_allocation_pools (query) > 0)
3447     gst_query_parse_nth_allocation_pool (query, 0, &pool, NULL, NULL, NULL);
3448
3449   result = gst_base_src_set_allocation (basesrc, pool, allocator, &params);
3450
3451   if (allocator)
3452     gst_object_unref (allocator);
3453   if (pool)
3454     gst_object_unref (pool);
3455
3456   gst_query_unref (query);
3457
3458   return result;
3459
3460   /* Errors */
3461 no_decide_allocation:
3462   {
3463     GST_WARNING_OBJECT (basesrc, "Subclass failed to decide allocation");
3464     gst_query_unref (query);
3465
3466     return result;
3467   }
3468 }
3469
3470 /* default negotiation code.
3471  *
3472  * Take intersection between src and sink pads, take first
3473  * caps and fixate.
3474  */
3475 static gboolean
3476 gst_base_src_default_negotiate (GstBaseSrc * basesrc)
3477 {
3478   GstCaps *thiscaps;
3479   GstCaps *caps = NULL;
3480   GstCaps *peercaps = NULL;
3481   gboolean result = FALSE;
3482
3483   /* first see what is possible on our source pad */
3484   thiscaps = gst_pad_query_caps (GST_BASE_SRC_PAD (basesrc), NULL);
3485   GST_DEBUG_OBJECT (basesrc, "caps of src: %" GST_PTR_FORMAT, thiscaps);
3486   /* nothing or anything is allowed, we're done */
3487   if (thiscaps == NULL || gst_caps_is_any (thiscaps))
3488     goto no_nego_needed;
3489
3490   if (G_UNLIKELY (gst_caps_is_empty (thiscaps)))
3491     goto no_caps;
3492
3493   /* get the peer caps */
3494   peercaps = gst_pad_peer_query_caps (GST_BASE_SRC_PAD (basesrc), thiscaps);
3495   GST_DEBUG_OBJECT (basesrc, "caps of peer: %" GST_PTR_FORMAT, peercaps);
3496   if (peercaps) {
3497     /* The result is already a subset of our caps */
3498     caps = peercaps;
3499     gst_caps_unref (thiscaps);
3500   } else {
3501     /* no peer, work with our own caps then */
3502     caps = thiscaps;
3503   }
3504   if (caps && !gst_caps_is_empty (caps)) {
3505     /* now fixate */
3506     GST_DEBUG_OBJECT (basesrc, "have caps: %" GST_PTR_FORMAT, caps);
3507     if (gst_caps_is_any (caps)) {
3508       GST_DEBUG_OBJECT (basesrc, "any caps, we stop");
3509       /* hmm, still anything, so element can do anything and
3510        * nego is not needed */
3511       result = TRUE;
3512     } else {
3513       caps = gst_base_src_fixate (basesrc, caps);
3514       GST_DEBUG_OBJECT (basesrc, "fixated to: %" GST_PTR_FORMAT, caps);
3515       if (gst_caps_is_fixed (caps)) {
3516         /* yay, fixed caps, use those then, it's possible that the subclass does
3517          * not accept this caps after all and we have to fail. */
3518         result = gst_base_src_set_caps (basesrc, caps);
3519       }
3520     }
3521     gst_caps_unref (caps);
3522   } else {
3523     if (caps)
3524       gst_caps_unref (caps);
3525     GST_DEBUG_OBJECT (basesrc, "no common caps");
3526   }
3527   return result;
3528
3529 no_nego_needed:
3530   {
3531     GST_DEBUG_OBJECT (basesrc, "no negotiation needed");
3532     if (thiscaps)
3533       gst_caps_unref (thiscaps);
3534     return TRUE;
3535   }
3536 no_caps:
3537   {
3538     GST_ELEMENT_ERROR (basesrc, STREAM, FORMAT,
3539         ("No supported formats found"),
3540         ("This element did not produce valid caps"));
3541     if (thiscaps)
3542       gst_caps_unref (thiscaps);
3543     return TRUE;
3544   }
3545 }
3546
3547 static gboolean
3548 gst_base_src_negotiate_unlocked (GstBaseSrc * basesrc)
3549 {
3550   GstBaseSrcClass *bclass;
3551   gboolean result;
3552
3553   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
3554
3555   GST_DEBUG_OBJECT (basesrc, "starting negotiation");
3556
3557   if (G_LIKELY (bclass->negotiate))
3558     result = bclass->negotiate (basesrc);
3559   else
3560     result = TRUE;
3561
3562   if (G_LIKELY (result)) {
3563     GstCaps *caps;
3564
3565     caps = gst_pad_get_current_caps (basesrc->srcpad);
3566
3567     result = gst_base_src_prepare_allocation (basesrc, caps);
3568
3569     if (caps)
3570       gst_caps_unref (caps);
3571   }
3572   return result;
3573 }
3574
3575 /**
3576  * gst_base_src_negotiate:
3577  * @src: base source instance
3578  *
3579  * Negotiates src pad caps with downstream elements.
3580  * Unmarks GST_PAD_FLAG_NEED_RECONFIGURE in any case. But marks it again
3581  * if #GstBaseSrcClass::negotiate fails.
3582  *
3583  * Do not call this in the #GstBaseSrcClass::fill vmethod. Call this in
3584  * #GstBaseSrcClass::create or in #GstBaseSrcClass::alloc, _before_ any
3585  * buffer is allocated.
3586  *
3587  * Returns: %TRUE if the negotiation succeeded, else %FALSE.
3588  *
3589  * Since: 1.18
3590  */
3591 gboolean
3592 gst_base_src_negotiate (GstBaseSrc * src)
3593 {
3594   gboolean ret = TRUE;
3595
3596   g_return_val_if_fail (GST_IS_BASE_SRC (src), FALSE);
3597
3598   GST_PAD_STREAM_LOCK (src->srcpad);
3599   gst_pad_check_reconfigure (src->srcpad);
3600   ret = gst_base_src_negotiate_unlocked (src);
3601   if (!ret)
3602     gst_pad_mark_reconfigure (src->srcpad);
3603   GST_PAD_STREAM_UNLOCK (src->srcpad);
3604
3605   return ret;
3606 }
3607
3608 static gboolean
3609 gst_base_src_start (GstBaseSrc * basesrc)
3610 {
3611   GstBaseSrcClass *bclass;
3612   gboolean result;
3613
3614   GST_LIVE_LOCK (basesrc);
3615
3616   GST_OBJECT_LOCK (basesrc);
3617   if (GST_BASE_SRC_IS_STARTING (basesrc))
3618     goto was_starting;
3619   if (GST_BASE_SRC_IS_STARTED (basesrc))
3620     goto was_started;
3621
3622   basesrc->priv->start_result = GST_FLOW_FLUSHING;
3623   GST_OBJECT_FLAG_SET (basesrc, GST_BASE_SRC_FLAG_STARTING);
3624   gst_segment_init (&basesrc->segment, basesrc->segment.format);
3625   GST_OBJECT_UNLOCK (basesrc);
3626
3627   basesrc->num_buffers_left = basesrc->num_buffers;
3628   basesrc->running = FALSE;
3629   basesrc->priv->segment_pending = FALSE;
3630   basesrc->priv->segment_seqnum = gst_util_seqnum_next ();
3631   basesrc->priv->forced_eos = FALSE;
3632   GST_LIVE_UNLOCK (basesrc);
3633
3634   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
3635   if (bclass->start)
3636     result = bclass->start (basesrc);
3637   else
3638     result = TRUE;
3639
3640   if (!result)
3641     goto could_not_start;
3642
3643   if (!gst_base_src_is_async (basesrc)) {
3644     gst_base_src_start_complete (basesrc, GST_FLOW_OK);
3645     /* not really waiting here, we call this to get the result
3646      * from the start_complete call */
3647     result = gst_base_src_start_wait (basesrc) == GST_FLOW_OK;
3648   }
3649
3650   return result;
3651
3652   /* ERROR */
3653 was_starting:
3654   {
3655     GST_DEBUG_OBJECT (basesrc, "was starting");
3656     GST_OBJECT_UNLOCK (basesrc);
3657     GST_LIVE_UNLOCK (basesrc);
3658     return TRUE;
3659   }
3660 was_started:
3661   {
3662     GST_DEBUG_OBJECT (basesrc, "was started");
3663     GST_OBJECT_UNLOCK (basesrc);
3664     GST_LIVE_UNLOCK (basesrc);
3665     return TRUE;
3666   }
3667 could_not_start:
3668   {
3669     GST_DEBUG_OBJECT (basesrc, "could not start");
3670     /* subclass is supposed to post a message but we post one as a fallback
3671      * just in case. We don't have to call _stop. */
3672     GST_ELEMENT_ERROR (basesrc, CORE, STATE_CHANGE, (NULL),
3673         ("Failed to start"));
3674     gst_base_src_start_complete (basesrc, GST_FLOW_ERROR);
3675     return FALSE;
3676   }
3677 }
3678
3679 /**
3680  * gst_base_src_start_complete:
3681  * @basesrc: base source instance
3682  * @ret: a #GstFlowReturn
3683  *
3684  * Complete an asynchronous start operation. When the subclass overrides the
3685  * start method, it should call gst_base_src_start_complete() when the start
3686  * operation completes either from the same thread or from an asynchronous
3687  * helper thread.
3688  */
3689 void
3690 gst_base_src_start_complete (GstBaseSrc * basesrc, GstFlowReturn ret)
3691 {
3692   gboolean have_size;
3693   guint64 size;
3694   gboolean seekable;
3695   GstFormat format;
3696   GstPadMode mode;
3697   GstEvent *event;
3698
3699   if (ret != GST_FLOW_OK)
3700     goto error;
3701
3702   GST_DEBUG_OBJECT (basesrc, "starting source");
3703   format = basesrc->segment.format;
3704
3705   /* figure out the size */
3706   have_size = FALSE;
3707   size = -1;
3708   if (format == GST_FORMAT_BYTES) {
3709     GstBaseSrcClass *bclass = GST_BASE_SRC_GET_CLASS (basesrc);
3710
3711     if (bclass->get_size) {
3712       if (!(have_size = bclass->get_size (basesrc, &size)))
3713         size = -1;
3714     }
3715     GST_DEBUG_OBJECT (basesrc, "setting size %" G_GUINT64_FORMAT, size);
3716     /* only update the size when operating in bytes, subclass is supposed
3717      * to set duration in the start method for other formats */
3718     GST_OBJECT_LOCK (basesrc);
3719     basesrc->segment.duration = size;
3720     GST_OBJECT_UNLOCK (basesrc);
3721   }
3722
3723   GST_DEBUG_OBJECT (basesrc,
3724       "format: %s, have size: %d, size: %" G_GUINT64_FORMAT ", duration: %"
3725       G_GINT64_FORMAT, gst_format_get_name (format), have_size, size,
3726       basesrc->segment.duration);
3727
3728   seekable = gst_base_src_seekable (basesrc);
3729   GST_DEBUG_OBJECT (basesrc, "is seekable: %d", seekable);
3730
3731   /* update for random access flag */
3732   basesrc->random_access = seekable && format == GST_FORMAT_BYTES;
3733
3734   GST_DEBUG_OBJECT (basesrc, "is random_access: %d", basesrc->random_access);
3735
3736   gst_pad_mark_reconfigure (GST_BASE_SRC_PAD (basesrc));
3737
3738   GST_OBJECT_LOCK (basesrc->srcpad);
3739   mode = GST_PAD_MODE (basesrc->srcpad);
3740   GST_OBJECT_UNLOCK (basesrc->srcpad);
3741
3742   /* take the stream lock here, we only want to let the task run when we have
3743    * set the STARTED flag */
3744   GST_PAD_STREAM_LOCK (basesrc->srcpad);
3745   switch (mode) {
3746     case GST_PAD_MODE_PUSH:
3747       /* do initial seek, which will start the task */
3748       GST_OBJECT_LOCK (basesrc);
3749       event = basesrc->pending_seek;
3750       basesrc->pending_seek = NULL;
3751       GST_OBJECT_UNLOCK (basesrc);
3752
3753       /* The perform seek code will start the task when finished. We don't have to
3754        * unlock the streaming thread because it is not running yet */
3755       if (G_UNLIKELY (!gst_base_src_perform_seek (basesrc, event, FALSE)))
3756         goto seek_failed;
3757
3758       if (event)
3759         gst_event_unref (event);
3760       break;
3761     case GST_PAD_MODE_PULL:
3762       /* if not random_access, we cannot operate in pull mode for now */
3763       if (G_UNLIKELY (!basesrc->random_access))
3764         goto no_get_range;
3765       break;
3766     default:
3767       goto not_activated_yet;
3768       break;
3769   }
3770
3771   GST_OBJECT_LOCK (basesrc);
3772   GST_OBJECT_FLAG_SET (basesrc, GST_BASE_SRC_FLAG_STARTED);
3773   GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTING);
3774   basesrc->priv->start_result = ret;
3775   GST_ASYNC_SIGNAL (basesrc);
3776   GST_OBJECT_UNLOCK (basesrc);
3777
3778   GST_PAD_STREAM_UNLOCK (basesrc->srcpad);
3779
3780   return;
3781
3782 seek_failed:
3783   {
3784     GST_PAD_STREAM_UNLOCK (basesrc->srcpad);
3785     GST_ERROR_OBJECT (basesrc, "Failed to perform initial seek");
3786     gst_base_src_stop (basesrc);
3787     if (event)
3788       gst_event_unref (event);
3789     ret = GST_FLOW_ERROR;
3790     goto error;
3791   }
3792 no_get_range:
3793   {
3794     GST_PAD_STREAM_UNLOCK (basesrc->srcpad);
3795     gst_base_src_stop (basesrc);
3796     GST_ERROR_OBJECT (basesrc, "Cannot operate in pull mode, stopping");
3797     ret = GST_FLOW_ERROR;
3798     goto error;
3799   }
3800 not_activated_yet:
3801   {
3802     GST_PAD_STREAM_UNLOCK (basesrc->srcpad);
3803     gst_base_src_stop (basesrc);
3804     GST_WARNING_OBJECT (basesrc, "pad not activated yet");
3805     ret = GST_FLOW_ERROR;
3806     goto error;
3807   }
3808 error:
3809   {
3810     GST_OBJECT_LOCK (basesrc);
3811     basesrc->priv->start_result = ret;
3812     GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTING);
3813     GST_ASYNC_SIGNAL (basesrc);
3814     GST_OBJECT_UNLOCK (basesrc);
3815     return;
3816   }
3817 }
3818
3819 /**
3820  * gst_base_src_start_wait:
3821  * @basesrc: base source instance
3822  *
3823  * Wait until the start operation completes.
3824  *
3825  * Returns: a #GstFlowReturn.
3826  */
3827 GstFlowReturn
3828 gst_base_src_start_wait (GstBaseSrc * basesrc)
3829 {
3830   GstFlowReturn result;
3831
3832   GST_OBJECT_LOCK (basesrc);
3833   while (GST_BASE_SRC_IS_STARTING (basesrc)) {
3834     GST_ASYNC_WAIT (basesrc);
3835   }
3836   result = basesrc->priv->start_result;
3837   GST_OBJECT_UNLOCK (basesrc);
3838
3839   GST_DEBUG_OBJECT (basesrc, "got %s", gst_flow_get_name (result));
3840
3841   return result;
3842 }
3843
3844 static gboolean
3845 gst_base_src_stop (GstBaseSrc * basesrc)
3846 {
3847   GstBaseSrcClass *bclass;
3848   gboolean result = TRUE;
3849
3850   GST_DEBUG_OBJECT (basesrc, "stopping source");
3851
3852   /* flush all */
3853   gst_base_src_set_flushing (basesrc, TRUE);
3854
3855   /* stop the task */
3856   gst_pad_stop_task (basesrc->srcpad);
3857   /* stop flushing, this will balance unlock/unlock_stop calls */
3858   gst_base_src_set_flushing (basesrc, FALSE);
3859
3860   GST_OBJECT_LOCK (basesrc);
3861   if (!GST_BASE_SRC_IS_STARTED (basesrc) && !GST_BASE_SRC_IS_STARTING (basesrc))
3862     goto was_stopped;
3863
3864   GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTING);
3865   GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_FLAG_STARTED);
3866   basesrc->priv->start_result = GST_FLOW_FLUSHING;
3867   GST_ASYNC_SIGNAL (basesrc);
3868   GST_OBJECT_UNLOCK (basesrc);
3869
3870   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
3871   if (bclass->stop)
3872     result = bclass->stop (basesrc);
3873
3874   if (basesrc->priv->pending_bufferlist != NULL) {
3875     gst_buffer_list_unref (basesrc->priv->pending_bufferlist);
3876     basesrc->priv->pending_bufferlist = NULL;
3877   }
3878
3879   gst_base_src_set_allocation (basesrc, NULL, NULL, NULL);
3880
3881   return result;
3882
3883 was_stopped:
3884   {
3885     GST_DEBUG_OBJECT (basesrc, "was stopped");
3886     GST_OBJECT_UNLOCK (basesrc);
3887     return TRUE;
3888   }
3889 }
3890
3891 /* start or stop flushing dataprocessing
3892  */
3893 static gboolean
3894 gst_base_src_set_flushing (GstBaseSrc * basesrc, gboolean flushing)
3895 {
3896   GstBaseSrcClass *bclass;
3897
3898   bclass = GST_BASE_SRC_GET_CLASS (basesrc);
3899
3900   GST_DEBUG_OBJECT (basesrc, "flushing %d", flushing);
3901
3902   if (flushing) {
3903     gst_base_src_set_pool_flushing (basesrc, TRUE);
3904     /* unlock any subclasses to allow turning off the streaming thread */
3905     if (bclass->unlock)
3906       bclass->unlock (basesrc);
3907   }
3908
3909   /* the live lock is released when we are blocked, waiting for playing,
3910    * when we sync to the clock or creating a buffer */
3911   GST_LIVE_LOCK (basesrc);
3912   basesrc->priv->flushing = flushing;
3913   if (flushing) {
3914     /* clear pending EOS if any */
3915     if (g_atomic_int_get (&basesrc->priv->has_pending_eos)) {
3916       GST_OBJECT_LOCK (basesrc);
3917       CLEAR_PENDING_EOS (basesrc);
3918       basesrc->priv->forced_eos = FALSE;
3919       GST_OBJECT_UNLOCK (basesrc);
3920     }
3921
3922     /* unblock clock sync (if any) or any other blocking thing */
3923     if (basesrc->clock_id)
3924       gst_clock_id_unschedule (basesrc->clock_id);
3925   } else {
3926     gst_base_src_set_pool_flushing (basesrc, FALSE);
3927
3928     /* Drop all delayed events */
3929     GST_OBJECT_LOCK (basesrc);
3930     if (basesrc->priv->pending_events) {
3931       g_list_foreach (basesrc->priv->pending_events, (GFunc) gst_event_unref,
3932           NULL);
3933       g_list_free (basesrc->priv->pending_events);
3934       basesrc->priv->pending_events = NULL;
3935       g_atomic_int_set (&basesrc->priv->have_events, FALSE);
3936     }
3937     GST_OBJECT_UNLOCK (basesrc);
3938   }
3939
3940   GST_LIVE_SIGNAL (basesrc);
3941   GST_LIVE_UNLOCK (basesrc);
3942
3943   if (!flushing) {
3944     /* Now wait for the stream lock to be released and clear our unlock request */
3945     GST_PAD_STREAM_LOCK (basesrc->srcpad);
3946     if (bclass->unlock_stop)
3947       bclass->unlock_stop (basesrc);
3948     GST_PAD_STREAM_UNLOCK (basesrc->srcpad);
3949   }
3950
3951   return TRUE;
3952 }
3953
3954 /* the purpose of this function is to make sure that a live source blocks in the
3955  * LIVE lock or leaves the LIVE lock and continues playing. */
3956 static gboolean
3957 gst_base_src_set_playing (GstBaseSrc * basesrc, gboolean live_play)
3958 {
3959   /* we are now able to grab the LIVE lock, when we get it, we can be
3960    * waiting for PLAYING while blocked in the LIVE cond or we can be waiting
3961    * for the clock. */
3962   GST_LIVE_LOCK (basesrc);
3963   GST_DEBUG_OBJECT (basesrc, "unschedule clock");
3964
3965   /* unblock clock sync (if any) */
3966   if (basesrc->clock_id)
3967     gst_clock_id_unschedule (basesrc->clock_id);
3968
3969   /* configure what to do when we get to the LIVE lock. */
3970   GST_DEBUG_OBJECT (basesrc, "live running %d", live_play);
3971   basesrc->live_running = live_play;
3972
3973   if (live_play) {
3974     gboolean start;
3975
3976     /* for live sources we restart the timestamp correction */
3977     GST_OBJECT_LOCK (basesrc);
3978     basesrc->priv->latency = -1;
3979     GST_OBJECT_UNLOCK (basesrc);
3980     /* have to restart the task in case it stopped because of the unlock when
3981      * we went to PAUSED. Only do this if we operating in push mode. */
3982     GST_OBJECT_LOCK (basesrc->srcpad);
3983     start = (GST_PAD_MODE (basesrc->srcpad) == GST_PAD_MODE_PUSH);
3984     GST_OBJECT_UNLOCK (basesrc->srcpad);
3985     if (start)
3986       gst_pad_start_task (basesrc->srcpad, (GstTaskFunction) gst_base_src_loop,
3987           basesrc->srcpad, NULL);
3988     GST_DEBUG_OBJECT (basesrc, "signal");
3989     GST_LIVE_SIGNAL (basesrc);
3990   }
3991   GST_LIVE_UNLOCK (basesrc);
3992
3993   return TRUE;
3994 }
3995
3996 static gboolean
3997 gst_base_src_activate_push (GstPad * pad, GstObject * parent, gboolean active)
3998 {
3999   GstBaseSrc *basesrc;
4000
4001   basesrc = GST_BASE_SRC (parent);
4002
4003   /* prepare subclass first */
4004   if (active) {
4005     GST_DEBUG_OBJECT (basesrc, "Activating in push mode");
4006
4007     if (G_UNLIKELY (!basesrc->can_activate_push))
4008       goto no_push_activation;
4009
4010     if (G_UNLIKELY (!gst_base_src_start (basesrc)))
4011       goto error_start;
4012   } else {
4013     GST_DEBUG_OBJECT (basesrc, "Deactivating in push mode");
4014     /* now we can stop the source */
4015     if (G_UNLIKELY (!gst_base_src_stop (basesrc)))
4016       goto error_stop;
4017   }
4018   return TRUE;
4019
4020   /* ERRORS */
4021 no_push_activation:
4022   {
4023     GST_WARNING_OBJECT (basesrc, "Subclass disabled push-mode activation");
4024     return FALSE;
4025   }
4026 error_start:
4027   {
4028     GST_WARNING_OBJECT (basesrc, "Failed to start in push mode");
4029     return FALSE;
4030   }
4031 error_stop:
4032   {
4033     GST_DEBUG_OBJECT (basesrc, "Failed to stop in push mode");
4034     return FALSE;
4035   }
4036 }
4037
4038 static gboolean
4039 gst_base_src_activate_pull (GstPad * pad, GstObject * parent, gboolean active)
4040 {
4041   GstBaseSrc *basesrc;
4042
4043   basesrc = GST_BASE_SRC (parent);
4044
4045   /* prepare subclass first */
4046   if (active) {
4047     GST_DEBUG_OBJECT (basesrc, "Activating in pull mode");
4048     if (G_UNLIKELY (!gst_base_src_start (basesrc)))
4049       goto error_start;
4050   } else {
4051     GST_DEBUG_OBJECT (basesrc, "Deactivating in pull mode");
4052     if (G_UNLIKELY (!gst_base_src_stop (basesrc)))
4053       goto error_stop;
4054   }
4055   return TRUE;
4056
4057   /* ERRORS */
4058 error_start:
4059   {
4060     GST_ERROR_OBJECT (basesrc, "Failed to start in pull mode");
4061     return FALSE;
4062   }
4063 error_stop:
4064   {
4065     GST_ERROR_OBJECT (basesrc, "Failed to stop in pull mode");
4066     return FALSE;
4067   }
4068 }
4069
4070 static gboolean
4071 gst_base_src_activate_mode (GstPad * pad, GstObject * parent,
4072     GstPadMode mode, gboolean active)
4073 {
4074   gboolean res;
4075   GstBaseSrc *src = GST_BASE_SRC (parent);
4076
4077   src->priv->stream_start_pending = FALSE;
4078
4079   GST_DEBUG_OBJECT (pad, "activating in mode %d", mode);
4080
4081   switch (mode) {
4082     case GST_PAD_MODE_PULL:
4083       res = gst_base_src_activate_pull (pad, parent, active);
4084       break;
4085     case GST_PAD_MODE_PUSH:
4086       src->priv->stream_start_pending = active;
4087       res = gst_base_src_activate_push (pad, parent, active);
4088       break;
4089     default:
4090       GST_LOG_OBJECT (pad, "unknown activation mode %d", mode);
4091       res = FALSE;
4092       break;
4093   }
4094   return res;
4095 }
4096
4097
4098 static GstStateChangeReturn
4099 gst_base_src_change_state (GstElement * element, GstStateChange transition)
4100 {
4101   GstBaseSrc *basesrc;
4102   GstStateChangeReturn result;
4103   gboolean no_preroll = FALSE;
4104
4105   basesrc = GST_BASE_SRC (element);
4106
4107   switch (transition) {
4108     case GST_STATE_CHANGE_NULL_TO_READY:
4109       break;
4110     case GST_STATE_CHANGE_READY_TO_PAUSED:
4111       no_preroll = gst_base_src_is_live (basesrc);
4112       break;
4113     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
4114       GST_DEBUG_OBJECT (basesrc, "PAUSED->PLAYING");
4115       if (gst_base_src_is_live (basesrc)) {
4116         /* now we can start playback */
4117         gst_base_src_set_playing (basesrc, TRUE);
4118       }
4119       break;
4120     default:
4121       break;
4122   }
4123
4124   if ((result =
4125           GST_ELEMENT_CLASS (parent_class)->change_state (element,
4126               transition)) == GST_STATE_CHANGE_FAILURE)
4127     goto failure;
4128
4129   switch (transition) {
4130     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
4131       GST_DEBUG_OBJECT (basesrc, "PLAYING->PAUSED");
4132       if (gst_base_src_is_live (basesrc)) {
4133         /* make sure we block in the live cond in PAUSED */
4134         gst_base_src_set_playing (basesrc, FALSE);
4135         no_preroll = TRUE;
4136       }
4137       break;
4138     case GST_STATE_CHANGE_PAUSED_TO_READY:
4139     {
4140       /* we don't need to unblock anything here, the pad deactivation code
4141        * already did this */
4142       if (g_atomic_int_get (&basesrc->priv->has_pending_eos)) {
4143         GST_OBJECT_LOCK (basesrc);
4144         CLEAR_PENDING_EOS (basesrc);
4145         GST_OBJECT_UNLOCK (basesrc);
4146       }
4147       gst_event_replace (&basesrc->pending_seek, NULL);
4148       break;
4149     }
4150     case GST_STATE_CHANGE_READY_TO_NULL:
4151       break;
4152     default:
4153       break;
4154   }
4155
4156   if (no_preroll && result == GST_STATE_CHANGE_SUCCESS)
4157     result = GST_STATE_CHANGE_NO_PREROLL;
4158
4159   return result;
4160
4161   /* ERRORS */
4162 failure:
4163   {
4164     GST_DEBUG_OBJECT (basesrc, "parent failed state change");
4165     return result;
4166   }
4167 }
4168
4169 /**
4170  * gst_base_src_get_buffer_pool:
4171  * @src: a #GstBaseSrc
4172  *
4173  * Returns: (nullable) (transfer full): the instance of the #GstBufferPool used
4174  * by the src; unref it after usage.
4175  */
4176 GstBufferPool *
4177 gst_base_src_get_buffer_pool (GstBaseSrc * src)
4178 {
4179   GstBufferPool *ret = NULL;
4180
4181   g_return_val_if_fail (GST_IS_BASE_SRC (src), NULL);
4182
4183   GST_OBJECT_LOCK (src);
4184   if (src->priv->pool)
4185     ret = gst_object_ref (src->priv->pool);
4186   GST_OBJECT_UNLOCK (src);
4187
4188   return ret;
4189 }
4190
4191 /**
4192  * gst_base_src_get_allocator:
4193  * @src: a #GstBaseSrc
4194  * @allocator: (out) (optional) (nullable) (transfer full): the #GstAllocator
4195  * used
4196  * @params: (out caller-allocates) (optional): the #GstAllocationParams of @allocator
4197  *
4198  * Lets #GstBaseSrc sub-classes to know the memory @allocator
4199  * used by the base class and its @params.
4200  *
4201  * Unref the @allocator after usage.
4202  */
4203 void
4204 gst_base_src_get_allocator (GstBaseSrc * src,
4205     GstAllocator ** allocator, GstAllocationParams * params)
4206 {
4207   g_return_if_fail (GST_IS_BASE_SRC (src));
4208
4209   GST_OBJECT_LOCK (src);
4210   if (allocator)
4211     *allocator = src->priv->allocator ?
4212         gst_object_ref (src->priv->allocator) : NULL;
4213
4214   if (params)
4215     *params = src->priv->params;
4216   GST_OBJECT_UNLOCK (src);
4217 }
4218
4219 /**
4220  * gst_base_src_submit_buffer_list:
4221  * @src: a #GstBaseSrc
4222  * @buffer_list: (transfer full): a #GstBufferList
4223  *
4224  * Subclasses can call this from their create virtual method implementation
4225  * to submit a buffer list to be pushed out later. This is useful in
4226  * cases where the create function wants to produce multiple buffers to be
4227  * pushed out in one go in form of a #GstBufferList, which can reduce overhead
4228  * drastically, especially for packetised inputs (for data streams where
4229  * the packetisation/chunking is not important it is usually more efficient
4230  * to return larger buffers instead).
4231  *
4232  * Subclasses that use this function from their create function must return
4233  * %GST_FLOW_OK and no buffer from their create virtual method implementation.
4234  * If a buffer is returned after a buffer list has also been submitted via this
4235  * function the behaviour is undefined.
4236  *
4237  * Subclasses must only call this function once per create function call and
4238  * subclasses must only call this function when the source operates in push
4239  * mode.
4240  *
4241  * Since: 1.14
4242  */
4243 void
4244 gst_base_src_submit_buffer_list (GstBaseSrc * src, GstBufferList * buffer_list)
4245 {
4246   g_return_if_fail (GST_IS_BASE_SRC (src));
4247   g_return_if_fail (GST_IS_BUFFER_LIST (buffer_list));
4248   g_return_if_fail (BASE_SRC_HAS_PENDING_BUFFER_LIST (src) == FALSE);
4249
4250   /* we need it to be writable later in get_range() where we use get_writable */
4251   src->priv->pending_bufferlist = gst_buffer_list_make_writable (buffer_list);
4252
4253   GST_LOG_OBJECT (src, "%u buffers submitted in buffer list",
4254       gst_buffer_list_length (buffer_list));
4255 }