basesink: Only reinit the cached GstClockID if it is for the same clock
[platform/upstream/gstreamer.git] / libs / gst / base / gstbasesink.c
1 /* GStreamer
2  * Copyright (C) 2005-2007 Wim Taymans <wim.taymans@gmail.com>
3  *
4  * gstbasesink.c: Base class for sink elements
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 02111-1307, USA.
20  */
21
22 /**
23  * SECTION:gstbasesink
24  * @short_description: Base class for sink elements
25  * @see_also: #GstBaseTransform, #GstBaseSrc
26  *
27  * #GstBaseSink is the base class for sink elements in GStreamer, such as
28  * xvimagesink or filesink. It is a layer on top of #GstElement that provides a
29  * simplified interface to plugin writers. #GstBaseSink handles many details
30  * for you, for example: preroll, clock synchronization, state changes,
31  * activation in push or pull mode, and queries.
32  *
33  * In most cases, when writing sink elements, there is no need to implement
34  * class methods from #GstElement or to set functions on pads, because the
35  * #GstBaseSink infrastructure should be sufficient.
36  *
37  * #GstBaseSink provides support for exactly one sink pad, which should be
38  * named "sink". A sink implementation (subclass of #GstBaseSink) should
39  * install a pad template in its base_init function, like so:
40  * |[
41  * static void
42  * my_element_base_init (gpointer g_class)
43  * {
44  *   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (g_class);
45  *
46  *   // sinktemplate should be a #GstStaticPadTemplate with direction
47  *   // #GST_PAD_SINK and name "sink"
48  *   gst_element_class_add_pad_template (gstelement_class,
49  *       gst_static_pad_template_get (&amp;sinktemplate));
50  *   // see #GstElementDetails
51  *   gst_element_class_set_details (gstelement_class, &amp;details);
52  * }
53  * ]|
54  *
55  * #GstBaseSink will handle the prerolling correctly. This means that it will
56  * return #GST_STATE_CHANGE_ASYNC from a state change to PAUSED until the first
57  * buffer arrives in this element. The base class will call the
58  * #GstBaseSinkClass.preroll() vmethod with this preroll buffer and will then
59  * commit the state change to the next asynchronously pending state.
60  *
61  * When the element is set to PLAYING, #GstBaseSink will synchronise on the
62  * clock using the times returned from #GstBaseSinkClass.get_times(). If this
63  * function returns #GST_CLOCK_TIME_NONE for the start time, no synchronisation
64  * will be done. Synchronisation can be disabled entirely by setting the object
65  * #GstBaseSink:sync property to %FALSE.
66  *
67  * After synchronisation the virtual method #GstBaseSinkClass.render() will be
68  * called. Subclasses should minimally implement this method.
69  *
70  * Since 0.10.3 subclasses that synchronise on the clock in the
71  * #GstBaseSinkClass.render() method are supported as well. These classes
72  * typically receive a buffer in the render method and can then potentially
73  * block on the clock while rendering. A typical example is an audiosink.
74  * Since 0.10.11 these subclasses can use gst_base_sink_wait_preroll() to
75  * perform the blocking wait.
76  *
77  * Upon receiving the EOS event in the PLAYING state, #GstBaseSink will wait
78  * for the clock to reach the time indicated by the stop time of the last
79  * #GstBaseSinkClass.get_times() call before posting an EOS message. When the
80  * element receives EOS in PAUSED, preroll completes, the event is queued and an
81  * EOS message is posted when going to PLAYING.
82  *
83  * #GstBaseSink will internally use the #GST_EVENT_NEWSEGMENT events to schedule
84  * synchronisation and clipping of buffers. Buffers that fall completely outside
85  * of the current segment are dropped. Buffers that fall partially in the
86  * segment are rendered (and prerolled). Subclasses should do any subbuffer
87  * clipping themselves when needed.
88  *
89  * #GstBaseSink will by default report the current playback position in
90  * #GST_FORMAT_TIME based on the current clock time and segment information.
91  * If no clock has been set on the element, the query will be forwarded
92  * upstream.
93  *
94  * The #GstBaseSinkClass.set_caps() function will be called when the subclass
95  * should configure itself to process a specific media type.
96  *
97  * The #GstBaseSinkClass.start() and #GstBaseSinkClass.stop() virtual methods
98  * will be called when resources should be allocated. Any 
99  * #GstBaseSinkClass.preroll(), #GstBaseSinkClass.render() and
100  * #GstBaseSinkClass.set_caps() function will be called between the
101  * #GstBaseSinkClass.start() and #GstBaseSinkClass.stop() calls.
102  *
103  * The #GstBaseSinkClass.event() virtual method will be called when an event is
104  * received by #GstBaseSink. Normally this method should only be overriden by
105  * very specific elements (such as file sinks) which need to handle the
106  * newsegment event specially.
107  *
108  * #GstBaseSink provides an overridable #GstBaseSinkClass.buffer_alloc()
109  * function that can be used by sinks that want to do reverse negotiation or to
110  * provide custom buffers (hardware buffers for example) to upstream elements.
111  *
112  * The #GstBaseSinkClass.unlock() method is called when the elements should
113  * unblock any blocking operations they perform in the
114  * #GstBaseSinkClass.render() method. This is mostly useful when the
115  * #GstBaseSinkClass.render() method performs a blocking write on a file
116  * descriptor, for example.
117  *
118  * The #GstBaseSink:max-lateness property affects how the sink deals with
119  * buffers that arrive too late in the sink. A buffer arrives too late in the
120  * sink when the presentation time (as a combination of the last segment, buffer
121  * timestamp and element base_time) plus the duration is before the current
122  * time of the clock.
123  * If the frame is later than max-lateness, the sink will drop the buffer
124  * without calling the render method.
125  * This feature is disabled if sync is disabled, the
126  * #GstBaseSinkClass.get_times() method does not return a valid start time or
127  * max-lateness is set to -1 (the default).
128  * Subclasses can use gst_base_sink_set_max_lateness() to configure the
129  * max-lateness value.
130  *
131  * The #GstBaseSink:qos property will enable the quality-of-service features of
132  * the basesink which gather statistics about the real-time performance of the
133  * clock synchronisation. For each buffer received in the sink, statistics are
134  * gathered and a QOS event is sent upstream with these numbers. This
135  * information can then be used by upstream elements to reduce their processing
136  * rate, for example.
137  *
138  * Since 0.10.15 the #GstBaseSink:async property can be used to instruct the
139  * sink to never perform an ASYNC state change. This feature is mostly usable
140  * when dealing with non-synchronized streams or sparse streams.
141  *
142  * Last reviewed on 2007-08-29 (0.10.15)
143  */
144
145 #ifdef HAVE_CONFIG_H
146 #  include "config.h"
147 #endif
148
149 #include <gst/gst_private.h>
150
151 #include "gstbasesink.h"
152 #include <gst/gstmarshal.h>
153 #include <gst/gst-i18n-lib.h>
154
155 GST_DEBUG_CATEGORY_STATIC (gst_base_sink_debug);
156 #define GST_CAT_DEFAULT gst_base_sink_debug
157
158 #define GST_BASE_SINK_GET_PRIVATE(obj)  \
159    (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_BASE_SINK, GstBaseSinkPrivate))
160
161 #define GST_FLOW_STEP GST_FLOW_CUSTOM_ERROR
162
163 typedef struct
164 {
165   gboolean valid;               /* if this info is valid */
166   guint32 seqnum;               /* the seqnum of the STEP event */
167   GstFormat format;             /* the format of the amount */
168   guint64 amount;               /* the total amount of data to skip */
169   guint64 position;             /* the position in the stepped data */
170   guint64 duration;             /* the duration in time of the skipped data */
171   guint64 start;                /* running_time of the start */
172   gdouble rate;                 /* rate of skipping */
173   gdouble start_rate;           /* rate before skipping */
174   guint64 start_start;          /* start position skipping */
175   guint64 start_stop;           /* stop position skipping */
176   gboolean flush;               /* if this was a flushing step */
177   gboolean intermediate;        /* if this is an intermediate step */
178   gboolean need_preroll;        /* if we need preroll after this step */
179 } GstStepInfo;
180
181 /* FIXME, some stuff in ABI.data and other in Private...
182  * Make up your mind please.
183  */
184 struct _GstBaseSinkPrivate
185 {
186   gint qos_enabled;             /* ATOMIC */
187   gboolean async_enabled;
188   GstClockTimeDiff ts_offset;
189   GstClockTime render_delay;
190
191   /* start, stop of current buffer, stream time, used to report position */
192   GstClockTime current_sstart;
193   GstClockTime current_sstop;
194
195   /* start, stop and jitter of current buffer, running time */
196   GstClockTime current_rstart;
197   GstClockTime current_rstop;
198   GstClockTimeDiff current_jitter;
199   /* the running time of the previous buffer */
200   GstClockTime prev_rstart;
201
202   /* EOS sync time in running time */
203   GstClockTime eos_rtime;
204
205   /* last buffer that arrived in time, running time */
206   GstClockTime last_render_time;
207   /* when the last buffer left the sink, running time */
208   GstClockTime last_left;
209
210   /* running averages go here these are done on running time */
211   GstClockTime avg_pt;
212   GstClockTime avg_duration;
213   gdouble avg_rate;
214   GstClockTime avg_in_diff;
215
216   /* these are done on system time. avg_jitter and avg_render are
217    * compared to eachother to see if the rendering time takes a
218    * huge amount of the processing, If so we are flooded with
219    * buffers. */
220   GstClockTime last_left_systime;
221   GstClockTime avg_jitter;
222   GstClockTime start, stop;
223   GstClockTime avg_render;
224
225   /* number of rendered and dropped frames */
226   guint64 rendered;
227   guint64 dropped;
228
229   /* latency stuff */
230   GstClockTime latency;
231
232   /* if we already commited the state */
233   gboolean commited;
234
235   /* when we received EOS */
236   gboolean received_eos;
237
238   /* when we are prerolled and able to report latency */
239   gboolean have_latency;
240
241   /* the last buffer we prerolled or rendered. Useful for making snapshots */
242   gint enable_last_buffer;      /* atomic */
243   GstBuffer *last_buffer;
244
245   /* caps for pull based scheduling */
246   GstCaps *pull_caps;
247
248   /* blocksize for pulling */
249   guint blocksize;
250
251   gboolean discont;
252
253   /* seqnum of the stream */
254   guint32 seqnum;
255
256   gboolean call_preroll;
257   gboolean step_unlock;
258
259   /* we have a pending and a current step operation */
260   GstStepInfo current_step;
261   GstStepInfo pending_step;
262
263   /* Cached GstClockID */
264   GstClockID cached_clock_id;
265
266   /* for throttling and QoS */
267   GstClockTime earliest_in_time;
268   GstClockTime throttle_time;
269 };
270
271 #define DO_RUNNING_AVG(avg,val,size) (((val) + ((size)-1) * (avg)) / (size))
272
273 /* generic running average, this has a neutral window size */
274 #define UPDATE_RUNNING_AVG(avg,val)   DO_RUNNING_AVG(avg,val,8)
275
276 /* the windows for these running averages are experimentally obtained.
277  * possitive values get averaged more while negative values use a small
278  * window so we can react faster to badness. */
279 #define UPDATE_RUNNING_AVG_P(avg,val) DO_RUNNING_AVG(avg,val,16)
280 #define UPDATE_RUNNING_AVG_N(avg,val) DO_RUNNING_AVG(avg,val,4)
281
282 enum
283 {
284   _PR_IS_NOTHING = 1 << 0,
285   _PR_IS_BUFFER = 1 << 1,
286   _PR_IS_BUFFERLIST = 1 << 2,
287   _PR_IS_EVENT = 1 << 3
288 } PrivateObjectType;
289
290 #define OBJ_IS_BUFFER(a) ((a) & _PR_IS_BUFFER)
291 #define OBJ_IS_BUFFERLIST(a) ((a) & _PR_IS_BUFFERLIST)
292 #define OBJ_IS_EVENT(a) ((a) & _PR_IS_EVENT)
293 #define OBJ_IS_BUFFERFULL(a) ((a) & (_PR_IS_BUFFER | _PR_IS_BUFFERLIST))
294
295 /* BaseSink properties */
296
297 #define DEFAULT_CAN_ACTIVATE_PULL FALSE /* fixme: enable me */
298 #define DEFAULT_CAN_ACTIVATE_PUSH TRUE
299
300 #define DEFAULT_PREROLL_QUEUE_LEN   0
301 #define DEFAULT_SYNC                TRUE
302 #define DEFAULT_MAX_LATENESS        -1
303 #define DEFAULT_QOS                 FALSE
304 #define DEFAULT_ASYNC               TRUE
305 #define DEFAULT_TS_OFFSET           0
306 #define DEFAULT_BLOCKSIZE           4096
307 #define DEFAULT_RENDER_DELAY        0
308 #define DEFAULT_ENABLE_LAST_BUFFER  TRUE
309 #define DEFAULT_THROTTLE_TIME       0
310
311 enum
312 {
313   PROP_0,
314   PROP_PREROLL_QUEUE_LEN,
315   PROP_SYNC,
316   PROP_MAX_LATENESS,
317   PROP_QOS,
318   PROP_ASYNC,
319   PROP_TS_OFFSET,
320   PROP_ENABLE_LAST_BUFFER,
321   PROP_LAST_BUFFER,
322   PROP_BLOCKSIZE,
323   PROP_RENDER_DELAY,
324   PROP_THROTTLE_TIME,
325   PROP_LAST
326 };
327
328 static GstElementClass *parent_class = NULL;
329
330 static void gst_base_sink_class_init (GstBaseSinkClass * klass);
331 static void gst_base_sink_init (GstBaseSink * trans, gpointer g_class);
332 static void gst_base_sink_finalize (GObject * object);
333
334 GType
335 gst_base_sink_get_type (void)
336 {
337   static volatile gsize base_sink_type = 0;
338
339   if (g_once_init_enter (&base_sink_type)) {
340     GType _type;
341     static const GTypeInfo base_sink_info = {
342       sizeof (GstBaseSinkClass),
343       NULL,
344       NULL,
345       (GClassInitFunc) gst_base_sink_class_init,
346       NULL,
347       NULL,
348       sizeof (GstBaseSink),
349       0,
350       (GInstanceInitFunc) gst_base_sink_init,
351     };
352
353     _type = g_type_register_static (GST_TYPE_ELEMENT,
354         "GstBaseSink", &base_sink_info, G_TYPE_FLAG_ABSTRACT);
355     g_once_init_leave (&base_sink_type, _type);
356   }
357   return base_sink_type;
358 }
359
360 static void gst_base_sink_set_property (GObject * object, guint prop_id,
361     const GValue * value, GParamSpec * pspec);
362 static void gst_base_sink_get_property (GObject * object, guint prop_id,
363     GValue * value, GParamSpec * pspec);
364
365 static gboolean gst_base_sink_send_event (GstElement * element,
366     GstEvent * event);
367 static gboolean gst_base_sink_query (GstElement * element, GstQuery * query);
368 static const GstQueryType *gst_base_sink_get_query_types (GstElement * element);
369
370 static GstCaps *gst_base_sink_get_caps (GstBaseSink * sink);
371 static gboolean gst_base_sink_set_caps (GstBaseSink * sink, GstCaps * caps);
372 static GstFlowReturn gst_base_sink_buffer_alloc (GstBaseSink * sink,
373     guint64 offset, guint size, GstCaps * caps, GstBuffer ** buf);
374 static void gst_base_sink_get_times (GstBaseSink * basesink, GstBuffer * buffer,
375     GstClockTime * start, GstClockTime * end);
376 static gboolean gst_base_sink_set_flushing (GstBaseSink * basesink,
377     GstPad * pad, gboolean flushing);
378 static gboolean gst_base_sink_default_activate_pull (GstBaseSink * basesink,
379     gboolean active);
380 static gboolean gst_base_sink_default_do_seek (GstBaseSink * sink,
381     GstSegment * segment);
382 static gboolean gst_base_sink_default_prepare_seek_segment (GstBaseSink * sink,
383     GstEvent * event, GstSegment * segment);
384
385 static GstStateChangeReturn gst_base_sink_change_state (GstElement * element,
386     GstStateChange transition);
387
388 static GstFlowReturn gst_base_sink_chain (GstPad * pad, GstBuffer * buffer);
389 static GstFlowReturn gst_base_sink_chain_list (GstPad * pad,
390     GstBufferList * list);
391
392 static void gst_base_sink_loop (GstPad * pad);
393 static gboolean gst_base_sink_pad_activate (GstPad * pad);
394 static gboolean gst_base_sink_pad_activate_push (GstPad * pad, gboolean active);
395 static gboolean gst_base_sink_pad_activate_pull (GstPad * pad, gboolean active);
396 static gboolean gst_base_sink_event (GstPad * pad, GstEvent * event);
397
398 static gboolean gst_base_sink_negotiate_pull (GstBaseSink * basesink);
399 static GstCaps *gst_base_sink_pad_getcaps (GstPad * pad);
400 static gboolean gst_base_sink_pad_setcaps (GstPad * pad, GstCaps * caps);
401 static void gst_base_sink_pad_fixate (GstPad * pad, GstCaps * caps);
402 static GstFlowReturn gst_base_sink_pad_buffer_alloc (GstPad * pad,
403     guint64 offset, guint size, GstCaps * caps, GstBuffer ** buf);
404
405
406 /* check if an object was too late */
407 static gboolean gst_base_sink_is_too_late (GstBaseSink * basesink,
408     GstMiniObject * obj, GstClockTime rstart, GstClockTime rstop,
409     GstClockReturn status, GstClockTimeDiff jitter);
410 static GstFlowReturn gst_base_sink_preroll_object (GstBaseSink * basesink,
411     guint8 obj_type, GstMiniObject * obj);
412
413 static void
414 gst_base_sink_class_init (GstBaseSinkClass * klass)
415 {
416   GObjectClass *gobject_class;
417   GstElementClass *gstelement_class;
418
419   gobject_class = G_OBJECT_CLASS (klass);
420   gstelement_class = GST_ELEMENT_CLASS (klass);
421
422   GST_DEBUG_CATEGORY_INIT (gst_base_sink_debug, "basesink", 0,
423       "basesink element");
424
425   g_type_class_add_private (klass, sizeof (GstBaseSinkPrivate));
426
427   parent_class = g_type_class_peek_parent (klass);
428
429   gobject_class->finalize = gst_base_sink_finalize;
430   gobject_class->set_property = gst_base_sink_set_property;
431   gobject_class->get_property = gst_base_sink_get_property;
432
433   /* FIXME, this next value should be configured using an event from the
434    * upstream element, ie, the BUFFER_SIZE event. */
435   g_object_class_install_property (gobject_class, PROP_PREROLL_QUEUE_LEN,
436       g_param_spec_uint ("preroll-queue-len", "Preroll queue length",
437           "Number of buffers to queue during preroll", 0, G_MAXUINT,
438           DEFAULT_PREROLL_QUEUE_LEN,
439           G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));
440
441   g_object_class_install_property (gobject_class, PROP_SYNC,
442       g_param_spec_boolean ("sync", "Sync", "Sync on the clock", DEFAULT_SYNC,
443           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
444
445   g_object_class_install_property (gobject_class, PROP_MAX_LATENESS,
446       g_param_spec_int64 ("max-lateness", "Max Lateness",
447           "Maximum number of nanoseconds that a buffer can be late before it "
448           "is dropped (-1 unlimited)", -1, G_MAXINT64, DEFAULT_MAX_LATENESS,
449           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
450
451   g_object_class_install_property (gobject_class, PROP_QOS,
452       g_param_spec_boolean ("qos", "Qos",
453           "Generate Quality-of-Service events upstream", DEFAULT_QOS,
454           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
455   /**
456    * GstBaseSink:async
457    *
458    * If set to #TRUE, the basesink will perform asynchronous state changes.
459    * When set to #FALSE, the sink will not signal the parent when it prerolls.
460    * Use this option when dealing with sparse streams or when synchronisation is
461    * not required.
462    *
463    * Since: 0.10.15
464    */
465   g_object_class_install_property (gobject_class, PROP_ASYNC,
466       g_param_spec_boolean ("async", "Async",
467           "Go asynchronously to PAUSED", DEFAULT_ASYNC,
468           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
469   /**
470    * GstBaseSink:ts-offset
471    *
472    * Controls the final synchronisation, a negative value will render the buffer
473    * earlier while a positive value delays playback. This property can be
474    * used to fix synchronisation in bad files.
475    *
476    * Since: 0.10.15
477    */
478   g_object_class_install_property (gobject_class, PROP_TS_OFFSET,
479       g_param_spec_int64 ("ts-offset", "TS Offset",
480           "Timestamp offset in nanoseconds", G_MININT64, G_MAXINT64,
481           DEFAULT_TS_OFFSET, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
482
483   /**
484    * GstBaseSink:enable-last-buffer
485    *
486    * Enable the last-buffer property. If FALSE, basesink doesn't keep a
487    * reference to the last buffer arrived and the last-buffer property is always
488    * set to NULL. This can be useful if you need buffers to be released as soon
489    * as possible, eg. if you're using a buffer pool.
490    *
491    * Since: 0.10.30
492    */
493   g_object_class_install_property (gobject_class, PROP_ENABLE_LAST_BUFFER,
494       g_param_spec_boolean ("enable-last-buffer", "Enable Last Buffer",
495           "Enable the last-buffer property", DEFAULT_ENABLE_LAST_BUFFER,
496           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
497
498   /**
499    * GstBaseSink:last-buffer
500    *
501    * The last buffer that arrived in the sink and was used for preroll or for
502    * rendering. This property can be used to generate thumbnails. This property
503    * can be NULL when the sink has not yet received a bufer.
504    *
505    * Since: 0.10.15
506    */
507   g_object_class_install_property (gobject_class, PROP_LAST_BUFFER,
508       gst_param_spec_mini_object ("last-buffer", "Last Buffer",
509           "The last buffer received in the sink", GST_TYPE_BUFFER,
510           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
511   /**
512    * GstBaseSink:blocksize
513    *
514    * The amount of bytes to pull when operating in pull mode.
515    *
516    * Since: 0.10.22
517    */
518   /* FIXME 0.11: blocksize property should be int, otherwise min>max.. */
519   g_object_class_install_property (gobject_class, PROP_BLOCKSIZE,
520       g_param_spec_uint ("blocksize", "Block size",
521           "Size in bytes to pull per buffer (0 = default)", 0, G_MAXUINT,
522           DEFAULT_BLOCKSIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
523   /**
524    * GstBaseSink:render-delay
525    *
526    * The additional delay between synchronisation and actual rendering of the
527    * media. This property will add additional latency to the device in order to
528    * make other sinks compensate for the delay.
529    *
530    * Since: 0.10.22
531    */
532   g_object_class_install_property (gobject_class, PROP_RENDER_DELAY,
533       g_param_spec_uint64 ("render-delay", "Render Delay",
534           "Additional render delay of the sink in nanoseconds", 0, G_MAXUINT64,
535           DEFAULT_RENDER_DELAY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
536   /**
537    * GstBaseSink:throttle-time
538    *
539    * The time to insert between buffers. This property can be used to control
540    * the maximum amount of buffers per second to render. Setting this property
541    * to a value bigger than 0 will make the sink create THROTTLE QoS events.
542    *
543    * Since: 0.10.33
544    */
545   g_object_class_install_property (gobject_class, PROP_THROTTLE_TIME,
546       g_param_spec_uint64 ("throttle-time", "Throttle time",
547           "The time to keep between rendered buffers (unused)", 0, G_MAXUINT64,
548           DEFAULT_THROTTLE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
549
550   gstelement_class->change_state =
551       GST_DEBUG_FUNCPTR (gst_base_sink_change_state);
552   gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_base_sink_send_event);
553   gstelement_class->query = GST_DEBUG_FUNCPTR (gst_base_sink_query);
554   gstelement_class->get_query_types =
555       GST_DEBUG_FUNCPTR (gst_base_sink_get_query_types);
556
557   klass->get_caps = GST_DEBUG_FUNCPTR (gst_base_sink_get_caps);
558   klass->set_caps = GST_DEBUG_FUNCPTR (gst_base_sink_set_caps);
559   klass->buffer_alloc = GST_DEBUG_FUNCPTR (gst_base_sink_buffer_alloc);
560   klass->get_times = GST_DEBUG_FUNCPTR (gst_base_sink_get_times);
561   klass->activate_pull =
562       GST_DEBUG_FUNCPTR (gst_base_sink_default_activate_pull);
563
564   /* Registering debug symbols for function pointers */
565   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_getcaps);
566   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_setcaps);
567   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_fixate);
568   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_buffer_alloc);
569   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_activate);
570   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_activate_push);
571   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_activate_pull);
572   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_event);
573   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_chain);
574   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_chain_list);
575 }
576
577 static GstCaps *
578 gst_base_sink_pad_getcaps (GstPad * pad)
579 {
580   GstBaseSinkClass *bclass;
581   GstBaseSink *bsink;
582   GstCaps *caps = NULL;
583
584   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
585   bclass = GST_BASE_SINK_GET_CLASS (bsink);
586
587   if (bsink->pad_mode == GST_ACTIVATE_PULL) {
588     /* if we are operating in pull mode we only accept the negotiated caps */
589     GST_OBJECT_LOCK (pad);
590     if ((caps = GST_PAD_CAPS (pad)))
591       gst_caps_ref (caps);
592     GST_OBJECT_UNLOCK (pad);
593   }
594   if (caps == NULL) {
595     if (bclass->get_caps)
596       caps = bclass->get_caps (bsink);
597
598     if (caps == NULL) {
599       GstPadTemplate *pad_template;
600
601       pad_template =
602           gst_element_class_get_pad_template (GST_ELEMENT_CLASS (bclass),
603           "sink");
604       if (pad_template != NULL) {
605         caps = gst_caps_ref (gst_pad_template_get_caps (pad_template));
606       }
607     }
608   }
609   gst_object_unref (bsink);
610
611   return caps;
612 }
613
614 static gboolean
615 gst_base_sink_pad_setcaps (GstPad * pad, GstCaps * caps)
616 {
617   GstBaseSinkClass *bclass;
618   GstBaseSink *bsink;
619   gboolean res = TRUE;
620
621   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
622   bclass = GST_BASE_SINK_GET_CLASS (bsink);
623
624   if (res && bclass->set_caps)
625     res = bclass->set_caps (bsink, caps);
626
627   gst_object_unref (bsink);
628
629   return res;
630 }
631
632 static void
633 gst_base_sink_pad_fixate (GstPad * pad, GstCaps * caps)
634 {
635   GstBaseSinkClass *bclass;
636   GstBaseSink *bsink;
637
638   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
639   bclass = GST_BASE_SINK_GET_CLASS (bsink);
640
641   if (bclass->fixate)
642     bclass->fixate (bsink, caps);
643
644   gst_object_unref (bsink);
645 }
646
647 static GstFlowReturn
648 gst_base_sink_pad_buffer_alloc (GstPad * pad, guint64 offset, guint size,
649     GstCaps * caps, GstBuffer ** buf)
650 {
651   GstBaseSinkClass *bclass;
652   GstBaseSink *bsink;
653   GstFlowReturn result = GST_FLOW_OK;
654
655   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
656   if (G_UNLIKELY (bsink == NULL))
657     return GST_FLOW_WRONG_STATE;
658   bclass = GST_BASE_SINK_GET_CLASS (bsink);
659
660   if (bclass->buffer_alloc)
661     result = bclass->buffer_alloc (bsink, offset, size, caps, buf);
662   else
663     *buf = NULL;                /* fallback in gstpad.c will allocate generic buffer */
664
665   gst_object_unref (bsink);
666
667   return result;
668 }
669
670 static void
671 gst_base_sink_init (GstBaseSink * basesink, gpointer g_class)
672 {
673   GstPadTemplate *pad_template;
674   GstBaseSinkPrivate *priv;
675
676   basesink->priv = priv = GST_BASE_SINK_GET_PRIVATE (basesink);
677
678   pad_template =
679       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (g_class), "sink");
680   g_return_if_fail (pad_template != NULL);
681
682   basesink->sinkpad = gst_pad_new_from_template (pad_template, "sink");
683
684   gst_pad_set_getcaps_function (basesink->sinkpad, gst_base_sink_pad_getcaps);
685   gst_pad_set_setcaps_function (basesink->sinkpad, gst_base_sink_pad_setcaps);
686   gst_pad_set_fixatecaps_function (basesink->sinkpad, gst_base_sink_pad_fixate);
687   gst_pad_set_bufferalloc_function (basesink->sinkpad,
688       gst_base_sink_pad_buffer_alloc);
689   gst_pad_set_activate_function (basesink->sinkpad, gst_base_sink_pad_activate);
690   gst_pad_set_activatepush_function (basesink->sinkpad,
691       gst_base_sink_pad_activate_push);
692   gst_pad_set_activatepull_function (basesink->sinkpad,
693       gst_base_sink_pad_activate_pull);
694   gst_pad_set_event_function (basesink->sinkpad, gst_base_sink_event);
695   gst_pad_set_chain_function (basesink->sinkpad, gst_base_sink_chain);
696   gst_pad_set_chain_list_function (basesink->sinkpad, gst_base_sink_chain_list);
697   gst_element_add_pad (GST_ELEMENT_CAST (basesink), basesink->sinkpad);
698
699   basesink->pad_mode = GST_ACTIVATE_NONE;
700   basesink->preroll_queue = g_queue_new ();
701   basesink->abidata.ABI.clip_segment = gst_segment_new ();
702   priv->have_latency = FALSE;
703
704   basesink->can_activate_push = DEFAULT_CAN_ACTIVATE_PUSH;
705   basesink->can_activate_pull = DEFAULT_CAN_ACTIVATE_PULL;
706
707   basesink->sync = DEFAULT_SYNC;
708   basesink->abidata.ABI.max_lateness = DEFAULT_MAX_LATENESS;
709   g_atomic_int_set (&priv->qos_enabled, DEFAULT_QOS);
710   priv->async_enabled = DEFAULT_ASYNC;
711   priv->ts_offset = DEFAULT_TS_OFFSET;
712   priv->render_delay = DEFAULT_RENDER_DELAY;
713   priv->blocksize = DEFAULT_BLOCKSIZE;
714   priv->cached_clock_id = NULL;
715   g_atomic_int_set (&priv->enable_last_buffer, DEFAULT_ENABLE_LAST_BUFFER);
716   priv->throttle_time = DEFAULT_THROTTLE_TIME;
717
718   GST_OBJECT_FLAG_SET (basesink, GST_ELEMENT_IS_SINK);
719 }
720
721 static void
722 gst_base_sink_finalize (GObject * object)
723 {
724   GstBaseSink *basesink;
725
726   basesink = GST_BASE_SINK (object);
727
728   g_queue_free (basesink->preroll_queue);
729   gst_segment_free (basesink->abidata.ABI.clip_segment);
730
731   G_OBJECT_CLASS (parent_class)->finalize (object);
732 }
733
734 /**
735  * gst_base_sink_set_sync:
736  * @sink: the sink
737  * @sync: the new sync value.
738  *
739  * Configures @sink to synchronize on the clock or not. When
740  * @sync is FALSE, incomming samples will be played as fast as
741  * possible. If @sync is TRUE, the timestamps of the incomming
742  * buffers will be used to schedule the exact render time of its
743  * contents.
744  *
745  * Since: 0.10.4
746  */
747 void
748 gst_base_sink_set_sync (GstBaseSink * sink, gboolean sync)
749 {
750   g_return_if_fail (GST_IS_BASE_SINK (sink));
751
752   GST_OBJECT_LOCK (sink);
753   sink->sync = sync;
754   GST_OBJECT_UNLOCK (sink);
755 }
756
757 /**
758  * gst_base_sink_get_sync:
759  * @sink: the sink
760  *
761  * Checks if @sink is currently configured to synchronize against the
762  * clock.
763  *
764  * Returns: TRUE if the sink is configured to synchronize against the clock.
765  *
766  * Since: 0.10.4
767  */
768 gboolean
769 gst_base_sink_get_sync (GstBaseSink * sink)
770 {
771   gboolean res;
772
773   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
774
775   GST_OBJECT_LOCK (sink);
776   res = sink->sync;
777   GST_OBJECT_UNLOCK (sink);
778
779   return res;
780 }
781
782 /**
783  * gst_base_sink_set_max_lateness:
784  * @sink: the sink
785  * @max_lateness: the new max lateness value.
786  *
787  * Sets the new max lateness value to @max_lateness. This value is
788  * used to decide if a buffer should be dropped or not based on the
789  * buffer timestamp and the current clock time. A value of -1 means
790  * an unlimited time.
791  *
792  * Since: 0.10.4
793  */
794 void
795 gst_base_sink_set_max_lateness (GstBaseSink * sink, gint64 max_lateness)
796 {
797   g_return_if_fail (GST_IS_BASE_SINK (sink));
798
799   GST_OBJECT_LOCK (sink);
800   sink->abidata.ABI.max_lateness = max_lateness;
801   GST_OBJECT_UNLOCK (sink);
802 }
803
804 /**
805  * gst_base_sink_get_max_lateness:
806  * @sink: the sink
807  *
808  * Gets the max lateness value. See gst_base_sink_set_max_lateness for
809  * more details.
810  *
811  * Returns: The maximum time in nanoseconds that a buffer can be late
812  * before it is dropped and not rendered. A value of -1 means an
813  * unlimited time.
814  *
815  * Since: 0.10.4
816  */
817 gint64
818 gst_base_sink_get_max_lateness (GstBaseSink * sink)
819 {
820   gint64 res;
821
822   g_return_val_if_fail (GST_IS_BASE_SINK (sink), -1);
823
824   GST_OBJECT_LOCK (sink);
825   res = sink->abidata.ABI.max_lateness;
826   GST_OBJECT_UNLOCK (sink);
827
828   return res;
829 }
830
831 /**
832  * gst_base_sink_set_qos_enabled:
833  * @sink: the sink
834  * @enabled: the new qos value.
835  *
836  * Configures @sink to send Quality-of-Service events upstream.
837  *
838  * Since: 0.10.5
839  */
840 void
841 gst_base_sink_set_qos_enabled (GstBaseSink * sink, gboolean enabled)
842 {
843   g_return_if_fail (GST_IS_BASE_SINK (sink));
844
845   g_atomic_int_set (&sink->priv->qos_enabled, enabled);
846 }
847
848 /**
849  * gst_base_sink_is_qos_enabled:
850  * @sink: the sink
851  *
852  * Checks if @sink is currently configured to send Quality-of-Service events
853  * upstream.
854  *
855  * Returns: TRUE if the sink is configured to perform Quality-of-Service.
856  *
857  * Since: 0.10.5
858  */
859 gboolean
860 gst_base_sink_is_qos_enabled (GstBaseSink * sink)
861 {
862   gboolean res;
863
864   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
865
866   res = g_atomic_int_get (&sink->priv->qos_enabled);
867
868   return res;
869 }
870
871 /**
872  * gst_base_sink_set_async_enabled:
873  * @sink: the sink
874  * @enabled: the new async value.
875  *
876  * Configures @sink to perform all state changes asynchronusly. When async is
877  * disabled, the sink will immediatly go to PAUSED instead of waiting for a
878  * preroll buffer. This feature is usefull if the sink does not synchronize
879  * against the clock or when it is dealing with sparse streams.
880  *
881  * Since: 0.10.15
882  */
883 void
884 gst_base_sink_set_async_enabled (GstBaseSink * sink, gboolean enabled)
885 {
886   g_return_if_fail (GST_IS_BASE_SINK (sink));
887
888   GST_PAD_PREROLL_LOCK (sink->sinkpad);
889   g_atomic_int_set (&sink->priv->async_enabled, enabled);
890   GST_LOG_OBJECT (sink, "set async enabled to %d", enabled);
891   GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
892 }
893
894 /**
895  * gst_base_sink_is_async_enabled:
896  * @sink: the sink
897  *
898  * Checks if @sink is currently configured to perform asynchronous state
899  * changes to PAUSED.
900  *
901  * Returns: TRUE if the sink is configured to perform asynchronous state
902  * changes.
903  *
904  * Since: 0.10.15
905  */
906 gboolean
907 gst_base_sink_is_async_enabled (GstBaseSink * sink)
908 {
909   gboolean res;
910
911   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
912
913   res = g_atomic_int_get (&sink->priv->async_enabled);
914
915   return res;
916 }
917
918 /**
919  * gst_base_sink_set_ts_offset:
920  * @sink: the sink
921  * @offset: the new offset
922  *
923  * Adjust the synchronisation of @sink with @offset. A negative value will
924  * render buffers earlier than their timestamp. A positive value will delay
925  * rendering. This function can be used to fix playback of badly timestamped
926  * buffers.
927  *
928  * Since: 0.10.15
929  */
930 void
931 gst_base_sink_set_ts_offset (GstBaseSink * sink, GstClockTimeDiff offset)
932 {
933   g_return_if_fail (GST_IS_BASE_SINK (sink));
934
935   GST_OBJECT_LOCK (sink);
936   sink->priv->ts_offset = offset;
937   GST_LOG_OBJECT (sink, "set time offset to %" G_GINT64_FORMAT, offset);
938   GST_OBJECT_UNLOCK (sink);
939 }
940
941 /**
942  * gst_base_sink_get_ts_offset:
943  * @sink: the sink
944  *
945  * Get the synchronisation offset of @sink.
946  *
947  * Returns: The synchronisation offset.
948  *
949  * Since: 0.10.15
950  */
951 GstClockTimeDiff
952 gst_base_sink_get_ts_offset (GstBaseSink * sink)
953 {
954   GstClockTimeDiff res;
955
956   g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
957
958   GST_OBJECT_LOCK (sink);
959   res = sink->priv->ts_offset;
960   GST_OBJECT_UNLOCK (sink);
961
962   return res;
963 }
964
965 /**
966  * gst_base_sink_get_last_buffer:
967  * @sink: the sink
968  *
969  * Get the last buffer that arrived in the sink and was used for preroll or for
970  * rendering. This property can be used to generate thumbnails.
971  *
972  * The #GstCaps on the buffer can be used to determine the type of the buffer.
973  *
974  * Free-function: gst_buffer_unref
975  *
976  * Returns: (transfer full): a #GstBuffer. gst_buffer_unref() after usage.
977  *     This function returns NULL when no buffer has arrived in the sink yet
978  *     or when the sink is not in PAUSED or PLAYING.
979  *
980  * Since: 0.10.15
981  */
982 GstBuffer *
983 gst_base_sink_get_last_buffer (GstBaseSink * sink)
984 {
985   GstBuffer *res;
986
987   g_return_val_if_fail (GST_IS_BASE_SINK (sink), NULL);
988
989   GST_OBJECT_LOCK (sink);
990   if ((res = sink->priv->last_buffer))
991     gst_buffer_ref (res);
992   GST_OBJECT_UNLOCK (sink);
993
994   return res;
995 }
996
997 /* with OBJECT_LOCK */
998 static void
999 gst_base_sink_set_last_buffer_unlocked (GstBaseSink * sink, GstBuffer * buffer)
1000 {
1001   GstBuffer *old;
1002
1003   old = sink->priv->last_buffer;
1004   if (G_LIKELY (old != buffer)) {
1005     GST_DEBUG_OBJECT (sink, "setting last buffer to %p", buffer);
1006     if (G_LIKELY (buffer))
1007       gst_buffer_ref (buffer);
1008     sink->priv->last_buffer = buffer;
1009   } else {
1010     old = NULL;
1011   }
1012   /* avoid unreffing with the lock because cleanup code might want to take the
1013    * lock too */
1014   if (G_LIKELY (old)) {
1015     GST_OBJECT_UNLOCK (sink);
1016     gst_buffer_unref (old);
1017     GST_OBJECT_LOCK (sink);
1018   }
1019 }
1020
1021 static void
1022 gst_base_sink_set_last_buffer (GstBaseSink * sink, GstBuffer * buffer)
1023 {
1024   if (!g_atomic_int_get (&sink->priv->enable_last_buffer))
1025     return;
1026
1027   GST_OBJECT_LOCK (sink);
1028   gst_base_sink_set_last_buffer_unlocked (sink, buffer);
1029   GST_OBJECT_UNLOCK (sink);
1030 }
1031
1032 /**
1033  * gst_base_sink_set_last_buffer_enabled:
1034  * @sink: the sink
1035  * @enabled: the new enable-last-buffer value.
1036  *
1037  * Configures @sink to store the last received buffer in the last-buffer
1038  * property.
1039  *
1040  * Since: 0.10.30
1041  */
1042 void
1043 gst_base_sink_set_last_buffer_enabled (GstBaseSink * sink, gboolean enabled)
1044 {
1045   g_return_if_fail (GST_IS_BASE_SINK (sink));
1046
1047   /* Only take lock if we change the value */
1048   if (g_atomic_int_compare_and_exchange (&sink->priv->enable_last_buffer,
1049           !enabled, enabled) && !enabled) {
1050     GST_OBJECT_LOCK (sink);
1051     gst_base_sink_set_last_buffer_unlocked (sink, NULL);
1052     GST_OBJECT_UNLOCK (sink);
1053   }
1054 }
1055
1056 /**
1057  * gst_base_sink_is_last_buffer_enabled:
1058  * @sink: the sink
1059  *
1060  * Checks if @sink is currently configured to store the last received buffer in
1061  * the last-buffer property.
1062  *
1063  * Returns: TRUE if the sink is configured to store the last received buffer.
1064  *
1065  * Since: 0.10.30
1066  */
1067 gboolean
1068 gst_base_sink_is_last_buffer_enabled (GstBaseSink * sink)
1069 {
1070   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
1071
1072   return g_atomic_int_get (&sink->priv->enable_last_buffer);
1073 }
1074
1075 /**
1076  * gst_base_sink_get_latency:
1077  * @sink: the sink
1078  *
1079  * Get the currently configured latency.
1080  *
1081  * Returns: The configured latency.
1082  *
1083  * Since: 0.10.12
1084  */
1085 GstClockTime
1086 gst_base_sink_get_latency (GstBaseSink * sink)
1087 {
1088   GstClockTime res;
1089
1090   GST_OBJECT_LOCK (sink);
1091   res = sink->priv->latency;
1092   GST_OBJECT_UNLOCK (sink);
1093
1094   return res;
1095 }
1096
1097 /**
1098  * gst_base_sink_query_latency:
1099  * @sink: the sink
1100  * @live: (out) (allow-none): if the sink is live
1101  * @upstream_live: (out) (allow-none): if an upstream element is live
1102  * @min_latency: (out) (allow-none): the min latency of the upstream elements
1103  * @max_latency: (out) (allow-none): the max latency of the upstream elements
1104  *
1105  * Query the sink for the latency parameters. The latency will be queried from
1106  * the upstream elements. @live will be TRUE if @sink is configured to
1107  * synchronize against the clock. @upstream_live will be TRUE if an upstream
1108  * element is live.
1109  *
1110  * If both @live and @upstream_live are TRUE, the sink will want to compensate
1111  * for the latency introduced by the upstream elements by setting the
1112  * @min_latency to a strictly possitive value.
1113  *
1114  * This function is mostly used by subclasses.
1115  *
1116  * Returns: TRUE if the query succeeded.
1117  *
1118  * Since: 0.10.12
1119  */
1120 gboolean
1121 gst_base_sink_query_latency (GstBaseSink * sink, gboolean * live,
1122     gboolean * upstream_live, GstClockTime * min_latency,
1123     GstClockTime * max_latency)
1124 {
1125   gboolean l, us_live, res, have_latency;
1126   GstClockTime min, max, render_delay;
1127   GstQuery *query;
1128   GstClockTime us_min, us_max;
1129
1130   /* we are live when we sync to the clock */
1131   GST_OBJECT_LOCK (sink);
1132   l = sink->sync;
1133   have_latency = sink->priv->have_latency;
1134   render_delay = sink->priv->render_delay;
1135   GST_OBJECT_UNLOCK (sink);
1136
1137   /* assume no latency */
1138   min = 0;
1139   max = -1;
1140   us_live = FALSE;
1141
1142   if (have_latency) {
1143     GST_DEBUG_OBJECT (sink, "we are ready for LATENCY query");
1144     /* we are ready for a latency query this is when we preroll or when we are
1145      * not async. */
1146     query = gst_query_new_latency ();
1147
1148     /* ask the peer for the latency */
1149     if ((res = gst_pad_peer_query (sink->sinkpad, query))) {
1150       /* get upstream min and max latency */
1151       gst_query_parse_latency (query, &us_live, &us_min, &us_max);
1152
1153       if (us_live) {
1154         /* upstream live, use its latency, subclasses should use these
1155          * values to create the complete latency. */
1156         min = us_min;
1157         max = us_max;
1158       }
1159       if (l) {
1160         /* we need to add the render delay if we are live */
1161         if (min != -1)
1162           min += render_delay;
1163         if (max != -1)
1164           max += render_delay;
1165       }
1166     }
1167     gst_query_unref (query);
1168   } else {
1169     GST_DEBUG_OBJECT (sink, "we are not yet ready for LATENCY query");
1170     res = FALSE;
1171   }
1172
1173   /* not live, we tried to do the query, if it failed we return TRUE anyway */
1174   if (!res) {
1175     if (!l) {
1176       res = TRUE;
1177       GST_DEBUG_OBJECT (sink, "latency query failed but we are not live");
1178     } else {
1179       GST_DEBUG_OBJECT (sink, "latency query failed and we are live");
1180     }
1181   }
1182
1183   if (res) {
1184     GST_DEBUG_OBJECT (sink, "latency query: live: %d, have_latency %d,"
1185         " upstream: %d, min %" GST_TIME_FORMAT ", max %" GST_TIME_FORMAT, l,
1186         have_latency, us_live, GST_TIME_ARGS (min), GST_TIME_ARGS (max));
1187
1188     if (live)
1189       *live = l;
1190     if (upstream_live)
1191       *upstream_live = us_live;
1192     if (min_latency)
1193       *min_latency = min;
1194     if (max_latency)
1195       *max_latency = max;
1196   }
1197   return res;
1198 }
1199
1200 /**
1201  * gst_base_sink_set_render_delay:
1202  * @sink: a #GstBaseSink
1203  * @delay: the new delay
1204  *
1205  * Set the render delay in @sink to @delay. The render delay is the time
1206  * between actual rendering of a buffer and its synchronisation time. Some
1207  * devices might delay media rendering which can be compensated for with this
1208  * function.
1209  *
1210  * After calling this function, this sink will report additional latency and
1211  * other sinks will adjust their latency to delay the rendering of their media.
1212  *
1213  * This function is usually called by subclasses.
1214  *
1215  * Since: 0.10.21
1216  */
1217 void
1218 gst_base_sink_set_render_delay (GstBaseSink * sink, GstClockTime delay)
1219 {
1220   GstClockTime old_render_delay;
1221
1222   g_return_if_fail (GST_IS_BASE_SINK (sink));
1223
1224   GST_OBJECT_LOCK (sink);
1225   old_render_delay = sink->priv->render_delay;
1226   sink->priv->render_delay = delay;
1227   GST_LOG_OBJECT (sink, "set render delay to %" GST_TIME_FORMAT,
1228       GST_TIME_ARGS (delay));
1229   GST_OBJECT_UNLOCK (sink);
1230
1231   if (delay != old_render_delay) {
1232     GST_DEBUG_OBJECT (sink, "posting latency changed");
1233     gst_element_post_message (GST_ELEMENT_CAST (sink),
1234         gst_message_new_latency (GST_OBJECT_CAST (sink)));
1235   }
1236 }
1237
1238 /**
1239  * gst_base_sink_get_render_delay:
1240  * @sink: a #GstBaseSink
1241  *
1242  * Get the render delay of @sink. see gst_base_sink_set_render_delay() for more
1243  * information about the render delay.
1244  *
1245  * Returns: the render delay of @sink.
1246  *
1247  * Since: 0.10.21
1248  */
1249 GstClockTime
1250 gst_base_sink_get_render_delay (GstBaseSink * sink)
1251 {
1252   GstClockTimeDiff res;
1253
1254   g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
1255
1256   GST_OBJECT_LOCK (sink);
1257   res = sink->priv->render_delay;
1258   GST_OBJECT_UNLOCK (sink);
1259
1260   return res;
1261 }
1262
1263 /**
1264  * gst_base_sink_set_blocksize:
1265  * @sink: a #GstBaseSink
1266  * @blocksize: the blocksize in bytes
1267  *
1268  * Set the number of bytes that the sink will pull when it is operating in pull
1269  * mode.
1270  *
1271  * Since: 0.10.22
1272  */
1273 /* FIXME 0.11: blocksize property should be int, otherwise min>max.. */
1274 void
1275 gst_base_sink_set_blocksize (GstBaseSink * sink, guint blocksize)
1276 {
1277   g_return_if_fail (GST_IS_BASE_SINK (sink));
1278
1279   GST_OBJECT_LOCK (sink);
1280   sink->priv->blocksize = blocksize;
1281   GST_LOG_OBJECT (sink, "set blocksize to %u", blocksize);
1282   GST_OBJECT_UNLOCK (sink);
1283 }
1284
1285 /**
1286  * gst_base_sink_get_blocksize:
1287  * @sink: a #GstBaseSink
1288  *
1289  * Get the number of bytes that the sink will pull when it is operating in pull
1290  * mode.
1291  *
1292  * Returns: the number of bytes @sink will pull in pull mode.
1293  *
1294  * Since: 0.10.22
1295  */
1296 /* FIXME 0.11: blocksize property should be int, otherwise min>max.. */
1297 guint
1298 gst_base_sink_get_blocksize (GstBaseSink * sink)
1299 {
1300   guint res;
1301
1302   g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
1303
1304   GST_OBJECT_LOCK (sink);
1305   res = sink->priv->blocksize;
1306   GST_OBJECT_UNLOCK (sink);
1307
1308   return res;
1309 }
1310
1311 /**
1312  * gst_base_sink_set_throttle_time:
1313  * @sink: a #GstBaseSink
1314  * @throttle: the throttle time in nanoseconds
1315  *
1316  * Set the time that will be inserted between rendered buffers. This
1317  * can be used to control the maximum buffers per second that the sink
1318  * will render. 
1319  *
1320  * Since: 0.10.33
1321  */
1322 void
1323 gst_base_sink_set_throttle_time (GstBaseSink * sink, guint64 throttle)
1324 {
1325   g_return_if_fail (GST_IS_BASE_SINK (sink));
1326
1327   GST_OBJECT_LOCK (sink);
1328   sink->priv->throttle_time = throttle;
1329   GST_LOG_OBJECT (sink, "set throttle_time to %" G_GUINT64_FORMAT, throttle);
1330   GST_OBJECT_UNLOCK (sink);
1331 }
1332
1333 /**
1334  * gst_base_sink_get_throttle_time:
1335  * @sink: a #GstBaseSink
1336  *
1337  * Get the time that will be inserted between frames to control the 
1338  * maximum buffers per second.
1339  *
1340  * Returns: the number of nanoseconds @sink will put between frames.
1341  *
1342  * Since: 0.10.33
1343  */
1344 guint64
1345 gst_base_sink_get_throttle_time (GstBaseSink * sink)
1346 {
1347   guint64 res;
1348
1349   g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
1350
1351   GST_OBJECT_LOCK (sink);
1352   res = sink->priv->throttle_time;
1353   GST_OBJECT_UNLOCK (sink);
1354
1355   return res;
1356 }
1357
1358 static void
1359 gst_base_sink_set_property (GObject * object, guint prop_id,
1360     const GValue * value, GParamSpec * pspec)
1361 {
1362   GstBaseSink *sink = GST_BASE_SINK (object);
1363
1364   switch (prop_id) {
1365     case PROP_PREROLL_QUEUE_LEN:
1366       /* preroll lock necessary to serialize with finish_preroll */
1367       GST_PAD_PREROLL_LOCK (sink->sinkpad);
1368       g_atomic_int_set (&sink->preroll_queue_max_len, g_value_get_uint (value));
1369       GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
1370       break;
1371     case PROP_SYNC:
1372       gst_base_sink_set_sync (sink, g_value_get_boolean (value));
1373       break;
1374     case PROP_MAX_LATENESS:
1375       gst_base_sink_set_max_lateness (sink, g_value_get_int64 (value));
1376       break;
1377     case PROP_QOS:
1378       gst_base_sink_set_qos_enabled (sink, g_value_get_boolean (value));
1379       break;
1380     case PROP_ASYNC:
1381       gst_base_sink_set_async_enabled (sink, g_value_get_boolean (value));
1382       break;
1383     case PROP_TS_OFFSET:
1384       gst_base_sink_set_ts_offset (sink, g_value_get_int64 (value));
1385       break;
1386     case PROP_BLOCKSIZE:
1387       gst_base_sink_set_blocksize (sink, g_value_get_uint (value));
1388       break;
1389     case PROP_RENDER_DELAY:
1390       gst_base_sink_set_render_delay (sink, g_value_get_uint64 (value));
1391       break;
1392     case PROP_ENABLE_LAST_BUFFER:
1393       gst_base_sink_set_last_buffer_enabled (sink, g_value_get_boolean (value));
1394       break;
1395     case PROP_THROTTLE_TIME:
1396       gst_base_sink_set_throttle_time (sink, g_value_get_uint64 (value));
1397       break;
1398     default:
1399       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1400       break;
1401   }
1402 }
1403
1404 static void
1405 gst_base_sink_get_property (GObject * object, guint prop_id, GValue * value,
1406     GParamSpec * pspec)
1407 {
1408   GstBaseSink *sink = GST_BASE_SINK (object);
1409
1410   switch (prop_id) {
1411     case PROP_PREROLL_QUEUE_LEN:
1412       g_value_set_uint (value, g_atomic_int_get (&sink->preroll_queue_max_len));
1413       break;
1414     case PROP_SYNC:
1415       g_value_set_boolean (value, gst_base_sink_get_sync (sink));
1416       break;
1417     case PROP_MAX_LATENESS:
1418       g_value_set_int64 (value, gst_base_sink_get_max_lateness (sink));
1419       break;
1420     case PROP_QOS:
1421       g_value_set_boolean (value, gst_base_sink_is_qos_enabled (sink));
1422       break;
1423     case PROP_ASYNC:
1424       g_value_set_boolean (value, gst_base_sink_is_async_enabled (sink));
1425       break;
1426     case PROP_TS_OFFSET:
1427       g_value_set_int64 (value, gst_base_sink_get_ts_offset (sink));
1428       break;
1429     case PROP_LAST_BUFFER:
1430       gst_value_take_buffer (value, gst_base_sink_get_last_buffer (sink));
1431       break;
1432     case PROP_ENABLE_LAST_BUFFER:
1433       g_value_set_boolean (value, gst_base_sink_is_last_buffer_enabled (sink));
1434       break;
1435     case PROP_BLOCKSIZE:
1436       g_value_set_uint (value, gst_base_sink_get_blocksize (sink));
1437       break;
1438     case PROP_RENDER_DELAY:
1439       g_value_set_uint64 (value, gst_base_sink_get_render_delay (sink));
1440       break;
1441     case PROP_THROTTLE_TIME:
1442       g_value_set_uint64 (value, gst_base_sink_get_throttle_time (sink));
1443       break;
1444     default:
1445       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1446       break;
1447   }
1448 }
1449
1450
1451 static GstCaps *
1452 gst_base_sink_get_caps (GstBaseSink * sink)
1453 {
1454   return NULL;
1455 }
1456
1457 static gboolean
1458 gst_base_sink_set_caps (GstBaseSink * sink, GstCaps * caps)
1459 {
1460   return TRUE;
1461 }
1462
1463 static GstFlowReturn
1464 gst_base_sink_buffer_alloc (GstBaseSink * sink, guint64 offset, guint size,
1465     GstCaps * caps, GstBuffer ** buf)
1466 {
1467   *buf = NULL;
1468   return GST_FLOW_OK;
1469 }
1470
1471 /* with PREROLL_LOCK, STREAM_LOCK */
1472 static void
1473 gst_base_sink_preroll_queue_flush (GstBaseSink * basesink, GstPad * pad)
1474 {
1475   GstMiniObject *obj;
1476
1477   GST_DEBUG_OBJECT (basesink, "flushing queue %p", basesink);
1478   while ((obj = g_queue_pop_head (basesink->preroll_queue))) {
1479     GST_DEBUG_OBJECT (basesink, "popped %p", obj);
1480     gst_mini_object_unref (obj);
1481   }
1482   /* we can't have EOS anymore now */
1483   basesink->eos = FALSE;
1484   basesink->priv->received_eos = FALSE;
1485   basesink->have_preroll = FALSE;
1486   basesink->priv->step_unlock = FALSE;
1487   basesink->eos_queued = FALSE;
1488   basesink->preroll_queued = 0;
1489   basesink->buffers_queued = 0;
1490   basesink->events_queued = 0;
1491   /* can't report latency anymore until we preroll again */
1492   if (basesink->priv->async_enabled) {
1493     GST_OBJECT_LOCK (basesink);
1494     basesink->priv->have_latency = FALSE;
1495     GST_OBJECT_UNLOCK (basesink);
1496   }
1497   /* and signal any waiters now */
1498   GST_PAD_PREROLL_SIGNAL (pad);
1499 }
1500
1501 /* with STREAM_LOCK, configures given segment with the event information. */
1502 static void
1503 gst_base_sink_configure_segment (GstBaseSink * basesink, GstPad * pad,
1504     GstEvent * event, GstSegment * segment)
1505 {
1506   gboolean update;
1507   gdouble rate, arate;
1508   GstFormat format;
1509   gint64 start;
1510   gint64 stop;
1511   gint64 time;
1512
1513   /* the newsegment event is needed to bring the buffer timestamps to the
1514    * stream time and to drop samples outside of the playback segment. */
1515   gst_event_parse_new_segment_full (event, &update, &rate, &arate, &format,
1516       &start, &stop, &time);
1517
1518   /* The segment is protected with both the STREAM_LOCK and the OBJECT_LOCK.
1519    * We protect with the OBJECT_LOCK so that we can use the values to
1520    * safely answer a POSITION query. */
1521   GST_OBJECT_LOCK (basesink);
1522   gst_segment_set_newsegment_full (segment, update, rate, arate, format, start,
1523       stop, time);
1524
1525   if (format == GST_FORMAT_TIME) {
1526     GST_DEBUG_OBJECT (basesink,
1527         "configured NEWSEGMENT update %d, rate %lf, applied rate %lf, "
1528         "format GST_FORMAT_TIME, "
1529         "%" GST_TIME_FORMAT " -- %" GST_TIME_FORMAT
1530         ", time %" GST_TIME_FORMAT ", accum %" GST_TIME_FORMAT,
1531         update, rate, arate, GST_TIME_ARGS (segment->start),
1532         GST_TIME_ARGS (segment->stop), GST_TIME_ARGS (segment->time),
1533         GST_TIME_ARGS (segment->accum));
1534   } else {
1535     GST_DEBUG_OBJECT (basesink,
1536         "configured NEWSEGMENT update %d, rate %lf, applied rate %lf, "
1537         "format %d, "
1538         "%" G_GINT64_FORMAT " -- %" G_GINT64_FORMAT ", time %"
1539         G_GINT64_FORMAT ", accum %" G_GINT64_FORMAT, update, rate, arate,
1540         segment->format, segment->start, segment->stop, segment->time,
1541         segment->accum);
1542   }
1543   GST_OBJECT_UNLOCK (basesink);
1544 }
1545
1546 /* with PREROLL_LOCK, STREAM_LOCK */
1547 static gboolean
1548 gst_base_sink_commit_state (GstBaseSink * basesink)
1549 {
1550   /* commit state and proceed to next pending state */
1551   GstState current, next, pending, post_pending;
1552   gboolean post_paused = FALSE;
1553   gboolean post_async_done = FALSE;
1554   gboolean post_playing = FALSE;
1555
1556   /* we are certainly not playing async anymore now */
1557   basesink->playing_async = FALSE;
1558
1559   GST_OBJECT_LOCK (basesink);
1560   current = GST_STATE (basesink);
1561   next = GST_STATE_NEXT (basesink);
1562   pending = GST_STATE_PENDING (basesink);
1563   post_pending = pending;
1564
1565   switch (pending) {
1566     case GST_STATE_PLAYING:
1567     {
1568       GstBaseSinkClass *bclass;
1569       GstStateChangeReturn ret;
1570
1571       bclass = GST_BASE_SINK_GET_CLASS (basesink);
1572
1573       GST_DEBUG_OBJECT (basesink, "commiting state to PLAYING");
1574
1575       basesink->need_preroll = FALSE;
1576       post_async_done = TRUE;
1577       basesink->priv->commited = TRUE;
1578       post_playing = TRUE;
1579       /* post PAUSED too when we were READY */
1580       if (current == GST_STATE_READY) {
1581         post_paused = TRUE;
1582       }
1583
1584       /* make sure we notify the subclass of async playing */
1585       if (bclass->async_play) {
1586         GST_WARNING_OBJECT (basesink, "deprecated async_play");
1587         ret = bclass->async_play (basesink);
1588         if (ret == GST_STATE_CHANGE_FAILURE)
1589           goto async_failed;
1590       }
1591       break;
1592     }
1593     case GST_STATE_PAUSED:
1594       GST_DEBUG_OBJECT (basesink, "commiting state to PAUSED");
1595       post_paused = TRUE;
1596       post_async_done = TRUE;
1597       basesink->priv->commited = TRUE;
1598       post_pending = GST_STATE_VOID_PENDING;
1599       break;
1600     case GST_STATE_READY:
1601     case GST_STATE_NULL:
1602       goto stopping;
1603     case GST_STATE_VOID_PENDING:
1604       goto nothing_pending;
1605     default:
1606       break;
1607   }
1608
1609   /* we can report latency queries now */
1610   basesink->priv->have_latency = TRUE;
1611
1612   GST_STATE (basesink) = pending;
1613   GST_STATE_NEXT (basesink) = GST_STATE_VOID_PENDING;
1614   GST_STATE_PENDING (basesink) = GST_STATE_VOID_PENDING;
1615   GST_STATE_RETURN (basesink) = GST_STATE_CHANGE_SUCCESS;
1616   GST_OBJECT_UNLOCK (basesink);
1617
1618   if (post_paused) {
1619     GST_DEBUG_OBJECT (basesink, "posting PAUSED state change message");
1620     gst_element_post_message (GST_ELEMENT_CAST (basesink),
1621         gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
1622             current, next, post_pending));
1623   }
1624   if (post_async_done) {
1625     GST_DEBUG_OBJECT (basesink, "posting async-done message");
1626     gst_element_post_message (GST_ELEMENT_CAST (basesink),
1627         gst_message_new_async_done (GST_OBJECT_CAST (basesink)));
1628   }
1629   if (post_playing) {
1630     GST_DEBUG_OBJECT (basesink, "posting PLAYING state change message");
1631     gst_element_post_message (GST_ELEMENT_CAST (basesink),
1632         gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
1633             next, pending, GST_STATE_VOID_PENDING));
1634   }
1635
1636   GST_STATE_BROADCAST (basesink);
1637
1638   return TRUE;
1639
1640 nothing_pending:
1641   {
1642     /* Depending on the state, set our vars. We get in this situation when the
1643      * state change function got a change to update the state vars before the
1644      * streaming thread did. This is fine but we need to make sure that we
1645      * update the need_preroll var since it was TRUE when we got here and might
1646      * become FALSE if we got to PLAYING. */
1647     GST_DEBUG_OBJECT (basesink, "nothing to commit, now in %s",
1648         gst_element_state_get_name (current));
1649     switch (current) {
1650       case GST_STATE_PLAYING:
1651         basesink->need_preroll = FALSE;
1652         break;
1653       case GST_STATE_PAUSED:
1654         basesink->need_preroll = TRUE;
1655         break;
1656       default:
1657         basesink->need_preroll = FALSE;
1658         basesink->flushing = TRUE;
1659         break;
1660     }
1661     /* we can report latency queries now */
1662     basesink->priv->have_latency = TRUE;
1663     GST_OBJECT_UNLOCK (basesink);
1664     return TRUE;
1665   }
1666 stopping:
1667   {
1668     /* app is going to READY */
1669     GST_DEBUG_OBJECT (basesink, "stopping");
1670     basesink->need_preroll = FALSE;
1671     basesink->flushing = TRUE;
1672     GST_OBJECT_UNLOCK (basesink);
1673     return FALSE;
1674   }
1675 async_failed:
1676   {
1677     GST_DEBUG_OBJECT (basesink, "async commit failed");
1678     GST_STATE_RETURN (basesink) = GST_STATE_CHANGE_FAILURE;
1679     GST_OBJECT_UNLOCK (basesink);
1680     return FALSE;
1681   }
1682 }
1683
1684 static void
1685 start_stepping (GstBaseSink * sink, GstSegment * segment,
1686     GstStepInfo * pending, GstStepInfo * current)
1687 {
1688   gint64 end;
1689   GstMessage *message;
1690
1691   GST_DEBUG_OBJECT (sink, "update pending step");
1692
1693   GST_OBJECT_LOCK (sink);
1694   memcpy (current, pending, sizeof (GstStepInfo));
1695   pending->valid = FALSE;
1696   GST_OBJECT_UNLOCK (sink);
1697
1698   /* post message first */
1699   message =
1700       gst_message_new_step_start (GST_OBJECT (sink), TRUE, current->format,
1701       current->amount, current->rate, current->flush, current->intermediate);
1702   gst_message_set_seqnum (message, current->seqnum);
1703   gst_element_post_message (GST_ELEMENT (sink), message);
1704
1705   /* get the running time of where we paused and remember it */
1706   current->start = gst_element_get_start_time (GST_ELEMENT_CAST (sink));
1707   gst_segment_set_running_time (segment, GST_FORMAT_TIME, current->start);
1708
1709   /* set the new rate for the remainder of the segment */
1710   current->start_rate = segment->rate;
1711   segment->rate *= current->rate;
1712   segment->abs_rate = ABS (segment->rate);
1713
1714   /* save values */
1715   if (segment->rate > 0.0)
1716     current->start_stop = segment->stop;
1717   else
1718     current->start_start = segment->start;
1719
1720   if (current->format == GST_FORMAT_TIME) {
1721     end = current->start + current->amount;
1722     if (!current->flush) {
1723       /* update the segment clipping regions for non-flushing seeks */
1724       if (segment->rate > 0.0) {
1725         segment->stop = gst_segment_to_position (segment, GST_FORMAT_TIME, end);
1726         segment->last_stop = segment->stop;
1727       } else {
1728         gint64 position;
1729
1730         position = gst_segment_to_position (segment, GST_FORMAT_TIME, end);
1731         segment->time = position;
1732         segment->start = position;
1733         segment->last_stop = position;
1734       }
1735     }
1736   }
1737
1738   GST_DEBUG_OBJECT (sink,
1739       "segment now rate %lf, applied rate %lf, "
1740       "format GST_FORMAT_TIME, "
1741       "%" GST_TIME_FORMAT " -- %" GST_TIME_FORMAT
1742       ", time %" GST_TIME_FORMAT ", accum %" GST_TIME_FORMAT,
1743       segment->rate, segment->applied_rate, GST_TIME_ARGS (segment->start),
1744       GST_TIME_ARGS (segment->stop), GST_TIME_ARGS (segment->time),
1745       GST_TIME_ARGS (segment->accum));
1746
1747   GST_DEBUG_OBJECT (sink, "step started at running_time %" GST_TIME_FORMAT,
1748       GST_TIME_ARGS (current->start));
1749
1750   if (current->amount == -1) {
1751     GST_DEBUG_OBJECT (sink, "step amount == -1, stop stepping");
1752     current->valid = FALSE;
1753   } else {
1754     GST_DEBUG_OBJECT (sink, "step amount: %" G_GUINT64_FORMAT ", format: %s, "
1755         "rate: %f", current->amount, gst_format_get_name (current->format),
1756         current->rate);
1757   }
1758 }
1759
1760 static void
1761 stop_stepping (GstBaseSink * sink, GstSegment * segment,
1762     GstStepInfo * current, gint64 rstart, gint64 rstop, gboolean eos)
1763 {
1764   gint64 stop, position;
1765   GstMessage *message;
1766
1767   GST_DEBUG_OBJECT (sink, "step complete");
1768
1769   if (segment->rate > 0.0)
1770     stop = rstart;
1771   else
1772     stop = rstop;
1773
1774   GST_DEBUG_OBJECT (sink,
1775       "step stop at running_time %" GST_TIME_FORMAT, GST_TIME_ARGS (stop));
1776
1777   if (stop == -1)
1778     current->duration = current->position;
1779   else
1780     current->duration = stop - current->start;
1781
1782   GST_DEBUG_OBJECT (sink, "step elapsed running_time %" GST_TIME_FORMAT,
1783       GST_TIME_ARGS (current->duration));
1784
1785   position = current->start + current->duration;
1786
1787   /* now move the segment to the new running time */
1788   gst_segment_set_running_time (segment, GST_FORMAT_TIME, position);
1789
1790   if (current->flush) {
1791     /* and remove the accumulated time we flushed, start time did not change */
1792     segment->accum = current->start;
1793   } else {
1794     /* start time is now the stepped position */
1795     gst_element_set_start_time (GST_ELEMENT_CAST (sink), position);
1796   }
1797
1798   /* restore the previous rate */
1799   segment->rate = current->start_rate;
1800   segment->abs_rate = ABS (segment->rate);
1801
1802   if (segment->rate > 0.0)
1803     segment->stop = current->start_stop;
1804   else
1805     segment->start = current->start_start;
1806
1807   /* the clip segment is used for position report in paused... */
1808   memcpy (sink->abidata.ABI.clip_segment, segment, sizeof (GstSegment));
1809
1810   /* post the step done when we know the stepped duration in TIME */
1811   message =
1812       gst_message_new_step_done (GST_OBJECT_CAST (sink), current->format,
1813       current->amount, current->rate, current->flush, current->intermediate,
1814       current->duration, eos);
1815   gst_message_set_seqnum (message, current->seqnum);
1816   gst_element_post_message (GST_ELEMENT_CAST (sink), message);
1817
1818   if (!current->intermediate)
1819     sink->need_preroll = current->need_preroll;
1820
1821   /* and the current step info finished and becomes invalid */
1822   current->valid = FALSE;
1823 }
1824
1825 static gboolean
1826 handle_stepping (GstBaseSink * sink, GstSegment * segment,
1827     GstStepInfo * current, gint64 * cstart, gint64 * cstop, gint64 * rstart,
1828     gint64 * rstop)
1829 {
1830   gboolean step_end = FALSE;
1831
1832   /* see if we need to skip this buffer because of stepping */
1833   switch (current->format) {
1834     case GST_FORMAT_TIME:
1835     {
1836       guint64 end;
1837       gint64 first, last;
1838
1839       if (segment->rate > 0.0) {
1840         if (segment->stop == *cstop)
1841           *rstop = *rstart + current->amount;
1842
1843         first = *rstart;
1844         last = *rstop;
1845       } else {
1846         if (segment->start == *cstart)
1847           *rstart = *rstop + current->amount;
1848
1849         first = *rstop;
1850         last = *rstart;
1851       }
1852
1853       end = current->start + current->amount;
1854       current->position = first - current->start;
1855
1856       if (G_UNLIKELY (segment->abs_rate != 1.0))
1857         current->position /= segment->abs_rate;
1858
1859       GST_DEBUG_OBJECT (sink,
1860           "buffer: %" GST_TIME_FORMAT "-%" GST_TIME_FORMAT,
1861           GST_TIME_ARGS (first), GST_TIME_ARGS (last));
1862       GST_DEBUG_OBJECT (sink,
1863           "got time step %" GST_TIME_FORMAT "-%" GST_TIME_FORMAT "/%"
1864           GST_TIME_FORMAT, GST_TIME_ARGS (current->position),
1865           GST_TIME_ARGS (last - current->start),
1866           GST_TIME_ARGS (current->amount));
1867
1868       if ((current->flush && current->position >= current->amount)
1869           || last >= end) {
1870         GST_DEBUG_OBJECT (sink, "step ended, we need clipping");
1871         step_end = TRUE;
1872         if (segment->rate > 0.0) {
1873           *rstart = end;
1874           *cstart = gst_segment_to_position (segment, GST_FORMAT_TIME, end);
1875         } else {
1876           *rstop = end;
1877           *cstop = gst_segment_to_position (segment, GST_FORMAT_TIME, end);
1878         }
1879       }
1880       GST_DEBUG_OBJECT (sink,
1881           "cstart %" GST_TIME_FORMAT ", rstart %" GST_TIME_FORMAT,
1882           GST_TIME_ARGS (*cstart), GST_TIME_ARGS (*rstart));
1883       GST_DEBUG_OBJECT (sink,
1884           "cstop %" GST_TIME_FORMAT ", rstop %" GST_TIME_FORMAT,
1885           GST_TIME_ARGS (*cstop), GST_TIME_ARGS (*rstop));
1886       break;
1887     }
1888     case GST_FORMAT_BUFFERS:
1889       GST_DEBUG_OBJECT (sink,
1890           "got default step %" G_GUINT64_FORMAT "/%" G_GUINT64_FORMAT,
1891           current->position, current->amount);
1892
1893       if (current->position < current->amount) {
1894         current->position++;
1895       } else {
1896         step_end = TRUE;
1897       }
1898       break;
1899     case GST_FORMAT_DEFAULT:
1900     default:
1901       GST_DEBUG_OBJECT (sink,
1902           "got unknown step %" G_GUINT64_FORMAT "/%" G_GUINT64_FORMAT,
1903           current->position, current->amount);
1904       break;
1905   }
1906   return step_end;
1907 }
1908
1909 /* with STREAM_LOCK, PREROLL_LOCK
1910  *
1911  * Returns TRUE if the object needs synchronisation and takes therefore
1912  * part in prerolling.
1913  *
1914  * rsstart/rsstop contain the start/stop in stream time.
1915  * rrstart/rrstop contain the start/stop in running time.
1916  */
1917 static gboolean
1918 gst_base_sink_get_sync_times (GstBaseSink * basesink, GstMiniObject * obj,
1919     GstClockTime * rsstart, GstClockTime * rsstop,
1920     GstClockTime * rrstart, GstClockTime * rrstop, gboolean * do_sync,
1921     gboolean * stepped, GstSegment * segment, GstStepInfo * step,
1922     gboolean * step_end, guint8 obj_type)
1923 {
1924   GstBaseSinkClass *bclass;
1925   GstBuffer *buffer;
1926   GstClockTime start, stop;     /* raw start/stop timestamps */
1927   gint64 cstart, cstop;         /* clipped raw timestamps */
1928   gint64 rstart, rstop;         /* clipped timestamps converted to running time */
1929   GstClockTime sstart, sstop;   /* clipped timestamps converted to stream time */
1930   GstFormat format;
1931   GstBaseSinkPrivate *priv;
1932   gboolean eos;
1933
1934   priv = basesink->priv;
1935
1936   /* start with nothing */
1937   start = stop = GST_CLOCK_TIME_NONE;
1938
1939   if (G_UNLIKELY (OBJ_IS_EVENT (obj_type))) {
1940     GstEvent *event = GST_EVENT_CAST (obj);
1941
1942     switch (GST_EVENT_TYPE (event)) {
1943         /* EOS event needs syncing */
1944       case GST_EVENT_EOS:
1945       {
1946         if (basesink->segment.rate >= 0.0) {
1947           sstart = sstop = priv->current_sstop;
1948           if (!GST_CLOCK_TIME_IS_VALID (sstart)) {
1949             /* we have not seen a buffer yet, use the segment values */
1950             sstart = sstop = gst_segment_to_stream_time (&basesink->segment,
1951                 basesink->segment.format, basesink->segment.stop);
1952           }
1953         } else {
1954           sstart = sstop = priv->current_sstart;
1955           if (!GST_CLOCK_TIME_IS_VALID (sstart)) {
1956             /* we have not seen a buffer yet, use the segment values */
1957             sstart = sstop = gst_segment_to_stream_time (&basesink->segment,
1958                 basesink->segment.format, basesink->segment.start);
1959           }
1960         }
1961
1962         rstart = rstop = priv->eos_rtime;
1963         *do_sync = rstart != -1;
1964         GST_DEBUG_OBJECT (basesink, "sync times for EOS %" GST_TIME_FORMAT,
1965             GST_TIME_ARGS (rstart));
1966         /* if we are stepping, we end now */
1967         *step_end = step->valid;
1968         eos = TRUE;
1969         goto eos_done;
1970       }
1971       default:
1972         /* other events do not need syncing */
1973         /* FIXME, maybe NEWSEGMENT might need synchronisation
1974          * since the POSITION query depends on accumulated times and
1975          * we cannot accumulate the current segment before the previous
1976          * one completed.
1977          */
1978         return FALSE;
1979     }
1980   }
1981
1982   eos = FALSE;
1983
1984 again:
1985   /* else do buffer sync code */
1986   buffer = GST_BUFFER_CAST (obj);
1987
1988   bclass = GST_BASE_SINK_GET_CLASS (basesink);
1989
1990   /* just get the times to see if we need syncing, if the start returns -1 we
1991    * don't sync. */
1992   if (bclass->get_times)
1993     bclass->get_times (basesink, buffer, &start, &stop);
1994
1995   if (!GST_CLOCK_TIME_IS_VALID (start)) {
1996     /* we don't need to sync but we still want to get the timestamps for
1997      * tracking the position */
1998     gst_base_sink_get_times (basesink, buffer, &start, &stop);
1999     *do_sync = FALSE;
2000   } else {
2001     *do_sync = TRUE;
2002   }
2003
2004   GST_DEBUG_OBJECT (basesink, "got times start: %" GST_TIME_FORMAT
2005       ", stop: %" GST_TIME_FORMAT ", do_sync %d", GST_TIME_ARGS (start),
2006       GST_TIME_ARGS (stop), *do_sync);
2007
2008   /* collect segment and format for code clarity */
2009   format = segment->format;
2010
2011   /* no timestamp clipping if we did not get a TIME segment format */
2012   if (G_UNLIKELY (format != GST_FORMAT_TIME)) {
2013     cstart = start;
2014     cstop = stop;
2015     /* do running and stream time in TIME format */
2016     format = GST_FORMAT_TIME;
2017     GST_LOG_OBJECT (basesink, "not time format, don't clip");
2018     goto do_times;
2019   }
2020
2021   /* clip, only when we know about time */
2022   if (G_UNLIKELY (!gst_segment_clip (segment, GST_FORMAT_TIME,
2023               (gint64) start, (gint64) stop, &cstart, &cstop))) {
2024     if (step->valid) {
2025       GST_DEBUG_OBJECT (basesink, "step out of segment");
2026       /* when we are stepping, pretend we're at the end of the segment */
2027       if (segment->rate > 0.0) {
2028         cstart = segment->stop;
2029         cstop = segment->stop;
2030       } else {
2031         cstart = segment->start;
2032         cstop = segment->start;
2033       }
2034       goto do_times;
2035     }
2036     goto out_of_segment;
2037   }
2038
2039   if (G_UNLIKELY (start != cstart || stop != cstop)) {
2040     GST_DEBUG_OBJECT (basesink, "clipped to: start %" GST_TIME_FORMAT
2041         ", stop: %" GST_TIME_FORMAT, GST_TIME_ARGS (cstart),
2042         GST_TIME_ARGS (cstop));
2043   }
2044
2045   /* set last stop position */
2046   if (G_LIKELY (stop != GST_CLOCK_TIME_NONE && cstop != GST_CLOCK_TIME_NONE))
2047     gst_segment_set_last_stop (segment, GST_FORMAT_TIME, cstop);
2048   else
2049     gst_segment_set_last_stop (segment, GST_FORMAT_TIME, cstart);
2050
2051 do_times:
2052   rstart = gst_segment_to_running_time (segment, format, cstart);
2053   rstop = gst_segment_to_running_time (segment, format, cstop);
2054
2055   if (G_UNLIKELY (step->valid)) {
2056     if (!(*step_end = handle_stepping (basesink, segment, step, &cstart, &cstop,
2057                 &rstart, &rstop))) {
2058       /* step is still busy, we discard data when we are flushing */
2059       *stepped = step->flush;
2060       GST_DEBUG_OBJECT (basesink, "stepping busy");
2061     }
2062   }
2063   /* this can produce wrong values if we accumulated non-TIME segments. If this happens,
2064    * upstream is behaving very badly */
2065   sstart = gst_segment_to_stream_time (segment, format, cstart);
2066   sstop = gst_segment_to_stream_time (segment, format, cstop);
2067
2068 eos_done:
2069   /* eos_done label only called when doing EOS, we also stop stepping then */
2070   if (*step_end && step->flush) {
2071     GST_DEBUG_OBJECT (basesink, "flushing step ended");
2072     stop_stepping (basesink, segment, step, rstart, rstop, eos);
2073     *step_end = FALSE;
2074     /* re-determine running start times for adjusted segment
2075      * (which has a flushed amount of running/accumulated time removed) */
2076     if (!GST_IS_EVENT (obj)) {
2077       GST_DEBUG_OBJECT (basesink, "refresh sync times");
2078       goto again;
2079     }
2080   }
2081
2082   /* save times */
2083   *rsstart = sstart;
2084   *rsstop = sstop;
2085   *rrstart = rstart;
2086   *rrstop = rstop;
2087
2088   /* buffers and EOS always need syncing and preroll */
2089   return TRUE;
2090
2091   /* special cases */
2092 out_of_segment:
2093   {
2094     /* we usually clip in the chain function already but stepping could cause
2095      * the segment to be updated later. we return FALSE so that we don't try
2096      * to sync on it. */
2097     GST_LOG_OBJECT (basesink, "buffer skipped, not in segment");
2098     return FALSE;
2099   }
2100 }
2101
2102 /* with STREAM_LOCK, PREROLL_LOCK, LOCK
2103  * adjust a timestamp with the latency and timestamp offset. This function does
2104  * not adjust for the render delay. */
2105 static GstClockTime
2106 gst_base_sink_adjust_time (GstBaseSink * basesink, GstClockTime time)
2107 {
2108   GstClockTimeDiff ts_offset;
2109
2110   /* don't do anything funny with invalid timestamps */
2111   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (time)))
2112     return time;
2113
2114   time += basesink->priv->latency;
2115
2116   /* apply offset, be carefull for underflows */
2117   ts_offset = basesink->priv->ts_offset;
2118   if (ts_offset < 0) {
2119     ts_offset = -ts_offset;
2120     if (ts_offset < time)
2121       time -= ts_offset;
2122     else
2123       time = 0;
2124   } else
2125     time += ts_offset;
2126
2127   /* subtract the render delay again, which was included in the latency */
2128   if (time > basesink->priv->render_delay)
2129     time -= basesink->priv->render_delay;
2130   else
2131     time = 0;
2132
2133   return time;
2134 }
2135
2136 /**
2137  * gst_base_sink_wait_clock:
2138  * @sink: the sink
2139  * @time: the running_time to be reached
2140  * @jitter: (out) (allow-none): the jitter to be filled with time diff, or NULL
2141  *
2142  * This function will block until @time is reached. It is usually called by
2143  * subclasses that use their own internal synchronisation.
2144  *
2145  * If @time is not valid, no sycnhronisation is done and #GST_CLOCK_BADTIME is
2146  * returned. Likewise, if synchronisation is disabled in the element or there
2147  * is no clock, no synchronisation is done and #GST_CLOCK_BADTIME is returned.
2148  *
2149  * This function should only be called with the PREROLL_LOCK held, like when
2150  * receiving an EOS event in the #GstBaseSinkClass.event() vmethod or when
2151  * receiving a buffer in
2152  * the #GstBaseSinkClass.render() vmethod.
2153  *
2154  * The @time argument should be the running_time of when this method should
2155  * return and is not adjusted with any latency or offset configured in the
2156  * sink.
2157  *
2158  * Since: 0.10.20
2159  *
2160  * Returns: #GstClockReturn
2161  */
2162 GstClockReturn
2163 gst_base_sink_wait_clock (GstBaseSink * sink, GstClockTime time,
2164     GstClockTimeDiff * jitter)
2165 {
2166   GstClockReturn ret;
2167   GstClock *clock;
2168   GstClockTime base_time;
2169
2170   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (time)))
2171     goto invalid_time;
2172
2173   GST_OBJECT_LOCK (sink);
2174   if (G_UNLIKELY (!sink->sync))
2175     goto no_sync;
2176
2177   if (G_UNLIKELY ((clock = GST_ELEMENT_CLOCK (sink)) == NULL))
2178     goto no_clock;
2179
2180   base_time = GST_ELEMENT_CAST (sink)->base_time;
2181   GST_LOG_OBJECT (sink,
2182       "time %" GST_TIME_FORMAT ", base_time %" GST_TIME_FORMAT,
2183       GST_TIME_ARGS (time), GST_TIME_ARGS (base_time));
2184
2185   /* add base_time to running_time to get the time against the clock */
2186   time += base_time;
2187
2188   /* Re-use existing clockid if available */
2189   /* FIXME: Casting to GstClockEntry only works because the types
2190    * are the same */
2191   if (G_LIKELY (sink->priv->cached_clock_id != NULL
2192           && GST_CLOCK_ENTRY_CLOCK ((GstClockEntry *) sink->
2193               priv->cached_clock_id) == clock)) {
2194     if (!gst_clock_single_shot_id_reinit (clock, sink->priv->cached_clock_id,
2195             time)) {
2196       gst_clock_id_unref (sink->priv->cached_clock_id);
2197       sink->priv->cached_clock_id = gst_clock_new_single_shot_id (clock, time);
2198     }
2199   } else {
2200     if (sink->priv->cached_clock_id != NULL)
2201       gst_clock_id_unref (sink->priv->cached_clock_id);
2202     sink->priv->cached_clock_id = gst_clock_new_single_shot_id (clock, time);
2203   }
2204   GST_OBJECT_UNLOCK (sink);
2205
2206   /* A blocking wait is performed on the clock. We save the ClockID
2207    * so we can unlock the entry at any time. While we are blocking, we
2208    * release the PREROLL_LOCK so that other threads can interrupt the
2209    * entry. */
2210   sink->clock_id = sink->priv->cached_clock_id;
2211   /* release the preroll lock while waiting */
2212   GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
2213
2214   ret = gst_clock_id_wait (sink->priv->cached_clock_id, jitter);
2215
2216   GST_PAD_PREROLL_LOCK (sink->sinkpad);
2217   sink->clock_id = NULL;
2218
2219   return ret;
2220
2221   /* no syncing needed */
2222 invalid_time:
2223   {
2224     GST_DEBUG_OBJECT (sink, "time not valid, no sync needed");
2225     return GST_CLOCK_BADTIME;
2226   }
2227 no_sync:
2228   {
2229     GST_DEBUG_OBJECT (sink, "sync disabled");
2230     GST_OBJECT_UNLOCK (sink);
2231     return GST_CLOCK_BADTIME;
2232   }
2233 no_clock:
2234   {
2235     GST_DEBUG_OBJECT (sink, "no clock, can't sync");
2236     GST_OBJECT_UNLOCK (sink);
2237     return GST_CLOCK_BADTIME;
2238   }
2239 }
2240
2241 /**
2242  * gst_base_sink_wait_preroll:
2243  * @sink: the sink
2244  *
2245  * If the #GstBaseSinkClass.render() method performs its own synchronisation
2246  * against the clock it must unblock when going from PLAYING to the PAUSED state
2247  * and call this method before continuing to render the remaining data.
2248  *
2249  * This function will block until a state change to PLAYING happens (in which
2250  * case this function returns #GST_FLOW_OK) or the processing must be stopped due
2251  * to a state change to READY or a FLUSH event (in which case this function
2252  * returns #GST_FLOW_WRONG_STATE).
2253  *
2254  * This function should only be called with the PREROLL_LOCK held, like in the
2255  * render function.
2256  *
2257  * Returns: #GST_FLOW_OK if the preroll completed and processing can
2258  * continue. Any other return value should be returned from the render vmethod.
2259  *
2260  * Since: 0.10.11
2261  */
2262 GstFlowReturn
2263 gst_base_sink_wait_preroll (GstBaseSink * sink)
2264 {
2265   sink->have_preroll = TRUE;
2266   GST_DEBUG_OBJECT (sink, "waiting in preroll for flush or PLAYING");
2267   /* block until the state changes, or we get a flush, or something */
2268   GST_PAD_PREROLL_WAIT (sink->sinkpad);
2269   sink->have_preroll = FALSE;
2270   if (G_UNLIKELY (sink->flushing))
2271     goto stopping;
2272   if (G_UNLIKELY (sink->priv->step_unlock))
2273     goto step_unlocked;
2274   GST_DEBUG_OBJECT (sink, "continue after preroll");
2275
2276   return GST_FLOW_OK;
2277
2278   /* ERRORS */
2279 stopping:
2280   {
2281     GST_DEBUG_OBJECT (sink, "preroll interrupted because of flush");
2282     return GST_FLOW_WRONG_STATE;
2283   }
2284 step_unlocked:
2285   {
2286     sink->priv->step_unlock = FALSE;
2287     GST_DEBUG_OBJECT (sink, "preroll interrupted because of step");
2288     return GST_FLOW_STEP;
2289   }
2290 }
2291
2292 static inline guint8
2293 get_object_type (GstMiniObject * obj)
2294 {
2295   guint8 obj_type;
2296
2297   if (G_LIKELY (GST_IS_BUFFER (obj)))
2298     obj_type = _PR_IS_BUFFER;
2299   else if (GST_IS_EVENT (obj))
2300     obj_type = _PR_IS_EVENT;
2301   else if (GST_IS_BUFFER_LIST (obj))
2302     obj_type = _PR_IS_BUFFERLIST;
2303   else
2304     obj_type = _PR_IS_NOTHING;
2305
2306   return obj_type;
2307 }
2308
2309 /**
2310  * gst_base_sink_do_preroll:
2311  * @sink: the sink
2312  * @obj: (transfer none): the mini object that caused the preroll
2313  *
2314  * If the @sink spawns its own thread for pulling buffers from upstream it
2315  * should call this method after it has pulled a buffer. If the element needed
2316  * to preroll, this function will perform the preroll and will then block
2317  * until the element state is changed.
2318  *
2319  * This function should be called with the PREROLL_LOCK held.
2320  *
2321  * Returns: #GST_FLOW_OK if the preroll completed and processing can
2322  * continue. Any other return value should be returned from the render vmethod.
2323  *
2324  * Since: 0.10.22
2325  */
2326 GstFlowReturn
2327 gst_base_sink_do_preroll (GstBaseSink * sink, GstMiniObject * obj)
2328 {
2329   GstFlowReturn ret;
2330
2331   while (G_UNLIKELY (sink->need_preroll)) {
2332     guint8 obj_type;
2333     GST_DEBUG_OBJECT (sink, "prerolling object %p", obj);
2334
2335     obj_type = get_object_type (obj);
2336
2337     ret = gst_base_sink_preroll_object (sink, obj_type, obj);
2338     if (ret != GST_FLOW_OK)
2339       goto preroll_failed;
2340
2341     /* need to recheck here because the commit state could have
2342      * made us not need the preroll anymore */
2343     if (G_LIKELY (sink->need_preroll)) {
2344       /* block until the state changes, or we get a flush, or something */
2345       ret = gst_base_sink_wait_preroll (sink);
2346       if ((ret != GST_FLOW_OK) && (ret != GST_FLOW_STEP))
2347         goto preroll_failed;
2348     }
2349   }
2350   return GST_FLOW_OK;
2351
2352   /* ERRORS */
2353 preroll_failed:
2354   {
2355     GST_DEBUG_OBJECT (sink, "preroll failed: %s", gst_flow_get_name (ret));
2356     return ret;
2357   }
2358 }
2359
2360 /**
2361  * gst_base_sink_wait_eos:
2362  * @sink: the sink
2363  * @time: the running_time to be reached
2364  * @jitter: (out) (allow-none): the jitter to be filled with time diff, or NULL
2365  *
2366  * This function will block until @time is reached. It is usually called by
2367  * subclasses that use their own internal synchronisation but want to let the
2368  * EOS be handled by the base class.
2369  *
2370  * This function should only be called with the PREROLL_LOCK held, like when
2371  * receiving an EOS event in the ::event vmethod.
2372  *
2373  * The @time argument should be the running_time of when the EOS should happen
2374  * and will be adjusted with any latency and offset configured in the sink.
2375  *
2376  * Returns: #GstFlowReturn
2377  *
2378  * Since: 0.10.15
2379  */
2380 GstFlowReturn
2381 gst_base_sink_wait_eos (GstBaseSink * sink, GstClockTime time,
2382     GstClockTimeDiff * jitter)
2383 {
2384   GstClockReturn status;
2385   GstFlowReturn ret;
2386
2387   do {
2388     GstClockTime stime;
2389
2390     GST_DEBUG_OBJECT (sink, "checking preroll");
2391
2392     /* first wait for the playing state before we can continue */
2393     while (G_UNLIKELY (sink->need_preroll)) {
2394       ret = gst_base_sink_wait_preroll (sink);
2395       if ((ret != GST_FLOW_OK) && (ret != GST_FLOW_STEP))
2396         goto flushing;
2397     }
2398
2399     /* preroll done, we can sync since we are in PLAYING now. */
2400     GST_DEBUG_OBJECT (sink, "possibly waiting for clock to reach %"
2401         GST_TIME_FORMAT, GST_TIME_ARGS (time));
2402
2403     /* compensate for latency and ts_offset. We don't adjust for render delay
2404      * because we don't interact with the device on EOS normally. */
2405     stime = gst_base_sink_adjust_time (sink, time);
2406
2407     /* wait for the clock, this can be interrupted because we got shut down or
2408      * we PAUSED. */
2409     status = gst_base_sink_wait_clock (sink, stime, jitter);
2410
2411     GST_DEBUG_OBJECT (sink, "clock returned %d", status);
2412
2413     /* invalid time, no clock or sync disabled, just continue then */
2414     if (status == GST_CLOCK_BADTIME)
2415       break;
2416
2417     /* waiting could have been interrupted and we can be flushing now */
2418     if (G_UNLIKELY (sink->flushing))
2419       goto flushing;
2420
2421     /* retry if we got unscheduled, which means we did not reach the timeout
2422      * yet. if some other error occures, we continue. */
2423   } while (status == GST_CLOCK_UNSCHEDULED);
2424
2425   GST_DEBUG_OBJECT (sink, "end of stream");
2426
2427   return GST_FLOW_OK;
2428
2429   /* ERRORS */
2430 flushing:
2431   {
2432     GST_DEBUG_OBJECT (sink, "we are flushing");
2433     return GST_FLOW_WRONG_STATE;
2434   }
2435 }
2436
2437 /* with STREAM_LOCK, PREROLL_LOCK
2438  *
2439  * Make sure we are in PLAYING and synchronize an object to the clock.
2440  *
2441  * If we need preroll, we are not in PLAYING. We try to commit the state
2442  * if needed and then block if we still are not PLAYING.
2443  *
2444  * We start waiting on the clock in PLAYING. If we got interrupted, we
2445  * immediatly try to re-preroll.
2446  *
2447  * Some objects do not need synchronisation (most events) and so this function
2448  * immediatly returns GST_FLOW_OK.
2449  *
2450  * for objects that arrive later than max-lateness to be synchronized to the
2451  * clock have the @late boolean set to TRUE.
2452  *
2453  * This function keeps a running average of the jitter (the diff between the
2454  * clock time and the requested sync time). The jitter is negative for
2455  * objects that arrive in time and positive for late buffers.
2456  *
2457  * does not take ownership of obj.
2458  */
2459 static GstFlowReturn
2460 gst_base_sink_do_sync (GstBaseSink * basesink, GstPad * pad,
2461     GstMiniObject * obj, gboolean * late, gboolean * step_end, guint8 obj_type)
2462 {
2463   GstClockTimeDiff jitter = 0;
2464   gboolean syncable;
2465   GstClockReturn status = GST_CLOCK_OK;
2466   GstClockTime rstart, rstop, sstart, sstop, stime;
2467   gboolean do_sync;
2468   GstBaseSinkPrivate *priv;
2469   GstFlowReturn ret;
2470   GstStepInfo *current, *pending;
2471   gboolean stepped;
2472
2473   priv = basesink->priv;
2474
2475 do_step:
2476   sstart = sstop = rstart = rstop = GST_CLOCK_TIME_NONE;
2477   do_sync = TRUE;
2478   stepped = FALSE;
2479
2480   priv->current_rstart = GST_CLOCK_TIME_NONE;
2481
2482   /* get stepping info */
2483   current = &priv->current_step;
2484   pending = &priv->pending_step;
2485
2486   /* get timing information for this object against the render segment */
2487   syncable = gst_base_sink_get_sync_times (basesink, obj,
2488       &sstart, &sstop, &rstart, &rstop, &do_sync, &stepped, &basesink->segment,
2489       current, step_end, obj_type);
2490
2491   if (G_UNLIKELY (stepped))
2492     goto step_skipped;
2493
2494   /* a syncable object needs to participate in preroll and
2495    * clocking. All buffers and EOS are syncable. */
2496   if (G_UNLIKELY (!syncable))
2497     goto not_syncable;
2498
2499   /* store timing info for current object */
2500   priv->current_rstart = rstart;
2501   priv->current_rstop = (GST_CLOCK_TIME_IS_VALID (rstop) ? rstop : rstart);
2502
2503   /* save sync time for eos when the previous object needed sync */
2504   priv->eos_rtime = (do_sync ? priv->current_rstop : GST_CLOCK_TIME_NONE);
2505
2506   /* calculate inter frame spacing */
2507   if (G_UNLIKELY (priv->prev_rstart != -1 && priv->prev_rstart < rstart)) {
2508     GstClockTime in_diff;
2509
2510     in_diff = rstart - priv->prev_rstart;
2511
2512     if (priv->avg_in_diff == -1)
2513       priv->avg_in_diff = in_diff;
2514     else
2515       priv->avg_in_diff = UPDATE_RUNNING_AVG (priv->avg_in_diff, in_diff);
2516
2517     GST_LOG_OBJECT (basesink, "avg frame diff %" GST_TIME_FORMAT,
2518         GST_TIME_ARGS (priv->avg_in_diff));
2519
2520   }
2521   priv->prev_rstart = rstart;
2522
2523   if (G_UNLIKELY (priv->earliest_in_time != -1
2524           && rstart < priv->earliest_in_time))
2525     goto qos_dropped;
2526
2527 again:
2528   /* first do preroll, this makes sure we commit our state
2529    * to PAUSED and can continue to PLAYING. We cannot perform
2530    * any clock sync in PAUSED because there is no clock. */
2531   ret = gst_base_sink_do_preroll (basesink, obj);
2532   if (G_UNLIKELY (ret != GST_FLOW_OK))
2533     goto preroll_failed;
2534
2535   /* update the segment with a pending step if the current one is invalid and we
2536    * have a new pending one. We only accept new step updates after a preroll */
2537   if (G_UNLIKELY (pending->valid && !current->valid)) {
2538     start_stepping (basesink, &basesink->segment, pending, current);
2539     goto do_step;
2540   }
2541
2542   /* After rendering we store the position of the last buffer so that we can use
2543    * it to report the position. We need to take the lock here. */
2544   GST_OBJECT_LOCK (basesink);
2545   priv->current_sstart = sstart;
2546   priv->current_sstop = (GST_CLOCK_TIME_IS_VALID (sstop) ? sstop : sstart);
2547   GST_OBJECT_UNLOCK (basesink);
2548
2549   if (!do_sync)
2550     goto done;
2551
2552   /* adjust for latency */
2553   stime = gst_base_sink_adjust_time (basesink, rstart);
2554
2555   /* adjust for render-delay, avoid underflows */
2556   if (GST_CLOCK_TIME_IS_VALID (stime)) {
2557     if (stime > priv->render_delay)
2558       stime -= priv->render_delay;
2559     else
2560       stime = 0;
2561   }
2562
2563   /* preroll done, we can sync since we are in PLAYING now. */
2564   GST_DEBUG_OBJECT (basesink, "possibly waiting for clock to reach %"
2565       GST_TIME_FORMAT ", adjusted %" GST_TIME_FORMAT,
2566       GST_TIME_ARGS (rstart), GST_TIME_ARGS (stime));
2567
2568   /* This function will return immediatly if start == -1, no clock
2569    * or sync is disabled with GST_CLOCK_BADTIME. */
2570   status = gst_base_sink_wait_clock (basesink, stime, &jitter);
2571
2572   GST_DEBUG_OBJECT (basesink, "clock returned %d, jitter %c%" GST_TIME_FORMAT,
2573       status, (jitter < 0 ? '-' : ' '), GST_TIME_ARGS (ABS (jitter)));
2574
2575   /* invalid time, no clock or sync disabled, just render */
2576   if (status == GST_CLOCK_BADTIME)
2577     goto done;
2578
2579   /* waiting could have been interrupted and we can be flushing now */
2580   if (G_UNLIKELY (basesink->flushing))
2581     goto flushing;
2582
2583   /* check for unlocked by a state change, we are not flushing so
2584    * we can try to preroll on the current buffer. */
2585   if (G_UNLIKELY (status == GST_CLOCK_UNSCHEDULED)) {
2586     GST_DEBUG_OBJECT (basesink, "unscheduled, waiting some more");
2587     priv->call_preroll = TRUE;
2588     goto again;
2589   }
2590
2591   /* successful syncing done, record observation */
2592   priv->current_jitter = jitter;
2593
2594   /* check if the object should be dropped */
2595   *late = gst_base_sink_is_too_late (basesink, obj, rstart, rstop,
2596       status, jitter);
2597
2598 done:
2599   return GST_FLOW_OK;
2600
2601   /* ERRORS */
2602 step_skipped:
2603   {
2604     GST_DEBUG_OBJECT (basesink, "skipped stepped object %p", obj);
2605     *late = TRUE;
2606     return GST_FLOW_OK;
2607   }
2608 not_syncable:
2609   {
2610     GST_DEBUG_OBJECT (basesink, "non syncable object %p", obj);
2611     return GST_FLOW_OK;
2612   }
2613 qos_dropped:
2614   {
2615     GST_DEBUG_OBJECT (basesink, "dropped because of QoS %p", obj);
2616     *late = TRUE;
2617     return GST_FLOW_OK;
2618   }
2619 flushing:
2620   {
2621     GST_DEBUG_OBJECT (basesink, "we are flushing");
2622     return GST_FLOW_WRONG_STATE;
2623   }
2624 preroll_failed:
2625   {
2626     GST_DEBUG_OBJECT (basesink, "preroll failed");
2627     *step_end = FALSE;
2628     return ret;
2629   }
2630 }
2631
2632 static gboolean
2633 gst_base_sink_send_qos (GstBaseSink * basesink, GstQOSType type,
2634     gdouble proportion, GstClockTime time, GstClockTimeDiff diff)
2635 {
2636   GstEvent *event;
2637   gboolean res;
2638
2639   /* generate Quality-of-Service event */
2640   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, basesink,
2641       "qos: type %d, proportion: %lf, diff %" G_GINT64_FORMAT ", timestamp %"
2642       GST_TIME_FORMAT, type, proportion, diff, GST_TIME_ARGS (time));
2643
2644   event = gst_event_new_qos_full (type, proportion, diff, time);
2645
2646   /* send upstream */
2647   res = gst_pad_push_event (basesink->sinkpad, event);
2648
2649   return res;
2650 }
2651
2652 static void
2653 gst_base_sink_perform_qos (GstBaseSink * sink, gboolean dropped)
2654 {
2655   GstBaseSinkPrivate *priv;
2656   GstClockTime start, stop;
2657   GstClockTimeDiff jitter;
2658   GstClockTime pt, entered, left;
2659   GstClockTime duration;
2660   gdouble rate;
2661
2662   priv = sink->priv;
2663
2664   start = priv->current_rstart;
2665
2666   if (priv->current_step.valid)
2667     return;
2668
2669   /* if Quality-of-Service disabled, do nothing */
2670   if (!g_atomic_int_get (&priv->qos_enabled) ||
2671       !GST_CLOCK_TIME_IS_VALID (start))
2672     return;
2673
2674   stop = priv->current_rstop;
2675   jitter = priv->current_jitter;
2676
2677   if (jitter < 0) {
2678     /* this is the time the buffer entered the sink */
2679     if (start < -jitter)
2680       entered = 0;
2681     else
2682       entered = start + jitter;
2683     left = start;
2684   } else {
2685     /* this is the time the buffer entered the sink */
2686     entered = start + jitter;
2687     /* this is the time the buffer left the sink */
2688     left = start + jitter;
2689   }
2690
2691   /* calculate duration of the buffer */
2692   if (GST_CLOCK_TIME_IS_VALID (stop) && stop != start)
2693     duration = stop - start;
2694   else
2695     duration = priv->avg_in_diff;
2696
2697   /* if we have the time when the last buffer left us, calculate
2698    * processing time */
2699   if (GST_CLOCK_TIME_IS_VALID (priv->last_left)) {
2700     if (entered > priv->last_left) {
2701       pt = entered - priv->last_left;
2702     } else {
2703       pt = 0;
2704     }
2705   } else {
2706     pt = priv->avg_pt;
2707   }
2708
2709   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, sink, "start: %" GST_TIME_FORMAT
2710       ", stop %" GST_TIME_FORMAT ", entered %" GST_TIME_FORMAT ", left %"
2711       GST_TIME_FORMAT ", pt: %" GST_TIME_FORMAT ", duration %" GST_TIME_FORMAT
2712       ",jitter %" G_GINT64_FORMAT, GST_TIME_ARGS (start), GST_TIME_ARGS (stop),
2713       GST_TIME_ARGS (entered), GST_TIME_ARGS (left), GST_TIME_ARGS (pt),
2714       GST_TIME_ARGS (duration), jitter);
2715
2716   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, sink, "avg_duration: %" GST_TIME_FORMAT
2717       ", avg_pt: %" GST_TIME_FORMAT ", avg_rate: %g",
2718       GST_TIME_ARGS (priv->avg_duration), GST_TIME_ARGS (priv->avg_pt),
2719       priv->avg_rate);
2720
2721   /* collect running averages. for first observations, we copy the
2722    * values */
2723   if (!GST_CLOCK_TIME_IS_VALID (priv->avg_duration))
2724     priv->avg_duration = duration;
2725   else
2726     priv->avg_duration = UPDATE_RUNNING_AVG (priv->avg_duration, duration);
2727
2728   if (!GST_CLOCK_TIME_IS_VALID (priv->avg_pt))
2729     priv->avg_pt = pt;
2730   else
2731     priv->avg_pt = UPDATE_RUNNING_AVG (priv->avg_pt, pt);
2732
2733   if (priv->avg_duration != 0)
2734     rate =
2735         gst_guint64_to_gdouble (priv->avg_pt) /
2736         gst_guint64_to_gdouble (priv->avg_duration);
2737   else
2738     rate = 1.0;
2739
2740   if (GST_CLOCK_TIME_IS_VALID (priv->last_left)) {
2741     if (dropped || priv->avg_rate < 0.0) {
2742       priv->avg_rate = rate;
2743     } else {
2744       if (rate > 1.0)
2745         priv->avg_rate = UPDATE_RUNNING_AVG_N (priv->avg_rate, rate);
2746       else
2747         priv->avg_rate = UPDATE_RUNNING_AVG_P (priv->avg_rate, rate);
2748     }
2749   }
2750
2751   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, sink,
2752       "updated: avg_duration: %" GST_TIME_FORMAT ", avg_pt: %" GST_TIME_FORMAT
2753       ", avg_rate: %g", GST_TIME_ARGS (priv->avg_duration),
2754       GST_TIME_ARGS (priv->avg_pt), priv->avg_rate);
2755
2756
2757   if (priv->avg_rate >= 0.0) {
2758     GstQOSType type;
2759     GstClockTimeDiff diff;
2760
2761     /* if we have a valid rate, start sending QoS messages */
2762     if (priv->current_jitter < 0) {
2763       /* make sure we never go below 0 when adding the jitter to the
2764        * timestamp. */
2765       if (priv->current_rstart < -priv->current_jitter)
2766         priv->current_jitter = -priv->current_rstart;
2767     }
2768
2769     if (priv->throttle_time > 0) {
2770       diff = priv->throttle_time;
2771       type = GST_QOS_TYPE_THROTTLE;
2772     } else {
2773       diff = priv->current_jitter;
2774       if (diff <= 0)
2775         type = GST_QOS_TYPE_OVERFLOW;
2776       else
2777         type = GST_QOS_TYPE_UNDERFLOW;
2778     }
2779
2780     gst_base_sink_send_qos (sink, type, priv->avg_rate, priv->current_rstart,
2781         diff);
2782   }
2783
2784   /* record when this buffer will leave us */
2785   priv->last_left = left;
2786 }
2787
2788 /* reset all qos measuring */
2789 static void
2790 gst_base_sink_reset_qos (GstBaseSink * sink)
2791 {
2792   GstBaseSinkPrivate *priv;
2793
2794   priv = sink->priv;
2795
2796   priv->last_render_time = GST_CLOCK_TIME_NONE;
2797   priv->prev_rstart = GST_CLOCK_TIME_NONE;
2798   priv->earliest_in_time = GST_CLOCK_TIME_NONE;
2799   priv->last_left = GST_CLOCK_TIME_NONE;
2800   priv->avg_duration = GST_CLOCK_TIME_NONE;
2801   priv->avg_pt = GST_CLOCK_TIME_NONE;
2802   priv->avg_rate = -1.0;
2803   priv->avg_render = GST_CLOCK_TIME_NONE;
2804   priv->avg_in_diff = GST_CLOCK_TIME_NONE;
2805   priv->rendered = 0;
2806   priv->dropped = 0;
2807
2808 }
2809
2810 /* Checks if the object was scheduled too late.
2811  *
2812  * rstart/rstop contain the running_time start and stop values
2813  * of the object.
2814  *
2815  * status and jitter contain the return values from the clock wait.
2816  *
2817  * returns TRUE if the buffer was too late.
2818  */
2819 static gboolean
2820 gst_base_sink_is_too_late (GstBaseSink * basesink, GstMiniObject * obj,
2821     GstClockTime rstart, GstClockTime rstop,
2822     GstClockReturn status, GstClockTimeDiff jitter)
2823 {
2824   gboolean late;
2825   gint64 max_lateness;
2826   GstBaseSinkPrivate *priv;
2827
2828   priv = basesink->priv;
2829
2830   late = FALSE;
2831
2832   /* only for objects that were too late */
2833   if (G_LIKELY (status != GST_CLOCK_EARLY))
2834     goto in_time;
2835
2836   max_lateness = basesink->abidata.ABI.max_lateness;
2837
2838   /* check if frame dropping is enabled */
2839   if (max_lateness == -1)
2840     goto no_drop;
2841
2842   /* only check for buffers */
2843   if (G_UNLIKELY (!GST_IS_BUFFER (obj)))
2844     goto not_buffer;
2845
2846   /* can't do check if we don't have a timestamp */
2847   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (rstart)))
2848     goto no_timestamp;
2849
2850   /* we can add a valid stop time */
2851   if (GST_CLOCK_TIME_IS_VALID (rstop))
2852     max_lateness += rstop;
2853   else {
2854     max_lateness += rstart;
2855     /* no stop time, use avg frame diff */
2856     if (priv->avg_in_diff != -1)
2857       max_lateness += priv->avg_in_diff;
2858   }
2859
2860   /* if the jitter bigger than duration and lateness we are too late */
2861   if ((late = rstart + jitter > max_lateness)) {
2862     GST_CAT_DEBUG_OBJECT (GST_CAT_PERFORMANCE, basesink,
2863         "buffer is too late %" GST_TIME_FORMAT
2864         " > %" GST_TIME_FORMAT, GST_TIME_ARGS (rstart + jitter),
2865         GST_TIME_ARGS (max_lateness));
2866     /* !!emergency!!, if we did not receive anything valid for more than a
2867      * second, render it anyway so the user sees something */
2868     if (GST_CLOCK_TIME_IS_VALID (priv->last_render_time) &&
2869         rstart - priv->last_render_time > GST_SECOND) {
2870       late = FALSE;
2871       GST_ELEMENT_WARNING (basesink, CORE, CLOCK,
2872           (_("A lot of buffers are being dropped.")),
2873           ("There may be a timestamping problem, or this computer is too slow."));
2874       GST_CAT_DEBUG_OBJECT (GST_CAT_PERFORMANCE, basesink,
2875           "**emergency** last buffer at %" GST_TIME_FORMAT " > GST_SECOND",
2876           GST_TIME_ARGS (priv->last_render_time));
2877     }
2878   }
2879
2880 done:
2881   if (!late || !GST_CLOCK_TIME_IS_VALID (priv->last_render_time)) {
2882     priv->last_render_time = rstart;
2883     /* the next allowed input timestamp */
2884     if (priv->throttle_time > 0)
2885       priv->earliest_in_time = rstart + priv->throttle_time;
2886   }
2887   return late;
2888
2889   /* all is fine */
2890 in_time:
2891   {
2892     GST_DEBUG_OBJECT (basesink, "object was scheduled in time");
2893     goto done;
2894   }
2895 no_drop:
2896   {
2897     GST_DEBUG_OBJECT (basesink, "frame dropping disabled");
2898     goto done;
2899   }
2900 not_buffer:
2901   {
2902     GST_DEBUG_OBJECT (basesink, "object is not a buffer");
2903     return FALSE;
2904   }
2905 no_timestamp:
2906   {
2907     GST_DEBUG_OBJECT (basesink, "buffer has no timestamp");
2908     return FALSE;
2909   }
2910 }
2911
2912 /* called before and after calling the render vmethod. It keeps track of how
2913  * much time was spent in the render method and is used to check if we are
2914  * flooded */
2915 static void
2916 gst_base_sink_do_render_stats (GstBaseSink * basesink, gboolean start)
2917 {
2918   GstBaseSinkPrivate *priv;
2919
2920   priv = basesink->priv;
2921
2922   if (start) {
2923     priv->start = gst_util_get_timestamp ();
2924   } else {
2925     GstClockTime elapsed;
2926
2927     priv->stop = gst_util_get_timestamp ();
2928
2929     elapsed = GST_CLOCK_DIFF (priv->start, priv->stop);
2930
2931     if (!GST_CLOCK_TIME_IS_VALID (priv->avg_render))
2932       priv->avg_render = elapsed;
2933     else
2934       priv->avg_render = UPDATE_RUNNING_AVG (priv->avg_render, elapsed);
2935
2936     GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, basesink,
2937         "avg_render: %" GST_TIME_FORMAT, GST_TIME_ARGS (priv->avg_render));
2938   }
2939 }
2940
2941 /* with STREAM_LOCK, PREROLL_LOCK,
2942  *
2943  * Synchronize the object on the clock and then render it.
2944  *
2945  * takes ownership of obj.
2946  */
2947 static GstFlowReturn
2948 gst_base_sink_render_object (GstBaseSink * basesink, GstPad * pad,
2949     guint8 obj_type, gpointer obj)
2950 {
2951   GstFlowReturn ret;
2952   GstBaseSinkClass *bclass;
2953   gboolean late, step_end;
2954   gpointer sync_obj;
2955   GstBaseSinkPrivate *priv;
2956
2957   priv = basesink->priv;
2958
2959   if (OBJ_IS_BUFFERLIST (obj_type)) {
2960     /*
2961      * If buffer list, use the first group buffer within the list
2962      * for syncing
2963      */
2964     sync_obj = gst_buffer_list_get (GST_BUFFER_LIST_CAST (obj), 0, 0);
2965     g_assert (NULL != sync_obj);
2966   } else {
2967     sync_obj = obj;
2968   }
2969
2970 again:
2971   late = FALSE;
2972   step_end = FALSE;
2973
2974   /* synchronize this object, non syncable objects return OK
2975    * immediatly. */
2976   ret =
2977       gst_base_sink_do_sync (basesink, pad, sync_obj, &late, &step_end,
2978       obj_type);
2979   if (G_UNLIKELY (ret != GST_FLOW_OK))
2980     goto sync_failed;
2981
2982   /* and now render, event or buffer/buffer list. */
2983   if (G_LIKELY (OBJ_IS_BUFFERFULL (obj_type))) {
2984     /* drop late buffers unconditionally, let's hope it's unlikely */
2985     if (G_UNLIKELY (late))
2986       goto dropped;
2987
2988     bclass = GST_BASE_SINK_GET_CLASS (basesink);
2989
2990     if (G_LIKELY ((OBJ_IS_BUFFERLIST (obj_type) && bclass->render_list) ||
2991             (!OBJ_IS_BUFFERLIST (obj_type) && bclass->render))) {
2992       gint do_qos;
2993
2994       /* read once, to get same value before and after */
2995       do_qos = g_atomic_int_get (&priv->qos_enabled);
2996
2997       GST_DEBUG_OBJECT (basesink, "rendering object %p", obj);
2998
2999       /* record rendering time for QoS and stats */
3000       if (do_qos)
3001         gst_base_sink_do_render_stats (basesink, TRUE);
3002
3003       if (!OBJ_IS_BUFFERLIST (obj_type)) {
3004         GstBuffer *buf;
3005
3006         /* For buffer lists do not set last buffer. Creating buffer
3007          * with meaningful data can be done only with memcpy which will
3008          * significantly affect performance */
3009         buf = GST_BUFFER_CAST (obj);
3010         gst_base_sink_set_last_buffer (basesink, buf);
3011
3012         ret = bclass->render (basesink, buf);
3013       } else {
3014         GstBufferList *buflist;
3015
3016         buflist = GST_BUFFER_LIST_CAST (obj);
3017
3018         ret = bclass->render_list (basesink, buflist);
3019       }
3020
3021       if (do_qos)
3022         gst_base_sink_do_render_stats (basesink, FALSE);
3023
3024       if (ret == GST_FLOW_STEP)
3025         goto again;
3026
3027       if (G_UNLIKELY (basesink->flushing))
3028         goto flushing;
3029
3030       priv->rendered++;
3031     }
3032   } else if (G_LIKELY (OBJ_IS_EVENT (obj_type))) {
3033     GstEvent *event = GST_EVENT_CAST (obj);
3034     gboolean event_res = TRUE;
3035     GstEventType type;
3036
3037     bclass = GST_BASE_SINK_GET_CLASS (basesink);
3038
3039     type = GST_EVENT_TYPE (event);
3040
3041     GST_DEBUG_OBJECT (basesink, "rendering event %p, type %s", obj,
3042         gst_event_type_get_name (type));
3043
3044     if (bclass->event)
3045       event_res = bclass->event (basesink, event);
3046
3047     /* when we get here we could be flushing again when the event handler calls
3048      * _wait_eos(). We have to ignore this object in that case. */
3049     if (G_UNLIKELY (basesink->flushing))
3050       goto flushing;
3051
3052     if (G_LIKELY (event_res)) {
3053       guint32 seqnum;
3054
3055       seqnum = basesink->priv->seqnum = gst_event_get_seqnum (event);
3056       GST_DEBUG_OBJECT (basesink, "Got seqnum #%" G_GUINT32_FORMAT, seqnum);
3057
3058       switch (type) {
3059         case GST_EVENT_EOS:
3060         {
3061           GstMessage *message;
3062
3063           /* the EOS event is completely handled so we mark
3064            * ourselves as being in the EOS state. eos is also
3065            * protected by the object lock so we can read it when
3066            * answering the POSITION query. */
3067           GST_OBJECT_LOCK (basesink);
3068           basesink->eos = TRUE;
3069           GST_OBJECT_UNLOCK (basesink);
3070
3071           /* ok, now we can post the message */
3072           GST_DEBUG_OBJECT (basesink, "Now posting EOS");
3073
3074           message = gst_message_new_eos (GST_OBJECT_CAST (basesink));
3075           gst_message_set_seqnum (message, seqnum);
3076           gst_element_post_message (GST_ELEMENT_CAST (basesink), message);
3077           break;
3078         }
3079         case GST_EVENT_NEWSEGMENT:
3080           /* configure the segment */
3081           gst_base_sink_configure_segment (basesink, pad, event,
3082               &basesink->segment);
3083           break;
3084         case GST_EVENT_SINK_MESSAGE:{
3085           GstMessage *msg = NULL;
3086
3087           gst_event_parse_sink_message (event, &msg);
3088
3089           if (msg)
3090             gst_element_post_message (GST_ELEMENT_CAST (basesink), msg);
3091         }
3092         default:
3093           break;
3094       }
3095     }
3096   } else {
3097     g_return_val_if_reached (GST_FLOW_ERROR);
3098   }
3099
3100 done:
3101   if (step_end) {
3102     /* the step ended, check if we need to activate a new step */
3103     GST_DEBUG_OBJECT (basesink, "step ended");
3104     stop_stepping (basesink, &basesink->segment, &priv->current_step,
3105         priv->current_rstart, priv->current_rstop, basesink->eos);
3106     goto again;
3107   }
3108
3109   gst_base_sink_perform_qos (basesink, late);
3110
3111   GST_DEBUG_OBJECT (basesink, "object unref after render %p", obj);
3112   gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3113   return ret;
3114
3115   /* ERRORS */
3116 sync_failed:
3117   {
3118     GST_DEBUG_OBJECT (basesink, "do_sync returned %s", gst_flow_get_name (ret));
3119     goto done;
3120   }
3121 dropped:
3122   {
3123     priv->dropped++;
3124     GST_DEBUG_OBJECT (basesink, "buffer late, dropping");
3125
3126     if (g_atomic_int_get (&priv->qos_enabled)) {
3127       GstMessage *qos_msg;
3128       GstClockTime timestamp, duration;
3129
3130       timestamp = GST_BUFFER_TIMESTAMP (GST_BUFFER_CAST (sync_obj));
3131       duration = GST_BUFFER_DURATION (GST_BUFFER_CAST (sync_obj));
3132
3133       GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, basesink,
3134           "qos: dropped buffer rt %" GST_TIME_FORMAT ", st %" GST_TIME_FORMAT
3135           ", ts %" GST_TIME_FORMAT ", dur %" GST_TIME_FORMAT,
3136           GST_TIME_ARGS (priv->current_rstart),
3137           GST_TIME_ARGS (priv->current_sstart), GST_TIME_ARGS (timestamp),
3138           GST_TIME_ARGS (duration));
3139       GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, basesink,
3140           "qos: rendered %" G_GUINT64_FORMAT ", dropped %" G_GUINT64_FORMAT,
3141           priv->rendered, priv->dropped);
3142
3143       qos_msg =
3144           gst_message_new_qos (GST_OBJECT_CAST (basesink), basesink->sync,
3145           priv->current_rstart, priv->current_sstart, timestamp, duration);
3146       gst_message_set_qos_values (qos_msg, priv->current_jitter, priv->avg_rate,
3147           1000000);
3148       gst_message_set_qos_stats (qos_msg, GST_FORMAT_BUFFERS, priv->rendered,
3149           priv->dropped);
3150       gst_element_post_message (GST_ELEMENT_CAST (basesink), qos_msg);
3151     }
3152     goto done;
3153   }
3154 flushing:
3155   {
3156     GST_DEBUG_OBJECT (basesink, "we are flushing, ignore object");
3157     gst_mini_object_unref (obj);
3158     return GST_FLOW_WRONG_STATE;
3159   }
3160 }
3161
3162 /* with STREAM_LOCK, PREROLL_LOCK
3163  *
3164  * Perform preroll on the given object. For buffers this means
3165  * calling the preroll subclass method.
3166  * If that succeeds, the state will be commited.
3167  *
3168  * function does not take ownership of obj.
3169  */
3170 static GstFlowReturn
3171 gst_base_sink_preroll_object (GstBaseSink * basesink, guint8 obj_type,
3172     GstMiniObject * obj)
3173 {
3174   GstFlowReturn ret;
3175
3176   GST_DEBUG_OBJECT (basesink, "prerolling object %p", obj);
3177
3178   /* if it's a buffer, we need to call the preroll method */
3179   if (G_LIKELY (OBJ_IS_BUFFERFULL (obj_type) && basesink->priv->call_preroll)) {
3180     GstBaseSinkClass *bclass;
3181     GstBuffer *buf;
3182     GstClockTime timestamp;
3183
3184     if (OBJ_IS_BUFFERLIST (obj_type)) {
3185       buf = gst_buffer_list_get (GST_BUFFER_LIST_CAST (obj), 0, 0);
3186       g_assert (NULL != buf);
3187     } else {
3188       buf = GST_BUFFER_CAST (obj);
3189     }
3190
3191     timestamp = GST_BUFFER_TIMESTAMP (buf);
3192
3193     GST_DEBUG_OBJECT (basesink, "preroll buffer %" GST_TIME_FORMAT,
3194         GST_TIME_ARGS (timestamp));
3195
3196     /*
3197      * For buffer lists do not set last buffer. Creating buffer
3198      * with meaningful data can be done only with memcpy which will
3199      * significantly affect performance
3200      */
3201     if (!OBJ_IS_BUFFERLIST (obj_type)) {
3202       gst_base_sink_set_last_buffer (basesink, buf);
3203     }
3204
3205     bclass = GST_BASE_SINK_GET_CLASS (basesink);
3206     if (bclass->preroll)
3207       if ((ret = bclass->preroll (basesink, buf)) != GST_FLOW_OK)
3208         goto preroll_failed;
3209
3210     basesink->priv->call_preroll = FALSE;
3211   }
3212
3213   /* commit state */
3214   if (G_LIKELY (basesink->playing_async)) {
3215     if (G_UNLIKELY (!gst_base_sink_commit_state (basesink)))
3216       goto stopping;
3217   }
3218
3219   return GST_FLOW_OK;
3220
3221   /* ERRORS */
3222 preroll_failed:
3223   {
3224     GST_DEBUG_OBJECT (basesink, "preroll failed, abort state");
3225     gst_element_abort_state (GST_ELEMENT_CAST (basesink));
3226     return ret;
3227   }
3228 stopping:
3229   {
3230     GST_DEBUG_OBJECT (basesink, "stopping while commiting state");
3231     return GST_FLOW_WRONG_STATE;
3232   }
3233 }
3234
3235 /* with STREAM_LOCK, PREROLL_LOCK
3236  *
3237  * Queue an object for rendering.
3238  * The first prerollable object queued will complete the preroll. If the
3239  * preroll queue if filled, we render all the objects in the queue.
3240  *
3241  * This function takes ownership of the object.
3242  */
3243 static GstFlowReturn
3244 gst_base_sink_queue_object_unlocked (GstBaseSink * basesink, GstPad * pad,
3245     guint8 obj_type, gpointer obj, gboolean prerollable)
3246 {
3247   GstFlowReturn ret = GST_FLOW_OK;
3248   gint length;
3249   GQueue *q;
3250
3251   if (G_UNLIKELY (basesink->need_preroll)) {
3252     if (G_LIKELY (prerollable))
3253       basesink->preroll_queued++;
3254
3255     length = basesink->preroll_queued;
3256
3257     GST_DEBUG_OBJECT (basesink, "now %d prerolled items", length);
3258
3259     /* first prerollable item needs to finish the preroll */
3260     if (length == 1) {
3261       ret = gst_base_sink_preroll_object (basesink, obj_type, obj);
3262       if (G_UNLIKELY (ret != GST_FLOW_OK))
3263         goto preroll_failed;
3264     }
3265     /* need to recheck if we need preroll, commmit state during preroll
3266      * could have made us not need more preroll. */
3267     if (G_UNLIKELY (basesink->need_preroll)) {
3268       /* see if we can render now, if we can't add the object to the preroll
3269        * queue. */
3270       if (G_UNLIKELY (length <= basesink->preroll_queue_max_len))
3271         goto more_preroll;
3272     }
3273   }
3274   /* we can start rendering (or blocking) the queued object
3275    * if any. */
3276   q = basesink->preroll_queue;
3277   while (G_UNLIKELY (!g_queue_is_empty (q))) {
3278     GstMiniObject *o;
3279     guint8 ot;
3280
3281     o = g_queue_pop_head (q);
3282     GST_DEBUG_OBJECT (basesink, "rendering queued object %p", o);
3283
3284     ot = get_object_type (o);
3285
3286     /* do something with the return value */
3287     ret = gst_base_sink_render_object (basesink, pad, ot, o);
3288     if (ret != GST_FLOW_OK)
3289       goto dequeue_failed;
3290   }
3291
3292   /* now render the object */
3293   ret = gst_base_sink_render_object (basesink, pad, obj_type, obj);
3294   basesink->preroll_queued = 0;
3295
3296   return ret;
3297
3298   /* special cases */
3299 preroll_failed:
3300   {
3301     GST_DEBUG_OBJECT (basesink, "preroll failed, reason %s",
3302         gst_flow_get_name (ret));
3303     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3304     return ret;
3305   }
3306 more_preroll:
3307   {
3308     /* add object to the queue and return */
3309     GST_DEBUG_OBJECT (basesink, "need more preroll data %d <= %d",
3310         length, basesink->preroll_queue_max_len);
3311     g_queue_push_tail (basesink->preroll_queue, obj);
3312     return GST_FLOW_OK;
3313   }
3314 dequeue_failed:
3315   {
3316     GST_DEBUG_OBJECT (basesink, "rendering queued objects failed, reason %s",
3317         gst_flow_get_name (ret));
3318     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3319     return ret;
3320   }
3321 }
3322
3323 /* with STREAM_LOCK
3324  *
3325  * This function grabs the PREROLL_LOCK and adds the object to
3326  * the queue.
3327  *
3328  * This function takes ownership of obj.
3329  *
3330  * Note: Only GstEvent seem to be passed to this private method
3331  */
3332 static GstFlowReturn
3333 gst_base_sink_queue_object (GstBaseSink * basesink, GstPad * pad,
3334     GstMiniObject * obj, gboolean prerollable)
3335 {
3336   GstFlowReturn ret;
3337
3338   GST_PAD_PREROLL_LOCK (pad);
3339   if (G_UNLIKELY (basesink->flushing))
3340     goto flushing;
3341
3342   if (G_UNLIKELY (basesink->priv->received_eos))
3343     goto was_eos;
3344
3345   ret =
3346       gst_base_sink_queue_object_unlocked (basesink, pad, _PR_IS_EVENT, obj,
3347       prerollable);
3348   GST_PAD_PREROLL_UNLOCK (pad);
3349
3350   return ret;
3351
3352   /* ERRORS */
3353 flushing:
3354   {
3355     GST_DEBUG_OBJECT (basesink, "sink is flushing");
3356     GST_PAD_PREROLL_UNLOCK (pad);
3357     gst_mini_object_unref (obj);
3358     return GST_FLOW_WRONG_STATE;
3359   }
3360 was_eos:
3361   {
3362     GST_DEBUG_OBJECT (basesink,
3363         "we are EOS, dropping object, return UNEXPECTED");
3364     GST_PAD_PREROLL_UNLOCK (pad);
3365     gst_mini_object_unref (obj);
3366     return GST_FLOW_UNEXPECTED;
3367   }
3368 }
3369
3370 static void
3371 gst_base_sink_flush_start (GstBaseSink * basesink, GstPad * pad)
3372 {
3373   /* make sure we are not blocked on the clock also clear any pending
3374    * eos state. */
3375   gst_base_sink_set_flushing (basesink, pad, TRUE);
3376
3377   /* we grab the stream lock but that is not needed since setting the
3378    * sink to flushing would make sure no state commit is being done
3379    * anymore */
3380   GST_PAD_STREAM_LOCK (pad);
3381   gst_base_sink_reset_qos (basesink);
3382   /* and we need to commit our state again on the next
3383    * prerolled buffer */
3384   basesink->playing_async = TRUE;
3385   if (basesink->priv->async_enabled) {
3386     gst_element_lost_state (GST_ELEMENT_CAST (basesink));
3387   } else {
3388     basesink->priv->have_latency = TRUE;
3389   }
3390   gst_base_sink_set_last_buffer (basesink, NULL);
3391   GST_PAD_STREAM_UNLOCK (pad);
3392 }
3393
3394 static void
3395 gst_base_sink_flush_stop (GstBaseSink * basesink, GstPad * pad)
3396 {
3397   /* unset flushing so we can accept new data, this also flushes out any EOS
3398    * event. */
3399   gst_base_sink_set_flushing (basesink, pad, FALSE);
3400
3401   /* for position reporting */
3402   GST_OBJECT_LOCK (basesink);
3403   basesink->priv->current_sstart = GST_CLOCK_TIME_NONE;
3404   basesink->priv->current_sstop = GST_CLOCK_TIME_NONE;
3405   basesink->priv->eos_rtime = GST_CLOCK_TIME_NONE;
3406   basesink->priv->call_preroll = TRUE;
3407   basesink->priv->current_step.valid = FALSE;
3408   basesink->priv->pending_step.valid = FALSE;
3409   if (basesink->pad_mode == GST_ACTIVATE_PUSH) {
3410     /* we need new segment info after the flush. */
3411     basesink->have_newsegment = FALSE;
3412     gst_segment_init (&basesink->segment, GST_FORMAT_UNDEFINED);
3413     gst_segment_init (basesink->abidata.ABI.clip_segment, GST_FORMAT_UNDEFINED);
3414   }
3415   GST_OBJECT_UNLOCK (basesink);
3416 }
3417
3418 static gboolean
3419 gst_base_sink_event (GstPad * pad, GstEvent * event)
3420 {
3421   GstBaseSink *basesink;
3422   gboolean result = TRUE;
3423   GstBaseSinkClass *bclass;
3424
3425   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
3426   if (G_UNLIKELY (basesink == NULL)) {
3427     gst_event_unref (event);
3428     return FALSE;
3429   }
3430
3431   bclass = GST_BASE_SINK_GET_CLASS (basesink);
3432
3433   GST_DEBUG_OBJECT (basesink, "received event %p %" GST_PTR_FORMAT, event,
3434       event);
3435
3436   switch (GST_EVENT_TYPE (event)) {
3437     case GST_EVENT_EOS:
3438     {
3439       GstFlowReturn ret;
3440
3441       GST_PAD_PREROLL_LOCK (pad);
3442       if (G_UNLIKELY (basesink->flushing))
3443         goto flushing;
3444
3445       if (G_UNLIKELY (basesink->priv->received_eos)) {
3446         /* we can't accept anything when we are EOS */
3447         result = FALSE;
3448         gst_event_unref (event);
3449       } else {
3450         /* we set the received EOS flag here so that we can use it when testing if
3451          * we are prerolled and to refuse more buffers. */
3452         basesink->priv->received_eos = TRUE;
3453
3454         /* EOS is a prerollable object, we call the unlocked version because it
3455          * does not check the received_eos flag. */
3456         ret = gst_base_sink_queue_object_unlocked (basesink, pad,
3457             _PR_IS_EVENT, GST_MINI_OBJECT_CAST (event), TRUE);
3458         if (G_UNLIKELY (ret != GST_FLOW_OK))
3459           result = FALSE;
3460       }
3461       GST_PAD_PREROLL_UNLOCK (pad);
3462       break;
3463     }
3464     case GST_EVENT_NEWSEGMENT:
3465     {
3466       GstFlowReturn ret;
3467       gboolean update;
3468
3469       GST_DEBUG_OBJECT (basesink, "newsegment %p", event);
3470
3471       GST_PAD_PREROLL_LOCK (pad);
3472       if (G_UNLIKELY (basesink->flushing))
3473         goto flushing;
3474
3475       gst_event_parse_new_segment_full (event, &update, NULL, NULL, NULL, NULL,
3476           NULL, NULL);
3477
3478       if (G_UNLIKELY (basesink->priv->received_eos && !update)) {
3479         /* we can't accept anything when we are EOS */
3480         result = FALSE;
3481         gst_event_unref (event);
3482       } else {
3483         /* the new segment is a non prerollable item and does not block anything,
3484          * we need to configure the current clipping segment and insert the event
3485          * in the queue to serialize it with the buffers for rendering. */
3486         gst_base_sink_configure_segment (basesink, pad, event,
3487             basesink->abidata.ABI.clip_segment);
3488
3489         ret =
3490             gst_base_sink_queue_object_unlocked (basesink, pad,
3491             _PR_IS_EVENT, GST_MINI_OBJECT_CAST (event), FALSE);
3492         if (G_UNLIKELY (ret != GST_FLOW_OK))
3493           result = FALSE;
3494         else {
3495           GST_OBJECT_LOCK (basesink);
3496           basesink->have_newsegment = TRUE;
3497           GST_OBJECT_UNLOCK (basesink);
3498         }
3499       }
3500       GST_PAD_PREROLL_UNLOCK (pad);
3501       break;
3502     }
3503     case GST_EVENT_FLUSH_START:
3504       if (bclass->event)
3505         bclass->event (basesink, event);
3506
3507       GST_DEBUG_OBJECT (basesink, "flush-start %p", event);
3508
3509       gst_base_sink_flush_start (basesink, pad);
3510
3511       gst_event_unref (event);
3512       break;
3513     case GST_EVENT_FLUSH_STOP:
3514       if (bclass->event)
3515         bclass->event (basesink, event);
3516
3517       GST_DEBUG_OBJECT (basesink, "flush-stop %p", event);
3518
3519       gst_base_sink_flush_stop (basesink, pad);
3520
3521       gst_event_unref (event);
3522       break;
3523     default:
3524       /* other events are sent to queue or subclass depending on if they
3525        * are serialized. */
3526       if (GST_EVENT_IS_SERIALIZED (event)) {
3527         gst_base_sink_queue_object (basesink, pad,
3528             GST_MINI_OBJECT_CAST (event), FALSE);
3529       } else {
3530         if (bclass->event)
3531           bclass->event (basesink, event);
3532         gst_event_unref (event);
3533       }
3534       break;
3535   }
3536 done:
3537   gst_object_unref (basesink);
3538
3539   return result;
3540
3541   /* ERRORS */
3542 flushing:
3543   {
3544     GST_DEBUG_OBJECT (basesink, "we are flushing");
3545     GST_PAD_PREROLL_UNLOCK (pad);
3546     result = FALSE;
3547     gst_event_unref (event);
3548     goto done;
3549   }
3550 }
3551
3552 /* default implementation to calculate the start and end
3553  * timestamps on a buffer, subclasses can override
3554  */
3555 static void
3556 gst_base_sink_get_times (GstBaseSink * basesink, GstBuffer * buffer,
3557     GstClockTime * start, GstClockTime * end)
3558 {
3559   GstClockTime timestamp, duration;
3560
3561   timestamp = GST_BUFFER_TIMESTAMP (buffer);
3562   if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
3563
3564     /* get duration to calculate end time */
3565     duration = GST_BUFFER_DURATION (buffer);
3566     if (GST_CLOCK_TIME_IS_VALID (duration)) {
3567       *end = timestamp + duration;
3568     }
3569     *start = timestamp;
3570   }
3571 }
3572
3573 /* must be called with PREROLL_LOCK */
3574 static gboolean
3575 gst_base_sink_needs_preroll (GstBaseSink * basesink)
3576 {
3577   gboolean is_prerolled, res;
3578
3579   /* we have 2 cases where the PREROLL_LOCK is released:
3580    *  1) we are blocking in the PREROLL_LOCK and thus are prerolled.
3581    *  2) we are syncing on the clock
3582    */
3583   is_prerolled = basesink->have_preroll || basesink->priv->received_eos;
3584   res = !is_prerolled;
3585
3586   GST_DEBUG_OBJECT (basesink, "have_preroll: %d, EOS: %d => needs preroll: %d",
3587       basesink->have_preroll, basesink->priv->received_eos, res);
3588
3589   return res;
3590 }
3591
3592 /* with STREAM_LOCK, PREROLL_LOCK
3593  *
3594  * Takes a buffer and compare the timestamps with the last segment.
3595  * If the buffer falls outside of the segment boundaries, drop it.
3596  * Else queue the buffer for preroll and rendering.
3597  *
3598  * This function takes ownership of the buffer.
3599  */
3600 static GstFlowReturn
3601 gst_base_sink_chain_unlocked (GstBaseSink * basesink, GstPad * pad,
3602     guint8 obj_type, gpointer obj)
3603 {
3604   GstBaseSinkClass *bclass;
3605   GstFlowReturn result;
3606   GstClockTime start = GST_CLOCK_TIME_NONE, end = GST_CLOCK_TIME_NONE;
3607   GstSegment *clip_segment;
3608   GstBuffer *time_buf;
3609
3610   if (G_UNLIKELY (basesink->flushing))
3611     goto flushing;
3612
3613   if (G_UNLIKELY (basesink->priv->received_eos))
3614     goto was_eos;
3615
3616   if (OBJ_IS_BUFFERLIST (obj_type)) {
3617     time_buf = gst_buffer_list_get (GST_BUFFER_LIST_CAST (obj), 0, 0);
3618     g_assert (NULL != time_buf);
3619   } else {
3620     time_buf = GST_BUFFER_CAST (obj);
3621   }
3622
3623   /* for code clarity */
3624   clip_segment = basesink->abidata.ABI.clip_segment;
3625
3626   if (G_UNLIKELY (!basesink->have_newsegment)) {
3627     gboolean sync;
3628
3629     sync = gst_base_sink_get_sync (basesink);
3630     if (sync) {
3631       GST_ELEMENT_WARNING (basesink, STREAM, FAILED,
3632           (_("Internal data flow problem.")),
3633           ("Received buffer without a new-segment. Assuming timestamps start from 0."));
3634     }
3635
3636     /* this means this sink will assume timestamps start from 0 */
3637     GST_OBJECT_LOCK (basesink);
3638     clip_segment->start = 0;
3639     clip_segment->stop = -1;
3640     basesink->segment.start = 0;
3641     basesink->segment.stop = -1;
3642     basesink->have_newsegment = TRUE;
3643     GST_OBJECT_UNLOCK (basesink);
3644   }
3645
3646   bclass = GST_BASE_SINK_GET_CLASS (basesink);
3647
3648   /* check if the buffer needs to be dropped, we first ask the subclass for the
3649    * start and end */
3650   if (bclass->get_times)
3651     bclass->get_times (basesink, time_buf, &start, &end);
3652
3653   if (!GST_CLOCK_TIME_IS_VALID (start)) {
3654     /* if the subclass does not want sync, we use our own values so that we at
3655      * least clip the buffer to the segment */
3656     gst_base_sink_get_times (basesink, time_buf, &start, &end);
3657   }
3658
3659   GST_DEBUG_OBJECT (basesink, "got times start: %" GST_TIME_FORMAT
3660       ", end: %" GST_TIME_FORMAT, GST_TIME_ARGS (start), GST_TIME_ARGS (end));
3661
3662   /* a dropped buffer does not participate in anything */
3663   if (GST_CLOCK_TIME_IS_VALID (start) &&
3664       (clip_segment->format == GST_FORMAT_TIME)) {
3665     if (G_UNLIKELY (!gst_segment_clip (clip_segment,
3666                 GST_FORMAT_TIME, (gint64) start, (gint64) end, NULL, NULL)))
3667       goto out_of_segment;
3668   }
3669
3670   /* now we can process the buffer in the queue, this function takes ownership
3671    * of the buffer */
3672   result = gst_base_sink_queue_object_unlocked (basesink, pad,
3673       obj_type, obj, TRUE);
3674   return result;
3675
3676   /* ERRORS */
3677 flushing:
3678   {
3679     GST_DEBUG_OBJECT (basesink, "sink is flushing");
3680     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3681     return GST_FLOW_WRONG_STATE;
3682   }
3683 was_eos:
3684   {
3685     GST_DEBUG_OBJECT (basesink,
3686         "we are EOS, dropping object, return UNEXPECTED");
3687     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3688     return GST_FLOW_UNEXPECTED;
3689   }
3690 out_of_segment:
3691   {
3692     GST_DEBUG_OBJECT (basesink, "dropping buffer, out of clipping segment");
3693     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3694     return GST_FLOW_OK;
3695   }
3696 }
3697
3698 /* with STREAM_LOCK
3699  */
3700 static GstFlowReturn
3701 gst_base_sink_chain_main (GstBaseSink * basesink, GstPad * pad,
3702     guint8 obj_type, gpointer obj)
3703 {
3704   GstFlowReturn result;
3705
3706   if (G_UNLIKELY (basesink->pad_mode != GST_ACTIVATE_PUSH))
3707     goto wrong_mode;
3708
3709   GST_PAD_PREROLL_LOCK (pad);
3710   result = gst_base_sink_chain_unlocked (basesink, pad, obj_type, obj);
3711   GST_PAD_PREROLL_UNLOCK (pad);
3712
3713 done:
3714   return result;
3715
3716   /* ERRORS */
3717 wrong_mode:
3718   {
3719     GST_OBJECT_LOCK (pad);
3720     GST_WARNING_OBJECT (basesink,
3721         "Push on pad %s:%s, but it was not activated in push mode",
3722         GST_DEBUG_PAD_NAME (pad));
3723     GST_OBJECT_UNLOCK (pad);
3724     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3725     /* we don't post an error message this will signal to the peer
3726      * pushing that EOS is reached. */
3727     result = GST_FLOW_UNEXPECTED;
3728     goto done;
3729   }
3730 }
3731
3732 static GstFlowReturn
3733 gst_base_sink_chain (GstPad * pad, GstBuffer * buf)
3734 {
3735   GstBaseSink *basesink;
3736
3737   basesink = GST_BASE_SINK (GST_OBJECT_PARENT (pad));
3738
3739   return gst_base_sink_chain_main (basesink, pad, _PR_IS_BUFFER, buf);
3740 }
3741
3742 static GstFlowReturn
3743 gst_base_sink_chain_list (GstPad * pad, GstBufferList * list)
3744 {
3745   GstBaseSink *basesink;
3746   GstBaseSinkClass *bclass;
3747   GstFlowReturn result;
3748
3749   basesink = GST_BASE_SINK (GST_OBJECT_PARENT (pad));
3750   bclass = GST_BASE_SINK_GET_CLASS (basesink);
3751
3752   if (G_LIKELY (bclass->render_list)) {
3753     result = gst_base_sink_chain_main (basesink, pad, _PR_IS_BUFFERLIST, list);
3754   } else {
3755     GstBufferListIterator *it;
3756     GstBuffer *group;
3757
3758     GST_INFO_OBJECT (pad, "chaining each group in list as a merged buffer");
3759
3760     it = gst_buffer_list_iterate (list);
3761
3762     if (gst_buffer_list_iterator_next_group (it)) {
3763       do {
3764         group = gst_buffer_list_iterator_merge_group (it);
3765         if (group == NULL) {
3766           group = gst_buffer_new ();
3767           GST_CAT_INFO_OBJECT (GST_CAT_SCHEDULING, pad, "chaining empty group");
3768         } else {
3769           GST_CAT_INFO_OBJECT (GST_CAT_SCHEDULING, pad, "chaining group");
3770         }
3771         result = gst_base_sink_chain_main (basesink, pad, _PR_IS_BUFFER, group);
3772       } while (result == GST_FLOW_OK
3773           && gst_buffer_list_iterator_next_group (it));
3774     } else {
3775       GST_CAT_INFO_OBJECT (GST_CAT_SCHEDULING, pad, "chaining empty group");
3776       result =
3777           gst_base_sink_chain_main (basesink, pad, _PR_IS_BUFFER,
3778           gst_buffer_new ());
3779     }
3780     gst_buffer_list_iterator_free (it);
3781     gst_buffer_list_unref (list);
3782   }
3783   return result;
3784 }
3785
3786
3787 static gboolean
3788 gst_base_sink_default_do_seek (GstBaseSink * sink, GstSegment * segment)
3789 {
3790   gboolean res = TRUE;
3791
3792   /* update our offset if the start/stop position was updated */
3793   if (segment->format == GST_FORMAT_BYTES) {
3794     segment->time = segment->start;
3795   } else if (segment->start == 0) {
3796     /* seek to start, we can implement a default for this. */
3797     segment->time = 0;
3798   } else {
3799     res = FALSE;
3800     GST_INFO_OBJECT (sink, "Can't do a default seek");
3801   }
3802
3803   return res;
3804 }
3805
3806 #define SEEK_TYPE_IS_RELATIVE(t) (((t) != GST_SEEK_TYPE_NONE) && ((t) != GST_SEEK_TYPE_SET))
3807
3808 static gboolean
3809 gst_base_sink_default_prepare_seek_segment (GstBaseSink * sink,
3810     GstEvent * event, GstSegment * segment)
3811 {
3812   /* By default, we try one of 2 things:
3813    *   - For absolute seek positions, convert the requested position to our
3814    *     configured processing format and place it in the output segment \
3815    *   - For relative seek positions, convert our current (input) values to the
3816    *     seek format, adjust by the relative seek offset and then convert back to
3817    *     the processing format
3818    */
3819   GstSeekType cur_type, stop_type;
3820   gint64 cur, stop;
3821   GstSeekFlags flags;
3822   GstFormat seek_format, dest_format;
3823   gdouble rate;
3824   gboolean update;
3825   gboolean res = TRUE;
3826
3827   gst_event_parse_seek (event, &rate, &seek_format, &flags,
3828       &cur_type, &cur, &stop_type, &stop);
3829   dest_format = segment->format;
3830
3831   if (seek_format == dest_format) {
3832     gst_segment_set_seek (segment, rate, seek_format, flags,
3833         cur_type, cur, stop_type, stop, &update);
3834     return TRUE;
3835   }
3836
3837   if (cur_type != GST_SEEK_TYPE_NONE) {
3838     /* FIXME: Handle seek_cur & seek_end by converting the input segment vals */
3839     res =
3840         gst_pad_query_convert (sink->sinkpad, seek_format, cur, &dest_format,
3841         &cur);
3842     cur_type = GST_SEEK_TYPE_SET;
3843   }
3844
3845   if (res && stop_type != GST_SEEK_TYPE_NONE) {
3846     /* FIXME: Handle seek_cur & seek_end by converting the input segment vals */
3847     res =
3848         gst_pad_query_convert (sink->sinkpad, seek_format, stop, &dest_format,
3849         &stop);
3850     stop_type = GST_SEEK_TYPE_SET;
3851   }
3852
3853   /* And finally, configure our output segment in the desired format */
3854   gst_segment_set_seek (segment, rate, dest_format, flags, cur_type, cur,
3855       stop_type, stop, &update);
3856
3857   if (!res)
3858     goto no_format;
3859
3860   return res;
3861
3862 no_format:
3863   {
3864     GST_DEBUG_OBJECT (sink, "undefined format given, seek aborted.");
3865     return FALSE;
3866   }
3867 }
3868
3869 /* perform a seek, only executed in pull mode */
3870 static gboolean
3871 gst_base_sink_perform_seek (GstBaseSink * sink, GstPad * pad, GstEvent * event)
3872 {
3873   gboolean flush;
3874   gdouble rate;
3875   GstFormat seek_format, dest_format;
3876   GstSeekFlags flags;
3877   GstSeekType cur_type, stop_type;
3878   gboolean seekseg_configured = FALSE;
3879   gint64 cur, stop;
3880   gboolean update, res = TRUE;
3881   GstSegment seeksegment;
3882
3883   dest_format = sink->segment.format;
3884
3885   if (event) {
3886     GST_DEBUG_OBJECT (sink, "performing seek with event %p", event);
3887     gst_event_parse_seek (event, &rate, &seek_format, &flags,
3888         &cur_type, &cur, &stop_type, &stop);
3889
3890     flush = flags & GST_SEEK_FLAG_FLUSH;
3891   } else {
3892     GST_DEBUG_OBJECT (sink, "performing seek without event");
3893     flush = FALSE;
3894   }
3895
3896   if (flush) {
3897     GST_DEBUG_OBJECT (sink, "flushing upstream");
3898     gst_pad_push_event (pad, gst_event_new_flush_start ());
3899     gst_base_sink_flush_start (sink, pad);
3900   } else {
3901     GST_DEBUG_OBJECT (sink, "pausing pulling thread");
3902   }
3903
3904   GST_PAD_STREAM_LOCK (pad);
3905
3906   /* If we configured the seeksegment above, don't overwrite it now. Otherwise
3907    * copy the current segment info into the temp segment that we can actually
3908    * attempt the seek with. We only update the real segment if the seek suceeds. */
3909   if (!seekseg_configured) {
3910     memcpy (&seeksegment, &sink->segment, sizeof (GstSegment));
3911
3912     /* now configure the final seek segment */
3913     if (event) {
3914       if (sink->segment.format != seek_format) {
3915         /* OK, here's where we give the subclass a chance to convert the relative
3916          * seek into an absolute one in the processing format. We set up any
3917          * absolute seek above, before taking the stream lock. */
3918         if (!gst_base_sink_default_prepare_seek_segment (sink, event,
3919                 &seeksegment)) {
3920           GST_DEBUG_OBJECT (sink,
3921               "Preparing the seek failed after flushing. " "Aborting seek");
3922           res = FALSE;
3923         }
3924       } else {
3925         /* The seek format matches our processing format, no need to ask the
3926          * the subclass to configure the segment. */
3927         gst_segment_set_seek (&seeksegment, rate, seek_format, flags,
3928             cur_type, cur, stop_type, stop, &update);
3929       }
3930     }
3931     /* Else, no seek event passed, so we're just (re)starting the
3932        current segment. */
3933   }
3934
3935   if (res) {
3936     GST_DEBUG_OBJECT (sink, "segment configured from %" G_GINT64_FORMAT
3937         " to %" G_GINT64_FORMAT ", position %" G_GINT64_FORMAT,
3938         seeksegment.start, seeksegment.stop, seeksegment.last_stop);
3939
3940     /* do the seek, segment.last_stop contains the new position. */
3941     res = gst_base_sink_default_do_seek (sink, &seeksegment);
3942   }
3943
3944
3945   if (flush) {
3946     GST_DEBUG_OBJECT (sink, "stop flushing upstream");
3947     gst_pad_push_event (pad, gst_event_new_flush_stop ());
3948     gst_base_sink_flush_stop (sink, pad);
3949   } else if (res && sink->abidata.ABI.running) {
3950     /* we are running the current segment and doing a non-flushing seek,
3951      * close the segment first based on the last_stop. */
3952     GST_DEBUG_OBJECT (sink, "closing running segment %" G_GINT64_FORMAT
3953         " to %" G_GINT64_FORMAT, sink->segment.start, sink->segment.last_stop);
3954   }
3955
3956   /* The subclass must have converted the segment to the processing format
3957    * by now */
3958   if (res && seeksegment.format != dest_format) {
3959     GST_DEBUG_OBJECT (sink, "Subclass failed to prepare a seek segment "
3960         "in the correct format. Aborting seek.");
3961     res = FALSE;
3962   }
3963
3964   /* if successfull seek, we update our real segment and push
3965    * out the new segment. */
3966   if (res) {
3967     memcpy (&sink->segment, &seeksegment, sizeof (GstSegment));
3968
3969     if (sink->segment.flags & GST_SEEK_FLAG_SEGMENT) {
3970       gst_element_post_message (GST_ELEMENT (sink),
3971           gst_message_new_segment_start (GST_OBJECT (sink),
3972               sink->segment.format, sink->segment.last_stop));
3973     }
3974   }
3975
3976   sink->priv->discont = TRUE;
3977   sink->abidata.ABI.running = TRUE;
3978
3979   GST_PAD_STREAM_UNLOCK (pad);
3980
3981   return res;
3982 }
3983
3984 static void
3985 set_step_info (GstBaseSink * sink, GstStepInfo * current, GstStepInfo * pending,
3986     guint seqnum, GstFormat format, guint64 amount, gdouble rate,
3987     gboolean flush, gboolean intermediate)
3988 {
3989   GST_OBJECT_LOCK (sink);
3990   pending->seqnum = seqnum;
3991   pending->format = format;
3992   pending->amount = amount;
3993   pending->position = 0;
3994   pending->rate = rate;
3995   pending->flush = flush;
3996   pending->intermediate = intermediate;
3997   pending->valid = TRUE;
3998   /* flush invalidates the current stepping segment */
3999   if (flush)
4000     current->valid = FALSE;
4001   GST_OBJECT_UNLOCK (sink);
4002 }
4003
4004 static gboolean
4005 gst_base_sink_perform_step (GstBaseSink * sink, GstPad * pad, GstEvent * event)
4006 {
4007   GstBaseSinkPrivate *priv;
4008   GstBaseSinkClass *bclass;
4009   gboolean flush, intermediate;
4010   gdouble rate;
4011   GstFormat format;
4012   guint64 amount;
4013   guint seqnum;
4014   GstStepInfo *pending, *current;
4015   GstMessage *message;
4016
4017   bclass = GST_BASE_SINK_GET_CLASS (sink);
4018   priv = sink->priv;
4019
4020   GST_DEBUG_OBJECT (sink, "performing step with event %p", event);
4021
4022   gst_event_parse_step (event, &format, &amount, &rate, &flush, &intermediate);
4023   seqnum = gst_event_get_seqnum (event);
4024
4025   pending = &priv->pending_step;
4026   current = &priv->current_step;
4027
4028   /* post message first */
4029   message = gst_message_new_step_start (GST_OBJECT (sink), FALSE, format,
4030       amount, rate, flush, intermediate);
4031   gst_message_set_seqnum (message, seqnum);
4032   gst_element_post_message (GST_ELEMENT (sink), message);
4033
4034   if (flush) {
4035     /* we need to call ::unlock before locking PREROLL_LOCK
4036      * since we lock it before going into ::render */
4037     if (bclass->unlock)
4038       bclass->unlock (sink);
4039
4040     GST_PAD_PREROLL_LOCK (sink->sinkpad);
4041     /* now that we have the PREROLL lock, clear our unlock request */
4042     if (bclass->unlock_stop)
4043       bclass->unlock_stop (sink);
4044
4045     /* update the stepinfo and make it valid */
4046     set_step_info (sink, current, pending, seqnum, format, amount, rate, flush,
4047         intermediate);
4048
4049     if (sink->priv->async_enabled) {
4050       /* and we need to commit our state again on the next
4051        * prerolled buffer */
4052       sink->playing_async = TRUE;
4053       priv->pending_step.need_preroll = TRUE;
4054       sink->need_preroll = FALSE;
4055       gst_element_lost_state_full (GST_ELEMENT_CAST (sink), FALSE);
4056     } else {
4057       sink->priv->have_latency = TRUE;
4058       sink->need_preroll = FALSE;
4059     }
4060     priv->current_sstart = GST_CLOCK_TIME_NONE;
4061     priv->current_sstop = GST_CLOCK_TIME_NONE;
4062     priv->eos_rtime = GST_CLOCK_TIME_NONE;
4063     priv->call_preroll = TRUE;
4064     gst_base_sink_set_last_buffer (sink, NULL);
4065     gst_base_sink_reset_qos (sink);
4066
4067     if (sink->clock_id) {
4068       gst_clock_id_unschedule (sink->clock_id);
4069     }
4070
4071     if (sink->have_preroll) {
4072       GST_DEBUG_OBJECT (sink, "signal waiter");
4073       priv->step_unlock = TRUE;
4074       GST_PAD_PREROLL_SIGNAL (sink->sinkpad);
4075     }
4076     GST_PAD_PREROLL_UNLOCK (sink->sinkpad);
4077   } else {
4078     /* update the stepinfo and make it valid */
4079     set_step_info (sink, current, pending, seqnum, format, amount, rate, flush,
4080         intermediate);
4081   }
4082
4083   return TRUE;
4084 }
4085
4086 /* with STREAM_LOCK
4087  */
4088 static void
4089 gst_base_sink_loop (GstPad * pad)
4090 {
4091   GstBaseSink *basesink;
4092   GstBuffer *buf = NULL;
4093   GstFlowReturn result;
4094   guint blocksize;
4095   guint64 offset;
4096
4097   basesink = GST_BASE_SINK (GST_OBJECT_PARENT (pad));
4098
4099   g_assert (basesink->pad_mode == GST_ACTIVATE_PULL);
4100
4101   if ((blocksize = basesink->priv->blocksize) == 0)
4102     blocksize = -1;
4103
4104   offset = basesink->segment.last_stop;
4105
4106   GST_DEBUG_OBJECT (basesink, "pulling %" G_GUINT64_FORMAT ", %u",
4107       offset, blocksize);
4108
4109   result = gst_pad_pull_range (pad, offset, blocksize, &buf);
4110   if (G_UNLIKELY (result != GST_FLOW_OK))
4111     goto paused;
4112
4113   if (G_UNLIKELY (buf == NULL))
4114     goto no_buffer;
4115
4116   offset += GST_BUFFER_SIZE (buf);
4117
4118   gst_segment_set_last_stop (&basesink->segment, GST_FORMAT_BYTES, offset);
4119
4120   GST_PAD_PREROLL_LOCK (pad);
4121   result = gst_base_sink_chain_unlocked (basesink, pad, _PR_IS_BUFFER, buf);
4122   GST_PAD_PREROLL_UNLOCK (pad);
4123   if (G_UNLIKELY (result != GST_FLOW_OK))
4124     goto paused;
4125
4126   return;
4127
4128   /* ERRORS */
4129 paused:
4130   {
4131     GST_LOG_OBJECT (basesink, "pausing task, reason %s",
4132         gst_flow_get_name (result));
4133     gst_pad_pause_task (pad);
4134     if (result == GST_FLOW_UNEXPECTED) {
4135       /* perform EOS logic */
4136       if (basesink->segment.flags & GST_SEEK_FLAG_SEGMENT) {
4137         gst_element_post_message (GST_ELEMENT_CAST (basesink),
4138             gst_message_new_segment_done (GST_OBJECT_CAST (basesink),
4139                 basesink->segment.format, basesink->segment.last_stop));
4140       } else {
4141         gst_base_sink_event (pad, gst_event_new_eos ());
4142       }
4143     } else if (result == GST_FLOW_NOT_LINKED || result <= GST_FLOW_UNEXPECTED) {
4144       /* for fatal errors we post an error message, post the error
4145        * first so the app knows about the error first. 
4146        * wrong-state is not a fatal error because it happens due to
4147        * flushing and posting an error message in that case is the
4148        * wrong thing to do, e.g. when basesrc is doing a flushing
4149        * seek. */
4150       GST_ELEMENT_ERROR (basesink, STREAM, FAILED,
4151           (_("Internal data stream error.")),
4152           ("stream stopped, reason %s", gst_flow_get_name (result)));
4153       gst_base_sink_event (pad, gst_event_new_eos ());
4154     }
4155     return;
4156   }
4157 no_buffer:
4158   {
4159     GST_LOG_OBJECT (basesink, "no buffer, pausing");
4160     GST_ELEMENT_ERROR (basesink, STREAM, FAILED,
4161         (_("Internal data flow error.")), ("element returned NULL buffer"));
4162     result = GST_FLOW_ERROR;
4163     goto paused;
4164   }
4165 }
4166
4167 static gboolean
4168 gst_base_sink_set_flushing (GstBaseSink * basesink, GstPad * pad,
4169     gboolean flushing)
4170 {
4171   GstBaseSinkClass *bclass;
4172
4173   bclass = GST_BASE_SINK_GET_CLASS (basesink);
4174
4175   if (flushing) {
4176     /* unlock any subclasses, we need to do this before grabbing the
4177      * PREROLL_LOCK since we hold this lock before going into ::render. */
4178     if (bclass->unlock)
4179       bclass->unlock (basesink);
4180   }
4181
4182   GST_PAD_PREROLL_LOCK (pad);
4183   basesink->flushing = flushing;
4184   if (flushing) {
4185     /* step 1, now that we have the PREROLL lock, clear our unlock request */
4186     if (bclass->unlock_stop)
4187       bclass->unlock_stop (basesink);
4188
4189     /* set need_preroll before we unblock the clock. If the clock is unblocked
4190      * before timing out, we can reuse the buffer for preroll. */
4191     basesink->need_preroll = TRUE;
4192
4193     /* step 2, unblock clock sync (if any) or any other blocking thing */
4194     if (basesink->clock_id) {
4195       gst_clock_id_unschedule (basesink->clock_id);
4196     }
4197
4198     /* flush out the data thread if it's locked in finish_preroll, this will
4199      * also flush out the EOS state */
4200     GST_DEBUG_OBJECT (basesink,
4201         "flushing out data thread, need preroll to TRUE");
4202     gst_base_sink_preroll_queue_flush (basesink, pad);
4203   }
4204   GST_PAD_PREROLL_UNLOCK (pad);
4205
4206   return TRUE;
4207 }
4208
4209 static gboolean
4210 gst_base_sink_default_activate_pull (GstBaseSink * basesink, gboolean active)
4211 {
4212   gboolean result;
4213
4214   if (active) {
4215     /* start task */
4216     result = gst_pad_start_task (basesink->sinkpad,
4217         (GstTaskFunction) gst_base_sink_loop, basesink->sinkpad);
4218   } else {
4219     /* step 2, make sure streaming finishes */
4220     result = gst_pad_stop_task (basesink->sinkpad);
4221   }
4222
4223   return result;
4224 }
4225
4226 static gboolean
4227 gst_base_sink_pad_activate (GstPad * pad)
4228 {
4229   gboolean result = FALSE;
4230   GstBaseSink *basesink;
4231
4232   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
4233
4234   GST_DEBUG_OBJECT (basesink, "Trying pull mode first");
4235
4236   gst_base_sink_set_flushing (basesink, pad, FALSE);
4237
4238   /* we need to have the pull mode enabled */
4239   if (!basesink->can_activate_pull) {
4240     GST_DEBUG_OBJECT (basesink, "pull mode disabled");
4241     goto fallback;
4242   }
4243
4244   /* check if downstreams supports pull mode at all */
4245   if (!gst_pad_check_pull_range (pad)) {
4246     GST_DEBUG_OBJECT (basesink, "pull mode not supported");
4247     goto fallback;
4248   }
4249
4250   /* set the pad mode before starting the task so that it's in the
4251    * correct state for the new thread. also the sink set_caps and get_caps
4252    * function checks this */
4253   basesink->pad_mode = GST_ACTIVATE_PULL;
4254
4255   /* we first try to negotiate a format so that when we try to activate
4256    * downstream, it knows about our format */
4257   if (!gst_base_sink_negotiate_pull (basesink)) {
4258     GST_DEBUG_OBJECT (basesink, "failed to negotiate in pull mode");
4259     goto fallback;
4260   }
4261
4262   /* ok activate now */
4263   if (!gst_pad_activate_pull (pad, TRUE)) {
4264     /* clear any pending caps */
4265     GST_OBJECT_LOCK (basesink);
4266     gst_caps_replace (&basesink->priv->pull_caps, NULL);
4267     GST_OBJECT_UNLOCK (basesink);
4268     GST_DEBUG_OBJECT (basesink, "failed to activate in pull mode");
4269     goto fallback;
4270   }
4271
4272   GST_DEBUG_OBJECT (basesink, "Success activating pull mode");
4273   result = TRUE;
4274   goto done;
4275
4276   /* push mode fallback */
4277 fallback:
4278   GST_DEBUG_OBJECT (basesink, "Falling back to push mode");
4279   if ((result = gst_pad_activate_push (pad, TRUE))) {
4280     GST_DEBUG_OBJECT (basesink, "Success activating push mode");
4281   }
4282
4283 done:
4284   if (!result) {
4285     GST_WARNING_OBJECT (basesink, "Could not activate pad in either mode");
4286     gst_base_sink_set_flushing (basesink, pad, TRUE);
4287   }
4288
4289   gst_object_unref (basesink);
4290
4291   return result;
4292 }
4293
4294 static gboolean
4295 gst_base_sink_pad_activate_push (GstPad * pad, gboolean active)
4296 {
4297   gboolean result;
4298   GstBaseSink *basesink;
4299
4300   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
4301
4302   if (active) {
4303     if (!basesink->can_activate_push) {
4304       result = FALSE;
4305       basesink->pad_mode = GST_ACTIVATE_NONE;
4306     } else {
4307       result = TRUE;
4308       basesink->pad_mode = GST_ACTIVATE_PUSH;
4309     }
4310   } else {
4311     if (G_UNLIKELY (basesink->pad_mode != GST_ACTIVATE_PUSH)) {
4312       g_warning ("Internal GStreamer activation error!!!");
4313       result = FALSE;
4314     } else {
4315       gst_base_sink_set_flushing (basesink, pad, TRUE);
4316       result = TRUE;
4317       basesink->pad_mode = GST_ACTIVATE_NONE;
4318     }
4319   }
4320
4321   gst_object_unref (basesink);
4322
4323   return result;
4324 }
4325
4326 static gboolean
4327 gst_base_sink_negotiate_pull (GstBaseSink * basesink)
4328 {
4329   GstCaps *caps;
4330   gboolean result;
4331
4332   result = FALSE;
4333
4334   /* this returns the intersection between our caps and the peer caps. If there
4335    * is no peer, it returns NULL and we can't operate in pull mode so we can
4336    * fail the negotiation. */
4337   caps = gst_pad_get_allowed_caps (GST_BASE_SINK_PAD (basesink));
4338   if (caps == NULL || gst_caps_is_empty (caps))
4339     goto no_caps_possible;
4340
4341   GST_DEBUG_OBJECT (basesink, "allowed caps: %" GST_PTR_FORMAT, caps);
4342
4343   caps = gst_caps_make_writable (caps);
4344   /* get the first (prefered) format */
4345   gst_caps_truncate (caps);
4346   /* try to fixate */
4347   gst_pad_fixate_caps (GST_BASE_SINK_PAD (basesink), caps);
4348
4349   GST_DEBUG_OBJECT (basesink, "fixated to: %" GST_PTR_FORMAT, caps);
4350
4351   if (gst_caps_is_any (caps)) {
4352     GST_DEBUG_OBJECT (basesink, "caps were ANY after fixating, "
4353         "allowing pull()");
4354     /* neither side has template caps in this case, so they are prepared for
4355        pull() without setcaps() */
4356     result = TRUE;
4357   } else if (gst_caps_is_fixed (caps)) {
4358     if (!gst_pad_set_caps (GST_BASE_SINK_PAD (basesink), caps))
4359       goto could_not_set_caps;
4360
4361     GST_OBJECT_LOCK (basesink);
4362     gst_caps_replace (&basesink->priv->pull_caps, caps);
4363     GST_OBJECT_UNLOCK (basesink);
4364
4365     result = TRUE;
4366   }
4367
4368   gst_caps_unref (caps);
4369
4370   return result;
4371
4372 no_caps_possible:
4373   {
4374     GST_INFO_OBJECT (basesink, "Pipeline could not agree on caps");
4375     GST_DEBUG_OBJECT (basesink, "get_allowed_caps() returned EMPTY");
4376     if (caps)
4377       gst_caps_unref (caps);
4378     return FALSE;
4379   }
4380 could_not_set_caps:
4381   {
4382     GST_INFO_OBJECT (basesink, "Could not set caps: %" GST_PTR_FORMAT, caps);
4383     gst_caps_unref (caps);
4384     return FALSE;
4385   }
4386 }
4387
4388 /* this won't get called until we implement an activate function */
4389 static gboolean
4390 gst_base_sink_pad_activate_pull (GstPad * pad, gboolean active)
4391 {
4392   gboolean result = FALSE;
4393   GstBaseSink *basesink;
4394   GstBaseSinkClass *bclass;
4395
4396   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
4397   bclass = GST_BASE_SINK_GET_CLASS (basesink);
4398
4399   if (active) {
4400     GstFormat format;
4401     gint64 duration;
4402
4403     /* we mark we have a newsegment here because pull based
4404      * mode works just fine without having a newsegment before the
4405      * first buffer */
4406     format = GST_FORMAT_BYTES;
4407
4408     gst_segment_init (&basesink->segment, format);
4409     gst_segment_init (basesink->abidata.ABI.clip_segment, format);
4410     GST_OBJECT_LOCK (basesink);
4411     basesink->have_newsegment = TRUE;
4412     GST_OBJECT_UNLOCK (basesink);
4413
4414     /* get the peer duration in bytes */
4415     result = gst_pad_query_peer_duration (pad, &format, &duration);
4416     if (result) {
4417       GST_DEBUG_OBJECT (basesink,
4418           "setting duration in bytes to %" G_GINT64_FORMAT, duration);
4419       gst_segment_set_duration (basesink->abidata.ABI.clip_segment, format,
4420           duration);
4421       gst_segment_set_duration (&basesink->segment, format, duration);
4422     } else {
4423       GST_DEBUG_OBJECT (basesink, "unknown duration");
4424     }
4425
4426     if (bclass->activate_pull)
4427       result = bclass->activate_pull (basesink, TRUE);
4428     else
4429       result = FALSE;
4430
4431     if (!result)
4432       goto activate_failed;
4433
4434   } else {
4435     if (G_UNLIKELY (basesink->pad_mode != GST_ACTIVATE_PULL)) {
4436       g_warning ("Internal GStreamer activation error!!!");
4437       result = FALSE;
4438     } else {
4439       result = gst_base_sink_set_flushing (basesink, pad, TRUE);
4440       if (bclass->activate_pull)
4441         result &= bclass->activate_pull (basesink, FALSE);
4442       basesink->pad_mode = GST_ACTIVATE_NONE;
4443       /* clear any pending caps */
4444       GST_OBJECT_LOCK (basesink);
4445       gst_caps_replace (&basesink->priv->pull_caps, NULL);
4446       GST_OBJECT_UNLOCK (basesink);
4447     }
4448   }
4449   gst_object_unref (basesink);
4450
4451   return result;
4452
4453   /* ERRORS */
4454 activate_failed:
4455   {
4456     /* reset, as starting the thread failed */
4457     basesink->pad_mode = GST_ACTIVATE_NONE;
4458
4459     GST_ERROR_OBJECT (basesink, "subclass failed to activate in pull mode");
4460     return FALSE;
4461   }
4462 }
4463
4464 /* send an event to our sinkpad peer. */
4465 static gboolean
4466 gst_base_sink_send_event (GstElement * element, GstEvent * event)
4467 {
4468   GstPad *pad;
4469   GstBaseSink *basesink = GST_BASE_SINK (element);
4470   gboolean forward, result = TRUE;
4471   GstActivateMode mode;
4472
4473   GST_OBJECT_LOCK (element);
4474   /* get the pad and the scheduling mode */
4475   pad = gst_object_ref (basesink->sinkpad);
4476   mode = basesink->pad_mode;
4477   GST_OBJECT_UNLOCK (element);
4478
4479   /* only push UPSTREAM events upstream */
4480   forward = GST_EVENT_IS_UPSTREAM (event);
4481
4482   GST_DEBUG_OBJECT (basesink, "handling event %p %" GST_PTR_FORMAT, event,
4483       event);
4484
4485   switch (GST_EVENT_TYPE (event)) {
4486     case GST_EVENT_LATENCY:
4487     {
4488       GstClockTime latency;
4489
4490       gst_event_parse_latency (event, &latency);
4491
4492       /* store the latency. We use this to adjust the running_time before syncing
4493        * it to the clock. */
4494       GST_OBJECT_LOCK (element);
4495       basesink->priv->latency = latency;
4496       if (!basesink->priv->have_latency)
4497         forward = FALSE;
4498       GST_OBJECT_UNLOCK (element);
4499       GST_DEBUG_OBJECT (basesink, "latency set to %" GST_TIME_FORMAT,
4500           GST_TIME_ARGS (latency));
4501
4502       /* We forward this event so that all elements know about the global pipeline
4503        * latency. This is interesting for an element when it wants to figure out
4504        * when a particular piece of data will be rendered. */
4505       break;
4506     }
4507     case GST_EVENT_SEEK:
4508       /* in pull mode we will execute the seek */
4509       if (mode == GST_ACTIVATE_PULL)
4510         result = gst_base_sink_perform_seek (basesink, pad, event);
4511       break;
4512     case GST_EVENT_STEP:
4513       result = gst_base_sink_perform_step (basesink, pad, event);
4514       forward = FALSE;
4515       break;
4516     default:
4517       break;
4518   }
4519
4520   if (forward) {
4521     result = gst_pad_push_event (pad, event);
4522   } else {
4523     /* not forwarded, unref the event */
4524     gst_event_unref (event);
4525   }
4526
4527   gst_object_unref (pad);
4528   return result;
4529 }
4530
4531 static gboolean
4532 gst_base_sink_get_position (GstBaseSink * basesink, GstFormat format,
4533     gint64 * cur, gboolean * upstream)
4534 {
4535   GstClock *clock = NULL;
4536   gboolean res = FALSE;
4537   GstFormat oformat, tformat;
4538   GstSegment *segment;
4539   GstClockTime now, latency;
4540   GstClockTimeDiff base;
4541   gint64 time, accum, duration;
4542   gdouble rate;
4543   gint64 last;
4544   gboolean last_seen, with_clock, in_paused;
4545
4546   GST_OBJECT_LOCK (basesink);
4547   /* we can only get the segment when we are not NULL or READY */
4548   if (!basesink->have_newsegment)
4549     goto wrong_state;
4550
4551   in_paused = FALSE;
4552   /* when not in PLAYING or when we're busy with a state change, we
4553    * cannot read from the clock so we report time based on the
4554    * last seen timestamp. */
4555   if (GST_STATE (basesink) != GST_STATE_PLAYING ||
4556       GST_STATE_PENDING (basesink) != GST_STATE_VOID_PENDING) {
4557     in_paused = TRUE;
4558   }
4559
4560   /* we don't use the clip segment in pull mode, when seeking we update the
4561    * main segment directly with the new segment values without it having to be
4562    * activated by the rendering after preroll */
4563   if (basesink->pad_mode == GST_ACTIVATE_PUSH)
4564     segment = basesink->abidata.ABI.clip_segment;
4565   else
4566     segment = &basesink->segment;
4567
4568   /* our intermediate time format */
4569   tformat = GST_FORMAT_TIME;
4570   /* get the format in the segment */
4571   oformat = segment->format;
4572
4573   /* report with last seen position when EOS */
4574   last_seen = basesink->eos;
4575
4576   /* assume we will use the clock for getting the current position */
4577   with_clock = TRUE;
4578   if (basesink->sync == FALSE)
4579     with_clock = FALSE;
4580
4581   /* and we need a clock */
4582   if (G_UNLIKELY ((clock = GST_ELEMENT_CLOCK (basesink)) == NULL))
4583     with_clock = FALSE;
4584   else
4585     gst_object_ref (clock);
4586
4587   /* collect all data we need holding the lock */
4588   if (GST_CLOCK_TIME_IS_VALID (segment->time))
4589     time = segment->time;
4590   else
4591     time = 0;
4592
4593   if (GST_CLOCK_TIME_IS_VALID (segment->stop))
4594     duration = segment->stop - segment->start;
4595   else
4596     duration = 0;
4597
4598   accum = segment->accum;
4599   rate = segment->rate * segment->applied_rate;
4600   latency = basesink->priv->latency;
4601
4602   if (oformat == GST_FORMAT_TIME) {
4603     gint64 start, stop;
4604
4605     start = basesink->priv->current_sstart;
4606     stop = basesink->priv->current_sstop;
4607
4608     if (in_paused) {
4609       /* in paused we use the last position as a lower bound */
4610       if (stop == -1 || segment->rate > 0.0)
4611         last = start;
4612       else
4613         last = stop;
4614     } else {
4615       /* in playing, use last stop time as upper bound */
4616       if (start == -1 || segment->rate > 0.0)
4617         last = stop;
4618       else
4619         last = start;
4620     }
4621   } else {
4622     /* convert last stop to stream time */
4623     last = gst_segment_to_stream_time (segment, oformat, segment->last_stop);
4624   }
4625
4626   if (in_paused) {
4627     /* in paused, use start_time */
4628     base = GST_ELEMENT_START_TIME (basesink);
4629     GST_DEBUG_OBJECT (basesink, "in paused, using start time %" GST_TIME_FORMAT,
4630         GST_TIME_ARGS (base));
4631   } else if (with_clock) {
4632     /* else use clock when needed */
4633     base = GST_ELEMENT_CAST (basesink)->base_time;
4634     GST_DEBUG_OBJECT (basesink, "using clock and base time %" GST_TIME_FORMAT,
4635         GST_TIME_ARGS (base));
4636   } else {
4637     /* else, no sync or clock -> no base time */
4638     GST_DEBUG_OBJECT (basesink, "no sync or no clock");
4639     base = -1;
4640   }
4641
4642   /* no base, we can't calculate running_time, use last seem timestamp to report
4643    * time */
4644   if (base == -1)
4645     last_seen = TRUE;
4646
4647   /* need to release the object lock before we can get the time,
4648    * a clock might take the LOCK of the provider, which could be
4649    * a basesink subclass. */
4650   GST_OBJECT_UNLOCK (basesink);
4651
4652   if (last_seen) {
4653     /* in EOS or when no valid stream_time, report the value of last seen
4654      * timestamp */
4655     if (last == -1) {
4656       /* no timestamp, we need to ask upstream */
4657       GST_DEBUG_OBJECT (basesink, "no last seen timestamp, asking upstream");
4658       res = FALSE;
4659       *upstream = TRUE;
4660       goto done;
4661     }
4662     GST_DEBUG_OBJECT (basesink, "using last seen timestamp %" GST_TIME_FORMAT,
4663         GST_TIME_ARGS (last));
4664     *cur = last;
4665   } else {
4666     if (oformat != tformat) {
4667       /* convert accum, time and duration to time */
4668       if (!gst_pad_query_convert (basesink->sinkpad, oformat, accum, &tformat,
4669               &accum))
4670         goto convert_failed;
4671       if (!gst_pad_query_convert (basesink->sinkpad, oformat, duration,
4672               &tformat, &duration))
4673         goto convert_failed;
4674       if (!gst_pad_query_convert (basesink->sinkpad, oformat, time, &tformat,
4675               &time))
4676         goto convert_failed;
4677       if (!gst_pad_query_convert (basesink->sinkpad, oformat, last, &tformat,
4678               &last))
4679         goto convert_failed;
4680
4681       /* assume time format from now on */
4682       oformat = tformat;
4683     }
4684
4685     if (!in_paused && with_clock) {
4686       now = gst_clock_get_time (clock);
4687     } else {
4688       now = base;
4689       base = 0;
4690     }
4691
4692     /* subtract base time and accumulated time from the clock time.
4693      * Make sure we don't go negative. This is the current time in
4694      * the segment which we need to scale with the combined
4695      * rate and applied rate. */
4696     base += accum;
4697     base += latency;
4698     if (GST_CLOCK_DIFF (base, now) < 0)
4699       base = now;
4700
4701     /* for negative rates we need to count back from the segment
4702      * duration. */
4703     if (rate < 0.0)
4704       time += duration;
4705
4706     *cur = time + gst_guint64_to_gdouble (now - base) * rate;
4707
4708     if (in_paused) {
4709       /* never report less than segment values in paused */
4710       if (last != -1)
4711         *cur = MAX (last, *cur);
4712     } else {
4713       /* never report more than last seen position in playing */
4714       if (last != -1)
4715         *cur = MIN (last, *cur);
4716     }
4717
4718     GST_DEBUG_OBJECT (basesink,
4719         "now %" GST_TIME_FORMAT " - base %" GST_TIME_FORMAT " - accum %"
4720         GST_TIME_FORMAT " + time %" GST_TIME_FORMAT "  last %" GST_TIME_FORMAT,
4721         GST_TIME_ARGS (now), GST_TIME_ARGS (base), GST_TIME_ARGS (accum),
4722         GST_TIME_ARGS (time), GST_TIME_ARGS (last));
4723   }
4724
4725   if (oformat != format) {
4726     /* convert to final format */
4727     if (!gst_pad_query_convert (basesink->sinkpad, oformat, *cur, &format, cur))
4728       goto convert_failed;
4729   }
4730
4731   res = TRUE;
4732
4733 done:
4734   GST_DEBUG_OBJECT (basesink, "res: %d, POSITION: %" GST_TIME_FORMAT,
4735       res, GST_TIME_ARGS (*cur));
4736
4737   if (clock)
4738     gst_object_unref (clock);
4739
4740   return res;
4741
4742   /* special cases */
4743 wrong_state:
4744   {
4745     /* in NULL or READY we always return FALSE and -1 */
4746     GST_DEBUG_OBJECT (basesink, "position in wrong state, return -1");
4747     res = FALSE;
4748     *cur = -1;
4749     GST_OBJECT_UNLOCK (basesink);
4750     goto done;
4751   }
4752 convert_failed:
4753   {
4754     GST_DEBUG_OBJECT (basesink, "convert failed, try upstream");
4755     *upstream = TRUE;
4756     res = FALSE;
4757     goto done;
4758   }
4759 }
4760
4761 static gboolean
4762 gst_base_sink_get_duration (GstBaseSink * basesink, GstFormat format,
4763     gint64 * dur, gboolean * upstream)
4764 {
4765   gboolean res = FALSE;
4766
4767   if (basesink->pad_mode == GST_ACTIVATE_PULL) {
4768     GstFormat uformat = GST_FORMAT_BYTES;
4769     gint64 uduration;
4770
4771     /* get the duration in bytes, in pull mode that's all we are sure to
4772      * know. We have to explicitly get this value from upstream instead of
4773      * using our cached value because it might change. Duration caching
4774      * should be done at a higher level. */
4775     res = gst_pad_query_peer_duration (basesink->sinkpad, &uformat, &uduration);
4776     if (res) {
4777       gst_segment_set_duration (&basesink->segment, uformat, uduration);
4778       if (format != uformat) {
4779         /* convert to the requested format */
4780         res = gst_pad_query_convert (basesink->sinkpad, uformat, uduration,
4781             &format, dur);
4782       } else {
4783         *dur = uduration;
4784       }
4785     }
4786     *upstream = FALSE;
4787   } else {
4788     *upstream = TRUE;
4789   }
4790
4791   return res;
4792 }
4793
4794 static const GstQueryType *
4795 gst_base_sink_get_query_types (GstElement * element)
4796 {
4797   static const GstQueryType query_types[] = {
4798     GST_QUERY_DURATION,
4799     GST_QUERY_POSITION,
4800     GST_QUERY_SEGMENT,
4801     GST_QUERY_LATENCY,
4802     0
4803   };
4804
4805   return query_types;
4806 }
4807
4808 static gboolean
4809 gst_base_sink_query (GstElement * element, GstQuery * query)
4810 {
4811   gboolean res = FALSE;
4812
4813   GstBaseSink *basesink = GST_BASE_SINK (element);
4814
4815   switch (GST_QUERY_TYPE (query)) {
4816     case GST_QUERY_POSITION:
4817     {
4818       gint64 cur = 0;
4819       GstFormat format;
4820       gboolean upstream = FALSE;
4821
4822       gst_query_parse_position (query, &format, NULL);
4823
4824       GST_DEBUG_OBJECT (basesink, "position query in format %s",
4825           gst_format_get_name (format));
4826
4827       /* first try to get the position based on the clock */
4828       if ((res =
4829               gst_base_sink_get_position (basesink, format, &cur, &upstream))) {
4830         gst_query_set_position (query, format, cur);
4831       } else if (upstream) {
4832         /* fallback to peer query */
4833         res = gst_pad_peer_query (basesink->sinkpad, query);
4834       }
4835       if (!res) {
4836         /* we can handle a few things if upstream failed */
4837         if (format == GST_FORMAT_PERCENT) {
4838           gint64 dur = 0;
4839           GstFormat uformat = GST_FORMAT_TIME;
4840
4841           res = gst_base_sink_get_position (basesink, GST_FORMAT_TIME, &cur,
4842               &upstream);
4843           if (!res && upstream) {
4844             res = gst_pad_query_peer_position (basesink->sinkpad, &uformat,
4845                 &cur);
4846           }
4847           if (res) {
4848             res = gst_base_sink_get_duration (basesink, GST_FORMAT_TIME, &dur,
4849                 &upstream);
4850             if (!res && upstream) {
4851               res = gst_pad_query_peer_duration (basesink->sinkpad, &uformat,
4852                   &dur);
4853             }
4854           }
4855           if (res) {
4856             gint64 pos;
4857
4858             pos = gst_util_uint64_scale (100 * GST_FORMAT_PERCENT_SCALE, cur,
4859                 dur);
4860             gst_query_set_position (query, GST_FORMAT_PERCENT, pos);
4861           }
4862         }
4863       }
4864       break;
4865     }
4866     case GST_QUERY_DURATION:
4867     {
4868       gint64 dur = 0;
4869       GstFormat format;
4870       gboolean upstream = FALSE;
4871
4872       gst_query_parse_duration (query, &format, NULL);
4873
4874       GST_DEBUG_OBJECT (basesink, "duration query in format %s",
4875           gst_format_get_name (format));
4876
4877       if ((res =
4878               gst_base_sink_get_duration (basesink, format, &dur, &upstream))) {
4879         gst_query_set_duration (query, format, dur);
4880       } else if (upstream) {
4881         /* fallback to peer query */
4882         res = gst_pad_peer_query (basesink->sinkpad, query);
4883       }
4884       if (!res) {
4885         /* we can handle a few things if upstream failed */
4886         if (format == GST_FORMAT_PERCENT) {
4887           gst_query_set_duration (query, GST_FORMAT_PERCENT,
4888               GST_FORMAT_PERCENT_MAX);
4889           res = TRUE;
4890         }
4891       }
4892       break;
4893     }
4894     case GST_QUERY_LATENCY:
4895     {
4896       gboolean live, us_live;
4897       GstClockTime min, max;
4898
4899       if ((res = gst_base_sink_query_latency (basesink, &live, &us_live, &min,
4900                   &max))) {
4901         gst_query_set_latency (query, live, min, max);
4902       }
4903       break;
4904     }
4905     case GST_QUERY_JITTER:
4906       break;
4907     case GST_QUERY_RATE:
4908       /* gst_query_set_rate (query, basesink->segment_rate); */
4909       res = TRUE;
4910       break;
4911     case GST_QUERY_SEGMENT:
4912     {
4913       if (basesink->pad_mode == GST_ACTIVATE_PULL) {
4914         gst_query_set_segment (query, basesink->segment.rate,
4915             GST_FORMAT_TIME, basesink->segment.start, basesink->segment.stop);
4916         res = TRUE;
4917       } else {
4918         res = gst_pad_peer_query (basesink->sinkpad, query);
4919       }
4920       break;
4921     }
4922     case GST_QUERY_SEEKING:
4923     case GST_QUERY_CONVERT:
4924     case GST_QUERY_FORMATS:
4925     default:
4926       res = gst_pad_peer_query (basesink->sinkpad, query);
4927       break;
4928   }
4929   GST_DEBUG_OBJECT (basesink, "query %s returns %d",
4930       GST_QUERY_TYPE_NAME (query), res);
4931   return res;
4932 }
4933
4934 static GstStateChangeReturn
4935 gst_base_sink_change_state (GstElement * element, GstStateChange transition)
4936 {
4937   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
4938   GstBaseSink *basesink = GST_BASE_SINK (element);
4939   GstBaseSinkClass *bclass;
4940   GstBaseSinkPrivate *priv;
4941
4942   priv = basesink->priv;
4943
4944   bclass = GST_BASE_SINK_GET_CLASS (basesink);
4945
4946   switch (transition) {
4947     case GST_STATE_CHANGE_NULL_TO_READY:
4948       if (bclass->start)
4949         if (!bclass->start (basesink))
4950           goto start_failed;
4951       break;
4952     case GST_STATE_CHANGE_READY_TO_PAUSED:
4953       /* need to complete preroll before this state change completes, there
4954        * is no data flow in READY so we can safely assume we need to preroll. */
4955       GST_PAD_PREROLL_LOCK (basesink->sinkpad);
4956       GST_DEBUG_OBJECT (basesink, "READY to PAUSED");
4957       basesink->have_newsegment = FALSE;
4958       gst_segment_init (&basesink->segment, GST_FORMAT_UNDEFINED);
4959       gst_segment_init (basesink->abidata.ABI.clip_segment,
4960           GST_FORMAT_UNDEFINED);
4961       basesink->offset = 0;
4962       basesink->have_preroll = FALSE;
4963       priv->step_unlock = FALSE;
4964       basesink->need_preroll = TRUE;
4965       basesink->playing_async = TRUE;
4966       priv->current_sstart = GST_CLOCK_TIME_NONE;
4967       priv->current_sstop = GST_CLOCK_TIME_NONE;
4968       priv->eos_rtime = GST_CLOCK_TIME_NONE;
4969       priv->latency = 0;
4970       basesink->eos = FALSE;
4971       priv->received_eos = FALSE;
4972       gst_base_sink_reset_qos (basesink);
4973       priv->commited = FALSE;
4974       priv->call_preroll = TRUE;
4975       priv->current_step.valid = FALSE;
4976       priv->pending_step.valid = FALSE;
4977       if (priv->async_enabled) {
4978         GST_DEBUG_OBJECT (basesink, "doing async state change");
4979         /* when async enabled, post async-start message and return ASYNC from
4980          * the state change function */
4981         ret = GST_STATE_CHANGE_ASYNC;
4982         gst_element_post_message (GST_ELEMENT_CAST (basesink),
4983             gst_message_new_async_start (GST_OBJECT_CAST (basesink), FALSE));
4984       } else {
4985         priv->have_latency = TRUE;
4986       }
4987       GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
4988       break;
4989     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
4990       GST_PAD_PREROLL_LOCK (basesink->sinkpad);
4991       if (!gst_base_sink_needs_preroll (basesink)) {
4992         GST_DEBUG_OBJECT (basesink, "PAUSED to PLAYING, don't need preroll");
4993         /* no preroll needed anymore now. */
4994         basesink->playing_async = FALSE;
4995         basesink->need_preroll = FALSE;
4996         if (basesink->eos) {
4997           GstMessage *message;
4998
4999           /* need to post EOS message here */
5000           GST_DEBUG_OBJECT (basesink, "Now posting EOS");
5001           message = gst_message_new_eos (GST_OBJECT_CAST (basesink));
5002           gst_message_set_seqnum (message, basesink->priv->seqnum);
5003           gst_element_post_message (GST_ELEMENT_CAST (basesink), message);
5004         } else {
5005           GST_DEBUG_OBJECT (basesink, "signal preroll");
5006           GST_PAD_PREROLL_SIGNAL (basesink->sinkpad);
5007         }
5008       } else {
5009         GST_DEBUG_OBJECT (basesink, "PAUSED to PLAYING, we are not prerolled");
5010         basesink->need_preroll = TRUE;
5011         basesink->playing_async = TRUE;
5012         priv->call_preroll = TRUE;
5013         priv->commited = FALSE;
5014         if (priv->async_enabled) {
5015           GST_DEBUG_OBJECT (basesink, "doing async state change");
5016           ret = GST_STATE_CHANGE_ASYNC;
5017           gst_element_post_message (GST_ELEMENT_CAST (basesink),
5018               gst_message_new_async_start (GST_OBJECT_CAST (basesink), FALSE));
5019         }
5020       }
5021       GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
5022       break;
5023     default:
5024       break;
5025   }
5026
5027   {
5028     GstStateChangeReturn bret;
5029
5030     bret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
5031     if (G_UNLIKELY (bret == GST_STATE_CHANGE_FAILURE))
5032       goto activate_failed;
5033   }
5034
5035   switch (transition) {
5036     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
5037       GST_DEBUG_OBJECT (basesink, "PLAYING to PAUSED");
5038       /* FIXME, make sure we cannot enter _render first */
5039
5040       /* we need to call ::unlock before locking PREROLL_LOCK
5041        * since we lock it before going into ::render */
5042       if (bclass->unlock)
5043         bclass->unlock (basesink);
5044
5045       GST_PAD_PREROLL_LOCK (basesink->sinkpad);
5046       GST_DEBUG_OBJECT (basesink, "got preroll lock");
5047       /* now that we have the PREROLL lock, clear our unlock request */
5048       if (bclass->unlock_stop)
5049         bclass->unlock_stop (basesink);
5050
5051       /* we need preroll again and we set the flag before unlocking the clockid
5052        * because if the clockid is unlocked before a current buffer expired, we
5053        * can use that buffer to preroll with */
5054       basesink->need_preroll = TRUE;
5055
5056       if (basesink->clock_id) {
5057         GST_DEBUG_OBJECT (basesink, "unschedule clock");
5058         gst_clock_id_unschedule (basesink->clock_id);
5059       }
5060
5061       /* if we don't have a preroll buffer we need to wait for a preroll and
5062        * return ASYNC. */
5063       if (!gst_base_sink_needs_preroll (basesink)) {
5064         GST_DEBUG_OBJECT (basesink, "PLAYING to PAUSED, we are prerolled");
5065         basesink->playing_async = FALSE;
5066       } else {
5067         if (GST_STATE_TARGET (GST_ELEMENT (basesink)) <= GST_STATE_READY) {
5068           GST_DEBUG_OBJECT (basesink, "element is <= READY");
5069           ret = GST_STATE_CHANGE_SUCCESS;
5070         } else {
5071           GST_DEBUG_OBJECT (basesink,
5072               "PLAYING to PAUSED, we are not prerolled");
5073           basesink->playing_async = TRUE;
5074           priv->commited = FALSE;
5075           priv->call_preroll = TRUE;
5076           if (priv->async_enabled) {
5077             GST_DEBUG_OBJECT (basesink, "doing async state change");
5078             ret = GST_STATE_CHANGE_ASYNC;
5079             gst_element_post_message (GST_ELEMENT_CAST (basesink),
5080                 gst_message_new_async_start (GST_OBJECT_CAST (basesink),
5081                     FALSE));
5082           }
5083         }
5084       }
5085       GST_DEBUG_OBJECT (basesink, "rendered: %" G_GUINT64_FORMAT
5086           ", dropped: %" G_GUINT64_FORMAT, priv->rendered, priv->dropped);
5087
5088       gst_base_sink_reset_qos (basesink);
5089       GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
5090       break;
5091     case GST_STATE_CHANGE_PAUSED_TO_READY:
5092       GST_PAD_PREROLL_LOCK (basesink->sinkpad);
5093       /* start by reseting our position state with the object lock so that the
5094        * position query gets the right idea. We do this before we post the
5095        * messages so that the message handlers pick this up. */
5096       GST_OBJECT_LOCK (basesink);
5097       basesink->have_newsegment = FALSE;
5098       priv->current_sstart = GST_CLOCK_TIME_NONE;
5099       priv->current_sstop = GST_CLOCK_TIME_NONE;
5100       priv->have_latency = FALSE;
5101       if (priv->cached_clock_id) {
5102         gst_clock_id_unref (priv->cached_clock_id);
5103         priv->cached_clock_id = NULL;
5104       }
5105       GST_OBJECT_UNLOCK (basesink);
5106
5107       gst_base_sink_set_last_buffer (basesink, NULL);
5108       priv->call_preroll = FALSE;
5109
5110       if (!priv->commited) {
5111         if (priv->async_enabled) {
5112           GST_DEBUG_OBJECT (basesink, "PAUSED to READY, posting async-done");
5113
5114           gst_element_post_message (GST_ELEMENT_CAST (basesink),
5115               gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
5116                   GST_STATE_PLAYING, GST_STATE_PAUSED, GST_STATE_READY));
5117
5118           gst_element_post_message (GST_ELEMENT_CAST (basesink),
5119               gst_message_new_async_done (GST_OBJECT_CAST (basesink)));
5120         }
5121         priv->commited = TRUE;
5122       } else {
5123         GST_DEBUG_OBJECT (basesink, "PAUSED to READY, don't need_preroll");
5124       }
5125       GST_PAD_PREROLL_UNLOCK (basesink->sinkpad);
5126       break;
5127     case GST_STATE_CHANGE_READY_TO_NULL:
5128       if (bclass->stop) {
5129         if (!bclass->stop (basesink)) {
5130           GST_WARNING_OBJECT (basesink, "failed to stop");
5131         }
5132       }
5133       gst_base_sink_set_last_buffer (basesink, NULL);
5134       priv->call_preroll = FALSE;
5135       break;
5136     default:
5137       break;
5138   }
5139
5140   return ret;
5141
5142   /* ERRORS */
5143 start_failed:
5144   {
5145     GST_DEBUG_OBJECT (basesink, "failed to start");
5146     return GST_STATE_CHANGE_FAILURE;
5147   }
5148 activate_failed:
5149   {
5150     GST_DEBUG_OBJECT (basesink,
5151         "element failed to change states -- activation problem?");
5152     return GST_STATE_CHANGE_FAILURE;
5153   }
5154 }