base{sink,src}: Don't try to fixate ANY caps
[platform/upstream/gstreamer.git] / libs / gst / base / gstbasesink.c
1 /* GStreamer
2  * Copyright (C) 2005-2007 Wim Taymans <wim.taymans@gmail.com>
3  *
4  * gstbasesink.c: Base class for sink elements
5  *
6  * This library is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU Library General Public
8  * License as published by the Free Software Foundation; either
9  * version 2 of the License, or (at your option) any later version.
10  *
11  * This library is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * Library General Public License for more details.
15  *
16  * You should have received a copy of the GNU Library General Public
17  * License along with this library; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 02111-1307, USA.
20  */
21
22 /**
23  * SECTION:gstbasesink
24  * @short_description: Base class for sink elements
25  * @see_also: #GstBaseTransform, #GstBaseSrc
26  *
27  * #GstBaseSink is the base class for sink elements in GStreamer, such as
28  * xvimagesink or filesink. It is a layer on top of #GstElement that provides a
29  * simplified interface to plugin writers. #GstBaseSink handles many details
30  * for you, for example: preroll, clock synchronization, state changes,
31  * activation in push or pull mode, and queries.
32  *
33  * In most cases, when writing sink elements, there is no need to implement
34  * class methods from #GstElement or to set functions on pads, because the
35  * #GstBaseSink infrastructure should be sufficient.
36  *
37  * #GstBaseSink provides support for exactly one sink pad, which should be
38  * named "sink". A sink implementation (subclass of #GstBaseSink) should
39  * install a pad template in its class_init function, like so:
40  * |[
41  * static void
42  * my_element_class_init (GstMyElementClass *klass)
43  * {
44  *   GstElementClass *gstelement_class = GST_ELEMENT_CLASS (klass);
45  *
46  *   // sinktemplate should be a #GstStaticPadTemplate with direction
47  *   // #GST_PAD_SINK and name "sink"
48  *   gst_element_class_add_pad_template (gstelement_class,
49  *       gst_static_pad_template_get (&amp;sinktemplate));
50  *   // see #GstElementDetails
51  *   gst_element_class_set_details (gstelement_class, &amp;details);
52  * }
53  * ]|
54  *
55  * #GstBaseSink will handle the prerolling correctly. This means that it will
56  * return #GST_STATE_CHANGE_ASYNC from a state change to PAUSED until the first
57  * buffer arrives in this element. The base class will call the
58  * #GstBaseSinkClass.preroll() vmethod with this preroll buffer and will then
59  * commit the state change to the next asynchronously pending state.
60  *
61  * When the element is set to PLAYING, #GstBaseSink will synchronise on the
62  * clock using the times returned from #GstBaseSinkClass.get_times(). If this
63  * function returns #GST_CLOCK_TIME_NONE for the start time, no synchronisation
64  * will be done. Synchronisation can be disabled entirely by setting the object
65  * #GstBaseSink:sync property to %FALSE.
66  *
67  * After synchronisation the virtual method #GstBaseSinkClass.render() will be
68  * called. Subclasses should minimally implement this method.
69  *
70  * Since 0.10.3 subclasses that synchronise on the clock in the
71  * #GstBaseSinkClass.render() method are supported as well. These classes
72  * typically receive a buffer in the render method and can then potentially
73  * block on the clock while rendering. A typical example is an audiosink.
74  * Since 0.10.11 these subclasses can use gst_base_sink_wait_preroll() to
75  * perform the blocking wait.
76  *
77  * Upon receiving the EOS event in the PLAYING state, #GstBaseSink will wait
78  * for the clock to reach the time indicated by the stop time of the last
79  * #GstBaseSinkClass.get_times() call before posting an EOS message. When the
80  * element receives EOS in PAUSED, preroll completes, the event is queued and an
81  * EOS message is posted when going to PLAYING.
82  *
83  * #GstBaseSink will internally use the #GST_EVENT_NEWSEGMENT events to schedule
84  * synchronisation and clipping of buffers. Buffers that fall completely outside
85  * of the current segment are dropped. Buffers that fall partially in the
86  * segment are rendered (and prerolled). Subclasses should do any subbuffer
87  * clipping themselves when needed.
88  *
89  * #GstBaseSink will by default report the current playback position in
90  * #GST_FORMAT_TIME based on the current clock time and segment information.
91  * If no clock has been set on the element, the query will be forwarded
92  * upstream.
93  *
94  * The #GstBaseSinkClass.set_caps() function will be called when the subclass
95  * should configure itself to process a specific media type.
96  *
97  * The #GstBaseSinkClass.start() and #GstBaseSinkClass.stop() virtual methods
98  * will be called when resources should be allocated. Any 
99  * #GstBaseSinkClass.preroll(), #GstBaseSinkClass.render() and
100  * #GstBaseSinkClass.set_caps() function will be called between the
101  * #GstBaseSinkClass.start() and #GstBaseSinkClass.stop() calls.
102  *
103  * The #GstBaseSinkClass.event() virtual method will be called when an event is
104  * received by #GstBaseSink. Normally this method should only be overriden by
105  * very specific elements (such as file sinks) which need to handle the
106  * newsegment event specially.
107  *
108  * #GstBaseSink provides an overridable #GstBaseSinkClass.buffer_alloc()
109  * function that can be used by sinks that want to do reverse negotiation or to
110  * provide custom buffers (hardware buffers for example) to upstream elements.
111  *
112  * The #GstBaseSinkClass.unlock() method is called when the elements should
113  * unblock any blocking operations they perform in the
114  * #GstBaseSinkClass.render() method. This is mostly useful when the
115  * #GstBaseSinkClass.render() method performs a blocking write on a file
116  * descriptor, for example.
117  *
118  * The #GstBaseSink:max-lateness property affects how the sink deals with
119  * buffers that arrive too late in the sink. A buffer arrives too late in the
120  * sink when the presentation time (as a combination of the last segment, buffer
121  * timestamp and element base_time) plus the duration is before the current
122  * time of the clock.
123  * If the frame is later than max-lateness, the sink will drop the buffer
124  * without calling the render method.
125  * This feature is disabled if sync is disabled, the
126  * #GstBaseSinkClass.get_times() method does not return a valid start time or
127  * max-lateness is set to -1 (the default).
128  * Subclasses can use gst_base_sink_set_max_lateness() to configure the
129  * max-lateness value.
130  *
131  * The #GstBaseSink:qos property will enable the quality-of-service features of
132  * the basesink which gather statistics about the real-time performance of the
133  * clock synchronisation. For each buffer received in the sink, statistics are
134  * gathered and a QOS event is sent upstream with these numbers. This
135  * information can then be used by upstream elements to reduce their processing
136  * rate, for example.
137  *
138  * Since 0.10.15 the #GstBaseSink:async property can be used to instruct the
139  * sink to never perform an ASYNC state change. This feature is mostly usable
140  * when dealing with non-synchronized streams or sparse streams.
141  *
142  * Last reviewed on 2007-08-29 (0.10.15)
143  */
144
145 #ifdef HAVE_CONFIG_H
146 #  include "config.h"
147 #endif
148
149 #include <gst/gst_private.h>
150
151 #include "gstbasesink.h"
152 #include <gst/gstmarshal.h>
153 #include <gst/gst-i18n-lib.h>
154
155 GST_DEBUG_CATEGORY_STATIC (gst_base_sink_debug);
156 #define GST_CAT_DEFAULT gst_base_sink_debug
157
158 #define GST_BASE_SINK_GET_PRIVATE(obj)  \
159    (G_TYPE_INSTANCE_GET_PRIVATE ((obj), GST_TYPE_BASE_SINK, GstBaseSinkPrivate))
160
161 #define GST_FLOW_STEP GST_FLOW_CUSTOM_ERROR
162
163 typedef struct
164 {
165   gboolean valid;               /* if this info is valid */
166   guint32 seqnum;               /* the seqnum of the STEP event */
167   GstFormat format;             /* the format of the amount */
168   guint64 amount;               /* the total amount of data to skip */
169   guint64 position;             /* the position in the stepped data */
170   guint64 duration;             /* the duration in time of the skipped data */
171   guint64 start;                /* running_time of the start */
172   gdouble rate;                 /* rate of skipping */
173   gdouble start_rate;           /* rate before skipping */
174   guint64 start_start;          /* start position skipping */
175   guint64 start_stop;           /* stop position skipping */
176   gboolean flush;               /* if this was a flushing step */
177   gboolean intermediate;        /* if this is an intermediate step */
178   gboolean need_preroll;        /* if we need preroll after this step */
179 } GstStepInfo;
180
181 /* FIXME, some stuff in ABI.data and other in Private...
182  * Make up your mind please.
183  */
184 struct _GstBaseSinkPrivate
185 {
186   gint qos_enabled;             /* ATOMIC */
187   gboolean async_enabled;
188   GstClockTimeDiff ts_offset;
189   GstClockTime render_delay;
190
191   /* start, stop of current buffer, stream time, used to report position */
192   GstClockTime current_sstart;
193   GstClockTime current_sstop;
194
195   /* start, stop and jitter of current buffer, running time */
196   GstClockTime current_rstart;
197   GstClockTime current_rstop;
198   GstClockTimeDiff current_jitter;
199   /* the running time of the previous buffer */
200   GstClockTime prev_rstart;
201
202   /* EOS sync time in running time */
203   GstClockTime eos_rtime;
204
205   /* last buffer that arrived in time, running time */
206   GstClockTime last_render_time;
207   /* when the last buffer left the sink, running time */
208   GstClockTime last_left;
209
210   /* running averages go here these are done on running time */
211   GstClockTime avg_pt;
212   GstClockTime avg_duration;
213   gdouble avg_rate;
214   GstClockTime avg_in_diff;
215
216   /* these are done on system time. avg_jitter and avg_render are
217    * compared to eachother to see if the rendering time takes a
218    * huge amount of the processing, If so we are flooded with
219    * buffers. */
220   GstClockTime last_left_systime;
221   GstClockTime avg_jitter;
222   GstClockTime start, stop;
223   GstClockTime avg_render;
224
225   /* number of rendered and dropped frames */
226   guint64 rendered;
227   guint64 dropped;
228
229   /* latency stuff */
230   GstClockTime latency;
231
232   /* if we already commited the state */
233   gboolean commited;
234
235   /* when we received EOS */
236   gboolean received_eos;
237
238   /* when we are prerolled and able to report latency */
239   gboolean have_latency;
240
241   /* the last buffer we prerolled or rendered. Useful for making snapshots */
242   gint enable_last_buffer;      /* atomic */
243   GstBuffer *last_buffer;
244
245   /* caps for pull based scheduling */
246   GstCaps *pull_caps;
247
248   /* blocksize for pulling */
249   guint blocksize;
250
251   gboolean discont;
252
253   /* seqnum of the stream */
254   guint32 seqnum;
255
256   gboolean call_preroll;
257   gboolean step_unlock;
258
259   /* we have a pending and a current step operation */
260   GstStepInfo current_step;
261   GstStepInfo pending_step;
262
263   /* Cached GstClockID */
264   GstClockID cached_clock_id;
265
266   /* for throttling and QoS */
267   GstClockTime earliest_in_time;
268   GstClockTime throttle_time;
269 };
270
271 #define DO_RUNNING_AVG(avg,val,size) (((val) + ((size)-1) * (avg)) / (size))
272
273 /* generic running average, this has a neutral window size */
274 #define UPDATE_RUNNING_AVG(avg,val)   DO_RUNNING_AVG(avg,val,8)
275
276 /* the windows for these running averages are experimentally obtained.
277  * possitive values get averaged more while negative values use a small
278  * window so we can react faster to badness. */
279 #define UPDATE_RUNNING_AVG_P(avg,val) DO_RUNNING_AVG(avg,val,16)
280 #define UPDATE_RUNNING_AVG_N(avg,val) DO_RUNNING_AVG(avg,val,4)
281
282 enum
283 {
284   _PR_IS_NOTHING = 1 << 0,
285   _PR_IS_BUFFER = 1 << 1,
286   _PR_IS_BUFFERLIST = 1 << 2,
287   _PR_IS_EVENT = 1 << 3
288 } PrivateObjectType;
289
290 #define OBJ_IS_BUFFER(a) ((a) & _PR_IS_BUFFER)
291 #define OBJ_IS_BUFFERLIST(a) ((a) & _PR_IS_BUFFERLIST)
292 #define OBJ_IS_EVENT(a) ((a) & _PR_IS_EVENT)
293 #define OBJ_IS_BUFFERFULL(a) ((a) & (_PR_IS_BUFFER | _PR_IS_BUFFERLIST))
294
295 /* BaseSink properties */
296
297 #define DEFAULT_CAN_ACTIVATE_PULL FALSE /* fixme: enable me */
298 #define DEFAULT_CAN_ACTIVATE_PUSH TRUE
299
300 #define DEFAULT_PREROLL_QUEUE_LEN   0
301 #define DEFAULT_SYNC                TRUE
302 #define DEFAULT_MAX_LATENESS        -1
303 #define DEFAULT_QOS                 FALSE
304 #define DEFAULT_ASYNC               TRUE
305 #define DEFAULT_TS_OFFSET           0
306 #define DEFAULT_BLOCKSIZE           4096
307 #define DEFAULT_RENDER_DELAY        0
308 #define DEFAULT_ENABLE_LAST_BUFFER  TRUE
309 #define DEFAULT_THROTTLE_TIME       0
310
311 enum
312 {
313   PROP_0,
314   PROP_PREROLL_QUEUE_LEN,
315   PROP_SYNC,
316   PROP_MAX_LATENESS,
317   PROP_QOS,
318   PROP_ASYNC,
319   PROP_TS_OFFSET,
320   PROP_ENABLE_LAST_BUFFER,
321   PROP_LAST_BUFFER,
322   PROP_BLOCKSIZE,
323   PROP_RENDER_DELAY,
324   PROP_THROTTLE_TIME,
325   PROP_LAST
326 };
327
328 static GstElementClass *parent_class = NULL;
329
330 static void gst_base_sink_class_init (GstBaseSinkClass * klass);
331 static void gst_base_sink_init (GstBaseSink * trans, gpointer g_class);
332 static void gst_base_sink_finalize (GObject * object);
333
334 GType
335 gst_base_sink_get_type (void)
336 {
337   static volatile gsize base_sink_type = 0;
338
339   if (g_once_init_enter (&base_sink_type)) {
340     GType _type;
341     static const GTypeInfo base_sink_info = {
342       sizeof (GstBaseSinkClass),
343       NULL,
344       NULL,
345       (GClassInitFunc) gst_base_sink_class_init,
346       NULL,
347       NULL,
348       sizeof (GstBaseSink),
349       0,
350       (GInstanceInitFunc) gst_base_sink_init,
351     };
352
353     _type = g_type_register_static (GST_TYPE_ELEMENT,
354         "GstBaseSink", &base_sink_info, G_TYPE_FLAG_ABSTRACT);
355     g_once_init_leave (&base_sink_type, _type);
356   }
357   return base_sink_type;
358 }
359
360 static void gst_base_sink_set_property (GObject * object, guint prop_id,
361     const GValue * value, GParamSpec * pspec);
362 static void gst_base_sink_get_property (GObject * object, guint prop_id,
363     GValue * value, GParamSpec * pspec);
364
365 static gboolean gst_base_sink_send_event (GstElement * element,
366     GstEvent * event);
367 static gboolean gst_base_sink_query (GstElement * element, GstQuery * query);
368 static const GstQueryType *gst_base_sink_get_query_types (GstElement * element);
369
370 static GstCaps *gst_base_sink_get_caps (GstBaseSink * sink);
371 static gboolean gst_base_sink_set_caps (GstBaseSink * sink, GstCaps * caps);
372 static GstFlowReturn gst_base_sink_buffer_alloc (GstBaseSink * sink,
373     guint64 offset, guint size, GstCaps * caps, GstBuffer ** buf);
374 static void gst_base_sink_get_times (GstBaseSink * basesink, GstBuffer * buffer,
375     GstClockTime * start, GstClockTime * end);
376 static gboolean gst_base_sink_set_flushing (GstBaseSink * basesink,
377     GstPad * pad, gboolean flushing);
378 static gboolean gst_base_sink_default_activate_pull (GstBaseSink * basesink,
379     gboolean active);
380 static gboolean gst_base_sink_default_do_seek (GstBaseSink * sink,
381     GstSegment * segment);
382 static gboolean gst_base_sink_default_prepare_seek_segment (GstBaseSink * sink,
383     GstEvent * event, GstSegment * segment);
384
385 static GstStateChangeReturn gst_base_sink_change_state (GstElement * element,
386     GstStateChange transition);
387
388 static GstFlowReturn gst_base_sink_chain (GstPad * pad, GstBuffer * buffer);
389 static GstFlowReturn gst_base_sink_chain_list (GstPad * pad,
390     GstBufferList * list);
391
392 static void gst_base_sink_loop (GstPad * pad);
393 static gboolean gst_base_sink_pad_activate (GstPad * pad);
394 static gboolean gst_base_sink_pad_activate_push (GstPad * pad, gboolean active);
395 static gboolean gst_base_sink_pad_activate_pull (GstPad * pad, gboolean active);
396 static gboolean gst_base_sink_event (GstPad * pad, GstEvent * event);
397
398 static gboolean gst_base_sink_negotiate_pull (GstBaseSink * basesink);
399 static GstCaps *gst_base_sink_pad_getcaps (GstPad * pad);
400 static gboolean gst_base_sink_pad_setcaps (GstPad * pad, GstCaps * caps);
401 static void gst_base_sink_pad_fixate (GstPad * pad, GstCaps * caps);
402 static GstFlowReturn gst_base_sink_pad_buffer_alloc (GstPad * pad,
403     guint64 offset, guint size, GstCaps * caps, GstBuffer ** buf);
404
405
406 /* check if an object was too late */
407 static gboolean gst_base_sink_is_too_late (GstBaseSink * basesink,
408     GstMiniObject * obj, GstClockTime rstart, GstClockTime rstop,
409     GstClockReturn status, GstClockTimeDiff jitter);
410 static GstFlowReturn gst_base_sink_preroll_object (GstBaseSink * basesink,
411     guint8 obj_type, GstMiniObject * obj);
412
413 static void
414 gst_base_sink_class_init (GstBaseSinkClass * klass)
415 {
416   GObjectClass *gobject_class;
417   GstElementClass *gstelement_class;
418
419   gobject_class = G_OBJECT_CLASS (klass);
420   gstelement_class = GST_ELEMENT_CLASS (klass);
421
422   GST_DEBUG_CATEGORY_INIT (gst_base_sink_debug, "basesink", 0,
423       "basesink element");
424
425   g_type_class_add_private (klass, sizeof (GstBaseSinkPrivate));
426
427   parent_class = g_type_class_peek_parent (klass);
428
429   gobject_class->finalize = gst_base_sink_finalize;
430   gobject_class->set_property = gst_base_sink_set_property;
431   gobject_class->get_property = gst_base_sink_get_property;
432
433   /* FIXME, this next value should be configured using an event from the
434    * upstream element, ie, the BUFFER_SIZE event. */
435   g_object_class_install_property (gobject_class, PROP_PREROLL_QUEUE_LEN,
436       g_param_spec_uint ("preroll-queue-len", "Preroll queue length",
437           "Number of buffers to queue during preroll", 0, G_MAXUINT,
438           DEFAULT_PREROLL_QUEUE_LEN,
439           G_PARAM_READWRITE | G_PARAM_CONSTRUCT | G_PARAM_STATIC_STRINGS));
440
441   g_object_class_install_property (gobject_class, PROP_SYNC,
442       g_param_spec_boolean ("sync", "Sync", "Sync on the clock", DEFAULT_SYNC,
443           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
444
445   g_object_class_install_property (gobject_class, PROP_MAX_LATENESS,
446       g_param_spec_int64 ("max-lateness", "Max Lateness",
447           "Maximum number of nanoseconds that a buffer can be late before it "
448           "is dropped (-1 unlimited)", -1, G_MAXINT64, DEFAULT_MAX_LATENESS,
449           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
450
451   g_object_class_install_property (gobject_class, PROP_QOS,
452       g_param_spec_boolean ("qos", "Qos",
453           "Generate Quality-of-Service events upstream", DEFAULT_QOS,
454           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
455   /**
456    * GstBaseSink:async
457    *
458    * If set to #TRUE, the basesink will perform asynchronous state changes.
459    * When set to #FALSE, the sink will not signal the parent when it prerolls.
460    * Use this option when dealing with sparse streams or when synchronisation is
461    * not required.
462    *
463    * Since: 0.10.15
464    */
465   g_object_class_install_property (gobject_class, PROP_ASYNC,
466       g_param_spec_boolean ("async", "Async",
467           "Go asynchronously to PAUSED", DEFAULT_ASYNC,
468           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
469   /**
470    * GstBaseSink:ts-offset
471    *
472    * Controls the final synchronisation, a negative value will render the buffer
473    * earlier while a positive value delays playback. This property can be
474    * used to fix synchronisation in bad files.
475    *
476    * Since: 0.10.15
477    */
478   g_object_class_install_property (gobject_class, PROP_TS_OFFSET,
479       g_param_spec_int64 ("ts-offset", "TS Offset",
480           "Timestamp offset in nanoseconds", G_MININT64, G_MAXINT64,
481           DEFAULT_TS_OFFSET, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
482
483   /**
484    * GstBaseSink:enable-last-buffer
485    *
486    * Enable the last-buffer property. If FALSE, basesink doesn't keep a
487    * reference to the last buffer arrived and the last-buffer property is always
488    * set to NULL. This can be useful if you need buffers to be released as soon
489    * as possible, eg. if you're using a buffer pool.
490    *
491    * Since: 0.10.30
492    */
493   g_object_class_install_property (gobject_class, PROP_ENABLE_LAST_BUFFER,
494       g_param_spec_boolean ("enable-last-buffer", "Enable Last Buffer",
495           "Enable the last-buffer property", DEFAULT_ENABLE_LAST_BUFFER,
496           G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
497
498   /**
499    * GstBaseSink:last-buffer
500    *
501    * The last buffer that arrived in the sink and was used for preroll or for
502    * rendering. This property can be used to generate thumbnails. This property
503    * can be NULL when the sink has not yet received a bufer.
504    *
505    * Since: 0.10.15
506    */
507   g_object_class_install_property (gobject_class, PROP_LAST_BUFFER,
508       g_param_spec_boxed ("last-buffer", "Last Buffer",
509           "The last buffer received in the sink", GST_TYPE_BUFFER,
510           G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
511   /**
512    * GstBaseSink:blocksize
513    *
514    * The amount of bytes to pull when operating in pull mode.
515    *
516    * Since: 0.10.22
517    */
518   /* FIXME 0.11: blocksize property should be int, otherwise min>max.. */
519   g_object_class_install_property (gobject_class, PROP_BLOCKSIZE,
520       g_param_spec_uint ("blocksize", "Block size",
521           "Size in bytes to pull per buffer (0 = default)", 0, G_MAXUINT,
522           DEFAULT_BLOCKSIZE, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
523   /**
524    * GstBaseSink:render-delay
525    *
526    * The additional delay between synchronisation and actual rendering of the
527    * media. This property will add additional latency to the device in order to
528    * make other sinks compensate for the delay.
529    *
530    * Since: 0.10.22
531    */
532   g_object_class_install_property (gobject_class, PROP_RENDER_DELAY,
533       g_param_spec_uint64 ("render-delay", "Render Delay",
534           "Additional render delay of the sink in nanoseconds", 0, G_MAXUINT64,
535           DEFAULT_RENDER_DELAY, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
536   /**
537    * GstBaseSink:throttle-time
538    *
539    * The time to insert between buffers. This property can be used to control
540    * the maximum amount of buffers per second to render. Setting this property
541    * to a value bigger than 0 will make the sink create THROTTLE QoS events.
542    *
543    * Since: 0.10.33
544    */
545   g_object_class_install_property (gobject_class, PROP_THROTTLE_TIME,
546       g_param_spec_uint64 ("throttle-time", "Throttle time",
547           "The time to keep between rendered buffers (unused)", 0, G_MAXUINT64,
548           DEFAULT_THROTTLE_TIME, G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
549
550   gstelement_class->change_state =
551       GST_DEBUG_FUNCPTR (gst_base_sink_change_state);
552   gstelement_class->send_event = GST_DEBUG_FUNCPTR (gst_base_sink_send_event);
553   gstelement_class->query = GST_DEBUG_FUNCPTR (gst_base_sink_query);
554   gstelement_class->get_query_types =
555       GST_DEBUG_FUNCPTR (gst_base_sink_get_query_types);
556
557   klass->get_caps = GST_DEBUG_FUNCPTR (gst_base_sink_get_caps);
558   klass->set_caps = GST_DEBUG_FUNCPTR (gst_base_sink_set_caps);
559   klass->buffer_alloc = GST_DEBUG_FUNCPTR (gst_base_sink_buffer_alloc);
560   klass->get_times = GST_DEBUG_FUNCPTR (gst_base_sink_get_times);
561   klass->activate_pull =
562       GST_DEBUG_FUNCPTR (gst_base_sink_default_activate_pull);
563
564   /* Registering debug symbols for function pointers */
565   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_getcaps);
566   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_setcaps);
567   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_fixate);
568   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_buffer_alloc);
569   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_activate);
570   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_activate_push);
571   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_pad_activate_pull);
572   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_event);
573   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_chain);
574   GST_DEBUG_REGISTER_FUNCPTR (gst_base_sink_chain_list);
575 }
576
577 static GstCaps *
578 gst_base_sink_pad_getcaps (GstPad * pad)
579 {
580   GstBaseSinkClass *bclass;
581   GstBaseSink *bsink;
582   GstCaps *caps = NULL;
583
584   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
585   bclass = GST_BASE_SINK_GET_CLASS (bsink);
586
587   if (bsink->pad_mode == GST_ACTIVATE_PULL) {
588     /* if we are operating in pull mode we only accept the negotiated caps */
589     GST_OBJECT_LOCK (pad);
590     if ((caps = GST_PAD_CAPS (pad)))
591       gst_caps_ref (caps);
592     GST_OBJECT_UNLOCK (pad);
593   }
594   if (caps == NULL) {
595     if (bclass->get_caps)
596       caps = bclass->get_caps (bsink);
597
598     if (caps == NULL) {
599       GstPadTemplate *pad_template;
600
601       pad_template =
602           gst_element_class_get_pad_template (GST_ELEMENT_CLASS (bclass),
603           "sink");
604       if (pad_template != NULL) {
605         caps = gst_caps_ref (gst_pad_template_get_caps (pad_template));
606       }
607     }
608   }
609   gst_object_unref (bsink);
610
611   return caps;
612 }
613
614 static gboolean
615 gst_base_sink_pad_setcaps (GstPad * pad, GstCaps * caps)
616 {
617   GstBaseSinkClass *bclass;
618   GstBaseSink *bsink;
619   gboolean res = TRUE;
620
621   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
622   bclass = GST_BASE_SINK_GET_CLASS (bsink);
623
624   if (res && bclass->set_caps)
625     res = bclass->set_caps (bsink, caps);
626
627   gst_object_unref (bsink);
628
629   return res;
630 }
631
632 static void
633 gst_base_sink_pad_fixate (GstPad * pad, GstCaps * caps)
634 {
635   GstBaseSinkClass *bclass;
636   GstBaseSink *bsink;
637
638   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
639   bclass = GST_BASE_SINK_GET_CLASS (bsink);
640
641   if (bclass->fixate)
642     bclass->fixate (bsink, caps);
643
644   gst_object_unref (bsink);
645 }
646
647 static GstFlowReturn
648 gst_base_sink_pad_buffer_alloc (GstPad * pad, guint64 offset, guint size,
649     GstCaps * caps, GstBuffer ** buf)
650 {
651   GstBaseSinkClass *bclass;
652   GstBaseSink *bsink;
653   GstFlowReturn result = GST_FLOW_OK;
654
655   bsink = GST_BASE_SINK (gst_pad_get_parent (pad));
656   if (G_UNLIKELY (bsink == NULL))
657     return GST_FLOW_WRONG_STATE;
658   bclass = GST_BASE_SINK_GET_CLASS (bsink);
659
660   if (bclass->buffer_alloc)
661     result = bclass->buffer_alloc (bsink, offset, size, caps, buf);
662   else
663     *buf = NULL;                /* fallback in gstpad.c will allocate generic buffer */
664
665   gst_object_unref (bsink);
666
667   return result;
668 }
669
670 static void
671 gst_base_sink_init (GstBaseSink * basesink, gpointer g_class)
672 {
673   GstPadTemplate *pad_template;
674   GstBaseSinkPrivate *priv;
675
676   basesink->priv = priv = GST_BASE_SINK_GET_PRIVATE (basesink);
677
678   pad_template =
679       gst_element_class_get_pad_template (GST_ELEMENT_CLASS (g_class), "sink");
680   g_return_if_fail (pad_template != NULL);
681
682   basesink->sinkpad = gst_pad_new_from_template (pad_template, "sink");
683
684   gst_pad_set_getcaps_function (basesink->sinkpad, gst_base_sink_pad_getcaps);
685   gst_pad_set_setcaps_function (basesink->sinkpad, gst_base_sink_pad_setcaps);
686   gst_pad_set_fixatecaps_function (basesink->sinkpad, gst_base_sink_pad_fixate);
687   gst_pad_set_bufferalloc_function (basesink->sinkpad,
688       gst_base_sink_pad_buffer_alloc);
689   gst_pad_set_activate_function (basesink->sinkpad, gst_base_sink_pad_activate);
690   gst_pad_set_activatepush_function (basesink->sinkpad,
691       gst_base_sink_pad_activate_push);
692   gst_pad_set_activatepull_function (basesink->sinkpad,
693       gst_base_sink_pad_activate_pull);
694   gst_pad_set_event_function (basesink->sinkpad, gst_base_sink_event);
695   gst_pad_set_chain_function (basesink->sinkpad, gst_base_sink_chain);
696   gst_pad_set_chain_list_function (basesink->sinkpad, gst_base_sink_chain_list);
697   gst_element_add_pad (GST_ELEMENT_CAST (basesink), basesink->sinkpad);
698
699   basesink->pad_mode = GST_ACTIVATE_NONE;
700   basesink->preroll_lock = g_mutex_new ();
701   basesink->preroll_cond = g_cond_new ();
702   basesink->preroll_queue = g_queue_new ();
703   basesink->clip_segment = gst_segment_new ();
704   priv->have_latency = FALSE;
705
706   basesink->can_activate_push = DEFAULT_CAN_ACTIVATE_PUSH;
707   basesink->can_activate_pull = DEFAULT_CAN_ACTIVATE_PULL;
708
709   basesink->sync = DEFAULT_SYNC;
710   basesink->max_lateness = DEFAULT_MAX_LATENESS;
711   g_atomic_int_set (&priv->qos_enabled, DEFAULT_QOS);
712   priv->async_enabled = DEFAULT_ASYNC;
713   priv->ts_offset = DEFAULT_TS_OFFSET;
714   priv->render_delay = DEFAULT_RENDER_DELAY;
715   priv->blocksize = DEFAULT_BLOCKSIZE;
716   priv->cached_clock_id = NULL;
717   g_atomic_int_set (&priv->enable_last_buffer, DEFAULT_ENABLE_LAST_BUFFER);
718   priv->throttle_time = DEFAULT_THROTTLE_TIME;
719
720   GST_OBJECT_FLAG_SET (basesink, GST_ELEMENT_IS_SINK);
721 }
722
723 static void
724 gst_base_sink_finalize (GObject * object)
725 {
726   GstBaseSink *basesink;
727
728   basesink = GST_BASE_SINK (object);
729
730   g_mutex_free (basesink->preroll_lock);
731   g_cond_free (basesink->preroll_cond);
732   g_queue_free (basesink->preroll_queue);
733   gst_segment_free (basesink->clip_segment);
734
735   G_OBJECT_CLASS (parent_class)->finalize (object);
736 }
737
738 /**
739  * gst_base_sink_set_sync:
740  * @sink: the sink
741  * @sync: the new sync value.
742  *
743  * Configures @sink to synchronize on the clock or not. When
744  * @sync is FALSE, incomming samples will be played as fast as
745  * possible. If @sync is TRUE, the timestamps of the incomming
746  * buffers will be used to schedule the exact render time of its
747  * contents.
748  *
749  * Since: 0.10.4
750  */
751 void
752 gst_base_sink_set_sync (GstBaseSink * sink, gboolean sync)
753 {
754   g_return_if_fail (GST_IS_BASE_SINK (sink));
755
756   GST_OBJECT_LOCK (sink);
757   sink->sync = sync;
758   GST_OBJECT_UNLOCK (sink);
759 }
760
761 /**
762  * gst_base_sink_get_sync:
763  * @sink: the sink
764  *
765  * Checks if @sink is currently configured to synchronize against the
766  * clock.
767  *
768  * Returns: TRUE if the sink is configured to synchronize against the clock.
769  *
770  * Since: 0.10.4
771  */
772 gboolean
773 gst_base_sink_get_sync (GstBaseSink * sink)
774 {
775   gboolean res;
776
777   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
778
779   GST_OBJECT_LOCK (sink);
780   res = sink->sync;
781   GST_OBJECT_UNLOCK (sink);
782
783   return res;
784 }
785
786 /**
787  * gst_base_sink_set_max_lateness:
788  * @sink: the sink
789  * @max_lateness: the new max lateness value.
790  *
791  * Sets the new max lateness value to @max_lateness. This value is
792  * used to decide if a buffer should be dropped or not based on the
793  * buffer timestamp and the current clock time. A value of -1 means
794  * an unlimited time.
795  *
796  * Since: 0.10.4
797  */
798 void
799 gst_base_sink_set_max_lateness (GstBaseSink * sink, gint64 max_lateness)
800 {
801   g_return_if_fail (GST_IS_BASE_SINK (sink));
802
803   GST_OBJECT_LOCK (sink);
804   sink->max_lateness = max_lateness;
805   GST_OBJECT_UNLOCK (sink);
806 }
807
808 /**
809  * gst_base_sink_get_max_lateness:
810  * @sink: the sink
811  *
812  * Gets the max lateness value. See gst_base_sink_set_max_lateness for
813  * more details.
814  *
815  * Returns: The maximum time in nanoseconds that a buffer can be late
816  * before it is dropped and not rendered. A value of -1 means an
817  * unlimited time.
818  *
819  * Since: 0.10.4
820  */
821 gint64
822 gst_base_sink_get_max_lateness (GstBaseSink * sink)
823 {
824   gint64 res;
825
826   g_return_val_if_fail (GST_IS_BASE_SINK (sink), -1);
827
828   GST_OBJECT_LOCK (sink);
829   res = sink->max_lateness;
830   GST_OBJECT_UNLOCK (sink);
831
832   return res;
833 }
834
835 /**
836  * gst_base_sink_set_qos_enabled:
837  * @sink: the sink
838  * @enabled: the new qos value.
839  *
840  * Configures @sink to send Quality-of-Service events upstream.
841  *
842  * Since: 0.10.5
843  */
844 void
845 gst_base_sink_set_qos_enabled (GstBaseSink * sink, gboolean enabled)
846 {
847   g_return_if_fail (GST_IS_BASE_SINK (sink));
848
849   g_atomic_int_set (&sink->priv->qos_enabled, enabled);
850 }
851
852 /**
853  * gst_base_sink_is_qos_enabled:
854  * @sink: the sink
855  *
856  * Checks if @sink is currently configured to send Quality-of-Service events
857  * upstream.
858  *
859  * Returns: TRUE if the sink is configured to perform Quality-of-Service.
860  *
861  * Since: 0.10.5
862  */
863 gboolean
864 gst_base_sink_is_qos_enabled (GstBaseSink * sink)
865 {
866   gboolean res;
867
868   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
869
870   res = g_atomic_int_get (&sink->priv->qos_enabled);
871
872   return res;
873 }
874
875 /**
876  * gst_base_sink_set_async_enabled:
877  * @sink: the sink
878  * @enabled: the new async value.
879  *
880  * Configures @sink to perform all state changes asynchronusly. When async is
881  * disabled, the sink will immediatly go to PAUSED instead of waiting for a
882  * preroll buffer. This feature is usefull if the sink does not synchronize
883  * against the clock or when it is dealing with sparse streams.
884  *
885  * Since: 0.10.15
886  */
887 void
888 gst_base_sink_set_async_enabled (GstBaseSink * sink, gboolean enabled)
889 {
890   g_return_if_fail (GST_IS_BASE_SINK (sink));
891
892   GST_BASE_SINK_PREROLL_LOCK (sink);
893   g_atomic_int_set (&sink->priv->async_enabled, enabled);
894   GST_LOG_OBJECT (sink, "set async enabled to %d", enabled);
895   GST_BASE_SINK_PREROLL_UNLOCK (sink);
896 }
897
898 /**
899  * gst_base_sink_is_async_enabled:
900  * @sink: the sink
901  *
902  * Checks if @sink is currently configured to perform asynchronous state
903  * changes to PAUSED.
904  *
905  * Returns: TRUE if the sink is configured to perform asynchronous state
906  * changes.
907  *
908  * Since: 0.10.15
909  */
910 gboolean
911 gst_base_sink_is_async_enabled (GstBaseSink * sink)
912 {
913   gboolean res;
914
915   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
916
917   res = g_atomic_int_get (&sink->priv->async_enabled);
918
919   return res;
920 }
921
922 /**
923  * gst_base_sink_set_ts_offset:
924  * @sink: the sink
925  * @offset: the new offset
926  *
927  * Adjust the synchronisation of @sink with @offset. A negative value will
928  * render buffers earlier than their timestamp. A positive value will delay
929  * rendering. This function can be used to fix playback of badly timestamped
930  * buffers.
931  *
932  * Since: 0.10.15
933  */
934 void
935 gst_base_sink_set_ts_offset (GstBaseSink * sink, GstClockTimeDiff offset)
936 {
937   g_return_if_fail (GST_IS_BASE_SINK (sink));
938
939   GST_OBJECT_LOCK (sink);
940   sink->priv->ts_offset = offset;
941   GST_LOG_OBJECT (sink, "set time offset to %" G_GINT64_FORMAT, offset);
942   GST_OBJECT_UNLOCK (sink);
943 }
944
945 /**
946  * gst_base_sink_get_ts_offset:
947  * @sink: the sink
948  *
949  * Get the synchronisation offset of @sink.
950  *
951  * Returns: The synchronisation offset.
952  *
953  * Since: 0.10.15
954  */
955 GstClockTimeDiff
956 gst_base_sink_get_ts_offset (GstBaseSink * sink)
957 {
958   GstClockTimeDiff res;
959
960   g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
961
962   GST_OBJECT_LOCK (sink);
963   res = sink->priv->ts_offset;
964   GST_OBJECT_UNLOCK (sink);
965
966   return res;
967 }
968
969 /**
970  * gst_base_sink_get_last_buffer:
971  * @sink: the sink
972  *
973  * Get the last buffer that arrived in the sink and was used for preroll or for
974  * rendering. This property can be used to generate thumbnails.
975  *
976  * The #GstCaps on the buffer can be used to determine the type of the buffer.
977  *
978  * Free-function: gst_buffer_unref
979  *
980  * Returns: (transfer full): a #GstBuffer. gst_buffer_unref() after usage.
981  *     This function returns NULL when no buffer has arrived in the sink yet
982  *     or when the sink is not in PAUSED or PLAYING.
983  *
984  * Since: 0.10.15
985  */
986 GstBuffer *
987 gst_base_sink_get_last_buffer (GstBaseSink * sink)
988 {
989   GstBuffer *res;
990
991   g_return_val_if_fail (GST_IS_BASE_SINK (sink), NULL);
992
993   GST_OBJECT_LOCK (sink);
994   if ((res = sink->priv->last_buffer))
995     gst_buffer_ref (res);
996   GST_OBJECT_UNLOCK (sink);
997
998   return res;
999 }
1000
1001 /* with OBJECT_LOCK */
1002 static void
1003 gst_base_sink_set_last_buffer_unlocked (GstBaseSink * sink, GstBuffer * buffer)
1004 {
1005   GstBuffer *old;
1006
1007   old = sink->priv->last_buffer;
1008   if (G_LIKELY (old != buffer)) {
1009     GST_DEBUG_OBJECT (sink, "setting last buffer to %p", buffer);
1010     if (G_LIKELY (buffer))
1011       gst_buffer_ref (buffer);
1012     sink->priv->last_buffer = buffer;
1013   } else {
1014     old = NULL;
1015   }
1016   /* avoid unreffing with the lock because cleanup code might want to take the
1017    * lock too */
1018   if (G_LIKELY (old)) {
1019     GST_OBJECT_UNLOCK (sink);
1020     gst_buffer_unref (old);
1021     GST_OBJECT_LOCK (sink);
1022   }
1023 }
1024
1025 static void
1026 gst_base_sink_set_last_buffer (GstBaseSink * sink, GstBuffer * buffer)
1027 {
1028   if (!g_atomic_int_get (&sink->priv->enable_last_buffer))
1029     return;
1030
1031   GST_OBJECT_LOCK (sink);
1032   gst_base_sink_set_last_buffer_unlocked (sink, buffer);
1033   GST_OBJECT_UNLOCK (sink);
1034 }
1035
1036 /**
1037  * gst_base_sink_set_last_buffer_enabled:
1038  * @sink: the sink
1039  * @enabled: the new enable-last-buffer value.
1040  *
1041  * Configures @sink to store the last received buffer in the last-buffer
1042  * property.
1043  *
1044  * Since: 0.10.30
1045  */
1046 void
1047 gst_base_sink_set_last_buffer_enabled (GstBaseSink * sink, gboolean enabled)
1048 {
1049   g_return_if_fail (GST_IS_BASE_SINK (sink));
1050
1051   /* Only take lock if we change the value */
1052   if (g_atomic_int_compare_and_exchange (&sink->priv->enable_last_buffer,
1053           !enabled, enabled) && !enabled) {
1054     GST_OBJECT_LOCK (sink);
1055     gst_base_sink_set_last_buffer_unlocked (sink, NULL);
1056     GST_OBJECT_UNLOCK (sink);
1057   }
1058 }
1059
1060 /**
1061  * gst_base_sink_is_last_buffer_enabled:
1062  * @sink: the sink
1063  *
1064  * Checks if @sink is currently configured to store the last received buffer in
1065  * the last-buffer property.
1066  *
1067  * Returns: TRUE if the sink is configured to store the last received buffer.
1068  *
1069  * Since: 0.10.30
1070  */
1071 gboolean
1072 gst_base_sink_is_last_buffer_enabled (GstBaseSink * sink)
1073 {
1074   g_return_val_if_fail (GST_IS_BASE_SINK (sink), FALSE);
1075
1076   return g_atomic_int_get (&sink->priv->enable_last_buffer);
1077 }
1078
1079 /**
1080  * gst_base_sink_get_latency:
1081  * @sink: the sink
1082  *
1083  * Get the currently configured latency.
1084  *
1085  * Returns: The configured latency.
1086  *
1087  * Since: 0.10.12
1088  */
1089 GstClockTime
1090 gst_base_sink_get_latency (GstBaseSink * sink)
1091 {
1092   GstClockTime res;
1093
1094   GST_OBJECT_LOCK (sink);
1095   res = sink->priv->latency;
1096   GST_OBJECT_UNLOCK (sink);
1097
1098   return res;
1099 }
1100
1101 /**
1102  * gst_base_sink_query_latency:
1103  * @sink: the sink
1104  * @live: (out) (allow-none): if the sink is live
1105  * @upstream_live: (out) (allow-none): if an upstream element is live
1106  * @min_latency: (out) (allow-none): the min latency of the upstream elements
1107  * @max_latency: (out) (allow-none): the max latency of the upstream elements
1108  *
1109  * Query the sink for the latency parameters. The latency will be queried from
1110  * the upstream elements. @live will be TRUE if @sink is configured to
1111  * synchronize against the clock. @upstream_live will be TRUE if an upstream
1112  * element is live.
1113  *
1114  * If both @live and @upstream_live are TRUE, the sink will want to compensate
1115  * for the latency introduced by the upstream elements by setting the
1116  * @min_latency to a strictly possitive value.
1117  *
1118  * This function is mostly used by subclasses.
1119  *
1120  * Returns: TRUE if the query succeeded.
1121  *
1122  * Since: 0.10.12
1123  */
1124 gboolean
1125 gst_base_sink_query_latency (GstBaseSink * sink, gboolean * live,
1126     gboolean * upstream_live, GstClockTime * min_latency,
1127     GstClockTime * max_latency)
1128 {
1129   gboolean l, us_live, res, have_latency;
1130   GstClockTime min, max, render_delay;
1131   GstQuery *query;
1132   GstClockTime us_min, us_max;
1133
1134   /* we are live when we sync to the clock */
1135   GST_OBJECT_LOCK (sink);
1136   l = sink->sync;
1137   have_latency = sink->priv->have_latency;
1138   render_delay = sink->priv->render_delay;
1139   GST_OBJECT_UNLOCK (sink);
1140
1141   /* assume no latency */
1142   min = 0;
1143   max = -1;
1144   us_live = FALSE;
1145
1146   if (have_latency) {
1147     GST_DEBUG_OBJECT (sink, "we are ready for LATENCY query");
1148     /* we are ready for a latency query this is when we preroll or when we are
1149      * not async. */
1150     query = gst_query_new_latency ();
1151
1152     /* ask the peer for the latency */
1153     if ((res = gst_pad_peer_query (sink->sinkpad, query))) {
1154       /* get upstream min and max latency */
1155       gst_query_parse_latency (query, &us_live, &us_min, &us_max);
1156
1157       if (us_live) {
1158         /* upstream live, use its latency, subclasses should use these
1159          * values to create the complete latency. */
1160         min = us_min;
1161         max = us_max;
1162       }
1163       if (l) {
1164         /* we need to add the render delay if we are live */
1165         if (min != -1)
1166           min += render_delay;
1167         if (max != -1)
1168           max += render_delay;
1169       }
1170     }
1171     gst_query_unref (query);
1172   } else {
1173     GST_DEBUG_OBJECT (sink, "we are not yet ready for LATENCY query");
1174     res = FALSE;
1175   }
1176
1177   /* not live, we tried to do the query, if it failed we return TRUE anyway */
1178   if (!res) {
1179     if (!l) {
1180       res = TRUE;
1181       GST_DEBUG_OBJECT (sink, "latency query failed but we are not live");
1182     } else {
1183       GST_DEBUG_OBJECT (sink, "latency query failed and we are live");
1184     }
1185   }
1186
1187   if (res) {
1188     GST_DEBUG_OBJECT (sink, "latency query: live: %d, have_latency %d,"
1189         " upstream: %d, min %" GST_TIME_FORMAT ", max %" GST_TIME_FORMAT, l,
1190         have_latency, us_live, GST_TIME_ARGS (min), GST_TIME_ARGS (max));
1191
1192     if (live)
1193       *live = l;
1194     if (upstream_live)
1195       *upstream_live = us_live;
1196     if (min_latency)
1197       *min_latency = min;
1198     if (max_latency)
1199       *max_latency = max;
1200   }
1201   return res;
1202 }
1203
1204 /**
1205  * gst_base_sink_set_render_delay:
1206  * @sink: a #GstBaseSink
1207  * @delay: the new delay
1208  *
1209  * Set the render delay in @sink to @delay. The render delay is the time
1210  * between actual rendering of a buffer and its synchronisation time. Some
1211  * devices might delay media rendering which can be compensated for with this
1212  * function.
1213  *
1214  * After calling this function, this sink will report additional latency and
1215  * other sinks will adjust their latency to delay the rendering of their media.
1216  *
1217  * This function is usually called by subclasses.
1218  *
1219  * Since: 0.10.21
1220  */
1221 void
1222 gst_base_sink_set_render_delay (GstBaseSink * sink, GstClockTime delay)
1223 {
1224   GstClockTime old_render_delay;
1225
1226   g_return_if_fail (GST_IS_BASE_SINK (sink));
1227
1228   GST_OBJECT_LOCK (sink);
1229   old_render_delay = sink->priv->render_delay;
1230   sink->priv->render_delay = delay;
1231   GST_LOG_OBJECT (sink, "set render delay to %" GST_TIME_FORMAT,
1232       GST_TIME_ARGS (delay));
1233   GST_OBJECT_UNLOCK (sink);
1234
1235   if (delay != old_render_delay) {
1236     GST_DEBUG_OBJECT (sink, "posting latency changed");
1237     gst_element_post_message (GST_ELEMENT_CAST (sink),
1238         gst_message_new_latency (GST_OBJECT_CAST (sink)));
1239   }
1240 }
1241
1242 /**
1243  * gst_base_sink_get_render_delay:
1244  * @sink: a #GstBaseSink
1245  *
1246  * Get the render delay of @sink. see gst_base_sink_set_render_delay() for more
1247  * information about the render delay.
1248  *
1249  * Returns: the render delay of @sink.
1250  *
1251  * Since: 0.10.21
1252  */
1253 GstClockTime
1254 gst_base_sink_get_render_delay (GstBaseSink * sink)
1255 {
1256   GstClockTimeDiff res;
1257
1258   g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
1259
1260   GST_OBJECT_LOCK (sink);
1261   res = sink->priv->render_delay;
1262   GST_OBJECT_UNLOCK (sink);
1263
1264   return res;
1265 }
1266
1267 /**
1268  * gst_base_sink_set_blocksize:
1269  * @sink: a #GstBaseSink
1270  * @blocksize: the blocksize in bytes
1271  *
1272  * Set the number of bytes that the sink will pull when it is operating in pull
1273  * mode.
1274  *
1275  * Since: 0.10.22
1276  */
1277 /* FIXME 0.11: blocksize property should be int, otherwise min>max.. */
1278 void
1279 gst_base_sink_set_blocksize (GstBaseSink * sink, guint blocksize)
1280 {
1281   g_return_if_fail (GST_IS_BASE_SINK (sink));
1282
1283   GST_OBJECT_LOCK (sink);
1284   sink->priv->blocksize = blocksize;
1285   GST_LOG_OBJECT (sink, "set blocksize to %u", blocksize);
1286   GST_OBJECT_UNLOCK (sink);
1287 }
1288
1289 /**
1290  * gst_base_sink_get_blocksize:
1291  * @sink: a #GstBaseSink
1292  *
1293  * Get the number of bytes that the sink will pull when it is operating in pull
1294  * mode.
1295  *
1296  * Returns: the number of bytes @sink will pull in pull mode.
1297  *
1298  * Since: 0.10.22
1299  */
1300 /* FIXME 0.11: blocksize property should be int, otherwise min>max.. */
1301 guint
1302 gst_base_sink_get_blocksize (GstBaseSink * sink)
1303 {
1304   guint res;
1305
1306   g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
1307
1308   GST_OBJECT_LOCK (sink);
1309   res = sink->priv->blocksize;
1310   GST_OBJECT_UNLOCK (sink);
1311
1312   return res;
1313 }
1314
1315 /**
1316  * gst_base_sink_set_throttle_time:
1317  * @sink: a #GstBaseSink
1318  * @throttle: the throttle time in nanoseconds
1319  *
1320  * Set the time that will be inserted between rendered buffers. This
1321  * can be used to control the maximum buffers per second that the sink
1322  * will render. 
1323  *
1324  * Since: 0.10.33
1325  */
1326 void
1327 gst_base_sink_set_throttle_time (GstBaseSink * sink, guint64 throttle)
1328 {
1329   g_return_if_fail (GST_IS_BASE_SINK (sink));
1330
1331   GST_OBJECT_LOCK (sink);
1332   sink->priv->throttle_time = throttle;
1333   GST_LOG_OBJECT (sink, "set throttle_time to %" G_GUINT64_FORMAT, throttle);
1334   GST_OBJECT_UNLOCK (sink);
1335 }
1336
1337 /**
1338  * gst_base_sink_get_throttle_time:
1339  * @sink: a #GstBaseSink
1340  *
1341  * Get the time that will be inserted between frames to control the 
1342  * maximum buffers per second.
1343  *
1344  * Returns: the number of nanoseconds @sink will put between frames.
1345  *
1346  * Since: 0.10.33
1347  */
1348 guint64
1349 gst_base_sink_get_throttle_time (GstBaseSink * sink)
1350 {
1351   guint64 res;
1352
1353   g_return_val_if_fail (GST_IS_BASE_SINK (sink), 0);
1354
1355   GST_OBJECT_LOCK (sink);
1356   res = sink->priv->throttle_time;
1357   GST_OBJECT_UNLOCK (sink);
1358
1359   return res;
1360 }
1361
1362 static void
1363 gst_base_sink_set_property (GObject * object, guint prop_id,
1364     const GValue * value, GParamSpec * pspec)
1365 {
1366   GstBaseSink *sink = GST_BASE_SINK (object);
1367
1368   switch (prop_id) {
1369     case PROP_PREROLL_QUEUE_LEN:
1370       /* preroll lock necessary to serialize with finish_preroll */
1371       GST_BASE_SINK_PREROLL_LOCK (sink);
1372       g_atomic_int_set (&sink->preroll_queue_max_len, g_value_get_uint (value));
1373       GST_BASE_SINK_PREROLL_UNLOCK (sink);
1374       break;
1375     case PROP_SYNC:
1376       gst_base_sink_set_sync (sink, g_value_get_boolean (value));
1377       break;
1378     case PROP_MAX_LATENESS:
1379       gst_base_sink_set_max_lateness (sink, g_value_get_int64 (value));
1380       break;
1381     case PROP_QOS:
1382       gst_base_sink_set_qos_enabled (sink, g_value_get_boolean (value));
1383       break;
1384     case PROP_ASYNC:
1385       gst_base_sink_set_async_enabled (sink, g_value_get_boolean (value));
1386       break;
1387     case PROP_TS_OFFSET:
1388       gst_base_sink_set_ts_offset (sink, g_value_get_int64 (value));
1389       break;
1390     case PROP_BLOCKSIZE:
1391       gst_base_sink_set_blocksize (sink, g_value_get_uint (value));
1392       break;
1393     case PROP_RENDER_DELAY:
1394       gst_base_sink_set_render_delay (sink, g_value_get_uint64 (value));
1395       break;
1396     case PROP_ENABLE_LAST_BUFFER:
1397       gst_base_sink_set_last_buffer_enabled (sink, g_value_get_boolean (value));
1398       break;
1399     case PROP_THROTTLE_TIME:
1400       gst_base_sink_set_throttle_time (sink, g_value_get_uint64 (value));
1401       break;
1402     default:
1403       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1404       break;
1405   }
1406 }
1407
1408 static void
1409 gst_base_sink_get_property (GObject * object, guint prop_id, GValue * value,
1410     GParamSpec * pspec)
1411 {
1412   GstBaseSink *sink = GST_BASE_SINK (object);
1413
1414   switch (prop_id) {
1415     case PROP_PREROLL_QUEUE_LEN:
1416       g_value_set_uint (value, g_atomic_int_get (&sink->preroll_queue_max_len));
1417       break;
1418     case PROP_SYNC:
1419       g_value_set_boolean (value, gst_base_sink_get_sync (sink));
1420       break;
1421     case PROP_MAX_LATENESS:
1422       g_value_set_int64 (value, gst_base_sink_get_max_lateness (sink));
1423       break;
1424     case PROP_QOS:
1425       g_value_set_boolean (value, gst_base_sink_is_qos_enabled (sink));
1426       break;
1427     case PROP_ASYNC:
1428       g_value_set_boolean (value, gst_base_sink_is_async_enabled (sink));
1429       break;
1430     case PROP_TS_OFFSET:
1431       g_value_set_int64 (value, gst_base_sink_get_ts_offset (sink));
1432       break;
1433     case PROP_LAST_BUFFER:
1434       gst_value_take_buffer (value, gst_base_sink_get_last_buffer (sink));
1435       break;
1436     case PROP_ENABLE_LAST_BUFFER:
1437       g_value_set_boolean (value, gst_base_sink_is_last_buffer_enabled (sink));
1438       break;
1439     case PROP_BLOCKSIZE:
1440       g_value_set_uint (value, gst_base_sink_get_blocksize (sink));
1441       break;
1442     case PROP_RENDER_DELAY:
1443       g_value_set_uint64 (value, gst_base_sink_get_render_delay (sink));
1444       break;
1445     case PROP_THROTTLE_TIME:
1446       g_value_set_uint64 (value, gst_base_sink_get_throttle_time (sink));
1447       break;
1448     default:
1449       G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
1450       break;
1451   }
1452 }
1453
1454
1455 static GstCaps *
1456 gst_base_sink_get_caps (GstBaseSink * sink)
1457 {
1458   return NULL;
1459 }
1460
1461 static gboolean
1462 gst_base_sink_set_caps (GstBaseSink * sink, GstCaps * caps)
1463 {
1464   return TRUE;
1465 }
1466
1467 static GstFlowReturn
1468 gst_base_sink_buffer_alloc (GstBaseSink * sink, guint64 offset, guint size,
1469     GstCaps * caps, GstBuffer ** buf)
1470 {
1471   *buf = NULL;
1472   return GST_FLOW_OK;
1473 }
1474
1475 /* with PREROLL_LOCK, STREAM_LOCK */
1476 static void
1477 gst_base_sink_preroll_queue_flush (GstBaseSink * basesink, GstPad * pad)
1478 {
1479   GstMiniObject *obj;
1480
1481   GST_DEBUG_OBJECT (basesink, "flushing queue %p", basesink);
1482   while ((obj = g_queue_pop_head (basesink->preroll_queue))) {
1483     GST_DEBUG_OBJECT (basesink, "popped %p", obj);
1484     gst_mini_object_unref (obj);
1485   }
1486   /* we can't have EOS anymore now */
1487   basesink->eos = FALSE;
1488   basesink->priv->received_eos = FALSE;
1489   basesink->have_preroll = FALSE;
1490   basesink->priv->step_unlock = FALSE;
1491   basesink->eos_queued = FALSE;
1492   basesink->preroll_queued = 0;
1493   basesink->buffers_queued = 0;
1494   basesink->events_queued = 0;
1495   /* can't report latency anymore until we preroll again */
1496   if (basesink->priv->async_enabled) {
1497     GST_OBJECT_LOCK (basesink);
1498     basesink->priv->have_latency = FALSE;
1499     GST_OBJECT_UNLOCK (basesink);
1500   }
1501   /* and signal any waiters now */
1502   GST_BASE_SINK_PREROLL_SIGNAL (basesink);
1503 }
1504
1505 /* with STREAM_LOCK, configures given segment with the event information. */
1506 static void
1507 gst_base_sink_configure_segment (GstBaseSink * basesink, GstPad * pad,
1508     GstEvent * event, GstSegment * segment)
1509 {
1510   gboolean update;
1511   gdouble rate, arate;
1512   GstFormat format;
1513   gint64 start;
1514   gint64 stop;
1515   gint64 time;
1516
1517   /* the newsegment event is needed to bring the buffer timestamps to the
1518    * stream time and to drop samples outside of the playback segment. */
1519   gst_event_parse_new_segment_full (event, &update, &rate, &arate, &format,
1520       &start, &stop, &time);
1521
1522   /* The segment is protected with both the STREAM_LOCK and the OBJECT_LOCK.
1523    * We protect with the OBJECT_LOCK so that we can use the values to
1524    * safely answer a POSITION query. */
1525   GST_OBJECT_LOCK (basesink);
1526   gst_segment_set_newsegment_full (segment, update, rate, arate, format, start,
1527       stop, time);
1528
1529   if (format == GST_FORMAT_TIME) {
1530     GST_DEBUG_OBJECT (basesink,
1531         "configured NEWSEGMENT update %d, rate %lf, applied rate %lf, "
1532         "format GST_FORMAT_TIME, "
1533         "%" GST_TIME_FORMAT " -- %" GST_TIME_FORMAT
1534         ", time %" GST_TIME_FORMAT ", accum %" GST_TIME_FORMAT,
1535         update, rate, arate, GST_TIME_ARGS (segment->start),
1536         GST_TIME_ARGS (segment->stop), GST_TIME_ARGS (segment->time),
1537         GST_TIME_ARGS (segment->accum));
1538   } else {
1539     GST_DEBUG_OBJECT (basesink,
1540         "configured NEWSEGMENT update %d, rate %lf, applied rate %lf, "
1541         "format %d, "
1542         "%" G_GINT64_FORMAT " -- %" G_GINT64_FORMAT ", time %"
1543         G_GINT64_FORMAT ", accum %" G_GINT64_FORMAT, update, rate, arate,
1544         segment->format, segment->start, segment->stop, segment->time,
1545         segment->accum);
1546   }
1547   GST_OBJECT_UNLOCK (basesink);
1548 }
1549
1550 /* with PREROLL_LOCK, STREAM_LOCK */
1551 static gboolean
1552 gst_base_sink_commit_state (GstBaseSink * basesink)
1553 {
1554   /* commit state and proceed to next pending state */
1555   GstState current, next, pending, post_pending;
1556   gboolean post_paused = FALSE;
1557   gboolean post_async_done = FALSE;
1558   gboolean post_playing = FALSE;
1559
1560   /* we are certainly not playing async anymore now */
1561   basesink->playing_async = FALSE;
1562
1563   GST_OBJECT_LOCK (basesink);
1564   current = GST_STATE (basesink);
1565   next = GST_STATE_NEXT (basesink);
1566   pending = GST_STATE_PENDING (basesink);
1567   post_pending = pending;
1568
1569   switch (pending) {
1570     case GST_STATE_PLAYING:
1571     {
1572       GstBaseSinkClass *bclass;
1573
1574       bclass = GST_BASE_SINK_GET_CLASS (basesink);
1575
1576       GST_DEBUG_OBJECT (basesink, "commiting state to PLAYING");
1577
1578       basesink->need_preroll = FALSE;
1579       post_async_done = TRUE;
1580       basesink->priv->commited = TRUE;
1581       post_playing = TRUE;
1582       /* post PAUSED too when we were READY */
1583       if (current == GST_STATE_READY) {
1584         post_paused = TRUE;
1585       }
1586       break;
1587     }
1588     case GST_STATE_PAUSED:
1589       GST_DEBUG_OBJECT (basesink, "commiting state to PAUSED");
1590       post_paused = TRUE;
1591       post_async_done = TRUE;
1592       basesink->priv->commited = TRUE;
1593       post_pending = GST_STATE_VOID_PENDING;
1594       break;
1595     case GST_STATE_READY:
1596     case GST_STATE_NULL:
1597       goto stopping;
1598     case GST_STATE_VOID_PENDING:
1599       goto nothing_pending;
1600     default:
1601       break;
1602   }
1603
1604   /* we can report latency queries now */
1605   basesink->priv->have_latency = TRUE;
1606
1607   GST_STATE (basesink) = pending;
1608   GST_STATE_NEXT (basesink) = GST_STATE_VOID_PENDING;
1609   GST_STATE_PENDING (basesink) = GST_STATE_VOID_PENDING;
1610   GST_STATE_RETURN (basesink) = GST_STATE_CHANGE_SUCCESS;
1611   GST_OBJECT_UNLOCK (basesink);
1612
1613   if (post_paused) {
1614     GST_DEBUG_OBJECT (basesink, "posting PAUSED state change message");
1615     gst_element_post_message (GST_ELEMENT_CAST (basesink),
1616         gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
1617             current, next, post_pending));
1618   }
1619   if (post_async_done) {
1620     GST_DEBUG_OBJECT (basesink, "posting async-done message");
1621     gst_element_post_message (GST_ELEMENT_CAST (basesink),
1622         gst_message_new_async_done (GST_OBJECT_CAST (basesink)));
1623   }
1624   if (post_playing) {
1625     GST_DEBUG_OBJECT (basesink, "posting PLAYING state change message");
1626     gst_element_post_message (GST_ELEMENT_CAST (basesink),
1627         gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
1628             next, pending, GST_STATE_VOID_PENDING));
1629   }
1630
1631   GST_STATE_BROADCAST (basesink);
1632
1633   return TRUE;
1634
1635 nothing_pending:
1636   {
1637     /* Depending on the state, set our vars. We get in this situation when the
1638      * state change function got a change to update the state vars before the
1639      * streaming thread did. This is fine but we need to make sure that we
1640      * update the need_preroll var since it was TRUE when we got here and might
1641      * become FALSE if we got to PLAYING. */
1642     GST_DEBUG_OBJECT (basesink, "nothing to commit, now in %s",
1643         gst_element_state_get_name (current));
1644     switch (current) {
1645       case GST_STATE_PLAYING:
1646         basesink->need_preroll = FALSE;
1647         break;
1648       case GST_STATE_PAUSED:
1649         basesink->need_preroll = TRUE;
1650         break;
1651       default:
1652         basesink->need_preroll = FALSE;
1653         basesink->flushing = TRUE;
1654         break;
1655     }
1656     /* we can report latency queries now */
1657     basesink->priv->have_latency = TRUE;
1658     GST_OBJECT_UNLOCK (basesink);
1659     return TRUE;
1660   }
1661 stopping:
1662   {
1663     /* app is going to READY */
1664     GST_DEBUG_OBJECT (basesink, "stopping");
1665     basesink->need_preroll = FALSE;
1666     basesink->flushing = TRUE;
1667     GST_OBJECT_UNLOCK (basesink);
1668     return FALSE;
1669   }
1670 }
1671
1672 static void
1673 start_stepping (GstBaseSink * sink, GstSegment * segment,
1674     GstStepInfo * pending, GstStepInfo * current)
1675 {
1676   gint64 end;
1677   GstMessage *message;
1678
1679   GST_DEBUG_OBJECT (sink, "update pending step");
1680
1681   GST_OBJECT_LOCK (sink);
1682   memcpy (current, pending, sizeof (GstStepInfo));
1683   pending->valid = FALSE;
1684   GST_OBJECT_UNLOCK (sink);
1685
1686   /* post message first */
1687   message =
1688       gst_message_new_step_start (GST_OBJECT (sink), TRUE, current->format,
1689       current->amount, current->rate, current->flush, current->intermediate);
1690   gst_message_set_seqnum (message, current->seqnum);
1691   gst_element_post_message (GST_ELEMENT (sink), message);
1692
1693   /* get the running time of where we paused and remember it */
1694   current->start = gst_element_get_start_time (GST_ELEMENT_CAST (sink));
1695   gst_segment_set_running_time (segment, GST_FORMAT_TIME, current->start);
1696
1697   /* set the new rate for the remainder of the segment */
1698   current->start_rate = segment->rate;
1699   segment->rate *= current->rate;
1700   segment->abs_rate = ABS (segment->rate);
1701
1702   /* save values */
1703   if (segment->rate > 0.0)
1704     current->start_stop = segment->stop;
1705   else
1706     current->start_start = segment->start;
1707
1708   if (current->format == GST_FORMAT_TIME) {
1709     end = current->start + current->amount;
1710     if (!current->flush) {
1711       /* update the segment clipping regions for non-flushing seeks */
1712       if (segment->rate > 0.0) {
1713         segment->stop = gst_segment_to_position (segment, GST_FORMAT_TIME, end);
1714         segment->last_stop = segment->stop;
1715       } else {
1716         gint64 position;
1717
1718         position = gst_segment_to_position (segment, GST_FORMAT_TIME, end);
1719         segment->time = position;
1720         segment->start = position;
1721         segment->last_stop = position;
1722       }
1723     }
1724   }
1725
1726   GST_DEBUG_OBJECT (sink,
1727       "segment now rate %lf, applied rate %lf, "
1728       "format GST_FORMAT_TIME, "
1729       "%" GST_TIME_FORMAT " -- %" GST_TIME_FORMAT
1730       ", time %" GST_TIME_FORMAT ", accum %" GST_TIME_FORMAT,
1731       segment->rate, segment->applied_rate, GST_TIME_ARGS (segment->start),
1732       GST_TIME_ARGS (segment->stop), GST_TIME_ARGS (segment->time),
1733       GST_TIME_ARGS (segment->accum));
1734
1735   GST_DEBUG_OBJECT (sink, "step started at running_time %" GST_TIME_FORMAT,
1736       GST_TIME_ARGS (current->start));
1737
1738   if (current->amount == -1) {
1739     GST_DEBUG_OBJECT (sink, "step amount == -1, stop stepping");
1740     current->valid = FALSE;
1741   } else {
1742     GST_DEBUG_OBJECT (sink, "step amount: %" G_GUINT64_FORMAT ", format: %s, "
1743         "rate: %f", current->amount, gst_format_get_name (current->format),
1744         current->rate);
1745   }
1746 }
1747
1748 static void
1749 stop_stepping (GstBaseSink * sink, GstSegment * segment,
1750     GstStepInfo * current, gint64 rstart, gint64 rstop, gboolean eos)
1751 {
1752   gint64 stop, position;
1753   GstMessage *message;
1754
1755   GST_DEBUG_OBJECT (sink, "step complete");
1756
1757   if (segment->rate > 0.0)
1758     stop = rstart;
1759   else
1760     stop = rstop;
1761
1762   GST_DEBUG_OBJECT (sink,
1763       "step stop at running_time %" GST_TIME_FORMAT, GST_TIME_ARGS (stop));
1764
1765   if (stop == -1)
1766     current->duration = current->position;
1767   else
1768     current->duration = stop - current->start;
1769
1770   GST_DEBUG_OBJECT (sink, "step elapsed running_time %" GST_TIME_FORMAT,
1771       GST_TIME_ARGS (current->duration));
1772
1773   position = current->start + current->duration;
1774
1775   /* now move the segment to the new running time */
1776   gst_segment_set_running_time (segment, GST_FORMAT_TIME, position);
1777
1778   if (current->flush) {
1779     /* and remove the accumulated time we flushed, start time did not change */
1780     segment->accum = current->start;
1781   } else {
1782     /* start time is now the stepped position */
1783     gst_element_set_start_time (GST_ELEMENT_CAST (sink), position);
1784   }
1785
1786   /* restore the previous rate */
1787   segment->rate = current->start_rate;
1788   segment->abs_rate = ABS (segment->rate);
1789
1790   if (segment->rate > 0.0)
1791     segment->stop = current->start_stop;
1792   else
1793     segment->start = current->start_start;
1794
1795   /* the clip segment is used for position report in paused... */
1796   memcpy (sink->clip_segment, segment, sizeof (GstSegment));
1797
1798   /* post the step done when we know the stepped duration in TIME */
1799   message =
1800       gst_message_new_step_done (GST_OBJECT_CAST (sink), current->format,
1801       current->amount, current->rate, current->flush, current->intermediate,
1802       current->duration, eos);
1803   gst_message_set_seqnum (message, current->seqnum);
1804   gst_element_post_message (GST_ELEMENT_CAST (sink), message);
1805
1806   if (!current->intermediate)
1807     sink->need_preroll = current->need_preroll;
1808
1809   /* and the current step info finished and becomes invalid */
1810   current->valid = FALSE;
1811 }
1812
1813 static gboolean
1814 handle_stepping (GstBaseSink * sink, GstSegment * segment,
1815     GstStepInfo * current, gint64 * cstart, gint64 * cstop, gint64 * rstart,
1816     gint64 * rstop)
1817 {
1818   gboolean step_end = FALSE;
1819
1820   /* see if we need to skip this buffer because of stepping */
1821   switch (current->format) {
1822     case GST_FORMAT_TIME:
1823     {
1824       guint64 end;
1825       gint64 first, last;
1826
1827       if (segment->rate > 0.0) {
1828         if (segment->stop == *cstop)
1829           *rstop = *rstart + current->amount;
1830
1831         first = *rstart;
1832         last = *rstop;
1833       } else {
1834         if (segment->start == *cstart)
1835           *rstart = *rstop + current->amount;
1836
1837         first = *rstop;
1838         last = *rstart;
1839       }
1840
1841       end = current->start + current->amount;
1842       current->position = first - current->start;
1843
1844       if (G_UNLIKELY (segment->abs_rate != 1.0))
1845         current->position /= segment->abs_rate;
1846
1847       GST_DEBUG_OBJECT (sink,
1848           "buffer: %" GST_TIME_FORMAT "-%" GST_TIME_FORMAT,
1849           GST_TIME_ARGS (first), GST_TIME_ARGS (last));
1850       GST_DEBUG_OBJECT (sink,
1851           "got time step %" GST_TIME_FORMAT "-%" GST_TIME_FORMAT "/%"
1852           GST_TIME_FORMAT, GST_TIME_ARGS (current->position),
1853           GST_TIME_ARGS (last - current->start),
1854           GST_TIME_ARGS (current->amount));
1855
1856       if ((current->flush && current->position >= current->amount)
1857           || last >= end) {
1858         GST_DEBUG_OBJECT (sink, "step ended, we need clipping");
1859         step_end = TRUE;
1860         if (segment->rate > 0.0) {
1861           *rstart = end;
1862           *cstart = gst_segment_to_position (segment, GST_FORMAT_TIME, end);
1863         } else {
1864           *rstop = end;
1865           *cstop = gst_segment_to_position (segment, GST_FORMAT_TIME, end);
1866         }
1867       }
1868       GST_DEBUG_OBJECT (sink,
1869           "cstart %" GST_TIME_FORMAT ", rstart %" GST_TIME_FORMAT,
1870           GST_TIME_ARGS (*cstart), GST_TIME_ARGS (*rstart));
1871       GST_DEBUG_OBJECT (sink,
1872           "cstop %" GST_TIME_FORMAT ", rstop %" GST_TIME_FORMAT,
1873           GST_TIME_ARGS (*cstop), GST_TIME_ARGS (*rstop));
1874       break;
1875     }
1876     case GST_FORMAT_BUFFERS:
1877       GST_DEBUG_OBJECT (sink,
1878           "got default step %" G_GUINT64_FORMAT "/%" G_GUINT64_FORMAT,
1879           current->position, current->amount);
1880
1881       if (current->position < current->amount) {
1882         current->position++;
1883       } else {
1884         step_end = TRUE;
1885       }
1886       break;
1887     case GST_FORMAT_DEFAULT:
1888     default:
1889       GST_DEBUG_OBJECT (sink,
1890           "got unknown step %" G_GUINT64_FORMAT "/%" G_GUINT64_FORMAT,
1891           current->position, current->amount);
1892       break;
1893   }
1894   return step_end;
1895 }
1896
1897 /* with STREAM_LOCK, PREROLL_LOCK
1898  *
1899  * Returns TRUE if the object needs synchronisation and takes therefore
1900  * part in prerolling.
1901  *
1902  * rsstart/rsstop contain the start/stop in stream time.
1903  * rrstart/rrstop contain the start/stop in running time.
1904  */
1905 static gboolean
1906 gst_base_sink_get_sync_times (GstBaseSink * basesink, GstMiniObject * obj,
1907     GstClockTime * rsstart, GstClockTime * rsstop,
1908     GstClockTime * rrstart, GstClockTime * rrstop, gboolean * do_sync,
1909     gboolean * stepped, GstSegment * segment, GstStepInfo * step,
1910     gboolean * step_end, guint8 obj_type)
1911 {
1912   GstBaseSinkClass *bclass;
1913   GstBuffer *buffer;
1914   GstClockTime start, stop;     /* raw start/stop timestamps */
1915   gint64 cstart, cstop;         /* clipped raw timestamps */
1916   gint64 rstart, rstop;         /* clipped timestamps converted to running time */
1917   GstClockTime sstart, sstop;   /* clipped timestamps converted to stream time */
1918   GstFormat format;
1919   GstBaseSinkPrivate *priv;
1920   gboolean eos;
1921
1922   priv = basesink->priv;
1923
1924   /* start with nothing */
1925   start = stop = GST_CLOCK_TIME_NONE;
1926
1927   if (G_UNLIKELY (OBJ_IS_EVENT (obj_type))) {
1928     GstEvent *event = GST_EVENT_CAST (obj);
1929
1930     switch (GST_EVENT_TYPE (event)) {
1931         /* EOS event needs syncing */
1932       case GST_EVENT_EOS:
1933       {
1934         if (basesink->segment.rate >= 0.0) {
1935           sstart = sstop = priv->current_sstop;
1936           if (!GST_CLOCK_TIME_IS_VALID (sstart)) {
1937             /* we have not seen a buffer yet, use the segment values */
1938             sstart = sstop = gst_segment_to_stream_time (&basesink->segment,
1939                 basesink->segment.format, basesink->segment.stop);
1940           }
1941         } else {
1942           sstart = sstop = priv->current_sstart;
1943           if (!GST_CLOCK_TIME_IS_VALID (sstart)) {
1944             /* we have not seen a buffer yet, use the segment values */
1945             sstart = sstop = gst_segment_to_stream_time (&basesink->segment,
1946                 basesink->segment.format, basesink->segment.start);
1947           }
1948         }
1949
1950         rstart = rstop = priv->eos_rtime;
1951         *do_sync = rstart != -1;
1952         GST_DEBUG_OBJECT (basesink, "sync times for EOS %" GST_TIME_FORMAT,
1953             GST_TIME_ARGS (rstart));
1954         /* if we are stepping, we end now */
1955         *step_end = step->valid;
1956         eos = TRUE;
1957         goto eos_done;
1958       }
1959       default:
1960         /* other events do not need syncing */
1961         /* FIXME, maybe NEWSEGMENT might need synchronisation
1962          * since the POSITION query depends on accumulated times and
1963          * we cannot accumulate the current segment before the previous
1964          * one completed.
1965          */
1966         return FALSE;
1967     }
1968   }
1969
1970   eos = FALSE;
1971
1972 again:
1973   /* else do buffer sync code */
1974   buffer = GST_BUFFER_CAST (obj);
1975
1976   bclass = GST_BASE_SINK_GET_CLASS (basesink);
1977
1978   /* just get the times to see if we need syncing, if the start returns -1 we
1979    * don't sync. */
1980   if (bclass->get_times)
1981     bclass->get_times (basesink, buffer, &start, &stop);
1982
1983   if (!GST_CLOCK_TIME_IS_VALID (start)) {
1984     /* we don't need to sync but we still want to get the timestamps for
1985      * tracking the position */
1986     gst_base_sink_get_times (basesink, buffer, &start, &stop);
1987     *do_sync = FALSE;
1988   } else {
1989     *do_sync = TRUE;
1990   }
1991
1992   GST_DEBUG_OBJECT (basesink, "got times start: %" GST_TIME_FORMAT
1993       ", stop: %" GST_TIME_FORMAT ", do_sync %d", GST_TIME_ARGS (start),
1994       GST_TIME_ARGS (stop), *do_sync);
1995
1996   /* collect segment and format for code clarity */
1997   format = segment->format;
1998
1999   /* no timestamp clipping if we did not get a TIME segment format */
2000   if (G_UNLIKELY (format != GST_FORMAT_TIME)) {
2001     cstart = start;
2002     cstop = stop;
2003     /* do running and stream time in TIME format */
2004     format = GST_FORMAT_TIME;
2005     GST_LOG_OBJECT (basesink, "not time format, don't clip");
2006     goto do_times;
2007   }
2008
2009   /* clip, only when we know about time */
2010   if (G_UNLIKELY (!gst_segment_clip (segment, GST_FORMAT_TIME,
2011               (gint64) start, (gint64) stop, &cstart, &cstop))) {
2012     if (step->valid) {
2013       GST_DEBUG_OBJECT (basesink, "step out of segment");
2014       /* when we are stepping, pretend we're at the end of the segment */
2015       if (segment->rate > 0.0) {
2016         cstart = segment->stop;
2017         cstop = segment->stop;
2018       } else {
2019         cstart = segment->start;
2020         cstop = segment->start;
2021       }
2022       goto do_times;
2023     }
2024     goto out_of_segment;
2025   }
2026
2027   if (G_UNLIKELY (start != cstart || stop != cstop)) {
2028     GST_DEBUG_OBJECT (basesink, "clipped to: start %" GST_TIME_FORMAT
2029         ", stop: %" GST_TIME_FORMAT, GST_TIME_ARGS (cstart),
2030         GST_TIME_ARGS (cstop));
2031   }
2032
2033   /* set last stop position */
2034   if (G_LIKELY (stop != GST_CLOCK_TIME_NONE && cstop != GST_CLOCK_TIME_NONE))
2035     gst_segment_set_last_stop (segment, GST_FORMAT_TIME, cstop);
2036   else
2037     gst_segment_set_last_stop (segment, GST_FORMAT_TIME, cstart);
2038
2039 do_times:
2040   rstart = gst_segment_to_running_time (segment, format, cstart);
2041   rstop = gst_segment_to_running_time (segment, format, cstop);
2042
2043   if (G_UNLIKELY (step->valid)) {
2044     if (!(*step_end = handle_stepping (basesink, segment, step, &cstart, &cstop,
2045                 &rstart, &rstop))) {
2046       /* step is still busy, we discard data when we are flushing */
2047       *stepped = step->flush;
2048       GST_DEBUG_OBJECT (basesink, "stepping busy");
2049     }
2050   }
2051   /* this can produce wrong values if we accumulated non-TIME segments. If this happens,
2052    * upstream is behaving very badly */
2053   sstart = gst_segment_to_stream_time (segment, format, cstart);
2054   sstop = gst_segment_to_stream_time (segment, format, cstop);
2055
2056 eos_done:
2057   /* eos_done label only called when doing EOS, we also stop stepping then */
2058   if (*step_end && step->flush) {
2059     GST_DEBUG_OBJECT (basesink, "flushing step ended");
2060     stop_stepping (basesink, segment, step, rstart, rstop, eos);
2061     *step_end = FALSE;
2062     /* re-determine running start times for adjusted segment
2063      * (which has a flushed amount of running/accumulated time removed) */
2064     if (!GST_IS_EVENT (obj)) {
2065       GST_DEBUG_OBJECT (basesink, "refresh sync times");
2066       goto again;
2067     }
2068   }
2069
2070   /* save times */
2071   *rsstart = sstart;
2072   *rsstop = sstop;
2073   *rrstart = rstart;
2074   *rrstop = rstop;
2075
2076   /* buffers and EOS always need syncing and preroll */
2077   return TRUE;
2078
2079   /* special cases */
2080 out_of_segment:
2081   {
2082     /* we usually clip in the chain function already but stepping could cause
2083      * the segment to be updated later. we return FALSE so that we don't try
2084      * to sync on it. */
2085     GST_LOG_OBJECT (basesink, "buffer skipped, not in segment");
2086     return FALSE;
2087   }
2088 }
2089
2090 /* with STREAM_LOCK, PREROLL_LOCK, LOCK
2091  * adjust a timestamp with the latency and timestamp offset. This function does
2092  * not adjust for the render delay. */
2093 static GstClockTime
2094 gst_base_sink_adjust_time (GstBaseSink * basesink, GstClockTime time)
2095 {
2096   GstClockTimeDiff ts_offset;
2097
2098   /* don't do anything funny with invalid timestamps */
2099   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (time)))
2100     return time;
2101
2102   time += basesink->priv->latency;
2103
2104   /* apply offset, be carefull for underflows */
2105   ts_offset = basesink->priv->ts_offset;
2106   if (ts_offset < 0) {
2107     ts_offset = -ts_offset;
2108     if (ts_offset < time)
2109       time -= ts_offset;
2110     else
2111       time = 0;
2112   } else
2113     time += ts_offset;
2114
2115   /* subtract the render delay again, which was included in the latency */
2116   if (time > basesink->priv->render_delay)
2117     time -= basesink->priv->render_delay;
2118   else
2119     time = 0;
2120
2121   return time;
2122 }
2123
2124 /**
2125  * gst_base_sink_wait_clock:
2126  * @sink: the sink
2127  * @time: the running_time to be reached
2128  * @jitter: (out) (allow-none): the jitter to be filled with time diff, or NULL
2129  *
2130  * This function will block until @time is reached. It is usually called by
2131  * subclasses that use their own internal synchronisation.
2132  *
2133  * If @time is not valid, no sycnhronisation is done and #GST_CLOCK_BADTIME is
2134  * returned. Likewise, if synchronisation is disabled in the element or there
2135  * is no clock, no synchronisation is done and #GST_CLOCK_BADTIME is returned.
2136  *
2137  * This function should only be called with the PREROLL_LOCK held, like when
2138  * receiving an EOS event in the #GstBaseSinkClass.event() vmethod or when
2139  * receiving a buffer in
2140  * the #GstBaseSinkClass.render() vmethod.
2141  *
2142  * The @time argument should be the running_time of when this method should
2143  * return and is not adjusted with any latency or offset configured in the
2144  * sink.
2145  *
2146  * Since: 0.10.20
2147  *
2148  * Returns: #GstClockReturn
2149  */
2150 GstClockReturn
2151 gst_base_sink_wait_clock (GstBaseSink * sink, GstClockTime time,
2152     GstClockTimeDiff * jitter)
2153 {
2154   GstClockReturn ret;
2155   GstClock *clock;
2156   GstClockTime base_time;
2157
2158   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (time)))
2159     goto invalid_time;
2160
2161   GST_OBJECT_LOCK (sink);
2162   if (G_UNLIKELY (!sink->sync))
2163     goto no_sync;
2164
2165   if (G_UNLIKELY ((clock = GST_ELEMENT_CLOCK (sink)) == NULL))
2166     goto no_clock;
2167
2168   base_time = GST_ELEMENT_CAST (sink)->base_time;
2169   GST_LOG_OBJECT (sink,
2170       "time %" GST_TIME_FORMAT ", base_time %" GST_TIME_FORMAT,
2171       GST_TIME_ARGS (time), GST_TIME_ARGS (base_time));
2172
2173   /* add base_time to running_time to get the time against the clock */
2174   time += base_time;
2175
2176   /* Re-use existing clockid if available */
2177   if (G_LIKELY (sink->priv->cached_clock_id != NULL)) {
2178     if (!gst_clock_single_shot_id_reinit (clock, sink->priv->cached_clock_id,
2179             time)) {
2180       gst_clock_id_unref (sink->priv->cached_clock_id);
2181       sink->priv->cached_clock_id = gst_clock_new_single_shot_id (clock, time);
2182     }
2183   } else
2184     sink->priv->cached_clock_id = gst_clock_new_single_shot_id (clock, time);
2185   GST_OBJECT_UNLOCK (sink);
2186
2187   /* A blocking wait is performed on the clock. We save the ClockID
2188    * so we can unlock the entry at any time. While we are blocking, we
2189    * release the PREROLL_LOCK so that other threads can interrupt the
2190    * entry. */
2191   sink->clock_id = sink->priv->cached_clock_id;
2192   /* release the preroll lock while waiting */
2193   GST_BASE_SINK_PREROLL_UNLOCK (sink);
2194
2195   ret = gst_clock_id_wait (sink->priv->cached_clock_id, jitter);
2196
2197   GST_BASE_SINK_PREROLL_LOCK (sink);
2198   sink->clock_id = NULL;
2199
2200   return ret;
2201
2202   /* no syncing needed */
2203 invalid_time:
2204   {
2205     GST_DEBUG_OBJECT (sink, "time not valid, no sync needed");
2206     return GST_CLOCK_BADTIME;
2207   }
2208 no_sync:
2209   {
2210     GST_DEBUG_OBJECT (sink, "sync disabled");
2211     GST_OBJECT_UNLOCK (sink);
2212     return GST_CLOCK_BADTIME;
2213   }
2214 no_clock:
2215   {
2216     GST_DEBUG_OBJECT (sink, "no clock, can't sync");
2217     GST_OBJECT_UNLOCK (sink);
2218     return GST_CLOCK_BADTIME;
2219   }
2220 }
2221
2222 /**
2223  * gst_base_sink_wait_preroll:
2224  * @sink: the sink
2225  *
2226  * If the #GstBaseSinkClass.render() method performs its own synchronisation
2227  * against the clock it must unblock when going from PLAYING to the PAUSED state
2228  * and call this method before continuing to render the remaining data.
2229  *
2230  * This function will block until a state change to PLAYING happens (in which
2231  * case this function returns #GST_FLOW_OK) or the processing must be stopped due
2232  * to a state change to READY or a FLUSH event (in which case this function
2233  * returns #GST_FLOW_WRONG_STATE).
2234  *
2235  * This function should only be called with the PREROLL_LOCK held, like in the
2236  * render function.
2237  *
2238  * Returns: #GST_FLOW_OK if the preroll completed and processing can
2239  * continue. Any other return value should be returned from the render vmethod.
2240  *
2241  * Since: 0.10.11
2242  */
2243 GstFlowReturn
2244 gst_base_sink_wait_preroll (GstBaseSink * sink)
2245 {
2246   sink->have_preroll = TRUE;
2247   GST_DEBUG_OBJECT (sink, "waiting in preroll for flush or PLAYING");
2248   /* block until the state changes, or we get a flush, or something */
2249   GST_BASE_SINK_PREROLL_WAIT (sink);
2250   sink->have_preroll = FALSE;
2251   if (G_UNLIKELY (sink->flushing))
2252     goto stopping;
2253   if (G_UNLIKELY (sink->priv->step_unlock))
2254     goto step_unlocked;
2255   GST_DEBUG_OBJECT (sink, "continue after preroll");
2256
2257   return GST_FLOW_OK;
2258
2259   /* ERRORS */
2260 stopping:
2261   {
2262     GST_DEBUG_OBJECT (sink, "preroll interrupted because of flush");
2263     return GST_FLOW_WRONG_STATE;
2264   }
2265 step_unlocked:
2266   {
2267     sink->priv->step_unlock = FALSE;
2268     GST_DEBUG_OBJECT (sink, "preroll interrupted because of step");
2269     return GST_FLOW_STEP;
2270   }
2271 }
2272
2273 static inline guint8
2274 get_object_type (GstMiniObject * obj)
2275 {
2276   guint8 obj_type;
2277
2278   if (G_LIKELY (GST_IS_BUFFER (obj)))
2279     obj_type = _PR_IS_BUFFER;
2280   else if (GST_IS_EVENT (obj))
2281     obj_type = _PR_IS_EVENT;
2282   else if (GST_IS_BUFFER_LIST (obj))
2283     obj_type = _PR_IS_BUFFERLIST;
2284   else
2285     obj_type = _PR_IS_NOTHING;
2286
2287   return obj_type;
2288 }
2289
2290 /**
2291  * gst_base_sink_do_preroll:
2292  * @sink: the sink
2293  * @obj: (transfer none): the mini object that caused the preroll
2294  *
2295  * If the @sink spawns its own thread for pulling buffers from upstream it
2296  * should call this method after it has pulled a buffer. If the element needed
2297  * to preroll, this function will perform the preroll and will then block
2298  * until the element state is changed.
2299  *
2300  * This function should be called with the PREROLL_LOCK held.
2301  *
2302  * Returns: #GST_FLOW_OK if the preroll completed and processing can
2303  * continue. Any other return value should be returned from the render vmethod.
2304  *
2305  * Since: 0.10.22
2306  */
2307 GstFlowReturn
2308 gst_base_sink_do_preroll (GstBaseSink * sink, GstMiniObject * obj)
2309 {
2310   GstFlowReturn ret;
2311
2312   while (G_UNLIKELY (sink->need_preroll)) {
2313     guint8 obj_type;
2314     GST_DEBUG_OBJECT (sink, "prerolling object %p", obj);
2315
2316     obj_type = get_object_type (obj);
2317
2318     ret = gst_base_sink_preroll_object (sink, obj_type, obj);
2319     if (ret != GST_FLOW_OK)
2320       goto preroll_failed;
2321
2322     /* need to recheck here because the commit state could have
2323      * made us not need the preroll anymore */
2324     if (G_LIKELY (sink->need_preroll)) {
2325       /* block until the state changes, or we get a flush, or something */
2326       ret = gst_base_sink_wait_preroll (sink);
2327       if ((ret != GST_FLOW_OK) && (ret != GST_FLOW_STEP))
2328         goto preroll_failed;
2329     }
2330   }
2331   return GST_FLOW_OK;
2332
2333   /* ERRORS */
2334 preroll_failed:
2335   {
2336     GST_DEBUG_OBJECT (sink, "preroll failed: %s", gst_flow_get_name (ret));
2337     return ret;
2338   }
2339 }
2340
2341 /**
2342  * gst_base_sink_wait_eos:
2343  * @sink: the sink
2344  * @time: the running_time to be reached
2345  * @jitter: (out) (allow-none): the jitter to be filled with time diff, or NULL
2346  *
2347  * This function will block until @time is reached. It is usually called by
2348  * subclasses that use their own internal synchronisation but want to let the
2349  * EOS be handled by the base class.
2350  *
2351  * This function should only be called with the PREROLL_LOCK held, like when
2352  * receiving an EOS event in the ::event vmethod.
2353  *
2354  * The @time argument should be the running_time of when the EOS should happen
2355  * and will be adjusted with any latency and offset configured in the sink.
2356  *
2357  * Returns: #GstFlowReturn
2358  *
2359  * Since: 0.10.15
2360  */
2361 GstFlowReturn
2362 gst_base_sink_wait_eos (GstBaseSink * sink, GstClockTime time,
2363     GstClockTimeDiff * jitter)
2364 {
2365   GstClockReturn status;
2366   GstFlowReturn ret;
2367
2368   do {
2369     GstClockTime stime;
2370
2371     GST_DEBUG_OBJECT (sink, "checking preroll");
2372
2373     /* first wait for the playing state before we can continue */
2374     while (G_UNLIKELY (sink->need_preroll)) {
2375       ret = gst_base_sink_wait_preroll (sink);
2376       if ((ret != GST_FLOW_OK) && (ret != GST_FLOW_STEP))
2377         goto flushing;
2378     }
2379
2380     /* preroll done, we can sync since we are in PLAYING now. */
2381     GST_DEBUG_OBJECT (sink, "possibly waiting for clock to reach %"
2382         GST_TIME_FORMAT, GST_TIME_ARGS (time));
2383
2384     /* compensate for latency and ts_offset. We don't adjust for render delay
2385      * because we don't interact with the device on EOS normally. */
2386     stime = gst_base_sink_adjust_time (sink, time);
2387
2388     /* wait for the clock, this can be interrupted because we got shut down or
2389      * we PAUSED. */
2390     status = gst_base_sink_wait_clock (sink, stime, jitter);
2391
2392     GST_DEBUG_OBJECT (sink, "clock returned %d", status);
2393
2394     /* invalid time, no clock or sync disabled, just continue then */
2395     if (status == GST_CLOCK_BADTIME)
2396       break;
2397
2398     /* waiting could have been interrupted and we can be flushing now */
2399     if (G_UNLIKELY (sink->flushing))
2400       goto flushing;
2401
2402     /* retry if we got unscheduled, which means we did not reach the timeout
2403      * yet. if some other error occures, we continue. */
2404   } while (status == GST_CLOCK_UNSCHEDULED);
2405
2406   GST_DEBUG_OBJECT (sink, "end of stream");
2407
2408   return GST_FLOW_OK;
2409
2410   /* ERRORS */
2411 flushing:
2412   {
2413     GST_DEBUG_OBJECT (sink, "we are flushing");
2414     return GST_FLOW_WRONG_STATE;
2415   }
2416 }
2417
2418 /* with STREAM_LOCK, PREROLL_LOCK
2419  *
2420  * Make sure we are in PLAYING and synchronize an object to the clock.
2421  *
2422  * If we need preroll, we are not in PLAYING. We try to commit the state
2423  * if needed and then block if we still are not PLAYING.
2424  *
2425  * We start waiting on the clock in PLAYING. If we got interrupted, we
2426  * immediatly try to re-preroll.
2427  *
2428  * Some objects do not need synchronisation (most events) and so this function
2429  * immediatly returns GST_FLOW_OK.
2430  *
2431  * for objects that arrive later than max-lateness to be synchronized to the
2432  * clock have the @late boolean set to TRUE.
2433  *
2434  * This function keeps a running average of the jitter (the diff between the
2435  * clock time and the requested sync time). The jitter is negative for
2436  * objects that arrive in time and positive for late buffers.
2437  *
2438  * does not take ownership of obj.
2439  */
2440 static GstFlowReturn
2441 gst_base_sink_do_sync (GstBaseSink * basesink, GstPad * pad,
2442     GstMiniObject * obj, gboolean * late, gboolean * step_end, guint8 obj_type)
2443 {
2444   GstClockTimeDiff jitter = 0;
2445   gboolean syncable;
2446   GstClockReturn status = GST_CLOCK_OK;
2447   GstClockTime rstart, rstop, sstart, sstop, stime;
2448   gboolean do_sync;
2449   GstBaseSinkPrivate *priv;
2450   GstFlowReturn ret;
2451   GstStepInfo *current, *pending;
2452   gboolean stepped;
2453
2454   priv = basesink->priv;
2455
2456 do_step:
2457   sstart = sstop = rstart = rstop = GST_CLOCK_TIME_NONE;
2458   do_sync = TRUE;
2459   stepped = FALSE;
2460
2461   priv->current_rstart = GST_CLOCK_TIME_NONE;
2462
2463   /* get stepping info */
2464   current = &priv->current_step;
2465   pending = &priv->pending_step;
2466
2467   /* get timing information for this object against the render segment */
2468   syncable = gst_base_sink_get_sync_times (basesink, obj,
2469       &sstart, &sstop, &rstart, &rstop, &do_sync, &stepped, &basesink->segment,
2470       current, step_end, obj_type);
2471
2472   if (G_UNLIKELY (stepped))
2473     goto step_skipped;
2474
2475   /* a syncable object needs to participate in preroll and
2476    * clocking. All buffers and EOS are syncable. */
2477   if (G_UNLIKELY (!syncable))
2478     goto not_syncable;
2479
2480   /* store timing info for current object */
2481   priv->current_rstart = rstart;
2482   priv->current_rstop = (GST_CLOCK_TIME_IS_VALID (rstop) ? rstop : rstart);
2483
2484   /* save sync time for eos when the previous object needed sync */
2485   priv->eos_rtime = (do_sync ? priv->current_rstop : GST_CLOCK_TIME_NONE);
2486
2487   /* calculate inter frame spacing */
2488   if (G_UNLIKELY (priv->prev_rstart != -1 && priv->prev_rstart < rstart)) {
2489     GstClockTime in_diff;
2490
2491     in_diff = rstart - priv->prev_rstart;
2492
2493     if (priv->avg_in_diff == -1)
2494       priv->avg_in_diff = in_diff;
2495     else
2496       priv->avg_in_diff = UPDATE_RUNNING_AVG (priv->avg_in_diff, in_diff);
2497
2498     GST_LOG_OBJECT (basesink, "avg frame diff %" GST_TIME_FORMAT,
2499         GST_TIME_ARGS (priv->avg_in_diff));
2500
2501   }
2502   priv->prev_rstart = rstart;
2503
2504   if (G_UNLIKELY (priv->earliest_in_time != -1
2505           && rstart < priv->earliest_in_time))
2506     goto qos_dropped;
2507
2508 again:
2509   /* first do preroll, this makes sure we commit our state
2510    * to PAUSED and can continue to PLAYING. We cannot perform
2511    * any clock sync in PAUSED because there is no clock. */
2512   ret = gst_base_sink_do_preroll (basesink, obj);
2513   if (G_UNLIKELY (ret != GST_FLOW_OK))
2514     goto preroll_failed;
2515
2516   /* update the segment with a pending step if the current one is invalid and we
2517    * have a new pending one. We only accept new step updates after a preroll */
2518   if (G_UNLIKELY (pending->valid && !current->valid)) {
2519     start_stepping (basesink, &basesink->segment, pending, current);
2520     goto do_step;
2521   }
2522
2523   /* After rendering we store the position of the last buffer so that we can use
2524    * it to report the position. We need to take the lock here. */
2525   GST_OBJECT_LOCK (basesink);
2526   priv->current_sstart = sstart;
2527   priv->current_sstop = (GST_CLOCK_TIME_IS_VALID (sstop) ? sstop : sstart);
2528   GST_OBJECT_UNLOCK (basesink);
2529
2530   if (!do_sync)
2531     goto done;
2532
2533   /* adjust for latency */
2534   stime = gst_base_sink_adjust_time (basesink, rstart);
2535
2536   /* adjust for render-delay, avoid underflows */
2537   if (GST_CLOCK_TIME_IS_VALID (stime)) {
2538     if (stime > priv->render_delay)
2539       stime -= priv->render_delay;
2540     else
2541       stime = 0;
2542   }
2543
2544   /* preroll done, we can sync since we are in PLAYING now. */
2545   GST_DEBUG_OBJECT (basesink, "possibly waiting for clock to reach %"
2546       GST_TIME_FORMAT ", adjusted %" GST_TIME_FORMAT,
2547       GST_TIME_ARGS (rstart), GST_TIME_ARGS (stime));
2548
2549   /* This function will return immediatly if start == -1, no clock
2550    * or sync is disabled with GST_CLOCK_BADTIME. */
2551   status = gst_base_sink_wait_clock (basesink, stime, &jitter);
2552
2553   GST_DEBUG_OBJECT (basesink, "clock returned %d, jitter %c%" GST_TIME_FORMAT,
2554       status, (jitter < 0 ? '-' : ' '), GST_TIME_ARGS (ABS (jitter)));
2555
2556   /* invalid time, no clock or sync disabled, just render */
2557   if (status == GST_CLOCK_BADTIME)
2558     goto done;
2559
2560   /* waiting could have been interrupted and we can be flushing now */
2561   if (G_UNLIKELY (basesink->flushing))
2562     goto flushing;
2563
2564   /* check for unlocked by a state change, we are not flushing so
2565    * we can try to preroll on the current buffer. */
2566   if (G_UNLIKELY (status == GST_CLOCK_UNSCHEDULED)) {
2567     GST_DEBUG_OBJECT (basesink, "unscheduled, waiting some more");
2568     priv->call_preroll = TRUE;
2569     goto again;
2570   }
2571
2572   /* successful syncing done, record observation */
2573   priv->current_jitter = jitter;
2574
2575   /* check if the object should be dropped */
2576   *late = gst_base_sink_is_too_late (basesink, obj, rstart, rstop,
2577       status, jitter);
2578
2579 done:
2580   return GST_FLOW_OK;
2581
2582   /* ERRORS */
2583 step_skipped:
2584   {
2585     GST_DEBUG_OBJECT (basesink, "skipped stepped object %p", obj);
2586     *late = TRUE;
2587     return GST_FLOW_OK;
2588   }
2589 not_syncable:
2590   {
2591     GST_DEBUG_OBJECT (basesink, "non syncable object %p", obj);
2592     return GST_FLOW_OK;
2593   }
2594 qos_dropped:
2595   {
2596     GST_DEBUG_OBJECT (basesink, "dropped because of QoS %p", obj);
2597     *late = TRUE;
2598     return GST_FLOW_OK;
2599   }
2600 flushing:
2601   {
2602     GST_DEBUG_OBJECT (basesink, "we are flushing");
2603     return GST_FLOW_WRONG_STATE;
2604   }
2605 preroll_failed:
2606   {
2607     GST_DEBUG_OBJECT (basesink, "preroll failed");
2608     *step_end = FALSE;
2609     return ret;
2610   }
2611 }
2612
2613 static gboolean
2614 gst_base_sink_send_qos (GstBaseSink * basesink, GstQOSType type,
2615     gdouble proportion, GstClockTime time, GstClockTimeDiff diff)
2616 {
2617   GstEvent *event;
2618   gboolean res;
2619
2620   /* generate Quality-of-Service event */
2621   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, basesink,
2622       "qos: type %d, proportion: %lf, diff %" G_GINT64_FORMAT ", timestamp %"
2623       GST_TIME_FORMAT, type, proportion, diff, GST_TIME_ARGS (time));
2624
2625   event = gst_event_new_qos_full (type, proportion, diff, time);
2626
2627   /* send upstream */
2628   res = gst_pad_push_event (basesink->sinkpad, event);
2629
2630   return res;
2631 }
2632
2633 static void
2634 gst_base_sink_perform_qos (GstBaseSink * sink, gboolean dropped)
2635 {
2636   GstBaseSinkPrivate *priv;
2637   GstClockTime start, stop;
2638   GstClockTimeDiff jitter;
2639   GstClockTime pt, entered, left;
2640   GstClockTime duration;
2641   gdouble rate;
2642
2643   priv = sink->priv;
2644
2645   start = priv->current_rstart;
2646
2647   if (priv->current_step.valid)
2648     return;
2649
2650   /* if Quality-of-Service disabled, do nothing */
2651   if (!g_atomic_int_get (&priv->qos_enabled) ||
2652       !GST_CLOCK_TIME_IS_VALID (start))
2653     return;
2654
2655   stop = priv->current_rstop;
2656   jitter = priv->current_jitter;
2657
2658   if (jitter < 0) {
2659     /* this is the time the buffer entered the sink */
2660     if (start < -jitter)
2661       entered = 0;
2662     else
2663       entered = start + jitter;
2664     left = start;
2665   } else {
2666     /* this is the time the buffer entered the sink */
2667     entered = start + jitter;
2668     /* this is the time the buffer left the sink */
2669     left = start + jitter;
2670   }
2671
2672   /* calculate duration of the buffer */
2673   if (GST_CLOCK_TIME_IS_VALID (stop) && stop != start)
2674     duration = stop - start;
2675   else
2676     duration = priv->avg_in_diff;
2677
2678   /* if we have the time when the last buffer left us, calculate
2679    * processing time */
2680   if (GST_CLOCK_TIME_IS_VALID (priv->last_left)) {
2681     if (entered > priv->last_left) {
2682       pt = entered - priv->last_left;
2683     } else {
2684       pt = 0;
2685     }
2686   } else {
2687     pt = priv->avg_pt;
2688   }
2689
2690   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, sink, "start: %" GST_TIME_FORMAT
2691       ", stop %" GST_TIME_FORMAT ", entered %" GST_TIME_FORMAT ", left %"
2692       GST_TIME_FORMAT ", pt: %" GST_TIME_FORMAT ", duration %" GST_TIME_FORMAT
2693       ",jitter %" G_GINT64_FORMAT, GST_TIME_ARGS (start), GST_TIME_ARGS (stop),
2694       GST_TIME_ARGS (entered), GST_TIME_ARGS (left), GST_TIME_ARGS (pt),
2695       GST_TIME_ARGS (duration), jitter);
2696
2697   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, sink, "avg_duration: %" GST_TIME_FORMAT
2698       ", avg_pt: %" GST_TIME_FORMAT ", avg_rate: %g",
2699       GST_TIME_ARGS (priv->avg_duration), GST_TIME_ARGS (priv->avg_pt),
2700       priv->avg_rate);
2701
2702   /* collect running averages. for first observations, we copy the
2703    * values */
2704   if (!GST_CLOCK_TIME_IS_VALID (priv->avg_duration))
2705     priv->avg_duration = duration;
2706   else
2707     priv->avg_duration = UPDATE_RUNNING_AVG (priv->avg_duration, duration);
2708
2709   if (!GST_CLOCK_TIME_IS_VALID (priv->avg_pt))
2710     priv->avg_pt = pt;
2711   else
2712     priv->avg_pt = UPDATE_RUNNING_AVG (priv->avg_pt, pt);
2713
2714   if (priv->avg_duration != 0)
2715     rate =
2716         gst_guint64_to_gdouble (priv->avg_pt) /
2717         gst_guint64_to_gdouble (priv->avg_duration);
2718   else
2719     rate = 1.0;
2720
2721   if (GST_CLOCK_TIME_IS_VALID (priv->last_left)) {
2722     if (dropped || priv->avg_rate < 0.0) {
2723       priv->avg_rate = rate;
2724     } else {
2725       if (rate > 1.0)
2726         priv->avg_rate = UPDATE_RUNNING_AVG_N (priv->avg_rate, rate);
2727       else
2728         priv->avg_rate = UPDATE_RUNNING_AVG_P (priv->avg_rate, rate);
2729     }
2730   }
2731
2732   GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, sink,
2733       "updated: avg_duration: %" GST_TIME_FORMAT ", avg_pt: %" GST_TIME_FORMAT
2734       ", avg_rate: %g", GST_TIME_ARGS (priv->avg_duration),
2735       GST_TIME_ARGS (priv->avg_pt), priv->avg_rate);
2736
2737
2738   if (priv->avg_rate >= 0.0) {
2739     GstQOSType type;
2740     GstClockTimeDiff diff;
2741
2742     /* if we have a valid rate, start sending QoS messages */
2743     if (priv->current_jitter < 0) {
2744       /* make sure we never go below 0 when adding the jitter to the
2745        * timestamp. */
2746       if (priv->current_rstart < -priv->current_jitter)
2747         priv->current_jitter = -priv->current_rstart;
2748     }
2749
2750     if (priv->throttle_time > 0) {
2751       diff = priv->throttle_time;
2752       type = GST_QOS_TYPE_THROTTLE;
2753     } else {
2754       diff = priv->current_jitter;
2755       if (diff <= 0)
2756         type = GST_QOS_TYPE_OVERFLOW;
2757       else
2758         type = GST_QOS_TYPE_UNDERFLOW;
2759     }
2760
2761     gst_base_sink_send_qos (sink, type, priv->avg_rate, priv->current_rstart,
2762         diff);
2763   }
2764
2765   /* record when this buffer will leave us */
2766   priv->last_left = left;
2767 }
2768
2769 /* reset all qos measuring */
2770 static void
2771 gst_base_sink_reset_qos (GstBaseSink * sink)
2772 {
2773   GstBaseSinkPrivate *priv;
2774
2775   priv = sink->priv;
2776
2777   priv->last_render_time = GST_CLOCK_TIME_NONE;
2778   priv->prev_rstart = GST_CLOCK_TIME_NONE;
2779   priv->earliest_in_time = GST_CLOCK_TIME_NONE;
2780   priv->last_left = GST_CLOCK_TIME_NONE;
2781   priv->avg_duration = GST_CLOCK_TIME_NONE;
2782   priv->avg_pt = GST_CLOCK_TIME_NONE;
2783   priv->avg_rate = -1.0;
2784   priv->avg_render = GST_CLOCK_TIME_NONE;
2785   priv->avg_in_diff = GST_CLOCK_TIME_NONE;
2786   priv->rendered = 0;
2787   priv->dropped = 0;
2788
2789 }
2790
2791 /* Checks if the object was scheduled too late.
2792  *
2793  * rstart/rstop contain the running_time start and stop values
2794  * of the object.
2795  *
2796  * status and jitter contain the return values from the clock wait.
2797  *
2798  * returns TRUE if the buffer was too late.
2799  */
2800 static gboolean
2801 gst_base_sink_is_too_late (GstBaseSink * basesink, GstMiniObject * obj,
2802     GstClockTime rstart, GstClockTime rstop,
2803     GstClockReturn status, GstClockTimeDiff jitter)
2804 {
2805   gboolean late;
2806   gint64 max_lateness;
2807   GstBaseSinkPrivate *priv;
2808
2809   priv = basesink->priv;
2810
2811   late = FALSE;
2812
2813   /* only for objects that were too late */
2814   if (G_LIKELY (status != GST_CLOCK_EARLY))
2815     goto in_time;
2816
2817   max_lateness = basesink->max_lateness;
2818
2819   /* check if frame dropping is enabled */
2820   if (max_lateness == -1)
2821     goto no_drop;
2822
2823   /* only check for buffers */
2824   if (G_UNLIKELY (!GST_IS_BUFFER (obj)))
2825     goto not_buffer;
2826
2827   /* can't do check if we don't have a timestamp */
2828   if (G_UNLIKELY (!GST_CLOCK_TIME_IS_VALID (rstart)))
2829     goto no_timestamp;
2830
2831   /* we can add a valid stop time */
2832   if (GST_CLOCK_TIME_IS_VALID (rstop))
2833     max_lateness += rstop;
2834   else {
2835     max_lateness += rstart;
2836     /* no stop time, use avg frame diff */
2837     if (priv->avg_in_diff != -1)
2838       max_lateness += priv->avg_in_diff;
2839   }
2840
2841   /* if the jitter bigger than duration and lateness we are too late */
2842   if ((late = rstart + jitter > max_lateness)) {
2843     GST_CAT_DEBUG_OBJECT (GST_CAT_PERFORMANCE, basesink,
2844         "buffer is too late %" GST_TIME_FORMAT
2845         " > %" GST_TIME_FORMAT, GST_TIME_ARGS (rstart + jitter),
2846         GST_TIME_ARGS (max_lateness));
2847     /* !!emergency!!, if we did not receive anything valid for more than a
2848      * second, render it anyway so the user sees something */
2849     if (GST_CLOCK_TIME_IS_VALID (priv->last_render_time) &&
2850         rstart - priv->last_render_time > GST_SECOND) {
2851       late = FALSE;
2852       GST_ELEMENT_WARNING (basesink, CORE, CLOCK,
2853           (_("A lot of buffers are being dropped.")),
2854           ("There may be a timestamping problem, or this computer is too slow."));
2855       GST_CAT_DEBUG_OBJECT (GST_CAT_PERFORMANCE, basesink,
2856           "**emergency** last buffer at %" GST_TIME_FORMAT " > GST_SECOND",
2857           GST_TIME_ARGS (priv->last_render_time));
2858     }
2859   }
2860
2861 done:
2862   if (!late || !GST_CLOCK_TIME_IS_VALID (priv->last_render_time)) {
2863     priv->last_render_time = rstart;
2864     /* the next allowed input timestamp */
2865     if (priv->throttle_time > 0)
2866       priv->earliest_in_time = rstart + priv->throttle_time;
2867   }
2868   return late;
2869
2870   /* all is fine */
2871 in_time:
2872   {
2873     GST_DEBUG_OBJECT (basesink, "object was scheduled in time");
2874     goto done;
2875   }
2876 no_drop:
2877   {
2878     GST_DEBUG_OBJECT (basesink, "frame dropping disabled");
2879     goto done;
2880   }
2881 not_buffer:
2882   {
2883     GST_DEBUG_OBJECT (basesink, "object is not a buffer");
2884     return FALSE;
2885   }
2886 no_timestamp:
2887   {
2888     GST_DEBUG_OBJECT (basesink, "buffer has no timestamp");
2889     return FALSE;
2890   }
2891 }
2892
2893 /* called before and after calling the render vmethod. It keeps track of how
2894  * much time was spent in the render method and is used to check if we are
2895  * flooded */
2896 static void
2897 gst_base_sink_do_render_stats (GstBaseSink * basesink, gboolean start)
2898 {
2899   GstBaseSinkPrivate *priv;
2900
2901   priv = basesink->priv;
2902
2903   if (start) {
2904     priv->start = gst_util_get_timestamp ();
2905   } else {
2906     GstClockTime elapsed;
2907
2908     priv->stop = gst_util_get_timestamp ();
2909
2910     elapsed = GST_CLOCK_DIFF (priv->start, priv->stop);
2911
2912     if (!GST_CLOCK_TIME_IS_VALID (priv->avg_render))
2913       priv->avg_render = elapsed;
2914     else
2915       priv->avg_render = UPDATE_RUNNING_AVG (priv->avg_render, elapsed);
2916
2917     GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, basesink,
2918         "avg_render: %" GST_TIME_FORMAT, GST_TIME_ARGS (priv->avg_render));
2919   }
2920 }
2921
2922 /* with STREAM_LOCK, PREROLL_LOCK,
2923  *
2924  * Synchronize the object on the clock and then render it.
2925  *
2926  * takes ownership of obj.
2927  */
2928 static GstFlowReturn
2929 gst_base_sink_render_object (GstBaseSink * basesink, GstPad * pad,
2930     guint8 obj_type, gpointer obj)
2931 {
2932   GstFlowReturn ret;
2933   GstBaseSinkClass *bclass;
2934   gboolean late, step_end;
2935   gpointer sync_obj;
2936   GstBaseSinkPrivate *priv;
2937
2938   priv = basesink->priv;
2939
2940   if (OBJ_IS_BUFFERLIST (obj_type)) {
2941     /*
2942      * If buffer list, use the first group buffer within the list
2943      * for syncing
2944      */
2945     sync_obj = gst_buffer_list_get (GST_BUFFER_LIST_CAST (obj), 0);
2946     g_assert (NULL != sync_obj);
2947   } else {
2948     sync_obj = obj;
2949   }
2950
2951 again:
2952   late = FALSE;
2953   step_end = FALSE;
2954
2955   /* synchronize this object, non syncable objects return OK
2956    * immediatly. */
2957   ret =
2958       gst_base_sink_do_sync (basesink, pad, sync_obj, &late, &step_end,
2959       obj_type);
2960   if (G_UNLIKELY (ret != GST_FLOW_OK))
2961     goto sync_failed;
2962
2963   /* and now render, event or buffer/buffer list. */
2964   if (G_LIKELY (OBJ_IS_BUFFERFULL (obj_type))) {
2965     /* drop late buffers unconditionally, let's hope it's unlikely */
2966     if (G_UNLIKELY (late))
2967       goto dropped;
2968
2969     bclass = GST_BASE_SINK_GET_CLASS (basesink);
2970
2971     if (G_LIKELY ((OBJ_IS_BUFFERLIST (obj_type) && bclass->render_list) ||
2972             (!OBJ_IS_BUFFERLIST (obj_type) && bclass->render))) {
2973       gint do_qos;
2974
2975       /* read once, to get same value before and after */
2976       do_qos = g_atomic_int_get (&priv->qos_enabled);
2977
2978       GST_DEBUG_OBJECT (basesink, "rendering object %p", obj);
2979
2980       /* record rendering time for QoS and stats */
2981       if (do_qos)
2982         gst_base_sink_do_render_stats (basesink, TRUE);
2983
2984       if (!OBJ_IS_BUFFERLIST (obj_type)) {
2985         GstBuffer *buf;
2986
2987         /* For buffer lists do not set last buffer. Creating buffer
2988          * with meaningful data can be done only with memcpy which will
2989          * significantly affect performance */
2990         buf = GST_BUFFER_CAST (obj);
2991         gst_base_sink_set_last_buffer (basesink, buf);
2992
2993         ret = bclass->render (basesink, buf);
2994       } else {
2995         GstBufferList *buflist;
2996
2997         buflist = GST_BUFFER_LIST_CAST (obj);
2998
2999         ret = bclass->render_list (basesink, buflist);
3000       }
3001
3002       if (do_qos)
3003         gst_base_sink_do_render_stats (basesink, FALSE);
3004
3005       if (ret == GST_FLOW_STEP)
3006         goto again;
3007
3008       if (G_UNLIKELY (basesink->flushing))
3009         goto flushing;
3010
3011       priv->rendered++;
3012     }
3013   } else if (G_LIKELY (OBJ_IS_EVENT (obj_type))) {
3014     GstEvent *event = GST_EVENT_CAST (obj);
3015     gboolean event_res = TRUE;
3016     GstEventType type;
3017
3018     bclass = GST_BASE_SINK_GET_CLASS (basesink);
3019
3020     type = GST_EVENT_TYPE (event);
3021
3022     GST_DEBUG_OBJECT (basesink, "rendering event %p, type %s", obj,
3023         gst_event_type_get_name (type));
3024
3025     if (bclass->event)
3026       event_res = bclass->event (basesink, event);
3027
3028     /* when we get here we could be flushing again when the event handler calls
3029      * _wait_eos(). We have to ignore this object in that case. */
3030     if (G_UNLIKELY (basesink->flushing))
3031       goto flushing;
3032
3033     if (G_LIKELY (event_res)) {
3034       guint32 seqnum;
3035
3036       seqnum = basesink->priv->seqnum = gst_event_get_seqnum (event);
3037       GST_DEBUG_OBJECT (basesink, "Got seqnum #%" G_GUINT32_FORMAT, seqnum);
3038
3039       switch (type) {
3040         case GST_EVENT_EOS:
3041         {
3042           GstMessage *message;
3043
3044           /* the EOS event is completely handled so we mark
3045            * ourselves as being in the EOS state. eos is also
3046            * protected by the object lock so we can read it when
3047            * answering the POSITION query. */
3048           GST_OBJECT_LOCK (basesink);
3049           basesink->eos = TRUE;
3050           GST_OBJECT_UNLOCK (basesink);
3051
3052           /* ok, now we can post the message */
3053           GST_DEBUG_OBJECT (basesink, "Now posting EOS");
3054
3055           message = gst_message_new_eos (GST_OBJECT_CAST (basesink));
3056           gst_message_set_seqnum (message, seqnum);
3057           gst_element_post_message (GST_ELEMENT_CAST (basesink), message);
3058           break;
3059         }
3060         case GST_EVENT_NEWSEGMENT:
3061           /* configure the segment */
3062           gst_base_sink_configure_segment (basesink, pad, event,
3063               &basesink->segment);
3064           break;
3065         case GST_EVENT_SINK_MESSAGE:{
3066           GstMessage *msg = NULL;
3067
3068           gst_event_parse_sink_message (event, &msg);
3069
3070           if (msg)
3071             gst_element_post_message (GST_ELEMENT_CAST (basesink), msg);
3072         }
3073         default:
3074           break;
3075       }
3076     }
3077   } else {
3078     g_return_val_if_reached (GST_FLOW_ERROR);
3079   }
3080
3081 done:
3082   if (step_end) {
3083     /* the step ended, check if we need to activate a new step */
3084     GST_DEBUG_OBJECT (basesink, "step ended");
3085     stop_stepping (basesink, &basesink->segment, &priv->current_step,
3086         priv->current_rstart, priv->current_rstop, basesink->eos);
3087     goto again;
3088   }
3089
3090   gst_base_sink_perform_qos (basesink, late);
3091
3092   GST_DEBUG_OBJECT (basesink, "object unref after render %p", obj);
3093   gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3094   return ret;
3095
3096   /* ERRORS */
3097 sync_failed:
3098   {
3099     GST_DEBUG_OBJECT (basesink, "do_sync returned %s", gst_flow_get_name (ret));
3100     goto done;
3101   }
3102 dropped:
3103   {
3104     priv->dropped++;
3105     GST_DEBUG_OBJECT (basesink, "buffer late, dropping");
3106
3107     if (g_atomic_int_get (&priv->qos_enabled)) {
3108       GstMessage *qos_msg;
3109       GstClockTime timestamp, duration;
3110
3111       timestamp = GST_BUFFER_TIMESTAMP (GST_BUFFER_CAST (sync_obj));
3112       duration = GST_BUFFER_DURATION (GST_BUFFER_CAST (sync_obj));
3113
3114       GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, basesink,
3115           "qos: dropped buffer rt %" GST_TIME_FORMAT ", st %" GST_TIME_FORMAT
3116           ", ts %" GST_TIME_FORMAT ", dur %" GST_TIME_FORMAT,
3117           GST_TIME_ARGS (priv->current_rstart),
3118           GST_TIME_ARGS (priv->current_sstart), GST_TIME_ARGS (timestamp),
3119           GST_TIME_ARGS (duration));
3120       GST_CAT_DEBUG_OBJECT (GST_CAT_QOS, basesink,
3121           "qos: rendered %" G_GUINT64_FORMAT ", dropped %" G_GUINT64_FORMAT,
3122           priv->rendered, priv->dropped);
3123
3124       qos_msg =
3125           gst_message_new_qos (GST_OBJECT_CAST (basesink), basesink->sync,
3126           priv->current_rstart, priv->current_sstart, timestamp, duration);
3127       gst_message_set_qos_values (qos_msg, priv->current_jitter, priv->avg_rate,
3128           1000000);
3129       gst_message_set_qos_stats (qos_msg, GST_FORMAT_BUFFERS, priv->rendered,
3130           priv->dropped);
3131       gst_element_post_message (GST_ELEMENT_CAST (basesink), qos_msg);
3132     }
3133     goto done;
3134   }
3135 flushing:
3136   {
3137     GST_DEBUG_OBJECT (basesink, "we are flushing, ignore object");
3138     gst_mini_object_unref (obj);
3139     return GST_FLOW_WRONG_STATE;
3140   }
3141 }
3142
3143 /* with STREAM_LOCK, PREROLL_LOCK
3144  *
3145  * Perform preroll on the given object. For buffers this means
3146  * calling the preroll subclass method.
3147  * If that succeeds, the state will be commited.
3148  *
3149  * function does not take ownership of obj.
3150  */
3151 static GstFlowReturn
3152 gst_base_sink_preroll_object (GstBaseSink * basesink, guint8 obj_type,
3153     GstMiniObject * obj)
3154 {
3155   GstFlowReturn ret;
3156
3157   GST_DEBUG_OBJECT (basesink, "prerolling object %p", obj);
3158
3159   /* if it's a buffer, we need to call the preroll method */
3160   if (G_LIKELY (OBJ_IS_BUFFERFULL (obj_type) && basesink->priv->call_preroll)) {
3161     GstBaseSinkClass *bclass;
3162     GstBuffer *buf;
3163     GstClockTime timestamp;
3164
3165     if (OBJ_IS_BUFFERLIST (obj_type)) {
3166       buf = gst_buffer_list_get (GST_BUFFER_LIST_CAST (obj), 0);
3167       g_assert (NULL != buf);
3168     } else {
3169       buf = GST_BUFFER_CAST (obj);
3170     }
3171
3172     timestamp = GST_BUFFER_TIMESTAMP (buf);
3173
3174     GST_DEBUG_OBJECT (basesink, "preroll buffer %" GST_TIME_FORMAT,
3175         GST_TIME_ARGS (timestamp));
3176
3177     /*
3178      * For buffer lists do not set last buffer. Creating buffer
3179      * with meaningful data can be done only with memcpy which will
3180      * significantly affect performance
3181      */
3182     if (!OBJ_IS_BUFFERLIST (obj_type)) {
3183       gst_base_sink_set_last_buffer (basesink, buf);
3184     }
3185
3186     bclass = GST_BASE_SINK_GET_CLASS (basesink);
3187     if (bclass->preroll)
3188       if ((ret = bclass->preroll (basesink, buf)) != GST_FLOW_OK)
3189         goto preroll_failed;
3190
3191     basesink->priv->call_preroll = FALSE;
3192   }
3193
3194   /* commit state */
3195   if (G_LIKELY (basesink->playing_async)) {
3196     if (G_UNLIKELY (!gst_base_sink_commit_state (basesink)))
3197       goto stopping;
3198   }
3199
3200   return GST_FLOW_OK;
3201
3202   /* ERRORS */
3203 preroll_failed:
3204   {
3205     GST_DEBUG_OBJECT (basesink, "preroll failed, abort state");
3206     gst_element_abort_state (GST_ELEMENT_CAST (basesink));
3207     return ret;
3208   }
3209 stopping:
3210   {
3211     GST_DEBUG_OBJECT (basesink, "stopping while commiting state");
3212     return GST_FLOW_WRONG_STATE;
3213   }
3214 }
3215
3216 /* with STREAM_LOCK, PREROLL_LOCK
3217  *
3218  * Queue an object for rendering.
3219  * The first prerollable object queued will complete the preroll. If the
3220  * preroll queue if filled, we render all the objects in the queue.
3221  *
3222  * This function takes ownership of the object.
3223  */
3224 static GstFlowReturn
3225 gst_base_sink_queue_object_unlocked (GstBaseSink * basesink, GstPad * pad,
3226     guint8 obj_type, gpointer obj, gboolean prerollable)
3227 {
3228   GstFlowReturn ret = GST_FLOW_OK;
3229   gint length;
3230   GQueue *q;
3231
3232   if (G_UNLIKELY (basesink->need_preroll)) {
3233     if (G_LIKELY (prerollable))
3234       basesink->preroll_queued++;
3235
3236     length = basesink->preroll_queued;
3237
3238     GST_DEBUG_OBJECT (basesink, "now %d prerolled items", length);
3239
3240     /* first prerollable item needs to finish the preroll */
3241     if (length == 1) {
3242       ret = gst_base_sink_preroll_object (basesink, obj_type, obj);
3243       if (G_UNLIKELY (ret != GST_FLOW_OK))
3244         goto preroll_failed;
3245     }
3246     /* need to recheck if we need preroll, commmit state during preroll
3247      * could have made us not need more preroll. */
3248     if (G_UNLIKELY (basesink->need_preroll)) {
3249       /* see if we can render now, if we can't add the object to the preroll
3250        * queue. */
3251       if (G_UNLIKELY (length <= basesink->preroll_queue_max_len))
3252         goto more_preroll;
3253     }
3254   }
3255   /* we can start rendering (or blocking) the queued object
3256    * if any. */
3257   q = basesink->preroll_queue;
3258   while (G_UNLIKELY (!g_queue_is_empty (q))) {
3259     GstMiniObject *o;
3260     guint8 ot;
3261
3262     o = g_queue_pop_head (q);
3263     GST_DEBUG_OBJECT (basesink, "rendering queued object %p", o);
3264
3265     ot = get_object_type (o);
3266
3267     /* do something with the return value */
3268     ret = gst_base_sink_render_object (basesink, pad, ot, o);
3269     if (ret != GST_FLOW_OK)
3270       goto dequeue_failed;
3271   }
3272
3273   /* now render the object */
3274   ret = gst_base_sink_render_object (basesink, pad, obj_type, obj);
3275   basesink->preroll_queued = 0;
3276
3277   return ret;
3278
3279   /* special cases */
3280 preroll_failed:
3281   {
3282     GST_DEBUG_OBJECT (basesink, "preroll failed, reason %s",
3283         gst_flow_get_name (ret));
3284     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3285     return ret;
3286   }
3287 more_preroll:
3288   {
3289     /* add object to the queue and return */
3290     GST_DEBUG_OBJECT (basesink, "need more preroll data %d <= %d",
3291         length, basesink->preroll_queue_max_len);
3292     g_queue_push_tail (basesink->preroll_queue, obj);
3293     return GST_FLOW_OK;
3294   }
3295 dequeue_failed:
3296   {
3297     GST_DEBUG_OBJECT (basesink, "rendering queued objects failed, reason %s",
3298         gst_flow_get_name (ret));
3299     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3300     return ret;
3301   }
3302 }
3303
3304 /* with STREAM_LOCK
3305  *
3306  * This function grabs the PREROLL_LOCK and adds the object to
3307  * the queue.
3308  *
3309  * This function takes ownership of obj.
3310  *
3311  * Note: Only GstEvent seem to be passed to this private method
3312  */
3313 static GstFlowReturn
3314 gst_base_sink_queue_object (GstBaseSink * basesink, GstPad * pad,
3315     GstMiniObject * obj, gboolean prerollable)
3316 {
3317   GstFlowReturn ret;
3318
3319   GST_BASE_SINK_PREROLL_LOCK (basesink);
3320   if (G_UNLIKELY (basesink->flushing))
3321     goto flushing;
3322
3323   if (G_UNLIKELY (basesink->priv->received_eos))
3324     goto was_eos;
3325
3326   ret =
3327       gst_base_sink_queue_object_unlocked (basesink, pad, _PR_IS_EVENT, obj,
3328       prerollable);
3329   GST_BASE_SINK_PREROLL_UNLOCK (basesink);
3330
3331   return ret;
3332
3333   /* ERRORS */
3334 flushing:
3335   {
3336     GST_DEBUG_OBJECT (basesink, "sink is flushing");
3337     GST_BASE_SINK_PREROLL_UNLOCK (basesink);
3338     gst_mini_object_unref (obj);
3339     return GST_FLOW_WRONG_STATE;
3340   }
3341 was_eos:
3342   {
3343     GST_DEBUG_OBJECT (basesink,
3344         "we are EOS, dropping object, return UNEXPECTED");
3345     GST_BASE_SINK_PREROLL_UNLOCK (basesink);
3346     gst_mini_object_unref (obj);
3347     return GST_FLOW_UNEXPECTED;
3348   }
3349 }
3350
3351 static void
3352 gst_base_sink_flush_start (GstBaseSink * basesink, GstPad * pad)
3353 {
3354   /* make sure we are not blocked on the clock also clear any pending
3355    * eos state. */
3356   gst_base_sink_set_flushing (basesink, pad, TRUE);
3357
3358   /* we grab the stream lock but that is not needed since setting the
3359    * sink to flushing would make sure no state commit is being done
3360    * anymore */
3361   GST_PAD_STREAM_LOCK (pad);
3362   gst_base_sink_reset_qos (basesink);
3363   /* and we need to commit our state again on the next
3364    * prerolled buffer */
3365   basesink->playing_async = TRUE;
3366   if (basesink->priv->async_enabled) {
3367     gst_element_lost_state (GST_ELEMENT_CAST (basesink));
3368   } else {
3369     basesink->priv->have_latency = TRUE;
3370   }
3371   gst_base_sink_set_last_buffer (basesink, NULL);
3372   GST_PAD_STREAM_UNLOCK (pad);
3373 }
3374
3375 static void
3376 gst_base_sink_flush_stop (GstBaseSink * basesink, GstPad * pad)
3377 {
3378   /* unset flushing so we can accept new data, this also flushes out any EOS
3379    * event. */
3380   gst_base_sink_set_flushing (basesink, pad, FALSE);
3381
3382   /* for position reporting */
3383   GST_OBJECT_LOCK (basesink);
3384   basesink->priv->current_sstart = GST_CLOCK_TIME_NONE;
3385   basesink->priv->current_sstop = GST_CLOCK_TIME_NONE;
3386   basesink->priv->eos_rtime = GST_CLOCK_TIME_NONE;
3387   basesink->priv->call_preroll = TRUE;
3388   basesink->priv->current_step.valid = FALSE;
3389   basesink->priv->pending_step.valid = FALSE;
3390   if (basesink->pad_mode == GST_ACTIVATE_PUSH) {
3391     /* we need new segment info after the flush. */
3392     basesink->have_newsegment = FALSE;
3393     gst_segment_init (&basesink->segment, GST_FORMAT_UNDEFINED);
3394     gst_segment_init (basesink->clip_segment, GST_FORMAT_UNDEFINED);
3395   }
3396   GST_OBJECT_UNLOCK (basesink);
3397 }
3398
3399 static gboolean
3400 gst_base_sink_event (GstPad * pad, GstEvent * event)
3401 {
3402   GstBaseSink *basesink;
3403   gboolean result = TRUE;
3404   GstBaseSinkClass *bclass;
3405
3406   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
3407   if (G_UNLIKELY (basesink == NULL)) {
3408     gst_event_unref (event);
3409     return FALSE;
3410   }
3411
3412   bclass = GST_BASE_SINK_GET_CLASS (basesink);
3413
3414   GST_DEBUG_OBJECT (basesink, "received event %p %" GST_PTR_FORMAT, event,
3415       event);
3416
3417   switch (GST_EVENT_TYPE (event)) {
3418     case GST_EVENT_EOS:
3419     {
3420       GstFlowReturn ret;
3421
3422       GST_BASE_SINK_PREROLL_LOCK (basesink);
3423       if (G_UNLIKELY (basesink->flushing))
3424         goto flushing;
3425
3426       if (G_UNLIKELY (basesink->priv->received_eos)) {
3427         /* we can't accept anything when we are EOS */
3428         result = FALSE;
3429         gst_event_unref (event);
3430       } else {
3431         /* we set the received EOS flag here so that we can use it when testing if
3432          * we are prerolled and to refuse more buffers. */
3433         basesink->priv->received_eos = TRUE;
3434
3435         /* EOS is a prerollable object, we call the unlocked version because it
3436          * does not check the received_eos flag. */
3437         ret = gst_base_sink_queue_object_unlocked (basesink, pad,
3438             _PR_IS_EVENT, GST_MINI_OBJECT_CAST (event), TRUE);
3439         if (G_UNLIKELY (ret != GST_FLOW_OK))
3440           result = FALSE;
3441       }
3442       GST_BASE_SINK_PREROLL_UNLOCK (basesink);
3443       break;
3444     }
3445     case GST_EVENT_NEWSEGMENT:
3446     {
3447       GstFlowReturn ret;
3448       gboolean update;
3449
3450       GST_DEBUG_OBJECT (basesink, "newsegment %p", event);
3451
3452       GST_BASE_SINK_PREROLL_LOCK (basesink);
3453       if (G_UNLIKELY (basesink->flushing))
3454         goto flushing;
3455
3456       gst_event_parse_new_segment_full (event, &update, NULL, NULL, NULL, NULL,
3457           NULL, NULL);
3458
3459       if (G_UNLIKELY (basesink->priv->received_eos && !update)) {
3460         /* we can't accept anything when we are EOS */
3461         result = FALSE;
3462         gst_event_unref (event);
3463       } else {
3464         /* the new segment is a non prerollable item and does not block anything,
3465          * we need to configure the current clipping segment and insert the event
3466          * in the queue to serialize it with the buffers for rendering. */
3467         gst_base_sink_configure_segment (basesink, pad, event,
3468             basesink->clip_segment);
3469
3470         ret =
3471             gst_base_sink_queue_object_unlocked (basesink, pad,
3472             _PR_IS_EVENT, GST_MINI_OBJECT_CAST (event), FALSE);
3473         if (G_UNLIKELY (ret != GST_FLOW_OK))
3474           result = FALSE;
3475         else {
3476           GST_OBJECT_LOCK (basesink);
3477           basesink->have_newsegment = TRUE;
3478           GST_OBJECT_UNLOCK (basesink);
3479         }
3480       }
3481       GST_BASE_SINK_PREROLL_UNLOCK (basesink);
3482       break;
3483     }
3484     case GST_EVENT_FLUSH_START:
3485       if (bclass->event)
3486         bclass->event (basesink, event);
3487
3488       GST_DEBUG_OBJECT (basesink, "flush-start %p", event);
3489
3490       gst_base_sink_flush_start (basesink, pad);
3491
3492       gst_event_unref (event);
3493       break;
3494     case GST_EVENT_FLUSH_STOP:
3495       if (bclass->event)
3496         bclass->event (basesink, event);
3497
3498       GST_DEBUG_OBJECT (basesink, "flush-stop %p", event);
3499
3500       gst_base_sink_flush_stop (basesink, pad);
3501
3502       gst_event_unref (event);
3503       break;
3504     default:
3505       /* other events are sent to queue or subclass depending on if they
3506        * are serialized. */
3507       if (GST_EVENT_IS_SERIALIZED (event)) {
3508         gst_base_sink_queue_object (basesink, pad,
3509             GST_MINI_OBJECT_CAST (event), FALSE);
3510       } else {
3511         if (bclass->event)
3512           bclass->event (basesink, event);
3513         gst_event_unref (event);
3514       }
3515       break;
3516   }
3517 done:
3518   gst_object_unref (basesink);
3519
3520   return result;
3521
3522   /* ERRORS */
3523 flushing:
3524   {
3525     GST_DEBUG_OBJECT (basesink, "we are flushing");
3526     GST_BASE_SINK_PREROLL_UNLOCK (basesink);
3527     result = FALSE;
3528     gst_event_unref (event);
3529     goto done;
3530   }
3531 }
3532
3533 /* default implementation to calculate the start and end
3534  * timestamps on a buffer, subclasses can override
3535  */
3536 static void
3537 gst_base_sink_get_times (GstBaseSink * basesink, GstBuffer * buffer,
3538     GstClockTime * start, GstClockTime * end)
3539 {
3540   GstClockTime timestamp, duration;
3541
3542   timestamp = GST_BUFFER_TIMESTAMP (buffer);
3543   if (GST_CLOCK_TIME_IS_VALID (timestamp)) {
3544
3545     /* get duration to calculate end time */
3546     duration = GST_BUFFER_DURATION (buffer);
3547     if (GST_CLOCK_TIME_IS_VALID (duration)) {
3548       *end = timestamp + duration;
3549     }
3550     *start = timestamp;
3551   }
3552 }
3553
3554 /* must be called with PREROLL_LOCK */
3555 static gboolean
3556 gst_base_sink_needs_preroll (GstBaseSink * basesink)
3557 {
3558   gboolean is_prerolled, res;
3559
3560   /* we have 2 cases where the PREROLL_LOCK is released:
3561    *  1) we are blocking in the PREROLL_LOCK and thus are prerolled.
3562    *  2) we are syncing on the clock
3563    */
3564   is_prerolled = basesink->have_preroll || basesink->priv->received_eos;
3565   res = !is_prerolled;
3566
3567   GST_DEBUG_OBJECT (basesink, "have_preroll: %d, EOS: %d => needs preroll: %d",
3568       basesink->have_preroll, basesink->priv->received_eos, res);
3569
3570   return res;
3571 }
3572
3573 /* with STREAM_LOCK, PREROLL_LOCK
3574  *
3575  * Takes a buffer and compare the timestamps with the last segment.
3576  * If the buffer falls outside of the segment boundaries, drop it.
3577  * Else queue the buffer for preroll and rendering.
3578  *
3579  * This function takes ownership of the buffer.
3580  */
3581 static GstFlowReturn
3582 gst_base_sink_chain_unlocked (GstBaseSink * basesink, GstPad * pad,
3583     guint8 obj_type, gpointer obj)
3584 {
3585   GstBaseSinkClass *bclass;
3586   GstFlowReturn result;
3587   GstClockTime start = GST_CLOCK_TIME_NONE, end = GST_CLOCK_TIME_NONE;
3588   GstSegment *clip_segment;
3589   GstBuffer *time_buf;
3590
3591   if (G_UNLIKELY (basesink->flushing))
3592     goto flushing;
3593
3594   if (G_UNLIKELY (basesink->priv->received_eos))
3595     goto was_eos;
3596
3597   if (OBJ_IS_BUFFERLIST (obj_type)) {
3598     time_buf = gst_buffer_list_get (GST_BUFFER_LIST_CAST (obj), 0);
3599     g_assert (NULL != time_buf);
3600   } else {
3601     time_buf = GST_BUFFER_CAST (obj);
3602   }
3603
3604   /* for code clarity */
3605   clip_segment = basesink->clip_segment;
3606
3607   if (G_UNLIKELY (!basesink->have_newsegment)) {
3608     gboolean sync;
3609
3610     sync = gst_base_sink_get_sync (basesink);
3611     if (sync) {
3612       GST_ELEMENT_WARNING (basesink, STREAM, FAILED,
3613           (_("Internal data flow problem.")),
3614           ("Received buffer without a new-segment. Assuming timestamps start from 0."));
3615     }
3616
3617     /* this means this sink will assume timestamps start from 0 */
3618     GST_OBJECT_LOCK (basesink);
3619     clip_segment->start = 0;
3620     clip_segment->stop = -1;
3621     basesink->segment.start = 0;
3622     basesink->segment.stop = -1;
3623     basesink->have_newsegment = TRUE;
3624     GST_OBJECT_UNLOCK (basesink);
3625   }
3626
3627   bclass = GST_BASE_SINK_GET_CLASS (basesink);
3628
3629   /* check if the buffer needs to be dropped, we first ask the subclass for the
3630    * start and end */
3631   if (bclass->get_times)
3632     bclass->get_times (basesink, time_buf, &start, &end);
3633
3634   if (!GST_CLOCK_TIME_IS_VALID (start)) {
3635     /* if the subclass does not want sync, we use our own values so that we at
3636      * least clip the buffer to the segment */
3637     gst_base_sink_get_times (basesink, time_buf, &start, &end);
3638   }
3639
3640   GST_DEBUG_OBJECT (basesink, "got times start: %" GST_TIME_FORMAT
3641       ", end: %" GST_TIME_FORMAT, GST_TIME_ARGS (start), GST_TIME_ARGS (end));
3642
3643   /* a dropped buffer does not participate in anything */
3644   if (GST_CLOCK_TIME_IS_VALID (start) &&
3645       (clip_segment->format == GST_FORMAT_TIME)) {
3646     if (G_UNLIKELY (!gst_segment_clip (clip_segment,
3647                 GST_FORMAT_TIME, (gint64) start, (gint64) end, NULL, NULL)))
3648       goto out_of_segment;
3649   }
3650
3651   /* now we can process the buffer in the queue, this function takes ownership
3652    * of the buffer */
3653   result = gst_base_sink_queue_object_unlocked (basesink, pad,
3654       obj_type, obj, TRUE);
3655   return result;
3656
3657   /* ERRORS */
3658 flushing:
3659   {
3660     GST_DEBUG_OBJECT (basesink, "sink is flushing");
3661     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3662     return GST_FLOW_WRONG_STATE;
3663   }
3664 was_eos:
3665   {
3666     GST_DEBUG_OBJECT (basesink,
3667         "we are EOS, dropping object, return UNEXPECTED");
3668     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3669     return GST_FLOW_UNEXPECTED;
3670   }
3671 out_of_segment:
3672   {
3673     GST_DEBUG_OBJECT (basesink, "dropping buffer, out of clipping segment");
3674     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3675     return GST_FLOW_OK;
3676   }
3677 }
3678
3679 /* with STREAM_LOCK
3680  */
3681 static GstFlowReturn
3682 gst_base_sink_chain_main (GstBaseSink * basesink, GstPad * pad,
3683     guint8 obj_type, gpointer obj)
3684 {
3685   GstFlowReturn result;
3686
3687   if (G_UNLIKELY (basesink->pad_mode != GST_ACTIVATE_PUSH))
3688     goto wrong_mode;
3689
3690   GST_BASE_SINK_PREROLL_LOCK (basesink);
3691   result = gst_base_sink_chain_unlocked (basesink, pad, obj_type, obj);
3692   GST_BASE_SINK_PREROLL_UNLOCK (basesink);
3693
3694 done:
3695   return result;
3696
3697   /* ERRORS */
3698 wrong_mode:
3699   {
3700     GST_OBJECT_LOCK (pad);
3701     GST_WARNING_OBJECT (basesink,
3702         "Push on pad %s:%s, but it was not activated in push mode",
3703         GST_DEBUG_PAD_NAME (pad));
3704     GST_OBJECT_UNLOCK (pad);
3705     gst_mini_object_unref (GST_MINI_OBJECT_CAST (obj));
3706     /* we don't post an error message this will signal to the peer
3707      * pushing that EOS is reached. */
3708     result = GST_FLOW_UNEXPECTED;
3709     goto done;
3710   }
3711 }
3712
3713 static GstFlowReturn
3714 gst_base_sink_chain (GstPad * pad, GstBuffer * buf)
3715 {
3716   GstBaseSink *basesink;
3717
3718   basesink = GST_BASE_SINK (GST_OBJECT_PARENT (pad));
3719
3720   return gst_base_sink_chain_main (basesink, pad, _PR_IS_BUFFER, buf);
3721 }
3722
3723 static GstFlowReturn
3724 gst_base_sink_chain_list (GstPad * pad, GstBufferList * list)
3725 {
3726   GstBaseSink *basesink;
3727   GstBaseSinkClass *bclass;
3728   GstFlowReturn result;
3729
3730   basesink = GST_BASE_SINK (GST_OBJECT_PARENT (pad));
3731   bclass = GST_BASE_SINK_GET_CLASS (basesink);
3732
3733   if (G_LIKELY (bclass->render_list)) {
3734     result = gst_base_sink_chain_main (basesink, pad, _PR_IS_BUFFERLIST, list);
3735   } else {
3736     guint i, len;
3737     GstBuffer *buffer;
3738
3739     GST_INFO_OBJECT (pad, "chaining each group in list as a merged buffer");
3740
3741     len = gst_buffer_list_len (list);
3742
3743     result = GST_FLOW_OK;
3744     for (i = 0; i < len; i++) {
3745       buffer = gst_buffer_list_get (list, 0);
3746       result = gst_base_sink_chain_main (basesink, pad, _PR_IS_BUFFER,
3747           gst_buffer_ref (buffer));
3748       if (result != GST_FLOW_OK)
3749         break;
3750     }
3751     gst_buffer_list_unref (list);
3752   }
3753   return result;
3754 }
3755
3756
3757 static gboolean
3758 gst_base_sink_default_do_seek (GstBaseSink * sink, GstSegment * segment)
3759 {
3760   gboolean res = TRUE;
3761
3762   /* update our offset if the start/stop position was updated */
3763   if (segment->format == GST_FORMAT_BYTES) {
3764     segment->time = segment->start;
3765   } else if (segment->start == 0) {
3766     /* seek to start, we can implement a default for this. */
3767     segment->time = 0;
3768   } else {
3769     res = FALSE;
3770     GST_INFO_OBJECT (sink, "Can't do a default seek");
3771   }
3772
3773   return res;
3774 }
3775
3776 #define SEEK_TYPE_IS_RELATIVE(t) (((t) != GST_SEEK_TYPE_NONE) && ((t) != GST_SEEK_TYPE_SET))
3777
3778 static gboolean
3779 gst_base_sink_default_prepare_seek_segment (GstBaseSink * sink,
3780     GstEvent * event, GstSegment * segment)
3781 {
3782   /* By default, we try one of 2 things:
3783    *   - For absolute seek positions, convert the requested position to our
3784    *     configured processing format and place it in the output segment \
3785    *   - For relative seek positions, convert our current (input) values to the
3786    *     seek format, adjust by the relative seek offset and then convert back to
3787    *     the processing format
3788    */
3789   GstSeekType cur_type, stop_type;
3790   gint64 cur, stop;
3791   GstSeekFlags flags;
3792   GstFormat seek_format, dest_format;
3793   gdouble rate;
3794   gboolean update;
3795   gboolean res = TRUE;
3796
3797   gst_event_parse_seek (event, &rate, &seek_format, &flags,
3798       &cur_type, &cur, &stop_type, &stop);
3799   dest_format = segment->format;
3800
3801   if (seek_format == dest_format) {
3802     gst_segment_set_seek (segment, rate, seek_format, flags,
3803         cur_type, cur, stop_type, stop, &update);
3804     return TRUE;
3805   }
3806
3807   if (cur_type != GST_SEEK_TYPE_NONE) {
3808     /* FIXME: Handle seek_cur & seek_end by converting the input segment vals */
3809     res =
3810         gst_pad_query_convert (sink->sinkpad, seek_format, cur, &dest_format,
3811         &cur);
3812     cur_type = GST_SEEK_TYPE_SET;
3813   }
3814
3815   if (res && stop_type != GST_SEEK_TYPE_NONE) {
3816     /* FIXME: Handle seek_cur & seek_end by converting the input segment vals */
3817     res =
3818         gst_pad_query_convert (sink->sinkpad, seek_format, stop, &dest_format,
3819         &stop);
3820     stop_type = GST_SEEK_TYPE_SET;
3821   }
3822
3823   /* And finally, configure our output segment in the desired format */
3824   gst_segment_set_seek (segment, rate, dest_format, flags, cur_type, cur,
3825       stop_type, stop, &update);
3826
3827   if (!res)
3828     goto no_format;
3829
3830   return res;
3831
3832 no_format:
3833   {
3834     GST_DEBUG_OBJECT (sink, "undefined format given, seek aborted.");
3835     return FALSE;
3836   }
3837 }
3838
3839 /* perform a seek, only executed in pull mode */
3840 static gboolean
3841 gst_base_sink_perform_seek (GstBaseSink * sink, GstPad * pad, GstEvent * event)
3842 {
3843   gboolean flush;
3844   gdouble rate;
3845   GstFormat seek_format, dest_format;
3846   GstSeekFlags flags;
3847   GstSeekType cur_type, stop_type;
3848   gboolean seekseg_configured = FALSE;
3849   gint64 cur, stop;
3850   gboolean update, res = TRUE;
3851   GstSegment seeksegment;
3852
3853   dest_format = sink->segment.format;
3854
3855   if (event) {
3856     GST_DEBUG_OBJECT (sink, "performing seek with event %p", event);
3857     gst_event_parse_seek (event, &rate, &seek_format, &flags,
3858         &cur_type, &cur, &stop_type, &stop);
3859
3860     flush = flags & GST_SEEK_FLAG_FLUSH;
3861   } else {
3862     GST_DEBUG_OBJECT (sink, "performing seek without event");
3863     flush = FALSE;
3864   }
3865
3866   if (flush) {
3867     GST_DEBUG_OBJECT (sink, "flushing upstream");
3868     gst_pad_push_event (pad, gst_event_new_flush_start ());
3869     gst_base_sink_flush_start (sink, pad);
3870   } else {
3871     GST_DEBUG_OBJECT (sink, "pausing pulling thread");
3872   }
3873
3874   GST_PAD_STREAM_LOCK (pad);
3875
3876   /* If we configured the seeksegment above, don't overwrite it now. Otherwise
3877    * copy the current segment info into the temp segment that we can actually
3878    * attempt the seek with. We only update the real segment if the seek suceeds. */
3879   if (!seekseg_configured) {
3880     memcpy (&seeksegment, &sink->segment, sizeof (GstSegment));
3881
3882     /* now configure the final seek segment */
3883     if (event) {
3884       if (sink->segment.format != seek_format) {
3885         /* OK, here's where we give the subclass a chance to convert the relative
3886          * seek into an absolute one in the processing format. We set up any
3887          * absolute seek above, before taking the stream lock. */
3888         if (!gst_base_sink_default_prepare_seek_segment (sink, event,
3889                 &seeksegment)) {
3890           GST_DEBUG_OBJECT (sink,
3891               "Preparing the seek failed after flushing. " "Aborting seek");
3892           res = FALSE;
3893         }
3894       } else {
3895         /* The seek format matches our processing format, no need to ask the
3896          * the subclass to configure the segment. */
3897         gst_segment_set_seek (&seeksegment, rate, seek_format, flags,
3898             cur_type, cur, stop_type, stop, &update);
3899       }
3900     }
3901     /* Else, no seek event passed, so we're just (re)starting the
3902        current segment. */
3903   }
3904
3905   if (res) {
3906     GST_DEBUG_OBJECT (sink, "segment configured from %" G_GINT64_FORMAT
3907         " to %" G_GINT64_FORMAT ", position %" G_GINT64_FORMAT,
3908         seeksegment.start, seeksegment.stop, seeksegment.last_stop);
3909
3910     /* do the seek, segment.last_stop contains the new position. */
3911     res = gst_base_sink_default_do_seek (sink, &seeksegment);
3912   }
3913
3914
3915   if (flush) {
3916     GST_DEBUG_OBJECT (sink, "stop flushing upstream");
3917     gst_pad_push_event (pad, gst_event_new_flush_stop ());
3918     gst_base_sink_flush_stop (sink, pad);
3919   } else if (res && sink->running) {
3920     /* we are running the current segment and doing a non-flushing seek,
3921      * close the segment first based on the last_stop. */
3922     GST_DEBUG_OBJECT (sink, "closing running segment %" G_GINT64_FORMAT
3923         " to %" G_GINT64_FORMAT, sink->segment.start, sink->segment.last_stop);
3924   }
3925
3926   /* The subclass must have converted the segment to the processing format
3927    * by now */
3928   if (res && seeksegment.format != dest_format) {
3929     GST_DEBUG_OBJECT (sink, "Subclass failed to prepare a seek segment "
3930         "in the correct format. Aborting seek.");
3931     res = FALSE;
3932   }
3933
3934   /* if successfull seek, we update our real segment and push
3935    * out the new segment. */
3936   if (res) {
3937     memcpy (&sink->segment, &seeksegment, sizeof (GstSegment));
3938
3939     if (sink->segment.flags & GST_SEEK_FLAG_SEGMENT) {
3940       gst_element_post_message (GST_ELEMENT (sink),
3941           gst_message_new_segment_start (GST_OBJECT (sink),
3942               sink->segment.format, sink->segment.last_stop));
3943     }
3944   }
3945
3946   sink->priv->discont = TRUE;
3947   sink->running = TRUE;
3948
3949   GST_PAD_STREAM_UNLOCK (pad);
3950
3951   return res;
3952 }
3953
3954 static void
3955 set_step_info (GstBaseSink * sink, GstStepInfo * current, GstStepInfo * pending,
3956     guint seqnum, GstFormat format, guint64 amount, gdouble rate,
3957     gboolean flush, gboolean intermediate)
3958 {
3959   GST_OBJECT_LOCK (sink);
3960   pending->seqnum = seqnum;
3961   pending->format = format;
3962   pending->amount = amount;
3963   pending->position = 0;
3964   pending->rate = rate;
3965   pending->flush = flush;
3966   pending->intermediate = intermediate;
3967   pending->valid = TRUE;
3968   /* flush invalidates the current stepping segment */
3969   if (flush)
3970     current->valid = FALSE;
3971   GST_OBJECT_UNLOCK (sink);
3972 }
3973
3974 static gboolean
3975 gst_base_sink_perform_step (GstBaseSink * sink, GstPad * pad, GstEvent * event)
3976 {
3977   GstBaseSinkPrivate *priv;
3978   GstBaseSinkClass *bclass;
3979   gboolean flush, intermediate;
3980   gdouble rate;
3981   GstFormat format;
3982   guint64 amount;
3983   guint seqnum;
3984   GstStepInfo *pending, *current;
3985   GstMessage *message;
3986
3987   bclass = GST_BASE_SINK_GET_CLASS (sink);
3988   priv = sink->priv;
3989
3990   GST_DEBUG_OBJECT (sink, "performing step with event %p", event);
3991
3992   gst_event_parse_step (event, &format, &amount, &rate, &flush, &intermediate);
3993   seqnum = gst_event_get_seqnum (event);
3994
3995   pending = &priv->pending_step;
3996   current = &priv->current_step;
3997
3998   /* post message first */
3999   message = gst_message_new_step_start (GST_OBJECT (sink), FALSE, format,
4000       amount, rate, flush, intermediate);
4001   gst_message_set_seqnum (message, seqnum);
4002   gst_element_post_message (GST_ELEMENT (sink), message);
4003
4004   if (flush) {
4005     /* we need to call ::unlock before locking PREROLL_LOCK
4006      * since we lock it before going into ::render */
4007     if (bclass->unlock)
4008       bclass->unlock (sink);
4009
4010     GST_BASE_SINK_PREROLL_LOCK (sink);
4011     /* now that we have the PREROLL lock, clear our unlock request */
4012     if (bclass->unlock_stop)
4013       bclass->unlock_stop (sink);
4014
4015     /* update the stepinfo and make it valid */
4016     set_step_info (sink, current, pending, seqnum, format, amount, rate, flush,
4017         intermediate);
4018
4019     if (sink->priv->async_enabled) {
4020       /* and we need to commit our state again on the next
4021        * prerolled buffer */
4022       sink->playing_async = TRUE;
4023       priv->pending_step.need_preroll = TRUE;
4024       sink->need_preroll = FALSE;
4025       gst_element_lost_state_full (GST_ELEMENT_CAST (sink), FALSE);
4026     } else {
4027       sink->priv->have_latency = TRUE;
4028       sink->need_preroll = FALSE;
4029     }
4030     priv->current_sstart = GST_CLOCK_TIME_NONE;
4031     priv->current_sstop = GST_CLOCK_TIME_NONE;
4032     priv->eos_rtime = GST_CLOCK_TIME_NONE;
4033     priv->call_preroll = TRUE;
4034     gst_base_sink_set_last_buffer (sink, NULL);
4035     gst_base_sink_reset_qos (sink);
4036
4037     if (sink->clock_id) {
4038       gst_clock_id_unschedule (sink->clock_id);
4039     }
4040
4041     if (sink->have_preroll) {
4042       GST_DEBUG_OBJECT (sink, "signal waiter");
4043       priv->step_unlock = TRUE;
4044       GST_BASE_SINK_PREROLL_SIGNAL (sink);
4045     }
4046     GST_BASE_SINK_PREROLL_UNLOCK (sink);
4047   } else {
4048     /* update the stepinfo and make it valid */
4049     set_step_info (sink, current, pending, seqnum, format, amount, rate, flush,
4050         intermediate);
4051   }
4052
4053   return TRUE;
4054 }
4055
4056 /* with STREAM_LOCK
4057  */
4058 static void
4059 gst_base_sink_loop (GstPad * pad)
4060 {
4061   GstBaseSink *basesink;
4062   GstBuffer *buf = NULL;
4063   GstFlowReturn result;
4064   guint blocksize;
4065   guint64 offset;
4066
4067   basesink = GST_BASE_SINK (GST_OBJECT_PARENT (pad));
4068
4069   g_assert (basesink->pad_mode == GST_ACTIVATE_PULL);
4070
4071   if ((blocksize = basesink->priv->blocksize) == 0)
4072     blocksize = -1;
4073
4074   offset = basesink->segment.last_stop;
4075
4076   GST_DEBUG_OBJECT (basesink, "pulling %" G_GUINT64_FORMAT ", %u",
4077       offset, blocksize);
4078
4079   result = gst_pad_pull_range (pad, offset, blocksize, &buf);
4080   if (G_UNLIKELY (result != GST_FLOW_OK))
4081     goto paused;
4082
4083   if (G_UNLIKELY (buf == NULL))
4084     goto no_buffer;
4085
4086   offset += gst_buffer_get_size (buf);
4087
4088   gst_segment_set_last_stop (&basesink->segment, GST_FORMAT_BYTES, offset);
4089
4090   GST_BASE_SINK_PREROLL_LOCK (basesink);
4091   result = gst_base_sink_chain_unlocked (basesink, pad, _PR_IS_BUFFER, buf);
4092   GST_BASE_SINK_PREROLL_UNLOCK (basesink);
4093   if (G_UNLIKELY (result != GST_FLOW_OK))
4094     goto paused;
4095
4096   return;
4097
4098   /* ERRORS */
4099 paused:
4100   {
4101     GST_LOG_OBJECT (basesink, "pausing task, reason %s",
4102         gst_flow_get_name (result));
4103     gst_pad_pause_task (pad);
4104     if (result == GST_FLOW_UNEXPECTED) {
4105       /* perform EOS logic */
4106       if (basesink->segment.flags & GST_SEEK_FLAG_SEGMENT) {
4107         gst_element_post_message (GST_ELEMENT_CAST (basesink),
4108             gst_message_new_segment_done (GST_OBJECT_CAST (basesink),
4109                 basesink->segment.format, basesink->segment.last_stop));
4110       } else {
4111         gst_base_sink_event (pad, gst_event_new_eos ());
4112       }
4113     } else if (result == GST_FLOW_NOT_LINKED || result <= GST_FLOW_UNEXPECTED) {
4114       /* for fatal errors we post an error message, post the error
4115        * first so the app knows about the error first. 
4116        * wrong-state is not a fatal error because it happens due to
4117        * flushing and posting an error message in that case is the
4118        * wrong thing to do, e.g. when basesrc is doing a flushing
4119        * seek. */
4120       GST_ELEMENT_ERROR (basesink, STREAM, FAILED,
4121           (_("Internal data stream error.")),
4122           ("stream stopped, reason %s", gst_flow_get_name (result)));
4123       gst_base_sink_event (pad, gst_event_new_eos ());
4124     }
4125     return;
4126   }
4127 no_buffer:
4128   {
4129     GST_LOG_OBJECT (basesink, "no buffer, pausing");
4130     GST_ELEMENT_ERROR (basesink, STREAM, FAILED,
4131         (_("Internal data flow error.")), ("element returned NULL buffer"));
4132     result = GST_FLOW_ERROR;
4133     goto paused;
4134   }
4135 }
4136
4137 static gboolean
4138 gst_base_sink_set_flushing (GstBaseSink * basesink, GstPad * pad,
4139     gboolean flushing)
4140 {
4141   GstBaseSinkClass *bclass;
4142
4143   bclass = GST_BASE_SINK_GET_CLASS (basesink);
4144
4145   if (flushing) {
4146     /* unlock any subclasses, we need to do this before grabbing the
4147      * PREROLL_LOCK since we hold this lock before going into ::render. */
4148     if (bclass->unlock)
4149       bclass->unlock (basesink);
4150   }
4151
4152   GST_BASE_SINK_PREROLL_LOCK (basesink);
4153   basesink->flushing = flushing;
4154   if (flushing) {
4155     /* step 1, now that we have the PREROLL lock, clear our unlock request */
4156     if (bclass->unlock_stop)
4157       bclass->unlock_stop (basesink);
4158
4159     /* set need_preroll before we unblock the clock. If the clock is unblocked
4160      * before timing out, we can reuse the buffer for preroll. */
4161     basesink->need_preroll = TRUE;
4162
4163     /* step 2, unblock clock sync (if any) or any other blocking thing */
4164     if (basesink->clock_id) {
4165       gst_clock_id_unschedule (basesink->clock_id);
4166     }
4167
4168     /* flush out the data thread if it's locked in finish_preroll, this will
4169      * also flush out the EOS state */
4170     GST_DEBUG_OBJECT (basesink,
4171         "flushing out data thread, need preroll to TRUE");
4172     gst_base_sink_preroll_queue_flush (basesink, pad);
4173   }
4174   GST_BASE_SINK_PREROLL_UNLOCK (basesink);
4175
4176   return TRUE;
4177 }
4178
4179 static gboolean
4180 gst_base_sink_default_activate_pull (GstBaseSink * basesink, gboolean active)
4181 {
4182   gboolean result;
4183
4184   if (active) {
4185     /* start task */
4186     result = gst_pad_start_task (basesink->sinkpad,
4187         (GstTaskFunction) gst_base_sink_loop, basesink->sinkpad);
4188   } else {
4189     /* step 2, make sure streaming finishes */
4190     result = gst_pad_stop_task (basesink->sinkpad);
4191   }
4192
4193   return result;
4194 }
4195
4196 static gboolean
4197 gst_base_sink_pad_activate (GstPad * pad)
4198 {
4199   gboolean result = FALSE;
4200   GstBaseSink *basesink;
4201
4202   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
4203
4204   GST_DEBUG_OBJECT (basesink, "Trying pull mode first");
4205
4206   gst_base_sink_set_flushing (basesink, pad, FALSE);
4207
4208   /* we need to have the pull mode enabled */
4209   if (!basesink->can_activate_pull) {
4210     GST_DEBUG_OBJECT (basesink, "pull mode disabled");
4211     goto fallback;
4212   }
4213
4214   /* check if downstreams supports pull mode at all */
4215   if (!gst_pad_check_pull_range (pad)) {
4216     GST_DEBUG_OBJECT (basesink, "pull mode not supported");
4217     goto fallback;
4218   }
4219
4220   /* set the pad mode before starting the task so that it's in the
4221    * correct state for the new thread. also the sink set_caps and get_caps
4222    * function checks this */
4223   basesink->pad_mode = GST_ACTIVATE_PULL;
4224
4225   /* we first try to negotiate a format so that when we try to activate
4226    * downstream, it knows about our format */
4227   if (!gst_base_sink_negotiate_pull (basesink)) {
4228     GST_DEBUG_OBJECT (basesink, "failed to negotiate in pull mode");
4229     goto fallback;
4230   }
4231
4232   /* ok activate now */
4233   if (!gst_pad_activate_pull (pad, TRUE)) {
4234     /* clear any pending caps */
4235     GST_OBJECT_LOCK (basesink);
4236     gst_caps_replace (&basesink->priv->pull_caps, NULL);
4237     GST_OBJECT_UNLOCK (basesink);
4238     GST_DEBUG_OBJECT (basesink, "failed to activate in pull mode");
4239     goto fallback;
4240   }
4241
4242   GST_DEBUG_OBJECT (basesink, "Success activating pull mode");
4243   result = TRUE;
4244   goto done;
4245
4246   /* push mode fallback */
4247 fallback:
4248   GST_DEBUG_OBJECT (basesink, "Falling back to push mode");
4249   if ((result = gst_pad_activate_push (pad, TRUE))) {
4250     GST_DEBUG_OBJECT (basesink, "Success activating push mode");
4251   }
4252
4253 done:
4254   if (!result) {
4255     GST_WARNING_OBJECT (basesink, "Could not activate pad in either mode");
4256     gst_base_sink_set_flushing (basesink, pad, TRUE);
4257   }
4258
4259   gst_object_unref (basesink);
4260
4261   return result;
4262 }
4263
4264 static gboolean
4265 gst_base_sink_pad_activate_push (GstPad * pad, gboolean active)
4266 {
4267   gboolean result;
4268   GstBaseSink *basesink;
4269
4270   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
4271
4272   if (active) {
4273     if (!basesink->can_activate_push) {
4274       result = FALSE;
4275       basesink->pad_mode = GST_ACTIVATE_NONE;
4276     } else {
4277       result = TRUE;
4278       basesink->pad_mode = GST_ACTIVATE_PUSH;
4279     }
4280   } else {
4281     if (G_UNLIKELY (basesink->pad_mode != GST_ACTIVATE_PUSH)) {
4282       g_warning ("Internal GStreamer activation error!!!");
4283       result = FALSE;
4284     } else {
4285       gst_base_sink_set_flushing (basesink, pad, TRUE);
4286       result = TRUE;
4287       basesink->pad_mode = GST_ACTIVATE_NONE;
4288     }
4289   }
4290
4291   gst_object_unref (basesink);
4292
4293   return result;
4294 }
4295
4296 static gboolean
4297 gst_base_sink_negotiate_pull (GstBaseSink * basesink)
4298 {
4299   GstCaps *caps;
4300   gboolean result;
4301
4302   result = FALSE;
4303
4304   /* this returns the intersection between our caps and the peer caps. If there
4305    * is no peer, it returns NULL and we can't operate in pull mode so we can
4306    * fail the negotiation. */
4307   caps = gst_pad_get_allowed_caps (GST_BASE_SINK_PAD (basesink));
4308   if (caps == NULL || gst_caps_is_empty (caps))
4309     goto no_caps_possible;
4310
4311   GST_DEBUG_OBJECT (basesink, "allowed caps: %" GST_PTR_FORMAT, caps);
4312
4313   caps = gst_caps_make_writable (caps);
4314   /* get the first (prefered) format */
4315   gst_caps_truncate (caps);
4316
4317   GST_DEBUG_OBJECT (basesink, "have caps: %" GST_PTR_FORMAT, caps);
4318
4319   if (gst_caps_is_any (caps)) {
4320     GST_DEBUG_OBJECT (basesink, "caps were ANY after fixating, "
4321         "allowing pull()");
4322     /* neither side has template caps in this case, so they are prepared for
4323        pull() without setcaps() */
4324     result = TRUE;
4325   } else {
4326     /* try to fixate */
4327     gst_pad_fixate_caps (GST_BASE_SINK_PAD (basesink), caps);
4328     GST_DEBUG_OBJECT (basesink, "fixated to: %" GST_PTR_FORMAT, caps);
4329
4330     if (gst_caps_is_fixed (caps)) {
4331       if (!gst_pad_set_caps (GST_BASE_SINK_PAD (basesink), caps))
4332         goto could_not_set_caps;
4333
4334       GST_OBJECT_LOCK (basesink);
4335       gst_caps_replace (&basesink->priv->pull_caps, caps);
4336       GST_OBJECT_UNLOCK (basesink);
4337
4338       result = TRUE;
4339     }
4340   }
4341
4342   gst_caps_unref (caps);
4343
4344   return result;
4345
4346 no_caps_possible:
4347   {
4348     GST_INFO_OBJECT (basesink, "Pipeline could not agree on caps");
4349     GST_DEBUG_OBJECT (basesink, "get_allowed_caps() returned EMPTY");
4350     if (caps)
4351       gst_caps_unref (caps);
4352     return FALSE;
4353   }
4354 could_not_set_caps:
4355   {
4356     GST_INFO_OBJECT (basesink, "Could not set caps: %" GST_PTR_FORMAT, caps);
4357     gst_caps_unref (caps);
4358     return FALSE;
4359   }
4360 }
4361
4362 /* this won't get called until we implement an activate function */
4363 static gboolean
4364 gst_base_sink_pad_activate_pull (GstPad * pad, gboolean active)
4365 {
4366   gboolean result = FALSE;
4367   GstBaseSink *basesink;
4368   GstBaseSinkClass *bclass;
4369
4370   basesink = GST_BASE_SINK (gst_pad_get_parent (pad));
4371   bclass = GST_BASE_SINK_GET_CLASS (basesink);
4372
4373   if (active) {
4374     GstFormat format;
4375     gint64 duration;
4376
4377     /* we mark we have a newsegment here because pull based
4378      * mode works just fine without having a newsegment before the
4379      * first buffer */
4380     format = GST_FORMAT_BYTES;
4381
4382     gst_segment_init (&basesink->segment, format);
4383     gst_segment_init (basesink->clip_segment, format);
4384     GST_OBJECT_LOCK (basesink);
4385     basesink->have_newsegment = TRUE;
4386     GST_OBJECT_UNLOCK (basesink);
4387
4388     /* get the peer duration in bytes */
4389     result = gst_pad_query_peer_duration (pad, &format, &duration);
4390     if (result) {
4391       GST_DEBUG_OBJECT (basesink,
4392           "setting duration in bytes to %" G_GINT64_FORMAT, duration);
4393       gst_segment_set_duration (basesink->clip_segment, format, duration);
4394       gst_segment_set_duration (&basesink->segment, format, duration);
4395     } else {
4396       GST_DEBUG_OBJECT (basesink, "unknown duration");
4397     }
4398
4399     if (bclass->activate_pull)
4400       result = bclass->activate_pull (basesink, TRUE);
4401     else
4402       result = FALSE;
4403
4404     if (!result)
4405       goto activate_failed;
4406
4407   } else {
4408     if (G_UNLIKELY (basesink->pad_mode != GST_ACTIVATE_PULL)) {
4409       g_warning ("Internal GStreamer activation error!!!");
4410       result = FALSE;
4411     } else {
4412       result = gst_base_sink_set_flushing (basesink, pad, TRUE);
4413       if (bclass->activate_pull)
4414         result &= bclass->activate_pull (basesink, FALSE);
4415       basesink->pad_mode = GST_ACTIVATE_NONE;
4416       /* clear any pending caps */
4417       GST_OBJECT_LOCK (basesink);
4418       gst_caps_replace (&basesink->priv->pull_caps, NULL);
4419       GST_OBJECT_UNLOCK (basesink);
4420     }
4421   }
4422   gst_object_unref (basesink);
4423
4424   return result;
4425
4426   /* ERRORS */
4427 activate_failed:
4428   {
4429     /* reset, as starting the thread failed */
4430     basesink->pad_mode = GST_ACTIVATE_NONE;
4431
4432     GST_ERROR_OBJECT (basesink, "subclass failed to activate in pull mode");
4433     return FALSE;
4434   }
4435 }
4436
4437 /* send an event to our sinkpad peer. */
4438 static gboolean
4439 gst_base_sink_send_event (GstElement * element, GstEvent * event)
4440 {
4441   GstPad *pad;
4442   GstBaseSink *basesink = GST_BASE_SINK (element);
4443   gboolean forward, result = TRUE;
4444   GstActivateMode mode;
4445
4446   GST_OBJECT_LOCK (element);
4447   /* get the pad and the scheduling mode */
4448   pad = gst_object_ref (basesink->sinkpad);
4449   mode = basesink->pad_mode;
4450   GST_OBJECT_UNLOCK (element);
4451
4452   /* only push UPSTREAM events upstream */
4453   forward = GST_EVENT_IS_UPSTREAM (event);
4454
4455   GST_DEBUG_OBJECT (basesink, "handling event %p %" GST_PTR_FORMAT, event,
4456       event);
4457
4458   switch (GST_EVENT_TYPE (event)) {
4459     case GST_EVENT_LATENCY:
4460     {
4461       GstClockTime latency;
4462
4463       gst_event_parse_latency (event, &latency);
4464
4465       /* store the latency. We use this to adjust the running_time before syncing
4466        * it to the clock. */
4467       GST_OBJECT_LOCK (element);
4468       basesink->priv->latency = latency;
4469       if (!basesink->priv->have_latency)
4470         forward = FALSE;
4471       GST_OBJECT_UNLOCK (element);
4472       GST_DEBUG_OBJECT (basesink, "latency set to %" GST_TIME_FORMAT,
4473           GST_TIME_ARGS (latency));
4474
4475       /* We forward this event so that all elements know about the global pipeline
4476        * latency. This is interesting for an element when it wants to figure out
4477        * when a particular piece of data will be rendered. */
4478       break;
4479     }
4480     case GST_EVENT_SEEK:
4481       /* in pull mode we will execute the seek */
4482       if (mode == GST_ACTIVATE_PULL)
4483         result = gst_base_sink_perform_seek (basesink, pad, event);
4484       break;
4485     case GST_EVENT_STEP:
4486       result = gst_base_sink_perform_step (basesink, pad, event);
4487       forward = FALSE;
4488       break;
4489     default:
4490       break;
4491   }
4492
4493   if (forward) {
4494     result = gst_pad_push_event (pad, event);
4495   } else {
4496     /* not forwarded, unref the event */
4497     gst_event_unref (event);
4498   }
4499
4500   gst_object_unref (pad);
4501   return result;
4502 }
4503
4504 static gboolean
4505 gst_base_sink_get_position (GstBaseSink * basesink, GstFormat format,
4506     gint64 * cur, gboolean * upstream)
4507 {
4508   GstClock *clock = NULL;
4509   gboolean res = FALSE;
4510   GstFormat oformat, tformat;
4511   GstSegment *segment;
4512   GstClockTime now, latency;
4513   GstClockTimeDiff base;
4514   gint64 time, accum, duration;
4515   gdouble rate;
4516   gint64 last;
4517   gboolean last_seen, with_clock, in_paused;
4518
4519   GST_OBJECT_LOCK (basesink);
4520   /* we can only get the segment when we are not NULL or READY */
4521   if (!basesink->have_newsegment)
4522     goto wrong_state;
4523
4524   in_paused = FALSE;
4525   /* when not in PLAYING or when we're busy with a state change, we
4526    * cannot read from the clock so we report time based on the
4527    * last seen timestamp. */
4528   if (GST_STATE (basesink) != GST_STATE_PLAYING ||
4529       GST_STATE_PENDING (basesink) != GST_STATE_VOID_PENDING) {
4530     in_paused = TRUE;
4531   }
4532
4533   /* we don't use the clip segment in pull mode, when seeking we update the
4534    * main segment directly with the new segment values without it having to be
4535    * activated by the rendering after preroll */
4536   if (basesink->pad_mode == GST_ACTIVATE_PUSH)
4537     segment = basesink->clip_segment;
4538   else
4539     segment = &basesink->segment;
4540
4541   /* our intermediate time format */
4542   tformat = GST_FORMAT_TIME;
4543   /* get the format in the segment */
4544   oformat = segment->format;
4545
4546   /* report with last seen position when EOS */
4547   last_seen = basesink->eos;
4548
4549   /* assume we will use the clock for getting the current position */
4550   with_clock = TRUE;
4551   if (basesink->sync == FALSE)
4552     with_clock = FALSE;
4553
4554   /* and we need a clock */
4555   if (G_UNLIKELY ((clock = GST_ELEMENT_CLOCK (basesink)) == NULL))
4556     with_clock = FALSE;
4557   else
4558     gst_object_ref (clock);
4559
4560   /* collect all data we need holding the lock */
4561   if (GST_CLOCK_TIME_IS_VALID (segment->time))
4562     time = segment->time;
4563   else
4564     time = 0;
4565
4566   if (GST_CLOCK_TIME_IS_VALID (segment->stop))
4567     duration = segment->stop - segment->start;
4568   else
4569     duration = 0;
4570
4571   accum = segment->accum;
4572   rate = segment->rate * segment->applied_rate;
4573   latency = basesink->priv->latency;
4574
4575   if (oformat == GST_FORMAT_TIME) {
4576     gint64 start, stop;
4577
4578     start = basesink->priv->current_sstart;
4579     stop = basesink->priv->current_sstop;
4580
4581     if (in_paused) {
4582       /* in paused we use the last position as a lower bound */
4583       if (stop == -1 || segment->rate > 0.0)
4584         last = start;
4585       else
4586         last = stop;
4587     } else {
4588       /* in playing, use last stop time as upper bound */
4589       if (start == -1 || segment->rate > 0.0)
4590         last = stop;
4591       else
4592         last = start;
4593     }
4594   } else {
4595     /* convert last stop to stream time */
4596     last = gst_segment_to_stream_time (segment, oformat, segment->last_stop);
4597   }
4598
4599   if (in_paused) {
4600     /* in paused, use start_time */
4601     base = GST_ELEMENT_START_TIME (basesink);
4602     GST_DEBUG_OBJECT (basesink, "in paused, using start time %" GST_TIME_FORMAT,
4603         GST_TIME_ARGS (base));
4604   } else if (with_clock) {
4605     /* else use clock when needed */
4606     base = GST_ELEMENT_CAST (basesink)->base_time;
4607     GST_DEBUG_OBJECT (basesink, "using clock and base time %" GST_TIME_FORMAT,
4608         GST_TIME_ARGS (base));
4609   } else {
4610     /* else, no sync or clock -> no base time */
4611     GST_DEBUG_OBJECT (basesink, "no sync or no clock");
4612     base = -1;
4613   }
4614
4615   /* no base, we can't calculate running_time, use last seem timestamp to report
4616    * time */
4617   if (base == -1)
4618     last_seen = TRUE;
4619
4620   /* need to release the object lock before we can get the time,
4621    * a clock might take the LOCK of the provider, which could be
4622    * a basesink subclass. */
4623   GST_OBJECT_UNLOCK (basesink);
4624
4625   if (last_seen) {
4626     /* in EOS or when no valid stream_time, report the value of last seen
4627      * timestamp */
4628     if (last == -1) {
4629       /* no timestamp, we need to ask upstream */
4630       GST_DEBUG_OBJECT (basesink, "no last seen timestamp, asking upstream");
4631       res = FALSE;
4632       *upstream = TRUE;
4633       goto done;
4634     }
4635     GST_DEBUG_OBJECT (basesink, "using last seen timestamp %" GST_TIME_FORMAT,
4636         GST_TIME_ARGS (last));
4637     *cur = last;
4638   } else {
4639     if (oformat != tformat) {
4640       /* convert accum, time and duration to time */
4641       if (!gst_pad_query_convert (basesink->sinkpad, oformat, accum, &tformat,
4642               &accum))
4643         goto convert_failed;
4644       if (!gst_pad_query_convert (basesink->sinkpad, oformat, duration,
4645               &tformat, &duration))
4646         goto convert_failed;
4647       if (!gst_pad_query_convert (basesink->sinkpad, oformat, time, &tformat,
4648               &time))
4649         goto convert_failed;
4650       if (!gst_pad_query_convert (basesink->sinkpad, oformat, last, &tformat,
4651               &last))
4652         goto convert_failed;
4653
4654       /* assume time format from now on */
4655       oformat = tformat;
4656     }
4657
4658     if (!in_paused && with_clock) {
4659       now = gst_clock_get_time (clock);
4660     } else {
4661       now = base;
4662       base = 0;
4663     }
4664
4665     /* subtract base time and accumulated time from the clock time.
4666      * Make sure we don't go negative. This is the current time in
4667      * the segment which we need to scale with the combined
4668      * rate and applied rate. */
4669     base += accum;
4670     base += latency;
4671     if (GST_CLOCK_DIFF (base, now) < 0)
4672       base = now;
4673
4674     /* for negative rates we need to count back from the segment
4675      * duration. */
4676     if (rate < 0.0)
4677       time += duration;
4678
4679     *cur = time + gst_guint64_to_gdouble (now - base) * rate;
4680
4681     if (in_paused) {
4682       /* never report less than segment values in paused */
4683       if (last != -1)
4684         *cur = MAX (last, *cur);
4685     } else {
4686       /* never report more than last seen position in playing */
4687       if (last != -1)
4688         *cur = MIN (last, *cur);
4689     }
4690
4691     GST_DEBUG_OBJECT (basesink,
4692         "now %" GST_TIME_FORMAT " - base %" GST_TIME_FORMAT " - accum %"
4693         GST_TIME_FORMAT " + time %" GST_TIME_FORMAT "  last %" GST_TIME_FORMAT,
4694         GST_TIME_ARGS (now), GST_TIME_ARGS (base), GST_TIME_ARGS (accum),
4695         GST_TIME_ARGS (time), GST_TIME_ARGS (last));
4696   }
4697
4698   if (oformat != format) {
4699     /* convert to final format */
4700     if (!gst_pad_query_convert (basesink->sinkpad, oformat, *cur, &format, cur))
4701       goto convert_failed;
4702   }
4703
4704   res = TRUE;
4705
4706 done:
4707   GST_DEBUG_OBJECT (basesink, "res: %d, POSITION: %" GST_TIME_FORMAT,
4708       res, GST_TIME_ARGS (*cur));
4709
4710   if (clock)
4711     gst_object_unref (clock);
4712
4713   return res;
4714
4715   /* special cases */
4716 wrong_state:
4717   {
4718     /* in NULL or READY we always return FALSE and -1 */
4719     GST_DEBUG_OBJECT (basesink, "position in wrong state, return -1");
4720     res = FALSE;
4721     *cur = -1;
4722     GST_OBJECT_UNLOCK (basesink);
4723     goto done;
4724   }
4725 convert_failed:
4726   {
4727     GST_DEBUG_OBJECT (basesink, "convert failed, try upstream");
4728     *upstream = TRUE;
4729     res = FALSE;
4730     goto done;
4731   }
4732 }
4733
4734 static gboolean
4735 gst_base_sink_get_duration (GstBaseSink * basesink, GstFormat format,
4736     gint64 * dur, gboolean * upstream)
4737 {
4738   gboolean res = FALSE;
4739
4740   if (basesink->pad_mode == GST_ACTIVATE_PULL) {
4741     GstFormat uformat = GST_FORMAT_BYTES;
4742     gint64 uduration;
4743
4744     /* get the duration in bytes, in pull mode that's all we are sure to
4745      * know. We have to explicitly get this value from upstream instead of
4746      * using our cached value because it might change. Duration caching
4747      * should be done at a higher level. */
4748     res = gst_pad_query_peer_duration (basesink->sinkpad, &uformat, &uduration);
4749     if (res) {
4750       gst_segment_set_duration (&basesink->segment, uformat, uduration);
4751       if (format != uformat) {
4752         /* convert to the requested format */
4753         res = gst_pad_query_convert (basesink->sinkpad, uformat, uduration,
4754             &format, dur);
4755       } else {
4756         *dur = uduration;
4757       }
4758     }
4759     *upstream = FALSE;
4760   } else {
4761     *upstream = TRUE;
4762   }
4763
4764   return res;
4765 }
4766
4767 static const GstQueryType *
4768 gst_base_sink_get_query_types (GstElement * element)
4769 {
4770   static const GstQueryType query_types[] = {
4771     GST_QUERY_DURATION,
4772     GST_QUERY_POSITION,
4773     GST_QUERY_SEGMENT,
4774     GST_QUERY_LATENCY,
4775     0
4776   };
4777
4778   return query_types;
4779 }
4780
4781 static gboolean
4782 gst_base_sink_query (GstElement * element, GstQuery * query)
4783 {
4784   gboolean res = FALSE;
4785
4786   GstBaseSink *basesink = GST_BASE_SINK (element);
4787
4788   switch (GST_QUERY_TYPE (query)) {
4789     case GST_QUERY_POSITION:
4790     {
4791       gint64 cur = 0;
4792       GstFormat format;
4793       gboolean upstream = FALSE;
4794
4795       gst_query_parse_position (query, &format, NULL);
4796
4797       GST_DEBUG_OBJECT (basesink, "position query in format %s",
4798           gst_format_get_name (format));
4799
4800       /* first try to get the position based on the clock */
4801       if ((res =
4802               gst_base_sink_get_position (basesink, format, &cur, &upstream))) {
4803         gst_query_set_position (query, format, cur);
4804       } else if (upstream) {
4805         /* fallback to peer query */
4806         res = gst_pad_peer_query (basesink->sinkpad, query);
4807       }
4808       if (!res) {
4809         /* we can handle a few things if upstream failed */
4810         if (format == GST_FORMAT_PERCENT) {
4811           gint64 dur = 0;
4812           GstFormat uformat = GST_FORMAT_TIME;
4813
4814           res = gst_base_sink_get_position (basesink, GST_FORMAT_TIME, &cur,
4815               &upstream);
4816           if (!res && upstream) {
4817             res = gst_pad_query_peer_position (basesink->sinkpad, &uformat,
4818                 &cur);
4819           }
4820           if (res) {
4821             res = gst_base_sink_get_duration (basesink, GST_FORMAT_TIME, &dur,
4822                 &upstream);
4823             if (!res && upstream) {
4824               res = gst_pad_query_peer_duration (basesink->sinkpad, &uformat,
4825                   &dur);
4826             }
4827           }
4828           if (res) {
4829             gint64 pos;
4830
4831             pos = gst_util_uint64_scale (100 * GST_FORMAT_PERCENT_SCALE, cur,
4832                 dur);
4833             gst_query_set_position (query, GST_FORMAT_PERCENT, pos);
4834           }
4835         }
4836       }
4837       break;
4838     }
4839     case GST_QUERY_DURATION:
4840     {
4841       gint64 dur = 0;
4842       GstFormat format;
4843       gboolean upstream = FALSE;
4844
4845       gst_query_parse_duration (query, &format, NULL);
4846
4847       GST_DEBUG_OBJECT (basesink, "duration query in format %s",
4848           gst_format_get_name (format));
4849
4850       if ((res =
4851               gst_base_sink_get_duration (basesink, format, &dur, &upstream))) {
4852         gst_query_set_duration (query, format, dur);
4853       } else if (upstream) {
4854         /* fallback to peer query */
4855         res = gst_pad_peer_query (basesink->sinkpad, query);
4856       }
4857       if (!res) {
4858         /* we can handle a few things if upstream failed */
4859         if (format == GST_FORMAT_PERCENT) {
4860           gst_query_set_duration (query, GST_FORMAT_PERCENT,
4861               GST_FORMAT_PERCENT_MAX);
4862           res = TRUE;
4863         }
4864       }
4865       break;
4866     }
4867     case GST_QUERY_LATENCY:
4868     {
4869       gboolean live, us_live;
4870       GstClockTime min, max;
4871
4872       if ((res = gst_base_sink_query_latency (basesink, &live, &us_live, &min,
4873                   &max))) {
4874         gst_query_set_latency (query, live, min, max);
4875       }
4876       break;
4877     }
4878     case GST_QUERY_JITTER:
4879       break;
4880     case GST_QUERY_RATE:
4881       /* gst_query_set_rate (query, basesink->segment_rate); */
4882       res = TRUE;
4883       break;
4884     case GST_QUERY_SEGMENT:
4885     {
4886       if (basesink->pad_mode == GST_ACTIVATE_PULL) {
4887         gst_query_set_segment (query, basesink->segment.rate,
4888             GST_FORMAT_TIME, basesink->segment.start, basesink->segment.stop);
4889         res = TRUE;
4890       } else {
4891         res = gst_pad_peer_query (basesink->sinkpad, query);
4892       }
4893       break;
4894     }
4895     case GST_QUERY_SEEKING:
4896     case GST_QUERY_CONVERT:
4897     case GST_QUERY_FORMATS:
4898     default:
4899       res = gst_pad_peer_query (basesink->sinkpad, query);
4900       break;
4901   }
4902   GST_DEBUG_OBJECT (basesink, "query %s returns %d",
4903       GST_QUERY_TYPE_NAME (query), res);
4904   return res;
4905 }
4906
4907 static GstStateChangeReturn
4908 gst_base_sink_change_state (GstElement * element, GstStateChange transition)
4909 {
4910   GstStateChangeReturn ret = GST_STATE_CHANGE_SUCCESS;
4911   GstBaseSink *basesink = GST_BASE_SINK (element);
4912   GstBaseSinkClass *bclass;
4913   GstBaseSinkPrivate *priv;
4914
4915   priv = basesink->priv;
4916
4917   bclass = GST_BASE_SINK_GET_CLASS (basesink);
4918
4919   switch (transition) {
4920     case GST_STATE_CHANGE_NULL_TO_READY:
4921       if (bclass->start)
4922         if (!bclass->start (basesink))
4923           goto start_failed;
4924       break;
4925     case GST_STATE_CHANGE_READY_TO_PAUSED:
4926       /* need to complete preroll before this state change completes, there
4927        * is no data flow in READY so we can safely assume we need to preroll. */
4928       GST_BASE_SINK_PREROLL_LOCK (basesink);
4929       GST_DEBUG_OBJECT (basesink, "READY to PAUSED");
4930       basesink->have_newsegment = FALSE;
4931       gst_segment_init (&basesink->segment, GST_FORMAT_UNDEFINED);
4932       gst_segment_init (basesink->clip_segment, GST_FORMAT_UNDEFINED);
4933       basesink->offset = 0;
4934       basesink->have_preroll = FALSE;
4935       priv->step_unlock = FALSE;
4936       basesink->need_preroll = TRUE;
4937       basesink->playing_async = TRUE;
4938       priv->current_sstart = GST_CLOCK_TIME_NONE;
4939       priv->current_sstop = GST_CLOCK_TIME_NONE;
4940       priv->eos_rtime = GST_CLOCK_TIME_NONE;
4941       priv->latency = 0;
4942       basesink->eos = FALSE;
4943       priv->received_eos = FALSE;
4944       gst_base_sink_reset_qos (basesink);
4945       priv->commited = FALSE;
4946       priv->call_preroll = TRUE;
4947       priv->current_step.valid = FALSE;
4948       priv->pending_step.valid = FALSE;
4949       if (priv->async_enabled) {
4950         GST_DEBUG_OBJECT (basesink, "doing async state change");
4951         /* when async enabled, post async-start message and return ASYNC from
4952          * the state change function */
4953         ret = GST_STATE_CHANGE_ASYNC;
4954         gst_element_post_message (GST_ELEMENT_CAST (basesink),
4955             gst_message_new_async_start (GST_OBJECT_CAST (basesink), FALSE));
4956       } else {
4957         priv->have_latency = TRUE;
4958       }
4959       GST_BASE_SINK_PREROLL_UNLOCK (basesink);
4960       break;
4961     case GST_STATE_CHANGE_PAUSED_TO_PLAYING:
4962       GST_BASE_SINK_PREROLL_LOCK (basesink);
4963       if (!gst_base_sink_needs_preroll (basesink)) {
4964         GST_DEBUG_OBJECT (basesink, "PAUSED to PLAYING, don't need preroll");
4965         /* no preroll needed anymore now. */
4966         basesink->playing_async = FALSE;
4967         basesink->need_preroll = FALSE;
4968         if (basesink->eos) {
4969           GstMessage *message;
4970
4971           /* need to post EOS message here */
4972           GST_DEBUG_OBJECT (basesink, "Now posting EOS");
4973           message = gst_message_new_eos (GST_OBJECT_CAST (basesink));
4974           gst_message_set_seqnum (message, basesink->priv->seqnum);
4975           gst_element_post_message (GST_ELEMENT_CAST (basesink), message);
4976         } else {
4977           GST_DEBUG_OBJECT (basesink, "signal preroll");
4978           GST_BASE_SINK_PREROLL_SIGNAL (basesink);
4979         }
4980       } else {
4981         GST_DEBUG_OBJECT (basesink, "PAUSED to PLAYING, we are not prerolled");
4982         basesink->need_preroll = TRUE;
4983         basesink->playing_async = TRUE;
4984         priv->call_preroll = TRUE;
4985         priv->commited = FALSE;
4986         if (priv->async_enabled) {
4987           GST_DEBUG_OBJECT (basesink, "doing async state change");
4988           ret = GST_STATE_CHANGE_ASYNC;
4989           gst_element_post_message (GST_ELEMENT_CAST (basesink),
4990               gst_message_new_async_start (GST_OBJECT_CAST (basesink), FALSE));
4991         }
4992       }
4993       GST_BASE_SINK_PREROLL_UNLOCK (basesink);
4994       break;
4995     default:
4996       break;
4997   }
4998
4999   {
5000     GstStateChangeReturn bret;
5001
5002     bret = GST_ELEMENT_CLASS (parent_class)->change_state (element, transition);
5003     if (G_UNLIKELY (bret == GST_STATE_CHANGE_FAILURE))
5004       goto activate_failed;
5005   }
5006
5007   switch (transition) {
5008     case GST_STATE_CHANGE_PLAYING_TO_PAUSED:
5009       GST_DEBUG_OBJECT (basesink, "PLAYING to PAUSED");
5010       /* FIXME, make sure we cannot enter _render first */
5011
5012       /* we need to call ::unlock before locking PREROLL_LOCK
5013        * since we lock it before going into ::render */
5014       if (bclass->unlock)
5015         bclass->unlock (basesink);
5016
5017       GST_BASE_SINK_PREROLL_LOCK (basesink);
5018       GST_DEBUG_OBJECT (basesink, "got preroll lock");
5019       /* now that we have the PREROLL lock, clear our unlock request */
5020       if (bclass->unlock_stop)
5021         bclass->unlock_stop (basesink);
5022
5023       /* we need preroll again and we set the flag before unlocking the clockid
5024        * because if the clockid is unlocked before a current buffer expired, we
5025        * can use that buffer to preroll with */
5026       basesink->need_preroll = TRUE;
5027
5028       if (basesink->clock_id) {
5029         GST_DEBUG_OBJECT (basesink, "unschedule clock");
5030         gst_clock_id_unschedule (basesink->clock_id);
5031       }
5032
5033       /* if we don't have a preroll buffer we need to wait for a preroll and
5034        * return ASYNC. */
5035       if (!gst_base_sink_needs_preroll (basesink)) {
5036         GST_DEBUG_OBJECT (basesink, "PLAYING to PAUSED, we are prerolled");
5037         basesink->playing_async = FALSE;
5038       } else {
5039         if (GST_STATE_TARGET (GST_ELEMENT (basesink)) <= GST_STATE_READY) {
5040           GST_DEBUG_OBJECT (basesink, "element is <= READY");
5041           ret = GST_STATE_CHANGE_SUCCESS;
5042         } else {
5043           GST_DEBUG_OBJECT (basesink,
5044               "PLAYING to PAUSED, we are not prerolled");
5045           basesink->playing_async = TRUE;
5046           priv->commited = FALSE;
5047           priv->call_preroll = TRUE;
5048           if (priv->async_enabled) {
5049             GST_DEBUG_OBJECT (basesink, "doing async state change");
5050             ret = GST_STATE_CHANGE_ASYNC;
5051             gst_element_post_message (GST_ELEMENT_CAST (basesink),
5052                 gst_message_new_async_start (GST_OBJECT_CAST (basesink),
5053                     FALSE));
5054           }
5055         }
5056       }
5057       GST_DEBUG_OBJECT (basesink, "rendered: %" G_GUINT64_FORMAT
5058           ", dropped: %" G_GUINT64_FORMAT, priv->rendered, priv->dropped);
5059
5060       gst_base_sink_reset_qos (basesink);
5061       GST_BASE_SINK_PREROLL_UNLOCK (basesink);
5062       break;
5063     case GST_STATE_CHANGE_PAUSED_TO_READY:
5064       GST_BASE_SINK_PREROLL_LOCK (basesink);
5065       /* start by reseting our position state with the object lock so that the
5066        * position query gets the right idea. We do this before we post the
5067        * messages so that the message handlers pick this up. */
5068       GST_OBJECT_LOCK (basesink);
5069       basesink->have_newsegment = FALSE;
5070       priv->current_sstart = GST_CLOCK_TIME_NONE;
5071       priv->current_sstop = GST_CLOCK_TIME_NONE;
5072       priv->have_latency = FALSE;
5073       if (priv->cached_clock_id) {
5074         gst_clock_id_unref (priv->cached_clock_id);
5075         priv->cached_clock_id = NULL;
5076       }
5077       GST_OBJECT_UNLOCK (basesink);
5078
5079       gst_base_sink_set_last_buffer (basesink, NULL);
5080       priv->call_preroll = FALSE;
5081
5082       if (!priv->commited) {
5083         if (priv->async_enabled) {
5084           GST_DEBUG_OBJECT (basesink, "PAUSED to READY, posting async-done");
5085
5086           gst_element_post_message (GST_ELEMENT_CAST (basesink),
5087               gst_message_new_state_changed (GST_OBJECT_CAST (basesink),
5088                   GST_STATE_PLAYING, GST_STATE_PAUSED, GST_STATE_READY));
5089
5090           gst_element_post_message (GST_ELEMENT_CAST (basesink),
5091               gst_message_new_async_done (GST_OBJECT_CAST (basesink)));
5092         }
5093         priv->commited = TRUE;
5094       } else {
5095         GST_DEBUG_OBJECT (basesink, "PAUSED to READY, don't need_preroll");
5096       }
5097       GST_BASE_SINK_PREROLL_UNLOCK (basesink);
5098       break;
5099     case GST_STATE_CHANGE_READY_TO_NULL:
5100       if (bclass->stop) {
5101         if (!bclass->stop (basesink)) {
5102           GST_WARNING_OBJECT (basesink, "failed to stop");
5103         }
5104       }
5105       gst_base_sink_set_last_buffer (basesink, NULL);
5106       priv->call_preroll = FALSE;
5107       break;
5108     default:
5109       break;
5110   }
5111
5112   return ret;
5113
5114   /* ERRORS */
5115 start_failed:
5116   {
5117     GST_DEBUG_OBJECT (basesink, "failed to start");
5118     return GST_STATE_CHANGE_FAILURE;
5119   }
5120 activate_failed:
5121   {
5122     GST_DEBUG_OBJECT (basesink,
5123         "element failed to change states -- activation problem?");
5124     return GST_STATE_CHANGE_FAILURE;
5125   }
5126 }